Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_cli / py / pw_cli / plugins.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Registry for plugins."""
15
16 import argparse
17 import collections
18 import importlib
19 import inspect
20 import logging
21 from pathlib import Path
22 import sys
23 from textwrap import TextWrapper
24 from typing import Callable, Dict, List, Iterable, Optional, Union
25
26 from pw_cli import arguments
27
28 _LOG = logging.getLogger(__name__)
29
30 REGISTRY_FILE = 'PW_PLUGINS'
31
32
33 class Error(Exception):
34     """Failed to register a Pigweed plugin."""
35     def __str__(self):
36         """Displays the error as a string, including the __cause__ if present.
37
38         Adding __cause__ gives useful context without displaying a backtrace.
39         """
40         if self.__cause__ is None:
41             return super().__str__()
42
43         return (f'{super().__str__()} '
44                 f'({type(self.__cause__).__name__}: {self.__cause__})')
45
46
47 class _Plugin:
48     """A plugin for the pw command."""
49     def __init__(self, name: str, module: str, function: str,
50                  source: Union[Path, str]):
51         self.name = name
52         self.source = source
53
54         # Attempt to access the module and function. Catch any errors that might
55         # occur, since a bad plugin shouldn't break the rest of the pw command.
56         try:
57             self._module = importlib.import_module(module)
58         except Exception as err:
59             raise Error(f'Failed to import module "{module}"') from err
60
61         try:
62             self._function: Callable[[], int] = getattr(self._module, function)
63         except AttributeError as err:
64             raise Error(
65                 f'The function "{module}.{function}" does not exist') from err
66
67         try:
68             params = inspect.signature(self._function).parameters
69         except TypeError:
70             raise Error(
71                 'Plugin functions must be callable, but '
72                 f'{module}.{function} is a {type(self._function).__name__}')
73
74         positional_args = sum(p.default == p.empty for p in params.values())
75         if positional_args:
76             raise Error(
77                 f'Plugin functions cannot have any required positional '
78                 f'arguments, but {module}.{function} has {positional_args}')
79
80     def run(self, args: List[str]) -> int:
81         original_sys_argv = sys.argv
82         sys.argv = [f'pw {self.name}', *args]
83
84         try:
85             return self._function()
86         finally:
87             sys.argv = original_sys_argv
88
89     def help(self) -> str:
90         """Returns a brief description of this plugin."""
91         return self._function.__doc__ or self._module.__doc__ or ''
92
93     def details(self) -> List[str]:
94         return [
95             f'help      {self.help()}',
96             f'module    {self._module.__name__}',
97             f'function  {self._function.__name__}',
98             f'source    {self.source}',
99         ]
100
101
102 # This is the global CLI plugin registry.
103 _registry: Dict[str, _Plugin] = {}
104 _sources: List[Path] = []  # Paths to PW_PLUGINS files
105 _errors: Dict[str, List[Error]] = collections.defaultdict(list)
106
107
108 def _get(name: str) -> _Plugin:
109     if name in _registry:
110         return _registry[name]
111
112     if name in _errors:
113         raise Error(f'Registration for "{name}" failed: ' +
114                     ', '.join(str(e) for e in _errors[name]))
115
116     raise Error(f'The plugin "{name}" has not been registered')
117
118
119 def errors() -> Dict[str, List[Error]]:
120     return _errors
121
122
123 def run(name: str, args: List[str]) -> int:
124     """Runs a plugin by name. Raises Error if the plugin is not registered."""
125     return _get(name).run(args)
126
127
128 def command_help() -> str:
129     """Returns a help string for the registered plugins."""
130     width = max(len(name) for name in _registry) + 1 if _registry else 1
131     help_items = '\n'.join(f'  {name:{width}} {plugin.help()}'
132                            for name, plugin in sorted(_registry.items()))
133     return f'supported commands:\n{help_items}'
134
135
136 _BUILTIN_PLUGIN = '<built-in>'
137
138
139 def _valid_registration(name: str, module: str, function: str,
140                         source: Union[Path, str]) -> bool:
141     """Determines if a plugin should be registered or not."""
142     existing = _registry.get(name)
143
144     if existing is None:
145         return True
146
147     if source == _BUILTIN_PLUGIN:
148         raise Error(
149             f'Attempted to register built-in plugin "{name}", but that '
150             f'plugin was previously registered ({existing.source})!')
151
152     if existing.source == _BUILTIN_PLUGIN:
153         _LOG.debug('%s: Overriding built-in plugin "%s" with %s.%s', source,
154                    name, module, function)
155         return True
156
157     if source == _registry[name].source:
158         _LOG.warning(
159             '%s: "%s" is registered multiple times in this file! '
160             'Only the first registration takes effect', source, name)
161     else:
162         _LOG.debug(
163             '%s: The plugin "%s" was previously registered in %s; '
164             'ignoring registration as %s.%s', source, name,
165             _registry[name].source, module, function)
166
167     return False
168
169
170 def _register(name: str,
171               module: str,
172               function: str,
173               source: Union[Path, str] = _BUILTIN_PLUGIN) -> None:
174     """Registers a plugin from the specified source."""
175
176     if not _valid_registration(name, module, function, source):
177         return
178
179     try:
180         _registry[name] = _Plugin(name, module, function, source)
181         _LOG.debug('%s: Registered plugin "%s" for %s.%s', source, name,
182                    module, function)
183     except Error as err:
184         _errors[name].append(err)
185         _LOG.error('%s: Failed to register plugin "%s": %s', source, name, err)
186
187
188 def find_in_parents(name: str, path: Path) -> Optional[Path]:
189     """Searches parent directories of the path for a file or directory."""
190     path = path.resolve()
191
192     while not path.joinpath(name).exists():
193         path = path.parent
194
195         if path.samefile(path.parent):
196             return None
197
198     return path.joinpath(name)
199
200
201 def find_all_in_parents(name: str, path: Path) -> Iterable[Path]:
202     """Searches all parent directories of the path for files or directories."""
203
204     while True:
205         result = find_in_parents(name, path)
206         if result is None:
207             return
208
209         yield result
210         path = result.parent.parent
211
212
213 def _register_builtin_plugins():
214     """Registers the commands that are included with pw by default."""
215
216     _register('doctor', 'pw_doctor.doctor', 'main')
217     _register('format', 'pw_presubmit.format_code', 'main')
218     _register('help', 'pw_cli.plugins', '_help_command')
219     _register('logdemo', 'pw_cli.log', 'main')
220     _register('module-check', 'pw_module.check', 'main')
221     _register('test', 'pw_unit_test.test_runner', 'main')
222     _register('watch', 'pw_watch.watch', 'main')
223
224
225 def register(directory: Path):
226     """Finds and registers command line plugins."""
227     _register_builtin_plugins()
228
229     # Find pw plugins files starting in the current and parent directories.
230     for path in find_all_in_parents(REGISTRY_FILE, directory):
231         if not path.is_file():
232             continue
233
234         _LOG.debug('Found plugins file %s', path)
235         _sources.append(path)
236
237         with path.open() as contents:
238             for lineno, line in enumerate(contents, 1):
239                 line = line.strip()
240                 if line and not line.startswith('#'):
241                     try:
242                         name, module, function = line.split()
243                         _register(name, module, function, path)
244                     except ValueError as err:
245                         _errors[line.strip()].append(Error(err))
246                         _LOG.error(
247                             '%s:%d: Failed to parse plugin entry "%s": '
248                             'Expected 3 items (name, module, function), got %d',
249                             path, lineno, line, len(line.split()))
250
251
252 def _help_text(plugins: Iterable[str] = ()) -> Iterable[str]:
253     """Yields detailed information about commands."""
254     yield arguments.format_help()
255
256     if not plugins:
257         plugins = list(_registry)
258
259     yield '\ndetailed command information:'
260
261     wrapper = TextWrapper(width=80,
262                           initial_indent='   ',
263                           subsequent_indent=' ' * 13)
264
265     for plugin in sorted(plugins):
266         yield f'  [{plugin}]'
267
268         try:
269             for line in _get(plugin).details():
270                 yield wrapper.fill(line)
271         except Error as err:
272             yield wrapper.fill(f'error     {err}')
273
274         yield ''
275
276     yield 'PW_PLUGINS files:'
277
278     if _sources:
279         yield from (f'  [{i}] {file}' for i, file in enumerate(_sources, 1))
280     else:
281         yield '  (none found)'
282
283
284 def _help_command():
285     """Display detailed information about pw commands."""
286     parser = argparse.ArgumentParser(description=_help_command.__doc__)
287     parser.add_argument('plugins',
288                         metavar='plugin',
289                         nargs='*',
290                         help='command for which to display detailed info')
291
292     for line in _help_text(**vars(parser.parse_args())):
293         print(line, file=sys.stderr)