Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / chrome / common / extensions / docs / server2 / mock_file_system.py
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import posixpath
6
7 from file_system import FileSystem, FileNotFoundError
8 from future import Gettable, Future
9 from test_file_system import _List, _StatTracker, TestFileSystem
10 from path_util import IsDirectory
11
12
13 class MockFileSystem(FileSystem):
14   '''Wraps FileSystems to add a selection of mock behaviour:
15   - asserting how often Stat/Read calls are being made to it.
16   - primitive changes/versioning via applying object "diffs", mapping paths to
17     new content (similar to how TestFileSystem works).
18   '''
19   def __init__(self, file_system):
20     self._file_system = file_system
21     # Updates are stored as TestFileSystems because it already implements a
22     # bunch of logic to intepret paths into dictionaries.
23     self._updates = []
24     self._stat_tracker = _StatTracker()
25     self._read_count = 0
26     self._read_resolve_count = 0
27     self._stat_count = 0
28
29   @staticmethod
30   def Create(file_system, updates):
31     mock_file_system = MockFileSystem(file_system)
32     for update in updates:
33       mock_file_system.Update(update)
34     return mock_file_system
35
36   #
37   # FileSystem implementation.
38   #
39
40   def Read(self, paths):
41     '''Reads |paths| from |_file_system|, then applies the most recent update
42     from |_updates|, if any.
43     '''
44     self._read_count += 1
45     future_result = self._file_system.Read(paths)
46     def resolve():
47       self._read_resolve_count += 1
48       result = future_result.Get()
49       for path in result.iterkeys():
50         update = self._GetMostRecentUpdate(path)
51         if update is not None:
52           result[path] = update
53       return result
54     return Future(delegate=Gettable(resolve))
55
56   def Refresh(self):
57     return self._file_system.Refresh()
58
59   def _GetMostRecentUpdate(self, path):
60     '''Returns the latest update for the file at |path|, or None if |path|
61     has never been updated.
62     '''
63     for update in reversed(self._updates):
64       try:
65         return update.ReadSingle(path).Get()
66       except FileNotFoundError:
67         pass
68     return None
69
70   def Stat(self, path):
71     self._stat_count += 1
72
73     # This only supports numeric stat values since we need to add to it.  In
74     # reality the logic here could just be to randomly mutate the stat values
75     # every time there's an Update but that's less meaningful for testing.
76     def stradd(a, b):
77       return str(int(a) + b)
78
79     stat = self._file_system.Stat(path)
80     stat.version = stradd(stat.version, self._stat_tracker.GetVersion(path))
81     if stat.child_versions:
82       for child_path, child_version in stat.child_versions.iteritems():
83         stat.child_versions[child_path] = stradd(
84             stat.child_versions[child_path],
85             self._stat_tracker.GetVersion(posixpath.join(path, child_path)))
86
87     return stat
88
89   def GetIdentity(self):
90     return self._file_system.GetIdentity()
91
92   def __str__(self):
93     return repr(self)
94
95   def __repr__(self):
96     return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
97         self._read_count, self._stat_count, len(self._updates))
98
99   #
100   # Testing methods.
101   #
102
103   def GetStatCount(self):
104     return self._stat_count
105
106   def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
107     '''Returns a tuple (success, error). Use in tests like:
108     self.assertTrue(*object_store.CheckAndReset(...))
109     '''
110     errors = []
111     for desc, expected, actual in (
112         ('read_count', read_count, self._read_count),
113         ('read_resolve_count', read_resolve_count, self._read_resolve_count),
114         ('stat_count', stat_count, self._stat_count)):
115       if actual != expected:
116         errors.append('%s: expected %s got %s' % (desc, expected, actual))
117     try:
118       return (len(errors) == 0, ', '.join(errors))
119     finally:
120       self.Reset()
121
122   def Reset(self):
123     self._read_count = 0
124     self._read_resolve_count = 0
125     self._stat_count = 0
126
127   def Update(self, update):
128     self._updates.append(TestFileSystem(update))
129     for path in _List(update).iterkeys():
130       # Any files (not directories) which changed are now at the version
131       # derived from |_updates|.
132       if not IsDirectory(path):
133         self._stat_tracker.SetVersion(path, len(self._updates))