https://bugs.webkit.org/show_bug.cgi?id=90409
Reviewed by Ojan Vafai.
Currently the Worker class derives from AbstractWorker, which is
kind of crufty and awkward; it would be better if we did not
rely on shared state.
This change changes that so that Worker derives from object, and
exposes the following interface:
__init__() - called in the manager process
safe_init() - called in the worker process to initialize
unpicklable state
handle() - a single routine to handle all messages
cleanup() - called so the worker can clean up
Also, all of the "administrative" messages that are handled by
the worker (notification of start/stop/etc.) move into
manager_worker_broker - this reduces worker.py to just handling
the mechanics of actually running each test.
For the moment, we do this by creating Yet Another wrapper/proxy
class in manager_worker_broker, but this will get simpler
shortly when the rest of m_w_b is cleaned up.
With this change worker is now in its new form but there will be
a follow-on change that cleans up some names and other minor
things.
This change is again mostly just moving things around and should
be covered by the (updated) existing tests.
* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
(get):
(AbstractWorker.__init__):
(AbstractWorker.run):
(AbstractWorker):
(AbstractWorker.handle_stop):
(AbstractWorker.handle_test_list):
(AbstractWorker.yield_to_broker):
(AbstractWorker.post_message):
(_WorkerConnection.__init__):
(_Process.run):
* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
(_TestWorker):
(_TestWorker.__init__):
(_TestWorker.name):
(_TestWorker.cleanup):
(_TestWorker.handle):
(_TestWorker.safe_init):
(_TestWorker.stop):
(_TestsMixin.handle_finished_test):
(_TestsMixin.setUp):
(_TestsMixin.test_cancel):
(_TestsMixin.test_done):
* Scripts/webkitpy/layout_tests/controllers/worker.py:
(Worker):
(Worker.__init__):
(Worker.safe_init):
(Worker.handle):
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@121809
268f45cc-cd09-0410-ab3c-
d52691b4dbfc
2012-07-03 Dirk Pranke <dpranke@chromium.org>
+ nrwt: make the worker class stand alone with a cleaner interface
+ https://bugs.webkit.org/show_bug.cgi?id=90409
+
+ Reviewed by Ojan Vafai.
+
+ Currently the Worker class derives from AbstractWorker, which is
+ kind of crufty and awkward; it would be better if we did not
+ rely on shared state.
+
+ This change changes that so that Worker derives from object, and
+ exposes the following interface:
+ __init__() - called in the manager process
+ safe_init() - called in the worker process to initialize
+ unpicklable state
+ handle() - a single routine to handle all messages
+ cleanup() - called so the worker can clean up
+
+ Also, all of the "administrative" messages that are handled by
+ the worker (notification of start/stop/etc.) move into
+ manager_worker_broker - this reduces worker.py to just handling
+ the mechanics of actually running each test.
+
+ For the moment, we do this by creating Yet Another wrapper/proxy
+ class in manager_worker_broker, but this will get simpler
+ shortly when the rest of m_w_b is cleaned up.
+
+ With this change worker is now in its new form but there will be
+ a follow-on change that cleans up some names and other minor
+ things.
+
+ This change is again mostly just moving things around and should
+ be covered by the (updated) existing tests.
+
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
+ (get):
+ (AbstractWorker.__init__):
+ (AbstractWorker.run):
+ (AbstractWorker):
+ (AbstractWorker.handle_stop):
+ (AbstractWorker.handle_test_list):
+ (AbstractWorker.yield_to_broker):
+ (AbstractWorker.post_message):
+ (_WorkerConnection.__init__):
+ (_Process.run):
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
+ (_TestWorker):
+ (_TestWorker.__init__):
+ (_TestWorker.name):
+ (_TestWorker.cleanup):
+ (_TestWorker.handle):
+ (_TestWorker.safe_init):
+ (_TestWorker.stop):
+ (_TestsMixin.handle_finished_test):
+ (_TestsMixin.setUp):
+ (_TestsMixin.test_cancel):
+ (_TestsMixin.test_done):
+ * Scripts/webkitpy/layout_tests/controllers/worker.py:
+ (Worker):
+ (Worker.__init__):
+ (Worker.safe_init):
+ (Worker.handle):
+
+2012-07-03 Dirk Pranke <dpranke@chromium.org>
+
nrwt: moving child process logging code into manager_worker_broker
https://bugs.webkit.org/show_bug.cgi?id=90408
import traceback
+from webkitpy.common.host import Host
from webkitpy.common.system import stack_utils
from webkitpy.layout_tests.views import metered_stream
max_workers - max # of workers to run concurrently.
client - BrokerClient implementation to dispatch
replies to.
- worker_factory: factory method for creatin objects that implement the AbstractWorker interface.
+ worker_factory: factory method for creating objects that implement the Worker interface.
host: optional picklable host object that can be passed to workers for testing.
Returns:
A handle to an object that will talk to a message broker configured
class AbstractWorker(BrokerClient):
def __init__(self, worker_connection, worker_number):
- """The constructor should be used to do any simple initialization
- necessary, but should not do anything that creates data structures
- that cannot be Pickled or sent across processes (like opening
- files or sockets). Complex initialization should be done at the
- start of the run() call.
-
- Args:
- worker_connection - handle to the _BrokerConnection object creating
- the worker and that can be used for messaging.
- worker_number - the number/index of the current worker."""
BrokerClient.__init__(self)
+ self.worker = None
self._worker_connection = worker_connection
self._worker_number = worker_number
self._name = 'worker/%d' % worker_number
self._done = False
self._canceled = False
self._options = optparse.Values({'verbose': False})
+ self.host = None
def name(self):
return self._name
def stop_handling_messages(self):
self._done = True
- def run(self):
+ def run(self, host):
"""Callback for the worker to start executing. Typically does any
remaining initialization and then calls broker_connection.run_message_loop()."""
exception_msg = ""
+ self.host = host
+
+ self.worker.safe_init()
+ _log.debug('%s starting' % self._name)
try:
self._worker_connection.run_message_loop()
self._worker_connection.raise_exception(sys.exc_info())
finally:
_log.debug("%s done with message loop%s" % (self._name, exception_msg))
+ self.worker.cleanup()
+ self._worker_connection.post_message('done')
+
+ def handle_stop(self, source):
+ self._done = True
+
+ def handle_test_list(self, source, list_name, test_list):
+ self.worker.handle('test_list', source, list_name, test_list)
def cancel(self):
"""Called when possible to indicate to the worker to stop processing
method being called, so clients should not rely solely on this."""
self._canceled = True
+ def yield_to_broker(self):
+ self._worker_connection.yield_to_broker()
+
+ def post_message(self, *args):
+ self._worker_connection.post_message(*args)
+
class _ManagerConnection(_BrokerConnection):
def __init__(self, broker, client, worker_factory, host):
class _WorkerConnection(_BrokerConnection):
def __init__(self, host, broker, worker_factory, worker_number):
- self._client = worker_factory(self, worker_number)
+ # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
+ # actual Worker (created by worker_factory) is very confusing, but this all gets better when
+ # _WorkerConnection and AbstractWorker get merged.
+ self._client = AbstractWorker(self, worker_number)
+ self._worker = worker_factory(self._client, worker_number)
+ self._client.worker = self._worker
self._host = host
self._log_messages = []
self._logger = None
self._client = client
def run(self):
+ if not self._worker_connection._host:
+ self._worker_connection._host = Host()
self._worker_connection.run()
return manager_worker_broker.get(max_workers, manager, _TestWorker)
-class _TestWorker(manager_worker_broker.AbstractWorker):
- def __init__(self, worker_connection, worker_number=1):
- super(_TestWorker, self).__init__(worker_connection, worker_number)
+class _TestWorker(object):
+ def __init__(self, caller, worker_number):
+ self._caller = caller
self._thing_to_greet = 'everybody'
self._starting_queue = starting_queue
self._stopping_queue = stopping_queue
+ self._options = optparse.Values({'verbose': False})
- def handle_stop(self, src):
- self.stop_handling_messages()
+ def name(self):
+ return WORKER_NAME
+
+ def cleanup(self):
+ pass
- def handle_test(self, src, an_int, a_str):
+ def handle(self, message, src, an_int, a_str):
assert an_int == 1
assert a_str == "hello, world"
- self._worker_connection.post_message('test', 2, 'hi, ' + self._thing_to_greet)
+ self._caller.post_message('finished_test', 2)
- def run(self, host):
+ def safe_init(self):
if self._starting_queue:
self._starting_queue.put('')
if self._stopping_queue:
self._stopping_queue.get()
- try:
- super(_TestWorker, self).run()
- finally:
- self._worker_connection.post_message('done')
+
+ def stop(self):
+ self._caller.post_message('done')
class FunctionTests(unittest.TestCase):
def handle_done(self, src, log_messages):
self._done = True
- def handle_test(self, src, an_int, a_str):
+ def handle_finished_test(self, src, an_int, log_messages):
self._an_int = an_int
- self._a_str = a_str
def handle_exception(self, src, exception_type, exception_value, stack):
raise exception_type(exception_value)
def setUp(self):
self._an_int = None
- self._a_str = None
self._broker = None
self._done = False
self._exception = None
def test_cancel(self):
self.make_broker()
worker = self._broker.start_worker(1)
- self._broker.post_message('test', 1, 'hello, world')
+ self._broker.post_message('test_list', 1, 'hello, world')
worker.cancel()
worker.join(0.1)
self.assertFalse(worker.is_alive())
def test_done(self):
self.make_broker()
worker = self._broker.start_worker(1)
- self._broker.post_message('test', 1, 'hello, world')
+ self._broker.post_message('test_list', 1, 'hello, world')
self._broker.post_message('stop')
self._broker.run_message_loop()
worker.join(0.5)
self.assertFalse(worker.is_alive())
self.assertTrue(self.is_done())
self.assertEqual(self._an_int, 2)
- self.assertEqual(self._a_str, 'hi, everybody')
self._broker.cleanup()
def test_unknown_message(self):
import threading
import time
-from webkitpy.common.host import Host
-from webkitpy.layout_tests.controllers import manager_worker_broker
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_results
_log = logging.getLogger(__name__)
-class Worker(manager_worker_broker.AbstractWorker):
+class Worker(object):
def __init__(self, worker_connection, worker_number, results_directory, options):
- super(Worker, self).__init__(worker_connection, worker_number)
+ self._worker_connection = worker_connection
+ self._worker_number = worker_number
+ self._name = 'worker/%d' % worker_number
self._results_directory = results_directory
self._options = options
+
+ # The remaining fields are initialized in safe_init()
+ self._host = None
self._port = None
self._batch_size = None
self._batch_count = None
self.cleanup()
def safe_init(self):
- """This method should only be called when it is is safe for the mixin
- to create state that can't be Pickled.
+ """This method is called when it is safe for the object to create state that
+ does not need to be pickled (usually this means it is called in a child process)."""
+ self._host = self._worker_connection.host
+ self._filesystem = self._host.filesystem
+ self._port = self._host.port_factory.get(self._options.platform, self._options)
- This routine exists so that the mixin can be created and then marshaled
- across into a child process."""
- self._filesystem = self._port.host.filesystem
self._batch_count = 0
self._batch_size = self._options.batch_size or 0
tests_run_filename = self._filesystem.join(self._results_directory, "tests_run%d.txt" % self._worker_number)
self._tests_run_file = self._filesystem.open_text_file_for_writing(tests_run_filename)
- def run(self, host):
- if not host:
- host = Host()
-
- options = self._options
- self._port = host.port_factory.get(options.platform, options)
-
- self.safe_init()
- try:
- _log.debug("%s starting" % self._name)
- super(Worker, self).run()
- finally:
- self.kill_driver()
- _log.debug("%s exiting" % self._name)
- self.cleanup()
- self._worker_connection.post_message('done')
-
- def handle_test_list(self, src, list_name, test_list):
+ def handle(self, name, source, list_name, test_list):
+ assert name == 'test_list'
start_time = time.time()
num_tests = 0
for test_input in test_list:
elapsed_time = time.time() - start_time
self._worker_connection.post_message('finished_list', list_name, num_tests, elapsed_time)
- def handle_stop(self, src):
- self.stop_handling_messages()
-
def _update_test_input(self, test_input):
test_input.reference_files = self._port.reference_files(test_input.test_name)
if test_input.reference_files: