clk: x86: Rename clk-lpt to more specific clk-lpss-atom
[platform/kernel/linux-rpi.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
16
# Outcome of a whole parse: the aggregate status, the list of parsed suites,
# and (in `log`) the line stream the results were read from.
TestResult = namedtuple('TestResult', ('status', 'suites', 'log'))
18
class TestSuite(object):
        """Parsed result of one test suite: status, name and its test cases."""

        def __init__(self) -> None:
                self.name = ''
                self.cases = []  # type: List[TestCase]
                # A fresh suite counts as passing until a parser says otherwise.
                self.status = TestStatus.SUCCESS

        def __str__(self) -> str:
                fields = (str(self.status), self.name, str(self.cases))
                return 'TestSuite(' + ','.join(fields) + ')'

        def __repr__(self) -> str:
                # repr and str are deliberately the same text.
                return str(self)
30
class TestCase(object):
        """Parsed result of one test case: status, name and its log lines."""

        def __init__(self) -> None:
                self.name = ''
                self.log = []  # type: List[str]
                # A fresh case counts as passing until a parser says otherwise.
                self.status = TestStatus.SUCCESS

        def __str__(self) -> str:
                fields = (str(self.status), self.name, str(self.log))
                return 'TestCase(' + ','.join(fields) + ')'

        def __repr__(self) -> str:
                # repr and str are deliberately the same text.
                return str(self)
42
class TestStatus(Enum):
        """Status assigned to a test case, a test suite, or a whole parse."""
        # The result line started with "ok".
        SUCCESS = auto()
        # The result line started with "not ok".
        FAILURE = auto()
        # The result line carried a "# SKIP" directive; also the neutral
        # starting value when statuses are aggregated.
        SKIPPED = auto()
        # A crash diagnostic was seen, or the input ended before a result line.
        TEST_CRASHED = auto()
        # No TAP header was found, or no suite could be parsed.
        NO_TESTS = auto()
        # The top-level test plan was missing (or declared zero suites).
        FAILURE_TO_PARSE_TESTS = auto()
50
class LineStream:
        """One-line-lookahead wrapper around an iterator of (line#, text).

        The next pair is fetched eagerly, so peek() and line_number() never
        touch the underlying iterator.  Once the iterator is exhausted the
        stream becomes falsy and the last fetched pair is left in place.
        """
        _lines: Iterator[Tuple[int, str]]
        _next: Tuple[int, str]
        _done: bool

        def __init__(self, lines: Iterator[Tuple[int, str]]):
                self._lines = lines
                self._done = False
                # Placeholder reported by peek()/line_number() on an empty input.
                self._next = (0, '')
                self._get_next()

        def _get_next(self) -> None:
                """Fetch the next pair, or mark the stream as finished."""
                exhausted = object()
                fetched = next(self._lines, exhausted)
                if fetched is exhausted:
                        self._done = True
                else:
                        self._next = fetched

        def peek(self) -> str:
                """Return the text of the current line without consuming it."""
                return self._next[1]

        def pop(self) -> str:
                """Return the text of the current line and advance the stream."""
                text = self._next[1]
                self._get_next()
                return text

        def __bool__(self) -> bool:
                """True while there is still an unconsumed line."""
                return not self._done

        # Only used by kunit_tool_test.py.
        def __iter__(self) -> Iterator[str]:
                while self:
                        yield self.pop()

        def line_number(self) -> int:
                """Return the line number attached to the current line."""
                return self._next[0]
87
# A line ending in "TAP version <N>" starts the KUnit output.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Any of these kernel messages stops the collection of KUnit output.
kunit_end_re = re.compile(
        '(List of all partitions:|Kernel panic - not syncing: VFS:|reboot: System halted)')
91
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
        """Return a LineStream over just the KUnit portion of a kernel log.

        Lines are numbered from 1 in the original log.  Output begins at the
        first line matching kunit_start_re and stops at the first line matching
        kunit_end_re.  Whatever precedes "TAP version" on the start line is
        treated as a prefix, and that many characters are cut from every
        emitted line.
        """
        def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
                in_tap = False
                offset = 0
                for number, raw in enumerate(kernel_output, start=1):
                        text = raw.rstrip()  # drop the trailing newline
                        if kunit_start_re.search(text):
                                # Width of the prefix in front of the TAP header.
                                offset = text.index('TAP version')
                                in_tap = True
                        elif kunit_end_re.search(text):
                                return
                        elif not in_tap:
                                continue
                        yield number, text[offset:]
        return LineStream(lines=isolate_kunit_output(kernel_output))
108
def raw_output(kernel_output) -> None:
        """Print every line of kernel_output with trailing whitespace removed."""
        for raw_line in kernel_output:
                trimmed = raw_line.rstrip()
                print(trimmed)
112
# Horizontal rule (60 '=' characters) printed between report sections.
DIVIDER = 60 * '='

# ANSI escape sequence appended after coloured text to reset the attributes.
RESET = '\033[0;0m'
116
def red(text) -> str:
        """Wrap text in the bold-red ANSI escape, followed by RESET."""
        return ''.join(('\033[1;31m', text, RESET))
119
def yellow(text) -> str:
        """Wrap text in the bold-yellow ANSI escape, followed by RESET."""
        return ''.join(('\033[1;33m', text, RESET))
122
def green(text) -> str:
        """Wrap text in the bold-green ANSI escape, followed by RESET."""
        return ''.join(('\033[1;32m', text, RESET))
125
def print_with_timestamp(message) -> None:
        """Print message prefixed with the current local time as [HH:MM:SS]."""
        stamp = datetime.now().strftime('%H:%M:%S')
        print(f'[{stamp!s}] {message!s}')
128
def format_suite_divider(message) -> str:
        """Return message framed by eight '=' characters on each side."""
        return ''.join(('======== ', message, ' ========'))
131
def print_suite_divider(message) -> None:
        """Print a timestamped DIVIDER line, then the framed suite banner."""
        print_with_timestamp(DIVIDER)
        banner = format_suite_divider(message)
        print_with_timestamp(banner)
135
def print_log(log) -> None:
        """Print each entry of log on its own timestamped line."""
        for entry in log:
                print_with_timestamp(entry)
139
# Lines the parser treats as TAP: anything starting with "TAP", an optionally
# indented "ok"/"not ok" result, an "N..M" plan, or a "#" diagnostic.  Every
# other line is considered non-TAP output.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
141
def consume_non_diagnostic(lines: LineStream) -> None:
        """Discard lines until the next TAP entry or the end of the stream."""
        while lines:
                if TAP_ENTRIES.match(lines.peek()):
                        break
                lines.pop()
145
def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
        """Move lines into test_case.log until the next TAP entry or the end."""
        while lines and TAP_ENTRIES.match(lines.peek()) is None:
                # pop() hands back the line that peek() just inspected.
                test_case.log.append(lines.pop())
150
# Record type with is_ok, description and text fields.
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

# A result line (indented or not) whose description ends in a "# SKIP"
# directive; group 3 is whatever follows "# SKIP".
OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')

# An indented result line, i.e. the result of a test case inside a suite.
# Group 1 is "ok"/"not ok", group 2 the rest of the line after " - ".
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# An unindented result line, i.e. the result of a whole suite.  Group 1 is
# "ok"/"not ok", group 2 the suite index, group 3 the rest of the line.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
158
def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
        """Parse the indented "ok"/"not ok" line that ends a test case.

        Leading non-TAP lines are saved to test_case.log.  TAP lines that are
        not an indented result line are then discarded until one is found.

        Args:
                lines: stream positioned after the test case's diagnostics.
                test_case: receives the name, the status and, in its log, the
                        result line itself.

        Returns:
                True if a result line was consumed, or if the stream was already
                exhausted on entry (the case is then marked TEST_CRASHED).
                False if the stream ran out while searching for a result line.
        """
        save_non_diagnostic(lines, test_case)
        if not lines:
                test_case.status = TestStatus.TEST_CRASHED
                return True
        # Inspect candidates with peek() and pop only the rejected ones, so the
        # matching line is consumed exactly once below.  Searching with pop()
        # used to consume the match early, and the later pop() then swallowed
        # (and logged) the line that followed it.
        line = lines.peek()
        match = OK_NOT_OK_SUBTEST.match(line)
        while not match:
                lines.pop()
                if not lines:
                        return False
                line = lines.peek()
                match = OK_NOT_OK_SUBTEST.match(line)
        test_case.log.append(lines.pop())
        test_case.name = match.group(2)
        if OK_NOT_OK_SKIP.match(line):
                test_case.status = TestStatus.SKIPPED
                return True
        # A crash recorded from an earlier diagnostic wins over ok/not ok.
        if test_case.status == TestStatus.TEST_CRASHED:
                return True
        if match.group(1) == 'ok':
                test_case.status = TestStatus.SUCCESS
        else:
                test_case.status = TestStatus.FAILURE
        return True
185
# An indented "# ..." diagnostic line belonging to a test case.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
# The specific diagnostic that marks a test case as crashed.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
188
def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
        """Consume one indented "# ..." diagnostic line, if one is next.

        Non-TAP lines in front of it are saved to test_case.log, as is the
        diagnostic itself.  A crash diagnostic sets the case's status to
        TEST_CRASHED.

        Returns:
                True if a diagnostic line was consumed, False otherwise.
        """
        save_non_diagnostic(lines, test_case)
        if not lines:
                return False
        candidate = lines.peek()
        if SUBTEST_DIAGNOSTIC.match(candidate) is None:
                return False
        test_case.log.append(lines.pop())
        if DIAGNOSTIC_CRASH_MESSAGE.match(candidate):
                test_case.status = TestStatus.TEST_CRASHED
        return True
203
def parse_test_case(lines: LineStream) -> Optional[TestCase]:
        """Parse one test case: its diagnostics followed by its result line.

        Returns:
                The populated TestCase, or None if no result line could be parsed.
        """
        case = TestCase()
        save_non_diagnostic(lines, case)
        # Swallow every diagnostic line that precedes the result line.
        while parse_diagnostic(lines, case):
                continue
        return case if parse_ok_not_ok_test_case(lines, case) else None
213
# Indented "# Subtest: <name>" line that opens a suite; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
215
def parse_subtest_header(lines: LineStream) -> Optional[str]:
        """Consume a "# Subtest: <name>" line and return the suite name.

        Non-TAP lines in front of it are discarded.  Returns None, consuming
        nothing further, when the stream is exhausted or the next TAP line is
        not a subtest header.
        """
        consume_non_diagnostic(lines)
        if not lines:
                return None
        header = SUBTEST_HEADER.match(lines.peek())
        if header is None:
                return None
        lines.pop()
        return header.group(1)
226
# Indented "N..M" plan line of a suite; group 1 is M, the number of test cases.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
228
def parse_subtest_plan(lines: LineStream) -> Optional[int]:
        """Consume a suite's indented "N..M" plan line and return M.

        Non-TAP lines in front of it are discarded.

        Returns:
                The number of test cases the suite announces, or None when the
                stream is exhausted or the next TAP line is not a subtest plan.
        """
        consume_non_diagnostic(lines)
        # An exhausted stream still reports its last line from peek(); never
        # interpret that stale line as a plan.
        if not lines:
                return None
        match = SUBTEST_PLAN.match(lines.peek())
        if match:
                lines.pop()
                return int(match.group(1))
        else:
                return None
237
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
        """Combine two statuses into the more severe one.

        TEST_CRASHED beats everything, then FAILURE.  Otherwise a SKIPPED left
        operand gives way to the right one, and in all remaining cases the
        left operand is kept.
        """
        if left == right:
                return left
        operands = (left, right)
        for severe in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
                if severe in operands:
                        return severe
        return right if left == TestStatus.SKIPPED else left
249
def parse_ok_not_ok_test_suite(lines: LineStream,
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
        """Parse the unindented "ok"/"not ok" line that ends a suite.

        Sets test_suite.status from the line (SKIPPED when it carries a
        "# SKIP" directive) and prints an error if the suite index on the line
        differs from expected_suite_index.

        Returns:
                True if a suite result line was consumed.  False if the next TAP
                line is not one, or if the stream is exhausted, in which case
                the suite is also marked TEST_CRASHED.
        """
        consume_non_diagnostic(lines)
        if not lines:
                test_suite.status = TestStatus.TEST_CRASHED
                return False
        line = lines.peek()
        match = OK_NOT_OK_MODULE.match(line)
        if match is None:
                return False
        lines.pop()
        verdict, index_text = match.group(1, 2)
        if OK_NOT_OK_SKIP.match(line):
                test_suite.status = TestStatus.SKIPPED
        elif verdict == 'ok':
                test_suite.status = TestStatus.SUCCESS
        else:
                test_suite.status = TestStatus.FAILURE
        suite_index = int(index_text)
        if suite_index != expected_suite_index:
                print_with_timestamp(
                        red('[ERROR] ') + 'expected_suite_index ' +
                        str(expected_suite_index) + ', but got ' +
                        str(suite_index))
        return True
277
def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
        """Fold status_list with max_status, starting from SKIPPED."""
        worst = TestStatus.SKIPPED
        for status in status_list:
                worst = max_status(worst, status)
        return worst
280
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
        """Combine the statuses of a suite's cases with the suite's own status."""
        case_statuses = (case.status for case in test_suite.cases)
        combined_cases = bubble_up_errors(case_statuses)
        return max_status(combined_cases, test_suite.status)
284
def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
        """Parse one suite: header, plan, test cases and the suite result line.

        Args:
                lines: stream positioned at (or before) the suite's header.
                expected_suite_index: index the suite result line should carry.

        Returns:
                The parsed TestSuite.  If the input ends before the suite result
                line, the partially parsed suite is still returned after an
                error is printed.  None if the stream is empty, the header or
                plan is missing, or the suite result line cannot be parsed.
        """
        if not lines:
                return None
        consume_non_diagnostic(lines)
        suite = TestSuite()
        suite.status = TestStatus.SUCCESS
        name = parse_subtest_header(lines)
        if not name:
                return None
        suite.name = name
        planned_cases = parse_subtest_plan(lines)
        if planned_cases is None:
                return None
        # Parse at most as many cases as the plan announces; stop early when a
        # case cannot be parsed.
        for _ in range(planned_cases):
                case = parse_test_case(lines)
                if not case:
                        break
                suite.cases.append(case)
        if parse_ok_not_ok_test_suite(lines, suite, expected_suite_index):
                suite.status = bubble_up_test_case_errors(suite)
                return suite
        if not lines:
                print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
                return suite
        print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
        return None
313
# The only accepted header line: exactly "TAP version 14".
TAP_HEADER = re.compile(r'^TAP version 14$')
315
def parse_tap_header(lines: LineStream) -> bool:
        """Consume the "TAP version 14" header line if it is next.

        Non-TAP lines in front of it are discarded.

        Returns:
                True if the header was consumed; False if the stream is
                exhausted or the next TAP line is not the header.
        """
        consume_non_diagnostic(lines)
        # An exhausted stream still reports its last line from peek(); without
        # this guard a header that was already consumed could match again.
        if not lines:
                return False
        if TAP_HEADER.match(lines.peek()):
                lines.pop()
                return True
        else:
                return False
323
# Unindented top-level "N..M" plan line; group 1 is M, the number of suites.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
325
def parse_test_plan(lines: LineStream) -> Optional[int]:
        """Consume the top-level "N..M" plan line and return M.

        Non-TAP lines in front of it are discarded.

        Returns:
                The number of suites the plan announces, or None when the stream
                is exhausted or the next TAP line is not a plan.
        """
        consume_non_diagnostic(lines)
        # An exhausted stream still reports its last line from peek(); never
        # interpret that stale line as a plan.
        if not lines:
                return None
        match = TEST_PLAN.match(lines.peek())
        if match:
                lines.pop()
                return int(match.group(1))
        else:
                return None
334
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
        """Combine the statuses of all suites into one overall status."""
        suite_statuses = (suite.status for suite in test_suites)
        return bubble_up_errors(suite_statuses)
337
def parse_test_result(lines: LineStream) -> TestResult:
        """Parse a complete TAP stream: header, plan, then each suite.

        Args:
                lines: stream of TAP lines, as produced by extract_tap_lines().

        Returns:
                A TestResult whose status is NO_TESTS when the header is missing
                or no suite could be parsed, FAILURE_TO_PARSE_TESTS when the
                plan is missing or announces zero suites, and otherwise the
                combined status of the parsed suites.  The stream itself is
                returned in the log field.
        """
        consume_non_diagnostic(lines)
        if not lines or not parse_tap_header(lines):
                return TestResult(TestStatus.NO_TESTS, [], lines)
        expected_test_suite_num = parse_test_plan(lines)
        if not expected_test_suite_num:
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        test_suites = []  # type: List[TestSuite]
        for i in range(1, expected_test_suite_num + 1):
                test_suite = parse_test_suite(lines, i)
                if test_suite:
                        test_suites.append(test_suite)
                else:
                        # Report how many suites were actually parsed; the old
                        # "i - 2" was off by one and printed -1 when the very
                        # first suite failed to parse.
                        print_with_timestamp(
                                red('[ERROR] ') + ' expected ' +
                                str(expected_test_suite_num) +
                                ' test suites, but got ' + str(len(test_suites)))
                        break
        # Anything that still parses as a suite is beyond what the plan announced.
        test_suite = parse_test_suite(lines, -1)
        if test_suite:
                print_with_timestamp(red('[ERROR] ') +
                        'got unexpected test suite: ' + test_suite.name)
        if test_suites:
                return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
        else:
                return TestResult(TestStatus.NO_TESTS, [], lines)
364
class TestCounts:
        """Running tally of test cases by outcome."""
        passed: int
        failed: int
        crashed: int
        skipped: int

        def __init__(self):
                # Every counter starts at zero.
                self.passed = self.failed = self.crashed = self.skipped = 0

        def total(self) -> int:
                """Return the number of test cases counted across all outcomes."""
                return sum((self.passed, self.failed, self.crashed, self.skipped))
379
def print_and_count_results(test_result: TestResult) -> TestCounts:
        """Print a coloured report of every suite and case, and tally the cases.

        Crashed and failed cases are followed by their log (in yellow) and an
        empty timestamped line.

        Returns:
                A TestCounts with the number of passed, failed, crashed and
                skipped test cases.
        """
        tally = TestCounts()
        for suite in test_result.suites:
                if suite.status == TestStatus.SUCCESS:
                        banner = green('[PASSED] ') + suite.name
                elif suite.status == TestStatus.SKIPPED:
                        banner = yellow('[SKIPPED] ') + suite.name
                elif suite.status == TestStatus.TEST_CRASHED:
                        banner = red('[CRASHED] ' + suite.name)
                else:
                        banner = red('[FAILED] ') + suite.name
                print_suite_divider(banner)
                for case in suite.cases:
                        verdict = case.status
                        if verdict == TestStatus.SUCCESS:
                                tally.passed += 1
                                print_with_timestamp(green('[PASSED] ') + case.name)
                                continue
                        if verdict == TestStatus.SKIPPED:
                                tally.skipped += 1
                                print_with_timestamp(yellow('[SKIPPED] ') + case.name)
                                continue
                        # Crashed and failed cases share the log dump below.
                        if verdict == TestStatus.TEST_CRASHED:
                                tally.crashed += 1
                                headline = red('[CRASHED] ' + case.name)
                        else:
                                tally.failed += 1
                                headline = red('[FAILED] ') + case.name
                        print_with_timestamp(headline)
                        print_log(map(yellow, case.log))
                        print_with_timestamp('')
        return tally
409
def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
        """Parse kernel output, print the report and summary, return the result.

        When no tests were run or the results could not be parsed, an error is
        printed instead of the per-suite report and all counts stay at zero.
        """
        counts = TestCounts()
        test_result = parse_test_result(extract_tap_lines(kernel_output))
        outcome = test_result.status
        if outcome == TestStatus.NO_TESTS:
                print(red('[ERROR] ') + yellow('no tests run!'))
        elif outcome == TestStatus.FAILURE_TO_PARSE_TESTS:
                print(red('[ERROR] ') + yellow('could not parse test results!'))
        else:
                counts = print_and_count_results(test_result)
        print_with_timestamp(DIVIDER)
        # Summary colour: green on success, yellow when skipped, red otherwise.
        palette = {TestStatus.SUCCESS: green, TestStatus.SKIPPED: yellow}
        fmt = palette.get(outcome, red)
        summary = ('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
                   (counts.total(), counts.failed, counts.crashed, counts.skipped))
        print_with_timestamp(fmt(summary))
        return test_result