3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/client_channel.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/map.h"
55 #include "src/core/lib/gprpp/sync.h"
56 #include "src/core/lib/iomgr/combiner.h"
57 #include "src/core/lib/iomgr/iomgr.h"
58 #include "src/core/lib/iomgr/polling_entity.h"
59 #include "src/core/lib/profiling/timers.h"
60 #include "src/core/lib/slice/slice_internal.h"
61 #include "src/core/lib/slice/slice_string_helpers.h"
62 #include "src/core/lib/surface/channel.h"
63 #include "src/core/lib/transport/connectivity_state.h"
64 #include "src/core/lib/transport/error_utils.h"
65 #include "src/core/lib/transport/metadata.h"
66 #include "src/core/lib/transport/metadata_batch.h"
67 #include "src/core/lib/transport/static_metadata.h"
68 #include "src/core/lib/transport/status_metadata.h"
70 using grpc_core::internal::ClientChannelMethodParsedConfig;
71 using grpc_core::internal::ServerRetryThrottleData;
74 // Client channel filter
77 // By default, we buffer 256 KiB per RPC for retries.
78 // TODO(roth): Do we have any data to suggest a better value?
79 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
81 // This value was picked arbitrarily. It can be changed if there is
82 // any even moderately compelling reason to do so.
83 #define RETRY_BACKOFF_JITTER 0.2
// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
93 #define MAX_PENDING_BATCHES 6
97 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
98 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
103 // ChannelData definition
109 grpc_call_element* elem;
110 QueuedPick* next = nullptr;
113 static grpc_error* Init(grpc_channel_element* elem,
114 grpc_channel_element_args* args);
115 static void Destroy(grpc_channel_element* elem);
116 static void StartTransportOp(grpc_channel_element* elem,
117 grpc_transport_op* op);
118 static void GetChannelInfo(grpc_channel_element* elem,
119 const grpc_channel_info* info);
121 bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
122 bool enable_retries() const { return enable_retries_; }
123 size_t per_rpc_retry_buffer_size() const {
124 return per_rpc_retry_buffer_size_;
127 // Note: Does NOT return a new ref.
128 grpc_error* disconnect_error() const {
129 return disconnect_error_.Load(MemoryOrder::ACQUIRE);
132 grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
134 LoadBalancingPolicy::SubchannelPicker* picker() const {
135 return picker_.get();
137 void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
138 void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
140 bool received_service_config_data() const {
141 return received_service_config_data_;
143 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
144 return retry_throttle_data_;
146 RefCountedPtr<ServiceConfig> service_config() const {
147 return service_config_;
150 grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
151 void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
152 grpc_connectivity_state* state,
153 grpc_closure* on_complete,
154 grpc_closure* watcher_timer_init) {
155 // Will delete itself.
156 New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
159 int NumExternalConnectivityWatchers() const {
160 return external_connectivity_watcher_list_.size();
164 class ConnectivityStateAndPickerSetter;
165 class ServiceConfigSetter;
166 class GrpcSubchannel;
167 class ClientChannelControlHelper;
169 class ExternalConnectivityWatcher {
173 WatcherList() { gpr_mu_init(&mu_); }
174 ~WatcherList() { gpr_mu_destroy(&mu_); }
177 ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
178 void Add(ExternalConnectivityWatcher* watcher);
179 void Remove(const ExternalConnectivityWatcher* watcher);
182 // head_ is guarded by a mutex, since the size() method needs to
183 // iterate over the list, and it's called from the C-core API
184 // function grpc_channel_num_external_connectivity_watchers(), which
185 // is synchronous and therefore cannot run in the combiner.
187 ExternalConnectivityWatcher* head_ = nullptr;
190 ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
191 grpc_connectivity_state* state,
192 grpc_closure* on_complete,
193 grpc_closure* watcher_timer_init);
195 ~ExternalConnectivityWatcher();
198 static void OnWatchCompleteLocked(void* arg, grpc_error* error);
199 static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
202 grpc_polling_entity pollent_;
203 grpc_connectivity_state* state_;
204 grpc_closure* on_complete_;
205 grpc_closure* watcher_timer_init_;
206 grpc_closure my_closure_;
207 ExternalConnectivityWatcher* next_ = nullptr;
210 ChannelData(grpc_channel_element_args* args, grpc_error** error);
213 static bool ProcessResolverResultLocked(
214 void* arg, const Resolver::Result& result, const char** lb_policy_name,
215 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
216 grpc_error** service_config_error);
218 grpc_error* DoPingLocked(grpc_transport_op* op);
220 static void StartTransportOpLocked(void* arg, grpc_error* ignored);
222 static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
224 void ProcessLbPolicy(
225 const Resolver::Result& resolver_result,
226 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
227 UniquePtr<char>* lb_policy_name,
228 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
231 // Fields set at construction and never modified.
233 const bool deadline_checking_enabled_;
234 const bool enable_retries_;
235 const size_t per_rpc_retry_buffer_size_;
236 grpc_channel_stack* owning_stack_;
237 ClientChannelFactory* client_channel_factory_;
238 UniquePtr<char> server_name_;
239 RefCountedPtr<ServiceConfig> default_service_config_;
240 channelz::ChannelNode* channelz_node_;
243 // Fields used in the data plane. Guarded by data_plane_combiner.
245 grpc_combiner* data_plane_combiner_;
246 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
247 QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
248 // Data from service config.
249 bool received_service_config_data_ = false;
250 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
251 RefCountedPtr<ServiceConfig> service_config_;
254 // Fields used in the control plane. Guarded by combiner.
256 grpc_combiner* combiner_;
257 grpc_pollset_set* interested_parties_;
258 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
259 OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
260 grpc_connectivity_state_tracker state_tracker_;
261 ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
262 UniquePtr<char> health_check_service_name_;
263 RefCountedPtr<ServiceConfig> saved_service_config_;
264 bool received_first_resolver_result_ = false;
265 Map<Subchannel*, int> subchannel_refcount_map_;
268 // Fields accessed from both data plane and control plane combiners.
270 Atomic<grpc_error*> disconnect_error_;
273 // Fields guarded by a mutex, since they need to be accessed
274 // synchronously via get_channel_info().
277 UniquePtr<char> info_lb_policy_name_;
278 UniquePtr<char> info_service_config_json_;
282 // CallData definition
287 static grpc_error* Init(grpc_call_element* elem,
288 const grpc_call_element_args* args);
289 static void Destroy(grpc_call_element* elem,
290 const grpc_call_final_info* final_info,
291 grpc_closure* then_schedule_closure);
292 static void StartTransportStreamOpBatch(
293 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
294 static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
296 RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
298 // Invoked by channel for queued picks once resolver results are available.
299 void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
301 // Invoked by channel for queued picks when the picker is updated.
302 static void StartPickLocked(void* arg, grpc_error* error);
305 class QueuedPickCanceller;
307 class LbCallState : public LoadBalancingPolicy::CallState {
309 explicit LbCallState(CallData* calld) : calld_(calld) {}
311 void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }
317 // State used for starting a retryable batch on a subchannel call.
318 // This provides its own grpc_transport_stream_op_batch and other data
319 // structures needed to populate the ops in the batch.
320 // We allocate one struct on the arena for each attempt at starting a
321 // batch on a given subchannel call.
322 struct SubchannelCallBatchData {
323 // Creates a SubchannelCallBatchData object on the call's arena with the
324 // specified refcount. If set_on_complete is true, the batch's
325 // on_complete callback will be set to point to on_complete();
326 // otherwise, the batch's on_complete callback will be null.
327 static SubchannelCallBatchData* Create(grpc_call_element* elem,
328 int refcount, bool set_on_complete);
331 if (gpr_unref(&refs)) Destroy();
334 SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
335 int refcount, bool set_on_complete);
336 // All dtor code must be added in `Destroy()`. This is because we may
337 // call closures in `SubchannelCallBatchData` after they are unrefed by
338 // `Unref()`, and msan would complain about accessing this class
339 // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
340 // TODO(soheil): We should try to call the dtor in `Unref()`.
341 ~SubchannelCallBatchData() { Destroy(); }
345 grpc_call_element* elem;
346 RefCountedPtr<SubchannelCall> subchannel_call;
347 // The batch to use in the subchannel call.
348 // Its payload field points to SubchannelCallRetryState::batch_payload.
349 grpc_transport_stream_op_batch batch;
350 // For intercepting on_complete.
351 grpc_closure on_complete;
354 // Retry state associated with a subchannel call.
355 // Stored in the parent_data of the subchannel call object.
356 struct SubchannelCallRetryState {
357 explicit SubchannelCallRetryState(grpc_call_context_element* context)
358 : batch_payload(context),
359 started_send_initial_metadata(false),
360 completed_send_initial_metadata(false),
361 started_send_trailing_metadata(false),
362 completed_send_trailing_metadata(false),
363 started_recv_initial_metadata(false),
364 completed_recv_initial_metadata(false),
365 started_recv_trailing_metadata(false),
366 completed_recv_trailing_metadata(false),
367 retry_dispatched(false) {}
369 // SubchannelCallBatchData.batch.payload points to this.
370 grpc_transport_stream_op_batch_payload batch_payload;
371 // For send_initial_metadata.
372 // Note that we need to make a copy of the initial metadata for each
373 // subchannel call instead of just referring to the copy in call_data,
374 // because filters in the subchannel stack will probably add entries,
375 // so we need to start in a pristine state for each attempt of the call.
376 grpc_linked_mdelem* send_initial_metadata_storage;
377 grpc_metadata_batch send_initial_metadata;
379 // TODO(roth): Restructure this to eliminate use of ManualConstructor.
380 ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
381 // For send_trailing_metadata.
382 grpc_linked_mdelem* send_trailing_metadata_storage;
383 grpc_metadata_batch send_trailing_metadata;
384 // For intercepting recv_initial_metadata.
385 grpc_metadata_batch recv_initial_metadata;
386 grpc_closure recv_initial_metadata_ready;
387 bool trailing_metadata_available = false;
388 // For intercepting recv_message.
389 grpc_closure recv_message_ready;
390 OrphanablePtr<ByteStream> recv_message;
391 // For intercepting recv_trailing_metadata.
392 grpc_metadata_batch recv_trailing_metadata;
393 grpc_transport_stream_stats collect_stats;
394 grpc_closure recv_trailing_metadata_ready;
395 // These fields indicate which ops have been started and completed on
396 // this subchannel call.
397 size_t started_send_message_count = 0;
398 size_t completed_send_message_count = 0;
399 size_t started_recv_message_count = 0;
400 size_t completed_recv_message_count = 0;
401 bool started_send_initial_metadata : 1;
402 bool completed_send_initial_metadata : 1;
403 bool started_send_trailing_metadata : 1;
404 bool completed_send_trailing_metadata : 1;
405 bool started_recv_initial_metadata : 1;
406 bool completed_recv_initial_metadata : 1;
407 bool started_recv_trailing_metadata : 1;
408 bool completed_recv_trailing_metadata : 1;
409 // State for callback processing.
410 SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
412 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
413 SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
414 grpc_error* recv_message_error = GRPC_ERROR_NONE;
415 SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
416 // NOTE: Do not move this next to the metadata bitfields above. That would
417 // save space but will also result in a data race because compiler
418 // will generate a 2 byte store which overwrites the meta-data
419 // fields upon setting this field.
420 bool retry_dispatched : 1;
423 // Pending batches stored in call data.
424 struct PendingBatch {
425 // The pending batch. If nullptr, this slot is empty.
426 grpc_transport_stream_op_batch* batch;
427 // Indicates whether payload for send ops has been cached in CallData.
428 bool send_ops_cached;
431 CallData(grpc_call_element* elem, const ChannelData& chand,
432 const grpc_call_element_args& args);
// Caches data for send ops so that it can be retried later, if not
// already cached.
437 void MaybeCacheSendOpsForBatch(PendingBatch* pending);
438 void FreeCachedSendInitialMetadata(ChannelData* chand);
439 // Frees cached send_message at index idx.
440 void FreeCachedSendMessage(ChannelData* chand, size_t idx);
441 void FreeCachedSendTrailingMetadata(ChannelData* chand);
442 // Frees cached send ops that have already been completed after
443 // committing the call.
444 void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
445 SubchannelCallRetryState* retry_state);
446 // Frees cached send ops that were completed by the completed batch in
447 // batch_data. Used when batches are completed after the call is committed.
448 void FreeCachedSendOpDataForCompletedBatch(
449 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
450 SubchannelCallRetryState* retry_state);
452 static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
453 void* arg, grpc_error* error);
454 void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
455 grpc_transport_stream_op_batch* batch);
457 // Returns the index into pending_batches_ to be used for batch.
458 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
459 void PendingBatchesAdd(grpc_call_element* elem,
460 grpc_transport_stream_op_batch* batch);
461 void PendingBatchClear(PendingBatch* pending);
462 void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
463 static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
464 // A predicate type and some useful implementations for PendingBatchesFail().
465 typedef bool (*YieldCallCombinerPredicate)(
466 const CallCombinerClosureList& closures);
467 static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
470 static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
473 static bool YieldCallCombinerIfPendingBatchesFound(
474 const CallCombinerClosureList& closures) {
475 return closures.size() > 0;
477 // Fails all pending batches.
478 // If yield_call_combiner_predicate returns true, assumes responsibility for
479 // yielding the call combiner.
480 void PendingBatchesFail(
481 grpc_call_element* elem, grpc_error* error,
482 YieldCallCombinerPredicate yield_call_combiner_predicate);
483 static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
484 // Resumes all pending batches on subchannel_call_.
485 void PendingBatchesResume(grpc_call_element* elem);
486 // Returns a pointer to the first pending batch for which predicate(batch)
487 // returns true, or null if not found.
488 template <typename Predicate>
489 PendingBatch* PendingBatchFind(grpc_call_element* elem,
490 const char* log_message, Predicate predicate);
492 // Commits the call so that no further retry attempts will be performed.
493 void RetryCommit(grpc_call_element* elem,
494 SubchannelCallRetryState* retry_state);
495 // Starts a retry after appropriate back-off.
496 void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
497 grpc_millis server_pushback_ms);
498 // Returns true if the call is being retried.
499 bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
500 grpc_status_code status, grpc_mdelem* server_pushback_md);
502 // Invokes recv_initial_metadata_ready for a subchannel batch.
503 static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
504 // Intercepts recv_initial_metadata_ready callback for retries.
505 // Commits the call and returns the initial metadata up the stack.
506 static void RecvInitialMetadataReady(void* arg, grpc_error* error);
508 // Invokes recv_message_ready for a subchannel batch.
509 static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
510 // Intercepts recv_message_ready callback for retries.
511 // Commits the call and returns the message up the stack.
512 static void RecvMessageReady(void* arg, grpc_error* error);
514 // Sets *status and *server_pushback_md based on md_batch and error.
515 // Only sets *server_pushback_md if server_pushback_md != nullptr.
516 void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
517 grpc_error* error, grpc_status_code* status,
518 grpc_mdelem** server_pushback_md);
519 // Adds recv_trailing_metadata_ready closure to closures.
520 void AddClosureForRecvTrailingMetadataReady(
521 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
522 grpc_error* error, CallCombinerClosureList* closures);
523 // Adds any necessary closures for deferred recv_initial_metadata and
524 // recv_message callbacks to closures.
525 static void AddClosuresForDeferredRecvCallbacks(
526 SubchannelCallBatchData* batch_data,
527 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
528 // Returns true if any op in the batch was not yet started.
529 // Only looks at send ops, since recv ops are always started immediately.
530 bool PendingBatchIsUnstarted(PendingBatch* pending,
531 SubchannelCallRetryState* retry_state);
532 // For any pending batch containing an op that has not yet been started,
533 // adds the pending batch's completion closures to closures.
534 void AddClosuresToFailUnstartedPendingBatches(
535 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
536 grpc_error* error, CallCombinerClosureList* closures);
537 // Runs necessary closures upon completion of a call attempt.
538 void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
540 // Intercepts recv_trailing_metadata_ready callback for retries.
541 // Commits the call and returns the trailing metadata up the stack.
542 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
544 // Adds the on_complete closure for the pending batch completed in
545 // batch_data to closures.
546 void AddClosuresForCompletedPendingBatch(
547 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
548 SubchannelCallRetryState* retry_state, grpc_error* error,
549 CallCombinerClosureList* closures);
551 // If there are any cached ops to replay or pending ops to start on the
552 // subchannel call, adds a closure to closures to invoke
553 // StartRetriableSubchannelBatches().
554 void AddClosuresForReplayOrPendingSendOps(
555 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
556 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
558 // Callback used to intercept on_complete from subchannel calls.
559 // Called only when retries are enabled.
560 static void OnComplete(void* arg, grpc_error* error);
562 static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
563 // Adds a closure to closures that will execute batch in the call combiner.
564 void AddClosureForSubchannelBatch(grpc_call_element* elem,
565 grpc_transport_stream_op_batch* batch,
566 CallCombinerClosureList* closures);
567 // Adds retriable send_initial_metadata op to batch_data.
568 void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
569 SubchannelCallBatchData* batch_data);
570 // Adds retriable send_message op to batch_data.
571 void AddRetriableSendMessageOp(grpc_call_element* elem,
572 SubchannelCallRetryState* retry_state,
573 SubchannelCallBatchData* batch_data);
574 // Adds retriable send_trailing_metadata op to batch_data.
575 void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
576 SubchannelCallBatchData* batch_data);
577 // Adds retriable recv_initial_metadata op to batch_data.
578 void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
579 SubchannelCallBatchData* batch_data);
580 // Adds retriable recv_message op to batch_data.
581 void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
582 SubchannelCallBatchData* batch_data);
583 // Adds retriable recv_trailing_metadata op to batch_data.
584 void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
585 SubchannelCallBatchData* batch_data);
586 // Helper function used to start a recv_trailing_metadata batch. This
587 // is used in the case where a recv_initial_metadata or recv_message
588 // op fails in a way that we know the call is over but when the application
589 // has not yet started its own recv_trailing_metadata op.
590 void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
591 // If there are any cached send ops that need to be replayed on the
592 // current subchannel call, creates and returns a new subchannel batch
593 // to replay those ops. Otherwise, returns nullptr.
594 SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
595 grpc_call_element* elem, SubchannelCallRetryState* retry_state);
596 // Adds subchannel batches for pending batches to closures.
597 void AddSubchannelBatchesForPendingBatches(
598 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
599 CallCombinerClosureList* closures);
// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
602 static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
604 void CreateSubchannelCall(grpc_call_element* elem);
605 // Invoked when a pick is completed, on both success or failure.
606 static void PickDone(void* arg, grpc_error* error);
607 // Removes the call from the channel's list of queued picks.
608 void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
609 // Adds the call to the channel's list of queued picks.
610 void AddCallToQueuedPicksLocked(grpc_call_element* elem);
611 // Applies service config to the call. Must be invoked once we know
612 // that the resolver has returned results to the channel.
613 void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
615 // State for handling deadlines.
616 // The code in deadline_filter.c requires this to be the first field.
617 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
618 // and this struct both independently store pointers to the call stack
619 // and call combiner. If/when we have time, find a way to avoid this
620 // without breaking the grpc_deadline_state abstraction.
621 grpc_deadline_state deadline_state_;
623 grpc_slice path_; // Request path.
624 gpr_timespec call_start_time_;
625 grpc_millis deadline_;
627 grpc_call_stack* owning_call_;
628 CallCombiner* call_combiner_;
629 grpc_call_context_element* call_context_;
631 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
632 ServiceConfig::CallData service_config_call_data_;
633 const ClientChannelMethodParsedConfig* method_params_ = nullptr;
635 RefCountedPtr<SubchannelCall> subchannel_call_;
637 // Set when we get a cancel_stream op.
638 grpc_error* cancel_error_ = GRPC_ERROR_NONE;
640 ChannelData::QueuedPick pick_;
641 bool pick_queued_ = false;
642 bool service_config_applied_ = false;
643 QueuedPickCanceller* pick_canceller_ = nullptr;
644 LbCallState lb_call_state_;
645 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
646 void (*lb_recv_trailing_metadata_ready_)(
647 void* user_data, grpc_metadata_batch* recv_trailing_metadata,
648 LoadBalancingPolicy::CallState* call_state) = nullptr;
649 void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
650 grpc_closure pick_closure_;
652 // For intercepting recv_trailing_metadata_ready for the LB policy.
653 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
654 grpc_closure recv_trailing_metadata_ready_;
655 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
657 grpc_polling_entity* pollent_ = nullptr;
659 // Batches are added to this list when received from above.
660 // They are removed when we are done handling the batch (i.e., when
661 // either we have invoked all of the batch's callbacks or we have
662 // passed the batch down to the subchannel call and are not
663 // intercepting any of its callbacks).
664 PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
665 bool pending_send_initial_metadata_ : 1;
666 bool pending_send_message_ : 1;
667 bool pending_send_trailing_metadata_ : 1;
670 bool enable_retries_ : 1;
671 bool retry_committed_ : 1;
672 bool last_attempt_got_server_pushback_ : 1;
673 int num_attempts_completed_ = 0;
674 size_t bytes_buffered_for_retry_ = 0;
675 // TODO(roth): Restructure this to eliminate use of ManualConstructor.
676 ManualConstructor<BackOff> retry_backoff_;
677 grpc_timer retry_timer_;
679 // The number of pending retriable subchannel batches containing send ops.
680 // We hold a ref to the call stack while this is non-zero, since replay
681 // batches may not complete until after all callbacks have been returned
682 // to the surface, and we need to make sure that the call is not destroyed
683 // until all of these batches have completed.
684 // Note that we actually only need to track replay batches, but it's
685 // easier to track all batches with send ops.
686 int num_pending_retriable_subchannel_send_batches_ = 0;
688 // Cached data for retrying send ops.
689 // send_initial_metadata
690 bool seen_send_initial_metadata_ = false;
691 grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
692 grpc_metadata_batch send_initial_metadata_;
693 uint32_t send_initial_metadata_flags_;
694 gpr_atm* peer_string_;
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
// use in retries.
699 // Note: We inline the cache for the first 3 send_message ops and use
700 // dynamic allocation after that. This number was essentially picked
701 // at random; it could be changed in the future to tune performance.
702 InlinedVector<ByteStreamCache*, 3> send_messages_;
703 // send_trailing_metadata
704 bool seen_send_trailing_metadata_ = false;
705 grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
706 grpc_metadata_batch send_trailing_metadata_;
710 // ChannelData::ConnectivityStateAndPickerSetter
713 // A fire-and-forget class that sets the channel's connectivity state
714 // and then hops into the data plane combiner to update the picker.
715 // Must be instantiated while holding the control plane combiner.
716 // Deletes itself when done.
717 class ChannelData::ConnectivityStateAndPickerSetter {
719 ConnectivityStateAndPickerSetter(
720 ChannelData* chand, grpc_connectivity_state state, const char* reason,
721 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
722 : chand_(chand), picker_(std::move(picker)) {
723 // Update connectivity state here, while holding control plane combiner.
724 grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
725 if (chand->channelz_node_ != nullptr) {
726 chand->channelz_node_->SetConnectivityState(state);
727 chand->channelz_node_->AddTraceEvent(
728 channelz::ChannelTrace::Severity::Info,
729 grpc_slice_from_static_string(
730 GetChannelConnectivityStateChangeString(state)));
732 // Bounce into the data plane combiner to reset the picker.
733 GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
734 "ConnectivityStateAndPickerSetter");
735 GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
736 grpc_combiner_scheduler(chand->data_plane_combiner_));
737 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
741 static const char* GetChannelConnectivityStateChangeString(
742 grpc_connectivity_state state) {
744 case GRPC_CHANNEL_IDLE:
745 return "Channel state change to IDLE";
746 case GRPC_CHANNEL_CONNECTING:
747 return "Channel state change to CONNECTING";
748 case GRPC_CHANNEL_READY:
749 return "Channel state change to READY";
750 case GRPC_CHANNEL_TRANSIENT_FAILURE:
751 return "Channel state change to TRANSIENT_FAILURE";
752 case GRPC_CHANNEL_SHUTDOWN:
753 return "Channel state change to SHUTDOWN";
755 GPR_UNREACHABLE_CODE(return "UNKNOWN");
758 static void SetPicker(void* arg, grpc_error* ignored) {
759 auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
761 self->chand_->picker_ = std::move(self->picker_);
762 // Re-process queued picks.
763 for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
765 CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
768 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
769 "ConnectivityStateAndPickerSetter");
774 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
775 grpc_closure closure_;
779 // ChannelData::ServiceConfigSetter
782 // A fire-and-forget class that sets the channel's service config data
783 // in the data plane combiner. Deletes itself when done.
784 class ChannelData::ServiceConfigSetter {
788 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
790 RefCountedPtr<ServiceConfig> service_config)
792 retry_throttle_data_(retry_throttle_data),
793 service_config_(std::move(service_config)) {
794 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
795 GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
796 grpc_combiner_scheduler(chand->data_plane_combiner_));
797 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
801 static void SetServiceConfigData(void* arg, grpc_error* ignored) {
802 ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
803 ChannelData* chand = self->chand_;
804 // Update channel state.
805 chand->received_service_config_data_ = true;
806 if (self->retry_throttle_data_.has_value()) {
807 chand->retry_throttle_data_ =
808 internal::ServerRetryThrottleMap::GetDataForServer(
809 chand->server_name_.get(),
810 self->retry_throttle_data_.value().max_milli_tokens,
811 self->retry_throttle_data_.value().milli_token_ratio);
813 chand->service_config_ = std::move(self->service_config_);
814 // Apply service config to queued picks.
815 for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
817 CallData* calld = static_cast<CallData*>(pick->elem->call_data);
818 calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
821 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
822 "ServiceConfigSetter");
827 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
828 retry_throttle_data_;
829 RefCountedPtr<ServiceConfig> service_config_;
830 grpc_closure closure_;
834 // ChannelData::ExternalConnectivityWatcher::WatcherList
837 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
838 MutexLock lock(&mu_);
840 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
846 ChannelData::ExternalConnectivityWatcher*
847 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
848 grpc_closure* on_complete) const {
849 MutexLock lock(&mu_);
850 ExternalConnectivityWatcher* w = head_;
851 while (w != nullptr && w->on_complete_ != on_complete) {
857 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
858 ExternalConnectivityWatcher* watcher) {
859 GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
860 MutexLock lock(&mu_);
861 GPR_ASSERT(watcher->next_ == nullptr);
862 watcher->next_ = head_;
866 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
867 const ExternalConnectivityWatcher* watcher) {
868 MutexLock lock(&mu_);
869 if (watcher == head_) {
870 head_ = watcher->next_;
873 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
874 if (w->next_ == watcher) {
875 w->next_ = w->next_->next_;
879 GPR_UNREACHABLE_CODE(return );
883 // ChannelData::ExternalConnectivityWatcher
886 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
887 ChannelData* chand, grpc_polling_entity pollent,
888 grpc_connectivity_state* state, grpc_closure* on_complete,
889 grpc_closure* watcher_timer_init)
893 on_complete_(on_complete),
894 watcher_timer_init_(watcher_timer_init) {
895 grpc_polling_entity_add_to_pollset_set(&pollent_,
896 chand_->interested_parties_);
897 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
899 GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
900 grpc_combiner_scheduler(chand_->combiner_)),
904 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
905 grpc_polling_entity_del_from_pollset_set(&pollent_,
906 chand_->interested_parties_);
907 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
908 "ExternalConnectivityWatcher");
911 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
912 void* arg, grpc_error* error) {
913 ExternalConnectivityWatcher* self =
914 static_cast<ExternalConnectivityWatcher*>(arg);
915 grpc_closure* on_complete = self->on_complete_;
916 self->chand_->external_connectivity_watcher_list_.Remove(self);
918 GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
921 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
922 void* arg, grpc_error* ignored) {
923 ExternalConnectivityWatcher* self =
924 static_cast<ExternalConnectivityWatcher*>(arg);
925 if (self->state_ == nullptr) {
926 // Handle cancellation.
927 GPR_ASSERT(self->watcher_timer_init_ == nullptr);
928 ExternalConnectivityWatcher* found =
929 self->chand_->external_connectivity_watcher_list_.Lookup(
931 if (found != nullptr) {
932 grpc_connectivity_state_notify_on_state_change(
933 &found->chand_->state_tracker_, nullptr, &found->my_closure_);
939 self->chand_->external_connectivity_watcher_list_.Add(self);
940 // This assumes that the closure is scheduled on the ExecCtx scheduler
941 // and that GRPC_CLOSURE_RUN would run the closure immediately.
942 GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
943 GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
944 grpc_combiner_scheduler(self->chand_->combiner_));
945 grpc_connectivity_state_notify_on_state_change(
946 &self->chand_->state_tracker_, self->state_, &self->my_closure_);
950 // ChannelData::GrpcSubchannel
953 // This class is a wrapper for Subchannel that hides details of the
954 // channel's implementation (such as the health check service name) from
955 // the LB policy API.
957 // Note that no synchronization is needed here, because even if the
958 // underlying subchannel is shared between channels, this wrapper will only
959 // be used within one channel, so it will always be synchronized by the
960 // control plane combiner.
961 class ChannelData::GrpcSubchannel : public SubchannelInterface {
963 GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
964 UniquePtr<char> health_check_service_name)
966 subchannel_(subchannel),
967 health_check_service_name_(std::move(health_check_service_name)) {
968 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
969 auto* subchannel_node = subchannel_->channelz_node();
970 if (subchannel_node != nullptr) {
971 intptr_t subchannel_uuid = subchannel_node->uuid();
972 auto it = chand_->subchannel_refcount_map_.find(subchannel_);
973 if (it == chand_->subchannel_refcount_map_.end()) {
974 chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
975 it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
982 auto* subchannel_node = subchannel_->channelz_node();
983 if (subchannel_node != nullptr) {
984 intptr_t subchannel_uuid = subchannel_node->uuid();
985 auto it = chand_->subchannel_refcount_map_.find(subchannel_);
986 GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
988 if (it->second == 0) {
989 chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
990 chand_->subchannel_refcount_map_.erase(it);
993 GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
994 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
997 grpc_connectivity_state CheckConnectivityState(
998 RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
1000 RefCountedPtr<ConnectedSubchannel> tmp;
1001 auto retval = subchannel_->CheckConnectivityState(
1002 health_check_service_name_.get(), &tmp);
1003 *connected_subchannel = std::move(tmp);
1007 void WatchConnectivityState(
1008 grpc_connectivity_state initial_state,
1009 UniquePtr<ConnectivityStateWatcher> watcher) override {
1010 subchannel_->WatchConnectivityState(
1012 UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
1013 std::move(watcher));
1016 void CancelConnectivityStateWatch(
1017 ConnectivityStateWatcher* watcher) override {
1018 subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
1022 void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
1024 void ResetBackoff() override { subchannel_->ResetBackoff(); }
1027 ChannelData* chand_;
1028 Subchannel* subchannel_;
1029 UniquePtr<char> health_check_service_name_;
1033 // ChannelData::ClientChannelControlHelper
1036 class ChannelData::ClientChannelControlHelper
1037 : public LoadBalancingPolicy::ChannelControlHelper {
1039 explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
1040 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1043 ~ClientChannelControlHelper() override {
1044 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1045 "ClientChannelControlHelper");
1048 RefCountedPtr<SubchannelInterface> CreateSubchannel(
1049 const grpc_channel_args& args) override {
1050 bool inhibit_health_checking = grpc_channel_arg_get_bool(
1051 grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
1052 UniquePtr<char> health_check_service_name;
1053 if (!inhibit_health_checking) {
1054 health_check_service_name.reset(
1055 gpr_strdup(chand_->health_check_service_name_.get()));
1057 static const char* args_to_remove[] = {
1058 GRPC_ARG_INHIBIT_HEALTH_CHECKING,
1059 GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1061 grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
1062 chand_->subchannel_pool_.get());
1063 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1064 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
1065 Subchannel* subchannel =
1066 chand_->client_channel_factory_->CreateSubchannel(new_args);
1067 grpc_channel_args_destroy(new_args);
1068 if (subchannel == nullptr) return nullptr;
1069 return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
1070 std::move(health_check_service_name));
1073 grpc_channel* CreateChannel(const char* target,
1074 const grpc_channel_args& args) override {
1075 return chand_->client_channel_factory_->CreateChannel(target, &args);
1079 grpc_connectivity_state state,
1080 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
1081 grpc_error* disconnect_error =
1082 chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
1083 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1084 const char* extra = disconnect_error == GRPC_ERROR_NONE
1086 : " (ignoring -- channel shutting down)";
1087 gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
1088 grpc_connectivity_state_name(state), picker.get(), extra);
1090 // Do update only if not shutting down.
1091 if (disconnect_error == GRPC_ERROR_NONE) {
1092 // Will delete itself.
1093 New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
1098 // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
1099 void RequestReresolution() override {}
1101 void AddTraceEvent(TraceSeverity severity, const char* message) override {
1102 if (chand_->channelz_node_ != nullptr) {
1103 chand_->channelz_node_->AddTraceEvent(
1104 ConvertSeverityEnum(severity),
1105 grpc_slice_from_copied_string(message));
1110 static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1111 TraceSeverity severity) {
1112 if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1113 if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1114 return channelz::ChannelTrace::Error;
1117 ChannelData* chand_;
1121 // ChannelData implementation
1124 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1125 grpc_channel_element_args* args) {
1126 GPR_ASSERT(args->is_last);
1127 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1128 grpc_error* error = GRPC_ERROR_NONE;
1129 new (elem->channel_data) ChannelData(args, &error);
1133 void ChannelData::Destroy(grpc_channel_element* elem) {
1134 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1135 chand->~ChannelData();
1138 bool GetEnableRetries(const grpc_channel_args* args) {
1139 return grpc_channel_arg_get_bool(
1140 grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1143 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1144 return static_cast<size_t>(grpc_channel_arg_get_integer(
1145 grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1146 {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1149 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1150 const grpc_channel_args* args) {
1151 const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1152 grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1153 if (use_local_subchannel_pool) {
1154 return MakeRefCounted<LocalSubchannelPool>();
1156 return GlobalSubchannelPool::instance();
1159 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1160 const grpc_arg* arg =
1161 grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1162 if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1163 return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1168 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1169 : deadline_checking_enabled_(
1170 grpc_deadline_checking_enabled(args->channel_args)),
1171 enable_retries_(GetEnableRetries(args->channel_args)),
1172 per_rpc_retry_buffer_size_(
1173 GetMaxPerRpcRetryBufferSize(args->channel_args)),
1174 owning_stack_(args->channel_stack),
1175 client_channel_factory_(
1176 ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1177 channelz_node_(GetChannelzNode(args->channel_args)),
1178 data_plane_combiner_(grpc_combiner_create()),
1179 combiner_(grpc_combiner_create()),
1180 interested_parties_(grpc_pollset_set_create()),
1181 subchannel_pool_(GetSubchannelPool(args->channel_args)),
1182 disconnect_error_(GRPC_ERROR_NONE) {
1183 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1184 gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1185 this, owning_stack_);
1187 // Initialize data members.
1188 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1190 gpr_mu_init(&info_mu_);
1191 // Start backup polling.
1192 grpc_client_channel_start_backup_polling(interested_parties_);
1193 // Check client channel factory.
1194 if (client_channel_factory_ == nullptr) {
1195 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1196 "Missing client channel factory in args for client channel filter");
1199 // Get server name to resolve, using proxy mapper if needed.
1200 const char* server_uri = grpc_channel_arg_get_string(
1201 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1202 if (server_uri == nullptr) {
1203 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1204 "server URI channel arg missing or wrong type in client channel "
1208 // Get default service config
1209 const char* service_config_json = grpc_channel_arg_get_string(
1210 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1211 if (service_config_json != nullptr) {
1212 *error = GRPC_ERROR_NONE;
1213 default_service_config_ = ServiceConfig::Create(service_config_json, error);
1214 if (*error != GRPC_ERROR_NONE) {
1215 default_service_config_.reset();
1219 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1220 if (uri != nullptr && uri->path[0] != '\0') {
1222 gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1224 grpc_uri_destroy(uri);
1225 char* proxy_name = nullptr;
1226 grpc_channel_args* new_args = nullptr;
1227 grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1229 UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
1230 : gpr_strdup(server_uri));
1231 // Instantiate resolving LB policy.
1232 LoadBalancingPolicy::Args lb_args;
1233 lb_args.combiner = combiner_;
1234 lb_args.channel_control_helper =
1235 UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1236 New<ClientChannelControlHelper>(this));
1237 lb_args.args = new_args != nullptr ? new_args : args->channel_args;
1238 resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1239 std::move(lb_args), &grpc_client_channel_routing_trace,
1240 std::move(target_uri), ProcessResolverResultLocked, this, error));
1241 grpc_channel_args_destroy(new_args);
1242 if (*error != GRPC_ERROR_NONE) {
1243 // Orphan the resolving LB policy and flush the exec_ctx to ensure
1244 // that it finishes shutting down. This ensures that if we are
1245 // failing, we destroy the ClientChannelControlHelper (and thus
1246 // unref the channel stack) before we return.
1247 // TODO(roth): This is not a complete solution, because it only
1248 // catches the case where channel stack initialization fails in this
1249 // particular filter. If there is a failure in a different filter, we
1250 // will leave a dangling ref here, which can cause a crash. Fortunately,
1251 // in practice, there are no other filters that can cause failures in
1252 // channel stack initialization, so this works for now.
1253 resolving_lb_policy_.reset();
1254 ExecCtx::Get()->Flush();
1256 grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1257 interested_parties_);
1258 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1259 gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1260 resolving_lb_policy_.get());
1265 ChannelData::~ChannelData() {
1266 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1267 gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1269 if (resolving_lb_policy_ != nullptr) {
1270 grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1271 interested_parties_);
1272 resolving_lb_policy_.reset();
1274 // Stop backup polling.
1275 grpc_client_channel_stop_backup_polling(interested_parties_);
1276 grpc_pollset_set_destroy(interested_parties_);
1277 GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
1278 GRPC_COMBINER_UNREF(combiner_, "client_channel");
1279 GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1280 grpc_connectivity_state_destroy(&state_tracker_);
1281 gpr_mu_destroy(&info_mu_);
1284 void ChannelData::ProcessLbPolicy(
1285 const Resolver::Result& resolver_result,
1286 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
1287 UniquePtr<char>* lb_policy_name,
1288 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
1289 // Prefer the LB policy name found in the service config.
1290 if (parsed_service_config != nullptr &&
1291 parsed_service_config->parsed_lb_config() != nullptr) {
1292 lb_policy_name->reset(
1293 gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1294 *lb_policy_config = parsed_service_config->parsed_lb_config();
1297 const char* local_policy_name = nullptr;
1298 if (parsed_service_config != nullptr &&
1299 parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1300 local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1302 const grpc_arg* channel_arg =
1303 grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1304 local_policy_name = grpc_channel_arg_get_string(channel_arg);
1306 // Special case: If at least one balancer address is present, we use
1307 // the grpclb policy, regardless of what the resolver has returned.
1308 bool found_balancer_address = false;
1309 for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1310 const ServerAddress& address = resolver_result.addresses[i];
1311 if (address.IsBalancer()) {
1312 found_balancer_address = true;
1316 if (found_balancer_address) {
1317 if (local_policy_name != nullptr &&
1318 strcmp(local_policy_name, "grpclb") != 0) {
1320 "resolver requested LB policy %s but provided at least one "
1321 "balancer address -- forcing use of grpclb LB policy",
1324 local_policy_name = "grpclb";
1326 // Use pick_first if nothing was specified and we didn't select grpclb
1328 lb_policy_name->reset(gpr_strdup(
1329 local_policy_name == nullptr ? "pick_first" : local_policy_name));
1332 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1333 // resolver result update.
1334 bool ChannelData::ProcessResolverResultLocked(
1335 void* arg, const Resolver::Result& result, const char** lb_policy_name,
1336 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
1337 grpc_error** service_config_error) {
1338 ChannelData* chand = static_cast<ChannelData*>(arg);
1339 RefCountedPtr<ServiceConfig> service_config;
1340 // If resolver did not return a service config or returned an invalid service
1341 // config, we need a fallback service config.
1342 if (result.service_config_error != GRPC_ERROR_NONE) {
1343 // If the service config was invalid, then fallback to the saved service
1344 // config. If there is no saved config either, use the default service
1346 if (chand->saved_service_config_ != nullptr) {
1347 service_config = chand->saved_service_config_;
1348 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1350 "chand=%p: resolver returned invalid service config. "
1351 "Continuing to use previous service config.",
1354 } else if (chand->default_service_config_ != nullptr) {
1355 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1357 "chand=%p: resolver returned invalid service config. Using "
1358 "default service config provided by client API.",
1361 service_config = chand->default_service_config_;
1363 } else if (result.service_config == nullptr) {
1364 if (chand->default_service_config_ != nullptr) {
1365 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1367 "chand=%p: resolver returned no service config. Using default "
1368 "service config provided by client API.",
1371 service_config = chand->default_service_config_;
1374 service_config = result.service_config;
1376 *service_config_error = GRPC_ERROR_REF(result.service_config_error);
1377 if (service_config == nullptr &&
1378 result.service_config_error != GRPC_ERROR_NONE) {
1381 // Process service config.
1382 UniquePtr<char> service_config_json;
1383 const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1385 if (service_config != nullptr) {
1386 parsed_service_config =
1387 static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1388 service_config->GetGlobalParsedConfig(
1389 internal::ClientChannelServiceConfigParser::ParserIndex()));
1391 // Check if the config has changed.
1392 const bool service_config_changed =
1393 ((service_config == nullptr) !=
1394 (chand->saved_service_config_ == nullptr)) ||
1395 (service_config != nullptr &&
1396 strcmp(service_config->service_config_json(),
1397 chand->saved_service_config_->service_config_json()) != 0);
1398 if (service_config_changed) {
1399 service_config_json.reset(gpr_strdup(
1400 service_config != nullptr ? service_config->service_config_json()
1402 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1404 "chand=%p: resolver returned updated service config: \"%s\"",
1405 chand, service_config_json.get());
1407 // Save health check service name.
1408 if (service_config != nullptr) {
1409 chand->health_check_service_name_.reset(
1410 gpr_strdup(parsed_service_config->health_check_service_name()));
1412 chand->health_check_service_name_.reset();
1414 // Save service config.
1415 chand->saved_service_config_ = std::move(service_config);
1417 // We want to set the service config at least once. This should not really be
1418 // needed, but we are doing it as a defensive approach. This can be removed,
1419 // if we feel it is unnecessary.
1420 if (service_config_changed || !chand->received_first_resolver_result_) {
1421 chand->received_first_resolver_result_ = true;
1422 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
1423 retry_throttle_data;
1424 if (parsed_service_config != nullptr) {
1425 retry_throttle_data = parsed_service_config->retry_throttling();
1427 // Create service config setter to update channel state in the data
1428 // plane combiner. Destroys itself when done.
1429 New<ServiceConfigSetter>(chand, retry_throttle_data,
1430 chand->saved_service_config_);
1432 UniquePtr<char> processed_lb_policy_name;
1433 chand->ProcessLbPolicy(result, parsed_service_config,
1434 &processed_lb_policy_name, lb_policy_config);
1435 // Swap out the data used by GetChannelInfo().
1437 MutexLock lock(&chand->info_mu_);
1438 chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
1439 if (service_config_json != nullptr) {
1440 chand->info_service_config_json_ = std::move(service_config_json);
1444 *lb_policy_name = chand->info_lb_policy_name_.get();
1445 return service_config_changed;
1448 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
1449 if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
1450 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1452 LoadBalancingPolicy::PickResult result =
1453 picker_->Pick(LoadBalancingPolicy::PickArgs());
1454 if (result.connected_subchannel != nullptr) {
1455 ConnectedSubchannel* connected_subchannel =
1456 static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
1457 connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
1459 if (result.error == GRPC_ERROR_NONE) {
1460 result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1461 "LB policy dropped call on ping");
1464 return result.error;
1467 void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
1468 grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
1469 grpc_channel_element* elem =
1470 static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
1471 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1472 // Connectivity watch.
1473 if (op->on_connectivity_state_change != nullptr) {
1474 grpc_connectivity_state_notify_on_state_change(
1475 &chand->state_tracker_, op->connectivity_state,
1476 op->on_connectivity_state_change);
1477 op->on_connectivity_state_change = nullptr;
1478 op->connectivity_state = nullptr;
1481 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1482 grpc_error* error = chand->DoPingLocked(op);
1483 if (error != GRPC_ERROR_NONE) {
1484 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
1485 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
1487 op->bind_pollset = nullptr;
1488 op->send_ping.on_initiate = nullptr;
1489 op->send_ping.on_ack = nullptr;
1492 if (op->reset_connect_backoff) {
1493 if (chand->resolving_lb_policy_ != nullptr) {
1494 chand->resolving_lb_policy_->ResetBackoffLocked();
1498 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1499 grpc_error* error = GRPC_ERROR_NONE;
1500 GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
1501 &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
1502 MemoryOrder::ACQUIRE));
1503 grpc_pollset_set_del_pollset_set(
1504 chand->resolving_lb_policy_->interested_parties(),
1505 chand->interested_parties_);
1506 chand->resolving_lb_policy_.reset();
1507 // Will delete itself.
1508 New<ConnectivityStateAndPickerSetter>(
1509 chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
1510 UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
1511 New<LoadBalancingPolicy::TransientFailurePicker>(
1512 GRPC_ERROR_REF(op->disconnect_with_error))));
1514 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
1515 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1518 void ChannelData::StartTransportOp(grpc_channel_element* elem,
1519 grpc_transport_op* op) {
1520 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1521 GPR_ASSERT(op->set_accept_stream == false);
1522 // Handle bind_pollset.
1523 if (op->bind_pollset != nullptr) {
1524 grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1526 // Pop into control plane combiner for remaining ops.
1527 op->handler_private.extra_arg = elem;
1528 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1530 GRPC_CLOSURE_INIT(&op->handler_private.closure,
1531 ChannelData::StartTransportOpLocked, op,
1532 grpc_combiner_scheduler(chand->combiner_)),
1536 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1537 const grpc_channel_info* info) {
1538 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1539 MutexLock lock(&chand->info_mu_);
1540 if (info->lb_policy_name != nullptr) {
1541 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1543 if (info->service_config_json != nullptr) {
1544 *info->service_config_json =
1545 gpr_strdup(chand->info_service_config_json_.get());
1549 void ChannelData::AddQueuedPick(QueuedPick* pick,
1550 grpc_polling_entity* pollent) {
1551 // Add call to queued picks list.
1552 pick->next = queued_picks_;
1553 queued_picks_ = pick;
1554 // Add call's pollent to channel's interested_parties, so that I/O
1555 // can be done under the call's CQ.
1556 grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1559 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1560 grpc_polling_entity* pollent) {
1561 // Remove call's pollent from channel's interested_parties.
1562 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1563 // Remove from queued picks list.
1564 for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1565 pick = &(*pick)->next) {
1566 if (*pick == to_remove) {
1567 *pick = to_remove->next;
1573 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1574 auto* chand = static_cast<ChannelData*>(arg);
1575 if (chand->resolving_lb_policy_ != nullptr) {
1576 chand->resolving_lb_policy_->ExitIdleLocked();
1578 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
1581 grpc_connectivity_state ChannelData::CheckConnectivityState(
1582 bool try_to_connect) {
1583 grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1584 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1585 GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1586 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1587 grpc_combiner_scheduler(combiner_)),
1594 // CallData implementation
1599 // In order to support retries, we act as a proxy for stream op batches.
1600 // When we get a batch from the surface, we add it to our list of pending
1601 // batches, and we then use those batches to construct separate "child"
1602 // batches to be started on the subchannel call. When the child batches
1603 // return, we then decide which pending batches have been completed and
1604 // schedule their callbacks accordingly. If a subchannel call fails and
1605 // we want to retry it, we do a new pick and start again, constructing
1606 // new "child" batches for the new subchannel call.
1608 // Note that retries are committed when receiving data from the server
1609 // (except for Trailers-Only responses). However, there may be many
1610 // send ops started before receiving any data, so we may have already
1611 // completed some number of send ops (and returned the completions up to
1612 // the surface) by the time we realize that we need to retry. To deal
1613 // with this, we cache data for send ops, so that we can replay them on a
1614 // different subchannel call even after we have completed the original
1617 // There are two sets of data to maintain:
1618 // - In call_data (in the parent channel), we maintain a list of pending
1619 // ops and cached data for send ops.
1620 // - In the subchannel call, we maintain state to indicate what ops have
1621 // already been sent down to that call.
1623 // When constructing the "child" batches, we compare those two sets of
1624 // data to see which batches need to be sent to the subchannel call.
1626 // TODO(roth): In subsequent PRs:
1627 // - add support for transparent retries (including initial metadata)
1628 // - figure out how to record stats in census for retries
1629 // (census filter is on top of this one)
1630 // - add census stats for retries
1632 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
1633 const grpc_call_element_args& args)
1634 : deadline_state_(elem, args.call_stack, args.call_combiner,
1635 GPR_LIKELY(chand.deadline_checking_enabled())
1637 : GRPC_MILLIS_INF_FUTURE),
1638 path_(grpc_slice_ref_internal(args.path)),
1639 call_start_time_(args.start_time),
1640 deadline_(args.deadline),
1642 owning_call_(args.call_stack),
1643 call_combiner_(args.call_combiner),
1644 call_context_(args.context),
1645 lb_call_state_(this),
1646 pending_send_initial_metadata_(false),
1647 pending_send_message_(false),
1648 pending_send_trailing_metadata_(false),
1649 enable_retries_(chand.enable_retries()),
1650 retry_committed_(false),
1651 last_attempt_got_server_pushback_(false) {}
1653 CallData::~CallData() {
1654 grpc_slice_unref_internal(path_);
1655 GRPC_ERROR_UNREF(cancel_error_);
1656 // Make sure there are no remaining pending batches.
1657 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1658 GPR_ASSERT(pending_batches_[i].batch == nullptr);
1662 grpc_error* CallData::Init(grpc_call_element* elem,
1663 const grpc_call_element_args* args) {
1664 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1665 new (elem->call_data) CallData(elem, *chand, *args);
1666 return GRPC_ERROR_NONE;
1669 void CallData::Destroy(grpc_call_element* elem,
1670 const grpc_call_final_info* final_info,
1671 grpc_closure* then_schedule_closure) {
1672 CallData* calld = static_cast<CallData*>(elem->call_data);
1673 if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
1674 calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
1675 then_schedule_closure = nullptr;
1678 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1681 void CallData::StartTransportStreamOpBatch(
1682 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1683 GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1684 CallData* calld = static_cast<CallData*>(elem->call_data);
1685 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1686 if (GPR_LIKELY(chand->deadline_checking_enabled())) {
1687 grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
1689 // If we've previously been cancelled, immediately fail any new batches.
1690 if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
1691 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1692 gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1693 chand, calld, grpc_error_string(calld->cancel_error_));
1695 // Note: This will release the call combiner.
1696 grpc_transport_stream_op_batch_finish_with_failure(
1697 batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1700 // Handle cancellation.
1701 if (GPR_UNLIKELY(batch->cancel_stream)) {
1702 // Stash a copy of cancel_error in our call data, so that we can use
1703 // it for subsequent operations. This ensures that if the call is
1704 // cancelled before any batches are passed down (e.g., if the deadline
1705 // is in the past when the call starts), we can return the right
1706 // error to the caller when the first batch does get passed down.
1707 GRPC_ERROR_UNREF(calld->cancel_error_);
1708 calld->cancel_error_ =
1709 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1710 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1711 gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1712 calld, grpc_error_string(calld->cancel_error_));
1714 // If we do not have a subchannel call (i.e., a pick has not yet
1715 // been started), fail all pending batches. Otherwise, send the
1716 // cancellation down to the subchannel call.
1717 if (calld->subchannel_call_ == nullptr) {
1718 // TODO(roth): If there is a pending retry callback, do we need to
1720 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
1721 NoYieldCallCombiner);
1722 // Note: This will release the call combiner.
1723 grpc_transport_stream_op_batch_finish_with_failure(
1724 batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1726 // Note: This will release the call combiner.
1727 calld->subchannel_call_->StartTransportStreamOpBatch(batch);
1731 // Add the batch to the pending list.
1732 calld->PendingBatchesAdd(elem, batch);
1733 // Check if we've already gotten a subchannel call.
1734 // Note that once we have completed the pick, we do not need to enter
1735 // the channel combiner, which is more efficient (especially for
1736 // streaming calls).
1737 if (calld->subchannel_call_ != nullptr) {
1738 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1740 "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
1741 calld, calld->subchannel_call_.get());
1743 calld->PendingBatchesResume(elem);
1746 // We do not yet have a subchannel call.
1747 // For batches containing a send_initial_metadata op, enter the channel
1748 // combiner to start a pick.
1749 if (GPR_LIKELY(batch->send_initial_metadata)) {
1750 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1751 gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
1756 &batch->handler_private.closure, StartPickLocked, elem,
1757 grpc_combiner_scheduler(chand->data_plane_combiner())),
1760 // For all other batches, release the call combiner.
1761 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1763 "chand=%p calld=%p: saved batch, yielding call combiner", chand,
1766 GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1767 "batch does not include send_initial_metadata");
1771 void CallData::SetPollent(grpc_call_element* elem,
1772 grpc_polling_entity* pollent) {
1773 CallData* calld = static_cast<CallData*>(elem->call_data);
1774 calld->pollent_ = pollent;
1778 // send op data caching
1781 void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
1782 if (pending->send_ops_cached) return;
1783 pending->send_ops_cached = true;
1784 grpc_transport_stream_op_batch* batch = pending->batch;
1785 // Save a copy of metadata for send_initial_metadata ops.
1786 if (batch->send_initial_metadata) {
1787 seen_send_initial_metadata_ = true;
1788 GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
1789 grpc_metadata_batch* send_initial_metadata =
1790 batch->payload->send_initial_metadata.send_initial_metadata;
1791 send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1792 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
1793 grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
1794 send_initial_metadata_storage_);
1795 send_initial_metadata_flags_ =
1796 batch->payload->send_initial_metadata.send_initial_metadata_flags;
1797 peer_string_ = batch->payload->send_initial_metadata.peer_string;
1799 // Set up cache for send_message ops.
1800 if (batch->send_message) {
1801 ByteStreamCache* cache = arena_->New<ByteStreamCache>(
1802 std::move(batch->payload->send_message.send_message));
1803 send_messages_.push_back(cache);
1805 // Save metadata batch for send_trailing_metadata ops.
1806 if (batch->send_trailing_metadata) {
1807 seen_send_trailing_metadata_ = true;
1808 GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
1809 grpc_metadata_batch* send_trailing_metadata =
1810 batch->payload->send_trailing_metadata.send_trailing_metadata;
1811 send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1812 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
1813 grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
1814 send_trailing_metadata_storage_);
1818 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
1819 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1821 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1824 grpc_metadata_batch_destroy(&send_initial_metadata_);
1827 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
1828 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1830 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1833 send_messages_[idx]->Destroy();
1836 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
1837 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1839 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1842 grpc_metadata_batch_destroy(&send_trailing_metadata_);
1845 void CallData::FreeCachedSendOpDataAfterCommit(
1846 grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
1847 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1848 if (retry_state->completed_send_initial_metadata) {
1849 FreeCachedSendInitialMetadata(chand);
1851 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1852 FreeCachedSendMessage(chand, i);
1854 if (retry_state->completed_send_trailing_metadata) {
1855 FreeCachedSendTrailingMetadata(chand);
1859 void CallData::FreeCachedSendOpDataForCompletedBatch(
1860 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
1861 SubchannelCallRetryState* retry_state) {
1862 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1863 if (batch_data->batch.send_initial_metadata) {
1864 FreeCachedSendInitialMetadata(chand);
1866 if (batch_data->batch.send_message) {
1867 FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
1869 if (batch_data->batch.send_trailing_metadata) {
1870 FreeCachedSendTrailingMetadata(chand);
1875 // LB recv_trailing_metadata_ready handling
1878 void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
1879 void* arg, grpc_error* error) {
1880 CallData* calld = static_cast<CallData*>(arg);
1881 // Invoke callback to LB policy.
1882 calld->lb_recv_trailing_metadata_ready_(
1883 calld->lb_recv_trailing_metadata_ready_user_data_,
1884 calld->recv_trailing_metadata_, &calld->lb_call_state_);
1885 // Chain to original callback.
1886 GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
1887 GRPC_ERROR_REF(error));
1890 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
1891 grpc_transport_stream_op_batch* batch) {
1892 if (lb_recv_trailing_metadata_ready_ != nullptr) {
1893 recv_trailing_metadata_ =
1894 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
1895 original_recv_trailing_metadata_ready_ =
1896 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1897 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
1898 RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
1899 grpc_schedule_on_exec_ctx);
1900 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1901 &recv_trailing_metadata_ready_;
1906 // pending_batches management
1909 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
1910 // Note: It is important the send_initial_metadata be the first entry
1911 // here, since the code in pick_subchannel_locked() assumes it will be.
1912 if (batch->send_initial_metadata) return 0;
1913 if (batch->send_message) return 1;
1914 if (batch->send_trailing_metadata) return 2;
1915 if (batch->recv_initial_metadata) return 3;
1916 if (batch->recv_message) return 4;
1917 if (batch->recv_trailing_metadata) return 5;
1918 GPR_UNREACHABLE_CODE(return (size_t)-1);
1921 // This is called via the call combiner, so access to calld is synchronized.
1922 void CallData::PendingBatchesAdd(grpc_call_element* elem,
1923 grpc_transport_stream_op_batch* batch) {
1924 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1925 const size_t idx = GetBatchIndex(batch);
1926 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1928 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1931 PendingBatch* pending = &pending_batches_[idx];
1932 GPR_ASSERT(pending->batch == nullptr);
1933 pending->batch = batch;
1934 pending->send_ops_cached = false;
1935 if (enable_retries_) {
1936 // Update state in calld about pending batches.
1937 // Also check if the batch takes us over the retry buffer limit.
1938 // Note: We don't check the size of trailing metadata here, because
1939 // gRPC clients do not send trailing metadata.
1940 if (batch->send_initial_metadata) {
1941 pending_send_initial_metadata_ = true;
1942 bytes_buffered_for_retry_ += grpc_metadata_batch_size(
1943 batch->payload->send_initial_metadata.send_initial_metadata);
1945 if (batch->send_message) {
1946 pending_send_message_ = true;
1947 bytes_buffered_for_retry_ +=
1948 batch->payload->send_message.send_message->length();
1950 if (batch->send_trailing_metadata) {
1951 pending_send_trailing_metadata_ = true;
1953 if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
1954 chand->per_rpc_retry_buffer_size())) {
1955 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1957 "chand=%p calld=%p: exceeded retry buffer size, committing",
1960 SubchannelCallRetryState* retry_state =
1961 subchannel_call_ == nullptr ? nullptr
1962 : static_cast<SubchannelCallRetryState*>(
1963 subchannel_call_->GetParentData());
1964 RetryCommit(elem, retry_state);
1965 // If we are not going to retry and have not yet started, pretend
1966 // retries are disabled so that we don't bother with retry overhead.
1967 if (num_attempts_completed_ == 0) {
1968 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1970 "chand=%p calld=%p: disabling retries before first attempt",
1973 enable_retries_ = false;
1979 void CallData::PendingBatchClear(PendingBatch* pending) {
1980 if (enable_retries_) {
1981 if (pending->batch->send_initial_metadata) {
1982 pending_send_initial_metadata_ = false;
1984 if (pending->batch->send_message) {
1985 pending_send_message_ = false;
1987 if (pending->batch->send_trailing_metadata) {
1988 pending_send_trailing_metadata_ = false;
1991 pending->batch = nullptr;
1994 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
1995 PendingBatch* pending) {
1996 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1997 grpc_transport_stream_op_batch* batch = pending->batch;
1998 // We clear the pending batch if all of its callbacks have been
1999 // scheduled and reset to nullptr.
2000 if (batch->on_complete == nullptr &&
2001 (!batch->recv_initial_metadata ||
2002 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
2004 (!batch->recv_message ||
2005 batch->payload->recv_message.recv_message_ready == nullptr) &&
2006 (!batch->recv_trailing_metadata ||
2007 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
2009 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2010 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
2013 PendingBatchClear(pending);
2017 // This is called via the call combiner, so access to calld is synchronized.
2018 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2019 grpc_transport_stream_op_batch* batch =
2020 static_cast<grpc_transport_stream_op_batch*>(arg);
2021 CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2022 // Note: This will release the call combiner.
2023 grpc_transport_stream_op_batch_finish_with_failure(
2024 batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2027 // This is called via the call combiner, so access to calld is synchronized.
2028 void CallData::PendingBatchesFail(
2029 grpc_call_element* elem, grpc_error* error,
2030 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2031 GPR_ASSERT(error != GRPC_ERROR_NONE);
2032 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2033 size_t num_batches = 0;
2034 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2035 if (pending_batches_[i].batch != nullptr) ++num_batches;
2038 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2039 elem->channel_data, this, num_batches, grpc_error_string(error));
2041 CallCombinerClosureList closures;
2042 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2043 PendingBatch* pending = &pending_batches_[i];
2044 grpc_transport_stream_op_batch* batch = pending->batch;
2045 if (batch != nullptr) {
2046 if (batch->recv_trailing_metadata) {
2047 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2049 batch->handler_private.extra_arg = this;
2050 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2051 FailPendingBatchInCallCombiner, batch,
2052 grpc_schedule_on_exec_ctx);
2053 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2054 "PendingBatchesFail");
2055 PendingBatchClear(pending);
2058 if (yield_call_combiner_predicate(closures)) {
2059 closures.RunClosures(call_combiner_);
2061 closures.RunClosuresWithoutYielding(call_combiner_);
2063 GRPC_ERROR_UNREF(error);
2066 // This is called via the call combiner, so access to calld is synchronized.
2067 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2068 grpc_error* ignored) {
2069 grpc_transport_stream_op_batch* batch =
2070 static_cast<grpc_transport_stream_op_batch*>(arg);
2071 SubchannelCall* subchannel_call =
2072 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2073 // Note: This will release the call combiner.
2074 subchannel_call->StartTransportStreamOpBatch(batch);
2077 // This is called via the call combiner, so access to calld is synchronized.
2078 void CallData::PendingBatchesResume(grpc_call_element* elem) {
2079 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2080 if (enable_retries_) {
2081 StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
2084 // Retries not enabled; send down batches as-is.
2085 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2086 size_t num_batches = 0;
2087 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2088 if (pending_batches_[i].batch != nullptr) ++num_batches;
2091 "chand=%p calld=%p: starting %" PRIuPTR
2092 " pending batches on subchannel_call=%p",
2093 chand, this, num_batches, subchannel_call_.get());
2095 CallCombinerClosureList closures;
2096 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2097 PendingBatch* pending = &pending_batches_[i];
2098 grpc_transport_stream_op_batch* batch = pending->batch;
2099 if (batch != nullptr) {
2100 if (batch->recv_trailing_metadata) {
2101 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2103 batch->handler_private.extra_arg = subchannel_call_.get();
2104 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2105 ResumePendingBatchInCallCombiner, batch,
2106 grpc_schedule_on_exec_ctx);
2107 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2108 "PendingBatchesResume");
2109 PendingBatchClear(pending);
2112 // Note: This will release the call combiner.
2113 closures.RunClosures(call_combiner_);
2116 template <typename Predicate>
2117 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
2118 const char* log_message,
2119 Predicate predicate) {
2120 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2121 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2122 PendingBatch* pending = &pending_batches_[i];
2123 grpc_transport_stream_op_batch* batch = pending->batch;
2124 if (batch != nullptr && predicate(batch)) {
2125 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2127 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
2128 this, log_message, i);
2140 void CallData::RetryCommit(grpc_call_element* elem,
2141 SubchannelCallRetryState* retry_state) {
2142 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2143 if (retry_committed_) return;
2144 retry_committed_ = true;
2145 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2146 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2148 if (retry_state != nullptr) {
2149 FreeCachedSendOpDataAfterCommit(elem, retry_state);
2153 void CallData::DoRetry(grpc_call_element* elem,
2154 SubchannelCallRetryState* retry_state,
2155 grpc_millis server_pushback_ms) {
2156 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2157 GPR_ASSERT(method_params_ != nullptr);
2158 const auto* retry_policy = method_params_->retry_policy();
2159 GPR_ASSERT(retry_policy != nullptr);
2160 // Reset subchannel call and connected subchannel.
2161 subchannel_call_.reset();
2162 connected_subchannel_.reset();
2163 // Compute backoff delay.
2164 grpc_millis next_attempt_time;
2165 if (server_pushback_ms >= 0) {
2166 next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
2167 last_attempt_got_server_pushback_ = true;
2169 if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
2170 retry_backoff_.Init(
2172 .set_initial_backoff(retry_policy->initial_backoff)
2173 .set_multiplier(retry_policy->backoff_multiplier)
2174 .set_jitter(RETRY_BACKOFF_JITTER)
2175 .set_max_backoff(retry_policy->max_backoff));
2176 last_attempt_got_server_pushback_ = false;
2178 next_attempt_time = retry_backoff_->NextAttemptTime();
2180 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2182 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
2183 this, next_attempt_time - ExecCtx::Get()->Now());
2185 // Schedule retry after computed delay.
2186 GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
2187 grpc_combiner_scheduler(chand->data_plane_combiner()));
2188 grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
2189 // Update bookkeeping.
2190 if (retry_state != nullptr) retry_state->retry_dispatched = true;
2193 bool CallData::MaybeRetry(grpc_call_element* elem,
2194 SubchannelCallBatchData* batch_data,
2195 grpc_status_code status,
2196 grpc_mdelem* server_pushback_md) {
2197 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2198 // Get retry policy.
2199 if (method_params_ == nullptr) return false;
2200 const auto* retry_policy = method_params_->retry_policy();
2201 if (retry_policy == nullptr) return false;
2202 // If we've already dispatched a retry from this call, return true.
2203 // This catches the case where the batch has multiple callbacks
2204 // (i.e., it includes either recv_message or recv_initial_metadata).
2205 SubchannelCallRetryState* retry_state = nullptr;
2206 if (batch_data != nullptr) {
2207 retry_state = static_cast<SubchannelCallRetryState*>(
2208 batch_data->subchannel_call->GetParentData());
2209 if (retry_state->retry_dispatched) {
2210 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2211 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
2218 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
2219 if (retry_throttle_data_ != nullptr) {
2220 retry_throttle_data_->RecordSuccess();
2222 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2223 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
2227 // Status is not OK. Check whether the status is retryable.
2228 if (!retry_policy->retryable_status_codes.Contains(status)) {
2229 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2231 "chand=%p calld=%p: status %s not configured as retryable", chand,
2232 this, grpc_status_code_to_string(status));
2236 // Record the failure and check whether retries are throttled.
2237 // Note that it's important for this check to come after the status
2238 // code check above, since we should only record failures whose statuses
2239 // match the configured retryable status codes, so that we don't count
2240 // things like failures due to malformed requests (INVALID_ARGUMENT).
2241 // Conversely, it's important for this to come before the remaining
2242 // checks, so that we don't fail to record failures due to other factors.
2243 if (retry_throttle_data_ != nullptr &&
2244 !retry_throttle_data_->RecordFailure()) {
2245 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2246 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
2250 // Check whether the call is committed.
2251 if (retry_committed_) {
2252 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2253 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
2258 // Check whether we have retries remaining.
2259 ++num_attempts_completed_;
2260 if (num_attempts_completed_ >= retry_policy->max_attempts) {
2261 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2262 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
2263 this, retry_policy->max_attempts);
2267 // If the call was cancelled from the surface, don't retry.
2268 if (cancel_error_ != GRPC_ERROR_NONE) {
2269 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2271 "chand=%p calld=%p: call cancelled from surface, not retrying",
2276 // Check server push-back.
2277 grpc_millis server_pushback_ms = -1;
2278 if (server_pushback_md != nullptr) {
2279 // If the value is "-1" or any other unparseable string, we do not retry.
2281 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
2282 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2284 "chand=%p calld=%p: not retrying due to server push-back",
2289 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2290 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
2293 server_pushback_ms = (grpc_millis)ms;
2296 DoRetry(elem, retry_state, server_pushback_ms);
2301 // CallData::SubchannelCallBatchData
2304 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2305 grpc_call_element* elem, int refcount, bool set_on_complete) {
2306 CallData* calld = static_cast<CallData*>(elem->call_data);
2307 return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2311 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2312 grpc_call_element* elem, CallData* calld, int refcount,
2313 bool set_on_complete)
2314 : elem(elem), subchannel_call(calld->subchannel_call_) {
2315 SubchannelCallRetryState* retry_state =
2316 static_cast<SubchannelCallRetryState*>(
2317 calld->subchannel_call_->GetParentData());
2318 batch.payload = &retry_state->batch_payload;
2319 gpr_ref_init(&refs, refcount);
2320 if (set_on_complete) {
2321 GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2322 grpc_schedule_on_exec_ctx);
2323 batch.on_complete = &on_complete;
2325 GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2328 void CallData::SubchannelCallBatchData::Destroy() {
2329 SubchannelCallRetryState* retry_state =
2330 static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2331 if (batch.send_initial_metadata) {
2332 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2334 if (batch.send_trailing_metadata) {
2335 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2337 if (batch.recv_initial_metadata) {
2338 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2340 if (batch.recv_trailing_metadata) {
2341 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2343 subchannel_call.reset();
2344 CallData* calld = static_cast<CallData*>(elem->call_data);
2345 GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
2349 // recv_initial_metadata callback handling
2352 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2353 SubchannelCallBatchData* batch_data =
2354 static_cast<SubchannelCallBatchData*>(arg);
2355 CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2356 // Find pending batch.
2357 PendingBatch* pending = calld->PendingBatchFind(
2358 batch_data->elem, "invoking recv_initial_metadata_ready for",
2359 [](grpc_transport_stream_op_batch* batch) {
2360 return batch->recv_initial_metadata &&
2361 batch->payload->recv_initial_metadata
2362 .recv_initial_metadata_ready != nullptr;
2364 GPR_ASSERT(pending != nullptr);
2366 SubchannelCallRetryState* retry_state =
2367 static_cast<SubchannelCallRetryState*>(
2368 batch_data->subchannel_call->GetParentData());
2369 grpc_metadata_batch_move(
2370 &retry_state->recv_initial_metadata,
2371 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2372 // Update bookkeeping.
2373 // Note: Need to do this before invoking the callback, since invoking
2374 // the callback will result in yielding the call combiner.
2375 grpc_closure* recv_initial_metadata_ready =
2376 pending->batch->payload->recv_initial_metadata
2377 .recv_initial_metadata_ready;
2378 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2380 calld->MaybeClearPendingBatch(batch_data->elem, pending);
2381 batch_data->Unref();
2383 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2386 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2387 SubchannelCallBatchData* batch_data =
2388 static_cast<SubchannelCallBatchData*>(arg);
2389 grpc_call_element* elem = batch_data->elem;
2390 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2391 CallData* calld = static_cast<CallData*>(elem->call_data);
2392 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2394 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2395 chand, calld, grpc_error_string(error));
2397 SubchannelCallRetryState* retry_state =
2398 static_cast<SubchannelCallRetryState*>(
2399 batch_data->subchannel_call->GetParentData());
2400 retry_state->completed_recv_initial_metadata = true;
2401 // If a retry was already dispatched, then we're not going to use the
2402 // result of this recv_initial_metadata op, so do nothing.
2403 if (retry_state->retry_dispatched) {
2404 GRPC_CALL_COMBINER_STOP(
2405 calld->call_combiner_,
2406 "recv_initial_metadata_ready after retry dispatched");
2409 // If we got an error or a Trailers-Only response and have not yet gotten
2410 // the recv_trailing_metadata_ready callback, then defer propagating this
2411 // callback back to the surface. We can evaluate whether to retry when
2412 // recv_trailing_metadata comes back.
2413 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2414 error != GRPC_ERROR_NONE) &&
2415 !retry_state->completed_recv_trailing_metadata)) {
2416 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2418 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2422 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2423 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2424 if (!retry_state->started_recv_trailing_metadata) {
2425 // recv_trailing_metadata not yet started by application; start it
2426 // ourselves to get status.
2427 calld->StartInternalRecvTrailingMetadata(elem);
2429 GRPC_CALL_COMBINER_STOP(
2430 calld->call_combiner_,
2431 "recv_initial_metadata_ready trailers-only or error");
2435 // Received valid initial metadata, so commit the call.
2436 calld->RetryCommit(elem, retry_state);
2437 // Invoke the callback to return the result to the surface.
2438 // Manually invoking a callback function; it does not take ownership of error.
2439 calld->InvokeRecvInitialMetadataCallback(batch_data, error);
2443 // recv_message callback handling
2446 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2447 SubchannelCallBatchData* batch_data =
2448 static_cast<SubchannelCallBatchData*>(arg);
2449 CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2451 PendingBatch* pending = calld->PendingBatchFind(
2452 batch_data->elem, "invoking recv_message_ready for",
2453 [](grpc_transport_stream_op_batch* batch) {
2454 return batch->recv_message &&
2455 batch->payload->recv_message.recv_message_ready != nullptr;
2457 GPR_ASSERT(pending != nullptr);
2459 SubchannelCallRetryState* retry_state =
2460 static_cast<SubchannelCallRetryState*>(
2461 batch_data->subchannel_call->GetParentData());
2462 *pending->batch->payload->recv_message.recv_message =
2463 std::move(retry_state->recv_message);
2464 // Update bookkeeping.
2465 // Note: Need to do this before invoking the callback, since invoking
2466 // the callback will result in yielding the call combiner.
2467 grpc_closure* recv_message_ready =
2468 pending->batch->payload->recv_message.recv_message_ready;
2469 pending->batch->payload->recv_message.recv_message_ready = nullptr;
2470 calld->MaybeClearPendingBatch(batch_data->elem, pending);
2471 batch_data->Unref();
2473 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2476 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2477 SubchannelCallBatchData* batch_data =
2478 static_cast<SubchannelCallBatchData*>(arg);
2479 grpc_call_element* elem = batch_data->elem;
2480 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2481 CallData* calld = static_cast<CallData*>(elem->call_data);
2482 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2483 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2484 chand, calld, grpc_error_string(error));
2486 SubchannelCallRetryState* retry_state =
2487 static_cast<SubchannelCallRetryState*>(
2488 batch_data->subchannel_call->GetParentData());
2489 ++retry_state->completed_recv_message_count;
2490 // If a retry was already dispatched, then we're not going to use the
2491 // result of this recv_message op, so do nothing.
2492 if (retry_state->retry_dispatched) {
2493 GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2494 "recv_message_ready after retry dispatched");
2497 // If we got an error or the payload was nullptr and we have not yet gotten
2498 // the recv_trailing_metadata_ready callback, then defer propagating this
2499 // callback back to the surface. We can evaluate whether to retry when
2500 // recv_trailing_metadata comes back.
2502 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2503 !retry_state->completed_recv_trailing_metadata)) {
2504 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2506 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2507 "message and recv_trailing_metadata pending)",
2510 retry_state->recv_message_ready_deferred_batch = batch_data;
2511 retry_state->recv_message_error = GRPC_ERROR_REF(error);
2512 if (!retry_state->started_recv_trailing_metadata) {
2513 // recv_trailing_metadata not yet started by application; start it
2514 // ourselves to get status.
2515 calld->StartInternalRecvTrailingMetadata(elem);
2517 GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2521 // Received a valid message, so commit the call.
2522 calld->RetryCommit(elem, retry_state);
2523 // Invoke the callback to return the result to the surface.
2524 // Manually invoking a callback function; it does not take ownership of error.
2525 calld->InvokeRecvMessageCallback(batch_data, error);
//
// recv_trailing_metadata handling
//
2532 void CallData::GetCallStatus(grpc_call_element* elem,
2533 grpc_metadata_batch* md_batch, grpc_error* error,
2534 grpc_status_code* status,
2535 grpc_mdelem** server_pushback_md) {
2536 if (error != GRPC_ERROR_NONE) {
2537 grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2539 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2541 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2542 if (server_pushback_md != nullptr &&
2543 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2544 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2547 GRPC_ERROR_UNREF(error);
2550 void CallData::AddClosureForRecvTrailingMetadataReady(
2551 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2552 grpc_error* error, CallCombinerClosureList* closures) {
2553 // Find pending batch.
2554 PendingBatch* pending = PendingBatchFind(
2555 elem, "invoking recv_trailing_metadata for",
2556 [](grpc_transport_stream_op_batch* batch) {
2557 return batch->recv_trailing_metadata &&
2558 batch->payload->recv_trailing_metadata
2559 .recv_trailing_metadata_ready != nullptr;
2561 // If we generated the recv_trailing_metadata op internally via
2562 // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2563 if (pending == nullptr) {
2564 GRPC_ERROR_UNREF(error);
2568 SubchannelCallRetryState* retry_state =
2569 static_cast<SubchannelCallRetryState*>(
2570 batch_data->subchannel_call->GetParentData());
2571 grpc_metadata_batch_move(
2572 &retry_state->recv_trailing_metadata,
2573 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2575 closures->Add(pending->batch->payload->recv_trailing_metadata
2576 .recv_trailing_metadata_ready,
2577 error, "recv_trailing_metadata_ready for pending batch");
2578 // Update bookkeeping.
2579 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2581 MaybeClearPendingBatch(elem, pending);
2584 void CallData::AddClosuresForDeferredRecvCallbacks(
2585 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2586 CallCombinerClosureList* closures) {
2587 if (batch_data->batch.recv_trailing_metadata) {
2588 // Add closure for deferred recv_initial_metadata_ready.
2589 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2591 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2592 InvokeRecvInitialMetadataCallback,
2593 retry_state->recv_initial_metadata_ready_deferred_batch,
2594 grpc_schedule_on_exec_ctx);
2595 closures->Add(&retry_state->recv_initial_metadata_ready,
2596 retry_state->recv_initial_metadata_error,
2597 "resuming recv_initial_metadata_ready");
2598 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2600 // Add closure for deferred recv_message_ready.
2601 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2603 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2604 InvokeRecvMessageCallback,
2605 retry_state->recv_message_ready_deferred_batch,
2606 grpc_schedule_on_exec_ctx);
2607 closures->Add(&retry_state->recv_message_ready,
2608 retry_state->recv_message_error,
2609 "resuming recv_message_ready");
2610 retry_state->recv_message_ready_deferred_batch = nullptr;
2615 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2616 SubchannelCallRetryState* retry_state) {
2617 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2620 if (pending->batch->send_initial_metadata &&
2621 !retry_state->started_send_initial_metadata) {
2624 if (pending->batch->send_message &&
2625 retry_state->started_send_message_count < send_messages_.size()) {
2628 if (pending->batch->send_trailing_metadata &&
2629 !retry_state->started_send_trailing_metadata) {
2635 void CallData::AddClosuresToFailUnstartedPendingBatches(
2636 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2637 grpc_error* error, CallCombinerClosureList* closures) {
2638 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2639 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2640 PendingBatch* pending = &pending_batches_[i];
2641 if (PendingBatchIsUnstarted(pending, retry_state)) {
2642 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2644 "chand=%p calld=%p: failing unstarted pending batch at index "
2648 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2649 "failing on_complete for pending batch");
2650 pending->batch->on_complete = nullptr;
2651 MaybeClearPendingBatch(elem, pending);
2654 GRPC_ERROR_UNREF(error);
2657 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2658 grpc_error* error) {
2659 grpc_call_element* elem = batch_data->elem;
2660 SubchannelCallRetryState* retry_state =
2661 static_cast<SubchannelCallRetryState*>(
2662 batch_data->subchannel_call->GetParentData());
2663 // Construct list of closures to execute.
2664 CallCombinerClosureList closures;
2665 // First, add closure for recv_trailing_metadata_ready.
2666 AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2667 GRPC_ERROR_REF(error), &closures);
2668 // If there are deferred recv_initial_metadata_ready or recv_message_ready
2669 // callbacks, add them to closures.
2670 AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2671 // Add closures to fail any pending batches that have not yet been started.
2672 AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2673 GRPC_ERROR_REF(error), &closures);
2674 // Don't need batch_data anymore.
2675 batch_data->Unref();
2676 // Schedule all of the closures identified above.
2677 // Note: This will release the call combiner.
2678 closures.RunClosures(call_combiner_);
2679 GRPC_ERROR_UNREF(error);
2682 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
2683 SubchannelCallBatchData* batch_data =
2684 static_cast<SubchannelCallBatchData*>(arg);
2685 grpc_call_element* elem = batch_data->elem;
2686 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2687 CallData* calld = static_cast<CallData*>(elem->call_data);
2688 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2690 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
2691 chand, calld, grpc_error_string(error));
2693 SubchannelCallRetryState* retry_state =
2694 static_cast<SubchannelCallRetryState*>(
2695 batch_data->subchannel_call->GetParentData());
2696 retry_state->completed_recv_trailing_metadata = true;
2697 // Get the call's status and check for server pushback metadata.
2698 grpc_status_code status = GRPC_STATUS_OK;
2699 grpc_mdelem* server_pushback_md = nullptr;
2700 grpc_metadata_batch* md_batch =
2701 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
2702 calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
2703 &server_pushback_md);
2704 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2705 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2706 calld, grpc_status_code_to_string(status));
2708 // Check if we should retry.
2709 if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
2710 // Unref batch_data for deferred recv_initial_metadata_ready or
2711 // recv_message_ready callbacks, if any.
2712 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
2713 batch_data->Unref();
2714 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2716 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2717 batch_data->Unref();
2718 GRPC_ERROR_UNREF(retry_state->recv_message_error);
2720 batch_data->Unref();
2723 // Not retrying, so commit the call.
2724 calld->RetryCommit(elem, retry_state);
2725 // Run any necessary closures.
2726 calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
//
// on_complete callback handling
//
2733 void CallData::AddClosuresForCompletedPendingBatch(
2734 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2735 SubchannelCallRetryState* retry_state, grpc_error* error,
2736 CallCombinerClosureList* closures) {
2737 PendingBatch* pending = PendingBatchFind(
2738 elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2739 // Match the pending batch with the same set of send ops as the
2740 // subchannel batch we've just completed.
2741 return batch->on_complete != nullptr &&
2742 batch_data->batch.send_initial_metadata ==
2743 batch->send_initial_metadata &&
2744 batch_data->batch.send_message == batch->send_message &&
2745 batch_data->batch.send_trailing_metadata ==
2746 batch->send_trailing_metadata;
2748 // If batch_data is a replay batch, then there will be no pending
2749 // batch to complete.
2750 if (pending == nullptr) {
2751 GRPC_ERROR_UNREF(error);
2755 closures->Add(pending->batch->on_complete, error,
2756 "on_complete for pending batch");
2757 pending->batch->on_complete = nullptr;
2758 MaybeClearPendingBatch(elem, pending);
2761 void CallData::AddClosuresForReplayOrPendingSendOps(
2762 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2763 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
2764 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2765 bool have_pending_send_message_ops =
2766 retry_state->started_send_message_count < send_messages_.size();
2767 bool have_pending_send_trailing_metadata_op =
2768 seen_send_trailing_metadata_ &&
2769 !retry_state->started_send_trailing_metadata;
2770 if (!have_pending_send_message_ops &&
2771 !have_pending_send_trailing_metadata_op) {
2772 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2773 PendingBatch* pending = &pending_batches_[i];
2774 grpc_transport_stream_op_batch* batch = pending->batch;
2775 if (batch == nullptr || pending->send_ops_cached) continue;
2776 if (batch->send_message) have_pending_send_message_ops = true;
2777 if (batch->send_trailing_metadata) {
2778 have_pending_send_trailing_metadata_op = true;
2782 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2783 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2785 "chand=%p calld=%p: starting next batch for pending send op(s)",
2788 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2789 StartRetriableSubchannelBatches, elem,
2790 grpc_schedule_on_exec_ctx);
2791 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2792 "starting next batch for send_* op(s)");
2796 void CallData::OnComplete(void* arg, grpc_error* error) {
2797 SubchannelCallBatchData* batch_data =
2798 static_cast<SubchannelCallBatchData*>(arg);
2799 grpc_call_element* elem = batch_data->elem;
2800 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2801 CallData* calld = static_cast<CallData*>(elem->call_data);
2802 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2803 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2804 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2805 chand, calld, grpc_error_string(error), batch_str);
2806 gpr_free(batch_str);
2808 SubchannelCallRetryState* retry_state =
2809 static_cast<SubchannelCallRetryState*>(
2810 batch_data->subchannel_call->GetParentData());
2811 // Update bookkeeping in retry_state.
2812 if (batch_data->batch.send_initial_metadata) {
2813 retry_state->completed_send_initial_metadata = true;
2815 if (batch_data->batch.send_message) {
2816 ++retry_state->completed_send_message_count;
2818 if (batch_data->batch.send_trailing_metadata) {
2819 retry_state->completed_send_trailing_metadata = true;
2821 // If the call is committed, free cached data for send ops that we've just
2823 if (calld->retry_committed_) {
2824 calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
2826 // Construct list of closures to execute.
2827 CallCombinerClosureList closures;
2828 // If a retry was already dispatched, that means we saw
2829 // recv_trailing_metadata before this, so we do nothing here.
2830 // Otherwise, invoke the callback to return the result to the surface.
2831 if (!retry_state->retry_dispatched) {
2832 // Add closure for the completed pending batch, if any.
2833 calld->AddClosuresForCompletedPendingBatch(
2834 elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
2835 // If needed, add a callback to start any replay or pending send ops on
2836 // the subchannel call.
2837 if (!retry_state->completed_recv_trailing_metadata) {
2838 calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
2842 // Track number of pending subchannel send batches and determine if this
2843 // was the last one.
2844 --calld->num_pending_retriable_subchannel_send_batches_;
2845 const bool last_send_batch_complete =
2846 calld->num_pending_retriable_subchannel_send_batches_ == 0;
2847 // Don't need batch_data anymore.
2848 batch_data->Unref();
2849 // Schedule all of the closures identified above.
2850 // Note: This yeilds the call combiner.
2851 closures.RunClosures(calld->call_combiner_);
2852 // If this was the last subchannel send batch, unref the call stack.
2853 if (last_send_batch_complete) {
2854 GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
//
// subchannel batch construction
//
2862 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2863 grpc_transport_stream_op_batch* batch =
2864 static_cast<grpc_transport_stream_op_batch*>(arg);
2865 SubchannelCall* subchannel_call =
2866 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2867 // Note: This will release the call combiner.
2868 subchannel_call->StartTransportStreamOpBatch(batch);
2871 void CallData::AddClosureForSubchannelBatch(
2872 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2873 CallCombinerClosureList* closures) {
2874 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2875 batch->handler_private.extra_arg = subchannel_call_.get();
2876 GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
2877 batch, grpc_schedule_on_exec_ctx);
2878 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2879 char* batch_str = grpc_transport_stream_op_batch_string(batch);
2880 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2882 gpr_free(batch_str);
2884 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2885 "start_subchannel_batch");
2888 void CallData::AddRetriableSendInitialMetadataOp(
2889 SubchannelCallRetryState* retry_state,
2890 SubchannelCallBatchData* batch_data) {
2891 // Maps the number of retries to the corresponding metadata value slice.
2892 static const grpc_slice* retry_count_strings[] = {
2893 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2894 // We need to make a copy of the metadata batch for each attempt, since
2895 // the filters in the subchannel stack may modify this batch, and we don't
2896 // want those modifications to be passed forward to subsequent attempts.
2898 // If we've already completed one or more attempts, add the
2899 // grpc-retry-attempts header.
2900 retry_state->send_initial_metadata_storage =
2901 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2902 sizeof(grpc_linked_mdelem) *
2903 (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
2904 grpc_metadata_batch_copy(&send_initial_metadata_,
2905 &retry_state->send_initial_metadata,
2906 retry_state->send_initial_metadata_storage);
2907 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2908 .grpc_previous_rpc_attempts != nullptr)) {
2909 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2910 retry_state->send_initial_metadata.idx.named
2911 .grpc_previous_rpc_attempts);
2913 if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
2914 grpc_mdelem retry_md = grpc_mdelem_create(
2915 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2916 *retry_count_strings[num_attempts_completed_ - 1], nullptr);
2917 grpc_error* error = grpc_metadata_batch_add_tail(
2918 &retry_state->send_initial_metadata,
2920 ->send_initial_metadata_storage[send_initial_metadata_.list.count],
2922 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2923 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2924 grpc_error_string(error));
2928 retry_state->started_send_initial_metadata = true;
2929 batch_data->batch.send_initial_metadata = true;
2930 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2931 &retry_state->send_initial_metadata;
2932 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2933 send_initial_metadata_flags_;
2934 batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
2937 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2938 SubchannelCallRetryState* retry_state,
2939 SubchannelCallBatchData* batch_data) {
2940 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2941 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2943 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2944 chand, this, retry_state->started_send_message_count);
2946 ByteStreamCache* cache =
2947 send_messages_[retry_state->started_send_message_count];
2948 ++retry_state->started_send_message_count;
2949 retry_state->send_message.Init(cache);
2950 batch_data->batch.send_message = true;
2951 batch_data->batch.payload->send_message.send_message.reset(
2952 retry_state->send_message.get());
2955 void CallData::AddRetriableSendTrailingMetadataOp(
2956 SubchannelCallRetryState* retry_state,
2957 SubchannelCallBatchData* batch_data) {
2958 // We need to make a copy of the metadata batch for each attempt, since
2959 // the filters in the subchannel stack may modify this batch, and we don't
2960 // want those modifications to be passed forward to subsequent attempts.
2961 retry_state->send_trailing_metadata_storage =
2962 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2963 sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
2964 grpc_metadata_batch_copy(&send_trailing_metadata_,
2965 &retry_state->send_trailing_metadata,
2966 retry_state->send_trailing_metadata_storage);
2967 retry_state->started_send_trailing_metadata = true;
2968 batch_data->batch.send_trailing_metadata = true;
2969 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2970 &retry_state->send_trailing_metadata;
2973 void CallData::AddRetriableRecvInitialMetadataOp(
2974 SubchannelCallRetryState* retry_state,
2975 SubchannelCallBatchData* batch_data) {
2976 retry_state->started_recv_initial_metadata = true;
2977 batch_data->batch.recv_initial_metadata = true;
2978 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2979 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2980 &retry_state->recv_initial_metadata;
2981 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2982 &retry_state->trailing_metadata_available;
2983 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2984 RecvInitialMetadataReady, batch_data,
2985 grpc_schedule_on_exec_ctx);
2986 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2987 &retry_state->recv_initial_metadata_ready;
2990 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
2991 SubchannelCallBatchData* batch_data) {
2992 ++retry_state->started_recv_message_count;
2993 batch_data->batch.recv_message = true;
2994 batch_data->batch.payload->recv_message.recv_message =
2995 &retry_state->recv_message;
2996 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
2997 batch_data, grpc_schedule_on_exec_ctx);
2998 batch_data->batch.payload->recv_message.recv_message_ready =
2999 &retry_state->recv_message_ready;
3002 void CallData::AddRetriableRecvTrailingMetadataOp(
3003 SubchannelCallRetryState* retry_state,
3004 SubchannelCallBatchData* batch_data) {
3005 retry_state->started_recv_trailing_metadata = true;
3006 batch_data->batch.recv_trailing_metadata = true;
3007 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
3008 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
3009 &retry_state->recv_trailing_metadata;
3010 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
3011 &retry_state->collect_stats;
3012 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
3013 RecvTrailingMetadataReady, batch_data,
3014 grpc_schedule_on_exec_ctx);
3015 batch_data->batch.payload->recv_trailing_metadata
3016 .recv_trailing_metadata_ready =
3017 &retry_state->recv_trailing_metadata_ready;
3018 MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
3019 &batch_data->batch);
3022 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
3023 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3024 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3026 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
3027 "started; starting it internally",
3030 SubchannelCallRetryState* retry_state =
3031 static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
3032 // Create batch_data with 2 refs, since this batch will be unreffed twice:
3033 // once for the recv_trailing_metadata_ready callback when the subchannel
3034 // batch returns, and again when we actually get a recv_trailing_metadata
3035 // op from the surface.
3036 SubchannelCallBatchData* batch_data =
3037 SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
3038 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3039 retry_state->recv_trailing_metadata_internal_batch = batch_data;
3040 // Note: This will release the call combiner.
3041 subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
3044 // If there are any cached send ops that need to be replayed on the
3045 // current subchannel call, creates and returns a new subchannel batch
3046 // to replay those ops. Otherwise, returns nullptr.
3047 CallData::SubchannelCallBatchData*
3048 CallData::MaybeCreateSubchannelBatchForReplay(
3049 grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
3050 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3051 SubchannelCallBatchData* replay_batch_data = nullptr;
3052 // send_initial_metadata.
3053 if (seen_send_initial_metadata_ &&
3054 !retry_state->started_send_initial_metadata &&
3055 !pending_send_initial_metadata_) {
3056 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3058 "chand=%p calld=%p: replaying previously completed "
3059 "send_initial_metadata op",
3063 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3064 AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
3067 // Note that we can only have one send_message op in flight at a time.
3068 if (retry_state->started_send_message_count < send_messages_.size() &&
3069 retry_state->started_send_message_count ==
3070 retry_state->completed_send_message_count &&
3071 !pending_send_message_) {
3072 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3074 "chand=%p calld=%p: replaying previously completed "
3078 if (replay_batch_data == nullptr) {
3080 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3082 AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
3084 // send_trailing_metadata.
3085 // Note that we only add this op if we have no more send_message ops
3086 // to start, since we can't send down any more send_message ops after
3087 // send_trailing_metadata.
3088 if (seen_send_trailing_metadata_ &&
3089 retry_state->started_send_message_count == send_messages_.size() &&
3090 !retry_state->started_send_trailing_metadata &&
3091 !pending_send_trailing_metadata_) {
3092 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3094 "chand=%p calld=%p: replaying previously completed "
3095 "send_trailing_metadata op",
3098 if (replay_batch_data == nullptr) {
3100 SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3102 AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
3104 return replay_batch_data;
3107 void CallData::AddSubchannelBatchesForPendingBatches(
3108 grpc_call_element* elem, SubchannelCallRetryState* retry_state,
3109 CallCombinerClosureList* closures) {
3110 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3111 PendingBatch* pending = &pending_batches_[i];
3112 grpc_transport_stream_op_batch* batch = pending->batch;
3113 if (batch == nullptr) continue;
3114 // Skip any batch that either (a) has already been started on this
3115 // subchannel call or (b) we can't start yet because we're still
3116 // replaying send ops that need to be completed first.
3117 // TODO(roth): Note that if any one op in the batch can't be sent
3118 // yet due to ops that we're replaying, we don't start any of the ops
3119 // in the batch. This is probably okay, but it could conceivably
3120 // lead to increased latency in some cases -- e.g., we could delay
3121 // starting a recv op due to it being in the same batch with a send
3122 // op. If/when we revamp the callback protocol in
3123 // transport_stream_op_batch, we may be able to fix this.
3124 if (batch->send_initial_metadata &&
3125 retry_state->started_send_initial_metadata) {
3128 if (batch->send_message && retry_state->completed_send_message_count <
3129 retry_state->started_send_message_count) {
3132 // Note that we only start send_trailing_metadata if we have no more
3133 // send_message ops to start, since we can't send down any more
3134 // send_message ops after send_trailing_metadata.
3135 if (batch->send_trailing_metadata &&
3136 (retry_state->started_send_message_count + batch->send_message <
3137 send_messages_.size() ||
3138 retry_state->started_send_trailing_metadata)) {
3141 if (batch->recv_initial_metadata &&
3142 retry_state->started_recv_initial_metadata) {
3145 if (batch->recv_message && retry_state->completed_recv_message_count <
3146 retry_state->started_recv_message_count) {
3149 if (batch->recv_trailing_metadata &&
3150 retry_state->started_recv_trailing_metadata) {
3151 // If we previously completed a recv_trailing_metadata op
3152 // initiated by StartInternalRecvTrailingMetadata(), use the
3153 // result of that instead of trying to re-start this op.
3154 if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3156 // If the batch completed, then trigger the completion callback
3157 // directly, so that we return the previously returned results to
3158 // the application. Otherwise, just unref the internally
3159 // started subchannel batch, since we'll propagate the
3160 // completion when it completes.
3161 if (retry_state->completed_recv_trailing_metadata) {
3162 // Batches containing recv_trailing_metadata always succeed.
3164 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3165 "re-executing recv_trailing_metadata_ready to propagate "
3166 "internally triggered result");
3168 retry_state->recv_trailing_metadata_internal_batch->Unref();
3170 retry_state->recv_trailing_metadata_internal_batch = nullptr;
3174 // If we're not retrying, just send the batch as-is.
3175 if (method_params_ == nullptr ||
3176 method_params_->retry_policy() == nullptr || retry_committed_) {
3177 // TODO(roth) : We should probably call
3178 // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3179 AddClosureForSubchannelBatch(elem, batch, closures);
3180 PendingBatchClear(pending);
3183 // Create batch with the right number of callbacks.
3184 const bool has_send_ops = batch->send_initial_metadata ||
3185 batch->send_message ||
3186 batch->send_trailing_metadata;
3187 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3188 batch->recv_message +
3189 batch->recv_trailing_metadata;
3190 SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3191 elem, num_callbacks, has_send_ops /* set_on_complete */);
3192 // Cache send ops if needed.
3193 MaybeCacheSendOpsForBatch(pending);
3194 // send_initial_metadata.
3195 if (batch->send_initial_metadata) {
3196 AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3199 if (batch->send_message) {
3200 AddRetriableSendMessageOp(elem, retry_state, batch_data);
3202 // send_trailing_metadata.
3203 if (batch->send_trailing_metadata) {
3204 AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3206 // recv_initial_metadata.
3207 if (batch->recv_initial_metadata) {
3208 // recv_flags is only used on the server side.
3209 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3210 AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3213 if (batch->recv_message) {
3214 AddRetriableRecvMessageOp(retry_state, batch_data);
3216 // recv_trailing_metadata.
3217 if (batch->recv_trailing_metadata) {
3218 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3220 AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3221 // Track number of pending subchannel send batches.
3222 // If this is the first one, take a ref to the call stack.
3223 if (batch->send_initial_metadata || batch->send_message ||
3224 batch->send_trailing_metadata) {
3225 if (num_pending_retriable_subchannel_send_batches_ == 0) {
3226 GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3228 ++num_pending_retriable_subchannel_send_batches_;
3233 void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
3234 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3235 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3236 CallData* calld = static_cast<CallData*>(elem->call_data);
3237 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3238 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
3241 SubchannelCallRetryState* retry_state =
3242 static_cast<SubchannelCallRetryState*>(
3243 calld->subchannel_call_->GetParentData());
3244 // Construct list of closures to execute, one for each pending batch.
3245 CallCombinerClosureList closures;
3246 // Replay previously-returned send_* ops if needed.
3247 SubchannelCallBatchData* replay_batch_data =
3248 calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
3249 if (replay_batch_data != nullptr) {
3250 calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
3252 // Track number of pending subchannel send batches.
3253 // If this is the first one, take a ref to the call stack.
3254 if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
3255 GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
3257 ++calld->num_pending_retriable_subchannel_send_batches_;
3259 // Now add pending batches.
3260 calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
3261 // Start batches on subchannel call.
3262 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3264 "chand=%p calld=%p: starting %" PRIuPTR
3265 " retriable batches on subchannel_call=%p",
3266 chand, calld, closures.size(), calld->subchannel_call_.get());
3268 // Note: This will yield the call combiner.
3269 closures.RunClosures(calld->call_combiner_);
3276 void CallData::CreateSubchannelCall(grpc_call_element* elem) {
3277 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3278 const size_t parent_data_size =
3279 enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
3280 const ConnectedSubchannel::CallArgs call_args = {
3281 pollent_, path_, call_start_time_, deadline_, arena_,
3282 // TODO(roth): When we implement hedging support, we will probably
3283 // need to use a separate call context for each subchannel call.
3284 call_context_, call_combiner_, parent_data_size};
3285 grpc_error* error = GRPC_ERROR_NONE;
3286 subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error);
3287 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3288 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
3289 chand, this, subchannel_call_.get(), grpc_error_string(error));
3291 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3292 PendingBatchesFail(elem, error, YieldCallCombiner);
3294 if (parent_data_size > 0) {
3295 new (subchannel_call_->GetParentData())
3296 SubchannelCallRetryState(call_context_);
3298 PendingBatchesResume(elem);
3302 void CallData::PickDone(void* arg, grpc_error* error) {
3303 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3304 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3305 CallData* calld = static_cast<CallData*>(elem->call_data);
3306 if (error != GRPC_ERROR_NONE) {
3307 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3309 "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3310 calld, grpc_error_string(error));
3312 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3315 calld->CreateSubchannelCall(elem);
3318 // A class to handle the call combiner cancellation callback for a
// queued pick.
// NOTE(review): the access specifiers, some closing braces and the line(s)
// following the call stack unref are not visible in this extract; confirm
// them against the full file.
3320 class CallData::QueuedPickCanceller {
// Takes a "QueuedPickCanceller" ref on the owning call stack, initializes
// closure_ to run CancelLocked() under the channel's data plane combiner, and
// registers that closure as the call combiner's cancellation notification.
3322 explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
3323 auto* calld = static_cast<CallData*>(elem->call_data);
3324 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3325 GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
3326 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
3327 grpc_combiner_scheduler(chand->data_plane_combiner()));
3328 calld->call_combiner_->SetNotifyOnCancel(&closure_);
// Cancellation callback; arg is the QueuedPickCanceller that was registered.
// The call is removed from the queued picks and its pending batches are
// failed only when this object is still the call's current pick_canceller_
// and error is set.
3332 static void CancelLocked(void* arg, grpc_error* error) {
3333 auto* self = static_cast<QueuedPickCanceller*>(arg);
3334 auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
3335 auto* calld = static_cast<CallData*>(self->elem_->call_data);
3336 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3338 "chand=%p calld=%p: cancelling queued pick: "
3339 "error=%s self=%p calld->pick_canceller=%p",
3340 chand, calld, grpc_error_string(error), self,
3341 calld->pick_canceller_);
// A canceller that has been "lamed" (pick_canceller_ reset, see
// RemoveCallFromQueuedPicksLocked()) no longer matches self and is skipped.
3343 if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
3344 // Remove pick from list of queued picks.
3345 calld->RemoveCallFromQueuedPicksLocked(self->elem_);
3346 // Fail pending batches on the call.
3347 calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
3348 YieldCallCombinerIfPendingBatchesFound);
// Releases the call stack ref taken in the constructor.
// NOTE(review): instances are created with New<> in
// AddCallToQueuedPicksLocked(); the matching deallocation is not visible
// here -- confirm that self is deleted after this unref.
3350 GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
// Call element this canceller was created for.
3354 grpc_call_element* elem_;
// Closure registered with the call combiner; runs CancelLocked().
3355 grpc_closure closure_;
3358 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3359 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3360 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3361 gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3364 chand->RemoveQueuedPick(&pick_, pollent_);
3365 pick_queued_ = false;
3366 // Lame the call combiner canceller.
3367 pick_canceller_ = nullptr;
// Adds this call's pick to the channel's queued picks list, marks the pick as
// queued, and installs a new QueuedPickCanceller for the call.
// NOTE(review): the end of the trace call and a line between
// `pick_queued_ = true;` and the AddQueuedPick() call are not visible in this
// extract; confirm against the full file.
3370 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3371 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3372 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3373 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3376 pick_queued_ = true;
3378 chand->AddQueuedPick(&pick_, pollent_);
3379 // Register call combiner cancellation callback.
// The QueuedPickCanceller constructor registers itself with the call combiner.
3380 pick_canceller_ = New<QueuedPickCanceller>(elem);
3383 void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3384 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3385 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3386 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
3389 // Store a ref to the service_config in service_config_call_data_. Also, save
3390 // a pointer to this in the call_context so that all future filters can access
3392 service_config_call_data_ =
3393 ServiceConfig::CallData(chand->service_config(), path_);
3394 if (service_config_call_data_.service_config() != nullptr) {
3395 call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
3396 &service_config_call_data_;
3397 method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
3398 service_config_call_data_.GetMethodParsedConfig(
3399 internal::ClientChannelServiceConfigParser::ParserIndex()));
3401 retry_throttle_data_ = chand->retry_throttle_data();
3402 if (method_params_ != nullptr) {
3403 // If the deadline from the service config is shorter than the one
3404 // from the client API, reset the deadline timer.
3405 if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
3406 const grpc_millis per_method_deadline =
3407 grpc_timespec_to_millis_round_up(call_start_time_) +
3408 method_params_->timeout();
3409 if (per_method_deadline < deadline_) {
3410 deadline_ = per_method_deadline;
3411 grpc_deadline_state_reset(elem, deadline_);
3414 // If the service config set wait_for_ready and the application
3415 // did not explicitly set it, use the value from the service config.
3416 uint32_t* send_initial_metadata_flags =
3417 &pending_batches_[0]
3418 .batch->payload->send_initial_metadata.send_initial_metadata_flags;
3419 if (method_params_->wait_for_ready().has_value() &&
3420 !(*send_initial_metadata_flags &
3421 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3422 if (method_params_->wait_for_ready().value()) {
3423 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3425 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3429 // If no retry policy, disable retries.
3430 // TODO(roth): Remove this when adding support for transparent retries.
3431 if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
3432 enable_retries_ = false;
3436 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3437 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3438 // Apply service config data to the call only once, and only if the
3439 // channel has the data available.
3440 if (GPR_LIKELY(chand->received_service_config_data() &&
3441 !service_config_applied_)) {
3442 service_config_applied_ = true;
3443 ApplyServiceConfigToCallLocked(elem);
// Returns a printable name for an LB pick result type; used by the routing
// trace message in CallData::StartPickLocked().
// NOTE(review): the switch header and the strings returned for PICK_COMPLETE
// and PICK_QUEUE are not visible in this extract; confirm against the full
// file.
3447 const char* PickResultTypeName(
3448 LoadBalancingPolicy::PickResult::ResultType type) {
3450 case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
3452 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3454 case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
3455 return "TRANSIENT_FAILURE";
// Presumably reached only when no case above matches -- TODO confirm.
3457 GPR_UNREACHABLE_CODE(return "UNKNOWN");
// Closure callback that attempts an LB pick for the call.
//
// arg is the grpc_call_element of the call; the error argument is not
// referenced in the visible code.  After applying the service config (if that
// has not happened yet), the channel's current picker is asked for a result
// and the outcome is dispatched on its type: transient failure, queue, or
// complete.  pick_closure_ (which runs PickDone) is scheduled whenever the
// pick finishes, successfully or not.
// NOTE(review): a number of short lines (closing braces, break/return
// statements, argument continuations and part of the trace format string)
// are not visible in this extract, so the exact branch boundaries must be
// confirmed against the full file.
3460 void CallData::StartPickLocked(void* arg, grpc_error* error) {
3461 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3462 CallData* calld = static_cast<CallData*>(elem->call_data);
3463 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Precondition: neither a connected subchannel nor a subchannel call exists
// yet for this call.
3464 GPR_ASSERT(calld->connected_subchannel_ == nullptr);
3465 GPR_ASSERT(calld->subchannel_call_ == nullptr);
3466 // Apply service config to call if needed.
3467 calld->MaybeApplyServiceConfigToCallLocked(elem);
3468 // If this is a retry, use the send_initial_metadata payload that
3469 // we've cached; otherwise, use the pending batch. The
3470 // send_initial_metadata batch will be the first pending batch in the
3471 // list, as set by GetBatchIndex() above.
3472 // TODO(roth): What if the LB policy needs to add something to the
3473 // call's initial metadata, and then there's a retry? We don't want
3474 // the new metadata to be added twice. We might need to somehow
3475 // allocate the subchannel batch earlier so that we can give the
3476 // subchannel's copy of the metadata batch (which is copied for each
3477 // attempt) to the LB policy instead the one from the parent channel.
3478 LoadBalancingPolicy::PickArgs pick_args;
3479 pick_args.call_state = &calld->lb_call_state_;
3480 pick_args.initial_metadata =
3481 calld->seen_send_initial_metadata_
3482 ? &calld->send_initial_metadata_
3483 : calld->pending_batches_[0]
3484 .batch->payload->send_initial_metadata.send_initial_metadata;
3485 // Grab initial metadata flags so that we can check later if the call has
3486 // wait_for_ready enabled.
3487 const uint32_t send_initial_metadata_flags =
3488 calld->seen_send_initial_metadata_
3489 ? calld->send_initial_metadata_flags_
3490 : calld->pending_batches_[0]
3491 .batch->payload->send_initial_metadata
3492 .send_initial_metadata_flags;
3493 // When done, we schedule this closure to leave the data plane combiner.
3494 GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
3495 grpc_schedule_on_exec_ctx);
// Ask the channel's current picker for a result.
3497 auto result = chand->picker()->Pick(pick_args);
3498 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3500 "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
3502 chand, calld, PickResultTypeName(result.type),
3503 result.connected_subchannel.get(), grpc_error_string(result.error));
3505 switch (result.type) {
3506 case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
3507 // If we're shutting down, fail all RPCs.
3508 grpc_error* disconnect_error = chand->disconnect_error();
3509 if (disconnect_error != GRPC_ERROR_NONE) {
// The picker's error is dropped; the pick completes with a new ref to the
// channel's disconnect error instead.
3510 GRPC_ERROR_UNREF(result.error);
3511 GRPC_CLOSURE_SCHED(&calld->pick_closure_,
3512 GRPC_ERROR_REF(disconnect_error));
3515 // If wait_for_ready is false, then the error indicates the RPC
3516 // attempt's final status.
3517 if ((send_initial_metadata_flags &
3518 GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3519 // Retry if appropriate; otherwise, fail.
3520 grpc_status_code status = GRPC_STATUS_OK;
3521 grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
// MaybeRetry() is consulted only when retries are enabled; if it is skipped
// or returns false, the pick is failed below.
3523 if (!calld->enable_retries_ ||
3524 !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
3525 nullptr /* server_pushback_md */)) {
// Wrap the picker's error in a new error that references it, then release
// the ref held on the original.
3526 grpc_error* new_error =
3527 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3528 "Failed to pick subchannel", &result.error, 1);
3529 GRPC_ERROR_UNREF(result.error);
3530 GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
3532 if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3535 // If wait_for_ready is true, then queue to retry when we get a new
// NOTE(review): the comment above is cut off in this extract, and control
// appears to continue into the PICK_QUEUE case from here -- confirm.
3537 GRPC_ERROR_UNREF(result.error);
3540 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
// Queue the call only if it is not already on the queued picks list.
3541 if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
3543 default: // PICK_COMPLETE
// A completed pick without a connected subchannel is reported as a drop by
// the LB policy.
3545 if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
3546 result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3547 "Call dropped by load balancing policy");
3549 calld->connected_subchannel_ = std::move(result.connected_subchannel);
// Save the recv_trailing_metadata_ready callback and its user data supplied
// with the pick result.
3550 calld->lb_recv_trailing_metadata_ready_ =
3551 result.recv_trailing_metadata_ready;
3552 calld->lb_recv_trailing_metadata_ready_user_data_ =
3553 result.recv_trailing_metadata_ready_user_data;
3554 GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
3555 if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3560 } // namespace grpc_core
3562 /*************************************************************************
3566 using grpc_core::CallData;
3567 using grpc_core::ChannelData;
// Channel filter definition for the client channel, populated with CallData
// and ChannelData entry points.
// NOTE(review): several initializer entries and the closing `};` are not
// visible in this extract; confirm the full field list against the file.
3569 const grpc_channel_filter grpc_client_channel_filter = {
3570 CallData::StartTransportStreamOpBatch,
3571 ChannelData::StartTransportOp,
3574 CallData::SetPollent,
3576 sizeof(ChannelData),
3578 ChannelData::Destroy,
3579 ChannelData::GetChannelInfo,
3583 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3584 grpc_channel_element* elem, int try_to_connect) {
3585 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3586 return chand->CheckConnectivityState(try_to_connect);
3589 int grpc_client_channel_num_external_connectivity_watchers(
3590 grpc_channel_element* elem) {
3591 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3592 return chand->NumExternalConnectivityWatchers();
3595 void grpc_client_channel_watch_connectivity_state(
3596 grpc_channel_element* elem, grpc_polling_entity pollent,
3597 grpc_connectivity_state* state, grpc_closure* closure,
3598 grpc_closure* watcher_timer_init) {
3599 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3600 return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3601 watcher_timer_init);
3604 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
3605 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3606 auto* calld = static_cast<CallData*>(elem->call_data);
3607 return calld->subchannel_call();