return 0
# Run the tests, either locally or distributed on the network.
- try:
- start_time = time.time()
- progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
- if options.junitout:
- progress_indicator = progress.JUnitTestProgressIndicator(
- progress_indicator, options.junitout, options.junittestsuite)
-
- run_networked = not options.no_network
- if not run_networked:
- print("Network distribution disabled, running tests locally.")
- elif utils.GuessOS() != "linux":
- print("Network distribution is only supported on Linux, sorry!")
+ start_time = time.time()
+ progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
+ if options.junitout:
+ progress_indicator = progress.JUnitTestProgressIndicator(
+ progress_indicator, options.junitout, options.junittestsuite)
+
+ run_networked = not options.no_network
+ if not run_networked:
+ print("Network distribution disabled, running tests locally.")
+ elif utils.GuessOS() != "linux":
+ print("Network distribution is only supported on Linux, sorry!")
+ run_networked = False
+ peers = []
+ if run_networked:
+ peers = network_execution.GetPeers()
+ if not peers:
+ print("No connection to distribution server; running tests locally.")
run_networked = False
- peers = []
- if run_networked:
- peers = network_execution.GetPeers()
- if not peers:
- print("No connection to distribution server; running tests locally.")
- run_networked = False
- elif len(peers) == 1:
- print("No other peers on the network; running tests locally.")
- run_networked = False
- elif num_tests <= 100:
- print("Less than 100 tests, running them locally.")
- run_networked = False
-
- if run_networked:
- runner = network_execution.NetworkedRunner(suites, progress_indicator,
- ctx, peers, workspace)
- else:
- runner = execution.Runner(suites, progress_indicator, ctx)
-
- exit_code = runner.Run(options.j)
- if runner.terminate:
- return exit_code
- overall_duration = time.time() - start_time
- except KeyboardInterrupt:
- raise
+ elif len(peers) == 1:
+ print("No other peers on the network; running tests locally.")
+ run_networked = False
+ elif num_tests <= 100:
+ print("Less than 100 tests, running them locally.")
+ run_networked = False
+
+ if run_networked:
+ runner = network_execution.NetworkedRunner(suites, progress_indicator,
+ ctx, peers, workspace)
+ else:
+ runner = execution.Runner(suites, progress_indicator, ctx)
+
+ exit_code = runner.Run(options.j)
+ overall_duration = time.time() - start_time
if options.time:
verbose.PrintTestDurations(suites, overall_duration)
def RunProcess(verbose, timeout, args, **rest):
- try:
- if verbose: print "#", " ".join(args)
- popen_args = args
- prev_error_mode = SEM_INVALID_VALUE
- if utils.IsWindows():
- popen_args = subprocess.list2cmdline(args)
- # Try to change the error mode to avoid dialogs on fatal errors. Don't
- # touch any existing error mode flags by merging the existing error mode.
- # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
- error_mode = SEM_NOGPFAULTERRORBOX
- prev_error_mode = Win32SetErrorMode(error_mode)
- Win32SetErrorMode(error_mode | prev_error_mode)
- process = subprocess.Popen(
- shell=utils.IsWindows(),
- args=popen_args,
- **rest
- )
- if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
- Win32SetErrorMode(prev_error_mode)
- # Compute the end time - if the process crosses this limit we
- # consider it timed out.
- if timeout is None: end_time = None
- else: end_time = time.time() + timeout
- timed_out = False
- # Repeatedly check the exit code from the process in a
- # loop and keep track of whether or not it times out.
- exit_code = None
- sleep_time = INITIAL_SLEEP_TIME
- while exit_code is None:
- if (not end_time is None) and (time.time() >= end_time):
- # Kill the process and wait for it to exit.
- KillProcessWithID(process.pid)
- exit_code = process.wait()
- timed_out = True
- else:
- exit_code = process.poll()
- time.sleep(sleep_time)
- sleep_time = sleep_time * SLEEP_TIME_FACTOR
- if sleep_time > MAX_SLEEP_TIME:
- sleep_time = MAX_SLEEP_TIME
- return (exit_code, timed_out)
- except KeyboardInterrupt:
- raise
+ if verbose: print "#", " ".join(args)
+ popen_args = args
+ prev_error_mode = SEM_INVALID_VALUE
+ if utils.IsWindows():
+ popen_args = subprocess.list2cmdline(args)
+ # Try to change the error mode to avoid dialogs on fatal errors. Don't
+ # touch any existing error mode flags by merging the existing error mode.
+ # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
+ error_mode = SEM_NOGPFAULTERRORBOX
+ prev_error_mode = Win32SetErrorMode(error_mode)
+ Win32SetErrorMode(error_mode | prev_error_mode)
+ process = subprocess.Popen(
+ shell=utils.IsWindows(),
+ args=popen_args,
+ **rest
+ )
+ if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
+ Win32SetErrorMode(prev_error_mode)
+ # Compute the end time - if the process crosses this limit we
+ # consider it timed out.
+ if timeout is None: end_time = None
+ else: end_time = time.time() + timeout
+ timed_out = False
+ # Repeatedly check the exit code from the process in a
+ # loop and keep track of whether or not it times out.
+ exit_code = None
+ sleep_time = INITIAL_SLEEP_TIME
+ while exit_code is None:
+ if (not end_time is None) and (time.time() >= end_time):
+ # Kill the process and wait for it to exit.
+ KillProcessWithID(process.pid)
+ exit_code = process.wait()
+ timed_out = True
+ else:
+ exit_code = process.poll()
+ time.sleep(sleep_time)
+ sleep_time = sleep_time * SLEEP_TIME_FACTOR
+ if sleep_time > MAX_SLEEP_TIME:
+ sleep_time = MAX_SLEEP_TIME
+ return (exit_code, timed_out)
def PrintError(string):
stdout=fd_out,
stderr=fd_err
)
- except KeyboardInterrupt:
- raise
- except:
- raise
finally:
+ # TODO(machenbach): A keyboard interrupt before the assignment to
+ # fd_out|err can lead to reference errors here.
os.close(fd_out)
os.close(fd_err)
out = file(outname).read()
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-import multiprocessing
import os
-import threading
import time
+from pool import Pool
from . import commands
from . import utils
-BREAK_NOW = -1
-EXCEPTION = -2
-
-
class Job(object):
def __init__(self, command, dep_command, test_id, timeout, verbose):
self.command = command
def RunTest(job):
- try:
- start_time = time.time()
- if job.dep_command is not None:
- dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
- # TODO(jkummerow): We approximate the test suite specific function
- # IsFailureOutput() by just checking the exit code here. Currently
- # only cctests define dependencies, for which this simplification is
- # correct.
- if dep_output.exit_code != 0:
- return (job.id, dep_output, time.time() - start_time)
- output = commands.Execute(job.command, job.verbose, job.timeout)
- return (job.id, output, time.time() - start_time)
- except KeyboardInterrupt:
- return (-1, BREAK_NOW, 0)
- except Exception, e:
- print(">>> EXCEPTION: %s" % e)
- return (-1, EXCEPTION, 0)
-
+ start_time = time.time()
+ if job.dep_command is not None:
+ dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
+ # TODO(jkummerow): We approximate the test suite specific function
+ # IsFailureOutput() by just checking the exit code here. Currently
+ # only cctests define dependencies, for which this simplification is
+ # correct.
+ if dep_output.exit_code != 0:
+ return (job.id, dep_output, time.time() - start_time)
+ output = commands.Execute(job.command, job.verbose, job.timeout)
+ return (job.id, output, time.time() - start_time)
class Runner(object):
self.remaining = num_tests
self.failed = []
self.crashed = 0
- self.terminate = False
- self.lock = threading.Lock()
def Run(self, jobs):
self.indicator.Starting()
return 0
def _RunInternal(self, jobs):
- pool = multiprocessing.Pool(processes=jobs)
+ pool = Pool(jobs)
test_map = {}
+ # TODO(machenbach): Instead of filling the queue completely before
+ # pool.imap_unordered, make this a generator that already starts testing
+ # while the queue is filled.
queue = []
queued_exception = None
for test in self.tests:
else:
dep_command = None
job = Job(command, dep_command, test.id, timeout, self.context.verbose)
- queue.append(job)
+ queue.append([job])
try:
- kChunkSize = 1
- it = pool.imap_unordered(RunTest, queue, kChunkSize)
+ it = pool.imap_unordered(RunTest, queue)
for result in it:
- test_id = result[0]
- if test_id < 0:
- if result[1] == BREAK_NOW:
- self.terminate = True
- else:
- continue
- if self.terminate:
- pool.terminate()
- pool.join()
- raise BreakNowException("User pressed Ctrl+C or IO went wrong")
- test = test_map[test_id]
+ test = test_map[result[0]]
self.indicator.AboutToRun(test)
test.output = result[1]
test.duration = result[2]
self.succeeded += 1
self.remaining -= 1
self.indicator.HasRun(test, has_unexpected_output)
- except KeyboardInterrupt:
- pool.terminate()
- pool.join()
- raise
- except Exception, e:
- print("Exception: %s" % e)
+ finally:
pool.terminate()
- pool.join()
- raise
if queued_exception:
raise queued_exception
- return
def GetCommand(self, test):
--- /dev/null
+#!/usr/bin/env python
+# Copyright 2014 the V8 project authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from multiprocessing import Event, Process, Queue
+
+class NormalResult():
+ def __init__(self, result):
+ self.result = result
+ self.exception = False
+ self.break_now = False
+
+
+class ExceptionResult():
+ def __init__(self):
+ self.exception = True
+ self.break_now = False
+
+
+class BreakResult():
+ def __init__(self):
+ self.exception = False
+ self.break_now = True
+
+
+def Worker(fn, work_queue, done_queue, done):
+ """Worker to be run in a child process.
+ The worker stops on two conditions. 1. When the poison pill "STOP" is
+ reached or 2. when the event "done" is set."""
+ try:
+ for args in iter(work_queue.get, "STOP"):
+ if done.is_set():
+ break
+ try:
+ done_queue.put(NormalResult(fn(*args)))
+ except Exception, e:
+ print(">>> EXCEPTION: %s" % e)
+ done_queue.put(ExceptionResult())
+ except KeyboardInterrupt:
+ done_queue.put(BreakResult())
+
+
+class Pool():
+ """Distributes tasks to a number of worker processes.
+ New tasks can be added dynamically even after the workers have been started.
+ Requirement: Tasks can only be added from the parent process, e.g. while
+ consuming the results generator."""
+
+ # Factor to calculate the maximum number of items in the work/done queue.
+ # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
+ BUFFER_FACTOR = 4
+
+ def __init__(self, num_workers):
+ self.num_workers = num_workers
+ self.processes = []
+ self.terminated = False
+
+ # Invariant: count >= #work_queue + #done_queue. It is greater when a
+ # worker takes an item from the work_queue and before the result is
+ # submitted to the done_queue. It is equal when no worker is working,
+ # e.g. when all workers have finished, and when no results are processed.
+ # Count is only accessed by the parent process. Only the parent process is
+ # allowed to remove items from the done_queue and to add items to the
+ # work_queue.
+ self.count = 0
+ self.work_queue = Queue()
+ self.done_queue = Queue()
+ self.done = Event()
+
+ def imap_unordered(self, fn, gen):
+ """Maps function "fn" to items in generator "gen" on the worker processes
+ in an arbitrary order. The items are expected to be lists of arguments to
+ the function. Returns a results iterator."""
+ try:
+ gen = iter(gen)
+ self.advance = self._advance_more
+
+ for w in xrange(self.num_workers):
+ p = Process(target=Worker, args=(fn,
+ self.work_queue,
+ self.done_queue,
+ self.done))
+ self.processes.append(p)
+ p.start()
+
+ self.advance(gen)
+ while self.count > 0:
+ result = self.done_queue.get()
+ self.count -= 1
+ if result.exception:
+ # Ignore items with unexpected exceptions.
+ continue
+ elif result.break_now:
+ # A keyboard interrupt happened in one of the worker processes.
+ raise KeyboardInterrupt
+ else:
+ yield result.result
+ self.advance(gen)
+ finally:
+ self.terminate()
+
+ def _advance_more(self, gen):
+ while self.count < self.num_workers * self.BUFFER_FACTOR:
+ try:
+ self.work_queue.put(gen.next())
+ self.count += 1
+ except StopIteration:
+ self.advance = self._advance_empty
+ break
+
+ def _advance_empty(self, gen):
+ pass
+
+ def add(self, args):
+ """Adds an item to the work queue. Can be called dynamically while
+ processing the results from imap_unordered."""
+ self.work_queue.put(args)
+ self.count += 1
+
+ def terminate(self):
+ if self.terminated:
+ return
+ self.terminated = True
+
+ # For exceptional tear down set the "done" event to stop the workers before
+ # they empty the queue buffer.
+ self.done.set()
+
+ for p in self.processes:
+ # During normal tear down the workers block on get(). Feed a poison pill
+ # per worker to make them stop.
+ self.work_queue.put("STOP")
+
+ for p in self.processes:
+ p.join()
--- /dev/null
+#!/usr/bin/env python
+# Copyright 2014 the V8 project authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import unittest
+
+from pool import Pool
+
+def Run(x):
+ if x == 10:
+ raise Exception("Expected exception triggered by test.")
+ return x
+
+class PoolTest(unittest.TestCase):
+ def testNormal(self):
+ results = set()
+ pool = Pool(3)
+ for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
+ results.add(result)
+ self.assertEquals(set(range(0, 10)), results)
+
+ def testException(self):
+ results = set()
+ pool = Pool(3)
+ for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
+ # Item 10 will not appear in results due to an internal exception.
+ results.add(result)
+ expect = set(range(0, 12))
+ expect.remove(10)
+ self.assertEquals(expect, results)
+
+ def testAdd(self):
+ results = set()
+ pool = Pool(3)
+ for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
+ results.add(result)
+ if result < 30:
+ pool.add([result + 20])
+ self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
+ results)