1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Dispatches tests, either sharding or replicating them.
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
11 - If replicating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15 collection until there are no tests left.
21 from pylib import android_commands
22 from pylib import constants
23 from pylib.base import base_test_result
24 from pylib.utils import reraiser_thread
25 from pylib.utils import watchdog_timer
28 DEFAULT_TIMEOUT = 7 * 60 # seven minutes
31 class _ThreadSafeCounter(object):
32 """A threadsafe counter."""
35 self._lock = threading.Lock()
38 def GetAndIncrement(self):
39 """Get the current value and increment it atomically.
42 The value before incrementing.
45 pre_increment = self._value
51 """Holds a test with additional metadata."""
53 def __init__(self, test, tries=0):
54 """Initializes the _Test object.
58 tries: Number of tries so far.
64 class _TestCollection(object):
65 """A threadsafe collection of tests.
68 tests: List of tests to put in the collection.
71 def __init__(self, tests=None):
74 self._lock = threading.Lock()
76 self._tests_in_progress = 0
77 # Used to signal that an item is available or all items have been handled.
78 self._item_avaliable_or_all_done = threading.Event()
83 """Pop a test from the collection.
85 Waits until a test is available or all tests have been handled.
88 A test or None if all tests have been handled.
91 # Wait for a test to be available or all tests to have been handled.
92 self._item_avaliable_or_all_done.wait()
94 # Check which of the two conditions triggered the signal.
95 if self._tests_in_progress == 0:
98 return self._tests.pop(0)
100 # Another thread beat us to the available test, wait again.
101 self._item_avaliable_or_all_done.clear()
104 """Add a test to the collection.
110 self._tests.append(test)
111 self._item_avaliable_or_all_done.set()
112 self._tests_in_progress += 1
114 def test_completed(self):
115 """Indicate that a test has been fully handled."""
117 self._tests_in_progress -= 1
118 if self._tests_in_progress == 0:
119 # All tests have been handled, signal all waiting threads.
120 self._item_avaliable_or_all_done.set()
123 """Iterate through tests in the collection until all have been handled."""
131 """Return the number of tests currently in the collection."""
132 return len(self._tests)
135 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
136 num_retries, tag_results_with_device=False):
137 """Runs tests from the test_collection until empty using the given runner.
139 Adds TestRunResults objects to the out_results list and may add tests to the
143 runner: A TestRunner object used to run the tests.
144 test_collection: A _TestCollection from which to get _Test objects to run.
145 out_results: A list to add TestRunResults to.
146 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
147 num_retries: Number of retries for a test.
148 tag_results_with_device: If True, prefixes each test name with the last 4
149 characters of the id of the device on which the test was run. Used when
150 replicating to identify which device ran each copy of the test, and to
151 ensure each copy of the test is recorded separately.
154 def TagTestRunResults(test_run_results):
155 """Tags all results with the last 4 digits of the device id.
157 Used when replicating tests to distinguish the same tests run on different
158 devices. We use a set to store test results, so the hash (generated from
159 name and tag) must be unique to be considered different results.
161 new_test_run_results = base_test_result.TestRunResults()
162 for test_result in test_run_results.GetAll():
163 test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
164 new_test_run_results.AddResult(test_result)
165 return new_test_run_results
167 for test in test_collection:
170 if not android_commands.IsDeviceAttached(runner.device):
171 # Device is unresponsive, stop handling tests on this device.
172 msg = 'Device %s is unresponsive.' % runner.device
174 raise android_commands.errors.DeviceUnresponsiveError(msg)
175 result, retry = runner.RunTest(test.test)
176 if tag_results_with_device:
177 result = TagTestRunResults(result)
179 if retry and test.tries <= num_retries:
180 # Retry non-passing results, only record passing results.
181 pass_results = base_test_result.TestRunResults()
182 pass_results.AddResults(result.GetPass())
183 out_results.append(pass_results)
184 logging.warning('Will retry test, try #%s.' % test.tries)
185 test_collection.add(_Test(test=retry, tries=test.tries))
187 # All tests passed or retry limit reached. Either way, record results.
188 out_results.append(result)
190 # An unhandleable exception, ensure tests get run by another device and
191 # reraise this exception on the main thread.
192 test_collection.add(test)
195 # Retries count as separate tasks so always mark the popped test as done.
196 test_collection.test_completed()
199 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
200 """Creates a test runner for a single device and calls SetUp() on it.
202 Note: if a device is unresponsive the corresponding TestRunner will not be
203 added to out_runners.
206 runner_factory: Callable that takes a device and index and returns a
208 device: The device serial number to set up.
209 out_runners: List to add the successfully set up TestRunner object.
210 threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
213 index = threadsafe_counter.GetAndIncrement()
214 logging.warning('Creating shard %s for device %s.', index, device)
215 runner = runner_factory(device, index)
217 out_runners.append(runner)
218 except android_commands.errors.DeviceUnresponsiveError as e:
219 logging.warning('Failed to create shard for %s: [%s]', device, e)
222 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
223 tag_results_with_device=False):
224 """Run all tests using the given TestRunners.
227 runners: A list of TestRunner objects.
228 test_collection_factory: A callable to generate a _TestCollection object for
230 num_retries: Number of retries for a test.
231 timeout: Watchdog timeout in seconds.
232 tag_results_with_device: If True, prefixes each test name with the last 4
233 characters of the id of the device on which the test was run. Used when
234 replicating to identify which device ran each copy of the test, and to
235 ensure each copy of the test is recorded separately.
238 A tuple of (TestRunResults object, exit code)
240 logging.warning('Running tests with %s test runners.' % (len(runners)))
243 run_results = base_test_result.TestRunResults()
244 watcher = watchdog_timer.WatchdogTimer(timeout)
245 test_collections = [test_collection_factory() for _ in runners]
248 reraiser_thread.ReraiserThread(
250 [r, tc, results, watcher, num_retries, tag_results_with_device],
252 for r, tc in zip(runners, test_collections)]
254 workers = reraiser_thread.ReraiserThreadGroup(threads)
257 # Catch DeviceUnresponsiveErrors and set a warning exit code
259 workers.JoinAll(watcher)
260 except android_commands.errors.DeviceUnresponsiveError as e:
262 exit_code = constants.WARNING_EXIT_CODE
264 assert all([len(tc) == 0 for tc in test_collections]), (
265 'Some tests were not run, all devices are likely offline (ran %d tests)' %
266 len(run_results.GetAll()))
269 run_results.AddTestRunResults(r)
270 if not run_results.DidRunPass():
271 exit_code = constants.ERROR_EXIT_CODE
272 return (run_results, exit_code)
275 def _CreateRunners(runner_factory, devices, timeout=None):
276 """Creates a test runner for each device and calls SetUp() in parallel.
278 Note: if a device is unresponsive the corresponding TestRunner will not be
279 included in the returned list.
282 runner_factory: Callable that takes a device and index and returns a
284 devices: List of device serial numbers as strings.
285 timeout: Watchdog timeout in seconds, defaults to the default timeout.
288 A list of TestRunner objects.
290 logging.warning('Creating %s test runners.' % len(devices))
292 counter = _ThreadSafeCounter()
293 threads = reraiser_thread.ReraiserThreadGroup(
294 [reraiser_thread.ReraiserThread(_SetUp,
295 [runner_factory, d, runners, counter],
299 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
303 def _TearDownRunners(runners, timeout=None):
304 """Calls TearDown() for each test runner in parallel.
307 runners: A list of TestRunner objects.
308 timeout: Watchdog timeout in seconds, defaults to the default timeout.
310 threads = reraiser_thread.ReraiserThreadGroup(
311 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
314 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
317 def RunTests(tests, runner_factory, devices, shard=True,
318 test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
320 """Run all tests on attached devices, retrying tests that don't pass.
323 tests: List of tests to run.
324 runner_factory: Callable that takes a device and index and returns a
326 devices: List of attached devices.
327 shard: True if we should shard, False if we should replicate tests.
328 - Sharding tests will distribute tests across all test runners through a
329 shared test collection.
330 - Replicating tests will copy all tests to each test runner through a
331 unique test collection for each test runner.
332 test_timeout: Watchdog timeout in seconds for running tests.
333 setup_timeout: Watchdog timeout in seconds for creating and cleaning up
335 num_retries: Number of retries for a test.
338 A tuple of (base_test_result.TestRunResults object, exit code).
341 logging.critical('No tests to run.')
342 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
345 # Generate a shared _TestCollection object for all test runners, so they
346 # draw from a common pool of tests.
347 shared_test_collection = _TestCollection([_Test(t) for t in tests])
348 test_collection_factory = lambda: shared_test_collection
349 tag_results_with_device = False
350 log_string = 'sharded across devices'
352 # Generate a unique _TestCollection object for each test runner, but use
353 # the same set of tests.
354 test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
355 tag_results_with_device = True
356 log_string = 'replicated on each device'
358 logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
359 runners = _CreateRunners(runner_factory, devices, setup_timeout)
361 return _RunAllTests(runners, test_collection_factory,
362 num_retries, test_timeout, tag_results_with_device)
365 _TearDownRunners(runners, setup_timeout)
366 except android_commands.errors.DeviceUnresponsiveError as e:
367 logging.warning('Device unresponsive during TearDown: [%s]', e)