Added option to parallelize test runs.
authorchristian.plesner.hansen@gmail.com <christian.plesner.hansen@gmail.com@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Tue, 30 Sep 2008 15:32:07 +0000 (15:32 +0000)
committerchristian.plesner.hansen@gmail.com <christian.plesner.hansen@gmail.com@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Tue, 30 Sep 2008 15:32:07 +0000 (15:32 +0000)
git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@399 ce2b1a6d-e550-0410-aec6-3dcde31c8c00

tools/test.py

index d8ecb5f..36595c9 100755 (executable)
@@ -27,6 +27,7 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+
 import imp
 import optparse
 import os
@@ -38,7 +39,9 @@ import subprocess
 import sys
 import tempfile
 import time
+import threading
 import utils
+from Queue import Queue, Empty
 
 
 VERBOSE = False
@@ -53,11 +56,15 @@ class ProgressIndicator(object):
 
   def __init__(self, cases):
     self.cases = cases
+    self.queue = Queue(len(cases))
+    for case in cases:
+      self.queue.put_nowait(case)
     self.succeeded = 0
-    self.failed = 0
-    self.remaining = len(self.cases)
-    self.total = len(self.cases)
-    self.failed_tests = [ ]
+    self.remaining = len(cases)
+    self.total = len(cases)
+    self.failed = [ ]
+    self.terminate = False
+    self.lock = threading.Lock()
 
   def PrintFailureHeader(self, test):
     if test.IsNegative():
@@ -70,21 +77,56 @@ class ProgressIndicator(object):
     }
     print "Path: %s" % "/".join(test.path)
 
-  def Run(self):
+  def Run(self, tasks):
     self.Starting()
-    for test in self.cases:
+    threads = []
+    # Spawn N-1 threads and then use this thread as the last one.
+    # That way -j1 avoids threading altogether which is a nice fallback
+    # in case of threading problems.
+    for i in xrange(tasks - 1):
+      thread = threading.Thread(target=self.RunSingle, args=[])
+      threads.append(thread)
+      thread.start()
+    try:
+      self.RunSingle()
+      # Wait for the remaining threads
+      for thread in threads:
+        # Use a timeout so that signals (ctrl-c) will be processed.
+        thread.join(timeout=10000000)
+    except Exception, e:
+      # If there's an exception we schedule an interruption for any
+      # remaining threads.
+      self.terminate = True
+      print e
+      return False
+    self.Done()
+    return not self.failed
+
+  def RunSingle(self):
+    while not self.terminate:
+      try:
+        test = self.queue.get_nowait()
+      except Empty:
+        return
       case = test.case
+      self.lock.acquire()
       self.AboutToRun(case)
-      output = case.Run()
+      self.lock.release()
+      try:
+        output = case.Run()
+      except IOError, e:
+        assert self.terminate
+        return
+      if self.terminate:
+        return
+      self.lock.acquire()
       if output.UnexpectedOutput():
-        self.failed += 1
-        self.failed_tests.append(output)
+        self.failed.append(output)
       else:
         self.succeeded += 1
       self.remaining -= 1
       self.HasRun(output)
-    self.Done()
-    return self.failed == 0
+      self.lock.release()
 
 
 def EscapeCommand(command):
@@ -106,7 +148,7 @@ class SimpleProgressIndicator(ProgressIndicator):
 
   def Done(self):
     print
-    for failed in self.failed_tests:
+    for failed in self.failed:
       self.PrintFailureHeader(failed.test)
       if failed.output.stderr:
         print "--- stderr ---"
@@ -115,14 +157,14 @@ class SimpleProgressIndicator(ProgressIndicator):
         print "--- stdout ---"
         print failed.output.stdout.strip()
       print "Command: %s" % EscapeCommand(failed.command)
-    if len(self.failed_tests) == 0:
+    if len(self.failed) == 0:
       print "==="
       print "=== All tests succeeded"
       print "==="
     else:
       print
       print "==="
-      print "=== %i tests failed" % len(self.failed_tests)
+      print "=== %i tests failed" % len(self.failed)
       print "==="
 
 
@@ -145,7 +187,7 @@ class DotsProgressIndicator(SimpleProgressIndicator):
     pass
 
   def HasRun(self, output):
-    total = self.succeeded + self.failed
+    total = self.succeeded + len(self.failed)
     if (total > 1) and (total % 50 == 1):
       sys.stdout.write('\n')
     if output.UnexpectedOutput():
@@ -197,7 +239,7 @@ class CompactProgressIndicator(ProgressIndicator):
     status = self.templates['status_line'] % {
       'passed': self.succeeded,
       'remaining': (((self.total - self.remaining) * 100) // self.total),
-      'failed': self.failed,
+      'failed': len(self.failed),
       'test': name,
       'mins': int(elapsed) / 60,
       'secs': int(elapsed) % 60
@@ -509,12 +551,12 @@ class Context(object):
     else:
       return name
 
-def RunTestCases(all_cases, progress):
+def RunTestCases(all_cases, progress, tasks):
   def DoSkip(case):
     return SKIP in c.outcomes or SLOW in c.outcomes
   cases_to_run = [ c for c in all_cases if not DoSkip(c) ]
   progress = PROGRESS_INDICATORS[progress](cases_to_run)
-  return progress.Run()
+  return progress.Run(tasks)
 
 
 def BuildRequirements(context, requirements, mode, scons_flags):
@@ -977,6 +1019,8 @@ def BuildOptions():
       default=False, action="store_true")
   result.add_option("--warn-unused", help="Report unused rules",
       default=False, action="store_true")
+  result.add_option("-j", help="The number of parallel tasks to run",
+      default=1, type="int")
   return result
 
 
@@ -1102,6 +1146,8 @@ def Main():
                     join(buildspace, 'shell'),
                     options.timeout,
                     GetSpecialCommandProcessor(options.special_command))
+  if options.j != 1:
+    options.scons_flags += ['-j', str(options.j)]
   if not options.no_build:
     reqs = [ ]
     for path in paths:
@@ -1164,7 +1210,7 @@ def Main():
     return 0
   else:
     try:
-      if RunTestCases(all_cases, options.progress):
+      if RunTestCases(all_cases, options.progress, options.j):
         return 0
       else:
         return 1