import subprocess
import time
import os
-import sys
-from .constants import Constants
-from .logmanager import LOGGER
-from .exception import DevNotFoundErr
-
+import platform
+import re
+import signal
+import ctypes
+
+SDB_PUSH = "sdb -s %s push"
+SDB_SHELL = "sdb -s %s shell"
+SDB_ROOT_ON = "sdb -s %s root on"
+SDB_DEVICES = "sdb devices"
+SDB_START_SERVER = "sdb start-server"
+SDB_KILL_SERVER = "sdb kill-server"
+SDB_CONNECT = "sdb connect %s"
+SDB_PULL = "sdb -s %s pull"
+SDB_CAT_CONFIG = " cat %s"
+SDB_LS = "sdb -s %s shell ls -al"
+FILE_FOUND_SCRIPT = "[ -f %s ] && echo \"Found\" || echo \"Not Found\""
class SdbManager:
@staticmethod
- def hostLiteCommand(command, failct=None):
- LOGGER.info(command)
- proc = subprocess.Popen(command, \
- shell=True, \
- bufsize=0, \
- stdin=subprocess.PIPE, \
- stdout=subprocess.PIPE, \
- stderr=subprocess.PIPE)
+ def hostCommand(command):
+ print command
+ proc = subprocess.Popen(command, shell=True, \
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- curr_fail = 0
while True:
exit_code = proc.poll()
if exit_code is not None:
break
- output = proc.stdout.readline()
+ output = proc.stdout.read()
if output:
- output = output.strip().decode('utf-8')
- LOGGER.info(output)
- if output.find("all tasks for testkit lite are accomplished, goodbye") > -1:
- return True
- elif output.find("error: protocol fault: no status") > -1:
- LOGGER.error(output)
- return False
- elif output.find("error: target offline") > -1:
- LOGGER.error(output)
- return False
- elif output.find("error: target not found") > -1:
- LOGGER.error(output)
- return False
- elif output.find("error: get too many errors, stop current set") > -1:
- LOGGER.error(output)
- return False
- elif output.find("test app no response, hang or not launched") > -1:
- LOGGER.error(output)
- elif output.find("get server status timeout, please check device") > -1:
- LOGGER.error(output)
- return False
- elif output.find("failed to install widget") > -1:
- LOGGER.error(output)
+ print output
+
+ errlog = proc.stderr.read()
+ if errlog:
+ print errlog
- if failct and int(failct) > 0:
- if output.lower().find('execute case:') > -1 and \
- (output.lower().find('fail') > -1 or \
- output.lower().find('block') > -1):
- curr_fail += 1
- if int(failct) == curr_fail:
- return False
return True
@staticmethod
- def hostRecLiteCommand(command):
- LOGGER.info(command)
+ def hostCommandwithResult(command):
+ print command
proc = subprocess.Popen(command, shell=True, \
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
while True:
exit_code = proc.poll()
if exit_code is not None:
break
- output = proc.stdout.readline()
- if output:
- output = output.strip().decode('utf-8')
- LOGGER.info(output)
- if output.find("all tasks for testkit lite are accomplished, goodbye") > -1:
- return True
- return True
-
- @staticmethod
- def hostCommand(command, timeout=90):
- LOGGER.info(command)
- outs = None
- errs = None
- proc = None
- try:
- proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, \
- stderr=subprocess.PIPE)
- outs, errs = proc.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
- except Exception:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
-
- if outs:
- outs = outs.decode("utf-8")
- if errs:
- errs = errs.decode("utf-8")
-
- if outs:
- LOGGER.info(outs)
- if SdbManager.is_target_offline(outs):
- raise DevNotFoundErr("None")
-
- if errs:
- LOGGER.error(errs)
- if SdbManager.is_target_offline(errs):
- raise DevNotFoundErr("None")
-
- return True
-
- @staticmethod
- def hostCommandwithResult(command, timeout=90):
- LOGGER.info(command)
- try:
- proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, \
- stderr=subprocess.PIPE)
- outs, errs = proc.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
- except Exception:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
-
- if outs:
- outs = outs.decode(encoding="utf-8")
- if errs:
- errs = errs.decode("utf-8")
- return outs
+ return proc.stdout.read()
@staticmethod
def sdbCommand(command, timeout=90):
- LOGGER.info(command)
-
- outs = None
- errs = None
- proc = None
- try:
- proc = subprocess.Popen(command, \
- shell=True, \
- bufsize=0, \
- stdin=subprocess.PIPE, \
- stdout=subprocess.PIPE, \
- stderr=subprocess.PIPE)
- outs, errs = proc.communicate(timeout=timeout)
- except subprocess.TimeoutExpired:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
- except Exception:
- exc = sys.exc_info()
- raise exc[1].with_traceback(exc[2])
-
- if outs:
- outs = outs.decode("utf-8")
- if errs:
- errs = errs.decode("utf-8")
+ print command
+ proc = subprocess.Popen(command,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ time_out = 0
+ exit_code = None
+ while time_out < timeout:
+ exit_code = proc.poll()
+ if exit_code is not None:
+ break
+ time_out += 0.5
+ time.sleep(0.5)
+ if exit_code is None:
+ SdbManager.killall(proc.pid)
- return outs, errs
+ return exit_code, proc.stdout, proc.stderr
@staticmethod
def _checkDevIdExists(_devid):
if len(_devid) < 0:
- LOGGER.error("The '%s' device Id exists error" % _devid)
+ print "#ERROR#: The '%s' device Id exists error" % _devid
return False
return True
@staticmethod
- def sdbPush(_devid, local, remote):
- FAIL_LOG = "sdb -s %s push failed" % _devid
- if SdbManager._checkDevIdExists(_devid) is False:
- raise DevNotFoundErr(_devid)
- if Constants.checkFileExists(local) is False:
- LOGGER.warning("File exists error:%s" % local)
- return False
-
- push_cmd = Constants.SDB_PUSH % _devid + " " + local + " " + remote
- outLog, errLog = SdbManager.sdbCommand(push_cmd)
-
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- if SdbManager.is_target_offline(errLog):
- raise DevNotFoundErr(_devid)
- return False
- if outLog:
- LOGGER.info(str(outLog))
+ def _checkFileExists(aPath):
+ if os.path.exists(aPath):
return True
+ else:
+ return False
@staticmethod
- def sdbPull(_devid, remote, local):
- FAIL_LOG = "sdb -s %s pull failed" % _devid
- if SdbManager._checkDevIdExists(_devid) is False:
- raise DevNotFoundErr(_devid)
-
- rmtFileCheck_cmd = Constants.SDB_SHELL % _devid + " " \
- + Constants.FILE_FOUND_SCRIPT % remote
- outLog, errLog = SdbManager.sdbCommand(rmtFileCheck_cmd)
-
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- if SdbManager.is_target_offline(errLog):
- raise DevNotFoundErr(_devid)
+ def sdbPush(_devid, local, remote):
+ if SdbManager._checkFileExists(local) is False:
+ print '#WARNING#: The %s file exists error' % local
return False
- if outLog:
- LOGGER.info(str(outLog))
-
- pull_cmd = Constants.SDB_PULL % _devid + " " + remote + " " + local
- outLog, errLog = SdbManager.sdbCommand(pull_cmd)
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- return False
- if outLog:
- LOGGER.info(str(outLog))
+ FAIL_LOG = "#ERROR#: sdb push failed"
+ SdbManager._checkDevIdExists(_devid)
+ push_cmd = SDB_PUSH % _devid + " " + local + " " + remote
+ exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(push_cmd)
- if Constants.checkFileExists(local) is False:
+ if exit_code is None:
+ print FAIL_LOG
+ errLog = re_stderr.read()
+ if errLog: print errLog
return False
- return True
+ else:
+ print re_stdout.read()
+ return True
@staticmethod
- def sdbShell(_devid, cmd, timeout=90):
- FAIL_LOG = "sdb -s %s shell failed" % _devid
- if SdbManager._checkDevIdExists(_devid) is False:
- raise DevNotFoundErr(_devid)
-
- shell_cmd = Constants.SDB_SHELL % _devid + " " + cmd
- outLog, errLog = SdbManager.sdbCommand(shell_cmd, timeout)
+ def sdbShell(deviceId, cmd, timeout=90):
+ SdbManager._checkDevIdExists(deviceId)
+ shell_cmd = SDB_SHELL % deviceId + " " + cmd
+ FAIL_LOG = "#ERROR#: sdb shell failed"
+ exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(shell_cmd, timeout)
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- if SdbManager.is_target_offline(errLog):
- raise DevNotFoundErr(_devid)
+ if exit_code is None:
+ print FAIL_LOG
+ errLog = re_stderr.read()
+ if errLog : print errLog
return None
- if outLog:
- LOGGER.info(str(outLog))
- return outLog
-
- @staticmethod
- def sdbRootOn(_devid):
- FAIL_LOG = "sdb -s %s root on failed" % _devid
- if SdbManager._checkDevIdExists(_devid) is False:
- raise DevNotFoundErr(_devid)
- root_cmd = Constants.SDB_ROOT_ON % _devid
- outLog, errLog = SdbManager.sdbCommand(root_cmd, 20)
-
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- if SdbManager.is_target_offline(errLog):
- raise DevNotFoundErr(_devid)
- return False
- if outLog:
- LOGGER.info(str(outLog))
-
- checkLog = SdbManager.sdbShell(_devid, 'whoami')
+ else:
+ outLog = re_stdout.read()
+ errLog = re_stderr.read()
+ if errLog:
+ print FAIL_LOG
+ print errLog
+ return None
+ if outLog:
+ print outLog
+ return outLog
+
+ @staticmethod
+ def sdbRootOn(deviceId):
+ SdbManager._checkDevIdExists(deviceId)
+ FAIL_LOG = "#ERROR#: sdb root on failed"
+ is_pass = False
+ root_cmd = SDB_ROOT_ON % deviceId
+ exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(root_cmd, 20)
+
+ if exit_code is None:
+ print FAIL_LOG
+ errLog = re_stderr.read()
+ if errLog: print (str(errLog))
+
+ checkLog = SdbManager.sdbShell(deviceId, 'whoami')
if checkLog and checkLog.find('root') != -1:
- LOGGER.info('root on')
+ print 'SDB ROOT ON'
+ is_pass = True
else:
- LOGGER.error(FAIL_LOG)
+ print FAIL_LOG
+ print str(checkLog)
- @staticmethod
- def sdbDevices():
- FAIL_LOG = "sdb devices failed"
- dev_cmd = Constants.SDB_DEVICES
- outLog, errLog = SdbManager.sdbCommand(dev_cmd, 5)
-
- if errLog:
- LOGGER.error(FAIL_LOG)
- LOGGER.error(str(errLog))
- return None
- if outLog:
- LOGGER.info(str(outLog))
- return outLog
-
- @staticmethod
- def getCapabilityBack(deviceId, tizenVer):
- remote = Constants.getDEVICE_CAPABILITY_PATH(tizenVer)
- local = Constants.LOCAL_CAPABILITY_PATH % deviceId
- return SdbManager.sdbPull(deviceId, remote, local)
+ return is_pass
@staticmethod
- def recoveryDevice(_tizenV, _deviceid):
- if _tizenV:
- tinycmd = "python " + Constants.DEVICE_HEALTH_CMD % _tizenV + \
- " --check --proc=tinyweb --deviceid=" + _deviceid
- stubcmd = "python " + Constants.DEVICE_HEALTH_CMD % _tizenV + \
- " --check --proc=testkit-stub --deviceid=" + _deviceid
- outLog, errLog = SdbManager.sdbCommand(tinycmd)
- if errLog: LOGGER.error(str(errLog))
- if outLog: LOGGER.info(str(outLog))
-
- outLog, errLog = SdbManager.sdbCommand(stubcmd)
- if errLog: LOGGER.error(str(errLog))
- if outLog: LOGGER.info(str(outLog))
-
- @staticmethod
- def checkDirExist(_devid, _dirname):
- lscmd = "ls -al " + _dirname
- outLog = SdbManager.sdbShell(_devid, lscmd)
- if outLog and outLog.find("No such file or directory") > -1:
- return False
- elif outLog is None:
- return False
- return True
-
- @staticmethod
- def findDumpPath(_devid):
- find_cmd = "sdb -s %s shell cat /etc/tizen-platform.conf | grep TZ_SYS_CRASH_ROOT" % _devid
- outLog, errLog = SdbManager.sdbCommand(find_cmd)
-
- if not outLog:
- return None
-
- result = outLog.split('\n')
- LOGGER.debug(str(result))
- if len(result) < 2:
- LOGGER.error("Cannot parsing Crash dump path")
+ def sdbDevices():
+ dev_cmd = SDB_DEVICES
+ start_server_cmd = SDB_START_SERVER
+ SdbManager.sdbCommand(start_server_cmd, 8)
+ exit_code, re_stdout, re_stderr = SdbManager.sdbCommand(dev_cmd, 20)
+ if exit_code is None:
+ print "#ERROR#: %s" % re_stderr.read()
return None
- else:
- crash_root = result[0].split('=')[1].strip()
- dump = result[1].split('=')[1].replace('${TZ_SYS_CRASH_ROOT}', '').strip()
- return crash_root + dump
- @staticmethod
- def resetDumpFiles(_devid):
- dumpPath = SdbManager.findDumpPath(_devid)
- if dumpPath is None:
- return False
- if SdbManager.checkDirExist(_devid, dumpPath):
- rmcmd = "rm -f " + dumpPath + "/*"
- SdbManager.sdbShell(_devid, rmcmd)
+ return re_stdout.read()
@staticmethod
- def deleteDumpFiles(_devid, dump_files):
- dumpPath = SdbManager.findDumpPath(_devid)
- if dumpPath is None:
- return False
- if SdbManager.checkDirExist(_devid, dumpPath):
- for file in dump_files:
- rmcmd = "rm -f " + dumpPath + "/" + file
- SdbManager.sdbShell(_devid, rmcmd)
+ def sdbSetDate(devid):
+ print "set device date"
+ HDATE = '`date "+%Y-%m-%d %H:%M"`'
+ date_cmd = '"date -s \''+ HDATE + '\'"'
+ SdbManager.sdbShell(devid, date_cmd)
@staticmethod
- def exportDumpFiles(_devid, _resultDir):
- dumpPath = SdbManager.findDumpPath(_devid)
- if dumpPath is None:
- return False
- isExist = SdbManager.checkDirExist(_devid, dumpPath)
- if isExist:
- if not os.path.exists(_resultDir):
- os.makedirs(_resultDir)
- chk_crash_mgr_cmd = "ps -ef | grep -v grep | grep crash-manager"
- while True:
- outs = SdbManager.sdbShell(_devid, chk_crash_mgr_cmd)
- if outs is None:
- break
- else:
- wait_time = 10
- for loop in range(wait_time):
- LOGGER.info("Waiting for crash-manager {0}...".\
- format(wait_time - loop))
- time.sleep(1)
-
- remote = dumpPath
- res = SdbManager.sdbPull(_devid, remote, _resultDir)
- if res:
- dump_files = []
- for file in os.listdir(_resultDir):
- dump_files.append(file)
- return dump_files
- else:
- LOGGER.error("Not Found crash dump directory : %s" % dumpPath)
-
- @staticmethod
- def is_target_offline(log):
- if log:
- if str(log).find('target not found') > -1 or \
- str(log).find('target offline') > -1:
- return True
+ def killall(ppid):
+ """Kill all children process by parent process ID"""
+ os_ver = platform.system()
+ try:
+ if os_ver == "Linux" or os_ver == "Darwin":
+ ppid = str(ppid)
+ pidgrp = []
+
+ def getchildpids(ppid):
+ """Return a list of children process"""
+ command = "ps -ef | awk '{if ($3 == %s) print $2;}'" % str(
+ ppid)
+ pids = os.popen(command).read()
+ pids = pids.split()
+ return pids
+
+ pidgrp.extend(getchildpids(ppid))
+ for pid in pidgrp:
+ pidgrp.extend(getchildpids(pid))
+ # Insert self process ID to PID group list
+ pidgrp.insert(0, ppid)
+ while len(pidgrp) > 0:
+ pid = pidgrp.pop()
+ try:
+ os.kill(int(pid), signal.SIGKILL)
+ except OSError, error:
+ pattern = re.compile('No such process')
+ match = pattern.search(str(error))
+ if not match:
+ print "[ Error: fail to kill pid: %s," \
+ " error: %s ]\n" % (int(pid), error)
+ # kill for windows platform
else:
- return False
- return False
+ kernel32 = ctypes.windll.kernel32
+ handle = kernel32.OpenProcess(1, 0, int(ppid))
+ kernel32.TerminateProcess(handle, 0)
+ except OSError, error:
+ pattern = re.compile('No such process')
+ match = pattern.search(str(error))
+ if not match:
+ print "[ Error: fail to kill pid: %s, error: %s ]\n" \
+ % (int(ppid), error)
+ return None