Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_protobuf_compiler / py / pw_protobuf_compiler / python_protos.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Tools for compiling and importing Python protos on the fly."""
15
16 import importlib.util
17 import logging
18 import os
19 from pathlib import Path
20 import subprocess
21 import shlex
22 import tempfile
23 from types import ModuleType
24 from typing import (Dict, Generic, Iterable, Iterator, List, NamedTuple, Set,
25                     Tuple, TypeVar, Union)
26
27 _LOG = logging.getLogger(__name__)
28
29 PathOrStr = Union[Path, str]
30
31
32 def compile_protos(
33     output_dir: PathOrStr,
34     proto_files: Iterable[PathOrStr],
35     includes: Iterable[PathOrStr] = ()) -> None:
36     """Compiles proto files for Python by invoking the protobuf compiler.
37
38     Proto files not covered by one of the provided include paths will have their
39     directory added as an include path.
40     """
41     proto_paths: List[Path] = [Path(f).resolve() for f in proto_files]
42     include_paths: Set[Path] = set(Path(d).resolve() for d in includes)
43
44     for path in proto_paths:
45         if not any(include in path.parents for include in include_paths):
46             include_paths.add(path.parent)
47
48     cmd: Tuple[PathOrStr, ...] = (
49         'protoc',
50         '--python_out',
51         os.path.abspath(output_dir),
52         *(f'-I{d}' for d in include_paths),
53         *proto_paths,
54     )
55
56     _LOG.debug('%s', ' '.join(shlex.quote(str(c)) for c in cmd))
57     process = subprocess.run(cmd, capture_output=True)
58
59     if process.returncode:
60         _LOG.error('protoc invocation failed!\n%s\n%s',
61                    ' '.join(shlex.quote(str(c)) for c in cmd),
62                    process.stderr.decode())
63         process.check_returncode()
64
65
66 def _import_module(name: str, path: str) -> ModuleType:
67     spec = importlib.util.spec_from_file_location(name, path)
68     module = importlib.util.module_from_spec(spec)
69     spec.loader.exec_module(module)  # type: ignore[union-attr]
70     return module
71
72
73 def import_modules(directory: PathOrStr) -> Iterator:
74     """Imports modules in a directory and yields them."""
75     parent = os.path.dirname(directory)
76
77     for dirpath, _, files in os.walk(directory):
78         path_parts = os.path.relpath(dirpath, parent).split(os.sep)
79
80         for file in files:
81             name, ext = os.path.splitext(file)
82
83             if ext == '.py':
84                 yield _import_module(f'{".".join(path_parts)}.{name}',
85                                      os.path.join(dirpath, file))
86
87
88 def compile_and_import(proto_files: Iterable[PathOrStr],
89                        includes: Iterable[PathOrStr] = (),
90                        output_dir: PathOrStr = None) -> Iterator:
91     """Compiles protos and imports their modules; yields the proto modules.
92
93     Args:
94       proto_files: paths to .proto files to compile
95       includes: include paths to use for .proto compilation
96       output_dir: where to place the generated modules; a temporary directory is
97           used if omitted
98
99     Yields:
100       the generated protobuf Python modules
101     """
102
103     if output_dir:
104         compile_protos(output_dir, proto_files, includes)
105         yield from import_modules(output_dir)
106     else:
107         with tempfile.TemporaryDirectory(prefix='compiled_protos_') as tempdir:
108             compile_protos(tempdir, proto_files, includes)
109             yield from import_modules(tempdir)
110
111
112 def compile_and_import_file(proto_file: PathOrStr,
113                             includes: Iterable[PathOrStr] = (),
114                             output_dir: PathOrStr = None):
115     """Compiles and imports the module for a single .proto file."""
116     return next(iter(compile_and_import([proto_file], includes, output_dir)))
117
118
119 def compile_and_import_strings(contents: Iterable[str],
120                                includes: Iterable[PathOrStr] = (),
121                                output_dir: PathOrStr = None) -> Iterator:
122     """Compiles protos in one or more strings."""
123
124     if isinstance(contents, str):
125         contents = [contents]
126
127     with tempfile.TemporaryDirectory(prefix='proto_sources_') as path:
128         protos = []
129
130         for proto in contents:
131             # Use a hash of the proto so the same contents map to the same file
132             # name. The protobuf package complains if it seems the same contents
133             # in files with different names.
134             protos.append(Path(path, f'protobuf_{hash(proto):x}.proto'))
135             protos[-1].write_text(proto)
136
137         yield from compile_and_import(protos, includes, output_dir)
138
139
140 T = TypeVar('T')
141
142
143 class _NestedPackage(Generic[T]):
144     """Facilitates navigating protobuf packages as attributes."""
145     def __init__(self, package: str):
146         self._packages: Dict[str, _NestedPackage[T]] = {}
147         self._items: List[T] = []
148         self._package = package
149
150     def _add_package(self, subpackage: str, package: '_NestedPackage') -> None:
151         self._packages[subpackage] = package
152
153     def _add_item(self, item) -> None:
154         if item not in self._items:  # Don't store the same item multiple times.
155             self._items.append(item)
156
157     def __getattr__(self, attr: str):
158         """Look up subpackages or package members."""
159         if attr in self._packages:
160             return self._packages[attr]
161
162         for item in self._items:
163             if hasattr(item, attr):
164                 return getattr(item, attr)
165
166         raise AttributeError(
167             f'Proto package "{self._package}" does not contain "{attr}"')
168
169     def __getitem__(self, subpackage: str) -> '_NestedPackage[T]':
170         """Support accessing nested packages by name."""
171         result = self
172
173         for package in subpackage.split('.'):
174             result = result._packages[package]
175
176         return result
177
178     def __dir__(self) -> List[str]:
179         """List subpackages and members of modules as attributes."""
180         attributes = list(self._packages)
181
182         for item in self._items:
183             for attr, value in vars(item).items():
184                 # Exclude private variables and modules from dir().
185                 if not attr.startswith('_') and not isinstance(
186                         value, ModuleType):
187                     attributes.append(attr)
188
189         return attributes
190
191     def __iter__(self) -> Iterator['_NestedPackage[T]']:
192         """Iterate over nested packages."""
193         return iter(self._packages.values())
194
195     def __repr__(self) -> str:
196         msg = [f'ProtoPackage({self._package!r}']
197
198         public_members = [
199             i for i in vars(self)
200             if i not in self._packages and not i.startswith('_')
201         ]
202         if public_members:
203             msg.append(f'members={str(public_members)}')
204
205         if self._packages:
206             msg.append(f'subpackages={str(list(self._packages))}')
207
208         return ', '.join(msg) + ')'
209
210     def __str__(self) -> str:
211         return self._package
212
213
214 class Packages(NamedTuple):
215     """Items in a protobuf package structure; returned from as_package."""
216     items_by_package: Dict[str, List]
217     packages: _NestedPackage
218
219
220 def as_packages(items: Iterable[Tuple[str, T]],
221                 packages: Packages = None) -> Packages:
222     """Places items in a proto-style package structure navigable by attributes.
223
224     Args:
225       items: (package, item) tuples to insert into the package structure
226       packages: if provided, update this Packages instead of creating a new one
227     """
228     if packages is None:
229         packages = Packages({}, _NestedPackage(''))
230
231     for package, item in items:
232         packages.items_by_package.setdefault(package, []).append(item)
233
234         entry = packages.packages
235         subpackages = package.split('.')
236
237         # pylint: disable=protected-access
238         for i, subpackage in enumerate(subpackages, 1):
239             if subpackage not in entry._packages:
240                 entry._add_package(subpackage,
241                                    _NestedPackage('.'.join(subpackages[:i])))
242
243             entry = entry._packages[subpackage]
244
245         entry._add_item(item)
246         # pylint: enable=protected-access
247
248     return packages
249
250
251 PathOrModule = Union[str, Path, ModuleType]
252
253
254 class Library:
255     """A collection of protocol buffer modules sorted by package.
256
257     In Python, each .proto file is compiled into a Python module. The Library
258     class makes it simple to navigate a collection of Python modules
259     corresponding to .proto files, without relying on the location of these
260     compiled modules.
261
262     Proto messages and other types can be directly accessed by their protocol
263     buffer package name. For example, the foo.bar.Baz message can be accessed
264     in a Library called `protos` as:
265
266       protos.packages.foo.bar.Baz
267
268     A Library also provides the modules_by_package dictionary, for looking up
269     the list of modules in a particular package, and the modules() generator
270     for iterating over all modules.
271     """
272     @classmethod
273     def from_paths(cls, protos: Iterable[PathOrModule]) -> 'Library':
274         """Creates a Library from paths to proto files or proto modules."""
275         paths: List[PathOrStr] = []
276         modules: List[ModuleType] = []
277
278         for proto in protos:
279             if isinstance(proto, (Path, str)):
280                 paths.append(proto)
281             else:
282                 modules.append(proto)
283
284         if paths:
285             modules += compile_and_import(paths)
286         return Library(modules)
287
288     @classmethod
289     def from_strings(cls,
290                      contents: Iterable[str],
291                      includes: Iterable[PathOrStr] = (),
292                      output_dir: PathOrStr = None) -> 'Library':
293         """Creates a proto library from protos in the provided strings."""
294         return cls(compile_and_import_strings(contents, includes, output_dir))
295
296     def __init__(self, modules: Iterable[ModuleType]):
297         """Constructs a Library from an iterable of modules.
298
299         A Library can be constructed with modules dynamically compiled by
300         compile_and_import. For example:
301
302             protos = Library(compile_and_import(list_of_proto_files))
303         """
304         self.modules_by_package, self.packages = as_packages(
305             (m.DESCRIPTOR.package, m)  # type: ignore[attr-defined]
306             for m in modules)
307
308     def modules(self) -> Iterable:
309         """Allows iterating over all protobuf modules in this library."""
310         for module_list in self.modules_by_package.values():
311             yield from module_list
312
313
314 def proto_repr(message) -> str:
315     """Creates a repr-like string for a protobuf."""
316     fields = []
317
318     for field in message.DESCRIPTOR.fields:
319         value = getattr(message, field.name)
320
321         # Include fields if has_<field>() is true or the value is non-default.
322         if hasattr(message, 'has_' + field.name):
323             if not getattr(message, 'has_' + field.name)():
324                 continue
325         elif value == field.default_value:
326             continue
327
328         fields.append(f'{field.name}={value!r}')
329
330     return f'{message.DESCRIPTOR.full_name}({", ".join(fields)})'