3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/map.h"
46 #include "src/core/lib/gprpp/memory.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/iomgr/sockaddr.h"
51 #include "src/core/lib/iomgr/sockaddr_utils.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/security/credentials/credentials.h"
54 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
55 #include "src/core/lib/slice/slice_internal.h"
56 #include "src/core/lib/slice/slice_string_helpers.h"
57 #include "src/core/lib/surface/call.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/surface/channel_init.h"
60 #include "src/core/lib/transport/static_metadata.h"
// Backoff parameters for restarting calls to the xds server. They are fed to
// the backoff options built in the RetryableCall<> constructor (the *_SECONDS
// values are multiplied by 1000 there).
62 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
63 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
64 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
65 #define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): presumably a lower bound for the load reporting interval; its
// use is not visible in this part of the file -- confirm.
66 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
// Trace flag registered under the name "xds_client" and constructed with
// `false`; it gates the GPR_INFO logging throughout this file.
70 TraceFlag grpc_xds_client_trace(false, "xds_client");
// File-level globals, all initially null.
// NOTE(review): their readers/writers are not visible in this part of the
// file; presumably g_mu guards the other two -- confirm.
74 Mutex* g_mu = nullptr;
75 const grpc_channel_args* g_channel_args = nullptr;
76 XdsClient* g_xds_client = nullptr;
81 // Internal class declarations
84 // An xds call wrapper that can restart a call upon failure. Holds a ref to
85 // the xds channel. The template parameter is the kind of wrapped xds call.
// The wrapper owns at most one call of type T at a time (calld_) plus a retry
// timer; a new T is created by StartNewCallLocked().
87 class XdsClient::ChannelState::RetryableCall
88 : public InternallyRefCounted<RetryableCall<T>> {
// Takes ownership of a ref to the channel and immediately starts a call.
90 explicit RetryableCall(RefCountedPtr<ChannelState> chand);
// Marks the wrapper as shutting down, cancels a pending retry timer and drops
// the "RetryableCall+orphaned" ref.
92 void Orphan() override;
// Invoked when the wrapped call has ended; restarts it either right away or
// via the retry timer.
94 void OnCallFinishedLocked();
// Current wrapped call; null while waiting for the retry timer.
96 T* calld() const { return calld_.get(); }
97 ChannelState* chand() const { return chand_.get(); }
99 bool IsCurrentCallOnChannel() const;
// Creates a fresh T unless shutting down; asserts that no call exists yet.
102 void StartNewCallLocked();
// Arms retry_timer_ for the next backoff attempt time unless shutting down.
103 void StartRetryTimerLocked();
// Timer closure entry point: takes the XdsClient mutex and forwards to
// OnRetryTimerLocked().
104 static void OnRetryTimer(void* arg, grpc_error* error);
105 void OnRetryTimerLocked(grpc_error* error);
107 // The wrapped xds call that talks to the xds server. It's instantiated
108 // every time we start a new call. It's null during call retry backoff.
109 OrphanablePtr<T> calld_;
110 // The owning xds channel.
111 RefCountedPtr<ChannelState> chand_;
// Retry timer state. retry_timer_callback_pending_ is true from
// grpc_timer_init() until the timer callback has run.
115 grpc_timer retry_timer_;
116 grpc_closure on_retry_timer_;
117 bool retry_timer_callback_pending_ = false;
// Set by Orphan(); prevents any further call or timer from being started.
119 bool shutting_down_ = false;
122 // Contains an ADS call to the xds server.
// State for a single ADS stream: the subscribed resources per type URL, the
// buffers/closures for the pending batch operations, and the handlers that
// apply LDS/RDS/CDS/EDS responses to the XdsClient caches.
123 class XdsClient::ChannelState::AdsCallState
124 : public InternallyRefCounted<AdsCallState> {
126 // The ctor and dtor should not be used directly.
127 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
128 ~AdsCallState() override;
// Cancels the underlying call; final cleanup is driven by the status callback.
130 void Orphan() override;
132 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
133 ChannelState* chand() const { return parent_->chand(); }
134 XdsClient* xds_client() const { return chand()->xds_client(); }
135 bool seen_response() const { return seen_response_; }
// Adds `name` to the subscriptions for `type_url`; a request is sent only if
// the resource was not already subscribed.
137 void Subscribe(const std::string& type_url, const std::string& name);
// Removes `name` from the subscriptions for `type_url`; a request is sent
// unless `delay_unsubscription` is true.
138 void Unsubscribe(const std::string& type_url, const std::string& name,
139 bool delay_unsubscription);
// True if any resource type still has at least one subscribed resource.
141 bool HasSubscribedResources() const;
// Per-resource timer. If it fires before being cancelled, every watcher of
// the resource is told that obtaining it from the xds server timed out.
144 class ResourceState : public InternallyRefCounted<ResourceState> {
146 ResourceState(const std::string& type_url, const std::string& name)
147 : type_url_(type_url), name_(name) {
148 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
149 grpc_schedule_on_exec_ctx);
152 void Orphan() override {
154 Unref(DEBUG_LOCATION, "Orphan");
// Remembers the ADS call and takes a "timer" ref for the timer callback; the
// deadline is now + the XdsClient's request_timeout_.
157 void Start(RefCountedPtr<AdsCallState> ads_calld) {
160 ads_calld_ = std::move(ads_calld);
161 Ref(DEBUG_LOCATION, "timer").release();
162 timer_pending_ = true;
165 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
// NOTE(review): the header of the function containing this cancellation
// logic is elided in this excerpt; presumably it is Finish(), which the
// Accept*Update() methods call -- confirm.
170 if (timer_pending_) {
171 grpc_timer_cancel(&timer_);
172 timer_pending_ = false;
// Timer closure entry point. Runs OnTimerLocked() under the XdsClient mutex,
// then releases the ADS call ref and the "timer" ref taken in Start().
177 static void OnTimer(void* arg, grpc_error* error) {
178 ResourceState* self = static_cast<ResourceState*>(arg);
180 MutexLock lock(&self->ads_calld_->xds_client()->mu_);
181 self->OnTimerLocked(GRPC_ERROR_REF(error));
183 self->ads_calld_.reset();
184 self->Unref(DEBUG_LOCATION, "timer");
// Consumes `error`. Only acts when the timer fired without error and was not
// cancelled in the meantime.
187 void OnTimerLocked(grpc_error* error) {
188 if (error == GRPC_ERROR_NONE && timer_pending_) {
189 timer_pending_ = false;
190 grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
192 "timeout obtaining resource {type=%s name=%s} from xds server",
195 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
196 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
197 grpc_error_string(watcher_error));
// Dispatch on the resource type; each watcher receives its own ref to the
// error.
199 if (type_url_ == XdsApi::kLdsTypeUrl) {
200 ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
201 for (const auto& p : state.watchers) {
202 p.first->OnError(GRPC_ERROR_REF(watcher_error));
204 } else if (type_url_ == XdsApi::kRdsTypeUrl) {
205 RouteConfigState& state =
206 ads_calld_->xds_client()->route_config_map_[name_];
207 for (const auto& p : state.watchers) {
208 p.first->OnError(GRPC_ERROR_REF(watcher_error));
210 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
211 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
212 for (const auto& p : state.watchers) {
213 p.first->OnError(GRPC_ERROR_REF(watcher_error));
215 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
216 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
217 for (const auto& p : state.watchers) {
218 p.first->OnError(GRPC_ERROR_REF(watcher_error));
221 GPR_UNREACHABLE_CODE(return );
// Drop the local ref to the error that was handed out to the watchers.
223 GRPC_ERROR_UNREF(watcher_error);
225 GRPC_ERROR_UNREF(error);
228 const std::string type_url_;
229 const std::string name_;
// Set in Start() and cleared at the end of OnTimer().
231 RefCountedPtr<AdsCallState> ads_calld_;
// True while the timer is armed and has neither fired nor been cancelled.
233 bool timer_pending_ = false;
235 grpc_closure timer_callback_;
// Per-type-URL bookkeeping; the destructor releases the stored error.
238 struct ResourceTypeState {
239 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
241 // Version, nonce, and error for this resource type.
244 grpc_error* error = GRPC_ERROR_NONE;
246 // Subscribed resources of this type.
247 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
248 subscribed_resources;
// Sends an ADS request for `type_url`, or records the type in
// buffered_requests_ if a send is already in flight.
251 void SendMessageLocked(const std::string& type_url);
// Apply a parsed response of the given type to the XdsClient caches and
// notify the affected watchers.
253 void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
254 void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
255 void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
256 void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
// Batch completion callbacks: static entry points paired with *Locked
// implementations.
258 static void OnRequestSent(void* arg, grpc_error* error);
259 void OnRequestSentLocked(grpc_error* error);
260 static void OnResponseReceived(void* arg, grpc_error* error);
261 bool OnResponseReceivedLocked();
262 static void OnStatusReceived(void* arg, grpc_error* error);
263 void OnStatusReceivedLocked(grpc_error* error);
265 bool IsCurrentCallOnChannel() const;
267 std::set<absl::string_view> ResourceNamesForRequest(
268 const std::string& type_url);
270 // The owning RetryableCall<>.
271 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
// sent_initial_message_ becomes true after the first request is built;
// seen_response_ is exposed through seen_response().
273 bool sent_initial_message_ = false;
274 bool seen_response_ = false;
279 // recv_initial_metadata
280 grpc_metadata_array initial_metadata_recv_;
// Payload of the in-flight send; non-null means a send is pending, which
// makes SendMessageLocked() buffer further requests.
283 grpc_byte_buffer* send_message_payload_ = nullptr;
284 grpc_closure on_request_sent_;
287 grpc_byte_buffer* recv_message_payload_ = nullptr;
288 grpc_closure on_response_received_;
290 // recv_trailing_metadata
291 grpc_metadata_array trailing_metadata_recv_;
292 grpc_status_code status_code_;
293 grpc_slice status_details_;
294 grpc_closure on_status_received_;
296 // Resource types for which requests need to be sent.
297 std::set<std::string /*type_url*/> buffered_requests_;
299 // State for each resource type.
300 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
303 // Contains an LRS call to the xds server.
// State for a single LRS stream: the buffers/closures for the pending batch
// operations, the load reporting configuration, and the Reporter that sends
// load reports.
304 class XdsClient::ChannelState::LrsCallState
305 : public InternallyRefCounted<LrsCallState> {
307 // The ctor and dtor should not be used directly.
308 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
309 ~LrsCallState() override;
311 void Orphan() override;
// NOTE(review): body not visible in this excerpt; presumably creates
// reporter_ once the preconditions for reporting are met -- confirm.
313 void MaybeStartReportingLocked();
315 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
316 ChannelState* chand() const { return parent_->chand(); }
317 XdsClient* xds_client() const { return chand()->xds_client(); }
318 bool seen_response() const { return seen_response_; }
321 // Reports client-side load stats according to a fixed interval.
322 class Reporter : public InternallyRefCounted<Reporter> {
// Initializes both closures and immediately schedules the first report.
324 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
325 : parent_(std::move(parent)), report_interval_(report_interval) {
326 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
327 grpc_schedule_on_exec_ctx);
328 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
329 grpc_schedule_on_exec_ctx);
330 ScheduleNextReportLocked();
333 void Orphan() override;
// Timer and send-completion handling: static closure entry points paired
// with *Locked implementations.
336 void ScheduleNextReportLocked();
337 static void OnNextReportTimer(void* arg, grpc_error* error);
338 bool OnNextReportTimerLocked(grpc_error* error);
339 void SendReportLocked();
340 static void OnReportDone(void* arg, grpc_error* error);
341 bool OnReportDoneLocked(grpc_error* error);
// True while this object is still the reporter installed on the parent call.
343 bool IsCurrentReporterOnCall() const {
344 return this == parent_->reporter_.get();
346 XdsClient* xds_client() const { return parent_->xds_client(); }
348 // The owning LRS call.
349 RefCountedPtr<LrsCallState> parent_;
351 // The load reporting state.
352 const grpc_millis report_interval_;
353 bool last_report_counters_were_zero_ = false;
354 bool next_report_timer_callback_pending_ = false;
355 grpc_timer next_report_timer_;
356 grpc_closure on_next_report_timer_;
357 grpc_closure on_report_done_;
// Batch completion callbacks: static entry points paired with *Locked
// implementations.
360 static void OnInitialRequestSent(void* arg, grpc_error* error);
361 void OnInitialRequestSentLocked();
362 static void OnResponseReceived(void* arg, grpc_error* error);
363 bool OnResponseReceivedLocked();
364 static void OnStatusReceived(void* arg, grpc_error* error);
365 void OnStatusReceivedLocked(grpc_error* error);
367 bool IsCurrentCallOnChannel() const;
369 // The owning RetryableCall<>.
370 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
// Exposed through seen_response().
371 bool seen_response_ = false;
376 // recv_initial_metadata
377 grpc_metadata_array initial_metadata_recv_;
380 grpc_byte_buffer* send_message_payload_ = nullptr;
381 grpc_closure on_initial_request_sent_;
384 grpc_byte_buffer* recv_message_payload_ = nullptr;
385 grpc_closure on_response_received_;
387 // recv_trailing_metadata
388 grpc_metadata_array trailing_metadata_recv_;
389 grpc_status_code status_code_;
390 grpc_slice status_details_;
391 grpc_closure on_status_received_;
393 // Load reporting state.
394 bool send_all_clusters_ = false;
395 std::set<std::string> cluster_names_; // Asked for by the LRS server.
396 grpc_millis load_reporting_interval_ = 0;
// Active reporter, if any; Reporter::IsCurrentReporterOnCall() compares
// against this pointer.
397 OrphanablePtr<Reporter> reporter_;
401 // XdsClient::ChannelState::StateWatcher
// Connectivity watcher for the xds channel. Holds a ref to the owning
// ChannelState and reports TRANSIENT_FAILURE to the XdsClient's watchers.
404 class XdsClient::ChannelState::StateWatcher
405 : public AsyncConnectivityStateWatcherInterface {
407 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408 : parent_(std::move(parent)) {}
// Runs under the XdsClient mutex. Only acts when the channel enters
// TRANSIENT_FAILURE while the ChannelState is not shutting down; every other
// state change is ignored.
411 void OnConnectivityStateChange(grpc_connectivity_state new_state,
412 const absl::Status& status) override {
413 MutexLock lock(&parent_->xds_client_->mu_);
414 if (!parent_->shutting_down_ &&
415 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416 // In TRANSIENT_FAILURE. Notify all watchers of error.
418 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419 "status_message:(%s)",
420 parent_->xds_client(), status.ToString().c_str());
421 parent_->xds_client()->NotifyOnErrorLocked(
422 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423 "xds channel in TRANSIENT_FAILURE"));
// The ChannelState being watched.
427 RefCountedPtr<ChannelState> parent_;
431 // XdsClient::ChannelState
// Constructs the channel state for `channel`, taking a weak ref to the owning
// XdsClient. Asserts that the channel is non-null and starts watching its
// connectivity state.
434 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
435 grpc_channel* channel)
436 : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
437 xds_client_(std::move(xds_client)),
439 GPR_ASSERT(channel_ != nullptr);
440 StartConnectivityWatchLocked();
// Destroys the underlying grpc_channel and releases the weak ref to the
// XdsClient, logging first when tracing is enabled.
443 XdsClient::ChannelState::~ChannelState() {
444 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
445 gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
448 grpc_channel_destroy(channel_);
449 xds_client_.reset(DEBUG_LOCATION, "ChannelState");
// Begins shutdown: flags the channel state as shutting down (which silences
// StateWatcher notifications), stops the connectivity watch and drops the
// "ChannelState+orphaned" ref.
// NOTE(review): two statements between the watch cancellation and the Unref
// are elided in this excerpt.
452 void XdsClient::ChannelState::Orphan() {
453 shutting_down_ = true;
454 CancelConnectivityWatchLocked();
457 Unref(DEBUG_LOCATION, "ChannelState+orphaned");
// Returns the ADS call currently wrapped by the retryable call; per
// RetryableCall::calld() this is null during retry backoff.
// Dereferences ads_calld_ unconditionally, so callers must ensure it is set.
460 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
462 return ads_calld_->calld();
// Returns the LRS call currently wrapped by the retryable call; per
// RetryableCall::calld() this is null during retry backoff.
// Dereferences lrs_calld_ unconditionally, so callers must ensure it is set
// (it is null before MaybeStartLrsCall() and after StopLrsCall()).
465 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
467 return lrs_calld_->calld();
470 bool XdsClient::ChannelState::HasActiveAdsCall() const {
471 return ads_calld_->calld() != nullptr;
// Starts the LRS retryable call unless one already exists. The new call holds
// a "ChannelState+lrs" ref to this channel state.
474 void XdsClient::ChannelState::MaybeStartLrsCall() {
475 if (lrs_calld_ != nullptr) return;
477 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
480 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
482 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
483 grpc_channel_element* client_channel_elem =
484 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
485 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
486 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
487 grpc_client_channel_start_connectivity_watch(
488 client_channel_elem, GRPC_CHANNEL_IDLE,
489 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
492 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
493 grpc_channel_element* client_channel_elem =
494 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
495 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
496 grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
// Subscribes to resource `name` of type `type_url` on the ADS stream,
// creating the retryable ADS call on first use.
// NOTE(review): the tail of the first `if` block is elided in this excerpt;
// given the comment inside it, it presumably returns early -- confirm.
499 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
500 const std::string& name) {
501 if (ads_calld_ == nullptr) {
502 // Start the ADS call if this is the first request.
503 ads_calld_.reset(new RetryableCall<AdsCallState>(
504 Ref(DEBUG_LOCATION, "ChannelState+ads")));
505 // Note: AdsCallState's ctor will automatically subscribe to all
506 // resources that the XdsClient already has watchers for, so we can
510 // If the ADS call is in backoff state, we don't need to do anything now
511 // because when the call is restarted it will resend all necessary requests.
512 if (ads_calld() == nullptr) return;
513 // Subscribe to this resource if the ADS call is active.
514 ads_calld()->Subscribe(type_url, name);
517 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
518 const std::string& name,
519 bool delay_unsubscription) {
520 if (ads_calld_ != nullptr) {
521 auto* calld = ads_calld_->calld();
522 if (calld != nullptr) {
523 calld->Unsubscribe(type_url, name, delay_unsubscription);
524 if (!calld->HasSubscribedResources()) ads_calld_.reset();
530 // XdsClient::ChannelState::RetryableCall<>
// Stores the channel ref, configures the backoff from the GRPC_XDS_* macros
// (max backoff converted from seconds to milliseconds), initializes the retry
// timer closure and starts the first call right away.
533 template <typename T>
534 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
535 RefCountedPtr<ChannelState> chand)
536 : chand_(std::move(chand)),
539 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
541 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
542 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
543 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
544 // Closure Initialization
545 GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
546 grpc_schedule_on_exec_ctx);
547 StartNewCallLocked();
// Shuts the wrapper down: no new call or timer will be started afterwards
// (both Start*Locked() methods check shutting_down_). A pending retry timer
// is cancelled and the "RetryableCall+orphaned" ref is dropped.
// NOTE(review): one statement after the flag assignment is elided in this
// excerpt.
550 template <typename T>
551 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
552 shutting_down_ = true;
554 if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
555 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
// Called when the wrapped call has finished. Records whether that call ever
// saw a response, then either starts a new call immediately or schedules one
// through the retry timer.
// NOTE(review): the branch condition and the statements around it are elided
// in this excerpt; per the surviving comments the immediate restart is for a
// lost connection and the timer path for a failed connect -- confirm.
558 template <typename T>
559 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
560 const bool seen_response = calld_->seen_response();
563 // If we lost connection to the xds server, reset backoff and restart the
566 StartNewCallLocked();
568 // If we failed to connect to the xds server, retry later.
569 StartRetryTimerLocked();
// Creates a new wrapped call of type T, handing it a
// "RetryableCall+start_new_call" ref to this wrapper. Does nothing when
// shutting down. Requires that the channel exists and that no call is
// currently wrapped.
573 template <typename T>
574 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
575 if (shutting_down_) return;
576 GPR_ASSERT(chand_->channel_ != nullptr);
577 GPR_ASSERT(calld_ == nullptr);
578 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
580 "[xds_client %p] Start new call from retryable call (chand: %p, "
581 "retryable call: %p)",
582 chand()->xds_client(), chand(), this);
584 calld_ = MakeOrphanable<T>(
585 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
// Arms the retry timer for the next attempt time produced by the backoff
// state. Does nothing when shutting down. A ref is released into the timer
// callback, which drops it again in OnRetryTimer().
588 template <typename T>
589 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
590 if (shutting_down_) return;
591 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
592 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
// Clamp at zero so the logged delay is never negative.
593 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
595 "[xds_client %p] Failed to connect to xds server (chand: %p) "
596 "retry timer will fire in %" PRId64 "ms.",
597 chand()->xds_client(), chand(), timeout);
599 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
600 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
601 retry_timer_callback_pending_ = true;
// Retry timer closure entry point. `arg` is the RetryableCall that armed the
// timer. Runs OnRetryTimerLocked() under the XdsClient mutex with its own ref
// to `error`, then drops the ref taken in StartRetryTimerLocked().
604 template <typename T>
605 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
606 void* arg, grpc_error* error) {
607 RetryableCall* calld = static_cast<RetryableCall*>(arg);
609 MutexLock lock(&calld->chand_->xds_client()->mu_);
610 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
612 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
// Handles the retry timer firing. Clears the pending flag and, unless the
// wrapper is shutting down or the timer reported an error (e.g. it was
// cancelled), starts a new call. Consumes `error`.
615 template <typename T>
616 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
618 retry_timer_callback_pending_ = false;
619 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
620 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
623 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
624 chand()->xds_client(), chand(), this);
626 StartNewCallLocked();
628 GRPC_ERROR_UNREF(error);
632 // XdsClient::ChannelState::AdsCallState
// Creates the ADS call on the xds channel and starts its batches:
//   1. send initial metadata,
//   2. subscribe to every resource the XdsClient already tracks (which sends
//      the corresponding requests),
//   3. receive initial metadata plus the first response message,
//   4. receive the final status.
// The method name is the v3 or v2 StreamAggregatedResources path, chosen by
// the bootstrap server's ShouldUseV3().
635 XdsClient::ChannelState::AdsCallState::AdsCallState(
636 RefCountedPtr<RetryableCall<AdsCallState>> parent)
637 : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
638 parent_(std::move(parent)) {
639 // Init the ADS call. Note that the call will progress every time there's
640 // activity in xds_client()->interested_parties_, which is comprised of
641 // the polling entities from client_channel.
642 GPR_ASSERT(xds_client() != nullptr);
643 // Create a call with the specified method name.
645 xds_client()->bootstrap_->server().ShouldUseV3()
646 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
647 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
// The call is created with an infinite deadline (GRPC_MILLIS_INF_FUTURE).
648 call_ = grpc_channel_create_pollset_set_call(
649 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
650 xds_client()->interested_parties_, method, nullptr,
651 GRPC_MILLIS_INF_FUTURE, nullptr);
652 GPR_ASSERT(call_ != nullptr);
653 // Init data associated with the call.
654 grpc_metadata_array_init(&initial_metadata_recv_);
655 grpc_metadata_array_init(&trailing_metadata_recv_);
657 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
659 "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
661 xds_client(), chand(), this, call_);
664 grpc_call_error call_error;
666 memset(ops, 0, sizeof(ops));
667 // Op: send initial metadata.
669 op->op = GRPC_OP_SEND_INITIAL_METADATA;
670 op->data.send_initial_metadata.count = 0;
// Wait-for-ready is set explicitly on the initial metadata.
671 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
672 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
673 op->reserved = nullptr;
675 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
677 GPR_ASSERT(GRPC_CALL_OK == call_error);
678 // Op: send request message.
679 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
680 grpc_schedule_on_exec_ctx);
// Re-subscribe to everything the XdsClient already has an entry for, one
// resource type at a time.
681 for (const auto& p : xds_client()->listener_map_) {
682 Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
684 for (const auto& p : xds_client()->route_config_map_) {
685 Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
687 for (const auto& p : xds_client()->cluster_map_) {
688 Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
690 for (const auto& p : xds_client()->endpoint_map_) {
691 Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
693 // Op: recv initial metadata.
695 op->op = GRPC_OP_RECV_INITIAL_METADATA;
696 op->data.recv_initial_metadata.recv_initial_metadata =
697 &initial_metadata_recv_;
699 op->reserved = nullptr;
701 // Op: recv response.
702 op->op = GRPC_OP_RECV_MESSAGE;
703 op->data.recv_message.recv_message = &recv_message_payload_;
705 op->reserved = nullptr;
// Extra ref released into the response callback.
707 Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
708 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
709 grpc_schedule_on_exec_ctx);
710 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
711 &on_response_received_);
712 GPR_ASSERT(GRPC_CALL_OK == call_error);
713 // Op: recv server status.
715 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
716 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
717 op->data.recv_status_on_client.status = &status_code_;
718 op->data.recv_status_on_client.status_details = &status_details_;
720 op->reserved = nullptr;
722 // This callback signals the end of the call, so it relies on the initial
723 // ref instead of a new ref. When it's invoked, it's the initial ref that is
725 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
726 grpc_schedule_on_exec_ctx);
727 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
728 &on_status_received_);
729 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Releases everything the call owns: both metadata arrays, the send and
// receive payload buffers, the status details slice, and finally the call
// itself, which must still be set.
732 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
733 grpc_metadata_array_destroy(&initial_metadata_recv_);
734 grpc_metadata_array_destroy(&trailing_metadata_recv_);
735 grpc_byte_buffer_destroy(send_message_payload_);
736 grpc_byte_buffer_destroy(recv_message_payload_);
737 grpc_slice_unref_internal(status_details_);
738 GPR_ASSERT(call_ != nullptr);
739 grpc_call_unref(call_);
// Cancels the underlying call. No ref is dropped here: the initial ref is
// released by the status callback once the cancellation completes.
742 void XdsClient::ChannelState::AdsCallState::Orphan() {
743 GPR_ASSERT(call_ != nullptr);
744 // If we are here because xds_client wants to cancel the call,
745 // on_status_received_ will complete the cancellation and clean up. Otherwise,
746 // we are here because xds_client has to orphan a failed call, then the
747 // following cancellation will be a no-op.
748 grpc_call_cancel_internal(call_);
750 // Note that the initial ref is held by on_status_received_. So the
751 // corresponding unref happens in on_status_received_ instead of here.
754 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
755 const std::string& type_url) {
756 // Buffer message sending if an existing message is in flight.
757 if (send_message_payload_ != nullptr) {
758 buffered_requests_.insert(type_url);
761 auto& state = state_map_[type_url];
762 grpc_slice request_payload_slice;
763 std::set<absl::string_view> resource_names =
764 ResourceNamesForRequest(type_url);
765 request_payload_slice = xds_client()->api_.CreateAdsRequest(
766 type_url, resource_names, state.version, state.nonce,
767 GRPC_ERROR_REF(state.error), !sent_initial_message_);
768 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
769 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
770 state_map_.erase(type_url);
772 sent_initial_message_ = true;
773 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
775 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
776 "error=%s resources=%s",
777 xds_client(), type_url.c_str(), state.version.c_str(),
778 state.nonce.c_str(), grpc_error_string(state.error),
779 absl::StrJoin(resource_names, " ").c_str());
781 GRPC_ERROR_UNREF(state.error);
782 state.error = GRPC_ERROR_NONE;
783 // Create message payload.
784 send_message_payload_ =
785 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
786 grpc_slice_unref_internal(request_payload_slice);
789 memset(&op, 0, sizeof(op));
790 op.op = GRPC_OP_SEND_MESSAGE;
791 op.data.send_message.send_message = send_message_payload_;
792 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
793 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
794 grpc_schedule_on_exec_ctx);
795 grpc_call_error call_error =
796 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
797 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
799 "[xds_client %p] calld=%p call_error=%d sending ADS message",
800 xds_client(), this, call_error);
801 GPR_ASSERT(GRPC_CALL_OK == call_error);
805 void XdsClient::ChannelState::AdsCallState::Subscribe(
806 const std::string& type_url, const std::string& name) {
807 auto& state = state_map_[type_url].subscribed_resources[name];
808 if (state == nullptr) {
809 state = MakeOrphanable<ResourceState>(type_url, name);
810 SendMessageLocked(type_url);
814 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
815 const std::string& type_url, const std::string& name,
816 bool delay_unsubscription) {
817 state_map_[type_url].subscribed_resources.erase(name);
818 if (!delay_unsubscription) SendMessageLocked(type_url);
// Returns true as soon as any resource type in state_map_ has a non-empty set
// of subscribed resources.
// NOTE(review): the fall-through return is elided in this excerpt; presumably
// it returns false -- confirm.
821 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
822 for (const auto& p : state_map_) {
823 if (!p.second.subscribed_resources.empty()) return true;
// Applies an LDS response to the XdsClient's listener cache.
//   1. For each listener in the update: stop its resource timer, remember the
//      RDS name it refers to, and, unless the update equals the cached
//      one, store it and notify the listener's watchers.
//   2. For each subscribed listener absent from the update that had a cached
//      value: clear the cache entry and report "does not exist".
//   3. For each subscribed RDS resource no longer referenced by any listener
//      in this update: clear its cache entry and report "does not exist".
828 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
829 XdsApi::LdsUpdateMap lds_update_map) {
830 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
832 "[xds_client %p] LDS update received containing %" PRIuPTR
834 xds_client(), lds_update_map.size());
836 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
837 std::set<std::string> rds_resource_names_seen;
838 for (auto& p : lds_update_map) {
839 const std::string& listener_name = p.first;
840 XdsApi::LdsUpdate& lds_update = p.second;
// NOTE(review): operator[] creates a null entry in subscribed_resources when
// the listener was not subscribed -- confirm this is intended.
841 auto& state = lds_state.subscribed_resources[listener_name];
842 if (state != nullptr) state->Finish();
843 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
844 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
845 xds_client(), listener_name.c_str(),
846 (!lds_update.route_config_name.empty()
847 ? lds_update.route_config_name.c_str()
849 if (lds_update.rds_update.has_value()) {
850 gpr_log(GPR_INFO, "RouteConfiguration: %s",
851 lds_update.rds_update->ToString().c_str());
854 // Record the RDS resource names seen.
855 if (!lds_update.route_config_name.empty()) {
856 rds_resource_names_seen.insert(lds_update.route_config_name);
858 // Ignore identical update.
859 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
860 if (listener_state.update.has_value() &&
861 *listener_state.update == lds_update) {
862 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
864 "[xds_client %p] LDS update for %s identical to current, "
866 xds_client(), listener_name.c_str());
870 // Update the listener state.
// lds_update is moved from here and must not be used afterwards.
871 listener_state.update = std::move(lds_update);
873 for (const auto& p : listener_state.watchers) {
874 p.first->OnListenerChanged(*listener_state.update);
877 // For any subscribed resource that is not present in the update,
878 // remove it from the cache and notify watchers that it does not exist.
879 for (const auto& p : lds_state.subscribed_resources) {
880 const std::string& listener_name = p.first;
881 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
882 ListenerState& listener_state =
883 xds_client()->listener_map_[listener_name];
884 // If the resource was newly requested but has not yet been received,
885 // we don't want to generate an error for the watchers, because this LDS
886 // response may be in reaction to an earlier request that did not yet
887 // request the new resource, so its absence from the response does not
888 // necessarily indicate that the resource does not exist.
889 // For that case, we rely on the request timeout instead.
890 if (!listener_state.update.has_value()) continue;
891 listener_state.update.reset();
892 for (const auto& p : listener_state.watchers) {
893 p.first->OnResourceDoesNotExist();
897 // For any RDS resource that is no longer referred to by any LDS
898 // resources, remove it from the cache and notify watchers that it
900 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
901 for (const auto& p : rds_state.subscribed_resources) {
902 const std::string& rds_resource_name = p.first;
903 if (rds_resource_names_seen.find(rds_resource_name) ==
904 rds_resource_names_seen.end()) {
905 RouteConfigState& route_config_state =
906 xds_client()->route_config_map_[rds_resource_name];
907 route_config_state.update.reset();
908 for (const auto& p : route_config_state.watchers) {
909 p.first->OnResourceDoesNotExist();
// Applies an RDS response to the XdsClient's route config cache. For each
// route config in the update: stop its resource timer and, unless the update
// equals the cached one, store it and notify the route config's watchers.
915 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
916 XdsApi::RdsUpdateMap rds_update_map) {
917 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
919 "[xds_client %p] RDS update received containing %" PRIuPTR
921 xds_client(), rds_update_map.size());
923 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
924 for (auto& p : rds_update_map) {
925 const std::string& route_config_name = p.first;
926 XdsApi::RdsUpdate& rds_update = p.second;
// NOTE(review): operator[] creates a null entry in subscribed_resources when
// the route config was not subscribed -- confirm this is intended.
927 auto& state = rds_state.subscribed_resources[route_config_name];
928 if (state != nullptr) state->Finish();
929 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
930 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
931 rds_update.ToString().c_str());
933 RouteConfigState& route_config_state =
934 xds_client()->route_config_map_[route_config_name];
935 // Ignore identical update.
936 if (route_config_state.update.has_value() &&
937 *route_config_state.update == rds_update) {
938 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
940 "[xds_client %p] RDS resource identical to current, ignoring",
// rds_update is moved from here and must not be used afterwards.
946 route_config_state.update = std::move(rds_update);
947 // Notify all watchers.
948 for (const auto& p : route_config_state.watchers) {
949 p.first->OnRouteConfigChanged(*route_config_state.update);
954 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
955 XdsApi::CdsUpdateMap cds_update_map) {
956 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
958 "[xds_client %p] CDS update received containing %" PRIuPTR
960 xds_client(), cds_update_map.size());
962 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
963 std::set<std::string> eds_resource_names_seen;
964 for (auto& p : cds_update_map) {
965 const char* cluster_name = p.first.c_str();
966 XdsApi::CdsUpdate& cds_update = p.second;
967 auto& state = cds_state.subscribed_resources[cluster_name];
968 if (state != nullptr) state->Finish();
969 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
971 "[xds_client %p] cluster=%s: eds_service_name=%s, "
972 "lrs_load_reporting_server_name=%s",
973 xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
974 cds_update.lrs_load_reporting_server_name.has_value()
975 ? cds_update.lrs_load_reporting_server_name.value().c_str()
978 // Record the EDS resource names seen.
979 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
981 : cds_update.eds_service_name);
982 // Ignore identical update.
983 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
984 if (cluster_state.update.has_value() &&
985 *cluster_state.update == cds_update) {
986 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
988 "[xds_client %p] CDS update identical to current, ignoring.",
993 // Update the cluster state.
994 cluster_state.update = std::move(cds_update);
995 // Notify all watchers.
996 for (const auto& p : cluster_state.watchers) {
997 p.first->OnClusterChanged(cluster_state.update.value());
1000 // For any subscribed resource that is not present in the update,
1001 // remove it from the cache and notify watchers that it does not exist.
1002 for (const auto& p : cds_state.subscribed_resources) {
1003 const std::string& cluster_name = p.first;
1004 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1005 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1006 // If the resource was newly requested but has not yet been received,
1007 // we don't want to generate an error for the watchers, because this CDS
1008 // response may be in reaction to an earlier request that did not yet
1009 // request the new resource, so its absence from the response does not
1010 // necessarily indicate that the resource does not exist.
1011 // For that case, we rely on the request timeout instead.
1012 if (!cluster_state.update.has_value()) continue;
1013 cluster_state.update.reset();
1014 for (const auto& p : cluster_state.watchers) {
1015 p.first->OnResourceDoesNotExist();
1019 // For any EDS resource that is no longer referred to by any CDS
1020 // resources, remove it from the cache and notify watchers that it
1022 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1023 for (const auto& p : eds_state.subscribed_resources) {
1024 const std::string& eds_resource_name = p.first;
1025 if (eds_resource_names_seen.find(eds_resource_name) ==
1026 eds_resource_names_seen.end()) {
1027 EndpointState& endpoint_state =
1028 xds_client()->endpoint_map_[eds_resource_name];
1029 endpoint_state.update.reset();
1030 for (const auto& p : endpoint_state.watchers) {
1031 p.first->OnResourceDoesNotExist();
1037 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1038 XdsApi::EdsUpdateMap eds_update_map) {
1039 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1041 "[xds_client %p] EDS update received containing %" PRIuPTR
1043 xds_client(), eds_update_map.size());
1045 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1046 for (auto& p : eds_update_map) {
1047 const char* eds_service_name = p.first.c_str();
1048 XdsApi::EdsUpdate& eds_update = p.second;
1049 auto& state = eds_state.subscribed_resources[eds_service_name];
1050 if (state != nullptr) state->Finish();
1051 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1052 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1053 eds_service_name, eds_update.ToString().c_str());
1055 EndpointState& endpoint_state =
1056 xds_client()->endpoint_map_[eds_service_name];
1057 // Ignore identical update.
1058 if (endpoint_state.update.has_value() &&
1059 *endpoint_state.update == eds_update) {
1060 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1062 "[xds_client %p] EDS update identical to current, ignoring.",
1067 // Update the cluster state.
1068 endpoint_state.update = std::move(eds_update);
1069 // Notify all watchers.
1070 for (const auto& p : endpoint_state.watchers) {
1071 p.first->OnEndpointChanged(endpoint_state.update.value());
1076 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1077 grpc_error* error) {
1078 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1080 MutexLock lock(&ads_calld->xds_client()->mu_);
1081 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1083 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1086 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1087 grpc_error* error) {
1088 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1089 // Clean up the sent message.
1090 grpc_byte_buffer_destroy(send_message_payload_);
1091 send_message_payload_ = nullptr;
1092 // Continue to send another pending message if any.
1093 // TODO(roth): The current code to handle buffered messages has the
1094 // advantage of sending only the most recent list of resource names for
1095 // each resource type (no matter how many times that resource type has
1096 // been requested to send while the current message sending is still
1097 // pending). But its disadvantage is that we send the requests in fixed
1098 // order of resource types. We need to fix this if we are seeing some
1099 // resource type(s) starved due to frequent requests of other resource
1101 auto it = buffered_requests_.begin();
1102 if (it != buffered_requests_.end()) {
1103 SendMessageLocked(*it);
1104 buffered_requests_.erase(it);
1107 GRPC_ERROR_UNREF(error);
1110 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1111 void* arg, grpc_error* /* error */) {
1112 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1115 MutexLock lock(&ads_calld->xds_client()->mu_);
1116 done = ads_calld->OnResponseReceivedLocked();
1118 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1121 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1122 // Empty payload means the call was cancelled.
1123 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1126 // Read the response.
1127 grpc_byte_buffer_reader bbr;
1128 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1129 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1130 grpc_byte_buffer_reader_destroy(&bbr);
1131 grpc_byte_buffer_destroy(recv_message_payload_);
1132 recv_message_payload_ = nullptr;
1133 // Parse and validate the response.
1134 XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1135 response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1136 ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1137 ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1138 ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1139 grpc_slice_unref_internal(response_slice);
1140 if (result.type_url.empty()) {
1141 // Ignore unparsable response.
1143 "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1144 xds_client(), grpc_error_string(result.parse_error));
1145 GRPC_ERROR_UNREF(result.parse_error);
1148 auto& state = state_map_[result.type_url];
1149 state.nonce = std::move(result.nonce);
1150 // NACK or ACK the response.
1151 if (result.parse_error != GRPC_ERROR_NONE) {
1152 GRPC_ERROR_UNREF(state.error);
1153 state.error = result.parse_error;
1154 // NACK unacceptable update.
1156 "[xds_client %p] ADS response invalid for resource type %s "
1157 "version %s, will NACK: nonce=%s error=%s",
1158 xds_client(), result.type_url.c_str(), result.version.c_str(),
1159 state.nonce.c_str(), grpc_error_string(result.parse_error));
1160 SendMessageLocked(result.type_url);
1162 seen_response_ = true;
1163 // Accept the ADS response according to the type_url.
1164 if (result.type_url == XdsApi::kLdsTypeUrl) {
1165 AcceptLdsUpdate(std::move(result.lds_update_map));
1166 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1167 AcceptRdsUpdate(std::move(result.rds_update_map));
1168 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1169 AcceptCdsUpdate(std::move(result.cds_update_map));
1170 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1171 AcceptEdsUpdate(std::move(result.eds_update_map));
1173 state.version = std::move(result.version);
1175 SendMessageLocked(result.type_url);
1176 // Start load reporting if needed.
1177 auto& lrs_call = chand()->lrs_calld_;
1178 if (lrs_call != nullptr) {
1179 LrsCallState* lrs_calld = lrs_call->calld();
1180 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1184 if (xds_client()->shutting_down_) return true;
1185 // Keep listening for updates.
1187 memset(&op, 0, sizeof(op));
1188 op.op = GRPC_OP_RECV_MESSAGE;
1189 op.data.recv_message.recv_message = &recv_message_payload_;
1191 op.reserved = nullptr;
1192 GPR_ASSERT(call_ != nullptr);
1193 // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1194 const grpc_call_error call_error =
1195 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1196 GPR_ASSERT(GRPC_CALL_OK == call_error);
1200 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1201 void* arg, grpc_error* error) {
1202 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1204 MutexLock lock(&ads_calld->xds_client()->mu_);
1205 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1207 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1210 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1211 grpc_error* error) {
1212 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1213 char* status_details = grpc_slice_to_c_string(status_details_);
1215 "[xds_client %p] ADS call status received. Status = %d, details "
1216 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1217 xds_client(), status_code_, status_details, chand(), this, call_,
1218 grpc_error_string(error));
1219 gpr_free(status_details);
1221 // Ignore status from a stale call.
1222 if (IsCurrentCallOnChannel()) {
1223 // Try to restart the call.
1224 parent_->OnCallFinishedLocked();
1225 // Send error to all watchers.
1226 xds_client()->NotifyOnErrorLocked(
1227 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1229 GRPC_ERROR_UNREF(error);
1232 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1233 // If the retryable ADS call is null (which only happens when the xds channel
1234 // is shutting down), all the ADS calls are stale.
1235 if (chand()->ads_calld_ == nullptr) return false;
1236 return this == chand()->ads_calld_->calld();
1239 std::set<absl::string_view>
1240 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1241 const std::string& type_url) {
1242 std::set<absl::string_view> resource_names;
1243 auto it = state_map_.find(type_url);
1244 if (it != state_map_.end()) {
1245 for (auto& p : it->second.subscribed_resources) {
1246 resource_names.insert(p.first);
1247 OrphanablePtr<ResourceState>& state = p.second;
1248 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1251 return resource_names;
1255 // XdsClient::ChannelState::LrsCallState::Reporter
1258 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1259 if (next_report_timer_callback_pending_) {
1260 grpc_timer_cancel(&next_report_timer_);
1264 void XdsClient::ChannelState::LrsCallState::Reporter::
1265 ScheduleNextReportLocked() {
1266 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1267 grpc_timer_init(&next_report_timer_, next_report_time,
1268 &on_next_report_timer_);
1269 next_report_timer_callback_pending_ = true;
1272 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1273 void* arg, grpc_error* error) {
1274 Reporter* self = static_cast<Reporter*>(arg);
1277 MutexLock lock(&self->xds_client()->mu_);
1278 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1280 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1283 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1284 grpc_error* error) {
1285 next_report_timer_callback_pending_ = false;
1286 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1287 GRPC_ERROR_UNREF(error);
1296 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1297 for (const auto& p : snapshot) {
1298 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1299 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1300 for (const auto& q : cluster_snapshot.locality_stats) {
1301 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1302 if (!locality_snapshot.IsZero()) return false;
1310 void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1311 // Construct snapshot from all reported stats.
1312 XdsApi::ClusterLoadReportMap snapshot =
1313 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1314 parent_->cluster_names_);
1315 // Skip client load report if the counters were all zero in the last
1316 // report and they are still zero in this one.
1317 const bool old_val = last_report_counters_were_zero_;
1318 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1319 if (old_val && last_report_counters_were_zero_) {
1320 ScheduleNextReportLocked();
1323 // Create a request that contains the snapshot.
1324 grpc_slice request_payload_slice =
1325 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1326 parent_->send_message_payload_ =
1327 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1328 grpc_slice_unref_internal(request_payload_slice);
1331 memset(&op, 0, sizeof(op));
1332 op.op = GRPC_OP_SEND_MESSAGE;
1333 op.data.send_message.send_message = parent_->send_message_payload_;
1334 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1335 parent_->call_, &op, 1, &on_report_done_);
1336 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1338 "[xds_client %p] calld=%p call_error=%d sending client load report",
1339 xds_client(), this, call_error);
1340 GPR_ASSERT(GRPC_CALL_OK == call_error);
1344 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1345 void* arg, grpc_error* error) {
1346 Reporter* self = static_cast<Reporter*>(arg);
1349 MutexLock lock(&self->xds_client()->mu_);
1350 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1352 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1355 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1356 grpc_error* error) {
1357 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1358 parent_->send_message_payload_ = nullptr;
1359 // If there are no more registered stats to report, cancel the call.
1360 if (xds_client()->load_report_map_.empty()) {
1361 parent_->chand()->StopLrsCall();
1362 GRPC_ERROR_UNREF(error);
1365 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1366 GRPC_ERROR_UNREF(error);
1367 // If this reporter is no longer the current one on the call, the reason
1368 // might be that it was orphaned for a new one due to config update.
1369 if (!IsCurrentReporterOnCall()) {
1370 parent_->MaybeStartReportingLocked();
1374 ScheduleNextReportLocked();
1379 // XdsClient::ChannelState::LrsCallState
1382 XdsClient::ChannelState::LrsCallState::LrsCallState(
1383 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1384 : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
1385 parent_(std::move(parent)) {
1386 // Init the LRS call. Note that the call will progress every time there's
1387 // activity in xds_client()->interested_parties_, which is comprised of
1388 // the polling entities from client_channel.
1389 GPR_ASSERT(xds_client() != nullptr);
1390 const auto& method =
1391 xds_client()->bootstrap_->server().ShouldUseV3()
1392 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1393 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1394 call_ = grpc_channel_create_pollset_set_call(
1395 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1396 xds_client()->interested_parties_, method, nullptr,
1397 GRPC_MILLIS_INF_FUTURE, nullptr);
1398 GPR_ASSERT(call_ != nullptr);
1399 // Init the request payload.
1400 grpc_slice request_payload_slice =
1401 xds_client()->api_.CreateLrsInitialRequest();
1402 send_message_payload_ =
1403 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1404 grpc_slice_unref_internal(request_payload_slice);
1405 // Init other data associated with the LRS call.
1406 grpc_metadata_array_init(&initial_metadata_recv_);
1407 grpc_metadata_array_init(&trailing_metadata_recv_);
1409 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1411 "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1413 xds_client(), chand(), this, call_);
1416 grpc_call_error call_error;
1418 memset(ops, 0, sizeof(ops));
1419 // Op: send initial metadata.
1421 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1422 op->data.send_initial_metadata.count = 0;
1423 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1424 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1425 op->reserved = nullptr;
1427 // Op: send request message.
1428 GPR_ASSERT(send_message_payload_ != nullptr);
1429 op->op = GRPC_OP_SEND_MESSAGE;
1430 op->data.send_message.send_message = send_message_payload_;
1432 op->reserved = nullptr;
1434 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1435 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1436 grpc_schedule_on_exec_ctx);
1437 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1438 &on_initial_request_sent_);
1439 GPR_ASSERT(GRPC_CALL_OK == call_error);
1440 // Op: recv initial metadata.
1442 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1443 op->data.recv_initial_metadata.recv_initial_metadata =
1444 &initial_metadata_recv_;
1446 op->reserved = nullptr;
1448 // Op: recv response.
1449 op->op = GRPC_OP_RECV_MESSAGE;
1450 op->data.recv_message.recv_message = &recv_message_payload_;
1452 op->reserved = nullptr;
1454 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1455 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1456 grpc_schedule_on_exec_ctx);
1457 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1458 &on_response_received_);
1459 GPR_ASSERT(GRPC_CALL_OK == call_error);
1460 // Op: recv server status.
1462 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1463 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1464 op->data.recv_status_on_client.status = &status_code_;
1465 op->data.recv_status_on_client.status_details = &status_details_;
1467 op->reserved = nullptr;
1469 // This callback signals the end of the call, so it relies on the initial
1470 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1472 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1473 grpc_schedule_on_exec_ctx);
1474 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1475 &on_status_received_);
1476 GPR_ASSERT(GRPC_CALL_OK == call_error);
1479 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1480 grpc_metadata_array_destroy(&initial_metadata_recv_);
1481 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1482 grpc_byte_buffer_destroy(send_message_payload_);
1483 grpc_byte_buffer_destroy(recv_message_payload_);
1484 grpc_slice_unref_internal(status_details_);
1485 GPR_ASSERT(call_ != nullptr);
1486 grpc_call_unref(call_);
1489 void XdsClient::ChannelState::LrsCallState::Orphan() {
1491 GPR_ASSERT(call_ != nullptr);
1492 // If we are here because xds_client wants to cancel the call,
1493 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1494 // we are here because xds_client has to orphan a failed call, then the
1495 // following cancellation will be a no-op.
1496 grpc_call_cancel_internal(call_);
1497 // Note that the initial ref is hold by on_status_received_. So the
1498 // corresponding unref happens in on_status_received_ instead of here.
1501 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1502 // Don't start again if already started.
1503 if (reporter_ != nullptr) return;
1504 // Don't start if the previous send_message op (of the initial request or the
1505 // last report of the previous reporter) hasn't completed.
1506 if (send_message_payload_ != nullptr) return;
1507 // Don't start if no LRS response has arrived.
1508 if (!seen_response()) return;
1509 // Don't start if the ADS call hasn't received any valid response. Note that
1510 // this must be the first channel because it is the current channel but its
1511 // ADS call hasn't seen any response.
1512 if (chand()->ads_calld_ == nullptr ||
1513 chand()->ads_calld_->calld() == nullptr ||
1514 !chand()->ads_calld_->calld()->seen_response()) {
1518 reporter_ = MakeOrphanable<Reporter>(
1519 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1522 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1523 void* arg, grpc_error* /*error*/) {
1524 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1526 MutexLock lock(&lrs_calld->xds_client()->mu_);
1527 lrs_calld->OnInitialRequestSentLocked();
1529 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1532 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1533 // Clear the send_message_payload_.
1534 grpc_byte_buffer_destroy(send_message_payload_);
1535 send_message_payload_ = nullptr;
1536 MaybeStartReportingLocked();
1539 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1540 void* arg, grpc_error* /*error*/) {
1541 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1544 MutexLock lock(&lrs_calld->xds_client()->mu_);
1545 done = lrs_calld->OnResponseReceivedLocked();
1547 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1550 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1551 // Empty payload means the call was cancelled.
1552 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1555 // Read the response.
1556 grpc_byte_buffer_reader bbr;
1557 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1558 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1559 grpc_byte_buffer_reader_destroy(&bbr);
1560 grpc_byte_buffer_destroy(recv_message_payload_);
1561 recv_message_payload_ = nullptr;
1562 // This anonymous lambda is a hack to avoid the usage of goto.
1564 // Parse the response.
1565 bool send_all_clusters = false;
1566 std::set<std::string> new_cluster_names;
1567 grpc_millis new_load_reporting_interval;
1568 grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1569 response_slice, &send_all_clusters, &new_cluster_names,
1570 &new_load_reporting_interval);
1571 if (parse_error != GRPC_ERROR_NONE) {
1573 "[xds_client %p] LRS response parsing failed. error=%s",
1574 xds_client(), grpc_error_string(parse_error));
1575 GRPC_ERROR_UNREF(parse_error);
1578 seen_response_ = true;
1579 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1582 "[xds_client %p] LRS response received, %" PRIuPTR
1583 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1585 xds_client(), new_cluster_names.size(), send_all_clusters,
1586 new_load_reporting_interval);
1588 for (const auto& name : new_cluster_names) {
1589 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1590 xds_client(), i++, name.c_str());
1593 if (new_load_reporting_interval <
1594 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1595 new_load_reporting_interval =
1596 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1597 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1599 "[xds_client %p] Increased load_report_interval to minimum "
1601 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1604 // Ignore identical update.
1605 if (send_all_clusters == send_all_clusters_ &&
1606 cluster_names_ == new_cluster_names &&
1607 load_reporting_interval_ == new_load_reporting_interval) {
1608 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1610 "[xds_client %p] Incoming LRS response identical to current, "
1616 // Stop current load reporting (if any) to adopt the new config.
1618 // Record the new config.
1619 send_all_clusters_ = send_all_clusters;
1620 cluster_names_ = std::move(new_cluster_names);
1621 load_reporting_interval_ = new_load_reporting_interval;
1622 // Try starting sending load report.
1623 MaybeStartReportingLocked();
1625 grpc_slice_unref_internal(response_slice);
1626 if (xds_client()->shutting_down_) return true;
1627 // Keep listening for LRS config updates.
1629 memset(&op, 0, sizeof(op));
1630 op.op = GRPC_OP_RECV_MESSAGE;
1631 op.data.recv_message.recv_message = &recv_message_payload_;
1633 op.reserved = nullptr;
1634 GPR_ASSERT(call_ != nullptr);
1635 // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1636 const grpc_call_error call_error =
1637 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1638 GPR_ASSERT(GRPC_CALL_OK == call_error);
1642 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1643 void* arg, grpc_error* error) {
1644 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1646 MutexLock lock(&lrs_calld->xds_client()->mu_);
1647 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1649 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1652 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1653 grpc_error* error) {
1654 GPR_ASSERT(call_ != nullptr);
1655 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1656 char* status_details = grpc_slice_to_c_string(status_details_);
1658 "[xds_client %p] LRS call status received. Status = %d, details "
1659 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1660 xds_client(), status_code_, status_details, chand(), this, call_,
1661 grpc_error_string(error));
1662 gpr_free(status_details);
1664 // Ignore status from a stale call.
1665 if (IsCurrentCallOnChannel()) {
1666 GPR_ASSERT(!xds_client()->shutting_down_);
1667 // Try to restart the call.
1668 parent_->OnCallFinishedLocked();
1670 GRPC_ERROR_UNREF(error);
1673 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1674 // If the retryable LRS call is null (which only happens when the xds channel
1675 // is shutting down), all the LRS calls are stale.
1676 if (chand()->lrs_calld_ == nullptr) return false;
1677 return this == chand()->lrs_calld_->calld();
1686 grpc_millis GetRequestTimeout() {
1687 return grpc_channel_args_find_integer(
1688 g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1689 {15000, 0, INT_MAX});
1692 grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
1693 grpc_error** error) {
1694 // Build channel args.
1695 absl::InlinedVector<grpc_arg, 2> args_to_add = {
1696 grpc_channel_arg_integer_create(
1697 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
1698 5 * 60 * GPR_MS_PER_SEC),
1699 grpc_channel_arg_integer_create(
1700 const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1702 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
1703 g_channel_args, args_to_add.data(), args_to_add.size());
1704 // Find credentials and create channel.
1705 RefCountedPtr<grpc_channel_credentials> creds;
1706 for (const auto& channel_creds : bootstrap.server().channel_creds) {
1707 if (channel_creds.type == "google_default") {
1708 creds.reset(grpc_google_default_credentials_create(nullptr));
1711 if (channel_creds.type == "insecure") {
1712 grpc_channel* channel = grpc_insecure_channel_create(
1713 bootstrap.server().server_uri.c_str(), new_args, nullptr);
1714 grpc_channel_args_destroy(new_args);
1717 if (channel_creds.type == "fake") {
1718 creds.reset(grpc_fake_transport_security_credentials_create());
1722 if (creds == nullptr) {
1723 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1724 "no supported credential types found");
1727 grpc_channel* channel = grpc_secure_channel_create(
1728 creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr);
1729 grpc_channel_args_destroy(new_args);
1735 XdsClient::XdsClient(grpc_error** error)
1736 : DualRefCounted<XdsClient>(&grpc_xds_client_trace),
1737 request_timeout_(GetRequestTimeout()),
1738 interested_parties_(grpc_pollset_set_create()),
1740 XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1741 api_(this, &grpc_xds_client_trace, bootstrap_.get()) {
1742 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1743 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1745 if (*error != GRPC_ERROR_NONE) {
1746 gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1747 this, grpc_error_string(*error));
1750 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1751 gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
1752 bootstrap_->server().server_uri.c_str());
1754 grpc_channel* channel = CreateXdsChannel(*bootstrap_, error);
1755 if (*error != GRPC_ERROR_NONE) {
1756 gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
1757 grpc_error_string(*error));
1760 // Create ChannelState object.
1761 chand_ = MakeOrphanable<ChannelState>(
1762 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
1765 XdsClient::~XdsClient() {
1766 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1767 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1769 grpc_pollset_set_destroy(interested_parties_);
1772 void XdsClient::AddChannelzLinkage(
1773 channelz::ChannelNode* parent_channelz_node) {
1774 channelz::ChannelNode* xds_channelz_node =
1775 grpc_channel_get_channelz_node(chand_->channel());
1776 if (xds_channelz_node != nullptr) {
1777 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1781 void XdsClient::RemoveChannelzLinkage(
1782 channelz::ChannelNode* parent_channelz_node) {
1783 channelz::ChannelNode* xds_channelz_node =
1784 grpc_channel_get_channelz_node(chand_->channel());
1785 if (xds_channelz_node != nullptr) {
1786 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1790 void XdsClient::Orphan() {
1791 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1792 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1795 MutexLock lock(g_mu);
1796 if (g_xds_client == this) g_xds_client = nullptr;
1799 MutexLock lock(&mu_);
1800 shutting_down_ = true;
1801 // Orphan ChannelState object.
1803 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1804 // created by the XdsResolver because the maps contain refs for watchers
1805 // which in turn hold refs to the loadbalancing policies. At this point, it
1806 // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1807 // policies before those calls are done would lead to issues such as
1808 // https://github.com/grpc/grpc/issues/20928.
1809 if (!listener_map_.empty()) {
1810 cluster_map_.clear();
1811 endpoint_map_.clear();
1816 void XdsClient::WatchListenerData(
1817 absl::string_view listener_name,
1818 std::unique_ptr<ListenerWatcherInterface> watcher) {
1819 std::string listener_name_str = std::string(listener_name);
1820 MutexLock lock(&mu_);
1821 ListenerState& listener_state = listener_map_[listener_name_str];
1822 ListenerWatcherInterface* w = watcher.get();
1823 listener_state.watchers[w] = std::move(watcher);
1824 // If we've already received an LDS update, notify the new watcher
1826 if (listener_state.update.has_value()) {
1827 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1828 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1829 this, listener_name_str.c_str());
1831 w->OnListenerChanged(*listener_state.update);
1833 chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1836 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1837 ListenerWatcherInterface* watcher,
1838 bool delay_unsubscription) {
1839 MutexLock lock(&mu_);
1840 if (shutting_down_) return;
1841 std::string listener_name_str = std::string(listener_name);
1842 ListenerState& listener_state = listener_map_[listener_name_str];
1843 auto it = listener_state.watchers.find(watcher);
1844 if (it != listener_state.watchers.end()) {
1845 listener_state.watchers.erase(it);
1846 if (listener_state.watchers.empty()) {
1847 listener_map_.erase(listener_name_str);
1848 chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1849 delay_unsubscription);
// Registers `watcher` (ownership is transferred) for the RDS resource
// `route_config_name` and subscribes to that resource on the xDS channel.
// If a route config update is already cached, the watcher is notified
// before this call returns.
1854 void XdsClient::WatchRouteConfigData(
1855 absl::string_view route_config_name,
1856 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1857 std::string route_config_name_str = std::string(route_config_name);
1858 MutexLock lock(&mu_);
// operator[] creates the per-resource state on first use.
1859 RouteConfigState& route_config_state =
1860 route_config_map_[route_config_name_str];
// Take the raw pointer before moving: it is the map key and is also used
// for the callback below, after ownership has moved into the map.
1861 RouteConfigWatcherInterface* w = watcher.get();
1862 route_config_state.watchers[w] = std::move(watcher);
1863 // If we've already received an RDS update, notify the new watcher
// immediately.
1865 if (route_config_state.update.has_value()) {
1866 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
// NOTE(review): the call that takes this format string is not visible in
// this listing; presumably gpr_log(GPR_INFO, ...) as in the sibling
// Watch*Data() methods -- confirm.
1868 "[xds_client %p] returning cached route config data for %s", this,
1869 route_config_name_str.c_str());
1871 w->OnRouteConfigChanged(*route_config_state.update);
1873 chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1876 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1877 RouteConfigWatcherInterface* watcher,
1878 bool delay_unsubscription) {
1879 MutexLock lock(&mu_);
1880 if (shutting_down_) return;
1881 std::string route_config_name_str = std::string(route_config_name);
1882 RouteConfigState& route_config_state =
1883 route_config_map_[route_config_name_str];
1884 auto it = route_config_state.watchers.find(watcher);
1885 if (it != route_config_state.watchers.end()) {
1886 route_config_state.watchers.erase(it);
1887 if (route_config_state.watchers.empty()) {
1888 route_config_map_.erase(route_config_name_str);
1889 chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1890 delay_unsubscription);
1895 void XdsClient::WatchClusterData(
1896 absl::string_view cluster_name,
1897 std::unique_ptr<ClusterWatcherInterface> watcher) {
1898 std::string cluster_name_str = std::string(cluster_name);
1899 MutexLock lock(&mu_);
1900 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1901 ClusterWatcherInterface* w = watcher.get();
1902 cluster_state.watchers[w] = std::move(watcher);
1903 // If we've already received a CDS update, notify the new watcher
1905 if (cluster_state.update.has_value()) {
1906 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1907 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1908 this, cluster_name_str.c_str());
1910 w->OnClusterChanged(cluster_state.update.value());
1912 chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1915 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1916 ClusterWatcherInterface* watcher,
1917 bool delay_unsubscription) {
1918 MutexLock lock(&mu_);
1919 if (shutting_down_) return;
1920 std::string cluster_name_str = std::string(cluster_name);
1921 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1922 auto it = cluster_state.watchers.find(watcher);
1923 if (it != cluster_state.watchers.end()) {
1924 cluster_state.watchers.erase(it);
1925 if (cluster_state.watchers.empty()) {
1926 cluster_map_.erase(cluster_name_str);
1927 chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1928 delay_unsubscription);
1933 void XdsClient::WatchEndpointData(
1934 absl::string_view eds_service_name,
1935 std::unique_ptr<EndpointWatcherInterface> watcher) {
1936 std::string eds_service_name_str = std::string(eds_service_name);
1937 MutexLock lock(&mu_);
1938 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1939 EndpointWatcherInterface* w = watcher.get();
1940 endpoint_state.watchers[w] = std::move(watcher);
1941 // If we've already received an EDS update, notify the new watcher
1943 if (endpoint_state.update.has_value()) {
1944 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1945 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1946 this, eds_service_name_str.c_str());
1948 w->OnEndpointChanged(endpoint_state.update.value());
1950 chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1953 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1954 EndpointWatcherInterface* watcher,
1955 bool delay_unsubscription) {
1956 MutexLock lock(&mu_);
1957 if (shutting_down_) return;
1958 std::string eds_service_name_str = std::string(eds_service_name);
1959 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1960 auto it = endpoint_state.watchers.find(watcher);
1961 if (it != endpoint_state.watchers.end()) {
1962 endpoint_state.watchers.erase(it);
1963 if (endpoint_state.watchers.empty()) {
1964 endpoint_map_.erase(eds_service_name_str);
1965 chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1966 delay_unsubscription);
// Creates a drop-stats object for the (cluster_name, eds_service_name) pair,
// records a raw pointer to it in the matching load_report_map_ entry
// (emplace creates the entry if it does not exist yet), calls
// MaybeStartLrsCall() on the channel, and returns the new object.
1971 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1972 absl::string_view lrs_server, absl::string_view cluster_name,
1973 absl::string_view eds_service_name) {
1974 // TODO(roth): When we add support for direct federation, use the
1975 // server name specified in lrs_server.
// The map key is the (cluster name, EDS service name) pair, copied into
// owned strings.
1977 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1978 MutexLock lock(&mu_);
1979 // We jump through some hoops here to make sure that the absl::string_views
1980 // stored in the XdsClusterDropStats object point to the strings
1981 // in the load_report_map_ key, so that they have the same lifetime.
1982 auto it = load_report_map_
1983 .emplace(std::make_pair(std::move(key), LoadReportState()))
// NOTE(review): lrs_server is forwarded as-is; unlike the two names below it
// is not re-pointed at storage owned by load_report_map_ -- confirm the
// stats object does not keep a view that outlives the caller's string.
1985 auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1986 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1987 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
// Only a raw pointer is stored in the set; the returned RefCountedPtr is the
// reference created here.
1988 it->second.drop_stats.insert(cluster_drop_stats.get());
1989 chand_->MaybeStartLrsCall();
1990 return cluster_drop_stats;
1993 void XdsClient::RemoveClusterDropStats(
1994 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1995 absl::string_view eds_service_name,
1996 XdsClusterDropStats* cluster_drop_stats) {
1997 MutexLock lock(&mu_);
1998 auto load_report_it = load_report_map_.find(
1999 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2000 if (load_report_it == load_report_map_.end()) return;
2001 LoadReportState& load_report_state = load_report_it->second;
2002 // TODO(roth): When we add support for direct federation, use the
2003 // server name specified in lrs_server.
2004 auto it = load_report_state.drop_stats.find(cluster_drop_stats);
2005 if (it != load_report_state.drop_stats.end()) {
2006 // Record final drop stats in deleted_drop_stats, which will be
2007 // added to the next load report.
2008 auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
2009 load_report_state.deleted_drop_stats += dropped_requests;
2010 load_report_state.drop_stats.erase(it);
// Creates a locality-stats object for `locality` within the
// (cluster_name, eds_service_name) pair, records a raw pointer to it in the
// matching load_report_map_ entry (emplace creates the entry if it does not
// exist yet), calls MaybeStartLrsCall() on the channel, and returns the new
// object.
2014 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2015 absl::string_view lrs_server, absl::string_view cluster_name,
2016 absl::string_view eds_service_name,
2017 RefCountedPtr<XdsLocalityName> locality) {
2018 // TODO(roth): When we add support for direct federation, use the
2019 // server name specified in lrs_server.
// The map key is the (cluster name, EDS service name) pair, copied into
// owned strings.
2021 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2022 MutexLock lock(&mu_);
2023 // We jump through some hoops here to make sure that the absl::string_views
2024 // stored in the XdsClusterLocalityStats object point to the strings
2025 // in the load_report_map_ key, so that they have the same lifetime.
2026 auto it = load_report_map_
2027 .emplace(std::make_pair(std::move(key), LoadReportState()))
// NOTE(review): lrs_server is forwarded as-is; unlike the two names below it
// is not re-pointed at storage owned by load_report_map_ -- confirm the
// stats object does not keep a view that outlives the caller's string.
2029 auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2030 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2031 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
// `locality` is moved from here, so any earlier use of it must not have
// moved it already.  Only a raw pointer to the stats object is stored.
2033 it->second.locality_stats[std::move(locality)].locality_stats.insert(
2034 cluster_locality_stats.get());
2035 chand_->MaybeStartLrsCall();
2036 return cluster_locality_stats;
2039 void XdsClient::RemoveClusterLocalityStats(
2040 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2041 absl::string_view eds_service_name,
2042 const RefCountedPtr<XdsLocalityName>& locality,
2043 XdsClusterLocalityStats* cluster_locality_stats) {
2044 MutexLock lock(&mu_);
2045 auto load_report_it = load_report_map_.find(
2046 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2047 if (load_report_it == load_report_map_.end()) return;
2048 LoadReportState& load_report_state = load_report_it->second;
2049 // TODO(roth): When we add support for direct federation, use the
2050 // server name specified in lrs_server.
2051 auto locality_it = load_report_state.locality_stats.find(locality);
2052 if (locality_it == load_report_state.locality_stats.end()) return;
2053 auto& locality_set = locality_it->second.locality_stats;
2054 auto it = locality_set.find(cluster_locality_stats);
2055 if (it != locality_set.end()) {
2056 // Record final snapshot in deleted_locality_stats, which will be
2057 // added to the next load report.
2058 locality_it->second.deleted_locality_stats.emplace_back(
2059 cluster_locality_stats->GetSnapshotAndReset());
2060 locality_set.erase(it);
2064 void XdsClient::ResetBackoff() {
2065 MutexLock lock(&mu_);
2066 if (chand_ != nullptr) {
2067 grpc_channel_reset_connect_backoff(chand_->channel());
2071 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2072 for (const auto& p : listener_map_) {
2073 const ListenerState& listener_state = p.second;
2074 for (const auto& p : listener_state.watchers) {
2075 p.first->OnError(GRPC_ERROR_REF(error));
2078 for (const auto& p : route_config_map_) {
2079 const RouteConfigState& route_config_state = p.second;
2080 for (const auto& p : route_config_state.watchers) {
2081 p.first->OnError(GRPC_ERROR_REF(error));
2084 for (const auto& p : cluster_map_) {
2085 const ClusterState& cluster_state = p.second;
2086 for (const auto& p : cluster_state.watchers) {
2087 p.first->OnError(GRPC_ERROR_REF(error));
2090 for (const auto& p : endpoint_map_) {
2091 const EndpointState& endpoint_state = p.second;
2092 for (const auto& p : endpoint_state.watchers) {
2093 p.first->OnError(GRPC_ERROR_REF(error));
2096 GRPC_ERROR_UNREF(error);
// Builds a load report snapshot from load_report_map_ and returns it, keyed
// by (cluster name, EDS service name).
//
// For each entry this drains (snapshot-and-reset) every live drop-stats and
// locality-stats object, folds in the final snapshots of stats objects that
// were removed since the previous report, and erases locality entries and
// whole map entries that no longer have live stats objects.
//
// send_all_clusters: when true, stats are recorded for every cluster.
// clusters: cluster names to record stats for when send_all_clusters is
//   false.
2099 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2100 bool send_all_clusters, const std::set<std::string>& clusters) {
2101 XdsApi::ClusterLoadReportMap snapshot_map;
// No increment in the for header: entries may be erased while iterating, and
// erase() supplies the next iterator in that case.
2102 for (auto load_report_it = load_report_map_.begin();
2103 load_report_it != load_report_map_.end();) {
2104 // Cluster key is cluster and EDS service name.
2105 const auto& cluster_key = load_report_it->first;
2106 LoadReportState& load_report = load_report_it->second;
2107 // If the CDS response for a cluster indicates to use LRS but the
2108 // LRS server does not say that it wants reports for this cluster,
2109 // then we'll have stats objects here whose data we're not going to
2110 // include in the load report. However, we still need to clear out
2111 // the data from the stats objects, so that if the LRS server starts
2112 // asking for the data in the future, we don't incorrectly include
2113 // data from previous reporting intervals in that future report.
// NOTE(review): the statement that consumes record_stats is not visible in
// this listing.
2114 const bool record_stats =
2115 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2116 XdsApi::ClusterLoadReport snapshot;
2117 // Aggregate drop stats.
// NOTE(review): deleted_drop_stats is moved from here and later accumulated
// into again by RemoveClusterDropStats(); this relies on the moved-from
// object being empty -- confirm for its type.
2118 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2119 for (auto& drop_stats : load_report.drop_stats) {
2120 auto dropped_requests = drop_stats->GetSnapshotAndReset();
2121 snapshot.dropped_requests += dropped_requests;
2123 // Aggregate locality stats.
2124 for (auto it = load_report.locality_stats.begin();
2125 it != load_report.locality_stats.end();) {
2126 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2127 auto& locality_state = it->second;
// operator[] creates the per-locality snapshot on first access.
2128 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2129 snapshot.locality_stats[locality_name];
2130 for (auto& locality_stats : locality_state.locality_stats) {
2131 locality_snapshot += locality_stats->GetSnapshotAndReset();
2133 // Add final snapshots from recently deleted locality stats objects.
2134 for (auto& deleted_locality_stats :
2135 locality_state.deleted_locality_stats) {
2136 locality_snapshot += deleted_locality_stats;
2138 locality_state.deleted_locality_stats.clear();
2139 // If the only thing left in this entry was final snapshots from
2140 // deleted locality stats objects, remove the entry.
2141 if (locality_state.locality_stats.empty()) {
2142 it = load_report.locality_stats.erase(it);
2148 // Compute load report interval.
// The interval is the time elapsed since this entry's previous report.
2149 const grpc_millis now = ExecCtx::Get()->Now();
2150 snapshot.load_report_interval = now - load_report.last_report_time;
2151 load_report.last_report_time = now;
2153 snapshot_map[cluster_key] = std::move(snapshot);
2155 // If the only thing left in this entry was final snapshots from
2156 // deleted stats objects, remove the entry.
2157 if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
2158 load_report_it = load_report_map_.erase(load_report_it);
2163 return snapshot_map;
2167 // accessors for global state
2170 void XdsClientGlobalInit() { g_mu = new Mutex; }
// NOTE(review): the body of this function is not visible in this listing;
// presumably it releases the g_mu allocated by XdsClientGlobalInit() --
// confirm.
2172 void XdsClientGlobalShutdown() {
// Returns a ref to the process-wide XdsClient, creating a new one when no
// usable instance is registered.  All accesses to g_xds_client happen under
// g_mu.
//
// error: forwarded to the XdsClient constructor when a new client is
//   created.
2177 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2178 MutexLock lock(g_mu);
2179 if (g_xds_client != nullptr) {
// RefIfNonZero() may return null (presumably when the existing client's
// refcount has already dropped to zero); in that case fall through and
// create a fresh client.
2180 auto xds_client = g_xds_client->RefIfNonZero();
2181 if (xds_client != nullptr) return xds_client;
// NOTE(review): the new client is published to g_xds_client without looking
// at what the constructor reported through `error` -- confirm that is
// intended.
2183 auto xds_client = MakeRefCounted<XdsClient>(error);
// g_xds_client is a raw, non-owning pointer; Orphan() clears it.
2184 g_xds_client = xds_client.get();
2188 namespace internal {
2190 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2191 MutexLock lock(g_mu);
2192 g_channel_args = args;
2195 void UnsetGlobalXdsClientForTest() {
2196 MutexLock lock(g_mu);
2197 g_xds_client = nullptr;
2200 } // namespace internal
2202 } // namespace grpc_core