# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.
"""Uploads a ton of stuff to isolateserver to test its handling.

Generates a histogram with the latencies to download a just uploaded file.

Note that it only looks at uploading and downloading and does not test
/content-gs/contains, which is datastore read bound.
"""
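
# Example invocation (the script name and server URL are placeholders):
#   python isolateserver_load_test.py -I https://isolate.example.com \
#       --items 100 --dump results.json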

import functools
import json
import logging
import optparse
import os
import random
import sys
import time

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

sys.path.insert(0, ROOT_DIR)

from third_party import colorama

import isolateserver

from utils import graph
from utils import threading_utils
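
# Note: this script targets Python 2 (xrange, chr()-built byte strings).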


class Randomness(object):
  def __init__(self, random_pool_size=1024):
    """Creates 1 MiB of random data in a pool, split into 1 KiB chunks."""
    self.pool = [
      ''.join(chr(random.randrange(256)) for _ in xrange(1024))
      for _ in xrange(random_pool_size)
    ]

  def gen(self, size):
    """Returns a str of |size| bytes of random data taken from the pool."""
    chunks = int(size / 1024)
    rest = size - (chunks*1024)
    data = ''.join(random.choice(self.pool) for _ in xrange(chunks))
    data += random.choice(self.pool)[:rest]
    return data


class Progress(threading_utils.Progress):
  def _render_columns(self):
    """Renders the 'data' column in human readable units."""
    columns_as_str = [
      str(self._columns[0]),
      graph.to_units(self._columns[1]).rjust(6),
      str(self._columns[2]),
    ]
    max_len = max((len(columns_as_str[0]), len(columns_as_str[2])))
    return '/'.join(i.rjust(max_len) for i in columns_as_str)


def print_results(results, columns, buckets):
  delays = [i[0] for i in results if isinstance(i[0], float)]
  failures = [i for i in results if not isinstance(i[0], float)]
  sizes = [i[1] for i in results]

  print('%sSIZES%s (bytes):' % (colorama.Fore.RED, colorama.Fore.RESET))
  graph.print_histogram(
      graph.generate_histogram(sizes, buckets), columns, '%d')
  print('')
  total_size = sum(sizes)
  print('Total size  : %s' % graph.to_units(total_size))
  print('Total items : %d' % len(sizes))
  print('Average size: %s' % graph.to_units(total_size / len(sizes)))
  print('Largest item: %s' % graph.to_units(max(sizes)))
  print('')
  print('%sDELAYS%s (seconds):' % (colorama.Fore.RED, colorama.Fore.RESET))
  graph.print_histogram(
      graph.generate_histogram(delays, buckets), columns, '%.3f')

  if failures:
    print('')
    print('%sFAILURES%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
    print(
        '\n'.join('  %s (%s)' % (i[0], graph.to_units(i[1])) for i in failures))


def gen_size(mid_size):
  """Interesting non-Gaussian distribution, to get a few very large files.

  Found via guessing on Wikipedia. Module 'random' says it's threadsafe.
  """
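  # random.gammavariate(3, 2) has mean 3*2 = 6, so the returned size averages
  # about 1.5 * mid_size; the long tail produces the occasional very large
  # item.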
  return int(random.gammavariate(3, 2) * mid_size / 4)


def send_and_receive(random_pool, storage, progress, size):
  """Sends a random file and gets it back.

  # TODO(maruel): Add a batching argument of value [1, 500] to batch requests.

  Returns (delay, size)
  """
  # Create a file out of the pool.
  start = time.time()
  batch = 1
  items = [
    isolateserver.BufferItem(random_pool.gen(size), False)
    for _ in xrange(batch)
  ]
  try:
    # len(_uploaded) may be < len(items) if the items are not random enough or
    # the value of --mid-size is very low compared to --items.
    _uploaded = storage.upload_items(items)
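
    # Fetch every uploaded item back through a FetchQueue into an in-memory
    # cache and verify its content; 'duration' below measures only this phase.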
    start = time.time()

    cache = isolateserver.MemoryCache()
    queue = isolateserver.FetchQueue(storage, cache)
    for i in items:
      queue.add(i.digest, i.size)

    waiting = [i.digest for i in items]
    while waiting:
      waiting.remove(queue.wait(waiting))

    expected = {i.digest: ''.join(i.content()) for i in items}
    for d in cache.cached_set():
      actual = cache.read(d)
      assert expected.pop(d) == actual
    assert not expected, expected

    duration = max(0, time.time() - start)
  except isolateserver.MappingError as e:
    duration = str(e)
  if isinstance(duration, float):
    progress.update_item('', index=1, data=size)
  else:
    progress.update_item('', index=1)
  return (duration, size)


def main():
  colorama.init()

  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default='',
      help='Isolate server to use')
  parser.add_option(
      '--namespace', default='temporary%d-gzip' % time.time(), metavar='XX',
      help='Namespace to use on the server, default: %default')
  parser.add_option(
      '--threads', type='int', default=16, metavar='N',
      help='Parallel worker threads to use, default:%default')

  data_group = optparse.OptionGroup(parser, 'Amount of data')
  graph.unit_option(
      data_group, '--items', default=0, help='Number of items to upload')
  graph.unit_option(
      data_group, '--max-size', default=0,
      help='Loop until this amount of data was transferred')
  graph.unit_option(
      data_group, '--mid-size', default=100*1024,
      help='Rough average size of each item, default:%default')
  parser.add_option_group(data_group)

  ui_group = optparse.OptionGroup(parser, 'Result histogram')
  ui_group.add_option(
      '--columns', type='int', default=graph.get_console_width(), metavar='N',
      help='Width of histogram, default:%default')
  ui_group.add_option(
      '--buckets', type='int', default=20, metavar='N',
      help='Number of histogram\'s buckets, default:%default')
  parser.add_option_group(ui_group)

  log_group = optparse.OptionGroup(parser, 'Logging')
  log_group.add_option(
      '--dump', metavar='FOO.JSON', help='Dumps to json file')
  log_group.add_option(
      '-v', '--verbose', action='store_true', help='Enable logging')
  parser.add_option_group(log_group)

  options, args = parser.parse_args()

  logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
  if args:
    parser.error('Unsupported args: %s' % args)
  if bool(options.max_size) == bool(options.items):
    parser.error(
        'Use one of --max-size or --items.\n'
        '  Use --max-size if you want to run it until NN bytes were '
        'transferred.\n'
        '  Otherwise use --items to run it for NN items.')
  options.isolate_server = options.isolate_server.rstrip('/')
  if not options.isolate_server:
    parser.error('--isolate-server is required.')

  print(
      ' - Using %d threads, items=%d, max-size=%d, mid-size=%d' % (
      options.threads, options.items, options.max_size, options.mid_size))

  start = time.time()

  random_pool = Randomness()
  print(' - Generated pool after %.1fs' % (time.time() - start))
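
  # Progress columns: 'index' counts completed items, 'data' accumulates the
  # bytes transferred, and 'size' is the total number of items planned (preset
  # with --items, incremented per scheduled task with --max-size).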
  columns = [('index', 0), ('data', 0), ('size', options.items)]
  progress = Progress(columns)
  storage = isolateserver.get_storage(options.isolate_server, options.namespace)
  do_item = functools.partial(
      send_and_receive,
      random_pool,
      storage,
      progress)

  # TODO(maruel): Handling Ctrl-C should:
  # - Stop adding tasks.
  # - Stop scheduling tasks in ThreadPool.
  # - Wait for the remaining ongoing tasks to complete.
  # - Still print details and write the json file.
  with threading_utils.ThreadPoolWithProgress(
      progress, options.threads, options.threads, 0) as pool:
    if options.items:
      for _ in xrange(options.items):
        pool.add_task(0, do_item, gen_size(options.mid_size))
        progress.print_update()
    elif options.max_size:
      # This one is approximate: it stops once the generated sizes add up to
      # --max-size, not the bytes actually transferred.
      total = 0
      while True:
        size = gen_size(options.mid_size)
        progress.update_item('', size=1)
        progress.print_update()
        pool.add_task(0, do_item, size)
        total += size
        if total >= options.max_size:
          break
    results = sorted(pool.join())

  print('')
  print(' - Took %.1fs.' % (time.time() - start))
  print('')
  print_results(results, options.columns, options.buckets)
  if options.dump:
    with open(options.dump, 'w') as f:
      json.dump(results, f, separators=(',',':'))
  return 0


if __name__ == '__main__':
  sys.exit(main())