test: run tests in parallel, common improvements
authorFedor Indutny <fedor@indutny.com>
Wed, 17 Dec 2014 13:34:21 +0000 (20:34 +0700)
committerFedor Indutny <fedor@indutny.com>
Wed, 17 Dec 2014 13:45:37 +0000 (20:45 +0700)
* Allow running tests in mixed parallel/sequential modes
* Add -J flag for running tests on all available CPUs
* Support TEST_THREAD_ID in test/common.js and use it for tmpDir and PORT
* make: use -J flag

Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
PR-URL: https://github.com/iojs/io.js/pull/172
Fix: iojs/io.js#139

.gitignore
Makefile
test/common.js
test/testpy/__init__.py
tools/test.py
vcbuild.bat

index 6581dee..50e672f 100644 (file)
@@ -9,6 +9,7 @@ tags
 *.pyc
 doc/api.xml
 tmp/
+test/tmp*/
 node
 node_g
 *.swp
index 114d198..b436032 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -90,7 +90,7 @@ distclean:
        -rm -rf node_modules
 
 test: all
-       $(PYTHON) tools/test.py --mode=release message parallel sequential
+       $(PYTHON) tools/test.py --mode=release message parallel sequential -J
        $(MAKE) jslint
        $(MAKE) cpplint
 
index bbc772e..82fc2e2 100644 (file)
@@ -27,9 +27,18 @@ var os = require('os');
 exports.testDir = path.dirname(__filename);
 exports.fixturesDir = path.join(exports.testDir, 'fixtures');
 exports.libDir = path.join(exports.testDir, '../lib');
-exports.tmpDir = path.join(exports.testDir, 'tmp');
+exports.tmpDirName = 'tmp';
 exports.PORT = +process.env.NODE_COMMON_PORT || 12346;
 
+if (process.env.TEST_THREAD_ID) {
+  // Distribute ports in parallel tests
+  if (!process.env.NODE_COMMON_PORT)
+    exports.PORT += +process.env.TEST_THREAD_ID * 100;
+
+  exports.tmpDirName += '.' + process.env.TEST_THREAD_ID;
+}
+exports.tmpDir = path.join(exports.testDir, exports.tmpDirName);
+
 exports.opensslCli = path.join(path.dirname(process.execPath), 'openssl-cli');
 if (process.platform === 'win32') {
   exports.PIPE = '\\\\.\\pipe\\libuv-test';
index cf64ba1..a46e964 100644 (file)
@@ -49,30 +49,34 @@ class SimpleTestCase(test.TestCase):
     self.mode = mode
     self.tmpdir = join(dirname(self.config.root), 'tmp')
     self.additional_flags = additional
+
+  def GetTmpDir(self):
+    return "%s.%d" % (self.tmpdir, self.thread_id)
+
   
   def AfterRun(self, result):
     # delete the whole tmp dir
     try:
-      rmtree(self.tmpdir)
+      rmtree(self.GetTmpDir())
     except:
       pass
     # make it again.
     try:
-      mkdir(self.tmpdir)
+      mkdir(self.GetTmpDir())
     except:
       pass
 
   def BeforeRun(self):
     # delete the whole tmp dir
     try:
-      rmtree(self.tmpdir)
+      rmtree(self.GetTmpDir())
     except:
       pass
     # make it again.
     # intermittently fails on win32, so keep trying
-    while not os.path.exists(self.tmpdir):
+    while not os.path.exists(self.GetTmpDir()):
       try:
-        mkdir(self.tmpdir)
+        mkdir(self.GetTmpDir())
       except:
         pass
   
@@ -105,7 +109,6 @@ class SimpleTestCase(test.TestCase):
   def GetSource(self):
     return open(self.file).read()
 
-
 class SimpleTestConfiguration(test.TestConfiguration):
 
   def __init__(self, context, root, section, additional=[]):
@@ -136,6 +139,18 @@ class SimpleTestConfiguration(test.TestConfiguration):
     if exists(status_file):
       test.ReadConfigurationInto(status_file, sections, defs)
 
+class ParallelTestConfiguration(SimpleTestConfiguration):
+  def __init__(self, context, root, section, additional=[]):
+    super(ParallelTestConfiguration, self).__init__(context, root, section,
+                                                    additional)
+
+  def ListTests(self, current_path, path, arch, mode):
+    result = super(ParallelTestConfiguration, self).ListTests(
+         current_path, path, arch, mode)
+    for test in result:
+      test.parallel = True
+    return result
+
 class AddonTestConfiguration(SimpleTestConfiguration):
   def __init__(self, context, root, section, additional=[]):
     super(AddonTestConfiguration, self).__init__(context, root, section)
index cb60fbd..7241f16 100755 (executable)
@@ -40,6 +40,7 @@ import tempfile
 import time
 import threading
 import utils
+import multiprocessing
 
 from os.path import join, dirname, abspath, basename, isdir, exists
 from datetime import datetime
@@ -58,9 +59,13 @@ class ProgressIndicator(object):
   def __init__(self, cases, flaky_tests_mode):
     self.cases = cases
     self.flaky_tests_mode = flaky_tests_mode
-    self.queue = Queue(len(cases))
+    self.parallel_queue = Queue(len(cases))
+    self.sequential_queue = Queue(len(cases))
     for case in cases:
-      self.queue.put_nowait(case)
+      if case.parallel:
+        self.parallel_queue.put_nowait(case)
+      else:
+        self.sequential_queue.put_nowait(case)
     self.succeeded = 0
     self.remaining = len(cases)
     self.total = len(cases)
@@ -87,11 +92,11 @@ class ProgressIndicator(object):
     # That way -j1 avoids threading altogether which is a nice fallback
     # in case of threading problems.
     for i in xrange(tasks - 1):
-      thread = threading.Thread(target=self.RunSingle, args=[])
+      thread = threading.Thread(target=self.RunSingle, args=[True, i + 1])
       threads.append(thread)
       thread.start()
     try:
-      self.RunSingle()
+      self.RunSingle(False, 0)
       # Wait for the remaining threads
       for thread in threads:
         # Use a timeout so that signals (ctrl-c) will be processed.
@@ -105,13 +110,19 @@ class ProgressIndicator(object):
     self.Done()
     return not self.failed
 
-  def RunSingle(self):
+  def RunSingle(self, parallel, thread_id):
     while not self.terminate:
       try:
-        test = self.queue.get_nowait()
+        test = self.parallel_queue.get_nowait()
       except Empty:
-        return
+        if parallel:
+          return
+        try:
+          test = self.sequential_queue.get_nowait()
+        except Empty:
+          return
       case = test.case
+      case.thread_id = thread_id
       self.lock.acquire()
       self.AboutToRun(case)
       self.lock.release()
@@ -381,6 +392,8 @@ class TestCase(object):
     self.duration = None
     self.arch = arch
     self.mode = mode
+    self.parallel = False
+    self.thread_id = 0
 
   def IsNegative(self):
     return False
@@ -399,11 +412,12 @@ class TestCase(object):
   def GetSource(self):
     return "(no source available)"
 
-  def RunCommand(self, command):
+  def RunCommand(self, command, env):
     full_command = self.context.processor(command)
     output = Execute(full_command,
                      self.context,
-                     self.context.GetTimeout(self.mode))
+                     self.context.GetTimeout(self.mode),
+                     env)
     self.Cleanup()
     return TestOutput(self,
                       full_command,
@@ -420,7 +434,9 @@ class TestCase(object):
     self.BeforeRun()
 
     try:
-      result = self.RunCommand(self.GetCommand())
+      result = self.RunCommand(self.GetCommand(), {
+        "TEST_THREAD_ID": "%d" % self.thread_id
+      })
     finally:
       # Tests can leave the tty in non-blocking mode. If the test runner
       # tries to print to stdout/stderr after that and the tty buffer is
@@ -559,15 +575,22 @@ def CheckedUnlink(name):
     PrintError("os.unlink() " + str(e))
 
 
-def Execute(args, context, timeout=None):
+def Execute(args, context, timeout=None, env={}):
   (fd_out, outname) = tempfile.mkstemp()
   (fd_err, errname) = tempfile.mkstemp()
+
+  # Extend environment
+  env_copy = os.environ.copy()
+  for key, value in env.iteritems():
+    env_copy[key] = value
+
   (process, exit_code, timed_out) = RunProcess(
     context,
     timeout,
     args = args,
     stdout = fd_out,
     stderr = fd_err,
+    env = env_copy
   )
   os.close(fd_out)
   os.close(fd_err)
@@ -1068,6 +1091,7 @@ class ClassifiedTest(object):
   def __init__(self, case, outcomes):
     self.case = case
     self.outcomes = outcomes
+    self.parallel = self.case.parallel
 
 
 class Configuration(object):
@@ -1224,6 +1248,8 @@ def BuildOptions():
       default=False, action="store_true")
   result.add_option("-j", help="The number of parallel tasks to run",
       default=1, type="int")
+  result.add_option("-J", help="Run tasks in parallel on all cores",
+      default=False, action="store_true")
   result.add_option("--time", help="Print timing information after running",
       default=False, action="store_true")
   result.add_option("--suppress-dialogs", help="Suppress Windows dialogs for crashing tests",
@@ -1245,6 +1271,8 @@ def ProcessOptions(options):
   VERBOSE = options.verbose
   options.arch = options.arch.split(',')
   options.mode = options.mode.split(',')
+  if options.J:
+    options.j = multiprocessing.cpu_count()
   def CheckTestMode(name, option):
     if not option in ["run", "skip", "dontcare"]:
       print "Unknown %s mode %s" % (name, option)
index ab46819..8a66f7b 100644 (file)
@@ -163,7 +163,7 @@ if "%test%"=="" goto exit
 if "%config%"=="Debug" set test_args=--mode=debug
 if "%config%"=="Release" set test_args=--mode=release
 
-if "%test%"=="test" set test_args=%test_args% sequential parallel message
+if "%test%"=="test" set test_args=%test_args% sequential parallel message -J
 if "%test%"=="test-internet" set test_args=%test_args% internet
 if "%test%"=="test-pummel" set test_args=%test_args% pummel
 if "%test%"=="test-simple" set test_args=%test_args% sequential parallel