ef88ac3fdb2a454728d1cd87c99ce0034304360d
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / tests / isolateserver_test.py
1 #!/usr/bin/env python
2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
5
6 # pylint: disable=W0212,W0223,W0231,W0613
7
8 import hashlib
9 import json
10 import logging
11 import os
12 import shutil
13 import StringIO
14 import sys
15 import tempfile
16 import threading
17 import unittest
18 import urllib
19 import zlib
20
21 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
22 ROOT_DIR = os.path.dirname(TEST_DIR)
23 sys.path.insert(0, ROOT_DIR)
24 sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party'))
25
26 from depot_tools import auto_stub
27 import isolateserver
28 import test_utils
29 from utils import threading_utils
30
31
32 ALGO = hashlib.sha1
33
34 # Tests here assume ALGO is used for default namespaces, check this assumption.
35 assert isolateserver.get_hash_algo('default') is ALGO
36 assert isolateserver.get_hash_algo('default-gzip') is ALGO
37
38
39 class TestCase(auto_stub.TestCase):
40   """Mocks out url_open() calls and sys.stdout/stderr."""
41   def setUp(self):
42     super(TestCase, self).setUp()
43     self.mock(isolateserver.auth, 'ensure_logged_in', lambda _: None)
44     self.mock(isolateserver.net, 'url_open', self._url_open)
45     self.mock(isolateserver.net, 'sleep_before_retry', lambda *_: None)
46     self._lock = threading.Lock()
47     self._requests = []
48     self.mock(sys, 'stdout', StringIO.StringIO())
49     self.mock(sys, 'stderr', StringIO.StringIO())
50
51   def tearDown(self):
52     try:
53       self.assertEqual([], self._requests)
54       self.checkOutput('', '')
55     finally:
56       super(TestCase, self).tearDown()
57
58   def checkOutput(self, expected_out, expected_err):
59     try:
60       self.assertEqual(expected_err, sys.stderr.getvalue())
61       self.assertEqual(expected_out, sys.stdout.getvalue())
62     finally:
63       # Prevent double-fail.
64       self.mock(sys, 'stdout', StringIO.StringIO())
65       self.mock(sys, 'stderr', StringIO.StringIO())
66
67   def _url_open(self, url, **kwargs):
68     logging.warn('url_open(%s, %s)', url[:500], str(kwargs)[:500])
69     with self._lock:
70       if not self._requests:
71         return None
72       # Ignore 'stream' argument, it's not important for these tests.
73       kwargs.pop('stream', None)
74       for i, n in enumerate(self._requests):
75         if n[0] == url:
76           _, expected_kwargs, result, headers = self._requests.pop(i)
77           self.assertEqual(expected_kwargs, kwargs)
78           if result is not None:
79             return isolateserver.net.HttpResponse.get_fake_response(
80                 result, url, headers)
81           return None
82     self.fail('Unknown request %s' % url)
83
84
85 class TestZipCompression(TestCase):
86   """Test zip_compress and zip_decompress generators."""
87
88   def test_compress_and_decompress(self):
89     """Test data === decompress(compress(data))."""
90     original = [str(x) for x in xrange(0, 1000)]
91     processed = isolateserver.zip_decompress(
92         isolateserver.zip_compress(original))
93     self.assertEqual(''.join(original), ''.join(processed))
94
95   def test_zip_bomb(self):
96     """Verify zip_decompress always returns small chunks."""
97     original = '\x00' * 100000
98     bomb = ''.join(isolateserver.zip_compress(original))
99     decompressed = []
100     chunk_size = 1000
101     for chunk in isolateserver.zip_decompress([bomb], chunk_size):
102       self.assertLessEqual(len(chunk), chunk_size)
103       decompressed.append(chunk)
104     self.assertEqual(original, ''.join(decompressed))
105
106   def test_bad_zip_file(self):
107     """Verify decompressing broken file raises IOError."""
108     with self.assertRaises(IOError):
109       ''.join(isolateserver.zip_decompress(['Im not a zip file']))
110
111
112 class FakeItem(isolateserver.Item):
113   def __init__(self, data, high_priority=False):
114     super(FakeItem, self).__init__(
115         ALGO(data).hexdigest(), len(data), high_priority)
116     self.data = data
117
118   def content(self):
119     return [self.data]
120
121   @property
122   def zipped(self):
123     return zlib.compress(self.data, self.compression_level)
124
125
126 class MockedStorageApi(isolateserver.StorageApi):
127   def __init__(
128       self, missing_hashes, push_side_effect=None, namespace='default'):
129     self.missing_hashes = missing_hashes
130     self.push_side_effect = push_side_effect
131     self.push_calls = []
132     self.contains_calls = []
133     self._namespace = namespace
134
135   @property
136   def namespace(self):
137     return self._namespace
138
139   def push(self, item, push_state, content=None):
140     content = ''.join(item.content() if content is None else content)
141     self.push_calls.append((item, push_state, content))
142     if self.push_side_effect:
143       self.push_side_effect()
144
145   def contains(self, items):
146     self.contains_calls.append(items)
147     missing = {}
148     for item in items:
149       if item.digest in self.missing_hashes:
150         missing[item] = self.missing_hashes[item.digest]
151     return missing
152
153
154 class StorageTest(TestCase):
155   """Tests for Storage methods."""
156
157   def assertEqualIgnoringOrder(self, a, b):
158     """Asserts that containers |a| and |b| contain same items."""
159     self.assertEqual(len(a), len(b))
160     self.assertEqual(set(a), set(b))
161
162   def get_push_state(self, storage, item):
163     missing = list(storage.get_missing_items([item]))
164     self.assertEqual(1, len(missing))
165     self.assertEqual(item, missing[0][0])
166     return missing[0][1]
167
168   def test_batch_items_for_check(self):
169     items = [
170       isolateserver.Item('foo', 12),
171       isolateserver.Item('blow', 0),
172       isolateserver.Item('bizz', 1222),
173       isolateserver.Item('buzz', 1223),
174     ]
175     expected = [
176       [items[3], items[2], items[0], items[1]],
177     ]
178     batches = list(isolateserver.batch_items_for_check(items))
179     self.assertEqual(batches, expected)
180
181   def test_get_missing_items(self):
182     items = [
183       isolateserver.Item('foo', 12),
184       isolateserver.Item('blow', 0),
185       isolateserver.Item('bizz', 1222),
186       isolateserver.Item('buzz', 1223),
187     ]
188     missing = {
189       items[2]: 123,
190       items[3]: 456,
191     }
192
193     storage_api = MockedStorageApi(
194         {item.digest: push_state for item, push_state in missing.iteritems()})
195     storage = isolateserver.Storage(storage_api)
196
197     # 'get_missing_items' is a generator yielding pairs, materialize its
198     # result in a dict.
199     result = dict(storage.get_missing_items(items))
200     self.assertEqual(missing, result)
201
202   def test_async_push(self):
203     for use_zip in (False, True):
204       item = FakeItem('1234567')
205       storage_api = MockedStorageApi(
206           {item.digest: 'push_state'},
207           namespace='default-gzip' if use_zip else 'default')
208       storage = isolateserver.Storage(storage_api)
209       channel = threading_utils.TaskChannel()
210       storage.async_push(channel, item, self.get_push_state(storage, item))
211       # Wait for push to finish.
212       pushed_item = channel.pull()
213       self.assertEqual(item, pushed_item)
214       # StorageApi.push was called with correct arguments.
215       self.assertEqual(
216           [(item, 'push_state', item.zipped if use_zip else item.data)],
217           storage_api.push_calls)
218
219   def test_async_push_generator_errors(self):
220     class FakeException(Exception):
221       pass
222
223     def faulty_generator():
224       yield 'Hi!'
225       raise FakeException('fake exception')
226
227     for use_zip in (False, True):
228       item = FakeItem('')
229       self.mock(item, 'content', faulty_generator)
230       storage_api = MockedStorageApi(
231           {item.digest: 'push_state'},
232           namespace='default-gzip' if use_zip else 'default')
233       storage = isolateserver.Storage(storage_api)
234       channel = threading_utils.TaskChannel()
235       storage.async_push(channel, item, self.get_push_state(storage, item))
236       with self.assertRaises(FakeException):
237         channel.pull()
238       # StorageApi's push should never complete when data can not be read.
239       self.assertEqual(0, len(storage_api.push_calls))
240
241   def test_async_push_upload_errors(self):
242     chunk = 'data_chunk'
243
244     def _generator():
245       yield chunk
246
247     def push_side_effect():
248       raise IOError('Nope')
249
250     # TODO(vadimsh): Retrying push when fetching data from a generator is
251     # broken now (it reuses same generator instance when retrying).
252     content_sources = (
253         # generator(),
254         lambda: [chunk],
255     )
256
257     for use_zip in (False, True):
258       for source in content_sources:
259         item = FakeItem(chunk)
260         self.mock(item, 'content', source)
261         storage_api = MockedStorageApi(
262             {item.digest: 'push_state'},
263             push_side_effect,
264             namespace='default-gzip' if use_zip else 'default')
265         storage = isolateserver.Storage(storage_api)
266         channel = threading_utils.TaskChannel()
267         storage.async_push(channel, item, self.get_push_state(storage, item))
268         with self.assertRaises(IOError):
269           channel.pull()
270         # First initial attempt + all retries.
271         attempts = 1 + isolateserver.WorkerPool.RETRIES
272         # Single push attempt call arguments.
273         expected_push = (
274             item, 'push_state', item.zipped if use_zip else item.data)
275         # Ensure all pushes are attempted.
276         self.assertEqual(
277             [expected_push] * attempts, storage_api.push_calls)
278
279   def test_upload_tree(self):
280     root = 'root'
281     files = {
282       'a': {
283         's': 100,
284         'h': 'hash_a',
285       },
286       'b': {
287         's': 200,
288         'h': 'hash_b',
289       },
290       'c': {
291         's': 300,
292         'h': 'hash_c',
293       },
294       'a_copy': {
295         's': 100,
296         'h': 'hash_a',
297       },
298     }
299     files_data = dict((k, 'x' * files[k]['s']) for k in files)
300     all_hashes = set(f['h'] for f in files.itervalues())
301     missing_hashes = {'hash_a': 'push a', 'hash_b': 'push b'}
302
303     # Files read by mocked_file_read.
304     read_calls = []
305
306     def mocked_file_read(filepath, chunk_size=0, offset=0):
307       self.assertEqual(root, os.path.dirname(filepath))
308       filename = os.path.basename(filepath)
309       self.assertIn(filename, files_data)
310       read_calls.append(filename)
311       return files_data[filename]
312     self.mock(isolateserver, 'file_read', mocked_file_read)
313
314     storage_api = MockedStorageApi(missing_hashes)
315     storage = isolateserver.Storage(storage_api)
316     def mock_get_storage(base_url, namespace):
317       self.assertEqual('base_url', base_url)
318       self.assertEqual('some-namespace', namespace)
319       return storage
320     self.mock(isolateserver, 'get_storage', mock_get_storage)
321
322     isolateserver.upload_tree('base_url', root, files, 'some-namespace')
323
324     # Was reading only missing files.
325     self.assertEqualIgnoringOrder(
326         missing_hashes,
327         [files[path]['h'] for path in read_calls])
328     # 'contains' checked for existence of all files.
329     self.assertEqualIgnoringOrder(
330         all_hashes,
331         [i.digest for i in sum(storage_api.contains_calls, [])])
332     # Pushed only missing files.
333     self.assertEqualIgnoringOrder(
334         missing_hashes,
335         [call[0].digest for call in storage_api.push_calls])
336     # Pushing with correct data, size and push state.
337     for pushed_item, push_state, pushed_content in storage_api.push_calls:
338       filenames = [
339           name for name, metadata in files.iteritems()
340           if metadata['h'] == pushed_item.digest
341       ]
342       # If there are multiple files that map to same hash, upload_tree chooses
343       # a first one.
344       filename = filenames[0]
345       self.assertEqual(os.path.join(root, filename), pushed_item.path)
346       self.assertEqual(files_data[filename], pushed_content)
347       self.assertEqual(missing_hashes[pushed_item.digest], push_state)
348
349
350 class IsolateServerStorageApiTest(TestCase):
351   @staticmethod
352   def mock_handshake_request(server, token='fake token', error=None):
353     handshake_request = {
354       'client_app_version': isolateserver.__version__,
355       'fetcher': True,
356       'protocol_version': isolateserver.ISOLATE_PROTOCOL_VERSION,
357       'pusher': True,
358     }
359     handshake_response = {
360       'access_token': token,
361       'error': error,
362       'protocol_version': isolateserver.ISOLATE_PROTOCOL_VERSION,
363       'server_app_version': 'mocked server T1000',
364     }
365     return (
366       server + '/content-gs/handshake',
367       {
368         'content_type': 'application/json',
369         'method': 'POST',
370         'data': json.dumps(handshake_request, separators=(',', ':')),
371       },
372       json.dumps(handshake_response),
373       None,
374     )
375
376   @staticmethod
377   def mock_fetch_request(server, namespace, item, data,
378                          request_headers=None, response_headers=None):
379     return (
380       server + '/content-gs/retrieve/%s/%s' % (namespace, item),
381       {
382         'read_timeout': 60,
383         'headers': request_headers,
384       },
385       data,
386       response_headers,
387     )
388
389   @staticmethod
390   def mock_contains_request(server, namespace, token, request, response):
391     url = server + '/content-gs/pre-upload/%s?token=%s' % (
392         namespace, urllib.quote(token))
393     return (
394       url,
395       {
396         'data': json.dumps(request, separators=(',', ':')),
397         'content_type': 'application/json',
398         'method': 'POST',
399       },
400       json.dumps(response),
401       None,
402     )
403
404   def test_server_capabilities_success(self):
405     server = 'http://example.com'
406     namespace = 'default'
407     access_token = 'fake token'
408     self._requests = [
409       self.mock_handshake_request(server, access_token),
410     ]
411     storage = isolateserver.IsolateServer(server, namespace)
412     caps = storage._server_capabilities
413     self.assertEqual(access_token, caps['access_token'])
414
415   def test_server_capabilities_network_failure(self):
416     self.mock(isolateserver.net, 'url_open', lambda *_args, **_kwargs: None)
417     with self.assertRaises(isolateserver.MappingError):
418       storage = isolateserver.IsolateServer('http://example.com', 'default')
419       _ = storage._server_capabilities
420
421   def test_server_capabilities_format_failure(self):
422     server = 'http://example.com'
423     namespace = 'default'
424     handshake_req = self.mock_handshake_request(server)
425     self._requests = [
426       (handshake_req[0], handshake_req[1], 'Im a bad response', None),
427     ]
428     storage = isolateserver.IsolateServer(server, namespace)
429     with self.assertRaises(isolateserver.MappingError):
430       _ = storage._server_capabilities
431
432   def test_server_capabilities_respects_error(self):
433     server = 'http://example.com'
434     namespace = 'default'
435     error = 'Im sorry, Dave. Im afraid I cant do that.'
436     self._requests = [
437       self.mock_handshake_request(server, error=error)
438     ]
439     storage = isolateserver.IsolateServer(server, namespace)
440     with self.assertRaises(isolateserver.MappingError) as context:
441       _ = storage._server_capabilities
442     # Server error message should be reported to user.
443     self.assertIn(error, str(context.exception))
444
445   def test_fetch_success(self):
446     server = 'http://example.com'
447     namespace = 'default'
448     data = ''.join(str(x) for x in xrange(1000))
449     item = ALGO(data).hexdigest()
450     self._requests = [
451       self.mock_fetch_request(server, namespace, item, data),
452     ]
453     storage = isolateserver.IsolateServer(server, namespace)
454     fetched = ''.join(storage.fetch(item))
455     self.assertEqual(data, fetched)
456
457   def test_fetch_failure(self):
458     server = 'http://example.com'
459     namespace = 'default'
460     item = ALGO('something').hexdigest()
461     self._requests = [
462       self.mock_fetch_request(server, namespace, item, None),
463     ]
464     storage = isolateserver.IsolateServer(server, namespace)
465     with self.assertRaises(IOError):
466       _ = ''.join(storage.fetch(item))
467
468   def test_fetch_offset_success(self):
469     server = 'http://example.com'
470     namespace = 'default'
471     data = ''.join(str(x) for x in xrange(1000))
472     item = ALGO(data).hexdigest()
473     offset = 200
474     size = len(data)
475
476     good_content_range_headers = [
477       'bytes %d-%d/%d' % (offset, size - 1, size),
478       'bytes %d-%d/*' % (offset, size - 1),
479     ]
480
481     for content_range_header in good_content_range_headers:
482       self._requests = [
483         self.mock_fetch_request(
484             server, namespace, item, data[offset:],
485             request_headers={'Range': 'bytes=%d-' % offset},
486             response_headers={'Content-Range': content_range_header}),
487       ]
488       storage = isolateserver.IsolateServer(server, namespace)
489       fetched = ''.join(storage.fetch(item, offset))
490       self.assertEqual(data[offset:], fetched)
491
492   def test_fetch_offset_bad_header(self):
493     server = 'http://example.com'
494     namespace = 'default'
495     data = ''.join(str(x) for x in xrange(1000))
496     item = ALGO(data).hexdigest()
497     offset = 200
498     size = len(data)
499
500     bad_content_range_headers = [
501       # Missing header.
502       None,
503       '',
504       # Bad format.
505       'not bytes %d-%d/%d' % (offset, size - 1, size),
506       'bytes %d-%d' % (offset, size - 1),
507       # Bad offset.
508       'bytes %d-%d/%d' % (offset - 1, size - 1, size),
509       # Incomplete chunk.
510       'bytes %d-%d/%d' % (offset, offset + 10, size),
511     ]
512
513     for content_range_header in bad_content_range_headers:
514       self._requests = [
515         self.mock_fetch_request(
516             server, namespace, item, data[offset:],
517             request_headers={'Range': 'bytes=%d-' % offset},
518             response_headers={'Content-Range': content_range_header}),
519       ]
520       storage = isolateserver.IsolateServer(server, namespace)
521       with self.assertRaises(IOError):
522         _ = ''.join(storage.fetch(item, offset))
523
524
525   def test_push_success(self):
526     server = 'http://example.com'
527     namespace = 'default'
528     token = 'fake token'
529     data = ''.join(str(x) for x in xrange(1000))
530     item = FakeItem(data)
531     push_urls = (server + '/push_here', server + '/call_this')
532     contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
533     contains_response = [push_urls]
534     self._requests = [
535       self.mock_handshake_request(server, token),
536       self.mock_contains_request(
537           server, namespace, token, contains_request, contains_response),
538       (
539         push_urls[0],
540         {
541           'data': data,
542           'content_type': 'application/octet-stream',
543           'method': 'PUT',
544         },
545         '',
546         None,
547       ),
548       (
549         push_urls[1],
550         {
551           'data': '',
552           'content_type': 'application/json',
553           'method': 'POST',
554         },
555         '',
556         None,
557       ),
558     ]
559     storage = isolateserver.IsolateServer(server, namespace)
560     missing = storage.contains([item])
561     self.assertEqual([item], missing.keys())
562     push_state = missing[item]
563     storage.push(item, push_state, [data])
564     self.assertTrue(push_state.uploaded)
565     self.assertTrue(push_state.finalized)
566
567   def test_push_failure_upload(self):
568     server = 'http://example.com'
569     namespace = 'default'
570     token = 'fake token'
571     data = ''.join(str(x) for x in xrange(1000))
572     item = FakeItem(data)
573     push_urls = (server + '/push_here', server + '/call_this')
574     contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
575     contains_response = [push_urls]
576     self._requests = [
577       self.mock_handshake_request(server, token),
578       self.mock_contains_request(
579           server, namespace, token, contains_request, contains_response),
580       (
581         push_urls[0],
582         {
583           'data': data,
584           'content_type': 'application/octet-stream',
585           'method': 'PUT',
586         },
587         None,
588         None,
589       ),
590     ]
591     storage = isolateserver.IsolateServer(server, namespace)
592     missing = storage.contains([item])
593     self.assertEqual([item], missing.keys())
594     push_state = missing[item]
595     with self.assertRaises(IOError):
596       storage.push(item, push_state, [data])
597     self.assertFalse(push_state.uploaded)
598     self.assertFalse(push_state.finalized)
599
600   def test_push_failure_finalize(self):
601     server = 'http://example.com'
602     namespace = 'default'
603     token = 'fake token'
604     data = ''.join(str(x) for x in xrange(1000))
605     item = FakeItem(data)
606     push_urls = (server + '/push_here', server + '/call_this')
607     contains_request = [{'h': item.digest, 's': item.size, 'i': 0}]
608     contains_response = [push_urls]
609     self._requests = [
610       self.mock_handshake_request(server, token),
611       self.mock_contains_request(
612           server, namespace, token, contains_request, contains_response),
613       (
614         push_urls[0],
615         {
616           'data': data,
617           'content_type': 'application/octet-stream',
618           'method': 'PUT',
619         },
620         '',
621         None,
622       ),
623       (
624         push_urls[1],
625         {
626           'data': '',
627           'content_type': 'application/json',
628           'method': 'POST',
629         },
630         None,
631         None,
632       ),
633     ]
634     storage = isolateserver.IsolateServer(server, namespace)
635     missing = storage.contains([item])
636     self.assertEqual([item], missing.keys())
637     push_state = missing[item]
638     with self.assertRaises(IOError):
639       storage.push(item, push_state, [data])
640     self.assertTrue(push_state.uploaded)
641     self.assertFalse(push_state.finalized)
642
643   def test_contains_success(self):
644     server = 'http://example.com'
645     namespace = 'default'
646     token = 'fake token'
647     files = [
648       FakeItem('1', high_priority=True),
649       FakeItem('2' * 100),
650       FakeItem('3' * 200),
651     ]
652     request = [
653       {'h': files[0].digest, 's': files[0].size, 'i': 1},
654       {'h': files[1].digest, 's': files[1].size, 'i': 0},
655       {'h': files[2].digest, 's': files[2].size, 'i': 0},
656     ]
657     response = [
658       None,
659       ['http://example/upload_here_1', None],
660       ['http://example/upload_here_2', 'http://example/call_this'],
661     ]
662     missing = [
663       files[1],
664       files[2],
665     ]
666     self._requests = [
667       self.mock_handshake_request(server, token),
668       self.mock_contains_request(server, namespace, token, request, response),
669     ]
670     storage = isolateserver.IsolateServer(server, namespace)
671     result = storage.contains(files)
672     self.assertEqual(set(missing), set(result.keys()))
673     self.assertEqual(
674         [x for x in response if x],
675         [[result[i].upload_url, result[i].finalize_url] for i in missing])
676
677   def test_contains_network_failure(self):
678     server = 'http://example.com'
679     namespace = 'default'
680     token = 'fake token'
681     req = self.mock_contains_request(server, namespace, token, [], [])
682     self._requests = [
683       self.mock_handshake_request(server, token),
684       (req[0], req[1], None, None),
685     ]
686     storage = isolateserver.IsolateServer(server, namespace)
687     with self.assertRaises(isolateserver.MappingError):
688       storage.contains([])
689
690   def test_contains_format_failure(self):
691     server = 'http://example.com'
692     namespace = 'default'
693     token = 'fake token'
694     self._requests = [
695       self.mock_handshake_request(server, token),
696       self.mock_contains_request(server, namespace, token, [], [1, 2, 3])
697     ]
698     storage = isolateserver.IsolateServer(server, namespace)
699     with self.assertRaises(isolateserver.MappingError):
700       storage.contains([])
701
702
703 class IsolateServerStorageSmokeTest(unittest.TestCase):
704   """Tests public API of Storage class using file system as a store."""
705
706   def setUp(self):
707     super(IsolateServerStorageSmokeTest, self).setUp()
708     self.rootdir = tempfile.mkdtemp(prefix='isolateserver')
709
710   def tearDown(self):
711     try:
712       shutil.rmtree(self.rootdir)
713     finally:
714       super(IsolateServerStorageSmokeTest, self).tearDown()
715
716   def run_synchronous_push_test(self, namespace):
717     storage = isolateserver.get_storage(self.rootdir, namespace)
718
719     # Items to upload.
720     items = [isolateserver.BufferItem('item %d' % i) for i in xrange(10)]
721
722     # Storage is empty, all items are missing.
723     missing = dict(storage.get_missing_items(items))
724     self.assertEqual(set(items), set(missing))
725
726     # Push, one by one.
727     for item, push_state in missing.iteritems():
728       storage.push(item, push_state)
729
730     # All items are there now.
731     self.assertFalse(dict(storage.get_missing_items(items)))
732
733   def test_synchronous_push(self):
734     self.run_synchronous_push_test('default')
735
736   def test_synchronous_push_gzip(self):
737     self.run_synchronous_push_test('default-gzip')
738
739   def run_upload_items_test(self, namespace):
740     storage = isolateserver.get_storage(self.rootdir, namespace)
741
742     # Items to upload.
743     items = [isolateserver.BufferItem('item %d' % i) for i in xrange(10)]
744
745     # Do it.
746     uploaded = storage.upload_items(items)
747     self.assertEqual(set(items), set(uploaded))
748
749     # All items are there now.
750     self.assertFalse(dict(storage.get_missing_items(items)))
751
752     # Now ensure upload_items skips existing items.
753     more = [isolateserver.BufferItem('more item %d' % i) for i in xrange(10)]
754
755     # Uploaded only |more|.
756     uploaded = storage.upload_items(items + more)
757     self.assertEqual(set(more), set(uploaded))
758
759   def test_upload_items(self):
760     self.run_upload_items_test('default')
761
762   def test_upload_items_gzip(self):
763     self.run_upload_items_test('default-gzip')
764
765   def run_push_and_fetch_test(self, namespace):
766     storage = isolateserver.get_storage(self.rootdir, namespace)
767
768     # Upload items.
769     items = [isolateserver.BufferItem('item %d' % i) for i in xrange(10)]
770     uploaded = storage.upload_items(items)
771     self.assertEqual(set(items), set(uploaded))
772
773     # Fetch them all back into local memory cache.
774     cache = isolateserver.MemoryCache()
775     queue = isolateserver.FetchQueue(storage, cache)
776
777     # Start fetching.
778     pending = set()
779     for item in items:
780       pending.add(item.digest)
781       queue.add(item.digest)
782
783     # Wait for fetch to complete.
784     while pending:
785       fetched = queue.wait(pending)
786       pending.discard(fetched)
787
788     # Ensure fetched same data as was pushed.
789     self.assertEqual(
790         [i.buffer for i in items],
791         [cache.read(i.digest) for i in items])
792
793   def test_push_and_fetch(self):
794     self.run_push_and_fetch_test('default')
795
796   def test_push_and_fetch_gzip(self):
797     self.run_push_and_fetch_test('default-gzip')
798
799
800 class IsolateServerDownloadTest(TestCase):
801   tempdir = None
802
803   def tearDown(self):
804     try:
805       if self.tempdir:
806         shutil.rmtree(self.tempdir)
807     finally:
808       super(IsolateServerDownloadTest, self).tearDown()
809
810   def test_download_two_files(self):
811     # Test downloading two files.
812     actual = {}
813     def out(key, generator):
814       actual[key] = ''.join(generator)
815     self.mock(isolateserver, 'file_write', out)
816     server = 'http://example.com'
817     self._requests = [
818       (
819         server + '/content-gs/retrieve/default-gzip/sha-1',
820         {'read_timeout': 60, 'headers': None},
821         zlib.compress('Coucou'),
822         None,
823       ),
824       (
825         server + '/content-gs/retrieve/default-gzip/sha-2',
826         {'read_timeout': 60, 'headers': None},
827         zlib.compress('Bye Bye'),
828         None,
829       ),
830     ]
831     cmd = [
832       'download',
833       '--isolate-server', server,
834       '--target', ROOT_DIR,
835       '--file', 'sha-1', 'path/to/a',
836       '--file', 'sha-2', 'path/to/b',
837     ]
838     self.assertEqual(0, isolateserver.main(cmd))
839     expected = {
840       os.path.join(ROOT_DIR, 'path/to/a'): 'Coucou',
841       os.path.join(ROOT_DIR, 'path/to/b'): 'Bye Bye',
842     }
843     self.assertEqual(expected, actual)
844
845   def test_download_isolated(self):
846     # Test downloading an isolated tree.
847     self.tempdir = tempfile.mkdtemp(prefix='isolateserver')
848     actual = {}
849     def file_write_mock(key, generator):
850       actual[key] = ''.join(generator)
851     self.mock(isolateserver, 'file_write', file_write_mock)
852     self.mock(os, 'makedirs', lambda _: None)
853     server = 'http://example.com'
854
855     files = {
856       os.path.join('a', 'foo'): 'Content',
857       'b': 'More content',
858       }
859     isolated = {
860       'command': ['Absurb', 'command'],
861       'relative_cwd': 'a',
862       'files': dict(
863           (k, {'h': ALGO(v).hexdigest(), 's': len(v)})
864           for k, v in files.iteritems()),
865       'version': isolateserver.ISOLATED_FILE_VERSION,
866     }
867     isolated_data = json.dumps(isolated, sort_keys=True, separators=(',',':'))
868     isolated_hash = ALGO(isolated_data).hexdigest()
869     requests = [(v['h'], files[k]) for k, v in isolated['files'].iteritems()]
870     requests.append((isolated_hash, isolated_data))
871     self._requests = [
872       (
873         server + '/content-gs/retrieve/default-gzip/' + h,
874         {
875           'read_timeout': isolateserver.DOWNLOAD_READ_TIMEOUT,
876           'headers': None,
877         },
878         zlib.compress(v),
879         None,
880       ) for h, v in requests
881     ]
882     cmd = [
883       'download',
884       '--isolate-server', server,
885       '--target', self.tempdir,
886       '--isolated', isolated_hash,
887     ]
888     self.assertEqual(0, isolateserver.main(cmd))
889     expected = dict(
890         (os.path.join(self.tempdir, k), v) for k, v in files.iteritems())
891     self.assertEqual(expected, actual)
892     expected_stdout = (
893         'To run this test please run from the directory %s:\n  Absurb command\n'
894         % os.path.join(self.tempdir, 'a'))
895     self.checkOutput(expected_stdout, '')
896
897
898 class TestIsolated(auto_stub.TestCase):
899   def test_load_isolated_empty(self):
900     m = isolateserver.load_isolated('{}', ALGO)
901     self.assertEqual({}, m)
902
903   def test_load_isolated_good(self):
904     data = {
905       u'command': [u'foo', u'bar'],
906       u'files': {
907         u'a': {
908           u'l': u'somewhere',
909         },
910         u'b': {
911           u'm': 123,
912           u'h': u'0123456789abcdef0123456789abcdef01234567',
913           u's': 3,
914         }
915       },
916       u'includes': [u'0123456789abcdef0123456789abcdef01234567'],
917       u'read_only': 1,
918       u'relative_cwd': u'somewhere_else',
919       u'version': isolateserver.ISOLATED_FILE_VERSION,
920     }
921     m = isolateserver.load_isolated(json.dumps(data), ALGO)
922     self.assertEqual(data, m)
923
924   def test_load_isolated_bad(self):
925     data = {
926       u'files': {
927         u'a': {
928           u'l': u'somewhere',
929           u'h': u'0123456789abcdef0123456789abcdef01234567'
930         }
931       },
932       u'version': isolateserver.ISOLATED_FILE_VERSION,
933     }
934     try:
935       isolateserver.load_isolated(json.dumps(data), ALGO)
936       self.fail()
937     except isolateserver.ConfigError:
938       pass
939
940   def test_load_isolated_os_only(self):
941     # Tolerate 'os' on older version.
942     data = {
943       u'os': 'HP/UX',
944       u'version': '1.3',
945     }
946     m = isolateserver.load_isolated(json.dumps(data), ALGO)
947     self.assertEqual(data, m)
948
949   def test_load_isolated_os_only_bad(self):
950     data = {
951       u'os': 'HP/UX',
952       u'version': isolateserver.ISOLATED_FILE_VERSION,
953     }
954     with self.assertRaises(isolateserver.ConfigError):
955       isolateserver.load_isolated(json.dumps(data), ALGO)
956
957   def test_load_isolated_path(self):
958     # Automatically convert the path case.
959     wrong_path_sep = u'\\' if os.path.sep == '/' else u'/'
960     def gen_data(path_sep):
961       return {
962         u'command': [u'foo', u'bar'],
963         u'files': {
964           path_sep.join(('a', 'b')): {
965             u'l': path_sep.join(('..', 'somewhere')),
966           },
967         },
968         u'relative_cwd': path_sep.join(('somewhere', 'else')),
969         u'version': isolateserver.ISOLATED_FILE_VERSION,
970       }
971
972     data = gen_data(wrong_path_sep)
973     actual = isolateserver.load_isolated(json.dumps(data), ALGO)
974     expected = gen_data(os.path.sep)
975     self.assertEqual(expected, actual)
976
977   def test_save_isolated_good_long_size(self):
978     calls = []
979     self.mock(isolateserver.tools, 'write_json', lambda *x: calls.append(x))
980     data = {
981       u'algo': 'sha-1',
982       u'files': {
983         u'b': {
984           u'm': 123,
985           u'h': u'0123456789abcdef0123456789abcdef01234567',
986           u's': 2181582786L,
987         }
988       },
989     }
990     m = isolateserver.save_isolated('foo', data)
991     self.assertEqual([], m)
992     self.assertEqual([('foo', data, True)], calls)
993
994
995 class SymlinkTest(unittest.TestCase):
996   def setUp(self):
997     super(SymlinkTest, self).setUp()
998     self.old_cwd = os.getcwd()
999     self.cwd = tempfile.mkdtemp(prefix='isolate_')
1000     # Everything should work even from another directory.
1001     os.chdir(self.cwd)
1002
1003   def tearDown(self):
1004     try:
1005       os.chdir(self.old_cwd)
1006       shutil.rmtree(self.cwd)
1007     finally:
1008       super(SymlinkTest, self).tearDown()
1009
1010   if sys.platform == 'darwin':
1011     def test_expand_symlinks_path_case(self):
1012       # Ensures that the resulting path case is fixed on case insensitive file
1013       # system.
1014       os.symlink('dest', os.path.join(self.cwd, 'link'))
1015       os.mkdir(os.path.join(self.cwd, 'Dest'))
1016       open(os.path.join(self.cwd, 'Dest', 'file.txt'), 'w').close()
1017
1018       result = isolateserver.expand_symlinks(unicode(self.cwd), 'link')
1019       self.assertEqual((u'Dest', [u'link']), result)
1020       result = isolateserver.expand_symlinks(unicode(self.cwd), 'link/File.txt')
1021       self.assertEqual((u'Dest/file.txt', [u'link']), result)
1022
1023     def test_expand_directories_and_symlinks_path_case(self):
1024       # Ensures that the resulting path case is fixed on case insensitive file
1025       # system. A superset of test_expand_symlinks_path_case.
1026       # Create *all* the paths with the wrong path case.
1027       basedir = os.path.join(self.cwd, 'baseDir')
1028       os.mkdir(basedir.lower())
1029       subdir = os.path.join(basedir, 'subDir')
1030       os.mkdir(subdir.lower())
1031       open(os.path.join(subdir, 'Foo.txt'), 'w').close()
1032       os.symlink('subDir', os.path.join(basedir, 'linkdir'))
1033       actual = isolateserver.expand_directories_and_symlinks(
1034           unicode(self.cwd), [u'baseDir/'], lambda _: None, True, False)
1035       expected = [
1036         u'basedir/linkdir',
1037         u'basedir/subdir/Foo.txt',
1038         u'basedir/subdir/Foo.txt',
1039       ]
1040       self.assertEqual(expected, actual)
1041
1042     def test_process_input_path_case_simple(self):
1043       # Ensure the symlink dest is saved in the right path case.
1044       subdir = os.path.join(self.cwd, 'subdir')
1045       os.mkdir(subdir)
1046       linkdir = os.path.join(self.cwd, 'linkdir')
1047       os.symlink('subDir', linkdir)
1048       actual = isolateserver.process_input(
1049           unicode(linkdir.upper()), {}, True, ALGO)
1050       expected = {'l': u'subdir', 'm': 360, 't': int(os.stat(linkdir).st_mtime)}
1051       self.assertEqual(expected, actual)
1052
1053     def test_process_input_path_case_complex(self):
1054       # Ensure the symlink dest is saved in the right path case. This includes 2
1055       # layers of symlinks.
1056       basedir = os.path.join(self.cwd, 'basebir')
1057       os.mkdir(basedir)
1058
1059       linkeddir2 = os.path.join(self.cwd, 'linkeddir2')
1060       os.mkdir(linkeddir2)
1061
1062       linkeddir1 = os.path.join(basedir, 'linkeddir1')
1063       os.symlink('../linkedDir2', linkeddir1)
1064
1065       subsymlinkdir = os.path.join(basedir, 'symlinkdir')
1066       os.symlink('linkedDir1', subsymlinkdir)
1067
1068       actual = isolateserver.process_input(
1069           unicode(subsymlinkdir.upper()), {}, True, ALGO)
1070       expected = {
1071         'l': u'linkeddir1', 'm': 360, 't': int(os.stat(subsymlinkdir).st_mtime),
1072       }
1073       self.assertEqual(expected, actual)
1074
1075       actual = isolateserver.process_input(
1076           unicode(linkeddir1.upper()), {}, True, ALGO)
1077       expected = {
1078         'l': u'../linkeddir2', 'm': 360, 't': int(os.stat(linkeddir1).st_mtime),
1079       }
1080       self.assertEqual(expected, actual)
1081
1082   if sys.platform != 'win32':
1083     def test_symlink_input_absolute_path(self):
1084       # A symlink is outside of the checkout, it should be treated as a normal
1085       # directory.
1086       # .../src
1087       # .../src/out -> .../tmp/foo
1088       # .../tmp
1089       # .../tmp/foo
1090       src = os.path.join(self.cwd, u'src')
1091       src_out = os.path.join(src, 'out')
1092       tmp = os.path.join(self.cwd, 'tmp')
1093       tmp_foo = os.path.join(tmp, 'foo')
1094       os.mkdir(src)
1095       os.mkdir(tmp)
1096       os.mkdir(tmp_foo)
1097       # The problem was that it's an absolute path, so it must be considered a
1098       # normal directory.
1099       os.symlink(tmp, src_out)
1100       open(os.path.join(tmp_foo, 'bar.txt'), 'w').close()
1101       actual = isolateserver.expand_symlinks(src, u'out/foo/bar.txt')
1102       self.assertEqual((u'out/foo/bar.txt', []), actual)
1103
1104
1105 def get_storage(_isolate_server, namespace):
1106   class StorageFake(object):
1107     def __enter__(self, *_):
1108       return self
1109
1110     def __exit__(self, *_):
1111       pass
1112
1113     @property
1114     def hash_algo(self):  # pylint: disable=R0201
1115       return isolateserver.get_hash_algo(namespace)
1116
1117     @staticmethod
1118     def upload_items(items):
1119       # Always returns the second item as not present.
1120       return [items[1]]
1121   return StorageFake()
1122
1123
1124 class TestArchive(TestCase):
1125   @staticmethod
1126   def get_isolateserver_prog():
1127     """Returns 'isolateserver.py' or 'isolateserver.pyc'."""
1128     return os.path.basename(sys.modules[isolateserver.__name__].__file__)
1129
1130   def test_archive_no_server(self):
1131     with self.assertRaises(SystemExit):
1132       isolateserver.main(['archive', '.'])
1133     prog = self.get_isolateserver_prog()
1134     self.checkOutput(
1135         '',
1136         'Usage: %(prog)s archive [options] <file1..fileN> or - to read '
1137         'from stdin\n\n'
1138         '%(prog)s: error: --isolate-server is required.\n' % {'prog': prog})
1139
1140   def test_archive_duplicates(self):
1141     with self.assertRaises(SystemExit):
1142       isolateserver.main(
1143           [
1144             'archive', '--isolate-server', 'https://localhost:1',
1145             # Effective dupes.
1146             '.', os.getcwd(),
1147           ])
1148     prog = self.get_isolateserver_prog()
1149     self.checkOutput(
1150         '',
1151         'Usage: %(prog)s archive [options] <file1..fileN> or - to read '
1152         'from stdin\n\n'
1153         '%(prog)s: error: Duplicate entries found.\n' % {'prog': prog})
1154
1155   def test_archive_files(self):
1156     old_cwd = os.getcwd()
1157     try:
1158       os.chdir(os.path.join(TEST_DIR, 'isolateserver'))
1159       self.mock(isolateserver, 'get_storage', get_storage)
1160       f = ['empty_file.txt', 'small_file.txt']
1161       isolateserver.main(
1162           ['archive', '--isolate-server', 'https://localhost:1'] + f)
1163       self.checkOutput(
1164           'da39a3ee5e6b4b0d3255bfef95601890afd80709 empty_file.txt\n'
1165           '0491bd1da8087ad10fcdd7c9634e308804b72158 small_file.txt\n',
1166           '')
1167     finally:
1168       os.chdir(old_cwd)
1169
1170   def help_test_archive(self, cmd_line_prefix):
1171     old_cwd = os.getcwd()
1172     try:
1173       os.chdir(ROOT_DIR)
1174       self.mock(isolateserver, 'get_storage', get_storage)
1175       p = os.path.join(TEST_DIR, 'isolateserver')
1176       isolateserver.main(cmd_line_prefix + [p])
1177       # TODO(maruel): The problem here is that the test depends on the file mode
1178       # of the files in this directory.
1179       # Fix is to copy the files in a temporary directory with known file modes.
1180       #
1181       # If you modify isolateserver.ISOLATED_FILE_VERSION, you'll have to update
1182       # the hash below. Sorry about that.
1183       self.checkOutput(
1184           '1501166255279df1509408567340798d1cf089e7 %s\n' % p,
1185           '')
1186     finally:
1187       os.chdir(old_cwd)
1188
1189   def test_archive_directory(self):
1190     self.help_test_archive(['archive', '--isolate-server',
1191                             'https://localhost:1'])
1192
1193   def test_archive_directory_envvar(self):
1194     with test_utils.EnvVars({'ISOLATE_SERVER': 'https://localhost:1'}):
1195       self.help_test_archive(['archive'])
1196
1197
1198 class OptionsTest(unittest.TestCase):
1199   def test_isolate_server(self):
1200     data = [
1201       (['-I', 'http://foo.com/'], 'http://foo.com'),
1202       (['-I', 'https://foo.com/'], 'https://foo.com'),
1203       (['-I', 'https://foo.com'], 'https://foo.com'),
1204       (['-I', 'https://foo.com/a'], 'https://foo.com/a'),
1205       (['-I', 'https://foo.com/a/'], 'https://foo.com/a'),
1206       (['-I', 'https://foo.com:8080/a/'], 'https://foo.com:8080/a'),
1207       (['-I', 'foo.com'], 'https://foo.com'),
1208       (['-I', 'foo.com:8080'], 'https://foo.com:8080'),
1209       (['-I', 'foo.com/'], 'https://foo.com'),
1210       (['-I', 'foo.com/a/'], 'https://foo.com/a'),
1211     ]
1212     for value, expected in data:
1213       parser = isolateserver.OptionParserIsolateServer()
1214       isolateserver.add_isolate_server_options(parser, False)
1215       options, _ = parser.parse_args(value)
1216       isolateserver.process_isolate_server_options(parser, options)
1217       self.assertEqual(expected, options.isolate_server)
1218
1219   def test_indir(self):
1220     data = [
1221       (['-I', 'http://foo.com/'], ('http://foo.com', None)),
1222       (['--indir', ROOT_DIR], ('', ROOT_DIR)),
1223     ]
1224     for value, (expected_isolate_server, expected_indir) in data:
1225       parser = isolateserver.OptionParserIsolateServer()
1226       isolateserver.add_isolate_server_options(parser, True)
1227       options, _ = parser.parse_args(value)
1228       isolateserver.process_isolate_server_options(parser, options)
1229       self.assertEqual(expected_isolate_server, options.isolate_server)
1230       self.assertEqual(expected_indir, options.indir)
1231
1232
1233 def clear_env_vars():
1234   for e in ('ISOLATE_DEBUG', 'ISOLATE_SERVER'):
1235     os.environ.pop(e, None)
1236
1237
1238 if __name__ == '__main__':
1239   if '-v' in sys.argv:
1240     unittest.TestCase.maxDiff = None
1241   logging.basicConfig(
1242       level=(logging.DEBUG if '-v' in sys.argv else logging.ERROR))
1243   clear_env_vars()
1244   unittest.main()