2 # Copyright 2020 The Pigweed Authors
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 # use this file except in compliance with the License. You may obtain a copy of
8 # https://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations under
15 """Contains the Python decoder tests and generates C++ decoder tests."""
17 from typing import Iterator, List, NamedTuple, Tuple
20 from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
21 from pw_build.generated_tests import parse_test_generation_args
22 from pw_hdlc_lite.decode import Frame, FrameDecoder, FrameStatus, NO_ADDRESS
23 from pw_hdlc_lite.protocol import frame_check_sequence as fcs
def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds a complete, flag-delimited frame to feed to the decoder.

    Args:
      address: frame address; written as a single byte.
      control: frame control value; written as a single byte.
      data: unescaped payload bytes.

    Returns:
      0x7e, then the escaped address, control, payload and frame check
      sequence, then a closing 0x7e.
    """
    frame = bytearray([address, control]) + data

    # Append the frame check sequence before escaping. The hand-written cases
    # in this file compute fcs() over the raw address/control/data bytes, and
    # the cases that expect FrameStatus.OK from _encode() output rely on the
    # check sequence being present.
    frame += fcs(bytes(frame))

    # Escape 0x7d first; otherwise the 0x7d bytes introduced while escaping
    # 0x7e would themselves be escaped a second time.
    frame = frame.replace(b'\x7d', b'\x7d\x5d')
    frame = frame.replace(b'\x7e', b'\x7d\x5e')
    return b''.join([b'\x7e', frame, b'\x7e'])
class Expected(NamedTuple):
    """Describes the frame a test case expects the decoder to produce.

    Instances are compared with decoded frames through ``==``; only the
    address, control, data and status attributes take part in the comparison.
    """
    # First byte of the frame (call sites pass ints such as 0xA or NO_ADDRESS).
    address: int
    # Control field as bytes; empty when the frame is too short to have one.
    control: bytes
    # Payload bytes; empty for frames that did not decode successfully.
    data: bytes
    # Decoding result; defaults to OK so valid frames need only three values.
    status: FrameStatus = FrameStatus.OK

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared."""
        return (self.address == other.address and self.control == other.control
                and self.data == other.data and self.status is other.status)
# Check sequence bytes that trail the escaped flag in the input below. They
# are named because the second expected frame is built from their leading
# bytes.
_PARTIAL = fcs(b'\x0ACmsg\x5e')

# Input in which an escape byte (0x7d) is directly followed by a flag (0x7e).
# Two INCOMPLETE frames are expected: one for the bytes before the flag, and
# one whose address and control are the first two bytes of _PARTIAL.
# The entry has the same (input bytes, [expected frames]) shape as every other
# test in TEST_CASES.
_ESCAPED_FLAG_TEST_CASE = (
    b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
    [
        Expected(0xA, b'C', b'', FrameStatus.INCOMPLETE),
        Expected(_PARTIAL[0], _PARTIAL[1:2], b'', FrameStatus.INCOMPLETE),
    ],
)
# Table of decoder inputs and the frames expected from them. Per the type
# annotation, each test is a (raw input bytes, list of Expected) pair; the
# bare strings between tests appear to name the group that the following
# tests belong to (see ctx.group in the test messages).
# NOTE(review): several entries below look like they are missing lines (a
# group name, an input literal, an opening "Expected(" or a closing bracket).
# Confirm against the upstream file before relying on this table.
55 TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expected]]], ...] = (
57 (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
58 (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
# Repeating the encoded frame three times should yield three equal frames.
59 (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
60 'Simple one-byte payload',
61 (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
62 (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
63 'Simple multi-byte payload',
64 (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
65 (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
66 'Escaped one-byte payload',
# 0x7e and 0x7d are the two byte values that _encode() escapes.
67 (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
68 (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
69 (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
70 [Expected(1, b'\2', b'\x7e'),
71 Expected(1, b'\2', b'\x7d')]),
# The next two tests put the escaped byte values in the address position.
73 (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
74 (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
# The next two tests put the escaped byte values in the control position.
76 (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
77 (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
78 'Escaped address and control',
79 (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
80 (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
81 (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
82 'Multiple frames separated by single flag',
# [:-1] drops the closing flag, so the next frame's opening flag is the only
# delimiter between the two frames.
83 (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
84 [Expected(0, b'\0', b'A'),
85 Expected(1, b'\2', b'123')]),
# Three frames that each lack a closing flag, followed by one final flag.
86 (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
87 [Expected(0xff, b'\0', b'Yo')] * 3),
88 'Ignore empty frames',
# Runs of consecutive flags around the frames produce no extra results.
91 (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
92 [Expected(1, b'\2', b'3')]),
93 (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
94 [Expected(1, b'\2', b':O'),
95 Expected(3, b'\4', b':P')]),
# An escape byte directly followed by a flag: the bytes before the flag are
# reported as INCOMPLETE and the bytes after it decode as a normal frame.
97 (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
98 Expected(0xAA, b'', b'', FrameStatus.INCOMPLETE),
99 Expected(0xab, b'\0', b'Hello'),
101 _ESCAPED_FLAG_TEST_CASE,
# Frames with one, two and five bytes between the flags are all expected to
# be reported as INCOMPLETE.
103 (b'\x7e1\x7e', [Expected(ord('1'), b'', b'', FrameStatus.INCOMPLETE)]),
104 (b'\x7e12\x7e', [Expected(ord('1'), b'2', b'', FrameStatus.INCOMPLETE)]),
105 (b'\x7e12345\x7e', [Expected(ord('1'), b'2', b'',
106 FrameStatus.INCOMPLETE)]),
107 'Incorrect frame check sequence',
# NOTE(review): the input bytes that pair with this expectation are not
# present on the preceding line. Confirm.
109 [Expected(ord('1'), b'2', b'', FrameStatus.FCS_MISMATCH)]),
110 (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
111 [Expected(0x1, b'\2', b'msg', FrameStatus.FCS_MISMATCH)]),
# [:-2] drops the closing flag and the byte before it, so the first frame is
# one byte short when the second frame's opening flag terminates it.
112 (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
113 Expected(0xA, b'\x0B', b'??', FrameStatus.FCS_MISMATCH),
114 Expected(1, b'\2', b'def'),
116 'Invalid escape in address',
# 0x7d followed by another 0x7d is the invalid escape sequence under test.
117 (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
# NOTE(review): the start of this Expected(...) call, including its address
# argument, is not present here. Confirm.
119 fcs(b'\x5d\0')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
120 'Invalid escape in control',
121 (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
# NOTE(review): the start of this Expected(...) call, including its address
# argument, is not present here. Confirm.
123 fcs(b'\0\x5d')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
124 'Invalid escape in data',
125 (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
126 [Expected(0, b'\1', b'', FrameStatus.INVALID_ESCAPE)]),
127 'Frame ends with escape',
# Each input ends with an escape byte immediately before the closing flag.
128 (b'\x7e\x7d\x7e', [Expected(NO_ADDRESS, b'', b'',
129 FrameStatus.INCOMPLETE)]),
130 (b'\x7e\1\x7d\x7e', [Expected(1, b'', b'', FrameStatus.INCOMPLETE)]),
131 (b'\x7e\1\2abc\x7d\x7e', [Expected(1, b'\2', b'',
132 FrameStatus.INCOMPLETE)]),
133 (b'\x7e\1\2abcd\x7d\x7e',
134 [Expected(1, b'\2', b'', FrameStatus.INCOMPLETE)]),
135 (b'\x7e\1\2abcd1234\x7d\x7e',
136 [Expected(1, b'\2', b'abcd', FrameStatus.INCOMPLETE)]),
137 'Inter-frame data is only escapes',
138 (b'\x7e\x7d\x7e\x7d\x7e', [
139 Expected(NO_ADDRESS, b'', b'', FrameStatus.INCOMPLETE),
140 Expected(NO_ADDRESS, b'', b'', FrameStatus.INCOMPLETE),
142 (b'\x7e\x7d\x7d\x7e\x7d\x7d\x7e', [
143 Expected(NO_ADDRESS, b'', b'', FrameStatus.INVALID_ESCAPE),
144 Expected(NO_ADDRESS, b'', b'', FrameStatus.INVALID_ESCAPE),
146 'Data before first flag',
# With no flag at all, nothing is expected to be emitted.
147 (b'\0\1' + fcs(b'\0\1'), []),
148 (b'\0\1' + fcs(b'\0\1') + b'\x7e',
149 [Expected(0, b'\1', b'', FrameStatus.INCOMPLETE)]),
150 'No frames emitted until flag',
# [:-1] drops the closing flag; [1:-1] strips both flags.
151 (_encode(1, 2, b'3')[:-1], []),
152 (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
154 # Formatting for the above tuple is very slow, so disable yapf.
# Generator object shared by the Python tests and the C++ test generation
# further down in this file.
156 _TESTS = TestGenerator(TEST_CASES)
def _expected(frames: List[Frame]) -> Iterator[str]:
    """Yields one C++ initializer line per decoded frame.

    The lines populate the kExpected array in the generated C++ test, which
    is declared with exactly len(frames) elements, so each frame must
    contribute a single entry.

    Args:
      frames: frames produced by the Python decoder for one test input.

    Yields:
      A Frame(...) initializer for a frame whose status is OK, otherwise a
      Status::DATA_LOSS entry annotated with the 1-based frame number.
    """
    for i, frame in enumerate(frames, 1):
        # kDecodedFrameNN constants are only generated for frames with status
        # OK, so only those frames may refer to one.
        if frame.status is FrameStatus.OK:
            yield f' Frame(kDecodedFrame{i:02}),'
        else:
            yield f' Status::DATA_LOSS, // Frame {i}'
168 #include "pw_hdlc_lite/decoder.h"
174 #include "gtest/gtest.h"
175 #include "pw_bytes/array.h"
177 namespace pw::hdlc_lite {
183 } // namespace pw::hdlc_lite"""
186 def _cpp_test(ctx: Context) -> Iterator[str]:
187 """Generates a C++ test for the provided test data."""
# Only the raw input of the test case is used. The frames the C++ decoder
# must produce are obtained by running the Python decoder on that input.
188 data, _ = ctx.test_case
189 frames = list(FrameDecoder().process(data))
# Render each input byte as a \xNN escape so the input can be embedded in a
# C++ string literal.
190 data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)
192 yield f'TEST(Decoder, {ctx.cc_name()}) {{'
193 yield f' static constexpr auto kData = bytes::String("{data_bytes}");\n'
# For each frame with status OK, emit a constant holding its raw bytes. The
# constants are numbered from 1 with two digits, matching the names that
# _expected() refers to.
195 for i, frame in enumerate(frames, 1):
196 if frame.status is FrameStatus.OK:
197 frame_bytes = ''.join(rf'\x{byte:02x}' for byte in frame.raw)
198 yield (f' static constexpr auto kDecodedFrame{i:02} = '
199 f'bytes::String("{frame_bytes}");')
201 yield f' // Frame {i}: {frame.status.value}'
# The join is an empty string when no frames were decoded, so fall back to a
# placeholder comment. The two values computed here are substituted into the
# C++ test body that follows (see the {decoder_size} placeholder below).
# NOTE(review): the statement that opens the C++ text below is not present
# here. Confirm how it is emitted.
205 expected = '\n'.join(_expected(frames)) or ' // No frames'
206 decoder_size = max(len(data), 8) # Make sure large enough for a frame
209 DecoderBuffer<{decoder_size}> decoder;
211 static constexpr std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
215 size_t decoded_frames = 0;
217 decoder.Process(kData, [&](const Result<Frame>& result) {{
218 ASSERT_LT(decoded_frames++, kExpected.size());
219 auto& expected = kExpected[decoded_frames - 1];
221 if (std::holds_alternative<Status>(expected)) {{
222 EXPECT_EQ(Status::DATA_LOSS, result.status());
224 ASSERT_EQ(Status::OK, result.status());
226 const Frame& decoded_frame = result.value();
227 const Frame& expected_frame = std::get<Frame>(expected);
228 EXPECT_EQ(expected_frame.address(), decoded_frame.address());
229 EXPECT_EQ(expected_frame.control(), decoded_frame.control());
230 ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
231 EXPECT_EQ(std::memcmp(expected_frame.data().data(),
232 decoded_frame.data().data(),
233 expected_frame.data().size()),
238 EXPECT_EQ(decoded_frames, kExpected.size());
def _define_py_test(ctx: Context) -> PyTest:
    """Creates the Python test method for the test case held by ``ctx``.

    Args:
      ctx: test context; ``ctx.test_case`` supplies the input bytes and the
        expected frames, and ``ctx.group`` is used in failure messages.

    Returns:
      A test method that decodes the input twice, once in a single call and
      once one byte at a time, and checks both results against the expected
      frames.
    """
    data, expected_frames = ctx.test_case

    def test(self) -> None:
        # Decode in one call
        self.assertEqual(expected_frames,
                         list(FrameDecoder().process(data)),
                         msg=f'{ctx.group}: {data!r}')

        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: List[Frame] = []
        # Slicing (rather than indexing) keeps each chunk a one-byte bytes
        # object instead of an int.
        for i in range(len(data)):
            decoded_frames += decoder.process(data[i:i + 1])

        self.assertEqual(expected_frames,
                         decoded_frames,
                         msg=f'{ctx.group} (byte-by-byte): {data!r}')

    return test
# Built by the shared test generator, which is handed _define_py_test to
# create the test for each case (presumably one test method per entry in
# TEST_CASES; verify against pw_build.generated_tests).
264 # Class that tests all cases in TEST_CASES.
265 DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_test)
267 if __name__ == '__main__':
268 args = parse_test_generation_args()
269 if args.generate_cc_test:
270 _TESTS.cc_tests(args.generate_cc_test, _cpp_test, _CPP_HEADER,