import urllib2
import re
import platform
-#from guppy import hpy
import gc
import sys
class TestCase:
"""Test Case Model"""
- def __init__(self, case_node, dom, case_order, xml_name, package_name):
- self.purpose = case_node.getAttribute("purpose")
-
- if case_node.getElementsByTagName("test_script_entry").item(0) is not None and case_node.getElementsByTagName("test_script_entry").item(0).childNodes.item(0) is not None:
- self.entry = case_node.getElementsByTagName("test_script_entry").item(0).childNodes.item(0).data
- self.e_result = case_node.getElementsByTagName("test_script_entry").item(0).getAttribute("test_script_expected_result")
+ def __init__(self, case_node, case_order, xml_name, package_name):
+ self.purpose = case_node.get("purpose")
+
+ script_node = case_node.find("./description/test_script_entry")
+ if script_node is not None:
+ self.entry = script_node.text
+ self.e_result = script_node.get("test_script_expected_result")
+ if self.e_result is None:
+ self.e_result = ""
else:
self.entry = ""
self.e_result = ""
self.result = ""
self.msg = ""
self.xml_node = case_node
- self.dom_root = dom
self.start_at = datetime.now()
self.end_at = None
self.is_executed = False
self.time_task = None
self.order = case_order
- self.case_id = case_node.getAttribute("id")
+ self.case_id = case_node.get("id")
self.xml_name = xml_name
self.package_name = package_name
- if case_node.getElementsByTagName("pre_condition").item(0) is not None and case_node.getElementsByTagName("pre_condition").item(0).childNodes.item(0) is not None:
- self.pre_con = case_node.getElementsByTagName("pre_condition").item(0).childNodes.item(0).data
- else:
- self.pre_con = ""
- if case_node.getElementsByTagName("post_condition").item(0) is not None and case_node.getElementsByTagName("post_condition").item(0).childNodes.item(0) is not None:
- self.post_con = case_node.getElementsByTagName("post_condition").item(0).childNodes.item(0).data
- else:
+
+ pre_con_node = case_node.find("./description/pre_condition")
+ post_con_node = case_node.find("./description/post_condition")
+
+ self.pre_con = ""
+ self.post_con = ""
+ if pre_con_node is not None:
+ self.pre_con = pre_con_node.text
+ if self.pre_con is None:
+ self.pre_con = ""
+
+ if post_con_node is not None:
+ self.post_con = post_con_node.text
+ if self.post_con is None:
self.post_con = ""
+
self.steps = []
- if case_node.getElementsByTagName("step") is not None:
- for this_step in case_node.getElementsByTagName("step"):
- desc = ""
- expected = ""
- order = this_step.getAttribute("order")
- if this_step.getElementsByTagName("step_desc").item(0) is not None:
- desc = this_step.getElementsByTagName("step_desc").item(0).childNodes.item(0).data
- if this_step.getElementsByTagName("expected") is not None:
- expected = this_step.getElementsByTagName("expected").item(0).childNodes.item(0).data
- test_step = TestStep(desc, expected, order)
- self.steps.append(test_step.to_json())
-
- if case_node.getAttribute("execution_type") is None:
+
+ step_nodes = case_node.findall("./description/steps/step")
+ for step_node in step_nodes:
+ order = step_node.get("order")
+ desc = ""
+ expected = ""
+ if step_node.find("./step_desc") is not None:
+ desc = step_node.find("./step_desc").text
+ if step_node.find("./expected") is not None:
+ expected = step_node.find("./expected").text
+ test_step = TestStep(desc, expected, order)
+ self.steps.append(test_step.to_json())
+
+ if case_node.get("execution_type") is None:
self.e_type = "auto"
else:
- self.e_type = case_node.getAttribute("execution_type")
-
- if case_node.getElementsByTagName("test_script_entry").item(0).getAttribute("timeout") is None:
+ self.e_type = case_node.get("execution_type")
+
+ if (script_node is None) or (script_node.get("timeout") is None):
self.timeout = 90
- elif case_node.getElementsByTagName("test_script_entry").item(0).getAttribute("timeout"):
- self.timeout = int(case_node.getElementsByTagName("test_script_entry").item(0).getAttribute("timeout"))
else:
- self.timeout = 90
+ self.timeout = int(script_node.get("timeout"))
def print_info_string(self):
try:
self.cancel_time_check()
self.result = test_result
self.msg = test_msg
- self.xml_node.setAttribute("result", test_result)
- result_info = self.dom_root.createElement("result_info")
-
- for childNode in self.xml_node.childNodes:
- import xml.dom.minidom
- if childNode.nodeType is xml.dom.Node.ELEMENT_NODE and childNode.tagName is "result_info":
- self.xml_node.removeChild(childNode)
-
- self.xml_node.appendChild(result_info)
- actual_result = self.dom_root.createElement("actual_result")
- actual_result.appendChild(self.dom_root.createTextNode(test_result))
- result_info.appendChild(actual_result)
- start = self.dom_root.createElement("start")
- end = self.dom_root.createElement("end")
- start.appendChild(self.dom_root.createTextNode(str(self.start_at)))
- end.appendChild(self.dom_root.createTextNode(str(datetime.now())))
+ self.xml_node.set("result", test_result)
+ if self.xml_node.find("./result_info") is not None:
+ self.xml_node.remove(self.xml_node.find("./result_info"))
+
+ result_info = ElementTree.SubElement(self.xml_node, "result_info")
+ actual_result = ElementTree.SubElement(result_info, "actual_result")
+ actual_result.text = str(test_result)
- result_info.appendChild(start)
- result_info.appendChild(end)
+ start = ElementTree.SubElement(result_info, "start")
+ end = ElementTree.SubElement(result_info, "end")
+ stdout = ElementTree.SubElement(result_info, "stdout")
+
+ start.text = str(self.start_at)
+ end.text = str(datetime.now())
+ stdout.text = self.msg
- stdout = self.dom_root.createElement("stdout")
- stdout.appendChild(self.dom_root.createTextNode(self.msg))
- result_info.appendChild(stdout)
-
def set_start_at(self, start_at):
self.start_at = start_at
if self.timeout > 0:
self.time_task.start()
def toXmlNode(self):
- return self.xml_node.toprettyxml(indent=" ")
+ return self.xml_node
def cancel_time_check(self):
if self.time_task is not None:
def read_test_definition(self):
if TestkitWebAPIServer.default_params.has_key("testsuite"):
try:
- from xml.dom.minidom import parse
suites_dict = TestkitWebAPIServer.default_params["testsuite"]
exe_sequence = TestkitWebAPIServer.default_params["exe_sequence"]
index = 1
for package_name in exe_sequence:
suites_array = suites_dict[package_name]
for xml_name in suites_array:
- tmp_xml_root = parse(xml_name)
- for node in tmp_xml_root.getElementsByTagName('testcase'):
- tc = TestCase(node, tmp_xml_root, index, xml_name, package_name)
- index = index + 1
- if tc.is_manual():
- TestkitWebAPIServer.manual_test_cases[tc.purpose] = tc
- else:
- TestkitWebAPIServer.auto_test_cases[tc.purpose] = tc
- if tc.purpose in TestkitWebAPIServer.auto_case_id_array:
- try:
- print "[ Warning: the purpose '%s' is already in the list ]" % tc.purpose
- except Exception, e:
- print "[ Warning: the purpose '%s' is already in the list ]" % str2str(tc.purpose)
- print "[ Error: found unprintable character in case purpose, error: %s ]\n" % e
- else:
- TestkitWebAPIServer.auto_case_id_array.append(tc.purpose)
-
+ single_xml_tree = ElementTree.parse(xml_name)
+ print "[XML name : %s] ------------------------------------------------------------------------------------------------------------" % xml_name
+ tmp_xml_root = single_xml_tree.getroot()
+ for tmp_suite in tmp_xml_root.findall('suite'):
+ for tmp_set in tmp_suite.findall('set'):
+ for node in tmp_set.findall('testcase'):
+ tc = TestCase(node, index, xml_name, package_name)
+ index = index + 1
+ if tc.is_manual():
+ TestkitWebAPIServer.manual_test_cases[tc.purpose] = tc
+ else:
+ TestkitWebAPIServer.auto_test_cases[tc.purpose] = tc
+ if tc.purpose in TestkitWebAPIServer.auto_case_id_array:
+ try:
+ print "[ Warning: the purpose '%s' is already in the list ]" % tc.purpose
+ except Exception, e:
+ print "[ Warning: the purpose '%s' is already in the list ]" % str2str(tc.purpose)
+ print "[ Error: found unprintable character in case purpose, error: %s ]\n" % e
+ else:
+ TestkitWebAPIServer.auto_case_id_array.append(tc.purpose)
+
if TestkitWebAPIServer.xml_dom_root is None:
TestkitWebAPIServer.xml_dom_root = tmp_xml_root
else:
- for suite_node in tmp_xml_root.getElementsByTagName('suite'):
- definition_node = TestkitWebAPIServer.xml_dom_root.childNodes[0]
+ for suite_node in tmp_xml_root.findall('suite'):
+ definition_node = TestkitWebAPIServer.xml_dom_root
if definition_node is None:
- definition_node = TestkitWebAPIServer.xml_dom_root.createElement("test_definition")
- definition_node.appendChild(suite_node)
+ definition_node = ElementTree.Element("test_definition")
+ definition_node.append(suite_node)
TestkitWebAPIServer.xml_dom_root = TestkitWebAPIServer.xml_dom_root
TestkitWebAPIServer.iter_params.update({TestkitWebAPIServer.auto_index_key: 0})
except Exception, e:
print "[ auto case number: %d, manual case number: %d ]" % (len(TestkitWebAPIServer.auto_test_cases), len(TestkitWebAPIServer.manual_test_cases))
collected = gc.collect()
print "[ Garbage collector: collected %d objects. ]" % (collected)
+
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Content-Length", str(len(json.dumps({"OK": 1}))))
print "[ wait 5sec to release memory]"
time.sleep(5)
# write result to file
- result_xml = TestkitWebAPIServer.xml_dom_root.toprettyxml(indent=" ")
+ result_xml = ElementTree.tostring(TestkitWebAPIServer.xml_dom_root, "utf-8")
for key, value in self.auto_test_cases.iteritems():
value.cancel_time_check()
self.save_RESULT(result_xml, TestkitWebAPIServer.default_params["resultfile"])
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps({"OK": 1}))
+
# close server
- TestkitWebAPIServer.this_server.socket.close()
+ try:
+ TestkitWebAPIServer.this_server.socket.close()
+ except Exception, e:
+ print "[ Error: fail to close webapi http server, error: %s ]" % e
def auto_test_task(self):
if TestkitWebAPIServer.start_auto_test:
xml_name = t[0]
package_name = t[1]
resultfile = t[2]
- print "[ reloading test case definitions ]"
- #TestkitWebAPIServer.default_params.update(parameters)
+ print "[ reloading test case definitions with the XML %s ]" % xml_name
suites_dict = {}
exe_sequence = [package_name]
suite_array = [xml_name]
suites_dict[package_name] = suite_array
TestkitWebAPIServer.default_params["testsuite"] = suites_dict
+ print "[]"
TestkitWebAPIServer.default_params["exe_sequence"] = exe_sequence
TestkitWebAPIServer.default_params["resultfile"] = resultfile
return False
def start_server_up(server):
- server.serve_forever()
+ try:
+ server.serve_forever()
+ except IOError:
+ print "\n[ warnning, a IO error is raised, if the server is shutting down, please ignore it. ]"
def startup(parameters):
try: