Change script for apply upstream code
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc_lite / py / pw_hdlc_lite / rpc.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Utilities for using HDLC with pw_rpc."""
15
16 import logging
17 import os
18 from pathlib import Path
19 import sys
20 import threading
21 import time
22 from types import ModuleType
23 from typing import Any, BinaryIO, Callable, Iterable, List, NoReturn, Union
24
25 from pw_hdlc_lite.decode import FrameDecoder
26 from pw_hdlc_lite import encode
27 import pw_rpc
28 from pw_rpc import callback_client
29 from pw_protobuf_compiler import python_protos
30
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

# HDLC address whose frame payloads read_and_process_data() writes to the
# "stdout" output stream instead of handing them to the RPC client.
STDOUT_ADDRESS = 1
# Default HDLC address for RPC traffic; used as the default both when encoding
# outgoing frames and when selecting incoming frames for the RPC client.
DEFAULT_ADDRESS = ord('R')
35
36
def channel_output(writer: Callable[[bytes], Any],
                   address: int = DEFAULT_ADDRESS,
                   delay_s: float = 0) -> Callable[[bytes], None]:
    """Builds a pw_rpc channel output that HDLC-encodes whatever it is given.

    Args:
      writer: called with the encoded bytes to send
      address: HDLC address placed in each information frame
      delay_s: if nonzero, the frame is passed to writer one byte at a time,
          sleeping this many seconds before each byte

    Returns:
      A function that takes a payload, wraps it in an HDLC information frame
      for the given address, and sends the frame through writer.
    """
    # Common case: no throttling, hand the whole frame to the writer at once.
    if not delay_s:
        return lambda data: writer(encode.information_frame(address, data))

    def write_throttled(data: bytes) -> None:
        """Trickles the frame out bytewise for unbuffered serial links."""
        frame = encode.information_frame(address, data)
        for value in frame:
            time.sleep(delay_s)
            writer(bytes([value]))

    return write_throttled
53
54
def read_and_process_data(rpc_client: pw_rpc.Client,
                          device: BinaryIO,
                          output: BinaryIO,
                          output_sep: bytes = os.linesep.encode(),
                          rpc_address: int = DEFAULT_ADDRESS) -> NoReturn:
    """Reads HDLC frames from the device and passes them to the RPC client.

    This function loops forever and never returns. Each frame produced by the
    decoder is routed by its HDLC address:

      - rpc_address: the payload is passed to rpc_client.process_packet(); an
        error is logged if the client does not handle it.
      - STDOUT_ADDRESS: the payload followed by output_sep is written to
        output, which is then flushed.
      - any other address: the frame is logged as unhandled and dropped.

    Frames that fail to parse are logged and skipped.

    Args:
      rpc_client: client that processes the payload of RPC frames
      device: stream the raw HDLC bytes are read from
      output: where to write "stdout" output from the device
      output_sep: bytes written to output after each "stdout" payload
      rpc_address: HDLC address whose frames carry RPC packets
    """
    decoder = FrameDecoder()

    while True:
        byte = device.read()
        for frame in decoder.process_valid_frames(byte):
            if not frame.ok():
                _LOG.error('Failed to parse frame: %s', frame.status.value)
                continue

            if frame.address == rpc_address:
                if not rpc_client.process_packet(frame.data):
                    _LOG.error('Packet not handled by RPC client: %s', frame)
            elif frame.address == STDOUT_ADDRESS:
                output.write(frame.data)
                output.write(output_sep)
                output.flush()
            else:
                # Decode leniently (errors='replace') so that a payload with
                # undecodable bytes cannot raise while it is being logged and
                # take down this read loop.
                _LOG.error('Unhandled frame for address %d: %s', frame.address,
                           frame.data.decode(errors='replace'))
80
81
# A proto source accepted by HdlcRpcClient: either a path to a .proto file
# (str or Path) or a proto module.
_PathOrModule = Union[str, Path, ModuleType]
83
84
class HdlcRpcClient:
    """A pw_rpc client that talks to a device using HDLC frames."""
    def __init__(self,
                 device: BinaryIO,
                 proto_paths_or_modules: Iterable[_PathOrModule],
                 output: BinaryIO = sys.stdout.buffer,
                 channels: Iterable[pw_rpc.Channel] = None,
                 client_impl: pw_rpc.client.ClientImpl = None):
        """Sets up the RPC client and starts its background reader thread.

        Args:
          device: serial.Serial (or any BinaryIO class) for reading/writing data
          proto_paths_or_modules: paths to .proto files or proto modules
          output: where to write "stdout" output from the device
          channels: RPC channels to use; if None, a single channel with ID 1
              that writes HDLC frames via device.write is created
          client_impl: RPC client implementation; if None, a
              callback_client.Impl() is created
        """
        self.device = device

        # Separate ready-made proto modules from .proto paths, which still
        # have to be compiled and imported.
        paths: List[Union[Path, str]] = []
        modules = []
        for entry in proto_paths_or_modules:
            if not isinstance(entry, (str, Path)):
                modules.append(entry)
                continue
            paths.append(entry)

        modules.extend(python_protos.compile_and_import(paths))

        if channels is None:
            default_output = channel_output(device.write)
            channels = [pw_rpc.Channel(1, default_output)]

        if client_impl is None:
            client_impl = callback_client.Impl()

        self.client = pw_rpc.Client.from_modules(client_impl, channels,
                                                 modules)

        # Incoming data is read and dispatched on a daemon thread.
        reader = threading.Thread(target=read_and_process_data,
                                  args=(self.client, device, output),
                                  daemon=True)
        reader.start()

    def rpcs(self, channel_id: int = None) -> pw_rpc.client.Services:
        """Returns the services object for invoking RPCs on a channel.

        This is a shortcut past the client and channel objects. When
        channel_id is None, the first channel reported by the client is used,
        so the ID may be omitted if only one channel is in use.
        """
        if channel_id is not None:
            return self.client.channel(channel_id).rpcs

        first_channel = next(iter(self.client.channels()))
        return first_channel.rpcs
135
136         return self.client.channel(channel_id).rpcs