Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / build / android / pylib / base / test_dispatcher.py
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Dispatches tests, either sharding or replicating them.
6
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9   - If sharding: test collection factory returns the same shared test collection
10     to all test runners
11   - If replciating: test collection factory returns a unique test collection to
12     each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15   collection until there are no tests left.
16 """
17
18 import logging
19 import threading
20
21 from pylib import android_commands
22 from pylib import constants
23 from pylib.base import base_test_result
24 from pylib.utils import reraiser_thread
25 from pylib.utils import watchdog_timer
26
27
28 DEFAULT_TIMEOUT = 7 * 60  # seven minutes
29
30
31 class _ThreadSafeCounter(object):
32   """A threadsafe counter."""
33
34   def __init__(self):
35     self._lock = threading.Lock()
36     self._value = 0
37
38   def GetAndIncrement(self):
39     """Get the current value and increment it atomically.
40
41     Returns:
42       The value before incrementing.
43     """
44     with self._lock:
45       pre_increment = self._value
46       self._value += 1
47       return pre_increment
48
49
50 class _Test(object):
51   """Holds a test with additional metadata."""
52
53   def __init__(self, test, tries=0):
54     """Initializes the _Test object.
55
56     Args:
57       test: The test.
58       tries: Number of tries so far.
59     """
60     self.test = test
61     self.tries = tries
62
63
64 class _TestCollection(object):
65   """A threadsafe collection of tests.
66
67   Args:
68     tests: List of tests to put in the collection.
69   """
70
71   def __init__(self, tests=None):
72     if not tests:
73       tests = []
74     self._lock = threading.Lock()
75     self._tests = []
76     self._tests_in_progress = 0
77     # Used to signal that an item is avaliable or all items have been handled.
78     self._item_avaliable_or_all_done = threading.Event()
79     for t in tests:
80       self.add(t)
81
82   def _pop(self):
83     """Pop a test from the collection.
84
85     Waits until a test is avaliable or all tests have been handled.
86
87     Returns:
88       A test or None if all tests have been handled.
89     """
90     while True:
91       # Wait for a test to be avaliable or all tests to have been handled.
92       self._item_avaliable_or_all_done.wait()
93       with self._lock:
94         # Check which of the two conditions triggered the signal.
95         if self._tests_in_progress == 0:
96           return None
97         try:
98           return self._tests.pop(0)
99         except IndexError:
100           # Another thread beat us to the avaliable test, wait again.
101           self._item_avaliable_or_all_done.clear()
102
103   def add(self, test):
104     """Add an test to the collection.
105
106     Args:
107       test: A test to add.
108     """
109     with self._lock:
110       self._tests.append(test)
111       self._item_avaliable_or_all_done.set()
112       self._tests_in_progress += 1
113
114   def test_completed(self):
115     """Indicate that a test has been fully handled."""
116     with self._lock:
117       self._tests_in_progress -= 1
118       if self._tests_in_progress == 0:
119         # All tests have been handled, signal all waiting threads.
120         self._item_avaliable_or_all_done.set()
121
122   def __iter__(self):
123     """Iterate through tests in the collection until all have been handled."""
124     while True:
125       r = self._pop()
126       if r is None:
127         break
128       yield r
129
130   def __len__(self):
131     """Return the number of tests currently in the collection."""
132     return len(self._tests)
133
134
135 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
136                        num_retries, tag_results_with_device=False):
137   """Runs tests from the test_collection until empty using the given runner.
138
139   Adds TestRunResults objects to the out_results list and may add tests to the
140   out_retry list.
141
142   Args:
143     runner: A TestRunner object used to run the tests.
144     test_collection: A _TestCollection from which to get _Test objects to run.
145     out_results: A list to add TestRunResults to.
146     watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
147     num_retries: Number of retries for a test.
148     tag_results_with_device: If True, appends the name of the device on which
149         the test was run to the test name. Used when replicating to identify
150         which device ran each copy of the test, and to ensure each copy of the
151         test is recorded separately.
152   """
153
154   def TagTestRunResults(test_run_results):
155     """Tags all results with the last 4 digits of the device id.
156
157     Used when replicating tests to distinguish the same tests run on different
158     devices. We use a set to store test results, so the hash (generated from
159     name and tag) must be unique to be considered different results.
160     """
161     new_test_run_results = base_test_result.TestRunResults()
162     for test_result in test_run_results.GetAll():
163       test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
164       new_test_run_results.AddResult(test_result)
165     return new_test_run_results
166
167   for test in test_collection:
168     watcher.Reset()
169     try:
170       if not android_commands.IsDeviceAttached(runner.device):
171         # Device is unresponsive, stop handling tests on this device.
172         msg = 'Device %s is unresponsive.' % runner.device
173         logging.warning(msg)
174         raise android_commands.errors.DeviceUnresponsiveError(msg)
175       result, retry = runner.RunTest(test.test)
176       if tag_results_with_device:
177         result = TagTestRunResults(result)
178       test.tries += 1
179       if retry and test.tries <= num_retries:
180         # Retry non-passing results, only record passing results.
181         pass_results = base_test_result.TestRunResults()
182         pass_results.AddResults(result.GetPass())
183         out_results.append(pass_results)
184         logging.warning('Will retry test, try #%s.' % test.tries)
185         test_collection.add(_Test(test=retry, tries=test.tries))
186       else:
187         # All tests passed or retry limit reached. Either way, record results.
188         out_results.append(result)
189     except:
190       # An unhandleable exception, ensure tests get run by another device and
191       # reraise this exception on the main thread.
192       test_collection.add(test)
193       raise
194     finally:
195       # Retries count as separate tasks so always mark the popped test as done.
196       test_collection.test_completed()
197
198
199 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
200   """Creates a test runner for each device and calls SetUp() in parallel.
201
202   Note: if a device is unresponsive the corresponding TestRunner will not be
203     added to out_runners.
204
205   Args:
206     runner_factory: Callable that takes a device and index and returns a
207       TestRunner object.
208     device: The device serial number to set up.
209     out_runners: List to add the successfully set up TestRunner object.
210     threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
211   """
212   try:
213     index = threadsafe_counter.GetAndIncrement()
214     logging.warning('Creating shard %s for device %s.', index, device)
215     runner = runner_factory(device, index)
216     runner.SetUp()
217     out_runners.append(runner)
218   except android_commands.errors.DeviceUnresponsiveError as e:
219     logging.warning('Failed to create shard for %s: [%s]', device, e)
220
221
222 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
223                  tag_results_with_device=False):
224   """Run all tests using the given TestRunners.
225
226   Args:
227     runners: A list of TestRunner objects.
228     test_collection_factory: A callable to generate a _TestCollection object for
229         each test runner.
230     num_retries: Number of retries for a test.
231     timeout: Watchdog timeout in seconds.
232     tag_results_with_device: If True, appends the name of the device on which
233         the test was run to the test name. Used when replicating to identify
234         which device ran each copy of the test, and to ensure each copy of the
235         test is recorded separately.
236
237   Returns:
238     A tuple of (TestRunResults object, exit code)
239   """
240   logging.warning('Running tests with %s test runners.' % (len(runners)))
241   results = []
242   exit_code = 0
243   run_results = base_test_result.TestRunResults()
244   watcher = watchdog_timer.WatchdogTimer(timeout)
245   test_collections = [test_collection_factory() for _ in runners]
246
247   threads = [
248       reraiser_thread.ReraiserThread(
249           _RunTestsFromQueue,
250           [r, tc, results, watcher, num_retries, tag_results_with_device],
251           name=r.device[-4:])
252       for r, tc in zip(runners, test_collections)]
253
254   workers = reraiser_thread.ReraiserThreadGroup(threads)
255   workers.StartAll()
256
257   # Catch DeviceUnresponsiveErrors and set a warning exit code
258   try:
259     workers.JoinAll(watcher)
260   except android_commands.errors.DeviceUnresponsiveError as e:
261     logging.error(e)
262     exit_code = constants.WARNING_EXIT_CODE
263
264   assert all([len(tc) == 0 for tc in test_collections]), (
265       'Some tests were not run, all devices are likely offline (ran %d tests)' %
266       len(run_results.GetAll()))
267
268   for r in results:
269     run_results.AddTestRunResults(r)
270   if not run_results.DidRunPass():
271     exit_code = constants.ERROR_EXIT_CODE
272   return (run_results, exit_code)
273
274
275 def _CreateRunners(runner_factory, devices, timeout=None):
276   """Creates a test runner for each device and calls SetUp() in parallel.
277
278   Note: if a device is unresponsive the corresponding TestRunner will not be
279     included in the returned list.
280
281   Args:
282     runner_factory: Callable that takes a device and index and returns a
283       TestRunner object.
284     devices: List of device serial numbers as strings.
285     timeout: Watchdog timeout in seconds, defaults to the default timeout.
286
287   Returns:
288     A list of TestRunner objects.
289   """
290   logging.warning('Creating %s test runners.' % len(devices))
291   runners = []
292   counter = _ThreadSafeCounter()
293   threads = reraiser_thread.ReraiserThreadGroup(
294       [reraiser_thread.ReraiserThread(_SetUp,
295                                       [runner_factory, d, runners, counter],
296                                       name=d[-4:])
297        for d in devices])
298   threads.StartAll()
299   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
300   return runners
301
302
303 def _TearDownRunners(runners, timeout=None):
304   """Calls TearDown() for each test runner in parallel.
305
306   Args:
307     runners: A list of TestRunner objects.
308     timeout: Watchdog timeout in seconds, defaults to the default timeout.
309   """
310   threads = reraiser_thread.ReraiserThreadGroup(
311       [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
312        for r in runners])
313   threads.StartAll()
314   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
315
316
317 def RunTests(tests, runner_factory, devices, shard=True,
318              test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
319              num_retries=2):
320   """Run all tests on attached devices, retrying tests that don't pass.
321
322   Args:
323     tests: List of tests to run.
324     runner_factory: Callable that takes a device and index and returns a
325         TestRunner object.
326     devices: List of attached devices.
327     shard: True if we should shard, False if we should replicate tests.
328       - Sharding tests will distribute tests across all test runners through a
329         shared test collection.
330       - Replicating tests will copy all tests to each test runner through a
331         unique test collection for each test runner.
332     test_timeout: Watchdog timeout in seconds for running tests.
333     setup_timeout: Watchdog timeout in seconds for creating and cleaning up
334         test runners.
335     num_retries: Number of retries for a test.
336
337   Returns:
338     A tuple of (base_test_result.TestRunResults object, exit code).
339   """
340   if not tests:
341     logging.critical('No tests to run.')
342     return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
343
344   if shard:
345     # Generate a shared _TestCollection object for all test runners, so they
346     # draw from a common pool of tests.
347     shared_test_collection = _TestCollection([_Test(t) for t in tests])
348     test_collection_factory = lambda: shared_test_collection
349     tag_results_with_device = False
350     log_string = 'sharded across devices'
351   else:
352     # Generate a unique _TestCollection object for each test runner, but use
353     # the same set of tests.
354     test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
355     tag_results_with_device = True
356     log_string = 'replicated on each device'
357
358   logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
359   runners = _CreateRunners(runner_factory, devices, setup_timeout)
360   try:
361     return _RunAllTests(runners, test_collection_factory,
362                         num_retries, test_timeout, tag_results_with_device)
363   finally:
364     try:
365       _TearDownRunners(runners, setup_timeout)
366     except android_commands.errors.DeviceUnresponsiveError as e:
367       logging.warning('Device unresponsive during TearDown: [%s]', e)