- add third_party src.
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / isolateserver.py
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Archives a set of files to a server."""
7
8 __version__ = '0.2'
9
10 import functools
11 import hashlib
12 import json
13 import logging
14 import os
15 import re
16 import sys
17 import threading
18 import time
19 import urllib
20 import zlib
21
22 from third_party import colorama
23 from third_party.depot_tools import fix_encoding
24 from third_party.depot_tools import subcommand
25
26 from utils import net
27 from utils import threading_utils
28 from utils import tools
29
30
31 # Version of isolate protocol passed to the server in /handshake request.
32 ISOLATE_PROTOCOL_VERSION = '1.0'
33
34
35 # The number of files to check the isolate server per /pre-upload query.
36 # All files are sorted by likelihood of a change in the file content
37 # (currently file size is used to estimate this: larger the file -> larger the
38 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
39 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
40 # and so on. Numbers here is a trade-off; the more per request, the lower the
41 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42 # larger values cause longer lookups, increasing the initial latency to start
43 # uploading, which is especially an issue for large files. This value is
44 # optimized for the "few thousands files to look up with minimal number of large
45 # files missing" case.
46 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
47
48
49 # A list of already compressed extension types that should not receive any
50 # compression before being uploaded.
51 ALREADY_COMPRESSED_TYPES = [
52     '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53     'wav', 'zip'
54 ]
55
56
57 # The file size to be used when we don't know the correct file size,
58 # generally used for .isolated files.
59 UNKNOWN_FILE_SIZE = None
60
61
62 # The size of each chunk to read when downloading and unzipping files.
63 ZIPPED_FILE_CHUNK = 16 * 1024
64
65 # Chunk size to use when doing disk I/O.
66 DISK_FILE_CHUNK = 1024 * 1024
67
68 # Chunk size to use when reading from network stream.
69 NET_IO_FILE_CHUNK = 16 * 1024
70
71
72 # Read timeout in seconds for downloads from isolate storage. If there's no
73 # response from the server within this timeout whole download will be aborted.
74 DOWNLOAD_READ_TIMEOUT = 60
75
76 # Maximum expected delay (in seconds) between successive file fetches
77 # in run_tha_test. If it takes longer than that, a deadlock might be happening
78 # and all stack frames for all threads are dumped to log.
79 DEADLOCK_TIMEOUT = 5 * 60
80
81
82 # The delay (in seconds) to wait between logging statements when retrieving
83 # the required files. This is intended to let the user (or buildbot) know that
84 # the program is still running.
85 DELAY_BETWEEN_UPDATES_IN_SECS = 30
86
87
88 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
89 # specify the names here.
90 SUPPORTED_ALGOS = {
91   'md5': hashlib.md5,
92   'sha-1': hashlib.sha1,
93   'sha-512': hashlib.sha512,
94 }
95
96
97 # Used for serialization.
98 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
99
100
101 class ConfigError(ValueError):
102   """Generic failure to load a .isolated file."""
103   pass
104
105
106 class MappingError(OSError):
107   """Failed to recreate the tree."""
108   pass
109
110
111 def is_valid_hash(value, algo):
112   """Returns if the value is a valid hash for the corresponding algorithm."""
113   size = 2 * algo().digest_size
114   return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
115
116
117 def hash_file(filepath, algo):
118   """Calculates the hash of a file without reading it all in memory at once.
119
120   |algo| should be one of hashlib hashing algorithm.
121   """
122   digest = algo()
123   with open(filepath, 'rb') as f:
124     while True:
125       chunk = f.read(DISK_FILE_CHUNK)
126       if not chunk:
127         break
128       digest.update(chunk)
129   return digest.hexdigest()
130
131
132 def stream_read(stream, chunk_size):
133   """Reads chunks from |stream| and yields them."""
134   while True:
135     data = stream.read(chunk_size)
136     if not data:
137       break
138     yield data
139
140
141 def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
142   """Yields file content in chunks of given |chunk_size|."""
143   with open(filepath, 'rb') as f:
144     while True:
145       data = f.read(chunk_size)
146       if not data:
147         break
148       yield data
149
150
151 def file_write(filepath, content_generator):
152   """Writes file content as generated by content_generator.
153
154   Creates the intermediary directory as needed.
155
156   Returns the number of bytes written.
157
158   Meant to be mocked out in unit tests.
159   """
160   filedir = os.path.dirname(filepath)
161   if not os.path.isdir(filedir):
162     os.makedirs(filedir)
163   total = 0
164   with open(filepath, 'wb') as f:
165     for d in content_generator:
166       total += len(d)
167       f.write(d)
168   return total
169
170
171 def zip_compress(content_generator, level=7):
172   """Reads chunks from |content_generator| and yields zip compressed chunks."""
173   compressor = zlib.compressobj(level)
174   for chunk in content_generator:
175     compressed = compressor.compress(chunk)
176     if compressed:
177       yield compressed
178   tail = compressor.flush(zlib.Z_FINISH)
179   if tail:
180     yield tail
181
182
183 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
184   """Reads zipped data from |content_generator| and yields decompressed data.
185
186   Decompresses data in small chunks (no larger than |chunk_size|) so that
187   zip bomb file doesn't cause zlib to preallocate huge amount of memory.
188
189   Raises IOError if data is corrupted or incomplete.
190   """
191   decompressor = zlib.decompressobj()
192   compressed_size = 0
193   try:
194     for chunk in content_generator:
195       compressed_size += len(chunk)
196       data = decompressor.decompress(chunk, chunk_size)
197       if data:
198         yield data
199       while decompressor.unconsumed_tail:
200         data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
201         if data:
202           yield data
203     tail = decompressor.flush()
204     if tail:
205       yield tail
206   except zlib.error as e:
207     raise IOError(
208         'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
209   # Ensure all data was read and decompressed.
210   if decompressor.unused_data or decompressor.unconsumed_tail:
211     raise IOError('Not all data was decompressed')
212
213
214 def get_zip_compression_level(filename):
215   """Given a filename calculates the ideal zip compression level to use."""
216   file_ext = os.path.splitext(filename)[1].lower()
217   # TODO(csharp): Profile to find what compression level works best.
218   return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
219
220
221 def create_directories(base_directory, files):
222   """Creates the directory structure needed by the given list of files."""
223   logging.debug('create_directories(%s, %d)', base_directory, len(files))
224   # Creates the tree of directories to create.
225   directories = set(os.path.dirname(f) for f in files)
226   for item in list(directories):
227     while item:
228       directories.add(item)
229       item = os.path.dirname(item)
230   for d in sorted(directories):
231     if d:
232       os.mkdir(os.path.join(base_directory, d))
233
234
235 def create_links(base_directory, files):
236   """Creates any links needed by the given set of files."""
237   for filepath, properties in files:
238     if 'l' not in properties:
239       continue
240     if sys.platform == 'win32':
241       # TODO(maruel): Create junctions or empty text files similar to what
242       # cygwin do?
243       logging.warning('Ignoring symlink %s', filepath)
244       continue
245     outfile = os.path.join(base_directory, filepath)
246     # symlink doesn't exist on Windows. So the 'link' property should
247     # never be specified for windows .isolated file.
248     os.symlink(properties['l'], outfile)  # pylint: disable=E1101
249     if 'm' in properties:
250       lchmod = getattr(os, 'lchmod', None)
251       if lchmod:
252         lchmod(outfile, properties['m'])
253
254
255 def is_valid_file(filepath, size):
256   """Determines if the given files appears valid.
257
258   Currently it just checks the file's size.
259   """
260   if size == UNKNOWN_FILE_SIZE:
261     return os.path.isfile(filepath)
262   actual_size = os.stat(filepath).st_size
263   if size != actual_size:
264     logging.warning(
265         'Found invalid item %s; %d != %d',
266         os.path.basename(filepath), actual_size, size)
267     return False
268   return True
269
270
271 class WorkerPool(threading_utils.AutoRetryThreadPool):
272   """Thread pool that automatically retries on IOError and runs a preconfigured
273   function.
274   """
275   # Initial and maximum number of worker threads.
276   INITIAL_WORKERS = 2
277   MAX_WORKERS = 16
278   RETRIES = 5
279
280   def __init__(self):
281     super(WorkerPool, self).__init__(
282         [IOError],
283         self.RETRIES,
284         self.INITIAL_WORKERS,
285         self.MAX_WORKERS,
286         0,
287         'remote')
288
289
290 class Item(object):
291   """An item to push to Storage.
292
293   It starts its life in a main thread, travels to 'contains' thread, then to
294   'push' thread and then finally back to the main thread.
295
296   It is never used concurrently from multiple threads.
297   """
298
299   def __init__(self, digest, size, is_isolated=False):
300     self.digest = digest
301     self.size = size
302     self.is_isolated = is_isolated
303     self.compression_level = 6
304     self.push_state = None
305
306   def content(self, chunk_size):
307     """Iterable with content of this item in chunks of given size.
308
309     Arguments:
310       chunk_size: preferred size of the chunk to produce, may be ignored.
311     """
312     raise NotImplementedError()
313
314
315 class FileItem(Item):
316   """A file to push to Storage."""
317
318   def __init__(self, path, digest, size, is_isolated):
319     super(FileItem, self).__init__(digest, size, is_isolated)
320     self.path = path
321     self.compression_level = get_zip_compression_level(path)
322
323   def content(self, chunk_size):
324     return file_read(self.path, chunk_size)
325
326
327 class BufferItem(Item):
328   """A byte buffer to push to Storage."""
329
330   def __init__(self, buf, algo, is_isolated=False):
331     super(BufferItem, self).__init__(
332         algo(buf).hexdigest(), len(buf), is_isolated)
333     self.buffer = buf
334
335   def content(self, _chunk_size):
336     return [self.buffer]
337
338
339 class Storage(object):
340   """Efficiently downloads or uploads large set of files via StorageApi."""
341
342   def __init__(self, storage_api, use_zip):
343     self.use_zip = use_zip
344     self._storage_api = storage_api
345     self._cpu_thread_pool = None
346     self._net_thread_pool = None
347
348   @property
349   def cpu_thread_pool(self):
350     """ThreadPool for CPU-bound tasks like zipping."""
351     if self._cpu_thread_pool is None:
352       self._cpu_thread_pool = threading_utils.ThreadPool(
353           2, max(threading_utils.num_processors(), 2), 0, 'zip')
354     return self._cpu_thread_pool
355
356   @property
357   def net_thread_pool(self):
358     """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
359     if self._net_thread_pool is None:
360       self._net_thread_pool = WorkerPool()
361     return self._net_thread_pool
362
363   def close(self):
364     """Waits for all pending tasks to finish."""
365     if self._cpu_thread_pool:
366       self._cpu_thread_pool.join()
367       self._cpu_thread_pool.close()
368       self._cpu_thread_pool = None
369     if self._net_thread_pool:
370       self._net_thread_pool.join()
371       self._net_thread_pool.close()
372       self._net_thread_pool = None
373
374   def __enter__(self):
375     """Context manager interface."""
376     return self
377
378   def __exit__(self, _exc_type, _exc_value, _traceback):
379     """Context manager interface."""
380     self.close()
381     return False
382
383   def upload_tree(self, indir, infiles):
384     """Uploads the given tree to the isolate server.
385
386     Arguments:
387       indir: root directory the infiles are based in.
388       infiles: dict of files to upload from |indir|.
389
390     Returns:
391       List of items that were uploaded. All other items are already there.
392     """
393     logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
394
395     # Convert |indir| + |infiles| into a list of FileItem objects.
396     # Filter out symlinks, since they are not represented by items on isolate
397     # server side.
398     items = [
399         FileItem(
400             path=os.path.join(indir, filepath),
401             digest=metadata['h'],
402             size=metadata['s'],
403             is_isolated=metadata.get('priority') == '0')
404         for filepath, metadata in infiles.iteritems()
405         if 'l' not in metadata
406     ]
407
408     return self.upload_items(items)
409
410   def upload_items(self, items):
411     """Uploads bunch of items to the isolate server.
412
413     Will upload only items that are missing.
414
415     Arguments:
416       items: list of Item instances that represents data to upload.
417
418     Returns:
419       List of items that were uploaded. All other items are already there.
420     """
421     # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
422     # used by swarming.py. There's no need to spawn multiple threads and try to
423     # do stuff in parallel: there's nothing to parallelize. 'contains' check and
424     # 'push' should be performed sequentially in the context of current thread.
425
426     # For each digest keep only first Item that matches it. All other items
427     # are just indistinguishable copies from the point of view of isolate
428     # server (it doesn't care about paths at all, only content and digests).
429     seen = {}
430     duplicates = 0
431     for item in items:
432       if seen.setdefault(item.digest, item) is not item:
433         duplicates += 1
434     items = seen.values()
435     if duplicates:
436       logging.info('Skipped %d duplicated files', duplicates)
437
438     # Enqueue all upload tasks.
439     missing = set()
440     channel = threading_utils.TaskChannel()
441     for missing_item in self.get_missing_items(items):
442       missing.add(missing_item)
443       self.async_push(
444           channel,
445           WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
446           missing_item)
447
448     uploaded = []
449     # No need to spawn deadlock detector thread if there's nothing to upload.
450     if missing:
451       with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
452         # Wait for all started uploads to finish.
453         while len(uploaded) != len(missing):
454           detector.ping()
455           item = channel.pull()
456           uploaded.append(item)
457           logging.debug(
458               'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
459     logging.info('All files are uploaded')
460
461     # Print stats.
462     total = len(items)
463     total_size = sum(f.size for f in items)
464     logging.info(
465         'Total:      %6d, %9.1fkb',
466         total,
467         total_size / 1024.)
468     cache_hit = set(items) - missing
469     cache_hit_size = sum(f.size for f in cache_hit)
470     logging.info(
471         'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
472         len(cache_hit),
473         cache_hit_size / 1024.,
474         len(cache_hit) * 100. / total,
475         cache_hit_size * 100. / total_size if total_size else 0)
476     cache_miss = missing
477     cache_miss_size = sum(f.size for f in cache_miss)
478     logging.info(
479         'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
480         len(cache_miss),
481         cache_miss_size / 1024.,
482         len(cache_miss) * 100. / total,
483         cache_miss_size * 100. / total_size if total_size else 0)
484
485     return uploaded
486
487   def get_fetch_url(self, digest):
488     """Returns an URL that can be used to fetch an item with given digest.
489
490     Arguments:
491       digest: hex digest of item to fetch.
492
493     Returns:
494       An URL or None if underlying protocol doesn't support this.
495     """
496     return self._storage_api.get_fetch_url(digest)
497
498   def async_push(self, channel, priority, item):
499     """Starts asynchronous push to the server in a parallel thread.
500
501     Arguments:
502       channel: TaskChannel that receives back |item| when upload ends.
503       priority: thread pool task priority for the push.
504       item: item to upload as instance of Item class.
505     """
506     def push(content):
507       """Pushes an item and returns its id, to pass as a result to |channel|."""
508       self._storage_api.push(item, content)
509       return item
510
511     # If zipping is not required, just start a push task.
512     if not self.use_zip:
513       self.net_thread_pool.add_task_with_channel(channel, priority, push,
514           item.content(DISK_FILE_CHUNK))
515       return
516
517     # If zipping is enabled, zip in a separate thread.
518     def zip_and_push():
519       # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
520       # content right here. It will block until all file is zipped.
521       try:
522         stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
523             item.compression_level)
524         data = ''.join(stream)
525       except Exception as exc:
526         logging.error('Failed to zip \'%s\': %s', item, exc)
527         channel.send_exception(exc)
528         return
529       self.net_thread_pool.add_task_with_channel(
530           channel, priority, push, [data])
531     self.cpu_thread_pool.add_task(priority, zip_and_push)
532
533   def async_fetch(self, channel, priority, digest, size, sink):
534     """Starts asynchronous fetch from the server in a parallel thread.
535
536     Arguments:
537       channel: TaskChannel that receives back |digest| when download ends.
538       priority: thread pool task priority for the fetch.
539       digest: hex digest of an item to download.
540       size: expected size of the item (after decompression).
541       sink: function that will be called as sink(generator).
542     """
543     def fetch():
544       try:
545         # Prepare reading pipeline.
546         stream = self._storage_api.fetch(digest)
547         if self.use_zip:
548           stream = zip_decompress(stream, DISK_FILE_CHUNK)
549         # Run |stream| through verifier that will assert its size.
550         verifier = FetchStreamVerifier(stream, size)
551         # Verified stream goes to |sink|.
552         sink(verifier.run())
553       except Exception as err:
554         logging.warning('Failed to fetch %s: %s', digest, err)
555         raise
556       return digest
557
558     # Don't bother with zip_thread_pool for decompression. Decompression is
559     # really fast and most probably IO bound anyway.
560     self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
561
562   def get_missing_items(self, items):
563     """Yields items that are missing from the server.
564
565     Issues multiple parallel queries via StorageApi's 'contains' method.
566
567     Arguments:
568       items: a list of Item objects to check.
569
570     Yields:
571       Item objects that are missing from the server.
572     """
573     channel = threading_utils.TaskChannel()
574     pending = 0
575     # Enqueue all requests.
576     for batch in self.batch_items_for_check(items):
577       self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
578           self._storage_api.contains, batch)
579       pending += 1
580     # Yield results as they come in.
581     for _ in xrange(pending):
582       for missing in channel.pull():
583         yield missing
584
585   @staticmethod
586   def batch_items_for_check(items):
587     """Splits list of items to check for existence on the server into batches.
588
589     Each batch corresponds to a single 'exists?' query to the server via a call
590     to StorageApi's 'contains' method.
591
592     Arguments:
593       items: a list of Item objects.
594
595     Yields:
596       Batches of items to query for existence in a single operation,
597       each batch is a list of Item objects.
598     """
599     batch_count = 0
600     batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
601     next_queries = []
602     for item in sorted(items, key=lambda x: x.size, reverse=True):
603       next_queries.append(item)
604       if len(next_queries) == batch_size_limit:
605         yield next_queries
606         next_queries = []
607         batch_count += 1
608         batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
609             min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
610     if next_queries:
611       yield next_queries
612
613
614 class FetchQueue(object):
615   """Fetches items from Storage and places them into LocalCache.
616
617   It manages multiple concurrent fetch operations. Acts as a bridge between
618   Storage and LocalCache so that Storage and LocalCache don't depend on each
619   other at all.
620   """
621
622   def __init__(self, storage, cache):
623     self.storage = storage
624     self.cache = cache
625     self._channel = threading_utils.TaskChannel()
626     self._pending = set()
627     self._accessed = set()
628     self._fetched = cache.cached_set()
629
630   def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
631     """Starts asynchronous fetch of item |digest|."""
632     # Fetching it now?
633     if digest in self._pending:
634       return
635
636     # Mark this file as in use, verify_all_cached will later ensure it is still
637     # in cache.
638     self._accessed.add(digest)
639
640     # Already fetched? Notify cache to update item's LRU position.
641     if digest in self._fetched:
642       # 'touch' returns True if item is in cache and not corrupted.
643       if self.cache.touch(digest, size):
644         return
645       # Item is corrupted, remove it from cache and fetch it again.
646       self._fetched.remove(digest)
647       self.cache.evict(digest)
648
649     # TODO(maruel): It should look at the free disk space, the current cache
650     # size and the size of the new item on every new item:
651     # - Trim the cache as more entries are listed when free disk space is low,
652     #   otherwise if the amount of data downloaded during the run > free disk
653     #   space, it'll crash.
654     # - Make sure there's enough free disk space to fit all dependencies of
655     #   this run! If not, abort early.
656
657     # Start fetching.
658     self._pending.add(digest)
659     self.storage.async_fetch(
660         self._channel, priority, digest, size,
661         functools.partial(self.cache.write, digest))
662
663   def wait(self, digests):
664     """Starts a loop that waits for at least one of |digests| to be retrieved.
665
666     Returns the first digest retrieved.
667     """
668     # Flush any already fetched items.
669     for digest in digests:
670       if digest in self._fetched:
671         return digest
672
673     # Ensure all requested items are being fetched now.
674     assert all(digest in self._pending for digest in digests), (
675         digests, self._pending)
676
677     # Wait for some requested item to finish fetching.
678     while self._pending:
679       digest = self._channel.pull()
680       self._pending.remove(digest)
681       self._fetched.add(digest)
682       if digest in digests:
683         return digest
684
685     # Should never reach this point due to assert above.
686     raise RuntimeError('Impossible state')
687
688   def inject_local_file(self, path, algo):
689     """Adds local file to the cache as if it was fetched from storage."""
690     with open(path, 'rb') as f:
691       data = f.read()
692     digest = algo(data).hexdigest()
693     self.cache.write(digest, [data])
694     self._fetched.add(digest)
695     return digest
696
697   @property
698   def pending_count(self):
699     """Returns number of items to be fetched."""
700     return len(self._pending)
701
702   def verify_all_cached(self):
703     """True if all accessed items are in cache."""
704     return self._accessed.issubset(self.cache.cached_set())
705
706
707 class FetchStreamVerifier(object):
708   """Verifies that fetched file is valid before passing it to the LocalCache."""
709
710   def __init__(self, stream, expected_size):
711     self.stream = stream
712     self.expected_size = expected_size
713     self.current_size = 0
714
715   def run(self):
716     """Generator that yields same items as |stream|.
717
718     Verifies |stream| is complete before yielding a last chunk to consumer.
719
720     Also wraps IOError produced by consumer into MappingError exceptions since
721     otherwise Storage will retry fetch on unrelated local cache errors.
722     """
723     # Read one chunk ahead, keep it in |stored|.
724     # That way a complete stream can be verified before pushing last chunk
725     # to consumer.
726     stored = None
727     for chunk in self.stream:
728       assert chunk is not None
729       if stored is not None:
730         self._inspect_chunk(stored, is_last=False)
731         try:
732           yield stored
733         except IOError as exc:
734           raise MappingError('Failed to store an item in cache: %s' % exc)
735       stored = chunk
736     if stored is not None:
737       self._inspect_chunk(stored, is_last=True)
738       try:
739         yield stored
740       except IOError as exc:
741         raise MappingError('Failed to store an item in cache: %s' % exc)
742
743   def _inspect_chunk(self, chunk, is_last):
744     """Called for each fetched chunk before passing it to consumer."""
745     self.current_size += len(chunk)
746     if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
747         (self.expected_size != self.current_size)):
748       raise IOError('Incorrect file size: expected %d, got %d' % (
749           self.expected_size, self.current_size))
750
751
752 class StorageApi(object):
753   """Interface for classes that implement low-level storage operations."""
754
755   def get_fetch_url(self, digest):
756     """Returns an URL that can be used to fetch an item with given digest.
757
758     Arguments:
759       digest: hex digest of item to fetch.
760
761     Returns:
762       An URL or None if the protocol doesn't support this.
763     """
764     raise NotImplementedError()
765
766   def fetch(self, digest):
767     """Fetches an object and yields its content.
768
769     Arguments:
770       digest: hash digest of item to download.
771
772     Yields:
773       Chunks of downloaded item (as str objects).
774     """
775     raise NotImplementedError()
776
777   def push(self, item, content):
778     """Uploads an |item| with content generated by |content| generator.
779
780     Arguments:
781       item: Item object that holds information about an item being pushed.
782       content: a generator that yields chunks to push.
783
784     Returns:
785       None.
786     """
787     raise NotImplementedError()
788
789   def contains(self, items):
790     """Checks for existence of given |items| on the server.
791
792     Mutates |items| by assigning opaque implement specific object to Item's
793     push_state attribute on missing entries in the datastore.
794
795     Arguments:
796       items: list of Item objects.
797
798     Returns:
799       A list of items missing on server as a list of Item objects.
800     """
801     raise NotImplementedError()
802
803
804 class IsolateServer(StorageApi):
805   """StorageApi implementation that downloads and uploads to Isolate Server.
806
807   It uploads and downloads directly from Google Storage whenever appropriate.
808   """
809
810   class _PushState(object):
811     """State needed to call .push(), to be stored in Item.push_state."""
812     def __init__(self, upload_url, finalize_url):
813       self.upload_url = upload_url
814       self.finalize_url = finalize_url
815       self.uploaded = False
816       self.finalized = False
817
818   def __init__(self, base_url, namespace):
819     super(IsolateServer, self).__init__()
820     assert base_url.startswith('http'), base_url
821     self.base_url = base_url.rstrip('/')
822     self.namespace = namespace
823     self._lock = threading.Lock()
824     self._server_caps = None
825
826   @staticmethod
827   def _generate_handshake_request():
828     """Returns a dict to be sent as handshake request body."""
829     # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
830     return {
831         'client_app_version': __version__,
832         'fetcher': True,
833         'protocol_version': ISOLATE_PROTOCOL_VERSION,
834         'pusher': True,
835     }
836
837   @staticmethod
838   def _validate_handshake_response(caps):
839     """Validates and normalizes handshake response."""
840     logging.info('Protocol version: %s', caps['protocol_version'])
841     logging.info('Server version: %s', caps['server_app_version'])
842     if caps.get('error'):
843       raise MappingError(caps['error'])
844     if not caps['access_token']:
845       raise ValueError('access_token is missing')
846     return caps
847
848   @property
849   def _server_capabilities(self):
850     """Performs handshake with the server if not yet done.
851
852     Returns:
853       Server capabilities dictionary as returned by /handshake endpoint.
854
855     Raises:
856       MappingError if server rejects the handshake.
857     """
858     # TODO(maruel): Make this request much earlier asynchronously while the
859     # files are being enumerated.
860     with self._lock:
861       if self._server_caps is None:
862         request_body = json.dumps(
863             self._generate_handshake_request(), separators=(',', ':'))
864         response = net.url_read(
865             url=self.base_url + '/content-gs/handshake',
866             data=request_body,
867             content_type='application/json',
868             method='POST')
869         if response is None:
870           raise MappingError('Failed to perform handshake.')
871         try:
872           caps = json.loads(response)
873           if not isinstance(caps, dict):
874             raise ValueError('Expecting JSON dict')
875           self._server_caps = self._validate_handshake_response(caps)
876         except (ValueError, KeyError, TypeError) as exc:
877           # KeyError exception has very confusing str conversion: it's just a
878           # missing key value and nothing else. So print exception class name
879           # as well.
880           raise MappingError('Invalid handshake response (%s): %s' % (
881               exc.__class__.__name__, exc))
882       return self._server_caps
883
884   def get_fetch_url(self, digest):
885     assert isinstance(digest, basestring)
886     return '%s/content-gs/retrieve/%s/%s' % (
887         self.base_url, self.namespace, digest)
888
889   def fetch(self, digest):
890     source_url = self.get_fetch_url(digest)
891     logging.debug('download_file(%s)', source_url)
892
893     # Because the app engine DB is only eventually consistent, retry 404 errors
894     # because the file might just not be visible yet (even though it has been
895     # uploaded).
896     connection = net.url_open(
897         source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
898     if not connection:
899       raise IOError('Unable to open connection to %s' % source_url)
900     return stream_read(connection, NET_IO_FILE_CHUNK)
901
902   def push(self, item, content):
903     assert isinstance(item, Item)
904     assert isinstance(item.push_state, IsolateServer._PushState)
905     assert not item.push_state.finalized
906
907     # TODO(vadimsh): Do not read from |content| generator when retrying push.
908     # If |content| is indeed a generator, it can not be re-winded back
909     # to the beginning of the stream. A retry will find it exhausted. A possible
910     # solution is to wrap |content| generator with some sort of caching
911     # restartable generator. It should be done alongside streaming support
912     # implementation.
913
914     # This push operation may be a retry after failed finalization call below,
915     # no need to reupload contents in that case.
916     if not item.push_state.uploaded:
917       # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
918       # upload support is implemented.
919       if isinstance(content, list) and len(content) == 1:
920         content = content[0]
921       else:
922         content = ''.join(content)
923       # PUT file to |upload_url|.
924       response = net.url_read(
925           url=item.push_state.upload_url,
926           data=content,
927           content_type='application/octet-stream',
928           method='PUT')
929       if response is None:
930         raise IOError('Failed to upload a file %s to %s' % (
931             item.digest, item.push_state.upload_url))
932       item.push_state.uploaded = True
933     else:
934       logging.info(
935           'A file %s already uploaded, retrying finalization only', item.digest)
936
937     # Optionally notify the server that it's done.
938     if item.push_state.finalize_url:
939       # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
940       # send it to isolated server. That way isolate server can verify that
941       # the data safely reached Google Storage (GS provides MD5 and CRC32C of
942       # stored files).
943       response = net.url_read(
944           url=item.push_state.finalize_url,
945           data='',
946           content_type='application/json',
947           method='POST')
948       if response is None:
949         raise IOError('Failed to finalize an upload of %s' % item.digest)
950     item.push_state.finalized = True
951
952   def contains(self, items):
953     logging.info('Checking existence of %d files...', len(items))
954
955     # Request body is a json encoded list of dicts.
956     body = [
957         {
958           'h': item.digest,
959           's': item.size,
960           'i': int(item.is_isolated),
961         } for item in items
962     ]
963
964     query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
965         self.base_url,
966         self.namespace,
967         urllib.quote(self._server_capabilities['access_token']))
968     response_body = net.url_read(
969         url=query_url,
970         data=json.dumps(body, separators=(',', ':')),
971         content_type='application/json',
972         method='POST')
973     if response_body is None:
974       raise MappingError('Failed to execute /pre-upload query')
975
976     # Response body is a list of push_urls (or null if file is already present).
977     try:
978       response = json.loads(response_body)
979       if not isinstance(response, list):
980         raise ValueError('Expecting response with json-encoded list')
981       if len(response) != len(items):
982         raise ValueError(
983             'Incorrect number of items in the list, expected %d, '
984             'but got %d' % (len(items), len(response)))
985     except ValueError as err:
986       raise MappingError(
987           'Invalid response from server: %s, body is %s' % (err, response_body))
988
989     # Pick Items that are missing, attach _PushState to them.
990     missing_items = []
991     for i, push_urls in enumerate(response):
992       if push_urls:
993         assert len(push_urls) == 2, str(push_urls)
994         item = items[i]
995         assert item.push_state is None
996         item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
997         missing_items.append(item)
998     logging.info('Queried %d files, %d cache hit',
999         len(items), len(items) - len(missing_items))
1000     return missing_items
1001
1002
1003 class FileSystem(StorageApi):
1004   """StorageApi implementation that fetches data from the file system.
1005
1006   The common use case is a NFS/CIFS file server that is mounted locally that is
1007   used to fetch the file on a local partition.
1008   """
1009
1010   def __init__(self, base_path):
1011     super(FileSystem, self).__init__()
1012     self.base_path = base_path
1013
1014   def get_fetch_url(self, digest):
1015     return None
1016
1017   def fetch(self, digest):
1018     assert isinstance(digest, basestring)
1019     return file_read(os.path.join(self.base_path, digest))
1020
1021   def push(self, item, content):
1022     assert isinstance(item, Item)
1023     file_write(os.path.join(self.base_path, item.digest), content)
1024
1025   def contains(self, items):
1026     return [
1027         item for item in items
1028         if not os.path.exists(os.path.join(self.base_path, item.digest))
1029     ]
1030
1031
1032 class LocalCache(object):
1033   """Local cache that stores objects fetched via Storage.
1034
1035   It can be accessed concurrently from multiple threads, so it should protect
1036   its internal state with some lock.
1037   """
1038
1039   def __enter__(self):
1040     """Context manager interface."""
1041     return self
1042
1043   def __exit__(self, _exc_type, _exec_value, _traceback):
1044     """Context manager interface."""
1045     return False
1046
1047   def cached_set(self):
1048     """Returns a set of all cached digests (always a new object)."""
1049     raise NotImplementedError()
1050
1051   def touch(self, digest, size):
1052     """Ensures item is not corrupted and updates its LRU position.
1053
1054     Arguments:
1055       digest: hash digest of item to check.
1056       size: expected size of this item.
1057
1058     Returns:
1059       True if item is in cache and not corrupted.
1060     """
1061     raise NotImplementedError()
1062
1063   def evict(self, digest):
1064     """Removes item from cache if it's there."""
1065     raise NotImplementedError()
1066
1067   def read(self, digest):
1068     """Returns contents of the cached item as a single str."""
1069     raise NotImplementedError()
1070
1071   def write(self, digest, content):
1072     """Reads data from |content| generator and stores it in cache."""
1073     raise NotImplementedError()
1074
1075   def link(self, digest, dest, file_mode=None):
1076     """Ensures file at |dest| has same content as cached |digest|."""
1077     raise NotImplementedError()
1078
1079
1080 class MemoryCache(LocalCache):
1081   """LocalCache implementation that stores everything in memory."""
1082
1083   def __init__(self):
1084     super(MemoryCache, self).__init__()
1085     # Let's not assume dict is thread safe.
1086     self._lock = threading.Lock()
1087     self._contents = {}
1088
1089   def cached_set(self):
1090     with self._lock:
1091       return set(self._contents)
1092
1093   def touch(self, digest, size):
1094     with self._lock:
1095       return digest in self._contents
1096
1097   def evict(self, digest):
1098     with self._lock:
1099       self._contents.pop(digest, None)
1100
1101   def read(self, digest):
1102     with self._lock:
1103       return self._contents[digest]
1104
1105   def write(self, digest, content):
1106     # Assemble whole stream before taking the lock.
1107     data = ''.join(content)
1108     with self._lock:
1109       self._contents[digest] = data
1110
1111   def link(self, digest, dest, file_mode=None):
1112     file_write(dest, [self.read(digest)])
1113     if file_mode is not None:
1114       os.chmod(dest, file_mode)
1115
1116
1117 def get_hash_algo(_namespace):
1118   """Return hash algorithm class to use when uploading to given |namespace|."""
1119   # TODO(vadimsh): Implement this at some point.
1120   return hashlib.sha1
1121
1122
1123 def is_namespace_with_compression(namespace):
1124   """Returns True if given |namespace| stores compressed objects."""
1125   return namespace.endswith(('-gzip', '-deflate'))
1126
1127
1128 def get_storage_api(file_or_url, namespace):
1129   """Returns an object that implements StorageApi interface."""
1130   if re.match(r'^https?://.+$', file_or_url):
1131     return IsolateServer(file_or_url, namespace)
1132   else:
1133     return FileSystem(file_or_url)
1134
1135
1136 def get_storage(file_or_url, namespace):
1137   """Returns Storage class configured with appropriate StorageApi instance."""
1138   return Storage(
1139       get_storage_api(file_or_url, namespace),
1140       is_namespace_with_compression(namespace))
1141
1142
1143 def upload_tree(base_url, indir, infiles, namespace):
1144   """Uploads the given tree to the given url.
1145
1146   Arguments:
1147     base_url:  The base url, it is assume that |base_url|/has/ can be used to
1148                query if an element was already uploaded, and |base_url|/store/
1149                can be used to upload a new element.
1150     indir:     Root directory the infiles are based in.
1151     infiles:   dict of files to upload from |indir| to |base_url|.
1152     namespace: The namespace to use on the server.
1153   """
1154   with get_storage(base_url, namespace) as storage:
1155     storage.upload_tree(indir, infiles)
1156   return 0
1157
1158
1159 def load_isolated(content, os_flavor, algo):
1160   """Verifies the .isolated file is valid and loads this object with the json
1161   data.
1162
1163   Arguments:
1164   - content: raw serialized content to load.
1165   - os_flavor: OS to load this file on. Optional.
1166   - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1167           algorithm used on the Isolate Server.
1168   """
1169   try:
1170     data = json.loads(content)
1171   except ValueError:
1172     raise ConfigError('Failed to parse: %s...' % content[:100])
1173
1174   if not isinstance(data, dict):
1175     raise ConfigError('Expected dict, got %r' % data)
1176
1177   # Check 'version' first, since it could modify the parsing after.
1178   value = data.get('version', '1.0')
1179   if not isinstance(value, basestring):
1180     raise ConfigError('Expected string, got %r' % value)
1181   if not re.match(r'^(\d+)\.(\d+)$', value):
1182     raise ConfigError('Expected a compatible version, got %r' % value)
1183   if value.split('.', 1)[0] != '1':
1184     raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
1185
1186   if algo is None:
1187     # Default the algorithm used in the .isolated file itself, falls back to
1188     # 'sha-1' if unspecified.
1189     algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
1190
1191   for key, value in data.iteritems():
1192     if key == 'algo':
1193       if not isinstance(value, basestring):
1194         raise ConfigError('Expected string, got %r' % value)
1195       if value not in SUPPORTED_ALGOS:
1196         raise ConfigError(
1197             'Expected one of \'%s\', got %r' %
1198             (', '.join(sorted(SUPPORTED_ALGOS)), value))
1199       if value != SUPPORTED_ALGOS_REVERSE[algo]:
1200         raise ConfigError(
1201             'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1202
1203     elif key == 'command':
1204       if not isinstance(value, list):
1205         raise ConfigError('Expected list, got %r' % value)
1206       if not value:
1207         raise ConfigError('Expected non-empty command')
1208       for subvalue in value:
1209         if not isinstance(subvalue, basestring):
1210           raise ConfigError('Expected string, got %r' % subvalue)
1211
1212     elif key == 'files':
1213       if not isinstance(value, dict):
1214         raise ConfigError('Expected dict, got %r' % value)
1215       for subkey, subvalue in value.iteritems():
1216         if not isinstance(subkey, basestring):
1217           raise ConfigError('Expected string, got %r' % subkey)
1218         if not isinstance(subvalue, dict):
1219           raise ConfigError('Expected dict, got %r' % subvalue)
1220         for subsubkey, subsubvalue in subvalue.iteritems():
1221           if subsubkey == 'l':
1222             if not isinstance(subsubvalue, basestring):
1223               raise ConfigError('Expected string, got %r' % subsubvalue)
1224           elif subsubkey == 'm':
1225             if not isinstance(subsubvalue, int):
1226               raise ConfigError('Expected int, got %r' % subsubvalue)
1227           elif subsubkey == 'h':
1228             if not is_valid_hash(subsubvalue, algo):
1229               raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1230           elif subsubkey == 's':
1231             if not isinstance(subsubvalue, int):
1232               raise ConfigError('Expected int, got %r' % subsubvalue)
1233           else:
1234             raise ConfigError('Unknown subsubkey %s' % subsubkey)
1235         if bool('h' in subvalue) == bool('l' in subvalue):
1236           raise ConfigError(
1237               'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1238               subvalue)
1239         if bool('h' in subvalue) != bool('s' in subvalue):
1240           raise ConfigError(
1241               'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1242               subvalue)
1243         if bool('s' in subvalue) == bool('l' in subvalue):
1244           raise ConfigError(
1245               'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1246               subvalue)
1247         if bool('l' in subvalue) and bool('m' in subvalue):
1248           raise ConfigError(
1249               'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
1250               subvalue)
1251
1252     elif key == 'includes':
1253       if not isinstance(value, list):
1254         raise ConfigError('Expected list, got %r' % value)
1255       if not value:
1256         raise ConfigError('Expected non-empty includes list')
1257       for subvalue in value:
1258         if not is_valid_hash(subvalue, algo):
1259           raise ConfigError('Expected sha-1, got %r' % subvalue)
1260
1261     elif key == 'read_only':
1262       if not isinstance(value, bool):
1263         raise ConfigError('Expected bool, got %r' % value)
1264
1265     elif key == 'relative_cwd':
1266       if not isinstance(value, basestring):
1267         raise ConfigError('Expected string, got %r' % value)
1268
1269     elif key == 'os':
1270       if os_flavor and value != os_flavor:
1271         raise ConfigError(
1272             'Expected \'os\' to be \'%s\' but got \'%s\'' %
1273             (os_flavor, value))
1274
1275     elif key == 'version':
1276       # Already checked above.
1277       pass
1278
1279     else:
1280       raise ConfigError('Unknown key %r' % key)
1281
1282   # Automatically fix os.path.sep if necessary. While .isolated files are always
1283   # in the the native path format, someone could want to download an .isolated
1284   # tree from another OS.
1285   wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1286   if 'files' in data:
1287     data['files'] = dict(
1288         (k.replace(wrong_path_sep, os.path.sep), v)
1289         for k, v in data['files'].iteritems())
1290     for v in data['files'].itervalues():
1291       if 'l' in v:
1292         v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1293   if 'relative_cwd' in data:
1294     data['relative_cwd'] = data['relative_cwd'].replace(
1295         wrong_path_sep, os.path.sep)
1296   return data
1297
1298
1299 class IsolatedFile(object):
1300   """Represents a single parsed .isolated file."""
1301   def __init__(self, obj_hash, algo):
1302     """|obj_hash| is really the sha-1 of the file."""
1303     logging.debug('IsolatedFile(%s)' % obj_hash)
1304     self.obj_hash = obj_hash
1305     self.algo = algo
1306     # Set once all the left-side of the tree is parsed. 'Tree' here means the
1307     # .isolate and all the .isolated files recursively included by it with
1308     # 'includes' key. The order of each sha-1 in 'includes', each representing a
1309     # .isolated file in the hash table, is important, as the later ones are not
1310     # processed until the firsts are retrieved and read.
1311     self.can_fetch = False
1312
1313     # Raw data.
1314     self.data = {}
1315     # A IsolatedFile instance, one per object in self.includes.
1316     self.children = []
1317
1318     # Set once the .isolated file is loaded.
1319     self._is_parsed = False
1320     # Set once the files are fetched.
1321     self.files_fetched = False
1322
1323   def load(self, os_flavor, content):
1324     """Verifies the .isolated file is valid and loads this object with the json
1325     data.
1326     """
1327     logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1328     assert not self._is_parsed
1329     self.data = load_isolated(content, os_flavor, self.algo)
1330     self.children = [
1331         IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1332     ]
1333     self._is_parsed = True
1334
1335   def fetch_files(self, fetch_queue, files):
1336     """Adds files in this .isolated file not present in |files| dictionary.
1337
1338     Preemptively request files.
1339
1340     Note that |files| is modified by this function.
1341     """
1342     assert self.can_fetch
1343     if not self._is_parsed or self.files_fetched:
1344       return
1345     logging.debug('fetch_files(%s)' % self.obj_hash)
1346     for filepath, properties in self.data.get('files', {}).iteritems():
1347       # Root isolated has priority on the files being mapped. In particular,
1348       # overriden files must not be fetched.
1349       if filepath not in files:
1350         files[filepath] = properties
1351         if 'h' in properties:
1352           # Preemptively request files.
1353           logging.debug('fetching %s' % filepath)
1354           fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
1355     self.files_fetched = True
1356
1357
1358 class Settings(object):
1359   """Results of a completely parsed .isolated file."""
1360   def __init__(self):
1361     self.command = []
1362     self.files = {}
1363     self.read_only = None
1364     self.relative_cwd = None
1365     # The main .isolated file, a IsolatedFile instance.
1366     self.root = None
1367
1368   def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
1369     """Loads the .isolated and all the included .isolated asynchronously.
1370
1371     It enables support for "included" .isolated files. They are processed in
1372     strict order but fetched asynchronously from the cache. This is important so
1373     that a file in an included .isolated file that is overridden by an embedding
1374     .isolated file is not fetched needlessly. The includes are fetched in one
1375     pass and the files are fetched as soon as all the ones on the left-side
1376     of the tree were fetched.
1377
1378     The prioritization is very important here for nested .isolated files.
1379     'includes' have the highest priority and the algorithm is optimized for both
1380     deep and wide trees. A deep one is a long link of .isolated files referenced
1381     one at a time by one item in 'includes'. A wide one has a large number of
1382     'includes' in a single .isolated file. 'left' is defined as an included
1383     .isolated file earlier in the 'includes' list. So the order of the elements
1384     in 'includes' is important.
1385     """
1386     self.root = IsolatedFile(root_isolated_hash, algo)
1387
1388     # Isolated files being retrieved now: hash -> IsolatedFile instance.
1389     pending = {}
1390     # Set of hashes of already retrieved items to refuse recursive includes.
1391     seen = set()
1392
1393     def retrieve(isolated_file):
1394       h = isolated_file.obj_hash
1395       if h in seen:
1396         raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
1397       assert h not in pending
1398       seen.add(h)
1399       pending[h] = isolated_file
1400       fetch_queue.add(WorkerPool.HIGH, h)
1401
1402     retrieve(self.root)
1403
1404     while pending:
1405       item_hash = fetch_queue.wait(pending)
1406       item = pending.pop(item_hash)
1407       item.load(os_flavor, fetch_queue.cache.read(item_hash))
1408       if item_hash == root_isolated_hash:
1409         # It's the root item.
1410         item.can_fetch = True
1411
1412       for new_child in item.children:
1413         retrieve(new_child)
1414
1415       # Traverse the whole tree to see if files can now be fetched.
1416       self._traverse_tree(fetch_queue, self.root)
1417
1418     def check(n):
1419       return all(check(x) for x in n.children) and n.files_fetched
1420     assert check(self.root)
1421
1422     self.relative_cwd = self.relative_cwd or ''
1423     self.read_only = self.read_only or False
1424
1425   def _traverse_tree(self, fetch_queue, node):
1426     if node.can_fetch:
1427       if not node.files_fetched:
1428         self._update_self(fetch_queue, node)
1429       will_break = False
1430       for i in node.children:
1431         if not i.can_fetch:
1432           if will_break:
1433             break
1434           # Automatically mark the first one as fetcheable.
1435           i.can_fetch = True
1436           will_break = True
1437         self._traverse_tree(fetch_queue, i)
1438
1439   def _update_self(self, fetch_queue, node):
1440     node.fetch_files(fetch_queue, self.files)
1441     # Grabs properties.
1442     if not self.command and node.data.get('command'):
1443       # Ensure paths are correctly separated on windows.
1444       self.command = node.data['command']
1445       if self.command:
1446         self.command[0] = self.command[0].replace('/', os.path.sep)
1447         self.command = tools.fix_python_path(self.command)
1448     if self.read_only is None and node.data.get('read_only') is not None:
1449       self.read_only = node.data['read_only']
1450     if (self.relative_cwd is None and
1451         node.data.get('relative_cwd') is not None):
1452       self.relative_cwd = node.data['relative_cwd']
1453
1454
1455 def fetch_isolated(
1456     isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
1457   """Aggressively downloads the .isolated file(s), then download all the files.
1458
1459   Arguments:
1460     isolated_hash: hash of the root *.isolated file.
1461     storage: Storage class that communicates with isolate storage.
1462     cache: LocalCache class that knows how to store and map files locally.
1463     algo: hash algorithm to use.
1464     outdir: Output directory to map file tree to.
1465     os_flavor: OS flavor to choose when reading sections of *.isolated file.
1466     require_command: Ensure *.isolated specifies a command to run.
1467
1468   Returns:
1469     Settings object that holds details about loaded *.isolated file.
1470   """
1471   with cache:
1472     fetch_queue = FetchQueue(storage, cache)
1473     settings = Settings()
1474
1475     with tools.Profiler('GetIsolateds'):
1476       # Optionally support local files by manually adding them to cache.
1477       if not is_valid_hash(isolated_hash, algo):
1478         isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1479
1480       # Load all *.isolated and start loading rest of the files.
1481       settings.load(fetch_queue, isolated_hash, os_flavor, algo)
1482       if require_command and not settings.command:
1483         # TODO(vadimsh): All fetch operations are already enqueue and there's no
1484         # easy way to cancel them.
1485         raise ConfigError('No command to run')
1486
1487     with tools.Profiler('GetRest'):
1488       # Create file system hierarchy.
1489       if not os.path.isdir(outdir):
1490         os.makedirs(outdir)
1491       create_directories(outdir, settings.files)
1492       create_links(outdir, settings.files.iteritems())
1493
1494       # Ensure working directory exists.
1495       cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1496       if not os.path.isdir(cwd):
1497         os.makedirs(cwd)
1498
1499       # Multimap: digest -> list of pairs (path, props).
1500       remaining = {}
1501       for filepath, props in settings.files.iteritems():
1502         if 'h' in props:
1503           remaining.setdefault(props['h'], []).append((filepath, props))
1504
1505       # Now block on the remaining files to be downloaded and mapped.
1506       logging.info('Retrieving remaining files (%d of them)...',
1507           fetch_queue.pending_count)
1508       last_update = time.time()
1509       with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1510         while remaining:
1511           detector.ping()
1512
1513           # Wait for any item to finish fetching to cache.
1514           digest = fetch_queue.wait(remaining)
1515
1516           # Link corresponding files to a fetched item in cache.
1517           for filepath, props in remaining.pop(digest):
1518             cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
1519
1520           # Report progress.
1521           duration = time.time() - last_update
1522           if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1523             msg = '%d files remaining...' % len(remaining)
1524             print msg
1525             logging.info(msg)
1526             last_update = time.time()
1527
1528   # Cache could evict some items we just tried to fetch, it's a fatal error.
1529   if not fetch_queue.verify_all_cached():
1530     raise MappingError('Cache is too small to hold all requested files')
1531   return settings
1532
1533
1534 @subcommand.usage('<file1..fileN> or - to read from stdin')
1535 def CMDarchive(parser, args):
1536   """Archives data to the server."""
1537   options, files = parser.parse_args(args)
1538
1539   if files == ['-']:
1540     files = sys.stdin.readlines()
1541
1542   if not files:
1543     parser.error('Nothing to upload')
1544
1545   # Load the necessary metadata.
1546   # TODO(maruel): Use a worker pool to upload as the hashing is being done.
1547   infiles = dict(
1548       (
1549         f,
1550         {
1551           's': os.stat(f).st_size,
1552           'h': hash_file(f, get_hash_algo(options.namespace)),
1553         }
1554       )
1555       for f in files)
1556
1557   with tools.Profiler('Archive'):
1558     ret = upload_tree(
1559         base_url=options.isolate_server,
1560         indir=os.getcwd(),
1561         infiles=infiles,
1562         namespace=options.namespace)
1563   if not ret:
1564     print '\n'.join('%s  %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1565   return ret
1566
1567
1568 def CMDdownload(parser, args):
1569   """Download data from the server.
1570
1571   It can either download individual files or a complete tree from a .isolated
1572   file.
1573   """
1574   parser.add_option(
1575       '-i', '--isolated', metavar='HASH',
1576       help='hash of an isolated file, .isolated file content is discarded, use '
1577            '--file if you need it')
1578   parser.add_option(
1579       '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1580       help='hash and destination of a file, can be used multiple times')
1581   parser.add_option(
1582       '-t', '--target', metavar='DIR', default=os.getcwd(),
1583       help='destination directory')
1584   options, args = parser.parse_args(args)
1585   if args:
1586     parser.error('Unsupported arguments: %s' % args)
1587   if bool(options.isolated) == bool(options.file):
1588     parser.error('Use one of --isolated or --file, and only one.')
1589
1590   options.target = os.path.abspath(options.target)
1591   storage = get_storage(options.isolate_server, options.namespace)
1592   cache = MemoryCache()
1593   algo = get_hash_algo(options.namespace)
1594
1595   # Fetching individual files.
1596   if options.file:
1597     channel = threading_utils.TaskChannel()
1598     pending = {}
1599     for digest, dest in options.file:
1600       pending[digest] = dest
1601       storage.async_fetch(
1602           channel,
1603           WorkerPool.MED,
1604           digest,
1605           UNKNOWN_FILE_SIZE,
1606           functools.partial(file_write, os.path.join(options.target, dest)))
1607     while pending:
1608       fetched = channel.pull()
1609       dest = pending.pop(fetched)
1610       logging.info('%s: %s', fetched, dest)
1611
1612   # Fetching whole isolated tree.
1613   if options.isolated:
1614     settings = fetch_isolated(
1615         isolated_hash=options.isolated,
1616         storage=storage,
1617         cache=cache,
1618         algo=algo,
1619         outdir=options.target,
1620         os_flavor=None,
1621         require_command=False)
1622     rel = os.path.join(options.target, settings.relative_cwd)
1623     print('To run this test please run from the directory %s:' %
1624           os.path.join(options.target, rel))
1625     print('  ' + ' '.join(settings.command))
1626
1627   return 0
1628
1629
1630 class OptionParserIsolateServer(tools.OptionParserWithLogging):
1631   def __init__(self, **kwargs):
1632     tools.OptionParserWithLogging.__init__(self, **kwargs)
1633     self.add_option(
1634         '-I', '--isolate-server',
1635         metavar='URL', default='',
1636         help='Isolate server to use')
1637     self.add_option(
1638         '--namespace', default='default-gzip',
1639         help='The namespace to use on the server, default: %default')
1640
1641   def parse_args(self, *args, **kwargs):
1642     options, args = tools.OptionParserWithLogging.parse_args(
1643         self, *args, **kwargs)
1644     options.isolate_server = options.isolate_server.rstrip('/')
1645     if not options.isolate_server:
1646       self.error('--isolate-server is required.')
1647     return options, args
1648
1649
1650 def main(args):
1651   dispatcher = subcommand.CommandDispatcher(__name__)
1652   try:
1653     return dispatcher.execute(
1654         OptionParserIsolateServer(version=__version__), args)
1655   except (ConfigError, MappingError) as e:
1656     sys.stderr.write('\nError: ')
1657     sys.stderr.write(str(e))
1658     sys.stderr.write('\n')
1659     return 1
1660
1661
1662 if __name__ == '__main__':
1663   fix_encoding.fix_encoding()
1664   tools.disable_buffering()
1665   colorama.init()
1666   sys.exit(main(sys.argv[1:]))