Merge tag 'u-boot-rockchip-20200501' of https://gitlab.denx.de/u-boot/custodians...
[platform/kernel/u-boot.git] / tools / patman / test_util.py
index 09f258c..4d28d9f 100644 (file)
@@ -3,21 +3,23 @@
 # Copyright (c) 2016 Google, Inc
 #
 
-from __future__ import print_function
-
 from contextlib import contextmanager
 import glob
+import multiprocessing
 import os
 import sys
+import unittest
 
-import command
+from patman import command
+from patman import test_util
 
-try:
-  from StringIO import StringIO
-except ImportError:
-  from io import StringIO
+from io import StringIO
 
-PYTHON = 'python%d' % sys.version_info[0]
+use_concurrent = True
+try:
+    from concurrencytest import ConcurrentTestSuite, fork_for_tests
+except:
+    use_concurrent = False
 
 
 def RunTestCoverage(prog, filter_fname, exclude_list, build_dir, required=None):
@@ -46,12 +48,15 @@ def RunTestCoverage(prog, filter_fname, exclude_list, build_dir, required=None):
         glob_list = []
     glob_list += exclude_list
     glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*']
-    test_cmd = 'test' if 'binman.py' in prog else '-t'
-    cmd = ('PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools %s-coverage run '
-           '--omit "%s" %s %s -P1' % (build_dir, PYTHON, ','.join(glob_list),
+    test_cmd = 'test' if 'binman' in prog else '-t'
+    prefix = ''
+    if build_dir:
+        prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
+    cmd = ('%spython3-coverage run '
+           '--omit "%s" %s %s -P1' % (prefix, ','.join(glob_list),
                                       prog, test_cmd))
     os.system(cmd)
-    stdout = command.Output('%s-coverage' % PYTHON, 'report')
+    stdout = command.Output('python3-coverage', 'report')
     lines = stdout.splitlines()
     if required:
         # Convert '/path/to/name.py' just the module name 'name'
@@ -70,8 +75,8 @@ def RunTestCoverage(prog, filter_fname, exclude_list, build_dir, required=None):
     print(coverage)
     if coverage != '100%':
         print(stdout)
-        print("Type '%s-coverage html' to get a report in "
-              'htmlcov/index.html' % PYTHON)
+        print("Type 'python3-coverage html' to get a report in "
+              'htmlcov/index.html')
         print('Coverage error: %s, but should be 100%%' % coverage)
         ok = False
     if not ok:
@@ -90,3 +95,95 @@ def capture_sys_output():
         yield capture_out, capture_err
     finally:
         sys.stdout, sys.stderr = old_out, old_err
+
+
+def ReportResult(toolname:str, test_name: str, result: unittest.TestResult):
+    """Report the results from a suite of tests
+
+    Args:
+        toolname: Name of the tool that ran the tests
+        test_name: Name of test that was run, or None for all
+        result: A unittest.TestResult object containing the results
+    """
+    # Remove errors which just indicate a missing test. Since Python v3.5 If an
+    # ImportError or AttributeError occurs while traversing name then a
+    # synthetic test that raises that error when run will be returned. These
+    # errors are included in the errors accumulated by result.errors.
+    if test_name:
+        errors = []
+
+        for test, err in result.errors:
+            if ("has no attribute '%s'" % test_name) not in err:
+                errors.append((test, err))
+            result.testsRun -= 1
+        result.errors = errors
+
+    print(result)
+    for test, err in result.errors:
+        print(test.id(), err)
+    for test, err in result.failures:
+        print(err, result.failures)
+    if result.skipped:
+        print('%d binman test%s SKIPPED:' %
+              (len(result.skipped), 's' if len(result.skipped) > 1 else ''))
+        for skip_info in result.skipped:
+            print('%s: %s' % (skip_info[0], skip_info[1]))
+    if result.errors or result.failures:
+        print('binman tests FAILED')
+        return 1
+    return 0
+
+
+def RunTestSuites(result, debug, verbosity, test_preserve_dirs, processes,
+                  test_name, toolpath, test_class_list):
+    """Run a series of test suites and collect the results
+
+    Args:
+        result: A unittest.TestResult object to add the results to
+        debug: True to enable debugging, which shows a full stack trace on error
+        verbosity: Verbosity level to use (0-4)
+        test_preserve_dirs: True to preserve the input directory used by tests
+            so that it can be examined afterwards (only useful for debugging
+            tests). If a single test is selected (in args[0]) it also preserves
+            the output directory for this test. Both directories are displayed
+            on the command line.
+        processes: Number of processes to use to run tests (None=same as #CPUs)
+        test_name: Name of test to run, or None for all
+        toolpath: List of paths to use for tools
+        test_class_list: List of test classes to run
+    """
+    for module in []:
+        suite = doctest.DocTestSuite(module)
+        suite.run(result)
+
+    sys.argv = [sys.argv[0]]
+    if debug:
+        sys.argv.append('-D')
+    if verbosity:
+        sys.argv.append('-v%d' % verbosity)
+    if toolpath:
+        for path in toolpath:
+            sys.argv += ['--toolpath', path]
+
+    suite = unittest.TestSuite()
+    loader = unittest.TestLoader()
+    for module in test_class_list:
+        # Test the test module about our arguments, if it is interested
+        if hasattr(module, 'setup_test_args'):
+            setup_test_args = getattr(module, 'setup_test_args')
+            setup_test_args(preserve_indir=test_preserve_dirs,
+                preserve_outdirs=test_preserve_dirs and test_name is not None,
+                toolpath=toolpath, verbosity=verbosity)
+        if test_name:
+            try:
+                suite.addTests(loader.loadTestsFromName(test_name, module))
+            except AttributeError:
+                continue
+        else:
+            suite.addTests(loader.loadTestsFromTestCase(module))
+    if use_concurrent and processes != 1:
+        concurrent_suite = ConcurrentTestSuite(suite,
+                fork_for_tests(processes or multiprocessing.cpu_count()))
+        concurrent_suite.run(result)
+    else:
+        suite.run(result)