1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 from file_system import FileSystem, FileNotFoundError
6 from future import Gettable, Future
7 from test_file_system import TestFileSystem
class MockFileSystem(FileSystem):
  '''Wraps FileSystems to add a selection of mock behaviour:

  - asserting how often Stat/Read calls are being made to it.
  - primitive changes/versioning via applying object "diffs", mapping paths to
    new content (similar to how TestFileSystem works).
  '''

  def __init__(self, file_system):
    '''Wraps |file_system|, starting with zeroed counters and no updates.
    '''
    self._file_system = file_system
    # Updates are stored as TestFileSystems because they've implemented a bunch
    # of logic to interpret paths into dictionaries.
    self._updates = []
    self._read_count = 0
    self._read_resolve_count = 0
    self._stat_count = 0

  @staticmethod
  def Create(file_system, updates):
    '''Returns a MockFileSystem wrapping |file_system| with every entry of
    |updates| applied, in order, via Update().
    '''
    mock_file_system = MockFileSystem(file_system)
    for update in updates:
      mock_file_system.Update(update)
    return mock_file_system

  #
  # FileSystem implementation.
  #

  def Read(self, paths, binary=False):
    '''Reads |paths| from |_file_system|, then applies the most recent update
    from |_updates|, if any.

    |_read_count| is bumped when Read is called; |_read_resolve_count| is
    bumped each time the |resolve| callback handed to the returned Future runs.
    '''
    self._read_count += 1
    future_result = self._file_system.Read(paths, binary=binary)

    def resolve():
      self._read_resolve_count += 1
      result = future_result.Get()
      # Only values of existing keys are reassigned, so iterating |result|
      # directly is safe. Assumes |result| is a plain dict - TODO confirm.
      for path in result:
        _, update = self._GetMostRecentUpdate(path)
        if update is not None:
          # NOTE(review): restored from the docstring's "applies the most
          # recent update"; confirm against upstream.
          result[path] = update
      return result

    return Future(delegate=Gettable(resolve))

  def _GetMostRecentUpdate(self, path):
    '''Returns a tuple (revision, content) for the newest update containing
    |path|, where revision is the 1-based position of that update in
    |_updates|. Returns (0, None) if no update contains |path|.
    '''
    for revision, update in reversed(list(enumerate(self._updates))):
      try:
        return (revision + 1, update.ReadSingle(path).Get())
      except FileNotFoundError:
        # This update doesn't touch |path|; fall back to an older one.
        continue
    # NOTE(review): callers add element 0 to an int and compare element 1 with
    # None, hence this "no update" value.
    return (0, None)

  def Stat(self, path):
    '''Counts the call in |_stat_count|, then returns the update-adjusted stat
    for |path|.
    '''
    self._stat_count += 1
    return self._StatImpl(path)

  def _StatImpl(self, path):
    '''Stats |path| on |_file_system| and rewrites the version of the result,
    and of each of its child versions, through _UpdateStat. Does not touch
    |_stat_count|, so it can be called recursively.
    '''
    result = self._file_system.Stat(path)
    result.version = self._UpdateStat(result.version, path)
    child_versions = result.child_versions
    if child_versions is not None:
      # Assumes |child_versions| is a plain dict keyed by child path - TODO
      # confirm. Only existing keys are reassigned while iterating.
      for child_path in child_versions:
        child_versions[child_path] = self._UpdateStat(
            child_versions[child_path],
            '%s%s' % (path, child_path))
    return result

  def _UpdateStat(self, version, path):
    '''Returns |version| adjusted for the updates that affect |path|.

    For a file this is the string form of |version| plus the revision of the
    most recent update containing it. For a directory (a path ending in '/')
    it is the largest adjusted version found among itself and its children.
    '''
    if not path.endswith('/'):
      return str(int(version) + self._GetMostRecentUpdate(path)[0])
    # Bleh, it's a directory, need to recursively search all the children.
    child_paths = self._file_system.ReadSingle(path).Get()
    if not child_paths:
      # NOTE(review): guard restored by inference; nothing to recurse into, so
      # the version is passed through unchanged.
      return version
    return str(max([int(version)] +
                   [int(self._StatImpl('%s%s' % (path, child_path)).version)
                    for child_path in child_paths]))

  def GetIdentity(self):
    '''Returns the identity of the wrapped file system.
    '''
    return self._file_system.GetIdentity()

  def __repr__(self):
    return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
        self._read_count, self._stat_count, len(self._updates))

  #
  # Testing methods.
  #

  def GetStatCount(self):
    '''Returns the number of Stat calls counted since the last reset.
    '''
    return self._stat_count

  def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*object_store.CheckAndReset(...))

    |success| is True when every counter equals its expected value; |error|
    is a comma-separated description of each mismatch. The counters are
    zeroed before returning, whether or not the check passed.
    '''
    errors = []
    for desc, expected, actual in (
        ('read_count', read_count, self._read_count),
        ('read_resolve_count', read_resolve_count, self._read_resolve_count),
        ('stat_count', stat_count, self._stat_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    # Build the outcome before resetting so the reset always happens and can
    # never sit unreachable after the return.
    outcome = (len(errors) == 0, ', '.join(errors))
    self.Reset()
    return outcome

  def Reset(self):
    '''Zeroes the read, read-resolve and stat counters. Updates are kept.
    '''
    # NOTE(review): method name restored by inference; confirm against
    # upstream.
    self._read_count = 0
    self._read_resolve_count = 0
    self._stat_count = 0

  def Update(self, update):
    '''Records |update| as the newest revision, wrapped in a TestFileSystem.
    '''
    self._updates.append(TestFileSystem(update))