3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
24 #include <type_traits>
26 #include <grpcpp/impl/codegen/call.h>
27 #include <grpcpp/impl/codegen/call_op_set.h>
28 #include <grpcpp/impl/codegen/callback_common.h>
29 #include <grpcpp/impl/codegen/config.h>
30 #include <grpcpp/impl/codegen/core_codegen_interface.h>
31 #include <grpcpp/impl/codegen/message_allocator.h>
32 #include <grpcpp/impl/codegen/server_context.h>
33 #include <grpcpp/impl/codegen/server_interface.h>
34 #include <grpcpp/impl/codegen/status.h>
38 // Declare base class of all reactors as internal
41 // Forward declarations
42 template <class Request, class Response>
43 class CallbackClientStreamingHandler;
44 template <class Request, class Response>
45 class CallbackServerStreamingHandler;
46 template <class Request, class Response>
47 class CallbackBidiHandler;
51 virtual ~ServerReactor() = default;
52 virtual void OnDone() = 0;
53 virtual void OnCancel() = 0;
56 friend class ::grpc::ServerContext;
57 template <class Request, class Response>
58 friend class CallbackClientStreamingHandler;
59 template <class Request, class Response>
60 friend class CallbackServerStreamingHandler;
61 template <class Request, class Response>
62 friend class CallbackBidiHandler;
64 // The ServerReactor is responsible for tracking when it is safe to call
65 // OnCancel. This function should not be called until after OnStarted is done
66 // and the RPC has completed with a cancellation. This is tracked by counting
67 // how many of these conditions have been met and calling OnCancel when none
70 void MaybeCallOnCancel() {
71 if (on_cancel_conditions_remaining_.fetch_sub(
72 1, std::memory_order_acq_rel) == 1) {
77 std::atomic_int on_cancel_conditions_remaining_{2};
80 } // namespace internal
82 namespace experimental {
84 // Forward declarations
85 template <class Request, class Response>
86 class ServerReadReactor;
87 template <class Request, class Response>
88 class ServerWriteReactor;
89 template <class Request, class Response>
90 class ServerBidiReactor;
92 // For unary RPCs, the exposed controller class is only an interface
93 // and the actual implementation is an internal class.
94 class ServerCallbackRpcController {
96 virtual ~ServerCallbackRpcController() = default;
98 // The method handler must call this function when it is done so that
99 // the library knows to free its resources
100 virtual void Finish(Status s) = 0;
102 // Allow the method handler to push out the initial metadata before
103 // the response and status are ready
104 virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
106 /// SetCancelCallback passes in a callback to be called when the RPC is
107 /// canceled for whatever reason (streaming calls have OnCancel instead). This
108 /// is an advanced and uncommon use with several important restrictions. This
109 /// function may not be called more than once on the same RPC.
111 /// If code calls SetCancelCallback on an RPC, it must also call
112 /// ClearCancelCallback before calling Finish on the RPC controller. That
113 /// method makes sure that no cancellation callback is executed for this RPC
114 /// beyond the point of its return. ClearCancelCallback may be called even if
115 /// SetCancelCallback was not called for this RPC, and it may be called
116 /// multiple times. It _must_ be called if SetCancelCallback was called for
119 /// The callback should generally be lightweight and nonblocking and primarily
120 /// concerned with clearing application state related to the RPC or causing
121 /// operations (such as cancellations) to happen on dependent RPCs.
123 /// If the RPC is already canceled at the time that SetCancelCallback is
124 /// called, the callback is invoked immediately.
126 /// The cancellation callback may be executed concurrently with the method
127 /// handler that invokes it but will certainly not issue or execute after the
128 /// return of ClearCancelCallback. If ClearCancelCallback is invoked while the
129 /// callback is already executing, the callback will complete its execution
130 /// before ClearCancelCallback takes effect.
132 /// To preserve the orderings described above, the callback may be called
133 /// under a lock that is also used for ClearCancelCallback and
134 /// ServerContext::IsCancelled, so the callback CANNOT call either of those
135 /// operations on this RPC or any other function that causes those operations
136 /// to be called before the callback completes.
137 virtual void SetCancelCallback(std::function<void()> callback) = 0;
138 virtual void ClearCancelCallback() = 0;
140 // NOTE: This is an API for advanced users who need custom allocators.
141 // Optionally deallocate request early to reduce the size of working set.
142 // A custom MessageAllocator needs to be registered to make use of this.
143 virtual void FreeRequest() = 0;
144 // NOTE: This is an API for advanced users who need custom allocators.
145 // Get and maybe mutate the allocator state associated with the current RPC.
146 virtual void* GetAllocatorState() = 0;
149 // NOTE: The actual streaming object classes are provided
150 // as API only to support mocking. There are no implementations of
151 // these class interfaces in the API.
152 template <class Request>
153 class ServerCallbackReader {
155 virtual ~ServerCallbackReader() {}
156 virtual void Finish(Status s) = 0;
157 virtual void SendInitialMetadata() = 0;
158 virtual void Read(Request* msg) = 0;
161 template <class Response>
162 void BindReactor(ServerReadReactor<Request, Response>* reactor) {
163 reactor->BindReader(this);
167 template <class Response>
168 class ServerCallbackWriter {
170 virtual ~ServerCallbackWriter() {}
172 virtual void Finish(Status s) = 0;
173 virtual void SendInitialMetadata() = 0;
174 virtual void Write(const Response* msg, WriteOptions options) = 0;
175 virtual void WriteAndFinish(const Response* msg, WriteOptions options,
177 // Default implementation that can/should be overridden
178 Write(msg, std::move(options));
179 Finish(std::move(s));
183 template <class Request>
184 void BindReactor(ServerWriteReactor<Request, Response>* reactor) {
185 reactor->BindWriter(this);
189 template <class Request, class Response>
190 class ServerCallbackReaderWriter {
192 virtual ~ServerCallbackReaderWriter() {}
194 virtual void Finish(Status s) = 0;
195 virtual void SendInitialMetadata() = 0;
196 virtual void Read(Request* msg) = 0;
197 virtual void Write(const Response* msg, WriteOptions options) = 0;
198 virtual void WriteAndFinish(const Response* msg, WriteOptions options,
200 // Default implementation that can/should be overridden
201 Write(msg, std::move(options));
202 Finish(std::move(s));
206 void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
207 reactor->BindStream(this);
211 // The following classes are the reactor interfaces that are to be implemented
212 // by the user, returned as the result of the method handler for a callback
213 // method, and activated by the call to OnStarted. The library guarantees that
214 // OnStarted will be called for any reactor that has been created using a
215 // method handler registered on a service. No operation initiation method may be
216 // called until after the call to OnStarted.
217 // Note that none of the classes are pure; all reactions have a default empty
218 // reaction so that the user class only needs to override those classes that it
221 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
222 template <class Request, class Response>
223 class ServerBidiReactor : public internal::ServerReactor {
225 ~ServerBidiReactor() = default;
227 /// Do NOT call any operation initiation method (names that start with Start)
228 /// until after the library has called OnStarted on this object.
230 /// Send any initial metadata stored in the RPC context. If not invoked,
231 /// any initial metadata will be passed along with the first Write or the
232 /// Finish (if there are no writes).
233 void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
235 /// Initiate a read operation.
237 /// \param[out] req Where to eventually store the read message. Valid when
238 /// the library calls OnReadDone
239 void StartRead(Request* req) { stream_->Read(req); }
241 /// Initiate a write operation.
243 /// \param[in] resp The message to be written. The library takes temporary
244 /// ownership until OnWriteDone, at which point the
245 /// application regains ownership of resp.
246 void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
248 /// Initiate a write operation with specified options.
250 /// \param[in] resp The message to be written. The library takes temporary
251 /// ownership until OnWriteDone, at which point the
252 /// application regains ownership of resp.
253 /// \param[in] options The WriteOptions to use for writing this message
254 void StartWrite(const Response* resp, WriteOptions options) {
255 stream_->Write(resp, std::move(options));
258 /// Initiate a write operation with specified options and final RPC Status,
259 /// which also causes any trailing metadata for this RPC to be sent out.
260 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
261 /// single step. A key difference, though, is that this operation doesn't have
262 /// an OnWriteDone reaction - it is considered complete only when OnDone is
263 /// available. An RPC can either have StartWriteAndFinish or Finish, but not
266 /// \param[in] resp The message to be written. The library takes temporary
267 /// ownership until Onone, at which point the application
268 /// regains ownership of resp.
269 /// \param[in] options The WriteOptions to use for writing this message
270 /// \param[in] s The status outcome of this RPC
271 void StartWriteAndFinish(const Response* resp, WriteOptions options,
273 stream_->WriteAndFinish(resp, std::move(options), std::move(s));
276 /// Inform system of a planned write operation with specified options, but
277 /// allow the library to schedule the actual write coalesced with the writing
278 /// of trailing metadata (which takes place on a Finish call).
280 /// \param[in] resp The message to be written. The library takes temporary
281 /// ownership until OnWriteDone, at which point the
282 /// application regains ownership of resp.
283 /// \param[in] options The WriteOptions to use for writing this message
284 void StartWriteLast(const Response* resp, WriteOptions options) {
285 StartWrite(resp, std::move(options.set_last_message()));
288 /// Indicate that the stream is to be finished and the trailing metadata and
289 /// RPC status are to be sent. Every RPC MUST be finished using either Finish
290 /// or StartWriteAndFinish (but not both), even if the RPC is already
293 /// \param[in] s The status outcome of this RPC
294 void Finish(Status s) { stream_->Finish(std::move(s)); }
296 /// Notify the application that a streaming RPC has started and that it is now
297 /// ok to call any operation initiation method. An RPC is considered started
298 /// after the server has received all initial metadata from the client, which
299 /// is a result of the client calling StartCall().
301 /// \param[in] context The context object now associated with this RPC
302 virtual void OnStarted(ServerContext* context) {}
304 /// Notifies the application that an explicit StartSendInitialMetadata
305 /// operation completed. Not used when the sending of initial metadata
306 /// piggybacks onto the first write.
308 /// \param[in] ok Was it successful? If false, no further write-side operation
310 virtual void OnSendInitialMetadataDone(bool ok) {}
312 /// Notifies the application that a StartRead operation completed.
314 /// \param[in] ok Was it successful? If false, no further read-side operation
316 virtual void OnReadDone(bool ok) {}
318 /// Notifies the application that a StartWrite (or StartWriteLast) operation
321 /// \param[in] ok Was it successful? If false, no further write-side operation
323 virtual void OnWriteDone(bool ok) {}
325 /// Notifies the application that all operations associated with this RPC
326 /// have completed. This is an override (from the internal base class) but not
327 /// final, so derived classes should override it if they want to take action.
328 void OnDone() override {}
330 /// Notifies the application that this RPC has been cancelled. This is an
331 /// override (from the internal base class) but not final, so derived classes
332 /// should override it if they want to take action.
333 void OnCancel() override {}
336 friend class ServerCallbackReaderWriter<Request, Response>;
337 void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
341 ServerCallbackReaderWriter<Request, Response>* stream_;
344 /// \a ServerReadReactor is the interface for a client-streaming RPC.
345 template <class Request, class Response>
346 class ServerReadReactor : public internal::ServerReactor {
348 ~ServerReadReactor() = default;
350 /// The following operation initiations are exactly like ServerBidiReactor.
351 void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
352 void StartRead(Request* req) { reader_->Read(req); }
353 void Finish(Status s) { reader_->Finish(std::move(s)); }
355 /// Similar to ServerBidiReactor::OnStarted, except that this also provides
356 /// the response object that the stream fills in before calling Finish.
357 /// (It must be filled in if status is OK, but it may be filled in otherwise.)
359 /// \param[in] context The context object now associated with this RPC
360 /// \param[in] resp The response object to be used by this RPC
361 virtual void OnStarted(ServerContext* context, Response* resp) {}
363 /// The following notifications are exactly like ServerBidiReactor.
364 virtual void OnSendInitialMetadataDone(bool ok) {}
365 virtual void OnReadDone(bool ok) {}
366 void OnDone() override {}
367 void OnCancel() override {}
370 friend class ServerCallbackReader<Request>;
371 void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
373 ServerCallbackReader<Request>* reader_;
376 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
377 template <class Request, class Response>
378 class ServerWriteReactor : public internal::ServerReactor {
380 ~ServerWriteReactor() = default;
382 /// The following operation initiations are exactly like ServerBidiReactor.
383 void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
384 void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
385 void StartWrite(const Response* resp, WriteOptions options) {
386 writer_->Write(resp, std::move(options));
388 void StartWriteAndFinish(const Response* resp, WriteOptions options,
390 writer_->WriteAndFinish(resp, std::move(options), std::move(s));
392 void StartWriteLast(const Response* resp, WriteOptions options) {
393 StartWrite(resp, std::move(options.set_last_message()));
395 void Finish(Status s) { writer_->Finish(std::move(s)); }
397 /// Similar to ServerBidiReactor::OnStarted, except that this also provides
398 /// the request object sent by the client.
400 /// \param[in] context The context object now associated with this RPC
401 /// \param[in] req The request object sent by the client
402 virtual void OnStarted(ServerContext* context, const Request* req) {}
404 /// The following notifications are exactly like ServerBidiReactor.
405 virtual void OnSendInitialMetadataDone(bool ok) {}
406 virtual void OnWriteDone(bool ok) {}
407 void OnDone() override {}
408 void OnCancel() override {}
411 friend class ServerCallbackWriter<Response>;
412 void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
414 ServerCallbackWriter<Response>* writer_;
417 } // namespace experimental
421 template <class Request, class Response>
422 class UnimplementedReadReactor
423 : public experimental::ServerReadReactor<Request, Response> {
425 void OnDone() override { delete this; }
426 void OnStarted(ServerContext*, Response*) override {
427 this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
431 template <class Request, class Response>
432 class UnimplementedWriteReactor
433 : public experimental::ServerWriteReactor<Request, Response> {
435 void OnDone() override { delete this; }
436 void OnStarted(ServerContext*, const Request*) override {
437 this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
441 template <class Request, class Response>
442 class UnimplementedBidiReactor
443 : public experimental::ServerBidiReactor<Request, Response> {
445 void OnDone() override { delete this; }
446 void OnStarted(ServerContext*) override {
447 this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
451 template <class RequestType, class ResponseType>
452 class CallbackUnaryHandler : public MethodHandler {
454 CallbackUnaryHandler(
455 std::function<void(ServerContext*, const RequestType*, ResponseType*,
456 experimental::ServerCallbackRpcController*)>
460 void SetMessageAllocator(
461 experimental::MessageAllocator<RequestType, ResponseType>* allocator) {
462 allocator_ = allocator;
465 void RunHandler(const HandlerParameter& param) final {
466 // Arena allocate a controller structure (that includes request/response)
467 g_core_codegen_interface->grpc_call_ref(param.call->call());
468 auto* allocator_info =
469 static_cast<experimental::RpcAllocatorInfo<RequestType, ResponseType>*>(
470 param.internal_data);
471 auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
472 param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
473 ServerCallbackRpcControllerImpl(param.server_context, param.call,
474 allocator_info, allocator_,
475 std::move(param.call_requester));
476 Status status = param.status;
478 // Call the actual function handler and expect the user to call finish
479 CatchingCallback(func_, param.server_context, controller->request(),
480 controller->response(), controller);
482 // if deserialization failed, we need to fail the call
483 controller->Finish(status);
487 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
488 void** handler_data) final {
491 RequestType* request = nullptr;
492 experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info =
493 new (g_core_codegen_interface->grpc_call_arena_alloc(
494 call, sizeof(*allocator_info)))
495 experimental::RpcAllocatorInfo<RequestType, ResponseType>();
496 if (allocator_ != nullptr) {
497 allocator_->AllocateMessages(allocator_info);
499 allocator_info->request =
500 new (g_core_codegen_interface->grpc_call_arena_alloc(
501 call, sizeof(RequestType))) RequestType();
502 allocator_info->response =
503 new (g_core_codegen_interface->grpc_call_arena_alloc(
504 call, sizeof(ResponseType))) ResponseType();
506 *handler_data = allocator_info;
507 request = allocator_info->request;
508 *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
513 // Clean up on deserialization failure.
514 if (allocator_ != nullptr) {
515 allocator_->DeallocateMessages(allocator_info);
517 allocator_info->request->~RequestType();
518 allocator_info->response->~ResponseType();
519 allocator_info->request = nullptr;
520 allocator_info->response = nullptr;
526 std::function<void(ServerContext*, const RequestType*, ResponseType*,
527 experimental::ServerCallbackRpcController*)>
529 experimental::MessageAllocator<RequestType, ResponseType>* allocator_ =
532 // The implementation class of ServerCallbackRpcController is a private member
533 // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
534 // it to take advantage of CallbackUnaryHandler's friendships.
535 class ServerCallbackRpcControllerImpl
536 : public experimental::ServerCallbackRpcController {
538 void Finish(Status s) override {
539 finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
541 if (!ctx_->sent_initial_metadata_) {
542 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
543 ctx_->initial_metadata_flags());
544 if (ctx_->compression_level_set()) {
545 finish_ops_.set_compression_level(ctx_->compression_level());
547 ctx_->sent_initial_metadata_ = true;
549 // The response is dropped if the status is not OK.
551 finish_ops_.ServerSendStatus(
552 &ctx_->trailing_metadata_,
553 finish_ops_.SendMessagePtr(allocator_info_->response));
555 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
557 finish_ops_.set_core_cq_tag(&finish_tag_);
558 call_.PerformOps(&finish_ops_);
561 void SendInitialMetadata(std::function<void(bool)> f) override {
562 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
563 callbacks_outstanding_++;
564 // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
565 // and if performance of this operation matters
566 meta_tag_.Set(call_.call(),
572 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
573 ctx_->initial_metadata_flags());
574 if (ctx_->compression_level_set()) {
575 meta_ops_.set_compression_level(ctx_->compression_level());
577 ctx_->sent_initial_metadata_ = true;
578 meta_ops_.set_core_cq_tag(&meta_tag_);
579 call_.PerformOps(&meta_ops_);
582 // Neither SetCancelCallback nor ClearCancelCallback should affect the
583 // callbacks_outstanding_ count since they are paired and both must precede
584 // the invocation of Finish (if they are used at all)
585 void SetCancelCallback(std::function<void()> callback) override {
586 ctx_->SetCancelCallback(std::move(callback));
589 void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
591 void FreeRequest() override {
592 if (allocator_ != nullptr) {
593 allocator_->DeallocateRequest(allocator_info_);
597 void* GetAllocatorState() override {
598 return allocator_info_->allocator_state;
602 friend class CallbackUnaryHandler<RequestType, ResponseType>;
604 ServerCallbackRpcControllerImpl(
605 ServerContext* ctx, Call* call,
606 experimental::RpcAllocatorInfo<RequestType, ResponseType>*
608 experimental::MessageAllocator<RequestType, ResponseType>* allocator,
609 std::function<void()> call_requester)
612 allocator_info_(allocator_info),
613 allocator_(allocator),
614 call_requester_(std::move(call_requester)) {
615 ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
618 const RequestType* request() { return allocator_info_->request; }
619 ResponseType* response() { return allocator_info_->response; }
622 if (--callbacks_outstanding_ == 0) {
623 grpc_call* call = call_.call();
624 auto call_requester = std::move(call_requester_);
625 if (allocator_ != nullptr) {
626 allocator_->DeallocateMessages(allocator_info_);
628 if (allocator_info_->request != nullptr) {
629 allocator_info_->request->~RequestType();
631 if (allocator_info_->response != nullptr) {
632 allocator_info_->response->~ResponseType();
635 this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
636 g_core_codegen_interface->grpc_call_unref(call);
641 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
642 CallbackWithSuccessTag meta_tag_;
643 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
644 CallOpServerSendStatus>
646 CallbackWithSuccessTag finish_tag_;
650 experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info_;
651 experimental::MessageAllocator<RequestType, ResponseType>* allocator_;
652 std::function<void()> call_requester_;
653 std::atomic_int callbacks_outstanding_{
654 2}; // reserve for Finish and CompletionOp
658 template <class RequestType, class ResponseType>
659 class CallbackClientStreamingHandler : public MethodHandler {
661 CallbackClientStreamingHandler(
663 experimental::ServerReadReactor<RequestType, ResponseType>*()>
665 : func_(std::move(func)) {}
666 void RunHandler(const HandlerParameter& param) final {
667 // Arena allocate a reader structure (that includes response)
668 g_core_codegen_interface->grpc_call_ref(param.call->call());
670 experimental::ServerReadReactor<RequestType, ResponseType>* reactor =
672 ? CatchingReactorCreator<
673 experimental::ServerReadReactor<RequestType, ResponseType>>(
677 if (reactor == nullptr) {
678 // if deserialization or reactor creator failed, we need to fail the call
679 reactor = new UnimplementedReadReactor<RequestType, ResponseType>;
682 auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
683 param.call->call(), sizeof(ServerCallbackReaderImpl)))
684 ServerCallbackReaderImpl(param.server_context, param.call,
685 std::move(param.call_requester), reactor);
687 reader->BindReactor(reactor);
688 reactor->OnStarted(param.server_context, reader->response());
689 // The earliest that OnCancel can be called is after OnStarted is done.
690 reactor->MaybeCallOnCancel();
695 std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
698 class ServerCallbackReaderImpl
699 : public experimental::ServerCallbackReader<RequestType> {
701 void Finish(Status s) override {
702 finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
704 if (!ctx_->sent_initial_metadata_) {
705 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
706 ctx_->initial_metadata_flags());
707 if (ctx_->compression_level_set()) {
708 finish_ops_.set_compression_level(ctx_->compression_level());
710 ctx_->sent_initial_metadata_ = true;
712 // The response is dropped if the status is not OK.
714 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
715 finish_ops_.SendMessagePtr(&resp_));
717 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
719 finish_ops_.set_core_cq_tag(&finish_tag_);
720 call_.PerformOps(&finish_ops_);
723 void SendInitialMetadata() override {
724 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
725 callbacks_outstanding_++;
726 meta_tag_.Set(call_.call(),
728 reactor_->OnSendInitialMetadataDone(ok);
732 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
733 ctx_->initial_metadata_flags());
734 if (ctx_->compression_level_set()) {
735 meta_ops_.set_compression_level(ctx_->compression_level());
737 ctx_->sent_initial_metadata_ = true;
738 meta_ops_.set_core_cq_tag(&meta_tag_);
739 call_.PerformOps(&meta_ops_);
742 void Read(RequestType* req) override {
743 callbacks_outstanding_++;
744 read_ops_.RecvMessage(req);
745 call_.PerformOps(&read_ops_);
749 friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
751 ServerCallbackReaderImpl(
752 ServerContext* ctx, Call* call, std::function<void()> call_requester,
753 experimental::ServerReadReactor<RequestType, ResponseType>* reactor)
756 call_requester_(std::move(call_requester)),
758 ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
759 read_tag_.Set(call_.call(),
761 reactor_->OnReadDone(ok);
765 read_ops_.set_core_cq_tag(&read_tag_);
768 ~ServerCallbackReaderImpl() {}
770 ResponseType* response() { return &resp_; }
773 if (--callbacks_outstanding_ == 0) {
775 grpc_call* call = call_.call();
776 auto call_requester = std::move(call_requester_);
777 this->~ServerCallbackReaderImpl(); // explicitly call destructor
778 g_core_codegen_interface->grpc_call_unref(call);
783 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
784 CallbackWithSuccessTag meta_tag_;
785 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
786 CallOpServerSendStatus>
788 CallbackWithSuccessTag finish_tag_;
789 CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
790 CallbackWithSuccessTag read_tag_;
795 std::function<void()> call_requester_;
796 experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
797 std::atomic_int callbacks_outstanding_{
798 3}; // reserve for OnStarted, Finish, and CompletionOp
802 template <class RequestType, class ResponseType>
803 class CallbackServerStreamingHandler : public MethodHandler {
805 CallbackServerStreamingHandler(
807 experimental::ServerWriteReactor<RequestType, ResponseType>*()>
809 : func_(std::move(func)) {}
810 void RunHandler(const HandlerParameter& param) final {
811 // Arena allocate a writer structure
812 g_core_codegen_interface->grpc_call_ref(param.call->call());
814 experimental::ServerWriteReactor<RequestType, ResponseType>* reactor =
816 ? CatchingReactorCreator<
817 experimental::ServerWriteReactor<RequestType, ResponseType>>(
821 if (reactor == nullptr) {
822 // if deserialization or reactor creator failed, we need to fail the call
823 reactor = new UnimplementedWriteReactor<RequestType, ResponseType>;
826 auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
827 param.call->call(), sizeof(ServerCallbackWriterImpl)))
828 ServerCallbackWriterImpl(param.server_context, param.call,
829 static_cast<RequestType*>(param.request),
830 std::move(param.call_requester), reactor);
831 writer->BindReactor(reactor);
832 reactor->OnStarted(param.server_context, writer->request());
833 // The earliest that OnCancel can be called is after OnStarted is done.
834 reactor->MaybeCallOnCancel();
838 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
839 void** handler_data) final {
842 auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
843 call, sizeof(RequestType))) RequestType();
844 *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
849 request->~RequestType();
854 std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
857 class ServerCallbackWriterImpl
858 : public experimental::ServerCallbackWriter<ResponseType> {
860 void Finish(Status s) override {
861 finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
863 finish_ops_.set_core_cq_tag(&finish_tag_);
865 if (!ctx_->sent_initial_metadata_) {
866 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
867 ctx_->initial_metadata_flags());
868 if (ctx_->compression_level_set()) {
869 finish_ops_.set_compression_level(ctx_->compression_level());
871 ctx_->sent_initial_metadata_ = true;
873 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
874 call_.PerformOps(&finish_ops_);
877 void SendInitialMetadata() override {
878 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
879 callbacks_outstanding_++;
880 meta_tag_.Set(call_.call(),
882 reactor_->OnSendInitialMetadataDone(ok);
886 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
887 ctx_->initial_metadata_flags());
888 if (ctx_->compression_level_set()) {
889 meta_ops_.set_compression_level(ctx_->compression_level());
891 ctx_->sent_initial_metadata_ = true;
892 meta_ops_.set_core_cq_tag(&meta_tag_);
893 call_.PerformOps(&meta_ops_);
896 void Write(const ResponseType* resp, WriteOptions options) override {
897 callbacks_outstanding_++;
898 if (options.is_last_message()) {
899 options.set_buffer_hint();
901 if (!ctx_->sent_initial_metadata_) {
902 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
903 ctx_->initial_metadata_flags());
904 if (ctx_->compression_level_set()) {
905 write_ops_.set_compression_level(ctx_->compression_level());
907 ctx_->sent_initial_metadata_ = true;
909 // TODO(vjpai): don't assert
910 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
911 call_.PerformOps(&write_ops_);
914 void WriteAndFinish(const ResponseType* resp, WriteOptions options,
916 // This combines the write into the finish callback
917 // Don't send any message if the status is bad
919 // TODO(vjpai): don't assert
920 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
922 Finish(std::move(s));
926 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
928 ServerCallbackWriterImpl(
929 ServerContext* ctx, Call* call, const RequestType* req,
930 std::function<void()> call_requester,
931 experimental::ServerWriteReactor<RequestType, ResponseType>* reactor)
935 call_requester_(std::move(call_requester)),
937 ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
938 write_tag_.Set(call_.call(),
940 reactor_->OnWriteDone(ok);
944 write_ops_.set_core_cq_tag(&write_tag_);
946 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
948 const RequestType* request() { return req_; }
951 if (--callbacks_outstanding_ == 0) {
953 grpc_call* call = call_.call();
954 auto call_requester = std::move(call_requester_);
955 this->~ServerCallbackWriterImpl(); // explicitly call destructor
956 g_core_codegen_interface->grpc_call_unref(call);
961 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
962 CallbackWithSuccessTag meta_tag_;
963 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
964 CallOpServerSendStatus>
966 CallbackWithSuccessTag finish_tag_;
967 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
968 CallbackWithSuccessTag write_tag_;
972 const RequestType* req_;
973 std::function<void()> call_requester_;
974 experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
975 std::atomic_int callbacks_outstanding_{
976 3}; // reserve for OnStarted, Finish, and CompletionOp
980 template <class RequestType, class ResponseType>
981 class CallbackBidiHandler : public MethodHandler {
985 experimental::ServerBidiReactor<RequestType, ResponseType>*()>
987 : func_(std::move(func)) {}
988 void RunHandler(const HandlerParameter& param) final {
989 g_core_codegen_interface->grpc_call_ref(param.call->call());
991 experimental::ServerBidiReactor<RequestType, ResponseType>* reactor =
993 ? CatchingReactorCreator<
994 experimental::ServerBidiReactor<RequestType, ResponseType>>(
998 if (reactor == nullptr) {
999 // if deserialization or reactor creator failed, we need to fail the call
1000 reactor = new UnimplementedBidiReactor<RequestType, ResponseType>;
1003 auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
1004 param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
1005 ServerCallbackReaderWriterImpl(param.server_context, param.call,
1006 std::move(param.call_requester),
1009 stream->BindReactor(reactor);
1010 reactor->OnStarted(param.server_context);
1011 // The earliest that OnCancel can be called is after OnStarted is done.
1012 reactor->MaybeCallOnCancel();
1013 stream->MaybeDone();
1017 std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1020 class ServerCallbackReaderWriterImpl
1021 : public experimental::ServerCallbackReaderWriter<RequestType,
1024 void Finish(Status s) override {
1025 finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1027 finish_ops_.set_core_cq_tag(&finish_tag_);
1029 if (!ctx_->sent_initial_metadata_) {
1030 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1031 ctx_->initial_metadata_flags());
1032 if (ctx_->compression_level_set()) {
1033 finish_ops_.set_compression_level(ctx_->compression_level());
1035 ctx_->sent_initial_metadata_ = true;
1037 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1038 call_.PerformOps(&finish_ops_);
1041 void SendInitialMetadata() override {
1042 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1043 callbacks_outstanding_++;
1044 meta_tag_.Set(call_.call(),
1046 reactor_->OnSendInitialMetadataDone(ok);
1050 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1051 ctx_->initial_metadata_flags());
1052 if (ctx_->compression_level_set()) {
1053 meta_ops_.set_compression_level(ctx_->compression_level());
1055 ctx_->sent_initial_metadata_ = true;
1056 meta_ops_.set_core_cq_tag(&meta_tag_);
1057 call_.PerformOps(&meta_ops_);
1060 void Write(const ResponseType* resp, WriteOptions options) override {
1061 callbacks_outstanding_++;
1062 if (options.is_last_message()) {
1063 options.set_buffer_hint();
1065 if (!ctx_->sent_initial_metadata_) {
1066 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1067 ctx_->initial_metadata_flags());
1068 if (ctx_->compression_level_set()) {
1069 write_ops_.set_compression_level(ctx_->compression_level());
1071 ctx_->sent_initial_metadata_ = true;
1073 // TODO(vjpai): don't assert
1074 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1075 call_.PerformOps(&write_ops_);
1078 void WriteAndFinish(const ResponseType* resp, WriteOptions options,
1079 Status s) override {
1080 // Don't send any message if the status is bad
1082 // TODO(vjpai): don't assert
1083 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1085 Finish(std::move(s));
1088 void Read(RequestType* req) override {
1089 callbacks_outstanding_++;
1090 read_ops_.RecvMessage(req);
1091 call_.PerformOps(&read_ops_);
1095 friend class CallbackBidiHandler<RequestType, ResponseType>;
1097 ServerCallbackReaderWriterImpl(
1098 ServerContext* ctx, Call* call, std::function<void()> call_requester,
1099 experimental::ServerBidiReactor<RequestType, ResponseType>* reactor)
1102 call_requester_(std::move(call_requester)),
1104 ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1105 write_tag_.Set(call_.call(),
1107 reactor_->OnWriteDone(ok);
1111 write_ops_.set_core_cq_tag(&write_tag_);
1112 read_tag_.Set(call_.call(),
1114 reactor_->OnReadDone(ok);
1118 read_ops_.set_core_cq_tag(&read_tag_);
1120 ~ServerCallbackReaderWriterImpl() {}
1123 if (--callbacks_outstanding_ == 0) {
1125 grpc_call* call = call_.call();
1126 auto call_requester = std::move(call_requester_);
1127 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
1128 g_core_codegen_interface->grpc_call_unref(call);
1133 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
1134 CallbackWithSuccessTag meta_tag_;
1135 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
1136 CallOpServerSendStatus>
1138 CallbackWithSuccessTag finish_tag_;
1139 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
1140 CallbackWithSuccessTag write_tag_;
1141 CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
1142 CallbackWithSuccessTag read_tag_;
1144 ServerContext* ctx_;
1146 std::function<void()> call_requester_;
1147 experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
1148 std::atomic_int callbacks_outstanding_{
1149 3}; // reserve for OnStarted, Finish, and CompletionOp
1153 } // namespace internal
1157 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H