#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/map.h"
+#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
-using grpc_core::internal::ClientChannelMethodParams;
-using grpc_core::internal::ClientChannelMethodParamsTable;
-using grpc_core::internal::ProcessedResolverResult;
+using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;
-using grpc_core::LoadBalancingPolicy;
-
-/* Client channel implementation */
+//
+// Client channel filter
+//
// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
-grpc_core::TraceFlag grpc_client_channel_call_trace(false,
- "client_channel_call");
-grpc_core::TraceFlag grpc_client_channel_routing_trace(
- false, "client_channel_routing");
+// Max number of batches that can be pending on a call at any given
+// time. This includes one batch for each of the following ops:
+// recv_initial_metadata
+// send_initial_metadata
+// recv_message
+// send_message
+// recv_trailing_metadata
+// send_trailing_metadata
+#define MAX_PENDING_BATCHES 6
-/*************************************************************************
- * CHANNEL-WIDE FUNCTIONS
- */
+namespace grpc_core {
+
+TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
+TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
+
+namespace {
+
+//
+// ChannelData definition
+//
+
+class ChannelData {
+ public:
+ struct QueuedPick {
+ grpc_call_element* elem;
+ QueuedPick* next = nullptr;
+ };
+
+ static grpc_error* Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args);
+ static void Destroy(grpc_channel_element* elem);
+ static void StartTransportOp(grpc_channel_element* elem,
+ grpc_transport_op* op);
+ static void GetChannelInfo(grpc_channel_element* elem,
+ const grpc_channel_info* info);
+
+ bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
+ bool enable_retries() const { return enable_retries_; }
+ size_t per_rpc_retry_buffer_size() const {
+ return per_rpc_retry_buffer_size_;
+ }
+
+ // Note: Does NOT return a new ref.
+ grpc_error* disconnect_error() const {
+ return disconnect_error_.Load(MemoryOrder::ACQUIRE);
+ }
+
+ grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
+
+ LoadBalancingPolicy::SubchannelPicker* picker() const {
+ return picker_.get();
+ }
+ void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
+ void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
+
+ bool received_service_config_data() const {
+ return received_service_config_data_;
+ }
+ RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
+ return retry_throttle_data_;
+ }
+ RefCountedPtr<ServiceConfig> service_config() const {
+ return service_config_;
+ }
+
+ grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
+ void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
+ grpc_connectivity_state* state,
+ grpc_closure* on_complete,
+ grpc_closure* watcher_timer_init) {
+ // Will delete itself.
+ New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
+ watcher_timer_init);
+ }
+ int NumExternalConnectivityWatchers() const {
+ return external_connectivity_watcher_list_.size();
+ }
+
+ private:
+ class ConnectivityStateAndPickerSetter;
+ class ServiceConfigSetter;
+ class GrpcSubchannel;
+ class ClientChannelControlHelper;
+
+ class ExternalConnectivityWatcher {
+ public:
+ class WatcherList {
+ public:
+ WatcherList() { gpr_mu_init(&mu_); }
+ ~WatcherList() { gpr_mu_destroy(&mu_); }
+
+ int size() const;
+ ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
+ void Add(ExternalConnectivityWatcher* watcher);
+ void Remove(const ExternalConnectivityWatcher* watcher);
+
+ private:
+ // head_ is guarded by a mutex, since the size() method needs to
+ // iterate over the list, and it's called from the C-core API
+ // function grpc_channel_num_external_connectivity_watchers(), which
+ // is synchronous and therefore cannot run in the combiner.
+ mutable gpr_mu mu_;
+ ExternalConnectivityWatcher* head_ = nullptr;
+ };
+
+ ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
+ grpc_connectivity_state* state,
+ grpc_closure* on_complete,
+ grpc_closure* watcher_timer_init);
+
+ ~ExternalConnectivityWatcher();
+
+ private:
+ static void OnWatchCompleteLocked(void* arg, grpc_error* error);
+ static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
+
+ ChannelData* chand_;
+ grpc_polling_entity pollent_;
+ grpc_connectivity_state* state_;
+ grpc_closure* on_complete_;
+ grpc_closure* watcher_timer_init_;
+ grpc_closure my_closure_;
+ ExternalConnectivityWatcher* next_ = nullptr;
+ };
+
+ ChannelData(grpc_channel_element_args* args, grpc_error** error);
+ ~ChannelData();
+
+ static bool ProcessResolverResultLocked(
+ void* arg, const Resolver::Result& result, const char** lb_policy_name,
+ RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
+ grpc_error** service_config_error);
+
+ grpc_error* DoPingLocked(grpc_transport_op* op);
+
+ static void StartTransportOpLocked(void* arg, grpc_error* ignored);
+
+ static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
+
+ void ProcessLbPolicy(
+ const Resolver::Result& resolver_result,
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+ UniquePtr<char>* lb_policy_name,
+ RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
+
+ //
+ // Fields set at construction and never modified.
+ //
+ const bool deadline_checking_enabled_;
+ const bool enable_retries_;
+ const size_t per_rpc_retry_buffer_size_;
+ grpc_channel_stack* owning_stack_;
+ ClientChannelFactory* client_channel_factory_;
+ UniquePtr<char> server_name_;
+ RefCountedPtr<ServiceConfig> default_service_config_;
+ channelz::ChannelNode* channelz_node_;
+
+ //
+ // Fields used in the data plane. Guarded by data_plane_combiner.
+ //
+ grpc_combiner* data_plane_combiner_;
+ UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
+ QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
+ // Data from service config.
+ bool received_service_config_data_ = false;
+ RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+ RefCountedPtr<ServiceConfig> service_config_;
+
+ //
+ // Fields used in the control plane. Guarded by combiner.
+ //
+ grpc_combiner* combiner_;
+ grpc_pollset_set* interested_parties_;
+ RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
+ OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
+ grpc_connectivity_state_tracker state_tracker_;
+ ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
+ UniquePtr<char> health_check_service_name_;
+ RefCountedPtr<ServiceConfig> saved_service_config_;
+ bool received_first_resolver_result_ = false;
+ Map<Subchannel*, int> subchannel_refcount_map_;
-struct external_connectivity_watcher;
+ //
+ // Fields accessed from both data plane and control plane combiners.
+ //
+ Atomic<grpc_error*> disconnect_error_;
-struct QueuedPick {
- LoadBalancingPolicy::PickArgs pick;
- grpc_call_element* elem;
- QueuedPick* next = nullptr;
+ //
+ // Fields guarded by a mutex, since they need to be accessed
+ // synchronously via get_channel_info().
+ //
+ gpr_mu info_mu_;
+ UniquePtr<char> info_lb_policy_name_;
+ UniquePtr<char> info_service_config_json_;
};
-typedef struct client_channel_channel_data {
- bool deadline_checking_enabled;
- bool enable_retries;
- size_t per_rpc_retry_buffer_size;
-
- /** combiner protecting all variables below in this data structure */
- grpc_combiner* combiner;
- /** owning stack */
- grpc_channel_stack* owning_stack;
- /** interested parties (owned) */
- grpc_pollset_set* interested_parties;
- // Client channel factory.
- grpc_core::ClientChannelFactory* client_channel_factory;
- // Subchannel pool.
- grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
-
- grpc_core::channelz::ClientChannelNode* channelz_node;
-
- // Resolving LB policy.
- grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
- // Subchannel picker from LB policy.
- grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
- // Linked list of queued picks.
- QueuedPick* queued_picks;
-
- bool have_service_config;
- /** retry throttle data from service config */
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
- /** per-method service config data */
- grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
-
- /* the following properties are guarded by a mutex since APIs require them
- to be instantaneously available */
- gpr_mu info_mu;
- grpc_core::UniquePtr<char> info_lb_policy_name;
- grpc_core::UniquePtr<char> info_service_config_json;
-
- grpc_connectivity_state_tracker state_tracker;
- grpc_error* disconnect_error;
-
- /* external_connectivity_watcher_list head is guarded by its own mutex, since
- * counts need to be grabbed immediately without polling on a cq */
- gpr_mu external_connectivity_watcher_list_mu;
- struct external_connectivity_watcher* external_connectivity_watcher_list_head;
-} channel_data;
-
-// Forward declarations.
-static void start_pick_locked(void* arg, grpc_error* ignored);
-static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem);
-
-static const char* get_channel_connectivity_state_change_string(
- grpc_connectivity_state state) {
- switch (state) {
- case GRPC_CHANNEL_IDLE:
- return "Channel state change to IDLE";
- case GRPC_CHANNEL_CONNECTING:
- return "Channel state change to CONNECTING";
- case GRPC_CHANNEL_READY:
- return "Channel state change to READY";
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- return "Channel state change to TRANSIENT_FAILURE";
- case GRPC_CHANNEL_SHUTDOWN:
- return "Channel state change to SHUTDOWN";
+//
+// CallData definition
+//
+
+class CallData {
+ public:
+ static grpc_error* Init(grpc_call_element* elem,
+ const grpc_call_element_args* args);
+ static void Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure);
+ static void StartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
+ static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
+
+ RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
+
+ // Invoked by channel for queued picks once resolver results are available.
+ void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
+
+ // Invoked by channel for queued picks when the picker is updated.
+ static void StartPickLocked(void* arg, grpc_error* error);
+
+ private:
+ class QueuedPickCanceller;
+
+ class LbCallState : public LoadBalancingPolicy::CallState {
+ public:
+ explicit LbCallState(CallData* calld) : calld_(calld) {}
+
+ void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }
+
+ private:
+ CallData* calld_;
+ };
+
+ // State used for starting a retryable batch on a subchannel call.
+ // This provides its own grpc_transport_stream_op_batch and other data
+ // structures needed to populate the ops in the batch.
+ // We allocate one struct on the arena for each attempt at starting a
+ // batch on a given subchannel call.
+ struct SubchannelCallBatchData {
+ // Creates a SubchannelCallBatchData object on the call's arena with the
+ // specified refcount. If set_on_complete is true, the batch's
+ // on_complete callback will be set to point to on_complete();
+ // otherwise, the batch's on_complete callback will be null.
+ static SubchannelCallBatchData* Create(grpc_call_element* elem,
+ int refcount, bool set_on_complete);
+
+ void Unref() {
+ if (gpr_unref(&refs)) Destroy();
+ }
+
+ SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
+ int refcount, bool set_on_complete);
+ // All dtor code must be added in `Destroy()`. This is because we may
+ // call closures in `SubchannelCallBatchData` after they are unrefed by
+ // `Unref()`, and msan would complain about accessing this class
+ // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
+ // TODO(soheil): We should try to call the dtor in `Unref()`.
+ ~SubchannelCallBatchData() { Destroy(); }
+ void Destroy();
+
+ gpr_refcount refs;
+ grpc_call_element* elem;
+ RefCountedPtr<SubchannelCall> subchannel_call;
+ // The batch to use in the subchannel call.
+ // Its payload field points to SubchannelCallRetryState::batch_payload.
+ grpc_transport_stream_op_batch batch;
+ // For intercepting on_complete.
+ grpc_closure on_complete;
+ };
+
+ // Retry state associated with a subchannel call.
+ // Stored in the parent_data of the subchannel call object.
+ struct SubchannelCallRetryState {
+ explicit SubchannelCallRetryState(grpc_call_context_element* context)
+ : batch_payload(context),
+ started_send_initial_metadata(false),
+ completed_send_initial_metadata(false),
+ started_send_trailing_metadata(false),
+ completed_send_trailing_metadata(false),
+ started_recv_initial_metadata(false),
+ completed_recv_initial_metadata(false),
+ started_recv_trailing_metadata(false),
+ completed_recv_trailing_metadata(false),
+ retry_dispatched(false) {}
+
+ // SubchannelCallBatchData.batch.payload points to this.
+ grpc_transport_stream_op_batch_payload batch_payload;
+ // For send_initial_metadata.
+ // Note that we need to make a copy of the initial metadata for each
+ // subchannel call instead of just referring to the copy in call_data,
+ // because filters in the subchannel stack will probably add entries,
+ // so we need to start in a pristine state for each attempt of the call.
+ grpc_linked_mdelem* send_initial_metadata_storage;
+ grpc_metadata_batch send_initial_metadata;
+ // For send_message.
+ // TODO(roth): Restructure this to eliminate use of ManualConstructor.
+ ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
+ // For send_trailing_metadata.
+ grpc_linked_mdelem* send_trailing_metadata_storage;
+ grpc_metadata_batch send_trailing_metadata;
+ // For intercepting recv_initial_metadata.
+ grpc_metadata_batch recv_initial_metadata;
+ grpc_closure recv_initial_metadata_ready;
+ bool trailing_metadata_available = false;
+ // For intercepting recv_message.
+ grpc_closure recv_message_ready;
+ OrphanablePtr<ByteStream> recv_message;
+ // For intercepting recv_trailing_metadata.
+ grpc_metadata_batch recv_trailing_metadata;
+ grpc_transport_stream_stats collect_stats;
+ grpc_closure recv_trailing_metadata_ready;
+ // These fields indicate which ops have been started and completed on
+ // this subchannel call.
+ size_t started_send_message_count = 0;
+ size_t completed_send_message_count = 0;
+ size_t started_recv_message_count = 0;
+ size_t completed_recv_message_count = 0;
+ bool started_send_initial_metadata : 1;
+ bool completed_send_initial_metadata : 1;
+ bool started_send_trailing_metadata : 1;
+ bool completed_send_trailing_metadata : 1;
+ bool started_recv_initial_metadata : 1;
+ bool completed_recv_initial_metadata : 1;
+ bool started_recv_trailing_metadata : 1;
+ bool completed_recv_trailing_metadata : 1;
+ // State for callback processing.
+ SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
+ nullptr;
+ grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
+ SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
+ grpc_error* recv_message_error = GRPC_ERROR_NONE;
+ SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
+ // NOTE: Do not move this next to the metadata bitfields above. That would
+ // save space but will also result in a data race because compiler
+ // will generate a 2 byte store which overwrites the meta-data
+ // fields upon setting this field.
+ bool retry_dispatched : 1;
+ };
+
+ // Pending batches stored in call data.
+ struct PendingBatch {
+ // The pending batch. If nullptr, this slot is empty.
+ grpc_transport_stream_op_batch* batch;
+ // Indicates whether payload for send ops has been cached in CallData.
+ bool send_ops_cached;
+ };
+
+ CallData(grpc_call_element* elem, const ChannelData& chand,
+ const grpc_call_element_args& args);
+ ~CallData();
+
+ // Caches data for send ops so that it can be retried later, if not
+ // already cached.
+ void MaybeCacheSendOpsForBatch(PendingBatch* pending);
+ void FreeCachedSendInitialMetadata(ChannelData* chand);
+ // Frees cached send_message at index idx.
+ void FreeCachedSendMessage(ChannelData* chand, size_t idx);
+ void FreeCachedSendTrailingMetadata(ChannelData* chand);
+ // Frees cached send ops that have already been completed after
+ // committing the call.
+ void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state);
+ // Frees cached send ops that were completed by the completed batch in
+ // batch_data. Used when batches are completed after the call is committed.
+ void FreeCachedSendOpDataForCompletedBatch(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state);
+
+ static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
+ void* arg, grpc_error* error);
+ void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+ grpc_transport_stream_op_batch* batch);
+
+ // Returns the index into pending_batches_ to be used for batch.
+ static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
+ void PendingBatchesAdd(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch);
+ void PendingBatchClear(PendingBatch* pending);
+ void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
+ static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
+ // A predicate type and some useful implementations for PendingBatchesFail().
+ typedef bool (*YieldCallCombinerPredicate)(
+ const CallCombinerClosureList& closures);
+ static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
+ return true;
}
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
+ static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
+ return false;
+ }
+ static bool YieldCallCombinerIfPendingBatchesFound(
+ const CallCombinerClosureList& closures) {
+ return closures.size() > 0;
+ }
+ // Fails all pending batches.
+ // If yield_call_combiner_predicate returns true, assumes responsibility for
+ // yielding the call combiner.
+ void PendingBatchesFail(
+ grpc_call_element* elem, grpc_error* error,
+ YieldCallCombinerPredicate yield_call_combiner_predicate);
+ static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
+ // Resumes all pending batches on subchannel_call_.
+ void PendingBatchesResume(grpc_call_element* elem);
+ // Returns a pointer to the first pending batch for which predicate(batch)
+ // returns true, or null if not found.
+ template <typename Predicate>
+ PendingBatch* PendingBatchFind(grpc_call_element* elem,
+ const char* log_message, Predicate predicate);
+
+ // Commits the call so that no further retry attempts will be performed.
+ void RetryCommit(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state);
+ // Starts a retry after appropriate back-off.
+ void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+ grpc_millis server_pushback_ms);
+ // Returns true if the call is being retried.
+ bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ grpc_status_code status, grpc_mdelem* server_pushback_md);
+
+ // Invokes recv_initial_metadata_ready for a subchannel batch.
+ static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
+ // Intercepts recv_initial_metadata_ready callback for retries.
+ // Commits the call and returns the initial metadata up the stack.
+ static void RecvInitialMetadataReady(void* arg, grpc_error* error);
+
+ // Invokes recv_message_ready for a subchannel batch.
+ static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
+ // Intercepts recv_message_ready callback for retries.
+ // Commits the call and returns the message up the stack.
+ static void RecvMessageReady(void* arg, grpc_error* error);
+
+ // Sets *status and *server_pushback_md based on md_batch and error.
+ // Only sets *server_pushback_md if server_pushback_md != nullptr.
+ void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
+ grpc_error* error, grpc_status_code* status,
+ grpc_mdelem** server_pushback_md);
+ // Adds recv_trailing_metadata_ready closure to closures.
+ void AddClosureForRecvTrailingMetadataReady(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ grpc_error* error, CallCombinerClosureList* closures);
+ // Adds any necessary closures for deferred recv_initial_metadata and
+ // recv_message callbacks to closures.
+ static void AddClosuresForDeferredRecvCallbacks(
+ SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
+ // Returns true if any op in the batch was not yet started.
+ // Only looks at send ops, since recv ops are always started immediately.
+ bool PendingBatchIsUnstarted(PendingBatch* pending,
+ SubchannelCallRetryState* retry_state);
+ // For any pending batch containing an op that has not yet been started,
+ // adds the pending batch's completion closures to closures.
+ void AddClosuresToFailUnstartedPendingBatches(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+ grpc_error* error, CallCombinerClosureList* closures);
+ // Runs necessary closures upon completion of a call attempt.
+ void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
+ grpc_error* error);
+ // Intercepts recv_trailing_metadata_ready callback for retries.
+ // Commits the call and returns the trailing metadata up the stack.
+ static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
+
+ // Adds the on_complete closure for the pending batch completed in
+ // batch_data to closures.
+ void AddClosuresForCompletedPendingBatch(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state, grpc_error* error,
+ CallCombinerClosureList* closures);
+
+ // If there are any cached ops to replay or pending ops to start on the
+ // subchannel call, adds a closure to closures to invoke
+ // StartRetriableSubchannelBatches().
+ void AddClosuresForReplayOrPendingSendOps(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
+
+ // Callback used to intercept on_complete from subchannel calls.
+ // Called only when retries are enabled.
+ static void OnComplete(void* arg, grpc_error* error);
+
+ static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
+ // Adds a closure to closures that will execute batch in the call combiner.
+ void AddClosureForSubchannelBatch(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch,
+ CallCombinerClosureList* closures);
+ // Adds retriable send_initial_metadata op to batch_data.
+ void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Adds retriable send_message op to batch_data.
+ void AddRetriableSendMessageOp(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Adds retriable send_trailing_metadata op to batch_data.
+ void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Adds retriable recv_initial_metadata op to batch_data.
+ void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Adds retriable recv_message op to batch_data.
+ void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Adds retriable recv_trailing_metadata op to batch_data.
+ void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data);
+ // Helper function used to start a recv_trailing_metadata batch. This
+ // is used in the case where a recv_initial_metadata or recv_message
+ // op fails in a way that we know the call is over but when the application
+ // has not yet started its own recv_trailing_metadata op.
+ void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
+ // If there are any cached send ops that need to be replayed on the
+ // current subchannel call, creates and returns a new subchannel batch
+ // to replay those ops. Otherwise, returns nullptr.
+ SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state);
+ // Adds subchannel batches for pending batches to closures.
+ void AddSubchannelBatchesForPendingBatches(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+ CallCombinerClosureList* closures);
+ // Constructs and starts whatever subchannel batches are needed on the
+ // subchannel call.
+ static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
+
+ void CreateSubchannelCall(grpc_call_element* elem);
+ // Invoked when a pick is completed, on both success or failure.
+ static void PickDone(void* arg, grpc_error* error);
+ // Removes the call from the channel's list of queued picks.
+ void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
+ // Adds the call to the channel's list of queued picks.
+ void AddCallToQueuedPicksLocked(grpc_call_element* elem);
+ // Applies service config to the call. Must be invoked once we know
+ // that the resolver has returned results to the channel.
+ void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
+
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store pointers to the call stack
+ // and call combiner. If/when we have time, find a way to avoid this
+ // without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state_;
+
+ grpc_slice path_; // Request path.
+ gpr_timespec call_start_time_;
+ grpc_millis deadline_;
+ Arena* arena_;
+ grpc_call_stack* owning_call_;
+ CallCombiner* call_combiner_;
+ grpc_call_context_element* call_context_;
+
+ RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+ ServiceConfig::CallData service_config_call_data_;
+ const ClientChannelMethodParsedConfig* method_params_ = nullptr;
+
+ RefCountedPtr<SubchannelCall> subchannel_call_;
+
+ // Set when we get a cancel_stream op.
+ grpc_error* cancel_error_ = GRPC_ERROR_NONE;
+
+ ChannelData::QueuedPick pick_;
+ bool pick_queued_ = false;
+ bool service_config_applied_ = false;
+ QueuedPickCanceller* pick_canceller_ = nullptr;
+ LbCallState lb_call_state_;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+ void (*lb_recv_trailing_metadata_ready_)(
+ void* user_data, grpc_metadata_batch* recv_trailing_metadata,
+ LoadBalancingPolicy::CallState* call_state) = nullptr;
+ void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
+ grpc_closure pick_closure_;
+
+ // For intercepting recv_trailing_metadata_ready for the LB policy.
+ grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
+ grpc_closure recv_trailing_metadata_ready_;
+ grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+
+ grpc_polling_entity* pollent_ = nullptr;
+
+ // Batches are added to this list when received from above.
+ // They are removed when we are done handling the batch (i.e., when
+ // either we have invoked all of the batch's callbacks or we have
+ // passed the batch down to the subchannel call and are not
+ // intercepting any of its callbacks).
+ PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
+ bool pending_send_initial_metadata_ : 1;
+ bool pending_send_message_ : 1;
+ bool pending_send_trailing_metadata_ : 1;
+
+ // Retry state.
+ bool enable_retries_ : 1;
+ bool retry_committed_ : 1;
+ bool last_attempt_got_server_pushback_ : 1;
+ int num_attempts_completed_ = 0;
+ size_t bytes_buffered_for_retry_ = 0;
+ // TODO(roth): Restructure this to eliminate use of ManualConstructor.
+ ManualConstructor<BackOff> retry_backoff_;
+ grpc_timer retry_timer_;
+
+ // The number of pending retriable subchannel batches containing send ops.
+ // We hold a ref to the call stack while this is non-zero, since replay
+ // batches may not complete until after all callbacks have been returned
+ // to the surface, and we need to make sure that the call is not destroyed
+ // until all of these batches have completed.
+ // Note that we actually only need to track replay batches, but it's
+ // easier to track all batches with send ops.
+ int num_pending_retriable_subchannel_send_batches_ = 0;
+
+ // Cached data for retrying send ops.
+ // send_initial_metadata
+ bool seen_send_initial_metadata_ = false;
+ grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
+ grpc_metadata_batch send_initial_metadata_;
+ uint32_t send_initial_metadata_flags_;
+ gpr_atm* peer_string_;
+ // send_message
+ // When we get a send_message op, we replace the original byte stream
+ // with a CachingByteStream that caches the slices to a local buffer for
+ // use in retries.
+ // Note: We inline the cache for the first 3 send_message ops and use
+ // dynamic allocation after that. This number was essentially picked
+ // at random; it could be changed in the future to tune performance.
+ InlinedVector<ByteStreamCache*, 3> send_messages_;
+ // send_trailing_metadata
+ bool seen_send_trailing_metadata_ = false;
+ grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
+ grpc_metadata_batch send_trailing_metadata_;
+};
+
+//
+// ChannelData::ConnectivityStateAndPickerSetter
+//
+
+// A fire-and-forget class that sets the channel's connectivity state
+// and then hops into the data plane combiner to update the picker.
+// Must be instantiated while holding the control plane combiner.
+// Deletes itself when done.
+class ChannelData::ConnectivityStateAndPickerSetter {
+ public:
+ ConnectivityStateAndPickerSetter(
+ ChannelData* chand, grpc_connectivity_state state, const char* reason,
+ UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
+ : chand_(chand), picker_(std::move(picker)) {
+ // Update connectivity state here, while holding control plane combiner.
+ grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
+ if (chand->channelz_node_ != nullptr) {
+ chand->channelz_node_->SetConnectivityState(state);
+ chand->channelz_node_->AddTraceEvent(
+ channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string(
+ GetChannelConnectivityStateChangeString(state)));
+ }
+ // Bounce into the data plane combiner to reset the picker.
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
+ "ConnectivityStateAndPickerSetter");
+ GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
+ grpc_combiner_scheduler(chand->data_plane_combiner_));
+ GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+ }
+
+ private:
+ static const char* GetChannelConnectivityStateChangeString(
+ grpc_connectivity_state state) {
+ switch (state) {
+ case GRPC_CHANNEL_IDLE:
+ return "Channel state change to IDLE";
+ case GRPC_CHANNEL_CONNECTING:
+ return "Channel state change to CONNECTING";
+ case GRPC_CHANNEL_READY:
+ return "Channel state change to READY";
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ return "Channel state change to TRANSIENT_FAILURE";
+ case GRPC_CHANNEL_SHUTDOWN:
+ return "Channel state change to SHUTDOWN";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+ }
+
+ static void SetPicker(void* arg, grpc_error* ignored) {
+ auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
+ // Update picker.
+ self->chand_->picker_ = std::move(self->picker_);
+ // Re-process queued picks.
+ for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
+ pick = pick->next) {
+ CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
+ }
+ // Clean up.
+ GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
+ "ConnectivityStateAndPickerSetter");
+ Delete(self);
+ }
+
+ ChannelData* chand_;
+ UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
+ grpc_closure closure_;
+};
+
+//
+// ChannelData::ServiceConfigSetter
+//
+
+// A fire-and-forget class that sets the channel's service config data
+// in the data plane combiner. Deletes itself when done.
+class ChannelData::ServiceConfigSetter {
+ public:
+ ServiceConfigSetter(
+ ChannelData* chand,
+ Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
+ retry_throttle_data,
+ RefCountedPtr<ServiceConfig> service_config)
+ : chand_(chand),
+ retry_throttle_data_(retry_throttle_data),
+ service_config_(std::move(service_config)) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
+ GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
+ grpc_combiner_scheduler(chand->data_plane_combiner_));
+ GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+ }
+
+ private:
+ static void SetServiceConfigData(void* arg, grpc_error* ignored) {
+ ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
+ ChannelData* chand = self->chand_;
+ // Update channel state.
+ chand->received_service_config_data_ = true;
+ if (self->retry_throttle_data_.has_value()) {
+ chand->retry_throttle_data_ =
+ internal::ServerRetryThrottleMap::GetDataForServer(
+ chand->server_name_.get(),
+ self->retry_throttle_data_.value().max_milli_tokens,
+ self->retry_throttle_data_.value().milli_token_ratio);
+ }
+ chand->service_config_ = std::move(self->service_config_);
+ // Apply service config to queued picks.
+ for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
+ pick = pick->next) {
+ CallData* calld = static_cast<CallData*>(pick->elem->call_data);
+ calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
+ }
+ // Clean up.
+ GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
+ "ServiceConfigSetter");
+ Delete(self);
+ }
+
+ ChannelData* chand_;
+ Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
+ retry_throttle_data_;
+ RefCountedPtr<ServiceConfig> service_config_;
+ grpc_closure closure_;
+};
+
+//
+// ChannelData::ExternalConnectivityWatcher::WatcherList
+//
+
+int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
+ MutexLock lock(&mu_);
+ int count = 0;
+ for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+ ++count;
+ }
+ return count;
}
-static void set_connectivity_state_and_picker_locked(
- channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
- const char* reason,
- grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
- // Update connectivity state.
- grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
- reason);
- if (chand->channelz_node != nullptr) {
- chand->channelz_node->AddTraceEvent(
- grpc_core::channelz::ChannelTrace::Severity::Info,
- grpc_slice_from_static_string(
- get_channel_connectivity_state_change_string(state)));
+ChannelData::ExternalConnectivityWatcher*
+ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
+ grpc_closure* on_complete) const {
+ MutexLock lock(&mu_);
+ ExternalConnectivityWatcher* w = head_;
+ while (w != nullptr && w->on_complete_ != on_complete) {
+ w = w->next_;
+ }
+ return w;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
+ ExternalConnectivityWatcher* watcher) {
+ GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
+ MutexLock lock(&mu_);
+ GPR_ASSERT(watcher->next_ == nullptr);
+ watcher->next_ = head_;
+ head_ = watcher;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
+ const ExternalConnectivityWatcher* watcher) {
+ MutexLock lock(&mu_);
+ if (watcher == head_) {
+ head_ = watcher->next_;
+ return;
}
- // Update picker.
- chand->picker = std::move(picker);
- // Re-process queued picks.
- for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
- pick = pick->next) {
- start_pick_locked(pick->elem, GRPC_ERROR_NONE);
+ for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+ if (w->next_ == watcher) {
+ w->next_ = w->next_->next_;
+ return;
+ }
}
+ GPR_UNREACHABLE_CODE(return );
}
-namespace grpc_core {
-namespace {
+//
+// ChannelData::ExternalConnectivityWatcher
+//
+
+ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
+ ChannelData* chand, grpc_polling_entity pollent,
+ grpc_connectivity_state* state, grpc_closure* on_complete,
+ grpc_closure* watcher_timer_init)
+ : chand_(chand),
+ pollent_(pollent),
+ state_(state),
+ on_complete_(on_complete),
+ watcher_timer_init_(watcher_timer_init) {
+ grpc_polling_entity_add_to_pollset_set(&pollent_,
+ chand_->interested_parties_);
+ GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
+ grpc_combiner_scheduler(chand_->combiner_)),
+ GRPC_ERROR_NONE);
+}
+
+ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
+ grpc_polling_entity_del_from_pollset_set(&pollent_,
+ chand_->interested_parties_);
+ GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
+ "ExternalConnectivityWatcher");
+}
+
+void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
+ void* arg, grpc_error* error) {
+ ExternalConnectivityWatcher* self =
+ static_cast<ExternalConnectivityWatcher*>(arg);
+ grpc_closure* on_complete = self->on_complete_;
+ self->chand_->external_connectivity_watcher_list_.Remove(self);
+ Delete(self);
+ GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
+ void* arg, grpc_error* ignored) {
+ ExternalConnectivityWatcher* self =
+ static_cast<ExternalConnectivityWatcher*>(arg);
+ if (self->state_ == nullptr) {
+ // Handle cancellation.
+ GPR_ASSERT(self->watcher_timer_init_ == nullptr);
+ ExternalConnectivityWatcher* found =
+ self->chand_->external_connectivity_watcher_list_.Lookup(
+ self->on_complete_);
+ if (found != nullptr) {
+ grpc_connectivity_state_notify_on_state_change(
+ &found->chand_->state_tracker_, nullptr, &found->my_closure_);
+ }
+ Delete(self);
+ return;
+ }
+ // New watcher.
+ self->chand_->external_connectivity_watcher_list_.Add(self);
+ // This assumes that the closure is scheduled on the ExecCtx scheduler
+ // and that GRPC_CLOSURE_RUN would run the closure immediately.
+ GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
+ grpc_combiner_scheduler(self->chand_->combiner_));
+ grpc_connectivity_state_notify_on_state_change(
+ &self->chand_->state_tracker_, self->state_, &self->my_closure_);
+}
+
+//
+// ChannelData::GrpcSubchannel
+//
-class ClientChannelControlHelper
+// This class is a wrapper for Subchannel that hides details of the
+// channel's implementation (such as the health check service name) from
+// the LB policy API.
+//
+// Note that no synchronization is needed here, because even if the
+// underlying subchannel is shared between channels, this wrapper will only
+// be used within one channel, so it will always be synchronized by the
+// control plane combiner.
+class ChannelData::GrpcSubchannel : public SubchannelInterface {
+ public:
+ GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
+ UniquePtr<char> health_check_service_name)
+ : chand_(chand),
+ subchannel_(subchannel),
+ health_check_service_name_(std::move(health_check_service_name)) {
+ GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
+ auto* subchannel_node = subchannel_->channelz_node();
+ if (subchannel_node != nullptr) {
+ intptr_t subchannel_uuid = subchannel_node->uuid();
+ auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+ if (it == chand_->subchannel_refcount_map_.end()) {
+ chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
+ it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
+ }
+ ++it->second;
+ }
+ }
+
+ ~GrpcSubchannel() {
+ auto* subchannel_node = subchannel_->channelz_node();
+ if (subchannel_node != nullptr) {
+ intptr_t subchannel_uuid = subchannel_node->uuid();
+ auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+ GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
+ --it->second;
+ if (it->second == 0) {
+ chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
+ chand_->subchannel_refcount_map_.erase(it);
+ }
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
+ GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
+ }
+
+ grpc_connectivity_state CheckConnectivityState(
+ RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
+ override {
+ RefCountedPtr<ConnectedSubchannel> tmp;
+ auto retval = subchannel_->CheckConnectivityState(
+ health_check_service_name_.get(), &tmp);
+ *connected_subchannel = std::move(tmp);
+ return retval;
+ }
+
+ void WatchConnectivityState(
+ grpc_connectivity_state initial_state,
+ UniquePtr<ConnectivityStateWatcher> watcher) override {
+ subchannel_->WatchConnectivityState(
+ initial_state,
+ UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
+ std::move(watcher));
+ }
+
+ void CancelConnectivityStateWatch(
+ ConnectivityStateWatcher* watcher) override {
+ subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
+ watcher);
+ }
+
+ void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
+
+ void ResetBackoff() override { subchannel_->ResetBackoff(); }
+
+ private:
+ ChannelData* chand_;
+ Subchannel* subchannel_;
+ UniquePtr<char> health_check_service_name_;
+};
+
+//
+// ChannelData::ClientChannelControlHelper
+//
+
+class ChannelData::ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper {
public:
- explicit ClientChannelControlHelper(channel_data* chand) : chand_(chand) {
- GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ClientChannelControlHelper");
+ explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
+ GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
}
~ClientChannelControlHelper() override {
- GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack,
+ GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
"ClientChannelControlHelper");
}
- Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
+ RefCountedPtr<SubchannelInterface> CreateSubchannel(
+ const grpc_channel_args& args) override {
+ bool inhibit_health_checking = grpc_channel_arg_get_bool(
+ grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
+ UniquePtr<char> health_check_service_name;
+ if (!inhibit_health_checking) {
+ health_check_service_name.reset(
+ gpr_strdup(chand_->health_check_service_name_.get()));
+ }
+ static const char* args_to_remove[] = {
+ GRPC_ARG_INHIBIT_HEALTH_CHECKING,
+ GRPC_ARG_CHANNELZ_CHANNEL_NODE,
+ };
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
- chand_->subchannel_pool.get());
- grpc_channel_args* new_args =
- grpc_channel_args_copy_and_add(&args, &arg, 1);
+ chand_->subchannel_pool_.get());
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
Subchannel* subchannel =
- chand_->client_channel_factory->CreateSubchannel(new_args);
+ chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
- return subchannel;
+ if (subchannel == nullptr) return nullptr;
+ return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
+ std::move(health_check_service_name));
}
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override {
- return chand_->client_channel_factory->CreateChannel(target, &args);
+ return chand_->client_channel_factory_->CreateChannel(target, &args);
}
void UpdateState(
- grpc_connectivity_state state, grpc_error* state_error,
+ grpc_connectivity_state state,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
- if (grpc_client_channel_routing_trace.enabled()) {
- const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
+ grpc_error* disconnect_error =
+ chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ const char* extra = disconnect_error == GRPC_ERROR_NONE
? ""
: " (ignoring -- channel shutting down)";
- gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
- chand_, grpc_connectivity_state_name(state),
- grpc_error_string(state_error), picker.get(), extra);
+ gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
+ grpc_connectivity_state_name(state), picker.get(), extra);
}
// Do update only if not shutting down.
- if (chand_->disconnect_error == GRPC_ERROR_NONE) {
- set_connectivity_state_and_picker_locked(chand_, state, state_error,
- "helper", std::move(picker));
- } else {
- GRPC_ERROR_UNREF(state_error);
+ if (disconnect_error == GRPC_ERROR_NONE) {
+ // Will delete itself.
+ New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
+ std::move(picker));
}
}
// No-op -- we should never get this from ResolvingLoadBalancingPolicy.
void RequestReresolution() override {}
+ void AddTraceEvent(TraceSeverity severity, const char* message) override {
+ if (chand_->channelz_node_ != nullptr) {
+ chand_->channelz_node_->AddTraceEvent(
+ ConvertSeverityEnum(severity),
+ grpc_slice_from_copied_string(message));
+ }
+ }
+
private:
- channel_data* chand_;
+ static channelz::ChannelTrace::Severity ConvertSeverityEnum(
+ TraceSeverity severity) {
+ if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
+ if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
+ return channelz::ChannelTrace::Error;
+ }
+
+ ChannelData* chand_;
};
-} // namespace
-} // namespace grpc_core
+//
+// ChannelData implementation
+//
+
+grpc_error* ChannelData::Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(args->is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ grpc_error* error = GRPC_ERROR_NONE;
+ new (elem->channel_data) ChannelData(args, &error);
+ return error;
+}
+
+void ChannelData::Destroy(grpc_channel_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ chand->~ChannelData();
+}
+
+bool GetEnableRetries(const grpc_channel_args* args) {
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
+}
+
+size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
+ return static_cast<size_t>(grpc_channel_arg_get_integer(
+ grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
+ {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
+}
+
+RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
+ const grpc_channel_args* args) {
+ const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
+ grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
+ if (use_local_subchannel_pool) {
+ return MakeRefCounted<LocalSubchannelPool>();
+ }
+ return GlobalSubchannelPool::instance();
+}
-// Synchronous callback from chand->resolving_lb_policy to process a resolver
-// result update.
-static bool process_resolver_result_locked(
- void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
- grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
- channel_data* chand = static_cast<channel_data*>(arg);
- chand->have_service_config = true;
- ProcessedResolverResult resolver_result(result, chand->enable_retries);
- grpc_core::UniquePtr<char> service_config_json =
- resolver_result.service_config_json();
- if (grpc_client_channel_routing_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
- chand, service_config_json.get());
- }
- // Update channel state.
- chand->retry_throttle_data = resolver_result.retry_throttle_data();
- chand->method_params_table = resolver_result.method_params_table();
- // Swap out the data used by cc_get_channel_info().
- gpr_mu_lock(&chand->info_mu);
- chand->info_lb_policy_name = resolver_result.lb_policy_name();
+channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
+ const grpc_arg* arg =
+ grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
+ if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
+ return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
+ }
+ return nullptr;
+}
+
+ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
+ : deadline_checking_enabled_(
+ grpc_deadline_checking_enabled(args->channel_args)),
+ enable_retries_(GetEnableRetries(args->channel_args)),
+ per_rpc_retry_buffer_size_(
+ GetMaxPerRpcRetryBufferSize(args->channel_args)),
+ owning_stack_(args->channel_stack),
+ client_channel_factory_(
+ ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
+ channelz_node_(GetChannelzNode(args->channel_args)),
+ data_plane_combiner_(grpc_combiner_create()),
+ combiner_(grpc_combiner_create()),
+ interested_parties_(grpc_pollset_set_create()),
+ subchannel_pool_(GetSubchannelPool(args->channel_args)),
+ disconnect_error_(GRPC_ERROR_NONE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
+ this, owning_stack_);
+ }
+ // Initialize data members.
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+ "client_channel");
+ gpr_mu_init(&info_mu_);
+ // Start backup polling.
+ grpc_client_channel_start_backup_polling(interested_parties_);
+ // Check client channel factory.
+ if (client_channel_factory_ == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Missing client channel factory in args for client channel filter");
+ return;
+ }
+ // Get server name to resolve, using proxy mapper if needed.
+ const char* server_uri = grpc_channel_arg_get_string(
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
+ if (server_uri == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "server URI channel arg missing or wrong type in client channel "
+ "filter");
+ return;
+ }
+ // Get default service config
+ const char* service_config_json = grpc_channel_arg_get_string(
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
+ if (service_config_json != nullptr) {
+ *error = GRPC_ERROR_NONE;
+ default_service_config_ = ServiceConfig::Create(service_config_json, error);
+ if (*error != GRPC_ERROR_NONE) {
+ default_service_config_.reset();
+ return;
+ }
+ }
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
+ if (uri != nullptr && uri->path[0] != '\0') {
+ server_name_.reset(
+ gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
+ }
+ grpc_uri_destroy(uri);
+ char* proxy_name = nullptr;
+ grpc_channel_args* new_args = nullptr;
+ grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
+ &new_args);
+ UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
+ : gpr_strdup(server_uri));
+ // Instantiate resolving LB policy.
+ LoadBalancingPolicy::Args lb_args;
+ lb_args.combiner = combiner_;
+ lb_args.channel_control_helper =
+ UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
+ New<ClientChannelControlHelper>(this));
+ lb_args.args = new_args != nullptr ? new_args : args->channel_args;
+ resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
+ std::move(lb_args), &grpc_client_channel_routing_trace,
+ std::move(target_uri), ProcessResolverResultLocked, this, error));
+ grpc_channel_args_destroy(new_args);
+ if (*error != GRPC_ERROR_NONE) {
+ // Orphan the resolving LB policy and flush the exec_ctx to ensure
+ // that it finishes shutting down. This ensures that if we are
+ // failing, we destroy the ClientChannelControlHelper (and thus
+ // unref the channel stack) before we return.
+ // TODO(roth): This is not a complete solution, because it only
+ // catches the case where channel stack initialization fails in this
+ // particular filter. If there is a failure in a different filter, we
+ // will leave a dangling ref here, which can cause a crash. Fortunately,
+ // in practice, there are no other filters that can cause failures in
+ // channel stack initialization, so this works for now.
+ resolving_lb_policy_.reset();
+ ExecCtx::Get()->Flush();
+ } else {
+ grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
+ interested_parties_);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
+ resolving_lb_policy_.get());
+ }
+ }
+}
+
+ChannelData::~ChannelData() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
+ }
+ if (resolving_lb_policy_ != nullptr) {
+ grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
+ interested_parties_);
+ resolving_lb_policy_.reset();
+ }
+ // Stop backup polling.
+ grpc_client_channel_stop_backup_polling(interested_parties_);
+ grpc_pollset_set_destroy(interested_parties_);
+ GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
+ GRPC_COMBINER_UNREF(combiner_, "client_channel");
+ GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
+ grpc_connectivity_state_destroy(&state_tracker_);
+ gpr_mu_destroy(&info_mu_);
+}
+
+void ChannelData::ProcessLbPolicy(
+ const Resolver::Result& resolver_result,
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+ UniquePtr<char>* lb_policy_name,
+ RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
+ // Prefer the LB policy name found in the service config.
+ if (parsed_service_config != nullptr &&
+ parsed_service_config->parsed_lb_config() != nullptr) {
+ lb_policy_name->reset(
+ gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
+ *lb_policy_config = parsed_service_config->parsed_lb_config();
+ return;
+ }
+ const char* local_policy_name = nullptr;
+ if (parsed_service_config != nullptr &&
+ parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
+ local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
+ } else {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
+ local_policy_name = grpc_channel_arg_get_string(channel_arg);
+ }
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver has returned.
+ bool found_balancer_address = false;
+ for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
+ const ServerAddress& address = resolver_result.addresses[i];
+ if (address.IsBalancer()) {
+ found_balancer_address = true;
+ break;
+ }
+ }
+ if (found_balancer_address) {
+ if (local_policy_name != nullptr &&
+ strcmp(local_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
+ local_policy_name);
+ }
+ local_policy_name = "grpclb";
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ lb_policy_name->reset(gpr_strdup(
+ local_policy_name == nullptr ? "pick_first" : local_policy_name));
+}
+
+// Synchronous callback from ResolvingLoadBalancingPolicy to process a
+// resolver result update.
+bool ChannelData::ProcessResolverResultLocked(
+ void* arg, const Resolver::Result& result, const char** lb_policy_name,
+ RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
+ grpc_error** service_config_error) {
+ ChannelData* chand = static_cast<ChannelData*>(arg);
+ RefCountedPtr<ServiceConfig> service_config;
+ // If resolver did not return a service config or returned an invalid service
+ // config, we need a fallback service config.
+ if (result.service_config_error != GRPC_ERROR_NONE) {
+ // If the service config was invalid, then fallback to the saved service
+ // config. If there is no saved config either, use the default service
+ // config.
+ if (chand->saved_service_config_ != nullptr) {
+ service_config = chand->saved_service_config_;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned invalid service config. "
+ "Continuing to use previous service config.",
+ chand);
+ }
+ } else if (chand->default_service_config_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned invalid service config. Using "
+ "default service config provided by client API.",
+ chand);
+ }
+ service_config = chand->default_service_config_;
+ }
+ } else if (result.service_config == nullptr) {
+ if (chand->default_service_config_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned no service config. Using default "
+ "service config provided by client API.",
+ chand);
+ }
+ service_config = chand->default_service_config_;
+ }
+ } else {
+ service_config = result.service_config;
+ }
+ *service_config_error = GRPC_ERROR_REF(result.service_config_error);
+ if (service_config == nullptr &&
+ result.service_config_error != GRPC_ERROR_NONE) {
+ return false;
+ }
+ // Process service config.
+ UniquePtr<char> service_config_json;
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
+ nullptr;
+ if (service_config != nullptr) {
+ parsed_service_config =
+ static_cast<const internal::ClientChannelGlobalParsedConfig*>(
+ service_config->GetGlobalParsedConfig(
+ internal::ClientChannelServiceConfigParser::ParserIndex()));
+ }
+ // Check if the config has changed.
const bool service_config_changed =
- ((service_config_json == nullptr) !=
- (chand->info_service_config_json == nullptr)) ||
- (service_config_json != nullptr &&
- strcmp(service_config_json.get(),
- chand->info_service_config_json.get()) != 0);
- chand->info_service_config_json = std::move(service_config_json);
- gpr_mu_unlock(&chand->info_mu);
- // Return results.
- *lb_policy_name = chand->info_lb_policy_name.get();
- *lb_policy_config = resolver_result.lb_policy_config();
- // Apply service config to queued picks.
- for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
- pick = pick->next) {
- maybe_apply_service_config_to_call_locked(pick->elem);
+ ((service_config == nullptr) !=
+ (chand->saved_service_config_ == nullptr)) ||
+ (service_config != nullptr &&
+ strcmp(service_config->service_config_json(),
+ chand->saved_service_config_->service_config_json()) != 0);
+ if (service_config_changed) {
+ service_config_json.reset(gpr_strdup(
+ service_config != nullptr ? service_config->service_config_json()
+ : ""));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned updated service config: \"%s\"",
+ chand, service_config_json.get());
+ }
+ // Save health check service name.
+ if (service_config != nullptr) {
+ chand->health_check_service_name_.reset(
+ gpr_strdup(parsed_service_config->health_check_service_name()));
+ } else {
+ chand->health_check_service_name_.reset();
+ }
+ // Save service config.
+ chand->saved_service_config_ = std::move(service_config);
+ }
+ // We want to set the service config at least once. This should not really be
+ // needed, but we are doing it as a defensive approach. This can be removed,
+ // if we feel it is unnecessary.
+ if (service_config_changed || !chand->received_first_resolver_result_) {
+ chand->received_first_resolver_result_ = true;
+ Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
+ retry_throttle_data;
+ if (parsed_service_config != nullptr) {
+ retry_throttle_data = parsed_service_config->retry_throttling();
+ }
+ // Create service config setter to update channel state in the data
+ // plane combiner. Destroys itself when done.
+ New<ServiceConfigSetter>(chand, retry_throttle_data,
+ chand->saved_service_config_);
+ }
+ UniquePtr<char> processed_lb_policy_name;
+ chand->ProcessLbPolicy(result, parsed_service_config,
+ &processed_lb_policy_name, lb_policy_config);
+ // Swap out the data used by GetChannelInfo().
+ {
+ MutexLock lock(&chand->info_mu_);
+ chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
+ if (service_config_json != nullptr) {
+ chand->info_service_config_json_ = std::move(service_config_json);
+ }
}
+ // Return results.
+ *lb_policy_name = chand->info_lb_policy_name_.get();
return service_config_changed;
}
-static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) {
- grpc_error* error = GRPC_ERROR_NONE;
- grpc_connectivity_state state =
- grpc_connectivity_state_get(&chand->state_tracker, &error);
- if (state != GRPC_CHANNEL_READY) {
- grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "channel not connected", &error, 1);
- GRPC_ERROR_UNREF(error);
- return new_error;
+grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
+ if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
}
- LoadBalancingPolicy::PickArgs pick;
- chand->picker->Pick(&pick, &error);
- if (pick.connected_subchannel != nullptr) {
- pick.connected_subchannel->Ping(op->send_ping.on_initiate,
- op->send_ping.on_ack);
+ LoadBalancingPolicy::PickResult result =
+ picker_->Pick(LoadBalancingPolicy::PickArgs());
+ if (result.connected_subchannel != nullptr) {
+ ConnectedSubchannel* connected_subchannel =
+ static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
+ connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
} else {
- if (error == GRPC_ERROR_NONE) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ if (result.error == GRPC_ERROR_NONE) {
+ result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LB policy dropped call on ping");
}
}
- return error;
+ return result.error;
}
-static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
+void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
grpc_channel_element* elem =
static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ // Connectivity watch.
if (op->on_connectivity_state_change != nullptr) {
grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
+ &chand->state_tracker_, op->connectivity_state,
op->on_connectivity_state_change);
op->on_connectivity_state_change = nullptr;
op->connectivity_state = nullptr;
}
-
+ // Ping.
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
- grpc_error* error = do_ping_locked(chand, op);
+ grpc_error* error = chand->DoPingLocked(op);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
op->send_ping.on_initiate = nullptr;
op->send_ping.on_ack = nullptr;
}
-
+ // Reset backoff.
if (op->reset_connect_backoff) {
- chand->resolving_lb_policy->ResetBackoffLocked();
+ if (chand->resolving_lb_policy_ != nullptr) {
+ chand->resolving_lb_policy_->ResetBackoffLocked();
+ }
}
-
+ // Disconnect.
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
- chand->disconnect_error = op->disconnect_with_error;
+ grpc_error* error = GRPC_ERROR_NONE;
+ GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
+ &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
+ MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set(
- chand->resolving_lb_policy->interested_parties(),
- chand->interested_parties);
- chand->resolving_lb_policy.reset();
- set_connectivity_state_and_picker_locked(
- chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
- "shutdown from API",
- grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
- grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>(
+ chand->resolving_lb_policy_->interested_parties(),
+ chand->interested_parties_);
+ chand->resolving_lb_policy_.reset();
+ // Will delete itself.
+ New<ConnectivityStateAndPickerSetter>(
+ chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
+ UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
+ New<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(op->disconnect_with_error))));
}
-
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
-static void cc_start_transport_op(grpc_channel_element* elem,
- grpc_transport_op* op) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-
+void ChannelData::StartTransportOp(grpc_channel_element* elem,
+ grpc_transport_op* op) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(op->set_accept_stream == false);
+ // Handle bind_pollset.
if (op->bind_pollset != nullptr) {
- grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
+ grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
}
-
+ // Pop into control plane combiner for remaining ops.
op->handler_private.extra_arg = elem;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
- op, grpc_combiner_scheduler(chand->combiner)),
+ GRPC_CLOSURE_INIT(&op->handler_private.closure,
+ ChannelData::StartTransportOpLocked, op,
+ grpc_combiner_scheduler(chand->combiner_)),
GRPC_ERROR_NONE);
}
-static void cc_get_channel_info(grpc_channel_element* elem,
- const grpc_channel_info* info) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- gpr_mu_lock(&chand->info_mu);
+void ChannelData::GetChannelInfo(grpc_channel_element* elem,
+ const grpc_channel_info* info) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ MutexLock lock(&chand->info_mu_);
if (info->lb_policy_name != nullptr) {
- *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
+ *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
}
if (info->service_config_json != nullptr) {
*info->service_config_json =
- gpr_strdup(chand->info_service_config_json.get());
+ gpr_strdup(chand->info_service_config_json_.get());
}
- gpr_mu_unlock(&chand->info_mu);
}
-/* Constructor for channel_data */
-static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
- grpc_channel_element_args* args) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- GPR_ASSERT(args->is_last);
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- // Initialize data members.
- chand->combiner = grpc_combiner_create();
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
- "client_channel");
- chand->disconnect_error = GRPC_ERROR_NONE;
- gpr_mu_init(&chand->info_mu);
- gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
-
- gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- chand->external_connectivity_watcher_list_head = nullptr;
- gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
-
- chand->owning_stack = args->channel_stack;
- chand->deadline_checking_enabled =
- grpc_deadline_checking_enabled(args->channel_args);
- chand->interested_parties = grpc_pollset_set_create();
- grpc_client_channel_start_backup_polling(chand->interested_parties);
- // Record max per-RPC retry buffer size.
- const grpc_arg* arg = grpc_channel_args_find(
- args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
- chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
- arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
- // Record enable_retries.
- arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
- chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
- // Record client channel factory.
- chand->client_channel_factory =
- grpc_core::ClientChannelFactory::GetFromChannelArgs(args->channel_args);
- if (chand->client_channel_factory == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Missing client channel factory in args for client channel filter");
- }
- // Get server name to resolve, using proxy mapper if needed.
- arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
- if (arg == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Missing server uri in args for client channel filter");
- }
- if (arg->type != GRPC_ARG_STRING) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "server uri arg must be a string");
- }
- char* proxy_name = nullptr;
- grpc_channel_args* new_args = nullptr;
- grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
- &proxy_name, &new_args);
- grpc_core::UniquePtr<char> target_uri(
- proxy_name != nullptr ? proxy_name : gpr_strdup(arg->value.string));
- // Instantiate subchannel pool.
- arg = grpc_channel_args_find(args->channel_args,
- GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL);
- if (grpc_channel_arg_get_bool(arg, false)) {
- chand->subchannel_pool =
- grpc_core::MakeRefCounted<grpc_core::LocalSubchannelPool>();
- } else {
- chand->subchannel_pool = grpc_core::GlobalSubchannelPool::instance();
- }
- // Instantiate resolving LB policy.
- LoadBalancingPolicy::Args lb_args;
- lb_args.combiner = chand->combiner;
- lb_args.channel_control_helper =
- grpc_core::UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
- grpc_core::New<grpc_core::ClientChannelControlHelper>(chand));
- lb_args.args = new_args != nullptr ? new_args : args->channel_args;
- grpc_error* error = GRPC_ERROR_NONE;
- chand->resolving_lb_policy.reset(
- grpc_core::New<grpc_core::ResolvingLoadBalancingPolicy>(
- std::move(lb_args), &grpc_client_channel_routing_trace,
- std::move(target_uri), process_resolver_result_locked, chand,
- &error));
- grpc_channel_args_destroy(new_args);
- if (error != GRPC_ERROR_NONE) {
- // Orphan the resolving LB policy and flush the exec_ctx to ensure
- // that it finishes shutting down. This ensures that if we are
- // failing, we destroy the ClientChannelControlHelper (and thus
- // unref the channel stack) before we return.
- // TODO(roth): This is not a complete solution, because it only
- // catches the case where channel stack initialization fails in this
- // particular filter. If there is a failure in a different filter, we
- // will leave a dangling ref here, which can cause a crash. Fortunately,
- // in practice, there are no other filters that can cause failures in
- // channel stack initialization, so this works for now.
- chand->resolving_lb_policy.reset();
- grpc_core::ExecCtx::Get()->Flush();
- } else {
- grpc_pollset_set_add_pollset_set(
- chand->resolving_lb_policy->interested_parties(),
- chand->interested_parties);
- if (grpc_client_channel_routing_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", chand,
- chand->resolving_lb_policy.get());
+void ChannelData::AddQueuedPick(QueuedPick* pick,
+ grpc_polling_entity* pollent) {
+ // Add call to queued picks list.
+ pick->next = queued_picks_;
+ queued_picks_ = pick;
+ // Add call's pollent to channel's interested_parties, so that I/O
+ // can be done under the call's CQ.
+ grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
+}
+
+void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
+ grpc_polling_entity* pollent) {
+ // Remove call's pollent from channel's interested_parties.
+ grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
+ // Remove from queued picks list.
+ for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
+ pick = &(*pick)->next) {
+ if (*pick == to_remove) {
+ *pick = to_remove->next;
+ return;
}
}
- return error;
}
-/* Destructor for channel_data */
-static void cc_destroy_channel_elem(grpc_channel_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (chand->resolving_lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(
- chand->resolving_lb_policy->interested_parties(),
- chand->interested_parties);
- chand->resolving_lb_policy.reset();
- }
- // TODO(roth): Once we convert the filter API to C++, there will no
- // longer be any need to explicitly reset these smart pointer data members.
- chand->picker.reset();
- chand->subchannel_pool.reset();
- chand->info_lb_policy_name.reset();
- chand->info_service_config_json.reset();
- chand->retry_throttle_data.reset();
- chand->method_params_table.reset();
- grpc_client_channel_stop_backup_polling(chand->interested_parties);
- grpc_pollset_set_destroy(chand->interested_parties);
- GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
- GRPC_ERROR_UNREF(chand->disconnect_error);
- grpc_connectivity_state_destroy(&chand->state_tracker);
- gpr_mu_destroy(&chand->info_mu);
- gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
+void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
+ auto* chand = static_cast<ChannelData*>(arg);
+ if (chand->resolving_lb_policy_ != nullptr) {
+ chand->resolving_lb_policy_->ExitIdleLocked();
+ }
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
}
-/*************************************************************************
- * PER-CALL FUNCTIONS
- */
+grpc_connectivity_state ChannelData::CheckConnectivityState(
+ bool try_to_connect) {
+ grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
+ GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
+ grpc_combiner_scheduler(combiner_)),
+ GRPC_ERROR_NONE);
+ }
+ return out;
+}
-// Max number of batches that can be pending on a call at any given
-// time. This includes one batch for each of the following ops:
-// recv_initial_metadata
-// send_initial_metadata
-// recv_message
-// send_message
-// recv_trailing_metadata
-// send_trailing_metadata
-#define MAX_PENDING_BATCHES 6
+//
+// CallData implementation
+//
// Retry support:
//
// (census filter is on top of this one)
// - add census stats for retries
-namespace grpc_core {
-namespace {
-class QueuedPickCanceller;
-} // namespace
-} // namespace grpc_core
-
-namespace {
-
-struct call_data;
-
-// State used for starting a retryable batch on a subchannel call.
-// This provides its own grpc_transport_stream_op_batch and other data
-// structures needed to populate the ops in the batch.
-// We allocate one struct on the arena for each attempt at starting a
-// batch on a given subchannel call.
-struct subchannel_batch_data {
- subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
- bool set_on_complete);
- // All dtor code must be added in `destroy`. This is because we may
- // call closures in `subchannel_batch_data` after they are unrefed by
- // `batch_data_unref`, and msan would complain about accessing this class
- // after calling dtor. As a result we cannot call the `dtor` in
- // `batch_data_unref`.
- // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
- ~subchannel_batch_data() { destroy(); }
- void destroy();
-
- gpr_refcount refs;
- grpc_call_element* elem;
- grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
- // The batch to use in the subchannel call.
- // Its payload field points to subchannel_call_retry_state.batch_payload.
- grpc_transport_stream_op_batch batch;
- // For intercepting on_complete.
- grpc_closure on_complete;
-};
-
-// Retry state associated with a subchannel call.
-// Stored in the parent_data of the subchannel call object.
-struct subchannel_call_retry_state {
- explicit subchannel_call_retry_state(grpc_call_context_element* context)
- : batch_payload(context),
- started_send_initial_metadata(false),
- completed_send_initial_metadata(false),
- started_send_trailing_metadata(false),
- completed_send_trailing_metadata(false),
- started_recv_initial_metadata(false),
- completed_recv_initial_metadata(false),
- started_recv_trailing_metadata(false),
- completed_recv_trailing_metadata(false),
- retry_dispatched(false) {}
-
- // subchannel_batch_data.batch.payload points to this.
- grpc_transport_stream_op_batch_payload batch_payload;
- // For send_initial_metadata.
- // Note that we need to make a copy of the initial metadata for each
- // subchannel call instead of just referring to the copy in call_data,
- // because filters in the subchannel stack will probably add entries,
- // so we need to start in a pristine state for each attempt of the call.
- grpc_linked_mdelem* send_initial_metadata_storage;
- grpc_metadata_batch send_initial_metadata;
- // For send_message.
- grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
- send_message;
- // For send_trailing_metadata.
- grpc_linked_mdelem* send_trailing_metadata_storage;
- grpc_metadata_batch send_trailing_metadata;
- // For intercepting recv_initial_metadata.
- grpc_metadata_batch recv_initial_metadata;
- grpc_closure recv_initial_metadata_ready;
- bool trailing_metadata_available = false;
- // For intercepting recv_message.
- grpc_closure recv_message_ready;
- grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
- // For intercepting recv_trailing_metadata.
- grpc_metadata_batch recv_trailing_metadata;
- grpc_transport_stream_stats collect_stats;
- grpc_closure recv_trailing_metadata_ready;
- // These fields indicate which ops have been started and completed on
- // this subchannel call.
- size_t started_send_message_count = 0;
- size_t completed_send_message_count = 0;
- size_t started_recv_message_count = 0;
- size_t completed_recv_message_count = 0;
- bool started_send_initial_metadata : 1;
- bool completed_send_initial_metadata : 1;
- bool started_send_trailing_metadata : 1;
- bool completed_send_trailing_metadata : 1;
- bool started_recv_initial_metadata : 1;
- bool completed_recv_initial_metadata : 1;
- bool started_recv_trailing_metadata : 1;
- bool completed_recv_trailing_metadata : 1;
- // State for callback processing.
- subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
- grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
- subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
- grpc_error* recv_message_error = GRPC_ERROR_NONE;
- subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
- // NOTE: Do not move this next to the metadata bitfields above. That would
- // save space but will also result in a data race because compiler will
- // generate a 2 byte store which overwrites the meta-data fields upon
- // setting this field.
- bool retry_dispatched : 1;
-};
+CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
+ const grpc_call_element_args& args)
+ : deadline_state_(elem, args.call_stack, args.call_combiner,
+ GPR_LIKELY(chand.deadline_checking_enabled())
+ ? args.deadline
+ : GRPC_MILLIS_INF_FUTURE),
+ path_(grpc_slice_ref_internal(args.path)),
+ call_start_time_(args.start_time),
+ deadline_(args.deadline),
+ arena_(args.arena),
+ owning_call_(args.call_stack),
+ call_combiner_(args.call_combiner),
+ call_context_(args.context),
+ lb_call_state_(this),
+ pending_send_initial_metadata_(false),
+ pending_send_message_(false),
+ pending_send_trailing_metadata_(false),
+ enable_retries_(chand.enable_retries()),
+ retry_committed_(false),
+ last_attempt_got_server_pushback_(false) {}
+
+CallData::~CallData() {
+ grpc_slice_unref_internal(path_);
+ GRPC_ERROR_UNREF(cancel_error_);
+ // Make sure there are no remaining pending batches.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ GPR_ASSERT(pending_batches_[i].batch == nullptr);
+ }
+}
+
+grpc_error* CallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ new (elem->call_data) CallData(elem, *chand, *args);
+ return GRPC_ERROR_NONE;
+}
-// Pending batches stored in call data.
-struct pending_batch {
- // The pending batch. If nullptr, this slot is empty.
- grpc_transport_stream_op_batch* batch;
- // Indicates whether payload for send ops has been cached in call data.
- bool send_ops_cached;
-};
+void CallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure) {
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
+ calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
+ then_schedule_closure = nullptr;
+ }
+ calld->~CallData();
+ GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
+}
-/** Call data. Holds a pointer to SubchannelCall and the
- associated machinery to create such a pointer.
- Handles queueing of stream ops until a call object is ready, waiting
- for initial metadata before trying to create a call object,
- and handling cancellation gracefully. */
-struct call_data {
- call_data(grpc_call_element* elem, const channel_data& chand,
- const grpc_call_element_args& args)
- : deadline_state(elem, args.call_stack, args.call_combiner,
- GPR_LIKELY(chand.deadline_checking_enabled)
- ? args.deadline
- : GRPC_MILLIS_INF_FUTURE),
- path(grpc_slice_ref_internal(args.path)),
- call_start_time(args.start_time),
- deadline(args.deadline),
- arena(args.arena),
- owning_call(args.call_stack),
- call_combiner(args.call_combiner),
- call_context(args.context),
- pending_send_initial_metadata(false),
- pending_send_message(false),
- pending_send_trailing_metadata(false),
- enable_retries(chand.enable_retries),
- retry_committed(false),
- last_attempt_got_server_pushback(false) {}
-
- ~call_data() {
- grpc_slice_unref_internal(path);
- GRPC_ERROR_UNREF(cancel_error);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
- GPR_ASSERT(pending_batches[i].batch == nullptr);
+void CallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GPR_LIKELY(chand->deadline_checking_enabled())) {
+ grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
+ }
+ // If we've previously been cancelled, immediately fail any new batches.
+ if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(calld->cancel_error_));
+ }
+ // Note: This will release the call combiner.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
+ return;
+ }
+ // Handle cancellation.
+ if (GPR_UNLIKELY(batch->cancel_stream)) {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any batches are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first batch does get passed down.
+ GRPC_ERROR_UNREF(calld->cancel_error_);
+ calld->cancel_error_ =
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(calld->cancel_error_));
+ }
+ // If we do not have a subchannel call (i.e., a pick has not yet
+ // been started), fail all pending batches. Otherwise, send the
+ // cancellation down to the subchannel call.
+ if (calld->subchannel_call_ == nullptr) {
+ // TODO(roth): If there is a pending retry callback, do we need to
+ // cancel it here?
+ calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
+ NoYieldCallCombiner);
+ // Note: This will release the call combiner.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
+ } else {
+ // Note: This will release the call combiner.
+ calld->subchannel_call_->StartTransportStreamOpBatch(batch);
+ }
+ return;
+ }
+ // Add the batch to the pending list.
+ calld->PendingBatchesAdd(elem, batch);
+ // Check if we've already gotten a subchannel call.
+ // Note that once we have completed the pick, we do not need to enter
+ // the channel combiner, which is more efficient (especially for
+ // streaming calls).
+ if (calld->subchannel_call_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
+ calld, calld->subchannel_call_.get());
+ }
+ calld->PendingBatchesResume(elem);
+ return;
+ }
+ // We do not yet have a subchannel call.
+ // For batches containing a send_initial_metadata op, enter the channel
+ // combiner to start a pick.
+ if (GPR_LIKELY(batch->send_initial_metadata)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
+ chand, calld);
+ }
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(
+ &batch->handler_private.closure, StartPickLocked, elem,
+ grpc_combiner_scheduler(chand->data_plane_combiner())),
+ GRPC_ERROR_NONE);
+ } else {
+ // For all other batches, release the call combiner.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: saved batch, yielding call combiner", chand,
+ calld);
}
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
+ "batch does not include send_initial_metadata");
}
+}
- // State for handling deadlines.
- // The code in deadline_filter.c requires this to be the first field.
- // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
- // and this struct both independently store pointers to the call stack
- // and call combiner. If/when we have time, find a way to avoid this
- // without breaking the grpc_deadline_state abstraction.
- grpc_deadline_state deadline_state;
-
- grpc_slice path; // Request path.
- gpr_timespec call_start_time;
- grpc_millis deadline;
- gpr_arena* arena;
- grpc_call_stack* owning_call;
- grpc_call_combiner* call_combiner;
- grpc_call_context_element* call_context;
-
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
- grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
-
- grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
-
- // Set when we get a cancel_stream op.
- grpc_error* cancel_error = GRPC_ERROR_NONE;
-
- QueuedPick pick;
- bool pick_queued = false;
- bool service_config_applied = false;
- grpc_core::QueuedPickCanceller* pick_canceller = nullptr;
- grpc_closure pick_closure;
-
- grpc_polling_entity* pollent = nullptr;
-
- // Batches are added to this list when received from above.
- // They are removed when we are done handling the batch (i.e., when
- // either we have invoked all of the batch's callbacks or we have
- // passed the batch down to the subchannel call and are not
- // intercepting any of its callbacks).
- pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
- bool pending_send_initial_metadata : 1;
- bool pending_send_message : 1;
- bool pending_send_trailing_metadata : 1;
-
- // Retry state.
- bool enable_retries : 1;
- bool retry_committed : 1;
- bool last_attempt_got_server_pushback : 1;
- int num_attempts_completed = 0;
- size_t bytes_buffered_for_retry = 0;
- grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
- grpc_timer retry_timer;
-
- // The number of pending retriable subchannel batches containing send ops.
- // We hold a ref to the call stack while this is non-zero, since replay
- // batches may not complete until after all callbacks have been returned
- // to the surface, and we need to make sure that the call is not destroyed
- // until all of these batches have completed.
- // Note that we actually only need to track replay batches, but it's
- // easier to track all batches with send ops.
- int num_pending_retriable_subchannel_send_batches = 0;
-
- // Cached data for retrying send ops.
- // send_initial_metadata
- bool seen_send_initial_metadata = false;
- grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
- grpc_metadata_batch send_initial_metadata;
- uint32_t send_initial_metadata_flags;
- gpr_atm* peer_string;
- // send_message
- // When we get a send_message op, we replace the original byte stream
- // with a CachingByteStream that caches the slices to a local buffer for
- // use in retries.
- // Note: We inline the cache for the first 3 send_message ops and use
- // dynamic allocation after that. This number was essentially picked
- // at random; it could be changed in the future to tune performance.
- grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
- // send_trailing_metadata
- bool seen_send_trailing_metadata = false;
- grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
- grpc_metadata_batch send_trailing_metadata;
-};
-
-} // namespace
-
-// Forward declarations.
-static void retry_commit(grpc_call_element* elem,
- subchannel_call_retry_state* retry_state);
-static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
-static void on_complete(void* arg, grpc_error* error);
-static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
-static void remove_call_from_queued_picks_locked(grpc_call_element* elem);
+void CallData::SetPollent(grpc_call_element* elem,
+ grpc_polling_entity* pollent) {
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ calld->pollent_ = pollent;
+}
//
// send op data caching
//
-// Caches data for send ops so that it can be retried later, if not
-// already cached.
-static void maybe_cache_send_ops_for_batch(call_data* calld,
- pending_batch* pending) {
+void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
if (pending->send_ops_cached) return;
pending->send_ops_cached = true;
grpc_transport_stream_op_batch* batch = pending->batch;
// Save a copy of metadata for send_initial_metadata ops.
if (batch->send_initial_metadata) {
- calld->seen_send_initial_metadata = true;
- GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
+ seen_send_initial_metadata_ = true;
+ GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
- calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
- calld->arena,
+ send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
- grpc_metadata_batch_copy(send_initial_metadata,
- &calld->send_initial_metadata,
- calld->send_initial_metadata_storage);
- calld->send_initial_metadata_flags =
+ grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
+ send_initial_metadata_storage_);
+ send_initial_metadata_flags_ =
batch->payload->send_initial_metadata.send_initial_metadata_flags;
- calld->peer_string = batch->payload->send_initial_metadata.peer_string;
+ peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
- grpc_core::ByteStreamCache* cache =
- static_cast<grpc_core::ByteStreamCache*>(
- gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
- new (cache) grpc_core::ByteStreamCache(
+ ByteStreamCache* cache = arena_->New<ByteStreamCache>(
std::move(batch->payload->send_message.send_message));
- calld->send_messages.push_back(cache);
+ send_messages_.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) {
- calld->seen_send_trailing_metadata = true;
- GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
+ seen_send_trailing_metadata_ = true;
+ GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
grpc_metadata_batch* send_trailing_metadata =
batch->payload->send_trailing_metadata.send_trailing_metadata;
- calld->send_trailing_metadata_storage =
- (grpc_linked_mdelem*)gpr_arena_alloc(
- calld->arena,
- sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
- grpc_metadata_batch_copy(send_trailing_metadata,
- &calld->send_trailing_metadata,
- calld->send_trailing_metadata_storage);
+ send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
+ sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
+ grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
+ send_trailing_metadata_storage_);
}
}
-// Frees cached send_initial_metadata.
-static void free_cached_send_initial_metadata(channel_data* chand,
- call_data* calld) {
- if (grpc_client_channel_call_trace.enabled()) {
+void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
- calld);
+ this);
}
- grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+ grpc_metadata_batch_destroy(&send_initial_metadata_);
}
-// Frees cached send_message at index idx.
-static void free_cached_send_message(channel_data* chand, call_data* calld,
- size_t idx) {
- if (grpc_client_channel_call_trace.enabled()) {
+void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
- chand, calld, idx);
+ chand, this, idx);
}
- calld->send_messages[idx]->Destroy();
+ send_messages_[idx]->Destroy();
}
-// Frees cached send_trailing_metadata.
-static void free_cached_send_trailing_metadata(channel_data* chand,
- call_data* calld) {
- if (grpc_client_channel_call_trace.enabled()) {
+void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_trailing_metadata",
- chand, calld);
+ chand, this);
}
- grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+ grpc_metadata_batch_destroy(&send_trailing_metadata_);
}
-// Frees cached send ops that have already been completed after
-// committing the call.
-static void free_cached_send_op_data_after_commit(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void CallData::FreeCachedSendOpDataAfterCommit(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (retry_state->completed_send_initial_metadata) {
- free_cached_send_initial_metadata(chand, calld);
+ FreeCachedSendInitialMetadata(chand);
}
for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
- free_cached_send_message(chand, calld, i);
+ FreeCachedSendMessage(chand, i);
}
if (retry_state->completed_send_trailing_metadata) {
- free_cached_send_trailing_metadata(chand, calld);
+ FreeCachedSendTrailingMetadata(chand);
}
}
-// Frees cached send ops that were completed by the completed batch in
-// batch_data. Used when batches are completed after the call is committed.
-static void free_cached_send_op_data_for_completed_batch(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void CallData::FreeCachedSendOpDataForCompletedBatch(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (batch_data->batch.send_initial_metadata) {
- free_cached_send_initial_metadata(chand, calld);
+ FreeCachedSendInitialMetadata(chand);
}
if (batch_data->batch.send_message) {
- free_cached_send_message(chand, calld,
- retry_state->completed_send_message_count - 1);
+ FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
}
if (batch_data->batch.send_trailing_metadata) {
- free_cached_send_trailing_metadata(chand, calld);
+ FreeCachedSendTrailingMetadata(chand);
}
}
// LB recv_trailing_metadata_ready handling
//
-void maybe_inject_recv_trailing_metadata_ready_for_lb(
- const LoadBalancingPolicy::PickArgs& pick,
+void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
+ void* arg, grpc_error* error) {
+ CallData* calld = static_cast<CallData*>(arg);
+ // Invoke callback to LB policy.
+ calld->lb_recv_trailing_metadata_ready_(
+ calld->lb_recv_trailing_metadata_ready_user_data_,
+ calld->recv_trailing_metadata_, &calld->lb_call_state_);
+ // Chain to original callback.
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
+ GRPC_ERROR_REF(error));
+}
+
+void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch) {
- if (pick.recv_trailing_metadata_ready != nullptr) {
- *pick.original_recv_trailing_metadata_ready =
+ if (lb_recv_trailing_metadata_ready_ != nullptr) {
+ recv_trailing_metadata_ =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+ RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
+ grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- pick.recv_trailing_metadata_ready;
- if (pick.recv_trailing_metadata != nullptr) {
- *pick.recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- }
+ &recv_trailing_metadata_ready_;
}
}
// pending_batches management
//
-// Returns the index into calld->pending_batches to be used for batch.
-static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
+size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
// Note: It is important the send_initial_metadata be the first entry
// here, since the code in pick_subchannel_locked() assumes it will be.
if (batch->send_initial_metadata) return 0;
}
// This is called via the call combiner, so access to calld is synchronized.
-static void pending_batches_add(grpc_call_element* elem,
- grpc_transport_stream_op_batch* batch) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- const size_t idx = get_batch_index(batch);
- if (grpc_client_channel_call_trace.enabled()) {
+void CallData::PendingBatchesAdd(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ const size_t idx = GetBatchIndex(batch);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
- calld, idx);
+ this, idx);
}
- pending_batch* pending = &calld->pending_batches[idx];
+ PendingBatch* pending = &pending_batches_[idx];
GPR_ASSERT(pending->batch == nullptr);
pending->batch = batch;
pending->send_ops_cached = false;
- if (calld->enable_retries) {
+ if (enable_retries_) {
// Update state in calld about pending batches.
// Also check if the batch takes us over the retry buffer limit.
// Note: We don't check the size of trailing metadata here, because
// gRPC clients do not send trailing metadata.
if (batch->send_initial_metadata) {
- calld->pending_send_initial_metadata = true;
- calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
+ pending_send_initial_metadata_ = true;
+ bytes_buffered_for_retry_ += grpc_metadata_batch_size(
batch->payload->send_initial_metadata.send_initial_metadata);
}
if (batch->send_message) {
- calld->pending_send_message = true;
- calld->bytes_buffered_for_retry +=
+ pending_send_message_ = true;
+ bytes_buffered_for_retry_ +=
batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
- calld->pending_send_trailing_metadata = true;
+ pending_send_trailing_metadata_ = true;
}
- if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
- chand->per_rpc_retry_buffer_size)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
+ chand->per_rpc_retry_buffer_size())) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: exceeded retry buffer size, committing",
- chand, calld);
+ chand, this);
}
- subchannel_call_retry_state* retry_state =
- calld->subchannel_call == nullptr
- ? nullptr
- : static_cast<subchannel_call_retry_state*>(
-
- calld->subchannel_call->GetParentData());
- retry_commit(elem, retry_state);
+ SubchannelCallRetryState* retry_state =
+ subchannel_call_ == nullptr ? nullptr
+ : static_cast<SubchannelCallRetryState*>(
+ subchannel_call_->GetParentData());
+ RetryCommit(elem, retry_state);
// If we are not going to retry and have not yet started, pretend
// retries are disabled so that we don't bother with retry overhead.
- if (calld->num_attempts_completed == 0) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (num_attempts_completed_ == 0) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: disabling retries before first attempt",
- chand, calld);
+ chand, this);
}
- calld->enable_retries = false;
+ enable_retries_ = false;
}
}
}
}
-static void pending_batch_clear(call_data* calld, pending_batch* pending) {
- if (calld->enable_retries) {
+void CallData::PendingBatchClear(PendingBatch* pending) {
+ if (enable_retries_) {
if (pending->batch->send_initial_metadata) {
- calld->pending_send_initial_metadata = false;
+ pending_send_initial_metadata_ = false;
}
if (pending->batch->send_message) {
- calld->pending_send_message = false;
+ pending_send_message_ = false;
}
if (pending->batch->send_trailing_metadata) {
- calld->pending_send_trailing_metadata = false;
+ pending_send_trailing_metadata_ = false;
}
}
pending->batch = nullptr;
}
+void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
+ PendingBatch* pending) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ grpc_transport_stream_op_batch* batch = pending->batch;
+ // We clear the pending batch if all of its callbacks have been
+ // scheduled and reset to nullptr.
+ if (batch->on_complete == nullptr &&
+ (!batch->recv_initial_metadata ||
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
+ nullptr) &&
+ (!batch->recv_message ||
+ batch->payload->recv_message.recv_message_ready == nullptr) &&
+ (!batch->recv_trailing_metadata ||
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+ nullptr)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
+ this);
+ }
+ PendingBatchClear(pending);
+ }
+}
+
// This is called via the call combiner, so access to calld is synchronized.
-static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
+void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
- call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
+ CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
- batch, GRPC_ERROR_REF(error), calld->call_combiner);
+ batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}
// This is called via the call combiner, so access to calld is synchronized.
-// If yield_call_combiner_predicate returns true, assumes responsibility for
-// yielding the call combiner.
-typedef bool (*YieldCallCombinerPredicate)(
- const grpc_core::CallCombinerClosureList& closures);
-static bool yield_call_combiner(
- const grpc_core::CallCombinerClosureList& closures) {
- return true;
-}
-static bool no_yield_call_combiner(
- const grpc_core::CallCombinerClosureList& closures) {
- return false;
-}
-static bool yield_call_combiner_if_pending_batches_found(
- const grpc_core::CallCombinerClosureList& closures) {
- return closures.size() > 0;
-}
-static void pending_batches_fail(
+void CallData::PendingBatchesFail(
grpc_call_element* elem, grpc_error* error,
YieldCallCombinerPredicate yield_call_combiner_predicate) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
size_t num_batches = 0;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- if (calld->pending_batches[i].batch != nullptr) ++num_batches;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ if (pending_batches_[i].batch != nullptr) ++num_batches;
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
- elem->channel_data, calld, num_batches, grpc_error_string(error));
+ elem->channel_data, this, num_batches, grpc_error_string(error));
}
- grpc_core::CallCombinerClosureList closures;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
+ CallCombinerClosureList closures;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
- maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
- batch);
+ MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
}
- batch->handler_private.extra_arg = calld;
+ batch->handler_private.extra_arg = this;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
+ FailPendingBatchInCallCombiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
- "pending_batches_fail");
- pending_batch_clear(calld, pending);
+ "PendingBatchesFail");
+ PendingBatchClear(pending);
}
}
if (yield_call_combiner_predicate(closures)) {
- closures.RunClosures(calld->call_combiner);
+ closures.RunClosures(call_combiner_);
} else {
- closures.RunClosuresWithoutYielding(calld->call_combiner);
+ closures.RunClosuresWithoutYielding(call_combiner_);
}
GRPC_ERROR_UNREF(error);
}
// This is called via the call combiner, so access to calld is synchronized.
-static void resume_pending_batch_in_call_combiner(void* arg,
- grpc_error* ignored) {
+void CallData::ResumePendingBatchInCallCombiner(void* arg,
+ grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
- grpc_core::SubchannelCall* subchannel_call =
- static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
+ SubchannelCall* subchannel_call =
+ static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
subchannel_call->StartTransportStreamOpBatch(batch);
}
// This is called via the call combiner, so access to calld is synchronized.
-static void pending_batches_resume(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (calld->enable_retries) {
- start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
+void CallData::PendingBatchesResume(grpc_call_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (enable_retries_) {
+ StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
return;
}
// Retries not enabled; send down batches as-is.
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
size_t num_batches = 0;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- if (calld->pending_batches[i].batch != nullptr) ++num_batches;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ if (pending_batches_[i].batch != nullptr) ++num_batches;
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" pending batches on subchannel_call=%p",
- chand, calld, num_batches, calld->subchannel_call.get());
+ chand, this, num_batches, subchannel_call_.get());
}
- grpc_core::CallCombinerClosureList closures;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
+ CallCombinerClosureList closures;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
- maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
- batch);
+ MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
}
- batch->handler_private.extra_arg = calld->subchannel_call.get();
+ batch->handler_private.extra_arg = subchannel_call_.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
+ ResumePendingBatchInCallCombiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
- pending_batch_clear(calld, pending);
+ "PendingBatchesResume");
+ PendingBatchClear(pending);
}
}
// Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
-}
-
-static void maybe_clear_pending_batch(grpc_call_element* elem,
- pending_batch* pending) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_transport_stream_op_batch* batch = pending->batch;
- // We clear the pending batch if all of its callbacks have been
- // scheduled and reset to nullptr.
- if (batch->on_complete == nullptr &&
- (!batch->recv_initial_metadata ||
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
- nullptr) &&
- (!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr) &&
- (!batch->recv_trailing_metadata ||
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
- nullptr)) {
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
- calld);
- }
- pending_batch_clear(calld, pending);
- }
+ closures.RunClosures(call_combiner_);
}
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
+CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr && predicate(batch)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
- calld, log_message, i);
+ this, log_message, i);
}
return pending;
}
// retry code
//
-// Commits the call so that no further retry attempts will be performed.
-static void retry_commit(grpc_call_element* elem,
- subchannel_call_retry_state* retry_state) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (calld->retry_committed) return;
- calld->retry_committed = true;
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
+void CallData::RetryCommit(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (retry_committed_) return;
+ retry_committed_ = true;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
}
if (retry_state != nullptr) {
- free_cached_send_op_data_after_commit(elem, retry_state);
+ FreeCachedSendOpDataAfterCommit(elem, retry_state);
}
}
-// Starts a retry after appropriate back-off.
-static void do_retry(grpc_call_element* elem,
- subchannel_call_retry_state* retry_state,
- grpc_millis server_pushback_ms) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- GPR_ASSERT(calld->method_params != nullptr);
- const ClientChannelMethodParams::RetryPolicy* retry_policy =
- calld->method_params->retry_policy();
+void CallData::DoRetry(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state,
+ grpc_millis server_pushback_ms) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ GPR_ASSERT(method_params_ != nullptr);
+ const auto* retry_policy = method_params_->retry_policy();
GPR_ASSERT(retry_policy != nullptr);
// Reset subchannel call and connected subchannel.
- calld->subchannel_call.reset();
- calld->pick.pick.connected_subchannel.reset();
+ subchannel_call_.reset();
+ connected_subchannel_.reset();
// Compute backoff delay.
grpc_millis next_attempt_time;
if (server_pushback_ms >= 0) {
- next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
- calld->last_attempt_got_server_pushback = true;
+ next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
+ last_attempt_got_server_pushback_ = true;
} else {
- if (calld->num_attempts_completed == 1 ||
- calld->last_attempt_got_server_pushback) {
- calld->retry_backoff.Init(
- grpc_core::BackOff::Options()
+ if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
+ retry_backoff_.Init(
+ BackOff::Options()
.set_initial_backoff(retry_policy->initial_backoff)
.set_multiplier(retry_policy->backoff_multiplier)
.set_jitter(RETRY_BACKOFF_JITTER)
.set_max_backoff(retry_policy->max_backoff));
- calld->last_attempt_got_server_pushback = false;
+ last_attempt_got_server_pushback_ = false;
}
- next_attempt_time = calld->retry_backoff->NextAttemptTime();
+ next_attempt_time = retry_backoff_->NextAttemptTime();
}
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
- calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
+ this, next_attempt_time - ExecCtx::Get()->Now());
}
// Schedule retry after computed delay.
- GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
- grpc_combiner_scheduler(chand->combiner));
- grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
+ GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
+ grpc_combiner_scheduler(chand->data_plane_combiner()));
+ grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
// Update bookkeeping.
if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
-// Returns true if the call is being retried.
-static bool maybe_retry(grpc_call_element* elem,
- subchannel_batch_data* batch_data,
- grpc_status_code status,
- grpc_mdelem* server_pushback_md) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+bool CallData::MaybeRetry(grpc_call_element* elem,
+ SubchannelCallBatchData* batch_data,
+ grpc_status_code status,
+ grpc_mdelem* server_pushback_md) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Get retry policy.
- if (calld->method_params == nullptr) return false;
- const ClientChannelMethodParams::RetryPolicy* retry_policy =
- calld->method_params->retry_policy();
+ if (method_params_ == nullptr) return false;
+ const auto* retry_policy = method_params_->retry_policy();
if (retry_policy == nullptr) return false;
// If we've already dispatched a retry from this call, return true.
// This catches the case where the batch has multiple callbacks
// (i.e., it includes either recv_message or recv_initial_metadata).
- subchannel_call_retry_state* retry_state = nullptr;
+ SubchannelCallRetryState* retry_state = nullptr;
if (batch_data != nullptr) {
- retry_state = static_cast<subchannel_call_retry_state*>(
+ retry_state = static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
if (retry_state->retry_dispatched) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
- calld);
+ this);
}
return true;
}
}
// Check status.
if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
- if (calld->retry_throttle_data != nullptr) {
- calld->retry_throttle_data->RecordSuccess();
+ if (retry_throttle_data_ != nullptr) {
+ retry_throttle_data_->RecordSuccess();
}
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
}
return false;
}
// Status is not OK. Check whether the status is retryable.
if (!retry_policy->retryable_status_codes.Contains(status)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: status %s not configured as retryable", chand,
- calld, grpc_status_code_to_string(status));
+ this, grpc_status_code_to_string(status));
}
return false;
}
// things like failures due to malformed requests (INVALID_ARGUMENT).
// Conversely, it's important for this to come before the remaining
// checks, so that we don't fail to record failures due to other factors.
- if (calld->retry_throttle_data != nullptr &&
- !calld->retry_throttle_data->RecordFailure()) {
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
+ if (retry_throttle_data_ != nullptr &&
+ !retry_throttle_data_->RecordFailure()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
}
return false;
}
// Check whether the call is committed.
- if (calld->retry_committed) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (retry_committed_) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
- calld);
+ this);
}
return false;
}
// Check whether we have retries remaining.
- ++calld->num_attempts_completed;
- if (calld->num_attempts_completed >= retry_policy->max_attempts) {
- if (grpc_client_channel_call_trace.enabled()) {
+ ++num_attempts_completed_;
+ if (num_attempts_completed_ >= retry_policy->max_attempts) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
- calld, retry_policy->max_attempts);
+ this, retry_policy->max_attempts);
}
return false;
}
// If the call was cancelled from the surface, don't retry.
- if (calld->cancel_error != GRPC_ERROR_NONE) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (cancel_error_ != GRPC_ERROR_NONE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: call cancelled from surface, not retrying",
- chand, calld);
+ chand, this);
}
return false;
}
// If the value is "-1" or any other unparseable string, we do not retry.
uint32_t ms;
if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: not retrying due to server push-back",
- chand, calld);
+ chand, this);
}
return false;
} else {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
- chand, calld, ms);
+ chand, this, ms);
}
server_pushback_ms = (grpc_millis)ms;
}
}
- do_retry(elem, retry_state, server_pushback_ms);
+ DoRetry(elem, retry_state, server_pushback_ms);
return true;
}
//
-// subchannel_batch_data
+// CallData::SubchannelCallBatchData
//
-namespace {
+CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
+ grpc_call_element* elem, int refcount, bool set_on_complete) {
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
+ set_on_complete);
+}
-subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
- call_data* calld, int refcount,
- bool set_on_complete)
- : elem(elem), subchannel_call(calld->subchannel_call) {
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- calld->subchannel_call->GetParentData());
+CallData::SubchannelCallBatchData::SubchannelCallBatchData(
+ grpc_call_element* elem, CallData* calld, int refcount,
+ bool set_on_complete)
+ : elem(elem), subchannel_call(calld->subchannel_call_) {
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
+ calld->subchannel_call_->GetParentData());
batch.payload = &retry_state->batch_payload;
gpr_ref_init(&refs, refcount);
if (set_on_complete) {
- GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
+ GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
grpc_schedule_on_exec_ctx);
batch.on_complete = &on_complete;
}
- GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
+ GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
}
-void subchannel_batch_data::destroy() {
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- subchannel_call->GetParentData());
+void CallData::SubchannelCallBatchData::Destroy() {
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
if (batch.send_initial_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
}
grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
}
subchannel_call.reset();
- call_data* calld = static_cast<call_data*>(elem->call_data);
- GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
-}
-
-} // namespace
-
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount. If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
-static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount,
- bool set_on_complete) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_batch_data* batch_data =
- new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
- subchannel_batch_data(elem, calld, refcount, set_on_complete);
- return batch_data;
-}
-
-static void batch_data_unref(subchannel_batch_data* batch_data) {
- if (gpr_unref(&batch_data->refs)) {
- batch_data->destroy();
- }
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
}
//
// recv_initial_metadata callback handling
//
-// Invokes recv_initial_metadata_ready for a subchannel batch.
-static void invoke_recv_initial_metadata_callback(void* arg,
- grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
+ CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = pending_batch_find(
+ PendingBatch* pending = calld->PendingBatchFind(
batch_data->elem, "invoking recv_initial_metadata_ready for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_initial_metadata &&
});
GPR_ASSERT(pending != nullptr);
// Return metadata.
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_initial_metadata,
.recv_initial_metadata_ready;
pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
nullptr;
- maybe_clear_pending_batch(batch_data->elem, pending);
- batch_data_unref(batch_data);
+ calld->MaybeClearPendingBatch(batch_data->elem, pending);
+ batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}
-// Intercepts recv_initial_metadata_ready callback for retries.
-// Commits the call and returns the initial metadata up the stack.
-static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
chand, calld, grpc_error_string(error));
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_initial_metadata = true;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_initial_metadata op, so do nothing.
if (retry_state->retry_dispatched) {
GRPC_CALL_COMBINER_STOP(
- calld->call_combiner,
+ calld->call_combiner_,
"recv_initial_metadata_ready after retry dispatched");
return;
}
if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_initial_metadata_ready "
"(Trailers-Only)",
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
// ourselves to get status.
- start_internal_recv_trailing_metadata(elem);
+ calld->StartInternalRecvTrailingMetadata(elem);
} else {
GRPC_CALL_COMBINER_STOP(
- calld->call_combiner,
+ calld->call_combiner_,
"recv_initial_metadata_ready trailers-only or error");
}
return;
}
// Received valid initial metadata, so commit the call.
- retry_commit(elem, retry_state);
+ calld->RetryCommit(elem, retry_state);
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
- invoke_recv_initial_metadata_callback(batch_data, error);
+ calld->InvokeRecvInitialMetadataCallback(batch_data, error);
}
//
// recv_message callback handling
//
-// Invokes recv_message_ready for a subchannel batch.
-static void invoke_recv_message_callback(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
+ CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = pending_batch_find(
+ PendingBatch* pending = calld->PendingBatchFind(
batch_data->elem, "invoking recv_message_ready for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_message &&
});
GPR_ASSERT(pending != nullptr);
// Return payload.
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
*pending->batch->payload->recv_message.recv_message =
std::move(retry_state->recv_message);
grpc_closure* recv_message_ready =
pending->batch->payload->recv_message.recv_message_ready;
pending->batch->payload->recv_message.recv_message_ready = nullptr;
- maybe_clear_pending_batch(batch_data->elem, pending);
- batch_data_unref(batch_data);
+ calld->MaybeClearPendingBatch(batch_data->elem, pending);
+ batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}
-// Intercepts recv_message_ready callback for retries.
-// Commits the call and returns the message up the stack.
-static void recv_message_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::RecvMessageReady(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
chand, calld, grpc_error_string(error));
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
++retry_state->completed_recv_message_count;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_message op, so do nothing.
if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_message_ready after retry dispatched");
return;
}
if (GPR_UNLIKELY(
(retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_message_ready (nullptr "
"message and recv_trailing_metadata pending)",
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
// ourselves to get status.
- start_internal_recv_trailing_metadata(elem);
+ calld->StartInternalRecvTrailingMetadata(elem);
} else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
}
return;
}
// Received a valid message, so commit the call.
- retry_commit(elem, retry_state);
+ calld->RetryCommit(elem, retry_state);
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
- invoke_recv_message_callback(batch_data, error);
+ calld->InvokeRecvMessageCallback(batch_data, error);
}
//
// recv_trailing_metadata handling
//
-// Sets *status and *server_pushback_md based on md_batch and error.
-// Only sets *server_pushback_md if server_pushback_md != nullptr.
-static void get_call_status(grpc_call_element* elem,
- grpc_metadata_batch* md_batch, grpc_error* error,
- grpc_status_code* status,
- grpc_mdelem** server_pushback_md) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void CallData::GetCallStatus(grpc_call_element* elem,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status,
+ grpc_mdelem** server_pushback_md) {
if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
- nullptr);
+ grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
} else {
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
GRPC_ERROR_UNREF(error);
}
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+void CallData::AddClosureForRecvTrailingMetadataReady(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ grpc_error* error, CallCombinerClosureList* closures) {
// Find pending batch.
- pending_batch* pending = pending_batch_find(
+ PendingBatch* pending = PendingBatchFind(
elem, "invoking recv_trailing_metadata for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_trailing_metadata &&
.recv_trailing_metadata_ready != nullptr;
});
// If we generated the recv_trailing_metadata op internally via
- // start_internal_recv_trailing_metadata(), then there will be no
- // pending batch.
+ // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
if (pending == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
// Return metadata.
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_trailing_metadata,
// Update bookkeeping.
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
nullptr;
- maybe_clear_pending_batch(elem, pending);
+ MaybeClearPendingBatch(elem, pending);
}
-// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures.
-static void add_closures_for_deferred_recv_callbacks(
- subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+void CallData::AddClosuresForDeferredRecvCallbacks(
+ SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
+ CallCombinerClosureList* closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
+ InvokeRecvInitialMetadataCallback,
retry_state->recv_initial_metadata_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&retry_state->recv_initial_metadata_ready,
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
- invoke_recv_message_callback,
+ InvokeRecvMessageCallback,
retry_state->recv_message_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&retry_state->recv_message_ready,
}
}
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
+bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
+ SubchannelCallRetryState* retry_state) {
if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
return false;
}
return true;
}
if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages.size()) {
+ retry_state->started_send_message_count < send_messages_.size()) {
return true;
}
if (pending->batch->send_trailing_metadata &&
return false;
}
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_call_trace.enabled()) {
+void CallData::AddClosuresToFailUnstartedPendingBatches(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+ grpc_error* error, CallCombinerClosureList* closures) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
+ if (PendingBatchIsUnstarted(pending, retry_state)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: failing unstarted pending batch at index "
"%" PRIuPTR,
- chand, calld, i);
+ chand, this, i);
}
closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
"failing on_complete for pending batch");
pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
+ MaybeClearPendingBatch(elem, pending);
}
}
GRPC_ERROR_UNREF(error);
}
-// Runs necessary closures upon completion of a call attempt.
-static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
- grpc_error* error) {
+void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
+ grpc_error* error) {
grpc_call_element* elem = batch_data->elem;
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
// Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
+ CallCombinerClosureList closures;
// First, add closure for recv_trailing_metadata_ready.
- add_closure_for_recv_trailing_metadata_ready(
- elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ AddClosureForRecvTrailingMetadataReady(elem, batch_data,
+ GRPC_ERROR_REF(error), &closures);
// If there are deferred recv_initial_metadata_ready or recv_message_ready
// callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
// Add closures to fail any pending batches that have not yet been started.
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
+ GRPC_ERROR_REF(error), &closures);
// Don't need batch_data anymore.
- batch_data_unref(batch_data);
+ batch_data->Unref();
// Schedule all of the closures identified above.
// Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
+ closures.RunClosures(call_combiner_);
GRPC_ERROR_UNREF(error);
}
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
chand, calld, grpc_error_string(error));
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_trailing_metadata = true;
// Get the call's status and check for server pushback metadata.
grpc_mdelem* server_pushback_md = nullptr;
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
- get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
- &server_pushback_md);
- if (grpc_client_channel_call_trace.enabled()) {
+ calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
+ &server_pushback_md);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
}
// Check if we should retry.
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
// Unref batch_data for deferred recv_initial_metadata_ready or
// recv_message_ready callbacks, if any.
if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
+ batch_data->Unref();
GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
}
if (retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
+ batch_data->Unref();
GRPC_ERROR_UNREF(retry_state->recv_message_error);
}
- batch_data_unref(batch_data);
+ batch_data->Unref();
return;
}
// Not retrying, so commit the call.
- retry_commit(elem, retry_state);
+ calld->RetryCommit(elem, retry_state);
// Run any necessary closures.
- run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
+ calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}
//
// on_complete callback handling
//
-// Adds the on_complete closure for the pending batch completed in
-// batch_data to closures.
-static void add_closure_for_completed_pending_batch(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- grpc_core::CallCombinerClosureList* closures) {
- pending_batch* pending = pending_batch_find(
+void CallData::AddClosuresForCompletedPendingBatch(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state, grpc_error* error,
+ CallCombinerClosureList* closures) {
+ PendingBatch* pending = PendingBatchFind(
elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
// Match the pending batch with the same set of send ops as the
// subchannel batch we've just completed.
closures->Add(pending->batch->on_complete, error,
"on_complete for pending batch");
pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
-}
-
-// If there are any cached ops to replay or pending ops to start on the
-// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
-static void add_closures_for_replay_or_pending_send_ops(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+ MaybeClearPendingBatch(elem, pending);
+}
+
+void CallData::AddClosuresForReplayOrPendingSendOps(
+ grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+ SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
bool have_pending_send_message_ops =
- retry_state->started_send_message_count < calld->send_messages.size();
+ retry_state->started_send_message_count < send_messages_.size();
bool have_pending_send_trailing_metadata_op =
- calld->seen_send_trailing_metadata &&
+ seen_send_trailing_metadata_ &&
!retry_state->started_send_trailing_metadata;
if (!have_pending_send_message_ops &&
!have_pending_send_trailing_metadata_op) {
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch == nullptr || pending->send_ops_cached) continue;
if (batch->send_message) have_pending_send_message_ops = true;
}
}
if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting next batch for pending send op(s)",
- chand, calld);
+ chand, this);
}
GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem,
+ StartRetriableSubchannelBatches, elem,
grpc_schedule_on_exec_ctx);
closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
"starting next batch for send_* op(s)");
}
}
-// Callback used to intercept on_complete from subchannel calls.
-// Called only when retries are enabled.
-static void on_complete(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+void CallData::OnComplete(void* arg, grpc_error* error) {
+ SubchannelCallBatchData* batch_data =
+ static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
chand, calld, grpc_error_string(error), batch_str);
gpr_free(batch_str);
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
// Update bookkeeping in retry_state.
if (batch_data->batch.send_initial_metadata) {
}
// If the call is committed, free cached data for send ops that we've just
// completed.
- if (calld->retry_committed) {
- free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
+ if (calld->retry_committed_) {
+ calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
}
// Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
+ CallCombinerClosureList closures;
// If a retry was already dispatched, that means we saw
// recv_trailing_metadata before this, so we do nothing here.
// Otherwise, invoke the callback to return the result to the surface.
if (!retry_state->retry_dispatched) {
// Add closure for the completed pending batch, if any.
- add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), &closures);
+ calld->AddClosuresForCompletedPendingBatch(
+ elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
// If needed, add a callback to start any replay or pending send ops on
// the subchannel call.
if (!retry_state->completed_recv_trailing_metadata) {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
&closures);
}
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- --calld->num_pending_retriable_subchannel_send_batches;
+ --calld->num_pending_retriable_subchannel_send_batches_;
const bool last_send_batch_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
+ calld->num_pending_retriable_subchannel_send_batches_ == 0;
// Don't need batch_data anymore.
- batch_data_unref(batch_data);
+ batch_data->Unref();
// Schedule all of the closures identified above.
// Note: This yeilds the call combiner.
- closures.RunClosures(calld->call_combiner);
+ closures.RunClosures(calld->call_combiner_);
// If this was the last subchannel send batch, unref the call stack.
if (last_send_batch_complete) {
- GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+ GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
}
}
// subchannel batch construction
//
-// Helper function used to start a subchannel batch in the call combiner.
-static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
+void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
- grpc_core::SubchannelCall* subchannel_call =
- static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
+ SubchannelCall* subchannel_call =
+ static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
subchannel_call->StartTransportStreamOpBatch(batch);
}
-// Adds a closure to closures that will execute batch in the call combiner.
-static void add_closure_for_subchannel_batch(
+void CallData::AddClosureForSubchannelBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
- grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- batch->handler_private.extra_arg = calld->subchannel_call.get();
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- start_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- if (grpc_client_channel_call_trace.enabled()) {
+ CallCombinerClosureList* closures) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ batch->handler_private.extra_arg = subchannel_call_.get();
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+ batch, grpc_schedule_on_exec_ctx);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
- calld, batch_str);
+ this, batch_str);
gpr_free(batch_str);
}
closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"start_subchannel_batch");
}
-// Adds retriable send_initial_metadata op to batch_data.
-static void add_retriable_send_initial_metadata_op(
- call_data* calld, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
+void CallData::AddRetriableSendInitialMetadataOp(
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
// Maps the number of retries to the corresponding metadata value slice.
static const grpc_slice* retry_count_strings[] = {
&GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
// If we've already completed one or more attempts, add the
// grpc-retry-attempts header.
retry_state->send_initial_metadata_storage =
- static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
- calld->arena, sizeof(grpc_linked_mdelem) *
- (calld->send_initial_metadata.list.count +
- (calld->num_attempts_completed > 0))));
- grpc_metadata_batch_copy(&calld->send_initial_metadata,
+ static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+ sizeof(grpc_linked_mdelem) *
+ (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
+ grpc_metadata_batch_copy(&send_initial_metadata_,
&retry_state->send_initial_metadata,
retry_state->send_initial_metadata_storage);
if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts);
}
- if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
+ if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
grpc_mdelem retry_md = grpc_mdelem_create(
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
- *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
+ *retry_count_strings[num_attempts_completed_ - 1], nullptr);
grpc_error* error = grpc_metadata_batch_add_tail(
&retry_state->send_initial_metadata,
- &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
- .list.count],
+ &retry_state
+ ->send_initial_metadata_storage[send_initial_metadata_.list.count],
retry_md);
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
gpr_log(GPR_ERROR, "error adding retry metadata: %s",
batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
&retry_state->send_initial_metadata;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
- calld->send_initial_metadata_flags;
- batch_data->batch.payload->send_initial_metadata.peer_string =
- calld->peer_string;
-}
-
-// Adds retriable send_message op to batch_data.
-static void add_retriable_send_message_op(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ send_initial_metadata_flags_;
+ batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
+}
+
+void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
- chand, calld, retry_state->started_send_message_count);
+ chand, this, retry_state->started_send_message_count);
}
- grpc_core::ByteStreamCache* cache =
- calld->send_messages[retry_state->started_send_message_count];
+ ByteStreamCache* cache =
+ send_messages_[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
retry_state->send_message.Init(cache);
batch_data->batch.send_message = true;
retry_state->send_message.get());
}
-// Adds retriable send_trailing_metadata op to batch_data.
-static void add_retriable_send_trailing_metadata_op(
- call_data* calld, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
+void CallData::AddRetriableSendTrailingMetadataOp(
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
// We need to make a copy of the metadata batch for each attempt, since
// the filters in the subchannel stack may modify this batch, and we don't
// want those modifications to be passed forward to subsequent attempts.
retry_state->send_trailing_metadata_storage =
- static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
- calld->arena, sizeof(grpc_linked_mdelem) *
- calld->send_trailing_metadata.list.count));
- grpc_metadata_batch_copy(&calld->send_trailing_metadata,
+ static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+ sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
+ grpc_metadata_batch_copy(&send_trailing_metadata_,
&retry_state->send_trailing_metadata,
retry_state->send_trailing_metadata_storage);
retry_state->started_send_trailing_metadata = true;
&retry_state->send_trailing_metadata;
}
-// Adds retriable recv_initial_metadata op to batch_data.
-static void add_retriable_recv_initial_metadata_op(
- call_data* calld, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
+void CallData::AddRetriableRecvInitialMetadataOp(
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
retry_state->started_recv_initial_metadata = true;
batch_data->batch.recv_initial_metadata = true;
grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
&retry_state->trailing_metadata_available;
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
- recv_initial_metadata_ready, batch_data,
+ RecvInitialMetadataReady, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
&retry_state->recv_initial_metadata_ready;
}
-// Adds retriable recv_message op to batch_data.
-static void add_retriable_recv_message_op(
- call_data* calld, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
+void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
++retry_state->started_recv_message_count;
batch_data->batch.recv_message = true;
batch_data->batch.payload->recv_message.recv_message =
&retry_state->recv_message;
- GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
+ GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
batch_data, grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_message.recv_message_ready =
&retry_state->recv_message_ready;
}
-// Adds retriable recv_trailing_metadata op to batch_data.
-static void add_retriable_recv_trailing_metadata_op(
- call_data* calld, subchannel_call_retry_state* retry_state,
- subchannel_batch_data* batch_data) {
+void CallData::AddRetriableRecvTrailingMetadataOp(
+ SubchannelCallRetryState* retry_state,
+ SubchannelCallBatchData* batch_data) {
retry_state->started_recv_trailing_metadata = true;
batch_data->batch.recv_trailing_metadata = true;
grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&retry_state->collect_stats;
GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, batch_data,
+ RecvTrailingMetadataReady, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata_ready =
&retry_state->recv_trailing_metadata_ready;
- maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
- &batch_data->batch);
-}
-
-// Helper function used to start a recv_trailing_metadata batch. This
-// is used in the case where a recv_initial_metadata or recv_message
-// op fails in a way that we know the call is over but when the application
-// has not yet started its own recv_trailing_metadata op.
-static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+ &batch_data->batch);
+}
+
+void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: call failed but recv_trailing_metadata not "
"started; starting it internally",
- chand, calld);
+ chand, this);
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- calld->subchannel_call->GetParentData());
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
// Create batch_data with 2 refs, since this batch will be unreffed twice:
// once for the recv_trailing_metadata_ready callback when the subchannel
// batch returns, and again when we actually get a recv_trailing_metadata
// op from the surface.
- subchannel_batch_data* batch_data =
- batch_data_create(elem, 2, false /* set_on_complete */);
- add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
+ SubchannelCallBatchData* batch_data =
+ SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
+ AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
- calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch);
+ subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
}
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
-static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_batch_data* replay_batch_data = nullptr;
+CallData::SubchannelCallBatchData*
+CallData::MaybeCreateSubchannelBatchForReplay(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ SubchannelCallBatchData* replay_batch_data = nullptr;
// send_initial_metadata.
- if (calld->seen_send_initial_metadata &&
+ if (seen_send_initial_metadata_ &&
!retry_state->started_send_initial_metadata &&
- !calld->pending_send_initial_metadata) {
- if (grpc_client_channel_call_trace.enabled()) {
+ !pending_send_initial_metadata_) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_initial_metadata op",
- chand, calld);
+ chand, this);
}
- replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
- add_retriable_send_initial_metadata_op(calld, retry_state,
- replay_batch_data);
+ replay_batch_data =
+ SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
+ AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
}
// send_message.
// Note that we can only have one send_message op in flight at a time.
- if (retry_state->started_send_message_count < calld->send_messages.size() &&
+ if (retry_state->started_send_message_count < send_messages_.size() &&
retry_state->started_send_message_count ==
retry_state->completed_send_message_count &&
- !calld->pending_send_message) {
- if (grpc_client_channel_call_trace.enabled()) {
+ !pending_send_message_) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_message op",
- chand, calld);
+ chand, this);
}
if (replay_batch_data == nullptr) {
replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
}
- add_retriable_send_message_op(elem, retry_state, replay_batch_data);
+ AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
}
// send_trailing_metadata.
// Note that we only add this op if we have no more send_message ops
// to start, since we can't send down any more send_message ops after
// send_trailing_metadata.
- if (calld->seen_send_trailing_metadata &&
- retry_state->started_send_message_count == calld->send_messages.size() &&
+ if (seen_send_trailing_metadata_ &&
+ retry_state->started_send_message_count == send_messages_.size() &&
!retry_state->started_send_trailing_metadata &&
- !calld->pending_send_trailing_metadata) {
- if (grpc_client_channel_call_trace.enabled()) {
+ !pending_send_trailing_metadata_) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_trailing_metadata op",
- chand, calld);
+ chand, this);
}
if (replay_batch_data == nullptr) {
replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
}
- add_retriable_send_trailing_metadata_op(calld, retry_state,
- replay_batch_data);
+ AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
}
return replay_batch_data;
}
-// Adds subchannel batches for pending batches to batches, updating
-// *num_batches as needed.
-static void add_subchannel_batches_for_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
+void CallData::AddSubchannelBatchesForPendingBatches(
+ grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+ CallCombinerClosureList* closures) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+ PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch == nullptr) continue;
// Skip any batch that either (a) has already been started on this
// send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message <
- calld->send_messages.size() ||
+ send_messages_.size() ||
retry_state->started_send_trailing_metadata)) {
continue;
}
if (batch->recv_trailing_metadata &&
retry_state->started_recv_trailing_metadata) {
// If we previously completed a recv_trailing_metadata op
- // initiated by start_internal_recv_trailing_metadata(), use the
+ // initiated by StartInternalRecvTrailingMetadata(), use the
// result of that instead of trying to re-start this op.
if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
nullptr))) {
"re-executing recv_trailing_metadata_ready to propagate "
"internally triggered result");
} else {
- batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
+ retry_state->recv_trailing_metadata_internal_batch->Unref();
}
retry_state->recv_trailing_metadata_internal_batch = nullptr;
}
continue;
}
// If we're not retrying, just send the batch as-is.
- if (calld->method_params == nullptr ||
- calld->method_params->retry_policy() == nullptr ||
- calld->retry_committed) {
- add_closure_for_subchannel_batch(elem, batch, closures);
- pending_batch_clear(calld, pending);
+ if (method_params_ == nullptr ||
+ method_params_->retry_policy() == nullptr || retry_committed_) {
+ // TODO(roth) : We should probably call
+ // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
+ AddClosureForSubchannelBatch(elem, batch, closures);
+ PendingBatchClear(pending);
continue;
}
// Create batch with the right number of callbacks.
const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
batch->recv_message +
batch->recv_trailing_metadata;
- subchannel_batch_data* batch_data = batch_data_create(
+ SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
- maybe_cache_send_ops_for_batch(calld, pending);
+ MaybeCacheSendOpsForBatch(pending);
// send_initial_metadata.
if (batch->send_initial_metadata) {
- add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
+ AddRetriableSendInitialMetadataOp(retry_state, batch_data);
}
// send_message.
if (batch->send_message) {
- add_retriable_send_message_op(elem, retry_state, batch_data);
+ AddRetriableSendMessageOp(elem, retry_state, batch_data);
}
// send_trailing_metadata.
if (batch->send_trailing_metadata) {
- add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
+ AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
}
// recv_initial_metadata.
if (batch->recv_initial_metadata) {
// recv_flags is only used on the server side.
GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
- add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
+ AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
}
// recv_message.
if (batch->recv_message) {
- add_retriable_recv_message_op(calld, retry_state, batch_data);
+ AddRetriableRecvMessageOp(retry_state, batch_data);
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
- add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
+ AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
}
- add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+ AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
batch->send_trailing_metadata) {
- if (calld->num_pending_retriable_subchannel_send_batches == 0) {
- GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
+ if (num_pending_retriable_subchannel_send_batches_ == 0) {
+ GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
}
- ++calld->num_pending_retriable_subchannel_send_batches;
+ ++num_pending_retriable_subchannel_send_batches_;
}
}
}
-// Constructs and starts whatever subchannel batches are needed on the
-// subchannel call.
-static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
+void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_call_trace.enabled()) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
chand, calld);
}
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- calld->subchannel_call->GetParentData());
+ SubchannelCallRetryState* retry_state =
+ static_cast<SubchannelCallRetryState*>(
+ calld->subchannel_call_->GetParentData());
// Construct list of closures to execute, one for each pending batch.
- grpc_core::CallCombinerClosureList closures;
+ CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
- subchannel_batch_data* replay_batch_data =
- maybe_create_subchannel_batch_for_replay(elem, retry_state);
+ SubchannelCallBatchData* replay_batch_data =
+ calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
- &closures);
+ calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
+ &closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
- if (calld->num_pending_retriable_subchannel_send_batches == 0) {
- GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
+ if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
+ GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
}
- ++calld->num_pending_retriable_subchannel_send_batches;
+ ++calld->num_pending_retriable_subchannel_send_batches_;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+ calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
// Start batches on subchannel call.
- if (grpc_client_channel_call_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, closures.size(), calld->subchannel_call.get());
+ chand, calld, closures.size(), calld->subchannel_call_.get());
}
// Note: This will yield the call combiner.
- closures.RunClosures(calld->call_combiner);
+ closures.RunClosures(calld->call_combiner_);
}
//
// LB pick
//
-static void create_subchannel_call(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void CallData::CreateSubchannelCall(grpc_call_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
const size_t parent_data_size =
- calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
- const grpc_core::ConnectedSubchannel::CallArgs call_args = {
- calld->pollent, // pollent
- calld->path, // path
- calld->call_start_time, // start_time
- calld->deadline, // deadline
- calld->arena, // arena
+ enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
+ const ConnectedSubchannel::CallArgs call_args = {
+ pollent_, path_, call_start_time_, deadline_, arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
- calld->call_context, // context
- calld->call_combiner, // call_combiner
- parent_data_size // parent_data_size
- };
+ call_context_, call_combiner_, parent_data_size};
grpc_error* error = GRPC_ERROR_NONE;
- calld->subchannel_call =
- calld->pick.pick.connected_subchannel->CreateCall(call_args, &error);
- if (grpc_client_channel_routing_trace.enabled()) {
+ subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
- chand, calld, calld->subchannel_call.get(),
- grpc_error_string(error));
+ chand, this, subchannel_call_.get(), grpc_error_string(error));
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
- pending_batches_fail(elem, error, yield_call_combiner);
+ PendingBatchesFail(elem, error, YieldCallCombiner);
} else {
if (parent_data_size > 0) {
- new (calld->subchannel_call->GetParentData())
- subchannel_call_retry_state(calld->call_context);
+ new (subchannel_call_->GetParentData())
+ SubchannelCallRetryState(call_context_);
}
- pending_batches_resume(elem);
+ PendingBatchesResume(elem);
}
}
-// Invoked when a pick is completed, on both success or failure.
-static void pick_done(void* arg, grpc_error* error) {
+void CallData::PickDone(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
- if (grpc_client_channel_routing_trace.enabled()) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
calld, grpc_error_string(error));
}
- pending_batches_fail(elem, GRPC_ERROR_REF(error), yield_call_combiner);
+ calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
return;
}
- create_subchannel_call(elem);
+ calld->CreateSubchannelCall(elem);
}
-namespace grpc_core {
-namespace {
-
// A class to handle the call combiner cancellation callback for a
// queued pick.
-class QueuedPickCanceller {
+class CallData::QueuedPickCanceller {
public:
explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
- auto* calld = static_cast<call_data*>(elem->call_data);
- auto* chand = static_cast<channel_data*>(elem->channel_data);
- GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
- grpc_combiner_scheduler(chand->combiner));
- grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
+ grpc_combiner_scheduler(chand->data_plane_combiner()));
+ calld->call_combiner_->SetNotifyOnCancel(&closure_);
}
private:
static void CancelLocked(void* arg, grpc_error* error) {
auto* self = static_cast<QueuedPickCanceller*>(arg);
- auto* chand = static_cast<channel_data*>(self->elem_->channel_data);
- auto* calld = static_cast<call_data*>(self->elem_->call_data);
- if (grpc_client_channel_routing_trace.enabled()) {
+ auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
+ auto* calld = static_cast<CallData*>(self->elem_->call_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: cancelling queued pick: "
"error=%s self=%p calld->pick_canceller=%p",
chand, calld, grpc_error_string(error), self,
- calld->pick_canceller);
+ calld->pick_canceller_);
}
- if (calld->pick_canceller == self && error != GRPC_ERROR_NONE) {
+ if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
- remove_call_from_queued_picks_locked(self->elem_);
+ calld->RemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
- pending_batches_fail(self->elem_, GRPC_ERROR_REF(error),
- yield_call_combiner_if_pending_batches_found);
+ calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
+ YieldCallCombinerIfPendingBatchesFound);
}
- GRPC_CALL_STACK_UNREF(calld->owning_call, "QueuedPickCanceller");
+ GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
Delete(self);
}
grpc_closure closure_;
};
-} // namespace
-} // namespace grpc_core
-
-// Removes the call from the channel's list of queued picks.
-static void remove_call_from_queued_picks_locked(grpc_call_element* elem) {
- auto* chand = static_cast<channel_data*>(elem->channel_data);
- auto* calld = static_cast<call_data*>(elem->call_data);
- for (QueuedPick** pick = &chand->queued_picks; *pick != nullptr;
- pick = &(*pick)->next) {
- if (*pick == &calld->pick) {
- if (grpc_client_channel_routing_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
- chand, calld);
- }
- calld->pick_queued = false;
- *pick = calld->pick.next;
- // Remove call's pollent from channel's interested_parties.
- grpc_polling_entity_del_from_pollset_set(calld->pollent,
- chand->interested_parties);
- // Lame the call combiner canceller.
- calld->pick_canceller = nullptr;
- break;
- }
+void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
+ chand, this);
}
+ chand->RemoveQueuedPick(&pick_, pollent_);
+ pick_queued_ = false;
+ // Lame the call combiner canceller.
+ pick_canceller_ = nullptr;
}
-// Adds the call to the channel's list of queued picks.
-static void add_call_to_queued_picks_locked(grpc_call_element* elem) {
- auto* chand = static_cast<channel_data*>(elem->channel_data);
- auto* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_routing_trace.enabled()) {
+void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
- calld);
+ this);
}
- calld->pick_queued = true;
- // Add call to queued picks list.
- calld->pick.elem = elem;
- calld->pick.next = chand->queued_picks;
- chand->queued_picks = &calld->pick;
- // Add call's pollent to channel's interested_parties, so that I/O
- // can be done under the call's CQ.
- grpc_polling_entity_add_to_pollset_set(calld->pollent,
- chand->interested_parties);
+ pick_queued_ = true;
+ pick_.elem = elem;
+ chand->AddQueuedPick(&pick_, pollent_);
// Register call combiner cancellation callback.
- calld->pick_canceller = grpc_core::New<grpc_core::QueuedPickCanceller>(elem);
+ pick_canceller_ = New<QueuedPickCanceller>(elem);
}
-// Applies service config to the call. Must be invoked once we know
-// that the resolver has returned results to the channel.
-static void apply_service_config_to_call_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_routing_trace.enabled()) {
+void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
- chand, calld);
- }
- if (chand->retry_throttle_data != nullptr) {
- calld->retry_throttle_data = chand->retry_throttle_data->Ref();
- }
- if (chand->method_params_table != nullptr) {
- calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
- *chand->method_params_table, calld->path);
- if (calld->method_params != nullptr) {
- // If the deadline from the service config is shorter than the one
- // from the client API, reset the deadline timer.
- if (chand->deadline_checking_enabled &&
- calld->method_params->timeout() != 0) {
- const grpc_millis per_method_deadline =
- grpc_timespec_to_millis_round_up(calld->call_start_time) +
- calld->method_params->timeout();
- if (per_method_deadline < calld->deadline) {
- calld->deadline = per_method_deadline;
- grpc_deadline_state_reset(elem, calld->deadline);
- }
+ chand, this);
+ }
+ // Store a ref to the service_config in service_config_call_data_. Also, save
+ // a pointer to this in the call_context so that all future filters can access
+ // it.
+ service_config_call_data_ =
+ ServiceConfig::CallData(chand->service_config(), path_);
+ if (service_config_call_data_.service_config() != nullptr) {
+ call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
+ &service_config_call_data_;
+ method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
+ service_config_call_data_.GetMethodParsedConfig(
+ internal::ClientChannelServiceConfigParser::ParserIndex()));
+ }
+ retry_throttle_data_ = chand->retry_throttle_data();
+ if (method_params_ != nullptr) {
+ // If the deadline from the service config is shorter than the one
+ // from the client API, reset the deadline timer.
+ if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
+ const grpc_millis per_method_deadline =
+ grpc_timespec_to_millis_round_up(call_start_time_) +
+ method_params_->timeout();
+ if (per_method_deadline < deadline_) {
+ deadline_ = per_method_deadline;
+ grpc_deadline_state_reset(elem, deadline_);
}
- // If the service config set wait_for_ready and the application
- // did not explicitly set it, use the value from the service config.
- uint32_t* send_initial_metadata_flags =
- &calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- if (GPR_UNLIKELY(
- calld->method_params->wait_for_ready() !=
- ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
- !(*send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
- if (calld->method_params->wait_for_ready() ==
- ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
- *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- } else {
- *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- }
+ }
+ // If the service config set wait_for_ready and the application
+ // did not explicitly set it, use the value from the service config.
+ uint32_t* send_initial_metadata_flags =
+ &pending_batches_[0]
+ .batch->payload->send_initial_metadata.send_initial_metadata_flags;
+ if (method_params_->wait_for_ready().has_value() &&
+ !(*send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
+ if (method_params_->wait_for_ready().value()) {
+ *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ } else {
+ *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
}
// If no retry policy, disable retries.
// TODO(roth): Remove this when adding support for transparent retries.
- if (calld->method_params == nullptr ||
- calld->method_params->retry_policy() == nullptr) {
- calld->enable_retries = false;
+ if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
+ enable_retries_ = false;
}
}
-// Invoked once resolver results are available.
-static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Apply service config data to the call only once, and only if the
// channel has the data available.
- if (GPR_LIKELY(chand->have_service_config &&
- !calld->service_config_applied)) {
- calld->service_config_applied = true;
- apply_service_config_to_call_locked(elem);
+ if (GPR_LIKELY(chand->received_service_config_data() &&
+ !service_config_applied_)) {
+ service_config_applied_ = true;
+ ApplyServiceConfigToCallLocked(elem);
}
}
-static const char* pick_result_name(LoadBalancingPolicy::PickResult result) {
- switch (result) {
- case LoadBalancingPolicy::PICK_COMPLETE:
+const char* PickResultTypeName(
+ LoadBalancingPolicy::PickResult::ResultType type) {
+ switch (type) {
+ case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
return "COMPLETE";
- case LoadBalancingPolicy::PICK_QUEUE:
+ case LoadBalancingPolicy::PickResult::PICK_QUEUE:
return "QUEUE";
- case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
+ case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
return "TRANSIENT_FAILURE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-static void start_pick_locked(void* arg, grpc_error* error) {
+void CallData::StartPickLocked(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- GPR_ASSERT(calld->pick.pick.connected_subchannel == nullptr);
- GPR_ASSERT(calld->subchannel_call == nullptr);
+ CallData* calld = static_cast<CallData*>(elem->call_data);
+ ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+ GPR_ASSERT(calld->connected_subchannel_ == nullptr);
+ GPR_ASSERT(calld->subchannel_call_ == nullptr);
+ // Apply service config to call if needed.
+ calld->MaybeApplyServiceConfigToCallLocked(elem);
// If this is a retry, use the send_initial_metadata payload that
// we've cached; otherwise, use the pending batch. The
// send_initial_metadata batch will be the first pending batch in the
- // list, as set by get_batch_index() above.
+ // list, as set by GetBatchIndex() above.
// TODO(roth): What if the LB policy needs to add something to the
// call's initial metadata, and then there's a retry? We don't want
// the new metadata to be added twice. We might need to somehow
// allocate the subchannel batch earlier so that we can give the
// subchannel's copy of the metadata batch (which is copied for each
// attempt) to the LB policy instead the one from the parent channel.
- calld->pick.pick.initial_metadata =
- calld->seen_send_initial_metadata
- ? &calld->send_initial_metadata
- : calld->pending_batches[0]
+ LoadBalancingPolicy::PickArgs pick_args;
+ pick_args.call_state = &calld->lb_call_state_;
+ pick_args.initial_metadata =
+ calld->seen_send_initial_metadata_
+ ? &calld->send_initial_metadata_
+ : calld->pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata;
- uint32_t* send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? &calld->send_initial_metadata_flags
- : &calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- // Apply service config to call if needed.
- maybe_apply_service_config_to_call_locked(elem);
- // When done, we schedule this closure to leave the channel combiner.
- GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
+ // Grab initial metadata flags so that we can check later if the call has
+ // wait_for_ready enabled.
+ const uint32_t send_initial_metadata_flags =
+ calld->seen_send_initial_metadata_
+ ? calld->send_initial_metadata_flags_
+ : calld->pending_batches_[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ // When done, we schedule this closure to leave the data plane combiner.
+ GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
grpc_schedule_on_exec_ctx);
// Attempt pick.
- error = GRPC_ERROR_NONE;
- auto pick_result = chand->picker->Pick(&calld->pick.pick, &error);
- if (grpc_client_channel_routing_trace.enabled()) {
+ auto result = chand->picker()->Pick(pick_args);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
"error=%s)",
- chand, calld, pick_result_name(pick_result),
- calld->pick.pick.connected_subchannel.get(),
- grpc_error_string(error));
+ chand, calld, PickResultTypeName(result.type),
+ result.connected_subchannel.get(), grpc_error_string(result.error));
}
- switch (pick_result) {
- case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
+ switch (result.type) {
+ case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
// If we're shutting down, fail all RPCs.
- if (chand->disconnect_error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(error);
- GRPC_CLOSURE_SCHED(&calld->pick_closure,
- GRPC_ERROR_REF(chand->disconnect_error));
+ grpc_error* disconnect_error = chand->disconnect_error();
+ if (disconnect_error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(result.error);
+ GRPC_CLOSURE_SCHED(&calld->pick_closure_,
+ GRPC_ERROR_REF(disconnect_error));
break;
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
- if ((*send_initial_metadata_flags &
+ if ((send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
// Retry if appropriate; otherwise, fail.
grpc_status_code status = GRPC_STATUS_OK;
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- if (!calld->enable_retries ||
- !maybe_retry(elem, nullptr /* batch_data */, status,
- nullptr /* server_pushback_md */)) {
+ grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
+ nullptr, nullptr);
+ if (!calld->enable_retries_ ||
+ !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
+ nullptr /* server_pushback_md */)) {
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1);
- GRPC_ERROR_UNREF(error);
- GRPC_CLOSURE_SCHED(&calld->pick_closure, new_error);
+ "Failed to pick subchannel", &result.error, 1);
+ GRPC_ERROR_UNREF(result.error);
+ GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
}
- if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
+ if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
break;
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
- GRPC_ERROR_UNREF(error);
- // Fallthrough
- case LoadBalancingPolicy::PICK_QUEUE:
- if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
+ GRPC_ERROR_UNREF(result.error);
+ }
+ // Fallthrough
+ case LoadBalancingPolicy::PickResult::PICK_QUEUE:
+ if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
break;
default: // PICK_COMPLETE
// Handle drops.
- if (GPR_UNLIKELY(calld->pick.pick.connected_subchannel == nullptr)) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
+ result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
}
- GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
- if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
- }
-}
-
-//
-// filter call vtable functions
-//
-
-static void cc_start_transport_stream_op_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (GPR_LIKELY(chand->deadline_checking_enabled)) {
- grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
- }
- // If we've previously been cancelled, immediately fail any new batches.
- if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
- chand, calld, grpc_error_string(calld->cancel_error));
- }
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
- return;
- }
- // Handle cancellation.
- if (GPR_UNLIKELY(batch->cancel_stream)) {
- // Stash a copy of cancel_error in our call data, so that we can use
- // it for subsequent operations. This ensures that if the call is
- // cancelled before any batches are passed down (e.g., if the deadline
- // is in the past when the call starts), we can return the right
- // error to the caller when the first batch does get passed down.
- GRPC_ERROR_UNREF(calld->cancel_error);
- calld->cancel_error =
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
- calld, grpc_error_string(calld->cancel_error));
- }
- // If we do not have a subchannel call (i.e., a pick has not yet
- // been started), fail all pending batches. Otherwise, send the
- // cancellation down to the subchannel call.
- if (calld->subchannel_call == nullptr) {
- // TODO(roth): If there is a pending retry callback, do we need to
- // cancel it here?
- pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
- no_yield_call_combiner);
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
- } else {
- // Note: This will release the call combiner.
- calld->subchannel_call->StartTransportStreamOpBatch(batch);
- }
- return;
- }
- // Add the batch to the pending list.
- pending_batches_add(elem, batch);
- // Check if we've already gotten a subchannel call.
- // Note that once we have completed the pick, we do not need to enter
- // the channel combiner, which is more efficient (especially for
- // streaming calls).
- if (calld->subchannel_call != nullptr) {
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
- calld, calld->subchannel_call.get());
- }
- pending_batches_resume(elem);
- return;
- }
- // We do not yet have a subchannel call.
- // For batches containing a send_initial_metadata op, enter the channel
- // combiner to start a pick.
- if (GPR_LIKELY(batch->send_initial_metadata)) {
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
- chand, calld);
- }
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
- elem, grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
- } else {
- // For all other batches, release the call combiner.
- if (grpc_client_channel_call_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: saved batch, yielding call combiner", chand,
- calld);
- }
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "batch does not include send_initial_metadata");
- }
-}
-
-/* Constructor for call_data */
-static grpc_error* cc_init_call_elem(grpc_call_element* elem,
- const grpc_call_element_args* args) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- new (elem->call_data) call_data(elem, *chand, *args);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data */
-static void cc_destroy_call_elem(grpc_call_element* elem,
- const grpc_call_final_info* final_info,
- grpc_closure* then_schedule_closure) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
- calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
- then_schedule_closure = nullptr;
+ calld->connected_subchannel_ = std::move(result.connected_subchannel);
+ calld->lb_recv_trailing_metadata_ready_ =
+ result.recv_trailing_metadata_ready;
+ calld->lb_recv_trailing_metadata_ready_user_data_ =
+ result.recv_trailing_metadata_ready_user_data;
+ GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
+ if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
}
- calld->~call_data();
- GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
- grpc_polling_entity* pollent) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- calld->pollent = pollent;
-}
+} // namespace
+} // namespace grpc_core
/*************************************************************************
* EXPORTED SYMBOLS
*/
+using grpc_core::CallData;
+using grpc_core::ChannelData;
+
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op_batch,
- cc_start_transport_op,
- sizeof(call_data),
- cc_init_call_elem,
- cc_set_pollset_or_pollset_set,
- cc_destroy_call_elem,
- sizeof(channel_data),
- cc_init_channel_elem,
- cc_destroy_channel_elem,
- cc_get_channel_info,
+ CallData::StartTransportStreamOpBatch,
+ ChannelData::StartTransportOp,
+ sizeof(CallData),
+ CallData::Init,
+ CallData::SetPollent,
+ CallData::Destroy,
+ sizeof(ChannelData),
+ ChannelData::Init,
+ ChannelData::Destroy,
+ ChannelData::GetChannelInfo,
"client-channel",
};
-void grpc_client_channel_set_channelz_node(
- grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- chand->channelz_node = node;
- chand->resolving_lb_policy->set_channelz_node(node->Ref());
-}
-
-void grpc_client_channel_populate_child_refs(
- grpc_channel_element* elem,
- grpc_core::channelz::ChildRefsList* child_subchannels,
- grpc_core::channelz::ChildRefsList* child_channels) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (chand->resolving_lb_policy != nullptr) {
- chand->resolving_lb_policy->FillChildRefsForChannelz(child_subchannels,
- child_channels);
- }
-}
-
-static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
- channel_data* chand = static_cast<channel_data*>(arg);
- chand->resolving_lb_policy->ExitIdleLocked();
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
-}
-
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- grpc_connectivity_state out =
- grpc_connectivity_state_check(&chand->state_tracker);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
- grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
- }
- return out;
-}
-
-typedef struct external_connectivity_watcher {
- channel_data* chand;
- grpc_polling_entity pollent;
- grpc_closure* on_complete;
- grpc_closure* watcher_timer_init;
- grpc_connectivity_state* state;
- grpc_closure my_closure;
- struct external_connectivity_watcher* next;
-} external_connectivity_watcher;
-
-static external_connectivity_watcher* lookup_external_connectivity_watcher(
- channel_data* chand, grpc_closure* on_complete) {
- gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- external_connectivity_watcher* w =
- chand->external_connectivity_watcher_list_head;
- while (w != nullptr && w->on_complete != on_complete) {
- w = w->next;
- }
- gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
- return w;
-}
-
-static void external_connectivity_watcher_list_append(
- channel_data* chand, external_connectivity_watcher* w) {
- GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
-
- gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
- GPR_ASSERT(!w->next);
- w->next = chand->external_connectivity_watcher_list_head;
- chand->external_connectivity_watcher_list_head = w;
- gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
-}
-
-static void external_connectivity_watcher_list_remove(
- channel_data* chand, external_connectivity_watcher* to_remove) {
- GPR_ASSERT(
- lookup_external_connectivity_watcher(chand, to_remove->on_complete));
- gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- if (to_remove == chand->external_connectivity_watcher_list_head) {
- chand->external_connectivity_watcher_list_head = to_remove->next;
- gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
- return;
- }
- external_connectivity_watcher* w =
- chand->external_connectivity_watcher_list_head;
- while (w != nullptr) {
- if (w->next == to_remove) {
- w->next = w->next->next;
- gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
- return;
- }
- w = w->next;
- }
- GPR_UNREACHABLE_CODE(return );
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ return chand->CheckConnectivityState(try_to_connect);
}
int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- int count = 0;
-
- gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- external_connectivity_watcher* w =
- chand->external_connectivity_watcher_list_head;
- while (w != nullptr) {
- count++;
- w = w->next;
- }
- gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
-
- return count;
-}
-
-static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
- external_connectivity_watcher* w =
- static_cast<external_connectivity_watcher*>(arg);
- grpc_closure* follow_up = w->on_complete;
- grpc_polling_entity_del_from_pollset_set(&w->pollent,
- w->chand->interested_parties);
- GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
- "external_connectivity_watcher");
- external_connectivity_watcher_list_remove(w->chand, w);
- gpr_free(w);
- GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
-}
-
-static void watch_connectivity_state_locked(void* arg,
- grpc_error* error_ignored) {
- external_connectivity_watcher* w =
- static_cast<external_connectivity_watcher*>(arg);
- external_connectivity_watcher* found = nullptr;
- if (w->state != nullptr) {
- external_connectivity_watcher_list_append(w->chand, w);
- // An assumption is being made that the closure is scheduled on the exec ctx
- // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
- GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
- GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
- grpc_combiner_scheduler(w->chand->combiner));
- grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
- w->state, &w->my_closure);
- } else {
- GPR_ASSERT(w->watcher_timer_init == nullptr);
- found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
- if (found) {
- GPR_ASSERT(found->on_complete == w->on_complete);
- grpc_connectivity_state_notify_on_state_change(
- &found->chand->state_tracker, nullptr, &found->my_closure);
- }
- grpc_polling_entity_del_from_pollset_set(&w->pollent,
- w->chand->interested_parties);
- GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
- "external_connectivity_watcher");
- gpr_free(w);
- }
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ return chand->NumExternalConnectivityWatchers();
}
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* closure,
grpc_closure* watcher_timer_init) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- external_connectivity_watcher* w =
- static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
- w->chand = chand;
- w->pollent = pollent;
- w->on_complete = closure;
- w->state = state;
- w->watcher_timer_init = watcher_timer_init;
- grpc_polling_entity_add_to_pollset_set(&w->pollent,
- chand->interested_parties);
- GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
- "external_connectivity_watcher");
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
- grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ return chand->AddExternalConnectivityWatcher(pollent, state, closure,
+ watcher_timer_init);
}
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- return calld->subchannel_call;
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ return calld->subchannel_call();
}