- def test_wait(self):
- while True:
- # Check process every second for timeout
- try:
- self.queue.get(timeout=1)
- except queue.Empty:
- pass
-
- for test in self.jobs:
- if test.process_update():
- self.jobs.remove(test)
- return test
-
- def tests_wait(self):
- try:
- test = self.test_wait()
- test.check_results()
- except KeyboardInterrupt:
- for test in self.jobs:
- test.kill_subprocess()
- raise
-
- return test
-
- def start_new_job(self, tests_left):
- try:
- test = tests_left.pop(0)
- except IndexError:
- return False
-
- self.print_test_num(test)
- test.test_start(self.queue)
-
- self.jobs.append(test)
-
- return True
-
- def run_tests(self, starting_test_num, total_num_tests):
- self.total_num_tests = total_num_tests
- self.starting_test_num = starting_test_num
-
- alone_tests = []
- tests = []
- for test in self.tests:
- if test.is_parallel:
- tests.append(test)
- else:
- alone_tests.append(test)
-
- max_num_jobs = min(self.options.num_jobs, len(tests))
- jobs_running = 0
-
- for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
- tests_left = list(tests)
- for i in range(num_jobs):
- if not self.start_new_job(tests_left):
- break
- jobs_running += 1
-
- while jobs_running != 0:
- test = self.tests_wait()
- jobs_running -= 1
- self.print_test_num(test)
- res = test.test_end()
- self.reporter.after_test(test)
- if res != Result.PASSED and (self.options.forever or
- self.options.fatal_error):
- return test.result
- if self.start_new_job(tests_left):
- jobs_running += 1
-
- return Result.PASSED
-
- def print_test_num(self, test):
- cur_test_num = self.starting_test_num + self.tests.index(test) + 1
- sys.stdout.write("[%d / %d] " % (cur_test_num, self.total_num_tests))
-
- def clean_tests(self):
- for test in self.tests:
- test.clean()
-