Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / tools / telemetry / telemetry / unittest / system_stub.py
index ec61eb9..89e7a91 100644 (file)
@@ -24,7 +24,8 @@ class Override(object):
              'subprocess': SubprocessModuleStub,
              'sys': SysModuleStub,
              'thermal_throttle': ThermalThrottleModuleStub,
-             'logging': LoggingStub
+             'logging': LoggingStub,
+             'certutils': CertUtilsStub
     }
     self.adb_commands = None
     self.os = None
@@ -114,6 +115,9 @@ class AdbCommandsModuleStub(object):
     def device(self):
       return self._adb_device
 
+    def device_serial(self):
+      return self._device
+
   def __init__(self):
     self.attached_devices = []
     self.adb_device = AdbDevice()
@@ -146,19 +150,122 @@ class CloudStorageModuleStub(object):
     'internal': INTERNAL_BUCKET,
   }
 
+  # These are used to test for CloudStorage errors.
+  INTERNAL_PERMISSION = 2
+  PARTNER_PERMISSION = 1
+  PUBLIC_PERMISSION = 0
+  # Not logged in.
+  CREDENTIALS_ERROR_PERMISSION = -1
+
+  class NotFoundError(Exception):
+    pass
+
   class CloudStorageError(Exception):
     pass
 
+  class PermissionError(CloudStorageError):
+    pass
+
+  class CredentialsError(CloudStorageError):
+    pass
+
   def __init__(self):
-    self.remote_paths = []
+    self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
+                                 CloudStorageModuleStub.PARTNER_BUCKET:{},
+                                 CloudStorageModuleStub.PUBLIC_BUCKET:{}}
+    self.remote_paths = self.default_remote_paths
     self.local_file_hashes = {}
     self.local_hash_files = {}
+    self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
+
+  def SetPermissionLevelForTesting(self, permission_level):
+    self.permission_level = permission_level
+
+  def CheckPermissionLevelForBucket(self, bucket):
+    if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
+      return
+    elif (self.permission_level ==
+          CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
+      raise CloudStorageModuleStub.CredentialsError()
+    elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
+      if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
+        raise CloudStorageModuleStub.PermissionError()
+    elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
+      if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
+        raise CloudStorageModuleStub.PermissionError()
+    else:
+      raise CloudStorageModuleStub.NotFoundError()
+
+  def SetRemotePathsForTesting(self, remote_path_dict=None):
+    if not remote_path_dict:
+      self.remote_paths = self.default_remote_paths
+      return
+    self.remote_paths = remote_path_dict
 
-  def List(self, _):
+  def GetRemotePathsForTesting(self):
+    if not self.remote_paths:
+      self.remote_paths = self.default_remote_paths
     return self.remote_paths
 
+  # Set a dictionary of data files and their "calculated" hashes.
+  def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
+    self.local_file_hashes = calculated_hash_dictionary
+
+  def GetLocalDataFiles(self):
+    return self.local_file_hashes.keys()
+
+  # Set a dictionary of hash files and the hashes they should contain.
+  def SetHashFileContentsForTesting(self, hash_file_dictionary):
+    self.local_hash_files = hash_file_dictionary
+
+  def GetLocalHashFiles(self):
+    return self.local_hash_files.keys()
+
+  def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
+    self.remote_paths[bucket][remote_path] = new_hash
+
+  def List(self, bucket):
+    if not bucket or not bucket in self.remote_paths:
+      bucket_error = ('Incorrect bucket specified, correct buckets:' +
+                      str(self.remote_paths))
+      raise CloudStorageModuleStub.CloudStorageError(bucket_error)
+    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+    return list(self.remote_paths[bucket].keys())
+
+  def Exists(self, bucket, remote_path):
+    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+    return remote_path in self.remote_paths[bucket]
+
   def Insert(self, bucket, remote_path, local_path):
-    pass
+    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+    if not local_path in self.GetLocalDataFiles():
+      file_path_error = 'Local file path does not exist'
+      raise CloudStorageModuleStub.CloudStorageError(file_path_error)
+    self.remote_paths[bucket][remote_path] = (
+      CloudStorageModuleStub.CalculateHash(self, local_path))
+
+  def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
+    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
+    if not remote_path in self.remote_paths[bucket]:
+      if only_if_changed:
+        return False
+      raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
+    remote_hash = self.remote_paths[bucket][remote_path]
+    local_hash = self.local_file_hashes[local_path]
+    if only_if_changed and remote_hash == local_hash:
+      return False
+    self.local_file_hashes[local_path] = remote_hash
+    self.local_hash_files[local_path + '.sha1'] = remote_hash
+    return remote_hash
+
+  def Get(self, bucket, remote_path, local_path):
+    return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+                                            local_path, False)
+
+  def GetIfChanged(self, bucket, local_path):
+    remote_path = os.path.basename(local_path)
+    return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
+                                            local_path, True)
 
   def CalculateHash(self, file_path):
     return self.local_file_hashes[file_path]
@@ -170,13 +277,20 @@ class CloudStorageModuleStub(object):
 class LoggingStub(object):
   def __init__(self):
     self.warnings = []
+    self.errors = []
 
   def info(self, msg, *args):
     pass
 
-  def warn(self, msg, *args):
+  def error(self, msg, *args):
+    self.errors.append(msg % args)
+
+  def warning(self, msg, *args):
     self.warnings.append(msg % args)
 
+  def warn(self, msg, *args):
+    self.warning(msg, *args)
+
 
 class OpenFunctionStub(object):
   class FileStub(object):
@@ -341,3 +455,14 @@ class ThermalThrottleModuleStub(object):
 
   def __init__(self):
     self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
+
+class CertUtilsStub(object):
+  openssl_import_error = None
+
+  @staticmethod
+  def write_dummy_ca_cert(_ca_cert_str, _key_str, _cert_path):
+    raise Exception("write_dummy_ca_cert exception")
+
+  @staticmethod
+  def generate_dummy_ca_cert():
+    return '-', '-'