Change script for apply upstream code
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc_lite / py / pw_hdlc_lite / decode.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Decoder class for decoding bytes using HDLC-Lite protocol"""
15
16 import enum
17 import logging
18 from typing import Iterator, NamedTuple, Optional, Tuple
19 import zlib
20
21 from pw_hdlc_lite import protocol
22
23 _LOG = logging.getLogger('pw_hdlc_lite')
24
25
26 class FrameStatus(enum.Enum):
27     """Indicates that an error occurred."""
28     OK = 'OK'
29     FCS_MISMATCH = 'frame check sequence failure'
30     INCOMPLETE = 'incomplete frame'
31     INVALID_ESCAPE = 'invalid escape character'
32
33
34 _MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
35
36 NO_ADDRESS = -1
37
38
39 class Frame(NamedTuple):
40     """Represents an HDLC frame."""
41
42     # All bytes in the frame (address, control, information, FCS)
43     raw: bytes
44
45     # Whether parsing the frame succeeded.
46     status: FrameStatus = FrameStatus.OK
47
48     @property
49     def address(self) -> int:
50         """The frame's address field (assumes only one byte for now)."""
51         return self.raw[0] if self.raw else NO_ADDRESS
52
53     @property
54     def control(self) -> bytes:
55         """The control byte (assumes only one byte for now)."""
56         return self.raw[1:2] if len(self.raw) >= 2 else b''
57
58     @property
59     def data(self) -> bytes:
60         """The information field in the frame."""
61         return self.raw[2:-4] if len(self.raw) >= _MIN_FRAME_SIZE else b''
62
63     def ok(self) -> bool:
64         """True if this represents a valid frame.
65
66         If false, then parsing failed. The status is set to indicate what type
67         of error occurred, and the data field contains all bytes parsed from the
68         frame (including bytes parsed as address or control bytes).
69         """
70         return self.status is FrameStatus.OK
71
72
73 class _BaseFrameState:
74     """Base class for all frame parsing states."""
75     def __init__(self, data: bytearray):
76         self._data = data  # All data seen in the current frame
77         self._escape_next = False
78
79     def handle_flag(self) -> Tuple['_BaseFrameState', Optional[Frame]]:
80         """Handles an HDLC flag character (0x7e).
81
82         The HDLC flag is always interpreted as the start of a new frame.
83
84         Returns:
85             (next state, optional frame or error)
86         """
87         # If there is data or an escape character, the frame is incomplete.
88         if self._escape_next or self._data:
89             return _AddressState(), Frame(bytes(self._data),
90                                           FrameStatus.INCOMPLETE)
91
92         return _AddressState(), None
93
94     def handle_escape(self) -> '_BaseFrameState':
95         """Handles an HDLC escape character (0x7d); returns the next state."""
96         if self._escape_next:
97             # If two escapes occur in a row, the frame is invalid.
98             return _InterframeState(self._data, FrameStatus.INVALID_ESCAPE)
99
100         self._escape_next = True
101         return self
102
103     def handle_byte(self, byte: int) -> '_BaseFrameState':
104         """Handles a byte, which may have been escaped; returns next state."""
105         self._data.append(protocol.escape(byte) if self._escape_next else byte)
106         self._escape_next = False
107         return self
108
109
110 class _InterframeState(_BaseFrameState):
111     """Not currently in a frame; any data is discarded."""
112     def __init__(self, data: bytearray, error: FrameStatus):
113         super().__init__(data)
114         self._error = error
115
116     def handle_flag(self) -> Tuple[_BaseFrameState, Optional[Frame]]:
117         # If this state was entered due to an error, report that error before
118         # starting a new frame.
119         if self._error is not FrameStatus.OK:
120             return _AddressState(), Frame(bytes(self._data), self._error)
121
122         return super().handle_flag()
123
124
125 class _AddressState(_BaseFrameState):
126     """First field in a frame: the address."""
127     def __init__(self):
128         super().__init__(bytearray())
129
130     def handle_byte(self, byte: int) -> _BaseFrameState:
131         super().handle_byte(byte)
132         # Only handle single-byte addresses for now.
133         return _ControlState(self._data)
134
135
136 class _ControlState(_BaseFrameState):
137     """Second field in a frame: control."""
138     def handle_byte(self, byte: int) -> _BaseFrameState:
139         super().handle_byte(byte)
140         # Only handle a single control byte for now.
141         return _DataState(self._data)
142
143
144 class _DataState(_BaseFrameState):
145     """The information field in a frame."""
146     def handle_flag(self) -> Tuple[_BaseFrameState, Frame]:
147         return _AddressState(), Frame(bytes(self._data), self._check_frame())
148
149     def _check_frame(self) -> FrameStatus:
150         # If the last character was an escape, assume bytes are missing.
151         if self._escape_next or len(self._data) < _MIN_FRAME_SIZE:
152             return FrameStatus.INCOMPLETE
153
154         frame_crc = int.from_bytes(self._data[-4:], 'little')
155         if zlib.crc32(self._data[:-4]) != frame_crc:
156             return FrameStatus.FCS_MISMATCH
157
158         return FrameStatus.OK
159
160
161 class FrameDecoder:
162     """Decodes one or more HDLC frames from a stream of data."""
163     def __init__(self):
164         self._data = bytearray()
165         self._unescape_next_byte_flag = False
166         self._state = _InterframeState(bytearray(), FrameStatus.OK)
167
168     def process(self, data: bytes) -> Iterator[Frame]:
169         """Decodes and yields HDLC frames, including corrupt frames.
170
171         The ok() method on Frame indicates whether it is valid or represents a
172         frame parsing error.
173
174         Yields:
175           Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
176         """
177         for byte in data:
178             frame = self._process_byte(byte)
179             if frame:
180                 yield frame
181
182     def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
183         """Decodes and yields valid HDLC frames, logging any errors."""
184         for frame in self.process(data):
185             if frame.ok():
186                 yield frame
187             else:
188                 _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
189                              frame.status.value, len(frame.data))
190                 _LOG.debug('Discarded data: %s', frame.data)
191
192     def _process_byte(self, byte: int) -> Optional[Frame]:
193         if byte == protocol.FLAG:
194             self._state, frame = self._state.handle_flag()
195             return frame
196
197         if byte == protocol.ESCAPE:
198             self._state = self._state.handle_escape()
199         else:
200             self._state = self._state.handle_byte(byte)
201
202         return None