1 # Copyright 2020 The Pigweed Authors
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
7 # https://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
14 """Decoder class for decoding bytes using HDLC protocol"""
16 from dataclasses import dataclass
19 from typing import Iterator, Optional
22 from pw_hdlc import protocol
# Logger used to report frames that fail to decode.
_LOG = logging.getLogger('pw_hdlc')

# Smallest decoded frame that can pass validation; anything shorter is
# reported as a framing error.
_MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
class FrameStatus(enum.Enum):
    """Outcome of decoding a single HDLC frame.

    The value of each error member is a human-readable description that is
    written to the log when a frame is discarded.
    """
    # The frame was decoded and passed validation.
    # NOTE(review): Frame.status defaults to this member and Frame.ok()
    # compares against it; the value string is a placeholder -- confirm.
    OK = 'OK'

    # The CRC-32 carried by the frame does not match the frame's contents.
    FCS_MISMATCH = 'frame check sequence failure'

    # The frame was too short, or a flag/escape byte appeared where it is not
    # allowed.
    FRAMING_ERROR = 'invalid flag or escape characters'
@dataclass(frozen=True)
class Frame:
    """Represents an HDLC frame.

    Instances are immutable. When status is not FrameStatus.OK the frame is
    corrupt: address, control and data return placeholder values, while
    raw_encoded and raw_decoded are left exactly as they were provided.
    """

    # The complete HDLC-encoded frame, excluding HDLC flag characters.
    raw_encoded: bytes

    # The complete decoded frame (address, control, information, FCS).
    raw_decoded: bytes

    # Whether parsing the frame succeeded.
    status: FrameStatus = FrameStatus.OK

    @property
    def address(self) -> int:
        """The frame's address field (assumes only one byte for now).

        Returns NO_ADDRESS if the frame is not valid.
        """
        return self.raw_decoded[0] if self.ok() else NO_ADDRESS

    @property
    def control(self) -> bytes:
        """The control byte (assumes only one byte for now).

        Returns b'' if the frame is not valid.
        """
        return self.raw_decoded[1:2] if self.ok() else b''

    @property
    def data(self) -> bytes:
        """The information field in the frame.

        This is everything between the control byte and the trailing four
        bytes of raw_decoded. Returns b'' if the frame is not valid.
        """
        return self.raw_decoded[2:-4] if self.ok() else b''

    def ok(self) -> bool:
        """True if this represents a valid frame.

        If false, then parsing failed. The status is set to indicate what type
        of error occurred; address, control and data then return placeholder
        values, and the bytes parsed from the frame (including bytes parsed as
        address or control bytes) are only available through raw_decoded.
        """
        return self.status is FrameStatus.OK

    def __repr__(self) -> str:
        if self.ok():
            body = (f'address={self.address}, control={self.control!r}, '
                    f'data={self.data!r}')
        else:
            # Corrupt frames have no meaningful fields; show the error only.
            body = str(self.status)

        return f'{type(self).__name__}({body})'
class _State(enum.Enum):
    """Position of the decoder within the encoded byte stream.

    Members are only ever compared by identity, so their values carry no
    meaning.
    """
    # Outside of a frame; waiting for a flag byte to start one.
    INTERFRAME = enum.auto()

    # Inside a frame; ordinary bytes are added to the decoded data.
    FRAME = enum.auto()

    # Inside a frame, immediately after an escape byte.
    FRAME_ESCAPE = enum.auto()
def _check_frame(frame_data: bytes) -> FrameStatus:
    """Validates the length and frame check sequence of a frame.

    Args:
      frame_data: The frame contents to check. The final four bytes are read
        as a little-endian CRC-32 of all the bytes before them.

    Returns:
      FrameStatus.FRAMING_ERROR if the frame is shorter than _MIN_FRAME_SIZE,
      FrameStatus.FCS_MISMATCH if the CRC does not match, and FrameStatus.OK
      otherwise.
    """
    if len(frame_data) < _MIN_FRAME_SIZE:
        return FrameStatus.FRAMING_ERROR

    frame_crc = int.from_bytes(frame_data[-4:], 'little')
    if zlib.crc32(frame_data[:-4]) != frame_crc:
        return FrameStatus.FCS_MISMATCH

    # Both checks passed. Without this explicit return, valid frames would be
    # given a status of None.
    return FrameStatus.OK
102 """Decodes one or more HDLC frames from a stream of data."""
def __init__(self) -> None:
    """Creates a decoder with empty buffers that is waiting for a flag."""
    # Frame contents with escape sequences already resolved.
    self._decoded_data = bytearray()
    # Bytes exactly as they were received, minus flag bytes.
    self._raw_data = bytearray()
    self._state = _State.INTERFRAME
def process(self, data: bytes) -> Iterator[Frame]:
    """Decodes and yields HDLC frames, including corrupt frames.

    The ok() method on Frame indicates whether it is valid or represents a
    frame parsing error.

    Args:
      data: Encoded bytes to decode. Decoder state is kept on the instance,
        so a frame may be split across several calls.

    Yields:
      Frames, which may be valid (frame.ok()) or corrupt (not frame.ok())
    """
    for byte in data:
        frame = self._process_byte(byte)
        # Most bytes do not complete a frame; only pass on the ones that do.
        if frame is not None:
            yield frame
def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
    """Decodes and yields valid HDLC frames, logging any errors.

    Args:
      data: Encoded bytes to decode; passed to process() unchanged.

    Yields:
      Only the frames for which frame.ok() is true. Corrupt frames are
      dropped after a warning (and a debug dump of their bytes) is logged.
    """
    for frame in self.process(data):
        if frame.ok():
            yield frame
        else:
            _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
                         frame.status.value, len(frame.raw_encoded))
            _LOG.debug('Discarded data: %s', frame.raw_encoded)
def _finish_frame(self, status: FrameStatus) -> Frame:
    """Packages the buffered bytes as a Frame and resets both buffers.

    Args:
      status: The status to record on the returned frame.

    Returns:
      A Frame holding copies of the raw and decoded bytes collected so far.
    """
    # bytes() copies the buffers, so clearing them below cannot affect the
    # frame that is handed back.
    frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
    self._raw_data.clear()
    self._decoded_data.clear()
    return frame
def _process_byte(self, byte: int) -> Optional[Frame]:
    """Advances the decoder by one byte of the encoded stream.

    Args:
      byte: The next byte received.

    Returns:
      The Frame completed by this byte, which may be valid or corrupt, or
      None if this byte did not complete a frame.

    Raises:
      AssertionError: The decoder is in an unrecognized state.
    """
    frame: Optional[Frame] = None

    # Record every byte except the flag character.
    if byte != protocol.FLAG:
        self._raw_data.append(byte)

    if self._state is _State.INTERFRAME:
        if byte == protocol.FLAG:
            # Bytes seen outside of a frame are reported as a corrupt frame.
            # A flag with nothing before it simply starts a frame.
            # NOTE(review): guard reconstructed by the reviewer -- confirm.
            if self._raw_data:
                frame = self._finish_frame(FrameStatus.FRAMING_ERROR)

            self._state = _State.FRAME
    elif self._state is _State.FRAME:
        if byte == protocol.FLAG:
            # Back-to-back flags enclose nothing, so no frame is produced.
            # NOTE(review): guard and _check_frame argument reconstructed by
            # the reviewer -- confirm.
            if self._raw_data:
                frame = self._finish_frame(_check_frame(
                    self._decoded_data))

            # The flag that ends one frame also starts the next one.
            self._state = _State.FRAME
        elif byte == protocol.ESCAPE:
            self._state = _State.FRAME_ESCAPE
        else:
            self._decoded_data.append(byte)
    elif self._state is _State.FRAME_ESCAPE:
        if byte == protocol.FLAG:
            # An escape byte must not be followed by a flag.
            frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
            self._state = _State.FRAME
        elif byte in protocol.VALID_ESCAPED_BYTES:
            self._state = _State.FRAME
            self._decoded_data.append(protocol.escape(byte))
        else:
            # Invalid escape sequence. The bytes collected so far stay in the
            # buffers and are reported as a corrupt frame at the next flag.
            self._state = _State.INTERFRAME
    else:
        raise AssertionError(f'Invalid decoder state: {self._state}')

    return frame