Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.cc
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_client.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_channel_args.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/ext/xds/xds_http_filters.h"
42 #include "src/core/lib/address_utils/sockaddr_utils.h"
43 #include "src/core/lib/backoff/backoff.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/channel/channel_stack.h"
46 #include "src/core/lib/gpr/env.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/memory.h"
49 #include "src/core/lib/gprpp/orphanable.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/gprpp/sync.h"
52 #include "src/core/lib/iomgr/sockaddr.h"
53 #include "src/core/lib/iomgr/timer.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/slice/slice_string_helpers.h"
56 #include "src/core/lib/surface/call.h"
57 #include "src/core/lib/surface/channel.h"
58 #include "src/core/lib/surface/channel_init.h"
59 #include "src/core/lib/transport/static_metadata.h"
60
// Backoff parameters used when a failed xds call has to be restarted. They
// are fed into BackOff::Options by the RetryableCall<> constructor; the two
// *_SECONDS values are multiplied by 1000 at that point of use.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in this part of the file; presumably the lower
// bound (in milliseconds) applied to the LRS load reporting interval --
// confirm against its use site.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
66
67 namespace grpc_core {
68
// Tracer that gates the "[xds_client %p] ..." log statements in this file.
// Disabled by default.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Tracer that, when enabled, makes the ref-counted xds objects (e.g.
// ChannelState, AdsCallState) pass a trace name to their
// InternallyRefCounted<> base instead of nullptr. Disabled by default.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
71
namespace {

// Mutex guarding the file-local globals below. It starts out null, so it has
// to be allocated before first use.
// NOTE(review): the initialization site is not visible in this part of the
// file -- confirm where g_mu is created.
Mutex* g_mu = nullptr;
// NOTE(review): presumably channel args to apply when creating the XdsClient
// -- verify against the setter/consumer elsewhere in the file.
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
// NOTE(review): presumably the process-wide XdsClient instance (raw pointer,
// no ref held by this variable) -- verify against its users.
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
// NOTE(review): presumably a bootstrap config used when none is found through
// the normal lookup -- verify; ownership of the char buffer is not visible
// here.
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;

}  // namespace
80
81 //
82 // Internal class declarations
83 //
84
85 // An xds call wrapper that can restart a call upon failure. Holds a ref to
86 // the xds channel. The template parameter is the kind of wrapped xds call.
87 template <typename T>
88 class XdsClient::ChannelState::RetryableCall
89     : public InternallyRefCounted<RetryableCall<T>> {
90  public:
91   explicit RetryableCall(RefCountedPtr<ChannelState> chand);
92
93   void Orphan() override;
94
95   void OnCallFinishedLocked();
96
97   T* calld() const { return calld_.get(); }
98   ChannelState* chand() const { return chand_.get(); }
99
100   bool IsCurrentCallOnChannel() const;
101
102  private:
103   void StartNewCallLocked();
104   void StartRetryTimerLocked();
105   static void OnRetryTimer(void* arg, grpc_error_handle error);
106   void OnRetryTimerLocked(grpc_error_handle error);
107
108   // The wrapped xds call that talks to the xds server. It's instantiated
109   // every time we start a new call. It's null during call retry backoff.
110   OrphanablePtr<T> calld_;
111   // The owning xds channel.
112   RefCountedPtr<ChannelState> chand_;
113
114   // Retry state.
115   BackOff backoff_;
116   grpc_timer retry_timer_;
117   grpc_closure on_retry_timer_;
118   bool retry_timer_callback_pending_ = false;
119
120   bool shutting_down_ = false;
121 };
122
123 // Contains an ADS call to the xds server.
124 class XdsClient::ChannelState::AdsCallState
125     : public InternallyRefCounted<AdsCallState> {
126  public:
127   // The ctor and dtor should not be used directly.
128   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
129   ~AdsCallState() override;
130
131   void Orphan() override;
132
133   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
134   ChannelState* chand() const { return parent_->chand(); }
135   XdsClient* xds_client() const { return chand()->xds_client(); }
136   bool seen_response() const { return seen_response_; }
137
138   void SubscribeLocked(const std::string& type_url, const std::string& name)
139       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
140   void UnsubscribeLocked(const std::string& type_url, const std::string& name,
141                          bool delay_unsubscription)
142       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
143
144   bool HasSubscribedResources() const;
145
146  private:
147   class ResourceState : public InternallyRefCounted<ResourceState> {
148    public:
149     ResourceState(const std::string& type_url, const std::string& name,
150                   bool sent_initial_request)
151         : type_url_(type_url),
152           name_(name),
153           sent_initial_request_(sent_initial_request) {
154       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
155                         grpc_schedule_on_exec_ctx);
156     }
157
158     void Orphan() override {
159       Finish();
160       Unref(DEBUG_LOCATION, "Orphan");
161     }
162
163     void Start(RefCountedPtr<AdsCallState> ads_calld) {
164       if (sent_initial_request_) return;
165       sent_initial_request_ = true;
166       ads_calld_ = std::move(ads_calld);
167       Ref(DEBUG_LOCATION, "timer").release();
168       timer_pending_ = true;
169       grpc_timer_init(
170           &timer_,
171           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
172           &timer_callback_);
173     }
174
175     void Finish() {
176       if (timer_pending_) {
177         grpc_timer_cancel(&timer_);
178         timer_pending_ = false;
179       }
180     }
181
182    private:
183     static void OnTimer(void* arg, grpc_error_handle error) {
184       ResourceState* self = static_cast<ResourceState*>(arg);
185       {
186         MutexLock lock(&self->ads_calld_->xds_client()->mu_);
187         self->OnTimerLocked(GRPC_ERROR_REF(error));
188       }
189       self->ads_calld_.reset();
190       self->Unref(DEBUG_LOCATION, "timer");
191     }
192
193     void OnTimerLocked(grpc_error_handle error)
194         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
195       if (error == GRPC_ERROR_NONE && timer_pending_) {
196         timer_pending_ = false;
197         grpc_error_handle watcher_error =
198             GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
199                 "timeout obtaining resource {type=%s name=%s} from xds server",
200                 type_url_, name_));
201         watcher_error = grpc_error_set_int(
202             watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
203         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
204           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
205                   grpc_error_std_string(watcher_error).c_str());
206         }
207         if (type_url_ == XdsApi::kLdsTypeUrl) {
208           ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
209           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
210           for (const auto& p : state.watchers) {
211             p.first->OnError(GRPC_ERROR_REF(watcher_error));
212           }
213         } else if (type_url_ == XdsApi::kRdsTypeUrl) {
214           RouteConfigState& state =
215               ads_calld_->xds_client()->route_config_map_[name_];
216           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
217           for (const auto& p : state.watchers) {
218             p.first->OnError(GRPC_ERROR_REF(watcher_error));
219           }
220         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
221           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
222           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
223           for (const auto& p : state.watchers) {
224             p.first->OnError(GRPC_ERROR_REF(watcher_error));
225           }
226         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
227           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
228           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
229           for (const auto& p : state.watchers) {
230             p.first->OnError(GRPC_ERROR_REF(watcher_error));
231           }
232         } else {
233           GPR_UNREACHABLE_CODE(return );
234         }
235         GRPC_ERROR_UNREF(watcher_error);
236       }
237       GRPC_ERROR_UNREF(error);
238     }
239
240     const std::string type_url_;
241     const std::string name_;
242
243     RefCountedPtr<AdsCallState> ads_calld_;
244     bool sent_initial_request_;
245     bool timer_pending_ = false;
246     grpc_timer timer_;
247     grpc_closure timer_callback_;
248   };
249
250   struct ResourceTypeState {
251     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
252
253     // Nonce and error for this resource type.
254     std::string nonce;
255     grpc_error_handle error = GRPC_ERROR_NONE;
256
257     // Subscribed resources of this type.
258     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
259         subscribed_resources;
260   };
261
262   void SendMessageLocked(const std::string& type_url)
263       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
264
265   void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
266                              XdsApi::LdsUpdateMap lds_update_map,
267                              const std::set<std::string>& resource_names_failed)
268       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
269   void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
270                              XdsApi::RdsUpdateMap rds_update_map)
271       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
272   void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
273                              XdsApi::CdsUpdateMap cds_update_map,
274                              const std::set<std::string>& resource_names_failed)
275       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
276   void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
277                              XdsApi::EdsUpdateMap eds_update_map)
278       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
279
280   template <typename StateMap>
281   void RejectAdsUpdateLocked(grpc_millis update_time,
282                              const XdsApi::AdsParseResult& result,
283                              StateMap* state_map)
284       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
285
286   static void OnRequestSent(void* arg, grpc_error_handle error);
287   void OnRequestSentLocked(grpc_error_handle error)
288       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
289   static void OnResponseReceived(void* arg, grpc_error_handle error);
290   bool OnResponseReceivedLocked()
291       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
292   static void OnStatusReceived(void* arg, grpc_error_handle error);
293   void OnStatusReceivedLocked(grpc_error_handle error)
294       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
295
296   bool IsCurrentCallOnChannel() const;
297
298   std::set<absl::string_view> ResourceNamesForRequest(
299       const std::string& type_url);
300
301   // The owning RetryableCall<>.
302   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
303
304   bool sent_initial_message_ = false;
305   bool seen_response_ = false;
306
307   // Always non-NULL.
308   grpc_call* call_;
309
310   // recv_initial_metadata
311   grpc_metadata_array initial_metadata_recv_;
312
313   // send_message
314   grpc_byte_buffer* send_message_payload_ = nullptr;
315   grpc_closure on_request_sent_;
316
317   // recv_message
318   grpc_byte_buffer* recv_message_payload_ = nullptr;
319   grpc_closure on_response_received_;
320
321   // recv_trailing_metadata
322   grpc_metadata_array trailing_metadata_recv_;
323   grpc_status_code status_code_;
324   grpc_slice status_details_;
325   grpc_closure on_status_received_;
326
327   // Resource types for which requests need to be sent.
328   std::set<std::string /*type_url*/> buffered_requests_;
329
330   // State for each resource type.
331   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
332 };
333
334 // Contains an LRS call to the xds server.
335 class XdsClient::ChannelState::LrsCallState
336     : public InternallyRefCounted<LrsCallState> {
337  public:
338   // The ctor and dtor should not be used directly.
339   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
340   ~LrsCallState() override;
341
342   void Orphan() override;
343
344   void MaybeStartReportingLocked();
345
346   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
347   ChannelState* chand() const { return parent_->chand(); }
348   XdsClient* xds_client() const { return chand()->xds_client(); }
349   bool seen_response() const { return seen_response_; }
350
351  private:
352   // Reports client-side load stats according to a fixed interval.
353   class Reporter : public InternallyRefCounted<Reporter> {
354    public:
355     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
356         : parent_(std::move(parent)), report_interval_(report_interval) {
357       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
358                         grpc_schedule_on_exec_ctx);
359       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
360                         grpc_schedule_on_exec_ctx);
361       ScheduleNextReportLocked();
362     }
363
364     void Orphan() override;
365
366    private:
367     void ScheduleNextReportLocked()
368         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
369     static void OnNextReportTimer(void* arg, grpc_error_handle error);
370     bool OnNextReportTimerLocked(grpc_error_handle error)
371         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
372     bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
373     static void OnReportDone(void* arg, grpc_error_handle error);
374     bool OnReportDoneLocked(grpc_error_handle error)
375         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
376
377     bool IsCurrentReporterOnCall() const {
378       return this == parent_->reporter_.get();
379     }
380     XdsClient* xds_client() const { return parent_->xds_client(); }
381
382     // The owning LRS call.
383     RefCountedPtr<LrsCallState> parent_;
384
385     // The load reporting state.
386     const grpc_millis report_interval_;
387     bool last_report_counters_were_zero_ = false;
388     bool next_report_timer_callback_pending_ = false;
389     grpc_timer next_report_timer_;
390     grpc_closure on_next_report_timer_;
391     grpc_closure on_report_done_;
392   };
393
394   static void OnInitialRequestSent(void* arg, grpc_error_handle error);
395   void OnInitialRequestSentLocked()
396       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
397   static void OnResponseReceived(void* arg, grpc_error_handle error);
398   bool OnResponseReceivedLocked()
399       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
400   static void OnStatusReceived(void* arg, grpc_error_handle error);
401   void OnStatusReceivedLocked(grpc_error_handle error)
402       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
403
404   bool IsCurrentCallOnChannel() const;
405
406   // The owning RetryableCall<>.
407   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
408   bool seen_response_ = false;
409
410   // Always non-NULL.
411   grpc_call* call_;
412
413   // recv_initial_metadata
414   grpc_metadata_array initial_metadata_recv_;
415
416   // send_message
417   grpc_byte_buffer* send_message_payload_ = nullptr;
418   grpc_closure on_initial_request_sent_;
419
420   // recv_message
421   grpc_byte_buffer* recv_message_payload_ = nullptr;
422   grpc_closure on_response_received_;
423
424   // recv_trailing_metadata
425   grpc_metadata_array trailing_metadata_recv_;
426   grpc_status_code status_code_;
427   grpc_slice status_details_;
428   grpc_closure on_status_received_;
429
430   // Load reporting state.
431   bool send_all_clusters_ = false;
432   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
433   grpc_millis load_reporting_interval_ = 0;
434   OrphanablePtr<Reporter> reporter_;
435 };
436
437 //
438 // XdsClient::ChannelState::StateWatcher
439 //
440
441 class XdsClient::ChannelState::StateWatcher
442     : public AsyncConnectivityStateWatcherInterface {
443  public:
444   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
445       : parent_(std::move(parent)) {}
446
447  private:
448   void OnConnectivityStateChange(grpc_connectivity_state new_state,
449                                  const absl::Status& status) override {
450     MutexLock lock(&parent_->xds_client_->mu_);
451     if (!parent_->shutting_down_ &&
452         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
453       // In TRANSIENT_FAILURE.  Notify all watchers of error.
454       gpr_log(GPR_INFO,
455               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
456               "status_message:(%s)",
457               parent_->xds_client(), status.ToString().c_str());
458       parent_->xds_client_->NotifyOnErrorLocked(
459           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
460               "xds channel in TRANSIENT_FAILURE"));
461     }
462   }
463
464   RefCountedPtr<ChannelState> parent_;
465 };
466
467 //
468 // XdsClient::ChannelState
469 //
470
471 namespace {
472
473 grpc_channel* CreateXdsChannel(grpc_channel_args* args,
474                                const XdsBootstrap::XdsServer& server) {
475   RefCountedPtr<grpc_channel_credentials> channel_creds =
476       XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
477                                                 server.channel_creds_config);
478   return grpc_secure_channel_create(channel_creds.get(),
479                                     server.server_uri.c_str(), args, nullptr);
480 }
481
482 }  // namespace
483
484 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
485                                       const XdsBootstrap::XdsServer& server)
486     : InternallyRefCounted<ChannelState>(
487           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
488               ? "ChannelState"
489               : nullptr),
490       xds_client_(std::move(xds_client)),
491       server_(server) {
492   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
493     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
494             xds_client_.get(), server.server_uri.c_str());
495   }
496   channel_ = CreateXdsChannel(xds_client_->args_, server);
497   GPR_ASSERT(channel_ != nullptr);
498   StartConnectivityWatchLocked();
499 }
500
501 XdsClient::ChannelState::~ChannelState() {
502   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
503     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
504             this);
505   }
506   grpc_channel_destroy(channel_);
507   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
508 }
509
510 void XdsClient::ChannelState::Orphan() {
511   shutting_down_ = true;
512   CancelConnectivityWatchLocked();
513   ads_calld_.reset();
514   lrs_calld_.reset();
515   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
516 }
517
518 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
519     const {
520   return ads_calld_->calld();
521 }
522
523 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
524     const {
525   return lrs_calld_->calld();
526 }
527
528 bool XdsClient::ChannelState::HasActiveAdsCall() const {
529   return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
530 }
531
532 void XdsClient::ChannelState::MaybeStartLrsCall() {
533   if (lrs_calld_ != nullptr) return;
534   lrs_calld_.reset(
535       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
536 }
537
538 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
539
540 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
541   ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
542   GPR_ASSERT(client_channel != nullptr);
543   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
544   client_channel->AddConnectivityWatcher(
545       GRPC_CHANNEL_IDLE,
546       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
547 }
548
549 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
550   ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
551   GPR_ASSERT(client_channel != nullptr);
552   client_channel->RemoveConnectivityWatcher(watcher_);
553 }
554
555 void XdsClient::ChannelState::SubscribeLocked(const std::string& type_url,
556                                               const std::string& name) {
557   if (ads_calld_ == nullptr) {
558     // Start the ADS call if this is the first request.
559     ads_calld_.reset(new RetryableCall<AdsCallState>(
560         Ref(DEBUG_LOCATION, "ChannelState+ads")));
561     // Note: AdsCallState's ctor will automatically subscribe to all
562     // resources that the XdsClient already has watchers for, so we can
563     // return here.
564     return;
565   }
566   // If the ADS call is in backoff state, we don't need to do anything now
567   // because when the call is restarted it will resend all necessary requests.
568   if (ads_calld() == nullptr) return;
569   // Subscribe to this resource if the ADS call is active.
570   ads_calld()->SubscribeLocked(type_url, name);
571 }
572
573 void XdsClient::ChannelState::UnsubscribeLocked(const std::string& type_url,
574                                                 const std::string& name,
575                                                 bool delay_unsubscription) {
576   if (ads_calld_ != nullptr) {
577     auto* calld = ads_calld_->calld();
578     if (calld != nullptr) {
579       calld->UnsubscribeLocked(type_url, name, delay_unsubscription);
580       if (!calld->HasSubscribedResources()) ads_calld_.reset();
581     }
582   }
583 }
584
585 //
586 // XdsClient::ChannelState::RetryableCall<>
587 //
588
589 template <typename T>
590 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
591     RefCountedPtr<ChannelState> chand)
592     : chand_(std::move(chand)),
593       backoff_(
594           BackOff::Options()
595               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
596                                    1000)
597               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
598               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
599               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
600   // Closure Initialization
601   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
602                     grpc_schedule_on_exec_ctx);
603   StartNewCallLocked();
604 }
605
606 template <typename T>
607 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
608   shutting_down_ = true;
609   calld_.reset();
610   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
611   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
612 }
613
614 template <typename T>
615 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
616   const bool seen_response = calld_->seen_response();
617   calld_.reset();
618   if (seen_response) {
619     // If we lost connection to the xds server, reset backoff and restart the
620     // call immediately.
621     backoff_.Reset();
622     StartNewCallLocked();
623   } else {
624     // If we failed to connect to the xds server, retry later.
625     StartRetryTimerLocked();
626   }
627 }
628
629 template <typename T>
630 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
631   if (shutting_down_) return;
632   GPR_ASSERT(chand_->channel_ != nullptr);
633   GPR_ASSERT(calld_ == nullptr);
634   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
635     gpr_log(GPR_INFO,
636             "[xds_client %p] Start new call from retryable call (chand: %p, "
637             "retryable call: %p)",
638             chand()->xds_client(), chand(), this);
639   }
640   calld_ = MakeOrphanable<T>(
641       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
642 }
643
644 template <typename T>
645 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
646   if (shutting_down_) return;
647   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
648   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
649     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
650     gpr_log(GPR_INFO,
651             "[xds_client %p] Failed to connect to xds server (chand: %p) "
652             "retry timer will fire in %" PRId64 "ms.",
653             chand()->xds_client(), chand(), timeout);
654   }
655   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
656   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
657   retry_timer_callback_pending_ = true;
658 }
659
660 template <typename T>
661 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
662     void* arg, grpc_error_handle error) {
663   RetryableCall* calld = static_cast<RetryableCall*>(arg);
664   {
665     MutexLock lock(&calld->chand_->xds_client()->mu_);
666     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
667   }
668   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
669 }
670
671 template <typename T>
672 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
673     grpc_error_handle error) {
674   retry_timer_callback_pending_ = false;
675   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
676     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
677       gpr_log(
678           GPR_INFO,
679           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
680           chand()->xds_client(), chand(), this);
681     }
682     StartNewCallLocked();
683   }
684   GRPC_ERROR_UNREF(error);
685 }
686
687 //
688 // XdsClient::ChannelState::AdsCallState
689 //
690
691 XdsClient::ChannelState::AdsCallState::AdsCallState(
692     RefCountedPtr<RetryableCall<AdsCallState>> parent)
693     : InternallyRefCounted<AdsCallState>(
694           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
695               ? "AdsCallState"
696               : nullptr),
697       parent_(std::move(parent)) {
698   // Init the ADS call. Note that the call will progress every time there's
699   // activity in xds_client()->interested_parties_, which is comprised of
700   // the polling entities from client_channel.
701   GPR_ASSERT(xds_client() != nullptr);
702   // Create a call with the specified method name.
703   const auto& method =
704       chand()->server_.ShouldUseV3()
705           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
706           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
707   call_ = grpc_channel_create_pollset_set_call(
708       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
709       xds_client()->interested_parties_, method, nullptr,
710       GRPC_MILLIS_INF_FUTURE, nullptr);
711   GPR_ASSERT(call_ != nullptr);
712   // Init data associated with the call.
713   grpc_metadata_array_init(&initial_metadata_recv_);
714   grpc_metadata_array_init(&trailing_metadata_recv_);
715   // Start the call.
716   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
717     gpr_log(GPR_INFO,
718             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
719             "call: %p)",
720             xds_client(), chand(), this, call_);
721   }
722   // Create the ops.
723   grpc_call_error call_error;
724   grpc_op ops[3];
725   memset(ops, 0, sizeof(ops));
726   // Op: send initial metadata.
727   grpc_op* op = ops;
728   op->op = GRPC_OP_SEND_INITIAL_METADATA;
729   op->data.send_initial_metadata.count = 0;
730   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
731               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
732   op->reserved = nullptr;
733   op++;
734   call_error = grpc_call_start_batch_and_execute(
735       call_, ops, static_cast<size_t>(op - ops), nullptr);
736   GPR_ASSERT(GRPC_CALL_OK == call_error);
737   // Op: send request message.
738   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
739                     grpc_schedule_on_exec_ctx);
740   for (const auto& p : xds_client()->listener_map_) {
741     SubscribeLocked(XdsApi::kLdsTypeUrl, std::string(p.first));
742   }
743   for (const auto& p : xds_client()->route_config_map_) {
744     SubscribeLocked(XdsApi::kRdsTypeUrl, std::string(p.first));
745   }
746   for (const auto& p : xds_client()->cluster_map_) {
747     SubscribeLocked(XdsApi::kCdsTypeUrl, std::string(p.first));
748   }
749   for (const auto& p : xds_client()->endpoint_map_) {
750     SubscribeLocked(XdsApi::kEdsTypeUrl, std::string(p.first));
751   }
752   // Op: recv initial metadata.
753   op = ops;
754   op->op = GRPC_OP_RECV_INITIAL_METADATA;
755   op->data.recv_initial_metadata.recv_initial_metadata =
756       &initial_metadata_recv_;
757   op->flags = 0;
758   op->reserved = nullptr;
759   op++;
760   // Op: recv response.
761   op->op = GRPC_OP_RECV_MESSAGE;
762   op->data.recv_message.recv_message = &recv_message_payload_;
763   op->flags = 0;
764   op->reserved = nullptr;
765   op++;
766   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
767   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
768                     grpc_schedule_on_exec_ctx);
769   call_error = grpc_call_start_batch_and_execute(
770       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
771   GPR_ASSERT(GRPC_CALL_OK == call_error);
772   // Op: recv server status.
773   op = ops;
774   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
775   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
776   op->data.recv_status_on_client.status = &status_code_;
777   op->data.recv_status_on_client.status_details = &status_details_;
778   op->flags = 0;
779   op->reserved = nullptr;
780   op++;
781   // This callback signals the end of the call, so it relies on the initial
782   // ref instead of a new ref. When it's invoked, it's the initial ref that is
783   // unreffed.
784   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
785                     grpc_schedule_on_exec_ctx);
786   call_error = grpc_call_start_batch_and_execute(
787       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
788   GPR_ASSERT(GRPC_CALL_OK == call_error);
789 }
790
791 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
792   grpc_metadata_array_destroy(&initial_metadata_recv_);
793   grpc_metadata_array_destroy(&trailing_metadata_recv_);
794   grpc_byte_buffer_destroy(send_message_payload_);
795   grpc_byte_buffer_destroy(recv_message_payload_);
796   grpc_slice_unref_internal(status_details_);
797   GPR_ASSERT(call_ != nullptr);
798   grpc_call_unref(call_);
799 }
800
801 void XdsClient::ChannelState::AdsCallState::Orphan() {
802   GPR_ASSERT(call_ != nullptr);
803   // If we are here because xds_client wants to cancel the call,
804   // on_status_received_ will complete the cancellation and clean up. Otherwise,
805   // we are here because xds_client has to orphan a failed call, then the
806   // following cancellation will be a no-op.
807   grpc_call_cancel_internal(call_);
808   state_map_.clear();
809   // Note that the initial ref is hold by on_status_received_. So the
810   // corresponding unref happens in on_status_received_ instead of here.
811 }
812
813 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
814     const std::string& type_url)
815     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
816   // Buffer message sending if an existing message is in flight.
817   if (send_message_payload_ != nullptr) {
818     buffered_requests_.insert(type_url);
819     return;
820   }
821   auto& state = state_map_[type_url];
822   grpc_slice request_payload_slice;
823   std::set<absl::string_view> resource_names =
824       ResourceNamesForRequest(type_url);
825   request_payload_slice = xds_client()->api_.CreateAdsRequest(
826       chand()->server_, type_url, resource_names,
827       xds_client()->resource_version_map_[type_url], state.nonce,
828       GRPC_ERROR_REF(state.error), !sent_initial_message_);
829   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
830       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
831     state_map_.erase(type_url);
832   }
833   sent_initial_message_ = true;
834   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
835     gpr_log(GPR_INFO,
836             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
837             "error=%s resources=%s",
838             xds_client(), type_url.c_str(),
839             xds_client()->resource_version_map_[type_url].c_str(),
840             state.nonce.c_str(), grpc_error_std_string(state.error).c_str(),
841             absl::StrJoin(resource_names, " ").c_str());
842   }
843   GRPC_ERROR_UNREF(state.error);
844   state.error = GRPC_ERROR_NONE;
845   // Create message payload.
846   send_message_payload_ =
847       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
848   grpc_slice_unref_internal(request_payload_slice);
849   // Send the message.
850   grpc_op op;
851   memset(&op, 0, sizeof(op));
852   op.op = GRPC_OP_SEND_MESSAGE;
853   op.data.send_message.send_message = send_message_payload_;
854   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
855   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
856                     grpc_schedule_on_exec_ctx);
857   grpc_call_error call_error =
858       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
859   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
860     gpr_log(GPR_ERROR,
861             "[xds_client %p] calld=%p call_error=%d sending ADS message",
862             xds_client(), this, call_error);
863     GPR_ASSERT(GRPC_CALL_OK == call_error);
864   }
865 }
866
867 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
868     const std::string& type_url, const std::string& name) {
869   auto& state = state_map_[type_url].subscribed_resources[name];
870   if (state == nullptr) {
871     state = MakeOrphanable<ResourceState>(
872         type_url, name, !xds_client()->resource_version_map_[type_url].empty());
873     SendMessageLocked(type_url);
874   }
875 }
876
877 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
878     const std::string& type_url, const std::string& name,
879     bool delay_unsubscription) {
880   state_map_[type_url].subscribed_resources.erase(name);
881   if (!delay_unsubscription) SendMessageLocked(type_url);
882 }
883
884 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
885   for (const auto& p : state_map_) {
886     if (!p.second.subscribed_resources.empty()) return true;
887   }
888   return false;
889 }
890
891 namespace {
892
893 // Build a resource metadata struct for ADS result accepting methods and CSDS.
894 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
895     std::string serialized_proto, std::string version,
896     grpc_millis update_time) {
897   XdsApi::ResourceMetadata resource_metadata;
898   resource_metadata.serialized_proto = std::move(serialized_proto);
899   resource_metadata.update_time = update_time;
900   resource_metadata.version = std::move(version);
901   resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
902   return resource_metadata;
903 }
904
905 }  // namespace
906
907 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
908     std::string version, grpc_millis update_time,
909     XdsApi::LdsUpdateMap lds_update_map,
910     const std::set<std::string>& resource_names_failed) {
911   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
912     gpr_log(GPR_INFO,
913             "[xds_client %p] LDS update received containing %" PRIuPTR
914             " resources",
915             xds_client(), lds_update_map.size());
916   }
917   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
918   std::set<std::string> rds_resource_names_seen;
919   for (auto& p : lds_update_map) {
920     const std::string& listener_name = p.first;
921     XdsApi::LdsUpdate& lds_update = p.second.resource;
922     auto& state = lds_state.subscribed_resources[listener_name];
923     if (state != nullptr) state->Finish();
924     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
925       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
926               listener_name.c_str(), lds_update.ToString().c_str());
927     }
928     // Record the RDS resource names seen.
929     if (!lds_update.http_connection_manager.route_config_name.empty()) {
930       rds_resource_names_seen.insert(
931           lds_update.http_connection_manager.route_config_name);
932     }
933     // Ignore identical update.
934     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
935     if (listener_state.update.has_value() &&
936         *listener_state.update == lds_update) {
937       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
938         gpr_log(GPR_INFO,
939                 "[xds_client %p] LDS update for %s identical to current, "
940                 "ignoring.",
941                 xds_client(), listener_name.c_str());
942       }
943       continue;
944     }
945     // Update the listener state.
946     listener_state.update = std::move(lds_update);
947     listener_state.meta = CreateResourceMetadataAcked(
948         std::move(p.second.serialized_proto), version, update_time);
949     // Notify watchers.
950     for (const auto& p : listener_state.watchers) {
951       p.first->OnListenerChanged(*listener_state.update);
952     }
953   }
954   // For invalid resources in the update, if they are already in the
955   // cache, pretend that they are present in the update, so that we
956   // don't incorrectly consider them deleted below.
957   for (const std::string& listener_name : resource_names_failed) {
958     auto it = xds_client()->listener_map_.find(listener_name);
959     if (it != xds_client()->listener_map_.end()) {
960       auto& resource = it->second.update;
961       if (!resource.has_value()) continue;
962       lds_update_map[listener_name];
963       if (!resource->http_connection_manager.route_config_name.empty()) {
964         rds_resource_names_seen.insert(
965             resource->http_connection_manager.route_config_name);
966       }
967     }
968   }
969   // For any subscribed resource that is not present in the update,
970   // remove it from the cache and notify watchers that it does not exist.
971   for (const auto& p : lds_state.subscribed_resources) {
972     const std::string& listener_name = p.first;
973     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
974       ListenerState& listener_state =
975           xds_client()->listener_map_[listener_name];
976       // If the resource was newly requested but has not yet been received,
977       // we don't want to generate an error for the watchers, because this LDS
978       // response may be in reaction to an earlier request that did not yet
979       // request the new resource, so its absence from the response does not
980       // necessarily indicate that the resource does not exist.
981       // For that case, we rely on the request timeout instead.
982       if (!listener_state.update.has_value()) continue;
983       listener_state.update.reset();
984       for (const auto& p : listener_state.watchers) {
985         p.first->OnResourceDoesNotExist();
986       }
987     }
988   }
989   // For any RDS resource that is no longer referred to by any LDS
990   // resources, remove it from the cache and notify watchers that it
991   // does not exist.
992   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
993   for (const auto& p : rds_state.subscribed_resources) {
994     const std::string& rds_resource_name = p.first;
995     if (rds_resource_names_seen.find(rds_resource_name) ==
996         rds_resource_names_seen.end()) {
997       RouteConfigState& route_config_state =
998           xds_client()->route_config_map_[rds_resource_name];
999       route_config_state.update.reset();
1000       for (const auto& p : route_config_state.watchers) {
1001         p.first->OnResourceDoesNotExist();
1002       }
1003     }
1004   }
1005 }
1006
1007 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
1008     std::string version, grpc_millis update_time,
1009     XdsApi::RdsUpdateMap rds_update_map) {
1010   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1011     gpr_log(GPR_INFO,
1012             "[xds_client %p] RDS update received containing %" PRIuPTR
1013             " resources",
1014             xds_client(), rds_update_map.size());
1015   }
1016   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
1017   for (auto& p : rds_update_map) {
1018     const std::string& route_config_name = p.first;
1019     XdsApi::RdsUpdate& rds_update = p.second.resource;
1020     auto& state = rds_state.subscribed_resources[route_config_name];
1021     if (state != nullptr) state->Finish();
1022     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1023       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
1024               rds_update.ToString().c_str());
1025     }
1026     RouteConfigState& route_config_state =
1027         xds_client()->route_config_map_[route_config_name];
1028     // Ignore identical update.
1029     if (route_config_state.update.has_value() &&
1030         *route_config_state.update == rds_update) {
1031       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1032         gpr_log(GPR_INFO,
1033                 "[xds_client %p] RDS resource identical to current, ignoring",
1034                 xds_client());
1035       }
1036       continue;
1037     }
1038     // Update the cache.
1039     route_config_state.update = std::move(rds_update);
1040     route_config_state.meta = CreateResourceMetadataAcked(
1041         std::move(p.second.serialized_proto), version, update_time);
1042     // Notify all watchers.
1043     for (const auto& p : route_config_state.watchers) {
1044       p.first->OnRouteConfigChanged(*route_config_state.update);
1045     }
1046   }
1047 }
1048
1049 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
1050     std::string version, grpc_millis update_time,
1051     XdsApi::CdsUpdateMap cds_update_map,
1052     const std::set<std::string>& resource_names_failed) {
1053   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1054     gpr_log(GPR_INFO,
1055             "[xds_client %p] CDS update received containing %" PRIuPTR
1056             " resources",
1057             xds_client(), cds_update_map.size());
1058   }
1059   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1060   std::set<std::string> eds_resource_names_seen;
1061   for (auto& p : cds_update_map) {
1062     const char* cluster_name = p.first.c_str();
1063     XdsApi::CdsUpdate& cds_update = p.second.resource;
1064     auto& state = cds_state.subscribed_resources[cluster_name];
1065     if (state != nullptr) state->Finish();
1066     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1067       gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1068               cluster_name, cds_update.ToString().c_str());
1069     }
1070     // Record the EDS resource names seen.
1071     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1072                                        ? cluster_name
1073                                        : cds_update.eds_service_name);
1074     // Ignore identical update.
1075     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1076     if (cluster_state.update.has_value() &&
1077         *cluster_state.update == cds_update) {
1078       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1079         gpr_log(GPR_INFO,
1080                 "[xds_client %p] CDS update identical to current, ignoring.",
1081                 xds_client());
1082       }
1083       continue;
1084     }
1085     // Update the cluster state.
1086     cluster_state.update = std::move(cds_update);
1087     cluster_state.meta = CreateResourceMetadataAcked(
1088         std::move(p.second.serialized_proto), version, update_time);
1089     // Notify all watchers.
1090     for (const auto& p : cluster_state.watchers) {
1091       p.first->OnClusterChanged(cluster_state.update.value());
1092     }
1093   }
1094   // For invalid resources in the update, if they are already in the
1095   // cache, pretend that they are present in the update, so that we
1096   // don't incorrectly consider them deleted below.
1097   for (const std::string& cluster_name : resource_names_failed) {
1098     auto it = xds_client()->cluster_map_.find(cluster_name);
1099     if (it != xds_client()->cluster_map_.end()) {
1100       auto& resource = it->second.update;
1101       if (!resource.has_value()) continue;
1102       cds_update_map[cluster_name];
1103       eds_resource_names_seen.insert(resource->eds_service_name.empty()
1104                                          ? cluster_name
1105                                          : resource->eds_service_name);
1106     }
1107   }
1108   // For any subscribed resource that is not present in the update,
1109   // remove it from the cache and notify watchers that it does not exist.
1110   for (const auto& p : cds_state.subscribed_resources) {
1111     const std::string& cluster_name = p.first;
1112     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1113       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1114       // If the resource was newly requested but has not yet been received,
1115       // we don't want to generate an error for the watchers, because this CDS
1116       // response may be in reaction to an earlier request that did not yet
1117       // request the new resource, so its absence from the response does not
1118       // necessarily indicate that the resource does not exist.
1119       // For that case, we rely on the request timeout instead.
1120       if (!cluster_state.update.has_value()) continue;
1121       cluster_state.update.reset();
1122       for (const auto& p : cluster_state.watchers) {
1123         p.first->OnResourceDoesNotExist();
1124       }
1125     }
1126   }
1127   // For any EDS resource that is no longer referred to by any CDS
1128   // resources, remove it from the cache and notify watchers that it
1129   // does not exist.
1130   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1131   for (const auto& p : eds_state.subscribed_resources) {
1132     const std::string& eds_resource_name = p.first;
1133     if (eds_resource_names_seen.find(eds_resource_name) ==
1134         eds_resource_names_seen.end()) {
1135       EndpointState& endpoint_state =
1136           xds_client()->endpoint_map_[eds_resource_name];
1137       endpoint_state.update.reset();
1138       for (const auto& p : endpoint_state.watchers) {
1139         p.first->OnResourceDoesNotExist();
1140       }
1141     }
1142   }
1143 }
1144
1145 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
1146     std::string version, grpc_millis update_time,
1147     XdsApi::EdsUpdateMap eds_update_map) {
1148   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1149     gpr_log(GPR_INFO,
1150             "[xds_client %p] EDS update received containing %" PRIuPTR
1151             " resources",
1152             xds_client(), eds_update_map.size());
1153   }
1154   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1155   for (auto& p : eds_update_map) {
1156     const char* eds_service_name = p.first.c_str();
1157     XdsApi::EdsUpdate& eds_update = p.second.resource;
1158     auto& state = eds_state.subscribed_resources[eds_service_name];
1159     if (state != nullptr) state->Finish();
1160     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1161       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1162               eds_service_name, eds_update.ToString().c_str());
1163     }
1164     EndpointState& endpoint_state =
1165         xds_client()->endpoint_map_[eds_service_name];
1166     // Ignore identical update.
1167     if (endpoint_state.update.has_value() &&
1168         *endpoint_state.update == eds_update) {
1169       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1170         gpr_log(GPR_INFO,
1171                 "[xds_client %p] EDS update identical to current, ignoring.",
1172                 xds_client());
1173       }
1174       continue;
1175     }
1176     // Update the cluster state.
1177     endpoint_state.update = std::move(eds_update);
1178     endpoint_state.meta = CreateResourceMetadataAcked(
1179         std::move(p.second.serialized_proto), version, update_time);
1180     // Notify all watchers.
1181     for (const auto& p : endpoint_state.watchers) {
1182       p.first->OnEndpointChanged(endpoint_state.update.value());
1183     }
1184   }
1185 }
1186
1187 namespace {
1188
1189 // Update resource_metadata for NACK.
1190 void UpdateResourceMetadataNacked(const std::string& version,
1191                                   const std::string& details,
1192                                   grpc_millis update_time,
1193                                   XdsApi::ResourceMetadata* resource_metadata) {
1194   resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
1195   resource_metadata->failed_version = version;
1196   resource_metadata->failed_details = details;
1197   resource_metadata->failed_update_time = update_time;
1198 }
1199
1200 }  // namespace
1201
1202 template <typename StateMap>
1203 void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
1204     grpc_millis update_time, const XdsApi::AdsParseResult& result,
1205     StateMap* state_map) {
1206   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1207     gpr_log(GPR_INFO,
1208             "[xds_client %p] %s update NACKed containing %" PRIuPTR
1209             " invalid resources",
1210             xds_client(), result.type_url.c_str(),
1211             result.resource_names_failed.size());
1212   }
1213   std::string details = grpc_error_std_string(result.parse_error);
1214   for (auto& name : result.resource_names_failed) {
1215     auto it = state_map->find(name);
1216     if (it == state_map->end()) continue;
1217     auto& state = it->second;
1218     // Notify watchers of error.
1219     for (const auto& p : state.watchers) {
1220       p.first->OnError(GRPC_ERROR_REF(result.parse_error));
1221     }
1222     // Update resource metadata for CSDS.
1223     UpdateResourceMetadataNacked(result.version, details, update_time,
1224                                  &state.meta);
1225   }
1226 }
1227
1228 void XdsClient::ChannelState::AdsCallState::OnRequestSent(
1229     void* arg, grpc_error_handle error) {
1230   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1231   {
1232     MutexLock lock(&ads_calld->xds_client()->mu_);
1233     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1234   }
1235   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1236 }
1237
1238 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1239     grpc_error_handle error) {
1240   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1241     // Clean up the sent message.
1242     grpc_byte_buffer_destroy(send_message_payload_);
1243     send_message_payload_ = nullptr;
1244     // Continue to send another pending message if any.
1245     // TODO(roth): The current code to handle buffered messages has the
1246     // advantage of sending only the most recent list of resource names for
1247     // each resource type (no matter how many times that resource type has
1248     // been requested to send while the current message sending is still
1249     // pending). But its disadvantage is that we send the requests in fixed
1250     // order of resource types. We need to fix this if we are seeing some
1251     // resource type(s) starved due to frequent requests of other resource
1252     // type(s).
1253     auto it = buffered_requests_.begin();
1254     if (it != buffered_requests_.end()) {
1255       SendMessageLocked(*it);
1256       buffered_requests_.erase(it);
1257     }
1258   }
1259   GRPC_ERROR_UNREF(error);
1260 }
1261
1262 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1263     void* arg, grpc_error_handle /* error */) {
1264   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1265   bool done;
1266   {
1267     MutexLock lock(&ads_calld->xds_client()->mu_);
1268     done = ads_calld->OnResponseReceivedLocked();
1269   }
1270   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1271 }
1272
1273 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1274   // Empty payload means the call was cancelled.
1275   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1276     return true;
1277   }
1278   // Read the response.
1279   grpc_byte_buffer_reader bbr;
1280   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1281   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1282   grpc_byte_buffer_reader_destroy(&bbr);
1283   grpc_byte_buffer_destroy(recv_message_payload_);
1284   recv_message_payload_ = nullptr;
1285   // Parse and validate the response.
1286   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1287       chand()->server_, response_slice,
1288       ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1289       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1290       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1291       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1292   grpc_slice_unref_internal(response_slice);
1293   if (result.type_url.empty()) {
1294     // Ignore unparsable response.
1295     gpr_log(GPR_ERROR,
1296             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1297             xds_client(), grpc_error_std_string(result.parse_error).c_str());
1298     GRPC_ERROR_UNREF(result.parse_error);
1299   } else {
1300     grpc_millis update_time = grpc_core::ExecCtx::Get()->Now();
1301     // Update nonce.
1302     auto& state = state_map_[result.type_url];
1303     state.nonce = std::move(result.nonce);
1304     // If we got an error, we'll NACK the update.
1305     if (result.parse_error != GRPC_ERROR_NONE) {
1306       gpr_log(GPR_ERROR,
1307               "[xds_client %p] ADS response invalid for resource type %s "
1308               "version %s, will NACK: nonce=%s error=%s",
1309               xds_client(), result.type_url.c_str(), result.version.c_str(),
1310               state.nonce.c_str(),
1311               grpc_error_std_string(result.parse_error).c_str());
1312       result.parse_error =
1313           grpc_error_set_int(result.parse_error, GRPC_ERROR_INT_GRPC_STATUS,
1314                              GRPC_STATUS_UNAVAILABLE);
1315       GRPC_ERROR_UNREF(state.error);
1316       state.error = result.parse_error;
1317       if (result.type_url == XdsApi::kLdsTypeUrl) {
1318         RejectAdsUpdateLocked(update_time, result,
1319                               &xds_client()->listener_map_);
1320       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1321         RejectAdsUpdateLocked(update_time, result,
1322                               &xds_client()->route_config_map_);
1323       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1324         RejectAdsUpdateLocked(update_time, result, &xds_client()->cluster_map_);
1325       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1326         RejectAdsUpdateLocked(update_time, result,
1327                               &xds_client()->endpoint_map_);
1328       }
1329     }
1330     // Process any valid resources.
1331     bool have_valid_resources = false;
1332     if (result.type_url == XdsApi::kLdsTypeUrl) {
1333       have_valid_resources = !result.lds_update_map.empty();
1334       AcceptLdsUpdateLocked(result.version, update_time,
1335                             std::move(result.lds_update_map),
1336                             result.resource_names_failed);
1337     } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1338       have_valid_resources = !result.rds_update_map.empty();
1339       AcceptRdsUpdateLocked(result.version, update_time,
1340                             std::move(result.rds_update_map));
1341     } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1342       have_valid_resources = !result.cds_update_map.empty();
1343       AcceptCdsUpdateLocked(result.version, update_time,
1344                             std::move(result.cds_update_map),
1345                             result.resource_names_failed);
1346     } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1347       have_valid_resources = !result.eds_update_map.empty();
1348       AcceptEdsUpdateLocked(result.version, update_time,
1349                             std::move(result.eds_update_map));
1350     }
1351     if (have_valid_resources) {
1352       seen_response_ = true;
1353       xds_client()->resource_version_map_[result.type_url] =
1354           std::move(result.version);
1355       // Start load reporting if needed.
1356       auto& lrs_call = chand()->lrs_calld_;
1357       if (lrs_call != nullptr) {
1358         LrsCallState* lrs_calld = lrs_call->calld();
1359         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1360       }
1361     }
1362     // Send ACK or NACK.
1363     SendMessageLocked(result.type_url);
1364   }
1365   if (xds_client()->shutting_down_) return true;
1366   // Keep listening for updates.
1367   grpc_op op;
1368   memset(&op, 0, sizeof(op));
1369   op.op = GRPC_OP_RECV_MESSAGE;
1370   op.data.recv_message.recv_message = &recv_message_payload_;
1371   op.flags = 0;
1372   op.reserved = nullptr;
1373   GPR_ASSERT(call_ != nullptr);
1374   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1375   const grpc_call_error call_error =
1376       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1377   GPR_ASSERT(GRPC_CALL_OK == call_error);
1378   return false;
1379 }
1380
1381 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1382     void* arg, grpc_error_handle error) {
1383   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1384   {
1385     MutexLock lock(&ads_calld->xds_client()->mu_);
1386     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1387   }
1388   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1389 }
1390
1391 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1392     grpc_error_handle error) {
1393   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1394     char* status_details = grpc_slice_to_c_string(status_details_);
1395     gpr_log(GPR_INFO,
1396             "[xds_client %p] ADS call status received. Status = %d, details "
1397             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1398             xds_client(), status_code_, status_details, chand(), this, call_,
1399             grpc_error_std_string(error).c_str());
1400     gpr_free(status_details);
1401   }
1402   // Ignore status from a stale call.
1403   if (IsCurrentCallOnChannel()) {
1404     // Try to restart the call.
1405     parent_->OnCallFinishedLocked();
1406     // Send error to all watchers.
1407     xds_client()->NotifyOnErrorLocked(
1408         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1409   }
1410   GRPC_ERROR_UNREF(error);
1411 }
1412
1413 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1414   // If the retryable ADS call is null (which only happens when the xds channel
1415   // is shutting down), all the ADS calls are stale.
1416   if (chand()->ads_calld_ == nullptr) return false;
1417   return this == chand()->ads_calld_->calld();
1418 }
1419
1420 std::set<absl::string_view>
1421 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1422     const std::string& type_url) {
1423   std::set<absl::string_view> resource_names;
1424   auto it = state_map_.find(type_url);
1425   if (it != state_map_.end()) {
1426     for (auto& p : it->second.subscribed_resources) {
1427       resource_names.insert(p.first);
1428       OrphanablePtr<ResourceState>& state = p.second;
1429       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1430     }
1431   }
1432   return resource_names;
1433 }
1434
1435 //
1436 // XdsClient::ChannelState::LrsCallState::Reporter
1437 //
1438
1439 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1440   if (next_report_timer_callback_pending_) {
1441     grpc_timer_cancel(&next_report_timer_);
1442   }
1443 }
1444
1445 void XdsClient::ChannelState::LrsCallState::Reporter::
1446     ScheduleNextReportLocked() {
1447   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1448   grpc_timer_init(&next_report_timer_, next_report_time,
1449                   &on_next_report_timer_);
1450   next_report_timer_callback_pending_ = true;
1451 }
1452
1453 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1454     void* arg, grpc_error_handle error) {
1455   Reporter* self = static_cast<Reporter*>(arg);
1456   bool done;
1457   {
1458     MutexLock lock(&self->xds_client()->mu_);
1459     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1460   }
1461   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1462 }
1463
1464 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1465     grpc_error_handle error) {
1466   next_report_timer_callback_pending_ = false;
1467   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1468     GRPC_ERROR_UNREF(error);
1469     return true;
1470   }
1471   return SendReportLocked();
1472 }
1473
1474 namespace {
1475
1476 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1477   for (const auto& p : snapshot) {
1478     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1479     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1480     for (const auto& q : cluster_snapshot.locality_stats) {
1481       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1482       if (!locality_snapshot.IsZero()) return false;
1483     }
1484   }
1485   return true;
1486 }
1487
1488 }  // namespace
1489
1490 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1491   // Construct snapshot from all reported stats.
1492   XdsApi::ClusterLoadReportMap snapshot =
1493       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1494                                                   parent_->cluster_names_);
1495   // Skip client load report if the counters were all zero in the last
1496   // report and they are still zero in this one.
1497   const bool old_val = last_report_counters_were_zero_;
1498   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1499   if (old_val && last_report_counters_were_zero_) {
1500     if (xds_client()->load_report_map_.empty()) {
1501       parent_->chand()->StopLrsCall();
1502       return true;
1503     }
1504     ScheduleNextReportLocked();
1505     return false;
1506   }
1507   // Create a request that contains the snapshot.
1508   grpc_slice request_payload_slice =
1509       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1510   parent_->send_message_payload_ =
1511       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1512   grpc_slice_unref_internal(request_payload_slice);
1513   // Send the report.
1514   grpc_op op;
1515   memset(&op, 0, sizeof(op));
1516   op.op = GRPC_OP_SEND_MESSAGE;
1517   op.data.send_message.send_message = parent_->send_message_payload_;
1518   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1519       parent_->call_, &op, 1, &on_report_done_);
1520   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1521     gpr_log(GPR_ERROR,
1522             "[xds_client %p] calld=%p call_error=%d sending client load report",
1523             xds_client(), this, call_error);
1524     GPR_ASSERT(GRPC_CALL_OK == call_error);
1525   }
1526   return false;
1527 }
1528
1529 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1530     void* arg, grpc_error_handle error) {
1531   Reporter* self = static_cast<Reporter*>(arg);
1532   bool done;
1533   {
1534     MutexLock lock(&self->xds_client()->mu_);
1535     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1536   }
1537   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1538 }
1539
1540 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1541     grpc_error_handle error) {
1542   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1543   parent_->send_message_payload_ = nullptr;
1544   // If there are no more registered stats to report, cancel the call.
1545   if (xds_client()->load_report_map_.empty()) {
1546     parent_->chand()->StopLrsCall();
1547     GRPC_ERROR_UNREF(error);
1548     return true;
1549   }
1550   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1551     GRPC_ERROR_UNREF(error);
1552     // If this reporter is no longer the current one on the call, the reason
1553     // might be that it was orphaned for a new one due to config update.
1554     if (!IsCurrentReporterOnCall()) {
1555       parent_->MaybeStartReportingLocked();
1556     }
1557     return true;
1558   }
1559   ScheduleNextReportLocked();
1560   return false;
1561 }
1562
1563 //
1564 // XdsClient::ChannelState::LrsCallState
1565 //
1566
1567 XdsClient::ChannelState::LrsCallState::LrsCallState(
1568     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1569     : InternallyRefCounted<LrsCallState>(
1570           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1571               ? "LrsCallState"
1572               : nullptr),
1573       parent_(std::move(parent)) {
1574   // Init the LRS call. Note that the call will progress every time there's
1575   // activity in xds_client()->interested_parties_, which is comprised of
1576   // the polling entities from client_channel.
1577   GPR_ASSERT(xds_client() != nullptr);
1578   const auto& method =
1579       chand()->server_.ShouldUseV3()
1580           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1581           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1582   call_ = grpc_channel_create_pollset_set_call(
1583       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1584       xds_client()->interested_parties_, method, nullptr,
1585       GRPC_MILLIS_INF_FUTURE, nullptr);
1586   GPR_ASSERT(call_ != nullptr);
1587   // Init the request payload.
1588   grpc_slice request_payload_slice =
1589       xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1590   send_message_payload_ =
1591       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1592   grpc_slice_unref_internal(request_payload_slice);
1593   // Init other data associated with the LRS call.
1594   grpc_metadata_array_init(&initial_metadata_recv_);
1595   grpc_metadata_array_init(&trailing_metadata_recv_);
1596   // Start the call.
1597   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1598     gpr_log(GPR_INFO,
1599             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1600             "call: %p)",
1601             xds_client(), chand(), this, call_);
1602   }
1603   // Create the ops.
1604   grpc_call_error call_error;
1605   grpc_op ops[3];
1606   memset(ops, 0, sizeof(ops));
1607   // Op: send initial metadata.
1608   grpc_op* op = ops;
1609   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1610   op->data.send_initial_metadata.count = 0;
1611   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1612               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1613   op->reserved = nullptr;
1614   op++;
1615   // Op: send request message.
1616   GPR_ASSERT(send_message_payload_ != nullptr);
1617   op->op = GRPC_OP_SEND_MESSAGE;
1618   op->data.send_message.send_message = send_message_payload_;
1619   op->flags = 0;
1620   op->reserved = nullptr;
1621   op++;
1622   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1623   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1624                     grpc_schedule_on_exec_ctx);
1625   call_error = grpc_call_start_batch_and_execute(
1626       call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1627   GPR_ASSERT(GRPC_CALL_OK == call_error);
1628   // Op: recv initial metadata.
1629   op = ops;
1630   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1631   op->data.recv_initial_metadata.recv_initial_metadata =
1632       &initial_metadata_recv_;
1633   op->flags = 0;
1634   op->reserved = nullptr;
1635   op++;
1636   // Op: recv response.
1637   op->op = GRPC_OP_RECV_MESSAGE;
1638   op->data.recv_message.recv_message = &recv_message_payload_;
1639   op->flags = 0;
1640   op->reserved = nullptr;
1641   op++;
1642   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1643   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1644                     grpc_schedule_on_exec_ctx);
1645   call_error = grpc_call_start_batch_and_execute(
1646       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1647   GPR_ASSERT(GRPC_CALL_OK == call_error);
1648   // Op: recv server status.
1649   op = ops;
1650   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1651   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1652   op->data.recv_status_on_client.status = &status_code_;
1653   op->data.recv_status_on_client.status_details = &status_details_;
1654   op->flags = 0;
1655   op->reserved = nullptr;
1656   op++;
1657   // This callback signals the end of the call, so it relies on the initial
1658   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1659   // unreffed.
1660   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1661                     grpc_schedule_on_exec_ctx);
1662   call_error = grpc_call_start_batch_and_execute(
1663       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1664   GPR_ASSERT(GRPC_CALL_OK == call_error);
1665 }
1666
1667 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1668   grpc_metadata_array_destroy(&initial_metadata_recv_);
1669   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1670   grpc_byte_buffer_destroy(send_message_payload_);
1671   grpc_byte_buffer_destroy(recv_message_payload_);
1672   grpc_slice_unref_internal(status_details_);
1673   GPR_ASSERT(call_ != nullptr);
1674   grpc_call_unref(call_);
1675 }
1676
1677 void XdsClient::ChannelState::LrsCallState::Orphan() {
1678   reporter_.reset();
1679   GPR_ASSERT(call_ != nullptr);
1680   // If we are here because xds_client wants to cancel the call,
1681   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1682   // we are here because xds_client has to orphan a failed call, then the
1683   // following cancellation will be a no-op.
1684   grpc_call_cancel_internal(call_);
1685   // Note that the initial ref is hold by on_status_received_. So the
1686   // corresponding unref happens in on_status_received_ instead of here.
1687 }
1688
1689 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1690   // Don't start again if already started.
1691   if (reporter_ != nullptr) return;
1692   // Don't start if the previous send_message op (of the initial request or the
1693   // last report of the previous reporter) hasn't completed.
1694   if (send_message_payload_ != nullptr) return;
1695   // Don't start if no LRS response has arrived.
1696   if (!seen_response()) return;
1697   // Don't start if the ADS call hasn't received any valid response. Note that
1698   // this must be the first channel because it is the current channel but its
1699   // ADS call hasn't seen any response.
1700   if (chand()->ads_calld_ == nullptr ||
1701       chand()->ads_calld_->calld() == nullptr ||
1702       !chand()->ads_calld_->calld()->seen_response()) {
1703     return;
1704   }
1705   // Start reporting.
1706   reporter_ = MakeOrphanable<Reporter>(
1707       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1708 }
1709
1710 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1711     void* arg, grpc_error_handle /*error*/) {
1712   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1713   {
1714     MutexLock lock(&lrs_calld->xds_client()->mu_);
1715     lrs_calld->OnInitialRequestSentLocked();
1716   }
1717   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1718 }
1719
1720 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1721   // Clear the send_message_payload_.
1722   grpc_byte_buffer_destroy(send_message_payload_);
1723   send_message_payload_ = nullptr;
1724   MaybeStartReportingLocked();
1725 }
1726
1727 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1728     void* arg, grpc_error_handle /*error*/) {
1729   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1730   bool done;
1731   {
1732     MutexLock lock(&lrs_calld->xds_client()->mu_);
1733     done = lrs_calld->OnResponseReceivedLocked();
1734   }
1735   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1736 }
1737
1738 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1739   // Empty payload means the call was cancelled.
1740   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1741     return true;
1742   }
1743   // Read the response.
1744   grpc_byte_buffer_reader bbr;
1745   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1746   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1747   grpc_byte_buffer_reader_destroy(&bbr);
1748   grpc_byte_buffer_destroy(recv_message_payload_);
1749   recv_message_payload_ = nullptr;
1750   // This anonymous lambda is a hack to avoid the usage of goto.
1751   [&]() {
1752     // Parse the response.
1753     bool send_all_clusters = false;
1754     std::set<std::string> new_cluster_names;
1755     grpc_millis new_load_reporting_interval;
1756     grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse(
1757         response_slice, &send_all_clusters, &new_cluster_names,
1758         &new_load_reporting_interval);
1759     if (parse_error != GRPC_ERROR_NONE) {
1760       gpr_log(GPR_ERROR,
1761               "[xds_client %p] LRS response parsing failed. error=%s",
1762               xds_client(), grpc_error_std_string(parse_error).c_str());
1763       GRPC_ERROR_UNREF(parse_error);
1764       return;
1765     }
1766     seen_response_ = true;
1767     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1768       gpr_log(
1769           GPR_INFO,
1770           "[xds_client %p] LRS response received, %" PRIuPTR
1771           " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1772           "ms",
1773           xds_client(), new_cluster_names.size(), send_all_clusters,
1774           new_load_reporting_interval);
1775       size_t i = 0;
1776       for (const auto& name : new_cluster_names) {
1777         gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1778                 xds_client(), i++, name.c_str());
1779       }
1780     }
1781     if (new_load_reporting_interval <
1782         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1783       new_load_reporting_interval =
1784           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1785       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1786         gpr_log(GPR_INFO,
1787                 "[xds_client %p] Increased load_report_interval to minimum "
1788                 "value %dms",
1789                 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1790       }
1791     }
1792     // Ignore identical update.
1793     if (send_all_clusters == send_all_clusters_ &&
1794         cluster_names_ == new_cluster_names &&
1795         load_reporting_interval_ == new_load_reporting_interval) {
1796       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1797         gpr_log(GPR_INFO,
1798                 "[xds_client %p] Incoming LRS response identical to current, "
1799                 "ignoring.",
1800                 xds_client());
1801       }
1802       return;
1803     }
1804     // Stop current load reporting (if any) to adopt the new config.
1805     reporter_.reset();
1806     // Record the new config.
1807     send_all_clusters_ = send_all_clusters;
1808     cluster_names_ = std::move(new_cluster_names);
1809     load_reporting_interval_ = new_load_reporting_interval;
1810     // Try starting sending load report.
1811     MaybeStartReportingLocked();
1812   }();
1813   grpc_slice_unref_internal(response_slice);
1814   if (xds_client()->shutting_down_) return true;
1815   // Keep listening for LRS config updates.
1816   grpc_op op;
1817   memset(&op, 0, sizeof(op));
1818   op.op = GRPC_OP_RECV_MESSAGE;
1819   op.data.recv_message.recv_message = &recv_message_payload_;
1820   op.flags = 0;
1821   op.reserved = nullptr;
1822   GPR_ASSERT(call_ != nullptr);
1823   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1824   const grpc_call_error call_error =
1825       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1826   GPR_ASSERT(GRPC_CALL_OK == call_error);
1827   return false;
1828 }
1829
1830 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1831     void* arg, grpc_error_handle error) {
1832   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1833   {
1834     MutexLock lock(&lrs_calld->xds_client()->mu_);
1835     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1836   }
1837   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1838 }
1839
1840 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1841     grpc_error_handle error) {
1842   GPR_ASSERT(call_ != nullptr);
1843   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1844     char* status_details = grpc_slice_to_c_string(status_details_);
1845     gpr_log(GPR_INFO,
1846             "[xds_client %p] LRS call status received. Status = %d, details "
1847             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1848             xds_client(), status_code_, status_details, chand(), this, call_,
1849             grpc_error_std_string(error).c_str());
1850     gpr_free(status_details);
1851   }
1852   // Ignore status from a stale call.
1853   if (IsCurrentCallOnChannel()) {
1854     GPR_ASSERT(!xds_client()->shutting_down_);
1855     // Try to restart the call.
1856     parent_->OnCallFinishedLocked();
1857   }
1858   GRPC_ERROR_UNREF(error);
1859 }
1860
1861 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1862   // If the retryable LRS call is null (which only happens when the xds channel
1863   // is shutting down), all the LRS calls are stale.
1864   if (chand()->lrs_calld_ == nullptr) return false;
1865   return this == chand()->lrs_calld_->calld();
1866 }
1867
1868 //
1869 // XdsClient
1870 //
1871
1872 namespace {
1873
1874 grpc_millis GetRequestTimeout(const grpc_channel_args* args) {
1875   return grpc_channel_args_find_integer(
1876       args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1877       {15000, 0, INT_MAX});
1878 }
1879
1880 grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
1881   absl::InlinedVector<grpc_arg, 2> args_to_add = {
1882       grpc_channel_arg_integer_create(
1883           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
1884           5 * 60 * GPR_MS_PER_SEC),
1885       grpc_channel_arg_integer_create(
1886           const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1887   };
1888   return grpc_channel_args_copy_and_add(args, args_to_add.data(),
1889                                         args_to_add.size());
1890 }
1891
1892 }  // namespace
1893
1894 XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
1895                      const grpc_channel_args* args)
1896     : DualRefCounted<XdsClient>(
1897           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1898                                                                   : nullptr),
1899       bootstrap_(std::move(bootstrap)),
1900       args_(ModifyChannelArgs(args)),
1901       request_timeout_(GetRequestTimeout(args)),
1902       interested_parties_(grpc_pollset_set_create()),
1903       certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1904           bootstrap_->certificate_providers())),
1905       api_(this, &grpc_xds_client_trace, bootstrap_->node(),
1906            &bootstrap_->certificate_providers()) {
1907   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1908     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1909   }
1910   // Create ChannelState object.
1911   chand_ = MakeOrphanable<ChannelState>(
1912       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1913 }
1914
1915 XdsClient::~XdsClient() {
1916   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1917     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1918   }
1919   grpc_channel_args_destroy(args_);
1920   grpc_pollset_set_destroy(interested_parties_);
1921 }
1922
1923 void XdsClient::AddChannelzLinkage(
1924     channelz::ChannelNode* parent_channelz_node) {
1925   MutexLock lock(&mu_);
1926   channelz::ChannelNode* xds_channelz_node =
1927       grpc_channel_get_channelz_node(chand_->channel());
1928   if (xds_channelz_node != nullptr) {
1929     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1930   }
1931 }
1932
1933 void XdsClient::RemoveChannelzLinkage(
1934     channelz::ChannelNode* parent_channelz_node) {
1935   MutexLock lock(&mu_);
1936   channelz::ChannelNode* xds_channelz_node =
1937       grpc_channel_get_channelz_node(chand_->channel());
1938   if (xds_channelz_node != nullptr) {
1939     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1940   }
1941 }
1942
1943 void XdsClient::Orphan() {
1944   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1945     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1946   }
1947   {
1948     MutexLock lock(g_mu);
1949     if (g_xds_client == this) g_xds_client = nullptr;
1950   }
1951   {
1952     MutexLock lock(&mu_);
1953     shutting_down_ = true;
1954     // Orphan ChannelState object.
1955     chand_.reset();
1956     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1957     // created by the XdsResolver because the maps contain refs for watchers
1958     // which in turn hold refs to the loadbalancing policies. At this point, it
1959     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1960     // policies before those calls are done would lead to issues such as
1961     // https://github.com/grpc/grpc/issues/20928.
1962     if (!listener_map_.empty()) {
1963       cluster_map_.clear();
1964       endpoint_map_.clear();
1965     }
1966   }
1967 }
1968
1969 void XdsClient::WatchListenerData(
1970     absl::string_view listener_name,
1971     std::unique_ptr<ListenerWatcherInterface> watcher) {
1972   std::string listener_name_str = std::string(listener_name);
1973   MutexLock lock(&mu_);
1974   ListenerState& listener_state = listener_map_[listener_name_str];
1975   ListenerWatcherInterface* w = watcher.get();
1976   listener_state.watchers[w] = std::move(watcher);
1977   // If we've already received an LDS update, notify the new watcher
1978   // immediately.
1979   if (listener_state.update.has_value()) {
1980     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1981       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1982               this, listener_name_str.c_str());
1983     }
1984     w->OnListenerChanged(*listener_state.update);
1985   }
1986   chand_->SubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str);
1987 }
1988
1989 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1990                                         ListenerWatcherInterface* watcher,
1991                                         bool delay_unsubscription) {
1992   MutexLock lock(&mu_);
1993   if (shutting_down_) return;
1994   std::string listener_name_str = std::string(listener_name);
1995   ListenerState& listener_state = listener_map_[listener_name_str];
1996   auto it = listener_state.watchers.find(watcher);
1997   if (it != listener_state.watchers.end()) {
1998     listener_state.watchers.erase(it);
1999     if (listener_state.watchers.empty()) {
2000       listener_map_.erase(listener_name_str);
2001       chand_->UnsubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str,
2002                                 delay_unsubscription);
2003     }
2004   }
2005 }
2006
2007 void XdsClient::WatchRouteConfigData(
2008     absl::string_view route_config_name,
2009     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
2010   std::string route_config_name_str = std::string(route_config_name);
2011   MutexLock lock(&mu_);
2012   RouteConfigState& route_config_state =
2013       route_config_map_[route_config_name_str];
2014   RouteConfigWatcherInterface* w = watcher.get();
2015   route_config_state.watchers[w] = std::move(watcher);
2016   // If we've already received an RDS update, notify the new watcher
2017   // immediately.
2018   if (route_config_state.update.has_value()) {
2019     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2020       gpr_log(GPR_INFO,
2021               "[xds_client %p] returning cached route config data for %s", this,
2022               route_config_name_str.c_str());
2023     }
2024     w->OnRouteConfigChanged(*route_config_state.update);
2025   }
2026   chand_->SubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str);
2027 }
2028
2029 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
2030                                            RouteConfigWatcherInterface* watcher,
2031                                            bool delay_unsubscription) {
2032   MutexLock lock(&mu_);
2033   if (shutting_down_) return;
2034   std::string route_config_name_str = std::string(route_config_name);
2035   RouteConfigState& route_config_state =
2036       route_config_map_[route_config_name_str];
2037   auto it = route_config_state.watchers.find(watcher);
2038   if (it != route_config_state.watchers.end()) {
2039     route_config_state.watchers.erase(it);
2040     if (route_config_state.watchers.empty()) {
2041       route_config_map_.erase(route_config_name_str);
2042       chand_->UnsubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str,
2043                                 delay_unsubscription);
2044     }
2045   }
2046 }
2047
2048 void XdsClient::WatchClusterData(
2049     absl::string_view cluster_name,
2050     std::unique_ptr<ClusterWatcherInterface> watcher) {
2051   std::string cluster_name_str = std::string(cluster_name);
2052   MutexLock lock(&mu_);
2053   ClusterState& cluster_state = cluster_map_[cluster_name_str];
2054   ClusterWatcherInterface* w = watcher.get();
2055   cluster_state.watchers[w] = std::move(watcher);
2056   // If we've already received a CDS update, notify the new watcher
2057   // immediately.
2058   if (cluster_state.update.has_value()) {
2059     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2060       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
2061               this, cluster_name_str.c_str());
2062     }
2063     w->OnClusterChanged(cluster_state.update.value());
2064   }
2065   chand_->SubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str);
2066 }
2067
2068 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
2069                                        ClusterWatcherInterface* watcher,
2070                                        bool delay_unsubscription) {
2071   MutexLock lock(&mu_);
2072   if (shutting_down_) return;
2073   std::string cluster_name_str = std::string(cluster_name);
2074   ClusterState& cluster_state = cluster_map_[cluster_name_str];
2075   auto it = cluster_state.watchers.find(watcher);
2076   if (it != cluster_state.watchers.end()) {
2077     cluster_state.watchers.erase(it);
2078     if (cluster_state.watchers.empty()) {
2079       cluster_map_.erase(cluster_name_str);
2080       chand_->UnsubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str,
2081                                 delay_unsubscription);
2082     }
2083   }
2084 }
2085
2086 void XdsClient::WatchEndpointData(
2087     absl::string_view eds_service_name,
2088     std::unique_ptr<EndpointWatcherInterface> watcher) {
2089   std::string eds_service_name_str = std::string(eds_service_name);
2090   MutexLock lock(&mu_);
2091   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2092   EndpointWatcherInterface* w = watcher.get();
2093   endpoint_state.watchers[w] = std::move(watcher);
2094   // If we've already received an EDS update, notify the new watcher
2095   // immediately.
2096   if (endpoint_state.update.has_value()) {
2097     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2098       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
2099               this, eds_service_name_str.c_str());
2100     }
2101     w->OnEndpointChanged(endpoint_state.update.value());
2102   }
2103   chand_->SubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str);
2104 }
2105
2106 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
2107                                         EndpointWatcherInterface* watcher,
2108                                         bool delay_unsubscription) {
2109   MutexLock lock(&mu_);
2110   if (shutting_down_) return;
2111   std::string eds_service_name_str = std::string(eds_service_name);
2112   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2113   auto it = endpoint_state.watchers.find(watcher);
2114   if (it != endpoint_state.watchers.end()) {
2115     endpoint_state.watchers.erase(it);
2116     if (endpoint_state.watchers.empty()) {
2117       endpoint_map_.erase(eds_service_name_str);
2118       chand_->UnsubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str,
2119                                 delay_unsubscription);
2120     }
2121   }
2122 }
2123
2124 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
2125     absl::string_view lrs_server, absl::string_view cluster_name,
2126     absl::string_view eds_service_name) {
2127   // TODO(roth): When we add support for direct federation, use the
2128   // server name specified in lrs_server.
2129   auto key =
2130       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2131   MutexLock lock(&mu_);
2132   // We jump through some hoops here to make sure that the absl::string_views
2133   // stored in the XdsClusterDropStats object point to the strings
2134   // in the load_report_map_ key, so that they have the same lifetime.
2135   auto it = load_report_map_
2136                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2137                 .first;
2138   LoadReportState& load_report_state = it->second;
2139   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
2140   if (load_report_state.drop_stats != nullptr) {
2141     cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
2142   }
2143   if (cluster_drop_stats == nullptr) {
2144     if (load_report_state.drop_stats != nullptr) {
2145       load_report_state.deleted_drop_stats +=
2146           load_report_state.drop_stats->GetSnapshotAndReset();
2147     }
2148     cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2149         Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
2150         it->first.first /*cluster_name*/,
2151         it->first.second /*eds_service_name*/);
2152     load_report_state.drop_stats = cluster_drop_stats.get();
2153   }
2154   chand_->MaybeStartLrsCall();
2155   return cluster_drop_stats;
2156 }
2157
2158 void XdsClient::RemoveClusterDropStats(
2159     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2160     absl::string_view eds_service_name,
2161     XdsClusterDropStats* cluster_drop_stats) {
2162   MutexLock lock(&mu_);
2163   // TODO(roth): When we add support for direct federation, use the
2164   // server name specified in lrs_server.
2165   auto it = load_report_map_.find(
2166       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2167   if (it == load_report_map_.end()) return;
2168   LoadReportState& load_report_state = it->second;
2169   if (load_report_state.drop_stats == cluster_drop_stats) {
2170     // Record final snapshot in deleted_drop_stats, which will be
2171     // added to the next load report.
2172     load_report_state.deleted_drop_stats +=
2173         load_report_state.drop_stats->GetSnapshotAndReset();
2174     load_report_state.drop_stats = nullptr;
2175   }
2176 }
2177
2178 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2179     absl::string_view lrs_server, absl::string_view cluster_name,
2180     absl::string_view eds_service_name,
2181     RefCountedPtr<XdsLocalityName> locality) {
2182   // TODO(roth): When we add support for direct federation, use the
2183   // server name specified in lrs_server.
2184   auto key =
2185       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2186   MutexLock lock(&mu_);
2187   // We jump through some hoops here to make sure that the absl::string_views
2188   // stored in the XdsClusterLocalityStats object point to the strings
2189   // in the load_report_map_ key, so that they have the same lifetime.
2190   auto it = load_report_map_
2191                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2192                 .first;
2193   LoadReportState& load_report_state = it->second;
2194   LoadReportState::LocalityState& locality_state =
2195       load_report_state.locality_stats[locality];
2196   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2197   if (locality_state.locality_stats != nullptr) {
2198     cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2199   }
2200   if (cluster_locality_stats == nullptr) {
2201     if (locality_state.locality_stats != nullptr) {
2202       locality_state.deleted_locality_stats +=
2203           locality_state.locality_stats->GetSnapshotAndReset();
2204     }
2205     cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2206         Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2207         it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2208         std::move(locality));
2209     locality_state.locality_stats = cluster_locality_stats.get();
2210   }
2211   chand_->MaybeStartLrsCall();
2212   return cluster_locality_stats;
2213 }
2214
2215 void XdsClient::RemoveClusterLocalityStats(
2216     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2217     absl::string_view eds_service_name,
2218     const RefCountedPtr<XdsLocalityName>& locality,
2219     XdsClusterLocalityStats* cluster_locality_stats) {
2220   MutexLock lock(&mu_);
2221   // TODO(roth): When we add support for direct federation, use the
2222   // server name specified in lrs_server.
2223   auto it = load_report_map_.find(
2224       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2225   if (it == load_report_map_.end()) return;
2226   LoadReportState& load_report_state = it->second;
2227   auto locality_it = load_report_state.locality_stats.find(locality);
2228   if (locality_it == load_report_state.locality_stats.end()) return;
2229   LoadReportState::LocalityState& locality_state = locality_it->second;
2230   if (locality_state.locality_stats == cluster_locality_stats) {
2231     // Record final snapshot in deleted_locality_stats, which will be
2232     // added to the next load report.
2233     locality_state.deleted_locality_stats +=
2234         locality_state.locality_stats->GetSnapshotAndReset();
2235     locality_state.locality_stats = nullptr;
2236   }
2237 }
2238
2239 void XdsClient::ResetBackoff() {
2240   MutexLock lock(&mu_);
2241   if (chand_ != nullptr) {
2242     grpc_channel_reset_connect_backoff(chand_->channel());
2243   }
2244 }
2245
2246 void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
2247   for (const auto& p : listener_map_) {
2248     const ListenerState& listener_state = p.second;
2249     for (const auto& p : listener_state.watchers) {
2250       p.first->OnError(GRPC_ERROR_REF(error));
2251     }
2252   }
2253   for (const auto& p : route_config_map_) {
2254     const RouteConfigState& route_config_state = p.second;
2255     for (const auto& p : route_config_state.watchers) {
2256       p.first->OnError(GRPC_ERROR_REF(error));
2257     }
2258   }
2259   for (const auto& p : cluster_map_) {
2260     const ClusterState& cluster_state = p.second;
2261     for (const auto& p : cluster_state.watchers) {
2262       p.first->OnError(GRPC_ERROR_REF(error));
2263     }
2264   }
2265   for (const auto& p : endpoint_map_) {
2266     const EndpointState& endpoint_state = p.second;
2267     for (const auto& p : endpoint_state.watchers) {
2268       p.first->OnError(GRPC_ERROR_REF(error));
2269     }
2270   }
2271   GRPC_ERROR_UNREF(error);
2272 }
2273
2274 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2275     bool send_all_clusters, const std::set<std::string>& clusters) {
2276   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2277     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2278   }
2279   XdsApi::ClusterLoadReportMap snapshot_map;
2280   for (auto load_report_it = load_report_map_.begin();
2281        load_report_it != load_report_map_.end();) {
2282     // Cluster key is cluster and EDS service name.
2283     const auto& cluster_key = load_report_it->first;
2284     LoadReportState& load_report = load_report_it->second;
2285     // If the CDS response for a cluster indicates to use LRS but the
2286     // LRS server does not say that it wants reports for this cluster,
2287     // then we'll have stats objects here whose data we're not going to
2288     // include in the load report.  However, we still need to clear out
2289     // the data from the stats objects, so that if the LRS server starts
2290     // asking for the data in the future, we don't incorrectly include
2291     // data from previous reporting intervals in that future report.
2292     const bool record_stats =
2293         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2294     XdsApi::ClusterLoadReport snapshot;
2295     // Aggregate drop stats.
2296     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2297     if (load_report.drop_stats != nullptr) {
2298       snapshot.dropped_requests +=
2299           load_report.drop_stats->GetSnapshotAndReset();
2300       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2301         gpr_log(GPR_INFO,
2302                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2303                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2304                 load_report.drop_stats);
2305       }
2306     }
2307     // Aggregate locality stats.
2308     for (auto it = load_report.locality_stats.begin();
2309          it != load_report.locality_stats.end();) {
2310       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2311       auto& locality_state = it->second;
2312       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2313           snapshot.locality_stats[locality_name];
2314       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2315       if (locality_state.locality_stats != nullptr) {
2316         locality_snapshot +=
2317             locality_state.locality_stats->GetSnapshotAndReset();
2318         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2319           gpr_log(GPR_INFO,
2320                   "[xds_client %p] cluster=%s eds_service_name=%s "
2321                   "locality=%s locality_stats=%p",
2322                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2323                   locality_name->AsHumanReadableString().c_str(),
2324                   locality_state.locality_stats);
2325         }
2326       }
2327       // If the only thing left in this entry was final snapshots from
2328       // deleted locality stats objects, remove the entry.
2329       if (locality_state.locality_stats == nullptr) {
2330         it = load_report.locality_stats.erase(it);
2331       } else {
2332         ++it;
2333       }
2334     }
2335     // Compute load report interval.
2336     const grpc_millis now = ExecCtx::Get()->Now();
2337     snapshot.load_report_interval = now - load_report.last_report_time;
2338     load_report.last_report_time = now;
2339     // Record snapshot.
2340     if (record_stats) {
2341       snapshot_map[cluster_key] = std::move(snapshot);
2342     }
2343     // If the only thing left in this entry was final snapshots from
2344     // deleted stats objects, remove the entry.
2345     if (load_report.locality_stats.empty() &&
2346         load_report.drop_stats == nullptr) {
2347       load_report_it = load_report_map_.erase(load_report_it);
2348     } else {
2349       ++load_report_it;
2350     }
2351   }
2352   return snapshot_map;
2353 }
2354
2355 std::string XdsClient::DumpClientConfigBinary() {
2356   MutexLock lock(&mu_);
2357   XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2358   // Update per-xds-type version if available, this version corresponding to the
2359   // last successful ADS update version.
2360   for (auto& p : resource_version_map_) {
2361     resource_type_metadata_map[p.first].version = p.second;
2362   }
2363   // Collect resource metadata from listeners
2364   auto& lds_map =
2365       resource_type_metadata_map[XdsApi::kLdsTypeUrl].resource_metadata_map;
2366   for (auto& p : listener_map_) {
2367     lds_map[p.first] = &p.second.meta;
2368   }
2369   // Collect resource metadata from route configs
2370   auto& rds_map =
2371       resource_type_metadata_map[XdsApi::kRdsTypeUrl].resource_metadata_map;
2372   for (auto& p : route_config_map_) {
2373     rds_map[p.first] = &p.second.meta;
2374   }
2375   // Collect resource metadata from clusters
2376   auto& cds_map =
2377       resource_type_metadata_map[XdsApi::kCdsTypeUrl].resource_metadata_map;
2378   for (auto& p : cluster_map_) {
2379     cds_map[p.first] = &p.second.meta;
2380   }
2381   // Collect resource metadata from endpoints
2382   auto& eds_map =
2383       resource_type_metadata_map[XdsApi::kEdsTypeUrl].resource_metadata_map;
2384   for (auto& p : endpoint_map_) {
2385     eds_map[p.first] = &p.second.meta;
2386   }
2387   // Assemble config dump messages
2388   return api_.AssembleClientConfig(resource_type_metadata_map);
2389 }
2390
2391 //
2392 // accessors for global state
2393 //
2394
2395 void XdsClientGlobalInit() {
2396   g_mu = new Mutex;
2397   XdsHttpFilterRegistry::Init();
2398 }
2399
2400 // TODO(roth): Find a better way to clear the fallback config that does
2401 // not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
2402 void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
2403   gpr_free(g_fallback_bootstrap_config);
2404   g_fallback_bootstrap_config = nullptr;
2405   delete g_mu;
2406   g_mu = nullptr;
2407   XdsHttpFilterRegistry::Shutdown();
2408 }
2409
2410 namespace {
2411
2412 std::string GetBootstrapContents(const char* fallback_config,
2413                                  grpc_error_handle* error) {
2414   // First, try GRPC_XDS_BOOTSTRAP env var.
2415   grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
2416   if (path != nullptr) {
2417     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2418       gpr_log(GPR_INFO,
2419               "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2420               "environment variable: %s",
2421               path.get());
2422     }
2423     grpc_slice contents;
2424     *error =
2425         grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
2426     if (*error != GRPC_ERROR_NONE) return "";
2427     std::string contents_str(StringViewFromSlice(contents));
2428     grpc_slice_unref_internal(contents);
2429     return contents_str;
2430   }
2431   // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
2432   grpc_core::UniquePtr<char> env_config(
2433       gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
2434   if (env_config != nullptr) {
2435     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2436       gpr_log(GPR_INFO,
2437               "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2438               "environment variable");
2439     }
2440     return env_config.get();
2441   }
2442   // Finally, try fallback config.
2443   if (fallback_config != nullptr) {
2444     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2445       gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
2446     }
2447     return fallback_config;
2448   }
2449   // No bootstrap config found.
2450   *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2451       "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2452       "not defined");
2453   return "";
2454 }
2455
2456 }  // namespace
2457
2458 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
2459                                                 grpc_error_handle* error) {
2460   RefCountedPtr<XdsClient> xds_client;
2461   // If getting bootstrap from channel args, create a local XdsClient
2462   // instance for the channel or server instead of using the global instance.
2463   const char* bootstrap_config = grpc_channel_args_find_string(
2464       args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
2465   if (bootstrap_config != nullptr) {
2466     std::unique_ptr<XdsBootstrap> bootstrap =
2467         XdsBootstrap::Create(bootstrap_config, error);
2468     if (*error == GRPC_ERROR_NONE) {
2469       grpc_channel_args* xds_channel_args =
2470           grpc_channel_args_find_pointer<grpc_channel_args>(
2471               args,
2472               GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
2473       return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
2474     }
2475     return nullptr;
2476   }
2477   // Otherwise, use the global instance.
2478   {
2479     MutexLock lock(g_mu);
2480     if (g_xds_client != nullptr) {
2481       auto xds_client = g_xds_client->RefIfNonZero();
2482       if (xds_client != nullptr) return xds_client;
2483     }
2484     // Find bootstrap contents.
2485     std::string bootstrap_contents =
2486         GetBootstrapContents(g_fallback_bootstrap_config, error);
2487     if (*error != GRPC_ERROR_NONE) return nullptr;
2488     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2489       gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
2490               bootstrap_contents.c_str());
2491     }
2492     // Parse bootstrap.
2493     std::unique_ptr<XdsBootstrap> bootstrap =
2494         XdsBootstrap::Create(bootstrap_contents, error);
2495     if (*error != GRPC_ERROR_NONE) return nullptr;
2496     // Instantiate XdsClient.
2497     xds_client =
2498         MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
2499     g_xds_client = xds_client.get();
2500   }
2501   return xds_client;
2502 }
2503
2504 namespace internal {
2505
2506 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2507   MutexLock lock(g_mu);
2508   g_channel_args = args;
2509 }
2510
2511 void UnsetGlobalXdsClientForTest() {
2512   MutexLock lock(g_mu);
2513   g_xds_client = nullptr;
2514 }
2515
2516 void SetXdsFallbackBootstrapConfig(const char* config) {
2517   MutexLock lock(g_mu);
2518   gpr_free(g_fallback_bootstrap_config);
2519   g_fallback_bootstrap_config = gpr_strdup(config);
2520 }
2521
2522 }  // namespace internal
2523
2524 //
2525 // embedding XdsClient in channel args
2526 //
2527
2528 #define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2529
2530 namespace {
2531
2532 void* XdsClientArgCopy(void* p) {
2533   XdsClient* xds_client = static_cast<XdsClient*>(p);
2534   xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2535   return p;
2536 }
2537
2538 void XdsClientArgDestroy(void* p) {
2539   XdsClient* xds_client = static_cast<XdsClient*>(p);
2540   xds_client->Unref(DEBUG_LOCATION, "channel arg");
2541 }
2542
2543 int XdsClientArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2544
2545 const grpc_arg_pointer_vtable kXdsClientArgVtable = {
2546     XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2547
2548 }  // namespace
2549
2550 grpc_arg XdsClient::MakeChannelArg() const {
2551   return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2552                                          const_cast<XdsClient*>(this),
2553                                          &kXdsClientArgVtable);
2554 }
2555
2556 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2557     const grpc_channel_args& args) {
2558   XdsClient* xds_client =
2559       grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2560   if (xds_client == nullptr) return nullptr;
2561   return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
2562 }
2563
2564 }  // namespace grpc_core
2565
2566 // The returned bytes may contain NULL(0), so we can't use c-string.
2567 grpc_slice grpc_dump_xds_configs() {
2568   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2569   grpc_core::ExecCtx exec_ctx;
2570   grpc_error_handle error = GRPC_ERROR_NONE;
2571   auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
2572   if (error != GRPC_ERROR_NONE) {
2573     // If we isn't using xDS, just return an empty string.
2574     GRPC_ERROR_UNREF(error);
2575     return grpc_empty_slice();
2576   }
2577   return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());
2578 }