2012-06-28 Dirk Pranke <dpranke@chromium.org>
+ nrwt: clean up how arguments are passed to workers
+ https://bugs.webkit.org/show_bug.cgi?id=90126
+
+ Reviewed by Ojan Vafai.
+
+ The way arguments are passed to workers has been crufty. It
+ turns out it can be a lot cleaner via two things:
+ 1) using a factory method instead of instantiating objects
+ directly in manager_worker_broker removes the need for passing
+ 'worker arguments' to the broker.
+ 2) it turns out that since mock hosts and test ports are purely
+ in-memory constructions, they can be pickled and passed to child
+ workers, meaning that the worker no longer needs hacky code to
+ pass the port in a special case or to guess what to do if we
+ don't have a host - all of the test-specific logic can move to
+ the test file, where we can stub out the mock host's
+ port_factory to return the same already-created port when it
+ needs to be shared.
+
+ This change also moves WorkerException to manager_worker_broker.py
+ where it belongs, and removes several useless tests that were
+ just a maintenance burden (and would've needed rewriting when we
+ change the rest of the broker implementation).
+
+ * Scripts/webkitpy/layout_tests/controllers/manager.py:
+ (Manager._run_tests.worker_factory):
+ (Manager._run_tests):
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
+ (get):
+ (WorkerException):
+ (AbstractWorker.__init__):
+ (_ManagerConnection.__init__):
+ (_ManagerConnection.start_worker):
+ (_InlineManager.__init__):
+ (_InlineManager.start_worker):
+ (_MultiProcessManager._can_pickle_host):
+ (_MultiProcessManager):
+ (_MultiProcessManager.start_worker):
+ (_WorkerConnection.__init__):
+ (_InlineWorkerConnection.__init__):
+ (_InlineWorkerConnection.join):
+ (_InlineWorkerConnection.run):
+ (_Process.run):
+ (_MultiProcessWorkerConnection.__init__):
+ (_MultiProcessWorkerConnection.start):
+ (_MultiProcessWorkerConnection):
+ (_MultiProcessWorkerConnection.run):
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
+ (_TestWorker.__init__):
+ (_TestWorker.run):
+ (_TestsMixin.test_name):
+ (_TestsMixin.test_cancel):
+ (_TestsMixin.test_done):
+ (_TestsMixin.test_unknown_message):
+ (InlineBrokerTests.setUp):
+ (MultiProcessBrokerTests.setUp):
+ * Scripts/webkitpy/layout_tests/controllers/worker.py:
+ (Worker.__init__):
+ (Worker.run):
+ * Scripts/webkitpy/layout_tests/controllers/worker_unittest.py: Removed.
+ * Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py:
+ (passing_run):
+ (logging_run):
+ (run_and_capture):
+ (MainTest.test_child_processes_2):
+ (MainTest.test_child_processes_min):
+ (MainTest.test_exception_raised):
+ (MainTest.test_keyboard_interrupt):
+ (MainTest.test_retrying_and_flaky_tests):
+ (MainTest.test_run_order__inline):
+
+2012-06-28 Dirk Pranke <dpranke@chromium.org>
+
nrwt: don't try to catch worker exceptions in run_webkit_tests.__main__
https://bugs.webkit.org/show_bug.cgi?id=90125
return self.__class__, (self.reason,)
-class WorkerException(Exception):
- """Raised when we receive an unexpected/unknown exception from a worker."""
- pass
+WorkerException = manager_worker_broker.WorkerException
class TestShard(object):
num_workers = min(num_workers, len(all_shards))
self._log_num_workers(num_workers, len(all_shards), len(locked_shards))
- manager_connection = manager_worker_broker.get(num_workers, self, worker.Worker)
+ def worker_factory(worker_connection, worker_number):
+ return worker.Worker(worker_connection, worker_number, self.results_directory(), self._options)
+
+ manager_connection = manager_worker_broker.get(num_workers, self, worker_factory, self._port.host)
if self._options.dry_run:
return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results)
self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
for worker_number in xrange(num_workers):
- worker_arguments = worker.WorkerArguments(worker_number, self.results_directory(), self._options)
- worker_connection = manager_connection.start_worker(worker_arguments)
- if num_workers == 1:
- # FIXME: We need to be able to share a port with the work so
- # that some of the tests can query state on the port; ideally
- # we'd rewrite the tests so that this wasn't necessary.
- #
- # Note that this only works because in the inline case
- # the worker hasn't really started yet and won't start
- # running until we call run_message_loop(), below.
- worker_connection.set_inline_arguments(self._port)
-
+ worker_connection = manager_connection.start_worker(worker_number)
worker_state = _WorkerState(worker_number, worker_connection)
self._worker_states[worker_connection.name()] = worker_state
ANY_WORKER_TOPIC = 'workers'
-def get(max_workers, client, worker_class):
+def get(max_workers, client, worker_factory, host=None):
"""Return a connection to a manager/worker message_broker
Args:
max_workers - max # of workers to run concurrently.
client - BrokerClient implementation to dispatch
replies to.
- worker_class - type of workers to create. This class should override
- the methods in AbstractWorker.
+ worker_factory: factory method for creatin objects that implement the AbstractWorker interface.
+ host: optional picklable host object that can be passed to workers for testing.
Returns:
A handle to an object that will talk to a message broker configured
for the normal manager/worker communication."""
manager_class = _MultiProcessManager
broker = _Broker(queue_class)
- return manager_class(broker, client, worker_class)
+ return manager_class(broker, client, worker_factory, host)
+
+
+class WorkerException(Exception):
+ """Raised when we receive an unexpected/unknown exception from a worker."""
+ pass
class BrokerClient(object):
class AbstractWorker(BrokerClient):
- def __init__(self, worker_connection, worker_arguments=None):
+ def __init__(self, worker_connection, worker_number):
"""The constructor should be used to do any simple initialization
necessary, but should not do anything that creates data structures
that cannot be Pickled or sent across processes (like opening
Args:
worker_connection - handle to the _BrokerConnection object creating
the worker and that can be used for messaging.
- worker_arguments - (optional, Picklable) object passed to the worker from the manager"""
+ worker_number - the number/index of the current worker."""
BrokerClient.__init__(self)
self._worker_connection = worker_connection
- self._name = 'worker'
+ self._worker_number = worker_number
+ self._name = 'worker/%d' % worker_number
self._done = False
self._canceled = False
class _ManagerConnection(_BrokerConnection):
- def __init__(self, broker, client, worker_class):
- """Base initialization for all Manager objects.
-
- Args:
- broker: handle to the message_broker object
- client: callback object (the caller)
- worker_class: class object to use to create workers.
- """
+ def __init__(self, broker, client, worker_factory, host):
_BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
- self._worker_class = worker_class
-
- def start_worker(self, worker_arguments=None):
- """Starts a new worker.
+ self._worker_factory = worker_factory
+ self._host = host
- Args:
- worker_arguments - an optional Picklable object that is passed to the worker constructor
- """
+ def start_worker(self, worker_number):
raise NotImplementedError
class _InlineManager(_ManagerConnection):
- def __init__(self, broker, client, worker_class):
- _ManagerConnection.__init__(self, broker, client, worker_class)
+ def __init__(self, broker, client, worker_factory, host):
+ _ManagerConnection.__init__(self, broker, client, worker_factory, host)
self._inline_worker = None
- def start_worker(self, worker_arguments=None):
- self._inline_worker = _InlineWorkerConnection(self._broker,
- self._client, self._worker_class, worker_arguments)
+ def start_worker(self, worker_number):
+ host = self._host
+ self._inline_worker = _InlineWorkerConnection(host, self._broker, self._client, self._worker_factory, worker_number)
return self._inline_worker
- def set_inline_arguments(self, arguments=None):
- # Note that this method only exists here, and not on all
- # ManagerConnections; calling this method on a MultiProcessManager
- # will deliberately result in a runtime error.
- self._inline_worker.set_inline_arguments(arguments)
-
def run_message_loop(self, delay_secs=None):
# Note that delay_secs is ignored in this case since we can't easily
# implement it.
class _MultiProcessManager(_ManagerConnection):
- def start_worker(self, worker_arguments=None):
- worker_connection = _MultiProcessWorkerConnection(self._broker,
- self._worker_class, worker_arguments)
+ def _can_pickle_host(self):
+ try:
+ cPickle.dumps(self._host)
+ return True
+ except TypeError:
+ return False
+
+ def start_worker(self, worker_number):
+ host = None
+ if self._can_pickle_host():
+ host = self._host
+ worker_connection = _MultiProcessWorkerConnection(host, self._broker, self._worker_factory, worker_number)
worker_connection.start()
return worker_connection
class _WorkerConnection(_BrokerConnection):
- def __init__(self, broker, worker_class, worker_arguments=None):
- self._client = worker_class(self, worker_arguments)
+ def __init__(self, host, broker, worker_factory, worker_number):
+ self._client = worker_factory(self, worker_number)
+ self._host = host
_BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC)
def name(self):
class _InlineWorkerConnection(_WorkerConnection):
- def __init__(self, broker, manager_client, worker_class, worker_arguments):
- _WorkerConnection.__init__(self, broker, worker_class, worker_arguments)
+ def __init__(self, host, broker, manager_client, worker_factory, worker_number):
+ _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
self._alive = False
self._manager_client = manager_client
def join(self, timeout):
assert not self._alive
- def set_inline_arguments(self, arguments):
- self._client.set_inline_arguments(arguments)
-
def run(self):
self._alive = True
try:
- self._client.run()
+ self._client.run(self._host, set_up_logging=False)
finally:
self._alive = False
self._client = client
def run(self):
- self._client.run()
+ self._worker_connection.run()
class _MultiProcessWorkerConnection(_WorkerConnection):
- def __init__(self, broker, worker_class, worker_arguments):
- _WorkerConnection.__init__(self, broker, worker_class, worker_arguments)
+ def __init__(self, host, broker, worker_factory, worker_number):
+ _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
self._proc = _Process(self, self._client)
def cancel(self):
def start(self):
self._proc.start()
+
+ def run(self):
+ # FIXME: set_up_logging shouldn't be needed.
+ self._client.run(self._host, set_up_logging=True)
stopping_queue = None
-WORKER_NAME = 'TestWorker'
-
+WORKER_NAME = 'worker/1'
def make_broker(manager, max_workers, start_queue=None, stop_queue=None):
global starting_queue
class _TestWorker(manager_worker_broker.AbstractWorker):
- def __init__(self, worker_connection, worker_arguments=None):
- super(_TestWorker, self).__init__(worker_connection)
- self._name = WORKER_NAME
+ def __init__(self, worker_connection, worker_number=1):
+ super(_TestWorker, self).__init__(worker_connection, worker_number)
self._thing_to_greet = 'everybody'
self._starting_queue = starting_queue
self._stopping_queue = stopping_queue
- def set_inline_arguments(self, thing_to_greet):
- self._thing_to_greet = thing_to_greet
-
def handle_stop(self, src):
self.stop_handling_messages()
assert a_str == "hello, world"
self._worker_connection.post_message('test', 2, 'hi, ' + self._thing_to_greet)
- def run(self):
+ def run(self, host, set_up_logging):
if self._starting_queue:
self._starting_queue.put('')
def test_name(self):
self.make_broker()
- worker = self._broker.start_worker()
+ worker = self._broker.start_worker(1)
self.assertEquals(worker.name(), WORKER_NAME)
worker.cancel()
worker.join(0.1)
def test_cancel(self):
self.make_broker()
- worker = self._broker.start_worker()
+ worker = self._broker.start_worker(1)
self._broker.post_message('test', 1, 'hello, world')
worker.cancel()
worker.join(0.1)
def test_done(self):
self.make_broker()
- worker = self._broker.start_worker()
+ worker = self._broker.start_worker(1)
self._broker.post_message('test', 1, 'hello, world')
self._broker.post_message('stop')
self._broker.run_message_loop()
def test_unknown_message(self):
self.make_broker()
- worker = self._broker.start_worker()
+ worker = self._broker.start_worker(1)
self._broker.post_message('unknown')
try:
self._broker.run_message_loop()
_TestsMixin.setUp(self)
self._max_workers = 1
- def test_inline_arguments(self):
- self.make_broker()
- worker = self._broker.start_worker()
- worker.set_inline_arguments('me')
- self._broker.post_message('test', 1, 'hello, world')
- self._broker.post_message('stop')
- self._broker.run_message_loop()
- self.assertEquals(self._a_str, 'hi, me')
-
# FIXME: https://bugs.webkit.org/show_bug.cgi?id=54520.
if sys.platform not in ('cygwin', 'win32'):
self._max_workers = 2
-class InterfaceTest(unittest.TestCase):
- # These tests mostly exist to pacify coverage.
-
- # FIXME: There must be a better way to do this and also verify
- # that classes do implement every abstract method in an interface.
- def test_brokerclient_is_abstract(self):
- # Test that all the base class methods are abstract and have the
- # signature we expect.
- obj = manager_worker_broker.BrokerClient()
- self.assertRaises(NotImplementedError, obj.is_done)
- self.assertRaises(NotImplementedError, obj.name)
-
- def test_managerconnection_is_abstract(self):
- # Test that all the base class methods are abstract and have the
- # signature we expect.
- broker = make_broker(self, 1)
- obj = manager_worker_broker._ManagerConnection(broker._broker, self, None)
- self.assertRaises(NotImplementedError, obj.start_worker)
-
- def test_workerconnection_is_abstract(self):
- # Test that all the base class methods are abstract and have the
- # signature we expect.
- broker = make_broker(self, 1)
- obj = manager_worker_broker._WorkerConnection(broker._broker, _TestWorker, None)
- self.assertRaises(NotImplementedError, obj.cancel)
- self.assertRaises(NotImplementedError, obj.is_alive)
- self.assertRaises(NotImplementedError, obj.join, None)
-
-
class MessageTest(unittest.TestCase):
def test__no_body(self):
msg = manager_worker_broker._Message('src', 'topic_name', 'message_name', None)
_log = logging.getLogger(__name__)
-class WorkerArguments(object):
- def __init__(self, worker_number, results_directory, options):
- self.worker_number = worker_number
- self.results_directory = results_directory
- self.options = options
-
-
class Worker(manager_worker_broker.AbstractWorker):
- def __init__(self, worker_connection, worker_arguments):
- super(Worker, self).__init__(worker_connection, worker_arguments)
- self._worker_number = worker_arguments.worker_number
- self._name = 'worker/%d' % self._worker_number
- self._results_directory = worker_arguments.results_directory
- self._options = worker_arguments.options
+ def __init__(self, worker_connection, worker_number, results_directory, options):
+ super(Worker, self).__init__(worker_connection, worker_number)
+ self._results_directory = results_directory
+ self._options = options
self._port = None
self._batch_size = None
self._batch_count = None
self._log_handler = _WorkerLogHandler(self)
self._logger.addHandler(self._log_handler)
- def _set_up_host_and_port(self):
- options = self._options
- if options.platform and 'test' in options.platform:
- # It is lame to import mocks into real code, but this allows us to use the test port in multi-process tests as well.
- from webkitpy.common.host_mock import MockHost
- host = MockHost()
- else:
+ def run(self, host, set_up_logging):
+ if not host:
host = Host()
- self._port = host.port_factory.get(options.platform, options)
- def set_inline_arguments(self, port):
- self._port = port
-
- def run(self):
- if not self._port:
- # We are running in a child process and need to initialize things.
+ # FIXME: this should move into manager_worker_broker.py.
+ if set_up_logging:
self._set_up_logging()
- self._set_up_host_and_port()
+
+ options = self._options
+ self._port = host.port_factory.get(options.platform, options)
self.safe_init()
try:
+++ /dev/null
-# Copyright (c) 2012 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import unittest
-
-from webkitpy.layout_tests.controllers.worker import Worker, WorkerArguments
-from webkitpy.tool.mocktool import MockOptions
-
-
-class FakeConnection(object):
- def run_message_loop(self):
- pass
-
- def post_message(self, message_name, *message_args):
- pass
-
-
-class WorkerTest(unittest.TestCase):
- def test_default_platform_in_worker(self):
- # This checks that we got a port and didn't raise an exception
- # if we didn't specify a port with the --platform flag.
- worker_connection = FakeConnection()
- worker = Worker(worker_connection, WorkerArguments(1, '/tmp', MockOptions(platform=None, print_options=None, verbose=False, batch_size=0)))
- worker._set_up_host_and_port()
- self.assertNotEquals(worker._port, None)
-
-
-if __name__ == '__main__':
- unittest.main()
from webkitpy.layout_tests import port
from webkitpy.layout_tests import run_webkit_tests
-from webkitpy.layout_tests.controllers.manager import WorkerException
+from webkitpy.layout_tests.controllers.manager_worker_broker import WorkerException
from webkitpy.layout_tests.port import Port
from webkitpy.layout_tests.port.test import TestPort, TestDriver
from webkitpy.test.skip import skip_if
return run_webkit_tests.parse_args(args)
-def passing_run(extra_args=None, port_obj=None, record_results=False, tests_included=False, host=None):
+def passing_run(extra_args=None, port_obj=None, record_results=False, tests_included=False, host=None, shared_port=True):
options, parsed_args = parse_args(extra_args, record_results, tests_included)
if not port_obj:
host = host or MockHost()
port_obj = host.port_factory.get(port_name=options.platform, options=options)
+
+ if shared_port:
+ port_obj.host.port_factory.get = lambda *args, **kwargs: port_obj
+
buildbot_output = StringIO.StringIO()
regular_output = StringIO.StringIO()
res = run_webkit_tests.run(port_obj, options, parsed_args, buildbot_output=buildbot_output, regular_output=regular_output)
return res == 0 and not regular_output.getvalue() and not buildbot_output.getvalue()
-def logging_run(extra_args=None, port_obj=None, record_results=False, tests_included=False, host=None, new_results=False):
+def logging_run(extra_args=None, port_obj=None, record_results=False, tests_included=False, host=None, new_results=False, shared_port=True):
options, parsed_args = parse_args(extra_args=extra_args,
record_results=record_results,
tests_included=tests_included,
if not port_obj:
port_obj = host.port_factory.get(port_name=options.platform, options=options)
- res, buildbot_output, regular_output = run_and_capture(port_obj, options, parsed_args)
+ res, buildbot_output, regular_output = run_and_capture(port_obj, options, parsed_args, shared_port)
return (res, buildbot_output, regular_output, host.user)
-def run_and_capture(port_obj, options, parsed_args):
+def run_and_capture(port_obj, options, parsed_args, shared_port=True):
+ if shared_port:
+ port_obj.host.port_factory.get = lambda *args, **kwargs: port_obj
oc = outputcapture.OutputCapture()
try:
oc.capture_output()
def test_child_processes_2(self):
if self.should_test_processes:
_, _, regular_output, _ = logging_run(
- ['--print', 'config', '--child-processes', '2'])
+ ['--print', 'config', '--child-processes', '2'], shared_port=False)
self.assertTrue(any(['Running 2 ' in line for line in regular_output.buflist]))
def test_child_processes_min(self):
if self.should_test_processes:
_, _, regular_output, _ = logging_run(
['--print', 'config', '--child-processes', '2', 'passes'],
- tests_included=True)
+ tests_included=True, shared_port=False)
self.assertTrue(any(['Running 1 ' in line for line in regular_output.buflist]))
def test_dryrun(self):
if self.should_test_processes:
self.assertRaises(WorkerException, logging_run,
- ['--child-processes', '2', '--force', 'failures/expected/exception.html', 'passes/text.html'], tests_included=True)
+ ['--child-processes', '2', '--force', 'failures/expected/exception.html', 'passes/text.html'], tests_included=True, shared_port=False)
def test_full_results_html(self):
# FIXME: verify html?
if self.should_test_processes:
self.assertRaises(KeyboardInterrupt, logging_run,
- ['failures/expected/keyboard.html', 'passes/text.html', '--child-processes', '2', '--force'], tests_included=True)
+ ['failures/expected/keyboard.html', 'passes/text.html', '--child-processes', '2', '--force'], tests_included=True, shared_port=False)
def test_no_tests_found(self):
res, out, err, user = logging_run(['resources'], tests_included=True)
# Now we test that --clobber-old-results does remove the old entries and the old retries,
# and that we don't retry again.
+ host = MockHost()
res, out, err, _ = logging_run(['--no-retry-failures', '--clobber-old-results', 'failures/flaky'], tests_included=True, host=host)
self.assertEquals(res, 1)
self.assertTrue('Clobbering old results' in err.getvalue())
self.assertTrue(host.filesystem.exists('/tmp/layout-test-results/failures/flaky/text-actual.txt'))
self.assertFalse(host.filesystem.exists('retries'))
-
- # These next tests test that we run the tests in ascending alphabetical
- # order per directory. HTTP tests are sharded separately from other tests,
- # so we have to test both.
- def assert_run_order(self, child_processes='1'):
- tests_run = get_tests_run(['--child-processes', child_processes, 'passes'],
- tests_included=True, flatten_batches=True)
+ def test_run_order__inline(self):
+ # These next tests test that we run the tests in ascending alphabetical
+ # order per directory. HTTP tests are sharded separately from other tests,
+ # so we have to test both.
+ tests_run = get_tests_run(['passes'], tests_included=True, flatten_batches=True)
self.assertEquals(tests_run, sorted(tests_run))
- tests_run = get_tests_run(['--child-processes', child_processes, 'http/tests/passes'],
- tests_included=True, flatten_batches=True)
+ tests_run = get_tests_run(['http/tests/passes'], tests_included=True, flatten_batches=True)
self.assertEquals(tests_run, sorted(tests_run))
- def test_run_order__inline(self):
- self.assert_run_order()
-
def test_tolerance(self):
class ImageDiffTestPort(TestPort):
def diff_image(self, expected_contents, actual_contents, tolerance=None):