2 // Copyright 2018 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <grpc/support/port_platform.h>
19 #include "src/core/ext/xds/xds_client.h"
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_channel_args.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/ext/xds/xds_http_filters.h"
42 #include "src/core/lib/address_utils/sockaddr_utils.h"
43 #include "src/core/lib/backoff/backoff.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/channel/channel_stack.h"
46 #include "src/core/lib/gpr/env.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/memory.h"
49 #include "src/core/lib/gprpp/orphanable.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/gprpp/sync.h"
52 #include "src/core/lib/iomgr/sockaddr.h"
53 #include "src/core/lib/iomgr/timer.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/slice/slice_string_helpers.h"
56 #include "src/core/lib/surface/call.h"
57 #include "src/core/lib/surface/channel.h"
58 #include "src/core/lib/surface/channel_init.h"
59 #include "src/core/lib/transport/static_metadata.h"
// Retry/backoff tuning for xds calls. These feed the BackOff options built in
// RetryableCall's constructor (there the max backoff is multiplied by 1000,
// i.e. converted from seconds to milliseconds).
46 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
47 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
48 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
49 #define GRPC_XDS_RECONNECT_JITTER 0.2
// Load-reporting interval floor, in milliseconds.
// NOTE(review): its use is not visible in this section; presumably it clamps
// the interval requested by the LRS server — confirm.
50 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
// Trace flags. "xds_client" gates the GPR_INFO tracing in this file.
// "xds_client_refcount" is consulted when constructing the InternallyRefCounted
// bases of ChannelState/AdsCallState, enabling refcount tracing for them.
69 TraceFlag grpc_xds_client_trace(false, "xds_client");
70 TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
// Process-wide state. g_mu starts as nullptr (presumably created during
// initialization elsewhere); every other global below is annotated as
// guarded by *g_mu.
74 Mutex* g_mu = nullptr;
75 const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
76 XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
77 char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
82 // Internal class declarations
85 // An xds call wrapper that can restart a call upon failure. Holds a ref to
86 // the xds channel. The template parameter is the kind of wrapped xds call.
// In this file T is AdsCallState or LrsCallState. StartNewCallLocked() creates
// a fresh T. When that call finishes, the wrapper either starts a new call
// immediately or arms retry_timer_ according to the backoff policy.
88 class XdsClient::ChannelState::RetryableCall
89 : public InternallyRefCounted<RetryableCall<T>> {
91 explicit RetryableCall(RefCountedPtr<ChannelState> chand);
// Sets shutting_down_, cancels a pending retry timer, and Unref()s itself.
93 void Orphan() override;
// Presumably invoked by the wrapped call when it ends. Restarts immediately if
// the call had seen a response; otherwise schedules a retry after backoff.
95 void OnCallFinishedLocked();
97 T* calld() const { return calld_.get(); }
98 ChannelState* chand() const { return chand_.get(); }
100 bool IsCurrentCallOnChannel() const;
103 void StartNewCallLocked();
104 void StartRetryTimerLocked();
105 static void OnRetryTimer(void* arg, grpc_error_handle error);
106 void OnRetryTimerLocked(grpc_error_handle error);
108 // The wrapped xds call that talks to the xds server. It's instantiated
109 // every time we start a new call. It's null during call retry backoff.
110 OrphanablePtr<T> calld_;
111 // The owning xds channel.
112 RefCountedPtr<ChannelState> chand_;
// Retry timer state. retry_timer_callback_pending_ is set when the timer is
// armed and cleared when OnRetryTimerLocked() runs.
116 grpc_timer retry_timer_;
117 grpc_closure on_retry_timer_;
118 bool retry_timer_callback_pending_ = false;
// Set by Orphan(). Once true, no new calls or retry timers are started.
120 bool shutting_down_ = false;
123 // Contains an ADS call to the xds server.
// Tracks, per resource type, which resources this call is subscribed to. It
// also tracks the nonce/error to send in the next request, and buffers
// request sends while another send is in flight.
124 class XdsClient::ChannelState::AdsCallState
125 : public InternallyRefCounted<AdsCallState> {
127 // The ctor and dtor should not be used directly.
128 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
129 ~AdsCallState() override;
131 void Orphan() override;
133 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
134 ChannelState* chand() const { return parent_->chand(); }
135 XdsClient* xds_client() const { return chand()->xds_client(); }
136 bool seen_response() const { return seen_response_; }
// Add/remove a subscription for (type_url, name). Unsubscribing sends an
// updated request right away unless delay_unsubscription is true.
138 void SubscribeLocked(const std::string& type_url, const std::string& name)
139 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
140 void UnsubscribeLocked(const std::string& type_url, const std::string& name,
141 bool delay_unsubscription)
142 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// True if at least one resource type still has a subscribed resource.
144 bool HasSubscribedResources() const;
// Per-resource subscription state. It owns a request-timeout timer. If the
// timer fires before the resource arrives, the resource is reported to
// watchers as not existing.
147 class ResourceState : public InternallyRefCounted<ResourceState> {
149 ResourceState(const std::string& type_url, const std::string& name,
150 bool sent_initial_request)
151 : type_url_(type_url),
153 sent_initial_request_(sent_initial_request) {
154 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
155 grpc_schedule_on_exec_ctx);
158 void Orphan() override {
160 Unref(DEBUG_LOCATION, "Orphan");
// Arms the timeout timer (deadline: now + XdsClient::request_timeout_), but
// only on the first call; if sent_initial_request_ is already true this is a
// no-op. It keeps a ref to the ADS call in ads_calld_ and takes a "timer" ref
// on itself. OnTimer() releases both.
163 void Start(RefCountedPtr<AdsCallState> ads_calld) {
164 if (sent_initial_request_) return;
165 sent_initial_request_ = true;
166 ads_calld_ = std::move(ads_calld);
167 Ref(DEBUG_LOCATION, "timer").release();
168 timer_pending_ = true;
171 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
// Cancels the timeout timer if it is still pending. Clearing timer_pending_
// makes a later OnTimerLocked() a no-op.
176 if (timer_pending_) {
177 grpc_timer_cancel(&timer_);
178 timer_pending_ = false;
// Timer closure. It runs OnTimerLocked() under XdsClient::mu_, then drops the
// ADS call ref and the "timer" ref taken in Start().
183 static void OnTimer(void* arg, grpc_error_handle error) {
184 ResourceState* self = static_cast<ResourceState*>(arg);
186 MutexLock lock(&self->ads_calld_->xds_client()->mu_);
187 self->OnTimerLocked(GRPC_ERROR_REF(error));
189 self->ads_calld_.reset();
190 self->Unref(DEBUG_LOCATION, "timer");
// Acts only if the timer fired without error and was not cancelled
// (timer_pending_). It then marks the resource DOES_NOT_EXIST in the matching
// XdsClient cache and sends each watcher an UNAVAILABLE error.
// Takes ownership of `error`.
193 void OnTimerLocked(grpc_error_handle error)
194 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
195 if (error == GRPC_ERROR_NONE && timer_pending_) {
196 timer_pending_ = false;
197 grpc_error_handle watcher_error =
198 GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
199 "timeout obtaining resource {type=%s name=%s} from xds server",
201 watcher_error = grpc_error_set_int(
202 watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
203 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
204 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
205 grpc_error_std_string(watcher_error).c_str());
207 if (type_url_ == XdsApi::kLdsTypeUrl) {
208 ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
209 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
210 for (const auto& p : state.watchers) {
211 p.first->OnError(GRPC_ERROR_REF(watcher_error));
213 } else if (type_url_ == XdsApi::kRdsTypeUrl) {
214 RouteConfigState& state =
215 ads_calld_->xds_client()->route_config_map_[name_];
216 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
217 for (const auto& p : state.watchers) {
218 p.first->OnError(GRPC_ERROR_REF(watcher_error));
220 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
221 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
222 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
223 for (const auto& p : state.watchers) {
224 p.first->OnError(GRPC_ERROR_REF(watcher_error));
226 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
227 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
228 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
229 for (const auto& p : state.watchers) {
230 p.first->OnError(GRPC_ERROR_REF(watcher_error));
// NOTE(review): this early return skips both GRPC_ERROR_UNREF calls below,
// leaking `watcher_error` and `error`. That is acceptable only because the
// branch is meant to be unreachable.
233 GPR_UNREACHABLE_CODE(return );
235 GRPC_ERROR_UNREF(watcher_error);
237 GRPC_ERROR_UNREF(error);
240 const std::string type_url_;
241 const std::string name_;
// Set by Start() while the timer is armed; released in OnTimer().
243 RefCountedPtr<AdsCallState> ads_calld_;
244 bool sent_initial_request_;
245 bool timer_pending_ = false;
247 grpc_closure timer_callback_;
// Per-resource-type request state. The destructor unrefs `error`.
250 struct ResourceTypeState {
251 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
253 // Nonce and error for this resource type.
255 grpc_error_handle error = GRPC_ERROR_NONE;
257 // Subscribed resources of this type.
258 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
259 subscribed_resources;
// Sends a request for type_url, or buffers it while another send is in flight.
262 void SendMessageLocked(const std::string& type_url)
263 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// Apply a parsed, valid update of each resource type to the XdsClient caches.
265 void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
266 XdsApi::LdsUpdateMap lds_update_map,
267 const std::set<std::string>& resource_names_failed)
268 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
269 void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
270 XdsApi::RdsUpdateMap rds_update_map)
271 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
272 void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
273 XdsApi::CdsUpdateMap cds_update_map,
274 const std::set<std::string>& resource_names_failed)
275 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
276 void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
277 XdsApi::EdsUpdateMap eds_update_map)
278 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// NOTE(review): body not visible here. Presumably it records a parse failure
// (the NACK path) into the per-resource metadata held in StateMap.
280 template <typename StateMap>
281 void RejectAdsUpdateLocked(grpc_millis update_time,
282 const XdsApi::AdsParseResult& result,
284 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// Closure entry points; each static has a *Locked counterpart.
286 static void OnRequestSent(void* arg, grpc_error_handle error);
287 void OnRequestSentLocked(grpc_error_handle error)
288 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
289 static void OnResponseReceived(void* arg, grpc_error_handle error);
290 bool OnResponseReceivedLocked()
291 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
292 static void OnStatusReceived(void* arg, grpc_error_handle error);
293 void OnStatusReceivedLocked(grpc_error_handle error)
294 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
296 bool IsCurrentCallOnChannel() const;
298 std::set<absl::string_view> ResourceNamesForRequest(
299 const std::string& type_url);
301 // The owning RetryableCall<>.
302 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
304 bool sent_initial_message_ = false;
305 bool seen_response_ = false;
310 // recv_initial_metadata
311 grpc_metadata_array initial_metadata_recv_;
// send_message: non-null while a request send is in flight.
314 grpc_byte_buffer* send_message_payload_ = nullptr;
315 grpc_closure on_request_sent_;
318 grpc_byte_buffer* recv_message_payload_ = nullptr;
319 grpc_closure on_response_received_;
321 // recv_trailing_metadata
322 grpc_metadata_array trailing_metadata_recv_;
323 grpc_status_code status_code_;
324 grpc_slice status_details_;
325 grpc_closure on_status_received_;
327 // Resource types for which requests need to be sent.
328 std::set<std::string /*type_url*/> buffered_requests_;
330 // State for each resource type.
331 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
334 // Contains an LRS call to the xds server.
335 class XdsClient::ChannelState::LrsCallState
336 : public InternallyRefCounted<LrsCallState> {
338 // The ctor and dtor should not be used directly.
339 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
340 ~LrsCallState() override;
342 void Orphan() override;
// NOTE(review): definition not visible here. Presumably it creates reporter_
// once the call is in a state where reporting can begin.
344 void MaybeStartReportingLocked();
// NOTE(review): unlike the other accessors, parent() is not const.
346 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
347 ChannelState* chand() const { return parent_->chand(); }
348 XdsClient* xds_client() const { return chand()->xds_client(); }
349 bool seen_response() const { return seen_response_; }
352 // Reports client-side load stats according to a fixed interval.
// The constructor calls ScheduleNextReportLocked() immediately, so a Reporter
// starts its timer as soon as it is created.
353 class Reporter : public InternallyRefCounted<Reporter> {
355 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
356 : parent_(std::move(parent)), report_interval_(report_interval) {
357 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
358 grpc_schedule_on_exec_ctx);
359 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
360 grpc_schedule_on_exec_ctx);
361 ScheduleNextReportLocked();
364 void Orphan() override;
367 void ScheduleNextReportLocked()
368 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
369 static void OnNextReportTimer(void* arg, grpc_error_handle error);
370 bool OnNextReportTimerLocked(grpc_error_handle error)
371 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
372 bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
373 static void OnReportDone(void* arg, grpc_error_handle error);
374 bool OnReportDoneLocked(grpc_error_handle error)
375 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// True iff this Reporter is the one currently installed in the parent call's
// reporter_. Presumably used by callbacks to detect a stale Reporter.
377 bool IsCurrentReporterOnCall() const {
378 return this == parent_->reporter_.get();
380 XdsClient* xds_client() const { return parent_->xds_client(); }
382 // The owning LRS call.
383 RefCountedPtr<LrsCallState> parent_;
385 // The load reporting state.
386 const grpc_millis report_interval_;
387 bool last_report_counters_were_zero_ = false;
388 bool next_report_timer_callback_pending_ = false;
389 grpc_timer next_report_timer_;
390 grpc_closure on_next_report_timer_;
391 grpc_closure on_report_done_;
// Closure entry points; each static has a *Locked counterpart.
394 static void OnInitialRequestSent(void* arg, grpc_error_handle error);
395 void OnInitialRequestSentLocked()
396 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
397 static void OnResponseReceived(void* arg, grpc_error_handle error);
398 bool OnResponseReceivedLocked()
399 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
400 static void OnStatusReceived(void* arg, grpc_error_handle error);
401 void OnStatusReceivedLocked(grpc_error_handle error)
402 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
404 bool IsCurrentCallOnChannel() const;
406 // The owning RetryableCall<>.
407 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
408 bool seen_response_ = false;
413 // recv_initial_metadata
414 grpc_metadata_array initial_metadata_recv_;
417 grpc_byte_buffer* send_message_payload_ = nullptr;
418 grpc_closure on_initial_request_sent_;
421 grpc_byte_buffer* recv_message_payload_ = nullptr;
422 grpc_closure on_response_received_;
424 // recv_trailing_metadata
425 grpc_metadata_array trailing_metadata_recv_;
426 grpc_status_code status_code_;
427 grpc_slice status_details_;
428 grpc_closure on_status_received_;
430 // Load reporting state.
431 bool send_all_clusters_ = false;
432 std::set<std::string> cluster_names_; // Asked for by the LRS server.
433 grpc_millis load_reporting_interval_ = 0;
434 OrphanablePtr<Reporter> reporter_;
438 // XdsClient::ChannelState::StateWatcher
// Connectivity watcher for the xds channel; it holds a ref to the owning
// ChannelState. When the channel enters TRANSIENT_FAILURE and the ChannelState
// is not shutting down, it logs the status and reports an error to all
// XdsClient watchers through NotifyOnErrorLocked(), under XdsClient::mu_.
441 class XdsClient::ChannelState::StateWatcher
442 : public AsyncConnectivityStateWatcherInterface {
444 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
445 : parent_(std::move(parent)) {}
448 void OnConnectivityStateChange(grpc_connectivity_state new_state,
449 const absl::Status& status) override {
450 MutexLock lock(&parent_->xds_client_->mu_);
451 if (!parent_->shutting_down_ &&
452 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
453 // In TRANSIENT_FAILURE. Notify all watchers of error.
455 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
456 "status_message:(%s)",
457 parent_->xds_client(), status.ToString().c_str());
458 parent_->xds_client_->NotifyOnErrorLocked(
459 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
460 "xds channel in TRANSIENT_FAILURE"));
464 RefCountedPtr<ChannelState> parent_;
468 // XdsClient::ChannelState
// Creates the secure channel used to talk to `server`. Channel credentials
// come from XdsChannelCredsRegistry, using the server's configured creds type
// and config; the target is server.server_uri.
// NOTE(review): channel_creds is released when this function returns. This
// assumes grpc_secure_channel_create() takes its own ref — confirm.
473 grpc_channel* CreateXdsChannel(grpc_channel_args* args,
474 const XdsBootstrap::XdsServer& server) {
475 RefCountedPtr<grpc_channel_credentials> channel_creds =
476 XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
477 server.channel_creds_config);
478 return grpc_secure_channel_create(channel_creds.get(),
479 server.server_uri.c_str(), args, nullptr);
// Holds a weak ref to the owning XdsClient. It creates the channel to `server`
// using the XdsClient's channel args, asserts the channel was created, and
// starts watching its connectivity state.
484 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
485 const XdsBootstrap::XdsServer& server)
486 : InternallyRefCounted<ChannelState>(
487 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
490 xds_client_(std::move(xds_client)),
492 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
493 gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
494 xds_client_.get(), server.server_uri.c_str());
496 channel_ = CreateXdsChannel(xds_client_->args_, server);
497 GPR_ASSERT(channel_ != nullptr);
498 StartConnectivityWatchLocked();
// Destroys the xds channel, then releases the weak ref to the XdsClient.
501 XdsClient::ChannelState::~ChannelState() {
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
503 gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
506 grpc_channel_destroy(channel_);
507 xds_client_.reset(DEBUG_LOCATION, "ChannelState");
// Marks the channel as shutting down, which suppresses StateWatcher error
// notifications. It then cancels the connectivity watch and drops the
// orphan ref.
510 void XdsClient::ChannelState::Orphan() {
511 shutting_down_ = true;
512 CancelConnectivityWatchLocked();
515 Unref(DEBUG_LOCATION, "ChannelState+orphaned");
// Returns the current ADS call, or null while the RetryableCall is in backoff.
// Dereferences ads_calld_ unconditionally, so callers must ensure ads_calld_
// is non-null first.
518 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
520 return ads_calld_->calld();
// Returns the current LRS call, or null while the RetryableCall is in backoff.
// Dereferences lrs_calld_ unconditionally, so callers must ensure lrs_calld_
// is non-null first.
523 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
525 return lrs_calld_->calld();
// True only if the ADS RetryableCall exists and is not currently in retry
// backoff (i.e., it has a live wrapped call).
528 bool XdsClient::ChannelState::HasActiveAdsCall() const {
529 return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
// Starts the LRS RetryableCall, which holds a ref to this ChannelState, unless
// one already exists.
532 void XdsClient::ChannelState::MaybeStartLrsCall() {
533 if (lrs_calld_ != nullptr) return;
535 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
// Drops the LRS RetryableCall. Presumably lrs_calld_ is an OrphanablePtr, so
// this orphans it and stops load reporting on this channel — confirm.
538 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
// Registers a StateWatcher, which holds a ref to this ChannelState, with the
// underlying ClientChannel. Ownership passes to the client channel through the
// OrphanablePtr; watcher_ keeps a raw pointer only so that
// CancelConnectivityWatchLocked() can remove it later.
540 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
541 ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
542 GPR_ASSERT(client_channel != nullptr);
543 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
544 client_channel->AddConnectivityWatcher(
546 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
// Removes the watcher registered by StartConnectivityWatchLocked().
549 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
550 ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
551 GPR_ASSERT(client_channel != nullptr);
552 client_channel->RemoveConnectivityWatcher(watcher_);
// Subscribes to (type_url, name) on this channel. The first subscription lazily
// creates the ADS RetryableCall. If the call is in backoff, nothing is sent
// now; the restarted call resubscribes to everything.
555 void XdsClient::ChannelState::SubscribeLocked(const std::string& type_url,
556 const std::string& name) {
557 if (ads_calld_ == nullptr) {
558 // Start the ADS call if this is the first request.
559 ads_calld_.reset(new RetryableCall<AdsCallState>(
560 Ref(DEBUG_LOCATION, "ChannelState+ads")));
561 // Note: AdsCallState's ctor will automatically subscribe to all
562 // resources that the XdsClient already has watchers for, so we can
566 // If the ADS call is in backoff state, we don't need to do anything now
567 // because when the call is restarted it will resend all necessary requests.
568 if (ads_calld() == nullptr) return;
569 // Subscribe to this resource if the ADS call is active.
570 ads_calld()->SubscribeLocked(type_url, name);
// Forwards the unsubscription to the active ADS call, if any; nothing happens
// while the call is in backoff. When the call has no subscriptions left, the
// ADS RetryableCall is dropped.
573 void XdsClient::ChannelState::UnsubscribeLocked(const std::string& type_url,
574 const std::string& name,
575 bool delay_unsubscription) {
576 if (ads_calld_ != nullptr) {
577 auto* calld = ads_calld_->calld();
578 if (calld != nullptr) {
579 calld->UnsubscribeLocked(type_url, name, delay_unsubscription);
580 if (!calld->HasSubscribedResources()) ads_calld_.reset();
586 // XdsClient::ChannelState::RetryableCall<>
// Builds the backoff policy from the GRPC_XDS_* constants, initializes the
// retry closure, and starts the first call immediately.
589 template <typename T>
590 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
591 RefCountedPtr<ChannelState> chand)
592 : chand_(std::move(chand)),
595 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
597 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
598 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
599 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
600 // Closure Initialization
601 GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
602 grpc_schedule_on_exec_ctx);
603 StartNewCallLocked();
// Prevents further restarts, cancels a pending retry timer (its callback still
// runs and drops the timer's ref), and drops the orphan ref.
606 template <typename T>
607 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
608 shutting_down_ = true;
610 if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
611 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
// Chooses the next step once the wrapped call has ended. If the call had seen
// a response (the connection had worked), backoff is reset and a new call
// starts right away. Otherwise a retry is scheduled after backoff.
614 template <typename T>
615 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
616 const bool seen_response = calld_->seen_response();
619 // If we lost connection to the xds server, reset backoff and restart the
622 StartNewCallLocked();
624 // If we failed to connect to the xds server, retry later.
625 StartRetryTimerLocked();
// Creates a fresh wrapped call, which holds a ref to this wrapper. It is a
// no-op after Orphan() and asserts that no call is currently active.
629 template <typename T>
630 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
631 if (shutting_down_) return;
632 GPR_ASSERT(chand_->channel_ != nullptr);
633 GPR_ASSERT(calld_ == nullptr);
634 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
636 "[xds_client %p] Start new call from retryable call (chand: %p, "
637 "retryable call: %p)",
638 chand()->xds_client(), chand(), this);
640 calld_ = MakeOrphanable<T>(
641 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
// Arms retry_timer_ for the backoff's next attempt time (no-op after Orphan()).
// It takes a ref that OnRetryTimer() releases when the timer fires or is
// cancelled.
644 template <typename T>
645 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
646 if (shutting_down_) return;
647 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
648 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
649 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
651 "[xds_client %p] Failed to connect to xds server (chand: %p) "
652 "retry timer will fire in %" PRId64 "ms.",
653 chand()->xds_client(), chand(), timeout);
655 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
656 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
657 retry_timer_callback_pending_ = true;
// Retry timer closure. It runs OnRetryTimerLocked() under XdsClient::mu_, then
// drops the ref taken in StartRetryTimerLocked().
660 template <typename T>
661 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
662 void* arg, grpc_error_handle error) {
663 RetryableCall* calld = static_cast<RetryableCall*>(arg);
665 MutexLock lock(&calld->chand_->xds_client()->mu_);
666 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
668 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
// Clears the pending flag. If the timer fired normally (not cancelled) and we
// are not shutting down, starts a new call. Takes ownership of `error`.
671 template <typename T>
672 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
673 grpc_error_handle error) {
674 retry_timer_callback_pending_ = false;
675 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
676 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
679 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
680 chand()->xds_client(), chand(), this);
682 StartNewCallLocked();
684 GRPC_ERROR_UNREF(error);
688 // XdsClient::ChannelState::AdsCallState
// Starts the ADS stream in four steps:
//  (1) create the call using the v3 or v2 ADS method, depending on the server;
//  (2) send initial metadata with wait_for_ready;
//  (3) subscribe to every resource already present in the XdsClient caches;
//  (4) start the recv-message and recv-status batches.
691 XdsClient::ChannelState::AdsCallState::AdsCallState(
692 RefCountedPtr<RetryableCall<AdsCallState>> parent)
693 : InternallyRefCounted<AdsCallState>(
694 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
697 parent_(std::move(parent)) {
698 // Init the ADS call. Note that the call will progress every time there's
699 // activity in xds_client()->interested_parties_, which is comprised of
700 // the polling entities from client_channel.
701 GPR_ASSERT(xds_client() != nullptr);
702 // Create a call with the specified method name.
704 chand()->server_.ShouldUseV3()
705 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
706 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
707 call_ = grpc_channel_create_pollset_set_call(
708 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
709 xds_client()->interested_parties_, method, nullptr,
710 GRPC_MILLIS_INF_FUTURE, nullptr);
711 GPR_ASSERT(call_ != nullptr);
712 // Init data associated with the call.
713 grpc_metadata_array_init(&initial_metadata_recv_);
714 grpc_metadata_array_init(&trailing_metadata_recv_);
716 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
718 "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
720 xds_client(), chand(), this, call_);
// Batch 1: send initial metadata. It is started with no completion closure.
723 grpc_call_error call_error;
725 memset(ops, 0, sizeof(ops));
726 // Op: send initial metadata.
728 op->op = GRPC_OP_SEND_INITIAL_METADATA;
729 op->data.send_initial_metadata.count = 0;
730 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
731 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
732 op->reserved = nullptr;
734 call_error = grpc_call_start_batch_and_execute(
735 call_, ops, static_cast<size_t>(op - ops), nullptr);
736 GPR_ASSERT(GRPC_CALL_OK == call_error);
737 // Op: send request message.
738 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
739 grpc_schedule_on_exec_ctx);
// Resubscribe to everything the XdsClient already caches. Each new
// subscription goes through SubscribeLocked(), which sends (or buffers) a
// request for its type.
740 for (const auto& p : xds_client()->listener_map_) {
741 SubscribeLocked(XdsApi::kLdsTypeUrl, std::string(p.first));
743 for (const auto& p : xds_client()->route_config_map_) {
744 SubscribeLocked(XdsApi::kRdsTypeUrl, std::string(p.first));
746 for (const auto& p : xds_client()->cluster_map_) {
747 SubscribeLocked(XdsApi::kCdsTypeUrl, std::string(p.first));
749 for (const auto& p : xds_client()->endpoint_map_) {
750 SubscribeLocked(XdsApi::kEdsTypeUrl, std::string(p.first));
// Batch 2: recv initial metadata plus the first response message. It takes a
// ref that is presumably released by the response handler.
752 // Op: recv initial metadata.
754 op->op = GRPC_OP_RECV_INITIAL_METADATA;
755 op->data.recv_initial_metadata.recv_initial_metadata =
756 &initial_metadata_recv_;
758 op->reserved = nullptr;
760 // Op: recv response.
761 op->op = GRPC_OP_RECV_MESSAGE;
762 op->data.recv_message.recv_message = &recv_message_payload_;
764 op->reserved = nullptr;
766 Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
767 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
768 grpc_schedule_on_exec_ctx);
769 call_error = grpc_call_start_batch_and_execute(
770 call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
771 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Batch 3: recv trailing metadata and the final status.
772 // Op: recv server status.
774 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
775 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
776 op->data.recv_status_on_client.status = &status_code_;
777 op->data.recv_status_on_client.status_details = &status_details_;
779 op->reserved = nullptr;
781 // This callback signals the end of the call, so it relies on the initial
782 // ref instead of a new ref. When it's invoked, it's the initial ref that is
784 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
785 grpc_schedule_on_exec_ctx);
786 call_error = grpc_call_start_batch_and_execute(
787 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
788 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Frees the metadata arrays, any remaining send/recv payloads, and the status
// details slice, then drops the call ref.
791 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
792 grpc_metadata_array_destroy(&initial_metadata_recv_);
793 grpc_metadata_array_destroy(&trailing_metadata_recv_);
794 grpc_byte_buffer_destroy(send_message_payload_);
795 grpc_byte_buffer_destroy(recv_message_payload_);
796 grpc_slice_unref_internal(status_details_);
797 GPR_ASSERT(call_ != nullptr);
798 grpc_call_unref(call_);
// Cancels the underlying call. Final cleanup is driven by on_status_received_.
801 void XdsClient::ChannelState::AdsCallState::Orphan() {
802 GPR_ASSERT(call_ != nullptr);
803 // If we are here because xds_client wants to cancel the call,
804 // on_status_received_ will complete the cancellation and clean up. Otherwise,
805 // we are here because xds_client has to orphan a failed call, then the
806 // following cancellation will be a no-op.
807 grpc_call_cancel_internal(call_);
809 // Note that the initial ref is held by on_status_received_. So the
810 // corresponding unref happens in on_status_received_ instead of here.
813 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
814 const std::string& type_url)
815 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
816 // Buffer message sending if an existing message is in flight.
817 if (send_message_payload_ != nullptr) {
818 buffered_requests_.insert(type_url);
821 auto& state = state_map_[type_url];
822 grpc_slice request_payload_slice;
823 std::set<absl::string_view> resource_names =
824 ResourceNamesForRequest(type_url);
825 request_payload_slice = xds_client()->api_.CreateAdsRequest(
826 chand()->server_, type_url, resource_names,
827 xds_client()->resource_version_map_[type_url], state.nonce,
828 GRPC_ERROR_REF(state.error), !sent_initial_message_);
829 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
830 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
831 state_map_.erase(type_url);
833 sent_initial_message_ = true;
834 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
836 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
837 "error=%s resources=%s",
838 xds_client(), type_url.c_str(),
839 xds_client()->resource_version_map_[type_url].c_str(),
840 state.nonce.c_str(), grpc_error_std_string(state.error).c_str(),
841 absl::StrJoin(resource_names, " ").c_str());
843 GRPC_ERROR_UNREF(state.error);
844 state.error = GRPC_ERROR_NONE;
845 // Create message payload.
846 send_message_payload_ =
847 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
848 grpc_slice_unref_internal(request_payload_slice);
851 memset(&op, 0, sizeof(op));
852 op.op = GRPC_OP_SEND_MESSAGE;
853 op.data.send_message.send_message = send_message_payload_;
854 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
855 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
856 grpc_schedule_on_exec_ctx);
857 grpc_call_error call_error =
858 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
859 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
861 "[xds_client %p] calld=%p call_error=%d sending ADS message",
862 xds_client(), this, call_error);
863 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Creates a ResourceState for (type_url, name) if one does not exist yet and
// sends an updated request for the type; repeat subscriptions are no-ops.
// sent_initial_request is true when a version is already recorded for the
// type, in which case ResourceState::Start() will not arm a timeout.
867 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
868 const std::string& type_url, const std::string& name) {
869 auto& state = state_map_[type_url].subscribed_resources[name];
870 if (state == nullptr) {
871 state = MakeOrphanable<ResourceState>(
872 type_url, name, !xds_client()->resource_version_map_[type_url].empty());
873 SendMessageLocked(type_url);
// Drops the ResourceState for (type_url, name); erasing the OrphanablePtr
// orphans it. An updated request is sent immediately unless
// delay_unsubscription is set.
877 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
878 const std::string& type_url, const std::string& name,
879 bool delay_unsubscription) {
880 state_map_[type_url].subscribed_resources.erase(name);
881 if (!delay_unsubscription) SendMessageLocked(type_url);
// Returns true as soon as any resource type has a non-empty subscription set.
884 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
885 for (const auto& p : state_map_) {
886 if (!p.second.subscribed_resources.empty()) return true;
893 // Build a resource metadata struct for ADS result accepting methods and CSDS.
// Moves the serialized proto and version into the result, records
// update_time, and marks the client status as ACKED.
894 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
895 std::string serialized_proto, std::string version,
896 grpc_millis update_time) {
897 XdsApi::ResourceMetadata resource_metadata;
898 resource_metadata.serialized_proto = std::move(serialized_proto);
899 resource_metadata.update_time = update_time;
900 resource_metadata.version = std::move(version);
901 resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
902 return resource_metadata;
// Applies the valid listeners from an LDS response to the XdsClient cache.
//
// version, update_time: the response version and the time it was processed;
//   recorded in the CSDS metadata of every listener that is (re)cached.
// lds_update_map: the valid listeners in the response, keyed by name. Taken
//   by value because it is modified here: resources are moved out of it, and
//   names from resource_names_failed may be inserted as empty placeholders.
// resource_names_failed: listeners in the response that failed validation.
//
// Watchers are notified of listeners whose contents changed, of subscribed
// listeners that vanished from the response, and (RDS watchers) of route
// configs that no listener in this response refers to any more.
907 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
908 std::string version, grpc_millis update_time,
909 XdsApi::LdsUpdateMap lds_update_map,
910 const std::set<std::string>& resource_names_failed) {
911 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
913 "[xds_client %p] LDS update received containing %" PRIuPTR
915 xds_client(), lds_update_map.size());
917 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
// Route config names referenced by the listeners kept after this update.
918 std::set<std::string> rds_resource_names_seen;
919 for (auto& p : lds_update_map) {
920 const std::string& listener_name = p.first;
921 XdsApi::LdsUpdate& lds_update = p.second.resource;
// NOTE(review): operator[] default-inserts a null entry if listener_name is
// not subscribed, and ResourceNamesForRequest() calls Start() on every entry
// without a null check. This presumably relies on ParseAdsResponse()
// returning only requested names -- confirm.
922 auto& state = lds_state.subscribed_resources[listener_name];
// Presumably ends this resource's request-timeout tracking now that the
// resource has arrived -- confirm against ResourceState::Finish().
923 if (state != nullptr) state->Finish();
924 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
925 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
926 listener_name.c_str(), lds_update.ToString().c_str());
928 // Record the RDS resource names seen.
929 if (!lds_update.http_connection_manager.route_config_name.empty()) {
930 rds_resource_names_seen.insert(
931 lds_update.http_connection_manager.route_config_name);
933 // Ignore identical update.
934 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
935 if (listener_state.update.has_value() &&
936 *listener_state.update == lds_update) {
937 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
939 "[xds_client %p] LDS update for %s identical to current, "
941 xds_client(), listener_name.c_str());
945 // Update the listener state.
946 listener_state.update = std::move(lds_update);
947 listener_state.meta = CreateResourceMetadataAcked(
948 std::move(p.second.serialized_proto), version, update_time);
// Notify all watchers.
// NOTE(review): this loop's `p` shadows the enclosing loop variable `p`.
950 for (const auto& p : listener_state.watchers) {
951 p.first->OnListenerChanged(*listener_state.update);
954 // For invalid resources in the update, if they are already in the
955 // cache, pretend that they are present in the update, so that we
956 // don't incorrectly consider them deleted below.
957 for (const std::string& listener_name : resource_names_failed) {
958 auto it = xds_client()->listener_map_.find(listener_name);
959 if (it != xds_client()->listener_map_.end()) {
960 auto& resource = it->second.update;
961 if (!resource.has_value()) continue;
// Insert an empty placeholder so the "not present" scan below skips it.
962 lds_update_map[listener_name];
// Keep the cached listener's route config referenced as well.
963 if (!resource->http_connection_manager.route_config_name.empty()) {
964 rds_resource_names_seen.insert(
965 resource->http_connection_manager.route_config_name);
969 // For any subscribed resource that is not present in the update,
970 // remove it from the cache and notify watchers that it does not exist.
971 for (const auto& p : lds_state.subscribed_resources) {
972 const std::string& listener_name = p.first;
973 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
974 ListenerState& listener_state =
975 xds_client()->listener_map_[listener_name];
976 // If the resource was newly requested but has not yet been received,
977 // we don't want to generate an error for the watchers, because this LDS
978 // response may be in reaction to an earlier request that did not yet
979 // request the new resource, so its absence from the response does not
980 // necessarily indicate that the resource does not exist.
981 // For that case, we rely on the request timeout instead.
982 if (!listener_state.update.has_value()) continue;
983 listener_state.update.reset();
984 for (const auto& p : listener_state.watchers) {
985 p.first->OnResourceDoesNotExist();
989 // For any RDS resource that is no longer referred to by any LDS
990 // resources, remove it from the cache and notify watchers that it
// does not exist.
992 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
993 for (const auto& p : rds_state.subscribed_resources) {
994 const std::string& rds_resource_name = p.first;
995 if (rds_resource_names_seen.find(rds_resource_name) ==
996 rds_resource_names_seen.end()) {
997 RouteConfigState& route_config_state =
998 xds_client()->route_config_map_[rds_resource_name];
// NOTE(review): unlike the listener scan above, there is no has_value()
// check here, so watchers of a route config that was never received are
// also told it does not exist -- confirm this is intended.
999 route_config_state.update.reset();
1000 for (const auto& p : route_config_state.watchers) {
1001 p.first->OnResourceDoesNotExist();
// Applies the valid route configurations from an RDS response to the
// XdsClient cache and notifies the watchers of each route config whose
// contents changed. version and update_time are recorded in the CSDS
// metadata of every route config that is (re)cached. Unlike the LDS and CDS
// handlers, this one receives no list of names that failed validation.
// rds_update_map is taken by value because its resources are moved out.
1007 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
1008 std::string version, grpc_millis update_time,
1009 XdsApi::RdsUpdateMap rds_update_map) {
1010 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1012 "[xds_client %p] RDS update received containing %" PRIuPTR
1014 xds_client(), rds_update_map.size());
1016 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
1017 for (auto& p : rds_update_map) {
1018 const std::string& route_config_name = p.first;
1019 XdsApi::RdsUpdate& rds_update = p.second.resource;
// NOTE(review): operator[] default-inserts a null entry if the name is not
// subscribed; ResourceNamesForRequest() calls Start() on entries without a
// null check -- confirm responses only carry requested names.
1020 auto& state = rds_state.subscribed_resources[route_config_name];
1021 if (state != nullptr) state->Finish();
1022 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1023 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
1024 rds_update.ToString().c_str());
1026 RouteConfigState& route_config_state =
1027 xds_client()->route_config_map_[route_config_name];
1028 // Ignore identical update.
1029 if (route_config_state.update.has_value() &&
1030 *route_config_state.update == rds_update) {
1031 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1033 "[xds_client %p] RDS resource identical to current, ignoring",
1038 // Update the cache.
1039 route_config_state.update = std::move(rds_update);
1040 route_config_state.meta = CreateResourceMetadataAcked(
1041 std::move(p.second.serialized_proto), version, update_time);
1042 // Notify all watchers.
// NOTE(review): this loop's `p` shadows the enclosing loop variable `p`.
1043 for (const auto& p : route_config_state.watchers) {
1044 p.first->OnRouteConfigChanged(*route_config_state.update);
// Applies the valid clusters from a CDS response to the XdsClient cache.
//
// version, update_time: the response version and the time it was processed;
//   recorded in the CSDS metadata of every cluster that is (re)cached.
// cds_update_map: the valid clusters in the response, keyed by name. Taken
//   by value because it is modified here: resources are moved out of it, and
//   names from resource_names_failed may be inserted as empty placeholders.
// resource_names_failed: clusters in the response that failed validation.
//
// Watchers are notified of clusters whose contents changed, of subscribed
// clusters that vanished from the response, and (EDS watchers) of endpoint
// resources that no cluster in this response refers to any more.
1049 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
1050 std::string version, grpc_millis update_time,
1051 XdsApi::CdsUpdateMap cds_update_map,
1052 const std::set<std::string>& resource_names_failed) {
1053 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1055 "[xds_client %p] CDS update received containing %" PRIuPTR
1057 xds_client(), cds_update_map.size());
1059 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
// EDS resource names referenced by the clusters kept after this update.
1060 std::set<std::string> eds_resource_names_seen;
1061 for (auto& p : cds_update_map) {
// NOTE(review): as a const char*, cluster_name forces a temporary key string
// on each map lookup below (presumably the maps are keyed by std::string);
// a const std::string& would avoid that.
1062 const char* cluster_name = p.first.c_str();
1063 XdsApi::CdsUpdate& cds_update = p.second.resource;
// NOTE(review): operator[] default-inserts a null entry if the name is not
// subscribed; ResourceNamesForRequest() calls Start() on entries without a
// null check -- confirm responses only carry requested names.
1064 auto& state = cds_state.subscribed_resources[cluster_name];
1065 if (state != nullptr) state->Finish();
1066 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1067 gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1068 cluster_name, cds_update.ToString().c_str());
1070 // Record the EDS resource names seen.
// When eds_service_name is empty a fallback name is recorded instead
// (presumably the cluster name -- confirm).
1071 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1073 : cds_update.eds_service_name);
1074 // Ignore identical update.
1075 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1076 if (cluster_state.update.has_value() &&
1077 *cluster_state.update == cds_update) {
1078 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1080 "[xds_client %p] CDS update identical to current, ignoring.",
1085 // Update the cluster state.
1086 cluster_state.update = std::move(cds_update);
1087 cluster_state.meta = CreateResourceMetadataAcked(
1088 std::move(p.second.serialized_proto), version, update_time);
1089 // Notify all watchers.
// NOTE(review): this loop's `p` shadows the enclosing loop variable `p`.
1090 for (const auto& p : cluster_state.watchers) {
1091 p.first->OnClusterChanged(cluster_state.update.value());
1094 // For invalid resources in the update, if they are already in the
1095 // cache, pretend that they are present in the update, so that we
1096 // don't incorrectly consider them deleted below.
1097 for (const std::string& cluster_name : resource_names_failed) {
1098 auto it = xds_client()->cluster_map_.find(cluster_name);
1099 if (it != xds_client()->cluster_map_.end()) {
1100 auto& resource = it->second.update;
1101 if (!resource.has_value()) continue;
// Insert an empty placeholder so the "not present" scan below skips it.
1102 cds_update_map[cluster_name];
// Keep the cached cluster's EDS resource referenced as well.
1103 eds_resource_names_seen.insert(resource->eds_service_name.empty()
1105 : resource->eds_service_name);
1108 // For any subscribed resource that is not present in the update,
1109 // remove it from the cache and notify watchers that it does not exist.
1110 for (const auto& p : cds_state.subscribed_resources) {
1111 const std::string& cluster_name = p.first;
1112 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1113 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1114 // If the resource was newly requested but has not yet been received,
1115 // we don't want to generate an error for the watchers, because this CDS
1116 // response may be in reaction to an earlier request that did not yet
1117 // request the new resource, so its absence from the response does not
1118 // necessarily indicate that the resource does not exist.
1119 // For that case, we rely on the request timeout instead.
1120 if (!cluster_state.update.has_value()) continue;
1121 cluster_state.update.reset();
1122 for (const auto& p : cluster_state.watchers) {
1123 p.first->OnResourceDoesNotExist();
1127 // For any EDS resource that is no longer referred to by any CDS
1128 // resources, remove it from the cache and notify watchers that it
// does not exist.
1130 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1131 for (const auto& p : eds_state.subscribed_resources) {
1132 const std::string& eds_resource_name = p.first;
1133 if (eds_resource_names_seen.find(eds_resource_name) ==
1134 eds_resource_names_seen.end()) {
1135 EndpointState& endpoint_state =
1136 xds_client()->endpoint_map_[eds_resource_name];
// NOTE(review): unlike the cluster scan above, there is no has_value() check
// here, so watchers of an endpoint resource that was never received are also
// told it does not exist -- confirm this is intended.
1137 endpoint_state.update.reset();
1138 for (const auto& p : endpoint_state.watchers) {
1139 p.first->OnResourceDoesNotExist();
// Applies the valid endpoint resources from an EDS response to the XdsClient
// cache and notifies the watchers of each resource whose contents changed.
// version and update_time are recorded in the CSDS metadata of every
// resource that is (re)cached. Like the RDS handler, it receives no list of
// names that failed validation. eds_update_map is taken by value because its
// resources are moved out.
1145 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
1146 std::string version, grpc_millis update_time,
1147 XdsApi::EdsUpdateMap eds_update_map) {
1148 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1150 "[xds_client %p] EDS update received containing %" PRIuPTR
1152 xds_client(), eds_update_map.size());
1154 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1155 for (auto& p : eds_update_map) {
1156 const char* eds_service_name = p.first.c_str();
1157 XdsApi::EdsUpdate& eds_update = p.second.resource;
// NOTE(review): operator[] default-inserts a null entry if the name is not
// subscribed; ResourceNamesForRequest() calls Start() on entries without a
// null check -- confirm responses only carry requested names.
1158 auto& state = eds_state.subscribed_resources[eds_service_name];
1159 if (state != nullptr) state->Finish();
1160 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1161 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1162 eds_service_name, eds_update.ToString().c_str());
1164 EndpointState& endpoint_state =
1165 xds_client()->endpoint_map_[eds_service_name];
1166 // Ignore identical update.
1167 if (endpoint_state.update.has_value() &&
1168 *endpoint_state.update == eds_update) {
1169 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1171 "[xds_client %p] EDS update identical to current, ignoring.",
1176 // Update the endpoint state.
1177 endpoint_state.update = std::move(eds_update);
1178 endpoint_state.meta = CreateResourceMetadataAcked(
1179 std::move(p.second.serialized_proto), version, update_time);
1180 // Notify all watchers.
// NOTE(review): this loop's `p` shadows the enclosing loop variable `p`.
1181 for (const auto& p : endpoint_state.watchers) {
1182 p.first->OnEndpointChanged(endpoint_state.update.value());
1189 // Update resource_metadata for NACK.
1190 void UpdateResourceMetadataNacked(const std::string& version,
1191 const std::string& details,
1192 grpc_millis update_time,
1193 XdsApi::ResourceMetadata* resource_metadata) {
1194 resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
1195 resource_metadata->failed_version = version;
1196 resource_metadata->failed_details = details;
1197 resource_metadata->failed_update_time = update_time;
// Handles the invalid resources of an ADS response that will be NACKed.
//
// For each name in result.resource_names_failed that has an entry in
// *state_map, every watcher of that entry receives OnError() with its own ref
// of result.parse_error. UpdateResourceMetadataNacked() is then called with
// result.version, the error text and update_time. Names absent from
// *state_map are skipped.
// StateMap: at the call sites this is one of listener_map_,
// route_config_map_, cluster_map_ or endpoint_map_.
1202 template <typename StateMap>
1203 void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
1204 grpc_millis update_time, const XdsApi::AdsParseResult& result,
1205 StateMap* state_map) {
1206 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1208 "[xds_client %p] %s update NACKed containing %" PRIuPTR
1209 " invalid resources",
1210 xds_client(), result.type_url.c_str(),
1211 result.resource_names_failed.size());
// Render the error once; the same text is recorded for every failed name.
1213 std::string details = grpc_error_std_string(result.parse_error);
1214 for (auto& name : result.resource_names_failed) {
1215 auto it = state_map->find(name);
1216 if (it == state_map->end()) continue;
1217 auto& state = it->second;
1218 // Notify watchers of error.
1219 for (const auto& p : state.watchers) {
1220 p.first->OnError(GRPC_ERROR_REF(result.parse_error));
1222 // Update resource metadata for CSDS.
1223 UpdateResourceMetadataNacked(result.version, details, update_time,
// Completion callback for an ADS send_message op. arg is the AdsCallState.
// Runs OnRequestSentLocked() (with its own ref of error) while holding
// xds_client()->mu_. It then drops the "ADS+OnRequestSentLocked" ref,
// presumably taken when the send was started -- confirm in
// SendMessageLocked().
1228 void XdsClient::ChannelState::AdsCallState::OnRequestSent(
1229 void* arg, grpc_error_handle error) {
1230 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1232 MutexLock lock(&ads_calld->xds_client()->mu_);
1233 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1235 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
// Handles completion of an ADS send. Takes ownership of error.
//
// Only acts when this is still the channel's current call and the send
// succeeded. In that case it frees the sent payload, then sends the first
// buffered request (if any) and removes it from buffered_requests_.
// error is always released.
1238 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1239 grpc_error_handle error) {
1240 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1241 // Clean up the sent message.
1242 grpc_byte_buffer_destroy(send_message_payload_);
1243 send_message_payload_ = nullptr;
1244 // Continue to send another pending message if any.
1245 // TODO(roth): The current code to handle buffered messages has the
1246 // advantage of sending only the most recent list of resource names for
1247 // each resource type (no matter how many times that resource type has
1248 // been requested to send while the current message sending is still
1249 // pending). But its disadvantage is that we send the requests in fixed
1250 // order of resource types. We need to fix this if we are seeing some
1251 // resource type(s) starved due to frequent requests of other resource
1253 auto it = buffered_requests_.begin();
1254 if (it != buffered_requests_.end()) {
1255 SendMessageLocked(*it);
1256 buffered_requests_.erase(it);
1259 GRPC_ERROR_UNREF(error);
// Completion callback for an ADS recv_message op. arg is the AdsCallState;
// the error argument is ignored. Runs OnResponseReceivedLocked() under
// xds_client()->mu_. The "ADS+OnResponseReceivedLocked" ref is dropped only
// when that returns true, i.e. when no further receive was started to reuse
// the ref.
1262 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1263 void* arg, grpc_error_handle /* error */) {
1264 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1267 MutexLock lock(&ads_calld->xds_client()->mu_);
1268 done = ads_calld->OnResponseReceivedLocked();
1270 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
// Processes one ADS response and, unless shutting down, re-arms the receive.
//
// The payload is read and freed, then parsed against the names currently
// requested for each resource type. An unparsable response (empty type_url)
// is logged and ignored. Otherwise:
//   - the response nonce is saved for the type;
//   - on a parse error, the error (tagged UNAVAILABLE) replaces state.error
//     and the failed resources are rejected via RejectAdsUpdateLocked();
//   - valid resources are applied via the matching Accept*UpdateLocked();
//   - if any valid resources were present, the response is marked as seen,
//     the type's version is recorded, and LRS reporting may be started;
//   - an ACK or NACK is sent via SendMessageLocked().
// Returns true when the caller should drop the "ADS+OnResponseReceivedLocked"
// ref (e.g. on shutdown); when a new recv_message op is started, that ref is
// reused instead.
1273 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1274 // Empty payload means the call was cancelled.
1275 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1278 // Read the response.
1279 grpc_byte_buffer_reader bbr;
1280 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1281 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1282 grpc_byte_buffer_reader_destroy(&bbr);
1283 grpc_byte_buffer_destroy(recv_message_payload_);
1284 recv_message_payload_ = nullptr;
1285 // Parse and validate the response.
// NOTE(review): ResourceNamesForRequest() also calls Start() on each
// subscribed resource's ResourceState, so that happens on every response too
// -- confirm Start() tolerates repeated calls.
1286 XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1287 chand()->server_, response_slice,
1288 ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1289 ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1290 ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1291 ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1292 grpc_slice_unref_internal(response_slice);
1293 if (result.type_url.empty()) {
1294 // Ignore unparsable response.
1296 "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1297 xds_client(), grpc_error_std_string(result.parse_error).c_str());
1298 GRPC_ERROR_UNREF(result.parse_error);
1300 grpc_millis update_time = grpc_core::ExecCtx::Get()->Now();
// The saved nonce is echoed back in the ACK/NACK sent below.
1302 auto& state = state_map_[result.type_url];
1303 state.nonce = std::move(result.nonce);
1304 // If we got an error, we'll NACK the update.
1305 if (result.parse_error != GRPC_ERROR_NONE) {
1307 "[xds_client %p] ADS response invalid for resource type %s "
1308 "version %s, will NACK: nonce=%s error=%s",
1309 xds_client(), result.type_url.c_str(), result.version.c_str(),
1310 state.nonce.c_str(),
1311 grpc_error_std_string(result.parse_error).c_str());
1312 result.parse_error =
1313 grpc_error_set_int(result.parse_error, GRPC_ERROR_INT_GRPC_STATUS,
1314 GRPC_STATUS_UNAVAILABLE);
// state.error takes over result.parse_error's ref; the previous error is
// released first.
1315 GRPC_ERROR_UNREF(state.error);
1316 state.error = result.parse_error;
1317 if (result.type_url == XdsApi::kLdsTypeUrl) {
1318 RejectAdsUpdateLocked(update_time, result,
1319 &xds_client()->listener_map_);
1320 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1321 RejectAdsUpdateLocked(update_time, result,
1322 &xds_client()->route_config_map_);
1323 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1324 RejectAdsUpdateLocked(update_time, result, &xds_client()->cluster_map_);
1325 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1326 RejectAdsUpdateLocked(update_time, result,
1327 &xds_client()->endpoint_map_);
1330 // Process any valid resources.
1331 bool have_valid_resources = false;
1332 if (result.type_url == XdsApi::kLdsTypeUrl) {
1333 have_valid_resources = !result.lds_update_map.empty();
1334 AcceptLdsUpdateLocked(result.version, update_time,
1335 std::move(result.lds_update_map),
1336 result.resource_names_failed);
1337 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1338 have_valid_resources = !result.rds_update_map.empty();
1339 AcceptRdsUpdateLocked(result.version, update_time,
1340 std::move(result.rds_update_map));
1341 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1342 have_valid_resources = !result.cds_update_map.empty();
1343 AcceptCdsUpdateLocked(result.version, update_time,
1344 std::move(result.cds_update_map),
1345 result.resource_names_failed);
1346 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1347 have_valid_resources = !result.eds_update_map.empty();
1348 AcceptEdsUpdateLocked(result.version, update_time,
1349 std::move(result.eds_update_map));
1351 if (have_valid_resources) {
1352 seen_response_ = true;
1353 xds_client()->resource_version_map_[result.type_url] =
1354 std::move(result.version);
1355 // Start load reporting if needed.
1356 auto& lrs_call = chand()->lrs_calld_;
1357 if (lrs_call != nullptr) {
1358 LrsCallState* lrs_calld = lrs_call->calld();
1359 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1362 // Send ACK or NACK.
1363 SendMessageLocked(result.type_url);
1365 if (xds_client()->shutting_down_) return true;
1366 // Keep listening for updates.
1368 memset(&op, 0, sizeof(op));
1369 op.op = GRPC_OP_RECV_MESSAGE;
1370 op.data.recv_message.recv_message = &recv_message_payload_;
1372 op.reserved = nullptr;
1373 GPR_ASSERT(call_ != nullptr);
1374 // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1375 const grpc_call_error call_error =
1376 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1377 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Completion callback for the ADS recv_status_on_client op. arg is the
// AdsCallState. Runs OnStatusReceivedLocked() (with its own ref of error)
// under xds_client()->mu_, then drops the "ADS+OnStatusReceivedLocked" ref.
1381 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1382 void* arg, grpc_error_handle error) {
1383 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1385 MutexLock lock(&ads_calld->xds_client()->mu_);
1386 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1388 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
// Handles the final status of the ADS call. Takes ownership of error.
//
// Logs the status when tracing is enabled. For the channel's current call,
// it tells the parent retryable call that the call finished and sends an
// "xds call failed" error to all watchers. Status from a stale call is
// otherwise ignored. error is always released.
1391 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1392 grpc_error_handle error) {
1393 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1394 char* status_details = grpc_slice_to_c_string(status_details_);
1396 "[xds_client %p] ADS call status received. Status = %d, details "
1397 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1398 xds_client(), status_code_, status_details, chand(), this, call_,
1399 grpc_error_std_string(error).c_str());
1400 gpr_free(status_details);
1402 // Ignore status from a stale call.
1403 if (IsCurrentCallOnChannel()) {
1404 // Try to restart the call.
1405 parent_->OnCallFinishedLocked();
1406 // Send error to all watchers.
1407 xds_client()->NotifyOnErrorLocked(
1408 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1410 GRPC_ERROR_UNREF(error);
1413 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1414 // If the retryable ADS call is null (which only happens when the xds channel
1415 // is shutting down), all the ADS calls are stale.
1416 if (chand()->ads_calld_ == nullptr) return false;
1417 return this == chand()->ads_calld_->calld();
// Returns the names currently subscribed for type_url (an empty set if the
// type has no entry in state_map_).
//
// Side effect: for every subscribed name, calls Start() on its ResourceState
// and passes a new "ResourceState" ref to this call.
// The returned string_views point at the keys of subscribed_resources, so
// they stay valid only while those entries are not erased.
1420 std::set<absl::string_view>
1421 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1422 const std::string& type_url) {
1423 std::set<absl::string_view> resource_names;
1424 auto it = state_map_.find(type_url);
1425 if (it != state_map_.end()) {
1426 for (auto& p : it->second.subscribed_resources) {
1427 resource_names.insert(p.first);
1428 OrphanablePtr<ResourceState>& state = p.second;
// NOTE(review): no null check here; entries default-inserted via operator[]
// would be null -- confirm that never happens.
1429 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1432 return resource_names;
1436 // XdsClient::ChannelState::LrsCallState::Reporter
// Orphans the reporter. Cancels the next-report timer if its callback is
// still pending, so OnNextReportTimer() runs with a cancellation error.
1439 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1440 if (next_report_timer_callback_pending_) {
1441 grpc_timer_cancel(&next_report_timer_);
1445 void XdsClient::ChannelState::LrsCallState::Reporter::
1446 ScheduleNextReportLocked() {
1447 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1448 grpc_timer_init(&next_report_timer_, next_report_time,
1449 &on_next_report_timer_);
1450 next_report_timer_callback_pending_ = true;
// Timer callback for the next load report. arg is the Reporter. Runs
// OnNextReportTimerLocked() (with its own ref of error) under
// xds_client()->mu_. The "Reporter+timer" ref is dropped only when that
// returns true.
1453 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1454 void* arg, grpc_error_handle error) {
1455 Reporter* self = static_cast<Reporter*>(arg);
1458 MutexLock lock(&self->xds_client()->mu_);
1459 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1461 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
// Handles the report timer firing. Takes ownership of error.
// Marks the timer callback as no longer pending. If the timer failed (e.g.
// it was cancelled) or this reporter is no longer current on the call, the
// error is released and no report is sent. Otherwise it returns the result
// of SendReportLocked().
1464 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1465 grpc_error_handle error) {
1466 next_report_timer_callback_pending_ = false;
1467 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1468 GRPC_ERROR_UNREF(error);
1471 return SendReportLocked();
// Checks whether a load report snapshot carries no load at all. Returns
// false as soon as any cluster has non-zero dropped requests or any of its
// locality snapshots is non-zero.
1476 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1477 for (const auto& p : snapshot) {
1478 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1479 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1480 for (const auto& q : cluster_snapshot.locality_stats) {
1481 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1482 if (!locality_snapshot.IsZero()) return false;
// Builds a load report snapshot for the clusters the LRS server asked for
// (parent_->send_all_clusters_ / parent_->cluster_names_) and sends it.
//
// Nothing is sent if both the previous snapshot and this one have all-zero
// counters. In that case the LRS call is stopped when load_report_map_ is
// empty, and ScheduleNextReportLocked() re-arms the timer otherwise -- TODO
// confirm the exact control flow and return values of that path.
// Otherwise the snapshot is serialized into parent_->send_message_payload_
// and sent, with on_report_done_ as the completion. A failure to start the
// batch is logged and is then fatal (GPR_ASSERT).
1490 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1491 // Construct snapshot from all reported stats.
1492 XdsApi::ClusterLoadReportMap snapshot =
1493 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1494 parent_->cluster_names_);
1495 // Skip client load report if the counters were all zero in the last
1496 // report and they are still zero in this one.
1497 const bool old_val = last_report_counters_were_zero_;
1498 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1499 if (old_val && last_report_counters_were_zero_) {
1500 if (xds_client()->load_report_map_.empty()) {
1501 parent_->chand()->StopLrsCall();
1504 ScheduleNextReportLocked();
1507 // Create a request that contains the snapshot.
1508 grpc_slice request_payload_slice =
1509 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
// The payload is stored on the parent, which is how MaybeStartReportingLocked
// detects a send that has not completed yet.
1510 parent_->send_message_payload_ =
1511 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1512 grpc_slice_unref_internal(request_payload_slice);
1515 memset(&op, 0, sizeof(op));
1516 op.op = GRPC_OP_SEND_MESSAGE;
1517 op.data.send_message.send_message = parent_->send_message_payload_;
1518 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1519 parent_->call_, &op, 1, &on_report_done_);
1520 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1522 "[xds_client %p] calld=%p call_error=%d sending client load report",
1523 xds_client(), this, call_error);
// Always fails inside this branch: logs first, then crashes.
1524 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Completion callback for a load-report send. arg is the Reporter. Runs
// OnReportDoneLocked() (with its own ref of error) under xds_client()->mu_.
// The "Reporter+report_done" ref is dropped only when that returns true.
1529 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1530 void* arg, grpc_error_handle error) {
1531 Reporter* self = static_cast<Reporter*>(arg);
1534 MutexLock lock(&self->xds_client()->mu_);
1535 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1537 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
// Handles completion of a load-report send. Takes ownership of error.
//
// Frees the sent payload and then acts on the first matching case:
//   - no load reports remain registered: stops the LRS call;
//   - the send failed, or this reporter is no longer current: releases the
//     error and, in the not-current case, lets the parent start a reporter
//     for the new config;
//   - otherwise: schedules the next report.
1540 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1541 grpc_error_handle error) {
// Clearing the payload also lifts the "send still in flight" guard in
// MaybeStartReportingLocked().
1542 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1543 parent_->send_message_payload_ = nullptr;
1544 // If there are no more registered stats to report, cancel the call.
1545 if (xds_client()->load_report_map_.empty()) {
1546 parent_->chand()->StopLrsCall();
1547 GRPC_ERROR_UNREF(error);
1550 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1551 GRPC_ERROR_UNREF(error);
1552 // If this reporter is no longer the current one on the call, the reason
1553 // might be that it was orphaned for a new one due to config update.
1554 if (!IsCurrentReporterOnCall()) {
1555 parent_->MaybeStartReportingLocked();
1559 ScheduleNextReportLocked();
1564 // XdsClient::ChannelState::LrsCallState
// Creates the LRS call on chand()->channel_ and starts it.
//
// The method is the v3 or v2 StreamLoadStats path, depending on
// chand()->server_.ShouldUseV3(). The call is bound to
// xds_client()->interested_parties_ and has no deadline. The initial LRS
// request is serialized into send_message_payload_. Three batches are then
// started, and each must be accepted (GPR_ASSERT):
//   1. send initial metadata (wait-for-ready) + the initial request;
//      completes in OnInitialRequestSent, holding its own ref;
//   2. recv initial metadata + the first response; completes in
//      OnResponseReceived, holding its own ref;
//   3. recv status; completes in OnStatusReceived, which relies on the
//      initial ref instead of taking a new one.
1567 XdsClient::ChannelState::LrsCallState::LrsCallState(
1568 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1569 : InternallyRefCounted<LrsCallState>(
1570 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1573 parent_(std::move(parent)) {
1574 // Init the LRS call. Note that the call will progress every time there's
1575 // activity in xds_client()->interested_parties_, which is comprised of
1576 // the polling entities from client_channel.
1577 GPR_ASSERT(xds_client() != nullptr);
1578 const auto& method =
1579 chand()->server_.ShouldUseV3()
1580 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1581 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1582 call_ = grpc_channel_create_pollset_set_call(
1583 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1584 xds_client()->interested_parties_, method, nullptr,
1585 GRPC_MILLIS_INF_FUTURE, nullptr);
1586 GPR_ASSERT(call_ != nullptr);
1587 // Init the request payload.
1588 grpc_slice request_payload_slice =
1589 xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1590 send_message_payload_ =
1591 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1592 grpc_slice_unref_internal(request_payload_slice);
1593 // Init other data associated with the LRS call.
1594 grpc_metadata_array_init(&initial_metadata_recv_);
1595 grpc_metadata_array_init(&trailing_metadata_recv_);
1597 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1599 "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1601 xds_client(), chand(), this, call_);
1604 grpc_call_error call_error;
1606 memset(ops, 0, sizeof(ops));
1607 // Op: send initial metadata.
1609 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1610 op->data.send_initial_metadata.count = 0;
1611 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1612 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1613 op->reserved = nullptr;
1615 // Op: send request message.
1616 GPR_ASSERT(send_message_payload_ != nullptr);
1617 op->op = GRPC_OP_SEND_MESSAGE;
1618 op->data.send_message.send_message = send_message_payload_;
1620 op->reserved = nullptr;
// Ref held until OnInitialRequestSent runs.
1622 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1623 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1624 grpc_schedule_on_exec_ctx);
1625 call_error = grpc_call_start_batch_and_execute(
1626 call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1627 GPR_ASSERT(GRPC_CALL_OK == call_error);
1628 // Op: recv initial metadata.
1630 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1631 op->data.recv_initial_metadata.recv_initial_metadata =
1632 &initial_metadata_recv_;
1634 op->reserved = nullptr;
1636 // Op: recv response.
1637 op->op = GRPC_OP_RECV_MESSAGE;
1638 op->data.recv_message.recv_message = &recv_message_payload_;
1640 op->reserved = nullptr;
// Ref held across receives; OnResponseReceivedLocked reuses it for each
// re-armed recv_message op.
1642 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1643 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1644 grpc_schedule_on_exec_ctx);
1645 call_error = grpc_call_start_batch_and_execute(
1646 call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1647 GPR_ASSERT(GRPC_CALL_OK == call_error);
1648 // Op: recv server status.
1650 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1651 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1652 op->data.recv_status_on_client.status = &status_code_;
1653 op->data.recv_status_on_client.status_details = &status_details_;
1655 op->reserved = nullptr;
1657 // This callback signals the end of the call, so it relies on the initial
1658 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1660 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1661 grpc_schedule_on_exec_ctx);
1662 call_error = grpc_call_start_batch_and_execute(
1663 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1664 GPR_ASSERT(GRPC_CALL_OK == call_error);
1667 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1668 grpc_metadata_array_destroy(&initial_metadata_recv_);
1669 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1670 grpc_byte_buffer_destroy(send_message_payload_);
1671 grpc_byte_buffer_destroy(recv_message_payload_);
1672 grpc_slice_unref_internal(status_details_);
1673 GPR_ASSERT(call_ != nullptr);
1674 grpc_call_unref(call_);
// Orphans the LRS call state by cancelling the call. Cleanup and the final
// unref happen in on_status_received_.
1677 void XdsClient::ChannelState::LrsCallState::Orphan() {
1679 GPR_ASSERT(call_ != nullptr);
1680 // If we are here because xds_client wants to cancel the call,
1681 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1682 // we are here because xds_client has to orphan a failed call, then the
1683 // following cancellation will be a no-op.
1684 grpc_call_cancel_internal(call_);
1685 // Note that the initial ref is held by on_status_received_. So the
1686 // corresponding unref happens in on_status_received_ instead of here.
// Creates a Reporter (which sends load reports every
// load_reporting_interval_) once all preconditions hold: no reporter is
// running, no send is in flight, an LRS response has been received, and the
// channel's ADS call has seen a valid response. Returns silently if any
// precondition fails; callers retry when the blocking condition changes.
1689 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1690 // Don't start again if already started.
1691 if (reporter_ != nullptr) return;
1692 // Don't start if the previous send_message op (of the initial request or the
1693 // last report of the previous reporter) hasn't completed.
1694 if (send_message_payload_ != nullptr) return;
1695 // Don't start if no LRS response has arrived.
1696 if (!seen_response()) return;
1697 // Don't start if the ADS call hasn't received any valid response. Note that
1698 // this must be the first channel because it is the current channel but its
1699 // ADS call hasn't seen any response.
1700 if (chand()->ads_calld_ == nullptr ||
1701 chand()->ads_calld_->calld() == nullptr ||
1702 !chand()->ads_calld_->calld()->seen_response()) {
1706 reporter_ = MakeOrphanable<Reporter>(
1707 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
// Completion callback for the batch that sends the initial LRS request. arg
// is the LrsCallState; the error argument is ignored. Runs
// OnInitialRequestSentLocked() under xds_client()->mu_, then drops the
// "LRS+OnInitialRequestSentLocked" ref taken in the constructor.
1710 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1711 void* arg, grpc_error_handle /*error*/) {
1712 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1714 MutexLock lock(&lrs_calld->xds_client()->mu_);
1715 lrs_calld->OnInitialRequestSentLocked();
1717 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1720 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1721 // Clear the send_message_payload_.
1722 grpc_byte_buffer_destroy(send_message_payload_);
1723 send_message_payload_ = nullptr;
1724 MaybeStartReportingLocked();
// Completion callback for an LRS recv_message op. arg is the LrsCallState;
// the error argument is ignored. Runs OnResponseReceivedLocked() under
// xds_client()->mu_. The "LRS+OnResponseReceivedLocked" ref is dropped only
// when that returns true, i.e. when no further receive was started.
1727 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1728 void* arg, grpc_error_handle /*error*/) {
1729 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1732 MutexLock lock(&lrs_calld->xds_client()->mu_);
1733 done = lrs_calld->OnResponseReceivedLocked();
1735 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
// Processes one LRS response (the server's load-reporting config) and,
// unless shutting down, re-arms the receive.
//
// The payload is read and freed, then parsed into send_all_clusters, the
// cluster names and the reporting interval. A parse failure is logged and
// the config is left unchanged. Otherwise the response is marked as seen,
// the interval is raised to GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS
// if needed, and an identical config is ignored. A new config is recorded
// and MaybeStartReportingLocked() is called.
// Returns true when the caller should drop the "LRS+OnResponseReceivedLocked"
// ref (e.g. on shutdown); when a new recv_message op is started, that ref is
// reused instead.
1738 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1739 // Empty payload means the call was cancelled.
1740 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1743 // Read the response.
1744 grpc_byte_buffer_reader bbr;
1745 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1746 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1747 grpc_byte_buffer_reader_destroy(&bbr);
1748 grpc_byte_buffer_destroy(recv_message_payload_);
1749 recv_message_payload_ = nullptr;
1750 // This anonymous lambda is a hack to avoid the usage of goto.
1752 // Parse the response.
1753 bool send_all_clusters = false;
1754 std::set<std::string> new_cluster_names;
// Left uninitialized; ParseLrsResponse() fills it, and it is only read after
// a successful parse.
1755 grpc_millis new_load_reporting_interval;
1756 grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse(
1757 response_slice, &send_all_clusters, &new_cluster_names,
1758 &new_load_reporting_interval);
1759 if (parse_error != GRPC_ERROR_NONE) {
1761 "[xds_client %p] LRS response parsing failed. error=%s",
1762 xds_client(), grpc_error_std_string(parse_error).c_str());
1763 GRPC_ERROR_UNREF(parse_error);
1766 seen_response_ = true;
1767 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1770 "[xds_client %p] LRS response received, %" PRIuPTR
1771 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1773 xds_client(), new_cluster_names.size(), send_all_clusters,
1774 new_load_reporting_interval);
1776 for (const auto& name : new_cluster_names) {
1777 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1778 xds_client(), i++, name.c_str());
// Enforce the client-side minimum reporting interval.
1781 if (new_load_reporting_interval <
1782 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1783 new_load_reporting_interval =
1784 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1785 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1787 "[xds_client %p] Increased load_report_interval to minimum "
1789 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1792 // Ignore identical update.
1793 if (send_all_clusters == send_all_clusters_ &&
1794 cluster_names_ == new_cluster_names &&
1795 load_reporting_interval_ == new_load_reporting_interval) {
1796 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1798 "[xds_client %p] Incoming LRS response identical to current, "
1804 // Stop current load reporting (if any) to adopt the new config.
1806 // Record the new config.
1807 send_all_clusters_ = send_all_clusters;
1808 cluster_names_ = std::move(new_cluster_names);
1809 load_reporting_interval_ = new_load_reporting_interval;
1810 // Try starting sending load report.
1811 MaybeStartReportingLocked();
1813 grpc_slice_unref_internal(response_slice);
1814 if (xds_client()->shutting_down_) return true;
1815 // Keep listening for LRS config updates.
1817 memset(&op, 0, sizeof(op));
1818 op.op = GRPC_OP_RECV_MESSAGE;
1819 op.data.recv_message.recv_message = &recv_message_payload_;
1821 op.reserved = nullptr;
1822 GPR_ASSERT(call_ != nullptr);
1823 // Reuse the "LRS+OnResponseReceivedLocked" ref taken in ctor.
1824 const grpc_call_error call_error =
1825 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1826 GPR_ASSERT(GRPC_CALL_OK == call_error);
1830 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1831 void* arg, grpc_error_handle error) {
1832 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1834 MutexLock lock(&lrs_calld->xds_client()->mu_);
1835 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1837 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1840 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1841 grpc_error_handle error) {
1842 GPR_ASSERT(call_ != nullptr);
1843 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1844 char* status_details = grpc_slice_to_c_string(status_details_);
1846 "[xds_client %p] LRS call status received. Status = %d, details "
1847 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1848 xds_client(), status_code_, status_details, chand(), this, call_,
1849 grpc_error_std_string(error).c_str());
1850 gpr_free(status_details);
1852 // Ignore status from a stale call.
1853 if (IsCurrentCallOnChannel()) {
1854 GPR_ASSERT(!xds_client()->shutting_down_);
1855 // Try to restart the call.
1856 parent_->OnCallFinishedLocked();
1858 GRPC_ERROR_UNREF(error);
1861 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1862 // If the retryable LRS call is null (which only happens when the xds channel
1863 // is shutting down), all the LRS calls are stale.
1864 if (chand()->lrs_calld_ == nullptr) return false;
1865 return this == chand()->lrs_calld_->calld();
1874 grpc_millis GetRequestTimeout(const grpc_channel_args* args) {
1875 return grpc_channel_args_find_integer(
1876 args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1877 {15000, 0, INT_MAX});
1880 grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
1881 absl::InlinedVector<grpc_arg, 2> args_to_add = {
1882 grpc_channel_arg_integer_create(
1883 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
1884 5 * 60 * GPR_MS_PER_SEC),
1885 grpc_channel_arg_integer_create(
1886 const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1888 return grpc_channel_args_copy_and_add(args, args_to_add.data(),
1889 args_to_add.size());
1894 XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
1895 const grpc_channel_args* args)
1896 : DualRefCounted<XdsClient>(
1897 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1899 bootstrap_(std::move(bootstrap)),
1900 args_(ModifyChannelArgs(args)),
1901 request_timeout_(GetRequestTimeout(args)),
1902 interested_parties_(grpc_pollset_set_create()),
1903 certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1904 bootstrap_->certificate_providers())),
1905 api_(this, &grpc_xds_client_trace, bootstrap_->node(),
1906 &bootstrap_->certificate_providers()) {
1907 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1908 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1910 // Create ChannelState object.
1911 chand_ = MakeOrphanable<ChannelState>(
1912 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1915 XdsClient::~XdsClient() {
1916 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1917 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1919 grpc_channel_args_destroy(args_);
1920 grpc_pollset_set_destroy(interested_parties_);
1923 void XdsClient::AddChannelzLinkage(
1924 channelz::ChannelNode* parent_channelz_node) {
1925 MutexLock lock(&mu_);
1926 channelz::ChannelNode* xds_channelz_node =
1927 grpc_channel_get_channelz_node(chand_->channel());
1928 if (xds_channelz_node != nullptr) {
1929 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1933 void XdsClient::RemoveChannelzLinkage(
1934 channelz::ChannelNode* parent_channelz_node) {
1935 MutexLock lock(&mu_);
1936 channelz::ChannelNode* xds_channelz_node =
1937 grpc_channel_get_channelz_node(chand_->channel());
1938 if (xds_channelz_node != nullptr) {
1939 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1943 void XdsClient::Orphan() {
1944 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1945 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1948 MutexLock lock(g_mu);
1949 if (g_xds_client == this) g_xds_client = nullptr;
1952 MutexLock lock(&mu_);
1953 shutting_down_ = true;
1954 // Orphan ChannelState object.
1956 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1957 // created by the XdsResolver because the maps contain refs for watchers
1958 // which in turn hold refs to the loadbalancing policies. At this point, it
1959 // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1960 // policies before those calls are done would lead to issues such as
1961 // https://github.com/grpc/grpc/issues/20928.
1962 if (!listener_map_.empty()) {
1963 cluster_map_.clear();
1964 endpoint_map_.clear();
1969 void XdsClient::WatchListenerData(
1970 absl::string_view listener_name,
1971 std::unique_ptr<ListenerWatcherInterface> watcher) {
1972 std::string listener_name_str = std::string(listener_name);
1973 MutexLock lock(&mu_);
1974 ListenerState& listener_state = listener_map_[listener_name_str];
1975 ListenerWatcherInterface* w = watcher.get();
1976 listener_state.watchers[w] = std::move(watcher);
1977 // If we've already received an LDS update, notify the new watcher
1979 if (listener_state.update.has_value()) {
1980 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1981 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1982 this, listener_name_str.c_str());
1984 w->OnListenerChanged(*listener_state.update);
1986 chand_->SubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str);
1989 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1990 ListenerWatcherInterface* watcher,
1991 bool delay_unsubscription) {
1992 MutexLock lock(&mu_);
1993 if (shutting_down_) return;
1994 std::string listener_name_str = std::string(listener_name);
1995 ListenerState& listener_state = listener_map_[listener_name_str];
1996 auto it = listener_state.watchers.find(watcher);
1997 if (it != listener_state.watchers.end()) {
1998 listener_state.watchers.erase(it);
1999 if (listener_state.watchers.empty()) {
2000 listener_map_.erase(listener_name_str);
2001 chand_->UnsubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str,
2002 delay_unsubscription);
2007 void XdsClient::WatchRouteConfigData(
2008 absl::string_view route_config_name,
2009 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
2010 std::string route_config_name_str = std::string(route_config_name);
2011 MutexLock lock(&mu_);
2012 RouteConfigState& route_config_state =
2013 route_config_map_[route_config_name_str];
2014 RouteConfigWatcherInterface* w = watcher.get();
2015 route_config_state.watchers[w] = std::move(watcher);
2016 // If we've already received an RDS update, notify the new watcher
2018 if (route_config_state.update.has_value()) {
2019 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2021 "[xds_client %p] returning cached route config data for %s", this,
2022 route_config_name_str.c_str());
2024 w->OnRouteConfigChanged(*route_config_state.update);
2026 chand_->SubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str);
2029 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
2030 RouteConfigWatcherInterface* watcher,
2031 bool delay_unsubscription) {
2032 MutexLock lock(&mu_);
2033 if (shutting_down_) return;
2034 std::string route_config_name_str = std::string(route_config_name);
2035 RouteConfigState& route_config_state =
2036 route_config_map_[route_config_name_str];
2037 auto it = route_config_state.watchers.find(watcher);
2038 if (it != route_config_state.watchers.end()) {
2039 route_config_state.watchers.erase(it);
2040 if (route_config_state.watchers.empty()) {
2041 route_config_map_.erase(route_config_name_str);
2042 chand_->UnsubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str,
2043 delay_unsubscription);
2048 void XdsClient::WatchClusterData(
2049 absl::string_view cluster_name,
2050 std::unique_ptr<ClusterWatcherInterface> watcher) {
2051 std::string cluster_name_str = std::string(cluster_name);
2052 MutexLock lock(&mu_);
2053 ClusterState& cluster_state = cluster_map_[cluster_name_str];
2054 ClusterWatcherInterface* w = watcher.get();
2055 cluster_state.watchers[w] = std::move(watcher);
2056 // If we've already received a CDS update, notify the new watcher
2058 if (cluster_state.update.has_value()) {
2059 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2060 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
2061 this, cluster_name_str.c_str());
2063 w->OnClusterChanged(cluster_state.update.value());
2065 chand_->SubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str);
2068 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
2069 ClusterWatcherInterface* watcher,
2070 bool delay_unsubscription) {
2071 MutexLock lock(&mu_);
2072 if (shutting_down_) return;
2073 std::string cluster_name_str = std::string(cluster_name);
2074 ClusterState& cluster_state = cluster_map_[cluster_name_str];
2075 auto it = cluster_state.watchers.find(watcher);
2076 if (it != cluster_state.watchers.end()) {
2077 cluster_state.watchers.erase(it);
2078 if (cluster_state.watchers.empty()) {
2079 cluster_map_.erase(cluster_name_str);
2080 chand_->UnsubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str,
2081 delay_unsubscription);
2086 void XdsClient::WatchEndpointData(
2087 absl::string_view eds_service_name,
2088 std::unique_ptr<EndpointWatcherInterface> watcher) {
2089 std::string eds_service_name_str = std::string(eds_service_name);
2090 MutexLock lock(&mu_);
2091 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2092 EndpointWatcherInterface* w = watcher.get();
2093 endpoint_state.watchers[w] = std::move(watcher);
2094 // If we've already received an EDS update, notify the new watcher
2096 if (endpoint_state.update.has_value()) {
2097 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2098 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
2099 this, eds_service_name_str.c_str());
2101 w->OnEndpointChanged(endpoint_state.update.value());
2103 chand_->SubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str);
2106 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
2107 EndpointWatcherInterface* watcher,
2108 bool delay_unsubscription) {
2109 MutexLock lock(&mu_);
2110 if (shutting_down_) return;
2111 std::string eds_service_name_str = std::string(eds_service_name);
2112 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2113 auto it = endpoint_state.watchers.find(watcher);
2114 if (it != endpoint_state.watchers.end()) {
2115 endpoint_state.watchers.erase(it);
2116 if (endpoint_state.watchers.empty()) {
2117 endpoint_map_.erase(eds_service_name_str);
2118 chand_->UnsubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str,
2119 delay_unsubscription);
2124 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
2125 absl::string_view lrs_server, absl::string_view cluster_name,
2126 absl::string_view eds_service_name) {
2127 // TODO(roth): When we add support for direct federation, use the
2128 // server name specified in lrs_server.
2130 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2131 MutexLock lock(&mu_);
2132 // We jump through some hoops here to make sure that the absl::string_views
2133 // stored in the XdsClusterDropStats object point to the strings
2134 // in the load_report_map_ key, so that they have the same lifetime.
2135 auto it = load_report_map_
2136 .emplace(std::make_pair(std::move(key), LoadReportState()))
2138 LoadReportState& load_report_state = it->second;
2139 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
2140 if (load_report_state.drop_stats != nullptr) {
2141 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
2143 if (cluster_drop_stats == nullptr) {
2144 if (load_report_state.drop_stats != nullptr) {
2145 load_report_state.deleted_drop_stats +=
2146 load_report_state.drop_stats->GetSnapshotAndReset();
2148 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2149 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
2150 it->first.first /*cluster_name*/,
2151 it->first.second /*eds_service_name*/);
2152 load_report_state.drop_stats = cluster_drop_stats.get();
2154 chand_->MaybeStartLrsCall();
2155 return cluster_drop_stats;
2158 void XdsClient::RemoveClusterDropStats(
2159 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2160 absl::string_view eds_service_name,
2161 XdsClusterDropStats* cluster_drop_stats) {
2162 MutexLock lock(&mu_);
2163 // TODO(roth): When we add support for direct federation, use the
2164 // server name specified in lrs_server.
2165 auto it = load_report_map_.find(
2166 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2167 if (it == load_report_map_.end()) return;
2168 LoadReportState& load_report_state = it->second;
2169 if (load_report_state.drop_stats == cluster_drop_stats) {
2170 // Record final snapshot in deleted_drop_stats, which will be
2171 // added to the next load report.
2172 load_report_state.deleted_drop_stats +=
2173 load_report_state.drop_stats->GetSnapshotAndReset();
2174 load_report_state.drop_stats = nullptr;
2178 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2179 absl::string_view lrs_server, absl::string_view cluster_name,
2180 absl::string_view eds_service_name,
2181 RefCountedPtr<XdsLocalityName> locality) {
2182 // TODO(roth): When we add support for direct federation, use the
2183 // server name specified in lrs_server.
2185 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2186 MutexLock lock(&mu_);
2187 // We jump through some hoops here to make sure that the absl::string_views
2188 // stored in the XdsClusterLocalityStats object point to the strings
2189 // in the load_report_map_ key, so that they have the same lifetime.
2190 auto it = load_report_map_
2191 .emplace(std::make_pair(std::move(key), LoadReportState()))
2193 LoadReportState& load_report_state = it->second;
2194 LoadReportState::LocalityState& locality_state =
2195 load_report_state.locality_stats[locality];
2196 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2197 if (locality_state.locality_stats != nullptr) {
2198 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2200 if (cluster_locality_stats == nullptr) {
2201 if (locality_state.locality_stats != nullptr) {
2202 locality_state.deleted_locality_stats +=
2203 locality_state.locality_stats->GetSnapshotAndReset();
2205 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2206 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2207 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2208 std::move(locality));
2209 locality_state.locality_stats = cluster_locality_stats.get();
2211 chand_->MaybeStartLrsCall();
2212 return cluster_locality_stats;
2215 void XdsClient::RemoveClusterLocalityStats(
2216 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2217 absl::string_view eds_service_name,
2218 const RefCountedPtr<XdsLocalityName>& locality,
2219 XdsClusterLocalityStats* cluster_locality_stats) {
2220 MutexLock lock(&mu_);
2221 // TODO(roth): When we add support for direct federation, use the
2222 // server name specified in lrs_server.
2223 auto it = load_report_map_.find(
2224 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2225 if (it == load_report_map_.end()) return;
2226 LoadReportState& load_report_state = it->second;
2227 auto locality_it = load_report_state.locality_stats.find(locality);
2228 if (locality_it == load_report_state.locality_stats.end()) return;
2229 LoadReportState::LocalityState& locality_state = locality_it->second;
2230 if (locality_state.locality_stats == cluster_locality_stats) {
2231 // Record final snapshot in deleted_locality_stats, which will be
2232 // added to the next load report.
2233 locality_state.deleted_locality_stats +=
2234 locality_state.locality_stats->GetSnapshotAndReset();
2235 locality_state.locality_stats = nullptr;
2239 void XdsClient::ResetBackoff() {
2240 MutexLock lock(&mu_);
2241 if (chand_ != nullptr) {
2242 grpc_channel_reset_connect_backoff(chand_->channel());
2246 void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
2247 for (const auto& p : listener_map_) {
2248 const ListenerState& listener_state = p.second;
2249 for (const auto& p : listener_state.watchers) {
2250 p.first->OnError(GRPC_ERROR_REF(error));
2253 for (const auto& p : route_config_map_) {
2254 const RouteConfigState& route_config_state = p.second;
2255 for (const auto& p : route_config_state.watchers) {
2256 p.first->OnError(GRPC_ERROR_REF(error));
2259 for (const auto& p : cluster_map_) {
2260 const ClusterState& cluster_state = p.second;
2261 for (const auto& p : cluster_state.watchers) {
2262 p.first->OnError(GRPC_ERROR_REF(error));
2265 for (const auto& p : endpoint_map_) {
2266 const EndpointState& endpoint_state = p.second;
2267 for (const auto& p : endpoint_state.watchers) {
2268 p.first->OnError(GRPC_ERROR_REF(error));
2271 GRPC_ERROR_UNREF(error);
2274 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2275 bool send_all_clusters, const std::set<std::string>& clusters) {
2276 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2277 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2279 XdsApi::ClusterLoadReportMap snapshot_map;
2280 for (auto load_report_it = load_report_map_.begin();
2281 load_report_it != load_report_map_.end();) {
2282 // Cluster key is cluster and EDS service name.
2283 const auto& cluster_key = load_report_it->first;
2284 LoadReportState& load_report = load_report_it->second;
2285 // If the CDS response for a cluster indicates to use LRS but the
2286 // LRS server does not say that it wants reports for this cluster,
2287 // then we'll have stats objects here whose data we're not going to
2288 // include in the load report. However, we still need to clear out
2289 // the data from the stats objects, so that if the LRS server starts
2290 // asking for the data in the future, we don't incorrectly include
2291 // data from previous reporting intervals in that future report.
2292 const bool record_stats =
2293 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2294 XdsApi::ClusterLoadReport snapshot;
2295 // Aggregate drop stats.
2296 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2297 if (load_report.drop_stats != nullptr) {
2298 snapshot.dropped_requests +=
2299 load_report.drop_stats->GetSnapshotAndReset();
2300 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2302 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2303 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2304 load_report.drop_stats);
2307 // Aggregate locality stats.
2308 for (auto it = load_report.locality_stats.begin();
2309 it != load_report.locality_stats.end();) {
2310 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2311 auto& locality_state = it->second;
2312 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2313 snapshot.locality_stats[locality_name];
2314 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2315 if (locality_state.locality_stats != nullptr) {
2316 locality_snapshot +=
2317 locality_state.locality_stats->GetSnapshotAndReset();
2318 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2320 "[xds_client %p] cluster=%s eds_service_name=%s "
2321 "locality=%s locality_stats=%p",
2322 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2323 locality_name->AsHumanReadableString().c_str(),
2324 locality_state.locality_stats);
2327 // If the only thing left in this entry was final snapshots from
2328 // deleted locality stats objects, remove the entry.
2329 if (locality_state.locality_stats == nullptr) {
2330 it = load_report.locality_stats.erase(it);
2335 // Compute load report interval.
2336 const grpc_millis now = ExecCtx::Get()->Now();
2337 snapshot.load_report_interval = now - load_report.last_report_time;
2338 load_report.last_report_time = now;
2341 snapshot_map[cluster_key] = std::move(snapshot);
2343 // If the only thing left in this entry was final snapshots from
2344 // deleted stats objects, remove the entry.
2345 if (load_report.locality_stats.empty() &&
2346 load_report.drop_stats == nullptr) {
2347 load_report_it = load_report_map_.erase(load_report_it);
2352 return snapshot_map;
2355 std::string XdsClient::DumpClientConfigBinary() {
2356 MutexLock lock(&mu_);
2357 XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2358 // Update per-xds-type version if available, this version corresponding to the
2359 // last successful ADS update version.
2360 for (auto& p : resource_version_map_) {
2361 resource_type_metadata_map[p.first].version = p.second;
2363 // Collect resource metadata from listeners
2365 resource_type_metadata_map[XdsApi::kLdsTypeUrl].resource_metadata_map;
2366 for (auto& p : listener_map_) {
2367 lds_map[p.first] = &p.second.meta;
2369 // Collect resource metadata from route configs
2371 resource_type_metadata_map[XdsApi::kRdsTypeUrl].resource_metadata_map;
2372 for (auto& p : route_config_map_) {
2373 rds_map[p.first] = &p.second.meta;
2375 // Collect resource metadata from clusters
2377 resource_type_metadata_map[XdsApi::kCdsTypeUrl].resource_metadata_map;
2378 for (auto& p : cluster_map_) {
2379 cds_map[p.first] = &p.second.meta;
2381 // Collect resource metadata from endpoints
2383 resource_type_metadata_map[XdsApi::kEdsTypeUrl].resource_metadata_map;
2384 for (auto& p : endpoint_map_) {
2385 eds_map[p.first] = &p.second.meta;
2387 // Assemble config dump messages
2388 return api_.AssembleClientConfig(resource_type_metadata_map);
2392 // accessors for global state
2395 void XdsClientGlobalInit() {
2397 XdsHttpFilterRegistry::Init();
2400 // TODO(roth): Find a better way to clear the fallback config that does
2401 // not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
2402 void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
2403 gpr_free(g_fallback_bootstrap_config);
2404 g_fallback_bootstrap_config = nullptr;
2407 XdsHttpFilterRegistry::Shutdown();
2412 std::string GetBootstrapContents(const char* fallback_config,
2413 grpc_error_handle* error) {
2414 // First, try GRPC_XDS_BOOTSTRAP env var.
2415 grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
2416 if (path != nullptr) {
2417 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2419 "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2420 "environment variable: %s",
2423 grpc_slice contents;
2425 grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
2426 if (*error != GRPC_ERROR_NONE) return "";
2427 std::string contents_str(StringViewFromSlice(contents));
2428 grpc_slice_unref_internal(contents);
2429 return contents_str;
2431 // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
2432 grpc_core::UniquePtr<char> env_config(
2433 gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
2434 if (env_config != nullptr) {
2435 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2437 "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2438 "environment variable");
2440 return env_config.get();
2442 // Finally, try fallback config.
2443 if (fallback_config != nullptr) {
2444 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2445 gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
2447 return fallback_config;
2449 // No bootstrap config found.
2450 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2451 "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2458 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
2459 grpc_error_handle* error) {
2460 RefCountedPtr<XdsClient> xds_client;
2461 // If getting bootstrap from channel args, create a local XdsClient
2462 // instance for the channel or server instead of using the global instance.
2463 const char* bootstrap_config = grpc_channel_args_find_string(
2464 args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
2465 if (bootstrap_config != nullptr) {
2466 std::unique_ptr<XdsBootstrap> bootstrap =
2467 XdsBootstrap::Create(bootstrap_config, error);
2468 if (*error == GRPC_ERROR_NONE) {
2469 grpc_channel_args* xds_channel_args =
2470 grpc_channel_args_find_pointer<grpc_channel_args>(
2472 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
2473 return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
2477 // Otherwise, use the global instance.
2479 MutexLock lock(g_mu);
2480 if (g_xds_client != nullptr) {
2481 auto xds_client = g_xds_client->RefIfNonZero();
2482 if (xds_client != nullptr) return xds_client;
2484 // Find bootstrap contents.
2485 std::string bootstrap_contents =
2486 GetBootstrapContents(g_fallback_bootstrap_config, error);
2487 if (*error != GRPC_ERROR_NONE) return nullptr;
2488 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2489 gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
2490 bootstrap_contents.c_str());
2493 std::unique_ptr<XdsBootstrap> bootstrap =
2494 XdsBootstrap::Create(bootstrap_contents, error);
2495 if (*error != GRPC_ERROR_NONE) return nullptr;
2496 // Instantiate XdsClient.
2498 MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
2499 g_xds_client = xds_client.get();
2504 namespace internal {
2506 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2507 MutexLock lock(g_mu);
2508 g_channel_args = args;
2511 void UnsetGlobalXdsClientForTest() {
2512 MutexLock lock(g_mu);
2513 g_xds_client = nullptr;
2516 void SetXdsFallbackBootstrapConfig(const char* config) {
2517 MutexLock lock(g_mu);
2518 gpr_free(g_fallback_bootstrap_config);
2519 g_fallback_bootstrap_config = gpr_strdup(config);
2522 } // namespace internal
//
// embedding XdsClient in channel args
//

// Channel arg key under which MakeChannelArg() stores an XdsClient pointer
// and from which GetFromChannelArgs() retrieves it.
#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2532 void* XdsClientArgCopy(void* p) {
2533 XdsClient* xds_client = static_cast<XdsClient*>(p);
2534 xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2538 void XdsClientArgDestroy(void* p) {
2539 XdsClient* xds_client = static_cast<XdsClient*>(p);
2540 xds_client->Unref(DEBUG_LOCATION, "channel arg");
2543 int XdsClientArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2545 const grpc_arg_pointer_vtable kXdsClientArgVtable = {
2546 XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2550 grpc_arg XdsClient::MakeChannelArg() const {
2551 return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2552 const_cast<XdsClient*>(this),
2553 &kXdsClientArgVtable);
2556 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2557 const grpc_channel_args& args) {
2558 XdsClient* xds_client =
2559 grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2560 if (xds_client == nullptr) return nullptr;
2561 return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
2564 } // namespace grpc_core
2566 // The returned bytes may contain NULL(0), so we can't use c-string.
2567 grpc_slice grpc_dump_xds_configs() {
2568 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2569 grpc_core::ExecCtx exec_ctx;
2570 grpc_error_handle error = GRPC_ERROR_NONE;
2571 auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
2572 if (error != GRPC_ERROR_NONE) {
2573 // If we isn't using xDS, just return an empty string.
2574 GRPC_ERROR_UNREF(error);
2575 return grpc_empty_slice();
2577 return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());