1 # Copyright 2020 The Pigweed Authors
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
7 # https://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
14 """Utilities for using HDLC with pw_rpc."""
18 from pathlib import Path
22 from types import ModuleType
23 from typing import Any, BinaryIO, Callable, Iterable, List, NoReturn, Union
25 from pw_hdlc_lite.decode import FrameDecoder
26 from pw_hdlc_lite import encode
28 from pw_rpc import callback_client
29 from pw_protobuf_compiler import python_protos
31 _LOG = logging.getLogger(__name__)
34 DEFAULT_ADDRESS = ord('R')
37 def channel_output(writer: Callable[[bytes], Any],
38 address: int = DEFAULT_ADDRESS,
39 delay_s: float = 0) -> Callable[[bytes], None]:
40 """Returns a function that can be used as a channel output for pw_rpc."""
44 def slow_write(data: bytes) -> None:
45 """Slows down writes in case unbuffered serial is in use."""
50 return lambda data: slow_write(encode.information_frame(address, data))
52 return lambda data: writer(encode.information_frame(address, data))
55 def read_and_process_data(rpc_client: pw_rpc.Client,
58 output_sep: bytes = os.linesep.encode(),
59 rpc_address: int = DEFAULT_ADDRESS) -> NoReturn:
60 """Reads HDLC frames from the device and passes them to the RPC client."""
61 decoder = FrameDecoder()
65 for frame in decoder.process_valid_frames(byte):
67 _LOG.error('Failed to parse frame: %s', frame.status.value)
70 if frame.address == rpc_address:
71 if not rpc_client.process_packet(frame.data):
72 _LOG.error('Packet not handled by RPC client: %s', frame)
73 elif frame.address == STDOUT_ADDRESS:
74 output.write(frame.data)
75 output.write(output_sep)
78 _LOG.error('Unhandled frame for address %d: %s', frame.address,
79 frame.data.decoder(errors='replace'))
82 _PathOrModule = Union[str, Path, ModuleType]
86 """An RPC client configured to run over HDLC."""
89 proto_paths_or_modules: Iterable[_PathOrModule],
90 output: BinaryIO = sys.stdout.buffer,
91 channels: Iterable[pw_rpc.Channel] = None,
92 client_impl: pw_rpc.client.ClientImpl = None):
93 """Creates an RPC client configured to communicate using HDLC.
96 device: serial.Serial (or any BinaryIO class) for reading/writing data
97 proto_paths_or_modules: paths to .proto files or proto modules
98 output: where to write "stdout" output from the device
103 proto_paths: List[Union[Path, str]] = []
104 for proto in proto_paths_or_modules:
105 if isinstance(proto, (Path, str)):
106 proto_paths.append(proto)
108 proto_modules.append(proto)
110 proto_modules += python_protos.compile_and_import(proto_paths)
113 channels = [pw_rpc.Channel(1, channel_output(device.write))]
115 if client_impl is None:
116 client_impl = callback_client.Impl()
118 self.client = pw_rpc.Client.from_modules(client_impl, channels,
121 # Start background thread that reads and processes RPC packets.
122 threading.Thread(target=read_and_process_data,
124 args=(self.client, device, output)).start()
126 def rpcs(self, channel_id: int = None) -> pw_rpc.client.Services:
127 """Returns object for accessing services on the specified channel.
129 This skips some intermediate layers to make it simpler to invoke RPCs
130 from an HdlcRpcClient. If only one channel is in use, the channel ID is
133 if channel_id is None:
134 return next(iter(self.client.channels())).rpcs
136 return self.client.channel(channel_id).rpcs