gkdbus: Fix underflow and unreachable code bug
[platform/upstream/glib.git] / gobject / tests / taptestrunner.py
1 #!/usr/bin/env python
2 # coding=utf-8
3
4 # Copyright (c) 2015 Remko Tronçon (https://el-tramo.be)
5 # Copied from https://github.com/remko/pycotap/
6 #
7 # SPDX-License-Identifier: MIT
8 #
9 # Released under the MIT license
10 #
11 # Permission is hereby granted, free of charge, to any person obtaining a copy
12 # of this software and associated documentation files (the "Software"), to deal
13 # in the Software without restriction, including without limitation the rights
14 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 # copies of the Software, and to permit persons to whom the Software is
16 # furnished to do so, subject to the following conditions:
17 #
18 # The above copyright notice and this permission notice shall be included in
19 # all copies or substantial portions of the Software.
20 #
21 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 # SOFTWARE.
28
29
30 import unittest
31 import sys
32 import base64
33 from io import StringIO
34
35
36 # Log modes
37 class LogMode(object):
38     LogToError, LogToDiagnostics, LogToYAML, LogToAttachment = range(4)
39
40
41 class TAPTestResult(unittest.TestResult):
42     def __init__(self, output_stream, error_stream, message_log, test_output_log):
43         super(TAPTestResult, self).__init__(self, output_stream)
44         self.output_stream = output_stream
45         self.error_stream = error_stream
46         self.orig_stdout = None
47         self.orig_stderr = None
48         self.message = None
49         self.test_output = None
50         self.message_log = message_log
51         self.test_output_log = test_output_log
52         self.output_stream.write("TAP version 13\n")
53         self._set_streams()
54
55     def printErrors(self):
56         self.print_raw("1..%d\n" % self.testsRun)
57         self._reset_streams()
58
59     def _set_streams(self):
60         self.orig_stdout = sys.stdout
61         self.orig_stderr = sys.stderr
62         if self.message_log == LogMode.LogToError:
63             self.message = self.error_stream
64         else:
65             self.message = StringIO()
66         if self.test_output_log == LogMode.LogToError:
67             self.test_output = self.error_stream
68         else:
69             self.test_output = StringIO()
70
71         if self.message_log == self.test_output_log:
72             self.test_output = self.message
73         sys.stdout = sys.stderr = self.test_output
74
75     def _reset_streams(self):
76         sys.stdout = self.orig_stdout
77         sys.stderr = self.orig_stderr
78
79     def print_raw(self, text):
80         self.output_stream.write(text)
81         self.output_stream.flush()
82
83     def print_result(self, result, test, directive=None):
84         self.output_stream.write("%s %d %s" % (result, self.testsRun, test.id()))
85         if directive:
86             self.output_stream.write(" # " + directive)
87         self.output_stream.write("\n")
88         self.output_stream.flush()
89
90     def ok(self, test, directive=None):
91         self.print_result("ok", test, directive)
92
93     def not_ok(self, test):
94         self.print_result("not ok", test)
95
96     def startTest(self, test):
97         super(TAPTestResult, self).startTest(test)
98
99     def stopTest(self, test):
100         super(TAPTestResult, self).stopTest(test)
101         if self.message_log == self.test_output_log:
102             logs = [(self.message_log, self.message, "output")]
103         else:
104             logs = [
105                 (self.test_output_log, self.test_output, "test_output"),
106                 (self.message_log, self.message, "message"),
107             ]
108         for log_mode, log, log_name in logs:
109             if log_mode != LogMode.LogToError:
110                 output = log.getvalue()
111                 if len(output):
112                     if log_mode == LogMode.LogToYAML:
113                         self.print_raw("  ---\n")
114                         self.print_raw("    " + log_name + ": |\n")
115                         self.print_raw(
116                             "      " + output.rstrip().replace("\n", "\n      ") + "\n"
117                         )
118                         self.print_raw("  ...\n")
119                     elif log_mode == LogMode.LogToAttachment:
120                         self.print_raw("  ---\n")
121                         self.print_raw("    " + log_name + ":\n")
122                         self.print_raw("      File-Name: " + log_name + ".txt\n")
123                         self.print_raw("      File-Type: text/plain\n")
124                         self.print_raw(
125                             "      File-Content: " + base64.b64encode(output) + "\n"
126                         )
127                         self.print_raw("  ...\n")
128                     else:
129                         self.print_raw(
130                             "# " + output.rstrip().replace("\n", "\n# ") + "\n"
131                         )
132                 # Truncate doesn't change the current stream position.
133                 # Seek to the beginning to avoid extensions on subsequent writes.
134                 log.seek(0)
135                 log.truncate(0)
136
137     def addSuccess(self, test):
138         super(TAPTestResult, self).addSuccess(test)
139         self.ok(test)
140
141     def addError(self, test, err):
142         super(TAPTestResult, self).addError(test, err)
143         self.message.write(self.errors[-1][1] + "\n")
144         self.not_ok(test)
145
146     def addFailure(self, test, err):
147         super(TAPTestResult, self).addFailure(test, err)
148         self.message.write(self.failures[-1][1] + "\n")
149         self.not_ok(test)
150
151     def addSkip(self, test, reason):
152         super(TAPTestResult, self).addSkip(test, reason)
153         self.ok(test, "SKIP " + reason)
154
155     def addExpectedFailure(self, test, err):
156         super(TAPTestResult, self).addExpectedFailure(test, err)
157         self.ok(test)
158
159     def addUnexpectedSuccess(self, test):
160         super(TAPTestResult, self).addUnexpectedSuccess(test)
161         self.message.write("Unexpected success" + "\n")
162         self.not_ok(test)
163
164
165 class TAPTestRunner(object):
166     def __init__(
167         self,
168         message_log=LogMode.LogToYAML,
169         test_output_log=LogMode.LogToDiagnostics,
170         output_stream=sys.stdout,
171         error_stream=sys.stderr,
172     ):
173         self.output_stream = output_stream
174         self.error_stream = error_stream
175         self.message_log = message_log
176         self.test_output_log = test_output_log
177
178     def run(self, test):
179         result = TAPTestResult(
180             self.output_stream,
181             self.error_stream,
182             self.message_log,
183             self.test_output_log,
184         )
185         test(result)
186         result.printErrors()
187
188         return result