1 # Copyright 2020 The Pigweed Authors
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
7 # https://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
14 """Tools for running presubmit checks in a Git repository.
16 Presubmit checks are defined as a function or other callable. The function may
17 take either no arguments or a list of the paths on which to run. Presubmit
18 checks communicate failure by raising any exception.
20 For example, either of these functions may be used as presubmit checks:
22 @pw_presubmit.filter_paths(endswith='.py')
23 def file_contains_ni(ctx: PresubmitContext):
24 for path in ctx.paths:
25 with open(path) as file:
26 contents = file.read()
27 if 'ni' not in contents and 'nee' not in contents:
28 raise PresubmitFailure('Files must say "ni"!', path=path)
31 subprocess.run(['make', 'release'], check=True)
33 Presubmit checks that accept a list of paths may use the filter_paths decorator
34 to automatically filter the paths list for file types they care about. See the
35 pragma_once function for an example.
37 See pigweed_presubmit.py for an example of how to define presubmit checks.
44 from inspect import Parameter, signature
48 from pathlib import Path
52 from typing import (Callable, Collection, Dict, Iterable, Iterator, List,
53 NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union)
55 from pw_presubmit import git_repo, tools
56 from pw_presubmit.tools import plural
58 _LOG: logging.Logger = logging.getLogger(__name__)
60 color_red = tools.make_color(31)
61 color_bold_red = tools.make_color(31, 1)
62 color_black_on_red = tools.make_color(30, 41)
63 color_yellow = tools.make_color(33, 1)
64 color_green = tools.make_color(32)
65 color_black_on_green = tools.make_color(30, 42)
66 color_aqua = tools.make_color(36)
67 color_bold_white = tools.make_color(37, 1)
69 _SUMMARY_BOX = '══╦╗ ║║══╩╝'
70 _CHECK_UPPER = '━━━┓ '
71 _CHECK_LOWER = ' ━━━┛'
def _title(msg, style=_SUMMARY_BOX) -> str:
    """Renders msg as a title line using the tools.make_box('^') template.

    The message is padded with one space on each side and centered within
    WIDTH - 2 columns; the characters of style are passed to the template as
    its positional format arguments.
    """
    centered = f' {msg} '.center(WIDTH - 2)
    template = tools.make_box('^')
    return template.format(*style, section1=centered, width1=len(centered))
84 def _format_time(time_s: float) -> str:
85 minutes, seconds = divmod(time_s, 60)
86 return f' {int(minutes)}:{seconds:04.1f}'
89 def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
90 return box.format(*style,
91 section1=left + ('' if left.endswith(' ') else ' '),
93 section2=' ' + middle,
94 width2=WIDTH - _LEFT - _RIGHT - 4,
class PresubmitFailure(Exception):
    """Exception a presubmit check may raise to report a failure.

    Its use is optional. The exception message is the description, prefixed
    with '<path>: ' when a truthy path is supplied.
    """
    def __init__(self, description: str = '', path=None):
        message = description
        if path:
            message = f'{path}: {description}'
        super().__init__(message)
105 class _Result(enum.Enum):
107 PASS = 'PASSED' # Check completed successfully.
108 FAIL = 'FAILED' # Check failed.
109 CANCEL = 'CANCEL' # Check didn't complete.
111 def colorized(self, width: int, invert: bool = False) -> str:
112 if self is _Result.PASS:
113 color = color_black_on_green if invert else color_green
114 elif self is _Result.FAIL:
115 color = color_black_on_red if invert else color_red
116 elif self is _Result.CANCEL:
119 color = lambda value: value
121 padding = (width - len(self.value)) // 2 * ' '
122 return padding + color(self.value) + padding
125 class Program(collections.abc.Sequence):
126 """A sequence of presubmit checks; basically a tuple with a name."""
127 def __init__(self, name: str, steps: Iterable[Callable]):
129 self._steps = tuple(tools.flatten(steps))
131 def __getitem__(self, i):
132 return self._steps[i]
135 return len(self._steps)
141 return f'{self.name if self.name else ""} presubmit checks'.strip()
class Programs(collections.abc.Mapping):
    """A mapping from program name to presubmit check Program.

    Use is optional. Helpful when managing multiple presubmit check programs.
    """
    def __init__(self, **programs: Sequence):
        """Builds the name: Program mapping from the provided keyword args.

        Each keyword names a program; its value is a sequence of presubmit
        check functions. The sequence may contain nested sequences, which are
        flattened.
        """
        self._programs: Dict[str, Program] = {}
        for name, checks in programs.items():
            self._programs[name] = Program(name, checks)

    def all_steps(self) -> Dict[str, Callable]:
        """Returns a dict of every step in every program, keyed by __name__.

        When several steps share a __name__, the one encountered last wins.
        """
        steps: Dict[str, Callable] = {}
        for program in self.values():
            for step in program:
                steps[step.__name__] = step
        return steps

    def __getitem__(self, item: str) -> Program:
        return self._programs[item]

    def __iter__(self) -> Iterator[str]:
        return iter(self._programs)

    def __len__(self) -> int:
        return len(self._programs)
173 @dataclasses.dataclass(frozen=True)
174 class PresubmitContext:
175 """Context passed into presubmit checks."""
177 repos: Tuple[Path, ...]
179 paths: Tuple[Path, ...]
182 def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]:
184 tools.relative_paths(self.paths, start if start else self.root))
186 def paths_by_repo(self) -> Dict[Path, List[Path]]:
187 repos = collections.defaultdict(list)
189 for path in self.paths:
190 repos[git_repo.root(path)].append(path)
195 class _Filter(NamedTuple):
196 endswith: Tuple[str, ...] = ('', )
197 exclude: Tuple[Pattern[str], ...] = ()
199 def matches(self, path: str) -> bool:
200 return (any(path.endswith(end) for end in self.endswith)
201 and not any(exp.search(path) for exp in self.exclude))
205 """Runs a series of presubmit checks on a list of files."""
206 def __init__(self, root: Path, repos: Sequence[Path],
207 output_directory: Path, paths: Sequence[Path],
209 self._root = root.resolve()
210 self._repos = tuple(repos)
211 self._output_directory = output_directory.resolve()
212 self._paths = tuple(paths)
213 self._relative_paths = tuple(
214 tools.relative_paths(self._paths, self._root))
215 self._package_root = package_root.resolve()
217 def run(self, program: Program, keep_going: bool = False) -> bool:
218 """Executes a series of presubmit checks on the paths."""
220 checks = self._apply_filters(program)
222 _LOG.debug('Running %s for %s', program.title(), self._root.name)
223 print(_title(f'{self._root.name}: {program.title()}'))
225 _LOG.info('%d of %d checks apply to %s in %s', len(checks),
226 len(program), plural(self._paths, 'file'), self._root)
229 for line in tools.file_summary(self._relative_paths):
234 print(color_yellow('No files are being checked!'))
236 _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
238 start_time: float = time.time()
239 passed, failed, skipped = self._execute_checks(checks, keep_going)
240 self._log_summary(time.time() - start_time, passed, failed, skipped)
242 return not failed and not skipped
245 self, program: Sequence[Callable]
246 ) -> List[Tuple['_Check', Sequence[Path]]]:
247 """Returns list of (check, paths) for checks that should run."""
248 checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
249 filter_to_checks: Dict[_Filter,
250 List[_Check]] = collections.defaultdict(list)
253 filter_to_checks[check.filter].append(check)
255 check_to_paths = self._map_checks_to_paths(filter_to_checks)
256 return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
258 def _map_checks_to_paths(
259 self, filter_to_checks: Dict[_Filter, List['_Check']]
260 ) -> Dict['_Check', Sequence[Path]]:
261 checks_to_paths: Dict[_Check, Sequence[Path]] = {}
263 posix_paths = tuple(p.as_posix() for p in self._relative_paths)
265 for filt, checks in filter_to_checks.items():
266 filtered_paths = tuple(
267 path for path, filter_path in zip(self._paths, posix_paths)
268 if filt.matches(filter_path))
271 if filtered_paths or check.always_run:
272 checks_to_paths[check] = filtered_paths
274 _LOG.debug('Skipping "%s": no relevant files', check.name)
276 return checks_to_paths
278 def _log_summary(self, time_s: float, passed: int, failed: int,
279 skipped: int) -> None:
282 summary_items.append(f'{passed} passed')
284 summary_items.append(f'{failed} failed')
286 summary_items.append(f'{skipped} not run')
287 summary = ', '.join(summary_items) or 'nothing was done'
289 result = _Result.FAIL if failed or skipped else _Result.PASS
290 total = passed + failed + skipped
292 _LOG.debug('Finished running %d checks on %s in %.1f s', total,
293 plural(self._paths, 'file'), time_s)
294 _LOG.debug('Presubmit checks %s: %s', result.value, summary)
298 _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
299 f'{total} checks on {plural(self._paths, "file")}: {summary}',
300 _format_time(time_s)))
302 @contextlib.contextmanager
303 def _context(self, name: str, paths: Tuple[Path, ...]):
304 # There are many characters banned from filenames on Windows. To
305 # simplify things, just strip everything that's not a letter, digit,
307 sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
308 output_directory = self._output_directory.joinpath(sanitized_name)
309 os.makedirs(output_directory, exist_ok=True)
311 handler = logging.FileHandler(output_directory.joinpath('step.log'),
313 handler.setLevel(logging.DEBUG)
316 _LOG.addHandler(handler)
318 yield PresubmitContext(
321 output_dir=output_directory,
323 package_root=self._package_root,
327 _LOG.removeHandler(handler)
329 def _execute_checks(self, program,
330 keep_going: bool) -> Tuple[int, int, int]:
331 """Runs presubmit checks; returns (passed, failed, skipped) lists."""
334 for i, (check, paths) in enumerate(program, 1):
335 with self._context(check.name, paths) as ctx:
336 result = check.run(ctx, i, len(program))
338 if result is _Result.PASS:
340 elif result is _Result.CANCEL:
347 return passed, failed, len(program) - passed - failed
350 def _process_pathspecs(repos: Iterable[Path],
351 pathspecs: Iterable[str]) -> Dict[Path, List[str]]:
352 pathspecs_by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos}
353 repos_with_paths: Set[Path] = set()
355 for pathspec in pathspecs:
356 # If the pathspec is a path to an existing file, only use it for the
358 if os.path.exists(pathspec):
359 # Raise an exception if the path exists but is not in a known repo.
360 repo = git_repo.within_repo(pathspec)
361 if repo not in pathspecs_by_repo:
363 f'{pathspec} is not in a Git repository in this presubmit')
365 # Make the path relative to the repo's root.
366 pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo))
367 repos_with_paths.add(repo)
369 # Pathspecs that are not paths (e.g. '*.h') are used for all repos.
370 for patterns in pathspecs_by_repo.values():
371 patterns.append(pathspec)
373 # If any paths were specified, only search for paths in those repos.
375 for repo in set(pathspecs_by_repo) - repos_with_paths:
376 del pathspecs_by_repo[repo]
378 return pathspecs_by_repo
381 def run(program: Sequence[Callable],
383 repos: Collection[Path] = (),
384 base: Optional[str] = None,
385 paths: Sequence[str] = (),
386 exclude: Sequence[Pattern] = (),
387 output_directory: Optional[Path] = None,
388 package_root: Path = None,
389 keep_going: bool = False) -> bool:
390 """Lists files in the current Git repo and runs a Presubmit with them.
392 This changes the directory to the root of the Git repository after listing
393 paths, so all presubmit checks can assume they run from there.
395 The paths argument contains Git pathspecs. If no pathspecs are provided, all
396 paths in all repos are included. If paths to files or directories are
397 provided, only files within those repositories are searched. Patterns are
398 searched across all repositories. For example, if the pathspecs "my_module/"
399 and "*.h", paths under "my_module/" in the containing repo and paths in all
400 repos matching "*.h" will be included in the presubmit.
403 program: list of presubmit check functions to run
404 root: root path of the project
405 repos: paths to the roots of Git repositories to check
406 name: name to use to refer to this presubmit check run
407 base: optional base Git commit to list files against
408 paths: optional list of Git pathspecs to run the checks against
409 exclude: regular expressions for Posix-style paths to exclude
410 output_directory: where to place output files
411 package_root: where to place package files
412 keep_going: whether to continue running checks if an error occurs
415 True if all presubmit checks succeeded
417 repos = [repo.resolve() for repo in repos]
420 if git_repo.root(repo) != repo:
421 raise ValueError(f'{repo} is not the root of a Git repo; '
422 'presubmit checks must be run from a Git repo')
424 pathspecs_by_repo = _process_pathspecs(repos, paths)
426 files: List[Path] = []
428 for repo, pathspecs in pathspecs_by_repo.items():
429 files += tools.exclude_paths(
430 exclude, git_repo.list_files(base, pathspecs, repo), root)
434 git_repo.describe_files(repo, repo, base, pathspecs, exclude))
436 if output_directory is None:
437 output_directory = root / '.presubmit'
439 if package_root is None:
440 package_root = output_directory / 'packages'
442 presubmit = Presubmit(
445 output_directory=output_directory,
447 package_root=package_root,
450 if not isinstance(program, Program):
451 program = Program('', program)
453 return presubmit.run(program, keep_going)
457 """Wraps a presubmit check function.
459 This class consolidates the logic for running and logging a presubmit check.
460 It also supports filtering the paths passed to the presubmit check.
463 check_function: Callable,
464 path_filter: _Filter = _Filter(),
465 always_run: bool = True):
466 _ensure_is_valid_presubmit_check_function(check_function)
468 self._check: Callable = check_function
469 self.filter: _Filter = path_filter
470 self.always_run: bool = always_run
472 # Since _Check wraps a presubmit function, adopt that function's name.
473 self.__name__ = self._check.__name__
479 def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
480 """Runs the presubmit check on the provided paths."""
483 _box(_CHECK_UPPER, f'{count}/{total}', self.name,
484 plural(ctx.paths, "file")))
486 _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
487 plural(ctx.paths, "file"))
489 start_time_s = time.time()
490 result = self._call_function(ctx)
491 time_str = _format_time(time.time() - start_time_s)
492 _LOG.debug('%s %s', self.name, result.value)
494 print(_box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
495 _LOG.debug('%s duration:%s', self.name, time_str)
499 def _call_function(self, ctx: PresubmitContext) -> _Result:
502 except PresubmitFailure as failure:
504 _LOG.warning('%s', failure)
506 except Exception as failure: # pylint: disable=broad-except
507 _LOG.exception('Presubmit check %s failed!', self.name)
509 except KeyboardInterrupt:
511 return _Result.CANCEL
515 def __call__(self, ctx: PresubmitContext, *args, **kwargs):
516 """Calling a _Check calls its underlying function directly.
518 This makes it possible to call functions wrapped by @filter_paths. The
519 prior filters are ignored, so new filters may be applied.
521 return self._check(ctx, *args, **kwargs)
524 def _required_args(function: Callable) -> Iterable[Parameter]:
525 """Returns the required arguments for a function."""
526 optional_types = Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD
528 for param in signature(function).parameters.values():
529 if param.default is param.empty and param.kind not in optional_types:
533 def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
534 """Checks if a Callable can be used as a presubmit check."""
536 required_args = tuple(_required_args(check))
537 except (TypeError, ValueError):
538 raise TypeError('Presubmit checks must be callable, but '
539 f'{check!r} is a {type(check).__name__}')
541 if len(required_args) != 1:
543 f'Presubmit check functions must have exactly one required '
544 f'positional argument (the PresubmitContext), but '
545 f'{check.__name__} has {len(required_args)} required arguments' +
546 (f' ({", ".join(a.name for a in required_args)})'
547 if required_args else ''))
550 def _make_str_tuple(value: Iterable[str]) -> Tuple[str, ...]:
551 return tuple([value] if isinstance(value, str) else value)
def filter_paths(endswith: Iterable[str] = (''),
                 exclude: Iterable[Union[Pattern[str], str]] = (),
                 always_run: bool = False) -> Callable[[Callable], _Check]:
    """Decorator for filtering the paths list for a presubmit check function.

    Path filters only apply when the function is used as a presubmit check.
    Filters are ignored when the functions are called directly. This makes it
    possible to reuse functions wrapped in @filter_paths in other presubmit
    checks, potentially with different path filtering rules.

    Args:
      endswith: str or iterable of path endings to include
      exclude: regular expression (str or compiled pattern) or iterable of
          regular expressions of paths to exclude
      always_run: run the check even if no paths match the filters

    Returns:
      a wrapped version of the presubmit function
    """
    # A lone str or compiled pattern is a single regex. Iterating a str would
    # compile every character as its own pattern, so wrap it first.
    if isinstance(exclude, (str, re.Pattern)):
        exclude = (exclude, )

    # Build the filter once, up front: the arguments may be one-shot iterators
    # and the returned decorator may be applied to more than one function.
    path_filter = _Filter(_make_str_tuple(endswith),
                          tuple(re.compile(e) for e in exclude))

    def filter_paths_for_function(function: Callable):
        return _Check(function, path_filter, always_run=always_run)

    return filter_paths_for_function
580 @filter_paths(endswith='.h', exclude=(r'\.pb\.h$', ))
581 def pragma_once(ctx: PresubmitContext) -> None:
582 """Presubmit check that ensures all header files contain '#pragma once'."""
584 for path in ctx.paths:
585 with open(path) as file:
587 if line.startswith('#pragma once'):
590 raise PresubmitFailure('#pragma once is missing!', path=path)
593 def call(*args, **kwargs) -> None:
594 """Optional subprocess wrapper that causes a PresubmitFailure on errors."""
595 attributes, command = tools.format_command(args, kwargs)
596 _LOG.debug('[RUN] %s\n%s', attributes, command)
598 process = subprocess.run(args,
599 stdout=subprocess.PIPE,
600 stderr=subprocess.STDOUT,
602 logfunc = _LOG.warning if process.returncode else _LOG.debug
604 logfunc('[FINISHED]\n%s', command)
605 logfunc('[RESULT] %s with return code %d',
606 'Failed' if process.returncode else 'Passed', process.returncode)
608 output = process.stdout.decode(errors='backslashreplace')
610 logfunc('[OUTPUT]\n%s', output)
612 if process.returncode:
613 raise PresubmitFailure