6c66c42e747a544afb8ce240a3e7f7f73066162f
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_handlers.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
26
27 namespace grpc {
28 namespace internal {
29
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38
39   void SetMessageAllocator(
40       MessageAllocator<RequestType, ResponseType>* allocator) {
41     allocator_ = allocator;
42   }
43
44   void RunHandler(const HandlerParameter& param) final {
45     // Arena allocate a controller structure (that includes request/response)
46     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
47     auto* allocator_state =
48         static_cast<MessageHolder<RequestType, ResponseType>*>(
49             param.internal_data);
50
51     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
52         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
53         ServerCallbackUnaryImpl(
54             static_cast<::grpc::CallbackServerContext*>(param.server_context),
55             param.call, allocator_state, param.call_requester);
56     param.server_context->BeginCompletionOp(
57         param.call, [call](bool) { call->MaybeDone(); }, call);
58
59     ServerUnaryReactor* reactor = nullptr;
60     if (param.status.ok()) {
61       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
62           get_reactor_,
63           static_cast<::grpc::CallbackServerContext*>(param.server_context),
64           call->request(), call->response());
65     }
66
67     if (reactor == nullptr) {
68       // if deserialization or reactor creator failed, we need to fail the call
69       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
70           param.call->call(), sizeof(UnimplementedUnaryReactor)))
71           UnimplementedUnaryReactor(
72               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
73     }
74
75     /// Invoke SetupReactor as the last part of the handler
76     call->SetupReactor(reactor);
77   }
78
79   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
80                     ::grpc::Status* status, void** handler_data) final {
81     ::grpc::ByteBuffer buf;
82     buf.set_buffer(req);
83     RequestType* request = nullptr;
84     MessageHolder<RequestType, ResponseType>* allocator_state = nullptr;
85     if (allocator_ != nullptr) {
86       allocator_state = allocator_->AllocateMessages();
87     } else {
88       allocator_state =
89           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
90               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
91               DefaultMessageHolder<RequestType, ResponseType>();
92     }
93     *handler_data = allocator_state;
94     request = allocator_state->request();
95     *status =
96         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
97     buf.Release();
98     if (status->ok()) {
99       return request;
100     }
101     // Clean up on deserialization failure.
102     allocator_state->Release();
103     return nullptr;
104   }
105
106  private:
107   std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
108                                     const RequestType*, ResponseType*)>
109       get_reactor_;
110   MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
111
112   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
113    public:
114     void Finish(::grpc::Status s) override {
115       // A callback that only contains a call to MaybeDone can be run as an
116       // inline callback regardless of whether or not OnDone is inlineable
117       // because if the actual OnDone callback needs to be scheduled, MaybeDone
118       // is responsible for dispatching to an executor thread if needed. Thus,
119       // when setting up the finish_tag_, we can set its own callback to
120       // inlineable.
121       finish_tag_.Set(
122           call_.call(),
123           [this](bool) {
124             this->MaybeDone(
125                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
126           },
127           &finish_ops_, /*can_inline=*/true);
128       finish_ops_.set_core_cq_tag(&finish_tag_);
129
130       if (!ctx_->sent_initial_metadata_) {
131         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
132                                         ctx_->initial_metadata_flags());
133         if (ctx_->compression_level_set()) {
134           finish_ops_.set_compression_level(ctx_->compression_level());
135         }
136         ctx_->sent_initial_metadata_ = true;
137       }
138       // The response is dropped if the status is not OK.
139       if (s.ok()) {
140         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
141                                      finish_ops_.SendMessagePtr(response()));
142       } else {
143         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
144       }
145       finish_ops_.set_core_cq_tag(&finish_tag_);
146       call_.PerformOps(&finish_ops_);
147     }
148
149     void SendInitialMetadata() override {
150       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
151       this->Ref();
152       // The callback for this function should not be marked inline because it
153       // is directly invoking a user-controlled reaction
154       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
155       // thread. However, any OnDone needed after that can be inlined because it
156       // is already running on an executor thread.
157       meta_tag_.Set(
158           call_.call(),
159           [this](bool ok) {
160             ServerUnaryReactor* reactor =
161                 reactor_.load(std::memory_order_relaxed);
162             reactor->OnSendInitialMetadataDone(ok);
163             this->MaybeDone(/*inlineable_ondone=*/true);
164           },
165           &meta_ops_, /*can_inline=*/false);
166       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
167                                     ctx_->initial_metadata_flags());
168       if (ctx_->compression_level_set()) {
169         meta_ops_.set_compression_level(ctx_->compression_level());
170       }
171       ctx_->sent_initial_metadata_ = true;
172       meta_ops_.set_core_cq_tag(&meta_tag_);
173       call_.PerformOps(&meta_ops_);
174     }
175
176    private:
177     friend class CallbackUnaryHandler<RequestType, ResponseType>;
178
179     ServerCallbackUnaryImpl(
180         ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
181         MessageHolder<RequestType, ResponseType>* allocator_state,
182         std::function<void()> call_requester)
183         : ctx_(ctx),
184           call_(*call),
185           allocator_state_(allocator_state),
186           call_requester_(std::move(call_requester)) {
187       ctx_->set_message_allocator_state(allocator_state);
188     }
189
190     /// SetupReactor binds the reactor (which also releases any queued
191     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
192     /// the completion of the RPC. This should be the last component of the
193     /// handler.
194     void SetupReactor(ServerUnaryReactor* reactor) {
195       reactor_.store(reactor, std::memory_order_relaxed);
196       this->BindReactor(reactor);
197       this->MaybeCallOnCancel(reactor);
198       this->MaybeDone(reactor->InternalInlineable());
199     }
200
201     const RequestType* request() { return allocator_state_->request(); }
202     ResponseType* response() { return allocator_state_->response(); }
203
204     void CallOnDone() override {
205       reactor_.load(std::memory_order_relaxed)->OnDone();
206       grpc_call* call = call_.call();
207       auto call_requester = std::move(call_requester_);
208       allocator_state_->Release();
209       if (ctx_->context_allocator() != nullptr) {
210         ctx_->context_allocator()->Release(ctx_);
211       }
212       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
213       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
214       call_requester();
215     }
216
217     ServerReactor* reactor() override {
218       return reactor_.load(std::memory_order_relaxed);
219     }
220
221     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
222         meta_ops_;
223     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
224     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
225                                 ::grpc::internal::CallOpSendMessage,
226                                 ::grpc::internal::CallOpServerSendStatus>
227         finish_ops_;
228     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
229
230     ::grpc::CallbackServerContext* const ctx_;
231     ::grpc::internal::Call call_;
232     MessageHolder<RequestType, ResponseType>* const allocator_state_;
233     std::function<void()> call_requester_;
234     // reactor_ can always be loaded/stored with relaxed memory ordering because
235     // its value is only set once, independently of other data in the object,
236     // and the loads that use it will always actually come provably later even
237     // though they are from different threads since they are triggered by
238     // actions initiated only by the setting up of the reactor_ variable. In
239     // a sense, it's a delayed "const": it gets its value from the SetupReactor
240     // method (not the constructor, so it's not a true const), but it doesn't
241     // change after that and it only gets used by actions caused, directly or
242     // indirectly, by that setup. This comment also applies to the reactor_
243     // variables of the other streaming objects in this file.
244     std::atomic<ServerUnaryReactor*> reactor_;
245     // callbacks_outstanding_ follows a refcount pattern
246     std::atomic<intptr_t> callbacks_outstanding_{
247         3};  // reserve for start, Finish, and CompletionOp
248   };
249 };
250
251 template <class RequestType, class ResponseType>
252 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
253  public:
254   explicit CallbackClientStreamingHandler(
255       std::function<ServerReadReactor<RequestType>*(
256           ::grpc::CallbackServerContext*, ResponseType*)>
257           get_reactor)
258       : get_reactor_(std::move(get_reactor)) {}
259   void RunHandler(const HandlerParameter& param) final {
260     // Arena allocate a reader structure (that includes response)
261     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
262
263     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
264         param.call->call(), sizeof(ServerCallbackReaderImpl)))
265         ServerCallbackReaderImpl(
266             static_cast<::grpc::CallbackServerContext*>(param.server_context),
267             param.call, param.call_requester);
268     // Inlineable OnDone can be false in the CompletionOp callback because there
269     // is no read reactor that has an inlineable OnDone; this only applies to
270     // the DefaultReactor (which is unary).
271     param.server_context->BeginCompletionOp(
272         param.call,
273         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
274         reader);
275
276     ServerReadReactor<RequestType>* reactor = nullptr;
277     if (param.status.ok()) {
278       reactor = ::grpc::internal::CatchingReactorGetter<
279           ServerReadReactor<RequestType>>(
280           get_reactor_,
281           static_cast<::grpc::CallbackServerContext*>(param.server_context),
282           reader->response());
283     }
284
285     if (reactor == nullptr) {
286       // if deserialization or reactor creator failed, we need to fail the call
287       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
288           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
289           UnimplementedReadReactor<RequestType>(
290               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
291     }
292
293     reader->SetupReactor(reactor);
294   }
295
296  private:
297   std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
298                                                 ResponseType*)>
299       get_reactor_;
300
301   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
302    public:
303     void Finish(::grpc::Status s) override {
304       // A finish tag with only MaybeDone can have its callback inlined
305       // regardless even if OnDone is not inlineable because this callback just
306       // checks a ref and then decides whether or not to dispatch OnDone.
307       finish_tag_.Set(
308           call_.call(),
309           [this](bool) {
310             // Inlineable OnDone can be false here because there is
311             // no read reactor that has an inlineable OnDone; this
312             // only applies to the DefaultReactor (which is unary).
313             this->MaybeDone(/*inlineable_ondone=*/false);
314           },
315           &finish_ops_, /*can_inline=*/true);
316       if (!ctx_->sent_initial_metadata_) {
317         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
318                                         ctx_->initial_metadata_flags());
319         if (ctx_->compression_level_set()) {
320           finish_ops_.set_compression_level(ctx_->compression_level());
321         }
322         ctx_->sent_initial_metadata_ = true;
323       }
324       // The response is dropped if the status is not OK.
325       if (s.ok()) {
326         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327                                      finish_ops_.SendMessagePtr(&resp_));
328       } else {
329         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
330       }
331       finish_ops_.set_core_cq_tag(&finish_tag_);
332       call_.PerformOps(&finish_ops_);
333     }
334
335     void SendInitialMetadata() override {
336       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
337       this->Ref();
338       // The callback for this function should not be inlined because it invokes
339       // a user-controlled reaction, but any resulting OnDone can be inlined in
340       // the executor to which this callback is dispatched.
341       meta_tag_.Set(
342           call_.call(),
343           [this](bool ok) {
344             ServerReadReactor<RequestType>* reactor =
345                 reactor_.load(std::memory_order_relaxed);
346             reactor->OnSendInitialMetadataDone(ok);
347             this->MaybeDone(/*inlineable_ondone=*/true);
348           },
349           &meta_ops_, /*can_inline=*/false);
350       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
351                                     ctx_->initial_metadata_flags());
352       if (ctx_->compression_level_set()) {
353         meta_ops_.set_compression_level(ctx_->compression_level());
354       }
355       ctx_->sent_initial_metadata_ = true;
356       meta_ops_.set_core_cq_tag(&meta_tag_);
357       call_.PerformOps(&meta_ops_);
358     }
359
360     void Read(RequestType* req) override {
361       this->Ref();
362       read_ops_.RecvMessage(req);
363       call_.PerformOps(&read_ops_);
364     }
365
366    private:
367     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
368
369     ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
370                              ::grpc::internal::Call* call,
371                              std::function<void()> call_requester)
372         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
373
374     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
375       reactor_.store(reactor, std::memory_order_relaxed);
376       // The callback for this function should not be inlined because it invokes
377       // a user-controlled reaction, but any resulting OnDone can be inlined in
378       // the executor to which this callback is dispatched.
379       read_tag_.Set(
380           call_.call(),
381           [this, reactor](bool ok) {
382             if (GPR_UNLIKELY(!ok)) {
383               ctx_->MaybeMarkCancelledOnRead();
384             }
385             reactor->OnReadDone(ok);
386             this->MaybeDone(/*inlineable_ondone=*/true);
387           },
388           &read_ops_, /*can_inline=*/false);
389       read_ops_.set_core_cq_tag(&read_tag_);
390       this->BindReactor(reactor);
391       this->MaybeCallOnCancel(reactor);
392       // Inlineable OnDone can be false here because there is no read
393       // reactor that has an inlineable OnDone; this only applies to the
394       // DefaultReactor (which is unary).
395       this->MaybeDone(/*inlineable_ondone=*/false);
396     }
397
398     ~ServerCallbackReaderImpl() {}
399
400     ResponseType* response() { return &resp_; }
401
402     void CallOnDone() override {
403       reactor_.load(std::memory_order_relaxed)->OnDone();
404       grpc_call* call = call_.call();
405       auto call_requester = std::move(call_requester_);
406       if (ctx_->context_allocator() != nullptr) {
407         ctx_->context_allocator()->Release(ctx_);
408       }
409       this->~ServerCallbackReaderImpl();  // explicitly call destructor
410       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
411       call_requester();
412     }
413
414     ServerReactor* reactor() override {
415       return reactor_.load(std::memory_order_relaxed);
416     }
417
418     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
419         meta_ops_;
420     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
421     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
422                                 ::grpc::internal::CallOpSendMessage,
423                                 ::grpc::internal::CallOpServerSendStatus>
424         finish_ops_;
425     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
426     ::grpc::internal::CallOpSet<
427         ::grpc::internal::CallOpRecvMessage<RequestType>>
428         read_ops_;
429     ::grpc::internal::CallbackWithSuccessTag read_tag_;
430
431     ::grpc::CallbackServerContext* const ctx_;
432     ::grpc::internal::Call call_;
433     ResponseType resp_;
434     std::function<void()> call_requester_;
435     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
436     std::atomic<ServerReadReactor<RequestType>*> reactor_;
437     // callbacks_outstanding_ follows a refcount pattern
438     std::atomic<intptr_t> callbacks_outstanding_{
439         3};  // reserve for OnStarted, Finish, and CompletionOp
440   };
441 };
442
443 template <class RequestType, class ResponseType>
444 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
445  public:
446   explicit CallbackServerStreamingHandler(
447       std::function<ServerWriteReactor<ResponseType>*(
448           ::grpc::CallbackServerContext*, const RequestType*)>
449           get_reactor)
450       : get_reactor_(std::move(get_reactor)) {}
451   void RunHandler(const HandlerParameter& param) final {
452     // Arena allocate a writer structure
453     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
454
455     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
456         param.call->call(), sizeof(ServerCallbackWriterImpl)))
457         ServerCallbackWriterImpl(
458             static_cast<::grpc::CallbackServerContext*>(param.server_context),
459             param.call, static_cast<RequestType*>(param.request),
460             param.call_requester);
461     // Inlineable OnDone can be false in the CompletionOp callback because there
462     // is no write reactor that has an inlineable OnDone; this only applies to
463     // the DefaultReactor (which is unary).
464     param.server_context->BeginCompletionOp(
465         param.call,
466         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
467         writer);
468
469     ServerWriteReactor<ResponseType>* reactor = nullptr;
470     if (param.status.ok()) {
471       reactor = ::grpc::internal::CatchingReactorGetter<
472           ServerWriteReactor<ResponseType>>(
473           get_reactor_,
474           static_cast<::grpc::CallbackServerContext*>(param.server_context),
475           writer->request());
476     }
477     if (reactor == nullptr) {
478       // if deserialization or reactor creator failed, we need to fail the call
479       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
480           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
481           UnimplementedWriteReactor<ResponseType>(
482               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
483     }
484
485     writer->SetupReactor(reactor);
486   }
487
488   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
489                     ::grpc::Status* status, void** /*handler_data*/) final {
490     ::grpc::ByteBuffer buf;
491     buf.set_buffer(req);
492     auto* request =
493         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
494             call, sizeof(RequestType))) RequestType();
495     *status =
496         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
497     buf.Release();
498     if (status->ok()) {
499       return request;
500     }
501     request->~RequestType();
502     return nullptr;
503   }
504
505  private:
506   std::function<ServerWriteReactor<ResponseType>*(
507       ::grpc::CallbackServerContext*, const RequestType*)>
508       get_reactor_;
509
510   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
511    public:
512     void Finish(::grpc::Status s) override {
513       // A finish tag with only MaybeDone can have its callback inlined
514       // regardless even if OnDone is not inlineable because this callback just
515       // checks a ref and then decides whether or not to dispatch OnDone.
516       finish_tag_.Set(
517           call_.call(),
518           [this](bool) {
519             // Inlineable OnDone can be false here because there is
520             // no write reactor that has an inlineable OnDone; this
521             // only applies to the DefaultReactor (which is unary).
522             this->MaybeDone(/*inlineable_ondone=*/false);
523           },
524           &finish_ops_, /*can_inline=*/true);
525       finish_ops_.set_core_cq_tag(&finish_tag_);
526
527       if (!ctx_->sent_initial_metadata_) {
528         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
529                                         ctx_->initial_metadata_flags());
530         if (ctx_->compression_level_set()) {
531           finish_ops_.set_compression_level(ctx_->compression_level());
532         }
533         ctx_->sent_initial_metadata_ = true;
534       }
535       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
536       call_.PerformOps(&finish_ops_);
537     }
538
539     void SendInitialMetadata() override {
540       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
541       this->Ref();
542       // The callback for this function should not be inlined because it invokes
543       // a user-controlled reaction, but any resulting OnDone can be inlined in
544       // the executor to which this callback is dispatched.
545       meta_tag_.Set(
546           call_.call(),
547           [this](bool ok) {
548             ServerWriteReactor<ResponseType>* reactor =
549                 reactor_.load(std::memory_order_relaxed);
550             reactor->OnSendInitialMetadataDone(ok);
551             this->MaybeDone(/*inlineable_ondone=*/true);
552           },
553           &meta_ops_, /*can_inline=*/false);
554       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
555                                     ctx_->initial_metadata_flags());
556       if (ctx_->compression_level_set()) {
557         meta_ops_.set_compression_level(ctx_->compression_level());
558       }
559       ctx_->sent_initial_metadata_ = true;
560       meta_ops_.set_core_cq_tag(&meta_tag_);
561       call_.PerformOps(&meta_ops_);
562     }
563
564     void Write(const ResponseType* resp,
565                ::grpc::WriteOptions options) override {
566       this->Ref();
567       if (options.is_last_message()) {
568         options.set_buffer_hint();
569       }
570       if (!ctx_->sent_initial_metadata_) {
571         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
572                                        ctx_->initial_metadata_flags());
573         if (ctx_->compression_level_set()) {
574           write_ops_.set_compression_level(ctx_->compression_level());
575         }
576         ctx_->sent_initial_metadata_ = true;
577       }
578       // TODO(vjpai): don't assert
579       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
580       call_.PerformOps(&write_ops_);
581     }
582
583     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
584                         ::grpc::Status s) override {
585       // This combines the write into the finish callback
586       // TODO(vjpai): don't assert
587       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
588       Finish(std::move(s));
589     }
590
591    private:
592     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
593
594     ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
595                              ::grpc::internal::Call* call,
596                              const RequestType* req,
597                              std::function<void()> call_requester)
598         : ctx_(ctx),
599           call_(*call),
600           req_(req),
601           call_requester_(std::move(call_requester)) {}
602
603     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
604       reactor_.store(reactor, std::memory_order_relaxed);
605       // The callback for this function should not be inlined because it invokes
606       // a user-controlled reaction, but any resulting OnDone can be inlined in
607       // the executor to which this callback is dispatched.
608       write_tag_.Set(
609           call_.call(),
610           [this, reactor](bool ok) {
611             reactor->OnWriteDone(ok);
612             this->MaybeDone(/*inlineable_ondone=*/true);
613           },
614           &write_ops_, /*can_inline=*/false);
615       write_ops_.set_core_cq_tag(&write_tag_);
616       this->BindReactor(reactor);
617       this->MaybeCallOnCancel(reactor);
618       // Inlineable OnDone can be false here because there is no write
619       // reactor that has an inlineable OnDone; this only applies to the
620       // DefaultReactor (which is unary).
621       this->MaybeDone(/*inlineable_ondone=*/false);
622     }
623     ~ServerCallbackWriterImpl() {
624       if (req_ != nullptr) {
625         req_->~RequestType();
626       }
627     }
628
629     const RequestType* request() { return req_; }
630
631     void CallOnDone() override {
632       reactor_.load(std::memory_order_relaxed)->OnDone();
633       grpc_call* call = call_.call();
634       auto call_requester = std::move(call_requester_);
635       if (ctx_->context_allocator() != nullptr) {
636         ctx_->context_allocator()->Release(ctx_);
637       }
638       this->~ServerCallbackWriterImpl();  // explicitly call destructor
639       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
640       call_requester();
641     }
642
643     ServerReactor* reactor() override {
644       return reactor_.load(std::memory_order_relaxed);
645     }
646
647     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
648         meta_ops_;
649     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
650     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
651                                 ::grpc::internal::CallOpSendMessage,
652                                 ::grpc::internal::CallOpServerSendStatus>
653         finish_ops_;
654     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
655     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
656                                 ::grpc::internal::CallOpSendMessage>
657         write_ops_;
658     ::grpc::internal::CallbackWithSuccessTag write_tag_;
659
660     ::grpc::CallbackServerContext* const ctx_;
661     ::grpc::internal::Call call_;
662     const RequestType* req_;
663     std::function<void()> call_requester_;
664     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
665     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
666     // callbacks_outstanding_ follows a refcount pattern
667     std::atomic<intptr_t> callbacks_outstanding_{
668         3};  // reserve for OnStarted, Finish, and CompletionOp
669   };
670 };
671
672 template <class RequestType, class ResponseType>
673 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
674  public:
675   explicit CallbackBidiHandler(
676       std::function<ServerBidiReactor<RequestType, ResponseType>*(
677           ::grpc::CallbackServerContext*)>
678           get_reactor)
679       : get_reactor_(std::move(get_reactor)) {}
680   void RunHandler(const HandlerParameter& param) final {
681     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
682
683     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
684         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
685         ServerCallbackReaderWriterImpl(
686             static_cast<::grpc::CallbackServerContext*>(param.server_context),
687             param.call, param.call_requester);
688     // Inlineable OnDone can be false in the CompletionOp callback because there
689     // is no bidi reactor that has an inlineable OnDone; this only applies to
690     // the DefaultReactor (which is unary).
691     param.server_context->BeginCompletionOp(
692         param.call,
693         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
694         stream);
695
696     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
697     if (param.status.ok()) {
698       reactor = ::grpc::internal::CatchingReactorGetter<
699           ServerBidiReactor<RequestType, ResponseType>>(
700           get_reactor_,
701           static_cast<::grpc::CallbackServerContext*>(param.server_context));
702     }
703
704     if (reactor == nullptr) {
705       // if deserialization or reactor creator failed, we need to fail the call
706       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
707           param.call->call(),
708           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
709           UnimplementedBidiReactor<RequestType, ResponseType>(
710               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
711     }
712
713     stream->SetupReactor(reactor);
714   }
715
716  private:
717   std::function<ServerBidiReactor<RequestType, ResponseType>*(
718       ::grpc::CallbackServerContext*)>
719       get_reactor_;
720
721   class ServerCallbackReaderWriterImpl
722       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
723    public:
724     void Finish(::grpc::Status s) override {
725       // A finish tag with only MaybeDone can have its callback inlined
726       // regardless even if OnDone is not inlineable because this callback just
727       // checks a ref and then decides whether or not to dispatch OnDone.
728       finish_tag_.Set(
729           call_.call(),
730           [this](bool) {
731             // Inlineable OnDone can be false here because there is
732             // no bidi reactor that has an inlineable OnDone; this
733             // only applies to the DefaultReactor (which is unary).
734             this->MaybeDone(/*inlineable_ondone=*/false);
735           },
736           &finish_ops_, /*can_inline=*/true);
737       finish_ops_.set_core_cq_tag(&finish_tag_);
738
739       if (!ctx_->sent_initial_metadata_) {
740         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
741                                         ctx_->initial_metadata_flags());
742         if (ctx_->compression_level_set()) {
743           finish_ops_.set_compression_level(ctx_->compression_level());
744         }
745         ctx_->sent_initial_metadata_ = true;
746       }
747       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
748       call_.PerformOps(&finish_ops_);
749     }
750
751     void SendInitialMetadata() override {
752       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
753       this->Ref();
754       // The callback for this function should not be inlined because it invokes
755       // a user-controlled reaction, but any resulting OnDone can be inlined in
756       // the executor to which this callback is dispatched.
757       meta_tag_.Set(
758           call_.call(),
759           [this](bool ok) {
760             ServerBidiReactor<RequestType, ResponseType>* reactor =
761                 reactor_.load(std::memory_order_relaxed);
762             reactor->OnSendInitialMetadataDone(ok);
763             this->MaybeDone(/*inlineable_ondone=*/true);
764           },
765           &meta_ops_, /*can_inline=*/false);
766       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
767                                     ctx_->initial_metadata_flags());
768       if (ctx_->compression_level_set()) {
769         meta_ops_.set_compression_level(ctx_->compression_level());
770       }
771       ctx_->sent_initial_metadata_ = true;
772       meta_ops_.set_core_cq_tag(&meta_tag_);
773       call_.PerformOps(&meta_ops_);
774     }
775
776     void Write(const ResponseType* resp,
777                ::grpc::WriteOptions options) override {
778       this->Ref();
779       if (options.is_last_message()) {
780         options.set_buffer_hint();
781       }
782       if (!ctx_->sent_initial_metadata_) {
783         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784                                        ctx_->initial_metadata_flags());
785         if (ctx_->compression_level_set()) {
786           write_ops_.set_compression_level(ctx_->compression_level());
787         }
788         ctx_->sent_initial_metadata_ = true;
789       }
790       // TODO(vjpai): don't assert
791       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
792       call_.PerformOps(&write_ops_);
793     }
794
795     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
796                         ::grpc::Status s) override {
797       // TODO(vjpai): don't assert
798       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
799       Finish(std::move(s));
800     }
801
802     void Read(RequestType* req) override {
803       this->Ref();
804       read_ops_.RecvMessage(req);
805       call_.PerformOps(&read_ops_);
806     }
807
808    private:
809     friend class CallbackBidiHandler<RequestType, ResponseType>;
810
811     ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
812                                    ::grpc::internal::Call* call,
813                                    std::function<void()> call_requester)
814         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
815
816     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
817       reactor_.store(reactor, std::memory_order_relaxed);
818       // The callbacks for these functions should not be inlined because they
819       // invoke user-controlled reactions, but any resulting OnDones can be
820       // inlined in the executor to which a callback is dispatched.
821       write_tag_.Set(
822           call_.call(),
823           [this, reactor](bool ok) {
824             reactor->OnWriteDone(ok);
825             this->MaybeDone(/*inlineable_ondone=*/true);
826           },
827           &write_ops_, /*can_inline=*/false);
828       write_ops_.set_core_cq_tag(&write_tag_);
829       read_tag_.Set(
830           call_.call(),
831           [this, reactor](bool ok) {
832             if (GPR_UNLIKELY(!ok)) {
833               ctx_->MaybeMarkCancelledOnRead();
834             }
835             reactor->OnReadDone(ok);
836             this->MaybeDone(/*inlineable_ondone=*/true);
837           },
838           &read_ops_, /*can_inline=*/false);
839       read_ops_.set_core_cq_tag(&read_tag_);
840       this->BindReactor(reactor);
841       this->MaybeCallOnCancel(reactor);
842       // Inlineable OnDone can be false here because there is no bidi
843       // reactor that has an inlineable OnDone; this only applies to the
844       // DefaultReactor (which is unary).
845       this->MaybeDone(/*inlineable_ondone=*/false);
846     }
847
848     void CallOnDone() override {
849       reactor_.load(std::memory_order_relaxed)->OnDone();
850       grpc_call* call = call_.call();
851       auto call_requester = std::move(call_requester_);
852       if (ctx_->context_allocator() != nullptr) {
853         ctx_->context_allocator()->Release(ctx_);
854       }
855       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
856       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
857       call_requester();
858     }
859
860     ServerReactor* reactor() override {
861       return reactor_.load(std::memory_order_relaxed);
862     }
863
864     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
865         meta_ops_;
866     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
867     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
868                                 ::grpc::internal::CallOpSendMessage,
869                                 ::grpc::internal::CallOpServerSendStatus>
870         finish_ops_;
871     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
872     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
873                                 ::grpc::internal::CallOpSendMessage>
874         write_ops_;
875     ::grpc::internal::CallbackWithSuccessTag write_tag_;
876     ::grpc::internal::CallOpSet<
877         ::grpc::internal::CallOpRecvMessage<RequestType>>
878         read_ops_;
879     ::grpc::internal::CallbackWithSuccessTag read_tag_;
880
881     ::grpc::CallbackServerContext* const ctx_;
882     ::grpc::internal::Call call_;
883     std::function<void()> call_requester_;
884     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
885     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
886     // callbacks_outstanding_ follows a refcount pattern
887     std::atomic<intptr_t> callbacks_outstanding_{
888         3};  // reserve for OnStarted, Finish, and CompletionOp
889   };
890 };
891
892 }  // namespace internal
893 }  // namespace grpc
894
895 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H