Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / third_party / typ / typ / pool.py
1 # Copyright 2014 Google Inc. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import copy
16 import multiprocessing
17 import pickle
18
19 from typ.host import Host
20
21
22 def make_pool(host, jobs, callback, context, pre_fn, post_fn):
23     _validate_args(context, pre_fn, post_fn)
24     if jobs > 1:
25         return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn)
26     else:
27         return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn)
28
29
30 class _MessageType(object):
31     Request = 'Request'
32     Response = 'Response'
33     Close = 'Close'
34     Done = 'Done'
35     Error = 'Error'
36     Interrupt = 'Interrupt'
37
38     values = [Request, Response, Close, Done, Error, Interrupt]
39
40
41 def _validate_args(context, pre_fn, post_fn):
42     try:
43         _ = pickle.dumps(context)
44     except Exception as e:
45         raise ValueError('context passed to make_pool is not picklable: %s'
46                          % str(e))
47     try:
48         _ = pickle.dumps(pre_fn)
49     except pickle.PickleError:
50         raise ValueError('pre_fn passed to make_pool is not picklable')
51     try:
52         _ = pickle.dumps(post_fn)
53     except pickle.PickleError:
54         raise ValueError('post_fn passed to make_pool is not picklable')
55
56
57 class _ProcessPool(object):
58
59     def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
60         self.host = host
61         self.jobs = jobs
62         self.requests = multiprocessing.Queue()
63         self.responses = multiprocessing.Queue()
64         self.workers = []
65         self.discarded_responses = []
66         self.closed = False
67         self.erred = False
68         for worker_num in range(1, jobs + 1):
69             w = multiprocessing.Process(target=_loop,
70                                         args=(self.requests, self.responses,
71                                               host.for_mp(), worker_num,
72                                               callback, context,
73                                               pre_fn, post_fn))
74             w.start()
75             self.workers.append(w)
76
77     def send(self, msg):
78         self.requests.put((_MessageType.Request, msg))
79
80     def get(self):
81         msg_type, resp = self.responses.get()
82         if msg_type == _MessageType.Error:
83             self._handle_error(resp)
84         elif msg_type == _MessageType.Interrupt:
85             raise KeyboardInterrupt
86         assert msg_type == _MessageType.Response
87         return resp
88
89     def close(self):
90         for _ in self.workers:
91             self.requests.put((_MessageType.Close, None))
92         self.closed = True
93
94     def join(self):
95         # TODO: one would think that we could close self.requests in close(),
96         # above, and close self.responses below, but if we do, we get
97         # weird tracebacks in the daemon threads multiprocessing starts up.
98         # Instead, we have to hack the innards of multiprocessing. It
99         # seems likely that there's a bug somewhere, either in this module or
100         # in multiprocessing.
101         if self.host.is_python3:  # pragma: python3
102             multiprocessing.queues.is_exiting = lambda: True
103         else:  # pragma: python2
104             multiprocessing.util._exiting = True
105
106         if not self.closed:
107             # We must be aborting; terminate the workers rather than
108             # shutting down cleanly.
109             for w in self.workers:
110                 w.terminate()
111                 w.join()
112             return []
113
114         final_responses = []
115         error = None
116         interrupted = None
117         for w in self.workers:
118             while True:
119                 msg_type, resp = self.responses.get()
120                 if msg_type == _MessageType.Error:
121                     error = resp
122                     break
123                 if msg_type == _MessageType.Interrupt:
124                     interrupted = True
125                     break
126                 if msg_type == _MessageType.Done:
127                     final_responses.append(resp[1])
128                     break
129                 self.discarded_responses.append(resp)
130
131         for w in self.workers:
132             w.join()
133
134         # TODO: See comment above at the beginning of the function for
135         # why this is commented out.
136         # self.responses.close()
137
138         if error:
139             self._handle_error(error)
140         if interrupted:
141             raise KeyboardInterrupt
142         return final_responses
143
144     def _handle_error(self, msg):
145         worker_num, ex_str = msg
146         self.erred = True
147         raise Exception("error from worker %d: %s" % (worker_num, ex_str))
148
149
150 # 'Too many arguments' pylint: disable=R0913
151
152 def _loop(requests, responses, host, worker_num,
153           callback, context, pre_fn, post_fn, should_loop=True):
154     host = host or Host()
155     try:
156         context_after_pre = pre_fn(host, worker_num, context)
157         keep_looping = True
158         while keep_looping:
159             message_type, args = requests.get(block=True)
160             if message_type == _MessageType.Close:
161                 responses.put((_MessageType.Done,
162                                (worker_num, post_fn(context_after_pre))))
163                 break
164             assert message_type == _MessageType.Request
165             resp = callback(context_after_pre, args)
166             responses.put((_MessageType.Response, resp))
167             keep_looping = should_loop
168     except KeyboardInterrupt as e:
169         responses.put((_MessageType.Interrupt, (worker_num, str(e))))
170     except Exception as e:
171         responses.put((_MessageType.Error, (worker_num, str(e))))
172
173
174 class _AsyncPool(object):
175
176     def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
177         self.host = host or Host()
178         self.jobs = jobs
179         self.callback = callback
180         self.context = copy.deepcopy(context)
181         self.msgs = []
182         self.closed = False
183         self.post_fn = post_fn
184         self.context_after_pre = pre_fn(self.host, 1, self.context)
185         self.final_context = None
186
187     def send(self, msg):
188         self.msgs.append(msg)
189
190     def get(self):
191         return self.callback(self.context_after_pre, self.msgs.pop(0))
192
193     def close(self):
194         self.closed = True
195         self.final_context = self.post_fn(self.context_after_pre)
196
197     def join(self):
198         if not self.closed:
199             self.close()
200         return [self.final_context]