[public_version]
-version=3.0.10
+version=3.1.0
[internal_version]
-version=3.0.10
+version=3.1.0
+++ /dev/null
-# Copyright (C) 2012 Intel Corporation
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# Authors:
-# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
-
-
-SUBDIRS = impl
-
-commoduledir = /usr/lib/python2.7/dist-packages/commodule
-dist_commodule_SCRIPTS = *.py
-testkit-lite (3.0.10) unstable; urgency=low
+testkit-lite (3.1.0) unstable; urgency=low
* Public Release.
-testkit-lite_3.0.7_all.deb utils standard
+testkit-lite_3.1.2_all.deb utils standard
# Set permissions
mkdir -p /opt/testkit/lite/test_packages
chmod ugo+rwx -R /opt/testkit/lite
-ln -fs /usr/share/pyshared/commodule /usr/lib/python2.7/dist-packages/commodule
ln -fs /usr/share/pyshared/testkitlite /usr/lib/python2.7/dist-packages/testkitlite
Build-Depends: debhelper (>= 8.0.0), python-setuptools, python-support (>= 0.8.4)
Standards-Version: 3.9.2
Homepage: https://github.com/testkit/testkit-manager
-Version: 2.3.4
+Version: 3.1.2
Files:
- 82c2118d146839ec9139559f5049d2bf 6361871 testkit-lite_2.3.4.tar.gz
+ 9eb0a55894d02fdfe29003dc25f15808 1470605 testkit-lite_3.1.2.tar.gz
Summary: TCT-Lite
Name: testkit-lite
-Version: 3.0.10
+Version: 3.1.0
Release: 1
License: GPLv2
Group: Applications/System
%files
%{python_sitelib}/testkitlite/*
-%{python_sitelib}/commodule/*
%{python_sitelib}/testkit_lite-*.egg-info/*
/etc/dbus-1/system.d/com.intel.testkit.conf
/opt/testkit/lite/VERSION
url = "https://github.com/testkit/testkit-lite",
author = "Shaofeng Tang",
author_email = "shaofeng.tang@intel.com",
- version = "3.0.10",
+ version = "3.1.0",
include_package_data = True,
data_files = [('/opt/testkit/lite', ['VERSION', 'doc/testkit-lite_user_guide.pdf', 'doc/testkit-lite_tutorial.pdf', 'doc/test_definition_schema.pdf']),
('/opt/testkit/lite/commodule/', ['CONFIG']),
('/etc/dbus-1/system.d/', ['dbus/com.intel.testkit.conf'])],
- scripts = ('testkit-lite', 'testkit-lite-dbus'),
+ scripts = ('testkit-lite', 'dbus/testkit-lite-dbus'),
packages = find_packages(),
cmdclass = {'install_data': post_install_cmd},
)
from optparse import OptionParser, make_option
from datetime import datetime
-# import logger
+
try:
- from commodule.log import LOGGER
+ # import logger
+ from testkitlite.util.log import LOGGER
except ImportError, err:
print "[ Error: loading logging failed, error: %s ]\n" % err
print "try to run command " \
- "'export PYTHONPATH=/usr/lib/python2.7/dist-packages' and " \
- "'export PYTHONPATH=/usr/share/pyshared/' to resolve this issue"
+ "'export PYTHONPATH=/usr/local/lib/python2.7/dist-packages' or " \
+ "'export PYTHONPATH=/usr/local/lib/python2.7/site-packages' to resolve module missed issue"
sys.exit(1)
-# import process kill
-try:
- from testkitlite.common.process_killall import get_device_lock, release_device_lock, clean_testxml
-except ImportError, err:
- LOGGER.error("[ Error: loading module killall failed, error: %s ]\n" % err)
- LOGGER.info("try to run command \
-'export PYTHONPATH=/usr/lib/python2.7/dist-packages' and \
-'export PYTHONPATH=/usr/share/pyshared/' to resolve this issue")
- sys.exit(1)
# get platform version info
-os_ver = platform.system()
+OS_VER = platform.system()
JOIN = os.path.join
EXISTS = os.path.exists
DIRNAME = os.path.dirname
ISLINK = os.path.islink
TESTKIT_DIR = "/opt/testkit/lite"
-if not os_ver == "Linux" and not os_ver == "Darwin":
+if not OS_VER == "Linux" and not OS_VER == "Darwin":
TESTKIT_DIR = DIRNAME(ABSPATH(__file__))
sys.path += [JOIN(TESTKIT_DIR)]
TESTKIT_DIR = JOIN(TESTKIT_DIR, "results")
LOG_DIR = TESTKIT_DIR
-PID_FILE = JOIN(LOG_DIR, "pid.log")
TEST_PACKAGES_DIR = JOIN(TESTKIT_DIR, "test_packages")
COMMON_FILTERS = {
"suite": [],
sys.exit(1)
-# init test engine here
-try:
- from testkitlite.engines.default.runner import TRunner, TestCaseNotFoundException
- from commodule.connector import Connector
-except ImportError, err:
- LOGGER.error("[ Error: loading test engine failed, error: %s ]\n" % err)
- LOGGER.info("try to run command "
- "'export PYTHONPATH=/usr/lib/python2.7/dist-packages' and "
- "'export PYTHONPATH=/usr/share/pyshared/'to resolve this issue")
- sys.exit(1)
-
-
def varnarg(option, opt_str, value, parser):
""" parser srg"""
value = []
except (KeyboardInterrupt, Exception), err:
pass
-def sig_exit_handler(sig, func=None):
+def sig_exit_handler(*args):
final_clean_test()
LOGGER.info("\n[ exiting testkit-lite on system signal ]\n")
unlock_and_exit()
make_option("-e", dest="exttest", action="store",
help="Launch external test with a launcher,\
supports browser or other web-runtime"),
+ make_option("-k", "--worker", dest="worker", action="store",
+ help="Specify a test engine for execution, use value 'default' by default"),
make_option("--version", dest="version_info", action="store_true",
help="Show version information"),
make_option("--internal-version", dest="internal_version_info",
7) run command 'testkit-lite', \
it might not be able to locate module 'testkitlite.engines.\
default.runner', \
-run command 'export PYTHONPATH=/usr/share/pyshared/' \
-run command 'export PYTHONPATH=/usr/lib/python2.7/dist-packages' \
+run command 'export PYTHONPATH=/usr/local/lib/python2.7/dist-packages' or \
+run command 'export PYTHONPATH=/usr/local/lib/python2.7/site-packages' \
to resolve this issue" % (LOG_DIR)
except Exception:
USAGE = None
PARSERS = OptionParser(option_list=OPTION_LIST, usage=USAGE)
(OPTIONS, ARGS) = PARSERS.parse_args()
- # detect conflict
+ # init test engine here
+ from testkitlite.util.connector import ConnectorBuilder
+ from testkitlite.util.process import get_device_lock, release_device_lock, clean_testxml
+ from testkitlite.util.session import TestSession
+ from testkitlite.util.errors import TestCaseNotFoundException, TestEngineException
+
+ #execute_type
+ exec_types = ["auto","manual"]
if OPTIONS.bautoonly and OPTIONS.bmanualonly:
raise ValueError("-A and -M are conflict")
- if OPTIONS.commodule:
- if OPTIONS.commodule == 'localhost' and "device:" in OPTIONS.testxml[0]:
- raise ValueError("For single mode, pls set local file to test ")
- COMMODULE_TYPE = OPTIONS.commodule
- else:
- COMMODULE_TYPE = "tizenmobile"
-
- CONNECTOR = Connector({"testmode": COMMODULE_TYPE, "deviceid":OPTIONS.device_serial}).get_connector()
+ elif OPTIONS.bautoonly:
+ exec_types.remove("manual")
+ elif OPTIONS.bmanualonly:
+ exec_types.remove("auto")
+
+ #connector options
+ conn_opt = {}
+ conn_opt['commodule'] = OPTIONS.commodule or "tizenmobile"
+ conn_opt['deviceid'] = OPTIONS.device_serial
+ CONNECTOR = ConnectorBuilder(conn_opt).get_connector()
if CONNECTOR == None:
- LOGGER.error("[ Error: init commodule error... ]\n")
sys.exit(1)
+
device_id = CONNECTOR.get_device_info()['device_id']
if not OPTIONS.non_active:
device_locked = get_device_lock(device_id)
if not device_locked:
- LOGGER.error("[ Error: Failed to get device lock for current instance... ]\n")
+ LOGGER.error("[ Error: Failed to get device for current session... ]\n")
sys.exit(1)
- # create runner
- RUNNER = TRunner(CONNECTOR)
- RUNNER.set_pid_log(PID_FILE)
-
- # apply all options
- RUNNER.set_global_parameters(OPTIONS)
-
- # set capability
- if OPTIONS.capability:
- if not RUNNER.get_capability(OPTIONS.capability):
- unlock_and_exit()
-
if "device:" in OPTIONS.testxml[0]:
+ if not CONNECTOR.is_support_remote():
+ raise ValueError("For '%s' mode, please test file without prefix 'device:' " % conn_opt['commodule'])
+
remote_test = True
try:
if not EXISTS(TEST_PACKAGES_DIR):
tmp_remote_folder, tmp_remote_file[1])
local_test_package = JOIN(
TEST_PACKAGES_DIR, tmp_remote_test_xml)
- down_status = RUNNER.connector.download_file(remote_file, local_test_package)
+ down_status = CONNECTOR.download_file(remote_file, local_test_package)
if not down_status:
- LOGGER.error("can not get test definition file, pls check file on device:%s"
+ LOGGER.error("Failed to get test file '%s' from device"
% remote_file)
unlock_and_exit()
LOCALARRY.append(local_test_package)
OPTIONS.testxml = LOCALARRY
else:
if len(OPTIONS.testxml) == 1:
- i = 0
- start = 0
- end = 0
- LOCAL_TESTLISTS= []
+ i, start, end = 0, 0, 0
+ LOCAL_TESTLISTS = []
temp_xml = OPTIONS.testxml[0]
while(i < len(temp_xml)):
tmp = temp_xml[i:len(temp_xml)]
- if ".xml" in tmp :
- index = tmp.index(".xml")+4+i
+ if ".xml" in tmp:
+ index = tmp.index(".xml") + 4 + i
end = index
- i=index+1
+ i = index + 1
LOCAL_TESTLISTS.append(temp_xml[start:end])
- start = index+1
+ start = index + 1
else:
- print 'no xml found'
+ LOGGER.error("No xml found")
break
OPTIONS.testxml = LOCAL_TESTLISTS
+ # load test engine
+ workername = OPTIONS.worker or 'default'
+ try:
+ exec "from testkitlite.engines.%s import TestWorker" % workername
+ except Exception as error:
+ raise TestEngineException(workername)
+ WORKER = TestWorker(CONNECTOR)
+
+ # create runner
+ RUNNER = TestSession(CONNECTOR, WORKER)
+ # apply all options
+ RUNNER.set_global_parameters(OPTIONS)
+ # set capability
+ if not RUNNER.get_capability(OPTIONS.capability):
+ unlock_and_exit()
# apply filter
WFILTERS = {}
for flt in COMMON_FILTERS:
unlock_and_exit()
# 1) prepare log dir
- if os_ver == "Linux" or os_ver == "Darwin":
+ if OS_VER == "Linux" or OS_VER == "Darwin":
SESSION = datetime.today().isoformat('-')
else:
SESSION = datetime.today().strftime("%Y-%m-%d_%H_%M_%S")
if EXISTS(t):
filename = t
filename = os.path.splitext(filename)[0]
- if os_ver == "Linux" or os_ver == "Darwin":
+ if OS_VER == "Linux" or OS_VER == "Darwin":
if not filename.startswith('/'):
LOGGER.error("[ Error:"
" xml file %s should start with '/' ]" % filename)
for suite in ep.getiterator('suite'):
suite.tail = "\n"
suiteparent.append(suite)
- if OPTIONS.bautoonly:
- WFILTERS['execution_type'] = ["auto"]
- RUNNER.add_filter_rules(**WFILTERS)
- if OPTIONS.bmanualonly:
- WFILTERS['execution_type'] = ["manual"]
- RUNNER.add_filter_rules(**WFILTERS)
+ WFILTERS['execution_type'] = exec_types
+ RUNNER.add_filter_rules(**WFILTERS)
RUNNER.apply_filter(suiteparent)
# merge duplicated test set under suite node
tset_list = set()
"[ Error: create filtered total result file: %s failed, "
"error: %s ]\n" % (resultfile, err))
else:
- print "[ Have no test xml found ]"
+ print "[ test xml '%s' found ]" % t
unlock_and_exit()
- START_TIME = datetime.today().strftime("%Y-%m-%d_%H_%M_%S")
- if not OPTIONS.bautoonly:
- if OPTIONS.bmanualonly:
- for t in TESTXMLS:
- try:
- WFILTERS['execution_type'] = ["manual"]
- RUNNER.add_filter_rules(**WFILTERS)
- RUNNER.prepare_run(t, resultdir=CURRENT_LOG_DIR)
- except IOError, err:
- LOGGER.error("[ Error: prepare_run test xml: "
- "%s from testkit-lite failed, error: %s ]\n" % (t, err))
- else:
- for t in TESTXMLS:
- try:
- WFILTERS['execution_type'] = ["auto"]
- RUNNER.add_filter_rules(**WFILTERS)
- RUNNER.prepare_run(t, resultdir=CURRENT_LOG_DIR)
- except IOError, err:
- LOGGER.error("[ Error: prepare_run test xml: "
- "%s from testkit-lite failed, error: %s ]\n" % (t, err))
- for t in TESTXMLS:
- try:
- WFILTERS['execution_type'] = ["manual"]
- RUNNER.add_filter_rules(**WFILTERS)
- RUNNER.prepare_run(t, resultdir=CURRENT_LOG_DIR)
- except IOError, err:
- LOGGER.error("[ Error: prepare_run test xml: "
- "%s from testkit-lite failed, error: %s ]\n" % (t, err))
- else:
- for t in TESTXMLS:
+ for t in TESTXMLS:
+ for e_type in exec_types:
try:
- WFILTERS['execution_type'] = ["auto"]
+ WFILTERS['execution_type'] = [e_type]
RUNNER.add_filter_rules(**WFILTERS)
RUNNER.prepare_run(t, resultdir=CURRENT_LOG_DIR)
except IOError, err:
- clean_testxml(TESTXMLS, remote_test)
LOGGER.error("[ Error: prepare_run test xml: "
"%s from testkit-lite failed, error: %s ]\n" % (t, err))
+ START_TIME = datetime.today().strftime("%Y-%m-%d_%H_%M_%S")
try:
can_merge_result = True
RUNNER.run_case(CURRENT_LOG_DIR)
- except TestCaseNotFoundException, err:
+ except (TestEngineException, TestCaseNotFoundException), err:
LOGGER.info("\n[ Error: exiting testkit-lite on error: %s ]\n" % err)
unlock_and_exit()
except Exception, err:
# Yuanyuan,Zou <yuanyuanx.zou@intel.com>
-commoduleimpldir = /usr/lib/python2.7/dist-packages/commodule/impl
+commoduleimpldir = /usr/lib/python2.7/dist-packages/commodule
dist_commoduleimpl_SCRIPTS = *.py
import threading
import re
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command, shell_command_ext
-from commodule.killall import killall
-from commodule.connector import InvalidDeviceException
+from testkitlite.util.log import LOGGER
+from testkitlite.util.autoexec import shell_command, shell_command_ext
+from testkitlite.util.killall import killall
+from testkitlite.util.errors import InvalidDeviceException
LOCAL_HOST_NS = "127.0.0.1"
def __init__(self, device_id=None):
self.deviceid = device_id
+ self.support_remote = True
+
+ def is_support_remote(self):
+ return self.support_remote
def shell_cmd(self, cmd="", timeout=15):
cmdline = "adb -s %s shell %s" % (self.deviceid, cmd)
timeout=None,
boutput=False,
stdout_file=None,
- stderr_file=None):
+ stderr_file=None,
+ callbk=None):
cmdline = "adb -s %s shell '%s; echo returncode=$?'" % (
self.deviceid, cmd)
- return shell_command_ext(cmdline, timeout, boutput, stdout_file, stderr_file)
+ return shell_command_ext(cmdline, timeout, boutput, stdout_file, stderr_file, callbk)
def get_device_info(self):
"""get android deivce inforamtion"""
import re
from shutil import copyfile
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command, shell_command_ext
+from testkitlite.util.log import LOGGER
+from testkitlite.util.autoexec import shell_command, shell_command_ext
HOST_NS = "127.0.0.1"
def __init__(self):
self.deviceid = "localhost"
+ self.support_remote = False
+
+ def is_support_remote(self):
+ return self.support_remote
def shell_cmd(self, cmd="", timeout=15):
return shell_command(cmd, timeout)
import re
from shutil import copyfile
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command, shell_command_ext
-from commodule.killall import killall
-from commodule.connector import InvalidDeviceException
+from testkitlite.util.log import LOGGER
+from testkitlite.util.autoexec import shell_command, shell_command_ext
+from testkitlite.util.killall import killall
+from testkitlite.util.errors import InvalidDeviceException
HOST_NS = "127.0.0.1"
os.environ['no_proxy'] = HOST_NS
+
+# common constants
RPM_INSTALL = "ssh %s rpm -ivh %s"
RPM_UNINSTALL = "ssh %s rpm -e %s"
RPM_LIST = "ssh %s rpm -qa | grep tct"
APP_QUERY_STR = "ssh %s \"ps aux |grep '%s'|grep -v grep\"|awk '{print $2}'"
APP_KILL_STR = "ssh %s kill -9 %s"
+APP_NONBLOCK_STR = "ssh %s \"%s &\""
+SSH_COMMAND_RTN = "ssh %s \"%s; echo returncode=$?\""
+SSH_COMMAND_APP = "ssh %s \"su - app -c 'export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/5000/dbus/user_bus_socket; %s; echo returncode=$?'\""
+
+# wrt-launcher constants
+WRT_MAIN = "wrt-launcher"
WRT_QUERY_STR = "ssh %s \"wrt-launcher -l|grep '%s'|grep -v grep\"|awk '{print $2\":\"$NF}'"
-WRT_START_STR = "ssh %s wrt-launcher -s %s"
+WRT_START_STR = "ssh %s 'wrt-launcher -s %s; echo returncode=$?'"
WRT_STOP_STR = "ssh %s wrt-launcher -k %s"
WRT_INSTALL_STR = "ssh %s wrt-installer -i %s"
WRT_UNINSTL_STR = "ssh %s wrt-installer -un %s"
-WGT_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
+WRT_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
+
+# crosswalk constants
+XWALK_MAIN = "xwalkctl"
+XWALK_QUERY_STR = "ssh %s \"su - app -c 'export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/5000/dbus/user_bus_socket;xwalkctl' \"| grep -w %s | awk '{print $(NF-1)}'"
+XWALK_START_STR = "ssh %s \"su - app -c 'export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/5000/dbus/user_bus_socket;xwalk-launcher %s' & \""
+XWALK_INSTALL_STR = "ssh %s \"su - app -c 'export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/5000/dbus/user_bus_socket;xwalkctl --install %s' \""
+XWALK_UNINSTL_STR = "ssh %s \"su - app -c 'export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/5000/dbus/user_bus_socket;xwalkctl --uninstall %s' \""
+XWALK_LOCATION = "/opt/usr/media/tct/opt/%s/%s.wgt"
class tizenIVI:
""" Implementation for transfer data
- between Host and tizenivi PC
+ between Host and IVI/PC on SSH connection
"""
def __init__(self, deviceid="root@127.0.0.1"):
self.deviceid = deviceid
+ self._wrt = False
+ self._xwalk = False
+ self.support_remote = True
+
+ def is_support_remote(self):
+ return self.support_remote
def shell_cmd(self, cmd="", timeout=15):
cmd = "ssh %s %s" % (self.deviceid, cmd)
boutput=False,
stdout_file=None,
stderr_file=None):
- cmd = "ssh %s '%s; echo returncode=$?'" % (self.deviceid, cmd)
- return shell_command_ext(cmd, timeout, boutput, stdout_file, stderr_file)
+ if cmd.startswith('app_user@'):
+ cmdline = SSH_COMMAND_APP % (self.deviceid, cmd[9:])
+ else:
+ cmdline = SSH_COMMAND_RTN % (self.deviceid, cmd)
+ return shell_command_ext(cmdline, timeout, boutput, stdout_file, stderr_file)
def shell_cmd_host(self,
cmd="",
cmd = "scp %s %s:%s" % (local_path, self.deviceid, remote_path)
exit_code, ret = shell_command(cmd)
return True
+ def _get_wrt_app(self, test_suite, test_set, fuzzy_match, auto_iu):
+ test_app_id = None
+ if auto_iu:
+ test_wgt = test_set
+ test_wgt_path = WRT_LOCATION % (test_suite, test_wgt)
+ if not self.install_app(test_wgt_path):
+ LOGGER.info("[ failed to install widget \"%s\" in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_wgt = test_suite
+
+ # check if widget installed already
+ cmd = WRT_QUERY_STR % (test_wgt)
+ exit_code, ret = shell_command(cmd)
+ if exit_code == -1:
+ return None
+ for line in ret:
+ items = line.split(':')
+ if len(items) < 1:
+ continue
+ if (fuzzy_match and items[0].find(test_wgt) != -1) or items[0] == test_wgt:
+ test_app_id = items[1].strip('\r\n')
+ break
+
+ if test_app_id is None:
+ LOGGER.info("[ test widget \"%s\" not found in target ]"
+ % test_wgt)
+ return None
+
+ return test_app_id
+
+ def _get_xwalk_app(self, test_suite, test_set, fuzzy_match, auto_iu):
+ test_app_id = None
+ if auto_iu:
+ test_wgt = test_set
+ test_wgt_path = XWALK_LOCATION % (test_suite, test_wgt)
+ if not self.install_app(test_wgt_path):
+ LOGGER.info("[ failed to install widget \"%s\" in target ]"
+ % test_wgt)
+ return None
+ else:
+ test_wgt = test_suite
+
+ # check if widget installed already
+ cmd = XWALK_QUERY_STR % (self.deviceid, test_wgt)
+ exit_code, ret = shell_command(cmd)
+ if exit_code == -1:
+ return None
+ for line in ret:
+ test_app_id = line.strip('\r\n')
+
+ if test_app_id is None:
+ LOGGER.info("[ test widget \"%s\" not found in target ]"
+ % test_wgt)
+ return None
+
+ return test_app_id
def get_launcher_opt(self, test_launcher, test_ext, test_widget, test_suite, test_set):
- """get test option dict """
+ """
+ get test option dict
+ """
test_opt = {}
+ self._wrt = False
+ self._xwalk = False
+ app_id = None
test_opt["suite_name"] = test_suite
test_opt["launcher"] = test_launcher
- test_opt["test_app_id"] = test_suite
- cmd = ""
- if test_launcher.find('WRTLauncher') != -1:
- test_app_id = None
+ if test_widget is not None and test_widget != "":
+ test_suite = test_widget
+ if test_launcher.find('WRTLauncher') >= 0:
+ self._wrt = True
+ test_opt["launcher"] = WRT_MAIN
client_cmds = test_launcher.strip().split()
wrt_tag = client_cmds[1] if len(client_cmds) > 1 else ""
test_opt['fuzzy_match'] = fuzzy_match = wrt_tag.find('z') != -1
test_opt['auto_iu'] = auto_iu = wrt_tag.find('iu') != -1
test_opt['self_exec'] = wrt_tag.find('a') != -1
test_opt['self_repeat'] = wrt_tag.find('r') != -1
- test_opt["launcher"] = "wrt-launcher"
- # test suite need to be installed
- if auto_iu:
- test_wgt = test_set
- test_wgt_path = WGT_LOCATION % (test_suite, test_set)
- if not self.install_app(test_wgt_path):
- LOGGER.info("[ failed to install widget \"%s\" in target ]"
- % test_wgt)
- return None
- else:
- test_wgt = test_suite
-
- # query the whether test widget is installed ok
- cmd = WRT_QUERY_STR % (self.deviceid, test_wgt)
- exit_code, ret = shell_command(cmd)
- if exit_code == -1:
- return None
- print 'id', ret
- for line in ret:
- items = line.split(':')
- if len(items) < 1:
- continue
- if (fuzzy_match and items[0].find(test_wgt) != -1) or items[0] == test_wgt:
- test_app_id = items[1].strip('\r\n')
- break
+ app_id = self._get_wrt_app(test_suite, test_set, fuzzy_match, auto_iu)
+ elif test_launcher.find('xwalk') >= 0 and len(test_launcher) <= 16:
+ self._xwalk = True
+ test_opt["launcher"] = XWALK_MAIN
+ client_cmds = test_launcher.strip().split()
+ xpk_tag = client_cmds[1] if len(client_cmds) > 1 else ""
+ test_opt['fuzzy_match'] = fuzzy_match = xpk_tag.find('z') != -1
+ test_opt['auto_iu'] = auto_iu = xpk_tag.find('iu') != -1
+ test_opt['self_exec'] = xpk_tag.find('a') != -1
+ test_opt['self_repeat'] = xpk_tag.find('r') != -1
+ app_id = self._get_xwalk_app(test_suite, test_set, fuzzy_match, auto_iu)
+ else:
+ app_id = test_launcher
- if test_app_id is None:
- LOGGER.info("[ test widget \"%s\" not found in target ]"
- % test_wgt)
- return None
- else:
- test_opt["test_app_id"] = test_app_id
+ if app_id is None:
+ return None
+ test_opt["test_app_id"] = app_id
return test_opt
def start_debug(self, dlogfile):
debug_flag = False
def launch_app(self, wgt_name):
- timecnt = 0
blauched = False
- print 'widget', wgt_name
- cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
- exit_code, ret = shell_command(cmdline)
- cmdline = WRT_START_STR % (self.deviceid, wgt_name)
- while timecnt < 3:
+ if self._wrt:
+ timecnt = 0
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ cmdline = WRT_START_STR % (self.deviceid, wgt_name)
+ while timecnt < 3:
+ exit_code, ret_out, ret_err = shell_command_ext(cmdline, 30)
+ if exit_code == "0":
+ blauched = True
+ break
+ timecnt += 1
+ time.sleep(3)
+ elif self._xwalk:
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ for line in ret:
+ cmd = APP_KILL_STR % (self.deviceid, line.strip('\r\n'))
+ exit_code, ret = shell_command(cmd)
+ cmdline = XWALK_START_STR % (self.deviceid, wgt_name)
exit_code, ret = shell_command(cmdline)
- if len(ret) > 0 and ret[0].find('launched') != -1:
- blauched = True
- break
- timecnt += 1
time.sleep(3)
+ blauched = True
+ else:
+ cmdline = APP_NONBLOCK_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ time.sleep(3)
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ if ret and len(ret):
+ blauched = True
+
return blauched
def kill_app(self, wgt_name):
- cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
- exit_code, ret = shell_command(cmdline)
+ if self._wrt:
+ cmdline = WRT_STOP_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmdline)
+ elif self._xwalk:
+ cmd = APP_QUERY_STR % (self.deviceid, wgt_name)
+ exit_code, ret = shell_command(cmd)
+ for line in ret:
+ cmd = APP_KILL_STR % (line.strip('\r\n'))
+ exit_code, ret = shell_command(cmd)
return True
def install_app(self, wgt_path="", timeout=90):
- cmd = WRT_INSTALL_STR % (self.deviceid, wgt_path)
+ if self._wrt:
+ cmd = WRT_INSTALL_STR % (self.deviceid, wgt_path)
+ elif self._xwalk:
+ cmd = XWALK_INSTALL_STR % (self.deviceid, wgt_path)
+ else:
+ return True
exit_code, ret = shell_command(cmd, timeout)
if exit_code == -1:
cmd = APP_QUERY_STR % (self.deviceid, wgt_path)
return True
def uninstall_app(self, wgt_name):
- cmd = WRT_UNINSTL_STR % (self.deviceid, wgt_name)
+ if self._wrt:
+ cmd = WRT_UNINSTL_STR % (self.deviceid, wgt_name)
+ elif self._xwalk:
+ cmd = XWALK_UNINSTL_STR % (self.deviceid, wgt_name)
+ else:
+ return True
exit_code, ret = shell_command(cmd)
return True
def get_target_conn(deviceid=None):
""" Get connection for Test Target"""
if deviceid is None or '@' not in deviceid:
- raise InvalidDeviceException('deviceid("username@ip") required by TIZEN-IVI device!')
+ raise InvalidDeviceException('deviceid("username@ip") required by SSH connection!')
return tizenIVI(deviceid)
import re
from shutil import copyfile
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command, shell_command_ext
-from commodule.killall import killall
-from commodule.config import Config
+from testkitlite.util.log import LOGGER
+from testkitlite.util.autoexec import shell_command, shell_command_ext
+from testkitlite.util.killall import killall
HOST_NS = "127.0.0.1"
os.environ['no_proxy'] = HOST_NS
killall(proc.pid)
-class tizenpcPC:
+class tizenHost:
""" Implementation for transfer data on TIZEN localhost
"""
self.deviceid = "localhost"
self._wrt = False
self._xwalk = False
- self._extension = ""
+ self.support_remote = False
+
+ def is_support_remote(self):
+ return self.support_remote
def shell_cmd(self, cmd="", timeout=15):
return shell_command(cmd, timeout)
elif test_launcher.find('xwalk') >= 0 and len(test_launcher) <= 16:
self._xwalk = True
test_opt["launcher"] = XWALK_MAIN
- self._extension = Config.get_extension(test_ext)
client_cmds = test_launcher.strip().split()
xpk_tag = client_cmds[1] if len(client_cmds) > 1 else ""
test_opt['fuzzy_match'] = fuzzy_match = xpk_tag.find('z') != -1
global debug_flag, metux
debug_flag = True
metux = threading.Lock()
- #cmdline = DLOG_CLEAR
- #exit_code, ret = shell_command(cmdline)
- #cmdline = DLOG_WRT
- #threading.Thread(target=debug_trace, args=(cmdline, dlogfile)).start()
def stop_debug(self):
global debug_flag, metux
def get_target_conn():
""" Get connection for Test Target"""
- return tizenpcPC()
+ return tizenHost()
import shutil
import xml.etree.ElementTree as etree
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command, shell_command_ext
-from commodule.killall import killall
-from commodule.config import Config
-from commodule.connector import InvalidDeviceException
+from testkitlite.util.log import LOGGER
+from testkitlite.util.autoexec import shell_command, shell_command_ext
+from testkitlite.util.killall import killall
+from testkitlite.util.errors import InvalidDeviceException
LOCAL_HOST_NS = "127.0.0.1"
self.deviceid = device_id
self._wrt = False
self._xwalk = False
- self._extension = ""
+ self.support_remote = True
+
+ def is_support_remote(self):
+ return self.support_remote
def shell_cmd(self, cmd="", timeout=15):
cmdline = SDB_COMMAND % (self.deviceid, cmd)
elif test_launcher.find('xwalk') >= 0 and len(test_launcher) <= 16:
self._xwalk = True
test_opt["launcher"] = XWALK_MAIN
- self._extension = Config.get_extension(test_ext)
client_cmds = test_launcher.strip().split()
xpk_tag = client_cmds[1] if len(client_cmds) > 1 else ""
test_opt['fuzzy_match'] = fuzzy_match = xpk_tag.find('z') != -1
build_info['model'] = ''
return build_info
-
def get_target_conn(device_id=None):
""" Get connection for Test Target"""
if device_id is None:
# Zhang, Huihui <huihuix.zhang@intel.com>
# Wendong,Sui <weidongx.sun@intel.com>
-SUBDIRS = default
-
testkitliteenginesdir = /usr/lib/python2.7/dist-packages/testkitlite/engines
-dist_testkitliteengines_SCRIPTS = __init__.py
+dist_testkitliteengines_SCRIPTS = *.py
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Chengtao,Liu <chengtaox.liu@intel.com>
+""" The implementation of pyunit test engine"""
+
+import os
+import time
+import sys
+import threading
+import uuid
+import StringIO
+import unittest
+from unittest import TestResult
+from datetime import datetime
+from testkitlite.util.log import LOGGER
+from testkitlite.util.result import TestSetResut
+
+
+ANDROID_UNIT_STATUS = "INSTRUMENTATION_STATUS:"
+ANDROID_UNIT_STATUS_CODE = "INSTRUMENTATION_STATUS_CODE:"
+ANDROID_UNIT_STATUS_LEN = len(ANDROID_UNIT_STATUS)
+ANDROID_UNIT_STATUS_CODE_LEN = len(ANDROID_UNIT_STATUS_CODE)
+ANDROID_UNIT_START = "am instrument -r -w -e class %s %s/android.test.InstrumentationTestRunner"
+DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
+result_buffer = None
+
+
+def _case_create(case_id, purpose, status, message):
+ _case = dict()
+ _case['case_id'] = case_id
+ _case['purpose'] = case_id
+ _case['start_at'] = datetime.now().strftime(DATE_FORMAT_STR)
+ if status == '-2':
+ _case['result'] = 'FAIL'
+ elif status == '0':
+ _case['result'] = 'PASS'
+ _case['stdout'] = '[message]' + message
+ _case['end_at'] = datetime.now().strftime(DATE_FORMAT_STR)
+ return _case
+
+
+def _adunit_lines_handler(outstr):
+ """android unittest result wrapper"""
+ lines = outstr.split('\r\n')
+ results = []
+ b_stack = False
+ case_id = purpose = result = message = ''
+ for line in lines:
+ if line.startswith(ANDROID_UNIT_STATUS):
+ content = line[ANDROID_UNIT_STATUS_LEN:].strip()
+ if content.startswith('test='):
+ result = message = ''
+ b_stack = False
+ case_id = content[content.find('test=')+5:]
+ purpose = case_id
+ elif content.startswith('stack='):
+ message = content[content.find('stack=')+6:]
+ b_stack = True
+ elif line.startswith(ANDROID_UNIT_STATUS_CODE):
+ status = line[ANDROID_UNIT_STATUS_CODE_LEN:].strip()
+ if status != '1': # FAIL / PASS
+ results.append(_case_create(case_id, purpose, status, message))
+ else:
+ if b_stack:
+ message += line
+ result_buffer.extend_result(results)
+
+
+def _adunit_test_exec(conn, test_session, test_set_path, result_obj):
+ """function for running core tests"""
+ global result_buffer
+ result_buffer = result_obj
+ result_obj.set_status(0)
+ LOGGER.info('[ android unit test, entry: %s ]' % test_set_path)
+ test_cmd = ANDROID_UNIT_START % (test_set_path, '.'.join(test_set_path.split('.')[:-1]))
+ _code, _out, _error = conn.shell_cmd_ext(cmd=test_cmd, timeout=None, boutput=True, callbk=_adunit_lines_handler)
+ result_obj.set_status(1)
+
+
+class TestWorker(object):
+
+ """Test executor for testkit-lite"""
+
+ def __init__(self, conn):
+ super(TestWorker, self).__init__()
+ self.conn = conn
+ self.server_url = None
+ self.result_obj = None
+ self.opts = dict({'block_size': 300,
+ 'test_type': None,
+ 'auto_iu': False,
+ 'fuzzy_match': False,
+ 'self_exec': False,
+ 'self_repeat': False,
+ 'debug_mode': False
+ })
+
+ def init_test(self, params):
+ """init the test envrionment"""
+ self.opts['testset_name'] = params.get('testset-name', '')
+ self.opts['testsuite_name'] = params.get('testsuite-name', '')
+ self.opts['debug_log_base'] = params.get("debug-log-base", '')
+ return str(uuid.uuid1())
+
+ def run_test(self, sessionid, test_set):
+ """
+ process the execution for a test set
+ """
+ if sessionid is None:
+ return False
+
+ # start debug trace thread
+ self.conn.start_debug(self.opts['debug_log_base'])
+ time.sleep(1)
+ self.result_obj = TestSetResut(
+ self.opts['testsuite_name'], self.opts['testset_name'])
+ self.opts['async_th'] = threading.Thread(
+ target=_adunit_test_exec,
+ args=(self.conn, sessionid, test_set['test_set_src'], self.result_obj)
+ )
+ self.opts['async_th'].start()
+ return True
+
+ def get_test_status(self, sessionid):
+ """poll the test task status"""
+ if sessionid is None:
+ return None
+ result = {}
+ result["msg"] = []
+ result["finished"] = str(self.result_obj.get_status())
+ return result
+
+ def get_test_result(self, sessionid):
+ """get the test result for a test set """
+ result = {}
+ if sessionid is None:
+ return result
+
+ result = self.result_obj.get_result()
+ return result
+
+ def finalize_test(self, sessionid):
+ """clear the test stub and related resources"""
+ if sessionid is None:
+ return False
+
+ if self.result_obj is not None:
+ self.result_obj.set_status(1)
+
+ # stop debug thread
+ self.conn.stop_debug()
+
+ return True
#
# Authors:
# Chengtao,Liu <chengtaox.liu@intel.com>
-""" The implementation test worker"""
+""" The implementation of default test engine"""
import os
import time
import ConfigParser
from datetime import datetime
-from commodule.log import LOGGER
-from commodule.str2 import str2str
-from commodule.httprequest import get_url, http_request
+from testkitlite.util.log import LOGGER
+from testkitlite.util.httprequest import get_url, http_request
+from testkitlite.util.result import TestSetResut
+
CNT_RETRY = 10
DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
FILES_ROOT = os.path.expanduser("~") + os.sep
-class TestSetResut(object):
-
- """ test result """
-
- _progress = "execute case: %s # %s...(%s)"
- _mutex = threading.Lock()
-
- def __init__(self, testsuite_name="", testset_name=""):
- self._suite_name = testsuite_name
- self._set_name = testset_name
- self._result = {"cases": []}
- self._finished = 0
-
- def set_status(self, flag=0):
- """set finished tag"""
- self._mutex.acquire()
- self._finished = flag
- self._mutex.release()
-
- def get_status(self):
- """return finished tag"""
- self._mutex.acquire()
- flag = self._finished
- self._mutex.release()
- return flag
-
- def set_result(self, tresult):
- """set cases result to result buffer"""
- self._mutex.acquire()
- self._result = tresult
- self._mutex.release()
-
- def extend_result(self, cases_result=None, print_out=True):
- """update cases result to the result buffer"""
- self._mutex.acquire()
- if cases_result is not None:
- self._result["cases"].extend(cases_result)
-
- if print_out:
- for case_it in cases_result:
- LOGGER.info(self._progress %
- (self._suite_name, case_it['case_id'], case_it['result']))
- if case_it['result'].lower() in ['fail', 'block'] and 'stdout' in case_it:
- LOGGER.info(str2str(case_it['stdout']))
- self._mutex.release()
-
- def get_result(self):
- """get cases result from the result buffer"""
- self._mutex.acquire()
- result = self._result
- self._mutex.release()
- return result
-
-
-def _print_dlog(dlog_file):
- if os.path.exists(dlog_file):
- LOGGER.info('[ start of dlog message ]')
- readbuff = file(dlog_file, "r")
- for line in readbuff.readlines():
- LOGGER.info(line.strip('\n'))
- LOGGER.info('[ end of dlog message ]')
-
-
def _core_test_exec(conn, test_session, test_set_name, exetype, cases_queue, result_obj):
"""function for running core tests"""
exetype = exetype.lower()
"""Test executor for testkit-lite"""
- def __init__(self, client=None):
+ def __init__(self, conn):
super(TestWorker, self).__init__()
- self.conn = client
+ self.conn = conn
self.server_url = None
self.result_obj = None
self.opts = dict({'block_size': 300,
+++ /dev/null
-# Copyright (C) 2012 Intel Corporation
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# Authors:
-# Zhang, Huihui <huihuix.zhang@intel.com>
-# Wendong,Sui <weidongx.sun@intel.com>
-
-testkitliteenginesdefaultdir = /usr/lib/python2.7/dist-packages/testkitlite/engines/default
-dist_testkitliteenginesdefault_SCRIPTS = *.py
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Chengtao,Liu <chengtaox.liu@intel.com>
+""" The implementation of pyunit test engine"""
+
+import os
+import time
+import sys
+import threading
+import uuid
+import StringIO
+import unittest
+from unittest import TestResult
+from datetime import datetime
+from testkitlite.util.log import LOGGER
+from testkitlite.util.result import TestSetResut
+
+
+DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
+result_buffer = None
+class LiteTestResult(TestResult):
+
+ """Python unittest result wrapper"""
+
+ def startTest(self, test):
+ super(LiteTestResult, self).startTest(test)
+ self._case = {}
+ case_full_id = test.id()
+ self._case['case_id'] = case_full_id.split('.')[-1]
+ self._case['purpose'] = case_full_id
+ self._case['start_at'] = datetime.now().strftime(DATE_FORMAT_STR)
+
+ def stopTest(self, test):
+ self._case['end_at'] = datetime.now().strftime(DATE_FORMAT_STR)
+ super(LiteTestResult, self).stopTest(test)
+ if result_buffer is not None:
+ result_buffer.extend_result([self._case])
+
+ def addSuccess(self, test):
+ super(LiteTestResult, self).addSuccess(test)
+ self._case['result'] = 'PASS'
+
+ def addError(self, test, err):
+ super(LiteTestResult, self).addError(test, err)
+ _, _exc_str = self.errors[-1]
+ self._case['result'] = 'BLOCK'
+ self._case['stdout'] = '[message]' + _exc_str
+
+ def addFailure(self, test, err):
+ super(LiteTestResult, self).addFailure(test, err)
+ _, _exc_str = self.failures[-1]
+ self._case['result'] = 'FAIL'
+ self._case['stdout'] = '[message]' + _exc_str
+
+
+def _pyunit_test_exec(test_session, test_set_path, result_obj):
+ """function for running core tests"""
+ global result_buffer
+ result_buffer = result_obj
+ result_obj.set_status(0)
+ LOGGER.info('[ pyunit test: %s ]' % test_set_path)
+ try:
+ tests = unittest.TestLoader().discover(test_set_path)
+ unittest.TextTestRunner(resultclass=LiteTestResult, buffer=True).run(tests)
+ except ImportError as error:
+ pass
+ result_obj.set_status(1)
+
+
+class TestWorker(object):
+
+ """Test executor for testkit-lite"""
+
+ def __init__(self, conn):
+ super(TestWorker, self).__init__()
+ self.conn = conn
+ self.server_url = None
+ self.result_obj = None
+ self.opts = dict({'block_size': 300,
+ 'test_type': None,
+ 'auto_iu': False,
+ 'fuzzy_match': False,
+ 'self_exec': False,
+ 'self_repeat': False,
+ 'debug_mode': False
+ })
+
+ def init_test(self, params):
+ """init the test envrionment"""
+ self.opts['testset_name'] = params.get('testset-name', '')
+ self.opts['testsuite_name'] = params.get('testsuite-name', '')
+ self.opts['debug_log_base'] = params.get("debug-log-base", '')
+ return str(uuid.uuid1())
+
+ def run_test(self, sessionid, test_set):
+ """
+ process the execution for a test set
+ """
+ if sessionid is None:
+ return False
+
+ # start debug trace thread
+ self.conn.start_debug(self.opts['debug_log_base'])
+ time.sleep(1)
+ self.result_obj = TestSetResut(
+ self.opts['testsuite_name'], self.opts['testset_name'])
+ self.opts['async_th'] = threading.Thread(
+ target=_pyunit_test_exec,
+ args=(sessionid, test_set['test_set_src'], self.result_obj)
+ )
+ self.opts['async_th'].start()
+ return True
+
+ def get_test_status(self, sessionid):
+ """poll the test task status"""
+ if sessionid is None:
+ return None
+ result = {}
+ result["msg"] = []
+ result["finished"] = str(self.result_obj.get_status())
+ return result
+
+ def get_test_result(self, sessionid):
+ """get the test result for a test set """
+ result = {}
+ if sessionid is None:
+ return result
+
+ result = self.result_obj.get_result()
+ return result
+
+ def finalize_test(self, sessionid):
+ """clear the test stub and related resources"""
+ if sessionid is None:
+ return False
+
+ if self.result_obj is not None:
+ self.result_obj.set_status(1)
+
+ # stop debug thread
+ self.conn.stop_debug()
+
+ return True
# Zhang, Huihui <huihuix.zhang@intel.com>
# Wendong,Sui <weidongx.sun@intel.com>
-testkitlitecommondir = /usr/lib/python2.7/dist-packages/testkitlite/common
-dist_testkitlitecommon_SCRIPTS = *.py
+testkitliteutildir = /usr/lib/python2.7/dist-packages/testkitlite/util
+dist_testkitliteutil_SCRIPTS = *.py
import time
import subprocess
-from .killall import killall
-from .str2 import str2str
+from testkitlite.util.killall import killall
+from testkitlite.util.str2 import str2str
def shell_command(cmd, timeout=15):
timeout=None,
boutput=False,
stdout_file=None,
- stderr_file=None):
+ stderr_file=None,
+ callbk=None):
"""shell executor, return [exitcode, stdout/stderr]
timeout: None means unlimited timeout
boutput: specify whether print output during the command running
def print_log():
"""print the stdout to terminate"""
- sys.stdout.write(rbuffile1.read())
- sys.stdout.write(rbuffile2.read())
- sys.stdout.flush()
+ if callbk and callable(callbk):
+ callbk(rbuffile1.read())
+ else:
+ sys.stdout.write(rbuffile1.read())
+ sys.stdout.write(rbuffile2.read())
+ sys.stdout.flush()
while True:
exit_code = cmd_open.poll()
# Liu,chengtao <chengtaox.liu@intel.com>
"""Test connector for test instance and target instance"""
-from .log import LOGGER
+from testkitlite.util.log import LOGGER
-class InvalidDeviceException(Exception):
- """
- Device_Id not defined / Invalid Exception
- """
- __data = ""
- def __init__(self, data):
- self.__data = data
- def __str__(self):
- return self.__data
-
-
-class Connector:
+class ConnectorBuilder:
"""Communication module for test host and test remote"""
def __init__(self, config):
self.conn = None
- if "testmode" in config:
+ if "commodule" in config:
try:
- exec "from impl.%s import get_target_conn" % config[
- "testmode"]
+ exec "from testkitlite.commodule.%s import get_target_conn" % config[
+ "commodule"]
device_no = config.get('deviceid', None)
if device_no is not None:
self.conn = get_target_conn(device_no)
else:
self.conn = get_target_conn()
except Exception as error:
- LOGGER.error("[ Error: Initialize communication failed,"
- " exception: % s]\n" % error)
+ LOGGER.error("[ Error: Initialize commodule failed: '%s']\n" % error)
def get_connector(self):
"""list the handler instance"""
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2013 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Liu,chengtao <liux.chengtao@intel.com>
+""" The definition of exceptions"""
+
+
+class InvalidDeviceException(Exception):
+ """
+ Device_Id not defined / Invalid Exception
+ """
+ __data = ""
+ def __init__(self, data):
+ self.__data = data
+
+ def __str__(self):
+ return self.__data
+
+class TestCaseNotFoundException(Exception):
+ """
+ Test case not found Exception
+ """
+ __data = ""
+ def __init__(self, data):
+ self.__data = data
+
+ def __str__(self):
+ return self.__data
+
+class TestEngineException(Exception):
+ """
+ Test case not found Exception
+ """
+ def __init__(self, name):
+ self.__engine = name
+
+ def __str__(self):
+ return "Failed to load test engine '%s'" % self.__engine
\ No newline at end of file
import re
import dbus
import time
-from commodule.log import LOGGER
-from commodule.killall import killall
-from commodule.autoexec import shell_command
+from testkitlite.util.log import LOGGER
+from testkitlite.util.killall import killall
+from testkitlite.util.autoexec import shell_command
DEVICE_DBUS = "testkit-lite-dbus"
DEVICE_WHITE_LIST = ['localhost', '127.0.0.1']
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Chengtao,Liu <chengtaox.liu@intel.com>
+""" The implementation test result"""
+
+import threading
+from testkitlite.util.log import LOGGER
+from testkitlite.util.str2 import str2str
+
+
+class TestSetResut(object):
+
+ """ test result """
+
+ _progress = "execute case: %s # %s...(%s)"
+ _mutex = threading.Lock()
+
+ def __init__(self, testsuite_name="", testset_name=""):
+ self._suite_name = testsuite_name
+ self._set_name = testset_name
+ self._result = {"cases": []}
+ self._finished = 0
+
+ def set_status(self, flag=0):
+ """set finished tag"""
+ self._mutex.acquire()
+ self._finished = flag
+ self._mutex.release()
+
+ def get_status(self):
+ """return finished tag"""
+ self._mutex.acquire()
+ flag = self._finished
+ self._mutex.release()
+ return flag
+
+ def set_result(self, tresult):
+ """set cases result to result buffer"""
+ self._mutex.acquire()
+ self._result = tresult
+ self._mutex.release()
+
+ def extend_result(self, cases_result=None, print_out=True):
+ """update cases result to the result buffer"""
+ self._mutex.acquire()
+ if cases_result is not None:
+ self._result["cases"].extend(cases_result)
+
+ if print_out:
+ for case_it in cases_result:
+ LOGGER.info(self._progress %
+ (self._suite_name, case_it['case_id'], case_it['result']))
+ if case_it['result'].lower() in ['fail', 'block'] and 'stdout' in case_it:
+ LOGGER.info(str2str(case_it['stdout']))
+ self._mutex.release()
+
+ def get_result(self):
+ """get cases result from the result buffer"""
+ self._mutex.acquire()
+ result = self._result
+ self._mutex.release()
+ return result
from shutil import move
from os import remove
import copy
-from commodule.log import LOGGER
-from commodule.autoexec import shell_command
-from commodule.str2 import str2xmlstr
-
-
-from .worker import TestWorker
+from testkitlite.util.log import LOGGER
+from testkitlite.util.str2 import str2xmlstr
+from testkitlite.util.errors import TestCaseNotFoundException
JOIN = os.path.join
DIRNAME = os.path.dirname
OPT_STUB = 'stub-name'
OPT_SUITE = 'testsuite-name'
OPT_SET = 'testset-name'
+OPT_test_set_src = 'test-set-src'
-class TestCaseNotFoundException(Exception):
- """
- Test case not found Exception
- """
- __data = ""
- def __init__(self, data):
- self.__data = data
- def __str__(self):
- return self.__data
-
-class TRunner:
+class TestSession:
"""
Parse the testdefinition.xml files.
Conduct tests execution.
"""
- def __init__(self, connector):
+ def __init__(self, connector, worker):
""" init all self parameters here """
# dryrun
self.bdryrun = False
self.resultfiles = set()
self.core_auto_files = []
self.core_manual_files = []
+ self.unit_test_files = []
self.skip_all_manual = False
self.testsuite_dict = {}
self.exe_sequence = []
self.first_run = True
self.deviceid = None
self.session_id = None
- self.pid_log = None
self.set_parameters = {}
self.connector = connector
- self.testworker = TestWorker(connector)
self.stub_name = "testkit-stub"
+ self.testworker = worker
self.capabilities = {}
self.has_capability = False
self.rerun = False
# apply user specify test result file
if options.resultfile:
self.resultfile = options.resultfile
- # set the external test WRTLauncher
+ # set the external test
if options.exttest:
self.external_test = options.exttest
if options.debug:
self.rerun = options.rerun
if options.test_prefix:
self.test_prefix = options.test_prefix
-
- def set_pid_log(self, pid_log):
- """ get pid_log file """
- self.pid_log = pid_log
+ if options.worker:
+ self.worker_name = options.worker
def add_filter_rules(self, **kargs):
"""
def prepare_run(self, testxmlfile, resultdir=None):
"""
testxmlfile: target testxml file
- execdir and resultdir: should be the absolute path since TRunner
+ execdir and resultdir: should be the absolute path since TestSession
is the common lib
"""
# resultdir is set to current directory by default
def __split_test_xml(self, resultfile, resultdir):
""" split_test_xml into auto and manual"""
- casefind = etree.parse(resultfile).getiterator('testcase')
- if casefind:
+ setfind = etree.parse(resultfile).getiterator('set')
+ if setfind:
test_file_name = "%s" % BASENAME(resultfile)
test_file_name = os.path.splitext(test_file_name)[0]
self.__splite_external_test(
resultfile, test_file_name, resultdir)
- def __splite_core_test(self, resultfile):
- """select core test"""
- if self.filter_rules["execution_type"] == ["auto"]:
- self.core_auto_files.append(resultfile)
- else:
- self.core_manual_files.append(resultfile)
- self.resultfiles.add(resultfile)
-
def __splite_external_test(self, resultfile, test_file_name, resultdir):
"""select external_test"""
testsuite_dict_value_list = []
else:
self.core_manual_files.append(suitefilename)
self.resultfiles.add(suitefilename)
+ else:
+ self.unit_test_files.append(suitefilename)
+ self.resultfiles.add(suitefilename)
+
filename_diff += 1
if testsuite_dict_add_flag:
self.testsuite_dict[test_file_name] = testsuite_dict_value_list
case_ids = self.filter_rules.get('id')
if case_ids and not self.filter_ok:
raise TestCaseNotFoundException('Test case %s not found!' % case_ids)
+
# run core auto cases
self.__run_core_auto()
# run core manual cases
self.__run_core_manual()
+ # run unit test cases
+ self.__run_unit_test()
+
def __run_core_auto(self):
""" core auto cases run"""
self.core_auto_files.sort()
time.sleep(3)
LOGGER.info("\n[ testing xml: %s.xml ]" % temp_test_xml)
self.current_test_xml = temp_test_xml
- self.__run_with_commodule(core_auto_file)
+ self.__run_with_worker(core_auto_file)
def __run_core_manual(self):
""" core manual cases run """
if self.non_active:
self.skip_all_manual = True
else:
- self.__run_with_commodule(core_manual_file)
+ self.__run_with_worker(core_manual_file)
+
+ def __run_unit_test(self):
+ """ unit test cases run """
+ for ut_file in self.unit_test_files:
+ temp_test_xml = os.path.splitext(ut_file)[0]
+ temp_test_xml = os.path.splitext(temp_test_xml)[0]
+ temp_test_xml = os.path.splitext(temp_test_xml)[0]
+ temp_test_xml += ".auto"
+ # print identical xml file name
+ if self.current_test_xml != temp_test_xml:
+ time.sleep(3)
+ LOGGER.info("\n[ testing xml: %s.xml ]" % temp_test_xml)
+ self.current_test_xml = temp_test_xml
+ self.__run_with_worker(ut_file)
def __run_webapi_test(self, latest_dir):
""" run webAPI test"""
if self.bdryrun:
- LOGGER.info("[ WRTLauncher mode does not support dryrun ]")
+ LOGGER.info("[ Web Test mode does not support dryrun ]")
return True
list_auto = []
% JOIN(latest_dir, webapi_total_file))
self.current_test_xml = JOIN(latest_dir, webapi_total_file)
- self.__run_with_commodule(webapi_file)
+ self.__run_with_worker(webapi_file)
- def __run_with_commodule(self, webapi_file):
+ def __run_with_worker(self, suite_test_xml):
"""run_with_commodule,Initialization,check status,get result"""
try:
# prepare test set list
- test_xml_set_list = self.__split_xml_to_set(webapi_file)
+ test_xml_set_list = self.__split_xml_to_set(suite_test_xml)
# create temporary parameter
for test_xml_set in test_xml_set_list:
LOGGER.info("\n[ run set: %s ]" % test_xml_set)
for test_xml_set in test_xml_set_list:
test_xml_set_tmp = etree.parse(test_xml_set)
set_keep_number = 1
- # LOGGER.debug("[ process set: %s ]" % test_xml_set)
for temp_suite in test_xml_set_tmp.getiterator('suite'):
for test_xml_set_temp_set in temp_suite.getiterator('set'):
if set_keep_number != set_number:
set_keep_number += 1
set_number -= 1
test_xml_set_tmp.write(test_xml_set)
- for empty_set in test_xml_set_list_empty:
- LOGGER.debug("[ remove empty set: %s ]" % empty_set)
- test_xml_set_list.remove(empty_set)
- self.resultfiles.discard(empty_set)
+ # for empty_set in test_xml_set_list_empty:
+ # LOGGER.debug("[ remove empty set: %s ]" % empty_set)
+ # test_xml_set_list.remove(empty_set)
+ # self.resultfiles.discard(empty_set)
if len(test_xml_set_list) > 1:
test_xml_set_list.reverse()
return test_xml_set_list
for total_set in total_suite.getiterator('set'):
for result_suite in result_xml.getiterator('suite'):
for result_set in result_suite.getiterator('set'):
- # when total xml and result xml have same suite
- # name and set name
+ # when total xml and result xml have same suite, set
self.__merge_result_by_name(
result_set, total_set, result_suite, total_suite)
total_xml.write(totalfile)
and result_suite.get('name') == total_suite.get('name'):
if result_set.get('set_debug_msg'):
total_set.set("set_debug_msg", result_set.get('set_debug_msg'))
- # set cases that doesn't have result in result \
- # set to N/A
- # append cases from result set to total set
result_case_iterator = result_set.getiterator(
'testcase')
if result_case_iterator:
- # LOGGER.info("----[ suite: %s, set: %s, time: %s ]"
- #% (result_suite.get('name'), result_set.get('name'),
- # datetime.today().strftime("%Y-%m-%d_%H_%M_%S")))
for result_case in result_case_iterator:
try:
self.__count_result(result_case)
def __count_result(self, result_case):
""" record the pass,failed,block,N/A case number"""
-
if not result_case.get('result'):
result_case.set('result', 'N/A')
# add empty result node structure for N/A case
"casecount", str(len(tset.getiterator('testcase')))
)
parameters.setdefault("current_set_name", xml_set_tmp)
+ if tset.get("test_set_src") is not None:
+ set_entry = self.test_prefix + tset.get("test_set_src")
+ parameters.setdefault("test_set_src", set_entry)
for tcase in tset.getiterator('testcase'):
case_detail_tmp = {}
step_tmp = []
- parameters.setdefault(
- "exetype", tcase.get('execution_type')
- )
-
+ parameters.setdefault("exetype", tcase.get('execution_type'))
parameters.setdefault("type", tcase.get('type'))
case_detail_tmp.setdefault("case_id", tcase.get('id'))
case_detail_tmp.setdefault("purpose", tcase.get('purpose'))
case_tmp.append(case_detail_tmp)
case_order += 1
parameters.setdefault("cases", case_tmp)
+ parameters.setdefault("exetype", "")
+ parameters.setdefault("type", "")
if self.bdryrun:
parameters.setdefault("dryrun", True)
self.set_parameters = parameters
'''
# check test running or end
# if the status id end return True ,else return False
-
session_status = self.testworker.get_test_status(self.session_id)
- # session_status["finished"] == "0" is running
- # session_status["finished"] == "1" is end
if not session_status == None:
if session_status["finished"] == "0":
progress_msg_list = session_status["msg"]
def get_capability(self, file_name):
"""get_capability from file """
-
+ if file_name is None:
+ return True
capability_xml = file_name
capabilities = {}
try:
return False
def __write_set_result(self, testxmlfile, result):
- '''
+ """
get the result JSON form com_module,
write them to orignal testxmlfile
-
- '''
+ """
# write the set_result to set_xml
set_result_xml = testxmlfile
# covert JOSN to python dict string
set_result = result
if 'resultfile' in set_result:
- self.__write_file_result(set_result_xml, set_result)
+ write_file_result(set_result_xml, set_result, self.debug_log_file)
else:
write_json_result(set_result_xml, set_result, self.debug_log_file)
- def __write_file_result(self, set_result_xml, set_result):
- """write xml result file"""
- result_file = set_result['resultfile']
- try:
- if self.rerun:
- LOGGER.info("[ Web UI FW Unit Test Does not support rerun.\
- Result should be N/A ]\n")
- else:
- test_tree = etree.parse(set_result_xml)
- test_em = test_tree.getroot()
- result_tree = etree.parse(result_file)
- result_em = result_tree.getroot()
- dubug_file = BASENAME(self.debug_log_file)
- for result_suite in result_em.getiterator('suite'):
- for result_set in result_suite.getiterator('set'):
- for test_suite in test_em.getiterator('suite'):
- for test_set in test_suite.getiterator('set'):
- if result_set.get('name') == \
- test_set.get('name'):
- result_set.set("set_debug_msg", dubug_file)
- test_suite.remove(test_set)
- test_suite.append(result_set)
- test_tree.write(set_result_xml)
- os.remove(result_file)
- LOGGER.info("[ cases result saved to resultfile ]\n")
- except OSError as error:
- traceback.print_exc()
- LOGGER.error(
- "[ Error: fail to write cases result, error: %s ]\n" % error)
-
def get_capability_form_node(capability_em):
''' splite capability key and value form element tree'''
return summary
-def expand_subcases(tset, tcase, sub_num, result_msg):
+def write_file_result(set_result_xml, set_result, debug_log_file):
+ """write xml result file"""
+ result_file = set_result['resultfile']
+ try:
+ test_tree = etree.parse(set_result_xml)
+ test_em = test_tree.getroot()
+ result_tree = etree.parse(result_file)
+ result_em = result_tree.getroot()
+ dubug_file = BASENAME(debug_log_file)
+ for result_suite in result_em.getiterator('suite'):
+ for result_set in result_suite.getiterator('set'):
+ for test_suite in test_em.getiterator('suite'):
+ for test_set in test_suite.getiterator('set'):
+ if result_set.get('name') == \
+ test_set.get('name'):
+ result_set.set("set_debug_msg", dubug_file)
+ test_suite.remove(test_set)
+ test_suite.append(result_set)
+ test_tree.write(set_result_xml)
+ os.remove(result_file)
+ LOGGER.info("[ cases result saved to resultfile ]\n")
+ except OSError as error:
+ traceback.print_exc()
+ LOGGER.error(
+ "[ Error: fail to write cases result, error: %s ]\n" % error)
+
+
+def __expand_subcases(tset, tcase, sub_num, result_msg):
sub_case_result = result_msg.split("[assert]")[1:]
for i in range(sub_num):
sub_case = copy.deepcopy(tcase)
tset.remove(tcase)
+def __write_by_create(tset, case_results):
+ for case_result in case_results:
+ tcase = etree.Element('testcase')
+ tcase.set('id', case_result['case_id'])
+ tcase.set('purpose', case_result['purpose'])
+ tcase.set('result', case_result['result'].upper())
+ result_info = etree.SubElement(tcase, "result_info")
+ actual_result = etree.SubElement(result_info, "actual_result")
+ actual_result.text = case_result['result'].upper()
+ start = etree.SubElement(result_info, "start")
+ end = etree.SubElement(result_info, "end")
+ stdout = etree.SubElement(result_info, "stdout")
+ stderr = etree.SubElement(result_info, "stderr")
+ if 'start_at' in case_result:
+ start.text = case_result['start_at']
+ if 'end_at' in case_result:
+ end.text = case_result['end_at']
+ if 'stdout' in case_result:
+ stdout.text = str2xmlstr(case_result['stdout'])
+ if 'stderr' in case_result:
+ stderr.text = str2xmlstr(case_result['stderr'])
+ tset.append(tcase)
+
+
+def __write_by_caseid(tset, case_results):
+ for tcase in tset.getiterator('testcase'):
+ for case_result in case_results:
+ if tcase.get("id") == case_result['case_id']:
+ tcase.set('result', case_result['result'].upper())
+ # Check performance test
+ if tcase.find('measurement') is not None:
+ for measurement in tcase.getiterator(
+ 'measurement'):
+ if 'measures' in case_result:
+ m_results = case_result['measures']
+ for m_result in m_results:
+ if measurement.get('name') == \
+ m_result['name'] and 'value' in m_result:
+ measurement.set(
+ 'value', m_result[
+ 'value'])
+ if tcase.find("./result_info") is not None:
+ tcase.remove(tcase.find("./result_info"))
+ result_info = etree.SubElement(tcase, "result_info")
+ actual_result = etree.SubElement(
+ result_info, "actual_result")
+ actual_result.text = case_result['result'].upper()
+ start = etree.SubElement(result_info, "start")
+ end = etree.SubElement(result_info, "end")
+ stdout = etree.SubElement(result_info, "stdout")
+ stderr = etree.SubElement(result_info, "stderr")
+ if 'start_at' in case_result:
+ start.text = case_result['start_at']
+ if 'end_at' in case_result:
+ end.text = case_result['end_at']
+ if 'stdout' in case_result:
+ stdout.text = str2xmlstr(case_result['stdout'])
+ if 'stderr' in case_result:
+ stderr.text = str2xmlstr(case_result['stderr'])
+ if tcase.get("subcase") is not None:
+ sub_num = int(tcase.get("subcase"))
+ result_msg = case_result['stdout']
+ __expand_subcases(tset, tcase, sub_num, result_msg)
+
+
def write_json_result(set_result_xml, set_result, debug_log_file):
''' fetch result form JSON'''
dubug_file = BASENAME(debug_log_file)
for tset in root_em.getiterator('set'):
tset.set("set_debug_msg", dubug_file)
- for tcase in tset.getiterator('testcase'):
- for case_result in case_results:
- if tcase.get("id") == case_result['case_id']:
- tcase.set('result', case_result['result'].upper())
- # Check performance test
- if tcase.find('measurement') is not None:
- for measurement in tcase.getiterator(
- 'measurement'):
- if 'measures' in case_result:
- m_results = case_result['measures']
- for m_result in m_results:
- if measurement.get('name') == \
- m_result['name'] and 'value' in m_result:
- measurement.set(
- 'value', m_result[
- 'value'])
- if tcase.find("./result_info") is not None:
- tcase.remove(tcase.find("./result_info"))
- result_info = etree.SubElement(tcase, "result_info")
- actual_result = etree.SubElement(
- result_info, "actual_result")
- actual_result.text = case_result['result'].upper()
-
- start = etree.SubElement(result_info, "start")
- end = etree.SubElement(result_info, "end")
- stdout = etree.SubElement(result_info, "stdout")
- stderr = etree.SubElement(result_info, "stderr")
- if 'start_at' in case_result:
- start.text = case_result['start_at']
- if 'end_at' in case_result:
- end.text = case_result['end_at']
- if 'stdout' in case_result:
- stdout.text = str2xmlstr(case_result['stdout'])
- if 'stderr' in case_result:
- stderr.text = str2xmlstr(case_result['stderr'])
- if tcase.get("subcase") is not None:
- sub_num = int(tcase.get("subcase"))
- result_msg = case_result['stdout']
- expand_subcases(tset, tcase, sub_num, result_msg)
+ if tset.get('test_set_src') is not None:
+ __write_by_create(tset, case_results)
+ else:
+ __write_by_caseid(tset, case_results)
parse_tree.write(set_result_xml)
-
LOGGER.info("[ cases result saved to resultfile ]\n")
except IOError as error:
traceback.print_exc()