self.wanted_tests_patterns = []
self.blacklisted_tests_patterns = []
self._generators = []
- self.queue = queue.Queue()
- self.jobs = []
- self.total_num_tests = 0
- self.starting_test_num = 0
self.check_testslist = True
self.all_tests = None
self.expected_failures = {}
return False
- def test_wait(self):
- while True:
- # Check process every second for timeout
- try:
- self.queue.get(timeout=1)
- except queue.Empty:
- pass
-
- for test in self.jobs:
- if test.process_update():
- self.jobs.remove(test)
- return test
-
- def tests_wait(self):
- try:
- test = self.test_wait()
- test.check_results()
- except KeyboardInterrupt:
- for test in self.jobs:
- test.kill_subprocess()
- raise
-
- return test
-
- def start_new_job(self, tests_left):
- try:
- test = tests_left.pop(0)
- except IndexError:
- return False
-
- self.print_test_num(test)
- test.test_start(self.queue)
-
- self.jobs.append(test)
-
- return True
-
- def run_tests(self, starting_test_num, total_num_tests):
- self.total_num_tests = total_num_tests
- self.starting_test_num = starting_test_num
-
- alone_tests = []
- tests = []
- for test in self.tests:
- if test.is_parallel:
- tests.append(test)
- else:
- alone_tests.append(test)
-
- max_num_jobs = min(self.options.num_jobs, len(tests))
- jobs_running = 0
-
- for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
- tests_left = list(tests)
- for i in range(num_jobs):
- if not self.start_new_job(tests_left):
- break
- jobs_running += 1
-
- while jobs_running != 0:
- test = self.tests_wait()
- jobs_running -= 1
- self.print_test_num(test)
- res = test.test_end()
- self.reporter.after_test(test)
- if res != Result.PASSED and (self.options.forever or
- self.options.fatal_error):
- return test.result
- if self.start_new_job(tests_left):
- jobs_running += 1
-
- return Result.PASSED
-
- def print_test_num(self, test):
- cur_test_num = self.starting_test_num + self.tests.index(test) + 1
- sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests))
-
- def clean_tests(self):
- for test in self.tests:
- test.clean()
-
def needs_http_server(self):
return False
self.all_tests = None
self.wanted_tests_patterns = []
+ self.queue = queue.Queue()
+ self.jobs = []
+ self.total_num_tests = 0
+
def _list_app_dirs(self):
app_dirs = []
app_dirs.append(os.path.join(self.libsdir, "apps"))
return True
return False
+ def print_test_num(self, test):
+ cur_test_num = self.tests.index(test) + 1
+ sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests))
+
+ def test_wait(self):
+ while True:
+ # Check process every second for timeout
+ try:
+ self.queue.get(timeout=1)
+ except queue.Empty:
+ pass
+
+ for test in self.jobs:
+ if test.process_update():
+ self.jobs.remove(test)
+ return test
+
+ def tests_wait(self):
+ try:
+ test = self.test_wait()
+ test.check_results()
+ except KeyboardInterrupt:
+ for test in self.jobs:
+ test.kill_subprocess()
+ raise
+
+ return test
+
+ def start_new_job(self, tests_left):
+ try:
+ test = tests_left.pop(0)
+ except IndexError:
+ return False
+
+ self.print_test_num(test)
+ test.test_start(self.queue)
+
+ self.jobs.append(test)
+
+ return True
+
def _run_tests(self):
cur_test_num = 0
if all_tests == -1:
return False
self.all_tests = all_tests
- total_num_tests = len(self.all_tests)
+ self.total_num_tests = len(self.all_tests)
self.reporter.init_timer()
- for tester in self.testers:
- if not self._tester_needed(tester):
- continue
- res = tester.run_tests(cur_test_num, total_num_tests)
- cur_test_num += len(tester.list_tests())
- if res != Result.PASSED and (self.options.forever or
- self.options.fatal_error):
- return False
+ alone_tests = []
+ tests = []
+ for test in self.tests:
+ if test.is_parallel:
+ tests.append(test)
+ else:
+ alone_tests.append(test)
+
+ max_num_jobs = min(self.options.num_jobs, len(tests))
+ jobs_running = 0
+
+ for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
+ tests_left = list(tests)
+ for i in range(num_jobs):
+ if not self.start_new_job(tests_left):
+ break
+ jobs_running += 1
+
+ while jobs_running != 0:
+ test = self.tests_wait()
+ jobs_running -= 1
+ self.print_test_num(test)
+ res = test.test_end()
+ self.reporter.after_test(test)
+ if res != Result.PASSED and (self.options.forever or
+ self.options.fatal_error):
+ return test.result
+ if self.start_new_job(tests_left):
+ jobs_running += 1
return True
def clean_tests(self):
- for tester in self.testers:
- tester.clean_tests()
+ for test in self.tests:
+ test.clean()
def run_tests(self):
if self.options.forever: