Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
26
27 namespace grpc {
28 namespace internal {
29
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38
39   void SetMessageAllocator(
40       ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
41           allocator) {
42     allocator_ = allocator;
43   }
44
45   void RunHandler(const HandlerParameter& param) final {
46     // Arena allocate a controller structure (that includes request/response)
47     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48     auto* allocator_state = static_cast<
49         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
50         param.internal_data);
51
52     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54         ServerCallbackUnaryImpl(
55             static_cast<::grpc::CallbackServerContext*>(param.server_context),
56             param.call, allocator_state, std::move(param.call_requester));
57     param.server_context->BeginCompletionOp(
58         param.call, [call](bool) { call->MaybeDone(); }, call);
59
60     ServerUnaryReactor* reactor = nullptr;
61     if (param.status.ok()) {
62       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63           get_reactor_,
64           static_cast<::grpc::CallbackServerContext*>(param.server_context),
65           call->request(), call->response());
66     }
67
68     if (reactor == nullptr) {
69       // if deserialization or reactor creator failed, we need to fail the call
70       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
71           param.call->call(), sizeof(UnimplementedUnaryReactor)))
72           UnimplementedUnaryReactor(
73               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
74     }
75
76     /// Invoke SetupReactor as the last part of the handler
77     call->SetupReactor(reactor);
78   }
79
80   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
81                     ::grpc::Status* status, void** handler_data) final {
82     ::grpc::ByteBuffer buf;
83     buf.set_buffer(req);
84     RequestType* request = nullptr;
85     ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
86         allocator_state = nullptr;
87     if (allocator_ != nullptr) {
88       allocator_state = allocator_->AllocateMessages();
89     } else {
90       allocator_state =
91           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
92               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93               DefaultMessageHolder<RequestType, ResponseType>();
94     }
95     *handler_data = allocator_state;
96     request = allocator_state->request();
97     *status =
98         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
99     buf.Release();
100     if (status->ok()) {
101       return request;
102     }
103     // Clean up on deserialization failure.
104     allocator_state->Release();
105     return nullptr;
106   }
107
108  private:
109   std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
110                                     const RequestType*, ResponseType*)>
111       get_reactor_;
112   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
113       allocator_ = nullptr;
114
115   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116    public:
117     void Finish(::grpc::Status s) override {
118       // A callback that only contains a call to MaybeDone can be run as an
119       // inline callback regardless of whether or not OnDone is inlineable
120       // because if the actual OnDone callback needs to be scheduled, MaybeDone
121       // is responsible for dispatching to an executor thread if needed. Thus,
122       // when setting up the finish_tag_, we can set its own callback to
123       // inlineable.
124       finish_tag_.Set(
125           call_.call(),
126           [this](bool) {
127             this->MaybeDone(
128                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129           },
130           &finish_ops_, /*can_inline=*/true);
131       finish_ops_.set_core_cq_tag(&finish_tag_);
132
133       if (!ctx_->sent_initial_metadata_) {
134         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135                                         ctx_->initial_metadata_flags());
136         if (ctx_->compression_level_set()) {
137           finish_ops_.set_compression_level(ctx_->compression_level());
138         }
139         ctx_->sent_initial_metadata_ = true;
140       }
141       // The response is dropped if the status is not OK.
142       if (s.ok()) {
143         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144                                      finish_ops_.SendMessagePtr(response()));
145       } else {
146         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147       }
148       finish_ops_.set_core_cq_tag(&finish_tag_);
149       call_.PerformOps(&finish_ops_);
150     }
151
152     void SendInitialMetadata() override {
153       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154       this->Ref();
155       // The callback for this function should not be marked inline because it
156       // is directly invoking a user-controlled reaction
157       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158       // thread. However, any OnDone needed after that can be inlined because it
159       // is already running on an executor thread.
160       meta_tag_.Set(call_.call(),
161                     [this](bool ok) {
162                       ServerUnaryReactor* reactor =
163                           reactor_.load(std::memory_order_relaxed);
164                       reactor->OnSendInitialMetadataDone(ok);
165                       this->MaybeDone(/*inlineable_ondone=*/true);
166                     },
167                     &meta_ops_, /*can_inline=*/false);
168       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
169                                     ctx_->initial_metadata_flags());
170       if (ctx_->compression_level_set()) {
171         meta_ops_.set_compression_level(ctx_->compression_level());
172       }
173       ctx_->sent_initial_metadata_ = true;
174       meta_ops_.set_core_cq_tag(&meta_tag_);
175       call_.PerformOps(&meta_ops_);
176     }
177
178    private:
179     friend class CallbackUnaryHandler<RequestType, ResponseType>;
180
181     ServerCallbackUnaryImpl(
182         ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
183         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
184             allocator_state,
185         std::function<void()> call_requester)
186         : ctx_(ctx),
187           call_(*call),
188           allocator_state_(allocator_state),
189           call_requester_(std::move(call_requester)) {
190       ctx_->set_message_allocator_state(allocator_state);
191     }
192
193     /// SetupReactor binds the reactor (which also releases any queued
194     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
195     /// the completion of the RPC. This should be the last component of the
196     /// handler.
197     void SetupReactor(ServerUnaryReactor* reactor) {
198       reactor_.store(reactor, std::memory_order_relaxed);
199       this->BindReactor(reactor);
200       this->MaybeCallOnCancel(reactor);
201       this->MaybeDone(reactor->InternalInlineable());
202     }
203
204     const RequestType* request() { return allocator_state_->request(); }
205     ResponseType* response() { return allocator_state_->response(); }
206
207     void CallOnDone() override {
208       reactor_.load(std::memory_order_relaxed)->OnDone();
209       grpc_call* call = call_.call();
210       auto call_requester = std::move(call_requester_);
211       allocator_state_->Release();
212       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
213       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
214       call_requester();
215     }
216
217     ServerReactor* reactor() override {
218       return reactor_.load(std::memory_order_relaxed);
219     }
220
221     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
222         meta_ops_;
223     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
224     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
225                                 ::grpc::internal::CallOpSendMessage,
226                                 ::grpc::internal::CallOpServerSendStatus>
227         finish_ops_;
228     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
229
230     ::grpc::CallbackServerContext* const ctx_;
231     ::grpc::internal::Call call_;
232     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
233         allocator_state_;
234     std::function<void()> call_requester_;
235     // reactor_ can always be loaded/stored with relaxed memory ordering because
236     // its value is only set once, independently of other data in the object,
237     // and the loads that use it will always actually come provably later even
238     // though they are from different threads since they are triggered by
239     // actions initiated only by the setting up of the reactor_ variable. In
240     // a sense, it's a delayed "const": it gets its value from the SetupReactor
241     // method (not the constructor, so it's not a true const), but it doesn't
242     // change after that and it only gets used by actions caused, directly or
243     // indirectly, by that setup. This comment also applies to the reactor_
244     // variables of the other streaming objects in this file.
245     std::atomic<ServerUnaryReactor*> reactor_;
246     // callbacks_outstanding_ follows a refcount pattern
247     std::atomic<intptr_t> callbacks_outstanding_{
248         3};  // reserve for start, Finish, and CompletionOp
249   };
250 };
251
252 template <class RequestType, class ResponseType>
253 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
254  public:
255   explicit CallbackClientStreamingHandler(
256       std::function<ServerReadReactor<RequestType>*(
257           ::grpc::CallbackServerContext*, ResponseType*)>
258           get_reactor)
259       : get_reactor_(std::move(get_reactor)) {}
260   void RunHandler(const HandlerParameter& param) final {
261     // Arena allocate a reader structure (that includes response)
262     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
263
264     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
265         param.call->call(), sizeof(ServerCallbackReaderImpl)))
266         ServerCallbackReaderImpl(
267             static_cast<::grpc::CallbackServerContext*>(param.server_context),
268             param.call, std::move(param.call_requester));
269     // Inlineable OnDone can be false in the CompletionOp callback because there
270     // is no read reactor that has an inlineable OnDone; this only applies to
271     // the DefaultReactor (which is unary).
272     param.server_context->BeginCompletionOp(
273         param.call,
274         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
275         reader);
276
277     ServerReadReactor<RequestType>* reactor = nullptr;
278     if (param.status.ok()) {
279       reactor = ::grpc::internal::CatchingReactorGetter<
280           ServerReadReactor<RequestType>>(
281           get_reactor_,
282           static_cast<::grpc::CallbackServerContext*>(param.server_context),
283           reader->response());
284     }
285
286     if (reactor == nullptr) {
287       // if deserialization or reactor creator failed, we need to fail the call
288       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
289           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
290           UnimplementedReadReactor<RequestType>(
291               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
292     }
293
294     reader->SetupReactor(reactor);
295   }
296
297  private:
298   std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
299                                                 ResponseType*)>
300       get_reactor_;
301
302   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
303    public:
304     void Finish(::grpc::Status s) override {
305       // A finish tag with only MaybeDone can have its callback inlined
306       // regardless even if OnDone is not inlineable because this callback just
307       // checks a ref and then decides whether or not to dispatch OnDone.
308       finish_tag_.Set(call_.call(),
309                       [this](bool) {
310                         // Inlineable OnDone can be false here because there is
311                         // no read reactor that has an inlineable OnDone; this
312                         // only applies to the DefaultReactor (which is unary).
313                         this->MaybeDone(/*inlineable_ondone=*/false);
314                       },
315                       &finish_ops_, /*can_inline=*/true);
316       if (!ctx_->sent_initial_metadata_) {
317         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
318                                         ctx_->initial_metadata_flags());
319         if (ctx_->compression_level_set()) {
320           finish_ops_.set_compression_level(ctx_->compression_level());
321         }
322         ctx_->sent_initial_metadata_ = true;
323       }
324       // The response is dropped if the status is not OK.
325       if (s.ok()) {
326         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327                                      finish_ops_.SendMessagePtr(&resp_));
328       } else {
329         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
330       }
331       finish_ops_.set_core_cq_tag(&finish_tag_);
332       call_.PerformOps(&finish_ops_);
333     }
334
335     void SendInitialMetadata() override {
336       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
337       this->Ref();
338       // The callback for this function should not be inlined because it invokes
339       // a user-controlled reaction, but any resulting OnDone can be inlined in
340       // the executor to which this callback is dispatched.
341       meta_tag_.Set(call_.call(),
342                     [this](bool ok) {
343                       ServerReadReactor<RequestType>* reactor =
344                           reactor_.load(std::memory_order_relaxed);
345                       reactor->OnSendInitialMetadataDone(ok);
346                       this->MaybeDone(/*inlineable_ondone=*/true);
347                     },
348                     &meta_ops_, /*can_inline=*/false);
349       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350                                     ctx_->initial_metadata_flags());
351       if (ctx_->compression_level_set()) {
352         meta_ops_.set_compression_level(ctx_->compression_level());
353       }
354       ctx_->sent_initial_metadata_ = true;
355       meta_ops_.set_core_cq_tag(&meta_tag_);
356       call_.PerformOps(&meta_ops_);
357     }
358
359     void Read(RequestType* req) override {
360       this->Ref();
361       read_ops_.RecvMessage(req);
362       call_.PerformOps(&read_ops_);
363     }
364
365    private:
366     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
367
368     ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
369                              ::grpc::internal::Call* call,
370                              std::function<void()> call_requester)
371         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
372
373     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374       reactor_.store(reactor, std::memory_order_relaxed);
375       // The callback for this function should not be inlined because it invokes
376       // a user-controlled reaction, but any resulting OnDone can be inlined in
377       // the executor to which this callback is dispatched.
378       read_tag_.Set(call_.call(),
379                     [this, reactor](bool ok) {
380                       reactor->OnReadDone(ok);
381                       this->MaybeDone(/*inlineable_ondone=*/true);
382                     },
383                     &read_ops_, /*can_inline=*/false);
384       read_ops_.set_core_cq_tag(&read_tag_);
385       this->BindReactor(reactor);
386       this->MaybeCallOnCancel(reactor);
387       // Inlineable OnDone can be false here because there is no read
388       // reactor that has an inlineable OnDone; this only applies to the
389       // DefaultReactor (which is unary).
390       this->MaybeDone(/*inlineable_ondone=*/false);
391     }
392
393     ~ServerCallbackReaderImpl() {}
394
395     ResponseType* response() { return &resp_; }
396
397     void CallOnDone() override {
398       reactor_.load(std::memory_order_relaxed)->OnDone();
399       grpc_call* call = call_.call();
400       auto call_requester = std::move(call_requester_);
401       this->~ServerCallbackReaderImpl();  // explicitly call destructor
402       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
403       call_requester();
404     }
405
406     ServerReactor* reactor() override {
407       return reactor_.load(std::memory_order_relaxed);
408     }
409
410     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
411         meta_ops_;
412     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
413     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
414                                 ::grpc::internal::CallOpSendMessage,
415                                 ::grpc::internal::CallOpServerSendStatus>
416         finish_ops_;
417     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
418     ::grpc::internal::CallOpSet<
419         ::grpc::internal::CallOpRecvMessage<RequestType>>
420         read_ops_;
421     ::grpc::internal::CallbackWithSuccessTag read_tag_;
422
423     ::grpc::CallbackServerContext* const ctx_;
424     ::grpc::internal::Call call_;
425     ResponseType resp_;
426     std::function<void()> call_requester_;
427     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
428     std::atomic<ServerReadReactor<RequestType>*> reactor_;
429     // callbacks_outstanding_ follows a refcount pattern
430     std::atomic<intptr_t> callbacks_outstanding_{
431         3};  // reserve for OnStarted, Finish, and CompletionOp
432   };
433 };
434
435 template <class RequestType, class ResponseType>
436 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
437  public:
438   explicit CallbackServerStreamingHandler(
439       std::function<ServerWriteReactor<ResponseType>*(
440           ::grpc::CallbackServerContext*, const RequestType*)>
441           get_reactor)
442       : get_reactor_(std::move(get_reactor)) {}
443   void RunHandler(const HandlerParameter& param) final {
444     // Arena allocate a writer structure
445     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
446
447     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
448         param.call->call(), sizeof(ServerCallbackWriterImpl)))
449         ServerCallbackWriterImpl(
450             static_cast<::grpc::CallbackServerContext*>(param.server_context),
451             param.call, static_cast<RequestType*>(param.request),
452             std::move(param.call_requester));
453     // Inlineable OnDone can be false in the CompletionOp callback because there
454     // is no write reactor that has an inlineable OnDone; this only applies to
455     // the DefaultReactor (which is unary).
456     param.server_context->BeginCompletionOp(
457         param.call,
458         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
459         writer);
460
461     ServerWriteReactor<ResponseType>* reactor = nullptr;
462     if (param.status.ok()) {
463       reactor = ::grpc::internal::CatchingReactorGetter<
464           ServerWriteReactor<ResponseType>>(
465           get_reactor_,
466           static_cast<::grpc::CallbackServerContext*>(param.server_context),
467           writer->request());
468     }
469     if (reactor == nullptr) {
470       // if deserialization or reactor creator failed, we need to fail the call
471       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
472           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
473           UnimplementedWriteReactor<ResponseType>(
474               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
475     }
476
477     writer->SetupReactor(reactor);
478   }
479
480   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
481                     ::grpc::Status* status, void** /*handler_data*/) final {
482     ::grpc::ByteBuffer buf;
483     buf.set_buffer(req);
484     auto* request =
485         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
486             call, sizeof(RequestType))) RequestType();
487     *status =
488         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
489     buf.Release();
490     if (status->ok()) {
491       return request;
492     }
493     request->~RequestType();
494     return nullptr;
495   }
496
497  private:
498   std::function<ServerWriteReactor<ResponseType>*(
499       ::grpc::CallbackServerContext*, const RequestType*)>
500       get_reactor_;
501
502   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
503    public:
504     void Finish(::grpc::Status s) override {
505       // A finish tag with only MaybeDone can have its callback inlined
506       // regardless even if OnDone is not inlineable because this callback just
507       // checks a ref and then decides whether or not to dispatch OnDone.
508       finish_tag_.Set(call_.call(),
509                       [this](bool) {
510                         // Inlineable OnDone can be false here because there is
511                         // no write reactor that has an inlineable OnDone; this
512                         // only applies to the DefaultReactor (which is unary).
513                         this->MaybeDone(/*inlineable_ondone=*/false);
514                       },
515                       &finish_ops_, /*can_inline=*/true);
516       finish_ops_.set_core_cq_tag(&finish_tag_);
517
518       if (!ctx_->sent_initial_metadata_) {
519         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
520                                         ctx_->initial_metadata_flags());
521         if (ctx_->compression_level_set()) {
522           finish_ops_.set_compression_level(ctx_->compression_level());
523         }
524         ctx_->sent_initial_metadata_ = true;
525       }
526       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
527       call_.PerformOps(&finish_ops_);
528     }
529
530     void SendInitialMetadata() override {
531       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
532       this->Ref();
533       // The callback for this function should not be inlined because it invokes
534       // a user-controlled reaction, but any resulting OnDone can be inlined in
535       // the executor to which this callback is dispatched.
536       meta_tag_.Set(call_.call(),
537                     [this](bool ok) {
538                       ServerWriteReactor<ResponseType>* reactor =
539                           reactor_.load(std::memory_order_relaxed);
540                       reactor->OnSendInitialMetadataDone(ok);
541                       this->MaybeDone(/*inlineable_ondone=*/true);
542                     },
543                     &meta_ops_, /*can_inline=*/false);
544       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
545                                     ctx_->initial_metadata_flags());
546       if (ctx_->compression_level_set()) {
547         meta_ops_.set_compression_level(ctx_->compression_level());
548       }
549       ctx_->sent_initial_metadata_ = true;
550       meta_ops_.set_core_cq_tag(&meta_tag_);
551       call_.PerformOps(&meta_ops_);
552     }
553
554     void Write(const ResponseType* resp,
555                ::grpc::WriteOptions options) override {
556       this->Ref();
557       if (options.is_last_message()) {
558         options.set_buffer_hint();
559       }
560       if (!ctx_->sent_initial_metadata_) {
561         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
562                                        ctx_->initial_metadata_flags());
563         if (ctx_->compression_level_set()) {
564           write_ops_.set_compression_level(ctx_->compression_level());
565         }
566         ctx_->sent_initial_metadata_ = true;
567       }
568       // TODO(vjpai): don't assert
569       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
570       call_.PerformOps(&write_ops_);
571     }
572
573     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
574                         ::grpc::Status s) override {
575       // This combines the write into the finish callback
576       // TODO(vjpai): don't assert
577       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
578       Finish(std::move(s));
579     }
580
581    private:
582     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
583
584     ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
585                              ::grpc::internal::Call* call,
586                              const RequestType* req,
587                              std::function<void()> call_requester)
588         : ctx_(ctx),
589           call_(*call),
590           req_(req),
591           call_requester_(std::move(call_requester)) {}
592
593     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
594       reactor_.store(reactor, std::memory_order_relaxed);
595       // The callback for this function should not be inlined because it invokes
596       // a user-controlled reaction, but any resulting OnDone can be inlined in
597       // the executor to which this callback is dispatched.
598       write_tag_.Set(call_.call(),
599                      [this, reactor](bool ok) {
600                        reactor->OnWriteDone(ok);
601                        this->MaybeDone(/*inlineable_ondone=*/true);
602                      },
603                      &write_ops_, /*can_inline=*/false);
604       write_ops_.set_core_cq_tag(&write_tag_);
605       this->BindReactor(reactor);
606       this->MaybeCallOnCancel(reactor);
607       // Inlineable OnDone can be false here because there is no write
608       // reactor that has an inlineable OnDone; this only applies to the
609       // DefaultReactor (which is unary).
610       this->MaybeDone(/*inlineable_ondone=*/false);
611     }
612     ~ServerCallbackWriterImpl() { req_->~RequestType(); }
613
614     const RequestType* request() { return req_; }
615
616     void CallOnDone() override {
617       reactor_.load(std::memory_order_relaxed)->OnDone();
618       grpc_call* call = call_.call();
619       auto call_requester = std::move(call_requester_);
620       this->~ServerCallbackWriterImpl();  // explicitly call destructor
621       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
622       call_requester();
623     }
624
625     ServerReactor* reactor() override {
626       return reactor_.load(std::memory_order_relaxed);
627     }
628
629     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
630         meta_ops_;
631     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
632     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
633                                 ::grpc::internal::CallOpSendMessage,
634                                 ::grpc::internal::CallOpServerSendStatus>
635         finish_ops_;
636     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
637     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
638                                 ::grpc::internal::CallOpSendMessage>
639         write_ops_;
640     ::grpc::internal::CallbackWithSuccessTag write_tag_;
641
642     ::grpc::CallbackServerContext* const ctx_;
643     ::grpc::internal::Call call_;
644     const RequestType* req_;
645     std::function<void()> call_requester_;
646     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
647     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
648     // callbacks_outstanding_ follows a refcount pattern
649     std::atomic<intptr_t> callbacks_outstanding_{
650         3};  // reserve for OnStarted, Finish, and CompletionOp
651   };
652 };
653
654 template <class RequestType, class ResponseType>
655 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
656  public:
657   explicit CallbackBidiHandler(
658       std::function<ServerBidiReactor<RequestType, ResponseType>*(
659           ::grpc::CallbackServerContext*)>
660           get_reactor)
661       : get_reactor_(std::move(get_reactor)) {}
662   void RunHandler(const HandlerParameter& param) final {
663     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
664
665     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
666         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
667         ServerCallbackReaderWriterImpl(
668             static_cast<::grpc::CallbackServerContext*>(param.server_context),
669             param.call, std::move(param.call_requester));
670     // Inlineable OnDone can be false in the CompletionOp callback because there
671     // is no bidi reactor that has an inlineable OnDone; this only applies to
672     // the DefaultReactor (which is unary).
673     param.server_context->BeginCompletionOp(
674         param.call,
675         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
676         stream);
677
678     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
679     if (param.status.ok()) {
680       reactor = ::grpc::internal::CatchingReactorGetter<
681           ServerBidiReactor<RequestType, ResponseType>>(
682           get_reactor_,
683           static_cast<::grpc::CallbackServerContext*>(param.server_context));
684     }
685
686     if (reactor == nullptr) {
687       // if deserialization or reactor creator failed, we need to fail the call
688       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
689           param.call->call(),
690           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
691           UnimplementedBidiReactor<RequestType, ResponseType>(
692               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
693     }
694
695     stream->SetupReactor(reactor);
696   }
697
698  private:
699   std::function<ServerBidiReactor<RequestType, ResponseType>*(
700       ::grpc::CallbackServerContext*)>
701       get_reactor_;
702
703   class ServerCallbackReaderWriterImpl
704       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
705    public:
706     void Finish(::grpc::Status s) override {
707       // A finish tag with only MaybeDone can have its callback inlined
708       // regardless even if OnDone is not inlineable because this callback just
709       // checks a ref and then decides whether or not to dispatch OnDone.
710       finish_tag_.Set(call_.call(),
711                       [this](bool) {
712                         // Inlineable OnDone can be false here because there is
713                         // no bidi reactor that has an inlineable OnDone; this
714                         // only applies to the DefaultReactor (which is unary).
715                         this->MaybeDone(/*inlineable_ondone=*/false);
716                       },
717                       &finish_ops_, /*can_inline=*/true);
718       finish_ops_.set_core_cq_tag(&finish_tag_);
719
720       if (!ctx_->sent_initial_metadata_) {
721         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
722                                         ctx_->initial_metadata_flags());
723         if (ctx_->compression_level_set()) {
724           finish_ops_.set_compression_level(ctx_->compression_level());
725         }
726         ctx_->sent_initial_metadata_ = true;
727       }
728       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
729       call_.PerformOps(&finish_ops_);
730     }
731
732     void SendInitialMetadata() override {
733       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
734       this->Ref();
735       // The callback for this function should not be inlined because it invokes
736       // a user-controlled reaction, but any resulting OnDone can be inlined in
737       // the executor to which this callback is dispatched.
738       meta_tag_.Set(call_.call(),
739                     [this](bool ok) {
740                       ServerBidiReactor<RequestType, ResponseType>* reactor =
741                           reactor_.load(std::memory_order_relaxed);
742                       reactor->OnSendInitialMetadataDone(ok);
743                       this->MaybeDone(/*inlineable_ondone=*/true);
744                     },
745                     &meta_ops_, /*can_inline=*/false);
746       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
747                                     ctx_->initial_metadata_flags());
748       if (ctx_->compression_level_set()) {
749         meta_ops_.set_compression_level(ctx_->compression_level());
750       }
751       ctx_->sent_initial_metadata_ = true;
752       meta_ops_.set_core_cq_tag(&meta_tag_);
753       call_.PerformOps(&meta_ops_);
754     }
755
756     void Write(const ResponseType* resp,
757                ::grpc::WriteOptions options) override {
758       this->Ref();
759       if (options.is_last_message()) {
760         options.set_buffer_hint();
761       }
762       if (!ctx_->sent_initial_metadata_) {
763         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
764                                        ctx_->initial_metadata_flags());
765         if (ctx_->compression_level_set()) {
766           write_ops_.set_compression_level(ctx_->compression_level());
767         }
768         ctx_->sent_initial_metadata_ = true;
769       }
770       // TODO(vjpai): don't assert
771       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
772       call_.PerformOps(&write_ops_);
773     }
774
775     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
776                         ::grpc::Status s) override {
777       // TODO(vjpai): don't assert
778       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
779       Finish(std::move(s));
780     }
781
782     void Read(RequestType* req) override {
783       this->Ref();
784       read_ops_.RecvMessage(req);
785       call_.PerformOps(&read_ops_);
786     }
787
788    private:
789     friend class CallbackBidiHandler<RequestType, ResponseType>;
790
791     ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
792                                    ::grpc::internal::Call* call,
793                                    std::function<void()> call_requester)
794         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
795
796     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
797       reactor_.store(reactor, std::memory_order_relaxed);
798       // The callbacks for these functions should not be inlined because they
799       // invoke user-controlled reactions, but any resulting OnDones can be
800       // inlined in the executor to which a callback is dispatched.
801       write_tag_.Set(call_.call(),
802                      [this, reactor](bool ok) {
803                        reactor->OnWriteDone(ok);
804                        this->MaybeDone(/*inlineable_ondone=*/true);
805                      },
806                      &write_ops_, /*can_inline=*/false);
807       write_ops_.set_core_cq_tag(&write_tag_);
808       read_tag_.Set(call_.call(),
809                     [this, reactor](bool ok) {
810                       reactor->OnReadDone(ok);
811                       this->MaybeDone(/*inlineable_ondone=*/true);
812                     },
813                     &read_ops_, /*can_inline=*/false);
814       read_ops_.set_core_cq_tag(&read_tag_);
815       this->BindReactor(reactor);
816       this->MaybeCallOnCancel(reactor);
817       // Inlineable OnDone can be false here because there is no bidi
818       // reactor that has an inlineable OnDone; this only applies to the
819       // DefaultReactor (which is unary).
820       this->MaybeDone(/*inlineable_ondone=*/false);
821     }
822
823     void CallOnDone() override {
824       reactor_.load(std::memory_order_relaxed)->OnDone();
825       grpc_call* call = call_.call();
826       auto call_requester = std::move(call_requester_);
827       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
828       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
829       call_requester();
830     }
831
832     ServerReactor* reactor() override {
833       return reactor_.load(std::memory_order_relaxed);
834     }
835
836     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
837         meta_ops_;
838     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
839     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
840                                 ::grpc::internal::CallOpSendMessage,
841                                 ::grpc::internal::CallOpServerSendStatus>
842         finish_ops_;
843     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
844     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
845                                 ::grpc::internal::CallOpSendMessage>
846         write_ops_;
847     ::grpc::internal::CallbackWithSuccessTag write_tag_;
848     ::grpc::internal::CallOpSet<
849         ::grpc::internal::CallOpRecvMessage<RequestType>>
850         read_ops_;
851     ::grpc::internal::CallbackWithSuccessTag read_tag_;
852
853     ::grpc::CallbackServerContext* const ctx_;
854     ::grpc::internal::Call call_;
855     std::function<void()> call_requester_;
856     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
857     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
858     // callbacks_outstanding_ follows a refcount pattern
859     std::atomic<intptr_t> callbacks_outstanding_{
860         3};  // reserve for OnStarted, Finish, and CompletionOp
861   };
862 };
863
864 }  // namespace internal
865 }  // namespace grpc
866
867 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H