- add sources.
[platform/framework/web/crosswalk.git] / src / chrome / common / extensions / docs / server2 / mock_file_system.py
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 from file_system import FileSystem, FileNotFoundError
6 from future import Gettable, Future
7 from test_file_system import TestFileSystem
8
9 class MockFileSystem(FileSystem):
10   '''Wraps FileSystems to add a selection of mock behaviour:
11
12   - asserting how often Stat/Read calls are being made to it.
13   - primitive changes/versioning via applying object "diffs", mapping paths to
14     new content (similar to how TestFileSystem works).
15   '''
16   def __init__(self, file_system):
17     self._file_system = file_system
18     # Updates are modelled are stored as TestFileSystems because they've
19     # implemented a bunch of logic to interpret paths into dictionaries.
20     self._updates = []
21     self._read_count = 0
22     self._read_resolve_count = 0
23     self._stat_count = 0
24
25   @staticmethod
26   def Create(file_system, updates):
27     mock_file_system = MockFileSystem(file_system)
28     for update in updates:
29       mock_file_system.Update(update)
30     return mock_file_system
31
32   #
33   # FileSystem implementation.
34   #
35
36   def Read(self, paths, binary=False):
37     '''Reads |paths| from |_file_system|, then applies the most recent update
38     from |_updates|, if any.
39     '''
40     self._read_count += 1
41     future_result = self._file_system.Read(paths, binary=binary)
42     def resolve():
43       self._read_resolve_count += 1
44       result = future_result.Get()
45       for path in result.iterkeys():
46         _, update = self._GetMostRecentUpdate(path)
47         if update is not None:
48           result[path] = update
49       return result
50     return Future(delegate=Gettable(resolve))
51
52   def _GetMostRecentUpdate(self, path):
53     for revision, update in reversed(list(enumerate(self._updates))):
54       try:
55         return (revision + 1, update.ReadSingle(path).Get())
56       except FileNotFoundError:
57         pass
58     return (0, None)
59
60   def Stat(self, path):
61     self._stat_count += 1
62     return self._StatImpl(path)
63
64   def _StatImpl(self, path):
65     result = self._file_system.Stat(path)
66     result.version = self._UpdateStat(result.version, path)
67     child_versions = result.child_versions
68     if child_versions is not None:
69       for child_path in child_versions.iterkeys():
70         child_versions[child_path] = self._UpdateStat(
71             child_versions[child_path],
72             '%s%s' % (path, child_path))
73     return result
74
75   def _UpdateStat(self, version, path):
76     if not path.endswith('/'):
77       return str(int(version) + self._GetMostRecentUpdate(path)[0])
78     # Bleh, it's a directory, need to recursively search all the children.
79     child_paths = self._file_system.ReadSingle(path).Get()
80     if not child_paths:
81       return version
82     return str(max([int(version)] +
83                    [int(self._StatImpl('%s%s' % (path, child_path)).version)
84                     for child_path in child_paths]))
85
86   def GetIdentity(self):
87     return self._file_system.GetIdentity()
88
89   def __str__(self):
90     return repr(self)
91
92   def __repr__(self):
93     return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
94         self._read_count, self._stat_count, len(self._updates))
95
96   #
97   # Testing methods.
98   #
99
100   def GetStatCount(self):
101     return self._stat_count
102
103   def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
104     '''Returns a tuple (success, error). Use in tests like:
105     self.assertTrue(*object_store.CheckAndReset(...))
106     '''
107     errors = []
108     for desc, expected, actual in (
109         ('read_count', read_count, self._read_count),
110         ('read_resolve_count', read_resolve_count, self._read_resolve_count),
111         ('stat_count', stat_count, self._stat_count)):
112       if actual != expected:
113         errors.append('%s: expected %s got %s' % (desc, expected, actual))
114     try:
115       return (len(errors) == 0, ', '.join(errors))
116     finally:
117       self.Reset()
118
119   def Reset(self):
120     self._read_count = 0
121     self._read_resolve_count = 0
122     self._stat_count = 0
123
124   def Update(self, update):
125     self._updates.append(TestFileSystem(update))