[lit][NFC] Cleanup lit worker process handling
authorJulian Lettner <jlettner@apple.com>
Sat, 16 Feb 2019 00:40:40 +0000 (00:40 +0000)
committerJulian Lettner <jlettner@apple.com>
Sat, 16 Feb 2019 00:40:40 +0000 (00:40 +0000)
Move code that is executed on worker process to separate file. This
makes the use of the pickled arguments stored in global variables in the
worker a bit clearer. (Still not pretty though.)

Extract handling of parallelism groups to it's own function.

Use BoundedSemaphore instead of Semaphore. BoundedSemaphore raises for
unmatched release() calls.

Cleanup imports.

Differential Revision: https://reviews.llvm.org/D58196

llvm-svn: 354187

llvm/utils/lit/lit/run.py
llvm/utils/lit/lit/util.py
llvm/utils/lit/lit/worker.py [new file with mode: 0644]

index a6de83e..5483f7f 100644 (file)
@@ -1,28 +1,9 @@
-import os
-import sys
-import threading
+import multiprocessing
 import time
-import traceback
-try:
-    import Queue as queue
-except ImportError:
-    import queue
-
-try:
-    import win32api
-except ImportError:
-    win32api = None
 
-import multiprocessing
 import lit.Test
-
-def abort_now():
-    """Abort the current process without doing any exception teardown"""
-    sys.stdout.flush()
-    if win32api:
-        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
-    else:
-        os.kill(0, 9)
+import lit.util
+import lit.worker
 
 class _Display(object):
     def __init__(self, display, provider, maxFailures):
@@ -48,12 +29,11 @@ class Run(object):
         # For example, some ASan tests require lots of virtual memory and run
         # faster with less parallelism on OS X.
         self.parallelism_semaphores = \
-                {k: multiprocessing.Semaphore(v) for k, v in
+                {k: multiprocessing.BoundedSemaphore(v) for k, v in
                  self.lit_config.parallelism_groups.items()}
 
     def execute_test(self, test):
-        return _execute_test_impl(test, self.lit_config,
-                                  self.parallelism_semaphores)
+        return lit.worker._execute_test(test, self.lit_config)
 
     def execute_tests_in_pool(self, jobs, max_time):
         # We need to issue many wait calls, so compute the final deadline and
@@ -67,22 +47,22 @@ class Run(object):
         # interrupts the workers before we make it into our task callback, they
         # will each raise a KeyboardInterrupt exception and print to stderr at
         # the same time.
-        pool = multiprocessing.Pool(jobs, worker_initializer,
+        pool = multiprocessing.Pool(jobs, lit.worker.initializer,
                                     (self.lit_config,
                                      self.parallelism_semaphores))
 
         # Install a console-control signal handler on Windows.
-        if win32api is not None:
+        if lit.util.win32api is not None:
             def console_ctrl_handler(type):
                 print('\nCtrl-C detected, terminating.')
                 pool.terminate()
                 pool.join()
-                abort_now()
+                lit.util.abort_now()
                 return True
-            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+            lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
 
         try:
-            async_results = [pool.apply_async(worker_run_one_test,
+            async_results = [pool.apply_async(lit.worker.run_one_test,
                                               args=(test_index, test),
                                               callback=self.consume_test_result)
                              for test_index, test in enumerate(self.tests)]
@@ -143,11 +123,9 @@ class Run(object):
         self.failure_count = 0
         self.hit_max_failures = False
         if jobs == 1:
-            global child_lit_config
-            child_lit_config = self.lit_config
             for test_index, test in enumerate(self.tests):
-                result = worker_run_one_test(test_index, test)
-                self.consume_test_result(result)
+                lit.worker._execute_test(test, self.lit_config)
+                self.consume_test_result((test_index, test))
                 if self.hit_max_failures:
                     break
         else:
@@ -159,7 +137,7 @@ class Run(object):
                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
 
     def consume_test_result(self, pool_result):
-        """Test completion callback for worker_run_one_test
+        """Test completion callback for lit.worker.run_one_test
 
         Updates the test result status in the parent process. Each task in the
         pool returns the test index and the result, and we use the index to look
@@ -186,74 +164,3 @@ class Run(object):
         if self.lit_config.maxFailures and \
                 self.failure_count == self.lit_config.maxFailures:
             self.hit_max_failures = True
-
-def _execute_test_impl(test, lit_config, parallelism_semaphores):
-    """Execute one test"""
-    pg = test.config.parallelism_group
-    if callable(pg):
-        pg = pg(test)
-
-    result = None
-    semaphore = None
-    try:
-        if pg:
-            semaphore = parallelism_semaphores[pg]
-        if semaphore:
-            semaphore.acquire()
-        start_time = time.time()
-        result = test.config.test_format.execute(test, lit_config)
-        # Support deprecated result from execute() which returned the result
-        # code and additional output as a tuple.
-        if isinstance(result, tuple):
-            code, output = result
-            result = lit.Test.Result(code, output)
-        elif not isinstance(result, lit.Test.Result):
-            raise ValueError("unexpected result from test execution")
-        result.elapsed = time.time() - start_time
-    except KeyboardInterrupt:
-        raise
-    except:
-        if lit_config.debug:
-            raise
-        output = 'Exception during script execution:\n'
-        output += traceback.format_exc()
-        output += '\n'
-        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-    finally:
-        if semaphore:
-            semaphore.release()
-
-    test.setResult(result)
-
-child_lit_config = None
-child_parallelism_semaphores = None
-
-def worker_initializer(lit_config, parallelism_semaphores):
-    """Copy expensive repeated data into worker processes"""
-    global child_lit_config
-    child_lit_config = lit_config
-    global child_parallelism_semaphores
-    child_parallelism_semaphores = parallelism_semaphores
-
-def worker_run_one_test(test_index, test):
-    """Run one test in a multiprocessing.Pool
-
-    Side effects in this function and functions it calls are not visible in the
-    main lit process.
-
-    Arguments and results of this function are pickled, so they should be cheap
-    to copy. For efficiency, we copy all data needed to execute all tests into
-    each worker and store it in the child_* global variables. This reduces the
-    cost of each task.
-
-    Returns an index and a Result, which the parent process uses to update
-    the display.
-    """
-    try:
-        _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
-        return (test_index, test)
-    except KeyboardInterrupt as e:
-        # If a worker process gets an interrupt, abort it immediately.
-        abort_now()
-    except:
-        traceback.print_exc()
index dee70b3..58b5563 100644 (file)
@@ -424,3 +424,17 @@ def killProcessAndChildren(pid):
         psutilProc.kill()
     except psutil.NoSuchProcess:
         pass
+
+
+try:
+    import win32api
+except ImportError:
+    win32api = None
+
+def abort_now():
+    """Abort the current process without doing any exception teardown"""
+    sys.stdout.flush()
+    if win32api:
+        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
+    else:
+        os.kill(0, 9)
diff --git a/llvm/utils/lit/lit/worker.py b/llvm/utils/lit/lit/worker.py
new file mode 100644 (file)
index 0000000..f97b6a0
--- /dev/null
@@ -0,0 +1,82 @@
+# The functions in this module are meant to run on a separate worker process.
+# Exception: in single process mode _execute_test is called directly.
+import time
+import traceback
+
+import lit.Test
+import lit.util
+
+_lit_config = None
+_parallelism_semaphores = None
+
+def initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global _lit_config
+    global _parallelism_semaphores
+    _lit_config = lit_config
+    _parallelism_semaphores = parallelism_semaphores
+
+def run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the worker_* global variables. This reduces the
+    cost of each task.
+
+    Returns an index and a Result, which the parent process uses to update
+    the display.
+    """
+    try:
+        _execute_test_in_parallelism_group(test, _lit_config,
+                                           _parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt:
+        # If a worker process gets an interrupt, abort it immediately.
+        lit.util.abort_now()
+    except:
+        traceback.print_exc()
+
+def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
+    """Execute one test inside the appropriate parallelism group"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    if pg:
+        semaphore = parallelism_semaphores[pg]
+        try:
+            semaphore.acquire()
+            _execute_test(test, lit_config)
+        finally:
+            semaphore.release()
+    else:
+        _execute_test(test, lit_config)
+
+def _execute_test(test, lit_config):
+    """Execute one test"""
+    try:
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+
+    test.setResult(result)