# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import posixpath
+
from file_system import FileSystem, FileNotFoundError
from future import Gettable, Future
-from test_file_system import TestFileSystem
+from test_file_system import _List, _StatTracker, TestFileSystem
+from path_util import IsDirectory
+
class MockFileSystem(FileSystem):
'''Wraps FileSystems to add a selection of mock behaviour:
-
- asserting how often Stat/Read calls are being made to it.
- primitive changes/versioning via applying object "diffs", mapping paths to
new content (similar to how TestFileSystem works).
'''
def __init__(self, file_system):
self._file_system = file_system
- # Updates are modelled are stored as TestFileSystems because they've
- # implemented a bunch of logic to interpret paths into dictionaries.
+ # Updates are stored as TestFileSystems because it already implements a
+ # bunch of logic to intepret paths into dictionaries.
self._updates = []
+ self._stat_tracker = _StatTracker()
self._read_count = 0
self._read_resolve_count = 0
self._stat_count = 0
self._read_resolve_count += 1
result = future_result.Get()
for path in result.iterkeys():
- _, update = self._GetMostRecentUpdate(path)
+ update = self._GetMostRecentUpdate(path)
if update is not None:
result[path] = update
return result
return self._file_system.Refresh()
def _GetMostRecentUpdate(self, path):
- for revision, update in reversed(list(enumerate(self._updates))):
+ '''Returns the latest update for the file at |path|, or None if |path|
+ has never been updated.
+ '''
+ for update in reversed(self._updates):
try:
- return (revision + 1, update.ReadSingle(path).Get())
+ return update.ReadSingle(path).Get()
except FileNotFoundError:
pass
- return (0, None)
+ return None
def Stat(self, path):
self._stat_count += 1
- return self._StatImpl(path)
-
- def _StatImpl(self, path):
- result = self._file_system.Stat(path)
- result.version = self._UpdateStat(result.version, path)
- child_versions = result.child_versions
- if child_versions is not None:
- for child_path in child_versions.iterkeys():
- child_versions[child_path] = self._UpdateStat(
- child_versions[child_path],
- '%s%s' % (path, child_path))
- return result
-
- def _UpdateStat(self, version, path):
- if not path.endswith('/'):
- return str(int(version) + self._GetMostRecentUpdate(path)[0])
- # Bleh, it's a directory, need to recursively search all the children.
- child_paths = self._file_system.ReadSingle(path).Get()
- if not child_paths:
- return version
- return str(max([int(version)] +
- [int(self._StatImpl('%s%s' % (path, child_path)).version)
- for child_path in child_paths]))
+
+ # This only supports numeric stat values since we need to add to it. In
+ # reality the logic here could just be to randomly mutate the stat values
+ # every time there's an Update but that's less meaningful for testing.
+ def stradd(a, b):
+ return str(int(a) + b)
+
+ stat = self._file_system.Stat(path)
+ stat.version = stradd(stat.version, self._stat_tracker.GetVersion(path))
+ if stat.child_versions:
+ for child_path, child_version in stat.child_versions.iteritems():
+ stat.child_versions[child_path] = stradd(
+ stat.child_versions[child_path],
+ self._stat_tracker.GetVersion(posixpath.join(path, child_path)))
+
+ return stat
def GetIdentity(self):
return self._file_system.GetIdentity()
def Update(self, update):
self._updates.append(TestFileSystem(update))
+ for path in _List(update).iterkeys():
+ # Any files (not directories) which changed are now at the version
+ # derived from |_updates|.
+ if not IsDirectory(path):
+ self._stat_tracker.SetVersion(path, len(self._updates))