try:
it = pool.imap_unordered(RunTest, queue)
for result in it:
- test = test_map[result[0]]
+ if result.heartbeat:
+ self.indicator.Heartbeat()
+ continue
+ test = test_map[result.value[0]]
if self.context.predictable:
- update_perf = self._ProcessTestPredictable(test, result, pool)
+ update_perf = self._ProcessTestPredictable(test, result.value, pool)
else:
- update_perf = self._ProcessTestNormal(test, result, pool)
+ update_perf = self._ProcessTestNormal(test, result.value, pool)
if update_perf:
self._RunPerfSafe(lambda: self.perfdata.UpdatePerfData(test))
finally:
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+from Queue import Empty
from multiprocessing import Event, Process, Queue
class NormalResult():
self.break_now = True
+class MaybeResult():
+ def __init__(self, heartbeat, value):
+ self.heartbeat = heartbeat
+ self.value = value
+
+ @staticmethod
+ def create_heartbeat():
+ return MaybeResult(True, None)
+
+ @staticmethod
+ def create_result(value):
+ return MaybeResult(False, value)
+
+
def Worker(fn, work_queue, done_queue, done):
"""Worker to be run in a child process.
The worker stops on two conditions. 1. When the poison pill "STOP" is
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
- def __init__(self, num_workers):
+ def __init__(self, num_workers, heartbeat_timeout=30):
self.num_workers = num_workers
self.processes = []
self.terminated = False
self.work_queue = Queue()
self.done_queue = Queue()
self.done = Event()
+ self.heartbeat_timeout = heartbeat_timeout
def imap_unordered(self, fn, gen):
"""Maps function "fn" to items in generator "gen" on the worker processes
in an arbitrary order. The items are expected to be lists of arguments to
- the function. Returns a results iterator."""
+ the function. Returns a results iterator. A result value of type
+ MaybeResult either indicates a heartbeat of the runner, i.e. indicating
+ that the runner is still waiting for the result to be computed, or it wraps
+ the real result."""
try:
gen = iter(gen)
self.advance = self._advance_more
self.advance(gen)
while self.count > 0:
- result = self.done_queue.get()
+ while True:
+ try:
+ result = self.done_queue.get(timeout=self.heartbeat_timeout)
+ break
+ except Empty:
+ # Indicate a heartbeat. The iterator will continue fetching the
+ # next result.
+ yield MaybeResult.create_heartbeat()
self.count -= 1
if result.exception:
# Ignore items with unexpected exceptions.
# A keyboard interrupt happened in one of the worker processes.
raise KeyboardInterrupt
else:
- yield result.result
+ yield MaybeResult.create_result(result.result)
self.advance(gen)
finally:
self.terminate()
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
- results.add(result)
+ results.add(result.value)
self.assertEquals(set(range(0, 10)), results)
def testException(self):
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
# Item 10 will not appear in results due to an internal exception.
- results.add(result)
+ results.add(result.value)
expect = set(range(0, 12))
expect.remove(10)
self.assertEquals(expect, results)
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
- results.add(result)
- if result < 30:
- pool.add([result + 20])
+ results.add(result.value)
+ if result.value < 30:
+ pool.add([result.value + 20])
self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
results)
def HasRun(self, test, has_unexpected_output):
pass
+ def Heartbeat(self):
+ pass
+
def PrintFailureHeader(self, test):
if test.suite.IsNegativeTest(test):
negative_marker = '[negative] '
outcome = 'pass'
print 'Done running %s: %s' % (test.GetLabel(), outcome)
+ def Heartbeat(self):
+ print 'Still working...'
+ sys.stdout.flush()
+
class DotsProgressIndicator(SimpleProgressIndicator):