Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / client_callback.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #include <atomic>
21 #include <functional>
22
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/call_op_set.h>
25 #include <grpcpp/impl/codegen/callback_common.h>
26 #include <grpcpp/impl/codegen/channel_interface.h>
27 #include <grpcpp/impl/codegen/config.h>
28 #include <grpcpp/impl/codegen/core_codegen_interface.h>
29 #include <grpcpp/impl/codegen/status.h>
30
31 namespace grpc {
32 class Channel;
33 class ClientContext;
34
35 namespace internal {
36 class RpcMethod;
37
38 /// Perform a callback-based unary call
39 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
40 template <class InputMessage, class OutputMessage>
41 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
42                        const ::grpc::internal::RpcMethod& method,
43                        ::grpc::ClientContext* context,
44                        const InputMessage* request, OutputMessage* result,
45                        std::function<void(::grpc::Status)> on_completion) {
46   CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
47       channel, method, context, request, result, on_completion);
48 }
49
50 template <class InputMessage, class OutputMessage>
51 class CallbackUnaryCallImpl {
52  public:
53   CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
54                         const ::grpc::internal::RpcMethod& method,
55                         ::grpc::ClientContext* context,
56                         const InputMessage* request, OutputMessage* result,
57                         std::function<void(::grpc::Status)> on_completion) {
58     ::grpc::CompletionQueue* cq = channel->CallbackCQ();
59     GPR_CODEGEN_ASSERT(cq != nullptr);
60     grpc::internal::Call call(channel->CreateCall(method, context, cq));
61
62     using FullCallOpSet = grpc::internal::CallOpSet<
63         ::grpc::internal::CallOpSendInitialMetadata,
64         grpc::internal::CallOpSendMessage,
65         grpc::internal::CallOpRecvInitialMetadata,
66         grpc::internal::CallOpRecvMessage<OutputMessage>,
67         grpc::internal::CallOpClientSendClose,
68         grpc::internal::CallOpClientRecvStatus>;
69
70     struct OpSetAndTag {
71       FullCallOpSet opset;
72       grpc::internal::CallbackWithStatusTag tag;
73     };
74     const size_t alloc_sz = sizeof(OpSetAndTag);
75     auto* const alloced = static_cast<OpSetAndTag*>(
76         ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
77                                                                 alloc_sz));
78     auto* ops = new (&alloced->opset) FullCallOpSet;
79     auto* tag = new (&alloced->tag)
80         grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
81
82     // TODO(vjpai): Unify code with sync API as much as possible
83     ::grpc::Status s = ops->SendMessagePtr(request);
84     if (!s.ok()) {
85       tag->force_run(s);
86       return;
87     }
88     ops->SendInitialMetadata(&context->send_initial_metadata_,
89                              context->initial_metadata_flags());
90     ops->RecvInitialMetadata(context);
91     ops->RecvMessage(result);
92     ops->AllowNoMessage();
93     ops->ClientSendClose();
94     ops->ClientRecvStatus(context, tag->status_ptr());
95     ops->set_core_cq_tag(tag);
96     call.PerformOps(ops);
97   }
98 };
99
100 // Base class for public API classes.
101 class ClientReactor {
102  public:
103   /// Called by the library when all operations associated with this RPC have
104   /// completed and all Holds have been removed. OnDone provides the RPC status
105   /// outcome for both successful and failed RPCs. If it is never called on an
106   /// RPC, it indicates an application-level problem (like failure to remove a
107   /// hold).
108   ///
109   /// \param[in] s The status outcome of this RPC
110   virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
111
112   /// InternalScheduleOnDone is not part of the API and is not meant to be
113   /// overridden. It is virtual to allow successful builds for certain bazel
114   /// build users that only want to depend on gRPC codegen headers and not the
115   /// full library (although this is not a generally-supported option). Although
116   /// the virtual call is slower than a direct call, this function is
117   /// heavyweight and the cost of the virtual call is not much in comparison.
118   /// This function may be removed or devirtualized in the future.
119   virtual void InternalScheduleOnDone(::grpc::Status s);
120 };
121
122 }  // namespace internal
123
124 // Forward declarations
125 template <class Request, class Response>
126 class ClientBidiReactor;
127 template <class Response>
128 class ClientReadReactor;
129 template <class Request>
130 class ClientWriteReactor;
131 class ClientUnaryReactor;
132
133 // NOTE: The streaming objects are not actually implemented in the public API.
134 //       These interfaces are provided for mocking only. Typical applications
135 //       will interact exclusively with the reactors that they define.
136 template <class Request, class Response>
137 class ClientCallbackReaderWriter {
138  public:
139   virtual ~ClientCallbackReaderWriter() {}
140   virtual void StartCall() = 0;
141   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
142   virtual void WritesDone() = 0;
143   virtual void Read(Response* resp) = 0;
144   virtual void AddHold(int holds) = 0;
145   virtual void RemoveHold() = 0;
146
147  protected:
148   void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
149     reactor->BindStream(this);
150   }
151 };
152
153 template <class Response>
154 class ClientCallbackReader {
155  public:
156   virtual ~ClientCallbackReader() {}
157   virtual void StartCall() = 0;
158   virtual void Read(Response* resp) = 0;
159   virtual void AddHold(int holds) = 0;
160   virtual void RemoveHold() = 0;
161
162  protected:
163   void BindReactor(ClientReadReactor<Response>* reactor) {
164     reactor->BindReader(this);
165   }
166 };
167
168 template <class Request>
169 class ClientCallbackWriter {
170  public:
171   virtual ~ClientCallbackWriter() {}
172   virtual void StartCall() = 0;
173   void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
174   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
175   void WriteLast(const Request* req, ::grpc::WriteOptions options) {
176     Write(req, options.set_last_message());
177   }
178   virtual void WritesDone() = 0;
179
180   virtual void AddHold(int holds) = 0;
181   virtual void RemoveHold() = 0;
182
183  protected:
184   void BindReactor(ClientWriteReactor<Request>* reactor) {
185     reactor->BindWriter(this);
186   }
187 };
188
189 class ClientCallbackUnary {
190  public:
191   virtual ~ClientCallbackUnary() {}
192   virtual void StartCall() = 0;
193
194  protected:
195   void BindReactor(ClientUnaryReactor* reactor);
196 };
197
198 // The following classes are the reactor interfaces that are to be implemented
199 // by the user. They are passed in to the library as an argument to a call on a
200 // stub (either a codegen-ed call or a generic call). The streaming RPC is
201 // activated by calling StartCall, possibly after initiating StartRead,
202 // StartWrite, or AddHold operations on the streaming object. Note that none of
203 // the classes are pure; all reactions have a default empty reaction so that the
204 // user class only needs to override those reactions that it cares about.
205 // The reactor must be passed to the stub invocation before any of the below
206 // operations can be called.
207
208 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
209 template <class Request, class Response>
210 class ClientBidiReactor : public internal::ClientReactor {
211  public:
212   virtual ~ClientBidiReactor() {}
213
214   /// Activate the RPC and initiate any reads or writes that have been Start'ed
215   /// before this call. All streaming RPCs issued by the client MUST have
216   /// StartCall invoked on them (even if they are canceled) as this call is the
217   /// activation of their lifecycle.
218   void StartCall() { stream_->StartCall(); }
219
220   /// Initiate a read operation (or post it for later initiation if StartCall
221   /// has not yet been invoked).
222   ///
223   /// \param[out] resp Where to eventually store the read message. Valid when
224   ///                  the library calls OnReadDone
225   void StartRead(Response* resp) { stream_->Read(resp); }
226
227   /// Initiate a write operation (or post it for later initiation if StartCall
228   /// has not yet been invoked).
229   ///
230   /// \param[in] req The message to be written. The library does not take
231   ///                ownership but the caller must ensure that the message is
232   ///                not deleted or modified until OnWriteDone is called.
233   void StartWrite(const Request* req) {
234     StartWrite(req, ::grpc::WriteOptions());
235   }
236
237   /// Initiate/post a write operation with specified options.
238   ///
239   /// \param[in] req The message to be written. The library does not take
240   ///                ownership but the caller must ensure that the message is
241   ///                not deleted or modified until OnWriteDone is called.
242   /// \param[in] options The WriteOptions to use for writing this message
243   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
244     stream_->Write(req, std::move(options));
245   }
246
247   /// Initiate/post a write operation with specified options and an indication
248   /// that this is the last write (like StartWrite and StartWritesDone, merged).
249   /// Note that calling this means that no more calls to StartWrite,
250   /// StartWriteLast, or StartWritesDone are allowed.
251   ///
252   /// \param[in] req The message to be written. The library does not take
253   ///                ownership but the caller must ensure that the message is
254   ///                not deleted or modified until OnWriteDone is called.
255   /// \param[in] options The WriteOptions to use for writing this message
256   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
257     StartWrite(req, std::move(options.set_last_message()));
258   }
259
260   /// Indicate that the RPC will have no more write operations. This can only be
261   /// issued once for a given RPC. This is not required or allowed if
262   /// StartWriteLast is used since that already has the same implication.
263   /// Note that calling this means that no more calls to StartWrite,
264   /// StartWriteLast, or StartWritesDone are allowed.
265   void StartWritesDone() { stream_->WritesDone(); }
266
267   /// Holds are needed if (and only if) this stream has operations that take
268   /// place on it after StartCall but from outside one of the reactions
269   /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
270   ///
271   /// Holds must be added before calling StartCall. If a stream still has a hold
272   /// in place, its resources will not be destroyed even if the status has
273   /// already come in from the wire and there are currently no active callbacks
274   /// outstanding. Similarly, the stream will not call OnDone if there are still
275   /// holds on it.
276   ///
277   /// For example, if a StartRead or StartWrite operation is going to be
278   /// initiated from elsewhere in the application, the application should call
279   /// AddHold or AddMultipleHolds before StartCall.  If there is going to be,
280   /// for example, a read-flow and a write-flow taking place outside the
281   /// reactions, then call AddMultipleHolds(2) before StartCall. When the
282   /// application knows that it won't issue any more read operations (such as
283   /// when a read comes back as not ok), it should issue a RemoveHold(). It
284   /// should also call RemoveHold() again after it does StartWriteLast or
285   /// StartWritesDone that indicates that there will be no more write ops.
286   /// The number of RemoveHold calls must match the total number of AddHold
287   /// calls plus the number of holds added by AddMultipleHolds.
288   /// The argument to AddMultipleHolds must be positive.
289   void AddHold() { AddMultipleHolds(1); }
290   void AddMultipleHolds(int holds) {
291     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
292     stream_->AddHold(holds);
293   }
294   void RemoveHold() { stream_->RemoveHold(); }
295
296   /// Notifies the application that all operations associated with this RPC
297   /// have completed and all Holds have been removed. OnDone provides the RPC
298   /// status outcome for both successful and failed RPCs and will be called in
299   /// all cases. If it is not called, it indicates an application-level problem
300   /// (like failure to remove a hold).
301   ///
302   /// \param[in] s The status outcome of this RPC
303   void OnDone(const ::grpc::Status& /*s*/) override {}
304
305   /// Notifies the application that a read of initial metadata from the
306   /// server is done. If the application chooses not to implement this method,
307   /// it can assume that the initial metadata has been read before the first
308   /// call of OnReadDone or OnDone.
309   ///
310   /// \param[in] ok Was the initial metadata read successfully? If false, no
311   ///               new read/write operation will succeed, and any further
312   ///               Start* operations should not be called.
313   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
314
315   /// Notifies the application that a StartRead operation completed.
316   ///
317   /// \param[in] ok Was it successful? If false, no new read/write operation
318   ///               will succeed, and any further Start* should not be called.
319   virtual void OnReadDone(bool /*ok*/) {}
320
321   /// Notifies the application that a StartWrite or StartWriteLast operation
322   /// completed.
323   ///
324   /// \param[in] ok Was it successful? If false, no new read/write operation
325   ///               will succeed, and any further Start* should not be called.
326   virtual void OnWriteDone(bool /*ok*/) {}
327
328   /// Notifies the application that a StartWritesDone operation completed. Note
329   /// that this is only used on explicit StartWritesDone operations and not for
330   /// those that are implicitly invoked as part of a StartWriteLast.
331   ///
332   /// \param[in] ok Was it successful? If false, the application will later see
333   ///               the failure reflected as a bad status in OnDone and no
334   ///               further Start* should be called.
335   virtual void OnWritesDoneDone(bool /*ok*/) {}
336
337  private:
338   friend class ClientCallbackReaderWriter<Request, Response>;
339   void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
340     stream_ = stream;
341   }
342   ClientCallbackReaderWriter<Request, Response>* stream_;
343 };
344
345 /// \a ClientReadReactor is the interface for a server-streaming RPC.
346 /// All public methods behave as in ClientBidiReactor.
347 template <class Response>
348 class ClientReadReactor : public internal::ClientReactor {
349  public:
350   virtual ~ClientReadReactor() {}
351
352   void StartCall() { reader_->StartCall(); }
353   void StartRead(Response* resp) { reader_->Read(resp); }
354
355   void AddHold() { AddMultipleHolds(1); }
356   void AddMultipleHolds(int holds) {
357     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
358     reader_->AddHold(holds);
359   }
360   void RemoveHold() { reader_->RemoveHold(); }
361
362   void OnDone(const ::grpc::Status& /*s*/) override {}
363   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
364   virtual void OnReadDone(bool /*ok*/) {}
365
366  private:
367   friend class ClientCallbackReader<Response>;
368   void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
369   ClientCallbackReader<Response>* reader_;
370 };
371
372 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
373 /// All public methods behave as in ClientBidiReactor.
374 template <class Request>
375 class ClientWriteReactor : public internal::ClientReactor {
376  public:
377   virtual ~ClientWriteReactor() {}
378
379   void StartCall() { writer_->StartCall(); }
380   void StartWrite(const Request* req) {
381     StartWrite(req, ::grpc::WriteOptions());
382   }
383   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
384     writer_->Write(req, std::move(options));
385   }
386   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
387     StartWrite(req, std::move(options.set_last_message()));
388   }
389   void StartWritesDone() { writer_->WritesDone(); }
390
391   void AddHold() { AddMultipleHolds(1); }
392   void AddMultipleHolds(int holds) {
393     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
394     writer_->AddHold(holds);
395   }
396   void RemoveHold() { writer_->RemoveHold(); }
397
398   void OnDone(const ::grpc::Status& /*s*/) override {}
399   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
400   virtual void OnWriteDone(bool /*ok*/) {}
401   virtual void OnWritesDoneDone(bool /*ok*/) {}
402
403  private:
404   friend class ClientCallbackWriter<Request>;
405   void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
406
407   ClientCallbackWriter<Request>* writer_;
408 };
409
410 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
411 /// This is _not_ a common way of invoking a unary RPC. In practice, this
412 /// option should be used only if the unary RPC wants to receive initial
413 /// metadata without waiting for the response to complete. Most deployments of
414 /// RPC systems do not use this option, but it is needed for generality.
415 /// All public methods behave as in ClientBidiReactor.
416 /// StartCall is included for consistency with the other reactor flavors: even
417 /// though there are no StartRead or StartWrite operations to queue before the
418 /// call (that is part of the unary call itself) and there is no reactor object
419 /// being created as a result of this call, we keep a consistent 2-phase
420 /// initiation API among all the reactor flavors.
421 class ClientUnaryReactor : public internal::ClientReactor {
422  public:
423   virtual ~ClientUnaryReactor() {}
424
425   void StartCall() { call_->StartCall(); }
426   void OnDone(const ::grpc::Status& /*s*/) override {}
427   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
428
429  private:
430   friend class ClientCallbackUnary;
431   void BindCall(ClientCallbackUnary* call) { call_ = call; }
432   ClientCallbackUnary* call_;
433 };
434
435 // Define function out-of-line from class to avoid forward declaration issue
436 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
437   reactor->BindCall(this);
438 }
439
440 namespace internal {
441
442 // Forward declare factory classes for friendship
443 template <class Request, class Response>
444 class ClientCallbackReaderWriterFactory;
445 template <class Response>
446 class ClientCallbackReaderFactory;
447 template <class Request>
448 class ClientCallbackWriterFactory;
449
// Concrete ClientCallbackReaderWriter used for bidirectional-streaming RPCs.
//
// Instances are constructed only by ClientCallbackReaderWriterFactory (the
// constructor is private). The object keeps a count of outstanding callbacks
// and holds; when MaybeFinish drops that count to zero the object runs its
// own destructor, unrefs the call, and delivers the final status to the
// reactor's OnDone.
template <class Request, class Response>
class ClientCallbackReaderWriterImpl
    : public ClientCallbackReaderWriter<Request, Response> {
 public:
  // always allocated against a call arena, no memory free required
  // (the body releases nothing; it only checks the size passed in)
  static void operator delete(void* /*ptr*/, std::size_t size) {
    GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
  }

  // This operator should never be called as the memory should be freed as part
  // of the arena destruction. It only exists to provide a matching operator
  // delete to the operator new so that some compilers will not complain (see
  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  // there are no tests catching the compiler warning.
  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }

  // Activates the RPC: starts the start batch, flushes any operations that
  // were queued before this call, starts the finish batch, and then releases
  // the callback count that was pre-registered for StartCall itself.
  void StartCall() override {
    // This call always initiates the start batch and the finish batch, plus
    // any backlog, each with a callback
    // 1. Send initial metadata (unless corked) + recv initial metadata
    // 2. Any read backlog
    // 3. Any write backlog (a write and/or a writes-done)
    // 4. Recv status (finish_ops_), started unconditionally
    if (!start_corked_) {
      start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
    }

    call_.PerformOps(&start_ops_);

    {
      grpc::internal::MutexLock lock(&start_mu_);

      if (backlog_.read_ops) {
        call_.PerformOps(&read_ops_);
      }
      if (backlog_.write_ops) {
        call_.PerformOps(&write_ops_);
      }
      if (backlog_.writes_done_ops) {
        call_.PerformOps(&writes_done_ops_);
      }
      call_.PerformOps(&finish_ops_);
      // The last thing in this critical section is to set started_ so that it
      // can be used lock-free as well.
      started_.store(true, std::memory_order_release);
    }
    // MaybeFinish outside the lock to make sure that destruction of this object
    // doesn't take place while holding the lock (which would cause the lock to
    // be released after destruction)
    this->MaybeFinish(/*from_reaction=*/false);
  }

  // Registers msg as the destination of the next read and either starts the
  // read batch or, if StartCall has not completed yet, records it in the
  // backlog for StartCall to start.
  void Read(Response* msg) override {
    read_ops_.RecvMessage(msg);
    // One more callback (the read tag) will have to run before finishing.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    // Once started_ has been observed as true no lock is taken; otherwise
    // re-check it under start_mu_, which StartCall holds while it drains the
    // backlog and sets started_.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.read_ops = true;
        return;
      }
    }
    call_.PerformOps(&read_ops_);
  }

  // Prepares a write of msg with the given options and either starts the
  // write batch or records it in the backlog if StartCall has not completed.
  void Write(const Request* msg, ::grpc::WriteOptions options) override {
    if (options.is_last_message()) {
      // A last-message write also carries the client's send-close op.
      options.set_buffer_hint();
      write_ops_.ClientSendClose();
    }
    // TODO(vjpai): don't assert
    GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    if (GPR_UNLIKELY(corked_write_needed_)) {
      // The call was started corked, so StartCall did not add initial
      // metadata to the start batch; attach it to this batch instead, once.
      write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }

    // Same started_/start_mu_ check as in Read.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.write_ops = true;
        return;
      }
    }
    call_.PerformOps(&write_ops_);
  }

  // Prepares the writes-done batch (client send-close) and either starts it
  // or records it in the backlog if StartCall has not completed. Unlike the
  // other tags, the writes-done tag is set up here rather than in the
  // constructor.
  void WritesDone() override {
    writes_done_ops_.ClientSendClose();
    writes_done_tag_.Set(call_.call(),
                         [this](bool ok) {
                           reactor_->OnWritesDoneDone(ok);
                           MaybeFinish(/*from_reaction=*/true);
                         },
                         &writes_done_ops_, /*can_inline=*/false);
    writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    if (GPR_UNLIKELY(corked_write_needed_)) {
      // Corked start with no earlier write: send the initial metadata with
      // this batch.
      writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                           context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }
    // Same started_/start_mu_ check as in Read.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.writes_done_ops = true;
        return;
      }
    }
    call_.PerformOps(&writes_done_ops_);
  }

  // A hold is accounted for as an extra outstanding callback: adding holds
  // raises the count, and removing one goes through MaybeFinish, which lowers
  // it by one.
  void AddHold(int holds) override {
    callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  }
  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }

 private:
  friend class ClientCallbackReaderWriterFactory<Request, Response>;

  // Binds this stream to the reactor and prepares the tags/op sets whose
  // contents do not change from one operation to the next. Nothing is started
  // on the call here; that happens in StartCall.
  ClientCallbackReaderWriterImpl(grpc::internal::Call call,
                                 ::grpc::ClientContext* context,
                                 ClientBidiReactor<Request, Response>* reactor)
      : context_(context),
        call_(call),
        reactor_(reactor),
        start_corked_(context_->initial_metadata_corked_),
        corked_write_needed_(start_corked_) {
    this->BindReactor(reactor);

    // Set up the unchanging parts of the start, read, and write tags and ops.
    start_tag_.Set(call_.call(),
                   [this](bool ok) {
                     reactor_->OnReadInitialMetadataDone(ok);
                     MaybeFinish(/*from_reaction=*/true);
                   },
                   &start_ops_, /*can_inline=*/false);
    start_ops_.RecvInitialMetadata(context_);
    start_ops_.set_core_cq_tag(&start_tag_);

    write_tag_.Set(call_.call(),
                   [this](bool ok) {
                     reactor_->OnWriteDone(ok);
                     MaybeFinish(/*from_reaction=*/true);
                   },
                   &write_ops_, /*can_inline=*/false);
    write_ops_.set_core_cq_tag(&write_tag_);

    read_tag_.Set(call_.call(),
                  [this](bool ok) {
                    reactor_->OnReadDone(ok);
                    MaybeFinish(/*from_reaction=*/true);
                  },
                  &read_ops_, /*can_inline=*/false);
    read_ops_.set_core_cq_tag(&read_tag_);

    // Also set up the Finish tag and op set.
    finish_tag_.Set(
        call_.call(),
        [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
        &finish_ops_,
        /*can_inline=*/false);
    finish_ops_.ClientRecvStatus(context_, &finish_status_);
    finish_ops_.set_core_cq_tag(&finish_tag_);
  }

  // MaybeFinish can be called from reactions or from user-initiated operations
  // like StartCall or RemoveHold. If this is the last operation or hold on this
  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
  // a reaction, it can call OnDone directly. If not, it would need to schedule
  // OnDone onto an executor thread to avoid the possibility of deadlocking with
  // any locks in the user code that invoked it.
  void MaybeFinish(bool from_reaction) {
    if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
                         1, std::memory_order_acq_rel) == 1)) {
      // The count just reached zero. Copy out everything that is still needed
      // before running the destructor, because no member may be touched
      // afterwards.
      ::grpc::Status s = std::move(finish_status_);
      auto* reactor = reactor_;
      auto* call = call_.call();
      this->~ClientCallbackReaderWriterImpl();
      ::grpc::g_core_codegen_interface->grpc_call_unref(call);
      if (GPR_LIKELY(from_reaction)) {
        reactor->OnDone(s);
      } else {
        reactor->InternalScheduleOnDone(std::move(s));
      }
    }
  }

  ::grpc::ClientContext* const context_;
  grpc::internal::Call call_;
  ClientBidiReactor<Request, Response>* const reactor_;

  // Start batch: initial metadata in both directions.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpRecvInitialMetadata>
      start_ops_;
  grpc::internal::CallbackWithSuccessTag start_tag_;
  // Copied from the context's initial_metadata_corked_ at construction.
  const bool start_corked_;
  bool corked_write_needed_;  // no lock needed since only accessed in
                              // Write/WritesDone which cannot be concurrent

  // Finish batch: receives the final status into finish_status_.
  grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  grpc::internal::CallbackWithSuccessTag finish_tag_;
  ::grpc::Status finish_status_;

  // Write batch: the message, plus initial metadata when corked and
  // send-close when it is the last message.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpSendMessage,
                            grpc::internal::CallOpClientSendClose>
      write_ops_;
  grpc::internal::CallbackWithSuccessTag write_tag_;

  // Writes-done batch: send-close, plus initial metadata when corked.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpClientSendClose>
      writes_done_ops_;
  grpc::internal::CallbackWithSuccessTag writes_done_tag_;

  // Read batch: receives one message.
  grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
      read_ops_;
  grpc::internal::CallbackWithSuccessTag read_tag_;

  // Operations requested before StartCall finished; StartCall starts each
  // one whose flag is set.
  struct StartCallBacklog {
    bool write_ops = false;
    bool writes_done_ops = false;
    bool read_ops = false;
  };
  StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;

  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  std::atomic<intptr_t> callbacks_outstanding_{3};
  // Set to true (with release ordering) at the end of StartCall's critical
  // section.
  std::atomic_bool started_{false};
  grpc::internal::Mutex start_mu_;
};
682
683 template <class Request, class Response>
684 class ClientCallbackReaderWriterFactory {
685  public:
686   static void Create(::grpc::ChannelInterface* channel,
687                      const ::grpc::internal::RpcMethod& method,
688                      ::grpc::ClientContext* context,
689                      ClientBidiReactor<Request, Response>* reactor) {
690     grpc::internal::Call call =
691         channel->CreateCall(method, context, channel->CallbackCQ());
692
693     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
694     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
695         call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
696         ClientCallbackReaderWriterImpl<Request, Response>(call, context,
697                                                           reactor);
698   }
699 };
700
701 template <class Response>
702 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
703  public:
  // always allocated against a call arena, no memory free required
  // (the body releases nothing; it only asserts that the size passed in is
  // the size of this class)
  static void operator delete(void* /*ptr*/, std::size_t size) {
    GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
  }
708
  // This operator should never be called as the memory should be freed as part
  // of the arena destruction. It only exists to provide a matching operator
  // delete to the operator new so that some compilers will not complain (see
  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  // there are no tests catching the compiler warning.
  // Reaching the body is treated as a bug: it asserts unconditionally.
  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
715
  // Activates the RPC: starts the start batch, starts a read that was
  // requested before this call (if any), marks the stream as started, and
  // finally starts the finish batch.
  void StartCall() override {
    // This call initiates two batches, plus any backlog, each with a callback
    // 1. Send initial metadata + recv initial metadata
    // 2. Any backlog
    // 3. Recv trailing metadata

    // The start tag reports the initial-metadata result to the reactor and
    // then gives up one outstanding callback.
    start_tag_.Set(call_.call(),
                   [this](bool ok) {
                     reactor_->OnReadInitialMetadataDone(ok);
                     MaybeFinish(/*from_reaction=*/true);
                   },
                   &start_ops_, /*can_inline=*/false);
    start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                   context_->initial_metadata_flags());
    start_ops_.RecvInitialMetadata(context_);
    start_ops_.set_core_cq_tag(&start_tag_);
    call_.PerformOps(&start_ops_);

    // Also set up the read tag so it doesn't have to be set up each time
    read_tag_.Set(call_.call(),
                  [this](bool ok) {
                    reactor_->OnReadDone(ok);
                    MaybeFinish(/*from_reaction=*/true);
                  },
                  &read_ops_, /*can_inline=*/false);
    read_ops_.set_core_cq_tag(&read_tag_);

    {
      // Under start_mu_: start a read that was queued before StartCall, then
      // publish started_ with release ordering as the last step.
      grpc::internal::MutexLock lock(&start_mu_);
      if (backlog_.read_ops) {
        call_.PerformOps(&read_ops_);
      }
      started_.store(true, std::memory_order_release);
    }

    // The finish batch receives the final status into finish_status_; its tag
    // only gives up one outstanding callback.
    finish_tag_.Set(
        call_.call(),
        [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
        &finish_ops_, /*can_inline=*/false);
    finish_ops_.ClientRecvStatus(context_, &finish_status_);
    finish_ops_.set_core_cq_tag(&finish_tag_);
    call_.PerformOps(&finish_ops_);
  }
759
760   void Read(Response* msg) override {
761     read_ops_.RecvMessage(msg);
762     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
763     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
764       grpc::internal::MutexLock lock(&start_mu_);
765       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
766         backlog_.read_ops = true;
767         return;
768       }
769     }
770     call_.PerformOps(&read_ops_);
771   }
772
773   void AddHold(int holds) override {
774     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
775   }
776   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
777
778  private:
779   friend class ClientCallbackReaderFactory<Response>;
780
781   template <class Request>
782   ClientCallbackReaderImpl(::grpc::internal::Call call,
783                            ::grpc::ClientContext* context, Request* request,
784                            ClientReadReactor<Response>* reactor)
785       : context_(context), call_(call), reactor_(reactor) {
786     this->BindReactor(reactor);
787     // TODO(vjpai): don't assert
788     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
789     start_ops_.ClientSendClose();
790   }
791
792   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
793   void MaybeFinish(bool from_reaction) {
794     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
795                          1, std::memory_order_acq_rel) == 1)) {
796       ::grpc::Status s = std::move(finish_status_);
797       auto* reactor = reactor_;
798       auto* call = call_.call();
799       this->~ClientCallbackReaderImpl();
800       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
801       if (GPR_LIKELY(from_reaction)) {
802         reactor->OnDone(s);
803       } else {
804         reactor->InternalScheduleOnDone(std::move(s));
805       }
806     }
807   }
808
809   ::grpc::ClientContext* const context_;
810   grpc::internal::Call call_;
811   ClientReadReactor<Response>* const reactor_;
812
813   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
814                             grpc::internal::CallOpSendMessage,
815                             grpc::internal::CallOpClientSendClose,
816                             grpc::internal::CallOpRecvInitialMetadata>
817       start_ops_;
818   grpc::internal::CallbackWithSuccessTag start_tag_;
819
820   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
821   grpc::internal::CallbackWithSuccessTag finish_tag_;
822   ::grpc::Status finish_status_;
823
824   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
825       read_ops_;
826   grpc::internal::CallbackWithSuccessTag read_tag_;
827
828   struct StartCallBacklog {
829     bool read_ops = false;
830   };
831   StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
832
833   // Minimum of 2 callbacks to pre-register for start and finish
834   std::atomic<intptr_t> callbacks_outstanding_{2};
835   std::atomic_bool started_{false};
836   grpc::internal::Mutex start_mu_;
837 };
838
839 template <class Response>
840 class ClientCallbackReaderFactory {
841  public:
842   template <class Request>
843   static void Create(::grpc::ChannelInterface* channel,
844                      const ::grpc::internal::RpcMethod& method,
845                      ::grpc::ClientContext* context, const Request* request,
846                      ClientReadReactor<Response>* reactor) {
847     grpc::internal::Call call =
848         channel->CreateCall(method, context, channel->CallbackCQ());
849
850     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
851     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
852         call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
853         ClientCallbackReaderImpl<Response>(call, context, request, reactor);
854   }
855 };
856
// Call-arena-allocated implementation of ClientCallbackWriter<Request>. It
// issues the start, write, writes-done and finish batches for one call and
// relays their completions to the bound ClientWriteReactor.
// callbacks_outstanding_ counts the callbacks (and holds) that are still
// pending; MaybeFinish() destroys the object and reports the final status once
// that count reaches zero.
857 template <class Request>
858 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
859  public:
860   // always allocated against a call arena, no memory free required
      // (this overload only asserts that `size` matches this class)
861   static void operator delete(void* /*ptr*/, std::size_t size) {
862     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
863   }
864
865   // This operator should never be called as the memory should be freed as part
866   // of the arena destruction. It only exists to provide a matching operator
867   // delete to the operator new so that some compilers will not complain (see
868   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
869   // there are no tests catching the compiler warning.
870   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
871
      // Issues the start batch, then (under start_mu_) any write / writes-done
      // that was queued before the call was started, followed by the finish
      // batch, and finally drops the count pre-registered for StartCall itself.
872   void StartCall() override {
873     // This call initiates two batches, plus any backlog, each with a callback
874     // 1. Send initial metadata (unless corked) + recv initial metadata
875     // 2. Any backlog
876     // 3. Recv response message (if any) + recv trailing metadata
877
        // When the call is corked the initial metadata is left out here and is
        // attached to the first Write()/WritesDone() instead.
878     if (!start_corked_) {
879       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
880                                      context_->initial_metadata_flags());
881     }
882     call_.PerformOps(&start_ops_);
883
884     {
885       grpc::internal::MutexLock lock(&start_mu_);
886
887       if (backlog_.write_ops) {
888         call_.PerformOps(&write_ops_);
889       }
890       if (backlog_.writes_done_ops) {
891         call_.PerformOps(&writes_done_ops_);
892       }
893       call_.PerformOps(&finish_ops_);
894       // The last thing in this critical section is to set started_ so that it
895       // can be used lock-free as well.
896       started_.store(true, std::memory_order_release);
897     }
898     // MaybeFinish outside the lock to make sure that destruction of this object
899     // doesn't take place while holding the lock (which would cause the lock to
900     // be released after destruction)
901     this->MaybeFinish(/*from_reaction=*/false);
902   }
903
      // Sends `msg` with `options` and registers one more outstanding callback.
      // If `options` marks this as the last message, the buffer hint is set and
      // a client close is added to the same batch. On a corked call the first
      // Write()/WritesDone() also carries the initial metadata. If StartCall()
      // has not yet published started_, the write is only recorded in the
      // backlog and StartCall() issues it.
904   void Write(const Request* msg, ::grpc::WriteOptions options) override {
905     if (GPR_UNLIKELY(options.is_last_message())) {
906       options.set_buffer_hint();
907       write_ops_.ClientSendClose();
908     }
909     // TODO(vjpai): don't assert
910     GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
911     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
912
913     if (GPR_UNLIKELY(corked_write_needed_)) {
914       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
915                                      context_->initial_metadata_flags());
916       corked_write_needed_ = false;
917     }
918
919     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
920       grpc::internal::MutexLock lock(&start_mu_);
          // Check again while holding the lock: StartCall() sets started_ inside
          // the same critical section in which it consumes the backlog.
921       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
922         backlog_.write_ops = true;
923         return;
924       }
925     }
926     call_.PerformOps(&write_ops_);
927   }
928
      // Sends a client close in its own batch and registers one more outstanding
      // callback; completion is reported to the reactor as OnWritesDoneDone.
      // Corking and pre-start backlog handling mirror Write().
929   void WritesDone() override {
930     writes_done_ops_.ClientSendClose();
931     writes_done_tag_.Set(call_.call(),
932                          [this](bool ok) {
933                            reactor_->OnWritesDoneDone(ok);
934                            MaybeFinish(/*from_reaction=*/true);
935                          },
936                          &writes_done_ops_, /*can_inline=*/false);
937     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
938     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
939
940     if (GPR_UNLIKELY(corked_write_needed_)) {
941       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942                                            context_->initial_metadata_flags());
943       corked_write_needed_ = false;
944     }
945
946     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
947       grpc::internal::MutexLock lock(&start_mu_);
948       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
949         backlog_.writes_done_ops = true;
950         return;
951       }
952     }
953     call_.PerformOps(&writes_done_ops_);
954   }
955
      // Adds `holds` to the outstanding-callback count, which delays the
      // teardown performed by MaybeFinish().
956   void AddHold(int holds) override {
957     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
958   }
      // Drops one count. from_reaction is false, so if this was the last count
      // the final status goes through InternalScheduleOnDone rather than a
      // direct OnDone call.
959   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
960
961  private:
962   friend class ClientCallbackWriterFactory<Request>;
963
      // Stores the call, context and reactor, records whether the context asked
      // for corked initial metadata, binds the reactor, and prepares the start,
      // write and finish tags/op sets. `response` is where the finish batch
      // stores the received message, if one arrives.
964   template <class Response>
965   ClientCallbackWriterImpl(::grpc::internal::Call call,
966                            ::grpc::ClientContext* context, Response* response,
967                            ClientWriteReactor<Request>* reactor)
968       : context_(context),
969         call_(call),
970         reactor_(reactor),
971         start_corked_(context_->initial_metadata_corked_),
972         corked_write_needed_(start_corked_) {
973     this->BindReactor(reactor);
974
975     // Set up the unchanging parts of the start and write tags and ops.
976     start_tag_.Set(call_.call(),
977                    [this](bool ok) {
978                      reactor_->OnReadInitialMetadataDone(ok);
979                      MaybeFinish(/*from_reaction=*/true);
980                    },
981                    &start_ops_, /*can_inline=*/false);
982     start_ops_.RecvInitialMetadata(context_);
983     start_ops_.set_core_cq_tag(&start_tag_);
984
985     write_tag_.Set(call_.call(),
986                    [this](bool ok) {
987                      reactor_->OnWriteDone(ok);
988                      MaybeFinish(/*from_reaction=*/true);
989                    },
990                    &write_ops_, /*can_inline=*/false);
991     write_ops_.set_core_cq_tag(&write_tag_);
992
993     // Also set up the Finish tag and op set.
        // The finish callback is not surfaced to the reactor directly; it only
        // drops one outstanding callback.
994     finish_ops_.RecvMessage(response);
995     finish_ops_.AllowNoMessage();
996     finish_tag_.Set(
997         call_.call(),
998         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
999         &finish_ops_,
1000         /*can_inline=*/false);
1001     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1002     finish_ops_.set_core_cq_tag(&finish_tag_);
1003   }
1004
1005   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
       // It decrements callbacks_outstanding_; the caller that takes the count
       // from 1 to 0 moves out the final status, runs this object's destructor
       // in place, unrefs the call, and then reports the status: directly
       // through OnDone when from_reaction is true, otherwise through
       // InternalScheduleOnDone.
1006   void MaybeFinish(bool from_reaction) {
1007     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1008                          1, std::memory_order_acq_rel) == 1)) {
           // Copy everything still needed into locals; the members must not be
           // touched after the destructor call below.
1009       ::grpc::Status s = std::move(finish_status_);
1010       auto* reactor = reactor_;
1011       auto* call = call_.call();
1012       this->~ClientCallbackWriterImpl();
1013       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1014       if (GPR_LIKELY(from_reaction)) {
1015         reactor->OnDone(s);
1016       } else {
1017         reactor->InternalScheduleOnDone(std::move(s));
1018       }
1019     }
1020   }
1021
1022   ::grpc::ClientContext* const context_;
1023   grpc::internal::Call call_;
1024   ClientWriteReactor<Request>* const reactor_;
1025
       // Start batch: send initial metadata (skipped when corked) and receive
       // initial metadata.
1026   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1027                             grpc::internal::CallOpRecvInitialMetadata>
1028       start_ops_;
1029   grpc::internal::CallbackWithSuccessTag start_tag_;
       // Copied from context_->initial_metadata_corked_ at construction.
1030   const bool start_corked_;
1031   bool corked_write_needed_;  // no lock needed since only accessed in
1032                               // Write/WritesDone which cannot be concurrent
1033
       // Finish batch: receive the response message (allowed to be absent) and
       // the call status.
1034   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1035                             grpc::internal::CallOpClientRecvStatus>
1036       finish_ops_;
1037   grpc::internal::CallbackWithSuccessTag finish_tag_;
1038   ::grpc::Status finish_status_;
1039
       // Write batch: a message, plus initial metadata on the first corked
       // write and a client close on the last message.
1040   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1041                             grpc::internal::CallOpSendMessage,
1042                             grpc::internal::CallOpClientSendClose>
1043       write_ops_;
1044   grpc::internal::CallbackWithSuccessTag write_tag_;
1045
       // Writes-done batch: a client close, plus initial metadata if the call
       // was corked and nothing has been written yet.
1046   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1047                             grpc::internal::CallOpClientSendClose>
1048       writes_done_ops_;
1049   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1050
       // Work requested before StartCall() published started_. The flags are
       // set by Write()/WritesDone() and consumed by StartCall().
1051   struct StartCallBacklog {
1052     bool write_ops = false;
1053     bool writes_done_ops = false;
1054   };
1055   StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1056
1057   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1058   std::atomic<intptr_t> callbacks_outstanding_{3};
       // Stored with release at the end of StartCall()'s critical section and
       // loaded with acquire in Write()/WritesDone().
1059   std::atomic_bool started_{false};
1060   grpc::internal::Mutex start_mu_;
1061 };
1062
1063 template <class Request>
1064 class ClientCallbackWriterFactory {
1065  public:
1066   template <class Response>
1067   static void Create(::grpc::ChannelInterface* channel,
1068                      const ::grpc::internal::RpcMethod& method,
1069                      ::grpc::ClientContext* context, Response* response,
1070                      ClientWriteReactor<Request>* reactor) {
1071     grpc::internal::Call call =
1072         channel->CreateCall(method, context, channel->CallbackCQ());
1073
1074     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1075     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1076         call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1077         ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1078   }
1079 };
1080
// Call-arena-allocated implementation of ClientCallbackUnary. It issues one
// start batch and one finish batch for the call and reports to the bound
// ClientUnaryReactor; MaybeFinish() destroys the object and calls OnDone once
// both batch callbacks have run.
1081 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1082  public:
1083   // always allocated against a call arena, no memory free required
       // (this overload only asserts that `size` matches this class)
1084   static void operator delete(void* /*ptr*/, std::size_t size) {
1085     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1086   }
1087
1088   // This operator should never be called as the memory should be freed as part
1089   // of the arena destruction. It only exists to provide a matching operator
1090   // delete to the operator new so that some compilers will not complain (see
1091   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1092   // there are no tests catching the compiler warning.
1093   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1094
       // Issues the start batch followed by the finish batch. The request
       // message, client close and response destination were already placed in
       // the op sets by the constructor.
1095   void StartCall() override {
1096     // This call initiates two batches, each with a callback
1097     // 1. Send initial metadata + write + writes done + recv initial metadata
1098     // 2. Read message, recv trailing metadata
1099
1100     start_tag_.Set(call_.call(),
1101                    [this](bool ok) {
1102                      reactor_->OnReadInitialMetadataDone(ok);
1103                      MaybeFinish();
1104                    },
1105                    &start_ops_, /*can_inline=*/false);
1106     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1107                                    context_->initial_metadata_flags());
1108     start_ops_.RecvInitialMetadata(context_);
1109     start_ops_.set_core_cq_tag(&start_tag_);
1110     call_.PerformOps(&start_ops_);
1111
         // The finish callback is not surfaced to the reactor directly; it only
         // drops one outstanding callback.
1112     finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1113                     &finish_ops_,
1114                     /*can_inline=*/false);
1115     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1116     finish_ops_.set_core_cq_tag(&finish_tag_);
1117     call_.PerformOps(&finish_ops_);
1118   }
1119
1120  private:
1121   friend class ClientCallbackUnaryFactory;
1122
       // Stores the call, context and reactor, binds the reactor, loads the
       // request message and a client close into the start batch, and points
       // the finish batch at `response` (a missing message is allowed). Asserts
       // if SendMessagePtr returns a non-ok status.
1123   template <class Request, class Response>
1124   ClientCallbackUnaryImpl(::grpc::internal::Call call,
1125                           ::grpc::ClientContext* context, Request* request,
1126                           Response* response, ClientUnaryReactor* reactor)
1127       : context_(context), call_(call), reactor_(reactor) {
1128     this->BindReactor(reactor);
1129     // TODO(vjpai): don't assert
1130     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1131     start_ops_.ClientSendClose();
1132     finish_ops_.RecvMessage(response);
1133     finish_ops_.AllowNoMessage();
1134   }
1135
1136   // In the unary case, MaybeFinish is only ever invoked from a
1137   // library-initiated reaction, so it will just directly call OnDone if this is
1138   // the last reaction for this RPC.
1139   void MaybeFinish() {
1140     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1141                          1, std::memory_order_acq_rel) == 1)) {
           // Copy everything still needed into locals; the members must not be
           // touched after the destructor call below.
1142       ::grpc::Status s = std::move(finish_status_);
1143       auto* reactor = reactor_;
1144       auto* call = call_.call();
1145       this->~ClientCallbackUnaryImpl();
1146       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1147       reactor->OnDone(s);
1148     }
1149   }
1150
1151   ::grpc::ClientContext* const context_;
1152   grpc::internal::Call call_;
1153   ClientUnaryReactor* const reactor_;
1154
       // Start batch: send initial metadata, the request message and a client
       // close, and receive initial metadata.
1155   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1156                             grpc::internal::CallOpSendMessage,
1157                             grpc::internal::CallOpClientSendClose,
1158                             grpc::internal::CallOpRecvInitialMetadata>
1159       start_ops_;
1160   grpc::internal::CallbackWithSuccessTag start_tag_;
1161
       // Finish batch: receive the response message (allowed to be absent) and
       // the call status.
1162   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1163                             grpc::internal::CallOpClientRecvStatus>
1164       finish_ops_;
1165   grpc::internal::CallbackWithSuccessTag finish_tag_;
1166   ::grpc::Status finish_status_;
1167
1168   // This call will have 2 callbacks: start and finish
1169   std::atomic<intptr_t> callbacks_outstanding_{2};
1170 };
1171
1172 class ClientCallbackUnaryFactory {
1173  public:
1174   template <class Request, class Response>
1175   static void Create(::grpc::ChannelInterface* channel,
1176                      const ::grpc::internal::RpcMethod& method,
1177                      ::grpc::ClientContext* context, const Request* request,
1178                      Response* response, ClientUnaryReactor* reactor) {
1179     grpc::internal::Call call =
1180         channel->CreateCall(method, context, channel->CallbackCQ());
1181
1182     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1183
1184     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1185         call.call(), sizeof(ClientCallbackUnaryImpl)))
1186         ClientCallbackUnaryImpl(call, context, request, response, reactor);
1187   }
1188 };
1189
1190 }  // namespace internal
1191
1192 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1193 namespace experimental {
1194
1195 template <class Response>
1196 using ClientCallbackReader = ::grpc::ClientCallbackReader<Response>;
1197
1198 template <class Request>
1199 using ClientCallbackWriter = ::grpc::ClientCallbackWriter<Request>;
1200
1201 template <class Request, class Response>
1202 using ClientCallbackReaderWriter =
1203     ::grpc::ClientCallbackReaderWriter<Request, Response>;
1204
1205 template <class Response>
1206 using ClientReadReactor = ::grpc::ClientReadReactor<Response>;
1207
1208 template <class Request>
1209 using ClientWriteReactor = ::grpc::ClientWriteReactor<Request>;
1210
1211 template <class Request, class Response>
1212 using ClientBidiReactor = ::grpc::ClientBidiReactor<Request, Response>;
1213
1214 typedef ::grpc::ClientUnaryReactor ClientUnaryReactor;
1215
1216 }  // namespace experimental
1217
1218 }  // namespace grpc
1219 #endif  // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H