import subprocess
import time
import os
-import platform
-import re
-import signal
-import ctypes
-
-SDB_PUSH = "sdb -s %s push"
-SDB_SHELL = "sdb -s %s shell"
-SDB_ROOT_ON = "sdb -s %s root on"
-SDB_DEVICES = "sdb devices"
-SDB_START_SERVER = "sdb start-server"
-SDB_KILL_SERVER = "sdb kill-server"
-SDB_CONNECT = "sdb connect %s"
-SDB_PULL = "sdb -s %s pull"
-SDB_CAT_CONFIG = " cat %s"
-SDB_LS = "sdb -s %s shell ls -al"
-FILE_FOUND_SCRIPT = "[ -f %s ] && echo \"Found\" || echo \"Not Found\""
+import sys
+from .constants import Constants
+from .logmanager import LOGGER
+from .exception import DevNotFoundErr
+
class SdbManager:
@staticmethod
- def hostCommand(command):
- print command
- proc = subprocess.Popen(command, shell=True, \
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ def hostLiteCommand(command, failct=None):
+ LOGGER.info(command)
+ proc = subprocess.Popen(command, \
+ shell=True, \
+ bufsize=0, \
+ stdin=subprocess.PIPE, \
+ stdout=subprocess.PIPE, \
+ stderr=subprocess.PIPE)
+ curr_fail = 0
while True:
exit_code = proc.poll()
if exit_code is not None:
break
- output = proc.stdout.read()
+ output = proc.stdout.readline()
if output:
- print output
-
- errlog = proc.stderr.read()
- if errlog:
- print errlog
+ output = output.strip().decode('utf-8')
+ LOGGER.info(output)
+ if output.find("all tasks for testkit lite are accomplished, goodbye") > -1:
+ return True
+ elif output.find("error: protocol fault: no status") > -1:
+ LOGGER.error(output)
+ return False
+ elif output.find("error: target offline") > -1:
+ LOGGER.error(output)
+ return False
+ elif output.find("error: target not found") > -1:
+ LOGGER.error(output)
+ return False
+ elif output.find("error: get too many errors, stop current set") > -1:
+ LOGGER.error(output)
+ return False
+ elif output.find("test app no response, hang or not launched") > -1:
+ LOGGER.error(output)
+ elif output.find("get server status timeout, please check device") > -1:
+ LOGGER.error(output)
+ return False
+ elif output.find("failed to install widget") > -1:
+ LOGGER.error(output)
+ if failct and int(failct) > 0:
+ if output.lower().find('execute case:') > -1 and \
+ (output.lower().find('fail') > -1 or \
+ output.lower().find('block') > -1):
+ curr_fail += 1
+ if int(failct) == curr_fail:
+ return False
return True
@staticmethod
- def hostCommandwithResult(command):
- print command
+ def hostRecLiteCommand(command):
+ LOGGER.info(command)
proc = subprocess.Popen(command, shell=True, \
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
while True:
exit_code = proc.poll()
if exit_code is not None:
break
+ output = proc.stdout.readline()
+ if output:
+ output = output.strip().decode('utf-8')
+ LOGGER.info(output)
+ if output.find("all tasks for testkit lite are accomplished, goodbye") > -1:
+ return True
+ return True
+
+ @staticmethod
+ def hostCommand(command, timeout=90):
+ LOGGER.info(command)
+ outs = None
+ errs = None
+ proc = None
+ try:
+ proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, \
+ stderr=subprocess.PIPE)
+ outs, errs = proc.communicate(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
+ except Exception:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
+
+ if outs:
+ outs = outs.decode("utf-8")
+ if errs:
+ errs = errs.decode("utf-8")
+
+ if outs:
+ LOGGER.info(outs)
+ if SdbManager.is_target_offline(outs):
+ raise DevNotFoundErr("None")
+
+ if errs:
+ LOGGER.error(errs)
+ if SdbManager.is_target_offline(errs):
+ raise DevNotFoundErr("None")
+
+ return True
+
+ @staticmethod
+ def hostCommandwithResult(command, timeout=90):
+ LOGGER.info(command)
+ try:
+ proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, \
+ stderr=subprocess.PIPE)
+ outs, errs = proc.communicate(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
+ except Exception:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
+
+ if outs:
+ outs = outs.decode(encoding="utf-8")
+ if errs:
+ errs = errs.decode("utf-8")
- return proc.stdout.read()
+ return outs
@staticmethod
def sdbCommand(command, timeout=90):
- print command
- proc = subprocess.Popen(command,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- time_out = 0
- exit_code = None
- while time_out < timeout:
- exit_code = proc.poll()
- if exit_code is not None:
- break
- time_out += 0.5
- time.sleep(0.5)
- if exit_code is None:
- SdbManager.killall(proc.pid)
+ LOGGER.info(command)
+
+ outs = None
+ errs = None
+ proc = None
+ try:
+ proc = subprocess.Popen(command, \
+ shell=True, \
+ bufsize=0, \
+ stdin=subprocess.PIPE, \
+ stdout=subprocess.PIPE, \
+ stderr=subprocess.PIPE)
+ outs, errs = proc.communicate(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
+ except Exception:
+ exc = sys.exc_info()
+ raise exc[1].with_traceback(exc[2])
- return exit_code, proc.stdout, proc.stderr
+ if outs:
+ outs = outs.decode("utf-8")
+ if errs:
+ errs = errs.decode("utf-8")
+
+ return outs, errs
@staticmethod
def _checkDevIdExists(_devid):
if len(_devid) < 0:
- print "#ERROR#: The '%s' device Id exists error" % _devid
+ LOGGER.error("The '%s' device Id exists error" % _devid)
return False
return True
@staticmethod
- def _checkFileExists(aPath):
- if os.path.exists(aPath):
- return True
- else:
+ def sdbPush(_devid, local, remote):
+ FAIL_LOG = "sdb -s %s push failed" % _devid
+ if SdbManager._checkDevIdExists(_devid) is False:
+ raise DevNotFoundErr(_devid)
+ if Constants.checkFileExists(local) is False:
+ LOGGER.warning("File exists error:%s" % local)
return False
+ push_cmd = Constants.SDB_PUSH % _devid + " " + local + " " + remote
+ outLog, errLog = SdbManager.sdbCommand(push_cmd)
+
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
+ if SdbManager.is_target_offline(errLog):
+ raise DevNotFoundErr(_devid)
+ return False
+ if outLog:
+ LOGGER.info(str(outLog))
+ return True
+
@staticmethod
- def sdbPush(_devid, local, remote):
- if SdbManager._checkFileExists(local) is False:
- print '#WARNING#: The %s file exists error' % local
+ def sdbPull(_devid, remote, local):
+ FAIL_LOG = "sdb -s %s pull failed" % _devid
+ if SdbManager._checkDevIdExists(_devid) is False:
+ raise DevNotFoundErr(_devid)
+
+ rmtFileCheck_cmd = Constants.SDB_SHELL % _devid + " " \
+ + Constants.FILE_FOUND_SCRIPT % remote
+ outLog, errLog = SdbManager.sdbCommand(rmtFileCheck_cmd)
+
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
+ if SdbManager.is_target_offline(errLog):
+ raise DevNotFoundErr(_devid)
return False
+ if outLog:
+ LOGGER.info(str(outLog))
- FAIL_LOG = "#ERROR#: sdb push failed"
- SdbManager._checkDevIdExists(_devid)
- push_cmd = SDB_PUSH % _devid + " " + local + " " + remote
- exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(push_cmd)
+ pull_cmd = Constants.SDB_PULL % _devid + " " + remote + " " + local
+ outLog, errLog = SdbManager.sdbCommand(pull_cmd)
- if exit_code is None:
- print FAIL_LOG
- errLog = re_stderr.read()
- if errLog: print errLog
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
return False
- else:
- print re_stdout.read()
- return True
+ if outLog:
+ LOGGER.info(str(outLog))
+
+ if Constants.checkFileExists(local) is False:
+ return False
+ return True
@staticmethod
- def sdbShell(deviceId, cmd, timeout=90):
- SdbManager._checkDevIdExists(deviceId)
- shell_cmd = SDB_SHELL % deviceId + " " + cmd
- FAIL_LOG = "#ERROR#: sdb shell failed"
- exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(shell_cmd, timeout)
+ def sdbShell(_devid, cmd, timeout=90):
+ FAIL_LOG = "sdb -s %s shell failed" % _devid
+ if SdbManager._checkDevIdExists(_devid) is False:
+ raise DevNotFoundErr(_devid)
+
+ shell_cmd = Constants.SDB_SHELL % _devid + " " + cmd
+ outLog, errLog = SdbManager.sdbCommand(shell_cmd, timeout)
- if exit_code is None:
- print FAIL_LOG
- errLog = re_stderr.read()
- if errLog : print errLog
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
+ if SdbManager.is_target_offline(errLog):
+ raise DevNotFoundErr(_devid)
return None
- else:
- outLog = re_stdout.read()
- errLog = re_stderr.read()
- if errLog:
- print FAIL_LOG
- print errLog
- return None
- if outLog:
- print outLog
- return outLog
-
- @staticmethod
- def sdbRootOn(deviceId):
- SdbManager._checkDevIdExists(deviceId)
- FAIL_LOG = "#ERROR#: sdb root on failed"
- is_pass = False
- root_cmd = SDB_ROOT_ON % deviceId
- exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(root_cmd, 20)
-
- if exit_code is None:
- print FAIL_LOG
- errLog = re_stderr.read()
- if errLog: print (str(errLog))
-
- checkLog = SdbManager.sdbShell(deviceId, 'whoami')
+ if outLog:
+ LOGGER.info(str(outLog))
+ return outLog
+
+ @staticmethod
+ def sdbRootOn(_devid):
+ FAIL_LOG = "sdb -s %s root on failed" % _devid
+ if SdbManager._checkDevIdExists(_devid) is False:
+ raise DevNotFoundErr(_devid)
+ root_cmd = Constants.SDB_ROOT_ON % _devid
+ outLog, errLog = SdbManager.sdbCommand(root_cmd, 20)
+
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
+ if SdbManager.is_target_offline(errLog):
+ raise DevNotFoundErr(_devid)
+ return False
+ if outLog:
+ LOGGER.info(str(outLog))
+
+ checkLog = SdbManager.sdbShell(_devid, 'whoami')
if checkLog and checkLog.find('root') != -1:
- print 'SDB ROOT ON'
- is_pass = True
+ LOGGER.info('root on')
else:
- print FAIL_LOG
- print str(checkLog)
-
- return is_pass
+ LOGGER.error(FAIL_LOG)
@staticmethod
def sdbDevices():
- dev_cmd = SDB_DEVICES
- start_server_cmd = SDB_START_SERVER
- SdbManager.sdbCommand(start_server_cmd, 8)
- exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(dev_cmd, 20)
- if exit_code is None:
- print "#ERROR#: %s" % re_stderr.read()
+ FAIL_LOG = "sdb devices failed"
+ dev_cmd = Constants.SDB_DEVICES
+ outLog, errLog = SdbManager.sdbCommand(dev_cmd, 5)
+
+ if errLog:
+ LOGGER.error(FAIL_LOG)
+ LOGGER.error(str(errLog))
return None
+ if outLog:
+ LOGGER.info(str(outLog))
+ return outLog
- return re_stdout.read()
+ @staticmethod
+ def getCapabilityBack(deviceId, tizenVer):
+ remote = Constants.getDEVICE_CAPABILITY_PATH(tizenVer)
+ local = Constants.LOCAL_CAPABILITY_PATH % deviceId
+ return SdbManager.sdbPull(deviceId, remote, local)
@staticmethod
- def sdbSetDate(devid):
- print "set device date"
- HDATE = '`date "+%Y-%m-%d %H:%M"`'
- date_cmd = '"date -s \''+ HDATE + '\'"'
- SdbManager.sdbShell(devid, date_cmd)
+ def recoveryDevice(_tizenV, _deviceid):
+ if _tizenV:
+ tinycmd = "python " + Constants.DEVICE_HEALTH_CMD % _tizenV + \
+ " --check --proc=tinyweb --deviceid=" + _deviceid
+ stubcmd = "python " + Constants.DEVICE_HEALTH_CMD % _tizenV + \
+ " --check --proc=testkit-stub --deviceid=" + _deviceid
+ outLog, errLog = SdbManager.sdbCommand(tinycmd)
+ if errLog: LOGGER.error(str(errLog))
+ if outLog: LOGGER.info(str(outLog))
+
+ outLog, errLog = SdbManager.sdbCommand(stubcmd)
+ if errLog: LOGGER.error(str(errLog))
+ if outLog: LOGGER.info(str(outLog))
@staticmethod
- def killall(ppid):
- """Kill all children process by parent process ID"""
- os_ver = platform.system()
- try:
- if os_ver == "Linux" or os_ver == "Darwin":
- ppid = str(ppid)
- pidgrp = []
-
- def getchildpids(ppid):
- """Return a list of children process"""
- command = "ps -ef | awk '{if ($3 == %s) print $2;}'" % str(
- ppid)
- pids = os.popen(command).read()
- pids = pids.split()
- return pids
-
- pidgrp.extend(getchildpids(ppid))
- for pid in pidgrp:
- pidgrp.extend(getchildpids(pid))
- # Insert self process ID to PID group list
- pidgrp.insert(0, ppid)
- while len(pidgrp) > 0:
- pid = pidgrp.pop()
- try:
- os.kill(int(pid), signal.SIGKILL)
- except OSError, error:
- pattern = re.compile('No such process')
- match = pattern.search(str(error))
- if not match:
- print "[ Error: fail to kill pid: %s," \
- " error: %s ]\n" % (int(pid), error)
- # kill for windows platform
+ def checkDirExist(_devid, _dirname):
+ lscmd = "ls -al " + _dirname
+ outLog = SdbManager.sdbShell(_devid, lscmd)
+ if outLog and outLog.find("No such file or directory") > -1:
+ return False
+ elif outLog is None:
+ return False
+ return True
+
+ @staticmethod
+ def findDumpPath(_devid):
+ find_cmd = "sdb -s %s shell cat /etc/tizen-platform.conf | grep TZ_SYS_CRASH_ROOT" % _devid
+ outLog, errLog = SdbManager.sdbCommand(find_cmd)
+
+ if not outLog:
+ return None
+
+ result = outLog.split('\n')
+ LOGGER.debug(str(result))
+ if len(result) < 2:
+ LOGGER.error("Cannot parsing Crash dump path")
+ return None
+ else:
+ crash_root = result[0].split('=')[1].strip()
+ dump = result[1].split('=')[1].replace('${TZ_SYS_CRASH_ROOT}', '').strip()
+ return crash_root + dump
+
+ @staticmethod
+ def resetDumpFiles(_devid):
+ dumpPath = SdbManager.findDumpPath(_devid)
+ if dumpPath is None:
+ return False
+ if SdbManager.checkDirExist(_devid, dumpPath):
+ rmcmd = "rm -f " + dumpPath + "/*"
+ SdbManager.sdbShell(_devid, rmcmd)
+
+ @staticmethod
+ def deleteDumpFiles(_devid, dump_files):
+ dumpPath = SdbManager.findDumpPath(_devid)
+ if dumpPath is None:
+ return False
+ if SdbManager.checkDirExist(_devid, dumpPath):
+ for file in dump_files:
+ rmcmd = "rm -f " + dumpPath + "/" + file
+ SdbManager.sdbShell(_devid, rmcmd)
+
+ @staticmethod
+ def exportDumpFiles(_devid, _resultDir):
+ dumpPath = SdbManager.findDumpPath(_devid)
+ if dumpPath is None:
+ return False
+ isExist = SdbManager.checkDirExist(_devid, dumpPath)
+ if isExist:
+ if not os.path.exists(_resultDir):
+ os.makedirs(_resultDir)
+ chk_crash_mgr_cmd = "ps -ef | grep -v grep | grep crash-manager"
+ while True:
+ outs = SdbManager.sdbShell(_devid, chk_crash_mgr_cmd)
+ if outs is None:
+ break
+ else:
+ wait_time = 10
+ for loop in range(wait_time):
+ LOGGER.info("Waiting for crash-manager {0}...".\
+ format(wait_time - loop))
+ time.sleep(1)
+
+ remote = dumpPath
+ res = SdbManager.sdbPull(_devid, remote, _resultDir)
+ if res:
+ dump_files = []
+ for file in os.listdir(_resultDir):
+ dump_files.append(file)
+ return dump_files
+ else:
+ LOGGER.error("Not Found crash dump directory : %s" % dumpPath)
+
+ @staticmethod
+ def is_target_offline(log):
+ if log:
+ if str(log).find('target not found') > -1 or \
+ str(log).find('target offline') > -1:
+ return True
else:
- kernel32 = ctypes.windll.kernel32
- handle = kernel32.OpenProcess(1, 0, int(ppid))
- kernel32.TerminateProcess(handle, 0)
- except OSError, error:
- pattern = re.compile('No such process')
- match = pattern.search(str(error))
- if not match:
- print "[ Error: fail to kill pid: %s, error: %s ]\n" \
- % (int(ppid), error)
- return None
+ return False
+ return False