2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
6 """Triggers a ton of fake jobs to test the server's handling under high load.
8 Generates a histogram with the latencies to process the tasks and number of
25 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
27 sys.path.insert(0, ROOT_DIR)
29 from third_party import colorama
33 from utils import graph
35 from utils import threading_utils
37 # Line too long (NN/80)
38 # pylint: disable=C0301
# Values reported by every fake bot. The OS name is intentionally bogus
# ('Comodore64') so these simulated slaves can never match task requests
# aimed at real machines; the output string is what each fake task "prints".
40 OS_NAME = 'Comodore64'
41 TASK_OUTPUT = 'This task ran with great success'
44 def print_results(results, columns, buckets):
# Splits the mixed `results` list into float entries (per-task processing
# delay in seconds, see FakeSwarmBot._run which puts `time.time() - start`)
# and non-float event markers (strings such as 'sleep', 'unknown_rpc'),
# then prints a delay histogram followed by per-event counts.
# NOTE(review): several original lines are elided from this view (e.g. the
# loop that fills `values` from `failures`); comments cover only what is
# visible here.
45 delays = [i for i in results if isinstance(i, float)]
46 failures = [i for i in results if not isinstance(i, float)]
48 print('%sDELAYS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
49 graph.print_histogram(
50 graph.generate_histogram(delays, buckets), columns, ' %.3f')
52 print('Total items : %d' % len(results))
# Average over the float delays only -- presumably guarded by an
# `if delays:` check on an elided line, otherwise this divides by zero;
# TODO(review): confirm against the full file.
55 average = sum(delays)/ len(delays)
56 print('Average delay: %s' % graph.to_units(average))
60 print('%sEVENTS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
# `values` is built from `failures` on elided lines; setdefault ensures
# every observed event key is present before the histogram is printed.
63 values.setdefault(f, 0)
65 graph.print_histogram(values, columns, ' %s')
69 def calculate_version(url):
70 """Retrieves the swarm_bot code and returns the SHA-1 for it."""
71 # Cannot use url_open() since zipfile requires .seek().
72 archive = zipfile.ZipFile(StringIO.StringIO(net.url_read(url)))
# Hash every member of the downloaded zip, mirroring the server-side logic
# referenced below. NOTE(review): `d` is presumably a hashlib.sha1() object
# created on an elided line, with `d.hexdigest()` returned after the loop --
# confirm against the full file.
74 # https://code.google.com/p/swarming/source/browse/services/swarming/common/bot_archive.py
76 for f in archive.namelist():
77 d.update(archive.read(f))
81 class FakeSwarmBot(object):
82 """This is a fake swarm_bot implementation simulating that it is running
# (docstring continues on elided lines)
85 It polls for jobs, acts as if it were processing them and returns the fake
# --- __init__ (signature line elided from this view) ---
# Stores the server URL, bot attributes and coordination primitives, then
# starts a daemon thread running self._run.
89 self, swarming_url, dimensions, swarm_bot_version_hash, index, progress,
90 duration, events, kill_event):
91 self._lock = threading.Lock()
92 self._swarming = swarming_url
94 self._progress = progress
95 self._duration = duration
97 self._kill_event = kill_event
98 # Use an impossible hostname.
99 self._machine_id = '%s-%d' % (socket.getfqdn().lower(), index)
# Attribute dict sent to /poll_for_test; format documented by the real bot:
102 # https://code.google.com/p/swarming/source/browse/src/swarm_bot/slave_machine.py?repo=swarming-server
104 # https://chromium.googlesource.com/chromium/tools/build.git/ \
105 # +/master/scripts/tools/swarm_bootstrap/swarm_bootstrap.py
108 'dimensions': dimensions,
109 'id': self._machine_id,
111 'tag': self._machine_id,
112 'version': swarm_bot_version_hash,
# Daemon thread so a hung bot cannot block interpreter shutdown.
115 self._thread = threading.Thread(target=self._run, name='bot%d' % index)
116 self._thread.daemon = True
# --- is_alive (def line elided) ---
123 return self._thread.is_alive()
# --- _run (def line elided): the bot main loop ---
# Repeatedly polls the server, then either sleeps, self-updates, or fakes
# running the task and posts results. Non-float strings pushed into
# self._events are error/event markers; a float is the total task latency.
127 self._progress.update_item('%d alive' % self._index, bots=1)
129 if self._kill_event.is_set():
131 data = {'attributes': json.dumps(self._attributes)}
132 request = net.url_open(self._swarming + '/poll_for_test', data=data)
134 self._events.put('poll_for_test_empty')
138 manifest = json.load(request)
140 self._progress.update_item('Failed to poll')
141 self._events.put('poll_for_test_invalid')
# RPC names requested by the server decide the branch taken below.
144 commands = [c['function'] for c in manifest.get('commands', [])]
# No work available: server told the bot when to come back.
147 self._events.put('sleep')
148 time.sleep(manifest['come_back'])
151 if commands == ['UpdateSlave']:
152 # Calculate the proper SHA-1 and loop again.
153 # This could happen if the Swarming server is upgraded while this
155 self._attributes['version'] = calculate_version(
156 manifest['commands'][0]['args'])
157 self._events.put('update_slave')
# Anything other than the expected StoreFiles+RunCommands pair is logged
# and recorded as an 'unknown_rpc' event.
160 if commands != ['StoreFiles', 'RunCommands']:
161 self._progress.update_item(
162 'Unexpected RPC call %s\n%s' % (commands, manifest))
163 self._events.put('unknown_rpc')
166 # The normal way Swarming works is that it 'stores' a test_run.swarm
167 # file and then defer control to swarm_bot/local_test_runner.py.
168 store_cmd = manifest['commands'][0]
169 assert len(store_cmd['args']) == 1, store_cmd['args']
170 filepath, filename, test_run_content = store_cmd['args'][0]
171 assert filepath == ''
172 assert filename == 'test_run.swarm'
173 assert 'local_test_runner.py' in manifest['commands'][1]['args'][0], (
174 manifest['commands'][1])
175 result_url = manifest['result_url']
176 test_run = json.loads(test_run_content)
177 assert result_url == test_run['result_url']
178 ping_url = test_run['ping_url']
179 ping_delay = test_run['ping_delay']
180 self._progress.update_item('%d processing' % self._index, processing=1)
182 # Fake activity and send pings as requested.
# NOTE(review): `start` is set on an elided line; `remaining` counts down
# the configured fake task duration, pinging while enough time is left.
184 remaining = max(0, (start + self._duration) - time.time())
185 if remaining > ping_delay:
186 # Include empty data to ensure the request is a POST request.
187 result = net.url_read(ping_url, data={})
188 assert result == 'Success.', result
189 remaining = max(0, (start + self._duration) - time.time())
192 time.sleep(remaining)
# Result payload posted back to result_url (dict construction partially
# elided in this view).
195 'c': test_run['configuration']['config_name'],
196 'n': test_run['test_run_name'],
198 'result_output': TASK_OUTPUT,
202 result = net.url_read(manifest['result_url'], data=data)
203 self._progress.update_item(
204 '%d processed' % self._index, processing=-1, processed=1)
206 self._events.put('result_url_fail')
208 assert result == 'Successfully update the runner results.', result
# Success: record the end-to-end latency (the only float event).
209 self._events.put(time.time() - start)
212 # Unregister itself. Otherwise the server will have tons of fake slaves
213 # that the admin will have to remove manually.
214 response = net.url_open(
215 self._swarming + '/delete_machine_stats',
216 data=[('r', self._machine_id)])
218 self._events.put('failed_unregister')
222 self._progress.update_item('%d quit' % self._index, bots=-1)
# Body of main() -- the `def main():` line itself is elided from this view.
# Parses options, spawns N FakeSwarmBot slaves against --swarming, waits
# until Ctrl-C, then prints latency/event histograms and optionally dumps
# the raw results to JSON.
227 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
230 metavar='URL', default='',
231 help='Swarming server to use')
232 swarming.add_filter_options(parser)
233 # Use improbable values to reduce the chance of interfering with real slaves.
237 ('machine', os.uname()[4] + '-experimental'),
241 group = optparse.OptionGroup(parser, 'Load generated')
243 '--slaves', type='int', default=300, metavar='N',
244 help='Number of swarm bot slaves, default: %default')
246 '-c', '--consume', type='float', default=60., metavar='N',
247 help='Duration (s) for consuming a request, default: %default')
248 parser.add_option_group(group)
250 group = optparse.OptionGroup(parser, 'Display options')
252 '--columns', type='int', default=graph.get_console_width(), metavar='N',
253 help='For histogram display, default:%default')
255 '--buckets', type='int', default=20, metavar='N',
256 help='Number of buckets for histogram display, default:%default')
257 parser.add_option_group(group)
260 '--dump', metavar='FOO.JSON', help='Dumps to json file')
262 '-v', '--verbose', action='store_true', help='Enables logging')
264 options, args = parser.parse_args()
# Logging is effectively off unless -v is passed.
265 logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
267 parser.error('Unsupported args: %s' % args)
268 options.swarming = options.swarming.rstrip('/')
269 if not options.swarming:
270 parser.error('--swarming is required.')
271 if options.consume <= 0:
272 parser.error('Needs --consume > 0. 0.01 is a valid value.')
273 swarming.process_filter_options(parser, options)
276 'Running %d slaves, each task lasting %.1fs' % (
277 options.slaves, options.consume))
278 print('Ctrl-C to exit.')
279 print('[processing/processed/bots]')
280 columns = [('processing', 0), ('processed', 0), ('bots', 0)]
281 progress = threading_utils.Progress(columns)
282 events = Queue.Queue()
284 kill_event = threading.Event()
# Fetch the bot zip once so every fake slave reports the right version hash.
285 swarm_bot_version_hash = calculate_version(
286 options.swarming + '/get_slave_code')
# Slave construction: part of a list comprehension whose opening line is
# elided in this view.
289 options.swarming, options.dimensions, swarm_bot_version_hash, i, progress,
290 options.consume, events, kill_event)
291 for i in range(options.slaves)
294 # Wait for all the slaves to come alive.
295 while not all(s.is_alive() for s in slaves):
297 progress.update_item('Ready to run')
299 progress.print_update()
301 # The slaves could be told to die.
302 slaves = [s for s in slaves if s.is_alive()]
303 except KeyboardInterrupt:
# Ctrl-C: signal the bots (via kill_event on an elided line, presumably)
# and wait for the remaining threads to drain.
306 progress.update_item('Waiting for slaves to quit.', raw=True)
307 progress.update_item('')
309 progress.print_update()
310 slaves = [s for s in slaves if s.is_alive()]
311 # At this point, progress is not used anymore.
313 print('Ran for %.1fs.' % (time.time() - start))
# Queue.queue is the underlying deque of collected events/delays.
315 results = events.queue
316 print_results(results, options.columns, options.buckets)
# --dump: write the raw results (floats and event strings) as compact JSON.
318 with open(options.dump, 'w') as f:
319 json.dump(results, f, separators=(',',':'))
# Script entry point; the guarded body (presumably sys.exit(main())) is
# outside this view.
323 if __name__ == '__main__':