Merge tag 'block-6.1-2022-11-05' of git://git.kernel.dk/linux
[platform/kernel/linux-starfive.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
4 # results with reader-friendly format. Stores and returns test results in a
5 # Test object.
6 #
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
11
12 from __future__ import annotations
13 import re
14 import sys
15
16 from enum import Enum, auto
17 from typing import Iterable, Iterator, List, Optional, Tuple
18
19 from kunit_printer import stdout
20
21 class Test:
22         """
23         A class to represent a test parsed from KTAP results. All KTAP
24         results within a test log are stored in a main Test object as
25         subtests.
26
27         Attributes:
28         status : TestStatus - status of the test
29         name : str - name of the test
30         expected_count : int - expected number of subtests (0 if single
31                 test case and None if unknown expected number of subtests)
32         subtests : List[Test] - list of subtests
33         log : List[str] - log of KTAP lines that correspond to the test
34         counts : TestCounts - counts of the test statuses and errors of
35                 subtests or of the test itself if the test is a single
36                 test case.
37         """
38         def __init__(self) -> None:
39                 """Creates Test object with default attributes."""
40                 self.status = TestStatus.TEST_CRASHED
41                 self.name = ''
42                 self.expected_count = 0  # type: Optional[int]
43                 self.subtests = []  # type: List[Test]
44                 self.log = []  # type: List[str]
45                 self.counts = TestCounts()
46
47         def __str__(self) -> str:
48                 """Returns string representation of a Test class object."""
49                 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
50                         f'{self.subtests}, {self.log}, {self.counts})')
51
52         def __repr__(self) -> str:
53                 """Returns string representation of a Test class object."""
54                 return str(self)
55
56         def add_error(self, error_message: str) -> None:
57                 """Records an error that occurred while parsing this test."""
58                 self.counts.errors += 1
59                 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
60
61 class TestStatus(Enum):
62         """An enumeration class to represent the status of a test."""
63         SUCCESS = auto()
64         FAILURE = auto()
65         SKIPPED = auto()
66         TEST_CRASHED = auto()
67         NO_TESTS = auto()
68         FAILURE_TO_PARSE_TESTS = auto()
69
70 class TestCounts:
71         """
72         Tracks the counts of statuses of all test cases and any errors within
73         a Test.
74
75         Attributes:
76         passed : int - the number of tests that have passed
77         failed : int - the number of tests that have failed
78         crashed : int - the number of tests that have crashed
79         skipped : int - the number of tests that have skipped
80         errors : int - the number of errors in the test and subtests
81         """
82         def __init__(self):
83                 """Creates TestCounts object with counts of all test
84                 statuses and test errors set to 0.
85                 """
86                 self.passed = 0
87                 self.failed = 0
88                 self.crashed = 0
89                 self.skipped = 0
90                 self.errors = 0
91
92         def __str__(self) -> str:
93                 """Returns the string representation of a TestCounts object."""
94                 statuses = [('passed', self.passed), ('failed', self.failed),
95                         ('crashed', self.crashed), ('skipped', self.skipped),
96                         ('errors', self.errors)]
97                 return f'Ran {self.total()} tests: ' + \
98                         ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
99
100         def total(self) -> int:
101                 """Returns the total number of test cases within a test
102                 object, where a test case is a test with no subtests.
103                 """
104                 return (self.passed + self.failed + self.crashed +
105                         self.skipped)
106
107         def add_subtest_counts(self, counts: TestCounts) -> None:
108                 """
109                 Adds the counts of another TestCounts object to the current
110                 TestCounts object. Used to add the counts of a subtest to the
111                 parent test.
112
113                 Parameters:
114                 counts - a different TestCounts object whose counts
115                         will be added to the counts of the TestCounts object
116                 """
117                 self.passed += counts.passed
118                 self.failed += counts.failed
119                 self.crashed += counts.crashed
120                 self.skipped += counts.skipped
121                 self.errors += counts.errors
122
123         def get_status(self) -> TestStatus:
124                 """Returns the aggregated status of a Test using test
125                 counts.
126                 """
127                 if self.total() == 0:
128                         return TestStatus.NO_TESTS
129                 if self.crashed:
130                         # Crashes should take priority.
131                         return TestStatus.TEST_CRASHED
132                 if self.failed:
133                         return TestStatus.FAILURE
134                 if self.passed:
135                         # No failures or crashes, looks good!
136                         return TestStatus.SUCCESS
137                 # We have only skipped tests.
138                 return TestStatus.SKIPPED
139
140         def add_status(self, status: TestStatus) -> None:
141                 """Increments the count for `status`."""
142                 if status == TestStatus.SUCCESS:
143                         self.passed += 1
144                 elif status == TestStatus.FAILURE:
145                         self.failed += 1
146                 elif status == TestStatus.SKIPPED:
147                         self.skipped += 1
148                 elif status != TestStatus.NO_TESTS:
149                         self.crashed += 1
150
151 class LineStream:
152         """
153         A class to represent the lines of kernel output.
154         Provides a lazy peek()/pop() interface over an iterator of
155         (line#, text).
156         """
157         _lines: Iterator[Tuple[int, str]]
158         _next: Tuple[int, str]
159         _need_next: bool
160         _done: bool
161
162         def __init__(self, lines: Iterator[Tuple[int, str]]):
163                 """Creates a new LineStream that wraps the given iterator."""
164                 self._lines = lines
165                 self._done = False
166                 self._need_next = True
167                 self._next = (0, '')
168
169         def _get_next(self) -> None:
170                 """Advances the LineSteam to the next line, if necessary."""
171                 if not self._need_next:
172                         return
173                 try:
174                         self._next = next(self._lines)
175                 except StopIteration:
176                         self._done = True
177                 finally:
178                         self._need_next = False
179
180         def peek(self) -> str:
181                 """Returns the current line, without advancing the LineStream.
182                 """
183                 self._get_next()
184                 return self._next[1]
185
186         def pop(self) -> str:
187                 """Returns the current line and advances the LineStream to
188                 the next line.
189                 """
190                 s = self.peek()
191                 if self._done:
192                         raise ValueError(f'LineStream: going past EOF, last line was {s}')
193                 self._need_next = True
194                 return s
195
196         def __bool__(self) -> bool:
197                 """Returns True if stream has more lines."""
198                 self._get_next()
199                 return not self._done
200
201         # Only used by kunit_tool_test.py.
202         def __iter__(self) -> Iterator[str]:
203                 """Empties all lines stored in LineStream object into
204                 Iterator object and returns the Iterator object.
205                 """
206                 while bool(self):
207                         yield self.pop()
208
209         def line_number(self) -> int:
210                 """Returns the line number of the current line."""
211                 self._get_next()
212                 return self._next[0]
213
214 # Parsing helper methods:
215
216 KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
217 TAP_START = re.compile(r'TAP version ([0-9]+)$')
218 KTAP_END = re.compile('(List of all partitions:|'
219         'Kernel panic - not syncing: VFS:|reboot: System halted)')
220
221 def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream:
222         """Extracts KTAP lines from the kernel output."""
223         def isolate_ktap_output(kernel_output: Iterable[str]) \
224                         -> Iterator[Tuple[int, str]]:
225                 line_num = 0
226                 started = False
227                 for line in kernel_output:
228                         line_num += 1
229                         line = line.rstrip()  # remove trailing \n
230                         if not started and KTAP_START.search(line):
231                                 # start extracting KTAP lines and set prefix
232                                 # to number of characters before version line
233                                 prefix_len = len(
234                                         line.split('KTAP version')[0])
235                                 started = True
236                                 yield line_num, line[prefix_len:]
237                         elif not started and TAP_START.search(line):
238                                 # start extracting KTAP lines and set prefix
239                                 # to number of characters before version line
240                                 prefix_len = len(line.split('TAP version')[0])
241                                 started = True
242                                 yield line_num, line[prefix_len:]
243                         elif started and KTAP_END.search(line):
244                                 # stop extracting KTAP lines
245                                 break
246                         elif started:
247                                 # remove the prefix and optionally any leading
248                                 # whitespace. Our parsing logic relies on this.
249                                 line = line[prefix_len:]
250                                 if lstrip:
251                                         line = line.lstrip()
252                                 yield line_num, line
253         return LineStream(lines=isolate_ktap_output(kernel_output))
254
255 KTAP_VERSIONS = [1]
256 TAP_VERSIONS = [13, 14]
257
258 def check_version(version_num: int, accepted_versions: List[int],
259                         version_type: str, test: Test) -> None:
260         """
261         Adds error to test object if version number is too high or too
262         low.
263
264         Parameters:
265         version_num - The inputted version number from the parsed KTAP or TAP
266                 header line
267         accepted_version - List of accepted KTAP or TAP versions
268         version_type - 'KTAP' or 'TAP' depending on the type of
269                 version line.
270         test - Test object for current test being parsed
271         """
272         if version_num < min(accepted_versions):
273                 test.add_error(f'{version_type} version lower than expected!')
274         elif version_num > max(accepted_versions):
275                 test.add_error(f'{version_type} version higer than expected!')
276
277 def parse_ktap_header(lines: LineStream, test: Test) -> bool:
278         """
279         Parses KTAP/TAP header line and checks version number.
280         Returns False if fails to parse KTAP/TAP header line.
281
282         Accepted formats:
283         - 'KTAP version [version number]'
284         - 'TAP version [version number]'
285
286         Parameters:
287         lines - LineStream of KTAP output to parse
288         test - Test object for current test being parsed
289
290         Return:
291         True if successfully parsed KTAP/TAP header line
292         """
293         ktap_match = KTAP_START.match(lines.peek())
294         tap_match = TAP_START.match(lines.peek())
295         if ktap_match:
296                 version_num = int(ktap_match.group(1))
297                 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
298         elif tap_match:
299                 version_num = int(tap_match.group(1))
300                 check_version(version_num, TAP_VERSIONS, 'TAP', test)
301         else:
302                 return False
303         test.log.append(lines.pop())
304         return True
305
306 TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
307
308 def parse_test_header(lines: LineStream, test: Test) -> bool:
309         """
310         Parses test header and stores test name in test object.
311         Returns False if fails to parse test header line.
312
313         Accepted format:
314         - '# Subtest: [test name]'
315
316         Parameters:
317         lines - LineStream of KTAP output to parse
318         test - Test object for current test being parsed
319
320         Return:
321         True if successfully parsed test header line
322         """
323         match = TEST_HEADER.match(lines.peek())
324         if not match:
325                 return False
326         test.log.append(lines.pop())
327         test.name = match.group(1)
328         return True
329
330 TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
331
332 def parse_test_plan(lines: LineStream, test: Test) -> bool:
333         """
334         Parses test plan line and stores the expected number of subtests in
335         test object. Reports an error if expected count is 0.
336         Returns False and sets expected_count to None if there is no valid test
337         plan.
338
339         Accepted format:
340         - '1..[number of subtests]'
341
342         Parameters:
343         lines - LineStream of KTAP output to parse
344         test - Test object for current test being parsed
345
346         Return:
347         True if successfully parsed test plan line
348         """
349         match = TEST_PLAN.match(lines.peek())
350         if not match:
351                 test.expected_count = None
352                 return False
353         test.log.append(lines.pop())
354         expected_count = int(match.group(1))
355         test.expected_count = expected_count
356         return True
357
358 TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
359
360 TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
361
362 def peek_test_name_match(lines: LineStream, test: Test) -> bool:
363         """
364         Matches current line with the format of a test result line and checks
365         if the name matches the name of the current test.
366         Returns False if fails to match format or name.
367
368         Accepted format:
369         - '[ok|not ok] [test number] [-] [test name] [optional skip
370                 directive]'
371
372         Parameters:
373         lines - LineStream of KTAP output to parse
374         test - Test object for current test being parsed
375
376         Return:
377         True if matched a test result line and the name matching the
378                 expected test name
379         """
380         line = lines.peek()
381         match = TEST_RESULT.match(line)
382         if not match:
383                 return False
384         name = match.group(4)
385         return name == test.name
386
387 def parse_test_result(lines: LineStream, test: Test,
388                         expected_num: int) -> bool:
389         """
390         Parses test result line and stores the status and name in the test
391         object. Reports an error if the test number does not match expected
392         test number.
393         Returns False if fails to parse test result line.
394
395         Note that the SKIP directive is the only direction that causes a
396         change in status.
397
398         Accepted format:
399         - '[ok|not ok] [test number] [-] [test name] [optional skip
400                 directive]'
401
402         Parameters:
403         lines - LineStream of KTAP output to parse
404         test - Test object for current test being parsed
405         expected_num - expected test number for current test
406
407         Return:
408         True if successfully parsed a test result line.
409         """
410         line = lines.peek()
411         match = TEST_RESULT.match(line)
412         skip_match = TEST_RESULT_SKIP.match(line)
413
414         # Check if line matches test result line format
415         if not match:
416                 return False
417         test.log.append(lines.pop())
418
419         # Set name of test object
420         if skip_match:
421                 test.name = skip_match.group(4)
422         else:
423                 test.name = match.group(4)
424
425         # Check test num
426         num = int(match.group(2))
427         if num != expected_num:
428                 test.add_error(f'Expected test number {expected_num} but found {num}')
429
430         # Set status of test object
431         status = match.group(1)
432         if skip_match:
433                 test.status = TestStatus.SKIPPED
434         elif status == 'ok':
435                 test.status = TestStatus.SUCCESS
436         else:
437                 test.status = TestStatus.FAILURE
438         return True
439
440 def parse_diagnostic(lines: LineStream) -> List[str]:
441         """
442         Parse lines that do not match the format of a test result line or
443         test header line and returns them in list.
444
445         Line formats that are not parsed:
446         - '# Subtest: [test name]'
447         - '[ok|not ok] [test number] [-] [test name] [optional skip
448                 directive]'
449
450         Parameters:
451         lines - LineStream of KTAP output to parse
452
453         Return:
454         Log of diagnostic lines
455         """
456         log = []  # type: List[str]
457         while lines and not TEST_RESULT.match(lines.peek()) and not \
458                         TEST_HEADER.match(lines.peek()):
459                 log.append(lines.pop())
460         return log
461
462
463 # Printing helper methods:
464
465 DIVIDER = '=' * 60
466
467 def format_test_divider(message: str, len_message: int) -> str:
468         """
469         Returns string with message centered in fixed width divider.
470
471         Example:
472         '===================== message example ====================='
473
474         Parameters:
475         message - message to be centered in divider line
476         len_message - length of the message to be printed such that
477                 any characters of the color codes are not counted
478
479         Return:
480         String containing message centered in fixed width divider
481         """
482         default_count = 3  # default number of dashes
483         len_1 = default_count
484         len_2 = default_count
485         difference = len(DIVIDER) - len_message - 2  # 2 spaces added
486         if difference > 0:
487                 # calculate number of dashes for each side of the divider
488                 len_1 = int(difference / 2)
489                 len_2 = difference - len_1
490         return ('=' * len_1) + f' {message} ' + ('=' * len_2)
491
492 def print_test_header(test: Test) -> None:
493         """
494         Prints test header with test name and optionally the expected number
495         of subtests.
496
497         Example:
498         '=================== example (2 subtests) ==================='
499
500         Parameters:
501         test - Test object representing current test being printed
502         """
503         message = test.name
504         if test.expected_count:
505                 if test.expected_count == 1:
506                         message += ' (1 subtest)'
507                 else:
508                         message += f' ({test.expected_count} subtests)'
509         stdout.print_with_timestamp(format_test_divider(message, len(message)))
510
511 def print_log(log: Iterable[str]) -> None:
512         """Prints all strings in saved log for test in yellow."""
513         for m in log:
514                 stdout.print_with_timestamp(stdout.yellow(m))
515
516 def format_test_result(test: Test) -> str:
517         """
518         Returns string with formatted test result with colored status and test
519         name.
520
521         Example:
522         '[PASSED] example'
523
524         Parameters:
525         test - Test object representing current test being printed
526
527         Return:
528         String containing formatted test result
529         """
530         if test.status == TestStatus.SUCCESS:
531                 return stdout.green('[PASSED] ') + test.name
532         if test.status == TestStatus.SKIPPED:
533                 return stdout.yellow('[SKIPPED] ') + test.name
534         if test.status == TestStatus.NO_TESTS:
535                 return stdout.yellow('[NO TESTS RUN] ') + test.name
536         if test.status == TestStatus.TEST_CRASHED:
537                 print_log(test.log)
538                 return stdout.red('[CRASHED] ') + test.name
539         print_log(test.log)
540         return stdout.red('[FAILED] ') + test.name
541
542 def print_test_result(test: Test) -> None:
543         """
544         Prints result line with status of test.
545
546         Example:
547         '[PASSED] example'
548
549         Parameters:
550         test - Test object representing current test being printed
551         """
552         stdout.print_with_timestamp(format_test_result(test))
553
554 def print_test_footer(test: Test) -> None:
555         """
556         Prints test footer with status of test.
557
558         Example:
559         '===================== [PASSED] example ====================='
560
561         Parameters:
562         test - Test object representing current test being printed
563         """
564         message = format_test_result(test)
565         stdout.print_with_timestamp(format_test_divider(message,
566                 len(message) - stdout.color_len()))
567
568 def print_summary_line(test: Test) -> None:
569         """
570         Prints summary line of test object. Color of line is dependent on
571         status of test. Color is green if test passes, yellow if test is
572         skipped, and red if the test fails or crashes. Summary line contains
573         counts of the statuses of the tests subtests or the test itself if it
574         has no subtests.
575
576         Example:
577         "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
578         Errors: 0"
579
580         test - Test object representing current test being printed
581         """
582         if test.status == TestStatus.SUCCESS:
583                 color = stdout.green
584         elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
585                 color = stdout.yellow
586         else:
587                 color = stdout.red
588         stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
589
590 # Other methods:
591
592 def bubble_up_test_results(test: Test) -> None:
593         """
594         If the test has subtests, add the test counts of the subtests to the
595         test and check if any of the tests crashed and if so set the test
596         status to crashed. Otherwise if the test has no subtests add the
597         status of the test to the test counts.
598
599         Parameters:
600         test - Test object for current test being parsed
601         """
602         subtests = test.subtests
603         counts = test.counts
604         status = test.status
605         for t in subtests:
606                 counts.add_subtest_counts(t.counts)
607         if counts.total() == 0:
608                 counts.add_status(status)
609         elif test.counts.get_status() == TestStatus.TEST_CRASHED:
610                 test.status = TestStatus.TEST_CRASHED
611
612 def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
613         """
614         Finds next test to parse in LineStream, creates new Test object,
615         parses any subtests of the test, populates Test object with all
616         information (status, name) about the test and the Test objects for
617         any subtests, and then returns the Test object. The method accepts
618         three formats of tests:
619
620         Accepted test formats:
621
622         - Main KTAP/TAP header
623
624         Example:
625
626         KTAP version 1
627         1..4
628         [subtests]
629
630         - Subtest header line
631
632         Example:
633
634         # Subtest: name
635         1..3
636         [subtests]
637         ok 1 name
638
639         - Test result line
640
641         Example:
642
643         ok 1 - test
644
645         Parameters:
646         lines - LineStream of KTAP output to parse
647         expected_num - expected test number for test to be parsed
648         log - list of strings containing any preceding diagnostic lines
649                 corresponding to the current test
650
651         Return:
652         Test object populated with characteristics and any subtests
653         """
654         test = Test()
655         test.log.extend(log)
656         parent_test = False
657         main = parse_ktap_header(lines, test)
658         if main:
659                 # If KTAP/TAP header is found, attempt to parse
660                 # test plan
661                 test.name = "main"
662                 parse_test_plan(lines, test)
663                 parent_test = True
664         else:
665                 # If KTAP/TAP header is not found, test must be subtest
666                 # header or test result line so parse attempt to parser
667                 # subtest header
668                 parent_test = parse_test_header(lines, test)
669                 if parent_test:
670                         # If subtest header is found, attempt to parse
671                         # test plan and print header
672                         parse_test_plan(lines, test)
673                         print_test_header(test)
674         expected_count = test.expected_count
675         subtests = []
676         test_num = 1
677         while parent_test and (expected_count is None or test_num <= expected_count):
678                 # Loop to parse any subtests.
679                 # Break after parsing expected number of tests or
680                 # if expected number of tests is unknown break when test
681                 # result line with matching name to subtest header is found
682                 # or no more lines in stream.
683                 sub_log = parse_diagnostic(lines)
684                 sub_test = Test()
685                 if not lines or (peek_test_name_match(lines, test) and
686                                 not main):
687                         if expected_count and test_num <= expected_count:
688                                 # If parser reaches end of test before
689                                 # parsing expected number of subtests, print
690                                 # crashed subtest and record error
691                                 test.add_error('missing expected subtest!')
692                                 sub_test.log.extend(sub_log)
693                                 test.counts.add_status(
694                                         TestStatus.TEST_CRASHED)
695                                 print_test_result(sub_test)
696                         else:
697                                 test.log.extend(sub_log)
698                                 break
699                 else:
700                         sub_test = parse_test(lines, test_num, sub_log)
701                 subtests.append(sub_test)
702                 test_num += 1
703         test.subtests = subtests
704         if not main:
705                 # If not main test, look for test result line
706                 test.log.extend(parse_diagnostic(lines))
707                 if (parent_test and peek_test_name_match(lines, test)) or \
708                                 not parent_test:
709                         parse_test_result(lines, test, expected_num)
710                 else:
711                         test.add_error('missing subtest result line!')
712
713         # Check for there being no tests
714         if parent_test and len(subtests) == 0:
715                 # Don't override a bad status if this test had one reported.
716                 # Assumption: no subtests means CRASHED is from Test.__init__()
717                 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
718                         test.status = TestStatus.NO_TESTS
719                         test.add_error('0 tests run!')
720
721         # Add statuses to TestCounts attribute in Test object
722         bubble_up_test_results(test)
723         if parent_test and not main:
724                 # If test has subtests and is not the main test object, print
725                 # footer.
726                 print_test_footer(test)
727         elif not main:
728                 print_test_result(test)
729         return test
730
731 def parse_run_tests(kernel_output: Iterable[str]) -> Test:
732         """
733         Using kernel output, extract KTAP lines, parse the lines for test
734         results and print condensed test results and summary line.
735
736         Parameters:
737         kernel_output - Iterable object contains lines of kernel output
738
739         Return:
740         Test - the main test object with all subtests.
741         """
742         stdout.print_with_timestamp(DIVIDER)
743         lines = extract_tap_lines(kernel_output)
744         test = Test()
745         if not lines:
746                 test.name = '<missing>'
747                 test.add_error('could not find any KTAP output!')
748                 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
749         else:
750                 test = parse_test(lines, 0, [])
751                 if test.status != TestStatus.NO_TESTS:
752                         test.status = test.counts.get_status()
753         stdout.print_with_timestamp(DIVIDER)
754         print_summary_line(test)
755         return test