# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.
-"""Archives a set of files or directories to a server."""
+"""Archives a set of files or directories to an Isolate Server."""
__version__ = '0.3.4'
import functools
-import hashlib
-import json
import logging
import os
import re
import shutil
-import stat
import sys
import tempfile
import threading
from utils import tools
import auth
+import isolated_format
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
-# Version stored and expected in .isolated files.
-ISOLATED_FILE_VERSION = '1.4'
+
+
+# The file size to be used when we don't know the correct file size,
+# generally used for .isolated files.
+UNKNOWN_FILE_SIZE = None
+
+
+# Maximum expected delay (in seconds) between successive file fetches or uploads
+# in Storage. If it takes longer than that, a deadlock might be happening
+# and all stack frames for all threads are dumped to the log.
+DEADLOCK_TIMEOUT = 5 * 60
# The number of files to check the isolate server per /pre-upload query.
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousand files to look up with minimal number of large
# files missing" case.
-ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
+ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
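+# Each entry sizes one successive /pre-upload batch; batch_items_for_check()
+# presumably keeps reusing the last value once the sequence is exhausted.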
# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
- '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
- 'wav', 'zip'
+ '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
+ 'png', 'wav', 'zip',
]
-# The file size to be used when we don't know the correct file size,
-# generally used for .isolated files.
-UNKNOWN_FILE_SIZE = None
-
-
-# Chunk size to use when doing disk I/O.
-DISK_FILE_CHUNK = 1024 * 1024
-
# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024
# response from the server within this timeout, the whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60
-# Maximum expected delay (in seconds) between successive file fetches
-# in run_tha_test. If it takes longer than that, a deadlock might be happening
-# and all stack frames for all threads are dumped to log.
-DEADLOCK_TIMEOUT = 5 * 60
-
# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
DELAY_BETWEEN_UPDATES_IN_SECS = 30
-# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
-# specify the names here.
-SUPPORTED_ALGOS = {
- 'md5': hashlib.md5,
- 'sha-1': hashlib.sha1,
- 'sha-512': hashlib.sha512,
-}
-
-
-# Used for serialization.
-SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
-
-
DEFAULT_BLACKLIST = (
# Temporary vim or python files.
r'^.+\.(?:pyc|swp)$',
pass
-class ConfigError(ValueError):
- """Generic failure to load a .isolated file."""
- pass
-
-
-class MappingError(OSError):
- """Failed to recreate the tree."""
- pass
-
-
-def is_valid_hash(value, algo):
- """Returns if the value is a valid hash for the corresponding algorithm."""
- size = 2 * algo().digest_size
- return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
-
-
-def hash_file(filepath, algo):
- """Calculates the hash of a file without reading it all in memory at once.
-
- |algo| should be one of hashlib hashing algorithm.
- """
- digest = algo()
- with open(filepath, 'rb') as f:
- while True:
- chunk = f.read(DISK_FILE_CHUNK)
- if not chunk:
- break
- digest.update(chunk)
- return digest.hexdigest()
-
-
def stream_read(stream, chunk_size):
"""Reads chunks from |stream| and yields them."""
while True:
yield data
-def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
+def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
"""Yields file content in chunks of |chunk_size| starting from |offset|."""
with open(filepath, 'rb') as f:
if offset:
yield tail
-def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
+def zip_decompress(
+ content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
"""Reads zipped data from |content_generator| and yields decompressed data.
Decompresses data in small chunks (no larger than |chunk_size|) so that
return True
-class WorkerPool(threading_utils.AutoRetryThreadPool):
- """Thread pool that automatically retries on IOError and runs a preconfigured
- function.
- """
- # Initial and maximum number of worker threads.
- INITIAL_WORKERS = 2
- MAX_WORKERS = 16
- RETRIES = 5
-
- def __init__(self):
- super(WorkerPool, self).__init__(
- [IOError],
- self.RETRIES,
- self.INITIAL_WORKERS,
- self.MAX_WORKERS,
- 0,
- 'remote')
-
-
class Item(object):
"""An item to push to Storage.
def __init__(self, storage_api):
self._storage_api = storage_api
- self._use_zip = is_namespace_with_compression(storage_api.namespace)
- self._hash_algo = get_hash_algo(storage_api.namespace)
+ self._use_zip = isolated_format.is_namespace_with_compression(
+ storage_api.namespace)
+ self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
self._cpu_thread_pool = None
self._net_thread_pool = None
def hash_algo(self):
"""Hashing algorithm used to name files in storage based on their content.
- Defined by |namespace|. See also 'get_hash_algo'.
+ Defined by |namespace|. See also isolated_format.get_hash_algo().
"""
return self._hash_algo
def net_thread_pool(self):
"""AutoRetryThreadPool for IO-bound tasks, retries IOError."""
if self._net_thread_pool is None:
- self._net_thread_pool = WorkerPool()
+ self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
return self._net_thread_pool
def close(self):
None, but |channel| later receives back |item| when upload ends.
"""
# Thread pool task priority.
- priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED
+ priority = (
+ threading_utils.PRIORITY_HIGH if item.high_priority
+ else threading_utils.PRIORITY_MED)
def push(content):
"""Pushes an Item and returns it to |channel|."""
# Prepare reading pipeline.
stream = self._storage_api.fetch(digest)
if self._use_zip:
- stream = zip_decompress(stream, DISK_FILE_CHUNK)
+ stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
# Run |stream| through verifier that will assert its size.
verifier = FetchStreamVerifier(stream, size)
# Verified stream goes to |sink|.
# Enqueue all requests.
for batch in batch_items_for_check(items):
- self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
+ self.net_thread_pool.add_task_with_channel(
+ channel, threading_utils.PRIORITY_HIGH,
self._storage_api.contains, batch)
pending += 1
self._accessed = set()
self._fetched = cache.cached_set()
- def add(self, digest, size=UNKNOWN_FILE_SIZE, priority=WorkerPool.MED):
+ def add(
+ self,
+ digest,
+ size=UNKNOWN_FILE_SIZE,
+ priority=threading_utils.PRIORITY_MED):
"""Starts asynchronous fetch of item |digest|."""
# Fetching it now?
if digest in self._pending:
try:
yield stored
except IOError as exc:
- raise MappingError('Failed to store an item in cache: %s' % exc)
+ raise isolated_format.MappingError(
+ 'Failed to store an item in cache: %s' % exc)
stored = chunk
if stored is not None:
self._inspect_chunk(stored, is_last=True)
try:
yield stored
except IOError as exc:
- raise MappingError('Failed to store an item in cache: %s' % exc)
+ raise isolated_format.MappingError(
+ 'Failed to store an item in cache: %s' % exc)
def _inspect_chunk(self, chunk, is_last):
"""Called for each fetched chunk before passing it to consumer."""
self.current_size += len(chunk)
- if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
+ if (is_last and
+ (self.expected_size != UNKNOWN_FILE_SIZE) and
(self.expected_size != self.current_size)):
raise IOError('Incorrect file size: expected %d, got %d' % (
self.expected_size, self.current_size))
logging.info('Protocol version: %s', caps['protocol_version'])
logging.info('Server version: %s', caps['server_app_version'])
if caps.get('error'):
- raise MappingError(caps['error'])
+ raise isolated_format.MappingError(caps['error'])
if not caps['access_token']:
raise ValueError('access_token is missing')
return caps
# namespace-level ACLs to this call.
with self._lock:
if self._server_caps is None:
- request_body = json.dumps(
- self._generate_handshake_request(), separators=(',', ':'))
- response = net.url_read(
- url=self._base_url + '/content-gs/handshake',
- data=request_body,
- content_type='application/json',
- method='POST')
- if response is None:
- raise MappingError('Failed to perform handshake.')
try:
- caps = json.loads(response)
+ caps = net.url_read_json(
+ url=self._base_url + '/content-gs/handshake',
+ data=self._generate_handshake_request())
+ if caps is None:
+ raise isolated_format.MappingError('Failed to perform handshake.')
if not isinstance(caps, dict):
raise ValueError('Expecting JSON dict')
self._server_caps = self._validate_handshake_response(caps)
# KeyError exception has very confusing str conversion: it's just a
# missing key value and nothing else. So print exception class name
# as well.
- raise MappingError('Invalid handshake response (%s): %s' % (
+ raise isolated_format.MappingError(
+ 'Invalid handshake response (%s): %s' % (
exc.__class__.__name__, exc))
return self._server_caps
# send it to isolated server. That way isolate server can verify that
# the data safely reached Google Storage (GS provides MD5 and CRC32C of
# stored files).
+  # TODO(maruel): Fix the server to properly accept data={} so
+ # url_read_json() can be used.
response = net.url_read(
url=push_state.finalize_url,
data='',
self._base_url,
self._namespace,
urllib.quote(self._server_capabilities['access_token']))
- response_body = net.url_read(
- url=query_url,
- data=json.dumps(body, separators=(',', ':')),
- content_type='application/json',
- method='POST')
- if response_body is None:
- raise MappingError('Failed to execute /pre-upload query')
# Response body is a list of push_urls (or null if file is already present).
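+  # Pre-initialized so the error message in the except clause below can still
+  # refer to |response| even if url_read_json() raises before assigning it.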
+ response = None
try:
- response = json.loads(response_body)
+ response = net.url_read_json(url=query_url, data=body)
+ if response is None:
+ raise isolated_format.MappingError(
+ 'Failed to execute /pre-upload query')
if not isinstance(response, list):
raise ValueError('Expecting response with json-encoded list')
if len(response) != len(items):
'Incorrect number of items in the list, expected %d, '
'but got %d' % (len(items), len(response)))
except ValueError as err:
- raise MappingError(
- 'Invalid response from server: %s, body is %s' % (err, response_body))
+ raise isolated_format.MappingError(
+ 'Invalid response from server: %s, body is %s' % (err, response))
# Pick Items that are missing, attach _PushState to them.
missing_items = {}
os.chmod(dest, file_mode & self._file_mode_mask)
-def get_hash_algo(_namespace):
- """Return hash algorithm class to use when uploading to given |namespace|."""
- # TODO(vadimsh): Implement this at some point.
- return hashlib.sha1
+class IsolatedBundle(object):
+ """Fetched and parsed .isolated file with all dependencies."""
+
+ def __init__(self):
+ self.command = []
+ self.files = {}
+ self.read_only = None
+ self.relative_cwd = None
+ # The main .isolated file, a IsolatedFile instance.
+ self.root = None
+ def fetch(self, fetch_queue, root_isolated_hash, algo):
+ """Fetches the .isolated and all the included .isolated.
-def is_namespace_with_compression(namespace):
- """Returns True if given |namespace| stores compressed objects."""
- return namespace.endswith(('-gzip', '-deflate'))
+ It enables support for "included" .isolated files. They are processed in
+ strict order but fetched asynchronously from the cache. This is important so
+ that a file in an included .isolated file that is overridden by an embedding
+ .isolated file is not fetched needlessly. The includes are fetched in one
+  pass and the files are fetched as soon as all the ones on the left side
+  of the tree have been fetched.
+
+ The prioritization is very important here for nested .isolated files.
+ 'includes' have the highest priority and the algorithm is optimized for both
+ deep and wide trees. A deep one is a long link of .isolated files referenced
+ one at a time by one item in 'includes'. A wide one has a large number of
+ 'includes' in a single .isolated file. 'left' is defined as an included
+ .isolated file earlier in the 'includes' list. So the order of the elements
+ in 'includes' is important.
+
+  As a side effect this method starts an asynchronous fetch of all data files
+  by adding them to |fetch_queue|. It doesn't wait for the data files to
+  finish fetching, though.
+ """
+ self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
+
+ # Isolated files being retrieved now: hash -> IsolatedFile instance.
+ pending = {}
+ # Set of hashes of already retrieved items to refuse recursive includes.
+ seen = set()
+    # Set of IsolatedFile objects whose data files have already been fetched.
+ processed = set()
+
+ def retrieve_async(isolated_file):
+ h = isolated_file.obj_hash
+ if h in seen:
+ raise isolated_format.IsolatedError(
+ 'IsolatedFile %s is retrieved recursively' % h)
+ assert h not in pending
+ seen.add(h)
+ pending[h] = isolated_file
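+      # *.isolated files are fetched at HIGH priority so that includes are
+      # resolved before the MED priority data file fetches they trigger.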
+ fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
+
+ # Start fetching root *.isolated file (single file, not the whole bundle).
+ retrieve_async(self.root)
+
+ while pending:
+ # Wait until some *.isolated file is fetched, parse it.
+ item_hash = fetch_queue.wait(pending)
+ item = pending.pop(item_hash)
+ item.load(fetch_queue.cache.read(item_hash))
+
+ # Start fetching included *.isolated files.
+ for new_child in item.children:
+ retrieve_async(new_child)
+
+ # Always fetch *.isolated files in traversal order, waiting if necessary
+      # until the next to-be-processed node loads. "Waiting" is done by
+      # yielding back to the outer loop, which waits until some *.isolated
+      # is loaded.
+ for node in isolated_format.walk_includes(self.root):
+ if node not in processed:
+ # Not visited, and not yet loaded -> wait for it to load.
+ if not node.is_loaded:
+ break
+ # Not visited and loaded -> process it and continue the traversal.
+ self._start_fetching_files(node, fetch_queue)
+ processed.add(node)
+
+    # By now all *.isolated files, and only those, should be processed.
+ all_isolateds = set(isolated_format.walk_includes(self.root))
+ assert all_isolateds == processed, (all_isolateds, processed)
+
+ # Extract 'command' and other bundle properties.
+ for node in isolated_format.walk_includes(self.root):
+ self._update_self(node)
+ self.relative_cwd = self.relative_cwd or ''
+
+ def _start_fetching_files(self, isolated, fetch_queue):
+ """Starts fetching files from |isolated| that are not yet being fetched.
+
+ Modifies self.files.
+ """
+ logging.debug('fetch_files(%s)', isolated.obj_hash)
+ for filepath, properties in isolated.data.get('files', {}).iteritems():
+      # The root .isolated takes priority for the files being mapped. In
+      # particular, overridden files must not be fetched.
+ if filepath not in self.files:
+ self.files[filepath] = properties
+ if 'h' in properties:
+ # Preemptively request files.
+ logging.debug('fetching %s', filepath)
+ fetch_queue.add(
+ properties['h'], properties['s'], threading_utils.PRIORITY_MED)
+
+ def _update_self(self, node):
+ """Extracts bundle global parameters from loaded *.isolated file.
+
+ Will be called with each loaded *.isolated file in order of traversal of
+ isolated include graph (see isolated_format.walk_includes).
+ """
+ # Grabs properties.
+ if not self.command and node.data.get('command'):
+      # Ensure paths are correctly separated on Windows.
+ self.command = node.data['command']
+ if self.command:
+ self.command[0] = self.command[0].replace('/', os.path.sep)
+ self.command = tools.fix_python_path(self.command)
+ if self.read_only is None and node.data.get('read_only') is not None:
+ self.read_only = node.data['read_only']
+ if (self.relative_cwd is None and
+ node.data.get('relative_cwd') is not None):
+ self.relative_cwd = node.data['relative_cwd']
def get_storage_api(file_or_url, namespace):
return Storage(get_storage_api(file_or_url, namespace))
-def expand_symlinks(indir, relfile):
- """Follows symlinks in |relfile|, but treating symlinks that point outside the
- build tree as if they were ordinary directories/files. Returns the final
- symlink-free target and a list of paths to symlinks encountered in the
- process.
-
- The rule about symlinks outside the build tree is for the benefit of the
- Chromium OS ebuild, which symlinks the output directory to an unrelated path
- in the chroot.
-
- Fails when a directory loop is detected, although in theory we could support
- that case.
- """
- is_directory = relfile.endswith(os.path.sep)
- done = indir
- todo = relfile.strip(os.path.sep)
- symlinks = []
-
- while todo:
- pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
- done, todo)
- if not symlink:
- todo = file_path.fix_native_path_case(done, todo)
- done = os.path.join(done, todo)
- break
- symlink_path = os.path.join(done, pre_symlink, symlink)
- post_symlink = post_symlink.lstrip(os.path.sep)
- # readlink doesn't exist on Windows.
- # pylint: disable=E1101
- target = os.path.normpath(os.path.join(done, pre_symlink))
- symlink_target = os.readlink(symlink_path)
- if os.path.isabs(symlink_target):
- # Absolute path are considered a normal directories. The use case is
- # generally someone who puts the output directory on a separate drive.
- target = symlink_target
- else:
- # The symlink itself could be using the wrong path case.
- target = file_path.fix_native_path_case(target, symlink_target)
-
- if not os.path.exists(target):
- raise MappingError(
- 'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
- target = file_path.get_native_path_case(target)
- if not file_path.path_starts_with(indir, target):
- done = symlink_path
- todo = post_symlink
- continue
- if file_path.path_starts_with(target, symlink_path):
- raise MappingError(
- 'Can\'t map recursive symlink reference %s -> %s' %
- (symlink_path, target))
- logging.info('Found symlink: %s -> %s', symlink_path, target)
- symlinks.append(os.path.relpath(symlink_path, indir))
- # Treat the common prefix of the old and new paths as done, and start
- # scanning again.
- target = target.split(os.path.sep)
- symlink_path = symlink_path.split(os.path.sep)
- prefix_length = 0
- for target_piece, symlink_path_piece in zip(target, symlink_path):
- if target_piece == symlink_path_piece:
- prefix_length += 1
- else:
- break
- done = os.path.sep.join(target[:prefix_length])
- todo = os.path.join(
- os.path.sep.join(target[prefix_length:]), post_symlink)
-
- relfile = os.path.relpath(done, indir)
- relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
- return relfile, symlinks
-
-
-def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
- """Expands a single input. It can result in multiple outputs.
-
- This function is recursive when relfile is a directory.
-
- Note: this code doesn't properly handle recursive symlink like one created
- with:
- ln -s .. foo
- """
- if os.path.isabs(relfile):
- raise MappingError('Can\'t map absolute path %s' % relfile)
-
- infile = file_path.normpath(os.path.join(indir, relfile))
- if not infile.startswith(indir):
- raise MappingError('Can\'t map file %s outside %s' % (infile, indir))
-
- filepath = os.path.join(indir, relfile)
- native_filepath = file_path.get_native_path_case(filepath)
- if filepath != native_filepath:
- # Special case './'.
- if filepath != native_filepath + '.' + os.path.sep:
- # While it'd be nice to enforce path casing on Windows, it's impractical.
- # Also give up enforcing strict path case on OSX. Really, it's that sad.
- # The case where it happens is very specific and hard to reproduce:
- # get_native_path_case(
- # u'Foo.framework/Versions/A/Resources/Something.nib') will return
- # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
- #
- # Note that this is really something deep in OSX because running
- # ls Foo.framework/Versions/A
- # will print out 'Resources', while file_path.get_native_path_case()
- # returns a lower case 'r'.
- #
- # So *something* is happening under the hood resulting in the command 'ls'
- # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree. We
- # have no idea why.
- if sys.platform not in ('darwin', 'win32'):
- raise MappingError(
- 'File path doesn\'t equal native file path\n%s != %s' %
- (filepath, native_filepath))
-
- symlinks = []
- if follow_symlinks:
- relfile, symlinks = expand_symlinks(indir, relfile)
-
- if relfile.endswith(os.path.sep):
- if not os.path.isdir(infile):
- raise MappingError(
- '%s is not a directory but ends with "%s"' % (infile, os.path.sep))
-
- # Special case './'.
- if relfile.startswith('.' + os.path.sep):
- relfile = relfile[2:]
- outfiles = symlinks
- try:
- for filename in os.listdir(infile):
- inner_relfile = os.path.join(relfile, filename)
- if blacklist and blacklist(inner_relfile):
- continue
- if os.path.isdir(os.path.join(indir, inner_relfile)):
- inner_relfile += os.path.sep
- outfiles.extend(
- expand_directory_and_symlink(indir, inner_relfile, blacklist,
- follow_symlinks))
- return outfiles
- except OSError as e:
- raise MappingError(
- 'Unable to iterate over directory %s.\n%s' % (infile, e))
- else:
- # Always add individual files even if they were blacklisted.
- if os.path.isdir(infile):
- raise MappingError(
- 'Input directory %s must have a trailing slash' % infile)
-
- if not os.path.isfile(infile):
- raise MappingError('Input file %s doesn\'t exist' % infile)
-
- return symlinks + [relfile]
-
-
-def process_input(filepath, prevdict, read_only, algo):
- """Processes an input file, a dependency, and return meta data about it.
-
- Behaviors:
- - Retrieves the file mode, file size, file timestamp, file link
- destination if it is a file link and calcultate the SHA-1 of the file's
- content if the path points to a file and not a symlink.
-
- Arguments:
- filepath: File to act on.
- prevdict: the previous dictionary. It is used to retrieve the cached sha-1
- to skip recalculating the hash. Optional.
- read_only: If 1 or 2, the file mode is manipulated. In practice, only save
- one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
- windows, mode is not set since all files are 'executable' by
- default.
- algo: Hashing algorithm used.
-
- Returns:
- The necessary data to create a entry in the 'files' section of an .isolated
- file.
- """
- out = {}
- # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
- # if prevdict.get('T') == True:
- # # The file's content is ignored. Skip the time and hard code mode.
- # out['s'] = 0
- # out['h'] = algo().hexdigest()
- # out['T'] = True
- # return out
-
- # Always check the file stat and check if it is a link. The timestamp is used
- # to know if the file's content/symlink destination should be looked into.
- # E.g. only reuse from prevdict if the timestamp hasn't changed.
- # There is the risk of the file's timestamp being reset to its last value
- # manually while its content changed. We don't protect against that use case.
- try:
- filestats = os.lstat(filepath)
- except OSError:
- # The file is not present.
- raise MappingError('%s is missing' % filepath)
- is_link = stat.S_ISLNK(filestats.st_mode)
-
- if sys.platform != 'win32':
- # Ignore file mode on Windows since it's not really useful there.
- filemode = stat.S_IMODE(filestats.st_mode)
- # Remove write access for group and all access to 'others'.
- filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
- if read_only:
- filemode &= ~stat.S_IWUSR
- if filemode & stat.S_IXUSR:
- filemode |= stat.S_IXGRP
- else:
- filemode &= ~stat.S_IXGRP
- if not is_link:
- out['m'] = filemode
-
- # Used to skip recalculating the hash or link destination. Use the most recent
- # update time.
- # TODO(maruel): Save it in the .state file instead of .isolated so the
- # .isolated file is deterministic.
- out['t'] = int(round(filestats.st_mtime))
-
- if not is_link:
- out['s'] = filestats.st_size
- # If the timestamp wasn't updated and the file size is still the same, carry
- # on the sha-1.
- if (prevdict.get('t') == out['t'] and
- prevdict.get('s') == out['s']):
- # Reuse the previous hash if available.
- out['h'] = prevdict.get('h')
- if not out.get('h'):
- out['h'] = hash_file(filepath, algo)
- else:
- # If the timestamp wasn't updated, carry on the link destination.
- if prevdict.get('t') == out['t']:
- # Reuse the previous link destination if available.
- out['l'] = prevdict.get('l')
- if out.get('l') is None:
- # The link could be in an incorrect path case. In practice, this only
- # happen on OSX on case insensitive HFS.
- # TODO(maruel): It'd be better if it was only done once, in
- # expand_directory_and_symlink(), so it would not be necessary to do again
- # here.
- symlink_value = os.readlink(filepath) # pylint: disable=E1101
- filedir = file_path.get_native_path_case(os.path.dirname(filepath))
- native_dest = file_path.fix_native_path_case(filedir, symlink_value)
- out['l'] = os.path.relpath(native_dest, filedir)
- return out
-
-
-def save_isolated(isolated, data):
- """Writes one or multiple .isolated files.
-
- Note: this reference implementation does not create child .isolated file so it
- always returns an empty list.
-
- Returns the list of child isolated files that are included by |isolated|.
- """
- # Make sure the data is valid .isolated data by 'reloading' it.
- algo = SUPPORTED_ALGOS[data['algo']]
- load_isolated(json.dumps(data), algo)
- tools.write_json(isolated, data, True)
- return []
-
-
def upload_tree(base_url, indir, infiles, namespace):
"""Uploads the given tree to the given url.
return 0
-def load_isolated(content, algo):
- """Verifies the .isolated file is valid and loads this object with the json
- data.
-
- Arguments:
- - content: raw serialized content to load.
- - algo: hashlib algorithm class. Used to confirm the algorithm matches the
- algorithm used on the Isolate Server.
- """
- try:
- data = json.loads(content)
- except ValueError:
- raise ConfigError('Failed to parse: %s...' % content[:100])
-
- if not isinstance(data, dict):
- raise ConfigError('Expected dict, got %r' % data)
-
- # Check 'version' first, since it could modify the parsing after.
- value = data.get('version', '1.0')
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
- try:
- version = tuple(map(int, value.split('.')))
- except ValueError:
- raise ConfigError('Expected valid version, got %r' % value)
-
- expected_version = tuple(map(int, ISOLATED_FILE_VERSION.split('.')))
- # Major version must match.
- if version[0] != expected_version[0]:
- raise ConfigError(
- 'Expected compatible \'%s\' version, got %r' %
- (ISOLATED_FILE_VERSION, value))
-
- if algo is None:
- # TODO(maruel): Remove the default around Jan 2014.
- # Default the algorithm used in the .isolated file itself, falls back to
- # 'sha-1' if unspecified.
- algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
-
- for key, value in data.iteritems():
- if key == 'algo':
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
- if value not in SUPPORTED_ALGOS:
- raise ConfigError(
- 'Expected one of \'%s\', got %r' %
- (', '.join(sorted(SUPPORTED_ALGOS)), value))
- if value != SUPPORTED_ALGOS_REVERSE[algo]:
- raise ConfigError(
- 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
-
- elif key == 'command':
- if not isinstance(value, list):
- raise ConfigError('Expected list, got %r' % value)
- if not value:
- raise ConfigError('Expected non-empty command')
- for subvalue in value:
- if not isinstance(subvalue, basestring):
- raise ConfigError('Expected string, got %r' % subvalue)
-
- elif key == 'files':
- if not isinstance(value, dict):
- raise ConfigError('Expected dict, got %r' % value)
- for subkey, subvalue in value.iteritems():
- if not isinstance(subkey, basestring):
- raise ConfigError('Expected string, got %r' % subkey)
- if not isinstance(subvalue, dict):
- raise ConfigError('Expected dict, got %r' % subvalue)
- for subsubkey, subsubvalue in subvalue.iteritems():
- if subsubkey == 'l':
- if not isinstance(subsubvalue, basestring):
- raise ConfigError('Expected string, got %r' % subsubvalue)
- elif subsubkey == 'm':
- if not isinstance(subsubvalue, int):
- raise ConfigError('Expected int, got %r' % subsubvalue)
- elif subsubkey == 'h':
- if not is_valid_hash(subsubvalue, algo):
- raise ConfigError('Expected sha-1, got %r' % subsubvalue)
- elif subsubkey == 's':
- if not isinstance(subsubvalue, (int, long)):
- raise ConfigError('Expected int or long, got %r' % subsubvalue)
- else:
- raise ConfigError('Unknown subsubkey %s' % subsubkey)
- if bool('h' in subvalue) == bool('l' in subvalue):
- raise ConfigError(
- 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
- subvalue)
- if bool('h' in subvalue) != bool('s' in subvalue):
- raise ConfigError(
- 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
- subvalue)
- if bool('s' in subvalue) == bool('l' in subvalue):
- raise ConfigError(
- 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
- subvalue)
- if bool('l' in subvalue) and bool('m' in subvalue):
- raise ConfigError(
- 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
- subvalue)
-
- elif key == 'includes':
- if not isinstance(value, list):
- raise ConfigError('Expected list, got %r' % value)
- if not value:
- raise ConfigError('Expected non-empty includes list')
- for subvalue in value:
- if not is_valid_hash(subvalue, algo):
- raise ConfigError('Expected sha-1, got %r' % subvalue)
-
- elif key == 'os':
- if version >= (1, 4):
- raise ConfigError('Key \'os\' is not allowed starting version 1.4')
-
- elif key == 'read_only':
- if not value in (0, 1, 2):
- raise ConfigError('Expected 0, 1 or 2, got %r' % value)
-
- elif key == 'relative_cwd':
- if not isinstance(value, basestring):
- raise ConfigError('Expected string, got %r' % value)
-
- elif key == 'version':
- # Already checked above.
- pass
-
- else:
- raise ConfigError('Unknown key %r' % key)
-
- # Automatically fix os.path.sep if necessary. While .isolated files are always
- # in the the native path format, someone could want to download an .isolated
- # tree from another OS.
- wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
- if 'files' in data:
- data['files'] = dict(
- (k.replace(wrong_path_sep, os.path.sep), v)
- for k, v in data['files'].iteritems())
- for v in data['files'].itervalues():
- if 'l' in v:
- v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
- if 'relative_cwd' in data:
- data['relative_cwd'] = data['relative_cwd'].replace(
- wrong_path_sep, os.path.sep)
- return data
-
-
-class IsolatedFile(object):
- """Represents a single parsed .isolated file."""
- def __init__(self, obj_hash, algo):
- """|obj_hash| is really the sha-1 of the file."""
- logging.debug('IsolatedFile(%s)' % obj_hash)
- self.obj_hash = obj_hash
- self.algo = algo
- # Set once all the left-side of the tree is parsed. 'Tree' here means the
- # .isolate and all the .isolated files recursively included by it with
- # 'includes' key. The order of each sha-1 in 'includes', each representing a
- # .isolated file in the hash table, is important, as the later ones are not
- # processed until the firsts are retrieved and read.
- self.can_fetch = False
-
- # Raw data.
- self.data = {}
- # A IsolatedFile instance, one per object in self.includes.
- self.children = []
-
- # Set once the .isolated file is loaded.
- self._is_parsed = False
- # Set once the files are fetched.
- self.files_fetched = False
-
- def load(self, content):
- """Verifies the .isolated file is valid and loads this object with the json
- data.
- """
- logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
- assert not self._is_parsed
- self.data = load_isolated(content, self.algo)
- self.children = [
- IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
- ]
- self._is_parsed = True
-
- def fetch_files(self, fetch_queue, files):
- """Adds files in this .isolated file not present in |files| dictionary.
-
- Preemptively request files.
-
- Note that |files| is modified by this function.
- """
- assert self.can_fetch
- if not self._is_parsed or self.files_fetched:
- return
- logging.debug('fetch_files(%s)' % self.obj_hash)
- for filepath, properties in self.data.get('files', {}).iteritems():
- # Root isolated has priority on the files being mapped. In particular,
- # overriden files must not be fetched.
- if filepath not in files:
- files[filepath] = properties
- if 'h' in properties:
- # Preemptively request files.
- logging.debug('fetching %s' % filepath)
- fetch_queue.add(properties['h'], properties['s'], WorkerPool.MED)
- self.files_fetched = True
-
-
-class Settings(object):
- """Results of a completely parsed .isolated file."""
- def __init__(self):
- self.command = []
- self.files = {}
- self.read_only = None
- self.relative_cwd = None
- # The main .isolated file, a IsolatedFile instance.
- self.root = None
-
- def load(self, fetch_queue, root_isolated_hash, algo):
- """Loads the .isolated and all the included .isolated asynchronously.
-
- It enables support for "included" .isolated files. They are processed in
- strict order but fetched asynchronously from the cache. This is important so
- that a file in an included .isolated file that is overridden by an embedding
- .isolated file is not fetched needlessly. The includes are fetched in one
- pass and the files are fetched as soon as all the ones on the left-side
- of the tree were fetched.
-
- The prioritization is very important here for nested .isolated files.
- 'includes' have the highest priority and the algorithm is optimized for both
- deep and wide trees. A deep one is a long link of .isolated files referenced
- one at a time by one item in 'includes'. A wide one has a large number of
- 'includes' in a single .isolated file. 'left' is defined as an included
- .isolated file earlier in the 'includes' list. So the order of the elements
- in 'includes' is important.
- """
- self.root = IsolatedFile(root_isolated_hash, algo)
-
- # Isolated files being retrieved now: hash -> IsolatedFile instance.
- pending = {}
- # Set of hashes of already retrieved items to refuse recursive includes.
- seen = set()
-
- def retrieve(isolated_file):
- h = isolated_file.obj_hash
- if h in seen:
- raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
- assert h not in pending
- seen.add(h)
- pending[h] = isolated_file
- fetch_queue.add(h, priority=WorkerPool.HIGH)
-
- retrieve(self.root)
-
- while pending:
- item_hash = fetch_queue.wait(pending)
- item = pending.pop(item_hash)
- item.load(fetch_queue.cache.read(item_hash))
- if item_hash == root_isolated_hash:
- # It's the root item.
- item.can_fetch = True
-
- for new_child in item.children:
- retrieve(new_child)
-
- # Traverse the whole tree to see if files can now be fetched.
- self._traverse_tree(fetch_queue, self.root)
-
- def check(n):
- return all(check(x) for x in n.children) and n.files_fetched
- assert check(self.root)
-
- self.relative_cwd = self.relative_cwd or ''
-
- def _traverse_tree(self, fetch_queue, node):
- if node.can_fetch:
- if not node.files_fetched:
- self._update_self(fetch_queue, node)
- will_break = False
- for i in node.children:
- if not i.can_fetch:
- if will_break:
- break
- # Automatically mark the first one as fetcheable.
- i.can_fetch = True
- will_break = True
- self._traverse_tree(fetch_queue, i)
-
- def _update_self(self, fetch_queue, node):
- node.fetch_files(fetch_queue, self.files)
- # Grabs properties.
- if not self.command and node.data.get('command'):
- # Ensure paths are correctly separated on windows.
- self.command = node.data['command']
- if self.command:
- self.command[0] = self.command[0].replace('/', os.path.sep)
- self.command = tools.fix_python_path(self.command)
- if self.read_only is None and node.data.get('read_only') is not None:
- self.read_only = node.data['read_only']
- if (self.relative_cwd is None and
- node.data.get('relative_cwd') is not None):
- self.relative_cwd = node.data['relative_cwd']
-
-
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
"""Aggressively downloads the .isolated file(s), then download all the files.
require_command: Ensure *.isolated specifies a command to run.
Returns:
- Settings object that holds details about loaded *.isolated file.
+ IsolatedBundle object that holds details about loaded *.isolated file.
"""
logging.debug(
'fetch_isolated(%s, %s, %s, %s, %s)',
algo = storage.hash_algo
with cache:
fetch_queue = FetchQueue(storage, cache)
- settings = Settings()
+ bundle = IsolatedBundle()
with tools.Profiler('GetIsolateds'):
# Optionally support local files by manually adding them to cache.
- if not is_valid_hash(isolated_hash, algo):
+ if not isolated_format.is_valid_hash(isolated_hash, algo):
logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
try:
isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
except IOError:
- raise MappingError(
+ raise isolated_format.MappingError(
          '%s doesn\'t seem to be a valid file. Did you intend to pass a '
'valid hash?' % isolated_hash)
# Load all *.isolated and start loading rest of the files.
- settings.load(fetch_queue, isolated_hash, algo)
- if require_command and not settings.command:
+ bundle.fetch(fetch_queue, isolated_hash, algo)
+ if require_command and not bundle.command:
      # TODO(vadimsh): All fetch operations are already enqueued and there's no
# easy way to cancel them.
- raise ConfigError('No command to run')
+ raise isolated_format.IsolatedError('No command to run')
with tools.Profiler('GetRest'):
# Create file system hierarchy.
if not os.path.isdir(outdir):
os.makedirs(outdir)
- create_directories(outdir, settings.files)
- create_symlinks(outdir, settings.files.iteritems())
+ create_directories(outdir, bundle.files)
+ create_symlinks(outdir, bundle.files.iteritems())
# Ensure working directory exists.
- cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
+ cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
if not os.path.isdir(cwd):
os.makedirs(cwd)
# Multimap: digest -> list of pairs (path, props).
remaining = {}
- for filepath, props in settings.files.iteritems():
+ for filepath, props in bundle.files.iteritems():
if 'h' in props:
remaining.setdefault(props['h'], []).append((filepath, props))
# Cache could evict some items we just tried to fetch, it's a fatal error.
if not fetch_queue.verify_all_cached():
- raise MappingError('Cache is too small to hold all requested files')
- return settings
+ raise isolated_format.MappingError(
+ 'Cache is too small to hold all requested files')
+ return bundle
def directory_to_metadata(root, algo, blacklist):
"""Returns the FileItem list and .isolated metadata for a directory."""
root = file_path.get_native_path_case(root)
- paths = expand_directory_and_symlink(
+ paths = isolated_format.expand_directory_and_symlink(
root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
- metadata = dict(
- (relpath, process_input(os.path.join(root, relpath), {}, False, algo))
- for relpath in paths
- )
+ metadata = {
+ relpath: isolated_format.file_to_metadata(
+ os.path.join(root, relpath), {}, False, algo)
+ for relpath in paths
+ }
for v in metadata.itervalues():
v.pop('t')
items = [
handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
os.close(handle)
data = {
- 'algo': SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
+ 'algo':
+ isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
'files': metadata,
- 'version': ISOLATED_FILE_VERSION,
+ 'version': isolated_format.ISOLATED_FILE_VERSION,
}
- save_isolated(isolated, data)
- h = hash_file(isolated, storage.hash_algo)
+ isolated_format.save_isolated(isolated, data)
+ h = isolated_format.hash_file(isolated, storage.hash_algo)
items_to_upload.extend(items)
items_to_upload.append(
FileItem(
results.append((h, f))
elif os.path.isfile(filepath):
- h = hash_file(filepath, storage.hash_algo)
+ h = isolated_format.hash_file(filepath, storage.hash_algo)
items_to_upload.append(
FileItem(
path=filepath,
pending[digest] = dest
storage.async_fetch(
channel,
- WorkerPool.MED,
+ threading_utils.PRIORITY_MED,
digest,
UNKNOWN_FILE_SIZE,
functools.partial(file_write, os.path.join(options.target, dest)))
# Fetching whole isolated tree.
if options.isolated:
- settings = fetch_isolated(
+ bundle = fetch_isolated(
isolated_hash=options.isolated,
storage=storage,
cache=MemoryCache(),
outdir=options.target,
require_command=False)
- rel = os.path.join(options.target, settings.relative_cwd)
+ rel = os.path.join(options.target, bundle.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
- print(' ' + ' '.join(settings.command))
-
- return 0
-
-
-@subcommand.usage('<file1..fileN> or - to read from stdin')
-def CMDhashtable(parser, args):
- """Archives data to a hashtable on the file system.
-
- If a directory is specified, a .isolated file is created the whole directory
- is uploaded. Then this .isolated file can be included in another one to run
- commands.
+ print(' ' + ' '.join(bundle.command))
- The commands output each file that was processed with its content hash. For
- directories, the .isolated generated for the directory is listed as the
- directory entry itself.
- """
- add_outdir_options(parser)
- parser.add_option(
- '--blacklist',
- action='append', default=list(DEFAULT_BLACKLIST),
- help='List of regexp to use as blacklist filter when uploading '
- 'directories')
- options, files = parser.parse_args(args)
- process_outdir_options(parser, options, os.getcwd())
- try:
- # Do not compress files when archiving to the file system.
- archive(options.outdir, 'default', files, options.blacklist)
- except Error as e:
- parser.error(e.args[0])
return 0
parser.error('Path given to --indir must exist.')
-
-def add_outdir_options(parser):
- """Adds --outdir, which is orthogonal to --isolate-server.
-
- Note: On upload, separate commands are used between 'archive' and 'hashtable'.
- On 'download', the same command can download from either an isolate server or
- a file system.
- """
- parser.add_option(
- '-o', '--outdir', metavar='DIR',
- help='Directory used to recreate the tree.')
-
-
-def process_outdir_options(parser, options, cwd):
- if not options.outdir:
- parser.error('--outdir is required.')
- if file_path.is_url(options.outdir):
- parser.error('Can\'t use an URL for --outdir.')
- options.outdir = unicode(options.outdir).replace('/', os.path.sep)
- # outdir doesn't need native path case since tracing is never done from there.
- options.outdir = os.path.abspath(
- os.path.normpath(os.path.join(cwd, options.outdir)))
- # In theory, we'd create the directory outdir right away. Defer doing it in
- # case there's errors in the command line.
-
-
class OptionParserIsolateServer(tools.OptionParserWithLogging):
def __init__(self, **kwargs):
tools.OptionParserWithLogging.__init__(