'subprocess': SubprocessModuleStub,
'sys': SysModuleStub,
'thermal_throttle': ThermalThrottleModuleStub,
- 'logging': LoggingStub
+ 'logging': LoggingStub,
+ 'certutils': CertUtilsStub
}
self.adb_commands = None
self.os = None
def device(self):
return self._adb_device
+ def device_serial(self):
+ return self._device
+
def __init__(self):
self.attached_devices = []
self.adb_device = AdbDevice()
'internal': INTERNAL_BUCKET,
}
+ # These are used to test for CloudStorage errors.
+ INTERNAL_PERMISSION = 2
+ PARTNER_PERMISSION = 1
+ PUBLIC_PERMISSION = 0
+ # Not logged in.
+ CREDENTIALS_ERROR_PERMISSION = -1
+
+ class NotFoundError(Exception):
+ pass
+
class CloudStorageError(Exception):
pass
+ class PermissionError(CloudStorageError):
+ pass
+
+ class CredentialsError(CloudStorageError):
+ pass
+
def __init__(self):
- self.remote_paths = []
+ self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
+ CloudStorageModuleStub.PARTNER_BUCKET:{},
+ CloudStorageModuleStub.PUBLIC_BUCKET:{}}
+ self.remote_paths = self.default_remote_paths
self.local_file_hashes = {}
self.local_hash_files = {}
+ self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
+
+ def SetPermissionLevelForTesting(self, permission_level):
+ self.permission_level = permission_level
+
+ def CheckPermissionLevelForBucket(self, bucket):
+ if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
+ return
+ elif (self.permission_level ==
+ CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
+ raise CloudStorageModuleStub.CredentialsError()
+ elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
+ if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
+ raise CloudStorageModuleStub.PermissionError()
+ else:
+ raise CloudStorageModuleStub.NotFoundError()
+
+ def SetRemotePathsForTesting(self, remote_path_dict=None):
+ if not remote_path_dict:
+ self.remote_paths = self.default_remote_paths
+ return
+ self.remote_paths = remote_path_dict
- def List(self, _):
+ def GetRemotePathsForTesting(self):
+ if not self.remote_paths:
+ self.remote_paths = self.default_remote_paths
return self.remote_paths
+ # Set a dictionary of data files and their "calculated" hashes.
+ def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
+ self.local_file_hashes = calculated_hash_dictionary
+
+ def GetLocalDataFiles(self):
+ return self.local_file_hashes.keys()
+
+ # Set a dictionary of hash files and the hashes they should contain.
+ def SetHashFileContentsForTesting(self, hash_file_dictionary):
+ self.local_hash_files = hash_file_dictionary
+
+ def GetLocalHashFiles(self):
+ return self.local_hash_files.keys()
+
+ def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
+ self.remote_paths[bucket][remote_path] = new_hash
+
+ def List(self, bucket):
+ if not bucket or not bucket in self.remote_paths:
+ bucket_error = ('Incorrect bucket specified, correct buckets:' +
+ str(self.remote_paths))
+ raise CloudStorageModuleStub.CloudStorageError(bucket_error)
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return list(self.remote_paths[bucket].keys())
+
+ def Exists(self, bucket, remote_path):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ return remote_path in self.remote_paths[bucket]
+
def Insert(self, bucket, remote_path, local_path):
- pass
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not local_path in self.GetLocalDataFiles():
+ file_path_error = 'Local file path does not exist'
+ raise CloudStorageModuleStub.CloudStorageError(file_path_error)
+ self.remote_paths[bucket][remote_path] = (
+ CloudStorageModuleStub.CalculateHash(self, local_path))
+
+ def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
+ CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+ if not remote_path in self.remote_paths[bucket]:
+ if only_if_changed:
+ return False
+ raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
+ remote_hash = self.remote_paths[bucket][remote_path]
+ local_hash = self.local_file_hashes[local_path]
+ if only_if_changed and remote_hash == local_hash:
+ return False
+ self.local_file_hashes[local_path] = remote_hash
+ self.local_hash_files[local_path + '.sha1'] = remote_hash
+ return remote_hash
+
+ def Get(self, bucket, remote_path, local_path):
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, False)
+
+ def GetIfChanged(self, bucket, local_path):
+ remote_path = os.path.basename(local_path)
+ return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+ local_path, True)
def CalculateHash(self, file_path):
return self.local_file_hashes[file_path]
class LoggingStub(object):
def __init__(self):
self.warnings = []
+ self.errors = []
def info(self, msg, *args):
pass
- def warn(self, msg, *args):
+ def error(self, msg, *args):
+ self.errors.append(msg % args)
+
+ def warning(self, msg, *args):
self.warnings.append(msg % args)
+ def warn(self, msg, *args):
+ self.warning(msg, *args)
+
class OpenFunctionStub(object):
class FileStub(object):
def __init__(self):
self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
+
+class CertUtilsStub(object):
+ openssl_import_error = None
+
+ @staticmethod
+ def write_dummy_ca_cert(_ca_cert_str, _key_str, _cert_path):
+ raise Exception("write_dummy_ca_cert exception")
+
+ @staticmethod
+ def generate_dummy_ca_cert():
+ return '-', '-'