from file_system import FileSystem, StatInfo, FileNotFoundError
from future import Future
+from path_util import IsDirectory
+from third_party.json_schema_compiler.memoize import memoize
class CachingFileSystem(FileSystem):
return self._file_system.Refresh()
def Stat(self, path):
+ '''Synchronous form of StatAsync: returns the result of resolving
+ StatAsync(path) immediately with Get().
+ '''
+ return self.StatAsync(path).Get()
+
+ def StatAsync(self, path):
'''Stats the directory given, or if a file is given, stats the file's parent
directory to get info about the file.
'''
if dir_path and not dir_path.endswith('/'):
dir_path += '/'
- # ... and we only ever need to cache the dir stat, too.
- dir_stat = self._stat_object_store.Get(dir_path).Get()
- if dir_stat is None:
- dir_stat = self._file_system.Stat(dir_path)
- assert dir_stat is not None # should raise a FileNotFoundError
- self._stat_object_store.Set(dir_path, dir_stat)
-
- if path == dir_path:
- stat_info = dir_stat
- else:
+ def make_stat_info(dir_stat):
+ '''Converts a dir stat into the correct resulting StatInfo; if the Stat
+ was for a file, the StatInfo should just contain that file.
+ '''
+ if path == dir_path:
+ return dir_stat
+ # Was a file stat. Extract that file.
file_version = dir_stat.child_versions.get(file_path)
if file_version is None:
raise FileNotFoundError('No stat found for %s in %s (found %s)' %
(path, dir_path, dir_stat.child_versions))
- stat_info = StatInfo(file_version)
+ return StatInfo(file_version)
- return stat_info
+ dir_stat = self._stat_object_store.Get(dir_path).Get()
+ if dir_stat is not None:
+ return Future(value=make_stat_info(dir_stat))
+
+ dir_stat_future = self._MemoizedStatAsyncFromFileSystem(dir_path)
+ def resolve():
+ dir_stat = dir_stat_future.Get()
+ assert dir_stat is not None # should have raised a FileNotFoundError
+ # We only ever need to cache the dir stat.
+ self._stat_object_store.Set(dir_path, dir_stat)
+ return make_stat_info(dir_stat)
+ return Future(callback=resolve)
- def Read(self, paths):
+ @memoize
+ def _MemoizedStatAsyncFromFileSystem(self, dir_path):
+ '''This is a simple wrapper to memoize Futures to directory stats, since
+ StatAsync makes heavy use of it. Only cache directories so that the
+ memoized cache doesn't blow up.
+
+ |dir_path| must satisfy IsDirectory (asserted). Returns whatever
+ self._file_system.StatAsync(dir_path) returns, without resolving it.
+
+ NOTE(review): presumably @memoize keeps the returned Future for the life of
+ its cache, including one that will raise FileNotFoundError when resolved -
+ confirm that is acceptable, or that the cache is reset on Refresh.
+ '''
+ assert IsDirectory(dir_path)
+ return self._file_system.StatAsync(dir_path)
+
+ def Read(self, paths, skip_not_found=False):
'''Reads a list of files. If a file is in memcache and it is not out of
date, it is returned. Otherwise, the file is retrieved from the file system.
+
+ If |skip_not_found| is True, a FileNotFoundError raised while statting a
+ path is swallowed (the stat resolves to None) and |skip_not_found| is
+ forwarded to the underlying file system's Read. Otherwise the error
+ propagates from wherever the stat Future is resolved.
+
+ Returns a Future to a dict mapping each path to its data.
'''
- read_values = self._read_object_store.GetMulti(paths).Get()
- stat_values = self._stat_object_store.GetMulti(paths).Get()
- results = {} # maps path to read value
- uncached = {} # maps path to stat value
+ cached_read_values = self._read_object_store.GetMulti(paths).Get()
+ cached_stat_values = self._stat_object_store.GetMulti(paths).Get()
+
+ # Populate a map of paths to Futures to their stat. They may have already
+ # been cached in which case their Future will already have been constructed
+ # with a value.
+ stat_futures = {}
+
+ def swallow_file_not_found_error(future):
+ def resolve():
+ try: return future.Get()
+ except FileNotFoundError: return None
+ return Future(callback=resolve)
+
for path in paths:
- stat_value = stat_values.get(path)
+ stat_value = cached_stat_values.get(path)
if stat_value is None:
- # TODO(cduvall): do a concurrent Stat with the missing stat values.
- try:
- stat_value = self.Stat(path)
- except:
- return Future(exc_info=sys.exc_info())
- read_value = read_values.get(path)
- if read_value is None:
- uncached[path] = stat_value
- continue
- read_data, read_version = read_value
- if stat_value.version != read_version:
- uncached[path] = stat_value
- continue
- results[path] = read_data
-
- if not uncached:
- return Future(value=results)
-
- uncached_read_futures = self._file_system.Read(uncached.keys())
+ stat_future = self.StatAsync(path)
+ if skip_not_found:
+ stat_future = swallow_file_not_found_error(stat_future)
+ else:
+ stat_future = Future(value=stat_value)
+ stat_futures[path] = stat_future
+
+ # Filter only the cached data which is fresh by comparing to the latest
+ # stat. The cached read data includes the cached version. Remove it for
+ # the result returned to callers.
+ #
+ # With |skip_not_found| a stat may have resolved to None (the file is gone).
+ # Such a path is never fresh, so it falls through to the Read below, which
+ # is told to skip it, rather than raising AttributeError on None.version.
+ fresh_data = dict(
+ (path, data) for path, (data, version) in cached_read_values.iteritems()
+ if stat_futures[path].Get() is not None and
+ stat_futures[path].Get().version == version)
+
+ if len(fresh_data) == len(paths):
+ # Everything was cached and up-to-date.
+ return Future(value=fresh_data)
+
+ # Read in the values that were uncached or old.
+ read_futures = self._file_system.Read(
+ set(paths) - set(fresh_data.iterkeys()),
+ skip_not_found=skip_not_found)
def resolve():
- new_results = uncached_read_futures.Get()
- # Update the cached data in the object store. This is a path -> (read,
- # version) mapping.
- self._read_object_store.SetMulti(dict(
- (path, (new_result, uncached[path].version))
- for path, new_result in new_results.iteritems()))
- new_results.update(results)
+ new_results = read_futures.Get()
+ # Update the cache. This is a path -> (data, version) mapping. Entries
+ # whose stat was swallowed (None) have no version to key on, so they are
+ # returned to the caller but not cached.
+ self._read_object_store.SetMulti(
+ dict((path, (new_result, stat_futures[path].Get().version))
+ for path, new_result in new_results.iteritems()
+ if stat_futures[path].Get() is not None))
+ new_results.update(fresh_data)
return new_results
return Future(callback=resolve)