Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/map.h"
46 #include "src/core/lib/gprpp/memory.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/iomgr/sockaddr.h"
51 #include "src/core/lib/iomgr/sockaddr_utils.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/security/credentials/credentials.h"
54 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
55 #include "src/core/lib/slice/slice_internal.h"
56 #include "src/core/lib/slice/slice_string_helpers.h"
57 #include "src/core/lib/surface/call.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/surface/channel_init.h"
60 #include "src/core/lib/transport/static_metadata.h"
61
62 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
63 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
64 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
65 #define GRPC_XDS_RECONNECT_JITTER 0.2
66 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
67
68 namespace grpc_core {
69
70 TraceFlag grpc_xds_client_trace(false, "xds_client");  // Trace flag named "xds_client", constructed with `false`; gates the verbose gpr_log output in this file.
71
72 namespace {
73
74 Mutex* g_mu = nullptr;  // File-local, starts null. NOTE(review): presumably guards the two globals below -- confirm at the use sites.
75 const grpc_channel_args* g_channel_args = nullptr;  // File-local, starts null; not referenced in this part of the file.
76 XdsClient* g_xds_client = nullptr;  // File-local, starts null; not referenced in this part of the file.
77
78 }  // namespace
79
80 //
81 // Internal class declarations
82 //
83
84 // An xds call wrapper that can restart a call upon failure. Holds a ref to
85 // the xds channel. The template parameter is the kind of wrapped xds call.
86 template <typename T>
87 class XdsClient::ChannelState::RetryableCall
88     : public InternallyRefCounted<RetryableCall<T>> {
89  public:
90   explicit RetryableCall(RefCountedPtr<ChannelState> chand);
91
92   void Orphan() override;
93
94   void OnCallFinishedLocked();
95
96   T* calld() const { return calld_.get(); }
97   ChannelState* chand() const { return chand_.get(); }
98
99   bool IsCurrentCallOnChannel() const;
100
101  private:
102   void StartNewCallLocked();
103   void StartRetryTimerLocked();
104   static void OnRetryTimer(void* arg, grpc_error* error);
105   void OnRetryTimerLocked(grpc_error* error);
106
107   // The wrapped xds call that talks to the xds server. It's instantiated
108   // every time we start a new call. It's null during call retry backoff.
109   OrphanablePtr<T> calld_;
110   // The owning xds channel.
111   RefCountedPtr<ChannelState> chand_;
112
113   // Retry state.
114   BackOff backoff_;
115   grpc_timer retry_timer_;
116   grpc_closure on_retry_timer_;
117   bool retry_timer_callback_pending_ = false;
118
119   bool shutting_down_ = false;
120 };
121
122 // Contains an ADS call to the xds server.
123 class XdsClient::ChannelState::AdsCallState
124     : public InternallyRefCounted<AdsCallState> {
125  public:
126   // The ctor and dtor should not be used directly.
127   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
128   ~AdsCallState() override;
129
130   void Orphan() override;
131
132   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
133   ChannelState* chand() const { return parent_->chand(); }
134   XdsClient* xds_client() const { return chand()->xds_client(); }
135   bool seen_response() const { return seen_response_; }
136
137   void Subscribe(const std::string& type_url, const std::string& name);
138   void Unsubscribe(const std::string& type_url, const std::string& name,
139                    bool delay_unsubscription);
140
141   bool HasSubscribedResources() const;
142
143  private:
144   class ResourceState : public InternallyRefCounted<ResourceState> {
145    public:
146     ResourceState(const std::string& type_url, const std::string& name)
147         : type_url_(type_url), name_(name) {
148       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
149                         grpc_schedule_on_exec_ctx);
150     }
151
152     void Orphan() override {
153       Finish();
154       Unref(DEBUG_LOCATION, "Orphan");
155     }
156
157     void Start(RefCountedPtr<AdsCallState> ads_calld) {
158       if (sent_) return;
159       sent_ = true;
160       ads_calld_ = std::move(ads_calld);
161       Ref(DEBUG_LOCATION, "timer").release();
162       timer_pending_ = true;
163       grpc_timer_init(
164           &timer_,
165           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
166           &timer_callback_);
167     }
168
169     void Finish() {
170       if (timer_pending_) {
171         grpc_timer_cancel(&timer_);
172         timer_pending_ = false;
173       }
174     }
175
176    private:
177     static void OnTimer(void* arg, grpc_error* error) {
178       ResourceState* self = static_cast<ResourceState*>(arg);
179       {
180         MutexLock lock(&self->ads_calld_->xds_client()->mu_);
181         self->OnTimerLocked(GRPC_ERROR_REF(error));
182       }
183       self->ads_calld_.reset();
184       self->Unref(DEBUG_LOCATION, "timer");
185     }
186
187     void OnTimerLocked(grpc_error* error) {
188       if (error == GRPC_ERROR_NONE && timer_pending_) {
189         timer_pending_ = false;
190         grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
191             absl::StrFormat(
192                 "timeout obtaining resource {type=%s name=%s} from xds server",
193                 type_url_, name_)
194                 .c_str());
195         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
196           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
197                   grpc_error_string(watcher_error));
198         }
199         if (type_url_ == XdsApi::kLdsTypeUrl) {
200           ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
201           for (const auto& p : state.watchers) {
202             p.first->OnError(GRPC_ERROR_REF(watcher_error));
203           }
204         } else if (type_url_ == XdsApi::kRdsTypeUrl) {
205           RouteConfigState& state =
206               ads_calld_->xds_client()->route_config_map_[name_];
207           for (const auto& p : state.watchers) {
208             p.first->OnError(GRPC_ERROR_REF(watcher_error));
209           }
210         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
211           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
212           for (const auto& p : state.watchers) {
213             p.first->OnError(GRPC_ERROR_REF(watcher_error));
214           }
215         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
216           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
217           for (const auto& p : state.watchers) {
218             p.first->OnError(GRPC_ERROR_REF(watcher_error));
219           }
220         } else {
221           GPR_UNREACHABLE_CODE(return );
222         }
223         GRPC_ERROR_UNREF(watcher_error);
224       }
225       GRPC_ERROR_UNREF(error);
226     }
227
228     const std::string type_url_;
229     const std::string name_;
230
231     RefCountedPtr<AdsCallState> ads_calld_;
232     bool sent_ = false;
233     bool timer_pending_ = false;
234     grpc_timer timer_;
235     grpc_closure timer_callback_;
236   };
237
238   struct ResourceTypeState {
239     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
240
241     // Version, nonce, and error for this resource type.
242     std::string version;
243     std::string nonce;
244     grpc_error* error = GRPC_ERROR_NONE;
245
246     // Subscribed resources of this type.
247     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
248         subscribed_resources;
249   };
250
251   void SendMessageLocked(const std::string& type_url);
252
253   void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
254   void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
255   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
256   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
257
258   static void OnRequestSent(void* arg, grpc_error* error);
259   void OnRequestSentLocked(grpc_error* error);
260   static void OnResponseReceived(void* arg, grpc_error* error);
261   bool OnResponseReceivedLocked();
262   static void OnStatusReceived(void* arg, grpc_error* error);
263   void OnStatusReceivedLocked(grpc_error* error);
264
265   bool IsCurrentCallOnChannel() const;
266
267   std::set<absl::string_view> ResourceNamesForRequest(
268       const std::string& type_url);
269
270   // The owning RetryableCall<>.
271   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
272
273   bool sent_initial_message_ = false;
274   bool seen_response_ = false;
275
276   // Always non-NULL.
277   grpc_call* call_;
278
279   // recv_initial_metadata
280   grpc_metadata_array initial_metadata_recv_;
281
282   // send_message
283   grpc_byte_buffer* send_message_payload_ = nullptr;
284   grpc_closure on_request_sent_;
285
286   // recv_message
287   grpc_byte_buffer* recv_message_payload_ = nullptr;
288   grpc_closure on_response_received_;
289
290   // recv_trailing_metadata
291   grpc_metadata_array trailing_metadata_recv_;
292   grpc_status_code status_code_;
293   grpc_slice status_details_;
294   grpc_closure on_status_received_;
295
296   // Resource types for which requests need to be sent.
297   std::set<std::string /*type_url*/> buffered_requests_;
298
299   // State for each resource type.
300   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
301 };
302
303 // Contains an LRS call to the xds server.
304 class XdsClient::ChannelState::LrsCallState
305     : public InternallyRefCounted<LrsCallState> {
306  public:
307   // The ctor and dtor should not be used directly.
308   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
309   ~LrsCallState() override;
310
311   void Orphan() override;
312
313   void MaybeStartReportingLocked();
314
315   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
316   ChannelState* chand() const { return parent_->chand(); }
317   XdsClient* xds_client() const { return chand()->xds_client(); }
318   bool seen_response() const { return seen_response_; }
319
320  private:
321   // Reports client-side load stats according to a fixed interval.
322   class Reporter : public InternallyRefCounted<Reporter> {
323    public:
324     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
325         : parent_(std::move(parent)), report_interval_(report_interval) {
326       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
327                         grpc_schedule_on_exec_ctx);
328       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
329                         grpc_schedule_on_exec_ctx);
330       ScheduleNextReportLocked();
331     }
332
333     void Orphan() override;
334
335    private:
336     void ScheduleNextReportLocked();
337     static void OnNextReportTimer(void* arg, grpc_error* error);
338     bool OnNextReportTimerLocked(grpc_error* error);
339     void SendReportLocked();
340     static void OnReportDone(void* arg, grpc_error* error);
341     bool OnReportDoneLocked(grpc_error* error);
342
343     bool IsCurrentReporterOnCall() const {
344       return this == parent_->reporter_.get();
345     }
346     XdsClient* xds_client() const { return parent_->xds_client(); }
347
348     // The owning LRS call.
349     RefCountedPtr<LrsCallState> parent_;
350
351     // The load reporting state.
352     const grpc_millis report_interval_;
353     bool last_report_counters_were_zero_ = false;
354     bool next_report_timer_callback_pending_ = false;
355     grpc_timer next_report_timer_;
356     grpc_closure on_next_report_timer_;
357     grpc_closure on_report_done_;
358   };
359
360   static void OnInitialRequestSent(void* arg, grpc_error* error);
361   void OnInitialRequestSentLocked();
362   static void OnResponseReceived(void* arg, grpc_error* error);
363   bool OnResponseReceivedLocked();
364   static void OnStatusReceived(void* arg, grpc_error* error);
365   void OnStatusReceivedLocked(grpc_error* error);
366
367   bool IsCurrentCallOnChannel() const;
368
369   // The owning RetryableCall<>.
370   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
371   bool seen_response_ = false;
372
373   // Always non-NULL.
374   grpc_call* call_;
375
376   // recv_initial_metadata
377   grpc_metadata_array initial_metadata_recv_;
378
379   // send_message
380   grpc_byte_buffer* send_message_payload_ = nullptr;
381   grpc_closure on_initial_request_sent_;
382
383   // recv_message
384   grpc_byte_buffer* recv_message_payload_ = nullptr;
385   grpc_closure on_response_received_;
386
387   // recv_trailing_metadata
388   grpc_metadata_array trailing_metadata_recv_;
389   grpc_status_code status_code_;
390   grpc_slice status_details_;
391   grpc_closure on_status_received_;
392
393   // Load reporting state.
394   bool send_all_clusters_ = false;
395   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
396   grpc_millis load_reporting_interval_ = 0;
397   OrphanablePtr<Reporter> reporter_;
398 };
399
400 //
401 // XdsClient::ChannelState::StateWatcher
402 //
403
404 class XdsClient::ChannelState::StateWatcher
405     : public AsyncConnectivityStateWatcherInterface {
406  public:
407   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408       : parent_(std::move(parent)) {}
409
410  private:
411   void OnConnectivityStateChange(grpc_connectivity_state new_state,
412                                  const absl::Status& status) override {
413     MutexLock lock(&parent_->xds_client_->mu_);
414     if (!parent_->shutting_down_ &&
415         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416       // In TRANSIENT_FAILURE.  Notify all watchers of error.
417       gpr_log(GPR_INFO,
418               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419               "status_message:(%s)",
420               parent_->xds_client(), status.ToString().c_str());
421       parent_->xds_client()->NotifyOnErrorLocked(
422           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423               "xds channel in TRANSIENT_FAILURE"));
424     }
425   }
426
427   RefCountedPtr<ChannelState> parent_;
428 };
429
430 //
431 // XdsClient::ChannelState
432 //
433
434 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
435                                       grpc_channel* channel)
436     : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
437       xds_client_(std::move(xds_client)),
438       channel_(channel) {
439   GPR_ASSERT(channel_ != nullptr);
440   StartConnectivityWatchLocked();
441 }
442
443 XdsClient::ChannelState::~ChannelState() {
444   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
445     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
446             this);
447   }
448   grpc_channel_destroy(channel_);
449   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
450 }
451
452 void XdsClient::ChannelState::Orphan() {
453   shutting_down_ = true;
454   CancelConnectivityWatchLocked();
455   ads_calld_.reset();
456   lrs_calld_.reset();
457   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
458 }
459
460 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
461     const {
462   return ads_calld_->calld();
463 }
464
465 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
466     const {
467   return lrs_calld_->calld();
468 }
469
470 bool XdsClient::ChannelState::HasActiveAdsCall() const {
471   return ads_calld_->calld() != nullptr;
472 }
473
474 void XdsClient::ChannelState::MaybeStartLrsCall() {
475   if (lrs_calld_ != nullptr) return;
476   lrs_calld_.reset(
477       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
478 }
479
480 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
481
482 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
483   grpc_channel_element* client_channel_elem =
484       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
485   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
486   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
487   grpc_client_channel_start_connectivity_watch(
488       client_channel_elem, GRPC_CHANNEL_IDLE,
489       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
490 }
491
492 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
493   grpc_channel_element* client_channel_elem =
494       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
495   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
496   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
497 }
498
499 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
500                                         const std::string& name) {
501   if (ads_calld_ == nullptr) {
502     // Start the ADS call if this is the first request.
503     ads_calld_.reset(new RetryableCall<AdsCallState>(
504         Ref(DEBUG_LOCATION, "ChannelState+ads")));
505     // Note: AdsCallState's ctor will automatically subscribe to all
506     // resources that the XdsClient already has watchers for, so we can
507     // return here.
508     return;
509   }
510   // If the ADS call is in backoff state, we don't need to do anything now
511   // because when the call is restarted it will resend all necessary requests.
512   if (ads_calld() == nullptr) return;
513   // Subscribe to this resource if the ADS call is active.
514   ads_calld()->Subscribe(type_url, name);
515 }
516
517 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
518                                           const std::string& name,
519                                           bool delay_unsubscription) {
520   if (ads_calld_ != nullptr) {
521     auto* calld = ads_calld_->calld();
522     if (calld != nullptr) {
523       calld->Unsubscribe(type_url, name, delay_unsubscription);
524       if (!calld->HasSubscribedResources()) ads_calld_.reset();
525     }
526   }
527 }
528
529 //
530 // XdsClient::ChannelState::RetryableCall<>
531 //
532
533 template <typename T>
534 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
535     RefCountedPtr<ChannelState> chand)
536     : chand_(std::move(chand)),
537       backoff_(
538           BackOff::Options()
539               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
540                                    1000)
541               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
542               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
543               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
544   // Closure Initialization
545   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
546                     grpc_schedule_on_exec_ctx);
547   StartNewCallLocked();
548 }
549
550 template <typename T>
551 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
552   shutting_down_ = true;
553   calld_.reset();
554   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
555   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
556 }
557
558 template <typename T>
559 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
560   const bool seen_response = calld_->seen_response();
561   calld_.reset();
562   if (seen_response) {
563     // If we lost connection to the xds server, reset backoff and restart the
564     // call immediately.
565     backoff_.Reset();
566     StartNewCallLocked();
567   } else {
568     // If we failed to connect to the xds server, retry later.
569     StartRetryTimerLocked();
570   }
571 }
572
573 template <typename T>
574 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
575   if (shutting_down_) return;
576   GPR_ASSERT(chand_->channel_ != nullptr);
577   GPR_ASSERT(calld_ == nullptr);
578   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
579     gpr_log(GPR_INFO,
580             "[xds_client %p] Start new call from retryable call (chand: %p, "
581             "retryable call: %p)",
582             chand()->xds_client(), chand(), this);
583   }
584   calld_ = MakeOrphanable<T>(
585       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
586 }
587
588 template <typename T>
589 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
590   if (shutting_down_) return;
591   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
592   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
593     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
594     gpr_log(GPR_INFO,
595             "[xds_client %p] Failed to connect to xds server (chand: %p) "
596             "retry timer will fire in %" PRId64 "ms.",
597             chand()->xds_client(), chand(), timeout);
598   }
599   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
600   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
601   retry_timer_callback_pending_ = true;
602 }
603
604 template <typename T>
605 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
606     void* arg, grpc_error* error) {
607   RetryableCall* calld = static_cast<RetryableCall*>(arg);
608   {
609     MutexLock lock(&calld->chand_->xds_client()->mu_);
610     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
611   }
612   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
613 }
614
615 template <typename T>
616 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
617     grpc_error* error) {
618   retry_timer_callback_pending_ = false;
619   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
620     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
621       gpr_log(
622           GPR_INFO,
623           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
624           chand()->xds_client(), chand(), this);
625     }
626     StartNewCallLocked();
627   }
628   GRPC_ERROR_UNREF(error);
629 }
630
631 //
632 // XdsClient::ChannelState::AdsCallState
633 //
634
635 XdsClient::ChannelState::AdsCallState::AdsCallState(
636     RefCountedPtr<RetryableCall<AdsCallState>> parent)
637     : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
638       parent_(std::move(parent)) {
639   // Init the ADS call. Note that the call will progress every time there's
640   // activity in xds_client()->interested_parties_, which is comprised of
641   // the polling entities from client_channel.
642   GPR_ASSERT(xds_client() != nullptr);
643   // Create a call with the specified method name.
644   const auto& method =
645       xds_client()->bootstrap_->server().ShouldUseV3()
646           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
647           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
648   call_ = grpc_channel_create_pollset_set_call(
649       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
650       xds_client()->interested_parties_, method, nullptr,
651       GRPC_MILLIS_INF_FUTURE, nullptr);
652   GPR_ASSERT(call_ != nullptr);
653   // Init data associated with the call.
654   grpc_metadata_array_init(&initial_metadata_recv_);
655   grpc_metadata_array_init(&trailing_metadata_recv_);
656   // Start the call.
657   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
658     gpr_log(GPR_INFO,
659             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
660             "call: %p)",
661             xds_client(), chand(), this, call_);
662   }
663   // Create the ops.
664   grpc_call_error call_error;
665   grpc_op ops[3];
666   memset(ops, 0, sizeof(ops));
667   // Op: send initial metadata.
668   grpc_op* op = ops;
669   op->op = GRPC_OP_SEND_INITIAL_METADATA;
670   op->data.send_initial_metadata.count = 0;
671   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
672               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
673   op->reserved = nullptr;
674   op++;
675   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
676                                                  nullptr);
677   GPR_ASSERT(GRPC_CALL_OK == call_error);
678   // Op: send request message.
679   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
680                     grpc_schedule_on_exec_ctx);
681   for (const auto& p : xds_client()->listener_map_) {
682     Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
683   }
684   for (const auto& p : xds_client()->route_config_map_) {
685     Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
686   }
687   for (const auto& p : xds_client()->cluster_map_) {
688     Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
689   }
690   for (const auto& p : xds_client()->endpoint_map_) {
691     Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
692   }
693   // Op: recv initial metadata.
694   op = ops;
695   op->op = GRPC_OP_RECV_INITIAL_METADATA;
696   op->data.recv_initial_metadata.recv_initial_metadata =
697       &initial_metadata_recv_;
698   op->flags = 0;
699   op->reserved = nullptr;
700   op++;
701   // Op: recv response.
702   op->op = GRPC_OP_RECV_MESSAGE;
703   op->data.recv_message.recv_message = &recv_message_payload_;
704   op->flags = 0;
705   op->reserved = nullptr;
706   op++;
707   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
708   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
709                     grpc_schedule_on_exec_ctx);
710   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
711                                                  &on_response_received_);
712   GPR_ASSERT(GRPC_CALL_OK == call_error);
713   // Op: recv server status.
714   op = ops;
715   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
716   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
717   op->data.recv_status_on_client.status = &status_code_;
718   op->data.recv_status_on_client.status_details = &status_details_;
719   op->flags = 0;
720   op->reserved = nullptr;
721   op++;
722   // This callback signals the end of the call, so it relies on the initial
723   // ref instead of a new ref. When it's invoked, it's the initial ref that is
724   // unreffed.
725   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
726                     grpc_schedule_on_exec_ctx);
727   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
728                                                  &on_status_received_);
729   GPR_ASSERT(GRPC_CALL_OK == call_error);
730 }
731
732 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
733   grpc_metadata_array_destroy(&initial_metadata_recv_);
734   grpc_metadata_array_destroy(&trailing_metadata_recv_);
735   grpc_byte_buffer_destroy(send_message_payload_);
736   grpc_byte_buffer_destroy(recv_message_payload_);
737   grpc_slice_unref_internal(status_details_);
738   GPR_ASSERT(call_ != nullptr);
739   grpc_call_unref(call_);
740 }
741
742 void XdsClient::ChannelState::AdsCallState::Orphan() {
743   GPR_ASSERT(call_ != nullptr);
744   // If we are here because xds_client wants to cancel the call,
745   // on_status_received_ will complete the cancellation and clean up. Otherwise,
746   // we are here because xds_client has to orphan a failed call, then the
747   // following cancellation will be a no-op.
748   grpc_call_cancel_internal(call_);
749   state_map_.clear();
750   // Note that the initial ref is hold by on_status_received_. So the
751   // corresponding unref happens in on_status_received_ instead of here.
752 }
753
754 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
755     const std::string& type_url) {
756   // Buffer message sending if an existing message is in flight.
757   if (send_message_payload_ != nullptr) {
758     buffered_requests_.insert(type_url);
759     return;
760   }
761   auto& state = state_map_[type_url];
762   grpc_slice request_payload_slice;
763   std::set<absl::string_view> resource_names =
764       ResourceNamesForRequest(type_url);
765   request_payload_slice = xds_client()->api_.CreateAdsRequest(
766       type_url, resource_names, state.version, state.nonce,
767       GRPC_ERROR_REF(state.error), !sent_initial_message_);
768   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
769       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
770     state_map_.erase(type_url);
771   }
772   sent_initial_message_ = true;
773   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
774     gpr_log(GPR_INFO,
775             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
776             "error=%s resources=%s",
777             xds_client(), type_url.c_str(), state.version.c_str(),
778             state.nonce.c_str(), grpc_error_string(state.error),
779             absl::StrJoin(resource_names, " ").c_str());
780   }
781   GRPC_ERROR_UNREF(state.error);
782   state.error = GRPC_ERROR_NONE;
783   // Create message payload.
784   send_message_payload_ =
785       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
786   grpc_slice_unref_internal(request_payload_slice);
787   // Send the message.
788   grpc_op op;
789   memset(&op, 0, sizeof(op));
790   op.op = GRPC_OP_SEND_MESSAGE;
791   op.data.send_message.send_message = send_message_payload_;
792   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
793   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
794                     grpc_schedule_on_exec_ctx);
795   grpc_call_error call_error =
796       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
797   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
798     gpr_log(GPR_ERROR,
799             "[xds_client %p] calld=%p call_error=%d sending ADS message",
800             xds_client(), this, call_error);
801     GPR_ASSERT(GRPC_CALL_OK == call_error);
802   }
803 }
804
805 void XdsClient::ChannelState::AdsCallState::Subscribe(
806     const std::string& type_url, const std::string& name) {
807   auto& state = state_map_[type_url].subscribed_resources[name];
808   if (state == nullptr) {
809     state = MakeOrphanable<ResourceState>(type_url, name);
810     SendMessageLocked(type_url);
811   }
812 }
813
814 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
815     const std::string& type_url, const std::string& name,
816     bool delay_unsubscription) {
817   state_map_[type_url].subscribed_resources.erase(name);
818   if (!delay_unsubscription) SendMessageLocked(type_url);
819 }
820
821 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
822   for (const auto& p : state_map_) {
823     if (!p.second.subscribed_resources.empty()) return true;
824   }
825   return false;
826 }
827
828 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
829     XdsApi::LdsUpdateMap lds_update_map) {
830   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
831     gpr_log(GPR_INFO,
832             "[xds_client %p] LDS update received containing %" PRIuPTR
833             " resources",
834             xds_client(), lds_update_map.size());
835   }
836   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
837   std::set<std::string> rds_resource_names_seen;
838   for (auto& p : lds_update_map) {
839     const std::string& listener_name = p.first;
840     XdsApi::LdsUpdate& lds_update = p.second;
841     auto& state = lds_state.subscribed_resources[listener_name];
842     if (state != nullptr) state->Finish();
843     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
844       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
845               xds_client(), listener_name.c_str(),
846               (!lds_update.route_config_name.empty()
847                    ? lds_update.route_config_name.c_str()
848                    : "<inlined>"));
849       if (lds_update.rds_update.has_value()) {
850         gpr_log(GPR_INFO, "RouteConfiguration: %s",
851                 lds_update.rds_update->ToString().c_str());
852       }
853     }
854     // Record the RDS resource names seen.
855     if (!lds_update.route_config_name.empty()) {
856       rds_resource_names_seen.insert(lds_update.route_config_name);
857     }
858     // Ignore identical update.
859     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
860     if (listener_state.update.has_value() &&
861         *listener_state.update == lds_update) {
862       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
863         gpr_log(GPR_INFO,
864                 "[xds_client %p] LDS update for %s identical to current, "
865                 "ignoring.",
866                 xds_client(), listener_name.c_str());
867       }
868       continue;
869     }
870     // Update the listener state.
871     listener_state.update = std::move(lds_update);
872     // Notify watchers.
873     for (const auto& p : listener_state.watchers) {
874       p.first->OnListenerChanged(*listener_state.update);
875     }
876   }
877   // For any subscribed resource that is not present in the update,
878   // remove it from the cache and notify watchers that it does not exist.
879   for (const auto& p : lds_state.subscribed_resources) {
880     const std::string& listener_name = p.first;
881     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
882       ListenerState& listener_state =
883           xds_client()->listener_map_[listener_name];
884       // If the resource was newly requested but has not yet been received,
885       // we don't want to generate an error for the watchers, because this LDS
886       // response may be in reaction to an earlier request that did not yet
887       // request the new resource, so its absence from the response does not
888       // necessarily indicate that the resource does not exist.
889       // For that case, we rely on the request timeout instead.
890       if (!listener_state.update.has_value()) continue;
891       listener_state.update.reset();
892       for (const auto& p : listener_state.watchers) {
893         p.first->OnResourceDoesNotExist();
894       }
895     }
896   }
897   // For any RDS resource that is no longer referred to by any LDS
898   // resources, remove it from the cache and notify watchers that it
899   // does not exist.
900   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
901   for (const auto& p : rds_state.subscribed_resources) {
902     const std::string& rds_resource_name = p.first;
903     if (rds_resource_names_seen.find(rds_resource_name) ==
904         rds_resource_names_seen.end()) {
905       RouteConfigState& route_config_state =
906           xds_client()->route_config_map_[rds_resource_name];
907       route_config_state.update.reset();
908       for (const auto& p : route_config_state.watchers) {
909         p.first->OnResourceDoesNotExist();
910       }
911     }
912   }
913 }
914
915 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
916     XdsApi::RdsUpdateMap rds_update_map) {
917   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
918     gpr_log(GPR_INFO,
919             "[xds_client %p] RDS update received containing %" PRIuPTR
920             " resources",
921             xds_client(), rds_update_map.size());
922   }
923   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
924   for (auto& p : rds_update_map) {
925     const std::string& route_config_name = p.first;
926     XdsApi::RdsUpdate& rds_update = p.second;
927     auto& state = rds_state.subscribed_resources[route_config_name];
928     if (state != nullptr) state->Finish();
929     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
930       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
931               rds_update.ToString().c_str());
932     }
933     RouteConfigState& route_config_state =
934         xds_client()->route_config_map_[route_config_name];
935     // Ignore identical update.
936     if (route_config_state.update.has_value() &&
937         *route_config_state.update == rds_update) {
938       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
939         gpr_log(GPR_INFO,
940                 "[xds_client %p] RDS resource identical to current, ignoring",
941                 xds_client());
942       }
943       continue;
944     }
945     // Update the cache.
946     route_config_state.update = std::move(rds_update);
947     // Notify all watchers.
948     for (const auto& p : route_config_state.watchers) {
949       p.first->OnRouteConfigChanged(*route_config_state.update);
950     }
951   }
952 }
953
954 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
955     XdsApi::CdsUpdateMap cds_update_map) {
956   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
957     gpr_log(GPR_INFO,
958             "[xds_client %p] CDS update received containing %" PRIuPTR
959             " resources",
960             xds_client(), cds_update_map.size());
961   }
962   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
963   std::set<std::string> eds_resource_names_seen;
964   for (auto& p : cds_update_map) {
965     const char* cluster_name = p.first.c_str();
966     XdsApi::CdsUpdate& cds_update = p.second;
967     auto& state = cds_state.subscribed_resources[cluster_name];
968     if (state != nullptr) state->Finish();
969     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
970       gpr_log(GPR_INFO,
971               "[xds_client %p] cluster=%s: eds_service_name=%s, "
972               "lrs_load_reporting_server_name=%s",
973               xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
974               cds_update.lrs_load_reporting_server_name.has_value()
975                   ? cds_update.lrs_load_reporting_server_name.value().c_str()
976                   : "(N/A)");
977     }
978     // Record the EDS resource names seen.
979     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
980                                        ? cluster_name
981                                        : cds_update.eds_service_name);
982     // Ignore identical update.
983     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
984     if (cluster_state.update.has_value() &&
985         *cluster_state.update == cds_update) {
986       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
987         gpr_log(GPR_INFO,
988                 "[xds_client %p] CDS update identical to current, ignoring.",
989                 xds_client());
990       }
991       continue;
992     }
993     // Update the cluster state.
994     cluster_state.update = std::move(cds_update);
995     // Notify all watchers.
996     for (const auto& p : cluster_state.watchers) {
997       p.first->OnClusterChanged(cluster_state.update.value());
998     }
999   }
1000   // For any subscribed resource that is not present in the update,
1001   // remove it from the cache and notify watchers that it does not exist.
1002   for (const auto& p : cds_state.subscribed_resources) {
1003     const std::string& cluster_name = p.first;
1004     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1005       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1006       // If the resource was newly requested but has not yet been received,
1007       // we don't want to generate an error for the watchers, because this CDS
1008       // response may be in reaction to an earlier request that did not yet
1009       // request the new resource, so its absence from the response does not
1010       // necessarily indicate that the resource does not exist.
1011       // For that case, we rely on the request timeout instead.
1012       if (!cluster_state.update.has_value()) continue;
1013       cluster_state.update.reset();
1014       for (const auto& p : cluster_state.watchers) {
1015         p.first->OnResourceDoesNotExist();
1016       }
1017     }
1018   }
1019   // For any EDS resource that is no longer referred to by any CDS
1020   // resources, remove it from the cache and notify watchers that it
1021   // does not exist.
1022   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1023   for (const auto& p : eds_state.subscribed_resources) {
1024     const std::string& eds_resource_name = p.first;
1025     if (eds_resource_names_seen.find(eds_resource_name) ==
1026         eds_resource_names_seen.end()) {
1027       EndpointState& endpoint_state =
1028           xds_client()->endpoint_map_[eds_resource_name];
1029       endpoint_state.update.reset();
1030       for (const auto& p : endpoint_state.watchers) {
1031         p.first->OnResourceDoesNotExist();
1032       }
1033     }
1034   }
1035 }
1036
1037 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1038     XdsApi::EdsUpdateMap eds_update_map) {
1039   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1040     gpr_log(GPR_INFO,
1041             "[xds_client %p] EDS update received containing %" PRIuPTR
1042             " resources",
1043             xds_client(), eds_update_map.size());
1044   }
1045   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1046   for (auto& p : eds_update_map) {
1047     const char* eds_service_name = p.first.c_str();
1048     XdsApi::EdsUpdate& eds_update = p.second;
1049     auto& state = eds_state.subscribed_resources[eds_service_name];
1050     if (state != nullptr) state->Finish();
1051     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1052       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1053               eds_service_name, eds_update.ToString().c_str());
1054     }
1055     EndpointState& endpoint_state =
1056         xds_client()->endpoint_map_[eds_service_name];
1057     // Ignore identical update.
1058     if (endpoint_state.update.has_value() &&
1059         *endpoint_state.update == eds_update) {
1060       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1061         gpr_log(GPR_INFO,
1062                 "[xds_client %p] EDS update identical to current, ignoring.",
1063                 xds_client());
1064       }
1065       continue;
1066     }
1067     // Update the cluster state.
1068     endpoint_state.update = std::move(eds_update);
1069     // Notify all watchers.
1070     for (const auto& p : endpoint_state.watchers) {
1071       p.first->OnEndpointChanged(endpoint_state.update.value());
1072     }
1073   }
1074 }
1075
1076 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1077                                                           grpc_error* error) {
1078   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1079   {
1080     MutexLock lock(&ads_calld->xds_client()->mu_);
1081     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1082   }
1083   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1084 }
1085
1086 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1087     grpc_error* error) {
1088   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1089     // Clean up the sent message.
1090     grpc_byte_buffer_destroy(send_message_payload_);
1091     send_message_payload_ = nullptr;
1092     // Continue to send another pending message if any.
1093     // TODO(roth): The current code to handle buffered messages has the
1094     // advantage of sending only the most recent list of resource names for
1095     // each resource type (no matter how many times that resource type has
1096     // been requested to send while the current message sending is still
1097     // pending). But its disadvantage is that we send the requests in fixed
1098     // order of resource types. We need to fix this if we are seeing some
1099     // resource type(s) starved due to frequent requests of other resource
1100     // type(s).
1101     auto it = buffered_requests_.begin();
1102     if (it != buffered_requests_.end()) {
1103       SendMessageLocked(*it);
1104       buffered_requests_.erase(it);
1105     }
1106   }
1107   GRPC_ERROR_UNREF(error);
1108 }
1109
1110 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1111     void* arg, grpc_error* /* error */) {
1112   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1113   bool done;
1114   {
1115     MutexLock lock(&ads_calld->xds_client()->mu_);
1116     done = ads_calld->OnResponseReceivedLocked();
1117   }
1118   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1119 }
1120
1121 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1122   // Empty payload means the call was cancelled.
1123   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1124     return true;
1125   }
1126   // Read the response.
1127   grpc_byte_buffer_reader bbr;
1128   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1129   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1130   grpc_byte_buffer_reader_destroy(&bbr);
1131   grpc_byte_buffer_destroy(recv_message_payload_);
1132   recv_message_payload_ = nullptr;
1133   // Parse and validate the response.
1134   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1135       response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1136       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1137       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1138       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1139   grpc_slice_unref_internal(response_slice);
1140   if (result.type_url.empty()) {
1141     // Ignore unparsable response.
1142     gpr_log(GPR_ERROR,
1143             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1144             xds_client(), grpc_error_string(result.parse_error));
1145     GRPC_ERROR_UNREF(result.parse_error);
1146   } else {
1147     // Update nonce.
1148     auto& state = state_map_[result.type_url];
1149     state.nonce = std::move(result.nonce);
1150     // NACK or ACK the response.
1151     if (result.parse_error != GRPC_ERROR_NONE) {
1152       GRPC_ERROR_UNREF(state.error);
1153       state.error = result.parse_error;
1154       // NACK unacceptable update.
1155       gpr_log(GPR_ERROR,
1156               "[xds_client %p] ADS response invalid for resource type %s "
1157               "version %s, will NACK: nonce=%s error=%s",
1158               xds_client(), result.type_url.c_str(), result.version.c_str(),
1159               state.nonce.c_str(), grpc_error_string(result.parse_error));
1160       SendMessageLocked(result.type_url);
1161     } else {
1162       seen_response_ = true;
1163       // Accept the ADS response according to the type_url.
1164       if (result.type_url == XdsApi::kLdsTypeUrl) {
1165         AcceptLdsUpdate(std::move(result.lds_update_map));
1166       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1167         AcceptRdsUpdate(std::move(result.rds_update_map));
1168       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1169         AcceptCdsUpdate(std::move(result.cds_update_map));
1170       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1171         AcceptEdsUpdate(std::move(result.eds_update_map));
1172       }
1173       state.version = std::move(result.version);
1174       // ACK the update.
1175       SendMessageLocked(result.type_url);
1176       // Start load reporting if needed.
1177       auto& lrs_call = chand()->lrs_calld_;
1178       if (lrs_call != nullptr) {
1179         LrsCallState* lrs_calld = lrs_call->calld();
1180         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1181       }
1182     }
1183   }
1184   if (xds_client()->shutting_down_) return true;
1185   // Keep listening for updates.
1186   grpc_op op;
1187   memset(&op, 0, sizeof(op));
1188   op.op = GRPC_OP_RECV_MESSAGE;
1189   op.data.recv_message.recv_message = &recv_message_payload_;
1190   op.flags = 0;
1191   op.reserved = nullptr;
1192   GPR_ASSERT(call_ != nullptr);
1193   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1194   const grpc_call_error call_error =
1195       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1196   GPR_ASSERT(GRPC_CALL_OK == call_error);
1197   return false;
1198 }
1199
1200 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1201     void* arg, grpc_error* error) {
1202   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1203   {
1204     MutexLock lock(&ads_calld->xds_client()->mu_);
1205     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1206   }
1207   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1208 }
1209
1210 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1211     grpc_error* error) {
1212   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1213     char* status_details = grpc_slice_to_c_string(status_details_);
1214     gpr_log(GPR_INFO,
1215             "[xds_client %p] ADS call status received. Status = %d, details "
1216             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1217             xds_client(), status_code_, status_details, chand(), this, call_,
1218             grpc_error_string(error));
1219     gpr_free(status_details);
1220   }
1221   // Ignore status from a stale call.
1222   if (IsCurrentCallOnChannel()) {
1223     // Try to restart the call.
1224     parent_->OnCallFinishedLocked();
1225     // Send error to all watchers.
1226     xds_client()->NotifyOnErrorLocked(
1227         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1228   }
1229   GRPC_ERROR_UNREF(error);
1230 }
1231
1232 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1233   // If the retryable ADS call is null (which only happens when the xds channel
1234   // is shutting down), all the ADS calls are stale.
1235   if (chand()->ads_calld_ == nullptr) return false;
1236   return this == chand()->ads_calld_->calld();
1237 }
1238
1239 std::set<absl::string_view>
1240 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1241     const std::string& type_url) {
1242   std::set<absl::string_view> resource_names;
1243   auto it = state_map_.find(type_url);
1244   if (it != state_map_.end()) {
1245     for (auto& p : it->second.subscribed_resources) {
1246       resource_names.insert(p.first);
1247       OrphanablePtr<ResourceState>& state = p.second;
1248       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1249     }
1250   }
1251   return resource_names;
1252 }
1253
1254 //
1255 // XdsClient::ChannelState::LrsCallState::Reporter
1256 //
1257
1258 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1259   if (next_report_timer_callback_pending_) {
1260     grpc_timer_cancel(&next_report_timer_);
1261   }
1262 }
1263
1264 void XdsClient::ChannelState::LrsCallState::Reporter::
1265     ScheduleNextReportLocked() {
1266   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1267   grpc_timer_init(&next_report_timer_, next_report_time,
1268                   &on_next_report_timer_);
1269   next_report_timer_callback_pending_ = true;
1270 }
1271
1272 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1273     void* arg, grpc_error* error) {
1274   Reporter* self = static_cast<Reporter*>(arg);
1275   bool done;
1276   {
1277     MutexLock lock(&self->xds_client()->mu_);
1278     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1279   }
1280   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1281 }
1282
1283 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1284     grpc_error* error) {
1285   next_report_timer_callback_pending_ = false;
1286   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1287     GRPC_ERROR_UNREF(error);
1288     return true;
1289   }
1290   SendReportLocked();
1291   return false;
1292 }
1293
1294 namespace {
1295
1296 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1297   for (const auto& p : snapshot) {
1298     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1299     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1300     for (const auto& q : cluster_snapshot.locality_stats) {
1301       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1302       if (!locality_snapshot.IsZero()) return false;
1303     }
1304   }
1305   return true;
1306 }
1307
1308 }  // namespace
1309
1310 void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1311   // Construct snapshot from all reported stats.
1312   XdsApi::ClusterLoadReportMap snapshot =
1313       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1314                                                   parent_->cluster_names_);
1315   // Skip client load report if the counters were all zero in the last
1316   // report and they are still zero in this one.
1317   const bool old_val = last_report_counters_were_zero_;
1318   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1319   if (old_val && last_report_counters_were_zero_) {
1320     ScheduleNextReportLocked();
1321     return;
1322   }
1323   // Create a request that contains the snapshot.
1324   grpc_slice request_payload_slice =
1325       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1326   parent_->send_message_payload_ =
1327       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1328   grpc_slice_unref_internal(request_payload_slice);
1329   // Send the report.
1330   grpc_op op;
1331   memset(&op, 0, sizeof(op));
1332   op.op = GRPC_OP_SEND_MESSAGE;
1333   op.data.send_message.send_message = parent_->send_message_payload_;
1334   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1335       parent_->call_, &op, 1, &on_report_done_);
1336   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1337     gpr_log(GPR_ERROR,
1338             "[xds_client %p] calld=%p call_error=%d sending client load report",
1339             xds_client(), this, call_error);
1340     GPR_ASSERT(GRPC_CALL_OK == call_error);
1341   }
1342 }
1343
1344 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1345     void* arg, grpc_error* error) {
1346   Reporter* self = static_cast<Reporter*>(arg);
1347   bool done;
1348   {
1349     MutexLock lock(&self->xds_client()->mu_);
1350     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1351   }
1352   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1353 }
1354
1355 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1356     grpc_error* error) {
1357   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1358   parent_->send_message_payload_ = nullptr;
1359   // If there are no more registered stats to report, cancel the call.
1360   if (xds_client()->load_report_map_.empty()) {
1361     parent_->chand()->StopLrsCall();
1362     GRPC_ERROR_UNREF(error);
1363     return true;
1364   }
1365   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1366     GRPC_ERROR_UNREF(error);
1367     // If this reporter is no longer the current one on the call, the reason
1368     // might be that it was orphaned for a new one due to config update.
1369     if (!IsCurrentReporterOnCall()) {
1370       parent_->MaybeStartReportingLocked();
1371     }
1372     return true;
1373   }
1374   ScheduleNextReportLocked();
1375   return false;
1376 }
1377
1378 //
1379 // XdsClient::ChannelState::LrsCallState
1380 //
1381
1382 XdsClient::ChannelState::LrsCallState::LrsCallState(
1383     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1384     : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
1385       parent_(std::move(parent)) {
1386   // Init the LRS call. Note that the call will progress every time there's
1387   // activity in xds_client()->interested_parties_, which is comprised of
1388   // the polling entities from client_channel.
1389   GPR_ASSERT(xds_client() != nullptr);
1390   const auto& method =
1391       xds_client()->bootstrap_->server().ShouldUseV3()
1392           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1393           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1394   call_ = grpc_channel_create_pollset_set_call(
1395       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1396       xds_client()->interested_parties_, method, nullptr,
1397       GRPC_MILLIS_INF_FUTURE, nullptr);
1398   GPR_ASSERT(call_ != nullptr);
1399   // Init the request payload.
1400   grpc_slice request_payload_slice =
1401       xds_client()->api_.CreateLrsInitialRequest();
1402   send_message_payload_ =
1403       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1404   grpc_slice_unref_internal(request_payload_slice);
1405   // Init other data associated with the LRS call.
1406   grpc_metadata_array_init(&initial_metadata_recv_);
1407   grpc_metadata_array_init(&trailing_metadata_recv_);
1408   // Start the call.
1409   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1410     gpr_log(GPR_INFO,
1411             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1412             "call: %p)",
1413             xds_client(), chand(), this, call_);
1414   }
1415   // Create the ops.
1416   grpc_call_error call_error;
1417   grpc_op ops[3];
1418   memset(ops, 0, sizeof(ops));
1419   // Op: send initial metadata.
1420   grpc_op* op = ops;
1421   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1422   op->data.send_initial_metadata.count = 0;
1423   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1424               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1425   op->reserved = nullptr;
1426   op++;
1427   // Op: send request message.
1428   GPR_ASSERT(send_message_payload_ != nullptr);
1429   op->op = GRPC_OP_SEND_MESSAGE;
1430   op->data.send_message.send_message = send_message_payload_;
1431   op->flags = 0;
1432   op->reserved = nullptr;
1433   op++;
1434   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1435   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1436                     grpc_schedule_on_exec_ctx);
1437   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1438                                                  &on_initial_request_sent_);
1439   GPR_ASSERT(GRPC_CALL_OK == call_error);
1440   // Op: recv initial metadata.
1441   op = ops;
1442   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1443   op->data.recv_initial_metadata.recv_initial_metadata =
1444       &initial_metadata_recv_;
1445   op->flags = 0;
1446   op->reserved = nullptr;
1447   op++;
1448   // Op: recv response.
1449   op->op = GRPC_OP_RECV_MESSAGE;
1450   op->data.recv_message.recv_message = &recv_message_payload_;
1451   op->flags = 0;
1452   op->reserved = nullptr;
1453   op++;
1454   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1455   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1456                     grpc_schedule_on_exec_ctx);
1457   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1458                                                  &on_response_received_);
1459   GPR_ASSERT(GRPC_CALL_OK == call_error);
1460   // Op: recv server status.
1461   op = ops;
1462   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1463   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1464   op->data.recv_status_on_client.status = &status_code_;
1465   op->data.recv_status_on_client.status_details = &status_details_;
1466   op->flags = 0;
1467   op->reserved = nullptr;
1468   op++;
1469   // This callback signals the end of the call, so it relies on the initial
1470   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1471   // unreffed.
1472   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1473                     grpc_schedule_on_exec_ctx);
1474   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1475                                                  &on_status_received_);
1476   GPR_ASSERT(GRPC_CALL_OK == call_error);
1477 }
1478
1479 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1480   grpc_metadata_array_destroy(&initial_metadata_recv_);
1481   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1482   grpc_byte_buffer_destroy(send_message_payload_);
1483   grpc_byte_buffer_destroy(recv_message_payload_);
1484   grpc_slice_unref_internal(status_details_);
1485   GPR_ASSERT(call_ != nullptr);
1486   grpc_call_unref(call_);
1487 }
1488
1489 void XdsClient::ChannelState::LrsCallState::Orphan() {
1490   reporter_.reset();
1491   GPR_ASSERT(call_ != nullptr);
1492   // If we are here because xds_client wants to cancel the call,
1493   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1494   // we are here because xds_client has to orphan a failed call, then the
1495   // following cancellation will be a no-op.
1496   grpc_call_cancel_internal(call_);
1497   // Note that the initial ref is hold by on_status_received_. So the
1498   // corresponding unref happens in on_status_received_ instead of here.
1499 }
1500
1501 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1502   // Don't start again if already started.
1503   if (reporter_ != nullptr) return;
1504   // Don't start if the previous send_message op (of the initial request or the
1505   // last report of the previous reporter) hasn't completed.
1506   if (send_message_payload_ != nullptr) return;
1507   // Don't start if no LRS response has arrived.
1508   if (!seen_response()) return;
1509   // Don't start if the ADS call hasn't received any valid response. Note that
1510   // this must be the first channel because it is the current channel but its
1511   // ADS call hasn't seen any response.
1512   if (chand()->ads_calld_ == nullptr ||
1513       chand()->ads_calld_->calld() == nullptr ||
1514       !chand()->ads_calld_->calld()->seen_response()) {
1515     return;
1516   }
1517   // Start reporting.
1518   reporter_ = MakeOrphanable<Reporter>(
1519       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1520 }
1521
1522 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1523     void* arg, grpc_error* /*error*/) {
1524   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1525   {
1526     MutexLock lock(&lrs_calld->xds_client()->mu_);
1527     lrs_calld->OnInitialRequestSentLocked();
1528   }
1529   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1530 }
1531
1532 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1533   // Clear the send_message_payload_.
1534   grpc_byte_buffer_destroy(send_message_payload_);
1535   send_message_payload_ = nullptr;
1536   MaybeStartReportingLocked();
1537 }
1538
1539 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1540     void* arg, grpc_error* /*error*/) {
1541   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1542   bool done;
1543   {
1544     MutexLock lock(&lrs_calld->xds_client()->mu_);
1545     done = lrs_calld->OnResponseReceivedLocked();
1546   }
1547   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1548 }
1549
1550 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1551   // Empty payload means the call was cancelled.
1552   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1553     return true;
1554   }
1555   // Read the response.
1556   grpc_byte_buffer_reader bbr;
1557   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1558   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1559   grpc_byte_buffer_reader_destroy(&bbr);
1560   grpc_byte_buffer_destroy(recv_message_payload_);
1561   recv_message_payload_ = nullptr;
1562   // This anonymous lambda is a hack to avoid the usage of goto.
1563   [&]() {
1564     // Parse the response.
1565     bool send_all_clusters = false;
1566     std::set<std::string> new_cluster_names;
1567     grpc_millis new_load_reporting_interval;
1568     grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1569         response_slice, &send_all_clusters, &new_cluster_names,
1570         &new_load_reporting_interval);
1571     if (parse_error != GRPC_ERROR_NONE) {
1572       gpr_log(GPR_ERROR,
1573               "[xds_client %p] LRS response parsing failed. error=%s",
1574               xds_client(), grpc_error_string(parse_error));
1575       GRPC_ERROR_UNREF(parse_error);
1576       return;
1577     }
1578     seen_response_ = true;
1579     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1580       gpr_log(
1581           GPR_INFO,
1582           "[xds_client %p] LRS response received, %" PRIuPTR
1583           " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1584           "ms",
1585           xds_client(), new_cluster_names.size(), send_all_clusters,
1586           new_load_reporting_interval);
1587       size_t i = 0;
1588       for (const auto& name : new_cluster_names) {
1589         gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1590                 xds_client(), i++, name.c_str());
1591       }
1592     }
1593     if (new_load_reporting_interval <
1594         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1595       new_load_reporting_interval =
1596           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1597       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1598         gpr_log(GPR_INFO,
1599                 "[xds_client %p] Increased load_report_interval to minimum "
1600                 "value %dms",
1601                 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1602       }
1603     }
1604     // Ignore identical update.
1605     if (send_all_clusters == send_all_clusters_ &&
1606         cluster_names_ == new_cluster_names &&
1607         load_reporting_interval_ == new_load_reporting_interval) {
1608       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1609         gpr_log(GPR_INFO,
1610                 "[xds_client %p] Incoming LRS response identical to current, "
1611                 "ignoring.",
1612                 xds_client());
1613       }
1614       return;
1615     }
1616     // Stop current load reporting (if any) to adopt the new config.
1617     reporter_.reset();
1618     // Record the new config.
1619     send_all_clusters_ = send_all_clusters;
1620     cluster_names_ = std::move(new_cluster_names);
1621     load_reporting_interval_ = new_load_reporting_interval;
1622     // Try starting sending load report.
1623     MaybeStartReportingLocked();
1624   }();
1625   grpc_slice_unref_internal(response_slice);
1626   if (xds_client()->shutting_down_) return true;
1627   // Keep listening for LRS config updates.
1628   grpc_op op;
1629   memset(&op, 0, sizeof(op));
1630   op.op = GRPC_OP_RECV_MESSAGE;
1631   op.data.recv_message.recv_message = &recv_message_payload_;
1632   op.flags = 0;
1633   op.reserved = nullptr;
1634   GPR_ASSERT(call_ != nullptr);
1635   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1636   const grpc_call_error call_error =
1637       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1638   GPR_ASSERT(GRPC_CALL_OK == call_error);
1639   return false;
1640 }
1641
1642 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1643     void* arg, grpc_error* error) {
1644   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1645   {
1646     MutexLock lock(&lrs_calld->xds_client()->mu_);
1647     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1648   }
1649   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1650 }
1651
1652 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1653     grpc_error* error) {
1654   GPR_ASSERT(call_ != nullptr);
1655   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1656     char* status_details = grpc_slice_to_c_string(status_details_);
1657     gpr_log(GPR_INFO,
1658             "[xds_client %p] LRS call status received. Status = %d, details "
1659             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1660             xds_client(), status_code_, status_details, chand(), this, call_,
1661             grpc_error_string(error));
1662     gpr_free(status_details);
1663   }
1664   // Ignore status from a stale call.
1665   if (IsCurrentCallOnChannel()) {
1666     GPR_ASSERT(!xds_client()->shutting_down_);
1667     // Try to restart the call.
1668     parent_->OnCallFinishedLocked();
1669   }
1670   GRPC_ERROR_UNREF(error);
1671 }
1672
1673 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1674   // If the retryable LRS call is null (which only happens when the xds channel
1675   // is shutting down), all the LRS calls are stale.
1676   if (chand()->lrs_calld_ == nullptr) return false;
1677   return this == chand()->lrs_calld_->calld();
1678 }
1679
1680 //
1681 // XdsClient
1682 //
1683
1684 namespace {
1685
1686 grpc_millis GetRequestTimeout() {
1687   return grpc_channel_args_find_integer(
1688       g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1689       {15000, 0, INT_MAX});
1690 }
1691
1692 grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
1693                                grpc_error** error) {
1694   // Build channel args.
1695   absl::InlinedVector<grpc_arg, 2> args_to_add = {
1696       grpc_channel_arg_integer_create(
1697           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
1698           5 * 60 * GPR_MS_PER_SEC),
1699       grpc_channel_arg_integer_create(
1700           const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1701   };
1702   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
1703       g_channel_args, args_to_add.data(), args_to_add.size());
1704   // Find credentials and create channel.
1705   RefCountedPtr<grpc_channel_credentials> creds;
1706   for (const auto& channel_creds : bootstrap.server().channel_creds) {
1707     if (channel_creds.type == "google_default") {
1708       creds.reset(grpc_google_default_credentials_create(nullptr));
1709       break;
1710     }
1711     if (channel_creds.type == "insecure") {
1712       grpc_channel* channel = grpc_insecure_channel_create(
1713           bootstrap.server().server_uri.c_str(), new_args, nullptr);
1714       grpc_channel_args_destroy(new_args);
1715       return channel;
1716     }
1717     if (channel_creds.type == "fake") {
1718       creds.reset(grpc_fake_transport_security_credentials_create());
1719       break;
1720     }
1721   }
1722   if (creds == nullptr) {
1723     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1724         "no supported credential types found");
1725     return nullptr;
1726   }
1727   grpc_channel* channel = grpc_secure_channel_create(
1728       creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr);
1729   grpc_channel_args_destroy(new_args);
1730   return channel;
1731 }
1732
1733 }  // namespace
1734
1735 XdsClient::XdsClient(grpc_error** error)
1736     : DualRefCounted<XdsClient>(&grpc_xds_client_trace),
1737       request_timeout_(GetRequestTimeout()),
1738       interested_parties_(grpc_pollset_set_create()),
1739       bootstrap_(
1740           XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1741       api_(this, &grpc_xds_client_trace, bootstrap_.get()) {
1742   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1743     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1744   }
1745   if (*error != GRPC_ERROR_NONE) {
1746     gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1747             this, grpc_error_string(*error));
1748     return;
1749   }
1750   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1751     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
1752             bootstrap_->server().server_uri.c_str());
1753   }
1754   grpc_channel* channel = CreateXdsChannel(*bootstrap_, error);
1755   if (*error != GRPC_ERROR_NONE) {
1756     gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
1757             grpc_error_string(*error));
1758     return;
1759   }
1760   // Create ChannelState object.
1761   chand_ = MakeOrphanable<ChannelState>(
1762       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
1763 }
1764
1765 XdsClient::~XdsClient() {
1766   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1767     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1768   }
1769   grpc_pollset_set_destroy(interested_parties_);
1770 }
1771
1772 void XdsClient::AddChannelzLinkage(
1773     channelz::ChannelNode* parent_channelz_node) {
1774   channelz::ChannelNode* xds_channelz_node =
1775       grpc_channel_get_channelz_node(chand_->channel());
1776   if (xds_channelz_node != nullptr) {
1777     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1778   }
1779 }
1780
1781 void XdsClient::RemoveChannelzLinkage(
1782     channelz::ChannelNode* parent_channelz_node) {
1783   channelz::ChannelNode* xds_channelz_node =
1784       grpc_channel_get_channelz_node(chand_->channel());
1785   if (xds_channelz_node != nullptr) {
1786     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1787   }
1788 }
1789
1790 void XdsClient::Orphan() {
1791   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1792     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1793   }
1794   {
1795     MutexLock lock(g_mu);
1796     if (g_xds_client == this) g_xds_client = nullptr;
1797   }
1798   {
1799     MutexLock lock(&mu_);
1800     shutting_down_ = true;
1801     // Orphan ChannelState object.
1802     chand_.reset();
1803     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1804     // created by the XdsResolver because the maps contain refs for watchers
1805     // which in turn hold refs to the loadbalancing policies. At this point, it
1806     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1807     // policies before those calls are done would lead to issues such as
1808     // https://github.com/grpc/grpc/issues/20928.
1809     if (!listener_map_.empty()) {
1810       cluster_map_.clear();
1811       endpoint_map_.clear();
1812     }
1813   }
1814 }
1815
1816 void XdsClient::WatchListenerData(
1817     absl::string_view listener_name,
1818     std::unique_ptr<ListenerWatcherInterface> watcher) {
1819   std::string listener_name_str = std::string(listener_name);
1820   MutexLock lock(&mu_);
1821   ListenerState& listener_state = listener_map_[listener_name_str];
1822   ListenerWatcherInterface* w = watcher.get();
1823   listener_state.watchers[w] = std::move(watcher);
1824   // If we've already received an LDS update, notify the new watcher
1825   // immediately.
1826   if (listener_state.update.has_value()) {
1827     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1828       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1829               this, listener_name_str.c_str());
1830     }
1831     w->OnListenerChanged(*listener_state.update);
1832   }
1833   chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1834 }
1835
1836 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1837                                         ListenerWatcherInterface* watcher,
1838                                         bool delay_unsubscription) {
1839   MutexLock lock(&mu_);
1840   if (shutting_down_) return;
1841   std::string listener_name_str = std::string(listener_name);
1842   ListenerState& listener_state = listener_map_[listener_name_str];
1843   auto it = listener_state.watchers.find(watcher);
1844   if (it != listener_state.watchers.end()) {
1845     listener_state.watchers.erase(it);
1846     if (listener_state.watchers.empty()) {
1847       listener_map_.erase(listener_name_str);
1848       chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1849                           delay_unsubscription);
1850     }
1851   }
1852 }
1853
1854 void XdsClient::WatchRouteConfigData(
1855     absl::string_view route_config_name,
1856     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1857   std::string route_config_name_str = std::string(route_config_name);
1858   MutexLock lock(&mu_);
1859   RouteConfigState& route_config_state =
1860       route_config_map_[route_config_name_str];
1861   RouteConfigWatcherInterface* w = watcher.get();
1862   route_config_state.watchers[w] = std::move(watcher);
1863   // If we've already received an RDS update, notify the new watcher
1864   // immediately.
1865   if (route_config_state.update.has_value()) {
1866     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1867       gpr_log(GPR_INFO,
1868               "[xds_client %p] returning cached route config data for %s", this,
1869               route_config_name_str.c_str());
1870     }
1871     w->OnRouteConfigChanged(*route_config_state.update);
1872   }
1873   chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1874 }
1875
1876 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1877                                            RouteConfigWatcherInterface* watcher,
1878                                            bool delay_unsubscription) {
1879   MutexLock lock(&mu_);
1880   if (shutting_down_) return;
1881   std::string route_config_name_str = std::string(route_config_name);
1882   RouteConfigState& route_config_state =
1883       route_config_map_[route_config_name_str];
1884   auto it = route_config_state.watchers.find(watcher);
1885   if (it != route_config_state.watchers.end()) {
1886     route_config_state.watchers.erase(it);
1887     if (route_config_state.watchers.empty()) {
1888       route_config_map_.erase(route_config_name_str);
1889       chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1890                           delay_unsubscription);
1891     }
1892   }
1893 }
1894
1895 void XdsClient::WatchClusterData(
1896     absl::string_view cluster_name,
1897     std::unique_ptr<ClusterWatcherInterface> watcher) {
1898   std::string cluster_name_str = std::string(cluster_name);
1899   MutexLock lock(&mu_);
1900   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1901   ClusterWatcherInterface* w = watcher.get();
1902   cluster_state.watchers[w] = std::move(watcher);
1903   // If we've already received a CDS update, notify the new watcher
1904   // immediately.
1905   if (cluster_state.update.has_value()) {
1906     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1907       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1908               this, cluster_name_str.c_str());
1909     }
1910     w->OnClusterChanged(cluster_state.update.value());
1911   }
1912   chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1913 }
1914
1915 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1916                                        ClusterWatcherInterface* watcher,
1917                                        bool delay_unsubscription) {
1918   MutexLock lock(&mu_);
1919   if (shutting_down_) return;
1920   std::string cluster_name_str = std::string(cluster_name);
1921   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1922   auto it = cluster_state.watchers.find(watcher);
1923   if (it != cluster_state.watchers.end()) {
1924     cluster_state.watchers.erase(it);
1925     if (cluster_state.watchers.empty()) {
1926       cluster_map_.erase(cluster_name_str);
1927       chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1928                           delay_unsubscription);
1929     }
1930   }
1931 }
1932
1933 void XdsClient::WatchEndpointData(
1934     absl::string_view eds_service_name,
1935     std::unique_ptr<EndpointWatcherInterface> watcher) {
1936   std::string eds_service_name_str = std::string(eds_service_name);
1937   MutexLock lock(&mu_);
1938   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1939   EndpointWatcherInterface* w = watcher.get();
1940   endpoint_state.watchers[w] = std::move(watcher);
1941   // If we've already received an EDS update, notify the new watcher
1942   // immediately.
1943   if (endpoint_state.update.has_value()) {
1944     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1945       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1946               this, eds_service_name_str.c_str());
1947     }
1948     w->OnEndpointChanged(endpoint_state.update.value());
1949   }
1950   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1951 }
1952
1953 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1954                                         EndpointWatcherInterface* watcher,
1955                                         bool delay_unsubscription) {
1956   MutexLock lock(&mu_);
1957   if (shutting_down_) return;
1958   std::string eds_service_name_str = std::string(eds_service_name);
1959   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1960   auto it = endpoint_state.watchers.find(watcher);
1961   if (it != endpoint_state.watchers.end()) {
1962     endpoint_state.watchers.erase(it);
1963     if (endpoint_state.watchers.empty()) {
1964       endpoint_map_.erase(eds_service_name_str);
1965       chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1966                           delay_unsubscription);
1967     }
1968   }
1969 }
1970
1971 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1972     absl::string_view lrs_server, absl::string_view cluster_name,
1973     absl::string_view eds_service_name) {
1974   // TODO(roth): When we add support for direct federation, use the
1975   // server name specified in lrs_server.
1976   auto key =
1977       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1978   MutexLock lock(&mu_);
1979   // We jump through some hoops here to make sure that the absl::string_views
1980   // stored in the XdsClusterDropStats object point to the strings
1981   // in the load_report_map_ key, so that they have the same lifetime.
1982   auto it = load_report_map_
1983                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1984                 .first;
1985   auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1986       Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1987       it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
1988   it->second.drop_stats.insert(cluster_drop_stats.get());
1989   chand_->MaybeStartLrsCall();
1990   return cluster_drop_stats;
1991 }
1992
1993 void XdsClient::RemoveClusterDropStats(
1994     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1995     absl::string_view eds_service_name,
1996     XdsClusterDropStats* cluster_drop_stats) {
1997   MutexLock lock(&mu_);
1998   auto load_report_it = load_report_map_.find(
1999       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2000   if (load_report_it == load_report_map_.end()) return;
2001   LoadReportState& load_report_state = load_report_it->second;
2002   // TODO(roth): When we add support for direct federation, use the
2003   // server name specified in lrs_server.
2004   auto it = load_report_state.drop_stats.find(cluster_drop_stats);
2005   if (it != load_report_state.drop_stats.end()) {
2006     // Record final drop stats in deleted_drop_stats, which will be
2007     // added to the next load report.
2008     auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
2009     load_report_state.deleted_drop_stats += dropped_requests;
2010     load_report_state.drop_stats.erase(it);
2011   }
2012 }
2013
2014 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2015     absl::string_view lrs_server, absl::string_view cluster_name,
2016     absl::string_view eds_service_name,
2017     RefCountedPtr<XdsLocalityName> locality) {
2018   // TODO(roth): When we add support for direct federation, use the
2019   // server name specified in lrs_server.
2020   auto key =
2021       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2022   MutexLock lock(&mu_);
2023   // We jump through some hoops here to make sure that the absl::string_views
2024   // stored in the XdsClusterLocalityStats object point to the strings
2025   // in the load_report_map_ key, so that they have the same lifetime.
2026   auto it = load_report_map_
2027                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2028                 .first;
2029   auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2030       Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2031       it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2032       locality);
2033   it->second.locality_stats[std::move(locality)].locality_stats.insert(
2034       cluster_locality_stats.get());
2035   chand_->MaybeStartLrsCall();
2036   return cluster_locality_stats;
2037 }
2038
2039 void XdsClient::RemoveClusterLocalityStats(
2040     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2041     absl::string_view eds_service_name,
2042     const RefCountedPtr<XdsLocalityName>& locality,
2043     XdsClusterLocalityStats* cluster_locality_stats) {
2044   MutexLock lock(&mu_);
2045   auto load_report_it = load_report_map_.find(
2046       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2047   if (load_report_it == load_report_map_.end()) return;
2048   LoadReportState& load_report_state = load_report_it->second;
2049   // TODO(roth): When we add support for direct federation, use the
2050   // server name specified in lrs_server.
2051   auto locality_it = load_report_state.locality_stats.find(locality);
2052   if (locality_it == load_report_state.locality_stats.end()) return;
2053   auto& locality_set = locality_it->second.locality_stats;
2054   auto it = locality_set.find(cluster_locality_stats);
2055   if (it != locality_set.end()) {
2056     // Record final snapshot in deleted_locality_stats, which will be
2057     // added to the next load report.
2058     locality_it->second.deleted_locality_stats.emplace_back(
2059         cluster_locality_stats->GetSnapshotAndReset());
2060     locality_set.erase(it);
2061   }
2062 }
2063
2064 void XdsClient::ResetBackoff() {
2065   MutexLock lock(&mu_);
2066   if (chand_ != nullptr) {
2067     grpc_channel_reset_connect_backoff(chand_->channel());
2068   }
2069 }
2070
2071 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2072   for (const auto& p : listener_map_) {
2073     const ListenerState& listener_state = p.second;
2074     for (const auto& p : listener_state.watchers) {
2075       p.first->OnError(GRPC_ERROR_REF(error));
2076     }
2077   }
2078   for (const auto& p : route_config_map_) {
2079     const RouteConfigState& route_config_state = p.second;
2080     for (const auto& p : route_config_state.watchers) {
2081       p.first->OnError(GRPC_ERROR_REF(error));
2082     }
2083   }
2084   for (const auto& p : cluster_map_) {
2085     const ClusterState& cluster_state = p.second;
2086     for (const auto& p : cluster_state.watchers) {
2087       p.first->OnError(GRPC_ERROR_REF(error));
2088     }
2089   }
2090   for (const auto& p : endpoint_map_) {
2091     const EndpointState& endpoint_state = p.second;
2092     for (const auto& p : endpoint_state.watchers) {
2093       p.first->OnError(GRPC_ERROR_REF(error));
2094     }
2095   }
2096   GRPC_ERROR_UNREF(error);
2097 }
2098
2099 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2100     bool send_all_clusters, const std::set<std::string>& clusters) {
2101   XdsApi::ClusterLoadReportMap snapshot_map;
2102   for (auto load_report_it = load_report_map_.begin();
2103        load_report_it != load_report_map_.end();) {
2104     // Cluster key is cluster and EDS service name.
2105     const auto& cluster_key = load_report_it->first;
2106     LoadReportState& load_report = load_report_it->second;
2107     // If the CDS response for a cluster indicates to use LRS but the
2108     // LRS server does not say that it wants reports for this cluster,
2109     // then we'll have stats objects here whose data we're not going to
2110     // include in the load report.  However, we still need to clear out
2111     // the data from the stats objects, so that if the LRS server starts
2112     // asking for the data in the future, we don't incorrectly include
2113     // data from previous reporting intervals in that future report.
2114     const bool record_stats =
2115         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2116     XdsApi::ClusterLoadReport snapshot;
2117     // Aggregate drop stats.
2118     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2119     for (auto& drop_stats : load_report.drop_stats) {
2120       auto dropped_requests = drop_stats->GetSnapshotAndReset();
2121       snapshot.dropped_requests += dropped_requests;
2122     }
2123     // Aggregate locality stats.
2124     for (auto it = load_report.locality_stats.begin();
2125          it != load_report.locality_stats.end();) {
2126       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2127       auto& locality_state = it->second;
2128       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2129           snapshot.locality_stats[locality_name];
2130       for (auto& locality_stats : locality_state.locality_stats) {
2131         locality_snapshot += locality_stats->GetSnapshotAndReset();
2132       }
2133       // Add final snapshots from recently deleted locality stats objects.
2134       for (auto& deleted_locality_stats :
2135            locality_state.deleted_locality_stats) {
2136         locality_snapshot += deleted_locality_stats;
2137       }
2138       locality_state.deleted_locality_stats.clear();
2139       // If the only thing left in this entry was final snapshots from
2140       // deleted locality stats objects, remove the entry.
2141       if (locality_state.locality_stats.empty()) {
2142         it = load_report.locality_stats.erase(it);
2143       } else {
2144         ++it;
2145       }
2146     }
2147     if (record_stats) {
2148       // Compute load report interval.
2149       const grpc_millis now = ExecCtx::Get()->Now();
2150       snapshot.load_report_interval = now - load_report.last_report_time;
2151       load_report.last_report_time = now;
2152       // Record snapshot.
2153       snapshot_map[cluster_key] = std::move(snapshot);
2154     }
2155     // If the only thing left in this entry was final snapshots from
2156     // deleted stats objects, remove the entry.
2157     if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
2158       load_report_it = load_report_map_.erase(load_report_it);
2159     } else {
2160       ++load_report_it;
2161     }
2162   }
2163   return snapshot_map;
2164 }
2165
2166 //
2167 // accessors for global state
2168 //
2169
2170 void XdsClientGlobalInit() { g_mu = new Mutex; }
2171
2172 void XdsClientGlobalShutdown() {
2173   delete g_mu;
2174   g_mu = nullptr;
2175 }
2176
2177 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2178   MutexLock lock(g_mu);
2179   if (g_xds_client != nullptr) {
2180     auto xds_client = g_xds_client->RefIfNonZero();
2181     if (xds_client != nullptr) return xds_client;
2182   }
2183   auto xds_client = MakeRefCounted<XdsClient>(error);
2184   g_xds_client = xds_client.get();
2185   return xds_client;
2186 }
2187
2188 namespace internal {
2189
2190 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2191   MutexLock lock(g_mu);
2192   g_channel_args = args;
2193 }
2194
2195 void UnsetGlobalXdsClientForTest() {
2196   MutexLock lock(g_mu);
2197   g_xds_client = nullptr;
2198 }
2199
2200 }  // namespace internal
2201
2202 }  // namespace grpc_core