1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Dispatches tests, either sharding or replicating them.
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
11 - If replicating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15 collection until there are no tests left.
21 from pylib import android_commands
22 from pylib import constants
23 from pylib.base import base_test_result
24 from pylib.device import adb_wrapper
25 from pylib.utils import reraiser_thread
26 from pylib.utils import watchdog_timer
29 DEFAULT_TIMEOUT = 7 * 60 # seven minutes
32 class _ThreadSafeCounter(object):
33 """A threadsafe counter."""
36 self._lock = threading.Lock()
39 def GetAndIncrement(self):
40 """Get the current value and increment it atomically.
43 The value before incrementing.
46 pre_increment = self._value
52 """Holds a test with additional metadata."""
54 def __init__(self, test, tries=0):
55 """Initializes the _Test object.
59 tries: Number of tries so far.
65 class _TestCollection(object):
66 """A threadsafe collection of tests.
69 tests: List of tests to put in the collection.
72 def __init__(self, tests=None):
75 self._lock = threading.Lock()
77 self._tests_in_progress = 0
78 # Used to signal that an item is available or all items have been handled.
79 self._item_avaliable_or_all_done = threading.Event()
84 """Pop a test from the collection.
86 Waits until a test is available or all tests have been handled.
89 A test or None if all tests have been handled.
92 # Wait for a test to be available or all tests to have been handled.
93 self._item_avaliable_or_all_done.wait()
95 # Check which of the two conditions triggered the signal.
96 if self._tests_in_progress == 0:
99 return self._tests.pop(0)
101 # Another thread beat us to the available test, wait again.
102 self._item_avaliable_or_all_done.clear()
105 """Add a test to the collection.
111 self._tests.append(test)
112 self._item_avaliable_or_all_done.set()
113 self._tests_in_progress += 1
115 def test_completed(self):
116 """Indicate that a test has been fully handled."""
118 self._tests_in_progress -= 1
119 if self._tests_in_progress == 0:
120 # All tests have been handled, signal all waiting threads.
121 self._item_avaliable_or_all_done.set()
124 """Iterate through tests in the collection until all have been handled."""
132 """Return the number of tests currently in the collection."""
133 return len(self._tests)
136 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
137 num_retries, tag_results_with_device=False):
138 """Runs tests from the test_collection until empty using the given runner.
140 Adds TestRunResults objects to the out_results list and may add tests to the
144 runner: A TestRunner object used to run the tests.
145 test_collection: A _TestCollection from which to get _Test objects to run.
146 out_results: A list to add TestRunResults to.
147 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
148 num_retries: Number of retries for a test.
149 tag_results_with_device: If True, prefixes the test name with the last 4
150 characters of the serial of the device on which the test was run. Used
151 when replicating to identify which device ran each copy of the test, and
152 to ensure each copy of the test is recorded separately.
155 def TagTestRunResults(test_run_results):
156 """Tags all results with the last 4 characters of the device serial.
158 Used when replicating tests to distinguish the same tests run on different
159 devices. We use a set to store test results, so the hash (generated from
160 name and tag) must be unique to be considered different results.
162 new_test_run_results = base_test_result.TestRunResults()
163 for test_result in test_run_results.GetAll():
164 test_result.SetName('%s_%s' % (runner.device_serial[-4:],
165 test_result.GetName()))
166 new_test_run_results.AddResult(test_result)
167 return new_test_run_results
169 for test in test_collection:
172 if runner.device_serial not in android_commands.GetAttachedDevices():
173 # Device is unresponsive, stop handling tests on this device.
174 msg = 'Device %s is unresponsive.' % runner.device_serial
176 raise adb_wrapper.DeviceUnreachableError(msg)
177 result, retry = runner.RunTest(test.test)
178 if tag_results_with_device:
179 result = TagTestRunResults(result)
181 if retry and test.tries <= num_retries:
182 # Retry non-passing results, only record passing results.
183 pass_results = base_test_result.TestRunResults()
184 pass_results.AddResults(result.GetPass())
185 out_results.append(pass_results)
186 logging.warning('Will retry test, try #%s.' % test.tries)
187 test_collection.add(_Test(test=retry, tries=test.tries))
189 # All tests passed or retry limit reached. Either way, record results.
190 out_results.append(result)
192 # An unhandleable exception, ensure tests get run by another device and
193 # reraise this exception on the main thread.
194 test_collection.add(test)
197 # Retries count as separate tasks so always mark the popped test as done.
198 test_collection.test_completed()
201 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
202 """Creates a test runner for the given device and calls SetUp() on it.
204 Note: if a device is unresponsive the corresponding TestRunner will not be
205 added to out_runners.
208 runner_factory: Callable that takes a device and index and returns a
210 device: The device serial number to set up.
211 out_runners: List to add the successfully set up TestRunner object.
212 threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
215 index = threadsafe_counter.GetAndIncrement()
216 logging.warning('Creating shard %s for device %s.', index, device)
217 runner = runner_factory(device, index)
219 out_runners.append(runner)
220 except (adb_wrapper.DeviceUnreachableError,
221 # TODO(jbudorick) Remove this once the underlying implementations
222 # for the above are switched or wrapped.
223 android_commands.errors.DeviceUnresponsiveError) as e:
224 logging.warning('Failed to create shard for %s: [%s]', device, e)
227 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
228 tag_results_with_device=False):
229 """Run all tests using the given TestRunners.
232 runners: A list of TestRunner objects.
233 test_collection_factory: A callable to generate a _TestCollection object for
235 num_retries: Number of retries for a test.
236 timeout: Watchdog timeout in seconds.
237 tag_results_with_device: If True, prefixes the test name with the last 4
238 characters of the serial of the device on which the test was run. Used
239 when replicating to identify which device ran each copy of the test, and
240 to ensure each copy of the test is recorded separately.
243 A tuple of (TestRunResults object, exit code)
245 logging.warning('Running tests with %s test runners.' % (len(runners)))
248 run_results = base_test_result.TestRunResults()
249 watcher = watchdog_timer.WatchdogTimer(timeout)
250 test_collections = [test_collection_factory() for _ in runners]
253 reraiser_thread.ReraiserThread(
255 [r, tc, results, watcher, num_retries, tag_results_with_device],
256 name=r.device_serial[-4:])
257 for r, tc in zip(runners, test_collections)]
259 workers = reraiser_thread.ReraiserThreadGroup(threads)
262 # Catch DeviceUnreachableErrors and set a warning exit code
264 workers.JoinAll(watcher)
265 except (adb_wrapper.DeviceUnreachableError,
266 # TODO(jbudorick) Remove this once the underlying implementations
267 # for the above are switched or wrapped.
268 android_commands.errors.DeviceUnresponsiveError) as e:
270 exit_code = constants.WARNING_EXIT_CODE
272 assert all([len(tc) == 0 for tc in test_collections]), (
273 'Some tests were not run, all devices are likely offline (ran %d tests)' %
274 len(run_results.GetAll()))
277 run_results.AddTestRunResults(r)
278 if not run_results.DidRunPass():
279 exit_code = constants.ERROR_EXIT_CODE
280 return (run_results, exit_code)
283 def _CreateRunners(runner_factory, devices, timeout=None):
284 """Creates a test runner for each device and calls SetUp() in parallel.
286 Note: if a device is unresponsive the corresponding TestRunner will not be
287 included in the returned list.
290 runner_factory: Callable that takes a device and index and returns a
292 devices: List of device serial numbers as strings.
293 timeout: Watchdog timeout in seconds, defaults to the default timeout.
296 A list of TestRunner objects.
298 logging.warning('Creating %s test runners.' % len(devices))
300 counter = _ThreadSafeCounter()
301 threads = reraiser_thread.ReraiserThreadGroup(
302 [reraiser_thread.ReraiserThread(_SetUp,
303 [runner_factory, d, runners, counter],
307 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
311 def _TearDownRunners(runners, timeout=None):
312 """Calls TearDown() for each test runner in parallel.
315 runners: A list of TestRunner objects.
316 timeout: Watchdog timeout in seconds, defaults to the default timeout.
318 threads = reraiser_thread.ReraiserThreadGroup(
319 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
322 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
325 def RunTests(tests, runner_factory, devices, shard=True,
326 test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
328 """Run all tests on attached devices, retrying tests that don't pass.
331 tests: List of tests to run.
332 runner_factory: Callable that takes a device and index and returns a
334 devices: List of attached devices.
335 shard: True if we should shard, False if we should replicate tests.
336 - Sharding tests will distribute tests across all test runners through a
337 shared test collection.
338 - Replicating tests will copy all tests to each test runner through a
339 unique test collection for each test runner.
340 test_timeout: Watchdog timeout in seconds for running tests.
341 setup_timeout: Watchdog timeout in seconds for creating and cleaning up
343 num_retries: Number of retries for a test.
346 A tuple of (base_test_result.TestRunResults object, exit code).
349 logging.critical('No tests to run.')
350 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
353 # Generate a shared _TestCollection object for all test runners, so they
354 # draw from a common pool of tests.
355 shared_test_collection = _TestCollection([_Test(t) for t in tests])
356 test_collection_factory = lambda: shared_test_collection
357 tag_results_with_device = False
358 log_string = 'sharded across devices'
360 # Generate a unique _TestCollection object for each test runner, but use
361 # the same set of tests.
362 test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
363 tag_results_with_device = True
364 log_string = 'replicated on each device'
366 logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
367 runners = _CreateRunners(runner_factory, devices, setup_timeout)
369 return _RunAllTests(runners, test_collection_factory,
370 num_retries, test_timeout, tag_results_with_device)
373 _TearDownRunners(runners, setup_timeout)
374 except (adb_wrapper.DeviceUnreachableError,
375 # TODO(jbudorick) Remove this once the underlying implementations
376 # for the above are switched or wrapped.
377 android_commands.errors.DeviceUnresponsiveError) as e:
378 logging.warning('Device unresponsive during TearDown: [%s]', e)