/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
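
// A minimal sketch of overriding this default via channel args (the target
// URI "dns:///example:443" is a placeholder; the arg is consumed by
// GetMaxPerRpcRetryBufferSize() below):
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 64 << 10);
//   grpc_channel_args args = {1, &arg};
//   grpc_channel* channel =
//       grpc_insecure_channel_create("dns:///example:443", &args, nullptr);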

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6
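
// For reference, GetBatchIndex() (declared below) assigns each op type a
// fixed slot in pending_batches_; conceptually (a sketch, not the verbatim
// implementation):
//
//   if (batch->send_initial_metadata) return 0;
//   if (batch->send_message) return 1;
//   if (batch->send_trailing_metadata) return 2;
//   if (batch->recv_initial_metadata) return 3;
//   if (batch->recv_message) return 4;
//   if (batch->recv_trailing_metadata) return 5;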

namespace grpc_core {

TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
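
// Both flags can be turned on at runtime via the GRPC_TRACE environment
// variable ("./my_app" is a placeholder binary name):
//   GRPC_TRACE=client_channel_call,client_channel_routing \
//   GRPC_VERBOSITY=DEBUG ./my_app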

namespace {

//
// ChannelData definition
//

class ChannelData {
 public:
  struct QueuedPick {
    grpc_call_element* elem;
    QueuedPick* next = nullptr;
  };

  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }

  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
  void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);

  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    // Will delete itself.
    New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
                                     watcher_timer_init);
  }
  int NumExternalConnectivityWatchers() const {
    return external_connectivity_watcher_list_.size();
  }

 private:
  class ConnectivityStateAndPickerSetter;
  class ServiceConfigSetter;
  class GrpcSubchannel;
  class ClientChannelControlHelper;

  class ExternalConnectivityWatcher {
   public:
    class WatcherList {
     public:
      WatcherList() { gpr_mu_init(&mu_); }
      ~WatcherList() { gpr_mu_destroy(&mu_); }

      int size() const;
      ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
      void Add(ExternalConnectivityWatcher* watcher);
      void Remove(const ExternalConnectivityWatcher* watcher);

     private:
      // head_ is guarded by a mutex, since the size() method needs to
      // iterate over the list, and it's called from the C-core API
      // function grpc_channel_num_external_connectivity_watchers(), which
      // is synchronous and therefore cannot run in the combiner.
      mutable gpr_mu mu_;
      ExternalConnectivityWatcher* head_ = nullptr;
    };

    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher();

   private:
    static void OnWatchCompleteLocked(void* arg, grpc_error* error);
    static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    grpc_closure my_closure_;
    ExternalConnectivityWatcher* next_ = nullptr;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  static bool ProcessResolverResultLocked(
      void* arg, const Resolver::Result& result, const char** lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
      grpc_error** service_config_error);

  grpc_error* DoPingLocked(grpc_transport_op* op);

  static void StartTransportOpLocked(void* arg, grpc_error* ignored);

  static void TryToConnectLocked(void* arg, grpc_error* error_ignored);

  void ProcessLbPolicy(
      const Resolver::Result& resolver_result,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      UniquePtr<char>* lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  UniquePtr<char> server_name_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields used in the data plane.  Guarded by data_plane_combiner.
  //
  grpc_combiner* data_plane_combiner_;
  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
  QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
  // Data from service config.
  bool received_service_config_data_ = false;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  RefCountedPtr<ServiceConfig> service_config_;

  //
  // Fields used in the control plane.  Guarded by combiner.
  //
  grpc_combiner* combiner_;
  grpc_pollset_set* interested_parties_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
  grpc_connectivity_state_tracker state_tracker_;
  ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
  UniquePtr<char> health_check_service_name_;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  bool received_first_resolver_result_ = false;
  Map<Subchannel*, int> subchannel_refcount_map_;

  //
  // Fields accessed from both data plane and control plane combiners.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;
};

//
// CallData definition
//

class CallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }

  // Invoked by channel for queued picks once resolver results are available.
  void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // Invoked by channel for queued picks when the picker is updated.
  static void StartPickLocked(void* arg, grpc_error* error);

 private:
  class QueuedPickCanceller;

  class LbCallState : public LoadBalancingPolicy::CallState {
   public:
    explicit LbCallState(CallData* calld) : calld_(calld) {}

    void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }

   private:
    CallData* calld_;
  };

  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount.  If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(grpc_call_element* elem,
                                           int refcount, bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }

    SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
                            int refcount, bool set_on_complete);
    // All dtor code must go in `Destroy()`. This is because closures stored
    // in `SubchannelCallBatchData` may be invoked after the object has been
    // unrefed via `Unref()`, and MSan would complain about accessing the
    // object after its dtor has run. As a result, we cannot call the dtor
    // in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }
    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RefCountedPtr<SubchannelCall> subchannel_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };
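
  // A sketch of the intended lifetime (based on the comments above; not a
  // verbatim call site): create on the arena with one ref per callback that
  // may complete the batch, then unref from each callback:
  //
  //   SubchannelCallBatchData* batch_data =
  //       SubchannelCallBatchData::Create(elem, /*refcount=*/1,
  //                                       /*set_on_complete=*/true);
  //   ...
  //   batch_data->Unref();  // Runs Destroy() when the last ref is released.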

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above.  That
    //       would save space but would also create a data race, because
    //       the compiler may generate a 2-byte store that overwrites the
    //       metadata bitfields when setting this field.
    bool retry_dispatched : 1;
  };
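
  // Since the retry state lives in the subchannel call's parent_data, it is
  // recovered from the call object; a sketch (assuming the
  // SubchannelCall::GetParentData() accessor):
  //
  //   auto* retry_state = static_cast<SubchannelCallRetryState*>(
  //       subchannel_call_->GetParentData());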

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch.  If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached;
  };

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata(ChannelData* chand);
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(ChannelData* chand, size_t idx);
  void FreeCachedSendTrailingMetadata(ChannelData* chand);
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
                                       SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data.  Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume(grpc_call_element* elem);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(grpc_call_element* elem,
                                 const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(grpc_call_element* elem,
                   SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
                  grpc_status_code status, grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
                     grpc_error* error, grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      grpc_error* error, CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      grpc_error* error, CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(grpc_call_element* elem,
                                 SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch.  This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that tells us the call is over, but the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops.  Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  void CreateSubchannelCall(grpc_call_element* elem);
  // Invoked when a pick is completed, on either success or failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks.
  void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
  // Adds the call to the channel's list of queued picks.
  void AddCallToQueuedPicksLocked(grpc_call_element* elem);
  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  void ApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_timespec call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  ServiceConfig::CallData service_config_call_data_;
  const ClientChannelMethodParsedConfig* method_params_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  ChannelData::QueuedPick pick_;
  bool pick_queued_ = false;
  bool service_config_applied_ = false;
  QueuedPickCanceller* pick_canceller_ = nullptr;
  LbCallState lb_call_state_;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  void (*lb_recv_trailing_metadata_ready_)(
      void* user_data, grpc_metadata_batch* recv_trailing_metadata,
      LoadBalancingPolicy::CallState* call_state) = nullptr;
  void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
  grpc_closure pick_closure_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  grpc_polling_entity* pollent_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  // TODO(roth): Restructure this to eliminate use of ManualConstructor.
  ManualConstructor<BackOff> retry_backoff_;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};

//
// ChannelData::ConnectivityStateAndPickerSetter
//

// A fire-and-forget class that sets the channel's connectivity state
// and then hops into the data plane combiner to update the picker.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ChannelData::ConnectivityStateAndPickerSetter {
 public:
  ConnectivityStateAndPickerSetter(
      ChannelData* chand, grpc_connectivity_state state, const char* reason,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
      : chand_(chand), picker_(std::move(picker)) {
    // Update connectivity state here, while holding control plane combiner.
    grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
    if (chand->channelz_node_ != nullptr) {
      chand->channelz_node_->SetConnectivityState(state);
      chand->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Severity::Info,
          grpc_slice_from_static_string(
              GetChannelConnectivityStateChangeString(state)));
    }
    // Bounce into the data plane combiner to reset the picker.
    GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
                           "ConnectivityStateAndPickerSetter");
    GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
                      grpc_combiner_scheduler(chand->data_plane_combiner_));
    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
  }

 private:
  static const char* GetChannelConnectivityStateChangeString(
      grpc_connectivity_state state) {
    switch (state) {
      case GRPC_CHANNEL_IDLE:
        return "Channel state change to IDLE";
      case GRPC_CHANNEL_CONNECTING:
        return "Channel state change to CONNECTING";
      case GRPC_CHANNEL_READY:
        return "Channel state change to READY";
      case GRPC_CHANNEL_TRANSIENT_FAILURE:
        return "Channel state change to TRANSIENT_FAILURE";
      case GRPC_CHANNEL_SHUTDOWN:
        return "Channel state change to SHUTDOWN";
    }
    GPR_UNREACHABLE_CODE(return "UNKNOWN");
  }

  static void SetPicker(void* arg, grpc_error* ignored) {
    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
    // Update picker.
    self->chand_->picker_ = std::move(self->picker_);
    // Re-process queued picks.
    for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
         pick = pick->next) {
      CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
    }
    // Clean up.
    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
                             "ConnectivityStateAndPickerSetter");
    Delete(self);
  }

  ChannelData* chand_;
  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
  grpc_closure closure_;
};
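
// Typical usage (see ClientChannelControlHelper::UpdateState() below); the
// object deletes itself once the picker has been swapped in:
//
//   New<ConnectivityStateAndPickerSetter>(chand, state, "helper",
//                                         std::move(picker));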

//
// ChannelData::ServiceConfigSetter
//

// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner.  Deletes itself when done.
class ChannelData::ServiceConfigSetter {
 public:
  ServiceConfigSetter(
      ChannelData* chand,
      Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
          retry_throttle_data,
      RefCountedPtr<ServiceConfig> service_config)
      : chand_(chand),
        retry_throttle_data_(retry_throttle_data),
        service_config_(std::move(service_config)) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
    GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
                      grpc_combiner_scheduler(chand->data_plane_combiner_));
    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
  }

 private:
  static void SetServiceConfigData(void* arg, grpc_error* ignored) {
    ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
    ChannelData* chand = self->chand_;
    // Update channel state.
    chand->received_service_config_data_ = true;
    if (self->retry_throttle_data_.has_value()) {
      chand->retry_throttle_data_ =
          internal::ServerRetryThrottleMap::GetDataForServer(
              chand->server_name_.get(),
              self->retry_throttle_data_.value().max_milli_tokens,
              self->retry_throttle_data_.value().milli_token_ratio);
    }
    chand->service_config_ = std::move(self->service_config_);
    // Apply service config to queued picks.
    for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
         pick = pick->next) {
      CallData* calld = static_cast<CallData*>(pick->elem->call_data);
      calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
    }
    // Clean up.
    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
                             "ServiceConfigSetter");
    Delete(self);
  }

  ChannelData* chand_;
  Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
      retry_throttle_data_;
  RefCountedPtr<ServiceConfig> service_config_;
  grpc_closure closure_;
};

//
// ChannelData::ExternalConnectivityWatcher::WatcherList
//

int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
  MutexLock lock(&mu_);
  int count = 0;
  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
    ++count;
  }
  return count;
}

ChannelData::ExternalConnectivityWatcher*
ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
    grpc_closure* on_complete) const {
  MutexLock lock(&mu_);
  ExternalConnectivityWatcher* w = head_;
  while (w != nullptr && w->on_complete_ != on_complete) {
    w = w->next_;
  }
  return w;
}

void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
    ExternalConnectivityWatcher* watcher) {
  GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
  MutexLock lock(&mu_);
  GPR_ASSERT(watcher->next_ == nullptr);
  watcher->next_ = head_;
  head_ = watcher;
}

void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
    const ExternalConnectivityWatcher* watcher) {
  MutexLock lock(&mu_);
  if (watcher == head_) {
    head_ = watcher->next_;
    return;
  }
  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
    if (w->next_ == watcher) {
      w->next_ = w->next_->next_;
      return;
    }
  }
  GPR_UNREACHABLE_CODE(return);
}

//
// ChannelData::ExternalConnectivityWatcher
//

ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ChannelData* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
                        grpc_combiner_scheduler(chand_->combiner_)),
      GRPC_ERROR_NONE);
}

ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}

void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
    void* arg, grpc_error* error) {
  ExternalConnectivityWatcher* self =
      static_cast<ExternalConnectivityWatcher*>(arg);
  grpc_closure* on_complete = self->on_complete_;
  self->chand_->external_connectivity_watcher_list_.Remove(self);
  Delete(self);
  GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}

void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
    void* arg, grpc_error* ignored) {
  ExternalConnectivityWatcher* self =
      static_cast<ExternalConnectivityWatcher*>(arg);
  if (self->state_ == nullptr) {
    // Handle cancellation.
    GPR_ASSERT(self->watcher_timer_init_ == nullptr);
    ExternalConnectivityWatcher* found =
        self->chand_->external_connectivity_watcher_list_.Lookup(
            self->on_complete_);
    if (found != nullptr) {
      grpc_connectivity_state_notify_on_state_change(
          &found->chand_->state_tracker_, nullptr, &found->my_closure_);
    }
    Delete(self);
    return;
  }
  // New watcher.
  self->chand_->external_connectivity_watcher_list_.Add(self);
  // This assumes that the closure is scheduled on the ExecCtx scheduler
  // and that GRPC_CLOSURE_RUN will run the closure immediately.
  GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
  GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
                    grpc_combiner_scheduler(self->chand_->combiner_));
  grpc_connectivity_state_notify_on_state_change(
      &self->chand_->state_tracker_, self->state_, &self->my_closure_);
}

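// For reference, this machinery is driven by the public C-core API; a
// minimal sketch of the application side (cq and tag are assumed to be a
// caller-owned completion queue and tag):
//
//   grpc_connectivity_state state =
//       grpc_channel_check_connectivity_state(channel, /*try_to_connect=*/1);
//   grpc_channel_watch_connectivity_state(
//       channel, state, gpr_inf_future(GPR_CLOCK_MONOTONIC), cq, tag);
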
//
// ChannelData::GrpcSubchannel
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name) from
// the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane combiner.
class ChannelData::GrpcSubchannel : public SubchannelInterface {
 public:
  GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
                 UniquePtr<char> health_check_service_name)
      : chand_(chand),
        subchannel_(subchannel),
        health_check_service_name_(std::move(health_check_service_name)) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      intptr_t subchannel_uuid = subchannel_node->uuid();
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      if (it == chand_->subchannel_refcount_map_.end()) {
        chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
      }
      ++it->second;
    }
  }

  ~GrpcSubchannel() {
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      intptr_t subchannel_uuid = subchannel_node->uuid();
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
      --it->second;
      if (it->second == 0) {
        chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
        chand_->subchannel_refcount_map_.erase(it);
      }
    }
    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
  }

  grpc_connectivity_state CheckConnectivityState(
      RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
      override {
    RefCountedPtr<ConnectedSubchannel> tmp;
    auto retval = subchannel_->CheckConnectivityState(
        health_check_service_name_.get(), &tmp);
    *connected_subchannel = std::move(tmp);
    return retval;
  }

  void WatchConnectivityState(
      grpc_connectivity_state initial_state,
      UniquePtr<ConnectivityStateWatcher> watcher) override {
    subchannel_->WatchConnectivityState(
        initial_state,
        UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
        std::move(watcher));
  }

  void CancelConnectivityStateWatch(
      ConnectivityStateWatcher* watcher) override {
    subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
                                              watcher);
  }

  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

 private:
  ChannelData* chand_;
  Subchannel* subchannel_;
  UniquePtr<char> health_check_service_name_;
};

//
// ChannelData::ClientChannelControlHelper
//

class ChannelData::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      const grpc_channel_args& args) override {
    bool inhibit_health_checking = grpc_channel_arg_get_bool(
        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
    UniquePtr<char> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name.reset(
          gpr_strdup(chand_->health_check_service_name_.get()));
    }
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
        chand_->subchannel_pool_.get());
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
    Subchannel* subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
                                          std::move(health_check_service_name));
  }

  grpc_channel* CreateChannel(const char* target,
                              const grpc_channel_args& args) override {
    return chand_->client_channel_factory_->CreateChannel(target, &args);
  }

  void UpdateState(
      grpc_connectivity_state state,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    grpc_error* disconnect_error =
        chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
              grpc_connectivity_state_name(state), picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      // Will delete itself.
      New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
                                            std::move(picker));
    }
  }

  // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
  void RequestReresolution() override {}

  void AddTraceEvent(TraceSeverity severity, const char* message) override {
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_string(message));
    }
  }

 private:
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ChannelData* chand_;
};

//
// ChannelData implementation
//

grpc_error* ChannelData::Init(grpc_channel_element* elem,
                              grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  grpc_error* error = GRPC_ERROR_NONE;
  new (elem->channel_data) ChannelData(args, &error);
  return error;
}

void ChannelData::Destroy(grpc_channel_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

bool GetEnableRetries(const grpc_channel_args* args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
}

size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
  return static_cast<size_t>(grpc_channel_arg_get_integer(
      grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
      {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
}

RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
    const grpc_channel_args* args) {
  const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
  if (use_local_subchannel_pool) {
    return MakeRefCounted<LocalSubchannelPool>();
  }
  return GlobalSubchannelPool::instance();
}
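
// A channel opts into a local (per-channel) pool via a channel arg, trading
// subchannel sharing across channels for isolation; minimal sketch:
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), 1);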

channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
  const grpc_arg* arg =
      grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
    return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
  }
  return nullptr;
}

ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      per_rpc_retry_buffer_size_(
          GetMaxPerRpcRetryBufferSize(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      data_plane_combiner_(grpc_combiner_create()),
      combiner_(grpc_combiner_create()),
      interested_parties_(grpc_pollset_set_create()),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Initialize data members.
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
                               "client_channel");
  gpr_mu_init(&info_mu_);
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get server name to resolve, using proxy mapper if needed.
  const char* server_uri = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json != nullptr) {
    *error = GRPC_ERROR_NONE;
    default_service_config_ = ServiceConfig::Create(service_config_json, error);
    if (*error != GRPC_ERROR_NONE) {
      default_service_config_.reset();
      return;
    }
  }
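  // A sketch of a default service config with a retry policy, passed via
  // GRPC_ARG_SERVICE_CONFIG ("my.pkg.MyService" is a placeholder name):
  //
  //   const char* config_json =
  //       "{\"methodConfig\": [{"
  //       "  \"name\": [{\"service\": \"my.pkg.MyService\"}],"
  //       "  \"retryPolicy\": {"
  //       "    \"maxAttempts\": 3,"
  //       "    \"initialBackoff\": \"0.1s\","
  //       "    \"maxBackoff\": \"1s\","
  //       "    \"backoffMultiplier\": 2,"
  //       "    \"retryableStatusCodes\": [\"UNAVAILABLE\"]"
  //       "  }"
  //       "}]}";
  //   grpc_arg arg = grpc_channel_arg_string_create(
  //       const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
  //       const_cast<char*>(config_json));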
  grpc_uri* uri = grpc_uri_parse(server_uri, true);
  if (uri != nullptr && uri->path[0] != '\0') {
    server_name_.reset(
        gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
  }
  grpc_uri_destroy(uri);
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
                              &new_args);
  UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
                                                   : gpr_strdup(server_uri));
  // Instantiate resolving LB policy.
  LoadBalancingPolicy::Args lb_args;
  lb_args.combiner = combiner_;
  lb_args.channel_control_helper =
      UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
          New<ClientChannelControlHelper>(this));
  lb_args.args = new_args != nullptr ? new_args : args->channel_args;
  resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
      std::move(lb_args), &grpc_client_channel_routing_trace,
      std::move(target_uri), ProcessResolverResultLocked, this, error));
  grpc_channel_args_destroy(new_args);
  if (*error != GRPC_ERROR_NONE) {
    // Orphan the resolving LB policy and flush the exec_ctx to ensure
    // that it finishes shutting down.  This ensures that if we are
    // failing, we destroy the ClientChannelControlHelper (and thus
    // unref the channel stack) before we return.
    // TODO(roth): This is not a complete solution, because it only
    // catches the case where channel stack initialization fails in this
    // particular filter.  If there is a failure in a different filter, we
    // will leave a dangling ref here, which can cause a crash.  Fortunately,
    // in practice, there are no other filters that can cause failures in
    // channel stack initialization, so this works for now.
    resolving_lb_policy_.reset();
    ExecCtx::Get()->Flush();
  } else {
    grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
                                     interested_parties_);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
              resolving_lb_policy_.get());
    }
  }
}

ChannelData::~ChannelData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  if (resolving_lb_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
                                     interested_parties_);
    resolving_lb_policy_.reset();
  }
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
  GRPC_COMBINER_UNREF(combiner_, "client_channel");
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
  grpc_connectivity_state_destroy(&state_tracker_);
  gpr_mu_destroy(&info_mu_);
}

void ChannelData::ProcessLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
    UniquePtr<char>* lb_policy_name,
    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
  // Prefer the LB policy name found in the service config.
  if (parsed_service_config != nullptr &&
      parsed_service_config->parsed_lb_config() != nullptr) {
    lb_policy_name->reset(
        gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
    *lb_policy_config = parsed_service_config->parsed_lb_config();
    return;
  }
  const char* local_policy_name = nullptr;
  if (parsed_service_config != nullptr &&
      parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
    local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    const grpc_arg* channel_arg =
        grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
    local_policy_name = grpc_channel_arg_get_string(channel_arg);
  }
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver has returned.
  bool found_balancer_address = false;
  for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
    const ServerAddress& address = resolver_result.addresses[i];
    if (address.IsBalancer()) {
      found_balancer_address = true;
      break;
    }
  }
  if (found_balancer_address) {
    if (local_policy_name != nullptr &&
        strcmp(local_policy_name, "grpclb") != 0) {
      gpr_log(GPR_INFO,
              "resolver requested LB policy %s but provided at least one "
1321               "balancer address -- forcing use of grpclb LB policy",
1322               local_policy_name);
1323     }
1324     local_policy_name = "grpclb";
1325   }
1326   // Use pick_first if nothing was specified and we didn't select grpclb
1327   // above.
1328   lb_policy_name->reset(gpr_strdup(
1329       local_policy_name == nullptr ? "pick_first" : local_policy_name));
1330 }
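
// For reference, a condensed restatement of the selection logic above.
// This is an illustrative pseudocode sketch only; ChooseLbPolicyName is
// hypothetical and does not exist in this file:
//
//   const char* ChooseLbPolicyName() {
//     if (parsed_lb_config != nullptr) return parsed_lb_config->name();
//     if (any resolved address IsBalancer()) return "grpclb";
//     if (deprecated_lb_policy != nullptr) return deprecated_lb_policy;
//     if (GRPC_ARG_LB_POLICY_NAME channel arg is set) return its value;
//     return "pick_first";
//   }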

// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
    void* arg, const Resolver::Result& result, const char** lb_policy_name,
    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
    grpc_error** service_config_error) {
  ChannelData* chand = static_cast<ChannelData*>(arg);
  RefCountedPtr<ServiceConfig> service_config;
  // If the resolver did not return a service config or returned an
  // invalid one, we need a fallback service config.
  if (result.service_config_error != GRPC_ERROR_NONE) {
    // The service config was invalid, so fall back to the most recently
    // saved service config.  If there is no saved config either, use the
    // default service config.
    if (chand->saved_service_config_ != nullptr) {
      service_config = chand->saved_service_config_;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                chand);
      }
    } else if (chand->default_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. Using "
                "default service config provided by client API.",
                chand);
      }
      service_config = chand->default_service_config_;
    }
  } else if (result.service_config == nullptr) {
    if (chand->default_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned no service config. Using default "
                "service config provided by client API.",
                chand);
      }
      service_config = chand->default_service_config_;
    }
  } else {
    service_config = result.service_config;
  }
  *service_config_error = GRPC_ERROR_REF(result.service_config_error);
  if (service_config == nullptr &&
      result.service_config_error != GRPC_ERROR_NONE) {
    return false;
  }
  // Process service config.
  UniquePtr<char> service_config_json;
  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
      nullptr;
  if (service_config != nullptr) {
    parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
  }
  // Check if the config has changed.
  const bool service_config_changed =
      ((service_config == nullptr) !=
       (chand->saved_service_config_ == nullptr)) ||
      (service_config != nullptr &&
       strcmp(service_config->service_config_json(),
              chand->saved_service_config_->service_config_json()) != 0);
  if (service_config_changed) {
    service_config_json.reset(gpr_strdup(
        service_config != nullptr ? service_config->service_config_json()
                                  : ""));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned updated service config: \"%s\"",
              chand, service_config_json.get());
    }
    // Save health check service name.
    if (service_config != nullptr) {
      chand->health_check_service_name_.reset(
          gpr_strdup(parsed_service_config->health_check_service_name()));
    } else {
      chand->health_check_service_name_.reset();
    }
    // Save service config.
    chand->saved_service_config_ = std::move(service_config);
  }
  // We want to set the service config at least once, even if the resolver
  // result is otherwise unchanged.  This should not really be needed, but
  // we do it defensively; it can be removed if it proves unnecessary.
  if (service_config_changed || !chand->received_first_resolver_result_) {
    chand->received_first_resolver_result_ = true;
    Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
        retry_throttle_data;
    if (parsed_service_config != nullptr) {
      retry_throttle_data = parsed_service_config->retry_throttling();
    }
    // Create service config setter to update channel state in the data
    // plane combiner.  Destroys itself when done.
    New<ServiceConfigSetter>(chand, retry_throttle_data,
                             chand->saved_service_config_);
  }
  UniquePtr<char> processed_lb_policy_name;
  chand->ProcessLbPolicy(result, parsed_service_config,
                         &processed_lb_policy_name, lb_policy_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&chand->info_mu_);
    chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
    if (service_config_json != nullptr) {
      chand->info_service_config_json_ = std::move(service_config_json);
    }
  }
  // Return results.
  *lb_policy_name = chand->info_lb_policy_name_.get();
  return service_config_changed;
}
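
// A condensed sketch (for exposition only, not compiled) of the service
// config fallback rules applied above:
//
//   if (result.service_config_error != GRPC_ERROR_NONE) {
//     // Invalid config: prefer the previously saved config, then the
//     // default config from the client API; if neither exists, return
//     // false and surface the error to the caller.
//   } else if (result.service_config == nullptr) {
//     // No config returned: use the default config, if any.
//   } else {
//     // Use the config the resolver returned.
//   }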

grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
  if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
  }
  LoadBalancingPolicy::PickResult result =
      picker_->Pick(LoadBalancingPolicy::PickArgs());
  if (result.connected_subchannel != nullptr) {
    ConnectedSubchannel* connected_subchannel =
        static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
  } else {
    if (result.error == GRPC_ERROR_NONE) {
      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return result.error;
}

void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Connectivity watch.
  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker_, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = chand->DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (chand->resolving_lb_policy_ != nullptr) {
      chand->resolving_lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    grpc_error* error = GRPC_ERROR_NONE;
    GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
        &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
        MemoryOrder::ACQUIRE));
    grpc_pollset_set_del_pollset_set(
        chand->resolving_lb_policy_->interested_parties(),
        chand->interested_parties_);
    chand->resolving_lb_policy_.reset();
    // Will delete itself.
    New<ConnectivityStateAndPickerSetter>(
        chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
        UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
            New<LoadBalancingPolicy::TransientFailurePicker>(
                GRPC_ERROR_REF(op->disconnect_with_error))));
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

void ChannelData::StartTransportOp(grpc_channel_element* elem,
                                   grpc_transport_op* op) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane combiner for remaining ops.
  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure,
                        ChannelData::StartTransportOpLocked, op,
                        grpc_combiner_scheduler(chand->combiner_)),
      GRPC_ERROR_NONE);
}
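
// The hop above follows the usual combiner pattern in this file: take a
// ref on the channel stack so it outlives the closure, schedule the
// closure under the control-plane combiner, and drop the ref inside the
// locked callback. A minimal sketch of the same shape (names here are
// placeholders, not real symbols):
//
//   GRPC_CHANNEL_STACK_REF(stack, "my_op");
//   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure, MyCallbackLocked, arg,
//                                        grpc_combiner_scheduler(combiner)),
//                      GRPC_ERROR_NONE);
//   // ...and MyCallbackLocked() ends with:
//   GRPC_CHANNEL_STACK_UNREF(stack, "my_op");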

void ChannelData::GetChannelInfo(grpc_channel_element* elem,
                                 const grpc_channel_info* info) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  MutexLock lock(&chand->info_mu_);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json_.get());
  }
}

void ChannelData::AddQueuedPick(QueuedPick* pick,
                                grpc_polling_entity* pollent) {
  // Add call to queued picks list.
  pick->next = queued_picks_;
  queued_picks_ = pick;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
                                   grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued picks list.
  for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
       pick = &(*pick)->next) {
    if (*pick == to_remove) {
      *pick = to_remove->next;
      return;
    }
  }
}

void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
  auto* chand = static_cast<ChannelData*>(arg);
  if (chand->resolving_lb_policy_ != nullptr) {
    chand->resolving_lb_policy_->ExitIdleLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
}

grpc_connectivity_state ChannelData::CheckConnectivityState(
    bool try_to_connect) {
  grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
                                           grpc_combiner_scheduler(combiner_)),
                       GRPC_ERROR_NONE);
  }
  return out;
}
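
// Illustrative surface-level usage (a sketch; `channel` is a hypothetical
// grpc_channel*): passing try_to_connect=1 returns the current state and,
// if the channel is IDLE, kicks off a connection attempt under the
// control-plane combiner.
//
//   grpc_connectivity_state state =
//       grpc_channel_check_connectivity_state(channel, /*try_to_connect=*/1);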

//
// CallData implementation
//

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
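//
// As a concrete (hypothetical) example of that comparison: suppose the
// surface has sent send_initial_metadata plus two send_message ops, and
// the completions for all three were already returned to the surface
// before the attempt failed with a retryable status.  On the retry
// attempt, the cached send-op data in call_data still holds all three
// ops, while the retry state of the *new* subchannel call shows nothing
// started yet, so the child batches replay them in order:
//
//   cached in call_data:  send_initial_metadata, message 0, message 1
//   new subchannel call:  nothing sent yet
//   child batches:        send_initial_metadata -> message 0 -> message 1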

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args.call_stack, args.call_combiner,
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      lb_call_state_(this),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      enable_retries_(chand.enable_retries()),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

grpc_error* CallData::Init(grpc_call_element* elem,
                           const grpc_call_element_args* args) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) CallData(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

void CallData::Destroy(grpc_call_element* elem,
                       const grpc_call_final_info* final_info,
                       grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
    calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
    then_schedule_closure = nullptr;
  }
  calld->~CallData();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call_ == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->subchannel_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(
            &batch->handler_private.closure, StartPickLocked, elem,
            grpc_combiner_scheduler(chand->data_plane_combiner())),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}

void CallData::SetPollent(grpc_call_element* elem,
                          grpc_polling_entity* pollent) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// send op data caching
//

void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
        sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}

void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, this, idx);
  }
  send_messages_[idx]->Destroy();
}

void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

void CallData::FreeCachedSendOpDataAfterCommit(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (retry_state->completed_send_initial_metadata) {
    FreeCachedSendInitialMetadata(chand);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    FreeCachedSendMessage(chand, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    FreeCachedSendTrailingMetadata(chand);
  }
}

void CallData::FreeCachedSendOpDataForCompletedBatch(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (batch_data->batch.send_initial_metadata) {
    FreeCachedSendInitialMetadata(chand);
  }
  if (batch_data->batch.send_message) {
    FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    FreeCachedSendTrailingMetadata(chand);
  }
}

//
// LB recv_trailing_metadata_ready handling
//

void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
    void* arg, grpc_error* error) {
  CallData* calld = static_cast<CallData*>(arg);
  // Invoke callback to LB policy.
  calld->lb_recv_trailing_metadata_ready_(
      calld->lb_recv_trailing_metadata_ready_user_data_,
      calld->recv_trailing_metadata_, &calld->lb_call_state_);
  // Chain to original callback.
  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
                   GRPC_ERROR_REF(error));
}

void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
    grpc_transport_stream_op_batch* batch) {
  if (lb_recv_trailing_metadata_ready_ != nullptr) {
    recv_trailing_metadata_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
                      grpc_schedule_on_exec_ctx);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
}
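
// The injection above is the standard closure-interception pattern used
// throughout this file: stash the original callback, substitute our own,
// and chain to the original when ours runs. A minimal sketch of the
// shape (member names here are placeholders):
//
//   original_cb_ = payload->recv_trailing_metadata.recv_trailing_metadata_ready;
//   GRPC_CLOSURE_INIT(&our_cb_, OurReadyFn, this, grpc_schedule_on_exec_ctx);
//   payload->recv_trailing_metadata.recv_trailing_metadata_ready = &our_cb_;
//   // OurReadyFn() notifies the LB policy, then chains:
//   GRPC_CLOSURE_RUN(original_cb_, GRPC_ERROR_REF(error));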

//
// pending_batches management
//

size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the pick code (StartPickLocked()) assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
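
// For reference, the resulting slot assignment (at most one batch of
// each kind can be pending at a time):
//
//   0: send_initial_metadata     3: recv_initial_metadata
//   1: send_message              4: recv_message
//   2: send_trailing_metadata    5: recv_trailing_metadata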

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
                                 grpc_transport_stream_op_batch* batch) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (enable_retries_) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      pending_send_initial_metadata_ = true;
      bytes_buffered_for_retry_ += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      pending_send_message_ = true;
      bytes_buffered_for_retry_ +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = true;
    }
    if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                     chand->per_rpc_retry_buffer_size())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, this);
      }
      SubchannelCallRetryState* retry_state =
          subchannel_call_ == nullptr ? nullptr
                                      : static_cast<SubchannelCallRetryState*>(
                                            subchannel_call_->GetParentData());
      RetryCommit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (num_attempts_completed_ == 0) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, this);
        }
        enable_retries_ = false;
      }
    }
  }
}
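
// Example of the accounting above (numbers hypothetical): with a
// 256 KiB per_rpc_retry_buffer_size(), a call that has buffered 200 KiB
// of send_message payloads plus ~1 KiB of initial metadata remains
// retryable; buffering another 60 KiB message pushes
// bytes_buffered_for_retry_ over the limit, so the call is committed,
// and if no attempt has started yet, retries are disabled outright.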

void CallData::PendingBatchClear(PendingBatch* pending) {
  if (enable_retries_) {
    if (pending->batch->send_initial_metadata) {
      pending_send_initial_metadata_ = false;
    }
    if (pending->batch->send_message) {
      pending_send_message_ = false;
    }
    if (pending->batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = false;
    }
  }
  pending->batch = nullptr;
}

void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
                                      PendingBatch* pending) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
                                                grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* subchannel_call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  subchannel_call->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (enable_retries_) {
    StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, this, num_batches, subchannel_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      PendingBatchClear(pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

template <typename Predicate>
CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
                                                   const char* log_message,
                                                   Predicate predicate) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
                this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

void CallData::RetryCommit(grpc_call_element* elem,
                           SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
  }
  if (retry_state != nullptr) {
    FreeCachedSendOpDataAfterCommit(elem, retry_state);
  }
}

void CallData::DoRetry(grpc_call_element* elem,
                       SubchannelCallRetryState* retry_state,
                       grpc_millis server_pushback_ms) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(method_params_ != nullptr);
  const auto* retry_policy = method_params_->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  subchannel_call_.reset();
  connected_subchannel_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      retry_backoff_.Init(
          BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_->NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
                    grpc_combiner_scheduler(chand->data_plane_combiner()));
  grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
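
// Illustrative backoff progression (values hypothetical, jitter ignored):
// with initial_backoff=1s, backoff_multiplier=2, and max_backoff=10s,
// successive retry_backoff_->NextAttemptTime() calls yield delays of
// roughly 1s, 2s, 4s, 8s, 10s, 10s, ...  A server push-back value
// bypasses this computation entirely, and the backoff state is
// re-initialized on the next retry that lacks push-back.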

bool CallData::MaybeRetry(grpc_call_element* elem,
                          SubchannelCallBatchData* batch_data,
                          grpc_status_code status,
                          grpc_mdelem* server_pushback_md) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Get retry policy.
  if (method_params_ == nullptr) return false;
  const auto* retry_policy = method_params_->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  SubchannelCallRetryState* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<SubchannelCallRetryState*>(
        batch_data->subchannel_call->GetParentData());
    if (retry_state->retry_dispatched) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                this);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (retry_throttle_data_ != nullptr) {
      retry_throttle_data_->RecordSuccess();
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              this, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (retry_throttle_data_ != nullptr &&
      !retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++num_attempts_completed_;
  if (num_attempts_completed_ >= retry_policy->max_attempts) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              this, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (cancel_error_ != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, this);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, this);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, this, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  DoRetry(elem, retry_state, server_pushback_ms);
  return true;
}
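
// In summary, the checks above run in this order, and the first one
// that fails aborts the retry:
//
//   1. A retry policy is configured for the method.
//   2. A retry has not already been dispatched for this attempt.
//   3. The status is non-OK and listed in retryable_status_codes.
//   4. The retry throttle (if any) permits another attempt.
//   5. The call has not been committed.
//   6. Attempts remain (num_attempts_completed_ < max_attempts).
//   7. The call has not been cancelled from the surface.
//   8. Any server push-back value parses as a non-negative delay.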
2299
2300 //
2301 // CallData::SubchannelCallBatchData
2302 //
2303
2304 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2305     grpc_call_element* elem, int refcount, bool set_on_complete) {
2306   CallData* calld = static_cast<CallData*>(elem->call_data);
2307   return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2308                                                      set_on_complete);
2309 }
2310
2311 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2312     grpc_call_element* elem, CallData* calld, int refcount,
2313     bool set_on_complete)
2314     : elem(elem), subchannel_call(calld->subchannel_call_) {
2315   SubchannelCallRetryState* retry_state =
2316       static_cast<SubchannelCallRetryState*>(
2317           calld->subchannel_call_->GetParentData());
2318   batch.payload = &retry_state->batch_payload;
2319   gpr_ref_init(&refs, refcount);
2320   if (set_on_complete) {
2321     GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2322                       grpc_schedule_on_exec_ctx);
2323     batch.on_complete = &on_complete;
2324   }
2325   GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2326 }
2327
2328 void CallData::SubchannelCallBatchData::Destroy() {
2329   SubchannelCallRetryState* retry_state =
2330       static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2331   if (batch.send_initial_metadata) {
2332     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2333   }
2334   if (batch.send_trailing_metadata) {
2335     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2336   }
2337   if (batch.recv_initial_metadata) {
2338     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2339   }
2340   if (batch.recv_trailing_metadata) {
2341     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2342   }
2343   subchannel_call.reset();
2344   CallData* calld = static_cast<CallData*>(elem->call_data);
2345   GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
2346 }
2347
2348 //
2349 // recv_initial_metadata callback handling
2350 //
2351
2352 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2353   SubchannelCallBatchData* batch_data =
2354       static_cast<SubchannelCallBatchData*>(arg);
2355   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2356   // Find pending batch.
2357   PendingBatch* pending = calld->PendingBatchFind(
2358       batch_data->elem, "invoking recv_initial_metadata_ready for",
2359       [](grpc_transport_stream_op_batch* batch) {
2360         return batch->recv_initial_metadata &&
2361                batch->payload->recv_initial_metadata
2362                        .recv_initial_metadata_ready != nullptr;
2363       });
2364   GPR_ASSERT(pending != nullptr);
2365   // Return metadata.
2366   SubchannelCallRetryState* retry_state =
2367       static_cast<SubchannelCallRetryState*>(
2368           batch_data->subchannel_call->GetParentData());
2369   grpc_metadata_batch_move(
2370       &retry_state->recv_initial_metadata,
2371       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2372   // Update bookkeeping.
2373   // Note: Need to do this before invoking the callback, since invoking
2374   // the callback will result in yielding the call combiner.
2375   grpc_closure* recv_initial_metadata_ready =
2376       pending->batch->payload->recv_initial_metadata
2377           .recv_initial_metadata_ready;
2378   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2379       nullptr;
2380   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2381   batch_data->Unref();
2382   // Invoke callback.
2383   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2384 }
2385
2386 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2387   SubchannelCallBatchData* batch_data =
2388       static_cast<SubchannelCallBatchData*>(arg);
2389   grpc_call_element* elem = batch_data->elem;
2390   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2391   CallData* calld = static_cast<CallData*>(elem->call_data);
2392   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2393     gpr_log(GPR_INFO,
2394             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2395             chand, calld, grpc_error_string(error));
2396   }
2397   SubchannelCallRetryState* retry_state =
2398       static_cast<SubchannelCallRetryState*>(
2399           batch_data->subchannel_call->GetParentData());
2400   retry_state->completed_recv_initial_metadata = true;
2401   // If a retry was already dispatched, then we're not going to use the
2402   // result of this recv_initial_metadata op, so do nothing.
2403   if (retry_state->retry_dispatched) {
2404     GRPC_CALL_COMBINER_STOP(
2405         calld->call_combiner_,
2406         "recv_initial_metadata_ready after retry dispatched");
2407     return;
2408   }
2409   // If we got an error or a Trailers-Only response and have not yet gotten
2410   // the recv_trailing_metadata_ready callback, then defer propagating this
2411   // callback back to the surface.  We can evaluate whether to retry when
2412   // recv_trailing_metadata comes back.
2413   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2414                     error != GRPC_ERROR_NONE) &&
2415                    !retry_state->completed_recv_trailing_metadata)) {
2416     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2417       gpr_log(GPR_INFO,
2418               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2419               "(Trailers-Only)",
2420               chand, calld);
2421     }
2422     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2423     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2424     if (!retry_state->started_recv_trailing_metadata) {
2425       // recv_trailing_metadata not yet started by application; start it
2426       // ourselves to get status.
2427       calld->StartInternalRecvTrailingMetadata(elem);
2428     } else {
2429       GRPC_CALL_COMBINER_STOP(
2430           calld->call_combiner_,
2431           "recv_initial_metadata_ready trailers-only or error");
2432     }
2433     return;
2434   }
2435   // Received valid initial metadata, so commit the call.
2436   calld->RetryCommit(elem, retry_state);
2437   // Invoke the callback to return the result to the surface.
2438   // Manually invoking a callback function; it does not take ownership of error.
2439   calld->InvokeRecvInitialMetadataCallback(batch_data, error);
2440 }
2441
2442 //
2443 // recv_message callback handling
2444 //
2445
2446 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2447   SubchannelCallBatchData* batch_data =
2448       static_cast<SubchannelCallBatchData*>(arg);
2449   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2450   // Find pending op.
2451   PendingBatch* pending = calld->PendingBatchFind(
2452       batch_data->elem, "invoking recv_message_ready for",
2453       [](grpc_transport_stream_op_batch* batch) {
2454         return batch->recv_message &&
2455                batch->payload->recv_message.recv_message_ready != nullptr;
2456       });
2457   GPR_ASSERT(pending != nullptr);
2458   // Return payload.
2459   SubchannelCallRetryState* retry_state =
2460       static_cast<SubchannelCallRetryState*>(
2461           batch_data->subchannel_call->GetParentData());
2462   *pending->batch->payload->recv_message.recv_message =
2463       std::move(retry_state->recv_message);
2464   // Update bookkeeping.
2465   // Note: Need to do this before invoking the callback, since invoking
2466   // the callback will result in yielding the call combiner.
2467   grpc_closure* recv_message_ready =
2468       pending->batch->payload->recv_message.recv_message_ready;
2469   pending->batch->payload->recv_message.recv_message_ready = nullptr;
2470   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2471   batch_data->Unref();
2472   // Invoke callback.
2473   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2474 }
2475
2476 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2477   SubchannelCallBatchData* batch_data =
2478       static_cast<SubchannelCallBatchData*>(arg);
2479   grpc_call_element* elem = batch_data->elem;
2480   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2481   CallData* calld = static_cast<CallData*>(elem->call_data);
2482   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2483     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2484             chand, calld, grpc_error_string(error));
2485   }
2486   SubchannelCallRetryState* retry_state =
2487       static_cast<SubchannelCallRetryState*>(
2488           batch_data->subchannel_call->GetParentData());
2489   ++retry_state->completed_recv_message_count;
2490   // If a retry was already dispatched, then we're not going to use the
2491   // result of this recv_message op, so do nothing.
2492   if (retry_state->retry_dispatched) {
2493     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2494                             "recv_message_ready after retry dispatched");
2495     return;
2496   }
2497   // If we got an error or the payload was nullptr and we have not yet gotten
2498   // the recv_trailing_metadata_ready callback, then defer propagating this
2499   // callback back to the surface.  We can evaluate whether to retry when
2500   // recv_trailing_metadata comes back.
2501   if (GPR_UNLIKELY(
2502           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2503           !retry_state->completed_recv_trailing_metadata)) {
2504     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2505       gpr_log(GPR_INFO,
2506               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2507               "message and recv_trailing_metadata pending)",
2508               chand, calld);
2509     }
2510     retry_state->recv_message_ready_deferred_batch = batch_data;
2511     retry_state->recv_message_error = GRPC_ERROR_REF(error);
2512     if (!retry_state->started_recv_trailing_metadata) {
2513       // recv_trailing_metadata not yet started by application; start it
2514       // ourselves to get status.
2515       calld->StartInternalRecvTrailingMetadata(elem);
2516     } else {
2517       GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2518     }
2519     return;
2520   }
2521   // Received a valid message, so commit the call.
2522   calld->RetryCommit(elem, retry_state);
2523   // Invoke the callback to return the result to the surface.
2524   // Note: The callback is invoked manually here and does not take ownership of error.
2525   calld->InvokeRecvMessageCallback(batch_data, error);
2526 }
2527
2528 //
2529 // recv_trailing_metadata handling
2530 //
2531
2532 void CallData::GetCallStatus(grpc_call_element* elem,
2533                              grpc_metadata_batch* md_batch, grpc_error* error,
2534                              grpc_status_code* status,
2535                              grpc_mdelem** server_pushback_md) {
2536   if (error != GRPC_ERROR_NONE) {
2537     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2538   } else {
2539     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2540     *status =
2541         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2542     if (server_pushback_md != nullptr &&
2543         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2544       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2545     }
2546   }
2547   GRPC_ERROR_UNREF(error);
2548 }
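
// For reference, the pushback check above looks for trailing metadata of
// the form (values illustrative):
//
//   grpc-status: 14
//   grpc-retry-pushback-ms: 2000
//
// When present, MaybeRetry() delays the next attempt by the pushback value
// instead of using exponential backoff; a negative or unparseable value
// tells the client not to retry at all.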
2549
2550 void CallData::AddClosureForRecvTrailingMetadataReady(
2551     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2552     grpc_error* error, CallCombinerClosureList* closures) {
2553   // Find pending batch.
2554   PendingBatch* pending = PendingBatchFind(
2555       elem, "invoking recv_trailing_metadata for",
2556       [](grpc_transport_stream_op_batch* batch) {
2557         return batch->recv_trailing_metadata &&
2558                batch->payload->recv_trailing_metadata
2559                        .recv_trailing_metadata_ready != nullptr;
2560       });
2561   // If we generated the recv_trailing_metadata op internally via
2562   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2563   if (pending == nullptr) {
2564     GRPC_ERROR_UNREF(error);
2565     return;
2566   }
2567   // Return metadata.
2568   SubchannelCallRetryState* retry_state =
2569       static_cast<SubchannelCallRetryState*>(
2570           batch_data->subchannel_call->GetParentData());
2571   grpc_metadata_batch_move(
2572       &retry_state->recv_trailing_metadata,
2573       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2574   // Add closure.
2575   closures->Add(pending->batch->payload->recv_trailing_metadata
2576                     .recv_trailing_metadata_ready,
2577                 error, "recv_trailing_metadata_ready for pending batch");
2578   // Update bookkeeping.
2579   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2580       nullptr;
2581   MaybeClearPendingBatch(elem, pending);
2582 }
2583
2584 void CallData::AddClosuresForDeferredRecvCallbacks(
2585     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2586     CallCombinerClosureList* closures) {
2587   if (batch_data->batch.recv_trailing_metadata) {
2588     // Add closure for deferred recv_initial_metadata_ready.
2589     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2590                      nullptr)) {
2591       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2592                         InvokeRecvInitialMetadataCallback,
2593                         retry_state->recv_initial_metadata_ready_deferred_batch,
2594                         grpc_schedule_on_exec_ctx);
2595       closures->Add(&retry_state->recv_initial_metadata_ready,
2596                     retry_state->recv_initial_metadata_error,
2597                     "resuming recv_initial_metadata_ready");
2598       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2599     }
2600     // Add closure for deferred recv_message_ready.
2601     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2602                      nullptr)) {
2603       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2604                         InvokeRecvMessageCallback,
2605                         retry_state->recv_message_ready_deferred_batch,
2606                         grpc_schedule_on_exec_ctx);
2607       closures->Add(&retry_state->recv_message_ready,
2608                     retry_state->recv_message_error,
2609                     "resuming recv_message_ready");
2610       retry_state->recv_message_ready_deferred_batch = nullptr;
2611     }
2612   }
2613 }
2614
2615 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2616                                        SubchannelCallRetryState* retry_state) {
2617   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2618     return false;
2619   }
2620   if (pending->batch->send_initial_metadata &&
2621       !retry_state->started_send_initial_metadata) {
2622     return true;
2623   }
2624   if (pending->batch->send_message &&
2625       retry_state->started_send_message_count < send_messages_.size()) {
2626     return true;
2627   }
2628   if (pending->batch->send_trailing_metadata &&
2629       !retry_state->started_send_trailing_metadata) {
2630     return true;
2631   }
2632   return false;
2633 }
2634
2635 void CallData::AddClosuresToFailUnstartedPendingBatches(
2636     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2637     grpc_error* error, CallCombinerClosureList* closures) {
2638   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2639   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2640     PendingBatch* pending = &pending_batches_[i];
2641     if (PendingBatchIsUnstarted(pending, retry_state)) {
2642       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2643         gpr_log(GPR_INFO,
2644                 "chand=%p calld=%p: failing unstarted pending batch at index "
2645                 "%" PRIuPTR,
2646                 chand, this, i);
2647       }
2648       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2649                     "failing on_complete for pending batch");
2650       pending->batch->on_complete = nullptr;
2651       MaybeClearPendingBatch(elem, pending);
2652     }
2653   }
2654   GRPC_ERROR_UNREF(error);
2655 }
2656
2657 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2658                                            grpc_error* error) {
2659   grpc_call_element* elem = batch_data->elem;
2660   SubchannelCallRetryState* retry_state =
2661       static_cast<SubchannelCallRetryState*>(
2662           batch_data->subchannel_call->GetParentData());
2663   // Construct list of closures to execute.
2664   CallCombinerClosureList closures;
2665   // First, add closure for recv_trailing_metadata_ready.
2666   AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2667                                          GRPC_ERROR_REF(error), &closures);
2668   // If there are deferred recv_initial_metadata_ready or recv_message_ready
2669   // callbacks, add them to closures.
2670   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2671   // Add closures to fail any pending batches that have not yet been started.
2672   AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2673                                            GRPC_ERROR_REF(error), &closures);
2674   // Don't need batch_data anymore.
2675   batch_data->Unref();
2676   // Schedule all of the closures identified above.
2677   // Note: This will release the call combiner.
2678   closures.RunClosures(call_combiner_);
2679   GRPC_ERROR_UNREF(error);
2680 }
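
// A minimal sketch of the CallCombinerClosureList pattern used above (the
// closures c1 and c2 are hypothetical):
//
//   CallCombinerClosureList closures;
//   closures.Add(c1, GRPC_ERROR_NONE, "reason for c1");
//   closures.Add(c2, GRPC_ERROR_NONE, "reason for c2");
//   closures.RunClosures(call_combiner_);
//
// Roughly, RunClosures() runs one closure directly, handing it ownership
// of the call combiner, and schedules the rest on the combiner; if the
// list is empty, it simply yields the combiner.  Batching closures this
// way ensures the combiner is released exactly once per event, no matter
// how many callbacks need to fire.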
2681
2682 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
2683   SubchannelCallBatchData* batch_data =
2684       static_cast<SubchannelCallBatchData*>(arg);
2685   grpc_call_element* elem = batch_data->elem;
2686   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2687   CallData* calld = static_cast<CallData*>(elem->call_data);
2688   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2689     gpr_log(GPR_INFO,
2690             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
2691             chand, calld, grpc_error_string(error));
2692   }
2693   SubchannelCallRetryState* retry_state =
2694       static_cast<SubchannelCallRetryState*>(
2695           batch_data->subchannel_call->GetParentData());
2696   retry_state->completed_recv_trailing_metadata = true;
2697   // Get the call's status and check for server pushback metadata.
2698   grpc_status_code status = GRPC_STATUS_OK;
2699   grpc_mdelem* server_pushback_md = nullptr;
2700   grpc_metadata_batch* md_batch =
2701       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
2702   calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
2703                        &server_pushback_md);
2704   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2705     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2706             calld, grpc_status_code_to_string(status));
2707   }
2708   // Check if we should retry.
2709   if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
2710     // Unref batch_data for deferred recv_initial_metadata_ready or
2711     // recv_message_ready callbacks, if any.
2712     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
2713       batch_data->Unref();
2714       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2715     }
2716     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2717       batch_data->Unref();
2718       GRPC_ERROR_UNREF(retry_state->recv_message_error);
2719     }
2720     batch_data->Unref();
2721     return;
2722   }
2723   // Not retrying, so commit the call.
2724   calld->RetryCommit(elem, retry_state);
2725   // Run any necessary closures.
2726   calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
2727 }
2728
2729 //
2730 // on_complete callback handling
2731 //
2732
2733 void CallData::AddClosuresForCompletedPendingBatch(
2734     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2735     SubchannelCallRetryState* retry_state, grpc_error* error,
2736     CallCombinerClosureList* closures) {
2737   PendingBatch* pending = PendingBatchFind(
2738       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2739         // Match the pending batch with the same set of send ops as the
2740         // subchannel batch we've just completed.
2741         return batch->on_complete != nullptr &&
2742                batch_data->batch.send_initial_metadata ==
2743                    batch->send_initial_metadata &&
2744                batch_data->batch.send_message == batch->send_message &&
2745                batch_data->batch.send_trailing_metadata ==
2746                    batch->send_trailing_metadata;
2747       });
2748   // If batch_data is a replay batch, then there will be no pending
2749   // batch to complete.
2750   if (pending == nullptr) {
2751     GRPC_ERROR_UNREF(error);
2752     return;
2753   }
2754   // Add closure.
2755   closures->Add(pending->batch->on_complete, error,
2756                 "on_complete for pending batch");
2757   pending->batch->on_complete = nullptr;
2758   MaybeClearPendingBatch(elem, pending);
2759 }
2760
2761 void CallData::AddClosuresForReplayOrPendingSendOps(
2762     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2763     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
2764   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2765   bool have_pending_send_message_ops =
2766       retry_state->started_send_message_count < send_messages_.size();
2767   bool have_pending_send_trailing_metadata_op =
2768       seen_send_trailing_metadata_ &&
2769       !retry_state->started_send_trailing_metadata;
2770   if (!have_pending_send_message_ops &&
2771       !have_pending_send_trailing_metadata_op) {
2772     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2773       PendingBatch* pending = &pending_batches_[i];
2774       grpc_transport_stream_op_batch* batch = pending->batch;
2775       if (batch == nullptr || pending->send_ops_cached) continue;
2776       if (batch->send_message) have_pending_send_message_ops = true;
2777       if (batch->send_trailing_metadata) {
2778         have_pending_send_trailing_metadata_op = true;
2779       }
2780     }
2781   }
2782   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2783     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2784       gpr_log(GPR_INFO,
2785               "chand=%p calld=%p: starting next batch for pending send op(s)",
2786               chand, this);
2787     }
2788     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2789                       StartRetriableSubchannelBatches, elem,
2790                       grpc_schedule_on_exec_ctx);
2791     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2792                   "starting next batch for send_* op(s)");
2793   }
2794 }
2795
2796 void CallData::OnComplete(void* arg, grpc_error* error) {
2797   SubchannelCallBatchData* batch_data =
2798       static_cast<SubchannelCallBatchData*>(arg);
2799   grpc_call_element* elem = batch_data->elem;
2800   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2801   CallData* calld = static_cast<CallData*>(elem->call_data);
2802   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2803     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2804     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2805             chand, calld, grpc_error_string(error), batch_str);
2806     gpr_free(batch_str);
2807   }
2808   SubchannelCallRetryState* retry_state =
2809       static_cast<SubchannelCallRetryState*>(
2810           batch_data->subchannel_call->GetParentData());
2811   // Update bookkeeping in retry_state.
2812   if (batch_data->batch.send_initial_metadata) {
2813     retry_state->completed_send_initial_metadata = true;
2814   }
2815   if (batch_data->batch.send_message) {
2816     ++retry_state->completed_send_message_count;
2817   }
2818   if (batch_data->batch.send_trailing_metadata) {
2819     retry_state->completed_send_trailing_metadata = true;
2820   }
2821   // If the call is committed, free cached data for send ops that we've just
2822   // completed.
2823   if (calld->retry_committed_) {
2824     calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
2825   }
2826   // Construct list of closures to execute.
2827   CallCombinerClosureList closures;
2828   // If a retry was already dispatched, that means we saw
2829   // recv_trailing_metadata before this, so we do nothing here.
2830   // Otherwise, invoke the callback to return the result to the surface.
2831   if (!retry_state->retry_dispatched) {
2832     // Add closure for the completed pending batch, if any.
2833     calld->AddClosuresForCompletedPendingBatch(
2834         elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
2835     // If needed, add a callback to start any replay or pending send ops on
2836     // the subchannel call.
2837     if (!retry_state->completed_recv_trailing_metadata) {
2838       calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
2839                                                   &closures);
2840     }
2841   }
2842   // Track number of pending subchannel send batches and determine if this
2843   // was the last one.
2844   --calld->num_pending_retriable_subchannel_send_batches_;
2845   const bool last_send_batch_complete =
2846       calld->num_pending_retriable_subchannel_send_batches_ == 0;
2847   // Don't need batch_data anymore.
2848   batch_data->Unref();
2849   // Schedule all of the closures identified above.
2850   // Note: This yields the call combiner.
2851   closures.RunClosures(calld->call_combiner_);
2852   // If this was the last subchannel send batch, unref the call stack.
2853   if (last_send_batch_complete) {
2854     GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
2855   }
2856 }
2857
2858 //
2859 // subchannel batch construction
2860 //
2861
2862 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2863   grpc_transport_stream_op_batch* batch =
2864       static_cast<grpc_transport_stream_op_batch*>(arg);
2865   SubchannelCall* subchannel_call =
2866       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2867   // Note: This will release the call combiner.
2868   subchannel_call->StartTransportStreamOpBatch(batch);
2869 }
2870
2871 void CallData::AddClosureForSubchannelBatch(
2872     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2873     CallCombinerClosureList* closures) {
2874   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2875   batch->handler_private.extra_arg = subchannel_call_.get();
2876   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
2877                     batch, grpc_schedule_on_exec_ctx);
2878   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2879     char* batch_str = grpc_transport_stream_op_batch_string(batch);
2880     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2881             this, batch_str);
2882     gpr_free(batch_str);
2883   }
2884   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2885                 "start_subchannel_batch");
2886 }
2887
2888 void CallData::AddRetriableSendInitialMetadataOp(
2889     SubchannelCallRetryState* retry_state,
2890     SubchannelCallBatchData* batch_data) {
2891   // Maps the number of retries to the corresponding metadata value slice.
2892   static const grpc_slice* retry_count_strings[] = {
2893       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2894   // We need to make a copy of the metadata batch for each attempt, since
2895   // the filters in the subchannel stack may modify this batch, and we don't
2896   // want those modifications to be passed forward to subsequent attempts.
2897   //
2898   // If we've already completed one or more attempts, add the
2899   // grpc-previous-rpc-attempts header.
2900   retry_state->send_initial_metadata_storage =
2901       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2902           sizeof(grpc_linked_mdelem) *
2903           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
2904   grpc_metadata_batch_copy(&send_initial_metadata_,
2905                            &retry_state->send_initial_metadata,
2906                            retry_state->send_initial_metadata_storage);
2907   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2908                        .grpc_previous_rpc_attempts != nullptr)) {
2909     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2910                                retry_state->send_initial_metadata.idx.named
2911                                    .grpc_previous_rpc_attempts);
2912   }
2913   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
2914     grpc_mdelem retry_md = grpc_mdelem_create(
2915         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2916         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
2917     grpc_error* error = grpc_metadata_batch_add_tail(
2918         &retry_state->send_initial_metadata,
2919         &retry_state
2920              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
2921         retry_md);
2922     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2923       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2924               grpc_error_string(error));
2925       GPR_ASSERT(false);
2926     }
2927   }
2928   retry_state->started_send_initial_metadata = true;
2929   batch_data->batch.send_initial_metadata = true;
2930   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2931       &retry_state->send_initial_metadata;
2932   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2933       send_initial_metadata_flags_;
2934   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
2935 }
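
// For reference, with one prior attempt completed the header added above
// appears on the wire as:
//
//   grpc-previous-rpc-attempts: 1
//
// retry_count_strings needs only four entries because the retry policy
// caps maxAttempts at 5, so at most 4 attempts can precede the current one.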
2936
2937 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2938                                          SubchannelCallRetryState* retry_state,
2939                                          SubchannelCallBatchData* batch_data) {
2940   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2941   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2942     gpr_log(GPR_INFO,
2943             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2944             chand, this, retry_state->started_send_message_count);
2945   }
2946   ByteStreamCache* cache =
2947       send_messages_[retry_state->started_send_message_count];
2948   ++retry_state->started_send_message_count;
2949   retry_state->send_message.Init(cache);
2950   batch_data->batch.send_message = true;
2951   batch_data->batch.payload->send_message.send_message.reset(
2952       retry_state->send_message.get());
2953 }
2954
2955 void CallData::AddRetriableSendTrailingMetadataOp(
2956     SubchannelCallRetryState* retry_state,
2957     SubchannelCallBatchData* batch_data) {
2958   // We need to make a copy of the metadata batch for each attempt, since
2959   // the filters in the subchannel stack may modify this batch, and we don't
2960   // want those modifications to be passed forward to subsequent attempts.
2961   retry_state->send_trailing_metadata_storage =
2962       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2963           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
2964   grpc_metadata_batch_copy(&send_trailing_metadata_,
2965                            &retry_state->send_trailing_metadata,
2966                            retry_state->send_trailing_metadata_storage);
2967   retry_state->started_send_trailing_metadata = true;
2968   batch_data->batch.send_trailing_metadata = true;
2969   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2970       &retry_state->send_trailing_metadata;
2971 }
2972
2973 void CallData::AddRetriableRecvInitialMetadataOp(
2974     SubchannelCallRetryState* retry_state,
2975     SubchannelCallBatchData* batch_data) {
2976   retry_state->started_recv_initial_metadata = true;
2977   batch_data->batch.recv_initial_metadata = true;
2978   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2979   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2980       &retry_state->recv_initial_metadata;
2981   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2982       &retry_state->trailing_metadata_available;
2983   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2984                     RecvInitialMetadataReady, batch_data,
2985                     grpc_schedule_on_exec_ctx);
2986   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2987       &retry_state->recv_initial_metadata_ready;
2988 }
2989
2990 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
2991                                          SubchannelCallBatchData* batch_data) {
2992   ++retry_state->started_recv_message_count;
2993   batch_data->batch.recv_message = true;
2994   batch_data->batch.payload->recv_message.recv_message =
2995       &retry_state->recv_message;
2996   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
2997                     batch_data, grpc_schedule_on_exec_ctx);
2998   batch_data->batch.payload->recv_message.recv_message_ready =
2999       &retry_state->recv_message_ready;
3000 }
3001
3002 void CallData::AddRetriableRecvTrailingMetadataOp(
3003     SubchannelCallRetryState* retry_state,
3004     SubchannelCallBatchData* batch_data) {
3005   retry_state->started_recv_trailing_metadata = true;
3006   batch_data->batch.recv_trailing_metadata = true;
3007   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
3008   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
3009       &retry_state->recv_trailing_metadata;
3010   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
3011       &retry_state->collect_stats;
3012   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
3013                     RecvTrailingMetadataReady, batch_data,
3014                     grpc_schedule_on_exec_ctx);
3015   batch_data->batch.payload->recv_trailing_metadata
3016       .recv_trailing_metadata_ready =
3017       &retry_state->recv_trailing_metadata_ready;
3018   MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
3019       &batch_data->batch);
3020 }
3021
3022 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
3023   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3024   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3025     gpr_log(GPR_INFO,
3026             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
3027             "started; starting it internally",
3028             chand, this);
3029   }
3030   SubchannelCallRetryState* retry_state =
3031       static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
3032   // Create batch_data with 2 refs, since this batch will be unreffed twice:
3033   // once for the recv_trailing_metadata_ready callback when the subchannel
3034   // batch returns, and again when we actually get a recv_trailing_metadata
3035   // op from the surface.
3036   SubchannelCallBatchData* batch_data =
3037       SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
3038   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3039   retry_state->recv_trailing_metadata_internal_batch = batch_data;
3040   // Note: This will release the call combiner.
3041   subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
3042 }
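
// Illustrative lifetime of the internally started batch created above:
//
//   Create(refs=2)
//     -> subchannel completes the batch: RecvTrailingMetadataReady() runs
//        and its processing drops the first ref
//     -> the surface later sends its own recv_trailing_metadata op:
//        AddSubchannelBatchesForPendingBatches() either replays the stored
//        result or, if the batch has not completed yet, unrefs it directly,
//        dropping the second ref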
3043
3044 // If there are any cached send ops that need to be replayed on the
3045 // current subchannel call, creates and returns a new subchannel batch
3046 // to replay those ops.  Otherwise, returns nullptr.
3047 CallData::SubchannelCallBatchData*
3048 CallData::MaybeCreateSubchannelBatchForReplay(
3049     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
3050   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3051   SubchannelCallBatchData* replay_batch_data = nullptr;
3052   // send_initial_metadata.
3053   if (seen_send_initial_metadata_ &&
3054       !retry_state->started_send_initial_metadata &&
3055       !pending_send_initial_metadata_) {
3056     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3057       gpr_log(GPR_INFO,
3058               "chand=%p calld=%p: replaying previously completed "
3059               "send_initial_metadata op",
3060               chand, this);
3061     }
3062     replay_batch_data =
3063         SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3064     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
3065   }
3066   // send_message.
3067   // Note that we can only have one send_message op in flight at a time.
3068   if (retry_state->started_send_message_count < send_messages_.size() &&
3069       retry_state->started_send_message_count ==
3070           retry_state->completed_send_message_count &&
3071       !pending_send_message_) {
3072     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3073       gpr_log(GPR_INFO,
3074               "chand=%p calld=%p: replaying previously completed "
3075               "send_message op",
3076               chand, this);
3077     }
3078     if (replay_batch_data == nullptr) {
3079       replay_batch_data =
3080           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3081     }
3082     AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
3083   }
3084   // send_trailing_metadata.
3085   // Note that we only add this op if we have no more send_message ops
3086   // to start, since we can't send down any more send_message ops after
3087   // send_trailing_metadata.
3088   if (seen_send_trailing_metadata_ &&
3089       retry_state->started_send_message_count == send_messages_.size() &&
3090       !retry_state->started_send_trailing_metadata &&
3091       !pending_send_trailing_metadata_) {
3092     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3093       gpr_log(GPR_INFO,
3094               "chand=%p calld=%p: replaying previously completed "
3095               "send_trailing_metadata op",
3096               chand, this);
3097     }
3098     if (replay_batch_data == nullptr) {
3099       replay_batch_data =
3100           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3101     }
3102     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
3103   }
3104   return replay_batch_data;
3105 }
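
// Illustrative replay scenario for the function above: suppose the surface
// sent initial metadata plus two messages, all of which completed before
// the attempt failed and was retried.  On the new subchannel call,
// retry_state starts out zeroed, so the replay batch carries
// send_initial_metadata and send_messages_[0] only; because only one
// send_message op may be in flight at a time, send_messages_[1] (and any
// cached send_trailing_metadata) is replayed by a follow-up batch
// scheduled from OnComplete() via AddClosuresForReplayOrPendingSendOps().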
3106
3107 void CallData::AddSubchannelBatchesForPendingBatches(
3108     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
3109     CallCombinerClosureList* closures) {
3110   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3111     PendingBatch* pending = &pending_batches_[i];
3112     grpc_transport_stream_op_batch* batch = pending->batch;
3113     if (batch == nullptr) continue;
3114     // Skip any batch that either (a) has already been started on this
3115     // subchannel call or (b) cannot be started yet because we're still
3116     // replaying send ops that need to complete first.
3117     // TODO(roth): Note that if any one op in the batch can't be sent
3118     // yet due to ops that we're replaying, we don't start any of the ops
3119     // in the batch.  This is probably okay, but it could conceivably
3120     // lead to increased latency in some cases -- e.g., we could delay
3121     // starting a recv op due to it being in the same batch with a send
3122     // op.  If/when we revamp the callback protocol in
3123     // transport_stream_op_batch, we may be able to fix this.
3124     if (batch->send_initial_metadata &&
3125         retry_state->started_send_initial_metadata) {
3126       continue;
3127     }
3128     if (batch->send_message && retry_state->completed_send_message_count <
3129                                    retry_state->started_send_message_count) {
3130       continue;
3131     }
3132     // Note that we only start send_trailing_metadata if we have no more
3133     // send_message ops to start, since we can't send down any more
3134     // send_message ops after send_trailing_metadata.
3135     if (batch->send_trailing_metadata &&
3136         (retry_state->started_send_message_count + batch->send_message <
3137              send_messages_.size() ||
3138          retry_state->started_send_trailing_metadata)) {
3139       continue;
3140     }
3141     if (batch->recv_initial_metadata &&
3142         retry_state->started_recv_initial_metadata) {
3143       continue;
3144     }
3145     if (batch->recv_message && retry_state->completed_recv_message_count <
3146                                    retry_state->started_recv_message_count) {
3147       continue;
3148     }
3149     if (batch->recv_trailing_metadata &&
3150         retry_state->started_recv_trailing_metadata) {
3151       // If we previously completed a recv_trailing_metadata op
3152       // initiated by StartInternalRecvTrailingMetadata(), use the
3153       // result of that instead of trying to re-start this op.
3154       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3155                         nullptr))) {
3156         // If the batch completed, then trigger the completion callback
3157         // directly, so that we return the previously received result to
3158         // the application.  Otherwise, just unref the internally
3159         // started subchannel batch, since we'll propagate the
3160         // completion when it completes.
3161         if (retry_state->completed_recv_trailing_metadata) {
3162           // Batches containing recv_trailing_metadata always succeed.
3163           closures->Add(
3164               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3165               "re-executing recv_trailing_metadata_ready to propagate "
3166               "internally triggered result");
3167         } else {
3168           retry_state->recv_trailing_metadata_internal_batch->Unref();
3169         }
3170         retry_state->recv_trailing_metadata_internal_batch = nullptr;
3171       }
3172       continue;
3173     }
3174     // If we're not retrying, just send the batch as-is.
3175     if (method_params_ == nullptr ||
3176         method_params_->retry_policy() == nullptr || retry_committed_) {
3177       // TODO(roth): We should probably call
3178       // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3179       AddClosureForSubchannelBatch(elem, batch, closures);
3180       PendingBatchClear(pending);
3181       continue;
3182     }
3183     // Create batch with the right number of callbacks.
3184     const bool has_send_ops = batch->send_initial_metadata ||
3185                               batch->send_message ||
3186                               batch->send_trailing_metadata;
3187     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3188                               batch->recv_message +
3189                               batch->recv_trailing_metadata;
3190     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3191         elem, num_callbacks, has_send_ops /* set_on_complete */);
3192     // Cache send ops if needed.
3193     MaybeCacheSendOpsForBatch(pending);
3194     // send_initial_metadata.
3195     if (batch->send_initial_metadata) {
3196       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3197     }
3198     // send_message.
3199     if (batch->send_message) {
3200       AddRetriableSendMessageOp(elem, retry_state, batch_data);
3201     }
3202     // send_trailing_metadata.
3203     if (batch->send_trailing_metadata) {
3204       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3205     }
3206     // recv_initial_metadata.
3207     if (batch->recv_initial_metadata) {
3208       // recv_flags is only used on the server side.
3209       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3210       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3211     }
3212     // recv_message.
3213     if (batch->recv_message) {
3214       AddRetriableRecvMessageOp(retry_state, batch_data);
3215     }
3216     // recv_trailing_metadata.
3217     if (batch->recv_trailing_metadata) {
3218       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3219     }
3220     AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3221     // Track number of pending subchannel send batches.
3222     // If this is the first one, take a ref to the call stack.
3223     if (batch->send_initial_metadata || batch->send_message ||
3224         batch->send_trailing_metadata) {
3225       if (num_pending_retriable_subchannel_send_batches_ == 0) {
3226         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3227       }
3228       ++num_pending_retriable_subchannel_send_batches_;
3229     }
3230   }
3231 }
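
// Worked example for the callback accounting above: a pending batch with
// send_initial_metadata, send_message, and recv_initial_metadata has
// has_send_ops == true, so num_callbacks == 1 (a single on_complete shared
// by all send ops) + 1 (recv_initial_metadata_ready) == 2, and batch_data
// is created with 2 refs, one consumed by each callback.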
3232
3233 void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
3234   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3235   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3236   CallData* calld = static_cast<CallData*>(elem->call_data);
3237   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3238     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
3239             chand, calld);
3240   }
3241   SubchannelCallRetryState* retry_state =
3242       static_cast<SubchannelCallRetryState*>(
3243           calld->subchannel_call_->GetParentData());
3244   // Construct list of closures to execute, one for each pending batch.
3245   CallCombinerClosureList closures;
3246   // Replay previously-returned send_* ops if needed.
3247   SubchannelCallBatchData* replay_batch_data =
3248       calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
3249   if (replay_batch_data != nullptr) {
3250     calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
3251                                         &closures);
3252     // Track number of pending subchannel send batches.
3253     // If this is the first one, take a ref to the call stack.
3254     if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
3255       GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
3256     }
3257     ++calld->num_pending_retriable_subchannel_send_batches_;
3258   }
3259   // Now add pending batches.
3260   calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
3261   // Start batches on subchannel call.
3262   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3263     gpr_log(GPR_INFO,
3264             "chand=%p calld=%p: starting %" PRIuPTR
3265             " retriable batches on subchannel_call=%p",
3266             chand, calld, closures.size(), calld->subchannel_call_.get());
3267   }
3268   // Note: This will yield the call combiner.
3269   closures.RunClosures(calld->call_combiner_);
3270 }
3271
3272 //
3273 // LB pick
3274 //
3275
3276 void CallData::CreateSubchannelCall(grpc_call_element* elem) {
3277   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3278   const size_t parent_data_size =
3279       enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
3280   const ConnectedSubchannel::CallArgs call_args = {
3281       pollent_, path_, call_start_time_, deadline_, arena_,
3282       // TODO(roth): When we implement hedging support, we will probably
3283       // need to use a separate call context for each subchannel call.
3284       call_context_, call_combiner_, parent_data_size};
3285   grpc_error* error = GRPC_ERROR_NONE;
3286   subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error);
3287   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3288     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
3289             chand, this, subchannel_call_.get(), grpc_error_string(error));
3290   }
3291   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3292     PendingBatchesFail(elem, error, YieldCallCombiner);
3293   } else {
3294     if (parent_data_size > 0) {
3295       new (subchannel_call_->GetParentData())
3296           SubchannelCallRetryState(call_context_);
3297     }
3298     PendingBatchesResume(elem);
3299   }
3300 }
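
// Conceptually, parent_data_size reserves space for the retry state
// immediately after the subchannel call object, avoiding a separate
// allocation:
//
//   [ SubchannelCall | SubchannelCallRetryState ]
//                      ^-- GetParentData()
//
// The placement-new above constructs the retry state in that region,
// tying its lifetime to the subchannel call.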
3301
3302 void CallData::PickDone(void* arg, grpc_error* error) {
3303   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3304   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3305   CallData* calld = static_cast<CallData*>(elem->call_data);
3306   if (error != GRPC_ERROR_NONE) {
3307     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3308       gpr_log(GPR_INFO,
3309               "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3310               calld, grpc_error_string(error));
3311     }
3312     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3313     return;
3314   }
3315   calld->CreateSubchannelCall(elem);
3316 }
3317
3318 // A class to handle the call combiner cancellation callback for a
3319 // queued pick.
3320 class CallData::QueuedPickCanceller {
3321  public:
3322   explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
3323     auto* calld = static_cast<CallData*>(elem->call_data);
3324     auto* chand = static_cast<ChannelData*>(elem->channel_data);
3325     GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
3326     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
3327                       grpc_combiner_scheduler(chand->data_plane_combiner()));
3328     calld->call_combiner_->SetNotifyOnCancel(&closure_);
3329   }
3330
3331  private:
3332   static void CancelLocked(void* arg, grpc_error* error) {
3333     auto* self = static_cast<QueuedPickCanceller*>(arg);
3334     auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
3335     auto* calld = static_cast<CallData*>(self->elem_->call_data);
3336     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3337       gpr_log(GPR_INFO,
3338               "chand=%p calld=%p: cancelling queued pick: "
3339               "error=%s self=%p calld->pick_canceller=%p",
3340               chand, calld, grpc_error_string(error), self,
3341               calld->pick_canceller_);
3342     }
3343     if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
3344       // Remove pick from list of queued picks.
3345       calld->RemoveCallFromQueuedPicksLocked(self->elem_);
3346       // Fail pending batches on the call.
3347       calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
3348                                 YieldCallCombinerIfPendingBatchesFound);
3349     }
3350     GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
3351     Delete(self);
3352   }
3353
3354   grpc_call_element* elem_;
3355   grpc_closure closure_;
3356 };
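
// Illustrative sequence for the "lame canceller" pattern used here:
//
//   AddCallToQueuedPicksLocked():  pick_canceller_ = New<QueuedPickCanceller>()
//   pick completes normally:       RemoveCallFromQueuedPicksLocked() sets
//                                  pick_canceller_ = nullptr  // lamed
//   cancellation closure fires:    self != calld->pick_canceller_, so
//                                  CancelLocked() only unrefs and deletes
//
// Laming the canceller instead of unregistering it avoids a race between
// pick completion and call cancellation.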
3357
3358 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3359   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3360   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3361     gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3362             chand, this);
3363   }
3364   chand->RemoveQueuedPick(&pick_, pollent_);
3365   pick_queued_ = false;
3366   // Lame the call combiner canceller.
3367   pick_canceller_ = nullptr;
3368 }
3369
3370 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3371   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3372   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3373     gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3374             this);
3375   }
3376   pick_queued_ = true;
3377   pick_.elem = elem;
3378   chand->AddQueuedPick(&pick_, pollent_);
3379   // Register call combiner cancellation callback.
3380   pick_canceller_ = New<QueuedPickCanceller>(elem);
3381 }
3382
3383 void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3384   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3385   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3386     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
3387             chand, this);
3388   }
3389   // Store a ref to the service_config in service_config_call_data_. Also
3390   // save a pointer to service_config_call_data_ in the call_context_ so
3391   // that all future filters can access it.
3392   service_config_call_data_ =
3393       ServiceConfig::CallData(chand->service_config(), path_);
3394   if (service_config_call_data_.service_config() != nullptr) {
3395     call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
3396         &service_config_call_data_;
3397     method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
3398         service_config_call_data_.GetMethodParsedConfig(
3399             internal::ClientChannelServiceConfigParser::ParserIndex()));
3400   }
3401   retry_throttle_data_ = chand->retry_throttle_data();
3402   if (method_params_ != nullptr) {
3403     // If the deadline from the service config is shorter than the one
3404     // from the client API, reset the deadline timer.
3405     if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
3406       const grpc_millis per_method_deadline =
3407           grpc_timespec_to_millis_round_up(call_start_time_) +
3408           method_params_->timeout();
3409       if (per_method_deadline < deadline_) {
3410         deadline_ = per_method_deadline;
3411         grpc_deadline_state_reset(elem, deadline_);
3412       }
3413     }
3414     // If the service config set wait_for_ready and the application
3415     // did not explicitly set it, use the value from the service config.
3416     uint32_t* send_initial_metadata_flags =
3417         &pending_batches_[0]
3418              .batch->payload->send_initial_metadata.send_initial_metadata_flags;
3419     if (method_params_->wait_for_ready().has_value() &&
3420         !(*send_initial_metadata_flags &
3421           GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3422       if (method_params_->wait_for_ready().value()) {
3423         *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3424       } else {
3425         *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3426       }
3427     }
3428   }
3429   // If no retry policy, disable retries.
3430   // TODO(roth): Remove this when adding support for transparent retries.
3431   if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
3432     enable_retries_ = false;
3433   }
3434 }
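
// For reference, the method params consumed above come from a service
// config JSON along these lines (names and values are illustrative):
//
//   {
//     "methodConfig": [{
//       "name": [{ "service": "my.pkg.MyService" }],
//       "timeout": "10s",
//       "waitForReady": true,
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }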
3435
3436 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3437   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3438   // Apply service config data to the call only once, and only if the
3439   // channel has the data available.
3440   if (GPR_LIKELY(chand->received_service_config_data() &&
3441                  !service_config_applied_)) {
3442     service_config_applied_ = true;
3443     ApplyServiceConfigToCallLocked(elem);
3444   }
3445 }
3446
3447 const char* PickResultTypeName(
3448     LoadBalancingPolicy::PickResult::ResultType type) {
3449   switch (type) {
3450     case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
3451       return "COMPLETE";
3452     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3453       return "QUEUE";
3454     case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
3455       return "TRANSIENT_FAILURE";
3456   }
3457   GPR_UNREACHABLE_CODE(return "UNKNOWN");
3458 }
3459
3460 void CallData::StartPickLocked(void* arg, grpc_error* error) {
3461   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3462   CallData* calld = static_cast<CallData*>(elem->call_data);
3463   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3464   GPR_ASSERT(calld->connected_subchannel_ == nullptr);
3465   GPR_ASSERT(calld->subchannel_call_ == nullptr);
3466   // Apply service config to call if needed.
3467   calld->MaybeApplyServiceConfigToCallLocked(elem);
3468   // If this is a retry, use the send_initial_metadata payload that
3469   // we've cached; otherwise, use the pending batch.  The
3470   // send_initial_metadata batch will be the first pending batch in the
3471   // list, as set by GetBatchIndex() above.
3472   // TODO(roth): What if the LB policy needs to add something to the
3473   // call's initial metadata, and then there's a retry?  We don't want
3474   // the new metadata to be added twice.  We might need to somehow
3475   // allocate the subchannel batch earlier so that we can give the
3476   // subchannel's copy of the metadata batch (which is copied for each
3477   // attempt) to the LB policy instead of the one from the parent channel.
3478   LoadBalancingPolicy::PickArgs pick_args;
3479   pick_args.call_state = &calld->lb_call_state_;
3480   pick_args.initial_metadata =
3481       calld->seen_send_initial_metadata_
3482           ? &calld->send_initial_metadata_
3483           : calld->pending_batches_[0]
3484                 .batch->payload->send_initial_metadata.send_initial_metadata;
3485   // Grab initial metadata flags so that we can check later if the call has
3486   // wait_for_ready enabled.
3487   const uint32_t send_initial_metadata_flags =
3488       calld->seen_send_initial_metadata_
3489           ? calld->send_initial_metadata_flags_
3490           : calld->pending_batches_[0]
3491                 .batch->payload->send_initial_metadata
3492                 .send_initial_metadata_flags;
3493   // When done, we schedule this closure to leave the data plane combiner.
3494   GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
3495                     grpc_schedule_on_exec_ctx);
3496   // Attempt pick.
3497   auto result = chand->picker()->Pick(pick_args);
3498   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3499     gpr_log(GPR_INFO,
3500             "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
3501             "error=%s)",
3502             chand, calld, PickResultTypeName(result.type),
3503             result.connected_subchannel.get(), grpc_error_string(result.error));
3504   }
3505   switch (result.type) {
3506     case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
3507       // If we're shutting down, fail all RPCs.
3508       grpc_error* disconnect_error = chand->disconnect_error();
3509       if (disconnect_error != GRPC_ERROR_NONE) {
3510         GRPC_ERROR_UNREF(result.error);
3511         GRPC_CLOSURE_SCHED(&calld->pick_closure_,
3512                            GRPC_ERROR_REF(disconnect_error));
3513         break;
3514       }
3515       // If wait_for_ready is false, then the error indicates the RPC
3516       // attempt's final status.
3517       if ((send_initial_metadata_flags &
3518            GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3519         // Retry if appropriate; otherwise, fail.
3520         grpc_status_code status = GRPC_STATUS_OK;
3521         grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
3522                               nullptr, nullptr);
3523         if (!calld->enable_retries_ ||
3524             !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
3525                                nullptr /* server_pushback_md */)) {
3526           grpc_error* new_error =
3527               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3528                   "Failed to pick subchannel", &result.error, 1);
3529           GRPC_ERROR_UNREF(result.error);
3530           GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
3531         }
3532         if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3533         break;
3534       }
3535       // If wait_for_ready is true, then queue to retry when we get a new
3536       // picker.
3537       GRPC_ERROR_UNREF(result.error);
3538     }
3539     // Fallthrough
3540     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3541       if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
3542       break;
3543     default:  // PICK_COMPLETE
3544       // Handle drops.
3545       if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
3546         result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3547             "Call dropped by load balancing policy");
3548       }
3549       calld->connected_subchannel_ = std::move(result.connected_subchannel);
3550       calld->lb_recv_trailing_metadata_ready_ =
3551           result.recv_trailing_metadata_ready;
3552       calld->lb_recv_trailing_metadata_ready_user_data_ =
3553           result.recv_trailing_metadata_ready_user_data;
3554       GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
3555       if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3556   }
3557 }
3558
3559 }  // namespace
3560 }  // namespace grpc_core
3561
3562 /*************************************************************************
3563  * EXPORTED SYMBOLS
3564  */
3565
3566 using grpc_core::CallData;
3567 using grpc_core::ChannelData;
3568
3569 const grpc_channel_filter grpc_client_channel_filter = {
3570     CallData::StartTransportStreamOpBatch,
3571     ChannelData::StartTransportOp,
3572     sizeof(CallData),
3573     CallData::Init,
3574     CallData::SetPollent,
3575     CallData::Destroy,
3576     sizeof(ChannelData),
3577     ChannelData::Init,
3578     ChannelData::Destroy,
3579     ChannelData::GetChannelInfo,
3580     "client-channel",
3581 };
3582
3583 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3584     grpc_channel_element* elem, int try_to_connect) {
3585   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3586   return chand->CheckConnectivityState(try_to_connect);
3587 }
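
// Minimal usage sketch for the exported API above, assuming the caller
// holds a grpc_channel* whose stack ends in the client channel filter:
//
//   grpc_channel_stack* stack = grpc_channel_get_channel_stack(channel);
//   grpc_channel_element* elem = grpc_channel_stack_last_element(stack);
//   grpc_connectivity_state state =
//       grpc_client_channel_check_connectivity_state(elem,
//                                                    1 /* try_to_connect */);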
3588
3589 int grpc_client_channel_num_external_connectivity_watchers(
3590     grpc_channel_element* elem) {
3591   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3592   return chand->NumExternalConnectivityWatchers();
3593 }
3594
3595 void grpc_client_channel_watch_connectivity_state(
3596     grpc_channel_element* elem, grpc_polling_entity pollent,
3597     grpc_connectivity_state* state, grpc_closure* closure,
3598     grpc_closure* watcher_timer_init) {
3599   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3600   return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3601                                                watcher_timer_init);
3602 }
3603
3604 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
3605 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3606   auto* calld = static_cast<CallData*>(elem->call_data);
3607   return calld->subchannel_call();
3608 }