nrwt: make the worker class stand alone with a cleaner interface
authordpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Tue, 3 Jul 2012 23:31:57 +0000 (23:31 +0000)
committerdpranke@chromium.org <dpranke@chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Tue, 3 Jul 2012 23:31:57 +0000 (23:31 +0000)
https://bugs.webkit.org/show_bug.cgi?id=90409

Reviewed by Ojan Vafai.

Currently the Worker class derives from AbstractWorker, which is
kind of crufty and awkward; it would be better if we did not
rely on shared state.

This change changes that so that Worker derives from object, and
exposes the following interface:
  __init__() - called in the manager process
  safe_init() - called in the worker process to initialize
    unpicklable state
  handle() - a single routine to handle all messages
  cleanup() - called so the worker can clean up

Also, all of the "administrative" messages that are handled by
the worker (notification of start/stop/etc.) move into
manager_worker_broker - this reduces worker.py to just handling
the mechanics of actually running each test.

For the moment, we do this by creating Yet Another wrapper/proxy
class in manager_worker_broker, but this will get simpler
shortly when the rest of m_w_b is cleaned up.

With this change worker is now in its new form but there will be
a follow-on change that cleans up some names and other minor
things.

This change is again mostly just moving things around and should
be covered by the (updated) existing tests.

* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
(get):
(AbstractWorker.__init__):
(AbstractWorker.run):
(AbstractWorker):
(AbstractWorker.handle_stop):
(AbstractWorker.handle_test_list):
(AbstractWorker.yield_to_broker):
(AbstractWorker.post_message):
(_WorkerConnection.__init__):
(_Process.run):
* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
(_TestWorker):
(_TestWorker.__init__):
(_TestWorker.name):
(_TestWorker.cleanup):
(_TestWorker.handle):
(_TestWorker.safe_init):
(_TestWorker.stop):
(_TestsMixin.handle_finished_test):
(_TestsMixin.setUp):
(_TestsMixin.test_cancel):
(_TestsMixin.test_done):
* Scripts/webkitpy/layout_tests/controllers/worker.py:
(Worker):
(Worker.__init__):
(Worker.safe_init):
(Worker.handle):

git-svn-id: http://svn.webkit.org/repository/webkit/trunk@121809 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Tools/ChangeLog
Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py
Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py
Tools/Scripts/webkitpy/layout_tests/controllers/worker.py

index 17d2eaf..e73ca6c 100644 (file)
@@ -1,5 +1,69 @@
 2012-07-03  Dirk Pranke  <dpranke@chromium.org>
 
+        nrwt: make the worker class stand alone with a cleaner interface
+        https://bugs.webkit.org/show_bug.cgi?id=90409
+
+        Reviewed by Ojan Vafai.
+
+        Currently the Worker class derives from AbstractWorker, which is
+        kind of crufty and awkward; it would be better if we did not
+        rely on shared state.
+
+        This change changes that so that Worker derives from object, and
+        exposes the following interface:
+          __init__() - called in the manager process
+          safe_init() - called in the worker process to initialize
+            unpicklable state
+          handle() - a single routine to handle all messages
+          cleanup() - called so the worker can clean up
+
+        Also, all of the "administrative" messages that are handled by
+        the worker (notification of start/stop/etc.) move into
+        manager_worker_broker - this reduces worker.py to just handling
+        the mechanics of actually running each test.
+
+        For the moment, we do this by creating Yet Another wrapper/proxy
+        class in manager_worker_broker, but this will get simpler
+        shortly when the rest of m_w_b is cleaned up.
+
+        With this change worker is now in its new form but there will be
+        a follow-on change that cleans up some names and other minor
+        things.
+
+        This change is again mostly just moving things around and should
+        be covered by the (updated) existing tests.
+
+        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
+        (get):
+        (AbstractWorker.__init__):
+        (AbstractWorker.run):
+        (AbstractWorker):
+        (AbstractWorker.handle_stop):
+        (AbstractWorker.handle_test_list):
+        (AbstractWorker.yield_to_broker):
+        (AbstractWorker.post_message):
+        (_WorkerConnection.__init__):
+        (_Process.run):
+        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
+        (_TestWorker):
+        (_TestWorker.__init__):
+        (_TestWorker.name):
+        (_TestWorker.cleanup):
+        (_TestWorker.handle):
+        (_TestWorker.safe_init):
+        (_TestWorker.stop):
+        (_TestsMixin.handle_finished_test):
+        (_TestsMixin.setUp):
+        (_TestsMixin.test_cancel):
+        (_TestsMixin.test_done):
+        * Scripts/webkitpy/layout_tests/controllers/worker.py:
+        (Worker):
+        (Worker.__init__):
+        (Worker.safe_init):
+        (Worker.handle):
+
+2012-07-03  Dirk Pranke  <dpranke@chromium.org>
+
         nrwt: moving child process logging code into manager_worker_broker
         https://bugs.webkit.org/show_bug.cgi?id=90408
 
index 790df93..7e83384 100755 (executable)
@@ -75,6 +75,7 @@ import sys
 import traceback
 
 
+from webkitpy.common.host import Host
 from webkitpy.common.system import stack_utils
 from webkitpy.layout_tests.views import metered_stream
 
@@ -96,7 +97,7 @@ def get(max_workers, client, worker_factory, host=None):
         max_workers - max # of workers to run concurrently.
         client - BrokerClient implementation to dispatch
             replies to.
-        worker_factory: factory method for creatin objects that implement the AbstractWorker interface.
+        worker_factory: factory method for creating objects that implement the Worker interface.
         host: optional picklable host object that can be passed to workers for testing.
     Returns:
         A handle to an object that will talk to a message broker configured
@@ -263,23 +264,15 @@ class _BrokerConnection(object):
 
 class AbstractWorker(BrokerClient):
     def __init__(self, worker_connection, worker_number):
-        """The constructor should be used to do any simple initialization
-        necessary, but should not do anything that creates data structures
-        that cannot be Pickled or sent across processes (like opening
-        files or sockets). Complex initialization should be done at the
-        start of the run() call.
-
-        Args:
-            worker_connection - handle to the _BrokerConnection object creating
-                the worker and that can be used for messaging.
-            worker_number - the number/index of the current worker."""
         BrokerClient.__init__(self)
+        self.worker = None
         self._worker_connection = worker_connection
         self._worker_number = worker_number
         self._name = 'worker/%d' % worker_number
         self._done = False
         self._canceled = False
         self._options = optparse.Values({'verbose': False})
+        self.host = None
 
     def name(self):
         return self._name
@@ -290,10 +283,14 @@ class AbstractWorker(BrokerClient):
     def stop_handling_messages(self):
         self._done = True
 
-    def run(self):
+    def run(self, host):
         """Callback for the worker to start executing. Typically does any
         remaining initialization and then calls broker_connection.run_message_loop()."""
         exception_msg = ""
+        self.host = host
+
+        self.worker.safe_init()
+        _log.debug('%s starting' % self._name)
 
         try:
             self._worker_connection.run_message_loop()
@@ -308,6 +305,14 @@ class AbstractWorker(BrokerClient):
             self._worker_connection.raise_exception(sys.exc_info())
         finally:
             _log.debug("%s done with message loop%s" % (self._name, exception_msg))
+            self.worker.cleanup()
+            self._worker_connection.post_message('done')
+
+    def handle_stop(self, source):
+        self._done = True
+
+    def handle_test_list(self, source, list_name, test_list):
+        self.worker.handle('test_list', source, list_name, test_list)
 
     def cancel(self):
         """Called when possible to indicate to the worker to stop processing
@@ -315,6 +320,12 @@ class AbstractWorker(BrokerClient):
         method being called, so clients should not rely solely on this."""
         self._canceled = True
 
+    def yield_to_broker(self):
+        self._worker_connection.yield_to_broker()
+
+    def post_message(self, *args):
+        self._worker_connection.post_message(*args)
+
 
 class _ManagerConnection(_BrokerConnection):
     def __init__(self, broker, client, worker_factory, host):
@@ -362,7 +373,12 @@ class _MultiProcessManager(_ManagerConnection):
 
 class _WorkerConnection(_BrokerConnection):
     def __init__(self, host, broker, worker_factory, worker_number):
-        self._client = worker_factory(self, worker_number)
+        # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
+        # actual Worker (created by worker_factory) is very confusing, but this all gets better when
+        # _WorkerConnection and AbstractWorker get merged.
+        self._client = AbstractWorker(self, worker_number)
+        self._worker = worker_factory(self._client, worker_number)
+        self._client.worker = self._worker
         self._host = host
         self._log_messages = []
         self._logger = None
@@ -451,6 +467,8 @@ class _Process(multiprocessing.Process):
         self._client = client
 
     def run(self):
+        if not self._worker_connection._host:
+            self._worker_connection._host = Host()
         self._worker_connection.run()
 
 
index f8be07b..d7c3714 100644 (file)
@@ -54,31 +54,34 @@ def make_broker(manager, max_workers, start_queue=None, stop_queue=None):
     return manager_worker_broker.get(max_workers, manager, _TestWorker)
 
 
-class _TestWorker(manager_worker_broker.AbstractWorker):
-    def __init__(self, worker_connection, worker_number=1):
-        super(_TestWorker, self).__init__(worker_connection, worker_number)
+class _TestWorker(object):
+    def __init__(self, caller, worker_number):
+        self._caller = caller
         self._thing_to_greet = 'everybody'
         self._starting_queue = starting_queue
         self._stopping_queue = stopping_queue
+        self._options = optparse.Values({'verbose': False})
 
-    def handle_stop(self, src):
-        self.stop_handling_messages()
+    def name(self):
+        return WORKER_NAME
+
+    def cleanup(self):
+        pass
 
-    def handle_test(self, src, an_int, a_str):
+    def handle(self, message, src, an_int, a_str):
         assert an_int == 1
         assert a_str == "hello, world"
-        self._worker_connection.post_message('test', 2, 'hi, ' + self._thing_to_greet)
+        self._caller.post_message('finished_test', 2)
 
-    def run(self, host):
+    def safe_init(self):
         if self._starting_queue:
             self._starting_queue.put('')
 
         if self._stopping_queue:
             self._stopping_queue.get()
-        try:
-            super(_TestWorker, self).run()
-        finally:
-            self._worker_connection.post_message('done')
+
+    def stop(self):
+        self._caller.post_message('done')
 
 
 class FunctionTests(unittest.TestCase):
@@ -105,16 +108,14 @@ class _TestsMixin(object):
     def handle_done(self, src, log_messages):
         self._done = True
 
-    def handle_test(self, src, an_int, a_str):
+    def handle_finished_test(self, src, an_int, log_messages):
         self._an_int = an_int
-        self._a_str = a_str
 
     def handle_exception(self, src, exception_type, exception_value, stack):
         raise exception_type(exception_value)
 
     def setUp(self):
         self._an_int = None
-        self._a_str = None
         self._broker = None
         self._done = False
         self._exception = None
@@ -136,7 +137,7 @@ class _TestsMixin(object):
     def test_cancel(self):
         self.make_broker()
         worker = self._broker.start_worker(1)
-        self._broker.post_message('test', 1, 'hello, world')
+        self._broker.post_message('test_list', 1, 'hello, world')
         worker.cancel()
         worker.join(0.1)
         self.assertFalse(worker.is_alive())
@@ -145,14 +146,13 @@ class _TestsMixin(object):
     def test_done(self):
         self.make_broker()
         worker = self._broker.start_worker(1)
-        self._broker.post_message('test', 1, 'hello, world')
+        self._broker.post_message('test_list', 1, 'hello, world')
         self._broker.post_message('stop')
         self._broker.run_message_loop()
         worker.join(0.5)
         self.assertFalse(worker.is_alive())
         self.assertTrue(self.is_done())
         self.assertEqual(self._an_int, 2)
-        self.assertEqual(self._a_str, 'hi, everybody')
         self._broker.cleanup()
 
     def test_unknown_message(self):
index 22a96aa..1f6d0b5 100644 (file)
@@ -32,8 +32,6 @@ import logging
 import threading
 import time
 
-from webkitpy.common.host import Host
-from webkitpy.layout_tests.controllers import manager_worker_broker
 from webkitpy.layout_tests.controllers import single_test_runner
 from webkitpy.layout_tests.models import test_expectations
 from webkitpy.layout_tests.models import test_results
@@ -42,11 +40,16 @@ from webkitpy.layout_tests.models import test_results
 _log = logging.getLogger(__name__)
 
 
-class Worker(manager_worker_broker.AbstractWorker):
+class Worker(object):
     def __init__(self, worker_connection, worker_number, results_directory, options):
-        super(Worker, self).__init__(worker_connection, worker_number)
+        self._worker_connection = worker_connection
+        self._worker_number = worker_number
+        self._name = 'worker/%d' % worker_number
         self._results_directory = results_directory
         self._options = options
+
+        # The remaining fields are initialized in safe_init()
+        self._host = None
         self._port = None
         self._batch_size = None
         self._batch_count = None
@@ -59,35 +62,19 @@ class Worker(manager_worker_broker.AbstractWorker):
         self.cleanup()
 
     def safe_init(self):
-        """This method should only be called when it is is safe for the mixin
-        to create state that can't be Pickled.
+        """This method is called when it is safe for the object to create state that
+        does not need to be pickled (usually this means it is called in a child process)."""
+        self._host = self._worker_connection.host
+        self._filesystem = self._host.filesystem
+        self._port = self._host.port_factory.get(self._options.platform, self._options)
 
-        This routine exists so that the mixin can be created and then marshaled
-        across into a child process."""
-        self._filesystem = self._port.host.filesystem
         self._batch_count = 0
         self._batch_size = self._options.batch_size or 0
         tests_run_filename = self._filesystem.join(self._results_directory, "tests_run%d.txt" % self._worker_number)
         self._tests_run_file = self._filesystem.open_text_file_for_writing(tests_run_filename)
 
-    def run(self, host):
-        if not host:
-            host = Host()
-
-        options = self._options
-        self._port = host.port_factory.get(options.platform, options)
-
-        self.safe_init()
-        try:
-            _log.debug("%s starting" % self._name)
-            super(Worker, self).run()
-        finally:
-            self.kill_driver()
-            _log.debug("%s exiting" % self._name)
-            self.cleanup()
-            self._worker_connection.post_message('done')
-
-    def handle_test_list(self, src, list_name, test_list):
+    def handle(self, name, source, list_name, test_list):
+        assert name == 'test_list'
         start_time = time.time()
         num_tests = 0
         for test_input in test_list:
@@ -99,9 +86,6 @@ class Worker(manager_worker_broker.AbstractWorker):
         elapsed_time = time.time() - start_time
         self._worker_connection.post_message('finished_list', list_name, num_tests, elapsed_time)
 
-    def handle_stop(self, src):
-        self.stop_handling_messages()
-
     def _update_test_input(self, test_input):
         test_input.reference_files = self._port.reference_files(test_input.test_name)
         if test_input.reference_files: