1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 from copy import deepcopy
7 from file_system import FileSystem, StatInfo, FileNotFoundError
8 from future import Future
11 def _GetAsyncFetchCallback(unpatched_files_future,
15 def patch_directory_listing(path, original_listing):
16 added, deleted, modified = (
17 patched_file_system._GetDirectoryListingFromPatch(path))
18 if original_listing is None:
20 raise FileNotFoundError('Directory %s not found in the patch.' % path)
22 return list((set(original_listing) | set(added)) - set(deleted))
25 files = unpatched_files_future.Get()
26 files.update(patched_files_future.Get())
28 dict((path, patch_directory_listing(path, dirs_value[path]))
29 for path in dirs_value))
35 class PatchedFileSystem(FileSystem):
36 ''' Class to fetch resources with a patch applied.
38 def __init__(self, base_file_system, patcher):
39 self._base_file_system = base_file_system
40 self._patcher = patcher
42 def Read(self, paths, skip_not_found=False):
44 added, deleted, modified = self._patcher.GetPatchedFiles()
45 if set(paths) & set(deleted):
46 def raise_file_not_found():
47 raise FileNotFoundError('Files are removed from the patch.')
48 return Future(callback=raise_file_not_found)
49 patched_files |= (set(added) | set(modified))
50 dir_paths = set(path for path in paths if path.endswith('/'))
51 file_paths = set(paths) - dir_paths
52 patched_paths = file_paths & patched_files
53 unpatched_paths = file_paths - patched_files
54 return Future(callback=_GetAsyncFetchCallback(
55 self._base_file_system.Read(unpatched_paths,
56 skip_not_found=skip_not_found),
57 self._patcher.Apply(patched_paths, self._base_file_system),
58 self._TryReadDirectory(dir_paths),
62 return self._base_file_system.Refresh()
64 ''' Given the list of patched files, it's not possible to determine whether
65 a directory to read exists in self._base_file_system. So try reading each one
66 and handle FileNotFoundError.
68 def _TryReadDirectory(self, paths):
71 assert path.endswith('/')
73 value[path] = self._base_file_system.ReadSingle(path).Get()
74 except FileNotFoundError:
78 def _GetDirectoryListingFromPatch(self, path):
79 assert path.endswith('/')
80 def _FindChildrenInPath(files, path):
83 if f.startswith(path):
84 child_path = f[len(path):]
86 child_name = child_path[0:child_path.find('/') + 1]
88 child_name = child_path
89 result.append(child_name)
92 added, deleted, modified = (tuple(
93 _FindChildrenInPath(files, path)
94 for files in self._patcher.GetPatchedFiles()))
96 # A patch applies to files only. It cannot delete directories.
97 deleted_files = [child for child in deleted if not child.endswith('/')]
98 # However, these directories are actually modified because their children
100 modified += [child for child in deleted if child.endswith('/')]
102 return (added, deleted_files, modified)
104 def _PatchStat(self, stat_info, version, added, deleted, modified):
105 assert len(added) + len(deleted) + len(modified) > 0
106 assert stat_info.child_versions is not None
108 # Deep copy before patching to make sure it doesn't interfere with values
110 stat_info = deepcopy(stat_info)
112 stat_info.version = version
113 for child in added + modified:
114 stat_info.child_versions[child] = version
115 for child in deleted:
116 if stat_info.child_versions.get(child):
117 del stat_info.child_versions[child]
121 def Stat(self, path):
122 version = self._patcher.GetVersion()
123 assert version is not None
124 version = 'patched_%s' % version
126 directory, filename = path.rsplit('/', 1)
127 added, deleted, modified = self._GetDirectoryListingFromPatch(
131 # There are new files added. It's possible (if |directory| is new) that
132 # self._base_file_system.Stat will throw an exception.
134 stat_info = self._PatchStat(
135 self._base_file_system.Stat(directory + '/'),
140 except FileNotFoundError:
141 stat_info = StatInfo(
143 dict((child, version) for child in added + modified))
144 elif len(deleted) + len(modified) > 0:
145 # No files were added.
146 stat_info = self._PatchStat(self._base_file_system.Stat(directory + '/'),
152 # No changes are made in this directory.
153 return self._base_file_system.Stat(path)
155 if stat_info.child_versions is not None:
157 if filename in stat_info.child_versions:
158 stat_info = StatInfo(stat_info.child_versions[filename])
160 raise FileNotFoundError('%s was not in child versions' % filename)
163 def GetIdentity(self):
164 return '%s(%s,%s)' % (self.__class__.__name__,
165 self._base_file_system.GetIdentity(),
166 self._patcher.GetIdentity())