Imported Upstream version 1.0.0
[platform/upstream/js.git] / js / src / tests / workers.py
1 # Multithreaded activities with a push-driven divide-process-collect model.
2
3 import os, sys, time
4 from threading import Thread, Lock
5 from Queue import Queue, Empty
6 from datetime import datetime
7
class Source:
    """Distribute a fixed list of tasks over a pool of worker threads.

    All tasks are queued at construction time. `start` creates the workers,
    which drain the queue and push each task's result into `results`
    through a lock-protected Sink.
    """

    # Timeout (seconds) handed to Thread.join for each worker.
    # NOTE(review): presumably finite so that the wait in join_workers stays
    # interruptible by KeyboardInterrupt -- confirm before changing.
    JOIN_TIMEOUT = 20000

    def __init__(self, task_list, results, verbose = False):
        """Queue up `task_list`.

        task_list -- iterable of tasks; each one is later invoked by a
                     worker as `task()`.
        results   -- collector object; results reach it via `results.push`.
        verbose   -- when true, progress messages are printed.
        """
        self.tasks = Queue()
        for task in task_list:
            self.tasks.put_nowait(task)

        self.results = results
        self.verbose = verbose
        # Replaced by start(). Starting out empty makes join_workers() safe
        # to call on a Source that was never started.
        self.workers = []

    def start(self, worker_count):
        """Run all queued tasks on `worker_count` workers and wait for them.

        Returns the value of join_workers(): True after every worker has
        been joined, False if the wait was cut short by KeyboardInterrupt.
        """
        # Shared reference time so all workers log comparable timestamps.
        t0 = datetime.now()

        sink = Sink(self.results)
        # Worker ids are 1-based.
        self.workers = [Worker(n + 1, self.tasks, sink, self.verbose)
                        for n in range(worker_count)]
        if self.verbose:
            print('[P] Starting workers.')
        for w in self.workers:
            w.t0 = t0
            w.start()
        ans = self.join_workers()
        if self.verbose:
            print('[P] Finished.')

        return ans

    def join_workers(self):
        """Wait for every worker thread to finish.

        Returns True once each worker has been joined. Thread.join with a
        timeout also returns when the timeout expires, so a worker that runs
        longer than JOIN_TIMEOUT seconds is not waited on any further.

        On KeyboardInterrupt every worker is asked to stop (it checks the
        flag before fetching its next task) and False is returned without
        waiting for the workers to wind down.
        """
        try:
            for w in self.workers:
                w.thread.join(self.JOIN_TIMEOUT)
            return True
        except KeyboardInterrupt:
            for w in self.workers:
                w.stop = True
            return False
43
class Sink:
    """Lock-protected wrapper around a results collector.

    Every push is forwarded to `results.push` while holding a lock, so
    only one caller at a time is inside the collector.
    """

    def __init__(self, results):
        """Wrap `results`, an object exposing a `push(result)` method."""
        self.results = results
        self.lock = Lock()

    def push(self, result):
        """Forward `result` to the collector under the lock.

        The lock is released even if the collector's push raises.
        """
        with self.lock:
            self.results.push(result)
55
class Worker(object):
    """A thread that pulls tasks from a shared queue and pushes results.

    Each task is called with no arguments and its return value is handed
    to `sink.push`. The worker ends when the queue is empty or when its
    `stop` flag has been set.
    """

    def __init__(self, id, tasks, sink, verbose):
        """Set up an idle worker; call start() to launch its thread.

        id      -- number shown in this worker's log lines.
        tasks   -- queue of callables to run.
        sink    -- object whose `push(result)` receives each task result.
        verbose -- when true, log() prints progress messages.
        """
        self.id = id
        self.tasks = tasks
        self.sink = sink
        self.verbose = verbose

        self.thread = None
        self.stop = False
        # Reference time for log timestamps. The owner may overwrite it to
        # give all workers a common origin; defaulting it here keeps log()
        # usable on a Worker that was created on its own.
        self.t0 = datetime.now()

    def log(self, msg):
        """Print `msg` tagged with worker id and seconds since `t0`.

        Does nothing unless the worker is verbose.
        """
        if not self.verbose:
            return

        dd = datetime.now() - self.t0
        # Include whole days so the stamp does not wrap after 24 hours.
        dt = dd.days * 86400 + dd.seconds + 1e-6 * dd.microseconds
        print('[W%d %.3f] %s' % (self.id, dt, msg))

    def start(self):
        """Launch run() on a new daemon thread stored in `self.thread`."""
        self.thread = Thread(target=self.run)
        self.thread.setDaemon(True)
        self.thread.start()

    def run(self):
        """Process tasks until the queue is empty or `stop` is set.

        The `stop` flag is checked only between tasks; a running task is
        never interrupted. Exceptions raised by a task or by the sink
        propagate to the caller.
        """
        while not self.stop:
            self.log('Get next task.')
            # Only an empty task queue ends the loop. Keeping the handler
            # this narrow means an Empty raised from inside a task or the
            # sink is not mistaken for "no more work".
            try:
                task = self.tasks.get(False)
            except Empty:
                break
            self.log('Start task %s.'%str(task))
            result = task()
            self.log('Finished task.')
            self.sink.push(result)
            self.log('Pushed result.')