1 # SPDX-License-Identifier: GPL-2.0
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
from __future__ import annotations

import re

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout
class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : int - expected number of subtests (0 if single
        test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """
    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        # Default to TEST_CRASHED: a test that is never parsed to
        # completion is reported as crashed.
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count = 0  # type: Optional[int]
        self.subtests = []  # type: List[Test]
        self.log = []  # type: List[str]
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
            f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test."""
        self.counts.errors += 1
        stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()
class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.

    Attributes:
    passed : int - the number of tests that have passed
    failed : int - the number of tests that have failed
    crashed : int - the number of tests that have crashed
    skipped : int - the number of tests that have skipped
    errors : int - the number of errors in the test and subtests
    """
    def __init__(self):
        """Creates TestCounts object with counts of all test
        statuses and test errors set to 0.
        """
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0
        self.errors = 0

    def __str__(self) -> str:
        """Returns the string representation of a TestCounts object."""
        statuses = [('passed', self.passed), ('failed', self.failed),
            ('crashed', self.crashed), ('skipped', self.skipped),
            ('errors', self.errors)]
        # Only statuses with a non-zero count are shown.
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.
        """
        # Errors are bookkeeping, not test cases, so they are excluded.
        return (self.passed + self.failed + self.crashed +
            self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to the current
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of the TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the count for `status`."""
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            # Any remaining status (e.g. TEST_CRASHED) is counted as a
            # crash; NO_TESTS is deliberately not counted at all.
            self.crashed += 1
class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text) pairs.
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Empties all lines stored in LineStream object into
        Iterator object and returns the Iterator object.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]
# Parsing helper methods:

# Matches the KTAP header line ('KTAP version 1'); group 1 is the version.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Matches the older TAP header line ('TAP version 14').
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages that mark the end of all KTAP output (panic/halt).
KTAP_END = re.compile('(List of all partitions:|'
    'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream:
    """Extracts KTAP lines from the kernel output."""
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        # 1-based number of the current kernel-output line.
        line_num = 0
        started = False
        # Length of the console prefix preceding the KTAP banner; the
        # same prefix is stripped from every subsequent line.
        prefix_len = 0
        for line in kernel_output:
            line_num += 1
            line = line.rstrip()  # remove trailing \n
            if not started and KTAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(
                    line.split('KTAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif not started and TAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif started and KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            elif started:
                # remove the prefix and optionally any leading
                # whitespace. Our parsing logic relies on this.
                line = line[prefix_len:]
                if lstrip:
                    line = line.lstrip()
                yield line_num, line
    return LineStream(lines=isolate_ktap_output(kernel_output))
# Accepted KTAP and TAP protocol versions.
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
        version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line
    test - Test object for current test being parsed
    """
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(f'{version_type} version higher than expected!')
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    ktap_match = KTAP_START.match(lines.peek())
    tap_match = TAP_START.match(lines.peek())
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    # Header consumed only on success; a non-header line is left for
    # the other parse_* helpers to examine.
    test.log.append(lines.pop())
    return True
# Matches a subtest header line; group 1 is the subtest name.
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        return False
    test.log.append(lines.pop())
    test.name = match.group(1)
    return True
# Matches a test plan line ('1..N'); group 1 is the expected subtest count.
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object.
    Returns False and sets expected_count to None if there is no valid test
    plan.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        # No plan: the expected number of subtests is unknown.
        test.expected_count = None
        return False
    test.log.append(lines.pop())
    expected_count = int(match.group(1))
    test.expected_count = expected_count
    return True
# Matches a test result line; groups: status, test number, optional '- ',
# test name, optional directive comment.
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Like TEST_RESULT, but only matches lines carrying a '# SKIP' directive.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matches the
    expected test name
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    if not match:
        return False
    name = match.group(4)
    return name == test.name
def parse_test_result(lines: LineStream, test: Test,
        expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    test.log.append(lines.pop())

    # Set name of test object
    if skip_match:
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test number
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True
def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parse lines that do not match the format of a test result line or
    test header line and returns them in list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log = []  # type: List[str]
    # Consume until the next structural line (result or subtest header)
    # or until the stream runs out.
    while lines and not TEST_RESULT.match(lines.peek()) and not \
            TEST_HEADER.match(lines.peek()):
        log.append(lines.pop())
    return log
# Printing helper methods:

# Fixed width of divider lines printed around headers and summaries.
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of dashes
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of dashes for each side of the divider
        len_1 = int(difference / 2)
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)
def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name
    if test.expected_count:
        if test.expected_count == 1:
            message += ' (1 subtest)'
        else:
            message += f' ({test.expected_count} subtests)'
    stdout.print_with_timestamp(format_test_divider(message, len(message)))
def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    for m in log:
        stdout.print_with_timestamp(stdout.yellow(m))
def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name. For a crashed or failed test, the test's saved log is also
    printed so the cause is visible.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return stdout.green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return stdout.yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return stdout.yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log)
        return stdout.red('[CRASHED] ') + test.name
    print_log(test.log)
    return stdout.red('[FAILED] ') + test.name
def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    """
    result_line = format_test_result(test)
    stdout.print_with_timestamp(result_line)
def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    # Color escape codes inflate len(message); subtract their length so
    # the divider keeps its intended visible width.
    visible_len = len(message) - stdout.color_len()
    stdout.print_with_timestamp(format_test_divider(message, visible_len))
def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped, and red if the test fails or crashes. Summary line contains
    counts of the statuses of the tests subtests or the test itself if it
    has no subtests.

    Example:
    'Testing complete. Ran 2 tests: passed: 2'

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = stdout.green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = stdout.yellow
    else:
        color = stdout.red
    stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and check if any of the tests crashed and if so set the test
    status to crashed. Otherwise if the test has no subtests add the
    status of the test to the test counts.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    counts = test.counts
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        # Leaf test: count its own status.
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        # A crash anywhere below marks this parent as crashed too.
        test.status = TestStatus.TEST_CRASHED
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:

    - Main KTAP/TAP header

      Example:

      KTAP version 1
      1..4
      [subtests]

    - Subtest header line

      Example:

      # Subtest: name
      1..3
      [subtests]
      ok 1 name

    - Test result line

      Example:

      ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)
    parent_test = False
    main = parse_ktap_header(lines, test)
    if main:
        # If KTAP/TAP header is found, attempt to parse
        # test plan
        test.name = "main"
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If KTAP/TAP header is not found, test must be subtest
        # header or test result line, so attempt to parse
        # subtest header
        parent_test = parse_test_header(lines, test)
        if parent_test:
            # If subtest header is found, attempt to parse
            # test plan and print header
            parse_test_plan(lines, test)
            print_test_header(test)
    expected_count = test.expected_count
    subtests = []
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                not main):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if not main:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if (parent_test and peek_test_name_match(lines, test)) or \
                not parent_test:
            parse_test_result(lines, test, expected_num)
        else:
            test.add_error('missing subtest result line!')

    # Check for there being no tests
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and not main:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif not main:
        print_test_result(test)
    return test
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    stdout.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        # No KTAP banner found anywhere in the kernel output.
        test.name = '<missing>'
        test.add_error('could not find any KTAP output!')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [])
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    stdout.print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test