Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_presubmit / py / pw_presubmit / presubmit.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Tools for running presubmit checks in a Git repository.
15
16 Presubmit checks are defined as a function or other callable. The function may
17 take either no arguments or a list of the paths on which to run. Presubmit
18 checks communicate failure by raising any exception.
19
20 For example, either of these functions may be used as presubmit checks:
21
22   @pw_presubmit.filter_paths(endswith='.py')
23   def file_contains_ni(ctx: PresubmitContext):
24       for path in ctx.paths:
25           with open(path) as file:
26               contents = file.read()
27               if 'ni' not in contents and 'nee' not in contents:
28                   raise PresumitFailure('Files must say "ni"!', path=path)
29
30   def run_the_build():
31       subprocess.run(['make', 'release'], check=True)
32
33 Presubmit checks that accept a list of paths may use the filter_paths decorator
34 to automatically filter the paths list for file types they care about. See the
35 pragma_once function for an example.
36
37 See pigweed_presbumit.py for an example of how to define presubmit checks.
38 """
39
40 import collections
41 import contextlib
42 import dataclasses
43 import enum
44 from inspect import Parameter, signature
45 import itertools
46 import logging
47 import os
48 from pathlib import Path
49 import re
50 import subprocess
51 import time
52 from typing import (Callable, Collection, Dict, Iterable, Iterator, List,
53                     NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union)
54
55 from pw_presubmit import git_repo, tools
56 from pw_presubmit.tools import plural
57
58 _LOG: logging.Logger = logging.getLogger(__name__)
59
60 color_red = tools.make_color(31)
61 color_bold_red = tools.make_color(31, 1)
62 color_black_on_red = tools.make_color(30, 41)
63 color_yellow = tools.make_color(33, 1)
64 color_green = tools.make_color(32)
65 color_black_on_green = tools.make_color(30, 42)
66 color_aqua = tools.make_color(36)
67 color_bold_white = tools.make_color(37, 1)
68
69 _SUMMARY_BOX = '══╦╗ ║║══╩╝'
70 _CHECK_UPPER = '━━━┓       '
71 _CHECK_LOWER = '       ━━━┛'
72
73 WIDTH = 80
74
75 _LEFT = 7
76 _RIGHT = 11
77
78
79 def _title(msg, style=_SUMMARY_BOX) -> str:
80     msg = f' {msg} '.center(WIDTH - 2)
81     return tools.make_box('^').format(*style, section1=msg, width1=len(msg))
82
83
84 def _format_time(time_s: float) -> str:
85     minutes, seconds = divmod(time_s, 60)
86     return f' {int(minutes)}:{seconds:04.1f}'
87
88
89 def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
90     return box.format(*style,
91                       section1=left + ('' if left.endswith(' ') else ' '),
92                       width1=_LEFT,
93                       section2=' ' + middle,
94                       width2=WIDTH - _LEFT - _RIGHT - 4,
95                       section3=right + ' ',
96                       width3=_RIGHT)
97
98
99 class PresubmitFailure(Exception):
100     """Optional exception to use for presubmit failures."""
101     def __init__(self, description: str = '', path=None):
102         super().__init__(f'{path}: {description}' if path else description)
103
104
105 class _Result(enum.Enum):
106
107     PASS = 'PASSED'  # Check completed successfully.
108     FAIL = 'FAILED'  # Check failed.
109     CANCEL = 'CANCEL'  # Check didn't complete.
110
111     def colorized(self, width: int, invert: bool = False) -> str:
112         if self is _Result.PASS:
113             color = color_black_on_green if invert else color_green
114         elif self is _Result.FAIL:
115             color = color_black_on_red if invert else color_red
116         elif self is _Result.CANCEL:
117             color = color_yellow
118         else:
119             color = lambda value: value
120
121         padding = (width - len(self.value)) // 2 * ' '
122         return padding + color(self.value) + padding
123
124
125 class Program(collections.abc.Sequence):
126     """A sequence of presubmit checks; basically a tuple with a name."""
127     def __init__(self, name: str, steps: Iterable[Callable]):
128         self.name = name
129         self._steps = tuple(tools.flatten(steps))
130
131     def __getitem__(self, i):
132         return self._steps[i]
133
134     def __len__(self):
135         return len(self._steps)
136
137     def __str__(self):
138         return self.name
139
140     def title(self):
141         return f'{self.name if self.name else ""} presubmit checks'.strip()
142
143
144 class Programs(collections.abc.Mapping):
145     """A mapping of presubmit check programs.
146
147     Use is optional. Helpful when managing multiple presubmit check programs.
148     """
149     def __init__(self, **programs: Sequence):
150         """Initializes a name: program mapping from the provided keyword args.
151
152         A program is a sequence of presubmit check functions. The sequence may
153         contain nested sequences, which are flattened.
154         """
155         self._programs: Dict[str, Program] = {
156             name: Program(name, checks)
157             for name, checks in programs.items()
158         }
159
160     def all_steps(self) -> Dict[str, Callable]:
161         return {c.__name__: c for c in itertools.chain(*self.values())}
162
163     def __getitem__(self, item: str) -> Program:
164         return self._programs[item]
165
166     def __iter__(self) -> Iterator[str]:
167         return iter(self._programs)
168
169     def __len__(self) -> int:
170         return len(self._programs)
171
172
173 @dataclasses.dataclass(frozen=True)
174 class PresubmitContext:
175     """Context passed into presubmit checks."""
176     root: Path
177     repos: Tuple[Path, ...]
178     output_dir: Path
179     paths: Tuple[Path, ...]
180     package_root: Path
181
182     def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]:
183         return tuple(
184             tools.relative_paths(self.paths, start if start else self.root))
185
186     def paths_by_repo(self) -> Dict[Path, List[Path]]:
187         repos = collections.defaultdict(list)
188
189         for path in self.paths:
190             repos[git_repo.root(path)].append(path)
191
192         return repos
193
194
195 class _Filter(NamedTuple):
196     endswith: Tuple[str, ...] = ('', )
197     exclude: Tuple[Pattern[str], ...] = ()
198
199     def matches(self, path: str) -> bool:
200         return (any(path.endswith(end) for end in self.endswith)
201                 and not any(exp.search(path) for exp in self.exclude))
202
203
204 class Presubmit:
205     """Runs a series of presubmit checks on a list of files."""
206     def __init__(self, root: Path, repos: Sequence[Path],
207                  output_directory: Path, paths: Sequence[Path],
208                  package_root: Path):
209         self._root = root.resolve()
210         self._repos = tuple(repos)
211         self._output_directory = output_directory.resolve()
212         self._paths = tuple(paths)
213         self._relative_paths = tuple(
214             tools.relative_paths(self._paths, self._root))
215         self._package_root = package_root.resolve()
216
217     def run(self, program: Program, keep_going: bool = False) -> bool:
218         """Executes a series of presubmit checks on the paths."""
219
220         checks = self._apply_filters(program)
221
222         _LOG.debug('Running %s for %s', program.title(), self._root.name)
223         print(_title(f'{self._root.name}: {program.title()}'))
224
225         _LOG.info('%d of %d checks apply to %s in %s', len(checks),
226                   len(program), plural(self._paths, 'file'), self._root)
227
228         print()
229         for line in tools.file_summary(self._relative_paths):
230             print(line)
231         print()
232
233         if not self._paths:
234             print(color_yellow('No files are being checked!'))
235
236         _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
237
238         start_time: float = time.time()
239         passed, failed, skipped = self._execute_checks(checks, keep_going)
240         self._log_summary(time.time() - start_time, passed, failed, skipped)
241
242         return not failed and not skipped
243
244     def _apply_filters(
245             self, program: Sequence[Callable]
246     ) -> List[Tuple['_Check', Sequence[Path]]]:
247         """Returns list of (check, paths) for checks that should run."""
248         checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
249         filter_to_checks: Dict[_Filter,
250                                List[_Check]] = collections.defaultdict(list)
251
252         for check in checks:
253             filter_to_checks[check.filter].append(check)
254
255         check_to_paths = self._map_checks_to_paths(filter_to_checks)
256         return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
257
258     def _map_checks_to_paths(
259         self, filter_to_checks: Dict[_Filter, List['_Check']]
260     ) -> Dict['_Check', Sequence[Path]]:
261         checks_to_paths: Dict[_Check, Sequence[Path]] = {}
262
263         posix_paths = tuple(p.as_posix() for p in self._relative_paths)
264
265         for filt, checks in filter_to_checks.items():
266             filtered_paths = tuple(
267                 path for path, filter_path in zip(self._paths, posix_paths)
268                 if filt.matches(filter_path))
269
270             for check in checks:
271                 if filtered_paths or check.always_run:
272                     checks_to_paths[check] = filtered_paths
273                 else:
274                     _LOG.debug('Skipping "%s": no relevant files', check.name)
275
276         return checks_to_paths
277
278     def _log_summary(self, time_s: float, passed: int, failed: int,
279                      skipped: int) -> None:
280         summary_items = []
281         if passed:
282             summary_items.append(f'{passed} passed')
283         if failed:
284             summary_items.append(f'{failed} failed')
285         if skipped:
286             summary_items.append(f'{skipped} not run')
287         summary = ', '.join(summary_items) or 'nothing was done'
288
289         result = _Result.FAIL if failed or skipped else _Result.PASS
290         total = passed + failed + skipped
291
292         _LOG.debug('Finished running %d checks on %s in %.1f s', total,
293                    plural(self._paths, 'file'), time_s)
294         _LOG.debug('Presubmit checks %s: %s', result.value, summary)
295
296         print(
297             _box(
298                 _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
299                 f'{total} checks on {plural(self._paths, "file")}: {summary}',
300                 _format_time(time_s)))
301
302     @contextlib.contextmanager
303     def _context(self, name: str, paths: Tuple[Path, ...]):
304         # There are many characters banned from filenames on Windows. To
305         # simplify things, just strip everything that's not a letter, digit,
306         # or underscore.
307         sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
308         output_directory = self._output_directory.joinpath(sanitized_name)
309         os.makedirs(output_directory, exist_ok=True)
310
311         handler = logging.FileHandler(output_directory.joinpath('step.log'),
312                                       mode='w')
313         handler.setLevel(logging.DEBUG)
314
315         try:
316             _LOG.addHandler(handler)
317
318             yield PresubmitContext(
319                 root=self._root,
320                 repos=self._repos,
321                 output_dir=output_directory,
322                 paths=paths,
323                 package_root=self._package_root,
324             )
325
326         finally:
327             _LOG.removeHandler(handler)
328
329     def _execute_checks(self, program,
330                         keep_going: bool) -> Tuple[int, int, int]:
331         """Runs presubmit checks; returns (passed, failed, skipped) lists."""
332         passed = failed = 0
333
334         for i, (check, paths) in enumerate(program, 1):
335             with self._context(check.name, paths) as ctx:
336                 result = check.run(ctx, i, len(program))
337
338             if result is _Result.PASS:
339                 passed += 1
340             elif result is _Result.CANCEL:
341                 break
342             else:
343                 failed += 1
344                 if not keep_going:
345                     break
346
347         return passed, failed, len(program) - passed - failed
348
349
350 def _process_pathspecs(repos: Iterable[Path],
351                        pathspecs: Iterable[str]) -> Dict[Path, List[str]]:
352     pathspecs_by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos}
353     repos_with_paths: Set[Path] = set()
354
355     for pathspec in pathspecs:
356         # If the pathspec is a path to an existing file, only use it for the
357         # repo it is in.
358         if os.path.exists(pathspec):
359             # Raise an exception if the path exists but is not in a known repo.
360             repo = git_repo.within_repo(pathspec)
361             if repo not in pathspecs_by_repo:
362                 raise ValueError(
363                     f'{pathspec} is not in a Git repository in this presubmit')
364
365             # Make the path relative to the repo's root.
366             pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo))
367             repos_with_paths.add(repo)
368         else:
369             # Pathspecs that are not paths (e.g. '*.h') are used for all repos.
370             for patterns in pathspecs_by_repo.values():
371                 patterns.append(pathspec)
372
373     # If any paths were specified, only search for paths in those repos.
374     if repos_with_paths:
375         for repo in set(pathspecs_by_repo) - repos_with_paths:
376             del pathspecs_by_repo[repo]
377
378     return pathspecs_by_repo
379
380
381 def run(program: Sequence[Callable],
382         root: Path,
383         repos: Collection[Path] = (),
384         base: Optional[str] = None,
385         paths: Sequence[str] = (),
386         exclude: Sequence[Pattern] = (),
387         output_directory: Optional[Path] = None,
388         package_root: Path = None,
389         keep_going: bool = False) -> bool:
390     """Lists files in the current Git repo and runs a Presubmit with them.
391
392     This changes the directory to the root of the Git repository after listing
393     paths, so all presubmit checks can assume they run from there.
394
395     The paths argument contains Git pathspecs. If no pathspecs are provided, all
396     paths in all repos are included. If paths to files or directories are
397     provided, only files within those repositories are searched. Patterns are
398     searched across all repositories. For example, if the pathspecs "my_module/"
399     and "*.h", paths under "my_module/" in the containing repo and paths in all
400     repos matching "*.h" will be included in the presubmit.
401
402     Args:
403         program: list of presubmit check functions to run
404         root: root path of the project
405         repos: paths to the roots of Git repositories to check
406         name: name to use to refer to this presubmit check run
407         base: optional base Git commit to list files against
408         paths: optional list of Git pathspecs to run the checks against
409         exclude: regular expressions for Posix-style paths to exclude
410         output_directory: where to place output files
411         package_root: where to place package files
412         keep_going: whether to continue running checks if an error occurs
413
414     Returns:
415         True if all presubmit checks succeeded
416     """
417     repos = [repo.resolve() for repo in repos]
418
419     for repo in repos:
420         if git_repo.root(repo) != repo:
421             raise ValueError(f'{repo} is not the root of a Git repo; '
422                              'presubmit checks must be run from a Git repo')
423
424     pathspecs_by_repo = _process_pathspecs(repos, paths)
425
426     files: List[Path] = []
427
428     for repo, pathspecs in pathspecs_by_repo.items():
429         files += tools.exclude_paths(
430             exclude, git_repo.list_files(base, pathspecs, repo), root)
431
432         _LOG.info(
433             'Checking %s',
434             git_repo.describe_files(repo, repo, base, pathspecs, exclude))
435
436     if output_directory is None:
437         output_directory = root / '.presubmit'
438
439     if package_root is None:
440         package_root = output_directory / 'packages'
441
442     presubmit = Presubmit(
443         root=root,
444         repos=repos,
445         output_directory=output_directory,
446         paths=files,
447         package_root=package_root,
448     )
449
450     if not isinstance(program, Program):
451         program = Program('', program)
452
453     return presubmit.run(program, keep_going)
454
455
456 class _Check:
457     """Wraps a presubmit check function.
458
459     This class consolidates the logic for running and logging a presubmit check.
460     It also supports filtering the paths passed to the presubmit check.
461     """
462     def __init__(self,
463                  check_function: Callable,
464                  path_filter: _Filter = _Filter(),
465                  always_run: bool = True):
466         _ensure_is_valid_presubmit_check_function(check_function)
467
468         self._check: Callable = check_function
469         self.filter: _Filter = path_filter
470         self.always_run: bool = always_run
471
472         # Since _Check wraps a presubmit function, adopt that function's name.
473         self.__name__ = self._check.__name__
474
475     @property
476     def name(self):
477         return self.__name__
478
479     def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
480         """Runs the presubmit check on the provided paths."""
481
482         print(
483             _box(_CHECK_UPPER, f'{count}/{total}', self.name,
484                  plural(ctx.paths, "file")))
485
486         _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
487                    plural(ctx.paths, "file"))
488
489         start_time_s = time.time()
490         result = self._call_function(ctx)
491         time_str = _format_time(time.time() - start_time_s)
492         _LOG.debug('%s %s', self.name, result.value)
493
494         print(_box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
495         _LOG.debug('%s duration:%s', self.name, time_str)
496
497         return result
498
499     def _call_function(self, ctx: PresubmitContext) -> _Result:
500         try:
501             self._check(ctx)
502         except PresubmitFailure as failure:
503             if str(failure):
504                 _LOG.warning('%s', failure)
505             return _Result.FAIL
506         except Exception as failure:  # pylint: disable=broad-except
507             _LOG.exception('Presubmit check %s failed!', self.name)
508             return _Result.FAIL
509         except KeyboardInterrupt:
510             print()
511             return _Result.CANCEL
512
513         return _Result.PASS
514
515     def __call__(self, ctx: PresubmitContext, *args, **kwargs):
516         """Calling a _Check calls its underlying function directly.
517
518       This makes it possible to call functions wrapped by @filter_paths. The
519       prior filters are ignored, so new filters may be applied.
520       """
521         return self._check(ctx, *args, **kwargs)
522
523
524 def _required_args(function: Callable) -> Iterable[Parameter]:
525     """Returns the required arguments for a function."""
526     optional_types = Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD
527
528     for param in signature(function).parameters.values():
529         if param.default is param.empty and param.kind not in optional_types:
530             yield param
531
532
533 def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
534     """Checks if a Callable can be used as a presubmit check."""
535     try:
536         required_args = tuple(_required_args(check))
537     except (TypeError, ValueError):
538         raise TypeError('Presubmit checks must be callable, but '
539                         f'{check!r} is a {type(check).__name__}')
540
541     if len(required_args) != 1:
542         raise TypeError(
543             f'Presubmit check functions must have exactly one required '
544             f'positional argument (the PresubmitContext), but '
545             f'{check.__name__} has {len(required_args)} required arguments' +
546             (f' ({", ".join(a.name for a in required_args)})'
547              if required_args else ''))
548
549
550 def _make_str_tuple(value: Iterable[str]) -> Tuple[str, ...]:
551     return tuple([value] if isinstance(value, str) else value)
552
553
554 def filter_paths(endswith: Iterable[str] = (''),
555                  exclude: Iterable[Union[Pattern[str], str]] = (),
556                  always_run: bool = False) -> Callable[[Callable], _Check]:
557     """Decorator for filtering the paths list for a presubmit check function.
558
559     Path filters only apply when the function is used as a presubmit check.
560     Filters are ignored when the functions are called directly. This makes it
561     possible to reuse functions wrapped in @filter_paths in other presubmit
562     checks, potentially with different path filtering rules.
563
564     Args:
565         endswith: str or iterable of path endings to include
566         exclude: regular expressions of paths to exclude
567
568     Returns:
569         a wrapped version of the presubmit function
570     """
571     def filter_paths_for_function(function: Callable):
572         return _Check(function,
573                       _Filter(_make_str_tuple(endswith),
574                               tuple(re.compile(e) for e in exclude)),
575                       always_run=always_run)
576
577     return filter_paths_for_function
578
579
580 @filter_paths(endswith='.h', exclude=(r'\.pb\.h$', ))
581 def pragma_once(ctx: PresubmitContext) -> None:
582     """Presubmit check that ensures all header files contain '#pragma once'."""
583
584     for path in ctx.paths:
585         with open(path) as file:
586             for line in file:
587                 if line.startswith('#pragma once'):
588                     break
589             else:
590                 raise PresubmitFailure('#pragma once is missing!', path=path)
591
592
593 def call(*args, **kwargs) -> None:
594     """Optional subprocess wrapper that causes a PresubmitFailure on errors."""
595     attributes, command = tools.format_command(args, kwargs)
596     _LOG.debug('[RUN] %s\n%s', attributes, command)
597
598     process = subprocess.run(args,
599                              stdout=subprocess.PIPE,
600                              stderr=subprocess.STDOUT,
601                              **kwargs)
602     logfunc = _LOG.warning if process.returncode else _LOG.debug
603
604     logfunc('[FINISHED]\n%s', command)
605     logfunc('[RESULT] %s with return code %d',
606             'Failed' if process.returncode else 'Passed', process.returncode)
607
608     output = process.stdout.decode(errors='backslashreplace')
609     if output:
610         logfunc('[OUTPUT]\n%s', output)
611
612     if process.returncode:
613         raise PresubmitFailure