Spawns multiple internal threads. Thread safe, but not fork safe.
"""
- def __init__(self, storage_api, use_zip, hash_algo):
- self.use_zip = use_zip
- self.hash_algo = hash_algo
+ def __init__(self, storage_api):
self._storage_api = storage_api
+ self._use_zip = is_namespace_with_compression(storage_api.namespace)
+ self._hash_algo = get_hash_algo(storage_api.namespace)
self._cpu_thread_pool = None
self._net_thread_pool = None
@property
+ def hash_algo(self):
+ """Hashing algorithm used to name files in storage based on their content.
+
+ Defined by |namespace|. See also 'get_hash_algo'.
+ """
+ return self._hash_algo
+
+ @property
+ def location(self):
+ """Location of the backing store that this class is using.
+
+ Exact meaning depends on the storage_api type. For IsolateServer it is
+ a URL of the isolate server, for FileSystem it is a path in the file system.
+ """
+ return self._storage_api.location
+
+ @property
+ def namespace(self):
+ """Isolate namespace used by this storage.
+
+ Indirectly defines the hashing scheme and compression method used.
+ """
+ return self._storage_api.namespace
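# Usage sketch for the new read-only properties (the server URL and namespace
# values here are hypothetical):
#
#   storage = get_storage('https://isolate.example.com', 'default-gzip')
#   storage.location    # -> 'https://isolate.example.com'
#   storage.namespace   # -> 'default-gzip'
#   storage.hash_algo   # -> hash class from get_hash_algo(namespace)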
+
+ @property
def cpu_thread_pool(self):
"""ThreadPool for CPU-bound tasks like zipping."""
if self._cpu_thread_pool is None:
# Ensure all digests are calculated.
for item in items:
- item.prepare(self.hash_algo)
+ item.prepare(self._hash_algo)
# For each digest keep only first Item that matches it. All other items
# are just indistinguishable copies from the point of view of isolate
Returns:
A URL or None if the underlying protocol doesn't support this.
"""
- item.prepare(self.hash_algo)
+ item.prepare(self._hash_algo)
return self._storage_api.get_fetch_url(item.digest)
def async_push(self, channel, item, push_state):
priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED
def push(content):
- """Pushes an item and returns it to |channel|."""
- item.prepare(self.hash_algo)
+ """Pushes an Item and returns it to |channel|."""
+ item.prepare(self._hash_algo)
self._storage_api.push(item, push_state, content)
return item
# If zipping is not required, just start a push task.
- if not self.use_zip:
+ if not self._use_zip:
self.net_thread_pool.add_task_with_channel(
channel, priority, push, item.content())
return
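# Caller-side sketch (assuming a TaskChannel-style |channel| as used with
# add_task_with_channel above; names are illustrative): the pushed item is
# returned to the caller via the channel once the task completes:
#
#   storage.async_push(channel, item, push_state)
#   pushed = channel.pull()  # blocks until the push task finishes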
try:
# Prepare reading pipeline.
stream = self._storage_api.fetch(digest)
- if self.use_zip:
+ if self._use_zip:
stream = zip_decompress(stream, DISK_FILE_CHUNK)
# Run |stream| through verifier that will assert its size.
verifier = FetchStreamVerifier(stream, size)
# Ensure all digests are calculated.
for item in items:
- item.prepare(self.hash_algo)
+ item.prepare(self._hash_algo)
# Enqueue all requests.
for batch in batch_items_for_check(items):
preferred since it implements compression and upload optimizations.
"""
+ @property
+ def location(self):
+ """Location of the backing store that this class is using.
+
+ Exact meaning depends on the type. For IsolateServer it is a URL of the
+ isolate server, for FileSystem it is a path in the file system.
+ """
+ raise NotImplementedError()
+
+ @property
+ def namespace(self):
+ """Isolate namespace used by this storage.
+
+ Indirectly defines the hashing scheme and compression method used.
+ """
+ raise NotImplementedError()
+
def get_fetch_url(self, digest):
"""Returns an URL that can be used to fetch an item with given digest.
def __init__(self, base_url, namespace):
super(IsolateServer, self).__init__()
assert base_url.startswith('http'), base_url
- self.base_url = base_url.rstrip('/')
- self.namespace = namespace
+ self._base_url = base_url.rstrip('/')
+ self._namespace = namespace
self._lock = threading.Lock()
self._server_caps = None
request_body = json.dumps(
self._generate_handshake_request(), separators=(',', ':'))
response = net.url_read(
- url=self.base_url + '/content-gs/handshake',
+ url=self._base_url + '/content-gs/handshake',
data=request_body,
content_type='application/json',
method='POST')
exc.__class__.__name__, exc))
return self._server_caps
+ @property
+ def location(self):
+ return self._base_url
+
+ @property
+ def namespace(self):
+ return self._namespace
+
def get_fetch_url(self, digest):
assert isinstance(digest, basestring)
return '%s/content-gs/retrieve/%s/%s' % (
- self.base_url, self.namespace, digest)
+ self._base_url, self._namespace, digest)
def fetch(self, digest, offset=0):
source_url = self.get_fetch_url(digest)
logging.debug('download_file(%s, %d)', source_url, offset)
- # Because the app engine DB is only eventually consistent, retry 404 errors
- # because the file might just not be visible yet (even though it has been
- # uploaded).
connection = net.url_open(
source_url,
- retry_404=True,
read_timeout=DOWNLOAD_READ_TIMEOUT,
headers={'Range': 'bytes=%d-' % offset} if offset else None)
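# Offset sketch: a caller that already received part of the file can resume
# by re-fetching only the tail (the offset value is illustrative):
#
#   stream = server.fetch(digest, offset=bytes_already_received)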
]
query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
- self.base_url,
- self.namespace,
+ self._base_url,
+ self._namespace,
urllib.quote(self._server_capabilities['access_token']))
response_body = net.url_read(
url=query_url,
# call 'contains' before 'push'. Naively passing None in 'push' will not work.
_DUMMY_PUSH_STATE = object()
- def __init__(self, base_path):
+ def __init__(self, base_path, namespace):
super(FileSystem, self).__init__()
- self.base_path = base_path
+ self._base_path = base_path
+ self._namespace = namespace
+
+ @property
+ def location(self):
+ return self._base_path
+
+ @property
+ def namespace(self):
+ return self._namespace
def get_fetch_url(self, digest):
return None
def fetch(self, digest, offset=0):
assert isinstance(digest, basestring)
- return file_read(os.path.join(self.base_path, digest), offset=offset)
+ return file_read(os.path.join(self._base_path, digest), offset=offset)
def push(self, item, push_state, content=None):
assert isinstance(item, Item)
if isinstance(content, basestring):
assert not isinstance(content, unicode), 'Unicode string is not allowed'
content = [content]
- file_write(os.path.join(self.base_path, item.digest), content)
+ file_write(os.path.join(self._base_path, item.digest), content)
def contains(self, items):
assert all(i.digest is not None and i.size is not None for i in items)
return dict(
(item, self._DUMMY_PUSH_STATE) for item in items
- if not os.path.exists(os.path.join(self.base_path, item.digest))
+ if not os.path.exists(os.path.join(self._base_path, item.digest))
)
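# Interaction sketch (|fs| and |items| are illustrative): |contains| returns
# a {Item: push_state} dict for the items missing from |base_path|; only
# those need to be pushed afterwards:
#
#   missing = fs.contains(items)
#   for item, push_state in missing.iteritems():
#     fs.push(item, push_state, item.content())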
class MemoryCache(LocalCache):
"""LocalCache implementation that stores everything in memory."""
- def __init__(self):
+ def __init__(self, file_mode_mask=0500):
+ """Args:
+ file_mode_mask: bit mask to AND the file mode with. The default value
+ makes all mapped files read-only.
+ """
super(MemoryCache, self).__init__()
+ self._file_mode_mask = file_mode_mask
# Let's not assume dict is thread safe.
self._lock = threading.Lock()
self._contents = {}
"""Since data is kept in memory, there is no filenode to hardlink."""
file_write(dest, [self.read(digest)])
if file_mode is not None:
- # Ignores all other bits.
- os.chmod(dest, file_mode & 0500)
+ os.chmod(dest, file_mode & self._file_mode_mask)
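# Illustrative alternative (the mask value is hypothetical): tests that need
# the write or execute bits preserved can relax the mask:
#
#   cache = MemoryCache(file_mode_mask=0777)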
def get_hash_algo(_namespace):
if file_path.is_url(file_or_url):
return IsolateServer(file_or_url, namespace)
else:
- return FileSystem(file_or_url)
+ return FileSystem(file_or_url, namespace)
def get_storage(file_or_url, namespace):
Returns:
Instance of Storage.
"""
- return Storage(
- get_storage_api(file_or_url, namespace),
- is_namespace_with_compression(namespace),
- get_hash_algo(namespace))
+ return Storage(get_storage_api(file_or_url, namespace))
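# Equivalent manual construction (URL and namespace are hypothetical): the
# compression and hashing settings are now inferred inside Storage.__init__
# from storage_api.namespace rather than passed by the caller:
#
#   api = get_storage_api('https://isolate.example.com', 'default-gzip')
#   storage = Storage(api)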
def expand_symlinks(indir, relfile):
self.relative_cwd = node.data['relative_cwd']
-def fetch_isolated(
- isolated_hash, storage, cache, algo, outdir, require_command):
+def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
"""Aggressively downloads the .isolated file(s), then download all the files.
Arguments:
isolated_hash: hash of the root *.isolated file.
storage: Storage class that communicates with isolate storage.
cache: LocalCache class that knows how to store and map files locally.
- algo: hash algorithm to use.
outdir: Output directory to map file tree to.
require_command: Ensure *.isolated specifies a command to run.
Returns:
Settings object that holds details about loaded *.isolated file.
"""
+ # Hash algorithm to use, defined by the namespace |storage| is using.
+ algo = storage.hash_algo
with cache:
fetch_queue = FetchQueue(storage, cache)
settings = Settings()
return items, metadata
-def archive_files_to_storage(storage, algo, files, blacklist):
+def archive_files_to_storage(storage, files, blacklist):
"""Stores every entries and returns the relevant data.
Arguments:
storage: a Storage object that communicates with the remote object store.
- algo: an hashlib class to hash content. Usually hashlib.sha1.
files: list of file paths to upload. If a directory is specified, a
.isolated file is created and its hash is returned.
blacklist: function that returns True if a file should be omitted.
filepath = os.path.abspath(f)
if os.path.isdir(filepath):
# Uploading a whole directory.
- items, metadata = directory_to_metadata(filepath, algo, blacklist)
+ items, metadata = directory_to_metadata(
+ filepath, storage.hash_algo, blacklist)
# Create the .isolated file.
if not tempdir:
handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
os.close(handle)
data = {
- 'algo': SUPPORTED_ALGOS_REVERSE[algo],
+ 'algo': SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
'files': metadata,
'version': ISOLATED_FILE_VERSION,
}
save_isolated(isolated, data)
- h = hash_file(isolated, algo)
+ h = hash_file(isolated, storage.hash_algo)
items_to_upload.extend(items)
items_to_upload.append(
FileItem(
results.append((h, f))
elif os.path.isfile(filepath):
- h = hash_file(filepath, algo)
+ h = hash_file(filepath, storage.hash_algo)
items_to_upload.append(
FileItem(
path=filepath,
raise Error('Nothing to upload')
files = [f.decode('utf-8') for f in files]
- algo = get_hash_algo(namespace)
blacklist = tools.gen_blacklist(blacklist)
with get_storage(out, namespace) as storage:
- results = archive_files_to_storage(storage, algo, files, blacklist)
+ results = archive_files_to_storage(storage, files, blacklist)
print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
isolated_hash=options.isolated,
storage=storage,
cache=MemoryCache(),
- algo=get_hash_algo(options.namespace),
outdir=options.target,
require_command=False)
rel = os.path.join(options.target, settings.relative_cwd)