1 # Copyright 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Provides stubs for os, sys and subprocess for testing
7 This test allows one to test code that itself uses os, sys, and subprocess.
16 class Override(object):
17 def __init__(self, base_module, module_list):
18 stubs = {'adb_commands': AdbCommandsModuleStub,
19 'cloud_storage': CloudStorageModuleStub,
20 'open': OpenFunctionStub,
22 'perf_control': PerfControlModuleStub,
23 'raw_input': RawInputFunctionStub,
24 'subprocess': SubprocessModuleStub,
26 'thermal_throttle': ThermalThrottleModuleStub,
27 'logging': LoggingStub,
28 'certutils': CertUtilsStub
30 self.adb_commands = None
32 self.subprocess = None
35 self._base_module = base_module
38 for module_name in module_list:
39 self._overrides[module_name] = getattr(base_module, module_name, None)
40 setattr(self, module_name, stubs[module_name]())
41 setattr(base_module, module_name, getattr(self, module_name))
43 if self.os and self.sys:
44 self.os.path.sys = self.sys
47 assert not len(self._overrides)
50 for module_name, original_module in self._overrides.iteritems():
51 setattr(self._base_module, module_name, original_module)
55 class AndroidCommands(object):
58 self.can_access_protected_file_contents = False
60 def CanAccessProtectedFileContents(self):
61 return self.can_access_protected_file_contents
64 class AdbDevice(object):
67 self.shell_command_handlers = {}
68 self.mock_content = []
69 self.system_properties = {}
70 if self.system_properties.get('ro.product.cpu.abi') == None:
71 self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
72 self.old_interface = AndroidCommands()
74 def RunShellCommand(self, args):
75 if isinstance(args, basestring):
76 args = shlex.split(args)
77 handler = self.shell_command_handlers[args[0]]
80 def FileExists(self, _):
83 def ReadFile(self, device_path, as_root=False): # pylint: disable=W0613
84 return self.mock_content
86 def GetProp(self, property_name):
87 return self.system_properties[property_name]
89 def SetProp(self, property_name, property_value):
90 self.system_properties[property_name] = property_value
93 class AdbCommandsModuleStub(object):
95 class AdbCommandsStub(object):
97 def __init__(self, module, device):
100 self.is_root_enabled = True
101 self._adb_device = module.adb_device
103 def IsRootEnabled(self):
104 return self.is_root_enabled
106 def RestartAdbdOnDevice(self):
109 def IsUserBuild(self):
112 def WaitForDevicePm(self):
116 return self._adb_device
118 def device_serial(self):
122 self.attached_devices = []
123 self.adb_device = AdbDevice()
125 def AdbCommandsStubConstructor(device=None):
126 return AdbCommandsModuleStub.AdbCommandsStub(self, device)
127 self.AdbCommands = AdbCommandsStubConstructor
130 def IsAndroidSupported():
133 def GetAttachedDevices(self):
134 return self.attached_devices
136 def SetupPrebuiltTools(self, _):
139 def CleanupLeftoverProcesses(self):
143 class CloudStorageModuleStub(object):
144 PUBLIC_BUCKET = 'chromium-telemetry'
145 PARTNER_BUCKET = 'chrome-partner-telemetry'
146 INTERNAL_BUCKET = 'chrome-telemetry'
148 'public': PUBLIC_BUCKET,
149 'partner': PARTNER_BUCKET,
150 'internal': INTERNAL_BUCKET,
153 # These are used to test for CloudStorage errors.
154 INTERNAL_PERMISSION = 2
155 PARTNER_PERMISSION = 1
156 PUBLIC_PERMISSION = 0
158 CREDENTIALS_ERROR_PERMISSION = -1
160 class NotFoundError(Exception):
163 class CloudStorageError(Exception):
166 class PermissionError(CloudStorageError):
169 class CredentialsError(CloudStorageError):
173 self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
174 CloudStorageModuleStub.PARTNER_BUCKET:{},
175 CloudStorageModuleStub.PUBLIC_BUCKET:{}}
176 self.remote_paths = self.default_remote_paths
177 self.local_file_hashes = {}
178 self.local_hash_files = {}
179 self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
181 def SetPermissionLevelForTesting(self, permission_level):
182 self.permission_level = permission_level
184 def CheckPermissionLevelForBucket(self, bucket):
185 if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
187 elif (self.permission_level ==
188 CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
189 raise CloudStorageModuleStub.CredentialsError()
190 elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
191 if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
192 raise CloudStorageModuleStub.PermissionError()
193 elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
194 if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
195 raise CloudStorageModuleStub.PermissionError()
197 raise CloudStorageModuleStub.NotFoundError()
199 def SetRemotePathsForTesting(self, remote_path_dict=None):
200 if not remote_path_dict:
201 self.remote_paths = self.default_remote_paths
203 self.remote_paths = remote_path_dict
205 def GetRemotePathsForTesting(self):
206 if not self.remote_paths:
207 self.remote_paths = self.default_remote_paths
208 return self.remote_paths
210 # Set a dictionary of data files and their "calculated" hashes.
211 def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
212 self.local_file_hashes = calculated_hash_dictionary
214 def GetLocalDataFiles(self):
215 return self.local_file_hashes.keys()
217 # Set a dictionary of hash files and the hashes they should contain.
218 def SetHashFileContentsForTesting(self, hash_file_dictionary):
219 self.local_hash_files = hash_file_dictionary
221 def GetLocalHashFiles(self):
222 return self.local_hash_files.keys()
224 def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
225 self.remote_paths[bucket][remote_path] = new_hash
227 def List(self, bucket):
228 if not bucket or not bucket in self.remote_paths:
229 bucket_error = ('Incorrect bucket specified, correct buckets:' +
230 str(self.remote_paths))
231 raise CloudStorageModuleStub.CloudStorageError(bucket_error)
232 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
233 return list(self.remote_paths[bucket].keys())
235 def Exists(self, bucket, remote_path):
236 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
237 return remote_path in self.remote_paths[bucket]
239 def Insert(self, bucket, remote_path, local_path):
240 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
241 if not local_path in self.GetLocalDataFiles():
242 file_path_error = 'Local file path does not exist'
243 raise CloudStorageModuleStub.CloudStorageError(file_path_error)
244 self.remote_paths[bucket][remote_path] = (
245 CloudStorageModuleStub.CalculateHash(self, local_path))
247 def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
248 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
249 if not remote_path in self.remote_paths[bucket]:
252 raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
253 remote_hash = self.remote_paths[bucket][remote_path]
254 local_hash = self.local_file_hashes[local_path]
255 if only_if_changed and remote_hash == local_hash:
257 self.local_file_hashes[local_path] = remote_hash
258 self.local_hash_files[local_path + '.sha1'] = remote_hash
261 def Get(self, bucket, remote_path, local_path):
262 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
265 def GetIfChanged(self, bucket, local_path):
266 remote_path = os.path.basename(local_path)
267 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
270 def CalculateHash(self, file_path):
271 return self.local_file_hashes[file_path]
273 def ReadHash(self, hash_path):
274 return self.local_hash_files[hash_path]
277 class LoggingStub(object):
282 def info(self, msg, *args):
285 def error(self, msg, *args):
286 self.errors.append(msg % args)
288 def warning(self, msg, *args):
289 self.warnings.append(msg % args)
291 def warn(self, msg, *args):
292 self.warning(msg, *args)
295 class OpenFunctionStub(object):
296 class FileStub(object):
297 def __init__(self, data):
303 def __exit__(self, *args):
306 def read(self, size=None):
308 return self._data[:size]
312 def write(self, data):
313 self._data.write(data)
321 def __call__(self, name, *args, **kwargs):
322 return OpenFunctionStub.FileStub(self.files[name])
325 class OsModuleStub(object):
326 class OsEnvironModuleStub(object):
330 class OsPathModuleStub(object):
331 def __init__(self, sys_module):
332 self.sys = sys_module
335 def exists(self, path):
336 return path in self.files
338 def isfile(self, path):
339 return path in self.files
341 def join(self, *paths):
342 def IsAbsolutePath(path):
343 if self.sys.platform.startswith('win'):
344 return re.match('[a-zA-Z]:\\\\', path)
346 return path.startswith('/')
348 # Per Python specification, if any component is an absolute path,
349 # discard previous components.
350 for index, path in reversed(list(enumerate(paths))):
351 if IsAbsolutePath(path):
352 paths = paths[index:]
355 if self.sys.platform.startswith('win'):
356 tmp = os.path.join(*paths)
357 return tmp.replace('/', '\\')
359 tmp = os.path.join(*paths)
360 return tmp.replace('\\', '/')
363 def expanduser(path):
364 return os.path.expanduser(path)
368 return os.path.dirname(path)
372 return os.path.splitext(path)
378 def __init__(self, sys_module=sys):
379 self.path = OsModuleStub.OsPathModuleStub(sys_module)
380 self.environ = OsModuleStub.OsEnvironModuleStub()
382 self.local_app_data = None
384 self.program_files = None
385 self.program_files_x86 = None
386 self.devnull = os.devnull
388 def access(self, path, _):
389 return path in self.path.files
391 def getenv(self, name, value=None):
392 if name == 'DISPLAY':
394 elif name == 'LOCALAPPDATA':
395 env = self.local_app_data
398 elif name == 'PROGRAMFILES':
399 env = self.program_files
400 elif name == 'PROGRAMFILES(X86)':
401 env = self.program_files_x86
403 raise NotImplementedError('Unsupported getenv')
404 return env if env else value
406 def chdir(self, path):
410 class PerfControlModuleStub(object):
411 class PerfControlStub(object):
412 def __init__(self, adb):
416 self.PerfControl = PerfControlModuleStub.PerfControlStub
419 class RawInputFunctionStub(object):
423 def __call__(self, name, *args, **kwargs):
427 class SubprocessModuleStub(object):
428 class PopenStub(object):
430 self.communicate_result = ('', '')
432 def __call__(self, args, **kwargs):
435 def communicate(self):
436 return self.communicate_result
439 self.Popen = SubprocessModuleStub.PopenStub()
442 def call(self, *args, **kwargs):
446 class SysModuleStub(object):
451 class ThermalThrottleModuleStub(object):
452 class ThermalThrottleStub(object):
453 def __init__(self, adb):
457 self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
459 class CertUtilsStub(object):
460 openssl_import_error = None
463 def write_dummy_ca_cert(_ca_cert_str, _key_str, _cert_path):
464 raise Exception("write_dummy_ca_cert exception")
467 def generate_dummy_ca_cert():