2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
6 """Archives a set of files or directories to a server."""
26 from third_party import colorama
27 from third_party.depot_tools import fix_encoding
28 from third_party.depot_tools import subcommand
30 from utils import file_path
32 from utils import on_error
33 from utils import threading_utils
34 from utils import tools
39 # Version of isolate protocol passed to the server in /handshake request.
40 ISOLATE_PROTOCOL_VERSION = '1.0'
41 # Version stored and expected in .isolated files.
42 ISOLATED_FILE_VERSION = '1.4'
45 # The number of files to check with the isolate server per /pre-upload query.
46 # All files are sorted by likelihood of a change in the file content
47 # (currently file size is used to estimate this: the larger the file, the more
48 # likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are
49 # taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
50 # and so on. The numbers here are a trade-off; the more per request, the lower
51 # the effect of HTTP round trip latency and TCP-level chattiness. On the other
52 # hand, larger values cause longer lookups, increasing the initial latency to
53 # start uploading, which is especially an issue for large files. This value is
54 # optimized for the "few thousand files to look up with a minimal number of
55 # large files missing" case.
56 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
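# Illustrative sketch, not part of the original module: how the progression
# above maps a number of items onto per-query batch sizes (e.g. 250 items are
# looked up in batches of 20, 20, 50, 50, 50 and 60). The helper name
# _example_contains_batch_sizes is hypothetical.
def _example_contains_batch_sizes(num_items):
  """Returns the list of /pre-upload batch sizes used for |num_items| items."""
  sizes = []
  while num_items > 0:
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(len(sizes), len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    sizes.append(min(limit, num_items))
    num_items -= sizes[-1]
  return sizes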
59 # A list of already compressed extension types that should not receive any
60 # compression before being uploaded.
61 ALREADY_COMPRESSED_TYPES = [
62 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
67 # The file size to be used when we don't know the correct file size,
68 # generally used for .isolated files.
69 UNKNOWN_FILE_SIZE = None
72 # Chunk size to use when doing disk I/O.
73 DISK_FILE_CHUNK = 1024 * 1024
75 # Chunk size to use when reading from network stream.
76 NET_IO_FILE_CHUNK = 16 * 1024
79 # Read timeout in seconds for downloads from isolate storage. If there's no
80 # response from the server within this timeout, the whole download is aborted.
81 DOWNLOAD_READ_TIMEOUT = 60
83 # Maximum expected delay (in seconds) between successive file fetches
84 # in run_tha_test. If it takes longer than that, a deadlock might be happening
85 # and all stack frames for all threads are dumped to the log.
86 DEADLOCK_TIMEOUT = 5 * 60
89 # The delay (in seconds) to wait between logging statements when retrieving
90 # the required files. This is intended to let the user (or buildbot) know that
91 # the program is still running.
92 DELAY_BETWEEN_UPDATES_IN_SECS = 30
95 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
96 # specify the names here.
99 'sha-1': hashlib.sha1,
100 'sha-512': hashlib.sha512,
104 # Used for serialization.
105 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
108 DEFAULT_BLACKLIST = (
109 # Temporary vim or python files.
110 r'^.+\.(?:pyc|swp)$',
111 # .git or .svn directory.
112 r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
117 DEFAULT_BLACKLIST += (
118 r'^.+\.(?:run_test_cases)$',
119 r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
123 class Error(Exception):
124 """Generic runtime error."""
128 class ConfigError(ValueError):
129 """Generic failure to load a .isolated file."""
133 class MappingError(OSError):
134 """Failed to recreate the tree."""
138 def is_valid_hash(value, algo):
139 """Returns if the value is a valid hash for the corresponding algorithm."""
140 size = 2 * algo().digest_size
141 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
144 def hash_file(filepath, algo):
145 """Calculates the hash of a file without reading it all in memory at once.
147 |algo| should be one of the hashlib hashing algorithms.
150 with open(filepath, 'rb') as f:
152 chunk = f.read(DISK_FILE_CHUNK)
156 return digest.hexdigest()
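# Usage sketch, illustrative only: hashing a file with the 'sha-1' algorithm
# that .isolated files default to. The helper name is hypothetical.
def _example_hash_file(filepath):
  digest = hash_file(filepath, hashlib.sha1)
  assert is_valid_hash(digest, hashlib.sha1)
  return digest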
159 def stream_read(stream, chunk_size):
160 """Reads chunks from |stream| and yields them."""
162 data = stream.read(chunk_size)
168 def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
169 """Yields file content in chunks of |chunk_size| starting from |offset|."""
170 with open(filepath, 'rb') as f:
174 data = f.read(chunk_size)
180 def file_write(filepath, content_generator):
181 """Writes file content as generated by content_generator.
183 Creates the intermediary directory as needed.
185 Returns the number of bytes written.
187 Meant to be mocked out in unit tests.
189 filedir = os.path.dirname(filepath)
190 if not os.path.isdir(filedir):
193 with open(filepath, 'wb') as f:
194 for d in content_generator:
200 def zip_compress(content_generator, level=7):
201 """Reads chunks from |content_generator| and yields zip compressed chunks."""
202 compressor = zlib.compressobj(level)
203 for chunk in content_generator:
204 compressed = compressor.compress(chunk)
207 tail = compressor.flush(zlib.Z_FINISH)
212 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
213 """Reads zipped data from |content_generator| and yields decompressed data.
215 Decompresses data in small chunks (no larger than |chunk_size|) so that a
216 zip bomb doesn't cause zlib to preallocate a huge amount of memory.
218 Raises IOError if data is corrupted or incomplete.
220 decompressor = zlib.decompressobj()
223 for chunk in content_generator:
224 compressed_size += len(chunk)
225 data = decompressor.decompress(chunk, chunk_size)
228 while decompressor.unconsumed_tail:
229 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
232 tail = decompressor.flush()
235 except zlib.error as e:
237 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
238 # Ensure all data was read and decompressed.
239 if decompressor.unused_data or decompressor.unconsumed_tail:
240 raise IOError('Not all data was decompressed')
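# Round-trip sketch, illustrative only: zip_compress() and zip_decompress() are
# symmetric, so feeding the compressed chunks back yields the original bytes.
# The helper name is hypothetical.
def _example_zip_round_trip(data):
  compressed = ''.join(zip_compress([data]))
  assert ''.join(zip_decompress([compressed])) == data
  return compressed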
243 def get_zip_compression_level(filename):
244 """Given a filename calculates the ideal zip compression level to use."""
245 file_ext = os.path.splitext(filename)[1].lower()
246 # TODO(csharp): Profile to find what compression level works best.
247 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
250 def create_directories(base_directory, files):
251 """Creates the directory structure needed by the given list of files."""
252 logging.debug('create_directories(%s, %d)', base_directory, len(files))
253 # Creates the tree of directories to create.
254 directories = set(os.path.dirname(f) for f in files)
255 for item in list(directories):
257 directories.add(item)
258 item = os.path.dirname(item)
259 for d in sorted(directories):
261 os.mkdir(os.path.join(base_directory, d))
264 def create_symlinks(base_directory, files):
265 """Creates any symlinks needed by the given set of files."""
266 for filepath, properties in files:
267 if 'l' not in properties:
269 if sys.platform == 'win32':
270 # TODO(maruel): Create symlink via the win32 api.
271 logging.warning('Ignoring symlink %s', filepath)
273 outfile = os.path.join(base_directory, filepath)
274 # os.symlink() doesn't exist on Windows.
275 os.symlink(properties['l'], outfile) # pylint: disable=E1101
278 def is_valid_file(filepath, size):
279 """Determines if the given files appears valid.
281 Currently it just checks the file's size.
283 if size == UNKNOWN_FILE_SIZE:
284 return os.path.isfile(filepath)
285 actual_size = os.stat(filepath).st_size
286 if size != actual_size:
288 'Found invalid item %s; %d != %d',
289 os.path.basename(filepath), actual_size, size)
294 class WorkerPool(threading_utils.AutoRetryThreadPool):
295 """Thread pool that automatically retries on IOError and runs a preconfigured
298 # Initial and maximum number of worker threads.
304 super(WorkerPool, self).__init__(
307 self.INITIAL_WORKERS,
314 """An item to push to Storage.
316 Its digest and size may be provided in advance, if known. Otherwise they will
317 be derived from content(). If digest is provided, it MUST correspond to
318 the hash algorithm used by Storage.
320 When used with Storage, Item starts its life in a main thread, travels
321 to 'contains' thread, then to 'push' thread and then finally back to
322 the main thread. It is never used concurrently from multiple threads.
325 def __init__(self, digest=None, size=None, high_priority=False):
328 self.high_priority = high_priority
329 self.compression_level = 6
332 """Iterable with content of this item as byte string (str) chunks."""
333 raise NotImplementedError()
335 def prepare(self, hash_algo):
336 """Ensures self.digest and self.size are set.
338 Uses content() as a source of data to calculate them. Does nothing if digest
339 and size are already known.
342 hash_algo: hash algorithm to use to calculate digest.
344 if self.digest is None or self.size is None:
347 for chunk in self.content():
350 self.digest = digest.hexdigest()
354 class FileItem(Item):
355 """A file to push to Storage.
357 Its digest and size may be provided in advance, if known. Otherwise they will
358 be derived from the file content.
361 def __init__(self, path, digest=None, size=None, high_priority=False):
362 super(FileItem, self).__init__(
364 size if size is not None else os.stat(path).st_size,
367 self.compression_level = get_zip_compression_level(path)
370 return file_read(self.path)
373 class BufferItem(Item):
374 """A byte buffer to push to Storage."""
376 def __init__(self, buf, high_priority=False):
377 super(BufferItem, self).__init__(None, len(buf), high_priority)
384 class Storage(object):
385 """Efficiently downloads or uploads large set of files via StorageApi.
387 Implements compression support, parallel 'contains' checks, parallel uploads
390 Works only within a single namespace (and thus hashing algorithm and compression
393 Spawns multiple internal threads. Thread safe, but not fork safe.
396 def __init__(self, storage_api):
397 self._storage_api = storage_api
398 self._use_zip = is_namespace_with_compression(storage_api.namespace)
399 self._hash_algo = get_hash_algo(storage_api.namespace)
400 self._cpu_thread_pool = None
401 self._net_thread_pool = None
405 """Hashing algorithm used to name files in storage based on their content.
407 Defined by |namespace|. See also 'get_hash_algo'.
409 return self._hash_algo
413 """Location of a backing store that this class is using.
415 Exact meaning depends on the storage_api type. For IsolateServer it is the
416 URL of the isolate server, for FileSystem it is a path in the file system.
418 return self._storage_api.location
422 """Isolate namespace used by this storage.
424 Indirectly defines hashing scheme and compression method used.
426 return self._storage_api.namespace
429 def cpu_thread_pool(self):
430 """ThreadPool for CPU-bound tasks like zipping."""
431 if self._cpu_thread_pool is None:
432 self._cpu_thread_pool = threading_utils.ThreadPool(
433 2, max(threading_utils.num_processors(), 2), 0, 'zip')
434 return self._cpu_thread_pool
437 def net_thread_pool(self):
438 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
439 if self._net_thread_pool is None:
440 self._net_thread_pool = WorkerPool()
441 return self._net_thread_pool
444 """Waits for all pending tasks to finish."""
445 if self._cpu_thread_pool:
446 self._cpu_thread_pool.join()
447 self._cpu_thread_pool.close()
448 self._cpu_thread_pool = None
449 if self._net_thread_pool:
450 self._net_thread_pool.join()
451 self._net_thread_pool.close()
452 self._net_thread_pool = None
455 """Context manager interface."""
458 def __exit__(self, _exc_type, _exc_value, _traceback):
459 """Context manager interface."""
463 def upload_items(self, items):
464 """Uploads a bunch of items to the isolate server.
466 It figures out what items are missing from the server and uploads only them.
469 items: list of Item instances that represents data to upload.
472 List of items that were uploaded. All other items are already there.
474 # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
475 # used by swarming.py. There's no need to spawn multiple threads and try to
476 # do stuff in parallel: there's nothing to parallelize. 'contains' check and
477 # 'push' should be performed sequentially in the context of current thread.
479 # Ensure all digests are calculated.
481 item.prepare(self._hash_algo)
483 # For each digest keep only the first Item that matches it. All other items
484 # are just indistinguishable copies from the point of view of the isolate
485 # server (it doesn't care about paths at all, only content and digests).
489 if seen.setdefault(item.digest, item) is not item:
491 items = seen.values()
493 logging.info('Skipped %d duplicated files', duplicates)
495 # Enqueue all upload tasks.
498 channel = threading_utils.TaskChannel()
499 for missing_item, push_state in self.get_missing_items(items):
500 missing.add(missing_item)
501 self.async_push(channel, missing_item, push_state)
503 # No need to spawn deadlock detector thread if there's nothing to upload.
505 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
506 # Wait for all started uploads to finish.
507 while len(uploaded) != len(missing):
509 item = channel.pull()
510 uploaded.append(item)
512 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
513 logging.info('All files are uploaded')
517 total_size = sum(f.size for f in items)
519 'Total: %6d, %9.1fkb',
522 cache_hit = set(items) - missing
523 cache_hit_size = sum(f.size for f in cache_hit)
525 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
527 cache_hit_size / 1024.,
528 len(cache_hit) * 100. / total,
529 cache_hit_size * 100. / total_size if total_size else 0)
531 cache_miss_size = sum(f.size for f in cache_miss)
533 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
535 cache_miss_size / 1024.,
536 len(cache_miss) * 100. / total,
537 cache_miss_size * 100. / total_size if total_size else 0)
541 def get_fetch_url(self, item):
542 """Returns an URL that can be used to fetch given item once it's uploaded.
544 Note that if namespace uses compression, data at given URL is compressed.
547 item: Item to get fetch URL for.
550 An URL or None if underlying protocol doesn't support this.
552 item.prepare(self._hash_algo)
553 return self._storage_api.get_fetch_url(item.digest)
555 def async_push(self, channel, item, push_state):
556 """Starts asynchronous push to the server in a parallel thread.
558 Can be used only after |item| was checked for presence on a server with
559 'get_missing_items' call. 'get_missing_items' returns |push_state| object
560 that contains storage specific information describing how to upload
561 the item (for example in case of cloud storage, it is signed upload URLs).
564 channel: TaskChannel that receives back |item| when upload ends.
565 item: item to upload as instance of Item class.
566 push_state: push state returned by 'get_missing_items' call for |item|.
569 None, but |channel| later receives back |item| when upload ends.
571 # Thread pool task priority.
572 priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED
575 """Pushes an Item and returns it to |channel|."""
576 item.prepare(self._hash_algo)
577 self._storage_api.push(item, push_state, content)
580 # If zipping is not required, just start a push task.
581 if not self._use_zip:
582 self.net_thread_pool.add_task_with_channel(
583 channel, priority, push, item.content())
586 # If zipping is enabled, zip in a separate thread.
588 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
589 # content right here. It will block until the whole file is zipped.
591 stream = zip_compress(item.content(), item.compression_level)
592 data = ''.join(stream)
593 except Exception as exc:
594 logging.error('Failed to zip \'%s\': %s', item, exc)
595 channel.send_exception()
597 self.net_thread_pool.add_task_with_channel(
598 channel, priority, push, [data])
599 self.cpu_thread_pool.add_task(priority, zip_and_push)
601 def push(self, item, push_state):
602 """Synchronously pushes a single item to the server.
604 If you need to push many items at once, consider using 'upload_items' or
605 'async_push' with instance of TaskChannel.
608 item: item to upload as instance of Item class.
609 push_state: push state returned by 'get_missing_items' call for |item|.
612 Pushed item (same object as |item|).
614 channel = threading_utils.TaskChannel()
615 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
616 self.async_push(channel, item, push_state)
617 pushed = channel.pull()
618 assert pushed is item
621 def async_fetch(self, channel, priority, digest, size, sink):
622 """Starts asynchronous fetch from the server in a parallel thread.
625 channel: TaskChannel that receives back |digest| when download ends.
626 priority: thread pool task priority for the fetch.
627 digest: hex digest of an item to download.
628 size: expected size of the item (after decompression).
629 sink: function that will be called as sink(generator).
633 # Prepare reading pipeline.
634 stream = self._storage_api.fetch(digest)
636 stream = zip_decompress(stream, DISK_FILE_CHUNK)
637 # Run |stream| through verifier that will assert its size.
638 verifier = FetchStreamVerifier(stream, size)
639 # Verified stream goes to |sink|.
641 except Exception as err:
642 logging.error('Failed to fetch %s: %s', digest, err)
646 # Don't bother with zip_thread_pool for decompression. Decompression is
647 # really fast and most probably IO bound anyway.
648 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
650 def get_missing_items(self, items):
651 """Yields items that are missing from the server.
653 Issues multiple parallel queries via StorageApi's 'contains' method.
656 items: a list of Item objects to check.
659 For each missing item it yields a pair (item, push_state), where:
660 * item - Item object that is missing (one of |items|).
661 * push_state - opaque object that contains storage specific information
662 describing how to upload the item (for example in case of cloud
663 storage, it is signed upload URLs). It can later be passed to
666 channel = threading_utils.TaskChannel()
669 # Ensure all digests are calculated.
671 item.prepare(self._hash_algo)
673 # Enqueue all requests.
674 for batch in batch_items_for_check(items):
675 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
676 self._storage_api.contains, batch)
679 # Yield results as they come in.
680 for _ in xrange(pending):
681 for missing_item, push_state in channel.pull().iteritems():
682 yield missing_item, push_state
685 def batch_items_for_check(items):
686 """Splits list of items to check for existence on the server into batches.
688 Each batch corresponds to a single 'exists?' query to the server via a call
689 to StorageApi's 'contains' method.
692 items: a list of Item objects.
695 Batches of items to query for existence in a single operation,
696 each batch is a list of Item objects.
699 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
701 for item in sorted(items, key=lambda x: x.size, reverse=True):
702 next_queries.append(item)
703 if len(next_queries) == batch_size_limit:
707 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
708 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
713 class FetchQueue(object):
714 """Fetches items from Storage and places them into LocalCache.
716 It manages multiple concurrent fetch operations. Acts as a bridge between
717 Storage and LocalCache so that Storage and LocalCache don't depend on each
721 def __init__(self, storage, cache):
722 self.storage = storage
724 self._channel = threading_utils.TaskChannel()
725 self._pending = set()
726 self._accessed = set()
727 self._fetched = cache.cached_set()
729 def add(self, digest, size=UNKNOWN_FILE_SIZE, priority=WorkerPool.MED):
730 """Starts asynchronous fetch of item |digest|."""
732 if digest in self._pending:
735 # Mark this file as in use, verify_all_cached will later ensure it is still
737 self._accessed.add(digest)
739 # Already fetched? Notify cache to update item's LRU position.
740 if digest in self._fetched:
741 # 'touch' returns True if item is in cache and not corrupted.
742 if self.cache.touch(digest, size):
744 # Item is corrupted, remove it from cache and fetch it again.
745 self._fetched.remove(digest)
746 self.cache.evict(digest)
748 # TODO(maruel): It should look at the free disk space, the current cache
749 # size and the size of the new item on every new item:
750 # - Trim the cache as more entries are listed when free disk space is low,
751 # otherwise if the amount of data downloaded during the run > free disk
752 # space, it'll crash.
753 # - Make sure there's enough free disk space to fit all dependencies of
754 # this run! If not, abort early.
757 self._pending.add(digest)
758 self.storage.async_fetch(
759 self._channel, priority, digest, size,
760 functools.partial(self.cache.write, digest))
762 def wait(self, digests):
763 """Starts a loop that waits for at least one of |digests| to be retrieved.
765 Returns the first digest retrieved.
767 # Flush any already fetched items.
768 for digest in digests:
769 if digest in self._fetched:
772 # Ensure all requested items are being fetched now.
773 assert all(digest in self._pending for digest in digests), (
774 digests, self._pending)
776 # Wait for some requested item to finish fetching.
778 digest = self._channel.pull()
779 self._pending.remove(digest)
780 self._fetched.add(digest)
781 if digest in digests:
784 # Should never reach this point due to assert above.
785 raise RuntimeError('Impossible state')
787 def inject_local_file(self, path, algo):
788 """Adds local file to the cache as if it was fetched from storage."""
789 with open(path, 'rb') as f:
791 digest = algo(data).hexdigest()
792 self.cache.write(digest, [data])
793 self._fetched.add(digest)
797 def pending_count(self):
798 """Returns number of items to be fetched."""
799 return len(self._pending)
801 def verify_all_cached(self):
802 """True if all accessed items are in cache."""
803 return self._accessed.issubset(self.cache.cached_set())
806 class FetchStreamVerifier(object):
807 """Verifies that fetched file is valid before passing it to the LocalCache."""
809 def __init__(self, stream, expected_size):
811 self.expected_size = expected_size
812 self.current_size = 0
815 """Generator that yields same items as |stream|.
817 Verifies |stream| is complete before yielding a last chunk to consumer.
819 Also wraps IOError produced by consumer into MappingError exceptions since
820 otherwise Storage will retry fetch on unrelated local cache errors.
822 # Read one chunk ahead, keep it in |stored|.
823 # That way a complete stream can be verified before pushing last chunk
826 for chunk in self.stream:
827 assert chunk is not None
828 if stored is not None:
829 self._inspect_chunk(stored, is_last=False)
832 except IOError as exc:
833 raise MappingError('Failed to store an item in cache: %s' % exc)
835 if stored is not None:
836 self._inspect_chunk(stored, is_last=True)
839 except IOError as exc:
840 raise MappingError('Failed to store an item in cache: %s' % exc)
842 def _inspect_chunk(self, chunk, is_last):
843 """Called for each fetched chunk before passing it to consumer."""
844 self.current_size += len(chunk)
845 if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
846 (self.expected_size != self.current_size)):
847 raise IOError('Incorrect file size: expected %d, got %d' % (
848 self.expected_size, self.current_size))
851 class StorageApi(object):
852 """Interface for classes that implement low-level storage operations.
854 StorageApi is oblivious to the compression and hashing scheme used. These
855 details are handled in the higher level Storage class.
857 Clients should generally not use StorageApi directly. The Storage class is
858 preferred since it implements compression and upload optimizations.
863 """Location of a backing store that this class is using.
865 Exact meaning depends on the type. For IsolateServer it is the URL of the
866 isolate server, for FileSystem it is a path in the file system.
868 raise NotImplementedError()
872 """Isolate namespace used by this storage.
874 Indirectly defines hashing scheme and compression method used.
876 raise NotImplementedError()
878 def get_fetch_url(self, digest):
879 """Returns an URL that can be used to fetch an item with given digest.
882 digest: hex digest of item to fetch.
885 An URL or None if the protocol doesn't support this.
887 raise NotImplementedError()
889 def fetch(self, digest, offset=0):
890 """Fetches an object and yields its content.
893 digest: hash digest of item to download.
894 offset: offset (in bytes) from the start of the file to resume fetch from.
897 Chunks of downloaded item (as str objects).
899 raise NotImplementedError()
901 def push(self, item, push_state, content=None):
902 """Uploads an |item| with content generated by |content| generator.
904 |item| MUST go through 'contains' call to get |push_state| before it can
905 be pushed to the storage.
907 To be clear, here is one possible usage:
908 all_items = [... all items to push as Item subclasses ...]
909 for missing_item, push_state in storage_api.contains(all_items).items():
910 storage_api.push(missing_item, push_state)
912 When pushing to a namespace with compression, data that should be pushed
913 and data provided by the item are not the same. In that case |content| is
914 not None and it yields chunks of compressed data (using item.content() as
915 a source of original uncompressed data). This is implemented by Storage
919 item: Item object that holds information about an item being pushed.
920 push_state: push state object as returned by 'contains' call.
921 content: a generator that yields chunks to push, item.content() if None.
926 raise NotImplementedError()
928 def contains(self, items):
929 """Checks for |items| on the server, prepares missing ones for upload.
932 items: list of Item objects to check for presence.
935 A dict missing Item -> opaque push state object to be passed to 'push'.
936 See doc string for 'push'.
938 raise NotImplementedError()
941 class _IsolateServerPushState(object):
942 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
944 Note this needs to be a global class to support pickling.
947 def __init__(self, upload_url, finalize_url):
948 self.upload_url = upload_url
949 self.finalize_url = finalize_url
950 self.uploaded = False
951 self.finalized = False
954 class IsolateServer(StorageApi):
955 """StorageApi implementation that downloads and uploads to Isolate Server.
957 It uploads and downloads directly from Google Storage whenever appropriate.
958 Works only within a single namespace.
961 def __init__(self, base_url, namespace):
962 super(IsolateServer, self).__init__()
963 assert base_url.startswith('http'), base_url
964 self._base_url = base_url.rstrip('/')
965 self._namespace = namespace
966 self._lock = threading.Lock()
967 self._server_caps = None
970 def _generate_handshake_request():
971 """Returns a dict to be sent as handshake request body."""
972 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
974 'client_app_version': __version__,
976 'protocol_version': ISOLATE_PROTOCOL_VERSION,
981 def _validate_handshake_response(caps):
982 """Validates and normalizes handshake response."""
983 logging.info('Protocol version: %s', caps['protocol_version'])
984 logging.info('Server version: %s', caps['server_app_version'])
985 if caps.get('error'):
986 raise MappingError(caps['error'])
987 if not caps['access_token']:
988 raise ValueError('access_token is missing')
992 def _server_capabilities(self):
993 """Performs handshake with the server if not yet done.
996 Server capabilities dictionary as returned by /handshake endpoint.
999 MappingError if server rejects the handshake.
1001 # TODO(maruel): Make this request much earlier asynchronously while the
1002 # files are being enumerated.
1004 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
1005 # namespace-level ACLs to this call.
1007 if self._server_caps is None:
1008 request_body = json.dumps(
1009 self._generate_handshake_request(), separators=(',', ':'))
1010 response = net.url_read(
1011 url=self._base_url + '/content-gs/handshake',
1013 content_type='application/json',
1015 if response is None:
1016 raise MappingError('Failed to perform handshake.')
1018 caps = json.loads(response)
1019 if not isinstance(caps, dict):
1020 raise ValueError('Expecting JSON dict')
1021 self._server_caps = self._validate_handshake_response(caps)
1022 except (ValueError, KeyError, TypeError) as exc:
1023 # KeyError exception has very confusing str conversion: it's just a
1024 # missing key value and nothing else. So print exception class name
1026 raise MappingError('Invalid handshake response (%s): %s' % (
1027 exc.__class__.__name__, exc))
1028 return self._server_caps
1032 return self._base_url
1035 def namespace(self):
1036 return self._namespace
1038 def get_fetch_url(self, digest):
1039 assert isinstance(digest, basestring)
1040 return '%s/content-gs/retrieve/%s/%s' % (
1041 self._base_url, self._namespace, digest)
1043 def fetch(self, digest, offset=0):
1044 source_url = self.get_fetch_url(digest)
1045 logging.debug('download_file(%s, %d)', source_url, offset)
1047 connection = net.url_open(
1049 read_timeout=DOWNLOAD_READ_TIMEOUT,
1050 headers={'Range': 'bytes=%d-' % offset} if offset else None)
1053 raise IOError('Request failed - %s' % source_url)
1055 # If |offset| is used, verify server respects it by checking Content-Range.
1057 content_range = connection.get_header('Content-Range')
1058 if not content_range:
1059 raise IOError('Missing Content-Range header')
1061 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1062 # According to a spec, <size> can be '*' meaning "Total size of the file
1063 # is not known in advance".
1065 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1068 content_offset = int(match.group(1))
1069 last_byte_index = int(match.group(2))
1070 size = None if match.group(3) == '*' else int(match.group(3))
1072 raise IOError('Invalid Content-Range header: %s' % content_range)
1074 # Ensure returned offset equals requested one.
1075 if offset != content_offset:
1076 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1077 offset, content_offset, content_range))
1079 # Ensure entire tail of the file is returned.
1080 if size is not None and last_byte_index + 1 != size:
1081 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1083 return stream_read(connection, NET_IO_FILE_CHUNK)
1085 def push(self, item, push_state, content=None):
1086 assert isinstance(item, Item)
1087 assert item.digest is not None
1088 assert item.size is not None
1089 assert isinstance(push_state, _IsolateServerPushState)
1090 assert not push_state.finalized
1092 # Default to item.content().
1093 content = item.content() if content is None else content
1095 # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
1096 if isinstance(content, basestring):
1097 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1100 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1102 # If |content| is indeed a generator, it cannot be rewound back
1102 # to the beginning of the stream. A retry will find it exhausted. A possible
1103 # solution is to wrap |content| generator with some sort of caching
1104 # restartable generator. It should be done alongside streaming support
1107 # This push operation may be a retry after failed finalization call below,
1108 # no need to reupload contents in that case.
1109 if not push_state.uploaded:
1110 # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
1111 # upload support is implemented.
1112 if isinstance(content, list) and len(content) == 1:
1113 content = content[0]
1115 content = ''.join(content)
1116 # PUT file to |upload_url|.
1117 response = net.url_read(
1118 url=push_state.upload_url,
1120 content_type='application/octet-stream',
1122 if response is None:
1123 raise IOError('Failed to upload a file %s to %s' % (
1124 item.digest, push_state.upload_url))
1125 push_state.uploaded = True
1128 'A file %s already uploaded, retrying finalization only', item.digest)
1130 # Optionally notify the server that it's done.
1131 if push_state.finalize_url:
1132 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1133 # send it to isolated server. That way isolate server can verify that
1134 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1136 response = net.url_read(
1137 url=push_state.finalize_url,
1139 content_type='application/json',
1141 if response is None:
1142 raise IOError('Failed to finalize an upload of %s' % item.digest)
1143 push_state.finalized = True
1145 def contains(self, items):
1146 logging.info('Checking existence of %d files...', len(items))
1148 # Ensure all items were initialized with 'prepare' call. Storage does that.
1149 assert all(i.digest is not None and i.size is not None for i in items)
1151 # Request body is a json encoded list of dicts.
1156 'i': int(item.high_priority),
1160 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
1163 urllib.quote(self._server_capabilities['access_token']))
1164 response_body = net.url_read(
1166 data=json.dumps(body, separators=(',', ':')),
1167 content_type='application/json',
1169 if response_body is None:
1170 raise MappingError('Failed to execute /pre-upload query')
1172 # Response body is a list of push_urls (or null if file is already present).
1174 response = json.loads(response_body)
1175 if not isinstance(response, list):
1176 raise ValueError('Expecting response with json-encoded list')
1177 if len(response) != len(items):
1179 'Incorrect number of items in the list, expected %d, '
1180 'but got %d' % (len(items), len(response)))
1181 except ValueError as err:
1183 'Invalid response from server: %s, body is %s' % (err, response_body))
1185 # Pick Items that are missing, attach _PushState to them.
1187 for i, push_urls in enumerate(response):
1189 assert len(push_urls) == 2, str(push_urls)
1190 missing_items[items[i]] = _IsolateServerPushState(
1191 push_urls[0], push_urls[1])
1192 logging.info('Queried %d files, %d cache hit',
1193 len(items), len(items) - len(missing_items))
1194 return missing_items
1197 class FileSystem(StorageApi):
1198 """StorageApi implementation that fetches data from the file system.
1200 The common use case is an NFS/CIFS file server that is mounted locally and
1201 used to fetch files onto a local partition.
1204 # Used for push_state instead of None. That way the caller is forced to
1205 # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1206 _DUMMY_PUSH_STATE = object()
1208 def __init__(self, base_path, namespace):
1209 super(FileSystem, self).__init__()
1210 self._base_path = base_path
1211 self._namespace = namespace
1215 return self._base_path
1218 def namespace(self):
1219 return self._namespace
1221 def get_fetch_url(self, digest):
1224 def fetch(self, digest, offset=0):
1225 assert isinstance(digest, basestring)
1226 return file_read(os.path.join(self._base_path, digest), offset=offset)
1228 def push(self, item, push_state, content=None):
1229 assert isinstance(item, Item)
1230 assert item.digest is not None
1231 assert item.size is not None
1232 assert push_state is self._DUMMY_PUSH_STATE
1233 content = item.content() if content is None else content
1234 if isinstance(content, basestring):
1235 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1237 file_write(os.path.join(self._base_path, item.digest), content)
1239 def contains(self, items):
1240 assert all(i.digest is not None and i.size is not None for i in items)
1242 (item, self._DUMMY_PUSH_STATE) for item in items
1243 if not os.path.exists(os.path.join(self._base_path, item.digest))
1247 class LocalCache(object):
1248 """Local cache that stores objects fetched via Storage.
1250 It can be accessed concurrently from multiple threads, so it should protect
1251 its internal state with some lock.
1255 def __enter__(self):
1256 """Context manager interface."""
1259 def __exit__(self, _exc_type, _exec_value, _traceback):
1260 """Context manager interface."""
1263 def cached_set(self):
1264 """Returns a set of all cached digests (always a new object)."""
1265 raise NotImplementedError()
1267 def touch(self, digest, size):
1268 """Ensures item is not corrupted and updates its LRU position.
1271 digest: hash digest of item to check.
1272 size: expected size of this item.
1275 True if item is in cache and not corrupted.
1277 raise NotImplementedError()
1279 def evict(self, digest):
1280 """Removes item from cache if it's there."""
1281 raise NotImplementedError()
1283 def read(self, digest):
1284 """Returns contents of the cached item as a single str."""
1285 raise NotImplementedError()
1287 def write(self, digest, content):
1288 """Reads data from |content| generator and stores it in cache."""
1289 raise NotImplementedError()
1291 def hardlink(self, digest, dest, file_mode):
1292 """Ensures file at |dest| has same content as cached |digest|.
1294 If file_mode is provided, it is used to set the executable bit if
1297 raise NotImplementedError()
1300 class MemoryCache(LocalCache):
1301 """LocalCache implementation that stores everything in memory."""
1303 def __init__(self, file_mode_mask=0500):
1305 file_mode_mask: bit mask to AND the file mode with. The default value makes
1306 all mapped files read-only.
1308 super(MemoryCache, self).__init__()
1309 self._file_mode_mask = file_mode_mask
1310 # Let's not assume dict is thread safe.
1311 self._lock = threading.Lock()
1314 def cached_set(self):
1316 return set(self._contents)
1318 def touch(self, digest, size):
1320 return digest in self._contents
1322 def evict(self, digest):
1324 self._contents.pop(digest, None)
1326 def read(self, digest):
1328 return self._contents[digest]
1330 def write(self, digest, content):
1331 # Assemble whole stream before taking the lock.
1332 data = ''.join(content)
1334 self._contents[digest] = data
1336 def hardlink(self, digest, dest, file_mode):
1337 """Since data is kept in memory, there is no filenode to hardlink."""
1338 file_write(dest, [self.read(digest)])
1339 if file_mode is not None:
1340 os.chmod(dest, file_mode & self._file_mode_mask)
1343 def get_hash_algo(_namespace):
1344 """Return hash algorithm class to use when uploading to given |namespace|."""
1345 # TODO(vadimsh): Implement this at some point.
1349 def is_namespace_with_compression(namespace):
1350 """Returns True if given |namespace| stores compressed objects."""
1351 return namespace.endswith(('-gzip', '-deflate'))
1354 def get_storage_api(file_or_url, namespace):
1355 """Returns an object that implements low-level StorageApi interface.
1357 It is used by Storage to work with a single isolate |namespace|. It should
1358 rarely be used directly by clients; see 'get_storage' for
1359 a better alternative.
1362 file_or_url: a file path to use file system based storage, or URL of isolate
1363 service to use shared cloud based storage.
1364 namespace: isolate namespace to operate in, also defines hashing and
1365 compression scheme used, i.e. namespace names that end with '-gzip'
1366 store compressed data.
1369 Instance of StorageApi subclass.
1371 if file_path.is_url(file_or_url):
1372 return IsolateServer(file_or_url, namespace)
1374 return FileSystem(file_or_url, namespace)
1377 def get_storage(file_or_url, namespace):
1378 """Returns Storage class that can upload and download from |namespace|.
1381 file_or_url: a file path to use file system based storage, or URL of isolate
1382 service to use shared cloud based storage.
1383 namespace: isolate namespace to operate in, also defines hashing and
1384 compression scheme used, i.e. namespace names that end with '-gzip'
1385 store compressed data.
1388 Instance of Storage.
1390 return Storage(get_storage_api(file_or_url, namespace))
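# Usage sketch, illustrative only: archiving a few local files and returning
# their fetch URLs. The helper name and its arguments are hypothetical; the
# Storage/FileItem calls are the ones defined above.
def _example_archive(file_or_url, namespace, paths):
  items = [FileItem(path) for path in paths]
  with get_storage(file_or_url, namespace) as storage:
    storage.upload_items(items)
    return dict((item.path, storage.get_fetch_url(item)) for item in items)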
1393 def expand_symlinks(indir, relfile):
1394 """Follows symlinks in |relfile|, but treating symlinks that point outside the
1395 build tree as if they were ordinary directories/files. Returns the final
1396 symlink-free target and a list of paths to symlinks encountered in the
1399 The rule about symlinks outside the build tree is for the benefit of the
1400 Chromium OS ebuild, which symlinks the output directory to an unrelated path
1403 Fails when a directory loop is detected, although in theory we could support
1406 is_directory = relfile.endswith(os.path.sep)
1408 todo = relfile.strip(os.path.sep)
1412 pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
1415 todo = file_path.fix_native_path_case(done, todo)
1416 done = os.path.join(done, todo)
1418 symlink_path = os.path.join(done, pre_symlink, symlink)
1419 post_symlink = post_symlink.lstrip(os.path.sep)
1420 # readlink doesn't exist on Windows.
1421 # pylint: disable=E1101
1422 target = os.path.normpath(os.path.join(done, pre_symlink))
1423 symlink_target = os.readlink(symlink_path)
1424 if os.path.isabs(symlink_target):
1425 # Absolute paths are considered normal directories. The use case is
1426 # generally someone who puts the output directory on a separate drive.
1427 target = symlink_target
1429 # The symlink itself could be using the wrong path case.
1430 target = file_path.fix_native_path_case(target, symlink_target)
1432 if not os.path.exists(target):
1434 'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
1435 target = file_path.get_native_path_case(target)
1436 if not file_path.path_starts_with(indir, target):
1440 if file_path.path_starts_with(target, symlink_path):
1442 'Can\'t map recursive symlink reference %s -> %s' %
1443 (symlink_path, target))
1444 logging.info('Found symlink: %s -> %s', symlink_path, target)
1445 symlinks.append(os.path.relpath(symlink_path, indir))
1446 # Treat the common prefix of the old and new paths as done, and start
1448 target = target.split(os.path.sep)
1449 symlink_path = symlink_path.split(os.path.sep)
1451 for target_piece, symlink_path_piece in zip(target, symlink_path):
1452 if target_piece == symlink_path_piece:
1456 done = os.path.sep.join(target[:prefix_length])
1457 todo = os.path.join(
1458 os.path.sep.join(target[prefix_length:]), post_symlink)
1460 relfile = os.path.relpath(done, indir)
1461 relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
1462 return relfile, symlinks
1465 def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
1466 """Expands a single input. It can result in multiple outputs.
1468 This function is recursive when relfile is a directory.
1470 Note: this code doesn't properly handle recursive symlink like one created
1474 if os.path.isabs(relfile):
1475 raise MappingError('Can\'t map absolute path %s' % relfile)
1477 infile = file_path.normpath(os.path.join(indir, relfile))
1478 if not infile.startswith(indir):
1479 raise MappingError('Can\'t map file %s outside %s' % (infile, indir))
1481 filepath = os.path.join(indir, relfile)
1482 native_filepath = file_path.get_native_path_case(filepath)
1483 if filepath != native_filepath:
1484 # Special case './'.
1485 if filepath != native_filepath + '.' + os.path.sep:
1486 # While it'd be nice to enforce path casing on Windows, it's impractical.
1487 # Also give up enforcing strict path case on OSX. Really, it's that sad.
1488 # The case where it happens is very specific and hard to reproduce:
1489 # get_native_path_case(
1490 # u'Foo.framework/Versions/A/Resources/Something.nib') will return
1491 # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
1493 # Note that this is really something deep in OSX because running
1494 # ls Foo.framework/Versions/A
1495 # will print out 'Resources', while file_path.get_native_path_case()
1496 # returns a lower case 'r'.
1498 # So *something* is happening under the hood resulting in the command 'ls'
1499 # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree. We
1501 if sys.platform not in ('darwin', 'win32'):
1503 'File path doesn\'t equal native file path\n%s != %s' %
1504 (filepath, native_filepath))
1508 relfile, symlinks = expand_symlinks(indir, relfile)
1510 if relfile.endswith(os.path.sep):
1511 if not os.path.isdir(infile):
1513 '%s is not a directory but ends with "%s"' % (infile, os.path.sep))
1515 # Special case './'.
1516 if relfile.startswith('.' + os.path.sep):
1517 relfile = relfile[2:]
1520 for filename in os.listdir(infile):
1521 inner_relfile = os.path.join(relfile, filename)
1522 if blacklist and blacklist(inner_relfile):
1524 if os.path.isdir(os.path.join(indir, inner_relfile)):
1525 inner_relfile += os.path.sep
1527 expand_directory_and_symlink(indir, inner_relfile, blacklist,
1530 except OSError as e:
1532 'Unable to iterate over directory %s.\n%s' % (infile, e))
1534 # Always add individual files even if they were blacklisted.
1535 if os.path.isdir(infile):
1537 'Input directory %s must have a trailing slash' % infile)
1539 if not os.path.isfile(infile):
1540 raise MappingError('Input file %s doesn\'t exist' % infile)
1542 return symlinks + [relfile]
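# Sketch, illustrative only: a blacklist callable built from DEFAULT_BLACKLIST,
# suitable as the |blacklist| argument above. The helper name is hypothetical.
def _example_is_blacklisted(relfile):
  return any(re.match(regex, relfile) for regex in DEFAULT_BLACKLIST)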
1545 def process_input(filepath, prevdict, read_only, algo):
1546 """Processes an input file, a dependency, and return meta data about it.
1549 - Retrieves the file mode, file size, file timestamp, file link
1550 destination if it is a file link, and calculates the SHA-1 of the file's
1551 content if the path points to a file and not a symlink.
1554 filepath: File to act on.
1555 prevdict: the previous dictionary. It is used to retrieve the cached sha-1
1556 to skip recalculating the hash. Optional.
1557 read_only: If 1 or 2, the file mode is manipulated. In practice, only one
1558 of 4 modes is saved: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
1559 Windows, the mode is not set since all files are 'executable' by
1561 algo: Hashing algorithm used.
1564 The necessary data to create an entry in the 'files' section of an .isolated
1568 # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
1569 # if prevdict.get('T') == True:
1570 # # The file's content is ignored. Skip the time and hard code mode.
1572 # out['h'] = algo().hexdigest()
1576 # Always check the file stat and check if it is a link. The timestamp is used
1577 # to know if the file's content/symlink destination should be looked into.
1578 # E.g. only reuse from prevdict if the timestamp hasn't changed.
1579 # There is the risk of the file's timestamp being reset to its last value
1580 # manually while its content changed. We don't protect against that use case.
1582 filestats = os.lstat(filepath)
1584 # The file is not present.
1585 raise MappingError('%s is missing' % filepath)
1586 is_link = stat.S_ISLNK(filestats.st_mode)
1588 if sys.platform != 'win32':
1589 # Ignore file mode on Windows since it's not really useful there.
1590 filemode = stat.S_IMODE(filestats.st_mode)
1591 # Remove write access for group and all access to 'others'.
1592 filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
1594 filemode &= ~stat.S_IWUSR
1595 if filemode & stat.S_IXUSR:
1596 filemode |= stat.S_IXGRP
1598 filemode &= ~stat.S_IXGRP
1602 # Used to skip recalculating the hash or link destination. Use the most recent
1604 # TODO(maruel): Save it in the .state file instead of .isolated so the
1605 # .isolated file is deterministic.
1606 out['t'] = int(round(filestats.st_mtime))
1609 out['s'] = filestats.st_size
1610 # If the timestamp wasn't updated and the file size is still the same, carry
1612 if (prevdict.get('t') == out['t'] and
1613 prevdict.get('s') == out['s']):
1614 # Reuse the previous hash if available.
1615 out['h'] = prevdict.get('h')
1616 if not out.get('h'):
1617 out['h'] = hash_file(filepath, algo)
1619 # If the timestamp wasn't updated, carry on the link destination.
1620 if prevdict.get('t') == out['t']:
1621 # Reuse the previous link destination if available.
1622 out['l'] = prevdict.get('l')
1623 if out.get('l') is None:
1624 # The link could be in an incorrect path case. In practice, this only
1625 # happens on OSX on case insensitive HFS.
1626 # TODO(maruel): It'd be better if it was only done once, in
1627 # expand_directory_and_symlink(), so it would not be necessary to do again
1629 symlink_value = os.readlink(filepath) # pylint: disable=E1101
1630 filedir = file_path.get_native_path_case(os.path.dirname(filepath))
1631 native_dest = file_path.fix_native_path_case(filedir, symlink_value)
1632 out['l'] = os.path.relpath(native_dest, filedir)
1636 def save_isolated(isolated, data):
1637 """Writes one or multiple .isolated files.
1639 Note: this reference implementation does not create child .isolated files, so it
1640 always returns an empty list.
1642 Returns the list of child isolated files that are included by |isolated|.
1644 # Make sure the data is valid .isolated data by 'reloading' it.
1645 algo = SUPPORTED_ALGOS[data['algo']]
1646 load_isolated(json.dumps(data), algo)
1647 tools.write_json(isolated, data, True)
1651 def upload_tree(base_url, indir, infiles, namespace):
1652 """Uploads the given tree to the given url.
1655 base_url: The base url, it is assumed that |base_url|/has/ can be used to
1656 query if an element was already uploaded, and |base_url|/store/
1657 can be used to upload a new element.
1658 indir: Root directory the infiles are based in.
1659 infiles: dict of files to upload from |indir| to |base_url|.
1660 namespace: The namespace to use on the server.
1662 logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))
1664 # Convert |indir| + |infiles| into a list of FileItem objects.
1665 # Filter out symlinks, since they are not represented by items on isolate
1669 path=os.path.join(indir, filepath),
1670 digest=metadata['h'],
1672 high_priority=metadata.get('priority') == '0')
1673 for filepath, metadata in infiles.iteritems()
1674 if 'l' not in metadata
1677 with get_storage(base_url, namespace) as storage:
1678 storage.upload_items(items)
1682 def load_isolated(content, algo):
1683 """Verifies the .isolated file is valid and loads this object with the json
1687 - content: raw serialized content to load.
1688 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1689 algorithm used on the Isolate Server.
1692 data = json.loads(content)
1694 raise ConfigError('Failed to parse: %s...' % content[:100])
1696 if not isinstance(data, dict):
1697 raise ConfigError('Expected dict, got %r' % data)
1699 # Check 'version' first, since it could modify the parsing after.
1700 value = data.get('version', '1.0')
1701 if not isinstance(value, basestring):
1702 raise ConfigError('Expected string, got %r' % value)
1704 version = tuple(map(int, value.split('.')))
1706 raise ConfigError('Expected valid version, got %r' % value)
1708 expected_version = tuple(map(int, ISOLATED_FILE_VERSION.split('.')))
1709 # Major version must match.
1710 if version[0] != expected_version[0]:
1712 'Expected compatible \'%s\' version, got %r' %
1713 (ISOLATED_FILE_VERSION, value))
1716 # TODO(maruel): Remove the default around Jan 2014.
1717 # The algorithm used in the .isolated file itself defaults to 'sha-1' if
1718 # unspecified.
1719 algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')]
1721 for key, value in data.iteritems():
1723 if not isinstance(value, basestring):
1724 raise ConfigError('Expected string, got %r' % value)
1725 if value not in SUPPORTED_ALGOS:
1727 'Expected one of \'%s\', got %r' %
1728 (', '.join(sorted(SUPPORTED_ALGOS)), value))
1729 if value != SUPPORTED_ALGOS_REVERSE[algo]:
1731 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1733 elif key == 'command':
1734 if not isinstance(value, list):
1735 raise ConfigError('Expected list, got %r' % value)
1737 raise ConfigError('Expected non-empty command')
1738 for subvalue in value:
1739 if not isinstance(subvalue, basestring):
1740 raise ConfigError('Expected string, got %r' % subvalue)
1742 elif key == 'files':
1743 if not isinstance(value, dict):
1744 raise ConfigError('Expected dict, got %r' % value)
1745 for subkey, subvalue in value.iteritems():
1746 if not isinstance(subkey, basestring):
1747 raise ConfigError('Expected string, got %r' % subkey)
1748 if not isinstance(subvalue, dict):
1749 raise ConfigError('Expected dict, got %r' % subvalue)
1750 for subsubkey, subsubvalue in subvalue.iteritems():
1751 if subsubkey == 'l':
1752 if not isinstance(subsubvalue, basestring):
1753 raise ConfigError('Expected string, got %r' % subsubvalue)
1754 elif subsubkey == 'm':
1755 if not isinstance(subsubvalue, int):
1756 raise ConfigError('Expected int, got %r' % subsubvalue)
1757 elif subsubkey == 'h':
1758 if not is_valid_hash(subsubvalue, algo):
1759 raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1760 elif subsubkey == 's':
1761 if not isinstance(subsubvalue, (int, long)):
1762 raise ConfigError('Expected int or long, got %r' % subsubvalue)
1764 raise ConfigError('Unknown subsubkey %s' % subsubkey)
1765 if bool('h' in subvalue) == bool('l' in subvalue):
1767 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1769 if bool('h' in subvalue) != bool('s' in subvalue):
1771 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1773 if bool('s' in subvalue) == bool('l' in subvalue):
1775 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1777 if bool('l' in subvalue) and bool('m' in subvalue):
1779 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
1782 elif key == 'includes':
1783 if not isinstance(value, list):
1784 raise ConfigError('Expected list, got %r' % value)
1786 raise ConfigError('Expected non-empty includes list')
1787 for subvalue in value:
1788 if not is_valid_hash(subvalue, algo):
1789 raise ConfigError('Expected sha-1, got %r' % subvalue)
1792 if version >= (1, 4):
1793 raise ConfigError('Key \'os\' is not allowed starting version 1.4')
1795 elif key == 'read_only':
1796 if not value in (0, 1, 2):
1797 raise ConfigError('Expected 0, 1 or 2, got %r' % value)
1799 elif key == 'relative_cwd':
1800 if not isinstance(value, basestring):
1801 raise ConfigError('Expected string, got %r' % value)
1803 elif key == 'version':
1804 # Already checked above.
1808 raise ConfigError('Unknown key %r' % key)
1810 # Automatically fix os.path.sep if necessary. While .isolated files are always
1811 # in the native path format, someone could want to download an .isolated
1812 # tree from another OS.
1813 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1815 data['files'] = dict(
1816 (k.replace(wrong_path_sep, os.path.sep), v)
1817 for k, v in data['files'].iteritems())
1818 for v in data['files'].itervalues():
1820 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1821 if 'relative_cwd' in data:
1822 data['relative_cwd'] = data['relative_cwd'].replace(
1823 wrong_path_sep, os.path.sep)
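# Illustrative sketch of a minimal document accepted by load_isolated(); the
# digest and size are placeholders, not derived from real content, and the
# helper name is hypothetical.
def _example_minimal_isolated():
  return {
    'algo': 'sha-1',
    'version': ISOLATED_FILE_VERSION,
    'command': ['python', 'run_test.py'],
    'relative_cwd': '',
    'files': {
      'run_test.py': {
        'h': '0123456789abcdef0123456789abcdef01234567',
        's': 123,
        'm': 0640,
      },
    },
  }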
1827 class IsolatedFile(object):
1828 """Represents a single parsed .isolated file."""
1829 def __init__(self, obj_hash, algo):
1830 """|obj_hash| is really the sha-1 of the file."""
1831 logging.debug('IsolatedFile(%s)' % obj_hash)
1832 self.obj_hash = obj_hash
1834 # Set once all the left side of the tree is parsed. 'Tree' here means the
1835 # .isolate and all the .isolated files recursively included by it with the
1836 # 'includes' key. The order of each sha-1 in 'includes', each representing a
1837 # .isolated file in the hash table, is important, as the later ones are not
1838 # processed until the first ones are retrieved and read.
1839 self.can_fetch = False
1843 # A IsolatedFile instance, one per object in self.includes.
1846 # Set once the .isolated file is loaded.
1847 self._is_parsed = False
1848 # Set once the files are fetched.
1849 self.files_fetched = False
1851 def load(self, content):
1852 """Verifies the .isolated file is valid and loads this object with the json
1855 logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1856 assert not self._is_parsed
1857 self.data = load_isolated(content, self.algo)
1859 IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1861 self._is_parsed = True
1863 def fetch_files(self, fetch_queue, files):
1864 """Adds files in this .isolated file not present in |files| dictionary.
1866 Preemptively requests files.
1868 Note that |files| is modified by this function.
1870 assert self.can_fetch
1871 if not self._is_parsed or self.files_fetched:
1873 logging.debug('fetch_files(%s)' % self.obj_hash)
1874 for filepath, properties in self.data.get('files', {}).iteritems():
1875 # The root isolated file has priority on the files being mapped. In
1876 # particular, overridden files must not be fetched.
1877 if filepath not in files:
1878 files[filepath] = properties
1879 if 'h' in properties:
1880 # Preemptively request files.
1881 logging.debug('fetching %s' % filepath)
1882 fetch_queue.add(properties['h'], properties['s'], WorkerPool.MED)
1883 self.files_fetched = True
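
# Note: entries that only carry 'l' (symlinks) have no 'h' digest, so
# fetch_files() records them in |files| but queues nothing for download; they
# are materialized later by create_symlinks() in fetch_isolated().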


class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=WorkerPool.HIGH)

    retrieve(self.root)

    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first unfetchable child as fetchable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    node.fetch_files(fetch_queue, self.files)
    # Grab the properties from the first node that defines them.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      self.command[0] = self.command[0].replace('/', os.path.sep)
      self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
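
# A small worked example of the flow above (illustrative only): for a root
# A.isolated with includes = [B, C], Settings.load() queues A immediately and
# B and C as soon as A is parsed, all at WorkerPool.HIGH priority, while the
# files they describe are queued at WorkerPool.MED by IsolatedFile.fetch_files().
# Because the root is parsed and registered first, a file entry defined in A is
# never fetched again from B or C, matching the override rule noted above.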


def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      settings.load(fetch_queue, isolated_hash, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueued and there's
        # no easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to the fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print(msg)
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
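
# Typical use, as done by CMDdownload below (sketch only; |server_url|,
# |root_hash| and |target_dir| are placeholders):
#   with get_storage(server_url, namespace) as storage:
#     settings = fetch_isolated(
#         isolated_hash=root_hash,
#         storage=storage,
#         cache=MemoryCache(),
#         outdir=target_dir,
#         require_command=False)
# |settings.command| and |settings.relative_cwd| then describe how to run the
# mapped tree.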


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = dict(
      (relpath, process_input(os.path.join(root, relpath), {}, False, algo))
      for relpath in paths)
  for v in metadata.itervalues():
    # The file timestamp is not part of the .isolated format.
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata
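
# The returned |metadata| maps each relative path to the same property dict
# that load_isolated() validates: 'h' and 's' for regular files, 'l' for
# symlinks, typically plus 'm' (mode) on non-Windows. |items| only contains
# entries that carry a digest, so symlinks are never uploaded.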


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
           .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo': SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': ISOLATED_FILE_VERSION,
          }
          save_isolated(isolated, data)
          h = hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # care much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      shutil.rmtree(tempdir)
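
# The (hash, path) pairs returned above are what CMDarchive and CMDhashtable
# ultimately print, one "<hash> <path>" line per input argument.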


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if file_path.is_url(options.isolate_server):
    auth.ensure_logged_in(options.isolate_server)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0
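
# Example invocation (script name, path and server URL are illustrative):
#   isolateserver.py archive -I https://isolate.example.com ./out/Release/data
# prints one "<hash> ./out/Release/data" line on success.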


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          outdir=options.target,
          require_command=False)
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print('  ' + ' '.join(settings.command))

  return 0
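
# Example invocations (hashes, paths and server URL are illustrative):
#   isolateserver.py download -I https://isolate.example.com -i <sha-1> -t out/
#   isolateserver.py download -I https://isolate.example.com \
#       -f <sha-1> local_name.bin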


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDhashtable(parser, args):
  """Archives data to a hashtable on the file system.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_outdir_options(parser)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_outdir_options(parser, options, os.getcwd())
  try:
    # Do not compress files when archiving to the file system.
    archive(options.outdir, 'default', files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0
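
# Example invocation (paths illustrative):
#   isolateserver.py hashtable -o /tmp/hashtable somedir
# archives somedir into the local hashtable under /tmp/hashtable, using the
# 'default' (uncompressed) namespace.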


def add_isolate_server_options(parser, add_indir):
  """Adds --isolate-server and --namespace options to parser.

  Includes --indir if desired.
  """
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if add_indir:
    parser.add_option(
        '--indir', metavar='DIR',
        help='Directory used to store the hashtable instead of using an '
             'isolate server.')


def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither
  is specified.
  """
  has_indir = hasattr(options, 'indir')
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  else:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') will result in netloc='', path='foo.com', which is
    # not what is desired here.
    new = list(parts)
    if not new[1] and new[2]:
      new[1] = new[2].rstrip('/')
      new[2] = ''
    new[2] = new[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(new)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  if file_path.is_url(options.indir):
    parser.error('Can\'t use a URL for --indir.')
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
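
# For example, passing -I foo.com/ parses as path-only ('foo.com/'), so the
# code above promotes it to the netloc and stores 'https://foo.com'; an
# explicit scheme such as http://host:8080/ is kept as-is apart from the
# trailing slash.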


def add_outdir_options(parser):
  """Adds --outdir, which is orthogonal to --isolate-server.

  Note: On upload, separate commands are used between 'archive' and
  'hashtable'. On 'download', the same command can download from either an
  isolate server or a file system.
  """
  parser.add_option(
      '-o', '--outdir', metavar='DIR',
      help='Directory used to recreate the tree.')


def process_outdir_options(parser, options, cwd):
  if not options.outdir:
    parser.error('--outdir is required.')
  if file_path.is_url(options.outdir):
    parser.error('Can\'t use a URL for --outdir.')
  options.outdir = unicode(options.outdir).replace('/', os.path.sep)
  # outdir doesn't need native path case since tracing is never done from there.
  options.outdir = os.path.abspath(
      os.path.normpath(os.path.join(cwd, options.outdir)))
  # In theory, we'd create the outdir directory right away. Defer doing it in
  # case there are errors in the command line.


class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))