3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
// Method handler for unary calls that are served through a
// ServerUnaryReactor. For each call, RunHandler placement-constructs a
// ServerCallbackUnaryImpl in the call arena, obtains a reactor and binds it;
// Deserialize sets up the MessageHolder that carries the request/response.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few continuation lines), so some statements below appear
// truncated. The comments describe only what is visible.
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
// Stores the reactor factory by moving it into get_reactor_.
33 explicit CallbackUnaryHandler(
34 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35 const RequestType*, ResponseType*)>
37 : get_reactor_(std::move(get_reactor)) {}
// Installs the allocator that Deserialize consults for message storage.
39 void SetMessageAllocator(
40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
42 allocator_ = allocator;
// Runs one call: takes a ref on the call, builds the per-call controller,
// starts the completion op, picks a reactor and finally binds it with
// SetupReactor.
45 void RunHandler(const HandlerParameter& param) final {
46 // Arena allocate a controller structure (that includes request/response)
47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
// Recover the MessageHolder for this call. The cast operand is on a line
// that is not visible here; presumably it is the handler data published by
// Deserialize -- confirm.
48 auto* allocator_state = static_cast<
49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
// Placement-new the controller into memory obtained from the call arena.
52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53 param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
55 static_cast<::grpc::CallbackServerContext*>(param.server_context),
56 param.call, allocator_state, std::move(param.call_requester));
// Start the completion op; its callback only calls MaybeDone on the
// controller.
57 param.server_context->BeginCompletionOp(
58 param.call, [call](bool) { call->MaybeDone(); }, call);
// Only ask for a user reactor when the incoming status is OK.
60 ServerUnaryReactor* reactor = nullptr;
61 if (param.status.ok()) {
62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64 static_cast<::grpc::CallbackServerContext*>(param.server_context),
65 call->request(), call->response());
68 if (reactor == nullptr) {
69 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated and carries an UNIMPLEMENTED
// status with an empty message.
70 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
71 param.call->call(), sizeof(UnimplementedUnaryReactor)))
72 UnimplementedUnaryReactor(
73 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
76 /// Invoke SetupReactor as the last part of the handler
77 call->SetupReactor(reactor);
// Deserializes the request into storage owned by a MessageHolder. When
// allocator_ is set, the holder comes from allocator_->AllocateMessages(); a
// DefaultMessageHolder is also placement-constructed in the call arena below
// (presumably in the else branch, whose line is not visible here). The holder
// is published through *handler_data and is released if deserialization
// fails. The return statements are not visible in this listing.
80 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
81 ::grpc::Status* status, void** handler_data) final {
82 ::grpc::ByteBuffer buf;
84 RequestType* request = nullptr;
85 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
86 allocator_state = nullptr;
87 if (allocator_ != nullptr) {
88 allocator_state = allocator_->AllocateMessages();
91 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
92 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93 DefaultMessageHolder<RequestType, ResponseType>();
// Hand the holder to the caller and deserialize straight into its request.
95 *handler_data = allocator_state;
96 request = allocator_state->request();
98 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
103 // Clean up on deserialization failure.
104 allocator_state->Release();
// Reactor factory type; the declarator line is not visible here (presumably
// get_reactor_, which the constructor initializes).
109 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
110 const RequestType*, ResponseType*)>
// Optional message allocator; stays null until SetMessageAllocator is called.
112 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
113 allocator_ = nullptr;
// Per-call controller for a unary RPC. It owns the op sets and completion
// tags used to send initial metadata and the final status/response, and it
// forwards completions to the bound ServerUnaryReactor.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few statements), so some constructs below appear truncated.
115 class ServerCallbackUnaryImpl : public ServerCallbackUnary {
// Sends the final status. Initial metadata is added first if it has not been
// sent yet; the response message is attached only for an OK status.
117 void Finish(::grpc::Status s) override {
118 // A callback that only contains a call to MaybeDone can be run as an
119 // inline callback regardless of whether or not OnDone is inlineable
120 // because if the actual OnDone callback needs to be scheduled, MaybeDone
121 // is responsible for dispatching to an executor thread if needed. Thus,
122 // when setting up the finish_tag_, we can set its own callback to
// inlineable (note can_inline=true below).
128 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
130 &finish_ops_, /*can_inline=*/true);
131 finish_ops_.set_core_cq_tag(&finish_tag_);
133 if (!ctx_->sent_initial_metadata_) {
134 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135 ctx_->initial_metadata_flags());
136 if (ctx_->compression_level_set()) {
137 finish_ops_.set_compression_level(ctx_->compression_level());
139 ctx_->sent_initial_metadata_ = true;
141 // The response is dropped if the status is not OK.
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
// NOTE(review): the core CQ tag was already set to &finish_tag_ above; this
// second call looks redundant -- confirm before removing.
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 call_.PerformOps(&finish_ops_);
// Sends initial metadata on its own. Must not be called once the metadata has
// been sent (asserted). The completion callback invokes the reactor's
// OnSendInitialMetadataDone and then MaybeDone.
152 void SendInitialMetadata() override {
153 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
155 // The callback for this function should not be marked inline because it
156 // is directly invoking a user-controlled reaction
157 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158 // thread. However, any OnDone needed after that can be inlined because it
159 // is already running on an executor thread.
163 ServerUnaryReactor* reactor =
164 reactor_.load(std::memory_order_relaxed);
165 reactor->OnSendInitialMetadataDone(ok);
166 this->MaybeDone(/*inlineable_ondone=*/true);
168 &meta_ops_, /*can_inline=*/false);
169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170 ctx_->initial_metadata_flags());
171 if (ctx_->compression_level_set()) {
172 meta_ops_.set_compression_level(ctx_->compression_level());
174 ctx_->sent_initial_metadata_ = true;
175 meta_ops_.set_core_cq_tag(&meta_tag_);
176 call_.PerformOps(&meta_ops_);
// The handler constructs this object and calls SetupReactor on it.
180 friend class CallbackUnaryHandler<RequestType, ResponseType>;
// Stores the per-call state and registers the message holder with the server
// context via set_message_allocator_state.
182 ServerCallbackUnaryImpl(
183 ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
184 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
186 std::function<void()> call_requester)
189 allocator_state_(allocator_state),
190 call_requester_(std::move(call_requester)) {
191 ctx_->set_message_allocator_state(allocator_state);
194 /// SetupReactor binds the reactor (which also releases any queued
195 /// operations), maybe calls OnCancel if possible/needed, and maybe marks
196 /// the completion of the RPC. This should be the last component of the
198 void SetupReactor(ServerUnaryReactor* reactor) {
199 reactor_.store(reactor, std::memory_order_relaxed);
200 this->BindReactor(reactor);
201 this->MaybeCallOnCancel(reactor);
202 this->MaybeDone(reactor->InternalInlineable());
// Accessors for the request/response held by the message holder.
205 const RequestType* request() { return allocator_state_->request(); }
206 ResponseType* response() { return allocator_state_->response(); }
// Calls the reactor's OnDone, then tears this object down. Everything needed
// after the explicit destructor call (the grpc_call pointer and the call
// requester) is copied into locals first, since members must not be touched
// once the destructor has run.
// NOTE(review): the use of the local call_requester is not visible in this
// listing.
208 void CallOnDone() override {
209 reactor_.load(std::memory_order_relaxed)->OnDone();
210 grpc_call* call = call_.call();
211 auto call_requester = std::move(call_requester_);
212 allocator_state_->Release();
213 this->~ServerCallbackUnaryImpl(); // explicitly call destructor
214 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the reactor currently stored in reactor_.
218 ServerReactor* reactor() override {
219 return reactor_.load(std::memory_order_relaxed);
// Op set and tag for SendInitialMetadata (the op set's declarator line is not
// visible here; presumably meta_ops_).
222 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
224 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag for Finish (declarator line not visible; presumably
// finish_ops_).
225 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
226 ::grpc::internal::CallOpSendMessage,
227 ::grpc::internal::CallOpServerSendStatus>
229 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
231 ::grpc::CallbackServerContext* const ctx_;
232 ::grpc::internal::Call call_;
// Message holder pointer type (declarator line not visible; presumably
// allocator_state_, which the constructor initializes).
233 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
235 std::function<void()> call_requester_;
236 // reactor_ can always be loaded/stored with relaxed memory ordering because
237 // its value is only set once, independently of other data in the object,
238 // and the loads that use it will always actually come provably later even
239 // though they are from different threads since they are triggered by
240 // actions initiated only by the setting up of the reactor_ variable. In
241 // a sense, it's a delayed "const": it gets its value from the SetupReactor
242 // method (not the constructor, so it's not a true const), but it doesn't
243 // change after that and it only gets used by actions caused, directly or
244 // indirectly, by that setup. This comment also applies to the reactor_
245 // variables of the other streaming objects in this file.
246 std::atomic<ServerUnaryReactor*> reactor_;
247 // callbacks_outstanding_ follows a refcount pattern
248 std::atomic<intptr_t> callbacks_outstanding_{
249 3}; // reserve for start, Finish, and CompletionOp
// Method handler for calls served through a ServerReadReactor<RequestType>.
// For each call, RunHandler placement-constructs a ServerCallbackReaderImpl
// in the call arena, obtains a reactor and binds it.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few continuation lines), so some statements below appear
// truncated. The comments describe only what is visible.
253 template <class RequestType, class ResponseType>
254 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
// Stores the reactor factory by moving it into get_reactor_.
256 explicit CallbackClientStreamingHandler(
257 std::function<ServerReadReactor<RequestType>*(
258 ::grpc::CallbackServerContext*, ResponseType*)>
260 : get_reactor_(std::move(get_reactor)) {}
// Runs one call: takes a ref on the call, builds the reader, starts the
// completion op, picks a reactor and finally binds it with SetupReactor.
261 void RunHandler(const HandlerParameter& param) final {
262 // Arena allocate a reader structure (that includes response)
263 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
265 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
266 param.call->call(), sizeof(ServerCallbackReaderImpl)))
267 ServerCallbackReaderImpl(
268 static_cast<::grpc::CallbackServerContext*>(param.server_context),
269 param.call, std::move(param.call_requester));
270 // Inlineable OnDone can be false in the CompletionOp callback because there
271 // is no read reactor that has an inlineable OnDone; this only applies to
272 // the DefaultReactor (which is unary).
273 param.server_context->BeginCompletionOp(
275 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
// Only ask for a user reactor when the incoming status is OK.
278 ServerReadReactor<RequestType>* reactor = nullptr;
279 if (param.status.ok()) {
280 reactor = ::grpc::internal::CatchingReactorGetter<
281 ServerReadReactor<RequestType>>(
283 static_cast<::grpc::CallbackServerContext*>(param.server_context),
287 if (reactor == nullptr) {
288 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated and carries an UNIMPLEMENTED
// status with an empty message.
289 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
290 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
291 UnimplementedReadReactor<RequestType>(
292 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
295 reader->SetupReactor(reactor);
// Reactor factory type; the rest of this declaration is not visible here
// (presumably the get_reactor_ member that the constructor initializes).
299 std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
// Per-call controller for a call served by a ServerReadReactor. It owns the
// op sets and completion tags for initial metadata, reads and the final
// status/response, and forwards completions to the bound reactor.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few statements), so some constructs below appear truncated.
303 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
// Sends the final status. Initial metadata is added first if it has not been
// sent yet; the response (resp_) is attached only for an OK status.
305 void Finish(::grpc::Status s) override {
306 // A finish tag with only MaybeDone can have its callback inlined
307 // even if OnDone is not inlineable, because this callback just
308 // checks a ref and then decides whether or not to dispatch OnDone.
312 // Inlineable OnDone can be false here because there is
313 // no read reactor that has an inlineable OnDone; this
314 // only applies to the DefaultReactor (which is unary).
315 this->MaybeDone(/*inlineable_ondone=*/false);
317 &finish_ops_, /*can_inline=*/true);
318 if (!ctx_->sent_initial_metadata_) {
319 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
320 ctx_->initial_metadata_flags());
321 if (ctx_->compression_level_set()) {
322 finish_ops_.set_compression_level(ctx_->compression_level());
324 ctx_->sent_initial_metadata_ = true;
326 // The response is dropped if the status is not OK.
328 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
329 finish_ops_.SendMessagePtr(&resp_));
331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
333 finish_ops_.set_core_cq_tag(&finish_tag_);
334 call_.PerformOps(&finish_ops_);
// Sends initial metadata on its own. Must not be called once the metadata has
// been sent (asserted). The completion callback invokes the reactor's
// OnSendInitialMetadataDone and then MaybeDone.
337 void SendInitialMetadata() override {
338 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
340 // The callback for this function should not be inlined because it invokes
341 // a user-controlled reaction, but any resulting OnDone can be inlined in
342 // the executor to which this callback is dispatched.
346 ServerReadReactor<RequestType>* reactor =
347 reactor_.load(std::memory_order_relaxed);
348 reactor->OnSendInitialMetadataDone(ok);
349 this->MaybeDone(/*inlineable_ondone=*/true);
351 &meta_ops_, /*can_inline=*/false);
352 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
353 ctx_->initial_metadata_flags());
354 if (ctx_->compression_level_set()) {
355 meta_ops_.set_compression_level(ctx_->compression_level());
357 ctx_->sent_initial_metadata_ = true;
358 meta_ops_.set_core_cq_tag(&meta_tag_);
359 call_.PerformOps(&meta_ops_);
// Starts a receive of one message into *req; completion is reported through
// read_tag_, whose callback (set in SetupReactor) calls OnReadDone.
362 void Read(RequestType* req) override {
364 read_ops_.RecvMessage(req);
365 call_.PerformOps(&read_ops_);
// The handler constructs this object and calls SetupReactor on it.
369 friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
// Stores the context pointer, a copy of the Call and the call requester.
371 ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
372 ::grpc::internal::Call* call,
373 std::function<void()> call_requester)
374 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Records the reactor, prepares the read tag so its callback calls
// OnReadDone, then binds the reactor, possibly calls OnCancel and possibly
// completes the RPC.
376 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
377 reactor_.store(reactor, std::memory_order_relaxed);
378 // The callback for this function should not be inlined because it invokes
379 // a user-controlled reaction, but any resulting OnDone can be inlined in
380 // the executor to which this callback is dispatched.
383 [this, reactor](bool ok) {
384 reactor->OnReadDone(ok);
385 this->MaybeDone(/*inlineable_ondone=*/true);
387 &read_ops_, /*can_inline=*/false);
388 read_ops_.set_core_cq_tag(&read_tag_);
389 this->BindReactor(reactor);
390 this->MaybeCallOnCancel(reactor);
391 // Inlineable OnDone can be false here because there is no read
392 // reactor that has an inlineable OnDone; this only applies to the
393 // DefaultReactor (which is unary).
394 this->MaybeDone(/*inlineable_ondone=*/false);
397 ~ServerCallbackReaderImpl() {}
// Accessor for the response object stored inside this controller.
399 ResponseType* response() { return &resp_; }
// Calls the reactor's OnDone, then tears this object down. Everything needed
// after the explicit destructor call (the grpc_call pointer and the call
// requester) is copied into locals first, since members must not be touched
// once the destructor has run.
// NOTE(review): the use of the local call_requester is not visible in this
// listing.
401 void CallOnDone() override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
403 grpc_call* call = call_.call();
404 auto call_requester = std::move(call_requester_);
405 this->~ServerCallbackReaderImpl(); // explicitly call destructor
406 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the reactor currently stored in reactor_.
410 ServerReactor* reactor() override {
411 return reactor_.load(std::memory_order_relaxed);
// Op set and tag for SendInitialMetadata (declarator line not visible;
// presumably meta_ops_).
414 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
416 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag for Finish (declarator line not visible; presumably
// finish_ops_).
417 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
418 ::grpc::internal::CallOpSendMessage,
419 ::grpc::internal::CallOpServerSendStatus>
421 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag for Read (declarator line not visible; presumably
// read_ops_).
422 ::grpc::internal::CallOpSet<
423 ::grpc::internal::CallOpRecvMessage<RequestType>>
425 ::grpc::internal::CallbackWithSuccessTag read_tag_;
427 ::grpc::CallbackServerContext* const ctx_;
428 ::grpc::internal::Call call_;
430 std::function<void()> call_requester_;
431 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432 std::atomic<ServerReadReactor<RequestType>*> reactor_;
433 // callbacks_outstanding_ follows a refcount pattern
434 std::atomic<intptr_t> callbacks_outstanding_{
435 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for calls served through a ServerWriteReactor<ResponseType>.
// For each call, RunHandler placement-constructs a ServerCallbackWriterImpl
// in the call arena, obtains a reactor and binds it; Deserialize builds the
// request object in the call arena.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few continuation lines), so some statements below appear
// truncated. The comments describe only what is visible.
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
// Stores the reactor factory by moving it into get_reactor_.
442 explicit CallbackServerStreamingHandler(
443 std::function<ServerWriteReactor<ResponseType>*(
444 ::grpc::CallbackServerContext*, const RequestType*)>
446 : get_reactor_(std::move(get_reactor)) {}
// Runs one call: takes a ref on the call, builds the writer around
// param.request, starts the completion op, picks a reactor and finally binds
// it with SetupReactor.
447 void RunHandler(const HandlerParameter& param) final {
448 // Arena allocate a writer structure
449 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
451 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
452 param.call->call(), sizeof(ServerCallbackWriterImpl)))
453 ServerCallbackWriterImpl(
454 static_cast<::grpc::CallbackServerContext*>(param.server_context),
455 param.call, static_cast<RequestType*>(param.request),
456 std::move(param.call_requester));
457 // Inlineable OnDone can be false in the CompletionOp callback because there
458 // is no write reactor that has an inlineable OnDone; this only applies to
459 // the DefaultReactor (which is unary).
460 param.server_context->BeginCompletionOp(
462 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
// Only ask for a user reactor when the incoming status is OK.
465 ServerWriteReactor<ResponseType>* reactor = nullptr;
466 if (param.status.ok()) {
467 reactor = ::grpc::internal::CatchingReactorGetter<
468 ServerWriteReactor<ResponseType>>(
470 static_cast<::grpc::CallbackServerContext*>(param.server_context),
473 if (reactor == nullptr) {
474 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated and carries an UNIMPLEMENTED
// status with an empty message.
475 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
476 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
477 UnimplementedWriteReactor<ResponseType>(
478 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
481 writer->SetupReactor(reactor);
// Placement-constructs a RequestType in the call arena and deserializes the
// incoming bytes into it. The explicit destructor call at the end is
// presumably the failure path; the surrounding condition and the return
// statements are not visible in this listing -- confirm.
484 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
485 ::grpc::Status* status, void** /*handler_data*/) final {
486 ::grpc::ByteBuffer buf;
489 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
490 call, sizeof(RequestType))) RequestType();
492 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
497 request->~RequestType();
// Reactor factory type; the declarator line is not visible here (presumably
// get_reactor_, which the constructor initializes).
502 std::function<ServerWriteReactor<ResponseType>*(
503 ::grpc::CallbackServerContext*, const RequestType*)>
// Per-call controller for a call served by a ServerWriteReactor. It owns the
// op sets and completion tags for initial metadata, writes and the final
// status, and forwards completions to the bound reactor.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few statements), so some constructs below appear truncated.
506 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
// Sends the final status, adding initial metadata first if it has not been
// sent yet.
508 void Finish(::grpc::Status s) override {
509 // A finish tag with only MaybeDone can have its callback inlined
510 // even if OnDone is not inlineable, because this callback just
511 // checks a ref and then decides whether or not to dispatch OnDone.
515 // Inlineable OnDone can be false here because there is
516 // no write reactor that has an inlineable OnDone; this
517 // only applies to the DefaultReactor (which is unary).
518 this->MaybeDone(/*inlineable_ondone=*/false);
520 &finish_ops_, /*can_inline=*/true);
521 finish_ops_.set_core_cq_tag(&finish_tag_);
523 if (!ctx_->sent_initial_metadata_) {
524 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
525 ctx_->initial_metadata_flags());
526 if (ctx_->compression_level_set()) {
527 finish_ops_.set_compression_level(ctx_->compression_level());
529 ctx_->sent_initial_metadata_ = true;
531 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
532 call_.PerformOps(&finish_ops_);
// Sends initial metadata on its own. Must not be called once the metadata has
// been sent (asserted). The completion callback invokes the reactor's
// OnSendInitialMetadataDone and then MaybeDone.
535 void SendInitialMetadata() override {
536 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538 // The callback for this function should not be inlined because it invokes
539 // a user-controlled reaction, but any resulting OnDone can be inlined in
540 // the executor to which this callback is dispatched.
544 ServerWriteReactor<ResponseType>* reactor =
545 reactor_.load(std::memory_order_relaxed);
546 reactor->OnSendInitialMetadataDone(ok);
547 this->MaybeDone(/*inlineable_ondone=*/true);
549 &meta_ops_, /*can_inline=*/false);
550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
552 if (ctx_->compression_level_set()) {
553 meta_ops_.set_compression_level(ctx_->compression_level());
555 ctx_->sent_initial_metadata_ = true;
556 meta_ops_.set_core_cq_tag(&meta_tag_);
557 call_.PerformOps(&meta_ops_);
// Starts a write of *resp. A last-message write also gets the buffer hint,
// and initial metadata is piggybacked if it has not been sent yet.
// Serialization failure currently trips an assertion (see TODO).
560 void Write(const ResponseType* resp,
561 ::grpc::WriteOptions options) override {
563 if (options.is_last_message()) {
564 options.set_buffer_hint();
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ = true;
574 // TODO(vjpai): don't assert
575 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576 call_.PerformOps(&write_ops_);
// Attaches *resp to the finish op set and then finishes with status s, so
// the message and the status go out in a single batch.
579 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580 ::grpc::Status s) override {
581 // This combines the write into the finish callback
582 // TODO(vjpai): don't assert
583 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584 Finish(std::move(s));
// The handler constructs this object and calls SetupReactor on it.
588 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
// Stores the per-call state; only the call_requester_ initializer is visible
// in this listing.
590 ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
591 ::grpc::internal::Call* call,
592 const RequestType* req,
593 std::function<void()> call_requester)
597 call_requester_(std::move(call_requester)) {}
// Records the reactor, prepares the write tag so its callback calls
// OnWriteDone, then binds the reactor, possibly calls OnCancel and possibly
// completes the RPC.
599 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600 reactor_.store(reactor, std::memory_order_relaxed);
601 // The callback for this function should not be inlined because it invokes
602 // a user-controlled reaction, but any resulting OnDone can be inlined in
603 // the executor to which this callback is dispatched.
606 [this, reactor](bool ok) {
607 reactor->OnWriteDone(ok);
608 this->MaybeDone(/*inlineable_ondone=*/true);
610 &write_ops_, /*can_inline=*/false);
611 write_ops_.set_core_cq_tag(&write_tag_);
612 this->BindReactor(reactor);
613 this->MaybeCallOnCancel(reactor);
614 // Inlineable OnDone can be false here because there is no write
615 // reactor that has an inlineable OnDone; this only applies to the
616 // DefaultReactor (which is unary).
617 this->MaybeDone(/*inlineable_ondone=*/false);
// Explicitly destroys the request object; presumably needed because the
// handler's Deserialize placement-constructs it in the call arena -- confirm.
619 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
// Accessor for the request this call was started with.
621 const RequestType* request() { return req_; }
// Calls the reactor's OnDone, then tears this object down. Everything needed
// after the explicit destructor call (the grpc_call pointer and the call
// requester) is copied into locals first, since members must not be touched
// once the destructor has run.
// NOTE(review): the use of the local call_requester is not visible in this
// listing.
623 void CallOnDone() override {
624 reactor_.load(std::memory_order_relaxed)->OnDone();
625 grpc_call* call = call_.call();
626 auto call_requester = std::move(call_requester_);
627 this->~ServerCallbackWriterImpl(); // explicitly call destructor
628 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the reactor currently stored in reactor_.
632 ServerReactor* reactor() override {
633 return reactor_.load(std::memory_order_relaxed);
// Op set and tag for SendInitialMetadata (declarator line not visible;
// presumably meta_ops_).
636 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
638 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag for Finish / WriteAndFinish (declarator line not visible;
// presumably finish_ops_).
639 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
640 ::grpc::internal::CallOpSendMessage,
641 ::grpc::internal::CallOpServerSendStatus>
643 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag for Write (declarator line not visible; presumably
// write_ops_).
644 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
645 ::grpc::internal::CallOpSendMessage>
647 ::grpc::internal::CallbackWithSuccessTag write_tag_;
649 ::grpc::CallbackServerContext* const ctx_;
650 ::grpc::internal::Call call_;
651 const RequestType* req_;
652 std::function<void()> call_requester_;
653 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
654 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
655 // callbacks_outstanding_ follows a refcount pattern
656 std::atomic<intptr_t> callbacks_outstanding_{
657 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for calls served through a
// ServerBidiReactor<RequestType, ResponseType>. For each call, RunHandler
// placement-constructs a ServerCallbackReaderWriterImpl in the call arena,
// obtains a reactor and binds it.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few continuation lines), so some statements below appear
// truncated. The comments describe only what is visible.
661 template <class RequestType, class ResponseType>
662 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
// Stores the reactor factory by moving it into get_reactor_.
664 explicit CallbackBidiHandler(
665 std::function<ServerBidiReactor<RequestType, ResponseType>*(
666 ::grpc::CallbackServerContext*)>
668 : get_reactor_(std::move(get_reactor)) {}
// Runs one call: takes a ref on the call, builds the stream controller,
// starts the completion op, picks a reactor and finally binds it with
// SetupReactor.
669 void RunHandler(const HandlerParameter& param) final {
670 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
// Placement-new the controller into memory obtained from the call arena.
672 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
673 param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
674 ServerCallbackReaderWriterImpl(
675 static_cast<::grpc::CallbackServerContext*>(param.server_context),
676 param.call, std::move(param.call_requester));
677 // Inlineable OnDone can be false in the CompletionOp callback because there
678 // is no bidi reactor that has an inlineable OnDone; this only applies to
679 // the DefaultReactor (which is unary).
680 param.server_context->BeginCompletionOp(
682 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
// Only ask for a user reactor when the incoming status is OK.
685 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
686 if (param.status.ok()) {
687 reactor = ::grpc::internal::CatchingReactorGetter<
688 ServerBidiReactor<RequestType, ResponseType>>(
690 static_cast<::grpc::CallbackServerContext*>(param.server_context));
693 if (reactor == nullptr) {
694 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated and carries an UNIMPLEMENTED
// status with an empty message.
695 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
697 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
698 UnimplementedBidiReactor<RequestType, ResponseType>(
699 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
702 stream->SetupReactor(reactor);
// Reactor factory type; the declarator line is not visible here (presumably
// get_reactor_, which the constructor initializes).
706 std::function<ServerBidiReactor<RequestType, ResponseType>*(
707 ::grpc::CallbackServerContext*)>
// Per-call controller for a call served by a ServerBidiReactor. It owns the
// op sets and completion tags for initial metadata, reads, writes and the
// final status, and forwards completions to the bound reactor.
// NOTE(review): this listing omits some short lines (closing braces, access
// specifiers, a few statements), so some constructs below appear truncated.
710 class ServerCallbackReaderWriterImpl
711 : public ServerCallbackReaderWriter<RequestType, ResponseType> {
// Sends the final status, adding initial metadata first if it has not been
// sent yet.
713 void Finish(::grpc::Status s) override {
714 // A finish tag with only MaybeDone can have its callback inlined
715 // even if OnDone is not inlineable, because this callback just
716 // checks a ref and then decides whether or not to dispatch OnDone.
720 // Inlineable OnDone can be false here because there is
721 // no bidi reactor that has an inlineable OnDone; this
722 // only applies to the DefaultReactor (which is unary).
723 this->MaybeDone(/*inlineable_ondone=*/false);
725 &finish_ops_, /*can_inline=*/true);
726 finish_ops_.set_core_cq_tag(&finish_tag_);
728 if (!ctx_->sent_initial_metadata_) {
729 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
730 ctx_->initial_metadata_flags());
731 if (ctx_->compression_level_set()) {
732 finish_ops_.set_compression_level(ctx_->compression_level());
734 ctx_->sent_initial_metadata_ = true;
736 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
737 call_.PerformOps(&finish_ops_);
// Sends initial metadata on its own. Must not be called once the metadata has
// been sent (asserted). The completion callback invokes the reactor's
// OnSendInitialMetadataDone and then MaybeDone.
740 void SendInitialMetadata() override {
741 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
743 // The callback for this function should not be inlined because it invokes
744 // a user-controlled reaction, but any resulting OnDone can be inlined in
745 // the executor to which this callback is dispatched.
749 ServerBidiReactor<RequestType, ResponseType>* reactor =
750 reactor_.load(std::memory_order_relaxed);
751 reactor->OnSendInitialMetadataDone(ok);
752 this->MaybeDone(/*inlineable_ondone=*/true);
754 &meta_ops_, /*can_inline=*/false);
755 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756 ctx_->initial_metadata_flags());
757 if (ctx_->compression_level_set()) {
758 meta_ops_.set_compression_level(ctx_->compression_level());
760 ctx_->sent_initial_metadata_ = true;
761 meta_ops_.set_core_cq_tag(&meta_tag_);
762 call_.PerformOps(&meta_ops_);
// Starts a write of *resp. A last-message write also gets the buffer hint,
// and initial metadata is piggybacked if it has not been sent yet.
// Serialization failure currently trips an assertion (see TODO).
765 void Write(const ResponseType* resp,
766 ::grpc::WriteOptions options) override {
768 if (options.is_last_message()) {
769 options.set_buffer_hint();
771 if (!ctx_->sent_initial_metadata_) {
772 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773 ctx_->initial_metadata_flags());
774 if (ctx_->compression_level_set()) {
775 write_ops_.set_compression_level(ctx_->compression_level());
777 ctx_->sent_initial_metadata_ = true;
779 // TODO(vjpai): don't assert
780 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
781 call_.PerformOps(&write_ops_);
// Attaches *resp to the finish op set and then finishes with status s, so
// the message and the status go out in a single batch.
784 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
785 ::grpc::Status s) override {
786 // TODO(vjpai): don't assert
787 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
788 Finish(std::move(s));
// Starts a receive of one message into *req; completion is reported through
// read_tag_, whose callback (set in SetupReactor) calls OnReadDone.
791 void Read(RequestType* req) override {
793 read_ops_.RecvMessage(req);
794 call_.PerformOps(&read_ops_);
// The handler constructs this object and calls SetupReactor on it.
798 friend class CallbackBidiHandler<RequestType, ResponseType>;
// Stores the context pointer, a copy of the Call and the call requester.
800 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
801 ::grpc::internal::Call* call,
802 std::function<void()> call_requester)
803 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Records the reactor, prepares the write and read tags so their callbacks
// call OnWriteDone / OnReadDone, then binds the reactor, possibly calls
// OnCancel and possibly completes the RPC.
805 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
806 reactor_.store(reactor, std::memory_order_relaxed);
807 // The callbacks for these functions should not be inlined because they
808 // invoke user-controlled reactions, but any resulting OnDones can be
809 // inlined in the executor to which a callback is dispatched.
812 [this, reactor](bool ok) {
813 reactor->OnWriteDone(ok);
814 this->MaybeDone(/*inlineable_ondone=*/true);
816 &write_ops_, /*can_inline=*/false);
817 write_ops_.set_core_cq_tag(&write_tag_);
820 [this, reactor](bool ok) {
821 reactor->OnReadDone(ok);
822 this->MaybeDone(/*inlineable_ondone=*/true);
824 &read_ops_, /*can_inline=*/false);
825 read_ops_.set_core_cq_tag(&read_tag_);
826 this->BindReactor(reactor);
827 this->MaybeCallOnCancel(reactor);
828 // Inlineable OnDone can be false here because there is no bidi
829 // reactor that has an inlineable OnDone; this only applies to the
830 // DefaultReactor (which is unary).
831 this->MaybeDone(/*inlineable_ondone=*/false);
// Calls the reactor's OnDone, then tears this object down. Everything needed
// after the explicit destructor call (the grpc_call pointer and the call
// requester) is copied into locals first, since members must not be touched
// once the destructor has run.
// NOTE(review): the use of the local call_requester is not visible in this
// listing.
834 void CallOnDone() override {
835 reactor_.load(std::memory_order_relaxed)->OnDone();
836 grpc_call* call = call_.call();
837 auto call_requester = std::move(call_requester_);
838 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
839 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the reactor currently stored in reactor_.
843 ServerReactor* reactor() override {
844 return reactor_.load(std::memory_order_relaxed);
// Op set and tag for SendInitialMetadata (declarator line not visible;
// presumably meta_ops_).
847 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
849 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag for Finish / WriteAndFinish (declarator line not visible;
// presumably finish_ops_).
850 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
851 ::grpc::internal::CallOpSendMessage,
852 ::grpc::internal::CallOpServerSendStatus>
854 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag for Write (declarator line not visible; presumably
// write_ops_).
855 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
856 ::grpc::internal::CallOpSendMessage>
858 ::grpc::internal::CallbackWithSuccessTag write_tag_;
// Op set and tag for Read (declarator line not visible; presumably
// read_ops_).
859 ::grpc::internal::CallOpSet<
860 ::grpc::internal::CallOpRecvMessage<RequestType>>
862 ::grpc::internal::CallbackWithSuccessTag read_tag_;
864 ::grpc::CallbackServerContext* const ctx_;
865 ::grpc::internal::Call call_;
866 std::function<void()> call_requester_;
867 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
868 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
869 // callbacks_outstanding_ follows a refcount pattern
870 std::atomic<intptr_t> callbacks_outstanding_{
871 3}; // reserve for OnStarted, Finish, and CompletionOp
875 } // namespace internal
878 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H