1 # Run multiple activities concurrently on worker threads, using a push-driven divide-process-collect model.
4 from threading import Thread, Lock
5 from Queue import Queue, Empty
6 from datetime import datetime
# Store the work to do and where its results go.
#   task_list -- iterable of tasks; each one is enqueued on self.tasks without blocking.
#   results   -- collector object; it is later wrapped in a Sink, which calls results.push(result).
#   verbose   -- when true, progress messages are printed.
9 def __init__(self, task_list, results, verbose = False):
# NOTE(review): the line that creates self.tasks is not shown in this excerpt;
# presumably a Queue (Queue is imported at the top of the file) -- confirm.
11 for task in task_list:
12 self.tasks.put_nowait(task)
14 self.results = results
15 self.verbose = verbose
# Create `worker_count` workers that share the task queue and one result sink,
# then call join_workers().
17 def start(self, worker_count):
# A single Sink wraps self.results; every worker receives this same sink.
20 sink = Sink(self.results)
# Worker ids are 1-based (loop index + 1).
21 self.workers = [ Worker(_+1, self.tasks, sink, self.verbose) for _ in range(worker_count) ]
22 if self.verbose: print '[P] Starting workers.'
# NOTE(review): the body of this loop is not shown in this excerpt;
# presumably it starts each worker -- confirm.
23 for w in self.workers:
# The return value of join_workers() is kept in `ans`; how it is used is not visible here.
26 ans = self.join_workers()
27 if self.verbose: print '[P] Finished.'
# Iterate over the workers, with a dedicated KeyboardInterrupt path.
# Only the loop headers and the except clause are visible in this excerpt:
# one pass over self.workers on the normal path and a second pass inside the handler.
# NOTE(review): presumably the first loop joins each worker and the handler loop
# stops them on interrupt -- confirm against the full source.
# start() stores this method's return value in `ans`.
34 def join_workers(self):
36 for w in self.workers:
39 except KeyboardInterrupt:
40 for w in self.workers:
# Wrap a results object; push() forwards each result to results.push().
# NOTE(review): presumably the constructor of the Sink class used in start() -- the
# class header is not shown in this excerpt.
45 def __init__(self, results):
46 self.results = results
# Forward one result to the wrapped results object via its push() method.
49 def push(self, result):
# NOTE(review): the lines between the def and this call are not shown in this excerpt.
# Lock is imported at the top of the file, so this call may be lock-guarded -- confirm.
52 self.results.push(result)
# Initialise a worker from its id, the task queue, the result sink and a verbosity flag.
# The Worker(...) call in start() passes a 1-based index, the task queue, the Sink and
# the verbose flag, which matches this signature -- presumably this is Worker.__init__.
# NOTE(review): only the `verbose` assignment is visible; self.id, self.tasks, self.sink
# and self.t0 are read by other methods and are presumably set on lines not shown here.
57 def __init__(self, id, tasks, sink, verbose):
61 self.verbose = verbose
# Fragment of a logging helper; its def line is not shown in this excerpt
# (other code calls self.log(<message string>), and `msg` is printed below).
# Elapsed wall-clock time since self.t0, as a timedelta.
67 dd = datetime.now() - self.t0
# Convert to fractional seconds.
# NOTE(review): timedelta.seconds excludes whole days, so this value wraps after 24 hours;
# dd.total_seconds() (Python 2.7+) would include them.
68 dt = dd.seconds + 1e-6 * dd.microseconds
# Output format: [W<worker id> <elapsed seconds, 3 decimals>] <message>
# NOTE(review): the lines just before this print are not shown; presumably a check on
# self.verbose guards it -- confirm.
71 print '[W%d %.3f] %s' % (self.id, dt, msg)
# Fragment of a method whose def line is not shown in this excerpt.
# Create the thread that will execute self.run.
74 self.thread = Thread(target=self.run)
# Daemon thread: it does not keep the interpreter alive once the main thread exits.
75 self.thread.setDaemon(True)
# Fragment of the task-processing code; its def line is not shown in this excerpt
# (the thread's target is self.run, so presumably this is the body of run()).
83 self.log('Get next task.')
# Non-blocking get: raises Queue.Empty immediately when no task is queued.
# NOTE(review): the handler for Empty is not visible here; presumably it ends the
# worker's loop -- confirm.
84 task = self.tasks.get(False)
85 self.log('Start task %s.'%str(task))
# NOTE(review): the line that computes `result` from `task` is not shown in this excerpt.
87 self.log('Finished task.')
# Hand the result to the sink, which forwards it to the results collector.
88 self.sink.push(result)
89 self.log('Pushed result.')