2 # Copyright 2020 The Pigweed Authors
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 # use this file except in compliance with the License. You may obtain a copy of
8 # https://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations under
15 """Watch files for changes and rebuild.
17 pw watch runs Ninja in a build directory when source files change. It works with
18 any Ninja project (GN or CMake).
22 # Find a build directory and build the default target
25 # Find a build directory and build the stm32f429i target
26 pw watch python.lint stm32f429i
28 # Build pw_run_tests.modules in the out/cmake directory
29 pw watch -C out/cmake pw_run_tests.modules
31 # Build the default target in out/ and pw_apps in out/cmake
32 pw watch -C out -C out/cmake pw_apps
34 # Find a directory and build python.tests, and build pw_apps in out/cmake
35 pw watch python.tests -C out/cmake pw_apps
39 from dataclasses import dataclass
42 from pathlib import Path
47 from typing import (Iterable, List, NamedTuple, NoReturn, Optional, Sequence,
50 from watchdog.events import FileSystemEventHandler # type: ignore
51 from watchdog.observers import Observer # type: ignore
53 import pw_cli.branding
58 from pw_watch.debounce import DebouncedFunction, Debouncer
# Color helper; its green()/red() methods wrap the status text printed below.
_COLOR = pw_cli.color.colors()
# Module-level logger shared by every function in this file.
_LOG = logging.getLogger(__name__)
# First OSError argument that watch() treats as "inotify limit reached".
_ERRNO_INOTIFY_LIMIT_REACHED = 28
65 ██████╗ █████╗ ███████╗███████╗██╗
66 ██╔══██╗██╔══██╗██╔════╝██╔════╝██║
67 ██████╔╝███████║███████╗███████╗██║
68 ██╔═══╝ ██╔══██║╚════██║╚════██║╚═╝
69 ██║ ██║ ██║███████║███████║██╗
70 ╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝
73 # Pick a visually-distinct font from "PASS" to ensure that readers can't
74 # possibly mistake the difference between the two states.
77 ▓█▓ ░▒████▄ ▓██▒ ░▓██▒
78 ▒████▒ ░▒█▀ ▀█▄ ▒██▒ ▒██░
79 ░▓█▒ ░░██▄▄▄▄██ ░██░ ▒██░
80 ░▒█░ ▓█ ▓██▒░██░░ ████████▒
# TODO(keir): Figure out a better strategy for exiting. The problem with the
# watcher is that doing a "clean exit" is slow. However, by directly exiting,
# we remove the possibility of the wrapper script doing anything on exit.
def _die(*args) -> NoReturn:
    """Log a critical message and terminate with a non-zero exit status.

    Args:
        *args: Passed straight to ``_LOG.critical``: a %-style format string
            followed by the values to interpolate into it.
    """
    _LOG.critical(*args)
    sys.exit(1)
class WatchCharset(NamedTuple):
    """Pair of short status markers printed next to each build result."""
    slug_ok: str  # Marker shown for a build that succeeded.
    slug_fail: str  # Marker shown for a failed or interrupted build.
# Plain-text status markers: green for success, red for failure.
_ASCII_CHARSET = WatchCharset(_COLOR.green('OK '), _COLOR.red('FAIL'))
# Emoji status markers; watch() chooses between this and the ASCII set.
_EMOJI_CHARSET = WatchCharset('✔️ ', '💥')
@dataclass(frozen=True)
class BuildCommand:
    """A build directory together with the targets to build inside it."""
    build_dir: Path
    targets: Tuple[str, ...] = ()

    def args(self) -> Tuple[str, ...]:
        """Return the build directory followed by every target, as strings."""
        return (str(self.build_dir), *self.targets)

    def __str__(self) -> str:
        """Return the arguments as one shell-quoted, space-separated string."""
        return ' '.join(shlex.quote(arg) for arg in self.args())
class PigweedBuildWatcher(FileSystemEventHandler, DebouncedFunction):
    """Process filesystem events and launch builds if necessary."""
    def __init__(
        self,
        patterns: Sequence[str] = (),
        ignore_patterns: Sequence[str] = (),
        build_commands: Sequence[BuildCommand] = (),
        ignore_dirs: Optional[List[str]] = None,
        charset: WatchCharset = _ASCII_CHARSET,
        restart: bool = False,
    ):
        """Set up the watcher and start the thread that waits for Enter.

        Args:
            patterns: Globs; a changed path must match one to trigger a build.
            ignore_patterns: Globs; a changed path matching any is ignored.
            build_commands: Builds to run, in order, on every trigger.
            ignore_dirs: Directories whose contents never trigger a build. The
                build directory of every build command is always added.
            charset: Status markers used in the build summary table.
            restart: If true, cancel() kills the build that is in progress.
        """
        super().__init__()

        self.patterns = patterns
        self.ignore_patterns = ignore_patterns
        self.build_commands = build_commands
        # Copy the list so that extending it never mutates the caller's list.
        self.ignore_dirs = list(ignore_dirs or [])
        self.ignore_dirs.extend(cmd.build_dir for cmd in self.build_commands)
        self.charset: WatchCharset = charset

        self.restart_on_changes = restart
        self._current_build: Optional[subprocess.Popen] = None

        self.debouncer = Debouncer(self)

        # Track state of a build. These need to be members instead of locals
        # due to the split between dispatch(), run(), and on_complete().
        self.matching_path: Optional[str] = None
        self.builds_succeeded: List[bool] = []

        self.wait_for_keypress_thread = threading.Thread(
            None, self._wait_for_enter)
        self.wait_for_keypress_thread.start()

    def _wait_for_enter(self):
        """Request a manual build each time a line is read from stdin."""
        try:
            while True:
                input()
                self.debouncer.press('Manual build requested...')
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            _exit_due_to_interrupt()

    def _path_matches(self, raw_path: str) -> bool:
        """Returns true if path matches according to the watcher patterns"""
        modified_path = Path(raw_path).resolve()

        # Check for modifications inside the ignore directories, and skip them.
        # Ideally these events would never hit the watcher, but selectively
        # watching directories at the OS level is not trivial due to limitations
        # of the watchdog module.
        for ignore_dir in self.ignore_dirs:
            resolved_ignore_dir = Path(ignore_dir).resolve()
            try:
                modified_path.relative_to(resolved_ignore_dir)
            except ValueError:
                # The file isn't in this ignore directory; keep checking.
                continue
            # No ValueError was raised by .relative_to(), so this file is
            # inside the ignore directory; skip it.
            return False

        if any(modified_path.match(x) for x in self.ignore_patterns):
            return False
        return any(modified_path.match(x) for x in self.patterns)

    def dispatch(self, event) -> None:
        """Handle a filesystem event, requesting a build on the first match."""
        # There isn't any point in triggering builds on new directory creation.
        # It's the creation or modification of files that indicate something
        # meaningful enough changed for a build.
        if event.is_directory:
            return

        # Collect paths of interest from the event.
        paths: List[str] = []
        if hasattr(event, 'dest_path'):
            paths.append(os.fsdecode(event.dest_path))
        if event.src_path:
            paths.append(os.fsdecode(event.src_path))
        for path in paths:
            _LOG.debug('File event: %s', path)

        # Check for matching paths among the one or two in the event.
        for path in paths:
            if self._path_matches(path):
                self._handle_matched_event(path)
                return

    def _handle_matched_event(self, matching_path: str) -> None:
        """Remember the first matching path and press the debouncer."""
        if self.matching_path is None:
            self.matching_path = matching_path

        self.debouncer.press(
            f'File change detected: {os.path.relpath(matching_path)}')

    # Implementation of DebouncedFunction.run()
    #
    # Note: This will run on the timer thread created by the Debouncer, rather
    # than on the main thread that's watching file events. This enables the
    # watcher to continue receiving file change events during a build.
    def run(self):
        """Run all the builds in serial and capture pass/fail for each."""

        # Clear the screen and show a banner indicating the build is starting.
        print('\033c', end='')  # TODO(pwbug/38): Not Windows compatible.
        print(pw_cli.branding.banner())
        print(
            _COLOR.green(
                ' Watching for changes. Ctrl-C to exit; enter to rebuild'))
        print()
        _LOG.info('Change detected: %s', self.matching_path)

        self.builds_succeeded = []
        num_builds = len(self.build_commands)
        _LOG.info('Starting build with %d directories', num_builds)

        env = os.environ.copy()
        # Force colors in Pigweed subcommands run through the watcher.
        env['PW_USE_COLOR'] = '1'

        for i, cmd in enumerate(self.build_commands, 1):
            _LOG.info('[%d/%d] Starting build: %s', i, num_builds, cmd)

            # Run the build. Put a blank before/after for visual separation.
            print()
            self._current_build = subprocess.Popen(
                ['ninja', '-C', *cmd.args()], env=env)
            returncode = self._current_build.wait()
            print()

            build_ok = (returncode == 0)
            if build_ok:
                level = logging.INFO
                tag = '(OK)'
            else:
                level = logging.ERROR
                tag = '(FAIL)'
            _LOG.log(level, '[%d/%d] Finished build: %s %s', i, num_builds,
                     cmd, tag)
            self.builds_succeeded.append(build_ok)

    # Implementation of DebouncedFunction.cancel()
    def cancel(self):
        """Kill the running build if restarts are enabled.

        Returns:
            True if restart-on-change is enabled, False otherwise.
        """
        if not self.restart_on_changes:
            return False

        # No build has been launched yet before the first run(); there is
        # nothing to kill in that case.
        build = self._current_build
        if build is not None:
            build.kill()
        return True

    # Implementation of DebouncedFunction.on_complete()
    def on_complete(self, cancelled=False):
        """Report the outcome of the last run and reset the trigger path.

        Args:
            cancelled: True if the run was interrupted before it finished.
        """
        # First, use the standard logging facilities to report build status.
        if cancelled:
            _LOG.error('Finished; build was interrupted')
        elif all(self.builds_succeeded):
            _LOG.info('Finished; all successful')
        else:
            _LOG.info('Finished; some builds failed')

        # Then, show a more distinct summary table.
        print()
        print(' .------------------------------------')
        print(' |')
        if cancelled:
            # Build was interrupted.
            print(' | ', self.charset.slug_fail, '- interrupted')
        else:
            # Write out build summary table so you can tell which builds passed
            # and which builds failed.
            for (succeeded, cmd) in zip(self.builds_succeeded,
                                        self.build_commands):
                slug = (self.charset.slug_ok
                        if succeeded else self.charset.slug_fail)
                print(f' | {slug} {cmd}')
        print(' |')
        print(" '------------------------------------")

        # Show a large color banner so it is obvious what the overall result is.
        if all(self.builds_succeeded) and not cancelled:
            print(_COLOR.green(_PASS_MESSAGE))
        else:
            print(_COLOR.red(_FAIL_MESSAGE))

        self.matching_path = None

    # Implementation of DebouncedFunction.on_keyboard_interrupt()
    def on_keyboard_interrupt(self):
        """Exit the watcher when the debounced run is interrupted."""
        _exit_due_to_interrupt()
# Separator used to join and split the glob lists given on the command line.
_WATCH_PATTERN_DELIMITER = ','
def add_parser_arguments(parser: argparse.ArgumentParser) -> None:
    """Sets up an argument parser for pw watch.

    The destination names of the registered arguments match the keyword
    parameters of watch(), so the parsed namespace can be passed to it
    directly.

    Args:
        parser: The parser to add the pw watch arguments to.
    """
    parser.add_argument('--patterns',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'watch to trigger recompile'),
                        default=_WATCH_PATTERN_DELIMITER.join(_WATCH_PATTERNS))
    parser.add_argument('--ignore_patterns',
                        dest='ignore_patterns_string',
                        help=(_WATCH_PATTERN_DELIMITER +
                              '-delimited list of globs to '
                              'ignore events from'))

    parser.add_argument('--exclude_list',
                        nargs='+',
                        type=Path,
                        help=('directories to ignore during pw watch'),
                        default=[])
    parser.add_argument('--restart',
                        action='store_true',
                        help='restart an ongoing build if files change')
    parser.add_argument(
        'default_build_targets',
        nargs='*',
        metavar='target',
        default=[],
        help=('Automatically locate a build directory and build these '
              'targets. For example, `host docs` searches for a Ninja '
              'build directory (starting with out/) and builds the '
              '`host` and `docs` targets. To specify one or more '
              'directories, use the -C / --build_directory option.'))
    parser.add_argument(
        '-C',
        '--build_directory',
        dest='build_directories',
        nargs='+',
        action='append',
        default=[],
        metavar=('directory', 'target'),
        help=('Specify a build directory and optionally targets to '
              'build. `pw watch -C out tgt` is equivalent to `ninja '
              '-C out tgt`'))
def _exit(code):
    """Terminate the process immediately with the given exit status.

    Args:
        code: Exit status handed to ``os._exit``.
    """
    # Note: The "proper" way to exit is via observer.stop(), then
    # running a join. However it's slower, so just exit immediately.
    #
    # Additionally, since there are several threads in the watcher, the usual
    # sys.exit approach doesn't work. Instead, run the low level exit which
    # ends the whole process regardless of the other threads.
    os._exit(code)  # pylint: disable=protected-access
def _exit_due_to_interrupt():
    """Log that the user interrupted the watcher, then exit with status 0."""
    # To keep the log lines aligned with each other in the presence of
    # a '^C' from the keyboard interrupt, add a newline before the log.
    print()
    print()
    _LOG.info('Got Ctrl-C; exiting...')
    _exit(0)
def _exit_due_to_inotify_limit():
    """Explain how to raise the inotify watch limit, then exit."""
    # Show information and suggested commands in OSError: inotify limit reached.
    _LOG.error('Inotify limit reached: run this in your terminal if you '
               'are in Linux to temporarily increase inotify limit. \n')
    print(
        _COLOR.green(' sudo sysctl fs.inotify.max_user_watches='
                     '$NEW_LIMIT$\n'))
    print(' Change $NEW_LIMIT$ with an integer number, '
          'e.g., 1000 should be enough.')
    _exit(0)
def _exit_due_to_pigweed_not_installed():
    """Explain that the Pigweed environment is unusable, then exit."""
    # Show information and suggested commands when the PW_ROOT environment
    # variable is missing or points outside the current directory.
    _LOG.error('Environment variable $PW_ROOT not defined or is defined '
               'outside the current directory.')
    _LOG.error('Did you forget to activate the Pigweed environment? '
               'Try source ./activate.sh')
    _LOG.error('Did you forget to install the Pigweed environment? '
               'Try source ./bootstrap.sh')
    _exit(1)
# Go over each directory inside of the directory to watch.
# If it is not on the path of elements in the exclude list, yield
# (directory, True) so the caller watches it recursively.
# Otherwise yield (directory, False) so the caller watches it without
# recursion.
def minimal_watch_directories(to_watch: Path, to_exclude: Iterable[Path]):
    """Determine which subdirectories to watch recursively.

    Args:
        to_watch: The single root directory to watch.
        to_exclude: Directories to leave unwatched. Relative entries are
            joined onto ``to_watch``. Entries that are not existing
            directories, or that lie outside ``to_watch``, have no effect.

    Yields:
        ``(directory, recursive)`` tuples. ``to_watch`` and every ancestor of
        an excluded directory are yielded with ``recursive=False``; each of
        their other, non-excluded subdirectories is yielded with
        ``recursive=True``.

    Raises:
        TypeError: If ``to_watch`` cannot be converted to a single path.
    """
    try:
        to_watch = Path(to_watch)
    except TypeError:
        # Raise explicitly: an `assert False` would vanish under `python -O`.
        raise TypeError('Please watch one directory at a time.') from None

    # Reformat to_exclude, keeping only directories that actually exist. A set
    # makes the membership tests below cheap.
    directories_to_exclude = set()
    for entry in to_exclude:
        candidate = to_watch.joinpath(entry)
        if candidate.is_dir():
            directories_to_exclude.add(candidate)

    # Split the relative path of each excluded directory (compared to
    # to_watch), and collect all parent paths that need to be watched without
    # recursion.
    exclude_dir_parents = {to_watch}
    for excluded in directories_to_exclude:
        try:
            relative = excluded.relative_to(to_watch)
        except ValueError:
            # The directory is outside to_watch, so there is nothing to carve
            # out of the watched tree for it.
            continue
        ancestor = to_watch
        for part in relative.parts[:-1]:
            ancestor = ancestor / part
            exclude_dir_parents.add(ancestor)

    # Go over all layers of directory. Yield those that are the parents of
    # excluded directories with recursion == False, and their remaining
    # subdirectories with recursion == True.
    for directory in exclude_dir_parents:
        yield directory, False
        for item in directory.iterdir():
            if (item.is_dir() and item not in exclude_dir_parents
                    and item not in directories_to_exclude):
                yield item, True
def gitignore_patterns() -> Iterable[str]:
    """Yield the patterns listed in the top-level ``$PW_ROOT/.gitignore``.

    Blank lines and lines starting with ``#`` are skipped, and surrounding
    whitespace is removed from each pattern. Nothing is yielded when the file
    does not exist.

    Raises:
        KeyError: If the ``PW_ROOT`` environment variable is not set.
    """
    # Only the top level .gitignore is consulted.
    gitignore_path = Path(os.environ['PW_ROOT']) / '.gitignore'
    if not gitignore_path.is_file():
        return

    for line in gitignore_path.read_text().splitlines():
        globname = line.strip()
        # Keep everything except empty lines and comments.
        if globname and not globname.startswith('#'):
            yield globname
def get_common_excludes() -> List[Path]:
    """Find commonly excluded directories, and return them as a [Path].

    Returns:
        Pigweed's usual generated directories under ``$PW_ROOT``, plus a
        local ``out`` directory when the current directory is not the Pigweed
        root, plus any legacy environment directories that exist.

    Raises:
        KeyError: If the ``PW_ROOT`` environment variable is not set.
    """
    # Preset exclude list for Pigweed's upstream directories.
    pw_root_dir = Path(os.environ['PW_ROOT'])
    exclude_list: List[Path] = [
        pw_root_dir / ignored_directory for ignored_directory in (
            '.environment',  # Bootstrap-created CIPD and Python venv.
            '.presubmit',  # Presubmit-created CIPD and Python venv.
            '.git',  # Pigweed's git repo.
            '.mypy_cache',  # Python static analyzer.
            '.cargo',  # Rust package manager.
            'out',  # Typical build directory.
        )
    ]

    # Preset exclude for common downstream project structures.
    #
    # By convention, Pigweed projects use "out" as a build directory, so if
    # watch is invoked outside the Pigweed root, also ignore the local out
    # directory. Both sides are resolved so that a PW_ROOT spelled through a
    # symlink or with ".." components still compares equal to the same place.
    if Path.cwd().resolve() != pw_root_dir.resolve():
        exclude_list.append(Path('out'))

    # Check for and warn about legacy directories.
    legacy_directories = [
        '.cipd',  # Legacy CIPD location.
        '.python3-venv',  # Legacy Python venv location.
    ]
    found_legacy = False
    for legacy_directory in legacy_directories:
        full_legacy_directory = pw_root_dir / legacy_directory
        if full_legacy_directory.is_dir():
            _LOG.warning('Legacy environment directory found: %s',
                         full_legacy_directory)
            exclude_list.append(full_legacy_directory)
            found_legacy = True

    if found_legacy:
        _LOG.warning('Found legacy environment directory(s); these '
                     'can be deleted')

    return exclude_list
525 def _find_build_dir(default_build_dir: Path = Path('out')) -> Optional[Path]:
526 """Searches for a build directory, returning the first it finds."""
527 # Give priority to out/, then something under out/.
528 if default_build_dir.joinpath('build.ninja').exists():
529 return default_build_dir
531 for path in default_build_dir.glob('**/build.ninja'):
534 for path in Path.cwd().glob('**/build.ninja'):
def watch(default_build_targets: List[str], build_directories: List[str],
          patterns: str, ignore_patterns_string: str, exclude_list: List[Path],
          restart: bool):
    """Watches files and runs Ninja commands when they change.

    Args:
        default_build_targets: Targets to build in an automatically located
            build directory.
        build_directories: One entry per -C option; the first element of each
            entry is the build directory and the rest are its targets.
        patterns: Delimited globs of the files to watch.
        ignore_patterns_string: Delimited globs of files to ignore; may be
            empty or None.
        exclude_list: Directories that should not be watched.
        restart: Whether a file change restarts a build that is in progress.
    """
    _LOG.info('Starting Pigweed build watcher')

    # Get pigweed directory information from environment variable PW_ROOT.
    # Use .get(): indexing os.environ raises KeyError for an unset variable
    # and never produces None.
    pw_root_env = os.environ.get('PW_ROOT')
    if not pw_root_env:
        _exit_due_to_pigweed_not_installed()
    pw_root = Path(pw_root_env).resolve()
    if Path.cwd().resolve() not in [pw_root, *pw_root.parents]:
        _exit_due_to_pigweed_not_installed()

    # Preset exclude list for pigweed directory. Build a new list so that the
    # caller's list (the argparse default) is left untouched.
    exclude_list = [*exclude_list, *get_common_excludes()]

    build_commands = [
        BuildCommand(Path(build_dir[0]), tuple(build_dir[1:]))
        for build_dir in build_directories
    ]

    # If no build directory was specified, search the tree for a build.ninja.
    if default_build_targets or not build_directories:
        build_dir = _find_build_dir()

        # Make sure we found something; if not, bail.
        if build_dir is None:
            _die("No build dirs found. Did you forget to run 'gn gen out'?")

        build_commands.append(
            BuildCommand(build_dir, tuple(default_build_targets)))

    # Verify that the build output directories exist.
    for i, build_target in enumerate(build_commands, 1):
        if not build_target.build_dir.is_dir():
            _die("Build directory doesn't exist: %s", build_target)

        _LOG.info('Will build [%d/%d]: %s', i, len(build_commands),
                  build_target)

    _LOG.debug('Patterns: %s', patterns)

    # Try to make a short display path for the watched directory that has
    # "$HOME" instead of the full home directory. This is nice for users
    # who have deeply nested home directories.
    path_to_log = str(Path().resolve()).replace(str(Path.home()), '$HOME')

    # Ignore the user-specified patterns.
    ignore_patterns = (ignore_patterns_string.split(_WATCH_PATTERN_DELIMITER)
                       if ignore_patterns_string else [])
    # Ignore top level pw_root_dir/.gitignore patterns.
    ignore_patterns.extend(gitignore_patterns())

    ignore_dirs = ['.presubmit', '.python3-env']

    env = pw_cli.env.pigweed_environment()
    charset = _EMOJI_CHARSET if env.PW_EMOJI else _ASCII_CHARSET

    event_handler = PigweedBuildWatcher(
        patterns=patterns.split(_WATCH_PATTERN_DELIMITER),
        ignore_patterns=ignore_patterns,
        build_commands=build_commands,
        ignore_dirs=ignore_dirs,
        charset=charset,
        restart=restart,
    )

    observers = []
    try:
        # It can take awhile to configure the filesystem watcher, so have the
        # message reflect that with the "...". Run inside the try: to
        # gracefully handle the user Ctrl-C'ing out during startup.
        _LOG.info('Attaching filesystem watcher to %s/...', path_to_log)

        # Observe changes for all files in the root directory. Whether each
        # directory is observed recursively or not is determined by the
        # second element yielded by minimal_watch_directories().
        for path, rec in minimal_watch_directories(Path.cwd(), exclude_list):
            observer = Observer()
            observer.schedule(
                event_handler,
                str(path),
                recursive=rec,
            )
            observer.start()
            observers.append(observer)

        event_handler.debouncer.press('Triggering initial build...')
        for observer in observers:
            # Join with a timeout so that the loop wakes up regularly.
            while observer.is_alive():
                observer.join(1)

    # Ctrl-C on Unix generates KeyboardInterrupt
    # Ctrl-Z on Windows generates EOFError
    except (KeyboardInterrupt, EOFError):
        _exit_due_to_interrupt()
    except OSError as err:
        # err.errno is None, rather than an IndexError, when the exception
        # carries no arguments.
        if err.errno == _ERRNO_INOTIFY_LIMIT_REACHED:
            _exit_due_to_inotify_limit()
        else:
            raise

    _LOG.critical('Should never get here')
    for observer in observers:
        observer.join()
def main():
    """Watch files for changes and rebuild.

    Parses the command line with the pw watch arguments and passes the parsed
    values to watch() as keyword arguments.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    add_parser_arguments(parser)
    watch(**vars(parser.parse_args()))
659 if __name__ == '__main__':