Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / utils / threading_utils.py
1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
4
5 """Classes and functions related to threading."""
6
7 import functools
8 import inspect
9 import logging
10 import os
11 import Queue
12 import sys
13 import threading
14 import time
15 import traceback
16
17
18 # Priorities for tasks in AutoRetryThreadPool, particular values are important.
19 PRIORITY_HIGH = 1 << 8
20 PRIORITY_MED = 2 << 8
21 PRIORITY_LOW = 3 << 8
22
23
24 class LockWithAssert(object):
25   """Wrapper around (non recursive) Lock that tracks its owner."""
26
27   def __init__(self):
28     self._lock = threading.Lock()
29     self._owner = None
30
31   def __enter__(self):
32     self._lock.acquire()
33     assert self._owner is None
34     self._owner = threading.current_thread()
35
36   def __exit__(self, _exc_type, _exec_value, _traceback):
37     self.assert_locked('Releasing unowned lock')
38     self._owner = None
39     self._lock.release()
40     return False
41
42   def assert_locked(self, msg=None):
43     """Asserts the lock is owned by running thread."""
44     assert self._owner == threading.current_thread(), msg
45
46
47 class ThreadPoolError(Exception):
48   """Base class for exceptions raised by ThreadPool."""
49   pass
50
51
52 class ThreadPoolEmpty(ThreadPoolError):
53   """Trying to get task result from a thread pool with no pending tasks."""
54   pass
55
56
57 class ThreadPoolClosed(ThreadPoolError):
58   """Trying to do something with a closed thread pool."""
59   pass
60
61
62 class ThreadPool(object):
63   """Multithreaded worker pool with priority support.
64
65   When the priority of tasks match, it works in strict FIFO mode.
66   """
67   QUEUE_CLASS = Queue.PriorityQueue
68
69   def __init__(self, initial_threads, max_threads, queue_size, prefix=None):
70     """Immediately starts |initial_threads| threads.
71
72     Arguments:
73       initial_threads: Number of threads to start immediately. Can be 0 if it is
74                        uncertain that threads will be needed.
75       max_threads: Maximum number of threads that will be started when all the
76                    threads are busy working. Often the number of CPU cores.
77       queue_size: Maximum number of tasks to buffer in the queue. 0 for
78                   unlimited queue. A non-zero value may make add_task()
79                   blocking.
80       prefix: Prefix to use for thread names. Pool's threads will be
81               named '<prefix>-<thread index>'.
82     """
83     prefix = prefix or 'tp-0x%0x' % id(self)
84     logging.debug(
85         'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads,
86         queue_size, prefix)
87     assert initial_threads <= max_threads
88     assert max_threads <= 1024
89
90     self.tasks = self.QUEUE_CLASS(queue_size)
91     self._max_threads = max_threads
92     self._prefix = prefix
93
94     # Used to assign indexes to tasks.
95     self._num_of_added_tasks_lock = threading.Lock()
96     self._num_of_added_tasks = 0
97
98     # Lock that protected everything below (including conditional variable).
99     self._lock = threading.Lock()
100
101     # Condition 'bool(_outputs) or bool(_exceptions) or _pending_count == 0'.
102     self._outputs_exceptions_cond = threading.Condition(self._lock)
103     self._outputs = []
104     self._exceptions = []
105
106     # Number of pending tasks (queued or being processed now).
107     self._pending_count = 0
108
109     # List of threads.
110     self._workers = []
111     # Number of threads that are waiting for new tasks.
112     self._ready = 0
113     # Number of threads already added to _workers, but not yet running the loop.
114     self._starting = 0
115     # True if close was called. Forbids adding new tasks.
116     self._is_closed = False
117
118     for _ in range(initial_threads):
119       self._add_worker()
120
121   def _add_worker(self):
122     """Adds one worker thread if there isn't too many. Thread-safe."""
123     with self._lock:
124       if len(self._workers) >= self._max_threads or self._is_closed:
125         return False
126       worker = threading.Thread(
127         name='%s-%d' % (self._prefix, len(self._workers)), target=self._run)
128       self._workers.append(worker)
129       self._starting += 1
130     logging.debug('Starting worker thread %s', worker.name)
131     worker.daemon = True
132     worker.start()
133     return True
134
135   def add_task(self, priority, func, *args, **kwargs):
136     """Adds a task, a function to be executed by a worker.
137
138     Arguments:
139     - priority: priority of the task versus others. Lower priority takes
140                 precedence.
141     - func: function to run. Can either return a return value to be added to the
142             output list or be a generator which can emit multiple values.
143     - args and kwargs: arguments to |func|. Note that if func mutates |args| or
144                        |kwargs| and that the task is retried, see
145                        AutoRetryThreadPool, the retry will use the mutated
146                        values.
147
148     Returns:
149       Index of the item added, e.g. the total number of enqueued items up to
150       now.
151     """
152     assert isinstance(priority, int)
153     assert callable(func)
154     with self._lock:
155       if self._is_closed:
156         raise ThreadPoolClosed('Can not add a task to a closed ThreadPool')
157       start_new_worker = (
158         # Pending task count plus new task > number of available workers.
159         self.tasks.qsize() + 1 > self._ready + self._starting and
160         # Enough slots.
161         len(self._workers) < self._max_threads
162       )
163       self._pending_count += 1
164     with self._num_of_added_tasks_lock:
165       self._num_of_added_tasks += 1
166       index = self._num_of_added_tasks
167     self.tasks.put((priority, index, func, args, kwargs))
168     if start_new_worker:
169       self._add_worker()
170     return index
171
172   def _run(self):
173     """Worker thread loop. Runs until a None task is queued."""
174     # Thread has started, adjust counters.
175     with self._lock:
176       self._starting -= 1
177       self._ready += 1
178     while True:
179       try:
180         task = self.tasks.get()
181       finally:
182         with self._lock:
183           self._ready -= 1
184       try:
185         if task is None:
186           # We're done.
187           return
188         _priority, _index, func, args, kwargs = task
189         if inspect.isgeneratorfunction(func):
190           for out in func(*args, **kwargs):
191             self._output_append(out)
192         else:
193           out = func(*args, **kwargs)
194           self._output_append(out)
195       except Exception as e:
196         logging.warning('Caught exception: %s', e)
197         exc_info = sys.exc_info()
198         logging.info(''.join(traceback.format_tb(exc_info[2])))
199         with self._outputs_exceptions_cond:
200           self._exceptions.append(exc_info)
201           self._outputs_exceptions_cond.notifyAll()
202       finally:
203         try:
204           # Mark thread as ready again, mark task as processed. Do it before
205           # waking up threads waiting on self.tasks.join(). Otherwise they might
206           # find ThreadPool still 'busy' and perform unnecessary wait on CV.
207           with self._outputs_exceptions_cond:
208             self._ready += 1
209             self._pending_count -= 1
210             if self._pending_count == 0:
211               self._outputs_exceptions_cond.notifyAll()
212           self.tasks.task_done()
213         except Exception as e:
214           # We need to catch and log this error here because this is the root
215           # function for the thread, nothing higher will catch the error.
216           logging.exception('Caught exception while marking task as done: %s',
217                             e)
218
219   def _output_append(self, out):
220     if out is not None:
221       with self._outputs_exceptions_cond:
222         self._outputs.append(out)
223         self._outputs_exceptions_cond.notifyAll()
224
225   def join(self):
226     """Extracts all the results from each threads unordered.
227
228     Call repeatedly to extract all the exceptions if desired.
229
230     Note: will wait for all work items to be done before returning an exception.
231     To get an exception early, use get_one_result().
232     """
233     # TODO(maruel): Stop waiting as soon as an exception is caught.
234     self.tasks.join()
235     with self._outputs_exceptions_cond:
236       if self._exceptions:
237         e = self._exceptions.pop(0)
238         raise e[0], e[1], e[2]
239       out = self._outputs
240       self._outputs = []
241     return out
242
243   def get_one_result(self):
244     """Returns the next item that was generated or raises an exception if one
245     occurred.
246
247     Raises:
248       ThreadPoolEmpty - no results available.
249     """
250     # Get first available result.
251     for result in self.iter_results():
252       return result
253     # No results -> tasks queue is empty.
254     raise ThreadPoolEmpty('Task queue is empty')
255
256   def iter_results(self):
257     """Yields results as they appear until all tasks are processed."""
258     while True:
259       # Check for pending results.
260       result = None
261       self._on_iter_results_step()
262       with self._outputs_exceptions_cond:
263         if self._exceptions:
264           e = self._exceptions.pop(0)
265           raise e[0], e[1], e[2]
266         if self._outputs:
267           # Remember the result to yield it outside of the lock.
268           result = self._outputs.pop(0)
269         else:
270           # No pending tasks -> all tasks are done.
271           if not self._pending_count:
272             return
273           # Some task is queued, wait for its result to appear.
274           # Use non-None timeout so that process reacts to Ctrl+C and other
275           # signals, see http://bugs.python.org/issue8844.
276           self._outputs_exceptions_cond.wait(timeout=0.1)
277           continue
278       yield result
279
280   def close(self):
281     """Closes all the threads."""
282     # Ensure no new threads can be started, self._workers is effectively
283     # a constant after that and can be accessed outside the lock.
284     with self._lock:
285       if self._is_closed:
286         raise ThreadPoolClosed('Can not close already closed ThreadPool')
287       self._is_closed = True
288     for _ in range(len(self._workers)):
289       # Enqueueing None causes the worker to stop.
290       self.tasks.put(None)
291     for t in self._workers:
292       t.join()
293     logging.debug(
294       'Thread pool \'%s\' closed: spawned %d threads total',
295       self._prefix, len(self._workers))
296
297   def abort(self):
298     """Empties the queue.
299
300     To be used when the pool should stop early, like when Ctrl-C was detected.
301
302     Returns:
303       Number of tasks cancelled.
304     """
305     index = 0
306     while True:
307       try:
308         self.tasks.get_nowait()
309         self.tasks.task_done()
310         index += 1
311       except Queue.Empty:
312         return index
313
314   def _on_iter_results_step(self):
315     pass
316
317   def __enter__(self):
318     """Enables 'with' statement."""
319     return self
320
321   def __exit__(self, _exc_type, _exc_value, _traceback):
322     """Enables 'with' statement."""
323     self.close()
324
325
326 class AutoRetryThreadPool(ThreadPool):
327   """Automatically retries enqueued operations on exception."""
328   # See also PRIORITY_* module-level constants.
329   INTERNAL_PRIORITY_BITS = (1<<8) - 1
330
331   def __init__(self, exceptions, retries, *args, **kwargs):
332     """
333     Arguments:
334       exceptions: list of exception classes that can be retried on.
335       retries: maximum number of retries to do.
336     """
337     assert exceptions and all(issubclass(e, Exception) for e in exceptions), (
338         exceptions)
339     assert 1 <= retries <= self.INTERNAL_PRIORITY_BITS
340     super(AutoRetryThreadPool, self).__init__(*args, **kwargs)
341     self._swallowed_exceptions = tuple(exceptions)
342     self._retries = retries
343
344   def add_task(self, priority, func, *args, **kwargs):
345     """Tasks added must not use the lower priority bits since they are reserved
346     for retries.
347     """
348     assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
349     return super(AutoRetryThreadPool, self).add_task(
350         priority,
351         self._task_executer,
352         priority,
353         None,
354         func,
355         *args,
356         **kwargs)
357
358   def add_task_with_channel(self, channel, priority, func, *args, **kwargs):
359     """Tasks added must not use the lower priority bits since they are reserved
360     for retries.
361     """
362     assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
363     return super(AutoRetryThreadPool, self).add_task(
364         priority,
365         self._task_executer,
366         priority,
367         channel,
368         func,
369         *args,
370         **kwargs)
371
372   def _task_executer(self, priority, channel, func, *args, **kwargs):
373     """Wraps the function and automatically retry on exceptions."""
374     try:
375       result = func(*args, **kwargs)
376       if channel is None:
377         return result
378       channel.send_result(result)
379     except self._swallowed_exceptions as e:
380       # Retry a few times, lowering the priority.
381       actual_retries = priority & self.INTERNAL_PRIORITY_BITS
382       if actual_retries < self._retries:
383         priority += 1
384         logging.debug(
385             'Swallowed exception \'%s\'. Retrying at lower priority %X',
386             e, priority)
387         super(AutoRetryThreadPool, self).add_task(
388             priority,
389             self._task_executer,
390             priority,
391             channel,
392             func,
393             *args,
394             **kwargs)
395         return
396       if channel is None:
397         raise
398       channel.send_exception()
399     except Exception:
400       if channel is None:
401         raise
402       channel.send_exception()
403
404
405 class IOAutoRetryThreadPool(AutoRetryThreadPool):
406   """Thread pool that automatically retries on IOError.
407
408   Supposed to be used for IO bound tasks, and thus default maximum number of
409   worker threads is independent of number of CPU cores.
410   """
411   # Initial and maximum number of worker threads.
412   INITIAL_WORKERS = 2
413   MAX_WORKERS = 16
414   RETRIES = 5
415
416   def __init__(self):
417     super(IOAutoRetryThreadPool, self).__init__(
418         [IOError],
419         self.RETRIES,
420         self.INITIAL_WORKERS,
421         self.MAX_WORKERS,
422         0,
423         'io')
424
425
426 class Progress(object):
427   """Prints progress and accepts updates thread-safely."""
428   def __init__(self, columns):
429     """Creates a Progress bar that will updates asynchronously from the worker
430     threads.
431
432     Arguments:
433       columns: list of tuple(name, initialvalue), defines both the number of
434                columns and their initial values.
435     """
436     assert all(
437         len(c) == 2 and isinstance(c[0], str) and isinstance(c[1], int)
438         for c in columns), columns
439     # Members to be used exclusively in the primary thread.
440     self.use_cr_only = True
441     self.unfinished_commands = set()
442     self.start = time.time()
443     self._last_printed_line = ''
444     self._columns = [c[1] for c in columns]
445     self._columns_lookup = dict((c[0], i) for i, c in enumerate(columns))
446     # Setting it to True forces a print on the first print_update() call.
447     self._value_changed = True
448
449     # To be used in all threads.
450     self._queued_updates = Queue.Queue()
451
452   def update_item(self, name, raw=False, **kwargs):
453     """Queue information to print out.
454
455     Arguments:
456       name: string to print out to describe something that was completed.
457       raw: if True, prints the data without the header.
458       raw: if True, prints the data without the header.
459       <kwargs>: argument name is a name of a column. it's value is the increment
460                 to the column, value is usually 0 or 1.
461     """
462     assert isinstance(name, str)
463     assert isinstance(raw, bool)
464     assert all(isinstance(v, int) for v in kwargs.itervalues())
465     args = [(self._columns_lookup[k], v) for k, v in kwargs.iteritems() if v]
466     self._queued_updates.put((name, raw, args))
467
468   def print_update(self):
469     """Prints the current status."""
470     # Flush all the logging output so it doesn't appear within this output.
471     for handler in logging.root.handlers:
472       handler.flush()
473
474     got_one = False
475     while True:
476       try:
477         name, raw, args = self._queued_updates.get_nowait()
478       except Queue.Empty:
479         break
480
481       for k, v in args:
482         self._columns[k] += v
483       self._value_changed = bool(args)
484       if not name:
485         # Even if raw=True, there's nothing to print.
486         continue
487
488       got_one = True
489       if raw:
490         # Prints the data as-is.
491         self._last_printed_line = ''
492         sys.stdout.write('\n%s\n' % name.strip('\n'))
493       else:
494         line, self._last_printed_line = self._gen_line(name)
495         sys.stdout.write(line)
496
497     if not got_one and self._value_changed:
498       # Make sure a line is printed in that case where statistics changes.
499       line, self._last_printed_line = self._gen_line('')
500       sys.stdout.write(line)
501       got_one = True
502     self._value_changed = False
503     if got_one:
504       # Ensure that all the output is flushed to prevent it from getting mixed
505       # with other output streams (like the logging streams).
506       sys.stdout.flush()
507
508     if self.unfinished_commands:
509       logging.debug('Waiting for the following commands to finish:\n%s',
510                     '\n'.join(self.unfinished_commands))
511
512   def _gen_line(self, name):
513     """Generates the line to be printed."""
514     next_line = ('[%s] %6.2fs %s') % (
515         self._render_columns(), time.time() - self.start, name)
516     # Fill it with whitespace only if self.use_cr_only is set.
517     prefix = ''
518     if self.use_cr_only and self._last_printed_line:
519       prefix = '\r'
520     if self.use_cr_only:
521       suffix = ' ' * max(0, len(self._last_printed_line) - len(next_line))
522     else:
523       suffix = '\n'
524     return '%s%s%s' % (prefix, next_line, suffix), next_line
525
526   def _render_columns(self):
527     """Renders the columns."""
528     columns_as_str = map(str, self._columns)
529     max_len = max(map(len, columns_as_str))
530     return '/'.join(i.rjust(max_len) for i in columns_as_str)
531
532
533 class QueueWithProgress(Queue.PriorityQueue):
534   """Implements progress support in join()."""
535   def __init__(self, progress, *args, **kwargs):
536     Queue.PriorityQueue.__init__(self, *args, **kwargs)
537     self.progress = progress
538
539   def task_done(self):
540     """Contrary to Queue.task_done(), it wakes self.all_tasks_done at each task
541     done.
542     """
543     with self.all_tasks_done:
544       try:
545         unfinished = self.unfinished_tasks - 1
546         if unfinished < 0:
547           raise ValueError('task_done() called too many times')
548         self.unfinished_tasks = unfinished
549         # This is less efficient, because we want the Progress to be updated.
550         self.all_tasks_done.notify_all()
551       except Exception as e:
552         logging.exception('task_done threw an exception.\n%s', e)
553
554   def wake_up(self):
555     """Wakes up all_tasks_done.
556
557     Unlike task_done(), do not substract one from self.unfinished_tasks.
558     """
559     # TODO(maruel): This is highly inefficient, since the listener is awaken
560     # twice; once per output, once per task. There should be no relationship
561     # between the number of output and the number of input task.
562     with self.all_tasks_done:
563       self.all_tasks_done.notify_all()
564
565   def join(self):
566     """Calls print_update() whenever possible."""
567     self.progress.print_update()
568     with self.all_tasks_done:
569       while self.unfinished_tasks:
570         self.progress.print_update()
571         # Use a short wait timeout so updates are printed in a timely manner.
572         # TODO(maruel): Find a way so Progress.queue and self.all_tasks_done
573         # share the same underlying event so no polling is necessary.
574         self.all_tasks_done.wait(0.1)
575       self.progress.print_update()
576
577
578 class ThreadPoolWithProgress(ThreadPool):
579   QUEUE_CLASS = QueueWithProgress
580
581   def __init__(self, progress, *args, **kwargs):
582     self.QUEUE_CLASS = functools.partial(self.QUEUE_CLASS, progress)
583     super(ThreadPoolWithProgress, self).__init__(*args, **kwargs)
584
585   def _output_append(self, out):
586     """Also wakes up the listener on new completed test_case."""
587     super(ThreadPoolWithProgress, self)._output_append(out)
588     self.tasks.wake_up()
589
590   def _on_iter_results_step(self):
591     self.tasks.progress.print_update()
592
593
594 class DeadlockDetector(object):
595   """Context manager that can detect deadlocks.
596
597   It will dump stack frames of all running threads if its 'ping' method isn't
598   called in time.
599
600   Usage:
601     with DeadlockDetector(timeout=60) as detector:
602       for item in some_work():
603         ...
604         detector.ping()
605         ...
606
607   Arguments:
608     timeout - maximum allowed time between calls to 'ping'.
609   """
610
611   def __init__(self, timeout):
612     self.timeout = timeout
613     self._thread = None
614     # Thread stop condition. Also lock for shared variables below.
615     self._stop_cv = threading.Condition()
616     self._stop_flag = False
617     # Time when 'ping' was called last time.
618     self._last_ping = None
619     # True if pings are coming on time.
620     self._alive = True
621
622   def __enter__(self):
623     """Starts internal watcher thread."""
624     assert self._thread is None
625     self.ping()
626     self._thread = threading.Thread(name='deadlock-detector', target=self._run)
627     self._thread.daemon = True
628     self._thread.start()
629     return self
630
631   def __exit__(self, *_args):
632     """Stops internal watcher thread."""
633     assert self._thread is not None
634     with self._stop_cv:
635       self._stop_flag = True
636       self._stop_cv.notify()
637     self._thread.join()
638     self._thread = None
639     self._stop_flag = False
640
641   def ping(self):
642     """Notify detector that main thread is still running.
643
644     Should be called periodically to inform the detector that everything is
645     running as it should.
646     """
647     with self._stop_cv:
648       self._last_ping = time.time()
649       self._alive = True
650
651   def _run(self):
652     """Loop that watches for pings and dumps threads state if ping is late."""
653     with self._stop_cv:
654       while not self._stop_flag:
655         # Skipped deadline? Dump threads and switch to 'not alive' state.
656         if self._alive and time.time() > self._last_ping + self.timeout:
657           self.dump_threads(time.time() - self._last_ping, True)
658           self._alive = False
659
660         # Pings are on time?
661         if self._alive:
662           # Wait until the moment we need to dump stack traces.
663           # Most probably some other thread will call 'ping' to move deadline
664           # further in time. We don't bother to wake up after each 'ping',
665           # only right before initial expected deadline.
666           self._stop_cv.wait(self._last_ping + self.timeout - time.time())
667         else:
668           # Skipped some pings previously. Just periodically silently check
669           # for new pings with some arbitrary frequency.
670           self._stop_cv.wait(self.timeout * 0.1)
671
672   @staticmethod
673   def dump_threads(timeout=None, skip_current_thread=False):
674     """Dumps stack frames of all running threads."""
675     all_threads = threading.enumerate()
676     current_thread_id = threading.current_thread().ident
677
678     # Collect tracebacks: thread name -> traceback string.
679     tracebacks = {}
680
681     # pylint: disable=W0212
682     for thread_id, frame in sys._current_frames().iteritems():
683       # Don't dump deadlock detector's own thread, it's boring.
684       if thread_id == current_thread_id and skip_current_thread:
685         continue
686
687       # Try to get more informative symbolic thread name.
688       name = 'untitled'
689       for thread in all_threads:
690         if thread.ident == thread_id:
691           name = thread.name
692           break
693       name += ' #%d' % (thread_id,)
694       tracebacks[name] = ''.join(traceback.format_stack(frame))
695
696     # Function to print a message. Makes it easier to change output destination.
697     def output(msg):
698       logging.warning(msg.rstrip())
699
700     # Print tracebacks, sorting them by thread name. That way a thread pool's
701     # threads will be printed as one group.
702     output('=============== Potential deadlock detected ===============')
703     if timeout is not None:
704       output('No pings in last %d sec.' % (timeout,))
705     output('Dumping stack frames for all threads:')
706     for name in sorted(tracebacks):
707       output('Traceback for \'%s\':\n%s' % (name, tracebacks[name]))
708     output('===========================================================')
709
710
711 class TaskChannel(object):
712   """Queue of results of async task execution."""
713
714   class Timeout(Exception):
715     """Raised by 'pull' in case of timeout."""
716
717   _ITEM_RESULT = 0
718   _ITEM_EXCEPTION = 1
719
720   def __init__(self):
721     self._queue = Queue.Queue()
722
723   def send_result(self, result):
724     """Enqueues a result of task execution."""
725     self._queue.put((self._ITEM_RESULT, result))
726
727   def send_exception(self, exc_info=None):
728     """Enqueue an exception raised by a task.
729
730     Arguments:
731       exc_info: If given, should be 3-tuple returned by sys.exc_info(),
732                 default is current value of sys.exc_info(). Use default in
733                 'except' blocks to capture currently processed exception.
734     """
735     exc_info = exc_info or sys.exc_info()
736     assert isinstance(exc_info, tuple) and len(exc_info) == 3
737     # Transparently passing Timeout will break 'pull' contract, since a caller
738     # has no way to figure out that's an exception from the task and not from
739     # 'pull' itself. Transform Timeout into generic RuntimeError with
740     # explanation.
741     if isinstance(exc_info[1], TaskChannel.Timeout):
742       exc_info = (
743           RuntimeError,
744           RuntimeError('Task raised Timeout exception'),
745           exc_info[2])
746     self._queue.put((self._ITEM_EXCEPTION, exc_info))
747
748   def pull(self, timeout=None):
749     """Dequeues available result or exception.
750
751     Args:
752       timeout: if not None will block no longer than |timeout| seconds and will
753           raise TaskChannel.Timeout exception if no results are available.
754
755     Returns:
756       Whatever task pushes to the queue by calling 'send_result'.
757
758     Raises:
759       TaskChannel.Timeout: waiting longer than |timeout|.
760       Whatever exception task raises.
761     """
762     try:
763       item_type, value = self._queue.get(timeout=timeout)
764     except Queue.Empty:
765       raise TaskChannel.Timeout()
766     if item_type == self._ITEM_RESULT:
767       return value
768     if item_type == self._ITEM_EXCEPTION:
769       # 'value' is captured sys.exc_info() 3-tuple. Use extended raise syntax
770       # to preserve stack frame of original exception (that was raised in
771       # another thread).
772       assert isinstance(value, tuple) and len(value) == 3
773       raise value[0], value[1], value[2]
774     assert False, 'Impossible queue item type: %r' % item_type
775
776   def wrap_task(self, task):
777     """Decorator that makes a function push results into this channel."""
778     @functools.wraps(task)
779     def wrapped(*args, **kwargs):
780       try:
781         self.send_result(task(*args, **kwargs))
782       except Exception:
783         self.send_exception()
784     return wrapped
785
786
787 def num_processors():
788   """Returns the number of processors.
789
790   Python on OSX 10.6 raises a NotImplementedError exception.
791   """
792   try:
793     # Multiprocessing
794     import multiprocessing
795     return multiprocessing.cpu_count()
796   except:  # pylint: disable=W0702
797     try:
798       # Mac OS 10.6
799       return int(os.sysconf('SC_NPROCESSORS_ONLN'))  # pylint: disable=E1101
800     except:
801       # Some of the windows builders seem to get here.
802       return 4
803
804
805 def enum_processes_win():
806   """Returns all processes on the system that are accessible to this process.
807
808   Returns:
809     Win32_Process COM objects. See
810     http://msdn.microsoft.com/library/aa394372.aspx for more details.
811   """
812   import win32com.client  # pylint: disable=F0401
813   wmi_service = win32com.client.Dispatch('WbemScripting.SWbemLocator')
814   wbem = wmi_service.ConnectServer('.', 'root\\cimv2')
815   return [
816     proc for proc in wbem.ExecQuery('SELECT * FROM Win32_Process')
817     if proc.ExecutablePath
818   ]
819
820
821 def filter_processes_dir_win(processes, root_dir):
822   """Returns all processes which has their main executable located inside
823   root_dir.
824   """
825   root_dir = root_dir.lower()
826   return [
827     proc for proc in processes
828     if proc.ExecutablePath.lower().startswith(root_dir)
829   ]
830
831
832 def filter_processes_tree_win(processes):
833   """Returns all the processes under the current process."""
834   # Convert to dict.
835   processes = {p.ProcessId: p for p in processes}
836   root_pid = os.getpid()
837   out = {root_pid: processes[root_pid]}
838   while True:
839     found = set()
840     for pid in out:
841       found.update(
842           p.ProcessId for p in processes.itervalues()
843           if p.ParentProcessId == pid)
844     found -= set(out)
845     if not found:
846       break
847     out.update((p, processes[p]) for p in found)
848   return out.values()