or files, which yield a set of output file.
"""
+import collections
import hashlib
import logging
import os
import pynacl.file_tools
import pynacl.gsd_storage
import pynacl.hashing_tools
+import pynacl.log_tools
import pynacl.working_directory
import command
import substituter
+# Pairs a cached directory storage item (dir_item) with the URL of the
+# log file stored alongside it (log_url). log_url may be None when no log
+# is available for the item.
+CloudStorageItem = collections.namedtuple('CloudStorageItem',
+ ['dir_item', 'log_url'])
+
+
class UserError(Exception):
pass
should be used when possible.
cache_results: Flag that indicates if successful computations should be
written to the cache.
- print_url: Function that accepts an URL for printing the build result,
- or None.
+ print_url: Function that accepts a CloudStorageItem for printing URL
+ results, or None if no printing is needed.
extra_paths: Extra substitution paths that can be used by commands.
"""
self._storage = storage
)
self._use_cached_results = use_cached_results
self._cache_results = cache_results
- self._cached_dir_items = {}
+ self._cached_cloud_items = {}
self._print_url = print_url
self._system_summary = system_summary
self._path_hash_cache = {}
self._extra_paths = extra_paths
def KeyForOutput(self, package, output_hash):
- """Compute the key to store a give output in the data-store.
+ """Compute the key to store a given output in the data-store.
Args:
package: Package name.
"""
return 'computed/%s.txt' % build_signature
- def WriteOutputFromHash(self, package, out_hash, output):
+ def KeyForLog(self, package, output_hash):
+ """Compute the key to store a given log file in the data-store.
+
+ The key has the form 'log/<package>_<output_hash>.log'.
+
+ Args:
+ package: Package name.
+ output_hash: Stable hash of the package output.
+ Returns:
+ Key under which this instance of the package log should be
+ stored/retrieved.
+ """
+ return 'log/%s_%s.log' % (package, output_hash)
+
+ def GetLogFile(self, work_dir, package):
+ """Returns the local log file path for a given package.
+
+ Only the path is computed here; the file itself is neither created nor
+ checked for existence.
+
+ Args:
+ work_dir: The work directory for the package.
+ package: The package name.
+ Returns:
+ Path to the local log file within the work directory, i.e.
+ '<package>.log' joined onto work_dir.
+ """
+ return os.path.join(work_dir, '%s.log' % package)
+
+ def WriteOutputFromHash(self, work_dir, package, out_hash, output):
"""Write output from the cache.
Args:
+ work_dir: Working directory path.
package: Package name (for tgz name).
out_hash: Hash of desired output.
output: Output path.
Returns:
- URL from which output was obtained if successful, or None if not.
+ CloudStorageItem on success, None if not.
"""
key = self.KeyForOutput(package, out_hash)
dir_item = self._directory_storage.GetDirectory(key, output)
logging.warning('Object does not match expected hash, '
'has hashing method changed?')
return None
- return dir_item
- def _ProcessCachedDir(self, package, dir_item):
+ log_key = self.KeyForLog(package, out_hash)
+ log_file = self.GetLogFile(work_dir, package)
+ pynacl.file_tools.RemoveFile(log_file)
+ log_url = self._storage.GetFile(log_key, log_file)
+
+ return CloudStorageItem(dir_item, log_url)
+
+ def _ProcessCloudItem(self, package, cloud_item):
"""Processes cached directory storage items.
Args:
package: Package name for the cached directory item.
- dir_item: DirectoryStorageItem returned from directory_storage.
+ cloud_item: CloudStorageItem representing a memoized item in the cloud.
"""
# Store the cached URL as a tuple for book keeping.
- self._cached_dir_items[package] = dir_item
+ self._cached_cloud_items[package] = cloud_item
# If a print URL function has been specified, print the URL now.
if self._print_url is not None:
- self._print_url(dir_item.url)
+ self._print_url(cloud_item)
- def WriteResultToCache(self, package, build_signature, output):
+ def WriteResultToCache(self, work_dir, package, build_signature, output):
"""Cache a computed result by key.
Also prints URLs when appropriate.
Args:
+ work_dir: Working directory for the package builder.
package: Package name (for tgz name).
build_signature: The input hash of the computation.
output: A path containing the output of the computation.
output_key = self.KeyForOutput(package, out_hash)
# Try to get an existing copy in a temporary directory.
wd = pynacl.working_directory.TemporaryWorkingDirectory()
- with wd as work_dir:
- temp_output = os.path.join(work_dir, 'out')
+ with wd as temp_dir:
+ temp_output = os.path.join(temp_dir, 'out')
dir_item = self._directory_storage.GetDirectory(output_key, temp_output)
+
+ log_key = self.KeyForLog(package, out_hash)
+ log_file = self.GetLogFile(work_dir, package)
+ log_url = None
+
if dir_item is None:
# Isn't present. Cache the computed result instead.
dir_item = self._directory_storage.PutDirectory(output, output_key)
+
+ if os.path.isfile(log_file):
+ log_url = self._storage.PutFile(log_file, log_key)
+
logging.info('Computed fresh result and cached it.')
else:
# Cached version is present. Replace the current output with that.
if self._use_cached_results:
pynacl.file_tools.RemoveDirectoryIfPresent(output)
shutil.move(temp_output, output)
- logging.info(
- 'Recomputed result matches cached value, '
- 'using cached value instead.')
+
+ pynacl.file_tools.RemoveFile(log_file)
+ log_url = self._storage.GetFile(log_key, log_file)
+
+ logging.info('Recomputed result matches cached value, '
+ 'using cached value instead.')
+ else:
+ log_key_exists = self._storage.Exists(log_key)
+ if log_key_exists:
+ log_url = log_key_exists
+
# Upload an entry mapping from computation input to output hash.
self._storage.PutData(
out_hash, self.KeyForBuildSignature(build_signature))
- self._ProcessCachedDir(package, dir_item)
+
+ cloud_item = CloudStorageItem(dir_item, log_url)
+ self._ProcessCloudItem(package, cloud_item)
except pynacl.gsd_storage.GSDStorageError:
logging.info('Failed to cache result.')
raise
- def ReadMemoizedResultFromCache(self, package, build_signature, output):
+ def ReadMemoizedResultFromCache(self, work_dir, package,
+ build_signature, output):
"""Read a cached result (if it exists) from the cache.
Also prints URLs when appropriate.
Args:
+ work_dir: Working directory for the build.
package: Package name (for tgz name).
build_signature: Build signature of the computation.
output: Output path.
out_hash = self._storage.GetData(
self.KeyForBuildSignature(build_signature))
if out_hash is not None:
- dir_item = self.WriteOutputFromHash(package, out_hash, output)
- if dir_item is not None:
+ cloud_item = self.WriteOutputFromHash(work_dir, package,
+ out_hash, output)
+ if cloud_item is not None:
logging.info('Retrieved cached result.')
- self._ProcessCachedDir(package, dir_item)
+ self._ProcessCloudItem(package, cloud_item)
return True
return False
- def GetCachedDirItems(self):
- """Returns the complete list of all cached directory items for this run."""
- return self._cached_dir_items.values()
+ def GetCachedCloudItems(self):
+ """Returns the complete list of all cached cloud items for this run.
+
+ Returns:
+ The values of the internal package -> cloud item mapping.
+ """
+ return self._cached_cloud_items.values()
- def GetCachedDirItemForPackage(self, package):
- """Returns cached directory item for package or None if not processed."""
- return self._cached_dir_items.get(package, None)
+ def GetCachedCloudItemForPackage(self, package):
+ """Returns cached cloud item for package or None if not processed.
+
+ Args:
+ package: Package name to look up.
+ Returns:
+ The cloud item recorded for the package, or None if there is none.
+ """
+ return self._cached_cloud_items.get(package, None)
def Run(self, package, inputs, output, commands, cmd_options=None,
working_dir=None, memoize=True, signature_file=None, subdir=None):
signature_file.flush()
# We're done if it's in the cache.
- if (memoize and
- self.ReadMemoizedResultFromCache(package, build_signature, output)):
+ if (memoize and self.ReadMemoizedResultFromCache(work_dir, package,
+ build_signature,
+ output)):
return
if subdir:
commands = [command for command in commands
if command.CheckRunCond(cmd_options)]
+ # Create a logger that will save the log for each command.
+ # This logger will process any messages and then pass the results
+ # up to the base logger.
+ base_logger = pynacl.log_tools.GetConsoleLogger()
+ cmd_logger = base_logger.getChild('OnceCmdLogger')
+ cmd_logger.setLevel(logging.DEBUG)
+
+ log_file = self.GetLogFile(work_dir, package)
+ file_log_handler = logging.FileHandler(log_file, 'wb')
+ file_log_handler.setLevel(logging.DEBUG)
+ file_log_handler.setFormatter(
+ logging.Formatter(fmt='[%(levelname)s - %(asctime)s] %(message)s'))
+ cmd_logger.addHandler(file_log_handler)
+
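+ # Log some helpful host information. Propagation is switched off around
+ # these calls so the lines reach only this logger's own handlers (the
+ # log file) and are not passed up to the base logger.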
+ cmd_logger.propagate = False
+ cmd_logger.debug('Hostname: %s', platform.node())
+ cmd_logger.debug('Machine: %s', platform.machine())
+ cmd_logger.debug('Platform: %s', sys.platform)
+ cmd_logger.propagate = True
+
for command in commands:
paths = inputs.copy()
paths.update(self._extra_paths)
paths['output'] = subdir if subdir else output
nonpath_subst['build_signature'] = build_signature
subst = substituter.Substituter(work_dir, paths, nonpath_subst)
- command.Invoke(subst)
+ command.Invoke(cmd_logger, subst)
+
+ # Uninstall the file log handler
+ cmd_logger.removeHandler(file_log_handler)
+ file_log_handler.close()
- # Confirm that we aren't hitting something we've cached.
- for path in self._path_hash_cache:
- if not os.path.relpath(output, path).startswith(os.pardir + os.sep):
- raise UserError(
- 'Package %s outputs to a directory already used as an input: %s' %
- (package, path))
+ # Confirm that we aren't hitting something we've cached.
+ for path in self._path_hash_cache:
+ if not os.path.relpath(output, path).startswith(os.pardir + os.sep):
+ raise UserError(
+ 'Package %s outputs to a directory already used as an input: %s' %
+ (package, path))
- if memoize:
- self.WriteResultToCache(package, build_signature, output)
+ if memoize:
+ self.WriteResultToCache(work_dir, package, build_signature, output)
def SystemSummary(self):
"""Gather a string describing intrinsic properties of the current machine.