/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ClientChannelMethodParamsTable;
using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;

using grpc_core::LoadBalancingPolicy;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
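
// Illustrative sketch (editor's note, not part of this file's logic):
// callers can override this limit at channel-creation time via the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg, which is read in
// cc_init_channel_elem() below.  Something like:
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
//       512 << 10);  // Allow buffering up to 512 KiB per RPC.
//   grpc_channel_args args = {1, &arg};
//   // ...then pass &args when creating the channel.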

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
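
// Rough sketch of the resulting delay (a hedged summary; see
// src/core/lib/backoff/backoff.h for the authoritative behavior): the base
// delay starts at the retry policy's initial_backoff, is multiplied by
// backoff_multiplier after each attempt, and is capped at max_backoff; the
// actual delay is then drawn uniformly from
//   [base * (1 - RETRY_BACKOFF_JITTER), base * (1 + RETRY_BACKOFF_JITTER)].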

grpc_core::TraceFlag grpc_client_channel_call_trace(false,
                                                    "client_channel_call");
grpc_core::TraceFlag grpc_client_channel_routing_trace(
    false, "client_channel_routing");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

struct QueuedPick {
  LoadBalancingPolicy::PickArgs pick;
  grpc_call_element* elem;
  QueuedPick* next = nullptr;
};

typedef struct client_channel_channel_data {
  bool deadline_checking_enabled;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;
  // Client channel factory.
  grpc_core::ClientChannelFactory* client_channel_factory;
  // Subchannel pool.
  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;

  grpc_core::channelz::ClientChannelNode* channelz_node;

  // Resolving LB policy.
  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
  // Subchannel picker from LB policy.
  grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
  // Linked list of queued picks.
  QueuedPick* queued_picks;

  bool have_service_config;
  /** retry throttle data from service config */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** per-method service config data */
  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  grpc_core::UniquePtr<char> info_service_config_json;

  grpc_connectivity_state_tracker state_tracker;
  grpc_error* disconnect_error;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;
} channel_data;

// Forward declarations.
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem);

static const char* get_channel_connectivity_state_change_string(
    grpc_connectivity_state state) {
  switch (state) {
    case GRPC_CHANNEL_IDLE:
      return "Channel state change to IDLE";
    case GRPC_CHANNEL_CONNECTING:
      return "Channel state change to CONNECTING";
    case GRPC_CHANNEL_READY:
      return "Channel state change to READY";
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      return "Channel state change to TRANSIENT_FAILURE";
    case GRPC_CHANNEL_SHUTDOWN:
      return "Channel state change to SHUTDOWN";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

static void set_connectivity_state_and_picker_locked(
    channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
    const char* reason,
    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Update connectivity state.
  grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
                              reason);
  if (chand->channelz_node != nullptr) {
    chand->channelz_node->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            get_channel_connectivity_state_change_string(state)));
  }
  // Update picker.
  chand->picker = std::move(picker);
  // Re-process queued picks.
  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
       pick = pick->next) {
    start_pick_locked(pick->elem, GRPC_ERROR_NONE);
  }
}

namespace grpc_core {
namespace {

class ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(channel_data* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack,
                             "ClientChannelControlHelper");
  }

  Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
    grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
        chand_->subchannel_pool.get());
    grpc_channel_args* new_args =
        grpc_channel_args_copy_and_add(&args, &arg, 1);
    Subchannel* subchannel =
        chand_->client_channel_factory->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    return subchannel;
  }

  grpc_channel* CreateChannel(const char* target,
                              const grpc_channel_args& args) override {
    return chand_->client_channel_factory->CreateChannel(target, &args);
  }

  void UpdateState(
      grpc_connectivity_state state, grpc_error* state_error,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    if (grpc_client_channel_routing_trace.enabled()) {
      const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
              chand_, grpc_connectivity_state_name(state),
              grpc_error_string(state_error), picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (chand_->disconnect_error == GRPC_ERROR_NONE) {
      set_connectivity_state_and_picker_locked(chand_, state, state_error,
                                               "helper", std::move(picker));
    } else {
      GRPC_ERROR_UNREF(state_error);
    }
  }

  // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
  void RequestReresolution() override {}

 private:
  channel_data* chand_;
};

}  // namespace
}  // namespace grpc_core

// Synchronous callback from chand->resolving_lb_policy to process a resolver
// result update.
static bool process_resolver_result_locked(
    void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
    grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
  channel_data* chand = static_cast<channel_data*>(arg);
  chand->have_service_config = true;
  ProcessedResolverResult resolver_result(result, chand->enable_retries);
  grpc_core::UniquePtr<char> service_config_json =
      resolver_result.service_config_json();
  if (grpc_client_channel_routing_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
            chand, service_config_json.get());
  }
  // Update channel state.
  chand->retry_throttle_data = resolver_result.retry_throttle_data();
  chand->method_params_table = resolver_result.method_params_table();
  // Swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  chand->info_lb_policy_name = resolver_result.lb_policy_name();
  const bool service_config_changed =
      ((service_config_json == nullptr) !=
       (chand->info_service_config_json == nullptr)) ||
      (service_config_json != nullptr &&
       strcmp(service_config_json.get(),
              chand->info_service_config_json.get()) != 0);
  chand->info_service_config_json = std::move(service_config_json);
  gpr_mu_unlock(&chand->info_mu);
  // Return results.
  *lb_policy_name = chand->info_lb_policy_name.get();
  *lb_policy_config = resolver_result.lb_policy_config();
  // Apply service config to queued picks.
  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
       pick = pick->next) {
    maybe_apply_service_config_to_call_locked(pick->elem);
  }
  return service_config_changed;
}
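
// Illustrative example (an assumption about typical resolver output, not
// normative for this file): a service config JSON carrying a per-method
// retry policy, as consumed by ProcessedResolverResult above, might look
// like:
//   {
//     "methodConfig": [{
//       "name": [{"service": "my.pkg.MyService", "method": "MyMethod"}],
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }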

static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) {
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_connectivity_state state =
      grpc_connectivity_state_get(&chand->state_tracker, &error);
  if (state != GRPC_CHANNEL_READY) {
    grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "channel not connected", &error, 1);
    GRPC_ERROR_UNREF(error);
    return new_error;
  }
  LoadBalancingPolicy::PickArgs pick;
  chand->picker->Pick(&pick, &error);
  if (pick.connected_subchannel != nullptr) {
    pick.connected_subchannel->Ping(op->send_ping.on_initiate,
                                    op->send_ping.on_ack);
  } else {
    if (error == GRPC_ERROR_NONE) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return error;
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = do_ping_locked(chand, op);
    if (error != GRPC_ERROR_NONE) {
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->reset_connect_backoff) {
    chand->resolving_lb_policy->ResetBackoffLocked();
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    chand->disconnect_error = op->disconnect_with_error;
    grpc_pollset_set_del_pollset_set(
        chand->resolving_lb_policy->interested_parties(),
        chand->interested_parties);
    chand->resolving_lb_policy.reset();
    set_connectivity_state_and_picker_locked(
        chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
        "shutdown from API",
        grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
            grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>(
                GRPC_ERROR_REF(op->disconnect_with_error))));
  }

  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json.get());
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  chand->disconnect_error = GRPC_ERROR_NONE;
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  chand->interested_parties = grpc_pollset_set_create();
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = static_cast<size_t>(
      grpc_channel_arg_get_integer(
          arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  chand->client_channel_factory =
      grpc_core::ClientChannelFactory::GetFromChannelArgs(args->channel_args);
  if (chand->client_channel_factory == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  grpc_core::UniquePtr<char> target_uri(
      proxy_name != nullptr ? proxy_name : gpr_strdup(arg->value.string));
  // Instantiate subchannel pool.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL);
  if (grpc_channel_arg_get_bool(arg, false)) {
    chand->subchannel_pool =
        grpc_core::MakeRefCounted<grpc_core::LocalSubchannelPool>();
  } else {
    chand->subchannel_pool = grpc_core::GlobalSubchannelPool::instance();
  }
  // Instantiate resolving LB policy.
  LoadBalancingPolicy::Args lb_args;
  lb_args.combiner = chand->combiner;
  lb_args.channel_control_helper =
      grpc_core::UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
          grpc_core::New<grpc_core::ClientChannelControlHelper>(chand));
  lb_args.args = new_args != nullptr ? new_args : args->channel_args;
  grpc_error* error = GRPC_ERROR_NONE;
  chand->resolving_lb_policy.reset(
      grpc_core::New<grpc_core::ResolvingLoadBalancingPolicy>(
          std::move(lb_args), &grpc_client_channel_routing_trace,
          std::move(target_uri), process_resolver_result_locked, chand,
          &error));
  grpc_channel_args_destroy(new_args);
  if (error != GRPC_ERROR_NONE) {
    // Orphan the resolving LB policy and flush the exec_ctx to ensure
    // that it finishes shutting down.  This ensures that if we are
    // failing, we destroy the ClientChannelControlHelper (and thus
    // unref the channel stack) before we return.
    // TODO(roth): This is not a complete solution, because it only
    // catches the case where channel stack initialization fails in this
    // particular filter.  If there is a failure in a different filter, we
    // will leave a dangling ref here, which can cause a crash.  Fortunately,
    // in practice, there are no other filters that can cause failures in
    // channel stack initialization, so this works for now.
    chand->resolving_lb_policy.reset();
    grpc_core::ExecCtx::Get()->Flush();
  } else {
    grpc_pollset_set_add_pollset_set(
        chand->resolving_lb_policy->interested_parties(),
        chand->interested_parties);
    if (grpc_client_channel_routing_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", chand,
              chand->resolving_lb_policy.get());
    }
  }
  return error;
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolving_lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(
        chand->resolving_lb_policy->interested_parties(),
        chand->interested_parties);
    chand->resolving_lb_policy.reset();
  }
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->picker.reset();
  chand->subchannel_pool.reset();
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  GRPC_ERROR_UNREF(chand->disconnect_error);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
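//
// Rough sketch of that flow in terms of this file's own helpers (an
// illustrative summary of the description above, not additional behavior):
//   surface batch
//     -> pending_batches_add()            // parked in calld->pending_batches
//     -> maybe_cache_send_ops_for_batch() // send-op data copied for replay
//     -> start_retriable_subchannel_batches()
//          -> subchannel_batch_data ("child" batch) -> subchannel call
//   on failure: maybe_retry() -> do_retry() -> new pick, after which the
//   cached send ops are replayed on the new subchannel call.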

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

namespace grpc_core {
namespace {
class QueuedPickCanceller;
}  // namespace
}  // namespace grpc_core

namespace {

struct call_data;

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
struct subchannel_batch_data {
  subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount,
                        bool set_on_complete);
  // All dtor code must be added in `destroy`. This is because we may
  // call closures in `subchannel_batch_data` after they are unrefed by
  // `batch_data_unref`, and msan would complain about accessing this class
  // after calling dtor. As a result we cannot call the `dtor` in
  // `batch_data_unref`.
  // TODO(soheil): We should try to call the dtor in `batch_data_unref`.
  ~subchannel_batch_data() { destroy(); }
  void destroy();

  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
};

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
struct subchannel_call_retry_state {
  explicit subchannel_call_retry_state(grpc_call_context_element* context)
      : batch_payload(context),
        started_send_initial_metadata(false),
        completed_send_initial_metadata(false),
        started_send_trailing_metadata(false),
        completed_send_trailing_metadata(false),
        started_recv_initial_metadata(false),
        completed_recv_initial_metadata(false),
        started_recv_trailing_metadata(false),
        completed_recv_trailing_metadata(false),
        retry_dispatched(false) {}

  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available = false;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count = 0;
  size_t completed_send_message_count = 0;
  size_t started_recv_message_count = 0;
  size_t completed_recv_message_count = 0;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
  grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
  subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
  grpc_error* recv_message_error = GRPC_ERROR_NONE;
  subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
  // NOTE: Do not move this next to the metadata bitfields above.  That
  //       would save space, but it would also introduce a data race: the
  //       compiler would generate a 2-byte store that overwrites the
  //       metadata bitfields when setting this field.
  bool retry_dispatched : 1;
};

// Pending batches stored in call data.
struct pending_batch {
  // The pending batch.  If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
};

/** Call data.  Holds a pointer to SubchannelCall and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
struct call_data {
  call_data(grpc_call_element* elem, const channel_data& chand,
            const grpc_call_element_args& args)
      : deadline_state(elem, args.call_stack, args.call_combiner,
                       GPR_LIKELY(chand.deadline_checking_enabled)
                           ? args.deadline
                           : GRPC_MILLIS_INF_FUTURE),
        path(grpc_slice_ref_internal(args.path)),
        call_start_time(args.start_time),
        deadline(args.deadline),
        arena(args.arena),
        owning_call(args.call_stack),
        call_combiner(args.call_combiner),
        call_context(args.context),
        pending_send_initial_metadata(false),
        pending_send_message(false),
        pending_send_trailing_metadata(false),
        enable_retries(chand.enable_retries),
        retry_committed(false),
        last_attempt_got_server_pushback(false) {}

  ~call_data() {
    grpc_slice_unref_internal(path);
    GRPC_ERROR_UNREF(cancel_error);
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
      GPR_ASSERT(pending_batches[i].batch == nullptr);
    }
  }

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;
  grpc_call_context_element* call_context;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error = GRPC_ERROR_NONE;

  QueuedPick pick;
  bool pick_queued = false;
  bool service_config_applied = false;
  grpc_core::QueuedPickCanceller* pick_canceller = nullptr;
  grpc_closure pick_closure;

  grpc_polling_entity* pollent = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed = 0;
  size_t bytes_buffered_for_retry = 0;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata = false;
  grpc_linked_mdelem* send_initial_metadata_storage = nullptr;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata = false;
  grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
  grpc_metadata_batch send_trailing_metadata;
};

}  // namespace

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void remove_call_from_queued_picks_locked(grpc_call_element* elem);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage =
        static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  calld->send_messages[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// LB recv_trailing_metadata_ready handling
//

void maybe_inject_recv_trailing_metadata_ready_for_lb(
    const LoadBalancingPolicy::PickArgs& pick,
    grpc_transport_stream_op_batch* batch) {
  if (pick.recv_trailing_metadata_ready != nullptr) {
    *pick.original_recv_trailing_metadata_ready =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        pick.recv_trailing_metadata_ready;
    if (pick.recv_trailing_metadata != nullptr) {
      *pick.recv_trailing_metadata =
          batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    }
  }
}
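
// Usage note (a summary of behavior visible in this file, not new logic):
// this injection is applied to every batch containing recv_trailing_metadata
// -- in pending_batches_fail() and pending_batches_resume() below -- so that
// an LB policy that supplied recv_trailing_metadata_ready in its pick gets
// its callback chained ahead of the original one and can observe the call's
// final status and trailing metadata.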

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return static_cast<size_t>(-1));
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
                     chand->per_rpc_retry_buffer_size)) {
      if (grpc_client_channel_call_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    calld->subchannel_call->GetParentData());
1011       retry_commit(elem, retry_state);
1012       // If we are not going to retry and have not yet started, pretend
1013       // retries are disabled so that we don't bother with retry overhead.
1014       if (calld->num_attempts_completed == 0) {
1015         if (grpc_client_channel_call_trace.enabled()) {
1016           gpr_log(GPR_INFO,
1017                   "chand=%p calld=%p: disabling retries before first attempt",
1018                   chand, calld);
1019         }
1020         calld->enable_retries = false;
1021       }
1022     }
1023   }
1024 }
1025
1026 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
1027   if (calld->enable_retries) {
1028     if (pending->batch->send_initial_metadata) {
1029       calld->pending_send_initial_metadata = false;
1030     }
1031     if (pending->batch->send_message) {
1032       calld->pending_send_message = false;
1033     }
1034     if (pending->batch->send_trailing_metadata) {
1035       calld->pending_send_trailing_metadata = false;
1036     }
1037   }
1038   pending->batch = nullptr;
1039 }
1040
1041 // This is called via the call combiner, so access to calld is synchronized.
1042 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
1043   grpc_transport_stream_op_batch* batch =
1044       static_cast<grpc_transport_stream_op_batch*>(arg);
1045   call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
1046   // Note: This will release the call combiner.
1047   grpc_transport_stream_op_batch_finish_with_failure(
1048       batch, GRPC_ERROR_REF(error), calld->call_combiner);
1049 }
1050
1051 // This is called via the call combiner, so access to calld is synchronized.
1052 // If yield_call_combiner_predicate returns true, assumes responsibility for
1053 // yielding the call combiner.
1054 typedef bool (*YieldCallCombinerPredicate)(
1055     const grpc_core::CallCombinerClosureList& closures);
1056 static bool yield_call_combiner(
1057     const grpc_core::CallCombinerClosureList& closures) {
1058   return true;
1059 }
1060 static bool no_yield_call_combiner(
1061     const grpc_core::CallCombinerClosureList& closures) {
1062   return false;
1063 }
1064 static bool yield_call_combiner_if_pending_batches_found(
1065     const grpc_core::CallCombinerClosureList& closures) {
1066   return closures.size() > 0;
1067 }
1068 static void pending_batches_fail(
1069     grpc_call_element* elem, grpc_error* error,
1070     YieldCallCombinerPredicate yield_call_combiner_predicate) {
1071   GPR_ASSERT(error != GRPC_ERROR_NONE);
1072   call_data* calld = static_cast<call_data*>(elem->call_data);
1073   if (grpc_client_channel_call_trace.enabled()) {
1074     size_t num_batches = 0;
1075     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1076       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1077     }
1078     gpr_log(GPR_INFO,
1079             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1080             elem->channel_data, calld, num_batches, grpc_error_string(error));
1081   }
1082   grpc_core::CallCombinerClosureList closures;
1083   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1084     pending_batch* pending = &calld->pending_batches[i];
1085     grpc_transport_stream_op_batch* batch = pending->batch;
1086     if (batch != nullptr) {
1087       if (batch->recv_trailing_metadata) {
1088         maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
1089                                                          batch);
1090       }
1091       batch->handler_private.extra_arg = calld;
1092       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1093                         fail_pending_batch_in_call_combiner, batch,
1094                         grpc_schedule_on_exec_ctx);
1095       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1096                    "pending_batches_fail");
1097       pending_batch_clear(calld, pending);
1098     }
1099   }
1100   if (yield_call_combiner_predicate(closures)) {
1101     closures.RunClosures(calld->call_combiner);
1102   } else {
1103     closures.RunClosuresWithoutYielding(calld->call_combiner);
1104   }
1105   GRPC_ERROR_UNREF(error);
1106 }
1107
1108 // This is called via the call combiner, so access to calld is synchronized.
1109 static void resume_pending_batch_in_call_combiner(void* arg,
1110                                                   grpc_error* ignored) {
1111   grpc_transport_stream_op_batch* batch =
1112       static_cast<grpc_transport_stream_op_batch*>(arg);
1113   grpc_core::SubchannelCall* subchannel_call =
1114       static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
1115   // Note: This will release the call combiner.
1116   subchannel_call->StartTransportStreamOpBatch(batch);
1117 }
1118
1119 // This is called via the call combiner, so access to calld is synchronized.
1120 static void pending_batches_resume(grpc_call_element* elem) {
1121   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1122   call_data* calld = static_cast<call_data*>(elem->call_data);
1123   if (calld->enable_retries) {
1124     start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
1125     return;
1126   }
1127   // Retries not enabled; send down batches as-is.
1128   if (grpc_client_channel_call_trace.enabled()) {
1129     size_t num_batches = 0;
1130     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1131       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1132     }
1133     gpr_log(GPR_INFO,
1134             "chand=%p calld=%p: starting %" PRIuPTR
1135             " pending batches on subchannel_call=%p",
1136             chand, calld, num_batches, calld->subchannel_call.get());
1137   }
1138   grpc_core::CallCombinerClosureList closures;
1139   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1140     pending_batch* pending = &calld->pending_batches[i];
1141     grpc_transport_stream_op_batch* batch = pending->batch;
1142     if (batch != nullptr) {
1143       if (batch->recv_trailing_metadata) {
1144         maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
1145                                                          batch);
1146       }
1147       batch->handler_private.extra_arg = calld->subchannel_call.get();
1148       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1149                         resume_pending_batch_in_call_combiner, batch,
1150                         grpc_schedule_on_exec_ctx);
1151       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1152                    "pending_batches_resume");
1153       pending_batch_clear(calld, pending);
1154     }
1155   }
1156   // Note: This will release the call combiner.
1157   closures.RunClosures(calld->call_combiner);
1158 }
1159
1160 static void maybe_clear_pending_batch(grpc_call_element* elem,
1161                                       pending_batch* pending) {
1162   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1163   call_data* calld = static_cast<call_data*>(elem->call_data);
1164   grpc_transport_stream_op_batch* batch = pending->batch;
1165   // We clear the pending batch if all of its callbacks have been
1166   // scheduled and reset to nullptr.
1167   if (batch->on_complete == nullptr &&
1168       (!batch->recv_initial_metadata ||
1169        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1170            nullptr) &&
1171       (!batch->recv_message ||
1172        batch->payload->recv_message.recv_message_ready == nullptr) &&
1173       (!batch->recv_trailing_metadata ||
1174        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1175            nullptr)) {
1176     if (grpc_client_channel_call_trace.enabled()) {
1177       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1178               calld);
1179     }
1180     pending_batch_clear(calld, pending);
1181   }
1182 }
1183
1184 // Returns a pointer to the first pending batch for which predicate(batch)
1185 // returns true, or null if not found.
1186 template <typename Predicate>
1187 static pending_batch* pending_batch_find(grpc_call_element* elem,
1188                                          const char* log_message,
1189                                          Predicate predicate) {
1190   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1191   call_data* calld = static_cast<call_data*>(elem->call_data);
1192   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1193     pending_batch* pending = &calld->pending_batches[i];
1194     grpc_transport_stream_op_batch* batch = pending->batch;
1195     if (batch != nullptr && predicate(batch)) {
1196       if (grpc_client_channel_call_trace.enabled()) {
1197         gpr_log(GPR_INFO,
1198                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1199                 calld, log_message, i);
1200       }
1201       return pending;
1202     }
1203   }
1204   return nullptr;
1205 }
1206
1207 //
1208 // retry code
1209 //
1210
1211 // Commits the call so that no further retry attempts will be performed.
1212 static void retry_commit(grpc_call_element* elem,
1213                          subchannel_call_retry_state* retry_state) {
1214   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1215   call_data* calld = static_cast<call_data*>(elem->call_data);
1216   if (calld->retry_committed) return;
1217   calld->retry_committed = true;
1218   if (grpc_client_channel_call_trace.enabled()) {
1219     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
1220   }
1221   if (retry_state != nullptr) {
1222     free_cached_send_op_data_after_commit(elem, retry_state);
1223   }
1224 }
1225
1226 // Starts a retry after appropriate back-off.
1227 static void do_retry(grpc_call_element* elem,
1228                      subchannel_call_retry_state* retry_state,
1229                      grpc_millis server_pushback_ms) {
1230   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1231   call_data* calld = static_cast<call_data*>(elem->call_data);
1232   GPR_ASSERT(calld->method_params != nullptr);
1233   const ClientChannelMethodParams::RetryPolicy* retry_policy =
1234       calld->method_params->retry_policy();
1235   GPR_ASSERT(retry_policy != nullptr);
1236   // Reset subchannel call and connected subchannel.
1237   calld->subchannel_call.reset();
1238   calld->pick.pick.connected_subchannel.reset();
1239   // Compute backoff delay.
1240   grpc_millis next_attempt_time;
1241   if (server_pushback_ms >= 0) {
1242     next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1243     calld->last_attempt_got_server_pushback = true;
1244   } else {
1245     if (calld->num_attempts_completed == 1 ||
1246         calld->last_attempt_got_server_pushback) {
1247       calld->retry_backoff.Init(
1248           grpc_core::BackOff::Options()
1249               .set_initial_backoff(retry_policy->initial_backoff)
1250               .set_multiplier(retry_policy->backoff_multiplier)
1251               .set_jitter(RETRY_BACKOFF_JITTER)
1252               .set_max_backoff(retry_policy->max_backoff));
1253       calld->last_attempt_got_server_pushback = false;
1254     }
1255     next_attempt_time = calld->retry_backoff->NextAttemptTime();
1256   }
1257   if (grpc_client_channel_call_trace.enabled()) {
1258     gpr_log(GPR_INFO,
1259             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
1260             calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1261   }
1262   // Schedule retry after computed delay.
1263   GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1264                     grpc_combiner_scheduler(chand->combiner));
1265   grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1266   // Update bookkeeping.
1267   if (retry_state != nullptr) retry_state->retry_dispatched = true;
1268 }
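
// Worked example of the delay computation above, assuming a hypothetical
// policy of initial_backoff=1s, backoff_multiplier=2, max_backoff=10s and
// ignoring jitter: the delays before successive retry attempts would be
// roughly 1s, 2s, 4s, 8s, 10s, 10s, ...  A non-negative server push-back
// value bypasses this computation entirely and causes the backoff state to
// be re-initialized on the next attempt.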
1269
1270 // Returns true if the call is being retried.
1271 static bool maybe_retry(grpc_call_element* elem,
1272                         subchannel_batch_data* batch_data,
1273                         grpc_status_code status,
1274                         grpc_mdelem* server_pushback_md) {
1275   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1276   call_data* calld = static_cast<call_data*>(elem->call_data);
1277   // Get retry policy.
1278   if (calld->method_params == nullptr) return false;
1279   const ClientChannelMethodParams::RetryPolicy* retry_policy =
1280       calld->method_params->retry_policy();
1281   if (retry_policy == nullptr) return false;
1282   // If we've already dispatched a retry from this call, return true.
1283   // This catches the case where the batch has multiple callbacks
1284   // (i.e., it includes either recv_message or recv_initial_metadata).
1285   subchannel_call_retry_state* retry_state = nullptr;
1286   if (batch_data != nullptr) {
1287     retry_state = static_cast<subchannel_call_retry_state*>(
1288         batch_data->subchannel_call->GetParentData());
1289     if (retry_state->retry_dispatched) {
1290       if (grpc_client_channel_call_trace.enabled()) {
1291         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
1292                 calld);
1293       }
1294       return true;
1295     }
1296   }
1297   // Check status.
1298   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
1299     if (calld->retry_throttle_data != nullptr) {
1300       calld->retry_throttle_data->RecordSuccess();
1301     }
1302     if (grpc_client_channel_call_trace.enabled()) {
1303       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
1304     }
1305     return false;
1306   }
1307   // Status is not OK.  Check whether the status is retryable.
1308   if (!retry_policy->retryable_status_codes.Contains(status)) {
1309     if (grpc_client_channel_call_trace.enabled()) {
1310       gpr_log(GPR_INFO,
1311               "chand=%p calld=%p: status %s not configured as retryable", chand,
1312               calld, grpc_status_code_to_string(status));
1313     }
1314     return false;
1315   }
1316   // Record the failure and check whether retries are throttled.
1317   // Note that it's important for this check to come after the status
1318   // code check above, since we should only record failures whose statuses
1319   // match the configured retryable status codes, so that we don't count
1320   // things like failures due to malformed requests (INVALID_ARGUMENT).
1321   // Conversely, it's important for this to come before the remaining
1322   // checks, so that we don't fail to record failures due to other factors.
1323   if (calld->retry_throttle_data != nullptr &&
1324       !calld->retry_throttle_data->RecordFailure()) {
1325     if (grpc_client_channel_call_trace.enabled()) {
1326       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
1327     }
1328     return false;
1329   }
1330   // Check whether the call is committed.
1331   if (calld->retry_committed) {
1332     if (grpc_client_channel_call_trace.enabled()) {
1333       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
1334               calld);
1335     }
1336     return false;
1337   }
1338   // Check whether we have retries remaining.
1339   ++calld->num_attempts_completed;
1340   if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1341     if (grpc_client_channel_call_trace.enabled()) {
1342       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
1343               calld, retry_policy->max_attempts);
1344     }
1345     return false;
1346   }
1347   // If the call was cancelled from the surface, don't retry.
1348   if (calld->cancel_error != GRPC_ERROR_NONE) {
1349     if (grpc_client_channel_call_trace.enabled()) {
1350       gpr_log(GPR_INFO,
1351               "chand=%p calld=%p: call cancelled from surface, not retrying",
1352               chand, calld);
1353     }
1354     return false;
1355   }
1356   // Check server push-back.
1357   grpc_millis server_pushback_ms = -1;
1358   if (server_pushback_md != nullptr) {
1359     // If the value is "-1" or any other unparseable string, we do not retry.
1360     uint32_t ms;
1361     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1362       if (grpc_client_channel_call_trace.enabled()) {
1363         gpr_log(GPR_INFO,
1364                 "chand=%p calld=%p: not retrying due to server push-back",
1365                 chand, calld);
1366       }
1367       return false;
1368     } else {
1369       if (grpc_client_channel_call_trace.enabled()) {
1370         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1371                 chand, calld, ms);
1372       }
1373       server_pushback_ms = (grpc_millis)ms;
1374     }
1375   }
1376   do_retry(elem, retry_state, server_pushback_ms);
1377   return true;
1378 }
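
// Note on push-back: server_pushback_md is the "grpc-retry-pushback-ms"
// trailing-metadata element defined in gRFC A6.  For example, a server
// sending "grpc-retry-pushback-ms: 2000" asks the client to delay the next
// attempt by 2 seconds, while an unparseable value such as "-1" suppresses
// the retry altogether.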
1379
1380 //
1381 // subchannel_batch_data
1382 //
1383
1384 namespace {
1385
1386 subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
1387                                              call_data* calld, int refcount,
1388                                              bool set_on_complete)
1389     : elem(elem), subchannel_call(calld->subchannel_call) {
1390   subchannel_call_retry_state* retry_state =
1391       static_cast<subchannel_call_retry_state*>(
1392           calld->subchannel_call->GetParentData());
1393   batch.payload = &retry_state->batch_payload;
1394   gpr_ref_init(&refs, refcount);
1395   if (set_on_complete) {
1396     GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this,
1397                       grpc_schedule_on_exec_ctx);
1398     batch.on_complete = &on_complete;
1399   }
1400   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
1401 }
1402
1403 void subchannel_batch_data::destroy() {
1404   subchannel_call_retry_state* retry_state =
1405       static_cast<subchannel_call_retry_state*>(
1406           subchannel_call->GetParentData());
1407   if (batch.send_initial_metadata) {
1408     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
1409   }
1410   if (batch.send_trailing_metadata) {
1411     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
1412   }
1413   if (batch.recv_initial_metadata) {
1414     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
1415   }
1416   if (batch.recv_trailing_metadata) {
1417     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
1418   }
1419   subchannel_call.reset();
1420   call_data* calld = static_cast<call_data*>(elem->call_data);
1421   GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
1422 }
1423
1424 }  // namespace
1425
1426 // Creates a subchannel_batch_data object on the call's arena with the
1427 // specified refcount.  If set_on_complete is true, the batch's
1428 // on_complete callback will be set to point to on_complete();
1429 // otherwise, the batch's on_complete callback will be null.
1430 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
1431                                                 int refcount,
1432                                                 bool set_on_complete) {
1433   call_data* calld = static_cast<call_data*>(elem->call_data);
1434   subchannel_batch_data* batch_data =
1435       new (gpr_arena_alloc(calld->arena, sizeof(*batch_data)))
1436           subchannel_batch_data(elem, calld, refcount, set_on_complete);
1437   return batch_data;
1438 }
1439
1440 static void batch_data_unref(subchannel_batch_data* batch_data) {
1441   if (gpr_unref(&batch_data->refs)) {
1442     batch_data->destroy();
1443   }
1444 }
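
// Sketch of the refcounting convention, as derived from the call sites
// below: batch_data_create() is given one ref per callback that will
// eventually fire for the batch -- on_complete if the batch contains send
// ops, plus one for each recv_*_ready callback -- and each callback drops
// its ref via batch_data_unref() when it is done with the batch, so the
// last callback to finish destroys it.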
1445
1446 //
1447 // recv_initial_metadata callback handling
1448 //
1449
1450 // Invokes recv_initial_metadata_ready for a subchannel batch.
1451 static void invoke_recv_initial_metadata_callback(void* arg,
1452                                                   grpc_error* error) {
1453   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1454   // Find pending batch.
1455   pending_batch* pending = pending_batch_find(
1456       batch_data->elem, "invoking recv_initial_metadata_ready for",
1457       [](grpc_transport_stream_op_batch* batch) {
1458         return batch->recv_initial_metadata &&
1459                batch->payload->recv_initial_metadata
1460                        .recv_initial_metadata_ready != nullptr;
1461       });
1462   GPR_ASSERT(pending != nullptr);
1463   // Return metadata.
1464   subchannel_call_retry_state* retry_state =
1465       static_cast<subchannel_call_retry_state*>(
1466           batch_data->subchannel_call->GetParentData());
1467   grpc_metadata_batch_move(
1468       &retry_state->recv_initial_metadata,
1469       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1470   // Update bookkeeping.
1471   // Note: Need to do this before invoking the callback, since invoking
1472   // the callback will result in yielding the call combiner.
1473   grpc_closure* recv_initial_metadata_ready =
1474       pending->batch->payload->recv_initial_metadata
1475           .recv_initial_metadata_ready;
1476   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1477       nullptr;
1478   maybe_clear_pending_batch(batch_data->elem, pending);
1479   batch_data_unref(batch_data);
1480   // Invoke callback.
1481   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
1482 }
1483
1484 // Intercepts recv_initial_metadata_ready callback for retries.
1485 // Commits the call and returns the initial metadata up the stack.
1486 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1487   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1488   grpc_call_element* elem = batch_data->elem;
1489   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1490   call_data* calld = static_cast<call_data*>(elem->call_data);
1491   if (grpc_client_channel_call_trace.enabled()) {
1492     gpr_log(GPR_INFO,
1493             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1494             chand, calld, grpc_error_string(error));
1495   }
1496   subchannel_call_retry_state* retry_state =
1497       static_cast<subchannel_call_retry_state*>(
1498           batch_data->subchannel_call->GetParentData());
1499   retry_state->completed_recv_initial_metadata = true;
1500   // If a retry was already dispatched, then we're not going to use the
1501   // result of this recv_initial_metadata op, so do nothing.
1502   if (retry_state->retry_dispatched) {
1503     GRPC_CALL_COMBINER_STOP(
1504         calld->call_combiner,
1505         "recv_initial_metadata_ready after retry dispatched");
1506     return;
1507   }
1508   // If we got an error or a Trailers-Only response and have not yet gotten
1509   // the recv_trailing_metadata_ready callback, then defer propagating this
1510   // callback back to the surface.  We can evaluate whether to retry when
1511   // recv_trailing_metadata comes back.
1512   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
1513                     error != GRPC_ERROR_NONE) &&
1514                    !retry_state->completed_recv_trailing_metadata)) {
1515     if (grpc_client_channel_call_trace.enabled()) {
1516       gpr_log(GPR_INFO,
1517               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
1518               "(Trailers-Only)",
1519               chand, calld);
1520     }
1521     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
1522     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1523     if (!retry_state->started_recv_trailing_metadata) {
1524       // recv_trailing_metadata not yet started by application; start it
1525       // ourselves to get status.
1526       start_internal_recv_trailing_metadata(elem);
1527     } else {
1528       GRPC_CALL_COMBINER_STOP(
1529           calld->call_combiner,
1530           "recv_initial_metadata_ready trailers-only or error");
1531     }
1532     return;
1533   }
1534   // Received valid initial metadata, so commit the call.
1535   retry_commit(elem, retry_state);
1536   // Invoke the callback to return the result to the surface.
1537   // Note: Invoking the callback directly; it does not take ownership of error.
1538   invoke_recv_initial_metadata_callback(batch_data, error);
1539 }
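
// Background for the deferral above: in a Trailers-Only response, the
// server returns only trailing metadata (a status with no initial metadata
// or messages), so an error seen here may still be followed by a retryable
// status.  The callback is therefore held until recv_trailing_metadata_ready
// decides whether this attempt will be retried.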
1540
1541 //
1542 // recv_message callback handling
1543 //
1544
1545 // Invokes recv_message_ready for a subchannel batch.
1546 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1547   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1548   // Find pending op.
1549   pending_batch* pending = pending_batch_find(
1550       batch_data->elem, "invoking recv_message_ready for",
1551       [](grpc_transport_stream_op_batch* batch) {
1552         return batch->recv_message &&
1553                batch->payload->recv_message.recv_message_ready != nullptr;
1554       });
1555   GPR_ASSERT(pending != nullptr);
1556   // Return payload.
1557   subchannel_call_retry_state* retry_state =
1558       static_cast<subchannel_call_retry_state*>(
1559           batch_data->subchannel_call->GetParentData());
1560   *pending->batch->payload->recv_message.recv_message =
1561       std::move(retry_state->recv_message);
1562   // Update bookkeeping.
1563   // Note: Need to do this before invoking the callback, since invoking
1564   // the callback will result in yielding the call combiner.
1565   grpc_closure* recv_message_ready =
1566       pending->batch->payload->recv_message.recv_message_ready;
1567   pending->batch->payload->recv_message.recv_message_ready = nullptr;
1568   maybe_clear_pending_batch(batch_data->elem, pending);
1569   batch_data_unref(batch_data);
1570   // Invoke callback.
1571   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1572 }
1573
1574 // Intercepts recv_message_ready callback for retries.
1575 // Commits the call and returns the message up the stack.
1576 static void recv_message_ready(void* arg, grpc_error* error) {
1577   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1578   grpc_call_element* elem = batch_data->elem;
1579   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1580   call_data* calld = static_cast<call_data*>(elem->call_data);
1581   if (grpc_client_channel_call_trace.enabled()) {
1582     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1583             chand, calld, grpc_error_string(error));
1584   }
1585   subchannel_call_retry_state* retry_state =
1586       static_cast<subchannel_call_retry_state*>(
1587           batch_data->subchannel_call->GetParentData());
1588   ++retry_state->completed_recv_message_count;
1589   // If a retry was already dispatched, then we're not going to use the
1590   // result of this recv_message op, so do nothing.
1591   if (retry_state->retry_dispatched) {
1592     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1593                             "recv_message_ready after retry dispatched");
1594     return;
1595   }
1596   // If we got an error or the payload was nullptr and we have not yet gotten
1597   // the recv_trailing_metadata_ready callback, then defer propagating this
1598   // callback back to the surface.  We can evaluate whether to retry when
1599   // recv_trailing_metadata comes back.
1600   if (GPR_UNLIKELY(
1601           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1602           !retry_state->completed_recv_trailing_metadata)) {
1603     if (grpc_client_channel_call_trace.enabled()) {
1604       gpr_log(GPR_INFO,
1605               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1606               "message and recv_trailing_metadata pending)",
1607               chand, calld);
1608     }
1609     retry_state->recv_message_ready_deferred_batch = batch_data;
1610     retry_state->recv_message_error = GRPC_ERROR_REF(error);
1611     if (!retry_state->started_recv_trailing_metadata) {
1612       // recv_trailing_metadata not yet started by application; start it
1613       // ourselves to get status.
1614       start_internal_recv_trailing_metadata(elem);
1615     } else {
1616       GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1617     }
1618     return;
1619   }
1620   // Received a valid message, so commit the call.
1621   retry_commit(elem, retry_state);
1622   // Invoke the callback to return the result to the surface.
1623   // Note: Invoking the callback directly; it does not take ownership of error.
1624   invoke_recv_message_callback(batch_data, error);
1625 }
1626
1627 //
1628 // recv_trailing_metadata handling
1629 //
1630
1631 // Sets *status and *server_pushback_md based on md_batch and error.
1632 // Only sets *server_pushback_md if server_pushback_md != nullptr.
1633 static void get_call_status(grpc_call_element* elem,
1634                             grpc_metadata_batch* md_batch, grpc_error* error,
1635                             grpc_status_code* status,
1636                             grpc_mdelem** server_pushback_md) {
1637   call_data* calld = static_cast<call_data*>(elem->call_data);
1638   if (error != GRPC_ERROR_NONE) {
1639     grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1640                           nullptr);
1641   } else {
1642     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1643     *status =
1644         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1645     if (server_pushback_md != nullptr &&
1646         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1647       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1648     }
1649   }
1650   GRPC_ERROR_UNREF(error);
1651 }
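
// For example, when the attempt completes normally, the status comes from
// the "grpc-status" trailer (e.g., "grpc-status: 14" maps to
// GRPC_STATUS_UNAVAILABLE); when the transport fails, the status is instead
// derived from the error via grpc_error_get_status().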
1652
1653 // Adds recv_trailing_metadata_ready closure to closures.
1654 static void add_closure_for_recv_trailing_metadata_ready(
1655     grpc_call_element* elem, subchannel_batch_data* batch_data,
1656     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1657   // Find pending batch.
1658   pending_batch* pending = pending_batch_find(
1659       elem, "invoking recv_trailing_metadata for",
1660       [](grpc_transport_stream_op_batch* batch) {
1661         return batch->recv_trailing_metadata &&
1662                batch->payload->recv_trailing_metadata
1663                        .recv_trailing_metadata_ready != nullptr;
1664       });
1665   // If we generated the recv_trailing_metadata op internally via
1666   // start_internal_recv_trailing_metadata(), then there will be no
1667   // pending batch.
1668   if (pending == nullptr) {
1669     GRPC_ERROR_UNREF(error);
1670     return;
1671   }
1672   // Return metadata.
1673   subchannel_call_retry_state* retry_state =
1674       static_cast<subchannel_call_retry_state*>(
1675           batch_data->subchannel_call->GetParentData());
1676   grpc_metadata_batch_move(
1677       &retry_state->recv_trailing_metadata,
1678       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
1679   // Add closure.
1680   closures->Add(pending->batch->payload->recv_trailing_metadata
1681                     .recv_trailing_metadata_ready,
1682                 error, "recv_trailing_metadata_ready for pending batch");
1683   // Update bookkeeping.
1684   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1685       nullptr;
1686   maybe_clear_pending_batch(elem, pending);
1687 }
1688
1689 // Adds any necessary closures for deferred recv_initial_metadata and
1690 // recv_message callbacks to closures.
1691 static void add_closures_for_deferred_recv_callbacks(
1692     subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1693     grpc_core::CallCombinerClosureList* closures) {
1694   if (batch_data->batch.recv_trailing_metadata) {
1695     // Add closure for deferred recv_initial_metadata_ready.
1696     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1697                      nullptr)) {
1698       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1699                         invoke_recv_initial_metadata_callback,
1700                         retry_state->recv_initial_metadata_ready_deferred_batch,
1701                         grpc_schedule_on_exec_ctx);
1702       closures->Add(&retry_state->recv_initial_metadata_ready,
1703                     retry_state->recv_initial_metadata_error,
1704                     "resuming recv_initial_metadata_ready");
1705       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1706     }
1707     // Add closure for deferred recv_message_ready.
1708     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1709                      nullptr)) {
1710       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1711                         invoke_recv_message_callback,
1712                         retry_state->recv_message_ready_deferred_batch,
1713                         grpc_schedule_on_exec_ctx);
1714       closures->Add(&retry_state->recv_message_ready,
1715                     retry_state->recv_message_error,
1716                     "resuming recv_message_ready");
1717       retry_state->recv_message_ready_deferred_batch = nullptr;
1718     }
1719   }
1720 }
1721
1722 // Returns true if any op in the batch was not yet started.
1723 // Only looks at send ops, since recv ops are always started immediately.
1724 static bool pending_batch_is_unstarted(
1725     pending_batch* pending, call_data* calld,
1726     subchannel_call_retry_state* retry_state) {
1727   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1728     return false;
1729   }
1730   if (pending->batch->send_initial_metadata &&
1731       !retry_state->started_send_initial_metadata) {
1732     return true;
1733   }
1734   if (pending->batch->send_message &&
1735       retry_state->started_send_message_count < calld->send_messages.size()) {
1736     return true;
1737   }
1738   if (pending->batch->send_trailing_metadata &&
1739       !retry_state->started_send_trailing_metadata) {
1740     return true;
1741   }
1742   return false;
1743 }
1744
1745 // For any pending batch containing an op that has not yet been started,
1746 // adds the pending batch's completion closures to closures.
1747 static void add_closures_to_fail_unstarted_pending_batches(
1748     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1749     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1750   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1751   call_data* calld = static_cast<call_data*>(elem->call_data);
1752   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1753     pending_batch* pending = &calld->pending_batches[i];
1754     if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1755       if (grpc_client_channel_call_trace.enabled()) {
1756         gpr_log(GPR_INFO,
1757                 "chand=%p calld=%p: failing unstarted pending batch at index "
1758                 "%" PRIuPTR,
1759                 chand, calld, i);
1760       }
1761       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1762                     "failing on_complete for pending batch");
1763       pending->batch->on_complete = nullptr;
1764       maybe_clear_pending_batch(elem, pending);
1765     }
1766   }
1767   GRPC_ERROR_UNREF(error);
1768 }
1769
1770 // Runs necessary closures upon completion of a call attempt.
1771 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1772                                             grpc_error* error) {
1773   grpc_call_element* elem = batch_data->elem;
1774   call_data* calld = static_cast<call_data*>(elem->call_data);
1775   subchannel_call_retry_state* retry_state =
1776       static_cast<subchannel_call_retry_state*>(
1777           batch_data->subchannel_call->GetParentData());
1778   // Construct list of closures to execute.
1779   grpc_core::CallCombinerClosureList closures;
1780   // First, add closure for recv_trailing_metadata_ready.
1781   add_closure_for_recv_trailing_metadata_ready(
1782       elem, batch_data, GRPC_ERROR_REF(error), &closures);
1783   // If there are deferred recv_initial_metadata_ready or recv_message_ready
1784   // callbacks, add them to closures.
1785   add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1786   // Add closures to fail any pending batches that have not yet been started.
1787   add_closures_to_fail_unstarted_pending_batches(
1788       elem, retry_state, GRPC_ERROR_REF(error), &closures);
1789   // Don't need batch_data anymore.
1790   batch_data_unref(batch_data);
1791   // Schedule all of the closures identified above.
1792   // Note: This will release the call combiner.
1793   closures.RunClosures(calld->call_combiner);
1794   GRPC_ERROR_UNREF(error);
1795 }
1796
1797 // Intercepts recv_trailing_metadata_ready callback for retries.
1798 // Commits the call and returns the trailing metadata up the stack.
1799 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1800   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1801   grpc_call_element* elem = batch_data->elem;
1802   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1803   call_data* calld = static_cast<call_data*>(elem->call_data);
1804   if (grpc_client_channel_call_trace.enabled()) {
1805     gpr_log(GPR_INFO,
1806             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1807             chand, calld, grpc_error_string(error));
1808   }
1809   subchannel_call_retry_state* retry_state =
1810       static_cast<subchannel_call_retry_state*>(
1811           batch_data->subchannel_call->GetParentData());
1812   retry_state->completed_recv_trailing_metadata = true;
1813   // Get the call's status and check for server pushback metadata.
1814   grpc_status_code status = GRPC_STATUS_OK;
1815   grpc_mdelem* server_pushback_md = nullptr;
1816   grpc_metadata_batch* md_batch =
1817       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
1818   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1819                   &server_pushback_md);
1820   if (grpc_client_channel_call_trace.enabled()) {
1821     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1822             calld, grpc_status_code_to_string(status));
1823   }
1824   // Check if we should retry.
1825   if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1826     // Unref batch_data for deferred recv_initial_metadata_ready or
1827     // recv_message_ready callbacks, if any.
1828     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1829       batch_data_unref(batch_data);
1830       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
1831     }
1832     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
1833       batch_data_unref(batch_data);
1834       GRPC_ERROR_UNREF(retry_state->recv_message_error);
1835     }
1836     batch_data_unref(batch_data);
1837     return;
1838   }
1839   // Not retrying, so commit the call.
1840   retry_commit(elem, retry_state);
1841   // Run any necessary closures.
1842   run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
1843 }
1844
1845 //
1846 // on_complete callback handling
1847 //
1848
1849 // Adds the on_complete closure for the pending batch completed in
1850 // batch_data to closures.
1851 static void add_closure_for_completed_pending_batch(
1852     grpc_call_element* elem, subchannel_batch_data* batch_data,
1853     subchannel_call_retry_state* retry_state, grpc_error* error,
1854     grpc_core::CallCombinerClosureList* closures) {
1855   pending_batch* pending = pending_batch_find(
1856       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
1857         // Match the pending batch with the same set of send ops as the
1858         // subchannel batch we've just completed.
1859         return batch->on_complete != nullptr &&
1860                batch_data->batch.send_initial_metadata ==
1861                    batch->send_initial_metadata &&
1862                batch_data->batch.send_message == batch->send_message &&
1863                batch_data->batch.send_trailing_metadata ==
1864                    batch->send_trailing_metadata;
1865       });
1866   // If batch_data is a replay batch, then there will be no pending
1867   // batch to complete.
1868   if (pending == nullptr) {
1869     GRPC_ERROR_UNREF(error);
1870     return;
1871   }
1872   // Add closure.
1873   closures->Add(pending->batch->on_complete, error,
1874                 "on_complete for pending batch");
1875   pending->batch->on_complete = nullptr;
1876   maybe_clear_pending_batch(elem, pending);
1877 }
1878
1879 // If there are any cached ops to replay or pending ops to start on the
1880 // subchannel call, adds a closure to closures to invoke
1881 // start_retriable_subchannel_batches().
1882 static void add_closures_for_replay_or_pending_send_ops(
1883     grpc_call_element* elem, subchannel_batch_data* batch_data,
1884     subchannel_call_retry_state* retry_state,
1885     grpc_core::CallCombinerClosureList* closures) {
1886   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1887   call_data* calld = static_cast<call_data*>(elem->call_data);
1888   bool have_pending_send_message_ops =
1889       retry_state->started_send_message_count < calld->send_messages.size();
1890   bool have_pending_send_trailing_metadata_op =
1891       calld->seen_send_trailing_metadata &&
1892       !retry_state->started_send_trailing_metadata;
1893   if (!have_pending_send_message_ops &&
1894       !have_pending_send_trailing_metadata_op) {
1895     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1896       pending_batch* pending = &calld->pending_batches[i];
1897       grpc_transport_stream_op_batch* batch = pending->batch;
1898       if (batch == nullptr || pending->send_ops_cached) continue;
1899       if (batch->send_message) have_pending_send_message_ops = true;
1900       if (batch->send_trailing_metadata) {
1901         have_pending_send_trailing_metadata_op = true;
1902       }
1903     }
1904   }
1905   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
1906     if (grpc_client_channel_call_trace.enabled()) {
1907       gpr_log(GPR_INFO,
1908               "chand=%p calld=%p: starting next batch for pending send op(s)",
1909               chand, calld);
1910     }
1911     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
1912                       start_retriable_subchannel_batches, elem,
1913                       grpc_schedule_on_exec_ctx);
1914     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
1915                   "starting next batch for send_* op(s)");
1916   }
1917 }
1918
1919 // Callback used to intercept on_complete from subchannel calls.
1920 // Called only when retries are enabled.
1921 static void on_complete(void* arg, grpc_error* error) {
1922   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1923   grpc_call_element* elem = batch_data->elem;
1924   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1925   call_data* calld = static_cast<call_data*>(elem->call_data);
1926   if (grpc_client_channel_call_trace.enabled()) {
1927     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
1928     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
1929             chand, calld, grpc_error_string(error), batch_str);
1930     gpr_free(batch_str);
1931   }
1932   subchannel_call_retry_state* retry_state =
1933       static_cast<subchannel_call_retry_state*>(
1934           batch_data->subchannel_call->GetParentData());
1935   // Update bookkeeping in retry_state.
1936   if (batch_data->batch.send_initial_metadata) {
1937     retry_state->completed_send_initial_metadata = true;
1938   }
1939   if (batch_data->batch.send_message) {
1940     ++retry_state->completed_send_message_count;
1941   }
1942   if (batch_data->batch.send_trailing_metadata) {
1943     retry_state->completed_send_trailing_metadata = true;
1944   }
1945   // If the call is committed, free cached data for send ops that we've just
1946   // completed.
1947   if (calld->retry_committed) {
1948     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
1949   }
1950   // Construct list of closures to execute.
1951   grpc_core::CallCombinerClosureList closures;
1952   // If a retry was already dispatched, that means we saw
1953   // recv_trailing_metadata before this, so we do nothing here.
1954   // Otherwise, invoke the callback to return the result to the surface.
1955   if (!retry_state->retry_dispatched) {
1956     // Add closure for the completed pending batch, if any.
1957     add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
1958                                             GRPC_ERROR_REF(error), &closures);
1959     // If needed, add a callback to start any replay or pending send ops on
1960     // the subchannel call.
1961     if (!retry_state->completed_recv_trailing_metadata) {
1962       add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
1963                                                   &closures);
1964     }
1965   }
1966   // Track number of pending subchannel send batches and determine if this
1967   // was the last one.
1968   --calld->num_pending_retriable_subchannel_send_batches;
1969   const bool last_send_batch_complete =
1970       calld->num_pending_retriable_subchannel_send_batches == 0;
1971   // Don't need batch_data anymore.
1972   batch_data_unref(batch_data);
1973   // Schedule all of the closures identified above.
1974   // Note: This yields the call combiner.
1975   closures.RunClosures(calld->call_combiner);
1976   // If this was the last subchannel send batch, unref the call stack.
1977   if (last_send_batch_complete) {
1978     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
1979   }
1980 }
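
// Note on the bookkeeping above: the "subchannel_send_batches" ref dropped
// here is the call-stack ref taken when the first retriable send batch was
// started (see the batch construction code below), so the call stack stays
// alive as long as any send batch is still in flight.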
1981
1982 //
1983 // subchannel batch construction
1984 //
1985
1986 // Helper function used to start a subchannel batch in the call combiner.
1987 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
1988   grpc_transport_stream_op_batch* batch =
1989       static_cast<grpc_transport_stream_op_batch*>(arg);
1990   grpc_core::SubchannelCall* subchannel_call =
1991       static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
1992   // Note: This will release the call combiner.
1993   subchannel_call->StartTransportStreamOpBatch(batch);
1994 }
1995
1996 // Adds a closure to closures that will execute batch in the call combiner.
1997 static void add_closure_for_subchannel_batch(
1998     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
1999     grpc_core::CallCombinerClosureList* closures) {
2000   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2001   call_data* calld = static_cast<call_data*>(elem->call_data);
2002   batch->handler_private.extra_arg = calld->subchannel_call.get();
2003   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2004                     start_batch_in_call_combiner, batch,
2005                     grpc_schedule_on_exec_ctx);
2006   if (grpc_client_channel_call_trace.enabled()) {
2007     char* batch_str = grpc_transport_stream_op_batch_string(batch);
2008     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2009             calld, batch_str);
2010     gpr_free(batch_str);
2011   }
2012   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2013                 "start_subchannel_batch");
2014 }
2015
2016 // Adds retriable send_initial_metadata op to batch_data.
2017 static void add_retriable_send_initial_metadata_op(
2018     call_data* calld, subchannel_call_retry_state* retry_state,
2019     subchannel_batch_data* batch_data) {
2020   // Maps the number of retries to the corresponding metadata value slice.
2021   static const grpc_slice* retry_count_strings[] = {
2022       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2023   // We need to make a copy of the metadata batch for each attempt, since
2024   // the filters in the subchannel stack may modify this batch, and we don't
2025   // want those modifications to be passed forward to subsequent attempts.
2026   //
2027   // If we've already completed one or more attempts, add the
2028   // grpc-previous-rpc-attempts header.
2029   retry_state->send_initial_metadata_storage =
2030       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2031           calld->arena, sizeof(grpc_linked_mdelem) *
2032                             (calld->send_initial_metadata.list.count +
2033                              (calld->num_attempts_completed > 0))));
2034   grpc_metadata_batch_copy(&calld->send_initial_metadata,
2035                            &retry_state->send_initial_metadata,
2036                            retry_state->send_initial_metadata_storage);
2037   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2038                        .grpc_previous_rpc_attempts != nullptr)) {
2039     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2040                                retry_state->send_initial_metadata.idx.named
2041                                    .grpc_previous_rpc_attempts);
2042   }
2043   if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
2044     grpc_mdelem retry_md = grpc_mdelem_create(
2045         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2046         *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
2047     grpc_error* error = grpc_metadata_batch_add_tail(
2048         &retry_state->send_initial_metadata,
2049         &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2050                                                         .list.count],
2051         retry_md);
2052     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2053       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2054               grpc_error_string(error));
2055       GPR_ASSERT(false);
2056     }
2057   }
2058   retry_state->started_send_initial_metadata = true;
2059   batch_data->batch.send_initial_metadata = true;
2060   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2061       &retry_state->send_initial_metadata;
2062   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2063       calld->send_initial_metadata_flags;
2064   batch_data->batch.payload->send_initial_metadata.peer_string =
2065       calld->peer_string;
2066 }
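
// Concrete illustration: on the third attempt of a call (two attempts
// already completed), the copied metadata gains
// "grpc-previous-rpc-attempts: 2" via retry_count_strings[1], i.e.,
// GRPC_MDSTR_2.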
2067
2068 // Adds retriable send_message op to batch_data.
2069 static void add_retriable_send_message_op(
2070     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2071     subchannel_batch_data* batch_data) {
2072   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2073   call_data* calld = static_cast<call_data*>(elem->call_data);
2074   if (grpc_client_channel_call_trace.enabled()) {
2075     gpr_log(GPR_INFO,
2076             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2077             chand, calld, retry_state->started_send_message_count);
2078   }
2079   grpc_core::ByteStreamCache* cache =
2080       calld->send_messages[retry_state->started_send_message_count];
2081   ++retry_state->started_send_message_count;
2082   retry_state->send_message.Init(cache);
2083   batch_data->batch.send_message = true;
2084   batch_data->batch.payload->send_message.send_message.reset(
2085       retry_state->send_message.get());
2086 }
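
// Note (a rough sketch of the mechanism): each attempt reads the cached
// message through a fresh caching stream initialized from the
// ByteStreamCache, so replaying a send_message does not re-consume the
// byte stream originally provided by the surface.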
2087
2088 // Adds retriable send_trailing_metadata op to batch_data.
2089 static void add_retriable_send_trailing_metadata_op(
2090     call_data* calld, subchannel_call_retry_state* retry_state,
2091     subchannel_batch_data* batch_data) {
2092   // We need to make a copy of the metadata batch for each attempt, since
2093   // the filters in the subchannel stack may modify this batch, and we don't
2094   // want those modifications to be passed forward to subsequent attempts.
2095   retry_state->send_trailing_metadata_storage =
2096       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2097           calld->arena, sizeof(grpc_linked_mdelem) *
2098                             calld->send_trailing_metadata.list.count));
2099   grpc_metadata_batch_copy(&calld->send_trailing_metadata,
2100                            &retry_state->send_trailing_metadata,
2101                            retry_state->send_trailing_metadata_storage);
2102   retry_state->started_send_trailing_metadata = true;
2103   batch_data->batch.send_trailing_metadata = true;
2104   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2105       &retry_state->send_trailing_metadata;
2106 }
2107
2108 // Adds retriable recv_initial_metadata op to batch_data.
2109 static void add_retriable_recv_initial_metadata_op(
2110     call_data* calld, subchannel_call_retry_state* retry_state,
2111     subchannel_batch_data* batch_data) {
2112   retry_state->started_recv_initial_metadata = true;
2113   batch_data->batch.recv_initial_metadata = true;
2114   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2115   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2116       &retry_state->recv_initial_metadata;
2117   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2118       &retry_state->trailing_metadata_available;
2119   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2120                     recv_initial_metadata_ready, batch_data,
2121                     grpc_schedule_on_exec_ctx);
2122   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2123       &retry_state->recv_initial_metadata_ready;
2124 }
2125
2126 // Adds retriable recv_message op to batch_data.
2127 static void add_retriable_recv_message_op(
2128     call_data* calld, subchannel_call_retry_state* retry_state,
2129     subchannel_batch_data* batch_data) {
2130   ++retry_state->started_recv_message_count;
2131   batch_data->batch.recv_message = true;
2132   batch_data->batch.payload->recv_message.recv_message =
2133       &retry_state->recv_message;
2134   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
2135                     batch_data, grpc_schedule_on_exec_ctx);
2136   batch_data->batch.payload->recv_message.recv_message_ready =
2137       &retry_state->recv_message_ready;
2138 }
2139
2140 // Adds retriable recv_trailing_metadata op to batch_data.
2141 static void add_retriable_recv_trailing_metadata_op(
2142     call_data* calld, subchannel_call_retry_state* retry_state,
2143     subchannel_batch_data* batch_data) {
2144   retry_state->started_recv_trailing_metadata = true;
2145   batch_data->batch.recv_trailing_metadata = true;
2146   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2147   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2148       &retry_state->recv_trailing_metadata;
2149   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2150       &retry_state->collect_stats;
2151   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2152                     recv_trailing_metadata_ready, batch_data,
2153                     grpc_schedule_on_exec_ctx);
2154   batch_data->batch.payload->recv_trailing_metadata
2155       .recv_trailing_metadata_ready =
2156       &retry_state->recv_trailing_metadata_ready;
2157   maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
2158                                                    &batch_data->batch);
2159 }
2160
2161 // Helper function used to start a recv_trailing_metadata batch.  This
2162 // is used in the case where a recv_initial_metadata or recv_message
2163 // op fails in a way that tells us the call is over, but the application
2164 // has not yet started its own recv_trailing_metadata op.
2165 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2166   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2167   call_data* calld = static_cast<call_data*>(elem->call_data);
2168   if (grpc_client_channel_call_trace.enabled()) {
2169     gpr_log(GPR_INFO,
2170             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2171             "started; starting it internally",
2172             chand, calld);
2173   }
2174   subchannel_call_retry_state* retry_state =
2175       static_cast<subchannel_call_retry_state*>(
2176           calld->subchannel_call->GetParentData());
2177   // Create batch_data with 2 refs, since this batch will be unreffed twice:
2178   // once for the recv_trailing_metadata_ready callback when the subchannel
2179   // batch returns, and again when we actually get a recv_trailing_metadata
2180   // op from the surface.
2181   subchannel_batch_data* batch_data =
2182       batch_data_create(elem, 2, false /* set_on_complete */);
2183   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2184   retry_state->recv_trailing_metadata_internal_batch = batch_data;
2185   // Note: This will release the call combiner.
2186   calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch);
2187 }
2188
2189 // If there are any cached send ops that need to be replayed on the
2190 // current subchannel call, creates and returns a new subchannel batch
2191 // to replay those ops.  Otherwise, returns nullptr.
2192 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2193     grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2194   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2195   call_data* calld = static_cast<call_data*>(elem->call_data);
2196   subchannel_batch_data* replay_batch_data = nullptr;
2197   // send_initial_metadata.
2198   if (calld->seen_send_initial_metadata &&
2199       !retry_state->started_send_initial_metadata &&
2200       !calld->pending_send_initial_metadata) {
2201     if (grpc_client_channel_call_trace.enabled()) {
2202       gpr_log(GPR_INFO,
2203               "chand=%p calld=%p: replaying previously completed "
2204               "send_initial_metadata op",
2205               chand, calld);
2206     }
2207     replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
2208     add_retriable_send_initial_metadata_op(calld, retry_state,
2209                                            replay_batch_data);
2210   }
2211   // send_message.
2212   // Note that we can only have one send_message op in flight at a time.
2213   if (retry_state->started_send_message_count < calld->send_messages.size() &&
2214       retry_state->started_send_message_count ==
2215           retry_state->completed_send_message_count &&
2216       !calld->pending_send_message) {
2217     if (grpc_client_channel_call_trace.enabled()) {
2218       gpr_log(GPR_INFO,
2219               "chand=%p calld=%p: replaying previously completed "
2220               "send_message op",
2221               chand, calld);
2222     }
2223     if (replay_batch_data == nullptr) {
2224       replay_batch_data =
2225           batch_data_create(elem, 1, true /* set_on_complete */);
2226     }
2227     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2228   }
2229   // send_trailing_metadata.
2230   // Note that we only add this op if we have no more send_message ops
2231   // to start, since we can't send down any more send_message ops after
2232   // send_trailing_metadata.
2233   if (calld->seen_send_trailing_metadata &&
2234       retry_state->started_send_message_count == calld->send_messages.size() &&
2235       !retry_state->started_send_trailing_metadata &&
2236       !calld->pending_send_trailing_metadata) {
2237     if (grpc_client_channel_call_trace.enabled()) {
2238       gpr_log(GPR_INFO,
2239               "chand=%p calld=%p: replaying previously completed "
2240               "send_trailing_metadata op",
2241               chand, calld);
2242     }
2243     if (replay_batch_data == nullptr) {
2244       replay_batch_data =
2245           batch_data_create(elem, 1, true /* set_on_complete */);
2246     }
2247     add_retriable_send_trailing_metadata_op(calld, retry_state,
2248                                             replay_batch_data);
2249   }
2250   return replay_batch_data;
2251 }
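
// Hypothetical replay scenario: attempt 1 fully completes
// send_initial_metadata and one send_message and then fails with a
// retryable status.  The surface has already seen on_complete for those
// ops, so no pending batch still carries them; on attempt 2, this function
// rebuilds them from the cached copies before any still-pending ops are
// started.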
2252
2253 // Adds closures to closures to start any pending batches that are now
2254 // ready to be started on the subchannel call.
2255 static void add_subchannel_batches_for_pending_batches(
2256     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2257     grpc_core::CallCombinerClosureList* closures) {
2258   call_data* calld = static_cast<call_data*>(elem->call_data);
2259   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2260     pending_batch* pending = &calld->pending_batches[i];
2261     grpc_transport_stream_op_batch* batch = pending->batch;
2262     if (batch == nullptr) continue;
2263     // Skip any batch that either (a) has already been started on this
2264     // subchannel call or (b) we can't start yet because we're still
2265     // replaying send ops that need to be completed first.
2266     // TODO(roth): Note that if any one op in the batch can't be sent
2267     // yet due to ops that we're replaying, we don't start any of the ops
2268     // in the batch.  This is probably okay, but it could conceivably
2269     // lead to increased latency in some cases -- e.g., we could delay
2270     // starting a recv op due to it being in the same batch with a send
2271     // op.  If/when we revamp the callback protocol in
2272     // transport_stream_op_batch, we may be able to fix this.
2273     if (batch->send_initial_metadata &&
2274         retry_state->started_send_initial_metadata) {
2275       continue;
2276     }
2277     if (batch->send_message && retry_state->completed_send_message_count <
2278                                    retry_state->started_send_message_count) {
2279       continue;
2280     }
2281     // Note that we only start send_trailing_metadata if we have no more
2282     // send_message ops to start, since we can't send down any more
2283     // send_message ops after send_trailing_metadata.
2284     if (batch->send_trailing_metadata &&
2285         (retry_state->started_send_message_count + batch->send_message <
2286              calld->send_messages.size() ||
2287          retry_state->started_send_trailing_metadata)) {
2288       continue;
2289     }
2290     if (batch->recv_initial_metadata &&
2291         retry_state->started_recv_initial_metadata) {
2292       continue;
2293     }
2294     if (batch->recv_message && retry_state->completed_recv_message_count <
2295                                    retry_state->started_recv_message_count) {
2296       continue;
2297     }
2298     if (batch->recv_trailing_metadata &&
2299         retry_state->started_recv_trailing_metadata) {
2300       // If we previously completed a recv_trailing_metadata op
2301       // initiated by start_internal_recv_trailing_metadata(), use the
2302       // result of that instead of trying to re-start this op.
2303       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
2304                         nullptr))) {
2305         // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (calld->method_params == nullptr ||
        calld->method_params->retry_policy() == nullptr ||
        calld->retry_committed) {
      add_closure_for_subchannel_batch(elem, batch, closures);
      pending_batch_clear(calld, pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    subchannel_batch_data* batch_data = batch_data_create(
        elem, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    maybe_cache_send_ops_for_batch(calld, pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      add_retriable_send_message_op(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      add_retriable_recv_message_op(calld, retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
    }
    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (calld->num_pending_retriable_subchannel_send_batches == 0) {
        GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
      }
      ++calld->num_pending_retriable_subchannel_send_batches;
    }
  }
}

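// Note on the callback accounting above: the bool op flags are summed
// as 0/1, so a batch with send ops plus recv_initial_metadata and
// recv_message set gets num_callbacks == 3 (all send_* ops share one
// on_complete callback).  batch_data_create() presumably uses this
// count as the batch's initial reference count, releasing the batch
// only after the last callback fires; that refcount detail is an
// assumption of this note.
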
// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          calld->subchannel_call->GetParentData());
  // Construct list of closures to execute, one for each pending batch.
  grpc_core::CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  subchannel_batch_data* replay_batch_data =
      maybe_create_subchannel_batch_for_replay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
                                     &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches;
  }
  // Now add pending batches.
  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (grpc_client_channel_call_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call.get());
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner);
}

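// Usage sketch for the CallCombinerClosureList pattern above
// (illustration only): closures are accumulated first and then run in
// a single step, so the call combiner is yielded exactly once.  Roughly:
//
//   grpc_core::CallCombinerClosureList closures;
//   closures.Add(&closure_a, GRPC_ERROR_NONE, "reason a");
//   closures.Add(&closure_b, GRPC_ERROR_NONE, "reason b");
//   // All but the first closure are started on the call combiner; the
//   // first is scheduled directly, which eventually yields the combiner.
//   closures.RunClosures(call_combiner);
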
//
// LB pick
//

static void create_subchannel_call(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t parent_data_size =
      calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
  const grpc_core::ConnectedSubchannel::CallArgs call_args = {
      calld->pollent,          // pollent
      calld->path,             // path
      calld->call_start_time,  // start_time
      calld->deadline,         // deadline
      calld->arena,            // arena
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      calld->call_context,   // context
      calld->call_combiner,  // call_combiner
      parent_data_size       // parent_data_size
  };
  grpc_error* error = GRPC_ERROR_NONE;
  calld->subchannel_call =
      calld->pick.pick.connected_subchannel->CreateCall(call_args, &error);
  if (grpc_client_channel_routing_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, calld, calld->subchannel_call.get(),
            grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    pending_batches_fail(elem, error, yield_call_combiner);
  } else {
    if (parent_data_size > 0) {
      new (calld->subchannel_call->GetParentData())
          subchannel_call_retry_state(calld->call_context);
    }
    pending_batches_resume(elem);
  }
}

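// Note on the placement-new above: parent_data_size asks the subchannel
// call to over-allocate by sizeof(subchannel_call_retry_state), and the
// retry state is then constructed in-place in that extra space, so it
// shares the subchannel call's allocation and lifetime.  Sketch of the
// pattern (illustration only; the lifetime detail is an assumption of
// this note):
//
//   void* storage = calld->subchannel_call->GetParentData();
//   auto* retry_state =
//       new (storage) subchannel_call_retry_state(calld->call_context);
//   // No matching delete: the storage is reclaimed with the call itself.
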
// Invoked when a pick is completed, on both success and failure.
static void pick_done(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_routing_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
              calld, grpc_error_string(error));
    }
    pending_batches_fail(elem, GRPC_ERROR_REF(error), yield_call_combiner);
    return;
  }
  create_subchannel_call(elem);
}

namespace grpc_core {
namespace {

// A class to handle the call combiner cancellation callback for a
// queued pick.
class QueuedPickCanceller {
 public:
  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<call_data*>(elem->call_data);
    auto* chand = static_cast<channel_data*>(elem->channel_data);
    GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(chand->combiner));
    grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<QueuedPickCanceller*>(arg);
    auto* chand = static_cast<channel_data*>(self->elem_->channel_data);
    auto* calld = static_cast<call_data*>(self->elem_->call_data);
    if (grpc_client_channel_routing_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: cancelling queued pick: "
              "error=%s self=%p calld->pick_canceller=%p",
              chand, calld, grpc_error_string(error), self,
              calld->pick_canceller);
    }
    if (calld->pick_canceller == self && error != GRPC_ERROR_NONE) {
      // Remove pick from list of queued picks.
      remove_call_from_queued_picks_locked(self->elem_);
      // Fail pending batches on the call.
      pending_batches_fail(self->elem_, GRPC_ERROR_REF(error),
                           yield_call_combiner_if_pending_batches_found);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call, "QueuedPickCanceller");
    Delete(self);
  }

  grpc_call_element* elem_;
  grpc_closure closure_;
};

}  // namespace
}  // namespace grpc_core

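// Lifecycle sketch for QueuedPickCanceller (restating the mechanism
// above, not new behavior):
//
//   1. add_call_to_queued_picks_locked() allocates a canceller, which
//      registers itself via grpc_call_combiner_set_notify_on_cancel().
//   2. If the call is cancelled while the pick is still queued,
//      CancelLocked() runs in the combiner, dequeues the pick, and fails
//      the pending batches.
//   3. If the pick completes first, remove_call_from_queued_picks_locked()
//      "lames" the canceller by clearing calld->pick_canceller, so a late
//      CancelLocked() only drops its call-stack ref and deletes itself.
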
// Removes the call from the channel's list of queued picks.
static void remove_call_from_queued_picks_locked(grpc_call_element* elem) {
  auto* chand = static_cast<channel_data*>(elem->channel_data);
  auto* calld = static_cast<call_data*>(elem->call_data);
  for (QueuedPick** pick = &chand->queued_picks; *pick != nullptr;
       pick = &(*pick)->next) {
    if (*pick == &calld->pick) {
      if (grpc_client_channel_routing_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
                chand, calld);
      }
      calld->pick_queued = false;
      *pick = calld->pick.next;
      // Remove call's pollent from channel's interested_parties.
      grpc_polling_entity_del_from_pollset_set(calld->pollent,
                                               chand->interested_parties);
      // Lame the call combiner canceller.
      calld->pick_canceller = nullptr;
      break;
    }
  }
}

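// Note on the removal idiom above: iterating over the link (a
// QueuedPick**) rather than the node means the head pointer and every
// interior `next` pointer are handled by the same code path, so no
// head special case is needed.  A minimal stand-alone sketch with a
// hypothetical Node type (illustration only, not part of this file):
//
//   struct Node { Node* next; };
//   void Remove(Node** head, Node* target) {
//     for (Node** link = head; *link != nullptr; link = &(*link)->next) {
//       if (*link == target) {
//         *link = target->next;  // splices out head and interior nodes alike
//         break;
//       }
//     }
//   }
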
// Adds the call to the channel's list of queued picks.
static void add_call_to_queued_picks_locked(grpc_call_element* elem) {
  auto* chand = static_cast<channel_data*>(elem->channel_data);
  auto* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_routing_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
            calld);
  }
  calld->pick_queued = true;
  // Add call to queued picks list.
  calld->pick.elem = elem;
  calld->pick.next = chand->queued_picks;
  chand->queued_picks = &calld->pick;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(calld->pollent,
                                         chand->interested_parties);
  // Register call combiner cancellation callback.
  calld->pick_canceller = grpc_core::New<grpc_core::QueuedPickCanceller>(elem);
}

// Applies service config to the call.  Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_routing_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data = chand->retry_throttle_data->Ref();
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &calld->pending_batches[0]
               .batch->payload->send_initial_metadata
               .send_initial_metadata_flags;
      if (GPR_UNLIKELY(
              calld->method_params->wait_for_ready() !=
                  ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
              !(*send_initial_metadata_flags &
                GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
        if (calld->method_params->wait_for_ready() ==
            ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}

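// Worked example for the deadline logic above (hypothetical numbers):
// if the call starts at T=1000ms with a client-supplied deadline of
// T+500ms and the service config specifies a 200ms timeout, then
//
//   per_method_deadline = 1000ms + 200ms = 1200ms < 1500ms,
//
// so the deadline is tightened to 1200ms and the timer is reset.  If
// the configured timeout were 800ms instead, per_method_deadline would
// be 1800ms > 1500ms and the client's deadline would stand: the
// effective deadline is always the minimum of the two.
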
// Invoked once resolver results are available.
static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Apply service config data to the call only once, and only if the
  // channel has the data available.
  if (GPR_LIKELY(chand->have_service_config &&
                 !calld->service_config_applied)) {
    calld->service_config_applied = true;
    apply_service_config_to_call_locked(elem);
  }
}

static const char* pick_result_name(LoadBalancingPolicy::PickResult result) {
  switch (result) {
    case LoadBalancingPolicy::PICK_COMPLETE:
      return "COMPLETE";
    case LoadBalancingPolicy::PICK_QUEUE:
      return "QUEUE";
    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
      return "TRANSIENT_FAILURE";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

static void start_pick_locked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch.  The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by get_batch_index() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry?  We don't want
  // the new metadata to be added twice.  We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  calld->pick.pick.initial_metadata =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  uint32_t* send_initial_metadata_flags =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata_flags
          : &calld->pending_batches[0]
                 .batch->payload->send_initial_metadata
                 .send_initial_metadata_flags;
  // Apply service config to call if needed.
  maybe_apply_service_config_to_call_locked(elem);
  // When done, we schedule this closure to leave the channel combiner.
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  // Attempt pick.
  error = GRPC_ERROR_NONE;
  auto pick_result = chand->picker->Pick(&calld->pick.pick, &error);
  if (grpc_client_channel_routing_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
            "error=%s)",
            chand, calld, pick_result_name(pick_result),
            calld->pick.pick.connected_subchannel.get(),
            grpc_error_string(error));
  }
  switch (pick_result) {
    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
      // If we're shutting down, fail all RPCs.
      if (chand->disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(error);
        GRPC_CLOSURE_SCHED(&calld->pick_closure,
                           GRPC_ERROR_REF(chand->disconnect_error));
        break;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((*send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        // Retry if appropriate; otherwise, fail.
        grpc_status_code status = GRPC_STATUS_OK;
        grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                              nullptr);
        if (!calld->enable_retries ||
            !maybe_retry(elem, nullptr /* batch_data */, status,
                         nullptr /* server_pushback_md */)) {
          grpc_error* new_error =
              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to create subchannel", &error, 1);
          GRPC_ERROR_UNREF(error);
          GRPC_CLOSURE_SCHED(&calld->pick_closure, new_error);
        }
        if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
        break;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(error);
      // Fallthrough
    case LoadBalancingPolicy::PICK_QUEUE:
      if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
      break;
    default:  // PICK_COMPLETE
      // Handle drops.
      if (GPR_UNLIKELY(calld->pick.pick.connected_subchannel == nullptr)) {
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Call dropped by load balancing policy");
      }
      GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
      if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
  }
}

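// Summary of the pick outcomes handled above (a restatement of the
// switch, not new behavior):
//
//   PICK_COMPLETE:          schedule pick_closure to create the subchannel
//                           call, or fail the call as a drop if no
//                           connected subchannel was returned.
//   PICK_QUEUE:             park the call on chand->queued_picks until the
//                           LB policy produces a new picker.
//   PICK_TRANSIENT_FAILURE: with wait_for_ready set, behave like PICK_QUEUE;
//                           otherwise retry if permitted, else fail the RPC.
//                           Channel shutdown always fails the RPC.
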
//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_call_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_call_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           no_yield_call_combiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      calld->subchannel_call->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_call_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call.get());
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_call_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_call_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

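// Routing summary for cc_start_transport_stream_op_batch (restating the
// branches above, not new behavior):
//
//   call already cancelled      -> fail the new batch immediately
//   batch is cancel_stream      -> stash the error, then either fail all
//                                  pending batches (no subchannel call yet)
//                                  or forward the cancellation downward
//   subchannel call exists      -> resume pending batches without entering
//                                  the channel combiner
//   has send_initial_metadata   -> hop into the channel combiner to start
//                                  an LB pick
//   anything else               -> leave the batch pending and yield the
//                                  call combiner
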
/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  new (elem->call_data) call_data(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
    then_schedule_closure = nullptr;
  }
  calld->~call_data();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

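// Illustrative sketch of how a vtable like grpc_client_channel_filter is
// typically appended to a channel stack.  The builder call below is the
// channel_stack_builder.h API; the wrapper function itself is a
// hypothetical registration site, shown only for context:
//
//   static bool append_client_channel_filter(
//       grpc_channel_stack_builder* builder, void* arg) {
//     return grpc_channel_stack_builder_append_filter(
//         builder, &grpc_client_channel_filter, nullptr /* post_init */,
//         nullptr /* user_data */);
//   }
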
void grpc_client_channel_set_channelz_node(
    grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  chand->channelz_node = node;
  chand->resolving_lb_policy->set_channelz_node(node->Ref());
}

void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem,
    grpc_core::channelz::ChildRefsList* child_subchannels,
    grpc_core::channelz::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolving_lb_policy != nullptr) {
    chand->resolving_lb_policy->FillChildRefsForChannelz(child_subchannels,
                                                         child_channels);
  }
}

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  chand->resolving_lb_policy->ExitIdleLocked();
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // This assumes that watcher_timer_init was created with the exec_ctx
    // scheduler, so GRPC_CLOSURE_RUN will run it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

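// Caller-side usage sketch (hypothetical names, illustration only).
// Passing a non-null `state` registers a watch that fires once the
// channel's connectivity state changes; passing a null `state` with the
// same on_complete closure cancels a previously registered watch:
//
//   grpc_client_channel_watch_connectivity_state(
//       elem, pollent, &my_state, &my_on_complete, &my_timer_init);
//   // ...later, to cancel before it fires:
//   grpc_client_channel_watch_connectivity_state(
//       elem, pollent, nullptr, &my_on_complete, nullptr);
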
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}