Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / client_channel.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/client_channel.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/combiner.h"
56 #include "src/core/lib/iomgr/iomgr.h"
57 #include "src/core/lib/iomgr/polling_entity.h"
58 #include "src/core/lib/profiling/timers.h"
59 #include "src/core/lib/slice/slice_internal.h"
60 #include "src/core/lib/slice/slice_string_helpers.h"
61 #include "src/core/lib/surface/channel.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/lib/transport/error_utils.h"
64 #include "src/core/lib/transport/metadata.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/lib/transport/static_metadata.h"
67 #include "src/core/lib/transport/status_metadata.h"
68
69 using grpc_core::internal::ClientChannelMethodParsedObject;
70 using grpc_core::internal::ServerRetryThrottleData;
71
72 //
73 // Client channel filter
74 //
75
76 // By default, we buffer 256 KiB per RPC for retries.
77 // TODO(roth): Do we have any data to suggest a better value?
   // (256 << 10 evaluates to 262144 bytes.)
78 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
79
80 // This value was picked arbitrarily.  It can be changed if there is
81 // any even moderately compelling reason to do so.
   // NOTE(review): presumably the jitter fraction applied to the retry
   // back-off -- the use site is not in this part of the file; confirm.
82 #define RETRY_BACKOFF_JITTER 0.2
83
84 // Max number of batches that can be pending on a call at any given
85 // time.  This includes one batch for each of the following ops:
86 //   recv_initial_metadata
87 //   send_initial_metadata
88 //   recv_message
89 //   send_message
90 //   recv_trailing_metadata
91 //   send_trailing_metadata
   // Used as the array size of CallData::pending_batches_.
92 #define MAX_PENDING_BATCHES 6
93
94 namespace grpc_core {
95
// Trace flags defined by this file, named "client_channel_call" and
// "client_channel_routing".  Both are constructed with `false` as the
// first argument (presumably "disabled by default" -- confirm against the
// TraceFlag constructor).
96 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
97 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
98
99 namespace {
100
101 //
102 // ChannelData definition
103 //
104
105 class ChannelData {
    // Channel-level state for the client channel filter.  The fields are
    // grouped below by what guards them: set-once construction data,
    // data-plane state (data_plane_combiner_), control-plane state
    // (combiner_), one atomic shared by both, and info guarded by info_mu_.
106  public:
      // One entry in the channel's singly linked list of queued picks
      // (see queued_picks_ below).  `next` links to the following entry.
107   struct QueuedPick {
108     LoadBalancingPolicy::PickArgs pick;
109     grpc_call_element* elem;
110     QueuedPick* next = nullptr;
111   };
112
      // Static entry points that operate on a grpc_channel_element.
      // NOTE(review): presumably these are installed in the filter's vtable,
      // which is not visible in this part of the file -- confirm.
113   static grpc_error* Init(grpc_channel_element* elem,
114                           grpc_channel_element_args* args);
115   static void Destroy(grpc_channel_element* elem);
116   static void StartTransportOp(grpc_channel_element* elem,
117                                grpc_transport_op* op);
118   static void GetChannelInfo(grpc_channel_element* elem,
119                              const grpc_channel_info* info);
120
      // Records the channelz node (raw pointer) and hands a ref obtained via
      // node->Ref() to the resolving LB policy.
      // NOTE(review): both `node` and resolving_lb_policy_ are dereferenced
      // unconditionally here, whereas FillChildRefsForChannelz() below
      // null-checks resolving_lb_policy_.  Confirm callers guarantee both
      // are non-null at this point.
121   void set_channelz_node(channelz::ClientChannelNode* node) {
122     channelz_node_ = node;
123     resolving_lb_policy_->set_channelz_node(node->Ref());
124   }
      // Forwards to the resolving LB policy if one exists; otherwise leaves
      // both output lists untouched.
125   void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
126                                 channelz::ChildRefsList* child_channels) {
127     if (resolving_lb_policy_ != nullptr) {
128       resolving_lb_policy_->FillChildRefsForChannelz(child_subchannels,
129                                                      child_channels);
130     }
131   }
132
      // Accessors for the fields that are set at construction and never
      // modified.
133   bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
134   bool enable_retries() const { return enable_retries_; }
135   size_t per_rpc_retry_buffer_size() const {
136     return per_rpc_retry_buffer_size_;
137   }
138
139   // Note: Does NOT return a new ref.
      // Reads the atomic with acquire ordering.
140   grpc_error* disconnect_error() const {
141     return disconnect_error_.Load(MemoryOrder::ACQUIRE);
142   }
143
144   grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
145
      // Returns a non-owning pointer; ownership stays with picker_.
146   LoadBalancingPolicy::SubchannelPicker* picker() const {
147     return picker_.get();
148   }
149   void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
150   void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
151
      // Accessors for the data-plane copies of service config data.  The two
      // RefCountedPtr accessors return by value, i.e. a copy of the pointer.
152   bool received_service_config_data() const {
153     return received_service_config_data_;
154   }
155   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
156     return retry_throttle_data_;
157   }
158   RefCountedPtr<ServiceConfig> service_config() const {
159     return service_config_;
160   }
161
162   grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
      // Allocates an ExternalConnectivityWatcher with New<>; the returned
      // pointer is intentionally discarded (see comment in the body).
163   void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
164                                       grpc_connectivity_state* state,
165                                       grpc_closure* on_complete,
166                                       grpc_closure* watcher_timer_init) {
167     // Will delete itself.
168     New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
169                                      watcher_timer_init);
170   }
      // Returns the size reported by the external connectivity watcher list.
171   int NumExternalConnectivityWatchers() const {
172     return external_connectivity_watcher_list_.size();
173   }
174
175  private:
      // Helper classes defined later in the file.
176   class ConnectivityStateAndPickerSetter;
177   class ServiceConfigSetter;
178   class ClientChannelControlHelper;
179
      // A single external connectivity watch.  Instances are created by
      // AddExternalConnectivityWatcher() above, which notes that the object
      // deletes itself.
180   class ExternalConnectivityWatcher {
181    public:
        // Mutex-guarded list of watchers.  The mutex is initialized in the
        // constructor and destroyed in the destructor.
182     class WatcherList {
183      public:
184       WatcherList() { gpr_mu_init(&mu_); }
185       ~WatcherList() { gpr_mu_destroy(&mu_); }
186
187       int size() const;
188       ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
189       void Add(ExternalConnectivityWatcher* watcher);
190       void Remove(const ExternalConnectivityWatcher* watcher);
191
192      private:
193       // head_ is guarded by a mutex, since the size() method needs to
194       // iterate over the list, and it's called from the C-core API
195       // function grpc_channel_num_external_connectivity_watchers(), which
196       // is synchronous and therefore cannot run in the combiner.
          // mu_ is `mutable` so that the const methods above can lock it.
197       mutable gpr_mu mu_;
198       ExternalConnectivityWatcher* head_ = nullptr;
199     };
200
201     ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
202                                 grpc_connectivity_state* state,
203                                 grpc_closure* on_complete,
204                                 grpc_closure* watcher_timer_init);
205
206     ~ExternalConnectivityWatcher();
207
208    private:
209     static void OnWatchCompleteLocked(void* arg, grpc_error* error);
210     static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
211
212     ChannelData* chand_;
213     grpc_polling_entity pollent_;
214     grpc_connectivity_state* state_;
215     grpc_closure* on_complete_;
216     grpc_closure* watcher_timer_init_;
217     grpc_closure my_closure_;
        // NOTE(review): presumably the link used by WatcherList (whose head_
        // has the same pointer type) -- confirm in the list implementation.
218     ExternalConnectivityWatcher* next_ = nullptr;
219   };
220
      // Construction and destruction are private; the static Init()/Destroy()
      // entry points above are the public counterparts.
221   ChannelData(grpc_channel_element_args* args, grpc_error** error);
222   ~ChannelData();
223
224   static bool ProcessResolverResultLocked(
225       void* arg, const Resolver::Result& result, const char** lb_policy_name,
226       RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
227       grpc_error** service_config_error);
228
229   grpc_error* DoPingLocked(grpc_transport_op* op);
230
231   static void StartTransportOpLocked(void* arg, grpc_error* ignored);
232
233   static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
234
235   void ProcessLbPolicy(
236       const Resolver::Result& resolver_result,
237       const internal::ClientChannelGlobalParsedObject* parsed_service_config,
238       UniquePtr<char>* lb_policy_name,
239       RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config);
240
241   //
242   // Fields set at construction and never modified.
243   //
244   const bool deadline_checking_enabled_;
245   const bool enable_retries_;
246   const size_t per_rpc_retry_buffer_size_;
247   grpc_channel_stack* owning_stack_;
248   ClientChannelFactory* client_channel_factory_;
249   UniquePtr<char> server_name_;
250   RefCountedPtr<ServiceConfig> default_service_config_;
251   // Initialized shortly after construction.
      // Set by set_channelz_node(); stored as a raw pointer.
252   channelz::ClientChannelNode* channelz_node_ = nullptr;
253
254   //
255   // Fields used in the data plane.  Guarded by data_plane_combiner.
256   //
257   grpc_combiner* data_plane_combiner_;
258   UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
259   QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
260   // Data from service config.
261   bool received_service_config_data_ = false;
262   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
263   RefCountedPtr<ServiceConfig> service_config_;
264
265   //
266   // Fields used in the control plane.  Guarded by combiner.
267   //
268   grpc_combiner* combiner_;
269   grpc_pollset_set* interested_parties_;
270   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
271   OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
272   grpc_connectivity_state_tracker state_tracker_;
273   ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
274   UniquePtr<char> health_check_service_name_;
275   RefCountedPtr<ServiceConfig> saved_service_config_;
276   bool received_first_resolver_result_ = false;
277
278   //
279   // Fields accessed from both data plane and control plane combiners.
280   //
      // Read through disconnect_error() with an acquire load.
281   Atomic<grpc_error*> disconnect_error_;
282
283   //
284   // Fields guarded by a mutex, since they need to be accessed
285   // synchronously via get_channel_info().
286   //
287   gpr_mu info_mu_;
288   UniquePtr<char> info_lb_policy_name_;
289   UniquePtr<char> info_service_config_json_;
290 };
291
292 //
293 // CallData definition
294 //
295
296 class CallData {
    // Per-call state for the client channel filter: the queued pick, the
    // pending batches received from above, the subchannel call, and the
    // cached send ops and bookkeeping needed to retry an attempt.
    // Member order matters in this class (see the notes on deadline_state_
    // and on SubchannelCallRetryState::retry_dispatched below).
297  public:
      // Static entry points that operate on a grpc_call_element.
      // NOTE(review): presumably these are installed in the filter's vtable,
      // which is not visible in this part of the file -- confirm.
298   static grpc_error* Init(grpc_call_element* elem,
299                           const grpc_call_element_args* args);
300   static void Destroy(grpc_call_element* elem,
301                       const grpc_call_final_info* final_info,
302                       grpc_closure* then_schedule_closure);
303   static void StartTransportStreamOpBatch(
304       grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
305   static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
306
      // Returns subchannel_call_ by value, i.e. a copy of the RefCountedPtr.
307   RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
308
309   // Invoked by channel for queued picks once resolver results are available.
310   void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
311
312   // Invoked by channel for queued picks when the picker is updated.
313   static void StartPickLocked(void* arg, grpc_error* error);
314
315  private:
      // Defined later in the file; see pick_canceller_ below.
316   class QueuedPickCanceller;
317
318   // State used for starting a retryable batch on a subchannel call.
319   // This provides its own grpc_transport_stream_op_batch and other data
320   // structures needed to populate the ops in the batch.
321   // We allocate one struct on the arena for each attempt at starting a
322   // batch on a given subchannel call.
323   struct SubchannelCallBatchData {
324     // Creates a SubchannelCallBatchData object on the call's arena with the
325     // specified refcount.  If set_on_complete is true, the batch's
326     // on_complete callback will be set to point to on_complete();
327     // otherwise, the batch's on_complete callback will be null.
328     static SubchannelCallBatchData* Create(grpc_call_element* elem,
329                                            int refcount, bool set_on_complete);
330
        // Drops one ref; when gpr_unref() reports the last ref is gone, runs
        // Destroy() (not the destructor -- see the comment on the dtor).
331     void Unref() {
332       if (gpr_unref(&refs)) Destroy();
333     }
334
335     SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
336                             int refcount, bool set_on_complete);
337     // All dtor code must be added in `Destroy()`. This is because we may
338     // call closures in `SubchannelCallBatchData` after they are unrefed by
339     // `Unref()`, and msan would complain about accessing this class
340     // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
341     // TODO(soheil): We should try to call the dtor in `Unref()`.
342     ~SubchannelCallBatchData() { Destroy(); }
343     void Destroy();
344
345     gpr_refcount refs;
346     grpc_call_element* elem;
347     RefCountedPtr<SubchannelCall> subchannel_call;
348     // The batch to use in the subchannel call.
349     // Its payload field points to SubchannelCallRetryState::batch_payload.
350     grpc_transport_stream_op_batch batch;
351     // For intercepting on_complete.
352     grpc_closure on_complete;
353   };
354
355   // Retry state associated with a subchannel call.
356   // Stored in the parent_data of the subchannel call object.
357   struct SubchannelCallRetryState {
        // Initializes batch_payload from `context` and clears every bitfield
        // flag; the remaining members use their in-class initializers.
358     explicit SubchannelCallRetryState(grpc_call_context_element* context)
359         : batch_payload(context),
360           started_send_initial_metadata(false),
361           completed_send_initial_metadata(false),
362           started_send_trailing_metadata(false),
363           completed_send_trailing_metadata(false),
364           started_recv_initial_metadata(false),
365           completed_recv_initial_metadata(false),
366           started_recv_trailing_metadata(false),
367           completed_recv_trailing_metadata(false),
368           retry_dispatched(false) {}
369
370     // SubchannelCallBatchData.batch.payload points to this.
371     grpc_transport_stream_op_batch_payload batch_payload;
372     // For send_initial_metadata.
373     // Note that we need to make a copy of the initial metadata for each
374     // subchannel call instead of just referring to the copy in call_data,
375     // because filters in the subchannel stack will probably add entries,
376     // so we need to start in a pristine state for each attempt of the call.
377     grpc_linked_mdelem* send_initial_metadata_storage;
378     grpc_metadata_batch send_initial_metadata;
379     // For send_message.
380     // TODO(roth): Restructure this to eliminate use of ManualConstructor.
381     ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
382     // For send_trailing_metadata.
383     grpc_linked_mdelem* send_trailing_metadata_storage;
384     grpc_metadata_batch send_trailing_metadata;
385     // For intercepting recv_initial_metadata.
386     grpc_metadata_batch recv_initial_metadata;
387     grpc_closure recv_initial_metadata_ready;
388     bool trailing_metadata_available = false;
389     // For intercepting recv_message.
390     grpc_closure recv_message_ready;
391     OrphanablePtr<ByteStream> recv_message;
392     // For intercepting recv_trailing_metadata.
393     grpc_metadata_batch recv_trailing_metadata;
394     grpc_transport_stream_stats collect_stats;
395     grpc_closure recv_trailing_metadata_ready;
396     // These fields indicate which ops have been started and completed on
397     // this subchannel call.
398     size_t started_send_message_count = 0;
399     size_t completed_send_message_count = 0;
400     size_t started_recv_message_count = 0;
401     size_t completed_recv_message_count = 0;
402     bool started_send_initial_metadata : 1;
403     bool completed_send_initial_metadata : 1;
404     bool started_send_trailing_metadata : 1;
405     bool completed_send_trailing_metadata : 1;
406     bool started_recv_initial_metadata : 1;
407     bool completed_recv_initial_metadata : 1;
408     bool started_recv_trailing_metadata : 1;
409     bool completed_recv_trailing_metadata : 1;
410     // State for callback processing.
411     SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
412         nullptr;
413     grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
414     SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
415     grpc_error* recv_message_error = GRPC_ERROR_NONE;
416     SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
417     // NOTE: Do not move this next to the metadata bitfields above. That would
418     //       save space but will also result in a data race because compiler
419     //       will generate a 2 byte store which overwrites the meta-data
420     //       fields upon setting this field.
421     bool retry_dispatched : 1;
422   };
423
424   // Pending batches stored in call data.
425   struct PendingBatch {
426     // The pending batch.  If nullptr, this slot is empty.
427     grpc_transport_stream_op_batch* batch;
428     // Indicates whether payload for send ops has been cached in CallData.
429     bool send_ops_cached;
430   };
431
      // Construction and destruction are private; the static Init()/Destroy()
      // entry points above are the public counterparts.
432   CallData(grpc_call_element* elem, const ChannelData& chand,
433            const grpc_call_element_args& args);
434   ~CallData();
435
436   // Caches data for send ops so that it can be retried later, if not
437   // already cached.
438   void MaybeCacheSendOpsForBatch(PendingBatch* pending);
439   void FreeCachedSendInitialMetadata(ChannelData* chand);
440   // Frees cached send_message at index idx.
441   void FreeCachedSendMessage(ChannelData* chand, size_t idx);
442   void FreeCachedSendTrailingMetadata(ChannelData* chand);
443   // Frees cached send ops that have already been completed after
444   // committing the call.
445   void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
446                                        SubchannelCallRetryState* retry_state);
447   // Frees cached send ops that were completed by the completed batch in
448   // batch_data.  Used when batches are completed after the call is committed.
449   void FreeCachedSendOpDataForCompletedBatch(
450       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
451       SubchannelCallRetryState* retry_state);
452
453   static void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
454       const LoadBalancingPolicy::PickArgs& pick,
455       grpc_transport_stream_op_batch* batch);
456
457   // Returns the index into pending_batches_ to be used for batch.
458   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
459   void PendingBatchesAdd(grpc_call_element* elem,
460                          grpc_transport_stream_op_batch* batch);
461   void PendingBatchClear(PendingBatch* pending);
462   void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
463   static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
464   // A predicate type and some useful implementations for PendingBatchesFail().
465   typedef bool (*YieldCallCombinerPredicate)(
466       const CallCombinerClosureList& closures);
      // Always yields, regardless of `closures`.
467   static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
468     return true;
469   }
      // Never yields, regardless of `closures`.
470   static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
471     return false;
472   }
      // Yields only when `closures` is non-empty.
473   static bool YieldCallCombinerIfPendingBatchesFound(
474       const CallCombinerClosureList& closures) {
475     return closures.size() > 0;
476   }
477   // Fails all pending batches.
478   // If yield_call_combiner_predicate returns true, assumes responsibility for
479   // yielding the call combiner.
480   void PendingBatchesFail(
481       grpc_call_element* elem, grpc_error* error,
482       YieldCallCombinerPredicate yield_call_combiner_predicate);
483   static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
484   // Resumes all pending batches on subchannel_call_.
485   void PendingBatchesResume(grpc_call_element* elem);
486   // Returns a pointer to the first pending batch for which predicate(batch)
487   // returns true, or null if not found.
488   template <typename Predicate>
489   PendingBatch* PendingBatchFind(grpc_call_element* elem,
490                                  const char* log_message, Predicate predicate);
491
492   // Commits the call so that no further retry attempts will be performed.
493   void RetryCommit(grpc_call_element* elem,
494                    SubchannelCallRetryState* retry_state);
495   // Starts a retry after appropriate back-off.
496   void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
497                grpc_millis server_pushback_ms);
498   // Returns true if the call is being retried.
499   bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
500                   grpc_status_code status, grpc_mdelem* server_pushback_md);
501
502   // Invokes recv_initial_metadata_ready for a subchannel batch.
503   static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
504   // Intercepts recv_initial_metadata_ready callback for retries.
505   // Commits the call and returns the initial metadata up the stack.
506   static void RecvInitialMetadataReady(void* arg, grpc_error* error);
507
508   // Invokes recv_message_ready for a subchannel batch.
509   static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
510   // Intercepts recv_message_ready callback for retries.
511   // Commits the call and returns the message up the stack.
512   static void RecvMessageReady(void* arg, grpc_error* error);
513
514   // Sets *status and *server_pushback_md based on md_batch and error.
515   // Only sets *server_pushback_md if server_pushback_md != nullptr.
516   void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
517                      grpc_error* error, grpc_status_code* status,
518                      grpc_mdelem** server_pushback_md);
519   // Adds recv_trailing_metadata_ready closure to closures.
520   void AddClosureForRecvTrailingMetadataReady(
521       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
522       grpc_error* error, CallCombinerClosureList* closures);
523   // Adds any necessary closures for deferred recv_initial_metadata and
524   // recv_message callbacks to closures.
525   static void AddClosuresForDeferredRecvCallbacks(
526       SubchannelCallBatchData* batch_data,
527       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
528   // Returns true if any op in the batch was not yet started.
529   // Only looks at send ops, since recv ops are always started immediately.
530   bool PendingBatchIsUnstarted(PendingBatch* pending,
531                                SubchannelCallRetryState* retry_state);
532   // For any pending batch containing an op that has not yet been started,
533   // adds the pending batch's completion closures to closures.
534   void AddClosuresToFailUnstartedPendingBatches(
535       grpc_call_element* elem, SubchannelCallRetryState* retry_state,
536       grpc_error* error, CallCombinerClosureList* closures);
537   // Runs necessary closures upon completion of a call attempt.
538   void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
539                                    grpc_error* error);
540   // Intercepts recv_trailing_metadata_ready callback for retries.
541   // Commits the call and returns the trailing metadata up the stack.
542   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
543
544   // Adds the on_complete closure for the pending batch completed in
545   // batch_data to closures.
546   void AddClosuresForCompletedPendingBatch(
547       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
548       SubchannelCallRetryState* retry_state, grpc_error* error,
549       CallCombinerClosureList* closures);
550
551   // If there are any cached ops to replay or pending ops to start on the
552   // subchannel call, adds a closure to closures to invoke
553   // StartRetriableSubchannelBatches().
554   void AddClosuresForReplayOrPendingSendOps(
555       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
556       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
557
558   // Callback used to intercept on_complete from subchannel calls.
559   // Called only when retries are enabled.
560   static void OnComplete(void* arg, grpc_error* error);
561
562   static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
563   // Adds a closure to closures that will execute batch in the call combiner.
564   void AddClosureForSubchannelBatch(grpc_call_element* elem,
565                                     grpc_transport_stream_op_batch* batch,
566                                     CallCombinerClosureList* closures);
567   // Adds retriable send_initial_metadata op to batch_data.
568   void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
569                                          SubchannelCallBatchData* batch_data);
570   // Adds retriable send_message op to batch_data.
571   void AddRetriableSendMessageOp(grpc_call_element* elem,
572                                  SubchannelCallRetryState* retry_state,
573                                  SubchannelCallBatchData* batch_data);
574   // Adds retriable send_trailing_metadata op to batch_data.
575   void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
576                                           SubchannelCallBatchData* batch_data);
577   // Adds retriable recv_initial_metadata op to batch_data.
578   void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
579                                          SubchannelCallBatchData* batch_data);
580   // Adds retriable recv_message op to batch_data.
581   void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
582                                  SubchannelCallBatchData* batch_data);
583   // Adds retriable recv_trailing_metadata op to batch_data.
584   void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
585                                           SubchannelCallBatchData* batch_data);
586   // Helper function used to start a recv_trailing_metadata batch.  This
587   // is used in the case where a recv_initial_metadata or recv_message
588   // op fails in a way that we know the call is over but when the application
589   // has not yet started its own recv_trailing_metadata op.
590   void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
591   // If there are any cached send ops that need to be replayed on the
592   // current subchannel call, creates and returns a new subchannel batch
593   // to replay those ops.  Otherwise, returns nullptr.
594   SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
595       grpc_call_element* elem, SubchannelCallRetryState* retry_state);
596   // Adds subchannel batches for pending batches to closures.
597   void AddSubchannelBatchesForPendingBatches(
598       grpc_call_element* elem, SubchannelCallRetryState* retry_state,
599       CallCombinerClosureList* closures);
600   // Constructs and starts whatever subchannel batches are needed on the
601   // subchannel call.
602   static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
603
604   void CreateSubchannelCall(grpc_call_element* elem);
605   // Invoked when a pick is completed, on both success or failure.
606   static void PickDone(void* arg, grpc_error* error);
607   // Removes the call from the channel's list of queued picks.
608   void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
609   // Adds the call to the channel's list of queued picks.
610   void AddCallToQueuedPicksLocked(grpc_call_element* elem);
611   // Applies service config to the call.  Must be invoked once we know
612   // that the resolver has returned results to the channel.
613   void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
614
615   // State for handling deadlines.
616   // The code in deadline_filter.c requires this to be the first field.
617   // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
618   // and this struct both independently store pointers to the call stack
619   // and call combiner.  If/when we have time, find a way to avoid this
620   // without breaking the grpc_deadline_state abstraction.
621   grpc_deadline_state deadline_state_;
622
623   grpc_slice path_;  // Request path.
624   gpr_timespec call_start_time_;
625   grpc_millis deadline_;
626   Arena* arena_;
627   grpc_call_stack* owning_call_;
628   CallCombiner* call_combiner_;
629   grpc_call_context_element* call_context_;
630
      // Per-call service config data.
      // NOTE(review): presumably populated by
      // ApplyServiceConfigToCallLocked() -- its body is not visible here.
631   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
632   ServiceConfig::CallData service_config_call_data_;
633   const ClientChannelMethodParsedObject* method_params_ = nullptr;
634
635   RefCountedPtr<SubchannelCall> subchannel_call_;
636
637   // Set when we get a cancel_stream op.
638   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
639
      // State for the LB pick.  pick_ has the node type of the channel's
      // queued-pick list (ChannelData::QueuedPick).
640   ChannelData::QueuedPick pick_;
641   bool pick_queued_ = false;
642   bool service_config_applied_ = false;
643   QueuedPickCanceller* pick_canceller_ = nullptr;
644   grpc_closure pick_closure_;
645
646   grpc_polling_entity* pollent_ = nullptr;
647
648   // Batches are added to this list when received from above.
649   // They are removed when we are done handling the batch (i.e., when
650   // either we have invoked all of the batch's callbacks or we have
651   // passed the batch down to the subchannel call and are not
652   // intercepting any of its callbacks).
653   PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
      // The bitfields below have no in-class initializers.
      // NOTE(review): presumably they are set in the constructor's
      // initializer list, which is not visible here -- confirm.
654   bool pending_send_initial_metadata_ : 1;
655   bool pending_send_message_ : 1;
656   bool pending_send_trailing_metadata_ : 1;
657
658   // Retry state.
659   bool enable_retries_ : 1;
660   bool retry_committed_ : 1;
661   bool last_attempt_got_server_pushback_ : 1;
662   int num_attempts_completed_ = 0;
663   size_t bytes_buffered_for_retry_ = 0;
664   // TODO(roth): Restructure this to eliminate use of ManualConstructor.
665   ManualConstructor<BackOff> retry_backoff_;
666   grpc_timer retry_timer_;
667
668   // The number of pending retriable subchannel batches containing send ops.
669   // We hold a ref to the call stack while this is non-zero, since replay
670   // batches may not complete until after all callbacks have been returned
671   // to the surface, and we need to make sure that the call is not destroyed
672   // until all of these batches have completed.
673   // Note that we actually only need to track replay batches, but it's
674   // easier to track all batches with send ops.
675   int num_pending_retriable_subchannel_send_batches_ = 0;
676
677   // Cached data for retrying send ops.
678   // send_initial_metadata
679   bool seen_send_initial_metadata_ = false;
680   grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
681   grpc_metadata_batch send_initial_metadata_;
      // Neither of the next two fields has a default initializer.
      // NOTE(review): presumably only meaningful once
      // seen_send_initial_metadata_ is true -- confirm at the use sites.
682   uint32_t send_initial_metadata_flags_;
683   gpr_atm* peer_string_;
684   // send_message
685   // When we get a send_message op, we replace the original byte stream
686   // with a CachingByteStream that caches the slices to a local buffer for
687   // use in retries.
688   // Note: We inline the cache for the first 3 send_message ops and use
689   // dynamic allocation after that.  This number was essentially picked
690   // at random; it could be changed in the future to tune performance.
691   InlinedVector<ByteStreamCache*, 3> send_messages_;
692   // send_trailing_metadata
693   bool seen_send_trailing_metadata_ = false;
694   grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
695   grpc_metadata_batch send_trailing_metadata_;
696 };
697
698 //
699 // ChannelData::ConnectivityStateAndPickerSetter
700 //
701
702 // A fire-and-forget class that sets the channel's connectivity state
703 // and then hops into the data plane combiner to update the picker.
704 // Must be instantiated while holding the control plane combiner.
705 // Deletes itself when done.
706 class ChannelData::ConnectivityStateAndPickerSetter {
707  public:
708   ConnectivityStateAndPickerSetter(
709       ChannelData* chand, grpc_connectivity_state state, const char* reason,
710       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
711       : chand_(chand), picker_(std::move(picker)) {
712     // Update connectivity state here, while holding control plane combiner.
713     grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
714     if (chand->channelz_node_ != nullptr) {
715       chand->channelz_node_->AddTraceEvent(
716           channelz::ChannelTrace::Severity::Info,
717           grpc_slice_from_static_string(
718               GetChannelConnectivityStateChangeString(state)));
719     }
720     // Bounce into the data plane combiner to reset the picker.
721     GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
722                            "ConnectivityStateAndPickerSetter");
723     GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
724                       grpc_combiner_scheduler(chand->data_plane_combiner_));
725     GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
726   }
727
728  private:
729   static const char* GetChannelConnectivityStateChangeString(
730       grpc_connectivity_state state) {
731     switch (state) {
732       case GRPC_CHANNEL_IDLE:
733         return "Channel state change to IDLE";
734       case GRPC_CHANNEL_CONNECTING:
735         return "Channel state change to CONNECTING";
736       case GRPC_CHANNEL_READY:
737         return "Channel state change to READY";
738       case GRPC_CHANNEL_TRANSIENT_FAILURE:
739         return "Channel state change to TRANSIENT_FAILURE";
740       case GRPC_CHANNEL_SHUTDOWN:
741         return "Channel state change to SHUTDOWN";
742     }
743     GPR_UNREACHABLE_CODE(return "UNKNOWN");
744   }
745
746   static void SetPicker(void* arg, grpc_error* ignored) {
747     auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
748     // Update picker.
749     self->chand_->picker_ = std::move(self->picker_);
750     // Re-process queued picks.
751     for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
752          pick = pick->next) {
753       CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
754     }
755     // Clean up.
756     GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
757                              "ConnectivityStateAndPickerSetter");
758     Delete(self);
759   }
760
761   ChannelData* chand_;
762   UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
763   grpc_closure closure_;
764 };
765
766 //
767 // ChannelData::ServiceConfigSetter
768 //
769
770 // A fire-and-forget class that sets the channel's service config data
771 // in the data plane combiner.  Deletes itself when done.
772 class ChannelData::ServiceConfigSetter {
773  public:
774   ServiceConfigSetter(
775       ChannelData* chand,
776       Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
777           retry_throttle_data,
778       RefCountedPtr<ServiceConfig> service_config)
779       : chand_(chand),
780         retry_throttle_data_(retry_throttle_data),
781         service_config_(std::move(service_config)) {
782     GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
783     GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
784                       grpc_combiner_scheduler(chand->data_plane_combiner_));
785     GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
786   }
787
788  private:
789   static void SetServiceConfigData(void* arg, grpc_error* ignored) {
790     ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
791     ChannelData* chand = self->chand_;
792     // Update channel state.
793     chand->received_service_config_data_ = true;
794     if (self->retry_throttle_data_.has_value()) {
795       chand->retry_throttle_data_ =
796           internal::ServerRetryThrottleMap::GetDataForServer(
797               chand->server_name_.get(),
798               self->retry_throttle_data_.value().max_milli_tokens,
799               self->retry_throttle_data_.value().milli_token_ratio);
800     }
801     chand->service_config_ = std::move(self->service_config_);
802     // Apply service config to queued picks.
803     for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
804          pick = pick->next) {
805       CallData* calld = static_cast<CallData*>(pick->elem->call_data);
806       calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
807     }
808     // Clean up.
809     GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
810                              "ServiceConfigSetter");
811     Delete(self);
812   }
813
814   ChannelData* chand_;
815   Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
816       retry_throttle_data_;
817   RefCountedPtr<ServiceConfig> service_config_;
818   grpc_closure closure_;
819 };
820
821 //
822 // ChannelData::ExternalConnectivityWatcher::WatcherList
823 //
824
825 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
826   MutexLock lock(&mu_);
827   int count = 0;
828   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
829     ++count;
830   }
831   return count;
832 }
833
834 ChannelData::ExternalConnectivityWatcher*
835 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
836     grpc_closure* on_complete) const {
837   MutexLock lock(&mu_);
838   ExternalConnectivityWatcher* w = head_;
839   while (w != nullptr && w->on_complete_ != on_complete) {
840     w = w->next_;
841   }
842   return w;
843 }
844
845 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
846     ExternalConnectivityWatcher* watcher) {
847   GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
848   MutexLock lock(&mu_);
849   GPR_ASSERT(watcher->next_ == nullptr);
850   watcher->next_ = head_;
851   head_ = watcher;
852 }
853
854 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
855     const ExternalConnectivityWatcher* watcher) {
856   MutexLock lock(&mu_);
857   if (watcher == head_) {
858     head_ = watcher->next_;
859     return;
860   }
861   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
862     if (w->next_ == watcher) {
863       w->next_ = w->next_->next_;
864       return;
865     }
866   }
867   GPR_UNREACHABLE_CODE(return );
868 }
869
870 //
871 // ChannelData::ExternalConnectivityWatcher
872 //
873
874 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
875     ChannelData* chand, grpc_polling_entity pollent,
876     grpc_connectivity_state* state, grpc_closure* on_complete,
877     grpc_closure* watcher_timer_init)
878     : chand_(chand),
879       pollent_(pollent),
880       state_(state),
881       on_complete_(on_complete),
882       watcher_timer_init_(watcher_timer_init) {
883   grpc_polling_entity_add_to_pollset_set(&pollent_,
884                                          chand_->interested_parties_);
885   GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
886   GRPC_CLOSURE_SCHED(
887       GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
888                         grpc_combiner_scheduler(chand_->combiner_)),
889       GRPC_ERROR_NONE);
890 }
891
892 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
893   grpc_polling_entity_del_from_pollset_set(&pollent_,
894                                            chand_->interested_parties_);
895   GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
896                            "ExternalConnectivityWatcher");
897 }
898
899 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
900     void* arg, grpc_error* error) {
901   ExternalConnectivityWatcher* self =
902       static_cast<ExternalConnectivityWatcher*>(arg);
903   grpc_closure* on_complete = self->on_complete_;
904   self->chand_->external_connectivity_watcher_list_.Remove(self);
905   Delete(self);
906   GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
907 }
908
909 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
910     void* arg, grpc_error* ignored) {
911   ExternalConnectivityWatcher* self =
912       static_cast<ExternalConnectivityWatcher*>(arg);
913   if (self->state_ == nullptr) {
914     // Handle cancellation.
915     GPR_ASSERT(self->watcher_timer_init_ == nullptr);
916     ExternalConnectivityWatcher* found =
917         self->chand_->external_connectivity_watcher_list_.Lookup(
918             self->on_complete_);
919     if (found != nullptr) {
920       grpc_connectivity_state_notify_on_state_change(
921           &found->chand_->state_tracker_, nullptr, &found->my_closure_);
922     }
923     Delete(self);
924     return;
925   }
926   // New watcher.
927   self->chand_->external_connectivity_watcher_list_.Add(self);
928   // This assumes that the closure is scheduled on the ExecCtx scheduler
929   // and that GRPC_CLOSURE_RUN would run the closure immediately.
930   GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
931   GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
932                     grpc_combiner_scheduler(self->chand_->combiner_));
933   grpc_connectivity_state_notify_on_state_change(
934       &self->chand_->state_tracker_, self->state_, &self->my_closure_);
935 }
936
937 //
938 // ChannelData::ClientChannelControlHelper
939 //
940
941 class ChannelData::ClientChannelControlHelper
942     : public LoadBalancingPolicy::ChannelControlHelper {
943  public:
944   explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
945     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
946   }
947
948   ~ClientChannelControlHelper() override {
949     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
950                              "ClientChannelControlHelper");
951   }
952
953   Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
954     grpc_arg args_to_add[2];
955     int num_args_to_add = 0;
956     if (chand_->health_check_service_name_ != nullptr) {
957       args_to_add[0] = grpc_channel_arg_string_create(
958           const_cast<char*>("grpc.temp.health_check"),
959           const_cast<char*>(chand_->health_check_service_name_.get()));
960       num_args_to_add++;
961     }
962     args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
963         chand_->subchannel_pool_.get());
964     grpc_channel_args* new_args =
965         grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
966     Subchannel* subchannel =
967         chand_->client_channel_factory_->CreateSubchannel(new_args);
968     grpc_channel_args_destroy(new_args);
969     return subchannel;
970   }
971
972   grpc_channel* CreateChannel(const char* target,
973                               const grpc_channel_args& args) override {
974     return chand_->client_channel_factory_->CreateChannel(target, &args);
975   }
976
977   void UpdateState(
978       grpc_connectivity_state state,
979       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
980     grpc_error* disconnect_error =
981         chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
982     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
983       const char* extra = disconnect_error == GRPC_ERROR_NONE
984                               ? ""
985                               : " (ignoring -- channel shutting down)";
986       gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
987               grpc_connectivity_state_name(state), picker.get(), extra);
988     }
989     // Do update only if not shutting down.
990     if (disconnect_error == GRPC_ERROR_NONE) {
991       // Will delete itself.
992       New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
993                                             std::move(picker));
994     }
995   }
996
997   // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
998   void RequestReresolution() override {}
999
1000  private:
1001   ChannelData* chand_;
1002 };
1003
1004 //
1005 // ChannelData implementation
1006 //
1007
1008 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1009                               grpc_channel_element_args* args) {
1010   GPR_ASSERT(args->is_last);
1011   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1012   grpc_error* error = GRPC_ERROR_NONE;
1013   new (elem->channel_data) ChannelData(args, &error);
1014   return error;
1015 }
1016
1017 void ChannelData::Destroy(grpc_channel_element* elem) {
1018   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1019   chand->~ChannelData();
1020 }
1021
1022 bool GetEnableRetries(const grpc_channel_args* args) {
1023   return grpc_channel_arg_get_bool(
1024       grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1025 }
1026
1027 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1028   return static_cast<size_t>(grpc_channel_arg_get_integer(
1029       grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1030       {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1031 }
1032
1033 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1034     const grpc_channel_args* args) {
1035   const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1036       grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1037   if (use_local_subchannel_pool) {
1038     return MakeRefCounted<LocalSubchannelPool>();
1039   }
1040   return GlobalSubchannelPool::instance();
1041 }
1042
1043 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1044     : deadline_checking_enabled_(
1045           grpc_deadline_checking_enabled(args->channel_args)),
1046       enable_retries_(GetEnableRetries(args->channel_args)),
1047       per_rpc_retry_buffer_size_(
1048           GetMaxPerRpcRetryBufferSize(args->channel_args)),
1049       owning_stack_(args->channel_stack),
1050       client_channel_factory_(
1051           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1052       data_plane_combiner_(grpc_combiner_create()),
1053       combiner_(grpc_combiner_create()),
1054       interested_parties_(grpc_pollset_set_create()),
1055       subchannel_pool_(GetSubchannelPool(args->channel_args)),
1056       disconnect_error_(GRPC_ERROR_NONE) {
1057   // Initialize data members.
1058   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1059                                "client_channel");
1060   gpr_mu_init(&info_mu_);
1061   // Start backup polling.
1062   grpc_client_channel_start_backup_polling(interested_parties_);
1063   // Check client channel factory.
1064   if (client_channel_factory_ == nullptr) {
1065     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1066         "Missing client channel factory in args for client channel filter");
1067     return;
1068   }
1069   // Get server name to resolve, using proxy mapper if needed.
1070   const char* server_uri = grpc_channel_arg_get_string(
1071       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1072   if (server_uri == nullptr) {
1073     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1074         "server URI channel arg missing or wrong type in client channel "
1075         "filter");
1076     return;
1077   }
1078   // Get default service config
1079   const char* service_config_json = grpc_channel_arg_get_string(
1080       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1081   // TODO(yashkt): Make sure we set the channel in TRANSIENT_FAILURE on an
1082   // invalid default service config
1083   if (service_config_json != nullptr) {
1084     *error = GRPC_ERROR_NONE;
1085     default_service_config_ = ServiceConfig::Create(service_config_json, error);
1086     if (*error != GRPC_ERROR_NONE) {
1087       default_service_config_.reset();
1088       return;
1089     }
1090   }
1091   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1092   if (uri != nullptr && uri->path[0] != '\0') {
1093     server_name_.reset(
1094         gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1095   }
1096   grpc_uri_destroy(uri);
1097   char* proxy_name = nullptr;
1098   grpc_channel_args* new_args = nullptr;
1099   grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1100                               &new_args);
1101   UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
1102                                                    : gpr_strdup(server_uri));
1103   // Instantiate resolving LB policy.
1104   LoadBalancingPolicy::Args lb_args;
1105   lb_args.combiner = combiner_;
1106   lb_args.channel_control_helper =
1107       UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1108           New<ClientChannelControlHelper>(this));
1109   lb_args.args = new_args != nullptr ? new_args : args->channel_args;
1110   resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1111       std::move(lb_args), &grpc_client_channel_routing_trace,
1112       std::move(target_uri), ProcessResolverResultLocked, this, error));
1113   grpc_channel_args_destroy(new_args);
1114   if (*error != GRPC_ERROR_NONE) {
1115     // Orphan the resolving LB policy and flush the exec_ctx to ensure
1116     // that it finishes shutting down.  This ensures that if we are
1117     // failing, we destroy the ClientChannelControlHelper (and thus
1118     // unref the channel stack) before we return.
1119     // TODO(roth): This is not a complete solution, because it only
1120     // catches the case where channel stack initialization fails in this
1121     // particular filter.  If there is a failure in a different filter, we
1122     // will leave a dangling ref here, which can cause a crash.  Fortunately,
1123     // in practice, there are no other filters that can cause failures in
1124     // channel stack initialization, so this works for now.
1125     resolving_lb_policy_.reset();
1126     ExecCtx::Get()->Flush();
1127   } else {
1128     grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1129                                      interested_parties_);
1130     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1131       gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1132               resolving_lb_policy_.get());
1133     }
1134   }
1135 }
1136
1137 ChannelData::~ChannelData() {
1138   if (resolving_lb_policy_ != nullptr) {
1139     grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1140                                      interested_parties_);
1141     resolving_lb_policy_.reset();
1142   }
1143   // Stop backup polling.
1144   grpc_client_channel_stop_backup_polling(interested_parties_);
1145   grpc_pollset_set_destroy(interested_parties_);
1146   GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
1147   GRPC_COMBINER_UNREF(combiner_, "client_channel");
1148   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1149   grpc_connectivity_state_destroy(&state_tracker_);
1150   gpr_mu_destroy(&info_mu_);
1151 }
1152
1153 void ChannelData::ProcessLbPolicy(
1154     const Resolver::Result& resolver_result,
1155     const internal::ClientChannelGlobalParsedObject* parsed_service_config,
1156     UniquePtr<char>* lb_policy_name,
1157     RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) {
1158   // Prefer the LB policy name found in the service config.
1159   if (parsed_service_config != nullptr &&
1160       parsed_service_config->parsed_lb_config() != nullptr) {
1161     lb_policy_name->reset(
1162         gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1163     *lb_policy_config = parsed_service_config->parsed_lb_config();
1164     return;
1165   }
1166   const char* local_policy_name = nullptr;
1167   if (parsed_service_config != nullptr &&
1168       parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1169     local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1170   } else {
1171     const grpc_arg* channel_arg =
1172         grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1173     local_policy_name = grpc_channel_arg_get_string(channel_arg);
1174   }
1175   // Special case: If at least one balancer address is present, we use
1176   // the grpclb policy, regardless of what the resolver has returned.
1177   bool found_balancer_address = false;
1178   for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1179     const ServerAddress& address = resolver_result.addresses[i];
1180     if (address.IsBalancer()) {
1181       found_balancer_address = true;
1182       break;
1183     }
1184   }
1185   if (found_balancer_address) {
1186     if (local_policy_name != nullptr &&
1187         strcmp(local_policy_name, "grpclb") != 0) {
1188       gpr_log(GPR_INFO,
1189               "resolver requested LB policy %s but provided at least one "
1190               "balancer address -- forcing use of grpclb LB policy",
1191               local_policy_name);
1192     }
1193     local_policy_name = "grpclb";
1194   }
1195   // Use pick_first if nothing was specified and we didn't select grpclb
1196   // above.
1197   lb_policy_name->reset(gpr_strdup(
1198       local_policy_name == nullptr ? "pick_first" : local_policy_name));
1199 }
1200
1201 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1202 // resolver result update.
1203 bool ChannelData::ProcessResolverResultLocked(
1204     void* arg, const Resolver::Result& result, const char** lb_policy_name,
1205     RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
1206     grpc_error** service_config_error) {
1207   ChannelData* chand = static_cast<ChannelData*>(arg);
1208   RefCountedPtr<ServiceConfig> service_config;
1209   // If resolver did not return a service config or returned an invalid service
1210   // config, we need a fallback service config.
1211   if (result.service_config_error != GRPC_ERROR_NONE) {
1212     // If the service config was invalid, then fallback to the saved service
1213     // config. If there is no saved config either, use the default service
1214     // config.
1215     if (chand->saved_service_config_ != nullptr) {
1216       service_config = chand->saved_service_config_;
1217       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1218         gpr_log(GPR_INFO,
1219                 "chand=%p: resolver returned invalid service config. "
1220                 "Continuing to use previous service config.",
1221                 chand);
1222       }
1223     } else if (chand->default_service_config_ != nullptr) {
1224       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1225         gpr_log(GPR_INFO,
1226                 "chand=%p: resolver returned invalid service config. Using "
1227                 "default service config provided by client API.",
1228                 chand);
1229       }
1230       service_config = chand->default_service_config_;
1231     }
1232   } else if (result.service_config == nullptr) {
1233     if (chand->default_service_config_ != nullptr) {
1234       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1235         gpr_log(GPR_INFO,
1236                 "chand=%p: resolver returned no service config. Using default "
1237                 "service config provided by client API.",
1238                 chand);
1239       }
1240       service_config = chand->default_service_config_;
1241     }
1242   } else {
1243     service_config = result.service_config;
1244   }
1245   *service_config_error = GRPC_ERROR_REF(result.service_config_error);
1246   if (service_config == nullptr &&
1247       result.service_config_error != GRPC_ERROR_NONE) {
1248     return false;
1249   }
1250   UniquePtr<char> service_config_json;
1251   // Process service config.
1252   const internal::ClientChannelGlobalParsedObject* parsed_service_config =
1253       nullptr;
1254   if (service_config != nullptr) {
1255     parsed_service_config =
1256         static_cast<const internal::ClientChannelGlobalParsedObject*>(
1257             service_config->GetParsedGlobalServiceConfigObject(
1258                 internal::ClientChannelServiceConfigParser::ParserIndex()));
1259   }
1260   const bool service_config_changed =
1261       ((service_config == nullptr) !=
1262        (chand->saved_service_config_ == nullptr)) ||
1263       (service_config != nullptr &&
1264        strcmp(service_config->service_config_json(),
1265               chand->saved_service_config_->service_config_json()) != 0);
1266   if (service_config_changed) {
1267     service_config_json.reset(gpr_strdup(
1268         service_config != nullptr ? service_config->service_config_json()
1269                                   : ""));
1270     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1271       gpr_log(GPR_INFO,
1272               "chand=%p: resolver returned updated service config: \"%s\"",
1273               chand, service_config_json.get());
1274     }
1275     chand->saved_service_config_ = std::move(service_config);
1276     if (parsed_service_config != nullptr) {
1277       chand->health_check_service_name_.reset(
1278           gpr_strdup(parsed_service_config->health_check_service_name()));
1279     } else {
1280       chand->health_check_service_name_.reset();
1281     }
1282   }
1283   // We want to set the service config at least once. This should not really be
1284   // needed, but we are doing it as a defensive approach. This can be removed,
1285   // if we feel it is unnecessary.
1286   if (service_config_changed || !chand->received_first_resolver_result_) {
1287     chand->received_first_resolver_result_ = true;
1288     Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
1289         retry_throttle_data;
1290     if (parsed_service_config != nullptr) {
1291       retry_throttle_data = parsed_service_config->retry_throttling();
1292     }
1293     // Create service config setter to update channel state in the data
1294     // plane combiner.  Destroys itself when done.
1295     New<ServiceConfigSetter>(chand, retry_throttle_data,
1296                              chand->saved_service_config_);
1297   }
1298   UniquePtr<char> processed_lb_policy_name;
1299   chand->ProcessLbPolicy(result, parsed_service_config,
1300                          &processed_lb_policy_name, lb_policy_config);
1301   // Swap out the data used by GetChannelInfo().
1302   {
1303     MutexLock lock(&chand->info_mu_);
1304     chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
1305     if (service_config_json != nullptr) {
1306       chand->info_service_config_json_ = std::move(service_config_json);
1307     }
1308   }
1309   // Return results.
1310   *lb_policy_name = chand->info_lb_policy_name_.get();
1311   return service_config_changed;
1312 }
1313
1314 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
1315   if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
1316     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1317   }
1318   LoadBalancingPolicy::PickArgs pick;
1319   grpc_error* error = GRPC_ERROR_NONE;
1320   picker_->Pick(&pick, &error);
1321   if (pick.connected_subchannel != nullptr) {
1322     pick.connected_subchannel->Ping(op->send_ping.on_initiate,
1323                                     op->send_ping.on_ack);
1324   } else {
1325     if (error == GRPC_ERROR_NONE) {
1326       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1327           "LB policy dropped call on ping");
1328     }
1329   }
1330   return error;
1331 }
1332
1333 void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
1334   grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
1335   grpc_channel_element* elem =
1336       static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
1337   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1338   // Connectivity watch.
1339   if (op->on_connectivity_state_change != nullptr) {
1340     grpc_connectivity_state_notify_on_state_change(
1341         &chand->state_tracker_, op->connectivity_state,
1342         op->on_connectivity_state_change);
1343     op->on_connectivity_state_change = nullptr;
1344     op->connectivity_state = nullptr;
1345   }
1346   // Ping.
1347   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1348     grpc_error* error = chand->DoPingLocked(op);
1349     if (error != GRPC_ERROR_NONE) {
1350       GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
1351       GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
1352     }
1353     op->bind_pollset = nullptr;
1354     op->send_ping.on_initiate = nullptr;
1355     op->send_ping.on_ack = nullptr;
1356   }
1357   // Reset backoff.
1358   if (op->reset_connect_backoff) {
1359     if (chand->resolving_lb_policy_ != nullptr) {
1360       chand->resolving_lb_policy_->ResetBackoffLocked();
1361     }
1362   }
1363   // Disconnect.
1364   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1365     grpc_error* error = GRPC_ERROR_NONE;
1366     GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
1367         &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
1368         MemoryOrder::ACQUIRE));
1369     grpc_pollset_set_del_pollset_set(
1370         chand->resolving_lb_policy_->interested_parties(),
1371         chand->interested_parties_);
1372     chand->resolving_lb_policy_.reset();
1373     // Will delete itself.
1374     New<ConnectivityStateAndPickerSetter>(
1375         chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
1376         UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
1377             New<LoadBalancingPolicy::TransientFailurePicker>(
1378                 GRPC_ERROR_REF(op->disconnect_with_error))));
1379   }
1380   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
1381   GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1382 }
1383
1384 void ChannelData::StartTransportOp(grpc_channel_element* elem,
1385                                    grpc_transport_op* op) {
1386   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1387   GPR_ASSERT(op->set_accept_stream == false);
1388   // Handle bind_pollset.
1389   if (op->bind_pollset != nullptr) {
1390     grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1391   }
1392   // Pop into control plane combiner for remaining ops.
1393   op->handler_private.extra_arg = elem;
1394   GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1395   GRPC_CLOSURE_SCHED(
1396       GRPC_CLOSURE_INIT(&op->handler_private.closure,
1397                         ChannelData::StartTransportOpLocked, op,
1398                         grpc_combiner_scheduler(chand->combiner_)),
1399       GRPC_ERROR_NONE);
1400 }
1401
1402 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1403                                  const grpc_channel_info* info) {
1404   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1405   MutexLock lock(&chand->info_mu_);
1406   if (info->lb_policy_name != nullptr) {
1407     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1408   }
1409   if (info->service_config_json != nullptr) {
1410     *info->service_config_json =
1411         gpr_strdup(chand->info_service_config_json_.get());
1412   }
1413 }
1414
1415 void ChannelData::AddQueuedPick(QueuedPick* pick,
1416                                 grpc_polling_entity* pollent) {
1417   // Add call to queued picks list.
1418   pick->next = queued_picks_;
1419   queued_picks_ = pick;
1420   // Add call's pollent to channel's interested_parties, so that I/O
1421   // can be done under the call's CQ.
1422   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1423 }
1424
1425 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1426                                    grpc_polling_entity* pollent) {
1427   // Remove call's pollent from channel's interested_parties.
1428   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1429   // Remove from queued picks list.
1430   for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1431        pick = &(*pick)->next) {
1432     if (*pick == to_remove) {
1433       *pick = to_remove->next;
1434       return;
1435     }
1436   }
1437 }
1438
1439 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1440   auto* chand = static_cast<ChannelData*>(arg);
1441   if (chand->resolving_lb_policy_ != nullptr) {
1442     chand->resolving_lb_policy_->ExitIdleLocked();
1443   }
1444   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
1445 }
1446
1447 grpc_connectivity_state ChannelData::CheckConnectivityState(
1448     bool try_to_connect) {
1449   grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1450   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1451     GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1452     GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1453                                            grpc_combiner_scheduler(combiner_)),
1454                        GRPC_ERROR_NONE);
1455   }
1456   return out;
1457 }
1458
1459 //
1460 // CallData implementation
1461 //
1462
1463 // Retry support:
1464 //
1465 // In order to support retries, we act as a proxy for stream op batches.
1466 // When we get a batch from the surface, we add it to our list of pending
1467 // batches, and we then use those batches to construct separate "child"
1468 // batches to be started on the subchannel call.  When the child batches
1469 // return, we then decide which pending batches have been completed and
1470 // schedule their callbacks accordingly.  If a subchannel call fails and
1471 // we want to retry it, we do a new pick and start again, constructing
1472 // new "child" batches for the new subchannel call.
1473 //
1474 // Note that retries are committed when receiving data from the server
1475 // (except for Trailers-Only responses).  However, there may be many
1476 // send ops started before receiving any data, so we may have already
1477 // completed some number of send ops (and returned the completions up to
1478 // the surface) by the time we realize that we need to retry.  To deal
1479 // with this, we cache data for send ops, so that we can replay them on a
1480 // different subchannel call even after we have completed the original
1481 // batches.
1482 //
1483 // There are two sets of data to maintain:
1484 // - In call_data (in the parent channel), we maintain a list of pending
1485 //   ops and cached data for send ops.
1486 // - In the subchannel call, we maintain state to indicate what ops have
1487 //   already been sent down to that call.
1488 //
1489 // When constructing the "child" batches, we compare those two sets of
1490 // data to see which batches need to be sent to the subchannel call.
1491
1492 // TODO(roth): In subsequent PRs:
1493 // - add support for transparent retries (including initial metadata)
1494 // - figure out how to record stats in census for retries
1495 //   (census filter is on top of this one)
1496 // - add census stats for retries
1497
1498 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
1499                    const grpc_call_element_args& args)
1500     : deadline_state_(elem, args.call_stack, args.call_combiner,
1501                       GPR_LIKELY(chand.deadline_checking_enabled())
1502                           ? args.deadline
1503                           : GRPC_MILLIS_INF_FUTURE),
1504       path_(grpc_slice_ref_internal(args.path)),
1505       call_start_time_(args.start_time),
1506       deadline_(args.deadline),
1507       arena_(args.arena),
1508       owning_call_(args.call_stack),
1509       call_combiner_(args.call_combiner),
1510       call_context_(args.context),
1511       pending_send_initial_metadata_(false),
1512       pending_send_message_(false),
1513       pending_send_trailing_metadata_(false),
1514       enable_retries_(chand.enable_retries()),
1515       retry_committed_(false),
1516       last_attempt_got_server_pushback_(false) {}
1517
1518 CallData::~CallData() {
1519   grpc_slice_unref_internal(path_);
1520   GRPC_ERROR_UNREF(cancel_error_);
1521   // Make sure there are no remaining pending batches.
1522   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1523     GPR_ASSERT(pending_batches_[i].batch == nullptr);
1524   }
1525 }
1526
1527 grpc_error* CallData::Init(grpc_call_element* elem,
1528                            const grpc_call_element_args* args) {
1529   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1530   new (elem->call_data) CallData(elem, *chand, *args);
1531   return GRPC_ERROR_NONE;
1532 }
1533
1534 void CallData::Destroy(grpc_call_element* elem,
1535                        const grpc_call_final_info* final_info,
1536                        grpc_closure* then_schedule_closure) {
1537   CallData* calld = static_cast<CallData*>(elem->call_data);
1538   if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
1539     calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
1540     then_schedule_closure = nullptr;
1541   }
1542   calld->~CallData();
1543   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1544 }
1545
1546 void CallData::StartTransportStreamOpBatch(
1547     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1548   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1549   CallData* calld = static_cast<CallData*>(elem->call_data);
1550   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1551   if (GPR_LIKELY(chand->deadline_checking_enabled())) {
1552     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
1553   }
1554   // If we've previously been cancelled, immediately fail any new batches.
1555   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
1556     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1557       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1558               chand, calld, grpc_error_string(calld->cancel_error_));
1559     }
1560     // Note: This will release the call combiner.
1561     grpc_transport_stream_op_batch_finish_with_failure(
1562         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1563     return;
1564   }
1565   // Handle cancellation.
1566   if (GPR_UNLIKELY(batch->cancel_stream)) {
1567     // Stash a copy of cancel_error in our call data, so that we can use
1568     // it for subsequent operations.  This ensures that if the call is
1569     // cancelled before any batches are passed down (e.g., if the deadline
1570     // is in the past when the call starts), we can return the right
1571     // error to the caller when the first batch does get passed down.
1572     GRPC_ERROR_UNREF(calld->cancel_error_);
1573     calld->cancel_error_ =
1574         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1575     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1576       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1577               calld, grpc_error_string(calld->cancel_error_));
1578     }
1579     // If we do not have a subchannel call (i.e., a pick has not yet
1580     // been started), fail all pending batches.  Otherwise, send the
1581     // cancellation down to the subchannel call.
1582     if (calld->subchannel_call_ == nullptr) {
1583       // TODO(roth): If there is a pending retry callback, do we need to
1584       // cancel it here?
1585       calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
1586                                 NoYieldCallCombiner);
1587       // Note: This will release the call combiner.
1588       grpc_transport_stream_op_batch_finish_with_failure(
1589           batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1590     } else {
1591       // Note: This will release the call combiner.
1592       calld->subchannel_call_->StartTransportStreamOpBatch(batch);
1593     }
1594     return;
1595   }
1596   // Add the batch to the pending list.
1597   calld->PendingBatchesAdd(elem, batch);
1598   // Check if we've already gotten a subchannel call.
1599   // Note that once we have completed the pick, we do not need to enter
1600   // the channel combiner, which is more efficient (especially for
1601   // streaming calls).
1602   if (calld->subchannel_call_ != nullptr) {
1603     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1604       gpr_log(GPR_INFO,
1605               "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
1606               calld, calld->subchannel_call_.get());
1607     }
1608     calld->PendingBatchesResume(elem);
1609     return;
1610   }
1611   // We do not yet have a subchannel call.
1612   // For batches containing a send_initial_metadata op, enter the channel
1613   // combiner to start a pick.
1614   if (GPR_LIKELY(batch->send_initial_metadata)) {
1615     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1616       gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
1617               chand, calld);
1618     }
1619     GRPC_CLOSURE_SCHED(
1620         GRPC_CLOSURE_INIT(
1621             &batch->handler_private.closure, StartPickLocked, elem,
1622             grpc_combiner_scheduler(chand->data_plane_combiner())),
1623         GRPC_ERROR_NONE);
1624   } else {
1625     // For all other batches, release the call combiner.
1626     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1627       gpr_log(GPR_INFO,
1628               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
1629               calld);
1630     }
1631     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1632                             "batch does not include send_initial_metadata");
1633   }
1634 }
1635
1636 void CallData::SetPollent(grpc_call_element* elem,
1637                           grpc_polling_entity* pollent) {
1638   CallData* calld = static_cast<CallData*>(elem->call_data);
1639   calld->pollent_ = pollent;
1640 }
1641
1642 //
1643 // send op data caching
1644 //
1645
1646 void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
1647   if (pending->send_ops_cached) return;
1648   pending->send_ops_cached = true;
1649   grpc_transport_stream_op_batch* batch = pending->batch;
1650   // Save a copy of metadata for send_initial_metadata ops.
1651   if (batch->send_initial_metadata) {
1652     seen_send_initial_metadata_ = true;
1653     GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
1654     grpc_metadata_batch* send_initial_metadata =
1655         batch->payload->send_initial_metadata.send_initial_metadata;
1656     send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1657         sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
1658     grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
1659                              send_initial_metadata_storage_);
1660     send_initial_metadata_flags_ =
1661         batch->payload->send_initial_metadata.send_initial_metadata_flags;
1662     peer_string_ = batch->payload->send_initial_metadata.peer_string;
1663   }
1664   // Set up cache for send_message ops.
1665   if (batch->send_message) {
1666     ByteStreamCache* cache = arena_->New<ByteStreamCache>(
1667         std::move(batch->payload->send_message.send_message));
1668     send_messages_.push_back(cache);
1669   }
1670   // Save metadata batch for send_trailing_metadata ops.
1671   if (batch->send_trailing_metadata) {
1672     seen_send_trailing_metadata_ = true;
1673     GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
1674     grpc_metadata_batch* send_trailing_metadata =
1675         batch->payload->send_trailing_metadata.send_trailing_metadata;
1676     send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1677         sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
1678     grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
1679                              send_trailing_metadata_storage_);
1680   }
1681 }
1682
1683 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
1684   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1685     gpr_log(GPR_INFO,
1686             "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1687             this);
1688   }
1689   grpc_metadata_batch_destroy(&send_initial_metadata_);
1690 }
1691
1692 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
1693   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1694     gpr_log(GPR_INFO,
1695             "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1696             chand, this, idx);
1697   }
1698   send_messages_[idx]->Destroy();
1699 }
1700
1701 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
1702   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1703     gpr_log(GPR_INFO,
1704             "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1705             chand, this);
1706   }
1707   grpc_metadata_batch_destroy(&send_trailing_metadata_);
1708 }
1709
1710 void CallData::FreeCachedSendOpDataAfterCommit(
1711     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
1712   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1713   if (retry_state->completed_send_initial_metadata) {
1714     FreeCachedSendInitialMetadata(chand);
1715   }
1716   for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1717     FreeCachedSendMessage(chand, i);
1718   }
1719   if (retry_state->completed_send_trailing_metadata) {
1720     FreeCachedSendTrailingMetadata(chand);
1721   }
1722 }
1723
1724 void CallData::FreeCachedSendOpDataForCompletedBatch(
1725     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
1726     SubchannelCallRetryState* retry_state) {
1727   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1728   if (batch_data->batch.send_initial_metadata) {
1729     FreeCachedSendInitialMetadata(chand);
1730   }
1731   if (batch_data->batch.send_message) {
1732     FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
1733   }
1734   if (batch_data->batch.send_trailing_metadata) {
1735     FreeCachedSendTrailingMetadata(chand);
1736   }
1737 }
1738
1739 //
1740 // LB recv_trailing_metadata_ready handling
1741 //
1742
1743 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
1744     const LoadBalancingPolicy::PickArgs& pick,
1745     grpc_transport_stream_op_batch* batch) {
1746   if (pick.recv_trailing_metadata_ready != nullptr) {
1747     *pick.original_recv_trailing_metadata_ready =
1748         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1749     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1750         pick.recv_trailing_metadata_ready;
1751     if (pick.recv_trailing_metadata != nullptr) {
1752       *pick.recv_trailing_metadata =
1753           batch->payload->recv_trailing_metadata.recv_trailing_metadata;
1754     }
1755   }
1756 }
1757
1758 //
1759 // pending_batches management
1760 //
1761
1762 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
1763   // Note: It is important the send_initial_metadata be the first entry
1764   // here, since the code in pick_subchannel_locked() assumes it will be.
1765   if (batch->send_initial_metadata) return 0;
1766   if (batch->send_message) return 1;
1767   if (batch->send_trailing_metadata) return 2;
1768   if (batch->recv_initial_metadata) return 3;
1769   if (batch->recv_message) return 4;
1770   if (batch->recv_trailing_metadata) return 5;
1771   GPR_UNREACHABLE_CODE(return (size_t)-1);
1772 }
1773
1774 // This is called via the call combiner, so access to calld is synchronized.
1775 void CallData::PendingBatchesAdd(grpc_call_element* elem,
1776                                  grpc_transport_stream_op_batch* batch) {
1777   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1778   const size_t idx = GetBatchIndex(batch);
1779   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1780     gpr_log(GPR_INFO,
1781             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1782             this, idx);
1783   }
1784   PendingBatch* pending = &pending_batches_[idx];
1785   GPR_ASSERT(pending->batch == nullptr);
1786   pending->batch = batch;
1787   pending->send_ops_cached = false;
1788   if (enable_retries_) {
1789     // Update state in calld about pending batches.
1790     // Also check if the batch takes us over the retry buffer limit.
1791     // Note: We don't check the size of trailing metadata here, because
1792     // gRPC clients do not send trailing metadata.
1793     if (batch->send_initial_metadata) {
1794       pending_send_initial_metadata_ = true;
1795       bytes_buffered_for_retry_ += grpc_metadata_batch_size(
1796           batch->payload->send_initial_metadata.send_initial_metadata);
1797     }
1798     if (batch->send_message) {
1799       pending_send_message_ = true;
1800       bytes_buffered_for_retry_ +=
1801           batch->payload->send_message.send_message->length();
1802     }
1803     if (batch->send_trailing_metadata) {
1804       pending_send_trailing_metadata_ = true;
1805     }
1806     if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
1807                      chand->per_rpc_retry_buffer_size())) {
1808       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1809         gpr_log(GPR_INFO,
1810                 "chand=%p calld=%p: exceeded retry buffer size, committing",
1811                 chand, this);
1812       }
1813       SubchannelCallRetryState* retry_state =
1814           subchannel_call_ == nullptr ? nullptr
1815                                       : static_cast<SubchannelCallRetryState*>(
1816                                             subchannel_call_->GetParentData());
1817       RetryCommit(elem, retry_state);
1818       // If we are not going to retry and have not yet started, pretend
1819       // retries are disabled so that we don't bother with retry overhead.
1820       if (num_attempts_completed_ == 0) {
1821         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1822           gpr_log(GPR_INFO,
1823                   "chand=%p calld=%p: disabling retries before first attempt",
1824                   chand, this);
1825         }
1826         enable_retries_ = false;
1827       }
1828     }
1829   }
1830 }
1831
1832 void CallData::PendingBatchClear(PendingBatch* pending) {
1833   if (enable_retries_) {
1834     if (pending->batch->send_initial_metadata) {
1835       pending_send_initial_metadata_ = false;
1836     }
1837     if (pending->batch->send_message) {
1838       pending_send_message_ = false;
1839     }
1840     if (pending->batch->send_trailing_metadata) {
1841       pending_send_trailing_metadata_ = false;
1842     }
1843   }
1844   pending->batch = nullptr;
1845 }
1846
1847 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
1848                                       PendingBatch* pending) {
1849   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1850   grpc_transport_stream_op_batch* batch = pending->batch;
1851   // We clear the pending batch if all of its callbacks have been
1852   // scheduled and reset to nullptr.
1853   if (batch->on_complete == nullptr &&
1854       (!batch->recv_initial_metadata ||
1855        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1856            nullptr) &&
1857       (!batch->recv_message ||
1858        batch->payload->recv_message.recv_message_ready == nullptr) &&
1859       (!batch->recv_trailing_metadata ||
1860        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1861            nullptr)) {
1862     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1863       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1864               this);
1865     }
1866     PendingBatchClear(pending);
1867   }
1868 }
1869
1870 // This is called via the call combiner, so access to calld is synchronized.
1871 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
1872   grpc_transport_stream_op_batch* batch =
1873       static_cast<grpc_transport_stream_op_batch*>(arg);
1874   CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
1875   // Note: This will release the call combiner.
1876   grpc_transport_stream_op_batch_finish_with_failure(
1877       batch, GRPC_ERROR_REF(error), calld->call_combiner_);
1878 }
1879
1880 // This is called via the call combiner, so access to calld is synchronized.
1881 void CallData::PendingBatchesFail(
1882     grpc_call_element* elem, grpc_error* error,
1883     YieldCallCombinerPredicate yield_call_combiner_predicate) {
1884   GPR_ASSERT(error != GRPC_ERROR_NONE);
1885   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1886     size_t num_batches = 0;
1887     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1888       if (pending_batches_[i].batch != nullptr) ++num_batches;
1889     }
1890     gpr_log(GPR_INFO,
1891             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1892             elem->channel_data, this, num_batches, grpc_error_string(error));
1893   }
1894   CallCombinerClosureList closures;
1895   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1896     PendingBatch* pending = &pending_batches_[i];
1897     grpc_transport_stream_op_batch* batch = pending->batch;
1898     if (batch != nullptr) {
1899       if (batch->recv_trailing_metadata) {
1900         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
1901                                                                    batch);
1902       }
1903       batch->handler_private.extra_arg = this;
1904       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1905                         FailPendingBatchInCallCombiner, batch,
1906                         grpc_schedule_on_exec_ctx);
1907       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1908                    "PendingBatchesFail");
1909       PendingBatchClear(pending);
1910     }
1911   }
1912   if (yield_call_combiner_predicate(closures)) {
1913     closures.RunClosures(call_combiner_);
1914   } else {
1915     closures.RunClosuresWithoutYielding(call_combiner_);
1916   }
1917   GRPC_ERROR_UNREF(error);
1918 }
1919
1920 // This is called via the call combiner, so access to calld is synchronized.
1921 void CallData::ResumePendingBatchInCallCombiner(void* arg,
1922                                                 grpc_error* ignored) {
1923   grpc_transport_stream_op_batch* batch =
1924       static_cast<grpc_transport_stream_op_batch*>(arg);
1925   SubchannelCall* subchannel_call =
1926       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
1927   // Note: This will release the call combiner.
1928   subchannel_call->StartTransportStreamOpBatch(batch);
1929 }
1930
1931 // This is called via the call combiner, so access to calld is synchronized.
1932 void CallData::PendingBatchesResume(grpc_call_element* elem) {
1933   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1934   if (enable_retries_) {
1935     StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
1936     return;
1937   }
1938   // Retries not enabled; send down batches as-is.
1939   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1940     size_t num_batches = 0;
1941     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1942       if (pending_batches_[i].batch != nullptr) ++num_batches;
1943     }
1944     gpr_log(GPR_INFO,
1945             "chand=%p calld=%p: starting %" PRIuPTR
1946             " pending batches on subchannel_call=%p",
1947             chand, this, num_batches, subchannel_call_.get());
1948   }
1949   CallCombinerClosureList closures;
1950   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1951     PendingBatch* pending = &pending_batches_[i];
1952     grpc_transport_stream_op_batch* batch = pending->batch;
1953     if (batch != nullptr) {
1954       if (batch->recv_trailing_metadata) {
1955         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
1956                                                                    batch);
1957       }
1958       batch->handler_private.extra_arg = subchannel_call_.get();
1959       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1960                         ResumePendingBatchInCallCombiner, batch,
1961                         grpc_schedule_on_exec_ctx);
1962       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1963                    "PendingBatchesResume");
1964       PendingBatchClear(pending);
1965     }
1966   }
1967   // Note: This will release the call combiner.
1968   closures.RunClosures(call_combiner_);
1969 }
1970
1971 template <typename Predicate>
1972 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
1973                                                    const char* log_message,
1974                                                    Predicate predicate) {
1975   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1976   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1977     PendingBatch* pending = &pending_batches_[i];
1978     grpc_transport_stream_op_batch* batch = pending->batch;
1979     if (batch != nullptr && predicate(batch)) {
1980       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1981         gpr_log(GPR_INFO,
1982                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1983                 this, log_message, i);
1984       }
1985       return pending;
1986     }
1987   }
1988   return nullptr;
1989 }
1990
1991 //
1992 // retry code
1993 //
1994
1995 void CallData::RetryCommit(grpc_call_element* elem,
1996                            SubchannelCallRetryState* retry_state) {
1997   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1998   if (retry_committed_) return;
1999   retry_committed_ = true;
2000   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2001     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2002   }
2003   if (retry_state != nullptr) {
2004     FreeCachedSendOpDataAfterCommit(elem, retry_state);
2005   }
2006 }
2007
2008 void CallData::DoRetry(grpc_call_element* elem,
2009                        SubchannelCallRetryState* retry_state,
2010                        grpc_millis server_pushback_ms) {
2011   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2012   GPR_ASSERT(method_params_ != nullptr);
2013   const auto* retry_policy = method_params_->retry_policy();
2014   GPR_ASSERT(retry_policy != nullptr);
2015   // Reset subchannel call and connected subchannel.
2016   subchannel_call_.reset();
2017   pick_.pick.connected_subchannel.reset();
2018   // Compute backoff delay.
2019   grpc_millis next_attempt_time;
2020   if (server_pushback_ms >= 0) {
2021     next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
2022     last_attempt_got_server_pushback_ = true;
2023   } else {
2024     if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
2025       retry_backoff_.Init(
2026           BackOff::Options()
2027               .set_initial_backoff(retry_policy->initial_backoff)
2028               .set_multiplier(retry_policy->backoff_multiplier)
2029               .set_jitter(RETRY_BACKOFF_JITTER)
2030               .set_max_backoff(retry_policy->max_backoff));
2031       last_attempt_got_server_pushback_ = false;
2032     }
2033     next_attempt_time = retry_backoff_->NextAttemptTime();
2034   }
2035   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2036     gpr_log(GPR_INFO,
2037             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
2038             this, next_attempt_time - ExecCtx::Get()->Now());
2039   }
2040   // Schedule retry after computed delay.
2041   GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
2042                     grpc_combiner_scheduler(chand->data_plane_combiner()));
2043   grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
2044   // Update bookkeeping.
2045   if (retry_state != nullptr) retry_state->retry_dispatched = true;
2046 }
2047
2048 bool CallData::MaybeRetry(grpc_call_element* elem,
2049                           SubchannelCallBatchData* batch_data,
2050                           grpc_status_code status,
2051                           grpc_mdelem* server_pushback_md) {
2052   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2053   // Get retry policy.
2054   if (method_params_ == nullptr) return false;
2055   const auto* retry_policy = method_params_->retry_policy();
2056   if (retry_policy == nullptr) return false;
2057   // If we've already dispatched a retry from this call, return true.
2058   // This catches the case where the batch has multiple callbacks
2059   // (i.e., it includes either recv_message or recv_initial_metadata).
2060   SubchannelCallRetryState* retry_state = nullptr;
2061   if (batch_data != nullptr) {
2062     retry_state = static_cast<SubchannelCallRetryState*>(
2063         batch_data->subchannel_call->GetParentData());
2064     if (retry_state->retry_dispatched) {
2065       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2066         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
2067                 this);
2068       }
2069       return true;
2070     }
2071   }
2072   // Check status.
2073   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
2074     if (retry_throttle_data_ != nullptr) {
2075       retry_throttle_data_->RecordSuccess();
2076     }
2077     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2078       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
2079     }
2080     return false;
2081   }
2082   // Status is not OK.  Check whether the status is retryable.
2083   if (!retry_policy->retryable_status_codes.Contains(status)) {
2084     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2085       gpr_log(GPR_INFO,
2086               "chand=%p calld=%p: status %s not configured as retryable", chand,
2087               this, grpc_status_code_to_string(status));
2088     }
2089     return false;
2090   }
2091   // Record the failure and check whether retries are throttled.
2092   // Note that it's important for this check to come after the status
2093   // code check above, since we should only record failures whose statuses
2094   // match the configured retryable status codes, so that we don't count
2095   // things like failures due to malformed requests (INVALID_ARGUMENT).
2096   // Conversely, it's important for this to come before the remaining
2097   // checks, so that we don't fail to record failures due to other factors.
2098   if (retry_throttle_data_ != nullptr &&
2099       !retry_throttle_data_->RecordFailure()) {
2100     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2101       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
2102     }
2103     return false;
2104   }
2105   // Check whether the call is committed.
2106   if (retry_committed_) {
2107     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2108       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
2109               this);
2110     }
2111     return false;
2112   }
2113   // Check whether we have retries remaining.
2114   ++num_attempts_completed_;
2115   if (num_attempts_completed_ >= retry_policy->max_attempts) {
2116     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2117       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
2118               this, retry_policy->max_attempts);
2119     }
2120     return false;
2121   }
2122   // If the call was cancelled from the surface, don't retry.
2123   if (cancel_error_ != GRPC_ERROR_NONE) {
2124     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2125       gpr_log(GPR_INFO,
2126               "chand=%p calld=%p: call cancelled from surface, not retrying",
2127               chand, this);
2128     }
2129     return false;
2130   }
2131   // Check server push-back.
2132   grpc_millis server_pushback_ms = -1;
2133   if (server_pushback_md != nullptr) {
2134     // If the value is "-1" or any other unparseable string, we do not retry.
2135     uint32_t ms;
2136     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
2137       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2138         gpr_log(GPR_INFO,
2139                 "chand=%p calld=%p: not retrying due to server push-back",
2140                 chand, this);
2141       }
2142       return false;
2143     } else {
2144       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2145         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
2146                 chand, this, ms);
2147       }
2148       server_pushback_ms = (grpc_millis)ms;
2149     }
2150   }
2151   DoRetry(elem, retry_state, server_pushback_ms);
2152   return true;
2153 }
2154
2155 //
2156 // CallData::SubchannelCallBatchData
2157 //
2158
2159 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2160     grpc_call_element* elem, int refcount, bool set_on_complete) {
2161   CallData* calld = static_cast<CallData*>(elem->call_data);
2162   return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2163                                                      set_on_complete);
2164 }
2165
2166 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2167     grpc_call_element* elem, CallData* calld, int refcount,
2168     bool set_on_complete)
2169     : elem(elem), subchannel_call(calld->subchannel_call_) {
2170   SubchannelCallRetryState* retry_state =
2171       static_cast<SubchannelCallRetryState*>(
2172           calld->subchannel_call_->GetParentData());
2173   batch.payload = &retry_state->batch_payload;
2174   gpr_ref_init(&refs, refcount);
2175   if (set_on_complete) {
2176     GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2177                       grpc_schedule_on_exec_ctx);
2178     batch.on_complete = &on_complete;
2179   }
2180   GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2181 }
2182
2183 void CallData::SubchannelCallBatchData::Destroy() {
2184   SubchannelCallRetryState* retry_state =
2185       static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2186   if (batch.send_initial_metadata) {
2187     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2188   }
2189   if (batch.send_trailing_metadata) {
2190     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2191   }
2192   if (batch.recv_initial_metadata) {
2193     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2194   }
2195   if (batch.recv_trailing_metadata) {
2196     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2197   }
2198   subchannel_call.reset();
2199   CallData* calld = static_cast<CallData*>(elem->call_data);
2200   GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
2201 }
2202
2203 //
2204 // recv_initial_metadata callback handling
2205 //
2206
2207 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2208   SubchannelCallBatchData* batch_data =
2209       static_cast<SubchannelCallBatchData*>(arg);
2210   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2211   // Find pending batch.
2212   PendingBatch* pending = calld->PendingBatchFind(
2213       batch_data->elem, "invoking recv_initial_metadata_ready for",
2214       [](grpc_transport_stream_op_batch* batch) {
2215         return batch->recv_initial_metadata &&
2216                batch->payload->recv_initial_metadata
2217                        .recv_initial_metadata_ready != nullptr;
2218       });
2219   GPR_ASSERT(pending != nullptr);
2220   // Return metadata.
2221   SubchannelCallRetryState* retry_state =
2222       static_cast<SubchannelCallRetryState*>(
2223           batch_data->subchannel_call->GetParentData());
2224   grpc_metadata_batch_move(
2225       &retry_state->recv_initial_metadata,
2226       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2227   // Update bookkeeping.
2228   // Note: Need to do this before invoking the callback, since invoking
2229   // the callback will result in yielding the call combiner.
2230   grpc_closure* recv_initial_metadata_ready =
2231       pending->batch->payload->recv_initial_metadata
2232           .recv_initial_metadata_ready;
2233   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2234       nullptr;
2235   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2236   batch_data->Unref();
2237   // Invoke callback.
2238   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2239 }
2240
2241 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2242   SubchannelCallBatchData* batch_data =
2243       static_cast<SubchannelCallBatchData*>(arg);
2244   grpc_call_element* elem = batch_data->elem;
2245   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2246   CallData* calld = static_cast<CallData*>(elem->call_data);
2247   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2248     gpr_log(GPR_INFO,
2249             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2250             chand, calld, grpc_error_string(error));
2251   }
2252   SubchannelCallRetryState* retry_state =
2253       static_cast<SubchannelCallRetryState*>(
2254           batch_data->subchannel_call->GetParentData());
2255   retry_state->completed_recv_initial_metadata = true;
2256   // If a retry was already dispatched, then we're not going to use the
2257   // result of this recv_initial_metadata op, so do nothing.
2258   if (retry_state->retry_dispatched) {
2259     GRPC_CALL_COMBINER_STOP(
2260         calld->call_combiner_,
2261         "recv_initial_metadata_ready after retry dispatched");
2262     return;
2263   }
2264   // If we got an error or a Trailers-Only response and have not yet gotten
2265   // the recv_trailing_metadata_ready callback, then defer propagating this
2266   // callback back to the surface.  We can evaluate whether to retry when
2267   // recv_trailing_metadata comes back.
2268   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2269                     error != GRPC_ERROR_NONE) &&
2270                    !retry_state->completed_recv_trailing_metadata)) {
2271     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2272       gpr_log(GPR_INFO,
2273               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2274               "(Trailers-Only)",
2275               chand, calld);
2276     }
2277     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2278     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2279     if (!retry_state->started_recv_trailing_metadata) {
2280       // recv_trailing_metadata not yet started by application; start it
2281       // ourselves to get status.
2282       calld->StartInternalRecvTrailingMetadata(elem);
2283     } else {
2284       GRPC_CALL_COMBINER_STOP(
2285           calld->call_combiner_,
2286           "recv_initial_metadata_ready trailers-only or error");
2287     }
2288     return;
2289   }
2290   // Received valid initial metadata, so commit the call.
2291   calld->RetryCommit(elem, retry_state);
2292   // Invoke the callback to return the result to the surface.
2293   // Manually invoking a callback function; it does not take ownership of error.
2294   calld->InvokeRecvInitialMetadataCallback(batch_data, error);
2295 }
2296
2297 //
2298 // recv_message callback handling
2299 //
2300
2301 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2302   SubchannelCallBatchData* batch_data =
2303       static_cast<SubchannelCallBatchData*>(arg);
2304   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2305   // Find pending op.
2306   PendingBatch* pending = calld->PendingBatchFind(
2307       batch_data->elem, "invoking recv_message_ready for",
2308       [](grpc_transport_stream_op_batch* batch) {
2309         return batch->recv_message &&
2310                batch->payload->recv_message.recv_message_ready != nullptr;
2311       });
2312   GPR_ASSERT(pending != nullptr);
2313   // Return payload.
2314   SubchannelCallRetryState* retry_state =
2315       static_cast<SubchannelCallRetryState*>(
2316           batch_data->subchannel_call->GetParentData());
2317   *pending->batch->payload->recv_message.recv_message =
2318       std::move(retry_state->recv_message);
2319   // Update bookkeeping.
2320   // Note: Need to do this before invoking the callback, since invoking
2321   // the callback will result in yielding the call combiner.
2322   grpc_closure* recv_message_ready =
2323       pending->batch->payload->recv_message.recv_message_ready;
2324   pending->batch->payload->recv_message.recv_message_ready = nullptr;
2325   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2326   batch_data->Unref();
2327   // Invoke callback.
2328   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2329 }
2330
2331 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2332   SubchannelCallBatchData* batch_data =
2333       static_cast<SubchannelCallBatchData*>(arg);
2334   grpc_call_element* elem = batch_data->elem;
2335   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2336   CallData* calld = static_cast<CallData*>(elem->call_data);
2337   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2338     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2339             chand, calld, grpc_error_string(error));
2340   }
2341   SubchannelCallRetryState* retry_state =
2342       static_cast<SubchannelCallRetryState*>(
2343           batch_data->subchannel_call->GetParentData());
2344   ++retry_state->completed_recv_message_count;
2345   // If a retry was already dispatched, then we're not going to use the
2346   // result of this recv_message op, so do nothing.
2347   if (retry_state->retry_dispatched) {
2348     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2349                             "recv_message_ready after retry dispatched");
2350     return;
2351   }
2352   // If we got an error or the payload was nullptr and we have not yet gotten
2353   // the recv_trailing_metadata_ready callback, then defer propagating this
2354   // callback back to the surface.  We can evaluate whether to retry when
2355   // recv_trailing_metadata comes back.
2356   if (GPR_UNLIKELY(
2357           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2358           !retry_state->completed_recv_trailing_metadata)) {
2359     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2360       gpr_log(GPR_INFO,
2361               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2362               "message and recv_trailing_metadata pending)",
2363               chand, calld);
2364     }
2365     retry_state->recv_message_ready_deferred_batch = batch_data;
2366     retry_state->recv_message_error = GRPC_ERROR_REF(error);
2367     if (!retry_state->started_recv_trailing_metadata) {
2368       // recv_trailing_metadata not yet started by application; start it
2369       // ourselves to get status.
2370       calld->StartInternalRecvTrailingMetadata(elem);
2371     } else {
2372       GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2373     }
2374     return;
2375   }
2376   // Received a valid message, so commit the call.
2377   calld->RetryCommit(elem, retry_state);
2378   // Invoke the callback to return the result to the surface.
2379   // Manually invoking a callback function; it does not take ownership of error.
2380   calld->InvokeRecvMessageCallback(batch_data, error);
2381 }
2382
2383 //
2384 // recv_trailing_metadata handling
2385 //
2386
2387 void CallData::GetCallStatus(grpc_call_element* elem,
2388                              grpc_metadata_batch* md_batch, grpc_error* error,
2389                              grpc_status_code* status,
2390                              grpc_mdelem** server_pushback_md) {
2391   if (error != GRPC_ERROR_NONE) {
2392     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2393   } else {
2394     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2395     *status =
2396         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2397     if (server_pushback_md != nullptr &&
2398         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2399       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2400     }
2401   }
2402   GRPC_ERROR_UNREF(error);
2403 }
2404
2405 void CallData::AddClosureForRecvTrailingMetadataReady(
2406     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2407     grpc_error* error, CallCombinerClosureList* closures) {
2408   // Find pending batch.
2409   PendingBatch* pending = PendingBatchFind(
2410       elem, "invoking recv_trailing_metadata for",
2411       [](grpc_transport_stream_op_batch* batch) {
2412         return batch->recv_trailing_metadata &&
2413                batch->payload->recv_trailing_metadata
2414                        .recv_trailing_metadata_ready != nullptr;
2415       });
2416   // If we generated the recv_trailing_metadata op internally via
2417   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2418   if (pending == nullptr) {
2419     GRPC_ERROR_UNREF(error);
2420     return;
2421   }
2422   // Return metadata.
2423   SubchannelCallRetryState* retry_state =
2424       static_cast<SubchannelCallRetryState*>(
2425           batch_data->subchannel_call->GetParentData());
2426   grpc_metadata_batch_move(
2427       &retry_state->recv_trailing_metadata,
2428       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2429   // Add closure.
2430   closures->Add(pending->batch->payload->recv_trailing_metadata
2431                     .recv_trailing_metadata_ready,
2432                 error, "recv_trailing_metadata_ready for pending batch");
2433   // Update bookkeeping.
2434   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2435       nullptr;
2436   MaybeClearPendingBatch(elem, pending);
2437 }
2438
2439 void CallData::AddClosuresForDeferredRecvCallbacks(
2440     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2441     CallCombinerClosureList* closures) {
2442   if (batch_data->batch.recv_trailing_metadata) {
2443     // Add closure for deferred recv_initial_metadata_ready.
2444     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2445                      nullptr)) {
2446       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2447                         InvokeRecvInitialMetadataCallback,
2448                         retry_state->recv_initial_metadata_ready_deferred_batch,
2449                         grpc_schedule_on_exec_ctx);
2450       closures->Add(&retry_state->recv_initial_metadata_ready,
2451                     retry_state->recv_initial_metadata_error,
2452                     "resuming recv_initial_metadata_ready");
2453       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2454     }
2455     // Add closure for deferred recv_message_ready.
2456     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2457                      nullptr)) {
2458       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2459                         InvokeRecvMessageCallback,
2460                         retry_state->recv_message_ready_deferred_batch,
2461                         grpc_schedule_on_exec_ctx);
2462       closures->Add(&retry_state->recv_message_ready,
2463                     retry_state->recv_message_error,
2464                     "resuming recv_message_ready");
2465       retry_state->recv_message_ready_deferred_batch = nullptr;
2466     }
2467   }
2468 }
2469
2470 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2471                                        SubchannelCallRetryState* retry_state) {
2472   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2473     return false;
2474   }
2475   if (pending->batch->send_initial_metadata &&
2476       !retry_state->started_send_initial_metadata) {
2477     return true;
2478   }
2479   if (pending->batch->send_message &&
2480       retry_state->started_send_message_count < send_messages_.size()) {
2481     return true;
2482   }
2483   if (pending->batch->send_trailing_metadata &&
2484       !retry_state->started_send_trailing_metadata) {
2485     return true;
2486   }
2487   return false;
2488 }
2489
2490 void CallData::AddClosuresToFailUnstartedPendingBatches(
2491     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2492     grpc_error* error, CallCombinerClosureList* closures) {
2493   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2494   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2495     PendingBatch* pending = &pending_batches_[i];
2496     if (PendingBatchIsUnstarted(pending, retry_state)) {
2497       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2498         gpr_log(GPR_INFO,
2499                 "chand=%p calld=%p: failing unstarted pending batch at index "
2500                 "%" PRIuPTR,
2501                 chand, this, i);
2502       }
2503       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2504                     "failing on_complete for pending batch");
2505       pending->batch->on_complete = nullptr;
2506       MaybeClearPendingBatch(elem, pending);
2507     }
2508   }
2509   GRPC_ERROR_UNREF(error);
2510 }
2511
2512 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2513                                            grpc_error* error) {
2514   grpc_call_element* elem = batch_data->elem;
2515   SubchannelCallRetryState* retry_state =
2516       static_cast<SubchannelCallRetryState*>(
2517           batch_data->subchannel_call->GetParentData());
2518   // Construct list of closures to execute.
2519   CallCombinerClosureList closures;
2520   // First, add closure for recv_trailing_metadata_ready.
2521   AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2522                                          GRPC_ERROR_REF(error), &closures);
2523   // If there are deferred recv_initial_metadata_ready or recv_message_ready
2524   // callbacks, add them to closures.
2525   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2526   // Add closures to fail any pending batches that have not yet been started.
2527   AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2528                                            GRPC_ERROR_REF(error), &closures);
2529   // Don't need batch_data anymore.
2530   batch_data->Unref();
2531   // Schedule all of the closures identified above.
2532   // Note: This will release the call combiner.
2533   closures.RunClosures(call_combiner_);
2534   GRPC_ERROR_UNREF(error);
2535 }
2536
2537 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
2538   SubchannelCallBatchData* batch_data =
2539       static_cast<SubchannelCallBatchData*>(arg);
2540   grpc_call_element* elem = batch_data->elem;
2541   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2542   CallData* calld = static_cast<CallData*>(elem->call_data);
2543   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2544     gpr_log(GPR_INFO,
2545             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
2546             chand, calld, grpc_error_string(error));
2547   }
2548   SubchannelCallRetryState* retry_state =
2549       static_cast<SubchannelCallRetryState*>(
2550           batch_data->subchannel_call->GetParentData());
2551   retry_state->completed_recv_trailing_metadata = true;
2552   // Get the call's status and check for server pushback metadata.
2553   grpc_status_code status = GRPC_STATUS_OK;
2554   grpc_mdelem* server_pushback_md = nullptr;
2555   grpc_metadata_batch* md_batch =
2556       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
2557   calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
2558                        &server_pushback_md);
2559   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2560     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2561             calld, grpc_status_code_to_string(status));
2562   }
2563   // Check if we should retry.
2564   if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
2565     // Unref batch_data for deferred recv_initial_metadata_ready or
2566     // recv_message_ready callbacks, if any.
2567     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
2568       batch_data->Unref();
2569       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2570     }
2571     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2572       batch_data->Unref();
2573       GRPC_ERROR_UNREF(retry_state->recv_message_error);
2574     }
2575     batch_data->Unref();
2576     return;
2577   }
2578   // Not retrying, so commit the call.
2579   calld->RetryCommit(elem, retry_state);
2580   // Run any necessary closures.
2581   calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
2582 }
2583
2584 //
2585 // on_complete callback handling
2586 //
2587
2588 void CallData::AddClosuresForCompletedPendingBatch(
2589     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2590     SubchannelCallRetryState* retry_state, grpc_error* error,
2591     CallCombinerClosureList* closures) {
2592   PendingBatch* pending = PendingBatchFind(
2593       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2594         // Match the pending batch with the same set of send ops as the
2595         // subchannel batch we've just completed.
2596         return batch->on_complete != nullptr &&
2597                batch_data->batch.send_initial_metadata ==
2598                    batch->send_initial_metadata &&
2599                batch_data->batch.send_message == batch->send_message &&
2600                batch_data->batch.send_trailing_metadata ==
2601                    batch->send_trailing_metadata;
2602       });
2603   // If batch_data is a replay batch, then there will be no pending
2604   // batch to complete.
2605   if (pending == nullptr) {
2606     GRPC_ERROR_UNREF(error);
2607     return;
2608   }
2609   // Add closure.
2610   closures->Add(pending->batch->on_complete, error,
2611                 "on_complete for pending batch");
2612   pending->batch->on_complete = nullptr;
2613   MaybeClearPendingBatch(elem, pending);
2614 }
2615
2616 void CallData::AddClosuresForReplayOrPendingSendOps(
2617     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2618     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
2619   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2620   bool have_pending_send_message_ops =
2621       retry_state->started_send_message_count < send_messages_.size();
2622   bool have_pending_send_trailing_metadata_op =
2623       seen_send_trailing_metadata_ &&
2624       !retry_state->started_send_trailing_metadata;
2625   if (!have_pending_send_message_ops &&
2626       !have_pending_send_trailing_metadata_op) {
2627     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2628       PendingBatch* pending = &pending_batches_[i];
2629       grpc_transport_stream_op_batch* batch = pending->batch;
2630       if (batch == nullptr || pending->send_ops_cached) continue;
2631       if (batch->send_message) have_pending_send_message_ops = true;
2632       if (batch->send_trailing_metadata) {
2633         have_pending_send_trailing_metadata_op = true;
2634       }
2635     }
2636   }
2637   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2638     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2639       gpr_log(GPR_INFO,
2640               "chand=%p calld=%p: starting next batch for pending send op(s)",
2641               chand, this);
2642     }
2643     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2644                       StartRetriableSubchannelBatches, elem,
2645                       grpc_schedule_on_exec_ctx);
2646     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2647                   "starting next batch for send_* op(s)");
2648   }
2649 }
2650
2651 void CallData::OnComplete(void* arg, grpc_error* error) {
2652   SubchannelCallBatchData* batch_data =
2653       static_cast<SubchannelCallBatchData*>(arg);
2654   grpc_call_element* elem = batch_data->elem;
2655   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2656   CallData* calld = static_cast<CallData*>(elem->call_data);
2657   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2658     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2659     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2660             chand, calld, grpc_error_string(error), batch_str);
2661     gpr_free(batch_str);
2662   }
2663   SubchannelCallRetryState* retry_state =
2664       static_cast<SubchannelCallRetryState*>(
2665           batch_data->subchannel_call->GetParentData());
2666   // Update bookkeeping in retry_state.
2667   if (batch_data->batch.send_initial_metadata) {
2668     retry_state->completed_send_initial_metadata = true;
2669   }
2670   if (batch_data->batch.send_message) {
2671     ++retry_state->completed_send_message_count;
2672   }
2673   if (batch_data->batch.send_trailing_metadata) {
2674     retry_state->completed_send_trailing_metadata = true;
2675   }
2676   // If the call is committed, free cached data for send ops that we've just
2677   // completed.
2678   if (calld->retry_committed_) {
2679     calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
2680   }
2681   // Construct list of closures to execute.
2682   CallCombinerClosureList closures;
2683   // If a retry was already dispatched, that means we saw
2684   // recv_trailing_metadata before this, so we do nothing here.
2685   // Otherwise, invoke the callback to return the result to the surface.
2686   if (!retry_state->retry_dispatched) {
2687     // Add closure for the completed pending batch, if any.
2688     calld->AddClosuresForCompletedPendingBatch(
2689         elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
2690     // If needed, add a callback to start any replay or pending send ops on
2691     // the subchannel call.
2692     if (!retry_state->completed_recv_trailing_metadata) {
2693       calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
2694                                                   &closures);
2695     }
2696   }
2697   // Track number of pending subchannel send batches and determine if this
2698   // was the last one.
2699   --calld->num_pending_retriable_subchannel_send_batches_;
2700   const bool last_send_batch_complete =
2701       calld->num_pending_retriable_subchannel_send_batches_ == 0;
2702   // Don't need batch_data anymore.
2703   batch_data->Unref();
2704   // Schedule all of the closures identified above.
2705   // Note: This yeilds the call combiner.
2706   closures.RunClosures(calld->call_combiner_);
2707   // If this was the last subchannel send batch, unref the call stack.
2708   if (last_send_batch_complete) {
2709     GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
2710   }
2711 }
2712
2713 //
2714 // subchannel batch construction
2715 //
2716
2717 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2718   grpc_transport_stream_op_batch* batch =
2719       static_cast<grpc_transport_stream_op_batch*>(arg);
2720   SubchannelCall* subchannel_call =
2721       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2722   // Note: This will release the call combiner.
2723   subchannel_call->StartTransportStreamOpBatch(batch);
2724 }
2725
2726 void CallData::AddClosureForSubchannelBatch(
2727     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2728     CallCombinerClosureList* closures) {
2729   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2730   batch->handler_private.extra_arg = subchannel_call_.get();
2731   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
2732                     batch, grpc_schedule_on_exec_ctx);
2733   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2734     char* batch_str = grpc_transport_stream_op_batch_string(batch);
2735     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2736             this, batch_str);
2737     gpr_free(batch_str);
2738   }
2739   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2740                 "start_subchannel_batch");
2741 }
2742
2743 void CallData::AddRetriableSendInitialMetadataOp(
2744     SubchannelCallRetryState* retry_state,
2745     SubchannelCallBatchData* batch_data) {
2746   // Maps the number of retries to the corresponding metadata value slice.
2747   static const grpc_slice* retry_count_strings[] = {
2748       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2749   // We need to make a copy of the metadata batch for each attempt, since
2750   // the filters in the subchannel stack may modify this batch, and we don't
2751   // want those modifications to be passed forward to subsequent attempts.
2752   //
2753   // If we've already completed one or more attempts, add the
2754   // grpc-retry-attempts header.
2755   retry_state->send_initial_metadata_storage =
2756       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2757           sizeof(grpc_linked_mdelem) *
2758           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
2759   grpc_metadata_batch_copy(&send_initial_metadata_,
2760                            &retry_state->send_initial_metadata,
2761                            retry_state->send_initial_metadata_storage);
2762   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2763                        .grpc_previous_rpc_attempts != nullptr)) {
2764     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2765                                retry_state->send_initial_metadata.idx.named
2766                                    .grpc_previous_rpc_attempts);
2767   }
2768   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
2769     grpc_mdelem retry_md = grpc_mdelem_create(
2770         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2771         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
2772     grpc_error* error = grpc_metadata_batch_add_tail(
2773         &retry_state->send_initial_metadata,
2774         &retry_state
2775              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
2776         retry_md);
2777     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2778       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2779               grpc_error_string(error));
2780       GPR_ASSERT(false);
2781     }
2782   }
2783   retry_state->started_send_initial_metadata = true;
2784   batch_data->batch.send_initial_metadata = true;
2785   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2786       &retry_state->send_initial_metadata;
2787   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2788       send_initial_metadata_flags_;
2789   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
2790 }
2791
2792 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2793                                          SubchannelCallRetryState* retry_state,
2794                                          SubchannelCallBatchData* batch_data) {
2795   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2796   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2797     gpr_log(GPR_INFO,
2798             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2799             chand, this, retry_state->started_send_message_count);
2800   }
2801   ByteStreamCache* cache =
2802       send_messages_[retry_state->started_send_message_count];
2803   ++retry_state->started_send_message_count;
2804   retry_state->send_message.Init(cache);
2805   batch_data->batch.send_message = true;
2806   batch_data->batch.payload->send_message.send_message.reset(
2807       retry_state->send_message.get());
2808 }
2809
2810 void CallData::AddRetriableSendTrailingMetadataOp(
2811     SubchannelCallRetryState* retry_state,
2812     SubchannelCallBatchData* batch_data) {
2813   // We need to make a copy of the metadata batch for each attempt, since
2814   // the filters in the subchannel stack may modify this batch, and we don't
2815   // want those modifications to be passed forward to subsequent attempts.
2816   retry_state->send_trailing_metadata_storage =
2817       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2818           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
2819   grpc_metadata_batch_copy(&send_trailing_metadata_,
2820                            &retry_state->send_trailing_metadata,
2821                            retry_state->send_trailing_metadata_storage);
2822   retry_state->started_send_trailing_metadata = true;
2823   batch_data->batch.send_trailing_metadata = true;
2824   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2825       &retry_state->send_trailing_metadata;
2826 }
2827
2828 void CallData::AddRetriableRecvInitialMetadataOp(
2829     SubchannelCallRetryState* retry_state,
2830     SubchannelCallBatchData* batch_data) {
2831   retry_state->started_recv_initial_metadata = true;
2832   batch_data->batch.recv_initial_metadata = true;
2833   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2834   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2835       &retry_state->recv_initial_metadata;
2836   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2837       &retry_state->trailing_metadata_available;
2838   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2839                     RecvInitialMetadataReady, batch_data,
2840                     grpc_schedule_on_exec_ctx);
2841   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2842       &retry_state->recv_initial_metadata_ready;
2843 }
2844
2845 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
2846                                          SubchannelCallBatchData* batch_data) {
2847   ++retry_state->started_recv_message_count;
2848   batch_data->batch.recv_message = true;
2849   batch_data->batch.payload->recv_message.recv_message =
2850       &retry_state->recv_message;
2851   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
2852                     batch_data, grpc_schedule_on_exec_ctx);
2853   batch_data->batch.payload->recv_message.recv_message_ready =
2854       &retry_state->recv_message_ready;
2855 }
2856
2857 void CallData::AddRetriableRecvTrailingMetadataOp(
2858     SubchannelCallRetryState* retry_state,
2859     SubchannelCallBatchData* batch_data) {
2860   retry_state->started_recv_trailing_metadata = true;
2861   batch_data->batch.recv_trailing_metadata = true;
2862   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2863   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2864       &retry_state->recv_trailing_metadata;
2865   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2866       &retry_state->collect_stats;
2867   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2868                     RecvTrailingMetadataReady, batch_data,
2869                     grpc_schedule_on_exec_ctx);
2870   batch_data->batch.payload->recv_trailing_metadata
2871       .recv_trailing_metadata_ready =
2872       &retry_state->recv_trailing_metadata_ready;
2873   MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
2874       pick_.pick, &batch_data->batch);
2875 }
2876
2877 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
2878   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2879   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2880     gpr_log(GPR_INFO,
2881             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2882             "started; starting it internally",
2883             chand, this);
2884   }
2885   SubchannelCallRetryState* retry_state =
2886       static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
2887   // Create batch_data with 2 refs, since this batch will be unreffed twice:
2888   // once for the recv_trailing_metadata_ready callback when the subchannel
2889   // batch returns, and again when we actually get a recv_trailing_metadata
2890   // op from the surface.
2891   SubchannelCallBatchData* batch_data =
2892       SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
2893   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
2894   retry_state->recv_trailing_metadata_internal_batch = batch_data;
2895   // Note: This will release the call combiner.
2896   subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
2897 }
2898
2899 // If there are any cached send ops that need to be replayed on the
2900 // current subchannel call, creates and returns a new subchannel batch
2901 // to replay those ops.  Otherwise, returns nullptr.
2902 CallData::SubchannelCallBatchData*
2903 CallData::MaybeCreateSubchannelBatchForReplay(
2904     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
2905   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2906   SubchannelCallBatchData* replay_batch_data = nullptr;
2907   // send_initial_metadata.
2908   if (seen_send_initial_metadata_ &&
2909       !retry_state->started_send_initial_metadata &&
2910       !pending_send_initial_metadata_) {
2911     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2912       gpr_log(GPR_INFO,
2913               "chand=%p calld=%p: replaying previously completed "
2914               "send_initial_metadata op",
2915               chand, this);
2916     }
2917     replay_batch_data =
2918         SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2919     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
2920   }
2921   // send_message.
2922   // Note that we can only have one send_message op in flight at a time.
2923   if (retry_state->started_send_message_count < send_messages_.size() &&
2924       retry_state->started_send_message_count ==
2925           retry_state->completed_send_message_count &&
2926       !pending_send_message_) {
2927     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2928       gpr_log(GPR_INFO,
2929               "chand=%p calld=%p: replaying previously completed "
2930               "send_message op",
2931               chand, this);
2932     }
2933     if (replay_batch_data == nullptr) {
2934       replay_batch_data =
2935           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2936     }
2937     AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
2938   }
2939   // send_trailing_metadata.
2940   // Note that we only add this op if we have no more send_message ops
2941   // to start, since we can't send down any more send_message ops after
2942   // send_trailing_metadata.
2943   if (seen_send_trailing_metadata_ &&
2944       retry_state->started_send_message_count == send_messages_.size() &&
2945       !retry_state->started_send_trailing_metadata &&
2946       !pending_send_trailing_metadata_) {
2947     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2948       gpr_log(GPR_INFO,
2949               "chand=%p calld=%p: replaying previously completed "
2950               "send_trailing_metadata op",
2951               chand, this);
2952     }
2953     if (replay_batch_data == nullptr) {
2954       replay_batch_data =
2955           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2956     }
2957     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
2958   }
2959   return replay_batch_data;
2960 }
2961
2962 void CallData::AddSubchannelBatchesForPendingBatches(
2963     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2964     CallCombinerClosureList* closures) {
2965   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2966     PendingBatch* pending = &pending_batches_[i];
2967     grpc_transport_stream_op_batch* batch = pending->batch;
2968     if (batch == nullptr) continue;
2969     // Skip any batch that either (a) has already been started on this
2970     // subchannel call or (b) we can't start yet because we're still
2971     // replaying send ops that need to be completed first.
2972     // TODO(roth): Note that if any one op in the batch can't be sent
2973     // yet due to ops that we're replaying, we don't start any of the ops
2974     // in the batch.  This is probably okay, but it could conceivably
2975     // lead to increased latency in some cases -- e.g., we could delay
2976     // starting a recv op due to it being in the same batch with a send
2977     // op.  If/when we revamp the callback protocol in
2978     // transport_stream_op_batch, we may be able to fix this.
2979     if (batch->send_initial_metadata &&
2980         retry_state->started_send_initial_metadata) {
2981       continue;
2982     }
2983     if (batch->send_message && retry_state->completed_send_message_count <
2984                                    retry_state->started_send_message_count) {
2985       continue;
2986     }
2987     // Note that we only start send_trailing_metadata if we have no more
2988     // send_message ops to start, since we can't send down any more
2989     // send_message ops after send_trailing_metadata.
2990     if (batch->send_trailing_metadata &&
2991         (retry_state->started_send_message_count + batch->send_message <
2992              send_messages_.size() ||
2993          retry_state->started_send_trailing_metadata)) {
2994       continue;
2995     }
2996     if (batch->recv_initial_metadata &&
2997         retry_state->started_recv_initial_metadata) {
2998       continue;
2999     }
3000     if (batch->recv_message && retry_state->completed_recv_message_count <
3001                                    retry_state->started_recv_message_count) {
3002       continue;
3003     }
3004     if (batch->recv_trailing_metadata &&
3005         retry_state->started_recv_trailing_metadata) {
3006       // If we previously completed a recv_trailing_metadata op
3007       // initiated by StartInternalRecvTrailingMetadata(), use the
3008       // result of that instead of trying to re-start this op.
3009       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3010                         nullptr))) {
3011         // If the batch completed, then trigger the completion callback
3012         // directly, so that we return the previously returned results to
3013         // the application.  Otherwise, just unref the internally
3014         // started subchannel batch, since we'll propagate the
3015         // completion when it completes.
3016         if (retry_state->completed_recv_trailing_metadata) {
3017           // Batches containing recv_trailing_metadata always succeed.
3018           closures->Add(
3019               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3020               "re-executing recv_trailing_metadata_ready to propagate "
3021               "internally triggered result");
3022         } else {
3023           retry_state->recv_trailing_metadata_internal_batch->Unref();
3024         }
3025         retry_state->recv_trailing_metadata_internal_batch = nullptr;
3026       }
3027       continue;
3028     }
3029     // If we're not retrying, just send the batch as-is.
3030     if (method_params_ == nullptr ||
3031         method_params_->retry_policy() == nullptr || retry_committed_) {
3032       // TODO(roth) : We should probably call
3033       // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3034       AddClosureForSubchannelBatch(elem, batch, closures);
3035       PendingBatchClear(pending);
3036       continue;
3037     }
3038     // Create batch with the right number of callbacks.
3039     const bool has_send_ops = batch->send_initial_metadata ||
3040                               batch->send_message ||
3041                               batch->send_trailing_metadata;
3042     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3043                               batch->recv_message +
3044                               batch->recv_trailing_metadata;
3045     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3046         elem, num_callbacks, has_send_ops /* set_on_complete */);
3047     // Cache send ops if needed.
3048     MaybeCacheSendOpsForBatch(pending);
3049     // send_initial_metadata.
3050     if (batch->send_initial_metadata) {
3051       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3052     }
3053     // send_message.
3054     if (batch->send_message) {
3055       AddRetriableSendMessageOp(elem, retry_state, batch_data);
3056     }
3057     // send_trailing_metadata.
3058     if (batch->send_trailing_metadata) {
3059       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3060     }
3061     // recv_initial_metadata.
3062     if (batch->recv_initial_metadata) {
3063       // recv_flags is only used on the server side.
3064       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3065       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3066     }
3067     // recv_message.
3068     if (batch->recv_message) {
3069       AddRetriableRecvMessageOp(retry_state, batch_data);
3070     }
3071     // recv_trailing_metadata.
3072     if (batch->recv_trailing_metadata) {
3073       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3074     }
3075     AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3076     // Track number of pending subchannel send batches.
3077     // If this is the first one, take a ref to the call stack.
3078     if (batch->send_initial_metadata || batch->send_message ||
3079         batch->send_trailing_metadata) {
3080       if (num_pending_retriable_subchannel_send_batches_ == 0) {
3081         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3082       }
3083       ++num_pending_retriable_subchannel_send_batches_;
3084     }
3085   }
3086 }
3087
3088 void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
3089   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3090   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3091   CallData* calld = static_cast<CallData*>(elem->call_data);
3092   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3093     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
3094             chand, calld);
3095   }
3096   SubchannelCallRetryState* retry_state =
3097       static_cast<SubchannelCallRetryState*>(
3098           calld->subchannel_call_->GetParentData());
3099   // Construct list of closures to execute, one for each pending batch.
3100   CallCombinerClosureList closures;
3101   // Replay previously-returned send_* ops if needed.
3102   SubchannelCallBatchData* replay_batch_data =
3103       calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
3104   if (replay_batch_data != nullptr) {
3105     calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
3106                                         &closures);
3107     // Track number of pending subchannel send batches.
3108     // If this is the first one, take a ref to the call stack.
3109     if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
3110       GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
3111     }
3112     ++calld->num_pending_retriable_subchannel_send_batches_;
3113   }
3114   // Now add pending batches.
3115   calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
3116   // Start batches on subchannel call.
3117   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3118     gpr_log(GPR_INFO,
3119             "chand=%p calld=%p: starting %" PRIuPTR
3120             " retriable batches on subchannel_call=%p",
3121             chand, calld, closures.size(), calld->subchannel_call_.get());
3122   }
3123   // Note: This will yield the call combiner.
3124   closures.RunClosures(calld->call_combiner_);
3125 }
3126
3127 //
3128 // LB pick
3129 //
3130
3131 void CallData::CreateSubchannelCall(grpc_call_element* elem) {
3132   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3133   const size_t parent_data_size =
3134       enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
3135   const ConnectedSubchannel::CallArgs call_args = {
3136       pollent_, path_, call_start_time_, deadline_, arena_,
3137       // TODO(roth): When we implement hedging support, we will probably
3138       // need to use a separate call context for each subchannel call.
3139       call_context_, call_combiner_, parent_data_size};
3140   grpc_error* error = GRPC_ERROR_NONE;
3141   subchannel_call_ =
3142       pick_.pick.connected_subchannel->CreateCall(call_args, &error);
3143   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3144     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
3145             chand, this, subchannel_call_.get(), grpc_error_string(error));
3146   }
3147   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3148     PendingBatchesFail(elem, error, YieldCallCombiner);
3149   } else {
3150     if (parent_data_size > 0) {
3151       new (subchannel_call_->GetParentData())
3152           SubchannelCallRetryState(call_context_);
3153     }
3154     PendingBatchesResume(elem);
3155   }
3156 }
3157
3158 void CallData::PickDone(void* arg, grpc_error* error) {
3159   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3160   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3161   CallData* calld = static_cast<CallData*>(elem->call_data);
3162   if (error != GRPC_ERROR_NONE) {
3163     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3164       gpr_log(GPR_INFO,
3165               "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3166               calld, grpc_error_string(error));
3167     }
3168     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3169     return;
3170   }
3171   calld->CreateSubchannelCall(elem);
3172 }
3173
3174 // A class to handle the call combiner cancellation callback for a
3175 // queued pick.
3176 class CallData::QueuedPickCanceller {
3177  public:
3178   explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
3179     auto* calld = static_cast<CallData*>(elem->call_data);
3180     auto* chand = static_cast<ChannelData*>(elem->channel_data);
3181     GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
3182     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
3183                       grpc_combiner_scheduler(chand->data_plane_combiner()));
3184     calld->call_combiner_->SetNotifyOnCancel(&closure_);
3185   }
3186
3187  private:
3188   static void CancelLocked(void* arg, grpc_error* error) {
3189     auto* self = static_cast<QueuedPickCanceller*>(arg);
3190     auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
3191     auto* calld = static_cast<CallData*>(self->elem_->call_data);
3192     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3193       gpr_log(GPR_INFO,
3194               "chand=%p calld=%p: cancelling queued pick: "
3195               "error=%s self=%p calld->pick_canceller=%p",
3196               chand, calld, grpc_error_string(error), self,
3197               calld->pick_canceller_);
3198     }
3199     if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
3200       // Remove pick from list of queued picks.
3201       calld->RemoveCallFromQueuedPicksLocked(self->elem_);
3202       // Fail pending batches on the call.
3203       calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
3204                                 YieldCallCombinerIfPendingBatchesFound);
3205     }
3206     GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
3207     Delete(self);
3208   }
3209
3210   grpc_call_element* elem_;
3211   grpc_closure closure_;
3212 };
3213
3214 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3215   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3216   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3217     gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3218             chand, this);
3219   }
3220   chand->RemoveQueuedPick(&pick_, pollent_);
3221   pick_queued_ = false;
3222   // Lame the call combiner canceller.
3223   pick_canceller_ = nullptr;
3224 }
3225
3226 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3227   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3228   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3229     gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3230             this);
3231   }
3232   pick_queued_ = true;
3233   pick_.elem = elem;
3234   chand->AddQueuedPick(&pick_, pollent_);
3235   // Register call combiner cancellation callback.
3236   pick_canceller_ = New<QueuedPickCanceller>(elem);
3237 }
3238
3239 void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3240   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3241   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3242     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
3243             chand, this);
3244   }
3245   // Store a ref to the service_config in service_config_call_data_. Also, save
3246   // a pointer to this in the call_context so that all future filters can access
3247   // it.
3248   service_config_call_data_ =
3249       ServiceConfig::CallData(chand->service_config(), path_);
3250   if (service_config_call_data_.service_config() != nullptr) {
3251     call_context_[GRPC_SERVICE_CONFIG_CALL_DATA].value =
3252         &service_config_call_data_;
3253     method_params_ = static_cast<ClientChannelMethodParsedObject*>(
3254         service_config_call_data_.GetMethodParsedObject(
3255             internal::ClientChannelServiceConfigParser::ParserIndex()));
3256   }
3257   retry_throttle_data_ = chand->retry_throttle_data();
3258   if (method_params_ != nullptr) {
3259     // If the deadline from the service config is shorter than the one
3260     // from the client API, reset the deadline timer.
3261     if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
3262       const grpc_millis per_method_deadline =
3263           grpc_timespec_to_millis_round_up(call_start_time_) +
3264           method_params_->timeout();
3265       if (per_method_deadline < deadline_) {
3266         deadline_ = per_method_deadline;
3267         grpc_deadline_state_reset(elem, deadline_);
3268       }
3269     }
3270     // If the service config set wait_for_ready and the application
3271     // did not explicitly set it, use the value from the service config.
3272     uint32_t* send_initial_metadata_flags =
3273         &pending_batches_[0]
3274              .batch->payload->send_initial_metadata.send_initial_metadata_flags;
3275     if (method_params_->wait_for_ready().has_value() &&
3276         !(*send_initial_metadata_flags &
3277           GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3278       if (method_params_->wait_for_ready().value()) {
3279         *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3280       } else {
3281         *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3282       }
3283     }
3284   }
3285   // If no retry policy, disable retries.
3286   // TODO(roth): Remove this when adding support for transparent retries.
3287   if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
3288     enable_retries_ = false;
3289   }
3290 }
3291
3292 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3293   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3294   // Apply service config data to the call only once, and only if the
3295   // channel has the data available.
3296   if (GPR_LIKELY(chand->received_service_config_data() &&
3297                  !service_config_applied_)) {
3298     service_config_applied_ = true;
3299     ApplyServiceConfigToCallLocked(elem);
3300   }
3301 }
3302
3303 const char* PickResultName(LoadBalancingPolicy::PickResult result) {
3304   switch (result) {
3305     case LoadBalancingPolicy::PICK_COMPLETE:
3306       return "COMPLETE";
3307     case LoadBalancingPolicy::PICK_QUEUE:
3308       return "QUEUE";
3309     case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
3310       return "TRANSIENT_FAILURE";
3311   }
3312   GPR_UNREACHABLE_CODE(return "UNKNOWN");
3313 }
3314
3315 void CallData::StartPickLocked(void* arg, grpc_error* error) {
3316   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3317   CallData* calld = static_cast<CallData*>(elem->call_data);
3318   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3319   GPR_ASSERT(calld->pick_.pick.connected_subchannel == nullptr);
3320   GPR_ASSERT(calld->subchannel_call_ == nullptr);
3321   // If this is a retry, use the send_initial_metadata payload that
3322   // we've cached; otherwise, use the pending batch.  The
3323   // send_initial_metadata batch will be the first pending batch in the
3324   // list, as set by GetBatchIndex() above.
3325   // TODO(roth): What if the LB policy needs to add something to the
3326   // call's initial metadata, and then there's a retry?  We don't want
3327   // the new metadata to be added twice.  We might need to somehow
3328   // allocate the subchannel batch earlier so that we can give the
3329   // subchannel's copy of the metadata batch (which is copied for each
3330   // attempt) to the LB policy instead the one from the parent channel.
3331   calld->pick_.pick.initial_metadata =
3332       calld->seen_send_initial_metadata_
3333           ? &calld->send_initial_metadata_
3334           : calld->pending_batches_[0]
3335                 .batch->payload->send_initial_metadata.send_initial_metadata;
3336   uint32_t* send_initial_metadata_flags =
3337       calld->seen_send_initial_metadata_
3338           ? &calld->send_initial_metadata_flags_
3339           : &calld->pending_batches_[0]
3340                  .batch->payload->send_initial_metadata
3341                  .send_initial_metadata_flags;
3342   // Apply service config to call if needed.
3343   calld->MaybeApplyServiceConfigToCallLocked(elem);
3344   // When done, we schedule this closure to leave the data plane combiner.
3345   GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
3346                     grpc_schedule_on_exec_ctx);
3347   // Attempt pick.
3348   error = GRPC_ERROR_NONE;
3349   auto pick_result = chand->picker()->Pick(&calld->pick_.pick, &error);
3350   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3351     gpr_log(GPR_INFO,
3352             "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
3353             "error=%s)",
3354             chand, calld, PickResultName(pick_result),
3355             calld->pick_.pick.connected_subchannel.get(),
3356             grpc_error_string(error));
3357   }
3358   switch (pick_result) {
3359     case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
3360       // If we're shutting down, fail all RPCs.
3361       grpc_error* disconnect_error = chand->disconnect_error();
3362       if (disconnect_error != GRPC_ERROR_NONE) {
3363         GRPC_ERROR_UNREF(error);
3364         GRPC_CLOSURE_SCHED(&calld->pick_closure_,
3365                            GRPC_ERROR_REF(disconnect_error));
3366         break;
3367       }
3368       // If wait_for_ready is false, then the error indicates the RPC
3369       // attempt's final status.
3370       if ((*send_initial_metadata_flags &
3371            GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3372         // Retry if appropriate; otherwise, fail.
3373         grpc_status_code status = GRPC_STATUS_OK;
3374         grpc_error_get_status(error, calld->deadline_, &status, nullptr,
3375                               nullptr, nullptr);
3376         if (!calld->enable_retries_ ||
3377             !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
3378                                nullptr /* server_pushback_md */)) {
3379           grpc_error* new_error =
3380               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3381                   "Failed to pick subchannel", &error, 1);
3382           GRPC_ERROR_UNREF(error);
3383           GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
3384         }
3385         if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3386         break;
3387       }
3388       // If wait_for_ready is true, then queue to retry when we get a new
3389       // picker.
3390       GRPC_ERROR_UNREF(error);
3391     }
3392     // Fallthrough
3393     case LoadBalancingPolicy::PICK_QUEUE:
3394       if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
3395       break;
3396     default:  // PICK_COMPLETE
3397       // Handle drops.
3398       if (GPR_UNLIKELY(calld->pick_.pick.connected_subchannel == nullptr)) {
3399         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3400             "Call dropped by load balancing policy");
3401       }
3402       GRPC_CLOSURE_SCHED(&calld->pick_closure_, error);
3403       if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3404   }
3405 }
3406
3407 }  // namespace
3408 }  // namespace grpc_core
3409
3410 /*************************************************************************
3411  * EXPORTED SYMBOLS
3412  */
3413
3414 using grpc_core::CallData;
3415 using grpc_core::ChannelData;
3416
3417 const grpc_channel_filter grpc_client_channel_filter = {
3418     CallData::StartTransportStreamOpBatch,
3419     ChannelData::StartTransportOp,
3420     sizeof(CallData),
3421     CallData::Init,
3422     CallData::SetPollent,
3423     CallData::Destroy,
3424     sizeof(ChannelData),
3425     ChannelData::Init,
3426     ChannelData::Destroy,
3427     ChannelData::GetChannelInfo,
3428     "client-channel",
3429 };
3430
3431 void grpc_client_channel_set_channelz_node(
3432     grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
3433   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3434   chand->set_channelz_node(node);
3435 }
3436
3437 void grpc_client_channel_populate_child_refs(
3438     grpc_channel_element* elem,
3439     grpc_core::channelz::ChildRefsList* child_subchannels,
3440     grpc_core::channelz::ChildRefsList* child_channels) {
3441   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3442   chand->FillChildRefsForChannelz(child_subchannels, child_channels);
3443 }
3444
3445 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3446     grpc_channel_element* elem, int try_to_connect) {
3447   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3448   return chand->CheckConnectivityState(try_to_connect);
3449 }
3450
3451 int grpc_client_channel_num_external_connectivity_watchers(
3452     grpc_channel_element* elem) {
3453   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3454   return chand->NumExternalConnectivityWatchers();
3455 }
3456
3457 void grpc_client_channel_watch_connectivity_state(
3458     grpc_channel_element* elem, grpc_polling_entity pollent,
3459     grpc_connectivity_state* state, grpc_closure* closure,
3460     grpc_closure* watcher_timer_init) {
3461   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3462   return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3463                                                watcher_timer_init);
3464 }
3465
3466 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
3467 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3468   auto* calld = static_cast<CallData*>(elem->call_data);
3469   return calld->subchannel_call();
3470 }