3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
20 """ Test Reporters implementation. """
28 from .loggable import Loggable
29 from xml.sax import saxutils
30 from .utils import Result, printc, Colors
32 UNICODE_STRINGS = (type(str()) == type(str())) # noqa
class UnknownResult(Exception):
    """Raised when a test carries a result that is not a recognised outcome."""
# Code points below 0x20 other than tab (0x09), line feed (0x0A) and
# carriage return (0x0D).
CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")


def xml_safe(value):
    """Replaces invalid XML characters with '?'."""
    return CONTROL_CHARACTERS.sub('?', value)


def escape_cdata(cdata):
    """Escape a string for an XML CDATA section.

    Control characters are replaced through xml_safe().  Every ']]>' in the
    text is split across two adjacent CDATA sections (']]' closes the first,
    '>' opens the next) so that the terminator never appears literally and
    the data round-trips unchanged once wrapped in '<![CDATA[' ... ']]>'.
    """
    # A bare ']]>' between two sections would be illegal character data, so
    # the '>' has to travel inside the following CDATA section.
    return xml_safe(cdata).replace(']]>', ']]]]><![CDATA[>')
52 class Reporter(Loggable):
def __init__(self, options):
    """Create a reporter with all counters at zero.

    options: stored unchanged on ``self.options`` for later use.
    """
    Loggable.__init__(self)

    self.options = options
    # 0 means "timer not started": final_report() only prints the elapsed
    # time when this value is greater than zero.
    self._start_time = 0
    # One counter per bucket that the reporting methods increment or read.
    self.stats = {'timeout': 0,
                  'failures': 0,
                  'passed': 0,
                  'skipped': 0,
                  'known_error': 0}
    # Tests already handed to after_test(); used to avoid duplicates.
    self.results = []
69 """Initialize a timer before starting tests."""
70 self._start_time = time.time()
def set_failed(self, test):
    """Count a non-passing test.

    Increments ``stats["skipped"]`` when ``test.result`` is
    ``Result.SKIPPED`` and ``stats["failures"]`` for anything else.
    """
    # The two buckets are mutually exclusive: a skipped test must not also
    # be counted as a failure.
    bucket = "skipped" if test.result == Result.SKIPPED else "failures"
    self.stats[bucket] += 1
def set_passed(self, test):
    """Count a passing test.

    Increments ``stats["known_error"]`` when ``test.result`` is
    ``Result.KNOWN_ERROR`` and ``stats["passed"]`` for anything else.
    """
    # The two buckets are mutually exclusive: a known error must not also
    # be counted as a plain pass.
    bucket = "known_error" if test.result == Result.KNOWN_ERROR else "passed"
    self.stats[bucket] += 1
def add_results(self, test):
    """Route *test* to set_passed() or set_failed() according to its result.

    PASSED and KNOWN_ERROR go to set_passed(); FAILED, TIMEOUT and SKIPPED
    go to set_failed().

    Raises:
        UnknownResult: if ``test.result`` is none of the values above.
    """
    self.debug("%s", test)
    outcome = test.result
    if outcome in (Result.PASSED, Result.KNOWN_ERROR):
        self.set_passed(test)
    elif outcome in (Result.FAILED, Result.TIMEOUT, Result.SKIPPED):
        self.set_failed(test)
    else:
        raise UnknownResult("%s" % outcome)
def after_test(self, test):
    """Remember *test* (once) and fold its result into the statistics."""
    already_recorded = test in self.results
    if not already_recorded:
        self.results.append(test)

    # Results are accounted for on every call, even for a test that was
    # already in the list.
    self.add_results(test)
def final_report(self):
    """Print the run statistics and return the number of failures.

    Prints a "Statistics" header, the elapsed time when the timer was
    started, one line per counter and a coloured total.

    Returns:
        The value of ``self.stats["failures"]``.
    """
    lenstat = len("Statistics") + 1
    pad = lenstat * " "
    stats = self.stats

    printc("Statistics:\n%s" % (lenstat * "-"), Colors.OKBLUE)
    if self._start_time > 0:
        elapsed = datetime.timedelta(
            seconds=(time.time() - self._start_time))
        # NOTE(review): the colour argument of this call was not visible in
        # the reviewed text; OKBLUE is assumed to match the header - confirm.
        printc("\n%sTotal time spent: %s seconds\n" % (pad, elapsed),
               Colors.OKBLUE)

    summary = (
        ("%sPassed: %d", "passed", Colors.OKGREEN),
        ("%sSkipped: %d", "skipped", Colors.WARNING),
        ("%sFailed: %d", "failures", Colors.FAIL),
        ("%sKnown error: %d", "known_error", Colors.OKBLUE),
    )
    for line_format, key, line_color in summary:
        printc(line_format % (pad, stats[key]), line_color)
    printc("%s%s" % (pad, len("Failed: 0") * "-"), Colors.OKBLUE)

    # The total only covers passed and failed tests; skipped tests and
    # known errors are reported on their own lines above.
    total = stats["failures"] + stats["passed"]
    if total == stats["passed"]:
        color = Colors.OKGREEN
    elif total == stats["failures"]:
        color = Colors.FAIL
    else:
        color = Colors.WARNING

    printc("%sTotal: %d" % (pad, total), color)

    return stats["failures"]
134 class XunitReporter(Reporter):
136 """This reporter provides test results in the standard XUnit XML format."""
def __init__(self, options):
    """Initialise the base reporter, then create the scratch file.

    The scratch file is created through _createTmpFile().
    """
    super(XunitReporter, self).__init__(options)

    self._createTmpFile()
145 def final_report(self):
147 return super(XunitReporter, self).final_report()
def _get_all_logs_data(self, test):
    """Return the CDATA-escaped content of every log attached to *test*.

    Returns an empty string when ``self.options.redirect_logs`` is false.
    Otherwise the main log comes first, followed by each extra log file
    introduced by a ``===== <basename> =====`` banner.  Logs whose content
    is empty or None contribute nothing besides their banner.
    """
    if not self.options.redirect_logs:
        return ""

    pieces = []
    main_log = test.get_log_content()
    if main_log:
        pieces.append(escape_cdata(main_log))
    for extralog in test.extra_logfiles:
        pieces.append("\n\n===== %s =====\n\n" % escape_cdata(
            os.path.basename(extralog)))
        extra_content = test.get_extra_log_content(extralog)
        # Guard like the main log: escaping None would raise.
        if extra_content:
            pieces.append(escape_cdata(extra_content))

    return "".join(pieces)
165 def _get_captured(self, test):
166 return '<system-out><![CDATA[%s]]></system-out>' % self._get_all_logs_data(test)
def _quoteattr(self, attr):
    """Escape an XML attribute. Value can be unicode."""
    cleaned = xml_safe(attr)
    # Only encode when the interpreter's str is not already unicode.
    needs_encoding = isinstance(cleaned, str) and not UNICODE_STRINGS
    if needs_encoding:
        cleaned = cleaned.encode(self.encoding)
    # quoteattr() adds the surrounding quotes, so callers interpolate the
    # result directly after 'name='.
    return saxutils.quoteattr(cleaned)
176 """Writes an Xunit-formatted XML file
178 The file includes a report of test errors and failures.
181 self.debug("Writing XML file to: %s", self.options.xunit_file)
182 xml_file = codecs.open(self.options.xunit_file, 'w',
183 self.encoding, 'replace')
185 self.stats['encoding'] = self.encoding
186 self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
187 + self.stats['passed'] + self.stats['skipped'])
189 xml_file.write('<?xml version="1.0" encoding="%(encoding)s"?>'
190 '<testsuite name="gst-validate-launcher" tests="%(total)d" '
191 'errors="%(timeout)d" failures="%(failures)d" '
192 'skipped="%(skipped)d">' % self.stats)
194 tmp_xml_file = codecs.open(self.tmp_xml_file.name, 'r',
195 self.encoding, 'replace')
197 for l in tmp_xml_file:
200 xml_file.write('</testsuite>')
203 os.remove(self.tmp_xml_file.name)
205 self._createTmpFile()
207 def _createTmpFile(self):
208 self.tmp_xml_file = tempfile.NamedTemporaryFile(delete=False)
209 self.tmp_xml_file.close()
def set_failed(self, test):
    """Add failure output to Xunit report.

    Updates the counters through the parent class, then appends one
    ``<testcase>`` element containing a ``<failure>`` child to the
    temporary XML file.
    """
    super().set_failed(test)

    # Build the element before opening the file so that a formatting error
    # cannot leave a dangling file handle.
    testcase = self._forceUnicode(
        '<testcase name=%(name)s time="%(taken).3f">'
        # The logs are escaped for CDATA by _get_all_logs_data(), so they
        # must sit inside a CDATA section: as raw element content any '<'
        # or '&' in a log would make the report ill-formed.
        '<failure type=%(errtype)s message=%(message)s>'
        '<![CDATA[%(logs)s]]>'
        '</failure></testcase>' %
        {'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
         'taken': test.time_taken,
         'logs': self._get_all_logs_data(test),
         'errtype': self._quoteattr(test.result),
         'message': self._quoteattr(test.message),
         })

    # 'with' guarantees the handle is closed even if the write fails.
    with codecs.open(self.tmp_xml_file.name, 'a',
                     self.encoding, 'replace') as xml_file:
        xml_file.write(testcase)
def set_passed(self, test):
    """Add success output to Xunit report.

    Increments ``stats['passed']`` and appends one ``<testcase>`` element,
    carrying the captured logs, to the temporary XML file.
    """
    # NOTE(review): unlike set_failed(), this does not go through the parent
    # class, so a KNOWN_ERROR result is counted under 'passed' here rather
    # than under 'known_error' - confirm this is intended.
    self.stats['passed'] += 1

    # Build the element before opening the file so that a formatting error
    # cannot leave a dangling file handle.
    testcase = self._forceUnicode(
        '<testcase name=%(name)s '
        'time="%(taken).3f">%(systemout)s</testcase>' %
        {'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
         'taken': test.time_taken,
         'systemout': self._get_captured(test),
         })

    # 'with' guarantees the handle is closed even if the write fails.
    with codecs.open(self.tmp_xml_file.name, 'a',
                     self.encoding, 'replace') as xml_file:
        xml_file.write(testcase)
def _forceUnicode(self, s):
    """Return *s*, decoded with ``self.encoding`` when str is not unicode.

    The value is always returned: callers pass the result straight to
    ``write()``.
    """
    if not UNICODE_STRINGS and isinstance(s, str):
        s = s.decode(self.encoding, 'replace')
    return s