3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/map.h"
46 #include "src/core/lib/gprpp/memory.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/iomgr/sockaddr.h"
51 #include "src/core/lib/iomgr/sockaddr_utils.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/channel_init.h"
58 #include "src/core/lib/transport/static_metadata.h"
// Backoff parameters for restarting an xds call. They are consumed by the
// RetryableCall<> constructor when it configures its backoff state.
60 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
61 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
62 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
63 #define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): presumably a lower bound for the LRS reporting interval; its
// use is not visible in this part of the file -- confirm.
64 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
// Trace flag named "xds_client" (constructed with `false`); guards the
// gpr_log() tracing throughout this file.
68 TraceFlag grpc_xds_client_trace(false, "xds_client");
// File-level state. g_channel_args is the base arg set copied by
// CreateXdsChannel(). NOTE(review): where g_mu and g_xds_client are used is
// not visible in this part of the file.
72 Mutex* g_mu = nullptr;
73 const grpc_channel_args* g_channel_args = nullptr;
74 XdsClient* g_xds_client = nullptr;
79 // Internal class declarations
82 // An xds call wrapper that can restart a call upon failure. Holds a ref to
83 // the xds channel. The template parameter is the kind of wrapped xds call.
// T is constructed from a ref to this wrapper (see StartNewCallLocked()) and
// must expose seen_response(), which OnCallFinishedLocked() reads.
85 class XdsClient::ChannelState::RetryableCall
86 : public InternallyRefCounted<RetryableCall<T>> {
// Takes a ref to the owning channel; the constructor also starts the first
// call via StartNewCallLocked().
88 explicit RetryableCall(RefCountedPtr<ChannelState> chand);
// Marks the wrapper as shutting down, cancels a pending retry timer and drops
// a ref on the wrapper.
90 void Orphan() override;
// Invoked when the wrapped call has finished; leads either to an immediate
// new call or to the retry timer being armed.
92 void OnCallFinishedLocked();
// Non-owning accessors.
94 T* calld() const { return calld_.get(); }
95 ChannelState* chand() const { return chand_.get(); }
97 bool IsCurrentCallOnChannel() const;
// Creates a fresh T; does nothing once shutting_down_ is set.
100 void StartNewCallLocked();
// Arms retry_timer_ for the next attempt time reported by the backoff state.
101 void StartRetryTimerLocked();
// Timer callback: takes the XdsClient mutex, forwards to OnRetryTimerLocked()
// and then releases the ref taken when the timer was armed.
102 static void OnRetryTimer(void* arg, grpc_error* error);
103 void OnRetryTimerLocked(grpc_error* error);
105 // The wrapped xds call that talks to the xds server. It's instantiated
106 // every time we start a new call. It's null during call retry backoff.
107 OrphanablePtr<T> calld_;
108 // The owning xds channel.
109 RefCountedPtr<ChannelState> chand_;
// Retry timer state. retry_timer_callback_pending_ is set when the timer is
// armed and cleared when OnRetryTimerLocked() runs.
113 grpc_timer retry_timer_;
114 grpc_closure on_retry_timer_;
115 bool retry_timer_callback_pending_ = false;
// Set by Orphan(); prevents new calls and new retry timers from starting.
117 bool shutting_down_ = false;
120 // Contains an ADS call to the xds server.
// Tracks, per resource type, the set of subscribed resource names and a
// per-resource timer that reports a timeout to the watchers.
121 class XdsClient::ChannelState::AdsCallState
122 : public InternallyRefCounted<AdsCallState> {
124 // The ctor and dtor should not be used directly.
125 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
126 ~AdsCallState() override;
// Cancels the underlying call.
128 void Orphan() override;
// Non-owning accessors that walk up the ownership chain
// (call -> retryable wrapper -> channel -> XdsClient).
130 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
131 ChannelState* chand() const { return parent_->chand(); }
132 XdsClient* xds_client() const { return chand()->xds_client(); }
133 bool seen_response() const { return seen_response_; }
// Adds a resource to the subscription set of its type and sends a request
// if the resource was not subscribed before.
135 void Subscribe(const std::string& type_url, const std::string& name);
// Removes a resource from the subscription set; a request is sent right away
// unless delay_unsubscription is true.
136 void Unsubscribe(const std::string& type_url, const std::string& name,
137 bool delay_unsubscription);
// True if at least one resource type still has entries in its subscription map.
139 bool HasSubscribedResources() const;
// State for one subscribed resource. Owns the timer that reports a timeout
// error to the resource's watchers.
142 class ResourceState : public InternallyRefCounted<ResourceState> {
// sent_initial_request: when true, Start() is a no-op for this resource.
144 ResourceState(const std::string& type_url, const std::string& name,
145 bool sent_initial_request)
146 : type_url_(type_url),
148 sent_initial_request_(sent_initial_request) {
149 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
150 grpc_schedule_on_exec_ctx);
// Drops a ref on this ResourceState.
153 void Orphan() override {
155 Unref(DEBUG_LOCATION, "Orphan");
// Arms the timeout timer once per resource: records the call, takes a ref
// for the timer callback and marks the timer as pending. The deadline is
// now + XdsClient::request_timeout_.
158 void Start(RefCountedPtr<AdsCallState> ads_calld) {
159 if (sent_initial_request_) return;
160 sent_initial_request_ = true;
161 ads_calld_ = std::move(ads_calld);
162 Ref(DEBUG_LOCATION, "timer").release();
163 timer_pending_ = true;
166 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
// Cancels the timer if it is still pending.
// NOTE(review): the enclosing function header is not visible in this view;
// presumably this is the body of Finish(), which the Accept*Update() methods
// call -- confirm.
171 if (timer_pending_) {
172 grpc_timer_cancel(&timer_);
173 timer_pending_ = false;
// Timer callback: runs OnTimerLocked() under the XdsClient mutex, then drops
// the ref to the ADS call and the ref taken in Start().
178 static void OnTimer(void* arg, grpc_error* error) {
179 ResourceState* self = static_cast<ResourceState*>(arg);
181 MutexLock lock(&self->ads_calld_->xds_client()->mu_);
182 self->OnTimerLocked(GRPC_ERROR_REF(error));
184 self->ads_calld_.reset();
185 self->Unref(DEBUG_LOCATION, "timer");
// Takes ownership of `error`. Only when the timer fired without error and is
// still marked pending, builds a timeout error and passes a new ref of it to
// OnError() of every watcher registered for this resource in the map that
// matches type_url_.
188 void OnTimerLocked(grpc_error* error) {
189 if (error == GRPC_ERROR_NONE && timer_pending_) {
190 timer_pending_ = false;
191 grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
193 "timeout obtaining resource {type=%s name=%s} from xds server",
196 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
197 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
198 grpc_error_string(watcher_error));
200 if (type_url_ == XdsApi::kLdsTypeUrl) {
201 ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
202 for (const auto& p : state.watchers) {
203 p.first->OnError(GRPC_ERROR_REF(watcher_error));
205 } else if (type_url_ == XdsApi::kRdsTypeUrl) {
206 RouteConfigState& state =
207 ads_calld_->xds_client()->route_config_map_[name_];
208 for (const auto& p : state.watchers) {
209 p.first->OnError(GRPC_ERROR_REF(watcher_error));
211 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
212 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
213 for (const auto& p : state.watchers) {
214 p.first->OnError(GRPC_ERROR_REF(watcher_error));
216 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
217 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
218 for (const auto& p : state.watchers) {
219 p.first->OnError(GRPC_ERROR_REF(watcher_error));
222 GPR_UNREACHABLE_CODE(return );
// Each watcher received its own ref above; release the local one.
224 GRPC_ERROR_UNREF(watcher_error);
226 GRPC_ERROR_UNREF(error);
229 const std::string type_url_;
230 const std::string name_;
// Ref to the ADS call; set in Start() and reset in OnTimer().
232 RefCountedPtr<AdsCallState> ads_calld_;
233 bool sent_initial_request_;
234 bool timer_pending_ = false;
236 grpc_closure timer_callback_;
// Per-resource-type state kept in state_map_.
239 struct ResourceTypeState {
240 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
242 // Nonce and error for this resource type.
244 grpc_error* error = GRPC_ERROR_NONE;
246 // Subscribed resources of this type.
247 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
248 subscribed_resources;
// Sends a request for type_url, or records the type in buffered_requests_ if
// a send is already in flight.
251 void SendMessageLocked(const std::string& type_url);
// Apply a validated update of the given type to the XdsClient cache and
// notify the affected watchers.
253 void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
254 void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
255 void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
256 void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
// Batch-completion callbacks; each static function is the closure entry
// point for the corresponding *Locked() method.
258 static void OnRequestSent(void* arg, grpc_error* error);
259 void OnRequestSentLocked(grpc_error* error);
260 static void OnResponseReceived(void* arg, grpc_error* error);
261 bool OnResponseReceivedLocked();
262 static void OnStatusReceived(void* arg, grpc_error* error);
263 void OnStatusReceivedLocked(grpc_error* error);
265 bool IsCurrentCallOnChannel() const;
// Resource names to list in a request for type_url.
267 std::set<absl::string_view> ResourceNamesForRequest(
268 const std::string& type_url);
270 // The owning RetryableCall<>.
271 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
// sent_initial_message_ becomes true after the first request is built.
273 bool sent_initial_message_ = false;
274 bool seen_response_ = false;
279 // recv_initial_metadata
280 grpc_metadata_array initial_metadata_recv_;
// Payload of the in-flight request; non-null means a send is in progress.
283 grpc_byte_buffer* send_message_payload_ = nullptr;
284 grpc_closure on_request_sent_;
287 grpc_byte_buffer* recv_message_payload_ = nullptr;
288 grpc_closure on_response_received_;
290 // recv_trailing_metadata
291 grpc_metadata_array trailing_metadata_recv_;
292 grpc_status_code status_code_;
293 grpc_slice status_details_;
294 grpc_closure on_status_received_;
296 // Resource types for which requests need to be sent.
297 std::set<std::string /*type_url*/> buffered_requests_;
299 // State for each resource type.
300 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
303 // Contains an LRS call to the xds server.
// Owns an optional Reporter that sends load reports on a timer.
304 class XdsClient::ChannelState::LrsCallState
305 : public InternallyRefCounted<LrsCallState> {
307 // The ctor and dtor should not be used directly.
308 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
309 ~LrsCallState() override;
311 void Orphan() override;
313 void MaybeStartReportingLocked();
// Non-owning accessors that walk up the ownership chain
// (call -> retryable wrapper -> channel -> XdsClient).
315 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
316 ChannelState* chand() const { return parent_->chand(); }
317 XdsClient* xds_client() const { return chand()->xds_client(); }
318 bool seen_response() const { return seen_response_; }
321 // Reports client-side load stats according to a fixed interval.
322 class Reporter : public InternallyRefCounted<Reporter> {
// Takes a ref to the owning LRS call, initializes both closures and
// schedules the first report immediately from the constructor.
324 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
325 : parent_(std::move(parent)), report_interval_(report_interval) {
326 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
327 grpc_schedule_on_exec_ctx);
328 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
329 grpc_schedule_on_exec_ctx);
330 ScheduleNextReportLocked();
333 void Orphan() override;
// Timer and send-completion handling; each static function is the closure
// entry point for the corresponding *Locked() method.
336 void ScheduleNextReportLocked();
337 static void OnNextReportTimer(void* arg, grpc_error* error);
338 bool OnNextReportTimerLocked(grpc_error* error);
339 bool SendReportLocked();
340 static void OnReportDone(void* arg, grpc_error* error);
341 bool OnReportDoneLocked(grpc_error* error);
// True while this object is still the reporter installed on its parent call.
343 bool IsCurrentReporterOnCall() const {
344 return this == parent_->reporter_.get();
346 XdsClient* xds_client() const { return parent_->xds_client(); }
348 // The owning LRS call.
349 RefCountedPtr<LrsCallState> parent_;
351 // The load reporting state.
352 const grpc_millis report_interval_;
353 bool last_report_counters_were_zero_ = false;
354 bool next_report_timer_callback_pending_ = false;
355 grpc_timer next_report_timer_;
356 grpc_closure on_next_report_timer_;
357 grpc_closure on_report_done_;
// Batch-completion callbacks for the LRS call itself.
360 static void OnInitialRequestSent(void* arg, grpc_error* error);
361 void OnInitialRequestSentLocked();
362 static void OnResponseReceived(void* arg, grpc_error* error);
363 bool OnResponseReceivedLocked();
364 static void OnStatusReceived(void* arg, grpc_error* error);
365 void OnStatusReceivedLocked(grpc_error* error);
367 bool IsCurrentCallOnChannel() const;
369 // The owning RetryableCall<>.
370 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
371 bool seen_response_ = false;
376 // recv_initial_metadata
377 grpc_metadata_array initial_metadata_recv_;
380 grpc_byte_buffer* send_message_payload_ = nullptr;
381 grpc_closure on_initial_request_sent_;
384 grpc_byte_buffer* recv_message_payload_ = nullptr;
385 grpc_closure on_response_received_;
387 // recv_trailing_metadata
388 grpc_metadata_array trailing_metadata_recv_;
389 grpc_status_code status_code_;
390 grpc_slice status_details_;
391 grpc_closure on_status_received_;
393 // Load reporting state.
394 bool send_all_clusters_ = false;
395 std::set<std::string> cluster_names_; // Asked for by the LRS server.
396 grpc_millis load_reporting_interval_ = 0;
// The active reporter, if any; Reporter::IsCurrentReporterOnCall() compares
// against this pointer.
397 OrphanablePtr<Reporter> reporter_;
401 // XdsClient::ChannelState::StateWatcher
// Connectivity-state watcher for the xds channel. Holds a ref to the
// ChannelState and reports TRANSIENT_FAILURE to the XdsClient as an error.
404 class XdsClient::ChannelState::StateWatcher
405 : public AsyncConnectivityStateWatcherInterface {
407 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408 : parent_(std::move(parent)) {}
// Runs under the XdsClient mutex. Only acts when the channel is not shutting
// down and the new state is TRANSIENT_FAILURE; every other transition is
// ignored.
411 void OnConnectivityStateChange(grpc_connectivity_state new_state,
412 const absl::Status& status) override {
413 MutexLock lock(&parent_->xds_client_->mu_);
414 if (!parent_->shutting_down_ &&
415 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416 // In TRANSIENT_FAILURE. Notify all watchers of error.
418 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419 "status_message:(%s)",
420 parent_->xds_client(), status.ToString().c_str());
421 parent_->xds_client()->NotifyOnErrorLocked(
422 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423 "xds channel in TRANSIENT_FAILURE"));
// The channel state this watcher reports for.
427 RefCountedPtr<ChannelState> parent_;
431 // XdsClient::ChannelState
// Creates the channel to the given xds server: copies g_channel_args, adds a
// keepalive setting and the channelz "internal channel" marker, and builds a
// secure channel with credentials from XdsChannelCredsRegistry.
436 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
437 // Build channel args.
// Keepalive time is 5 minutes, expressed in milliseconds.
438 absl::InlinedVector<grpc_arg, 2> args_to_add = {
439 grpc_channel_arg_integer_create(
440 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
441 5 * 60 * GPR_MS_PER_SEC),
442 grpc_channel_arg_integer_create(
443 const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
445 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
446 g_channel_args, args_to_add.data(), args_to_add.size());
447 // Create channel creds.
448 RefCountedPtr<grpc_channel_credentials> channel_creds =
449 XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
450 server.channel_creds_config);
452 grpc_channel* channel = grpc_secure_channel_create(
453 channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
// The merged args are owned locally and are destroyed once the channel has
// been created.
454 grpc_channel_args_destroy(new_args);
// Constructs the channel state for one xds server: stores the weak ref to the
// XdsClient, creates the channel (asserting success) and starts watching its
// connectivity state. The ref-count base is given a trace name when the
// xds_client trace flag is enabled.
460 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
461 const XdsBootstrap::XdsServer& server)
462 : InternallyRefCounted<ChannelState>(
463 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
465 xds_client_(std::move(xds_client)),
467 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
468 gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
469 xds_client_.get(), server.server_uri.c_str());
471 channel_ = CreateXdsChannel(server);
472 GPR_ASSERT(channel_ != nullptr);
473 StartConnectivityWatchLocked();
// Destroys the underlying channel and releases the weak ref to the XdsClient.
476 XdsClient::ChannelState::~ChannelState() {
477 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
478 gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
481 grpc_channel_destroy(channel_);
482 xds_client_.reset(DEBUG_LOCATION, "ChannelState");
// Begins shutdown: flags the channel as shutting down (so the state watcher
// stops reporting errors), cancels the connectivity watch and drops a ref on
// this object.
485 void XdsClient::ChannelState::Orphan() {
486 shutting_down_ = true;
487 CancelConnectivityWatchLocked();
490 Unref(DEBUG_LOCATION, "ChannelState+orphaned");
// Returns the current ADS call held by the retryable wrapper.
// Dereferences ads_calld_ unconditionally, so the wrapper must exist.
493 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
495 return ads_calld_->calld();
// Returns the current LRS call held by the retryable wrapper.
// Dereferences lrs_calld_ unconditionally, so the wrapper must exist.
498 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
500 return lrs_calld_->calld();
// True if the ADS retryable wrapper currently holds a call object.
// NOTE(review): ads_calld_ is dereferenced without a null check, although
// Subscribe() treats a null ads_calld_ as a normal state; callers must ensure
// the wrapper exists -- confirm.
503 bool XdsClient::ChannelState::HasActiveAdsCall() const {
504 return ads_calld_->calld() != nullptr;
// Starts the LRS call unless a retryable LRS wrapper already exists. The new
// wrapper takes a ref to this channel state.
507 void XdsClient::ChannelState::MaybeStartLrsCall() {
508 if (lrs_calld_ != nullptr) return;
510 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
// Drops the retryable LRS wrapper, if any.
513 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
// Registers a StateWatcher on the client-channel filter, which is asserted to
// be the last element of the channel stack. The watcher holds a ref to this
// object; the raw pointer kept in watcher_ is later used to cancel the watch,
// while ownership is handed to the client channel.
515 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
516 grpc_channel_element* client_channel_elem =
517 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
518 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
519 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
520 grpc_client_channel_start_connectivity_watch(
521 client_channel_elem, GRPC_CHANNEL_IDLE,
522 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
// Stops the connectivity watch registered by StartConnectivityWatchLocked(),
// identifying it by the stored watcher_ pointer.
525 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
526 grpc_channel_element* client_channel_elem =
527 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
528 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
529 grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
// Subscribes to resource `name` of type `type_url` on the ADS call, creating
// the retryable ADS wrapper on first use.
532 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
533 const std::string& name) {
534 if (ads_calld_ == nullptr) {
535 // Start the ADS call if this is the first request.
536 ads_calld_.reset(new RetryableCall<AdsCallState>(
537 Ref(DEBUG_LOCATION, "ChannelState+ads")));
538 // Note: AdsCallState's ctor will automatically subscribe to all
539 // resources that the XdsClient already has watchers for, so we can
543 // If the ADS call is in backoff state, we don't need to do anything now
544 // because when the call is restarted it will resend all necessary requests.
545 if (ads_calld() == nullptr) return;
546 // Subscribe to this resource if the ADS call is active.
547 ads_calld()->Subscribe(type_url, name);
// Unsubscribes from a resource on the active ADS call, if there is one. When
// no subscribed resources remain afterwards, the retryable ADS wrapper is
// dropped. Nothing happens if there is no wrapper or no active call.
550 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
551 const std::string& name,
552 bool delay_unsubscription) {
553 if (ads_calld_ != nullptr) {
554 auto* calld = ads_calld_->calld();
555 if (calld != nullptr) {
556 calld->Unsubscribe(type_url, name, delay_unsubscription);
557 if (!calld->HasSubscribedResources()) ads_calld_.reset();
563 // XdsClient::ChannelState::RetryableCall<>
// Stores the ref to the owning channel, configures the backoff state from the
// GRPC_XDS_* backoff macros, initializes the retry-timer closure and starts
// the first call right away.
566 template <typename T>
567 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
568 RefCountedPtr<ChannelState> chand)
569 : chand_(std::move(chand)),
572 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
574 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
575 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
576 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
577 // Closure Initialization
578 GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
579 grpc_schedule_on_exec_ctx);
580 StartNewCallLocked();
// Shuts the wrapper down: sets shutting_down_ so that no new call or timer is
// started, cancels the retry timer if its callback is still pending, and
// drops a ref on the wrapper.
583 template <typename T>
584 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
585 shutting_down_ = true;
587 if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
588 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
// Handles the end of the wrapped call. Captures whether the call had seen a
// response and then either restarts immediately or arms the retry timer.
// NOTE(review): the branch condition is not visible in this view; per the
// comments below, presumably seen_response selects the immediate restart --
// confirm.
591 template <typename T>
592 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
593 const bool seen_response = calld_->seen_response();
596 // If we lost connection to the xds server, reset backoff and restart the
599 StartNewCallLocked();
601 // If we failed to connect to the xds server, retry later.
602 StartRetryTimerLocked();
// Creates a new wrapped call of type T, handing it a ref to this wrapper.
// No-op when shutting down. Asserts that the channel exists and that no call
// object is currently held.
606 template <typename T>
607 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
608 if (shutting_down_) return;
609 GPR_ASSERT(chand_->channel_ != nullptr);
610 GPR_ASSERT(calld_ == nullptr);
611 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
613 "[xds_client %p] Start new call from retryable call (chand: %p, "
614 "retryable call: %p)",
615 chand()->xds_client(), chand(), this);
617 calld_ = MakeOrphanable<T>(
618 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
// Arms the retry timer for the next attempt time supplied by the backoff
// state. No-op when shutting down. A ref is taken for the timer callback and
// released in OnRetryTimer().
621 template <typename T>
622 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
623 if (shutting_down_) return;
624 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
625 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
// Clamp at zero so the logged delay is never negative.
626 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
628 "[xds_client %p] Failed to connect to xds server (chand: %p) "
629 "retry timer will fire in %" PRId64 "ms.",
630 chand()->xds_client(), chand(), timeout);
632 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
633 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
634 retry_timer_callback_pending_ = true;
// Closure entry point for the retry timer. `arg` is the RetryableCall. Runs
// OnRetryTimerLocked() under the XdsClient mutex with its own ref of `error`,
// then releases the ref taken when the timer was armed.
637 template <typename T>
638 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
639 void* arg, grpc_error* error) {
640 RetryableCall* calld = static_cast<RetryableCall*>(arg);
642 MutexLock lock(&calld->chand_->xds_client()->mu_);
643 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
645 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
// Handles retry-timer completion. Takes ownership of `error`. A new call is
// started only if the wrapper is not shutting down and the timer fired
// without error.
648 template <typename T>
649 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
651 retry_timer_callback_pending_ = false;
652 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
653 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
656 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
657 chand()->xds_client(), chand(), this);
659 StartNewCallLocked();
661 GRPC_ERROR_UNREF(error);
665 // XdsClient::ChannelState::AdsCallState
// Creates and starts the ADS call. Three batches are started on the call:
// (1) send initial metadata, (2) receive initial metadata plus the first
// response message, and (3) receive the final status. Between (1) and (2)
// the constructor subscribes to every resource already present in the
// XdsClient's listener, route-config, cluster and endpoint maps.
668 XdsClient::ChannelState::AdsCallState::AdsCallState(
669 RefCountedPtr<RetryableCall<AdsCallState>> parent)
670 : InternallyRefCounted<AdsCallState>(
671 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "AdsCallState"
673 parent_(std::move(parent)) {
674 // Init the ADS call. Note that the call will progress every time there's
675 // activity in xds_client()->interested_parties_, which is comprised of
676 // the polling entities from client_channel.
677 GPR_ASSERT(xds_client() != nullptr);
678 // Create a call with the specified method name.
// The v3 or v2 StreamAggregatedResources method is selected by the server's
// ShouldUseV3() setting.
680 chand()->server_.ShouldUseV3()
681 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
682 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
// The call is created with GRPC_MILLIS_INF_FUTURE as its deadline.
683 call_ = grpc_channel_create_pollset_set_call(
684 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
685 xds_client()->interested_parties_, method, nullptr,
686 GRPC_MILLIS_INF_FUTURE, nullptr);
687 GPR_ASSERT(call_ != nullptr);
688 // Init data associated with the call.
689 grpc_metadata_array_init(&initial_metadata_recv_);
690 grpc_metadata_array_init(&trailing_metadata_recv_);
692 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
694 "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
696 xds_client(), chand(), this, call_);
699 grpc_call_error call_error;
701 memset(ops, 0, sizeof(ops));
702 // Op: send initial metadata.
// No metadata entries are sent; the flags explicitly request wait-for-ready.
704 op->op = GRPC_OP_SEND_INITIAL_METADATA;
705 op->data.send_initial_metadata.count = 0;
706 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
707 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
708 op->reserved = nullptr;
710 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
712 GPR_ASSERT(GRPC_CALL_OK == call_error);
713 // Op: send request message.
// Each Subscribe() call below may trigger SendMessageLocked() for its type.
714 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
715 grpc_schedule_on_exec_ctx);
716 for (const auto& p : xds_client()->listener_map_) {
717 Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
719 for (const auto& p : xds_client()->route_config_map_) {
720 Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
722 for (const auto& p : xds_client()->cluster_map_) {
723 Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
725 for (const auto& p : xds_client()->endpoint_map_) {
726 Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
728 // Op: recv initial metadata.
730 op->op = GRPC_OP_RECV_INITIAL_METADATA;
731 op->data.recv_initial_metadata.recv_initial_metadata =
732 &initial_metadata_recv_;
734 op->reserved = nullptr;
736 // Op: recv response.
737 op->op = GRPC_OP_RECV_MESSAGE;
738 op->data.recv_message.recv_message = &recv_message_payload_;
740 op->reserved = nullptr;
// Take a ref for the on_response_received_ callback.
742 Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
743 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
744 grpc_schedule_on_exec_ctx);
745 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
746 &on_response_received_);
747 GPR_ASSERT(GRPC_CALL_OK == call_error);
748 // Op: recv server status.
750 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
751 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
752 op->data.recv_status_on_client.status = &status_code_;
753 op->data.recv_status_on_client.status_details = &status_details_;
755 op->reserved = nullptr;
757 // This callback signals the end of the call, so it relies on the initial
758 // ref instead of a new ref. When it's invoked, it's the initial ref that is
760 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
761 grpc_schedule_on_exec_ctx);
762 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
763 &on_status_received_);
764 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Releases everything associated with the call: both metadata arrays, the
// send and receive payload buffers, the status-details slice and finally the
// call itself (asserted non-null).
767 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
768 grpc_metadata_array_destroy(&initial_metadata_recv_);
769 grpc_metadata_array_destroy(&trailing_metadata_recv_);
770 grpc_byte_buffer_destroy(send_message_payload_);
771 grpc_byte_buffer_destroy(recv_message_payload_);
772 grpc_slice_unref_internal(status_details_);
773 GPR_ASSERT(call_ != nullptr);
774 grpc_call_unref(call_);
// Cancels the ADS call. The object itself is not unreffed here; see the note
// at the end of the function.
777 void XdsClient::ChannelState::AdsCallState::Orphan() {
778 GPR_ASSERT(call_ != nullptr);
779 // If we are here because xds_client wants to cancel the call,
780 // on_status_received_ will complete the cancellation and clean up. Otherwise,
781 // we are here because xds_client has to orphan a failed call, then the
782 // following cancellation will be a no-op.
783 grpc_call_cancel_internal(call_);
785 // Note that the initial ref is held by on_status_received_. So the
786 // corresponding unref happens in on_status_received_ instead of here.
// Builds and sends an ADS request for `type_url`. If a request is already in
// flight (send_message_payload_ is non-null), the type is recorded in
// buffered_requests_ instead. Otherwise the request carries the resource
// names from ResourceNamesForRequest(), the cached version for the type, and
// the type's nonce and error; the error is then cleared.
789 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
790 const std::string& type_url) {
791 // Buffer message sending if an existing message is in flight.
792 if (send_message_payload_ != nullptr) {
793 buffered_requests_.insert(type_url);
// Note: operator[] creates the per-type state if it does not exist yet.
796 auto& state = state_map_[type_url];
797 grpc_slice request_payload_slice;
798 std::set<absl::string_view> resource_names =
799 ResourceNamesForRequest(type_url);
// The last argument is true only for the first request built on this call.
800 request_payload_slice = xds_client()->api_.CreateAdsRequest(
801 chand()->server_, type_url, resource_names,
802 xds_client()->resource_version_map_[type_url], state.nonce,
803 GRPC_ERROR_REF(state.error), !sent_initial_message_);
// State for type URLs other than LDS/RDS/CDS/EDS is not kept.
// NOTE(review): `state` is a reference into state_map_, and it appears to be
// read and written again below (trace log, GRPC_ERROR_UNREF, reset of
// state.error) after this erase, which would be a dangling reference for
// unknown type URLs -- confirm against the full file and consider clearing
// the error before erasing.
804 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
805 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
806 state_map_.erase(type_url);
808 sent_initial_message_ = true;
809 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
811 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
812 "error=%s resources=%s",
813 xds_client(), type_url.c_str(),
814 xds_client()->resource_version_map_[type_url].c_str(),
815 state.nonce.c_str(), grpc_error_string(state.error),
816 absl::StrJoin(resource_names, " ").c_str());
// The error has been handed to the request (via its own ref); drop ours.
818 GRPC_ERROR_UNREF(state.error);
819 state.error = GRPC_ERROR_NONE;
820 // Create message payload.
821 send_message_payload_ =
822 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
823 grpc_slice_unref_internal(request_payload_slice);
826 memset(&op, 0, sizeof(op));
827 op.op = GRPC_OP_SEND_MESSAGE;
828 op.data.send_message.send_message = send_message_payload_;
// Take a ref for the on_request_sent_ callback.
829 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
830 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
831 grpc_schedule_on_exec_ctx);
832 grpc_call_error call_error =
833 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
// A failure to start the batch is logged and then treated as fatal.
834 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
836 "[xds_client %p] calld=%p call_error=%d sending ADS message",
837 xds_client(), this, call_error);
838 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Subscribes to resource `name` of type `type_url`. If the resource has no
// state yet, a ResourceState is created and a request for the type is sent.
// The new state is marked as "initial request already sent" when the
// XdsClient already has a non-empty version for this type.
842 void XdsClient::ChannelState::AdsCallState::Subscribe(
843 const std::string& type_url, const std::string& name) {
// operator[] creates the per-type entry and a null per-resource slot on
// first use.
844 auto& state = state_map_[type_url].subscribed_resources[name];
845 if (state == nullptr) {
846 state = MakeOrphanable<ResourceState>(
847 type_url, name, !xds_client()->resource_version_map_[type_url].empty());
848 SendMessageLocked(type_url);
// Removes resource `name` from the subscription set of `type_url`. Unless
// delay_unsubscription is true, a request for the type is sent immediately.
852 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
853 const std::string& type_url, const std::string& name,
854 bool delay_unsubscription) {
855 state_map_[type_url].subscribed_resources.erase(name);
856 if (!delay_unsubscription) SendMessageLocked(type_url);
// Returns true as soon as any resource type has a non-empty subscription map.
859 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
860 for (const auto& p : state_map_) {
861 if (!p.second.subscribed_resources.empty()) return true;
// Applies an LDS update in three passes:
//   1. For each listener in the update: stop its timeout timer, skip it if
//      identical to the cached value, otherwise cache it and notify watchers.
//   2. For each subscribed listener missing from the update that previously
//      had a cached value: drop the value and notify watchers that the
//      resource does not exist.
//   3. For each subscribed RDS resource no longer named by any listener in
//      this update: drop its cached value and notify its watchers likewise.
866 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
867 XdsApi::LdsUpdateMap lds_update_map) {
868 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
870 "[xds_client %p] LDS update received containing %" PRIuPTR
872 xds_client(), lds_update_map.size());
874 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
875 std::set<std::string> rds_resource_names_seen;
876 for (auto& p : lds_update_map) {
877 const std::string& listener_name = p.first;
878 XdsApi::LdsUpdate& lds_update = p.second;
// NOTE(review): operator[] inserts a null entry when listener_name is not
// currently subscribed; such entries make subscribed_resources non-empty --
// confirm this is intended.
879 auto& state = lds_state.subscribed_resources[listener_name];
880 if (state != nullptr) state->Finish();
881 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
882 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
883 xds_client(), listener_name.c_str(),
884 (!lds_update.route_config_name.empty()
885 ? lds_update.route_config_name.c_str()
887 if (lds_update.rds_update.has_value()) {
888 gpr_log(GPR_INFO, "RouteConfiguration: %s",
889 lds_update.rds_update->ToString().c_str());
892 // Record the RDS resource names seen.
893 if (!lds_update.route_config_name.empty()) {
894 rds_resource_names_seen.insert(lds_update.route_config_name);
896 // Ignore identical update.
897 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
898 if (listener_state.update.has_value() &&
899 *listener_state.update == lds_update) {
900 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
902 "[xds_client %p] LDS update for %s identical to current, "
904 xds_client(), listener_name.c_str());
908 // Update the listener state.
// lds_update is moved from here and must not be read afterwards.
909 listener_state.update = std::move(lds_update);
911 for (const auto& p : listener_state.watchers) {
912 p.first->OnListenerChanged(*listener_state.update);
915 // For any subscribed resource that is not present in the update,
916 // remove it from the cache and notify watchers that it does not exist.
917 for (const auto& p : lds_state.subscribed_resources) {
918 const std::string& listener_name = p.first;
919 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
920 ListenerState& listener_state =
921 xds_client()->listener_map_[listener_name];
922 // If the resource was newly requested but has not yet been received,
923 // we don't want to generate an error for the watchers, because this LDS
924 // response may be in reaction to an earlier request that did not yet
925 // request the new resource, so its absence from the response does not
926 // necessarily indicate that the resource does not exist.
927 // For that case, we rely on the request timeout instead.
928 if (!listener_state.update.has_value()) continue;
929 listener_state.update.reset();
930 for (const auto& p : listener_state.watchers) {
931 p.first->OnResourceDoesNotExist();
935 // For any RDS resource that is no longer referred to by any LDS
936 // resources, remove it from the cache and notify watchers that it
938 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
939 for (const auto& p : rds_state.subscribed_resources) {
940 const std::string& rds_resource_name = p.first;
941 if (rds_resource_names_seen.find(rds_resource_name) ==
942 rds_resource_names_seen.end()) {
943 RouteConfigState& route_config_state =
944 xds_client()->route_config_map_[rds_resource_name];
945 route_config_state.update.reset();
946 for (const auto& p : route_config_state.watchers) {
947 p.first->OnResourceDoesNotExist();
// Applies an RDS update: for each route configuration in the update, stops
// its timeout timer, skips it if identical to the cached value, otherwise
// caches the new value and notifies all watchers of that route configuration.
953 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
954 XdsApi::RdsUpdateMap rds_update_map) {
955 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
957 "[xds_client %p] RDS update received containing %" PRIuPTR
959 xds_client(), rds_update_map.size());
961 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
962 for (auto& p : rds_update_map) {
963 const std::string& route_config_name = p.first;
964 XdsApi::RdsUpdate& rds_update = p.second;
// NOTE(review): operator[] inserts a null entry when route_config_name is not
// currently subscribed -- confirm this is intended.
965 auto& state = rds_state.subscribed_resources[route_config_name];
966 if (state != nullptr) state->Finish();
967 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
968 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
969 rds_update.ToString().c_str());
971 RouteConfigState& route_config_state =
972 xds_client()->route_config_map_[route_config_name];
973 // Ignore identical update.
974 if (route_config_state.update.has_value() &&
975 *route_config_state.update == rds_update) {
976 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
978 "[xds_client %p] RDS resource identical to current, ignoring",
// rds_update is moved from here and must not be read afterwards.
984 route_config_state.update = std::move(rds_update);
985 // Notify all watchers.
986 for (const auto& p : route_config_state.watchers) {
987 p.first->OnRouteConfigChanged(*route_config_state.update);
992 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
993 XdsApi::CdsUpdateMap cds_update_map) {
994 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
996 "[xds_client %p] CDS update received containing %" PRIuPTR
998 xds_client(), cds_update_map.size());
1000 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1001 std::set<std::string> eds_resource_names_seen;
1002 for (auto& p : cds_update_map) {
1003 const char* cluster_name = p.first.c_str();
1004 XdsApi::CdsUpdate& cds_update = p.second;
1005 auto& state = cds_state.subscribed_resources[cluster_name];
1006 if (state != nullptr) state->Finish();
1007 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1009 "[xds_client %p] cluster=%s: eds_service_name=%s, "
1010 "lrs_load_reporting_server_name=%s",
1011 xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
1012 cds_update.lrs_load_reporting_server_name.has_value()
1013 ? cds_update.lrs_load_reporting_server_name.value().c_str()
1016 // Record the EDS resource names seen.
1017 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1019 : cds_update.eds_service_name);
1020 // Ignore identical update.
1021 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1022 if (cluster_state.update.has_value() &&
1023 *cluster_state.update == cds_update) {
1024 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1026 "[xds_client %p] CDS update identical to current, ignoring.",
1031 // Update the cluster state.
1032 cluster_state.update = std::move(cds_update);
1033 // Notify all watchers.
1034 for (const auto& p : cluster_state.watchers) {
1035 p.first->OnClusterChanged(cluster_state.update.value());
1038 // For any subscribed resource that is not present in the update,
1039 // remove it from the cache and notify watchers that it does not exist.
1040 for (const auto& p : cds_state.subscribed_resources) {
1041 const std::string& cluster_name = p.first;
1042 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1043 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1044 // If the resource was newly requested but has not yet been received,
1045 // we don't want to generate an error for the watchers, because this CDS
1046 // response may be in reaction to an earlier request that did not yet
1047 // request the new resource, so its absence from the response does not
1048 // necessarily indicate that the resource does not exist.
1049 // For that case, we rely on the request timeout instead.
1050 if (!cluster_state.update.has_value()) continue;
1051 cluster_state.update.reset();
1052 for (const auto& p : cluster_state.watchers) {
1053 p.first->OnResourceDoesNotExist();
1057 // For any EDS resource that is no longer referred to by any CDS
1058 // resources, remove it from the cache and notify watchers that it
1060 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1061 for (const auto& p : eds_state.subscribed_resources) {
1062 const std::string& eds_resource_name = p.first;
1063 if (eds_resource_names_seen.find(eds_resource_name) ==
1064 eds_resource_names_seen.end()) {
1065 EndpointState& endpoint_state =
1066 xds_client()->endpoint_map_[eds_resource_name];
1067 endpoint_state.update.reset();
1068 for (const auto& p : endpoint_state.watchers) {
1069 p.first->OnResourceDoesNotExist();
1075 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1076 XdsApi::EdsUpdateMap eds_update_map) {
1077 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1079 "[xds_client %p] EDS update received containing %" PRIuPTR
1081 xds_client(), eds_update_map.size());
1083 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1084 for (auto& p : eds_update_map) {
1085 const char* eds_service_name = p.first.c_str();
1086 XdsApi::EdsUpdate& eds_update = p.second;
1087 auto& state = eds_state.subscribed_resources[eds_service_name];
1088 if (state != nullptr) state->Finish();
1089 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1090 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1091 eds_service_name, eds_update.ToString().c_str());
1093 EndpointState& endpoint_state =
1094 xds_client()->endpoint_map_[eds_service_name];
1095 // Ignore identical update.
1096 if (endpoint_state.update.has_value() &&
1097 *endpoint_state.update == eds_update) {
1098 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1100 "[xds_client %p] EDS update identical to current, ignoring.",
1105 // Update the cluster state.
1106 endpoint_state.update = std::move(eds_update);
1107 // Notify all watchers.
1108 for (const auto& p : endpoint_state.watchers) {
1109 p.first->OnEndpointChanged(endpoint_state.update.value());
1114 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1115 grpc_error* error) {
1116 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1118 MutexLock lock(&ads_calld->xds_client()->mu_);
1119 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1121 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1124 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1125 grpc_error* error) {
1126 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1127 // Clean up the sent message.
1128 grpc_byte_buffer_destroy(send_message_payload_);
1129 send_message_payload_ = nullptr;
1130 // Continue to send another pending message if any.
1131 // TODO(roth): The current code to handle buffered messages has the
1132 // advantage of sending only the most recent list of resource names for
1133 // each resource type (no matter how many times that resource type has
1134 // been requested to send while the current message sending is still
1135 // pending). But its disadvantage is that we send the requests in fixed
1136 // order of resource types. We need to fix this if we are seeing some
1137 // resource type(s) starved due to frequent requests of other resource
1139 auto it = buffered_requests_.begin();
1140 if (it != buffered_requests_.end()) {
1141 SendMessageLocked(*it);
1142 buffered_requests_.erase(it);
1145 GRPC_ERROR_UNREF(error);
1148 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1149 void* arg, grpc_error* /* error */) {
1150 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1153 MutexLock lock(&ads_calld->xds_client()->mu_);
1154 done = ads_calld->OnResponseReceivedLocked();
1156 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1159 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1160 // Empty payload means the call was cancelled.
1161 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1164 // Read the response.
1165 grpc_byte_buffer_reader bbr;
1166 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1167 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1168 grpc_byte_buffer_reader_destroy(&bbr);
1169 grpc_byte_buffer_destroy(recv_message_payload_);
1170 recv_message_payload_ = nullptr;
1171 // Parse and validate the response.
1172 XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1173 response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1174 ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1175 ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1176 ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1177 grpc_slice_unref_internal(response_slice);
1178 if (result.type_url.empty()) {
1179 // Ignore unparsable response.
1181 "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1182 xds_client(), grpc_error_string(result.parse_error));
1183 GRPC_ERROR_UNREF(result.parse_error);
1186 auto& state = state_map_[result.type_url];
1187 state.nonce = std::move(result.nonce);
1188 // NACK or ACK the response.
1189 if (result.parse_error != GRPC_ERROR_NONE) {
1190 GRPC_ERROR_UNREF(state.error);
1191 state.error = result.parse_error;
1192 // NACK unacceptable update.
1194 "[xds_client %p] ADS response invalid for resource type %s "
1195 "version %s, will NACK: nonce=%s error=%s",
1196 xds_client(), result.type_url.c_str(), result.version.c_str(),
1197 state.nonce.c_str(), grpc_error_string(result.parse_error));
1198 SendMessageLocked(result.type_url);
1200 seen_response_ = true;
1201 // Accept the ADS response according to the type_url.
1202 if (result.type_url == XdsApi::kLdsTypeUrl) {
1203 AcceptLdsUpdate(std::move(result.lds_update_map));
1204 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1205 AcceptRdsUpdate(std::move(result.rds_update_map));
1206 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1207 AcceptCdsUpdate(std::move(result.cds_update_map));
1208 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1209 AcceptEdsUpdate(std::move(result.eds_update_map));
1211 xds_client()->resource_version_map_[result.type_url] =
1212 std::move(result.version);
1214 SendMessageLocked(result.type_url);
1215 // Start load reporting if needed.
1216 auto& lrs_call = chand()->lrs_calld_;
1217 if (lrs_call != nullptr) {
1218 LrsCallState* lrs_calld = lrs_call->calld();
1219 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1223 if (xds_client()->shutting_down_) return true;
1224 // Keep listening for updates.
1226 memset(&op, 0, sizeof(op));
1227 op.op = GRPC_OP_RECV_MESSAGE;
1228 op.data.recv_message.recv_message = &recv_message_payload_;
1230 op.reserved = nullptr;
1231 GPR_ASSERT(call_ != nullptr);
1232 // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1233 const grpc_call_error call_error =
1234 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1235 GPR_ASSERT(GRPC_CALL_OK == call_error);
1239 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1240 void* arg, grpc_error* error) {
1241 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1243 MutexLock lock(&ads_calld->xds_client()->mu_);
1244 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1246 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1249 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1250 grpc_error* error) {
1251 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1252 char* status_details = grpc_slice_to_c_string(status_details_);
1254 "[xds_client %p] ADS call status received. Status = %d, details "
1255 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1256 xds_client(), status_code_, status_details, chand(), this, call_,
1257 grpc_error_string(error));
1258 gpr_free(status_details);
1260 // Ignore status from a stale call.
1261 if (IsCurrentCallOnChannel()) {
1262 // Try to restart the call.
1263 parent_->OnCallFinishedLocked();
1264 // Send error to all watchers.
1265 xds_client()->NotifyOnErrorLocked(
1266 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1268 GRPC_ERROR_UNREF(error);
1271 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1272 // If the retryable ADS call is null (which only happens when the xds channel
1273 // is shutting down), all the ADS calls are stale.
1274 if (chand()->ads_calld_ == nullptr) return false;
1275 return this == chand()->ads_calld_->calld();
1278 std::set<absl::string_view>
1279 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1280 const std::string& type_url) {
1281 std::set<absl::string_view> resource_names;
1282 auto it = state_map_.find(type_url);
1283 if (it != state_map_.end()) {
1284 for (auto& p : it->second.subscribed_resources) {
1285 resource_names.insert(p.first);
1286 OrphanablePtr<ResourceState>& state = p.second;
1287 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1290 return resource_names;
1294 // XdsClient::ChannelState::LrsCallState::Reporter
1297 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1298 if (next_report_timer_callback_pending_) {
1299 grpc_timer_cancel(&next_report_timer_);
1303 void XdsClient::ChannelState::LrsCallState::Reporter::
1304 ScheduleNextReportLocked() {
1305 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1306 grpc_timer_init(&next_report_timer_, next_report_time,
1307 &on_next_report_timer_);
1308 next_report_timer_callback_pending_ = true;
1311 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1312 void* arg, grpc_error* error) {
1313 Reporter* self = static_cast<Reporter*>(arg);
1316 MutexLock lock(&self->xds_client()->mu_);
1317 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1319 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1322 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1323 grpc_error* error) {
1324 next_report_timer_callback_pending_ = false;
1325 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1326 GRPC_ERROR_UNREF(error);
1329 return SendReportLocked();
1334 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1335 for (const auto& p : snapshot) {
1336 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1337 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1338 for (const auto& q : cluster_snapshot.locality_stats) {
1339 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1340 if (!locality_snapshot.IsZero()) return false;
1348 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1349 // Construct snapshot from all reported stats.
1350 XdsApi::ClusterLoadReportMap snapshot =
1351 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1352 parent_->cluster_names_);
1353 // Skip client load report if the counters were all zero in the last
1354 // report and they are still zero in this one.
1355 const bool old_val = last_report_counters_were_zero_;
1356 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1357 if (old_val && last_report_counters_were_zero_) {
1358 if (xds_client()->load_report_map_.empty()) {
1359 parent_->chand()->StopLrsCall();
1362 ScheduleNextReportLocked();
1365 // Create a request that contains the snapshot.
1366 grpc_slice request_payload_slice =
1367 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1368 parent_->send_message_payload_ =
1369 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1370 grpc_slice_unref_internal(request_payload_slice);
1373 memset(&op, 0, sizeof(op));
1374 op.op = GRPC_OP_SEND_MESSAGE;
1375 op.data.send_message.send_message = parent_->send_message_payload_;
1376 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1377 parent_->call_, &op, 1, &on_report_done_);
1378 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1380 "[xds_client %p] calld=%p call_error=%d sending client load report",
1381 xds_client(), this, call_error);
1382 GPR_ASSERT(GRPC_CALL_OK == call_error);
1387 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1388 void* arg, grpc_error* error) {
1389 Reporter* self = static_cast<Reporter*>(arg);
1392 MutexLock lock(&self->xds_client()->mu_);
1393 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1395 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1398 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1399 grpc_error* error) {
1400 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1401 parent_->send_message_payload_ = nullptr;
1402 // If there are no more registered stats to report, cancel the call.
1403 if (xds_client()->load_report_map_.empty()) {
1404 parent_->chand()->StopLrsCall();
1405 GRPC_ERROR_UNREF(error);
1408 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1409 GRPC_ERROR_UNREF(error);
1410 // If this reporter is no longer the current one on the call, the reason
1411 // might be that it was orphaned for a new one due to config update.
1412 if (!IsCurrentReporterOnCall()) {
1413 parent_->MaybeStartReportingLocked();
1417 ScheduleNextReportLocked();
1422 // XdsClient::ChannelState::LrsCallState
1425 XdsClient::ChannelState::LrsCallState::LrsCallState(
1426 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1427 : InternallyRefCounted<LrsCallState>(
1428 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "LrsCallState"
1430 parent_(std::move(parent)) {
1431 // Init the LRS call. Note that the call will progress every time there's
1432 // activity in xds_client()->interested_parties_, which is comprised of
1433 // the polling entities from client_channel.
1434 GPR_ASSERT(xds_client() != nullptr);
1435 const auto& method =
1436 chand()->server_.ShouldUseV3()
1437 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1438 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1439 call_ = grpc_channel_create_pollset_set_call(
1440 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1441 xds_client()->interested_parties_, method, nullptr,
1442 GRPC_MILLIS_INF_FUTURE, nullptr);
1443 GPR_ASSERT(call_ != nullptr);
1444 // Init the request payload.
1445 grpc_slice request_payload_slice =
1446 xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1447 send_message_payload_ =
1448 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1449 grpc_slice_unref_internal(request_payload_slice);
1450 // Init other data associated with the LRS call.
1451 grpc_metadata_array_init(&initial_metadata_recv_);
1452 grpc_metadata_array_init(&trailing_metadata_recv_);
1454 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1456 "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1458 xds_client(), chand(), this, call_);
1461 grpc_call_error call_error;
1463 memset(ops, 0, sizeof(ops));
1464 // Op: send initial metadata.
1466 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1467 op->data.send_initial_metadata.count = 0;
1468 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1469 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1470 op->reserved = nullptr;
1472 // Op: send request message.
1473 GPR_ASSERT(send_message_payload_ != nullptr);
1474 op->op = GRPC_OP_SEND_MESSAGE;
1475 op->data.send_message.send_message = send_message_payload_;
1477 op->reserved = nullptr;
1479 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1480 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1481 grpc_schedule_on_exec_ctx);
1482 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1483 &on_initial_request_sent_);
1484 GPR_ASSERT(GRPC_CALL_OK == call_error);
1485 // Op: recv initial metadata.
1487 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1488 op->data.recv_initial_metadata.recv_initial_metadata =
1489 &initial_metadata_recv_;
1491 op->reserved = nullptr;
1493 // Op: recv response.
1494 op->op = GRPC_OP_RECV_MESSAGE;
1495 op->data.recv_message.recv_message = &recv_message_payload_;
1497 op->reserved = nullptr;
1499 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1500 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1501 grpc_schedule_on_exec_ctx);
1502 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1503 &on_response_received_);
1504 GPR_ASSERT(GRPC_CALL_OK == call_error);
1505 // Op: recv server status.
1507 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1508 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1509 op->data.recv_status_on_client.status = &status_code_;
1510 op->data.recv_status_on_client.status_details = &status_details_;
1512 op->reserved = nullptr;
1514 // This callback signals the end of the call, so it relies on the initial
1515 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1517 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1518 grpc_schedule_on_exec_ctx);
1519 call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1520 &on_status_received_);
1521 GPR_ASSERT(GRPC_CALL_OK == call_error);
1524 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1525 grpc_metadata_array_destroy(&initial_metadata_recv_);
1526 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1527 grpc_byte_buffer_destroy(send_message_payload_);
1528 grpc_byte_buffer_destroy(recv_message_payload_);
1529 grpc_slice_unref_internal(status_details_);
1530 GPR_ASSERT(call_ != nullptr);
1531 grpc_call_unref(call_);
1534 void XdsClient::ChannelState::LrsCallState::Orphan() {
1536 GPR_ASSERT(call_ != nullptr);
1537 // If we are here because xds_client wants to cancel the call,
1538 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1539 // we are here because xds_client has to orphan a failed call, then the
1540 // following cancellation will be a no-op.
1541 grpc_call_cancel_internal(call_);
1542 // Note that the initial ref is hold by on_status_received_. So the
1543 // corresponding unref happens in on_status_received_ instead of here.
1546 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1547 // Don't start again if already started.
1548 if (reporter_ != nullptr) return;
1549 // Don't start if the previous send_message op (of the initial request or the
1550 // last report of the previous reporter) hasn't completed.
1551 if (send_message_payload_ != nullptr) return;
1552 // Don't start if no LRS response has arrived.
1553 if (!seen_response()) return;
1554 // Don't start if the ADS call hasn't received any valid response. Note that
1555 // this must be the first channel because it is the current channel but its
1556 // ADS call hasn't seen any response.
1557 if (chand()->ads_calld_ == nullptr ||
1558 chand()->ads_calld_->calld() == nullptr ||
1559 !chand()->ads_calld_->calld()->seen_response()) {
1563 reporter_ = MakeOrphanable<Reporter>(
1564 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1567 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1568 void* arg, grpc_error* /*error*/) {
1569 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1571 MutexLock lock(&lrs_calld->xds_client()->mu_);
1572 lrs_calld->OnInitialRequestSentLocked();
1574 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1577 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1578 // Clear the send_message_payload_.
1579 grpc_byte_buffer_destroy(send_message_payload_);
1580 send_message_payload_ = nullptr;
1581 MaybeStartReportingLocked();
1584 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1585 void* arg, grpc_error* /*error*/) {
1586 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1589 MutexLock lock(&lrs_calld->xds_client()->mu_);
1590 done = lrs_calld->OnResponseReceivedLocked();
1592 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1595 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1596 // Empty payload means the call was cancelled.
1597 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1600 // Read the response.
1601 grpc_byte_buffer_reader bbr;
1602 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1603 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1604 grpc_byte_buffer_reader_destroy(&bbr);
1605 grpc_byte_buffer_destroy(recv_message_payload_);
1606 recv_message_payload_ = nullptr;
1607 // This anonymous lambda is a hack to avoid the usage of goto.
1609 // Parse the response.
1610 bool send_all_clusters = false;
1611 std::set<std::string> new_cluster_names;
1612 grpc_millis new_load_reporting_interval;
1613 grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1614 response_slice, &send_all_clusters, &new_cluster_names,
1615 &new_load_reporting_interval);
1616 if (parse_error != GRPC_ERROR_NONE) {
1618 "[xds_client %p] LRS response parsing failed. error=%s",
1619 xds_client(), grpc_error_string(parse_error));
1620 GRPC_ERROR_UNREF(parse_error);
1623 seen_response_ = true;
1624 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1627 "[xds_client %p] LRS response received, %" PRIuPTR
1628 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1630 xds_client(), new_cluster_names.size(), send_all_clusters,
1631 new_load_reporting_interval);
1633 for (const auto& name : new_cluster_names) {
1634 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1635 xds_client(), i++, name.c_str());
1638 if (new_load_reporting_interval <
1639 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1640 new_load_reporting_interval =
1641 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1642 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1644 "[xds_client %p] Increased load_report_interval to minimum "
1646 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1649 // Ignore identical update.
1650 if (send_all_clusters == send_all_clusters_ &&
1651 cluster_names_ == new_cluster_names &&
1652 load_reporting_interval_ == new_load_reporting_interval) {
1653 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1655 "[xds_client %p] Incoming LRS response identical to current, "
1661 // Stop current load reporting (if any) to adopt the new config.
1663 // Record the new config.
1664 send_all_clusters_ = send_all_clusters;
1665 cluster_names_ = std::move(new_cluster_names);
1666 load_reporting_interval_ = new_load_reporting_interval;
1667 // Try starting sending load report.
1668 MaybeStartReportingLocked();
1670 grpc_slice_unref_internal(response_slice);
1671 if (xds_client()->shutting_down_) return true;
1672 // Keep listening for LRS config updates.
1674 memset(&op, 0, sizeof(op));
1675 op.op = GRPC_OP_RECV_MESSAGE;
1676 op.data.recv_message.recv_message = &recv_message_payload_;
1678 op.reserved = nullptr;
1679 GPR_ASSERT(call_ != nullptr);
1680 // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1681 const grpc_call_error call_error =
1682 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1683 GPR_ASSERT(GRPC_CALL_OK == call_error);
1687 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1688 void* arg, grpc_error* error) {
1689 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1691 MutexLock lock(&lrs_calld->xds_client()->mu_);
1692 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1694 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1697 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1698 grpc_error* error) {
1699 GPR_ASSERT(call_ != nullptr);
1700 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1701 char* status_details = grpc_slice_to_c_string(status_details_);
1703 "[xds_client %p] LRS call status received. Status = %d, details "
1704 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1705 xds_client(), status_code_, status_details, chand(), this, call_,
1706 grpc_error_string(error));
1707 gpr_free(status_details);
1709 // Ignore status from a stale call.
1710 if (IsCurrentCallOnChannel()) {
1711 GPR_ASSERT(!xds_client()->shutting_down_);
1712 // Try to restart the call.
1713 parent_->OnCallFinishedLocked();
1715 GRPC_ERROR_UNREF(error);
1718 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1719 // If the retryable LRS call is null (which only happens when the xds channel
1720 // is shutting down), all the LRS calls are stale.
1721 if (chand()->lrs_calld_ == nullptr) return false;
1722 return this == chand()->lrs_calld_->calld();
1731 grpc_millis GetRequestTimeout() {
1732 return grpc_channel_args_find_integer(
1733 g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1734 {15000, 0, INT_MAX});
1739 XdsClient::XdsClient(grpc_error** error)
1740 : DualRefCounted<XdsClient>(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)
1743 request_timeout_(GetRequestTimeout()),
1744 interested_parties_(grpc_pollset_set_create()),
1746 XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1747 api_(this, &grpc_xds_client_trace,
1748 bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1749 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1750 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1752 if (*error != GRPC_ERROR_NONE) {
1753 gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1754 this, grpc_error_string(*error));
1757 // Create ChannelState object.
1758 chand_ = MakeOrphanable<ChannelState>(
1759 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1762 XdsClient::~XdsClient() {
1763 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1764 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1766 grpc_pollset_set_destroy(interested_parties_);
1769 void XdsClient::AddChannelzLinkage(
1770 channelz::ChannelNode* parent_channelz_node) {
1771 channelz::ChannelNode* xds_channelz_node =
1772 grpc_channel_get_channelz_node(chand_->channel());
1773 if (xds_channelz_node != nullptr) {
1774 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1778 void XdsClient::RemoveChannelzLinkage(
1779 channelz::ChannelNode* parent_channelz_node) {
1780 channelz::ChannelNode* xds_channelz_node =
1781 grpc_channel_get_channelz_node(chand_->channel());
1782 if (xds_channelz_node != nullptr) {
1783 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1787 void XdsClient::Orphan() {
1788 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1789 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1792 MutexLock lock(g_mu);
1793 if (g_xds_client == this) g_xds_client = nullptr;
1796 MutexLock lock(&mu_);
1797 shutting_down_ = true;
1798 // Orphan ChannelState object.
1800 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1801 // created by the XdsResolver because the maps contain refs for watchers
1802 // which in turn hold refs to the loadbalancing policies. At this point, it
1803 // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1804 // policies before those calls are done would lead to issues such as
1805 // https://github.com/grpc/grpc/issues/20928.
1806 if (!listener_map_.empty()) {
1807 cluster_map_.clear();
1808 endpoint_map_.clear();
1813 void XdsClient::WatchListenerData(
1814 absl::string_view listener_name,
1815 std::unique_ptr<ListenerWatcherInterface> watcher) {
1816 std::string listener_name_str = std::string(listener_name);
1817 MutexLock lock(&mu_);
1818 ListenerState& listener_state = listener_map_[listener_name_str];
1819 ListenerWatcherInterface* w = watcher.get();
1820 listener_state.watchers[w] = std::move(watcher);
1821 // If we've already received an LDS update, notify the new watcher
1823 if (listener_state.update.has_value()) {
1824 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1825 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1826 this, listener_name_str.c_str());
1828 w->OnListenerChanged(*listener_state.update);
1830 chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1833 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1834 ListenerWatcherInterface* watcher,
1835 bool delay_unsubscription) {
1836 MutexLock lock(&mu_);
1837 if (shutting_down_) return;
1838 std::string listener_name_str = std::string(listener_name);
1839 ListenerState& listener_state = listener_map_[listener_name_str];
1840 auto it = listener_state.watchers.find(watcher);
1841 if (it != listener_state.watchers.end()) {
1842 listener_state.watchers.erase(it);
1843 if (listener_state.watchers.empty()) {
1844 listener_map_.erase(listener_name_str);
1845 chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1846 delay_unsubscription);
1851 void XdsClient::WatchRouteConfigData(
1852 absl::string_view route_config_name,
1853 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1854 std::string route_config_name_str = std::string(route_config_name);
1855 MutexLock lock(&mu_);
1856 RouteConfigState& route_config_state =
1857 route_config_map_[route_config_name_str];
1858 RouteConfigWatcherInterface* w = watcher.get();
1859 route_config_state.watchers[w] = std::move(watcher);
1860 // If we've already received an RDS update, notify the new watcher
1862 if (route_config_state.update.has_value()) {
1863 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1865 "[xds_client %p] returning cached route config data for %s", this,
1866 route_config_name_str.c_str());
1868 w->OnRouteConfigChanged(*route_config_state.update);
1870 chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1873 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1874 RouteConfigWatcherInterface* watcher,
1875 bool delay_unsubscription) {
1876 MutexLock lock(&mu_);
1877 if (shutting_down_) return;
1878 std::string route_config_name_str = std::string(route_config_name);
1879 RouteConfigState& route_config_state =
1880 route_config_map_[route_config_name_str];
1881 auto it = route_config_state.watchers.find(watcher);
1882 if (it != route_config_state.watchers.end()) {
1883 route_config_state.watchers.erase(it);
1884 if (route_config_state.watchers.empty()) {
1885 route_config_map_.erase(route_config_name_str);
1886 chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1887 delay_unsubscription);
1892 void XdsClient::WatchClusterData(
1893 absl::string_view cluster_name,
1894 std::unique_ptr<ClusterWatcherInterface> watcher) {
1895 std::string cluster_name_str = std::string(cluster_name);
1896 MutexLock lock(&mu_);
1897 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1898 ClusterWatcherInterface* w = watcher.get();
1899 cluster_state.watchers[w] = std::move(watcher);
1900 // If we've already received a CDS update, notify the new watcher
1902 if (cluster_state.update.has_value()) {
1903 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1904 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1905 this, cluster_name_str.c_str());
1907 w->OnClusterChanged(cluster_state.update.value());
1909 chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1912 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1913 ClusterWatcherInterface* watcher,
1914 bool delay_unsubscription) {
1915 MutexLock lock(&mu_);
1916 if (shutting_down_) return;
1917 std::string cluster_name_str = std::string(cluster_name);
1918 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1919 auto it = cluster_state.watchers.find(watcher);
1920 if (it != cluster_state.watchers.end()) {
1921 cluster_state.watchers.erase(it);
1922 if (cluster_state.watchers.empty()) {
1923 cluster_map_.erase(cluster_name_str);
1924 chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1925 delay_unsubscription);
1930 void XdsClient::WatchEndpointData(
1931 absl::string_view eds_service_name,
1932 std::unique_ptr<EndpointWatcherInterface> watcher) {
1933 std::string eds_service_name_str = std::string(eds_service_name);
1934 MutexLock lock(&mu_);
1935 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1936 EndpointWatcherInterface* w = watcher.get();
1937 endpoint_state.watchers[w] = std::move(watcher);
1938 // If we've already received an EDS update, notify the new watcher
1940 if (endpoint_state.update.has_value()) {
1941 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1942 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1943 this, eds_service_name_str.c_str());
1945 w->OnEndpointChanged(endpoint_state.update.value());
1947 chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1950 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1951 EndpointWatcherInterface* watcher,
1952 bool delay_unsubscription) {
1953 MutexLock lock(&mu_);
1954 if (shutting_down_) return;
1955 std::string eds_service_name_str = std::string(eds_service_name);
1956 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1957 auto it = endpoint_state.watchers.find(watcher);
1958 if (it != endpoint_state.watchers.end()) {
1959 endpoint_state.watchers.erase(it);
1960 if (endpoint_state.watchers.empty()) {
1961 endpoint_map_.erase(eds_service_name_str);
1962 chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1963 delay_unsubscription);
1968 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1969 absl::string_view lrs_server, absl::string_view cluster_name,
1970 absl::string_view eds_service_name) {
1971 // TODO(roth): When we add support for direct federation, use the
1972 // server name specified in lrs_server.
1974 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1975 MutexLock lock(&mu_);
1976 // We jump through some hoops here to make sure that the absl::string_views
1977 // stored in the XdsClusterDropStats object point to the strings
1978 // in the load_report_map_ key, so that they have the same lifetime.
1979 auto it = load_report_map_
1980 .emplace(std::make_pair(std::move(key), LoadReportState()))
1982 LoadReportState& load_report_state = it->second;
1983 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1984 if (load_report_state.drop_stats != nullptr) {
1985 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1987 if (cluster_drop_stats == nullptr) {
1988 if (load_report_state.drop_stats != nullptr) {
1989 load_report_state.deleted_drop_stats +=
1990 load_report_state.drop_stats->GetSnapshotAndReset();
1992 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1993 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1994 it->first.first /*cluster_name*/,
1995 it->first.second /*eds_service_name*/);
1996 load_report_state.drop_stats = cluster_drop_stats.get();
1998 chand_->MaybeStartLrsCall();
1999 return cluster_drop_stats;
2002 void XdsClient::RemoveClusterDropStats(
2003 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2004 absl::string_view eds_service_name,
2005 XdsClusterDropStats* cluster_drop_stats) {
2006 MutexLock lock(&mu_);
2007 // TODO(roth): When we add support for direct federation, use the
2008 // server name specified in lrs_server.
2009 auto it = load_report_map_.find(
2010 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2011 if (it == load_report_map_.end()) return;
2012 LoadReportState& load_report_state = it->second;
2013 if (load_report_state.drop_stats == cluster_drop_stats) {
2014 // Record final snapshot in deleted_drop_stats, which will be
2015 // added to the next load report.
2016 load_report_state.deleted_drop_stats +=
2017 load_report_state.drop_stats->GetSnapshotAndReset();
2018 load_report_state.drop_stats = nullptr;
2022 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2023 absl::string_view lrs_server, absl::string_view cluster_name,
2024 absl::string_view eds_service_name,
2025 RefCountedPtr<XdsLocalityName> locality) {
2026 // TODO(roth): When we add support for direct federation, use the
2027 // server name specified in lrs_server.
2029 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2030 MutexLock lock(&mu_);
2031 // We jump through some hoops here to make sure that the absl::string_views
2032 // stored in the XdsClusterLocalityStats object point to the strings
2033 // in the load_report_map_ key, so that they have the same lifetime.
2034 auto it = load_report_map_
2035 .emplace(std::make_pair(std::move(key), LoadReportState()))
2037 LoadReportState& load_report_state = it->second;
2038 LoadReportState::LocalityState& locality_state =
2039 load_report_state.locality_stats[locality];
2040 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2041 if (locality_state.locality_stats != nullptr) {
2042 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2044 if (cluster_locality_stats == nullptr) {
2045 if (locality_state.locality_stats != nullptr) {
2046 locality_state.deleted_locality_stats +=
2047 locality_state.locality_stats->GetSnapshotAndReset();
2049 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2050 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2051 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2052 std::move(locality));
2053 locality_state.locality_stats = cluster_locality_stats.get();
2055 chand_->MaybeStartLrsCall();
2056 return cluster_locality_stats;
2059 void XdsClient::RemoveClusterLocalityStats(
2060 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2061 absl::string_view eds_service_name,
2062 const RefCountedPtr<XdsLocalityName>& locality,
2063 XdsClusterLocalityStats* cluster_locality_stats) {
2064 MutexLock lock(&mu_);
2065 // TODO(roth): When we add support for direct federation, use the
2066 // server name specified in lrs_server.
2067 auto it = load_report_map_.find(
2068 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2069 if (it == load_report_map_.end()) return;
2070 LoadReportState& load_report_state = it->second;
2071 auto locality_it = load_report_state.locality_stats.find(locality);
2072 if (locality_it == load_report_state.locality_stats.end()) return;
2073 LoadReportState::LocalityState& locality_state = locality_it->second;
2074 if (locality_state.locality_stats == cluster_locality_stats) {
2075 // Record final snapshot in deleted_locality_stats, which will be
2076 // added to the next load report.
2077 locality_state.deleted_locality_stats +=
2078 locality_state.locality_stats->GetSnapshotAndReset();
2079 locality_state.locality_stats = nullptr;
2083 void XdsClient::ResetBackoff() {
2084 MutexLock lock(&mu_);
2085 if (chand_ != nullptr) {
2086 grpc_channel_reset_connect_backoff(chand_->channel());
2090 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2091 for (const auto& p : listener_map_) {
2092 const ListenerState& listener_state = p.second;
2093 for (const auto& p : listener_state.watchers) {
2094 p.first->OnError(GRPC_ERROR_REF(error));
2097 for (const auto& p : route_config_map_) {
2098 const RouteConfigState& route_config_state = p.second;
2099 for (const auto& p : route_config_state.watchers) {
2100 p.first->OnError(GRPC_ERROR_REF(error));
2103 for (const auto& p : cluster_map_) {
2104 const ClusterState& cluster_state = p.second;
2105 for (const auto& p : cluster_state.watchers) {
2106 p.first->OnError(GRPC_ERROR_REF(error));
2109 for (const auto& p : endpoint_map_) {
2110 const EndpointState& endpoint_state = p.second;
2111 for (const auto& p : endpoint_state.watchers) {
2112 p.first->OnError(GRPC_ERROR_REF(error));
2115 GRPC_ERROR_UNREF(error);
2118 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2119 bool send_all_clusters, const std::set<std::string>& clusters) {
2120 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2121 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2123 XdsApi::ClusterLoadReportMap snapshot_map;
2124 for (auto load_report_it = load_report_map_.begin();
2125 load_report_it != load_report_map_.end();) {
2126 // Cluster key is cluster and EDS service name.
2127 const auto& cluster_key = load_report_it->first;
2128 LoadReportState& load_report = load_report_it->second;
2129 // If the CDS response for a cluster indicates to use LRS but the
2130 // LRS server does not say that it wants reports for this cluster,
2131 // then we'll have stats objects here whose data we're not going to
2132 // include in the load report. However, we still need to clear out
2133 // the data from the stats objects, so that if the LRS server starts
2134 // asking for the data in the future, we don't incorrectly include
2135 // data from previous reporting intervals in that future report.
2136 const bool record_stats =
2137 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2138 XdsApi::ClusterLoadReport snapshot;
2139 // Aggregate drop stats.
2140 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2141 if (load_report.drop_stats != nullptr) {
2142 snapshot.dropped_requests +=
2143 load_report.drop_stats->GetSnapshotAndReset();
2144 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2146 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2147 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2148 load_report.drop_stats);
2151 // Aggregate locality stats.
2152 for (auto it = load_report.locality_stats.begin();
2153 it != load_report.locality_stats.end();) {
2154 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2155 auto& locality_state = it->second;
2156 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2157 snapshot.locality_stats[locality_name];
2158 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2159 if (locality_state.locality_stats != nullptr) {
2160 locality_snapshot +=
2161 locality_state.locality_stats->GetSnapshotAndReset();
2162 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2164 "[xds_client %p] cluster=%s eds_service_name=%s "
2165 "locality=%s locality_stats=%p",
2166 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2167 locality_name->AsHumanReadableString().c_str(),
2168 locality_state.locality_stats);
2171 // If the only thing left in this entry was final snapshots from
2172 // deleted locality stats objects, remove the entry.
2173 if (locality_state.locality_stats == nullptr) {
2174 it = load_report.locality_stats.erase(it);
2179 // Compute load report interval.
2180 const grpc_millis now = ExecCtx::Get()->Now();
2181 snapshot.load_report_interval = now - load_report.last_report_time;
2182 load_report.last_report_time = now;
2185 snapshot_map[cluster_key] = std::move(snapshot);
2187 // If the only thing left in this entry was final snapshots from
2188 // deleted stats objects, remove the entry.
2189 if (load_report.locality_stats.empty() &&
2190 load_report.drop_stats == nullptr) {
2191 load_report_it = load_report_map_.erase(load_report_it);
2196 return snapshot_map;
2200 // accessors for global state
2203 void XdsClientGlobalInit() { g_mu = new Mutex; }
2205 void XdsClientGlobalShutdown() {
2210 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2211 MutexLock lock(g_mu);
2212 if (g_xds_client != nullptr) {
2213 auto xds_client = g_xds_client->RefIfNonZero();
2214 if (xds_client != nullptr) return xds_client;
2216 auto xds_client = MakeRefCounted<XdsClient>(error);
2217 g_xds_client = xds_client.get();
2221 namespace internal {
2223 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2224 MutexLock lock(g_mu);
2225 g_channel_args = args;
2228 void UnsetGlobalXdsClientForTest() {
2229 MutexLock lock(g_mu);
2230 g_xds_client = nullptr;
2233 } // namespace internal
2235 } // namespace grpc_core