Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc / py / pw_hdlc / decode.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Decoder class for decoding bytes using HDLC protocol"""
15
16 from dataclasses import dataclass
17 import enum
18 import logging
19 from typing import Iterator, Optional
20 import zlib
21
22 from pw_hdlc import protocol
23
24 _LOG = logging.getLogger('pw_hdlc')
25
26 NO_ADDRESS = -1
27 _MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
28
29
30 class FrameStatus(enum.Enum):
31     """Indicates that an error occurred."""
32     OK = 'OK'
33     FCS_MISMATCH = 'frame check sequence failure'
34     FRAMING_ERROR = 'invalid flag or escape characters'
35
36
37 @dataclass(frozen=True)
38 class Frame:
39     """Represents an HDLC frame."""
40
41     # The complete HDLC-encoded frame, excluding HDLC flag characters.
42     raw_encoded: bytes
43
44     # The complete decoded frame (address, control, information, FCS).
45     raw_decoded: bytes
46
47     # Whether parsing the frame succeeded.
48     status: FrameStatus = FrameStatus.OK
49
50     @property
51     def address(self) -> int:
52         """The frame's address field (assumes only one byte for now)."""
53         return self.raw_decoded[0] if self.ok() else NO_ADDRESS
54
55     @property
56     def control(self) -> bytes:
57         """The control byte (assumes only one byte for now)."""
58         return self.raw_decoded[1:2] if self.ok() else b''
59
60     @property
61     def data(self) -> bytes:
62         """The information field in the frame."""
63         return self.raw_decoded[2:-4] if self.ok() else b''
64
65     def ok(self) -> bool:
66         """True if this represents a valid frame.
67
68         If false, then parsing failed. The status is set to indicate what type
69         of error occurred, and the data field contains all bytes parsed from the
70         frame (including bytes parsed as address or control bytes).
71         """
72         return self.status is FrameStatus.OK
73
74     def __repr__(self) -> str:
75         if self.ok():
76             body = (f'address={self.address}, control={self.control!r}, '
77                     f'data={self.data!r}')
78         else:
79             body = str(self.status)
80
81         return f'{type(self).__name__}({body})'
82
83
84 class _State(enum.Enum):
85     INTERFRAME = 0
86     FRAME = 1
87     FRAME_ESCAPE = 2
88
89
90 def _check_frame(frame_data: bytes) -> FrameStatus:
91     if len(frame_data) < _MIN_FRAME_SIZE:
92         return FrameStatus.FRAMING_ERROR
93
94     frame_crc = int.from_bytes(frame_data[-4:], 'little')
95     if zlib.crc32(frame_data[:-4]) != frame_crc:
96         return FrameStatus.FCS_MISMATCH
97
98     return FrameStatus.OK
99
100
101 class FrameDecoder:
102     """Decodes one or more HDLC frames from a stream of data."""
103     def __init__(self):
104         self._decoded_data = bytearray()
105         self._raw_data = bytearray()
106         self._state = _State.INTERFRAME
107
108     def process(self, data: bytes) -> Iterator[Frame]:
109         """Decodes and yields HDLC frames, including corrupt frames.
110
111         The ok() method on Frame indicates whether it is valid or represents a
112         frame parsing error.
113
114         Yields:
115           Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
116         """
117         for byte in data:
118             frame = self._process_byte(byte)
119             if frame:
120                 yield frame
121
122     def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
123         """Decodes and yields valid HDLC frames, logging any errors."""
124         for frame in self.process(data):
125             if frame.ok():
126                 yield frame
127             else:
128                 _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
129                              frame.status.value, len(frame.raw_encoded))
130                 _LOG.debug('Discarded data: %s', frame.raw_encoded)
131
132     def _finish_frame(self, status: FrameStatus) -> Frame:
133         frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
134         self._raw_data.clear()
135         self._decoded_data.clear()
136         return frame
137
138     def _process_byte(self, byte: int) -> Optional[Frame]:
139         frame: Optional[Frame] = None
140
141         # Record every byte except the flag character.
142         if byte != protocol.FLAG:
143             self._raw_data.append(byte)
144
145         if self._state is _State.INTERFRAME:
146             if byte == protocol.FLAG:
147                 if self._raw_data:
148                     frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
149
150                 self._state = _State.FRAME
151         elif self._state is _State.FRAME:
152             if byte == protocol.FLAG:
153                 if self._raw_data:
154                     frame = self._finish_frame(_check_frame(
155                         self._decoded_data))
156
157                 self._state = _State.FRAME
158             elif byte == protocol.ESCAPE:
159                 self._state = _State.FRAME_ESCAPE
160             else:
161                 self._decoded_data.append(byte)
162         elif self._state is _State.FRAME_ESCAPE:
163             if byte == protocol.FLAG:
164                 frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
165                 self._state = _State.FRAME
166             elif byte in protocol.VALID_ESCAPED_BYTES:
167                 self._state = _State.FRAME
168                 self._decoded_data.append(protocol.escape(byte))
169             else:
170                 self._state = _State.INTERFRAME
171         else:
172             raise AssertionError(f'Invalid decoder state: {self._state}')
173
174         return frame