from directory_zipper import DirectoryZipper
from docs_server_utils import ToUnicode
from file_system import FileNotFoundError
-from future import Future
+from future import All, Future
from path_canonicalizer import PathCanonicalizer
from path_util import AssertIsValid, IsDirectory, Join, ToDirectory
from special_paths import SITE_VERIFICATION_FILE
return self._path_canonicalizer.Canonicalize(path)
def GetContentAndType(self, path):
- '''Returns the ContentAndType of the file at |path|.
+ '''Returns a Future to the ContentAndType of the file at |path|.
+ '''
+ AssertIsValid(path)
+ base, ext = posixpath.splitext(path)
+ if self._directory_zipper and ext == '.zip':
+ return (self._directory_zipper.Zip(ToDirectory(base))
+ .Then(lambda zipped: ContentAndType(zipped,
+ 'application/zip',
+ None)))
+ return self._FindFileForPath(path).Then(self._content_cache.GetFromFile)
+
+ def GetVersion(self, path):
+ '''Returns a Future to the version of the file at |path|.
'''
AssertIsValid(path)
base, ext = posixpath.splitext(path)
-
- # Check for a zip file first, if zip is enabled.
if self._directory_zipper and ext == '.zip':
- zip_future = self._directory_zipper.Zip(ToDirectory(base))
- return Future(callback=
- lambda: ContentAndType(zip_future.Get(), 'application/zip', None))
-
- # If there is no file extension, look for a file with one of the default
- # extensions. If one cannot be found, check if the path is a directory.
- # If it is, then check for an index file with one of the default
- # extensions.
- if not ext:
- new_path = self._AddExt(path)
- # Add a trailing / to check if it is a directory and not a file with
- # no extension.
- if new_path is None and self.file_system.Exists(ToDirectory(path)).Get():
- new_path = self._AddExt(Join(path, 'index'))
- # If an index file wasn't found in this directly then we're never going
- # to find a file.
- if new_path is None:
- return FileNotFoundError.RaiseInFuture('"%s" is a directory' % path)
- if new_path is not None:
- path = new_path
-
- return self._content_cache.GetFromFile(path)
-
- def _AddExt(self, path):
- '''Tries to append each of the default file extensions to path and returns
- the first one that is an existing file.
+ stat_future = self.file_system.StatAsync(ToDirectory(base))
+ else:
+ stat_future = self._FindFileForPath(path).Then(self.file_system.StatAsync)
+ return stat_future.Then(lambda stat: stat.version)
+
+ def _FindFileForPath(self, path):
+ '''Finds the real file backing |path|. This may require looking for the
+ correct file extension, or looking for an 'index' file if it's a directory.
+ Returns None if no path is found.
'''
- for default_ext in self._default_extensions:
- if self.file_system.Exists(path + default_ext).Get():
- return path + default_ext
- return None
+ AssertIsValid(path)
+ _, ext = posixpath.splitext(path)
+
+ if ext:
+ # There was already an extension, trust that it's a path. Elsewhere
+ # up the stack this will be caught if it's not.
+ return Future(value=path)
+
+ def find_file_with_name(name):
+ '''Tries to find a file in the file system called |name| with one of the
+ default extensions of this content provider.
+ If none is found, returns None.
+ '''
+ paths = [name + ext for ext in self._default_extensions]
+ def get_first_path_which_exists(existence):
+ for exists, path in zip(existence, paths):
+ if exists:
+ return path
+ return None
+ return (All(self.file_system.Exists(path) for path in paths)
+ .Then(get_first_path_which_exists))
+
+ def find_index_file():
+ '''Tries to find an index file in |path|, if |path| is a directory.
+ If not, or if there is no index file, returns None.
+ '''
+ def get_index_if_directory_exists(directory_exists):
+ if not directory_exists:
+ return None
+ return find_file_with_name(Join(path, 'index'))
+ return (self.file_system.Exists(ToDirectory(path))
+ .Then(get_index_if_directory_exists))
+
+ # Try to find a file with the right name. If not, and it's a directory,
+ # look for an index file in that directory. If nothing at all is found,
+ # return the original |path| - its nonexistence will be caught up the stack.
+ return (find_file_with_name(path)
+ .Then(lambda found: found or find_index_file())
+ .Then(lambda found: found or path))
def Cron(self):
- futures = [('<path_canonicalizer>', # semi-arbitrary string since there is
- # no path associated with this Future.
- self._path_canonicalizer.Cron())]
+ futures = [self._path_canonicalizer.Cron()]
for root, _, files in self.file_system.Walk(''):
for f in files:
- futures.append((Join(root, f),
- self.GetContentAndType(Join(root, f))))
+ futures.append(self.GetContentAndType(Join(root, f)))
# Also cache the extension-less version of the file if needed.
base, ext = posixpath.splitext(f)
if f != SITE_VERIFICATION_FILE and ext in self._default_extensions:
- futures.append((Join(root, base),
- self.GetContentAndType(Join(root, base))))
+ futures.append(self.GetContentAndType(Join(root, base)))
# TODO(kalman): Cache .zip files for each directory (if supported).
- def resolve():
- for label, future in futures:
- try: future.Get()
- except: logging.error('%s: %s' % (label, traceback.format_exc()))
- return Future(callback=resolve)
+ return All(futures, except_pass=Exception, except_pass_log=True)
def __repr__(self):
return 'ContentProvider of <%s>' % repr(self.file_system)