1 # Copyright 2020 The Pigweed Authors
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
7 # https://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
14 """Decoder class for decoding bytes using HDLC-Lite protocol"""
18 from typing import Iterator, NamedTuple, Optional, Tuple
21 from pw_hdlc_lite import protocol
23 _LOG = logging.getLogger('pw_hdlc_lite')
26 class FrameStatus(enum.Enum):
27 """Indicates that an error occurred."""
29 FCS_MISMATCH = 'frame check sequence failure'
30 INCOMPLETE = 'incomplete frame'
31 INVALID_ESCAPE = 'invalid escape character'
34 _MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32
39 class Frame(NamedTuple):
40 """Represents an HDLC frame."""
42 # All bytes in the frame (address, control, information, FCS)
45 # Whether parsing the frame succeeded.
46 status: FrameStatus = FrameStatus.OK
49 def address(self) -> int:
50 """The frame's address field (assumes only one byte for now)."""
51 return self.raw[0] if self.raw else NO_ADDRESS
54 def control(self) -> bytes:
55 """The control byte (assumes only one byte for now)."""
56 return self.raw[1:2] if len(self.raw) >= 2 else b''
59 def data(self) -> bytes:
60 """The information field in the frame."""
61 return self.raw[2:-4] if len(self.raw) >= _MIN_FRAME_SIZE else b''
64 """True if this represents a valid frame.
66 If false, then parsing failed. The status is set to indicate what type
67 of error occurred, and the data field contains all bytes parsed from the
68 frame (including bytes parsed as address or control bytes).
70 return self.status is FrameStatus.OK
73 class _BaseFrameState:
74 """Base class for all frame parsing states."""
75 def __init__(self, data: bytearray):
76 self._data = data # All data seen in the current frame
77 self._escape_next = False
79 def handle_flag(self) -> Tuple['_BaseFrameState', Optional[Frame]]:
80 """Handles an HDLC flag character (0x7e).
82 The HDLC flag is always interpreted as the start of a new frame.
85 (next state, optional frame or error)
87 # If there is data or an escape character, the frame is incomplete.
88 if self._escape_next or self._data:
89 return _AddressState(), Frame(bytes(self._data),
90 FrameStatus.INCOMPLETE)
92 return _AddressState(), None
94 def handle_escape(self) -> '_BaseFrameState':
95 """Handles an HDLC escape character (0x7d); returns the next state."""
97 # If two escapes occur in a row, the frame is invalid.
98 return _InterframeState(self._data, FrameStatus.INVALID_ESCAPE)
100 self._escape_next = True
103 def handle_byte(self, byte: int) -> '_BaseFrameState':
104 """Handles a byte, which may have been escaped; returns next state."""
105 self._data.append(protocol.escape(byte) if self._escape_next else byte)
106 self._escape_next = False
110 class _InterframeState(_BaseFrameState):
111 """Not currently in a frame; any data is discarded."""
112 def __init__(self, data: bytearray, error: FrameStatus):
113 super().__init__(data)
116 def handle_flag(self) -> Tuple[_BaseFrameState, Optional[Frame]]:
117 # If this state was entered due to an error, report that error before
118 # starting a new frame.
119 if self._error is not FrameStatus.OK:
120 return _AddressState(), Frame(bytes(self._data), self._error)
122 return super().handle_flag()
125 class _AddressState(_BaseFrameState):
126 """First field in a frame: the address."""
128 super().__init__(bytearray())
130 def handle_byte(self, byte: int) -> _BaseFrameState:
131 super().handle_byte(byte)
132 # Only handle single-byte addresses for now.
133 return _ControlState(self._data)
136 class _ControlState(_BaseFrameState):
137 """Second field in a frame: control."""
138 def handle_byte(self, byte: int) -> _BaseFrameState:
139 super().handle_byte(byte)
140 # Only handle a single control byte for now.
141 return _DataState(self._data)
144 class _DataState(_BaseFrameState):
145 """The information field in a frame."""
146 def handle_flag(self) -> Tuple[_BaseFrameState, Frame]:
147 return _AddressState(), Frame(bytes(self._data), self._check_frame())
149 def _check_frame(self) -> FrameStatus:
150 # If the last character was an escape, assume bytes are missing.
151 if self._escape_next or len(self._data) < _MIN_FRAME_SIZE:
152 return FrameStatus.INCOMPLETE
154 frame_crc = int.from_bytes(self._data[-4:], 'little')
155 if zlib.crc32(self._data[:-4]) != frame_crc:
156 return FrameStatus.FCS_MISMATCH
158 return FrameStatus.OK
162 """Decodes one or more HDLC frames from a stream of data."""
164 self._data = bytearray()
165 self._unescape_next_byte_flag = False
166 self._state = _InterframeState(bytearray(), FrameStatus.OK)
168 def process(self, data: bytes) -> Iterator[Frame]:
169 """Decodes and yields HDLC frames, including corrupt frames.
171 The ok() method on Frame indicates whether it is valid or represents a
175 Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
178 frame = self._process_byte(byte)
182 def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
183 """Decodes and yields valid HDLC frames, logging any errors."""
184 for frame in self.process(data):
188 _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
189 frame.status.value, len(frame.data))
190 _LOG.debug('Discarded data: %s', frame.data)
192 def _process_byte(self, byte: int) -> Optional[Frame]:
193 if byte == protocol.FLAG:
194 self._state, frame = self._state.handle_flag()
197 if byte == protocol.ESCAPE:
198 self._state = self._state.handle_escape()
200 self._state = self._state.handle_byte(byte)