3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/iomgr/combiner.h"
55 #include "src/core/lib/iomgr/iomgr.h"
56 #include "src/core/lib/iomgr/polling_entity.h"
57 #include "src/core/lib/profiling/timers.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/transport/connectivity_state.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/static_metadata.h"
66 #include "src/core/lib/transport/status_metadata.h"
68 using grpc_core::internal::ClientChannelMethodParams;
69 using grpc_core::internal::ClientChannelMethodParamsTable;
70 using grpc_core::internal::ProcessedResolverResult;
71 using grpc_core::internal::ServerRetryThrottleData;
73 using grpc_core::LoadBalancingPolicy;
75 /* Client channel implementation */
77 // By default, we buffer 256 KiB per RPC for retries.
78 // TODO(roth): Do we have any data to suggest a better value?
79 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
81 // This value was picked arbitrarily. It can be changed if there is
82 // any even moderately compelling reason to do so.
83 #define RETRY_BACKOFF_JITTER 0.2
// Trace flags (default off) for call-level and routing-level debug logging
// in this filter; gated throughout via .enabled() checks before gpr_log().
85 grpc_core::TraceFlag grpc_client_channel_call_trace(false,
86 "client_channel_call");
87 grpc_core::TraceFlag grpc_client_channel_routing_trace(
88 false, "client_channel_routing");
90 /*************************************************************************
91  * CHANNEL-WIDE FUNCTIONS
94 struct external_connectivity_watcher;
// NOTE(review): these appear to be the members of the QueuedPick struct
// (the struct's opening line is not visible here — confirm in the full file).
// Each entry ties an LB pick to its call element and links into the
// chand->queued_picks singly-linked list via `next`.
97 LoadBalancingPolicy::PickArgs pick;
98 grpc_call_element* elem;
99 QueuedPick* next = nullptr;
// Channel-wide state for the client channel filter. Most fields are guarded
// by `combiner` (see comment below); the info_* fields have their own mutex
// so that cc_get_channel_info() can read them without entering the combiner.
102 typedef struct client_channel_channel_data {
// Whether the deadline filter logic is active for calls on this channel.
103 bool deadline_checking_enabled;
// Max bytes buffered per RPC for retry replay (see
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE handling in cc_init_channel_elem()).
105 size_t per_rpc_retry_buffer_size;
107 /** combiner protecting all variables below in this data structure */
108 grpc_combiner* combiner;
// The channel stack that owns this filter (refs taken via
// GRPC_CHANNEL_STACK_REF where noted elsewhere in this file).
110 grpc_channel_stack* owning_stack;
111 /** interested parties (owned) */
112 grpc_pollset_set* interested_parties;
113 // Client channel factory.
114 grpc_core::ClientChannelFactory* client_channel_factory;
// Subchannel pool shared with (or local to) this channel; selected in
// cc_init_channel_elem() based on GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL.
116 grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
// channelz node for trace events; may be null (checked before use).
118 grpc_core::channelz::ClientChannelNode* channelz_node;
120 // Resolving LB policy.
121 grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
122 // Subchannel picker from LB policy.
123 grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
124 // Linked list of queued picks.
125 QueuedPick* queued_picks;
// Set true once process_resolver_result_locked() has run at least once.
127 bool have_service_config;
128 /** retry throttle data from service config */
129 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
130 /** per-method service config data */
131 grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
133 /* the following properties are guarded by a mutex since APIs require them
134 to be instantaneously available */
136 grpc_core::UniquePtr<char> info_lb_policy_name;
137 grpc_core::UniquePtr<char> info_service_config_json;
// Connectivity state tracker; updated in
// set_connectivity_state_and_picker_locked().
139 grpc_connectivity_state_tracker state_tracker;
// GRPC_ERROR_NONE until a disconnect_with_error transport op is received;
// afterwards, picker updates from the LB policy are ignored.
140 grpc_error* disconnect_error;
142 /* external_connectivity_watcher_list head is guarded by its own mutex, since
143 * counts need to be grabbed immediately without polling on a cq */
144 gpr_mu external_connectivity_watcher_list_mu;
145 struct external_connectivity_watcher* external_connectivity_watcher_list_head;
148 // Forward declarations.
149 static void start_pick_locked(void* arg, grpc_error* ignored);
150 static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem);
// Maps a connectivity state to a static human-readable message; used when
// recording channelz trace events on state changes (see
// set_connectivity_state_and_picker_locked()). The returned string must be
// static because it is wrapped with grpc_slice_from_static_string().
152 static const char* get_channel_connectivity_state_change_string(
153 grpc_connectivity_state state) {
155 case GRPC_CHANNEL_IDLE:
156 return "Channel state change to IDLE";
157 case GRPC_CHANNEL_CONNECTING:
158 return "Channel state change to CONNECTING";
159 case GRPC_CHANNEL_READY:
160 return "Channel state change to READY";
161 case GRPC_CHANNEL_TRANSIENT_FAILURE:
162 return "Channel state change to TRANSIENT_FAILURE";
163 case GRPC_CHANNEL_SHUTDOWN:
164 return "Channel state change to SHUTDOWN";
// All enum values are covered above; reaching here is a programming error.
166 GPR_UNREACHABLE_CODE(return "UNKNOWN");
// Atomically (under the combiner) updates the channel's connectivity state
// and installs a new subchannel picker, then retries any queued picks.
// Also records a channelz trace event for the state change when a channelz
// node is present. Takes ownership of `picker`.
169 static void set_connectivity_state_and_picker_locked(
170 channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
172 grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
173 // Update connectivity state.
174 grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
176 if (chand->channelz_node != nullptr) {
177 chand->channelz_node->AddTraceEvent(
178 grpc_core::channelz::ChannelTrace::Severity::Info,
179 grpc_slice_from_static_string(
180 get_channel_connectivity_state_change_string(state)));
// Swap in the new picker before re-processing picks, so that queued picks
// are re-attempted against the updated picker.
183 chand->picker = std::move(picker);
184 // Re-process queued picks.
185 for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
187 start_pick_locked(pick->elem, GRPC_ERROR_NONE);
191 namespace grpc_core {
// Bridge between the resolving LB policy and the client channel: creates
// subchannels/channels via the channel's factory and forwards state/picker
// updates into the channel. Holds a ref on the owning channel stack for its
// entire lifetime (taken in the ctor, released in the dtor).
194 class ClientChannelControlHelper
195 : public LoadBalancingPolicy::ChannelControlHelper {
197 explicit ClientChannelControlHelper(channel_data* chand) : chand_(chand) {
198 GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ClientChannelControlHelper");
201 ~ClientChannelControlHelper() override {
202 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack,
203 "ClientChannelControlHelper");
// Creates a subchannel via the channel's factory, injecting the channel's
// subchannel pool into the args so subchannels can be shared appropriately.
206 Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
207 grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
208 chand_->subchannel_pool.get());
209 grpc_channel_args* new_args =
210 grpc_channel_args_copy_and_add(&args, &arg, 1);
211 Subchannel* subchannel =
212 chand_->client_channel_factory->CreateSubchannel(new_args);
213 grpc_channel_args_destroy(new_args);
// Creates a new channel for `target` via the channel's factory.
217 grpc_channel* CreateChannel(const char* target,
218 const grpc_channel_args& args) override {
219 return chand_->client_channel_factory->CreateChannel(target, &args);
// Receives state/picker updates from the LB policy. Updates are dropped
// once the channel has been told to disconnect. Consumes `state_error`
// either by passing it on or by unreffing it below.
223 grpc_connectivity_state state, grpc_error* state_error,
224 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
225 if (grpc_client_channel_routing_trace.enabled()) {
226 const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
228 : " (ignoring -- channel shutting down)";
229 gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
230 chand_, grpc_connectivity_state_name(state),
231 grpc_error_string(state_error), picker.get(), extra);
233 // Do update only if not shutting down.
234 if (chand_->disconnect_error == GRPC_ERROR_NONE) {
235 set_connectivity_state_and_picker_locked(chand_, state, state_error,
236 "helper", std::move(picker));
238 GRPC_ERROR_UNREF(state_error);
242 // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
243 void RequestReresolution() override {}
// Not owned; the channel_data outlives this helper.
246 channel_data* chand_;
250 } // namespace grpc_core
252 // Synchronous callback from chand->resolving_lb_policy to process a resolver
// result. Updates the channel's service-config-derived state (retry
// throttling, per-method params), swaps the info_* strings under info_mu for
// cc_get_channel_info(), outputs the LB policy name/config for the caller,
// and re-applies the (possibly new) service config to any queued picks.
// Returns true iff the service config JSON changed since the last result.
254 static bool process_resolver_result_locked(
255 void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
256 grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
257 channel_data* chand = static_cast<channel_data*>(arg);
258 chand->have_service_config = true;
259 ProcessedResolverResult resolver_result(result, chand->enable_retries);
260 grpc_core::UniquePtr<char> service_config_json =
261 resolver_result.service_config_json();
262 if (grpc_client_channel_routing_trace.enabled()) {
263 gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
264 chand, service_config_json.get());
266 // Update channel state.
267 chand->retry_throttle_data = resolver_result.retry_throttle_data();
268 chand->method_params_table = resolver_result.method_params_table();
269 // Swap out the data used by cc_get_channel_info().
270 gpr_mu_lock(&chand->info_mu);
271 chand->info_lb_policy_name = resolver_result.lb_policy_name();
// Changed iff exactly one of old/new is null, or both non-null and unequal.
272 const bool service_config_changed =
273 ((service_config_json == nullptr) !=
274 (chand->info_service_config_json == nullptr)) ||
275 (service_config_json != nullptr &&
276 strcmp(service_config_json.get(),
277 chand->info_service_config_json.get()) != 0);
278 chand->info_service_config_json = std::move(service_config_json);
279 gpr_mu_unlock(&chand->info_mu);
// Output parameters for the resolving LB policy. Note that
// info_lb_policy_name remains owned by chand.
281 *lb_policy_name = chand->info_lb_policy_name.get();
282 *lb_policy_config = resolver_result.lb_policy_config();
283 // Apply service config to queued picks.
284 for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
286 maybe_apply_service_config_to_call_locked(pick->elem);
288 return service_config_changed;
// Issues a ping on a connected subchannel chosen by the current picker.
// Fails with an error if the channel is not READY or if the picker does not
// yield a connected subchannel. Must be run under the combiner.
291 static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) {
292 grpc_error* error = GRPC_ERROR_NONE;
293 grpc_connectivity_state state =
294 grpc_connectivity_state_get(&chand->state_tracker, &error);
295 if (state != GRPC_CHANNEL_READY) {
296 grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
297 "channel not connected", &error, 1);
298 GRPC_ERROR_UNREF(error);
// Ask the current picker for a subchannel to ping.
301 LoadBalancingPolicy::PickArgs pick;
302 chand->picker->Pick(&pick, &error);
303 if (pick.connected_subchannel != nullptr) {
304 pick.connected_subchannel->Ping(op->send_ping.on_initiate,
305 op->send_ping.on_ack);
// No connected subchannel: report the picker's error, or a generic one if
// the picker dropped the call without setting an error.
307 if (error == GRPC_ERROR_NONE) {
308 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
309 "LB policy dropped call on ping");
// Combiner-hopped continuation of cc_start_transport_op(): handles each
// piece of the transport op (connectivity watch, ping, backoff reset,
// disconnect) under the combiner, then releases the stack ref taken by
// cc_start_transport_op() and signals op completion.
315 static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
316 grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
317 grpc_channel_element* elem =
318 static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
319 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// Register a one-shot connectivity state watch with the tracker.
321 if (op->on_connectivity_state_change != nullptr) {
322 grpc_connectivity_state_notify_on_state_change(
323 &chand->state_tracker, op->connectivity_state,
324 op->on_connectivity_state_change);
325 op->on_connectivity_state_change = nullptr;
326 op->connectivity_state = nullptr;
// Handle pings; on failure both callbacks are scheduled with the error
// (on_initiate with a ref, on_ack consuming the original).
329 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
330 grpc_error* error = do_ping_locked(chand, op);
331 if (error != GRPC_ERROR_NONE) {
332 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
333 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
335 op->bind_pollset = nullptr;
336 op->send_ping.on_initiate = nullptr;
337 op->send_ping.on_ack = nullptr;
// Reset connection backoff in the resolving LB policy if requested.
340 if (op->reset_connect_backoff) {
341 chand->resolving_lb_policy->ResetBackoffLocked();
// Disconnect: record the error, tear down the resolving LB policy, and
// switch to SHUTDOWN with a picker that fails all picks with this error.
344 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
345 chand->disconnect_error = op->disconnect_with_error;
346 grpc_pollset_set_del_pollset_set(
347 chand->resolving_lb_policy->interested_parties(),
348 chand->interested_parties);
349 chand->resolving_lb_policy.reset();
350 set_connectivity_state_and_picker_locked(
351 chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
353 grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
354 grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>(
355 GRPC_ERROR_REF(op->disconnect_with_error))));
// Balances the GRPC_CHANNEL_STACK_REF in cc_start_transport_op().
358 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
359 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
// Filter vtable entry for transport ops. Handles pollset binding inline,
// then takes a ref on the channel stack and bounces the rest of the op into
// the combiner (start_transport_op_locked), which releases the ref.
362 static void cc_start_transport_op(grpc_channel_element* elem,
363 grpc_transport_op* op) {
364 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// This filter is client-side only; accepting streams is a server concept.
366 GPR_ASSERT(op->set_accept_stream == false);
367 if (op->bind_pollset != nullptr) {
368 grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
371 op->handler_private.extra_arg = elem;
372 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
374 GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
375 op, grpc_combiner_scheduler(chand->combiner)),
// Filter vtable entry for grpc_channel_get_info(). Copies (via gpr_strdup)
// the LB policy name and service config JSON under info_mu, so it can be
// called without entering the combiner. Callers own the returned strings.
379 static void cc_get_channel_info(grpc_channel_element* elem,
380 const grpc_channel_info* info) {
381 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
382 gpr_mu_lock(&chand->info_mu);
383 if (info->lb_policy_name != nullptr) {
384 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
386 if (info->service_config_json != nullptr) {
387 *info->service_config_json =
388 gpr_strdup(chand->info_service_config_json.get());
390 gpr_mu_unlock(&chand->info_mu);
393 /* Constructor for channel_data */
// Initializes all channel-wide state: combiner, mutexes, connectivity
// tracker, retry settings from channel args, the client channel factory,
// the target URI (after proxy mapping), the subchannel pool, and finally
// the resolving LB policy. Returns a non-OK error if required channel args
// (factory, server URI) are missing or malformed.
394 static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
395 grpc_channel_element_args* args) {
396 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// The client channel filter must be the last filter in the stack.
397 GPR_ASSERT(args->is_last);
398 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
399 // Initialize data members.
400 chand->combiner = grpc_combiner_create();
401 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
403 chand->disconnect_error = GRPC_ERROR_NONE;
404 gpr_mu_init(&chand->info_mu);
405 gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
407 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
408 chand->external_connectivity_watcher_list_head = nullptr;
409 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
411 chand->owning_stack = args->channel_stack;
412 chand->deadline_checking_enabled =
413 grpc_deadline_checking_enabled(args->channel_args);
414 chand->interested_parties = grpc_pollset_set_create();
415 grpc_client_channel_start_backup_polling(chand->interested_parties);
416 // Record max per-RPC retry buffer size.
417 const grpc_arg* arg = grpc_channel_args_find(
418 args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
419 chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
420 arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
421 // Record enable_retries.
422 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
423 chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
424 // Record client channel factory.
425 chand->client_channel_factory =
426 grpc_core::ClientChannelFactory::GetFromChannelArgs(args->channel_args);
427 if (chand->client_channel_factory == nullptr) {
428 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
429 "Missing client channel factory in args for client channel filter");
431 // Get server name to resolve, using proxy mapper if needed.
432 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
433 if (arg == nullptr) {
434 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
435 "Missing server uri in args for client channel filter");
437 if (arg->type != GRPC_ARG_STRING) {
438 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
439 "server uri arg must be a string");
441 char* proxy_name = nullptr;
442 grpc_channel_args* new_args = nullptr;
443 grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
444 &proxy_name, &new_args);
// If a proxy mapper rewrote the target, use its name (ownership taken);
// otherwise duplicate the original server URI.
445 grpc_core::UniquePtr<char> target_uri(
446 proxy_name != nullptr ? proxy_name : gpr_strdup(arg->value.string));
447 // Instantiate subchannel pool.
448 arg = grpc_channel_args_find(args->channel_args,
449 GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL);
450 if (grpc_channel_arg_get_bool(arg, false)) {
451 chand->subchannel_pool =
452 grpc_core::MakeRefCounted<grpc_core::LocalSubchannelPool>();
454 chand->subchannel_pool = grpc_core::GlobalSubchannelPool::instance();
456 // Instantiate resolving LB policy.
457 LoadBalancingPolicy::Args lb_args;
458 lb_args.combiner = chand->combiner;
459 lb_args.channel_control_helper =
460 grpc_core::UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
461 grpc_core::New<grpc_core::ClientChannelControlHelper>(chand));
462 lb_args.args = new_args != nullptr ? new_args : args->channel_args;
463 grpc_error* error = GRPC_ERROR_NONE;
464 chand->resolving_lb_policy.reset(
465 grpc_core::New<grpc_core::ResolvingLoadBalancingPolicy>(
466 std::move(lb_args), &grpc_client_channel_routing_trace,
467 std::move(target_uri), process_resolver_result_locked, chand,
469 grpc_channel_args_destroy(new_args);
470 if (error != GRPC_ERROR_NONE) {
471 // Orphan the resolving LB policy and flush the exec_ctx to ensure
472 // that it finishes shutting down. This ensures that if we are
473 // failing, we destroy the ClientChannelControlHelper (and thus
474 // unref the channel stack) before we return.
475 // TODO(roth): This is not a complete solution, because it only
476 // catches the case where channel stack initialization fails in this
477 // particular filter. If there is a failure in a different filter, we
478 // will leave a dangling ref here, which can cause a crash. Fortunately,
479 // in practice, there are no other filters that can cause failures in
480 // channel stack initialization, so this works for now.
481 chand->resolving_lb_policy.reset();
482 grpc_core::ExecCtx::Get()->Flush();
// Success: wire the LB policy's pollers into the channel's pollset set.
484 grpc_pollset_set_add_pollset_set(
485 chand->resolving_lb_policy->interested_parties(),
486 chand->interested_parties);
487 if (grpc_client_channel_routing_trace.enabled()) {
488 gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", chand,
489 chand->resolving_lb_policy.get());
495 /* Destructor for channel_data */
// Tears down everything cc_init_channel_elem() created, in reverse
// dependency order: LB policy (after detaching its pollers), smart-pointer
// members, backup polling, pollset set, combiner, error, state tracker,
// and finally the mutexes.
496 static void cc_destroy_channel_elem(grpc_channel_element* elem) {
497 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
498 if (chand->resolving_lb_policy != nullptr) {
499 grpc_pollset_set_del_pollset_set(
500 chand->resolving_lb_policy->interested_parties(),
501 chand->interested_parties);
502 chand->resolving_lb_policy.reset();
504 // TODO(roth): Once we convert the filter API to C++, there will no
505 // longer be any need to explicitly reset these smart pointer data members.
506 chand->picker.reset();
507 chand->subchannel_pool.reset();
508 chand->info_lb_policy_name.reset();
509 chand->info_service_config_json.reset();
510 chand->retry_throttle_data.reset();
511 chand->method_params_table.reset();
512 grpc_client_channel_stop_backup_polling(chand->interested_parties);
513 grpc_pollset_set_destroy(chand->interested_parties);
514 GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
515 GRPC_ERROR_UNREF(chand->disconnect_error);
516 grpc_connectivity_state_destroy(&chand->state_tracker);
517 gpr_mu_destroy(&chand->info_mu);
518 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
521 /*************************************************************************
525 // Max number of batches that can be pending on a call at any given
526 // time. This includes one batch for each of the following ops:
527 // recv_initial_metadata
528 // send_initial_metadata
531 // recv_trailing_metadata
532 // send_trailing_metadata
533 #define MAX_PENDING_BATCHES 6
537 // In order to support retries, we act as a proxy for stream op batches.
538 // When we get a batch from the surface, we add it to our list of pending
539 // batches, and we then use those batches to construct separate "child"
540 // batches to be started on the subchannel call. When the child batches
541 // return, we then decide which pending batches have been completed and
542 // schedule their callbacks accordingly. If a subchannel call fails and
543 // we want to retry it, we do a new pick and start again, constructing
544 // new "child" batches for the new subchannel call.
546 // Note that retries are committed when receiving data from the server
547 // (except for Trailers-Only responses). However, there may be many
548 // send ops started before receiving any data, so we may have already
549 // completed some number of send ops (and returned the completions up to
550 // the surface) by the time we realize that we need to retry. To deal
551 // with this, we cache data for send ops, so that we can replay them on a
552 // different subchannel call even after we have completed the original
555 // There are two sets of data to maintain:
556 // - In call_data (in the parent channel), we maintain a list of pending
557 // ops and cached data for send ops.
558 // - In the subchannel call, we maintain state to indicate what ops have
559 // already been sent down to that call.
561 // When constructing the "child" batches, we compare those two sets of
562 // data to see which batches need to be sent to the subchannel call.
564 // TODO(roth): In subsequent PRs:
565 // - add support for transparent retries (including initial metadata)
566 // - figure out how to record stats in census for retries
567 // (census filter is on top of this one)
568 // - add census stats for retries
570 namespace grpc_core {
// Cancels a call's queued pick if the call is cancelled before a pick
// completes; defined later in this file.
572 class QueuedPickCanceller;
574 } // namespace grpc_core
580 // State used for starting a retryable batch on a subchannel call.
581 // This provides its own grpc_transport_stream_op_batch and other data
582 // structures needed to populate the ops in the batch.
583 // We allocate one struct on the arena for each attempt at starting a
584 // batch on a given subchannel call.
585 struct subchannel_batch_data {
586 subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
587 bool set_on_complete);
588 // All dtor code must be added in `destroy`. This is because we may
589 // call closures in `subchannel_batch_data` after they are unrefed by
590 // `batch_data_unref`, and msan would complain about accessing this class
591 // after calling dtor. As a result we cannot call the `dtor` in
592 // `batch_data_unref`.
593 // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
594 ~subchannel_batch_data() { destroy(); }
// The call element this batch belongs to.
598 grpc_call_element* elem;
// Keeps the subchannel call alive for the duration of this batch attempt.
599 grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
600 // The batch to use in the subchannel call.
601 // Its payload field points to subchannel_call_retry_state.batch_payload.
602 grpc_transport_stream_op_batch batch;
603 // For intercepting on_complete.
604 grpc_closure on_complete;
607 // Retry state associated with a subchannel call.
608 // Stored in the parent_data of the subchannel call object.
// Tracks, per attempt, which ops have been started/completed on the
// subchannel call, so that retry replay can compute which cached ops from
// call_data still need to be (re)sent on this attempt.
609 struct subchannel_call_retry_state {
610 explicit subchannel_call_retry_state(grpc_call_context_element* context)
611 : batch_payload(context),
612 started_send_initial_metadata(false),
613 completed_send_initial_metadata(false),
614 started_send_trailing_metadata(false),
615 completed_send_trailing_metadata(false),
616 started_recv_initial_metadata(false),
617 completed_recv_initial_metadata(false),
618 started_recv_trailing_metadata(false),
619 completed_recv_trailing_metadata(false),
620 retry_dispatched(false) {}
622 // subchannel_batch_data.batch.payload points to this.
623 grpc_transport_stream_op_batch_payload batch_payload;
624 // For send_initial_metadata.
625 // Note that we need to make a copy of the initial metadata for each
626 // subchannel call instead of just referring to the copy in call_data,
627 // because filters in the subchannel stack will probably add entries,
628 // so we need to start in a pristine state for each attempt of the call.
629 grpc_linked_mdelem* send_initial_metadata_storage;
630 grpc_metadata_batch send_initial_metadata;
// For send_message: a byte stream that replays cached slices per attempt.
632 grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
634 // For send_trailing_metadata.
635 grpc_linked_mdelem* send_trailing_metadata_storage;
636 grpc_metadata_batch send_trailing_metadata;
637 // For intercepting recv_initial_metadata.
638 grpc_metadata_batch recv_initial_metadata;
639 grpc_closure recv_initial_metadata_ready;
640 bool trailing_metadata_available = false;
641 // For intercepting recv_message.
642 grpc_closure recv_message_ready;
643 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
644 // For intercepting recv_trailing_metadata.
645 grpc_metadata_batch recv_trailing_metadata;
646 grpc_transport_stream_stats collect_stats;
647 grpc_closure recv_trailing_metadata_ready;
648 // These fields indicate which ops have been started and completed on
649 // this subchannel call.
650 size_t started_send_message_count = 0;
651 size_t completed_send_message_count = 0;
652 size_t started_recv_message_count = 0;
653 size_t completed_recv_message_count = 0;
654 bool started_send_initial_metadata : 1;
655 bool completed_send_initial_metadata : 1;
656 bool started_send_trailing_metadata : 1;
657 bool completed_send_trailing_metadata : 1;
658 bool started_recv_initial_metadata : 1;
659 bool completed_recv_initial_metadata : 1;
660 bool started_recv_trailing_metadata : 1;
661 bool completed_recv_trailing_metadata : 1;
662 // State for callback processing.
663 subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
664 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
665 subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
666 grpc_error* recv_message_error = GRPC_ERROR_NONE;
667 subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
668 // NOTE: Do not move this next to the metadata bitfields above. That would
669 // save space but will also result in a data race because compiler will
670 // generate a 2 byte store which overwrites the meta-data fields upon
671 // setting this field.
672 bool retry_dispatched : 1;
675 // Pending batches stored in call data.
// One slot in call_data::pending_batches; the filter proxies surface
// batches through these slots so they can be replayed on retry.
676 struct pending_batch {
677 // The pending batch. If nullptr, this slot is empty.
678 grpc_transport_stream_op_batch* batch;
679 // Indicates whether payload for send ops has been cached in call data.
680 bool send_ops_cached;
683 /** Call data. Holds a pointer to SubchannelCall and the
684 associated machinery to create such a pointer.
685 Handles queueing of stream ops until a call object is ready, waiting
686 for initial metadata before trying to create a call object,
687 and handling cancellation gracefully. */
// Copies immutable call parameters out of grpc_call_element_args and takes
// a ref on the request path slice (released in the dtor).
689 call_data(grpc_call_element* elem, const channel_data& chand,
690 const grpc_call_element_args& args)
691 : deadline_state(elem, args.call_stack, args.call_combiner,
692 GPR_LIKELY(chand.deadline_checking_enabled)
694 : GRPC_MILLIS_INF_FUTURE),
695 path(grpc_slice_ref_internal(args.path)),
696 call_start_time(args.start_time),
697 deadline(args.deadline),
699 owning_call(args.call_stack),
700 call_combiner(args.call_combiner),
701 call_context(args.context),
702 pending_send_initial_metadata(false),
703 pending_send_message(false),
704 pending_send_trailing_metadata(false),
705 enable_retries(chand.enable_retries),
706 retry_committed(false),
707 last_attempt_got_server_pushback(false) {}
// Destructor body: releases the path slice and cancel error, and asserts
// that every pending batch slot has been drained before destruction.
710 grpc_slice_unref_internal(path);
711 GRPC_ERROR_UNREF(cancel_error);
712 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
713 GPR_ASSERT(pending_batches[i].batch == nullptr);
717 // State for handling deadlines.
718 // The code in deadline_filter.c requires this to be the first field.
719 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
720 // and this struct both independently store pointers to the call stack
721 // and call combiner. If/when we have time, find a way to avoid this
722 // without breaking the grpc_deadline_state abstraction.
723 grpc_deadline_state deadline_state;
725 grpc_slice path; // Request path.
726 gpr_timespec call_start_time;
727 grpc_millis deadline;
729 grpc_call_stack* owning_call;
730 grpc_call_combiner* call_combiner;
731 grpc_call_context_element* call_context;
// Retry throttle data and per-method params snapshotted from the channel's
// service config when it is applied to this call.
733 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
734 grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
736 grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
738 // Set when we get a cancel_stream op.
739 grpc_error* cancel_error = GRPC_ERROR_NONE;
// Pick state: whether this call is in chand->queued_picks, whether the
// service config has been applied to it, and its cancellation hook.
742 bool pick_queued = false;
743 bool service_config_applied = false;
744 grpc_core::QueuedPickCanceller* pick_canceller = nullptr;
745 grpc_closure pick_closure;
747 grpc_polling_entity* pollent = nullptr;
749 // Batches are added to this list when received from above.
750 // They are removed when we are done handling the batch (i.e., when
751 // either we have invoked all of the batch's callbacks or we have
752 // passed the batch down to the subchannel call and are not
753 // intercepting any of its callbacks).
754 pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
755 bool pending_send_initial_metadata : 1;
756 bool pending_send_message : 1;
757 bool pending_send_trailing_metadata : 1;
// Retry bookkeeping for this call.
760 bool enable_retries : 1;
761 bool retry_committed : 1;
762 bool last_attempt_got_server_pushback : 1;
763 int num_attempts_completed = 0;
764 size_t bytes_buffered_for_retry = 0;
765 grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
766 grpc_timer retry_timer;
768 // The number of pending retriable subchannel batches containing send ops.
769 // We hold a ref to the call stack while this is non-zero, since replay
770 // batches may not complete until after all callbacks have been returned
771 // to the surface, and we need to make sure that the call is not destroyed
772 // until all of these batches have completed.
773 // Note that we actually only need to track replay batches, but it's
774 // easier to track all batches with send ops.
775 int num_pending_retriable_subchannel_send_batches = 0;
777 // Cached data for retrying send ops.
778 // send_initial_metadata
779 bool seen_send_initial_metadata = false;
780 grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
781 grpc_metadata_batch send_initial_metadata;
782 uint32_t send_initial_metadata_flags;
783 gpr_atm* peer_string;
785 // When we get a send_message op, we replace the original byte stream
786 // with a CachingByteStream that caches the slices to a local buffer for
788 // Note: We inline the cache for the first 3 send_message ops and use
789 // dynamic allocation after that. This number was essentially picked
790 // at random; it could be changed in the future to tune performance.
791 grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
792 // send_trailing_metadata
793 bool seen_send_trailing_metadata = false;
794 grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
795 grpc_metadata_batch send_trailing_metadata;
800 // Forward declarations.
// Retry plumbing defined later in this file: committing a retry attempt,
// starting an internal recv_trailing_metadata batch, the intercepted
// on_complete callback, batch replay, and queued-pick removal.
801 static void retry_commit(grpc_call_element* elem,
802 subchannel_call_retry_state* retry_state);
803 static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
804 static void on_complete(void* arg, grpc_error* error);
805 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
806 static void remove_call_from_queued_picks_locked(grpc_call_element* elem);
809 // send op data caching
812 // Caches data for send ops so that it can be retried later, if not
// Copies the payloads of any send ops in |pending| into calld's local
// retry cache (initial metadata, messages, trailing metadata) so they
// can be replayed on a later retry attempt.  Idempotent per batch:
// guarded by pending->send_ops_cached.
814 static void maybe_cache_send_ops_for_batch(call_data* calld,
815 pending_batch* pending) {
816 if (pending->send_ops_cached) return;
817 pending->send_ops_cached = true;
818 grpc_transport_stream_op_batch* batch = pending->batch;
819 // Save a copy of metadata for send_initial_metadata ops.
820 if (batch->send_initial_metadata) {
821 calld->seen_send_initial_metadata = true;
822 GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
823 grpc_metadata_batch* send_initial_metadata =
824 batch->payload->send_initial_metadata.send_initial_metadata;
// Storage for the copied mdelems is taken from the call's arena, so it
// is released with the call rather than freed individually.
825 calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
827 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
828 grpc_metadata_batch_copy(send_initial_metadata,
829 &calld->send_initial_metadata,
830 calld->send_initial_metadata_storage);
831 calld->send_initial_metadata_flags =
832 batch->payload->send_initial_metadata.send_initial_metadata_flags;
833 calld->peer_string = batch->payload->send_initial_metadata.peer_string;
835 // Set up cache for send_message ops.
836 if (batch->send_message) {
// The original byte stream is replaced by an arena-allocated
// ByteStreamCache (placement new) that records the slices for replay.
837 grpc_core::ByteStreamCache* cache =
838 static_cast<grpc_core::ByteStreamCache*>(
839 gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
840 new (cache) grpc_core::ByteStreamCache(
841 std::move(batch->payload->send_message.send_message));
842 calld->send_messages.push_back(cache);
844 // Save metadata batch for send_trailing_metadata ops.
845 if (batch->send_trailing_metadata) {
846 calld->seen_send_trailing_metadata = true;
847 GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
848 grpc_metadata_batch* send_trailing_metadata =
849 batch->payload->send_trailing_metadata.send_trailing_metadata;
850 calld->send_trailing_metadata_storage =
851 (grpc_linked_mdelem*)gpr_arena_alloc(
853 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
854 grpc_metadata_batch_copy(send_trailing_metadata,
855 &calld->send_trailing_metadata,
856 calld->send_trailing_metadata_storage);
860 // Frees cached send_initial_metadata.
// Destroys the retry cache's copy of the initial metadata batch; the
// backing mdelem storage lives on the call arena and needs no explicit
// free.
861 static void free_cached_send_initial_metadata(channel_data* chand,
863 if (grpc_client_channel_call_trace.enabled()) {
865 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
868 grpc_metadata_batch_destroy(&calld->send_initial_metadata);
871 // Frees cached send_message at index idx.
// Destroys the ByteStreamCache for the idx-th cached message.  Destroy()
// is used (not delete) because the cache was placement-new'd on the
// call arena.
872 static void free_cached_send_message(channel_data* chand, call_data* calld,
874 if (grpc_client_channel_call_trace.enabled()) {
876 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
879 calld->send_messages[idx]->Destroy();
882 // Frees cached send_trailing_metadata.
// Counterpart of free_cached_send_initial_metadata() for the trailing
// metadata cache.
883 static void free_cached_send_trailing_metadata(channel_data* chand,
885 if (grpc_client_channel_call_trace.enabled()) {
887 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
890 grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
893 // Frees cached send ops that have already been completed after
894 // committing the call.
// Once the call is committed (no further retries), any cached send-op
// data whose op has already completed on the current attempt will never
// be replayed, so it can be released immediately.
895 static void free_cached_send_op_data_after_commit(
896 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
897 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
898 call_data* calld = static_cast<call_data*>(elem->call_data);
899 if (retry_state->completed_send_initial_metadata) {
900 free_cached_send_initial_metadata(chand, calld);
902 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
903 free_cached_send_message(chand, calld, i);
905 if (retry_state->completed_send_trailing_metadata) {
906 free_cached_send_trailing_metadata(chand, calld);
910 // Frees cached send ops that were completed by the completed batch in
911 // batch_data. Used when batches are completed after the call is committed.
// Unlike free_cached_send_op_data_after_commit(), this frees only the
// cached data for the ops contained in this one just-completed batch.
// Note: completed_send_message_count has already been incremented for
// this batch, hence the "- 1" index below.
912 static void free_cached_send_op_data_for_completed_batch(
913 grpc_call_element* elem, subchannel_batch_data* batch_data,
914 subchannel_call_retry_state* retry_state) {
915 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
916 call_data* calld = static_cast<call_data*>(elem->call_data);
917 if (batch_data->batch.send_initial_metadata) {
918 free_cached_send_initial_metadata(chand, calld);
920 if (batch_data->batch.send_message) {
921 free_cached_send_message(chand, calld,
922 retry_state->completed_send_message_count - 1);
924 if (batch_data->batch.send_trailing_metadata) {
925 free_cached_send_trailing_metadata(chand, calld);
930 // LB recv_trailing_metadata_ready handling
// If the LB policy's pick asked to be notified of trailing metadata,
// splice its recv_trailing_metadata_ready closure into the batch:
// the original closure is saved (to be chained later) and replaced by
// the LB-provided one.  Optionally also hands the LB policy a pointer
// to the trailing metadata batch itself.
933 void maybe_inject_recv_trailing_metadata_ready_for_lb(
934 const LoadBalancingPolicy::PickArgs& pick,
935 grpc_transport_stream_op_batch* batch) {
936 if (pick.recv_trailing_metadata_ready != nullptr) {
937 *pick.original_recv_trailing_metadata_ready =
938 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
939 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
940 pick.recv_trailing_metadata_ready;
941 if (pick.recv_trailing_metadata != nullptr) {
942 *pick.recv_trailing_metadata =
943 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
949 // pending_batches management
952 // Returns the index into calld->pending_batches to be used for batch.
// Each op type gets a dedicated slot; a batch is filed under its
// highest-priority op in the order tested below.
953 static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
954 // Note: It is important the send_initial_metadata be the first entry
955 // here, since the code in pick_subchannel_locked() assumes it will be.
956 if (batch->send_initial_metadata) return 0;
957 if (batch->send_message) return 1;
958 if (batch->send_trailing_metadata) return 2;
959 if (batch->recv_initial_metadata) return 3;
960 if (batch->recv_message) return 4;
961 if (batch->recv_trailing_metadata) return 5;
962 GPR_UNREACHABLE_CODE(return (size_t)-1);
965 // This is called via the call combiner, so access to calld is synchronized.
// Files |batch| in its slot in calld->pending_batches.  When retries are
// enabled, also accounts for the bytes this batch would buffer; if the
// per-RPC retry buffer limit is exceeded, the call is committed (no
// further retries) and, if no attempt has started yet, retries are
// disabled outright to skip the retry bookkeeping overhead.
966 static void pending_batches_add(grpc_call_element* elem,
967 grpc_transport_stream_op_batch* batch) {
968 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
969 call_data* calld = static_cast<call_data*>(elem->call_data);
970 const size_t idx = get_batch_index(batch);
971 if (grpc_client_channel_call_trace.enabled()) {
973 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
976 pending_batch* pending = &calld->pending_batches[idx];
977 GPR_ASSERT(pending->batch == nullptr);
978 pending->batch = batch;
979 pending->send_ops_cached = false;
980 if (calld->enable_retries) {
981 // Update state in calld about pending batches.
982 // Also check if the batch takes us over the retry buffer limit.
983 // Note: We don't check the size of trailing metadata here, because
984 // gRPC clients do not send trailing metadata.
985 if (batch->send_initial_metadata) {
986 calld->pending_send_initial_metadata = true;
987 calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
988 batch->payload->send_initial_metadata.send_initial_metadata);
990 if (batch->send_message) {
991 calld->pending_send_message = true;
992 calld->bytes_buffered_for_retry +=
993 batch->payload->send_message.send_message->length();
995 if (batch->send_trailing_metadata) {
996 calld->pending_send_trailing_metadata = true;
998 if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
999 chand->per_rpc_retry_buffer_size)) {
1000 if (grpc_client_channel_call_trace.enabled()) {
1002 "chand=%p calld=%p: exceeded retry buffer size, committing",
// retry_state may be null here if no subchannel call exists yet;
// retry_commit() tolerates that.
1005 subchannel_call_retry_state* retry_state =
1006 calld->subchannel_call == nullptr
1008 : static_cast<subchannel_call_retry_state*>(
1010 calld->subchannel_call->GetParentData());
1011 retry_commit(elem, retry_state);
1012 // If we are not going to retry and have not yet started, pretend
1013 // retries are disabled so that we don't bother with retry overhead.
1014 if (calld->num_attempts_completed == 0) {
1015 if (grpc_client_channel_call_trace.enabled()) {
1017 "chand=%p calld=%p: disabling retries before first attempt",
1020 calld->enable_retries = false;
// Removes |pending| from calld's pending-batch bookkeeping, clearing the
// per-op "pending send" flags used by the retry logic, then drops the
// batch pointer.  Does not touch the cached send-op data.
1026 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
1027 if (calld->enable_retries) {
1028 if (pending->batch->send_initial_metadata) {
1029 calld->pending_send_initial_metadata = false;
1031 if (pending->batch->send_message) {
1032 calld->pending_send_message = false;
1034 if (pending->batch->send_trailing_metadata) {
1035 calld->pending_send_trailing_metadata = false;
1038 pending->batch = nullptr;
1041 // This is called via the call combiner, so access to calld is synchronized.
// Closure callback used by pending_batches_fail(): fails one pending
// batch with the given error.  calld is smuggled through
// batch->handler_private.extra_arg.
1042 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
1043 grpc_transport_stream_op_batch* batch =
1044 static_cast<grpc_transport_stream_op_batch*>(arg);
1045 call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
1046 // Note: This will release the call combiner.
1047 grpc_transport_stream_op_batch_finish_with_failure(
1048 batch, GRPC_ERROR_REF(error), calld->call_combiner);
1051 // This is called via the call combiner, so access to calld is synchronized.
1052 // If yield_call_combiner_predicate returns true, assumes responsibility for
1053 // yielding the call combiner.
// Predicates selecting whether pending_batches_fail() should yield the
// call combiner after scheduling the failure closures:
//   yield_call_combiner                       -- always yield
//   no_yield_call_combiner                    -- never yield
//   yield_call_combiner_if_pending_batches_found -- yield only if any
//     batch was actually failed.
1054 typedef bool (*YieldCallCombinerPredicate)(
1055 const grpc_core::CallCombinerClosureList& closures);
1056 static bool yield_call_combiner(
1057 const grpc_core::CallCombinerClosureList& closures) {
1060 static bool no_yield_call_combiner(
1061 const grpc_core::CallCombinerClosureList& closures) {
1064 static bool yield_call_combiner_if_pending_batches_found(
1065 const grpc_core::CallCombinerClosureList& closures) {
1066 return closures.size() > 0;
// Fails all pending batches with |error| (which must not be
// GRPC_ERROR_NONE).  All failure closures are collected into a
// CallCombinerClosureList and run as a group; whether the call combiner
// is yielded afterwards is decided by the supplied predicate.
// Takes ownership of |error| (unreffed at the end).
1068 static void pending_batches_fail(
1069 grpc_call_element* elem, grpc_error* error,
1070 YieldCallCombinerPredicate yield_call_combiner_predicate) {
1071 GPR_ASSERT(error != GRPC_ERROR_NONE);
1072 call_data* calld = static_cast<call_data*>(elem->call_data);
1073 if (grpc_client_channel_call_trace.enabled()) {
1074 size_t num_batches = 0;
1075 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1076 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1079 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1080 elem->channel_data, calld, num_batches, grpc_error_string(error));
1082 grpc_core::CallCombinerClosureList closures;
1083 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1084 pending_batch* pending = &calld->pending_batches[i];
1085 grpc_transport_stream_op_batch* batch = pending->batch;
1086 if (batch != nullptr) {
// Even on failure, the LB policy still gets its
// recv_trailing_metadata_ready notification if it asked for one.
1087 if (batch->recv_trailing_metadata) {
1088 maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
1091 batch->handler_private.extra_arg = calld;
1092 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1093 fail_pending_batch_in_call_combiner, batch,
1094 grpc_schedule_on_exec_ctx);
1095 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1096 "pending_batches_fail");
1097 pending_batch_clear(calld, pending);
1100 if (yield_call_combiner_predicate(closures)) {
1101 closures.RunClosures(calld->call_combiner);
1103 closures.RunClosuresWithoutYielding(calld->call_combiner);
1105 GRPC_ERROR_UNREF(error);
1108 // This is called via the call combiner, so access to calld is synchronized.
// Closure callback used by pending_batches_resume(): sends one pending
// batch down to the subchannel call (passed via
// batch->handler_private.extra_arg).
1109 static void resume_pending_batch_in_call_combiner(void* arg,
1110 grpc_error* ignored) {
1111 grpc_transport_stream_op_batch* batch =
1112 static_cast<grpc_transport_stream_op_batch*>(arg);
1113 grpc_core::SubchannelCall* subchannel_call =
1114 static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
1115 // Note: This will release the call combiner.
1116 subchannel_call->StartTransportStreamOpBatch(batch);
1119 // This is called via the call combiner, so access to calld is synchronized.
// Starts all pending batches on the (now available) subchannel call.
// With retries enabled, delegates to the retriable-batch path; otherwise
// schedules each batch directly via the call combiner.
1120 static void pending_batches_resume(grpc_call_element* elem) {
1121 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1122 call_data* calld = static_cast<call_data*>(elem->call_data);
1123 if (calld->enable_retries) {
1124 start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
1127 // Retries not enabled; send down batches as-is.
1128 if (grpc_client_channel_call_trace.enabled()) {
1129 size_t num_batches = 0;
1130 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1131 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1134 "chand=%p calld=%p: starting %" PRIuPTR
1135 " pending batches on subchannel_call=%p",
1136 chand, calld, num_batches, calld->subchannel_call.get());
1138 grpc_core::CallCombinerClosureList closures;
1139 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1140 pending_batch* pending = &calld->pending_batches[i];
1141 grpc_transport_stream_op_batch* batch = pending->batch;
1142 if (batch != nullptr) {
1143 if (batch->recv_trailing_metadata) {
1144 maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
1147 batch->handler_private.extra_arg = calld->subchannel_call.get();
1148 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1149 resume_pending_batch_in_call_combiner, batch,
1150 grpc_schedule_on_exec_ctx);
1151 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1152 "pending_batches_resume");
1153 pending_batch_clear(calld, pending);
1156 // Note: This will release the call combiner.
1157 closures.RunClosures(calld->call_combiner);
// Clears |pending| once every callback in the batch (on_complete plus
// each requested recv-op callback) has been scheduled and its pointer
// reset to nullptr by the interception code.
1160 static void maybe_clear_pending_batch(grpc_call_element* elem,
1161 pending_batch* pending) {
1162 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1163 call_data* calld = static_cast<call_data*>(elem->call_data);
1164 grpc_transport_stream_op_batch* batch = pending->batch;
1165 // We clear the pending batch if all of its callbacks have been
1166 // scheduled and reset to nullptr.
1167 if (batch->on_complete == nullptr &&
1168 (!batch->recv_initial_metadata ||
1169 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1171 (!batch->recv_message ||
1172 batch->payload->recv_message.recv_message_ready == nullptr) &&
1173 (!batch->recv_trailing_metadata ||
1174 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1176 if (grpc_client_channel_call_trace.enabled()) {
1177 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1180 pending_batch_clear(calld, pending);
1184 // Returns a pointer to the first pending batch for which predicate(batch)
1185 // returns true, or null if not found.
// |log_message| is only used for trace logging to describe why the
// lookup is being done (e.g. "invoking recv_message_ready for").
1186 template <typename Predicate>
1187 static pending_batch* pending_batch_find(grpc_call_element* elem,
1188 const char* log_message,
1189 Predicate predicate) {
1190 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1191 call_data* calld = static_cast<call_data*>(elem->call_data);
1192 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1193 pending_batch* pending = &calld->pending_batches[i];
1194 grpc_transport_stream_op_batch* batch = pending->batch;
1195 if (batch != nullptr && predicate(batch)) {
1196 if (grpc_client_channel_call_trace.enabled()) {
1198 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1199 calld, log_message, i);
1211 // Commits the call so that no further retry attempts will be performed.
// Idempotent (guarded by calld->retry_committed).  If retry_state is
// non-null, cached send-op data that has already completed on this
// attempt is freed, since it will never be replayed.
1212 static void retry_commit(grpc_call_element* elem,
1213 subchannel_call_retry_state* retry_state) {
1214 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1215 call_data* calld = static_cast<call_data*>(elem->call_data);
1216 if (calld->retry_committed) return;
1217 calld->retry_committed = true;
1218 if (grpc_client_channel_call_trace.enabled()) {
1219 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
1221 if (retry_state != nullptr) {
1222 free_cached_send_op_data_after_commit(elem, retry_state);
1226 // Starts a retry after appropriate back-off.
// server_pushback_ms >= 0 means the server supplied an explicit
// retry-pushback delay, which overrides the exponential backoff; a
// negative value means "use the backoff state machine".
1227 static void do_retry(grpc_call_element* elem,
1228 subchannel_call_retry_state* retry_state,
1229 grpc_millis server_pushback_ms) {
1230 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1231 call_data* calld = static_cast<call_data*>(elem->call_data);
1232 GPR_ASSERT(calld->method_params != nullptr);
1233 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1234 calld->method_params->retry_policy();
1235 GPR_ASSERT(retry_policy != nullptr);
1236 // Reset subchannel call and connected subchannel.
1237 calld->subchannel_call.reset();
1238 calld->pick.pick.connected_subchannel.reset();
1239 // Compute backoff delay.
1240 grpc_millis next_attempt_time;
1241 if (server_pushback_ms >= 0) {
1242 next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1243 calld->last_attempt_got_server_pushback = true;
// The backoff object is (re)initialized on the first retry and after
// any attempt that got server pushback, so the exponential sequence
// restarts from initial_backoff in those cases.
1245 if (calld->num_attempts_completed == 1 ||
1246 calld->last_attempt_got_server_pushback) {
1247 calld->retry_backoff.Init(
1248 grpc_core::BackOff::Options()
1249 .set_initial_backoff(retry_policy->initial_backoff)
1250 .set_multiplier(retry_policy->backoff_multiplier)
1251 .set_jitter(RETRY_BACKOFF_JITTER)
1252 .set_max_backoff(retry_policy->max_backoff));
1253 calld->last_attempt_got_server_pushback = false;
1255 next_attempt_time = calld->retry_backoff->NextAttemptTime();
1257 if (grpc_client_channel_call_trace.enabled()) {
1259 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
1260 calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1262 // Schedule retry after computed delay.
// The timer fires start_pick_locked under the channel combiner, which
// begins a fresh pick/attempt.
1263 GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1264 grpc_combiner_scheduler(chand->combiner));
1265 grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1266 // Update bookkeeping.
1267 if (retry_state != nullptr) retry_state->retry_dispatched = true;
1270 // Returns true if the call is being retried.
// Decides, for a finished attempt with the given status (and optional
// server-pushback metadata), whether to dispatch a retry.  The checks
// run in a deliberate order: retry policy present -> not already
// dispatched -> status retryable -> throttle -> not committed ->
// attempts remaining -> not cancelled -> server pushback parseable.
1271 static bool maybe_retry(grpc_call_element* elem,
1272 subchannel_batch_data* batch_data,
1273 grpc_status_code status,
1274 grpc_mdelem* server_pushback_md) {
1275 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1276 call_data* calld = static_cast<call_data*>(elem->call_data);
1277 // Get retry policy.
1278 if (calld->method_params == nullptr) return false;
1279 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1280 calld->method_params->retry_policy();
1281 if (retry_policy == nullptr) return false;
1282 // If we've already dispatched a retry from this call, return true.
1283 // This catches the case where the batch has multiple callbacks
1284 // (i.e., it includes either recv_message or recv_initial_metadata).
1285 subchannel_call_retry_state* retry_state = nullptr;
1286 if (batch_data != nullptr) {
1287 retry_state = static_cast<subchannel_call_retry_state*>(
1288 batch_data->subchannel_call->GetParentData());
1289 if (retry_state->retry_dispatched) {
1290 if (grpc_client_channel_call_trace.enabled()) {
1291 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
// A successful attempt records success with the throttle and ends
// retry processing for this call.
1298 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
1299 if (calld->retry_throttle_data != nullptr) {
1300 calld->retry_throttle_data->RecordSuccess();
1302 if (grpc_client_channel_call_trace.enabled()) {
1303 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
1307 // Status is not OK. Check whether the status is retryable.
1308 if (!retry_policy->retryable_status_codes.Contains(status)) {
1309 if (grpc_client_channel_call_trace.enabled()) {
1311 "chand=%p calld=%p: status %s not configured as retryable", chand,
1312 calld, grpc_status_code_to_string(status));
1316 // Record the failure and check whether retries are throttled.
1317 // Note that it's important for this check to come after the status
1318 // code check above, since we should only record failures whose statuses
1319 // match the configured retryable status codes, so that we don't count
1320 // things like failures due to malformed requests (INVALID_ARGUMENT).
1321 // Conversely, it's important for this to come before the remaining
1322 // checks, so that we don't fail to record failures due to other factors.
1323 if (calld->retry_throttle_data != nullptr &&
1324 !calld->retry_throttle_data->RecordFailure()) {
1325 if (grpc_client_channel_call_trace.enabled()) {
1326 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
1330 // Check whether the call is committed.
1331 if (calld->retry_committed) {
1332 if (grpc_client_channel_call_trace.enabled()) {
1333 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
1338 // Check whether we have retries remaining.
1339 ++calld->num_attempts_completed;
1340 if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1341 if (grpc_client_channel_call_trace.enabled()) {
1342 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
1343 calld, retry_policy->max_attempts);
1347 // If the call was cancelled from the surface, don't retry.
1348 if (calld->cancel_error != GRPC_ERROR_NONE) {
1349 if (grpc_client_channel_call_trace.enabled()) {
1351 "chand=%p calld=%p: call cancelled from surface, not retrying",
1356 // Check server push-back.
1357 grpc_millis server_pushback_ms = -1;
1358 if (server_pushback_md != nullptr) {
1359 // If the value is "-1" or any other unparseable string, we do not retry.
1361 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1362 if (grpc_client_channel_call_trace.enabled()) {
1364 "chand=%p calld=%p: not retrying due to server push-back",
1369 if (grpc_client_channel_call_trace.enabled()) {
1370 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1373 server_pushback_ms = (grpc_millis)ms;
1376 do_retry(elem, retry_state, server_pushback_ms);
1381 // subchannel_batch_data
// Constructor: binds the batch to the current subchannel call's retry
// state (shared batch_payload), seeds the refcount, optionally arms the
// intercepting on_complete closure, and takes a call-stack ref so the
// call outlives in-flight batches.
1386 subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
1387 call_data* calld, int refcount,
1388 bool set_on_complete)
1389 : elem(elem), subchannel_call(calld->subchannel_call) {
1390 subchannel_call_retry_state* retry_state =
1391 static_cast<subchannel_call_retry_state*>(
1392 calld->subchannel_call->GetParentData());
1393 batch.payload = &retry_state->batch_payload;
1394 gpr_ref_init(&refs, refcount);
1395 if (set_on_complete) {
// ::on_complete is the file-level interception function, distinct from
// this object's on_complete closure member.
1396 GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
1397 grpc_schedule_on_exec_ctx);
1398 batch.on_complete = &on_complete;
1400 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
// Tears down the metadata batches in the retry state that this batch's
// ops used, releases the subchannel-call ref, and drops the call-stack
// ref taken in the constructor.  Called when the refcount hits zero
// (see batch_data_unref).
1403 void subchannel_batch_data::destroy() {
1404 subchannel_call_retry_state* retry_state =
1405 static_cast<subchannel_call_retry_state*>(
1406 subchannel_call->GetParentData());
1407 if (batch.send_initial_metadata) {
1408 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
1410 if (batch.send_trailing_metadata) {
1411 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
1413 if (batch.recv_initial_metadata) {
1414 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
1416 if (batch.recv_trailing_metadata) {
1417 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
1419 subchannel_call.reset();
1420 call_data* calld = static_cast<call_data*>(elem->call_data);
1421 GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
1426 // Creates a subchannel_batch_data object on the call's arena with the
1427 // specified refcount. If set_on_complete is true, the batch's
1428 // on_complete callback will be set to point to on_complete();
1429 // otherwise, the batch's on_complete callback will be null.
// Arena allocation + placement new: the object is destroyed via
// destroy()/batch_data_unref(), never via delete.
1430 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
1432 bool set_on_complete) {
1433 call_data* calld = static_cast<call_data*>(elem->call_data);
1434 subchannel_batch_data* batch_data =
1435 new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
1436 subchannel_batch_data(elem, calld, refcount, set_on_complete);
// Drops one ref; runs destroy() when the last ref is released.
1440 static void batch_data_unref(subchannel_batch_data* batch_data) {
1441 if (gpr_unref(&batch_data->refs)) {
1442 batch_data->destroy();
1447 // recv_initial_metadata callback handling
1450 // Invokes recv_initial_metadata_ready for a subchannel batch.
// Moves the received initial metadata from the retry state into the
// surface batch, detaches the surface callback (so the pending batch
// can be cleared), drops this batch's ref, and then runs the surface
// callback with a ref on |error|.
1451 static void invoke_recv_initial_metadata_callback(void* arg,
1452 grpc_error* error) {
1453 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1454 // Find pending batch.
1455 pending_batch* pending = pending_batch_find(
1456 batch_data->elem, "invoking recv_initial_metadata_ready for",
1457 [](grpc_transport_stream_op_batch* batch) {
1458 return batch->recv_initial_metadata &&
1459 batch->payload->recv_initial_metadata
1460 .recv_initial_metadata_ready != nullptr;
1462 GPR_ASSERT(pending != nullptr);
1464 subchannel_call_retry_state* retry_state =
1465 static_cast<subchannel_call_retry_state*>(
1466 batch_data->subchannel_call->GetParentData());
1467 grpc_metadata_batch_move(
1468 &retry_state->recv_initial_metadata,
1469 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1470 // Update bookkeeping.
1471 // Note: Need to do this before invoking the callback, since invoking
1472 // the callback will result in yielding the call combiner.
1473 grpc_closure* recv_initial_metadata_ready =
1474 pending->batch->payload->recv_initial_metadata
1475 .recv_initial_metadata_ready;
1476 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1478 maybe_clear_pending_batch(batch_data->elem, pending);
1479 batch_data_unref(batch_data);
1481 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
1484 // Intercepts recv_initial_metadata_ready callback for retries.
1485 // Commits the call and returns the initial metadata up the stack.
// Three outcomes: (1) a retry was already dispatched -> swallow the
// callback; (2) error or Trailers-Only before trailing metadata has
// arrived -> defer the callback until the retry decision can be made;
// (3) valid initial metadata -> commit the call and propagate.
1486 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1487 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1488 grpc_call_element* elem = batch_data->elem;
1489 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1490 call_data* calld = static_cast<call_data*>(elem->call_data);
1491 if (grpc_client_channel_call_trace.enabled()) {
1493 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1494 chand, calld, grpc_error_string(error));
1496 subchannel_call_retry_state* retry_state =
1497 static_cast<subchannel_call_retry_state*>(
1498 batch_data->subchannel_call->GetParentData());
1499 retry_state->completed_recv_initial_metadata = true;
1500 // If a retry was already dispatched, then we're not going to use the
1501 // result of this recv_initial_metadata op, so do nothing.
1502 if (retry_state->retry_dispatched) {
1503 GRPC_CALL_COMBINER_STOP(
1504 calld->call_combiner,
1505 "recv_initial_metadata_ready after retry dispatched");
1508 // If we got an error or a Trailers-Only response and have not yet gotten
1509 // the recv_trailing_metadata_ready callback, then defer propagating this
1510 // callback back to the surface. We can evaluate whether to retry when
1511 // recv_trailing_metadata comes back.
1512 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
1513 error != GRPC_ERROR_NONE) &&
1514 !retry_state->completed_recv_trailing_metadata)) {
1515 if (grpc_client_channel_call_trace.enabled()) {
1517 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
// The deferred batch keeps its ref; it is replayed or unreffed once
// recv_trailing_metadata_ready fires.
1521 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
1522 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1523 if (!retry_state->started_recv_trailing_metadata) {
1524 // recv_trailing_metadata not yet started by application; start it
1525 // ourselves to get status.
1526 start_internal_recv_trailing_metadata(elem);
1528 GRPC_CALL_COMBINER_STOP(
1529 calld->call_combiner,
1530 "recv_initial_metadata_ready trailers-only or error");
1534 // Received valid initial metadata, so commit the call.
1535 retry_commit(elem, retry_state);
1536 // Invoke the callback to return the result to the surface.
1537 // Manually invoking a callback function; it does not take ownership of error.
1538 invoke_recv_initial_metadata_callback(batch_data, error);
1542 // recv_message callback handling
1545 // Invokes recv_message_ready for a subchannel batch.
// Mirrors invoke_recv_initial_metadata_callback(): moves the received
// message into the surface batch, detaches the surface callback, drops
// the batch ref, then runs the surface callback with a ref on |error|.
1546 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1547 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1549 pending_batch* pending = pending_batch_find(
1550 batch_data->elem, "invoking recv_message_ready for",
1551 [](grpc_transport_stream_op_batch* batch) {
1552 return batch->recv_message &&
1553 batch->payload->recv_message.recv_message_ready != nullptr;
1555 GPR_ASSERT(pending != nullptr);
1557 subchannel_call_retry_state* retry_state =
1558 static_cast<subchannel_call_retry_state*>(
1559 batch_data->subchannel_call->GetParentData());
1560 *pending->batch->payload->recv_message.recv_message =
1561 std::move(retry_state->recv_message);
1562 // Update bookkeeping.
1563 // Note: Need to do this before invoking the callback, since invoking
1564 // the callback will result in yielding the call combiner.
1565 grpc_closure* recv_message_ready =
1566 pending->batch->payload->recv_message.recv_message_ready;
1567 pending->batch->payload->recv_message.recv_message_ready = nullptr;
1568 maybe_clear_pending_batch(batch_data->elem, pending);
1569 batch_data_unref(batch_data);
1571 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1574 // Intercepts recv_message_ready callback for retries.
1575 // Commits the call and returns the message up the stack.
// Same structure as recv_initial_metadata_ready(): swallow if a retry
// is already dispatched, defer on error/null message until trailing
// metadata arrives, otherwise commit and propagate.
1576 static void recv_message_ready(void* arg, grpc_error* error) {
1577 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1578 grpc_call_element* elem = batch_data->elem;
1579 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1580 call_data* calld = static_cast<call_data*>(elem->call_data);
1581 if (grpc_client_channel_call_trace.enabled()) {
1582 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1583 chand, calld, grpc_error_string(error));
1585 subchannel_call_retry_state* retry_state =
1586 static_cast<subchannel_call_retry_state*>(
1587 batch_data->subchannel_call->GetParentData());
1588 ++retry_state->completed_recv_message_count;
1589 // If a retry was already dispatched, then we're not going to use the
1590 // result of this recv_message op, so do nothing.
1591 if (retry_state->retry_dispatched) {
1592 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1593 "recv_message_ready after retry dispatched");
1596 // If we got an error or the payload was nullptr and we have not yet gotten
1597 // the recv_trailing_metadata_ready callback, then defer propagating this
1598 // callback back to the surface. We can evaluate whether to retry when
1599 // recv_trailing_metadata comes back.
1601 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1602 !retry_state->completed_recv_trailing_metadata)) {
1603 if (grpc_client_channel_call_trace.enabled()) {
1605 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1606 "message and recv_trailing_metadata pending)",
1609 retry_state->recv_message_ready_deferred_batch = batch_data;
1610 retry_state->recv_message_error = GRPC_ERROR_REF(error);
1611 if (!retry_state->started_recv_trailing_metadata) {
1612 // recv_trailing_metadata not yet started by application; start it
1613 // ourselves to get status.
1614 start_internal_recv_trailing_metadata(elem);
1616 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1620 // Received a valid message, so commit the call.
1621 retry_commit(elem, retry_state);
1622 // Invoke the callback to return the result to the surface.
1623 // Manually invoking a callback function; it does not take ownership of error.
1624 invoke_recv_message_callback(batch_data, error);
1628 // recv_trailing_metadata handling
1631 // Sets *status and *server_pushback_md based on md_batch and error.
1632 // Only sets *server_pushback_md if server_pushback_md != nullptr.
// On error, the status is derived from the error (using the call's
// deadline to distinguish DEADLINE_EXCEEDED); otherwise it is read from
// the grpc-status entry in the trailing metadata, which must be
// present.  Takes ownership of |error| (unreffed at the end).
1633 static void get_call_status(grpc_call_element* elem,
1634 grpc_metadata_batch* md_batch, grpc_error* error,
1635 grpc_status_code* status,
1636 grpc_mdelem** server_pushback_md) {
1637 call_data* calld = static_cast<call_data*>(elem->call_data);
1638 if (error != GRPC_ERROR_NONE) {
1639 grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1642 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1644 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1645 if (server_pushback_md != nullptr &&
1646 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1647 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1650 GRPC_ERROR_UNREF(error);
1653 // Adds recv_trailing_metadata_ready closure to closures.
1654 static void add_closure_for_recv_trailing_metadata_ready(
1655 grpc_call_element* elem, subchannel_batch_data* batch_data,
1656 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1657 // Find pending batch.
1658 pending_batch* pending = pending_batch_find(
1659 elem, "invoking recv_trailing_metadata for",
1660 [](grpc_transport_stream_op_batch* batch) {
1661 return batch->recv_trailing_metadata &&
1662 batch->payload->recv_trailing_metadata
1663 .recv_trailing_metadata_ready != nullptr;
1665 // If we generated the recv_trailing_metadata op internally via
1666 // start_internal_recv_trailing_metadata(), then there will be no
// pending batch; in that case just release our ref to error.
1668 if (pending == nullptr) {
1669 GRPC_ERROR_UNREF(error);
// Return the trailing metadata to the surface: move the metadata we
// received on the subchannel batch into the pending batch's payload.
1673 subchannel_call_retry_state* retry_state =
1674 static_cast<subchannel_call_retry_state*>(
1675 batch_data->subchannel_call->GetParentData());
1676 grpc_metadata_batch_move(
1677 &retry_state->recv_trailing_metadata,
1678 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
// Ownership of error passes to the closure added here.
1680 closures->Add(pending->batch->payload->recv_trailing_metadata
1681 .recv_trailing_metadata_ready,
1682 error, "recv_trailing_metadata_ready for pending batch");
1683 // Update bookkeeping.
1684 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1686 maybe_clear_pending_batch(elem, pending);
1689 // Adds any necessary closures for deferred recv_initial_metadata and
1690 // recv_message callbacks to closures.
1691 static void add_closures_for_deferred_recv_callbacks(
1692 subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1693 grpc_core::CallCombinerClosureList* closures) {
// Deferred recv callbacks are replayed only once the batch containing
// recv_trailing_metadata has completed, since that is the point at which
// we know whether the call will be retried.
1694 if (batch_data->batch.recv_trailing_metadata) {
1695 // Add closure for deferred recv_initial_metadata_ready.
1696 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1698 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1699 invoke_recv_initial_metadata_callback,
1700 retry_state->recv_initial_metadata_ready_deferred_batch,
1701 grpc_schedule_on_exec_ctx);
// The closure consumes the deferred error ref saved in retry_state.
1702 closures->Add(&retry_state->recv_initial_metadata_ready,
1703 retry_state->recv_initial_metadata_error,
1704 "resuming recv_initial_metadata_ready");
1705 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1707 // Add closure for deferred recv_message_ready.
1708 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1710 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1711 invoke_recv_message_callback,
1712 retry_state->recv_message_ready_deferred_batch,
1713 grpc_schedule_on_exec_ctx);
1714 closures->Add(&retry_state->recv_message_ready,
1715 retry_state->recv_message_error,
1716 "resuming recv_message_ready");
1717 retry_state->recv_message_ready_deferred_batch = nullptr;
1722 // Returns true if any op in the batch was not yet started.
1723 // Only looks at send ops, since recv ops are always started immediately.
1724 static bool pending_batch_is_unstarted(
1725 pending_batch* pending, call_data* calld,
1726 subchannel_call_retry_state* retry_state) {
// An empty slot, or one whose on_complete has already been taken, does
// not count as unstarted.
1727 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1730 if (pending->batch->send_initial_metadata &&
1731 !retry_state->started_send_initial_metadata) {
// send_message is unstarted if we have cached messages not yet sent on
// this attempt.
1734 if (pending->batch->send_message &&
1735 retry_state->started_send_message_count < calld->send_messages.size()) {
1738 if (pending->batch->send_trailing_metadata &&
1739 !retry_state->started_send_trailing_metadata) {
1745 // For any pending batch containing an op that has not yet been started,
1746 // adds the pending batch's completion closures to closures.
1747 static void add_closures_to_fail_unstarted_pending_batches(
1748 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1749 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1750 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1751 call_data* calld = static_cast<call_data*>(elem->call_data);
1752 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1753 pending_batch* pending = &calld->pending_batches[i];
1754 if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1755 if (grpc_client_channel_call_trace.enabled()) {
1757 "chand=%p calld=%p: failing unstarted pending batch at index "
// Each failed batch's on_complete closure gets its own ref to error.
1761 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1762 "failing on_complete for pending batch");
1763 pending->batch->on_complete = nullptr;
1764 maybe_clear_pending_batch(elem, pending);
// Release the caller's ref to error; this function takes ownership.
1767 GRPC_ERROR_UNREF(error);
1770 // Runs necessary closures upon completion of a call attempt.
1771 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1772 grpc_error* error) {
// Takes ownership of error; each helper below is handed its own ref.
1773 grpc_call_element* elem = batch_data->elem;
1774 call_data* calld = static_cast<call_data*>(elem->call_data);
1775 subchannel_call_retry_state* retry_state =
1776 static_cast<subchannel_call_retry_state*>(
1777 batch_data->subchannel_call->GetParentData());
1778 // Construct list of closures to execute.
1779 grpc_core::CallCombinerClosureList closures;
1780 // First, add closure for recv_trailing_metadata_ready.
1781 add_closure_for_recv_trailing_metadata_ready(
1782 elem, batch_data, GRPC_ERROR_REF(error), &closures);
1783 // If there are deferred recv_initial_metadata_ready or recv_message_ready
1784 // callbacks, add them to closures.
1785 add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1786 // Add closures to fail any pending batches that have not yet been started.
1787 add_closures_to_fail_unstarted_pending_batches(
1788 elem, retry_state, GRPC_ERROR_REF(error), &closures);
1789 // Don't need batch_data anymore.
1790 batch_data_unref(batch_data);
1791 // Schedule all of the closures identified above.
1792 // Note: This will release the call combiner.
1793 closures.RunClosures(calld->call_combiner);
1794 GRPC_ERROR_UNREF(error);
1797 // Intercepts recv_trailing_metadata_ready callback for retries.
1798 // Commits the call and returns the trailing metadata up the stack.
1799 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1800 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1801 grpc_call_element* elem = batch_data->elem;
1802 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1803 call_data* calld = static_cast<call_data*>(elem->call_data);
1804 if (grpc_client_channel_call_trace.enabled()) {
1806 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1807 chand, calld, grpc_error_string(error));
1809 subchannel_call_retry_state* retry_state =
1810 static_cast<subchannel_call_retry_state*>(
1811 batch_data->subchannel_call->GetParentData());
1812 retry_state->completed_recv_trailing_metadata = true;
1813 // Get the call's status and check for server pushback metadata.
1814 grpc_status_code status = GRPC_STATUS_OK;
1815 grpc_mdelem* server_pushback_md = nullptr;
1816 grpc_metadata_batch* md_batch =
1817 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
// get_call_status() takes ownership of the error ref we pass it.
1818 get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1819 &server_pushback_md);
1820 if (grpc_client_channel_call_trace.enabled()) {
1821 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1822 calld, grpc_status_code_to_string(status));
1824 // Check if we should retry.
1825 if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1826 // Unref batch_data for deferred recv_initial_metadata_ready or
1827 // recv_message_ready callbacks, if any.
// A retry was dispatched, so the deferred results will never be
// delivered; release the refs (and errors) they were holding.
1828 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1829 batch_data_unref(batch_data);
1830 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
1832 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
1833 batch_data_unref(batch_data);
1834 GRPC_ERROR_UNREF(retry_state->recv_message_error);
// Drop this callback's own ref to batch_data.
1836 batch_data_unref(batch_data);
1839 // Not retrying, so commit the call.
1840 retry_commit(elem, retry_state);
1841 // Run any necessary closures.
1842 run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
1846 // on_complete callback handling
1849 // Adds the on_complete closure for the pending batch completed in
1850 // batch_data to closures.
1851 static void add_closure_for_completed_pending_batch(
1852 grpc_call_element* elem, subchannel_batch_data* batch_data,
1853 subchannel_call_retry_state* retry_state, grpc_error* error,
1854 grpc_core::CallCombinerClosureList* closures) {
1855 pending_batch* pending = pending_batch_find(
1856 elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
1857 // Match the pending batch with the same set of send ops as the
1858 // subchannel batch we've just completed.
1859 return batch->on_complete != nullptr &&
1860 batch_data->batch.send_initial_metadata ==
1861 batch->send_initial_metadata &&
1862 batch_data->batch.send_message == batch->send_message &&
1863 batch_data->batch.send_trailing_metadata ==
1864 batch->send_trailing_metadata;
1866 // If batch_data is a replay batch, then there will be no pending
1867 // batch to complete.
1868 if (pending == nullptr) {
1869 GRPC_ERROR_UNREF(error);
// Ownership of error passes to the on_complete closure added here.
1873 closures->Add(pending->batch->on_complete, error,
1874 "on_complete for pending batch");
1875 pending->batch->on_complete = nullptr;
1876 maybe_clear_pending_batch(elem, pending);
1879 // If there are any cached ops to replay or pending ops to start on the
1880 // subchannel call, adds a closure to closures to invoke
1881 // start_retriable_subchannel_batches().
1882 static void add_closures_for_replay_or_pending_send_ops(
1883 grpc_call_element* elem, subchannel_batch_data* batch_data,
1884 subchannel_call_retry_state* retry_state,
1885 grpc_core::CallCombinerClosureList* closures) {
1886 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1887 call_data* calld = static_cast<call_data*>(elem->call_data);
// First check cached send state for work still to be (re)started.
1888 bool have_pending_send_message_ops =
1889 retry_state->started_send_message_count < calld->send_messages.size();
1890 bool have_pending_send_trailing_metadata_op =
1891 calld->seen_send_trailing_metadata &&
1892 !retry_state->started_send_trailing_metadata;
// If none was found there, also scan pending batches whose send ops
// have not yet been cached.
1893 if (!have_pending_send_message_ops &&
1894 !have_pending_send_trailing_metadata_op) {
1895 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1896 pending_batch* pending = &calld->pending_batches[i];
1897 grpc_transport_stream_op_batch* batch = pending->batch;
1898 if (batch == nullptr || pending->send_ops_cached) continue;
1899 if (batch->send_message) have_pending_send_message_ops = true;
1900 if (batch->send_trailing_metadata) {
1901 have_pending_send_trailing_metadata_op = true;
1905 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
1906 if (grpc_client_channel_call_trace.enabled()) {
1908 "chand=%p calld=%p: starting next batch for pending send op(s)",
1911 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
1912 start_retriable_subchannel_batches, elem,
1913 grpc_schedule_on_exec_ctx);
1914 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
1915 "starting next batch for send_* op(s)");
1919 // Callback used to intercept on_complete from subchannel calls.
1920 // Called only when retries are enabled.
1921 static void on_complete(void* arg, grpc_error* error) {
1922 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1923 grpc_call_element* elem = batch_data->elem;
1924 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1925 call_data* calld = static_cast<call_data*>(elem->call_data);
1926 if (grpc_client_channel_call_trace.enabled()) {
1927 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
1928 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
1929 chand, calld, grpc_error_string(error), batch_str);
1930 gpr_free(batch_str);
1932 subchannel_call_retry_state* retry_state =
1933 static_cast<subchannel_call_retry_state*>(
1934 batch_data->subchannel_call->GetParentData());
1935 // Update bookkeeping in retry_state.
1936 if (batch_data->batch.send_initial_metadata) {
1937 retry_state->completed_send_initial_metadata = true;
1939 if (batch_data->batch.send_message) {
1940 ++retry_state->completed_send_message_count;
1942 if (batch_data->batch.send_trailing_metadata) {
1943 retry_state->completed_send_trailing_metadata = true;
1945 // If the call is committed, free cached data for send ops that we've just
// completed (the cached copies are only needed for future retries).
1947 if (calld->retry_committed) {
1948 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
1950 // Construct list of closures to execute.
1951 grpc_core::CallCombinerClosureList closures;
1952 // If a retry was already dispatched, that means we saw
1953 // recv_trailing_metadata before this, so we do nothing here.
1954 // Otherwise, invoke the callback to return the result to the surface.
1955 if (!retry_state->retry_dispatched) {
1956 // Add closure for the completed pending batch, if any.
1957 add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
1958 GRPC_ERROR_REF(error), &closures);
1959 // If needed, add a callback to start any replay or pending send ops on
1960 // the subchannel call.
1961 if (!retry_state->completed_recv_trailing_metadata) {
1962 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
1966 // Track number of pending subchannel send batches and determine if this
1967 // was the last one.
1968 --calld->num_pending_retriable_subchannel_send_batches;
1969 const bool last_send_batch_complete =
1970 calld->num_pending_retriable_subchannel_send_batches == 0;
1971 // Don't need batch_data anymore.
1972 batch_data_unref(batch_data);
1973 // Schedule all of the closures identified above.
1974 // Note: This yields the call combiner.
1975 closures.RunClosures(calld->call_combiner);
1976 // If this was the last subchannel send batch, unref the call stack.
1977 if (last_send_batch_complete) {
1978 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
1983 // subchannel batch construction
1986 // Helper function used to start a subchannel batch in the call combiner.
1987 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// arg is the batch to start; its handler_private.extra_arg holds the
// SubchannelCall it should be started on (set by
// add_closure_for_subchannel_batch()).
1988 grpc_transport_stream_op_batch* batch =
1989 static_cast<grpc_transport_stream_op_batch*>(arg);
1990 grpc_core::SubchannelCall* subchannel_call =
1991 static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
1992 // Note: This will release the call combiner.
1993 subchannel_call->StartTransportStreamOpBatch(batch);
1996 // Adds a closure to closures that will execute batch in the call combiner.
1997 static void add_closure_for_subchannel_batch(
1998 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
1999 grpc_core::CallCombinerClosureList* closures) {
2000 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2001 call_data* calld = static_cast<call_data*>(elem->call_data);
// Stash the subchannel call so start_batch_in_call_combiner() can find it.
2002 batch->handler_private.extra_arg = calld->subchannel_call.get();
2003 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2004 start_batch_in_call_combiner, batch,
2005 grpc_schedule_on_exec_ctx);
2006 if (grpc_client_channel_call_trace.enabled()) {
2007 char* batch_str = grpc_transport_stream_op_batch_string(batch);
2008 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2010 gpr_free(batch_str);
// Enqueued with no error; the closure runs under the call combiner.
2012 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2013 "start_subchannel_batch");
2016 // Adds retriable send_initial_metadata op to batch_data.
2017 static void add_retriable_send_initial_metadata_op(
2018 call_data* calld, subchannel_call_retry_state* retry_state,
2019 subchannel_batch_data* batch_data) {
2020 // Maps the number of retries to the corresponding metadata value slice.
2021 static const grpc_slice* retry_count_strings[] = {
2022 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2023 // We need to make a copy of the metadata batch for each attempt, since
2024 // the filters in the subchannel stack may modify this batch, and we don't
2025 // want those modifications to be passed forward to subsequent attempts.
2027 // If we've already completed one or more attempts, add the
2028 // grpc-retry-attempts header.
// Storage is allocated from the call arena (freed with the call), with
// one extra element reserved for the retry-attempts header when needed.
2029 retry_state->send_initial_metadata_storage =
2030 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2031 calld->arena, sizeof(grpc_linked_mdelem) *
2032 (calld->send_initial_metadata.list.count +
2033 (calld->num_attempts_completed > 0))));
2034 grpc_metadata_batch_copy(&calld->send_initial_metadata,
2035 &retry_state->send_initial_metadata,
2036 retry_state->send_initial_metadata_storage);
// Strip any grpc-previous-rpc-attempts entry copied from the original
// metadata; it will be re-added below with the current attempt count.
2037 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2038 .grpc_previous_rpc_attempts != nullptr)) {
2039 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2040 retry_state->send_initial_metadata.idx.named
2041 .grpc_previous_rpc_attempts);
// For attempts after the first, attach grpc-previous-rpc-attempts.
// Note: retry_count_strings covers values 1..4.
2043 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
2044 grpc_mdelem retry_md = grpc_mdelem_create(
2045 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2046 *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
2047 grpc_error* error = grpc_metadata_batch_add_tail(
2048 &retry_state->send_initial_metadata,
2049 &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2052 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2053 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2054 grpc_error_string(error));
// Point the batch's payload at the per-attempt copy made above.
2058 retry_state->started_send_initial_metadata = true;
2059 batch_data->batch.send_initial_metadata = true;
2060 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2061 &retry_state->send_initial_metadata;
2062 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2063 calld->send_initial_metadata_flags;
2064 batch_data->batch.payload->send_initial_metadata.peer_string =
2068 // Adds retriable send_message op to batch_data.
2069 static void add_retriable_send_message_op(
2070 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2071 subchannel_batch_data* batch_data) {
2072 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2073 call_data* calld = static_cast<call_data*>(elem->call_data);
2074 if (grpc_client_channel_call_trace.enabled()) {
2076 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2077 chand, calld, retry_state->started_send_message_count);
// Send the next not-yet-started message from the per-call cache, so
// the same payload can be replayed on subsequent retry attempts.
2079 grpc_core::ByteStreamCache* cache =
2080 calld->send_messages[retry_state->started_send_message_count];
2081 ++retry_state->started_send_message_count;
2082 retry_state->send_message.Init(cache);
2083 batch_data->batch.send_message = true;
2084 batch_data->batch.payload->send_message.send_message.reset(
2085 retry_state->send_message.get());
2088 // Adds retriable send_trailing_metadata op to batch_data.
2089 static void add_retriable_send_trailing_metadata_op(
2090 call_data* calld, subchannel_call_retry_state* retry_state,
2091 subchannel_batch_data* batch_data) {
2092 // We need to make a copy of the metadata batch for each attempt, since
2093 // the filters in the subchannel stack may modify this batch, and we don't
2094 // want those modifications to be passed forward to subsequent attempts.
// Storage for the copied elements comes from the call arena, so it is
// freed together with the call.
2095 retry_state->send_trailing_metadata_storage =
2096 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2097 calld->arena, sizeof(grpc_linked_mdelem) *
2098 calld->send_trailing_metadata.list.count));
2099 grpc_metadata_batch_copy(&calld->send_trailing_metadata,
2100 &retry_state->send_trailing_metadata,
2101 retry_state->send_trailing_metadata_storage);
2102 retry_state->started_send_trailing_metadata = true;
2103 batch_data->batch.send_trailing_metadata = true;
2104 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2105 &retry_state->send_trailing_metadata;
2108 // Adds retriable recv_initial_metadata op to batch_data.
2109 static void add_retriable_recv_initial_metadata_op(
2110 call_data* calld, subchannel_call_retry_state* retry_state,
2111 subchannel_batch_data* batch_data) {
2112 retry_state->started_recv_initial_metadata = true;
2113 batch_data->batch.recv_initial_metadata = true;
// The metadata is received into retry_state rather than directly into
// the surface batch, so the retry code can inspect it first.
2114 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2115 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2116 &retry_state->recv_initial_metadata;
2117 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2118 &retry_state->trailing_metadata_available;
// Intercept the callback with our own recv_initial_metadata_ready.
2119 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2120 recv_initial_metadata_ready, batch_data,
2121 grpc_schedule_on_exec_ctx);
2122 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2123 &retry_state->recv_initial_metadata_ready;
2126 // Adds retriable recv_message op to batch_data.
2127 static void add_retriable_recv_message_op(
2128 call_data* calld, subchannel_call_retry_state* retry_state,
2129 subchannel_batch_data* batch_data) {
2130 ++retry_state->started_recv_message_count;
2131 batch_data->batch.recv_message = true;
// The message is received into retry_state so the retry code can decide
// whether to pass it up before the surface sees it.
2132 batch_data->batch.payload->recv_message.recv_message =
2133 &retry_state->recv_message;
// Intercept the callback with our own recv_message_ready.
2134 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
2135 batch_data, grpc_schedule_on_exec_ctx);
2136 batch_data->batch.payload->recv_message.recv_message_ready =
2137 &retry_state->recv_message_ready;
2140 // Adds retriable recv_trailing_metadata op to batch_data.
2141 static void add_retriable_recv_trailing_metadata_op(
2142 call_data* calld, subchannel_call_retry_state* retry_state,
2143 subchannel_batch_data* batch_data) {
2144 retry_state->started_recv_trailing_metadata = true;
2145 batch_data->batch.recv_trailing_metadata = true;
// Trailing metadata and stats are received into retry_state so the
// retry decision can be made before anything is returned to the surface.
2146 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2147 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2148 &retry_state->recv_trailing_metadata;
2149 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2150 &retry_state->collect_stats;
// Intercept the callback with our own recv_trailing_metadata_ready.
2151 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2152 recv_trailing_metadata_ready, batch_data,
2153 grpc_schedule_on_exec_ctx);
2154 batch_data->batch.payload->recv_trailing_metadata
2155 .recv_trailing_metadata_ready =
2156 &retry_state->recv_trailing_metadata_ready;
// Give the LB policy a chance to intercept recv_trailing_metadata_ready
// for this pick, if it requested interception.
2157 maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
2158 &batch_data->batch);
2161 // Helper function used to start a recv_trailing_metadata batch. This
2162 // is used in the case where a recv_initial_metadata or recv_message
2163 // op fails in a way that we know the call is over but when the application
2164 // has not yet started its own recv_trailing_metadata op.
2165 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2166 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2167 call_data* calld = static_cast<call_data*>(elem->call_data);
2168 if (grpc_client_channel_call_trace.enabled()) {
2170 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2171 "started; starting it internally",
2174 subchannel_call_retry_state* retry_state =
2175 static_cast<subchannel_call_retry_state*>(
2176 calld->subchannel_call->GetParentData());
2177 // Create batch_data with 2 refs, since this batch will be unreffed twice:
2178 // once for the recv_trailing_metadata_ready callback when the subchannel
2179 // batch returns, and again when we actually get a recv_trailing_metadata
2180 // op from the surface.
2181 subchannel_batch_data* batch_data =
2182 batch_data_create(elem, 2, false /* set_on_complete */);
2183 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
// Remember the internal batch so a later surface-initiated
// recv_trailing_metadata op can reuse its result instead of re-starting.
2184 retry_state->recv_trailing_metadata_internal_batch = batch_data;
2185 // Note: This will release the call combiner.
2186 calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch);
2189 // If there are any cached send ops that need to be replayed on the
2190 // current subchannel call, creates and returns a new subchannel batch
2191 // to replay those ops. Otherwise, returns nullptr.
2192 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2193 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2194 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2195 call_data* calld = static_cast<call_data*>(elem->call_data);
2196 subchannel_batch_data* replay_batch_data = nullptr;
2197 // send_initial_metadata.
2198 if (calld->seen_send_initial_metadata &&
2199 !retry_state->started_send_initial_metadata &&
2200 !calld->pending_send_initial_metadata) {
2201 if (grpc_client_channel_call_trace.enabled()) {
2203 "chand=%p calld=%p: replaying previously completed "
2204 "send_initial_metadata op",
2207 replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
2208 add_retriable_send_initial_metadata_op(calld, retry_state,
// send_message.
2212 // Note that we can only have one send_message op in flight at a time.
2213 if (retry_state->started_send_message_count < calld->send_messages.size() &&
2214 retry_state->started_send_message_count ==
2215 retry_state->completed_send_message_count &&
2216 !calld->pending_send_message) {
2217 if (grpc_client_channel_call_trace.enabled()) {
2219 "chand=%p calld=%p: replaying previously completed "
// Reuse the batch created above if there was one.
2223 if (replay_batch_data == nullptr) {
2225 batch_data_create(elem, 1, true /* set_on_complete */);
2227 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2229 // send_trailing_metadata.
2230 // Note that we only add this op if we have no more send_message ops
2231 // to start, since we can't send down any more send_message ops after
2232 // send_trailing_metadata.
2233 if (calld->seen_send_trailing_metadata &&
2234 retry_state->started_send_message_count == calld->send_messages.size() &&
2235 !retry_state->started_send_trailing_metadata &&
2236 !calld->pending_send_trailing_metadata) {
2237 if (grpc_client_channel_call_trace.enabled()) {
2239 "chand=%p calld=%p: replaying previously completed "
2240 "send_trailing_metadata op",
2243 if (replay_batch_data == nullptr) {
2245 batch_data_create(elem, 1, true /* set_on_complete */);
2247 add_retriable_send_trailing_metadata_op(calld, retry_state,
2250 return replay_batch_data;
2253 // Adds subchannel batches for pending batches to batches, updating
2254 // *num_batches as needed.
2255 static void add_subchannel_batches_for_pending_batches(
2256 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2257 grpc_core::CallCombinerClosureList* closures) {
2258 call_data* calld = static_cast<call_data*>(elem->call_data);
2259 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2260 pending_batch* pending = &calld->pending_batches[i];
2261 grpc_transport_stream_op_batch* batch = pending->batch;
2262 if (batch == nullptr) continue;
2263 // Skip any batch that either (a) has already been started on this
2264 // subchannel call or (b) we can't start yet because we're still
2265 // replaying send ops that need to be completed first.
2266 // TODO(roth): Note that if any one op in the batch can't be sent
2267 // yet due to ops that we're replaying, we don't start any of the ops
2268 // in the batch. This is probably okay, but it could conceivably
2269 // lead to increased latency in some cases -- e.g., we could delay
2270 // starting a recv op due to it being in the same batch with a send
2271 // op. If/when we revamp the callback protocol in
2272 // transport_stream_op_batch, we may be able to fix this.
2273 if (batch->send_initial_metadata &&
2274 retry_state->started_send_initial_metadata) {
2277 if (batch->send_message && retry_state->completed_send_message_count <
2278 retry_state->started_send_message_count) {
2281 // Note that we only start send_trailing_metadata if we have no more
2282 // send_message ops to start, since we can't send down any more
2283 // send_message ops after send_trailing_metadata.
// Note: batch->send_message contributes 1 to the sum when this batch
// itself carries a send_message op that would be started with it.
2284 if (batch->send_trailing_metadata &&
2285 (retry_state->started_send_message_count + batch->send_message <
2286 calld->send_messages.size() ||
2287 retry_state->started_send_trailing_metadata)) {
2290 if (batch->recv_initial_metadata &&
2291 retry_state->started_recv_initial_metadata) {
2294 if (batch->recv_message && retry_state->completed_recv_message_count <
2295 retry_state->started_recv_message_count) {
2298 if (batch->recv_trailing_metadata &&
2299 retry_state->started_recv_trailing_metadata) {
2300 // If we previously completed a recv_trailing_metadata op
2301 // initiated by start_internal_recv_trailing_metadata(), use the
2302 // result of that instead of trying to re-start this op.
2303 if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
2305 // If the batch completed, then trigger the completion callback
2306 // directly, so that we return the previously returned results to
2307 // the application. Otherwise, just unref the internally
2308 // started subchannel batch, since we'll propagate the
2309 // completion when it completes.
2310 if (retry_state->completed_recv_trailing_metadata) {
2311 // Batches containing recv_trailing_metadata always succeed.
2313 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
2314 "re-executing recv_trailing_metadata_ready to propagate "
2315 "internally triggered result");
2317 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2319 retry_state->recv_trailing_metadata_internal_batch = nullptr;
2323 // If we're not retrying, just send the batch as-is.
2324 if (calld->method_params == nullptr ||
2325 calld->method_params->retry_policy() == nullptr ||
2326 calld->retry_committed) {
2327 add_closure_for_subchannel_batch(elem, batch, closures);
2328 pending_batch_clear(calld, pending);
2331 // Create batch with the right number of callbacks.
2332 const bool has_send_ops = batch->send_initial_metadata ||
2333 batch->send_message ||
2334 batch->send_trailing_metadata;
2335 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
2336 batch->recv_message +
2337 batch->recv_trailing_metadata;
2338 subchannel_batch_data* batch_data = batch_data_create(
2339 elem, num_callbacks, has_send_ops /* set_on_complete */);
2340 // Cache send ops if needed.
2341 maybe_cache_send_ops_for_batch(calld, pending);
2342 // send_initial_metadata.
2343 if (batch->send_initial_metadata) {
2344 add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
// send_message.
2347 if (batch->send_message) {
2348 add_retriable_send_message_op(elem, retry_state, batch_data);
2350 // send_trailing_metadata.
2351 if (batch->send_trailing_metadata) {
2352 add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2354 // recv_initial_metadata.
2355 if (batch->recv_initial_metadata) {
2356 // recv_flags is only used on the server side.
2357 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2358 add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
// recv_message.
2361 if (batch->recv_message) {
2362 add_retriable_recv_message_op(calld, retry_state, batch_data);
2364 // recv_trailing_metadata.
2365 if (batch->recv_trailing_metadata) {
2366 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2368 add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
2369 // Track number of pending subchannel send batches.
2370 // If this is the first one, take a ref to the call stack.
2371 if (batch->send_initial_metadata || batch->send_message ||
2372 batch->send_trailing_metadata) {
2373 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2374 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2376 ++calld->num_pending_retriable_subchannel_send_batches;
// Closure run under the call combiner to (re)start subchannel batches for a
// retriable call attempt: replays any cached send_* ops from a previous
// attempt, forwards the currently-pending batches, and then runs the
// collected closures — which yields the call combiner.
2381 // Constructs and starts whatever subchannel batches are needed on the
2383 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2384 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2385 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2386 call_data* calld = static_cast<call_data*>(elem->call_data);
2387 if (grpc_client_channel_call_trace.enabled()) {
2388 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
// Per-attempt retry state is stored in the parent-data area of the
// subchannel call (see GetParentData()).
2391 subchannel_call_retry_state* retry_state =
2392 static_cast<subchannel_call_retry_state*>(
2393 calld->subchannel_call->GetParentData());
2394 // Construct list of closures to execute, one for each pending batch.
2395 grpc_core::CallCombinerClosureList closures;
2396 // Replay previously-returned send_* ops if needed.
2397 subchannel_batch_data* replay_batch_data =
2398 maybe_create_subchannel_batch_for_replay(elem, retry_state);
2399 if (replay_batch_data != nullptr) {
2400 add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
2402 // Track number of pending subchannel send batches.
2403 // If this is the first one, take a ref to the call stack.
// The call-stack ref taken here is released when the send-batch count
// drops back to zero (in the completion path, not visible in this chunk).
2404 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2405 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2407 ++calld->num_pending_retriable_subchannel_send_batches;
2409 // Now add pending batches.
2410 add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
2411 // Start batches on subchannel call.
2412 if (grpc_client_channel_call_trace.enabled()) {
2414 "chand=%p calld=%p: starting %" PRIuPTR
2415 " retriable batches on subchannel_call=%p",
2416 chand, calld, closures.size(), calld->subchannel_call.get());
2418 // Note: This will yield the call combiner.
2419 closures.RunClosures(calld->call_combiner);
// Creates the subchannel call once a connected subchannel has been picked.
// If retries are enabled, reserves parent-data space for per-attempt retry
// state and placement-constructs it after the call is created.  On creation
// failure, fails all pending batches; on success, resumes them.
2426 static void create_subchannel_call(grpc_call_element* elem) {
2427 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2428 call_data* calld = static_cast<call_data*>(elem->call_data);
// Zero when retries are disabled, so no retry state is allocated.
2429 const size_t parent_data_size =
2430 calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
2431 const grpc_core::ConnectedSubchannel::CallArgs call_args = {
2432 calld->pollent, // pollent
2433 calld->path, // path
2434 calld->call_start_time, // start_time
2435 calld->deadline, // deadline
2436 calld->arena, // arena
2437 // TODO(roth): When we implement hedging support, we will probably
2438 // need to use a separate call context for each subchannel call.
2439 calld->call_context, // context
2440 calld->call_combiner, // call_combiner
2441 parent_data_size // parent_data_size
2443 grpc_error* error = GRPC_ERROR_NONE;
2444 calld->subchannel_call =
2445 calld->pick.pick.connected_subchannel->CreateCall(call_args, &error);
2446 if (grpc_client_channel_routing_trace.enabled()) {
2447 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
2448 chand, calld, calld->subchannel_call.get(),
2449 grpc_error_string(error));
2451 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
// Ownership of `error` passes to pending_batches_fail().
2452 pending_batches_fail(elem, error, yield_call_combiner)
2454 if (parent_data_size > 0) {
// Placement-new the retry state into the reserved parent-data area.
2455 new (calld->subchannel_call->GetParentData())
2456 subchannel_call_retry_state(calld->call_context);
2458 pending_batches_resume(elem);
// Closure scheduled (outside the channel combiner) when an LB pick
// finishes.  On error, fails all pending batches with that error;
// otherwise proceeds to create the subchannel call.
2462 // Invoked when a pick is completed, on both success or failure.
2463 static void pick_done(void* arg, grpc_error* error) {
2464 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2465 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2466 call_data* calld = static_cast<call_data*>(elem->call_data);
2467 if (error != GRPC_ERROR_NONE) {
2468 if (grpc_client_channel_routing_trace.enabled()) {
2470 "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
2471 calld, grpc_error_string(error));
// Takes a new ref on `error`; the caller's ref is unchanged.
2473 pending_batches_fail(elem, GRPC_ERROR_REF(error), yield_call_combiner);
2476 create_subchannel_call(elem);
2479 namespace grpc_core {
2482 // A class to handle the call combiner cancellation callback for a
// Registers itself with the call combiner's cancellation mechanism; when
// the call is cancelled while its pick is still queued, removes the pick
// from the channel's queue and fails the pending batches.  The canceller
// is "lamed" (ignored) if calld->pick_canceller no longer points at it,
// which happens when the pick is removed from the queue normally.
2484 class QueuedPickCanceller {
2486 explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
2487 auto* calld = static_cast<call_data*>(elem->call_data);
2488 auto* chand = static_cast<channel_data*>(elem->channel_data);
// Hold the call stack alive until CancelLocked() runs.
2489 GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
// CancelLocked must run under the channel combiner, since it touches
// chand->queued_picks.
2490 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2491 grpc_combiner_scheduler(chand->combiner));
2492 grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
2496 static void CancelLocked(void* arg, grpc_error* error) {
2497 auto* self = static_cast<QueuedPickCanceller*>(arg);
2498 auto* chand = static_cast<channel_data*>(self->elem_->channel_data);
2499 auto* calld = static_cast<call_data*>(self->elem_->call_data);
2500 if (grpc_client_channel_routing_trace.enabled()) {
2502 "chand=%p calld=%p: cancelling queued pick: "
2503 "error=%s self=%p calld->pick_canceller=%p",
2504 chand, calld, grpc_error_string(error), self,
2505 calld->pick_canceller);
// Only act if this canceller is still the active one AND this is a
// real cancellation (error != NONE); otherwise it was lamed.
2507 if (calld->pick_canceller == self && error != GRPC_ERROR_NONE) {
2508 // Remove pick from list of queued picks.
2509 remove_call_from_queued_picks_locked(self->elem_);
2510 // Fail pending batches on the call.
2511 pending_batches_fail(self->elem_, GRPC_ERROR_REF(error),
2512 yield_call_combiner_if_pending_batches_found);
// Balances the ref taken in the constructor.
2514 GRPC_CALL_STACK_UNREF(calld->owning_call, "QueuedPickCanceller");
2518 grpc_call_element* elem_;
2519 grpc_closure closure_;
2523 } // namespace grpc_core
// Must be called under the channel combiner (it mutates
// chand->queued_picks).  Walks the singly-linked queued-pick list via a
// pointer-to-pointer so the matching node can be unlinked in place.
2525 // Removes the call from the channel's list of queued picks.
2526 static void remove_call_from_queued_picks_locked(grpc_call_element* elem) {
2527 auto* chand = static_cast<channel_data*>(elem->channel_data);
2528 auto* calld = static_cast<call_data*>(elem->call_data);
2529 for (QueuedPick** pick = &chand->queued_picks; *pick != nullptr;
2530 pick = &(*pick)->next) {
2531 if (*pick == &calld->pick) {
2532 if (grpc_client_channel_routing_trace.enabled()) {
2533 gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
2536 calld->pick_queued = false;
// Unlink this call's pick node from the list.
2537 *pick = calld->pick.next;
2538 // Remove call's pollent from channel's interested_parties.
2539 grpc_polling_entity_del_from_pollset_set(calld->pollent,
2540 chand->interested_parties);
2541 // Lame the call combiner canceller.
// The QueuedPickCanceller checks this field before acting, so nulling
// it here makes any in-flight cancellation a no-op.
2542 calld->pick_canceller = nullptr;
// Must be called under the channel combiner.  Pushes this call's pick onto
// the front of chand->queued_picks, hooks the call's pollent into the
// channel's interested_parties so resolver/LB I/O can progress on the
// call's CQ, and installs a cancellation callback.
2548 // Adds the call to the channel's list of queued picks.
2549 static void add_call_to_queued_picks_locked(grpc_call_element* elem) {
2550 auto* chand = static_cast<channel_data*>(elem->channel_data);
2551 auto* calld = static_cast<call_data*>(elem->call_data);
2552 if (grpc_client_channel_routing_trace.enabled()) {
2553 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
2556 calld->pick_queued = true;
2557 // Add call to queued picks list.
2558 calld->pick.elem = elem;
2559 calld->pick.next = chand->queued_picks;
2560 chand->queued_picks = &calld->pick;
2561 // Add call's pollent to channel's interested_parties, so that I/O
2562 // can be done under the call's CQ.
2563 grpc_polling_entity_add_to_pollset_set(calld->pollent,
2564 chand->interested_parties);
2565 // Register call combiner cancellation callback.
// Owned via calld->pick_canceller; "lamed" (pointer nulled) when the pick
// is removed from the queue in remove_call_from_queued_picks_locked().
2566 calld->pick_canceller = grpc_core::New<grpc_core::QueuedPickCanceller>(elem);
// Copies the channel's service-config-derived data into the call: retry
// throttle data, per-method params (timeout, wait_for_ready, retry
// policy).  May shorten the call's deadline and adjust the
// send_initial_metadata wait_for_ready flag.  Runs under the channel
// combiner.
2569 // Applies service config to the call. Must be invoked once we know
2570 // that the resolver has returned results to the channel.
2571 static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2572 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2573 call_data* calld = static_cast<call_data*>(elem->call_data);
2574 if (grpc_client_channel_routing_trace.enabled()) {
2575 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2578 if (chand->retry_throttle_data != nullptr) {
2579 calld->retry_throttle_data = chand->retry_throttle_data->Ref();
2581 if (chand->method_params_table != nullptr) {
// Look up the per-method config for this call's path (method name).
2582 calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2583 *chand->method_params_table, calld->path);
2584 if (calld->method_params != nullptr) {
2585 // If the deadline from the service config is shorter than the one
2586 // from the client API, reset the deadline timer.
2587 if (chand->deadline_checking_enabled &&
2588 calld->method_params->timeout() != 0) {
// Per-method deadline is relative to the call's start time.
2589 const grpc_millis per_method_deadline =
2590 grpc_timespec_to_millis_round_up(calld->call_start_time) +
2591 calld->method_params->timeout();
2592 if (per_method_deadline < calld->deadline) {
2593 calld->deadline = per_method_deadline;
2594 grpc_deadline_state_reset(elem, calld->deadline);
2597 // If the service config set wait_for_ready and the application
2598 // did not explicitly set it, use the value from the service config.
// pending_batches[0] holds the send_initial_metadata batch (see the
// comment in start_pick_locked()).
2599 uint32_t* send_initial_metadata_flags =
2600 &calld->pending_batches[0]
2601 .batch->payload->send_initial_metadata
2602 .send_initial_metadata_flags;
2604 calld->method_params->wait_for_ready() !=
2605 ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2606 !(*send_initial_metadata_flags &
2607 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2608 if (calld->method_params->wait_for_ready() ==
2609 ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2610 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2612 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2617 // If no retry policy, disable retries.
2618 // TODO(roth): Remove this when adding support for transparent retries.
2619 if (calld->method_params == nullptr ||
2620 calld->method_params->retry_policy() == nullptr) {
2621 calld->enable_retries = false;
// Idempotent wrapper around apply_service_config_to_call_locked():
// applies the service config at most once per call, and only after the
// channel actually has service config data from the resolver.
2625 // Invoked once resolver results are available.
2626 static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
2627 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2628 call_data* calld = static_cast<call_data*>(elem->call_data);
2629 // Apply service config data to the call only once, and only if the
2630 // channel has the data available.
2631 if (GPR_LIKELY(chand->have_service_config &&
2632 !calld->service_config_applied)) {
// Set the flag first so re-entry cannot apply the config twice.
2633 calld->service_config_applied = true;
2634 apply_service_config_to_call_locked(elem);
// Maps an LB PickResult enum value to a human-readable name, used only
// for trace logging in start_pick_locked().
2638 static const char* pick_result_name(LoadBalancingPolicy::PickResult result) {
2640 case LoadBalancingPolicy::PICK_COMPLETE:
2642 case LoadBalancingPolicy::PICK_QUEUE:
2644 case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
2645 return "TRANSIENT_FAILURE";
// All enum values are handled above, so this is unreachable.
2647 GPR_UNREACHABLE_CODE(return "UNKNOWN");
// Runs under the channel combiner.  Performs an LB pick for this call:
// applies service config if needed, asks the current picker for a
// subchannel, and then either (a) schedules pick_done (success or final
// failure), (b) queues the pick to be retried when a new picker arrives,
// or (c) on transient failure consults wait_for_ready / retry logic.
2650 static void start_pick_locked(void* arg, grpc_error* error) {
2651 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2652 call_data* calld = static_cast<call_data*>(elem->call_data);
2653 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// Precondition: no pick has completed and no subchannel call exists yet.
2654 GPR_ASSERT(calld->pick.pick.connected_subchannel == nullptr);
2655 GPR_ASSERT(calld->subchannel_call == nullptr);
2656 // If this is a retry, use the send_initial_metadata payload that
2657 // we've cached; otherwise, use the pending batch. The
2658 // send_initial_metadata batch will be the first pending batch in the
2659 // list, as set by get_batch_index() above.
2660 // TODO(roth): What if the LB policy needs to add something to the
2661 // call's initial metadata, and then there's a retry? We don't want
2662 // the new metadata to be added twice. We might need to somehow
2663 // allocate the subchannel batch earlier so that we can give the
2664 // subchannel's copy of the metadata batch (which is copied for each
2665 // attempt) to the LB policy instead the one from the parent channel.
2666 calld->pick.pick.initial_metadata =
2667 calld->seen_send_initial_metadata
2668 ? &calld->send_initial_metadata
2669 : calld->pending_batches[0]
2670 .batch->payload->send_initial_metadata.send_initial_metadata;
2671 uint32_t* send_initial_metadata_flags =
2672 calld->seen_send_initial_metadata
2673 ? &calld->send_initial_metadata_flags
2674 : &calld->pending_batches[0]
2675 .batch->payload->send_initial_metadata
2676 .send_initial_metadata_flags;
2677 // Apply service config to call if needed.
2678 maybe_apply_service_config_to_call_locked(elem);
2679 // When done, we schedule this closure to leave the channel combiner.
2680 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2681 grpc_schedule_on_exec_ctx);
2683 error = GRPC_ERROR_NONE;
// Ask the LB policy's current picker for a connected subchannel.
2684 auto pick_result = chand->picker->Pick(&calld->pick.pick, &error);
2685 if (grpc_client_channel_routing_trace.enabled()) {
2687 "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
2689 chand, calld, pick_result_name(pick_result),
2690 calld->pick.pick.connected_subchannel.get(),
2691 grpc_error_string(error));
2693 switch (pick_result) {
2694 case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
2695 // If we're shutting down, fail all RPCs.
2696 if (chand->disconnect_error != GRPC_ERROR_NONE) {
2697 GRPC_ERROR_UNREF(error);
2698 GRPC_CLOSURE_SCHED(&calld->pick_closure,
2699 GRPC_ERROR_REF(chand->disconnect_error));
2702 // If wait_for_ready is false, then the error indicates the RPC
2703 // attempt's final status.
2704 if ((*send_initial_metadata_flags &
2705 GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
2706 // Retry if appropriate; otherwise, fail.
2707 grpc_status_code status = GRPC_STATUS_OK;
2708 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2710 if (!calld->enable_retries ||
2711 !maybe_retry(elem, nullptr /* batch_data */, status,
2712 nullptr /* server_pushback_md */)) {
// Wrap the pick error so callers see why subchannel creation failed.
2713 grpc_error* new_error =
2714 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2715 "Failed to create subchannel", &error, 1);
2716 GRPC_ERROR_UNREF(error);
2717 GRPC_CLOSURE_SCHED(&calld->pick_closure, new_error);
2719 if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
2722 // If wait_for_ready is true, then queue to retry when we get a new
2724 GRPC_ERROR_UNREF(error);
2726 case LoadBalancingPolicy::PICK_QUEUE:
// Queue the pick (at most once); it will be retried when a new
// picker is installed.
2727 if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
2729 default: // PICK_COMPLETE
// A complete pick with no connected subchannel means the LB policy
// dropped the call.
2731 if (GPR_UNLIKELY(calld->pick.pick.connected_subchannel == nullptr)) {
2732 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2733 "Call dropped by load balancing policy");
2735 GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
2736 if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
2741 // filter call vtable functions
// Entry point for every transport batch on a client-channel call.
// Handles: previously-recorded cancellation (fail immediately), new
// cancellation (record + propagate or fail pending), and normal batches
// (queue; start a pick on send_initial_metadata, otherwise yield the
// call combiner).
2744 static void cc_start_transport_stream_op_batch(
2745 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2746 GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
2747 call_data* calld = static_cast<call_data*>(elem->call_data);
2748 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2749 if (GPR_LIKELY(chand->deadline_checking_enabled)) {
2750 grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
2752 // If we've previously been cancelled, immediately fail any new batches.
2753 if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
2754 if (grpc_client_channel_call_trace.enabled()) {
2755 gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
2756 chand, calld, grpc_error_string(calld->cancel_error));
2758 // Note: This will release the call combiner.
2759 grpc_transport_stream_op_batch_finish_with_failure(
2760 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
2763 // Handle cancellation.
2764 if (GPR_UNLIKELY(batch->cancel_stream)) {
2765 // Stash a copy of cancel_error in our call data, so that we can use
2766 // it for subsequent operations. This ensures that if the call is
2767 // cancelled before any batches are passed down (e.g., if the deadline
2768 // is in the past when the call starts), we can return the right
2769 // error to the caller when the first batch does get passed down.
2770 GRPC_ERROR_UNREF(calld->cancel_error);
2771 calld->cancel_error =
2772 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2773 if (grpc_client_channel_call_trace.enabled()) {
2774 gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
2775 calld, grpc_error_string(calld->cancel_error));
2777 // If we do not have a subchannel call (i.e., a pick has not yet
2778 // been started), fail all pending batches. Otherwise, send the
2779 // cancellation down to the subchannel call.
2780 if (calld->subchannel_call == nullptr) {
2781 // TODO(roth): If there is a pending retry callback, do we need to
2783 pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
2784 no_yield_call_combiner);
2785 // Note: This will release the call combiner.
2786 grpc_transport_stream_op_batch_finish_with_failure(
2787 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
2789 // Note: This will release the call combiner.
2790 calld->subchannel_call->StartTransportStreamOpBatch(batch);
2794 // Add the batch to the pending list.
2795 pending_batches_add(elem, batch);
2796 // Check if we've already gotten a subchannel call.
2797 // Note that once we have completed the pick, we do not need to enter
2798 // the channel combiner, which is more efficient (especially for
2799 // streaming calls).
2800 if (calld->subchannel_call != nullptr) {
2801 if (grpc_client_channel_call_trace.enabled()) {
2803 "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
2804 calld, calld->subchannel_call.get());
2806 pending_batches_resume(elem);
2809 // We do not yet have a subchannel call.
2810 // For batches containing a send_initial_metadata op, enter the channel
2811 // combiner to start a pick.
2812 if (GPR_LIKELY(batch->send_initial_metadata)) {
2813 if (grpc_client_channel_call_trace.enabled()) {
2814 gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
// start_pick_locked runs under the channel combiner, since the pick
// touches channel-wide state (picker, queued_picks).
2818 GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
2819 elem, grpc_combiner_scheduler(chand->combiner)),
2822 // For all other batches, release the call combiner.
2823 if (grpc_client_channel_call_trace.enabled()) {
2825 "chand=%p calld=%p: saved batch, yielding call combiner", chand,
2828 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
2829 "batch does not include send_initial_metadata");
2833 /* Constructor for call_data */
// Placement-constructs call_data into the element's pre-allocated
// call_data storage; the constructor itself cannot fail.
2834 static grpc_error* cc_init_call_elem(grpc_call_element* elem,
2835 const grpc_call_element_args* args) {
2836 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2837 new (elem->call_data) call_data(elem, *chand, *args);
2838 return GRPC_ERROR_NONE;
2841 /* Destructor for call_data */
// If a subchannel call exists, defer then_schedule_closure until after
// the subchannel call stack is destroyed; otherwise schedule it directly
// after running the call_data destructor.
2842 static void cc_destroy_call_elem(grpc_call_element* elem,
2843 const grpc_call_final_info* final_info,
2844 grpc_closure* then_schedule_closure) {
2845 call_data* calld = static_cast<call_data*>(elem->call_data);
2846 if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
// Ownership of the closure is handed to the subchannel call.
2847 calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
2848 then_schedule_closure = nullptr;
// Explicit destructor call to mirror the placement-new in
// cc_init_call_elem.
2850 calld->~call_data();
2851 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
// Records the polling entity the call's I/O should be associated with;
// used later when queuing picks and creating the subchannel call.
2854 static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
2855 grpc_polling_entity* pollent) {
2856 call_data* calld = static_cast<call_data*>(elem->call_data);
2857 calld->pollent = pollent;
2860 /*************************************************************************
// Channel filter vtable for the client channel.  Field order follows
// grpc_channel_filter's declaration; some entries fall between the
// visible lines here.
2864 const grpc_channel_filter grpc_client_channel_filter = {
2865 cc_start_transport_stream_op_batch,
2866 cc_start_transport_op,
2869 cc_set_pollset_or_pollset_set,
2870 cc_destroy_call_elem,
2871 sizeof(channel_data),
2872 cc_init_channel_elem,
2873 cc_destroy_channel_elem,
2874 cc_get_channel_info,
// Attaches a channelz node to this channel and propagates a ref of it to
// the resolving LB policy for channelz reporting.
// NOTE(review): assumes chand->resolving_lb_policy is non-null when this
// is called — confirm against callers.
2878 void grpc_client_channel_set_channelz_node(
2879 grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
2880 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2881 chand->channelz_node = node;
2882 chand->resolving_lb_policy->set_channelz_node(node->Ref());
// channelz support: collects this channel's child subchannel/channel refs
// from the resolving LB policy, if one exists.
2885 void grpc_client_channel_populate_child_refs(
2886 grpc_channel_element* elem,
2887 grpc_core::channelz::ChildRefsList* child_subchannels,
2888 grpc_core::channelz::ChildRefsList* child_channels) {
2889 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2890 if (chand->resolving_lb_policy != nullptr) {
2891 chand->resolving_lb_policy->FillChildRefsForChannelz(child_subchannels,
// Combiner closure: kicks the LB policy out of IDLE so the channel starts
// connecting, then releases the stack ref taken by the scheduler
// (grpc_client_channel_check_connectivity_state).
2896 static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
2897 channel_data* chand = static_cast<channel_data*>(arg);
2898 chand->resolving_lb_policy->ExitIdleLocked();
2899 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
// Returns the channel's current connectivity state.  If the channel is
// IDLE and try_to_connect is set, schedules try_to_connect_locked on the
// channel combiner (holding a channel-stack ref that the closure
// releases).
2902 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
2903 grpc_channel_element* elem, int try_to_connect) {
2904 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2905 grpc_connectivity_state out =
2906 grpc_connectivity_state_check(&chand->state_tracker);
2907 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
2908 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
2910 GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
2911 grpc_combiner_scheduler(chand->combiner)),
// One externally-requested connectivity-state watch.  Nodes form a
// singly-linked list headed at
// chand->external_connectivity_watcher_list_head, guarded by
// chand->external_connectivity_watcher_list_mu.
2917 typedef struct external_connectivity_watcher {
2918 channel_data* chand;
2919 grpc_polling_entity pollent;
2920 grpc_closure* on_complete;
2921 grpc_closure* watcher_timer_init;
// Target state pointer; nullptr signals a cancellation request (see
// watch_connectivity_state_locked).
2922 grpc_connectivity_state* state;
2923 grpc_closure my_closure;
2924 struct external_connectivity_watcher* next;
2925 } external_connectivity_watcher;
// Finds the watcher registered with the given on_complete closure, or
// nullptr if none.  on_complete acts as the watcher's identity key.
// Takes and releases the watcher-list mutex internally.
2927 static external_connectivity_watcher* lookup_external_connectivity_watcher(
2928 channel_data* chand, grpc_closure* on_complete) {
2929 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
2930 external_connectivity_watcher* w =
2931 chand->external_connectivity_watcher_list_head;
2932 while (w != nullptr && w->on_complete != on_complete) {
2935 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// Pushes a watcher onto the front of the channel's watcher list.
// Asserts the watcher is not already registered (same on_complete) and
// that its next pointer is unset.
2939 static void external_connectivity_watcher_list_append(
2940 channel_data* chand, external_connectivity_watcher* w) {
2941 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
2943 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
2944 GPR_ASSERT(!w->next);
2945 w->next = chand->external_connectivity_watcher_list_head;
2946 chand->external_connectivity_watcher_list_head = w;
2947 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
// Unlinks a watcher from the channel's watcher list under the list mutex.
// The watcher must be present (asserted via lookup); handles both the
// head-node and interior-node cases, and is unreachable if not found.
2950 static void external_connectivity_watcher_list_remove(
2951 channel_data* chand, external_connectivity_watcher* to_remove) {
2953 lookup_external_connectivity_watcher(chand, to_remove->on_complete));
2954 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
2955 if (to_remove == chand->external_connectivity_watcher_list_head) {
2956 chand->external_connectivity_watcher_list_head = to_remove->next;
2957 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
2960 external_connectivity_watcher* w =
2961 chand->external_connectivity_watcher_list_head;
2962 while (w != nullptr) {
2963 if (w->next == to_remove) {
// Splice to_remove out of the list.
2964 w->next = w->next->next;
2965 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
2970 GPR_UNREACHABLE_CODE(return );
// Returns the number of currently-registered external connectivity
// watchers by walking the list under the list mutex.  Used by tests.
2973 int grpc_client_channel_num_external_connectivity_watchers(
2974 grpc_channel_element* elem) {
2975 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2978 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
2979 external_connectivity_watcher* w =
2980 chand->external_connectivity_watcher_list_head;
2981 while (w != nullptr) {
2985 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// Combiner closure invoked when a watched connectivity-state change
// fires: tears down the watcher (pollset, stack ref, list entry) and then
// forwards the result to the caller's on_complete closure.
2990 static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
2991 external_connectivity_watcher* w =
2992 static_cast<external_connectivity_watcher*>(arg);
// Save on_complete before tearing the watcher down.
2993 grpc_closure* follow_up = w->on_complete;
2994 grpc_polling_entity_del_from_pollset_set(&w->pollent,
2995 w->chand->interested_parties);
// Balances the ref taken in grpc_client_channel_watch_connectivity_state.
2996 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
2997 "external_connectivity_watcher");
2998 external_connectivity_watcher_list_remove(w->chand, w);
3000 GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
// Combiner closure that either registers a new external connectivity
// watch (w->state != nullptr) or cancels an existing one
// (w->state == nullptr, matched by on_complete identity).
3003 static void watch_connectivity_state_locked(void* arg,
3004 grpc_error* error_ignored) {
3005 external_connectivity_watcher* w =
3006 static_cast<external_connectivity_watcher*>(arg);
3007 external_connectivity_watcher* found = nullptr;
3008 if (w->state != nullptr) {
3009 external_connectivity_watcher_list_append(w->chand, w);
3010 // An assumption is being made that the closure is scheduled on the exec ctx
3011 // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
3012 GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
3013 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
3014 grpc_combiner_scheduler(w->chand->combiner));
3015 grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
3016 w->state, &w->my_closure);
// Cancellation path: a cancel request must not carry a timer-init
// closure.
3018 GPR_ASSERT(w->watcher_timer_init == nullptr);
3019 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
3021 GPR_ASSERT(found->on_complete == w->on_complete);
// Passing nullptr for the state pointer cancels the pending notify.
3022 grpc_connectivity_state_notify_on_state_change(
3023 &found->chand->state_tracker, nullptr, &found->my_closure);
3025 grpc_polling_entity_del_from_pollset_set(&w->pollent,
3026 w->chand->interested_parties);
3027 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
3028 "external_connectivity_watcher");
// Public API: registers (or, with state == nullptr, cancels) an external
// connectivity-state watch.  Allocates a zero-initialized watcher, adds
// the caller's pollent to the channel's interested_parties, takes a
// channel-stack ref (released when the watch completes or is cancelled),
// and hops into the channel combiner via watch_connectivity_state_locked.
3033 void grpc_client_channel_watch_connectivity_state(
3034 grpc_channel_element* elem, grpc_polling_entity pollent,
3035 grpc_connectivity_state* state, grpc_closure* closure,
3036 grpc_closure* watcher_timer_init) {
3037 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3038 external_connectivity_watcher* w =
3039 static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
3041 w->pollent = pollent;
3042 w->on_complete = closure;
3044 w->watcher_timer_init = watcher_timer_init;
3045 grpc_polling_entity_add_to_pollset_set(&w->pollent,
3046 chand->interested_parties);
3047 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
3048 "external_connectivity_watcher");
3050 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
3051 grpc_combiner_scheduler(chand->combiner)),
3055 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
3056 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3057 call_data* calld = static_cast<call_data*>(elem->call_data);
3058 return calld->subchannel_call;