import collections
import datetime
import inspect
-import json
import logging
import os
+import random
import re
import shlex
import signal
import errors
from pylib.device import device_blacklist
+from pylib.device import device_errors
# Pattern to search for the next whole line of pexpect output and capture it
# into a match group. We can't use ^ and $ for line start end with pexpect,
# Property in /data/local.prop that controls Java assertions.
JAVA_ASSERT_PROPERTY = 'dalvik.vm.enableassertions'
-MEMORY_INFO_RE = re.compile('^(?P<key>\w+):\s+(?P<usage_kb>\d+) kB$')
-NVIDIA_MEMORY_INFO_RE = re.compile('^\s*(?P<user>\S+)\s*(?P<name>\S+)\s*'
- '(?P<pid>\d+)\s*(?P<usage_bytes>\d+)$')
-
# Keycode "enum" suitable for passing to AndroidCommands.SendKey().
KEYCODE_HOME = 3
KEYCODE_BACK = 4
MD5SUM_DEVICE_FOLDER = constants.TEST_EXECUTABLE_DIR + '/md5sum/'
MD5SUM_DEVICE_PATH = MD5SUM_DEVICE_FOLDER + 'md5sum_bin'
-MD5SUM_LD_LIBRARY_PATH = 'LD_LIBRARY_PATH=%s' % MD5SUM_DEVICE_FOLDER
+
+PIE_WRAPPER_PATH = constants.TEST_EXECUTABLE_DIR + '/run_pie'
CONTROL_USB_CHARGING_COMMANDS = [
{
},
]
+class DeviceTempFile(object):
+ def __init__(self, android_commands, prefix='temp_file', suffix=''):
+ """Find an unused temporary file path in the devices external directory.
+
+ When this object is closed, the file will be deleted on the device.
+ """
+ self.android_commands = android_commands
+ while True:
+ # TODO(cjhopman): This could actually return the same file in multiple
+ # calls if the caller doesn't write to the files immediately. This is
+ # expected to never happen.
+ i = random.randint(0, 1000000)
+ self.name = '%s/%s-%d-%010d%s' % (
+ android_commands.GetExternalStorage(),
+ prefix, int(time.time()), i, suffix)
+ if not android_commands.FileExistsOnDevice(self.name):
+ break
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+ def close(self):
+ self.android_commands.RunShellCommand('rm ' + self.name)
+
+
def GetAVDs():
"""Returns a list of AVDs."""
re_avd = re.compile('^[ ]+Name: ([a-zA-Z0-9_:.-]+)', re.MULTILINE)
}
self._protected_file_access_method_initialized = None
self._privileged_command_runner = None
+ self._pie_wrapper = None
@property
def system_properties(self):
Returns:
True if device is in 'device' mode, False otherwise.
"""
- out = self._adb.SendCommand('get-state')
- return out.strip() == 'device'
+ # TODO(aurimas): revert to using adb get-state when android L adb is fixed.
+ #out = self._adb.SendCommand('get-state')
+ #return out.strip() == 'device'
+
+ out = self._adb.SendCommand('devices')
+ for line in out.split('\n'):
+ if self._device in line and 'device' in line:
+ return True
+ return False
def IsRootEnabled(self):
"""Checks if root is enabled on the device."""
def GetExternalStorage(self):
if not self._external_storage:
self._external_storage = self.RunShellCommand('echo $EXTERNAL_STORAGE')[0]
- assert self._external_storage, 'Unable to find $EXTERNAL_STORAGE'
+ if not self._external_storage:
+ raise device_errors.CommandFailedError(
+ ['shell', "'echo $EXTERNAL_STORAGE'"],
+ 'Unable to find $EXTERNAL_STORAGE')
return self._external_storage
- def WaitForDevicePm(self):
+ def WaitForDevicePm(self, timeout=120):
"""Blocks until the device's package manager is available.
To workaround http://b/5201039, we restart the shell and retry if the
retries = 3
while retries:
try:
- self._adb.WaitForDevicePm()
+ self._adb.WaitForDevicePm(wait_time=timeout)
return # Success
except errors.WaitForResponseTimedOutError as e:
last_err = e
timeout = 120
# To run tests we need at least the package manager and the sd card (or
# other external storage) to be ready.
- self.WaitForDevicePm()
+ self.WaitForDevicePm(timeout)
self.WaitForSdCardReady(timeout)
def Shutdown(self):
raise errors.MsgException('Remount failed: %s' % out)
def RestartAdbdOnDevice(self):
- logging.info('Killing adbd on the device...')
- adb_pids = self.ExtractPid('adbd')
- if not adb_pids:
- raise errors.MsgException('Unable to obtain adbd pid')
- try:
- self.KillAll('adbd', signum=signal.SIGTERM, with_su=True)
- logging.info('Waiting for device to settle...')
+ logging.info('Restarting adbd on the device...')
+ with DeviceTempFile(self, suffix=".sh") as temp_script_file:
+ host_script_path = os.path.join(constants.DIR_SOURCE_ROOT,
+ 'build',
+ 'android',
+ 'pylib',
+ 'restart_adbd.sh')
+ self._adb.Push(host_script_path, temp_script_file.name)
+ self.RunShellCommand('. %s' % temp_script_file.name)
self._adb.SendCommand('wait-for-device')
- new_adb_pids = self.ExtractPid('adbd')
- if new_adb_pids == adb_pids:
- logging.warning('adbd on the device may not have been restarted.')
- except Exception as e:
- logging.error('Exception when trying to kill adbd on the device [%s]', e)
def RestartAdbServer(self):
"""Restart the adb server."""
raise errors.WaitForResponseTimedOutError(
'SD card not ready after %s seconds' % timeout_time)
- def _CheckCommandIsValid(self, command):
- """Raises a ValueError if the command is not valid."""
-
- # A dict of commands the user should not run directly and a mapping to the
- # API they should use instead.
- preferred_apis = {
- 'getprop': 'system_properties[<PROPERTY>]',
- 'setprop': 'system_properties[<PROPERTY>]',
- 'su': 'RunShellCommandWithSU()',
- }
+ def GetAndroidToolStatusAndOutput(self, command, lib_path=None, *args, **kw):
+ """Runs a native Android binary, wrapping the command as necessary.
- # A dict of commands to methods that may call them.
- whitelisted_callers = {
- 'su': 'RunShellCommandWithSU',
- 'getprop': 'ProvisionDevices',
- }
+ This is a specialization of GetShellCommandStatusAndOutput, which is meant
+ for running tools/android/ binaries and handle properly: (1) setting the
+ lib path (for component=shared_library), (2) using the PIE wrapper on ICS.
+ See crbug.com/373219 for more context.
- base_command = shlex.split(command)[0].strip(';')
- if (base_command in preferred_apis and
- (base_command not in whitelisted_callers or
- whitelisted_callers[base_command] not in [
- f[3] for f in inspect.stack()])):
- error_msg = ('%s should not be run directly. Instead use: %s' %
- (base_command, preferred_apis[base_command]))
- raise ValueError(error_msg)
+ Args:
+ command: String containing the command to send.
+ lib_path: (optional) path to the folder containing the dependent libs.
+ Same other arguments of GetCmdStatusAndOutput.
+ """
+ # The first time this command is run the device is inspected to check
+ # whether a wrapper for running PIE executable is needed (only Android ICS)
+ # or not. The results is cached, so the wrapper is pushed only once.
+ if self._pie_wrapper is None:
+ # None: did not check; '': did check and not needed; '/path': use /path.
+ self._pie_wrapper = ''
+ if self.GetBuildId().startswith('I'): # Ixxxx = Android ICS.
+ run_pie_dist_path = os.path.join(constants.GetOutDirectory(), 'run_pie')
+ assert os.path.exists(run_pie_dist_path), 'Please build run_pie'
+ # The PIE loader must be pushed manually (i.e. no PushIfNeeded) because
+ # PushIfNeeded requires md5sum and md5sum requires the wrapper as well.
+ command = 'push %s %s' % (run_pie_dist_path, PIE_WRAPPER_PATH)
+ assert _HasAdbPushSucceeded(self._adb.SendCommand(command))
+ self._pie_wrapper = PIE_WRAPPER_PATH
+
+ if self._pie_wrapper:
+ command = '%s %s' % (self._pie_wrapper, command)
+ if lib_path:
+ command = 'LD_LIBRARY_PATH=%s %s' % (lib_path, command)
+ return self.GetShellCommandStatusAndOutput(command, *args, **kw)
# It is tempting to turn this function into a generator, however this is not
# possible without using a private (local) adb_shell instance (to ensure no
"""Send a command to the adb shell and return the result.
Args:
- command: String containing the shell command to send. Must not include
- the single quotes as we use them to escape the whole command.
+ command: String containing the shell command to send.
timeout_time: Number of seconds to wait for command to respond before
retrying, used by AdbInterface.SendShellCommand.
log_result: Boolean to indicate whether we should log the result of the
Returns:
list containing the lines of output received from running the command
"""
- self._CheckCommandIsValid(command)
self._LogShell(command)
if "'" in command:
- logging.warning(command + " contains ' quotes")
+ command = command.replace('\'', '\'\\\'\'')
result = self._adb.SendShellCommand(
"'%s'" % command, timeout_time).splitlines()
# TODO(b.kelemen): we should really be able to drop the stderr of the
self.RunShellCommand(cmd)
return len(pids)
- def KillAllBlocking(self, process, timeout_sec):
+ def KillAllBlocking(self, process, timeout_sec, signum=9, with_su=False):
"""Blocking version of killall, connected via adb.
This waits until no process matching the corresponding name appears in ps'
Args:
process: name of the process to kill off
timeout_sec: the timeout in seconds
-
+ signum: same as |KillAll|
+ with_su: same as |KillAll|
Returns:
the number of processes killed
"""
- processes_killed = self.KillAll(process)
+ processes_killed = self.KillAll(process, signum=signum, with_su=with_su)
if processes_killed:
elapsed = 0
wait_period = 0.1
time.sleep(wait_period)
elapsed += wait_period
if elapsed >= timeout_sec:
- return 0
+ return processes_killed - self.ExtractPid(process)
return processes_killed
@staticmethod
trace_file_name: If used, turns on and saves the trace to this file name.
force_stop: force stop the target app before starting the activity (-S
flag).
+ Returns:
+ The output of the underlying command as a list of lines.
"""
cmd = self._GetActivityCommand(package, activity, wait_for_completion,
action, category, data, extras,
trace_file_name, force_stop, flags)
- self.RunShellCommand(cmd)
+ return self.RunShellCommand(cmd)
def StartActivityTimed(self, package, activity, wait_for_completion=False,
action='android.intent.action.VIEW',
Args - as for StartActivity
Returns:
- a timestamp string for the time at which the activity started
+ A tuple containing:
+ - the output of the underlying command as a list of lines, and
+ - a timestamp string for the time at which the activity started
"""
cmd = self._GetActivityCommand(package, activity, wait_for_completion,
action, category, data, extras,
trace_file_name, force_stop, flags)
self.StartMonitoringLogcat()
- self.RunShellCommand('log starting activity; ' + cmd)
+ out = self.RunShellCommand('log starting activity; ' + cmd)
activity_started_re = re.compile('.*starting activity.*')
m = self.WaitForLogMatch(activity_started_re, None)
assert m
start_line = m.group(0)
- return GetLogTimestamp(start_line, self.GetDeviceYear())
+ return (out, GetLogTimestamp(start_line, self.GetDeviceYear()))
def StartCrashUploadService(self, package):
# TODO(frankf): We really need a python wrapper around Intent
md5sum_dist_path = os.path.join(constants.GetOutDirectory(),
'md5sum_dist')
assert os.path.exists(md5sum_dist_path), 'Please build md5sum.'
- command = 'push %s %s' % (md5sum_dist_path, MD5SUM_DEVICE_FOLDER)
- assert _HasAdbPushSucceeded(self._adb.SendCommand(command))
-
- cmd = (MD5SUM_LD_LIBRARY_PATH + ' ' + self._util_wrapper + ' ' +
- MD5SUM_DEVICE_PATH + ' ' + device_path)
- device_hash_tuples = _ParseMd5SumOutput(
- self.RunShellCommand(cmd, timeout_time=2 * 60))
+ md5sum_dist_mtime = os.stat(md5sum_dist_path).st_mtime
+ if (md5sum_dist_path not in self._push_if_needed_cache or
+ self._push_if_needed_cache[md5sum_dist_path] != md5sum_dist_mtime):
+ command = 'push %s %s' % (md5sum_dist_path, MD5SUM_DEVICE_FOLDER)
+ assert _HasAdbPushSucceeded(self._adb.SendCommand(command))
+ self._push_if_needed_cache[md5sum_dist_path] = md5sum_dist_mtime
+
+ (_, md5_device_output) = self.GetAndroidToolStatusAndOutput(
+ self._util_wrapper + ' ' + MD5SUM_DEVICE_PATH + ' ' + device_path,
+ lib_path=MD5SUM_DEVICE_FOLDER,
+ timeout_time=2 * 60)
+ device_hash_tuples = _ParseMd5SumOutput(md5_device_output)
assert os.path.exists(host_path), 'Local path not found %s' % host_path
md5sum_output = cmd_helper.GetCmdOutput(
[os.path.join(constants.GetOutDirectory(), 'md5sum_bin_host'),
All pushed files can be removed by calling RemovePushedFiles().
"""
MAX_INDIVIDUAL_PUSHES = 50
- assert os.path.exists(host_path), 'Local path not found %s' % host_path
+ if not os.path.exists(host_path):
+ raise device_errors.CommandFailedError(
+ 'Local path not found %s' % host_path, device=str(self))
# See if the file on the host changed since the last push (if any) and
# return early if it didn't. Note that this shortcut assumes that the tests
self._pushed_files.append(device_path)
self._potential_push_size += size
+ if os.path.isdir(host_path):
+ self.RunShellCommand('mkdir -p "%s"' % device_path)
+
changed_files = self.GetFilesChanged(host_path, device_path)
logging.info('Found %d files that need to be pushed to %s',
len(changed_files), device_path)
# approximates the push time for each method.
if len(changed_files) > MAX_INDIVIDUAL_PUSHES or diff_size > 0.5 * size:
self._actual_push_size += size
- if os.path.isdir(host_path):
- self.RunShellCommand('mkdir -p %s' % device_path)
Push(host_path, device_path)
else:
for f in changed_files:
f.flush()
self._adb.Push(f.name, filename)
- _TEMP_FILE_BASE_FMT = 'temp_file_%d'
- _TEMP_SCRIPT_FILE_BASE_FMT = 'temp_script_file_%d.sh'
-
- def _GetDeviceTempFileName(self, base_name):
- i = 0
- while self.FileExistsOnDevice(
- self.GetExternalStorage() + '/' + base_name % i):
- i += 1
- return self.GetExternalStorage() + '/' + base_name % i
-
def RunShellCommandWithSU(self, command, timeout_time=20, log_result=False):
return self.RunShellCommand('su -c %s' % command, timeout_time, log_result)
This is less efficient than SetFileContents.
"""
- # TODO(skyostil): Remove this once it has been through all the bots.
- for file_name in (AndroidCommands._TEMP_FILE_BASE_FMT,
- AndroidCommands._TEMP_SCRIPT_FILE_BASE_FMT):
- self.RunShellCommand('rm ' + self.GetExternalStorage() + '/' +
- file_name.replace('%d', '*'))
-
- temp_file = self._GetDeviceTempFileName(AndroidCommands._TEMP_FILE_BASE_FMT)
- temp_script = self._GetDeviceTempFileName(
- AndroidCommands._TEMP_SCRIPT_FILE_BASE_FMT)
+ with DeviceTempFile(self) as temp_file:
+ with DeviceTempFile(self, suffix=".sh") as temp_script:
+ # Put the contents in a temporary file
+ self.SetFileContents(temp_file.name, contents)
+ # Create a script to copy the file contents to its final destination
+ self.SetFileContents(temp_script.name,
+ 'cat %s > %s' % (temp_file.name, filename))
+
+ command = 'sh %s' % temp_script.name
+ command_runner = self._GetProtectedFileCommandRunner()
+ if command_runner:
+ return command_runner(command)
+ else:
+ logging.warning(
+ 'Could not set contents of protected file: %s' % filename)
- try:
- # Put the contents in a temporary file
- self.SetFileContents(temp_file, contents)
- # Create a script to copy the file contents to its final destination
- self.SetFileContents(temp_script, 'cat %s > %s' % (temp_file, filename))
-
- command = 'sh %s' % temp_script
- command_runner = self._GetProtectedFileCommandRunner()
- if command_runner:
- return command_runner(command)
- else:
- logging.warning(
- 'Could not set contents of protected file: %s' % filename)
- finally:
- # And remove the temporary files
- self.RunShellCommand('rm ' + temp_file)
- self.RunShellCommand('rm ' + temp_script)
def RemovePushedFiles(self):
"""Removes all files pushed with PushIfNeeded() from the device."""
"""
# Example output:
# /foo/bar:
- # -rw-r----- 1 user group 102 2011-05-12 12:29:54.131623387 +0100 baz.txt
+ # -rw-r----- user group 102 2011-05-12 12:29:54.131623387 +0100 baz.txt
re_file = re.compile('^-(?P<perms>[^\s]+)\s+'
'(?P<user>[^\s]+)\s+'
'(?P<group>[^\s]+)\s+'
temp_props_file = tempfile.NamedTemporaryFile()
properties = ''
if self._adb.Pull(LOCAL_PROPERTIES_PATH, temp_props_file.name):
- properties = file(temp_props_file.name).read()
+ with open(temp_props_file.name) as f:
+ properties = f.read()
re_search = re.compile(r'^\s*' + re.escape(JAVA_ASSERT_PROPERTY) +
r'\s*=\s*all\s*$', re.MULTILINE)
if enable != bool(re.search(re_search, properties)):
def GetSubscriberInfo(self):
"""Returns the device subscriber info (e.g. GSM and device ID) as string."""
iphone_sub = self.RunShellCommand('dumpsys iphonesubinfo')
- assert iphone_sub
+ # Do not assert here. Devices (e.g. Nakasi on K) may not have iphonesubinfo.
return '\n'.join(iphone_sub)
def GetBatteryInfo(self):
- """Returns the device battery info (e.g. status, level, etc) as string."""
+ """Returns a {str: str} dict of battery info (e.g. status, level, etc)."""
battery = self.RunShellCommand('dumpsys battery')
assert battery
- return '\n'.join(battery)
+ battery_info = {}
+ for line in battery[1:]:
+ k, _, v = line.partition(': ')
+ battery_info[k.strip()] = v.strip()
+ return battery_info
def GetSetupWizardStatus(self):
"""Returns the status of the device setup wizard (e.g. DISABLED)."""
pid: The pid number of the specific process running on device.
Returns:
- A tuple containg:
- [0]: Dict of {metric:usage_kb}, for the process which has specified pid.
+ Dict of {metric:usage_kb}, for the process which has specified pid.
The metric keys which may be included are: Size, Rss, Pss, Shared_Clean,
- Shared_Dirty, Private_Clean, Private_Dirty, Referenced, Swap,
- KernelPageSize, MMUPageSize, Nvidia (tablet only), VmHWM.
- [1]: Detailed /proc/[PID]/smaps information.
+ Shared_Dirty, Private_Clean, Private_Dirty, VmHWM.
"""
+ showmap = self.RunShellCommand('showmap %d' % pid)
+ if not showmap or not showmap[-1].endswith('TOTAL'):
+ logging.warning('Invalid output for showmap %s', str(showmap))
+ return {}
+ items = showmap[-1].split()
+ if len(items) != 9:
+ logging.warning('Invalid TOTAL for showmap %s', str(items))
+ return {}
usage_dict = collections.defaultdict(int)
- smaps = collections.defaultdict(dict)
- current_smap = ''
- for line in self.GetProtectedFileContents('/proc/%s/smaps' % pid):
- items = line.split()
- # See man 5 proc for more details. The format is:
- # address perms offset dev inode pathname
- if len(items) > 5:
- current_smap = ' '.join(items[5:])
- elif len(items) > 3:
- current_smap = ' '.join(items[3:])
- match = re.match(MEMORY_INFO_RE, line)
- if match:
- key = match.group('key')
- usage_kb = int(match.group('usage_kb'))
- usage_dict[key] += usage_kb
- if key not in smaps[current_smap]:
- smaps[current_smap][key] = 0
- smaps[current_smap][key] += usage_kb
- if not usage_dict or not any(usage_dict.values()):
- # Presumably the process died between ps and calling this method.
- logging.warning('Could not find memory usage for pid ' + str(pid))
-
- for line in self.GetProtectedFileContents('/d/nvmap/generic-0/clients'):
- match = re.match(NVIDIA_MEMORY_INFO_RE, line)
- if match and match.group('pid') == pid:
- usage_bytes = int(match.group('usage_bytes'))
- usage_dict['Nvidia'] = int(round(usage_bytes / 1000.0)) # kB
- break
-
+ usage_dict.update({
+ 'Size': int(items[0].strip()),
+ 'Rss': int(items[1].strip()),
+ 'Pss': int(items[2].strip()),
+ 'Shared_Clean': int(items[3].strip()),
+ 'Shared_Dirty': int(items[4].strip()),
+ 'Private_Clean': int(items[5].strip()),
+ 'Private_Dirty': int(items[6].strip()),
+ })
peak_value_kb = 0
for line in self.GetProtectedFileContents('/proc/%s/status' % pid):
if not line.startswith('VmHWM:'): # Format: 'VmHWM: +[0-9]+ kB'
continue
peak_value_kb = int(line.split(':')[1].strip().split(' ')[0])
+ break
usage_dict['VmHWM'] = peak_value_kb
if not peak_value_kb:
logging.warning('Could not find memory peak value for pid ' + str(pid))
- return (usage_dict, smaps)
-
- def GetMemoryUsageForPackage(self, package):
- """Returns the memory usage for all processes whose name contains |pacakge|.
-
- Args:
- package: A string holding process name to lookup pid list for.
-
- Returns:
- A tuple containg:
- [0]: Dict of {metric:usage_kb}, summed over all pids associated with
- |name|.
- The metric keys which may be included are: Size, Rss, Pss, Shared_Clean,
- Shared_Dirty, Private_Clean, Private_Dirty, Referenced, Swap,
- KernelPageSize, MMUPageSize, Nvidia (tablet only).
- [1]: a list with detailed /proc/[PID]/smaps information.
- """
- usage_dict = collections.defaultdict(int)
- pid_list = self.ExtractPid(package)
- smaps = collections.defaultdict(dict)
-
- for pid in pid_list:
- usage_dict_per_pid, smaps_per_pid = self.GetMemoryUsageForPid(pid)
- smaps[pid] = smaps_per_pid
- for (key, value) in usage_dict_per_pid.items():
- usage_dict[key] += value
-
- return usage_dict, smaps
+ return usage_dict
def ProcessesUsingDevicePort(self, device_port):
"""Lists processes using the specified device port on loopback interface.
device_file: Absolute path to the file to retrieve from the device.
host_file: Absolute path to the file to store on the host.
"""
- assert self._adb.Pull(device_file, host_file)
+ if not self._adb.Pull(device_file, host_file):
+ raise device_errors.AdbCommandFailedError(
+ ['pull', device_file, host_file], 'Failed to pull file from device.')
assert os.path.exists(host_file)
def SetUtilWrapper(self, util_wrapper):
"""
self._util_wrapper = util_wrapper
- def RunInstrumentationTest(self, test, test_package, instr_args, timeout):
- """Runs a single instrumentation test.
-
- Args:
- test: Test class/method.
- test_package: Package name of test apk.
- instr_args: Extra key/value to pass to am instrument.
- timeout: Timeout time in seconds.
-
- Returns:
- An instance of am_instrument_parser.TestResult object.
- """
- instrumentation_path = ('%s/android.test.InstrumentationTestRunner' %
- test_package)
- args_with_filter = dict(instr_args)
- args_with_filter['class'] = test
- logging.info(args_with_filter)
- (raw_results, _) = self._adb.StartInstrumentation(
- instrumentation_path=instrumentation_path,
- instrumentation_args=args_with_filter,
- timeout_time=timeout)
- assert len(raw_results) == 1
- return raw_results[0]
-
def RunUIAutomatorTest(self, test, test_package, timeout):
"""Runs a single uiautomator test.
dest: absolute path of destination directory
"""
logging.info('In EfficientDeviceDirectoryCopy %s %s', source, dest)
- temp_script_file = self._GetDeviceTempFileName(
- AndroidCommands._TEMP_SCRIPT_FILE_BASE_FMT)
- host_script_path = os.path.join(constants.DIR_SOURCE_ROOT,
- 'build',
- 'android',
- 'pylib',
- 'efficient_android_directory_copy.sh')
- try:
- self._adb.Push(host_script_path, temp_script_file)
- self.EnableAdbRoot()
- out = self.RunShellCommand('sh %s %s %s' % (temp_script_file,
- source,
- dest),
- timeout_time=120)
+ with DeviceTempFile(self, suffix=".sh") as temp_script_file:
+ host_script_path = os.path.join(constants.DIR_SOURCE_ROOT,
+ 'build',
+ 'android',
+ 'pylib',
+ 'efficient_android_directory_copy.sh')
+ self._adb.Push(host_script_path, temp_script_file.name)
+ out = self.RunShellCommand(
+ 'sh %s %s %s' % (temp_script_file.name, source, dest),
+ timeout_time=120)
if self._device:
device_repr = self._device[-4:]
else:
device_repr = '????'
for line in out:
logging.info('[%s]> %s', device_repr, line)
- finally:
- self.RunShellCommand('rm %s' % temp_script_file)
def _GetControlUsbChargingCommand(self):
if self._control_usb_charging_command['cached']:
return self._control_usb_charging_command['command']
self._control_usb_charging_command['cached'] = True
+ if not self.IsRootEnabled():
+ return None
for command in CONTROL_USB_CHARGING_COMMANDS:
# Assert command is valid.
assert 'disable_command' in command
def CanControlUsbCharging(self):
return self._GetControlUsbChargingCommand() is not None
- def DisableUsbCharging(self):
+ def DisableUsbCharging(self, timeout=10):
command = self._GetControlUsbChargingCommand()
if not command:
raise Exception('Unable to act on usb charging.')
disable_command = command['disable_command']
+ t0 = time.time()
# Do not loop directly on self.IsDeviceCharging to cut the number of calls
# to the device.
while True:
+ if t0 + timeout - time.time() < 0:
+ raise pexpect.TIMEOUT('Unable to enable USB charging in time.')
self.RunShellCommand(disable_command)
if not self.IsDeviceCharging():
break
- def EnableUsbCharging(self):
+ def EnableUsbCharging(self, timeout=10):
command = self._GetControlUsbChargingCommand()
if not command:
raise Exception('Unable to act on usb charging.')
disable_command = command['enable_command']
+ t0 = time.time()
# Do not loop directly on self.IsDeviceCharging to cut the number of calls
# to the device.
while True:
+ if t0 + timeout - time.time() < 0:
+ raise pexpect.TIMEOUT('Unable to enable USB charging in time.')
self.RunShellCommand(disable_command)
if self.IsDeviceCharging():
break