06344cf59c3813136ae31a7a34e96ea8f7042c07
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / tools / parallel_execution.py
1 # Copyright 2014 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
4
5 """Toolset to run multiple Swarming tasks in parallel."""
6
7 import datetime
8 import getpass
9 import os
10 import subprocess
11 import sys
12 import time
13
14 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
15 ROOT_DIR = os.path.dirname(BASE_DIR)
16
17 sys.path.insert(0, ROOT_DIR)
18
19 import auth
20 import isolateserver
21 from utils import threading_utils
22 from utils import tools
23
24
25 def timestamp():
26   return datetime.datetime.utcnow().isoformat().split('.', 1)[0]
27
28
29 def task_to_name(name, dimensions, isolated_hash):
30   """Returns a task name the same way swarming.py generates them."""
31   return '%s/%s/%s' % (
32       name,
33       '_'.join('%s=%s' % (k, v) for k, v in sorted(dimensions.iteritems())),
34       isolated_hash)
35
36
37 def unique_task_to_name(name, dimensions, isolated_hash, now):
38   """Returns a task name that is guaranteed to be unique."""
39   return '%s/%s/%s' % (
40       getpass.getuser(),
41       task_to_name(name, dimensions, isolated_hash),
42       now)
43
44
45 def capture(cmd):
46   start = time.time()
47   p = subprocess.Popen(
48       [sys.executable] + cmd, cwd=ROOT_DIR, stdout=subprocess.PIPE)
49   out = p.communicate()[0]
50   return p.returncode, out, time.time() - start
51
52
53 def trigger(swarming_server, isolate_server, task_name, isolated_hash, args):
54   """Triggers a specified .isolated file."""
55   cmd = [
56     'swarming.py', 'trigger',
57     '--swarming', swarming_server,
58     '--isolate-server', isolate_server,
59     '--task-name', task_name,
60     isolated_hash,
61   ]
62   return capture(cmd + args)
63
64
65 def collect(swarming_server, task_name):
66   """Collects results of a swarming task."""
67   cmd = ['swarming.py', 'collect', '--swarming', swarming_server, task_name]
68   return capture(cmd)
69
70
71 class Runner(object):
72   """Runners runs tasks in parallel on Swarming."""
73   def __init__(
74       self, swarming_server, isolate_server, add_task, progress,
75       extra_trigger_args):
76     self.swarming_server = swarming_server
77     self.isolate_server = isolate_server
78     self.add_task = add_task
79     self.progress = progress
80     self.extra_trigger_args = extra_trigger_args
81
82   def trigger(self, task_name, isolated_hash, dimensions):
83     args = sum((['--dimension', k, v] for k, v in dimensions.iteritems()), [])
84     returncode, stdout, duration = trigger(
85         self.swarming_server,
86         self.isolate_server,
87         task_name,
88         isolated_hash,
89         self.extra_trigger_args + args)
90     step_name = '%s (%3.2fs)' % (task_name, duration)
91     if returncode:
92       line = 'Failed to trigger %s\n%s' % (step_name, stdout)
93       self.progress.update_item(line, index=1)
94       return
95     self.progress.update_item('Triggered %s' % step_name, index=1)
96     self.add_task(0, self.collect, task_name, dimensions)
97
98   def collect(self, task_name, dimensions):
99     returncode, stdout, duration = collect(self.swarming_server, task_name)
100     step_name = '%s (%3.2fs)' % (task_name, duration)
101     if returncode:
102       # Only print the output for failures, successes are unexciting.
103       self.progress.update_item(
104           'Failed %s:\n%s' % (step_name, stdout), index=1)
105       return (task_name, dimensions, stdout)
106     self.progress.update_item('Passed %s' % step_name, index=1)
107
108
109 def run_swarming_tasks_parallel(
110     swarming_server, isolate_server, extra_trigger_args, tasks):
111   """Triggers swarming tasks in parallel and gets results.
112
113   This is done by using one thread per task and shelling out swarming.py.
114
115   Arguments:
116     extra_trigger_args: list of additional flags to pass down to
117         'swarming.py trigger'
118     tasks: list of tuple(task_name, isolated_hash, dimensions) where dimension
119         are --dimension flags to provide when triggering the task.
120
121   Yields:
122     tuple(name, dimensions, stdout) for the tasks that failed.
123   """
124   runs = len(tasks)
125   # triger + collect
126   total = 2 * runs
127   failed_tasks = []
128   progress = threading_utils.Progress([('index', 0), ('size', total)])
129   progress.use_cr_only = False
130   start = time.time()
131   with threading_utils.ThreadPoolWithProgress(
132       progress, runs, runs, total) as pool:
133     runner = Runner(
134         swarming_server, isolate_server, pool.add_task, progress,
135         extra_trigger_args)
136
137     for task_name, isolated_hash, dimensions in tasks:
138       pool.add_task(0, runner.trigger, task_name, isolated_hash, dimensions)
139
140     # Runner.collect() only return task failures.
141     for failed_task in pool.iter_results():
142       task_name, dimensions, stdout = failed_task
143       yield task_name, dimensions, stdout
144       failed_tasks.append(task_name)
145
146   duration = time.time() - start
147   print('\nCompleted in %3.2fs' % duration)
148   if failed_tasks:
149     print('Detected the following failures:')
150     for task in sorted(failed_tasks):
151       print('  %s' % task)
152
153
154 class OptionParser(tools.OptionParserWithLogging):
155   def __init__(self, **kwargs):
156     tools.OptionParserWithLogging.__init__(self, **kwargs)
157     self.server_group = tools.optparse.OptionGroup(self, 'Server')
158     self.server_group.add_option(
159         '-S', '--swarming',
160         metavar='URL', default=os.environ.get('SWARMING_SERVER', ''),
161         help='Swarming server to use')
162     isolateserver.add_isolate_server_options(self.server_group, False)
163     self.add_option_group(self.server_group)
164     auth.add_auth_options(self)
165     self.add_option(
166         '-d', '--dimension', default=[], action='append', nargs=2,
167         dest='dimensions', metavar='FOO bar',
168         help='dimension to filter on')
169     self.add_option(
170         '--priority', type='int',
171         help='The lower value, the more important the task is. It may be '
172             'important to specify a higher priority since the default value '
173             'will make the task to be triggered only when the slaves are idle.')
174     self.add_option(
175         '--deadline', type='int', default=6*60*60,
176         help='Seconds to allow the task to be pending for a bot to run before '
177             'this task request expires.')
178
179   def parse_args(self, *args, **kwargs):
180     options, args = tools.OptionParserWithLogging.parse_args(
181         self, *args, **kwargs)
182     options.swarming = options.swarming.rstrip('/')
183     if not options.swarming:
184       self.error('--swarming is required.')
185     auth.process_auth_options(self, options)
186     isolateserver.process_isolate_server_options(self.server_group, options)
187     options.dimensions = dict(options.dimensions)
188     return options, args
189
190   def format_description(self, _):
191     return self.description