[public_version]
-version=3.1.3
+version=3.1.4
[internal_version]
-version=3.1.3
+version=3.1.4
url = "https://github.com/testkit/testkit-lite",
author = "Shaofeng Tang",
author_email = "shaofeng.tang@intel.com",
- version = "3.1.0",
+ version = "3.1.4",
include_package_data = True,
data_files = [('/opt/testkit/lite', ['VERSION', 'doc/testkit-lite_user_guide.pdf', 'doc/testkit-lite_tutorial.pdf', 'doc/test_definition_schema.pdf']),
('/opt/testkit/lite/commodule/', ['CONFIG']),
supports browser or other web-runtime"),
make_option("-k", "--worker", dest="worker", action="store",
help="Specify a test engine for execution, use value 'default' by default"),
+ make_option("-p", "--target-platform", dest="targetplatform",
+ action="store",
+                help="Specify the test target platform, e.g. xw_android or chrome_ubuntu"),
+ make_option("--webdriver-url", dest="wdurl", action="store",
+                help="Specify the URL the WebDriver server is listening on"),
make_option("--version", dest="version_info", action="store_true",
help="Show version information"),
make_option("--internal-version", dest="internal_version_info",
--- /dev/null
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Zhang, Huihui <huihuix.zhang@intel.com>
+# Wendong,Sui <weidongx.sun@intel.com>
+
+testkitlitecapdir = /usr/lib/python2.7/dist-packages/testkitlite/capability
+dist_testkitlitecap_SCRIPTS = *.py
--- /dev/null
+def initCapability(test_app_name=None):
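+    """Build the WebDriver environment for driving a local Chromium
+    browser through chromedriver; test pages are loaded straight from
+    the local filesystem."""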
+ capability = {'chrome.binary': '/usr/bin/chromium-browser'}
+ return {'desired_capabilities': capability, 'test_prefix': 'file:///'}
--- /dev/null
+def initCapability(test_app_name=None):
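+    """Build the WebDriver environment for a Crosswalk-packaged Android
+    app; the package and activity names are derived from the suite name
+    passed in as test_app_name, and test pages are loaded from the
+    packaged android_asset/www directory."""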
+ capability = {'xwalkOptions': {'androidPackage': 'org.xwalk.%s' %
+ test_app_name, 'androidActivity': '.%sActivity' % test_app_name}}
+ return {'desired_capabilities': capability, 'test_prefix': 'file:///android_asset/www/'}
--- /dev/null
+import os
+import re
+import time
+import sys
+import thread
+import threading
+import socket
+import json
+import hashlib
+import signal
+import logging
+import subprocess
+import ConfigParser
+from testkitlite.util import tr_utils
+from urlparse import urlparse
+
+TE = None
+EXE_LOCK = threading.Lock()
+DEFAULT_TIMEOUT = 90
+TE_LOG_LEVEL = logging.DEBUG
+REF_SET_TYPE = 'ref'
+JS_SET_TYPE = 'js'
+SCRIPT_SET_TYPE = 'script'
+STR_PASS = 'PASS'
+STR_FAIL = 'FAIL'
+STR_BLOCK = 'BLOCK'
+STR_NOTRUN = 'n/a'
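+# chromedriver's default listening address, used when --webdriver-url is not given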
+DEFAULT_WD_URL = 'http://127.0.0.1:9515'
+MH_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mh.html")
+
+
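+# The executer talks to the runner over a Unix domain socket; every
+# message exchanged is a JSON object of the form
+# {'COMMAND': <name>, 'DATA': <payload>}.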
+class TestExecuter:
+
+ def __init__(self, test_env=None):
+ self.runner_proc = test_env['runner_proc']
+ self.exe_thread = None
+ self.exe_status = 'READY'
+ self.tests_json = ''
+ self.target_platform = test_env['target_platform']
+ self.web_driver = None
+ self.wd_url = test_env['wd_url']
+ self.suite_name = test_env['suite_name']
+ self.set_type = test_env['set_type']
+ self.set_exetype = test_env['set_exetype']
+ self.test_prefix = ''
+ self.exe_socket_file = test_env['exe_socket_file']
+ self.exe_socket_buff_size = test_env['exe_socket_buff_size']
+ self.exe_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.exe_socket.connect(self.exe_socket_file)
+ self.TE_LOG = logging.getLogger("TestExecuter")
+ self.TE_LOG.setLevel(TE_LOG_LEVEL)
+ file_handler = logging.FileHandler(
+ os.path.join(test_env['session_dir'], 'te.log'))
+ file_handler.setLevel(TE_LOG_LEVEL)
+ file_formatter = logging.Formatter(
+ "[TE] %(asctime)s - %(levelname)s - %(message)s")
+ file_handler.setFormatter(file_formatter)
+ self.TE_LOG.addHandler(file_handler)
+ stream_handler = logging.StreamHandler()
+ if test_env['log_debug']:
+ stream_handler.setLevel(logging.DEBUG)
+ else:
+ stream_handler.setLevel(logging.INFO)
+ stream_formatter = logging.Formatter("[TE] %(message)s")
+ stream_handler.setFormatter(stream_formatter)
+ self.TE_LOG.addHandler(stream_handler)
+ signal.signal(signal.SIGINT, self.__exitHandler)
+ signal.signal(signal.SIGTERM, self.__exitHandler)
+
+    def __exitHandler(self, signum, frame):
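+        """Handle SIGINT/SIGTERM: quit the WebDriver session and mark the
+        executer as DONE."""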
+ if self.web_driver:
+ self.web_driver.quit()
+ self.web_driver = None
+ with EXE_LOCK:
+ self.exe_status = 'DONE'
+
+ def __updateTestPrefix(self):
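+        """On Android, when the browser reports an http:// URL after
+        start-up, rebase the test prefix onto that scheme and host."""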
+ if self.target_platform.upper().find('ANDROID') >= 0:
+ url_components = urlparse(self.web_driver.current_url)
+ if url_components.scheme == 'http':
+ self.test_prefix = '%s://%s/' % (url_components.scheme,
+ url_components.netloc)
+
+ def __initWebDriver(self):
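+        """Create (or recreate) the remote WebDriver session using the
+        capability module that matches the target platform."""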
+        # selenium is imported lazily so that script-only test sets can run
+        # without it; the names are promoted to module scope because
+        # WebDriverWait is also used by the manual-test harness code
+        global WebDriver
+        global WebDriverWait
+        from selenium.webdriver.remote.webdriver import WebDriver
+        from selenium.webdriver.support.ui import WebDriverWait
+        if self.web_driver:
+            self.web_driver.quit()
+            self.web_driver = None
+
+ try:
+ if self.wd_url == '':
+ self.wd_url = DEFAULT_WD_URL
+
+ test_app_name = ''
+ if self.target_platform.upper().find('ANDROID') >= 0:
+ test_app_name = self.suite_name.replace('-', '_')
+ self.TE_LOG.debug(
+ 'Got ANDROID platform, update the app name to %s' % test_app_name)
+ else:
+
+ test_app_name = self.suite_name
+
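+            # import the platform-specific capability module, e.g.
+            # testkitlite.capability.chrome_ubuntu or testkitlite.capability.xw_android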
+ exec 'from testkitlite.capability.%s import initCapability' % self.target_platform
+ driver_env = initCapability(test_app_name)
+ self.test_prefix = driver_env['test_prefix']
+ self.web_driver = WebDriver(
+ self.wd_url, driver_env['desired_capabilities'])
+
+ self.__updateTestPrefix()
+ except Exception, e:
+ self.TE_LOG.error('Init Web Driver failed: %s' % e)
+ return False
+ return True
+
+ def __talkWithRunnerRecv(self):
+ try:
+ exe_data = self.exe_socket.recv(self.exe_socket_buff_size)
+ exe_json = json.loads(exe_data)
+ command = exe_json['COMMAND']
+ data = exe_json['DATA']
+ self.TE_LOG.debug('Receive Data: %s' % exe_json)
+ except Exception, e:
+ self.TE_LOG.debug('Receive data failed, %s' % e)
+ time.sleep(2)
+ return (None, None)
+ return (command, data)
+
+ def __talkWithRunnerSend(self, data=None):
+ try:
+ self.TE_LOG.debug('Send Data: %s' % data)
+ self.exe_socket.send(json.dumps(data))
+ except Exception, e:
+ self.TE_LOG.debug('Send data failed, %s' % e)
+ time.sleep(2)
+ return False
+ return True
+
+ def __initWebManualHarness(self):
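+        """Load the manual-test harness page (mh.html) into the browser,
+        either directly (Chrome) or by rewriting the suite's index page,
+        then initialise it via init_mh()."""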
+ if self.target_platform.upper().find('CHROME') >= 0:
+ self.web_driver.get('%s%s' % (self.test_prefix, MH_FILE))
+ else:
+            self.TE_LOG.debug('Test prefix: %s' % self.test_prefix)
+ self.web_driver.get('%s/index.html' % self.test_prefix)
+
+ try:
+ harness_page_file = open(MH_FILE)
+ harness_page_raw = harness_page_file.read()
+ harness_page_file.close()
+ except Exception, e:
+ self.TE_LOG.debug('Read manual harness file failed: %s' % e)
+ return False
+ harness_page_raw = harness_page_raw.replace(
+ '\n', '').replace('"', '\\"').replace("'", "\\'")
+ self.web_driver.execute_script(
+ "document.write(\"%s\")" % harness_page_raw)
+ self.web_driver.execute_script("document.close()")
+ self.web_driver.execute_script("init_mh()")
+
+ return True
+
+ def __runWebManualTests(self):
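+        """Step the tester through each manual case in the harness page
+        and record the PASS/FAIL/BLOCK verdict chosen in the browser."""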
+ if not self.__initWebManualHarness():
+ self.TE_LOG.error(
+ 'Init web manual harness failed, exit from executer')
+ return False
+
+ case_num = len(self.tests_json['cases'])
+ i_case = 0
+ while True:
+ try:
+ if i_case >= (case_num - 1):
+ i_case = case_num - 1
+ self.web_driver.execute_script(
+ "document.getElementById(\"forward-bt\").disabled=\"true\"")
+ else:
+ self.web_driver.execute_script(
+ "document.getElementById(\"forward-bt\").disabled=\"\"")
+
+ if i_case <= 0:
+ i_case = 0
+ self.web_driver.execute_script(
+ "document.getElementById(\"back-bt\").disabled=\"true\"")
+ else:
+ self.web_driver.execute_script(
+ "document.getElementById(\"back-bt\").disabled=\"\"")
+
+ if self.set_type == REF_SET_TYPE:
+ self.web_driver.execute_script(
+ "document.getElementById(\"test-entry\").textContent=\"%s%s\"" % (self.test_prefix, self.tests_json['cases'][i_case]['entry']))
+ self.web_driver.execute_script(
+ "document.getElementById(\"refer-test-entry\").textContent=\"%s%s\"" % (self.test_prefix, self.tests_json['cases'][i_case]['refer_entry']))
+ self.web_driver.execute_script(
+ "document.getElementById(\"run-refer-test-bt\").style.display=\"\"")
+ self.web_driver.execute_script(
+ "document.getElementById(\"refer-test-entry-area\").style.display=\"\"")
+ elif self.set_type == JS_SET_TYPE:
+ self.web_driver.execute_script(
+ "document.getElementById(\"test-entry\").textContent=\"%s%s\"" % (self.test_prefix, self.tests_json['cases'][i_case]['entry']))
+ self.web_driver.execute_script(
+ "document.getElementById(\"run-refer-test-bt\").style.display=\"none\"")
+ self.web_driver.execute_script(
+ "document.getElementById(\"refer-test-entry-area\").style.display=\"none\"")
+
+ WebDriverWait(self.web_driver, 3600).until(lambda strdiff: self.web_driver.execute_script(
+ "return document.getElementById(\"case-info-area\").className") != "READY")
+ i_case_status = self.web_driver.find_element_by_id(
+ "case-info-area").get_attribute("class")
+ self.web_driver.execute_script(
+ "document.getElementById(\"case-info-area\").className = \"READY\"")
+ if i_case_status in [STR_PASS, STR_FAIL, STR_BLOCK]:
+ self.tests_json['cases'][i_case]['result'] = i_case_status
+ self.TE_LOG.info("Cases %s: %s" % (self.tests_json['cases'][i_case][
+ 'case_id'], self.tests_json['cases'][i_case]['result']))
+ i_case = i_case + 1
+ elif i_case_status == "FORWARD":
+ i_case = i_case + 1
+ elif i_case_status == "BACK":
+ i_case = i_case - 1
+ else:
+ break
+ except Exception, e:
+ self.tests_json['cases'][i_case]['result'] = STR_BLOCK
+ self.TE_LOG.error("Run %s: failed: %s, exit from executer" %
+ (self.tests_json['cases'][i_case]['case_id'], e))
+ break
+
+ def __checkPageNotFound(self, page_url=None):
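+        """Return False when Chromium shows its built-in error page
+        (data:text/html,chromewebdata) instead of page_url, True otherwise."""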
+ if self.web_driver.current_url.find('data:text/html,chromewebdata') >= 0:
+ self.TE_LOG.debug("Page not found: %s" %
+ self.web_driver.current_url)
+ return False
+ else:
+ return True
+
+    def __runRefTests(self):
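+        """Run reference (pixel comparison) tests: load the test page and
+        its reference page and compare the MD5 digests of the two
+        screenshots; identical digests count as PASS."""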
+ for i_case in self.tests_json['cases']:
+ with EXE_LOCK:
+ if self.exe_status == 'DONE':
+ return False
+
+ i_case['start_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ try:
+ i_case_timeout = i_case['timeout']
+ self.TE_LOG.debug(
+ "Using special timeout value: %s" % i_case_timeout)
+ except Exception, e:
+ i_case_timeout = DEFAULT_TIMEOUT
+
+ i_page_url = '%s%s' % (self.test_prefix, i_case['entry'])
+ try:
+ self.web_driver.set_page_load_timeout(i_case_timeout)
+ self.web_driver.implicitly_wait(i_case['onload_delay'])
+ self.web_driver.get(i_page_url)
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked by %s" % (i_case['case_id'], e))
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ if not self.__checkPageNotFound(i_page_url):
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked, page not found" % i_case['case_id'])
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ i_case['stdout'] = "page not found"
+ continue
+
+ try:
+ test01_md5 = hashlib.md5(
+ self.web_driver.get_screenshot_as_base64()).hexdigest().upper()
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked by %s" % (i_case['case_id'], e))
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ try:
+ i_refer_case_timeout = i_case['refer_timeout']
+ except Exception, e:
+ i_refer_case_timeout = DEFAULT_TIMEOUT
+
+ i_ref_page_url = '%s%s' % (self.test_prefix, i_case['refer_entry'])
+ try:
+ self.web_driver.set_page_load_timeout(i_refer_case_timeout)
+ self.web_driver.implicitly_wait(i_case['onload_delay'])
+ self.web_driver.get(i_ref_page_url)
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked by %s" % (i_case['case_id'], e))
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ if not self.__checkPageNotFound(i_ref_page_url):
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked, ref page not found" % i_case['case_id'])
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ try:
+ test02_md5 = hashlib.md5(
+ self.web_driver.get_screenshot_as_base64()).hexdigest().upper()
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked by %s" % (i_case['case_id'], e))
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ if test01_md5 == test02_md5:
+ i_case['result'] = STR_PASS
+ else:
+ i_case['result'] = STR_FAIL
+
+ self.TE_LOG.info("Cases %s: %s" %
+ (i_case['case_id'], i_case['result']))
+
+    def __getCaseIndex(self, url):
+        """Return the value of the 'value=' query parameter in a test
+        entry URL, or 0 when the parameter is missing."""
+        try:
+            value_pos = url.find('value')
+            if value_pos == -1:
+                return 0
+            eq_value = url[value_pos:]
+            eq_index = eq_value.find('=')
+            if eq_index == -1:
+                return 0
+            return eq_value[eq_index + 1:]
+        except Exception:
+            return 0
+
+ def __checkUrlSame(self, pre_url, url):
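+        """Return True when the two entry URLs differ only in their query
+        strings, i.e. they point at the same harness page."""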
+ try:
+ if pre_url == '' or url == '':
+ return False
+
+ index_pre_url = pre_url.index('?')
+ if index_pre_url == -1:
+ return False
+ ab_pre_url = pre_url[0:index_pre_url]
+
+ index_url = url.index('?')
+ if index_url == -1:
+ return False
+ ab_url = url[0:index_url]
+ if ab_pre_url == ab_url:
+ return True
+ else:
+ return False
+ except Exception:
+ return False
+
+    def __runJSTests(self):
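+        """Run automated JS tests: load each entry page and read the
+        verdict from the result table rendered by the page's test harness."""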
+ element_index = -1
+ for i_case in self.tests_json['cases']:
+ element_index += 1
+ with EXE_LOCK:
+ if self.exe_status == 'DONE':
+ return False
+ i_case['start_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ try:
+ sub_index = 0
+ url_equal = False
+ if element_index:
+ pre_url = self.tests_json['cases'][
+ element_index - 1]['entry']
+ url_equal = self.__checkUrlSame(pre_url, i_case['entry'])
+ i_case_timeout = i_case['timeout']
+ self.TE_LOG.debug(
+ "Using special timeout value: %s" % i_case_timeout)
+ except Exception, e:
+ i_case_timeout = DEFAULT_TIMEOUT
+
+ i_page_url = '%s%s' % (self.test_prefix, i_case['entry'])
+ try:
+ self.web_driver.set_page_load_timeout(i_case_timeout)
+ sub_index = self.__getCaseIndex(i_case['entry'])
+ if not url_equal:
+ self.web_driver.implicitly_wait(i_case['onload_delay'])
+ self.web_driver.get(i_page_url)
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.debug(
+ "Cases %s: blocked by %s" % (i_case['case_id'], e))
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ continue
+
+ if not self.__checkPageNotFound(i_page_url):
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.info(
+ "Cases %s: blocked, page not found" % i_case['case_id'])
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ i_case['stdout'] = "page not found"
+ continue
+
+ try:
+ if sub_index:
+ sub_index = int(sub_index) - 1
+ table = self.web_driver.find_element_by_xpath(
+ "//table[@id='results']")
+ tr = table.find_elements_by_xpath(".//tbody/tr")[sub_index]
+ sub_result = tr.find_elements_by_xpath(".//td")[0].text
+ error_message = tr.find_elements_by_xpath(".//td")[2].text
+ if sub_result.upper() == 'PASS':
+ i_case['result'] = STR_PASS
+ elif sub_result.upper() == 'FAIL':
+ i_case['result'] = STR_FAIL
+ i_case['stdout'] = error_message
+ else:
+ i_case['result'] = STR_BLOCK
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ except Exception, e:
+ result = self.web_driver.find_element_by_class_name('pass')
+ if result.text == STR_FAIL:
+ i_case['result'] = STR_FAIL
+ elif result.text == STR_PASS:
+ i_case['result'] = STR_PASS
+ else:
+ i_case['result'] = STR_BLOCK
+ i_case['end_at'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime())
+ self.TE_LOG.info("Cases %s: %s" %
+ (i_case['case_id'], i_case['result']))
+
+    def __runScriptTests(self):
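+        """Run script tests: spawn each entry as a shell command and
+        compare its exit code against the expected result, subject to the
+        per-case timeout."""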
+ for i_case in self.tests_json['cases']:
+ with EXE_LOCK:
+ if self.exe_status == 'DONE':
+ return False
+ try:
+ i_case_timeout = int(i_case['timeout'])
+ self.TE_LOG.debug(
+ "Using special timeout value: %s" % i_case_timeout)
+ except Exception, e:
+ i_case_timeout = DEFAULT_TIMEOUT
+
+ try:
+ i_case_proc = subprocess.Popen(
+ args="%s" % i_case['entry'], shell=True)
+ i_case_pre_time = time.time()
+ while True:
+ i_case_exit_code = i_case_proc.poll()
+ i_case_elapsed_time = time.time() - i_case_pre_time
+                    if i_case_exit_code is None:
+ if i_case_elapsed_time >= i_case_timeout:
+ tr_utils.KillAllProcesses(ppid=i_case_proc.pid)
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.debug(
+ "Run %s timeout" % i_case['case_id'])
+ break
+ elif str(i_case_exit_code) == str(i_case['expected_result']):
+ i_case['result'] = STR_PASS
+ break
+ else:
+ i_case['result'] = STR_FAIL
+ break
+ time.sleep(1)
+ self.TE_LOG.info("Cases %s: %s" %
+ (i_case['case_id'], i_case['result']))
+ except Exception, e:
+ i_case['result'] = STR_BLOCK
+ self.TE_LOG.error(
+ "Run %s: failed: %s, exit from executer" % (i_case['case_id'], e))
+ break
+
+    def __runTests(self, *args):
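+        """Dispatch the received test set to the matching runner (manual,
+        ref, js or script) and mark the executer DONE when finished."""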
+ for i_case in self.tests_json['cases']:
+ i_case['result'] = STR_NOTRUN
+ if self.set_exetype == "manual":
+ self.__runWebManualTests()
+ elif self.set_type == REF_SET_TYPE:
+ self.__runRefTests()
+ elif self.set_type == JS_SET_TYPE:
+ self.__runJSTests()
+ elif self.set_type == SCRIPT_SET_TYPE:
+ self.__runScriptTests()
+
+ with EXE_LOCK:
+ self.exe_status = 'DONE'
+
+ return True
+
+ def runTestsExecuter(self):
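+        """Main loop of the executer: serve GET_STATUS, TESTS, GET_RESULTS
+        and TERMINAL commands from the runner until the runner goes away."""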
+        if self.set_type not in [REF_SET_TYPE, JS_SET_TYPE, SCRIPT_SET_TYPE]:
+ self.TE_LOG.error(
+ "Unsupported set type %s, exit from executer" % self.set_type)
+ return False
+
+ if self.set_type in [REF_SET_TYPE, JS_SET_TYPE]:
+ try:
+ if not self.__initWebDriver():
+ if self.web_driver:
+ self.web_driver.quit()
+ self.web_driver = None
+ self.TE_LOG.error("Exit from executer")
+ return False
+ except Exception, e:
+ if self.web_driver:
+ self.web_driver.quit()
+ self.TE_LOG.error(
+ "Init Web Driver failed: %s, exit from executer" % e)
+ return False
+
+ while True:
+ if not tr_utils.pidExists(self.runner_proc):
+ if self.set_type in [REF_SET_TYPE, JS_SET_TYPE]:
+ if self.web_driver:
+ self.web_driver.quit()
+ self.web_driver = None
+ self.TE_LOG.debug('Can not find runner, exit from executer')
+ return False
+
+ exe_command, exe_data = self.__talkWithRunnerRecv()
+ if exe_command == 'GET_STATUS':
+ with EXE_LOCK:
+ self.__talkWithRunnerSend(
+ {'COMMAND': exe_command, 'DATA': self.exe_status})
+ elif exe_command == 'TESTS':
+ with EXE_LOCK:
+ self.exe_status = 'RUNNING'
+ self.tests_json = exe_data['data']
+ self.exe_thread = thread.start_new_thread(
+ self.__runTests, (1, 2))
+ self.__talkWithRunnerSend(
+ {'COMMAND': exe_command, 'DATA': 'OK'})
+ elif exe_command == 'GET_RESULTS':
+ if not self.__talkWithRunnerSend({'COMMAND': exe_command, 'DATA': self.tests_json}):
+ continue
+ with EXE_LOCK:
+ self.exe_status = 'READY'
+ elif exe_command == 'TERMINAL':
+ if self.web_driver:
+ self.web_driver.quit()
+ self.web_driver = None
+ with EXE_LOCK:
+ self.exe_status = 'DONE'
+ self.__talkWithRunnerSend(
+ {'COMMAND': exe_command, 'DATA': 'OK'})
+ else:
+ continue
+
+ def EndExecuter(self):
+ if self.web_driver:
+ self.web_driver.quit()
+ self.web_driver = None
+ return True
--- /dev/null
+#!/usr/bin/python
+#
+# Copyright (C) 2012 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# Authors:
+# Chengtao,Liu <chengtaox.liu@intel.com>
+"""The implementation of the default test engine."""
+import os
+import time
+import sys
+import logging
+import json
+import socket
+import shutil
+import uuid
+import threading
+import re
+import ConfigParser
+from datetime import datetime
+from tempfile import mktemp
+import xml.etree.ElementTree as etree
+from testkitlite.engines import test_executer
+from shutil import copyfile
+from testkitlite.util import tr_utils
+from testkitlite.util.log import LOGGER
+from testkitlite.util.result import TestSetResut
+
+LOG = logging.getLogger("TestWorker")
+EXECUTER_POLLING_INTERVAL = 2
+CNT_RETRY = 10
+
+
+def initExecuter(test_env=None):
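+    """Entry point of the forked executer process: build a TestExecuter
+    from the runner-provided environment and enter its command loop."""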
+ TE = test_executer.TestExecuter(test_env)
+ TE.runTestsExecuter()
+
+
+def _run_webdriver_test(self, cases, result_obj):
+    """
+    Process the execution of a web API test set.
+    The set may be split into several blocks, with the block size
+    given by opts['block_size'].
+    """
+ case_count = len(cases)
+ blknum = 0
+ if case_count % self.opts['block_size'] == 0:
+ blknum = case_count / self.opts['block_size']
+ else:
+ blknum = case_count / self.opts['block_size'] + 1
+
+ idx = 1
+ test_set_queues = []
+ while idx <= blknum:
+ block_data = {}
+ start = (idx - 1) * self.opts['block_size']
+ if idx == blknum:
+ end = case_count
+ else:
+ end = idx * self.opts['block_size']
+ block_data = cases[start:end]
+ test_set_queues.append({'cases': block_data})
+ idx += 1
+
+ self.testcases = []
+ get_result = False
+ abort_from_set = False
+
+ for section_json in test_set_queues:
+ if result_obj.get_status() == 1:
+ break
+ LOG.info("Loading a new section for testing ...")
+ time.sleep(EXECUTER_POLLING_INTERVAL)
+ get_result = False
+ while True:
+ if result_obj.get_status() == 1:
+ break
+ exe_command, exe_data = self.talkWithEXE(
+ 'GET_STATUS', '', 0)
+ if exe_command == 'GET_STATUS':
+ if exe_data == 'READY':
+ exe_command, exe_data = self.talkWithEXE(
+ 'TESTS', {'data': section_json}, 0)
+ if exe_command != 'TESTS' or exe_data != 'OK':
+ LOG.debug('Send tests failed')
+ result_obj.set_status(1)
+ break
+ continue
+ elif exe_data == 'RUNNING':
+ continue
+ elif exe_data == 'DONE':
+ exe_command, exe_data = self.talkWithEXE(
+ 'GET_RESULTS', '', 0)
+ if exe_data is not None and len(exe_data):
+ result_obj.extend_result(exe_data['cases'])
+ get_result = True
+ break
+ elif exe_data == 'ERROR':
+ LOG.error('Executer got error')
+ get_result = True
+ result_obj.set_status(1)
+ break
+ else:
+ abort_from_set = True
+ result_obj.set_status(1)
+ break
+ if abort_from_set:
+ LOG.error('Exit from current set execution')
+ break
+ result_obj.set_status(1)
+ exe_command, exe_data = self.talkWithEXE(
+ 'TERMINAL', '', 1)
+
+
+class TestWorker(object):
+
+    """Web test worker for testkit-lite, driving tests through a
+    forked TestExecuter process."""
+
+ def __init__(self, conn):
+ super(TestWorker, self).__init__()
+ self.conn = conn
+ self.server_url = None
+ self.result_obj = None
+ self.opts = dict({'block_size': 20,
+ 'test_type': None,
+ 'exe_socket_buff_size': 20480,
+ 'runner_proc': os.getpid(),
+ })
+ self.testcases = []
+ self.runner_proc = self.opts['runner_proc']
+ self.exe_socket_file = os.path.expanduser(
+ "~") + os.sep + str(self.runner_proc) + '.socket'
+ self.exe_proc = None
+ self.exe_socket = None
+ self.exe_socket_connect = None
+
+ def init_test(self, params):
+        """Initialize the test environment."""
+ self.opts['testset_name'] = params.get('testset-name', '')
+ self.opts['suite_name'] = params.get('testsuite-name', '')
+ self.opts['debug_log_base'] = params.get("debug-log-base", '')
+ self.opts['wd_url'] = params.get("wd_url", '')
+ self.opts['target_platform'] = params.get("target_platform", '')
+ self.opts['set_type'] = params.get("set_type", '')
+ self.opts['set_exetype'] = params.get("set_exetype", '')
+ self.opts['session_dir'] = params.get("session_dir", '')
+ self.opts['log_debug'] = params.get("log_debug", '')
+ self.opts['exe_socket_file'] = self.exe_socket_file
+
+ if not self.__exitExecuter():
+ LOG.debug('__exitExecuter failed')
+ return None
+
+ if self.__initExecuterSocket():
+ time.sleep(EXECUTER_POLLING_INTERVAL)
+ if (not self.exe_proc) or (not tr_utils.pidExists(self.exe_proc)):
+ LOG.debug('Executer not existing')
+ return None
+ else:
+ timecnt = 0
+ blaunched = False
+ while timecnt < CNT_RETRY:
+ exe_command, exe_data = self.talkWithEXE(
+ 'GET_STATUS', '', 0)
+ if exe_command == 'GET_STATUS':
+ if exe_data == 'READY':
+ blaunched = True
+ break
+ else:
+ timecnt += 1
+ if not blaunched:
+ LOGGER.info("[ launch stub process failed! ]")
+ return None
+ else:
+ return str(uuid.uuid1())
+
+ def __initExecuter(self):
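+        """Fork a child process that runs the test executer; the parent
+        keeps the child's pid in self.exe_proc."""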
+ try:
+ new_proc = os.fork()
+
+ if new_proc == 0:
+ initExecuter(self.opts)
+ sys.exit(0)
+ else:
+ self.exe_proc = new_proc
+ LOG.debug('Runner Proc: %s, Executer Proc: %s' %
+ (self.runner_proc, self.exe_proc))
+ return True
+ except OSError, e:
+ return False
+
+ def __exitExecuter(self):
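+        """Close the executer socket, remove the socket file and kill the
+        executer process if it is still alive."""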
+ if self.exe_socket:
+ self.exe_socket_connect.close()
+ self.exe_socket.close()
+ self.exe_socket = None
+ try:
+ os.remove(self.exe_socket_file)
+ except Exception, e:
+ pass
+
+ if self.exe_proc and tr_utils.pidExists(self.exe_proc):
+ if not tr_utils.KillAllProcesses(self.exe_proc):
+ return False
+ self.exe_proc = None
+ return True
+
+ def __initExecuterSocket(self):
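+        """Create the Unix domain socket the executer connects to, fork
+        the executer process and accept its connection."""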
+ if not self.exe_socket:
+ try:
+ os.remove(self.exe_socket_file)
+ except OSError:
+ pass
+ try:
+ self.exe_socket = socket.socket(
+ socket.AF_UNIX, socket.SOCK_STREAM)
+ self.exe_socket.bind(self.exe_socket_file)
+ self.exe_socket.listen(1)
+ except Exception, e:
+ LOG.error('Setup socket failed')
+ return False
+ if not self.__initExecuter():
+ LOG.error('Init Executer failed')
+ if self.exe_proc and tr_utils.pidExists(self.exe_proc):
+                tr_utils.KillAllProcesses(self.exe_proc)
+ self.exe_proc = None
+ self.exe_socket.close()
+ self.exe_socket = None
+ return False
+ self.exe_socket_connect, addr = self.exe_socket.accept()
+ return True
+
+ def talkWithEXE(self, command=None, data=None, recv_timeout=None):
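+        """Send one {'COMMAND': ..., 'DATA': ...} message to the executer
+        and return its reply; on a socket error the executer is killed."""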
+ LOG.debug('Start send: %s, %s' % (command, data))
+ try:
+ self.exe_socket.settimeout(recv_timeout)
+ self.exe_socket_connect.send(
+ json.dumps({'COMMAND': command, 'DATA': data}))
+ exe_data = self.exe_socket_connect.recv(
+ self.opts['exe_socket_buff_size'])
+ exe_json = json.loads(exe_data)
+ if exe_json['COMMAND']:
+ command = exe_json['COMMAND']
+ if exe_json['DATA']:
+ data = exe_json['DATA']
+ LOG.debug('Got: %s, %s' % (command, data))
+ except Exception, e:
+ LOG.error('Talk with executer failed: %s, kill executer' % e)
+ self.__exitExecuter()
+ return (None, None)
+
+ return (command, data)
+
+ def __saveSectionToSetJSON(self, testcases, exe_data):
+ if exe_data:
+ for i_case in exe_data['cases']:
+ testcases.append(i_case)
+
+ return testcases
+
+ def run_test(self, sessionid, test_set):
+ """
+ process the execution for a test set
+ """
+ if sessionid is None:
+ return False
+
+ if not "cases" in test_set:
+        if "cases" not in test_set:
+
+ if len(test_set["cases"]) == 0:
+ return False
+
+ self.result_obj = TestSetResut(
+ self.opts['suite_name'], self.opts['testset_name'])
+ cases, exetype, ctype = test_set[
+ "cases"], test_set["exetype"], test_set["type"]
+
+ self.opts['async_th'] = threading.Thread(
+            target=_run_webdriver_test,
+ args=(self, cases, self.result_obj)
+ )
+ self.opts['async_th'].start()
+ return True
+
+ def get_test_status(self, sessionid):
+ """poll the test task status"""
+ if sessionid is None:
+ return None
+ result = {}
+ result["msg"] = []
+ result["finished"] = str(self.result_obj.get_status())
+ return result
+
+ def get_test_result(self, sessionid):
+ """get the test result for a test set """
+ result = {}
+ if sessionid is None:
+ return result
+
+ result = self.result_obj.get_result()
+ return result
+
+ def finalize_test(self, sessionid):
+ """clear the test stub and related resources"""
+ if sessionid is None:
+ return False
+
+ if self.result_obj is not None:
+ self.result_obj.set_status(1)
+
+ # remove socketfile
+
+ return True
OPT_DEBUG = 'debug'
OPT_RERUN = 'rerun'
OPT_WIDGET = 'test-widget'
-OPT_STUB = 'stub-name'
+OPT_STUB = 'stub-name'
OPT_SUITE = 'testsuite-name'
OPT_SET = 'testset-name'
OPT_test_set_src = 'test-set-src'
self.resultfiles = set()
self.core_auto_files = []
self.core_manual_files = []
- self.unit_test_files = []
+ self.unit_test_files = []
self.skip_all_manual = False
self.testsuite_dict = {}
self.exe_sequence = []
self.rerun = False
self.test_prefix = ""
self.filter_ok = False
+ self.targetplatform = None
+ self.wdurl = ""
def set_global_parameters(self, options):
"get all options "
self.test_prefix = options.test_prefix
if options.worker:
self.worker_name = options.worker
+ if options.targetplatform:
+ self.targetplatform = options.targetplatform
+ if options.wdurl:
+ self.wdurl = options.wdurl
def add_filter_rules(self, **kargs):
"""
# resultdir is set to current directory by default
if not resultdir:
resultdir = os.getcwd()
+ self.session_dir = resultdir
try:
filename = testxmlfile
filename = os.path.splitext(filename)[0]
self.resultfiles.add(suitefilename)
else:
self.unit_test_files.append(suitefilename)
- self.resultfiles.add(suitefilename)
+ self.resultfiles.add(suitefilename)
filename_diff += 1
if testsuite_dict_add_flag:
try:
parse_tree = etree.parse(xml_set_tmp)
root_em = parse_tree.getroot()
+ tsuite = root_em.getiterator('suite')[0]
case_tmp = []
+ parameters.setdefault("suite_name", tsuite.get('name'))
for tset in root_em.getiterator('set'):
case_order = 1
parameters.setdefault(
"casecount", str(len(tset.getiterator('testcase')))
)
parameters.setdefault("current_set_name", xml_set_tmp)
+
+ parameters.setdefault("name", tset.get('name'))
+ parameters.setdefault("type", tset.get('type'))
+ parameters.setdefault(
+ "exetype", '')
+
if tset.get("test_set_src") is not None:
set_entry = self.test_prefix + tset.get("test_set_src")
parameters.setdefault("test_set_src", set_entry)
for tcase in tset.getiterator('testcase'):
case_detail_tmp = {}
step_tmp = []
- parameters.setdefault("exetype", tcase.get('execution_type'))
- parameters.setdefault("type", tcase.get('type'))
+ parameters["exetype"] = tcase.get('execution_type')
case_detail_tmp.setdefault("case_id", tcase.get('id'))
case_detail_tmp.setdefault("purpose", tcase.get('purpose'))
case_detail_tmp.setdefault("order", str(case_order))
case_detail_tmp["location"] = tcase.find(
'description/test_script_entry'
).get('location')
- for this_step in tcase.getiterator("step"):
- step_detail_tmp = {}
- step_detail_tmp.setdefault("order", "1")
- step_detail_tmp["order"] = str(this_step.get('order'))
-
- if this_step.find("step_desc") is not None:
- text = this_step.find("step_desc").text
- if text is not None:
- step_detail_tmp["step_desc"] = text
-
- if this_step.find("expected") is not None:
- text = this_step.find("expected").text
- if text is not None:
- step_detail_tmp["expected"] = text
-
- step_tmp.append(step_detail_tmp)
+ tc_refer_entry = ""
+ if tcase.find('description/refer_test_script_entry') is not None:
+ tc_refer_entry = tcase.find(
+ 'description/refer_test_script_entry').text
+
+ case_detail_tmp["refer_entry"] = tc_refer_entry
+
+                    if tcase.find('description/refer_test_script_entry') is not None:
+ case_detail_tmp["refer_timeout"] = tcase.find(
+ 'description/refer_test_script_entry').get('timeout')
+                    if tcase.find('description/refer_test_script_entry') is not None:
+ case_detail_tmp["refer_expected_result"] = tcase.find(
+ 'description/refer_test_script_entry').get('test_script_expected_result')
+ if tcase.find('description/refer_test_script_entry') is not None:
+ case_detail_tmp["refer_location"] = tcase.find(
+ 'description/refer_test_script_entry').get('location')
+
+ if tcase.getiterator("step"):
+ for this_step in tcase.getiterator("step"):
+ step_detail_tmp = {}
+ step_detail_tmp.setdefault("order", "1")
+ step_detail_tmp["order"] = str(
+ this_step.get('order'))
+
+ if this_step.find("step_desc") is not None:
+ text = this_step.find("step_desc").text
+ if text is not None:
+ step_detail_tmp["step_desc"] = text
+
+ if this_step.find("expected") is not None:
+ text = this_step.find("expected").text
+ if text is not None:
+ step_detail_tmp["expected"] = text
+
+ step_tmp.append(step_detail_tmp)
case_detail_tmp['steps'] = step_tmp
case_tmp.append(case_detail_tmp)
case_order += 1
parameters.setdefault("cases", case_tmp)
- parameters.setdefault("exetype", "")
- parameters.setdefault("type", "")
if self.bdryrun:
parameters.setdefault("dryrun", True)
self.set_parameters = parameters
starup_parameters[OPT_RERUN] = self.rerun
if len(self.capabilities) > 0:
starup_parameters[OPT_CAPABILITY] = self.capabilities
+ # for webdriver
+ starup_parameters['target_platform'] = self.targetplatform
+ starup_parameters['wd_url'] = self.wdurl
+ starup_parameters['set_type'] = self.set_parameters['type']
+ starup_parameters['set_exetype'] = self.set_parameters['exetype']
+ starup_parameters['session_dir'] = self.session_dir
+ starup_parameters['log_debug'] = self.debug
+
except IOError as error:
LOGGER.error(
"[ Error: prepare starup parameters, error: %s ]" % error)
def __write_by_caseid(tset, case_results):
+ tset.set("set_debug_msg", "N/A")
for tcase in tset.getiterator('testcase'):
for case_result in case_results:
if tcase.get("id") == case_result['case_id']:
--- /dev/null
+import os
+import signal
+import subprocess
+import sys
+import logging
+
+LOG = logging.getLogger("TestRunner")
+
+
+def pidExists(pid):
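+    """Check whether a process with the given pid is alive by sending it
+    signal 0."""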
+ if pid < 0:
+ return False
+ try:
+ os.kill(pid, 0)
+ except OSError as e:
+ return False
+ else:
+ return True
+
+
+def IsWindows():
+ return sys.platform == 'cygwin' or sys.platform.startswith('win')
+
+
+def KillAllProcesses(ppid=None):
+    """Kill the process ppid and, on POSIX systems, all of its
+    descendants found through ps."""
+    if IsWindows():
+        subprocess.check_call("TASKKILL /F /PID %s /T" % ppid)
+        return True
+    else:
+        ppid = str(ppid)
+        pidgrp = []
+
+        def GetChildPids(ppid):
+            command = "ps -ef | awk '{if ($3 ==%s) print $2;}'" % str(ppid)
+            pids = os.popen(command).read()
+            pids = pids.split()
+
+            return pids
+
+        pidgrp.extend(GetChildPids(ppid))
+        for pid in pidgrp:
+            pidgrp.extend(GetChildPids(pid))
+
+        # kill the children first, then the parent process itself
+        pidgrp.insert(0, ppid)
+        while len(pidgrp) > 0:
+            pid = pidgrp.pop()
+            try:
+                os.kill(int(pid), signal.SIGKILL)
+            except OSError:
+                # os.kill may be refused (e.g. for another user's process);
+                # fall back to the shell in that case
+                os.popen("kill -9 %s" % pid)
+        return True