3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
// Trace flag guarding this file's GPR_INFO logging; constructed with `false`
// and registered under the name "xds_resolver".
36 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
// Key under which XdsConfigSelector::GetCallConfig() stores the chosen
// cluster name in CallConfig::call_attributes.
38 const char* kXdsClusterAttribute = "xds_cluster_name";
// Resolver created by XdsResolverFactory for the "xds" scheme. It watches
// listener and route-config data through an XdsClient and reports service
// configs (plus an XdsConfigSelector) to the channel via result_handler_.
46 class XdsResolver : public Resolver {
// Takes ownership of the work serializer and result handler, copies the
// channel args, and derives the server name from the URI path with a leading
// "/" stripped.
48 explicit XdsResolver(ResolverArgs args)
49 : work_serializer_(std::move(args.work_serializer)),
50 result_handler_(std::move(args.result_handler)),
51 server_name_(absl::StripPrefix(args.uri.path(), "/")),
52 args_(grpc_channel_args_copy(args.args)),
53 interested_parties_(args.pollset_set) {
54 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
55 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
56 server_name_.c_str());
// Destroys the channel-args copy made in the constructor.
60 ~XdsResolver() override {
61 grpc_channel_args_destroy(args_);
62 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
63 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
// Creates the XdsClient and starts the listener watch.
67 void StartLocked() override;
// Cancels outstanding watches and detaches from the XdsClient.
69 void ShutdownLocked() override;
// Notifier: heap-allocated by the watchers below (via `new`) to carry one
// XdsClient event. Every constructor schedules closure_ on the ExecCtx, whose
// callback re-dispatches onto the resolver's work serializer.
// NOTE(review): presumably a Notifier deletes itself after dispatch; the
// deletion is not visible in this excerpt -- confirm.
74 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
75 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
76 Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
77 explicit Notifier(RefCountedPtr<XdsResolver> resolver);
// Which XdsResolver handler the notification is routed to.
80 enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
82 static void RunInExecCtx(void* arg, grpc_error* error);
83 void RunInWorkSerializer(grpc_error* error);
85 RefCountedPtr<XdsResolver> resolver_;
86 grpc_closure closure_;
// Holds the payload for both update kinds: an RDS update is stored in
// update_.rds_update.
87 XdsApi::LdsUpdate update_;
// Listener watcher registered with XdsClient; each callback wraps its event in
// a new Notifier.
91 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
93 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
94 : resolver_(std::move(resolver)) {}
95 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
96 new Notifier(resolver_, std::move(listener));
98 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
99 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
102 RefCountedPtr<XdsResolver> resolver_;
// Route-config watcher registered with XdsClient; mirrors ListenerWatcher.
105 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
107 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
108 : resolver_(std::move(resolver)) {}
109 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
110 new Notifier(resolver_, std::move(route_config));
112 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
113 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
116 RefCountedPtr<XdsResolver> resolver_;
// ClusterState: ref-counted per-cluster entry. The constructor inserts `this`
// into the given map, wrapped in a unique_ptr, and remembers the iterator so
// cluster() can return the map key.
// NOTE(review): the `false` template argument presumably stops Unref() from
// deleting the object (the map's unique_ptr owns it) -- confirm against
// RefCounted.
120 : public RefCounted<ClusterState, PolymorphicRefCount, false> {
122 using ClusterStateMap =
123 std::map<std::string, std::unique_ptr<ClusterState>>;
125 ClusterState(const std::string& cluster_name,
126 ClusterStateMap* cluster_state_map)
127 : it_(cluster_state_map
128 ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
130 const std::string& cluster() const { return it_->first; }
133 ClusterStateMap::iterator it_;
// ConfigSelector built from the current route list. It holds a ref to each
// ClusterState it can route to (clusters_) and picks a cluster per call in
// GetCallConfig().
136 class XdsConfigSelector : public ConfigSelector {
138 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
139 const std::vector<XdsApi::Route>& routes,
141 ~XdsConfigSelector() override;
143 const char* name() const override { return "XdsConfigSelector"; }
// Compares route tables and cluster maps. `other` is downcast without a
// check, so it is assumed to be an XdsConfigSelector.
145 bool Equals(const ConfigSelector* other) const override {
146 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
147 // Don't need to compare resolver_, since that will always be the same.
148 return route_table_ == other_xds->route_table_ &&
149 clusters_ == other_xds->clusters_;
152 CallConfig GetCallConfig(GetCallConfigArgs args) override;
// Per-route entry. weighted_cluster_state holds (cumulative weight bound,
// cluster name) pairs, filled in by the constructor.
157 absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
158 weighted_cluster_state;
159 RefCountedPtr<ServiceConfig> method_config;
// Equality ignores method_config.
160 bool operator==(const Route& other) const {
161 return route == other.route &&
162 weighted_cluster_state == other.weighted_cluster_state;
165 using RouteTable = std::vector<Route>;
167 void MaybeAddCluster(const std::string& name);
168 grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
169 const XdsApi::Route& route);
171 RefCountedPtr<XdsResolver> resolver_;
172 RouteTable route_table_;
// Keyed by a string_view taken from ClusterState::cluster().
173 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
// Handlers invoked from Notifier::RunInWorkSerializer().
176 void OnListenerUpdate(XdsApi::LdsUpdate listener);
177 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
178 void OnError(grpc_error* error);
179 void OnResourceDoesNotExist();
181 grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
182 void GenerateResult();
183 void MaybeRemoveUnusedClusters();
185 std::shared_ptr<WorkSerializer> work_serializer_;
186 std::unique_ptr<ResultHandler> result_handler_;
187 std::string server_name_;
// Copy of the channel args; destroyed in the destructor.
188 const grpc_channel_args* args_;
189 grpc_pollset_set* interested_parties_;
// Null-checked by Notifier::RunInWorkSerializer() and
// MaybeRemoveUnusedClusters() before they act.
190 RefCountedPtr<XdsClient> xds_client_;
// Raw pointers to watchers whose unique_ptr is handed to XdsClient when the
// watch is started; kept only so the watch can be cancelled.
191 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
192 std::string route_config_name_;
193 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
// One entry per cluster; CreateServiceConfig() emits a child policy for each.
194 ClusterState::ClusterStateMap cluster_state_map_;
// Routes of the matching virtual host from the latest route-config update.
195 std::vector<XdsApi::Route> current_update_;
// Listener-level max stream duration, applied to routes that set none.
196 XdsApi::Duration http_max_stream_duration_;
200 // XdsResolver::Notifier
// Listener-update notifier: stores the update, then schedules RunInExecCtx on
// the ExecCtx with no error.
203 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
204 XdsApi::LdsUpdate update)
205 : resolver_(std::move(resolver)),
206 update_(std::move(update)),
208 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
209 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
// Route-config-update notifier: the RDS update is stored inside
// update_.rds_update so a single member can carry either payload.
212 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
213 XdsApi::RdsUpdate update)
214 : resolver_(std::move(resolver)), type_(kRdsUpdate) {
215 update_.rds_update = std::move(update);
216 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
217 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
// Error notifier: the error is passed to the closure rather than stored.
220 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
222 : resolver_(std::move(resolver)), type_(kError) {
223 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
224 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
// Resource-does-not-exist notifier: carries no payload.
227 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
228 : resolver_(std::move(resolver)), type_(kDoesNotExist) {
229 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
230 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
233 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
234 Notifier* self = static_cast<Notifier*>(arg);
235 GRPC_ERROR_REF(error);
236 self->resolver_->work_serializer_->Run(
237 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
// Invoked on the work serializer (scheduled by RunInExecCtx). Forwards the
// notification to the XdsResolver handler that matches its kind:
// OnListenerUpdate, OnRouteConfigUpdate, OnError or OnResourceDoesNotExist.
240 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
// No XdsClient on the resolver: release the error ref taken in RunInExecCtx
// instead of delivering it.
// NOTE(review): presumably this means the resolver was shut down -- confirm.
241 if (resolver_->xds_client_ == nullptr) {
242 GRPC_ERROR_UNREF(error);
248 resolver_->OnListenerUpdate(std::move(update_));
// The RDS payload was stashed in update_.rds_update by the constructor.
251 resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
254 resolver_->OnError(error);
257 resolver_->OnResourceDoesNotExist();
264 // XdsResolver::XdsConfigSelector
// Builds the route table from `routes`. For every route it copies the route,
// fills in the listener-level max stream duration when the route has none,
// creates the per-route method config, and registers every cluster the route
// can select via MaybeAddCluster().
//
// NOTE(review): `error` is a by-value parameter, so the assignment from
// CreateMethodConfig() below never reaches the caller's variable
// (GenerateResult() inspects its own local after construction), and the value
// is overwritten on each loop iteration. Confirm whether an out-parameter was
// intended.
267 XdsResolver::XdsConfigSelector::XdsConfigSelector(
268 RefCountedPtr<XdsResolver> resolver,
269 const std::vector<XdsApi::Route>& routes, grpc_error* error)
270 : resolver_(std::move(resolver)) {
271 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
272 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
273 resolver_.get(), this);
275 // 1. Construct the route table
276 // 2 Update resolver's cluster state map
277 // 3. Construct cluster list to hold on to entries in the cluster state
279 // Reserve the necessary entries up-front to avoid reallocation as we add
280 // elements. This is necessary because the string_view in the entry's
281 // weighted_cluster_state field points to the memory in the route field, so
282 // moving the entry in a reallocation will cause the string_view to point to
284 route_table_.reserve(routes.size());
285 for (auto& route : routes) {
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
287 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
288 resolver_.get(), this, route.ToString().c_str());
// Append an empty entry first and fill it in place, so the string_views
// created below refer to the copy stored in route_table_.
290 route_table_.emplace_back();
291 auto& route_entry = route_table_.back();
292 route_entry.route = route;
293 // If the route doesn't specify a timeout, set its timeout to the global
295 if (!route.max_stream_duration.has_value()) {
296 route_entry.route.max_stream_duration =
297 resolver_->http_max_stream_duration_;
299 error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
// A route either names a single cluster or carries weighted clusters.
300 if (route.weighted_clusters.empty()) {
301 MaybeAddCluster(route.cluster_name);
// Weighted case: `end` accumulates the weights, so each stored pair is
// (cumulative upper bound, cluster name).
304 for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
305 MaybeAddCluster(weighted_cluster.name);
306 end += weighted_cluster.weight;
307 route_entry.weighted_cluster_state.emplace_back(end,
308 weighted_cluster.name);
// Builds a per-route service config carrying the route's timeout.
//
// A "timeout" field is generated only when the route has a max stream
// duration that is not zero; when no field is generated, no service config is
// created. The result is written to *method_config.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the error from ServiceConfig::Create() is returned -- confirm.
314 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
315 RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
316 grpc_error* error = GRPC_ERROR_NONE;
317 std::vector<std::string> fields;
318 if (route.max_stream_duration.has_value() &&
319 (route.max_stream_duration->seconds != 0 ||
320 route.max_stream_duration->nanos != 0)) {
// Rendered as "<seconds>.<nanos, zero-padded to 9 digits>s".
321 fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
322 route.max_stream_duration->seconds,
323 route.max_stream_duration->nanos));
325 if (!fields.empty()) {
326 std::string json = absl::StrCat(
328 " \"methodConfig\": [ {\n"
333 absl::StrJoin(fields, ",\n"),
337 ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
// Logs destruction when tracing is enabled, then asks the resolver to prune
// cluster-state entries that are no longer referenced.
342 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
343 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
344 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
345 resolver_.get(), this);
348 resolver_->MaybeRemoveUnusedClusters();
351 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
352 if (clusters_.find(name) == clusters_.end()) {
353 auto it = resolver_->cluster_state_map_.find(name);
354 if (it == resolver_->cluster_state_map_.end()) {
355 auto new_cluster_state =
356 MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
357 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
359 clusters_[it->second->cluster()] = it->second->Ref();
// Looks up `target_key` in `initial_metadata`.
//
// Returns nullopt when the key is absent, the value itself when it occurs
// once, and otherwise a comma-joined string of all values. In the joined case
// the returned view points into *concatenated_value, so that string must
// outlive the result.
364 absl::optional<absl::string_view> GetMetadataValue(
365 const std::string& target_key, grpc_metadata_batch* initial_metadata,
366 std::string* concatenated_value) {
367 // Find all values for the specified key.
368 GPR_DEBUG_ASSERT(initial_metadata != nullptr);
369 absl::InlinedVector<absl::string_view, 1> values;
370 for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
372 absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
373 absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
374 if (target_key == key) values.push_back(value);
376 // If none found, no match.
377 if (values.empty()) return absl::nullopt;
378 // If exactly one found, return it as-is.
379 if (values.size() == 1) return values.front();
380 // If more than one found, concatenate the values, using
381 // *concatenated_values as a temporary holding place for the
382 // concatenated string.
383 *concatenated_value = absl::StrJoin(values, ",");
384 return *concatenated_value;
387 bool HeaderMatchHelper(const HeaderMatcher& header_matcher,
388 grpc_metadata_batch* initial_metadata) {
389 std::string concatenated_value;
390 absl::optional<absl::string_view> value;
391 // Note: If we ever allow binary headers here, we still need to
392 // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
393 // they are not visible to the LB policy in grpc-go.
394 if (absl::EndsWith(header_matcher.name(), "-bin") ||
395 header_matcher.name() == "grpc-previous-rpc-attempts") {
396 value = absl::nullopt;
397 } else if (header_matcher.name() == "content-type") {
398 value = "application/grpc";
400 value = GetMetadataValue(header_matcher.name(), initial_metadata,
401 &concatenated_value);
403 return header_matcher.Match(value);
406 bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
407 grpc_metadata_batch* initial_metadata) {
408 for (const auto& header_matcher : header_matchers) {
409 if (!HeaderMatchHelper(header_matcher, initial_metadata)) return false;
// Returns true with probability fraction_per_million / 1,000,000.
//
// A value of 0 never matches; a value of 1,000,000 or more always matches.
//
// rand() only guarantees RAND_MAX >= 32767. On such platforms a single draw
// can never reach the upper part of [0, 1000000), which would make every
// fraction above 32767 match unconditionally. When RAND_MAX is that small, two
// draws are combined to widen the range; platforms whose RAND_MAX already
// covers the range keep the single-draw behavior.
bool UnderFraction(const uint32_t fraction_per_million) {
  constexpr uint64_t kMillion = 1000000;
  // Number of distinct values a single rand() call can produce.
  constexpr uint64_t kRandSpan = static_cast<uint64_t>(RAND_MAX) + 1;
  uint64_t draw = static_cast<uint64_t>(rand());
  if (kRandSpan < kMillion) {
    // Treat two draws as the high and low digits of one wider number.
    draw = draw * kRandSpan + static_cast<uint64_t>(rand());
  }
  // Generate a random number in [0, 1000000).
  const uint32_t random_number = static_cast<uint32_t>(draw % kMillion);
  return random_number < fraction_per_million;
}
// Selects the route, and from it the cluster, for one call.
//
// Entries of route_table_ are tried in order. An entry is chosen when its path
// matcher, all of its header matchers and its optional fraction check accept
// the call. The chosen cluster name is recorded in the call attributes under
// kXdsClusterAttribute, and the route's method config (if any) is attached.
420 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
421 GetCallConfigArgs args) {
422 for (const auto& entry : route_table_) {
424 if (!entry.route.matchers.path_matcher.Match(
425 StringViewFromSlice(*args.path))) {
429 if (!HeadersMatch(entry.route.matchers.header_matchers,
430 args.initial_metadata)) {
433 // Match fraction check
434 if (entry.route.matchers.fraction_per_million.has_value() &&
435 !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
438 // Found a route match
439 absl::string_view cluster_name;
440 if (entry.route.weighted_clusters.empty()) {
441 cluster_name = entry.route.cluster_name;
445 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
447 // Find the index in weighted clusters corresponding to key.
449 size_t start_index = 0;
450 size_t end_index = entry.weighted_cluster_state.size() - 1;
452 while (end_index > start_index) {
453 mid = (start_index + end_index) / 2;
454 if (entry.weighted_cluster_state[mid].first > key) {
456 } else if (entry.weighted_cluster_state[mid].first < key) {
457 start_index = mid + 1;
463 if (index == 0) index = start_index;
// The selected entry's cumulative bound must lie strictly above the key.
464 GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
465 cluster_name = entry.weighted_cluster_state[index].second;
// Every cluster reachable from a route was registered by the constructor, so
// the lookup is asserted to succeed.
467 auto it = clusters_.find(cluster_name);
468 GPR_ASSERT(it != clusters_.end());
// Take a ref on the resolver and on the chosen cluster state; the raw
// pointers are captured by on_call_committed below, which drops the
// cluster-state ref.
// NOTE(review): the matching release of the resolver ref is not visible in
// this excerpt -- confirm.
469 XdsResolver* resolver =
470 static_cast<XdsResolver*>(resolver_->Ref().release());
471 ClusterState* cluster_state = it->second->Ref().release();
472 CallConfig call_config;
473 if (entry.method_config != nullptr) {
474 call_config.service_config = entry.method_config;
475 call_config.method_configs =
476 entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
478 call_config.call_attributes[kXdsClusterAttribute] = it->first;
479 call_config.on_call_committed = [resolver, cluster_state]() {
480 cluster_state->Unref();
482 // TODO(roth): This hop into the ExecCtx is being done to avoid
483 // entering the WorkSerializer while holding the client channel data
484 // plane mutex, since that can lead to deadlocks. However, we should
485 // not have to solve this problem in each individual ConfigSelector
486 // implementation. When we have time, we should fix the client channel
487 // code to avoid this by not invoking the
488 // CallConfig::on_call_committed callback until after it has released
489 // the data plane mutex.
492 [](void* arg, grpc_error* /*error*/) {
493 auto* resolver = static_cast<XdsResolver*>(arg);
494 resolver->work_serializer_->Run(
496 resolver->MaybeRemoveUnusedClusters();
// Obtains the XdsClient and starts watching listener data for server_name_.
//
// If the client cannot be created, the error is logged and reported through
// result_handler_->ReturnError(). Otherwise the resolver's pollset set is
// linked to the client's, channelz linkage is added when the channel args
// carry a channelz node, and a ListenerWatcher is registered.
513 void XdsResolver::StartLocked() {
514 grpc_error* error = GRPC_ERROR_NONE;
515 xds_client_ = XdsClient::GetOrCreate(&error);
516 if (error != GRPC_ERROR_NONE) {
518 "Failed to create xds client -- channel will remain in "
519 "TRANSIENT_FAILURE: %s",
520 grpc_error_string(error));
521 result_handler_->ReturnError(error);
524 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
525 interested_parties_);
526 channelz::ChannelNode* parent_channelz_node =
527 grpc_channel_args_find_pointer<channelz::ChannelNode>(
528 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
529 if (parent_channelz_node != nullptr) {
530 xds_client_->AddChannelzLinkage(parent_channelz_node);
// Keep a raw pointer for later cancellation; the unique_ptr itself is handed
// to the XdsClient.
532 auto watcher = absl::make_unique<ListenerWatcher>(Ref());
533 listener_watcher_ = watcher.get();
534 xds_client_->WatchListenerData(server_name_, std::move(watcher));
537 void XdsResolver::ShutdownLocked() {
538 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
539 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
541 if (xds_client_ != nullptr) {
542 if (listener_watcher_ != nullptr) {
543 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
544 /*delay_unsubscription=*/false);
546 if (route_config_watcher_ != nullptr) {
547 xds_client_->CancelRouteConfigDataWatch(
548 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
550 channelz::ChannelNode* parent_channelz_node =
551 grpc_channel_args_find_pointer<channelz::ChannelNode>(
552 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
553 if (parent_channelz_node != nullptr) {
554 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
556 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
557 interested_parties_);
// Handles a new listener resource.
//
// When the route-config name changed, the old route-config watch (if any) is
// cancelled and a new one is started for the new name, unless that name is
// empty. The listener's max stream duration is recorded. When no route-config
// name is in use, the listener must carry an inline route config, which is
// processed immediately.
562 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
563 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
564 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
566 if (listener.route_config_name != route_config_name_) {
567 if (route_config_watcher_ != nullptr) {
// delay_unsubscription is true exactly when a replacement watch follows.
568 xds_client_->CancelRouteConfigDataWatch(
569 route_config_name_, route_config_watcher_,
570 /*delay_unsubscription=*/!listener.route_config_name.empty());
571 route_config_watcher_ = nullptr;
573 route_config_name_ = std::move(listener.route_config_name);
574 if (!route_config_name_.empty()) {
575 auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
576 route_config_watcher_ = watcher.get();
577 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
580 http_max_stream_duration_ = listener.http_max_stream_duration;
581 if (route_config_name_.empty()) {
582 GPR_ASSERT(listener.rds_update.has_value());
583 OnRouteConfigUpdate(std::move(*listener.rds_update));
// Handles a new route configuration: finds the virtual host that matches
// server_name_ and stores its routes in current_update_. If no virtual host
// matches, an error is reported through OnError().
587 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
588 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
589 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
591 // Find the relevant VirtualHost from the RouteConfiguration.
592 XdsApi::RdsUpdate::VirtualHost* vhost =
593 rds_update.FindVirtualHostForDomain(server_name_);
594 if (vhost == nullptr) {
595 OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
596 absl::StrCat("could not find VirtualHost for ", server_name_,
597 " in RouteConfiguration")
601 // Save the list of routes in the resolver.
602 current_update_ = std::move(vhost->routes);
603 // Send a new result to the channel.
// Logs an error received from the XdsClient and forwards it to the channel as
// the service_config_error of a result that carries a copy of the channel
// args. The error pointer is stored in the result as-is.
607 void XdsResolver::OnError(grpc_error* error) {
608 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
609 this, grpc_error_string(error));
611 result.args = grpc_channel_args_copy(args_);
612 result.service_config_error = error;
613 result_handler_->ReturnResult(std::move(result));
// Handles the "resource does not exist" notification: discards the stored
// routes and returns a result whose service config is parsed from "{}".
// Creation of that empty config is asserted to succeed.
616 void XdsResolver::OnResourceDoesNotExist() {
618 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
619 "update and returning empty service config",
621 current_update_.clear();
623 result.service_config =
624 ServiceConfig::Create(args_, "{}", &result.service_config_error);
625 GPR_ASSERT(result.service_config != nullptr);
626 result.args = grpc_channel_args_copy(args_);
627 result_handler_->ReturnResult(std::move(result));
// Builds the top-level service config as JSON text and parses it into
// *service_config.
//
// The config uses the "xds_cluster_manager_experimental" LB policy with one
// child per entry of cluster_state_map_; each child's policy is
// "cds_experimental" for that cluster.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the error from ServiceConfig::Create() is returned -- confirm.
630 grpc_error* XdsResolver::CreateServiceConfig(
631 RefCountedPtr<ServiceConfig>* service_config) {
632 std::vector<std::string> clusters;
633 for (const auto& cluster : cluster_state_map_) {
// NOTE(review): the cluster name is inserted with %s and no JSON escaping
// is visible here -- confirm that names cannot contain quotes or backslashes.
635 absl::StrFormat(" \"%s\":{\n"
636 " \"childPolicy\":[ {\n"
637 " \"cds_experimental\":{\n"
638 " \"cluster\": \"%s\"\n"
642 cluster.first, cluster.first));
// Assemble: fixed prefix, comma-joined children, fixed suffix.
644 std::vector<std::string> config_parts;
645 config_parts.push_back(
647 " \"loadBalancingConfig\":[\n"
648 " { \"xds_cluster_manager_experimental\":{\n"
649 " \"children\":{\n");
650 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
651 config_parts.push_back(
656 std::string json = absl::StrJoin(config_parts, "");
657 grpc_error* error = GRPC_ERROR_NONE;
658 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
// Builds a result from current_update_ and returns it to the channel.
//
// Does nothing while there are no routes. The config selector is created
// first because its constructor may add entries to cluster_state_map_, which
// CreateServiceConfig() then reads. The selector is attached to the result's
// channel args.
662 void XdsResolver::GenerateResult() {
663 if (current_update_.empty()) return;
664 // First create XdsConfigSelector, which may add new entries to the cluster
665 // state map, and then CreateServiceConfig for LB policies.
666 grpc_error* error = GRPC_ERROR_NONE;
// NOTE(review): `error` is passed to the constructor by value, so the check
// right after construction cannot observe an error produced inside it.
667 auto config_selector =
668 MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
669 if (error != GRPC_ERROR_NONE) {
674 error = CreateServiceConfig(&result.service_config);
675 if (error != GRPC_ERROR_NONE) {
679 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
680 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
681 result.service_config->json_string().c_str());
683 grpc_arg new_arg = config_selector->MakeChannelArg();
684 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
685 result_handler_->ReturnResult(std::move(result));
// Walks cluster_state_map_ and erases every entry for which RefIfNonZero()
// yields no ref. A new result is sent only if something was erased and the
// XdsClient is still set.
688 void XdsResolver::MaybeRemoveUnusedClusters() {
689 bool update_needed = false;
690 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
// A non-null result means the entry still has outstanding refs.
691 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
692 if (cluster_state != nullptr) {
695 update_needed = true;
// erase() returns the next valid iterator, so iteration can continue.
696 it = cluster_state_map_.erase(it);
699 if (update_needed && xds_client_ != nullptr) {
700 // Send a new result to the channel.
// Factory that creates XdsResolver instances for the "xds" URI scheme.
709 class XdsResolverFactory : public ResolverFactory {
// A URI with an authority is logged as unsupported.
711 bool IsValidUri(const URI& uri) const override {
712 if (GPR_UNLIKELY(!uri.authority().empty())) {
713 gpr_log(GPR_ERROR, "URI authority not supported");
// Returns nullptr when the URI fails IsValidUri().
719 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
720 if (!IsValidUri(args.uri)) return nullptr;
721 return MakeOrphanable<XdsResolver>(std::move(args));
724 const char* scheme() const override { return "xds"; }
729 } // namespace grpc_core
731 void grpc_resolver_xds_init() {
732 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
733 absl::make_unique<grpc_core::XdsResolverFactory>());
// Shutdown counterpart of grpc_resolver_xds_init(); intentionally empty.
736 void grpc_resolver_xds_shutdown() {}