Run the test suite using a separate process for each test file.
"""
-import os, sys, platform
-import Queue, threading
import multiprocessing
+import os
+import platform
+import sys
from optparse import OptionParser
in_q = None
out_q = None
-def process_dir_worker():
+def process_dir_worker(arg_tuple):
"""Worker thread main loop when in multithreaded mode.
Takes one directory specification at a time and works on it."""
- while True:
- (root, files, test_root, dotest_options) = in_q.get()
- (dir_failed, dir_passed) = process_dir(root, files, test_root, dotest_options)
- out_q.put((dir_failed, dir_passed))
- in_q.task_done()
+ (root, files, test_root, dotest_options) = arg_tuple
+ return process_dir(root, files, test_root, dotest_options)
def walk_and_invoke(test_root, dotest_options, num_threads):
"""Look for matched files and invoke test driver on each one.
In single-threaded mode, each test driver is invoked directly.
In multi-threaded mode, submit each test driver to a worker
queue, and then wait for all to complete."""
+
+ # Collect the test files that we'll run.
+ test_work_items = []
+ for root, dirs, files in os.walk(test_root, topdown=False):
+ test_work_items.append((root, files, test_root, dotest_options))
+
+ # Run the items, either in a pool (for multicore speedup) or
+ # calling each individually.
+ if num_threads > 1:
+ pool = multiprocessing.Pool(num_threads)
+ test_results = pool.map(process_dir_worker, test_work_items)
+ else:
+ test_results = []
+ for work_item in test_work_items:
+ test_results.append(process_dir_worker(work_item))
+
failed = []
passed = []
- if (num_threads > 1):
- print("Running multithreaded with %d threads" % num_threads)
- global in_q
- global out_q
- in_q = Queue.Queue()
- out_q = Queue.Queue()
- for i in range(num_threads):
- t = threading.Thread(target=process_dir_worker)
- t.daemon = True
- t.start()
- else:
- print("Running single-threaded")
- for root, dirs, files in os.walk(test_root, topdown=False):
- if (num_threads > 1):
- in_q.put((root, files, test_root, dotest_options))
- else:
- (dir_failed, dir_passed) = process_dir(root, files, test_root, dotest_options)
- failed += dir_failed
- passed += dir_passed
- if (num_threads > 1):
- in_q.join()
- while not out_q.empty():
- (dir_failed, dir_passed) = out_q.get()
- failed += dir_failed
- passed += dir_passed
+
+ for test_result in test_results:
+ (dir_failed, dir_passed) = test_result
+ failed += dir_failed
+ passed += dir_passed
+
return (failed, passed)
def main():