Change script for apply upstream code
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_hdlc_lite / py / pw_hdlc_lite / rpc_console.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """Console for interacting with pw_rpc over HDLC.
15
16 To start the console, provide a serial port as the --device argument and paths
17 or globs for .proto files that define the RPC services to support:
18
19   python -m pw_hdlc_lite.rpc_console --device /dev/ttyUSB0 sample.proto
20
21 This starts an IPython console for communicating with the connected device. A
22 few variables are predefined in the interactive console. These include:
23
24     rpcs   - used to invoke RPCs
25     device - the serial device used for communication
26     client - the pw_rpc.Client
27
28 An example echo RPC command:
29
30   rpcs.pw.rpc.EchoService.Echo(msg="hello!")
31 """
32
33 import argparse
34 import glob
35 import logging
36 from pathlib import Path
37 import sys
38 from typing import Collection, Iterable, Iterator, BinaryIO
39
40 import IPython
41 import serial
42
43 from pw_hdlc_lite.rpc import HdlcRpcClient
44
45 _LOG = logging.getLogger(__name__)
46
47
48 def _parse_args():
49     """Parses and returns the command line arguments."""
50     parser = argparse.ArgumentParser(description=__doc__)
51     parser.add_argument('-d',
52                         '--device',
53                         required=True,
54                         help='the serial port to use')
55     parser.add_argument('-b',
56                         '--baudrate',
57                         type=int,
58                         default=115200,
59                         help='the baud rate to use')
60     parser.add_argument(
61         '-o',
62         '--output',
63         type=argparse.FileType('wb'),
64         default=sys.stdout.buffer,
65         help=('The file to which to write device output (HDLC channel 1); '
66               'provide - or omit for stdout.'))
67     parser.add_argument('proto_globs',
68                         nargs='+',
69                         help='glob pattern for .proto files')
70     return parser.parse_args()
71
72
73 def _expand_globs(globs: Iterable[str]) -> Iterator[Path]:
74     for pattern in globs:
75         for file in glob.glob(pattern, recursive=True):
76             yield Path(file)
77
78
79 def _start_ipython_terminal(client: HdlcRpcClient) -> None:
80     """Starts an interactive IPython terminal with preset variables."""
81     local_variables = dict(
82         client=client,
83         channel_client=client.client.channel(1),
84         rpcs=client.client.channel(1).rpcs,
85     )
86
87     print(__doc__)  # Print the banner
88     IPython.terminal.embed.InteractiveShellEmbed().mainloop(
89         local_ns=local_variables, module=argparse.Namespace())
90
91
92 def console(device: str, baudrate: int, proto_globs: Collection[str],
93             output: BinaryIO) -> int:
94     """Starts an interactive RPC console for HDLC."""
95     # argparse.FileType doesn't correctly handle '-' for binary files.
96     if output is sys.stdout:
97         output = sys.stdout.buffer
98
99     if not proto_globs:
100         proto_globs = ['**/*.proto']
101
102     protos = list(_expand_globs(proto_globs))
103
104     if not protos:
105         _LOG.critical('No .proto files were found with %s',
106                       ', '.join(proto_globs))
107         _LOG.critical('At least one .proto file is required')
108         return 1
109
110     _LOG.debug('Found %d .proto files found with %s', len(protos),
111                ', '.join(proto_globs))
112
113     _start_ipython_terminal(
114         HdlcRpcClient(serial.Serial(device, baudrate), protos, output))
115     return 0
116
117
118 def main() -> int:
119     return console(**vars(_parse_args()))
120
121
122 if __name__ == '__main__':
123     sys.exit(main())