1 # Copyright 2014 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """Toolset to run multiple Swarming tasks in parallel."""
# Compute the checkout root and put it on sys.path so that the
# 'from utils import ...' imports below resolve when this script is run
# directly from its own directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(BASE_DIR)
sys.path.insert(0, ROOT_DIR)
21 from utils import threading_utils
22 from utils import tools
  # Truncates the ISO-8601 UTC timestamp at the '.' to drop the
  # microseconds part, e.g. '2014-01-02T03:04:05'.
  # NOTE(review): the enclosing 'def' line is elided from this chunk.
  return datetime.datetime.utcnow().isoformat().split('.', 1)[0]
def task_to_name(name, dimensions, isolated_hash):
  """Returns a task name the same way swarming.py generates them."""
  # Dimensions are rendered as sorted 'key=value' pairs so the generated
  # name is deterministic regardless of dict iteration order (Python 2:
  # iteritems()).
  # NOTE(review): the statement this expression belongs to is partially
  # elided from this chunk; it appears to be one element of the joined name.
      '_'.join('%s=%s' % (k, v) for k, v in sorted(dimensions.iteritems())),
def unique_task_to_name(name, dimensions, isolated_hash, now):
  """Returns a task name that is guaranteed to be unique."""
  # Reuses the deterministic task_to_name() base; presumably the elided
  # lines append the caller-provided 'now' timestamp for uniqueness —
  # TODO confirm against the full file.
      task_to_name(name, dimensions, isolated_hash),
  # NOTE(review): the enclosing 'def' line and the subprocess.Popen(
  # opening are elided from this chunk; this is the tail of a helper that
  # runs a script with the current interpreter and captures its stdout.
      [sys.executable] + cmd, cwd=ROOT_DIR, stdout=subprocess.PIPE)
  out = p.communicate()[0]
  # Returns (exit code, captured stdout, wall-clock seconds); 'start' is
  # assigned in an elided line above.
  return p.returncode, out, time.time() - start
def trigger(swarming_server, isolate_server, task_name, isolated_hash, args):
  """Triggers a specified .isolated file."""
  # Shells out to swarming.py instead of importing it, so every trigger
  # runs in its own process.
  # NOTE(review): the 'cmd = [' opening line and a couple of list items
  # are elided from this chunk.
      'swarming.py', 'trigger',
      '--swarming', swarming_server,
      '--isolate-server', isolate_server,
      '--task-name', task_name,
  # 'args' carries extra caller-supplied flags appended verbatim.
  return capture(cmd + args)
def collect(swarming_server, task_name):
  """Collects results of a swarming task."""
  cmd = ['swarming.py', 'collect', '--swarming', swarming_server, task_name]
  # NOTE(review): the trailing statement (presumably 'return capture(cmd)')
  # is elided from this chunk.
  """Runs tasks in parallel on Swarming."""
  # NOTE(review): the enclosing 'class' line and the 'def __init__('
  # opening are elided from this chunk; the lines below are the
  # constructor's parameter-list continuation and attribute assignments.
      self, swarming_server, isolate_server, add_task, progress,
    # Server URLs used by every triggered/collected task.
    self.swarming_server = swarming_server
    self.isolate_server = isolate_server
    # Callback used to enqueue follow-up work on the thread pool
    # (trigger() uses it to schedule collect()).
    self.add_task = add_task
    # threading_utils.Progress instance used for status updates.
    self.progress = progress
    # Extra flags forwarded to 'swarming.py trigger'.
    self.extra_trigger_args = extra_trigger_args
  def trigger(self, task_name, isolated_hash, dimensions):
    """Triggers one task and, on success, schedules its collection."""
    # Flattens {'os': 'Linux', ...} into ['--dimension', 'os', 'Linux', ...].
    args = sum((['--dimension', k, v] for k, v in dimensions.iteritems()), [])
    returncode, stdout, duration = trigger(
    # NOTE(review): the leading arguments of this trigger() call are
    # elided from this chunk.
        self.extra_trigger_args + args)
    step_name = '%s (%3.2fs)' % (task_name, duration)
    # NOTE(review): an 'if returncode:' guard appears to be elided here;
    # the two deeper-indented lines below are the failure path.
      line = 'Failed to trigger %s\n%s' % (step_name, stdout)
      self.progress.update_item(line, index=1)
    self.progress.update_item('Triggered %s' % step_name, index=1)
    # Enqueue the blocking collect step for this task on the pool.
    self.add_task(0, self.collect, task_name, dimensions)
  def collect(self, task_name, dimensions):
    """Waits for a task's results; returns a tuple only on failure."""
    returncode, stdout, duration = collect(self.swarming_server, task_name)
    step_name = '%s (%3.2fs)' % (task_name, duration)
    # NOTE(review): an 'if returncode:' guard appears to be elided here;
    # the deeper-indented lines below are the failure path.
      # Only print the output for failures, successes are unexciting.
      self.progress.update_item(
          'Failed %s:\n%s' % (step_name, stdout), index=1)
      # A non-None return surfaces the failure via pool.iter_results().
      return (task_name, dimensions, stdout)
    self.progress.update_item('Passed %s' % step_name, index=1)
def run_swarming_tasks_parallel(
    swarming_server, isolate_server, extra_trigger_args, tasks):
  """Triggers swarming tasks in parallel and gets results.

  This is done by using one thread per task and shelling out swarming.py.

  extra_trigger_args: list of additional flags to pass down to
      'swarming.py trigger'
  tasks: list of tuple(task_name, isolated_hash, dimensions) where dimension
      are --dimension flags to provide when triggering the task.

  tuple(name, dimensions, stdout) for the tasks that failed.
  # NOTE(review): several lines of this function are elided from this
  # chunk, including the docstring's closing quotes, the assignments of
  # 'start', 'runs', 'total' and 'failed_tasks', and the 'runner = Runner('
  # opening line.
  progress = threading_utils.Progress([('index', 0), ('size', total)])
  # Print full lines rather than rewriting the current one with \r.
  progress.use_cr_only = False
  with threading_utils.ThreadPoolWithProgress(
      progress, runs, runs, total) as pool:
      swarming_server, isolate_server, pool.add_task, progress,
    # Fan out: one trigger task per tuple; collects are enqueued by
    # Runner.trigger() itself as triggers succeed.
    for task_name, isolated_hash, dimensions in tasks:
      pool.add_task(0, runner.trigger, task_name, isolated_hash, dimensions)
    # Runner.collect() only returns task failures.
    for failed_task in pool.iter_results():
      task_name, dimensions, stdout = failed_task
      yield task_name, dimensions, stdout
      failed_tasks.append(task_name)
  duration = time.time() - start
  print('\nCompleted in %3.2fs' % duration)
  # NOTE(review): an 'if failed_tasks:' guard appears to be elided before
  # the summary below.
    print('Detected the following failures:')
    for task in sorted(failed_tasks):
class OptionParser(tools.OptionParserWithLogging):
  """Parser adding Swarming/Isolate server and task-filter options."""
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    # Group the server-related flags together in --help output.
    self.server_group = tools.optparse.OptionGroup(self, 'Server')
    self.server_group.add_option(
    # NOTE(review): the option name argument (presumably '-S'/'--swarming',
    # given parse_args() below reads options.swarming) is elided.
        metavar='URL', default=os.environ.get('SWARMING_SERVER', ''),
        help='Swarming server to use')
    isolateserver.add_isolate_server_options(self.server_group, False)
    self.add_option_group(self.server_group)
    auth.add_auth_options(self)
    # NOTE(review): the 'self.add_option(' opening line for each of the
    # three options below is elided from this chunk.
        '-d', '--dimension', default=[], action='append', nargs=2,
        dest='dimensions', metavar='FOO bar',
        help='dimension to filter on')
        '--priority', type='int',
        help='The lower value, the more important the task is. It may be '
            'important to specify a higher priority since the default value '
            'will make the task to be triggered only when the slaves are idle.')
        '--deadline', type='int', default=6*60*60,
        help='Seconds to allow the task to be pending for a bot to run before '
            'this task request expires.')
  def parse_args(self, *args, **kwargs):
    """Parses flags, then validates and normalizes server/auth options."""
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Normalize the URL so later path concatenation is predictable.
    options.swarming = options.swarming.rstrip('/')
    if not options.swarming:
      self.error('--swarming is required.')
    auth.process_auth_options(self, options)
    isolateserver.process_isolate_server_options(self.server_group, options)
    # nargs=2 appends (key, value) pairs; convert them to a dict.
    options.dimensions = dict(options.dimensions)
    # NOTE(review): the trailing 'return options, args' statement appears
    # to be elided from this chunk.
190 def format_description(self, _):
191 return self.description