Imported Upstream version 1.33.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
26
27 namespace grpc {
28 namespace internal {
29
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38
39   void SetMessageAllocator(
40       ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
41           allocator) {
42     allocator_ = allocator;
43   }
44
45   void RunHandler(const HandlerParameter& param) final {
46     // Arena allocate a controller structure (that includes request/response)
47     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48     auto* allocator_state = static_cast<
49         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
50         param.internal_data);
51
52     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54         ServerCallbackUnaryImpl(
55             static_cast<::grpc::CallbackServerContext*>(param.server_context),
56             param.call, allocator_state, std::move(param.call_requester));
57     param.server_context->BeginCompletionOp(
58         param.call, [call](bool) { call->MaybeDone(); }, call);
59
60     ServerUnaryReactor* reactor = nullptr;
61     if (param.status.ok()) {
62       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63           get_reactor_,
64           static_cast<::grpc::CallbackServerContext*>(param.server_context),
65           call->request(), call->response());
66     }
67
68     if (reactor == nullptr) {
69       // if deserialization or reactor creator failed, we need to fail the call
70       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
71           param.call->call(), sizeof(UnimplementedUnaryReactor)))
72           UnimplementedUnaryReactor(
73               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
74     }
75
76     /// Invoke SetupReactor as the last part of the handler
77     call->SetupReactor(reactor);
78   }
79
80   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
81                     ::grpc::Status* status, void** handler_data) final {
82     ::grpc::ByteBuffer buf;
83     buf.set_buffer(req);
84     RequestType* request = nullptr;
85     ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
86         allocator_state = nullptr;
87     if (allocator_ != nullptr) {
88       allocator_state = allocator_->AllocateMessages();
89     } else {
90       allocator_state =
91           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
92               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93               DefaultMessageHolder<RequestType, ResponseType>();
94     }
95     *handler_data = allocator_state;
96     request = allocator_state->request();
97     *status =
98         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
99     buf.Release();
100     if (status->ok()) {
101       return request;
102     }
103     // Clean up on deserialization failure.
104     allocator_state->Release();
105     return nullptr;
106   }
107
108  private:
109   std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
110                                     const RequestType*, ResponseType*)>
111       get_reactor_;
112   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
113       allocator_ = nullptr;
114
115   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116    public:
117     void Finish(::grpc::Status s) override {
118       // A callback that only contains a call to MaybeDone can be run as an
119       // inline callback regardless of whether or not OnDone is inlineable
120       // because if the actual OnDone callback needs to be scheduled, MaybeDone
121       // is responsible for dispatching to an executor thread if needed. Thus,
122       // when setting up the finish_tag_, we can set its own callback to
123       // inlineable.
124       finish_tag_.Set(
125           call_.call(),
126           [this](bool) {
127             this->MaybeDone(
128                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129           },
130           &finish_ops_, /*can_inline=*/true);
131       finish_ops_.set_core_cq_tag(&finish_tag_);
132
133       if (!ctx_->sent_initial_metadata_) {
134         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135                                         ctx_->initial_metadata_flags());
136         if (ctx_->compression_level_set()) {
137           finish_ops_.set_compression_level(ctx_->compression_level());
138         }
139         ctx_->sent_initial_metadata_ = true;
140       }
141       // The response is dropped if the status is not OK.
142       if (s.ok()) {
143         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144                                      finish_ops_.SendMessagePtr(response()));
145       } else {
146         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147       }
148       finish_ops_.set_core_cq_tag(&finish_tag_);
149       call_.PerformOps(&finish_ops_);
150     }
151
152     void SendInitialMetadata() override {
153       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154       this->Ref();
155       // The callback for this function should not be marked inline because it
156       // is directly invoking a user-controlled reaction
157       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158       // thread. However, any OnDone needed after that can be inlined because it
159       // is already running on an executor thread.
160       meta_tag_.Set(
161           call_.call(),
162           [this](bool ok) {
163             ServerUnaryReactor* reactor =
164                 reactor_.load(std::memory_order_relaxed);
165             reactor->OnSendInitialMetadataDone(ok);
166             this->MaybeDone(/*inlineable_ondone=*/true);
167           },
168           &meta_ops_, /*can_inline=*/false);
169       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170                                     ctx_->initial_metadata_flags());
171       if (ctx_->compression_level_set()) {
172         meta_ops_.set_compression_level(ctx_->compression_level());
173       }
174       ctx_->sent_initial_metadata_ = true;
175       meta_ops_.set_core_cq_tag(&meta_tag_);
176       call_.PerformOps(&meta_ops_);
177     }
178
179    private:
180     friend class CallbackUnaryHandler<RequestType, ResponseType>;
181
182     ServerCallbackUnaryImpl(
183         ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
184         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
185             allocator_state,
186         std::function<void()> call_requester)
187         : ctx_(ctx),
188           call_(*call),
189           allocator_state_(allocator_state),
190           call_requester_(std::move(call_requester)) {
191       ctx_->set_message_allocator_state(allocator_state);
192     }
193
194     /// SetupReactor binds the reactor (which also releases any queued
195     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
196     /// the completion of the RPC. This should be the last component of the
197     /// handler.
198     void SetupReactor(ServerUnaryReactor* reactor) {
199       reactor_.store(reactor, std::memory_order_relaxed);
200       this->BindReactor(reactor);
201       this->MaybeCallOnCancel(reactor);
202       this->MaybeDone(reactor->InternalInlineable());
203     }
204
205     const RequestType* request() { return allocator_state_->request(); }
206     ResponseType* response() { return allocator_state_->response(); }
207
208     void CallOnDone() override {
209       reactor_.load(std::memory_order_relaxed)->OnDone();
210       grpc_call* call = call_.call();
211       auto call_requester = std::move(call_requester_);
212       allocator_state_->Release();
213       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
214       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
215       call_requester();
216     }
217
218     ServerReactor* reactor() override {
219       return reactor_.load(std::memory_order_relaxed);
220     }
221
222     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
223         meta_ops_;
224     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
225     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
226                                 ::grpc::internal::CallOpSendMessage,
227                                 ::grpc::internal::CallOpServerSendStatus>
228         finish_ops_;
229     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
230
231     ::grpc::CallbackServerContext* const ctx_;
232     ::grpc::internal::Call call_;
233     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
234         allocator_state_;
235     std::function<void()> call_requester_;
236     // reactor_ can always be loaded/stored with relaxed memory ordering because
237     // its value is only set once, independently of other data in the object,
238     // and the loads that use it will always actually come provably later even
239     // though they are from different threads since they are triggered by
240     // actions initiated only by the setting up of the reactor_ variable. In
241     // a sense, it's a delayed "const": it gets its value from the SetupReactor
242     // method (not the constructor, so it's not a true const), but it doesn't
243     // change after that and it only gets used by actions caused, directly or
244     // indirectly, by that setup. This comment also applies to the reactor_
245     // variables of the other streaming objects in this file.
246     std::atomic<ServerUnaryReactor*> reactor_;
247     // callbacks_outstanding_ follows a refcount pattern
248     std::atomic<intptr_t> callbacks_outstanding_{
249         3};  // reserve for start, Finish, and CompletionOp
250   };
251 };
252
253 template <class RequestType, class ResponseType>
254 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
255  public:
256   explicit CallbackClientStreamingHandler(
257       std::function<ServerReadReactor<RequestType>*(
258           ::grpc::CallbackServerContext*, ResponseType*)>
259           get_reactor)
260       : get_reactor_(std::move(get_reactor)) {}
261   void RunHandler(const HandlerParameter& param) final {
262     // Arena allocate a reader structure (that includes response)
263     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
264
265     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
266         param.call->call(), sizeof(ServerCallbackReaderImpl)))
267         ServerCallbackReaderImpl(
268             static_cast<::grpc::CallbackServerContext*>(param.server_context),
269             param.call, std::move(param.call_requester));
270     // Inlineable OnDone can be false in the CompletionOp callback because there
271     // is no read reactor that has an inlineable OnDone; this only applies to
272     // the DefaultReactor (which is unary).
273     param.server_context->BeginCompletionOp(
274         param.call,
275         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
276         reader);
277
278     ServerReadReactor<RequestType>* reactor = nullptr;
279     if (param.status.ok()) {
280       reactor = ::grpc::internal::CatchingReactorGetter<
281           ServerReadReactor<RequestType>>(
282           get_reactor_,
283           static_cast<::grpc::CallbackServerContext*>(param.server_context),
284           reader->response());
285     }
286
287     if (reactor == nullptr) {
288       // if deserialization or reactor creator failed, we need to fail the call
289       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
290           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
291           UnimplementedReadReactor<RequestType>(
292               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
293     }
294
295     reader->SetupReactor(reactor);
296   }
297
298  private:
299   std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
300                                                 ResponseType*)>
301       get_reactor_;
302
303   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
304    public:
305     void Finish(::grpc::Status s) override {
306       // A finish tag with only MaybeDone can have its callback inlined
307       // regardless even if OnDone is not inlineable because this callback just
308       // checks a ref and then decides whether or not to dispatch OnDone.
309       finish_tag_.Set(
310           call_.call(),
311           [this](bool) {
312             // Inlineable OnDone can be false here because there is
313             // no read reactor that has an inlineable OnDone; this
314             // only applies to the DefaultReactor (which is unary).
315             this->MaybeDone(/*inlineable_ondone=*/false);
316           },
317           &finish_ops_, /*can_inline=*/true);
318       if (!ctx_->sent_initial_metadata_) {
319         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
320                                         ctx_->initial_metadata_flags());
321         if (ctx_->compression_level_set()) {
322           finish_ops_.set_compression_level(ctx_->compression_level());
323         }
324         ctx_->sent_initial_metadata_ = true;
325       }
326       // The response is dropped if the status is not OK.
327       if (s.ok()) {
328         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
329                                      finish_ops_.SendMessagePtr(&resp_));
330       } else {
331         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
332       }
333       finish_ops_.set_core_cq_tag(&finish_tag_);
334       call_.PerformOps(&finish_ops_);
335     }
336
337     void SendInitialMetadata() override {
338       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
339       this->Ref();
340       // The callback for this function should not be inlined because it invokes
341       // a user-controlled reaction, but any resulting OnDone can be inlined in
342       // the executor to which this callback is dispatched.
343       meta_tag_.Set(
344           call_.call(),
345           [this](bool ok) {
346             ServerReadReactor<RequestType>* reactor =
347                 reactor_.load(std::memory_order_relaxed);
348             reactor->OnSendInitialMetadataDone(ok);
349             this->MaybeDone(/*inlineable_ondone=*/true);
350           },
351           &meta_ops_, /*can_inline=*/false);
352       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
353                                     ctx_->initial_metadata_flags());
354       if (ctx_->compression_level_set()) {
355         meta_ops_.set_compression_level(ctx_->compression_level());
356       }
357       ctx_->sent_initial_metadata_ = true;
358       meta_ops_.set_core_cq_tag(&meta_tag_);
359       call_.PerformOps(&meta_ops_);
360     }
361
362     void Read(RequestType* req) override {
363       this->Ref();
364       read_ops_.RecvMessage(req);
365       call_.PerformOps(&read_ops_);
366     }
367
368    private:
369     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
370
371     ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
372                              ::grpc::internal::Call* call,
373                              std::function<void()> call_requester)
374         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
375
376     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
377       reactor_.store(reactor, std::memory_order_relaxed);
378       // The callback for this function should not be inlined because it invokes
379       // a user-controlled reaction, but any resulting OnDone can be inlined in
380       // the executor to which this callback is dispatched.
381       read_tag_.Set(
382           call_.call(),
383           [this, reactor](bool ok) {
384             reactor->OnReadDone(ok);
385             this->MaybeDone(/*inlineable_ondone=*/true);
386           },
387           &read_ops_, /*can_inline=*/false);
388       read_ops_.set_core_cq_tag(&read_tag_);
389       this->BindReactor(reactor);
390       this->MaybeCallOnCancel(reactor);
391       // Inlineable OnDone can be false here because there is no read
392       // reactor that has an inlineable OnDone; this only applies to the
393       // DefaultReactor (which is unary).
394       this->MaybeDone(/*inlineable_ondone=*/false);
395     }
396
397     ~ServerCallbackReaderImpl() {}
398
399     ResponseType* response() { return &resp_; }
400
401     void CallOnDone() override {
402       reactor_.load(std::memory_order_relaxed)->OnDone();
403       grpc_call* call = call_.call();
404       auto call_requester = std::move(call_requester_);
405       this->~ServerCallbackReaderImpl();  // explicitly call destructor
406       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
407       call_requester();
408     }
409
410     ServerReactor* reactor() override {
411       return reactor_.load(std::memory_order_relaxed);
412     }
413
414     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
415         meta_ops_;
416     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
417     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
418                                 ::grpc::internal::CallOpSendMessage,
419                                 ::grpc::internal::CallOpServerSendStatus>
420         finish_ops_;
421     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
422     ::grpc::internal::CallOpSet<
423         ::grpc::internal::CallOpRecvMessage<RequestType>>
424         read_ops_;
425     ::grpc::internal::CallbackWithSuccessTag read_tag_;
426
427     ::grpc::CallbackServerContext* const ctx_;
428     ::grpc::internal::Call call_;
429     ResponseType resp_;
430     std::function<void()> call_requester_;
431     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432     std::atomic<ServerReadReactor<RequestType>*> reactor_;
433     // callbacks_outstanding_ follows a refcount pattern
434     std::atomic<intptr_t> callbacks_outstanding_{
435         3};  // reserve for OnStarted, Finish, and CompletionOp
436   };
437 };
438
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
442   explicit CallbackServerStreamingHandler(
443       std::function<ServerWriteReactor<ResponseType>*(
444           ::grpc::CallbackServerContext*, const RequestType*)>
445           get_reactor)
446       : get_reactor_(std::move(get_reactor)) {}
447   void RunHandler(const HandlerParameter& param) final {
448     // Arena allocate a writer structure
449     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450
451     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
452         param.call->call(), sizeof(ServerCallbackWriterImpl)))
453         ServerCallbackWriterImpl(
454             static_cast<::grpc::CallbackServerContext*>(param.server_context),
455             param.call, static_cast<RequestType*>(param.request),
456             std::move(param.call_requester));
457     // Inlineable OnDone can be false in the CompletionOp callback because there
458     // is no write reactor that has an inlineable OnDone; this only applies to
459     // the DefaultReactor (which is unary).
460     param.server_context->BeginCompletionOp(
461         param.call,
462         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
463         writer);
464
465     ServerWriteReactor<ResponseType>* reactor = nullptr;
466     if (param.status.ok()) {
467       reactor = ::grpc::internal::CatchingReactorGetter<
468           ServerWriteReactor<ResponseType>>(
469           get_reactor_,
470           static_cast<::grpc::CallbackServerContext*>(param.server_context),
471           writer->request());
472     }
473     if (reactor == nullptr) {
474       // if deserialization or reactor creator failed, we need to fail the call
475       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
476           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
477           UnimplementedWriteReactor<ResponseType>(
478               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
479     }
480
481     writer->SetupReactor(reactor);
482   }
483
484   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
485                     ::grpc::Status* status, void** /*handler_data*/) final {
486     ::grpc::ByteBuffer buf;
487     buf.set_buffer(req);
488     auto* request =
489         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
490             call, sizeof(RequestType))) RequestType();
491     *status =
492         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
493     buf.Release();
494     if (status->ok()) {
495       return request;
496     }
497     request->~RequestType();
498     return nullptr;
499   }
500
501  private:
502   std::function<ServerWriteReactor<ResponseType>*(
503       ::grpc::CallbackServerContext*, const RequestType*)>
504       get_reactor_;
505
506   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
507    public:
508     void Finish(::grpc::Status s) override {
509       // A finish tag with only MaybeDone can have its callback inlined
510       // regardless even if OnDone is not inlineable because this callback just
511       // checks a ref and then decides whether or not to dispatch OnDone.
512       finish_tag_.Set(
513           call_.call(),
514           [this](bool) {
515             // Inlineable OnDone can be false here because there is
516             // no write reactor that has an inlineable OnDone; this
517             // only applies to the DefaultReactor (which is unary).
518             this->MaybeDone(/*inlineable_ondone=*/false);
519           },
520           &finish_ops_, /*can_inline=*/true);
521       finish_ops_.set_core_cq_tag(&finish_tag_);
522
523       if (!ctx_->sent_initial_metadata_) {
524         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
525                                         ctx_->initial_metadata_flags());
526         if (ctx_->compression_level_set()) {
527           finish_ops_.set_compression_level(ctx_->compression_level());
528         }
529         ctx_->sent_initial_metadata_ = true;
530       }
531       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
532       call_.PerformOps(&finish_ops_);
533     }
534
535     void SendInitialMetadata() override {
536       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
537       this->Ref();
538       // The callback for this function should not be inlined because it invokes
539       // a user-controlled reaction, but any resulting OnDone can be inlined in
540       // the executor to which this callback is dispatched.
541       meta_tag_.Set(
542           call_.call(),
543           [this](bool ok) {
544             ServerWriteReactor<ResponseType>* reactor =
545                 reactor_.load(std::memory_order_relaxed);
546             reactor->OnSendInitialMetadataDone(ok);
547             this->MaybeDone(/*inlineable_ondone=*/true);
548           },
549           &meta_ops_, /*can_inline=*/false);
550       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551                                     ctx_->initial_metadata_flags());
552       if (ctx_->compression_level_set()) {
553         meta_ops_.set_compression_level(ctx_->compression_level());
554       }
555       ctx_->sent_initial_metadata_ = true;
556       meta_ops_.set_core_cq_tag(&meta_tag_);
557       call_.PerformOps(&meta_ops_);
558     }
559
560     void Write(const ResponseType* resp,
561                ::grpc::WriteOptions options) override {
562       this->Ref();
563       if (options.is_last_message()) {
564         options.set_buffer_hint();
565       }
566       if (!ctx_->sent_initial_metadata_) {
567         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568                                        ctx_->initial_metadata_flags());
569         if (ctx_->compression_level_set()) {
570           write_ops_.set_compression_level(ctx_->compression_level());
571         }
572         ctx_->sent_initial_metadata_ = true;
573       }
574       // TODO(vjpai): don't assert
575       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576       call_.PerformOps(&write_ops_);
577     }
578
579     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580                         ::grpc::Status s) override {
581       // This combines the write into the finish callback
582       // TODO(vjpai): don't assert
583       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584       Finish(std::move(s));
585     }
586
587    private:
588     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589
590     ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
591                              ::grpc::internal::Call* call,
592                              const RequestType* req,
593                              std::function<void()> call_requester)
594         : ctx_(ctx),
595           call_(*call),
596           req_(req),
597           call_requester_(std::move(call_requester)) {}
598
599     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600       reactor_.store(reactor, std::memory_order_relaxed);
601       // The callback for this function should not be inlined because it invokes
602       // a user-controlled reaction, but any resulting OnDone can be inlined in
603       // the executor to which this callback is dispatched.
604       write_tag_.Set(
605           call_.call(),
606           [this, reactor](bool ok) {
607             reactor->OnWriteDone(ok);
608             this->MaybeDone(/*inlineable_ondone=*/true);
609           },
610           &write_ops_, /*can_inline=*/false);
611       write_ops_.set_core_cq_tag(&write_tag_);
612       this->BindReactor(reactor);
613       this->MaybeCallOnCancel(reactor);
614       // Inlineable OnDone can be false here because there is no write
615       // reactor that has an inlineable OnDone; this only applies to the
616       // DefaultReactor (which is unary).
617       this->MaybeDone(/*inlineable_ondone=*/false);
618     }
619     ~ServerCallbackWriterImpl() { req_->~RequestType(); }
620
621     const RequestType* request() { return req_; }
622
623     void CallOnDone() override {
624       reactor_.load(std::memory_order_relaxed)->OnDone();
625       grpc_call* call = call_.call();
626       auto call_requester = std::move(call_requester_);
627       this->~ServerCallbackWriterImpl();  // explicitly call destructor
628       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
629       call_requester();
630     }
631
632     ServerReactor* reactor() override {
633       return reactor_.load(std::memory_order_relaxed);
634     }
635
636     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
637         meta_ops_;
638     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
639     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
640                                 ::grpc::internal::CallOpSendMessage,
641                                 ::grpc::internal::CallOpServerSendStatus>
642         finish_ops_;
643     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
644     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
645                                 ::grpc::internal::CallOpSendMessage>
646         write_ops_;
647     ::grpc::internal::CallbackWithSuccessTag write_tag_;
648
649     ::grpc::CallbackServerContext* const ctx_;
650     ::grpc::internal::Call call_;
651     const RequestType* req_;
652     std::function<void()> call_requester_;
653     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
654     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
655     // callbacks_outstanding_ follows a refcount pattern
656     std::atomic<intptr_t> callbacks_outstanding_{
657         3};  // reserve for OnStarted, Finish, and CompletionOp
658   };
659 };
660
661 template <class RequestType, class ResponseType>
662 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
663  public:
664   explicit CallbackBidiHandler(
665       std::function<ServerBidiReactor<RequestType, ResponseType>*(
666           ::grpc::CallbackServerContext*)>
667           get_reactor)
668       : get_reactor_(std::move(get_reactor)) {}
669   void RunHandler(const HandlerParameter& param) final {
670     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
671
672     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
673         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
674         ServerCallbackReaderWriterImpl(
675             static_cast<::grpc::CallbackServerContext*>(param.server_context),
676             param.call, std::move(param.call_requester));
677     // Inlineable OnDone can be false in the CompletionOp callback because there
678     // is no bidi reactor that has an inlineable OnDone; this only applies to
679     // the DefaultReactor (which is unary).
680     param.server_context->BeginCompletionOp(
681         param.call,
682         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
683         stream);
684
685     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
686     if (param.status.ok()) {
687       reactor = ::grpc::internal::CatchingReactorGetter<
688           ServerBidiReactor<RequestType, ResponseType>>(
689           get_reactor_,
690           static_cast<::grpc::CallbackServerContext*>(param.server_context));
691     }
692
693     if (reactor == nullptr) {
694       // if deserialization or reactor creator failed, we need to fail the call
695       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
696           param.call->call(),
697           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
698           UnimplementedBidiReactor<RequestType, ResponseType>(
699               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
700     }
701
702     stream->SetupReactor(reactor);
703   }
704
705  private:
706   std::function<ServerBidiReactor<RequestType, ResponseType>*(
707       ::grpc::CallbackServerContext*)>
708       get_reactor_;
709
710   class ServerCallbackReaderWriterImpl
711       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
712    public:
713     void Finish(::grpc::Status s) override {
714       // A finish tag with only MaybeDone can have its callback inlined
715       // regardless even if OnDone is not inlineable because this callback just
716       // checks a ref and then decides whether or not to dispatch OnDone.
717       finish_tag_.Set(
718           call_.call(),
719           [this](bool) {
720             // Inlineable OnDone can be false here because there is
721             // no bidi reactor that has an inlineable OnDone; this
722             // only applies to the DefaultReactor (which is unary).
723             this->MaybeDone(/*inlineable_ondone=*/false);
724           },
725           &finish_ops_, /*can_inline=*/true);
726       finish_ops_.set_core_cq_tag(&finish_tag_);
727
728       if (!ctx_->sent_initial_metadata_) {
729         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
730                                         ctx_->initial_metadata_flags());
731         if (ctx_->compression_level_set()) {
732           finish_ops_.set_compression_level(ctx_->compression_level());
733         }
734         ctx_->sent_initial_metadata_ = true;
735       }
736       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
737       call_.PerformOps(&finish_ops_);
738     }
739
740     void SendInitialMetadata() override {
741       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
742       this->Ref();
743       // The callback for this function should not be inlined because it invokes
744       // a user-controlled reaction, but any resulting OnDone can be inlined in
745       // the executor to which this callback is dispatched.
746       meta_tag_.Set(
747           call_.call(),
748           [this](bool ok) {
749             ServerBidiReactor<RequestType, ResponseType>* reactor =
750                 reactor_.load(std::memory_order_relaxed);
751             reactor->OnSendInitialMetadataDone(ok);
752             this->MaybeDone(/*inlineable_ondone=*/true);
753           },
754           &meta_ops_, /*can_inline=*/false);
755       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756                                     ctx_->initial_metadata_flags());
757       if (ctx_->compression_level_set()) {
758         meta_ops_.set_compression_level(ctx_->compression_level());
759       }
760       ctx_->sent_initial_metadata_ = true;
761       meta_ops_.set_core_cq_tag(&meta_tag_);
762       call_.PerformOps(&meta_ops_);
763     }
764
765     void Write(const ResponseType* resp,
766                ::grpc::WriteOptions options) override {
767       this->Ref();
768       if (options.is_last_message()) {
769         options.set_buffer_hint();
770       }
771       if (!ctx_->sent_initial_metadata_) {
772         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773                                        ctx_->initial_metadata_flags());
774         if (ctx_->compression_level_set()) {
775           write_ops_.set_compression_level(ctx_->compression_level());
776         }
777         ctx_->sent_initial_metadata_ = true;
778       }
779       // TODO(vjpai): don't assert
780       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
781       call_.PerformOps(&write_ops_);
782     }
783
784     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
785                         ::grpc::Status s) override {
786       // TODO(vjpai): don't assert
787       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
788       Finish(std::move(s));
789     }
790
791     void Read(RequestType* req) override {
792       this->Ref();
793       read_ops_.RecvMessage(req);
794       call_.PerformOps(&read_ops_);
795     }
796
797    private:
798     friend class CallbackBidiHandler<RequestType, ResponseType>;
799
800     ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
801                                    ::grpc::internal::Call* call,
802                                    std::function<void()> call_requester)
803         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
804
805     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
806       reactor_.store(reactor, std::memory_order_relaxed);
807       // The callbacks for these functions should not be inlined because they
808       // invoke user-controlled reactions, but any resulting OnDones can be
809       // inlined in the executor to which a callback is dispatched.
810       write_tag_.Set(
811           call_.call(),
812           [this, reactor](bool ok) {
813             reactor->OnWriteDone(ok);
814             this->MaybeDone(/*inlineable_ondone=*/true);
815           },
816           &write_ops_, /*can_inline=*/false);
817       write_ops_.set_core_cq_tag(&write_tag_);
818       read_tag_.Set(
819           call_.call(),
820           [this, reactor](bool ok) {
821             reactor->OnReadDone(ok);
822             this->MaybeDone(/*inlineable_ondone=*/true);
823           },
824           &read_ops_, /*can_inline=*/false);
825       read_ops_.set_core_cq_tag(&read_tag_);
826       this->BindReactor(reactor);
827       this->MaybeCallOnCancel(reactor);
828       // Inlineable OnDone can be false here because there is no bidi
829       // reactor that has an inlineable OnDone; this only applies to the
830       // DefaultReactor (which is unary).
831       this->MaybeDone(/*inlineable_ondone=*/false);
832     }
833
834     void CallOnDone() override {
835       reactor_.load(std::memory_order_relaxed)->OnDone();
836       grpc_call* call = call_.call();
837       auto call_requester = std::move(call_requester_);
838       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
839       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
840       call_requester();
841     }
842
843     ServerReactor* reactor() override {
844       return reactor_.load(std::memory_order_relaxed);
845     }
846
847     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
848         meta_ops_;
849     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
850     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
851                                 ::grpc::internal::CallOpSendMessage,
852                                 ::grpc::internal::CallOpServerSendStatus>
853         finish_ops_;
854     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
855     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
856                                 ::grpc::internal::CallOpSendMessage>
857         write_ops_;
858     ::grpc::internal::CallbackWithSuccessTag write_tag_;
859     ::grpc::internal::CallOpSet<
860         ::grpc::internal::CallOpRecvMessage<RequestType>>
861         read_ops_;
862     ::grpc::internal::CallbackWithSuccessTag read_tag_;
863
864     ::grpc::CallbackServerContext* const ctx_;
865     ::grpc::internal::Call call_;
866     std::function<void()> call_requester_;
867     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
868     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
869     // callbacks_outstanding_ follows a refcount pattern
870     std::atomic<intptr_t> callbacks_outstanding_{
871         3};  // reserve for OnStarted, Finish, and CompletionOp
872   };
873 };
874
875 }  // namespace internal
876 }  // namespace grpc
877
878 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H