Imported Upstream version 1.28.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback_impl.h>
24 #include <grpcpp/impl/codegen/server_context_impl.h>
25 #include <grpcpp/impl/codegen/status.h>
26
27 namespace grpc_impl {
28 namespace internal {
29
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38
39   void SetMessageAllocator(
40       ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
41           allocator) {
42     allocator_ = allocator;
43   }
44
45   void RunHandler(const HandlerParameter& param) final {
46     // Arena allocate a controller structure (that includes request/response)
47     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48     auto* allocator_state = static_cast<
49         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
50         param.internal_data);
51
52     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54         ServerCallbackUnaryImpl(
55             static_cast<::grpc_impl::CallbackServerContext*>(
56                 param.server_context),
57             param.call, allocator_state, std::move(param.call_requester));
58     param.server_context->BeginCompletionOp(
59         param.call, [call](bool) { call->MaybeDone(); }, call);
60
61     ServerUnaryReactor* reactor = nullptr;
62     if (param.status.ok()) {
63       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64           get_reactor_,
65           static_cast<::grpc_impl::CallbackServerContext*>(
66               param.server_context),
67           call->request(), call->response());
68     }
69
70     if (reactor == nullptr) {
71       // if deserialization or reactor creator failed, we need to fail the call
72       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
73           param.call->call(), sizeof(UnimplementedUnaryReactor)))
74           UnimplementedUnaryReactor(
75               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
76     }
77
78     /// Invoke SetupReactor as the last part of the handler
79     call->SetupReactor(reactor);
80   }
81
82   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
83                     ::grpc::Status* status, void** handler_data) final {
84     ::grpc::ByteBuffer buf;
85     buf.set_buffer(req);
86     RequestType* request = nullptr;
87     ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
88         allocator_state = nullptr;
89     if (allocator_ != nullptr) {
90       allocator_state = allocator_->AllocateMessages();
91     } else {
92       allocator_state =
93           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
94               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
95               DefaultMessageHolder<RequestType, ResponseType>();
96     }
97     *handler_data = allocator_state;
98     request = allocator_state->request();
99     *status =
100         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
101     buf.Release();
102     if (status->ok()) {
103       return request;
104     }
105     // Clean up on deserialization failure.
106     allocator_state->Release();
107     return nullptr;
108   }
109
110  private:
111   std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*,
112                                     const RequestType*, ResponseType*)>
113       get_reactor_;
114   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
115       allocator_ = nullptr;
116
117   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
118    public:
119     void Finish(::grpc::Status s) override {
120       // A callback that only contains a call to MaybeDone can be run as an
121       // inline callback regardless of whether or not OnDone is inlineable
122       // because if the actual OnDone callback needs to be scheduled, MaybeDone
123       // is responsible for dispatching to an executor thread if needed. Thus,
124       // when setting up the finish_tag_, we can set its own callback to
125       // inlineable.
126       finish_tag_.Set(
127           call_.call(),
128           [this](bool) {
129             this->MaybeDone(
130                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131           },
132           &finish_ops_, /*can_inline=*/true);
133       finish_ops_.set_core_cq_tag(&finish_tag_);
134
135       if (!ctx_->sent_initial_metadata_) {
136         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
137                                         ctx_->initial_metadata_flags());
138         if (ctx_->compression_level_set()) {
139           finish_ops_.set_compression_level(ctx_->compression_level());
140         }
141         ctx_->sent_initial_metadata_ = true;
142       }
143       // The response is dropped if the status is not OK.
144       if (s.ok()) {
145         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146                                      finish_ops_.SendMessagePtr(response()));
147       } else {
148         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
149       }
150       finish_ops_.set_core_cq_tag(&finish_tag_);
151       call_.PerformOps(&finish_ops_);
152     }
153
154     void SendInitialMetadata() override {
155       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
156       this->Ref();
157       // The callback for this function should not be marked inline because it
158       // is directly invoking a user-controlled reaction
159       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
160       // thread. However, any OnDone needed after that can be inlined because it
161       // is already running on an executor thread.
162       meta_tag_.Set(call_.call(),
163                     [this](bool ok) {
164                       ServerUnaryReactor* reactor =
165                           reactor_.load(std::memory_order_relaxed);
166                       reactor->OnSendInitialMetadataDone(ok);
167                       this->MaybeDone(/*inlineable_ondone=*/true);
168                     },
169                     &meta_ops_, /*can_inline=*/false);
170       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171                                     ctx_->initial_metadata_flags());
172       if (ctx_->compression_level_set()) {
173         meta_ops_.set_compression_level(ctx_->compression_level());
174       }
175       ctx_->sent_initial_metadata_ = true;
176       meta_ops_.set_core_cq_tag(&meta_tag_);
177       call_.PerformOps(&meta_ops_);
178     }
179
180    private:
181     friend class CallbackUnaryHandler<RequestType, ResponseType>;
182
183     ServerCallbackUnaryImpl(
184         ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call,
185         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
186             allocator_state,
187         std::function<void()> call_requester)
188         : ctx_(ctx),
189           call_(*call),
190           allocator_state_(allocator_state),
191           call_requester_(std::move(call_requester)) {
192       ctx_->set_message_allocator_state(allocator_state);
193     }
194
195     /// SetupReactor binds the reactor (which also releases any queued
196     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
197     /// the completion of the RPC. This should be the last component of the
198     /// handler.
199     void SetupReactor(ServerUnaryReactor* reactor) {
200       reactor_.store(reactor, std::memory_order_relaxed);
201       this->BindReactor(reactor);
202       this->MaybeCallOnCancel(reactor);
203       this->MaybeDone(reactor->InternalInlineable());
204     }
205
206     const RequestType* request() { return allocator_state_->request(); }
207     ResponseType* response() { return allocator_state_->response(); }
208
209     void CallOnDone() override {
210       reactor_.load(std::memory_order_relaxed)->OnDone();
211       grpc_call* call = call_.call();
212       auto call_requester = std::move(call_requester_);
213       allocator_state_->Release();
214       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
215       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
216       call_requester();
217     }
218
219     ServerReactor* reactor() override {
220       return reactor_.load(std::memory_order_relaxed);
221     }
222
223     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
224         meta_ops_;
225     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
226     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
227                                 ::grpc::internal::CallOpSendMessage,
228                                 ::grpc::internal::CallOpServerSendStatus>
229         finish_ops_;
230     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
231
232     ::grpc_impl::CallbackServerContext* const ctx_;
233     ::grpc::internal::Call call_;
234     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
235         allocator_state_;
236     std::function<void()> call_requester_;
237     // reactor_ can always be loaded/stored with relaxed memory ordering because
238     // its value is only set once, independently of other data in the object,
239     // and the loads that use it will always actually come provably later even
240     // though they are from different threads since they are triggered by
241     // actions initiated only by the setting up of the reactor_ variable. In
242     // a sense, it's a delayed "const": it gets its value from the SetupReactor
243     // method (not the constructor, so it's not a true const), but it doesn't
244     // change after that and it only gets used by actions caused, directly or
245     // indirectly, by that setup. This comment also applies to the reactor_
246     // variables of the other streaming objects in this file.
247     std::atomic<ServerUnaryReactor*> reactor_;
248     // callbacks_outstanding_ follows a refcount pattern
249     std::atomic<intptr_t> callbacks_outstanding_{
250         3};  // reserve for start, Finish, and CompletionOp
251   };
252 };
253
254 template <class RequestType, class ResponseType>
255 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
256  public:
257   explicit CallbackClientStreamingHandler(
258       std::function<ServerReadReactor<RequestType>*(
259           ::grpc_impl::CallbackServerContext*, ResponseType*)>
260           get_reactor)
261       : get_reactor_(std::move(get_reactor)) {}
262   void RunHandler(const HandlerParameter& param) final {
263     // Arena allocate a reader structure (that includes response)
264     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
265
266     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
267         param.call->call(), sizeof(ServerCallbackReaderImpl)))
268         ServerCallbackReaderImpl(
269             static_cast<::grpc_impl::CallbackServerContext*>(
270                 param.server_context),
271             param.call, std::move(param.call_requester));
272     // Inlineable OnDone can be false in the CompletionOp callback because there
273     // is no read reactor that has an inlineable OnDone; this only applies to
274     // the DefaultReactor (which is unary).
275     param.server_context->BeginCompletionOp(
276         param.call,
277         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
278         reader);
279
280     ServerReadReactor<RequestType>* reactor = nullptr;
281     if (param.status.ok()) {
282       reactor = ::grpc::internal::CatchingReactorGetter<
283           ServerReadReactor<RequestType>>(
284           get_reactor_,
285           static_cast<::grpc_impl::CallbackServerContext*>(
286               param.server_context),
287           reader->response());
288     }
289
290     if (reactor == nullptr) {
291       // if deserialization or reactor creator failed, we need to fail the call
292       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
293           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294           UnimplementedReadReactor<RequestType>(
295               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
296     }
297
298     reader->SetupReactor(reactor);
299   }
300
301  private:
302   std::function<ServerReadReactor<RequestType>*(
303       ::grpc_impl::CallbackServerContext*, ResponseType*)>
304       get_reactor_;
305
306   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307    public:
308     void Finish(::grpc::Status s) override {
309       // A finish tag with only MaybeDone can have its callback inlined
310       // regardless even if OnDone is not inlineable because this callback just
311       // checks a ref and then decides whether or not to dispatch OnDone.
312       finish_tag_.Set(call_.call(),
313                       [this](bool) {
314                         // Inlineable OnDone can be false here because there is
315                         // no read reactor that has an inlineable OnDone; this
316                         // only applies to the DefaultReactor (which is unary).
317                         this->MaybeDone(/*inlineable_ondone=*/false);
318                       },
319                       &finish_ops_, /*can_inline=*/true);
320       if (!ctx_->sent_initial_metadata_) {
321         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322                                         ctx_->initial_metadata_flags());
323         if (ctx_->compression_level_set()) {
324           finish_ops_.set_compression_level(ctx_->compression_level());
325         }
326         ctx_->sent_initial_metadata_ = true;
327       }
328       // The response is dropped if the status is not OK.
329       if (s.ok()) {
330         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331                                      finish_ops_.SendMessagePtr(&resp_));
332       } else {
333         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334       }
335       finish_ops_.set_core_cq_tag(&finish_tag_);
336       call_.PerformOps(&finish_ops_);
337     }
338
339     void SendInitialMetadata() override {
340       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
341       this->Ref();
342       // The callback for this function should not be inlined because it invokes
343       // a user-controlled reaction, but any resulting OnDone can be inlined in
344       // the executor to which this callback is dispatched.
345       meta_tag_.Set(call_.call(),
346                     [this](bool ok) {
347                       ServerReadReactor<RequestType>* reactor =
348                           reactor_.load(std::memory_order_relaxed);
349                       reactor->OnSendInitialMetadataDone(ok);
350                       this->MaybeDone(/*inlineable_ondone=*/true);
351                     },
352                     &meta_ops_, /*can_inline=*/false);
353       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354                                     ctx_->initial_metadata_flags());
355       if (ctx_->compression_level_set()) {
356         meta_ops_.set_compression_level(ctx_->compression_level());
357       }
358       ctx_->sent_initial_metadata_ = true;
359       meta_ops_.set_core_cq_tag(&meta_tag_);
360       call_.PerformOps(&meta_ops_);
361     }
362
363     void Read(RequestType* req) override {
364       this->Ref();
365       read_ops_.RecvMessage(req);
366       call_.PerformOps(&read_ops_);
367     }
368
369    private:
370     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371
372     ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
373                              ::grpc::internal::Call* call,
374                              std::function<void()> call_requester)
375         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376
377     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378       reactor_.store(reactor, std::memory_order_relaxed);
379       // The callback for this function should not be inlined because it invokes
380       // a user-controlled reaction, but any resulting OnDone can be inlined in
381       // the executor to which this callback is dispatched.
382       read_tag_.Set(call_.call(),
383                     [this, reactor](bool ok) {
384                       reactor->OnReadDone(ok);
385                       this->MaybeDone(/*inlineable_ondone=*/true);
386                     },
387                     &read_ops_, /*can_inline=*/false);
388       read_ops_.set_core_cq_tag(&read_tag_);
389       this->BindReactor(reactor);
390       this->MaybeCallOnCancel(reactor);
391       // Inlineable OnDone can be false here because there is no read
392       // reactor that has an inlineable OnDone; this only applies to the
393       // DefaultReactor (which is unary).
394       this->MaybeDone(/*inlineable_ondone=*/false);
395     }
396
397     ~ServerCallbackReaderImpl() {}
398
399     ResponseType* response() { return &resp_; }
400
401     void CallOnDone() override {
402       reactor_.load(std::memory_order_relaxed)->OnDone();
403       grpc_call* call = call_.call();
404       auto call_requester = std::move(call_requester_);
405       this->~ServerCallbackReaderImpl();  // explicitly call destructor
406       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
407       call_requester();
408     }
409
410     ServerReactor* reactor() override {
411       return reactor_.load(std::memory_order_relaxed);
412     }
413
414     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
415         meta_ops_;
416     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
417     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
418                                 ::grpc::internal::CallOpSendMessage,
419                                 ::grpc::internal::CallOpServerSendStatus>
420         finish_ops_;
421     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
422     ::grpc::internal::CallOpSet<
423         ::grpc::internal::CallOpRecvMessage<RequestType>>
424         read_ops_;
425     ::grpc::internal::CallbackWithSuccessTag read_tag_;
426
427     ::grpc_impl::CallbackServerContext* const ctx_;
428     ::grpc::internal::Call call_;
429     ResponseType resp_;
430     std::function<void()> call_requester_;
431     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432     std::atomic<ServerReadReactor<RequestType>*> reactor_;
433     // callbacks_outstanding_ follows a refcount pattern
434     std::atomic<intptr_t> callbacks_outstanding_{
435         3};  // reserve for OnStarted, Finish, and CompletionOp
436   };
437 };
438
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
442   explicit CallbackServerStreamingHandler(
443       std::function<ServerWriteReactor<ResponseType>*(
444           ::grpc_impl::CallbackServerContext*, const RequestType*)>
445           get_reactor)
446       : get_reactor_(std::move(get_reactor)) {}
447   void RunHandler(const HandlerParameter& param) final {
448     // Arena allocate a writer structure
449     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450
451     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
452         param.call->call(), sizeof(ServerCallbackWriterImpl)))
453         ServerCallbackWriterImpl(
454             static_cast<::grpc_impl::CallbackServerContext*>(
455                 param.server_context),
456             param.call, static_cast<RequestType*>(param.request),
457             std::move(param.call_requester));
458     // Inlineable OnDone can be false in the CompletionOp callback because there
459     // is no write reactor that has an inlineable OnDone; this only applies to
460     // the DefaultReactor (which is unary).
461     param.server_context->BeginCompletionOp(
462         param.call,
463         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
464         writer);
465
466     ServerWriteReactor<ResponseType>* reactor = nullptr;
467     if (param.status.ok()) {
468       reactor = ::grpc::internal::CatchingReactorGetter<
469           ServerWriteReactor<ResponseType>>(
470           get_reactor_,
471           static_cast<::grpc_impl::CallbackServerContext*>(
472               param.server_context),
473           writer->request());
474     }
475     if (reactor == nullptr) {
476       // if deserialization or reactor creator failed, we need to fail the call
477       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
478           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
479           UnimplementedWriteReactor<ResponseType>(
480               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
481     }
482
483     writer->SetupReactor(reactor);
484   }
485
486   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
487                     ::grpc::Status* status, void** /*handler_data*/) final {
488     ::grpc::ByteBuffer buf;
489     buf.set_buffer(req);
490     auto* request =
491         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
492             call, sizeof(RequestType))) RequestType();
493     *status =
494         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
495     buf.Release();
496     if (status->ok()) {
497       return request;
498     }
499     request->~RequestType();
500     return nullptr;
501   }
502
503  private:
504   std::function<ServerWriteReactor<ResponseType>*(
505       ::grpc_impl::CallbackServerContext*, const RequestType*)>
506       get_reactor_;
507
508   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
509    public:
510     void Finish(::grpc::Status s) override {
511       // A finish tag with only MaybeDone can have its callback inlined
512       // regardless even if OnDone is not inlineable because this callback just
513       // checks a ref and then decides whether or not to dispatch OnDone.
514       finish_tag_.Set(call_.call(),
515                       [this](bool) {
516                         // Inlineable OnDone can be false here because there is
517                         // no write reactor that has an inlineable OnDone; this
518                         // only applies to the DefaultReactor (which is unary).
519                         this->MaybeDone(/*inlineable_ondone=*/false);
520                       },
521                       &finish_ops_, /*can_inline=*/true);
522       finish_ops_.set_core_cq_tag(&finish_tag_);
523
524       if (!ctx_->sent_initial_metadata_) {
525         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526                                         ctx_->initial_metadata_flags());
527         if (ctx_->compression_level_set()) {
528           finish_ops_.set_compression_level(ctx_->compression_level());
529         }
530         ctx_->sent_initial_metadata_ = true;
531       }
532       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533       call_.PerformOps(&finish_ops_);
534     }
535
536     void SendInitialMetadata() override {
537       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538       this->Ref();
539       // The callback for this function should not be inlined because it invokes
540       // a user-controlled reaction, but any resulting OnDone can be inlined in
541       // the executor to which this callback is dispatched.
542       meta_tag_.Set(call_.call(),
543                     [this](bool ok) {
544                       ServerWriteReactor<ResponseType>* reactor =
545                           reactor_.load(std::memory_order_relaxed);
546                       reactor->OnSendInitialMetadataDone(ok);
547                       this->MaybeDone(/*inlineable_ondone=*/true);
548                     },
549                     &meta_ops_, /*can_inline=*/false);
550       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551                                     ctx_->initial_metadata_flags());
552       if (ctx_->compression_level_set()) {
553         meta_ops_.set_compression_level(ctx_->compression_level());
554       }
555       ctx_->sent_initial_metadata_ = true;
556       meta_ops_.set_core_cq_tag(&meta_tag_);
557       call_.PerformOps(&meta_ops_);
558     }
559
560     void Write(const ResponseType* resp,
561                ::grpc::WriteOptions options) override {
562       this->Ref();
563       if (options.is_last_message()) {
564         options.set_buffer_hint();
565       }
566       if (!ctx_->sent_initial_metadata_) {
567         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568                                        ctx_->initial_metadata_flags());
569         if (ctx_->compression_level_set()) {
570           write_ops_.set_compression_level(ctx_->compression_level());
571         }
572         ctx_->sent_initial_metadata_ = true;
573       }
574       // TODO(vjpai): don't assert
575       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576       call_.PerformOps(&write_ops_);
577     }
578
579     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580                         ::grpc::Status s) override {
581       // This combines the write into the finish callback
582       // Don't send any message if the status is bad
583       if (s.ok()) {
584         // TODO(vjpai): don't assert
585         GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
586       }
587       Finish(std::move(s));
588     }
589
590    private:
591     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
592
593     ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
594                              ::grpc::internal::Call* call,
595                              const RequestType* req,
596                              std::function<void()> call_requester)
597         : ctx_(ctx),
598           call_(*call),
599           req_(req),
600           call_requester_(std::move(call_requester)) {}
601
602     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
603       reactor_.store(reactor, std::memory_order_relaxed);
604       // The callback for this function should not be inlined because it invokes
605       // a user-controlled reaction, but any resulting OnDone can be inlined in
606       // the executor to which this callback is dispatched.
607       write_tag_.Set(call_.call(),
608                      [this, reactor](bool ok) {
609                        reactor->OnWriteDone(ok);
610                        this->MaybeDone(/*inlineable_ondone=*/true);
611                      },
612                      &write_ops_, /*can_inline=*/false);
613       write_ops_.set_core_cq_tag(&write_tag_);
614       this->BindReactor(reactor);
615       this->MaybeCallOnCancel(reactor);
616       // Inlineable OnDone can be false here because there is no write
617       // reactor that has an inlineable OnDone; this only applies to the
618       // DefaultReactor (which is unary).
619       this->MaybeDone(/*inlineable_ondone=*/false);
620     }
621     ~ServerCallbackWriterImpl() { req_->~RequestType(); }
622
623     const RequestType* request() { return req_; }
624
625     void CallOnDone() override {
626       reactor_.load(std::memory_order_relaxed)->OnDone();
627       grpc_call* call = call_.call();
628       auto call_requester = std::move(call_requester_);
629       this->~ServerCallbackWriterImpl();  // explicitly call destructor
630       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
631       call_requester();
632     }
633
634     ServerReactor* reactor() override {
635       return reactor_.load(std::memory_order_relaxed);
636     }
637
638     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
639         meta_ops_;
640     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
641     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
642                                 ::grpc::internal::CallOpSendMessage,
643                                 ::grpc::internal::CallOpServerSendStatus>
644         finish_ops_;
645     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
646     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
647                                 ::grpc::internal::CallOpSendMessage>
648         write_ops_;
649     ::grpc::internal::CallbackWithSuccessTag write_tag_;
650
651     ::grpc_impl::CallbackServerContext* const ctx_;
652     ::grpc::internal::Call call_;
653     const RequestType* req_;
654     std::function<void()> call_requester_;
655     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
656     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
657     // callbacks_outstanding_ follows a refcount pattern
658     std::atomic<intptr_t> callbacks_outstanding_{
659         3};  // reserve for OnStarted, Finish, and CompletionOp
660   };
661 };
662
663 template <class RequestType, class ResponseType>
664 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
665  public:
666   explicit CallbackBidiHandler(
667       std::function<ServerBidiReactor<RequestType, ResponseType>*(
668           ::grpc_impl::CallbackServerContext*)>
669           get_reactor)
670       : get_reactor_(std::move(get_reactor)) {}
671   void RunHandler(const HandlerParameter& param) final {
672     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
673
674     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
675         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
676         ServerCallbackReaderWriterImpl(
677             static_cast<::grpc_impl::CallbackServerContext*>(
678                 param.server_context),
679             param.call, std::move(param.call_requester));
680     // Inlineable OnDone can be false in the CompletionOp callback because there
681     // is no bidi reactor that has an inlineable OnDone; this only applies to
682     // the DefaultReactor (which is unary).
683     param.server_context->BeginCompletionOp(
684         param.call,
685         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
686         stream);
687
688     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
689     if (param.status.ok()) {
690       reactor = ::grpc::internal::CatchingReactorGetter<
691           ServerBidiReactor<RequestType, ResponseType>>(
692           get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
693                             param.server_context));
694     }
695
696     if (reactor == nullptr) {
697       // if deserialization or reactor creator failed, we need to fail the call
698       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
699           param.call->call(),
700           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
701           UnimplementedBidiReactor<RequestType, ResponseType>(
702               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
703     }
704
705     stream->SetupReactor(reactor);
706   }
707
708  private:
709   std::function<ServerBidiReactor<RequestType, ResponseType>*(
710       ::grpc_impl::CallbackServerContext*)>
711       get_reactor_;
712
713   class ServerCallbackReaderWriterImpl
714       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
715    public:
716     void Finish(::grpc::Status s) override {
717       // A finish tag with only MaybeDone can have its callback inlined
718       // regardless even if OnDone is not inlineable because this callback just
719       // checks a ref and then decides whether or not to dispatch OnDone.
720       finish_tag_.Set(call_.call(),
721                       [this](bool) {
722                         // Inlineable OnDone can be false here because there is
723                         // no bidi reactor that has an inlineable OnDone; this
724                         // only applies to the DefaultReactor (which is unary).
725                         this->MaybeDone(/*inlineable_ondone=*/false);
726                       },
727                       &finish_ops_, /*can_inline=*/true);
728       finish_ops_.set_core_cq_tag(&finish_tag_);
729
730       if (!ctx_->sent_initial_metadata_) {
731         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
732                                         ctx_->initial_metadata_flags());
733         if (ctx_->compression_level_set()) {
734           finish_ops_.set_compression_level(ctx_->compression_level());
735         }
736         ctx_->sent_initial_metadata_ = true;
737       }
738       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
739       call_.PerformOps(&finish_ops_);
740     }
741
742     void SendInitialMetadata() override {
743       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
744       this->Ref();
745       // The callback for this function should not be inlined because it invokes
746       // a user-controlled reaction, but any resulting OnDone can be inlined in
747       // the executor to which this callback is dispatched.
748       meta_tag_.Set(call_.call(),
749                     [this](bool ok) {
750                       ServerBidiReactor<RequestType, ResponseType>* reactor =
751                           reactor_.load(std::memory_order_relaxed);
752                       reactor->OnSendInitialMetadataDone(ok);
753                       this->MaybeDone(/*inlineable_ondone=*/true);
754                     },
755                     &meta_ops_, /*can_inline=*/false);
756       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
757                                     ctx_->initial_metadata_flags());
758       if (ctx_->compression_level_set()) {
759         meta_ops_.set_compression_level(ctx_->compression_level());
760       }
761       ctx_->sent_initial_metadata_ = true;
762       meta_ops_.set_core_cq_tag(&meta_tag_);
763       call_.PerformOps(&meta_ops_);
764     }
765
766     void Write(const ResponseType* resp,
767                ::grpc::WriteOptions options) override {
768       this->Ref();
769       if (options.is_last_message()) {
770         options.set_buffer_hint();
771       }
772       if (!ctx_->sent_initial_metadata_) {
773         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774                                        ctx_->initial_metadata_flags());
775         if (ctx_->compression_level_set()) {
776           write_ops_.set_compression_level(ctx_->compression_level());
777         }
778         ctx_->sent_initial_metadata_ = true;
779       }
780       // TODO(vjpai): don't assert
781       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
782       call_.PerformOps(&write_ops_);
783     }
784
785     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
786                         ::grpc::Status s) override {
787       // Don't send any message if the status is bad
788       if (s.ok()) {
789         // TODO(vjpai): don't assert
790         GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
791       }
792       Finish(std::move(s));
793     }
794
795     void Read(RequestType* req) override {
796       this->Ref();
797       read_ops_.RecvMessage(req);
798       call_.PerformOps(&read_ops_);
799     }
800
801    private:
802     friend class CallbackBidiHandler<RequestType, ResponseType>;
803
804     ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
805                                    ::grpc::internal::Call* call,
806                                    std::function<void()> call_requester)
807         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
808
809     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
810       reactor_.store(reactor, std::memory_order_relaxed);
811       // The callbacks for these functions should not be inlined because they
812       // invoke user-controlled reactions, but any resulting OnDones can be
813       // inlined in the executor to which a callback is dispatched.
814       write_tag_.Set(call_.call(),
815                      [this, reactor](bool ok) {
816                        reactor->OnWriteDone(ok);
817                        this->MaybeDone(/*inlineable_ondone=*/true);
818                      },
819                      &write_ops_, /*can_inline=*/false);
820       write_ops_.set_core_cq_tag(&write_tag_);
821       read_tag_.Set(call_.call(),
822                     [this, reactor](bool ok) {
823                       reactor->OnReadDone(ok);
824                       this->MaybeDone(/*inlineable_ondone=*/true);
825                     },
826                     &read_ops_, /*can_inline=*/false);
827       read_ops_.set_core_cq_tag(&read_tag_);
828       this->BindReactor(reactor);
829       this->MaybeCallOnCancel(reactor);
830       // Inlineable OnDone can be false here because there is no bidi
831       // reactor that has an inlineable OnDone; this only applies to the
832       // DefaultReactor (which is unary).
833       this->MaybeDone(/*inlineable_ondone=*/false);
834     }
835
836     void CallOnDone() override {
837       reactor_.load(std::memory_order_relaxed)->OnDone();
838       grpc_call* call = call_.call();
839       auto call_requester = std::move(call_requester_);
840       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
841       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
842       call_requester();
843     }
844
845     ServerReactor* reactor() override {
846       return reactor_.load(std::memory_order_relaxed);
847     }
848
849     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
850         meta_ops_;
851     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
852     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
853                                 ::grpc::internal::CallOpSendMessage,
854                                 ::grpc::internal::CallOpServerSendStatus>
855         finish_ops_;
856     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
857     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
858                                 ::grpc::internal::CallOpSendMessage>
859         write_ops_;
860     ::grpc::internal::CallbackWithSuccessTag write_tag_;
861     ::grpc::internal::CallOpSet<
862         ::grpc::internal::CallOpRecvMessage<RequestType>>
863         read_ops_;
864     ::grpc::internal::CallbackWithSuccessTag read_tag_;
865
866     ::grpc_impl::CallbackServerContext* const ctx_;
867     ::grpc::internal::Call call_;
868     std::function<void()> call_requester_;
869     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
870     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
871     // callbacks_outstanding_ follows a refcount pattern
872     std::atomic<intptr_t> callbacks_outstanding_{
873         3};  // reserve for OnStarted, Finish, and CompletionOp
874   };
875 };
876
877 }  // namespace internal
878 }  // namespace grpc_impl
879
880 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H