2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Archives a set of files to a server."""
22 from third_party import colorama
23 from third_party.depot_tools import fix_encoding
24 from third_party.depot_tools import subcommand
27 from utils import threading_utils
28 from utils import tools
31 # Version of isolate protocol passed to the server in /handshake request.
32 ISOLATE_PROTOCOL_VERSION = '1.0'
35 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: the larger the file, the more
38 # likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. The numbers here are a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousand files to look up with a minimal number of
45 # large files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
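# Illustrative example (not part of the original code): with 250 items to
# check, the successive batches would hold 20, 20, 50, 50, 50 and finally the
# remaining 60 items. The largest files (those most likely to have changed)
# land in the small early batches, so uploading can start with minimal lookup
# latency.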
49 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
57 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None
62 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024
65 # Chunk size to use when doing disk I/O.
66 DISK_FILE_CHUNK = 1024 * 1024
68 # Chunk size to use when reading from network stream.
69 NET_IO_FILE_CHUNK = 16 * 1024
72 # Read timeout in seconds for downloads from isolate storage. If there's no
73 # response from the server within this timeout whole download will be aborted.
74 DOWNLOAD_READ_TIMEOUT = 60
76 # Maximum expected delay (in seconds) between successive file fetches
77 # in run_tha_test. If it takes longer than that, a deadlock might be happening
78 # and all stack frames for all threads are dumped to the log.
79 DEADLOCK_TIMEOUT = 5 * 60
82 # The delay (in seconds) to wait between logging statements when retrieving
83 # the required files. This is intended to let the user (or buildbot) know that
84 # the program is still running.
85 DELAY_BETWEEN_UPDATES_IN_SECS = 30
88 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
89 # specify the names here.
92 'sha-1': hashlib.sha1,
93 'sha-512': hashlib.sha512,
97 # Used for serialization.
98 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
101 class ConfigError(ValueError):
102 """Generic failure to load a .isolated file."""
106 class MappingError(OSError):
107 """Failed to recreate the tree."""
111 def is_valid_hash(value, algo):
112 """Returns if the value is a valid hash for the corresponding algorithm."""
113 size = 2 * algo().digest_size
114 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
117 def hash_file(filepath, algo):
118 """Calculates the hash of a file without reading it all in memory at once.
120 |algo| should be one of the hashlib hashing algorithm classes.
123 with open(filepath, 'rb') as f:
125 chunk = f.read(DISK_FILE_CHUNK)
129 return digest.hexdigest()
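# Usage sketch (illustrative only; 'payload.bin' is a hypothetical path):
#
#   algo = SUPPORTED_ALGOS['sha-1']
#   digest = hash_file('payload.bin', algo)
#   assert is_valid_hash(digest, algo)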
132 def stream_read(stream, chunk_size):
133 """Reads chunks from |stream| and yields them."""
135 data = stream.read(chunk_size)
141 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
142 """Yields file content in chunks of given |chunk_size|."""
143 with open(filepath, 'rb') as f:
145 data = f.read(chunk_size)
151 def file_write(filepath, content_generator):
152 """Writes file content as generated by content_generator.
154 Creates the intermediary directory as needed.
156 Returns the number of bytes written.
158 Meant to be mocked out in unit tests.
160 filedir = os.path.dirname(filepath)
161 if not os.path.isdir(filedir):
164 with open(filepath, 'wb') as f:
165 for d in content_generator:
171 def zip_compress(content_generator, level=7):
172 """Reads chunks from |content_generator| and yields zip compressed chunks."""
173 compressor = zlib.compressobj(level)
174 for chunk in content_generator:
175 compressed = compressor.compress(chunk)
178 tail = compressor.flush(zlib.Z_FINISH)
183 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
184 """Reads zipped data from |content_generator| and yields decompressed data.
186 Decompresses data in small chunks (no larger than |chunk_size|) so that a
187 zip bomb doesn't cause zlib to preallocate a huge amount of memory.
189 Raises IOError if data is corrupted or incomplete.
191 decompressor = zlib.decompressobj()
194 for chunk in content_generator:
195 compressed_size += len(chunk)
196 data = decompressor.decompress(chunk, chunk_size)
199 while decompressor.unconsumed_tail:
200 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
203 tail = decompressor.flush()
206 except zlib.error as e:
208 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
209 # Ensure all data was read and decompressed.
210 if decompressor.unused_data or decompressor.unconsumed_tail:
211 raise IOError('Not all data was decompressed')
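# Round-trip sketch (illustrative only): compressing chunks with zip_compress
# and feeding the result to zip_decompress should reproduce the original data.
#
#   original = ['foo' * 1000, 'bar' * 1000]
#   compressed = list(zip_compress(iter(original)))
#   assert ''.join(zip_decompress(iter(compressed))) == ''.join(original)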
214 def get_zip_compression_level(filename):
215 """Given a filename calculates the ideal zip compression level to use."""
216 # Strip the leading dot so the extension matches the dot-less entries in
216 # ALREADY_COMPRESSED_TYPES.
216 file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
217 # TODO(csharp): Profile to find what compression level works best.
218 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
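# Quick check (illustrative; assumes the lstrip('.') fix above so the
# extension matches the dot-less entries in ALREADY_COMPRESSED_TYPES):
#
#   assert get_zip_compression_level('movie.avi') == 0
#   assert get_zip_compression_level('report.txt') == 7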
221 def create_directories(base_directory, files):
222 """Creates the directory structure needed by the given list of files."""
223 logging.debug('create_directories(%s, %d)', base_directory, len(files))
224 # Build the set of directories that need to be created.
225 directories = set(os.path.dirname(f) for f in files)
226 for item in list(directories):
228 directories.add(item)
229 item = os.path.dirname(item)
230 for d in sorted(directories):
232 os.mkdir(os.path.join(base_directory, d))
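# Illustrative example: for files ['a/b/c.txt', 'a/d.txt'] the directories
# 'a' and 'a/b' are created (in sorted order, so parents always exist before
# their children).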
235 def create_links(base_directory, files):
236 """Creates any links needed by the given set of files."""
237 for filepath, properties in files:
238 if 'l' not in properties:
240 if sys.platform == 'win32':
241 # TODO(maruel): Create junctions or empty text files similar to what
243 logging.warning('Ignoring symlink %s', filepath)
245 outfile = os.path.join(base_directory, filepath)
246 # symlink doesn't exist on Windows, so the 'link' property should
247 # never be specified for Windows .isolated files.
248 os.symlink(properties['l'], outfile) # pylint: disable=E1101
249 if 'm' in properties:
250 lchmod = getattr(os, 'lchmod', None)
252 lchmod(outfile, properties['m'])
255 def is_valid_file(filepath, size):
256 """Determines if the given files appears valid.
258 Currently it just checks the file's size.
260 if size == UNKNOWN_FILE_SIZE:
261 return os.path.isfile(filepath)
262 actual_size = os.stat(filepath).st_size
263 if size != actual_size:
265 'Found invalid item %s; %d != %d',
266 os.path.basename(filepath), actual_size, size)
271 class WorkerPool(threading_utils.AutoRetryThreadPool):
272 """Thread pool that automatically retries on IOError and runs a preconfigured
275 # Initial and maximum number of worker threads.
281 super(WorkerPool, self).__init__(
284 self.INITIAL_WORKERS,
291 """An item to push to Storage.
293 It starts its life in a main thread, travels to 'contains' thread, then to
294 'push' thread and then finally back to the main thread.
296 It is never used concurrently from multiple threads.
299 def __init__(self, digest, size, is_isolated=False):
302 self.is_isolated = is_isolated
303 self.compression_level = 6
304 self.push_state = None
306 def content(self, chunk_size):
307 """Iterable with content of this item in chunks of given size.
310 chunk_size: preferred size of the chunk to produce, may be ignored.
312 raise NotImplementedError()
315 class FileItem(Item):
316 """A file to push to Storage."""
318 def __init__(self, path, digest, size, is_isolated):
319 super(FileItem, self).__init__(digest, size, is_isolated)
321 self.compression_level = get_zip_compression_level(path)
323 def content(self, chunk_size):
324 return file_read(self.path, chunk_size)
327 class BufferItem(Item):
328 """A byte buffer to push to Storage."""
330 def __init__(self, buf, algo, is_isolated=False):
331 super(BufferItem, self).__init__(
332 algo(buf).hexdigest(), len(buf), is_isolated)
335 def content(self, _chunk_size):
339 class Storage(object):
340 """Efficiently downloads or uploads large set of files via StorageApi."""
342 def __init__(self, storage_api, use_zip):
343 self.use_zip = use_zip
344 self._storage_api = storage_api
345 self._cpu_thread_pool = None
346 self._net_thread_pool = None
349 def cpu_thread_pool(self):
350 """ThreadPool for CPU-bound tasks like zipping."""
351 if self._cpu_thread_pool is None:
352 self._cpu_thread_pool = threading_utils.ThreadPool(
353 2, max(threading_utils.num_processors(), 2), 0, 'zip')
354 return self._cpu_thread_pool
357 def net_thread_pool(self):
358 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
359 if self._net_thread_pool is None:
360 self._net_thread_pool = WorkerPool()
361 return self._net_thread_pool
364 """Waits for all pending tasks to finish."""
365 if self._cpu_thread_pool:
366 self._cpu_thread_pool.join()
367 self._cpu_thread_pool.close()
368 self._cpu_thread_pool = None
369 if self._net_thread_pool:
370 self._net_thread_pool.join()
371 self._net_thread_pool.close()
372 self._net_thread_pool = None
375 """Context manager interface."""
378 def __exit__(self, _exc_type, _exc_value, _traceback):
379 """Context manager interface."""
383 def upload_tree(self, indir, infiles):
384 """Uploads the given tree to the isolate server.
387 indir: root directory the infiles are based in.
388 infiles: dict of files to upload from |indir|.
391 List of items that were uploaded. All other items are already there.
393 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
395 # Convert |indir| + |infiles| into a list of FileItem objects.
396 # Filter out symlinks, since they are not represented by items on isolate
400 path=os.path.join(indir, filepath),
401 digest=metadata['h'],
403 is_isolated=metadata.get('priority') == '0')
404 for filepath, metadata in infiles.iteritems()
405 if 'l' not in metadata
408 return self.upload_items(items)
410 def upload_items(self, items):
411 """Uploads bunch of items to the isolate server.
413 Will upload only items that are missing.
416 items: list of Item instances that represents data to upload.
419 List of items that were uploaded. All other items are already there.
421 # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
422 # used by swarming.py. There's no need to spawn multiple threads and try to
423 # do stuff in parallel: there's nothing to parallelize. 'contains' check and
424 # 'push' should be performed sequentially in the context of current thread.
426 # For each digest keep only the first Item that matches it. All other items
427 # are just indistinguishable copies from the point of view of isolate
428 # server (it doesn't care about paths at all, only content and digests).
432 if seen.setdefault(item.digest, item) is not item:
434 items = seen.values()
436 logging.info('Skipped %d duplicated files', duplicates)
438 # Enqueue all upload tasks.
440 channel = threading_utils.TaskChannel()
441 for missing_item in self.get_missing_items(items):
442 missing.add(missing_item)
445 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
449 # No need to spawn deadlock detector thread if there's nothing to upload.
451 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
452 # Wait for all started uploads to finish.
453 while len(uploaded) != len(missing):
455 item = channel.pull()
456 uploaded.append(item)
458 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
459 logging.info('All files are uploaded')
463 total_size = sum(f.size for f in items)
465 'Total: %6d, %9.1fkb',
468 cache_hit = set(items) - missing
469 cache_hit_size = sum(f.size for f in cache_hit)
471 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
473 cache_hit_size / 1024.,
474 len(cache_hit) * 100. / total,
475 cache_hit_size * 100. / total_size if total_size else 0)
477 cache_miss_size = sum(f.size for f in cache_miss)
479 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
481 cache_miss_size / 1024.,
482 len(cache_miss) * 100. / total,
483 cache_miss_size * 100. / total_size if total_size else 0)
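# Usage sketch (illustrative; the server URL is hypothetical):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     item = BufferItem('hello world', SUPPORTED_ALGOS['sha-1'])
#     uploaded = storage.upload_items([item])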
487 def get_fetch_url(self, digest):
488 """Returns an URL that can be used to fetch an item with given digest.
491 digest: hex digest of item to fetch.
494 A URL or None if the underlying protocol doesn't support this.
496 return self._storage_api.get_fetch_url(digest)
498 def async_push(self, channel, priority, item):
499 """Starts asynchronous push to the server in a parallel thread.
502 channel: TaskChannel that receives back |item| when upload ends.
503 priority: thread pool task priority for the push.
504 item: item to upload as instance of Item class.
507 """Pushes an item and returns its id, to pass as a result to |channel|."""
508 self._storage_api.push(item, content)
511 # If zipping is not required, just start a push task.
513 self.net_thread_pool.add_task_with_channel(channel, priority, push,
514 item.content(DISK_FILE_CHUNK))
517 # If zipping is enabled, zip in a separate thread.
519 # TODO(vadimsh): Implement streaming uploads. Until it's done, assemble the
520 # content right here. It will block until the whole file is zipped.
522 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
523 item.compression_level)
524 data = ''.join(stream)
525 except Exception as exc:
526 logging.error('Failed to zip \'%s\': %s', item, exc)
527 channel.send_exception(exc)
529 self.net_thread_pool.add_task_with_channel(
530 channel, priority, push, [data])
531 self.cpu_thread_pool.add_task(priority, zip_and_push)
533 def async_fetch(self, channel, priority, digest, size, sink):
534 """Starts asynchronous fetch from the server in a parallel thread.
537 channel: TaskChannel that receives back |digest| when download ends.
538 priority: thread pool task priority for the fetch.
539 digest: hex digest of an item to download.
540 size: expected size of the item (after decompression).
541 sink: function that will be called as sink(generator).
545 # Prepare reading pipeline.
546 stream = self._storage_api.fetch(digest)
548 stream = zip_decompress(stream, DISK_FILE_CHUNK)
549 # Run |stream| through verifier that will assert its size.
550 verifier = FetchStreamVerifier(stream, size)
551 # Verified stream goes to |sink|.
553 except Exception as err:
554 logging.warning('Failed to fetch %s: %s', digest, err)
558 # Don't bother with zip_thread_pool for decompression. Decompression is
559 # really fast and most probably IO bound anyway.
560 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
562 def get_missing_items(self, items):
563 """Yields items that are missing from the server.
565 Issues multiple parallel queries via StorageApi's 'contains' method.
568 items: a list of Item objects to check.
571 Item objects that are missing from the server.
573 channel = threading_utils.TaskChannel()
575 # Enqueue all requests.
576 for batch in self.batch_items_for_check(items):
577 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
578 self._storage_api.contains, batch)
580 # Yield results as they come in.
581 for _ in xrange(pending):
582 for missing in channel.pull():
586 def batch_items_for_check(items):
587 """Splits list of items to check for existence on the server into batches.
589 Each batch corresponds to a single 'exists?' query to the server via a call
590 to StorageApi's 'contains' method.
593 items: a list of Item objects.
596 Batches of items to query for existence in a single operation,
597 each batch is a list of Item objects.
600 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
602 for item in sorted(items, key=lambda x: x.size, reverse=True):
603 next_queries.append(item)
604 if len(next_queries) == batch_size_limit:
608 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
609 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
614 class FetchQueue(object):
615 """Fetches items from Storage and places them into LocalCache.
617 It manages multiple concurrent fetch operations. Acts as a bridge between
618 Storage and LocalCache so that Storage and LocalCache don't depend on each
622 def __init__(self, storage, cache):
623 self.storage = storage
625 self._channel = threading_utils.TaskChannel()
626 self._pending = set()
627 self._accessed = set()
628 self._fetched = cache.cached_set()
630 def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
631 """Starts asynchronous fetch of item |digest|."""
633 if digest in self._pending:
636 # Mark this file as in use, verify_all_cached will later ensure it is still
638 self._accessed.add(digest)
640 # Already fetched? Notify cache to update item's LRU position.
641 if digest in self._fetched:
642 # 'touch' returns True if item is in cache and not corrupted.
643 if self.cache.touch(digest, size):
645 # Item is corrupted, remove it from cache and fetch it again.
646 self._fetched.remove(digest)
647 self.cache.evict(digest)
649 # TODO(maruel): It should look at the free disk space, the current cache
650 # size and the size of the new item on every new item:
651 # - Trim the cache as more entries are listed when free disk space is low,
652 # otherwise if the amount of data downloaded during the run > free disk
653 # space, it'll crash.
654 # - Make sure there's enough free disk space to fit all dependencies of
655 # this run! If not, abort early.
658 self._pending.add(digest)
659 self.storage.async_fetch(
660 self._channel, priority, digest, size,
661 functools.partial(self.cache.write, digest))
663 def wait(self, digests):
664 """Starts a loop that waits for at least one of |digests| to be retrieved.
666 Returns the first digest retrieved.
668 # Flush any already fetched items.
669 for digest in digests:
670 if digest in self._fetched:
673 # Ensure all requested items are being fetched now.
674 assert all(digest in self._pending for digest in digests), (
675 digests, self._pending)
677 # Wait for some requested item to finish fetching.
679 digest = self._channel.pull()
680 self._pending.remove(digest)
681 self._fetched.add(digest)
682 if digest in digests:
685 # Should never reach this point due to assert above.
686 raise RuntimeError('Impossible state')
688 def inject_local_file(self, path, algo):
689 """Adds local file to the cache as if it was fetched from storage."""
690 with open(path, 'rb') as f:
692 digest = algo(data).hexdigest()
693 self.cache.write(digest, [data])
694 self._fetched.add(digest)
698 def pending_count(self):
699 """Returns number of items to be fetched."""
700 return len(self._pending)
702 def verify_all_cached(self):
703 """True if all accessed items are in cache."""
704 return self._accessed.issubset(self.cache.cached_set())
707 class FetchStreamVerifier(object):
708 """Verifies that fetched file is valid before passing it to the LocalCache."""
710 def __init__(self, stream, expected_size):
712 self.expected_size = expected_size
713 self.current_size = 0
716 """Generator that yields same items as |stream|.
718 Verifies |stream| is complete before yielding a last chunk to consumer.
720 Also wraps IOError produced by consumer into MappingError exceptions since
721 otherwise Storage will retry fetch on unrelated local cache errors.
723 # Read one chunk ahead, keep it in |stored|.
724 # That way a complete stream can be verified before pushing last chunk
727 for chunk in self.stream:
728 assert chunk is not None
729 if stored is not None:
730 self._inspect_chunk(stored, is_last=False)
733 except IOError as exc:
734 raise MappingError('Failed to store an item in cache: %s' % exc)
736 if stored is not None:
737 self._inspect_chunk(stored, is_last=True)
740 except IOError as exc:
741 raise MappingError('Failed to store an item in cache: %s' % exc)
743 def _inspect_chunk(self, chunk, is_last):
744 """Called for each fetched chunk before passing it to consumer."""
745 self.current_size += len(chunk)
746 if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
747 (self.expected_size != self.current_size)):
748 raise IOError('Incorrect file size: expected %d, got %d' % (
749 self.expected_size, self.current_size))
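# Illustrative sketch of the size check in isolation: a stream shorter than
# |expected_size| is rejected before its last chunk is handed to the cache.
#
#   v = FetchStreamVerifier(iter(['abc']), 5)
#   v._inspect_chunk('abc', is_last=True)   # raises IOError (3 != 5)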
752 class StorageApi(object):
753 """Interface for classes that implement low-level storage operations."""
755 def get_fetch_url(self, digest):
756 """Returns an URL that can be used to fetch an item with given digest.
759 digest: hex digest of item to fetch.
762 A URL or None if the protocol doesn't support this.
764 raise NotImplementedError()
766 def fetch(self, digest):
767 """Fetches an object and yields its content.
770 digest: hash digest of item to download.
773 Chunks of downloaded item (as str objects).
775 raise NotImplementedError()
777 def push(self, item, content):
778 """Uploads an |item| with content generated by |content| generator.
781 item: Item object that holds information about an item being pushed.
782 content: a generator that yields chunks to push.
787 raise NotImplementedError()
789 def contains(self, items):
790 """Checks for existence of given |items| on the server.
792 Mutates |items| by assigning an opaque implementation-specific object to
793 Item's push_state attribute for entries missing from the datastore.
796 items: list of Item objects.
799 A list of items missing on server as a list of Item objects.
801 raise NotImplementedError()
804 class IsolateServer(StorageApi):
805 """StorageApi implementation that downloads and uploads to Isolate Server.
807 It uploads and downloads directly from Google Storage whenever appropriate.
810 class _PushState(object):
811 """State needed to call .push(), to be stored in Item.push_state."""
812 def __init__(self, upload_url, finalize_url):
813 self.upload_url = upload_url
814 self.finalize_url = finalize_url
815 self.uploaded = False
816 self.finalized = False
818 def __init__(self, base_url, namespace):
819 super(IsolateServer, self).__init__()
820 assert base_url.startswith('http'), base_url
821 self.base_url = base_url.rstrip('/')
822 self.namespace = namespace
823 self._lock = threading.Lock()
824 self._server_caps = None
827 def _generate_handshake_request():
828 """Returns a dict to be sent as handshake request body."""
829 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
831 'client_app_version': __version__,
833 'protocol_version': ISOLATE_PROTOCOL_VERSION,
838 def _validate_handshake_response(caps):
839 """Validates and normalizes handshake response."""
840 logging.info('Protocol version: %s', caps['protocol_version'])
841 logging.info('Server version: %s', caps['server_app_version'])
842 if caps.get('error'):
843 raise MappingError(caps['error'])
844 if not caps['access_token']:
845 raise ValueError('access_token is missing')
849 def _server_capabilities(self):
850 """Performs handshake with the server if not yet done.
853 Server capabilities dictionary as returned by /handshake endpoint.
856 MappingError if server rejects the handshake.
858 # TODO(maruel): Make this request much earlier asynchronously while the
859 # files are being enumerated.
861 if self._server_caps is None:
862 request_body = json.dumps(
863 self._generate_handshake_request(), separators=(',', ':'))
864 response = net.url_read(
865 url=self.base_url + '/content-gs/handshake',
867 content_type='application/json',
870 raise MappingError('Failed to perform handshake.')
872 caps = json.loads(response)
873 if not isinstance(caps, dict):
874 raise ValueError('Expecting JSON dict')
875 self._server_caps = self._validate_handshake_response(caps)
876 except (ValueError, KeyError, TypeError) as exc:
877 # KeyError exception has very confusing str conversion: it's just a
878 # missing key value and nothing else. So print exception class name
880 raise MappingError('Invalid handshake response (%s): %s' % (
881 exc.__class__.__name__, exc))
882 return self._server_caps
884 def get_fetch_url(self, digest):
885 assert isinstance(digest, basestring)
886 return '%s/content-gs/retrieve/%s/%s' % (
887 self.base_url, self.namespace, digest)
889 def fetch(self, digest):
890 source_url = self.get_fetch_url(digest)
891 logging.debug('download_file(%s)', source_url)
893 # Because the App Engine DB is only eventually consistent, retry 404 errors:
894 # the file might just not be visible yet (even though it has been
896 connection = net.url_open(
897 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
899 raise IOError('Unable to open connection to %s' % source_url)
900 return stream_read(connection, NET_IO_FILE_CHUNK)
902 def push(self, item, content):
903 assert isinstance(item, Item)
904 assert isinstance(item.push_state, IsolateServer._PushState)
905 assert not item.push_state.finalized
907 # TODO(vadimsh): Do not read from |content| generator when retrying push.
908 # If |content| is indeed a generator, it cannot be rewound back
909 # to the beginning of the stream. A retry will find it exhausted. A possible
910 # solution is to wrap |content| generator with some sort of caching
911 # restartable generator. It should be done alongside streaming support
914 # This push operation may be a retry after a failed finalization call below,
915 # no need to reupload contents in that case.
916 if not item.push_state.uploaded:
917 # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
918 # upload support is implemented.
919 if isinstance(content, list) and len(content) == 1:
922 content = ''.join(content)
923 # PUT file to |upload_url|.
924 response = net.url_read(
925 url=item.push_state.upload_url,
927 content_type='application/octet-stream',
930 raise IOError('Failed to upload a file %s to %s' % (
931 item.digest, item.push_state.upload_url))
932 item.push_state.uploaded = True
935 'A file %s already uploaded, retrying finalization only', item.digest)
937 # Optionally notify the server that it's done.
938 if item.push_state.finalize_url:
939 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
940 # send it to the isolate server. That way the isolate server can verify that
941 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
943 response = net.url_read(
944 url=item.push_state.finalize_url,
946 content_type='application/json',
949 raise IOError('Failed to finalize an upload of %s' % item.digest)
950 item.push_state.finalized = True
952 def contains(self, items):
953 logging.info('Checking existence of %d files...', len(items))
955 # Request body is a json encoded list of dicts.
960 'i': int(item.is_isolated),
964 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
967 urllib.quote(self._server_capabilities['access_token']))
968 response_body = net.url_read(
970 data=json.dumps(body, separators=(',', ':')),
971 content_type='application/json',
973 if response_body is None:
974 raise MappingError('Failed to execute /pre-upload query')
976 # Response body is a list of push_urls (or null if file is already present).
978 response = json.loads(response_body)
979 if not isinstance(response, list):
980 raise ValueError('Expecting response with json-encoded list')
981 if len(response) != len(items):
983 'Incorrect number of items in the list, expected %d, '
984 'but got %d' % (len(items), len(response)))
985 except ValueError as err:
987 'Invalid response from server: %s, body is %s' % (err, response_body))
989 # Pick Items that are missing, attach _PushState to them.
991 for i, push_urls in enumerate(response):
993 assert len(push_urls) == 2, str(push_urls)
995 assert item.push_state is None
996 item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
997 missing_items.append(item)
998 logging.info('Queried %d files, %d cache hit',
999 len(items), len(items) - len(missing_items))
1000 return missing_items
1003 class FileSystem(StorageApi):
1004 """StorageApi implementation that fetches data from the file system.
1006 The common use case is an NFS/CIFS file server that is mounted locally and
1007 used to fetch files onto a local partition.
1010 def __init__(self, base_path):
1011 super(FileSystem, self).__init__()
1012 self.base_path = base_path
1014 def get_fetch_url(self, digest):
1017 def fetch(self, digest):
1018 assert isinstance(digest, basestring)
1019 return file_read(os.path.join(self.base_path, digest))
1021 def push(self, item, content):
1022 assert isinstance(item, Item)
1023 file_write(os.path.join(self.base_path, item.digest), content)
1025 def contains(self, items):
1027 item for item in items
1028 if not os.path.exists(os.path.join(self.base_path, item.digest))
1032 class LocalCache(object):
1033 """Local cache that stores objects fetched via Storage.
1035 It can be accessed concurrently from multiple threads, so it should protect
1036 its internal state with some lock.
1039 def __enter__(self):
1040 """Context manager interface."""
1043 def __exit__(self, _exc_type, _exec_value, _traceback):
1044 """Context manager interface."""
1047 def cached_set(self):
1048 """Returns a set of all cached digests (always a new object)."""
1049 raise NotImplementedError()
1051 def touch(self, digest, size):
1052 """Ensures item is not corrupted and updates its LRU position.
1055 digest: hash digest of item to check.
1056 size: expected size of this item.
1059 True if item is in cache and not corrupted.
1061 raise NotImplementedError()
1063 def evict(self, digest):
1064 """Removes item from cache if it's there."""
1065 raise NotImplementedError()
1067 def read(self, digest):
1068 """Returns contents of the cached item as a single str."""
1069 raise NotImplementedError()
1071 def write(self, digest, content):
1072 """Reads data from |content| generator and stores it in cache."""
1073 raise NotImplementedError()
1075 def link(self, digest, dest, file_mode=None):
1076 """Ensures file at |dest| has same content as cached |digest|."""
1077 raise NotImplementedError()
1080 class MemoryCache(LocalCache):
1081 """LocalCache implementation that stores everything in memory."""
1084 super(MemoryCache, self).__init__()
1085 # Let's not assume dict is thread safe.
1086 self._lock = threading.Lock()
1089 def cached_set(self):
1091 return set(self._contents)
1093 def touch(self, digest, size):
1095 return digest in self._contents
1097 def evict(self, digest):
1099 self._contents.pop(digest, None)
1101 def read(self, digest):
1103 return self._contents[digest]
1105 def write(self, digest, content):
1106 # Assemble whole stream before taking the lock.
1107 data = ''.join(content)
1109 self._contents[digest] = data
1111 def link(self, digest, dest, file_mode=None):
1112 file_write(dest, [self.read(digest)])
1113 if file_mode is not None:
1114 os.chmod(dest, file_mode)
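# Usage sketch (illustrative):
#
#   cache = MemoryCache()
#   cache.write('deadbeef', ['chunk1', 'chunk2'])
#   assert cache.read('deadbeef') == 'chunk1chunk2'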
1117 def get_hash_algo(_namespace):
1118 """Return hash algorithm class to use when uploading to given |namespace|."""
1119 # TODO(vadimsh): Implement this at some point.
1123 def is_namespace_with_compression(namespace):
1124 """Returns True if given |namespace| stores compressed objects."""
1125 return namespace.endswith(('-gzip', '-deflate'))
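# Illustrative examples:
#
#   is_namespace_with_compression('default-gzip')  # -> True
#   is_namespace_with_compression('default')       # -> False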
1128 def get_storage_api(file_or_url, namespace):
1129 """Returns an object that implements StorageApi interface."""
1130 if re.match(r'^https?://.+$', file_or_url):
1131 return IsolateServer(file_or_url, namespace)
1133 return FileSystem(file_or_url)
1136 def get_storage(file_or_url, namespace):
1137 """Returns Storage class configured with appropriate StorageApi instance."""
1139 get_storage_api(file_or_url, namespace),
1140 is_namespace_with_compression(namespace))
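# Illustrative dispatch (the URL and path below are hypothetical): an http(s)
# URL yields an IsolateServer backend, anything else is treated as a local or
# mounted directory served by FileSystem.
#
#   get_storage_api('https://isolate.example.com', 'default-gzip')  # IsolateServer
#   get_storage_api('/mnt/isolate-cache', 'default')                # FileSystem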
1143 def upload_tree(base_url, indir, infiles, namespace):
1144 """Uploads the given tree to the given url.
1147 base_url: The base url; it is assumed that |base_url|/has/ can be used to
1148 query if an element was already uploaded, and |base_url|/store/
1149 can be used to upload a new element.
1150 indir: Root directory the infiles are based in.
1151 infiles: dict of files to upload from |indir| to |base_url|.
1152 namespace: The namespace to use on the server.
1154 with get_storage(base_url, namespace) as storage:
1155 storage.upload_tree(indir, infiles)
1159 def load_isolated(content, os_flavor, algo):
1160 """Verifies the .isolated file is valid and loads this object with the json
1164 - content: raw serialized content to load.
1165 - os_flavor: OS to load this file on. Optional.
1166 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1167 algorithm used on the Isolate Server.
1170 data = json.loads(content)
1172 raise ConfigError('Failed to parse: %s...' % content[:100])
1174 if not isinstance(data, dict):
1175 raise ConfigError('Expected dict, got %r' % data)
1177 # Check 'version' first, since it could modify the parsing after.
1178 value = data.get('version', '1.0')
1179 if not isinstance(value, basestring):
1180 raise ConfigError('Expected string, got %r' % value)
1181 if not re.match(r'^(\d+)\.(\d+)$', value):
1182 raise ConfigError('Expected a compatible version, got %r' % value)
1183 if value.split('.', 1)[0] != '1':
1184 raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
1187 # Default the algorithm used in the .isolated file itself, falls back to
1188 # 'sha-1' if unspecified.
1189 algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')]
1191 for key, value in data.iteritems():
1193 if not isinstance(value, basestring):
1194 raise ConfigError('Expected string, got %r' % value)
1195 if value not in SUPPORTED_ALGOS:
1197 'Expected one of \'%s\', got %r' %
1198 (', '.join(sorted(SUPPORTED_ALGOS)), value))
1199 if value != SUPPORTED_ALGOS_REVERSE[algo]:
1201 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1203 elif key == 'command':
1204 if not isinstance(value, list):
1205 raise ConfigError('Expected list, got %r' % value)
1207 raise ConfigError('Expected non-empty command')
1208 for subvalue in value:
1209 if not isinstance(subvalue, basestring):
1210 raise ConfigError('Expected string, got %r' % subvalue)
1212 elif key == 'files':
1213 if not isinstance(value, dict):
1214 raise ConfigError('Expected dict, got %r' % value)
1215 for subkey, subvalue in value.iteritems():
1216 if not isinstance(subkey, basestring):
1217 raise ConfigError('Expected string, got %r' % subkey)
1218 if not isinstance(subvalue, dict):
1219 raise ConfigError('Expected dict, got %r' % subvalue)
1220 for subsubkey, subsubvalue in subvalue.iteritems():
1221 if subsubkey == 'l':
1222 if not isinstance(subsubvalue, basestring):
1223 raise ConfigError('Expected string, got %r' % subsubvalue)
1224 elif subsubkey == 'm':
1225 if not isinstance(subsubvalue, int):
1226 raise ConfigError('Expected int, got %r' % subsubvalue)
1227 elif subsubkey == 'h':
1228 if not is_valid_hash(subsubvalue, algo):
1229 raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1230 elif subsubkey == 's':
1231 if not isinstance(subsubvalue, int):
1232 raise ConfigError('Expected int, got %r' % subsubvalue)
1234 raise ConfigError('Unknown subsubkey %s' % subsubkey)
1235 if bool('h' in subvalue) == bool('l' in subvalue):
1237 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1239 if bool('h' in subvalue) != bool('s' in subvalue):
1241 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1243 if bool('s' in subvalue) == bool('l' in subvalue):
1245 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1247 if bool('l' in subvalue) and bool('m' in subvalue):
1249 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
1252 elif key == 'includes':
1253 if not isinstance(value, list):
1254 raise ConfigError('Expected list, got %r' % value)
1256 raise ConfigError('Expected non-empty includes list')
1257 for subvalue in value:
1258 if not is_valid_hash(subvalue, algo):
1259 raise ConfigError('Expected sha-1, got %r' % subvalue)
1261 elif key == 'read_only':
1262 if not isinstance(value, bool):
1263 raise ConfigError('Expected bool, got %r' % value)
1265 elif key == 'relative_cwd':
1266 if not isinstance(value, basestring):
1267 raise ConfigError('Expected string, got %r' % value)
1270 if os_flavor and value != os_flavor:
1272 'Expected \'os\' to be \'%s\' but got \'%s\'' %
1275 elif key == 'version':
1276 # Already checked above.
1280 raise ConfigError('Unknown key %r' % key)
1282 # Automatically fix os.path.sep if necessary. While .isolated files are always
1283 # in the native path format, someone could want to download an .isolated
1284 # tree from another OS.
1285 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1287 data['files'] = dict(
1288 (k.replace(wrong_path_sep, os.path.sep), v)
1289 for k, v in data['files'].iteritems())
1290 for v in data['files'].itervalues():
1292 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1293 if 'relative_cwd' in data:
1294 data['relative_cwd'] = data['relative_cwd'].replace(
1295 wrong_path_sep, os.path.sep)
1299 class IsolatedFile(object):
1300 """Represents a single parsed .isolated file."""
1301 def __init__(self, obj_hash, algo):
1302 """|obj_hash| is really the sha-1 of the file."""
1303 logging.debug('IsolatedFile(%s)' % obj_hash)
1304 self.obj_hash = obj_hash
1306 # Set once the whole left side of the tree is parsed. 'Tree' here means the
1307 # .isolate and all the .isolated files recursively included by it via the
1308 # 'includes' key. The order of each sha-1 in 'includes', each representing a
1309 # .isolated file in the hash table, is important, as the later ones are not
1310 # processed until the earlier ones are retrieved and read.
1311 self.can_fetch = False
1315 # An IsolatedFile instance, one per object in self.includes.
1318 # Set once the .isolated file is loaded.
1319 self._is_parsed = False
1320 # Set once the files are fetched.
1321 self.files_fetched = False
1323 def load(self, os_flavor, content):
1324 """Verifies the .isolated file is valid and loads this object with the json
1327 logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1328 assert not self._is_parsed
1329 self.data = load_isolated(content, os_flavor, self.algo)
1331 IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1333 self._is_parsed = True
1335 def fetch_files(self, fetch_queue, files):
1336 """Adds files in this .isolated file not present in |files| dictionary.
1338 Preemptively request files.
1340 Note that |files| is modified by this function.
1342 assert self.can_fetch
1343 if not self._is_parsed or self.files_fetched:
1345 logging.debug('fetch_files(%s)' % self.obj_hash)
1346 for filepath, properties in self.data.get('files', {}).iteritems():
1347 # The root isolated has priority on the files being mapped. In particular,
1348 # overridden files must not be fetched.
1349 if filepath not in files:
1350 files[filepath] = properties
1351 if 'h' in properties:
1352 # Preemptively request files.
1353 logging.debug('fetching %s' % filepath)
1354 fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
1355 self.files_fetched = True
1358 class Settings(object):
1359 """Results of a completely parsed .isolated file."""
1363 self.read_only = None
1364 self.relative_cwd = None
1365 # The main .isolated file, an IsolatedFile instance.
1368 def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
1369 """Loads the .isolated and all the included .isolated asynchronously.
1371 It enables support for "included" .isolated files. They are processed in
1372 strict order but fetched asynchronously from the cache. This is important so
1373 that a file in an included .isolated file that is overridden by an embedding
1374 .isolated file is not fetched needlessly. The includes are fetched in one
1375 pass and the files are fetched as soon as all the ones on the left side
1376 of the tree have been fetched.
1378 The prioritization is very important here for nested .isolated files.
1379 'includes' have the highest priority and the algorithm is optimized for both
1380 deep and wide trees. A deep one is a long link of .isolated files referenced
1381 one at a time by one item in 'includes'. A wide one has a large number of
1382 'includes' in a single .isolated file. 'left' is defined as an included
1383 .isolated file earlier in the 'includes' list. So the order of the elements
1384 in 'includes' is important.
1386 self.root = IsolatedFile(root_isolated_hash, algo)
1388 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1390 # Set of hashes of already retrieved items to refuse recursive includes.
1393 def retrieve(isolated_file):
1394 h = isolated_file.obj_hash
1396 raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
1397 assert h not in pending
1399 pending[h] = isolated_file
1400 fetch_queue.add(WorkerPool.HIGH, h)
1405 item_hash = fetch_queue.wait(pending)
1406 item = pending.pop(item_hash)
1407 item.load(os_flavor, fetch_queue.cache.read(item_hash))
1408 if item_hash == root_isolated_hash:
1409 # It's the root item.
1410 item.can_fetch = True
1412 for new_child in item.children:
1415 # Traverse the whole tree to see if files can now be fetched.
1416 self._traverse_tree(fetch_queue, self.root)
1419 return all(check(x) for x in n.children) and n.files_fetched
1420 assert check(self.root)
1422 self.relative_cwd = self.relative_cwd or ''
1423 self.read_only = self.read_only or False
1425 def _traverse_tree(self, fetch_queue, node):
1427 if not node.files_fetched:
1428 self._update_self(fetch_queue, node)
1430 for i in node.children:
1434 # Automatically mark the first one as fetchable.
1437 self._traverse_tree(fetch_queue, i)
1439 def _update_self(self, fetch_queue, node):
1440 node.fetch_files(fetch_queue, self.files)
1442 if not self.command and node.data.get('command'):
1443 # Ensure paths are correctly separated on Windows.
1444 self.command = node.data['command']
1446 self.command[0] = self.command[0].replace('/', os.path.sep)
1447 self.command = tools.fix_python_path(self.command)
1448 if self.read_only is None and node.data.get('read_only') is not None:
1449 self.read_only = node.data['read_only']
1450 if (self.relative_cwd is None and
1451 node.data.get('relative_cwd') is not None):
1452 self.relative_cwd = node.data['relative_cwd']
1456 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
1457 """Aggressively downloads the .isolated file(s), then download all the files.
1460 isolated_hash: hash of the root *.isolated file.
1461 storage: Storage class that communicates with isolate storage.
1462 cache: LocalCache class that knows how to store and map files locally.
1463 algo: hash algorithm to use.
1464 outdir: Output directory to map file tree to.
1465 os_flavor: OS flavor to choose when reading sections of *.isolated file.
1466 require_command: Ensure *.isolated specifies a command to run.
1469 Settings object that holds details about loaded *.isolated file.
1472 fetch_queue = FetchQueue(storage, cache)
1473 settings = Settings()
1475 with tools.Profiler('GetIsolateds'):
1476 # Optionally support local files by manually adding them to cache.
1477 if not is_valid_hash(isolated_hash, algo):
1478 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1480 # Load all *.isolated and start loading rest of the files.
1481 settings.load(fetch_queue, isolated_hash, os_flavor, algo)
1482 if require_command and not settings.command:
1483 # TODO(vadimsh): All fetch operations are already enqueued and there's no
1484 # easy way to cancel them.
1485 raise ConfigError('No command to run')
1487 with tools.Profiler('GetRest'):
1488 # Create file system hierarchy.
1489 if not os.path.isdir(outdir):
1491 create_directories(outdir, settings.files)
1492 create_links(outdir, settings.files.iteritems())
1494 # Ensure working directory exists.
1495 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1496 if not os.path.isdir(cwd):
1499 # Multimap: digest -> list of pairs (path, props).
1501 for filepath, props in settings.files.iteritems():
1503 remaining.setdefault(props['h'], []).append((filepath, props))
1505 # Now block on the remaining files to be downloaded and mapped.
1506 logging.info('Retrieving remaining files (%d of them)...',
1507 fetch_queue.pending_count)
1508 last_update = time.time()
1509 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1513 # Wait for any item to finish fetching to cache.
1514 digest = fetch_queue.wait(remaining)
1516 # Link corresponding files to a fetched item in cache.
1517 for filepath, props in remaining.pop(digest):
1518 cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
1521 duration = time.time() - last_update
1522 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1523 msg = '%d files remaining...' % len(remaining)
1526 last_update = time.time()
1528 # The cache could evict some items we just tried to fetch; that's a fatal error.
1529 if not fetch_queue.verify_all_cached():
1530 raise MappingError('Cache is too small to hold all requested files')
1534 @subcommand.usage('<file1..fileN> or - to read from stdin')
1535 def CMDarchive(parser, args):
1536 """Archives data to the server."""
1537 options, files = parser.parse_args(args)
1540 files = sys.stdin.readlines()
1543 parser.error('Nothing to upload')
1545 # Load the necessary metadata.
1546 # TODO(maruel): Use a worker pool to upload as the hashing is being done.
1551 's': os.stat(f).st_size,
1552 'h': hash_file(f, get_hash_algo(options.namespace)),
1557 with tools.Profiler('Archive'):
1559 base_url=options.isolate_server,
1562 namespace=options.namespace)
1564 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1568 def CMDdownload(parser, args):
1569 """Download data from the server.
1571 It can either download individual files or a complete tree from a .isolated
1575 '-i', '--isolated', metavar='HASH',
1576 help='hash of an isolated file, .isolated file content is discarded, use '
1577 '--file if you need it')
1579 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1580 help='hash and destination of a file, can be used multiple times')
1582 '-t', '--target', metavar='DIR', default=os.getcwd(),
1583 help='destination directory')
1584 options, args = parser.parse_args(args)
1586 parser.error('Unsupported arguments: %s' % args)
1587 if bool(options.isolated) == bool(options.file):
1588 parser.error('Use one of --isolated or --file, and only one.')
1590 options.target = os.path.abspath(options.target)
1591 storage = get_storage(options.isolate_server, options.namespace)
1592 cache = MemoryCache()
1593 algo = get_hash_algo(options.namespace)
1595 # Fetching individual files.
1597 channel = threading_utils.TaskChannel()
1599 for digest, dest in options.file:
1600 pending[digest] = dest
1601 storage.async_fetch(
1606 functools.partial(file_write, os.path.join(options.target, dest)))
1608 fetched = channel.pull()
1609 dest = pending.pop(fetched)
1610 logging.info('%s: %s', fetched, dest)
1612 # Fetching whole isolated tree.
1613 if options.isolated:
1614 settings = fetch_isolated(
1615 isolated_hash=options.isolated,
1619 outdir=options.target,
1621 require_command=False)
1622 rel = os.path.join(options.target, settings.relative_cwd)
1623 print('To run this test please run from the directory %s:' %
1624 rel)
1625 print(' ' + ' '.join(settings.command))
1630 class OptionParserIsolateServer(tools.OptionParserWithLogging):
1631 def __init__(self, **kwargs):
1632 tools.OptionParserWithLogging.__init__(self, **kwargs)
1634 '-I', '--isolate-server',
1635 metavar='URL', default='',
1636 help='Isolate server to use')
1638 '--namespace', default='default-gzip',
1639 help='The namespace to use on the server, default: %default')
1641 def parse_args(self, *args, **kwargs):
1642 options, args = tools.OptionParserWithLogging.parse_args(
1643 self, *args, **kwargs)
1644 options.isolate_server = options.isolate_server.rstrip('/')
1645 if not options.isolate_server:
1646 self.error('--isolate-server is required.')
1647 return options, args
1651 dispatcher = subcommand.CommandDispatcher(__name__)
1653 return dispatcher.execute(
1654 OptionParserIsolateServer(version=__version__), args)
1655 except (ConfigError, MappingError) as e:
1656 sys.stderr.write('\nError: ')
1657 sys.stderr.write(str(e))
1658 sys.stderr.write('\n')
1662 if __name__ == '__main__':
1663 fix_encoding.fix_encoding()
1664 tools.disable_buffering()
1666 sys.exit(main(sys.argv[1:]))