Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_unit_test / py / pw_unit_test / test_runner.py
1 # Copyright 2019 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Runs Pigweed unit tests built using GN."""
15
16 import argparse
17 import asyncio
18 import enum
19 import json
20 import logging
21 import os
22 import shlex
23 import subprocess
24 import sys
25
26 from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple
27
28 import pw_cli.log
29 import pw_cli.process
30
31 # Global logger for the script.
32 _LOG: logging.Logger = logging.getLogger(__name__)
33
34
35 def register_arguments(parser: argparse.ArgumentParser) -> None:
36     """Registers command-line arguments."""
37
38     parser.add_argument('--root',
39                         type=str,
40                         default='out',
41                         help='Path to the root build directory')
42     parser.add_argument('-r',
43                         '--runner',
44                         type=str,
45                         required=True,
46                         help='Executable which runs a test on the target')
47     parser.add_argument('runner_args',
48                         nargs=argparse.REMAINDER,
49                         help='Arguments to forward to the test runner')
50
51     # The runner script can either run binaries directly or groups.
52     group = parser.add_mutually_exclusive_group()
53     group.add_argument('-g',
54                        '--group',
55                        action='append',
56                        help='Test groups to run')
57     group.add_argument('-t',
58                        '--test',
59                        action='append',
60                        help='Test binaries to run')
61
62
63 class TestResult(enum.Enum):
64     """Result of a single unit test run."""
65     UNKNOWN = 0
66     SUCCESS = 1
67     FAILURE = 2
68
69
70 class Test:
71     """A unit test executable."""
72     def __init__(self, name: str, file_path: str):
73         self.name: str = name
74         self.file_path: str = file_path
75         self.status: TestResult = TestResult.UNKNOWN
76
77     def __repr__(self) -> str:
78         return f'Test({self.name})'
79
80     def __eq__(self, other: object) -> bool:
81         if not isinstance(other, Test):
82             return NotImplemented
83         return self.file_path == other.file_path
84
85     def __hash__(self) -> int:
86         return hash(self.file_path)
87
88
89 class TestGroup:
90     """Graph node representing a group of unit tests."""
91     def __init__(self, name: str, tests: Iterable[Test]):
92         self._name: str = name
93         self._deps: Iterable['TestGroup'] = []
94         self._tests: Iterable[Test] = tests
95
96     def set_deps(self, deps: Iterable['TestGroup']) -> None:
97         """Updates the dependency list of this group."""
98         self._deps = deps
99
100     def all_test_dependencies(self) -> List[Test]:
101         """Returns a list of all tests in this group and its dependencies."""
102         return list(self._all_test_dependencies(set()))
103
104     def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]:
105         if self._name in processed_groups:
106             return set()
107
108         tests: Set[Test] = set()
109         for dep in self._deps:
110             tests.update(
111                 dep._all_test_dependencies(  # pylint: disable=protected-access
112                     processed_groups))
113
114         tests.update(self._tests)
115         processed_groups.add(self._name)
116
117         return tests
118
119     def __repr__(self) -> str:
120         return f'TestGroup({self._name})'
121
122
123 class TestRunner:
124     """Runs unit tests by calling out to a runner script."""
125     def __init__(self, executable: str, args: Sequence[str],
126                  tests: Iterable[Test]):
127         self._executable: str = executable
128         self._args: Sequence[str] = args
129         self._tests: List[Test] = list(tests)
130
131     async def run_tests(self) -> None:
132         """Runs all registered unit tests through the runner script."""
133
134         for idx, test in enumerate(self._tests, 1):
135             total = str(len(self._tests))
136             test_counter = f'Test {idx:{len(total)}}/{total}'
137
138             _LOG.info('%s: [ RUN] %s', test_counter, test.name)
139             command = [self._executable, test.file_path, *self._args]
140
141             if self._executable.endswith('.py'):
142                 command.insert(0, sys.executable)
143
144             try:
145                 process = await pw_cli.process.run_async(*command)
146                 if process.returncode == 0:
147                     test.status = TestResult.SUCCESS
148                     test_result = 'PASS'
149                 else:
150                     test.status = TestResult.FAILURE
151                     test_result = 'FAIL'
152
153                     _LOG.log(pw_cli.log.LOGLEVEL_STDOUT, '[%s]\n%s',
154                              pw_cli.color.colors().bold_white(process.pid),
155                              process.output.decode(errors='ignore').rstrip())
156
157                     _LOG.info('%s: [%s] %s', test_counter, test_result,
158                               test.name)
159             except subprocess.CalledProcessError as err:
160                 _LOG.error(err)
161                 return
162
163     def all_passed(self) -> bool:
164         """Returns true if all unit tests passed."""
165         return all(test.status is TestResult.SUCCESS for test in self._tests)
166
167
168 # Filename extension for unit test metadata files.
169 METADATA_EXTENSION = '.testinfo.json'
170
171
172 def find_test_metadata(root: str) -> List[str]:
173     """Locates all test metadata files located within a directory tree."""
174
175     metadata: List[str] = []
176     for path, _, files in os.walk(root):
177         for filename in files:
178             if not filename.endswith(METADATA_EXTENSION):
179                 continue
180
181             full_path = os.path.join(path, filename)
182             _LOG.debug('Found group metadata at %s', full_path)
183             metadata.append(full_path)
184
185     return metadata
186
187
188 # TODO(frolv): This is copied from the Python runner script.
189 # It should be extracted into a library and imported instead.
190 def find_binary(target: str) -> str:
191     """Tries to find a binary for a gn build target.
192
193     Args:
194         target: Relative filesystem path to the target's output directory and
195             target name, separated by a colon.
196
197     Returns:
198         Full path to the target's binary.
199
200     Raises:
201         FileNotFoundError: No binary found for target.
202     """
203
204     target_path, target_name = target.split(':')
205
206     for extension in ['', '.elf', '.exe']:
207         potential_filename = f'{target_path}/{target_name}{extension}'
208         if os.path.isfile(potential_filename):
209             return potential_filename
210
211     raise FileNotFoundError(
212         f'Could not find output binary for build target {target}')
213
214
215 def parse_metadata(metadata: List[str], root: str) -> Dict[str, TestGroup]:
216     """Builds a graph of test group objects from metadata.
217
218     Args:
219         metadata: List of paths to JSON test metadata files.
220         root: Root output directory of the build.
221
222     Returns:
223         Map of group name to TestGroup object. All TestGroup objects are fully
224         populated with the paths to their unit tests and references to their
225         dependencies.
226     """
227     def canonicalize(path: str) -> str:
228         """Removes a trailing slash from a GN target's directory.
229
230         '//module:target'  -> '//module:target'
231         '//module/:target' -> '//module:target'
232         """
233         index = path.find(':')
234         if index == -1 or path[index - 1] != '/':
235             return path
236         return path[:index - 1] + path[index:]
237
238     group_deps: List[Tuple[str, List[str]]] = []
239     all_tests: Dict[str, Test] = {}
240     test_groups: Dict[str, TestGroup] = {}
241     num_tests = 0
242
243     for path in metadata:
244         with open(path, 'r') as metadata_file:
245             metadata_list = json.load(metadata_file)
246
247         deps: List[str] = []
248         tests: List[Test] = []
249
250         for entry in metadata_list:
251             if entry['type'] == 'self':
252                 group_name = canonicalize(entry['name'])
253             elif entry['type'] == 'dep':
254                 deps.append(canonicalize(entry['group']))
255             elif entry['type'] == 'test':
256                 test_directory = os.path.join(root, entry['test_directory'])
257                 test_binary = find_binary(
258                     f'{test_directory}:{entry["test_name"]}')
259
260                 if test_binary not in all_tests:
261                     all_tests[test_binary] = Test(entry['test_name'],
262                                                   test_binary)
263
264                 tests.append(all_tests[test_binary])
265
266         if deps:
267             group_deps.append((group_name, deps))
268
269         num_tests += len(tests)
270         test_groups[group_name] = TestGroup(group_name, tests)
271
272     for name, deps in group_deps:
273         test_groups[name].set_deps([test_groups[dep] for dep in deps])
274
275     _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
276     return test_groups
277
278
279 def tests_from_groups(group_names: Optional[Sequence[str]],
280                       root: str) -> List[Test]:
281     """Returns unit tests belonging to test groups and their dependencies.
282
283     If args.names is nonempty, only searches groups specified there.
284     Otherwise, finds tests from all known test groups.
285     """
286
287     _LOG.info('Scanning for tests...')
288     metadata = find_test_metadata(root)
289     test_groups = parse_metadata(metadata, root)
290
291     groups_to_run = group_names if group_names else test_groups.keys()
292     tests_to_run: Set[Test] = set()
293
294     for name in groups_to_run:
295         try:
296             tests_to_run.update(test_groups[name].all_test_dependencies())
297         except KeyError:
298             _LOG.error('Unknown test group: %s', name)
299             sys.exit(1)
300
301     _LOG.info('Running test groups %s', ', '.join(groups_to_run))
302     return list(tests_to_run)
303
304
305 def tests_from_paths(paths: Sequence[str]) -> List[Test]:
306     """Returns a list of tests from test executable paths."""
307
308     tests: List[Test] = []
309     for path in paths:
310         name = os.path.splitext(os.path.basename(path))[0]
311         tests.append(Test(name, path))
312     return tests
313
314
315 # TODO(frolv): Try to figure out a better solution for passing through the
316 # corrected sys.argv across all pw commands.
317 async def find_and_run_tests(argv_copy: List[str],
318                              root: str,
319                              runner: str,
320                              runner_args: Sequence[str] = (),
321                              group: Optional[Sequence[str]] = None,
322                              test: Optional[Sequence[str]] = None) -> int:
323     """Runs some unit tests."""
324
325     if runner_args:
326         if runner_args[0] != '--':
327             _LOG.error('Unrecognized argument: %s', runner_args[0])
328             _LOG.info('')
329             _LOG.info('Did you mean to pass this argument to the runner?')
330             _LOG.info('Insert a -- in front of it to forward it through:')
331             _LOG.info('')
332
333             index = argv_copy.index(runner_args[0])
334             fixed_cmd = [*argv_copy[:index], '--', *argv_copy[index:]]
335
336             _LOG.info('  %s', ' '.join(shlex.quote(arg) for arg in fixed_cmd))
337             _LOG.info('')
338
339             return 1
340
341         runner_args = runner_args[1:]
342
343     if test:
344         tests = tests_from_paths(test)
345     else:
346         tests = tests_from_groups(group, root)
347
348     test_runner = TestRunner(runner, runner_args, tests)
349     await test_runner.run_tests()
350
351     return 0 if test_runner.all_passed() else 1
352
353
354 def main() -> int:
355     """Run Pigweed unit tests built using GN."""
356
357     parser = argparse.ArgumentParser(description=main.__doc__)
358     register_arguments(parser)
359     parser.add_argument('-v',
360                         '--verbose',
361                         action='store_true',
362                         help='Output additional logs as the script runs')
363
364     args_as_dict = dict(vars(parser.parse_args()))
365     del args_as_dict['verbose']
366     return asyncio.run(find_and_run_tests(sys.argv, **args_as_dict))
367
368
369 if __name__ == '__main__':
370     pw_cli.log.install(hide_timestamp=True)
371     sys.exit(main())