--- /dev/null
+[LOGGING]
+log_level=INFO
+
+[EXTENSION]
+crosswalk=--external-extensions-path=/usr/lib/tizen-extensions-crosswalk
\ No newline at end of file
[public_version]
-version=2.3.22
+version=3.0.7
[internal_version]
-version=2.3.22
+version=3.0.7
# GNU General Public License for more details.
#
# Authors:
-# Yuanyuan,Zou <zouyuanx@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
SUBDIRS = impl
stderr=subprocess.PIPE)
time_cnt = 0
exit_code = None
+ result = []
while time_cnt < timeout:
exit_code = proc.poll()
if not exit_code is None:
exit_code = -1
result = []
else:
- result = proc.stdout.readlines() or proc.stderr.readlines()
+ if not cmd.endswith('&'):
+ result = proc.stdout.readlines() or proc.stderr.readlines()
return [exit_code, result]
if timeout is not None:
timeout -= 0.1
if timeout <= 0:
- try:
- exit_code = "timeout"
- cmd_open.terminate()
- time.sleep(5)
- except OSError:
- killall(cmd_open.pid)
+ exit_code = "timeout"
+ killall(cmd_open.pid)
+ time.sleep(3)
break
time.sleep(0.1)
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2013 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Liu,chengtao <liux.chengtao@intel.com>
+"""General Config Class"""
+
+import os
+import ConfigParser
+from ConfigParser import NoOptionError, NoSectionError
+
+CONFIG_FILE = "/opt/testkit/lite/commodule/CONFIG"
+cfg = ConfigParser.ConfigParser()
+if os.path.exists(CONFIG_FILE):
+ cfg.read(CONFIG_FILE)
+else:
+ cfg.read(os.path.join(os.getcwd(), "CONFIG"))
+
+
+class Config:
+
+ LOG_LEVEL = cfg.get('LOGGING', 'log_level')
+
+ @staticmethod
+ def get_extension(extension_name):
+ ret = ""
+ try:
+ if extension_name is not None:
+ ret = cfg.get('EXTENSION', extension_name)
+ except (NoOptionError, NoSectionError) as error:
+ pass
+ except IOError as error:
+ pass
+ return ret
from .log import LOGGER
+class InvalidDeviceException(Exception):
+ """
+ Device_Id not defined / Invalid Exception
+ """
+ __data = ""
+ def __init__(self, data):
+ self.__data = data
+
+ def __str__(self):
+ return self.__data
+
class Connector:
else:
self.conn = get_target_conn()
except Exception as error:
- LOGGER.error("[Error: Failed to initilize com-module,"
+ LOGGER.error("[ Error: Initialize communication failed,"
" exception: % s]\n" % error)
def get_connector(self):
# GNU General Public License for more details.
#
# Authors:
-# Yuanyuan,Zou <zouyuanx@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
commoduleimpldir = /usr/lib/python2.7/dist-packages/commodule/impl
from commodule.log import LOGGER
from commodule.autoexec import shell_command, shell_command_ext
from commodule.killall import killall
+from commodule.connector import InvalidDeviceException
+
LOCAL_HOST_NS = "127.0.0.1"
APP_QUERY_STR = "adb -s %s shell ps | grep %s | awk '{print $2}' "
APK_INSTALL = "adb -s %s shell pm install %s"
APK_UNINSTALL = "adb -s %s shell pm uninstall %s"
APK_LIST = "adb -s %s shell pm list packages |grep '%s'|cut -d ':' -f2"
-DLOG_CLEAR = "adb -s %s shell logcat -c"
-DLOG_WRT = "adb -s %s shell logcat -v time"
+APP_NONBLOCK_STR = "adb -s %s shell '%s' &"
APP_START = "adb -s %s shell am start -n %s"
APP_STOP = "adb -s %s shell am force-stop %s"
-XWALK_APP = "org.xwalk.%s/.%sActivity"
+XWALK_APP_STR = "org.xwalk.%s/.%sActivity"
+
+LOGCAT_CLEAR = "adb -s %s shell logcat -c"
+LOGCAT_START = "adb -s %s shell logcat -v time"
+DMESG_CLEAR = "adb -s %s shell dmesg -c"
+DMESG_START = "adb -s %s shell cat /proc/kmsg"
def debug_trace(cmdline, logfile):
global debug_flag, metux
- wbuffile = file(logfile, "w")
+ wbuffile = file(logfile, "a")
import subprocess
exit_code = None
proc = subprocess.Popen(args=cmdline,
else:
return True
- def get_launcher_opt(self, test_launcher, test_suite, test_set, fuzzy_match, auto_iu):
+ def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
"""get test option dict """
test_opt = {}
test_opt["suite_name"] = test_suite
test_opt["launcher"] = test_launcher
- test_opt["test_app_id"] = test_launcher
- if test_launcher.startswith('xwalk'):
+ if test_launcher.find('xwalk') >= 0:
+ if test_widget is not None and test_widget != "":
+ test_suite = test_widget
test_suite = test_suite.replace('-', '_')
- test_opt["test_app_id"] = XWALK_APP % (test_suite, test_suite)
+ test_opt["test_app_id"] = XWALK_APP_STR % (test_suite, test_suite)
+ else:
+ test_opt["test_app_id"] = test_launcher
return test_opt
def get_server_url(self, remote_port="8000"):
global debug_flag, metux
debug_flag = True
metux = threading.Lock()
- cmdline = DLOG_CLEAR % self.deviceid
- exit_code, ret = shell_command(cmdline)
- cmdline = DLOG_WRT % self.deviceid
- threading.Thread(target=debug_trace, args=(cmdline, dlogfile)).start()
+ logcat_cmd = LOGCAT_CLEAR % self.deviceid
+ exit_code, ret = shell_command(logcat_cmd)
+ dmesg_cmd = DMESG_CLEAR % self.deviceid
+ exit_code, ret = shell_command(logcat_cmd)
+ logcat_cmd = LOGCAT_START % self.deviceid
+ dmesg_cmd = DMESG_START % self.deviceid
+ threading.Thread(target=debug_trace, args=(logcat_cmd, dlogfile+'.logcat')).start()
+ threading.Thread(target=debug_trace, args=(dmesg_cmd, dlogfile+'.dmesg')).start()
def stop_debug(self):
global debug_flag, metux
metux.release()
def launch_app(self, wgt_name):
+ blauched = False
if wgt_name.find('xwalk') != -1:
timecnt = 0
blauched = False
break
timecnt += 1
time.sleep(3)
- return blauched
else:
- exit_code, ret = self.shell_cmd(wgt_name)
- return True
+ cmdline = APP_NONBLOCK_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ time.sleep(3)
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ if ret and len(ret):
+ blauched = True
+ return blauched
def kill_app(self, wgt_name):
pkg_name = wgt_name.split('/')[0]
install_app = install_package
uninstall_app = uninstall_package
+ def get_buildinfo(self):
+ """ get builf info"""
+ build_info = {}
+ build_info['buildid'] = ''
+ build_info['manufacturer'] = ''
+ build_info['model'] = ''
+ return build_info
+
def get_target_conn(device_id=None):
""" Get connection for Test Target"""
if device_id is None:
dev_list = _get_device_ids()
- device_id = dev_list[0] if len(dev_list) else None
+ if len(dev_list):
+ device_id = dev_list[0]
+ else:
+ raise InvalidDeviceException('No android device found!')
return AndroidMobile(device_id)
# GNU General Public License for more details.
#
# Authors:
-# Liu,chengtao <chengtaox.liu@intel.com>
+# Chengtao,Liu <chengtaox.liu@intel.com>
""" The implementation of local host communication"""
# copyfile(local_path, remote_path)
return False
- def get_launcher_opt(self, test_launcher, test_suite, test_set, fuzzy_match, auto_iu):
+ def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
"""get test option dict """
test_opt = {}
test_opt["suite_name"] = test_suite
def stop_debug(self):
pass
+ def get_buildinfo(self):
+ """ get builf info"""
+ build_info = {}
+ build_info['buildid'] = ''
+ build_info['manufacturer'] = ''
+ build_info['model'] = ''
+ return build_info
+
def get_target_conn():
""" Get connection for Test Target"""
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Chengtao,Liu <chengtaox.liu@intel.com>
+
+""" The implementation of tizen IVI communication"""
+
+import os
+import time
+import socket
+import threading
+import re
+from shutil import copyfile
+
+from commodule.log import LOGGER
+from commodule.autoexec import shell_command, shell_command_ext
+from commodule.killall import killall
+from commodule.connector import InvalidDeviceException
+
+
+HOST_NS = "127.0.0.1"
+os.environ['no_proxy'] = HOST_NS
+RPM_INSTALL = "ssh %s rpm -ivh %s"
+RPM_UNINSTALL = "ssh %s rpm -e %s"
+RPM_LIST = "ssh %s rpm -qa | grep tct"
+APP_QUERY_STR = "ssh %s \"ps aux |grep '%s'|grep -v grep\"|awk '{print $2}'"
+APP_KILL_STR = "ssh %s kill -9 %s"
+WRT_QUERY_STR = "ssh %s \"wrt-launcher -l|grep '%s'|grep -v grep\"|awk '{print $2\":\"$NF}'"
+WRT_START_STR = "ssh %s wrt-launcher -s %s"
+WRT_STOP_STR = "ssh %s wrt-launcher -k %s"
+WRT_INSTALL_STR = "ssh %s wrt-installer -i %s"
+WRT_UNINSTL_STR = "ssh %s wrt-installer -un %s"
+WGT_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
+
+
+class tizenIVI:
+
+ """ Implementation for transfer data
+ between Host and tizenivi PC
+ """
+
+ def __init__(self, deviceid="root@127.0.0.1"):
+ self.deviceid = deviceid
+
+ def shell_cmd(self, cmd="", timeout=15):
+ cmd = "ssh %s %s" % (self.deviceid, cmd)
+ return shell_command(cmd, timeout)
+
+ def check_process(self, process_name):
+ exit_code, ret = shell_command(APP_QUERY_STR % (self.deviceid, process_name))
+ return len(ret)
+
+ def launch_stub(self, stub_app, stub_port="8000", debug_opt=""):
+ cmdline = "%s --port:%s %s" % (stub_app, stub_port, debug_opt)
+ exit_code, ret = self.shell_cmd(cmdline)
+ time.sleep(2)
+
+ def shell_cmd_ext(self,
+ cmd="",
+ timeout=None,
+ boutput=False,
+ stdout_file=None,
+ stderr_file=None):
+ cmd = "ssh %s '%s; echo returncode=$?'" % (self.deviceid, cmd)
+ return shell_command_ext(cmd, timeout, boutput, stdout_file, stderr_file)
+
+ def shell_cmd_host(self,
+ cmd="",
+ timeout=None,
+ boutput=False,
+ stdout_file=None,
+ stderr_file=None):
+ cmd = cmd.replace("$deviceid", self.deviceid)
+ return shell_command_ext(cmd, timeout, boutput, stdout_file, stderr_file)
+
+ def get_device_ids(self):
+ """
+ get deivce list of ids
+ """
+ return ['localhost']
+
+ def get_device_info(self):
+ """
+ get tizenivi deivce inforamtion
+ """
+ device_info = {}
+ resolution_str = ""
+ screen_size_str = ""
+ device_model_str = ""
+ device_name_str = ""
+ build_id_str = ""
+ os_version_str = ""
+
+ # get resolution and screen size
+ exit_code, ret = shell_command("ssh %s xrandr" % self.deviceid)
+ pattern = re.compile("connected (\d+)x(\d+).* (\d+mm) x (\d+mm)")
+ for line in ret:
+ match = pattern.search(line)
+ if match:
+ resolution_str = "%s x %s" % (match.group(1), match.group(2))
+ screen_size_str = "%s x %s" % (match.group(3), match.group(4))
+
+ # get architecture
+ exit_code, ret = shell_command("ssh %s uname -m" % self.deviceid)
+ if len(ret) > 0:
+ device_model_str = ret[0]
+
+ # get hostname
+ exit_code, ret = shell_command("ssh %s uname -n" % self.deviceid)
+ if len(ret) > 0:
+ device_name_str = ret[0]
+
+ # get os version
+ exit_code, ret = shell_command("ssh %s cat /etc/issue" % self.deviceid)
+ for line in ret:
+ if len(line) > 1:
+ os_version_str = "%s %s" % (os_version_str, line)
+
+ # get build id
+ exit_code, ret = shell_command("ssh %s cat /etc/os-release" % self.deviceid)
+ for line in ret:
+ if line.find("BUILD_ID=") != -1:
+ build_id_str = line.split('=')[1].strip('\"\r\n')
+
+ os_version_str = os_version_str[0:-1]
+ device_info["device_id"] = self.deviceid
+ device_info["resolution"] = resolution_str
+ device_info["screen_size"] = screen_size_str
+ device_info["device_model"] = device_model_str
+ device_info["device_name"] = device_name_str
+ device_info["os_version"] = os_version_str
+ device_info["build_id"] = build_id_str
+ return device_info
+
+ def get_server_url(self, remote_port="8000"):
+ """get server url"""
+ remote_ip = self.deviceid
+ remote_ip = remote_ip.split('@')[1]
+ os.environ['no_proxy'] = remote_ip
+ url_forward = "http://%s:%s" % (remote_ip, remote_port)
+ return url_forward
+
+ def install_package(self, pkgpath):
+ """
+ install a package on tizenivi device
+ """
+ cmd = RPM_INSTALL % (self.deviceid, pkgpath)
+ exit_code, ret = shell_command(cmd)
+ return ret
+
+ def install_package(self, pkgname):
+ """
+ install a package on tizenivi device
+ """
+ cmd = RPM_UNINSTALL % (self.deviceid, pkgname)
+ exit_code, ret = shell_command(cmd)
+ return ret
+
+ def get_installed_package(self):
+ """get list of installed package from device"""
+ cmd = RPM_LIST % (self.deviceid)
+ exit_code, ret = shell_command(cmd)
+ return ret
+
+ def download_file(self, remote_path, local_path):
+ """download file"""
+ local_path_dir = os.path.dirname(local_path)
+ if not os.path.exists(local_path_dir):
+ os.makedirs(local_path_dir)
+ cmd = "scp %s:%s %s" % (self.deviceid, remote_path, local_path)
+ exit_code, ret = shell_command(cmd)
+ if not os.path.exists(local_path):
+ return False
+ return True
+
+ def upload_file(self, remote_path, local_path):
+ """upload file"""
+ cmd = "scp %s %s:%s" % (local_path, self.deviceid, remote_path)
+ exit_code, ret = shell_command(cmd)
+ return True
+
+ def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
+ """get test option dict """
+ test_opt = {}
+ test_opt["suite_name"] = test_suite
+ test_opt["launcher"] = test_launcher
+ test_opt["test_app_id"] = test_suite
+ cmd = ""
+ if test_launcher.find('WRTLauncher') != -1:
+ test_app_id = None
+ client_cmds = test_launcher.strip().split()
+ wrt_tag = client_cmds[1] if len(client_cmds) > 1 else ""
+ test_opt['fuzzy_match'] = fuzzy_match = wrt_tag.find('z') != -1
+ test_opt['auto_iu'] = auto_iu = wrt_tag.find('iu') != -1
+ test_opt['self_exec'] = wrt_tag.find('a') != -1
+ test_opt['self_repeat'] = wrt_tag.find('r') != -1
+ test_opt["launcher"] = "wrt-launcher"
+ # test suite need to be installed
+ if auto_iu:
+ test_wgt = test_set
+ test_wgt_path = WGT_LOCATION % (test_suite, test_set)
+ if not self.install_app(test_wgt_path):
+ LOGGER.info("[ failed to install widget \"%s\" in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_wgt = test_suite
+
+ # query the whether test widget is installed ok
+ cmd = WRT_QUERY_STR % (self.deviceid, test_wgt)
+ exit_code, ret = shell_command(cmd)
+ if exit_code == -1:
+ return None
+ print 'id', ret
+ for line in ret:
+ items = line.split(':')
+ if len(items) < 1:
+ continue
+ if (fuzzy_match and items[0].find(test_wgt) != -1) or items[0] == test_wgt:
+ test_app_id = items[1].strip('\r\n')
+ break
+
+ if test_app_id is None:
+ LOGGER.info("[ test widget \"%s\" not found in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_opt["test_app_id"] = test_app_id
+ return test_opt
+
+ def start_debug(self, dlogfile):
+ global debug_flag, metux
+ debug_flag = True
+
+ def stop_debug(self):
+ global debug_flag
+ debug_flag = False
+
+ def launch_app(self, wgt_name):
+ timecnt = 0
+ blauched = False
+ print 'widget', wgt_name
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ cmdline = WRT_START_STR % (self.deviceid, wgt_name)
+ while timecnt < 3:
+ exit_code, ret = shell_command(cmdline)
+ if len(ret) > 0 and ret[0].find('launched') != -1:
+ blauched = True
+ break
+ timecnt += 1
+ time.sleep(3)
+ return blauched
+
+ def kill_app(self, wgt_name):
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ return True
+
+ def install_app(self, wgt_path="", timeout=90):
+ cmd = WRT_INSTALL_STR % (self.deviceid, wgt_path)
+ exit_code, ret = shell_command(cmd, timeout)
+ if exit_code == -1:
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_path)
+ exit_code, ret = shell_command(cmd)
+ for line in ret:
+ cmd = APP_KILL_STR % (self.deviceid, line.strip('\r\n'))
+ exit_code, ret = shell_command(cmd)
+ return False
+ else:
+ return True
+
+ def uninstall_app(self, wgt_name):
+ cmd = WRT_UNINSTL_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ return True
+
+ def get_buildinfo(self):
+ """ get builf info"""
+ build_info = {}
+ build_info['buildid'] = ''
+ build_info['manufacturer'] = ''
+ build_info['model'] = ''
+ return build_info
+
+
+def get_target_conn(deviceid=None):
+ """ Get connection for Test Target"""
+ if deviceid is None or '@' not in deviceid:
+ raise InvalidDeviceException('deviceid("username@ip") required by TIZEN-IVI device!')
+ return tizenIVI(deviceid)
import threading
import re
import shutil
+import xml.etree.ElementTree as etree
from commodule.log import LOGGER
from commodule.autoexec import shell_command, shell_command_ext
from commodule.killall import killall
+from commodule.config import Config
+from commodule.connector import InvalidDeviceException
LOCAL_HOST_NS = "127.0.0.1"
+BUILD_INFO_FILE = '/opt/usr/media/Documents/tct/buildinfo.xml'
RPM_INSTALL = "sdb -s %s shell rpm -ivh %s"
RPM_UNINSTALL = "sdb -s %s shell rpm -e %s"
RPM_LIST = "sdb -s %s shell \"rpm -qa|grep tct\""
APP_QUERY_STR = "sdb -s %s shell \"ps aux|grep '%s'|grep -v grep\"|awk '{print $2}'"
APP_KILL_STR = "sdb -s %s shell kill -9 %s"
+APP_NONBLOCK_STR = "sdb -s %s shell '%s' &"
+
+
+# wrt-launcher constants
+WRT_MAIN = "wrt-launcher"
WRT_QUERY_STR = "sdb -s %s shell wrt-launcher -l | grep '%s'|awk '{print $2\":\"$NF}'"
WRT_START_STR = "sdb -s %s shell 'wrt-launcher -s %s; echo returncode=$?'"
WRT_STOP_STR = "sdb -s %s shell wrt-launcher -k %s"
WRT_INSTALL_STR = "sdb -s %s shell wrt-installer -i %s"
WRT_UNINSTL_STR = "sdb -s %s shell wrt-installer -un %s"
-DLOG_CLEAR = "sdb -s %s shell dlogutil -c"
-DLOG_WRT = "sdb -s %s shell dlogutil WRT:D -v time"
+WRT_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
+# crosswalk constants
+XWALK_MAIN = "xwalk"
+XWALK_QUERY_STR = "sdb -s %s shell su - app -c 'export XDG_RUNTIME_DIR=\"/run/user/5000\";xwalk --list-apps' | grep %s | awk '{print $(NF-1)}'"
+XWALK_START_STR = "sdb -s %s shell su - app -c 'export XDG_RUNTIME_DIR=\"/run/user/5000\";xwalk --allow-file-access-from-files %s %s' &"
+XWALK_INSTALL_STR = "sdb -s %s shell su - app -c 'export XDG_RUNTIME_DIR=\"/run/user/5000\";xwalk --install %s'"
+XWALK_UNINSTL_STR = "sdb -s %s shell su - app -c 'export XDG_RUNTIME_DIR=\"/run/user/5000\";xwalk --uninstall %s'"
+XWALK_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
+
+# dlog constants
+DLOG_CLEAR = "sdb -s %s shell dlogutil -c"
+DLOG_WRT = "sdb -s %s shell dlogutil -v time"
def debug_trace(cmdline, logfile):
global debug_flag, metux
- wbuffile = file(logfile, "w")
+ wbuffile = file(logfile, "a")
import subprocess
exit_code = None
proc = subprocess.Popen(args=cmdline,
def __init__(self, device_id=None):
self.deviceid = device_id
self._wrt = False
+ self._xwalk = False
+ self._extension = ""
def shell_cmd(self, cmd="", timeout=15):
cmdline = "sdb -s %s shell \"%s\" " % (self.deviceid, cmd)
else:
return True
- def get_launcher_opt(self, test_launcher, test_suite, test_set, fuzzy_match, auto_iu):
+ def _get_wrt_app(self, test_suite, test_set, fuzzy_match, auto_iu):
+ test_app_id = None
+ if auto_iu:
+ test_wgt = test_set
+ test_wgt_path = WRT_LOCATION % (test_suite, test_wgt)
+ if not self.install_app(test_wgt_path):
+ LOGGER.info("[ failed to install widget \"%s\" in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_wgt = test_suite
+
+ # check if widget installed already
+ cmd = WRT_QUERY_STR % (self.deviceid, test_wgt)
+ exit_code, ret = shell_command(cmd)
+ if exit_code == -1:
+ return None
+ for line in ret:
+ items = line.split(':')
+ if len(items) < 1:
+ continue
+ if (fuzzy_match and items[0].find(test_wgt) != -1) or items[0] == test_wgt:
+ test_app_id = items[1].strip('\r\n')
+ break
+
+ if test_app_id is None:
+ LOGGER.info("[ test widget \"%s\" not found in target ]"
+ % test_wgt)
+ return None
+
+ return test_app_id
+
+ def _get_xwalk_app(self, test_suite, test_set, fuzzy_match, auto_iu):
+ test_app_id = None
+ if auto_iu:
+ test_wgt = test_set
+ test_wgt_path = XWALK_LOCATION % (test_suite, test_wgt)
+ if not self.install_app(test_wgt_path):
+ LOGGER.info("[ failed to install widget \"%s\" in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_wgt = test_suite
+
+ # check if widget installed already
+ cmd = XWALK_QUERY_STR % (self.deviceid, test_wgt)
+ exit_code, ret = shell_command(cmd)
+ if exit_code == -1:
+ return None
+ for line in ret:
+ test_app_id = line.strip('\r\n')
+
+ if test_app_id is None:
+ LOGGER.info("[ test widget \"%s\" not found in target ]"
+ % test_wgt)
+ return None
+
+ return test_app_id
+
+ def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
"""
get test option dict
"""
test_opt = {}
+ self._wrt = False
+ self._xwalk = False
+ app_id = None
test_opt["suite_name"] = test_suite
test_opt["launcher"] = test_launcher
- test_opt["test_app_id"] = test_launcher
- self._wrt = False
- if test_launcher.find('WRTLauncher') != -1:
+ if test_widget is not None and test_widget != "":
+ test_suite = test_widget
+ if test_launcher.find('WRTLauncher') >= 0:
self._wrt = True
- cmd = ""
- test_app_id = None
- test_opt["launcher"] = "wrt-launcher"
- # test suite need to be installed by commodule
- if auto_iu:
- test_wgt = test_set
- test_wgt_path = "/opt/usr/media/tct/opt/%s/%s.wgt" % (test_suite, test_wgt)
- if not self.install_app(test_wgt_path):
- LOGGER.info("[ failed to install widget \"%s\" in target ]"
- % test_wgt)
- return None
- else:
- test_wgt = test_suite
-
- # query the whether test widget is installed ok
- cmd = WRT_QUERY_STR % (self.deviceid, test_wgt)
- exit_code, ret = shell_command(cmd)
- if exit_code == -1:
- return None
- for line in ret:
- items = line.split(':')
- if len(items) < 1:
- continue
- if (fuzzy_match and items[0].find(test_wgt) != -1) or items[0] == test_wgt:
- test_app_id = items[1].strip('\r\n')
- break
+ test_opt["launcher"] = WRT_MAIN
+ app_id = self._get_wrt_app(test_suite, test_set, fuzzy_match, auto_iu)
+ elif test_launcher.find('xwalk') >= 0:
+ self._xwalk = True
+ test_opt["launcher"] = XWALK_MAIN
+ self._extension = Config.get_extension(test_ext)
+ client_cmds = test_launcher.strip().split()
+ xpk_tag = client_cmds[1] if len(client_cmds) > 1 else ""
+ test_opt['fuzzy_match'] = fuzzy_match = xpk_tag.find('z') != -1
+ test_opt['auto_iu'] = auto_iu = xpk_tag.find('iu') != -1
+ test_opt['self_exec'] = xpk_tag.find('a') != -1
+ test_opt['self_repeat'] = xpk_tag.find('r') != -1
+ app_id = self._get_xwalk_app(test_suite, test_set, fuzzy_match, auto_iu)
+ else:
+ app_id = test_launcher
- if test_app_id is None:
- LOGGER.info("[ test widget \"%s\" not found in target ]"
- % test_wgt)
- return None
- else:
- test_opt["test_app_id"] = test_app_id
+ if app_id is None:
+ return None
+ test_opt["test_app_id"] = app_id
return test_opt
def install_package(self, pkgpath):
cmdline = DLOG_CLEAR % self.deviceid
exit_code, ret = shell_command(cmdline)
cmdline = DLOG_WRT % self.deviceid
- threading.Thread(target=debug_trace, args=(cmdline, dlogfile)).start()
+ threading.Thread(target=debug_trace, args=(cmdline, dlogfile+'.dlog')).start()
def stop_debug(self):
global debug_flag, metux
metux.release()
def launch_app(self, wgt_name):
- if not self._wrt:
- exit_code,ret = self.shell_cmd(wgt_name)
- return True
- timecnt = 0
blauched = False
- cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
- exit_code, ret = shell_command(cmdline)
- cmdline = WRT_START_STR % (self.deviceid, wgt_name)
- while timecnt < 3:
- exit_code, ret_out, ret_err = shell_command_ext(cmdline, 30)
- if exit_code == "0":
- blauched = True
- break
- timecnt += 1
+ if self._wrt:
+ timecnt = 0
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ cmdline = WRT_START_STR % (self.deviceid, wgt_name)
+ while timecnt < 3:
+ exit_code, ret_out, ret_err = shell_command_ext(cmdline, 30)
+ if exit_code == "0":
+ blauched = True
+ break
+ timecnt += 1
+ time.sleep(3)
+ elif self._xwalk:
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ for line in ret:
+ cmd = APP_KILL_STR % (self.deviceid, line.strip('\r\n'))
+ exit_code, ret = shell_command(cmd)
+ cmdline = XWALK_START_STR % (self.deviceid, self._extension, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ time.sleep(3)
+ blauched = True
+ else:
+ cmdline = APP_NONBLOCK_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
time.sleep(3)
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ if ret and len(ret):
+ blauched = True
+
return blauched
def kill_app(self, wgt_name):
- if not self._wrt:
- return True
- cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
- exit_code, ret = shell_command(cmdline)
+ if self._wrt:
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ elif self._xwalk:
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ for line in ret:
+ cmd = APP_KILL_STR % (self.deviceid, line.strip('\r\n'))
+ exit_code, ret = shell_command(cmd)
return True
def install_app(self, wgt_path="", timeout=90):
- cmd = WRT_INSTALL_STR % (self.deviceid, wgt_path)
+ if self._wrt:
+ cmd = WRT_INSTALL_STR % (self.deviceid, wgt_path)
+ elif self._xwalk:
+ cmd = XWALK_INSTALL_STR % (self.deviceid, wgt_path)
+ else:
+ return True
exit_code, ret = shell_command(cmd, timeout)
if exit_code == -1:
cmd = APP_QUERY_STR % (self.deviceid, wgt_path)
return True
def uninstall_app(self, wgt_name):
- cmd = WRT_UNINSTL_STR % (self.deviceid, wgt_name)
+ if self._wrt:
+ cmd = WRT_UNINSTL_STR % (self.deviceid, wgt_name)
+ elif self._xwalk:
+ cmd = XWALK_UNINSTL_STR % (self.deviceid, wgt_name)
+ else:
+ return True
exit_code, ret = shell_command(cmd)
return True
+ def get_buildinfo(self):
+ """ get builf info"""
+ build_info = {}
+ build_info['buildid'] = ''
+ build_info['manufacturer'] = ''
+ build_info['model'] = ''
+
+ builfinfo_file = os.path.expanduser("~") + os.sep + "tizen_buildinfo.xml"
+ if self.download_file(BUILD_INFO_FILE, builfinfo_file) and os.path.exists(builfinfo_file):
+ root = etree.parse(builfinfo_file).getroot()
+ for element in root.findall("buildinfo"):
+ if element is not None:
+ if element.get("name").lower() == 'buildversion':
+ child = etree.Element.getchildren(element)
+ if child and child[0].text:
+ buildid = child[0].text
+ build_info['buildid'] = buildid
+ if element.get("name").lower() == 'manufacturer':
+ child = etree.Element.getchildren(element)
+ if child and child[0].text:
+ manufacturer = child[0].text
+ build_info['manufacturer'] = manufacturer
+ if element.get("name").lower() == 'model':
+ child = etree.Element.getchildren(element)
+ if child and child[0].text:
+ model = child[0].text
+ build_info['model'] = model
+ os.remove(builfinfo_file)
+ return build_info
def get_target_conn(device_id=None):
""" Get connection for Test Target"""
if device_id is None:
dev_list = _get_device_ids()
- device_id = dev_list[0] if len(dev_list) else None
+ if len(dev_list):
+ device_id = dev_list[0]
+ else:
+ raise InvalidDeviceException('No TIZEN device found!')
return TizenMobile(device_id)
copyfile(local_path, remote_path)
return True
- def get_launcher_opt(self, test_launcher, test_suite, test_set, fuzzy_match, auto_iu):
+ def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
"""get test option dict """
test_opt = {}
test_opt["suite_name"] = test_suite
test_opt["launcher"] = test_launcher
test_opt["test_app_id"] = test_suite
+ cmd = ""
if test_launcher.find('WRTLauncher') != -1:
- cmd = ""
test_app_id = None
+ client_cmds = test_launcher.strip().split()
+ wrt_tag = client_cmds[1] if len(client_cmds) > 1 else ""
+ test_opt['fuzzy_match'] = fuzzy_match = wrt_tag.find('z') != -1
+ test_opt['auto_iu'] = auto_iu = wrt_tag.find('iu') != -1
+ test_opt['self_exec'] = wrt_tag.find('a') != -1
+ test_opt['self_repeat'] = wrt_tag.find('r') != -1
test_opt["launcher"] = "wrt-launcher"
# test suite need to be installed
if auto_iu:
exit_code, ret = shell_command(cmd)
return True
+ def get_buildinfo(self):
+ """ get builf info"""
+ build_info = {}
+ build_info['buildid'] = ''
+ build_info['manufacturer'] = ''
+ build_info['model'] = ''
+ return build_info
+
def get_target_conn():
""" Get connection for Test Target"""
if os_ver == "Linux" or os_ver == "Darwin":
ppid = str(ppid)
pidgrp = []
-
+ if not ppid.isdigit():
+ return
def getchildpids(ppid):
"""Return a list of children process"""
command = "ps -ef | awk '{if ($3 == %s) print $2;}'" % str(
if msg is not None:
self._logger.critical(msg)
+from .config import Config
-LOGGER = Logger.get_logger(level=os.environ.get('LOG_LEVEL', 'INFO'))
+LOGGER = Logger.get_logger(level=Config.LOG_LEVEL)
--- /dev/null
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Zhang, Huihui <huihuix.zhang@intel.com>
+# Wendong,Sui <weidongx.sun@intel.com>
+
+testkitdbusdir = /etc/dbus-1/system.d/
+dist_testkitdbus_DATA = com.intel.testkit.conf
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?> <!-- -*- XML -*- -->
+
+<!DOCTYPE busconfig PUBLIC
+ "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
+ "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
+<busconfig>
+
+ <!-- Root can own the service -->
+ <policy user="root">
+ <allow own="com.intel.testkit"/>
+ <allow send_destination="com.intel.testkit"/>
+ <allow send_interface="com.intel.testkit"/>
+ </policy>
+
+ <policy at_console="true">
+ <allow own="com.intel.testkit"/>
+ <allow send_destination="com.intel.testkit"/>
+ <allow send_interface="com.intel.testkit"/>
+ </policy>
+
+ <policy context="default">
+ <allow own="com.intel.testkit"/>
+ <allow send_destination="com.intel.testkit"/>
+ <allow send_interface="com.intel.testkit"/>
+ </policy>
+
+</busconfig>
-testkit-lite (2.3.22) unstable; urgency=low
+testkit-lite (3.0.7) unstable; urgency=low
- * TCT2.2.1 Release.
+ * Public Release.
- -- ChengTao Liu <chengtaox.liu@intel.com> Mon, 4 Nov 2013 11:13:11 +0800
+ -- Shaofeng Tang <shaofeng.tang@intel.com> Fri, 21 Feb 2014 11:13:11 +0800
Package: testkit-lite
Architecture: all
-Depends: ${misc:Depends}, ${python:Depends}
+Depends: ${misc:Depends}, ${python:Depends}, python-dbus, python-gobject, python-gtk2
XB-Python-Version: ${python:Versions}
Description: Test runner for test execution.
-testkit-lite_2.3.4_all.deb utils standard
+testkit-lite_3.0.7_all.deb utils standard
#!/bin/sh
# Set permissions
-mkdir /opt/testkit/lite/test_packages
+mkdir -p /opt/testkit/lite/test_packages
chmod ugo+rwx -R /opt/testkit/lite
ln -fs /usr/share/pyshared/commodule /usr/lib/python2.7/dist-packages/commodule
ln -fs /usr/share/pyshared/testkitlite /usr/lib/python2.7/dist-packages/testkitlite
Summary: TCT-Lite
Name: testkit-lite
-Version: 2.3.22
+Version: 3.0.7
Release: 1
License: GPLv2
Group: Applications/System
%{python_sitelib}/testkitlite/*
%{python_sitelib}/commodule/*
%{python_sitelib}/testkit_lite-*.egg-info/*
+/etc/dbus-1/system.d/com.intel.testkit.conf
/opt/testkit/lite/VERSION
+/opt/testkit/lite/commodule/CONFIG
%{_bindir}/testkit-lite
+%{_bindir}/testkit-lite-dbus
%defattr(-,root,root)
%doc
-/opt/testkit/lite/testkit-lite_user_guide_for_tct.pdf
+/opt/testkit/lite/testkit-lite_user_guide.pdf
+/opt/testkit/lite/testkit-lite_tutorial.pdf
+/opt/testkit/lite/test_definition_schema.pdf
%changelog
# GNU General Public License for more details.
#
# Authors:
-# Jing,Wang <jing.j.wang@intel.com>
-# Yuanyuan,Zou <zouyuanx@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
from setuptools import setup, find_packages
name = "testkit-lite",
description = "Test runner for test execution",
url = "https://github.com/testkit/testkit-lite",
- author = "Cathy Shen",
- author_email = "cathy.shen@intel.com",
- version = "2.3.22",
+ author = "Shaofeng Tang",
+ author_email = "shaofeng.tang@intel.com",
+ version = "3.0.7",
include_package_data = True,
- data_files = [('/opt/testkit/lite/',
- ('VERSION', 'doc/testkit-lite_user_guide_for_tct.pdf'))],
- scripts = ('testkit-lite',),
+ data_files = [('/opt/testkit/lite', ['VERSION', 'doc/testkit-lite_user_guide.pdf', 'doc/testkit-lite_tutorial.pdf', 'doc/test_definition_schema.pdf']),
+ ('/opt/testkit/lite/commodule/', ['CONFIG']),
+ ('/etc/dbus-1/system.d/', ['dbus/com.intel.testkit.conf'])],
+ scripts = ('testkit-lite', 'testkit-lite-dbus'),
packages = find_packages(),
)
#
# Authors:
# Jing,Wang <jing.j.wang@intel.com>
-# Yuanyuan,Zou <zouyuanx@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
""" testkit lite tools"""
import os
import sys
import traceback
import platform
+import signal
import ConfigParser
import xml.etree.ElementTree as etree
from optparse import OptionParser, make_option
# import process kill
try:
- from testkitlite.common.process_killall import kill_testkit_lite, clean_testxml
+ from testkitlite.common.process_killall import get_device_lock, release_device_lock, clean_testxml
except ImportError, err:
LOGGER.error("[ Error: loading module killall failed, error: %s ]\n" % err)
LOGGER.info("try to run command \
COMMON_FILTERS = {
"suite": [],
"set": [],
+ "priority": [],
"id": [],
"type": [],
+ "status": [],
"component": []}
down_status = False
remote_test = False
can_merge_result = False
-
-# start testkit-lite in Singleton mode
-kill_testkit_lite(PID_FILE)
-
-if not os_ver == "Linux" and not os_ver == "Darwin":
- try:
- if not EXISTS(LOG_DIR):
- os.makedirs(LOG_DIR)
- except OSError, err:
- LOGGER.error("[ Error: create results directory:"
- " %s failed, error: %s ]\n" % (LOG_DIR, err))
-
-try:
- with open(PID_FILE, "w") as fd:
- PID = str(os.getpid())
- fd.writelines(PID + '\n')
-except OSError, e:
- LOGGER.error("[ Error: can't create pid log file: %s, error: %s ]\n" %
- (PID_FILE, err))
- sys.exit(1)
-
-try:
- os.chmod(PID_FILE, 0666)
-except OSError:
- pass
+device_id = ""
+device_locked = False
+RUNNER = None
# detect version option
if "--version" in sys.argv:
CONFIG.read(VERSION_FILE)
VERSION = CONFIG.get('public_version', 'version')
LOGGER.info("V%s" % VERSION)
+ sys.exit()
except ConfigParser.Error, err:
LOGGER.error(
"[ Error: fail to parse version info, error: %s ]\n" % err)
- sys.exit(1)
+ sys.exit(1)
# detect internal version option
if "--internal-version" in sys.argv:
CONFIG.read(VERSION_FILE)
VERSION = CONFIG.get('internal_version', 'version')
print VERSION
+ sys.exit()
except ConfigParser.Error, err:
print "[ Error: fail to parse version info, error: %s ]\n" % err
- sys.exit(1)
+ sys.exit(1)
# init test engine here
try:
- from testkitlite.engines.default.runner import TRunner
+ from testkitlite.engines.default.runner import TRunner, TestCaseNotFoundException
from commodule.connector import Connector
except ImportError, err:
LOGGER.error("[ Error: loading test engine failed, error: %s ]\n" % err)
del parser.rargs[:len(value)]
setattr(parser.values, option.dest, value)
+def unlock_and_exit(exit_code=signal.SIGINT):
+ if device_locked:
+ release_device_lock(device_id)
+ sys.exit(exit_code)
+
+def final_clean_test():
+ try:
+ if RUNNER is not None:
+ if RUNNER.session_id:
+ RUNNER.finalize_test(RUNNER.session_id)
+ if can_merge_result:
+ RUNNER.merge_resultfile(START_TIME, CURRENT_LOG_DIR)
+ if down_status:
+ clean_testxml(OPTIONS.testxml, remote_test)
+ except (KeyboardInterrupt, Exception), err:
+ pass
+
+def sig_exit_handler(sig, func=None):
+ final_clean_test()
+ LOGGER.info("\n[ exiting testkit-lite on system signal ]\n")
+ unlock_and_exit()
+
+signal.signal(signal.SIGTSTP, sig_exit_handler)
+signal.signal(signal.SIGTERM, sig_exit_handler)
try:
OPTION_LIST = [
make_option("-f", "--testxml", dest="testxml",
"set \"localhost\" for local web testing"),
make_option("--capability", dest="capability", action="store",
help="set platform for sepecfic device capability"),
- make_option("--quit", dest="quit", action="store_true",
- help="quit testkit-lite"),
make_option("--debug", dest="debug", action="store_true",
help="run in debug mode,more log information print out"),
make_option("--rerun", dest="rerun", action="store_true",
try:
# untrusted behaviour of %%prog
- USAGE = "%%prog [options] -f [prefix:]\"<somewhere/test.xml>\" -e \
-\"<launcher-name>\"\n\
-forms: %%prog -f [prefix:]\"<somewhere>/test.xml\"\n\
+ USAGE = "%%prog [options] -f [prefix:]\"<somewhere/test.xml>\" \n\
+forms: %%prog -f [prefix:]\"<somewhere>/test.xml\" \n\
%%prog -f [prefix:]\"<somewhere>/test.xml\" -D\n\
%%prog -f [prefix:]\"<somewhere>/test.xml\" -A\n\
%%prog -f [prefix:]\"<somewhere>/test.xml\" -M\n\
- %%prog -f [prefix:]\"<somewhere>/test.xml\" --type \"smoke\"\n\
- %%prog -f [prefix:]\"<somewhere>/test.xml\" --component \"TizenAPI/Communication/Messaging\"\n\
- %%prog -f [prefix:]\"<somewhere>/test1.xml <somewhere>/test2.xml \
-<somewhere>/test3.xml\" \n\
- %%prog -f [prefix:]\"<somewhere>/test.xml\" -D -A \ --capability <capability file> --comm <commoduletype>\n\
- %%prog -f [prefix:]\"<somewhere>/test.xml\" -D -A \
-...\n\
-exmaples: \n\
- run a web test package from device side with device WRT (it is default): \n\
- %%prog -f device:\"/opt/usr/media/tct/opt/tct-websocket-w3c-tests/tests.xml\" -e \
-'WRTLauncher' -A \n\
- run a web test package from local path with chrome browser: \n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --set <set_name>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --type <type_name>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --status <status_name>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --priority <priority_value>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --component <component_name>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --id <case_id>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --capability <capability_file> --comm <comm_type>\n\
+ %%prog -f [prefix:]\"<somewhere>/test.xml\" --comm <comm_type>\n\
+ %%prog -f [prefix:]\"<somewhere>/test1.xml <somewhere>/test2.xml <somewhere>/test3.xml\" \n\
+exmaples of \"prefix\" usage: \n\
+ run a web test with a test definition (XML file) from device side: \n\
+ %%prog -f device:\"/opt/usr/media/tct/opt/tct-websocket-w3c-tests/tests.xml\" -A \n\
+ run a web test with a test definition (XML file) from localhost: \n\
+ %%prog -f \"/opt/usr/media/tct/opt/tct-websocket-w3c-tests/tests.xml\" -A \n\
+exmaples of \"-e\" usage: \n\
+ run a web test package with TIZEN web-runtime, launcher provided in tests.xml, so \"-e\" is omitted: \n\
+ %%prog -f device:\"/opt/usr/media/tct/opt/tct-websocket-w3c-tests/tests.xml\" -A \n\
+ run a web test package with chrome browser: \n\
%%prog -f \"/usr/share/webapi-webkit-tests/tests.xml\" -e \
-'<somewhere>/chrome-startup' -A --comm localhost \n\
+'google-chrome --allow-file-access-from-files --disable-web-security --start-maximized --user-data-dir=/home/test/data /home/test/webrunner/index.html' -A --comm localhost \n\
\n\
Note: \n\
1) Proxy settings should be disabled when execute webapi packages\n\
when running same tests.xml with same options. This is caused \
by python's API 'getiterator' from module 'xml.etree.ElementTree'\n\
7) run command 'testkit-lite', \
- it might not be able to locate module 'testkitlite.engines.\
+it might not be able to locate module 'testkitlite.engines.\
default.runner', \
- run command 'export PYTHONPATH=/usr/share/pyshared/' \
- run command 'export PYTHONPATH=/usr/lib/python2.7/dist-packages' \
- to resolve this issue" % (LOG_DIR)
+run command 'export PYTHONPATH=/usr/share/pyshared/' \
+run command 'export PYTHONPATH=/usr/lib/python2.7/dist-packages' \
+to resolve this issue" % (LOG_DIR)
except Exception:
USAGE = None
sys.argv.append("-h")
PARSERS = OptionParser(option_list=OPTION_LIST, usage=USAGE)
-
(OPTIONS, ARGS) = PARSERS.parse_args()
- # detect quit action
- if OPTIONS.quit:
- try:
- LOGGER.info("[ Quit testkit-lite now ]")
- kill_testkit_lite(PID_FILE)
- except Exception, err:
- LOGGER.error("[ Error: fail to kill existing testkit-lite,"
- " error: %s ]\n" % err)
- sys.exit(1)
# detect conflict
if OPTIONS.bautoonly and OPTIONS.bmanualonly:
if CONNECTOR == None:
LOGGER.error("[ Error: init commodule error... ]\n")
sys.exit(1)
+ device_id = CONNECTOR.get_device_info()['device_id']
+ if not OPTIONS.non_active:
+ device_locked = get_device_lock(device_id)
+ if not device_locked:
+ LOGGER.error("[ Error: Failed to get device lock for current instance... ]\n")
+ sys.exit(1)
# create runner
RUNNER = TRunner(CONNECTOR)
# set capability
if OPTIONS.capability:
- GET_CAPABILITY_STATUS = RUNNER.get_capability(OPTIONS.capability)
- if not GET_CAPABILITY_STATUS:
- sys.exit(1)
+ if not RUNNER.get_capability(OPTIONS.capability):
+ unlock_and_exit()
if "device:" in OPTIONS.testxml[0]:
remote_test = True
LOGGER.error("[ Error: "
"can't create test package directory: %s, error: %s ]\n" %
(TEST_PACKAGES_DIR, err))
- sys.exit(1)
+ unlock_and_exit()
REMOTE_TESTLITS = OPTIONS.testxml[0]
REMOTE_TESTLITS = REMOTE_TESTLITS.split(':')[1]
TESTLISTARRARY = REMOTE_TESTLITS.split()
if not down_status:
LOGGER.error("can not get test definition file, pls check file on device:%s"
% remote_file)
- sys.exit(1)
+ unlock_and_exit()
LOCALARRY.append(local_test_package)
OPTIONS.testxml = LOCALARRY
else:
if not OPTIONS.testxml:
LOGGER.error("[ Error: not specify a test xml... ]\n")
- sys.exit(1)
+ unlock_and_exit()
# 1) prepare log dir
if os_ver == "Linux" or os_ver == "Darwin":
if not filename.startswith('/'):
LOGGER.error("[ Error:"
" xml file %s should start with '/' ]" % filename)
- sys.exit(1)
- else:
- filename = filename.split('/')[-2]
- if filename == "":
- LOGGER.error("[ Error:"
- " unable to find package name from %s ]" % t)
- sys.exit(1)
+ unlock_and_exit()
+ file_items = filename.split('/')
else:
- filename = filename.split('\\')[-2]
+ file_items = filename.split('\\')
+ if len(file_items) < 2 or file_items[-2] == "" or file_items[-1] == "":
+ LOGGER.error("[ Error:"
+ " unable to find package name from %s ]" % t)
+ unlock_and_exit()
+ filename = file_items[-2] + '_' + file_items[-1]
filename = "%s.total" % BASENAME(filename)
resultfile = "%s.xml" % filename
resultfile = JOIN(CURRENT_LOG_DIR, resultfile)
except etree.ParseError:
LOGGER.error("[ Error: no case found in testxml, "
"pls check the test package ]\n")
- sys.exit(1)
+ unlock_and_exit()
no_test_definition = 1
for tf in ep.getiterator('test_definition'):
no_test_definition = 0
WFILTERS['execution_type'] = ["manual"]
RUNNER.add_filter_rules(**WFILTERS)
RUNNER.apply_filter(suiteparent)
- # just leave suite and set for merge result
+ # merge duplicated test set under suite node
+ tset_list = set()
for suite in ep.getiterator('suite'):
for tset in suite.getiterator('set'):
for testcase in tset.getiterator('testcase'):
tset.remove(testcase)
+ if tset.get('name') in tset_list:
+ suite.remove(tset)
+ else:
+ tset_list.add(tset.get('name'))
try:
with open(resultfile, 'w') as output:
tree = etree.ElementTree(element=suiteparent)
"error: %s ]\n" % (resultfile, err))
else:
print "[ Have no test xml found ]"
- sys.exit(1)
+ unlock_and_exit()
+
START_TIME = datetime.today().strftime("%Y-%m-%d_%H_%M_%S")
if not OPTIONS.bautoonly:
if OPTIONS.bmanualonly:
try:
can_merge_result = True
RUNNER.run_case(CURRENT_LOG_DIR)
+ except TestCaseNotFoundException, err:
+ LOGGER.info("\n[ Error: exiting testkit-lite on error: %s ]\n" % err)
+ unlock_and_exit()
except Exception, err:
clean_testxml(TESTXMLS, remote_test)
traceback.print_exc()
- LOGGER.error("[ Error: run_test_case failed ,error: %s ]\n" % err)
+ LOGGER.error("[ Error: run test failed, error: %s ]\n" % err)
+
try:
RUNNER.merge_resultfile(START_TIME, CURRENT_LOG_DIR)
clean_testxml(TESTXMLS, remote_test)
LOGGER.info("[ all tasks for testkit lite are accomplished, goodbye ]")
+ unlock_and_exit(0)
except Exception, err:
traceback.print_exc()
clean_testxml(TESTXMLS,remote_test)
- LOGGER.error("[ Error: merge_resultfile "
- "failed,error: %s ]\n" % err)
-
+ LOGGER.error("[ Error: merge result failed, error: %s ]\n" % err)
+ unlock_and_exit()
except KeyboardInterrupt, err:
- if RUNNER.session_id:
- RUNNER.finalize_test(RUNNER.session_id)
- if can_merge_result:
- RUNNER.merge_resultfile(START_TIME, CURRENT_LOG_DIR)
- if down_status:
- clean_testxml(OPTIONS.testxml, remote_test)
+ final_clean_test()
LOGGER.info("\n[ exiting testkit-lite on user cancel ]\n")
- sys.exit(1)
+ unlock_and_exit()
except Exception, err:
- if RUNNER.session_id:
- RUNNER.finalize_test(RUNNER.session_id)
- if can_merge_result:
- RUNNER.merge_resultfile(START_TIME, CURRENT_LOG_DIR)
- if down_status:
- clean_testxml(OPTIONS.testxml, remote_test)
- LOGGER.error("\n[ Error: exiting testkit-lite "
- "by catching a critical error: %s ]\n" % err)
+ final_clean_test()
+ LOGGER.error("\n[ Error: exiting testkit-lite due to critical error: %s ]\n" % err)
traceback.print_exc()
- sys.exit(1)
+ unlock_and_exit()
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# ChengTao.Liu <chengtaox.liu@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
+""" testkit dbus service"""
+
+import dbus
+import dbus.service
+from dbus.mainloop.glib import DBusGMainLoop
+import gobject
+import gtk
+
+class deviceMgr(dbus.service.Object):
+ def __init__(self):
+ bus_name = dbus.service.BusName('com.intel.testkit', bus=dbus.SessionBus())
+ dbus.service.Object.__init__(self, bus_name, "/com/intel/testkit/devices")
+ self._devices = []
+
+ @dbus.service.method(dbus_interface="com.intel.testkit", in_signature="s", out_signature="b")
+ def addDevice(self, device_id):
+ if device_id in self._devices:
+ return False
+ else:
+ self._devices.append(device_id)
+ return True
+
+ @dbus.service.method(dbus_interface="com.intel.testkit", in_signature="s", out_signature="")
+ def removeDevice(self, device_id):
+ if device_id in self._devices:
+ self._devices.remove(device_id)
+ return None
+
+DBusGMainLoop(set_as_default=True)
+gobject.threads_init()
+device_mgr = deviceMgr()
+loop = gobject.MainLoop()
+loop.run()
\ No newline at end of file
# GNU General Public License for more details.
#
# Authors:
-# Zhang, Huihui <huihuix.zhang@intel.com>
-# Wendong,Sui <weidongx.sun@intel.com>
+# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
+
""" kill testkit-lite """
import os
-import platform
-import signal
import re
-import ctypes
+import dbus
+import time
from commodule.log import LOGGER
from commodule.killall import killall
+from commodule.autoexec import shell_command
+
+DEVICE_DBUS = "testkit-lite-dbus"
+DEVICE_WHITE_LIST = ['localhost', '127.0.0.1']
def kill_testkit_lite(pid_file):
pid = pidfile.readline().rstrip("\n")
if pid:
killall(pid)
- except IOError, error:
+ except IOError as error:
pattern = re.compile('No such file or directory|No such process')
match = pattern.search(str(error))
if not match:
"error: %s ]\n" % error)
return None
+def launch_dbus_deamon():
+ exit_code, ret = shell_command(DEVICE_DBUS + '&')
+ time.sleep(3)
+
+def get_device_lock(device_id):
+ """ set device lock for current testkit lite"""
+ if device_id in DEVICE_WHITE_LIST:
+ return True
+ bus = dbus.SessionBus()
+ try:
+ device_service = bus.get_object('com.intel.testkit', '/com/intel/testkit/devices')
+ except Exception as error:
+ launch_dbus_deamon()
+ device_service = bus.get_object('com.intel.testkit', '/com/intel/testkit/devices')
+
+ try:
+ ret = device_service.addDevice(device_id)
+ return bool(ret)
+ except Exception as error:
+ return False
+
+def release_device_lock(device_id):
+ """ kill testkit lite"""
+ if device_id in DEVICE_WHITE_LIST:
+ return True
+ bus = dbus.SessionBus()
+ try:
+ device_service = bus.get_object('com.intel.testkit', '/com/intel/testkit/devices')
+ ret = device_service.removeDevice(device_id)
+ return True
+ except Exception as error:
+ return False
+
def clean_testxml(testxmls,remote_test):
"""clean all test xmls"""
if remote_test:
fd_name = os.path.dirname(testxml)
os.remove(testxml)
os.rmdir(fd_name)
- return None
\ No newline at end of file
+ return None
EXISTS = os.path.exists
ABSPATH = os.path.abspath
+# test constants
+OPT_LAUNCHER = 'test-launcher'
+OPT_EXTENSION = 'test-extension'
+OPT_DEBUG_LOG = 'debug-log-base'
+OPT_CAPABILITY = 'capability'
+OPT_DEBUG = 'debug'
+OPT_RERUN = 'rerun'
+OPT_WIDGET = 'test-widget'
+OPT_STUB = 'stub-name'
+OPT_SUITE = 'testsuite-name'
+OPT_SET = 'testset-name'
+
+class TestCaseNotFoundException(Exception):
+ """
+ Test case not found Exception
+ """
+ __data = ""
+ def __init__(self, data):
+ self.__data = data
+
+ def __str__(self):
+ return self.__data
class TRunner:
self.has_capability = False
self.rerun = False
self.test_prefix = ""
+ self.filter_ok = False
def set_global_parameters(self, options):
"get all options "
# resultdir is set to current directory by default
if not resultdir:
resultdir = os.getcwd()
- ok_prepare = True
- if ok_prepare:
- try:
- filename = testxmlfile
- filename = os.path.splitext(filename)[0]
- if platform.system() == "Linux":
- filename = filename.split('/')[-2]
- else:
- filename = filename.split('\\')[-2]
- if self.filter_rules["execution_type"] == ["manual"]:
- resultfile = "%s.manual.xml" % filename
- else:
- resultfile = "%s.auto.xml" % filename
- resultfile = JOIN(resultdir, resultfile)
- if not EXISTS(resultdir):
- os.mkdir(resultdir)
- LOGGER.info("[ analysis test xml file: %s ]" % resultfile)
- self.__prepare_result_file(testxmlfile, resultfile)
- self.__split_test_xml(resultfile, resultdir)
- except IOError as error:
- LOGGER.error(error)
- ok_prepare &= False
- return ok_prepare
+ try:
+ filename = testxmlfile
+ filename = os.path.splitext(filename)[0]
+ os_ver = platform.system()
+ if os_ver == "Linux" or os_ver == "Darwin":
+ file_items = filename.split('/')
+ else:
+ file_items = filename.split('\\')
+ if len(file_items) < 2 or file_items[-2] == "" or file_items[-1] == "":
+ return False
+ filename = file_items[-2] + '_' + file_items[-1]
+ if self.filter_rules["execution_type"] == ["manual"]:
+ resultfile = "%s.manual.xml" % filename
+ else:
+ resultfile = "%s.auto.xml" % filename
+ resultfile = JOIN(resultdir, resultfile)
+ if not EXISTS(resultdir):
+ os.mkdir(resultdir)
+ LOGGER.info("[ analysis test xml file: %s ]" % resultfile)
+ self.__prepare_result_file(testxmlfile, resultfile)
+ self.__split_test_xml(resultfile, resultdir)
+ except IOError as error:
+ LOGGER.error(error)
+ return False
+ return True
def __split_test_xml(self, resultfile, resultdir):
""" split_test_xml into auto and manual"""
suitefilename).getiterator('testcase')
if case_suite_find:
if tsuite.get('launcher'):
- if tsuite.get('launcher').find('WRTLauncher'):
- self.__splite_core_test(suitefilename)
- else:
- testsuite_dict_value_list.append(suitefilename)
- if testsuite_dict_add_flag == 0:
- self.exe_sequence.append(test_file_name)
- testsuite_dict_add_flag = 1
- self.resultfiles.add(suitefilename)
+ testsuite_dict_value_list.append(suitefilename)
+ if testsuite_dict_add_flag == 0:
+ self.exe_sequence.append(test_file_name)
+ testsuite_dict_add_flag = 1
+ self.resultfiles.add(suitefilename)
else:
if self.filter_rules["execution_type"] == ["auto"]:
self.core_auto_files.append(suitefilename)
def run_case(self, latest_dir):
""" run case """
+ # case not found
+ case_ids = self.filter_rules.get('id')
+ if case_ids and not self.filter_ok:
+ raise TestCaseNotFoundException('Test case %s not found!' % case_ids)
# run core auto cases
self.__run_core_auto()
try:
if self.resultfile:
if os.path.splitext(self.resultfile)[-1] == '.xml':
- if not os.path.exists(os.path.dirname(self.resultfile)):
- if len(os.path.dirname(self.resultfile)) > 0:
- os.makedirs(os.path.dirname(self.resultfile))
+ if not EXISTS(DIRNAME(self.resultfile)):
+ if len(DIRNAME(self.resultfile)) > 0:
+ os.makedirs(DIRNAME(self.resultfile))
LOGGER.info("[ copy result xml to output file:"
" %s ]" % self.resultfile)
copyfile(mergefile, self.resultfile)
def __get_environment(self):
""" get environment """
device_info = self.connector.get_device_info()
- build_infos = get_buildinfo(self.connector)
+ build_infos = self.connector.get_buildinfo()
# add environment node
environment = etree.Element('environment')
environment.attrib['device_id'] = device_info["device_id"]
environment.attrib['device_model'] = device_info["device_model"]
environment.attrib['device_name'] = device_info["device_name"]
- environment.attrib['build_id'] = build_infos['buildid']
environment.attrib['host'] = platform.platform()
+ environment.attrib['lite_version'] = get_version_info()
environment.attrib['resolution'] = device_info["resolution"]
environment.attrib['screen_size'] = device_info["screen_size"]
+ environment.attrib['build_id'] = build_infos['buildid']
environment.attrib['device_model'] = build_infos['model']
environment.attrib['manufacturer'] = build_infos['manufacturer']
other = etree.Element('other')
case_detail_tmp.setdefault("purpose", tcase.get('purpose'))
case_detail_tmp.setdefault("order", str(case_order))
case_detail_tmp.setdefault("onload_delay", "3")
+ case_detail_tmp.setdefault("location", "device")
if tcase.find('description/test_script_entry') is not None:
tc_entry = tcase.find(
case_detail_tmp["expected_result"] = tcase.find(
'description/test_script_entry'
).get('test_script_expected_result')
+ if tcase.find(
+ 'description/test_script_entry'
+ ).get('location'):
+ case_detail_tmp["location"] = tcase.find(
+ 'description/test_script_entry'
+ ).get('location')
for this_step in tcase.getiterator("step"):
step_detail_tmp = {}
step_detail_tmp.setdefault("order", "1")
for tcase in tset.getiterator('testcase'):
if not self.__apply_filter_case_check(tcase):
tset.remove(tcase)
+ else:
+ self.filter_ok = True
def __apply_filter_case_check(self, tcase):
"""filter cases"""
t_val.append(i.text)
if len(set(rules[key]) & set(t_val)) == 0:
return False
+ else:
+ return False
return True
def __apply_capability_filter_set(self, tset):
""" prepare_starup_parameters """
starup_parameters = {}
- LOGGER.info("[ prepare_starup_parameters ]")
+ LOGGER.info("[ preparing for startup options ]")
try:
parse_tree = etree.parse(testxml)
tsuite = parse_tree.getroot().getiterator('suite')[0]
tset = parse_tree.getroot().getiterator('set')[0]
if tset.get("launcher") is not None:
- starup_parameters['test-launcher'] = tset.get("launcher")
+ starup_parameters[OPT_LAUNCHER] = tset.get("launcher")
else:
- starup_parameters['test-launcher'] = tsuite.get("launcher")
- starup_parameters['testsuite-name'] = tsuite.get("name")
- starup_parameters['testset-name'] = tset.get("name")
- starup_parameters['stub-name'] = self.stub_name
- if self.external_test is not None:
- starup_parameters['external-test'] = self.external_test
- starup_parameters['debug'] = self.debug
- starup_parameters['test_prefix'] = self.test_prefix
+ starup_parameters[OPT_LAUNCHER] = tsuite.get("launcher")
+ if tsuite.get("extension") is not None:
+ starup_parameters[OPT_EXTENSION] = tsuite.get("extension")
+ if tsuite.get("widget") is not None:
+ starup_parameters[OPT_WIDGET] = tsuite.get("widget")
+ starup_parameters[OPT_SUITE] = tsuite.get("name")
+ starup_parameters[OPT_SET] = tset.get("name")
+ starup_parameters[OPT_STUB] = self.stub_name
+ if self.external_test is not None and \
+ starup_parameters[OPT_LAUNCHER].find(self.external_test) == -1:
+ starup_parameters[OPT_LAUNCHER] = self.external_test
+ starup_parameters[OPT_EXTENSION] = self.external_test.split(' ')[0]
+ starup_parameters[OPT_DEBUG] = self.debug
+ if self.resultfile:
+ debug_dir = DIRNAME(self.resultfile)
+ debug_name = os.path.splitext(BASENAME(self.resultfile))[0]
+ if not EXISTS(debug_dir):
+ os.makedirs(debug_dir)
+ else:
+ debug_dir = DIRNAME(testxml)
+ debug_name = os.path.splitext(BASENAME(testxml))[0]
+ starup_parameters[OPT_DEBUG_LOG] = JOIN(debug_dir, debug_name)
+ self.debug_log_file = starup_parameters[OPT_DEBUG_LOG] + '.dlog'
if self.rerun:
- starup_parameters['rerun'] = self.rerun
+ starup_parameters[OPT_RERUN] = self.rerun
if len(self.capabilities) > 0:
- starup_parameters['capability'] = self.capabilities
+ starup_parameters[OPT_CAPABILITY] = self.capabilities
except IOError as error:
LOGGER.error(
"[ Error: prepare starup parameters, error: %s ]" % error)
if 'resultfile' in set_result:
self.__write_file_result(set_result_xml, set_result)
else:
- write_json_result(set_result_xml, set_result)
+ write_json_result(set_result_xml, set_result, self.debug_log_file)
def __write_file_result(self, set_result_xml, set_result):
"""write xml result file"""
test_em = test_tree.getroot()
result_tree = etree.parse(result_file)
result_em = result_tree.getroot()
- dubug_file = os.path.basename(set_result_xml)
- dubug_file = os.path.splitext(dubug_file)[0] + '.dlog'
+ dubug_file = BASENAME(self.debug_log_file)
for result_suite in result_em.getiterator('suite'):
for result_set in result_suite.getiterator('set'):
for test_suite in test_em.getiterator('suite'):
return summary
-def write_json_result(set_result_xml, set_result):
+def write_json_result(set_result_xml, set_result, debug_log_file):
''' fetch result form JSON'''
case_results = set_result["cases"]
try:
parse_tree = etree.parse(set_result_xml)
root_em = parse_tree.getroot()
- dubug_file = os.path.basename(set_result_xml)
- dubug_file = os.path.splitext(dubug_file)[0] + '.dlog'
+ dubug_file = BASENAME(debug_log_file)
for tset in root_em.getiterator('set'):
tset.set("set_debug_msg", dubug_file)
for tcase in tset.getiterator('testcase'):
traceback.print_exc()
LOGGER.error(
"[ Error: fail to write cases result, error: %s ]\n" % error)
-
-
-def get_buildinfo(conn):
- """ get builf info"""
- device_file = '/opt/usr/media/Documents/tct/buildinfo.xml'
- builfinfo_file = '/opt/testkit/lite/buildinfo.xml'
- build_info = {}
- build_info['buildid'] = ''
- build_info['manufacturer'] = ''
- build_info['model'] = ''
-
- if conn.download_file(device_file, builfinfo_file) and EXISTS(builfinfo_file):
- root = etree.parse(builfinfo_file).getroot()
- for element in root.findall("buildinfo"):
- if element is not None:
- if element.get("name").lower() == 'buildversion':
- child = etree.Element.getchildren(element)
- if child and child[0].text:
- buildid = child[0].text
- build_info['buildid'] = buildid
- if element.get("name").lower() == 'manufacturer':
- child = etree.Element.getchildren(element)
- if child and child[0].text:
- manufacturer = child[0].text
- build_info['manufacturer'] = manufacturer
- if element.get("name").lower() == 'model':
- child = etree.Element.getchildren(element)
- if child and child[0].text:
- model = child[0].text
- build_info['model'] = model
- os.remove(builfinfo_file)
- return build_info
from datetime import datetime
from commodule.log import LOGGER
+from commodule.str2 import str2str
from commodule.httprequest import get_url, http_request
CNT_RETRY = 10
UIFW_SET_NUM = 0
LAUNCH_ERROR = 1
BLOCK_ERROR = 3
+FILES_ROOT = os.path.expanduser("~") + os.sep
class TestSetResut(object):
LOGGER.info(self._progress %
(self._suite_name, case_it['case_id'], case_it['result']))
if case_it['result'].lower() in ['fail', 'block'] and 'stdout' in case_it:
- LOGGER.info(case_it['stdout'])
+ LOGGER.info(str2str(case_it['stdout']))
self._mutex.release()
def get_result(self):
LOGGER.info('[ end of dlog message ]')
-def _core_test_exec(conn, test_set_name, exetype, cases_queue, result_obj):
+def _core_test_exec(conn, test_session, test_set_name, exetype, cases_queue, result_obj):
"""function for running core tests"""
exetype = exetype.lower()
total_count = len(cases_queue)
current_idx = 0
manual_skip_all = False
result_list = []
+ stdout_file = FILES_ROOT + test_session + "_stdout.log"
+ stderr_file = FILES_ROOT + test_session + "_stderr.log"
for test_case in cases_queue:
if result_obj.get_status() == 1:
break
continue
expected_result = test_case.get('expected_result', '0')
time_out = int(test_case.get('timeout', '90'))
+ location = test_case.get('location', 'device')
measures = test_case.get('measures', [])
retmeasures = []
LOGGER.info("\n[core test] execute case:\nTestCase: %s\n"
LOGGER.info("start time: %s" % strtime)
test_case["start_at"] = strtime
if exetype == 'auto':
- return_code, stdout, stderr = conn.shell_cmd_ext(
- core_cmd, time_out, False)
+ return_code, stdout, stderr = -1, [], []
+ if location == 'host':
+ return_code, stdout, stderr = conn.shell_cmd_host(core_cmd, time_out, False, stdout_file, stderr_file)
+ else:
+ return_code, stdout, stderr = conn.shell_cmd_ext(core_cmd, time_out, False, stdout_file, stderr_file)
if return_code is not None and return_code != "timeout":
test_case["result"] = "pass" if str(
return_code) == expected_result else "fail"
fname = item['file']
if fname is None:
continue
- tmpname = os.path.expanduser("~") + os.sep + "mea_tmp"
+ tmpname = FILES_ROOT + test_session + "_mea_tmp"
if conn.download_file(fname, tmpname):
try:
config = ConfigParser.ConfigParser()
break
if not conn.launch_app(test_web_app):
+ LOGGER.error("[ ERROR: launch test app %s failed! ]" % test_web_app)
result_obj.set_status(1)
break
result_obj.set_status(1)
break
if error_code == LAUNCH_ERROR:
- relaunch_cnt += 1
- if relaunch_cnt >= 3:
- test_set_finished = True
- result_obj.set_status(1)
- break
+ LOGGER.error("[ ERROR: test app no response, hang or not launched! ]")
+ test_set_finished = True
+ result_obj.set_status(1)
+ break
elif error_code == BLOCK_ERROR:
relaunch_cnt = 0
else:
UIFW_SET_NUM = 1
LOGGER.info('[webuifw] start test executing')
if not conn.launch_app(test_web_app):
- LOGGER.info("[ launch test app \"%s\" failed! ]" %
- self.opts['test_app_id'])
+ LOGGER.info("[ launch test app \"%s\" failed! ]" % test_web_app)
result_obj.set_result({"resultfile": ""})
result_obj.set_status(1)
- result_file = os.path.expanduser("~") + os.sep + test_session + "_uifw.xml"
+ result_file = FILES_ROOT + test_session + "_uifw.xml"
while time_out > 0:
LOGGER.info('[webuifw] waiting for test completed...')
return None
session_id = str(uuid.uuid1())
- cmdline = ""
- debug_opt = ""
stub_app = params.get('stub-name', 'testkit-stub')
stub_port = params.get('stub-port', '8000')
- test_launcher = params.get('external-test', '')
testsuite_name = params.get('testsuite-name', '')
testset_name = params.get('testset-name', '')
capability_opt = params.get("capability", None)
- client_cmds = params.get('test-launcher', '').strip().split()
- wrt_tag = client_cmds[1] if len(client_cmds) > 1 else ""
- self.opts['fuzzy_match'] = fuzzy_match = wrt_tag.find('z') != -1
- self.opts['auto_iu'] = auto_iu = wrt_tag.find('iu') != -1
- self.opts['self_exec'] = wrt_tag.find('a') != -1
- self.opts['self_repeat'] = wrt_tag.find('r') != -1
- self.opts['debug_mode'] = params.get("debug", False)
+ test_launcher = params.get('test-launcher', '')
+ test_extension = params.get('test-extension', None)
+ test_widget = params.get('test-widget', None)
test_opt = self.conn.get_launcher_opt(
- test_launcher, testsuite_name, testset_name, fuzzy_match, auto_iu)
+ test_launcher, test_extension, test_widget, testsuite_name, testset_name)
if test_opt is None:
- LOGGER.info("[ init the test options, get failed ]")
+ LOGGER.info("[ init the test launcher, get failed ]")
return None
-
- # to be removed in later version
- test_opt["suite_id"] = test_opt["test_app_id"]
+ LOGGER.info("[ web test launcher: %s ]" % test_opt["launcher"])
+ LOGGER.info("[ web test app: %s ]" % test_opt["test_app_id"])
self.opts.update(test_opt)
+ self.opts['debug_mode'] = params.get("debug", False)
# uifw, this suite don't need stub
if self.opts['self_exec'] or self.opts['self_repeat']:
return session_id
# enable debug information
- if self.opts['debug_mode']:
- debug_opt = '--debug'
+ stub_debug_opt = "--debug" if self.opts['debug_mode'] else ""
- if self.__init_test_stub(stub_app, stub_port, debug_opt):
+ # suite_id to be removed in later version
+ test_opt["suite_id"] = test_opt["test_app_id"]
+ if self.__init_test_stub(stub_app, stub_port, stub_debug_opt):
ret = http_request(get_url(
self.server_url, "/init_test"), "POST", test_opt)
if ret is None:
"""init the test envrionment"""
self.opts['testset_name'] = params.get('testset-name', '')
self.opts['testsuite_name'] = params.get('testsuite-name', '')
+ self.opts['debug_log_base'] = params.get("debug-log-base", '')
if params.get('test-launcher') is not None:
self.opts['test_type'] = "webapi"
return self.__init_webtest_opt(params)
self.opts['async_th'] = threading.Thread(
target=_core_test_exec,
args=(
- self.conn, test_set_name, exetype, cases, self.result_obj)
+ self.conn, sessionid, test_set_name, exetype, cases, self.result_obj)
)
self.opts['async_th'].start()
return True
return False
# start debug trace thread
- dlogfile = test_set['current_set_name'].replace('.xml', '.dlog')
- self.opts['dlog_file'] = dlogfile
- self.conn.start_debug(dlogfile)
+ self.conn.start_debug(self.opts['debug_log_base'])
time.sleep(1)
self.result_obj = TestSetResut(
if sessionid is None:
return False
- self.result_obj.set_status(1)
+ if self.result_obj is not None:
+ self.result_obj.set_status(1)
# stop test app
if self.opts['test_type'] == "webapi":
# stop debug thread
self.conn.stop_debug()
- # add dlog output for debug
- if self.opts['debug_mode']:
- _print_dlog(self.opts['dlog_file'])
-
return True
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
# Authors:
# Zhang, Huihui <huihuix.zhang@intel.com>
# Wendong,Sui <weidongx.sun@intel.com>
</xs:attribute>
<xs:attribute name="timeout" type="xs:int" default="90">
</xs:attribute>
+ <xs:attribute name="location" type="xs:string" default="device">
+ </xs:attribute>
</xs:extension>
</xs:simpleContent>
</xs:complexType>