3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
// Trace flag consulted by every GRPC_TRACE_FLAG_ENABLED() check in this file;
// it is constructed disabled (false) and registered as "xds_resolver".
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

// Key under which XdsConfigSelector::GetCallConfig() stores the selected
// cluster name in CallConfig::call_attributes.
const char* kXdsClusterAttribute = "xds_cluster_name";
46 class XdsResolver : public Resolver {
48 explicit XdsResolver(ResolverArgs args)
49 : Resolver(std::move(args.work_serializer),
50 std::move(args.result_handler)),
51 args_(grpc_channel_args_copy(args.args)),
52 interested_parties_(args.pollset_set) {
53 char* path = args.uri->path;
54 if (path[0] == '/') ++path;
56 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
57 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
58 server_name_.c_str());
62 ~XdsResolver() override {
63 grpc_channel_args_destroy(args_);
64 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
65 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
69 void StartLocked() override;
71 void ShutdownLocked() override;
76 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
77 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
78 Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
79 explicit Notifier(RefCountedPtr<XdsResolver> resolver);
82 enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
84 static void RunInExecCtx(void* arg, grpc_error* error);
85 void RunInWorkSerializer(grpc_error* error);
87 RefCountedPtr<XdsResolver> resolver_;
88 grpc_closure closure_;
89 XdsApi::LdsUpdate update_;
93 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
95 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
96 : resolver_(std::move(resolver)) {}
97 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
98 new Notifier(resolver_, std::move(listener));
100 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
101 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
104 RefCountedPtr<XdsResolver> resolver_;
107 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
109 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
110 : resolver_(std::move(resolver)) {}
111 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
112 new Notifier(resolver_, std::move(route_config));
114 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
115 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
118 RefCountedPtr<XdsResolver> resolver_;
122 : public RefCounted<ClusterState, PolymorphicRefCount, false> {
124 using ClusterStateMap =
125 std::map<std::string, std::unique_ptr<ClusterState>>;
127 ClusterState(const std::string& cluster_name,
128 ClusterStateMap* cluster_state_map)
129 : it_(cluster_state_map
130 ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
132 const std::string& cluster() const { return it_->first; }
135 ClusterStateMap::iterator it_;
138 class XdsConfigSelector : public ConfigSelector {
140 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
141 const std::vector<XdsApi::Route>& routes,
143 ~XdsConfigSelector() override;
145 const char* name() const override { return "XdsConfigSelector"; }
147 bool Equals(const ConfigSelector* other) const override {
148 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
149 // Don't need to compare resolver_, since that will always be the same.
150 return route_table_ == other_xds->route_table_ &&
151 clusters_ == other_xds->clusters_;
154 CallConfig GetCallConfig(GetCallConfigArgs args) override;
159 absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
160 weighted_cluster_state;
161 RefCountedPtr<ServiceConfig> method_config;
162 bool operator==(const Route& other) const {
163 return route == other.route &&
164 weighted_cluster_state == other.weighted_cluster_state;
167 using RouteTable = std::vector<Route>;
169 void MaybeAddCluster(const std::string& name);
170 grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
171 const XdsApi::Route& route);
173 RefCountedPtr<XdsResolver> resolver_;
174 RouteTable route_table_;
175 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
178 void OnListenerUpdate(XdsApi::LdsUpdate listener);
179 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
180 void OnError(grpc_error* error);
181 void OnResourceDoesNotExist();
183 grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
184 void GenerateResult();
185 void MaybeRemoveUnusedClusters();
187 std::string server_name_;
188 const grpc_channel_args* args_;
189 grpc_pollset_set* interested_parties_;
190 RefCountedPtr<XdsClient> xds_client_;
191 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
192 std::string route_config_name_;
193 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
194 ClusterState::ClusterStateMap cluster_state_map_;
195 std::vector<XdsApi::Route> current_update_;
196 XdsApi::Duration http_max_stream_duration_;
200 // XdsResolver::Notifier
203 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
204 XdsApi::LdsUpdate update)
205 : resolver_(std::move(resolver)),
206 update_(std::move(update)),
208 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
209 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
212 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
213 XdsApi::RdsUpdate update)
214 : resolver_(std::move(resolver)), type_(kRdsUpdate) {
215 update_.rds_update = std::move(update);
216 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
217 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
220 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
222 : resolver_(std::move(resolver)), type_(kError) {
223 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
224 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
227 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
228 : resolver_(std::move(resolver)), type_(kDoesNotExist) {
229 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
230 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
233 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
234 Notifier* self = static_cast<Notifier*>(arg);
235 GRPC_ERROR_REF(error);
236 self->resolver_->work_serializer()->Run(
237 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
240 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
241 if (resolver_->xds_client_ == nullptr) {
242 GRPC_ERROR_UNREF(error);
248 resolver_->OnListenerUpdate(std::move(update_));
251 resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
254 resolver_->OnError(error);
257 resolver_->OnResourceDoesNotExist();
264 // XdsResolver::XdsConfigSelector
267 XdsResolver::XdsConfigSelector::XdsConfigSelector(
268 RefCountedPtr<XdsResolver> resolver,
269 const std::vector<XdsApi::Route>& routes, grpc_error* error)
270 : resolver_(std::move(resolver)) {
271 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
272 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
273 resolver_.get(), this);
275 // 1. Construct the route table
276 // 2 Update resolver's cluster state map
277 // 3. Construct cluster list to hold on to entries in the cluster state
279 // Reserve the necessary entries up-front to avoid reallocation as we add
280 // elements. This is necessary because the string_view in the entry's
281 // weighted_cluster_state field points to the memory in the route field, so
282 // moving the entry in a reallocation will cause the string_view to point to
284 route_table_.reserve(routes.size());
285 for (auto& route : routes) {
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
287 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
288 resolver_.get(), this, route.ToString().c_str());
290 route_table_.emplace_back();
291 auto& route_entry = route_table_.back();
292 route_entry.route = route;
293 // If the route doesn't specify a timeout, set its timeout to the global
295 if (!route.max_stream_duration.has_value()) {
296 route_entry.route.max_stream_duration =
297 resolver_->http_max_stream_duration_;
299 error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
300 if (route.weighted_clusters.empty()) {
301 MaybeAddCluster(route.cluster_name);
304 for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
305 MaybeAddCluster(weighted_cluster.name);
306 end += weighted_cluster.weight;
307 route_entry.weighted_cluster_state.emplace_back(end,
308 weighted_cluster.name);
314 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
315 RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
316 grpc_error* error = GRPC_ERROR_NONE;
317 std::vector<std::string> fields;
318 if (route.max_stream_duration.has_value() &&
319 (route.max_stream_duration->seconds != 0 ||
320 route.max_stream_duration->nanos != 0)) {
321 fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
322 route.max_stream_duration->seconds,
323 route.max_stream_duration->nanos));
325 if (!fields.empty()) {
326 std::string json = absl::StrCat(
328 " \"methodConfig\": [ {\n"
333 absl::StrJoin(fields, ",\n"),
337 ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
342 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
343 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
344 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
345 resolver_.get(), this);
348 resolver_->MaybeRemoveUnusedClusters();
351 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
352 if (clusters_.find(name) == clusters_.end()) {
353 auto it = resolver_->cluster_state_map_.find(name);
354 if (it == resolver_->cluster_state_map_.end()) {
355 auto new_cluster_state =
356 MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
357 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
359 clusters_[it->second->cluster()] = it->second->Ref();
364 bool PathMatch(const absl::string_view& path,
365 const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
366 switch (path_matcher.type) {
367 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
368 return path_matcher.case_sensitive
369 ? absl::StartsWith(path, path_matcher.string_matcher)
370 : absl::StartsWithIgnoreCase(path,
371 path_matcher.string_matcher);
372 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
373 return path_matcher.case_sensitive
374 ? path == path_matcher.string_matcher
375 : absl::EqualsIgnoreCase(path, path_matcher.string_matcher);
376 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
377 // Note: Case-sensitive option will already have been set appropriately
378 // in path_matcher.regex_matcher when it was constructed, so no
379 // need to check it here.
380 return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
386 absl::optional<absl::string_view> GetMetadataValue(
387 const std::string& target_key, grpc_metadata_batch* initial_metadata,
388 std::string* concatenated_value) {
389 // Find all values for the specified key.
390 GPR_DEBUG_ASSERT(initial_metadata != nullptr);
391 absl::InlinedVector<absl::string_view, 1> values;
392 for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
394 absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
395 absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
396 if (target_key == key) values.push_back(value);
398 // If none found, no match.
399 if (values.empty()) return absl::nullopt;
400 // If exactly one found, return it as-is.
401 if (values.size() == 1) return values.front();
402 // If more than one found, concatenate the values, using
403 // *concatenated_values as a temporary holding place for the
404 // concatenated string.
405 *concatenated_value = absl::StrJoin(values, ",");
406 return *concatenated_value;
409 bool HeaderMatchHelper(
410 const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
411 grpc_metadata_batch* initial_metadata) {
412 std::string concatenated_value;
413 absl::optional<absl::string_view> value;
414 // Note: If we ever allow binary headers here, we still need to
415 // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
416 // they are not visible to the LB policy in grpc-go.
417 if (absl::EndsWith(header_matcher.name, "-bin") ||
418 header_matcher.name == "grpc-previous-rpc-attempts") {
419 value = absl::nullopt;
420 } else if (header_matcher.name == "content-type") {
421 value = "application/grpc";
423 value = GetMetadataValue(header_matcher.name, initial_metadata,
424 &concatenated_value);
426 if (!value.has_value()) {
427 if (header_matcher.type ==
428 XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
429 return !header_matcher.present_match;
431 // For all other header matcher types, we need the header value to
432 // exist to consider matches.
436 switch (header_matcher.type) {
437 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
438 return value.value() == header_matcher.string_matcher;
439 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
440 return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
441 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
443 if (!absl::SimpleAtoi(value.value(), &int_value)) {
446 return int_value >= header_matcher.range_start &&
447 int_value < header_matcher.range_end;
448 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
449 return absl::StartsWith(value.value(), header_matcher.string_matcher);
450 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
451 return absl::EndsWith(value.value(), header_matcher.string_matcher);
458 const std::vector<XdsApi::Route::Matchers::HeaderMatcher>& header_matchers,
459 grpc_metadata_batch* initial_metadata) {
460 for (const auto& header_matcher : header_matchers) {
461 bool match = HeaderMatchHelper(header_matcher, initial_metadata);
462 if (header_matcher.invert_match) match = !match;
463 if (!match) return false;
// Returns true with probability of roughly fraction_per_million / 1000000:
// draws a number in [0, 1000000) from rand() and reports whether it is below
// the threshold.  0 never passes; 1000000 or more always passes.
// NOTE(review): the C standard only guarantees RAND_MAX >= 32767 and does not
// guarantee rand() is thread-safe -- confirm this is acceptable on all
// supported platforms.
bool UnderFraction(const uint32_t fraction_per_million) {
  const uint32_t draw = rand() % 1000000;
  if (draw < fraction_per_million) return true;
  return false;
}
474 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
475 GetCallConfigArgs args) {
476 for (const auto& entry : route_table_) {
478 if (!PathMatch(StringViewFromSlice(*args.path),
479 entry.route.matchers.path_matcher)) {
483 if (!HeadersMatch(entry.route.matchers.header_matchers,
484 args.initial_metadata)) {
487 // Match fraction check
488 if (entry.route.matchers.fraction_per_million.has_value() &&
489 !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
492 // Found a route match
493 absl::string_view cluster_name;
494 if (entry.route.weighted_clusters.empty()) {
495 cluster_name = entry.route.cluster_name;
499 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
501 // Find the index in weighted clusters corresponding to key.
503 size_t start_index = 0;
504 size_t end_index = entry.weighted_cluster_state.size() - 1;
506 while (end_index > start_index) {
507 mid = (start_index + end_index) / 2;
508 if (entry.weighted_cluster_state[mid].first > key) {
510 } else if (entry.weighted_cluster_state[mid].first < key) {
511 start_index = mid + 1;
517 if (index == 0) index = start_index;
518 GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
519 cluster_name = entry.weighted_cluster_state[index].second;
521 auto it = clusters_.find(cluster_name);
522 GPR_ASSERT(it != clusters_.end());
523 XdsResolver* resolver =
524 static_cast<XdsResolver*>(resolver_->Ref().release());
525 ClusterState* cluster_state = it->second->Ref().release();
526 CallConfig call_config;
527 if (entry.method_config != nullptr) {
528 call_config.service_config = entry.method_config;
529 call_config.method_configs =
530 entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
532 call_config.call_attributes[kXdsClusterAttribute] = it->first;
533 call_config.on_call_committed = [resolver, cluster_state]() {
534 cluster_state->Unref();
536 // TODO(roth): This hop into the ExecCtx is being done to avoid
537 // entering the WorkSerializer while holding the client channel data
538 // plane mutex, since that can lead to deadlocks. However, we should
539 // not have to solve this problem in each individual ConfigSelector
540 // implementation. When we have time, we should fix the client channel
541 // code to avoid this by not invoking the
542 // CallConfig::on_call_committed callback until after it has released
543 // the data plane mutex.
546 [](void* arg, grpc_error* /*error*/) {
547 auto* resolver = static_cast<XdsResolver*>(arg);
548 resolver->work_serializer()->Run(
550 resolver->MaybeRemoveUnusedClusters();
567 void XdsResolver::StartLocked() {
568 grpc_error* error = GRPC_ERROR_NONE;
569 xds_client_ = XdsClient::GetOrCreate(&error);
570 if (error != GRPC_ERROR_NONE) {
572 "Failed to create xds client -- channel will remain in "
573 "TRANSIENT_FAILURE: %s",
574 grpc_error_string(error));
575 result_handler()->ReturnError(error);
578 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
579 interested_parties_);
580 channelz::ChannelNode* parent_channelz_node =
581 grpc_channel_args_find_pointer<channelz::ChannelNode>(
582 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
583 if (parent_channelz_node != nullptr) {
584 xds_client_->AddChannelzLinkage(parent_channelz_node);
586 auto watcher = absl::make_unique<ListenerWatcher>(Ref());
587 listener_watcher_ = watcher.get();
588 xds_client_->WatchListenerData(server_name_, std::move(watcher));
591 void XdsResolver::ShutdownLocked() {
592 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
593 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
595 if (xds_client_ != nullptr) {
596 if (listener_watcher_ != nullptr) {
597 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
598 /*delay_unsubscription=*/false);
600 if (route_config_watcher_ != nullptr) {
601 xds_client_->CancelRouteConfigDataWatch(
602 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
604 channelz::ChannelNode* parent_channelz_node =
605 grpc_channel_args_find_pointer<channelz::ChannelNode>(
606 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
607 if (parent_channelz_node != nullptr) {
608 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
610 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
611 interested_parties_);
616 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
617 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
618 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
620 if (listener.route_config_name != route_config_name_) {
621 if (route_config_watcher_ != nullptr) {
622 xds_client_->CancelRouteConfigDataWatch(
623 route_config_name_, route_config_watcher_,
624 /*delay_unsubscription=*/!listener.route_config_name.empty());
625 route_config_watcher_ = nullptr;
627 route_config_name_ = std::move(listener.route_config_name);
628 if (!route_config_name_.empty()) {
629 auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
630 route_config_watcher_ = watcher.get();
631 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
634 http_max_stream_duration_ = listener.http_max_stream_duration;
635 if (route_config_name_.empty()) {
636 GPR_ASSERT(listener.rds_update.has_value());
637 OnRouteConfigUpdate(std::move(*listener.rds_update));
641 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
642 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
643 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
645 // Find the relevant VirtualHost from the RouteConfiguration.
646 XdsApi::RdsUpdate::VirtualHost* vhost =
647 rds_update.FindVirtualHostForDomain(server_name_);
648 if (vhost == nullptr) {
649 OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
650 absl::StrCat("could not find VirtualHost for ", server_name_,
651 " in RouteConfiguration")
655 // Save the list of routes in the resolver.
656 current_update_ = std::move(vhost->routes);
657 // Send a new result to the channel.
661 void XdsResolver::OnError(grpc_error* error) {
662 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
663 this, grpc_error_string(error));
665 result.args = grpc_channel_args_copy(args_);
666 result.service_config_error = error;
667 result_handler()->ReturnResult(std::move(result));
670 void XdsResolver::OnResourceDoesNotExist() {
672 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
673 "update and returning empty service config",
675 current_update_.clear();
677 result.service_config =
678 ServiceConfig::Create(args_, "{}", &result.service_config_error);
679 GPR_ASSERT(result.service_config != nullptr);
680 result.args = grpc_channel_args_copy(args_);
681 result_handler()->ReturnResult(std::move(result));
684 grpc_error* XdsResolver::CreateServiceConfig(
685 RefCountedPtr<ServiceConfig>* service_config) {
686 std::vector<std::string> clusters;
687 for (const auto& cluster : cluster_state_map_) {
689 absl::StrFormat(" \"%s\":{\n"
690 " \"childPolicy\":[ {\n"
691 " \"cds_experimental\":{\n"
692 " \"cluster\": \"%s\"\n"
696 cluster.first, cluster.first));
698 std::vector<std::string> config_parts;
699 config_parts.push_back(
701 " \"loadBalancingConfig\":[\n"
702 " { \"xds_cluster_manager_experimental\":{\n"
703 " \"children\":{\n");
704 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
705 config_parts.push_back(
710 std::string json = absl::StrJoin(config_parts, "");
711 grpc_error* error = GRPC_ERROR_NONE;
712 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
716 void XdsResolver::GenerateResult() {
717 if (current_update_.empty()) return;
718 // First create XdsConfigSelector, which may add new entries to the cluster
719 // state map, and then CreateServiceConfig for LB policies.
720 grpc_error* error = GRPC_ERROR_NONE;
721 auto config_selector =
722 MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
723 if (error != GRPC_ERROR_NONE) {
728 error = CreateServiceConfig(&result.service_config);
729 if (error != GRPC_ERROR_NONE) {
733 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
734 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
735 result.service_config->json_string().c_str());
737 grpc_arg new_arg = config_selector->MakeChannelArg();
738 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
739 result_handler()->ReturnResult(std::move(result));
742 void XdsResolver::MaybeRemoveUnusedClusters() {
743 bool update_needed = false;
744 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
745 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
746 if (cluster_state != nullptr) {
749 update_needed = true;
750 it = cluster_state_map_.erase(it);
753 if (update_needed && xds_client_ != nullptr) {
754 // Send a new result to the channel.
763 class XdsResolverFactory : public ResolverFactory {
765 bool IsValidUri(const grpc_uri* uri) const override {
766 if (GPR_UNLIKELY(0 != strcmp(uri->authority, ""))) {
767 gpr_log(GPR_ERROR, "URI authority not supported");
773 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
774 if (!IsValidUri(args.uri)) return nullptr;
775 return MakeOrphanable<XdsResolver>(std::move(args));
778 const char* scheme() const override { return "xds"; }
783 } // namespace grpc_core
785 void grpc_resolver_xds_init() {
786 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
787 absl::make_unique<grpc_core::XdsResolverFactory>());
// Counterpart of grpc_resolver_xds_init(); intentionally does nothing.
void grpc_resolver_xds_shutdown() {}