Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / isolateserver.py
1 #!/usr/bin/env python
2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
5
6 """Archives a set of files or directories to a server."""
7
8 __version__ = '0.3.4'
9
10 import functools
11 import hashlib
12 import json
13 import logging
14 import os
15 import re
16 import shutil
17 import stat
18 import sys
19 import tempfile
20 import threading
21 import time
22 import urllib
23 import urlparse
24 import zlib
25
26 from third_party import colorama
27 from third_party.depot_tools import fix_encoding
28 from third_party.depot_tools import subcommand
29
30 from utils import file_path
31 from utils import net
32 from utils import on_error
33 from utils import threading_utils
34 from utils import tools
35
36 import auth
37
38
39 # Version of isolate protocol passed to the server in /handshake request.
40 ISOLATE_PROTOCOL_VERSION = '1.0'
41 # Version stored and expected in .isolated files.
42 ISOLATED_FILE_VERSION = '1.4'
43
44
45 # The number of files to check the isolate server per /pre-upload query.
46 # All files are sorted by likelihood of a change in the file content
47 # (currently file size is used to estimate this: larger the file -> larger the
48 # possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
49 # are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
50 # and so on. Numbers here is a trade-off; the more per request, the lower the
51 # effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
52 # larger values cause longer lookups, increasing the initial latency to start
53 # uploading, which is especially an issue for large files. This value is
54 # optimized for the "few thousands files to look up with minimal number of large
55 # files missing" case.
56 ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
57
58
59 # A list of already compressed extension types that should not receive any
60 # compression before being uploaded.
61 ALREADY_COMPRESSED_TYPES = [
62     '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
63     'wav', 'zip'
64 ]
65
66
67 # The file size to be used when we don't know the correct file size,
68 # generally used for .isolated files.
69 UNKNOWN_FILE_SIZE = None
70
71
72 # Chunk size to use when doing disk I/O.
73 DISK_FILE_CHUNK = 1024 * 1024
74
75 # Chunk size to use when reading from network stream.
76 NET_IO_FILE_CHUNK = 16 * 1024
77
78
79 # Read timeout in seconds for downloads from isolate storage. If there's no
80 # response from the server within this timeout whole download will be aborted.
81 DOWNLOAD_READ_TIMEOUT = 60
82
83 # Maximum expected delay (in seconds) between successive file fetches
84 # in run_tha_test. If it takes longer than that, a deadlock might be happening
85 # and all stack frames for all threads are dumped to log.
86 DEADLOCK_TIMEOUT = 5 * 60
87
88
89 # The delay (in seconds) to wait between logging statements when retrieving
90 # the required files. This is intended to let the user (or buildbot) know that
91 # the program is still running.
92 DELAY_BETWEEN_UPDATES_IN_SECS = 30
93
94
95 # Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
96 # specify the names here.
97 SUPPORTED_ALGOS = {
98   'md5': hashlib.md5,
99   'sha-1': hashlib.sha1,
100   'sha-512': hashlib.sha512,
101 }
102
103
104 # Used for serialization.
105 SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
106
107
108 DEFAULT_BLACKLIST = (
109   # Temporary vim or python files.
110   r'^.+\.(?:pyc|swp)$',
111   # .git or .svn directory.
112   r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
113 )
114
115
116 # Chromium-specific.
117 DEFAULT_BLACKLIST += (
118   r'^.+\.(?:run_test_cases)$',
119   r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
120 )
121
122
123 class Error(Exception):
124   """Generic runtime error."""
125   pass
126
127
128 class ConfigError(ValueError):
129   """Generic failure to load a .isolated file."""
130   pass
131
132
133 class MappingError(OSError):
134   """Failed to recreate the tree."""
135   pass
136
137
138 def is_valid_hash(value, algo):
139   """Returns if the value is a valid hash for the corresponding algorithm."""
140   size = 2 * algo().digest_size
141   return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
142
143
144 def hash_file(filepath, algo):
145   """Calculates the hash of a file without reading it all in memory at once.
146
147   |algo| should be one of hashlib hashing algorithm.
148   """
149   digest = algo()
150   with open(filepath, 'rb') as f:
151     while True:
152       chunk = f.read(DISK_FILE_CHUNK)
153       if not chunk:
154         break
155       digest.update(chunk)
156   return digest.hexdigest()
157
158
159 def stream_read(stream, chunk_size):
160   """Reads chunks from |stream| and yields them."""
161   while True:
162     data = stream.read(chunk_size)
163     if not data:
164       break
165     yield data
166
167
168 def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
169   """Yields file content in chunks of |chunk_size| starting from |offset|."""
170   with open(filepath, 'rb') as f:
171     if offset:
172       f.seek(offset)
173     while True:
174       data = f.read(chunk_size)
175       if not data:
176         break
177       yield data
178
179
180 def file_write(filepath, content_generator):
181   """Writes file content as generated by content_generator.
182
183   Creates the intermediary directory as needed.
184
185   Returns the number of bytes written.
186
187   Meant to be mocked out in unit tests.
188   """
189   filedir = os.path.dirname(filepath)
190   if not os.path.isdir(filedir):
191     os.makedirs(filedir)
192   total = 0
193   with open(filepath, 'wb') as f:
194     for d in content_generator:
195       total += len(d)
196       f.write(d)
197   return total
198
199
200 def zip_compress(content_generator, level=7):
201   """Reads chunks from |content_generator| and yields zip compressed chunks."""
202   compressor = zlib.compressobj(level)
203   for chunk in content_generator:
204     compressed = compressor.compress(chunk)
205     if compressed:
206       yield compressed
207   tail = compressor.flush(zlib.Z_FINISH)
208   if tail:
209     yield tail
210
211
212 def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
213   """Reads zipped data from |content_generator| and yields decompressed data.
214
215   Decompresses data in small chunks (no larger than |chunk_size|) so that
216   zip bomb file doesn't cause zlib to preallocate huge amount of memory.
217
218   Raises IOError if data is corrupted or incomplete.
219   """
220   decompressor = zlib.decompressobj()
221   compressed_size = 0
222   try:
223     for chunk in content_generator:
224       compressed_size += len(chunk)
225       data = decompressor.decompress(chunk, chunk_size)
226       if data:
227         yield data
228       while decompressor.unconsumed_tail:
229         data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
230         if data:
231           yield data
232     tail = decompressor.flush()
233     if tail:
234       yield tail
235   except zlib.error as e:
236     raise IOError(
237         'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
238   # Ensure all data was read and decompressed.
239   if decompressor.unused_data or decompressor.unconsumed_tail:
240     raise IOError('Not all data was decompressed')
241
242
243 def get_zip_compression_level(filename):
244   """Given a filename calculates the ideal zip compression level to use."""
245   file_ext = os.path.splitext(filename)[1].lower()
246   # TODO(csharp): Profile to find what compression level works best.
247   return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
248
249
250 def create_directories(base_directory, files):
251   """Creates the directory structure needed by the given list of files."""
252   logging.debug('create_directories(%s, %d)', base_directory, len(files))
253   # Creates the tree of directories to create.
254   directories = set(os.path.dirname(f) for f in files)
255   for item in list(directories):
256     while item:
257       directories.add(item)
258       item = os.path.dirname(item)
259   for d in sorted(directories):
260     if d:
261       os.mkdir(os.path.join(base_directory, d))
262
263
264 def create_symlinks(base_directory, files):
265   """Creates any symlinks needed by the given set of files."""
266   for filepath, properties in files:
267     if 'l' not in properties:
268       continue
269     if sys.platform == 'win32':
270       # TODO(maruel): Create symlink via the win32 api.
271       logging.warning('Ignoring symlink %s', filepath)
272       continue
273     outfile = os.path.join(base_directory, filepath)
274     # os.symlink() doesn't exist on Windows.
275     os.symlink(properties['l'], outfile)  # pylint: disable=E1101
276
277
278 def is_valid_file(filepath, size):
279   """Determines if the given files appears valid.
280
281   Currently it just checks the file's size.
282   """
283   if size == UNKNOWN_FILE_SIZE:
284     return os.path.isfile(filepath)
285   actual_size = os.stat(filepath).st_size
286   if size != actual_size:
287     logging.warning(
288         'Found invalid item %s; %d != %d',
289         os.path.basename(filepath), actual_size, size)
290     return False
291   return True
292
293
294 class WorkerPool(threading_utils.AutoRetryThreadPool):
295   """Thread pool that automatically retries on IOError and runs a preconfigured
296   function.
297   """
298   # Initial and maximum number of worker threads.
299   INITIAL_WORKERS = 2
300   MAX_WORKERS = 16
301   RETRIES = 5
302
303   def __init__(self):
304     super(WorkerPool, self).__init__(
305         [IOError],
306         self.RETRIES,
307         self.INITIAL_WORKERS,
308         self.MAX_WORKERS,
309         0,
310         'remote')
311
312
313 class Item(object):
314   """An item to push to Storage.
315
316   Its digest and size may be provided in advance, if known. Otherwise they will
317   be derived from content(). If digest is provided, it MUST correspond to
318   hash algorithm used by Storage.
319
320   When used with Storage, Item starts its life in a main thread, travels
321   to 'contains' thread, then to 'push' thread and then finally back to
322   the main thread. It is never used concurrently from multiple threads.
323   """
324
325   def __init__(self, digest=None, size=None, high_priority=False):
326     self.digest = digest
327     self.size = size
328     self.high_priority = high_priority
329     self.compression_level = 6
330
331   def content(self):
332     """Iterable with content of this item as byte string (str) chunks."""
333     raise NotImplementedError()
334
335   def prepare(self, hash_algo):
336     """Ensures self.digest and self.size are set.
337
338     Uses content() as a source of data to calculate them. Does nothing if digest
339     and size is already known.
340
341     Arguments:
342       hash_algo: hash algorithm to use to calculate digest.
343     """
344     if self.digest is None or self.size is None:
345       digest = hash_algo()
346       total = 0
347       for chunk in self.content():
348         digest.update(chunk)
349         total += len(chunk)
350       self.digest = digest.hexdigest()
351       self.size = total
352
353
354 class FileItem(Item):
355   """A file to push to Storage.
356
357   Its digest and size may be provided in advance, if known. Otherwise they will
358   be derived from the file content.
359   """
360
361   def __init__(self, path, digest=None, size=None, high_priority=False):
362     super(FileItem, self).__init__(
363         digest,
364         size if size is not None else os.stat(path).st_size,
365         high_priority)
366     self.path = path
367     self.compression_level = get_zip_compression_level(path)
368
369   def content(self):
370     return file_read(self.path)
371
372
373 class BufferItem(Item):
374   """A byte buffer to push to Storage."""
375
376   def __init__(self, buf, high_priority=False):
377     super(BufferItem, self).__init__(None, len(buf), high_priority)
378     self.buffer = buf
379
380   def content(self):
381     return [self.buffer]
382
383
384 class Storage(object):
385   """Efficiently downloads or uploads large set of files via StorageApi.
386
387   Implements compression support, parallel 'contains' checks, parallel uploads
388   and more.
389
390   Works only within single namespace (and thus hashing algorithm and compression
391   scheme are fixed).
392
393   Spawns multiple internal threads. Thread safe, but not fork safe.
394   """
395
396   def __init__(self, storage_api):
397     self._storage_api = storage_api
398     self._use_zip = is_namespace_with_compression(storage_api.namespace)
399     self._hash_algo = get_hash_algo(storage_api.namespace)
400     self._cpu_thread_pool = None
401     self._net_thread_pool = None
402
403   @property
404   def hash_algo(self):
405     """Hashing algorithm used to name files in storage based on their content.
406
407     Defined by |namespace|. See also 'get_hash_algo'.
408     """
409     return self._hash_algo
410
411   @property
412   def location(self):
413     """Location of a backing store that this class is using.
414
415     Exact meaning depends on the storage_api type. For IsolateServer it is
416     an URL of isolate server, for FileSystem is it a path in file system.
417     """
418     return self._storage_api.location
419
420   @property
421   def namespace(self):
422     """Isolate namespace used by this storage.
423
424     Indirectly defines hashing scheme and compression method used.
425     """
426     return self._storage_api.namespace
427
428   @property
429   def cpu_thread_pool(self):
430     """ThreadPool for CPU-bound tasks like zipping."""
431     if self._cpu_thread_pool is None:
432       self._cpu_thread_pool = threading_utils.ThreadPool(
433           2, max(threading_utils.num_processors(), 2), 0, 'zip')
434     return self._cpu_thread_pool
435
436   @property
437   def net_thread_pool(self):
438     """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
439     if self._net_thread_pool is None:
440       self._net_thread_pool = WorkerPool()
441     return self._net_thread_pool
442
443   def close(self):
444     """Waits for all pending tasks to finish."""
445     if self._cpu_thread_pool:
446       self._cpu_thread_pool.join()
447       self._cpu_thread_pool.close()
448       self._cpu_thread_pool = None
449     if self._net_thread_pool:
450       self._net_thread_pool.join()
451       self._net_thread_pool.close()
452       self._net_thread_pool = None
453
454   def __enter__(self):
455     """Context manager interface."""
456     return self
457
458   def __exit__(self, _exc_type, _exc_value, _traceback):
459     """Context manager interface."""
460     self.close()
461     return False
462
463   def upload_items(self, items):
464     """Uploads a bunch of items to the isolate server.
465
466     It figures out what items are missing from the server and uploads only them.
467
468     Arguments:
469       items: list of Item instances that represents data to upload.
470
471     Returns:
472       List of items that were uploaded. All other items are already there.
473     """
474     # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
475     # used by swarming.py. There's no need to spawn multiple threads and try to
476     # do stuff in parallel: there's nothing to parallelize. 'contains' check and
477     # 'push' should be performed sequentially in the context of current thread.
478
479     # Ensure all digests are calculated.
480     for item in items:
481       item.prepare(self._hash_algo)
482
483     # For each digest keep only first Item that matches it. All other items
484     # are just indistinguishable copies from the point of view of isolate
485     # server (it doesn't care about paths at all, only content and digests).
486     seen = {}
487     duplicates = 0
488     for item in items:
489       if seen.setdefault(item.digest, item) is not item:
490         duplicates += 1
491     items = seen.values()
492     if duplicates:
493       logging.info('Skipped %d duplicated files', duplicates)
494
495     # Enqueue all upload tasks.
496     missing = set()
497     uploaded = []
498     channel = threading_utils.TaskChannel()
499     for missing_item, push_state in self.get_missing_items(items):
500       missing.add(missing_item)
501       self.async_push(channel, missing_item, push_state)
502
503     # No need to spawn deadlock detector thread if there's nothing to upload.
504     if missing:
505       with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
506         # Wait for all started uploads to finish.
507         while len(uploaded) != len(missing):
508           detector.ping()
509           item = channel.pull()
510           uploaded.append(item)
511           logging.debug(
512               'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
513     logging.info('All files are uploaded')
514
515     # Print stats.
516     total = len(items)
517     total_size = sum(f.size for f in items)
518     logging.info(
519         'Total:      %6d, %9.1fkb',
520         total,
521         total_size / 1024.)
522     cache_hit = set(items) - missing
523     cache_hit_size = sum(f.size for f in cache_hit)
524     logging.info(
525         'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
526         len(cache_hit),
527         cache_hit_size / 1024.,
528         len(cache_hit) * 100. / total,
529         cache_hit_size * 100. / total_size if total_size else 0)
530     cache_miss = missing
531     cache_miss_size = sum(f.size for f in cache_miss)
532     logging.info(
533         'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
534         len(cache_miss),
535         cache_miss_size / 1024.,
536         len(cache_miss) * 100. / total,
537         cache_miss_size * 100. / total_size if total_size else 0)
538
539     return uploaded
540
541   def get_fetch_url(self, item):
542     """Returns an URL that can be used to fetch given item once it's uploaded.
543
544     Note that if namespace uses compression, data at given URL is compressed.
545
546     Arguments:
547       item: Item to get fetch URL for.
548
549     Returns:
550       An URL or None if underlying protocol doesn't support this.
551     """
552     item.prepare(self._hash_algo)
553     return self._storage_api.get_fetch_url(item.digest)
554
555   def async_push(self, channel, item, push_state):
556     """Starts asynchronous push to the server in a parallel thread.
557
558     Can be used only after |item| was checked for presence on a server with
559     'get_missing_items' call. 'get_missing_items' returns |push_state| object
560     that contains storage specific information describing how to upload
561     the item (for example in case of cloud storage, it is signed upload URLs).
562
563     Arguments:
564       channel: TaskChannel that receives back |item| when upload ends.
565       item: item to upload as instance of Item class.
566       push_state: push state returned by 'get_missing_items' call for |item|.
567
568     Returns:
569       None, but |channel| later receives back |item| when upload ends.
570     """
571     # Thread pool task priority.
572     priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED
573
574     def push(content):
575       """Pushes an Item and returns it to |channel|."""
576       item.prepare(self._hash_algo)
577       self._storage_api.push(item, push_state, content)
578       return item
579
580     # If zipping is not required, just start a push task.
581     if not self._use_zip:
582       self.net_thread_pool.add_task_with_channel(
583           channel, priority, push, item.content())
584       return
585
586     # If zipping is enabled, zip in a separate thread.
587     def zip_and_push():
588       # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
589       # content right here. It will block until all file is zipped.
590       try:
591         stream = zip_compress(item.content(), item.compression_level)
592         data = ''.join(stream)
593       except Exception as exc:
594         logging.error('Failed to zip \'%s\': %s', item, exc)
595         channel.send_exception()
596         return
597       self.net_thread_pool.add_task_with_channel(
598           channel, priority, push, [data])
599     self.cpu_thread_pool.add_task(priority, zip_and_push)
600
601   def push(self, item, push_state):
602     """Synchronously pushes a single item to the server.
603
604     If you need to push many items at once, consider using 'upload_items' or
605     'async_push' with instance of TaskChannel.
606
607     Arguments:
608       item: item to upload as instance of Item class.
609       push_state: push state returned by 'get_missing_items' call for |item|.
610
611     Returns:
612       Pushed item (same object as |item|).
613     """
614     channel = threading_utils.TaskChannel()
615     with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
616       self.async_push(channel, item, push_state)
617       pushed = channel.pull()
618       assert pushed is item
619     return item
620
621   def async_fetch(self, channel, priority, digest, size, sink):
622     """Starts asynchronous fetch from the server in a parallel thread.
623
624     Arguments:
625       channel: TaskChannel that receives back |digest| when download ends.
626       priority: thread pool task priority for the fetch.
627       digest: hex digest of an item to download.
628       size: expected size of the item (after decompression).
629       sink: function that will be called as sink(generator).
630     """
631     def fetch():
632       try:
633         # Prepare reading pipeline.
634         stream = self._storage_api.fetch(digest)
635         if self._use_zip:
636           stream = zip_decompress(stream, DISK_FILE_CHUNK)
637         # Run |stream| through verifier that will assert its size.
638         verifier = FetchStreamVerifier(stream, size)
639         # Verified stream goes to |sink|.
640         sink(verifier.run())
641       except Exception as err:
642         logging.error('Failed to fetch %s: %s', digest, err)
643         raise
644       return digest
645
646     # Don't bother with zip_thread_pool for decompression. Decompression is
647     # really fast and most probably IO bound anyway.
648     self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
649
650   def get_missing_items(self, items):
651     """Yields items that are missing from the server.
652
653     Issues multiple parallel queries via StorageApi's 'contains' method.
654
655     Arguments:
656       items: a list of Item objects to check.
657
658     Yields:
659       For each missing item it yields a pair (item, push_state), where:
660         * item - Item object that is missing (one of |items|).
661         * push_state - opaque object that contains storage specific information
662             describing how to upload the item (for example in case of cloud
663             storage, it is signed upload URLs). It can later be passed to
664             'async_push'.
665     """
666     channel = threading_utils.TaskChannel()
667     pending = 0
668
669     # Ensure all digests are calculated.
670     for item in items:
671       item.prepare(self._hash_algo)
672
673     # Enqueue all requests.
674     for batch in batch_items_for_check(items):
675       self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
676           self._storage_api.contains, batch)
677       pending += 1
678
679     # Yield results as they come in.
680     for _ in xrange(pending):
681       for missing_item, push_state in channel.pull().iteritems():
682         yield missing_item, push_state
683
684
685 def batch_items_for_check(items):
686   """Splits list of items to check for existence on the server into batches.
687
688   Each batch corresponds to a single 'exists?' query to the server via a call
689   to StorageApi's 'contains' method.
690
691   Arguments:
692     items: a list of Item objects.
693
694   Yields:
695     Batches of items to query for existence in a single operation,
696     each batch is a list of Item objects.
697   """
698   batch_count = 0
699   batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
700   next_queries = []
701   for item in sorted(items, key=lambda x: x.size, reverse=True):
702     next_queries.append(item)
703     if len(next_queries) == batch_size_limit:
704       yield next_queries
705       next_queries = []
706       batch_count += 1
707       batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
708           min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
709   if next_queries:
710     yield next_queries
711
712
713 class FetchQueue(object):
714   """Fetches items from Storage and places them into LocalCache.
715
716   It manages multiple concurrent fetch operations. Acts as a bridge between
717   Storage and LocalCache so that Storage and LocalCache don't depend on each
718   other at all.
719   """
720
721   def __init__(self, storage, cache):
722     self.storage = storage
723     self.cache = cache
724     self._channel = threading_utils.TaskChannel()
725     self._pending = set()
726     self._accessed = set()
727     self._fetched = cache.cached_set()
728
729   def add(self, digest, size=UNKNOWN_FILE_SIZE, priority=WorkerPool.MED):
730     """Starts asynchronous fetch of item |digest|."""
731     # Fetching it now?
732     if digest in self._pending:
733       return
734
735     # Mark this file as in use, verify_all_cached will later ensure it is still
736     # in cache.
737     self._accessed.add(digest)
738
739     # Already fetched? Notify cache to update item's LRU position.
740     if digest in self._fetched:
741       # 'touch' returns True if item is in cache and not corrupted.
742       if self.cache.touch(digest, size):
743         return
744       # Item is corrupted, remove it from cache and fetch it again.
745       self._fetched.remove(digest)
746       self.cache.evict(digest)
747
748     # TODO(maruel): It should look at the free disk space, the current cache
749     # size and the size of the new item on every new item:
750     # - Trim the cache as more entries are listed when free disk space is low,
751     #   otherwise if the amount of data downloaded during the run > free disk
752     #   space, it'll crash.
753     # - Make sure there's enough free disk space to fit all dependencies of
754     #   this run! If not, abort early.
755
756     # Start fetching.
757     self._pending.add(digest)
758     self.storage.async_fetch(
759         self._channel, priority, digest, size,
760         functools.partial(self.cache.write, digest))
761
762   def wait(self, digests):
763     """Starts a loop that waits for at least one of |digests| to be retrieved.
764
765     Returns the first digest retrieved.
766     """
767     # Flush any already fetched items.
768     for digest in digests:
769       if digest in self._fetched:
770         return digest
771
772     # Ensure all requested items are being fetched now.
773     assert all(digest in self._pending for digest in digests), (
774         digests, self._pending)
775
776     # Wait for some requested item to finish fetching.
777     while self._pending:
778       digest = self._channel.pull()
779       self._pending.remove(digest)
780       self._fetched.add(digest)
781       if digest in digests:
782         return digest
783
784     # Should never reach this point due to assert above.
785     raise RuntimeError('Impossible state')
786
787   def inject_local_file(self, path, algo):
788     """Adds local file to the cache as if it was fetched from storage."""
789     with open(path, 'rb') as f:
790       data = f.read()
791     digest = algo(data).hexdigest()
792     self.cache.write(digest, [data])
793     self._fetched.add(digest)
794     return digest
795
796   @property
797   def pending_count(self):
798     """Returns number of items to be fetched."""
799     return len(self._pending)
800
801   def verify_all_cached(self):
802     """True if all accessed items are in cache."""
803     return self._accessed.issubset(self.cache.cached_set())
804
805
806 class FetchStreamVerifier(object):
807   """Verifies that fetched file is valid before passing it to the LocalCache."""
808
809   def __init__(self, stream, expected_size):
810     self.stream = stream
811     self.expected_size = expected_size
812     self.current_size = 0
813
814   def run(self):
815     """Generator that yields same items as |stream|.
816
817     Verifies |stream| is complete before yielding a last chunk to consumer.
818
819     Also wraps IOError produced by consumer into MappingError exceptions since
820     otherwise Storage will retry fetch on unrelated local cache errors.
821     """
822     # Read one chunk ahead, keep it in |stored|.
823     # That way a complete stream can be verified before pushing last chunk
824     # to consumer.
825     stored = None
826     for chunk in self.stream:
827       assert chunk is not None
828       if stored is not None:
829         self._inspect_chunk(stored, is_last=False)
830         try:
831           yield stored
832         except IOError as exc:
833           raise MappingError('Failed to store an item in cache: %s' % exc)
834       stored = chunk
835     if stored is not None:
836       self._inspect_chunk(stored, is_last=True)
837       try:
838         yield stored
839       except IOError as exc:
840         raise MappingError('Failed to store an item in cache: %s' % exc)
841
842   def _inspect_chunk(self, chunk, is_last):
843     """Called for each fetched chunk before passing it to consumer."""
844     self.current_size += len(chunk)
845     if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
846         (self.expected_size != self.current_size)):
847       raise IOError('Incorrect file size: expected %d, got %d' % (
848           self.expected_size, self.current_size))
849
850
851 class StorageApi(object):
852   """Interface for classes that implement low-level storage operations.
853
854   StorageApi is oblivious of compression and hashing scheme used. This details
855   are handled in higher level Storage class.
856
857   Clients should generally not use StorageApi directly. Storage class is
858   preferred since it implements compression and upload optimizations.
859   """
860
861   @property
862   def location(self):
863     """Location of a backing store that this class is using.
864
865     Exact meaning depends on the type. For IsolateServer it is an URL of isolate
866     server, for FileSystem is it a path in file system.
867     """
868     raise NotImplementedError()
869
870   @property
871   def namespace(self):
872     """Isolate namespace used by this storage.
873
874     Indirectly defines hashing scheme and compression method used.
875     """
876     raise NotImplementedError()
877
878   def get_fetch_url(self, digest):
879     """Returns an URL that can be used to fetch an item with given digest.
880
881     Arguments:
882       digest: hex digest of item to fetch.
883
884     Returns:
885       An URL or None if the protocol doesn't support this.
886     """
887     raise NotImplementedError()
888
889   def fetch(self, digest, offset=0):
890     """Fetches an object and yields its content.
891
892     Arguments:
893       digest: hash digest of item to download.
894       offset: offset (in bytes) from the start of the file to resume fetch from.
895
896     Yields:
897       Chunks of downloaded item (as str objects).
898     """
899     raise NotImplementedError()
900
901   def push(self, item, push_state, content=None):
902     """Uploads an |item| with content generated by |content| generator.
903
904     |item| MUST go through 'contains' call to get |push_state| before it can
905     be pushed to the storage.
906
907     To be clear, here is one possible usage:
908       all_items = [... all items to push as Item subclasses ...]
909       for missing_item, push_state in storage_api.contains(all_items).items():
910         storage_api.push(missing_item, push_state)
911
912     When pushing to a namespace with compression, data that should be pushed
913     and data provided by the item is not the same. In that case |content| is
914     not None and it yields chunks of compressed data (using item.content() as
915     a source of original uncompressed data). This is implemented by Storage
916     class.
917
918     Arguments:
919       item: Item object that holds information about an item being pushed.
920       push_state: push state object as returned by 'contains' call.
921       content: a generator that yields chunks to push, item.content() if None.
922
923     Returns:
924       None.
925     """
926     raise NotImplementedError()
927
928   def contains(self, items):
929     """Checks for |items| on the server, prepares missing ones for upload.
930
931     Arguments:
932       items: list of Item objects to check for presence.
933
934     Returns:
935       A dict missing Item -> opaque push state object to be passed to 'push'.
936       See doc string for 'push'.
937     """
938     raise NotImplementedError()
939
940
941 class _IsolateServerPushState(object):
942   """Per-item state passed from IsolateServer.contains to IsolateServer.push.
943
944   Note this needs to be a global class to support pickling.
945   """
946
947   def __init__(self, upload_url, finalize_url):
948     self.upload_url = upload_url
949     self.finalize_url = finalize_url
950     self.uploaded = False
951     self.finalized = False
952
953
954 class IsolateServer(StorageApi):
955   """StorageApi implementation that downloads and uploads to Isolate Server.
956
957   It uploads and downloads directly from Google Storage whenever appropriate.
958   Works only within single namespace.
959   """
960
961   def __init__(self, base_url, namespace):
962     super(IsolateServer, self).__init__()
963     assert base_url.startswith('http'), base_url
964     self._base_url = base_url.rstrip('/')
965     self._namespace = namespace
966     self._lock = threading.Lock()
967     self._server_caps = None
968
969   @staticmethod
970   def _generate_handshake_request():
971     """Returns a dict to be sent as handshake request body."""
972     # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
973     return {
974         'client_app_version': __version__,
975         'fetcher': True,
976         'protocol_version': ISOLATE_PROTOCOL_VERSION,
977         'pusher': True,
978     }
979
980   @staticmethod
981   def _validate_handshake_response(caps):
982     """Validates and normalizes handshake response."""
983     logging.info('Protocol version: %s', caps['protocol_version'])
984     logging.info('Server version: %s', caps['server_app_version'])
985     if caps.get('error'):
986       raise MappingError(caps['error'])
987     if not caps['access_token']:
988       raise ValueError('access_token is missing')
989     return caps
990
991   @property
992   def _server_capabilities(self):
993     """Performs handshake with the server if not yet done.
994
995     Returns:
996       Server capabilities dictionary as returned by /handshake endpoint.
997
998     Raises:
999       MappingError if server rejects the handshake.
1000     """
1001     # TODO(maruel): Make this request much earlier asynchronously while the
1002     # files are being enumerated.
1003
1004     # TODO(vadimsh): Put |namespace| in the URL so that server can apply
1005     # namespace-level ACLs to this call.
1006     with self._lock:
1007       if self._server_caps is None:
1008         request_body = json.dumps(
1009             self._generate_handshake_request(), separators=(',', ':'))
1010         response = net.url_read(
1011             url=self._base_url + '/content-gs/handshake',
1012             data=request_body,
1013             content_type='application/json',
1014             method='POST')
1015         if response is None:
1016           raise MappingError('Failed to perform handshake.')
1017         try:
1018           caps = json.loads(response)
1019           if not isinstance(caps, dict):
1020             raise ValueError('Expecting JSON dict')
1021           self._server_caps = self._validate_handshake_response(caps)
1022         except (ValueError, KeyError, TypeError) as exc:
1023           # KeyError exception has very confusing str conversion: it's just a
1024           # missing key value and nothing else. So print exception class name
1025           # as well.
1026           raise MappingError('Invalid handshake response (%s): %s' % (
1027               exc.__class__.__name__, exc))
1028       return self._server_caps
1029
1030   @property
1031   def location(self):
1032     return self._base_url
1033
1034   @property
1035   def namespace(self):
1036     return self._namespace
1037
1038   def get_fetch_url(self, digest):
1039     assert isinstance(digest, basestring)
1040     return '%s/content-gs/retrieve/%s/%s' % (
1041         self._base_url, self._namespace, digest)
1042
1043   def fetch(self, digest, offset=0):
1044     source_url = self.get_fetch_url(digest)
1045     logging.debug('download_file(%s, %d)', source_url, offset)
1046
1047     connection = net.url_open(
1048         source_url,
1049         read_timeout=DOWNLOAD_READ_TIMEOUT,
1050         headers={'Range': 'bytes=%d-' % offset} if offset else None)
1051
1052     if not connection:
1053       raise IOError('Request failed - %s' % source_url)
1054
1055     # If |offset| is used, verify server respects it by checking Content-Range.
1056     if offset:
1057       content_range = connection.get_header('Content-Range')
1058       if not content_range:
1059         raise IOError('Missing Content-Range header')
1060
1061       # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1062       # According to a spec, <size> can be '*' meaning "Total size of the file
1063       # is not known in advance".
1064       try:
1065         match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1066         if not match:
1067           raise ValueError()
1068         content_offset = int(match.group(1))
1069         last_byte_index = int(match.group(2))
1070         size = None if match.group(3) == '*' else int(match.group(3))
1071       except ValueError:
1072         raise IOError('Invalid Content-Range header: %s' % content_range)
1073
1074       # Ensure returned offset equals requested one.
1075       if offset != content_offset:
1076         raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1077             offset, content_offset, content_range))
1078
1079       # Ensure entire tail of the file is returned.
1080       if size is not None and last_byte_index + 1 != size:
1081         raise IOError('Incomplete response. Content-Range: %s' % content_range)
1082
1083     return stream_read(connection, NET_IO_FILE_CHUNK)
1084
1085   def push(self, item, push_state, content=None):
1086     assert isinstance(item, Item)
1087     assert item.digest is not None
1088     assert item.size is not None
1089     assert isinstance(push_state, _IsolateServerPushState)
1090     assert not push_state.finalized
1091
1092     # Default to item.content().
1093     content = item.content() if content is None else content
1094
1095     # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
1096     if isinstance(content, basestring):
1097       assert not isinstance(content, unicode), 'Unicode string is not allowed'
1098       content = [content]
1099
1100     # TODO(vadimsh): Do not read from |content| generator when retrying push.
1101     # If |content| is indeed a generator, it can not be re-winded back
1102     # to the beginning of the stream. A retry will find it exhausted. A possible
1103     # solution is to wrap |content| generator with some sort of caching
1104     # restartable generator. It should be done alongside streaming support
1105     # implementation.
1106
1107     # This push operation may be a retry after failed finalization call below,
1108     # no need to reupload contents in that case.
1109     if not push_state.uploaded:
1110       # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1111       # upload support is implemented.
1112       if isinstance(content, list) and len(content) == 1:
1113         content = content[0]
1114       else:
1115         content = ''.join(content)
1116       # PUT file to |upload_url|.
1117       response = net.url_read(
1118           url=push_state.upload_url,
1119           data=content,
1120           content_type='application/octet-stream',
1121           method='PUT')
1122       if response is None:
1123         raise IOError('Failed to upload a file %s to %s' % (
1124             item.digest, push_state.upload_url))
1125       push_state.uploaded = True
1126     else:
1127       logging.info(
1128           'A file %s already uploaded, retrying finalization only', item.digest)
1129
1130     # Optionally notify the server that it's done.
1131     if push_state.finalize_url:
1132       # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1133       # send it to isolated server. That way isolate server can verify that
1134       # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1135       # stored files).
1136       response = net.url_read(
1137           url=push_state.finalize_url,
1138           data='',
1139           content_type='application/json',
1140           method='POST')
1141       if response is None:
1142         raise IOError('Failed to finalize an upload of %s' % item.digest)
1143     push_state.finalized = True
1144
1145   def contains(self, items):
1146     logging.info('Checking existence of %d files...', len(items))
1147
1148     # Ensure all items were initialized with 'prepare' call. Storage does that.
1149     assert all(i.digest is not None and i.size is not None for i in items)
1150
1151     # Request body is a json encoded list of dicts.
1152     body = [
1153         {
1154           'h': item.digest,
1155           's': item.size,
1156           'i': int(item.high_priority),
1157         } for item in items
1158     ]
1159
1160     query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
1161         self._base_url,
1162         self._namespace,
1163         urllib.quote(self._server_capabilities['access_token']))
1164     response_body = net.url_read(
1165         url=query_url,
1166         data=json.dumps(body, separators=(',', ':')),
1167         content_type='application/json',
1168         method='POST')
1169     if response_body is None:
1170       raise MappingError('Failed to execute /pre-upload query')
1171
1172     # Response body is a list of push_urls (or null if file is already present).
1173     try:
1174       response = json.loads(response_body)
1175       if not isinstance(response, list):
1176         raise ValueError('Expecting response with json-encoded list')
1177       if len(response) != len(items):
1178         raise ValueError(
1179             'Incorrect number of items in the list, expected %d, '
1180             'but got %d' % (len(items), len(response)))
1181     except ValueError as err:
1182       raise MappingError(
1183           'Invalid response from server: %s, body is %s' % (err, response_body))
1184
1185     # Pick Items that are missing, attach _PushState to them.
1186     missing_items = {}
1187     for i, push_urls in enumerate(response):
1188       if push_urls:
1189         assert len(push_urls) == 2, str(push_urls)
1190         missing_items[items[i]] = _IsolateServerPushState(
1191             push_urls[0], push_urls[1])
1192     logging.info('Queried %d files, %d cache hit',
1193         len(items), len(items) - len(missing_items))
1194     return missing_items
1195
1196
1197 class FileSystem(StorageApi):
1198   """StorageApi implementation that fetches data from the file system.
1199
1200   The common use case is a NFS/CIFS file server that is mounted locally that is
1201   used to fetch the file on a local partition.
1202   """
1203
1204   # Used for push_state instead of None. That way caller is forced to
1205   # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1206   _DUMMY_PUSH_STATE = object()
1207
1208   def __init__(self, base_path, namespace):
1209     super(FileSystem, self).__init__()
1210     self._base_path = base_path
1211     self._namespace = namespace
1212
1213   @property
1214   def location(self):
1215     return self._base_path
1216
1217   @property
1218   def namespace(self):
1219     return self._namespace
1220
1221   def get_fetch_url(self, digest):
1222     return None
1223
1224   def fetch(self, digest, offset=0):
1225     assert isinstance(digest, basestring)
1226     return file_read(os.path.join(self._base_path, digest), offset=offset)
1227
1228   def push(self, item, push_state, content=None):
1229     assert isinstance(item, Item)
1230     assert item.digest is not None
1231     assert item.size is not None
1232     assert push_state is self._DUMMY_PUSH_STATE
1233     content = item.content() if content is None else content
1234     if isinstance(content, basestring):
1235       assert not isinstance(content, unicode), 'Unicode string is not allowed'
1236       content = [content]
1237     file_write(os.path.join(self._base_path, item.digest), content)
1238
1239   def contains(self, items):
1240     assert all(i.digest is not None and i.size is not None for i in items)
1241     return dict(
1242         (item, self._DUMMY_PUSH_STATE) for item in items
1243         if not os.path.exists(os.path.join(self._base_path, item.digest))
1244     )
1245
1246
1247 class LocalCache(object):
1248   """Local cache that stores objects fetched via Storage.
1249
1250   It can be accessed concurrently from multiple threads, so it should protect
1251   its internal state with some lock.
1252   """
1253   cache_dir = None
1254
1255   def __enter__(self):
1256     """Context manager interface."""
1257     return self
1258
1259   def __exit__(self, _exc_type, _exec_value, _traceback):
1260     """Context manager interface."""
1261     return False
1262
1263   def cached_set(self):
1264     """Returns a set of all cached digests (always a new object)."""
1265     raise NotImplementedError()
1266
1267   def touch(self, digest, size):
1268     """Ensures item is not corrupted and updates its LRU position.
1269
1270     Arguments:
1271       digest: hash digest of item to check.
1272       size: expected size of this item.
1273
1274     Returns:
1275       True if item is in cache and not corrupted.
1276     """
1277     raise NotImplementedError()
1278
1279   def evict(self, digest):
1280     """Removes item from cache if it's there."""
1281     raise NotImplementedError()
1282
1283   def read(self, digest):
1284     """Returns contents of the cached item as a single str."""
1285     raise NotImplementedError()
1286
1287   def write(self, digest, content):
1288     """Reads data from |content| generator and stores it in cache."""
1289     raise NotImplementedError()
1290
1291   def hardlink(self, digest, dest, file_mode):
1292     """Ensures file at |dest| has same content as cached |digest|.
1293
1294     If file_mode is provided, it is used to set the executable bit if
1295     applicable.
1296     """
1297     raise NotImplementedError()
1298
1299
1300 class MemoryCache(LocalCache):
1301   """LocalCache implementation that stores everything in memory."""
1302
1303   def __init__(self, file_mode_mask=0500):
1304     """Args:
1305       file_mode_mask: bit mask to AND file mode with. Default value will make
1306           all mapped files to be read only.
1307     """
1308     super(MemoryCache, self).__init__()
1309     self._file_mode_mask = file_mode_mask
1310     # Let's not assume dict is thread safe.
1311     self._lock = threading.Lock()
1312     self._contents = {}
1313
1314   def cached_set(self):
1315     with self._lock:
1316       return set(self._contents)
1317
1318   def touch(self, digest, size):
1319     with self._lock:
1320       return digest in self._contents
1321
1322   def evict(self, digest):
1323     with self._lock:
1324       self._contents.pop(digest, None)
1325
1326   def read(self, digest):
1327     with self._lock:
1328       return self._contents[digest]
1329
1330   def write(self, digest, content):
1331     # Assemble whole stream before taking the lock.
1332     data = ''.join(content)
1333     with self._lock:
1334       self._contents[digest] = data
1335
1336   def hardlink(self, digest, dest, file_mode):
1337     """Since data is kept in memory, there is no filenode to hardlink."""
1338     file_write(dest, [self.read(digest)])
1339     if file_mode is not None:
1340       os.chmod(dest, file_mode & self._file_mode_mask)
1341
1342
1343 def get_hash_algo(_namespace):
1344   """Return hash algorithm class to use when uploading to given |namespace|."""
1345   # TODO(vadimsh): Implement this at some point.
1346   return hashlib.sha1
1347
1348
1349 def is_namespace_with_compression(namespace):
1350   """Returns True if given |namespace| stores compressed objects."""
1351   return namespace.endswith(('-gzip', '-deflate'))
1352
1353
1354 def get_storage_api(file_or_url, namespace):
1355   """Returns an object that implements low-level StorageApi interface.
1356
1357   It is used by Storage to work with single isolate |namespace|. It should
1358   rarely be used directly by clients, see 'get_storage' for
1359   a better alternative.
1360
1361   Arguments:
1362     file_or_url: a file path to use file system based storage, or URL of isolate
1363         service to use shared cloud based storage.
1364     namespace: isolate namespace to operate in, also defines hashing and
1365         compression scheme used, i.e. namespace names that end with '-gzip'
1366         store compressed data.
1367
1368   Returns:
1369     Instance of StorageApi subclass.
1370   """
1371   if file_path.is_url(file_or_url):
1372     return IsolateServer(file_or_url, namespace)
1373   else:
1374     return FileSystem(file_or_url, namespace)
1375
1376
1377 def get_storage(file_or_url, namespace):
1378   """Returns Storage class that can upload and download from |namespace|.
1379
1380   Arguments:
1381     file_or_url: a file path to use file system based storage, or URL of isolate
1382         service to use shared cloud based storage.
1383     namespace: isolate namespace to operate in, also defines hashing and
1384         compression scheme used, i.e. namespace names that end with '-gzip'
1385         store compressed data.
1386
1387   Returns:
1388     Instance of Storage.
1389   """
1390   return Storage(get_storage_api(file_or_url, namespace))
1391
1392
1393 def expand_symlinks(indir, relfile):
1394   """Follows symlinks in |relfile|, but treating symlinks that point outside the
1395   build tree as if they were ordinary directories/files. Returns the final
1396   symlink-free target and a list of paths to symlinks encountered in the
1397   process.
1398
1399   The rule about symlinks outside the build tree is for the benefit of the
1400   Chromium OS ebuild, which symlinks the output directory to an unrelated path
1401   in the chroot.
1402
1403   Fails when a directory loop is detected, although in theory we could support
1404   that case.
1405   """
1406   is_directory = relfile.endswith(os.path.sep)
1407   done = indir
1408   todo = relfile.strip(os.path.sep)
1409   symlinks = []
1410
1411   while todo:
1412     pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
1413         done, todo)
1414     if not symlink:
1415       todo = file_path.fix_native_path_case(done, todo)
1416       done = os.path.join(done, todo)
1417       break
1418     symlink_path = os.path.join(done, pre_symlink, symlink)
1419     post_symlink = post_symlink.lstrip(os.path.sep)
1420     # readlink doesn't exist on Windows.
1421     # pylint: disable=E1101
1422     target = os.path.normpath(os.path.join(done, pre_symlink))
1423     symlink_target = os.readlink(symlink_path)
1424     if os.path.isabs(symlink_target):
1425       # Absolute path are considered a normal directories. The use case is
1426       # generally someone who puts the output directory on a separate drive.
1427       target = symlink_target
1428     else:
1429       # The symlink itself could be using the wrong path case.
1430       target = file_path.fix_native_path_case(target, symlink_target)
1431
1432     if not os.path.exists(target):
1433       raise MappingError(
1434           'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
1435     target = file_path.get_native_path_case(target)
1436     if not file_path.path_starts_with(indir, target):
1437       done = symlink_path
1438       todo = post_symlink
1439       continue
1440     if file_path.path_starts_with(target, symlink_path):
1441       raise MappingError(
1442           'Can\'t map recursive symlink reference %s -> %s' %
1443           (symlink_path, target))
1444     logging.info('Found symlink: %s -> %s', symlink_path, target)
1445     symlinks.append(os.path.relpath(symlink_path, indir))
1446     # Treat the common prefix of the old and new paths as done, and start
1447     # scanning again.
1448     target = target.split(os.path.sep)
1449     symlink_path = symlink_path.split(os.path.sep)
1450     prefix_length = 0
1451     for target_piece, symlink_path_piece in zip(target, symlink_path):
1452       if target_piece == symlink_path_piece:
1453         prefix_length += 1
1454       else:
1455         break
1456     done = os.path.sep.join(target[:prefix_length])
1457     todo = os.path.join(
1458         os.path.sep.join(target[prefix_length:]), post_symlink)
1459
1460   relfile = os.path.relpath(done, indir)
1461   relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
1462   return relfile, symlinks
1463
1464
1465 def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
1466   """Expands a single input. It can result in multiple outputs.
1467
1468   This function is recursive when relfile is a directory.
1469
1470   Note: this code doesn't properly handle recursive symlink like one created
1471   with:
1472     ln -s .. foo
1473   """
1474   if os.path.isabs(relfile):
1475     raise MappingError('Can\'t map absolute path %s' % relfile)
1476
1477   infile = file_path.normpath(os.path.join(indir, relfile))
1478   if not infile.startswith(indir):
1479     raise MappingError('Can\'t map file %s outside %s' % (infile, indir))
1480
1481   filepath = os.path.join(indir, relfile)
1482   native_filepath = file_path.get_native_path_case(filepath)
1483   if filepath != native_filepath:
1484     # Special case './'.
1485     if filepath != native_filepath + '.' + os.path.sep:
1486       # While it'd be nice to enforce path casing on Windows, it's impractical.
1487       # Also give up enforcing strict path case on OSX. Really, it's that sad.
1488       # The case where it happens is very specific and hard to reproduce:
1489       # get_native_path_case(
1490       #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
1491       # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
1492       #
1493       # Note that this is really something deep in OSX because running
1494       # ls Foo.framework/Versions/A
1495       # will print out 'Resources', while file_path.get_native_path_case()
1496       # returns a lower case 'r'.
1497       #
1498       # So *something* is happening under the hood resulting in the command 'ls'
1499       # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree.  We
1500       # have no idea why.
1501       if sys.platform not in ('darwin', 'win32'):
1502         raise MappingError(
1503             'File path doesn\'t equal native file path\n%s != %s' %
1504             (filepath, native_filepath))
1505
1506   symlinks = []
1507   if follow_symlinks:
1508     relfile, symlinks = expand_symlinks(indir, relfile)
1509
1510   if relfile.endswith(os.path.sep):
1511     if not os.path.isdir(infile):
1512       raise MappingError(
1513           '%s is not a directory but ends with "%s"' % (infile, os.path.sep))
1514
1515     # Special case './'.
1516     if relfile.startswith('.' + os.path.sep):
1517       relfile = relfile[2:]
1518     outfiles = symlinks
1519     try:
1520       for filename in os.listdir(infile):
1521         inner_relfile = os.path.join(relfile, filename)
1522         if blacklist and blacklist(inner_relfile):
1523           continue
1524         if os.path.isdir(os.path.join(indir, inner_relfile)):
1525           inner_relfile += os.path.sep
1526         outfiles.extend(
1527             expand_directory_and_symlink(indir, inner_relfile, blacklist,
1528                                          follow_symlinks))
1529       return outfiles
1530     except OSError as e:
1531       raise MappingError(
1532           'Unable to iterate over directory %s.\n%s' % (infile, e))
1533   else:
1534     # Always add individual files even if they were blacklisted.
1535     if os.path.isdir(infile):
1536       raise MappingError(
1537           'Input directory %s must have a trailing slash' % infile)
1538
1539     if not os.path.isfile(infile):
1540       raise MappingError('Input file %s doesn\'t exist' % infile)
1541
1542     return symlinks + [relfile]
1543
1544
1545 def process_input(filepath, prevdict, read_only, algo):
1546   """Processes an input file, a dependency, and return meta data about it.
1547
1548   Behaviors:
1549   - Retrieves the file mode, file size, file timestamp, file link
1550     destination if it is a file link and calcultate the SHA-1 of the file's
1551     content if the path points to a file and not a symlink.
1552
1553   Arguments:
1554     filepath: File to act on.
1555     prevdict: the previous dictionary. It is used to retrieve the cached sha-1
1556               to skip recalculating the hash. Optional.
1557     read_only: If 1 or 2, the file mode is manipulated. In practice, only save
1558                one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
1559                windows, mode is not set since all files are 'executable' by
1560                default.
1561     algo:      Hashing algorithm used.
1562
1563   Returns:
1564     The necessary data to create a entry in the 'files' section of an .isolated
1565     file.
1566   """
1567   out = {}
1568   # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
1569   # if prevdict.get('T') == True:
1570   #   # The file's content is ignored. Skip the time and hard code mode.
1571   #   out['s'] = 0
1572   #   out['h'] = algo().hexdigest()
1573   #   out['T'] = True
1574   #   return out
1575
1576   # Always check the file stat and check if it is a link. The timestamp is used
1577   # to know if the file's content/symlink destination should be looked into.
1578   # E.g. only reuse from prevdict if the timestamp hasn't changed.
1579   # There is the risk of the file's timestamp being reset to its last value
1580   # manually while its content changed. We don't protect against that use case.
1581   try:
1582     filestats = os.lstat(filepath)
1583   except OSError:
1584     # The file is not present.
1585     raise MappingError('%s is missing' % filepath)
1586   is_link = stat.S_ISLNK(filestats.st_mode)
1587
1588   if sys.platform != 'win32':
1589     # Ignore file mode on Windows since it's not really useful there.
1590     filemode = stat.S_IMODE(filestats.st_mode)
1591     # Remove write access for group and all access to 'others'.
1592     filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
1593     if read_only:
1594       filemode &= ~stat.S_IWUSR
1595     if filemode & stat.S_IXUSR:
1596       filemode |= stat.S_IXGRP
1597     else:
1598       filemode &= ~stat.S_IXGRP
1599     if not is_link:
1600       out['m'] = filemode
1601
1602   # Used to skip recalculating the hash or link destination. Use the most recent
1603   # update time.
1604   # TODO(maruel): Save it in the .state file instead of .isolated so the
1605   # .isolated file is deterministic.
1606   out['t'] = int(round(filestats.st_mtime))
1607
1608   if not is_link:
1609     out['s'] = filestats.st_size
1610     # If the timestamp wasn't updated and the file size is still the same, carry
1611     # on the sha-1.
1612     if (prevdict.get('t') == out['t'] and
1613         prevdict.get('s') == out['s']):
1614       # Reuse the previous hash if available.
1615       out['h'] = prevdict.get('h')
1616     if not out.get('h'):
1617       out['h'] = hash_file(filepath, algo)
1618   else:
1619     # If the timestamp wasn't updated, carry on the link destination.
1620     if prevdict.get('t') == out['t']:
1621       # Reuse the previous link destination if available.
1622       out['l'] = prevdict.get('l')
1623     if out.get('l') is None:
1624       # The link could be in an incorrect path case. In practice, this only
1625       # happen on OSX on case insensitive HFS.
1626       # TODO(maruel): It'd be better if it was only done once, in
1627       # expand_directory_and_symlink(), so it would not be necessary to do again
1628       # here.
1629       symlink_value = os.readlink(filepath)  # pylint: disable=E1101
1630       filedir = file_path.get_native_path_case(os.path.dirname(filepath))
1631       native_dest = file_path.fix_native_path_case(filedir, symlink_value)
1632       out['l'] = os.path.relpath(native_dest, filedir)
1633   return out
1634
1635
1636 def save_isolated(isolated, data):
1637   """Writes one or multiple .isolated files.
1638
1639   Note: this reference implementation does not create child .isolated file so it
1640   always returns an empty list.
1641
1642   Returns the list of child isolated files that are included by |isolated|.
1643   """
1644   # Make sure the data is valid .isolated data by 'reloading' it.
1645   algo = SUPPORTED_ALGOS[data['algo']]
1646   load_isolated(json.dumps(data), algo)
1647   tools.write_json(isolated, data, True)
1648   return []
1649
1650
1651 def upload_tree(base_url, indir, infiles, namespace):
1652   """Uploads the given tree to the given url.
1653
1654   Arguments:
1655     base_url:  The base url, it is assume that |base_url|/has/ can be used to
1656                query if an element was already uploaded, and |base_url|/store/
1657                can be used to upload a new element.
1658     indir:     Root directory the infiles are based in.
1659     infiles:   dict of files to upload from |indir| to |base_url|.
1660     namespace: The namespace to use on the server.
1661   """
1662   logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))
1663
1664   # Convert |indir| + |infiles| into a list of FileItem objects.
1665   # Filter out symlinks, since they are not represented by items on isolate
1666   # server side.
1667   items = [
1668       FileItem(
1669           path=os.path.join(indir, filepath),
1670           digest=metadata['h'],
1671           size=metadata['s'],
1672           high_priority=metadata.get('priority') == '0')
1673       for filepath, metadata in infiles.iteritems()
1674       if 'l' not in metadata
1675   ]
1676
1677   with get_storage(base_url, namespace) as storage:
1678     storage.upload_items(items)
1679   return 0
1680
1681
1682 def load_isolated(content, algo):
1683   """Verifies the .isolated file is valid and loads this object with the json
1684   data.
1685
1686   Arguments:
1687   - content: raw serialized content to load.
1688   - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1689           algorithm used on the Isolate Server.
1690   """
1691   try:
1692     data = json.loads(content)
1693   except ValueError:
1694     raise ConfigError('Failed to parse: %s...' % content[:100])
1695
1696   if not isinstance(data, dict):
1697     raise ConfigError('Expected dict, got %r' % data)
1698
1699   # Check 'version' first, since it could modify the parsing after.
1700   value = data.get('version', '1.0')
1701   if not isinstance(value, basestring):
1702     raise ConfigError('Expected string, got %r' % value)
1703   try:
1704     version = tuple(map(int, value.split('.')))
1705   except ValueError:
1706     raise ConfigError('Expected valid version, got %r' % value)
1707
1708   expected_version = tuple(map(int, ISOLATED_FILE_VERSION.split('.')))
1709   # Major version must match.
1710   if version[0] != expected_version[0]:
1711     raise ConfigError(
1712         'Expected compatible \'%s\' version, got %r' %
1713         (ISOLATED_FILE_VERSION, value))
1714
1715   if algo is None:
1716     # TODO(maruel): Remove the default around Jan 2014.
1717     # Default the algorithm used in the .isolated file itself, falls back to
1718     # 'sha-1' if unspecified.
1719     algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
1720
1721   for key, value in data.iteritems():
1722     if key == 'algo':
1723       if not isinstance(value, basestring):
1724         raise ConfigError('Expected string, got %r' % value)
1725       if value not in SUPPORTED_ALGOS:
1726         raise ConfigError(
1727             'Expected one of \'%s\', got %r' %
1728             (', '.join(sorted(SUPPORTED_ALGOS)), value))
1729       if value != SUPPORTED_ALGOS_REVERSE[algo]:
1730         raise ConfigError(
1731             'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1732
1733     elif key == 'command':
1734       if not isinstance(value, list):
1735         raise ConfigError('Expected list, got %r' % value)
1736       if not value:
1737         raise ConfigError('Expected non-empty command')
1738       for subvalue in value:
1739         if not isinstance(subvalue, basestring):
1740           raise ConfigError('Expected string, got %r' % subvalue)
1741
1742     elif key == 'files':
1743       if not isinstance(value, dict):
1744         raise ConfigError('Expected dict, got %r' % value)
1745       for subkey, subvalue in value.iteritems():
1746         if not isinstance(subkey, basestring):
1747           raise ConfigError('Expected string, got %r' % subkey)
1748         if not isinstance(subvalue, dict):
1749           raise ConfigError('Expected dict, got %r' % subvalue)
1750         for subsubkey, subsubvalue in subvalue.iteritems():
1751           if subsubkey == 'l':
1752             if not isinstance(subsubvalue, basestring):
1753               raise ConfigError('Expected string, got %r' % subsubvalue)
1754           elif subsubkey == 'm':
1755             if not isinstance(subsubvalue, int):
1756               raise ConfigError('Expected int, got %r' % subsubvalue)
1757           elif subsubkey == 'h':
1758             if not is_valid_hash(subsubvalue, algo):
1759               raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1760           elif subsubkey == 's':
1761             if not isinstance(subsubvalue, (int, long)):
1762               raise ConfigError('Expected int or long, got %r' % subsubvalue)
1763           else:
1764             raise ConfigError('Unknown subsubkey %s' % subsubkey)
1765         if bool('h' in subvalue) == bool('l' in subvalue):
1766           raise ConfigError(
1767               'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1768               subvalue)
1769         if bool('h' in subvalue) != bool('s' in subvalue):
1770           raise ConfigError(
1771               'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1772               subvalue)
1773         if bool('s' in subvalue) == bool('l' in subvalue):
1774           raise ConfigError(
1775               'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1776               subvalue)
1777         if bool('l' in subvalue) and bool('m' in subvalue):
1778           raise ConfigError(
1779               'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
1780               subvalue)
1781
1782     elif key == 'includes':
1783       if not isinstance(value, list):
1784         raise ConfigError('Expected list, got %r' % value)
1785       if not value:
1786         raise ConfigError('Expected non-empty includes list')
1787       for subvalue in value:
1788         if not is_valid_hash(subvalue, algo):
1789           raise ConfigError('Expected sha-1, got %r' % subvalue)
1790
1791     elif key == 'os':
1792       if version >= (1, 4):
1793         raise ConfigError('Key \'os\' is not allowed starting version 1.4')
1794
1795     elif key == 'read_only':
1796       if not value in (0, 1, 2):
1797         raise ConfigError('Expected 0, 1 or 2, got %r' % value)
1798
1799     elif key == 'relative_cwd':
1800       if not isinstance(value, basestring):
1801         raise ConfigError('Expected string, got %r' % value)
1802
1803     elif key == 'version':
1804       # Already checked above.
1805       pass
1806
1807     else:
1808       raise ConfigError('Unknown key %r' % key)
1809
1810   # Automatically fix os.path.sep if necessary. While .isolated files are always
1811   # in the the native path format, someone could want to download an .isolated
1812   # tree from another OS.
1813   wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1814   if 'files' in data:
1815     data['files'] = dict(
1816         (k.replace(wrong_path_sep, os.path.sep), v)
1817         for k, v in data['files'].iteritems())
1818     for v in data['files'].itervalues():
1819       if 'l' in v:
1820         v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1821   if 'relative_cwd' in data:
1822     data['relative_cwd'] = data['relative_cwd'].replace(
1823         wrong_path_sep, os.path.sep)
1824   return data
1825
1826
1827 class IsolatedFile(object):
1828   """Represents a single parsed .isolated file."""
1829   def __init__(self, obj_hash, algo):
1830     """|obj_hash| is really the sha-1 of the file."""
1831     logging.debug('IsolatedFile(%s)' % obj_hash)
1832     self.obj_hash = obj_hash
1833     self.algo = algo
1834     # Set once all the left-side of the tree is parsed. 'Tree' here means the
1835     # .isolate and all the .isolated files recursively included by it with
1836     # 'includes' key. The order of each sha-1 in 'includes', each representing a
1837     # .isolated file in the hash table, is important, as the later ones are not
1838     # processed until the firsts are retrieved and read.
1839     self.can_fetch = False
1840
1841     # Raw data.
1842     self.data = {}
1843     # A IsolatedFile instance, one per object in self.includes.
1844     self.children = []
1845
1846     # Set once the .isolated file is loaded.
1847     self._is_parsed = False
1848     # Set once the files are fetched.
1849     self.files_fetched = False
1850
1851   def load(self, content):
1852     """Verifies the .isolated file is valid and loads this object with the json
1853     data.
1854     """
1855     logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1856     assert not self._is_parsed
1857     self.data = load_isolated(content, self.algo)
1858     self.children = [
1859         IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1860     ]
1861     self._is_parsed = True
1862
1863   def fetch_files(self, fetch_queue, files):
1864     """Adds files in this .isolated file not present in |files| dictionary.
1865
1866     Preemptively request files.
1867
1868     Note that |files| is modified by this function.
1869     """
1870     assert self.can_fetch
1871     if not self._is_parsed or self.files_fetched:
1872       return
1873     logging.debug('fetch_files(%s)' % self.obj_hash)
1874     for filepath, properties in self.data.get('files', {}).iteritems():
1875       # Root isolated has priority on the files being mapped. In particular,
1876       # overriden files must not be fetched.
1877       if filepath not in files:
1878         files[filepath] = properties
1879         if 'h' in properties:
1880           # Preemptively request files.
1881           logging.debug('fetching %s' % filepath)
1882           fetch_queue.add(properties['h'], properties['s'], WorkerPool.MED)
1883     self.files_fetched = True
1884
1885
1886 class Settings(object):
1887   """Results of a completely parsed .isolated file."""
1888   def __init__(self):
1889     self.command = []
1890     self.files = {}
1891     self.read_only = None
1892     self.relative_cwd = None
1893     # The main .isolated file, a IsolatedFile instance.
1894     self.root = None
1895
1896   def load(self, fetch_queue, root_isolated_hash, algo):
1897     """Loads the .isolated and all the included .isolated asynchronously.
1898
1899     It enables support for "included" .isolated files. They are processed in
1900     strict order but fetched asynchronously from the cache. This is important so
1901     that a file in an included .isolated file that is overridden by an embedding
1902     .isolated file is not fetched needlessly. The includes are fetched in one
1903     pass and the files are fetched as soon as all the ones on the left-side
1904     of the tree were fetched.
1905
1906     The prioritization is very important here for nested .isolated files.
1907     'includes' have the highest priority and the algorithm is optimized for both
1908     deep and wide trees. A deep one is a long link of .isolated files referenced
1909     one at a time by one item in 'includes'. A wide one has a large number of
1910     'includes' in a single .isolated file. 'left' is defined as an included
1911     .isolated file earlier in the 'includes' list. So the order of the elements
1912     in 'includes' is important.
1913     """
1914     self.root = IsolatedFile(root_isolated_hash, algo)
1915
1916     # Isolated files being retrieved now: hash -> IsolatedFile instance.
1917     pending = {}
1918     # Set of hashes of already retrieved items to refuse recursive includes.
1919     seen = set()
1920
1921     def retrieve(isolated_file):
1922       h = isolated_file.obj_hash
1923       if h in seen:
1924         raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
1925       assert h not in pending
1926       seen.add(h)
1927       pending[h] = isolated_file
1928       fetch_queue.add(h, priority=WorkerPool.HIGH)
1929
1930     retrieve(self.root)
1931
1932     while pending:
1933       item_hash = fetch_queue.wait(pending)
1934       item = pending.pop(item_hash)
1935       item.load(fetch_queue.cache.read(item_hash))
1936       if item_hash == root_isolated_hash:
1937         # It's the root item.
1938         item.can_fetch = True
1939
1940       for new_child in item.children:
1941         retrieve(new_child)
1942
1943       # Traverse the whole tree to see if files can now be fetched.
1944       self._traverse_tree(fetch_queue, self.root)
1945
1946     def check(n):
1947       return all(check(x) for x in n.children) and n.files_fetched
1948     assert check(self.root)
1949
1950     self.relative_cwd = self.relative_cwd or ''
1951
1952   def _traverse_tree(self, fetch_queue, node):
1953     if node.can_fetch:
1954       if not node.files_fetched:
1955         self._update_self(fetch_queue, node)
1956       will_break = False
1957       for i in node.children:
1958         if not i.can_fetch:
1959           if will_break:
1960             break
1961           # Automatically mark the first one as fetcheable.
1962           i.can_fetch = True
1963           will_break = True
1964         self._traverse_tree(fetch_queue, i)
1965
1966   def _update_self(self, fetch_queue, node):
1967     node.fetch_files(fetch_queue, self.files)
1968     # Grabs properties.
1969     if not self.command and node.data.get('command'):
1970       # Ensure paths are correctly separated on windows.
1971       self.command = node.data['command']
1972       if self.command:
1973         self.command[0] = self.command[0].replace('/', os.path.sep)
1974         self.command = tools.fix_python_path(self.command)
1975     if self.read_only is None and node.data.get('read_only') is not None:
1976       self.read_only = node.data['read_only']
1977     if (self.relative_cwd is None and
1978         node.data.get('relative_cwd') is not None):
1979       self.relative_cwd = node.data['relative_cwd']
1980
1981
1982 def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
1983   """Aggressively downloads the .isolated file(s), then download all the files.
1984
1985   Arguments:
1986     isolated_hash: hash of the root *.isolated file.
1987     storage: Storage class that communicates with isolate storage.
1988     cache: LocalCache class that knows how to store and map files locally.
1989     outdir: Output directory to map file tree to.
1990     require_command: Ensure *.isolated specifies a command to run.
1991
1992   Returns:
1993     Settings object that holds details about loaded *.isolated file.
1994   """
1995   logging.debug(
1996       'fetch_isolated(%s, %s, %s, %s, %s)',
1997       isolated_hash, storage, cache, outdir, require_command)
1998   # Hash algorithm to use, defined by namespace |storage| is using.
1999   algo = storage.hash_algo
2000   with cache:
2001     fetch_queue = FetchQueue(storage, cache)
2002     settings = Settings()
2003
2004     with tools.Profiler('GetIsolateds'):
2005       # Optionally support local files by manually adding them to cache.
2006       if not is_valid_hash(isolated_hash, algo):
2007         logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
2008         try:
2009           isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
2010         except IOError:
2011           raise MappingError(
2012               '%s doesn\'t seem to be a valid file. Did you intent to pass a '
2013               'valid hash?' % isolated_hash)
2014
2015       # Load all *.isolated and start loading rest of the files.
2016       settings.load(fetch_queue, isolated_hash, algo)
2017       if require_command and not settings.command:
2018         # TODO(vadimsh): All fetch operations are already enqueue and there's no
2019         # easy way to cancel them.
2020         raise ConfigError('No command to run')
2021
2022     with tools.Profiler('GetRest'):
2023       # Create file system hierarchy.
2024       if not os.path.isdir(outdir):
2025         os.makedirs(outdir)
2026       create_directories(outdir, settings.files)
2027       create_symlinks(outdir, settings.files.iteritems())
2028
2029       # Ensure working directory exists.
2030       cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
2031       if not os.path.isdir(cwd):
2032         os.makedirs(cwd)
2033
2034       # Multimap: digest -> list of pairs (path, props).
2035       remaining = {}
2036       for filepath, props in settings.files.iteritems():
2037         if 'h' in props:
2038           remaining.setdefault(props['h'], []).append((filepath, props))
2039
2040       # Now block on the remaining files to be downloaded and mapped.
2041       logging.info('Retrieving remaining files (%d of them)...',
2042           fetch_queue.pending_count)
2043       last_update = time.time()
2044       with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
2045         while remaining:
2046           detector.ping()
2047
2048           # Wait for any item to finish fetching to cache.
2049           digest = fetch_queue.wait(remaining)
2050
2051           # Link corresponding files to a fetched item in cache.
2052           for filepath, props in remaining.pop(digest):
2053             cache.hardlink(
2054                 digest, os.path.join(outdir, filepath), props.get('m'))
2055
2056           # Report progress.
2057           duration = time.time() - last_update
2058           if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
2059             msg = '%d files remaining...' % len(remaining)
2060             print msg
2061             logging.info(msg)
2062             last_update = time.time()
2063
2064   # Cache could evict some items we just tried to fetch, it's a fatal error.
2065   if not fetch_queue.verify_all_cached():
2066     raise MappingError('Cache is too small to hold all requested files')
2067   return settings
2068
2069
2070 def directory_to_metadata(root, algo, blacklist):
2071   """Returns the FileItem list and .isolated metadata for a directory."""
2072   root = file_path.get_native_path_case(root)
2073   paths = expand_directory_and_symlink(
2074       root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
2075   metadata = dict(
2076       (relpath, process_input(os.path.join(root, relpath), {}, False, algo))
2077       for relpath in paths
2078   )
2079   for v in metadata.itervalues():
2080     v.pop('t')
2081   items = [
2082       FileItem(
2083           path=os.path.join(root, relpath),
2084           digest=meta['h'],
2085           size=meta['s'],
2086           high_priority=relpath.endswith('.isolated'))
2087       for relpath, meta in metadata.iteritems() if 'h' in meta
2088   ]
2089   return items, metadata
2090
2091
2092 def archive_files_to_storage(storage, files, blacklist):
2093   """Stores every entries and returns the relevant data.
2094
2095   Arguments:
2096     storage: a Storage object that communicates with the remote object store.
2097     files: list of file paths to upload. If a directory is specified, a
2098            .isolated file is created and its hash is returned.
2099     blacklist: function that returns True if a file should be omitted.
2100   """
2101   assert all(isinstance(i, unicode) for i in files), files
2102   if len(files) != len(set(map(os.path.abspath, files))):
2103     raise Error('Duplicate entries found.')
2104
2105   results = []
2106   # The temporary directory is only created as needed.
2107   tempdir = None
2108   try:
2109     # TODO(maruel): Yield the files to a worker thread.
2110     items_to_upload = []
2111     for f in files:
2112       try:
2113         filepath = os.path.abspath(f)
2114         if os.path.isdir(filepath):
2115           # Uploading a whole directory.
2116           items, metadata = directory_to_metadata(
2117               filepath, storage.hash_algo, blacklist)
2118
2119           # Create the .isolated file.
2120           if not tempdir:
2121             tempdir = tempfile.mkdtemp(prefix='isolateserver')
2122           handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
2123           os.close(handle)
2124           data = {
2125               'algo': SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
2126               'files': metadata,
2127               'version': ISOLATED_FILE_VERSION,
2128           }
2129           save_isolated(isolated, data)
2130           h = hash_file(isolated, storage.hash_algo)
2131           items_to_upload.extend(items)
2132           items_to_upload.append(
2133               FileItem(
2134                   path=isolated,
2135                   digest=h,
2136                   size=os.stat(isolated).st_size,
2137                   high_priority=True))
2138           results.append((h, f))
2139
2140         elif os.path.isfile(filepath):
2141           h = hash_file(filepath, storage.hash_algo)
2142           items_to_upload.append(
2143             FileItem(
2144                 path=filepath,
2145                 digest=h,
2146                 size=os.stat(filepath).st_size,
2147                 high_priority=f.endswith('.isolated')))
2148           results.append((h, f))
2149         else:
2150           raise Error('%s is neither a file or directory.' % f)
2151       except OSError:
2152         raise Error('Failed to process %s.' % f)
2153     # Technically we would care about which files were uploaded but we don't
2154     # much in practice.
2155     _uploaded_files = storage.upload_items(items_to_upload)
2156     return results
2157   finally:
2158     if tempdir:
2159       shutil.rmtree(tempdir)
2160
2161
2162 def archive(out, namespace, files, blacklist):
2163   if files == ['-']:
2164     files = sys.stdin.readlines()
2165
2166   if not files:
2167     raise Error('Nothing to upload')
2168
2169   files = [f.decode('utf-8') for f in files]
2170   blacklist = tools.gen_blacklist(blacklist)
2171   with get_storage(out, namespace) as storage:
2172     results = archive_files_to_storage(storage, files, blacklist)
2173   print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2174
2175
2176 @subcommand.usage('<file1..fileN> or - to read from stdin')
2177 def CMDarchive(parser, args):
2178   """Archives data to the server.
2179
2180   If a directory is specified, a .isolated file is created the whole directory
2181   is uploaded. Then this .isolated file can be included in another one to run
2182   commands.
2183
2184   The commands output each file that was processed with its content hash. For
2185   directories, the .isolated generated for the directory is listed as the
2186   directory entry itself.
2187   """
2188   add_isolate_server_options(parser, False)
2189   parser.add_option(
2190       '--blacklist',
2191       action='append', default=list(DEFAULT_BLACKLIST),
2192       help='List of regexp to use as blacklist filter when uploading '
2193            'directories')
2194   options, files = parser.parse_args(args)
2195   process_isolate_server_options(parser, options)
2196   if file_path.is_url(options.isolate_server):
2197     auth.ensure_logged_in(options.isolate_server)
2198   try:
2199     archive(options.isolate_server, options.namespace, files, options.blacklist)
2200   except Error as e:
2201     parser.error(e.args[0])
2202   return 0
2203
2204
2205 def CMDdownload(parser, args):
2206   """Download data from the server.
2207
2208   It can either download individual files or a complete tree from a .isolated
2209   file.
2210   """
2211   add_isolate_server_options(parser, True)
2212   parser.add_option(
2213       '-i', '--isolated', metavar='HASH',
2214       help='hash of an isolated file, .isolated file content is discarded, use '
2215            '--file if you need it')
2216   parser.add_option(
2217       '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2218       help='hash and destination of a file, can be used multiple times')
2219   parser.add_option(
2220       '-t', '--target', metavar='DIR', default=os.getcwd(),
2221       help='destination directory')
2222   options, args = parser.parse_args(args)
2223   process_isolate_server_options(parser, options)
2224   if args:
2225     parser.error('Unsupported arguments: %s' % args)
2226   if bool(options.isolated) == bool(options.file):
2227     parser.error('Use one of --isolated or --file, and only one.')
2228
2229   options.target = os.path.abspath(options.target)
2230
2231   remote = options.isolate_server or options.indir
2232   if file_path.is_url(remote):
2233     auth.ensure_logged_in(remote)
2234
2235   with get_storage(remote, options.namespace) as storage:
2236     # Fetching individual files.
2237     if options.file:
2238       channel = threading_utils.TaskChannel()
2239       pending = {}
2240       for digest, dest in options.file:
2241         pending[digest] = dest
2242         storage.async_fetch(
2243             channel,
2244             WorkerPool.MED,
2245             digest,
2246             UNKNOWN_FILE_SIZE,
2247             functools.partial(file_write, os.path.join(options.target, dest)))
2248       while pending:
2249         fetched = channel.pull()
2250         dest = pending.pop(fetched)
2251         logging.info('%s: %s', fetched, dest)
2252
2253     # Fetching whole isolated tree.
2254     if options.isolated:
2255       settings = fetch_isolated(
2256           isolated_hash=options.isolated,
2257           storage=storage,
2258           cache=MemoryCache(),
2259           outdir=options.target,
2260           require_command=False)
2261       rel = os.path.join(options.target, settings.relative_cwd)
2262       print('To run this test please run from the directory %s:' %
2263             os.path.join(options.target, rel))
2264       print('  ' + ' '.join(settings.command))
2265
2266   return 0
2267
2268
2269 @subcommand.usage('<file1..fileN> or - to read from stdin')
2270 def CMDhashtable(parser, args):
2271   """Archives data to a hashtable on the file system.
2272
2273   If a directory is specified, a .isolated file is created the whole directory
2274   is uploaded. Then this .isolated file can be included in another one to run
2275   commands.
2276
2277   The commands output each file that was processed with its content hash. For
2278   directories, the .isolated generated for the directory is listed as the
2279   directory entry itself.
2280   """
2281   add_outdir_options(parser)
2282   parser.add_option(
2283       '--blacklist',
2284       action='append', default=list(DEFAULT_BLACKLIST),
2285       help='List of regexp to use as blacklist filter when uploading '
2286            'directories')
2287   options, files = parser.parse_args(args)
2288   process_outdir_options(parser, options, os.getcwd())
2289   try:
2290     # Do not compress files when archiving to the file system.
2291     archive(options.outdir, 'default', files, options.blacklist)
2292   except Error as e:
2293     parser.error(e.args[0])
2294   return 0
2295
2296
2297 def add_isolate_server_options(parser, add_indir):
2298   """Adds --isolate-server and --namespace options to parser.
2299
2300   Includes --indir if desired.
2301   """
2302   parser.add_option(
2303       '-I', '--isolate-server',
2304       metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
2305       help='URL of the Isolate Server to use. Defaults to the environment '
2306            'variable ISOLATE_SERVER if set. No need to specify https://, this '
2307            'is assumed.')
2308   parser.add_option(
2309       '--namespace', default='default-gzip',
2310       help='The namespace to use on the Isolate Server, default: %default')
2311   if add_indir:
2312     parser.add_option(
2313         '--indir', metavar='DIR',
2314         help='Directory used to store the hashtable instead of using an '
2315              'isolate server.')
2316
2317
2318 def process_isolate_server_options(parser, options):
2319   """Processes the --isolate-server and --indir options and aborts if neither is
2320   specified.
2321   """
2322   has_indir = hasattr(options, 'indir')
2323   if not options.isolate_server:
2324     if not has_indir:
2325       parser.error('--isolate-server is required.')
2326     elif not options.indir:
2327       parser.error('Use one of --indir or --isolate-server.')
2328   else:
2329     if has_indir and options.indir:
2330       parser.error('Use only one of --indir or --isolate-server.')
2331
2332   if options.isolate_server:
2333     parts = urlparse.urlparse(options.isolate_server, 'https')
2334     if parts.query:
2335       parser.error('--isolate-server doesn\'t support query parameter.')
2336     if parts.fragment:
2337       parser.error('--isolate-server doesn\'t support fragment in the url.')
2338     # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
2339     # what is desired here.
2340     new = list(parts)
2341     if not new[1] and new[2]:
2342       new[1] = new[2].rstrip('/')
2343       new[2] = ''
2344     new[2] = new[2].rstrip('/')
2345     options.isolate_server = urlparse.urlunparse(new)
2346     on_error.report_on_exception_exit(options.isolate_server)
2347     return
2348
2349   if file_path.is_url(options.indir):
2350     parser.error('Can\'t use an URL for --indir.')
2351   options.indir = unicode(options.indir).replace('/', os.path.sep)
2352   options.indir = os.path.abspath(
2353       os.path.normpath(os.path.join(os.getcwd(), options.indir)))
2354   if not os.path.isdir(options.indir):
2355     parser.error('Path given to --indir must exist.')
2356
2357
2358
2359 def add_outdir_options(parser):
2360   """Adds --outdir, which is orthogonal to --isolate-server.
2361
2362   Note: On upload, separate commands are used between 'archive' and 'hashtable'.
2363   On 'download', the same command can download from either an isolate server or
2364   a file system.
2365   """
2366   parser.add_option(
2367       '-o', '--outdir', metavar='DIR',
2368       help='Directory used to recreate the tree.')
2369
2370
2371 def process_outdir_options(parser, options, cwd):
2372   if not options.outdir:
2373     parser.error('--outdir is required.')
2374   if file_path.is_url(options.outdir):
2375     parser.error('Can\'t use an URL for --outdir.')
2376   options.outdir = unicode(options.outdir).replace('/', os.path.sep)
2377   # outdir doesn't need native path case since tracing is never done from there.
2378   options.outdir = os.path.abspath(
2379       os.path.normpath(os.path.join(cwd, options.outdir)))
2380   # In theory, we'd create the directory outdir right away. Defer doing it in
2381   # case there's errors in the command line.
2382
2383
2384 class OptionParserIsolateServer(tools.OptionParserWithLogging):
2385   def __init__(self, **kwargs):
2386     tools.OptionParserWithLogging.__init__(
2387         self,
2388         version=__version__,
2389         prog=os.path.basename(sys.modules[__name__].__file__),
2390         **kwargs)
2391     auth.add_auth_options(self)
2392
2393   def parse_args(self, *args, **kwargs):
2394     options, args = tools.OptionParserWithLogging.parse_args(
2395         self, *args, **kwargs)
2396     auth.process_auth_options(self, options)
2397     return options, args
2398
2399
2400 def main(args):
2401   dispatcher = subcommand.CommandDispatcher(__name__)
2402   return dispatcher.execute(OptionParserIsolateServer(), args)
2403
2404
2405 if __name__ == '__main__':
2406   fix_encoding.fix_encoding()
2407   tools.disable_buffering()
2408   colorama.init()
2409   sys.exit(main(sys.argv[1:]))