3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
33 explicit CallbackUnaryHandler(
34 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35 const RequestType*, ResponseType*)>
37 : get_reactor_(std::move(get_reactor)) {}
39 void SetMessageAllocator(
40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
42 allocator_ = allocator;
45 void RunHandler(const HandlerParameter& param) final {
46 // Arena allocate a controller structure (that includes request/response)
47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48 auto* allocator_state = static_cast<
49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53 param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
55 static_cast<::grpc::CallbackServerContext*>(param.server_context),
56 param.call, allocator_state, param.call_requester);
57 param.server_context->BeginCompletionOp(
58 param.call, [call](bool) { call->MaybeDone(); }, call);
60 ServerUnaryReactor* reactor = nullptr;
61 if (param.status.ok()) {
62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64 static_cast<::grpc::CallbackServerContext*>(param.server_context),
65 call->request(), call->response());
68 if (reactor == nullptr) {
69 // if deserialization or reactor creator failed, we need to fail the call
70 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
71 param.call->call(), sizeof(UnimplementedUnaryReactor)))
72 UnimplementedUnaryReactor(
73 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
76 /// Invoke SetupReactor as the last part of the handler
77 call->SetupReactor(reactor);
80 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
81 ::grpc::Status* status, void** handler_data) final {
82 ::grpc::ByteBuffer buf;
84 RequestType* request = nullptr;
85 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
86 allocator_state = nullptr;
87 if (allocator_ != nullptr) {
88 allocator_state = allocator_->AllocateMessages();
91 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
92 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93 DefaultMessageHolder<RequestType, ResponseType>();
95 *handler_data = allocator_state;
96 request = allocator_state->request();
98 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
103 // Clean up on deserialization failure.
104 allocator_state->Release();
109 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
110 const RequestType*, ResponseType*)>
112 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
113 allocator_ = nullptr;
115 class ServerCallbackUnaryImpl : public ServerCallbackUnary {
117 void Finish(::grpc::Status s) override {
118 // A callback that only contains a call to MaybeDone can be run as an
119 // inline callback regardless of whether or not OnDone is inlineable
120 // because if the actual OnDone callback needs to be scheduled, MaybeDone
121 // is responsible for dispatching to an executor thread if needed. Thus,
122 // when setting up the finish_tag_, we can set its own callback to
128 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
130 &finish_ops_, /*can_inline=*/true);
131 finish_ops_.set_core_cq_tag(&finish_tag_);
133 if (!ctx_->sent_initial_metadata_) {
134 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135 ctx_->initial_metadata_flags());
136 if (ctx_->compression_level_set()) {
137 finish_ops_.set_compression_level(ctx_->compression_level());
139 ctx_->sent_initial_metadata_ = true;
141 // The response is dropped if the status is not OK.
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 call_.PerformOps(&finish_ops_);
152 void SendInitialMetadata() override {
153 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
155 // The callback for this function should not be marked inline because it
156 // is directly invoking a user-controlled reaction
157 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158 // thread. However, any OnDone needed after that can be inlined because it
159 // is already running on an executor thread.
163 ServerUnaryReactor* reactor =
164 reactor_.load(std::memory_order_relaxed);
165 reactor->OnSendInitialMetadataDone(ok);
166 this->MaybeDone(/*inlineable_ondone=*/true);
168 &meta_ops_, /*can_inline=*/false);
169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170 ctx_->initial_metadata_flags());
171 if (ctx_->compression_level_set()) {
172 meta_ops_.set_compression_level(ctx_->compression_level());
174 ctx_->sent_initial_metadata_ = true;
175 meta_ops_.set_core_cq_tag(&meta_tag_);
176 call_.PerformOps(&meta_ops_);
180 friend class CallbackUnaryHandler<RequestType, ResponseType>;
182 ServerCallbackUnaryImpl(
183 ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
184 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
186 std::function<void()> call_requester)
189 allocator_state_(allocator_state),
190 call_requester_(std::move(call_requester)) {
191 ctx_->set_message_allocator_state(allocator_state);
194 /// SetupReactor binds the reactor (which also releases any queued
195 /// operations), maybe calls OnCancel if possible/needed, and maybe marks
196 /// the completion of the RPC. This should be the last component of the
198 void SetupReactor(ServerUnaryReactor* reactor) {
199 reactor_.store(reactor, std::memory_order_relaxed);
200 this->BindReactor(reactor);
201 this->MaybeCallOnCancel(reactor);
202 this->MaybeDone(reactor->InternalInlineable());
205 const RequestType* request() { return allocator_state_->request(); }
206 ResponseType* response() { return allocator_state_->response(); }
208 void CallOnDone() override {
209 reactor_.load(std::memory_order_relaxed)->OnDone();
210 grpc_call* call = call_.call();
211 auto call_requester = std::move(call_requester_);
212 allocator_state_->Release();
213 if (ctx_->context_allocator() != nullptr) {
214 ctx_->context_allocator()->Release(ctx_);
216 this->~ServerCallbackUnaryImpl(); // explicitly call destructor
217 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
221 ServerReactor* reactor() override {
222 return reactor_.load(std::memory_order_relaxed);
225 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
227 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
228 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
229 ::grpc::internal::CallOpSendMessage,
230 ::grpc::internal::CallOpServerSendStatus>
232 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
234 ::grpc::CallbackServerContext* const ctx_;
235 ::grpc::internal::Call call_;
236 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
238 std::function<void()> call_requester_;
239 // reactor_ can always be loaded/stored with relaxed memory ordering because
240 // its value is only set once, independently of other data in the object,
241 // and the loads that use it will always actually come provably later even
242 // though they are from different threads since they are triggered by
243 // actions initiated only by the setting up of the reactor_ variable. In
244 // a sense, it's a delayed "const": it gets its value from the SetupReactor
245 // method (not the constructor, so it's not a true const), but it doesn't
246 // change after that and it only gets used by actions caused, directly or
247 // indirectly, by that setup. This comment also applies to the reactor_
248 // variables of the other streaming objects in this file.
249 std::atomic<ServerUnaryReactor*> reactor_;
250 // callbacks_outstanding_ follows a refcount pattern
251 std::atomic<intptr_t> callbacks_outstanding_{
252 3}; // reserve for start, Finish, and CompletionOp
256 template <class RequestType, class ResponseType>
257 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
259 explicit CallbackClientStreamingHandler(
260 std::function<ServerReadReactor<RequestType>*(
261 ::grpc::CallbackServerContext*, ResponseType*)>
263 : get_reactor_(std::move(get_reactor)) {}
264 void RunHandler(const HandlerParameter& param) final {
265 // Arena allocate a reader structure (that includes response)
266 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
268 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
269 param.call->call(), sizeof(ServerCallbackReaderImpl)))
270 ServerCallbackReaderImpl(
271 static_cast<::grpc::CallbackServerContext*>(param.server_context),
272 param.call, param.call_requester);
273 // Inlineable OnDone can be false in the CompletionOp callback because there
274 // is no read reactor that has an inlineable OnDone; this only applies to
275 // the DefaultReactor (which is unary).
276 param.server_context->BeginCompletionOp(
278 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
281 ServerReadReactor<RequestType>* reactor = nullptr;
282 if (param.status.ok()) {
283 reactor = ::grpc::internal::CatchingReactorGetter<
284 ServerReadReactor<RequestType>>(
286 static_cast<::grpc::CallbackServerContext*>(param.server_context),
290 if (reactor == nullptr) {
291 // if deserialization or reactor creator failed, we need to fail the call
292 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
293 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294 UnimplementedReadReactor<RequestType>(
295 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
298 reader->SetupReactor(reactor);
302 std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
306 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
308 void Finish(::grpc::Status s) override {
309 // A finish tag with only MaybeDone can have its callback inlined
310 // regardless even if OnDone is not inlineable because this callback just
311 // checks a ref and then decides whether or not to dispatch OnDone.
315 // Inlineable OnDone can be false here because there is
316 // no read reactor that has an inlineable OnDone; this
317 // only applies to the DefaultReactor (which is unary).
318 this->MaybeDone(/*inlineable_ondone=*/false);
320 &finish_ops_, /*can_inline=*/true);
321 if (!ctx_->sent_initial_metadata_) {
322 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
323 ctx_->initial_metadata_flags());
324 if (ctx_->compression_level_set()) {
325 finish_ops_.set_compression_level(ctx_->compression_level());
327 ctx_->sent_initial_metadata_ = true;
329 // The response is dropped if the status is not OK.
331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
332 finish_ops_.SendMessagePtr(&resp_));
334 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
336 finish_ops_.set_core_cq_tag(&finish_tag_);
337 call_.PerformOps(&finish_ops_);
340 void SendInitialMetadata() override {
341 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
343 // The callback for this function should not be inlined because it invokes
344 // a user-controlled reaction, but any resulting OnDone can be inlined in
345 // the executor to which this callback is dispatched.
349 ServerReadReactor<RequestType>* reactor =
350 reactor_.load(std::memory_order_relaxed);
351 reactor->OnSendInitialMetadataDone(ok);
352 this->MaybeDone(/*inlineable_ondone=*/true);
354 &meta_ops_, /*can_inline=*/false);
355 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
356 ctx_->initial_metadata_flags());
357 if (ctx_->compression_level_set()) {
358 meta_ops_.set_compression_level(ctx_->compression_level());
360 ctx_->sent_initial_metadata_ = true;
361 meta_ops_.set_core_cq_tag(&meta_tag_);
362 call_.PerformOps(&meta_ops_);
365 void Read(RequestType* req) override {
367 read_ops_.RecvMessage(req);
368 call_.PerformOps(&read_ops_);
372 friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
374 ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
375 ::grpc::internal::Call* call,
376 std::function<void()> call_requester)
377 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
379 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380 reactor_.store(reactor, std::memory_order_relaxed);
381 // The callback for this function should not be inlined because it invokes
382 // a user-controlled reaction, but any resulting OnDone can be inlined in
383 // the executor to which this callback is dispatched.
386 [this, reactor](bool ok) {
387 reactor->OnReadDone(ok);
388 this->MaybeDone(/*inlineable_ondone=*/true);
390 &read_ops_, /*can_inline=*/false);
391 read_ops_.set_core_cq_tag(&read_tag_);
392 this->BindReactor(reactor);
393 this->MaybeCallOnCancel(reactor);
394 // Inlineable OnDone can be false here because there is no read
395 // reactor that has an inlineable OnDone; this only applies to the
396 // DefaultReactor (which is unary).
397 this->MaybeDone(/*inlineable_ondone=*/false);
400 ~ServerCallbackReaderImpl() {}
402 ResponseType* response() { return &resp_; }
404 void CallOnDone() override {
405 reactor_.load(std::memory_order_relaxed)->OnDone();
406 grpc_call* call = call_.call();
407 auto call_requester = std::move(call_requester_);
408 if (ctx_->context_allocator() != nullptr) {
409 ctx_->context_allocator()->Release(ctx_);
411 this->~ServerCallbackReaderImpl(); // explicitly call destructor
412 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
416 ServerReactor* reactor() override {
417 return reactor_.load(std::memory_order_relaxed);
420 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
422 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
423 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
424 ::grpc::internal::CallOpSendMessage,
425 ::grpc::internal::CallOpServerSendStatus>
427 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
428 ::grpc::internal::CallOpSet<
429 ::grpc::internal::CallOpRecvMessage<RequestType>>
431 ::grpc::internal::CallbackWithSuccessTag read_tag_;
433 ::grpc::CallbackServerContext* const ctx_;
434 ::grpc::internal::Call call_;
436 std::function<void()> call_requester_;
437 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
438 std::atomic<ServerReadReactor<RequestType>*> reactor_;
439 // callbacks_outstanding_ follows a refcount pattern
440 std::atomic<intptr_t> callbacks_outstanding_{
441 3}; // reserve for OnStarted, Finish, and CompletionOp
445 template <class RequestType, class ResponseType>
446 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
448 explicit CallbackServerStreamingHandler(
449 std::function<ServerWriteReactor<ResponseType>*(
450 ::grpc::CallbackServerContext*, const RequestType*)>
452 : get_reactor_(std::move(get_reactor)) {}
453 void RunHandler(const HandlerParameter& param) final {
454 // Arena allocate a writer structure
455 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
457 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
458 param.call->call(), sizeof(ServerCallbackWriterImpl)))
459 ServerCallbackWriterImpl(
460 static_cast<::grpc::CallbackServerContext*>(param.server_context),
461 param.call, static_cast<RequestType*>(param.request),
462 param.call_requester);
463 // Inlineable OnDone can be false in the CompletionOp callback because there
464 // is no write reactor that has an inlineable OnDone; this only applies to
465 // the DefaultReactor (which is unary).
466 param.server_context->BeginCompletionOp(
468 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
471 ServerWriteReactor<ResponseType>* reactor = nullptr;
472 if (param.status.ok()) {
473 reactor = ::grpc::internal::CatchingReactorGetter<
474 ServerWriteReactor<ResponseType>>(
476 static_cast<::grpc::CallbackServerContext*>(param.server_context),
479 if (reactor == nullptr) {
480 // if deserialization or reactor creator failed, we need to fail the call
481 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
482 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
483 UnimplementedWriteReactor<ResponseType>(
484 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
487 writer->SetupReactor(reactor);
490 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
491 ::grpc::Status* status, void** /*handler_data*/) final {
492 ::grpc::ByteBuffer buf;
495 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
496 call, sizeof(RequestType))) RequestType();
498 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
503 request->~RequestType();
508 std::function<ServerWriteReactor<ResponseType>*(
509 ::grpc::CallbackServerContext*, const RequestType*)>
512 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
514 void Finish(::grpc::Status s) override {
515 // A finish tag with only MaybeDone can have its callback inlined
516 // regardless even if OnDone is not inlineable because this callback just
517 // checks a ref and then decides whether or not to dispatch OnDone.
521 // Inlineable OnDone can be false here because there is
522 // no write reactor that has an inlineable OnDone; this
523 // only applies to the DefaultReactor (which is unary).
524 this->MaybeDone(/*inlineable_ondone=*/false);
526 &finish_ops_, /*can_inline=*/true);
527 finish_ops_.set_core_cq_tag(&finish_tag_);
529 if (!ctx_->sent_initial_metadata_) {
530 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
531 ctx_->initial_metadata_flags());
532 if (ctx_->compression_level_set()) {
533 finish_ops_.set_compression_level(ctx_->compression_level());
535 ctx_->sent_initial_metadata_ = true;
537 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
538 call_.PerformOps(&finish_ops_);
541 void SendInitialMetadata() override {
542 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
544 // The callback for this function should not be inlined because it invokes
545 // a user-controlled reaction, but any resulting OnDone can be inlined in
546 // the executor to which this callback is dispatched.
550 ServerWriteReactor<ResponseType>* reactor =
551 reactor_.load(std::memory_order_relaxed);
552 reactor->OnSendInitialMetadataDone(ok);
553 this->MaybeDone(/*inlineable_ondone=*/true);
555 &meta_ops_, /*can_inline=*/false);
556 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
557 ctx_->initial_metadata_flags());
558 if (ctx_->compression_level_set()) {
559 meta_ops_.set_compression_level(ctx_->compression_level());
561 ctx_->sent_initial_metadata_ = true;
562 meta_ops_.set_core_cq_tag(&meta_tag_);
563 call_.PerformOps(&meta_ops_);
566 void Write(const ResponseType* resp,
567 ::grpc::WriteOptions options) override {
569 if (options.is_last_message()) {
570 options.set_buffer_hint();
572 if (!ctx_->sent_initial_metadata_) {
573 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574 ctx_->initial_metadata_flags());
575 if (ctx_->compression_level_set()) {
576 write_ops_.set_compression_level(ctx_->compression_level());
578 ctx_->sent_initial_metadata_ = true;
580 // TODO(vjpai): don't assert
581 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
582 call_.PerformOps(&write_ops_);
585 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
586 ::grpc::Status s) override {
587 // This combines the write into the finish callback
588 // TODO(vjpai): don't assert
589 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
590 Finish(std::move(s));
594 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
596 ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
597 ::grpc::internal::Call* call,
598 const RequestType* req,
599 std::function<void()> call_requester)
603 call_requester_(std::move(call_requester)) {}
605 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
606 reactor_.store(reactor, std::memory_order_relaxed);
607 // The callback for this function should not be inlined because it invokes
608 // a user-controlled reaction, but any resulting OnDone can be inlined in
609 // the executor to which this callback is dispatched.
612 [this, reactor](bool ok) {
613 reactor->OnWriteDone(ok);
614 this->MaybeDone(/*inlineable_ondone=*/true);
616 &write_ops_, /*can_inline=*/false);
617 write_ops_.set_core_cq_tag(&write_tag_);
618 this->BindReactor(reactor);
619 this->MaybeCallOnCancel(reactor);
620 // Inlineable OnDone can be false here because there is no write
621 // reactor that has an inlineable OnDone; this only applies to the
622 // DefaultReactor (which is unary).
623 this->MaybeDone(/*inlineable_ondone=*/false);
625 ~ServerCallbackWriterImpl() {
626 if (req_ != nullptr) {
627 req_->~RequestType();
631 const RequestType* request() { return req_; }
633 void CallOnDone() override {
634 reactor_.load(std::memory_order_relaxed)->OnDone();
635 grpc_call* call = call_.call();
636 auto call_requester = std::move(call_requester_);
637 if (ctx_->context_allocator() != nullptr) {
638 ctx_->context_allocator()->Release(ctx_);
640 this->~ServerCallbackWriterImpl(); // explicitly call destructor
641 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
645 ServerReactor* reactor() override {
646 return reactor_.load(std::memory_order_relaxed);
649 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
651 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
652 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
653 ::grpc::internal::CallOpSendMessage,
654 ::grpc::internal::CallOpServerSendStatus>
656 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
657 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
658 ::grpc::internal::CallOpSendMessage>
660 ::grpc::internal::CallbackWithSuccessTag write_tag_;
662 ::grpc::CallbackServerContext* const ctx_;
663 ::grpc::internal::Call call_;
664 const RequestType* req_;
665 std::function<void()> call_requester_;
666 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
667 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
668 // callbacks_outstanding_ follows a refcount pattern
669 std::atomic<intptr_t> callbacks_outstanding_{
670 3}; // reserve for OnStarted, Finish, and CompletionOp
674 template <class RequestType, class ResponseType>
675 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
677 explicit CallbackBidiHandler(
678 std::function<ServerBidiReactor<RequestType, ResponseType>*(
679 ::grpc::CallbackServerContext*)>
681 : get_reactor_(std::move(get_reactor)) {}
682 void RunHandler(const HandlerParameter& param) final {
683 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
685 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
686 param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
687 ServerCallbackReaderWriterImpl(
688 static_cast<::grpc::CallbackServerContext*>(param.server_context),
689 param.call, param.call_requester);
690 // Inlineable OnDone can be false in the CompletionOp callback because there
691 // is no bidi reactor that has an inlineable OnDone; this only applies to
692 // the DefaultReactor (which is unary).
693 param.server_context->BeginCompletionOp(
695 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
698 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
699 if (param.status.ok()) {
700 reactor = ::grpc::internal::CatchingReactorGetter<
701 ServerBidiReactor<RequestType, ResponseType>>(
703 static_cast<::grpc::CallbackServerContext*>(param.server_context));
706 if (reactor == nullptr) {
707 // if deserialization or reactor creator failed, we need to fail the call
708 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
710 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
711 UnimplementedBidiReactor<RequestType, ResponseType>(
712 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
715 stream->SetupReactor(reactor);
719 std::function<ServerBidiReactor<RequestType, ResponseType>*(
720 ::grpc::CallbackServerContext*)>
723 class ServerCallbackReaderWriterImpl
724 : public ServerCallbackReaderWriter<RequestType, ResponseType> {
726 void Finish(::grpc::Status s) override {
727 // A finish tag with only MaybeDone can have its callback inlined
728 // regardless even if OnDone is not inlineable because this callback just
729 // checks a ref and then decides whether or not to dispatch OnDone.
733 // Inlineable OnDone can be false here because there is
734 // no bidi reactor that has an inlineable OnDone; this
735 // only applies to the DefaultReactor (which is unary).
736 this->MaybeDone(/*inlineable_ondone=*/false);
738 &finish_ops_, /*can_inline=*/true);
739 finish_ops_.set_core_cq_tag(&finish_tag_);
741 if (!ctx_->sent_initial_metadata_) {
742 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743 ctx_->initial_metadata_flags());
744 if (ctx_->compression_level_set()) {
745 finish_ops_.set_compression_level(ctx_->compression_level());
747 ctx_->sent_initial_metadata_ = true;
749 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
750 call_.PerformOps(&finish_ops_);
753 void SendInitialMetadata() override {
754 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
756 // The callback for this function should not be inlined because it invokes
757 // a user-controlled reaction, but any resulting OnDone can be inlined in
758 // the executor to which this callback is dispatched.
762 ServerBidiReactor<RequestType, ResponseType>* reactor =
763 reactor_.load(std::memory_order_relaxed);
764 reactor->OnSendInitialMetadataDone(ok);
765 this->MaybeDone(/*inlineable_ondone=*/true);
767 &meta_ops_, /*can_inline=*/false);
768 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
769 ctx_->initial_metadata_flags());
770 if (ctx_->compression_level_set()) {
771 meta_ops_.set_compression_level(ctx_->compression_level());
773 ctx_->sent_initial_metadata_ = true;
774 meta_ops_.set_core_cq_tag(&meta_tag_);
775 call_.PerformOps(&meta_ops_);
778 void Write(const ResponseType* resp,
779 ::grpc::WriteOptions options) override {
781 if (options.is_last_message()) {
782 options.set_buffer_hint();
784 if (!ctx_->sent_initial_metadata_) {
785 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786 ctx_->initial_metadata_flags());
787 if (ctx_->compression_level_set()) {
788 write_ops_.set_compression_level(ctx_->compression_level());
790 ctx_->sent_initial_metadata_ = true;
792 // TODO(vjpai): don't assert
793 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
794 call_.PerformOps(&write_ops_);
797 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
798 ::grpc::Status s) override {
799 // TODO(vjpai): don't assert
800 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
801 Finish(std::move(s));
804 void Read(RequestType* req) override {
806 read_ops_.RecvMessage(req);
807 call_.PerformOps(&read_ops_);
811 friend class CallbackBidiHandler<RequestType, ResponseType>;
813 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
814 ::grpc::internal::Call* call,
815 std::function<void()> call_requester)
816 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
818 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
819 reactor_.store(reactor, std::memory_order_relaxed);
820 // The callbacks for these functions should not be inlined because they
821 // invoke user-controlled reactions, but any resulting OnDones can be
822 // inlined in the executor to which a callback is dispatched.
825 [this, reactor](bool ok) {
826 reactor->OnWriteDone(ok);
827 this->MaybeDone(/*inlineable_ondone=*/true);
829 &write_ops_, /*can_inline=*/false);
830 write_ops_.set_core_cq_tag(&write_tag_);
833 [this, reactor](bool ok) {
834 reactor->OnReadDone(ok);
835 this->MaybeDone(/*inlineable_ondone=*/true);
837 &read_ops_, /*can_inline=*/false);
838 read_ops_.set_core_cq_tag(&read_tag_);
839 this->BindReactor(reactor);
840 this->MaybeCallOnCancel(reactor);
841 // Inlineable OnDone can be false here because there is no bidi
842 // reactor that has an inlineable OnDone; this only applies to the
843 // DefaultReactor (which is unary).
844 this->MaybeDone(/*inlineable_ondone=*/false);
847 void CallOnDone() override {
848 reactor_.load(std::memory_order_relaxed)->OnDone();
849 grpc_call* call = call_.call();
850 auto call_requester = std::move(call_requester_);
851 if (ctx_->context_allocator() != nullptr) {
852 ctx_->context_allocator()->Release(ctx_);
854 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
855 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
859 ServerReactor* reactor() override {
860 return reactor_.load(std::memory_order_relaxed);
863 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
865 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
866 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
867 ::grpc::internal::CallOpSendMessage,
868 ::grpc::internal::CallOpServerSendStatus>
870 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
871 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
872 ::grpc::internal::CallOpSendMessage>
874 ::grpc::internal::CallbackWithSuccessTag write_tag_;
875 ::grpc::internal::CallOpSet<
876 ::grpc::internal::CallOpRecvMessage<RequestType>>
878 ::grpc::internal::CallbackWithSuccessTag read_tag_;
880 ::grpc::CallbackServerContext* const ctx_;
881 ::grpc::internal::Call call_;
882 std::function<void()> call_requester_;
883 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
884 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
885 // callbacks_outstanding_ follows a refcount pattern
886 std::atomic<intptr_t> callbacks_outstanding_{
887 3}; // reserve for OnStarted, Finish, and CompletionOp
891 } // namespace internal
894 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H