Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / build / android / pylib / base / test_dispatcher.py
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Dispatches tests, either sharding or replicating them.
6
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9   - If sharding: test collection factory returns the same shared test collection
10     to all test runners
11   - If replciating: test collection factory returns a unique test collection to
12     each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15   collection until there are no tests left.
16 """
17
18 import logging
19 import threading
20
21 from pylib import android_commands
22 from pylib import constants
23 from pylib.base import base_test_result
24 from pylib.device import adb_wrapper
25 from pylib.utils import reraiser_thread
26 from pylib.utils import watchdog_timer
27
28
29 DEFAULT_TIMEOUT = 7 * 60  # seven minutes
30
31
32 class _ThreadSafeCounter(object):
33   """A threadsafe counter."""
34
35   def __init__(self):
36     self._lock = threading.Lock()
37     self._value = 0
38
39   def GetAndIncrement(self):
40     """Get the current value and increment it atomically.
41
42     Returns:
43       The value before incrementing.
44     """
45     with self._lock:
46       pre_increment = self._value
47       self._value += 1
48       return pre_increment
49
50
51 class _Test(object):
52   """Holds a test with additional metadata."""
53
54   def __init__(self, test, tries=0):
55     """Initializes the _Test object.
56
57     Args:
58       test: The test.
59       tries: Number of tries so far.
60     """
61     self.test = test
62     self.tries = tries
63
64
65 class _TestCollection(object):
66   """A threadsafe collection of tests.
67
68   Args:
69     tests: List of tests to put in the collection.
70   """
71
72   def __init__(self, tests=None):
73     if not tests:
74       tests = []
75     self._lock = threading.Lock()
76     self._tests = []
77     self._tests_in_progress = 0
78     # Used to signal that an item is avaliable or all items have been handled.
79     self._item_avaliable_or_all_done = threading.Event()
80     for t in tests:
81       self.add(t)
82
83   def _pop(self):
84     """Pop a test from the collection.
85
86     Waits until a test is avaliable or all tests have been handled.
87
88     Returns:
89       A test or None if all tests have been handled.
90     """
91     while True:
92       # Wait for a test to be avaliable or all tests to have been handled.
93       self._item_avaliable_or_all_done.wait()
94       with self._lock:
95         # Check which of the two conditions triggered the signal.
96         if self._tests_in_progress == 0:
97           return None
98         try:
99           return self._tests.pop(0)
100         except IndexError:
101           # Another thread beat us to the avaliable test, wait again.
102           self._item_avaliable_or_all_done.clear()
103
104   def add(self, test):
105     """Add an test to the collection.
106
107     Args:
108       test: A test to add.
109     """
110     with self._lock:
111       self._tests.append(test)
112       self._item_avaliable_or_all_done.set()
113       self._tests_in_progress += 1
114
115   def test_completed(self):
116     """Indicate that a test has been fully handled."""
117     with self._lock:
118       self._tests_in_progress -= 1
119       if self._tests_in_progress == 0:
120         # All tests have been handled, signal all waiting threads.
121         self._item_avaliable_or_all_done.set()
122
123   def __iter__(self):
124     """Iterate through tests in the collection until all have been handled."""
125     while True:
126       r = self._pop()
127       if r is None:
128         break
129       yield r
130
131   def __len__(self):
132     """Return the number of tests currently in the collection."""
133     return len(self._tests)
134
135
136 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
137                        num_retries, tag_results_with_device=False):
138   """Runs tests from the test_collection until empty using the given runner.
139
140   Adds TestRunResults objects to the out_results list and may add tests to the
141   out_retry list.
142
143   Args:
144     runner: A TestRunner object used to run the tests.
145     test_collection: A _TestCollection from which to get _Test objects to run.
146     out_results: A list to add TestRunResults to.
147     watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
148     num_retries: Number of retries for a test.
149     tag_results_with_device: If True, appends the name of the device on which
150         the test was run to the test name. Used when replicating to identify
151         which device ran each copy of the test, and to ensure each copy of the
152         test is recorded separately.
153   """
154
155   def TagTestRunResults(test_run_results):
156     """Tags all results with the last 4 digits of the device id.
157
158     Used when replicating tests to distinguish the same tests run on different
159     devices. We use a set to store test results, so the hash (generated from
160     name and tag) must be unique to be considered different results.
161     """
162     new_test_run_results = base_test_result.TestRunResults()
163     for test_result in test_run_results.GetAll():
164       test_result.SetName('%s_%s' % (runner.device_serial[-4:],
165                                      test_result.GetName()))
166       new_test_run_results.AddResult(test_result)
167     return new_test_run_results
168
169   for test in test_collection:
170     watcher.Reset()
171     try:
172       if runner.device_serial not in android_commands.GetAttachedDevices():
173         # Device is unresponsive, stop handling tests on this device.
174         msg = 'Device %s is unresponsive.' % runner.device_serial
175         logging.warning(msg)
176         raise adb_wrapper.DeviceUnreachableError(msg)
177       result, retry = runner.RunTest(test.test)
178       if tag_results_with_device:
179         result = TagTestRunResults(result)
180       test.tries += 1
181       if retry and test.tries <= num_retries:
182         # Retry non-passing results, only record passing results.
183         pass_results = base_test_result.TestRunResults()
184         pass_results.AddResults(result.GetPass())
185         out_results.append(pass_results)
186         logging.warning('Will retry test, try #%s.' % test.tries)
187         test_collection.add(_Test(test=retry, tries=test.tries))
188       else:
189         # All tests passed or retry limit reached. Either way, record results.
190         out_results.append(result)
191     except:
192       # An unhandleable exception, ensure tests get run by another device and
193       # reraise this exception on the main thread.
194       test_collection.add(test)
195       raise
196     finally:
197       # Retries count as separate tasks so always mark the popped test as done.
198       test_collection.test_completed()
199
200
201 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
202   """Creates a test runner for each device and calls SetUp() in parallel.
203
204   Note: if a device is unresponsive the corresponding TestRunner will not be
205     added to out_runners.
206
207   Args:
208     runner_factory: Callable that takes a device and index and returns a
209       TestRunner object.
210     device: The device serial number to set up.
211     out_runners: List to add the successfully set up TestRunner object.
212     threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
213   """
214   try:
215     index = threadsafe_counter.GetAndIncrement()
216     logging.warning('Creating shard %s for device %s.', index, device)
217     runner = runner_factory(device, index)
218     runner.SetUp()
219     out_runners.append(runner)
220   except (adb_wrapper.DeviceUnreachableError,
221           # TODO(jbudorick) Remove this once the underlying implementations
222           #                 for the above are switched or wrapped.
223           android_commands.errors.DeviceUnresponsiveError) as e:
224     logging.warning('Failed to create shard for %s: [%s]', device, e)
225
226
227 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
228                  tag_results_with_device=False):
229   """Run all tests using the given TestRunners.
230
231   Args:
232     runners: A list of TestRunner objects.
233     test_collection_factory: A callable to generate a _TestCollection object for
234         each test runner.
235     num_retries: Number of retries for a test.
236     timeout: Watchdog timeout in seconds.
237     tag_results_with_device: If True, appends the name of the device on which
238         the test was run to the test name. Used when replicating to identify
239         which device ran each copy of the test, and to ensure each copy of the
240         test is recorded separately.
241
242   Returns:
243     A tuple of (TestRunResults object, exit code)
244   """
245   logging.warning('Running tests with %s test runners.' % (len(runners)))
246   results = []
247   exit_code = 0
248   run_results = base_test_result.TestRunResults()
249   watcher = watchdog_timer.WatchdogTimer(timeout)
250   test_collections = [test_collection_factory() for _ in runners]
251
252   threads = [
253       reraiser_thread.ReraiserThread(
254           _RunTestsFromQueue,
255           [r, tc, results, watcher, num_retries, tag_results_with_device],
256           name=r.device_serial[-4:])
257       for r, tc in zip(runners, test_collections)]
258
259   workers = reraiser_thread.ReraiserThreadGroup(threads)
260   workers.StartAll()
261
262   # Catch DeviceUnreachableErrors and set a warning exit code
263   try:
264     workers.JoinAll(watcher)
265   except (adb_wrapper.DeviceUnreachableError,
266           # TODO(jbudorick) Remove this once the underlying implementations
267           #                 for the above are switched or wrapped.
268           android_commands.errors.DeviceUnresponsiveError) as e:
269     logging.error(e)
270     exit_code = constants.WARNING_EXIT_CODE
271
272   assert all([len(tc) == 0 for tc in test_collections]), (
273       'Some tests were not run, all devices are likely offline (ran %d tests)' %
274       len(run_results.GetAll()))
275
276   for r in results:
277     run_results.AddTestRunResults(r)
278   if not run_results.DidRunPass():
279     exit_code = constants.ERROR_EXIT_CODE
280   return (run_results, exit_code)
281
282
283 def _CreateRunners(runner_factory, devices, timeout=None):
284   """Creates a test runner for each device and calls SetUp() in parallel.
285
286   Note: if a device is unresponsive the corresponding TestRunner will not be
287     included in the returned list.
288
289   Args:
290     runner_factory: Callable that takes a device and index and returns a
291       TestRunner object.
292     devices: List of device serial numbers as strings.
293     timeout: Watchdog timeout in seconds, defaults to the default timeout.
294
295   Returns:
296     A list of TestRunner objects.
297   """
298   logging.warning('Creating %s test runners.' % len(devices))
299   runners = []
300   counter = _ThreadSafeCounter()
301   threads = reraiser_thread.ReraiserThreadGroup(
302       [reraiser_thread.ReraiserThread(_SetUp,
303                                       [runner_factory, d, runners, counter],
304                                       name=d[-4:])
305        for d in devices])
306   threads.StartAll()
307   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
308   return runners
309
310
311 def _TearDownRunners(runners, timeout=None):
312   """Calls TearDown() for each test runner in parallel.
313
314   Args:
315     runners: A list of TestRunner objects.
316     timeout: Watchdog timeout in seconds, defaults to the default timeout.
317   """
318   threads = reraiser_thread.ReraiserThreadGroup(
319       [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
320        for r in runners])
321   threads.StartAll()
322   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
323
324
325 def RunTests(tests, runner_factory, devices, shard=True,
326              test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
327              num_retries=2):
328   """Run all tests on attached devices, retrying tests that don't pass.
329
330   Args:
331     tests: List of tests to run.
332     runner_factory: Callable that takes a device and index and returns a
333         TestRunner object.
334     devices: List of attached devices.
335     shard: True if we should shard, False if we should replicate tests.
336       - Sharding tests will distribute tests across all test runners through a
337         shared test collection.
338       - Replicating tests will copy all tests to each test runner through a
339         unique test collection for each test runner.
340     test_timeout: Watchdog timeout in seconds for running tests.
341     setup_timeout: Watchdog timeout in seconds for creating and cleaning up
342         test runners.
343     num_retries: Number of retries for a test.
344
345   Returns:
346     A tuple of (base_test_result.TestRunResults object, exit code).
347   """
348   if not tests:
349     logging.critical('No tests to run.')
350     return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
351
352   if shard:
353     # Generate a shared _TestCollection object for all test runners, so they
354     # draw from a common pool of tests.
355     shared_test_collection = _TestCollection([_Test(t) for t in tests])
356     test_collection_factory = lambda: shared_test_collection
357     tag_results_with_device = False
358     log_string = 'sharded across devices'
359   else:
360     # Generate a unique _TestCollection object for each test runner, but use
361     # the same set of tests.
362     test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
363     tag_results_with_device = True
364     log_string = 'replicated on each device'
365
366   logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
367   runners = _CreateRunners(runner_factory, devices, setup_timeout)
368   try:
369     return _RunAllTests(runners, test_collection_factory,
370                         num_retries, test_timeout, tag_results_with_device)
371   finally:
372     try:
373       _TearDownRunners(runners, setup_timeout)
374     except (adb_wrapper.DeviceUnreachableError,
375             # TODO(jbudorick) Remove this once the underlying implementations
376             #                 for the above are switched or wrapped.
377             android_commands.errors.DeviceUnresponsiveError) as e:
378       logging.warning('Device unresponsive during TearDown: [%s]', e)