Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolver / xds / xds_resolver.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
33
34 namespace grpc_core {
35
36 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
37
38 const char* kXdsClusterAttribute = "xds_cluster_name";
39
40 namespace {
41
42 //
43 // XdsResolver
44 //
45
46 class XdsResolver : public Resolver {
47  public:
48   explicit XdsResolver(ResolverArgs args)
49       : Resolver(std::move(args.work_serializer),
50                  std::move(args.result_handler)),
51         args_(grpc_channel_args_copy(args.args)),
52         interested_parties_(args.pollset_set) {
53     char* path = args.uri->path;
54     if (path[0] == '/') ++path;
55     server_name_ = path;
56     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
57       gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
58               server_name_.c_str());
59     }
60   }
61
62   ~XdsResolver() override {
63     grpc_channel_args_destroy(args_);
64     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
65       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
66     }
67   }
68
69   void StartLocked() override;
70
71   void ShutdownLocked() override;
72
73  private:
74   class Notifier {
75    public:
76     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
77     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
78     Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
79     explicit Notifier(RefCountedPtr<XdsResolver> resolver);
80
81    private:
82     enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
83
84     static void RunInExecCtx(void* arg, grpc_error* error);
85     void RunInWorkSerializer(grpc_error* error);
86
87     RefCountedPtr<XdsResolver> resolver_;
88     grpc_closure closure_;
89     XdsApi::LdsUpdate update_;
90     Type type_;
91   };
92
93   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
94    public:
95     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
96         : resolver_(std::move(resolver)) {}
97     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
98       new Notifier(resolver_, std::move(listener));
99     }
100     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
101     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
102
103    private:
104     RefCountedPtr<XdsResolver> resolver_;
105   };
106
107   class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
108    public:
109     explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
110         : resolver_(std::move(resolver)) {}
111     void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
112       new Notifier(resolver_, std::move(route_config));
113     }
114     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
115     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
116
117    private:
118     RefCountedPtr<XdsResolver> resolver_;
119   };
120
121   class ClusterState
122       : public RefCounted<ClusterState, PolymorphicRefCount, false> {
123    public:
124     using ClusterStateMap =
125         std::map<std::string, std::unique_ptr<ClusterState>>;
126
127     ClusterState(const std::string& cluster_name,
128                  ClusterStateMap* cluster_state_map)
129         : it_(cluster_state_map
130                   ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
131                   .first) {}
132     const std::string& cluster() const { return it_->first; }
133
134    private:
135     ClusterStateMap::iterator it_;
136   };
137
138   class XdsConfigSelector : public ConfigSelector {
139    public:
140     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
141                       const std::vector<XdsApi::Route>& routes,
142                       grpc_error* error);
143     ~XdsConfigSelector() override;
144
145     const char* name() const override { return "XdsConfigSelector"; }
146
147     bool Equals(const ConfigSelector* other) const override {
148       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
149       // Don't need to compare resolver_, since that will always be the same.
150       return route_table_ == other_xds->route_table_ &&
151              clusters_ == other_xds->clusters_;
152     }
153
154     CallConfig GetCallConfig(GetCallConfigArgs args) override;
155
156    private:
157     struct Route {
158       XdsApi::Route route;
159       absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
160           weighted_cluster_state;
161       RefCountedPtr<ServiceConfig> method_config;
162       bool operator==(const Route& other) const {
163         return route == other.route &&
164                weighted_cluster_state == other.weighted_cluster_state;
165       }
166     };
167     using RouteTable = std::vector<Route>;
168
169     void MaybeAddCluster(const std::string& name);
170     grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
171                                    const XdsApi::Route& route);
172
173     RefCountedPtr<XdsResolver> resolver_;
174     RouteTable route_table_;
175     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
176   };
177
178   void OnListenerUpdate(XdsApi::LdsUpdate listener);
179   void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
180   void OnError(grpc_error* error);
181   void OnResourceDoesNotExist();
182
183   grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
184   void GenerateResult();
185   void MaybeRemoveUnusedClusters();
186
187   std::string server_name_;
188   const grpc_channel_args* args_;
189   grpc_pollset_set* interested_parties_;
190   RefCountedPtr<XdsClient> xds_client_;
191   XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
192   std::string route_config_name_;
193   XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
194   ClusterState::ClusterStateMap cluster_state_map_;
195   std::vector<XdsApi::Route> current_update_;
196   XdsApi::Duration http_max_stream_duration_;
197 };
198
199 //
200 // XdsResolver::Notifier
201 //
202
203 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
204                                 XdsApi::LdsUpdate update)
205     : resolver_(std::move(resolver)),
206       update_(std::move(update)),
207       type_(kLdsUpdate) {
208   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
209   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
210 }
211
212 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
213                                 XdsApi::RdsUpdate update)
214     : resolver_(std::move(resolver)), type_(kRdsUpdate) {
215   update_.rds_update = std::move(update);
216   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
217   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
218 }
219
220 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
221                                 grpc_error* error)
222     : resolver_(std::move(resolver)), type_(kError) {
223   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
224   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
225 }
226
227 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
228     : resolver_(std::move(resolver)), type_(kDoesNotExist) {
229   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
230   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
231 }
232
233 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
234   Notifier* self = static_cast<Notifier*>(arg);
235   GRPC_ERROR_REF(error);
236   self->resolver_->work_serializer()->Run(
237       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
238 }
239
240 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
241   if (resolver_->xds_client_ == nullptr) {
242     GRPC_ERROR_UNREF(error);
243     delete this;
244     return;
245   }
246   switch (type_) {
247     case kLdsUpdate:
248       resolver_->OnListenerUpdate(std::move(update_));
249       break;
250     case kRdsUpdate:
251       resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
252       break;
253     case kError:
254       resolver_->OnError(error);
255       break;
256     case kDoesNotExist:
257       resolver_->OnResourceDoesNotExist();
258       break;
259   };
260   delete this;
261 }
262
263 //
264 // XdsResolver::XdsConfigSelector
265 //
266
267 XdsResolver::XdsConfigSelector::XdsConfigSelector(
268     RefCountedPtr<XdsResolver> resolver,
269     const std::vector<XdsApi::Route>& routes, grpc_error* error)
270     : resolver_(std::move(resolver)) {
271   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
272     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
273             resolver_.get(), this);
274   }
275   // 1. Construct the route table
276   // 2  Update resolver's cluster state map
277   // 3. Construct cluster list to hold on to entries in the cluster state
278   // map.
279   // Reserve the necessary entries up-front to avoid reallocation as we add
280   // elements. This is necessary because the string_view in the entry's
281   // weighted_cluster_state field points to the memory in the route field, so
282   // moving the entry in a reallocation will cause the string_view to point to
283   // invalid data.
284   route_table_.reserve(routes.size());
285   for (auto& route : routes) {
286     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
287       gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
288               resolver_.get(), this, route.ToString().c_str());
289     }
290     route_table_.emplace_back();
291     auto& route_entry = route_table_.back();
292     route_entry.route = route;
293     // If the route doesn't specify a timeout, set its timeout to the global
294     // one.
295     if (!route.max_stream_duration.has_value()) {
296       route_entry.route.max_stream_duration =
297           resolver_->http_max_stream_duration_;
298     }
299     error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
300     if (route.weighted_clusters.empty()) {
301       MaybeAddCluster(route.cluster_name);
302     } else {
303       uint32_t end = 0;
304       for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
305         MaybeAddCluster(weighted_cluster.name);
306         end += weighted_cluster.weight;
307         route_entry.weighted_cluster_state.emplace_back(end,
308                                                         weighted_cluster.name);
309       }
310     }
311   }
312 }
313
314 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
315     RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
316   grpc_error* error = GRPC_ERROR_NONE;
317   std::vector<std::string> fields;
318   if (route.max_stream_duration.has_value() &&
319       (route.max_stream_duration->seconds != 0 ||
320        route.max_stream_duration->nanos != 0)) {
321     fields.emplace_back(absl::StrFormat("    \"timeout\": \"%d.%09ds\"",
322                                         route.max_stream_duration->seconds,
323                                         route.max_stream_duration->nanos));
324   }
325   if (!fields.empty()) {
326     std::string json = absl::StrCat(
327         "{\n"
328         "  \"methodConfig\": [ {\n"
329         "    \"name\": [\n"
330         "      {}\n"
331         "    ],\n"
332         "    ",
333         absl::StrJoin(fields, ",\n"),
334         "\n  } ]\n"
335         "}");
336     *method_config =
337         ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
338   }
339   return error;
340 }
341
342 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
343   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
344     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
345             resolver_.get(), this);
346   }
347   clusters_.clear();
348   resolver_->MaybeRemoveUnusedClusters();
349 }
350
351 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
352   if (clusters_.find(name) == clusters_.end()) {
353     auto it = resolver_->cluster_state_map_.find(name);
354     if (it == resolver_->cluster_state_map_.end()) {
355       auto new_cluster_state =
356           MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
357       clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
358     } else {
359       clusters_[it->second->cluster()] = it->second->Ref();
360     }
361   }
362 }
363
364 bool PathMatch(const absl::string_view& path,
365                const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
366   switch (path_matcher.type) {
367     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
368       return path_matcher.case_sensitive
369                  ? absl::StartsWith(path, path_matcher.string_matcher)
370                  : absl::StartsWithIgnoreCase(path,
371                                               path_matcher.string_matcher);
372     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
373       return path_matcher.case_sensitive
374                  ? path == path_matcher.string_matcher
375                  : absl::EqualsIgnoreCase(path, path_matcher.string_matcher);
376     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
377       // Note: Case-sensitive option will already have been set appropriately
378       // in path_matcher.regex_matcher when it was constructed, so no
379       // need to check it here.
380       return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
381     default:
382       return false;
383   }
384 }
385
386 absl::optional<absl::string_view> GetMetadataValue(
387     const std::string& target_key, grpc_metadata_batch* initial_metadata,
388     std::string* concatenated_value) {
389   // Find all values for the specified key.
390   GPR_DEBUG_ASSERT(initial_metadata != nullptr);
391   absl::InlinedVector<absl::string_view, 1> values;
392   for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
393        md = md->next) {
394     absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
395     absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
396     if (target_key == key) values.push_back(value);
397   }
398   // If none found, no match.
399   if (values.empty()) return absl::nullopt;
400   // If exactly one found, return it as-is.
401   if (values.size() == 1) return values.front();
402   // If more than one found, concatenate the values, using
403   // *concatenated_values as a temporary holding place for the
404   // concatenated string.
405   *concatenated_value = absl::StrJoin(values, ",");
406   return *concatenated_value;
407 }
408
409 bool HeaderMatchHelper(
410     const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
411     grpc_metadata_batch* initial_metadata) {
412   std::string concatenated_value;
413   absl::optional<absl::string_view> value;
414   // Note: If we ever allow binary headers here, we still need to
415   // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
416   // they are not visible to the LB policy in grpc-go.
417   if (absl::EndsWith(header_matcher.name, "-bin") ||
418       header_matcher.name == "grpc-previous-rpc-attempts") {
419     value = absl::nullopt;
420   } else if (header_matcher.name == "content-type") {
421     value = "application/grpc";
422   } else {
423     value = GetMetadataValue(header_matcher.name, initial_metadata,
424                              &concatenated_value);
425   }
426   if (!value.has_value()) {
427     if (header_matcher.type ==
428         XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
429       return !header_matcher.present_match;
430     } else {
431       // For all other header matcher types, we need the header value to
432       // exist to consider matches.
433       return false;
434     }
435   }
436   switch (header_matcher.type) {
437     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
438       return value.value() == header_matcher.string_matcher;
439     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
440       return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
441     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
442       int64_t int_value;
443       if (!absl::SimpleAtoi(value.value(), &int_value)) {
444         return false;
445       }
446       return int_value >= header_matcher.range_start &&
447              int_value < header_matcher.range_end;
448     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
449       return absl::StartsWith(value.value(), header_matcher.string_matcher);
450     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
451       return absl::EndsWith(value.value(), header_matcher.string_matcher);
452     default:
453       return false;
454   }
455 }
456
457 bool HeadersMatch(
458     const std::vector<XdsApi::Route::Matchers::HeaderMatcher>& header_matchers,
459     grpc_metadata_batch* initial_metadata) {
460   for (const auto& header_matcher : header_matchers) {
461     bool match = HeaderMatchHelper(header_matcher, initial_metadata);
462     if (header_matcher.invert_match) match = !match;
463     if (!match) return false;
464   }
465   return true;
466 }
467
// Returns true with a probability of roughly fraction_per_million / 1e6.
//
// rand() only yields values in [0, RAND_MAX], and RAND_MAX may be as small as
// 32767. A plain `rand() % 1000000` then never produces anything above 32767,
// so every fraction larger than ~3.3% would always match. Two 15-bit draws
// are combined into a 30-bit value so the full [0, 1000000) range is reachable
// on every platform.
bool UnderFraction(const uint32_t fraction_per_million) {
  const uint32_t high_bits = static_cast<uint32_t>(rand()) & 0x7fffu;
  const uint32_t low_bits = static_cast<uint32_t>(rand()) & 0x7fffu;
  // Generate a random number in [0, 1000000).
  const uint32_t random_number = ((high_bits << 15) | low_bits) % 1000000;
  return random_number < fraction_per_million;
}
473
474 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
475     GetCallConfigArgs args) {
476   for (const auto& entry : route_table_) {
477     // Path matching.
478     if (!PathMatch(StringViewFromSlice(*args.path),
479                    entry.route.matchers.path_matcher)) {
480       continue;
481     }
482     // Header Matching.
483     if (!HeadersMatch(entry.route.matchers.header_matchers,
484                       args.initial_metadata)) {
485       continue;
486     }
487     // Match fraction check
488     if (entry.route.matchers.fraction_per_million.has_value() &&
489         !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
490       continue;
491     }
492     // Found a route match
493     absl::string_view cluster_name;
494     if (entry.route.weighted_clusters.empty()) {
495       cluster_name = entry.route.cluster_name;
496     } else {
497       const uint32_t key =
498           rand() %
499           entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
500               .first;
501       // Find the index in weighted clusters corresponding to key.
502       size_t mid = 0;
503       size_t start_index = 0;
504       size_t end_index = entry.weighted_cluster_state.size() - 1;
505       size_t index = 0;
506       while (end_index > start_index) {
507         mid = (start_index + end_index) / 2;
508         if (entry.weighted_cluster_state[mid].first > key) {
509           end_index = mid;
510         } else if (entry.weighted_cluster_state[mid].first < key) {
511           start_index = mid + 1;
512         } else {
513           index = mid + 1;
514           break;
515         }
516       }
517       if (index == 0) index = start_index;
518       GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
519       cluster_name = entry.weighted_cluster_state[index].second;
520     }
521     auto it = clusters_.find(cluster_name);
522     GPR_ASSERT(it != clusters_.end());
523     XdsResolver* resolver =
524         static_cast<XdsResolver*>(resolver_->Ref().release());
525     ClusterState* cluster_state = it->second->Ref().release();
526     CallConfig call_config;
527     if (entry.method_config != nullptr) {
528       call_config.service_config = entry.method_config;
529       call_config.method_configs =
530           entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
531     }
532     call_config.call_attributes[kXdsClusterAttribute] = it->first;
533     call_config.on_call_committed = [resolver, cluster_state]() {
534       cluster_state->Unref();
535       ExecCtx::Run(
536           // TODO(roth): This hop into the ExecCtx is being done to avoid
537           // entering the WorkSerializer while holding the client channel data
538           // plane mutex, since that can lead to deadlocks. However, we should
539           // not have to solve this problem in each individual ConfigSelector
540           // implementation. When we have time, we should fix the client channel
541           // code to avoid this by not invoking the
542           // CallConfig::on_call_committed callback until after it has released
543           // the data plane mutex.
544           DEBUG_LOCATION,
545           GRPC_CLOSURE_CREATE(
546               [](void* arg, grpc_error* /*error*/) {
547                 auto* resolver = static_cast<XdsResolver*>(arg);
548                 resolver->work_serializer()->Run(
549                     [resolver]() {
550                       resolver->MaybeRemoveUnusedClusters();
551                       resolver->Unref();
552                     },
553                     DEBUG_LOCATION);
554               },
555               resolver, nullptr),
556           GRPC_ERROR_NONE);
557     };
558     return call_config;
559   }
560   return CallConfig();
561 }
562
563 //
564 // XdsResolver
565 //
566
567 void XdsResolver::StartLocked() {
568   grpc_error* error = GRPC_ERROR_NONE;
569   xds_client_ = XdsClient::GetOrCreate(&error);
570   if (error != GRPC_ERROR_NONE) {
571     gpr_log(GPR_ERROR,
572             "Failed to create xds client -- channel will remain in "
573             "TRANSIENT_FAILURE: %s",
574             grpc_error_string(error));
575     result_handler()->ReturnError(error);
576     return;
577   }
578   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
579                                    interested_parties_);
580   channelz::ChannelNode* parent_channelz_node =
581       grpc_channel_args_find_pointer<channelz::ChannelNode>(
582           args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
583   if (parent_channelz_node != nullptr) {
584     xds_client_->AddChannelzLinkage(parent_channelz_node);
585   }
586   auto watcher = absl::make_unique<ListenerWatcher>(Ref());
587   listener_watcher_ = watcher.get();
588   xds_client_->WatchListenerData(server_name_, std::move(watcher));
589 }
590
591 void XdsResolver::ShutdownLocked() {
592   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
593     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
594   }
595   if (xds_client_ != nullptr) {
596     if (listener_watcher_ != nullptr) {
597       xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
598                                            /*delay_unsubscription=*/false);
599     }
600     if (route_config_watcher_ != nullptr) {
601       xds_client_->CancelRouteConfigDataWatch(
602           server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
603     }
604     channelz::ChannelNode* parent_channelz_node =
605         grpc_channel_args_find_pointer<channelz::ChannelNode>(
606             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
607     if (parent_channelz_node != nullptr) {
608       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
609     }
610     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
611                                      interested_parties_);
612     xds_client_.reset();
613   }
614 }
615
616 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
617   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
618     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
619   }
620   if (listener.route_config_name != route_config_name_) {
621     if (route_config_watcher_ != nullptr) {
622       xds_client_->CancelRouteConfigDataWatch(
623           route_config_name_, route_config_watcher_,
624           /*delay_unsubscription=*/!listener.route_config_name.empty());
625       route_config_watcher_ = nullptr;
626     }
627     route_config_name_ = std::move(listener.route_config_name);
628     if (!route_config_name_.empty()) {
629       auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
630       route_config_watcher_ = watcher.get();
631       xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
632     }
633   }
634   http_max_stream_duration_ = listener.http_max_stream_duration;
635   if (route_config_name_.empty()) {
636     GPR_ASSERT(listener.rds_update.has_value());
637     OnRouteConfigUpdate(std::move(*listener.rds_update));
638   }
639 }
640
641 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
642   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
643     gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
644   }
645   // Find the relevant VirtualHost from the RouteConfiguration.
646   XdsApi::RdsUpdate::VirtualHost* vhost =
647       rds_update.FindVirtualHostForDomain(server_name_);
648   if (vhost == nullptr) {
649     OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
650         absl::StrCat("could not find VirtualHost for ", server_name_,
651                      " in RouteConfiguration")
652             .c_str()));
653     return;
654   }
655   // Save the list of routes in the resolver.
656   current_update_ = std::move(vhost->routes);
657   // Send a new result to the channel.
658   GenerateResult();
659 }
660
661 void XdsResolver::OnError(grpc_error* error) {
662   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
663           this, grpc_error_string(error));
664   Result result;
665   result.args = grpc_channel_args_copy(args_);
666   result.service_config_error = error;
667   result_handler()->ReturnResult(std::move(result));
668 }
669
670 void XdsResolver::OnResourceDoesNotExist() {
671   gpr_log(GPR_ERROR,
672           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
673           "update and returning empty service config",
674           this);
675   current_update_.clear();
676   Result result;
677   result.service_config =
678       ServiceConfig::Create(args_, "{}", &result.service_config_error);
679   GPR_ASSERT(result.service_config != nullptr);
680   result.args = grpc_channel_args_copy(args_);
681   result_handler()->ReturnResult(std::move(result));
682 }
683
684 grpc_error* XdsResolver::CreateServiceConfig(
685     RefCountedPtr<ServiceConfig>* service_config) {
686   std::vector<std::string> clusters;
687   for (const auto& cluster : cluster_state_map_) {
688     clusters.push_back(
689         absl::StrFormat("      \"%s\":{\n"
690                         "        \"childPolicy\":[ {\n"
691                         "          \"cds_experimental\":{\n"
692                         "            \"cluster\": \"%s\"\n"
693                         "          }\n"
694                         "        } ]\n"
695                         "       }",
696                         cluster.first, cluster.first));
697   }
698   std::vector<std::string> config_parts;
699   config_parts.push_back(
700       "{\n"
701       "  \"loadBalancingConfig\":[\n"
702       "    { \"xds_cluster_manager_experimental\":{\n"
703       "      \"children\":{\n");
704   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
705   config_parts.push_back(
706       "    }\n"
707       "    } }\n"
708       "  ]\n"
709       "}");
710   std::string json = absl::StrJoin(config_parts, "");
711   grpc_error* error = GRPC_ERROR_NONE;
712   *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
713   return error;
714 }
715
716 void XdsResolver::GenerateResult() {
717   if (current_update_.empty()) return;
718   // First create XdsConfigSelector, which may add new entries to the cluster
719   // state map, and then CreateServiceConfig for LB policies.
720   grpc_error* error = GRPC_ERROR_NONE;
721   auto config_selector =
722       MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
723   if (error != GRPC_ERROR_NONE) {
724     OnError(error);
725     return;
726   }
727   Result result;
728   error = CreateServiceConfig(&result.service_config);
729   if (error != GRPC_ERROR_NONE) {
730     OnError(error);
731     return;
732   }
733   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
734     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
735             result.service_config->json_string().c_str());
736   }
737   grpc_arg new_arg = config_selector->MakeChannelArg();
738   result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
739   result_handler()->ReturnResult(std::move(result));
740 }
741
742 void XdsResolver::MaybeRemoveUnusedClusters() {
743   bool update_needed = false;
744   for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
745     RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
746     if (cluster_state != nullptr) {
747       ++it;
748     } else {
749       update_needed = true;
750       it = cluster_state_map_.erase(it);
751     }
752   }
753   if (update_needed && xds_client_ != nullptr) {
754     // Send a new result to the channel.
755     GenerateResult();
756   }
757 }
758
759 //
760 // Factory
761 //
762
763 class XdsResolverFactory : public ResolverFactory {
764  public:
765   bool IsValidUri(const grpc_uri* uri) const override {
766     if (GPR_UNLIKELY(0 != strcmp(uri->authority, ""))) {
767       gpr_log(GPR_ERROR, "URI authority not supported");
768       return false;
769     }
770     return true;
771   }
772
773   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
774     if (!IsValidUri(args.uri)) return nullptr;
775     return MakeOrphanable<XdsResolver>(std::move(args));
776   }
777
778   const char* scheme() const override { return "xds"; }
779 };
780
781 }  // namespace
782
783 }  // namespace grpc_core
784
785 void grpc_resolver_xds_init() {
786   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
787       absl::make_unique<grpc_core::XdsResolverFactory>());
788 }
789
// Counterpart of grpc_resolver_xds_init(); there is nothing to tear down.
void grpc_resolver_xds_shutdown() {
}