3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
30 #include <grpcpp/impl/codegen/byte_buffer.h>
31 #include <grpcpp/impl/codegen/call.h>
32 #include <grpcpp/impl/codegen/call_hook.h>
33 #include <grpcpp/impl/codegen/call_op_set_interface.h>
34 #include <grpcpp/impl/codegen/client_context.h>
35 #include <grpcpp/impl/codegen/completion_queue.h>
36 #include <grpcpp/impl/codegen/completion_queue_tag.h>
37 #include <grpcpp/impl/codegen/config.h>
38 #include <grpcpp/impl/codegen/core_codegen_interface.h>
39 #include <grpcpp/impl/codegen/intercepted_channel.h>
40 #include <grpcpp/impl/codegen/interceptor_common.h>
41 #include <grpcpp/impl/codegen/serialization_traits.h>
42 #include <grpcpp/impl/codegen/slice.h>
43 #include <grpcpp/impl/codegen/string_ref.h>
45 #include <grpc/impl/codegen/atm.h>
46 #include <grpc/impl/codegen/compression_types.h>
47 #include <grpc/impl/codegen/grpc_types.h>
51 class CompletionQueue;
52 extern CoreCodegenInterface* g_core_codegen_interface;
58 // TODO(yangg) if the map is changed before we send, the pointers will be a
59 // mess. Make sure it does not happen.
60 inline grpc_metadata* FillMetadataArray(
61 const std::multimap<grpc::string, grpc::string>& metadata,
62 size_t* metadata_count, const grpc::string& optional_error_details) {
63 *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
64 if (*metadata_count == 0) {
67 grpc_metadata* metadata_array =
68 (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
69 (*metadata_count) * sizeof(grpc_metadata)));
71 for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
72 metadata_array[i].key = SliceReferencingString(iter->first);
73 metadata_array[i].value = SliceReferencingString(iter->second);
75 if (!optional_error_details.empty()) {
76 metadata_array[i].key =
77 g_core_codegen_interface->grpc_slice_from_static_buffer(
78 kBinaryErrorDetailsKey, sizeof(kBinaryErrorDetailsKey) - 1);
79 metadata_array[i].value = SliceReferencingString(optional_error_details);
81 return metadata_array;
83 } // namespace internal
85 /// Per-message write options.
88 WriteOptions() : flags_(0), last_message_(false) {}
89 WriteOptions(const WriteOptions& other)
90 : flags_(other.flags_), last_message_(other.last_message_) {}
93 inline void Clear() { flags_ = 0; }
95 /// Returns raw flags bitset.
96 inline uint32_t flags() const { return flags_; }
98 /// Sets flag for the disabling of compression for the next message write.
100 /// \sa GRPC_WRITE_NO_COMPRESS
101 inline WriteOptions& set_no_compression() {
102 SetBit(GRPC_WRITE_NO_COMPRESS);
106 /// Clears flag for the disabling of compression for the next message write.
108 /// \sa GRPC_WRITE_NO_COMPRESS
109 inline WriteOptions& clear_no_compression() {
110 ClearBit(GRPC_WRITE_NO_COMPRESS);
114 /// Get value for the flag indicating whether compression for the next
115 /// message write is forcefully disabled.
117 /// \sa GRPC_WRITE_NO_COMPRESS
118 inline bool get_no_compression() const {
119 return GetBit(GRPC_WRITE_NO_COMPRESS);
122 /// Sets flag indicating that the write may be buffered and need not go out on
123 /// the wire immediately.
125 /// \sa GRPC_WRITE_BUFFER_HINT
126 inline WriteOptions& set_buffer_hint() {
127 SetBit(GRPC_WRITE_BUFFER_HINT);
131 /// Clears flag indicating that the write may be buffered and need not go out
132 /// on the wire immediately.
134 /// \sa GRPC_WRITE_BUFFER_HINT
135 inline WriteOptions& clear_buffer_hint() {
136 ClearBit(GRPC_WRITE_BUFFER_HINT);
140 /// Get value for the flag indicating that the write may be buffered and need
141 /// not go out on the wire immediately.
143 /// \sa GRPC_WRITE_BUFFER_HINT
144 inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
146 /// corked bit: aliases set_buffer_hint currently, with the intent that
147 /// set_buffer_hint will be removed in the future
148 inline WriteOptions& set_corked() {
149 SetBit(GRPC_WRITE_BUFFER_HINT);
153 inline WriteOptions& clear_corked() {
154 ClearBit(GRPC_WRITE_BUFFER_HINT);
158 inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
160 /// last-message bit: indicates this is the last message in a stream
161 /// client-side: makes Write the equivalent of performing Write, WritesDone
163 /// server-side: hold the Write until the service handler returns (sync api)
164 /// or until Finish is called (async api)
165 inline WriteOptions& set_last_message() {
166 last_message_ = true;
170 /// Clears flag indicating that this is the last message in a stream,
171 /// disabling coalescing.
172 inline WriteOptions& clear_last_message() {
173 last_message_ = false;
177 /// Guarantee that all bytes have been written to the socket before completing
178 /// this write (usually writes are completed when they pass flow control).
179 inline WriteOptions& set_write_through() {
180 SetBit(GRPC_WRITE_THROUGH);
184 inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
186 /// Get value for the flag indicating that this is the last message, and
187 /// should be coalesced with trailing metadata.
189 /// \sa GRPC_WRITE_LAST_MESSAGE
190 bool is_last_message() const { return last_message_; }
192 WriteOptions& operator=(const WriteOptions& rhs) {
198 void SetBit(const uint32_t mask) { flags_ |= mask; }
200 void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
202 bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
210 /// Default argument for CallOpSet. I is unused by the class, but can be
211 /// used for generating multiple names for the same thing.
215 void AddOp(grpc_op* ops, size_t* nops) {}
216 void FinishOp(bool* status) {}
217 void SetInterceptionHookPoint(
218 InterceptorBatchMethodsImpl* interceptor_methods) {}
219 void SetFinishInterceptionHookPoint(
220 InterceptorBatchMethodsImpl* interceptor_methods) {}
221 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
224 class CallOpSendInitialMetadata {
226 CallOpSendInitialMetadata() : send_(false) {
227 maybe_compression_level_.is_set = false;
230 void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
232 maybe_compression_level_.is_set = false;
235 metadata_map_ = metadata;
238 void set_compression_level(grpc_compression_level level) {
239 maybe_compression_level_.is_set = true;
240 maybe_compression_level_.level = level;
244 void AddOp(grpc_op* ops, size_t* nops) {
245 if (!send_ || hijacked_) return;
246 grpc_op* op = &ops[(*nops)++];
247 op->op = GRPC_OP_SEND_INITIAL_METADATA;
251 FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
252 op->data.send_initial_metadata.count = initial_metadata_count_;
253 op->data.send_initial_metadata.metadata = initial_metadata_;
254 op->data.send_initial_metadata.maybe_compression_level.is_set =
255 maybe_compression_level_.is_set;
256 if (maybe_compression_level_.is_set) {
257 op->data.send_initial_metadata.maybe_compression_level.level =
258 maybe_compression_level_.level;
261 void FinishOp(bool* status) {
262 if (!send_ || hijacked_) return;
263 g_core_codegen_interface->gpr_free(initial_metadata_);
267 void SetInterceptionHookPoint(
268 InterceptorBatchMethodsImpl* interceptor_methods) {
270 interceptor_methods->AddInterceptionHookPoint(
271 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
272 interceptor_methods->SetSendInitialMetadata(metadata_map_);
275 void SetFinishInterceptionHookPoint(
276 InterceptorBatchMethodsImpl* interceptor_methods) {}
278 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
282 bool hijacked_ = false;
285 size_t initial_metadata_count_;
286 std::multimap<grpc::string, grpc::string>* metadata_map_;
287 grpc_metadata* initial_metadata_;
290 grpc_compression_level level;
291 } maybe_compression_level_;
294 class CallOpSendMessage {
296 CallOpSendMessage() : send_buf_() {}
298 /// Send \a message using \a options for the write. The \a options are cleared
301 Status SendMessage(const M& message,
302 WriteOptions options) GRPC_MUST_USE_RESULT;
305 Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
307 /// Send \a message using \a options for the write. The \a options are cleared
308 /// after use. This form of SendMessage allows gRPC to reference \a message
309 /// beyond the lifetime of SendMessage.
311 Status SendMessagePtr(const M* message,
312 WriteOptions options) GRPC_MUST_USE_RESULT;
314 /// This form of SendMessage allows gRPC to reference \a message beyond the
315 /// lifetime of SendMessage.
317 Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
320 void AddOp(grpc_op* ops, size_t* nops) {
321 if (msg_ == nullptr && !send_buf_.Valid()) return;
323 serializer_ = nullptr;
326 if (msg_ != nullptr) {
327 GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
329 serializer_ = nullptr;
330 grpc_op* op = &ops[(*nops)++];
331 op->op = GRPC_OP_SEND_MESSAGE;
332 op->flags = write_options_.flags();
334 op->data.send_message.send_message = send_buf_.c_buffer();
335 // Flags are per-message: clear them after use.
336 write_options_.Clear();
338 void FinishOp(bool* status) {
339 if (msg_ == nullptr && !send_buf_.Valid()) return;
340 if (hijacked_ && failed_send_) {
341 // Hijacking interceptor failed this Op
343 } else if (!*status) {
344 // This Op was passed down to core and the Op failed
349 void SetInterceptionHookPoint(
350 InterceptorBatchMethodsImpl* interceptor_methods) {
351 if (msg_ == nullptr && !send_buf_.Valid()) return;
352 interceptor_methods->AddInterceptionHookPoint(
353 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
354 interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
358 void SetFinishInterceptionHookPoint(
359 InterceptorBatchMethodsImpl* interceptor_methods) {
360 if (msg_ != nullptr || send_buf_.Valid()) {
361 interceptor_methods->AddInterceptionHookPoint(
362 experimental::InterceptionHookPoints::POST_SEND_MESSAGE);
366 // The contents of the SendMessage value that was previously set
367 // has had its references stolen by core's operations
368 interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
372 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
377 const void* msg_ = nullptr; // The original non-serialized message
378 bool hijacked_ = false;
379 bool failed_send_ = false;
380 ByteBuffer send_buf_;
381 WriteOptions write_options_;
382 std::function<Status(const void*)> serializer_;
386 Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
387 write_options_ = options;
388 serializer_ = [this](const void* message) {
391 // TODO(vjpai): Remove the void below when possible
392 // The void in the template parameter below should not be needed
393 // (since it should be implicit) but is needed due to an observed
394 // difference in behavior between clang and gcc for certain internal users
395 Status result = SerializationTraits<M, void>::Serialize(
396 *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
398 send_buf_.Duplicate();
402 // Serialize immediately only if we do not have access to the message pointer
403 if (msg_ == nullptr) {
404 Status result = serializer_(&message);
405 serializer_ = nullptr;
412 Status CallOpSendMessage::SendMessage(const M& message) {
413 return SendMessage(message, WriteOptions());
417 Status CallOpSendMessage::SendMessagePtr(const M* message,
418 WriteOptions options) {
420 return SendMessage(*message, options);
424 Status CallOpSendMessage::SendMessagePtr(const M* message) {
426 return SendMessage(*message, WriteOptions());
430 class CallOpRecvMessage {
433 : got_message(false),
435 allow_not_getting_message_(false) {}
437 void RecvMessage(R* message) { message_ = message; }
439 // Do not change status if no message is received.
440 void AllowNoMessage() { allow_not_getting_message_ = true; }
445 void AddOp(grpc_op* ops, size_t* nops) {
446 if (message_ == nullptr || hijacked_) return;
447 grpc_op* op = &ops[(*nops)++];
448 op->op = GRPC_OP_RECV_MESSAGE;
451 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
454 void FinishOp(bool* status) {
455 if (message_ == nullptr || hijacked_) return;
456 if (recv_buf_.Valid()) {
458 got_message = *status =
459 SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
468 if (!allow_not_getting_message_) {
475 void SetInterceptionHookPoint(
476 InterceptorBatchMethodsImpl* interceptor_methods) {
477 if (message_ == nullptr) return;
478 interceptor_methods->SetRecvMessage(message_, &got_message);
481 void SetFinishInterceptionHookPoint(
482 InterceptorBatchMethodsImpl* interceptor_methods) {
483 if (message_ == nullptr) return;
484 interceptor_methods->AddInterceptionHookPoint(
485 experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
486 if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
488 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
490 if (message_ == nullptr) return;
491 interceptor_methods->AddInterceptionHookPoint(
492 experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
498 ByteBuffer recv_buf_;
499 bool allow_not_getting_message_;
500 bool hijacked_ = false;
503 class DeserializeFunc {
505 virtual Status Deserialize(ByteBuffer* buf) = 0;
506 virtual ~DeserializeFunc() {}
510 class DeserializeFuncType final : public DeserializeFunc {
512 DeserializeFuncType(R* message) : message_(message) {}
513 Status Deserialize(ByteBuffer* buf) override {
514 return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
517 ~DeserializeFuncType() override {}
520 R* message_; // Not a managed pointer because management is external to this
523 class CallOpGenericRecvMessage {
525 CallOpGenericRecvMessage()
526 : got_message(false), allow_not_getting_message_(false) {}
529 void RecvMessage(R* message) {
530 // Use an explicit base class pointer to avoid resolution error in the
531 // following unique_ptr::reset for some old implementations.
532 DeserializeFunc* func = new DeserializeFuncType<R>(message);
533 deserialize_.reset(func);
537 // Do not change status if no message is received.
538 void AllowNoMessage() { allow_not_getting_message_ = true; }
543 void AddOp(grpc_op* ops, size_t* nops) {
544 if (!deserialize_ || hijacked_) return;
545 grpc_op* op = &ops[(*nops)++];
546 op->op = GRPC_OP_RECV_MESSAGE;
549 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
552 void FinishOp(bool* status) {
553 if (!deserialize_ || hijacked_) return;
554 if (recv_buf_.Valid()) {
557 *status = deserialize_->Deserialize(&recv_buf_).ok();
565 if (!allow_not_getting_message_) {
569 deserialize_.reset();
572 void SetInterceptionHookPoint(
573 InterceptorBatchMethodsImpl* interceptor_methods) {
574 if (!deserialize_) return;
575 interceptor_methods->SetRecvMessage(message_, &got_message);
578 void SetFinishInterceptionHookPoint(
579 InterceptorBatchMethodsImpl* interceptor_methods) {
580 if (!deserialize_) return;
581 interceptor_methods->AddInterceptionHookPoint(
582 experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
583 if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
585 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
587 if (!deserialize_) return;
588 interceptor_methods->AddInterceptionHookPoint(
589 experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
595 bool hijacked_ = false;
596 std::unique_ptr<DeserializeFunc> deserialize_;
597 ByteBuffer recv_buf_;
598 bool allow_not_getting_message_;
601 class CallOpClientSendClose {
603 CallOpClientSendClose() : send_(false) {}
605 void ClientSendClose() { send_ = true; }
608 void AddOp(grpc_op* ops, size_t* nops) {
609 if (!send_ || hijacked_) return;
610 grpc_op* op = &ops[(*nops)++];
611 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
615 void FinishOp(bool* status) { send_ = false; }
617 void SetInterceptionHookPoint(
618 InterceptorBatchMethodsImpl* interceptor_methods) {
620 interceptor_methods->AddInterceptionHookPoint(
621 experimental::InterceptionHookPoints::PRE_SEND_CLOSE);
624 void SetFinishInterceptionHookPoint(
625 InterceptorBatchMethodsImpl* interceptor_methods) {}
627 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
632 bool hijacked_ = false;
636 class CallOpServerSendStatus {
638 CallOpServerSendStatus() : send_status_available_(false) {}
640 void ServerSendStatus(
641 std::multimap<grpc::string, grpc::string>* trailing_metadata,
642 const Status& status) {
643 send_error_details_ = status.error_details();
644 metadata_map_ = trailing_metadata;
645 send_status_available_ = true;
646 send_status_code_ = static_cast<grpc_status_code>(status.error_code());
647 send_error_message_ = status.error_message();
651 void AddOp(grpc_op* ops, size_t* nops) {
652 if (!send_status_available_ || hijacked_) return;
653 trailing_metadata_ = FillMetadataArray(
654 *metadata_map_, &trailing_metadata_count_, send_error_details_);
655 grpc_op* op = &ops[(*nops)++];
656 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
657 op->data.send_status_from_server.trailing_metadata_count =
658 trailing_metadata_count_;
659 op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
660 op->data.send_status_from_server.status = send_status_code_;
661 error_message_slice_ = SliceReferencingString(send_error_message_);
662 op->data.send_status_from_server.status_details =
663 send_error_message_.empty() ? nullptr : &error_message_slice_;
668 void FinishOp(bool* status) {
669 if (!send_status_available_ || hijacked_) return;
670 g_core_codegen_interface->gpr_free(trailing_metadata_);
671 send_status_available_ = false;
674 void SetInterceptionHookPoint(
675 InterceptorBatchMethodsImpl* interceptor_methods) {
676 if (!send_status_available_) return;
677 interceptor_methods->AddInterceptionHookPoint(
678 experimental::InterceptionHookPoints::PRE_SEND_STATUS);
679 interceptor_methods->SetSendTrailingMetadata(metadata_map_);
680 interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
681 &send_error_message_);
684 void SetFinishInterceptionHookPoint(
685 InterceptorBatchMethodsImpl* interceptor_methods) {}
687 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
692 bool hijacked_ = false;
693 bool send_status_available_;
694 grpc_status_code send_status_code_;
695 grpc::string send_error_details_;
696 grpc::string send_error_message_;
697 size_t trailing_metadata_count_;
698 std::multimap<grpc::string, grpc::string>* metadata_map_;
699 grpc_metadata* trailing_metadata_;
700 grpc_slice error_message_slice_;
703 class CallOpRecvInitialMetadata {
705 CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
707 void RecvInitialMetadata(ClientContext* context) {
708 context->initial_metadata_received_ = true;
709 metadata_map_ = &context->recv_initial_metadata_;
713 void AddOp(grpc_op* ops, size_t* nops) {
714 if (metadata_map_ == nullptr || hijacked_) return;
715 grpc_op* op = &ops[(*nops)++];
716 op->op = GRPC_OP_RECV_INITIAL_METADATA;
717 op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
722 void FinishOp(bool* status) {
723 if (metadata_map_ == nullptr || hijacked_) return;
726 void SetInterceptionHookPoint(
727 InterceptorBatchMethodsImpl* interceptor_methods) {
728 interceptor_methods->SetRecvInitialMetadata(metadata_map_);
731 void SetFinishInterceptionHookPoint(
732 InterceptorBatchMethodsImpl* interceptor_methods) {
733 if (metadata_map_ == nullptr) return;
734 interceptor_methods->AddInterceptionHookPoint(
735 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
736 metadata_map_ = nullptr;
739 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
741 if (metadata_map_ == nullptr) return;
742 interceptor_methods->AddInterceptionHookPoint(
743 experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA);
747 bool hijacked_ = false;
748 MetadataMap* metadata_map_;
751 class CallOpClientRecvStatus {
753 CallOpClientRecvStatus()
754 : recv_status_(nullptr), debug_error_string_(nullptr) {}
756 void ClientRecvStatus(ClientContext* context, Status* status) {
757 client_context_ = context;
758 metadata_map_ = &client_context_->trailing_metadata_;
759 recv_status_ = status;
760 error_message_ = g_core_codegen_interface->grpc_empty_slice();
764 void AddOp(grpc_op* ops, size_t* nops) {
765 if (recv_status_ == nullptr || hijacked_) return;
766 grpc_op* op = &ops[(*nops)++];
767 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
768 op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
769 op->data.recv_status_on_client.status = &status_code_;
770 op->data.recv_status_on_client.status_details = &error_message_;
771 op->data.recv_status_on_client.error_string = &debug_error_string_;
776 void FinishOp(bool* status) {
777 if (recv_status_ == nullptr || hijacked_) return;
778 grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
780 Status(static_cast<StatusCode>(status_code_),
781 GRPC_SLICE_IS_EMPTY(error_message_)
783 : grpc::string(GRPC_SLICE_START_PTR(error_message_),
784 GRPC_SLICE_END_PTR(error_message_)),
785 binary_error_details);
786 client_context_->set_debug_error_string(
787 debug_error_string_ != nullptr ? debug_error_string_ : "");
788 g_core_codegen_interface->grpc_slice_unref(error_message_);
789 if (debug_error_string_ != nullptr) {
790 g_core_codegen_interface->gpr_free((void*)debug_error_string_);
794 void SetInterceptionHookPoint(
795 InterceptorBatchMethodsImpl* interceptor_methods) {
796 interceptor_methods->SetRecvStatus(recv_status_);
797 interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
800 void SetFinishInterceptionHookPoint(
801 InterceptorBatchMethodsImpl* interceptor_methods) {
802 if (recv_status_ == nullptr) return;
803 interceptor_methods->AddInterceptionHookPoint(
804 experimental::InterceptionHookPoints::POST_RECV_STATUS);
805 recv_status_ = nullptr;
808 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
810 if (recv_status_ == nullptr) return;
811 interceptor_methods->AddInterceptionHookPoint(
812 experimental::InterceptionHookPoints::PRE_RECV_STATUS);
816 bool hijacked_ = false;
817 ClientContext* client_context_;
818 MetadataMap* metadata_map_;
819 Status* recv_status_;
820 const char* debug_error_string_;
821 grpc_status_code status_code_;
822 grpc_slice error_message_;
825 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
826 class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
827 class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
830 /// Primary implementation of CallOpSetInterface.
831 /// Since we cannot use variadic templates, we declare slots up to
832 /// the maximum count of ops we'll need in a set. We leverage the
833 /// empty base class optimization to slim this class (especially
834 /// when there are many unused slots used). To avoid duplicate base classes,
835 /// the template parmeter for CallNoOp is varied by argument position.
836 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
837 class CallOpSet : public CallOpSetInterface,
845 CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
846 // The copy constructor and assignment operator reset the value of
847 // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
848 // since those are only meaningful on a specific object, not across objects.
849 CallOpSet(const CallOpSet& other)
850 : core_cq_tag_(this),
853 done_intercepting_(false),
854 interceptor_methods_(InterceptorBatchMethodsImpl()) {}
856 CallOpSet& operator=(const CallOpSet& other) {
860 done_intercepting_ = false;
861 interceptor_methods_ = InterceptorBatchMethodsImpl();
865 void FillOps(Call* call) override {
866 done_intercepting_ = false;
867 g_core_codegen_interface->grpc_call_ref(call->call());
869 *call; // It's fine to create a copy of call since it's just pointers
871 if (RunInterceptors()) {
872 ContinueFillOpsAfterInterception();
874 // After the interceptors are run, ContinueFillOpsAfterInterception will
879 bool FinalizeResult(void** tag, bool* status) override {
880 if (done_intercepting_) {
881 // Complete the avalanching since we are done with this batch of ops
882 call_.cq()->CompleteAvalanching();
883 // We have already finished intercepting and filling in the results. This
884 // round trip from the core needed to be made because interceptors were
887 *status = saved_status_;
888 g_core_codegen_interface->grpc_call_unref(call_.call());
892 this->Op1::FinishOp(status);
893 this->Op2::FinishOp(status);
894 this->Op3::FinishOp(status);
895 this->Op4::FinishOp(status);
896 this->Op5::FinishOp(status);
897 this->Op6::FinishOp(status);
898 saved_status_ = *status;
899 if (RunInterceptorsPostRecv()) {
901 g_core_codegen_interface->grpc_call_unref(call_.call());
904 // Interceptors are going to be run, so we can't return the tag just yet.
905 // After the interceptors are run, ContinueFinalizeResultAfterInterception
909 void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
911 void* core_cq_tag() override { return core_cq_tag_; }
913 /// set_core_cq_tag is used to provide a different core CQ tag than "this".
914 /// This is used for callback-based tags, where the core tag is the core
915 /// callback function. It does not change the use or behavior of any other
916 /// function (such as FinalizeResult)
917 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
919 // This will be called while interceptors are run if the RPC is a hijacked
920 // RPC. This should set hijacking state for each of the ops.
921 void SetHijackingState() override {
922 this->Op1::SetHijackingState(&interceptor_methods_);
923 this->Op2::SetHijackingState(&interceptor_methods_);
924 this->Op3::SetHijackingState(&interceptor_methods_);
925 this->Op4::SetHijackingState(&interceptor_methods_);
926 this->Op5::SetHijackingState(&interceptor_methods_);
927 this->Op6::SetHijackingState(&interceptor_methods_);
930 // Should be called after interceptors are done running
931 void ContinueFillOpsAfterInterception() override {
932 static const size_t MAX_OPS = 6;
933 grpc_op ops[MAX_OPS];
935 this->Op1::AddOp(ops, &nops);
936 this->Op2::AddOp(ops, &nops);
937 this->Op3::AddOp(ops, &nops);
938 this->Op4::AddOp(ops, &nops);
939 this->Op5::AddOp(ops, &nops);
940 this->Op6::AddOp(ops, &nops);
941 GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
942 g_core_codegen_interface->grpc_call_start_batch(
943 call_.call(), ops, nops, core_cq_tag(), nullptr));
946 // Should be called after interceptors are done running on the finalize result
948 void ContinueFinalizeResultAfterInterception() override {
949 done_intercepting_ = true;
950 GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
951 g_core_codegen_interface->grpc_call_start_batch(
952 call_.call(), nullptr, 0, core_cq_tag(), nullptr));
956 // Returns true if no interceptors need to be run
957 bool RunInterceptors() {
958 interceptor_methods_.ClearState();
959 interceptor_methods_.SetCallOpSetInterface(this);
960 interceptor_methods_.SetCall(&call_);
961 this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
962 this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
963 this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
964 this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
965 this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
966 this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
967 if (interceptor_methods_.InterceptorsListEmpty()) {
970 // This call will go through interceptors and would need to
971 // schedule new batches, so delay completion queue shutdown
972 call_.cq()->RegisterAvalanching();
973 return interceptor_methods_.RunInterceptors();
975 // Returns true if no interceptors need to be run
976 bool RunInterceptorsPostRecv() {
977 // Call and OpSet had already been set on the set state.
978 // SetReverse also clears previously set hook points
979 interceptor_methods_.SetReverse();
980 this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
981 this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
982 this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
983 this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
984 this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
985 this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
986 return interceptor_methods_.RunInterceptors();
992 bool done_intercepting_ = false;
993 InterceptorBatchMethodsImpl interceptor_methods_;
997 } // namespace internal
1000 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H