Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / build / android / pylib / utils / parallelizer.py
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """ Wrapper that allows method execution in parallel.
6
7 This class wraps a list of objects of the same type, emulates their
8 interface, and executes any functions called on the objects in parallel
9 in ReraiserThreads.
10
11 This means that, given a list of objects:
12
13   class Foo:
14     def __init__(self):
15       self.baz = Baz()
16
17     def bar(self, my_param):
18       // do something
19
20   list_of_foos = [Foo(1), Foo(2), Foo(3)]
21
22 we can take a sequential operation on that list of objects:
23
24   for f in list_of_foos:
25     f.bar('Hello')
26
27 and run it in parallel across all of the objects:
28
29   Parallelizer(list_of_foos).bar('Hello')
30
31 It can also handle (non-method) attributes of objects, so that this:
32
33   for f in list_of_foos:
34     f.baz.myBazMethod()
35
36 can be run in parallel with:
37
38   Parallelizer(list_of_foos).baz.myBazMethod()
39
40 Because it emulates the interface of the wrapped objects, a Parallelizer
41 can be passed to a method or function that takes objects of that type:
42
43   def DoesSomethingWithFoo(the_foo):
44     the_foo.bar('Hello')
45     the_foo.bar('world')
46     the_foo.baz.myBazMethod
47
48   DoesSomethingWithFoo(Parallelizer(list_of_foos))
49
50 Note that this class spins up a thread for each object. Using this class
51 to parallelize operations that are already fast will incur a net performance
52 penalty.
53
54 """
55 # pylint: disable=W0613
56
57 from pylib.utils import reraiser_thread
58 from pylib.utils import watchdog_timer
59
60 _DEFAULT_TIMEOUT = 30
61 _DEFAULT_RETRIES = 3
62
63
64 class Parallelizer(object):
65   """Allows parallel execution of method calls across a group of objects."""
66
67   def __init__(self, objs):
68     assert (objs is not None and len(objs) > 0), (
69         "Passed empty list to 'Parallelizer'")
70     self._orig_objs = objs
71     self._objs = objs
72
73   def __getattr__(self, name):
74     """Emulate getting the |name| attribute of |self|.
75
76     Args:
77       name: The name of the attribute to retrieve.
78     Returns:
79       A Parallelizer emulating the |name| attribute of |self|.
80     """
81     self.pGet(None)
82
83     r = type(self)(self._orig_objs)
84     r._objs = [getattr(o, name) for o in self._objs]
85     return r
86
87   def __getitem__(self, index):
88     """Emulate getting the value of |self| at |index|.
89
90     Returns:
91       A Parallelizer emulating the value of |self| at |index|.
92     """
93     self.pGet(None)
94
95     r = type(self)(self._orig_objs)
96     r._objs = [o[index] for o in self._objs]
97     return r
98
99   def __call__(self, *args, **kwargs):
100     """Emulate calling |self| with |args| and |kwargs|.
101
102     Note that this call is asynchronous. Call pFinish on the return value to
103     block until the call finishes.
104
105     Returns:
106       A Parallelizer wrapping the ReraiserThreadGroup running the call in
107       parallel.
108     Raises:
109       AttributeError if the wrapped objects aren't callable.
110     """
111     self.pGet(None)
112
113     if not self._objs:
114       raise AttributeError('Nothing to call.')
115     for o in self._objs:
116       if not callable(o):
117         raise AttributeError("'%s' is not callable" % o.__name__)
118
119     r = type(self)(self._orig_objs)
120     r._objs = reraiser_thread.ReraiserThreadGroup(
121         [reraiser_thread.ReraiserThread(
122             o, args=args, kwargs=kwargs,
123             name='%s.%s' % (str(d), o.__name__))
124          for d, o in zip(self._orig_objs, self._objs)])
125     r._objs.StartAll() # pylint: disable=W0212
126     return r
127
128   def pFinish(self, timeout):
129     """Finish any outstanding asynchronous operations.
130
131     Args:
132       timeout: The maximum number of seconds to wait for an individual
133                result to return, or None to wait forever.
134     Returns:
135       self, now emulating the return values.
136     """
137     self._assertNoShadow('pFinish')
138     if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
139       self._objs.JoinAll()
140       self._objs = self._objs.GetAllReturnValues(
141           watchdog_timer.WatchdogTimer(timeout))
142     return self
143
144   def pGet(self, timeout):
145     """Get the current wrapped objects.
146
147     Args:
148       timeout: Same as |pFinish|.
149     Returns:
150       A list of the results, in order of the provided devices.
151     Raises:
152       Any exception raised by any of the called functions.
153     """
154     self._assertNoShadow('pGet')
155     self.pFinish(timeout)
156     return self._objs
157
158   def pMap(self, f, *args, **kwargs):
159     """Map a function across the current wrapped objects in parallel.
160
161     This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
162
163     Note that this call is asynchronous. Call pFinish on the return value to
164     block until the call finishes.
165
166     Args:
167       f: The function to call.
168       args: The positional args to pass to f.
169       kwargs: The keyword args to pass to f.
170     Returns:
171       A Parallelizer wrapping the ReraiserThreadGroup running the map in
172       parallel.
173     """
174     self._assertNoShadow('pMap')
175     r = type(self)(self._orig_objs)
176     r._objs = reraiser_thread.ReraiserThreadGroup(
177         [reraiser_thread.ReraiserThread(
178             f, args=tuple([o] + list(args)), kwargs=kwargs,
179             name='%s(%s)' % (f.__name__, d))
180          for d, o in zip(self._orig_objs, self._objs)])
181     r._objs.StartAll() # pylint: disable=W0212
182     return r
183
184   def _assertNoShadow(self, attr_name):
185     """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
186
187     If the wrapped objects _do_ have an |attr_name| attribute, it will be
188     inaccessible to clients.
189
190     Args:
191       attr_name: The attribute to check.
192     Raises:
193       AssertionError if the wrapped objects have an attribute named 'attr_name'
194       or '_assertNoShadow'.
195     """
196     if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
197       assert(not hasattr(self._objs, '_assertNoShadow'))
198       assert(not hasattr(self._objs, 'pGet'))
199     else:
200       assert(not any(hasattr(o, '_assertNoShadow') for o in self._objs))
201       assert(not any(hasattr(o, 'pGet') for o in self._objs))
202
203
204 class SyncParallelizer(Parallelizer):
205   """A Parallelizer that blocks on function calls."""
206
207   #override
208   def __call__(self, *args, **kwargs):
209     """Emulate calling |self| with |args| and |kwargs|.
210
211     Note that this call is synchronous.
212
213     Returns:
214       A Parallelizer emulating the value returned from calling |self| with
215       |args| and |kwargs|.
216     Raises:
217       AttributeError if the wrapped objects aren't callable.
218     """
219     r = super(SyncParallelizer, self).__call__(*args, **kwargs)
220     r.pFinish(None)
221     return r
222
223   #override
224   def pMap(self, f, *args, **kwargs):
225     """Map a function across the current wrapped objects in parallel.
226
227     This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
228
229     Note that this call is synchronous.
230
231     Args:
232       f: The function to call.
233       args: The positional args to pass to f.
234       kwargs: The keyword args to pass to f.
235     Returns:
236       A Parallelizer wrapping the ReraiserThreadGroup running the map in
237       parallel.
238     """
239     r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
240     r.pFinish(None)
241     return r
242