Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_rpc / py / pw_rpc / codegen_nanopb.py
1 # Copyright 2020 The Pigweed Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 # use this file except in compliance with the License. You may obtain a copy of
5 # the License at
6 #
7 #     https://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations under
13 # the License.
14 """This module generates the code for nanopb-based pw_rpc services."""
15
16 import os
17 from typing import Iterable, Iterator
18
19 from pw_protobuf.output_file import OutputFile
20 from pw_protobuf.proto_tree import ProtoNode, ProtoService, ProtoServiceMethod
21 from pw_protobuf.proto_tree import build_node_tree
22 from pw_rpc import codegen
23 from pw_rpc.codegen import RPC_NAMESPACE
24 import pw_rpc.ids
25
26 PROTO_H_EXTENSION = '.pb.h'
27 PROTO_CC_EXTENSION = '.pb.cc'
28 NANOPB_H_EXTENSION = '.pb.h'
29
30
31 def _proto_filename_to_nanopb_header(proto_file: str) -> str:
32     """Returns the generated nanopb header name for a .proto file."""
33     return os.path.splitext(proto_file)[0] + NANOPB_H_EXTENSION
34
35
36 def _proto_filename_to_generated_header(proto_file: str) -> str:
37     """Returns the generated C++ RPC header name for a .proto file."""
38     filename = os.path.splitext(proto_file)[0]
39     return f'{filename}.rpc{PROTO_H_EXTENSION}'
40
41
42 def _generate_method_descriptor(method: ProtoServiceMethod, method_id: int,
43                                 output: OutputFile) -> None:
44     """Generates a nanopb method descriptor for an RPC method."""
45
46     req_fields = f'{method.request_type().nanopb_name()}_fields'
47     res_fields = f'{method.response_type().nanopb_name()}_fields'
48     impl_method = f'&Implementation::{method.name()}'
49
50     output.write_line(
51         f'{RPC_NAMESPACE}::internal::GetNanopbOrRawMethodFor<{impl_method}, '
52         f'{method.type().cc_enum()}>(')
53     with output.indent(4):
54         output.write_line(f'0x{method_id:08x},  // Hash of "{method.name()}"')
55         output.write_line(f'{req_fields},')
56         output.write_line(f'{res_fields}),')
57
58
59 def _generate_server_writer_alias(output: OutputFile) -> None:
60     output.write_line('template <typename T>')
61     output.write_line('using ServerWriter = ::pw::rpc::ServerWriter<T>;')
62
63
64 def _generate_code_for_service(service: ProtoService, root: ProtoNode,
65                                output: OutputFile) -> None:
66     """Generates a C++ derived class for a nanopb RPC service."""
67     codegen.service_class(service, root, output, _generate_server_writer_alias,
68                           'NanopbMethodUnion', _generate_method_descriptor)
69
70
71 def _generate_code_for_client_method(method: ProtoServiceMethod,
72                                      output: OutputFile) -> None:
73     """Outputs client code for a single RPC method."""
74
75     req = method.request_type().nanopb_name()
76     res = method.response_type().nanopb_name()
77     method_id = pw_rpc.ids.calculate(method.name())
78
79     if method.type() == ProtoServiceMethod.Type.UNARY:
80         callback = f'{RPC_NAMESPACE}::UnaryResponseHandler<{res}>'
81     elif method.type() == ProtoServiceMethod.Type.SERVER_STREAMING:
82         callback = f'{RPC_NAMESPACE}::ServerStreamingResponseHandler<{res}>'
83     else:
84         raise NotImplementedError(
85             'Only unary and server streaming RPCs are currently supported')
86
87     output.write_line()
88     output.write_line(f'static NanopbClientCall<\n    {callback}>')
89     output.write_line(f'{method.name()}({RPC_NAMESPACE}::Channel& channel,')
90     with output.indent(len(method.name()) + 1):
91         output.write_line(f'const {req}& request,')
92         output.write_line(f'{callback}& callback) {{')
93
94     with output.indent():
95         output.write_line(f'NanopbClientCall<{callback}>')
96         output.write_line('    call(&channel,')
97         with output.indent(9):
98             output.write_line('kServiceId,')
99             output.write_line(
100                 f'0x{method_id:08x},  // Hash of "{method.name()}"')
101             output.write_line('callback,')
102             output.write_line(f'{req}_fields,')
103             output.write_line(f'{res}_fields);')
104         output.write_line('call.SendRequest(&request);')
105         output.write_line('return call;')
106
107     output.write_line('}')
108
109
110 def _generate_code_for_client(service: ProtoService, root: ProtoNode,
111                               output: OutputFile) -> None:
112     """Outputs client code for an RPC service."""
113
114     output.write_line('namespace nanopb {')
115
116     class_name = f'{service.cpp_namespace(root)}Client'
117     output.write_line(f'\nclass {class_name} {{')
118     output.write_line(' public:')
119
120     with output.indent():
121         output.write_line('template <typename T>')
122         output.write_line(
123             f'using NanopbClientCall = {RPC_NAMESPACE}::NanopbClientCall<T>;')
124
125         output.write_line('')
126         output.write_line(f'{class_name}() = delete;')
127
128         for method in service.methods():
129             _generate_code_for_client_method(method, output)
130
131     service_name_hash = pw_rpc.ids.calculate(service.proto_path())
132     output.write_line('\n private:')
133
134     with output.indent():
135         output.write_line(f'// Hash of "{service.proto_path()}".')
136         output.write_line(
137             f'static constexpr uint32_t kServiceId = 0x{service_name_hash:08x};'
138         )
139
140     output.write_line('};')
141
142     output.write_line('\n}  // namespace nanopb\n')
143
144
145 def includes(proto_file, unused_package: ProtoNode) -> Iterator[str]:
146     yield '#include "pw_rpc/internal/nanopb_method_union.h"'
147     yield '#include "pw_rpc/nanopb_client_call.h"'
148
149     # Include the corresponding nanopb header file for this proto file, in which
150     # the file's messages and enums are generated. All other files imported from
151     # the .proto file are #included in there.
152     nanopb_header = _proto_filename_to_nanopb_header(proto_file.name)
153     yield f'#include "{nanopb_header}"'
154
155
156 def _generate_code_for_package(proto_file, package: ProtoNode,
157                                output: OutputFile) -> None:
158     """Generates code for a header file corresponding to a .proto file."""
159
160     codegen.package(proto_file, package, output, includes,
161                     _generate_code_for_service, _generate_code_for_client)
162
163
164 def _unary_stub(method: ProtoServiceMethod, output: OutputFile) -> None:
165     output.write_line(f'pw::Status {method.name()}(ServerContext&, '
166                       f'const {method.request_type().nanopb_name()}& request, '
167                       f'{method.response_type().nanopb_name()}& response) {{')
168
169     with output.indent():
170         output.write_line(codegen.STUB_REQUEST_TODO)
171         output.write_line('static_cast<void>(request);')
172         output.write_line(codegen.STUB_RESPONSE_TODO)
173         output.write_line('static_cast<void>(response);')
174         output.write_line('return pw::Status::Unimplemented();')
175
176     output.write_line('}')
177
178
179 def _server_streaming_stub(method: ProtoServiceMethod,
180                            output: OutputFile) -> None:
181     output.write_line(
182         f'void {method.name()}(ServerContext&, '
183         f'const {method.request_type().nanopb_name()}& request, '
184         f'ServerWriter<{method.response_type().nanopb_name()}>& writer) {{')
185
186     with output.indent():
187         output.write_line(codegen.STUB_REQUEST_TODO)
188         output.write_line('static_cast<void>(request);')
189         output.write_line(codegen.STUB_WRITER_TODO)
190         output.write_line('static_cast<void>(writer);')
191
192     output.write_line('}')
193
194
195 def process_proto_file(proto_file) -> Iterable[OutputFile]:
196     """Generates code for a single .proto file."""
197
198     _, package_root = build_node_tree(proto_file)
199     output_filename = _proto_filename_to_generated_header(proto_file.name)
200     output_file = OutputFile(output_filename)
201     _generate_code_for_package(proto_file, package_root, output_file)
202
203     output_file.write_line()
204     codegen.package_stubs(package_root, output_file, _unary_stub,
205                           _server_streaming_stub)
206
207     return [output_file]