Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / tools / telemetry / telemetry / unittest / system_stub.py
1 # Copyright 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Provides stubs for os, sys and subprocess for testing
6
7 This test allows one to test code that itself uses os, sys, and subprocess.
8 """
9
10 import os
11 import re
12 import shlex
13 import sys
14
15
16 class Override(object):
17   def __init__(self, base_module, module_list):
18     stubs = {'adb_commands': AdbCommandsModuleStub,
19              'cloud_storage': CloudStorageModuleStub,
20              'open': OpenFunctionStub,
21              'os': OsModuleStub,
22              'perf_control': PerfControlModuleStub,
23              'raw_input': RawInputFunctionStub,
24              'subprocess': SubprocessModuleStub,
25              'sys': SysModuleStub,
26              'thermal_throttle': ThermalThrottleModuleStub,
27              'logging': LoggingStub,
28              'certutils': CertUtilsStub
29     }
30     self.adb_commands = None
31     self.os = None
32     self.subprocess = None
33     self.sys = None
34
35     self._base_module = base_module
36     self._overrides = {}
37
38     for module_name in module_list:
39       self._overrides[module_name] = getattr(base_module, module_name, None)
40       setattr(self, module_name, stubs[module_name]())
41       setattr(base_module, module_name, getattr(self, module_name))
42
43     if self.os and self.sys:
44       self.os.path.sys = self.sys
45
46   def __del__(self):
47     assert not len(self._overrides)
48
49   def Restore(self):
50     for module_name, original_module in self._overrides.iteritems():
51       setattr(self._base_module, module_name, original_module)
52     self._overrides = {}
53
54
55 class AndroidCommands(object):
56
57   def __init__(self):
58     self.can_access_protected_file_contents = False
59
60   def CanAccessProtectedFileContents(self):
61     return self.can_access_protected_file_contents
62
63
64 class AdbDevice(object):
65
66   def __init__(self):
67     self.shell_command_handlers = {}
68     self.mock_content = []
69     self.system_properties = {}
70     if self.system_properties.get('ro.product.cpu.abi') == None:
71       self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
72     self.old_interface = AndroidCommands()
73
74   def RunShellCommand(self, args):
75     if isinstance(args, basestring):
76       args = shlex.split(args)
77     handler = self.shell_command_handlers[args[0]]
78     return handler(args)
79
80   def FileExists(self, _):
81     return False
82
83   def ReadFile(self, device_path, as_root=False):  # pylint: disable=W0613
84     return self.mock_content
85
86   def GetProp(self, property_name):
87     return self.system_properties[property_name]
88
89   def SetProp(self, property_name, property_value):
90     self.system_properties[property_name] = property_value
91
92
93 class AdbCommandsModuleStub(object):
94
95   class AdbCommandsStub(object):
96
97     def __init__(self, module, device):
98       self._module = module
99       self._device = device
100       self.is_root_enabled = True
101       self._adb_device = module.adb_device
102
103     def IsRootEnabled(self):
104       return self.is_root_enabled
105
106     def RestartAdbdOnDevice(self):
107       pass
108
109     def IsUserBuild(self):
110       return False
111
112     def WaitForDevicePm(self):
113       pass
114
115     def device(self):
116       return self._adb_device
117
118     def device_serial(self):
119       return self._device
120
121   def __init__(self):
122     self.attached_devices = []
123     self.adb_device = AdbDevice()
124
125     def AdbCommandsStubConstructor(device=None):
126       return AdbCommandsModuleStub.AdbCommandsStub(self, device)
127     self.AdbCommands = AdbCommandsStubConstructor
128
129   @staticmethod
130   def IsAndroidSupported():
131     return True
132
133   def GetAttachedDevices(self):
134     return self.attached_devices
135
136   def SetupPrebuiltTools(self, _):
137     return True
138
139   def CleanupLeftoverProcesses(self):
140     pass
141
142
143 class CloudStorageModuleStub(object):
144   PUBLIC_BUCKET = 'chromium-telemetry'
145   PARTNER_BUCKET = 'chrome-partner-telemetry'
146   INTERNAL_BUCKET = 'chrome-telemetry'
147   BUCKET_ALIASES = {
148     'public': PUBLIC_BUCKET,
149     'partner': PARTNER_BUCKET,
150     'internal': INTERNAL_BUCKET,
151   }
152
153   # These are used to test for CloudStorage errors.
154   INTERNAL_PERMISSION = 2
155   PARTNER_PERMISSION = 1
156   PUBLIC_PERMISSION = 0
157   # Not logged in.
158   CREDENTIALS_ERROR_PERMISSION = -1
159
160   class NotFoundError(Exception):
161     pass
162
163   class CloudStorageError(Exception):
164     pass
165
166   class PermissionError(CloudStorageError):
167     pass
168
169   class CredentialsError(CloudStorageError):
170     pass
171
172   def __init__(self):
173     self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
174                                  CloudStorageModuleStub.PARTNER_BUCKET:{},
175                                  CloudStorageModuleStub.PUBLIC_BUCKET:{}}
176     self.remote_paths = self.default_remote_paths
177     self.local_file_hashes = {}
178     self.local_hash_files = {}
179     self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
180
181   def SetPermissionLevelForTesting(self, permission_level):
182     self.permission_level = permission_level
183
184   def CheckPermissionLevelForBucket(self, bucket):
185     if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
186       return
187     elif (self.permission_level ==
188           CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
189       raise CloudStorageModuleStub.CredentialsError()
190     elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
191       if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
192         raise CloudStorageModuleStub.PermissionError()
193     elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
194       if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
195         raise CloudStorageModuleStub.PermissionError()
196     else:
197       raise CloudStorageModuleStub.NotFoundError()
198
199   def SetRemotePathsForTesting(self, remote_path_dict=None):
200     if not remote_path_dict:
201       self.remote_paths = self.default_remote_paths
202       return
203     self.remote_paths = remote_path_dict
204
205   def GetRemotePathsForTesting(self):
206     if not self.remote_paths:
207       self.remote_paths = self.default_remote_paths
208     return self.remote_paths
209
210   # Set a dictionary of data files and their "calculated" hashes.
211   def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
212     self.local_file_hashes = calculated_hash_dictionary
213
214   def GetLocalDataFiles(self):
215     return self.local_file_hashes.keys()
216
217   # Set a dictionary of hash files and the hashes they should contain.
218   def SetHashFileContentsForTesting(self, hash_file_dictionary):
219     self.local_hash_files = hash_file_dictionary
220
221   def GetLocalHashFiles(self):
222     return self.local_hash_files.keys()
223
224   def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
225     self.remote_paths[bucket][remote_path] = new_hash
226
227   def List(self, bucket):
228     if not bucket or not bucket in self.remote_paths:
229       bucket_error = ('Incorrect bucket specified, correct buckets:' +
230                       str(self.remote_paths))
231       raise CloudStorageModuleStub.CloudStorageError(bucket_error)
232     CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
233     return list(self.remote_paths[bucket].keys())
234
235   def Exists(self, bucket, remote_path):
236     CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
237     return remote_path in self.remote_paths[bucket]
238
239   def Insert(self, bucket, remote_path, local_path):
240     CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
241     if not local_path in self.GetLocalDataFiles():
242       file_path_error = 'Local file path does not exist'
243       raise CloudStorageModuleStub.CloudStorageError(file_path_error)
244     self.remote_paths[bucket][remote_path] = (
245       CloudStorageModuleStub.CalculateHash(self, local_path))
246
247   def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
248     CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
249     if not remote_path in self.remote_paths[bucket]:
250       if only_if_changed:
251         return False
252       raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
253     remote_hash = self.remote_paths[bucket][remote_path]
254     local_hash = self.local_file_hashes[local_path]
255     if only_if_changed and remote_hash == local_hash:
256       return False
257     self.local_file_hashes[local_path] = remote_hash
258     self.local_hash_files[local_path + '.sha1'] = remote_hash
259     return remote_hash
260
261   def Get(self, bucket, remote_path, local_path):
262     return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
263                                             local_path, False)
264
265   def GetIfChanged(self, bucket, local_path):
266     remote_path = os.path.basename(local_path)
267     return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
268                                             local_path, True)
269
270   def CalculateHash(self, file_path):
271     return self.local_file_hashes[file_path]
272
273   def ReadHash(self, hash_path):
274     return self.local_hash_files[hash_path]
275
276
277 class LoggingStub(object):
278   def __init__(self):
279     self.warnings = []
280     self.errors = []
281
282   def info(self, msg, *args):
283     pass
284
285   def error(self, msg, *args):
286     self.errors.append(msg % args)
287
288   def warning(self, msg, *args):
289     self.warnings.append(msg % args)
290
291   def warn(self, msg, *args):
292     self.warning(msg, *args)
293
294
295 class OpenFunctionStub(object):
296   class FileStub(object):
297     def __init__(self, data):
298       self._data = data
299
300     def __enter__(self):
301       return self
302
303     def __exit__(self, *args):
304       pass
305
306     def read(self, size=None):
307       if size:
308         return self._data[:size]
309       else:
310         return self._data
311
312     def write(self, data):
313       self._data.write(data)
314
315     def close(self):
316       pass
317
318   def __init__(self):
319     self.files = {}
320
321   def __call__(self, name, *args, **kwargs):
322     return OpenFunctionStub.FileStub(self.files[name])
323
324
325 class OsModuleStub(object):
326   class OsEnvironModuleStub(object):
327     def get(self, _):
328       return None
329
330   class OsPathModuleStub(object):
331     def __init__(self, sys_module):
332       self.sys = sys_module
333       self.files = []
334
335     def exists(self, path):
336       return path in self.files
337
338     def isfile(self, path):
339       return path in self.files
340
341     def join(self, *paths):
342       def IsAbsolutePath(path):
343         if self.sys.platform.startswith('win'):
344           return re.match('[a-zA-Z]:\\\\', path)
345         else:
346           return path.startswith('/')
347
348       # Per Python specification, if any component is an absolute path,
349       # discard previous components.
350       for index, path in reversed(list(enumerate(paths))):
351         if IsAbsolutePath(path):
352           paths = paths[index:]
353           break
354
355       if self.sys.platform.startswith('win'):
356         tmp = os.path.join(*paths)
357         return tmp.replace('/', '\\')
358       else:
359         tmp = os.path.join(*paths)
360         return tmp.replace('\\', '/')
361
362     @staticmethod
363     def expanduser(path):
364       return os.path.expanduser(path)
365
366     @staticmethod
367     def dirname(path):
368       return os.path.dirname(path)
369
370     @staticmethod
371     def splitext(path):
372       return os.path.splitext(path)
373
374   X_OK = os.X_OK
375
376   pathsep = os.pathsep
377
378   def __init__(self, sys_module=sys):
379     self.path = OsModuleStub.OsPathModuleStub(sys_module)
380     self.environ = OsModuleStub.OsEnvironModuleStub()
381     self.display = ':0'
382     self.local_app_data = None
383     self.sys_path = None
384     self.program_files = None
385     self.program_files_x86 = None
386     self.devnull = os.devnull
387
388   def access(self, path, _):
389     return path in self.path.files
390
391   def getenv(self, name, value=None):
392     if name == 'DISPLAY':
393       env = self.display
394     elif name == 'LOCALAPPDATA':
395       env = self.local_app_data
396     elif name == 'PATH':
397       env = self.sys_path
398     elif name == 'PROGRAMFILES':
399       env = self.program_files
400     elif name == 'PROGRAMFILES(X86)':
401       env = self.program_files_x86
402     else:
403       raise NotImplementedError('Unsupported getenv')
404     return env if env else value
405
406   def chdir(self, path):
407     pass
408
409
410 class PerfControlModuleStub(object):
411   class PerfControlStub(object):
412     def __init__(self, adb):
413       pass
414
415   def __init__(self):
416     self.PerfControl = PerfControlModuleStub.PerfControlStub
417
418
419 class RawInputFunctionStub(object):
420   def __init__(self):
421     self.input = ''
422
423   def __call__(self, name, *args, **kwargs):
424     return self.input
425
426
427 class SubprocessModuleStub(object):
428   class PopenStub(object):
429     def __init__(self):
430       self.communicate_result = ('', '')
431
432     def __call__(self, args, **kwargs):
433       return self
434
435     def communicate(self):
436       return self.communicate_result
437
438   def __init__(self):
439     self.Popen = SubprocessModuleStub.PopenStub()
440     self.PIPE = None
441
442   def call(self, *args, **kwargs):
443     pass
444
445
446 class SysModuleStub(object):
447   def __init__(self):
448     self.platform = ''
449
450
451 class ThermalThrottleModuleStub(object):
452   class ThermalThrottleStub(object):
453     def __init__(self, adb):
454       pass
455
456   def __init__(self):
457     self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
458
459 class CertUtilsStub(object):
460   openssl_import_error = None
461
462   @staticmethod
463   def write_dummy_ca_cert(_ca_cert_str, _key_str, _cert_path):
464     raise Exception("write_dummy_ca_cert exception")
465
466   @staticmethod
467   def generate_dummy_ca_cert():
468     return '-', '-'