From f16b900643091f60190e9e54df1139f1e4668d56 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Sat, 2 Dec 2017 09:36:27 -0300 Subject: [PATCH 1/1] validate:launcher: Launch tests in `_TestsLauncher` not in TestsManagaer So that Test from several TestManager can run in parallel and thus avoid waiting for tests from one TestManager to run the following one., Also by design TestsLauncher should always have been the responsible for ... launching tests. --- .gitignore | 1 + validate/launcher/baseclasses.py | 173 +++++++++++++++++---------------------- 2 files changed, 78 insertions(+), 96 deletions(-) diff --git a/.gitignore b/.gitignore index 347d4e6..21bfda3 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ .#* tags build* +mesonbuild* \ No newline at end of file diff --git a/validate/launcher/baseclasses.py b/validate/launcher/baseclasses.py index d3b19b7..22513fa 100644 --- a/validate/launcher/baseclasses.py +++ b/validate/launcher/baseclasses.py @@ -1050,10 +1050,6 @@ class TestsManager(Loggable): self.wanted_tests_patterns = [] self.blacklisted_tests_patterns = [] self._generators = [] - self.queue = queue.Queue() - self.jobs = [] - self.total_num_tests = 0 - self.starting_test_num = 0 self.check_testslist = True self.all_tests = None self.expected_failures = {} @@ -1239,87 +1235,6 @@ class TestsManager(Loggable): return False - def test_wait(self): - while True: - # Check process every second for timeout - try: - self.queue.get(timeout=1) - except queue.Empty: - pass - - for test in self.jobs: - if test.process_update(): - self.jobs.remove(test) - return test - - def tests_wait(self): - try: - test = self.test_wait() - test.check_results() - except KeyboardInterrupt: - for test in self.jobs: - test.kill_subprocess() - raise - - return test - - def start_new_job(self, tests_left): - try: - test = tests_left.pop(0) - except IndexError: - return False - - self.print_test_num(test) - test.test_start(self.queue) - - self.jobs.append(test) - - return True - - def run_tests(self, starting_test_num, total_num_tests): - self.total_num_tests = total_num_tests - self.starting_test_num = starting_test_num - - alone_tests = [] - tests = [] - for test in self.tests: - if test.is_parallel: - tests.append(test) - else: - alone_tests.append(test) - - max_num_jobs = min(self.options.num_jobs, len(tests)) - jobs_running = 0 - - for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]: - tests_left = list(tests) - for i in range(num_jobs): - if not self.start_new_job(tests_left): - break - jobs_running += 1 - - while jobs_running != 0: - test = self.tests_wait() - jobs_running -= 1 - self.print_test_num(test) - res = test.test_end() - self.reporter.after_test(test) - if res != Result.PASSED and (self.options.forever or - self.options.fatal_error): - return test.result - if self.start_new_job(tests_left): - jobs_running += 1 - - return Result.PASSED - - def print_test_num(self, test): - cur_test_num = self.starting_test_num + self.tests.index(test) + 1 - sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests)) - - def clean_tests(self): - for test in self.tests: - test.clean() - def needs_http_server(self): return False @@ -1375,6 +1290,10 @@ class _TestsLauncher(Loggable): self.all_tests = None self.wanted_tests_patterns = [] + self.queue = queue.Queue() + self.jobs = [] + self.total_num_tests = 0 + def _list_app_dirs(self): app_dirs = [] app_dirs.append(os.path.join(self.libsdir, "apps")) @@ -1627,6 +1546,47 @@ class _TestsLauncher(Loggable): return True return False + def print_test_num(self, test): + cur_test_num = self.tests.index(test) + 1 + sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests)) + + def test_wait(self): + while True: + # Check process every second for timeout + try: + self.queue.get(timeout=1) + except queue.Empty: + pass + + for test in self.jobs: + if test.process_update(): + self.jobs.remove(test) + return test + + def tests_wait(self): + try: + test = self.test_wait() + test.check_results() + except KeyboardInterrupt: + for test in self.jobs: + test.kill_subprocess() + raise + + return test + + def start_new_job(self, tests_left): + try: + test = tests_left.pop(0) + except IndexError: + return False + + self.print_test_num(test) + test.test_start(self.queue) + + self.jobs.append(test) + + return True + def _run_tests(self): cur_test_num = 0 @@ -1635,23 +1595,44 @@ class _TestsLauncher(Loggable): if all_tests == -1: return False self.all_tests = all_tests - total_num_tests = len(self.all_tests) + self.total_num_tests = len(self.all_tests) self.reporter.init_timer() - for tester in self.testers: - if not self._tester_needed(tester): - continue - res = tester.run_tests(cur_test_num, total_num_tests) - cur_test_num += len(tester.list_tests()) - if res != Result.PASSED and (self.options.forever or - self.options.fatal_error): - return False + alone_tests = [] + tests = [] + for test in self.tests: + if test.is_parallel: + tests.append(test) + else: + alone_tests.append(test) + + max_num_jobs = min(self.options.num_jobs, len(tests)) + jobs_running = 0 + + for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]: + tests_left = list(tests) + for i in range(num_jobs): + if not self.start_new_job(tests_left): + break + jobs_running += 1 + + while jobs_running != 0: + test = self.tests_wait() + jobs_running -= 1 + self.print_test_num(test) + res = test.test_end() + self.reporter.after_test(test) + if res != Result.PASSED and (self.options.forever or + self.options.fatal_error): + return test.result + if self.start_new_job(tests_left): + jobs_running += 1 return True def clean_tests(self): - for tester in self.testers: - tester.clean_tests() + for test in self.tests: + test.clean() def run_tests(self): if self.options.forever: -- 2.7.4