1 // Copyright 2020 The Pigweed Authors
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
7 // https://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
19 #include "pw_assert/light.h"
20 #include "pw_containers/vector.h"
21 #include "pw_preprocessor/arguments.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/internal/hash.h"
24 #include "pw_rpc/internal/method_lookup.h"
25 #include "pw_rpc/internal/nanopb_method.h"
26 #include "pw_rpc/internal/packet.h"
27 #include "pw_rpc/internal/server.h"
31 // Declares a context object that may be used to invoke an RPC. The context is
32 // declared with the name of the implemented service and the method to invoke.
33 // The RPC can then be invoked with the call method.
35 // For a unary RPC, context.call(request) returns the status, and the response
36 // struct can be accessed via context.response().
38 // PW_NANOPB_TEST_METHOD_CONTEXT(my::CoolService, TheMethod) context;
39 // EXPECT_EQ(OkStatus(), context.call({.some_arg = 123}));
40 // EXPECT_EQ(500, context.response().some_response_value);
42 // For a server streaming RPC, context.call(request) invokes the method. As in a
43 // normal RPC, the method completes when the ServerWriter's Finish method is
44 // called (or it goes out of scope).
46 // PW_NANOPB_TEST_METHOD_CONTEXT(my::CoolService, TheStreamingMethod) context;
47 // context.call({.some_arg = 123});
49 // EXPECT_TRUE(context.done()); // Check that the RPC completed
50 // EXPECT_EQ(OkStatus(), context.status()); // Check the status
52 // EXPECT_EQ(3u, context.responses().size());
53 // EXPECT_EQ(123, context.responses()[0].value); // check individual responses
55 // for (const MyResponse& response : context.responses()) {
56 // // iterate over the responses
59 // PW_NANOPB_TEST_METHOD_CONTEXT forwards its constructor arguments to the
60 // underlying service. For example:
62 // PW_NANOPB_TEST_METHOD_CONTEXT(MyService, Go) context(service, args);
64 // PW_NANOPB_TEST_METHOD_CONTEXT takes two optional arguments:
66 // size_t max_responses: maximum responses to store; ignored unless streaming
67 // size_t output_size_bytes: buffer size; must be large enough for a packet
71 // PW_NANOPB_TEST_METHOD_CONTEXT(MyService, BestMethod, 3, 256) context;
72 // ASSERT_EQ(3u, context.responses().max_size());
// Expands to a ::pw::rpc::NanopbTestMethodContext specialization for the given
// service. One of the template arguments is the method ID, computed by applying
// ::pw::rpc::internal::Hash to the stringified method name.
74 #define PW_NANOPB_TEST_METHOD_CONTEXT(service, method, ...) \
75 ::pw::rpc::NanopbTestMethodContext<service, \
77 ::pw::rpc::internal::Hash(#method), \
79 template <typename Service,
82 size_t max_responses = 4,  // Default cap on the number of stored responses.
83 size_t output_size_bytes = 128>  // Default output buffer size, in bytes.
84 class NanopbTestMethodContext;  // Forward declaration only.
86 // Internal classes that implement NanopbTestMethodContext.
87 namespace internal::test::nanopb {
89 // A ChannelOutput implementation that stores the outgoing payloads and status.
//
// Keeps a reference to the method, a reference to an externally provided
// response Vector, and the byte span handed out by AcquireBuffer().
// SendAndReleaseBuffer() is declared here and defined out of line.
90 template <typename Response>
91 class MessageOutput final : public ChannelOutput {
// Takes the method, the response storage, and the output buffer. The string
// literal is passed to the ChannelOutput base class constructor.
93 MessageOutput(const internal::NanopbMethod& method,
94 Vector<Response>& responses,
95 std::span<std::byte> buffer)
96 : ChannelOutput("internal::test::nanopb::MessageOutput"),
98 responses_(responses),
// Getter and setter for the stored status value.
103 Status last_status() const { return last_status_; }
104 void set_last_status(Status status) { last_status_ = status; }
// Returns the current value of the response counter.
106 size_t total_responses() const { return total_responses_; }
// Returns the stream-ended flag.
108 bool stream_ended() const { return stream_ended_; }
// Returns the stored buffer_ span on every call.
113 std::span<std::byte> AcquireBuffer() override { return buffer_; }
// Receives the bytes that were written for sending.
115 Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
117 const internal::NanopbMethod& method_;  // Held by reference.
118 Vector<Response>& responses_;  // Held by reference; storage lives elsewhere.
119 std::span<std::byte> buffer_;  // Span returned by AcquireBuffer().
120 size_t total_responses_;  // Value reported by total_responses().
125 // Collects everything needed to invoke a particular RPC.
126 template <typename Service,
129 size_t max_responses,
131 struct InvocationContext {
// Types obtained from the method through internal::Request / internal::Response.
132 using Request = internal::Request<method>;
133 using Response = internal::Response<method>;
// All constructor arguments are forwarded to the service member. The other
// members are initialized from one another in the initializer list.
135 template <typename... Args>
136 InvocationContext(Args&&... args)
137 : output(MethodLookup::GetNanopbMethod<Service, method_id>(),
// The channel is created with a pointer to `output`.
// NOTE(review): 123 is presumably an arbitrary channel ID -- confirm.
140 channel(Channel::Create<123>(&output)),
// The server is given a span covering exactly this one channel.
141 server(std::span(&channel, 1)),
142 service(std::forward<Args>(args)...),
143 call(static_cast<internal::Server&>(server),
144 static_cast<internal::Channel&>(channel),
146 MethodLookup::GetNanopbMethod<Service, method_id>()) {}
148 MessageOutput<Response> output;  // Output object the channel points at.
150 rpc::Channel channel;
153 Vector<Response, max_responses> responses;  // Capacity is max_responses.
154 std::array<std::byte, output_size> buffer = {};  // Value-initialized bytes.
156 internal::ServerCall call;  // Constructed from the server, channel, and method.
159 // Method invocation context for a unary RPC. Returns the status in call() and
160 // provides the response through the response() method.
161 template <typename Service, auto method, uint32_t method_id, size_t output_size>
// The InvocationContext is instantiated with a response capacity of 1.
164 InvocationContext<Service, method, method_id, 1, output_size> ctx_;
// Re-export the request and response types from the invocation context.
167 using Request = typename decltype(ctx_)::Request;
168 using Response = typename decltype(ctx_)::Response;
// Forwards all constructor arguments to the InvocationContext.
170 template <typename... Args>
171 UnaryContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
// Returns the service object stored in the invocation context.
173 Service& service() { return ctx_.service; }
175 // Invokes the RPC with the provided request. Returns the status.
176 Status call(const Request& request) {
// Append a response slot and assign it an empty-braced value. The method
// implementation is then called with that slot as its response argument.
178 ctx_.responses.emplace_back();
179 ctx_.responses.back() = {};
180 return CallMethodImplFunction<method>(
181 ctx_.call, request, ctx_.responses.back());
184 // Gives access to the RPC's response.
// Asserts that at least one response is stored, then returns the last one.
185 const Response& response() const {
186 PW_ASSERT(ctx_.responses.size() > 0u);
187 return ctx_.responses.back();
191 // Method invocation context for a server streaming RPC.
192 template <typename Service,
195 size_t max_responses,
197 class ServerStreamingContext {
199 InvocationContext<Service, method, method_id, max_responses, output_size>
// Re-export the request and response types from the invocation context.
203 using Request = typename decltype(ctx_)::Request;
204 using Response = typename decltype(ctx_)::Response;
// Forwards all constructor arguments to the InvocationContext.
206 template <typename... Args>
207 ServerStreamingContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
// Returns the service object stored in the invocation context.
209 Service& service() { return ctx_.service; }
211 // Invokes the RPC with the provided request.
212 void call(const Request& request) {
// A BaseServerWriter is built from the context's ServerCall and handed to the
// method implementation through a ServerWriter<Response> reference.
214 internal::BaseServerWriter server_writer(ctx_.call);
215 return CallMethodImplFunction<method>(
218 static_cast<ServerWriter<Response>&>(server_writer));
221 // Returns a server writer which writes responses into the context's buffer.
222 // This should not be called alongside call(); use one or the other.
223 ServerWriter<Response> writer() {
225 internal::BaseServerWriter server_writer(ctx_.call);
// std::move is applied to the cast reference, so the returned ServerWriter is
// move-constructed from the local writer rather than copied.
226 return std::move(static_cast<ServerWriter<Response>&>(server_writer));
229 // Returns the responses that have been recorded. The maximum number of
230 // responses is responses().max_size(). responses().back() is always the most
231 // recent response, even if total_responses() > responses().max_size().
232 const Vector<Response>& responses() const { return ctx_.responses; }
234 // The total number of responses sent, which may be larger than
235 // responses.max_size().
236 size_t total_responses() const { return ctx_.output.total_responses(); }
238 // True if the stream has terminated.
239 bool done() const { return ctx_.output.stream_ended(); }
241 // The status of the stream. Only valid if done() is true.
242 Status status() const {
// Reads the status value stored in the output object.
244 return ctx_.output.last_status();
248 // Alias to select the type of the context object to use based on which type of
// method is being invoked. The method's MethodTraits kType value is cast to
// size_t and used as the index into the tuple of context types listed below.
250 template <typename Service,
255 using Context = std::tuple_element_t<
256 static_cast<size_t>(internal::MethodTraits<decltype(method)>::kType),
257 std::tuple<UnaryContext<Service, method, method_id, output_size>,
258 ServerStreamingContext<Service,
263 // TODO(hepler): Support client and bidi streaming
// Resets the tracked state: the response counter goes back to zero, the
// stream-ended flag is cleared, and the last status is set to Unknown.
266 template <typename Response>
267 void MessageOutput<Response>::clear() {
269 total_responses_ = 0;
270 stream_ended_ = false;
271 last_status_ = Status::Unknown();
// Processes the bytes written for sending: parses them as an RPC packet,
// records the packet's status, and updates the stored responses or the
// stream-ended flag depending on the packet type.
274 template <typename Response>
275 Status MessageOutput<Response>::SendAndReleaseBuffer(
276 std::span<const std::byte> buffer) {
// Nothing may be sent after the stream has ended, and the bytes must start at
// the same address as the span returned by AcquireBuffer().
277 PW_ASSERT(!stream_ended_);
278 PW_ASSERT(buffer.data() == buffer_.data());
280 if (buffer.empty()) {
// Parse the buffer as a packet and assert that parsing succeeded.
284 Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
285 PW_ASSERT(result.ok());
// The packet's status is recorded before the packet type is examined.
287 last_status_ = result.value().status();
289 switch (result.value().type()) {
290 case internal::PacketType::RESPONSE:
291 // If we run out of space, the back message is always the most recent.
292 responses_.emplace_back();
293 responses_.back() = {};
// Decode the packet payload into the last response slot, then count it.
295 method_.DecodeResponse(result.value().payload(), &responses_.back()));
296 total_responses_ += 1;
298 case internal::PacketType::SERVER_STREAM_END:
299 stream_ended_ = true;
// NOTE(review): presumably reached for any other packet type -- confirm.
302 PW_CRASH("Unhandled PacketType");
307 } // namespace internal::test::nanopb
// The context type named by PW_NANOPB_TEST_METHOD_CONTEXT. It publicly
// inherits from the internal::test::nanopb::Context alias and adds only a
// forwarding constructor.
309 template <typename Service,
312 size_t max_responses,
313 size_t output_size_bytes>
314 class NanopbTestMethodContext
315 : public internal::test::nanopb::Context<Service,
321 // Forwards constructor arguments to the service class.
322 template <typename... ServiceArgs>
323 NanopbTestMethodContext(ServiceArgs&&... service_args)
324 : internal::test::nanopb::Context<Service,
329 std::forward<ServiceArgs>(service_args)...) {}
332 } // namespace pw::rpc