1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
7 from file_system import FileSystem, FileNotFoundError
8 from future import Gettable, Future
9 from test_file_system import _List, _StatTracker, TestFileSystem
10 from path_util import IsDirectory
13 class MockFileSystem(FileSystem):
14 '''Wraps FileSystems to add a selection of mock behaviour:
15 - asserting how often Stat/Read calls are being made to it.
16 - primitive changes/versioning via applying object "diffs", mapping paths to
17 new content (similar to how TestFileSystem works).
19 def __init__(self, file_system):
20 self._file_system = file_system
21 # Updates are stored as TestFileSystems because it already implements a
22 # bunch of logic to intepret paths into dictionaries.
24 self._stat_tracker = _StatTracker()
26 self._read_resolve_count = 0
30 def Create(file_system, updates):
31 mock_file_system = MockFileSystem(file_system)
32 for update in updates:
33 mock_file_system.Update(update)
34 return mock_file_system
37 # FileSystem implementation.
40 def Read(self, paths):
41 '''Reads |paths| from |_file_system|, then applies the most recent update
42 from |_updates|, if any.
45 future_result = self._file_system.Read(paths)
47 self._read_resolve_count += 1
48 result = future_result.Get()
49 for path in result.iterkeys():
50 update = self._GetMostRecentUpdate(path)
51 if update is not None:
54 return Future(delegate=Gettable(resolve))
57 return self._file_system.Refresh()
59 def _GetMostRecentUpdate(self, path):
60 '''Returns the latest update for the file at |path|, or None if |path|
61 has never been updated.
63 for update in reversed(self._updates):
65 return update.ReadSingle(path).Get()
66 except FileNotFoundError:
73 # This only supports numeric stat values since we need to add to it. In
74 # reality the logic here could just be to randomly mutate the stat values
75 # every time there's an Update but that's less meaningful for testing.
77 return str(int(a) + b)
79 stat = self._file_system.Stat(path)
80 stat.version = stradd(stat.version, self._stat_tracker.GetVersion(path))
81 if stat.child_versions:
82 for child_path, child_version in stat.child_versions.iteritems():
83 stat.child_versions[child_path] = stradd(
84 stat.child_versions[child_path],
85 self._stat_tracker.GetVersion(posixpath.join(path, child_path)))
89 def GetIdentity(self):
90 return self._file_system.GetIdentity()
96 return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
97 self._read_count, self._stat_count, len(self._updates))
103 def GetStatCount(self):
104 return self._stat_count
106 def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
107 '''Returns a tuple (success, error). Use in tests like:
108 self.assertTrue(*object_store.CheckAndReset(...))
111 for desc, expected, actual in (
112 ('read_count', read_count, self._read_count),
113 ('read_resolve_count', read_resolve_count, self._read_resolve_count),
114 ('stat_count', stat_count, self._stat_count)):
115 if actual != expected:
116 errors.append('%s: expected %s got %s' % (desc, expected, actual))
118 return (len(errors) == 0, ', '.join(errors))
124 self._read_resolve_count = 0
127 def Update(self, update):
128 self._updates.append(TestFileSystem(update))
129 for path in _List(update).iterkeys():
130 # Any files (not directories) which changed are now at the version
131 # derived from |_updates|.
132 if not IsDirectory(path):
133 self._stat_tracker.SetVersion(path, len(self._updates))