3f6d5cd3555a1a0c94a604b3d29850cce6f75cc5
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback.h
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
21
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25
26 #include <grpcpp/impl/codegen/call.h>
27 #include <grpcpp/impl/codegen/call_op_set.h>
28 #include <grpcpp/impl/codegen/callback_common.h>
29 #include <grpcpp/impl/codegen/config.h>
30 #include <grpcpp/impl/codegen/core_codegen_interface.h>
31 #include <grpcpp/impl/codegen/message_allocator.h>
32 #include <grpcpp/impl/codegen/server_context.h>
33 #include <grpcpp/impl/codegen/server_interface.h>
34 #include <grpcpp/impl/codegen/status.h>
35
36 namespace grpc {
37
38 // Declare base class of all reactors as internal
39 namespace internal {
40
41 // Forward declarations
42 template <class Request, class Response>
43 class CallbackClientStreamingHandler;
44 template <class Request, class Response>
45 class CallbackServerStreamingHandler;
46 template <class Request, class Response>
47 class CallbackBidiHandler;
48
49 class ServerReactor {
50  public:
51   virtual ~ServerReactor() = default;
52   virtual void OnDone() = 0;
53   virtual void OnCancel() = 0;
54
55  private:
56   friend class ::grpc::ServerContext;
57   template <class Request, class Response>
58   friend class CallbackClientStreamingHandler;
59   template <class Request, class Response>
60   friend class CallbackServerStreamingHandler;
61   template <class Request, class Response>
62   friend class CallbackBidiHandler;
63
64   // The ServerReactor is responsible for tracking when it is safe to call
65   // OnCancel. This function should not be called until after OnStarted is done
66   // and the RPC has completed with a cancellation. This is tracked by counting
67   // how many of these conditions have been met and calling OnCancel when none
68   // remain unmet.
69
70   void MaybeCallOnCancel() {
71     if (on_cancel_conditions_remaining_.fetch_sub(
72             1, std::memory_order_acq_rel) == 1) {
73       OnCancel();
74     }
75   }
76
77   std::atomic_int on_cancel_conditions_remaining_{2};
78 };
79
80 }  // namespace internal
81
82 namespace experimental {
83
84 // Forward declarations
85 template <class Request, class Response>
86 class ServerReadReactor;
87 template <class Request, class Response>
88 class ServerWriteReactor;
89 template <class Request, class Response>
90 class ServerBidiReactor;
91
92 // For unary RPCs, the exposed controller class is only an interface
93 // and the actual implementation is an internal class.
94 class ServerCallbackRpcController {
95  public:
96   virtual ~ServerCallbackRpcController() = default;
97
98   // The method handler must call this function when it is done so that
99   // the library knows to free its resources
100   virtual void Finish(Status s) = 0;
101
102   // Allow the method handler to push out the initial metadata before
103   // the response and status are ready
104   virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
105
106   /// SetCancelCallback passes in a callback to be called when the RPC is
107   /// canceled for whatever reason (streaming calls have OnCancel instead). This
108   /// is an advanced and uncommon use with several important restrictions. This
109   /// function may not be called more than once on the same RPC.
110   ///
111   /// If code calls SetCancelCallback on an RPC, it must also call
112   /// ClearCancelCallback before calling Finish on the RPC controller. That
113   /// method makes sure that no cancellation callback is executed for this RPC
114   /// beyond the point of its return. ClearCancelCallback may be called even if
115   /// SetCancelCallback was not called for this RPC, and it may be called
116   /// multiple times. It _must_ be called if SetCancelCallback was called for
117   /// this RPC.
118   ///
119   /// The callback should generally be lightweight and nonblocking and primarily
120   /// concerned with clearing application state related to the RPC or causing
121   /// operations (such as cancellations) to happen on dependent RPCs.
122   ///
123   /// If the RPC is already canceled at the time that SetCancelCallback is
124   /// called, the callback is invoked immediately.
125   ///
126   /// The cancellation callback may be executed concurrently with the method
127   /// handler that invokes it but will certainly not issue or execute after the
128   /// return of ClearCancelCallback. If ClearCancelCallback is invoked while the
129   /// callback is already executing, the callback will complete its execution
130   /// before ClearCancelCallback takes effect.
131   ///
132   /// To preserve the orderings described above, the callback may be called
133   /// under a lock that is also used for ClearCancelCallback and
134   /// ServerContext::IsCancelled, so the callback CANNOT call either of those
135   /// operations on this RPC or any other function that causes those operations
136   /// to be called before the callback completes.
137   virtual void SetCancelCallback(std::function<void()> callback) = 0;
138   virtual void ClearCancelCallback() = 0;
139
140   // NOTE: This is an API for advanced users who need custom allocators.
141   // Optionally deallocate request early to reduce the size of working set.
142   // A custom MessageAllocator needs to be registered to make use of this.
143   virtual void FreeRequest() = 0;
144   // NOTE: This is an API for advanced users who need custom allocators.
145   // Get and maybe mutate the allocator state associated with the current RPC.
146   virtual void* GetAllocatorState() = 0;
147 };
148
149 // NOTE: The actual streaming object classes are provided
150 // as API only to support mocking. There are no implementations of
151 // these class interfaces in the API.
152 template <class Request>
153 class ServerCallbackReader {
154  public:
155   virtual ~ServerCallbackReader() {}
156   virtual void Finish(Status s) = 0;
157   virtual void SendInitialMetadata() = 0;
158   virtual void Read(Request* msg) = 0;
159
160  protected:
161   template <class Response>
162   void BindReactor(ServerReadReactor<Request, Response>* reactor) {
163     reactor->BindReader(this);
164   }
165 };
166
167 template <class Response>
168 class ServerCallbackWriter {
169  public:
170   virtual ~ServerCallbackWriter() {}
171
172   virtual void Finish(Status s) = 0;
173   virtual void SendInitialMetadata() = 0;
174   virtual void Write(const Response* msg, WriteOptions options) = 0;
175   virtual void WriteAndFinish(const Response* msg, WriteOptions options,
176                               Status s) {
177     // Default implementation that can/should be overridden
178     Write(msg, std::move(options));
179     Finish(std::move(s));
180   }
181
182  protected:
183   template <class Request>
184   void BindReactor(ServerWriteReactor<Request, Response>* reactor) {
185     reactor->BindWriter(this);
186   }
187 };
188
189 template <class Request, class Response>
190 class ServerCallbackReaderWriter {
191  public:
192   virtual ~ServerCallbackReaderWriter() {}
193
194   virtual void Finish(Status s) = 0;
195   virtual void SendInitialMetadata() = 0;
196   virtual void Read(Request* msg) = 0;
197   virtual void Write(const Response* msg, WriteOptions options) = 0;
198   virtual void WriteAndFinish(const Response* msg, WriteOptions options,
199                               Status s) {
200     // Default implementation that can/should be overridden
201     Write(msg, std::move(options));
202     Finish(std::move(s));
203   }
204
205  protected:
206   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
207     reactor->BindStream(this);
208   }
209 };
210
211 // The following classes are the reactor interfaces that are to be implemented
212 // by the user, returned as the result of the method handler for a callback
213 // method, and activated by the call to OnStarted. The library guarantees that
214 // OnStarted will be called for any reactor that has been created using a
215 // method handler registered on a service. No operation initiation method may be
216 // called until after the call to OnStarted.
217 // Note that none of the classes are pure; all reactions have a default empty
218 // reaction so that the user class only needs to override those classes that it
219 // cares about.
220
221 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
222 template <class Request, class Response>
223 class ServerBidiReactor : public internal::ServerReactor {
224  public:
225   ~ServerBidiReactor() = default;
226
227   /// Do NOT call any operation initiation method (names that start with Start)
228   /// until after the library has called OnStarted on this object.
229
230   /// Send any initial metadata stored in the RPC context. If not invoked,
231   /// any initial metadata will be passed along with the first Write or the
232   /// Finish (if there are no writes).
233   void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
234
235   /// Initiate a read operation.
236   ///
237   /// \param[out] req Where to eventually store the read message. Valid when
238   ///                 the library calls OnReadDone
239   void StartRead(Request* req) { stream_->Read(req); }
240
241   /// Initiate a write operation.
242   ///
243   /// \param[in] resp The message to be written. The library takes temporary
244   ///                 ownership until OnWriteDone, at which point the
245   ///                 application regains ownership of resp.
246   void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
247
248   /// Initiate a write operation with specified options.
249   ///
250   /// \param[in] resp The message to be written. The library takes temporary
251   ///                 ownership until OnWriteDone, at which point the
252   ///                 application regains ownership of resp.
253   /// \param[in] options The WriteOptions to use for writing this message
254   void StartWrite(const Response* resp, WriteOptions options) {
255     stream_->Write(resp, std::move(options));
256   }
257
258   /// Initiate a write operation with specified options and final RPC Status,
259   /// which also causes any trailing metadata for this RPC to be sent out.
260   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
261   /// single step. A key difference, though, is that this operation doesn't have
262   /// an OnWriteDone reaction - it is considered complete only when OnDone is
263   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
264   /// both.
265   ///
266   /// \param[in] resp The message to be written. The library takes temporary
267   ///                 ownership until Onone, at which point the application
268   ///                 regains ownership of resp.
269   /// \param[in] options The WriteOptions to use for writing this message
270   /// \param[in] s The status outcome of this RPC
271   void StartWriteAndFinish(const Response* resp, WriteOptions options,
272                            Status s) {
273     stream_->WriteAndFinish(resp, std::move(options), std::move(s));
274   }
275
276   /// Inform system of a planned write operation with specified options, but
277   /// allow the library to schedule the actual write coalesced with the writing
278   /// of trailing metadata (which takes place on a Finish call).
279   ///
280   /// \param[in] resp The message to be written. The library takes temporary
281   ///                 ownership until OnWriteDone, at which point the
282   ///                 application regains ownership of resp.
283   /// \param[in] options The WriteOptions to use for writing this message
284   void StartWriteLast(const Response* resp, WriteOptions options) {
285     StartWrite(resp, std::move(options.set_last_message()));
286   }
287
288   /// Indicate that the stream is to be finished and the trailing metadata and
289   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
290   /// or StartWriteAndFinish (but not both), even if the RPC is already
291   /// cancelled.
292   ///
293   /// \param[in] s The status outcome of this RPC
294   void Finish(Status s) { stream_->Finish(std::move(s)); }
295
296   /// Notify the application that a streaming RPC has started and that it is now
297   /// ok to call any operation initiation method. An RPC is considered started
298   /// after the server has received all initial metadata from the client, which
299   /// is a result of the client calling StartCall().
300   ///
301   /// \param[in] context The context object now associated with this RPC
302   virtual void OnStarted(ServerContext* context) {}
303
304   /// Notifies the application that an explicit StartSendInitialMetadata
305   /// operation completed. Not used when the sending of initial metadata
306   /// piggybacks onto the first write.
307   ///
308   /// \param[in] ok Was it successful? If false, no further write-side operation
309   ///               will succeed.
310   virtual void OnSendInitialMetadataDone(bool ok) {}
311
312   /// Notifies the application that a StartRead operation completed.
313   ///
314   /// \param[in] ok Was it successful? If false, no further read-side operation
315   ///               will succeed.
316   virtual void OnReadDone(bool ok) {}
317
318   /// Notifies the application that a StartWrite (or StartWriteLast) operation
319   /// completed.
320   ///
321   /// \param[in] ok Was it successful? If false, no further write-side operation
322   ///               will succeed.
323   virtual void OnWriteDone(bool ok) {}
324
325   /// Notifies the application that all operations associated with this RPC
326   /// have completed. This is an override (from the internal base class) but not
327   /// final, so derived classes should override it if they want to take action.
328   void OnDone() override {}
329
330   /// Notifies the application that this RPC has been cancelled. This is an
331   /// override (from the internal base class) but not final, so derived classes
332   /// should override it if they want to take action.
333   void OnCancel() override {}
334
335  private:
336   friend class ServerCallbackReaderWriter<Request, Response>;
337   void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
338     stream_ = stream;
339   }
340
341   ServerCallbackReaderWriter<Request, Response>* stream_;
342 };
343
344 /// \a ServerReadReactor is the interface for a client-streaming RPC.
345 template <class Request, class Response>
346 class ServerReadReactor : public internal::ServerReactor {
347  public:
348   ~ServerReadReactor() = default;
349
350   /// The following operation initiations are exactly like ServerBidiReactor.
351   void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
352   void StartRead(Request* req) { reader_->Read(req); }
353   void Finish(Status s) { reader_->Finish(std::move(s)); }
354
355   /// Similar to ServerBidiReactor::OnStarted, except that this also provides
356   /// the response object that the stream fills in before calling Finish.
357   /// (It must be filled in if status is OK, but it may be filled in otherwise.)
358   ///
359   /// \param[in] context The context object now associated with this RPC
360   /// \param[in] resp The response object to be used by this RPC
361   virtual void OnStarted(ServerContext* context, Response* resp) {}
362
363   /// The following notifications are exactly like ServerBidiReactor.
364   virtual void OnSendInitialMetadataDone(bool ok) {}
365   virtual void OnReadDone(bool ok) {}
366   void OnDone() override {}
367   void OnCancel() override {}
368
369  private:
370   friend class ServerCallbackReader<Request>;
371   void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
372
373   ServerCallbackReader<Request>* reader_;
374 };
375
376 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
377 template <class Request, class Response>
378 class ServerWriteReactor : public internal::ServerReactor {
379  public:
380   ~ServerWriteReactor() = default;
381
382   /// The following operation initiations are exactly like ServerBidiReactor.
383   void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
384   void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
385   void StartWrite(const Response* resp, WriteOptions options) {
386     writer_->Write(resp, std::move(options));
387   }
388   void StartWriteAndFinish(const Response* resp, WriteOptions options,
389                            Status s) {
390     writer_->WriteAndFinish(resp, std::move(options), std::move(s));
391   }
392   void StartWriteLast(const Response* resp, WriteOptions options) {
393     StartWrite(resp, std::move(options.set_last_message()));
394   }
395   void Finish(Status s) { writer_->Finish(std::move(s)); }
396
397   /// Similar to ServerBidiReactor::OnStarted, except that this also provides
398   /// the request object sent by the client.
399   ///
400   /// \param[in] context The context object now associated with this RPC
401   /// \param[in] req The request object sent by the client
402   virtual void OnStarted(ServerContext* context, const Request* req) {}
403
404   /// The following notifications are exactly like ServerBidiReactor.
405   virtual void OnSendInitialMetadataDone(bool ok) {}
406   virtual void OnWriteDone(bool ok) {}
407   void OnDone() override {}
408   void OnCancel() override {}
409
410  private:
411   friend class ServerCallbackWriter<Response>;
412   void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
413
414   ServerCallbackWriter<Response>* writer_;
415 };
416
417 }  // namespace experimental
418
419 namespace internal {
420
421 template <class Request, class Response>
422 class UnimplementedReadReactor
423     : public experimental::ServerReadReactor<Request, Response> {
424  public:
425   void OnDone() override { delete this; }
426   void OnStarted(ServerContext*, Response*) override {
427     this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
428   }
429 };
430
431 template <class Request, class Response>
432 class UnimplementedWriteReactor
433     : public experimental::ServerWriteReactor<Request, Response> {
434  public:
435   void OnDone() override { delete this; }
436   void OnStarted(ServerContext*, const Request*) override {
437     this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
438   }
439 };
440
441 template <class Request, class Response>
442 class UnimplementedBidiReactor
443     : public experimental::ServerBidiReactor<Request, Response> {
444  public:
445   void OnDone() override { delete this; }
446   void OnStarted(ServerContext*) override {
447     this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
448   }
449 };
450
451 template <class RequestType, class ResponseType>
452 class CallbackUnaryHandler : public MethodHandler {
453  public:
454   CallbackUnaryHandler(
455       std::function<void(ServerContext*, const RequestType*, ResponseType*,
456                          experimental::ServerCallbackRpcController*)>
457           func)
458       : func_(func) {}
459
460   void SetMessageAllocator(
461       experimental::MessageAllocator<RequestType, ResponseType>* allocator) {
462     allocator_ = allocator;
463   }
464
465   void RunHandler(const HandlerParameter& param) final {
466     // Arena allocate a controller structure (that includes request/response)
467     g_core_codegen_interface->grpc_call_ref(param.call->call());
468     auto* allocator_info =
469         static_cast<experimental::RpcAllocatorInfo<RequestType, ResponseType>*>(
470             param.internal_data);
471     auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
472         param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
473         ServerCallbackRpcControllerImpl(param.server_context, param.call,
474                                         allocator_info, allocator_,
475                                         std::move(param.call_requester));
476     Status status = param.status;
477     if (status.ok()) {
478       // Call the actual function handler and expect the user to call finish
479       CatchingCallback(func_, param.server_context, controller->request(),
480                        controller->response(), controller);
481     } else {
482       // if deserialization failed, we need to fail the call
483       controller->Finish(status);
484     }
485   }
486
487   void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
488                     void** handler_data) final {
489     ByteBuffer buf;
490     buf.set_buffer(req);
491     RequestType* request = nullptr;
492     experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info =
493         new (g_core_codegen_interface->grpc_call_arena_alloc(
494             call, sizeof(*allocator_info)))
495             experimental::RpcAllocatorInfo<RequestType, ResponseType>();
496     if (allocator_ != nullptr) {
497       allocator_->AllocateMessages(allocator_info);
498     } else {
499       allocator_info->request =
500           new (g_core_codegen_interface->grpc_call_arena_alloc(
501               call, sizeof(RequestType))) RequestType();
502       allocator_info->response =
503           new (g_core_codegen_interface->grpc_call_arena_alloc(
504               call, sizeof(ResponseType))) ResponseType();
505     }
506     *handler_data = allocator_info;
507     request = allocator_info->request;
508     *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
509     buf.Release();
510     if (status->ok()) {
511       return request;
512     }
513     // Clean up on deserialization failure.
514     if (allocator_ != nullptr) {
515       allocator_->DeallocateMessages(allocator_info);
516     } else {
517       allocator_info->request->~RequestType();
518       allocator_info->response->~ResponseType();
519       allocator_info->request = nullptr;
520       allocator_info->response = nullptr;
521     }
522     return nullptr;
523   }
524
525  private:
526   std::function<void(ServerContext*, const RequestType*, ResponseType*,
527                      experimental::ServerCallbackRpcController*)>
528       func_;
529   experimental::MessageAllocator<RequestType, ResponseType>* allocator_ =
530       nullptr;
531
532   // The implementation class of ServerCallbackRpcController is a private member
533   // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
534   // it to take advantage of CallbackUnaryHandler's friendships.
535   class ServerCallbackRpcControllerImpl
536       : public experimental::ServerCallbackRpcController {
537    public:
538     void Finish(Status s) override {
539       finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
540                       &finish_ops_);
541       if (!ctx_->sent_initial_metadata_) {
542         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
543                                         ctx_->initial_metadata_flags());
544         if (ctx_->compression_level_set()) {
545           finish_ops_.set_compression_level(ctx_->compression_level());
546         }
547         ctx_->sent_initial_metadata_ = true;
548       }
549       // The response is dropped if the status is not OK.
550       if (s.ok()) {
551         finish_ops_.ServerSendStatus(
552             &ctx_->trailing_metadata_,
553             finish_ops_.SendMessagePtr(allocator_info_->response));
554       } else {
555         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
556       }
557       finish_ops_.set_core_cq_tag(&finish_tag_);
558       call_.PerformOps(&finish_ops_);
559     }
560
561     void SendInitialMetadata(std::function<void(bool)> f) override {
562       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
563       callbacks_outstanding_++;
564       // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
565       //              and if performance of this operation matters
566       meta_tag_.Set(call_.call(),
567                     [this, f](bool ok) {
568                       f(ok);
569                       MaybeDone();
570                     },
571                     &meta_ops_);
572       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
573                                     ctx_->initial_metadata_flags());
574       if (ctx_->compression_level_set()) {
575         meta_ops_.set_compression_level(ctx_->compression_level());
576       }
577       ctx_->sent_initial_metadata_ = true;
578       meta_ops_.set_core_cq_tag(&meta_tag_);
579       call_.PerformOps(&meta_ops_);
580     }
581
582     // Neither SetCancelCallback nor ClearCancelCallback should affect the
583     // callbacks_outstanding_ count since they are paired and both must precede
584     // the invocation of Finish (if they are used at all)
585     void SetCancelCallback(std::function<void()> callback) override {
586       ctx_->SetCancelCallback(std::move(callback));
587     }
588
589     void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }
590
591     void FreeRequest() override {
592       if (allocator_ != nullptr) {
593         allocator_->DeallocateRequest(allocator_info_);
594       }
595     }
596
597     void* GetAllocatorState() override {
598       return allocator_info_->allocator_state;
599     }
600
601    private:
602     friend class CallbackUnaryHandler<RequestType, ResponseType>;
603
604     ServerCallbackRpcControllerImpl(
605         ServerContext* ctx, Call* call,
606         experimental::RpcAllocatorInfo<RequestType, ResponseType>*
607             allocator_info,
608         experimental::MessageAllocator<RequestType, ResponseType>* allocator,
609         std::function<void()> call_requester)
610         : ctx_(ctx),
611           call_(*call),
612           allocator_info_(allocator_info),
613           allocator_(allocator),
614           call_requester_(std::move(call_requester)) {
615       ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
616     }
617
618     const RequestType* request() { return allocator_info_->request; }
619     ResponseType* response() { return allocator_info_->response; }
620
621     void MaybeDone() {
622       if (--callbacks_outstanding_ == 0) {
623         grpc_call* call = call_.call();
624         auto call_requester = std::move(call_requester_);
625         if (allocator_ != nullptr) {
626           allocator_->DeallocateMessages(allocator_info_);
627         } else {
628           if (allocator_info_->request != nullptr) {
629             allocator_info_->request->~RequestType();
630           }
631           if (allocator_info_->response != nullptr) {
632             allocator_info_->response->~ResponseType();
633           }
634         }
635         this->~ServerCallbackRpcControllerImpl();  // explicitly call destructor
636         g_core_codegen_interface->grpc_call_unref(call);
637         call_requester();
638       }
639     }
640
641     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
642     CallbackWithSuccessTag meta_tag_;
643     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
644               CallOpServerSendStatus>
645         finish_ops_;
646     CallbackWithSuccessTag finish_tag_;
647
648     ServerContext* ctx_;
649     Call call_;
650     experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info_;
651     experimental::MessageAllocator<RequestType, ResponseType>* allocator_;
652     std::function<void()> call_requester_;
653     std::atomic_int callbacks_outstanding_{
654         2};  // reserve for Finish and CompletionOp
655   };
656 };
657
658 template <class RequestType, class ResponseType>
659 class CallbackClientStreamingHandler : public MethodHandler {
660  public:
661   CallbackClientStreamingHandler(
662       std::function<
663           experimental::ServerReadReactor<RequestType, ResponseType>*()>
664           func)
665       : func_(std::move(func)) {}
666   void RunHandler(const HandlerParameter& param) final {
667     // Arena allocate a reader structure (that includes response)
668     g_core_codegen_interface->grpc_call_ref(param.call->call());
669
670     experimental::ServerReadReactor<RequestType, ResponseType>* reactor =
671         param.status.ok()
672             ? CatchingReactorCreator<
673                   experimental::ServerReadReactor<RequestType, ResponseType>>(
674                   func_)
675             : nullptr;
676
677     if (reactor == nullptr) {
678       // if deserialization or reactor creator failed, we need to fail the call
679       reactor = new UnimplementedReadReactor<RequestType, ResponseType>;
680     }
681
682     auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
683         param.call->call(), sizeof(ServerCallbackReaderImpl)))
684         ServerCallbackReaderImpl(param.server_context, param.call,
685                                  std::move(param.call_requester), reactor);
686
687     reader->BindReactor(reactor);
688     reactor->OnStarted(param.server_context, reader->response());
689     // The earliest that OnCancel can be called is after OnStarted is done.
690     reactor->MaybeCallOnCancel();
691     reader->MaybeDone();
692   }
693
694  private:
695   std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
696       func_;
697
698   class ServerCallbackReaderImpl
699       : public experimental::ServerCallbackReader<RequestType> {
700    public:
701     void Finish(Status s) override {
702       finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
703                       &finish_ops_);
704       if (!ctx_->sent_initial_metadata_) {
705         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
706                                         ctx_->initial_metadata_flags());
707         if (ctx_->compression_level_set()) {
708           finish_ops_.set_compression_level(ctx_->compression_level());
709         }
710         ctx_->sent_initial_metadata_ = true;
711       }
712       // The response is dropped if the status is not OK.
713       if (s.ok()) {
714         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
715                                      finish_ops_.SendMessagePtr(&resp_));
716       } else {
717         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
718       }
719       finish_ops_.set_core_cq_tag(&finish_tag_);
720       call_.PerformOps(&finish_ops_);
721     }
722
723     void SendInitialMetadata() override {
724       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
725       callbacks_outstanding_++;
726       meta_tag_.Set(call_.call(),
727                     [this](bool ok) {
728                       reactor_->OnSendInitialMetadataDone(ok);
729                       MaybeDone();
730                     },
731                     &meta_ops_);
732       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
733                                     ctx_->initial_metadata_flags());
734       if (ctx_->compression_level_set()) {
735         meta_ops_.set_compression_level(ctx_->compression_level());
736       }
737       ctx_->sent_initial_metadata_ = true;
738       meta_ops_.set_core_cq_tag(&meta_tag_);
739       call_.PerformOps(&meta_ops_);
740     }
741
742     void Read(RequestType* req) override {
743       callbacks_outstanding_++;
744       read_ops_.RecvMessage(req);
745       call_.PerformOps(&read_ops_);
746     }
747
748    private:
749     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
750
751     ServerCallbackReaderImpl(
752         ServerContext* ctx, Call* call, std::function<void()> call_requester,
753         experimental::ServerReadReactor<RequestType, ResponseType>* reactor)
754         : ctx_(ctx),
755           call_(*call),
756           call_requester_(std::move(call_requester)),
757           reactor_(reactor) {
758       ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
759       read_tag_.Set(call_.call(),
760                     [this](bool ok) {
761                       reactor_->OnReadDone(ok);
762                       MaybeDone();
763                     },
764                     &read_ops_);
765       read_ops_.set_core_cq_tag(&read_tag_);
766     }
767
768     ~ServerCallbackReaderImpl() {}
769
770     ResponseType* response() { return &resp_; }
771
772     void MaybeDone() {
773       if (--callbacks_outstanding_ == 0) {
774         reactor_->OnDone();
775         grpc_call* call = call_.call();
776         auto call_requester = std::move(call_requester_);
777         this->~ServerCallbackReaderImpl();  // explicitly call destructor
778         g_core_codegen_interface->grpc_call_unref(call);
779         call_requester();
780       }
781     }
782
783     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
784     CallbackWithSuccessTag meta_tag_;
785     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
786               CallOpServerSendStatus>
787         finish_ops_;
788     CallbackWithSuccessTag finish_tag_;
789     CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
790     CallbackWithSuccessTag read_tag_;
791
792     ServerContext* ctx_;
793     Call call_;
794     ResponseType resp_;
795     std::function<void()> call_requester_;
796     experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
797     std::atomic_int callbacks_outstanding_{
798         3};  // reserve for OnStarted, Finish, and CompletionOp
799   };
800 };
801
802 template <class RequestType, class ResponseType>
803 class CallbackServerStreamingHandler : public MethodHandler {
804  public:
805   CallbackServerStreamingHandler(
806       std::function<
807           experimental::ServerWriteReactor<RequestType, ResponseType>*()>
808           func)
809       : func_(std::move(func)) {}
810   void RunHandler(const HandlerParameter& param) final {
811     // Arena allocate a writer structure
812     g_core_codegen_interface->grpc_call_ref(param.call->call());
813
814     experimental::ServerWriteReactor<RequestType, ResponseType>* reactor =
815         param.status.ok()
816             ? CatchingReactorCreator<
817                   experimental::ServerWriteReactor<RequestType, ResponseType>>(
818                   func_)
819             : nullptr;
820
821     if (reactor == nullptr) {
822       // if deserialization or reactor creator failed, we need to fail the call
823       reactor = new UnimplementedWriteReactor<RequestType, ResponseType>;
824     }
825
826     auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
827         param.call->call(), sizeof(ServerCallbackWriterImpl)))
828         ServerCallbackWriterImpl(param.server_context, param.call,
829                                  static_cast<RequestType*>(param.request),
830                                  std::move(param.call_requester), reactor);
831     writer->BindReactor(reactor);
832     reactor->OnStarted(param.server_context, writer->request());
833     // The earliest that OnCancel can be called is after OnStarted is done.
834     reactor->MaybeCallOnCancel();
835     writer->MaybeDone();
836   }
837
838   void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status,
839                     void** handler_data) final {
840     ByteBuffer buf;
841     buf.set_buffer(req);
842     auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
843         call, sizeof(RequestType))) RequestType();
844     *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
845     buf.Release();
846     if (status->ok()) {
847       return request;
848     }
849     request->~RequestType();
850     return nullptr;
851   }
852
853  private:
854   std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
855       func_;
856
857   class ServerCallbackWriterImpl
858       : public experimental::ServerCallbackWriter<ResponseType> {
859    public:
860     void Finish(Status s) override {
861       finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
862                       &finish_ops_);
863       finish_ops_.set_core_cq_tag(&finish_tag_);
864
865       if (!ctx_->sent_initial_metadata_) {
866         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
867                                         ctx_->initial_metadata_flags());
868         if (ctx_->compression_level_set()) {
869           finish_ops_.set_compression_level(ctx_->compression_level());
870         }
871         ctx_->sent_initial_metadata_ = true;
872       }
873       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
874       call_.PerformOps(&finish_ops_);
875     }
876
877     void SendInitialMetadata() override {
878       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
879       callbacks_outstanding_++;
880       meta_tag_.Set(call_.call(),
881                     [this](bool ok) {
882                       reactor_->OnSendInitialMetadataDone(ok);
883                       MaybeDone();
884                     },
885                     &meta_ops_);
886       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
887                                     ctx_->initial_metadata_flags());
888       if (ctx_->compression_level_set()) {
889         meta_ops_.set_compression_level(ctx_->compression_level());
890       }
891       ctx_->sent_initial_metadata_ = true;
892       meta_ops_.set_core_cq_tag(&meta_tag_);
893       call_.PerformOps(&meta_ops_);
894     }
895
896     void Write(const ResponseType* resp, WriteOptions options) override {
897       callbacks_outstanding_++;
898       if (options.is_last_message()) {
899         options.set_buffer_hint();
900       }
901       if (!ctx_->sent_initial_metadata_) {
902         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
903                                        ctx_->initial_metadata_flags());
904         if (ctx_->compression_level_set()) {
905           write_ops_.set_compression_level(ctx_->compression_level());
906         }
907         ctx_->sent_initial_metadata_ = true;
908       }
909       // TODO(vjpai): don't assert
910       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
911       call_.PerformOps(&write_ops_);
912     }
913
914     void WriteAndFinish(const ResponseType* resp, WriteOptions options,
915                         Status s) override {
916       // This combines the write into the finish callback
917       // Don't send any message if the status is bad
918       if (s.ok()) {
919         // TODO(vjpai): don't assert
920         GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
921       }
922       Finish(std::move(s));
923     }
924
925    private:
926     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
927
928     ServerCallbackWriterImpl(
929         ServerContext* ctx, Call* call, const RequestType* req,
930         std::function<void()> call_requester,
931         experimental::ServerWriteReactor<RequestType, ResponseType>* reactor)
932         : ctx_(ctx),
933           call_(*call),
934           req_(req),
935           call_requester_(std::move(call_requester)),
936           reactor_(reactor) {
937       ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
938       write_tag_.Set(call_.call(),
939                      [this](bool ok) {
940                        reactor_->OnWriteDone(ok);
941                        MaybeDone();
942                      },
943                      &write_ops_);
944       write_ops_.set_core_cq_tag(&write_tag_);
945     }
946     ~ServerCallbackWriterImpl() { req_->~RequestType(); }
947
948     const RequestType* request() { return req_; }
949
950     void MaybeDone() {
951       if (--callbacks_outstanding_ == 0) {
952         reactor_->OnDone();
953         grpc_call* call = call_.call();
954         auto call_requester = std::move(call_requester_);
955         this->~ServerCallbackWriterImpl();  // explicitly call destructor
956         g_core_codegen_interface->grpc_call_unref(call);
957         call_requester();
958       }
959     }
960
961     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
962     CallbackWithSuccessTag meta_tag_;
963     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
964               CallOpServerSendStatus>
965         finish_ops_;
966     CallbackWithSuccessTag finish_tag_;
967     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
968     CallbackWithSuccessTag write_tag_;
969
970     ServerContext* ctx_;
971     Call call_;
972     const RequestType* req_;
973     std::function<void()> call_requester_;
974     experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
975     std::atomic_int callbacks_outstanding_{
976         3};  // reserve for OnStarted, Finish, and CompletionOp
977   };
978 };
979
980 template <class RequestType, class ResponseType>
981 class CallbackBidiHandler : public MethodHandler {
982  public:
983   CallbackBidiHandler(
984       std::function<
985           experimental::ServerBidiReactor<RequestType, ResponseType>*()>
986           func)
987       : func_(std::move(func)) {}
988   void RunHandler(const HandlerParameter& param) final {
989     g_core_codegen_interface->grpc_call_ref(param.call->call());
990
991     experimental::ServerBidiReactor<RequestType, ResponseType>* reactor =
992         param.status.ok()
993             ? CatchingReactorCreator<
994                   experimental::ServerBidiReactor<RequestType, ResponseType>>(
995                   func_)
996             : nullptr;
997
998     if (reactor == nullptr) {
999       // if deserialization or reactor creator failed, we need to fail the call
1000       reactor = new UnimplementedBidiReactor<RequestType, ResponseType>;
1001     }
1002
1003     auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
1004         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
1005         ServerCallbackReaderWriterImpl(param.server_context, param.call,
1006                                        std::move(param.call_requester),
1007                                        reactor);
1008
1009     stream->BindReactor(reactor);
1010     reactor->OnStarted(param.server_context);
1011     // The earliest that OnCancel can be called is after OnStarted is done.
1012     reactor->MaybeCallOnCancel();
1013     stream->MaybeDone();
1014   }
1015
1016  private:
1017   std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
1018       func_;
1019
1020   class ServerCallbackReaderWriterImpl
1021       : public experimental::ServerCallbackReaderWriter<RequestType,
1022                                                         ResponseType> {
1023    public:
1024     void Finish(Status s) override {
1025       finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
1026                       &finish_ops_);
1027       finish_ops_.set_core_cq_tag(&finish_tag_);
1028
1029       if (!ctx_->sent_initial_metadata_) {
1030         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1031                                         ctx_->initial_metadata_flags());
1032         if (ctx_->compression_level_set()) {
1033           finish_ops_.set_compression_level(ctx_->compression_level());
1034         }
1035         ctx_->sent_initial_metadata_ = true;
1036       }
1037       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1038       call_.PerformOps(&finish_ops_);
1039     }
1040
1041     void SendInitialMetadata() override {
1042       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1043       callbacks_outstanding_++;
1044       meta_tag_.Set(call_.call(),
1045                     [this](bool ok) {
1046                       reactor_->OnSendInitialMetadataDone(ok);
1047                       MaybeDone();
1048                     },
1049                     &meta_ops_);
1050       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1051                                     ctx_->initial_metadata_flags());
1052       if (ctx_->compression_level_set()) {
1053         meta_ops_.set_compression_level(ctx_->compression_level());
1054       }
1055       ctx_->sent_initial_metadata_ = true;
1056       meta_ops_.set_core_cq_tag(&meta_tag_);
1057       call_.PerformOps(&meta_ops_);
1058     }
1059
1060     void Write(const ResponseType* resp, WriteOptions options) override {
1061       callbacks_outstanding_++;
1062       if (options.is_last_message()) {
1063         options.set_buffer_hint();
1064       }
1065       if (!ctx_->sent_initial_metadata_) {
1066         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1067                                        ctx_->initial_metadata_flags());
1068         if (ctx_->compression_level_set()) {
1069           write_ops_.set_compression_level(ctx_->compression_level());
1070         }
1071         ctx_->sent_initial_metadata_ = true;
1072       }
1073       // TODO(vjpai): don't assert
1074       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
1075       call_.PerformOps(&write_ops_);
1076     }
1077
1078     void WriteAndFinish(const ResponseType* resp, WriteOptions options,
1079                         Status s) override {
1080       // Don't send any message if the status is bad
1081       if (s.ok()) {
1082         // TODO(vjpai): don't assert
1083         GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
1084       }
1085       Finish(std::move(s));
1086     }
1087
1088     void Read(RequestType* req) override {
1089       callbacks_outstanding_++;
1090       read_ops_.RecvMessage(req);
1091       call_.PerformOps(&read_ops_);
1092     }
1093
1094    private:
1095     friend class CallbackBidiHandler<RequestType, ResponseType>;
1096
1097     ServerCallbackReaderWriterImpl(
1098         ServerContext* ctx, Call* call, std::function<void()> call_requester,
1099         experimental::ServerBidiReactor<RequestType, ResponseType>* reactor)
1100         : ctx_(ctx),
1101           call_(*call),
1102           call_requester_(std::move(call_requester)),
1103           reactor_(reactor) {
1104       ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
1105       write_tag_.Set(call_.call(),
1106                      [this](bool ok) {
1107                        reactor_->OnWriteDone(ok);
1108                        MaybeDone();
1109                      },
1110                      &write_ops_);
1111       write_ops_.set_core_cq_tag(&write_tag_);
1112       read_tag_.Set(call_.call(),
1113                     [this](bool ok) {
1114                       reactor_->OnReadDone(ok);
1115                       MaybeDone();
1116                     },
1117                     &read_ops_);
1118       read_ops_.set_core_cq_tag(&read_tag_);
1119     }
1120     ~ServerCallbackReaderWriterImpl() {}
1121
1122     void MaybeDone() {
1123       if (--callbacks_outstanding_ == 0) {
1124         reactor_->OnDone();
1125         grpc_call* call = call_.call();
1126         auto call_requester = std::move(call_requester_);
1127         this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
1128         g_core_codegen_interface->grpc_call_unref(call);
1129         call_requester();
1130       }
1131     }
1132
1133     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
1134     CallbackWithSuccessTag meta_tag_;
1135     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
1136               CallOpServerSendStatus>
1137         finish_ops_;
1138     CallbackWithSuccessTag finish_tag_;
1139     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
1140     CallbackWithSuccessTag write_tag_;
1141     CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
1142     CallbackWithSuccessTag read_tag_;
1143
1144     ServerContext* ctx_;
1145     Call call_;
1146     std::function<void()> call_requester_;
1147     experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
1148     std::atomic_int callbacks_outstanding_{
1149         3};  // reserve for OnStarted, Finish, and CompletionOp
1150   };
1151 };
1152
1153 }  // namespace internal
1154
1155 }  // namespace grpc
1156
1157 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H