3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
// Method handler for unary RPCs. It stores a reactor factory (get_reactor_)
// and an optional message allocator (allocator_), and drives a
// ServerCallbackUnaryImpl for each call it handles.
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
// The constructor takes a factory that maps (server context, request,
// response) to a ServerUnaryReactor* and moves it into get_reactor_.
33 explicit CallbackUnaryHandler(
34 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35 const RequestType*, ResponseType*)>
37 : get_reactor_(std::move(get_reactor)) {}
// Records the allocator; Deserialize consults it (when non-null) to obtain
// the message holder for a call.
39 void SetMessageAllocator(
40 MessageAllocator<RequestType, ResponseType>* allocator) {
41 allocator_ = allocator;
// Handles one call: takes a ref on the call, placement-constructs a
// ServerCallbackUnaryImpl in memory obtained from grpc_call_arena_alloc,
// begins the completion op, obtains a reactor, and hands it to SetupReactor.
44 void RunHandler(const HandlerParameter& param) final {
45 // Arena allocate a controller structure (that includes request/response)
46 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
47 auto* allocator_state =
48 static_cast<MessageHolder<RequestType, ResponseType>*>(
51 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
52 param.call->call(), sizeof(ServerCallbackUnaryImpl)))
53 ServerCallbackUnaryImpl(
54 static_cast<::grpc::CallbackServerContext*>(param.server_context),
55 param.call, allocator_state, param.call_requester);
// The completion-op callback ignores its bool argument and only calls
// MaybeDone on the controller.
56 param.server_context->BeginCompletionOp(
57 param.call, [call](bool) { call->MaybeDone(); }, call);
// The user factory is only consulted when the incoming status is OK;
// otherwise reactor stays null and the fallback below is used.
59 ServerUnaryReactor* reactor = nullptr;
60 if (param.status.ok()) {
61 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63 static_cast<::grpc::CallbackServerContext*>(param.server_context),
64 call->request(), call->response());
67 if (reactor == nullptr) {
68 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is also placement-constructed in call-arena memory and
// carries an UNIMPLEMENTED status with an empty message.
69 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
70 param.call->call(), sizeof(UnimplementedUnaryReactor)))
71 UnimplementedUnaryReactor(
72 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
75 /// Invoke SetupReactor as the last part of the handler
76 call->SetupReactor(reactor);
// Obtains a message holder (from allocator_ when one is set, otherwise a
// DefaultMessageHolder placement-constructed in call-arena memory), publishes
// it through *handler_data, and deserializes into the holder's request.
79 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
80 ::grpc::Status* status, void** handler_data) final {
81 ::grpc::ByteBuffer buf;
83 RequestType* request = nullptr;
84 MessageHolder<RequestType, ResponseType>* allocator_state = nullptr;
85 if (allocator_ != nullptr) {
86 allocator_state = allocator_->AllocateMessages();
89 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
90 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
91 DefaultMessageHolder<RequestType, ResponseType>();
// RunHandler later reads this holder back (as allocator_state) and passes it
// to the ServerCallbackUnaryImpl constructor.
93 *handler_data = allocator_state;
94 request = allocator_state->request();
96 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
101 // Clean up on deserialization failure.
102 allocator_state->Release();
// Reactor factory supplied at construction.
107 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
108 const RequestType*, ResponseType*)>
// Optional message allocator; null until SetMessageAllocator is called.
110 MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
// Per-call ServerCallbackUnary implementation. CallbackUnaryHandler (a friend,
// see below) constructs it, binds a reactor via SetupReactor, and the object
// destroys itself in CallOnDone.
112 class ServerCallbackUnaryImpl : public ServerCallbackUnary {
// Finishes the call with status s: attaches initial metadata to finish_ops_
// if it has not been sent yet, sets the status, and submits finish_ops_.
114 void Finish(::grpc::Status s) override {
115 // A callback that only contains a call to MaybeDone can be run as an
116 // inline callback regardless of whether or not OnDone is inlineable
117 // because if the actual OnDone callback needs to be scheduled, MaybeDone
118 // is responsible for dispatching to an executor thread if needed. Thus,
119 // when setting up the finish_tag_, we can set its own callback to
// inlineable (see can_inline=true below).
125 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
127 &finish_ops_, /*can_inline=*/true);
128 finish_ops_.set_core_cq_tag(&finish_tag_);
// Initial metadata (and the compression level, if one is set) rides along
// with the finish ops when it has not been sent separately.
130 if (!ctx_->sent_initial_metadata_) {
131 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
132 ctx_->initial_metadata_flags());
133 if (ctx_->compression_level_set()) {
134 finish_ops_.set_compression_level(ctx_->compression_level());
136 ctx_->sent_initial_metadata_ = true;
138 // The response is dropped if the status is not OK.
// Two ServerSendStatus forms follow: one forwards the status returned by
// SendMessagePtr(response()), the other forwards s unchanged.
140 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
141 finish_ops_.SendMessagePtr(response()));
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
// NOTE(review): the cq tag was already set on finish_ops_ earlier in this
// method; this second call looks redundant. Confirm before removing.
145 finish_ops_.set_core_cq_tag(&finish_tag_);
146 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own op set. Asserts that it has not been sent
// already, then marks it sent and submits meta_ops_.
149 void SendInitialMetadata() override {
150 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
152 // The callback for this function should not be marked inline because it
153 // is directly invoking a user-controlled reaction
154 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
155 // thread. However, any OnDone needed after that can be inlined because it
156 // is already running on an executor thread.
160 ServerUnaryReactor* reactor =
161 reactor_.load(std::memory_order_relaxed);
162 reactor->OnSendInitialMetadataDone(ok);
163 this->MaybeDone(/*inlineable_ondone=*/true);
165 &meta_ops_, /*can_inline=*/false);
166 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
167 ctx_->initial_metadata_flags());
168 if (ctx_->compression_level_set()) {
169 meta_ops_.set_compression_level(ctx_->compression_level());
171 ctx_->sent_initial_metadata_ = true;
172 meta_ops_.set_core_cq_tag(&meta_tag_);
173 call_.PerformOps(&meta_ops_);
// The handler needs access to the constructor, SetupReactor, request() and
// response(), which it calls from RunHandler.
177 friend class CallbackUnaryHandler<RequestType, ResponseType>;
// Stores the message holder and the call requester, and registers the holder
// on the server context via set_message_allocator_state.
179 ServerCallbackUnaryImpl(
180 ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
181 MessageHolder<RequestType, ResponseType>* allocator_state,
182 std::function<void()> call_requester)
185 allocator_state_(allocator_state),
186 call_requester_(std::move(call_requester)) {
187 ctx_->set_message_allocator_state(allocator_state);
190 /// SetupReactor binds the reactor (which also releases any queued
191 /// operations), maybe calls OnCancel if possible/needed, and maybe marks
192 /// the completion of the RPC. This should be the last component of the
/// handler.
194 void SetupReactor(ServerUnaryReactor* reactor) {
195 reactor_.store(reactor, std::memory_order_relaxed);
196 this->BindReactor(reactor);
197 this->MaybeCallOnCancel(reactor);
198 this->MaybeDone(reactor->InternalInlineable());
// Request and response both live in the message holder.
201 const RequestType* request() { return allocator_state_->request(); }
202 ResponseType* response() { return allocator_state_->response(); }
// Runs the reactor's OnDone and then tears this object down. The call handle
// and the call requester are copied/moved into locals first because the
// members are gone once the explicit destructor call below has run.
204 void CallOnDone() override {
205 reactor_.load(std::memory_order_relaxed)->OnDone();
206 grpc_call* call = call_.call();
207 auto call_requester = std::move(call_requester_);
208 allocator_state_->Release();
// The context is handed back to its allocator only when one is registered.
209 if (ctx_->context_allocator() != nullptr) {
210 ctx_->context_allocator()->Release(ctx_);
212 this->~ServerCallbackUnaryImpl(); // explicitly call destructor
213 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor*.
217 ServerReactor* reactor() override {
218 return reactor_.load(std::memory_order_relaxed);
// Op set and tag used by SendInitialMetadata.
221 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
223 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag used by Finish (initial metadata + message + status).
224 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
225 ::grpc::internal::CallOpSendMessage,
226 ::grpc::internal::CallOpServerSendStatus>
228 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
230 ::grpc::CallbackServerContext* const ctx_;
231 ::grpc::internal::Call call_;
232 MessageHolder<RequestType, ResponseType>* const allocator_state_;
233 std::function<void()> call_requester_;
234 // reactor_ can always be loaded/stored with relaxed memory ordering because
235 // its value is only set once, independently of other data in the object,
236 // and the loads that use it will always actually come provably later even
237 // though they are from different threads since they are triggered by
238 // actions initiated only by the setting up of the reactor_ variable. In
239 // a sense, it's a delayed "const": it gets its value from the SetupReactor
240 // method (not the constructor, so it's not a true const), but it doesn't
241 // change after that and it only gets used by actions caused, directly or
242 // indirectly, by that setup. This comment also applies to the reactor_
243 // variables of the other streaming objects in this file.
244 std::atomic<ServerUnaryReactor*> reactor_;
245 // callbacks_outstanding_ follows a refcount pattern
246 std::atomic<intptr_t> callbacks_outstanding_{
247 3}; // reserve for start, Finish, and CompletionOp
// Method handler for client-streaming RPCs. It stores a reactor factory and
// drives a ServerCallbackReaderImpl for each call it handles.
251 template <class RequestType, class ResponseType>
252 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
// The constructor takes a factory that maps (server context, response) to a
// ServerReadReactor<RequestType>* and moves it into get_reactor_.
254 explicit CallbackClientStreamingHandler(
255 std::function<ServerReadReactor<RequestType>*(
256 ::grpc::CallbackServerContext*, ResponseType*)>
258 : get_reactor_(std::move(get_reactor)) {}
// Handles one call: takes a ref on the call, placement-constructs the reader
// in call-arena memory, begins the completion op, obtains a reactor, and
// hands it to SetupReactor.
259 void RunHandler(const HandlerParameter& param) final {
260 // Arena allocate a reader structure (that includes response)
261 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
263 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
264 param.call->call(), sizeof(ServerCallbackReaderImpl)))
265 ServerCallbackReaderImpl(
266 static_cast<::grpc::CallbackServerContext*>(param.server_context),
267 param.call, param.call_requester);
268 // Inlineable OnDone can be false in the CompletionOp callback because there
269 // is no read reactor that has an inlineable OnDone; this only applies to
270 // the DefaultReactor (which is unary).
271 param.server_context->BeginCompletionOp(
273 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
// The user factory is only consulted when the incoming status is OK;
// otherwise reactor stays null and the fallback below is used.
276 ServerReadReactor<RequestType>* reactor = nullptr;
277 if (param.status.ok()) {
278 reactor = ::grpc::internal::CatchingReactorGetter<
279 ServerReadReactor<RequestType>>(
281 static_cast<::grpc::CallbackServerContext*>(param.server_context),
285 if (reactor == nullptr) {
286 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is placement-constructed in call-arena memory and
// carries an UNIMPLEMENTED status with an empty message.
287 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
288 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
289 UnimplementedReadReactor<RequestType>(
290 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
293 reader->SetupReactor(reactor);
// Reactor factory supplied at construction.
297 std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
// Per-call ServerCallbackReader implementation for client-streaming RPCs.
// CallbackClientStreamingHandler (a friend, see below) constructs it and
// binds a reactor via SetupReactor; the object destroys itself in CallOnDone.
301 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
// Finishes the call with status s: attaches initial metadata to finish_ops_
// if it has not been sent yet, sets the status, and submits finish_ops_.
303 void Finish(::grpc::Status s) override {
304 // A finish tag with only MaybeDone can have its callback inlined
305 // regardless even if OnDone is not inlineable because this callback just
306 // checks a ref and then decides whether or not to dispatch OnDone.
310 // Inlineable OnDone can be false here because there is
311 // no read reactor that has an inlineable OnDone; this
312 // only applies to the DefaultReactor (which is unary).
313 this->MaybeDone(/*inlineable_ondone=*/false);
315 &finish_ops_, /*can_inline=*/true);
316 if (!ctx_->sent_initial_metadata_) {
317 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
318 ctx_->initial_metadata_flags());
319 if (ctx_->compression_level_set()) {
320 finish_ops_.set_compression_level(ctx_->compression_level());
322 ctx_->sent_initial_metadata_ = true;
324 // The response is dropped if the status is not OK.
// Two ServerSendStatus forms follow: one forwards the status returned by
// SendMessagePtr(&resp_), the other forwards s unchanged.
326 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327 finish_ops_.SendMessagePtr(&resp_));
329 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
331 finish_ops_.set_core_cq_tag(&finish_tag_);
332 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own op set. Asserts that it has not been sent
// already, then marks it sent and submits meta_ops_.
335 void SendInitialMetadata() override {
336 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
338 // The callback for this function should not be inlined because it invokes
339 // a user-controlled reaction, but any resulting OnDone can be inlined in
340 // the executor to which this callback is dispatched.
344 ServerReadReactor<RequestType>* reactor =
345 reactor_.load(std::memory_order_relaxed);
346 reactor->OnSendInitialMetadataDone(ok);
347 this->MaybeDone(/*inlineable_ondone=*/true);
349 &meta_ops_, /*can_inline=*/false);
350 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
351 ctx_->initial_metadata_flags());
352 if (ctx_->compression_level_set()) {
353 meta_ops_.set_compression_level(ctx_->compression_level());
355 ctx_->sent_initial_metadata_ = true;
356 meta_ops_.set_core_cq_tag(&meta_tag_);
357 call_.PerformOps(&meta_ops_);
// Registers req as the receive target of read_ops_ and submits the op set.
// Completion is reported through the read tag configured in SetupReactor.
360 void Read(RequestType* req) override {
362 read_ops_.RecvMessage(req);
363 call_.PerformOps(&read_ops_);
// The handler needs access to the constructor, SetupReactor and response().
367 friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
// Copies the Call object and stores the context pointer and call requester.
369 ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
370 ::grpc::internal::Call* call,
371 std::function<void()> call_requester)
372 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Stores the reactor, wires the read-completion callback onto read_ops_,
// binds the reactor, possibly calls OnCancel, and finally calls MaybeDone.
374 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
375 reactor_.store(reactor, std::memory_order_relaxed);
376 // The callback for this function should not be inlined because it invokes
377 // a user-controlled reaction, but any resulting OnDone can be inlined in
378 // the executor to which this callback is dispatched.
381 [this, reactor](bool ok) {
// A failed read is reported to the context before the reactor is notified.
382 if (GPR_UNLIKELY(!ok)) {
383 ctx_->MaybeMarkCancelledOnRead();
385 reactor->OnReadDone(ok);
386 this->MaybeDone(/*inlineable_ondone=*/true);
388 &read_ops_, /*can_inline=*/false);
389 read_ops_.set_core_cq_tag(&read_tag_);
390 this->BindReactor(reactor);
391 this->MaybeCallOnCancel(reactor);
392 // Inlineable OnDone can be false here because there is no read
393 // reactor that has an inlineable OnDone; this only applies to the
394 // DefaultReactor (which is unary).
395 this->MaybeDone(/*inlineable_ondone=*/false);
// Empty destructor body; CallOnDone invokes it explicitly.
398 ~ServerCallbackReaderImpl() {}
// The response object is a member (resp_) of this reader.
400 ResponseType* response() { return &resp_; }
// Runs the reactor's OnDone and then tears this object down. The call handle
// and the call requester are copied/moved into locals first because the
// members are gone once the explicit destructor call below has run.
402 void CallOnDone() override {
403 reactor_.load(std::memory_order_relaxed)->OnDone();
404 grpc_call* call = call_.call();
405 auto call_requester = std::move(call_requester_);
406 if (ctx_->context_allocator() != nullptr) {
407 ctx_->context_allocator()->Release(ctx_);
409 this->~ServerCallbackReaderImpl(); // explicitly call destructor
410 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor*.
414 ServerReactor* reactor() override {
415 return reactor_.load(std::memory_order_relaxed);
// Op set and tag used by SendInitialMetadata.
418 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
420 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag used by Finish (initial metadata + message + status).
421 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
422 ::grpc::internal::CallOpSendMessage,
423 ::grpc::internal::CallOpServerSendStatus>
425 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag used by Read.
426 ::grpc::internal::CallOpSet<
427 ::grpc::internal::CallOpRecvMessage<RequestType>>
429 ::grpc::internal::CallbackWithSuccessTag read_tag_;
431 ::grpc::CallbackServerContext* const ctx_;
432 ::grpc::internal::Call call_;
434 std::function<void()> call_requester_;
435 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
436 std::atomic<ServerReadReactor<RequestType>*> reactor_;
437 // callbacks_outstanding_ follows a refcount pattern
438 std::atomic<intptr_t> callbacks_outstanding_{
439 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for server-streaming RPCs. It stores a reactor factory and
// drives a ServerCallbackWriterImpl for each call it handles.
443 template <class RequestType, class ResponseType>
444 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
// The constructor takes a factory that maps (server context, request) to a
// ServerWriteReactor<ResponseType>* and moves it into get_reactor_.
446 explicit CallbackServerStreamingHandler(
447 std::function<ServerWriteReactor<ResponseType>*(
448 ::grpc::CallbackServerContext*, const RequestType*)>
450 : get_reactor_(std::move(get_reactor)) {}
// Handles one call: takes a ref on the call, placement-constructs the writer
// in call-arena memory (passing it param.request cast to RequestType*),
// begins the completion op, obtains a reactor, and hands it to SetupReactor.
451 void RunHandler(const HandlerParameter& param) final {
452 // Arena allocate a writer structure
453 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
455 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
456 param.call->call(), sizeof(ServerCallbackWriterImpl)))
457 ServerCallbackWriterImpl(
458 static_cast<::grpc::CallbackServerContext*>(param.server_context),
459 param.call, static_cast<RequestType*>(param.request),
460 param.call_requester);
461 // Inlineable OnDone can be false in the CompletionOp callback because there
462 // is no write reactor that has an inlineable OnDone; this only applies to
463 // the DefaultReactor (which is unary).
464 param.server_context->BeginCompletionOp(
466 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
// The user factory is only consulted when the incoming status is OK;
// otherwise reactor stays null and the fallback below is used.
469 ServerWriteReactor<ResponseType>* reactor = nullptr;
470 if (param.status.ok()) {
471 reactor = ::grpc::internal::CatchingReactorGetter<
472 ServerWriteReactor<ResponseType>>(
474 static_cast<::grpc::CallbackServerContext*>(param.server_context),
477 if (reactor == nullptr) {
478 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is placement-constructed in call-arena memory and
// carries an UNIMPLEMENTED status with an empty message.
479 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
480 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
481 UnimplementedWriteReactor<ResponseType>(
482 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
485 writer->SetupReactor(reactor);
// Placement-constructs a RequestType in call-arena memory and deserializes
// the incoming buffer into it. handler_data is unused by this handler.
488 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
489 ::grpc::Status* status, void** /*handler_data*/) final {
490 ::grpc::ByteBuffer buf;
493 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
494 call, sizeof(RequestType))) RequestType();
496 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
// Explicit destructor call: the request was placement-constructed above, so
// it is destroyed in place rather than deleted.
501 request->~RequestType();
// Reactor factory supplied at construction.
506 std::function<ServerWriteReactor<ResponseType>*(
507 ::grpc::CallbackServerContext*, const RequestType*)>
// Per-call ServerCallbackWriter implementation for server-streaming RPCs.
// CallbackServerStreamingHandler (a friend, see below) constructs it and
// binds a reactor via SetupReactor; the object destroys itself in CallOnDone.
510 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
// Finishes the call with status s: attaches initial metadata to finish_ops_
// if it has not been sent yet, sets the status, and submits finish_ops_.
512 void Finish(::grpc::Status s) override {
513 // A finish tag with only MaybeDone can have its callback inlined
514 // regardless even if OnDone is not inlineable because this callback just
515 // checks a ref and then decides whether or not to dispatch OnDone.
519 // Inlineable OnDone can be false here because there is
520 // no write reactor that has an inlineable OnDone; this
521 // only applies to the DefaultReactor (which is unary).
522 this->MaybeDone(/*inlineable_ondone=*/false);
524 &finish_ops_, /*can_inline=*/true);
525 finish_ops_.set_core_cq_tag(&finish_tag_);
527 if (!ctx_->sent_initial_metadata_) {
528 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
529 ctx_->initial_metadata_flags());
530 if (ctx_->compression_level_set()) {
531 finish_ops_.set_compression_level(ctx_->compression_level());
533 ctx_->sent_initial_metadata_ = true;
535 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
536 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own op set. Asserts that it has not been sent
// already, then marks it sent and submits meta_ops_.
539 void SendInitialMetadata() override {
540 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
542 // The callback for this function should not be inlined because it invokes
543 // a user-controlled reaction, but any resulting OnDone can be inlined in
544 // the executor to which this callback is dispatched.
548 ServerWriteReactor<ResponseType>* reactor =
549 reactor_.load(std::memory_order_relaxed);
550 reactor->OnSendInitialMetadataDone(ok);
551 this->MaybeDone(/*inlineable_ondone=*/true);
553 &meta_ops_, /*can_inline=*/false);
554 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
555 ctx_->initial_metadata_flags());
556 if (ctx_->compression_level_set()) {
557 meta_ops_.set_compression_level(ctx_->compression_level());
559 ctx_->sent_initial_metadata_ = true;
560 meta_ops_.set_core_cq_tag(&meta_tag_);
561 call_.PerformOps(&meta_ops_);
// Queues one response message on write_ops_ and submits it. Initial metadata
// is attached to the same op set when it has not been sent yet.
564 void Write(const ResponseType* resp,
565 ::grpc::WriteOptions options) override {
// A message flagged as the last one also gets the buffer hint set.
567 if (options.is_last_message()) {
568 options.set_buffer_hint();
570 if (!ctx_->sent_initial_metadata_) {
571 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
572 ctx_->initial_metadata_flags());
573 if (ctx_->compression_level_set()) {
574 write_ops_.set_compression_level(ctx_->compression_level());
576 ctx_->sent_initial_metadata_ = true;
578 // TODO(vjpai): don't assert
579 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
580 call_.PerformOps(&write_ops_);
// Attaches the final message to finish_ops_ (not write_ops_) and then
// delegates to Finish, so message and status go out in one op set.
583 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
584 ::grpc::Status s) override {
585 // This combines the write into the finish callback
586 // TODO(vjpai): don't assert
587 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
588 Finish(std::move(s));
// The handler needs access to the constructor, SetupReactor and request().
592 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
// Receives the context, the call, the deserialized request pointer, and the
// call requester (moved into call_requester_).
594 ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
595 ::grpc::internal::Call* call,
596 const RequestType* req,
597 std::function<void()> call_requester)
601 call_requester_(std::move(call_requester)) {}
// Stores the reactor, wires the write-completion callback onto write_ops_,
// binds the reactor, possibly calls OnCancel, and finally calls MaybeDone.
603 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
604 reactor_.store(reactor, std::memory_order_relaxed);
605 // The callback for this function should not be inlined because it invokes
606 // a user-controlled reaction, but any resulting OnDone can be inlined in
607 // the executor to which this callback is dispatched.
610 [this, reactor](bool ok) {
611 reactor->OnWriteDone(ok);
612 this->MaybeDone(/*inlineable_ondone=*/true);
614 &write_ops_, /*can_inline=*/false);
615 write_ops_.set_core_cq_tag(&write_tag_);
616 this->BindReactor(reactor);
617 this->MaybeCallOnCancel(reactor);
618 // Inlineable OnDone can be false here because there is no write
619 // reactor that has an inlineable OnDone; this only applies to the
620 // DefaultReactor (which is unary).
621 this->MaybeDone(/*inlineable_ondone=*/false);
// Destroys the request in place when one was provided (req_ may be null).
623 ~ServerCallbackWriterImpl() {
624 if (req_ != nullptr) {
625 req_->~RequestType();
629 const RequestType* request() { return req_; }
// Runs the reactor's OnDone and then tears this object down. The call handle
// and the call requester are copied/moved into locals first because the
// members are gone once the explicit destructor call below has run.
631 void CallOnDone() override {
632 reactor_.load(std::memory_order_relaxed)->OnDone();
633 grpc_call* call = call_.call();
634 auto call_requester = std::move(call_requester_);
635 if (ctx_->context_allocator() != nullptr) {
636 ctx_->context_allocator()->Release(ctx_);
638 this->~ServerCallbackWriterImpl(); // explicitly call destructor
639 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor*.
643 ServerReactor* reactor() override {
644 return reactor_.load(std::memory_order_relaxed);
// Op set and tag used by SendInitialMetadata.
647 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
649 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag used by Finish and WriteAndFinish.
650 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
651 ::grpc::internal::CallOpSendMessage,
652 ::grpc::internal::CallOpServerSendStatus>
654 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag used by Write.
655 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
656 ::grpc::internal::CallOpSendMessage>
658 ::grpc::internal::CallbackWithSuccessTag write_tag_;
660 ::grpc::CallbackServerContext* const ctx_;
661 ::grpc::internal::Call call_;
662 const RequestType* req_;
663 std::function<void()> call_requester_;
664 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
665 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
666 // callbacks_outstanding_ follows a refcount pattern
667 std::atomic<intptr_t> callbacks_outstanding_{
668 3}; // reserve for OnStarted, Finish, and CompletionOp
// Method handler for bidirectional-streaming RPCs. It stores a reactor
// factory and drives a ServerCallbackReaderWriterImpl for each call.
672 template <class RequestType, class ResponseType>
673 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
// The constructor takes a factory that maps a server context to a
// ServerBidiReactor<RequestType, ResponseType>* and moves it into
// get_reactor_.
675 explicit CallbackBidiHandler(
676 std::function<ServerBidiReactor<RequestType, ResponseType>*(
677 ::grpc::CallbackServerContext*)>
679 : get_reactor_(std::move(get_reactor)) {}
// Handles one call: takes a ref on the call, placement-constructs the stream
// object in call-arena memory, begins the completion op, obtains a reactor,
// and hands it to SetupReactor.
680 void RunHandler(const HandlerParameter& param) final {
681 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
683 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
684 param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
685 ServerCallbackReaderWriterImpl(
686 static_cast<::grpc::CallbackServerContext*>(param.server_context),
687 param.call, param.call_requester);
688 // Inlineable OnDone can be false in the CompletionOp callback because there
689 // is no bidi reactor that has an inlineable OnDone; this only applies to
690 // the DefaultReactor (which is unary).
691 param.server_context->BeginCompletionOp(
693 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
// The user factory is only consulted when the incoming status is OK;
// otherwise reactor stays null and the fallback below is used.
696 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
697 if (param.status.ok()) {
698 reactor = ::grpc::internal::CatchingReactorGetter<
699 ServerBidiReactor<RequestType, ResponseType>>(
701 static_cast<::grpc::CallbackServerContext*>(param.server_context));
704 if (reactor == nullptr) {
705 // if deserialization or reactor creator failed, we need to fail the call
// The fallback reactor is placement-constructed in call-arena memory and
// carries an UNIMPLEMENTED status with an empty message.
706 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
708 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
709 UnimplementedBidiReactor<RequestType, ResponseType>(
710 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
713 stream->SetupReactor(reactor);
// Reactor factory supplied at construction.
717 std::function<ServerBidiReactor<RequestType, ResponseType>*(
718 ::grpc::CallbackServerContext*)>
// Per-call ServerCallbackReaderWriter implementation for bidi-streaming RPCs.
// CallbackBidiHandler (a friend, see below) constructs it and binds a reactor
// via SetupReactor; the object destroys itself in CallOnDone.
721 class ServerCallbackReaderWriterImpl
722 : public ServerCallbackReaderWriter<RequestType, ResponseType> {
// Finishes the call with status s: attaches initial metadata to finish_ops_
// if it has not been sent yet, sets the status, and submits finish_ops_.
724 void Finish(::grpc::Status s) override {
725 // A finish tag with only MaybeDone can have its callback inlined
726 // regardless even if OnDone is not inlineable because this callback just
727 // checks a ref and then decides whether or not to dispatch OnDone.
731 // Inlineable OnDone can be false here because there is
732 // no bidi reactor that has an inlineable OnDone; this
733 // only applies to the DefaultReactor (which is unary).
734 this->MaybeDone(/*inlineable_ondone=*/false);
736 &finish_ops_, /*can_inline=*/true);
737 finish_ops_.set_core_cq_tag(&finish_tag_);
739 if (!ctx_->sent_initial_metadata_) {
740 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
741 ctx_->initial_metadata_flags());
742 if (ctx_->compression_level_set()) {
743 finish_ops_.set_compression_level(ctx_->compression_level());
745 ctx_->sent_initial_metadata_ = true;
747 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
748 call_.PerformOps(&finish_ops_);
// Sends initial metadata as its own op set. Asserts that it has not been sent
// already, then marks it sent and submits meta_ops_.
751 void SendInitialMetadata() override {
752 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
754 // The callback for this function should not be inlined because it invokes
755 // a user-controlled reaction, but any resulting OnDone can be inlined in
756 // the executor to which this callback is dispatched.
760 ServerBidiReactor<RequestType, ResponseType>* reactor =
761 reactor_.load(std::memory_order_relaxed);
762 reactor->OnSendInitialMetadataDone(ok);
763 this->MaybeDone(/*inlineable_ondone=*/true);
765 &meta_ops_, /*can_inline=*/false);
766 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
767 ctx_->initial_metadata_flags());
768 if (ctx_->compression_level_set()) {
769 meta_ops_.set_compression_level(ctx_->compression_level());
771 ctx_->sent_initial_metadata_ = true;
772 meta_ops_.set_core_cq_tag(&meta_tag_);
773 call_.PerformOps(&meta_ops_);
// Queues one response message on write_ops_ and submits it. Initial metadata
// is attached to the same op set when it has not been sent yet.
776 void Write(const ResponseType* resp,
777 ::grpc::WriteOptions options) override {
// A message flagged as the last one also gets the buffer hint set.
779 if (options.is_last_message()) {
780 options.set_buffer_hint();
782 if (!ctx_->sent_initial_metadata_) {
783 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784 ctx_->initial_metadata_flags());
785 if (ctx_->compression_level_set()) {
786 write_ops_.set_compression_level(ctx_->compression_level());
788 ctx_->sent_initial_metadata_ = true;
790 // TODO(vjpai): don't assert
791 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
792 call_.PerformOps(&write_ops_);
// Attaches the final message to finish_ops_ (not write_ops_) and then
// delegates to Finish, so message and status go out in one op set.
795 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
796 ::grpc::Status s) override {
797 // TODO(vjpai): don't assert
798 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
799 Finish(std::move(s));
// Registers req as the receive target of read_ops_ and submits the op set.
// Completion is reported through the read tag configured in SetupReactor.
802 void Read(RequestType* req) override {
804 read_ops_.RecvMessage(req);
805 call_.PerformOps(&read_ops_);
// The handler needs access to the constructor and SetupReactor.
809 friend class CallbackBidiHandler<RequestType, ResponseType>;
// Copies the Call object and stores the context pointer and call requester.
811 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
812 ::grpc::internal::Call* call,
813 std::function<void()> call_requester)
814 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
// Stores the reactor, wires the write- and read-completion callbacks onto
// their op sets, binds the reactor, possibly calls OnCancel, and finally
// calls MaybeDone.
816 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
817 reactor_.store(reactor, std::memory_order_relaxed);
818 // The callbacks for these functions should not be inlined because they
819 // invoke user-controlled reactions, but any resulting OnDones can be
820 // inlined in the executor to which a callback is dispatched.
823 [this, reactor](bool ok) {
824 reactor->OnWriteDone(ok);
825 this->MaybeDone(/*inlineable_ondone=*/true);
827 &write_ops_, /*can_inline=*/false);
828 write_ops_.set_core_cq_tag(&write_tag_);
831 [this, reactor](bool ok) {
// A failed read is reported to the context before the reactor is notified.
832 if (GPR_UNLIKELY(!ok)) {
833 ctx_->MaybeMarkCancelledOnRead();
835 reactor->OnReadDone(ok);
836 this->MaybeDone(/*inlineable_ondone=*/true);
838 &read_ops_, /*can_inline=*/false);
839 read_ops_.set_core_cq_tag(&read_tag_);
840 this->BindReactor(reactor);
841 this->MaybeCallOnCancel(reactor);
842 // Inlineable OnDone can be false here because there is no bidi
843 // reactor that has an inlineable OnDone; this only applies to the
844 // DefaultReactor (which is unary).
845 this->MaybeDone(/*inlineable_ondone=*/false);
// Runs the reactor's OnDone and then tears this object down. The call handle
// and the call requester are copied/moved into locals first because the
// members are gone once the explicit destructor call below has run.
848 void CallOnDone() override {
849 reactor_.load(std::memory_order_relaxed)->OnDone();
850 grpc_call* call = call_.call();
851 auto call_requester = std::move(call_requester_);
852 if (ctx_->context_allocator() != nullptr) {
853 ctx_->context_allocator()->Release(ctx_);
855 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
856 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
// Returns the bound reactor as a ServerReactor*.
860 ServerReactor* reactor() override {
861 return reactor_.load(std::memory_order_relaxed);
// Op set and tag used by SendInitialMetadata.
864 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
866 ::grpc::internal::CallbackWithSuccessTag meta_tag_;
// Op set and tag used by Finish and WriteAndFinish.
867 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
868 ::grpc::internal::CallOpSendMessage,
869 ::grpc::internal::CallOpServerSendStatus>
871 ::grpc::internal::CallbackWithSuccessTag finish_tag_;
// Op set and tag used by Write.
872 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
873 ::grpc::internal::CallOpSendMessage>
875 ::grpc::internal::CallbackWithSuccessTag write_tag_;
// Op set and tag used by Read.
876 ::grpc::internal::CallOpSet<
877 ::grpc::internal::CallOpRecvMessage<RequestType>>
879 ::grpc::internal::CallbackWithSuccessTag read_tag_;
881 ::grpc::CallbackServerContext* const ctx_;
882 ::grpc::internal::Call call_;
883 std::function<void()> call_requester_;
884 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
885 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
886 // callbacks_outstanding_ follows a refcount pattern
887 std::atomic<intptr_t> callbacks_outstanding_{
888 3}; // reserve for OnStarted, Finish, and CompletionOp
892 } // namespace internal
895 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H