/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/request_routing.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ClientChannelMethodParamsTable;
using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
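
// The buffer size can be overridden per channel via the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg.  Illustrative sketch
// using the public C API (not compiled here):
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
//       512 << 10 /* 512 KiB */);
//   grpc_channel_args args = {1, &arg};
//   // Pass &args when creating the channel.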

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef struct client_channel_channel_data {
  grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router;

  bool deadline_checking_enabled;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;

// Synchronous callback from chand->request_router to process a resolver
// result update.
static bool process_resolver_result_locked(void* arg,
                                           const grpc_channel_args& args,
                                           const char** lb_policy_name,
                                           grpc_json** lb_policy_config) {
  channel_data* chand = static_cast<channel_data*>(arg);
  ProcessedResolverResult resolver_result(args, chand->enable_retries);
  grpc_core::UniquePtr<char> service_config_json =
      resolver_result.service_config_json();
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
            chand, service_config_json.get());
  }
  // Update channel state.
  chand->retry_throttle_data = resolver_result.retry_throttle_data();
  chand->method_params_table = resolver_result.method_params_table();
  // Swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  chand->info_lb_policy_name = resolver_result.lb_policy_name();
  const bool service_config_changed =
      ((service_config_json == nullptr) !=
       (chand->info_service_config_json == nullptr)) ||
      (service_config_json != nullptr &&
       strcmp(service_config_json.get(),
              chand->info_service_config_json.get()) != 0);
  chand->info_service_config_json = std::move(service_config_json);
  gpr_mu_unlock(&chand->info_mu);
  // Return results.
  *lb_policy_name = chand->info_lb_policy_name.get();
  *lb_policy_config = resolver_result.lb_policy_config();
  return service_config_changed;
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    chand->request_router->NotifyOnConnectivityStateChange(
        op->connectivity_state, op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->request_router->lb_policy() == nullptr) {
      grpc_error* error =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    } else {
      grpc_error* error = GRPC_ERROR_NONE;
      grpc_core::LoadBalancingPolicy::PickState pick_state;
      // Pick must return synchronously, because pick_state.on_complete is null.
      GPR_ASSERT(
          chand->request_router->lb_policy()->PickLocked(&pick_state, &error));
      if (pick_state.connected_subchannel != nullptr) {
        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                              op->send_ping.on_ack);
      } else {
        if (error == GRPC_ERROR_NONE) {
          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
              "LB policy dropped call on ping");
        }
        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
      }
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    chand->request_router->ShutdownLocked(op->disconnect_with_error);
  }

  if (op->reset_connect_backoff) {
    chand->request_router->ResetConnectionBackoffLocked();
  }

  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
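
// The send_ping path above is what ultimately services grpc_channel_ping()
// from the public C API.  Illustrative sketch (not compiled here):
//
//   grpc_channel_ping(channel, cq, /*tag=*/&ping_tag, /*reserved=*/nullptr);
//   // A GRPC_OP_COMPLETE event for &ping_tag is delivered on cq once the
//   // ping has been acked.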

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json.get());
  }
  gpr_mu_unlock(&chand->info_mu);
}
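
// cc_get_channel_info() backs grpc_channel_get_info() from the public C
// API.  Illustrative usage sketch (not compiled here); the caller owns
// the returned strings:
//
//   char* lb_policy_name = nullptr;
//   grpc_channel_info info;
//   memset(&info, 0, sizeof(info));
//   info.lb_policy_name = &lb_policy_name;
//   grpc_channel_get_info(channel, &info);
//   gpr_free(lb_policy_name);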

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  chand->interested_parties = grpc_pollset_set_create();
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory* client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate request router.
  grpc_client_channel_factory_ref(client_channel_factory);
  grpc_error* error = GRPC_ERROR_NONE;
  chand->request_router.Init(
      chand->owning_stack, chand->combiner, client_channel_factory,
      chand->interested_parties, &grpc_client_channel_trace,
      process_resolver_result_locked, chand,
      proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */,
      new_args != nullptr ? new_args : args->channel_args, &error);
  gpr_free(proxy_name);
  grpc_channel_args_destroy(new_args);
  return error;
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  chand->request_router.Destroy();
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

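// The retry behavior itself is driven by the retryPolicy in the service
// config returned by the resolver (parsed in resolver_result_parsing.h).
// Illustrative service config sketch (not part of this file; service and
// method names are hypothetical):
//
//   {
//     "methodConfig": [{
//       "name": [{ "service": "my.pkg.MyService" }],
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }
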
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

namespace {

struct call_data;

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
struct subchannel_batch_data {
  subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
                        bool set_on_complete);
  // All dtor code must go in `destroy`, because closures owned by
  // `subchannel_batch_data` may still run after the object is unreffed
  // by `batch_data_unref`, and msan would complain about accessing this
  // class after its dtor has been called.  As a result, we cannot call
  // the dtor in `batch_data_unref`.
  // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
  ~subchannel_batch_data() { destroy(); }
  void destroy();

  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
};

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
struct subchannel_call_retry_state {
  explicit subchannel_call_retry_state(grpc_call_context_element* context)
      : batch_payload(context),
        started_send_initial_metadata(false),
        completed_send_initial_metadata(false),
        started_send_trailing_metadata(false),
        completed_send_trailing_metadata(false),
        started_recv_initial_metadata(false),
        completed_recv_initial_metadata(false),
        started_recv_trailing_metadata(false),
        completed_recv_trailing_metadata(false),
        retry_dispatched(false) {}

  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available = false;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count = 0;
  size_t completed_send_message_count = 0;
  size_t started_recv_message_count = 0;
  size_t completed_recv_message_count = 0;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
  grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
  subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
  grpc_error* recv_message_error = GRPC_ERROR_NONE;
  subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
  // NOTE: Do not move this next to the metadata bitfields above.  That
  //       would save space, but it would also create a data race: the
  //       compiler would generate a 2-byte store that overwrites the
  //       metadata bitfields when setting this field.
  bool retry_dispatched : 1;
};

// Pending batches stored in call data.
struct pending_batch {
  // The pending batch.  If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
};

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
struct call_data {
  call_data(grpc_call_element* elem, const channel_data& chand,
            const grpc_call_element_args& args)
      : deadline_state(elem, args.call_stack, args.call_combiner,
                       GPR_LIKELY(chand.deadline_checking_enabled)
                           ? args.deadline
                           : GRPC_MILLIS_INF_FUTURE),
        path(grpc_slice_ref_internal(args.path)),
        call_start_time(args.start_time),
        deadline(args.deadline),
        arena(args.arena),
        owning_call(args.call_stack),
        call_combiner(args.call_combiner),
        pending_send_initial_metadata(false),
        pending_send_message(false),
        pending_send_trailing_metadata(false),
        enable_retries(chand.enable_retries),
        retry_committed(false),
        last_attempt_got_server_pushback(false) {}

  ~call_data() {
    if (GPR_LIKELY(subchannel_call != nullptr)) {
      GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call,
                                 "client_channel_destroy_call");
    }
    grpc_slice_unref_internal(path);
    GRPC_ERROR_UNREF(cancel_error);
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
      GPR_ASSERT(pending_batches[i].batch == nullptr);
    }
    if (have_request) {
      request.Destroy();
    }
  }

  // State for handling deadlines.
  // The code in deadline_filter.cc requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call = nullptr;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error = GRPC_ERROR_NONE;

  grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request;
  bool have_request = false;
  grpc_closure pick_closure;

  grpc_polling_entity* pollent = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed = 0;
  size_t bytes_buffered_for_retry = 0;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata = false;
  grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata = false;
  grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
  grpc_metadata_batch send_trailing_metadata;
};

}  // namespace

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);

//
// send op data caching
//

// If not already cached, caches the data for a batch's send ops so that
// they can be replayed later on a retry attempt.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  calld->send_messages[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
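
// The index is determined by the first op present in the batch, checked
// in the order above.  For example, a batch carrying both
// send_initial_metadata and recv_initial_metadata is stored at index 0.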

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
                     chand->per_rpc_retry_buffer_size)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}

static void pending_batch_clear(call_data* calld, pending_batch* pending) {
  if (calld->enable_retries) {
    if (pending->batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = false;
    }
    if (pending->batch->send_message) {
      calld->pending_send_message = false;
    }
    if (pending->batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = false;
    }
  }
  pending->batch = nullptr;
}

// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner);
}

// This is called via the call combiner, so access to calld is synchronized.
// If yield_call_combiner is true, assumes responsibility for yielding
// the call combiner.
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
                                 bool yield_call_combiner) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, calld, num_batches, grpc_error_string(error));
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        fail_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "pending_batches_fail");
      pending_batch_clear(calld, pending);
    }
  }
  if (yield_call_combiner) {
    closures.RunClosures(calld->call_combiner);
  } else {
    closures.RunClosuresWithoutYielding(calld->call_combiner);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
static void resume_pending_batch_in_call_combiner(void* arg,
                                                  grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld->subchannel_call;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        resume_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "pending_batches_resume");
      pending_batch_clear(calld, pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(calld->call_combiner);
}

static void maybe_clear_pending_batch(grpc_call_element* elem,
                                      pending_batch* pending) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch only once all of its callback pointers
  // have been scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
              calld);
    }
    pending_batch_clear(calld, pending);
  }
}

// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
template <typename Predicate>
static pending_batch* pending_batch_find(grpc_call_element* elem,
                                         const char* log_message,
                                         Predicate predicate) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
                calld, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}
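
// Typical usage sketch: find the pending batch that contains a
// send_message op (the log_message string here is hypothetical):
//
//   pending_batch* pending = pending_batch_find(
//       elem, "found send_message in",
//       [](grpc_transport_stream_op_batch* batch) {
//         return batch->send_message;
//       });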

//
// retry code
//

// Commits the call so that no further retry attempts will be performed.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->retry_committed) return;
  calld->retry_committed = true;
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
  }
  if (retry_state != nullptr) {
    free_cached_send_op_data_after_commit(elem, retry_state);
  }
}

// Starts a retry after appropriate back-off.
static void do_retry(grpc_call_element* elem,
                     subchannel_call_retry_state* retry_state,
                     grpc_millis server_pushback_ms) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GPR_ASSERT(calld->method_params != nullptr);
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  if (calld->subchannel_call != nullptr) {
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_call_retry");
    calld->subchannel_call = nullptr;
  }
  if (calld->have_request) {
    calld->have_request = false;
    calld->request.Destroy();
  }
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
    calld->last_attempt_got_server_pushback = true;
  } else {
    if (calld->num_attempts_completed == 1 ||
        calld->last_attempt_got_server_pushback) {
      calld->retry_backoff.Init(
          grpc_core::BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      calld->last_attempt_got_server_pushback = false;
    }
    next_attempt_time = calld->retry_backoff->NextAttemptTime();
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
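
// Worked example of the backoff computation above (a sketch, assuming the
// service config values shown): with initial_backoff=100ms,
// backoff_multiplier=2, max_backoff=1s, and RETRY_BACKOFF_JITTER=0.2,
// successive retries are delayed roughly 100ms, 200ms, 400ms, 800ms, 1s,
// 1s, ..., each jittered by up to +/-20% and capped at max_backoff.  A
// server push-back value, when present, overrides this computed delay.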

// Returns true if the call is being retried.
static bool maybe_retry(grpc_call_element* elem,
                        subchannel_batch_data* batch_data,
                        grpc_status_code status,
                        grpc_mdelem* server_pushback_md) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Get retry policy.
  if (calld->method_params == nullptr) return false;
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  subchannel_call_retry_state* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<subchannel_call_retry_state*>(
        grpc_connected_subchannel_call_get_parent_data(
            batch_data->subchannel_call));
    if (retry_state->retry_dispatched) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                calld);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (calld->retry_throttle_data != nullptr) {
      calld->retry_throttle_data->RecordSuccess();
    }
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data != nullptr &&
      !calld->retry_throttle_data->RecordFailure()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed;
  if (calld->num_attempts_completed >= retry_policy->max_attempts) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              calld, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, calld);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, calld);
      }
      return false;
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, calld, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  do_retry(elem, retry_state, server_pushback_ms);
  return true;
}
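
// Server push-back arrives in the "grpc-retry-pushback-ms" trailing
// metadata entry.  For example (sketch), a server sending
// "grpc-retry-pushback-ms: 2000" asks the client to wait 2 seconds
// before retrying, while "grpc-retry-pushback-ms: -1" (or any other
// unparseable value) tells the client not to retry at all.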

//
// subchannel_batch_data
//

namespace {

subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
                                             call_data* calld, int refcount,
                                             bool set_on_complete)
    : elem(elem),
      subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call,
                                               "batch_data_create")) {
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&refs, refcount);
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
                      grpc_schedule_on_exec_ctx);
    batch.on_complete = &on_complete;
  }
  GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
}

void subchannel_batch_data::destroy() {
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(subchannel_call));
  if (batch.send_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
  }
  if (batch.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
  }
  if (batch.recv_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
  }
  if (batch.recv_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
  }
  GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref");
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
}

}  // namespace

// Creates a subchannel_batch_data object on the call's arena with the
// specified refcount.  If set_on_complete is true, the batch's
// on_complete callback will be set to point to on_complete();
// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
                                                int refcount,
                                                bool set_on_complete) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_batch_data* batch_data =
      new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
          subchannel_batch_data(elem, calld, refcount, set_on_complete);
  return batch_data;
}

static void batch_data_unref(subchannel_batch_data* batch_data) {
  if (gpr_unref(&batch_data->refs)) {
    batch_data->destroy();
  }
}

//
// recv_initial_metadata callback handling
//

// Invokes recv_initial_metadata_ready for a subchannel batch.
static void invoke_recv_initial_metadata_callback(void* arg,
                                                  grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  // Find pending batch.
  pending_batch* pending = pending_batch_find(
      batch_data->elem, "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  grpc_metadata_batch_move(
      &retry_state->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
1299     } else {
1300       GRPC_CALL_COMBINER_STOP(
1301           calld->call_combiner,
1302           "recv_initial_metadata_ready trailers-only or error");
1303     }
1304     return;
1305   }
1306   // Received valid initial metadata, so commit the call.
1307   retry_commit(elem, retry_state);
1308   // Invoke the callback to return the result to the surface.
1309   // Manually invoking a callback function; it does not take ownership of error.
1310   invoke_recv_initial_metadata_callback(batch_data, error);
1311 }
1312
1313 //
1314 // recv_message callback handling
1315 //
1316
1317 // Invokes recv_message_ready for a subchannel batch.
1318 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1319   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1320   // Find pending op.
1321   pending_batch* pending = pending_batch_find(
1322       batch_data->elem, "invoking recv_message_ready for",
1323       [](grpc_transport_stream_op_batch* batch) {
1324         return batch->recv_message &&
1325                batch->payload->recv_message.recv_message_ready != nullptr;
1326       });
1327   GPR_ASSERT(pending != nullptr);
1328   // Return payload.
1329   subchannel_call_retry_state* retry_state =
1330       static_cast<subchannel_call_retry_state*>(
1331           grpc_connected_subchannel_call_get_parent_data(
1332               batch_data->subchannel_call));
1333   *pending->batch->payload->recv_message.recv_message =
1334       std::move(retry_state->recv_message);
1335   // Update bookkeeping.
1336   // Note: Need to do this before invoking the callback, since invoking
1337   // the callback will result in yielding the call combiner.
1338   grpc_closure* recv_message_ready =
1339       pending->batch->payload->recv_message.recv_message_ready;
1340   pending->batch->payload->recv_message.recv_message_ready = nullptr;
1341   maybe_clear_pending_batch(batch_data->elem, pending);
1342   batch_data_unref(batch_data);
1343   // Invoke callback.
1344   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1345 }
1346
1347 // Intercepts recv_message_ready callback for retries.
1348 // Commits the call and returns the message up the stack.
1349 static void recv_message_ready(void* arg, grpc_error* error) {
1350   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1351   grpc_call_element* elem = batch_data->elem;
1352   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1353   call_data* calld = static_cast<call_data*>(elem->call_data);
1354   if (grpc_client_channel_trace.enabled()) {
1355     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1356             chand, calld, grpc_error_string(error));
1357   }
1358   subchannel_call_retry_state* retry_state =
1359       static_cast<subchannel_call_retry_state*>(
1360           grpc_connected_subchannel_call_get_parent_data(
1361               batch_data->subchannel_call));
1362   ++retry_state->completed_recv_message_count;
1363   // If a retry was already dispatched, then we're not going to use the
1364   // result of this recv_message op, so do nothing.
1365   if (retry_state->retry_dispatched) {
1366     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1367                             "recv_message_ready after retry dispatched");
1368     return;
1369   }
1370   // If we got an error or the payload was nullptr and we have not yet gotten
1371   // the recv_trailing_metadata_ready callback, then defer propagating this
1372   // callback back to the surface.  We can evaluate whether to retry when
1373   // recv_trailing_metadata comes back.
1374   if (GPR_UNLIKELY(
1375           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1376           !retry_state->completed_recv_trailing_metadata)) {
1377     if (grpc_client_channel_trace.enabled()) {
1378       gpr_log(GPR_INFO,
1379               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1380               "message and recv_trailing_metadata pending)",
1381               chand, calld);
1382     }
1383     retry_state->recv_message_ready_deferred_batch = batch_data;
1384     retry_state->recv_message_error = GRPC_ERROR_REF(error);
1385     if (!retry_state->started_recv_trailing_metadata) {
1386       // recv_trailing_metadata not yet started by application; start it
1387       // ourselves to get status.
1388       start_internal_recv_trailing_metadata(elem);
1389     } else {
1390       GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1391     }
1392     return;
1393   }
1394   // Received a valid message, so commit the call.
1395   retry_commit(elem, retry_state);
1396   // Invoke the callback to return the result to the surface.
1397   // Manually invoking a callback function; it does not take ownership of error.
1398   invoke_recv_message_callback(batch_data, error);
1399 }
1400
1401 //
1402 // recv_trailing_metadata callback handling
1403 //
1404
1405 // Sets *status and *server_pushback_md based on md_batch and error.
1406 // Only sets *server_pushback_md if server_pushback_md != nullptr.
1407 static void get_call_status(grpc_call_element* elem,
1408                             grpc_metadata_batch* md_batch, grpc_error* error,
1409                             grpc_status_code* status,
1410                             grpc_mdelem** server_pushback_md) {
1411   call_data* calld = static_cast<call_data*>(elem->call_data);
1412   if (error != GRPC_ERROR_NONE) {
1413     grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1414                           nullptr);
1415   } else {
1416     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1417     *status =
1418         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1419     if (server_pushback_md != nullptr &&
1420         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1421       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1422     }
1423   }
1424   GRPC_ERROR_UNREF(error);
1425 }
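// E.g., with error == GRPC_ERROR_NONE and trailing metadata containing
// "grpc-status: 14" and "grpc-retry-pushback-ms: 1000", this sets *status
// to GRPC_STATUS_UNAVAILABLE and points *server_pushback_md at the
// pushback element (illustrative values).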
1426
1427 // Adds recv_trailing_metadata_ready closure to closures.
1428 static void add_closure_for_recv_trailing_metadata_ready(
1429     grpc_call_element* elem, subchannel_batch_data* batch_data,
1430     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1431   // Find pending batch.
1432   pending_batch* pending = pending_batch_find(
1433       elem, "invoking recv_trailing_metadata for",
1434       [](grpc_transport_stream_op_batch* batch) {
1435         return batch->recv_trailing_metadata &&
1436                batch->payload->recv_trailing_metadata
1437                        .recv_trailing_metadata_ready != nullptr;
1438       });
1439   // If we generated the recv_trailing_metadata op internally via
1440   // start_internal_recv_trailing_metadata(), then there will be no
1441   // pending batch.
1442   if (pending == nullptr) {
1443     GRPC_ERROR_UNREF(error);
1444     return;
1445   }
1446   // Return metadata.
1447   subchannel_call_retry_state* retry_state =
1448       static_cast<subchannel_call_retry_state*>(
1449           grpc_connected_subchannel_call_get_parent_data(
1450               batch_data->subchannel_call));
1451   grpc_metadata_batch_move(
1452       &retry_state->recv_trailing_metadata,
1453       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
1454   // Add closure.
1455   closures->Add(pending->batch->payload->recv_trailing_metadata
1456                     .recv_trailing_metadata_ready,
1457                 error, "recv_trailing_metadata_ready for pending batch");
1458   // Update bookkeeping.
1459   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1460       nullptr;
1461   maybe_clear_pending_batch(elem, pending);
1462 }
1463
1464 // Adds any necessary closures for deferred recv_initial_metadata and
1465 // recv_message callbacks to closures.
1466 static void add_closures_for_deferred_recv_callbacks(
1467     subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1468     grpc_core::CallCombinerClosureList* closures) {
1469   if (batch_data->batch.recv_trailing_metadata) {
1470     // Add closure for deferred recv_initial_metadata_ready.
1471     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1472                      nullptr)) {
1473       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1474                         invoke_recv_initial_metadata_callback,
1475                         retry_state->recv_initial_metadata_ready_deferred_batch,
1476                         grpc_schedule_on_exec_ctx);
1477       closures->Add(&retry_state->recv_initial_metadata_ready,
1478                     retry_state->recv_initial_metadata_error,
1479                     "resuming recv_initial_metadata_ready");
1480       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1481     }
1482     // Add closure for deferred recv_message_ready.
1483     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1484                      nullptr)) {
1485       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1486                         invoke_recv_message_callback,
1487                         retry_state->recv_message_ready_deferred_batch,
1488                         grpc_schedule_on_exec_ctx);
1489       closures->Add(&retry_state->recv_message_ready,
1490                     retry_state->recv_message_error,
1491                     "resuming recv_message_ready");
1492       retry_state->recv_message_ready_deferred_batch = nullptr;
1493     }
1494   }
1495 }
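// Note: each deferred batch still holds the ref taken for its original
// callback; the invoke_* closures scheduled above release that ref via
// batch_data_unref().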
1496
1497 // Returns true if any op in the batch was not yet started.
1498 // Only looks at send ops, since recv ops are always started immediately.
1499 static bool pending_batch_is_unstarted(
1500     pending_batch* pending, call_data* calld,
1501     subchannel_call_retry_state* retry_state) {
1502   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1503     return false;
1504   }
1505   if (pending->batch->send_initial_metadata &&
1506       !retry_state->started_send_initial_metadata) {
1507     return true;
1508   }
1509   if (pending->batch->send_message &&
1510       retry_state->started_send_message_count < calld->send_messages.size()) {
1511     return true;
1512   }
1513   if (pending->batch->send_trailing_metadata &&
1514       !retry_state->started_send_trailing_metadata) {
1515     return true;
1516   }
1517   return false;
1518 }
1519
1520 // For any pending batch containing an op that has not yet been started,
1521 // adds the pending batch's completion closures to closures.
1522 static void add_closures_to_fail_unstarted_pending_batches(
1523     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1524     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1525   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1526   call_data* calld = static_cast<call_data*>(elem->call_data);
1527   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1528     pending_batch* pending = &calld->pending_batches[i];
1529     if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1530       if (grpc_client_channel_trace.enabled()) {
1531         gpr_log(GPR_INFO,
1532                 "chand=%p calld=%p: failing unstarted pending batch at index "
1533                 "%" PRIuPTR,
1534                 chand, calld, i);
1535       }
1536       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1537                     "failing on_complete for pending batch");
1538       pending->batch->on_complete = nullptr;
1539       maybe_clear_pending_batch(elem, pending);
1540     }
1541   }
1542   GRPC_ERROR_UNREF(error);
1543 }
1544
1545 // Runs necessary closures upon completion of a call attempt.
1546 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1547                                             grpc_error* error) {
1548   grpc_call_element* elem = batch_data->elem;
1549   call_data* calld = static_cast<call_data*>(elem->call_data);
1550   subchannel_call_retry_state* retry_state =
1551       static_cast<subchannel_call_retry_state*>(
1552           grpc_connected_subchannel_call_get_parent_data(
1553               batch_data->subchannel_call));
1554   // Construct list of closures to execute.
1555   grpc_core::CallCombinerClosureList closures;
1556   // First, add closure for recv_trailing_metadata_ready.
1557   add_closure_for_recv_trailing_metadata_ready(
1558       elem, batch_data, GRPC_ERROR_REF(error), &closures);
1559   // If there are deferred recv_initial_metadata_ready or recv_message_ready
1560   // callbacks, add them to closures.
1561   add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1562   // Add closures to fail any pending batches that have not yet been started.
1563   add_closures_to_fail_unstarted_pending_batches(
1564       elem, retry_state, GRPC_ERROR_REF(error), &closures);
1565   // Don't need batch_data anymore.
1566   batch_data_unref(batch_data);
1567   // Schedule all of the closures identified above.
1568   // Note: This will release the call combiner.
1569   closures.RunClosures(calld->call_combiner);
1570   GRPC_ERROR_UNREF(error);
1571 }
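// Note: RunClosures() runs the first closure added directly (yielding the
// call combiner) and re-queues the remaining closures on the combiner in
// order, so recv_trailing_metadata_ready is delivered before the deferred
// recv callbacks and the unstarted-batch failures added above.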
1572
1573 // Intercepts recv_trailing_metadata_ready callback for retries.
1574 // Commits the call and returns the trailing metadata up the stack.
1575 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1576   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1577   grpc_call_element* elem = batch_data->elem;
1578   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1579   call_data* calld = static_cast<call_data*>(elem->call_data);
1580   if (grpc_client_channel_trace.enabled()) {
1581     gpr_log(GPR_INFO,
1582             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1583             chand, calld, grpc_error_string(error));
1584   }
1585   subchannel_call_retry_state* retry_state =
1586       static_cast<subchannel_call_retry_state*>(
1587           grpc_connected_subchannel_call_get_parent_data(
1588               batch_data->subchannel_call));
1589   retry_state->completed_recv_trailing_metadata = true;
1590   // Get the call's status and check for server pushback metadata.
1591   grpc_status_code status = GRPC_STATUS_OK;
1592   grpc_mdelem* server_pushback_md = nullptr;
1593   grpc_metadata_batch* md_batch =
1594       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
1595   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1596                   &server_pushback_md);
1597   if (grpc_client_channel_trace.enabled()) {
1598     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1599             calld, grpc_status_code_to_string(status));
1600   }
1601   // Check if we should retry.
1602   if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1603     // Unref batch_data for deferred recv_initial_metadata_ready or
1604     // recv_message_ready callbacks, if any.
1605     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1606       batch_data_unref(batch_data);
1607       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
1608     }
1609     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
1610       batch_data_unref(batch_data);
1611       GRPC_ERROR_UNREF(retry_state->recv_message_error);
1612     }
1613     batch_data_unref(batch_data);
1614     return;
1615   }
1616   // Not retrying, so commit the call.
1617   retry_commit(elem, retry_state);
1618   // Run any necessary closures.
1619   run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
1620 }
1621
1622 //
1623 // on_complete callback handling
1624 //
1625
1626 // Adds the on_complete closure for the pending batch completed in
1627 // batch_data to closures.
1628 static void add_closure_for_completed_pending_batch(
1629     grpc_call_element* elem, subchannel_batch_data* batch_data,
1630     subchannel_call_retry_state* retry_state, grpc_error* error,
1631     grpc_core::CallCombinerClosureList* closures) {
1632   pending_batch* pending = pending_batch_find(
1633       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
1634         // Match the pending batch with the same set of send ops as the
1635         // subchannel batch we've just completed.
1636         return batch->on_complete != nullptr &&
1637                batch_data->batch.send_initial_metadata ==
1638                    batch->send_initial_metadata &&
1639                batch_data->batch.send_message == batch->send_message &&
1640                batch_data->batch.send_trailing_metadata ==
1641                    batch->send_trailing_metadata;
1642       });
1643   // If batch_data is a replay batch, then there will be no pending
1644   // batch to complete.
1645   if (pending == nullptr) {
1646     GRPC_ERROR_UNREF(error);
1647     return;
1648   }
1649   // Add closure.
1650   closures->Add(pending->batch->on_complete, error,
1651                 "on_complete for pending batch");
1652   pending->batch->on_complete = nullptr;
1653   maybe_clear_pending_batch(elem, pending);
1654 }
1655
1656 // If there are any cached ops to replay or pending ops to start on the
1657 // subchannel call, adds a closure to closures to invoke
1658 // start_retriable_subchannel_batches().
1659 static void add_closures_for_replay_or_pending_send_ops(
1660     grpc_call_element* elem, subchannel_batch_data* batch_data,
1661     subchannel_call_retry_state* retry_state,
1662     grpc_core::CallCombinerClosureList* closures) {
1663   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1664   call_data* calld = static_cast<call_data*>(elem->call_data);
1665   bool have_pending_send_message_ops =
1666       retry_state->started_send_message_count < calld->send_messages.size();
1667   bool have_pending_send_trailing_metadata_op =
1668       calld->seen_send_trailing_metadata &&
1669       !retry_state->started_send_trailing_metadata;
1670   if (!have_pending_send_message_ops &&
1671       !have_pending_send_trailing_metadata_op) {
1672     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1673       pending_batch* pending = &calld->pending_batches[i];
1674       grpc_transport_stream_op_batch* batch = pending->batch;
1675       if (batch == nullptr || pending->send_ops_cached) continue;
1676       if (batch->send_message) have_pending_send_message_ops = true;
1677       if (batch->send_trailing_metadata) {
1678         have_pending_send_trailing_metadata_op = true;
1679       }
1680     }
1681   }
1682   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
1683     if (grpc_client_channel_trace.enabled()) {
1684       gpr_log(GPR_INFO,
1685               "chand=%p calld=%p: starting next batch for pending send op(s)",
1686               chand, calld);
1687     }
1688     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
1689                       start_retriable_subchannel_batches, elem,
1690                       grpc_schedule_on_exec_ctx);
1691     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
1692                   "starting next batch for send_* op(s)");
1693   }
1694 }
1695
1696 // Callback used to intercept on_complete from subchannel calls.
1697 // Called only when retries are enabled.
1698 static void on_complete(void* arg, grpc_error* error) {
1699   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1700   grpc_call_element* elem = batch_data->elem;
1701   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1702   call_data* calld = static_cast<call_data*>(elem->call_data);
1703   if (grpc_client_channel_trace.enabled()) {
1704     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
1705     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
1706             chand, calld, grpc_error_string(error), batch_str);
1707     gpr_free(batch_str);
1708   }
1709   subchannel_call_retry_state* retry_state =
1710       static_cast<subchannel_call_retry_state*>(
1711           grpc_connected_subchannel_call_get_parent_data(
1712               batch_data->subchannel_call));
1713   // Update bookkeeping in retry_state.
1714   if (batch_data->batch.send_initial_metadata) {
1715     retry_state->completed_send_initial_metadata = true;
1716   }
1717   if (batch_data->batch.send_message) {
1718     ++retry_state->completed_send_message_count;
1719   }
1720   if (batch_data->batch.send_trailing_metadata) {
1721     retry_state->completed_send_trailing_metadata = true;
1722   }
1723   // If the call is committed, free cached data for send ops that we've just
1724   // completed.
1725   if (calld->retry_committed) {
1726     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
1727   }
1728   // Construct list of closures to execute.
1729   grpc_core::CallCombinerClosureList closures;
1730   // If a retry was already dispatched, that means we saw
1731   // recv_trailing_metadata before this, so we do nothing here.
1732   // Otherwise, invoke the callback to return the result to the surface.
1733   if (!retry_state->retry_dispatched) {
1734     // Add closure for the completed pending batch, if any.
1735     add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
1736                                             GRPC_ERROR_REF(error), &closures);
1737     // If needed, add a callback to start any replay or pending send ops on
1738     // the subchannel call.
1739     if (!retry_state->completed_recv_trailing_metadata) {
1740       add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
1741                                                   &closures);
1742     }
1743   }
1744   // Track number of pending subchannel send batches and determine if this
1745   // was the last one.
1746   --calld->num_pending_retriable_subchannel_send_batches;
1747   const bool last_send_batch_complete =
1748       calld->num_pending_retriable_subchannel_send_batches == 0;
1749   // Don't need batch_data anymore.
1750   batch_data_unref(batch_data);
1751   // Schedule all of the closures identified above.
1752   // Note: This yields the call combiner.
1753   closures.RunClosures(calld->call_combiner);
1754   // If this was the last subchannel send batch, unref the call stack.
1755   if (last_send_batch_complete) {
1756     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
1757   }
1758 }
1759
1760 //
1761 // subchannel batch construction
1762 //
1763
1764 // Helper function used to start a subchannel batch in the call combiner.
1765 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
1766   grpc_transport_stream_op_batch* batch =
1767       static_cast<grpc_transport_stream_op_batch*>(arg);
1768   grpc_subchannel_call* subchannel_call =
1769       static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
1770   // Note: This will release the call combiner.
1771   grpc_subchannel_call_process_op(subchannel_call, batch);
1772 }
1773
1774 // Adds a closure to closures that will execute batch in the call combiner.
1775 static void add_closure_for_subchannel_batch(
1776     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
1777     grpc_core::CallCombinerClosureList* closures) {
1778   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1779   call_data* calld = static_cast<call_data*>(elem->call_data);
1780   batch->handler_private.extra_arg = calld->subchannel_call;
1781   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1782                     start_batch_in_call_combiner, batch,
1783                     grpc_schedule_on_exec_ctx);
1784   if (grpc_client_channel_trace.enabled()) {
1785     char* batch_str = grpc_transport_stream_op_batch_string(batch);
1786     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
1787             calld, batch_str);
1788     gpr_free(batch_str);
1789   }
1790   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1791                 "start_subchannel_batch");
1792 }
1793
1794 // Adds retriable send_initial_metadata op to batch_data.
1795 static void add_retriable_send_initial_metadata_op(
1796     call_data* calld, subchannel_call_retry_state* retry_state,
1797     subchannel_batch_data* batch_data) {
1798   // Maps the number of retries to the corresponding metadata value slice.
1799   static const grpc_slice* retry_count_strings[] = {
1800       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
1801   // We need to make a copy of the metadata batch for each attempt, since
1802   // the filters in the subchannel stack may modify this batch, and we don't
1803   // want those modifications to be passed forward to subsequent attempts.
1804   //
1805   // If we've already completed one or more attempts, add the
1806   // grpc-previous-rpc-attempts header.
1807   retry_state->send_initial_metadata_storage =
1808       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
1809           calld->arena, sizeof(grpc_linked_mdelem) *
1810                             (calld->send_initial_metadata.list.count +
1811                              (calld->num_attempts_completed > 0))));
1812   grpc_metadata_batch_copy(&calld->send_initial_metadata,
1813                            &retry_state->send_initial_metadata,
1814                            retry_state->send_initial_metadata_storage);
1815   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
1816                        .grpc_previous_rpc_attempts != nullptr)) {
1817     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
1818                                retry_state->send_initial_metadata.idx.named
1819                                    .grpc_previous_rpc_attempts);
1820   }
1821   if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
1822     grpc_mdelem retry_md = grpc_mdelem_create(
1823         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
1824         *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
1825     grpc_error* error = grpc_metadata_batch_add_tail(
1826         &retry_state->send_initial_metadata,
1827         &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
1828                                                         .list.count],
1829         retry_md);
1830     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
1831       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
1832               grpc_error_string(error));
1833       GPR_ASSERT(false);
1834     }
1835   }
1836   retry_state->started_send_initial_metadata = true;
1837   batch_data->batch.send_initial_metadata = true;
1838   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
1839       &retry_state->send_initial_metadata;
1840   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
1841       calld->send_initial_metadata_flags;
1842   batch_data->batch.payload->send_initial_metadata.peer_string =
1843       calld->peer_string;
1844 }
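// E.g., on the second attempt (num_attempts_completed == 1), the copy
// built above carries the header "grpc-previous-rpc-attempts: 1";
// retry_count_strings covers the values 1-4, i.e. up to four prior
// attempts.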
1845
1846 // Adds retriable send_message op to batch_data.
1847 static void add_retriable_send_message_op(
1848     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1849     subchannel_batch_data* batch_data) {
1850   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1851   call_data* calld = static_cast<call_data*>(elem->call_data);
1852   if (grpc_client_channel_trace.enabled()) {
1853     gpr_log(GPR_INFO,
1854             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
1855             chand, calld, retry_state->started_send_message_count);
1856   }
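  // The cached message is wrapped in a fresh caching byte stream for this
  // attempt, so the same payload can be replayed from the beginning on a
  // later retry.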
1857   grpc_core::ByteStreamCache* cache =
1858       calld->send_messages[retry_state->started_send_message_count];
1859   ++retry_state->started_send_message_count;
1860   retry_state->send_message.Init(cache);
1861   batch_data->batch.send_message = true;
1862   batch_data->batch.payload->send_message.send_message.reset(
1863       retry_state->send_message.get());
1864 }
1865
1866 // Adds retriable send_trailing_metadata op to batch_data.
1867 static void add_retriable_send_trailing_metadata_op(
1868     call_data* calld, subchannel_call_retry_state* retry_state,
1869     subchannel_batch_data* batch_data) {
1870   // We need to make a copy of the metadata batch for each attempt, since
1871   // the filters in the subchannel stack may modify this batch, and we don't
1872   // want those modifications to be passed forward to subsequent attempts.
1873   retry_state->send_trailing_metadata_storage =
1874       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
1875           calld->arena, sizeof(grpc_linked_mdelem) *
1876                             calld->send_trailing_metadata.list.count));
1877   grpc_metadata_batch_copy(&calld->send_trailing_metadata,
1878                            &retry_state->send_trailing_metadata,
1879                            retry_state->send_trailing_metadata_storage);
1880   retry_state->started_send_trailing_metadata = true;
1881   batch_data->batch.send_trailing_metadata = true;
1882   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
1883       &retry_state->send_trailing_metadata;
1884 }
1885
1886 // Adds retriable recv_initial_metadata op to batch_data.
1887 static void add_retriable_recv_initial_metadata_op(
1888     call_data* calld, subchannel_call_retry_state* retry_state,
1889     subchannel_batch_data* batch_data) {
1890   retry_state->started_recv_initial_metadata = true;
1891   batch_data->batch.recv_initial_metadata = true;
1892   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
1893   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
1894       &retry_state->recv_initial_metadata;
1895   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
1896       &retry_state->trailing_metadata_available;
1897   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1898                     recv_initial_metadata_ready, batch_data,
1899                     grpc_schedule_on_exec_ctx);
1900   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
1901       &retry_state->recv_initial_metadata_ready;
1902 }
1903
1904 // Adds retriable recv_message op to batch_data.
1905 static void add_retriable_recv_message_op(
1906     call_data* calld, subchannel_call_retry_state* retry_state,
1907     subchannel_batch_data* batch_data) {
1908   ++retry_state->started_recv_message_count;
1909   batch_data->batch.recv_message = true;
1910   batch_data->batch.payload->recv_message.recv_message =
1911       &retry_state->recv_message;
1912   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
1913                     batch_data, grpc_schedule_on_exec_ctx);
1914   batch_data->batch.payload->recv_message.recv_message_ready =
1915       &retry_state->recv_message_ready;
1916 }
1917
1918 // Adds retriable recv_trailing_metadata op to batch_data.
1919 static void add_retriable_recv_trailing_metadata_op(
1920     call_data* calld, subchannel_call_retry_state* retry_state,
1921     subchannel_batch_data* batch_data) {
1922   retry_state->started_recv_trailing_metadata = true;
1923   batch_data->batch.recv_trailing_metadata = true;
1924   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
1925   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
1926       &retry_state->recv_trailing_metadata;
1927   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
1928       &retry_state->collect_stats;
1929   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
1930                     recv_trailing_metadata_ready, batch_data,
1931                     grpc_schedule_on_exec_ctx);
1932   batch_data->batch.payload->recv_trailing_metadata
1933       .recv_trailing_metadata_ready =
1934       &retry_state->recv_trailing_metadata_ready;
1935 }
1936
1937 // Helper function used to start a recv_trailing_metadata batch.  This
1938 // is used in the case where a recv_initial_metadata or recv_message
1939 // op fails in a way that we know the call is over but the application
1940 // has not yet started its own recv_trailing_metadata op.
1941 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
1942   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1943   call_data* calld = static_cast<call_data*>(elem->call_data);
1944   if (grpc_client_channel_trace.enabled()) {
1945     gpr_log(GPR_INFO,
1946             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
1947             "started; starting it internally",
1948             chand, calld);
1949   }
1950   subchannel_call_retry_state* retry_state =
1951       static_cast<subchannel_call_retry_state*>(
1952           grpc_connected_subchannel_call_get_parent_data(
1953               calld->subchannel_call));
1954   // Create batch_data with 2 refs, since this batch will be unreffed twice:
1955   // once for the recv_trailing_metadata_ready callback when the subchannel
1956   // batch returns, and again when we actually get a recv_trailing_metadata
1957   // op from the surface.
1958   subchannel_batch_data* batch_data =
1959       batch_data_create(elem, 2, false /* set_on_complete */);
1960   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
1961   retry_state->recv_trailing_metadata_internal_batch = batch_data;
1962   // Note: This will release the call combiner.
1963   grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
1964 }
1965
1966 // If there are any cached send ops that need to be replayed on the
1967 // current subchannel call, creates and returns a new subchannel batch
1968 // to replay those ops.  Otherwise, returns nullptr.
1969 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
1970     grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
1971   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1972   call_data* calld = static_cast<call_data*>(elem->call_data);
1973   subchannel_batch_data* replay_batch_data = nullptr;
1974   // send_initial_metadata.
1975   if (calld->seen_send_initial_metadata &&
1976       !retry_state->started_send_initial_metadata &&
1977       !calld->pending_send_initial_metadata) {
1978     if (grpc_client_channel_trace.enabled()) {
1979       gpr_log(GPR_INFO,
1980               "chand=%p calld=%p: replaying previously completed "
1981               "send_initial_metadata op",
1982               chand, calld);
1983     }
1984     replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
1985     add_retriable_send_initial_metadata_op(calld, retry_state,
1986                                            replay_batch_data);
1987   }
1988   // send_message.
1989   // Note that we can only have one send_message op in flight at a time.
1990   if (retry_state->started_send_message_count < calld->send_messages.size() &&
1991       retry_state->started_send_message_count ==
1992           retry_state->completed_send_message_count &&
1993       !calld->pending_send_message) {
1994     if (grpc_client_channel_trace.enabled()) {
1995       gpr_log(GPR_INFO,
1996               "chand=%p calld=%p: replaying previously completed "
1997               "send_message op",
1998               chand, calld);
1999     }
2000     if (replay_batch_data == nullptr) {
2001       replay_batch_data =
2002           batch_data_create(elem, 1, true /* set_on_complete */);
2003     }
2004     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2005   }
2006   // send_trailing_metadata.
2007   // Note that we only add this op if we have no more send_message ops
2008   // to start, since we can't send down any more send_message ops after
2009   // send_trailing_metadata.
2010   if (calld->seen_send_trailing_metadata &&
2011       retry_state->started_send_message_count == calld->send_messages.size() &&
2012       !retry_state->started_send_trailing_metadata &&
2013       !calld->pending_send_trailing_metadata) {
2014     if (grpc_client_channel_trace.enabled()) {
2015       gpr_log(GPR_INFO,
2016               "chand=%p calld=%p: replaying previously completed "
2017               "send_trailing_metadata op",
2018               chand, calld);
2019     }
2020     if (replay_batch_data == nullptr) {
2021       replay_batch_data =
2022           batch_data_create(elem, 1, true /* set_on_complete */);
2023     }
2024     add_retriable_send_trailing_metadata_op(calld, retry_state,
2025                                             replay_batch_data);
2026   }
2027   return replay_batch_data;
2028 }
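// Illustrative case: on a fresh attempt after a retry, retry_state starts
// out empty, so if the original attempt had already sent initial metadata
// and the first message (with no pending batch re-sending them), a single
// replay batch is built here that re-sends both from the cached copies.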
2029
2030 // Adds a closure to closures for each pending batch that can be
2031 // started on the current subchannel call.
2032 static void add_subchannel_batches_for_pending_batches(
2033     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2034     grpc_core::CallCombinerClosureList* closures) {
2035   call_data* calld = static_cast<call_data*>(elem->call_data);
2036   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2037     pending_batch* pending = &calld->pending_batches[i];
2038     grpc_transport_stream_op_batch* batch = pending->batch;
2039     if (batch == nullptr) continue;
2040     // Skip any batch that either (a) has already been started on this
2041     // subchannel call or (b) we can't start yet because we're still
2042     // replaying send ops that need to be completed first.
2043     // TODO(roth): Note that if any one op in the batch can't be sent
2044     // yet due to ops that we're replaying, we don't start any of the ops
2045     // in the batch.  This is probably okay, but it could conceivably
2046     // lead to increased latency in some cases -- e.g., we could delay
2047     // starting a recv op due to it being in the same batch with a send
2048     // op.  If/when we revamp the callback protocol in
2049     // transport_stream_op_batch, we may be able to fix this.
2050     if (batch->send_initial_metadata &&
2051         retry_state->started_send_initial_metadata) {
2052       continue;
2053     }
2054     if (batch->send_message && retry_state->completed_send_message_count <
2055                                    retry_state->started_send_message_count) {
2056       continue;
2057     }
2058     // Note that we only start send_trailing_metadata if we have no more
2059     // send_message ops to start, since we can't send down any more
2060     // send_message ops after send_trailing_metadata.
2061     if (batch->send_trailing_metadata &&
2062         (retry_state->started_send_message_count + batch->send_message <
2063              calld->send_messages.size() ||
2064          retry_state->started_send_trailing_metadata)) {
2065       continue;
2066     }
2067     if (batch->recv_initial_metadata &&
2068         retry_state->started_recv_initial_metadata) {
2069       continue;
2070     }
2071     if (batch->recv_message && retry_state->completed_recv_message_count <
2072                                    retry_state->started_recv_message_count) {
2073       continue;
2074     }
2075     if (batch->recv_trailing_metadata &&
2076         retry_state->started_recv_trailing_metadata) {
2077       // If we previously completed a recv_trailing_metadata op
2078       // initiated by start_internal_recv_trailing_metadata(), use the
2079       // result of that instead of trying to re-start this op.
2080       if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
2081                        nullptr)) {
2082         // If the batch completed, then trigger the completion callback
2083         // directly, so that we return the previously returned results to
2084         // the application.  Otherwise, just unref the internally
2085         // started subchannel batch, since we'll propagate the
2086         // completion when it completes.
2087         if (retry_state->completed_recv_trailing_metadata) {
2088           // Batches containing recv_trailing_metadata always succeed.
2089           closures->Add(
2090               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
2091               "re-executing recv_trailing_metadata_ready to propagate "
2092               "internally triggered result");
2093         } else {
2094           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2095         }
2096         retry_state->recv_trailing_metadata_internal_batch = nullptr;
2097       }
2098       continue;
2099     }
2100     // If we're not retrying, just send the batch as-is.
2101     if (calld->method_params == nullptr ||
2102         calld->method_params->retry_policy() == nullptr ||
2103         calld->retry_committed) {
2104       add_closure_for_subchannel_batch(elem, batch, closures);
2105       pending_batch_clear(calld, pending);
2106       continue;
2107     }
2108     // Create batch with the right number of callbacks.
2109     const bool has_send_ops = batch->send_initial_metadata ||
2110                               batch->send_message ||
2111                               batch->send_trailing_metadata;
2112     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
2113                               batch->recv_message +
2114                               batch->recv_trailing_metadata;
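    // E.g., a batch with send_initial_metadata, recv_initial_metadata, and
    // recv_message has has_send_ops == true and num_callbacks == 3: one
    // on_complete for the send ops plus the two recv callbacks.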
2115     subchannel_batch_data* batch_data = batch_data_create(
2116         elem, num_callbacks, has_send_ops /* set_on_complete */);
2117     // Cache send ops if needed.
2118     maybe_cache_send_ops_for_batch(calld, pending);
2119     // send_initial_metadata.
2120     if (batch->send_initial_metadata) {
2121       add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
2122     }
2123     // send_message.
2124     if (batch->send_message) {
2125       add_retriable_send_message_op(elem, retry_state, batch_data);
2126     }
2127     // send_trailing_metadata.
2128     if (batch->send_trailing_metadata) {
2129       add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2130     }
2131     // recv_initial_metadata.
2132     if (batch->recv_initial_metadata) {
2133       // recv_flags is only used on the server side.
2134       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2135       add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
2136     }
2137     // recv_message.
2138     if (batch->recv_message) {
2139       add_retriable_recv_message_op(calld, retry_state, batch_data);
2140     }
2141     // recv_trailing_metadata.
2142     if (batch->recv_trailing_metadata) {
2143       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2144     }
2145     add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
2146     // Track number of pending subchannel send batches.
2147     // If this is the first one, take a ref to the call stack.
2148     if (batch->send_initial_metadata || batch->send_message ||
2149         batch->send_trailing_metadata) {
2150       if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2151         GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2152       }
2153       ++calld->num_pending_retriable_subchannel_send_batches;
2154     }
2155   }
2156 }
2157
2158 // Constructs and starts whatever subchannel batches are needed on the
2159 // subchannel call.
2160 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2161   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2162   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2163   call_data* calld = static_cast<call_data*>(elem->call_data);
2164   if (grpc_client_channel_trace.enabled()) {
2165     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
2166             chand, calld);
2167   }
2168   subchannel_call_retry_state* retry_state =
2169       static_cast<subchannel_call_retry_state*>(
2170           grpc_connected_subchannel_call_get_parent_data(
2171               calld->subchannel_call));
2172   // Construct list of closures to execute, one for each pending batch.
2173   grpc_core::CallCombinerClosureList closures;
2174   // Replay previously-returned send_* ops if needed.
2175   subchannel_batch_data* replay_batch_data =
2176       maybe_create_subchannel_batch_for_replay(elem, retry_state);
2177   if (replay_batch_data != nullptr) {
2178     add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
2179                                      &closures);
2180     // Track number of pending subchannel send batches.
2181     // If this is the first one, take a ref to the call stack.
2182     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2183       GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2184     }
2185     ++calld->num_pending_retriable_subchannel_send_batches;
2186   }
2187   // Now add pending batches.
2188   add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
2189   // Start batches on subchannel call.
2190   if (grpc_client_channel_trace.enabled()) {
2191     gpr_log(GPR_INFO,
2192             "chand=%p calld=%p: starting %" PRIuPTR
2193             " retriable batches on subchannel_call=%p",
2194             chand, calld, closures.size(), calld->subchannel_call);
2195   }
2196   // Note: This will yield the call combiner.
2197   closures.RunClosures(calld->call_combiner);
2198 }
2199
2200 //
2201 // LB pick
2202 //
2203
2204 static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
2205   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2206   call_data* calld = static_cast<call_data*>(elem->call_data);
2207   const size_t parent_data_size =
2208       calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
2209   const grpc_core::ConnectedSubchannel::CallArgs call_args = {
2210       calld->pollent,                                   // pollent
2211       calld->path,                                      // path
2212       calld->call_start_time,                           // start_time
2213       calld->deadline,                                  // deadline
2214       calld->arena,                                     // arena
2215       calld->request->pick()->subchannel_call_context,  // context
2216       calld->call_combiner,                             // call_combiner
2217       parent_data_size                                  // parent_data_size
2218   };
2219   grpc_error* new_error =
2220       calld->request->pick()->connected_subchannel->CreateCall(
2221           call_args, &calld->subchannel_call);
2222   if (grpc_client_channel_trace.enabled()) {
2223     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
2224             chand, calld, calld->subchannel_call, grpc_error_string(new_error));
2225   }
2226   if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
2227     new_error = grpc_error_add_child(new_error, error);
2228     pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
2229   } else {
2230     if (parent_data_size > 0) {
2231       new (grpc_connected_subchannel_call_get_parent_data(
2232           calld->subchannel_call))
2233           subchannel_call_retry_state(
2234               calld->request->pick()->subchannel_call_context);
2235     }
2236     pending_batches_resume(elem);
2237   }
2238   GRPC_ERROR_UNREF(error);
2239 }
2240
2241 // Invoked when a pick is completed, on either success or failure.
2242 static void pick_done(void* arg, grpc_error* error) {
2243   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2244   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2245   call_data* calld = static_cast<call_data*>(elem->call_data);
2246   if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) {
2247     // Failed to create subchannel.
2248     // If there was no error, this is an LB policy drop, in which case
2249     // we return an error; otherwise, we may retry.
2250     grpc_status_code status = GRPC_STATUS_OK;
2251     grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2252                           nullptr);
2253     if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
2254         !maybe_retry(elem, nullptr /* batch_data */, status,
2255                      nullptr /* server_pushback_md */)) {
2256       grpc_error* new_error =
2257           error == GRPC_ERROR_NONE
2258               ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2259                     "Call dropped by load balancing policy")
2260               : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2261                     "Failed to create subchannel", &error, 1);
2262       if (grpc_client_channel_trace.enabled()) {
2263         gpr_log(GPR_INFO,
2264                 "chand=%p calld=%p: failed to create subchannel: error=%s",
2265                 chand, calld, grpc_error_string(new_error));
2266       }
2267       pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
2268     }
2269   } else {
2270     // Create call on subchannel.
2271     create_subchannel_call(elem, GRPC_ERROR_REF(error));
2272   }
2273 }
2274
2275 // If the channel is in TRANSIENT_FAILURE and the call is not
2276 // wait_for_ready=true, fails the call and returns true.
2277 static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
2278   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2279   call_data* calld = static_cast<call_data*>(elem->call_data);
2280   grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
2281   if (chand->request_router->GetConnectivityState() ==
2282           GRPC_CHANNEL_TRANSIENT_FAILURE &&
2283       (batch->payload->send_initial_metadata.send_initial_metadata_flags &
2284        GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
2285     pending_batches_fail(
2286         elem,
2287         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2288                                "channel is in state TRANSIENT_FAILURE"),
2289                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
2290         true /* yield_call_combiner */);
2291     return true;
2292   }
2293   return false;
2294 }
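// In effect, a call that did not request wait_for_ready fails right away
// with UNAVAILABLE while the channel is in TRANSIENT_FAILURE, instead of
// queuing until the channel recovers.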
2295
2296 // Applies service config to the call.  Must be invoked once we know
2297 // that the resolver has returned results to the channel.
2298 static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2299   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2300   call_data* calld = static_cast<call_data*>(elem->call_data);
2301   if (grpc_client_channel_trace.enabled()) {
2302     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2303             chand, calld);
2304   }
2305   if (chand->retry_throttle_data != nullptr) {
2306     calld->retry_throttle_data = chand->retry_throttle_data->Ref();
2307   }
2308   if (chand->method_params_table != nullptr) {
2309     calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2310         *chand->method_params_table, calld->path);
2311     if (calld->method_params != nullptr) {
2312       // If the deadline from the service config is shorter than the one
2313       // from the client API, reset the deadline timer.
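      // (E.g., a 30s deadline from the application combined with a 10s
      // per-method timeout from the service config yields an effective
      // deadline of call_start_time + 10s.)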
2314       if (chand->deadline_checking_enabled &&
2315           calld->method_params->timeout() != 0) {
2316         const grpc_millis per_method_deadline =
2317             grpc_timespec_to_millis_round_up(calld->call_start_time) +
2318             calld->method_params->timeout();
2319         if (per_method_deadline < calld->deadline) {
2320           calld->deadline = per_method_deadline;
2321           grpc_deadline_state_reset(elem, calld->deadline);
2322         }
2323       }
2324       // If the service config set wait_for_ready and the application
2325       // did not explicitly set it, use the value from the service config.
2326       uint32_t* send_initial_metadata_flags =
2327           &calld->pending_batches[0]
2328                .batch->payload->send_initial_metadata
2329                .send_initial_metadata_flags;
2330       if (GPR_UNLIKELY(
2331               calld->method_params->wait_for_ready() !=
2332                   ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2333               !(*send_initial_metadata_flags &
2334                 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2335         if (calld->method_params->wait_for_ready() ==
2336             ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2337           *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2338         } else {
2339           *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2340         }
2341       }
2342     }
2343   }
2344   // If no retry policy, disable retries.
2345   // TODO(roth): Remove this when adding support for transparent retries.
2346   if (calld->method_params == nullptr ||
2347       calld->method_params->retry_policy() == nullptr) {
2348     calld->enable_retries = false;
2349   }
2350 }

// Invoked once resolver results are available.  Returns true if the pick
// should proceed; returns false if the call has already been failed
// (i.e., the channel is in TRANSIENT_FAILURE and wait_for_ready is unset).
static bool maybe_apply_service_config_to_call_locked(void* arg) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Only get service config data on the first attempt.
  if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
    apply_service_config_to_call_locked(elem);
    // Check this after applying service config, since it may have
    // affected the call's wait_for_ready value.
    if (fail_call_if_in_transient_failure(elem)) return false;
  }
  return true;
}

static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(!calld->have_request);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  // Normally, we do not want to do this check until after we've processed
  // the service config, so that we can honor the wait_for_ready setting in
  // the service config.  However, if the channel is in TRANSIENT_FAILURE
  // and we don't have an LB policy at this point, that means that the
  // resolver has returned a failure, so we're not going to get a service
  // config right away.  In that case, we fail the call now based on the
  // wait_for_ready value passed in from the application.
  if (chand->request_router->lb_policy() == nullptr &&
      fail_call_if_in_transient_failure(elem)) {
    return;
  }
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch.  The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by get_batch_index() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry?  We don't want
  // the new metadata to be added twice.  We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  grpc_metadata_batch* initial_metadata =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  uint32_t* initial_metadata_flags =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata_flags
          : &calld->pending_batches[0]
                 .batch->payload->send_initial_metadata
                 .send_initial_metadata_flags;
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent,
                      initial_metadata, initial_metadata_flags,
                      maybe_apply_service_config_to_call_locked, elem,
                      &calld->pick_closure);
  calld->have_request = true;
  chand->request_router->RouteCallLocked(calld->request.get());
}

//
// filter call vtable functions
//

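// Entry point for batches passed down from the surface.  Handles
// cancellation, saves the batch as pending, and then either resumes the
// batch on an existing subchannel call or enters the channel combiner to
// start a pick.  Invoked while holding the call combiner.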
static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  new (elem->call_data) call_data(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    // Hand then_schedule_closure off to the subchannel call, so that it
    // is not invoked until the subchannel call has finished cleaning up.
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
  }
  calld->~call_data();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

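// Initializer order follows the grpc_channel_filter struct: per-call
// hooks and call_data size first, then channel_data size and
// channel-level hooks, then get_channel_info and the filter's name.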
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

void grpc_client_channel_set_channelz_node(
    grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  chand->request_router->set_channelz_node(node);
}

void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem,
    grpc_core::channelz::ChildRefsList* child_subchannels,
    grpc_core::channelz::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->request_router->lb_policy() != nullptr) {
    chand->request_router->lb_policy()->FillChildRefsForChannelz(
        child_subchannels, child_channels);
  }
}

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  chand->request_router->ExitIdleLocked();
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

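// Returns the channel's current connectivity state.  If try_to_connect is
// nonzero and the channel is IDLE, also kicks the channel out of idle by
// scheduling try_to_connect_locked() in the channel combiner.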
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out = chand->request_router->GetConnectivityState();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

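// Tracks one external (application-requested) connectivity watch.
// Watchers form an intrusive singly-linked list headed at
// chand->external_connectivity_watcher_list_head and guarded by
// chand->external_connectivity_watcher_list_mu; the on_complete closure
// doubles as the key used to look a watcher up for cancellation.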
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

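// Returns the number of external connectivity watchers currently
// registered on the channel; primarily useful for tests and diagnostics.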
int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // We assume that watcher_timer_init is scheduled on the exec ctx
    // scheduler, so that GRPC_CLOSURE_RUN runs it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    w->chand->request_router->NotifyOnConnectivityStateChange(w->state,
                                                              &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      found->chand->request_router->NotifyOnConnectivityStateChange(
          nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

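// Starts (state != nullptr) or cancels (state == nullptr) an external
// connectivity watch keyed by the on_complete closure.  The actual work
// happens in the channel combiner via watch_connectivity_state_locked()
// above.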
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}