Imported Upstream version 1.31.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback_impl.h>
24 #include <grpcpp/impl/codegen/server_context_impl.h>
25 #include <grpcpp/impl/codegen/status.h>
26
27 namespace grpc_impl {
28 namespace internal {
29
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38
39   void SetMessageAllocator(
40       ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
41           allocator) {
42     allocator_ = allocator;
43   }
44
45   void RunHandler(const HandlerParameter& param) final {
46     // Arena allocate a controller structure (that includes request/response)
47     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48     auto* allocator_state = static_cast<
49         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
50         param.internal_data);
51
52     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54         ServerCallbackUnaryImpl(
55             static_cast<::grpc_impl::CallbackServerContext*>(
56                 param.server_context),
57             param.call, allocator_state, std::move(param.call_requester));
58     param.server_context->BeginCompletionOp(
59         param.call, [call](bool) { call->MaybeDone(); }, call);
60
61     ServerUnaryReactor* reactor = nullptr;
62     if (param.status.ok()) {
63       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64           get_reactor_,
65           static_cast<::grpc_impl::CallbackServerContext*>(
66               param.server_context),
67           call->request(), call->response());
68     }
69
70     if (reactor == nullptr) {
71       // if deserialization or reactor creator failed, we need to fail the call
72       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
73           param.call->call(), sizeof(UnimplementedUnaryReactor)))
74           UnimplementedUnaryReactor(
75               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
76     }
77
78     /// Invoke SetupReactor as the last part of the handler
79     call->SetupReactor(reactor);
80   }
81
82   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
83                     ::grpc::Status* status, void** handler_data) final {
84     ::grpc::ByteBuffer buf;
85     buf.set_buffer(req);
86     RequestType* request = nullptr;
87     ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
88         allocator_state = nullptr;
89     if (allocator_ != nullptr) {
90       allocator_state = allocator_->AllocateMessages();
91     } else {
92       allocator_state =
93           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
94               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
95               DefaultMessageHolder<RequestType, ResponseType>();
96     }
97     *handler_data = allocator_state;
98     request = allocator_state->request();
99     *status =
100         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
101     buf.Release();
102     if (status->ok()) {
103       return request;
104     }
105     // Clean up on deserialization failure.
106     allocator_state->Release();
107     return nullptr;
108   }
109
110  private:
111   std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
112                                     const RequestType*, ResponseType*)>
113       get_reactor_;
114   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
115       allocator_ = nullptr;
116
117   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
118    public:
119     void Finish(::grpc::Status s) override {
120       // A callback that only contains a call to MaybeDone can be run as an
121       // inline callback regardless of whether or not OnDone is inlineable
122       // because if the actual OnDone callback needs to be scheduled, MaybeDone
123       // is responsible for dispatching to an executor thread if needed. Thus,
124       // when setting up the finish_tag_, we can set its own callback to
125       // inlineable.
126       finish_tag_.Set(
127           call_.call(),
128           [this](bool) {
129             this->MaybeDone(
130                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131           },
132           &finish_ops_, /*can_inline=*/true);
133       finish_ops_.set_core_cq_tag(&finish_tag_);
134
135       if (!ctx_->sent_initial_metadata_) {
136         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
137                                         ctx_->initial_metadata_flags());
138         if (ctx_->compression_level_set()) {
139           finish_ops_.set_compression_level(ctx_->compression_level());
140         }
141         ctx_->sent_initial_metadata_ = true;
142       }
143       // The response is dropped if the status is not OK.
144       if (s.ok()) {
145         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146                                      finish_ops_.SendMessagePtr(response()));
147       } else {
148         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
149       }
150       finish_ops_.set_core_cq_tag(&finish_tag_);
151       call_.PerformOps(&finish_ops_);
152     }
153
154     void SendInitialMetadata() override {
155       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
156       this->Ref();
157       // The callback for this function should not be marked inline because it
158       // is directly invoking a user-controlled reaction
159       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
160       // thread. However, any OnDone needed after that can be inlined because it
161       // is already running on an executor thread.
162       meta_tag_.Set(call_.call(),
163                     [this](bool ok) {
164                       ServerUnaryReactor* reactor =
165                           reactor_.load(std::memory_order_relaxed);
166                       reactor->OnSendInitialMetadataDone(ok);
167                       this->MaybeDone(/*inlineable_ondone=*/true);
168                     },
169                     &meta_ops_, /*can_inline=*/false);
170       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171                                     ctx_->initial_metadata_flags());
172       if (ctx_->compression_level_set()) {
173         meta_ops_.set_compression_level(ctx_->compression_level());
174       }
175       ctx_->sent_initial_metadata_ = true;
176       meta_ops_.set_core_cq_tag(&meta_tag_);
177       call_.PerformOps(&meta_ops_);
178     }
179
180    private:
181     friend class CallbackUnaryHandler<RequestType, ResponseType>;
182
183     ServerCallbackUnaryImpl(
184         ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call,
185         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
186             allocator_state,
187         std::function<void()> call_requester)
188         : ctx_(ctx),
189           call_(*call),
190           allocator_state_(allocator_state),
191           call_requester_(std::move(call_requester)) {
192       ctx_->set_message_allocator_state(allocator_state);
193     }
194
195     /// SetupReactor binds the reactor (which also releases any queued
196     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
197     /// the completion of the RPC. This should be the last component of the
198     /// handler.
199     void SetupReactor(ServerUnaryReactor* reactor) {
200       reactor_.store(reactor, std::memory_order_relaxed);
201       this->BindReactor(reactor);
202       this->MaybeCallOnCancel(reactor);
203       this->MaybeDone(reactor->InternalInlineable());
204     }
205
206     const RequestType* request() { return allocator_state_->request(); }
207     ResponseType* response() { return allocator_state_->response(); }
208
209     void CallOnDone() override {
210       reactor_.load(std::memory_order_relaxed)->OnDone();
211       grpc_call* call = call_.call();
212       auto call_requester = std::move(call_requester_);
213       allocator_state_->Release();
214       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
215       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
216       call_requester();
217     }
218
219     ServerReactor* reactor() override {
220       return reactor_.load(std::memory_order_relaxed);
221     }
222
223     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
224         meta_ops_;
225     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
226     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
227                                 ::grpc::internal::CallOpSendMessage,
228                                 ::grpc::internal::CallOpServerSendStatus>
229         finish_ops_;
230     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
231
232     ::grpc_impl::CallbackServerContext* const ctx_;
233     ::grpc::internal::Call call_;
234     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
235         allocator_state_;
236     std::function<void()> call_requester_;
237     // reactor_ can always be loaded/stored with relaxed memory ordering because
238     // its value is only set once, independently of other data in the object,
239     // and the loads that use it will always actually come provably later even
240     // though they are from different threads since they are triggered by
241     // actions initiated only by the setting up of the reactor_ variable. In
242     // a sense, it's a delayed "const": it gets its value from the SetupReactor
243     // method (not the constructor, so it's not a true const), but it doesn't
244     // change after that and it only gets used by actions caused, directly or
245     // indirectly, by that setup. This comment also applies to the reactor_
246     // variables of the other streaming objects in this file.
247     std::atomic<ServerUnaryReactor*> reactor_;
248     // callbacks_outstanding_ follows a refcount pattern
249     std::atomic<intptr_t> callbacks_outstanding_{
250         3};  // reserve for start, Finish, and CompletionOp
251   };
252 };
253
254 template <class RequestType, class ResponseType>
255 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
256  public:
257   explicit CallbackClientStreamingHandler(
258       std::function<ServerReadReactor<RequestType>*(
259           ::grpc_impl::CallbackServerContext*, ResponseType*)>
260           get_reactor)
261       : get_reactor_(std::move(get_reactor)) {}
262   void RunHandler(const HandlerParameter& param) final {
263     // Arena allocate a reader structure (that includes response)
264     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
265
266     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
267         param.call->call(), sizeof(ServerCallbackReaderImpl)))
268         ServerCallbackReaderImpl(
269             static_cast<::grpc_impl::CallbackServerContext*>(
270                 param.server_context),
271             param.call, std::move(param.call_requester));
272     // Inlineable OnDone can be false in the CompletionOp callback because there
273     // is no read reactor that has an inlineable OnDone; this only applies to
274     // the DefaultReactor (which is unary).
275     param.server_context->BeginCompletionOp(
276         param.call,
277         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
278         reader);
279
280     ServerReadReactor<RequestType>* reactor = nullptr;
281     if (param.status.ok()) {
282       reactor = ::grpc::internal::CatchingReactorGetter<
283           ServerReadReactor<RequestType>>(
284           get_reactor_,
285           static_cast<::grpc_impl::CallbackServerContext*>(
286               param.server_context),
287           reader->response());
288     }
289
290     if (reactor == nullptr) {
291       // if deserialization or reactor creator failed, we need to fail the call
292       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
293           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294           UnimplementedReadReactor<RequestType>(
295               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
296     }
297
298     reader->SetupReactor(reactor);
299   }
300
301  private:
302   std::function<ServerReadReactor<RequestType>*(
303       ::grpc_impl::CallbackServerContext*, ResponseType*)>
304       get_reactor_;
305
306   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307    public:
308     void Finish(::grpc::Status s) override {
309       // A finish tag with only MaybeDone can have its callback inlined
310       // regardless even if OnDone is not inlineable because this callback just
311       // checks a ref and then decides whether or not to dispatch OnDone.
312       finish_tag_.Set(call_.call(),
313                       [this](bool) {
314                         // Inlineable OnDone can be false here because there is
315                         // no read reactor that has an inlineable OnDone; this
316                         // only applies to the DefaultReactor (which is unary).
317                         this->MaybeDone(/*inlineable_ondone=*/false);
318                       },
319                       &finish_ops_, /*can_inline=*/true);
320       if (!ctx_->sent_initial_metadata_) {
321         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322                                         ctx_->initial_metadata_flags());
323         if (ctx_->compression_level_set()) {
324           finish_ops_.set_compression_level(ctx_->compression_level());
325         }
326         ctx_->sent_initial_metadata_ = true;
327       }
328       // The response is dropped if the status is not OK.
329       if (s.ok()) {
330         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331                                      finish_ops_.SendMessagePtr(&resp_));
332       } else {
333         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334       }
335       finish_ops_.set_core_cq_tag(&finish_tag_);
336       call_.PerformOps(&finish_ops_);
337     }
338
339     void SendInitialMetadata() override {
340       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
341       this->Ref();
342       // The callback for this function should not be inlined because it invokes
343       // a user-controlled reaction, but any resulting OnDone can be inlined in
344       // the executor to which this callback is dispatched.
345       meta_tag_.Set(call_.call(),
346                     [this](bool ok) {
347                       ServerReadReactor<RequestType>* reactor =
348                           reactor_.load(std::memory_order_relaxed);
349                       reactor->OnSendInitialMetadataDone(ok);
350                       this->MaybeDone(/*inlineable_ondone=*/true);
351                     },
352                     &meta_ops_, /*can_inline=*/false);
353       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354                                     ctx_->initial_metadata_flags());
355       if (ctx_->compression_level_set()) {
356         meta_ops_.set_compression_level(ctx_->compression_level());
357       }
358       ctx_->sent_initial_metadata_ = true;
359       meta_ops_.set_core_cq_tag(&meta_tag_);
360       call_.PerformOps(&meta_ops_);
361     }
362
363     void Read(RequestType* req) override {
364       this->Ref();
365       read_ops_.RecvMessage(req);
366       call_.PerformOps(&read_ops_);
367     }
368
369    private:
370     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371
372     ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
373                              ::grpc::internal::Call* call,
374                              std::function<void()> call_requester)
375         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376
377     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378       reactor_.store(reactor, std::memory_order_relaxed);
379       // The callback for this function should not be inlined because it invokes
380       // a user-controlled reaction, but any resulting OnDone can be inlined in
381       // the executor to which this callback is dispatched.
382       read_tag_.Set(call_.call(),
383                     [this, reactor](bool ok) {
384                       reactor->OnReadDone(ok);
385                       this->MaybeDone(/*inlineable_ondone=*/true);
386                     },
387                     &read_ops_, /*can_inline=*/false);
388       read_ops_.set_core_cq_tag(&read_tag_);
389       this->BindReactor(reactor);
390       this->MaybeCallOnCancel(reactor);
391       // Inlineable OnDone can be false here because there is no read
392       // reactor that has an inlineable OnDone; this only applies to the
393       // DefaultReactor (which is unary).
394       this->MaybeDone(/*inlineable_ondone=*/false);
395     }
396
397     ~ServerCallbackReaderImpl() {}
398
399     ResponseType* response() { return &resp_; }
400
401     void CallOnDone() override {
402       reactor_.load(std::memory_order_relaxed)->OnDone();
403       grpc_call* call = call_.call();
404       auto call_requester = std::move(call_requester_);
405       this->~ServerCallbackReaderImpl();  // explicitly call destructor
406       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
407       call_requester();
408     }
409
410     ServerReactor* reactor() override {
411       return reactor_.load(std::memory_order_relaxed);
412     }
413
414     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
415         meta_ops_;
416     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
417     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
418                                 ::grpc::internal::CallOpSendMessage,
419                                 ::grpc::internal::CallOpServerSendStatus>
420         finish_ops_;
421     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
422     ::grpc::internal::CallOpSet<
423         ::grpc::internal::CallOpRecvMessage<RequestType>>
424         read_ops_;
425     ::grpc::internal::CallbackWithSuccessTag read_tag_;
426
427     ::grpc_impl::CallbackServerContext* const ctx_;
428     ::grpc::internal::Call call_;
429     ResponseType resp_;
430     std::function<void()> call_requester_;
431     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432     std::atomic<ServerReadReactor<RequestType>*> reactor_;
433     // callbacks_outstanding_ follows a refcount pattern
434     std::atomic<intptr_t> callbacks_outstanding_{
435         3};  // reserve for OnStarted, Finish, and CompletionOp
436   };
437 };
438
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
442   explicit CallbackServerStreamingHandler(
443       std::function<ServerWriteReactor<ResponseType>*(
444           ::grpc_impl::CallbackServerContext*, const RequestType*)>
445           get_reactor)
446       : get_reactor_(std::move(get_reactor)) {}
447   void RunHandler(const HandlerParameter& param) final {
448     // Arena allocate a writer structure
449     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450
451     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
452         param.call->call(), sizeof(ServerCallbackWriterImpl)))
453         ServerCallbackWriterImpl(
454             static_cast<::grpc_impl::CallbackServerContext*>(
455                 param.server_context),
456             param.call, static_cast<RequestType*>(param.request),
457             std::move(param.call_requester));
458     // Inlineable OnDone can be false in the CompletionOp callback because there
459     // is no write reactor that has an inlineable OnDone; this only applies to
460     // the DefaultReactor (which is unary).
461     param.server_context->BeginCompletionOp(
462         param.call,
463         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
464         writer);
465
466     ServerWriteReactor<ResponseType>* reactor = nullptr;
467     if (param.status.ok()) {
468       reactor = ::grpc::internal::CatchingReactorGetter<
469           ServerWriteReactor<ResponseType>>(
470           get_reactor_,
471           static_cast<::grpc_impl::CallbackServerContext*>(
472               param.server_context),
473           writer->request());
474     }
475     if (reactor == nullptr) {
476       // if deserialization or reactor creator failed, we need to fail the call
477       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
478           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
479           UnimplementedWriteReactor<ResponseType>(
480               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
481     }
482
483     writer->SetupReactor(reactor);
484   }
485
486   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
487                     ::grpc::Status* status, void** /*handler_data*/) final {
488     ::grpc::ByteBuffer buf;
489     buf.set_buffer(req);
490     auto* request =
491         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
492             call, sizeof(RequestType))) RequestType();
493     *status =
494         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
495     buf.Release();
496     if (status->ok()) {
497       return request;
498     }
499     request->~RequestType();
500     return nullptr;
501   }
502
503  private:
504   std::function<ServerWriteReactor<ResponseType>*(
505       ::grpc_impl::CallbackServerContext*, const RequestType*)>
506       get_reactor_;
507
508   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
509    public:
510     void Finish(::grpc::Status s) override {
511       // A finish tag with only MaybeDone can have its callback inlined
512       // regardless even if OnDone is not inlineable because this callback just
513       // checks a ref and then decides whether or not to dispatch OnDone.
514       finish_tag_.Set(call_.call(),
515                       [this](bool) {
516                         // Inlineable OnDone can be false here because there is
517                         // no write reactor that has an inlineable OnDone; this
518                         // only applies to the DefaultReactor (which is unary).
519                         this->MaybeDone(/*inlineable_ondone=*/false);
520                       },
521                       &finish_ops_, /*can_inline=*/true);
522       finish_ops_.set_core_cq_tag(&finish_tag_);
523
524       if (!ctx_->sent_initial_metadata_) {
525         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526                                         ctx_->initial_metadata_flags());
527         if (ctx_->compression_level_set()) {
528           finish_ops_.set_compression_level(ctx_->compression_level());
529         }
530         ctx_->sent_initial_metadata_ = true;
531       }
532       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533       call_.PerformOps(&finish_ops_);
534     }
535
536     void SendInitialMetadata() override {
537       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538       this->Ref();
539       // The callback for this function should not be inlined because it invokes
540       // a user-controlled reaction, but any resulting OnDone can be inlined in
541       // the executor to which this callback is dispatched.
542       meta_tag_.Set(call_.call(),
543                     [this](bool ok) {
544                       ServerWriteReactor<ResponseType>* reactor =
545                           reactor_.load(std::memory_order_relaxed);
546                       reactor->OnSendInitialMetadataDone(ok);
547                       this->MaybeDone(/*inlineable_ondone=*/true);
548                     },
549                     &meta_ops_, /*can_inline=*/false);
550       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551                                     ctx_->initial_metadata_flags());
552       if (ctx_->compression_level_set()) {
553         meta_ops_.set_compression_level(ctx_->compression_level());
554       }
555       ctx_->sent_initial_metadata_ = true;
556       meta_ops_.set_core_cq_tag(&meta_tag_);
557       call_.PerformOps(&meta_ops_);
558     }
559
560     void Write(const ResponseType* resp,
561                ::grpc::WriteOptions options) override {
562       this->Ref();
563       if (options.is_last_message()) {
564         options.set_buffer_hint();
565       }
566       if (!ctx_->sent_initial_metadata_) {
567         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568                                        ctx_->initial_metadata_flags());
569         if (ctx_->compression_level_set()) {
570           write_ops_.set_compression_level(ctx_->compression_level());
571         }
572         ctx_->sent_initial_metadata_ = true;
573       }
574       // TODO(vjpai): don't assert
575       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576       call_.PerformOps(&write_ops_);
577     }
578
579     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580                         ::grpc::Status s) override {
581       // This combines the write into the finish callback
582       // TODO(vjpai): don't assert
583       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584       Finish(std::move(s));
585     }
586
587    private:
588     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589
590     ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
591                              ::grpc::internal::Call* call,
592                              const RequestType* req,
593                              std::function<void()> call_requester)
594         : ctx_(ctx),
595           call_(*call),
596           req_(req),
597           call_requester_(std::move(call_requester)) {}
598
599     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
600       reactor_.store(reactor, std::memory_order_relaxed);
601       // The callback for this function should not be inlined because it invokes
602       // a user-controlled reaction, but any resulting OnDone can be inlined in
603       // the executor to which this callback is dispatched.
604       write_tag_.Set(call_.call(),
605                      [this, reactor](bool ok) {
606                        reactor->OnWriteDone(ok);
607                        this->MaybeDone(/*inlineable_ondone=*/true);
608                      },
609                      &write_ops_, /*can_inline=*/false);
610       write_ops_.set_core_cq_tag(&write_tag_);
611       this->BindReactor(reactor);
612       this->MaybeCallOnCancel(reactor);
613       // Inlineable OnDone can be false here because there is no write
614       // reactor that has an inlineable OnDone; this only applies to the
615       // DefaultReactor (which is unary).
616       this->MaybeDone(/*inlineable_ondone=*/false);
617     }
618     ~ServerCallbackWriterImpl() { req_->~RequestType(); }
619
620     const RequestType* request() { return req_; }
621
622     void CallOnDone() override {
623       reactor_.load(std::memory_order_relaxed)->OnDone();
624       grpc_call* call = call_.call();
625       auto call_requester = std::move(call_requester_);
626       this->~ServerCallbackWriterImpl();  // explicitly call destructor
627       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
628       call_requester();
629     }
630
631     ServerReactor* reactor() override {
632       return reactor_.load(std::memory_order_relaxed);
633     }
634
635     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
636         meta_ops_;
637     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
638     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
639                                 ::grpc::internal::CallOpSendMessage,
640                                 ::grpc::internal::CallOpServerSendStatus>
641         finish_ops_;
642     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
643     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
644                                 ::grpc::internal::CallOpSendMessage>
645         write_ops_;
646     ::grpc::internal::CallbackWithSuccessTag write_tag_;
647
648     ::grpc_impl::CallbackServerContext* const ctx_;
649     ::grpc::internal::Call call_;
650     const RequestType* req_;
651     std::function<void()> call_requester_;
652     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
653     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
654     // callbacks_outstanding_ follows a refcount pattern
655     std::atomic<intptr_t> callbacks_outstanding_{
656         3};  // reserve for OnStarted, Finish, and CompletionOp
657   };
658 };
659
660 template <class RequestType, class ResponseType>
661 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
662  public:
663   explicit CallbackBidiHandler(
664       std::function<ServerBidiReactor<RequestType, ResponseType>*(
665           ::grpc_impl::CallbackServerContext*)>
666           get_reactor)
667       : get_reactor_(std::move(get_reactor)) {}
668   void RunHandler(const HandlerParameter& param) final {
669     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
670
671     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
672         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
673         ServerCallbackReaderWriterImpl(
674             static_cast<::grpc_impl::CallbackServerContext*>(
675                 param.server_context),
676             param.call, std::move(param.call_requester));
677     // Inlineable OnDone can be false in the CompletionOp callback because there
678     // is no bidi reactor that has an inlineable OnDone; this only applies to
679     // the DefaultReactor (which is unary).
680     param.server_context->BeginCompletionOp(
681         param.call,
682         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
683         stream);
684
685     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
686     if (param.status.ok()) {
687       reactor = ::grpc::internal::CatchingReactorGetter<
688           ServerBidiReactor<RequestType, ResponseType>>(
689           get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
690                             param.server_context));
691     }
692
693     if (reactor == nullptr) {
694       // if deserialization or reactor creator failed, we need to fail the call
695       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
696           param.call->call(),
697           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
698           UnimplementedBidiReactor<RequestType, ResponseType>(
699               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
700     }
701
702     stream->SetupReactor(reactor);
703   }
704
705  private:
706   std::function<ServerBidiReactor<RequestType, ResponseType>*(
707       ::grpc_impl::CallbackServerContext*)>
708       get_reactor_;
709
710   class ServerCallbackReaderWriterImpl
711       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
712    public:
713     void Finish(::grpc::Status s) override {
714       // A finish tag with only MaybeDone can have its callback inlined
715       // regardless even if OnDone is not inlineable because this callback just
716       // checks a ref and then decides whether or not to dispatch OnDone.
717       finish_tag_.Set(call_.call(),
718                       [this](bool) {
719                         // Inlineable OnDone can be false here because there is
720                         // no bidi reactor that has an inlineable OnDone; this
721                         // only applies to the DefaultReactor (which is unary).
722                         this->MaybeDone(/*inlineable_ondone=*/false);
723                       },
724                       &finish_ops_, /*can_inline=*/true);
725       finish_ops_.set_core_cq_tag(&finish_tag_);
726
727       if (!ctx_->sent_initial_metadata_) {
728         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
729                                         ctx_->initial_metadata_flags());
730         if (ctx_->compression_level_set()) {
731           finish_ops_.set_compression_level(ctx_->compression_level());
732         }
733         ctx_->sent_initial_metadata_ = true;
734       }
735       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
736       call_.PerformOps(&finish_ops_);
737     }
738
739     void SendInitialMetadata() override {
740       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
741       this->Ref();
742       // The callback for this function should not be inlined because it invokes
743       // a user-controlled reaction, but any resulting OnDone can be inlined in
744       // the executor to which this callback is dispatched.
745       meta_tag_.Set(call_.call(),
746                     [this](bool ok) {
747                       ServerBidiReactor<RequestType, ResponseType>* reactor =
748                           reactor_.load(std::memory_order_relaxed);
749                       reactor->OnSendInitialMetadataDone(ok);
750                       this->MaybeDone(/*inlineable_ondone=*/true);
751                     },
752                     &meta_ops_, /*can_inline=*/false);
753       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
754                                     ctx_->initial_metadata_flags());
755       if (ctx_->compression_level_set()) {
756         meta_ops_.set_compression_level(ctx_->compression_level());
757       }
758       ctx_->sent_initial_metadata_ = true;
759       meta_ops_.set_core_cq_tag(&meta_tag_);
760       call_.PerformOps(&meta_ops_);
761     }
762
763     void Write(const ResponseType* resp,
764                ::grpc::WriteOptions options) override {
765       this->Ref();
766       if (options.is_last_message()) {
767         options.set_buffer_hint();
768       }
769       if (!ctx_->sent_initial_metadata_) {
770         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
771                                        ctx_->initial_metadata_flags());
772         if (ctx_->compression_level_set()) {
773           write_ops_.set_compression_level(ctx_->compression_level());
774         }
775         ctx_->sent_initial_metadata_ = true;
776       }
777       // TODO(vjpai): don't assert
778       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
779       call_.PerformOps(&write_ops_);
780     }
781
782     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
783                         ::grpc::Status s) override {
784       // TODO(vjpai): don't assert
785       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
786       Finish(std::move(s));
787     }
788
789     void Read(RequestType* req) override {
790       this->Ref();
791       read_ops_.RecvMessage(req);
792       call_.PerformOps(&read_ops_);
793     }
794
795    private:
796     friend class CallbackBidiHandler<RequestType, ResponseType>;
797
798     ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
799                                    ::grpc::internal::Call* call,
800                                    std::function<void()> call_requester)
801         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
802
803     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
804       reactor_.store(reactor, std::memory_order_relaxed);
805       // The callbacks for these functions should not be inlined because they
806       // invoke user-controlled reactions, but any resulting OnDones can be
807       // inlined in the executor to which a callback is dispatched.
808       write_tag_.Set(call_.call(),
809                      [this, reactor](bool ok) {
810                        reactor->OnWriteDone(ok);
811                        this->MaybeDone(/*inlineable_ondone=*/true);
812                      },
813                      &write_ops_, /*can_inline=*/false);
814       write_ops_.set_core_cq_tag(&write_tag_);
815       read_tag_.Set(call_.call(),
816                     [this, reactor](bool ok) {
817                       reactor->OnReadDone(ok);
818                       this->MaybeDone(/*inlineable_ondone=*/true);
819                     },
820                     &read_ops_, /*can_inline=*/false);
821       read_ops_.set_core_cq_tag(&read_tag_);
822       this->BindReactor(reactor);
823       this->MaybeCallOnCancel(reactor);
824       // Inlineable OnDone can be false here because there is no bidi
825       // reactor that has an inlineable OnDone; this only applies to the
826       // DefaultReactor (which is unary).
827       this->MaybeDone(/*inlineable_ondone=*/false);
828     }
829
830     void CallOnDone() override {
831       reactor_.load(std::memory_order_relaxed)->OnDone();
832       grpc_call* call = call_.call();
833       auto call_requester = std::move(call_requester_);
834       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
835       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
836       call_requester();
837     }
838
839     ServerReactor* reactor() override {
840       return reactor_.load(std::memory_order_relaxed);
841     }
842
843     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
844         meta_ops_;
845     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
846     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
847                                 ::grpc::internal::CallOpSendMessage,
848                                 ::grpc::internal::CallOpServerSendStatus>
849         finish_ops_;
850     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
851     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
852                                 ::grpc::internal::CallOpSendMessage>
853         write_ops_;
854     ::grpc::internal::CallbackWithSuccessTag write_tag_;
855     ::grpc::internal::CallOpSet<
856         ::grpc::internal::CallOpRecvMessage<RequestType>>
857         read_ops_;
858     ::grpc::internal::CallbackWithSuccessTag read_tag_;
859
860     ::grpc_impl::CallbackServerContext* const ctx_;
861     ::grpc::internal::Call call_;
862     std::function<void()> call_requester_;
863     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
864     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
865     // callbacks_outstanding_ follows a refcount pattern
866     std::atomic<intptr_t> callbacks_outstanding_{
867         3};  // reserve for OnStarted, Finish, and CompletionOp
868   };
869 };
870
871 }  // namespace internal
872 }  // namespace grpc_impl
873
874 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H