binman: Re-enable concurrent tests
[platform/kernel/u-boot.git] / tools / patman / test_util.py
1 # SPDX-License-Identifier: GPL-2.0+
2 #
3 # Copyright (c) 2016 Google, Inc
4 #
5
6 from contextlib import contextmanager
7 import glob
8 import multiprocessing
9 import os
10 import sys
11 import unittest
12
13 from patman import command
14
15 from io import StringIO
16
17 use_concurrent = True
18 try:
19     from concurrencytest.concurrencytest import ConcurrentTestSuite
20     from concurrencytest.concurrencytest import fork_for_tests
21 except:
22     use_concurrent = False
23
24
25 def RunTestCoverage(prog, filter_fname, exclude_list, build_dir, required=None,
26                     extra_args=None):
27     """Run tests and check that we get 100% coverage
28
29     Args:
30         prog: Program to run (with be passed a '-t' argument to run tests
31         filter_fname: Normally all *.py files in the program's directory will
32             be included. If this is not None, then it is used to filter the
33             list so that only filenames that don't contain filter_fname are
34             included.
35         exclude_list: List of file patterns to exclude from the coverage
36             calculation
37         build_dir: Build directory, used to locate libfdt.py
38         required: List of modules which must be in the coverage report
39         extra_args (str): Extra arguments to pass to the tool before the -t/test
40             arg
41
42     Raises:
43         ValueError if the code coverage is not 100%
44     """
45     # This uses the build output from sandbox_spl to get _libfdt.so
46     path = os.path.dirname(prog)
47     if filter_fname:
48         glob_list = glob.glob(os.path.join(path, '*.py'))
49         glob_list = [fname for fname in glob_list if filter_fname in fname]
50     else:
51         glob_list = []
52     glob_list += exclude_list
53     glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*']
54     glob_list += ['*concurrencytest*']
55     test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
56     prefix = ''
57     if build_dir:
58         prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
59     cmd = ('%spython3-coverage run '
60            '--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list),
61                                          prog, extra_args or '', test_cmd))
62     os.system(cmd)
63     stdout = command.Output('python3-coverage', 'report')
64     lines = stdout.splitlines()
65     if required:
66         # Convert '/path/to/name.py' just the module name 'name'
67         test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
68                         for line in lines if '/etype/' in line])
69         missing_list = required
70         missing_list.discard('__init__')
71         missing_list.difference_update(test_set)
72         if missing_list:
73             print('Missing tests for %s' % (', '.join(missing_list)))
74             print(stdout)
75             ok = False
76
77     coverage = lines[-1].split(' ')[-1]
78     ok = True
79     print(coverage)
80     if coverage != '100%':
81         print(stdout)
82         print("Type 'python3-coverage html' to get a report in "
83               'htmlcov/index.html')
84         print('Coverage error: %s, but should be 100%%' % coverage)
85         ok = False
86     if not ok:
87         raise ValueError('Test coverage failure')
88
89
90 # Use this to suppress stdout/stderr output:
91 # with capture_sys_output() as (stdout, stderr)
92 #   ...do something...
93 @contextmanager
94 def capture_sys_output():
95     capture_out, capture_err = StringIO(), StringIO()
96     old_out, old_err = sys.stdout, sys.stderr
97     try:
98         sys.stdout, sys.stderr = capture_out, capture_err
99         yield capture_out, capture_err
100     finally:
101         sys.stdout, sys.stderr = old_out, old_err
102
103
104 def ReportResult(toolname:str, test_name: str, result: unittest.TestResult):
105     """Report the results from a suite of tests
106
107     Args:
108         toolname: Name of the tool that ran the tests
109         test_name: Name of test that was run, or None for all
110         result: A unittest.TestResult object containing the results
111     """
112     # Remove errors which just indicate a missing test. Since Python v3.5 If an
113     # ImportError or AttributeError occurs while traversing name then a
114     # synthetic test that raises that error when run will be returned. These
115     # errors are included in the errors accumulated by result.errors.
116     if test_name:
117         errors = []
118
119         for test, err in result.errors:
120             if ("has no attribute '%s'" % test_name) not in err:
121                 errors.append((test, err))
122             result.testsRun -= 1
123         result.errors = errors
124
125     print(result)
126     for test, err in result.errors:
127         print(test.id(), err)
128     for test, err in result.failures:
129         print(err, result.failures)
130     if result.skipped:
131         print('%d %s test%s SKIPPED:' % (len(result.skipped), toolname,
132             's' if len(result.skipped) > 1 else ''))
133         for skip_info in result.skipped:
134             print('%s: %s' % (skip_info[0], skip_info[1]))
135     if result.errors or result.failures:
136         print('%s tests FAILED' % toolname)
137         return 1
138     return 0
139
140
141 def RunTestSuites(result, debug, verbosity, test_preserve_dirs, processes,
142                   test_name, toolpath, test_class_list):
143     """Run a series of test suites and collect the results
144
145     Args:
146         result: A unittest.TestResult object to add the results to
147         debug: True to enable debugging, which shows a full stack trace on error
148         verbosity: Verbosity level to use (0-4)
149         test_preserve_dirs: True to preserve the input directory used by tests
150             so that it can be examined afterwards (only useful for debugging
151             tests). If a single test is selected (in args[0]) it also preserves
152             the output directory for this test. Both directories are displayed
153             on the command line.
154         processes: Number of processes to use to run tests (None=same as #CPUs)
155         test_name: Name of test to run, or None for all
156         toolpath: List of paths to use for tools
157         test_class_list: List of test classes to run
158     """
159     for module in []:
160         suite = doctest.DocTestSuite(module)
161         suite.run(result)
162
163     sys.argv = [sys.argv[0]]
164     if debug:
165         sys.argv.append('-D')
166     if verbosity:
167         sys.argv.append('-v%d' % verbosity)
168     if toolpath:
169         for path in toolpath:
170             sys.argv += ['--toolpath', path]
171
172     suite = unittest.TestSuite()
173     loader = unittest.TestLoader()
174     for module in test_class_list:
175         # Test the test module about our arguments, if it is interested
176         if hasattr(module, 'setup_test_args'):
177             setup_test_args = getattr(module, 'setup_test_args')
178             setup_test_args(preserve_indir=test_preserve_dirs,
179                 preserve_outdirs=test_preserve_dirs and test_name is not None,
180                 toolpath=toolpath, verbosity=verbosity)
181         if test_name:
182             try:
183                 suite.addTests(loader.loadTestsFromName(test_name, module))
184             except AttributeError:
185                 continue
186         else:
187             suite.addTests(loader.loadTestsFromTestCase(module))
188     if use_concurrent and processes != 1:
189         concurrent_suite = ConcurrentTestSuite(suite,
190                 fork_for_tests(processes or multiprocessing.cpu_count()))
191         concurrent_suite.run(result)
192     else:
193         suite.run(result)