Move files from gst-devtools into the "subprojects/gst-devtools/" subdir
[platform/upstream/gstreamer.git] / subprojects / gst-devtools / validate / launcher / reporters.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the
17 # Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 # Boston, MA 02110-1301, USA.
19
20 """ Test Reporters implementation. """
21
22 import os
23 import re
24 import time
25 import codecs
26 import datetime
27 import tempfile
28 from .loggable import Loggable
29 from xml.sax import saxutils
30 from .utils import Result, printc, Colors
31
32 UNICODE_STRINGS = (type(str()) == type(str()))  # noqa
33
34
35 class UnknownResult(Exception):
36     pass
37
38
39 CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")
40
41
42 def xml_safe(value):
43     """Replaces invalid XML characters with '?'."""
44     return CONTROL_CHARACTERS.sub('?', value)
45
46
47 def escape_cdata(cdata):
48     """Escape a string for an XML CDATA section."""
49     return xml_safe(cdata).replace(']]>', ']]>]]&gt;<![CDATA[')
50
51
52 class Reporter(Loggable):
53     name = 'simple'
54
55     def __init__(self, options):
56         Loggable.__init__(self)
57
58         self.options = options
59         self._start_time = 0
60         self.stats = {'timeout': 0,
61                       'failures': 0,
62                       'passed': 0,
63                       'skipped': 0,
64                       'known_error': 0
65                       }
66         self.results = []
67
68     def init_timer(self):
69         """Initialize a timer before starting tests."""
70         self._start_time = time.time()
71
72     def set_failed(self, test):
73         if test.result == Result.SKIPPED:
74             self.stats["skipped"] += 1
75         else:
76             self.stats["failures"] += 1
77
78     def set_passed(self, test):
79         if test.result == Result.KNOWN_ERROR:
80             self.stats["known_error"] += 1
81         else:
82             self.stats["passed"] += 1
83
84     def add_results(self, test):
85         self.debug("%s", test)
86         if test.result == Result.PASSED or \
87                 test.result == Result.KNOWN_ERROR:
88             self.set_passed(test)
89         elif test.result == Result.FAILED or \
90                 test.result == Result.TIMEOUT or \
91                 test.result == Result.SKIPPED:
92             self.set_failed(test)
93         else:
94             raise UnknownResult("%s" % test.result)
95
96     def after_test(self, test):
97         if test not in self.results:
98             self.results.append(test)
99
100         self.add_results(test)
101
102     def final_report(self):
103         print("\n")
104         lenstat = (len("Statistics") + 1)
105         printc("Statistics:\n%s" % (lenstat * "-"), Colors.OKBLUE)
106         if self._start_time > 0:
107             printc("\n%sTotal time spent: %s seconds\n" %
108                    ((lenstat * " "), datetime.timedelta(
109                     seconds=(time.time() - self._start_time))),
110                    Colors.OKBLUE)
111         printc("%sPassed: %d" %
112                (lenstat * " ", self.stats["passed"]), Colors.OKGREEN)
113         printc("%sSkipped: %d" %
114                (lenstat * " ", self.stats["skipped"]), Colors.WARNING)
115         printc("%sFailed: %d" %
116                (lenstat * " ", self.stats["failures"]), Colors.FAIL)
117         printc("%sKnown error: %d" %
118                (lenstat * " ", self.stats["known_error"]), Colors.OKBLUE)
119         printc("%s%s" %
120                (lenstat * " ", (len("Failed: 0")) * "-"), Colors.OKBLUE)
121
122         total = self.stats["failures"] + self.stats["passed"]
123         color = Colors.WARNING
124         if total == self.stats["passed"]:
125             color = Colors.OKGREEN
126         elif total == self.stats["failures"]:
127             color = Colors.FAIL
128
129         printc("%sTotal: %d" % (lenstat * " ", total), color)
130
131         return self.stats["failures"]
132
133
134 class XunitReporter(Reporter):
135
136     """This reporter provides test results in the standard XUnit XML format."""
137     name = 'xunit'
138     encoding = 'UTF-8'
139
140     def __init__(self, options):
141         super(XunitReporter, self).__init__(options)
142
143         self._createTmpFile()
144
145     def final_report(self):
146         self.report()
147         return super(XunitReporter, self).final_report()
148
149     def _get_all_logs_data(self, test):
150         if not self.options.redirect_logs:
151             return ""
152
153         captured = ""
154         value = test.get_log_content()
155         if value:
156             captured += escape_cdata(value)
157         for extralog in test.extra_logfiles:
158             captured += "\n\n===== %s =====\n\n" % escape_cdata(
159                 os.path.basename(extralog))
160             value = test.get_extra_log_content(extralog)
161             captured += escape_cdata(value)
162
163         return captured
164
165     def _get_captured(self, test):
166         return '<system-out><![CDATA[%s]]></system-out>' % self._get_all_logs_data(test)
167
168     def _quoteattr(self, attr):
169         """Escape an XML attribute. Value can be unicode."""
170         attr = xml_safe(attr)
171         if isinstance(attr, str) and not UNICODE_STRINGS:
172             attr = attr.encode(self.encoding)
173         return saxutils.quoteattr(attr)
174
175     def report(self):
176         """Writes an Xunit-formatted XML file
177
178         The file includes a report of test errors and failures.
179
180         """
181         self.debug("Writing XML file to: %s", self.options.xunit_file)
182         xml_file = codecs.open(self.options.xunit_file, 'w',
183                                self.encoding, 'replace')
184
185         self.stats['encoding'] = self.encoding
186         self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
187                                + self.stats['passed'] + self.stats['skipped'])
188
189         xml_file.write('<?xml version="1.0" encoding="%(encoding)s"?>'
190                        '<testsuite name="gst-validate-launcher" tests="%(total)d" '
191                        'errors="%(timeout)d" failures="%(failures)d" '
192                        'skipped="%(skipped)d">' % self.stats)
193
194         tmp_xml_file = codecs.open(self.tmp_xml_file.name, 'r',
195                                    self.encoding, 'replace')
196
197         for l in tmp_xml_file:
198             xml_file.write(l)
199
200         xml_file.write('</testsuite>')
201         xml_file.close()
202         tmp_xml_file.close()
203         os.remove(self.tmp_xml_file.name)
204
205         self._createTmpFile()
206
207     def _createTmpFile(self):
208         self.tmp_xml_file = tempfile.NamedTemporaryFile(delete=False)
209         self.tmp_xml_file.close()
210
211     def set_failed(self, test):
212         """Add failure output to Xunit report.
213         """
214         super().set_failed(test)
215
216         xml_file = codecs.open(self.tmp_xml_file.name, 'a',
217                                self.encoding, 'replace')
218         xml_file.write(self._forceUnicode(
219             '<testcase name=%(name)s time="%(taken).3f">'
220             '<failure type=%(errtype)s message=%(message)s>%(logs)s'
221             '</failure></testcase>' %
222             {'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
223              'taken': test.time_taken,
224              'logs': self._get_all_logs_data(test),
225              'errtype': self._quoteattr(test.result),
226              'message': self._quoteattr(test.message),
227              }))
228         xml_file.close()
229
230     def set_passed(self, test):
231         """Add success output to Xunit report.
232         """
233         self.stats['passed'] += 1
234
235         xml_file = codecs.open(self.tmp_xml_file.name, 'a',
236                                self.encoding, 'replace')
237         xml_file.write(self._forceUnicode(
238             '<testcase name=%(name)s '
239             'time="%(taken).3f">%(systemout)s</testcase>' %
240             {'name': self._quoteattr(test.get_classname() + '.' + test.get_name()),
241              'taken': test.time_taken,
242              'systemout': self._get_captured(test),
243              }))
244         xml_file.close()
245
246     def _forceUnicode(self, s):
247         if not UNICODE_STRINGS:
248             if isinstance(s, str):
249                 s = s.decode(self.encoding, 'replace')
250         return s