3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
19 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
21 // IWYU pragma: private, include <grpcpp/support/async_stream.h>
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/channel_interface.h>
25 #include <grpcpp/impl/codegen/core_codegen_interface.h>
26 #include <grpcpp/impl/codegen/server_context.h>
27 #include <grpcpp/impl/codegen/service_type.h>
28 #include <grpcpp/impl/codegen/status.h>
33 /// Common interface for all client-side asynchronous streaming.
34 class ClientAsyncStreamingInterface {
36 virtual ~ClientAsyncStreamingInterface() {}
38 /// Start the call that was set up by the constructor, but only if the
39 /// constructor was invoked through the "Prepare" API which doesn't actually
41 virtual void StartCall(void* tag) = 0;
43 /// Request notification of the reading of the initial metadata. Completion
44 /// will be notified by \a tag on the associated completion queue.
45 /// This call is optional, but if it is used, it cannot be used concurrently
46 /// with or after the \a AsyncReaderInterface::Read method.
48 /// \param[in] tag Tag identifying this request.
49 virtual void ReadInitialMetadata(void* tag) = 0;
51 /// Indicate that the stream is to be finished and request notification for
52 /// when the call has been ended.
53 /// Should not be used concurrently with other operations.
55 /// It is appropriate to call this method exactly once when both:
56 /// * the client side has no more messages to send
57 /// (this can be declared implicitly by calling this method, or
58 /// explicitly through an earlier call to the <i>WritesDone</i> method
59 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or
60 /// \a ClientAsyncReaderWriterInterface::WritesDone).
61 /// * there are no more messages to be received from the server (this can
62 /// be known implicitly by the calling code, or explicitly from an
63 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed
64 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
66 /// The tag will be returned when either:
67 /// - all incoming messages have been read and the server has returned
69 /// - the server has returned a non-OK status.
70 /// - the call failed for some reason and the library generated a
73 /// Note that implementations of this method attempt to receive initial
74 /// metadata from the server if initial metadata hasn't yet been received.
76 /// \param[in] tag Tag identifying this request.
77 /// \param[out] status To be updated with the operation status.
78 virtual void Finish(::grpc::Status* status, void* tag) = 0;
81 /// An interface that yields a sequence of messages of type \a R.
83 class AsyncReaderInterface {
85 virtual ~AsyncReaderInterface() {}
87 /// Read a message of type \a R into \a msg. Completion will be notified by \a
88 /// tag on the associated completion queue.
89 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
90 /// should not be called concurrently with other streaming APIs
91 /// on the same stream. It is not meaningful to call it concurrently
92 /// with another \a AsyncReaderInterface::Read on the same stream since reads
93 /// on the same stream are delivered in order.
95 /// \param[out] msg Where to eventually store the read message.
96 /// \param[in] tag The tag identifying the operation.
98 /// Side effect: note that this method attempts to receive initial metadata
99 /// for a stream if it hasn't yet been received.
100 virtual void Read(R* msg, void* tag) = 0;
103 /// An interface that can be fed a sequence of messages of type \a W.
105 class AsyncWriterInterface {
107 virtual ~AsyncWriterInterface() {}
109 /// Request the writing of \a msg with identifying tag \a tag.
111 /// Only one write may be outstanding at any given time. This means that
112 /// after calling Write, one must wait to receive \a tag from the completion
113 /// queue BEFORE calling Write again.
114 /// This is thread-safe with respect to \a AsyncReaderInterface::Read
116 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
117 /// deallocate once Write returns.
119 /// \param[in] msg The message to be written.
120 /// \param[in] tag The tag identifying the operation.
121 virtual void Write(const W& msg, void* tag) = 0;
123 /// Request the writing of \a msg using WriteOptions \a options with
124 /// identifying tag \a tag.
126 /// Only one write may be outstanding at any given time. This means that
127 /// after calling Write, one must wait to receive \a tag from the completion
128 /// queue BEFORE calling Write again.
129 /// WriteOptions \a options is used to set the write options of this message.
130 /// This is thread-safe with respect to \a AsyncReaderInterface::Read
132 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
133 /// deallocate once Write returns.
135 /// \param[in] msg The message to be written.
136 /// \param[in] options The WriteOptions to be used to write this message.
137 /// \param[in] tag The tag identifying the operation.
138 virtual void Write(const W& msg, ::grpc::WriteOptions options, void* tag) = 0;
140 /// Request the writing of \a msg and coalesce it with the writing
141 /// of trailing metadata, using WriteOptions \a options with
142 /// identifying tag \a tag.
144 /// For client, WriteLast is equivalent to performing Write and
145 /// WritesDone in a single step.
146 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
147 /// until Finish is called, where \a msg and trailing metadata are coalesced
148 /// and write is initiated. Note that WriteLast can only buffer \a msg up to
149 /// the flow control window size. If \a msg size is larger than the window
150 /// size, it will be sent on wire without buffering.
152 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
153 /// deallocate once WriteLast returns.
155 /// \param[in] msg The message to be written.
156 /// \param[in] options The WriteOptions to be used to write this message.
157 /// \param[in] tag The tag identifying the operation.
158 void WriteLast(const W& msg, ::grpc::WriteOptions options, void* tag) {
// Delegates to the three-argument Write after marking \a options as carrying
// the last message.
159 Write(msg, options.set_last_message(), tag);
163 } // namespace internal
/// Client-side reader interface: combines \a ClientAsyncStreamingInterface
/// with \a AsyncReaderInterface for messages of type \a R.
166 class ClientAsyncReaderInterface
167 : public internal::ClientAsyncStreamingInterface,
168 public internal::AsyncReaderInterface<R> {};
/// Factory that creates \a ClientAsyncReader stream objects.
172 class ClientAsyncReaderFactory {
174 /// Create a stream object.
175 /// Write the first request out if \a start is set.
176 /// \a tag will be notified on \a cq when the call has been started and
177 /// \a request has been written out. If \a start is not set, \a tag must be
178 /// nullptr and the actual call must be initiated by StartCall.
179 /// Note that \a context will be used to fill in custom initial metadata
180 /// used to send to the server when starting the call.
182 static ClientAsyncReader<R>* Create(::grpc::ChannelInterface* channel,
183 ::grpc::CompletionQueue* cq,
184 const ::grpc::internal::RpcMethod& method,
185 ::grpc::ClientContext* context,
186 const W& request, bool start, void* tag) {
187 ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
// The stream object is constructed with placement new in storage obtained
// from grpc_call_arena_alloc for the call created above.
188 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
189 call.call(), sizeof(ClientAsyncReader<R>)))
190 ClientAsyncReader<R>(call, context, request, start, tag);
193 } // namespace internal
195 /// Async client-side API for doing server-streaming RPCs,
196 /// where the incoming message stream coming from the server has
197 /// messages of type \a R.
199 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
201 // always allocated against a call arena, no memory free required
202 static void operator delete(void* /*ptr*/, std::size_t size) {
203 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader));
206 // This operator should never be called as the memory should be freed as part
207 // of the arena destruction. It only exists to provide a matching operator
208 // delete to the operator new so that some compilers will not complain (see
209 // https://github.com/grpc/grpc/issues/11301). Note that at the time of adding
210 // this there are no tests catching the compiler warning.
211 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
213 void StartCall(void* tag) override {
214 GPR_CODEGEN_ASSERT(!started_);
216 StartCallInternal(tag);
219 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
220 /// method for semantics.
223 /// - upon receiving initial metadata from the server,
224 /// the \a ClientContext associated with this call is updated, and the
225 /// calling code can access the received metadata through the
226 /// \a ClientContext.
227 void ReadInitialMetadata(void* tag) override {
228 GPR_CODEGEN_ASSERT(started_);
229 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
231 meta_ops_.set_output_tag(tag);
232 meta_ops_.RecvInitialMetadata(context_);
233 call_.PerformOps(&meta_ops_);
/// Request a read of the next message into \a msg, with \a tag as the output
/// tag. If the context reports that initial metadata has not been received
/// yet, a receive-initial-metadata op is added to the same op set.
236 void Read(R* msg, void* tag) override {
237 GPR_CODEGEN_ASSERT(started_);
238 read_ops_.set_output_tag(tag);
239 if (!context_->initial_metadata_received_) {
240 read_ops_.RecvInitialMetadata(context_);
242 read_ops_.RecvMessage(msg);
243 call_.PerformOps(&read_ops_);
246 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
249 /// - the \a ClientContext associated with this call is updated with
250 /// possible initial and trailing metadata received from the server.
251 void Finish(::grpc::Status* status, void* tag) override {
252 GPR_CODEGEN_ASSERT(started_);
253 finish_ops_.set_output_tag(tag);
254 if (!context_->initial_metadata_received_) {
255 finish_ops_.RecvInitialMetadata(context_);
257 finish_ops_.ClientRecvStatus(context_, status);
258 call_.PerformOps(&finish_ops_);
262 friend class internal::ClientAsyncReaderFactory<R>;
// Queues \a request and the client send-close op on init_ops_ at construction
// time; the ops are performed by StartCallInternal.
264 ClientAsyncReader(::grpc::internal::Call call, ::grpc::ClientContext* context,
265 const W& request, bool start, void* tag)
266 : context_(context), call_(call), started_(start) {
267 // TODO(ctiller): don't assert
268 GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
269 init_ops_.ClientSendClose();
271 StartCallInternal(tag);
273 GPR_CODEGEN_ASSERT(tag == nullptr);
// Adds the send-initial-metadata op (metadata and flags taken from the
// context) to init_ops_, then performs init_ops_ on the call with \a tag as
// the output tag.
277 void StartCallInternal(void* tag) {
278 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
279 context_->initial_metadata_flags());
280 init_ops_.set_output_tag(tag);
281 call_.PerformOps(&init_ops_);
284 ::grpc::ClientContext* context_;
285 ::grpc::internal::Call call_;
287 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
288 ::grpc::internal::CallOpSendMessage,
289 ::grpc::internal::CallOpClientSendClose>
291 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
293 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
294 ::grpc::internal::CallOpRecvMessage<R>>
296 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
297 ::grpc::internal::CallOpClientRecvStatus>
301 /// Common interface for client-side asynchronous writing.
303 class ClientAsyncWriterInterface
304 : public internal::ClientAsyncStreamingInterface,
305 public internal::AsyncWriterInterface<W> {
307 /// Signal the client is done with the writes (half-close the client stream).
308 /// Thread-safe with respect to \a AsyncReaderInterface::Read.
310 /// \param[in] tag The tag identifying the operation.
311 virtual void WritesDone(void* tag) = 0;
/// Factory that creates \a ClientAsyncWriter stream objects.
316 class ClientAsyncWriterFactory {
318 /// Create a stream object.
319 /// Start the RPC if \a start is set.
320 /// \a tag will be notified on \a cq when the call has been started (i.e.
321 /// initial metadata sent) and \a request has been written out.
322 /// If \a start is not set, \a tag must be nullptr and the actual call
323 /// must be initiated by StartCall.
324 /// Note that \a context will be used to fill in custom initial metadata
325 /// used to send to the server when starting the call.
326 /// \a response will be filled in with the single expected response
327 /// message from the server upon a successful call to the \a Finish
328 /// method of this instance.
330 static ClientAsyncWriter<W>* Create(::grpc::ChannelInterface* channel,
331 ::grpc::CompletionQueue* cq,
332 const ::grpc::internal::RpcMethod& method,
333 ::grpc::ClientContext* context,
334 R* response, bool start, void* tag) {
335 ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
// The stream object is constructed with placement new in storage obtained
// from grpc_call_arena_alloc for the call created above.
336 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
337 call.call(), sizeof(ClientAsyncWriter<W>)))
338 ClientAsyncWriter<W>(call, context, response, start, tag);
341 } // namespace internal
343 /// Async API on the client side for doing client-streaming RPCs,
344 /// where the outgoing message stream going to the server contains
345 /// messages of type \a W.
347 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
349 // always allocated against a call arena, no memory free required
350 static void operator delete(void* /*ptr*/, std::size_t size) {
351 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter));
354 // This operator should never be called as the memory should be freed as part
355 // of the arena destruction. It only exists to provide a matching operator
356 // delete to the operator new so that some compilers will not complain (see
357 // https://github.com/grpc/grpc/issues/11301). Note that at the time of adding
358 // this there are no tests catching the compiler warning.
359 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
361 void StartCall(void* tag) override {
362 GPR_CODEGEN_ASSERT(!started_);
364 StartCallInternal(tag);
367 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
371 /// - upon receiving initial metadata from the server, the \a ClientContext
372 /// associated with this call is updated, and the calling code can access
373 /// the received metadata through the \a ClientContext.
374 void ReadInitialMetadata(void* tag) override {
375 GPR_CODEGEN_ASSERT(started_);
376 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
378 meta_ops_.set_output_tag(tag);
379 meta_ops_.RecvInitialMetadata(context_);
380 call_.PerformOps(&meta_ops_);
/// Queue \a msg on write_ops_ and perform the ops with \a tag as the output
/// tag. Asserts that the call has been started.
383 void Write(const W& msg, void* tag) override {
384 GPR_CODEGEN_ASSERT(started_);
385 write_ops_.set_output_tag(tag);
386 // TODO(ctiller): don't assert
387 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
388 call_.PerformOps(&write_ops_);
/// Same as the two-argument Write, but sends \a msg with \a options.
391 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
392 GPR_CODEGEN_ASSERT(started_);
393 write_ops_.set_output_tag(tag);
// For a last message, set the buffer hint on the options and add the client
// send-close op to the same op set as the message.
394 if (options.is_last_message()) {
395 options.set_buffer_hint();
396 write_ops_.ClientSendClose();
398 // TODO(ctiller): don't assert
399 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
400 call_.PerformOps(&write_ops_);
403 void WritesDone(void* tag) override {
404 GPR_CODEGEN_ASSERT(started_);
405 write_ops_.set_output_tag(tag);
406 write_ops_.ClientSendClose();
407 call_.PerformOps(&write_ops_);
410 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
413 /// - the \a ClientContext associated with this call is updated with
414 /// possible initial and trailing metadata received from the server.
415 /// - attempts to fill in the \a response parameter passed to this class's
416 /// constructor with the server's response message.
417 void Finish(::grpc::Status* status, void* tag) override {
418 GPR_CODEGEN_ASSERT(started_);
419 finish_ops_.set_output_tag(tag);
420 if (!context_->initial_metadata_received_) {
421 finish_ops_.RecvInitialMetadata(context_);
423 finish_ops_.ClientRecvStatus(context_, status);
424 call_.PerformOps(&finish_ops_);
428 friend class internal::ClientAsyncWriterFactory<W>;
// Registers \a response as the receive target on finish_ops_ at construction
// time and marks a missing message as allowed.
430 ClientAsyncWriter(::grpc::internal::Call call, ::grpc::ClientContext* context,
431 R* response, bool start, void* tag)
432 : context_(context), call_(call), started_(start) {
433 finish_ops_.RecvMessage(response);
434 finish_ops_.AllowNoMessage();
436 StartCallInternal(tag);
438 GPR_CODEGEN_ASSERT(tag == nullptr);
442 void StartCallInternal(void* tag) {
443 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
444 context_->initial_metadata_flags());
445 // if corked bit is set in context, we just keep the initial metadata
446 // buffered up to coalesce with later message send. No op is performed.
447 if (!context_->initial_metadata_corked_) {
448 write_ops_.set_output_tag(tag);
449 call_.PerformOps(&write_ops_);
453 ::grpc::ClientContext* context_;
454 ::grpc::internal::Call call_;
456 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
458 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
459 ::grpc::internal::CallOpSendMessage,
460 ::grpc::internal::CallOpClientSendClose>
462 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
463 ::grpc::internal::CallOpGenericRecvMessage,
464 ::grpc::internal::CallOpClientRecvStatus>
468 /// Async client-side interface for bi-directional streaming,
469 /// where the client-to-server message stream has messages of type \a W,
470 /// and the server-to-client message stream has messages of type \a R.
471 template <class W, class R>
472 class ClientAsyncReaderWriterInterface
473 : public internal::ClientAsyncStreamingInterface,
474 public internal::AsyncWriterInterface<W>,
475 public internal::AsyncReaderInterface<R> {
477 /// Signal the client is done with the writes (half-close the client stream).
478 /// Thread-safe with respect to \a AsyncReaderInterface::Read.
480 /// \param[in] tag The tag identifying the operation.
481 virtual void WritesDone(void* tag) = 0;
/// Factory that creates \a ClientAsyncReaderWriter stream objects.
485 template <class W, class R>
486 class ClientAsyncReaderWriterFactory {
488 /// Create a stream object.
489 /// Start the RPC request if \a start is set.
490 /// \a tag will be notified on \a cq when the call has been started (i.e.
491 /// initial metadata sent). If \a start is not set, \a tag must be
492 /// nullptr and the actual call must be initiated by StartCall.
493 /// Note that \a context will be used to fill in custom initial metadata
494 /// used to send to the server when starting the call.
495 static ClientAsyncReaderWriter<W, R>* Create(
496 ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq,
497 const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
498 bool start, void* tag) {
499 ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
// The stream object is constructed with placement new in storage obtained
// from grpc_call_arena_alloc for the call created above.
501 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
502 call.call(), sizeof(ClientAsyncReaderWriter<W, R>)))
503 ClientAsyncReaderWriter<W, R>(call, context, start, tag);
506 } // namespace internal
508 /// Async client-side interface for bi-directional streaming,
509 /// where the outgoing message stream going to the server
510 /// has messages of type \a W, and the incoming message stream coming
511 /// from the server has messages of type \a R.
512 template <class W, class R>
513 class ClientAsyncReaderWriter final
514 : public ClientAsyncReaderWriterInterface<W, R> {
516 // always allocated against a call arena, no memory free required
517 static void operator delete(void* /*ptr*/, std::size_t size) {
518 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter));
521 // This operator should never be called as the memory should be freed as part
522 // of the arena destruction. It only exists to provide a matching operator
523 // delete to the operator new so that some compilers will not complain (see
524 // https://github.com/grpc/grpc/issues/11301). Note that at the time of adding
525 // this there are no tests catching the compiler warning.
526 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
528 void StartCall(void* tag) override {
529 GPR_CODEGEN_ASSERT(!started_);
531 StartCallInternal(tag);
534 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
535 /// for semantics of this method.
538 /// - upon receiving initial metadata from the server, the \a ClientContext
539 /// is updated with it, and then the received initial metadata can
540 /// be accessed through this \a ClientContext.
541 void ReadInitialMetadata(void* tag) override {
542 GPR_CODEGEN_ASSERT(started_);
543 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
545 meta_ops_.set_output_tag(tag);
546 meta_ops_.RecvInitialMetadata(context_);
547 call_.PerformOps(&meta_ops_);
/// Request a read of the next message into \a msg, with \a tag as the output
/// tag. If the context reports that initial metadata has not been received
/// yet, a receive-initial-metadata op is added to the same op set.
550 void Read(R* msg, void* tag) override {
551 GPR_CODEGEN_ASSERT(started_);
552 read_ops_.set_output_tag(tag);
553 if (!context_->initial_metadata_received_) {
554 read_ops_.RecvInitialMetadata(context_);
556 read_ops_.RecvMessage(msg);
557 call_.PerformOps(&read_ops_);
/// Queue \a msg on write_ops_ and perform the ops with \a tag as the output
/// tag. Asserts that the call has been started.
560 void Write(const W& msg, void* tag) override {
561 GPR_CODEGEN_ASSERT(started_);
562 write_ops_.set_output_tag(tag);
563 // TODO(ctiller): don't assert
564 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
565 call_.PerformOps(&write_ops_);
/// Same as the two-argument Write, but sends \a msg with \a options.
568 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
569 GPR_CODEGEN_ASSERT(started_);
570 write_ops_.set_output_tag(tag);
// For a last message, set the buffer hint on the options and add the client
// send-close op to the same op set as the message.
571 if (options.is_last_message()) {
572 options.set_buffer_hint();
573 write_ops_.ClientSendClose();
575 // TODO(ctiller): don't assert
576 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
577 call_.PerformOps(&write_ops_);
580 void WritesDone(void* tag) override {
581 GPR_CODEGEN_ASSERT(started_);
582 write_ops_.set_output_tag(tag);
583 write_ops_.ClientSendClose();
584 call_.PerformOps(&write_ops_);
587 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
589 /// - the \a ClientContext associated with this call is updated with
590 /// possible initial and trailing metadata sent from the server.
591 void Finish(::grpc::Status* status, void* tag) override {
592 GPR_CODEGEN_ASSERT(started_);
593 finish_ops_.set_output_tag(tag);
594 if (!context_->initial_metadata_received_) {
595 finish_ops_.RecvInitialMetadata(context_);
597 finish_ops_.ClientRecvStatus(context_, status);
598 call_.PerformOps(&finish_ops_);
602 friend class internal::ClientAsyncReaderWriterFactory<W, R>;
603 ClientAsyncReaderWriter(::grpc::internal::Call call,
604 ::grpc::ClientContext* context, bool start, void* tag)
605 : context_(context), call_(call), started_(start) {
607 StartCallInternal(tag);
609 GPR_CODEGEN_ASSERT(tag == nullptr);
613 void StartCallInternal(void* tag) {
614 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
615 context_->initial_metadata_flags());
616 // if corked bit is set in context, we just keep the initial metadata
617 // buffered up to coalesce with later message send. No op is performed.
618 if (!context_->initial_metadata_corked_) {
619 write_ops_.set_output_tag(tag);
620 call_.PerformOps(&write_ops_);
624 ::grpc::ClientContext* context_;
625 ::grpc::internal::Call call_;
627 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
629 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
630 ::grpc::internal::CallOpRecvMessage<R>>
632 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
633 ::grpc::internal::CallOpSendMessage,
634 ::grpc::internal::CallOpClientSendClose>
636 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
637 ::grpc::internal::CallOpClientRecvStatus>
/// Server-side interface that reads messages of type \a R and finishes the
/// call either with a response of type \a W or with an error status.
641 template <class W, class R>
642 class ServerAsyncReaderInterface
643 : public ::grpc::internal::ServerAsyncStreamingInterface,
644 public internal::AsyncReaderInterface<R> {
646 /// Indicate that the stream is to be finished with a certain status code
647 /// and also send out \a msg response to the client.
648 /// Request notification for when the server has sent the response and the
649 /// appropriate signals to the client to end the call.
650 /// Should not be used concurrently with other operations.
652 /// It is appropriate to call this method when:
653 /// * all messages from the client have been received (either known
654 /// implicitly, or explicitly because a previous
655 /// \a AsyncReaderInterface::Read operation with a non-ok result,
656 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
658 /// This operation will end when the server has finished sending out initial
659 /// metadata (if not sent already), response message, and status, or if
660 /// some failure occurred when trying to do so.
662 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it
663 /// is safe to deallocate once Finish returns.
665 /// \param[in] tag Tag identifying this request.
666 /// \param[in] status To be sent to the client as the result of this call.
667 /// \param[in] msg To be sent to the client as the response for this call.
668 virtual void Finish(const W& msg, const ::grpc::Status& status,
671 /// Indicate that the stream is to be finished with a certain
672 /// non-OK status code.
673 /// Request notification for when the server has sent the appropriate
674 /// signals to the client to end the call.
675 /// Should not be used concurrently with other operations.
677 /// This call is meant to end the call with some error, and can be called at
678 /// any point that the server would like to "fail" the call (though note
679 /// this shouldn't be called concurrently with any other "sending" call, like
680 /// \a AsyncWriterInterface::Write).
682 /// This operation will end when the server has finished sending out initial
683 /// metadata (if not sent already), and status, or if some failure occurred
684 /// when trying to do so.
686 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
687 /// deallocate once FinishWithError returns.
689 /// \param[in] tag Tag identifying this request.
690 /// \param[in] status To be sent to the client as the result of this call.
691 /// - Note: \a status must have a non-OK code.
692 virtual void FinishWithError(const ::grpc::Status& status, void* tag) = 0;
695 /// Async server-side API for doing client-streaming RPCs,
696 /// where the incoming message stream from the client has messages of type \a R,
697 /// and the single response message sent from the server is type \a W.
698 template <class W, class R>
699 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
// The stream starts with a null-initialized call; BindCall assigns the actual
// call later.
701 explicit ServerAsyncReader(::grpc::ServerContext* ctx)
702 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
704 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
706 /// Implicit input parameter:
707 /// - The initial metadata that will be sent to the client from this op will
708 /// be taken from the \a ServerContext associated with the call.
709 void SendInitialMetadata(void* tag) override {
710 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
712 meta_ops_.set_output_tag(tag);
713 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
714 ctx_->initial_metadata_flags());
715 if (ctx_->compression_level_set()) {
716 meta_ops_.set_compression_level(ctx_->compression_level());
718 ctx_->sent_initial_metadata_ = true;
719 call_.PerformOps(&meta_ops_);
/// Request a read of the next client message into \a msg, with \a tag as the
/// output tag.
722 void Read(R* msg, void* tag) override {
723 read_ops_.set_output_tag(tag);
724 read_ops_.RecvMessage(msg);
725 call_.PerformOps(&read_ops_);
728 /// See the \a ServerAsyncReaderInterface.Finish method for semantics.
731 /// - also sends initial metadata if not already sent.
732 /// - uses the \a ServerContext associated with this call to send possible
733 /// initial and trailing metadata.
735 /// Note: \a msg is not sent if \a status has a non-OK code.
737 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
738 /// is safe to deallocate once Finish returns.
739 void Finish(const W& msg, const ::grpc::Status& status, void* tag) override {
740 finish_ops_.set_output_tag(tag);
741 if (!ctx_->sent_initial_metadata_) {
742 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743 ctx_->initial_metadata_flags());
744 if (ctx_->compression_level_set()) {
745 finish_ops_.set_compression_level(ctx_->compression_level());
747 ctx_->sent_initial_metadata_ = true;
749 // The response is dropped if the status is not OK.
751 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
752 finish_ops_.SendMessage(msg));
754 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
756 call_.PerformOps(&finish_ops_);
759 /// See the \a ServerAsyncReaderInterface.FinishWithError method for semantics.
762 /// - also sends initial metadata if not already sent.
763 /// - uses the \a ServerContext associated with this call to send possible
764 /// initial and trailing metadata.
766 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
767 /// deallocate once FinishWithError returns.
768 void FinishWithError(const ::grpc::Status& status, void* tag) override {
769 GPR_CODEGEN_ASSERT(!status.ok());
770 finish_ops_.set_output_tag(tag);
771 if (!ctx_->sent_initial_metadata_) {
772 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
773 ctx_->initial_metadata_flags());
774 if (ctx_->compression_level_set()) {
775 finish_ops_.set_compression_level(ctx_->compression_level());
777 ctx_->sent_initial_metadata_ = true;
779 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
780 call_.PerformOps(&finish_ops_);
// Replaces the stored call with a copy of \a *call.
784 void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
786 ::grpc::internal::Call call_;
787 ::grpc::ServerContext* ctx_;
788 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
790 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
791 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
792 ::grpc::internal::CallOpSendMessage,
793 ::grpc::internal::CallOpServerSendStatus>
/// Server-side interface that writes messages of type \a W and finishes the
/// call with a status.
798 class ServerAsyncWriterInterface
799 : public ::grpc::internal::ServerAsyncStreamingInterface,
800 public internal::AsyncWriterInterface<W> {
802 /// Indicate that the stream is to be finished with a certain status code.
803 /// Request notification for when the server has sent the appropriate
804 /// signals to the client to end the call.
805 /// Should not be used concurrently with other operations.
807 /// It is appropriate to call this method when either:
808 /// * all messages from the client have been received (either known
809 /// implicitly, or explicitly because a previous \a
810 /// AsyncReaderInterface::Read operation with a non-ok
811 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false')).
812 /// * it is desired to end the call early with some non-OK status code.
814 /// This operation will end when the server has finished sending out initial
815 /// metadata (if not sent already), response message, and status, or if
816 /// some failure occurred when trying to do so.
818 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
819 /// deallocate once Finish returns.
821 /// \param[in] tag Tag identifying this request.
822 /// \param[in] status To be sent to the client as the result of this call.
823 virtual void Finish(const ::grpc::Status& status, void* tag) = 0;
825 /// Request the writing of \a msg and coalesce it with trailing metadata which
826 /// contains \a status, using WriteOptions options with
827 /// identifying tag \a tag.
829 /// WriteAndFinish is equivalent to performing WriteLast and Finish
830 /// in a single step.
832 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
833 /// is safe to deallocate once WriteAndFinish returns.
835 /// \param[in] msg The message to be written.
836 /// \param[in] options The WriteOptions to be used to write this message.
837 /// \param[in] status The Status that server returns to client.
838 /// \param[in] tag The tag identifying the operation.
839 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
840 const ::grpc::Status& status, void* tag) = 0;
843 /// Async server-side API for doing server streaming RPCs,
844 /// where the outgoing message stream from the server has messages of type \a W.
846 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
/// Stores \a ctx and constructs the internal call object from null arguments;
/// the actual call is copied in later through \a BindCall.
848 explicit ServerAsyncWriter(::grpc::ServerContext* ctx)
849 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
851 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
853 /// Implicit input parameter:
854 /// - The initial metadata that will be sent to the client from this op will
855 /// be taken from the \a ServerContext associated with the call.
857 /// \param[in] tag Tag identifying this request.
858 void SendInitialMetadata(void* tag) override {
// Initial metadata must not have been sent already for this context.
859 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
861 meta_ops_.set_output_tag(tag);
862 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
863 ctx_->initial_metadata_flags());
// Forward the compression level only when one was set on the context.
864 if (ctx_->compression_level_set()) {
865 meta_ops_.set_compression_level(ctx_->compression_level());
// Record the send so later operations do not attach initial metadata again.
867 ctx_->sent_initial_metadata_ = true;
868 call_.PerformOps(&meta_ops_);
/// Queue \a msg for sending and perform the write ops on the call.
/// Initial metadata is attached to the same op set if it has not been sent
/// yet. The result of SendMessage is asserted to be ok.
///
/// \param[in] msg The message to be written.
/// \param[in] tag The tag set as the output tag of the write ops.
871 void Write(const W& msg, void* tag) override {
872 write_ops_.set_output_tag(tag);
873 EnsureInitialMetadataSent(&write_ops_);
874 // TODO(ctiller): don't assert
875 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
876 call_.PerformOps(&write_ops_);
/// Same as the two-argument \a Write, but sends \a msg with \a options.
/// When \a options reports that this is the last message, the buffer hint is
/// set on the local copy of \a options before the message is queued.
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] tag The tag set as the output tag of the write ops.
879 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
880 write_ops_.set_output_tag(tag);
881 if (options.is_last_message()) {
882 options.set_buffer_hint();
885 EnsureInitialMetadataSent(&write_ops_);
886 // TODO(ctiller): don't assert
887 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
888 call_.PerformOps(&write_ops_);
891 /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
893 /// Implicit input parameter:
894 /// - the \a ServerContext associated with this call is used
895 /// for sending trailing (and initial) metadata to the client.
897 /// Note: \a status must have an OK code.
899 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
900 /// is safe to deallocate once WriteAndFinish returns.
901 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
902 const ::grpc::Status& status, void* tag) override {
903 write_ops_.set_output_tag(tag);
904 EnsureInitialMetadataSent(&write_ops_);
// The buffer hint is always set here, unlike in Write where it is conditional.
905 options.set_buffer_hint();
906 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
// The status and trailing metadata go out in the same op set as the message.
907 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
908 call_.PerformOps(&write_ops_);
911 /// See the \a ServerAsyncWriterInterface.Finish method for semantics.
913 /// Implicit input parameter:
914 /// - the \a ServerContext associated with this call is used for sending
915 /// trailing (and initial if not already sent) metadata to the client.
917 /// Note: there are no restrictions on the code of
918 /// \a status; it may be non-OK.
920 /// gRPC doesn't take ownership or a reference to \a status, so it is safe
921 /// to deallocate once Finish returns.
922 void Finish(const ::grpc::Status& status, void* tag) override {
923 finish_ops_.set_output_tag(tag);
924 EnsureInitialMetadataSent(&finish_ops_);
925 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
926 call_.PerformOps(&finish_ops_);
/// Copy \a *call into this object's call, replacing the one built by the
/// constructor.
930 void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
/// If the context has not sent initial metadata yet, attach it (and the
/// compression level, when one is set) to \a ops and mark it as sent.
/// Does nothing otherwise.
933 void EnsureInitialMetadataSent(T* ops) {
934 if (!ctx_->sent_initial_metadata_) {
935 ops->SendInitialMetadata(&ctx_->initial_metadata_,
936 ctx_->initial_metadata_flags());
937 if (ctx_->compression_level_set()) {
938 ops->set_compression_level(ctx_->compression_level());
940 ctx_->sent_initial_metadata_ = true;
944 ::grpc::internal::Call call_;
945 ::grpc::ServerContext* ctx_;
// NOTE(review): the declarator names of the three op sets below are not
// visible in this listing; judging by their op types they appear to back
// meta_ops_, write_ops_ and finish_ops_ respectively -- confirm.
946 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
948 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
949 ::grpc::internal::CallOpSendMessage,
950 ::grpc::internal::CallOpServerSendStatus>
952 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
953 ::grpc::internal::CallOpServerSendStatus>
957 /// Server-side interface for asynchronous bi-directional streaming.
958 template <class W, class R>
959 class ServerAsyncReaderWriterInterface
960 : public ::grpc::internal::ServerAsyncStreamingInterface,
961 public internal::AsyncWriterInterface<W>,
962 public internal::AsyncReaderInterface<R> {
964 /// Indicate that the stream is to be finished with a certain status code.
965 /// Request notification for when the server has sent the appropriate
966 /// signals to the client to end the call.
967 /// Should not be used concurrently with other operations.
969 /// It is appropriate to call this method when either:
970 /// * all messages from the client have been received (either known
971 /// implicitly, or explicitly because a previous \a
972 /// AsyncReaderInterface::Read operation completed
973 /// with a non-ok result, e.g., cq->Next(&read_tag, &ok) set 'ok' to false).
975 /// * it is desired to end the call early with some non-OK status code.
977 /// This operation will end when the server has finished sending out initial
978 /// metadata (if not sent already), response message, and status, or if some
979 /// failure occurred when trying to do so.
981 /// gRPC doesn't take ownership or a reference to \a status, so it is safe
982 /// to deallocate once Finish returns.
984 /// \param[in] status To be sent to the client as the result of this call.
985 /// \param[in] tag Tag identifying this request.
986 virtual void Finish(const ::grpc::Status& status, void* tag) = 0;
988 /// Request the writing of \a msg and coalesce it with trailing metadata which
989 /// contains \a status, using WriteOptions options with
990 /// identifying tag \a tag.
992 /// WriteAndFinish is equivalent to performing WriteLast and Finish in one step.
995 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
996 /// is safe to deallocate once WriteAndFinish returns.
998 /// \param[in] msg The message to be written.
999 /// \param[in] options The WriteOptions to be used to write this message.
1000 /// \param[in] status The Status that server returns to client.
1001 /// \param[in] tag The tag identifying the operation.
1002 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
1003 const ::grpc::Status& status, void* tag) = 0;
1006 /// Async server-side API for doing bidirectional streaming RPCs,
1007 /// where the incoming message stream coming from the client has messages of
1008 /// type \a R, and the outgoing message stream coming from the server has
1009 /// messages of type \a W.
1010 template <class W, class R>
1011 class ServerAsyncReaderWriter final
1012 : public ServerAsyncReaderWriterInterface<W, R> {
/// Stores \a ctx and constructs the internal call object from null arguments;
/// the actual call is copied in later through \a BindCall.
1014 explicit ServerAsyncReaderWriter(::grpc::ServerContext* ctx)
1015 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1017 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
1019 /// Implicit input parameter:
1020 /// - The initial metadata that will be sent to the client from this op will
1021 /// be taken from the \a ServerContext associated with the call.
1023 /// \param[in] tag Tag identifying this request.
1024 void SendInitialMetadata(void* tag) override {
// Initial metadata must not have been sent already for this context.
1025 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1027 meta_ops_.set_output_tag(tag);
1028 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1029 ctx_->initial_metadata_flags());
// Forward the compression level only when one was set on the context.
1030 if (ctx_->compression_level_set()) {
1031 meta_ops_.set_compression_level(ctx_->compression_level());
// Record the send so later operations do not attach initial metadata again.
1033 ctx_->sent_initial_metadata_ = true;
1034 call_.PerformOps(&meta_ops_);
/// Request that a message be received into \a msg and perform the read ops
/// on the call.
///
/// \param[out] msg Where the received message is to be stored.
/// \param[in] tag The tag set as the output tag of the read ops.
1037 void Read(R* msg, void* tag) override {
1038 read_ops_.set_output_tag(tag);
1039 read_ops_.RecvMessage(msg);
1040 call_.PerformOps(&read_ops_);
/// Queue \a msg for sending and perform the write ops on the call.
/// Initial metadata is attached to the same op set if it has not been sent
/// yet. The result of SendMessage is asserted to be ok.
///
/// \param[in] msg The message to be written.
/// \param[in] tag The tag set as the output tag of the write ops.
1043 void Write(const W& msg, void* tag) override {
1044 write_ops_.set_output_tag(tag);
1045 EnsureInitialMetadataSent(&write_ops_);
1046 // TODO(ctiller): don't assert
1047 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
1048 call_.PerformOps(&write_ops_);
/// Same as the two-argument \a Write, but sends \a msg with \a options.
/// When \a options reports that this is the last message, the buffer hint is
/// set on the local copy of \a options before the message is queued.
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] tag The tag set as the output tag of the write ops.
1051 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override {
1052 write_ops_.set_output_tag(tag);
1053 if (options.is_last_message()) {
1054 options.set_buffer_hint();
1056 EnsureInitialMetadataSent(&write_ops_);
1057 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
1058 call_.PerformOps(&write_ops_);
1061 /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
1062 /// method for semantics.
1064 /// Implicit input parameter:
1065 /// - the \a ServerContext associated with this call is used
1066 /// for sending trailing (and initial) metadata to the client.
1068 /// Note: \a status must have an OK code.
1070 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1071 /// is safe to deallocate once WriteAndFinish returns.
1072 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options,
1073 const ::grpc::Status& status, void* tag) override {
1074 write_ops_.set_output_tag(tag);
1075 EnsureInitialMetadataSent(&write_ops_);
// The buffer hint is always set here, unlike in Write where it is conditional.
1076 options.set_buffer_hint();
1077 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
// The status and trailing metadata go out in the same op set as the message.
1078 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1079 call_.PerformOps(&write_ops_);
1082 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics.
1084 /// Implicit input parameter:
1085 /// - the \a ServerContext associated with this call is used for sending
1086 /// trailing (and initial if not already sent) metadata to the client.
1088 /// Note: there are no restrictions on the code of \a status;
1089 /// it may be non-OK.
1091 /// gRPC doesn't take ownership or a reference to \a status, so it is safe
1092 /// to deallocate once Finish returns.
1093 void Finish(const ::grpc::Status& status, void* tag) override {
1094 finish_ops_.set_output_tag(tag);
1095 EnsureInitialMetadataSent(&finish_ops_);
1097 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1098 call_.PerformOps(&finish_ops_);
1102 friend class ::grpc::Server;
/// Copy \a *call into this object's call, replacing the one built by the
/// constructor.
1104 void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
/// If the context has not sent initial metadata yet, attach it (and the
/// compression level, when one is set) to \a ops and mark it as sent.
/// Does nothing otherwise.
1107 void EnsureInitialMetadataSent(T* ops) {
1108 if (!ctx_->sent_initial_metadata_) {
1109 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1110 ctx_->initial_metadata_flags());
1111 if (ctx_->compression_level_set()) {
1112 ops->set_compression_level(ctx_->compression_level());
1114 ctx_->sent_initial_metadata_ = true;
1118 ::grpc::internal::Call call_;
1119 ::grpc::ServerContext* ctx_;
// NOTE(review): apart from read_ops_, the declarator names of the op sets
// below are not visible in this listing; judging by their op types they
// appear to back meta_ops_, write_ops_ and finish_ops_ -- confirm.
1120 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
1122 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
1123 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1124 ::grpc::internal::CallOpSendMessage,
1125 ::grpc::internal::CallOpServerSendStatus>
1127 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1128 ::grpc::internal::CallOpServerSendStatus>
1133 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H