1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
17 TestResult = namedtuple('TestResult', ['status','suites','log'])
19 class TestSuite(object):
20 def __init__(self) -> None:
21 self.status = TestStatus.SUCCESS
23 self.cases = [] # type: List[TestCase]
25 def __str__(self) -> str:
26 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
28 def __repr__(self) -> str:
31 class TestCase(object):
32 def __init__(self) -> None:
33 self.status = TestStatus.SUCCESS
35 self.log = [] # type: List[str]
37 def __str__(self) -> str:
38 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
40 def __repr__(self) -> str:
43 class TestStatus(Enum):
49 FAILURE_TO_PARSE_TESTS = auto()
52 """Provides a peek()/pop() interface over an iterator of (line#, text)."""
53 _lines: Iterator[Tuple[int, str]]
54 _next: Tuple[int, str]
57 def __init__(self, lines: Iterator[Tuple[int, str]]):
63 def _get_next(self) -> None:
65 self._next = next(self._lines)
69 def peek(self) -> str:
77 def __bool__(self) -> bool:
80 # Only used by kunit_tool_test.py.
81 def __iter__(self) -> Iterator[str]:
85 def line_number(self) -> int:
88 kunit_start_re = re.compile(r'TAP version [0-9]+$')
89 kunit_end_re = re.compile('(List of all partitions:|'
90 'Kernel panic - not syncing: VFS:|reboot: System halted)')
92 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
93 def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
96 for line in kernel_output:
98 line = line.rstrip() # line always has a trailing \n
99 if kunit_start_re.search(line):
100 prefix_len = len(line.split('TAP version')[0])
102 yield line_num, line[prefix_len:]
103 elif kunit_end_re.search(line):
106 yield line_num, line[prefix_len:]
107 return LineStream(lines=isolate_kunit_output(kernel_output))
109 def raw_output(kernel_output) -> None:
110 for line in kernel_output:
117 def red(text) -> str:
118 return '\033[1;31m' + text + RESET
120 def yellow(text) -> str:
121 return '\033[1;33m' + text + RESET
123 def green(text) -> str:
124 return '\033[1;32m' + text + RESET
126 def print_with_timestamp(message) -> None:
127 print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
129 def format_suite_divider(message) -> str:
130 return '======== ' + message + ' ========'
132 def print_suite_divider(message) -> None:
133 print_with_timestamp(DIVIDER)
134 print_with_timestamp(format_suite_divider(message))
136 def print_log(log) -> None:
138 print_with_timestamp(m)
140 TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
142 def consume_non_diagnostic(lines: LineStream) -> None:
143 while lines and not TAP_ENTRIES.match(lines.peek()):
146 def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
147 while lines and not TAP_ENTRIES.match(lines.peek()):
148 test_case.log.append(lines.peek())
151 OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
153 OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
155 OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
157 OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
159 def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
160 save_non_diagnostic(lines, test_case)
162 test_case.status = TestStatus.TEST_CRASHED
165 match = OK_NOT_OK_SUBTEST.match(line)
166 while not match and lines:
168 match = OK_NOT_OK_SUBTEST.match(line)
170 test_case.log.append(lines.pop())
171 test_case.name = match.group(2)
172 skip_match = OK_NOT_OK_SKIP.match(line)
174 test_case.status = TestStatus.SKIPPED
176 if test_case.status == TestStatus.TEST_CRASHED:
178 if match.group(1) == 'ok':
179 test_case.status = TestStatus.SUCCESS
181 test_case.status = TestStatus.FAILURE
186 SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
187 DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
189 def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
190 save_non_diagnostic(lines, test_case)
194 match = SUBTEST_DIAGNOSTIC.match(line)
196 test_case.log.append(lines.pop())
197 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
199 test_case.status = TestStatus.TEST_CRASHED
204 def parse_test_case(lines: LineStream) -> Optional[TestCase]:
205 test_case = TestCase()
206 save_non_diagnostic(lines, test_case)
207 while parse_diagnostic(lines, test_case):
209 if parse_ok_not_ok_test_case(lines, test_case):
214 SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
216 def parse_subtest_header(lines: LineStream) -> Optional[str]:
217 consume_non_diagnostic(lines)
220 match = SUBTEST_HEADER.match(lines.peek())
223 return match.group(1)
227 SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
229 def parse_subtest_plan(lines: LineStream) -> Optional[int]:
230 consume_non_diagnostic(lines)
231 match = SUBTEST_PLAN.match(lines.peek())
234 return int(match.group(1))
238 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
241 elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
242 return TestStatus.TEST_CRASHED
243 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
244 return TestStatus.FAILURE
245 elif left == TestStatus.SKIPPED:
250 def parse_ok_not_ok_test_suite(lines: LineStream,
251 test_suite: TestSuite,
252 expected_suite_index: int) -> bool:
253 consume_non_diagnostic(lines)
255 test_suite.status = TestStatus.TEST_CRASHED
258 match = OK_NOT_OK_MODULE.match(line)
261 if match.group(1) == 'ok':
262 test_suite.status = TestStatus.SUCCESS
264 test_suite.status = TestStatus.FAILURE
265 skip_match = OK_NOT_OK_SKIP.match(line)
267 test_suite.status = TestStatus.SKIPPED
268 suite_index = int(match.group(2))
269 if suite_index != expected_suite_index:
270 print_with_timestamp(
271 red('[ERROR] ') + 'expected_suite_index ' +
272 str(expected_suite_index) + ', but got ' +
278 def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
279 return reduce(max_status, status_list, TestStatus.SKIPPED)
281 def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
282 max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
283 return max_status(max_test_case_status, test_suite.status)
285 def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
288 consume_non_diagnostic(lines)
289 test_suite = TestSuite()
290 test_suite.status = TestStatus.SUCCESS
291 name = parse_subtest_header(lines)
294 test_suite.name = name
295 expected_test_case_num = parse_subtest_plan(lines)
296 if expected_test_case_num is None:
298 while expected_test_case_num > 0:
299 test_case = parse_test_case(lines)
302 test_suite.cases.append(test_case)
303 expected_test_case_num -= 1
304 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
305 test_suite.status = bubble_up_test_case_errors(test_suite)
308 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
311 print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
314 TAP_HEADER = re.compile(r'^TAP version 14$')
316 def parse_tap_header(lines: LineStream) -> bool:
317 consume_non_diagnostic(lines)
318 if TAP_HEADER.match(lines.peek()):
324 TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
326 def parse_test_plan(lines: LineStream) -> Optional[int]:
327 consume_non_diagnostic(lines)
328 match = TEST_PLAN.match(lines.peek())
331 return int(match.group(1))
335 def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
336 return bubble_up_errors(x.status for x in test_suites)
338 def parse_test_result(lines: LineStream) -> TestResult:
339 consume_non_diagnostic(lines)
340 if not lines or not parse_tap_header(lines):
341 return TestResult(TestStatus.NO_TESTS, [], lines)
342 expected_test_suite_num = parse_test_plan(lines)
343 if not expected_test_suite_num:
344 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
346 for i in range(1, expected_test_suite_num + 1):
347 test_suite = parse_test_suite(lines, i)
349 test_suites.append(test_suite)
351 print_with_timestamp(
352 red('[ERROR] ') + ' expected ' +
353 str(expected_test_suite_num) +
354 ' test suites, but got ' + str(i - 2))
356 test_suite = parse_test_suite(lines, -1)
358 print_with_timestamp(red('[ERROR] ') +
359 'got unexpected test suite: ' + test_suite.name)
361 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
363 return TestResult(TestStatus.NO_TESTS, [], lines)
377 def total(self) -> int:
378 return self.passed + self.failed + self.crashed + self.skipped
380 def print_and_count_results(test_result: TestResult) -> TestCounts:
381 counts = TestCounts()
382 for test_suite in test_result.suites:
383 if test_suite.status == TestStatus.SUCCESS:
384 print_suite_divider(green('[PASSED] ') + test_suite.name)
385 elif test_suite.status == TestStatus.SKIPPED:
386 print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
387 elif test_suite.status == TestStatus.TEST_CRASHED:
388 print_suite_divider(red('[CRASHED] ' + test_suite.name))
390 print_suite_divider(red('[FAILED] ') + test_suite.name)
391 for test_case in test_suite.cases:
392 if test_case.status == TestStatus.SUCCESS:
394 print_with_timestamp(green('[PASSED] ') + test_case.name)
395 elif test_case.status == TestStatus.SKIPPED:
397 print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
398 elif test_case.status == TestStatus.TEST_CRASHED:
400 print_with_timestamp(red('[CRASHED] ' + test_case.name))
401 print_log(map(yellow, test_case.log))
402 print_with_timestamp('')
405 print_with_timestamp(red('[FAILED] ') + test_case.name)
406 print_log(map(yellow, test_case.log))
407 print_with_timestamp('')
410 def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
411 counts = TestCounts()
412 lines = extract_tap_lines(kernel_output)
413 test_result = parse_test_result(lines)
414 if test_result.status == TestStatus.NO_TESTS:
415 print(red('[ERROR] ') + yellow('no tests run!'))
416 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
417 print(red('[ERROR] ') + yellow('could not parse test results!'))
419 counts = print_and_count_results(test_result)
420 print_with_timestamp(DIVIDER)
421 if test_result.status == TestStatus.SUCCESS:
423 elif test_result.status == TestStatus.SKIPPED:
427 print_with_timestamp(
428 fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
429 (counts.total(), counts.failed, counts.crashed, counts.skipped)))