From: machenbach@chromium.org Date: Wed, 14 May 2014 13:30:57 +0000 (+0000) Subject: Introduce a dynamic process pool for the local test driver X-Git-Tag: upstream/4.7.83~9129 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=fc437f400763867ac9a9e1c46c92f94512f2fb53;p=platform%2Fupstream%2Fv8.git Introduce a dynamic process pool for the local test driver The new process pool allows adding jobs after testing has been started. It will also allow to restructure building the job queue (in a follow up CL), so that testing can start instantly while the queue is being built. Also attempts to clean up the keyboard-interrupt logic. Idea: Only catch keyboard interrupt once per process at the outermost level. Use proper "finally" clauses to clean up everywhere where a keyboard interrupt might occur. Never turn named exceptions into none-exceptions using anonymous "raise". TEST=python -m unittest pool_unittest R=jkummerow@chromium.org Review URL: https://codereview.chromium.org/275093002 git-svn-id: https://v8.googlecode.com/svn/branches/bleeding_edge@21310 ce2b1a6d-e550-0410-aec6-3dcde31c8c00 --- diff --git a/tools/run-tests.py b/tools/run-tests.py index 346926c..6b07574 100755 --- a/tools/run-tests.py +++ b/tools/run-tests.py @@ -463,44 +463,39 @@ def Execute(arch, mode, args, options, suites, workspace): return 0 # Run the tests, either locally or distributed on the network. - try: - start_time = time.time() - progress_indicator = progress.PROGRESS_INDICATORS[options.progress]() - if options.junitout: - progress_indicator = progress.JUnitTestProgressIndicator( - progress_indicator, options.junitout, options.junittestsuite) - - run_networked = not options.no_network - if not run_networked: - print("Network distribution disabled, running tests locally.") - elif utils.GuessOS() != "linux": - print("Network distribution is only supported on Linux, sorry!") + start_time = time.time() + progress_indicator = progress.PROGRESS_INDICATORS[options.progress]() + if options.junitout: + progress_indicator = progress.JUnitTestProgressIndicator( + progress_indicator, options.junitout, options.junittestsuite) + + run_networked = not options.no_network + if not run_networked: + print("Network distribution disabled, running tests locally.") + elif utils.GuessOS() != "linux": + print("Network distribution is only supported on Linux, sorry!") + run_networked = False + peers = [] + if run_networked: + peers = network_execution.GetPeers() + if not peers: + print("No connection to distribution server; running tests locally.") run_networked = False - peers = [] - if run_networked: - peers = network_execution.GetPeers() - if not peers: - print("No connection to distribution server; running tests locally.") - run_networked = False - elif len(peers) == 1: - print("No other peers on the network; running tests locally.") - run_networked = False - elif num_tests <= 100: - print("Less than 100 tests, running them locally.") - run_networked = False - - if run_networked: - runner = network_execution.NetworkedRunner(suites, progress_indicator, - ctx, peers, workspace) - else: - runner = execution.Runner(suites, progress_indicator, ctx) - - exit_code = runner.Run(options.j) - if runner.terminate: - return exit_code - overall_duration = time.time() - start_time - except KeyboardInterrupt: - raise + elif len(peers) == 1: + print("No other peers on the network; running tests locally.") + run_networked = False + elif num_tests <= 100: + print("Less than 100 tests, running them locally.") + run_networked = False + + if run_networked: + runner = network_execution.NetworkedRunner(suites, progress_indicator, + ctx, peers, workspace) + else: + runner = execution.Runner(suites, progress_indicator, ctx) + + exit_code = runner.Run(options.j) + overall_duration = time.time() - start_time if options.time: verbose.PrintTestDurations(suites, overall_duration) diff --git a/tools/testrunner/local/commands.py b/tools/testrunner/local/commands.py index 4f3dc51..d6445d0 100644 --- a/tools/testrunner/local/commands.py +++ b/tools/testrunner/local/commands.py @@ -64,49 +64,46 @@ def Win32SetErrorMode(mode): def RunProcess(verbose, timeout, args, **rest): - try: - if verbose: print "#", " ".join(args) - popen_args = args - prev_error_mode = SEM_INVALID_VALUE - if utils.IsWindows(): - popen_args = subprocess.list2cmdline(args) - # Try to change the error mode to avoid dialogs on fatal errors. Don't - # touch any existing error mode flags by merging the existing error mode. - # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx. - error_mode = SEM_NOGPFAULTERRORBOX - prev_error_mode = Win32SetErrorMode(error_mode) - Win32SetErrorMode(error_mode | prev_error_mode) - process = subprocess.Popen( - shell=utils.IsWindows(), - args=popen_args, - **rest - ) - if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE): - Win32SetErrorMode(prev_error_mode) - # Compute the end time - if the process crosses this limit we - # consider it timed out. - if timeout is None: end_time = None - else: end_time = time.time() + timeout - timed_out = False - # Repeatedly check the exit code from the process in a - # loop and keep track of whether or not it times out. - exit_code = None - sleep_time = INITIAL_SLEEP_TIME - while exit_code is None: - if (not end_time is None) and (time.time() >= end_time): - # Kill the process and wait for it to exit. - KillProcessWithID(process.pid) - exit_code = process.wait() - timed_out = True - else: - exit_code = process.poll() - time.sleep(sleep_time) - sleep_time = sleep_time * SLEEP_TIME_FACTOR - if sleep_time > MAX_SLEEP_TIME: - sleep_time = MAX_SLEEP_TIME - return (exit_code, timed_out) - except KeyboardInterrupt: - raise + if verbose: print "#", " ".join(args) + popen_args = args + prev_error_mode = SEM_INVALID_VALUE + if utils.IsWindows(): + popen_args = subprocess.list2cmdline(args) + # Try to change the error mode to avoid dialogs on fatal errors. Don't + # touch any existing error mode flags by merging the existing error mode. + # See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx. + error_mode = SEM_NOGPFAULTERRORBOX + prev_error_mode = Win32SetErrorMode(error_mode) + Win32SetErrorMode(error_mode | prev_error_mode) + process = subprocess.Popen( + shell=utils.IsWindows(), + args=popen_args, + **rest + ) + if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE): + Win32SetErrorMode(prev_error_mode) + # Compute the end time - if the process crosses this limit we + # consider it timed out. + if timeout is None: end_time = None + else: end_time = time.time() + timeout + timed_out = False + # Repeatedly check the exit code from the process in a + # loop and keep track of whether or not it times out. + exit_code = None + sleep_time = INITIAL_SLEEP_TIME + while exit_code is None: + if (not end_time is None) and (time.time() >= end_time): + # Kill the process and wait for it to exit. + KillProcessWithID(process.pid) + exit_code = process.wait() + timed_out = True + else: + exit_code = process.poll() + time.sleep(sleep_time) + sleep_time = sleep_time * SLEEP_TIME_FACTOR + if sleep_time > MAX_SLEEP_TIME: + sleep_time = MAX_SLEEP_TIME + return (exit_code, timed_out) def PrintError(string): @@ -142,11 +139,9 @@ def Execute(args, verbose=False, timeout=None): stdout=fd_out, stderr=fd_err ) - except KeyboardInterrupt: - raise - except: - raise finally: + # TODO(machenbach): A keyboard interrupt before the assignment to + # fd_out|err can lead to reference errors here. os.close(fd_out) os.close(fd_err) out = file(outname).read() diff --git a/tools/testrunner/local/execution.py b/tools/testrunner/local/execution.py index f4a4020..80d881b 100644 --- a/tools/testrunner/local/execution.py +++ b/tools/testrunner/local/execution.py @@ -26,19 +26,14 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import multiprocessing import os -import threading import time +from pool import Pool from . import commands from . import utils -BREAK_NOW = -1 -EXCEPTION = -2 - - class Job(object): def __init__(self, command, dep_command, test_id, timeout, verbose): self.command = command @@ -49,24 +44,17 @@ class Job(object): def RunTest(job): - try: - start_time = time.time() - if job.dep_command is not None: - dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout) - # TODO(jkummerow): We approximate the test suite specific function - # IsFailureOutput() by just checking the exit code here. Currently - # only cctests define dependencies, for which this simplification is - # correct. - if dep_output.exit_code != 0: - return (job.id, dep_output, time.time() - start_time) - output = commands.Execute(job.command, job.verbose, job.timeout) - return (job.id, output, time.time() - start_time) - except KeyboardInterrupt: - return (-1, BREAK_NOW, 0) - except Exception, e: - print(">>> EXCEPTION: %s" % e) - return (-1, EXCEPTION, 0) - + start_time = time.time() + if job.dep_command is not None: + dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout) + # TODO(jkummerow): We approximate the test suite specific function + # IsFailureOutput() by just checking the exit code here. Currently + # only cctests define dependencies, for which this simplification is + # correct. + if dep_output.exit_code != 0: + return (job.id, dep_output, time.time() - start_time) + output = commands.Execute(job.command, job.verbose, job.timeout) + return (job.id, output, time.time() - start_time) class Runner(object): @@ -83,8 +71,6 @@ class Runner(object): self.remaining = num_tests self.failed = [] self.crashed = 0 - self.terminate = False - self.lock = threading.Lock() def Run(self, jobs): self.indicator.Starting() @@ -95,8 +81,11 @@ class Runner(object): return 0 def _RunInternal(self, jobs): - pool = multiprocessing.Pool(processes=jobs) + pool = Pool(jobs) test_map = {} + # TODO(machenbach): Instead of filling the queue completely before + # pool.imap_unordered, make this a generator that already starts testing + # while the queue is filled. queue = [] queued_exception = None for test in self.tests: @@ -119,22 +108,11 @@ class Runner(object): else: dep_command = None job = Job(command, dep_command, test.id, timeout, self.context.verbose) - queue.append(job) + queue.append([job]) try: - kChunkSize = 1 - it = pool.imap_unordered(RunTest, queue, kChunkSize) + it = pool.imap_unordered(RunTest, queue) for result in it: - test_id = result[0] - if test_id < 0: - if result[1] == BREAK_NOW: - self.terminate = True - else: - continue - if self.terminate: - pool.terminate() - pool.join() - raise BreakNowException("User pressed Ctrl+C or IO went wrong") - test = test_map[test_id] + test = test_map[result[0]] self.indicator.AboutToRun(test) test.output = result[1] test.duration = result[2] @@ -147,18 +125,10 @@ class Runner(object): self.succeeded += 1 self.remaining -= 1 self.indicator.HasRun(test, has_unexpected_output) - except KeyboardInterrupt: - pool.terminate() - pool.join() - raise - except Exception, e: - print("Exception: %s" % e) + finally: pool.terminate() - pool.join() - raise if queued_exception: raise queued_exception - return def GetCommand(self, test): diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py new file mode 100644 index 0000000..8f629f9 --- /dev/null +++ b/tools/testrunner/local/pool.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# Copyright 2014 the V8 project authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +from multiprocessing import Event, Process, Queue + +class NormalResult(): + def __init__(self, result): + self.result = result + self.exception = False + self.break_now = False + + +class ExceptionResult(): + def __init__(self): + self.exception = True + self.break_now = False + + +class BreakResult(): + def __init__(self): + self.exception = False + self.break_now = True + + +def Worker(fn, work_queue, done_queue, done): + """Worker to be run in a child process. + The worker stops on two conditions. 1. When the poison pill "STOP" is + reached or 2. when the event "done" is set.""" + try: + for args in iter(work_queue.get, "STOP"): + if done.is_set(): + break + try: + done_queue.put(NormalResult(fn(*args))) + except Exception, e: + print(">>> EXCEPTION: %s" % e) + done_queue.put(ExceptionResult()) + except KeyboardInterrupt: + done_queue.put(BreakResult()) + + +class Pool(): + """Distributes tasks to a number of worker processes. + New tasks can be added dynamically even after the workers have been started. + Requirement: Tasks can only be added from the parent process, e.g. while + consuming the results generator.""" + + # Factor to calculate the maximum number of items in the work/done queue. + # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. + BUFFER_FACTOR = 4 + + def __init__(self, num_workers): + self.num_workers = num_workers + self.processes = [] + self.terminated = False + + # Invariant: count >= #work_queue + #done_queue. It is greater when a + # worker takes an item from the work_queue and before the result is + # submitted to the done_queue. It is equal when no worker is working, + # e.g. when all workers have finished, and when no results are processed. + # Count is only accessed by the parent process. Only the parent process is + # allowed to remove items from the done_queue and to add items to the + # work_queue. + self.count = 0 + self.work_queue = Queue() + self.done_queue = Queue() + self.done = Event() + + def imap_unordered(self, fn, gen): + """Maps function "fn" to items in generator "gen" on the worker processes + in an arbitrary order. The items are expected to be lists of arguments to + the function. Returns a results iterator.""" + try: + gen = iter(gen) + self.advance = self._advance_more + + for w in xrange(self.num_workers): + p = Process(target=Worker, args=(fn, + self.work_queue, + self.done_queue, + self.done)) + self.processes.append(p) + p.start() + + self.advance(gen) + while self.count > 0: + result = self.done_queue.get() + self.count -= 1 + if result.exception: + # Ignore items with unexpected exceptions. + continue + elif result.break_now: + # A keyboard interrupt happened in one of the worker processes. + raise KeyboardInterrupt + else: + yield result.result + self.advance(gen) + finally: + self.terminate() + + def _advance_more(self, gen): + while self.count < self.num_workers * self.BUFFER_FACTOR: + try: + self.work_queue.put(gen.next()) + self.count += 1 + except StopIteration: + self.advance = self._advance_empty + break + + def _advance_empty(self, gen): + pass + + def add(self, args): + """Adds an item to the work queue. Can be called dynamically while + processing the results from imap_unordered.""" + self.work_queue.put(args) + self.count += 1 + + def terminate(self): + if self.terminated: + return + self.terminated = True + + # For exceptional tear down set the "done" event to stop the workers before + # they empty the queue buffer. + self.done.set() + + for p in self.processes: + # During normal tear down the workers block on get(). Feed a poison pill + # per worker to make them stop. + self.work_queue.put("STOP") + + for p in self.processes: + p.join() diff --git a/tools/testrunner/local/pool_unittest.py b/tools/testrunner/local/pool_unittest.py new file mode 100644 index 0000000..bf2b3f8 --- /dev/null +++ b/tools/testrunner/local/pool_unittest.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# Copyright 2014 the V8 project authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +import unittest + +from pool import Pool + +def Run(x): + if x == 10: + raise Exception("Expected exception triggered by test.") + return x + +class PoolTest(unittest.TestCase): + def testNormal(self): + results = set() + pool = Pool(3) + for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]): + results.add(result) + self.assertEquals(set(range(0, 10)), results) + + def testException(self): + results = set() + pool = Pool(3) + for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]): + # Item 10 will not appear in results due to an internal exception. + results.add(result) + expect = set(range(0, 12)) + expect.remove(10) + self.assertEquals(expect, results) + + def testAdd(self): + results = set() + pool = Pool(3) + for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]): + results.add(result) + if result < 30: + pool.add([result + 20]) + self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)), + results)