@staticmethod
def sdbCommand(command, timeout=90):
+ max_retry = 20
+ retry_attempt = 0
+ exit_code = None
+ re_stdout = None
+ re_stderr = None
+
+ while retry_attempt < max_retry:
+ exit_code, re_stdout, re_stderr = \
+ SdbManager.sdbCommand_internal(command, timeout)
+ print "exit_code: %s, retry_attempt: %d, max_retry: %d" % \
+ (str(exit_code), retry_attempt, max_retry)
+ #print "stdout: %s, stderr: %s" % \
+ # (str(re_stdout.readline()), str(re_stderr.readline()))
+ if exit_code == 0:
+ break
+ else:
+ retry_attempt += 1
+ if (retry_attempt % 5) == 4:
+ SdbManager.sdbCommand_internal(SDB_KILL_SERVER, timeout)
+ continue
+
+ if exit_code is None:
+ SdbManager.killall(proc.pid)
+
+ return exit_code, re_stdout, re_stderr
+
+ @staticmethod
+ def sdbCommand_internal(command, timeout=90):
print command
proc = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- time_out = 0
+ time_out = 0
exit_code = None
while time_out < timeout:
exit_code = proc.poll()
if exit_code is not None:
break
- time_out += 0.5
+ time_out += 0.5
time.sleep(0.5)
- if exit_code is None:
- SdbManager.killall(proc.pid)
return exit_code, proc.stdout, proc.stderr
+import os
import time
import threading
from multiprocessing import Process, Queue, Manager
from .logmanager import LOGGER
+def wait_time(_time):
+ for loop in range(_time):
+ LOGGER.debug("Wait {} seconds......".format(_time-loop))
+ time.sleep(1)
+
+
class AutoPlanExecutor(threading.Thread):
def __init__(self, plan, threadCounter, isPreconSet, envManager, \
self.rebooting.get()
def run(self):
- LOGGER.debug("Starting AutoPlanDevExecutor, with [device]: " \
- + self.deviceId)
+ LOGGER.debug("Starting AutoPlanDevExecutor|DevID:{},PID:{}".\
+ format(self.deviceId, str(os.getpid())))
suite = None
is_reboot_timeout = False
while True:
if self.isKilled():
- LOGGER.debug("finished AutoPlanDevExecutor, with [device]: " \
- + self.deviceId)
+ LOGGER.debug("finished AutoPlanDevExecutor|DevID:{},PID:{}".\
+ format(self.deviceId, str(os.getpid())))
break
if not self.devmgr.isDeviceAvailable(self.deviceId) and \
return
suiteName = suite.getSuiteName()
- LOGGER.info("Start to execute auto suite:%s" % suiteName)
+ LOGGER.info("Start to execute|suite:{},devid:{},pid:{}".\
+ format(suiteName, self.deviceId, os.getpid()))
LOGGER.debug("current suite Auto count : %s " % suite.getAutoNum())
LOGGER.info('Checking if the suite ' + suiteName \
LOGGER.info('Uninstalling the suite ' + suiteName)
suite.unInstallSuite(self.deviceId, self.remTmpDir)
- LOGGER.info("Finished to execute auto suite:%s" % suiteName)
+ LOGGER.info("Finished to execute|suite:{},devid:{},pid:{}".\
+ format(suiteName, self.deviceId, os.getpid()))
+ wait_time(10)
def auto_test_inhost(self):
suite = self.suite
if skip_package and str(skip_package).find(packName) > -1:
LOGGER.info("[skip package : %s]" % packName)
else:
- is_make_ne = False
- if make_ne_package and str(make_ne_package).find(packName) > -1:
- is_make_ne = True
+ is_make_ne = 0
+ if make_ne_package and str(make_ne_package).find(suite_name) > -1:
+ issue_num = _pars_issue_num(make_ne_package[0], suite_name)
+ is_make_ne = int(issue_num)
suite = TctShellSuite(packName, None, \
xml_suite.find("auto_tcn").text, \
elif auto_num == 0 and manual_num == 0:
pass
else:
- is_make_ne = False
+ is_make_ne = 0
if make_ne_package and str(make_ne_package).find(suite_name) > -1:
- is_make_ne = True
+ issue_num = _pars_issue_num(make_ne_package[0], suite_name)
+ is_make_ne = int(issue_num)
suite = TctShellSuite(suite_name, None, auto_num, manual_num, \
suite_pkg_name, suite_launcher, suite_category, \
suite_pkg_name = os.path.join(suite_profile, \
os.path.basename(suite_path))
- is_make_ne = False
+ is_make_ne = 0
if make_ne_package and str(make_ne_package).find(suite_name) > -1:
- is_make_ne = True
+ issue_num = _pars_issue_num(make_ne_package[0], suite_name)
+ is_make_ne = int(issue_num)
suite = TctShellSuite(suite_name, tc_name, auto_num, manual_num, \
suite_pkg_name, suite_launcher, suite_category, \
if skip_package and str(skip_package).find(suite_name) > -1:
LOGGER.error("[skip package : %s]" % suite_name)
else:
- is_make_ne = False
+ is_make_ne = 0
if make_ne_package and str(make_ne_package).find(suite_name) > -1:
- is_make_ne = True
+ issue_num = _pars_issue_num(make_ne_package[0], suite_name)
+ is_make_ne = int(issue_num)
suite = TctShellSuite(suite_name, None, \
xml_suite.find("auto_tcn").text, xml_suite.find("manual_tcn").text,\
if skip_package and str(skip_package).find(suite_name) > -1:
LOGGER.error("[skip package : %s]" % suite_name)
else:
- is_make_ne = False
- if make_ne_package and str(make_ne_package).find(suite_name) > -1:
- is_make_ne = True
+ is_make_ne = 0
+ if make_ne_package and \
+ str(make_ne_package).find(suite_name) > -1:
+ issue_num = _pars_issue_num(make_ne_package[0], \
+ suite_name)
+ is_make_ne = int(issue_num)
suite = TctShellSuite(suite_name, None, auto_num, manual_num, \
suite_pkg_name, suite_launcher, suite_category, \
else:
return False
+
+def _pars_issue_num(_make_ne_package, _suite_name):
+ ne_pack_list = _make_ne_package.split(',')
+ for _pn in ne_pack_list:
+ if _pn.find(_suite_name) > -1:
+ return _pn.split(':')[1]
+ return 0
+
+
class IntentionGenerator:
# return : bool
@staticmethod
#PARAM tizenV = 'tizen_web_2.4'
def __init__(self, suiteName, tcName, auto_num, manual_num, \
suite_pkg_name, launcher, category, tizenV, skip_count, skip_tc, \
- devmgr, is_make_ne=False):
+ devmgr, is_make_ne=0):
self.suiteName = suiteName
self.tcName = tcName
self.auto_num = auto_num
if isSkip:
lcmd += 'None' + " "
- elif self.is_make_ne:
- lcmd += 'NE_{}'.format(str(deviceId)) + " "
+ elif self.is_make_ne != 0:
+ lcmd += 'NE:{}_{}'.format(str(self.is_make_ne), \
+ str(deviceId)) + " "
else:
lcmd += str(deviceId) + " "
def __init__(self, device_id=None):
self.deviceid = device_id
- self.is_NE = False
- if self.deviceid and self.deviceid.find('NE_') > -1:
- self.deviceid = self.deviceid.replace('NE_', '')
- self.is_NE = True
+ self.is_NE = 0
+ if self.deviceid and self.deviceid.find('NE') > -1:
+ self.is_NE = int(self.deviceid.split(':')[1].split('_')[0])
+ self.deviceid = self.deviceid.split('_')[1]
self._wrt = False
if devid.find('None') > -1:
test_case['stdout'] = "target not found"
else:
- test_case['stdout'] = 'precondition failed'
+ issue_num = int(self.conn.is_NE_mode())
+ err_msg = "precondition failed "
+ if issue_num == 1:
+ err_msg += "(not connected to wifi)"
+ elif issue_num == 2:
+ err_msg += "(not set email account)"
+ elif issue_num == 3:
+ err_msg += "(not available device network)"
+ elif issue_num == 4:
+ err_msg += "(not enable bluetooth)"
+ elif issue_num == 5:
+ err_msg += "(not enable nfc)"
+ elif issue_num == 6:
+ err_msg += "(not available sim card)"
+ test_case['stdout'] = err_msg
result_list.append(test_case)
def __init__(self, device_id=None):
self.deviceid = device_id
- self.is_NE = False
- if self.deviceid and str(self.deviceid).find('NE_') > -1:
- self.deviceid = self.deviceid.replace('NE_', '')
- self.is_NE = True
+ self.is_NE = 0
+ if self.deviceid and str(self.deviceid).find('NE') > -1:
+ self.is_NE = int(self.deviceid.split(':')[1].split('_')[0])
+ self.deviceid = self.deviceid.split('_')[1]
self._wrt = False
self._xwalk = False
self.support_remote = True
if devid.find('None') > -1:
test_case['stdout'] = "target not found"
else:
- test_case['stdout'] = 'precondition failed'
+ issue_num = int(self.conn.is_NE_mode())
+ err_msg = "precondition failed "
+ if issue_num == 1:
+ err_msg += "(not connected to wifi)"
+ elif issue_num == 2:
+ err_msg += "(not set email account)"
+ elif issue_num == 3:
+ err_msg += "(not available device network)"
+ elif issue_num == 4:
+ err_msg += "(not enable bluetooth)"
+ elif issue_num == 5:
+ err_msg += "(not enable nfc)"
+ elif issue_num == 6:
+ err_msg += "(not available sim card)"
+ test_case['stdout'] = err_msg
result_list.append(test_case)