3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/call_op_set.h>
25 #include <grpcpp/impl/codegen/callback_common.h>
26 #include <grpcpp/impl/codegen/channel_interface.h>
27 #include <grpcpp/impl/codegen/config.h>
28 #include <grpcpp/impl/codegen/core_codegen_interface.h>
29 #include <grpcpp/impl/codegen/status.h>
38 /// Perform a callback-based unary call
39 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
40 template <class InputMessage, class OutputMessage>
41 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
42 const ::grpc::internal::RpcMethod& method,
43 ::grpc::ClientContext* context,
44 const InputMessage* request, OutputMessage* result,
45 std::function<void(::grpc::Status)> on_completion) {
46 CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
47 channel, method, context, request, result, on_completion);
50 template <class InputMessage, class OutputMessage>
51 class CallbackUnaryCallImpl {
53 CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
54 const ::grpc::internal::RpcMethod& method,
55 ::grpc::ClientContext* context,
56 const InputMessage* request, OutputMessage* result,
57 std::function<void(::grpc::Status)> on_completion) {
58 ::grpc::CompletionQueue* cq = channel->CallbackCQ();
59 GPR_CODEGEN_ASSERT(cq != nullptr);
60 grpc::internal::Call call(channel->CreateCall(method, context, cq));
62 using FullCallOpSet = grpc::internal::CallOpSet<
63 ::grpc::internal::CallOpSendInitialMetadata,
64 grpc::internal::CallOpSendMessage,
65 grpc::internal::CallOpRecvInitialMetadata,
66 grpc::internal::CallOpRecvMessage<OutputMessage>,
67 grpc::internal::CallOpClientSendClose,
68 grpc::internal::CallOpClientRecvStatus>;
72 grpc::internal::CallbackWithStatusTag tag;
74 const size_t alloc_sz = sizeof(OpSetAndTag);
75 auto* const alloced = static_cast<OpSetAndTag*>(
76 ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
78 auto* ops = new (&alloced->opset) FullCallOpSet;
79 auto* tag = new (&alloced->tag)
80 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
82 // TODO(vjpai): Unify code with sync API as much as possible
83 ::grpc::Status s = ops->SendMessagePtr(request);
88 ops->SendInitialMetadata(&context->send_initial_metadata_,
89 context->initial_metadata_flags());
90 ops->RecvInitialMetadata(context);
91 ops->RecvMessage(result);
92 ops->AllowNoMessage();
93 ops->ClientSendClose();
94 ops->ClientRecvStatus(context, tag->status_ptr());
95 ops->set_core_cq_tag(tag);
100 // Base class for public API classes.
101 class ClientReactor {
103 /// Called by the library when all operations associated with this RPC have
104 /// completed and all Holds have been removed. OnDone provides the RPC status
105 /// outcome for both successful and failed RPCs. If it is never called on an
106 /// RPC, it indicates an application-level problem (like failure to remove a
109 /// \param[in] s The status outcome of this RPC
110 virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
112 /// InternalScheduleOnDone is not part of the API and is not meant to be
113 /// overridden. It is virtual to allow successful builds for certain bazel
114 /// build users that only want to depend on gRPC codegen headers and not the
115 /// full library (although this is not a generally-supported option). Although
116 /// the virtual call is slower than a direct call, this function is
117 /// heavyweight and the cost of the virtual call is not much in comparison.
118 /// This function may be removed or devirtualized in the future.
119 virtual void InternalScheduleOnDone(::grpc::Status s);
122 } // namespace internal
124 // Forward declarations
125 template <class Request, class Response>
126 class ClientBidiReactor;
127 template <class Response>
128 class ClientReadReactor;
129 template <class Request>
130 class ClientWriteReactor;
131 class ClientUnaryReactor;
133 // NOTE: The streaming objects are not actually implemented in the public API.
134 // These interfaces are provided for mocking only. Typical applications
135 // will interact exclusively with the reactors that they define.
136 template <class Request, class Response>
137 class ClientCallbackReaderWriter {
139 virtual ~ClientCallbackReaderWriter() {}
140 virtual void StartCall() = 0;
141 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
142 virtual void WritesDone() = 0;
143 virtual void Read(Response* resp) = 0;
144 virtual void AddHold(int holds) = 0;
145 virtual void RemoveHold() = 0;
148 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
149 reactor->BindStream(this);
153 template <class Response>
154 class ClientCallbackReader {
156 virtual ~ClientCallbackReader() {}
157 virtual void StartCall() = 0;
158 virtual void Read(Response* resp) = 0;
159 virtual void AddHold(int holds) = 0;
160 virtual void RemoveHold() = 0;
163 void BindReactor(ClientReadReactor<Response>* reactor) {
164 reactor->BindReader(this);
168 template <class Request>
169 class ClientCallbackWriter {
171 virtual ~ClientCallbackWriter() {}
172 virtual void StartCall() = 0;
173 void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
174 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
175 void WriteLast(const Request* req, ::grpc::WriteOptions options) {
176 Write(req, options.set_last_message());
178 virtual void WritesDone() = 0;
180 virtual void AddHold(int holds) = 0;
181 virtual void RemoveHold() = 0;
184 void BindReactor(ClientWriteReactor<Request>* reactor) {
185 reactor->BindWriter(this);
189 class ClientCallbackUnary {
191 virtual ~ClientCallbackUnary() {}
192 virtual void StartCall() = 0;
195 void BindReactor(ClientUnaryReactor* reactor);
198 // The following classes are the reactor interfaces that are to be implemented
199 // by the user. They are passed in to the library as an argument to a call on a
200 // stub (either a codegen-ed call or a generic call). The streaming RPC is
201 // activated by calling StartCall, possibly after initiating StartRead,
202 // StartWrite, or AddHold operations on the streaming object. Note that none of
203 // the classes are pure; all reactions have a default empty reaction so that the
204 // user class only needs to override those classes that it cares about.
205 // The reactor must be passed to the stub invocation before any of the below
206 // operations can be called.
208 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
209 template <class Request, class Response>
210 class ClientBidiReactor : public internal::ClientReactor {
212 virtual ~ClientBidiReactor() {}
214 /// Activate the RPC and initiate any reads or writes that have been Start'ed
215 /// before this call. All streaming RPCs issued by the client MUST have
216 /// StartCall invoked on them (even if they are canceled) as this call is the
217 /// activation of their lifecycle.
218 void StartCall() { stream_->StartCall(); }
220 /// Initiate a read operation (or post it for later initiation if StartCall
221 /// has not yet been invoked).
223 /// \param[out] resp Where to eventually store the read message. Valid when
224 /// the library calls OnReadDone
225 void StartRead(Response* resp) { stream_->Read(resp); }
227 /// Initiate a write operation (or post it for later initiation if StartCall
228 /// has not yet been invoked).
230 /// \param[in] req The message to be written. The library does not take
231 /// ownership but the caller must ensure that the message is
232 /// not deleted or modified until OnWriteDone is called.
233 void StartWrite(const Request* req) {
234 StartWrite(req, ::grpc::WriteOptions());
237 /// Initiate/post a write operation with specified options.
239 /// \param[in] req The message to be written. The library does not take
240 /// ownership but the caller must ensure that the message is
241 /// not deleted or modified until OnWriteDone is called.
242 /// \param[in] options The WriteOptions to use for writing this message
243 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
244 stream_->Write(req, std::move(options));
247 /// Initiate/post a write operation with specified options and an indication
248 /// that this is the last write (like StartWrite and StartWritesDone, merged).
249 /// Note that calling this means that no more calls to StartWrite,
250 /// StartWriteLast, or StartWritesDone are allowed.
252 /// \param[in] req The message to be written. The library does not take
253 /// ownership but the caller must ensure that the message is
254 /// not deleted or modified until OnWriteDone is called.
255 /// \param[in] options The WriteOptions to use for writing this message
256 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
257 StartWrite(req, std::move(options.set_last_message()));
260 /// Indicate that the RPC will have no more write operations. This can only be
261 /// issued once for a given RPC. This is not required or allowed if
262 /// StartWriteLast is used since that already has the same implication.
263 /// Note that calling this means that no more calls to StartWrite,
264 /// StartWriteLast, or StartWritesDone are allowed.
265 void StartWritesDone() { stream_->WritesDone(); }
267 /// Holds are needed if (and only if) this stream has operations that take
268 /// place on it after StartCall but from outside one of the reactions
269 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
271 /// Holds must be added before calling StartCall. If a stream still has a hold
272 /// in place, its resources will not be destroyed even if the status has
273 /// already come in from the wire and there are currently no active callbacks
274 /// outstanding. Similarly, the stream will not call OnDone if there are still
277 /// For example, if a StartRead or StartWrite operation is going to be
278 /// initiated from elsewhere in the application, the application should call
279 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
280 /// for example, a read-flow and a write-flow taking place outside the
281 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
282 /// application knows that it won't issue any more read operations (such as
283 /// when a read comes back as not ok), it should issue a RemoveHold(). It
284 /// should also call RemoveHold() again after it does StartWriteLast or
285 /// StartWritesDone that indicates that there will be no more write ops.
286 /// The number of RemoveHold calls must match the total number of AddHold
287 /// calls plus the number of holds added by AddMultipleHolds.
288 /// The argument to AddMultipleHolds must be positive.
289 void AddHold() { AddMultipleHolds(1); }
290 void AddMultipleHolds(int holds) {
291 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
292 stream_->AddHold(holds);
294 void RemoveHold() { stream_->RemoveHold(); }
296 /// Notifies the application that all operations associated with this RPC
297 /// have completed and all Holds have been removed. OnDone provides the RPC
298 /// status outcome for both successful and failed RPCs and will be called in
299 /// all cases. If it is not called, it indicates an application-level problem
300 /// (like failure to remove a hold).
302 /// \param[in] s The status outcome of this RPC
303 void OnDone(const ::grpc::Status& /*s*/) override {}
305 /// Notifies the application that a read of initial metadata from the
306 /// server is done. If the application chooses not to implement this method,
307 /// it can assume that the initial metadata has been read before the first
308 /// call of OnReadDone or OnDone.
310 /// \param[in] ok Was the initial metadata read successfully? If false, no
311 /// new read/write operation will succeed, and any further
312 /// Start* operations should not be called.
313 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
315 /// Notifies the application that a StartRead operation completed.
317 /// \param[in] ok Was it successful? If false, no new read/write operation
318 /// will succeed, and any further Start* should not be called.
319 virtual void OnReadDone(bool /*ok*/) {}
321 /// Notifies the application that a StartWrite or StartWriteLast operation
324 /// \param[in] ok Was it successful? If false, no new read/write operation
325 /// will succeed, and any further Start* should not be called.
326 virtual void OnWriteDone(bool /*ok*/) {}
328 /// Notifies the application that a StartWritesDone operation completed. Note
329 /// that this is only used on explicit StartWritesDone operations and not for
330 /// those that are implicitly invoked as part of a StartWriteLast.
332 /// \param[in] ok Was it successful? If false, the application will later see
333 /// the failure reflected as a bad status in OnDone and no
334 /// further Start* should be called.
335 virtual void OnWritesDoneDone(bool /*ok*/) {}
338 friend class ClientCallbackReaderWriter<Request, Response>;
339 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
342 ClientCallbackReaderWriter<Request, Response>* stream_;
345 /// \a ClientReadReactor is the interface for a server-streaming RPC.
346 /// All public methods behave as in ClientBidiReactor.
347 template <class Response>
348 class ClientReadReactor : public internal::ClientReactor {
350 virtual ~ClientReadReactor() {}
352 void StartCall() { reader_->StartCall(); }
353 void StartRead(Response* resp) { reader_->Read(resp); }
355 void AddHold() { AddMultipleHolds(1); }
356 void AddMultipleHolds(int holds) {
357 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
358 reader_->AddHold(holds);
360 void RemoveHold() { reader_->RemoveHold(); }
362 void OnDone(const ::grpc::Status& /*s*/) override {}
363 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
364 virtual void OnReadDone(bool /*ok*/) {}
367 friend class ClientCallbackReader<Response>;
368 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
369 ClientCallbackReader<Response>* reader_;
372 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
373 /// All public methods behave as in ClientBidiReactor.
374 template <class Request>
375 class ClientWriteReactor : public internal::ClientReactor {
377 virtual ~ClientWriteReactor() {}
379 void StartCall() { writer_->StartCall(); }
380 void StartWrite(const Request* req) {
381 StartWrite(req, ::grpc::WriteOptions());
383 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
384 writer_->Write(req, std::move(options));
386 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
387 StartWrite(req, std::move(options.set_last_message()));
389 void StartWritesDone() { writer_->WritesDone(); }
391 void AddHold() { AddMultipleHolds(1); }
392 void AddMultipleHolds(int holds) {
393 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
394 writer_->AddHold(holds);
396 void RemoveHold() { writer_->RemoveHold(); }
398 void OnDone(const ::grpc::Status& /*s*/) override {}
399 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
400 virtual void OnWriteDone(bool /*ok*/) {}
401 virtual void OnWritesDoneDone(bool /*ok*/) {}
404 friend class ClientCallbackWriter<Request>;
405 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
407 ClientCallbackWriter<Request>* writer_;
410 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
411 /// This is _not_ a common way of invoking a unary RPC. In practice, this
412 /// option should be used only if the unary RPC wants to receive initial
413 /// metadata without waiting for the response to complete. Most deployments of
414 /// RPC systems do not use this option, but it is needed for generality.
415 /// All public methods behave as in ClientBidiReactor.
416 /// StartCall is included for consistency with the other reactor flavors: even
417 /// though there are no StartRead or StartWrite operations to queue before the
418 /// call (that is part of the unary call itself) and there is no reactor object
419 /// being created as a result of this call, we keep a consistent 2-phase
420 /// initiation API among all the reactor flavors.
421 class ClientUnaryReactor : public internal::ClientReactor {
423 virtual ~ClientUnaryReactor() {}
425 void StartCall() { call_->StartCall(); }
426 void OnDone(const ::grpc::Status& /*s*/) override {}
427 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
430 friend class ClientCallbackUnary;
431 void BindCall(ClientCallbackUnary* call) { call_ = call; }
432 ClientCallbackUnary* call_;
435 // Define function out-of-line from class to avoid forward declaration issue
436 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
437 reactor->BindCall(this);
442 // Forward declare factory classes for friendship
443 template <class Request, class Response>
444 class ClientCallbackReaderWriterFactory;
445 template <class Response>
446 class ClientCallbackReaderFactory;
447 template <class Request>
448 class ClientCallbackWriterFactory;
450 template <class Request, class Response>
451 class ClientCallbackReaderWriterImpl
452 : public ClientCallbackReaderWriter<Request, Response> {
454 // always allocated against a call arena, no memory free required
455 static void operator delete(void* /*ptr*/, std::size_t size) {
456 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
459 // This operator should never be called as the memory should be freed as part
460 // of the arena destruction. It only exists to provide a matching operator
461 // delete to the operator new so that some compilers will not complain (see
462 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
463 // there are no tests catching the compiler warning.
464 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
466 void StartCall() override {
467 // This call initiates two batches, plus any backlog, each with a callback
468 // 1. Send initial metadata (unless corked) + recv initial metadata
469 // 2. Any read backlog
470 // 3. Any write backlog
471 // 4. Recv trailing metadata (unless corked)
472 if (!start_corked_) {
473 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
474 context_->initial_metadata_flags());
477 call_.PerformOps(&start_ops_);
480 grpc::internal::MutexLock lock(&start_mu_);
482 if (backlog_.read_ops) {
483 call_.PerformOps(&read_ops_);
485 if (backlog_.write_ops) {
486 call_.PerformOps(&write_ops_);
488 if (backlog_.writes_done_ops) {
489 call_.PerformOps(&writes_done_ops_);
491 call_.PerformOps(&finish_ops_);
492 // The last thing in this critical section is to set started_ so that it
493 // can be used lock-free as well.
494 started_.store(true, std::memory_order_release);
496 // MaybeFinish outside the lock to make sure that destruction of this object
497 // doesn't take place while holding the lock (which would cause the lock to
498 // be released after destruction)
499 this->MaybeFinish(/*from_reaction=*/false);
502 void Read(Response* msg) override {
503 read_ops_.RecvMessage(msg);
504 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
505 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
506 grpc::internal::MutexLock lock(&start_mu_);
507 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
508 backlog_.read_ops = true;
512 call_.PerformOps(&read_ops_);
515 void Write(const Request* msg, ::grpc::WriteOptions options) override {
516 if (options.is_last_message()) {
517 options.set_buffer_hint();
518 write_ops_.ClientSendClose();
520 // TODO(vjpai): don't assert
521 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
522 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
523 if (GPR_UNLIKELY(corked_write_needed_)) {
524 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
525 context_->initial_metadata_flags());
526 corked_write_needed_ = false;
529 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
530 grpc::internal::MutexLock lock(&start_mu_);
531 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
532 backlog_.write_ops = true;
536 call_.PerformOps(&write_ops_);
538 void WritesDone() override {
539 writes_done_ops_.ClientSendClose();
540 writes_done_tag_.Set(call_.call(),
542 reactor_->OnWritesDoneDone(ok);
543 MaybeFinish(/*from_reaction=*/true);
545 &writes_done_ops_, /*can_inline=*/false);
546 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
547 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
548 if (GPR_UNLIKELY(corked_write_needed_)) {
549 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
550 context_->initial_metadata_flags());
551 corked_write_needed_ = false;
553 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
554 grpc::internal::MutexLock lock(&start_mu_);
555 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
556 backlog_.writes_done_ops = true;
560 call_.PerformOps(&writes_done_ops_);
563 void AddHold(int holds) override {
564 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
566 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
569 friend class ClientCallbackReaderWriterFactory<Request, Response>;
571 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
572 ::grpc::ClientContext* context,
573 ClientBidiReactor<Request, Response>* reactor)
577 start_corked_(context_->initial_metadata_corked_),
578 corked_write_needed_(start_corked_) {
579 this->BindReactor(reactor);
581 // Set up the unchanging parts of the start, read, and write tags and ops.
582 start_tag_.Set(call_.call(),
584 reactor_->OnReadInitialMetadataDone(ok);
585 MaybeFinish(/*from_reaction=*/true);
587 &start_ops_, /*can_inline=*/false);
588 start_ops_.RecvInitialMetadata(context_);
589 start_ops_.set_core_cq_tag(&start_tag_);
591 write_tag_.Set(call_.call(),
593 reactor_->OnWriteDone(ok);
594 MaybeFinish(/*from_reaction=*/true);
596 &write_ops_, /*can_inline=*/false);
597 write_ops_.set_core_cq_tag(&write_tag_);
599 read_tag_.Set(call_.call(),
601 reactor_->OnReadDone(ok);
602 MaybeFinish(/*from_reaction=*/true);
604 &read_ops_, /*can_inline=*/false);
605 read_ops_.set_core_cq_tag(&read_tag_);
607 // Also set up the Finish tag and op set.
610 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
612 /*can_inline=*/false);
613 finish_ops_.ClientRecvStatus(context_, &finish_status_);
614 finish_ops_.set_core_cq_tag(&finish_tag_);
617 // MaybeFinish can be called from reactions or from user-initiated operations
618 // like StartCall or RemoveHold. If this is the last operation or hold on this
619 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
620 // a reaction, it can call OnDone directly. If not, it would need to schedule
621 // OnDone onto an executor thread to avoid the possibility of deadlocking with
622 // any locks in the user code that invoked it.
623 void MaybeFinish(bool from_reaction) {
624 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
625 1, std::memory_order_acq_rel) == 1)) {
626 ::grpc::Status s = std::move(finish_status_);
627 auto* reactor = reactor_;
628 auto* call = call_.call();
629 this->~ClientCallbackReaderWriterImpl();
630 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
631 if (GPR_LIKELY(from_reaction)) {
634 reactor->InternalScheduleOnDone(std::move(s));
639 ::grpc::ClientContext* const context_;
640 grpc::internal::Call call_;
641 ClientBidiReactor<Request, Response>* const reactor_;
643 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
644 grpc::internal::CallOpRecvInitialMetadata>
646 grpc::internal::CallbackWithSuccessTag start_tag_;
647 const bool start_corked_;
648 bool corked_write_needed_; // no lock needed since only accessed in
649 // Write/WritesDone which cannot be concurrent
651 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
652 grpc::internal::CallbackWithSuccessTag finish_tag_;
653 ::grpc::Status finish_status_;
655 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
656 grpc::internal::CallOpSendMessage,
657 grpc::internal::CallOpClientSendClose>
659 grpc::internal::CallbackWithSuccessTag write_tag_;
661 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
662 grpc::internal::CallOpClientSendClose>
664 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
666 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
668 grpc::internal::CallbackWithSuccessTag read_tag_;
670 struct StartCallBacklog {
671 bool write_ops = false;
672 bool writes_done_ops = false;
673 bool read_ops = false;
675 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
677 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
678 std::atomic<intptr_t> callbacks_outstanding_{3};
679 std::atomic_bool started_{false};
680 grpc::internal::Mutex start_mu_;
683 template <class Request, class Response>
684 class ClientCallbackReaderWriterFactory {
686 static void Create(::grpc::ChannelInterface* channel,
687 const ::grpc::internal::RpcMethod& method,
688 ::grpc::ClientContext* context,
689 ClientBidiReactor<Request, Response>* reactor) {
690 grpc::internal::Call call =
691 channel->CreateCall(method, context, channel->CallbackCQ());
693 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
694 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
695 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
696 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
701 template <class Response>
702 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
704 // always allocated against a call arena, no memory free required
705 static void operator delete(void* /*ptr*/, std::size_t size) {
706 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
709 // This operator should never be called as the memory should be freed as part
710 // of the arena destruction. It only exists to provide a matching operator
711 // delete to the operator new so that some compilers will not complain (see
712 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
713 // there are no tests catching the compiler warning.
714 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
716 void StartCall() override {
717 // This call initiates two batches, plus any backlog, each with a callback
718 // 1. Send initial metadata (unless corked) + recv initial metadata
720 // 3. Recv trailing metadata
722 start_tag_.Set(call_.call(),
724 reactor_->OnReadInitialMetadataDone(ok);
725 MaybeFinish(/*from_reaction=*/true);
727 &start_ops_, /*can_inline=*/false);
728 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
729 context_->initial_metadata_flags());
730 start_ops_.RecvInitialMetadata(context_);
731 start_ops_.set_core_cq_tag(&start_tag_);
732 call_.PerformOps(&start_ops_);
734 // Also set up the read tag so it doesn't have to be set up each time
735 read_tag_.Set(call_.call(),
737 reactor_->OnReadDone(ok);
738 MaybeFinish(/*from_reaction=*/true);
740 &read_ops_, /*can_inline=*/false);
741 read_ops_.set_core_cq_tag(&read_tag_);
744 grpc::internal::MutexLock lock(&start_mu_);
745 if (backlog_.read_ops) {
746 call_.PerformOps(&read_ops_);
748 started_.store(true, std::memory_order_release);
753 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
754 &finish_ops_, /*can_inline=*/false);
755 finish_ops_.ClientRecvStatus(context_, &finish_status_);
756 finish_ops_.set_core_cq_tag(&finish_tag_);
757 call_.PerformOps(&finish_ops_);
760 void Read(Response* msg) override {
761 read_ops_.RecvMessage(msg);
762 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
763 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
764 grpc::internal::MutexLock lock(&start_mu_);
765 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
766 backlog_.read_ops = true;
770 call_.PerformOps(&read_ops_);
773 void AddHold(int holds) override {
774 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
776 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
779 friend class ClientCallbackReaderFactory<Response>;
781 template <class Request>
782 ClientCallbackReaderImpl(::grpc::internal::Call call,
783 ::grpc::ClientContext* context, Request* request,
784 ClientReadReactor<Response>* reactor)
785 : context_(context), call_(call), reactor_(reactor) {
786 this->BindReactor(reactor);
787 // TODO(vjpai): don't assert
788 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
789 start_ops_.ClientSendClose();
792 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
793 void MaybeFinish(bool from_reaction) {
794 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
795 1, std::memory_order_acq_rel) == 1)) {
796 ::grpc::Status s = std::move(finish_status_);
797 auto* reactor = reactor_;
798 auto* call = call_.call();
799 this->~ClientCallbackReaderImpl();
800 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
801 if (GPR_LIKELY(from_reaction)) {
804 reactor->InternalScheduleOnDone(std::move(s));
809 ::grpc::ClientContext* const context_;
810 grpc::internal::Call call_;
811 ClientReadReactor<Response>* const reactor_;
813 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
814 grpc::internal::CallOpSendMessage,
815 grpc::internal::CallOpClientSendClose,
816 grpc::internal::CallOpRecvInitialMetadata>
818 grpc::internal::CallbackWithSuccessTag start_tag_;
820 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
821 grpc::internal::CallbackWithSuccessTag finish_tag_;
822 ::grpc::Status finish_status_;
824 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
826 grpc::internal::CallbackWithSuccessTag read_tag_;
828 struct StartCallBacklog {
829 bool read_ops = false;
831 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
833 // Minimum of 2 callbacks to pre-register for start and finish
834 std::atomic<intptr_t> callbacks_outstanding_{2};
835 std::atomic_bool started_{false};
836 grpc::internal::Mutex start_mu_;
839 template <class Response>
840 class ClientCallbackReaderFactory {
842 template <class Request>
843 static void Create(::grpc::ChannelInterface* channel,
844 const ::grpc::internal::RpcMethod& method,
845 ::grpc::ClientContext* context, const Request* request,
846 ClientReadReactor<Response>* reactor) {
847 grpc::internal::Call call =
848 channel->CreateCall(method, context, channel->CallbackCQ());
850 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
851 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
852 call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
853 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
857 template <class Request>
858 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
860 // always allocated against a call arena, no memory free required
861 static void operator delete(void* /*ptr*/, std::size_t size) {
862 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
865 // This operator should never be called as the memory should be freed as part
866 // of the arena destruction. It only exists to provide a matching operator
867 // delete to the operator new so that some compilers will not complain (see
868 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
869 // there are no tests catching the compiler warning.
870 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
872 void StartCall() override {
873 // This call initiates two batches, plus any backlog, each with a callback
874 // 1. Send initial metadata (unless corked) + recv initial metadata
876 // 3. Recv trailing metadata
878 if (!start_corked_) {
879 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
880 context_->initial_metadata_flags());
882 call_.PerformOps(&start_ops_);
885 grpc::internal::MutexLock lock(&start_mu_);
887 if (backlog_.write_ops) {
888 call_.PerformOps(&write_ops_);
890 if (backlog_.writes_done_ops) {
891 call_.PerformOps(&writes_done_ops_);
893 call_.PerformOps(&finish_ops_);
894 // The last thing in this critical section is to set started_ so that it
895 // can be used lock-free as well.
896 started_.store(true, std::memory_order_release);
898 // MaybeFinish outside the lock to make sure that destruction of this object
899 // doesn't take place while holding the lock (which would cause the lock to
900 // be released after destruction)
901 this->MaybeFinish(/*from_reaction=*/false);
904 void Write(const Request* msg, ::grpc::WriteOptions options) override {
905 if (GPR_UNLIKELY(options.is_last_message())) {
906 options.set_buffer_hint();
907 write_ops_.ClientSendClose();
909 // TODO(vjpai): don't assert
910 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
911 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
913 if (GPR_UNLIKELY(corked_write_needed_)) {
914 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
915 context_->initial_metadata_flags());
916 corked_write_needed_ = false;
919 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
920 grpc::internal::MutexLock lock(&start_mu_);
921 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
922 backlog_.write_ops = true;
926 call_.PerformOps(&write_ops_);
929 void WritesDone() override {
930 writes_done_ops_.ClientSendClose();
931 writes_done_tag_.Set(call_.call(),
933 reactor_->OnWritesDoneDone(ok);
934 MaybeFinish(/*from_reaction=*/true);
936 &writes_done_ops_, /*can_inline=*/false);
937 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
938 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
940 if (GPR_UNLIKELY(corked_write_needed_)) {
941 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
942 context_->initial_metadata_flags());
943 corked_write_needed_ = false;
946 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
947 grpc::internal::MutexLock lock(&start_mu_);
948 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
949 backlog_.writes_done_ops = true;
953 call_.PerformOps(&writes_done_ops_);
956 void AddHold(int holds) override {
957 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
959 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
962 friend class ClientCallbackWriterFactory<Request>;
964 template <class Response>
965 ClientCallbackWriterImpl(::grpc::internal::Call call,
966 ::grpc::ClientContext* context, Response* response,
967 ClientWriteReactor<Request>* reactor)
971 start_corked_(context_->initial_metadata_corked_),
972 corked_write_needed_(start_corked_) {
973 this->BindReactor(reactor);
975 // Set up the unchanging parts of the start and write tags and ops.
976 start_tag_.Set(call_.call(),
978 reactor_->OnReadInitialMetadataDone(ok);
979 MaybeFinish(/*from_reaction=*/true);
981 &start_ops_, /*can_inline=*/false);
982 start_ops_.RecvInitialMetadata(context_);
983 start_ops_.set_core_cq_tag(&start_tag_);
985 write_tag_.Set(call_.call(),
987 reactor_->OnWriteDone(ok);
988 MaybeFinish(/*from_reaction=*/true);
990 &write_ops_, /*can_inline=*/false);
991 write_ops_.set_core_cq_tag(&write_tag_);
993 // Also set up the Finish tag and op set.
994 finish_ops_.RecvMessage(response);
995 finish_ops_.AllowNoMessage();
998 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1000 /*can_inline=*/false);
1001 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1002 finish_ops_.set_core_cq_tag(&finish_tag_);
1005 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1006 void MaybeFinish(bool from_reaction) {
1007 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1008 1, std::memory_order_acq_rel) == 1)) {
1009 ::grpc::Status s = std::move(finish_status_);
1010 auto* reactor = reactor_;
1011 auto* call = call_.call();
1012 this->~ClientCallbackWriterImpl();
1013 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1014 if (GPR_LIKELY(from_reaction)) {
1017 reactor->InternalScheduleOnDone(std::move(s));
1022 ::grpc::ClientContext* const context_;
1023 grpc::internal::Call call_;
1024 ClientWriteReactor<Request>* const reactor_;
1026 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1027 grpc::internal::CallOpRecvInitialMetadata>
1029 grpc::internal::CallbackWithSuccessTag start_tag_;
1030 const bool start_corked_;
1031 bool corked_write_needed_; // no lock needed since only accessed in
1032 // Write/WritesDone which cannot be concurrent
1034 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1035 grpc::internal::CallOpClientRecvStatus>
1037 grpc::internal::CallbackWithSuccessTag finish_tag_;
1038 ::grpc::Status finish_status_;
1040 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1041 grpc::internal::CallOpSendMessage,
1042 grpc::internal::CallOpClientSendClose>
1044 grpc::internal::CallbackWithSuccessTag write_tag_;
1046 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1047 grpc::internal::CallOpClientSendClose>
1049 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1051 struct StartCallBacklog {
1052 bool write_ops = false;
1053 bool writes_done_ops = false;
1055 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1057 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1058 std::atomic<intptr_t> callbacks_outstanding_{3};
1059 std::atomic_bool started_{false};
1060 grpc::internal::Mutex start_mu_;
1063 template <class Request>
1064 class ClientCallbackWriterFactory {
1066 template <class Response>
1067 static void Create(::grpc::ChannelInterface* channel,
1068 const ::grpc::internal::RpcMethod& method,
1069 ::grpc::ClientContext* context, Response* response,
1070 ClientWriteReactor<Request>* reactor) {
1071 grpc::internal::Call call =
1072 channel->CreateCall(method, context, channel->CallbackCQ());
1074 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1075 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1076 call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1077 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1081 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1083 // always allocated against a call arena, no memory free required
1084 static void operator delete(void* /*ptr*/, std::size_t size) {
1085 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1088 // This operator should never be called as the memory should be freed as part
1089 // of the arena destruction. It only exists to provide a matching operator
1090 // delete to the operator new so that some compilers will not complain (see
1091 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1092 // there are no tests catching the compiler warning.
1093 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1095 void StartCall() override {
1096 // This call initiates two batches, each with a callback
1097 // 1. Send initial metadata + write + writes done + recv initial metadata
1098 // 2. Read message, recv trailing metadata
1100 start_tag_.Set(call_.call(),
1102 reactor_->OnReadInitialMetadataDone(ok);
1105 &start_ops_, /*can_inline=*/false);
1106 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1107 context_->initial_metadata_flags());
1108 start_ops_.RecvInitialMetadata(context_);
1109 start_ops_.set_core_cq_tag(&start_tag_);
1110 call_.PerformOps(&start_ops_);
1112 finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1114 /*can_inline=*/false);
1115 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1116 finish_ops_.set_core_cq_tag(&finish_tag_);
1117 call_.PerformOps(&finish_ops_);
1121 friend class ClientCallbackUnaryFactory;
1123 template <class Request, class Response>
1124 ClientCallbackUnaryImpl(::grpc::internal::Call call,
1125 ::grpc::ClientContext* context, Request* request,
1126 Response* response, ClientUnaryReactor* reactor)
1127 : context_(context), call_(call), reactor_(reactor) {
1128 this->BindReactor(reactor);
1129 // TODO(vjpai): don't assert
1130 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1131 start_ops_.ClientSendClose();
1132 finish_ops_.RecvMessage(response);
1133 finish_ops_.AllowNoMessage();
1136 // In the unary case, MaybeFinish is only ever invoked from a
1137 // library-initiated reaction, so it will just directly call OnDone if this is
1138 // the last reaction for this RPC.
1139 void MaybeFinish() {
1140 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1141 1, std::memory_order_acq_rel) == 1)) {
1142 ::grpc::Status s = std::move(finish_status_);
1143 auto* reactor = reactor_;
1144 auto* call = call_.call();
1145 this->~ClientCallbackUnaryImpl();
1146 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1151 ::grpc::ClientContext* const context_;
1152 grpc::internal::Call call_;
1153 ClientUnaryReactor* const reactor_;
1155 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1156 grpc::internal::CallOpSendMessage,
1157 grpc::internal::CallOpClientSendClose,
1158 grpc::internal::CallOpRecvInitialMetadata>
1160 grpc::internal::CallbackWithSuccessTag start_tag_;
1162 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1163 grpc::internal::CallOpClientRecvStatus>
1165 grpc::internal::CallbackWithSuccessTag finish_tag_;
1166 ::grpc::Status finish_status_;
1168 // This call will have 2 callbacks: start and finish
1169 std::atomic<intptr_t> callbacks_outstanding_{2};
1172 class ClientCallbackUnaryFactory {
1174 template <class Request, class Response>
1175 static void Create(::grpc::ChannelInterface* channel,
1176 const ::grpc::internal::RpcMethod& method,
1177 ::grpc::ClientContext* context, const Request* request,
1178 Response* response, ClientUnaryReactor* reactor) {
1179 grpc::internal::Call call =
1180 channel->CreateCall(method, context, channel->CallbackCQ());
1182 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1184 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1185 call.call(), sizeof(ClientCallbackUnaryImpl)))
1186 ClientCallbackUnaryImpl(call, context, request, response, reactor);
1190 } // namespace internal
1192 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1193 namespace experimental {
1195 template <class Response>
1196 using ClientCallbackReader = ::grpc::ClientCallbackReader<Response>;
1198 template <class Request>
1199 using ClientCallbackWriter = ::grpc::ClientCallbackWriter<Request>;
1201 template <class Request, class Response>
1202 using ClientCallbackReaderWriter =
1203 ::grpc::ClientCallbackReaderWriter<Request, Response>;
1205 template <class Response>
1206 using ClientReadReactor = ::grpc::ClientReadReactor<Response>;
1208 template <class Request>
1209 using ClientWriteReactor = ::grpc::ClientWriteReactor<Request>;
1211 template <class Request, class Response>
1212 using ClientBidiReactor = ::grpc::ClientBidiReactor<Request, Response>;
1214 typedef ::grpc::ClientUnaryReactor ClientUnaryReactor;
1216 } // namespace experimental
1219 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H