2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
6 """Client tool to trigger tasks or retrieve results from a Swarming server."""
22 from third_party import colorama
23 from third_party.depot_tools import fix_encoding
24 from third_party.depot_tools import subcommand
26 from utils import file_path
27 from third_party.chromium import natsort
29 from utils import threading_utils
30 from utils import tools
31 from utils import zip_package
# Absolute path of the directory containing this script; used as the root of
# the zip bundle sent to the swarming bot.
38 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Helper scripts shipped alongside this tool (e.g. swarm_cleanup.py, added to
# the bundle in chromium_setup() below).
39 TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')
42 # The default time to wait for a shard to finish running.
43 DEFAULT_SHARD_WAIT_TIME = 80 * 60.
# NOTE(review): orphan fragment — presumably part of the NO_OUTPUT_FOUND
# message referenced in decorate_shard_output(); the assignment line is elided
# from this listing. Confirm against the full file.
47 'No output produced by the task, it may have failed to run.\n'
class Failure(Exception):
  """Catch-all error raised when a swarming operation cannot complete."""
56 class Manifest(object):
57 """Represents a Swarming task manifest.
59 Also includes code to zip code and upload itself.
# NOTE(review): this listing is elided; the constructor's 'def __init__('
# line and several statements are missing from view.
62 self, isolate_server, namespace, isolated_hash, task_name, shards, env,
63 dimensions, working_dir, deadline, verbose, profile, priority):
64 """Populates a manifest object.
66 isolate_server - isolate server url.
67 namespace - isolate server namespace to use.
68 isolated_hash - The manifest's sha-1 that the slave is going to fetch.
69 task_name - The name to give the task request.
70 shards - The number of swarming shards to request.
71 env - environment variables to set.
72 dimensions - dimensions to filter the task on.
73 working_dir - Relative working directory to start the script.
74 deadline - maximum pending time before this task expires.
75 verbose - if True, have the slave print more details.
76 profile - if True, have the slave print more timing data.
77 priority - int between 0 and 1000, lower the higher priority.
79 self.isolate_server = isolate_server
80 self.namespace = namespace
81 # The reason is that swarm_bot doesn't understand compressed data yet. So
82 # the data to be downloaded by swarm_bot is in 'default', independent of
83 # what run_isolated.py is going to fetch.
84 self.storage = isolateserver.get_storage(isolate_server, 'default')
86 self.isolated_hash = isolated_hash
87 self.bundle = zip_package.ZipPackage(ROOT_DIR)
89 self._task_name = task_name
# Defensive copies so later mutation by the caller does not leak into the
# manifest.
91 self._env = env.copy()
92 self._dimensions = dimensions.copy()
93 self._working_dir = working_dir
94 self._deadline = deadline
96 self.verbose = bool(verbose)
97 self.profile = bool(profile)
98 self.priority = priority
# Becomes non-None once the isolate_item property is first accessed; acts
# as the "manifest is closed" sentinel checked by add_task().
100 self._isolate_item = None
103 def add_task(self, task_name, actions, time_out=600):
104 """Appends a new task as a TestObject to the swarming manifest file.
106 Tasks cannot be added once the manifest was uploaded.
108 See TestObject in services/swarming/src/common/test_request_message.py for
# Asserts if the manifest was already 'closed' via the isolate_item property.
111 assert not self._isolate_item
115 'decorate_output': self.verbose,
116 'test_name': task_name,
117 'time_out': time_out,
# NOTE(review): the serialization method's 'def to_json(self):' line is
# elided; the docstring below belongs to it.
121 """Exports the current configuration into a swarm-readable manifest file.
123 The actual serialization format is defined as a TestCase object as described
124 in services/swarming/src/common/test_request_message.py
126 This function doesn't mutate the object.
131 # Is a TestConfiguration.
133 'config_name': 'isolated',
134 'deadline_to_run': self._deadline,
135 'dimensions': self._dimensions,
# NOTE(review): self._shards and self._tasks are read here but their
# assignments are on lines elided from this listing — confirm in full file.
136 'min_instances': self._shards,
137 'priority': self.priority,
142 'env_vars': self._env,
143 'restart_on_failure': True,
144 'test_case_name': self._task_name,
145 'tests': self._tasks,
146 'working_dir': self._working_dir,
148 if self._isolate_item:
149 request['data'].append(
151 self.storage.get_fetch_url(self._isolate_item),
# Compact separators keep the serialized manifest small.
154 return json.dumps(request, sort_keys=True, separators=(',',':'))
# NOTE(review): presumably decorated with @property (decorator line elided).
157 def isolate_item(self):
158 """Calling this property 'closes' the manifest and it can't be modified
# Lazily zips the bundle exactly once and caches the resulting upload item.
161 if self._isolate_item is None:
162 self._isolate_item = isolateserver.BufferItem(
163 self.bundle.zip_into_buffer(), high_priority=True)
164 return self._isolate_item
167 def zip_and_upload(manifest):
168 """Zips up all the files necessary to run a manifest and uploads to Swarming
# NOTE(review): elided lines presumably include the 'try:' matching the
# except below and the function's return statements — confirm in full file.
172 start_time = time.time()
173 with manifest.storage:
# upload_items() returns the subset of items that actually had to be
# uploaded; items already present on the server are not in it.
174 uploaded = manifest.storage.upload_items([manifest.isolate_item])
175 elapsed = time.time() - start_time
176 except (IOError, OSError) as exc:
177 tools.report_error('Failed to upload the zip file: %s' % exc)
180 if manifest.isolate_item in uploaded:
181 logging.info('Upload complete, time elapsed: %f', elapsed)
183 logging.info('Zip file already on server, time elapsed: %f', elapsed)
# NOTE(review): orphan docstring — presumably belongs to the elided
# 'def now():' helper; retrieve_results() below calls now() for elapsed-time
# checks, so it likely wraps time.time(). Confirm against the full file.
188 """Exists so it can be mocked easily."""
192 def get_task_keys(swarm_base_url, task_name):
193 """Returns the Swarming task key for each shard of task_name."""
194 key_data = urllib.urlencode([('name', task_name)])
195 url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)
# Polls the server until the freshly-triggered task shows up in its index.
197 for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
198 result = net.url_read(url, retry_404=True)
# NOTE(review): lines elided around here presumably raise Failure with the
# message fragment on the next line — confirm in full file.
201 'Error: Unable to find any task with the name, %s, on swarming server'
204 # TODO(maruel): Compare exact string.
# The server answers with a 'No matching...' body (rather than a 404) when
# the task is not indexed yet; warn and let the retry loop spin again.
205 if 'No matching' in result:
206 logging.warning('Unable to find any task with the name, %s, on swarming '
207 'server' % task_name)
# Success: the body is a JSON list of task keys.
209 return json.loads(result)
212 'Error: Unable to find any task with the name, %s, on swarming server'
216 def retrieve_results(base_url, task_key, timeout, should_stop):
217 """Retrieves results for a single task_key."""
# timeout is in seconds; a falsy (0.) timeout means poll until completion.
218 assert isinstance(timeout, float), timeout
219 params = [('r', task_key)]
220 result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
# NOTE(review): the polling loop header and 'start = now()' assignment are
# on lines elided from this listing.
223 if timeout and (now() - start) >= timeout:
224 logging.error('retrieve_results(%s) timed out', base_url)
226 # Do retries ourselves.
227 response = net.url_read(result_url, retry_404=False, retry_50x=False)
229 # Aggressively poll for results. Do not use retry_404 so
230 # should_stop is polled more often.
# Sleep at most 5s between polls, clipped to the time left before timeout.
231 remaining = min(5, timeout - (now() - start)) if timeout else 5
# should_stop is shared with yield_results(); checked between polls so the
# thread exits promptly when the caller no longer needs results.
233 if should_stop.get():
235 net.sleep_before_retry(1, remaining)
238 data = json.loads(response) or {}
239 except (ValueError, TypeError):
241 'Received corrupted data for task_key %s. Retrying.', task_key)
245 if should_stop.get():
249 def yield_results(swarm_base_url, task_keys, timeout, max_threads):
250 """Yields swarming task results from the swarming server as (index, result).
252 Duplicate shards are ignored, the first one to complete is returned.
254 max_threads is optional and is used to limit the number of parallel fetches
255 done. Since in general the number of task_keys is in the range <=10, it's
256 normally not worth limiting the number of threads. Mostly used for testing.
259 (index, result). In particular, 'result' is defined as the
260 GetRunnerResults() function in services/swarming/server/test_runner.py.
# List of shard indices still awaited; doubles as the loop condition.
262 shards_remaining = range(len(task_keys))
264 min(max_threads, len(task_keys)) if max_threads else len(task_keys))
# Shared flag telling the retrieve_results() worker threads to bail out.
265 should_stop = threading_utils.Bit()
266 results_remaining = len(task_keys)
267 with threading_utils.ThreadPool(number_threads, number_threads, 0) as pool:
# One fetch task per shard key, all at the same (0) priority.
269 for task_key in task_keys:
271 0, retrieve_results, swarm_base_url, task_key, timeout, should_stop)
272 while shards_remaining and results_remaining:
273 result = pool.get_one_result()
274 results_remaining -= 1
276 # Failed to retrieve one key.
277 logging.error('Failed to retrieve the results for a swarming key')
279 shard_index = result['config_instance_index']
280 if shard_index in shards_remaining:
281 shards_remaining.remove(shard_index)
282 yield shard_index, result
284 logging.warning('Ignoring duplicate shard index %d', shard_index)
285 # Pop the last entry, there's no such shard.
286 shards_remaining.pop()
288 # Done, kill the remaining threads.
292 def chromium_setup(manifest):
293 """Sets up the commands to run.
295 Highly chromium specific.
297 # Add uncompressed zip here. It'll be compressed as part of the package sent
298 # to Swarming server.
299 run_test_name = 'run_isolated.zip'
300 manifest.bundle.add_buffer(run_test_name,
301 run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
303 cleanup_script_name = 'swarm_cleanup.py'
304 manifest.bundle.add_file(os.path.join(TOOLS_PATH, cleanup_script_name),
# NOTE(review): the 'run_cmd = [' opening line is elided from this listing.
308 'python', run_test_name,
309 '--hash', manifest.isolated_hash,
310 '--namespace', manifest.namespace,
# A URL means a real isolate server; anything else is treated as a local
# directory passed via --indir.
312 if file_path.is_url(manifest.isolate_server):
313 run_cmd.extend(('--isolate-server', manifest.isolate_server))
315 run_cmd.extend(('--indir', manifest.isolate_server))
317 if manifest.verbose or manifest.profile:
318 # Have it print the profiling section.
319 run_cmd.append('--verbose')
320 manifest.add_task('Run Test', run_cmd)
# Always append a cleanup task so the bot is left in a clean state.
323 manifest.add_task('Clean Up', ['python', cleanup_script_name])
326 def googletest_setup(env, shards):
327 """Sets googletest specific environment variables."""
# The %(instance_index)s / %(num_instances)s placeholders are left for the
# swarming infrastructure to substitute per shard.
# NOTE(review): elided lines presumably guard on shards > 1, copy env, and
# return it — callers use the return value (see trigger()) — confirm.
330 env['GTEST_SHARD_INDEX'] = '%(instance_index)s'
331 env['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'
335 def archive(isolate_server, namespace, isolated, algo, verbose):
336 """Archives a .isolated and all the dependencies on the CAC."""
337 logging.info('archive(%s, %s, %s)', isolate_server, namespace, isolated)
# A URL gets --isolate-server; otherwise (elided branch) presumably a local
# directory flag is used, with command 'hashtable'.
339 if file_path.is_url(isolate_server):
341 flag = '--isolate-server'
343 command = 'hashtable'
346 print('Archiving: %s' % isolated)
# NOTE(review): the 'cmd = [' opening (with sys.executable and the isolate
# subcommand) is elided from this listing.
350 os.path.join(ROOT_DIR, 'isolate.py'),
352 flag, isolate_server,
353 '--namespace', namespace,
354 '--isolated', isolated,
# Repeats --verbose once per verbosity level.
356 cmd.extend(['--verbose'] * verbose)
357 logging.info(' '.join(cmd))
# NOTE(review): if this is the stdlib subprocess, the second positional arg
# of call() is bufsize, not a verbosity flag — looks like a bug; confirm
# whether a project-local subprocess wrapper is in scope.
358 if subprocess.call(cmd, verbose):
360 return isolateserver.hash_file(isolated, algo)
# Cleanup of a temporary directory created on an elided line.
363 shutil.rmtree(tempdir)
366 def process_manifest(
367 swarming, isolate_server, namespace, isolated_hash, task_name, shards,
368 dimensions, env, working_dir, deadline, verbose, profile, priority):
369 """Processes the manifest file and sends off the swarming task request."""
# NOTE(review): the 'try:' / 'manifest = Manifest(' lines are elided; the
# keyword arguments below feed the Manifest constructor.
372 isolate_server=isolate_server,
374 isolated_hash=isolated_hash,
377 dimensions=dimensions,
379 working_dir=working_dir,
384 except ValueError as e:
385 tools.report_error('Unable to process %s: %s' % (task_name, e))
388 chromium_setup(manifest)
390 logging.info('Zipping up files...')
391 if not zip_and_upload(manifest):
394 logging.info('Server: %s', swarming)
395 logging.info('Task name: %s', task_name)
396 trigger_url = swarming + '/test'
397 manifest_text = manifest.to_json()
# POSTs the serialized manifest to the server's /test endpoint.
398 result = net.url_read(trigger_url, data={'request': manifest_text})
401 'Failed to trigger task %s\n%s' % (task_name, trigger_url))
# The server response is expected to be JSON; a parse failure means the
# trigger did not take.
405 except (ValueError, TypeError) as e:
407 'Failed to trigger task %s' % task_name,
408 'Manifest: %s' % manifest_text,
409 'Bad response: %s' % result,
411 tools.report_error(msg)
416 def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
417 """Archives a .isolated file if needed.
419 Returns the file hash to trigger and a bool specifying if it was a file (True)
# A path ending in '.isolated' is archived first; the resulting hash is
# returned with is_file=True.
422 if arg.endswith('.isolated'):
423 file_hash = archive(isolate_server, namespace, arg, algo, verbose)
424 # NOTE(review): an elided line presumably checks 'if not file_hash:' here.
425 tools.report_error('Archival failure %s' % arg)
427 return file_hash, True
# A bare valid hash is passed through; the returns for this branch and the
# error branch are on elided lines.
428 elif isolateserver.is_valid_hash(arg, algo):
431 tools.report_error('Invalid hash %s' % arg)
# NOTE(review): the 'def trigger(' line and several parameters are elided
# from this listing; callers (CMDrun/CMDtrigger) pass swarming,
# isolate_server, namespace, file_hash_or_isolated, task_name, shards,
# dimensions, env, working_dir, deadline, verbose, profile, priority.
439 file_hash_or_isolated,
449 """Sends off the hash swarming task requests."""
450 file_hash, is_file = isolated_to_hash(
451 isolate_server, namespace, file_hash_or_isolated, hashlib.sha1, verbose)
455 # If a file name was passed, use its base name instead of the isolated hash.
456 # Otherwise, use user name as an approximation of a task name.
# NOTE(review): 'if not task_name:' and 'if is_file:' guards are elided.
458 key = os.path.splitext(os.path.basename(file_hash_or_isolated))[0]
460 key = getpass.getuser()
# Task name encodes key, sorted dimensions, and the isolated hash.
461 task_name = '%s/%s/%s' % (
463 '_'.join('%s=%s' % (k, v) for k, v in sorted(dimensions.iteritems())),
466 env = googletest_setup(env, shards)
467 # TODO(maruel): It should first create a request manifest object, then pass
468 # it to a function to zip, archive and trigger.
469 result = process_manifest(
471 isolate_server=isolate_server,
473 isolated_hash=file_hash,
476 dimensions=dimensions,
479 working_dir=working_dir,
# Returns the process_manifest() status plus the (possibly generated)
# task name so callers can collect results.
483 return result, task_name
486 def decorate_shard_output(result, shard_exit_code):
487 """Returns wrapped output for swarming task shard."""
# NOTE(review): the label order looks swapped — result['machine_id'] is
# formatted under 'machine tag:' and 'machine_tag' under 'id:'. Confirm
# whether this is intentional before changing.
488 tag = 'index %s (machine tag: %s, id: %s)' % (
489 result['config_instance_index'],
490 result['machine_id'],
491 result.get('machine_tag', 'unknown'))
# NOTE(review): the 'return (' line and the '%s' carrying the body are on
# elided lines; NO_OUTPUT_FOUND substitutes for an empty output.
494 '================================================================\n'
495 'Begin output from shard %s\n'
496 '================================================================\n'
499 '================================================================\n'
500 'End output from shard %s. Return %d\n'
501 '================================================================\n'
502 ) % (tag, result['output'] or NO_OUTPUT_FOUND, tag, shard_exit_code)
505 def collect(url, task_name, timeout, decorate):
506 """Retrieves results of a Swarming task."""
507 logging.info('Collecting %s', task_name)
508 task_keys = get_task_keys(url, task_name)
510 raise Failure('No task keys to get results with.')
# NOTE(review): the initialization of exit_code (presumably None) is elided.
513 for _index, output in yield_results(url, task_keys, timeout, None):
# 'exit_codes' is a comma-separated string; an empty value counts as
# failure ('1'). The shard's code is the max across its commands.
514 shard_exit_codes = (output['exit_codes'] or '1').split(',')
515 shard_exit_code = max(int(i) for i in shard_exit_codes)
# Python 2 print statement (decorated branch).
517 print decorate_shard_output(output, shard_exit_code)
521 output['machine_id'],
522 output['machine_tag'],
523 output['exit_codes']))
524 print(''.join(' %s\n' % l for l in output['output'].splitlines()))
# Keeps the first non-zero shard exit code seen.
525 exit_code = exit_code or shard_exit_code
# exit_code is None if no shard yielded a result: treat as failure (1).
526 return exit_code if exit_code is not None else 1
def add_filter_options(parser):
  """Attaches the slave-filtering option group to |parser|.

  The group is also stored as parser.filter_group so subcommands can append
  their own filtering options to it.
  """
  group = tools.optparse.OptionGroup(parser, 'Filtering slaves')
  group.add_option(
      '-d', '--dimension', default=[], action='append', nargs=2,
      dest='dimensions', metavar='FOO bar',
      help='dimension to filter on')
  parser.filter_group = group
  parser.add_option_group(group)
538 def add_trigger_options(parser):
539 """Adds all options to trigger a task on Swarming."""
540 isolateserver.add_isolate_server_options(parser, True)
541 add_filter_options(parser)
543 parser.task_group = tools.optparse.OptionGroup(parser, 'Task properties')
544 parser.task_group.add_option(
545 '-w', '--working-dir', default='swarm_tests',
546 help='Working directory on the swarming slave side. default: %default.')
# Hidden legacy spelling kept for backward compatibility.
547 parser.task_group.add_option(
548 '--working_dir', help=tools.optparse.SUPPRESS_HELP)
549 parser.task_group.add_option(
550 '-e', '--env', default=[], action='append', nargs=2, metavar='FOO bar',
551 help='environment variables to set')
552 parser.task_group.add_option(
553 '--priority', type='int', default=100,
554 help='The lower value, the more important the task is')
555 parser.task_group.add_option(
556 '--shards', type='int', default=1, help='number of shards to use')
557 parser.task_group.add_option(
# NOTE(review): the option flag line (presumably '-T'/'--task-name') is
# elided from this listing; the help text below belongs to it.
559 help='Display name of the task. It uniquely identifies the task. '
560 'Defaults to <base_name>/<dimensions>/<isolated hash> if an '
561 'isolated file is provided, if a hash is provided, it defaults to '
562 '<user>/<dimensions>/<isolated hash>')
563 parser.task_group.add_option(
564 '--deadline', type='int', default=6*60*60,
565 help='Seconds to allow the task to be pending for a bot to run before '
566 'this task request expires.')
567 parser.add_option_group(parser.task_group)
568 # TODO(maruel): This is currently written in a chromium-specific way.
569 parser.group_logging.add_option(
570 '--profile', action='store_true',
571 default=bool(os.environ.get('ISOLATE_DEBUG')),
572 help='Have run_isolated.py print profiling info')
575 def process_trigger_options(parser, options, args):
576 isolateserver.process_isolate_server_options(parser, options)
# NOTE(review): the argument-count guard (presumably 'if len(args) != 1:')
# is on an elided line.
578 parser.error('Must pass one .isolated file or its hash (sha1).')
# --dimension collects (key, value) pairs via nargs=2; fold them into a dict.
579 options.dimensions = dict(options.dimensions)
580 if not options.dimensions:
581 parser.error('Please at least specify one --dimension')
584 def add_collect_options(parser):
585 parser.server_group.add_option(
# NOTE(review): the option flag line (presumably '-t'/'--timeout' with a
# float type) is elided from this listing.
588 default=DEFAULT_SHARD_WAIT_TIME,
589 help='Timeout to wait for result, set to 0 for no timeout; default: '
591 parser.group_logging.add_option(
592 '--decorate', action='store_true', help='Decorate output')
# Subcommand entry point: dispatched by subcommand.CommandDispatcher based on
# the CMD prefix, invoked as 'swarming.py collect <task_name>'.
595 @subcommand.usage('task_name')
596 def CMDcollect(parser, args):
597 """Retrieves results of a Swarming task.
599 The result can be in multiple part if the execution was sharded. It can
600 potentially have retries.
602 add_collect_options(parser)
603 (options, args) = parser.parse_args(args)
# Exactly one positional task name is required (guards on elided lines).
605 parser.error('Must specify one task name.')
607 parser.error('Must specify only one task name.')
# NOTE(review): the 'try:' matching the error report below is elided;
# collect() raises Failure on trouble.
610 return collect(options.swarming, args[0], options.timeout, options.decorate)
612 tools.report_error(e)
616 def CMDquery(parser, args):
617 """Returns information about the bots connected to the Swarming server."""
618 add_filter_options(parser)
619 parser.filter_group.add_option(
620 '--dead-only', action='store_true',
621 help='Only print dead bots, useful to reap them and reimage broken bots')
622 parser.filter_group.add_option(
623 '-k', '--keep-dead', action='store_true',
624 help='Do not filter out dead bots')
625 parser.filter_group.add_option(
626 '-b', '--bare', action='store_true',
627 help='Do not print out dimensions')
628 options, args = parser.parse_args(args)
630 if options.keep_dead and options.dead_only:
631 parser.error('Use only one of --keep-dead and --dead-only')
632 service = net.get_http_service(options.swarming)
633 data = service.json_request('GET', '/swarming/api/v1/bots')
634 # NOTE(review): an elided line presumably checks 'if data is None:' here.
635 print >> sys.stderr, 'Failed to access %s' % options.swarming
# A bot is considered dead when it has not pinged within the server-provided
# machine_death_timeout.
637 timeout = datetime.timedelta(seconds=data['machine_death_timeout'])
638 utcnow = datetime.datetime.utcnow()
639 for machine in natsort.natsorted(data['machines'], key=lambda x: x['tag']):
640 last_seen = datetime.datetime.strptime(
641 machine['last_seen'], '%Y-%m-%d %H:%M:%S')
642 is_dead = utcnow - last_seen > timeout
643 if options.dead_only:
646 elif not options.keep_dead and is_dead:
649 # If the user requested to filter on dimensions, ensure the bot has all the
650 # dimensions requested.
651 dimensions = machine['dimensions']
# options.dimensions is still the raw list of (key, value) pairs here —
# this subcommand does not call process_trigger_options().
652 for key, value in options.dimensions:
653 if key not in dimensions:
655 # A bot can have multiple value for a key, for example,
656 # {'os': ['Windows', 'Windows-6.1']}, so that --dimension os=Windows will
658 if isinstance(dimensions[key], list):
659 if value not in dimensions[key]:
662 if value != dimensions[key]:
# Python 2 print statement; only reached when --bare is not set (the guard
# and the tag print are on elided lines).
667 print ' %s' % dimensions
671 @subcommand.usage('[hash|isolated]')
672 def CMDrun(parser, args):
673 """Triggers a task and waits for the results.
675 Basically, does everything to run a command remotely.
677 add_trigger_options(parser)
678 add_collect_options(parser)
679 options, args = parser.parse_args(args)
680 process_trigger_options(parser, options, args)
# NOTE(review): the 'try:' matching the Failure report below is elided.
683 result, task_name = trigger(
684 swarming=options.swarming,
685 isolate_server=options.isolate_server or options.indir,
686 namespace=options.namespace,
687 file_hash_or_isolated=args[0],
688 task_name=options.task_name,
689 shards=options.shards,
690 dimensions=options.dimensions,
691 env=dict(options.env),
692 working_dir=options.working_dir,
693 deadline=options.deadline,
694 verbose=options.verbose,
695 profile=options.profile,
696 priority=options.priority)
699 'Failed to trigger %s(%s): %s' %
700 (options.task_name, args[0], e.args[0]))
703 tools.report_error('Failed to trigger the task.')
# trigger() may have generated a task name from the isolated file/hash; tell
# the user which name to collect. The collect() call itself is on elided
# lines after this.
705 if task_name != options.task_name:
706 print('Triggered task: %s' % task_name)
714 tools.report_error(e)
718 @subcommand.usage("(hash|isolated)")
719 def CMDtrigger(parser, args):
720 """Triggers a Swarming task.
722 Accepts either the hash (sha1) of a .isolated file already uploaded or the
723 path to an .isolated file to archive, packages it if needed and sends a
724 Swarming manifest file to the Swarming server.
726 If an .isolated file is specified instead of a hash, it is first archived.
728 add_trigger_options(parser)
729 options, args = parser.parse_args(args)
730 process_trigger_options(parser, options, args)
# NOTE(review): the 'try:' matching the error report below is elided.
733 result, task_name = trigger(
734 swarming=options.swarming,
735 isolate_server=options.isolate_server or options.indir,
736 namespace=options.namespace,
737 file_hash_or_isolated=args[0],
738 task_name=options.task_name,
739 dimensions=options.dimensions,
740 shards=options.shards,
741 env=dict(options.env),
742 working_dir=options.working_dir,
743 deadline=options.deadline,
744 verbose=options.verbose,
745 profile=options.profile,
746 priority=options.priority)
# Only report the generated name on success (result is the trigger status).
747 if task_name != options.task_name and not result:
748 print('Triggered task: %s' % task_name)
751 tools.report_error(e)
# Shared option parser for every subcommand: adds the 'Server' group,
# auth options, and enforces --swarming at parse time.
755 class OptionParserSwarming(tools.OptionParserWithLogging):
756 def __init__(self, **kwargs):
757 tools.OptionParserWithLogging.__init__(
758 self, prog='swarming.py', **kwargs)
759 self.server_group = tools.optparse.OptionGroup(self, 'Server')
760 self.server_group.add_option(
# NOTE(review): the option flag line (presumably '-S'/'--swarming') is
# elided; it defaults to the SWARMING_SERVER environment variable.
762 metavar='URL', default=os.environ.get('SWARMING_SERVER', ''),
763 help='Swarming server to use')
764 self.add_option_group(self.server_group)
765 auth.add_auth_options(self)
767 def parse_args(self, *args, **kwargs):
768 options, args = tools.OptionParserWithLogging.parse_args(
769 self, *args, **kwargs)
# Normalize the server URL so later string concatenation ('/test', ...)
# does not produce double slashes.
770 options.swarming = options.swarming.rstrip('/')
771 if not options.swarming:
772 self.error('--swarming is required.')
773 auth.process_auth_options(self, options)
# NOTE(review): the 'return options, args' line is elided.
# NOTE(review): the 'def main(args):' and 'try:' lines are elided; this is
# the program entry point dispatching to the CMD* functions above.
778 dispatcher = subcommand.CommandDispatcher(__name__)
780 return dispatcher.execute(OptionParserSwarming(version=__version__), args)
# Top-level boundary: report any unexpected error instead of a traceback
# (the failure return is on an elided line).
781 except Exception as e:
782 tools.report_error(e)
786 if __name__ == '__main__':
# Normalize stdout/stderr encoding and disable buffering before dispatching;
# line 789 (elided) presumably initializes colorama, which is imported above.
787 fix_encoding.fix_encoding()
788 tools.disable_buffering()
790 sys.exit(main(sys.argv[1:]))