Make test runner more chatty to avoid it getting killed by buildbot.
authormachenbach <machenbach@chromium.org>
Wed, 8 Apr 2015 09:53:35 +0000 (02:53 -0700)
committerCommit bot <commit-bot@chromium.org>
Wed, 8 Apr 2015 09:53:39 +0000 (09:53 +0000)
NOTRY=true
NOTREECHECKS=true

Review URL: https://codereview.chromium.org/1064043002

Cr-Commit-Position: refs/heads/master@{#27653}

tools/testrunner/local/execution.py
tools/testrunner/local/pool.py
tools/testrunner/local/pool_unittest.py
tools/testrunner/local/progress.py

index 5c5fbac..35e0efe 100644 (file)
@@ -230,11 +230,14 @@ class Runner(object):
     try:
       it = pool.imap_unordered(RunTest, queue)
       for result in it:
-        test = test_map[result[0]]
+        if result.heartbeat:
+          self.indicator.Heartbeat()
+          continue
+        test = test_map[result.value[0]]
         if self.context.predictable:
-          update_perf = self._ProcessTestPredictable(test, result, pool)
+          update_perf = self._ProcessTestPredictable(test, result.value, pool)
         else:
-          update_perf = self._ProcessTestNormal(test, result, pool)
+          update_perf = self._ProcessTestNormal(test, result.value, pool)
         if update_perf:
           self._RunPerfSafe(lambda: self.perfdata.UpdatePerfData(test))
     finally:
index 602a2d4..b933f73 100644 (file)
@@ -3,6 +3,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+from Queue import Empty
 from multiprocessing import Event, Process, Queue
 
 class NormalResult():
@@ -24,6 +25,20 @@ class BreakResult():
     self.break_now = True
 
 
+class MaybeResult():
+  def __init__(self, heartbeat, value):
+    self.heartbeat = heartbeat
+    self.value = value
+
+  @staticmethod
+  def create_heartbeat():
+    return MaybeResult(True, None)
+
+  @staticmethod
+  def create_result(value):
+    return MaybeResult(False, value)
+
+
 def Worker(fn, work_queue, done_queue, done):
   """Worker to be run in a child process.
   The worker stops on two conditions. 1. When the poison pill "STOP" is
@@ -51,7 +66,7 @@ class Pool():
   # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
   BUFFER_FACTOR = 4
 
-  def __init__(self, num_workers):
+  def __init__(self, num_workers, heartbeat_timeout=30):
     self.num_workers = num_workers
     self.processes = []
     self.terminated = False
@@ -67,11 +82,15 @@ class Pool():
     self.work_queue = Queue()
     self.done_queue = Queue()
     self.done = Event()
+    self.heartbeat_timeout = heartbeat_timeout
 
   def imap_unordered(self, fn, gen):
     """Maps function "fn" to items in generator "gen" on the worker processes
     in an arbitrary order. The items are expected to be lists of arguments to
-    the function. Returns a results iterator."""
+    the function. Returns a results iterator. A result value of type
+    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
+    that the runner is still waiting for the result to be computed, or it wraps
+    the real result."""
     try:
       gen = iter(gen)
       self.advance = self._advance_more
@@ -86,7 +105,14 @@ class Pool():
 
       self.advance(gen)
       while self.count > 0:
-        result = self.done_queue.get()
+        while True:
+          try:
+            result = self.done_queue.get(timeout=self.heartbeat_timeout)
+            break
+          except Empty:
+            # Indicate a heartbeat. The iterator will continue fetching the
+            # next result.
+            yield MaybeResult.create_heartbeat()
         self.count -= 1
         if result.exception:
           # Ignore items with unexpected exceptions.
@@ -95,7 +121,7 @@ class Pool():
           # A keyboard interrupt happened in one of the worker processes.
           raise KeyboardInterrupt
         else:
-          yield result.result
+          yield MaybeResult.create_result(result.result)
         self.advance(gen)
     finally:
       self.terminate()
index bf2b3f8..335d20a 100644 (file)
@@ -17,7 +17,7 @@ class PoolTest(unittest.TestCase):
     results = set()
     pool = Pool(3)
     for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
-      results.add(result)
+      results.add(result.value)
     self.assertEquals(set(range(0, 10)), results)
 
   def testException(self):
@@ -25,7 +25,7 @@ class PoolTest(unittest.TestCase):
     pool = Pool(3)
     for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
       # Item 10 will not appear in results due to an internal exception.
-      results.add(result)
+      results.add(result.value)
     expect = set(range(0, 12))
     expect.remove(10)
     self.assertEquals(expect, results)
@@ -34,8 +34,8 @@ class PoolTest(unittest.TestCase):
     results = set()
     pool = Pool(3)
     for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
-      results.add(result)
-      if result < 30:
-        pool.add([result + 20])
+      results.add(result.value)
+      if result.value < 30:
+        pool.add([result.value + 20])
     self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
                       results)
index 2616958..f47fa3a 100644 (file)
@@ -66,6 +66,9 @@ class ProgressIndicator(object):
   def HasRun(self, test, has_unexpected_output):
     pass
 
+  def Heartbeat(self):
+    pass
+
   def PrintFailureHeader(self, test):
     if test.suite.IsNegativeTest(test):
       negative_marker = '[negative] '
@@ -128,6 +131,10 @@ class VerboseProgressIndicator(SimpleProgressIndicator):
       outcome = 'pass'
     print 'Done running %s: %s' % (test.GetLabel(), outcome)
 
+  def Heartbeat(self):
+    print 'Still working...'
+    sys.stdout.flush()
+
 
 class DotsProgressIndicator(SimpleProgressIndicator):