Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc / py / pw_hdlc / rpc_console.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Console for interacting with pw_rpc over HDLC.
15
16 To start the console, provide a serial port as the --device argument and paths
17 or globs for .proto files that define the RPC services to support:
18
19   python -m pw_hdlc.rpc_console --device /dev/ttyUSB0 sample.proto
20
21 This starts an IPython console for communicating with the connected device. A
22 few variables are predefined in the interactive console. These include:
23
24     rpcs   - used to invoke RPCs
25     device - the serial device used for communication
26     client - the pw_rpc.Client
27     protos - protocol buffer messages indexed by proto package
28
29 An example echo RPC command:
30
31   rpcs.pw.rpc.EchoService.Echo(msg="hello!")
32 """
33
34 import argparse
35 import glob
36 import logging
37 from pathlib import Path
38 import sys
39 from typing import Any, Collection, Iterable, Iterator
40 import socket
41
42 import IPython  # type: ignore
43 import serial  # type: ignore
44
45 from pw_hdlc.rpc import HdlcRpcClient, default_channels, write_to_file
46
47 _LOG = logging.getLogger(__name__)
48
49 PW_RPC_MAX_PACKET_SIZE = 256
50 SOCKET_SERVER = 'localhost'
51 SOCKET_PORT = 33000
52 MKFIFO_MODE = 0o666
53
54
55 def _parse_args():
56     """Parses and returns the command line arguments."""
57     parser = argparse.ArgumentParser(description=__doc__)
58     group = parser.add_mutually_exclusive_group(required=True)
59     group.add_argument('-d', '--device', help='the serial port to use')
60     parser.add_argument('-b',
61                         '--baudrate',
62                         type=int,
63                         default=115200,
64                         help='the baud rate to use')
65     parser.add_argument(
66         '-o',
67         '--output',
68         type=argparse.FileType('wb'),
69         default=sys.stdout.buffer,
70         help=('The file to which to write device output (HDLC channel 1); '
71               'provide - or omit for stdout.'))
72     group.add_argument('-s',
73                        '--socket-addr',
74                        type=str,
75                        help='use socket to connect to server, type default for\
76             localhost:33000, or manually input the server address:port')
77     parser.add_argument('proto_globs',
78                         nargs='+',
79                         help='glob pattern for .proto files')
80     return parser.parse_args()
81
82
83 def _expand_globs(globs: Iterable[str]) -> Iterator[Path]:
84     for pattern in globs:
85         for file in glob.glob(pattern, recursive=True):
86             yield Path(file)
87
88
89 def _start_ipython_terminal(client: HdlcRpcClient) -> None:
90     """Starts an interactive IPython terminal with preset variables."""
91     local_variables = dict(
92         client=client,
93         channel_client=client.client.channel(1),
94         rpcs=client.client.channel(1).rpcs,
95         protos=client.protos.packages,
96     )
97
98     print(__doc__)  # Print the banner
99     IPython.terminal.embed.InteractiveShellEmbed().mainloop(
100         local_ns=local_variables, module=argparse.Namespace())
101
102
103 class SocketClientImpl:
104     def __init__(self, config: str):
105         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
106         socket_server = ''
107         socket_port = 0
108
109         if config == 'default':
110             socket_server = SOCKET_SERVER
111             socket_port = SOCKET_PORT
112         else:
113             socket_server, socket_port_str = config.split(':')
114             socket_port = int(socket_port_str)
115         self.socket.connect((socket_server, socket_port))
116
117     def write(self, data: bytes):
118         self.socket.sendall(data)
119
120     def read(self, num_bytes: int = PW_RPC_MAX_PACKET_SIZE):
121         return self.socket.recv(num_bytes)
122
123
124 def console(device: str, baudrate: int, proto_globs: Collection[str],
125             socket_addr: str, output: Any) -> int:
126     """Starts an interactive RPC console for HDLC."""
127     # argparse.FileType doesn't correctly handle '-' for binary files.
128     if output is sys.stdout:
129         output = sys.stdout.buffer
130
131     if not proto_globs:
132         proto_globs = ['**/*.proto']
133
134     protos = list(_expand_globs(proto_globs))
135
136     if not protos:
137         _LOG.critical('No .proto files were found with %s',
138                       ', '.join(proto_globs))
139         _LOG.critical('At least one .proto file is required')
140         return 1
141
142     _LOG.debug('Found %d .proto files found with %s', len(protos),
143                ', '.join(proto_globs))
144
145     if socket_addr is None:
146         serial_device = serial.Serial(device, baudrate, timeout=1)
147         read = lambda: serial_device.read(8192)
148         write = serial_device.write
149     else:
150         try:
151             socket_device = SocketClientImpl(socket_addr)
152             read = socket_device.read
153             write = socket_device.write
154         except ValueError:
155             _LOG.exception('Failed to initialize socket at %s', socket_addr)
156             return 1
157
158     _start_ipython_terminal(
159         HdlcRpcClient(read, protos, default_channels(write),
160                       lambda data: write_to_file(data, output)))
161     return 0
162
163
164 def main() -> int:
165     return console(**vars(_parse_args()))
166
167
168 if __name__ == '__main__':
169     sys.exit(main())