Introduce a dynamic process pool for the local test driver
authormachenbach@chromium.org <machenbach@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Wed, 14 May 2014 13:30:57 +0000 (13:30 +0000)
committermachenbach@chromium.org <machenbach@chromium.org@ce2b1a6d-e550-0410-aec6-3dcde31c8c00>
Wed, 14 May 2014 13:30:57 +0000 (13:30 +0000)
The new process pool allows adding jobs after testing has been started. It will also allow to restructure building the job queue (in a follow up CL), so that testing can start instantly while the queue is being built.

Also attempts to clean up the keyboard-interrupt logic. Idea: Only catch keyboard interrupt once per process at the outermost level. Use proper "finally" clauses to clean up everywhere where a keyboard interrupt might occur. Never turn named exceptions into none-exceptions using anonymous "raise".

TEST=python -m unittest pool_unittest
R=jkummerow@chromium.org

Review URL: https://codereview.chromium.org/275093002

git-svn-id: https://v8.googlecode.com/svn/branches/bleeding_edge@21310 ce2b1a6d-e550-0410-aec6-3dcde31c8c00

tools/run-tests.py
tools/testrunner/local/commands.py
tools/testrunner/local/execution.py
tools/testrunner/local/pool.py [new file with mode: 0644]
tools/testrunner/local/pool_unittest.py [new file with mode: 0644]

index 346926c..6b07574 100755 (executable)
@@ -463,44 +463,39 @@ def Execute(arch, mode, args, options, suites, workspace):
     return 0
 
   # Run the tests, either locally or distributed on the network.
-  try:
-    start_time = time.time()
-    progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
-    if options.junitout:
-      progress_indicator = progress.JUnitTestProgressIndicator(
-          progress_indicator, options.junitout, options.junittestsuite)
-
-    run_networked = not options.no_network
-    if not run_networked:
-      print("Network distribution disabled, running tests locally.")
-    elif utils.GuessOS() != "linux":
-      print("Network distribution is only supported on Linux, sorry!")
+  start_time = time.time()
+  progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
+  if options.junitout:
+    progress_indicator = progress.JUnitTestProgressIndicator(
+        progress_indicator, options.junitout, options.junittestsuite)
+
+  run_networked = not options.no_network
+  if not run_networked:
+    print("Network distribution disabled, running tests locally.")
+  elif utils.GuessOS() != "linux":
+    print("Network distribution is only supported on Linux, sorry!")
+    run_networked = False
+  peers = []
+  if run_networked:
+    peers = network_execution.GetPeers()
+    if not peers:
+      print("No connection to distribution server; running tests locally.")
       run_networked = False
-    peers = []
-    if run_networked:
-      peers = network_execution.GetPeers()
-      if not peers:
-        print("No connection to distribution server; running tests locally.")
-        run_networked = False
-      elif len(peers) == 1:
-        print("No other peers on the network; running tests locally.")
-        run_networked = False
-      elif num_tests <= 100:
-        print("Less than 100 tests, running them locally.")
-        run_networked = False
-
-    if run_networked:
-      runner = network_execution.NetworkedRunner(suites, progress_indicator,
-                                                 ctx, peers, workspace)
-    else:
-      runner = execution.Runner(suites, progress_indicator, ctx)
-
-    exit_code = runner.Run(options.j)
-    if runner.terminate:
-      return exit_code
-    overall_duration = time.time() - start_time
-  except KeyboardInterrupt:
-    raise
+    elif len(peers) == 1:
+      print("No other peers on the network; running tests locally.")
+      run_networked = False
+    elif num_tests <= 100:
+      print("Less than 100 tests, running them locally.")
+      run_networked = False
+
+  if run_networked:
+    runner = network_execution.NetworkedRunner(suites, progress_indicator,
+                                               ctx, peers, workspace)
+  else:
+    runner = execution.Runner(suites, progress_indicator, ctx)
+
+  exit_code = runner.Run(options.j)
+  overall_duration = time.time() - start_time
 
   if options.time:
     verbose.PrintTestDurations(suites, overall_duration)
index 4f3dc51..d6445d0 100644 (file)
@@ -64,49 +64,46 @@ def Win32SetErrorMode(mode):
 
 
 def RunProcess(verbose, timeout, args, **rest):
-  try:
-    if verbose: print "#", " ".join(args)
-    popen_args = args
-    prev_error_mode = SEM_INVALID_VALUE
-    if utils.IsWindows():
-      popen_args = subprocess.list2cmdline(args)
-      # Try to change the error mode to avoid dialogs on fatal errors. Don't
-      # touch any existing error mode flags by merging the existing error mode.
-      # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
-      error_mode = SEM_NOGPFAULTERRORBOX
-      prev_error_mode = Win32SetErrorMode(error_mode)
-      Win32SetErrorMode(error_mode | prev_error_mode)
-    process = subprocess.Popen(
-      shell=utils.IsWindows(),
-      args=popen_args,
-      **rest
-    )
-    if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
-      Win32SetErrorMode(prev_error_mode)
-    # Compute the end time - if the process crosses this limit we
-    # consider it timed out.
-    if timeout is None: end_time = None
-    else: end_time = time.time() + timeout
-    timed_out = False
-    # Repeatedly check the exit code from the process in a
-    # loop and keep track of whether or not it times out.
-    exit_code = None
-    sleep_time = INITIAL_SLEEP_TIME
-    while exit_code is None:
-      if (not end_time is None) and (time.time() >= end_time):
-        # Kill the process and wait for it to exit.
-        KillProcessWithID(process.pid)
-        exit_code = process.wait()
-        timed_out = True
-      else:
-        exit_code = process.poll()
-        time.sleep(sleep_time)
-        sleep_time = sleep_time * SLEEP_TIME_FACTOR
-        if sleep_time > MAX_SLEEP_TIME:
-          sleep_time = MAX_SLEEP_TIME
-    return (exit_code, timed_out)
-  except KeyboardInterrupt:
-    raise
+  if verbose: print "#", " ".join(args)
+  popen_args = args
+  prev_error_mode = SEM_INVALID_VALUE
+  if utils.IsWindows():
+    popen_args = subprocess.list2cmdline(args)
+    # Try to change the error mode to avoid dialogs on fatal errors. Don't
+    # touch any existing error mode flags by merging the existing error mode.
+    # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
+    error_mode = SEM_NOGPFAULTERRORBOX
+    prev_error_mode = Win32SetErrorMode(error_mode)
+    Win32SetErrorMode(error_mode | prev_error_mode)
+  process = subprocess.Popen(
+    shell=utils.IsWindows(),
+    args=popen_args,
+    **rest
+  )
+  if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
+    Win32SetErrorMode(prev_error_mode)
+  # Compute the end time - if the process crosses this limit we
+  # consider it timed out.
+  if timeout is None: end_time = None
+  else: end_time = time.time() + timeout
+  timed_out = False
+  # Repeatedly check the exit code from the process in a
+  # loop and keep track of whether or not it times out.
+  exit_code = None
+  sleep_time = INITIAL_SLEEP_TIME
+  while exit_code is None:
+    if (not end_time is None) and (time.time() >= end_time):
+      # Kill the process and wait for it to exit.
+      KillProcessWithID(process.pid)
+      exit_code = process.wait()
+      timed_out = True
+    else:
+      exit_code = process.poll()
+      time.sleep(sleep_time)
+      sleep_time = sleep_time * SLEEP_TIME_FACTOR
+      if sleep_time > MAX_SLEEP_TIME:
+        sleep_time = MAX_SLEEP_TIME
+  return (exit_code, timed_out)
 
 
 def PrintError(string):
@@ -142,11 +139,9 @@ def Execute(args, verbose=False, timeout=None):
       stdout=fd_out,
       stderr=fd_err
     )
-  except KeyboardInterrupt:
-    raise
-  except:
-    raise
   finally:
+    # TODO(machenbach): A keyboard interrupt before the assignment to
+    # fd_out|err can lead to reference errors here.
     os.close(fd_out)
     os.close(fd_err)
     out = file(outname).read()
index f4a4020..80d881b 100644 (file)
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 
-import multiprocessing
 import os
-import threading
 import time
 
+from pool import Pool
 from . import commands
 from . import utils
 
 
-BREAK_NOW = -1
-EXCEPTION = -2
-
-
 class Job(object):
   def __init__(self, command, dep_command, test_id, timeout, verbose):
     self.command = command
@@ -49,24 +44,17 @@ class Job(object):
 
 
 def RunTest(job):
-  try:
-    start_time = time.time()
-    if job.dep_command is not None:
-      dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
-      # TODO(jkummerow): We approximate the test suite specific function
-      # IsFailureOutput() by just checking the exit code here. Currently
-      # only cctests define dependencies, for which this simplification is
-      # correct.
-      if dep_output.exit_code != 0:
-        return (job.id, dep_output, time.time() - start_time)
-    output = commands.Execute(job.command, job.verbose, job.timeout)
-    return (job.id, output, time.time() - start_time)
-  except KeyboardInterrupt:
-    return (-1, BREAK_NOW, 0)
-  except Exception, e:
-    print(">>> EXCEPTION: %s" % e)
-    return (-1, EXCEPTION, 0)
-
+  start_time = time.time()
+  if job.dep_command is not None:
+    dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
+    # TODO(jkummerow): We approximate the test suite specific function
+    # IsFailureOutput() by just checking the exit code here. Currently
+    # only cctests define dependencies, for which this simplification is
+    # correct.
+    if dep_output.exit_code != 0:
+      return (job.id, dep_output, time.time() - start_time)
+  output = commands.Execute(job.command, job.verbose, job.timeout)
+  return (job.id, output, time.time() - start_time)
 
 class Runner(object):
 
@@ -83,8 +71,6 @@ class Runner(object):
     self.remaining = num_tests
     self.failed = []
     self.crashed = 0
-    self.terminate = False
-    self.lock = threading.Lock()
 
   def Run(self, jobs):
     self.indicator.Starting()
@@ -95,8 +81,11 @@ class Runner(object):
     return 0
 
   def _RunInternal(self, jobs):
-    pool = multiprocessing.Pool(processes=jobs)
+    pool = Pool(jobs)
     test_map = {}
+    # TODO(machenbach): Instead of filling the queue completely before
+    # pool.imap_unordered, make this a generator that already starts testing
+    # while the queue is filled.
     queue = []
     queued_exception = None
     for test in self.tests:
@@ -119,22 +108,11 @@ class Runner(object):
       else:
         dep_command = None
       job = Job(command, dep_command, test.id, timeout, self.context.verbose)
-      queue.append(job)
+      queue.append([job])
     try:
-      kChunkSize = 1
-      it = pool.imap_unordered(RunTest, queue, kChunkSize)
+      it = pool.imap_unordered(RunTest, queue)
       for result in it:
-        test_id = result[0]
-        if test_id < 0:
-          if result[1] == BREAK_NOW:
-            self.terminate = True
-          else:
-            continue
-        if self.terminate:
-          pool.terminate()
-          pool.join()
-          raise BreakNowException("User pressed Ctrl+C or IO went wrong")
-        test = test_map[test_id]
+        test = test_map[result[0]]
         self.indicator.AboutToRun(test)
         test.output = result[1]
         test.duration = result[2]
@@ -147,18 +125,10 @@ class Runner(object):
           self.succeeded += 1
         self.remaining -= 1
         self.indicator.HasRun(test, has_unexpected_output)
-    except KeyboardInterrupt:
-      pool.terminate()
-      pool.join()
-      raise
-    except Exception, e:
-      print("Exception: %s" % e)
+    finally:
       pool.terminate()
-      pool.join()
-      raise
     if queued_exception:
       raise queued_exception
-    return
 
 
   def GetCommand(self, test):
diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py
new file mode 100644 (file)
index 0000000..8f629f9
--- /dev/null
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# Copyright 2014 the V8 project authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from multiprocessing import Event, Process, Queue
+
+class NormalResult():
+  def __init__(self, result):
+    self.result = result
+    self.exception = False
+    self.break_now = False
+
+
+class ExceptionResult():
+  def __init__(self):
+    self.exception = True
+    self.break_now = False
+
+
+class BreakResult():
+  def __init__(self):
+    self.exception = False
+    self.break_now = True
+
+
+def Worker(fn, work_queue, done_queue, done):
+  """Worker to be run in a child process.
+  The worker stops on two conditions. 1. When the poison pill "STOP" is
+  reached or 2. when the event "done" is set."""
+  try:
+    for args in iter(work_queue.get, "STOP"):
+      if done.is_set():
+        break
+      try:
+        done_queue.put(NormalResult(fn(*args)))
+      except Exception, e:
+        print(">>> EXCEPTION: %s" % e)
+        done_queue.put(ExceptionResult())
+  except KeyboardInterrupt:
+    done_queue.put(BreakResult())
+
+
+class Pool():
+  """Distributes tasks to a number of worker processes.
+  New tasks can be added dynamically even after the workers have been started.
+  Requirement: Tasks can only be added from the parent process, e.g. while
+  consuming the results generator."""
+
+  # Factor to calculate the maximum number of items in the work/done queue.
+  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
+  BUFFER_FACTOR = 4
+
+  def __init__(self, num_workers):
+    self.num_workers = num_workers
+    self.processes = []
+    self.terminated = False
+
+    # Invariant: count >= #work_queue + #done_queue. It is greater when a
+    # worker takes an item from the work_queue and before the result is
+    # submitted to the done_queue. It is equal when no worker is working,
+    # e.g. when all workers have finished, and when no results are processed.
+    # Count is only accessed by the parent process. Only the parent process is
+    # allowed to remove items from the done_queue and to add items to the
+    # work_queue.
+    self.count = 0
+    self.work_queue = Queue()
+    self.done_queue = Queue()
+    self.done = Event()
+
+  def imap_unordered(self, fn, gen):
+    """Maps function "fn" to items in generator "gen" on the worker processes
+    in an arbitrary order. The items are expected to be lists of arguments to
+    the function. Returns a results iterator."""
+    try:
+      gen = iter(gen)
+      self.advance = self._advance_more
+
+      for w in xrange(self.num_workers):
+        p = Process(target=Worker, args=(fn,
+                                         self.work_queue,
+                                         self.done_queue,
+                                         self.done))
+        self.processes.append(p)
+        p.start()
+
+      self.advance(gen)
+      while self.count > 0:
+        result = self.done_queue.get()
+        self.count -= 1
+        if result.exception:
+          # Ignore items with unexpected exceptions.
+          continue
+        elif result.break_now:
+          # A keyboard interrupt happened in one of the worker processes.
+          raise KeyboardInterrupt
+        else:
+          yield result.result
+        self.advance(gen)
+    finally:
+      self.terminate()
+
+  def _advance_more(self, gen):
+    while self.count < self.num_workers * self.BUFFER_FACTOR:
+      try:
+        self.work_queue.put(gen.next())
+        self.count += 1
+      except StopIteration:
+        self.advance = self._advance_empty
+        break
+
+  def _advance_empty(self, gen):
+    pass
+
+  def add(self, args):
+    """Adds an item to the work queue. Can be called dynamically while
+    processing the results from imap_unordered."""
+    self.work_queue.put(args)
+    self.count += 1
+
+  def terminate(self):
+    if self.terminated:
+      return
+    self.terminated = True
+
+    # For exceptional tear down set the "done" event to stop the workers before
+    # they empty the queue buffer.
+    self.done.set()
+
+    for p in self.processes:
+      # During normal tear down the workers block on get(). Feed a poison pill
+      # per worker to make them stop.
+      self.work_queue.put("STOP")
+
+    for p in self.processes:
+      p.join()
diff --git a/tools/testrunner/local/pool_unittest.py b/tools/testrunner/local/pool_unittest.py
new file mode 100644 (file)
index 0000000..bf2b3f8
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# Copyright 2014 the V8 project authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import unittest
+
+from pool import Pool
+
+def Run(x):
+  if x == 10:
+    raise Exception("Expected exception triggered by test.")
+  return x
+
+class PoolTest(unittest.TestCase):
+  def testNormal(self):
+    results = set()
+    pool = Pool(3)
+    for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
+      results.add(result)
+    self.assertEquals(set(range(0, 10)), results)
+
+  def testException(self):
+    results = set()
+    pool = Pool(3)
+    for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
+      # Item 10 will not appear in results due to an internal exception.
+      results.add(result)
+    expect = set(range(0, 12))
+    expect.remove(10)
+    self.assertEquals(expect, results)
+
+  def testAdd(self):
+    results = set()
+    pool = Pool(3)
+    for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
+      results.add(result)
+      if result < 30:
+        pool.add([result + 20])
+    self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
+                      results)