3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/combiner.h"
56 #include "src/core/lib/iomgr/iomgr.h"
57 #include "src/core/lib/iomgr/polling_entity.h"
58 #include "src/core/lib/profiling/timers.h"
59 #include "src/core/lib/slice/slice_internal.h"
60 #include "src/core/lib/slice/slice_string_helpers.h"
61 #include "src/core/lib/surface/channel.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/lib/transport/error_utils.h"
64 #include "src/core/lib/transport/metadata.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/lib/transport/static_metadata.h"
67 #include "src/core/lib/transport/status_metadata.h"
69 using grpc_core::internal::ClientChannelMethodParsedObject;
70 using grpc_core::internal::ServerRetryThrottleData;
73 // Client channel filter
76 // By default, we buffer 256 KiB per RPC for retries.
77 // TODO(roth): Do we have any data to suggest a better value?
78 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
80 // This value was picked arbitrarily. It can be changed if there is
81 // any even moderately compelling reason to do so.
82 #define RETRY_BACKOFF_JITTER 0.2
84 // Max number of batches that can be pending on a call at any given
85 // time. This includes one batch for each of the following ops:
86 // recv_initial_metadata
87 // send_initial_metadata
88 // recv_message
89 // send_message
90 // recv_trailing_metadata
91 // send_trailing_metadata
92 #define MAX_PENDING_BATCHES 6
96 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
97 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
102 // ChannelData definition
108 LoadBalancingPolicy::PickArgs pick;
109 grpc_call_element* elem;
110 QueuedPick* next = nullptr;
113 static grpc_error* Init(grpc_channel_element* elem,
114 grpc_channel_element_args* args);
115 static void Destroy(grpc_channel_element* elem);
116 static void StartTransportOp(grpc_channel_element* elem,
117 grpc_transport_op* op);
118 static void GetChannelInfo(grpc_channel_element* elem,
119 const grpc_channel_info* info);
121 void set_channelz_node(channelz::ClientChannelNode* node) {
122 channelz_node_ = node;
123 resolving_lb_policy_->set_channelz_node(node->Ref());
125 void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
126 channelz::ChildRefsList* child_channels) {
127 if (resolving_lb_policy_ != nullptr) {
128 resolving_lb_policy_->FillChildRefsForChannelz(child_subchannels,
// Returns the deadline_checking_enabled_ flag. The member is a const bool
// listed among the fields set at construction and never modified.
133 bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
// Returns the enable_retries_ flag. The member is a const bool listed among
// the fields set at construction and never modified.
134 bool enable_retries() const { return enable_retries_; }
135 size_t per_rpc_retry_buffer_size() const {
136 return per_rpc_retry_buffer_size_;
139 // Note: Does NOT return a new ref.
140 grpc_error* disconnect_error() const {
141 return disconnect_error_.Load(MemoryOrder::ACQUIRE);
// Returns data_plane_combiner_, the combiner that guards the data-plane
// fields (picker_, queued_picks_ and the service config data).
144 grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
146 LoadBalancingPolicy::SubchannelPicker* picker() const {
147 return picker_.get();
149 void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
150 void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
152 bool received_service_config_data() const {
153 return received_service_config_data_;
155 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
156 return retry_throttle_data_;
158 RefCountedPtr<ServiceConfig> service_config() const {
159 return service_config_;
162 grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
163 void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
164 grpc_connectivity_state* state,
165 grpc_closure* on_complete,
166 grpc_closure* watcher_timer_init) {
167 // Will delete itself.
168 New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
171 int NumExternalConnectivityWatchers() const {
172 return external_connectivity_watcher_list_.size();
176 class ConnectivityStateAndPickerSetter;
177 class ServiceConfigSetter;
178 class ClientChannelControlHelper;
180 class ExternalConnectivityWatcher {
// Initializes mu_, the mutex that the list operations lock before touching
// head_.
184 WatcherList() { gpr_mu_init(&mu_); }
// Destroys mu_. NOTE(review): does not walk or free the watchers still
// linked from head_ -- presumably each watcher removes itself; confirm.
185 ~WatcherList() { gpr_mu_destroy(&mu_); }
188 ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
189 void Add(ExternalConnectivityWatcher* watcher);
190 void Remove(const ExternalConnectivityWatcher* watcher);
193 // head_ is guarded by a mutex, since the size() method needs to
194 // iterate over the list, and it's called from the C-core API
195 // function grpc_channel_num_external_connectivity_watchers(), which
196 // is synchronous and therefore cannot run in the combiner.
198 ExternalConnectivityWatcher* head_ = nullptr;
201 ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
202 grpc_connectivity_state* state,
203 grpc_closure* on_complete,
204 grpc_closure* watcher_timer_init);
206 ~ExternalConnectivityWatcher();
209 static void OnWatchCompleteLocked(void* arg, grpc_error* error);
210 static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
213 grpc_polling_entity pollent_;
214 grpc_connectivity_state* state_;
215 grpc_closure* on_complete_;
216 grpc_closure* watcher_timer_init_;
217 grpc_closure my_closure_;
218 ExternalConnectivityWatcher* next_ = nullptr;
221 ChannelData(grpc_channel_element_args* args, grpc_error** error);
224 static bool ProcessResolverResultLocked(
225 void* arg, const Resolver::Result& result, const char** lb_policy_name,
226 RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
227 grpc_error** service_config_error);
229 grpc_error* DoPingLocked(grpc_transport_op* op);
231 static void StartTransportOpLocked(void* arg, grpc_error* ignored);
233 static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
235 void ProcessLbPolicy(
236 const Resolver::Result& resolver_result,
237 const internal::ClientChannelGlobalParsedObject* parsed_service_config,
238 UniquePtr<char>* lb_policy_name,
239 RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config);
242 // Fields set at construction and never modified.
244 const bool deadline_checking_enabled_;
245 const bool enable_retries_;
246 const size_t per_rpc_retry_buffer_size_;
247 grpc_channel_stack* owning_stack_;
248 ClientChannelFactory* client_channel_factory_;
249 UniquePtr<char> server_name_;
250 RefCountedPtr<ServiceConfig> default_service_config_;
251 // Initialized shortly after construction.
252 channelz::ClientChannelNode* channelz_node_ = nullptr;
255 // Fields used in the data plane. Guarded by data_plane_combiner.
257 grpc_combiner* data_plane_combiner_;
258 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
259 QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
260 // Data from service config.
261 bool received_service_config_data_ = false;
262 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
263 RefCountedPtr<ServiceConfig> service_config_;
266 // Fields used in the control plane. Guarded by combiner.
268 grpc_combiner* combiner_;
269 grpc_pollset_set* interested_parties_;
270 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
271 OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
272 grpc_connectivity_state_tracker state_tracker_;
273 ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
274 UniquePtr<char> health_check_service_name_;
275 RefCountedPtr<ServiceConfig> saved_service_config_;
276 bool received_first_resolver_result_ = false;
279 // Fields accessed from both data plane and control plane combiners.
281 Atomic<grpc_error*> disconnect_error_;
284 // Fields guarded by a mutex, since they need to be accessed
285 // synchronously via get_channel_info().
288 UniquePtr<char> info_lb_policy_name_;
289 UniquePtr<char> info_service_config_json_;
293 // CallData definition
298 static grpc_error* Init(grpc_call_element* elem,
299 const grpc_call_element_args* args);
300 static void Destroy(grpc_call_element* elem,
301 const grpc_call_final_info* final_info,
302 grpc_closure* then_schedule_closure);
303 static void StartTransportStreamOpBatch(
304 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
305 static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
// Returns subchannel_call_ by value, i.e. as a copy of the RefCountedPtr
// (presumably taking a new ref on the subchannel call -- confirm against
// RefCountedPtr's copy semantics).
307 RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
309 // Invoked by channel for queued picks once resolver results are available.
310 void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
312 // Invoked by channel for queued picks when the picker is updated.
313 static void StartPickLocked(void* arg, grpc_error* error);
316 class QueuedPickCanceller;
318 // State used for starting a retryable batch on a subchannel call.
319 // This provides its own grpc_transport_stream_op_batch and other data
320 // structures needed to populate the ops in the batch.
321 // We allocate one struct on the arena for each attempt at starting a
322 // batch on a given subchannel call.
323 struct SubchannelCallBatchData {
324 // Creates a SubchannelCallBatchData object on the call's arena with the
325 // specified refcount. If set_on_complete is true, the batch's
326 // on_complete callback will be set to point to on_complete();
327 // otherwise, the batch's on_complete callback will be null.
328 static SubchannelCallBatchData* Create(grpc_call_element* elem,
329 int refcount, bool set_on_complete);
332 if (gpr_unref(&refs)) Destroy();
335 SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
336 int refcount, bool set_on_complete);
337 // All teardown code must live in `Destroy()`, not here. Closures stored in
338 // `SubchannelCallBatchData` may still be invoked after the final `Unref()`,
339 // and msan would complain about accessing this object once its destructor
340 // has run. For that reason `Unref()` calls `Destroy()` rather than the dtor.
341 // TODO(soheil): We should try to call the dtor in `Unref()`.
342 ~SubchannelCallBatchData() { Destroy(); }
346 grpc_call_element* elem;
347 RefCountedPtr<SubchannelCall> subchannel_call;
348 // The batch to use in the subchannel call.
349 // Its payload field points to SubchannelCallRetryState::batch_payload.
350 grpc_transport_stream_op_batch batch;
351 // For intercepting on_complete.
352 grpc_closure on_complete;
355 // Retry state associated with a subchannel call.
356 // Stored in the parent_data of the subchannel call object.
357 struct SubchannelCallRetryState {
// Constructs batch_payload from the call context and clears every
// started_*/completed_* metadata flag as well as retry_dispatched.
// These flags are bit-fields declared without default member initializers,
// unlike the other members of this struct, so they are zeroed explicitly
// here (presumably because the targeted C++ standard does not permit
// initializers on bit-fields -- TODO confirm).
358 explicit SubchannelCallRetryState(grpc_call_context_element* context)
359 : batch_payload(context),
360 started_send_initial_metadata(false),
361 completed_send_initial_metadata(false),
362 started_send_trailing_metadata(false),
363 completed_send_trailing_metadata(false),
364 started_recv_initial_metadata(false),
365 completed_recv_initial_metadata(false),
366 started_recv_trailing_metadata(false),
367 completed_recv_trailing_metadata(false),
368 retry_dispatched(false) {}
370 // SubchannelCallBatchData.batch.payload points to this.
371 grpc_transport_stream_op_batch_payload batch_payload;
372 // For send_initial_metadata.
373 // Note that we need to make a copy of the initial metadata for each
374 // subchannel call instead of just referring to the copy in call_data,
375 // because filters in the subchannel stack will probably add entries,
376 // so we need to start in a pristine state for each attempt of the call.
377 grpc_linked_mdelem* send_initial_metadata_storage;
378 grpc_metadata_batch send_initial_metadata;
380 // TODO(roth): Restructure this to eliminate use of ManualConstructor.
381 ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
382 // For send_trailing_metadata.
383 grpc_linked_mdelem* send_trailing_metadata_storage;
384 grpc_metadata_batch send_trailing_metadata;
385 // For intercepting recv_initial_metadata.
386 grpc_metadata_batch recv_initial_metadata;
387 grpc_closure recv_initial_metadata_ready;
388 bool trailing_metadata_available = false;
389 // For intercepting recv_message.
390 grpc_closure recv_message_ready;
391 OrphanablePtr<ByteStream> recv_message;
392 // For intercepting recv_trailing_metadata.
393 grpc_metadata_batch recv_trailing_metadata;
394 grpc_transport_stream_stats collect_stats;
395 grpc_closure recv_trailing_metadata_ready;
396 // These fields indicate which ops have been started and completed on
397 // this subchannel call.
398 size_t started_send_message_count = 0;
399 size_t completed_send_message_count = 0;
400 size_t started_recv_message_count = 0;
401 size_t completed_recv_message_count = 0;
402 bool started_send_initial_metadata : 1;
403 bool completed_send_initial_metadata : 1;
404 bool started_send_trailing_metadata : 1;
405 bool completed_send_trailing_metadata : 1;
406 bool started_recv_initial_metadata : 1;
407 bool completed_recv_initial_metadata : 1;
408 bool started_recv_trailing_metadata : 1;
409 bool completed_recv_trailing_metadata : 1;
410 // State for callback processing.
411 SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
413 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
414 SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
415 grpc_error* recv_message_error = GRPC_ERROR_NONE;
416 SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
417 // NOTE: Do not move this next to the metadata bitfields above. That would
418 // save space but would also result in a data race, because the compiler
419 // will generate a 2-byte store which overwrites the metadata
420 // fields upon setting this field.
421 bool retry_dispatched : 1;
424 // Pending batches stored in call data.
425 struct PendingBatch {
426 // The pending batch. If nullptr, this slot is empty.
427 grpc_transport_stream_op_batch* batch;
428 // Indicates whether payload for send ops has been cached in CallData.
429 bool send_ops_cached;
432 CallData(grpc_call_element* elem, const ChannelData& chand,
433 const grpc_call_element_args& args);
436 // Caches data for send ops so that it can be retried later, if not
437 // already cached.
438 void MaybeCacheSendOpsForBatch(PendingBatch* pending);
439 void FreeCachedSendInitialMetadata(ChannelData* chand);
440 // Frees cached send_message at index idx.
441 void FreeCachedSendMessage(ChannelData* chand, size_t idx);
442 void FreeCachedSendTrailingMetadata(ChannelData* chand);
443 // Frees cached send ops that have already been completed after
444 // committing the call.
445 void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
446 SubchannelCallRetryState* retry_state);
447 // Frees cached send ops that were completed by the completed batch in
448 // batch_data. Used when batches are completed after the call is committed.
449 void FreeCachedSendOpDataForCompletedBatch(
450 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
451 SubchannelCallRetryState* retry_state);
453 static void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
454 const LoadBalancingPolicy::PickArgs& pick,
455 grpc_transport_stream_op_batch* batch);
457 // Returns the index into pending_batches_ to be used for batch.
458 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
459 void PendingBatchesAdd(grpc_call_element* elem,
460 grpc_transport_stream_op_batch* batch);
461 void PendingBatchClear(PendingBatch* pending);
462 void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
463 static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
464 // A predicate type and some useful implementations for PendingBatchesFail().
465 typedef bool (*YieldCallCombinerPredicate)(
466 const CallCombinerClosureList& closures);
467 static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
470 static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
473 static bool YieldCallCombinerIfPendingBatchesFound(
474 const CallCombinerClosureList& closures) {
475 return closures.size() > 0;
477 // Fails all pending batches.
478 // If yield_call_combiner_predicate returns true, assumes responsibility for
479 // yielding the call combiner.
480 void PendingBatchesFail(
481 grpc_call_element* elem, grpc_error* error,
482 YieldCallCombinerPredicate yield_call_combiner_predicate);
483 static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
484 // Resumes all pending batches on subchannel_call_.
485 void PendingBatchesResume(grpc_call_element* elem);
486 // Returns a pointer to the first pending batch for which predicate(batch)
487 // returns true, or null if not found.
488 template <typename Predicate>
489 PendingBatch* PendingBatchFind(grpc_call_element* elem,
490 const char* log_message, Predicate predicate);
492 // Commits the call so that no further retry attempts will be performed.
493 void RetryCommit(grpc_call_element* elem,
494 SubchannelCallRetryState* retry_state);
495 // Starts a retry after appropriate back-off.
496 void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
497 grpc_millis server_pushback_ms);
498 // Returns true if the call is being retried.
499 bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
500 grpc_status_code status, grpc_mdelem* server_pushback_md);
502 // Invokes recv_initial_metadata_ready for a subchannel batch.
503 static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
504 // Intercepts recv_initial_metadata_ready callback for retries.
505 // Commits the call and returns the initial metadata up the stack.
506 static void RecvInitialMetadataReady(void* arg, grpc_error* error);
508 // Invokes recv_message_ready for a subchannel batch.
509 static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
510 // Intercepts recv_message_ready callback for retries.
511 // Commits the call and returns the message up the stack.
512 static void RecvMessageReady(void* arg, grpc_error* error);
514 // Sets *status and *server_pushback_md based on md_batch and error.
515 // Only sets *server_pushback_md if server_pushback_md != nullptr.
516 void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
517 grpc_error* error, grpc_status_code* status,
518 grpc_mdelem** server_pushback_md);
519 // Adds recv_trailing_metadata_ready closure to closures.
520 void AddClosureForRecvTrailingMetadataReady(
521 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
522 grpc_error* error, CallCombinerClosureList* closures);
523 // Adds any necessary closures for deferred recv_initial_metadata and
524 // recv_message callbacks to closures.
525 static void AddClosuresForDeferredRecvCallbacks(
526 SubchannelCallBatchData* batch_data,
527 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
528 // Returns true if any op in the batch was not yet started.
529 // Only looks at send ops, since recv ops are always started immediately.
530 bool PendingBatchIsUnstarted(PendingBatch* pending,
531 SubchannelCallRetryState* retry_state);
532 // For any pending batch containing an op that has not yet been started,
533 // adds the pending batch's completion closures to closures.
534 void AddClosuresToFailUnstartedPendingBatches(
535 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
536 grpc_error* error, CallCombinerClosureList* closures);
537 // Runs necessary closures upon completion of a call attempt.
538 void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
540 // Intercepts recv_trailing_metadata_ready callback for retries.
541 // Commits the call and returns the trailing metadata up the stack.
542 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
544 // Adds the on_complete closure for the pending batch completed in
545 // batch_data to closures.
546 void AddClosuresForCompletedPendingBatch(
547 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
548 SubchannelCallRetryState* retry_state, grpc_error* error,
549 CallCombinerClosureList* closures);
551 // If there are any cached ops to replay or pending ops to start on the
552 // subchannel call, adds a closure to closures to invoke
553 // StartRetriableSubchannelBatches().
554 void AddClosuresForReplayOrPendingSendOps(
555 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
556 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
558 // Callback used to intercept on_complete from subchannel calls.
559 // Called only when retries are enabled.
560 static void OnComplete(void* arg, grpc_error* error);
562 static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
563 // Adds a closure to closures that will execute batch in the call combiner.
564 void AddClosureForSubchannelBatch(grpc_call_element* elem,
565 grpc_transport_stream_op_batch* batch,
566 CallCombinerClosureList* closures);
567 // Adds retriable send_initial_metadata op to batch_data.
568 void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
569 SubchannelCallBatchData* batch_data);
570 // Adds retriable send_message op to batch_data.
571 void AddRetriableSendMessageOp(grpc_call_element* elem,
572 SubchannelCallRetryState* retry_state,
573 SubchannelCallBatchData* batch_data);
574 // Adds retriable send_trailing_metadata op to batch_data.
575 void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
576 SubchannelCallBatchData* batch_data);
577 // Adds retriable recv_initial_metadata op to batch_data.
578 void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
579 SubchannelCallBatchData* batch_data);
580 // Adds retriable recv_message op to batch_data.
581 void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
582 SubchannelCallBatchData* batch_data);
583 // Adds retriable recv_trailing_metadata op to batch_data.
584 void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
585 SubchannelCallBatchData* batch_data);
586 // Helper function used to start a recv_trailing_metadata batch. This
587 // is used in the case where a recv_initial_metadata or recv_message
588 // op fails in a way that we know the call is over but when the application
589 // has not yet started its own recv_trailing_metadata op.
590 void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
591 // If there are any cached send ops that need to be replayed on the
592 // current subchannel call, creates and returns a new subchannel batch
593 // to replay those ops. Otherwise, returns nullptr.
594 SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
595 grpc_call_element* elem, SubchannelCallRetryState* retry_state);
596 // Adds subchannel batches for pending batches to closures.
597 void AddSubchannelBatchesForPendingBatches(
598 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
599 CallCombinerClosureList* closures);
600 // Constructs and starts whatever subchannel batches are needed on the
601 // subchannel call.
602 static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
604 void CreateSubchannelCall(grpc_call_element* elem);
605 // Invoked when a pick is completed, on either success or failure.
606 static void PickDone(void* arg, grpc_error* error);
607 // Removes the call from the channel's list of queued picks.
608 void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
609 // Adds the call to the channel's list of queued picks.
610 void AddCallToQueuedPicksLocked(grpc_call_element* elem);
611 // Applies service config to the call. Must be invoked once we know
612 // that the resolver has returned results to the channel.
613 void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
615 // State for handling deadlines.
616 // The code in deadline_filter.c requires this to be the first field.
617 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
618 // and this struct both independently store pointers to the call stack
619 // and call combiner. If/when we have time, find a way to avoid this
620 // without breaking the grpc_deadline_state abstraction.
621 grpc_deadline_state deadline_state_;
623 grpc_slice path_; // Request path.
624 gpr_timespec call_start_time_;
625 grpc_millis deadline_;
627 grpc_call_stack* owning_call_;
628 CallCombiner* call_combiner_;
629 grpc_call_context_element* call_context_;
631 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
632 ServiceConfig::CallData service_config_call_data_;
633 const ClientChannelMethodParsedObject* method_params_ = nullptr;
635 RefCountedPtr<SubchannelCall> subchannel_call_;
637 // Set when we get a cancel_stream op.
638 grpc_error* cancel_error_ = GRPC_ERROR_NONE;
640 ChannelData::QueuedPick pick_;
641 bool pick_queued_ = false;
642 bool service_config_applied_ = false;
643 QueuedPickCanceller* pick_canceller_ = nullptr;
644 grpc_closure pick_closure_;
646 grpc_polling_entity* pollent_ = nullptr;
648 // Batches are added to this list when received from above.
649 // They are removed when we are done handling the batch (i.e., when
650 // either we have invoked all of the batch's callbacks or we have
651 // passed the batch down to the subchannel call and are not
652 // intercepting any of its callbacks).
653 PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
654 bool pending_send_initial_metadata_ : 1;
655 bool pending_send_message_ : 1;
656 bool pending_send_trailing_metadata_ : 1;
659 bool enable_retries_ : 1;
660 bool retry_committed_ : 1;
661 bool last_attempt_got_server_pushback_ : 1;
662 int num_attempts_completed_ = 0;
663 size_t bytes_buffered_for_retry_ = 0;
664 // TODO(roth): Restructure this to eliminate use of ManualConstructor.
665 ManualConstructor<BackOff> retry_backoff_;
666 grpc_timer retry_timer_;
668 // The number of pending retriable subchannel batches containing send ops.
669 // We hold a ref to the call stack while this is non-zero, since replay
670 // batches may not complete until after all callbacks have been returned
671 // to the surface, and we need to make sure that the call is not destroyed
672 // until all of these batches have completed.
673 // Note that we actually only need to track replay batches, but it's
674 // easier to track all batches with send ops.
675 int num_pending_retriable_subchannel_send_batches_ = 0;
677 // Cached data for retrying send ops.
678 // send_initial_metadata
679 bool seen_send_initial_metadata_ = false;
680 grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
681 grpc_metadata_batch send_initial_metadata_;
682 uint32_t send_initial_metadata_flags_;
683 gpr_atm* peer_string_;
685 // When we get a send_message op, we replace the original byte stream
686 // with a CachingByteStream that caches the slices to a local buffer for
687 // use in retries.
688 // Note: We inline the cache for the first 3 send_message ops and use
689 // dynamic allocation after that. This number was essentially picked
690 // at random; it could be changed in the future to tune performance.
691 InlinedVector<ByteStreamCache*, 3> send_messages_;
692 // send_trailing_metadata
693 bool seen_send_trailing_metadata_ = false;
694 grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
695 grpc_metadata_batch send_trailing_metadata_;
699 // ChannelData::ConnectivityStateAndPickerSetter
702 // A fire-and-forget class that sets the channel's connectivity state
703 // and then hops into the data plane combiner to update the picker.
704 // Must be instantiated while holding the control plane combiner.
705 // Deletes itself when done.
706 class ChannelData::ConnectivityStateAndPickerSetter {
708 ConnectivityStateAndPickerSetter(
709 ChannelData* chand, grpc_connectivity_state state, const char* reason,
710 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
711 : chand_(chand), picker_(std::move(picker)) {
712 // Update connectivity state here, while holding control plane combiner.
713 grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
714 if (chand->channelz_node_ != nullptr) {
715 chand->channelz_node_->AddTraceEvent(
716 channelz::ChannelTrace::Severity::Info,
717 grpc_slice_from_static_string(
718 GetChannelConnectivityStateChangeString(state)));
720 // Bounce into the data plane combiner to reset the picker.
721 GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
722 "ConnectivityStateAndPickerSetter");
723 GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
724 grpc_combiner_scheduler(chand->data_plane_combiner_));
725 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
729 static const char* GetChannelConnectivityStateChangeString(
730 grpc_connectivity_state state) {
732 case GRPC_CHANNEL_IDLE:
733 return "Channel state change to IDLE";
734 case GRPC_CHANNEL_CONNECTING:
735 return "Channel state change to CONNECTING";
736 case GRPC_CHANNEL_READY:
737 return "Channel state change to READY";
738 case GRPC_CHANNEL_TRANSIENT_FAILURE:
739 return "Channel state change to TRANSIENT_FAILURE";
740 case GRPC_CHANNEL_SHUTDOWN:
741 return "Channel state change to SHUTDOWN";
743 GPR_UNREACHABLE_CODE(return "UNKNOWN");
746 static void SetPicker(void* arg, grpc_error* ignored) {
747 auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
749 self->chand_->picker_ = std::move(self->picker_);
750 // Re-process queued picks.
751 for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
753 CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
756 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
757 "ConnectivityStateAndPickerSetter");
762 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
763 grpc_closure closure_;
767 // ChannelData::ServiceConfigSetter
770 // A fire-and-forget class that sets the channel's service config data
771 // in the data plane combiner. Deletes itself when done.
// Publishes a new service config (plus optional retry-throttle parameters)
// on the channel from inside the data plane combiner, then re-applies the
// config to every queued pick.
// NOTE(review): several lines of this class (access specifiers, part of the
// constructor signature and initializer list, closing braces) are not
// visible in this excerpt.
772 class ChannelData::ServiceConfigSetter {
// Constructor. Stores its inputs, takes a "ServiceConfigSetter" ref on the
// channel stack (released at the end of SetServiceConfigData()) and
// schedules SetServiceConfigData() on chand->data_plane_combiner_.
776 Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
778 RefCountedPtr<ServiceConfig> service_config)
780 retry_throttle_data_(retry_throttle_data),
781 service_config_(std::move(service_config)) {
782 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
783 GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
784 grpc_combiner_scheduler(chand->data_plane_combiner_));
785 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
// Closure callback. `arg` is the ServiceConfigSetter that scheduled it;
// `ignored` is not read in the visible lines.
789 static void SetServiceConfigData(void* arg, grpc_error* ignored) {
790 ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
791 ChannelData* chand = self->chand_;
792 // Update channel state.
793 chand->received_service_config_data_ = true;
// Retry-throttle data is looked up (keyed by server name) only when the
// config supplied throttling parameters.
794 if (self->retry_throttle_data_.has_value()) {
795 chand->retry_throttle_data_ =
796 internal::ServerRetryThrottleMap::GetDataForServer(
797 chand->server_name_.get(),
798 self->retry_throttle_data_.value().max_milli_tokens,
799 self->retry_throttle_data_.value().milli_token_ratio);
801 chand->service_config_ = std::move(self->service_config_);
802 // Apply service config to queued picks.
// NOTE(review): the loop-advance clause of this for statement is not
// visible in this excerpt.
803 for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
805 CallData* calld = static_cast<CallData*>(pick->elem->call_data);
806 calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
// Release the channel-stack ref taken in the constructor.
809 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
810 "ServiceConfigSetter");
// Retry-throttle parameters from the parsed config; empty when none given.
815 Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
816 retry_throttle_data_;
// Service config to install; moved into chand->service_config_.
817 RefCountedPtr<ServiceConfig> service_config_;
// Closure bound to SetServiceConfigData().
818 grpc_closure closure_;
822 // ChannelData::ExternalConnectivityWatcher::WatcherList
// Walks the singly linked watcher list from head_ while holding mu_.
// NOTE(review): the counter declaration, its increment and the return
// statement are not visible in this excerpt; presumably the function
// returns the number of list entries -- confirm against the full file.
825 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
826 MutexLock lock(&mu_);
828 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
// Searches the list, while holding mu_, for the watcher whose on_complete_
// closure pointer equals `on_complete`. Watchers are matched by pointer
// identity of that closure.
// NOTE(review): the loop body and the return statement are not visible in
// this excerpt; callers compare the result against nullptr, so presumably
// nullptr means "not found".
834 ChannelData::ExternalConnectivityWatcher*
835 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
836 grpc_closure* on_complete) const {
837 MutexLock lock(&mu_);
838 ExternalConnectivityWatcher* w = head_;
// Stop at the first match or at the end of the list.
839 while (w != nullptr && w->on_complete_ != on_complete) {
// Links `watcher` in front of the current head of the list.
// Asserts that no watcher with the same on_complete_ closure is already
// registered and that `watcher` is not already linked (next_ == nullptr).
845 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
846 ExternalConnectivityWatcher* watcher) {
// Lookup() acquires mu_ itself, so the duplicate check runs before this
// function takes the lock.
847 GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
848 MutexLock lock(&mu_);
849 GPR_ASSERT(watcher->next_ == nullptr);
850 watcher->next_ = head_;
// NOTE(review): the statement that makes `watcher` the new head_ is not
// visible in this excerpt.
// Unlinks `watcher` from the list while holding mu_.
// NOTE(review): the return statements that end each successful branch are
// not visible in this excerpt.
854 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
855 const ExternalConnectivityWatcher* watcher) {
856 MutexLock lock(&mu_);
// Case 1: the watcher is the head; advance head_ past it.
857 if (watcher == head_) {
858 head_ = watcher->next_;
// Case 2: find the predecessor and make it skip over the watcher.
861 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
862 if (w->next_ == watcher) {
863 w->next_ = w->next_->next_;
// Falling through both cases is marked unreachable, i.e. the watcher is
// expected to be present in the list.
867 GPR_UNREACHABLE_CODE(return );
871 // ChannelData::ExternalConnectivityWatcher
// Constructs a watcher for an external connectivity-state watch.
//   chand              - owning channel data.
//   pollent            - polling entity added to chand's interested_parties_.
//   state              - state pointer later handed to the state tracker; a
//                        null value is treated as a cancellation request by
//                        WatchConnectivityStateLocked().
//   on_complete        - closure scheduled by OnWatchCompleteLocked().
//   watcher_timer_init - closure run by WatchConnectivityStateLocked().
// NOTE(review): part of the member-initializer list is not visible in this
// excerpt.
874 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
875 ChannelData* chand, grpc_polling_entity pollent,
876 grpc_connectivity_state* state, grpc_closure* on_complete,
877 grpc_closure* watcher_timer_init)
881 on_complete_(on_complete),
882 watcher_timer_init_(watcher_timer_init) {
883 grpc_polling_entity_add_to_pollset_set(&pollent_,
884 chand_->interested_parties_);
// Paired with the unref in the destructor.
885 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
// Binds my_closure_ to WatchConnectivityStateLocked() on the channel's
// control-plane combiner (chand_->combiner_).
// NOTE(review): the expression ends with ',' so it is an argument of an
// enclosing call that is not visible here (presumably the call that
// schedules the closure).
887 GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
888 grpc_combiner_scheduler(chand_->combiner_)),
// Reverses the constructor's side effects: removes the polling entity from
// the channel's interested_parties_ and drops the channel-stack ref.
892 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
893 grpc_polling_entity_del_from_pollset_set(&pollent_,
894 chand_->interested_parties_);
895 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
896 "ExternalConnectivityWatcher");
// Closure callback registered with the state tracker by
// WatchConnectivityStateLocked(). `arg` is the watcher; `error` is
// forwarded (with an extra ref) to the caller-supplied on_complete closure.
899 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
900 void* arg, grpc_error* error) {
901 ExternalConnectivityWatcher* self =
902 static_cast<ExternalConnectivityWatcher*>(arg);
// on_complete_ is copied to a local before it is used below.
// NOTE(review): presumably because `self` is destroyed on a line that is
// not visible in this excerpt -- confirm against the full file.
903 grpc_closure* on_complete = self->on_complete_;
904 self->chand_->external_connectivity_watcher_list_.Remove(self);
906 GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
// Closure callback bound to my_closure_ by the constructor. `arg` is the
// watcher; `ignored` is not read in the visible lines.
// Two modes, selected by state_:
//   - state_ == nullptr: cancellation request for an earlier watch.
//   - otherwise: register this watcher and start watching the state tracker.
// NOTE(review): the end of the cancellation branch and the transition to
// the registration path are not visible in this excerpt.
909 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
910 void* arg, grpc_error* ignored) {
911 ExternalConnectivityWatcher* self =
912 static_cast<ExternalConnectivityWatcher*>(arg);
913 if (self->state_ == nullptr) {
914 // Handle cancellation.
915 GPR_ASSERT(self->watcher_timer_init_ == nullptr);
// Find the watcher being cancelled (the Lookup() argument is on a line not
// visible here).
916 ExternalConnectivityWatcher* found =
917 self->chand_->external_connectivity_watcher_list_.Lookup(
919 if (found != nullptr) {
// Called with a null state pointer and the found watcher's closure;
// presumably this cancels that watcher's pending notification.
920 grpc_connectivity_state_notify_on_state_change(
921 &found->chand_->state_tracker_, nullptr, &found->my_closure_);
// Registration path: track the watcher so a later cancellation can find it.
927 self->chand_->external_connectivity_watcher_list_.Add(self);
928 // This assumes that the closure is scheduled on the ExecCtx scheduler
929 // and that GRPC_CLOSURE_RUN would run the closure immediately.
930 GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
// Re-bind my_closure_ to the completion callback before handing it to the
// state tracker together with the caller's state pointer.
931 GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
932 grpc_combiner_scheduler(self->chand_->combiner_));
933 grpc_connectivity_state_notify_on_state_change(
934 &self->chand_->state_tracker_, self->state_, &self->my_closure_);
938 // ChannelData::ClientChannelControlHelper
// ChannelControlHelper implementation that connects a load-balancing policy
// to this client channel: it creates subchannels and channels through the
// channel's factory and forwards state/picker updates to the channel.
// Holds a "ClientChannelControlHelper" ref on the channel stack for its
// whole lifetime.
// NOTE(review): access specifiers, closing braces and a few statement lines
// of this class are not visible in this excerpt.
941 class ChannelData::ClientChannelControlHelper
942 : public LoadBalancingPolicy::ChannelControlHelper {
// Takes the channel-stack ref that the destructor releases.
944 explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
945 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
948 ~ClientChannelControlHelper() override {
949 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
950 "ClientChannelControlHelper");
// Creates a subchannel from a copy of `args` extended with up to two extra
// arguments: the health-check service name (only when the channel has one)
// and the channel's subchannel pool. The temporary args copy is destroyed
// after the factory call.
953 Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
954 grpc_arg args_to_add[2];
955 int num_args_to_add = 0;
956 if (chand_->health_check_service_name_ != nullptr) {
// const_cast: grpc_channel_arg_string_create is called with char* here.
957 args_to_add[0] = grpc_channel_arg_string_create(
958 const_cast<char*>("grpc.temp.health_check"),
959 const_cast<char*>(chand_->health_check_service_name_.get()));
// NOTE(review): the increment of num_args_to_add for the health-check
// argument is not visible in this excerpt.
962 args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
963 chand_->subchannel_pool_.get());
964 grpc_channel_args* new_args =
965 grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
966 Subchannel* subchannel =
967 chand_->client_channel_factory_->CreateSubchannel(new_args);
968 grpc_channel_args_destroy(new_args);
// Forwards channel creation to the channel's client channel factory.
972 grpc_channel* CreateChannel(const char* target,
973 const grpc_channel_args& args) override {
974 return chand_->client_channel_factory_->CreateChannel(target, &args);
// State/picker update entry point (the line carrying the method name is not
// visible in this excerpt). The update is applied only while the channel's
// disconnect_error_ is still GRPC_ERROR_NONE.
978 grpc_connectivity_state state,
979 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
980 grpc_error* disconnect_error =
981 chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
982 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
// Suffix appended to the trace line when the update is being ignored.
983 const char* extra = disconnect_error == GRPC_ERROR_NONE
985 : " (ignoring -- channel shutting down)";
986 gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
987 grpc_connectivity_state_name(state), picker.get(), extra);
989 // Do update only if not shutting down.
990 if (disconnect_error == GRPC_ERROR_NONE) {
991 // Will delete itself.
992 New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
997 // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
998 void RequestReresolution() override {}
// Channel this helper reports to; not owned (kept alive via the stack ref).
1001 ChannelData* chand_;
1005 // ChannelData implementation
// Channel-element init hook for the client channel filter.
// Asserts that this element is last in the stack and belongs to
// grpc_client_channel_filter, then constructs a ChannelData in place in
// elem->channel_data. The constructor reports failure through `error`.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns `error`.
1008 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1009 grpc_channel_element_args* args) {
1010 GPR_ASSERT(args->is_last);
1011 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1012 grpc_error* error = GRPC_ERROR_NONE;
// Placement new: the storage is provided by the channel element.
1013 new (elem->channel_data) ChannelData(args, &error);
// Channel-element destroy hook. Runs the destructor explicitly because
// Init() constructed the object with placement new in elem->channel_data.
1017 void ChannelData::Destroy(grpc_channel_element* elem) {
1018 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1019 chand->~ChannelData();
// Reads the GRPC_ARG_ENABLE_RETRIES channel arg as a bool; `true` is passed
// as the default value.
1022 bool GetEnableRetries(const grpc_channel_args* args) {
1023 return grpc_channel_arg_get_bool(
1024 grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
// Reads the GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg as an integer and
// returns it as size_t.
// NOTE(review): the braced triple is presumably {default, min, max}, i.e.
// DEFAULT_PER_RPC_RETRY_BUFFER_SIZE with an allowed range of [0, INT_MAX]
// -- confirm against grpc_integer_options.
1027 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1028 return static_cast<size_t>(grpc_channel_arg_get_integer(
1029 grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1030 {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
// Selects the subchannel pool for a channel: a fresh LocalSubchannelPool
// when GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL is set to true (default false),
// otherwise the GlobalSubchannelPool instance.
1033 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1034 const grpc_channel_args* args) {
1035 const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1036 grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1037 if (use_local_subchannel_pool) {
1038 return MakeRefCounted<LocalSubchannelPool>();
1040 return GlobalSubchannelPool::instance();
// Builds the per-channel state of the client channel filter.
//   args  - channel element args; args->channel_args supplies the factory,
//           server URI, optional default service config and tuning knobs.
//   error - out-parameter; set to a non-NONE error when a required arg is
//           missing, the default service config fails to parse, or the
//           resolving LB policy reports an error.
// NOTE(review): the early returns that presumably follow each error
// assignment, and several closing braces / else lines, are not visible in
// this excerpt.
1043 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1044 : deadline_checking_enabled_(
1045 grpc_deadline_checking_enabled(args->channel_args)),
1046 enable_retries_(GetEnableRetries(args->channel_args)),
1047 per_rpc_retry_buffer_size_(
1048 GetMaxPerRpcRetryBufferSize(args->channel_args)),
1049 owning_stack_(args->channel_stack),
1050 client_channel_factory_(
1051 ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
// Two separate combiners: one for the data plane, one for the control
// plane; both are released in the destructor.
1052 data_plane_combiner_(grpc_combiner_create()),
1053 combiner_(grpc_combiner_create()),
1054 interested_parties_(grpc_pollset_set_create()),
1055 subchannel_pool_(GetSubchannelPool(args->channel_args)),
1056 disconnect_error_(GRPC_ERROR_NONE) {
1057 // Initialize data members.
// The channel starts in the IDLE connectivity state.
1058 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1060 gpr_mu_init(&info_mu_);
1061 // Start backup polling.
1062 grpc_client_channel_start_backup_polling(interested_parties_);
1063 // Check client channel factory.
1064 if (client_channel_factory_ == nullptr) {
1065 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1066 "Missing client channel factory in args for client channel filter");
1069 // Get server name to resolve, using proxy mapper if needed.
1070 const char* server_uri = grpc_channel_arg_get_string(
1071 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1072 if (server_uri == nullptr) {
1073 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1074 "server URI channel arg missing or wrong type in client channel "
1078 // Get default service config
1079 const char* service_config_json = grpc_channel_arg_get_string(
1080 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1081 // TODO(yashkt): Make sure we set the channel in TRANSIENT_FAILURE on an
1082 // invalid default service config
1083 if (service_config_json != nullptr) {
1084 *error = GRPC_ERROR_NONE;
1085 default_service_config_ = ServiceConfig::Create(service_config_json, error);
// A default config that fails to parse is discarded.
1086 if (*error != GRPC_ERROR_NONE) {
1087 default_service_config_.reset();
// Copy the URI path, minus one leading '/', if the URI parses and has a
// non-empty path.
// NOTE(review): the member receiving this copy is on a line not visible
// here; presumably server_name_ -- confirm against the full file.
1091 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1092 if (uri != nullptr && uri->path[0] != '\0') {
1094 gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1096 grpc_uri_destroy(uri);
// Ask the proxy mappers for a replacement name and, optionally, replacement
// channel args. Without a proxy name the original server URI is resolved.
1097 char* proxy_name = nullptr;
1098 grpc_channel_args* new_args = nullptr;
1099 grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1101 UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
1102 : gpr_strdup(server_uri));
1103 // Instantiate resolving LB policy.
1104 LoadBalancingPolicy::Args lb_args;
// The LB policy runs on the control-plane combiner.
1105 lb_args.combiner = combiner_;
1106 lb_args.channel_control_helper =
1107 UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1108 New<ClientChannelControlHelper>(this));
1109 lb_args.args = new_args != nullptr ? new_args : args->channel_args;
// ProcessResolverResultLocked is registered as the resolver-result callback
// with `this` as its argument.
1110 resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1111 std::move(lb_args), &grpc_client_channel_routing_trace,
1112 std::move(target_uri), ProcessResolverResultLocked, this, error));
// new_args (if any) is destroyed once the policy has been constructed.
1113 grpc_channel_args_destroy(new_args);
1114 if (*error != GRPC_ERROR_NONE) {
1115 // Orphan the resolving LB policy and flush the exec_ctx to ensure
1116 // that it finishes shutting down. This ensures that if we are
1117 // failing, we destroy the ClientChannelControlHelper (and thus
1118 // unref the channel stack) before we return.
1119 // TODO(roth): This is not a complete solution, because it only
1120 // catches the case where channel stack initialization fails in this
1121 // particular filter. If there is a failure in a different filter, we
1122 // will leave a dangling ref here, which can cause a crash. Fortunately,
1123 // in practice, there are no other filters that can cause failures in
1124 // channel stack initialization, so this works for now.
1125 resolving_lb_policy_.reset();
1126 ExecCtx::Get()->Flush();
// Success path (presumably the else branch of the check above): hook the
// policy's pollset set into the channel's and trace the creation.
1128 grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1129 interested_parties_);
1130 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1131 gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1132 resolving_lb_policy_.get());
// Releases everything the constructor set up: the resolving LB policy (if
// it still exists), backup polling, the pollset set, both combiners, the
// stored disconnect error, the connectivity state tracker and info_mu_.
1137 ChannelData::~ChannelData() {
// The policy may already have been reset (error in the constructor or a
// disconnect op), hence the null check.
1138 if (resolving_lb_policy_ != nullptr) {
1139 grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1140 interested_parties_);
1141 resolving_lb_policy_.reset();
1143 // Stop backup polling.
1144 grpc_client_channel_stop_backup_polling(interested_parties_);
1145 grpc_pollset_set_destroy(interested_parties_);
1146 GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
1147 GRPC_COMBINER_UNREF(combiner_, "client_channel");
// Drops the ref on the error stored by a disconnect op, if any.
1148 GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1149 grpc_connectivity_state_destroy(&state_tracker_);
1150 gpr_mu_destroy(&info_mu_);
// Chooses the LB policy name (and config, when available) for a resolver
// result.
//   resolver_result       - supplies channel args and the address list.
//   parsed_service_config - parsed client-channel config; may be null.
//   lb_policy_name        - out: heap copy of the selected policy name.
//   lb_policy_config      - out: parsed LB config; written only when the
//                           service config carries one.
// Sources of the name, as visible below: the service config's parsed LB
// config, its deprecated LB policy field, the GRPC_ARG_LB_POLICY_NAME
// channel arg, and finally "pick_first". The presence of a balancer address
// forces "grpclb" over the latter three.
// NOTE(review): the return after the first branch, the else that separates
// the deprecated field from the channel arg, and the loop's break are not
// visible in this excerpt.
1153 void ChannelData::ProcessLbPolicy(
1154 const Resolver::Result& resolver_result,
1155 const internal::ClientChannelGlobalParsedObject* parsed_service_config,
1156 UniquePtr<char>* lb_policy_name,
1157 RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) {
1158 // Prefer the LB policy name found in the service config.
1159 if (parsed_service_config != nullptr &&
1160 parsed_service_config->parsed_lb_config() != nullptr) {
1161 lb_policy_name->reset(
1162 gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1163 *lb_policy_config = parsed_service_config->parsed_lb_config();
// No parsed LB config: fall back to a plain policy name.
1166 const char* local_policy_name = nullptr;
1167 if (parsed_service_config != nullptr &&
1168 parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1169 local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1171 const grpc_arg* channel_arg =
1172 grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1173 local_policy_name = grpc_channel_arg_get_string(channel_arg);
1175 // Special case: If at least one balancer address is present, we use
1176 // the grpclb policy, regardless of what the resolver has returned.
1177 bool found_balancer_address = false;
1178 for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1179 const ServerAddress& address = resolver_result.addresses[i];
1180 if (address.IsBalancer()) {
1181 found_balancer_address = true;
1185 if (found_balancer_address) {
// Log only when a different policy was explicitly requested and is being
// overridden.
1186 if (local_policy_name != nullptr &&
1187 strcmp(local_policy_name, "grpclb") != 0) {
1189 "resolver requested LB policy %s but provided at least one "
1190 "balancer address -- forcing use of grpclb LB policy",
1193 local_policy_name = "grpclb";
1195 // Use pick_first if nothing was specified and we didn't select grpclb
// above.
1197 lb_policy_name->reset(gpr_strdup(
1198 local_policy_name == nullptr ? "pick_first" : local_policy_name));
1201 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1202 // resolver result update.
//   arg                  - the ChannelData registered in the constructor.
//   result               - resolver result (service config, config error,
//                          addresses, channel args).
//   lb_policy_name       - out: points at chand->info_lb_policy_name_, so
//                          it stays valid only until that member is next
//                          replaced.
//   lb_policy_config     - out: filled in by ProcessLbPolicy().
//   service_config_error - out: new ref to result.service_config_error.
// Returns whether the effective service config differs from the previously
// saved one.
// NOTE(review): several else/return/brace lines and the gpr_log call heads
// are not visible in this excerpt.
1203 bool ChannelData::ProcessResolverResultLocked(
1204 void* arg, const Resolver::Result& result, const char** lb_policy_name,
1205 RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
1206 grpc_error** service_config_error) {
1207 ChannelData* chand = static_cast<ChannelData*>(arg);
1208 RefCountedPtr<ServiceConfig> service_config;
1209 // If resolver did not return a service config or returned an invalid service
1210 // config, we need a fallback service config.
1211 if (result.service_config_error != GRPC_ERROR_NONE) {
1212 // If the service config was invalid, then fallback to the saved service
1213 // config. If there is no saved config either, use the default service
// config.
1215 if (chand->saved_service_config_ != nullptr) {
1216 service_config = chand->saved_service_config_;
1217 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1219 "chand=%p: resolver returned invalid service config. "
1220 "Continuing to use previous service config.",
1223 } else if (chand->default_service_config_ != nullptr) {
1224 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1226 "chand=%p: resolver returned invalid service config. Using "
1227 "default service config provided by client API.",
1230 service_config = chand->default_service_config_;
// Resolver returned no config at all: use the client-supplied default, if
// one exists.
1232 } else if (result.service_config == nullptr) {
1233 if (chand->default_service_config_ != nullptr) {
1234 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1236 "chand=%p: resolver returned no service config. Using default "
1237 "service config provided by client API.",
1240 service_config = chand->default_service_config_;
// Resolver returned a valid config: use it as is.
1243 service_config = result.service_config;
1245 *service_config_error = GRPC_ERROR_REF(result.service_config_error);
// Invalid config and no fallback available.
// NOTE(review): the body of this branch is not visible in this excerpt.
1246 if (service_config == nullptr &&
1247 result.service_config_error != GRPC_ERROR_NONE) {
1250 UniquePtr<char> service_config_json;
1251 // Process service config.
1252 const internal::ClientChannelGlobalParsedObject* parsed_service_config =
1254 if (service_config != nullptr) {
1255 parsed_service_config =
1256 static_cast<const internal::ClientChannelGlobalParsedObject*>(
1257 service_config->GetParsedGlobalServiceConfigObject(
1258 internal::ClientChannelServiceConfigParser::ParserIndex()));
// Changed when exactly one of {new, saved} is null, or both exist and their
// JSON text differs. The strcmp operand saved_service_config_ is non-null
// there because the first disjunct short-circuits the mixed case.
1260 const bool service_config_changed =
1261 ((service_config == nullptr) !=
1262 (chand->saved_service_config_ == nullptr)) ||
1263 (service_config != nullptr &&
1264 strcmp(service_config->service_config_json(),
1265 chand->saved_service_config_->service_config_json()) != 0);
1266 if (service_config_changed) {
1267 service_config_json.reset(gpr_strdup(
1268 service_config != nullptr ? service_config->service_config_json()
1270 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1272 "chand=%p: resolver returned updated service config: \"%s\"",
1273 chand, service_config_json.get());
1275 chand->saved_service_config_ = std::move(service_config);
// Keep the health-check service name in sync with the new config; it is
// cleared when there is no parsed config.
1276 if (parsed_service_config != nullptr) {
1277 chand->health_check_service_name_.reset(
1278 gpr_strdup(parsed_service_config->health_check_service_name()));
1280 chand->health_check_service_name_.reset();
1283 // We want to set the service config at least once. This should not really be
1284 // needed, but we are doing it as a defensive approach. This can be removed,
1285 // if we feel it is unnecessary.
1286 if (service_config_changed || !chand->received_first_resolver_result_) {
1287 chand->received_first_resolver_result_ = true;
1288 Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
1289 retry_throttle_data;
1290 if (parsed_service_config != nullptr) {
1291 retry_throttle_data = parsed_service_config->retry_throttling();
1293 // Create service config setter to update channel state in the data
1294 // plane combiner. Destroys itself when done.
1295 New<ServiceConfigSetter>(chand, retry_throttle_data,
1296 chand->saved_service_config_);
1298 UniquePtr<char> processed_lb_policy_name;
1299 chand->ProcessLbPolicy(result, parsed_service_config,
1300 &processed_lb_policy_name, lb_policy_config);
1301 // Swap out the data used by GetChannelInfo().
1303 MutexLock lock(&chand->info_mu_);
1304 chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
// service_config_json is set only when the config changed, so an unchanged
// config leaves the previously published JSON in place.
1305 if (service_config_json != nullptr) {
1306 chand->info_service_config_json_ = std::move(service_config_json);
1310 *lb_policy_name = chand->info_lb_policy_name_.get();
1311 return service_config_changed;
// Sends a ping on a connected subchannel chosen by the current picker.
// Returns an error when the channel is not READY; when the pick yields no
// connected subchannel, the picker's error is used, or a "dropped call"
// error if the picker reported none.
// NOTE(review): the else that separates the two pick outcomes and the final
// return are not visible in this excerpt.
1314 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
1315 if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
1316 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1318 LoadBalancingPolicy::PickArgs pick;
1319 grpc_error* error = GRPC_ERROR_NONE;
// The pick's return value is not inspected; the outcome is read from
// pick.connected_subchannel and `error` instead.
1320 picker_->Pick(&pick, &error);
1321 if (pick.connected_subchannel != nullptr) {
1322 pick.connected_subchannel->Ping(op->send_ping.on_initiate,
1323 op->send_ping.on_ack);
1325 if (error == GRPC_ERROR_NONE) {
1326 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1327 "LB policy dropped call on ping");
// Closure callback scheduled by StartTransportOp() on the control-plane
// combiner. `arg` is the grpc_transport_op; the channel element travels in
// op->handler_private.extra_arg. `ignored` is not read in the visible lines.
// Handles, in order: connectivity watch, ping, backoff reset, disconnect.
// Finally drops the "start_transport_op" stack ref and schedules
// op->on_consumed.
1333 void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
1334 grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
1335 grpc_channel_element* elem =
1336 static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
1337 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1338 // Connectivity watch.
1339 if (op->on_connectivity_state_change != nullptr) {
1340 grpc_connectivity_state_notify_on_state_change(
1341 &chand->state_tracker_, op->connectivity_state,
1342 op->on_connectivity_state_change);
// Clear the fields once the watch has been handed to the state tracker.
1343 op->on_connectivity_state_change = nullptr;
1344 op->connectivity_state = nullptr;
// Ping. On failure both ping closures receive the error: on_initiate gets
// an extra ref, on_ack takes the original one.
1347 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1348 grpc_error* error = chand->DoPingLocked(op);
1349 if (error != GRPC_ERROR_NONE) {
1350 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
1351 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
1353 op->bind_pollset = nullptr;
1354 op->send_ping.on_initiate = nullptr;
1355 op->send_ping.on_ack = nullptr;
// Backoff reset; skipped when the LB policy has already been destroyed.
1358 if (op->reset_connect_backoff) {
1359 if (chand->resolving_lb_policy_ != nullptr) {
1360 chand->resolving_lb_policy_->ResetBackoffLocked();
// Disconnect.
1364 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1365 grpc_error* error = GRPC_ERROR_NONE;
// The compare-exchange stores the disconnect error only if none was stored
// before; the assertion fails if an error was already recorded.
1366 GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
1367 &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
1368 MemoryOrder::ACQUIRE));
// NOTE(review): resolving_lb_policy_ is dereferenced without a null check
// here, unlike in the backoff-reset branch above.
1369 grpc_pollset_set_del_pollset_set(
1370 chand->resolving_lb_policy_->interested_parties(),
1371 chand->interested_parties_);
1372 chand->resolving_lb_policy_.reset();
1373 // Will delete itself.
// Moves the channel to SHUTDOWN and installs a picker built from the
// disconnect error.
1374 New<ConnectivityStateAndPickerSetter>(
1375 chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
1376 UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
1377 New<LoadBalancingPolicy::TransientFailurePicker>(
1378 GRPC_ERROR_REF(op->disconnect_with_error))));
// Matches the ref taken in StartTransportOp().
1380 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
1381 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
// Entry point for transport ops on the client channel.
// bind_pollset is handled right here; everything else is deferred to
// StartTransportOpLocked() on the control-plane combiner. A
// "start_transport_op" stack ref is taken for that callback, which releases
// it when done.
1384 void ChannelData::StartTransportOp(grpc_channel_element* elem,
1385 grpc_transport_op* op) {
1386 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// This filter does not expect set_accept_stream to be set on an op.
1387 GPR_ASSERT(op->set_accept_stream == false);
1388 // Handle bind_pollset.
1389 if (op->bind_pollset != nullptr) {
1390 grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1392 // Pop into control plane combiner for remaining ops.
// The element is stashed in the op so the callback can recover it.
1393 op->handler_private.extra_arg = elem;
1394 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
// NOTE(review): the expression ends with ',' so it is an argument of an
// enclosing call that is not visible here (presumably the call that
// schedules the closure).
1396 GRPC_CLOSURE_INIT(&op->handler_private.closure,
1397 ChannelData::StartTransportOpLocked, op,
1398 grpc_combiner_scheduler(chand->combiner_)),
// Fills the requested fields of `info` with heap copies (gpr_strdup) of the
// channel's current LB policy name and service config JSON.
// Each output is written only when the corresponding pointer in `info` is
// non-null. Reads happen under info_mu_, the same mutex
// ProcessResolverResultLocked() holds while replacing these strings.
1402 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1403 const grpc_channel_info* info) {
1404 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1405 MutexLock lock(&chand->info_mu_);
1406 if (info->lb_policy_name != nullptr) {
1407 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1409 if (info->service_config_json != nullptr) {
1410 *info->service_config_json =
1411 gpr_strdup(chand->info_service_config_json_.get());
// Pushes `pick` onto the front of the channel's queued-picks list and adds
// the call's polling entity to the channel's interested_parties_.
// RemoveQueuedPick() is the inverse operation.
1415 void ChannelData::AddQueuedPick(QueuedPick* pick,
1416 grpc_polling_entity* pollent) {
1417 // Add call to queued picks list.
1418 pick->next = queued_picks_;
1419 queued_picks_ = pick;
1420 // Add call's pollent to channel's interested_parties, so that I/O
1421 // can be done under the call's CQ.
1422 grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
// Removes the call's polling entity from interested_parties_ and unlinks
// `to_remove` from the queued-picks list.
// NOTE(review): the statement that ends the search after a match is not
// visible in this excerpt.
1425 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1426 grpc_polling_entity* pollent) {
1427 // Remove call's pollent from channel's interested_parties.
1428 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1429 // Remove from queued picks list.
// `pick` points at the link (head pointer or a node's `next`) that refers
// to the current node, so unlinking is a single store with no special case
// for the head.
1430 for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1431 pick = &(*pick)->next) {
1432 if (*pick == to_remove) {
1433 *pick = to_remove->next;
// Closure callback scheduled by CheckConnectivityState() on the
// control-plane combiner. `arg` is the ChannelData; `error_ignored` is not
// read. Asks the resolving LB policy (if it still exists) to leave IDLE and
// releases the "TryToConnect" stack ref taken by the scheduler.
1439 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1440 auto* chand = static_cast<ChannelData*>(arg);
1441 if (chand->resolving_lb_policy_ != nullptr) {
1442 chand->resolving_lb_policy_->ExitIdleLocked();
1444 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
// Reads the channel's current connectivity state from the state tracker.
// When the state is IDLE and `try_to_connect` is true, a connection attempt
// is also kicked off asynchronously: a "TryToConnect" stack ref is taken
// and TryToConnectLocked() is scheduled on the control-plane combiner.
// NOTE(review): the tail of the GRPC_CLOSURE_SCHED call and the return
// statement are not visible in this excerpt; presumably returns `out`.
1447 grpc_connectivity_state ChannelData::CheckConnectivityState(
1448 bool try_to_connect) {
1449 grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1450 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1451 GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1452 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1453 grpc_combiner_scheduler(combiner_)),
1460 // CallData implementation
1465 // In order to support retries, we act as a proxy for stream op batches.
1466 // When we get a batch from the surface, we add it to our list of pending
1467 // batches, and we then use those batches to construct separate "child"
1468 // batches to be started on the subchannel call. When the child batches
1469 // return, we then decide which pending batches have been completed and
1470 // schedule their callbacks accordingly. If a subchannel call fails and
1471 // we want to retry it, we do a new pick and start again, constructing
1472 // new "child" batches for the new subchannel call.
1474 // Note that retries are committed when receiving data from the server
1475 // (except for Trailers-Only responses). However, there may be many
1476 // send ops started before receiving any data, so we may have already
1477 // completed some number of send ops (and returned the completions up to
1478 // the surface) by the time we realize that we need to retry. To deal
1479 // with this, we cache data for send ops, so that we can replay them on a
1480 // different subchannel call even after we have completed the original
// batch.
1483 // There are two sets of data to maintain:
1484 // - In call_data (in the parent channel), we maintain a list of pending
1485 // ops and cached data for send ops.
1486 // - In the subchannel call, we maintain state to indicate what ops have
1487 // already been sent down to that call.
1489 // When constructing the "child" batches, we compare those two sets of
1490 // data to see which batches need to be sent to the subchannel call.
1492 // TODO(roth): In subsequent PRs:
1493 // - add support for transparent retries (including initial metadata)
1494 // - figure out how to record stats in census for retries
1495 // (census filter is on top of this one)
1496 // - add census stats for retries
// Initializes per-call state from the call element args and channel data.
//   elem  - call element; forwarded to the deadline state.
//   chand - channel data; supplies the deadline-checking and retry switches.
//   args  - call element args (path, start time, deadline, call stack,
//           call combiner, context).
// NOTE(review): one arm of the deadline ternary and one initializer-list
// entry are not visible in this excerpt.
1498 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
1499 const grpc_call_element_args& args)
// With deadline checking disabled, the deadline state is given
// GRPC_MILLIS_INF_FUTURE.
1500 : deadline_state_(elem, args.call_stack, args.call_combiner,
1501 GPR_LIKELY(chand.deadline_checking_enabled())
1503 : GRPC_MILLIS_INF_FUTURE),
// Takes a ref on the path slice; released in the destructor.
1504 path_(grpc_slice_ref_internal(args.path)),
1505 call_start_time_(args.start_time),
1506 deadline_(args.deadline),
1508 owning_call_(args.call_stack),
1509 call_combiner_(args.call_combiner),
1510 call_context_(args.context),
1511 pending_send_initial_metadata_(false),
1512 pending_send_message_(false),
1513 pending_send_trailing_metadata_(false),
1514 enable_retries_(chand.enable_retries()),
1515 retry_committed_(false),
1516 last_attempt_got_server_pushback_(false) {}
// Releases the path slice ref and the stored cancel error, and asserts that
// every pending-batch slot has been cleared before destruction.
1518 CallData::~CallData() {
1519 grpc_slice_unref_internal(path_);
1520 GRPC_ERROR_UNREF(cancel_error_);
1521 // Make sure there are no remaining pending batches.
1522 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1523 GPR_ASSERT(pending_batches_[i].batch == nullptr);
// Call-element init hook: constructs a CallData in place in
// elem->call_data using the element's channel data. Always returns
// GRPC_ERROR_NONE.
1527 grpc_error* CallData::Init(grpc_call_element* elem,
1528 const grpc_call_element_args* args) {
1529 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Placement new: the storage is provided by the call element.
1530 new (elem->call_data) CallData(elem, *chand, *args);
1531 return GRPC_ERROR_NONE;
// Call-element destroy hook.
//   final_info            - not read in the visible lines.
//   then_schedule_closure - closure to run once the call stack is destroyed.
// When a subchannel call exists, the closure is handed to it via
// SetAfterCallStackDestroy() and the local pointer is nulled, so the
// GRPC_CLOSURE_SCHED at the end receives nullptr in that case.
// NOTE(review): the statement(s) between the if-block and the final
// schedule are not visible in this excerpt.
1534 void CallData::Destroy(grpc_call_element* elem,
1535 const grpc_call_final_info* final_info,
1536 grpc_closure* then_schedule_closure) {
1537 CallData* calld = static_cast<CallData*>(elem->call_data);
1538 if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
1539 calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
1540 then_schedule_closure = nullptr;
1543 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
// Entry point for stream op batches arriving from the surface.
// Decision order, as visible below:
//   1. Already cancelled -> fail the batch with the stored cancel error.
//   2. Batch is a cancellation -> record the error; fail pending batches if
//      there is no subchannel call yet, otherwise pass the batch down.
//   3. Otherwise queue the batch; resume immediately if a subchannel call
//      exists, start a pick if the batch carries send_initial_metadata, or
//      just yield the call combiner.
// NOTE(review): the return statements that end each path, several else
// lines and the heads of some gpr_log / GRPC_CLOSURE_SCHED calls are not
// visible in this excerpt.
1546 void CallData::StartTransportStreamOpBatch(
1547 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1548 GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1549 CallData* calld = static_cast<CallData*>(elem->call_data);
1550 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Give the deadline filter logic a look at the batch first.
1551 if (GPR_LIKELY(chand->deadline_checking_enabled())) {
1552 grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
1554 // If we've previously been cancelled, immediately fail any new batches.
1555 if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
1556 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1557 gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1558 chand, calld, grpc_error_string(calld->cancel_error_));
1560 // Note: This will release the call combiner.
1561 grpc_transport_stream_op_batch_finish_with_failure(
1562 batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1565 // Handle cancellation.
1566 if (GPR_UNLIKELY(batch->cancel_stream)) {
1567 // Stash a copy of cancel_error in our call data, so that we can use
1568 // it for subsequent operations. This ensures that if the call is
1569 // cancelled before any batches are passed down (e.g., if the deadline
1570 // is in the past when the call starts), we can return the right
1571 // error to the caller when the first batch does get passed down.
1572 GRPC_ERROR_UNREF(calld->cancel_error_);
1573 calld->cancel_error_ =
1574 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1575 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1576 gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1577 calld, grpc_error_string(calld->cancel_error_));
1579 // If we do not have a subchannel call (i.e., a pick has not yet
1580 // been started), fail all pending batches. Otherwise, send the
1581 // cancellation down to the subchannel call.
1582 if (calld->subchannel_call_ == nullptr) {
1583 // TODO(roth): If there is a pending retry callback, do we need to
// NOTE(review): the rest of this TODO sentence is not visible in this
// excerpt.
// NoYieldCallCombiner: the call combiner is released by the
// finish_with_failure call below instead.
1585 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
1586 NoYieldCallCombiner);
1587 // Note: This will release the call combiner.
1588 grpc_transport_stream_op_batch_finish_with_failure(
1589 batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1591 // Note: This will release the call combiner.
1592 calld->subchannel_call_->StartTransportStreamOpBatch(batch);
1596 // Add the batch to the pending list.
1597 calld->PendingBatchesAdd(elem, batch);
1598 // Check if we've already gotten a subchannel call.
1599 // Note that once we have completed the pick, we do not need to enter
1600 // the channel combiner, which is more efficient (especially for
1601 // streaming calls).
1602 if (calld->subchannel_call_ != nullptr) {
1603 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1605 "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
1606 calld, calld->subchannel_call_.get());
1608 calld->PendingBatchesResume(elem);
1611 // We do not yet have a subchannel call.
1612 // For batches containing a send_initial_metadata op, enter the channel
1613 // combiner to start a pick.
1614 if (GPR_LIKELY(batch->send_initial_metadata)) {
1615 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1616 gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
// The batch's own closure is reused to run StartPickLocked() on the data
// plane combiner.
1621 &batch->handler_private.closure, StartPickLocked, elem,
1622 grpc_combiner_scheduler(chand->data_plane_combiner())),
1625 // For all other batches, release the call combiner.
1626 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1628 "chand=%p calld=%p: saved batch, yielding call combiner", chand,
1631 GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1632 "batch does not include send_initial_metadata");
1636 void CallData::SetPollent(grpc_call_element* elem,
1637 grpc_polling_entity* pollent) {
1638 CallData* calld = static_cast<CallData*>(elem->call_data);
1639 calld->pollent_ = pollent;
// send op data caching
1646 void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
1647 if (pending->send_ops_cached) return;
1648 pending->send_ops_cached = true;
1649 grpc_transport_stream_op_batch* batch = pending->batch;
1650 // Save a copy of metadata for send_initial_metadata ops.
1651 if (batch->send_initial_metadata) {
1652 seen_send_initial_metadata_ = true;
1653 GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
1654 grpc_metadata_batch* send_initial_metadata =
1655 batch->payload->send_initial_metadata.send_initial_metadata;
1656 send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1657 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
1658 grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
1659 send_initial_metadata_storage_);
1660 send_initial_metadata_flags_ =
1661 batch->payload->send_initial_metadata.send_initial_metadata_flags;
1662 peer_string_ = batch->payload->send_initial_metadata.peer_string;
1664 // Set up cache for send_message ops.
1665 if (batch->send_message) {
1666 ByteStreamCache* cache = arena_->New<ByteStreamCache>(
1667 std::move(batch->payload->send_message.send_message));
1668 send_messages_.push_back(cache);
1670 // Save metadata batch for send_trailing_metadata ops.
1671 if (batch->send_trailing_metadata) {
1672 seen_send_trailing_metadata_ = true;
1673 GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
1674 grpc_metadata_batch* send_trailing_metadata =
1675 batch->payload->send_trailing_metadata.send_trailing_metadata;
1676 send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1677 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
1678 grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
1679 send_trailing_metadata_storage_);
1683 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
1684 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1686 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1689 grpc_metadata_batch_destroy(&send_initial_metadata_);
1692 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
1693 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1695 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1698 send_messages_[idx]->Destroy();
1701 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
1702 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1704 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1707 grpc_metadata_batch_destroy(&send_trailing_metadata_);
1710 void CallData::FreeCachedSendOpDataAfterCommit(
1711 grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
1712 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1713 if (retry_state->completed_send_initial_metadata) {
1714 FreeCachedSendInitialMetadata(chand);
1716 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1717 FreeCachedSendMessage(chand, i);
1719 if (retry_state->completed_send_trailing_metadata) {
1720 FreeCachedSendTrailingMetadata(chand);
1724 void CallData::FreeCachedSendOpDataForCompletedBatch(
1725 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
1726 SubchannelCallRetryState* retry_state) {
1727 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1728 if (batch_data->batch.send_initial_metadata) {
1729 FreeCachedSendInitialMetadata(chand);
1731 if (batch_data->batch.send_message) {
1732 FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
1734 if (batch_data->batch.send_trailing_metadata) {
1735 FreeCachedSendTrailingMetadata(chand);
// LB recv_trailing_metadata_ready handling
1743 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
1744 const LoadBalancingPolicy::PickArgs& pick,
1745 grpc_transport_stream_op_batch* batch) {
1746 if (pick.recv_trailing_metadata_ready != nullptr) {
1747 *pick.original_recv_trailing_metadata_ready =
1748 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1749 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1750 pick.recv_trailing_metadata_ready;
1751 if (pick.recv_trailing_metadata != nullptr) {
1752 *pick.recv_trailing_metadata =
1753 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
// pending_batches management
1762 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
1763 // Note: It is important the send_initial_metadata be the first entry
1764 // here, since the code in pick_subchannel_locked() assumes it will be.
1765 if (batch->send_initial_metadata) return 0;
1766 if (batch->send_message) return 1;
1767 if (batch->send_trailing_metadata) return 2;
1768 if (batch->recv_initial_metadata) return 3;
1769 if (batch->recv_message) return 4;
1770 if (batch->recv_trailing_metadata) return 5;
1771 GPR_UNREACHABLE_CODE(return (size_t)-1);
1774 // This is called via the call combiner, so access to calld is synchronized.
1775 void CallData::PendingBatchesAdd(grpc_call_element* elem,
1776 grpc_transport_stream_op_batch* batch) {
1777 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1778 const size_t idx = GetBatchIndex(batch);
1779 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1781 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1784 PendingBatch* pending = &pending_batches_[idx];
1785 GPR_ASSERT(pending->batch == nullptr);
1786 pending->batch = batch;
1787 pending->send_ops_cached = false;
1788 if (enable_retries_) {
1789 // Update state in calld about pending batches.
1790 // Also check if the batch takes us over the retry buffer limit.
1791 // Note: We don't check the size of trailing metadata here, because
1792 // gRPC clients do not send trailing metadata.
1793 if (batch->send_initial_metadata) {
1794 pending_send_initial_metadata_ = true;
1795 bytes_buffered_for_retry_ += grpc_metadata_batch_size(
1796 batch->payload->send_initial_metadata.send_initial_metadata);
1798 if (batch->send_message) {
1799 pending_send_message_ = true;
1800 bytes_buffered_for_retry_ +=
1801 batch->payload->send_message.send_message->length();
1803 if (batch->send_trailing_metadata) {
1804 pending_send_trailing_metadata_ = true;
1806 if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
1807 chand->per_rpc_retry_buffer_size())) {
1808 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1810 "chand=%p calld=%p: exceeded retry buffer size, committing",
1813 SubchannelCallRetryState* retry_state =
1814 subchannel_call_ == nullptr ? nullptr
1815 : static_cast<SubchannelCallRetryState*>(
1816 subchannel_call_->GetParentData());
1817 RetryCommit(elem, retry_state);
1818 // If we are not going to retry and have not yet started, pretend
1819 // retries are disabled so that we don't bother with retry overhead.
1820 if (num_attempts_completed_ == 0) {
1821 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1823 "chand=%p calld=%p: disabling retries before first attempt",
1826 enable_retries_ = false;
1832 void CallData::PendingBatchClear(PendingBatch* pending) {
1833 if (enable_retries_) {
1834 if (pending->batch->send_initial_metadata) {
1835 pending_send_initial_metadata_ = false;
1837 if (pending->batch->send_message) {
1838 pending_send_message_ = false;
1840 if (pending->batch->send_trailing_metadata) {
1841 pending_send_trailing_metadata_ = false;
1844 pending->batch = nullptr;
1847 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
1848 PendingBatch* pending) {
1849 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1850 grpc_transport_stream_op_batch* batch = pending->batch;
1851 // We clear the pending batch if all of its callbacks have been
1852 // scheduled and reset to nullptr.
1853 if (batch->on_complete == nullptr &&
1854 (!batch->recv_initial_metadata ||
1855 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1857 (!batch->recv_message ||
1858 batch->payload->recv_message.recv_message_ready == nullptr) &&
1859 (!batch->recv_trailing_metadata ||
1860 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1862 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1863 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1866 PendingBatchClear(pending);
1870 // This is called via the call combiner, so access to calld is synchronized.
1871 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
1872 grpc_transport_stream_op_batch* batch =
1873 static_cast<grpc_transport_stream_op_batch*>(arg);
1874 CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
1875 // Note: This will release the call combiner.
1876 grpc_transport_stream_op_batch_finish_with_failure(
1877 batch, GRPC_ERROR_REF(error), calld->call_combiner_);
1880 // This is called via the call combiner, so access to calld is synchronized.
1881 void CallData::PendingBatchesFail(
1882 grpc_call_element* elem, grpc_error* error,
1883 YieldCallCombinerPredicate yield_call_combiner_predicate) {
1884 GPR_ASSERT(error != GRPC_ERROR_NONE);
1885 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1886 size_t num_batches = 0;
1887 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1888 if (pending_batches_[i].batch != nullptr) ++num_batches;
1891 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1892 elem->channel_data, this, num_batches, grpc_error_string(error));
1894 CallCombinerClosureList closures;
1895 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1896 PendingBatch* pending = &pending_batches_[i];
1897 grpc_transport_stream_op_batch* batch = pending->batch;
1898 if (batch != nullptr) {
1899 if (batch->recv_trailing_metadata) {
1900 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
1903 batch->handler_private.extra_arg = this;
1904 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1905 FailPendingBatchInCallCombiner, batch,
1906 grpc_schedule_on_exec_ctx);
1907 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1908 "PendingBatchesFail");
1909 PendingBatchClear(pending);
1912 if (yield_call_combiner_predicate(closures)) {
1913 closures.RunClosures(call_combiner_);
1915 closures.RunClosuresWithoutYielding(call_combiner_);
1917 GRPC_ERROR_UNREF(error);
1920 // This is called via the call combiner, so access to calld is synchronized.
1921 void CallData::ResumePendingBatchInCallCombiner(void* arg,
1922 grpc_error* ignored) {
1923 grpc_transport_stream_op_batch* batch =
1924 static_cast<grpc_transport_stream_op_batch*>(arg);
1925 SubchannelCall* subchannel_call =
1926 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
1927 // Note: This will release the call combiner.
1928 subchannel_call->StartTransportStreamOpBatch(batch);
1931 // This is called via the call combiner, so access to calld is synchronized.
1932 void CallData::PendingBatchesResume(grpc_call_element* elem) {
1933 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1934 if (enable_retries_) {
1935 StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
1938 // Retries not enabled; send down batches as-is.
1939 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1940 size_t num_batches = 0;
1941 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1942 if (pending_batches_[i].batch != nullptr) ++num_batches;
1945 "chand=%p calld=%p: starting %" PRIuPTR
1946 " pending batches on subchannel_call=%p",
1947 chand, this, num_batches, subchannel_call_.get());
1949 CallCombinerClosureList closures;
1950 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1951 PendingBatch* pending = &pending_batches_[i];
1952 grpc_transport_stream_op_batch* batch = pending->batch;
1953 if (batch != nullptr) {
1954 if (batch->recv_trailing_metadata) {
1955 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
1958 batch->handler_private.extra_arg = subchannel_call_.get();
1959 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1960 ResumePendingBatchInCallCombiner, batch,
1961 grpc_schedule_on_exec_ctx);
1962 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1963 "PendingBatchesResume");
1964 PendingBatchClear(pending);
1967 // Note: This will release the call combiner.
1968 closures.RunClosures(call_combiner_);
1971 template <typename Predicate>
1972 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
1973 const char* log_message,
1974 Predicate predicate) {
1975 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1976 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1977 PendingBatch* pending = &pending_batches_[i];
1978 grpc_transport_stream_op_batch* batch = pending->batch;
1979 if (batch != nullptr && predicate(batch)) {
1980 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1982 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1983 this, log_message, i);
1995 void CallData::RetryCommit(grpc_call_element* elem,
1996 SubchannelCallRetryState* retry_state) {
1997 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1998 if (retry_committed_) return;
1999 retry_committed_ = true;
2000 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2001 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2003 if (retry_state != nullptr) {
2004 FreeCachedSendOpDataAfterCommit(elem, retry_state);
2008 void CallData::DoRetry(grpc_call_element* elem,
2009 SubchannelCallRetryState* retry_state,
2010 grpc_millis server_pushback_ms) {
2011 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2012 GPR_ASSERT(method_params_ != nullptr);
2013 const auto* retry_policy = method_params_->retry_policy();
2014 GPR_ASSERT(retry_policy != nullptr);
2015 // Reset subchannel call and connected subchannel.
2016 subchannel_call_.reset();
2017 pick_.pick.connected_subchannel.reset();
2018 // Compute backoff delay.
2019 grpc_millis next_attempt_time;
2020 if (server_pushback_ms >= 0) {
2021 next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
2022 last_attempt_got_server_pushback_ = true;
2024 if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
2025 retry_backoff_.Init(
2027 .set_initial_backoff(retry_policy->initial_backoff)
2028 .set_multiplier(retry_policy->backoff_multiplier)
2029 .set_jitter(RETRY_BACKOFF_JITTER)
2030 .set_max_backoff(retry_policy->max_backoff));
2031 last_attempt_got_server_pushback_ = false;
2033 next_attempt_time = retry_backoff_->NextAttemptTime();
2035 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2037 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
2038 this, next_attempt_time - ExecCtx::Get()->Now());
2040 // Schedule retry after computed delay.
2041 GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
2042 grpc_combiner_scheduler(chand->data_plane_combiner()));
2043 grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
2044 // Update bookkeeping.
2045 if (retry_state != nullptr) retry_state->retry_dispatched = true;
2048 bool CallData::MaybeRetry(grpc_call_element* elem,
2049 SubchannelCallBatchData* batch_data,
2050 grpc_status_code status,
2051 grpc_mdelem* server_pushback_md) {
2052 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2053 // Get retry policy.
2054 if (method_params_ == nullptr) return false;
2055 const auto* retry_policy = method_params_->retry_policy();
2056 if (retry_policy == nullptr) return false;
2057 // If we've already dispatched a retry from this call, return true.
2058 // This catches the case where the batch has multiple callbacks
2059 // (i.e., it includes either recv_message or recv_initial_metadata).
2060 SubchannelCallRetryState* retry_state = nullptr;
2061 if (batch_data != nullptr) {
2062 retry_state = static_cast<SubchannelCallRetryState*>(
2063 batch_data->subchannel_call->GetParentData());
2064 if (retry_state->retry_dispatched) {
2065 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2066 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
2073 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
2074 if (retry_throttle_data_ != nullptr) {
2075 retry_throttle_data_->RecordSuccess();
2077 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2078 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
2082 // Status is not OK. Check whether the status is retryable.
2083 if (!retry_policy->retryable_status_codes.Contains(status)) {
2084 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2086 "chand=%p calld=%p: status %s not configured as retryable", chand,
2087 this, grpc_status_code_to_string(status));
2091 // Record the failure and check whether retries are throttled.
2092 // Note that it's important for this check to come after the status
2093 // code check above, since we should only record failures whose statuses
2094 // match the configured retryable status codes, so that we don't count
2095 // things like failures due to malformed requests (INVALID_ARGUMENT).
2096 // Conversely, it's important for this to come before the remaining
2097 // checks, so that we don't fail to record failures due to other factors.
2098 if (retry_throttle_data_ != nullptr &&
2099 !retry_throttle_data_->RecordFailure()) {
2100 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2101 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
2105 // Check whether the call is committed.
2106 if (retry_committed_) {
2107 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2108 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
2113 // Check whether we have retries remaining.
2114 ++num_attempts_completed_;
2115 if (num_attempts_completed_ >= retry_policy->max_attempts) {
2116 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2117 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
2118 this, retry_policy->max_attempts);
2122 // If the call was cancelled from the surface, don't retry.
2123 if (cancel_error_ != GRPC_ERROR_NONE) {
2124 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2126 "chand=%p calld=%p: call cancelled from surface, not retrying",
2131 // Check server push-back.
2132 grpc_millis server_pushback_ms = -1;
2133 if (server_pushback_md != nullptr) {
2134 // If the value is "-1" or any other unparseable string, we do not retry.
2136 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
2137 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2139 "chand=%p calld=%p: not retrying due to server push-back",
2144 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2145 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
2148 server_pushback_ms = (grpc_millis)ms;
2151 DoRetry(elem, retry_state, server_pushback_ms);
// CallData::SubchannelCallBatchData
2159 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2160 grpc_call_element* elem, int refcount, bool set_on_complete) {
2161 CallData* calld = static_cast<CallData*>(elem->call_data);
2162 return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2166 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2167 grpc_call_element* elem, CallData* calld, int refcount,
2168 bool set_on_complete)
2169 : elem(elem), subchannel_call(calld->subchannel_call_) {
2170 SubchannelCallRetryState* retry_state =
2171 static_cast<SubchannelCallRetryState*>(
2172 calld->subchannel_call_->GetParentData());
2173 batch.payload = &retry_state->batch_payload;
2174 gpr_ref_init(&refs, refcount);
2175 if (set_on_complete) {
2176 GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2177 grpc_schedule_on_exec_ctx);
2178 batch.on_complete = &on_complete;
2180 GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2183 void CallData::SubchannelCallBatchData::Destroy() {
2184 SubchannelCallRetryState* retry_state =
2185 static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2186 if (batch.send_initial_metadata) {
2187 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2189 if (batch.send_trailing_metadata) {
2190 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2192 if (batch.recv_initial_metadata) {
2193 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2195 if (batch.recv_trailing_metadata) {
2196 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2198 subchannel_call.reset();
2199 CallData* calld = static_cast<CallData*>(elem->call_data);
2200 GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
// recv_initial_metadata callback handling
2207 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2208 SubchannelCallBatchData* batch_data =
2209 static_cast<SubchannelCallBatchData*>(arg);
2210 CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2211 // Find pending batch.
2212 PendingBatch* pending = calld->PendingBatchFind(
2213 batch_data->elem, "invoking recv_initial_metadata_ready for",
2214 [](grpc_transport_stream_op_batch* batch) {
2215 return batch->recv_initial_metadata &&
2216 batch->payload->recv_initial_metadata
2217 .recv_initial_metadata_ready != nullptr;
2219 GPR_ASSERT(pending != nullptr);
2221 SubchannelCallRetryState* retry_state =
2222 static_cast<SubchannelCallRetryState*>(
2223 batch_data->subchannel_call->GetParentData());
2224 grpc_metadata_batch_move(
2225 &retry_state->recv_initial_metadata,
2226 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2227 // Update bookkeeping.
2228 // Note: Need to do this before invoking the callback, since invoking
2229 // the callback will result in yielding the call combiner.
2230 grpc_closure* recv_initial_metadata_ready =
2231 pending->batch->payload->recv_initial_metadata
2232 .recv_initial_metadata_ready;
2233 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2235 calld->MaybeClearPendingBatch(batch_data->elem, pending);
2236 batch_data->Unref();
2238 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2241 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2242 SubchannelCallBatchData* batch_data =
2243 static_cast<SubchannelCallBatchData*>(arg);
2244 grpc_call_element* elem = batch_data->elem;
2245 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2246 CallData* calld = static_cast<CallData*>(elem->call_data);
2247 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2249 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2250 chand, calld, grpc_error_string(error));
2252 SubchannelCallRetryState* retry_state =
2253 static_cast<SubchannelCallRetryState*>(
2254 batch_data->subchannel_call->GetParentData());
2255 retry_state->completed_recv_initial_metadata = true;
2256 // If a retry was already dispatched, then we're not going to use the
2257 // result of this recv_initial_metadata op, so do nothing.
2258 if (retry_state->retry_dispatched) {
2259 GRPC_CALL_COMBINER_STOP(
2260 calld->call_combiner_,
2261 "recv_initial_metadata_ready after retry dispatched");
2264 // If we got an error or a Trailers-Only response and have not yet gotten
2265 // the recv_trailing_metadata_ready callback, then defer propagating this
2266 // callback back to the surface. We can evaluate whether to retry when
2267 // recv_trailing_metadata comes back.
2268 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2269 error != GRPC_ERROR_NONE) &&
2270 !retry_state->completed_recv_trailing_metadata)) {
2271 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2273 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2277 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2278 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2279 if (!retry_state->started_recv_trailing_metadata) {
2280 // recv_trailing_metadata not yet started by application; start it
2281 // ourselves to get status.
2282 calld->StartInternalRecvTrailingMetadata(elem);
2284 GRPC_CALL_COMBINER_STOP(
2285 calld->call_combiner_,
2286 "recv_initial_metadata_ready trailers-only or error");
2290 // Received valid initial metadata, so commit the call.
2291 calld->RetryCommit(elem, retry_state);
2292 // Invoke the callback to return the result to the surface.
2293 // Manually invoking a callback function; it does not take ownership of error.
2294 calld->InvokeRecvInitialMetadataCallback(batch_data, error);
// recv_message callback handling
2301 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2302 SubchannelCallBatchData* batch_data =
2303 static_cast<SubchannelCallBatchData*>(arg);
2304 CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2306 PendingBatch* pending = calld->PendingBatchFind(
2307 batch_data->elem, "invoking recv_message_ready for",
2308 [](grpc_transport_stream_op_batch* batch) {
2309 return batch->recv_message &&
2310 batch->payload->recv_message.recv_message_ready != nullptr;
2312 GPR_ASSERT(pending != nullptr);
2314 SubchannelCallRetryState* retry_state =
2315 static_cast<SubchannelCallRetryState*>(
2316 batch_data->subchannel_call->GetParentData());
2317 *pending->batch->payload->recv_message.recv_message =
2318 std::move(retry_state->recv_message);
2319 // Update bookkeeping.
2320 // Note: Need to do this before invoking the callback, since invoking
2321 // the callback will result in yielding the call combiner.
2322 grpc_closure* recv_message_ready =
2323 pending->batch->payload->recv_message.recv_message_ready;
2324 pending->batch->payload->recv_message.recv_message_ready = nullptr;
2325 calld->MaybeClearPendingBatch(batch_data->elem, pending);
2326 batch_data->Unref();
2328 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2331 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2332 SubchannelCallBatchData* batch_data =
2333 static_cast<SubchannelCallBatchData*>(arg);
2334 grpc_call_element* elem = batch_data->elem;
2335 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2336 CallData* calld = static_cast<CallData*>(elem->call_data);
2337 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2338 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2339 chand, calld, grpc_error_string(error));
2341 SubchannelCallRetryState* retry_state =
2342 static_cast<SubchannelCallRetryState*>(
2343 batch_data->subchannel_call->GetParentData());
2344 ++retry_state->completed_recv_message_count;
2345 // If a retry was already dispatched, then we're not going to use the
2346 // result of this recv_message op, so do nothing.
2347 if (retry_state->retry_dispatched) {
2348 GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2349 "recv_message_ready after retry dispatched");
2352 // If we got an error or the payload was nullptr and we have not yet gotten
2353 // the recv_trailing_metadata_ready callback, then defer propagating this
2354 // callback back to the surface. We can evaluate whether to retry when
2355 // recv_trailing_metadata comes back.
2357 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2358 !retry_state->completed_recv_trailing_metadata)) {
2359 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2361 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2362 "message and recv_trailing_metadata pending)",
2365 retry_state->recv_message_ready_deferred_batch = batch_data;
2366 retry_state->recv_message_error = GRPC_ERROR_REF(error);
2367 if (!retry_state->started_recv_trailing_metadata) {
2368 // recv_trailing_metadata not yet started by application; start it
2369 // ourselves to get status.
2370 calld->StartInternalRecvTrailingMetadata(elem);
2372 GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2376 // Received a valid message, so commit the call.
2377 calld->RetryCommit(elem, retry_state);
2378 // Invoke the callback to return the result to the surface.
2379 // Manually invoking a callback function; it does not take ownership of error.
2380 calld->InvokeRecvMessageCallback(batch_data, error);
// recv_trailing_metadata handling
2387 void CallData::GetCallStatus(grpc_call_element* elem,
2388 grpc_metadata_batch* md_batch, grpc_error* error,
2389 grpc_status_code* status,
2390 grpc_mdelem** server_pushback_md) {
2391 if (error != GRPC_ERROR_NONE) {
2392 grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2394 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2396 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2397 if (server_pushback_md != nullptr &&
2398 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2399 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2402 GRPC_ERROR_UNREF(error);
2405 void CallData::AddClosureForRecvTrailingMetadataReady(
2406 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2407 grpc_error* error, CallCombinerClosureList* closures) {
2408 // Find pending batch.
2409 PendingBatch* pending = PendingBatchFind(
2410 elem, "invoking recv_trailing_metadata for",
2411 [](grpc_transport_stream_op_batch* batch) {
2412 return batch->recv_trailing_metadata &&
2413 batch->payload->recv_trailing_metadata
2414 .recv_trailing_metadata_ready != nullptr;
2416 // If we generated the recv_trailing_metadata op internally via
2417 // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2418 if (pending == nullptr) {
2419 GRPC_ERROR_UNREF(error);
2423 SubchannelCallRetryState* retry_state =
2424 static_cast<SubchannelCallRetryState*>(
2425 batch_data->subchannel_call->GetParentData());
2426 grpc_metadata_batch_move(
2427 &retry_state->recv_trailing_metadata,
2428 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2430 closures->Add(pending->batch->payload->recv_trailing_metadata
2431 .recv_trailing_metadata_ready,
2432 error, "recv_trailing_metadata_ready for pending batch");
2433 // Update bookkeeping.
2434 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2436 MaybeClearPendingBatch(elem, pending);
// Adds to `closures` any recv_initial_metadata_ready and recv_message_ready
// callbacks that were previously deferred on `retry_state`, then resets the
// corresponding deferred-batch pointers to nullptr.  The deferred callbacks
// are only considered when `batch_data` contains a recv_trailing_metadata op.
// NOTE(review): this listing appears to have lost some short lines (the
// `nullptr)) {` halves of the conditions and the closing braces); the
// comments describe only the statements that are visible.
2439 void CallData::AddClosuresForDeferredRecvCallbacks(
2440 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2441 CallCombinerClosureList* closures) {
2442 if (batch_data->batch.recv_trailing_metadata) {
2443 // Add closure for deferred recv_initial_metadata_ready.
// The closure stored in retry_state is re-initialized to run
// InvokeRecvInitialMetadataCallback with the deferred batch as its argument,
// and is queued together with the saved recv_initial_metadata_error.
2444 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2446 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2447 InvokeRecvInitialMetadataCallback,
2448 retry_state->recv_initial_metadata_ready_deferred_batch,
2449 grpc_schedule_on_exec_ctx);
2450 closures->Add(&retry_state->recv_initial_metadata_ready,
2451 retry_state->recv_initial_metadata_error,
2452 "resuming recv_initial_metadata_ready");
2453 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2455 // Add closure for deferred recv_message_ready.
// Same pattern as above, using InvokeRecvMessageCallback and the saved
// recv_message_error.
2456 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2458 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2459 InvokeRecvMessageCallback,
2460 retry_state->recv_message_ready_deferred_batch,
2461 grpc_schedule_on_exec_ctx);
2462 closures->Add(&retry_state->recv_message_ready,
2463 retry_state->recv_message_error,
2464 "resuming recv_message_ready");
2465 retry_state->recv_message_ready_deferred_batch = nullptr;
// Tests whether `pending` holds a batch containing a send op that has not
// yet been started on the subchannel call described by `retry_state`.
// NOTE(review): the return statements of each branch are not visible in this
// listing.  Presumably the first check returns false and each of the three
// send-op checks returns true, with a final `return false` -- confirm
// against the complete file.
2470 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2471 SubchannelCallRetryState* retry_state) {
// Empty slot, or a batch without an on_complete callback.
2472 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
// send_initial_metadata present in the batch but not yet started.
2475 if (pending->batch->send_initial_metadata &&
2476 !retry_state->started_send_initial_metadata) {
// send_message present and fewer messages started than are held in
// send_messages_.
2479 if (pending->batch->send_message &&
2480 retry_state->started_send_message_count < send_messages_.size()) {
// send_trailing_metadata present in the batch but not yet started.
2483 if (pending->batch->send_trailing_metadata &&
2484 !retry_state->started_send_trailing_metadata) {
// For every entry of pending_batches_ that PendingBatchIsUnstarted() reports
// as unstarted, queues its on_complete callback on `closures` with `error`
// and then clears on_complete on the batch.
// Each queued closure receives its own ref to `error`; the ref passed in by
// the caller is released at the end of the function.
2490 void CallData::AddClosuresToFailUnstartedPendingBatches(
2491 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2492 grpc_error* error, CallCombinerClosureList* closures) {
2493 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2494 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2495 PendingBatch* pending = &pending_batches_[i];
2496 if (PendingBatchIsUnstarted(pending, retry_state)) {
// Trace logging only.  NOTE(review): the gpr_log call head and its argument
// lines are not visible in this listing.
2497 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2499 "chand=%p calld=%p: failing unstarted pending batch at index "
2503 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2504 "failing on_complete for pending batch");
// on_complete has been handed to the closure list; null it out before
// giving MaybeClearPendingBatch() a chance to release the slot.
2505 pending->batch->on_complete = nullptr;
2506 MaybeClearPendingBatch(elem, pending);
// Drop the ref that was passed in by the caller.
2509 GRPC_ERROR_UNREF(error);
// Builds and runs the full list of closures needed once the call has
// completed: the recv_trailing_metadata_ready callback, any deferred recv
// callbacks, and failure callbacks for pending batches that never started.
// Each helper is given its own ref to `error`; the ref passed in by the
// caller is released at the end, after the closures have been run.
2512 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2513 grpc_error* error) {
2514 grpc_call_element* elem = batch_data->elem;
// The per-attempt retry state is stored as the subchannel call's parent data.
2515 SubchannelCallRetryState* retry_state =
2516 static_cast<SubchannelCallRetryState*>(
2517 batch_data->subchannel_call->GetParentData());
2518 // Construct list of closures to execute.
2519 CallCombinerClosureList closures;
2520 // First, add closure for recv_trailing_metadata_ready.
2521 AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2522 GRPC_ERROR_REF(error), &closures);
2523 // If there are deferred recv_initial_metadata_ready or recv_message_ready
2524 // callbacks, add them to closures.
2525 AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2526 // Add closures to fail any pending batches that have not yet been started.
2527 AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2528 GRPC_ERROR_REF(error), &closures);
2529 // Don't need batch_data anymore.
2530 batch_data->Unref();
2531 // Schedule all of the closures identified above.
2532 // Note: This will release the call combiner.
2533 closures.RunClosures(call_combiner_);
2534 GRPC_ERROR_UNREF(error);
// Closure callback invoked when recv_trailing_metadata completes on the
// subchannel call.  `arg` is the SubchannelCallBatchData for the batch.
// Marks recv_trailing_metadata as completed on the retry state, extracts the
// call status and any server pushback metadata, and then either hands off to
// the retry path (MaybeRetry) or commits the call and runs the completion
// closures.
// NOTE(review): closing braces and the `return;` that presumably ends the
// retry branch are not visible in this listing.
2537 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
2538 SubchannelCallBatchData* batch_data =
2539 static_cast<SubchannelCallBatchData*>(arg);
2540 grpc_call_element* elem = batch_data->elem;
2541 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2542 CallData* calld = static_cast<CallData*>(elem->call_data);
2543 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2545 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
2546 chand, calld, grpc_error_string(error));
2548 SubchannelCallRetryState* retry_state =
2549 static_cast<SubchannelCallRetryState*>(
2550 batch_data->subchannel_call->GetParentData());
2551 retry_state->completed_recv_trailing_metadata = true;
2552 // Get the call's status and check for server pushback metadata.
2553 grpc_status_code status = GRPC_STATUS_OK;
2554 grpc_mdelem* server_pushback_md = nullptr;
2555 grpc_metadata_batch* md_batch =
2556 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
// GetCallStatus() is handed its own ref to `error`.
2557 calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
2558 &server_pushback_md);
2559 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2560 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2561 calld, grpc_status_code_to_string(status));
2563 // Check if we should retry.
2564 if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
2565 // Unref batch_data for deferred recv_initial_metadata_ready or
2566 // recv_message_ready callbacks, if any.
// Each deferred callback accounts for one batch_data ref and one saved
// error ref; both are dropped here for every deferred callback present.
2567 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
2568 batch_data->Unref();
2569 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2571 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2572 batch_data->Unref();
2573 GRPC_ERROR_UNREF(retry_state->recv_message_error);
// Unref for this recv_trailing_metadata_ready callback itself.
2575 batch_data->Unref();
2578 // Not retrying, so commit the call.
2579 calld->RetryCommit(elem, retry_state);
2580 // Run any necessary closures.
2581 calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
2585 // on_complete callback handling
// Locates the pending batch that corresponds to the just-completed
// subchannel batch `batch_data` and, if one exists, queues its on_complete
// callback on `closures` with `error`.
// When a pending batch is found, `error` is passed on to the queued closure;
// when none is found, `error` is unreffed here.
// NOTE(review): the `return;` after the unref and the closing braces are not
// visible in this listing.
2588 void CallData::AddClosuresForCompletedPendingBatch(
2589 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2590 SubchannelCallRetryState* retry_state, grpc_error* error,
2591 CallCombinerClosureList* closures) {
2592 PendingBatch* pending = PendingBatchFind(
2593 elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2594 // Match the pending batch with the same set of send ops as the
2595 // subchannel batch we've just completed.
// Batches whose on_complete is already null are never matched.
2596 return batch->on_complete != nullptr &&
2597 batch_data->batch.send_initial_metadata ==
2598 batch->send_initial_metadata &&
2599 batch_data->batch.send_message == batch->send_message &&
2600 batch_data->batch.send_trailing_metadata ==
2601 batch->send_trailing_metadata;
2603 // If batch_data is a replay batch, then there will be no pending
2604 // batch to complete.
2605 if (pending == nullptr) {
2606 GRPC_ERROR_UNREF(error);
// Queue the surface's on_complete, then clear it on the batch before
// giving MaybeClearPendingBatch() a chance to release the slot.
2610 closures->Add(pending->batch->on_complete, error,
2611 "on_complete for pending batch");
2612 pending->batch->on_complete = nullptr;
2613 MaybeClearPendingBatch(elem, pending);
// If there are send ops still to be started on the current subchannel call
// (either cached ops awaiting replay or ops in pending batches that have not
// been cached yet), queues a closure on `closures` that runs
// StartRetriableSubchannelBatches for `elem`.
2616 void CallData::AddClosuresForReplayOrPendingSendOps(
2617 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2618 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
2619 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// First look at cached send ops: messages held in send_messages_ that have
// not been started on this attempt, and a seen-but-unstarted
// send_trailing_metadata.
2620 bool have_pending_send_message_ops =
2621 retry_state->started_send_message_count < send_messages_.size();
2622 bool have_pending_send_trailing_metadata_op =
2623 seen_send_trailing_metadata_ &&
2624 !retry_state->started_send_trailing_metadata;
// Nothing cached to replay: scan the pending batches for send ops that have
// not been cached yet (entries with send_ops_cached set are skipped).
2625 if (!have_pending_send_message_ops &&
2626 !have_pending_send_trailing_metadata_op) {
2627 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2628 PendingBatch* pending = &pending_batches_[i];
2629 grpc_transport_stream_op_batch* batch = pending->batch;
2630 if (batch == nullptr || pending->send_ops_cached) continue;
2631 if (batch->send_message) have_pending_send_message_ops = true;
2632 if (batch->send_trailing_metadata) {
2633 have_pending_send_trailing_metadata_op = true;
2637 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2638 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2640 "chand=%p calld=%p: starting next batch for pending send op(s)",
// The closure embedded in the completed batch's handler_private is reused
// to schedule the next round of retriable batches.
2643 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2644 StartRetriableSubchannelBatches, elem,
2645 grpc_schedule_on_exec_ctx);
2646 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2647 "starting next batch for send_* op(s)");
// Closure callback invoked when a subchannel batch containing send ops
// completes.  `arg` is the SubchannelCallBatchData for the batch.
// Updates the send-op bookkeeping in the retry state, frees cached send-op
// data if the call is already committed, and, unless a retry has already
// been dispatched, queues the surface's on_complete callback plus (if
// recv_trailing_metadata has not completed) a closure to start further send
// ops.  Also maintains num_pending_retriable_subchannel_send_batches_ and
// drops the "subchannel_send_batches" call stack ref when it reaches zero.
2651 void CallData::OnComplete(void* arg, grpc_error* error) {
2652 SubchannelCallBatchData* batch_data =
2653 static_cast<SubchannelCallBatchData*>(arg);
2654 grpc_call_element* elem = batch_data->elem;
2655 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2656 CallData* calld = static_cast<CallData*>(elem->call_data);
2657 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
// batch_str is freed with gpr_free() right after logging.
2658 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2659 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2660 chand, calld, grpc_error_string(error), batch_str);
2661 gpr_free(batch_str);
2663 SubchannelCallRetryState* retry_state =
2664 static_cast<SubchannelCallRetryState*>(
2665 batch_data->subchannel_call->GetParentData());
2666 // Update bookkeeping in retry_state.
2667 if (batch_data->batch.send_initial_metadata) {
2668 retry_state->completed_send_initial_metadata = true;
2670 if (batch_data->batch.send_message) {
2671 ++retry_state->completed_send_message_count;
2673 if (batch_data->batch.send_trailing_metadata) {
2674 retry_state->completed_send_trailing_metadata = true;
2676 // If the call is committed, free cached data for send ops that we've just
2678 if (calld->retry_committed_) {
2679 calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
2681 // Construct list of closures to execute.
2682 CallCombinerClosureList closures;
2683 // If a retry was already dispatched, that means we saw
2684 // recv_trailing_metadata before this, so we do nothing here.
2685 // Otherwise, invoke the callback to return the result to the surface.
2686 if (!retry_state->retry_dispatched) {
2687 // Add closure for the completed pending batch, if any.
2688 calld->AddClosuresForCompletedPendingBatch(
2689 elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
2690 // If needed, add a callback to start any replay or pending send ops on
2691 // the subchannel call.
2692 if (!retry_state->completed_recv_trailing_metadata) {
2693 calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
2697 // Track number of pending subchannel send batches and determine if this
2698 // was the last one.
// The result is captured in a local before the closures are run below.
2699 --calld->num_pending_retriable_subchannel_send_batches_;
2700 const bool last_send_batch_complete =
2701 calld->num_pending_retriable_subchannel_send_batches_ == 0;
2702 // Don't need batch_data anymore.
2703 batch_data->Unref();
2704 // Schedule all of the closures identified above.
2705 // Note: This yields the call combiner.
2706 closures.RunClosures(calld->call_combiner_);
2707 // If this was the last subchannel send batch, unref the call stack.
2708 if (last_send_batch_complete) {
2709 GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
2714 // subchannel batch construction
// Closure callback that starts a transport stream op batch on a subchannel
// call.  `arg` is the batch; the target SubchannelCall is read from
// batch->handler_private.extra_arg (set by AddClosureForSubchannelBatch()).
// The error argument is unused.
2717 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2718 grpc_transport_stream_op_batch* batch =
2719 static_cast<grpc_transport_stream_op_batch*>(arg);
2720 SubchannelCall* subchannel_call =
2721 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2722 // Note: This will release the call combiner.
2723 subchannel_call->StartTransportStreamOpBatch(batch);
// Queues on `closures` a closure that will start `batch` on the current
// subchannel call (subchannel_call_) via StartBatchInCallCombiner().
// The subchannel call pointer is stashed in batch->handler_private.extra_arg
// and the batch's own handler_private.closure is used as the closure.
2726 void CallData::AddClosureForSubchannelBatch(
2727 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2728 CallCombinerClosureList* closures) {
2729 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2730 batch->handler_private.extra_arg = subchannel_call_.get();
2731 GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
2732 batch, grpc_schedule_on_exec_ctx);
// Trace logging only; batch_str is freed with gpr_free() after use.
2733 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2734 char* batch_str = grpc_transport_stream_op_batch_string(batch);
2735 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2737 gpr_free(batch_str);
2739 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2740 "start_subchannel_batch");
// Populates `batch_data` with a send_initial_metadata op for the current
// attempt.  A per-attempt copy of send_initial_metadata_ is made in
// retry_state (backed by arena-allocated storage); any existing
// grpc-previous-rpc-attempts entry is removed from the copy, and a fresh one
// is appended when num_attempts_completed_ > 0.  Marks
// started_send_initial_metadata on the retry state.
// NOTE(review): some argument lines of the grpc_metadata_batch_add_tail()
// call and whatever follows the error log are not visible in this listing.
2743 void CallData::AddRetriableSendInitialMetadataOp(
2744 SubchannelCallRetryState* retry_state,
2745 SubchannelCallBatchData* batch_data) {
2746 // Maps the number of retries to the corresponding metadata value slice.
// The table has four entries and is indexed with
// num_attempts_completed_ - 1 below, so this assumes
// num_attempts_completed_ <= 4 -- TODO confirm the cap enforced elsewhere.
2747 static const grpc_slice* retry_count_strings[] = {
2748 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2749 // We need to make a copy of the metadata batch for each attempt, since
2750 // the filters in the subchannel stack may modify this batch, and we don't
2751 // want those modifications to be passed forward to subsequent attempts.
2753 // If we've already completed one or more attempts, add the
2754 // grpc-retry-attempts header.
// Storage is sized for every element of the original batch plus one extra
// slot when the retry-attempts header will be appended.
2755 retry_state->send_initial_metadata_storage =
2756 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2757 sizeof(grpc_linked_mdelem) *
2758 (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
2759 grpc_metadata_batch_copy(&send_initial_metadata_,
2760 &retry_state->send_initial_metadata,
2761 retry_state->send_initial_metadata_storage);
// Remove any grpc-previous-rpc-attempts entry that is already present in
// the copied batch.
2762 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2763 .grpc_previous_rpc_attempts != nullptr)) {
2764 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2765 retry_state->send_initial_metadata.idx.named
2766 .grpc_previous_rpc_attempts);
// Append the retry-attempts header, using the extra storage slot located
// just past the copied elements.
2768 if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
2769 grpc_mdelem retry_md = grpc_mdelem_create(
2770 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2771 *retry_count_strings[num_attempts_completed_ - 1], nullptr);
2772 grpc_error* error = grpc_metadata_batch_add_tail(
2773 &retry_state->send_initial_metadata,
2775 ->send_initial_metadata_storage[send_initial_metadata_.list.count],
2777 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2778 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2779 grpc_error_string(error));
// Record that the op has been started and point the batch payload at the
// per-attempt copy, together with the flags and peer_string saved on the
// call.
2783 retry_state->started_send_initial_metadata = true;
2784 batch_data->batch.send_initial_metadata = true;
2785 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2786 &retry_state->send_initial_metadata;
2787 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2788 send_initial_metadata_flags_;
2789 batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
// Populates `batch_data` with a send_message op for the next unsent cached
// message: takes the ByteStreamCache at index started_send_message_count in
// send_messages_, increments that counter, and initializes
// retry_state->send_message from the cache.
2792 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2793 SubchannelCallRetryState* retry_state,
2794 SubchannelCallBatchData* batch_data) {
2795 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2796 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2798 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2799 chand, this, retry_state->started_send_message_count);
// NOTE(review): no bounds check is visible here; callers are presumably
// expected to ensure started_send_message_count < send_messages_.size().
2801 ByteStreamCache* cache =
2802 send_messages_[retry_state->started_send_message_count];
2803 ++retry_state->started_send_message_count;
2804 retry_state->send_message.Init(cache);
2805 batch_data->batch.send_message = true;
// The payload's send_message is reset to point at the stream object held
// in retry_state.
2806 batch_data->batch.payload->send_message.send_message.reset(
2807 retry_state->send_message.get());
// Populates `batch_data` with a send_trailing_metadata op for the current
// attempt, using a per-attempt copy of send_trailing_metadata_ stored in
// retry_state (backed by arena-allocated storage), and marks
// started_send_trailing_metadata on the retry state.
2810 void CallData::AddRetriableSendTrailingMetadataOp(
2811 SubchannelCallRetryState* retry_state,
2812 SubchannelCallBatchData* batch_data) {
2813 // We need to make a copy of the metadata batch for each attempt, since
2814 // the filters in the subchannel stack may modify this batch, and we don't
2815 // want those modifications to be passed forward to subsequent attempts.
// One grpc_linked_mdelem of storage per element in the original batch.
2816 retry_state->send_trailing_metadata_storage =
2817 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2818 sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
2819 grpc_metadata_batch_copy(&send_trailing_metadata_,
2820 &retry_state->send_trailing_metadata,
2821 retry_state->send_trailing_metadata_storage);
2822 retry_state->started_send_trailing_metadata = true;
2823 batch_data->batch.send_trailing_metadata = true;
2824 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2825 &retry_state->send_trailing_metadata;
// Populates `batch_data` with a recv_initial_metadata op whose results are
// written into `retry_state` (metadata batch and trailing_metadata_available
// flag) and whose completion callback is RecvInitialMetadataReady with
// `batch_data` as its argument.  Marks started_recv_initial_metadata.
2828 void CallData::AddRetriableRecvInitialMetadataOp(
2829 SubchannelCallRetryState* retry_state,
2830 SubchannelCallBatchData* batch_data) {
2831 retry_state->started_recv_initial_metadata = true;
2832 batch_data->batch.recv_initial_metadata = true;
// Initialize the per-attempt metadata batch that will receive the result.
2833 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2834 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2835 &retry_state->recv_initial_metadata;
2836 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2837 &retry_state->trailing_metadata_available;
// Completion is intercepted by RecvInitialMetadataReady.
2838 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2839 RecvInitialMetadataReady, batch_data,
2840 grpc_schedule_on_exec_ctx);
2841 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2842 &retry_state->recv_initial_metadata_ready;
// Populates `batch_data` with a recv_message op whose result is written into
// retry_state->recv_message and whose completion callback is
// RecvMessageReady with `batch_data` as its argument.  Increments
// started_recv_message_count.
2845 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
2846 SubchannelCallBatchData* batch_data) {
2847 ++retry_state->started_recv_message_count;
2848 batch_data->batch.recv_message = true;
2849 batch_data->batch.payload->recv_message.recv_message =
2850 &retry_state->recv_message;
// Completion is intercepted by RecvMessageReady.
2851 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
2852 batch_data, grpc_schedule_on_exec_ctx);
2853 batch_data->batch.payload->recv_message.recv_message_ready =
2854 &retry_state->recv_message_ready;
// Populates `batch_data` with a recv_trailing_metadata op whose results are
// written into `retry_state` (metadata batch and collect_stats) and whose
// completion callback is RecvTrailingMetadataReady with `batch_data` as its
// argument.  Marks started_recv_trailing_metadata, then passes the batch to
// MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy() along with
// the current pick.
2857 void CallData::AddRetriableRecvTrailingMetadataOp(
2858 SubchannelCallRetryState* retry_state,
2859 SubchannelCallBatchData* batch_data) {
2860 retry_state->started_recv_trailing_metadata = true;
2861 batch_data->batch.recv_trailing_metadata = true;
// Initialize the per-attempt metadata batch that will receive the result.
2862 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2863 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2864 &retry_state->recv_trailing_metadata;
2865 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2866 &retry_state->collect_stats;
// Completion is intercepted by RecvTrailingMetadataReady.
2867 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2868 RecvTrailingMetadataReady, batch_data,
2869 grpc_schedule_on_exec_ctx);
2870 batch_data->batch.payload->recv_trailing_metadata
2871 .recv_trailing_metadata_ready =
2872 &retry_state->recv_trailing_metadata_ready;
// This call comes after our own callback has been installed on the batch.
2873 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
2874 pick_.pick, &batch_data->batch);
// Starts a recv_trailing_metadata op on the current subchannel call that was
// not requested by the surface.  Per the trace message below, this is used
// when the call failed but recv_trailing_metadata had not been started.
// The internally created batch is remembered in
// retry_state->recv_trailing_metadata_internal_batch.
2877 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
2878 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2879 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2881 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2882 "started; starting it internally",
2885 SubchannelCallRetryState* retry_state =
2886 static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
2887 // Create batch_data with 2 refs, since this batch will be unreffed twice:
2888 // once for the recv_trailing_metadata_ready callback when the subchannel
2889 // batch returns, and again when we actually get a recv_trailing_metadata
2890 // op from the surface.
2891 SubchannelCallBatchData* batch_data =
2892 SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
2893 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
2894 retry_state->recv_trailing_metadata_internal_batch = batch_data;
2895 // Note: This will release the call combiner.
2896 subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
2899 // If there are any cached send ops that need to be replayed on the
2900 // current subchannel call, creates and returns a new subchannel batch
2901 // to replay those ops. Otherwise, returns nullptr.
// Up to three send ops (initial metadata, one message, trailing metadata)
// may be combined into the single returned batch.  An op is only replayed
// if it is not currently present in a pending batch (the pending_send_*_
// flags).
// NOTE(review): the `replay_batch_data =` assignment heads in front of the
// SubchannelCallBatchData::Create() calls are not visible in this listing.
2902 CallData::SubchannelCallBatchData*
2903 CallData::MaybeCreateSubchannelBatchForReplay(
2904 grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
2905 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2906 SubchannelCallBatchData* replay_batch_data = nullptr;
2907 // send_initial_metadata.
// Replay if it was seen on the call, has not been started on this attempt,
// and is not in a pending batch.
2908 if (seen_send_initial_metadata_ &&
2909 !retry_state->started_send_initial_metadata &&
2910 !pending_send_initial_metadata_) {
2911 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2913 "chand=%p calld=%p: replaying previously completed "
2914 "send_initial_metadata op",
2918 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2919 AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
2922 // Note that we can only have one send_message op in flight at a time.
// Replay the next cached message only if every message started so far has
// also completed and no send_message is in a pending batch.
2923 if (retry_state->started_send_message_count < send_messages_.size() &&
2924 retry_state->started_send_message_count ==
2925 retry_state->completed_send_message_count &&
2926 !pending_send_message_) {
2927 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2929 "chand=%p calld=%p: replaying previously completed "
// Create the replay batch lazily if the previous step did not create one.
2933 if (replay_batch_data == nullptr) {
2935 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2937 AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
2939 // send_trailing_metadata.
2940 // Note that we only add this op if we have no more send_message ops
2941 // to start, since we can't send down any more send_message ops after
2942 // send_trailing_metadata.
2943 if (seen_send_trailing_metadata_ &&
2944 retry_state->started_send_message_count == send_messages_.size() &&
2945 !retry_state->started_send_trailing_metadata &&
2946 !pending_send_trailing_metadata_) {
2947 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2949 "chand=%p calld=%p: replaying previously completed "
2950 "send_trailing_metadata op",
2953 if (replay_batch_data == nullptr) {
2955 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
2957 AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
// nullptr when none of the three conditions above held.
2959 return replay_batch_data;
// Walks pending_batches_ and, for each batch that can be started now on the
// current subchannel call, queues a closure on `closures` to start it.
// If retries are not in effect for the call (no method params, no retry
// policy, or the call is already committed), the surface batch is sent
// as-is and its pending slot is cleared.  Otherwise a new
// SubchannelCallBatchData is built that mirrors the ops in the surface batch
// using the AddRetriable*Op() helpers.
// NOTE(review): the `continue;` statements that presumably end each "skip"
// check, and the closing braces, are not visible in this listing.
2962 void CallData::AddSubchannelBatchesForPendingBatches(
2963 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2964 CallCombinerClosureList* closures) {
2965 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2966 PendingBatch* pending = &pending_batches_[i];
2967 grpc_transport_stream_op_batch* batch = pending->batch;
2968 if (batch == nullptr) continue;
2969 // Skip any batch that either (a) has already been started on this
2970 // subchannel call or (b) we can't start yet because we're still
2971 // replaying send ops that need to be completed first.
2972 // TODO(roth): Note that if any one op in the batch can't be sent
2973 // yet due to ops that we're replaying, we don't start any of the ops
2974 // in the batch. This is probably okay, but it could conceivably
2975 // lead to increased latency in some cases -- e.g., we could delay
2976 // starting a recv op due to it being in the same batch with a send
2977 // op. If/when we revamp the callback protocol in
2978 // transport_stream_op_batch, we may be able to fix this.
// send_initial_metadata already started on this attempt.
2979 if (batch->send_initial_metadata &&
2980 retry_state->started_send_initial_metadata) {
// A previously started send_message has not completed yet.
2983 if (batch->send_message && retry_state->completed_send_message_count <
2984 retry_state->started_send_message_count) {
2987 // Note that we only start send_trailing_metadata if we have no more
2988 // send_message ops to start, since we can't send down any more
2989 // send_message ops after send_trailing_metadata.
// batch->send_message is added to the started count so that a message
// carried in this same batch is counted as started.
2990 if (batch->send_trailing_metadata &&
2991 (retry_state->started_send_message_count + batch->send_message <
2992 send_messages_.size() ||
2993 retry_state->started_send_trailing_metadata)) {
// recv_initial_metadata already started on this attempt.
2996 if (batch->recv_initial_metadata &&
2997 retry_state->started_recv_initial_metadata) {
// A previously started recv_message has not completed yet.
3000 if (batch->recv_message && retry_state->completed_recv_message_count <
3001 retry_state->started_recv_message_count) {
3004 if (batch->recv_trailing_metadata &&
3005 retry_state->started_recv_trailing_metadata) {
3006 // If we previously completed a recv_trailing_metadata op
3007 // initiated by StartInternalRecvTrailingMetadata(), use the
3008 // result of that instead of trying to re-start this op.
3009 if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3011 // If the batch completed, then trigger the completion callback
3012 // directly, so that we return the previously returned results to
3013 // the application. Otherwise, just unref the internally
3014 // started subchannel batch, since we'll propagate the
3015 // completion when it completes.
3016 if (retry_state->completed_recv_trailing_metadata) {
3017 // Batches containing recv_trailing_metadata always succeed.
3019 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3020 "re-executing recv_trailing_metadata_ready to propagate "
3021 "internally triggered result");
3023 retry_state->recv_trailing_metadata_internal_batch->Unref();
// The internal batch has been dealt with either way; forget it.
3025 retry_state->recv_trailing_metadata_internal_batch = nullptr;
3029 // If we're not retrying, just send the batch as-is.
3030 if (method_params_ == nullptr ||
3031 method_params_->retry_policy() == nullptr || retry_committed_) {
3032 // TODO(roth) : We should probably call
3033 // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3034 AddClosureForSubchannelBatch(elem, batch, closures);
3035 PendingBatchClear(pending);
3038 // Create batch with the right number of callbacks.
// One callback (on_complete) covers all send ops together; each recv op
// has its own callback.  The flag values are summed as 0/1 integers.
3039 const bool has_send_ops = batch->send_initial_metadata ||
3040 batch->send_message ||
3041 batch->send_trailing_metadata;
3042 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3043 batch->recv_message +
3044 batch->recv_trailing_metadata;
3045 SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3046 elem, num_callbacks, has_send_ops /* set_on_complete */);
3047 // Cache send ops if needed.
3048 MaybeCacheSendOpsForBatch(pending);
3049 // send_initial_metadata.
3050 if (batch->send_initial_metadata) {
3051 AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3054 if (batch->send_message) {
3055 AddRetriableSendMessageOp(elem, retry_state, batch_data);
3057 // send_trailing_metadata.
3058 if (batch->send_trailing_metadata) {
3059 AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3061 // recv_initial_metadata.
3062 if (batch->recv_initial_metadata) {
3063 // recv_flags is only used on the server side.
3064 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3065 AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3068 if (batch->recv_message) {
3069 AddRetriableRecvMessageOp(retry_state, batch_data);
3071 // recv_trailing_metadata.
3072 if (batch->recv_trailing_metadata) {
3073 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3075 AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3076 // Track number of pending subchannel send batches.
3077 // If this is the first one, take a ref to the call stack.
// The matching unref and decrement are performed in OnComplete().
3078 if (batch->send_initial_metadata || batch->send_message ||
3079 batch->send_trailing_metadata) {
3080 if (num_pending_retriable_subchannel_send_batches_ == 0) {
3081 GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3083 ++num_pending_retriable_subchannel_send_batches_;
// Closure callback that builds and starts all batches that can currently be
// sent on the subchannel call.  `arg` is the grpc_call_element; the error
// argument is unused.
// First adds a replay batch for cached send ops (if one is needed), then
// batches for the pending surface batches, and finally runs the collected
// closures.
3088 void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
3089 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3090 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3091 CallData* calld = static_cast<CallData*>(elem->call_data);
3092 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3093 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
3096 SubchannelCallRetryState* retry_state =
3097 static_cast<SubchannelCallRetryState*>(
3098 calld->subchannel_call_->GetParentData());
3099 // Construct list of closures to execute, one for each pending batch.
3100 CallCombinerClosureList closures;
3101 // Replay previously-returned send_* ops if needed.
3102 SubchannelCallBatchData* replay_batch_data =
3103 calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
3104 if (replay_batch_data != nullptr) {
3105 calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
3107 // Track number of pending subchannel send batches.
3108 // If this is the first one, take a ref to the call stack.
// A replay batch always contains send ops, so it is always counted.
3109 if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
3110 GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
3112 ++calld->num_pending_retriable_subchannel_send_batches_;
3114 // Now add pending batches.
3115 calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
3116 // Start batches on subchannel call.
3117 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3119 "chand=%p calld=%p: starting %" PRIuPTR
3120 " retriable batches on subchannel_call=%p",
3121 chand, calld, closures.size(), calld->subchannel_call_.get());
3123 // Note: This will yield the call combiner.
3124 closures.RunClosures(calld->call_combiner_);
// Creates the subchannel call on the connected subchannel chosen by the
// pick.  On error the pending batches are failed; otherwise the retry state
// is constructed in the subchannel call's parent data (when retries are
// enabled) and the pending batches are resumed.
// NOTE(review): the `subchannel_call_ =` assignment head and the `} else {`
// separating the failure and success paths are not visible in this listing.
3131 void CallData::CreateSubchannelCall(grpc_call_element* elem) {
3132 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Extra space for SubchannelCallRetryState is requested only when retries
// are enabled.
3133 const size_t parent_data_size =
3134 enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
3135 const ConnectedSubchannel::CallArgs call_args = {
3136 pollent_, path_, call_start_time_, deadline_, arena_,
3137 // TODO(roth): When we implement hedging support, we will probably
3138 // need to use a separate call context for each subchannel call.
3139 call_context_, call_combiner_, parent_data_size};
3140 grpc_error* error = GRPC_ERROR_NONE;
3142 pick_.pick.connected_subchannel->CreateCall(call_args, &error);
3143 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3144 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
3145 chand, this, subchannel_call_.get(), grpc_error_string(error));
// Failure: `error` is handed to PendingBatchesFail().
3147 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3148 PendingBatchesFail(elem, error, YieldCallCombiner);
// Success: placement-new the retry state into the parent data area that
// was requested via parent_data_size.
3150 if (parent_data_size > 0) {
3151 new (subchannel_call_->GetParentData())
3152 SubchannelCallRetryState(call_context_);
3154 PendingBatchesResume(elem);
// Closure callback invoked when the pick has finished.  `arg` is the
// grpc_call_element.  On error the pending batches are failed with a new ref
// to `error`; otherwise the subchannel call is created.
// NOTE(review): the `return;` that presumably ends the error branch is not
// visible in this listing.
3158 void CallData::PickDone(void* arg, grpc_error* error) {
3159 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3160 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3161 CallData* calld = static_cast<CallData*>(elem->call_data);
3162 if (error != GRPC_ERROR_NONE) {
3163 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3165 "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3166 calld, grpc_error_string(error));
3168 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3171 calld->CreateSubchannelCall(elem);
3174 // A class to handle the call combiner cancellation callback for a
// (the rest of this sentence is not visible in this listing; judging by the
// class name and the trace message below it refers to a queued pick).
// The constructor takes a "QueuedPickCanceller" ref on the owning call stack
// and registers CancelLocked with the call combiner; CancelLocked releases
// that ref.
// NOTE(review): access specifiers, closing braces and any statement that
// destroys the object in CancelLocked are not visible in this listing.
3176 class CallData::QueuedPickCanceller {
3178 explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
3179 auto* calld = static_cast<CallData*>(elem->call_data);
3180 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3181 GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
// The callback is scheduled on the channel's data plane combiner.
3182 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
3183 grpc_combiner_scheduler(chand->data_plane_combiner()));
3184 calld->call_combiner_->SetNotifyOnCancel(&closure_);
// Callback registered through SetNotifyOnCancel().  `arg` is the
// QueuedPickCanceller instance.
3188 static void CancelLocked(void* arg, grpc_error* error) {
3189 auto* self = static_cast<QueuedPickCanceller*>(arg);
3190 auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
3191 auto* calld = static_cast<CallData*>(self->elem_->call_data);
3192 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3194 "chand=%p calld=%p: cancelling queued pick: "
3195 "error=%s self=%p calld->pick_canceller=%p",
3196 chand, calld, grpc_error_string(error), self,
3197 calld->pick_canceller_);
// Act only if this canceller is still the call's current one (it is reset
// to nullptr by RemoveCallFromQueuedPicksLocked()) and a real error was
// delivered.
3199 if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
3200 // Remove pick from list of queued picks.
3201 calld->RemoveCallFromQueuedPicksLocked(self->elem_);
3202 // Fail pending batches on the call.
3203 calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
3204 YieldCallCombinerIfPendingBatchesFound);
// Release the call stack ref taken in the constructor.
3206 GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
// Call element this canceller was created for.
3210 grpc_call_element* elem_;
// Closure registered with the call combiner in the constructor.
3211 grpc_closure closure_;
// Removes this call's pick from the channel's queued picks, clears
// pick_queued_, and resets pick_canceller_ to nullptr.
// NOTE(review): the "Locked" suffix suggests the caller must already be
// running under the channel's data plane combiner -- confirm.
3214 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3215 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3216 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3217 gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3220 chand->RemoveQueuedPick(&pick_, pollent_);
3221 pick_queued_ = false;
3222 // Lame the call combiner canceller.
// QueuedPickCanceller::CancelLocked compares against this pointer, so a
// canceller that fires later will no longer act on the call.
3223 pick_canceller_ = nullptr;
// Adds this call's pick to the channel's queued picks, sets pick_queued_,
// and creates a QueuedPickCanceller to handle call combiner cancellation.
// NOTE(review): a statement between setting pick_queued_ and the
// AddQueuedPick() call appears to be missing from this listing.
3226 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3227 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3228 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3229 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3232 pick_queued_ = true;
3234 chand->AddQueuedPick(&pick_, pollent_);
3235 // Register call combiner cancellation callback.
3236 pick_canceller_ = New<QueuedPickCanceller>(elem);
// Applies the channel's current service config to this call. It records the
// per-call service config data, looks up the client-channel method
// parameters for the call's path, copies the channel's retry throttle data,
// and then applies the per-method timeout and wait_for_ready settings.
// Retries are disabled when the method has no retry policy.
3239 void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3240 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3241 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3242 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
3245 // Store a ref to the service_config in service_config_call_data_. Also, save
3246 // a pointer to this in the call_context so that all future filters can access
// it (through the GRPC_SERVICE_CONFIG_CALL_DATA context slot).
3248 service_config_call_data_ =
3249 ServiceConfig::CallData(chand->service_config(), path_);
3250 if (service_config_call_data_.service_config() != nullptr) {
3251 call_context_[GRPC_SERVICE_CONFIG_CALL_DATA].value =
3252 &service_config_call_data_;
3253 method_params_ = static_cast<ClientChannelMethodParsedObject*>(
3254 service_config_call_data_.GetMethodParsedObject(
3255 internal::ClientChannelServiceConfigParser::ParserIndex()));
3257 retry_throttle_data_ = chand->retry_throttle_data();
3258 if (method_params_ != nullptr) {
3259 // If the deadline from the service config is shorter than the one
3260 // from the client API, reset the deadline timer.
3261 if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
// The per-method deadline is measured from the call's start time, not from
// the moment the config is applied.
3262 const grpc_millis per_method_deadline =
3263 grpc_timespec_to_millis_round_up(call_start_time_) +
3264 method_params_->timeout();
3265 if (per_method_deadline < deadline_) {
3266 deadline_ = per_method_deadline;
3267 grpc_deadline_state_reset(elem, deadline_);
3270 // If the service config set wait_for_ready and the application
3271 // did not explicitly set it, use the value from the service config.
// The flags are read from (and written back to) the first pending batch's
// send_initial_metadata payload.
3272 uint32_t* send_initial_metadata_flags =
3273 &pending_batches_[0]
3274 .batch->payload->send_initial_metadata.send_initial_metadata_flags;
3275 if (method_params_->wait_for_ready().has_value() &&
3276 !(*send_initial_metadata_flags &
3277 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3278 if (method_params_->wait_for_ready().value()) {
3279 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3281 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3285 // If no retry policy, disable retries.
3286 // TODO(roth): Remove this when adding support for transparent retries.
3287 if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
3288 enable_retries_ = false;
// One-shot guard around ApplyServiceConfigToCallLocked(): it applies the
// service config the first time it is called after the channel reports that
// service config data has been received, and is a no-op afterwards.
3292 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3293 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3294 // Apply service config data to the call only once, and only if the
3295 // channel has the data available.
3296 if (GPR_LIKELY(chand->received_service_config_data() &&
3297 !service_config_applied_)) {
// Set the flag before applying so the config is never applied twice.
3298 service_config_applied_ = true;
3299 ApplyServiceConfigToCallLocked(elem);
// Returns a printable name for an LB pick result. StartPickLocked() uses it
// when trace-logging the outcome of a pick. The final GPR_UNREACHABLE_CODE
// supplies "UNKNOWN" as the fallback return value.
3303 const char* PickResultName(LoadBalancingPolicy::PickResult result) {
3305 case LoadBalancingPolicy::PICK_COMPLETE:
3307 case LoadBalancingPolicy::PICK_QUEUE:
3309 case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
3310 return "TRANSIENT_FAILURE";
3312 GPR_UNREACHABLE_CODE(return "UNKNOWN");
// Closure callback that runs one LB pick attempt for a call. `arg` is the
// call's grpc_call_element. The incoming `error` value is not consulted: it
// is reset and reused as the out-parameter of the picker.
//
// It points the pick at the call's initial metadata, applies the service
// config if that has not been done yet, asks the channel's current picker
// for a result, and then either schedules pick_closure_ (PickDone) with the
// outcome or adds the call to the channel's queued-pick list.
3315 void CallData::StartPickLocked(void* arg, grpc_error* error) {
3316 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3317 CallData* calld = static_cast<CallData*>(elem->call_data);
3318 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// A pick must not already be in place for this attempt.
3319 GPR_ASSERT(calld->pick_.pick.connected_subchannel == nullptr);
3320 GPR_ASSERT(calld->subchannel_call_ == nullptr);
3321 // If this is a retry, use the send_initial_metadata payload that
3322 // we've cached; otherwise, use the pending batch. The
3323 // send_initial_metadata batch will be the first pending batch in the
3324 // list, as set by GetBatchIndex() above.
3325 // TODO(roth): What if the LB policy needs to add something to the
3326 // call's initial metadata, and then there's a retry? We don't want
3327 // the new metadata to be added twice. We might need to somehow
3328 // allocate the subchannel batch earlier so that we can give the
3329 // subchannel's copy of the metadata batch (which is copied for each
3330 // attempt) to the LB policy instead the one from the parent channel.
3331 calld->pick_.pick.initial_metadata =
3332 calld->seen_send_initial_metadata_
3333 ? &calld->send_initial_metadata_
3334 : calld->pending_batches_[0]
3335 .batch->payload->send_initial_metadata.send_initial_metadata;
// The flags come from the same place as the metadata chosen above: the
// cached copy if send_initial_metadata was already seen, otherwise the
// first pending batch.
3336 uint32_t* send_initial_metadata_flags =
3337 calld->seen_send_initial_metadata_
3338 ? &calld->send_initial_metadata_flags_
3339 : &calld->pending_batches_[0]
3340 .batch->payload->send_initial_metadata
3341 .send_initial_metadata_flags;
3342 // Apply service config to call if needed.
3343 calld->MaybeApplyServiceConfigToCallLocked(elem);
3344 // When done, we schedule this closure to leave the data plane combiner.
3345 GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
3346 grpc_schedule_on_exec_ctx);
// Reuse `error` as the picker's out-parameter, starting from "no error".
3348 error = GRPC_ERROR_NONE;
3349 auto pick_result = chand->picker()->Pick(&calld->pick_.pick, &error);
3350 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3352 "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
3354 chand, calld, PickResultName(pick_result),
3355 calld->pick_.pick.connected_subchannel.get(),
3356 grpc_error_string(error));
3358 switch (pick_result) {
3359 case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
3360 // If we're shutting down, fail all RPCs.
// NOTE(review): in the visible lines this branch schedules pick_closure_
// without calling RemoveCallFromQueuedPicksLocked() when pick_queued_ is
// set, unlike the other completion paths below. Confirm that a queued call
// cannot reach this point, or that it is dequeued elsewhere.
3361 grpc_error* disconnect_error = chand->disconnect_error();
3362 if (disconnect_error != GRPC_ERROR_NONE) {
3363 GRPC_ERROR_UNREF(error);
3364 GRPC_CLOSURE_SCHED(&calld->pick_closure_,
3365 GRPC_ERROR_REF(disconnect_error));
3368 // If wait_for_ready is false, then the error indicates the RPC
3369 // attempt's final status.
3370 if ((*send_initial_metadata_flags &
3371 GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3372 // Retry if appropriate; otherwise, fail.
3373 grpc_status_code status = GRPC_STATUS_OK;
3374 grpc_error_get_status(error, calld->deadline_, &status, nullptr,
// When retries are disabled or MaybeRetry() declines, complete the pick
// with a new error that references the picker's error as its child.
3376 if (!calld->enable_retries_ ||
3377 !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
3378 nullptr /* server_pushback_md */)) {
3379 grpc_error* new_error =
3380 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3381 "Failed to pick subchannel", &error, 1);
3382 GRPC_ERROR_UNREF(error);
3383 GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
3385 if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3388 // If wait_for_ready is true, then queue to retry when we get a new
// picker (presumably by continuing into the PICK_QUEUE handling below;
// confirm against the full source).
3390 GRPC_ERROR_UNREF(error);
3393 case LoadBalancingPolicy::PICK_QUEUE:
// Queue the call only once; a call that is already queued stays queued.
3394 if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
3396 default: // PICK_COMPLETE
// A completed pick without a connected subchannel is reported as a drop by
// the LB policy.
3398 if (GPR_UNLIKELY(calld->pick_.pick.connected_subchannel == nullptr)) {
3399 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3400 "Call dropped by load balancing policy");
3402 GRPC_CLOSURE_SCHED(&calld->pick_closure_, error);
3403 if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3408 } // namespace grpc_core
3410 /*************************************************************************
// Bring the filter's call and channel data types into scope for the
// exported symbols defined below.
3414 using grpc_core::CallData;
3415 using grpc_core::ChannelData;
// Channel filter definition for the client channel. The visible entries
// point at static members of CallData and ChannelData, plus the size of the
// per-channel data.
3417 const grpc_channel_filter grpc_client_channel_filter = {
3418 CallData::StartTransportStreamOpBatch,
3419 ChannelData::StartTransportOp,
3422 CallData::SetPollent,
3424 sizeof(ChannelData),
3426 ChannelData::Destroy,
3427 ChannelData::GetChannelInfo,
// Passes `node` to the ChannelData stored in `elem` via set_channelz_node().
// `elem` must be a client channel element, since its channel_data is cast
// to ChannelData without a check.
3431 void grpc_client_channel_set_channelz_node(
3432 grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
3433 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3434 chand->set_channelz_node(node);
// Forwards both child-ref lists to ChannelData::FillChildRefsForChannelz()
// on the ChannelData stored in `elem`.
3437 void grpc_client_channel_populate_child_refs(
3438 grpc_channel_element* elem,
3439 grpc_core::channelz::ChildRefsList* child_subchannels,
3440 grpc_core::channelz::ChildRefsList* child_channels) {
3441 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3442 chand->FillChildRefsForChannelz(child_subchannels, child_channels);
// Returns the result of ChannelData::CheckConnectivityState() for the
// channel behind `elem`. `try_to_connect` is passed through unchanged.
3445 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3446 grpc_channel_element* elem, int try_to_connect) {
3447 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3448 return chand->CheckConnectivityState(try_to_connect);
// Returns ChannelData::NumExternalConnectivityWatchers() for the channel
// behind `elem`.
3451 int grpc_client_channel_num_external_connectivity_watchers(
3452 grpc_channel_element* elem) {
3453 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3454 return chand->NumExternalConnectivityWatchers();
// Forwards all arguments to ChannelData::AddExternalConnectivityWatcher()
// on the channel behind `elem`. The function is declared void, so the
// `return` only forwards a void expression.
3457 void grpc_client_channel_watch_connectivity_state(
3458 grpc_channel_element* elem, grpc_polling_entity pollent,
3459 grpc_connectivity_state* state, grpc_closure* closure,
3460 grpc_closure* watcher_timer_init) {
3461 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3462 return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3463 watcher_timer_init);
// Returns the value of CallData::subchannel_call() for the call behind
// `elem`. `elem` must be a client channel call element, since its call_data
// is cast to CallData without a check.
3466 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
3467 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3468 auto* calld = static_cast<CallData*>(elem->call_data);
3469 return calld->subchannel_call();