Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc / py / decode_test.py
1 #!/usr/bin/env python
2 # Copyright 2020 The Pigweed Authors
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 # use this file except in compliance with the License. You may obtain a copy of
6 # the License at
7 #
8 #     https://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations under
14 # the License.
15 """Contains the Python decoder tests and generates C++ decoder tests."""
16
17 from typing import Iterator, List, NamedTuple, Tuple, Union
18 import unittest
19
20 from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
21 from pw_build.generated_tests import parse_test_generation_args
22 from pw_hdlc.decode import Frame, FrameDecoder, FrameStatus, NO_ADDRESS
23 from pw_hdlc.protocol import frame_check_sequence as fcs
24
25
26 def _encode(address: int, control: int, data: bytes) -> bytes:
27     frame = bytearray([address, control]) + data
28     frame += fcs(frame)
29     frame = frame.replace(b'\x7d', b'\x7d\x5d')
30     frame = frame.replace(b'\x7e', b'\x7d\x5e')
31     return b''.join([b'\x7e', frame, b'\x7e'])
32
33
34 class Expected(NamedTuple):
35     address: int
36     control: bytes
37     data: bytes
38     status: FrameStatus = FrameStatus.OK
39
40     @classmethod
41     def error(cls, status: FrameStatus):
42         assert status is not FrameStatus.OK
43         return cls(NO_ADDRESS, b'', b'', status)
44
45     def __eq__(self, other) -> bool:
46         """Define == so an Expected and a Frame can be compared."""
47         return (self.address == other.address and self.control == other.control
48                 and self.data == other.data and self.status is other.status)
49
50
51 class ExpectedRaw(NamedTuple):
52     raw_encoded: bytes
53     status: FrameStatus
54
55     def __eq__(self, other) -> bool:
56         """Define == so an ExpectedRaw and a Frame can be compared."""
57         return (self.raw_encoded == other.raw_encoded
58                 and self.status is other.status)
59
60
61 Expectation = Union[Expected, ExpectedRaw]
62
63 _PARTIAL = fcs(b'\x0ACmsg\x5e')
64 _ESCAPED_FLAG_TEST_CASE = (
65     b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
66     [
67         Expected.error(FrameStatus.FRAMING_ERROR),
68         Expected.error(FrameStatus.FRAMING_ERROR),
69     ],
70 )
71
72 TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expectation]]], ...] = (
73     'Empty payload',
74     (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
75     (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
76     (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
77     'Simple one-byte payload',
78     (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
79     (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
80     'Simple multi-byte payload',
81     (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
82     (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
83     'Escaped one-byte payload',
84     (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
85     (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
86     (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
87      [Expected(1, b'\2', b'\x7e'),
88       Expected(1, b'\2', b'\x7d')]),
89     'Escaped address',
90     (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
91     (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
92     'Escaped control',
93     (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
94     (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
95     'Escaped address and control',
96     (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
97     (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
98     (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
99     'Multiple frames separated by single flag',
100     (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
101      [Expected(0, b'\0', b'A'),
102       Expected(1, b'\2', b'123')]),
103     (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
104      [Expected(0xff, b'\0', b'Yo')] * 3),
105     'Ignore empty frames',
106     (b'\x7e\x7e', []),
107     (b'\x7e' * 10, []),
108     (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
109      [Expected(1, b'\2', b'3')]),
110     (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
111      [Expected(1, b'\2', b':O'),
112       Expected(3, b'\4', b':P')]),
113     'Cannot escape flag',
114     (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
115         Expected.error(FrameStatus.FRAMING_ERROR),
116         Expected(0xab, b'\0', b'Hello'),
117     ]),
118     _ESCAPED_FLAG_TEST_CASE,
119     'Frame too short',
120     (b'\x7e1\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
121     (b'\x7e12\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
122     (b'\x7e12345\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
123     'Incorrect frame check sequence',
124     (b'\x7e123456\x7e', [Expected.error(FrameStatus.FCS_MISMATCH)]),
125     (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
126      [Expected.error(FrameStatus.FCS_MISMATCH)]),
127     (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
128         Expected.error(FrameStatus.FCS_MISMATCH),
129         Expected(1, b'\2', b'def'),
130     ]),
131     'Invalid escape in address',
132     (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
133      [Expected.error(FrameStatus.FRAMING_ERROR)]),
134     'Invalid escape in control',
135     (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
136      [Expected.error(FrameStatus.FRAMING_ERROR)]),
137     'Invalid escape in data',
138     (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
139      [Expected.error(FrameStatus.FRAMING_ERROR)]),
140     'Frame ends with escape',
141     (b'\x7e\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
142     (b'\x7e\1\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
143     (b'\x7e\1\2abc\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
144     (b'\x7e\1\2abcd\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
145     (b'\x7e\1\2abcd1234\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
146     'Inter-frame data is only escapes',
147     (b'\x7e\x7d\x7e\x7d\x7e', [
148         Expected.error(FrameStatus.FRAMING_ERROR),
149         Expected.error(FrameStatus.FRAMING_ERROR),
150     ]),
151     (b'\x7e\x7d\x7d\x7e\x7d\x7d\x7e', [
152         Expected.error(FrameStatus.FRAMING_ERROR),
153         Expected.error(FrameStatus.FRAMING_ERROR),
154     ]),
155     'Data before first flag',
156     (b'\0\1' + fcs(b'\0\1'), []),
157     (b'\0\1' + fcs(b'\0\1') + b'\x7e',
158      [Expected.error(FrameStatus.FRAMING_ERROR)]),
159     'No frames emitted until flag',
160     (_encode(1, 2, b'3')[:-1], []),
161     (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
162     'Only flag and escape characters can be escaped',
163     (b'\x7e\x7d\0' + _encode(1, 2, b'3'),
164      [Expected.error(FrameStatus.FRAMING_ERROR),
165       Expected(1, b'\2', b'3')]),
166     (b'\x7e1234\x7da' + _encode(1, 2, b'3'),
167      [Expected.error(FrameStatus.FRAMING_ERROR),
168       Expected(1, b'\2', b'3')]),
169     'Invalid frame records raw data',
170     (b'Hello?~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
171     (b'~~Hel\x7d\x7dlo~',
172      [ExpectedRaw(b'Hel\x7d\x7dlo', FrameStatus.FRAMING_ERROR)]),
173     (b'Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
174     (b'~~~~Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FCS_MISMATCH)]),
175     (b'Hello?~~Goodbye~', [
176         ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR),
177         ExpectedRaw(b'Goodbye', FrameStatus.FCS_MISMATCH),
178     ]),
179 )  # yapf: disable
180 # Formatting for the above tuple is very slow, so disable yapf.
181
182 _TESTS = TestGenerator(TEST_CASES)
183
184
185 def _expected(frames: List[Frame]) -> Iterator[str]:
186     for i, frame in enumerate(frames, 1):
187         if frame.ok():
188             yield f'      Frame(kDecodedFrame{i:02}),'
189         else:
190             yield f'      Status::DataLoss(),  // Frame {i}'
191
192
193 _CPP_HEADER = """\
194 #include "pw_hdlc/decoder.h"
195
196 #include <array>
197 #include <cstddef>
198 #include <variant>
199
200 #include "gtest/gtest.h"
201 #include "pw_bytes/array.h"
202
203 namespace pw::hdlc {
204 namespace {
205 """
206
207 _CPP_FOOTER = """\
208 }  // namespace
209 }  // namespace pw::hdlc"""
210
211
212 def _cpp_test(ctx: Context) -> Iterator[str]:
213     """Generates a C++ test for the provided test data."""
214     data, _ = ctx.test_case
215     frames = list(FrameDecoder().process(data))
216     data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)
217
218     yield f'TEST(Decoder, {ctx.cc_name()}) {{'
219     yield f'  static constexpr auto kData = bytes::String("{data_bytes}");\n'
220
221     for i, frame in enumerate(frames, 1):
222         if frame.status is FrameStatus.OK:
223             frame_bytes = ''.join(rf'\x{byte:02x}'
224                                   for byte in frame.raw_decoded)
225             yield (f'  static constexpr auto kDecodedFrame{i:02} = '
226                    f'bytes::String("{frame_bytes}");')
227         else:
228             yield f'  // Frame {i}: {frame.status.value}'
229
230     yield ''
231
232     expected = '\n'.join(_expected(frames)) or '      // No frames'
233     decoder_size = max(len(data), 8)  # Make sure large enough for a frame
234
235     yield f"""\
236   DecoderBuffer<{decoder_size}> decoder;
237
238   static constexpr std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
239 {expected}
240   }};
241
242   size_t decoded_frames = 0;
243
244   decoder.Process(kData, [&](const Result<Frame>& result) {{
245     ASSERT_LT(decoded_frames++, kExpected.size());
246     auto& expected = kExpected[decoded_frames - 1];
247
248     if (std::holds_alternative<Status>(expected)) {{
249       EXPECT_EQ(Status::DataLoss(), result.status());
250     }} else {{
251       ASSERT_EQ(OkStatus(), result.status());
252
253       const Frame& decoded_frame = result.value();
254       const Frame& expected_frame = std::get<Frame>(expected);
255       EXPECT_EQ(expected_frame.address(), decoded_frame.address());
256       EXPECT_EQ(expected_frame.control(), decoded_frame.control());
257       ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
258       EXPECT_EQ(std::memcmp(expected_frame.data().data(),
259                             decoded_frame.data().data(),
260                             expected_frame.data().size()),
261                 0);
262     }}
263   }});
264
265   EXPECT_EQ(decoded_frames, kExpected.size());
266 }}"""
267
268
269 def _define_py_test(ctx: Context) -> PyTest:
270     data, expected_frames = ctx.test_case
271
272     def test(self) -> None:
273         # Decode in one call
274         self.assertEqual(expected_frames,
275                          list(FrameDecoder().process(data)),
276                          msg=f'{ctx.group}: {data!r}')
277
278         # Decode byte-by-byte
279         decoder = FrameDecoder()
280         decoded_frames: List[Frame] = []
281         for i in range(len(data)):
282             decoded_frames += decoder.process(data[i:i + 1])
283
284         self.assertEqual(expected_frames,
285                          decoded_frames,
286                          msg=f'{ctx.group} (byte-by-byte): {data!r}')
287
288     return test
289
290
291 # Class that tests all cases in TEST_CASES.
292 DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_test)
293
294 if __name__ == '__main__':
295     args = parse_test_generation_args()
296     if args.generate_cc_test:
297         _TESTS.cc_tests(args.generate_cc_test, _cpp_test, _CPP_HEADER,
298                         _CPP_FOOTER)
299     else:
300         unittest.main()