Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / async_stream.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
19 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
20
21 // IWYU pragma: private, include <grpcpp/support/async_stream.h>
22
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/channel_interface.h>
25 #include <grpcpp/impl/codegen/core_codegen_interface.h>
26 #include <grpcpp/impl/codegen/server_context.h>
27 #include <grpcpp/impl/codegen/service_type.h>
28 #include <grpcpp/impl/codegen/status.h>
29
30 namespace grpc {
31
32 namespace internal {
33 /// Common interface for all client side asynchronous streaming.
34 class ClientAsyncStreamingInterface {
35  public:
36   virtual ~ClientAsyncStreamingInterface() {}
37
38   /// Start the call that was set up by the constructor, but only if the
39   /// constructor was invoked through the "Prepare" API which doesn't actually
40   /// start the call
41   virtual void StartCall(void* tag) = 0;
42
43   /// Request notification of the reading of the initial metadata. Completion
44   /// will be notified by \a tag on the associated completion queue.
45   /// This call is optional, but if it is used, it cannot be used concurrently
46   /// with or after the \a AsyncReaderInterface::Read method.
47   ///
48   /// \param[in] tag Tag identifying this request.
49   virtual void ReadInitialMetadata(void* tag) = 0;
50
51   /// Indicate that the stream is to be finished and request notification for
52   /// when the call has been ended.
53   /// Should not be used concurrently with other operations.
54   ///
55   /// It is appropriate to call this method exactly once when both:
56   ///   * the client side has no more message to send
57   ///     (this can be declared implicitly by calling this method, or
58   ///     explicitly through an earlier call to the <i>WritesDone</i> method
59   ///     of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or
60   ///     \a ClientAsyncReaderWriterInterface::WritesDone).
61   ///   * there are no more messages to be received from the server (this can
62   ///     be known implicitly by the calling code, or explicitly from an
63   ///     earlier call to \a AsyncReaderInterface::Read that yielded a failed
64   ///     result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
65   ///
66   /// The tag will be returned when either:
67   /// - all incoming messages have been read and the server has returned
68   ///   a status.
69   /// - the server has returned a non-OK status.
70   /// - the call failed for some reason and the library generated a
71   ///   status.
72   ///
73   /// Note that implementations of this method attempt to receive initial
74   /// metadata from the server if initial metadata hasn't yet been received.
75   ///
76   /// \param[in] tag Tag identifying this request.
77   /// \param[out] status To be updated with the operation status.
78   virtual void Finish(::grpc::Status* status, void* tag) = 0;
79 };
80
81 /// An interface that yields a sequence of messages of type \a R.
82 template <class R>
83 class AsyncReaderInterface {
84  public:
85   virtual ~AsyncReaderInterface() {}
86
87   /// Read a message of type \a R into \a msg. Completion will be notified by \a
88   /// tag on the associated completion queue.
89   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
90   /// should not be called concurrently with other streaming APIs
91   /// on the same stream. It is not meaningful to call it concurrently
92   /// with another \a AsyncReaderInterface::Read on the same stream since reads
93   /// on the same stream are delivered in order.
94   ///
95   /// \param[out] msg Where to eventually store the read message.
96   /// \param[in] tag The tag identifying the operation.
97   ///
98   /// Side effect: note that this method attempt to receive initial metadata for
99   /// a stream if it hasn't yet been received.
100   virtual void Read(R* msg, void* tag) = 0;
101 };
102
103 /// An interface that can be fed a sequence of messages of type \a W.
104 template <class W>
105 class AsyncWriterInterface {
106  public:
107   virtual ~AsyncWriterInterface() {}
108
109   /// Request the writing of \a msg with identifying tag \a tag.
110   ///
111   /// Only one write may be outstanding at any given time. This means that
112   /// after calling Write, one must wait to receive \a tag from the completion
113   /// queue BEFORE calling Write again.
114   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
115   ///
116   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
117   /// to deallocate once Write returns.
118   ///
119   /// \param[in] msg The message to be written.
120   /// \param[in] tag The tag identifying the operation.
121   virtual void Write(const W& msg, void* tag) = 0;
122
123   /// Request the writing of \a msg using WriteOptions \a options with
124   /// identifying tag \a tag.
125   ///
126   /// Only one write may be outstanding at any given time. This means that
127   /// after calling Write, one must wait to receive \a tag from the completion
128   /// queue BEFORE calling Write again.
129   /// WriteOptions \a options is used to set the write options of this message.
130   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
131   ///
132   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
133   /// to deallocate once Write returns.
134   ///
135   /// \param[in] msg The message to be written.
136   /// \param[in] options The WriteOptions to be used to write this message.
137   /// \param[in] tag The tag identifying the operation.
138   virtual void Write(const W& msg, ::grpc::WriteOptions options, void* tag) = 0;
139
140   /// Request the writing of \a msg and coalesce it with the writing
141   /// of trailing metadata, using WriteOptions \a options with
142   /// identifying tag \a tag.
143   ///
144   /// For client, WriteLast is equivalent of performing Write and
145   /// WritesDone in a single step.
146   /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
147   /// until Finish is called, where \a msg and trailing metadata are coalesced
148   /// and write is initiated. Note that WriteLast can only buffer \a msg up to
149   /// the flow control window size. If \a msg size is larger than the window
150   /// size, it will be sent on wire without buffering.
151   ///
152   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
153   /// to deallocate once Write returns.
154   ///
155   /// \param[in] msg The message to be written.
156   /// \param[in] options The WriteOptions to be used to write this message.
157   /// \param[in] tag The tag identifying the operation.
158   void WriteLast(const W& msg, ::grpc::WriteOptions options, void* tag) {
159     Write(msg, options.set_last_message(), tag);
160   }
161 };
162
163 }  // namespace internal
164
165 template <class R>
166 class ClientAsyncReaderInterface
167     : public internal::ClientAsyncStreamingInterface,
168       public internal::AsyncReaderInterface<R> {};
169
170 namespace internal {
171 template <class R>
172 class ClientAsyncReaderFactory {
173  public:
174   /// Create a stream object.
175   /// Write the first request out if \a start is set.
176   /// \a tag will be notified on \a cq when the call has been started and
177   /// \a request has been written out. If \a start is not set, \a tag must be
178   /// nullptr and the actual call must be initiated by StartCall
179   /// Note that \a context will be used to fill in custom initial metadata
180   /// used to send to the server when starting the call.
181   template <class W>
182   static ClientAsyncReader<R>* Create(::grpc::ChannelInterface* channel,
183                                       ::grpc::CompletionQueue* cq,
184                                       const ::grpc::internal::RpcMethod& method,
185                                       ::grpc::ClientContext* context,
186                                       const W& request, bool start, void* tag) {
187     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
188     return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
189         call.call(), sizeof(ClientAsyncReader<R>)))
190         ClientAsyncReader<R>(call, context, request, start, tag);
191   }
192 };
193 }  // namespace internal
194
195 /// Async client-side API for doing server-streaming RPCs,
196 /// where the incoming message stream coming from the server has
197 /// messages of type \a R.
198 template <class R>
199 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
200  public:
201   // always allocated against a call arena, no memory free required
202   static void operator delete(void* /*ptr*/, std::size_t size) {
203     GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader));
204   }
205
206   // This operator should never be called as the memory should be freed as part
207   // of the arena destruction. It only exists to provide a matching operator
208   // delete to the operator new so that some compilers will not complain (see
209   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
210   // there are no tests catching the compiler warning.
211   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
212
213   void StartCall(void* tag) override {
214     GPR_CODEGEN_ASSERT(!started_);
215     started_ = true;
216     StartCallInternal(tag);
217   }
218
219   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
220   /// method for semantics.
221   ///
222   /// Side effect:
223   ///   - upon receiving initial metadata from the server,
224   ///     the \a ClientContext associated with this call is updated, and the
225   ///     calling code can access the received metadata through the
226   ///     \a ClientContext.
227   void ReadInitialMetadata(void* tag) override {
228     GPR_CODEGEN_ASSERT(started_);
229     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
230
231     meta_ops_.set_output_tag(tag);
232     meta_ops_.RecvInitialMetadata(context_);
233     call_.PerformOps(&meta_ops_);
234   }
235
236   void Read(R* msg, void* tag) override {
237     GPR_CODEGEN_ASSERT(started_);
238     read_ops_.set_output_tag(tag);
239     if (!context_->initial_metadata_received_) {
240       read_ops_.RecvInitialMetadata(context_);
241     }
242     read_ops_.RecvMessage(msg);
243     call_.PerformOps(&read_ops_);
244   }
245
246   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
247   ///
248   /// Side effect:
249   ///   - the \a ClientContext associated with this call is updated with
250   ///     possible initial and trailing metadata received from the server.
251   void Finish(::grpc::Status* status, void* tag) override {
252     GPR_CODEGEN_ASSERT(started_);
253     finish_ops_.set_output_tag(tag);
254     if (!context_->initial_metadata_received_) {
255       finish_ops_.RecvInitialMetadata(context_);
256     }
257     finish_ops_.ClientRecvStatus(context_, status);
258     call_.PerformOps(&finish_ops_);
259   }
260
261  private:
262   friend class internal::ClientAsyncReaderFactory<R>;
263   template <class W>
264   ClientAsyncReader(::grpc::internal::Call call, ::grpc::ClientContext* context,
265                     const W& request, bool start, void* tag)
266       : context_(context), call_(call), started_(start) {
267     // TODO(ctiller): don't assert
268     GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
269     init_ops_.ClientSendClose();
270     if (start) {
271       StartCallInternal(tag);
272     } else {
273       GPR_CODEGEN_ASSERT(tag == nullptr);
274     }
275   }
276
277   void StartCallInternal(void* tag) {
278     init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
279                                   context_->initial_metadata_flags());
280     init_ops_.set_output_tag(tag);
281     call_.PerformOps(&init_ops_);
282   }
283
284   ::grpc::ClientContext* context_;
285   ::grpc::internal::Call call_;
286   bool started_;
287   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
288                               ::grpc::internal::CallOpSendMessage,
289                               ::grpc::internal::CallOpClientSendClose>
290       init_ops_;
291   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
292       meta_ops_;
293   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
294                               ::grpc::internal::CallOpRecvMessage<R>>
295       read_ops_;
296   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
297                               ::grpc::internal::CallOpClientRecvStatus>
298       finish_ops_;
299 };
300
301 /// Common interface for client side asynchronous writing.
302 template <class W>
303 class ClientAsyncWriterInterface
304     : public internal::ClientAsyncStreamingInterface,
305       public internal::AsyncWriterInterface<W> {
306  public:
307   /// Signal the client is done with the writes (half-close the client stream).
308   /// Thread-safe with respect to \a AsyncReaderInterface::Read
309   ///
310   /// \param[in] tag The tag identifying the operation.
311   virtual void WritesDone(void* tag) = 0;
312 };
313
314 namespace internal {
315 template <class W>
316 class ClientAsyncWriterFactory {
317  public:
318   /// Create a stream object.
319   /// Start the RPC if \a start is set
320   /// \a tag will be notified on \a cq when the call has been started (i.e.
321   /// intitial metadata sent) and \a request has been written out.
322   /// If \a start is not set, \a tag must be nullptr and the actual call
323   /// must be initiated by StartCall
324   /// Note that \a context will be used to fill in custom initial metadata
325   /// used to send to the server when starting the call.
326   /// \a response will be filled in with the single expected response
327   /// message from the server upon a successful call to the \a Finish
328   /// method of this instance.
329   template <class R>
330   static ClientAsyncWriter<W>* Create(::grpc::ChannelInterface* channel,
331                                       ::grpc::CompletionQueue* cq,
332                                       const ::grpc::internal::RpcMethod& method,
333                                       ::grpc::ClientContext* context,
334                                       R* response, bool start, void* tag) {
335     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
336     return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
337         call.call(), sizeof(ClientAsyncWriter<W>)))
338         ClientAsyncWriter<W>(call, context, response, start, tag);
339   }
340 };
341 }  // namespace internal
342
343 /// Async API on the client side for doing client-streaming RPCs,
344 /// where the outgoing message stream going to the server contains
345 /// messages of type \a W.
346 template <class W>
347 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
348  public:
349   // always allocated against a call arena, no memory free required
350   static void operator delete(void* /*ptr*/, std::size_t size) {
351     GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter));
352   }
353
354   // This operator should never be called as the memory should be freed as part
355   // of the arena destruction. It only exists to provide a matching operator
356   // delete to the operator new so that some compilers will not complain (see
357   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
358   // there are no tests catching the compiler warning.
359   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
360
361   void StartCall(void* tag) override {
362     GPR_CODEGEN_ASSERT(!started_);
363     started_ = true;
364     StartCallInternal(tag);
365   }
366
367   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
368   /// semantics.
369   ///
370   /// Side effect:
371   ///   - upon receiving initial metadata from the server, the \a ClientContext
372   ///     associated with this call is updated, and the calling code can access
373   ///     the received metadata through the \a ClientContext.
374   void ReadInitialMetadata(void* tag) override {
375     GPR_CODEGEN_ASSERT(started_);
376     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
377
378     meta_ops_.set_output_tag(tag);
379     meta_ops_.RecvInitialMetadata(context_);
380     call_.PerformOps(&meta_ops_);
381   }
382
383   void Write(const W& msg, void* tag) override {
384     GPR_CODEGEN_ASSERT(started_);
385     write_ops_.set_output_tag(tag);
386     // TODO(ctiller): don't assert
387     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
388     call_.PerformOps(&write_ops_);
389   }
390
391   void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
392     GPR_CODEGEN_ASSERT(started_);
393     write_ops_.set_output_tag(tag);
394     if (options.is_last_message()) {
395       options.set_buffer_hint();
396       write_ops_.ClientSendClose();
397     }
398     // TODO(ctiller): don't assert
399     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
400     call_.PerformOps(&write_ops_);
401   }
402
403   void WritesDone(void* tag) override {
404     GPR_CODEGEN_ASSERT(started_);
405     write_ops_.set_output_tag(tag);
406     write_ops_.ClientSendClose();
407     call_.PerformOps(&write_ops_);
408   }
409
410   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
411   ///
412   /// Side effect:
413   ///   - the \a ClientContext associated with this call is updated with
414   ///     possible initial and trailing metadata received from the server.
415   ///   - attempts to fill in the \a response parameter passed to this class's
416   ///     constructor with the server's response message.
417   void Finish(::grpc::Status* status, void* tag) override {
418     GPR_CODEGEN_ASSERT(started_);
419     finish_ops_.set_output_tag(tag);
420     if (!context_->initial_metadata_received_) {
421       finish_ops_.RecvInitialMetadata(context_);
422     }
423     finish_ops_.ClientRecvStatus(context_, status);
424     call_.PerformOps(&finish_ops_);
425   }
426
427  private:
428   friend class internal::ClientAsyncWriterFactory<W>;
429   template <class R>
430   ClientAsyncWriter(::grpc::internal::Call call, ::grpc::ClientContext* context,
431                     R* response, bool start, void* tag)
432       : context_(context), call_(call), started_(start) {
433     finish_ops_.RecvMessage(response);
434     finish_ops_.AllowNoMessage();
435     if (start) {
436       StartCallInternal(tag);
437     } else {
438       GPR_CODEGEN_ASSERT(tag == nullptr);
439     }
440   }
441
442   void StartCallInternal(void* tag) {
443     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
444                                    context_->initial_metadata_flags());
445     // if corked bit is set in context, we just keep the initial metadata
446     // buffered up to coalesce with later message send. No op is performed.
447     if (!context_->initial_metadata_corked_) {
448       write_ops_.set_output_tag(tag);
449       call_.PerformOps(&write_ops_);
450     }
451   }
452
453   ::grpc::ClientContext* context_;
454   ::grpc::internal::Call call_;
455   bool started_;
456   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
457       meta_ops_;
458   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
459                               ::grpc::internal::CallOpSendMessage,
460                               ::grpc::internal::CallOpClientSendClose>
461       write_ops_;
462   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
463                               ::grpc::internal::CallOpGenericRecvMessage,
464                               ::grpc::internal::CallOpClientRecvStatus>
465       finish_ops_;
466 };
467
468 /// Async client-side interface for bi-directional streaming,
469 /// where the client-to-server message stream has messages of type \a W,
470 /// and the server-to-client message stream has messages of type \a R.
471 template <class W, class R>
472 class ClientAsyncReaderWriterInterface
473     : public internal::ClientAsyncStreamingInterface,
474       public internal::AsyncWriterInterface<W>,
475       public internal::AsyncReaderInterface<R> {
476  public:
477   /// Signal the client is done with the writes (half-close the client stream).
478   /// Thread-safe with respect to \a AsyncReaderInterface::Read
479   ///
480   /// \param[in] tag The tag identifying the operation.
481   virtual void WritesDone(void* tag) = 0;
482 };
483
484 namespace internal {
485 template <class W, class R>
486 class ClientAsyncReaderWriterFactory {
487  public:
488   /// Create a stream object.
489   /// Start the RPC request if \a start is set.
490   /// \a tag will be notified on \a cq when the call has been started (i.e.
491   /// intitial metadata sent). If \a start is not set, \a tag must be
492   /// nullptr and the actual call must be initiated by StartCall
493   /// Note that \a context will be used to fill in custom initial metadata
494   /// used to send to the server when starting the call.
495   static ClientAsyncReaderWriter<W, R>* Create(
496       ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq,
497       const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
498       bool start, void* tag) {
499     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
500
501     return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
502         call.call(), sizeof(ClientAsyncReaderWriter<W, R>)))
503         ClientAsyncReaderWriter<W, R>(call, context, start, tag);
504   }
505 };
506 }  // namespace internal
507
508 /// Async client-side interface for bi-directional streaming,
509 /// where the outgoing message stream going to the server
510 /// has messages of type \a W,  and the incoming message stream coming
511 /// from the server has messages of type \a R.
512 template <class W, class R>
513 class ClientAsyncReaderWriter final
514     : public ClientAsyncReaderWriterInterface<W, R> {
515  public:
516   // always allocated against a call arena, no memory free required
517   static void operator delete(void* /*ptr*/, std::size_t size) {
518     GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter));
519   }
520
521   // This operator should never be called as the memory should be freed as part
522   // of the arena destruction. It only exists to provide a matching operator
523   // delete to the operator new so that some compilers will not complain (see
524   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
525   // there are no tests catching the compiler warning.
526   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
527
528   void StartCall(void* tag) override {
529     GPR_CODEGEN_ASSERT(!started_);
530     started_ = true;
531     StartCallInternal(tag);
532   }
533
534   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
535   /// for semantics of this method.
536   ///
537   /// Side effect:
538   ///   - upon receiving initial metadata from the server, the \a ClientContext
539   ///     is updated with it, and then the receiving initial metadata can
540   ///     be accessed through this \a ClientContext.
541   void ReadInitialMetadata(void* tag) override {
542     GPR_CODEGEN_ASSERT(started_);
543     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
544
545     meta_ops_.set_output_tag(tag);
546     meta_ops_.RecvInitialMetadata(context_);
547     call_.PerformOps(&meta_ops_);
548   }
549
550   void Read(R* msg, void* tag) override {
551     GPR_CODEGEN_ASSERT(started_);
552     read_ops_.set_output_tag(tag);
553     if (!context_->initial_metadata_received_) {
554       read_ops_.RecvInitialMetadata(context_);
555     }
556     read_ops_.RecvMessage(msg);
557     call_.PerformOps(&read_ops_);
558   }
559
560   void Write(const W& msg, void* tag) override {
561     GPR_CODEGEN_ASSERT(started_);
562     write_ops_.set_output_tag(tag);
563     // TODO(ctiller): don't assert
564     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
565     call_.PerformOps(&write_ops_);
566   }
567
568   void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
569     GPR_CODEGEN_ASSERT(started_);
570     write_ops_.set_output_tag(tag);
571     if (options.is_last_message()) {
572       options.set_buffer_hint();
573       write_ops_.ClientSendClose();
574     }
575     // TODO(ctiller): don't assert
576     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
577     call_.PerformOps(&write_ops_);
578   }
579
580   void WritesDone(void* tag) override {
581     GPR_CODEGEN_ASSERT(started_);
582     write_ops_.set_output_tag(tag);
583     write_ops_.ClientSendClose();
584     call_.PerformOps(&write_ops_);
585   }
586
587   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
588   /// Side effect
589   ///   - the \a ClientContext associated with this call is updated with
590   ///     possible initial and trailing metadata sent from the server.
591   void Finish(::grpc::Status* status, void* tag) override {
592     GPR_CODEGEN_ASSERT(started_);
593     finish_ops_.set_output_tag(tag);
594     if (!context_->initial_metadata_received_) {
595       finish_ops_.RecvInitialMetadata(context_);
596     }
597     finish_ops_.ClientRecvStatus(context_, status);
598     call_.PerformOps(&finish_ops_);
599   }
600
601  private:
602   friend class internal::ClientAsyncReaderWriterFactory<W, R>;
603   ClientAsyncReaderWriter(::grpc::internal::Call call,
604                           ::grpc::ClientContext* context, bool start, void* tag)
605       : context_(context), call_(call), started_(start) {
606     if (start) {
607       StartCallInternal(tag);
608     } else {
609       GPR_CODEGEN_ASSERT(tag == nullptr);
610     }
611   }
612
613   void StartCallInternal(void* tag) {
614     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
615                                    context_->initial_metadata_flags());
616     // if corked bit is set in context, we just keep the initial metadata
617     // buffered up to coalesce with later message send. No op is performed.
618     if (!context_->initial_metadata_corked_) {
619       write_ops_.set_output_tag(tag);
620       call_.PerformOps(&write_ops_);
621     }
622   }
623
624   ::grpc::ClientContext* context_;
625   ::grpc::internal::Call call_;
626   bool started_;
627   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
628       meta_ops_;
629   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
630                               ::grpc::internal::CallOpRecvMessage<R>>
631       read_ops_;
632   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
633                               ::grpc::internal::CallOpSendMessage,
634                               ::grpc::internal::CallOpClientSendClose>
635       write_ops_;
636   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
637                               ::grpc::internal::CallOpClientRecvStatus>
638       finish_ops_;
639 };
640
641 template <class W, class R>
642 class ServerAsyncReaderInterface
643     : public ::grpc::internal::ServerAsyncStreamingInterface,
644       public internal::AsyncReaderInterface<R> {
645  public:
646   /// Indicate that the stream is to be finished with a certain status code
647   /// and also send out \a msg response to the client.
648   /// Request notification for when the server has sent the response and the
649   /// appropriate signals to the client to end the call.
650   /// Should not be used concurrently with other operations.
651   ///
652   /// It is appropriate to call this method when:
653   ///   * all messages from the client have been received (either known
654   ///     implictly, or explicitly because a previous
655   ///     \a AsyncReaderInterface::Read operation with a non-ok result,
656   ///     e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
657   ///
658   /// This operation will end when the server has finished sending out initial
659   /// metadata (if not sent already), response message, and status, or if
660   /// some failure occurred when trying to do so.
661   ///
662   /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it
663   /// is safe to deallocate once Finish returns.
664   ///
665   /// \param[in] tag Tag identifying this request.
666   /// \param[in] status To be sent to the client as the result of this call.
667   /// \param[in] msg To be sent to the client as the response for this call.
668   virtual void Finish(const W& msg, const ::grpc::Status& status,
669                       void* tag) = 0;
670
671   /// Indicate that the stream is to be finished with a certain
672   /// non-OK status code.
673   /// Request notification for when the server has sent the appropriate
674   /// signals to the client to end the call.
675   /// Should not be used concurrently with other operations.
676   ///
677   /// This call is meant to end the call with some error, and can be called at
678   /// any point that the server would like to "fail" the call (though note
679   /// this shouldn't be called concurrently with any other "sending" call, like
680   /// \a AsyncWriterInterface::Write).
681   ///
682   /// This operation will end when the server has finished sending out initial
683   /// metadata (if not sent already), and status, or if some failure occurred
684   /// when trying to do so.
685   ///
686   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
687   /// to deallocate once FinishWithError returns.
688   ///
689   /// \param[in] tag Tag identifying this request.
690   /// \param[in] status To be sent to the client as the result of this call.
691   ///     - Note: \a status must have a non-OK code.
692   virtual void FinishWithError(const ::grpc::Status& status, void* tag) = 0;
693 };
694
695 /// Async server-side API for doing client-streaming RPCs,
696 /// where the incoming message stream from the client has messages of type \a R,
697 /// and the single response message sent from the server is type \a W.
698 template <class W, class R>
699 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
700  public:
701   explicit ServerAsyncReader(::grpc::ServerContext* ctx)
702       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
703
704   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
705   ///
706   /// Implicit input parameter:
707   ///   - The initial metadata that will be sent to the client from this op will
708   ///     be taken from the \a ServerContext associated with the call.
709   void SendInitialMetadata(void* tag) override {
710     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
711
712     meta_ops_.set_output_tag(tag);
713     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
714                                   ctx_->initial_metadata_flags());
715     if (ctx_->compression_level_set()) {
716       meta_ops_.set_compression_level(ctx_->compression_level());
717     }
718     ctx_->sent_initial_metadata_ = true;
719     call_.PerformOps(&meta_ops_);
720   }
721
722   void Read(R* msg, void* tag) override {
723     read_ops_.set_output_tag(tag);
724     read_ops_.RecvMessage(msg);
725     call_.PerformOps(&read_ops_);
726   }
727
728   /// See the \a ServerAsyncReaderInterface.Read method for semantics
729   ///
730   /// Side effect:
731   ///   - also sends initial metadata if not alreay sent.
732   ///   - uses the \a ServerContext associated with this call to send possible
733   ///     initial and trailing metadata.
734   ///
735   /// Note: \a msg is not sent if \a status has a non-OK code.
736   ///
737   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
738   /// is safe to deallocate once Finish returns.
739   void Finish(const W& msg, const ::grpc::Status& status, void* tag) override {
740     finish_ops_.set_output_tag(tag);
741     if (!ctx_->sent_initial_metadata_) {
742       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743                                       ctx_->initial_metadata_flags());
744       if (ctx_->compression_level_set()) {
745         finish_ops_.set_compression_level(ctx_->compression_level());
746       }
747       ctx_->sent_initial_metadata_ = true;
748     }
749     // The response is dropped if the status is not OK.
750     if (status.ok()) {
751       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
752                                    finish_ops_.SendMessage(msg));
753     } else {
754       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
755     }
756     call_.PerformOps(&finish_ops_);
757   }
758
759   /// See the \a ServerAsyncReaderInterface.Read method for semantics
760   ///
761   /// Side effect:
762   ///   - also sends initial metadata if not alreay sent.
763   ///   - uses the \a ServerContext associated with this call to send possible
764   ///     initial and trailing metadata.
765   ///
766   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
767   /// to deallocate once FinishWithError returns.
768   void FinishWithError(const ::grpc::Status& status, void* tag) override {
769     GPR_CODEGEN_ASSERT(!status.ok());
770     finish_ops_.set_output_tag(tag);
771     if (!ctx_->sent_initial_metadata_) {
772       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773                                       ctx_->initial_metadata_flags());
774       if (ctx_->compression_level_set()) {
775         finish_ops_.set_compression_level(ctx_->compression_level());
776       }
777       ctx_->sent_initial_metadata_ = true;
778     }
779     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
780     call_.PerformOps(&finish_ops_);
781   }
782
783  private:
784   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
785
786   ::grpc::internal::Call call_;
787   ::grpc::ServerContext* ctx_;
788   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
789       meta_ops_;
790   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
791   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
792                               ::grpc::internal::CallOpSendMessage,
793                               ::grpc::internal::CallOpServerSendStatus>
794       finish_ops_;
795 };
796
797 template <class W>
798 class ServerAsyncWriterInterface
799     : public ::grpc::internal::ServerAsyncStreamingInterface,
800       public internal::AsyncWriterInterface<W> {
801  public:
802   /// Indicate that the stream is to be finished with a certain status code.
803   /// Request notification for when the server has sent the appropriate
804   /// signals to the client to end the call.
805   /// Should not be used concurrently with other operations.
806   ///
807   /// It is appropriate to call this method when either:
808   ///   * all messages from the client have been received (either known
809   ///     implictly, or explicitly because a previous \a
810   ///     AsyncReaderInterface::Read operation with a non-ok
811   ///     result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
812   ///   * it is desired to end the call early with some non-OK status code.
813   ///
814   /// This operation will end when the server has finished sending out initial
815   /// metadata (if not sent already), response message, and status, or if
816   /// some failure occurred when trying to do so.
817   ///
818   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
819   /// to deallocate once Finish returns.
820   ///
821   /// \param[in] tag Tag identifying this request.
822   /// \param[in] status To be sent to the client as the result of this call.
823   virtual void Finish(const ::grpc::Status& status, void* tag) = 0;
824
825   /// Request the writing of \a msg and coalesce it with trailing metadata which
826   /// contains \a status, using WriteOptions options with
827   /// identifying tag \a tag.
828   ///
829   /// WriteAndFinish is equivalent of performing WriteLast and Finish
830   /// in a single step.
831   ///
832   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
833   /// is safe to deallocate once WriteAndFinish returns.
834   ///
835   /// \param[in] msg The message to be written.
836   /// \param[in] options The WriteOptions to be used to write this message.
837   /// \param[in] status The Status that server returns to client.
838   /// \param[in] tag The tag identifying the operation.
839   virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
840                               const ::grpc::Status& status, void* tag) = 0;
841 };
842
843 /// Async server-side API for doing server streaming RPCs,
844 /// where the outgoing message stream from the server has messages of type \a W.
845 template <class W>
846 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
847  public:
848   explicit ServerAsyncWriter(::grpc::ServerContext* ctx)
849       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
850
851   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
852   ///
853   /// Implicit input parameter:
854   ///   - The initial metadata that will be sent to the client from this op will
855   ///     be taken from the \a ServerContext associated with the call.
856   ///
857   /// \param[in] tag Tag identifying this request.
858   void SendInitialMetadata(void* tag) override {
859     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
860
861     meta_ops_.set_output_tag(tag);
862     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
863                                   ctx_->initial_metadata_flags());
864     if (ctx_->compression_level_set()) {
865       meta_ops_.set_compression_level(ctx_->compression_level());
866     }
867     ctx_->sent_initial_metadata_ = true;
868     call_.PerformOps(&meta_ops_);
869   }
870
871   void Write(const W& msg, void* tag) override {
872     write_ops_.set_output_tag(tag);
873     EnsureInitialMetadataSent(&write_ops_);
874     // TODO(ctiller): don't assert
875     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
876     call_.PerformOps(&write_ops_);
877   }
878
879   void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
880     write_ops_.set_output_tag(tag);
881     if (options.is_last_message()) {
882       options.set_buffer_hint();
883     }
884
885     EnsureInitialMetadataSent(&write_ops_);
886     // TODO(ctiller): don't assert
887     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
888     call_.PerformOps(&write_ops_);
889   }
890
891   /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
892   ///
893   /// Implicit input parameter:
894   ///   - the \a ServerContext associated with this call is used
895   ///     for sending trailing (and initial) metadata to the client.
896   ///
897   /// Note: \a status must have an OK code.
898   ///
899   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
900   /// is safe to deallocate once WriteAndFinish returns.
901   void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
902                       const ::grpc::Status& status, void* tag) override {
903     write_ops_.set_output_tag(tag);
904     EnsureInitialMetadataSent(&write_ops_);
905     options.set_buffer_hint();
906     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
907     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
908     call_.PerformOps(&write_ops_);
909   }
910
911   /// See the \a ServerAsyncWriterInterface.Finish method for semantics.
912   ///
913   /// Implicit input parameter:
914   ///   - the \a ServerContext associated with this call is used for sending
915   ///     trailing (and initial if not already sent) metadata to the client.
916   ///
917   /// Note: there are no restrictions are the code of
918   /// \a status,it may be non-OK
919   ///
920   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
921   /// to deallocate once Finish returns.
922   void Finish(const ::grpc::Status& status, void* tag) override {
923     finish_ops_.set_output_tag(tag);
924     EnsureInitialMetadataSent(&finish_ops_);
925     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
926     call_.PerformOps(&finish_ops_);
927   }
928
929  private:
930   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
931
932   template <class T>
933   void EnsureInitialMetadataSent(T* ops) {
934     if (!ctx_->sent_initial_metadata_) {
935       ops->SendInitialMetadata(&ctx_->initial_metadata_,
936                                ctx_->initial_metadata_flags());
937       if (ctx_->compression_level_set()) {
938         ops->set_compression_level(ctx_->compression_level());
939       }
940       ctx_->sent_initial_metadata_ = true;
941     }
942   }
943
944   ::grpc::internal::Call call_;
945   ::grpc::ServerContext* ctx_;
946   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
947       meta_ops_;
948   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
949                               ::grpc::internal::CallOpSendMessage,
950                               ::grpc::internal::CallOpServerSendStatus>
951       write_ops_;
952   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
953                               ::grpc::internal::CallOpServerSendStatus>
954       finish_ops_;
955 };
956
957 /// Server-side interface for asynchronous bi-directional streaming.
958 template <class W, class R>
959 class ServerAsyncReaderWriterInterface
960     : public ::grpc::internal::ServerAsyncStreamingInterface,
961       public internal::AsyncWriterInterface<W>,
962       public internal::AsyncReaderInterface<R> {
963  public:
964   /// Indicate that the stream is to be finished with a certain status code.
965   /// Request notification for when the server has sent the appropriate
966   /// signals to the client to end the call.
967   /// Should not be used concurrently with other operations.
968   ///
969   /// It is appropriate to call this method when either:
970   ///   * all messages from the client have been received (either known
971   ///     implictly, or explicitly because a previous \a
972   ///     AsyncReaderInterface::Read operation
973   ///     with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
974   ///     with 'false'.
975   ///   * it is desired to end the call early with some non-OK status code.
976   ///
977   /// This operation will end when the server has finished sending out initial
978   /// metadata (if not sent already), response message, and status, or if some
979   /// failure occurred when trying to do so.
980   ///
981   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
982   /// to deallocate once Finish returns.
983   ///
984   /// \param[in] tag Tag identifying this request.
985   /// \param[in] status To be sent to the client as the result of this call.
986   virtual void Finish(const ::grpc::Status& status, void* tag) = 0;
987
988   /// Request the writing of \a msg and coalesce it with trailing metadata which
989   /// contains \a status, using WriteOptions options with
990   /// identifying tag \a tag.
991   ///
992   /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
993   /// single step.
994   ///
995   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
996   /// is safe to deallocate once WriteAndFinish returns.
997   ///
998   /// \param[in] msg The message to be written.
999   /// \param[in] options The WriteOptions to be used to write this message.
1000   /// \param[in] status The Status that server returns to client.
1001   /// \param[in] tag The tag identifying the operation.
1002   virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
1003                               const ::grpc::Status& status, void* tag) = 0;
1004 };
1005
1006 /// Async server-side API for doing bidirectional streaming RPCs,
1007 /// where the incoming message stream coming from the client has messages of
1008 /// type \a R, and the outgoing message stream coming from the server has
1009 /// messages of type \a W.
1010 template <class W, class R>
1011 class ServerAsyncReaderWriter final
1012     : public ServerAsyncReaderWriterInterface<W, R> {
1013  public:
1014   explicit ServerAsyncReaderWriter(::grpc::ServerContext* ctx)
1015       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1016
1017   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
1018   ///
1019   /// Implicit input parameter:
1020   ///   - The initial metadata that will be sent to the client from this op will
1021   ///     be taken from the \a ServerContext associated with the call.
1022   ///
1023   /// \param[in] tag Tag identifying this request.
1024   void SendInitialMetadata(void* tag) override {
1025     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1026
1027     meta_ops_.set_output_tag(tag);
1028     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1029                                   ctx_->initial_metadata_flags());
1030     if (ctx_->compression_level_set()) {
1031       meta_ops_.set_compression_level(ctx_->compression_level());
1032     }
1033     ctx_->sent_initial_metadata_ = true;
1034     call_.PerformOps(&meta_ops_);
1035   }
1036
1037   void Read(R* msg, void* tag) override {
1038     read_ops_.set_output_tag(tag);
1039     read_ops_.RecvMessage(msg);
1040     call_.PerformOps(&read_ops_);
1041   }
1042
1043   void Write(const W& msg, void* tag) override {
1044     write_ops_.set_output_tag(tag);
1045     EnsureInitialMetadataSent(&write_ops_);
1046     // TODO(ctiller): don't assert
1047     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
1048     call_.PerformOps(&write_ops_);
1049   }
1050
1051   void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
1052     write_ops_.set_output_tag(tag);
1053     if (options.is_last_message()) {
1054       options.set_buffer_hint();
1055     }
1056     EnsureInitialMetadataSent(&write_ops_);
1057     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
1058     call_.PerformOps(&write_ops_);
1059   }
1060
1061   /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
1062   /// method for semantics.
1063   ///
1064   /// Implicit input parameter:
1065   ///   - the \a ServerContext associated with this call is used
1066   ///     for sending trailing (and initial) metadata to the client.
1067   ///
1068   /// Note: \a status must have an OK code.
1069   //
1070   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1071   /// is safe to deallocate once WriteAndFinish returns.
1072   void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
1073                       const ::grpc::Status& status, void* tag) override {
1074     write_ops_.set_output_tag(tag);
1075     EnsureInitialMetadataSent(&write_ops_);
1076     options.set_buffer_hint();
1077     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
1078     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1079     call_.PerformOps(&write_ops_);
1080   }
1081
1082   /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics.
1083   ///
1084   /// Implicit input parameter:
1085   ///   - the \a ServerContext associated with this call is used for sending
1086   ///     trailing (and initial if not already sent) metadata to the client.
1087   ///
1088   /// Note: there are no restrictions are the code of \a status,
1089   /// it may be non-OK
1090   //
1091   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
1092   /// to deallocate once Finish returns.
1093   void Finish(const ::grpc::Status& status, void* tag) override {
1094     finish_ops_.set_output_tag(tag);
1095     EnsureInitialMetadataSent(&finish_ops_);
1096
1097     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1098     call_.PerformOps(&finish_ops_);
1099   }
1100
1101  private:
1102   friend class ::grpc::Server;
1103
1104   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
1105
1106   template <class T>
1107   void EnsureInitialMetadataSent(T* ops) {
1108     if (!ctx_->sent_initial_metadata_) {
1109       ops->SendInitialMetadata(&ctx_->initial_metadata_,
1110                                ctx_->initial_metadata_flags());
1111       if (ctx_->compression_level_set()) {
1112         ops->set_compression_level(ctx_->compression_level());
1113       }
1114       ctx_->sent_initial_metadata_ = true;
1115     }
1116   }
1117
1118   ::grpc::internal::Call call_;
1119   ::grpc::ServerContext* ctx_;
1120   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
1121       meta_ops_;
1122   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
1123   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1124                               ::grpc::internal::CallOpSendMessage,
1125                               ::grpc::internal::CallOpServerSendStatus>
1126       write_ops_;
1127   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1128                               ::grpc::internal::CallOpServerSendStatus>
1129       finish_ops_;
1130 };
1131
1132 }  // namespace grpc
1133 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H