3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
38 #include "src/core/ext/filters/client_channel/request_routing.h"
39 #include "src/core/ext/filters/client_channel/resolver_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
41 #include "src/core/ext/filters/client_channel/retry_throttle.h"
42 #include "src/core/ext/filters/client_channel/subchannel.h"
43 #include "src/core/ext/filters/deadline/deadline_filter.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/connected_channel.h"
47 #include "src/core/lib/channel/status_util.h"
48 #include "src/core/lib/gpr/string.h"
49 #include "src/core/lib/gprpp/inlined_vector.h"
50 #include "src/core/lib/gprpp/manual_constructor.h"
51 #include "src/core/lib/iomgr/combiner.h"
52 #include "src/core/lib/iomgr/iomgr.h"
53 #include "src/core/lib/iomgr/polling_entity.h"
54 #include "src/core/lib/profiling/timers.h"
55 #include "src/core/lib/slice/slice_internal.h"
56 #include "src/core/lib/slice/slice_string_helpers.h"
57 #include "src/core/lib/surface/channel.h"
58 #include "src/core/lib/transport/connectivity_state.h"
59 #include "src/core/lib/transport/error_utils.h"
60 #include "src/core/lib/transport/metadata.h"
61 #include "src/core/lib/transport/metadata_batch.h"
62 #include "src/core/lib/transport/service_config.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 #include "src/core/lib/transport/status_metadata.h"
66 using grpc_core::internal::ClientChannelMethodParams;
67 using grpc_core::internal::ClientChannelMethodParamsTable;
68 using grpc_core::internal::ProcessedResolverResult;
69 using grpc_core::internal::ServerRetryThrottleData;
71 /* Client channel implementation */
73 // By default, we buffer 256 KiB per RPC for retries.
74 // TODO(roth): Do we have any data to suggest a better value?
75 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
77 // This value was picked arbitrarily. It can be changed if there is
78 // any even moderately compelling reason to do so.
79 #define RETRY_BACKOFF_JITTER 0.2
81 grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
83 /*************************************************************************
84 * CHANNEL-WIDE FUNCTIONS
87 struct external_connectivity_watcher;
89 typedef struct client_channel_channel_data {
90 grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router;
92 bool deadline_checking_enabled;
94 size_t per_rpc_retry_buffer_size;
96 /** combiner protecting all variables below in this data structure */
97 grpc_combiner* combiner;
98 /** retry throttle data */
99 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
100 /** maps method names to method_parameters structs */
101 grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
103 grpc_channel_stack* owning_stack;
104 /** interested parties (owned) */
105 grpc_pollset_set* interested_parties;
107 /* external_connectivity_watcher_list head is guarded by its own mutex, since
108 * counts need to be grabbed immediately without polling on a cq */
109 gpr_mu external_connectivity_watcher_list_mu;
110 struct external_connectivity_watcher* external_connectivity_watcher_list_head;
112 /* the following properties are guarded by a mutex since APIs require them
113 to be instantaneously available */
115 grpc_core::UniquePtr<char> info_lb_policy_name;
116 /** service config in JSON form */
117 grpc_core::UniquePtr<char> info_service_config_json;
120 // Synchronous callback from chand->request_router to process a resolver
122 static bool process_resolver_result_locked(void* arg,
123 const grpc_channel_args& args,
124 const char** lb_policy_name,
125 grpc_json** lb_policy_config) {
126 channel_data* chand = static_cast<channel_data*>(arg);
127 ProcessedResolverResult resolver_result(args, chand->enable_retries);
128 grpc_core::UniquePtr<char> service_config_json =
129 resolver_result.service_config_json();
130 if (grpc_client_channel_trace.enabled()) {
131 gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
132 chand, service_config_json.get());
134 // Update channel state.
135 chand->retry_throttle_data = resolver_result.retry_throttle_data();
136 chand->method_params_table = resolver_result.method_params_table();
137 // Swap out the data used by cc_get_channel_info().
138 gpr_mu_lock(&chand->info_mu);
139 chand->info_lb_policy_name = resolver_result.lb_policy_name();
140 const bool service_config_changed =
141 ((service_config_json == nullptr) !=
142 (chand->info_service_config_json == nullptr)) ||
143 (service_config_json != nullptr &&
144 strcmp(service_config_json.get(),
145 chand->info_service_config_json.get()) != 0);
146 chand->info_service_config_json = std::move(service_config_json);
147 gpr_mu_unlock(&chand->info_mu);
149 *lb_policy_name = chand->info_lb_policy_name.get();
150 *lb_policy_config = resolver_result.lb_policy_config();
151 return service_config_changed;
154 static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
155 grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
156 grpc_channel_element* elem =
157 static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
158 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
160 if (op->on_connectivity_state_change != nullptr) {
161 chand->request_router->NotifyOnConnectivityStateChange(
162 op->connectivity_state, op->on_connectivity_state_change);
163 op->on_connectivity_state_change = nullptr;
164 op->connectivity_state = nullptr;
167 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
168 if (chand->request_router->lb_policy() == nullptr) {
170 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
171 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
172 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
174 grpc_error* error = GRPC_ERROR_NONE;
175 grpc_core::LoadBalancingPolicy::PickState pick_state;
176 // Pick must return synchronously, because pick_state.on_complete is null.
178 chand->request_router->lb_policy()->PickLocked(&pick_state, &error));
179 if (pick_state.connected_subchannel != nullptr) {
180 pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
181 op->send_ping.on_ack);
183 if (error == GRPC_ERROR_NONE) {
184 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
185 "LB policy dropped call on ping");
187 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
188 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
190 op->bind_pollset = nullptr;
192 op->send_ping.on_initiate = nullptr;
193 op->send_ping.on_ack = nullptr;
196 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
197 chand->request_router->ShutdownLocked(op->disconnect_with_error);
200 if (op->reset_connect_backoff) {
201 chand->request_router->ResetConnectionBackoffLocked();
204 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
205 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
208 static void cc_start_transport_op(grpc_channel_element* elem,
209 grpc_transport_op* op) {
210 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
212 GPR_ASSERT(op->set_accept_stream == false);
213 if (op->bind_pollset != nullptr) {
214 grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
217 op->handler_private.extra_arg = elem;
218 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
220 GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
221 op, grpc_combiner_scheduler(chand->combiner)),
225 static void cc_get_channel_info(grpc_channel_element* elem,
226 const grpc_channel_info* info) {
227 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
228 gpr_mu_lock(&chand->info_mu);
229 if (info->lb_policy_name != nullptr) {
230 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
232 if (info->service_config_json != nullptr) {
233 *info->service_config_json =
234 gpr_strdup(chand->info_service_config_json.get());
236 gpr_mu_unlock(&chand->info_mu);
239 /* Constructor for channel_data */
240 static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
241 grpc_channel_element_args* args) {
242 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
243 GPR_ASSERT(args->is_last);
244 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
245 // Initialize data members.
246 chand->combiner = grpc_combiner_create();
247 gpr_mu_init(&chand->info_mu);
248 gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
250 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
251 chand->external_connectivity_watcher_list_head = nullptr;
252 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
254 chand->owning_stack = args->channel_stack;
255 chand->deadline_checking_enabled =
256 grpc_deadline_checking_enabled(args->channel_args);
257 chand->interested_parties = grpc_pollset_set_create();
258 grpc_client_channel_start_backup_polling(chand->interested_parties);
259 // Record max per-RPC retry buffer size.
260 const grpc_arg* arg = grpc_channel_args_find(
261 args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
262 chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
263 arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
264 // Record enable_retries.
265 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
266 chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
267 // Record client channel factory.
268 arg = grpc_channel_args_find(args->channel_args,
269 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
270 if (arg == nullptr) {
271 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
272 "Missing client channel factory in args for client channel filter");
274 if (arg->type != GRPC_ARG_POINTER) {
275 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
276 "client channel factory arg must be a pointer");
278 grpc_client_channel_factory* client_channel_factory =
279 static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
280 // Get server name to resolve, using proxy mapper if needed.
281 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
282 if (arg == nullptr) {
283 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
284 "Missing server uri in args for client channel filter");
286 if (arg->type != GRPC_ARG_STRING) {
287 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
288 "server uri arg must be a string");
290 char* proxy_name = nullptr;
291 grpc_channel_args* new_args = nullptr;
292 grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
293 &proxy_name, &new_args);
294 // Instantiate request router.
295 grpc_client_channel_factory_ref(client_channel_factory);
296 grpc_error* error = GRPC_ERROR_NONE;
297 chand->request_router.Init(
298 chand->owning_stack, chand->combiner, client_channel_factory,
299 chand->interested_parties, &grpc_client_channel_trace,
300 process_resolver_result_locked, chand,
301 proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */,
302 new_args != nullptr ? new_args : args->channel_args, &error);
303 gpr_free(proxy_name);
304 grpc_channel_args_destroy(new_args);
308 /* Destructor for channel_data */
309 static void cc_destroy_channel_elem(grpc_channel_element* elem) {
310 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
311 chand->request_router.Destroy();
312 // TODO(roth): Once we convert the filter API to C++, there will no
313 // longer be any need to explicitly reset these smart pointer data members.
314 chand->info_lb_policy_name.reset();
315 chand->info_service_config_json.reset();
316 chand->retry_throttle_data.reset();
317 chand->method_params_table.reset();
318 grpc_client_channel_stop_backup_polling(chand->interested_parties);
319 grpc_pollset_set_destroy(chand->interested_parties);
320 GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
321 gpr_mu_destroy(&chand->info_mu);
322 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6
341 // In order to support retries, we act as a proxy for stream op batches.
342 // When we get a batch from the surface, we add it to our list of pending
343 // batches, and we then use those batches to construct separate "child"
344 // batches to be started on the subchannel call. When the child batches
345 // return, we then decide which pending batches have been completed and
346 // schedule their callbacks accordingly. If a subchannel call fails and
347 // we want to retry it, we do a new pick and start again, constructing
348 // new "child" batches for the new subchannel call.
350 // Note that retries are committed when receiving data from the server
351 // (except for Trailers-Only responses). However, there may be many
352 // send ops started before receiving any data, so we may have already
353 // completed some number of send ops (and returned the completions up to
354 // the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
359 // There are two sets of data to maintain:
360 // - In call_data (in the parent channel), we maintain a list of pending
361 // ops and cached data for send ops.
362 // - In the subchannel call, we maintain state to indicate what ops have
363 // already been sent down to that call.
365 // When constructing the "child" batches, we compare those two sets of
366 // data to see which batches need to be sent to the subchannel call.
368 // TODO(roth): In subsequent PRs:
369 // - add support for transparent retries (including initial metadata)
370 // - figure out how to record stats in census for retries
371 // (census filter is on top of this one)
372 // - add census stats for retries
378 // State used for starting a retryable batch on a subchannel call.
379 // This provides its own grpc_transport_stream_op_batch and other data
380 // structures needed to populate the ops in the batch.
381 // We allocate one struct on the arena for each attempt at starting a
382 // batch on a given subchannel call.
383 struct subchannel_batch_data {
384 subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
385 bool set_on_complete);
386 // All dtor code must be added in `destroy`. This is because we may
387 // call closures in `subchannel_batch_data` after they are unrefed by
388 // `batch_data_unref`, and msan would complain about accessing this class
389 // after calling dtor. As a result we cannot call the `dtor` in
390 // `batch_data_unref`.
391 // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
392 ~subchannel_batch_data() { destroy(); }
396 grpc_call_element* elem;
397 grpc_subchannel_call* subchannel_call; // Holds a ref.
398 // The batch to use in the subchannel call.
399 // Its payload field points to subchannel_call_retry_state.batch_payload.
400 grpc_transport_stream_op_batch batch;
401 // For intercepting on_complete.
402 grpc_closure on_complete;
405 // Retry state associated with a subchannel call.
406 // Stored in the parent_data of the subchannel call object.
407 struct subchannel_call_retry_state {
408 explicit subchannel_call_retry_state(grpc_call_context_element* context)
409 : batch_payload(context),
410 started_send_initial_metadata(false),
411 completed_send_initial_metadata(false),
412 started_send_trailing_metadata(false),
413 completed_send_trailing_metadata(false),
414 started_recv_initial_metadata(false),
415 completed_recv_initial_metadata(false),
416 started_recv_trailing_metadata(false),
417 completed_recv_trailing_metadata(false),
418 retry_dispatched(false) {}
420 // subchannel_batch_data.batch.payload points to this.
421 grpc_transport_stream_op_batch_payload batch_payload;
422 // For send_initial_metadata.
423 // Note that we need to make a copy of the initial metadata for each
424 // subchannel call instead of just referring to the copy in call_data,
425 // because filters in the subchannel stack will probably add entries,
426 // so we need to start in a pristine state for each attempt of the call.
427 grpc_linked_mdelem* send_initial_metadata_storage;
428 grpc_metadata_batch send_initial_metadata;
430 grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
432 // For send_trailing_metadata.
433 grpc_linked_mdelem* send_trailing_metadata_storage;
434 grpc_metadata_batch send_trailing_metadata;
435 // For intercepting recv_initial_metadata.
436 grpc_metadata_batch recv_initial_metadata;
437 grpc_closure recv_initial_metadata_ready;
438 bool trailing_metadata_available = false;
439 // For intercepting recv_message.
440 grpc_closure recv_message_ready;
441 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
442 // For intercepting recv_trailing_metadata.
443 grpc_metadata_batch recv_trailing_metadata;
444 grpc_transport_stream_stats collect_stats;
445 grpc_closure recv_trailing_metadata_ready;
446 // These fields indicate which ops have been started and completed on
447 // this subchannel call.
448 size_t started_send_message_count = 0;
449 size_t completed_send_message_count = 0;
450 size_t started_recv_message_count = 0;
451 size_t completed_recv_message_count = 0;
452 bool started_send_initial_metadata : 1;
453 bool completed_send_initial_metadata : 1;
454 bool started_send_trailing_metadata : 1;
455 bool completed_send_trailing_metadata : 1;
456 bool started_recv_initial_metadata : 1;
457 bool completed_recv_initial_metadata : 1;
458 bool started_recv_trailing_metadata : 1;
459 bool completed_recv_trailing_metadata : 1;
460 // State for callback processing.
461 subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
462 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
463 subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
464 grpc_error* recv_message_error = GRPC_ERROR_NONE;
465 subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
466 // NOTE: Do not move this next to the metadata bitfields above. That would
467 // save space but will also result in a data race because compiler will
468 // generate a 2 byte store which overwrites the meta-data fields upon
469 // setting this field.
470 bool retry_dispatched : 1;
473 // Pending batches stored in call data.
474 struct pending_batch {
475 // The pending batch. If nullptr, this slot is empty.
476 grpc_transport_stream_op_batch* batch;
477 // Indicates whether payload for send ops has been cached in call data.
478 bool send_ops_cached;
481 /** Call data. Holds a pointer to grpc_subchannel_call and the
482 associated machinery to create such a pointer.
483 Handles queueing of stream ops until a call object is ready, waiting
484 for initial metadata before trying to create a call object,
485 and handling cancellation gracefully. */
487 call_data(grpc_call_element* elem, const channel_data& chand,
488 const grpc_call_element_args& args)
489 : deadline_state(elem, args.call_stack, args.call_combiner,
490 GPR_LIKELY(chand.deadline_checking_enabled)
492 : GRPC_MILLIS_INF_FUTURE),
493 path(grpc_slice_ref_internal(args.path)),
494 call_start_time(args.start_time),
495 deadline(args.deadline),
497 owning_call(args.call_stack),
498 call_combiner(args.call_combiner),
499 pending_send_initial_metadata(false),
500 pending_send_message(false),
501 pending_send_trailing_metadata(false),
502 enable_retries(chand.enable_retries),
503 retry_committed(false),
504 last_attempt_got_server_pushback(false) {}
507 if (GPR_LIKELY(subchannel_call != nullptr)) {
508 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call,
509 "client_channel_destroy_call");
511 grpc_slice_unref_internal(path);
512 GRPC_ERROR_UNREF(cancel_error);
513 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
514 GPR_ASSERT(pending_batches[i].batch == nullptr);
521 // State for handling deadlines.
522 // The code in deadline_filter.c requires this to be the first field.
523 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
524 // and this struct both independently store pointers to the call stack
525 // and call combiner. If/when we have time, find a way to avoid this
526 // without breaking the grpc_deadline_state abstraction.
527 grpc_deadline_state deadline_state;
529 grpc_slice path; // Request path.
530 gpr_timespec call_start_time;
531 grpc_millis deadline;
533 grpc_call_stack* owning_call;
534 grpc_call_combiner* call_combiner;
536 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
537 grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
539 grpc_subchannel_call* subchannel_call = nullptr;
541 // Set when we get a cancel_stream op.
542 grpc_error* cancel_error = GRPC_ERROR_NONE;
544 grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request;
545 bool have_request = false;
546 grpc_closure pick_closure;
548 grpc_polling_entity* pollent = nullptr;
550 // Batches are added to this list when received from above.
551 // They are removed when we are done handling the batch (i.e., when
552 // either we have invoked all of the batch's callbacks or we have
553 // passed the batch down to the subchannel call and are not
554 // intercepting any of its callbacks).
555 pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
556 bool pending_send_initial_metadata : 1;
557 bool pending_send_message : 1;
558 bool pending_send_trailing_metadata : 1;
561 bool enable_retries : 1;
562 bool retry_committed : 1;
563 bool last_attempt_got_server_pushback : 1;
564 int num_attempts_completed = 0;
565 size_t bytes_buffered_for_retry = 0;
566 grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
567 grpc_timer retry_timer;
569 // The number of pending retriable subchannel batches containing send ops.
570 // We hold a ref to the call stack while this is non-zero, since replay
571 // batches may not complete until after all callbacks have been returned
572 // to the surface, and we need to make sure that the call is not destroyed
573 // until all of these batches have completed.
574 // Note that we actually only need to track replay batches, but it's
575 // easier to track all batches with send ops.
576 int num_pending_retriable_subchannel_send_batches = 0;
578 // Cached data for retrying send ops.
579 // send_initial_metadata
580 bool seen_send_initial_metadata = false;
581 grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
582 grpc_metadata_batch send_initial_metadata;
583 uint32_t send_initial_metadata_flags;
584 gpr_atm* peer_string;
586 // When we get a send_message op, we replace the original byte stream
587 // with a CachingByteStream that caches the slices to a local buffer for
589 // Note: We inline the cache for the first 3 send_message ops and use
590 // dynamic allocation after that. This number was essentially picked
591 // at random; it could be changed in the future to tune performance.
592 grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
593 // send_trailing_metadata
594 bool seen_send_trailing_metadata = false;
595 grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
596 grpc_metadata_batch send_trailing_metadata;
601 // Forward declarations.
602 static void retry_commit(grpc_call_element* elem,
603 subchannel_call_retry_state* retry_state);
604 static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
605 static void on_complete(void* arg, grpc_error* error);
606 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
607 static void start_pick_locked(void* arg, grpc_error* ignored);
610 // send op data caching
613 // Caches data for send ops so that it can be retried later, if not
615 static void maybe_cache_send_ops_for_batch(call_data* calld,
616 pending_batch* pending) {
617 if (pending->send_ops_cached) return;
618 pending->send_ops_cached = true;
619 grpc_transport_stream_op_batch* batch = pending->batch;
620 // Save a copy of metadata for send_initial_metadata ops.
621 if (batch->send_initial_metadata) {
622 calld->seen_send_initial_metadata = true;
623 GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
624 grpc_metadata_batch* send_initial_metadata =
625 batch->payload->send_initial_metadata.send_initial_metadata;
626 calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
628 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
629 grpc_metadata_batch_copy(send_initial_metadata,
630 &calld->send_initial_metadata,
631 calld->send_initial_metadata_storage);
632 calld->send_initial_metadata_flags =
633 batch->payload->send_initial_metadata.send_initial_metadata_flags;
634 calld->peer_string = batch->payload->send_initial_metadata.peer_string;
636 // Set up cache for send_message ops.
637 if (batch->send_message) {
638 grpc_core::ByteStreamCache* cache =
639 static_cast<grpc_core::ByteStreamCache*>(
640 gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
641 new (cache) grpc_core::ByteStreamCache(
642 std::move(batch->payload->send_message.send_message));
643 calld->send_messages.push_back(cache);
645 // Save metadata batch for send_trailing_metadata ops.
646 if (batch->send_trailing_metadata) {
647 calld->seen_send_trailing_metadata = true;
648 GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
649 grpc_metadata_batch* send_trailing_metadata =
650 batch->payload->send_trailing_metadata.send_trailing_metadata;
651 calld->send_trailing_metadata_storage =
652 (grpc_linked_mdelem*)gpr_arena_alloc(
654 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
655 grpc_metadata_batch_copy(send_trailing_metadata,
656 &calld->send_trailing_metadata,
657 calld->send_trailing_metadata_storage);
661 // Frees cached send_initial_metadata.
662 static void free_cached_send_initial_metadata(channel_data* chand,
664 if (grpc_client_channel_trace.enabled()) {
666 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
669 grpc_metadata_batch_destroy(&calld->send_initial_metadata);
672 // Frees cached send_message at index idx.
673 static void free_cached_send_message(channel_data* chand, call_data* calld,
675 if (grpc_client_channel_trace.enabled()) {
677 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
680 calld->send_messages[idx]->Destroy();
683 // Frees cached send_trailing_metadata.
684 static void free_cached_send_trailing_metadata(channel_data* chand,
686 if (grpc_client_channel_trace.enabled()) {
688 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
691 grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
694 // Frees cached send ops that have already been completed after
695 // committing the call.
696 static void free_cached_send_op_data_after_commit(
697 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
698 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
699 call_data* calld = static_cast<call_data*>(elem->call_data);
700 if (retry_state->completed_send_initial_metadata) {
701 free_cached_send_initial_metadata(chand, calld);
703 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
704 free_cached_send_message(chand, calld, i);
706 if (retry_state->completed_send_trailing_metadata) {
707 free_cached_send_trailing_metadata(chand, calld);
711 // Frees cached send ops that were completed by the completed batch in
712 // batch_data. Used when batches are completed after the call is committed.
713 static void free_cached_send_op_data_for_completed_batch(
714 grpc_call_element* elem, subchannel_batch_data* batch_data,
715 subchannel_call_retry_state* retry_state) {
716 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
717 call_data* calld = static_cast<call_data*>(elem->call_data);
718 if (batch_data->batch.send_initial_metadata) {
719 free_cached_send_initial_metadata(chand, calld);
721 if (batch_data->batch.send_message) {
722 free_cached_send_message(chand, calld,
723 retry_state->completed_send_message_count - 1);
725 if (batch_data->batch.send_trailing_metadata) {
726 free_cached_send_trailing_metadata(chand, calld);
731 // pending_batches management
734 // Returns the index into calld->pending_batches to be used for batch.
735 static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
736 // Note: It is important the send_initial_metadata be the first entry
737 // here, since the code in pick_subchannel_locked() assumes it will be.
738 if (batch->send_initial_metadata) return 0;
739 if (batch->send_message) return 1;
740 if (batch->send_trailing_metadata) return 2;
741 if (batch->recv_initial_metadata) return 3;
742 if (batch->recv_message) return 4;
743 if (batch->recv_trailing_metadata) return 5;
744 GPR_UNREACHABLE_CODE(return (size_t)-1);
// Stores |batch| in calld->pending_batches at the slot chosen by
// get_batch_index() (the slot must be empty; asserted below).  When retries
// are enabled, also records which send ops are now pending and adds the
// bytes needed to buffer them for replay; if the total crosses
// chand->per_rpc_retry_buffer_size, the call is committed (retry_commit)
// and, if no attempt has completed yet, retries are disabled outright so
// the call skips all further retry overhead.
// NOTE(review): this chunk is a lossy dump — some gpr_log() argument lines
// and closing braces from the upstream file are absent here.
747 // This is called via the call combiner, so access to calld is synchronized.
748 static void pending_batches_add(grpc_call_element* elem,
749 grpc_transport_stream_op_batch* batch) {
750 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
751 call_data* calld = static_cast<call_data*>(elem->call_data);
752 const size_t idx = get_batch_index(batch);
753 if (grpc_client_channel_trace.enabled()) {
755 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
758 pending_batch* pending = &calld->pending_batches[idx];
// A non-null slot here would mean two concurrent batches with the same op
// set, which the call combiner is supposed to prevent.
759 GPR_ASSERT(pending->batch == nullptr);
760 pending->batch = batch;
761 pending->send_ops_cached = false;
762 if (calld->enable_retries) {
763 // Update state in calld about pending batches.
764 // Also check if the batch takes us over the retry buffer limit.
765 // Note: We don't check the size of trailing metadata here, because
766 // gRPC clients do not send trailing metadata.
767 if (batch->send_initial_metadata) {
768 calld->pending_send_initial_metadata = true;
769 calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
770 batch->payload->send_initial_metadata.send_initial_metadata);
772 if (batch->send_message) {
773 calld->pending_send_message = true;
774 calld->bytes_buffered_for_retry +=
775 batch->payload->send_message.send_message->length();
777 if (batch->send_trailing_metadata) {
778 calld->pending_send_trailing_metadata = true;
// Buffer limit exceeded: give up on retrying this call.
780 if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
781 chand->per_rpc_retry_buffer_size)) {
782 if (grpc_client_channel_trace.enabled()) {
784 "chand=%p calld=%p: exceeded retry buffer size, committing",
// retry_state lives in the connected subchannel call's parent data, so it
// only exists once a subchannel call has been created.
787 subchannel_call_retry_state* retry_state =
788 calld->subchannel_call == nullptr
790 : static_cast<subchannel_call_retry_state*>(
791 grpc_connected_subchannel_call_get_parent_data(
792 calld->subchannel_call));
793 retry_commit(elem, retry_state);
794 // If we are not going to retry and have not yet started, pretend
795 // retries are disabled so that we don't bother with retry overhead.
796 if (calld->num_attempts_completed == 0) {
797 if (grpc_client_channel_trace.enabled()) {
799 "chand=%p calld=%p: disabling retries before first attempt",
802 calld->enable_retries = false;
// Empties a pending_batch slot.  With retries enabled, also clears the
// per-op "pending send" flags that pending_batches_add() set for this
// batch, so retry bookkeeping stays in sync with the slot contents.
808 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
809 if (calld->enable_retries) {
810 if (pending->batch->send_initial_metadata) {
811 calld->pending_send_initial_metadata = false;
813 if (pending->batch->send_message) {
814 calld->pending_send_message = false;
816 if (pending->batch->send_trailing_metadata) {
817 calld->pending_send_trailing_metadata = false;
// Mark the slot free for reuse by a future batch with the same op set.
820 pending->batch = nullptr;
// Closure callback used by pending_batches_fail(): fails one pending batch
// with a ref to |error|.  The batch pointer arrives as |arg| and the
// call_data was stashed in batch->handler_private.extra_arg by the caller.
823 // This is called via the call combiner, so access to calld is synchronized.
824 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
825 grpc_transport_stream_op_batch* batch =
826 static_cast<grpc_transport_stream_op_batch*>(arg);
827 call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
828 // Note: This will release the call combiner.
829 grpc_transport_stream_op_batch_finish_with_failure(
830 batch, GRPC_ERROR_REF(error), calld->call_combiner);
// Fails every batch currently queued in calld->pending_batches with |error|
// (each via fail_pending_batch_in_call_combiner), clearing each slot as it
// goes.  Takes ownership of |error| (unreffed at the end).  The closures
// are batched through a CallCombinerClosureList; whether the call combiner
// is yielded afterwards is controlled by |yield_call_combiner|.
834 static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
837 bool yield_call_combiner) {
838 GPR_ASSERT(error != GRPC_ERROR_NONE);
839 call_data* calld = static_cast<call_data*>(elem->call_data);
// Trace-only pass: count the non-empty slots so the log line is accurate.
840 if (grpc_client_channel_trace.enabled()) {
841 size_t num_batches = 0;
842 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
843 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
846 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
847 elem->channel_data, calld, num_batches, grpc_error_string(error));
849 grpc_core::CallCombinerClosureList closures;
850 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
851 pending_batch* pending = &calld->pending_batches[i];
852 grpc_transport_stream_op_batch* batch = pending->batch;
853 if (batch != nullptr) {
// Stash calld where fail_pending_batch_in_call_combiner() expects it.
854 batch->handler_private.extra_arg = calld;
855 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
856 fail_pending_batch_in_call_combiner, batch,
857 grpc_schedule_on_exec_ctx);
858 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
859 "pending_batches_fail");
860 pending_batch_clear(calld, pending);
863 if (yield_call_combiner) {
// RunClosures() yields the call combiner after scheduling.
864 closures.RunClosures(calld->call_combiner);
866 closures.RunClosuresWithoutYielding(calld->call_combiner);
868 GRPC_ERROR_UNREF(error);
// Closure callback used by pending_batches_resume(): sends one pending
// batch down to the subchannel call that was stashed in
// batch->handler_private.extra_arg.
871 // This is called via the call combiner, so access to calld is synchronized.
872 static void resume_pending_batch_in_call_combiner(void* arg,
873 grpc_error* ignored) {
874 grpc_transport_stream_op_batch* batch =
875 static_cast<grpc_transport_stream_op_batch*>(arg);
876 grpc_subchannel_call* subchannel_call =
877 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
878 // Note: This will release the call combiner.
879 grpc_subchannel_call_process_op(subchannel_call, batch);
// Starts all queued pending batches on the subchannel call.  With retries
// enabled this delegates to start_retriable_subchannel_batches() (which
// handles caching/replay); otherwise each pending batch is forwarded as-is
// via resume_pending_batch_in_call_combiner and its slot is cleared.
882 // This is called via the call combiner, so access to calld is synchronized.
883 static void pending_batches_resume(grpc_call_element* elem) {
884 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
885 call_data* calld = static_cast<call_data*>(elem->call_data);
886 if (calld->enable_retries) {
887 start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
890 // Retries not enabled; send down batches as-is.
// Trace-only pass: count the non-empty slots for the log line.
891 if (grpc_client_channel_trace.enabled()) {
892 size_t num_batches = 0;
893 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
894 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
897 "chand=%p calld=%p: starting %" PRIuPTR
898 " pending batches on subchannel_call=%p",
899 chand, calld, num_batches, calld->subchannel_call);
901 grpc_core::CallCombinerClosureList closures;
902 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
903 pending_batch* pending = &calld->pending_batches[i];
904 grpc_transport_stream_op_batch* batch = pending->batch;
905 if (batch != nullptr) {
// Stash the subchannel call where the resume closure expects it.
906 batch->handler_private.extra_arg = calld->subchannel_call;
907 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
908 resume_pending_batch_in_call_combiner, batch,
909 grpc_schedule_on_exec_ctx);
910 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
911 "pending_batches_resume");
912 pending_batch_clear(calld, pending);
915 // Note: This will release the call combiner.
916 closures.RunClosures(calld->call_combiner);
// Clears |pending| once every callback the batch carried (on_complete plus
// any recv_*_ready callbacks) has been scheduled — each is reset to nullptr
// by the code that schedules it, so "all nullptr" means fully dispatched.
919 static void maybe_clear_pending_batch(grpc_call_element* elem,
920 pending_batch* pending) {
921 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
922 call_data* calld = static_cast<call_data*>(elem->call_data);
923 grpc_transport_stream_op_batch* batch = pending->batch;
924 // We clear the pending batch if all of its callbacks have been
925 // scheduled and reset to nullptr.
926 if (batch->on_complete == nullptr &&
927 (!batch->recv_initial_metadata ||
928 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
930 (!batch->recv_message ||
931 batch->payload->recv_message.recv_message_ready == nullptr) &&
932 (!batch->recv_trailing_metadata ||
933 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
935 if (grpc_client_channel_trace.enabled()) {
936 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
939 pending_batch_clear(calld, pending);
// Linear scan over the fixed-size pending_batches array; |log_message|
// is only used for the trace log when a match is found.
// NOTE(review): the upstream return statements for this function fall in
// lines missing from this dump.
943 // Returns a pointer to the first pending batch for which predicate(batch)
944 // returns true, or null if not found.
945 template <typename Predicate>
946 static pending_batch* pending_batch_find(grpc_call_element* elem,
947 const char* log_message,
948 Predicate predicate) {
949 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
950 call_data* calld = static_cast<call_data*>(elem->call_data);
951 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
952 pending_batch* pending = &calld->pending_batches[i];
953 grpc_transport_stream_op_batch* batch = pending->batch;
954 if (batch != nullptr && predicate(batch)) {
955 if (grpc_client_channel_trace.enabled()) {
957 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
958 calld, log_message, i);
// Marks the call committed (idempotent: early-returns if already set) and,
// when a subchannel call exists, releases the cached send-op data that was
// held only for potential replay.  |retry_state| may be null if no
// subchannel call has been created yet.
970 // Commits the call so that no further retry attempts will be performed.
971 static void retry_commit(grpc_call_element* elem,
972 subchannel_call_retry_state* retry_state) {
973 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
974 call_data* calld = static_cast<call_data*>(elem->call_data);
975 if (calld->retry_committed) return;
976 calld->retry_committed = true;
977 if (grpc_client_channel_trace.enabled()) {
978 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
980 if (retry_state != nullptr) {
981 free_cached_send_op_data_after_commit(elem, retry_state);
// Tears down the current attempt (unrefs the subchannel call and destroys
// any in-flight pick request), computes the next attempt time — either
// now + server push-back, or exponential back-off from the method's retry
// policy — and arms retry_timer to re-run start_pick_locked on the channel
// combiner at that time.  A server push-back of >= 0 ms overrides back-off
// and forces the back-off state to be re-initialized on the next attempt.
985 // Starts a retry after appropriate back-off.
986 static void do_retry(grpc_call_element* elem,
987 subchannel_call_retry_state* retry_state,
988 grpc_millis server_pushback_ms) {
989 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
990 call_data* calld = static_cast<call_data*>(elem->call_data);
991 GPR_ASSERT(calld->method_params != nullptr);
992 const ClientChannelMethodParams::RetryPolicy* retry_policy =
993 calld->method_params->retry_policy();
994 GPR_ASSERT(retry_policy != nullptr);
995 // Reset subchannel call and connected subchannel.
996 if (calld->subchannel_call != nullptr) {
997 GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
998 "client_channel_call_retry");
999 calld->subchannel_call = nullptr;
1001 if (calld->have_request) {
1002 calld->have_request = false;
1003 calld->request.Destroy();
1005 // Compute backoff delay.
1006 grpc_millis next_attempt_time;
1007 if (server_pushback_ms >= 0) {
// Server dictated the delay; remember this so back-off restarts fresh.
1008 next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1009 calld->last_attempt_got_server_pushback = true;
// (Re)initialize back-off on the first retry or after a push-back retry.
1011 if (calld->num_attempts_completed == 1 ||
1012 calld->last_attempt_got_server_pushback) {
1013 calld->retry_backoff.Init(
1014 grpc_core::BackOff::Options()
1015 .set_initial_backoff(retry_policy->initial_backoff)
1016 .set_multiplier(retry_policy->backoff_multiplier)
1017 .set_jitter(RETRY_BACKOFF_JITTER)
1018 .set_max_backoff(retry_policy->max_backoff));
1019 calld->last_attempt_got_server_pushback = false;
1021 next_attempt_time = calld->retry_backoff->NextAttemptTime();
1023 if (grpc_client_channel_trace.enabled()) {
1025 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
1026 calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1028 // Schedule retry after computed delay.
1029 GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1030 grpc_combiner_scheduler(chand->combiner));
1031 grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1032 // Update bookkeeping.
1033 if (retry_state != nullptr) retry_state->retry_dispatched = true;
// Decides whether this attempt's final status warrants a retry, walking
// the gates in order: retry policy present -> retry not already dispatched
// -> status not OK -> status retryable -> not throttled -> not committed
// -> attempts remaining -> not cancelled from the surface -> server
// push-back (if present) parseable.  On success calls do_retry().
// Throttle accounting deliberately happens after the retryable-status
// check and before the remaining gates (see upstream comment below).
// NOTE(review): several early-return lines of the upstream function are
// absent from this dump.
1036 // Returns true if the call is being retried.
1037 static bool maybe_retry(grpc_call_element* elem,
1038 subchannel_batch_data* batch_data,
1039 grpc_status_code status,
1040 grpc_mdelem* server_pushback_md) {
1041 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1042 call_data* calld = static_cast<call_data*>(elem->call_data);
1043 // Get retry policy.
1044 if (calld->method_params == nullptr) return false;
1045 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1046 calld->method_params->retry_policy();
1047 if (retry_policy == nullptr) return false;
1048 // If we've already dispatched a retry from this call, return true.
1049 // This catches the case where the batch has multiple callbacks
1050 // (i.e., it includes either recv_message or recv_initial_metadata).
1051 subchannel_call_retry_state* retry_state = nullptr;
1052 if (batch_data != nullptr) {
1053 retry_state = static_cast<subchannel_call_retry_state*>(
1054 grpc_connected_subchannel_call_get_parent_data(
1055 batch_data->subchannel_call));
1056 if (retry_state->retry_dispatched) {
1057 if (grpc_client_channel_trace.enabled()) {
1058 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
// OK status: record success with the throttle and do not retry.
1065 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
1066 if (calld->retry_throttle_data != nullptr) {
1067 calld->retry_throttle_data->RecordSuccess();
1069 if (grpc_client_channel_trace.enabled()) {
1070 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
1074 // Status is not OK. Check whether the status is retryable.
1075 if (!retry_policy->retryable_status_codes.Contains(status)) {
1076 if (grpc_client_channel_trace.enabled()) {
1078 "chand=%p calld=%p: status %s not configured as retryable", chand,
1079 calld, grpc_status_code_to_string(status));
1083 // Record the failure and check whether retries are throttled.
1084 // Note that it's important for this check to come after the status
1085 // code check above, since we should only record failures whose statuses
1086 // match the configured retryable status codes, so that we don't count
1087 // things like failures due to malformed requests (INVALID_ARGUMENT).
1088 // Conversely, it's important for this to come before the remaining
1089 // checks, so that we don't fail to record failures due to other factors.
1090 if (calld->retry_throttle_data != nullptr &&
1091 !calld->retry_throttle_data->RecordFailure()) {
1092 if (grpc_client_channel_trace.enabled()) {
1093 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
1097 // Check whether the call is committed.
1098 if (calld->retry_committed) {
1099 if (grpc_client_channel_trace.enabled()) {
1100 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
1105 // Check whether we have retries remaining.
1106 ++calld->num_attempts_completed;
1107 if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1108 if (grpc_client_channel_trace.enabled()) {
1109 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
1110 calld, retry_policy->max_attempts);
1114 // If the call was cancelled from the surface, don't retry.
1115 if (calld->cancel_error != GRPC_ERROR_NONE) {
1116 if (grpc_client_channel_trace.enabled()) {
1118 "chand=%p calld=%p: call cancelled from surface, not retrying",
1123 // Check server push-back.
// -1 means "no push-back": do_retry() will use exponential back-off.
1124 grpc_millis server_pushback_ms = -1;
1125 if (server_pushback_md != nullptr) {
1126 // If the value is "-1" or any other unparseable string, we do not retry.
1128 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1129 if (grpc_client_channel_trace.enabled()) {
1131 "chand=%p calld=%p: not retrying due to server push-back",
1136 if (grpc_client_channel_trace.enabled()) {
1137 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1140 server_pushback_ms = (grpc_millis)ms;
1143 do_retry(elem, retry_state, server_pushback_ms);
1148 // subchannel_batch_data
// Constructor: takes a ref on the subchannel call and on the owning call
// stack for the lifetime of this batch data, points the batch payload at
// the per-attempt retry_state's payload storage, and (optionally) wires
// the batch's on_complete to the file-scope ::on_complete handler.
// |refcount| seeds the manual refcount consumed via batch_data_unref().
1153 subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
1154 call_data* calld, int refcount,
1155 bool set_on_complete)
1157 subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call,
1158 "batch_data_create")) {
1159 subchannel_call_retry_state* retry_state =
1160 static_cast<subchannel_call_retry_state*>(
1161 grpc_connected_subchannel_call_get_parent_data(
1162 calld->subchannel_call));
1163 batch.payload = &retry_state->batch_payload;
1164 gpr_ref_init(&refs, refcount);
1165 if (set_on_complete) {
// ::on_complete disambiguates the free function from this member field.
1166 GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
1167 grpc_schedule_on_exec_ctx);
1168 batch.on_complete = &on_complete;
1170 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
// Destroys the retry_state metadata batches this batch data used (only the
// ones its ops actually touched), then drops the refs the constructor took
// on the subchannel call and the owning call stack.  Called from
// batch_data_unref() when the refcount hits zero.
1173 void subchannel_batch_data::destroy() {
1174 subchannel_call_retry_state* retry_state =
1175 static_cast<subchannel_call_retry_state*>(
1176 grpc_connected_subchannel_call_get_parent_data(subchannel_call));
1177 if (batch.send_initial_metadata) {
1178 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
1180 if (batch.send_trailing_metadata) {
1181 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
1183 if (batch.recv_initial_metadata) {
1184 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
1186 if (batch.recv_trailing_metadata) {
1187 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
1189 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref");
1190 call_data* calld = static_cast<call_data*>(elem->call_data);
1191 GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
// Placement-new on the call's arena: storage is reclaimed with the arena,
// so destroy() (via batch_data_unref) runs cleanup but no free().
1196 // Creates a subchannel_batch_data object on the call's arena with the
1197 // specified refcount. If set_on_complete is true, the batch's
1198 // on_complete callback will be set to point to on_complete();
1199 // otherwise, the batch's on_complete callback will be null.
1200 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
1202 bool set_on_complete) {
1203 call_data* calld = static_cast<call_data*>(elem->call_data);
1204 subchannel_batch_data* batch_data =
1205 new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
1206 subchannel_batch_data(elem, calld, refcount, set_on_complete);
// Drops one ref; runs destroy() when the last ref is released (gpr_unref
// returns nonzero on the final unref).  Arena-backed, so no deallocation.
1210 static void batch_data_unref(subchannel_batch_data* batch_data) {
1211 if (gpr_unref(&batch_data->refs)) {
1212 batch_data->destroy();
1217 // recv_initial_metadata callback handling
// Finds the surface batch waiting on recv_initial_metadata, moves the
// metadata received by this attempt from retry_state into that batch's
// payload, clears the callback pointer (so maybe_clear_pending_batch can
// free the slot), unrefs batch_data, and finally runs the surface
// recv_initial_metadata_ready closure with a ref to |error|.
1220 // Invokes recv_initial_metadata_ready for a subchannel batch.
1221 static void invoke_recv_initial_metadata_callback(void* arg,
1222 grpc_error* error) {
1223 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1224 // Find pending batch.
1225 pending_batch* pending = pending_batch_find(
1226 batch_data->elem, "invoking recv_initial_metadata_ready for",
1227 [](grpc_transport_stream_op_batch* batch) {
1228 return batch->recv_initial_metadata &&
1229 batch->payload->recv_initial_metadata
1230 .recv_initial_metadata_ready != nullptr;
1232 GPR_ASSERT(pending != nullptr);
1234 subchannel_call_retry_state* retry_state =
1235 static_cast<subchannel_call_retry_state*>(
1236 grpc_connected_subchannel_call_get_parent_data(
1237 batch_data->subchannel_call));
1238 grpc_metadata_batch_move(
1239 &retry_state->recv_initial_metadata,
1240 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1241 // Update bookkeeping.
1242 // Note: Need to do this before invoking the callback, since invoking
1243 // the callback will result in yielding the call combiner.
1244 grpc_closure* recv_initial_metadata_ready =
1245 pending->batch->payload->recv_initial_metadata
1246 .recv_initial_metadata_ready;
1247 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1249 maybe_clear_pending_batch(batch_data->elem, pending);
1250 batch_data_unref(batch_data);
1252 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
// Retry interceptor for recv_initial_metadata_ready.  Three outcomes:
// (1) a retry was already dispatched -> drop the result and stop the call
// combiner; (2) error or Trailers-Only response before
// recv_trailing_metadata_ready has fired -> defer the surface callback
// (stashing batch_data and a ref to the error in retry_state) until the
// retry decision can be made on trailing metadata, starting an internal
// recv_trailing_metadata op if the application hasn't; (3) valid initial
// metadata -> commit the call and propagate the result to the surface.
1255 // Intercepts recv_initial_metadata_ready callback for retries.
1256 // Commits the call and returns the initial metadata up the stack.
1257 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1258 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1259 grpc_call_element* elem = batch_data->elem;
1260 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1261 call_data* calld = static_cast<call_data*>(elem->call_data);
1262 if (grpc_client_channel_trace.enabled()) {
1264 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1265 chand, calld, grpc_error_string(error));
1267 subchannel_call_retry_state* retry_state =
1268 static_cast<subchannel_call_retry_state*>(
1269 grpc_connected_subchannel_call_get_parent_data(
1270 batch_data->subchannel_call));
1271 retry_state->completed_recv_initial_metadata = true;
1272 // If a retry was already dispatched, then we're not going to use the
1273 // result of this recv_initial_metadata op, so do nothing.
1274 if (retry_state->retry_dispatched) {
1275 GRPC_CALL_COMBINER_STOP(
1276 calld->call_combiner,
1277 "recv_initial_metadata_ready after retry dispatched");
1280 // If we got an error or a Trailers-Only response and have not yet gotten
1281 // the recv_trailing_metadata_ready callback, then defer propagating this
1282 // callback back to the surface. We can evaluate whether to retry when
1283 // recv_trailing_metadata comes back.
1284 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
1285 error != GRPC_ERROR_NONE) &&
1286 !retry_state->completed_recv_trailing_metadata)) {
1287 if (grpc_client_channel_trace.enabled()) {
1289 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
// The deferred batch_data ref and error ref are consumed later by
// add_closures_for_deferred_recv_callbacks() or released on retry.
1293 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
1294 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1295 if (!retry_state->started_recv_trailing_metadata) {
1296 // recv_trailing_metadata not yet started by application; start it
1297 // ourselves to get status.
1298 start_internal_recv_trailing_metadata(elem);
1300 GRPC_CALL_COMBINER_STOP(
1301 calld->call_combiner,
1302 "recv_initial_metadata_ready trailers-only or error");
1306 // Received valid initial metadata, so commit the call.
1307 retry_commit(elem, retry_state);
1308 // Invoke the callback to return the result to the surface.
1309 // Manually invoking a callback function; it does not take ownership of error.
1310 invoke_recv_initial_metadata_callback(batch_data, error);
1314 // recv_message callback handling
// Finds the surface batch waiting on recv_message, moves the message
// received by this attempt from retry_state into that batch's payload,
// clears the callback pointer (so maybe_clear_pending_batch can free the
// slot), unrefs batch_data, and runs the surface recv_message_ready
// closure with a ref to |error|.
1317 // Invokes recv_message_ready for a subchannel batch.
1318 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1319 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1321 pending_batch* pending = pending_batch_find(
1322 batch_data->elem, "invoking recv_message_ready for",
1323 [](grpc_transport_stream_op_batch* batch) {
1324 return batch->recv_message &&
1325 batch->payload->recv_message.recv_message_ready != nullptr;
1327 GPR_ASSERT(pending != nullptr);
1329 subchannel_call_retry_state* retry_state =
1330 static_cast<subchannel_call_retry_state*>(
1331 grpc_connected_subchannel_call_get_parent_data(
1332 batch_data->subchannel_call));
1333 *pending->batch->payload->recv_message.recv_message =
1334 std::move(retry_state->recv_message);
1335 // Update bookkeeping.
1336 // Note: Need to do this before invoking the callback, since invoking
1337 // the callback will result in yielding the call combiner.
1338 grpc_closure* recv_message_ready =
1339 pending->batch->payload->recv_message.recv_message_ready;
1340 pending->batch->payload->recv_message.recv_message_ready = nullptr;
1341 maybe_clear_pending_batch(batch_data->elem, pending);
1342 batch_data_unref(batch_data);
1344 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
// Retry interceptor for recv_message_ready; mirrors
// recv_initial_metadata_ready: (1) retry already dispatched -> drop the
// result; (2) error or null message before recv_trailing_metadata_ready
// -> defer the surface callback in retry_state until the retry decision,
// starting an internal recv_trailing_metadata op if needed; (3) valid
// message -> commit the call and deliver it to the surface.
1347 // Intercepts recv_message_ready callback for retries.
1348 // Commits the call and returns the message up the stack.
1349 static void recv_message_ready(void* arg, grpc_error* error) {
1350 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1351 grpc_call_element* elem = batch_data->elem;
1352 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1353 call_data* calld = static_cast<call_data*>(elem->call_data);
1354 if (grpc_client_channel_trace.enabled()) {
1355 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1356 chand, calld, grpc_error_string(error));
1358 subchannel_call_retry_state* retry_state =
1359 static_cast<subchannel_call_retry_state*>(
1360 grpc_connected_subchannel_call_get_parent_data(
1361 batch_data->subchannel_call));
1362 ++retry_state->completed_recv_message_count;
1363 // If a retry was already dispatched, then we're not going to use the
1364 // result of this recv_message op, so do nothing.
1365 if (retry_state->retry_dispatched) {
1366 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1367 "recv_message_ready after retry dispatched");
1370 // If we got an error or the payload was nullptr and we have not yet gotten
1371 // the recv_trailing_metadata_ready callback, then defer propagating this
1372 // callback back to the surface. We can evaluate whether to retry when
1373 // recv_trailing_metadata comes back.
1375 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1376 !retry_state->completed_recv_trailing_metadata)) {
1377 if (grpc_client_channel_trace.enabled()) {
1379 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1380 "message and recv_trailing_metadata pending)",
// Deferred refs consumed later by add_closures_for_deferred_recv_callbacks()
// or released in recv_trailing_metadata_ready on retry.
1383 retry_state->recv_message_ready_deferred_batch = batch_data;
1384 retry_state->recv_message_error = GRPC_ERROR_REF(error);
1385 if (!retry_state->started_recv_trailing_metadata) {
1386 // recv_trailing_metadata not yet started by application; start it
1387 // ourselves to get status.
1388 start_internal_recv_trailing_metadata(elem);
1390 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1394 // Received a valid message, so commit the call.
1395 retry_commit(elem, retry_state);
1396 // Invoke the callback to return the result to the surface.
1397 // Manually invoking a callback function; it does not take ownership of error.
1398 invoke_recv_message_callback(batch_data, error);
1402 // recv_trailing_metadata handling
// Extracts the final call status: from |error| (via grpc_error_get_status,
// honoring the call deadline) when an error is present, otherwise from the
// grpc-status entry in |md_batch| (asserted present).  If the batch also
// carries grpc-retry-pushback-ms, exposes that mdelem through
// *server_pushback_md.  Takes ownership of |error| (unreffed at the end).
1405 // Sets *status and *server_pushback_md based on md_batch and error.
1406 // Only sets *server_pushback_md if server_pushback_md != nullptr.
1407 static void get_call_status(grpc_call_element* elem,
1408 grpc_metadata_batch* md_batch, grpc_error* error,
1409 grpc_status_code* status,
1410 grpc_mdelem** server_pushback_md) {
1411 call_data* calld = static_cast<call_data*>(elem->call_data);
1412 if (error != GRPC_ERROR_NONE) {
1413 grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1416 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1418 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1419 if (server_pushback_md != nullptr &&
1420 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1421 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1424 GRPC_ERROR_UNREF(error);
// If a surface batch is waiting on recv_trailing_metadata, moves this
// attempt's trailing metadata into its payload and queues its
// recv_trailing_metadata_ready closure (taking ownership of |error|) in
// |closures|; otherwise (the op was generated internally by
// start_internal_recv_trailing_metadata) just unrefs |error|.
1428 static void add_closure_for_recv_trailing_metadata_ready(
1429 grpc_call_element* elem, subchannel_batch_data* batch_data,
1430 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1431 // Find pending batch.
1432 pending_batch* pending = pending_batch_find(
1433 elem, "invoking recv_trailing_metadata for",
1434 [](grpc_transport_stream_op_batch* batch) {
1435 return batch->recv_trailing_metadata &&
1436 batch->payload->recv_trailing_metadata
1437 .recv_trailing_metadata_ready != nullptr;
1439 // If we generated the recv_trailing_metadata op internally via
1440 // start_internal_recv_trailing_metadata(), then there will be no
1442 if (pending == nullptr) {
1443 GRPC_ERROR_UNREF(error);
1447 subchannel_call_retry_state* retry_state =
1448 static_cast<subchannel_call_retry_state*>(
1449 grpc_connected_subchannel_call_get_parent_data(
1450 batch_data->subchannel_call));
1451 grpc_metadata_batch_move(
1452 &retry_state->recv_trailing_metadata,
1453 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
1455 closures->Add(pending->batch->payload->recv_trailing_metadata
1456 .recv_trailing_metadata_ready,
1457 error, "recv_trailing_metadata_ready for pending batch");
1458 // Update bookkeeping.
// Clear the callback pointer so maybe_clear_pending_batch can free the slot.
1459 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1461 maybe_clear_pending_batch(elem, pending);
// When this batch carried recv_trailing_metadata, flushes any deferred
// recv_initial_metadata_ready / recv_message_ready callbacks (stashed by
// the interceptors above when they saw an error or Trailers-Only/null
// result) into |closures|, handing each its previously-reffed error, and
// clears the deferred-batch pointers so they are not flushed twice.
1466 static void add_closures_for_deferred_recv_callbacks(
1467 subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1468 grpc_core::CallCombinerClosureList* closures) {
1469 if (batch_data->batch.recv_trailing_metadata) {
1470 // Add closure for deferred recv_initial_metadata_ready.
1471 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1473 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1474 invoke_recv_initial_metadata_callback,
1475 retry_state->recv_initial_metadata_ready_deferred_batch,
1476 grpc_schedule_on_exec_ctx);
1477 closures->Add(&retry_state->recv_initial_metadata_ready,
1478 retry_state->recv_initial_metadata_error,
1479 "resuming recv_initial_metadata_ready");
1480 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1482 // Add closure for deferred recv_message_ready.
1483 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1485 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1486 invoke_recv_message_callback,
1487 retry_state->recv_message_ready_deferred_batch,
1488 grpc_schedule_on_exec_ctx);
1489 closures->Add(&retry_state->recv_message_ready,
1490 retry_state->recv_message_error,
1491 "resuming recv_message_ready");
1492 retry_state->recv_message_ready_deferred_batch = nullptr;
// A batch counts as unstarted when it has an on_complete callback still
// outstanding and at least one of its send ops has not been started on the
// current attempt (per retry_state's started_* bookkeeping).  For
// send_message, "started" is tracked as a count against the number of
// cached messages in calld->send_messages.
1497 // Returns true if any op in the batch was not yet started.
1498 // Only looks at send ops, since recv ops are always started immediately.
1499 static bool pending_batch_is_unstarted(
1500 pending_batch* pending, call_data* calld,
1501 subchannel_call_retry_state* retry_state) {
1502 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1505 if (pending->batch->send_initial_metadata &&
1506 !retry_state->started_send_initial_metadata) {
1509 if (pending->batch->send_message &&
1510 retry_state->started_send_message_count < calld->send_messages.size()) {
1513 if (pending->batch->send_trailing_metadata &&
1514 !retry_state->started_send_trailing_metadata) {
// Walks the pending_batches array and, for each batch that still has
// unstarted send ops (per pending_batch_is_unstarted), queues its
// on_complete in |closures| with a ref to |error|, clears the callback
// pointer, and lets maybe_clear_pending_batch free the slot.  Takes
// ownership of |error| (unreffed at the end).
1520 // For any pending batch containing an op that has not yet been started,
1521 // adds the pending batch's completion closures to closures.
1522 static void add_closures_to_fail_unstarted_pending_batches(
1523 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1524 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1525 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1526 call_data* calld = static_cast<call_data*>(elem->call_data);
1527 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1528 pending_batch* pending = &calld->pending_batches[i];
1529 if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1530 if (grpc_client_channel_trace.enabled()) {
1532 "chand=%p calld=%p: failing unstarted pending batch at index "
1536 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1537 "failing on_complete for pending batch");
1538 pending->batch->on_complete = nullptr;
1539 maybe_clear_pending_batch(elem, pending);
1542 GRPC_ERROR_UNREF(error);
// Final dispatch for a completed (non-retried) attempt: gathers the
// recv_trailing_metadata_ready closure, any deferred recv callbacks, and
// failure closures for never-started pending batches into one
// CallCombinerClosureList, drops this function's batch_data ref, then runs
// them all (which also yields the call combiner).  Takes ownership of
// |error| (each helper gets its own ref; unreffed at the end).
1545 // Runs necessary closures upon completion of a call attempt.
1546 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1547 grpc_error* error) {
1548 grpc_call_element* elem = batch_data->elem;
1549 call_data* calld = static_cast<call_data*>(elem->call_data);
1550 subchannel_call_retry_state* retry_state =
1551 static_cast<subchannel_call_retry_state*>(
1552 grpc_connected_subchannel_call_get_parent_data(
1553 batch_data->subchannel_call));
1554 // Construct list of closures to execute.
1555 grpc_core::CallCombinerClosureList closures;
1556 // First, add closure for recv_trailing_metadata_ready.
1557 add_closure_for_recv_trailing_metadata_ready(
1558 elem, batch_data, GRPC_ERROR_REF(error), &closures);
1559 // If there are deferred recv_initial_metadata_ready or recv_message_ready
1560 // callbacks, add them to closures.
1561 add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1562 // Add closures to fail any pending batches that have not yet been started.
1563 add_closures_to_fail_unstarted_pending_batches(
1564 elem, retry_state, GRPC_ERROR_REF(error), &closures);
1565 // Don't need batch_data anymore.
1566 batch_data_unref(batch_data);
1567 // Schedule all of the closures identified above.
1568 // Note: This will release the call combiner.
1569 closures.RunClosures(calld->call_combiner);
1570 GRPC_ERROR_UNREF(error);
// Retry interceptor for recv_trailing_metadata_ready: extracts the final
// status and any server push-back from the trailing metadata, then asks
// maybe_retry().  If retrying, releases the extra batch_data refs (and
// deferred-error refs) held for any deferred recv callbacks — their
// results are being discarded — plus this callback's own ref.  If not
// retrying, commits the call and dispatches all completion closures via
// run_closures_for_completed_call().
1573 // Intercepts recv_trailing_metadata_ready callback for retries.
1574 // Commits the call and returns the trailing metadata up the stack.
1575 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1576 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1577 grpc_call_element* elem = batch_data->elem;
1578 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1579 call_data* calld = static_cast<call_data*>(elem->call_data);
1580 if (grpc_client_channel_trace.enabled()) {
1582 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1583 chand, calld, grpc_error_string(error));
1585 subchannel_call_retry_state* retry_state =
1586 static_cast<subchannel_call_retry_state*>(
1587 grpc_connected_subchannel_call_get_parent_data(
1588 batch_data->subchannel_call));
1589 retry_state->completed_recv_trailing_metadata = true;
1590 // Get the call's status and check for server pushback metadata.
1591 grpc_status_code status = GRPC_STATUS_OK;
1592 grpc_mdelem* server_pushback_md = nullptr;
1593 grpc_metadata_batch* md_batch =
1594 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
// get_call_status() takes ownership of the error ref passed here.
1595 get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1596 &server_pushback_md);
1597 if (grpc_client_channel_trace.enabled()) {
1598 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1599 calld, grpc_status_code_to_string(status));
1601 // Check if we should retry.
1602 if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1603 // Unref batch_data for deferred recv_initial_metadata_ready or
1604 // recv_message_ready callbacks, if any.
1605 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1606 batch_data_unref(batch_data);
1607 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
1609 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
1610 batch_data_unref(batch_data);
1611 GRPC_ERROR_UNREF(retry_state->recv_message_error);
// This unref matches the ref held for this recv_trailing_metadata callback.
1613 batch_data_unref(batch_data);
1616 // Not retrying, so commit the call.
1617 retry_commit(elem, retry_state);
1618 // Run any necessary closures.
1619 run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
// on_complete callback handling
//

// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.  Takes ownership of `error` (either passing
// it to the closure or unreffing it if no pending batch matches).
static void add_closure_for_completed_pending_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, grpc_error* error,
    grpc_core::CallCombinerClosureList* closures) {
  pending_batch* pending = pending_batch_find(
      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // subchannel batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_data->batch.send_initial_metadata ==
                   batch->send_initial_metadata &&
               batch_data->batch.send_message == batch->send_message &&
               batch_data->batch.send_trailing_metadata ==
                   batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.  Clear on_complete so the batch is not completed twice.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  maybe_clear_pending_batch(elem, pending);
}
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
// start_retriable_subchannel_batches().
static void add_closures_for_replay_or_pending_send_ops(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state,
    grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // First check the cached send ops already seen by this call.
  bool have_pending_send_message_ops =
      retry_state->started_send_message_count < calld->send_messages.size();
  bool have_pending_send_trailing_metadata_op =
      calld->seen_send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata;
  // If nothing cached needs replaying, scan the pending batches for send
  // ops that have not yet been cached (and therefore not yet started).
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      pending_batch* pending = &calld->pending_batches[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              chand, calld);
    }
    // Reuse batch_data's handler_private closure slot; batch_data is not
    // needed for anything else once we reach this point.
    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
                      start_retriable_subchannel_batches, elem,
                      grpc_schedule_on_exec_ctx);
    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
                  "starting next batch for send_* op(s)");
  }
}
// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
// `arg` is the subchannel_batch_data whose send ops just completed.
static void on_complete(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // Update bookkeeping in retry_state.
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed (no future attempt will need to replay them).
  if (calld->retry_committed) {
    free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
  }
  // Construct list of closures to execute.
  grpc_core::CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!retry_state->retry_dispatched) {
    // Add closure for the completed pending batch, if any.
    add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
                                            GRPC_ERROR_REF(error), &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the subchannel call.
    if (!retry_state->completed_recv_trailing_metadata) {
      add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
                                                  &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.  Must be computed before the unref below, since the
  // unref may free calld's storage indirectly.
  --calld->num_pending_retriable_subchannel_send_batches;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches == 0;
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
  }
}
1761 // subchannel batch construction
1764 // Helper function used to start a subchannel batch in the call combiner.
1765 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
1766 grpc_transport_stream_op_batch* batch =
1767 static_cast<grpc_transport_stream_op_batch*>(arg);
1768 grpc_subchannel_call* subchannel_call =
1769 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
1770 // Note: This will release the call combiner.
1771 grpc_subchannel_call_process_op(subchannel_call, batch);
// Adds a closure to closures that will execute batch in the call combiner.
// The batch's handler_private fields are used to carry the target
// subchannel call and the closure itself, so the batch must stay alive
// until the closure runs.
static void add_closure_for_subchannel_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
    grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  batch->handler_private.extra_arg = calld->subchannel_call;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                    start_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
            calld, batch_str);
    gpr_free(batch_str);
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
// Copies the cached initial metadata into retry_state so that each
// attempt sends a pristine copy, and adds the grpc-previous-rpc-attempts
// header on retry attempts.
static void add_retriable_send_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  // NOTE(review): only 4 slices are defined, which presumably matches the
  // maximum retry-attempt count enforced elsewhere — confirm against the
  // retry policy limits.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-retry-attempts header.  The storage is over-allocated by one
  // element when num_attempts_completed > 0 to hold that extra header.
  retry_state->send_initial_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            (calld->send_initial_metadata.list.count +
                             (calld->num_attempts_completed > 0))));
  grpc_metadata_batch_copy(&calld->send_initial_metadata,
                           &retry_state->send_initial_metadata,
                           retry_state->send_initial_metadata_storage);
  // Remove any stale grpc-previous-rpc-attempts header copied from the
  // original batch before (re-)adding the correct one below.
  if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
                       .grpc_previous_rpc_attempts != nullptr)) {
    grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
                               retry_state->send_initial_metadata.idx.named
                                   .grpc_previous_rpc_attempts);
  }
  if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
    grpc_mdelem retry_md = grpc_mdelem_create(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
    grpc_error* error = grpc_metadata_batch_add_tail(
        &retry_state->send_initial_metadata,
        &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
                                                        .list.count],
        retry_md);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_string(error));
      GPR_ASSERT(false);
    }
  }
  retry_state->started_send_initial_metadata = true;
  batch_data->batch.send_initial_metadata = true;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
      &retry_state->send_initial_metadata;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
      calld->send_initial_metadata_flags;
  batch_data->batch.payload->send_initial_metadata.peer_string =
      calld->peer_string;
}
// Adds retriable send_message op to batch_data.
// Pulls the next cached message (by started_send_message_count) so that
// a retry attempt replays messages in their original order.
static void add_retriable_send_message_op(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            chand, calld, retry_state->started_send_message_count);
  }
  grpc_core::ByteStreamCache* cache =
      calld->send_messages[retry_state->started_send_message_count];
  ++retry_state->started_send_message_count;
  // Each attempt gets its own caching stream over the shared cache.
  retry_state->send_message.Init(cache);
  batch_data->batch.send_message = true;
  batch_data->batch.payload->send_message.send_message.reset(
      retry_state->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.
static void add_retriable_send_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  retry_state->send_trailing_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            calld->send_trailing_metadata.list.count));
  grpc_metadata_batch_copy(&calld->send_trailing_metadata,
                           &retry_state->send_trailing_metadata,
                           retry_state->send_trailing_metadata_storage);
  retry_state->started_send_trailing_metadata = true;
  batch_data->batch.send_trailing_metadata = true;
  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
      &retry_state->send_trailing_metadata;
}
// Adds retriable recv_initial_metadata op to batch_data.
// The metadata is received into retry_state (not the surface batch) so
// that it can be replayed or discarded depending on the retry decision;
// recv_initial_metadata_ready intercepts the transport callback.
static void add_retriable_recv_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_initial_metadata = true;
  batch_data->batch.recv_initial_metadata = true;
  grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
      &retry_state->recv_initial_metadata;
  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
      &retry_state->trailing_metadata_available;
  GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                    recv_initial_metadata_ready, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &retry_state->recv_initial_metadata_ready;
}
// Adds retriable recv_message op to batch_data.
// As with recv_initial_metadata, the message lands in retry_state and
// recv_message_ready intercepts the transport callback.
static void add_retriable_recv_message_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  ++retry_state->started_recv_message_count;
  batch_data->batch.recv_message = true;
  batch_data->batch.payload->recv_message.recv_message =
      &retry_state->recv_message;
  GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
                    batch_data, grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_message.recv_message_ready =
      &retry_state->recv_message_ready;
}
// Adds retriable recv_trailing_metadata op to batch_data.
// recv_trailing_metadata_ready intercepts the transport callback and is
// where the retry decision is ultimately made.
static void add_retriable_recv_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_trailing_metadata = true;
  batch_data->batch.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
      &retry_state->recv_trailing_metadata;
  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
      &retry_state->collect_stats;
  GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
                    recv_trailing_metadata_ready, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_trailing_metadata
      .recv_trailing_metadata_ready =
      &retry_state->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch.  This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that we know the call is over but when the application
// has not yet started its own recv_trailing_metadata op.
static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the subchannel
  // batch returns, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  subchannel_batch_data* batch_data =
      batch_data_create(elem, 2, false /* set_on_complete */);
  add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
  // Remember the internal batch so that a later surface-initiated
  // recv_trailing_metadata op can reuse its result instead of re-starting.
  retry_state->recv_trailing_metadata_internal_batch = batch_data;
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
}
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops.  Otherwise, returns nullptr.
static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_batch_data* replay_batch_data = nullptr;
  // send_initial_metadata.
  // Replay only if the op was seen and completed on a prior attempt but
  // not yet started on this one, and is not in a pending (unstarted) batch.
  if (calld->seen_send_initial_metadata &&
      !retry_state->started_send_initial_metadata &&
      !calld->pending_send_initial_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              chand, calld);
    }
    replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
    add_retriable_send_initial_metadata_op(calld, retry_state,
                                           replay_batch_data);
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (retry_state->started_send_message_count < calld->send_messages.size() &&
      retry_state->started_send_message_count ==
          retry_state->completed_send_message_count &&
      !calld->pending_send_message) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              chand, calld);
    }
    // Reuse the batch created above, if any; all replayed send ops can go
    // in a single subchannel batch.
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          batch_data_create(elem, 1, true /* set_on_complete */);
    }
    add_retriable_send_message_op(elem, retry_state, replay_batch_data);
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld->seen_send_trailing_metadata &&
      retry_state->started_send_message_count == calld->send_messages.size() &&
      !retry_state->started_send_trailing_metadata &&
      !calld->pending_send_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              chand, calld);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          batch_data_create(elem, 1, true /* set_on_complete */);
    }
    add_retriable_send_trailing_metadata_op(calld, retry_state,
                                            replay_batch_data);
  }
  return replay_batch_data;
}
// Adds subchannel batches for pending batches to closures.
static void add_subchannel_batches_for_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_core::CallCombinerClosureList* closures) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             calld->send_messages.size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by start_internal_recv_trailing_metadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
                        nullptr))) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (calld->method_params == nullptr ||
        calld->method_params->retry_policy() == nullptr ||
        calld->retry_committed) {
      add_closure_for_subchannel_batch(elem, batch, closures);
      pending_batch_clear(calld, pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    subchannel_batch_data* batch_data = batch_data_create(
        elem, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed, so they can be replayed on a retry.
    maybe_cache_send_ops_for_batch(calld, pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      add_retriable_send_message_op(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      add_retriable_recv_message_op(calld, retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
    }
    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (calld->num_pending_retriable_subchannel_send_batches == 0) {
        GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
      }
      ++calld->num_pending_retriable_subchannel_send_batches;
    }
  }
}
// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.  Runs under the call combiner (and yields it at the end).
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  // Construct list of closures to execute, one for each pending batch.
  grpc_core::CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  subchannel_batch_data* replay_batch_data =
      maybe_create_subchannel_batch_for_replay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
                                     &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches;
  }
  // Now add pending batches.
  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call);
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner);
}
// Creates the subchannel call for the picked connected subchannel and
// resumes (or fails) the pending batches.  Takes ownership of `error`,
// which is the error (if any) from the pick.
static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Extra parent data is only needed when retries are enabled; it holds
  // the per-attempt subchannel_call_retry_state.
  const size_t parent_data_size =
      calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
  const grpc_core::ConnectedSubchannel::CallArgs call_args = {
      calld->pollent,                                   // pollent
      calld->path,                                      // path
      calld->call_start_time,                           // start_time
      calld->deadline,                                  // deadline
      calld->arena,                                     // arena
      calld->request->pick()->subchannel_call_context,  // context
      calld->call_combiner,                             // call_combiner
      parent_data_size                                  // parent_data_size
  };
  grpc_error* new_error =
      calld->request->pick()->connected_subchannel->CreateCall(
          call_args, &calld->subchannel_call);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, calld, calld->subchannel_call, grpc_error_string(new_error));
  }
  if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
    new_error = grpc_error_add_child(new_error, error);
    pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
  } else {
    if (parent_data_size > 0) {
      // Placement-new the retry state into the parent-data area that the
      // subchannel call allocated for us.
      new (grpc_connected_subchannel_call_get_parent_data(
          calld->subchannel_call))
          subchannel_call_retry_state(
              calld->request->pick()->subchannel_call_context);
    }
    pending_batches_resume(elem);
  }
  GRPC_ERROR_UNREF(error);
}
// Invoked when a pick is completed, on both success or failure.
static void pick_done(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) {
    // Failed to create subchannel.
    // If there was no error, this is an LB policy drop, in which case
    // we return an error; otherwise, we may retry.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                          nullptr);
    if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
        !maybe_retry(elem, nullptr /* batch_data */, status,
                     nullptr /* server_pushback_md */)) {
      grpc_error* new_error =
          error == GRPC_ERROR_NONE
              ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                    "Call dropped by load balancing policy")
              : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                    "Failed to create subchannel", &error, 1);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failed to create subchannel: error=%s",
                chand, calld, grpc_error_string(new_error));
      }
      pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
    }
  } else {
    /* Create call on subchannel. */
    create_subchannel_call(elem, GRPC_ERROR_REF(error));
  }
}
// If the channel is in TRANSIENT_FAILURE and the call is not
// wait_for_ready=true, fails the call and returns true.
// Reads the wait_for_ready flag from the first pending batch's
// send_initial_metadata payload.
static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
  if (chand->request_router->GetConnectivityState() ==
          GRPC_CHANNEL_TRANSIENT_FAILURE &&
      (batch->payload->send_initial_metadata.send_initial_metadata_flags &
       GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
    pending_batches_fail(
        elem,
        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                               "channel is in state TRANSIENT_FAILURE"),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        true /* yield_call_combiner */);
    return true;
  }
  return false;
}
// Applies service config to the call.  Must be invoked once we know
// that the resolver has returned results to the channel.
// Populates calld's retry throttle data and per-method params, and may
// tighten the call deadline or set the wait_for_ready flag.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data = chand->retry_throttle_data->Ref();
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &calld->pending_batches[0]
               .batch->payload->send_initial_metadata
               .send_initial_metadata_flags;
      if (GPR_UNLIKELY(
              calld->method_params->wait_for_ready() !=
                  ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
              !(*send_initial_metadata_flags &
                GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
        if (calld->method_params->wait_for_ready() ==
            ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}
2352 // Invoked once resolver results are available.
// Applies the service config on the first attempt only (retries keep the
// parameters captured on attempt 0).  Returns false to abort the pick when
// the call has already been failed due to channel TRANSIENT_FAILURE.
2353 static bool maybe_apply_service_config_to_call_locked(void* arg) {
2354 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2355 call_data* calld = static_cast<call_data*>(elem->call_data);
2356 // Only get service config data on the first attempt.
2357 if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
2358 apply_service_config_to_call_locked(elem);
2359 // Check this after applying service config, since it may have
2360 // affected the call's wait_for_ready value.
2361 if (fail_call_if_in_transient_failure(elem)) return false;
// Starts an LB pick for this call by building a routing request and handing
// it to the channel's RequestRouter.  Runs under the channel combiner.
2366 static void start_pick_locked(void* arg, grpc_error* ignored) {
2367 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2368 call_data* calld = static_cast<call_data*>(elem->call_data);
2369 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// A pick must not already be in flight, and no subchannel call may exist yet.
2370 GPR_ASSERT(!calld->have_request);
2371 GPR_ASSERT(calld->subchannel_call == nullptr);
2372 // Normally, we want to do this check until after we've processed the
2373 // service config, so that we can honor the wait_for_ready setting in
2374 // the service config. However, if the channel is in TRANSIENT_FAILURE
2375 // and we don't have an LB policy at this point, that means that the
2376 // resolver has returned a failure, so we're not going to get a service
2377 // config right away. In that case, we fail the call now based on the
2378 // wait_for_ready value passed in from the application.
2379 if (chand->request_router->lb_policy() == nullptr &&
2380 fail_call_if_in_transient_failure(elem)) {
2383 // If this is a retry, use the send_initial_metadata payload that
2384 // we've cached; otherwise, use the pending batch. The
2385 // send_initial_metadata batch will be the first pending batch in the
2386 // list, as set by get_batch_index() above.
2387 // TODO(roth): What if the LB policy needs to add something to the
2388 // call's initial metadata, and then there's a retry? We don't want
2389 // the new metadata to be added twice. We might need to somehow
2390 // allocate the subchannel batch earlier so that we can give the
2391 // subchannel's copy of the metadata batch (which is copied for each
2392 // attempt) to the LB policy instead the one from the parent channel.
2393 grpc_metadata_batch* initial_metadata =
2394 calld->seen_send_initial_metadata
2395 ? &calld->send_initial_metadata
2396 : calld->pending_batches[0]
2397 .batch->payload->send_initial_metadata.send_initial_metadata;
2398 uint32_t* initial_metadata_flags =
2399 calld->seen_send_initial_metadata
2400 ? &calld->send_initial_metadata_flags
2401 : &calld->pending_batches[0]
2402 .batch->payload->send_initial_metadata
2403 .send_initial_metadata_flags;
// pick_done is invoked (outside the combiner) once the router resolves the
// pick; maybe_apply_service_config_to_call_locked runs when resolver
// results become available, before the pick is attempted.
2404 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2405 grpc_schedule_on_exec_ctx);
2406 calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent,
2407 initial_metadata, initial_metadata_flags,
2408 maybe_apply_service_config_to_call_locked, elem,
2409 &calld->pick_closure);
2410 calld->have_request = true;
2411 chand->request_router->RouteCallLocked(calld->request.get());
2415 // filter call vtable functions
// Entry point for batches arriving from the surface API.  Handles deadline
// bookkeeping, prior cancellation, new cancellations, and either forwards
// the batch to an existing subchannel call or queues it as pending and
// kicks off a pick on the channel combiner.
2418 static void cc_start_transport_stream_op_batch(
2419 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2420 GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
2421 call_data* calld = static_cast<call_data*>(elem->call_data);
2422 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// Let the deadline filter see client-initiated batches first.
2423 if (GPR_LIKELY(chand->deadline_checking_enabled)) {
2424 grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
2426 // If we've previously been cancelled, immediately fail any new batches.
2427 if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
2428 if (grpc_client_channel_trace.enabled()) {
2429 gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
2430 chand, calld, grpc_error_string(calld->cancel_error));
2432 // Note: This will release the call combiner.
2433 grpc_transport_stream_op_batch_finish_with_failure(
2434 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
2437 // Handle cancellation.
2438 if (GPR_UNLIKELY(batch->cancel_stream)) {
2439 // Stash a copy of cancel_error in our call data, so that we can use
2440 // it for subsequent operations. This ensures that if the call is
2441 // cancelled before any batches are passed down (e.g., if the deadline
2442 // is in the past when the call starts), we can return the right
2443 // error to the caller when the first batch does get passed down.
2444 GRPC_ERROR_UNREF(calld->cancel_error);
2445 calld->cancel_error =
2446 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2447 if (grpc_client_channel_trace.enabled()) {
2448 gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
2449 calld, grpc_error_string(calld->cancel_error));
2451 // If we do not have a subchannel call (i.e., a pick has not yet
2452 // been started), fail all pending batches. Otherwise, send the
2453 // cancellation down to the subchannel call.
2454 if (calld->subchannel_call == nullptr) {
2455 pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
2456 false /* yield_call_combiner */);
2457 // Note: This will release the call combiner.
2458 grpc_transport_stream_op_batch_finish_with_failure(
2459 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
2461 // Note: This will release the call combiner.
2462 grpc_subchannel_call_process_op(calld->subchannel_call, batch);
2466 // Add the batch to the pending list.
2467 pending_batches_add(elem, batch);
2468 // Check if we've already gotten a subchannel call.
2469 // Note that once we have completed the pick, we do not need to enter
2470 // the channel combiner, which is more efficient (especially for
2471 // streaming calls).
2472 if (calld->subchannel_call != nullptr) {
2473 if (grpc_client_channel_trace.enabled()) {
2475 "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
2476 calld, calld->subchannel_call);
// Resume the just-queued batch directly on the subchannel call.
2478 pending_batches_resume(elem);
2481 // We do not yet have a subchannel call.
2482 // For batches containing a send_initial_metadata op, enter the channel
2483 // combiner to start a pick.
2484 if (GPR_LIKELY(batch->send_initial_metadata)) {
2485 if (grpc_client_channel_trace.enabled()) {
2486 gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
// start_pick_locked must run under the channel combiner.
2490 GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
2491 elem, grpc_combiner_scheduler(chand->combiner)),
2494 // For all other batches, release the call combiner.
2495 if (grpc_client_channel_trace.enabled()) {
2497 "chand=%p calld=%p: saved batch, yielding call combiner", chand,
// Batch stays pending until send_initial_metadata triggers a pick.
2500 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
2501 "batch does not include send_initial_metadata");
2505 /* Constructor for call_data */
// Constructs the call_data in place (placement new) using the element's
// preallocated storage.  Never fails.
2506 static grpc_error* cc_init_call_elem(grpc_call_element* elem,
2507 const grpc_call_element_args* args) {
2508 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2509 new (elem->call_data) call_data(elem, *chand, *args);
2510 return GRPC_ERROR_NONE;
2513 /* Destructor for call_data */
// Destroys the call_data.  If a subchannel call exists, defers
// then_schedule_closure to the subchannel call's cleanup (so destruction
// ordering is preserved); otherwise schedules it directly.
2514 static void cc_destroy_call_elem(grpc_call_element* elem,
2515 const grpc_call_final_info* final_info,
2516 grpc_closure* then_schedule_closure) {
2517 call_data* calld = static_cast<call_data*>(elem->call_data);
2518 if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
2519 grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
2520 then_schedule_closure);
// Ownership of the closure was handed to the subchannel call above.
2521 then_schedule_closure = nullptr;
// Explicit destructor call matches the placement new in cc_init_call_elem.
2523 calld->~call_data();
2524 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
// Records the polling entity the call should use for I/O notifications.
2527 static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
2528 grpc_polling_entity* pollent) {
2529 call_data* calld = static_cast<call_data*>(elem->call_data);
2530 calld->pollent = pollent;
2533 /*************************************************************************
// Channel-filter vtable for the client channel.  Entries are positional per
// grpc_channel_filter; the cc_* functions above provide the implementations.
2537 const grpc_channel_filter grpc_client_channel_filter = {
2538 cc_start_transport_stream_op_batch,
2539 cc_start_transport_op,
2542 cc_set_pollset_or_pollset_set,
2543 cc_destroy_call_elem,
2544 sizeof(channel_data),
2545 cc_init_channel_elem,
2546 cc_destroy_channel_elem,
2547 cc_get_channel_info,
// Associates a channelz node with this channel by forwarding it to the
// request router.
2551 void grpc_client_channel_set_channelz_node(
2552 grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
2553 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2554 chand->request_router->set_channelz_node(node);
// Populates channelz child-ref lists by delegating to the current LB
// policy, if one exists (no-op when the channel has no LB policy yet).
2557 void grpc_client_channel_populate_child_refs(
2558 grpc_channel_element* elem,
2559 grpc_core::channelz::ChildRefsList* child_subchannels,
2560 grpc_core::channelz::ChildRefsList* child_channels) {
2561 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2562 if (chand->request_router->lb_policy() != nullptr) {
2563 chand->request_router->lb_policy()->FillChildRefsForChannelz(
2564 child_subchannels, child_channels);
// Combiner callback: asks the request router to exit IDLE (i.e., start
// connecting), then drops the channel-stack ref taken by the scheduler.
2568 static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
2569 channel_data* chand = static_cast<channel_data*>(arg);
2570 chand->request_router->ExitIdleLocked();
2571 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
// Returns the channel's current connectivity state.  When the channel is
// IDLE and try_to_connect is set, schedules try_to_connect_locked on the
// channel combiner (taking a channel-stack ref that the callback releases).
2574 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
2575 grpc_channel_element* elem, int try_to_connect) {
2576 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2577 grpc_connectivity_state out = chand->request_router->GetConnectivityState();
2578 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
2579 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
2581 GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
2582 grpc_combiner_scheduler(chand->combiner)),
// One externally-registered connectivity-state watcher.  Watchers are kept
// in a singly-linked list headed at
// chand->external_connectivity_watcher_list_head, guarded by
// chand->external_connectivity_watcher_list_mu.
2588 typedef struct external_connectivity_watcher {
// Owning channel's data (non-owning back pointer).
2589 channel_data* chand;
// Polling entity added to the channel's interested_parties for the
// duration of the watch.
2590 grpc_polling_entity pollent;
// Caller's closure, invoked when the state changes (also the list lookup key).
2591 grpc_closure* on_complete;
// Closure run once the watch is registered; nullptr signals a cancellation
// request (see watch_connectivity_state_locked).
2592 grpc_closure* watcher_timer_init;
// Where the observed state is written; nullptr means "cancel the watch".
2593 grpc_connectivity_state* state;
// Internal closure used to hop onto the combiner / receive notifications.
2594 grpc_closure my_closure;
// Next watcher in the channel's list.
2595 struct external_connectivity_watcher* next;
2596 } external_connectivity_watcher;
// Finds the watcher registered with the given on_complete closure, scanning
// the channel's watcher list under its mutex.  on_complete serves as the
// unique key identifying a watch.
2598 static external_connectivity_watcher* lookup_external_connectivity_watcher(
2599 channel_data* chand, grpc_closure* on_complete) {
2600 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
2601 external_connectivity_watcher* w =
2602 chand->external_connectivity_watcher_list_head;
2603 while (w != nullptr && w->on_complete != on_complete) {
2606 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// Prepends a watcher to the channel's watcher list.  Asserts that no
// watcher with the same on_complete closure is already registered.
2610 static void external_connectivity_watcher_list_append(
2611 channel_data* chand, external_connectivity_watcher* w) {
// Duplicate registration for the same on_complete is a caller bug.
2612 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
2614 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
2615 GPR_ASSERT(!w->next);
// Push onto the head of the singly-linked list.
2616 w->next = chand->external_connectivity_watcher_list_head;
2617 chand->external_connectivity_watcher_list_head = w;
2618 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
// Unlinks a watcher from the channel's watcher list under the list mutex.
// The watcher must be present; falling off the scan is unreachable.
2621 static void external_connectivity_watcher_list_remove(
2622 channel_data* chand, external_connectivity_watcher* to_remove) {
2624 lookup_external_connectivity_watcher(chand, to_remove->on_complete));
2625 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
// Fast path: the watcher is at the head of the list.
2626 if (to_remove == chand->external_connectivity_watcher_list_head) {
2627 chand->external_connectivity_watcher_list_head = to_remove->next;
2628 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// Otherwise scan for the predecessor and splice the node out.
2631 external_connectivity_watcher* w =
2632 chand->external_connectivity_watcher_list_head;
2633 while (w != nullptr) {
2634 if (w->next == to_remove) {
2635 w->next = w->next->next;
2636 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// The watcher was verified present above, so the scan always finds it.
2641 GPR_UNREACHABLE_CODE(return );
// Returns the number of external connectivity watchers currently registered
// on this channel, by walking the watcher list under its mutex.
2644 int grpc_client_channel_num_external_connectivity_watchers(
2645 grpc_channel_element* elem) {
2646 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2649 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
2650 external_connectivity_watcher* w =
2651 chand->external_connectivity_watcher_list_head;
2652 while (w != nullptr) {
2656 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
// Combiner callback fired when a watched connectivity-state change
// completes: tears down the watch (pollset-set membership, channel-stack
// ref, list entry) and then schedules the caller's on_complete closure.
2661 static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
2662 external_connectivity_watcher* w =
2663 static_cast<external_connectivity_watcher*>(arg);
// Save the caller's closure before tearing the watcher down.
2664 grpc_closure* follow_up = w->on_complete;
2665 grpc_polling_entity_del_from_pollset_set(&w->pollent,
2666 w->chand->interested_parties);
2667 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
2668 "external_connectivity_watcher");
2669 external_connectivity_watcher_list_remove(w->chand, w);
2671 GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
// Combiner callback that either registers a new connectivity watch
// (w->state != nullptr) or cancels an existing one (w->state == nullptr,
// identified by w->on_complete).
2674 static void watch_connectivity_state_locked(void* arg,
2675 grpc_error* error_ignored) {
2676 external_connectivity_watcher* w =
2677 static_cast<external_connectivity_watcher*>(arg);
2678 external_connectivity_watcher* found = nullptr;
2679 if (w->state != nullptr) {
// Registration path: track the watcher, then start the state watch.
2680 external_connectivity_watcher_list_append(w->chand, w);
2681 // An assumption is being made that the closure is scheduled on the exec ctx
2682 // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
2683 GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
// Completion is delivered back on the channel combiner.
2684 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
2685 grpc_combiner_scheduler(w->chand->combiner));
2686 w->chand->request_router->NotifyOnConnectivityStateChange(w->state,
// Cancellation path: a cancel request must not carry watcher_timer_init.
2689 GPR_ASSERT(w->watcher_timer_init == nullptr);
2690 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
2692 GPR_ASSERT(found->on_complete == w->on_complete);
// Passing nullptr as the state cancels the router's pending notification
// for the found watcher.
2693 found->chand->request_router->NotifyOnConnectivityStateChange(
2694 nullptr, &found->my_closure);
// Release the resources taken for this (now unused) cancel request.
2696 grpc_polling_entity_del_from_pollset_set(&w->pollent,
2697 w->chand->interested_parties);
2698 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
2699 "external_connectivity_watcher");
// Public API: registers (or, with state == nullptr, cancels) an external
// connectivity-state watch.  Allocates a zeroed watcher, attaches its
// polling entity to the channel, takes a channel-stack ref, and hops onto
// the channel combiner via watch_connectivity_state_locked.
2704 void grpc_client_channel_watch_connectivity_state(
2705 grpc_channel_element* elem, grpc_polling_entity pollent,
2706 grpc_connectivity_state* state, grpc_closure* closure,
2707 grpc_closure* watcher_timer_init) {
2708 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
// gpr_zalloc zero-initializes all fields (next == nullptr, etc.).
2709 external_connectivity_watcher* w =
2710 static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
2712 w->pollent = pollent;
2713 w->on_complete = closure;
2715 w->watcher_timer_init = watcher_timer_init;
// Keep the channel pollable on behalf of this watcher.
2716 grpc_polling_entity_add_to_pollset_set(&w->pollent,
2717 chand->interested_parties);
// Ref released in on_external_watch_complete_locked / the cancel path.
2718 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
2719 "external_connectivity_watcher");
2721 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
2722 grpc_combiner_scheduler(chand->combiner)),
2726 grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
2727 grpc_call_element* elem) {
2728 call_data* calld = static_cast<call_data*>(elem->call_data);
2729 return calld->subchannel_call;