3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback_impl.h>
24 #include <grpcpp/impl/codegen/server_context_impl.h>
25 #include <grpcpp/impl/codegen/status.h>
// Method handler for unary RPCs. For each call, RunHandler arena-allocates a
// ServerCallbackUnaryImpl, picks a ServerUnaryReactor and binds the two with
// SetupReactor. An optional message allocator (allocator_, nullptr by default)
// can supply the request/response holder used by Deserialize.
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
// Stores (by move) the factory that yields the ServerUnaryReactor for a call.
33 explicit CallbackUnaryHandler(
34 std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
35 const RequestType*, ResponseType*)>
37 : get_reactor_(std::move(get_reactor)) {}
// Sets allocator_. Deserialize calls AllocateMessages() on it when it is
// non-null.
39 void SetMessageAllocator(
40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
42 allocator_ = allocator;
// Per-call entry point: takes a ref on the core call, constructs the
// ServerCallbackUnaryImpl in the call arena, registers the completion op
// (its callback is call->MaybeDone()), selects a reactor and finishes with
// SetupReactor.
45 void RunHandler(const HandlerParameter& param) final {
46 // Arena allocate a controller structure (that includes request/response)
// Paired with the grpc_call_unref in ServerCallbackUnaryImpl::CallOnDone.
47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48 auto* allocator_state = static_cast<
49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
// Placement new into arena memory: the object is never deleted; it runs its
// own destructor explicitly in CallOnDone.
52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53 param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
55 static_cast<::grpc_impl::CallbackServerContext*>(
56 param.server_context),
57 param.call, allocator_state, std::move(param.call_requester));
58 param.server_context->BeginCompletionOp(
59 param.call, [call](bool) { call->MaybeDone(); }, call);
// A user reactor is only requested when the incoming status is OK.
61 ServerUnaryReactor* reactor = nullptr;
62 if (param.status.ok()) {
63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
65 static_cast<::grpc_impl::CallbackServerContext*>(
66 param.server_context),
67 call->request(), call->response());
70 if (reactor == nullptr) {
71 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated as well and is constructed with an
// UNIMPLEMENTED status and an empty message.
72 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
73 param.call->call(), sizeof(UnimplementedUnaryReactor)))
74 UnimplementedUnaryReactor(
75 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
78 /// Invoke SetupReactor as the last part of the handler
79 call->SetupReactor(reactor);
// Chooses the MessageHolder for this call: allocator_->AllocateMessages() when
// an allocator is set, otherwise a DefaultMessageHolder constructed in the
// call arena. The holder is published through *handler_data and the request
// is deserialized into its request() object. The holder is Release()d on the
// deserialization-failure path.
82 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
83 ::grpc::Status* status, void** handler_data) final {
84 ::grpc::ByteBuffer buf;
86 RequestType* request = nullptr;
87 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
88 allocator_state = nullptr;
89 if (allocator_ != nullptr) {
90 allocator_state = allocator_->AllocateMessages();
93 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
94 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
95 DefaultMessageHolder<RequestType, ResponseType>();
97 *handler_data = allocator_state;
98 request = allocator_state->request();
100 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
105 // Clean up on deserialization failure.
106 allocator_state->Release();
// Reactor factory; same signature as the constructor argument.
111 std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
112 const RequestType*, ResponseType*)>
// Optional message allocator; remains nullptr unless SetMessageAllocator is
// called.
114 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
115 allocator_ = nullptr;
// Per-call state for a unary callback RPC. CallbackUnaryHandler::RunHandler
// constructs it in the call arena; it destroys itself in CallOnDone.
117 class ServerCallbackUnaryImpl : public ServerCallbackUnary {
// Sends the final status, together with initial metadata if that has not been
// sent yet. The finish callback only calls MaybeDone, passing the value of the
// reactor InternalInlineable().
119 void Finish(::grpc::Status s) override {
120 // A callback that only contains a call to MaybeDone can be run as an
121 // inline callback regardless of whether or not OnDone is inlineable
122 // because if the actual OnDone callback needs to be scheduled, MaybeDone
123 // is responsible for dispatching to an executor thread if needed. Thus,
124 // when setting up the finish_tag_, we can set its own callback to
// inlineable (see can_inline=true below).
130 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
132 &finish_ops_, /*can_inline=*/true);
133 finish_ops_.set_core_cq_tag(&finish_tag_);
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent yet.
135 if (!ctx_->sent_initial_metadata_) {
136 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
137 ctx_->initial_metadata_flags());
138 if (ctx_->compression_level_set()) {
139 finish_ops_.set_compression_level(ctx_->compression_level());
141 ctx_->sent_initial_metadata_ = true;
143 // The response is dropped if the status is not OK.
// In the first form below, the status sent is whatever SendMessagePtr returns
// for the response; in the second form the caller-supplied status is sent.
145 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146 finish_ops_.SendMessagePtr(response()));
148 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
// NOTE(review): finish_tag_ was already installed with set_core_cq_tag earlier
// in this method; this second call looks redundant.
150 finish_ops_.set_core_cq_tag(&finish_tag_);
151 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own batch. Asserts that it has not been sent
// already and marks it as sent before the ops are started.
154 void SendInitialMetadata() override {
155 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
157 // The callback for this function should not be marked inline because it
158 // is directly invoking a user-controlled reaction
159 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
160 // thread. However, any OnDone needed after that can be inlined because it
161 // is already running on an executor thread.
162 meta_tag_.Set(call_.call(),
164 ServerUnaryReactor* reactor =
165 reactor_.load(std::memory_order_relaxed);
166 reactor->OnSendInitialMetadataDone(ok);
167 this->MaybeDone(/*inlineable_ondone=*/true);
169 &meta_ops_, /*can_inline=*/false);
170 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171 ctx_->initial_metadata_flags());
172 if (ctx_->compression_level_set()) {
173 meta_ops_.set_compression_level(ctx_->compression_level());
175 ctx_->sent_initial_metadata_ = true;
176 meta_ops_.set_core_cq_tag(&meta_tag_);
177 call_.PerformOps(&meta_ops_);
// The handler constructs this object and calls SetupReactor, request() and
// response() on it.
181 friend class CallbackUnaryHandler<RequestType, ResponseType>;
// Stores the message holder and the call requester, and registers the holder
// with the server context through set_message_allocator_state.
183 ServerCallbackUnaryImpl(
184 ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call,
185 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
187 std::function<void()> call_requester)
190 allocator_state_(allocator_state),
191 call_requester_(std::move(call_requester)) {
192 ctx_->set_message_allocator_state(allocator_state);
195 /// SetupReactor binds the reactor (which also releases any queued
196 /// operations), maybe calls OnCancel if possible/needed, and maybe marks
197 /// the completion of the RPC. This should be the last component of the
/// handler (RunHandler calls it as its final step).
199 void SetupReactor(ServerUnaryReactor* reactor) {
200 reactor_.store(reactor, std::memory_order_relaxed);
201 this->BindReactor(reactor);
202 this->MaybeCallOnCancel(reactor);
203 this->MaybeDone(reactor->InternalInlineable());
// Accessors for the request and response objects held by the message holder.
206 const RequestType* request() { return allocator_state_->request(); }
207 ResponseType* response() { return allocator_state_->response(); }
// Final step of the RPC: runs the reactor OnDone, releases the message holder,
// destroys this object in place and drops the call ref. The call pointer and
// call_requester_ are copied or moved into locals first, since members must
// not be touched after the explicit destructor call.
209 void CallOnDone() override {
210 reactor_.load(std::memory_order_relaxed)->OnDone();
211 grpc_call* call = call_.call();
212 auto call_requester = std::move(call_requester_);
213 allocator_state_->Release();
214 this->~ServerCallbackUnaryImpl(); // explicitly call destructor
215 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor pointer.
219 ServerReactor* reactor() override {
220 return reactor_.load(std::memory_order_relaxed);
// Op set and completion tag used by SendInitialMetadata.
223 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
225 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and completion tag used by Finish (metadata, message and status).
226 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
227 ::grpc::internal::CallOpSendMessage,
228 ::grpc::internal::CallOpServerSendStatus>
230 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
232 ::grpc_impl::CallbackServerContext* const ctx_;
233 ::grpc::internal::Call call_;
// Holder that provides request() and response(); released in CallOnDone.
234 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
236 std::function<void()> call_requester_;
237 // reactor_ can always be loaded/stored with relaxed memory ordering because
238 // its value is only set once, independently of other data in the object,
239 // and the loads that use it will always actually come provably later even
240 // though they are from different threads since they are triggered by
241 // actions initiated only by the setting up of the reactor_ variable. In
242 // a sense, it's a delayed "const": it gets its value from the SetupReactor
243 // method (not the constructor, so it's not a true const), but it doesn't
244 // change after that and it only gets used by actions caused, directly or
245 // indirectly, by that setup. This comment also applies to the reactor_
246 // variables of the other streaming objects in this file.
247 std::atomic<ServerUnaryReactor*> reactor_;
248 // callbacks_outstanding_ follows a refcount pattern
249 std::atomic<intptr_t> callbacks_outstanding_{
250 3}; // reserve for start, Finish, and CompletionOp
// Method handler for client-streaming RPCs. For each call, RunHandler
// arena-allocates a ServerCallbackReaderImpl, picks a ServerReadReactor and
// binds the two with SetupReactor.
254 template <class RequestType, class ResponseType>
255 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
// Stores (by move) the factory that yields the ServerReadReactor for a call.
257 explicit CallbackClientStreamingHandler(
258 std::function<ServerReadReactor<RequestType>*(
259 ::grpc_impl::CallbackServerContext*, ResponseType*)>
261 : get_reactor_(std::move(get_reactor)) {}
// Per-call entry point: takes a ref on the core call, constructs the reader in
// the call arena, registers the completion op, selects a reactor and finishes
// with SetupReactor.
262 void RunHandler(const HandlerParameter& param) final {
263 // Arena allocate a reader structure (that includes response)
// Paired with the grpc_call_unref in ServerCallbackReaderImpl::CallOnDone.
264 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
266 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
267 param.call->call(), sizeof(ServerCallbackReaderImpl)))
268 ServerCallbackReaderImpl(
269 static_cast<::grpc_impl::CallbackServerContext*>(
270 param.server_context),
271 param.call, std::move(param.call_requester));
272 // Inlineable OnDone can be false in the CompletionOp callback because there
273 // is no read reactor that has an inlineable OnDone; this only applies to
274 // the DefaultReactor (which is unary).
275 param.server_context->BeginCompletionOp(
277 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
// A user reactor is only requested when the incoming status is OK.
280 ServerReadReactor<RequestType>* reactor = nullptr;
281 if (param.status.ok()) {
282 reactor = ::grpc::internal::CatchingReactorGetter<
283 ServerReadReactor<RequestType>>(
285 static_cast<::grpc_impl::CallbackServerContext*>(
286 param.server_context),
290 if (reactor == nullptr) {
291 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated as well and is constructed with an
// UNIMPLEMENTED status and an empty message.
292 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
293 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294 UnimplementedReadReactor<RequestType>(
295 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
298 reader->SetupReactor(reactor);
// Reactor factory; same signature as the constructor argument.
302 std::function<ServerReadReactor<RequestType>*(
303 ::grpc_impl::CallbackServerContext*, ResponseType*)>
// Per-call state for a client-streaming callback RPC. The handler constructs
// it in the call arena; it destroys itself in CallOnDone. The response object
// (resp_) is referenced through response() and Finish.
306 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
// Sends the final status, together with initial metadata if that has not been
// sent yet.
308 void Finish(::grpc::Status s) override {
309 // A finish tag with only MaybeDone can have its callback inlined
310 // even if OnDone is not inlineable, because this callback just
311 // checks a ref and then decides whether or not to dispatch OnDone.
312 finish_tag_.Set(call_.call(),
314 // Inlineable OnDone can be false here because there is
315 // no read reactor that has an inlineable OnDone; this
316 // only applies to the DefaultReactor (which is unary).
317 this->MaybeDone(/*inlineable_ondone=*/false);
319 &finish_ops_, /*can_inline=*/true);
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent yet.
320 if (!ctx_->sent_initial_metadata_) {
321 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 finish_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ = true;
328 // The response is dropped if the status is not OK.
// In the first form below, the status sent is whatever SendMessagePtr returns
// for resp_; in the second form the caller-supplied status is sent.
330 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331 finish_ops_.SendMessagePtr(&resp_));
333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335 finish_ops_.set_core_cq_tag(&finish_tag_);
336 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own batch. Asserts that it has not been sent
// already and marks it as sent before the ops are started.
339 void SendInitialMetadata() override {
340 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
342 // The callback for this function should not be inlined because it invokes
343 // a user-controlled reaction, but any resulting OnDone can be inlined in
344 // the executor to which this callback is dispatched.
345 meta_tag_.Set(call_.call(),
347 ServerReadReactor<RequestType>* reactor =
348 reactor_.load(std::memory_order_relaxed);
349 reactor->OnSendInitialMetadataDone(ok);
350 this->MaybeDone(/*inlineable_ondone=*/true);
352 &meta_ops_, /*can_inline=*/false);
353 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354 ctx_->initial_metadata_flags());
355 if (ctx_->compression_level_set()) {
356 meta_ops_.set_compression_level(ctx_->compression_level());
358 ctx_->sent_initial_metadata_ = true;
359 meta_ops_.set_core_cq_tag(&meta_tag_);
360 call_.PerformOps(&meta_ops_);
// Starts receiving one message into *req. Completion is reported through
// read_tag_, whose callback (installed in SetupReactor) calls OnReadDone.
363 void Read(RequestType* req) override {
365 read_ops_.RecvMessage(req);
366 call_.PerformOps(&read_ops_);
// The handler constructs this object and calls SetupReactor on it.
370 friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
// Stores the context, a copy of the Call and the call requester.
372 ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
373 ::grpc::internal::Call* call,
374 std::function<void()> call_requester)
375 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Records the reactor, installs the read tag, binds the reactor, possibly
// calls OnCancel and then calls MaybeDone.
377 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378 reactor_.store(reactor, std::memory_order_relaxed);
379 // The callback for this function should not be inlined because it invokes
380 // a user-controlled reaction, but any resulting OnDone can be inlined in
381 // the executor to which this callback is dispatched.
382 read_tag_.Set(call_.call(),
383 [this, reactor](bool ok) {
384 reactor->OnReadDone(ok);
385 this->MaybeDone(/*inlineable_ondone=*/true);
387 &read_ops_, /*can_inline=*/false);
388 read_ops_.set_core_cq_tag(&read_tag_);
389 this->BindReactor(reactor);
390 this->MaybeCallOnCancel(reactor);
391 // Inlineable OnDone can be false here because there is no read
392 // reactor that has an inlineable OnDone; this only applies to the
393 // DefaultReactor (which is unary).
394 this->MaybeDone(/*inlineable_ondone=*/false);
// Empty destructor; CallOnDone invokes it explicitly.
397 ~ServerCallbackReaderImpl() {}
// Returns a pointer to the response object stored in this object.
399 ResponseType* response() { return &resp_; }
// Final step of the RPC: runs the reactor OnDone, destroys this object in
// place and drops the call ref. The call pointer and call_requester_ are
// copied or moved into locals first, since members must not be touched after
// the explicit destructor call.
401 void CallOnDone() override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
403 grpc_call* call = call_.call();
404 auto call_requester = std::move(call_requester_);
405 this->~ServerCallbackReaderImpl(); // explicitly call destructor
406 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor pointer.
410 ServerReactor* reactor() override {
411 return reactor_.load(std::memory_order_relaxed);
// Op set and completion tag used by SendInitialMetadata.
414 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
416 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and completion tag used by Finish (metadata, message and status).
417 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
418 ::grpc::internal::CallOpSendMessage,
419 ::grpc::internal::CallOpServerSendStatus>
421 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and completion tag used by Read.
422 ::grpc::internal::CallOpSet<
423 ::grpc::internal::CallOpRecvMessage<RequestType>>
425 ::grpc::internal::CallbackWithSuccessTag read_tag_;
427 ::grpc_impl::CallbackServerContext* const ctx_;
428 ::grpc::internal::Call call_;
430 std::function<void()> call_requester_;
431 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432 std::atomic<ServerReadReactor<RequestType>*> reactor_;
433 // callbacks_outstanding_ follows a refcount pattern
434 std::atomic<intptr_t> callbacks_outstanding_{
435 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for server-streaming RPCs. For each call, RunHandler
// arena-allocates a ServerCallbackWriterImpl, picks a ServerWriteReactor and
// binds the two with SetupReactor. Deserialize builds the request object in
// the call arena.
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
// Stores (by move) the factory that yields the ServerWriteReactor for a call.
442 explicit CallbackServerStreamingHandler(
443 std::function<ServerWriteReactor<ResponseType>*(
444 ::grpc_impl::CallbackServerContext*, const RequestType*)>
446 : get_reactor_(std::move(get_reactor)) {}
// Per-call entry point: takes a ref on the core call, constructs the writer in
// the call arena (handing it param.request cast to RequestType*), registers
// the completion op, selects a reactor and finishes with SetupReactor.
447 void RunHandler(const HandlerParameter& param) final {
448 // Arena allocate a writer structure
// Paired with the grpc_call_unref in ServerCallbackWriterImpl::CallOnDone.
449 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
451 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
452 param.call->call(), sizeof(ServerCallbackWriterImpl)))
453 ServerCallbackWriterImpl(
454 static_cast<::grpc_impl::CallbackServerContext*>(
455 param.server_context),
456 param.call, static_cast<RequestType*>(param.request),
457 std::move(param.call_requester));
458 // Inlineable OnDone can be false in the CompletionOp callback because there
459 // is no write reactor that has an inlineable OnDone; this only applies to
460 // the DefaultReactor (which is unary).
461 param.server_context->BeginCompletionOp(
463 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
// A user reactor is only requested when the incoming status is OK.
466 ServerWriteReactor<ResponseType>* reactor = nullptr;
467 if (param.status.ok()) {
468 reactor = ::grpc::internal::CatchingReactorGetter<
469 ServerWriteReactor<ResponseType>>(
471 static_cast<::grpc_impl::CallbackServerContext*>(
472 param.server_context),
475 if (reactor == nullptr) {
476 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated as well and is constructed with an
// UNIMPLEMENTED status and an empty message.
477 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
478 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
479 UnimplementedWriteReactor<ResponseType>(
480 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
483 writer->SetupReactor(reactor);
// Constructs a RequestType in the call arena and deserializes the incoming
// bytes into it. handler_data is unused by this handler.
486 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
487 ::grpc::Status* status, void** /*handler_data*/) final {
488 ::grpc::ByteBuffer buf;
491 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
492 call, sizeof(RequestType))) RequestType();
494 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
// Arena memory is not freed here; only the destructor is run. Presumably this
// is the deserialization-failure path, mirroring
// CallbackUnaryHandler::Deserialize -- TODO confirm.
499 request->~RequestType();
// Reactor factory; same signature as the constructor argument.
504 std::function<ServerWriteReactor<ResponseType>*(
505 ::grpc_impl::CallbackServerContext*, const RequestType*)>
// Per-call state for a server-streaming callback RPC. The handler constructs
// it in the call arena; it destroys itself (and the request object it was
// given) in CallOnDone.
508 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
// Sends the final status, together with initial metadata if that has not been
// sent yet. Any message attached by WriteAndFinish goes out in the same batch.
510 void Finish(::grpc::Status s) override {
511 // A finish tag with only MaybeDone can have its callback inlined
512 // even if OnDone is not inlineable, because this callback just
513 // checks a ref and then decides whether or not to dispatch OnDone.
514 finish_tag_.Set(call_.call(),
516 // Inlineable OnDone can be false here because there is
517 // no write reactor that has an inlineable OnDone; this
518 // only applies to the DefaultReactor (which is unary).
519 this->MaybeDone(/*inlineable_ondone=*/false);
521 &finish_ops_, /*can_inline=*/true);
522 finish_ops_.set_core_cq_tag(&finish_tag_);
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent yet.
524 if (!ctx_->sent_initial_metadata_) {
525 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526 ctx_->initial_metadata_flags());
527 if (ctx_->compression_level_set()) {
528 finish_ops_.set_compression_level(ctx_->compression_level());
530 ctx_->sent_initial_metadata_ = true;
532 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own batch. Asserts that it has not been sent
// already and marks it as sent before the ops are started.
536 void SendInitialMetadata() override {
537 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
539 // The callback for this function should not be inlined because it invokes
540 // a user-controlled reaction, but any resulting OnDone can be inlined in
541 // the executor to which this callback is dispatched.
542 meta_tag_.Set(call_.call(),
544 ServerWriteReactor<ResponseType>* reactor =
545 reactor_.load(std::memory_order_relaxed);
546 reactor->OnSendInitialMetadataDone(ok);
547 this->MaybeDone(/*inlineable_ondone=*/true);
549 &meta_ops_, /*can_inline=*/false);
550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
552 if (ctx_->compression_level_set()) {
553 meta_ops_.set_compression_level(ctx_->compression_level());
555 ctx_->sent_initial_metadata_ = true;
556 meta_ops_.set_core_cq_tag(&meta_tag_);
557 call_.PerformOps(&meta_ops_);
// Starts sending one message. If the options mark it as the last message, the
// buffer hint is set on them. Initial metadata is piggybacked on this batch
// when it has not been sent yet. A SendMessagePtr failure trips the assert.
560 void Write(const ResponseType* resp,
561 ::grpc::WriteOptions options) override {
563 if (options.is_last_message()) {
564 options.set_buffer_hint();
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ = true;
574 // TODO(vjpai): don't assert
575 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576 call_.PerformOps(&write_ops_);
// Attaches the message to finish_ops_ (not write_ops_) so it is sent as part
// of the finish batch, then delegates to Finish.
579 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580 ::grpc::Status s) override {
581 // This combines the write into the finish callback
582 // Don't send any message if the status is bad
584 // TODO(vjpai): don't assert
585 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
587 Finish(std::move(s));
// The handler constructs this object and calls SetupReactor on it.
591 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
// Stores the call requester along with the other constructor arguments.
593 ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
594 ::grpc::internal::Call* call,
595 const RequestType* req,
596 std::function<void()> call_requester)
600 call_requester_(std::move(call_requester)) {}
// Records the reactor, installs the write tag, binds the reactor, possibly
// calls OnCancel and then calls MaybeDone.
602 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
603 reactor_.store(reactor, std::memory_order_relaxed);
604 // The callback for this function should not be inlined because it invokes
605 // a user-controlled reaction, but any resulting OnDone can be inlined in
606 // the executor to which this callback is dispatched.
607 write_tag_.Set(call_.call(),
608 [this, reactor](bool ok) {
609 reactor->OnWriteDone(ok);
610 this->MaybeDone(/*inlineable_ondone=*/true);
612 &write_ops_, /*can_inline=*/false);
613 write_ops_.set_core_cq_tag(&write_tag_);
614 this->BindReactor(reactor);
615 this->MaybeCallOnCancel(reactor);
616 // Inlineable OnDone can be false here because there is no write
617 // reactor that has an inlineable OnDone; this only applies to the
618 // DefaultReactor (which is unary).
619 this->MaybeDone(/*inlineable_ondone=*/false);
// Runs the request destructor in place without freeing memory. Presumably the
// request is the object the handler Deserialize constructed in the call
// arena -- TODO confirm.
621 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
623 const RequestType* request() { return req_; }
// Final step of the RPC: runs the reactor OnDone, destroys this object in
// place and drops the call ref. The call pointer and call_requester_ are
// copied or moved into locals first, since members must not be touched after
// the explicit destructor call.
625 void CallOnDone() override {
626 reactor_.load(std::memory_order_relaxed)->OnDone();
627 grpc_call* call = call_.call();
628 auto call_requester = std::move(call_requester_);
629 this->~ServerCallbackWriterImpl(); // explicitly call destructor
630 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor pointer.
634 ServerReactor* reactor() override {
635 return reactor_.load(std::memory_order_relaxed);
// Op set and completion tag used by SendInitialMetadata.
638 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
640 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and completion tag used by Finish and WriteAndFinish.
641 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
642 ::grpc::internal::CallOpSendMessage,
643 ::grpc::internal::CallOpServerSendStatus>
645 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and completion tag used by Write.
646 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
647 ::grpc::internal::CallOpSendMessage>
649 ::grpc::internal::CallbackWithSuccessTag write_tag_;
651 ::grpc_impl::CallbackServerContext* const ctx_;
652 ::grpc::internal::Call call_;
653 const RequestType* req_;
654 std::function<void()> call_requester_;
655 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
656 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
657 // callbacks_outstanding_ follows a refcount pattern
658 std::atomic<intptr_t> callbacks_outstanding_{
659 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for bidirectional-streaming RPCs. For each call, RunHandler
// arena-allocates a ServerCallbackReaderWriterImpl, picks a ServerBidiReactor
// and binds the two with SetupReactor.
663 template <class RequestType, class ResponseType>
664 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
// Stores (by move) the factory that yields the ServerBidiReactor for a call.
666 explicit CallbackBidiHandler(
667 std::function<ServerBidiReactor<RequestType, ResponseType>*(
668 ::grpc_impl::CallbackServerContext*)>
670 : get_reactor_(std::move(get_reactor)) {}
// Per-call entry point: takes a ref on the core call, constructs the stream
// object in the call arena, registers the completion op, selects a reactor
// and finishes with SetupReactor.
671 void RunHandler(const HandlerParameter& param) final {
// Paired with the grpc_call_unref in
// ServerCallbackReaderWriterImpl::CallOnDone.
672 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
674 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
675 param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
676 ServerCallbackReaderWriterImpl(
677 static_cast<::grpc_impl::CallbackServerContext*>(
678 param.server_context),
679 param.call, std::move(param.call_requester));
680 // Inlineable OnDone can be false in the CompletionOp callback because there
681 // is no bidi reactor that has an inlineable OnDone; this only applies to
682 // the DefaultReactor (which is unary).
683 param.server_context->BeginCompletionOp(
685 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
// A user reactor is only requested when the incoming status is OK; the factory
// receives just the server context.
688 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
689 if (param.status.ok()) {
690 reactor = ::grpc::internal::CatchingReactorGetter<
691 ServerBidiReactor<RequestType, ResponseType>>(
692 get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
693 param.server_context));
696 if (reactor == nullptr) {
697 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is arena-allocated as well and is constructed with an
// UNIMPLEMENTED status and an empty message.
698 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
700 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
701 UnimplementedBidiReactor<RequestType, ResponseType>(
702 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
705 stream->SetupReactor(reactor);
// Reactor factory; same signature as the constructor argument.
709 std::function<ServerBidiReactor<RequestType, ResponseType>*(
710 ::grpc_impl::CallbackServerContext*)>
// Per-call state for a bidirectional-streaming callback RPC. The handler
// constructs it in the call arena; it destroys itself in CallOnDone.
713 class ServerCallbackReaderWriterImpl
714 : public ServerCallbackReaderWriter<RequestType, ResponseType> {
// Sends the final status, together with initial metadata if that has not been
// sent yet. Any message attached by WriteAndFinish goes out in the same batch.
716 void Finish(::grpc::Status s) override {
717 // A finish tag with only MaybeDone can have its callback inlined
718 // even if OnDone is not inlineable, because this callback just
719 // checks a ref and then decides whether or not to dispatch OnDone.
720 finish_tag_.Set(call_.call(),
722 // Inlineable OnDone can be false here because there is
723 // no bidi reactor that has an inlineable OnDone; this
724 // only applies to the DefaultReactor (which is unary).
725 this->MaybeDone(/*inlineable_ondone=*/false);
727 &finish_ops_, /*can_inline=*/true);
728 finish_ops_.set_core_cq_tag(&finish_tag_);
// Piggyback initial metadata (and the compression level, if one was set) on
// the finish batch when it has not been sent yet.
730 if (!ctx_->sent_initial_metadata_) {
731 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
732 ctx_->initial_metadata_flags());
733 if (ctx_->compression_level_set()) {
734 finish_ops_.set_compression_level(ctx_->compression_level());
736 ctx_->sent_initial_metadata_ = true;
738 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
739 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own batch. Asserts that it has not been sent
// already and marks it as sent before the ops are started.
742 void SendInitialMetadata() override {
743 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
745 // The callback for this function should not be inlined because it invokes
746 // a user-controlled reaction, but any resulting OnDone can be inlined in
747 // the executor to which this callback is dispatched.
748 meta_tag_.Set(call_.call(),
750 ServerBidiReactor<RequestType, ResponseType>* reactor =
751 reactor_.load(std::memory_order_relaxed);
752 reactor->OnSendInitialMetadataDone(ok);
753 this->MaybeDone(/*inlineable_ondone=*/true);
755 &meta_ops_, /*can_inline=*/false);
756 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
757 ctx_->initial_metadata_flags());
758 if (ctx_->compression_level_set()) {
759 meta_ops_.set_compression_level(ctx_->compression_level());
761 ctx_->sent_initial_metadata_ = true;
762 meta_ops_.set_core_cq_tag(&meta_tag_);
763 call_.PerformOps(&meta_ops_);
// Starts sending one message. If the options mark it as the last message, the
// buffer hint is set on them. Initial metadata is piggybacked on this batch
// when it has not been sent yet. A SendMessagePtr failure trips the assert.
766 void Write(const ResponseType* resp,
767 ::grpc::WriteOptions options) override {
769 if (options.is_last_message()) {
770 options.set_buffer_hint();
772 if (!ctx_->sent_initial_metadata_) {
773 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774 ctx_->initial_metadata_flags());
775 if (ctx_->compression_level_set()) {
776 write_ops_.set_compression_level(ctx_->compression_level());
778 ctx_->sent_initial_metadata_ = true;
780 // TODO(vjpai): don't assert
781 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
782 call_.PerformOps(&write_ops_);
// Attaches the message to finish_ops_ (not write_ops_) so it is sent as part
// of the finish batch, then delegates to Finish.
785 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
786 ::grpc::Status s) override {
787 // Don't send any message if the status is bad
789 // TODO(vjpai): don't assert
790 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
792 Finish(std::move(s));
// Starts receiving one message into *req. Completion is reported through
// read_tag_, whose callback (installed in SetupReactor) calls OnReadDone.
795 void Read(RequestType* req) override {
797 read_ops_.RecvMessage(req);
798 call_.PerformOps(&read_ops_);
// The handler constructs this object and calls SetupReactor on it.
802 friend class CallbackBidiHandler<RequestType, ResponseType>;
// Stores the context, a copy of the Call and the call requester.
804 ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
805 ::grpc::internal::Call* call,
806 std::function<void()> call_requester)
807 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Records the reactor, installs the write and read tags, binds the reactor,
// possibly calls OnCancel and then calls MaybeDone.
809 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
810 reactor_.store(reactor, std::memory_order_relaxed);
811 // The callbacks for these functions should not be inlined because they
812 // invoke user-controlled reactions, but any resulting OnDones can be
813 // inlined in the executor to which a callback is dispatched.
814 write_tag_.Set(call_.call(),
815 [this, reactor](bool ok) {
816 reactor->OnWriteDone(ok);
817 this->MaybeDone(/*inlineable_ondone=*/true);
819 &write_ops_, /*can_inline=*/false);
820 write_ops_.set_core_cq_tag(&write_tag_);
821 read_tag_.Set(call_.call(),
822 [this, reactor](bool ok) {
823 reactor->OnReadDone(ok);
824 this->MaybeDone(/*inlineable_ondone=*/true);
826 &read_ops_, /*can_inline=*/false);
827 read_ops_.set_core_cq_tag(&read_tag_);
828 this->BindReactor(reactor);
829 this->MaybeCallOnCancel(reactor);
830 // Inlineable OnDone can be false here because there is no bidi
831 // reactor that has an inlineable OnDone; this only applies to the
832 // DefaultReactor (which is unary).
833 this->MaybeDone(/*inlineable_ondone=*/false);
// Final step of the RPC: runs the reactor OnDone, destroys this object in
// place and drops the call ref. The call pointer and call_requester_ are
// copied or moved into locals first, since members must not be touched after
// the explicit destructor call.
836 void CallOnDone() override {
837 reactor_.load(std::memory_order_relaxed)->OnDone();
838 grpc_call* call = call_.call();
839 auto call_requester = std::move(call_requester_);
840 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
841 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor pointer.
845 ServerReactor* reactor() override {
846 return reactor_.load(std::memory_order_relaxed);
// Op set and completion tag used by SendInitialMetadata.
849 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
851 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and completion tag used by Finish and WriteAndFinish.
852 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
853 ::grpc::internal::CallOpSendMessage,
854 ::grpc::internal::CallOpServerSendStatus>
856 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and completion tag used by Write.
857 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
858 ::grpc::internal::CallOpSendMessage>
860 ::grpc::internal::CallbackWithSuccessTag write_tag_;
// Op set and completion tag used by Read.
861 ::grpc::internal::CallOpSet<
862 ::grpc::internal::CallOpRecvMessage<RequestType>>
864 ::grpc::internal::CallbackWithSuccessTag read_tag_;
866 ::grpc_impl::CallbackServerContext* const ctx_;
867 ::grpc::internal::Call call_;
868 std::function<void()> call_requester_;
869 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
870 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
871 // callbacks_outstanding_ follows a refcount pattern
872 std::atomic<intptr_t> callbacks_outstanding_{
873 3}; // reserve for OnStarted, Finish, and CompletionOp
877 } // namespace internal
878 } // namespace grpc_impl
880 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H