# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import logging
import mimetypes
import posixpath
+import traceback
from compiled_file_system import SingleFile
from directory_zipper import DirectoryZipper
from docs_server_utils import ToUnicode
from file_system import FileNotFoundError
-from future import Gettable, Future
+from future import All, Future
from path_canonicalizer import PathCanonicalizer
-from path_util import AssertIsValid, Join, ToDirectory
+from path_util import AssertIsValid, IsDirectory, Join, ToDirectory
from special_paths import SITE_VERIFICATION_FILE
from third_party.handlebar import Handlebar
from third_party.markdown import markdown
'''Return value from ContentProvider.GetContentAndType.
'''
- def __init__(self, content, content_type):
+ def __init__(self, content, content_type, version):
self.content = content
self.content_type = content_type
+ self.version = version
class ContentProvider(object):
content = ToUnicode(text)
else:
content = text
- return ContentAndType(content, mimetype)
+ return ContentAndType(content,
+ mimetype,
+ self.file_system.Stat(path).version)
def GetCanonicalPath(self, path):
'''Gets the canonical location of |path|. This class is tolerant of
return self._path_canonicalizer.Canonicalize(path)
def GetContentAndType(self, path):
- '''Returns the ContentAndType of the file at |path|.
+ '''Returns a Future to the ContentAndType of the file at |path|.
+ '''
+ AssertIsValid(path)
+ base, ext = posixpath.splitext(path)
+ if self._directory_zipper and ext == '.zip':
+ return (self._directory_zipper.Zip(ToDirectory(base))
+ .Then(lambda zipped: ContentAndType(zipped,
+ 'application/zip',
+ None)))
+ return self._FindFileForPath(path).Then(self._content_cache.GetFromFile)
+
+ def GetVersion(self, path):
+ '''Returns a Future to the version of the file at |path|.
'''
AssertIsValid(path)
base, ext = posixpath.splitext(path)
-
- # Check for a zip file first, if zip is enabled.
if self._directory_zipper and ext == '.zip':
- zip_future = self._directory_zipper.Zip(ToDirectory(base))
- return Future(delegate=Gettable(
- lambda: ContentAndType(zip_future.Get(), 'application/zip')))
-
- # If there is no file extension, look for a file with one of the default
- # extensions.
- #
- # Note that it would make sense to guard this on Exists(path), since a file
- # without an extension may actually exist, but it's such an uncommon case
- # it hardly seems worth the potential performance hit.
- if not ext:
- for default_ext in self._default_extensions:
- if self.file_system.Exists(path + default_ext).Get():
- path += default_ext
- break
-
- return self._content_cache.GetFromFile(path)
+ stat_future = self.file_system.StatAsync(ToDirectory(base))
+ else:
+ stat_future = self._FindFileForPath(path).Then(self.file_system.StatAsync)
+ return stat_future.Then(lambda stat: stat.version)
+
+ def _FindFileForPath(self, path):
+ '''Finds the real file backing |path|. This may require looking for the
+ correct file extension, or looking for an 'index' file if it's a directory.
+ Returns None if no path is found.
+ '''
+ AssertIsValid(path)
+ _, ext = posixpath.splitext(path)
+
+ if ext:
+ # There was already an extension, trust that it's a path. Elsewhere
+ # up the stack this will be caught if it's not.
+ return Future(value=path)
+
+ def find_file_with_name(name):
+ '''Tries to find a file in the file system called |name| with one of the
+ default extensions of this content provider.
+ If none is found, returns None.
+ '''
+ paths = [name + ext for ext in self._default_extensions]
+ def get_first_path_which_exists(existence):
+ for exists, path in zip(existence, paths):
+ if exists:
+ return path
+ return None
+ return (All(self.file_system.Exists(path) for path in paths)
+ .Then(get_first_path_which_exists))
+
+ def find_index_file():
+ '''Tries to find an index file in |path|, if |path| is a directory.
+ If not, or if there is no index file, returns None.
+ '''
+ def get_index_if_directory_exists(directory_exists):
+ if not directory_exists:
+ return None
+ return find_file_with_name(Join(path, 'index'))
+ return (self.file_system.Exists(ToDirectory(path))
+ .Then(get_index_if_directory_exists))
+
+ # Try to find a file with the right name. If not, and it's a directory,
+ # look for an index file in that directory. If nothing at all is found,
+ # return the original |path| - its nonexistence will be caught up the stack.
+ return (find_file_with_name(path)
+ .Then(lambda found: found or find_index_file())
+ .Then(lambda found: found or path))
def Cron(self):
futures = [self._path_canonicalizer.Cron()]
if f != SITE_VERIFICATION_FILE and ext in self._default_extensions:
futures.append(self.GetContentAndType(Join(root, base)))
# TODO(kalman): Cache .zip files for each directory (if supported).
- return Future(delegate=Gettable(lambda: [f.Get() for f in futures]))
+ return All(futures, except_pass=Exception, except_pass_log=True)
def __repr__(self):
return 'ContentProvider of <%s>' % repr(self.file_system)