2 // Copyright 2018 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <grpc/support/port_platform.h>
22 #include "absl/strings/str_cat.h"
23 #include "absl/types/optional.h"
25 #include <grpc/grpc.h>
27 #include "src/core/ext/filters/client_channel/client_channel.h"
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
32 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
33 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
34 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
35 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
36 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
37 #include "src/core/ext/filters/client_channel/resolver_registry.h"
38 #include "src/core/ext/filters/client_channel/server_address.h"
39 #include "src/core/ext/xds/xds_channel_args.h"
40 #include "src/core/ext/xds/xds_client.h"
41 #include "src/core/ext/xds/xds_client_stats.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/iomgr/work_serializer.h"
47 #include "src/core/lib/transport/error_utils.h"
48 #include "src/core/lib/uri/uri_parser.h"
// NOTE(review): no use of this macro is visible in this part of the file;
// presumably a timeout in milliseconds -- confirm it is still needed.
#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
// Trace flag guarding the verbose logging of this LB policy; disabled by
// default and registered under the name "xds_cluster_resolver_lb".
TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb");
// Attribute key under which each address produced by this policy carries its
// locality name (see CreateChildPolicyAddressesLocked()).
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
// Name of this LB policy, as returned by name() on the policy and its config.
constexpr char kXdsClusterResolver[] = "xds_cluster_resolver_experimental";
62 // Config for EDS LB policy.
63 class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
65 struct DiscoveryMechanism {
66 std::string cluster_name;
67 absl::optional<std::string> lrs_load_reporting_server_name;
68 uint32_t max_concurrent_requests;
69 enum DiscoveryMechanismType {
73 DiscoveryMechanismType type;
74 std::string eds_service_name;
75 std::string dns_hostname;
77 bool operator==(const DiscoveryMechanism& other) const {
78 return (cluster_name == other.cluster_name &&
79 lrs_load_reporting_server_name ==
80 other.lrs_load_reporting_server_name &&
81 max_concurrent_requests == other.max_concurrent_requests &&
83 eds_service_name == other.eds_service_name &&
84 dns_hostname == other.dns_hostname);
88 XdsClusterResolverLbConfig(
89 std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
90 : discovery_mechanisms_(std::move(discovery_mechanisms)),
91 xds_lb_policy_(std::move(xds_lb_policy)) {}
93 const char* name() const override { return kXdsClusterResolver; }
94 const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
95 return discovery_mechanisms_;
98 const Json& xds_lb_policy() const { return xds_lb_policy_; }
101 std::vector<DiscoveryMechanism> discovery_mechanisms_;
105 // Xds Cluster Resolver LB policy.
106 class XdsClusterResolverLb : public LoadBalancingPolicy {
108 XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args,
109 std::string server_name, bool is_xds_uri);
111 const char* name() const override { return kXdsClusterResolver; }
113 void UpdateLocked(UpdateArgs args) override;
114 void ResetBackoffLocked() override;
115 void ExitIdleLocked() override;
118 // Discovery Mechanism Base class
120 // Implemented by EDS and LOGICAL_DNS.
122 // Implementations are responsible for calling the LB policy's
123 // OnEndpointChanged(), OnError(), and OnResourceDoesNotExist()
124 // methods when the corresponding events occur.
126 // Must implement Orphan() method to cancel the watchers.
127 class DiscoveryMechanism : public InternallyRefCounted<DiscoveryMechanism> {
130 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
132 : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
133 virtual void Start() = 0;
134 void Orphan() override = 0;
135 virtual Json::Array override_child_policy() = 0;
136 virtual bool disable_reresolution() = 0;
138 // Returns a pair containing the cluster and eds_service_name
139 // to use for LRS load reporting. Caller must ensure that config_ is set
141 std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
142 if (!parent_->is_xds_uri_) return {parent_->server_name_, nullptr};
144 parent_->config_->discovery_mechanisms()[index_].cluster_name,
145 parent_->config_->discovery_mechanisms()[index_].eds_service_name};
149 XdsClusterResolverLb* parent() const { return parent_.get(); }
150 size_t index() const { return index_; }
153 RefCountedPtr<XdsClusterResolverLb> parent_;
154 // Stores its own index in the vector of DiscoveryMechanism.
158 class EdsDiscoveryMechanism : public DiscoveryMechanism {
160 EdsDiscoveryMechanism(
161 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
163 : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
164 void Start() override;
165 void Orphan() override;
166 Json::Array override_child_policy() override { return Json::Array{}; }
167 bool disable_reresolution() override { return true; }
170 class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
172 explicit EndpointWatcher(
173 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)
174 : discovery_mechanism_(std::move(discovery_mechanism)) {}
175 ~EndpointWatcher() override {
176 discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
178 void OnEndpointChanged(XdsApi::EdsUpdate update) override {
179 new Notifier(discovery_mechanism_, std::move(update));
181 void OnError(grpc_error_handle error) override {
182 new Notifier(discovery_mechanism_, error);
184 void OnResourceDoesNotExist() override {
185 new Notifier(discovery_mechanism_);
191 Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
192 XdsApi::EdsUpdate update);
193 Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
194 grpc_error_handle error);
196 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism);
197 ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); }
200 enum Type { kUpdate, kError, kDoesNotExist };
202 static void RunInExecCtx(void* arg, grpc_error_handle error);
203 void RunInWorkSerializer(grpc_error_handle error);
205 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
206 grpc_closure closure_;
207 XdsApi::EdsUpdate update_;
211 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
214 absl::string_view GetEdsResourceName() const {
215 if (!parent()->is_xds_uri_) return parent()->server_name_;
217 ->config_->discovery_mechanisms()[index()]
218 .eds_service_name.empty()) {
220 ->config_->discovery_mechanisms()[index()]
223 return parent()->config_->discovery_mechanisms()[index()].cluster_name;
226 // Note that this is not owned, so this pointer must never be dereferenced.
227 EndpointWatcher* watcher_ = nullptr;
230 class LogicalDNSDiscoveryMechanism : public DiscoveryMechanism {
232 LogicalDNSDiscoveryMechanism(
233 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
235 : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
236 void Start() override;
237 void Orphan() override;
238 Json::Array override_child_policy() override {
241 {"pick_first", Json::Object()},
245 bool disable_reresolution() override { return false; };
248 class ResolverResultHandler : public Resolver::ResultHandler {
250 explicit ResolverResultHandler(
251 RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)
252 : discovery_mechanism_(std::move(discovery_mechanism)) {}
254 ~ResolverResultHandler() override {}
256 void ReturnResult(Resolver::Result result) override;
258 void ReturnError(grpc_error_handle error) override;
261 RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
264 // This is necessary only because of a bug in msvc where nested class cannot
265 // access protected member in base class.
266 friend class ResolverResultHandler;
268 OrphanablePtr<Resolver> resolver_;
271 struct DiscoveryMechanismEntry {
272 OrphanablePtr<DiscoveryMechanism> discovery_mechanism;
273 bool first_update_received = false;
274 // Number of priorities this mechanism has contributed to priority_list_.
275 // (The sum of this across all discovery mechanisms should always equal
276 // the number of priorities in priority_list_.)
277 uint32_t num_priorities = 0;
278 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config;
279 // Populated only when an update has been delivered by the mechanism
280 // but has not yet been applied to the LB policy's combined priority_list_.
281 absl::optional<XdsApi::EdsUpdate::PriorityList> pending_priority_list;
284 class Helper : public ChannelControlHelper {
287 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)
288 : xds_cluster_resolver_policy_(std::move(xds_cluster_resolver_policy)) {
292 xds_cluster_resolver_policy_.reset(DEBUG_LOCATION, "Helper");
295 RefCountedPtr<SubchannelInterface> CreateSubchannel(
296 ServerAddress address, const grpc_channel_args& args) override;
297 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
298 std::unique_ptr<SubchannelPicker> picker) override;
299 // This is a no-op, because we get the addresses from the xds
300 // client, which is a watch-based API.
301 void RequestReresolution() override {}
302 void AddTraceEvent(TraceSeverity severity,
303 absl::string_view message) override;
306 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy_;
309 ~XdsClusterResolverLb() override;
311 void ShutdownLocked() override;
313 void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update);
314 void OnError(size_t index, grpc_error_handle error);
315 void OnResourceDoesNotExist(size_t index);
317 void MaybeDestroyChildPolicyLocked();
319 void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list);
320 void UpdateChildPolicyLocked();
321 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
322 const grpc_channel_args* args);
323 ServerAddressList CreateChildPolicyAddressesLocked();
324 RefCountedPtr<Config> CreateChildPolicyConfigLocked();
325 grpc_channel_args* CreateChildPolicyArgsLocked(
326 const grpc_channel_args* args_in);
328 // The xds client and endpoint watcher.
329 RefCountedPtr<XdsClient> xds_client_;
331 // Server name from target URI.
332 std::string server_name_;
335 // Current channel args and config from the resolver.
336 const grpc_channel_args* args_ = nullptr;
337 RefCountedPtr<XdsClusterResolverLbConfig> config_;
340 bool shutting_down_ = false;
342 // Vector of discovery mechansism entries in priority order.
343 std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
345 // The latest data from the endpoint watcher.
346 XdsApi::EdsUpdate::PriorityList priority_list_;
347 // State used to retain child policy names for priority policy.
348 std::vector<size_t /*child_number*/> priority_child_numbers_;
350 OrphanablePtr<LoadBalancingPolicy> child_policy_;
354 // XdsClusterResolverLb::Helper
357 RefCountedPtr<SubchannelInterface>
358 XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
359 const grpc_channel_args& args) {
360 if (xds_cluster_resolver_policy_->shutting_down_) return nullptr;
361 return xds_cluster_resolver_policy_->channel_control_helper()
362 ->CreateSubchannel(std::move(address), args);
365 void XdsClusterResolverLb::Helper::UpdateState(
366 grpc_connectivity_state state, const absl::Status& status,
367 std::unique_ptr<SubchannelPicker> picker) {
368 if (xds_cluster_resolver_policy_->shutting_down_ ||
369 xds_cluster_resolver_policy_->child_policy_ == nullptr) {
372 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
374 "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) "
376 xds_cluster_resolver_policy_.get(), ConnectivityStateName(state),
377 status.ToString().c_str(), picker.get());
379 xds_cluster_resolver_policy_->channel_control_helper()->UpdateState(
380 state, status, std::move(picker));
383 void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity,
384 absl::string_view message) {
385 if (xds_cluster_resolver_policy_->shutting_down_) return;
386 xds_cluster_resolver_policy_->channel_control_helper()->AddTraceEvent(
391 // XdsClusterResolverLb::EdsDiscoveryMechanism
394 void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
395 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
397 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
398 ":%p starting xds watch for %s",
399 parent(), index(), this, std::string(GetEdsResourceName()).c_str());
401 auto watcher = absl::make_unique<EndpointWatcher>(
402 Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
403 watcher_ = watcher.get();
404 parent()->xds_client_->WatchEndpointData(GetEdsResourceName(),
408 void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
409 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
411 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
412 ":%p cancelling xds watch for %s",
413 parent(), index(), this, std::string(GetEdsResourceName()).c_str());
415 parent()->xds_client_->CancelEndpointDataWatch(GetEdsResourceName(),
421 // XdsClusterResolverLb::EndpointWatcher::Notifier
424 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
425 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
427 XdsApi::EdsUpdate update)
428 : discovery_mechanism_(std::move(discovery_mechanism)),
429 update_(std::move(update)),
431 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
432 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
435 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
436 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
438 grpc_error_handle error)
439 : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) {
440 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
441 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
444 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
445 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
447 : discovery_mechanism_(std::move(discovery_mechanism)),
448 type_(kDoesNotExist) {
449 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
450 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
453 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
454 RunInExecCtx(void* arg, grpc_error_handle error) {
455 Notifier* self = static_cast<Notifier*>(arg);
456 GRPC_ERROR_REF(error);
457 self->discovery_mechanism_->parent()->work_serializer()->Run(
458 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
461 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
462 RunInWorkSerializer(grpc_error_handle error) {
465 discovery_mechanism_->parent()->OnEndpointChanged(
466 discovery_mechanism_->index(), std::move(update_));
469 discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
473 discovery_mechanism_->parent()->OnResourceDoesNotExist(
474 discovery_mechanism_->index());
481 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism
484 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
486 parent()->config_->discovery_mechanisms()[index()].dns_hostname;
487 grpc_channel_args* args = nullptr;
488 FakeResolverResponseGenerator* fake_resolver_response_generator =
489 grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
491 GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
492 if (fake_resolver_response_generator != nullptr) {
493 target = absl::StrCat("fake:", target);
494 grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg(
495 fake_resolver_response_generator);
496 args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
498 target = absl::StrCat("dns:", target);
499 args = grpc_channel_args_copy(parent()->args_);
501 resolver_ = ResolverRegistry::CreateResolver(
502 target.c_str(), args, parent()->interested_parties(),
503 parent()->work_serializer(),
504 absl::make_unique<ResolverResultHandler>(
505 Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
506 grpc_channel_args_destroy(args);
507 if (resolver_ == nullptr) {
508 parent()->OnResourceDoesNotExist(index());
511 resolver_->StartLocked();
512 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
514 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism "
515 "%" PRIuPTR ":%p starting dns resolver %p",
516 parent(), index(), this, resolver_.get());
520 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
521 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
524 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR
525 ":%p shutting down dns resolver %p",
526 parent(), index(), this, resolver_.get());
533 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler
536 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
537 ReturnResult(Resolver::Result result) {
538 // convert result to eds update
539 XdsApi::EdsUpdate update;
540 XdsApi::EdsUpdate::Priority::Locality locality;
541 locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
542 locality.lb_weight = 1;
543 locality.endpoints = std::move(result.addresses);
544 XdsApi::EdsUpdate::Priority priority;
545 priority.localities.emplace(locality.name.get(), std::move(locality));
546 update.priorities.emplace_back(std::move(priority));
547 discovery_mechanism_->parent()->OnEndpointChanged(
548 discovery_mechanism_->index(), std::move(update));
551 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
552 ReturnError(grpc_error_handle error) {
553 discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error);
557 // XdsClusterResolverLb public methods
560 XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
561 Args args, std::string server_name,
563 : LoadBalancingPolicy(std::move(args)),
564 xds_client_(std::move(xds_client)),
565 server_name_(std::move(server_name)),
566 is_xds_uri_(is_xds_uri) {
567 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
569 "[xds_cluster_resolver_lb %p] created -- xds_client=%p, "
570 "server_name=%s, is_xds_uri=%d",
571 this, xds_client_.get(), server_name_.c_str(), is_xds_uri_);
575 // Setup channelz linkage.
576 channelz::ChannelNode* parent_channelz_node =
577 grpc_channel_args_find_pointer<channelz::ChannelNode>(
578 args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
579 if (parent_channelz_node != nullptr) {
580 xds_client_->AddChannelzLinkage(parent_channelz_node);
583 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
584 interested_parties());
588 XdsClusterResolverLb::~XdsClusterResolverLb() {
589 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
591 "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB "
597 void XdsClusterResolverLb::ShutdownLocked() {
598 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
599 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] shutting down", this);
601 shutting_down_ = true;
602 MaybeDestroyChildPolicyLocked();
603 discovery_mechanisms_.clear();
605 // Remove channelz linkage.
606 channelz::ChannelNode* parent_channelz_node =
607 grpc_channel_args_find_pointer<channelz::ChannelNode>(
608 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
609 if (parent_channelz_node != nullptr) {
610 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
613 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
614 interested_parties());
616 xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverLb");
617 // Destroy channel args.
618 grpc_channel_args_destroy(args_);
622 void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
623 if (child_policy_ != nullptr) {
624 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
625 interested_parties());
626 child_policy_.reset();
630 void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
631 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
632 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this);
634 const bool is_initial_update = args_ == nullptr;
636 auto old_config = std::move(config_);
637 config_ = std::move(args.config);
639 grpc_channel_args_destroy(args_);
642 // Update child policy if needed.
643 if (child_policy_ != nullptr) UpdateChildPolicyLocked();
644 // Create endpoint watcher if needed.
645 if (is_initial_update) {
646 for (const auto& config : config_->discovery_mechanisms()) {
647 DiscoveryMechanismEntry entry;
648 if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
649 DiscoveryMechanismType::EDS) {
650 entry.discovery_mechanism =
651 grpc_core::MakeOrphanable<EdsDiscoveryMechanism>(
652 Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"),
653 discovery_mechanisms_.size());
654 } else if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
655 DiscoveryMechanismType::LOGICAL_DNS) {
656 entry.discovery_mechanism =
657 grpc_core::MakeOrphanable<LogicalDNSDiscoveryMechanism>(
658 Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"),
659 discovery_mechanisms_.size());
663 discovery_mechanisms_.push_back(std::move(entry));
665 // Call start() on all discovery mechanisms after creation.
666 for (const auto& discovery_mechanism : discovery_mechanisms_) {
667 discovery_mechanism.discovery_mechanism->Start();
672 void XdsClusterResolverLb::ResetBackoffLocked() {
673 // When the XdsClient is instantiated in the resolver instead of in this
674 // LB policy, this is done via the resolver, so we don't need to do it here.
675 if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
676 if (child_policy_ != nullptr) {
677 child_policy_->ResetBackoffLocked();
681 void XdsClusterResolverLb::ExitIdleLocked() {
682 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
685 void XdsClusterResolverLb::OnEndpointChanged(size_t index,
686 XdsApi::EdsUpdate update) {
687 if (shutting_down_) return;
688 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
690 "[xds_cluster_resolver_lb %p] Received update from xds client"
691 " for discovery mechanism %" PRIuPTR "",
694 // We need at least one priority for each discovery mechanism, just so that we
695 // have a child in which to create the xds_cluster_impl policy. This ensures
696 // that we properly handle the case of a discovery mechanism dropping 100% of
697 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
698 if (update.priorities.empty()) update.priorities.emplace_back();
699 discovery_mechanisms_[index].drop_config = std::move(update.drop_config);
700 discovery_mechanisms_[index].pending_priority_list =
701 std::move(update.priorities);
702 discovery_mechanisms_[index].first_update_received = true;
703 // If any discovery mechanism has not received its first update,
704 // wait until that happens before creating the child policy.
705 // TODO(roth): If this becomes problematic in the future (e.g., a
706 // secondary discovery mechanism delaying us from starting up at all),
707 // we can consider some sort of optimization whereby we can create the
708 // priority policy with only a subset of its children. But we need to
709 // make sure not to get into a situation where the priority policy
710 // will put the channel into TRANSIENT_FAILURE instead of CONNECTING
711 // while we're still waiting for the other discovery mechanism(s).
712 for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
713 if (!mechanism.first_update_received) return;
715 // Construct new priority list.
716 XdsApi::EdsUpdate::PriorityList priority_list;
717 size_t priority_index = 0;
718 for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
719 // If the mechanism has a pending update, use that.
720 // Otherwise, use the priorities that it previously contributed to the
722 if (mechanism.pending_priority_list.has_value()) {
723 priority_list.insert(priority_list.end(),
724 mechanism.pending_priority_list->begin(),
725 mechanism.pending_priority_list->end());
726 priority_index += mechanism.num_priorities;
727 mechanism.num_priorities = mechanism.pending_priority_list->size();
728 mechanism.pending_priority_list.reset();
730 priority_list.insert(
731 priority_list.end(), priority_list_.begin() + priority_index,
732 priority_list_.begin() + priority_index + mechanism.num_priorities);
733 priority_index += mechanism.num_priorities;
736 // Update child policy.
737 UpdatePriorityList(std::move(priority_list));
740 void XdsClusterResolverLb::OnError(size_t index, grpc_error_handle error) {
742 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
743 " xds watcher reported error: %s",
744 this, index, grpc_error_std_string(error).c_str());
745 GRPC_ERROR_UNREF(error);
746 if (shutting_down_) return;
747 if (!discovery_mechanisms_[index].first_update_received) {
748 // Call OnEndpointChanged with an empty update just like
749 // OnResourceDoesNotExist.
750 OnEndpointChanged(index, XdsApi::EdsUpdate());
754 void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) {
756 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
757 " resource does not exist",
759 if (shutting_down_) return;
760 // Call OnEndpointChanged with an empty update.
761 OnEndpointChanged(index, XdsApi::EdsUpdate());
765 // child policy-related methods
768 void XdsClusterResolverLb::UpdatePriorityList(
769 XdsApi::EdsUpdate::PriorityList priority_list) {
770 // Build some maps from locality to child number and the reverse from
771 // the old data in priority_list_ and priority_child_numbers_.
772 std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
774 std::map<size_t, std::set<XdsLocalityName*>> child_locality_map;
775 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
776 size_t child_number = priority_child_numbers_[priority];
777 const auto& localities = priority_list_[priority].localities;
778 for (const auto& p : localities) {
779 XdsLocalityName* locality_name = p.first;
780 locality_child_map[locality_name] = child_number;
781 child_locality_map[child_number].insert(locality_name);
784 // Construct new list of children.
785 std::vector<size_t> priority_child_numbers;
786 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
787 const auto& localities = priority_list[priority].localities;
788 absl::optional<size_t> child_number;
789 // If one of the localities in this priority already existed, reuse its
791 for (const auto& p : localities) {
792 XdsLocalityName* locality_name = p.first;
793 if (!child_number.has_value()) {
794 auto it = locality_child_map.find(locality_name);
795 if (it != locality_child_map.end()) {
796 child_number = it->second;
797 locality_child_map.erase(it);
798 // Remove localities that *used* to be in this child number, so
799 // that we don't incorrectly reuse this child number for a
800 // subsequent priority.
801 for (XdsLocalityName* old_locality :
802 child_locality_map[*child_number]) {
803 locality_child_map.erase(old_locality);
807 // Remove all localities that are now in this child number, so
808 // that we don't accidentally reuse this child number for a
809 // subsequent priority.
810 locality_child_map.erase(locality_name);
813 // If we didn't find an existing child number, assign a new one.
814 if (!child_number.has_value()) {
815 for (child_number = 0;
816 child_locality_map.find(*child_number) != child_locality_map.end();
819 // Add entry so we know that the child number is in use.
820 // (Don't need to add the list of localities, since we won't use them.)
821 child_locality_map[*child_number];
823 priority_child_numbers.push_back(*child_number);
826 priority_list_ = std::move(priority_list);
827 priority_child_numbers_ = std::move(priority_child_numbers);
828 // Update child policy.
829 UpdateChildPolicyLocked();
832 ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
833 ServerAddressList addresses;
834 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
835 const auto& localities = priority_list_[priority].localities;
836 std::string priority_child_name =
837 absl::StrCat("child", priority_child_numbers_[priority]);
838 for (const auto& p : localities) {
839 const auto& locality_name = p.first;
840 const auto& locality = p.second;
841 std::vector<std::string> hierarchical_path = {
842 priority_child_name, locality_name->AsHumanReadableString()};
843 for (const auto& endpoint : locality.endpoints) {
844 const ServerAddressWeightAttribute* weight_attribute = static_cast<
845 const ServerAddressWeightAttribute*>(endpoint.GetAttribute(
846 ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
847 uint32_t weight = locality.lb_weight;
848 if (weight_attribute != nullptr) {
849 weight = locality.lb_weight * weight_attribute->weight();
851 addresses.emplace_back(
853 .WithAttribute(kHierarchicalPathAttributeKey,
854 MakeHierarchicalPathAttribute(hierarchical_path))
855 .WithAttribute(kXdsLocalityNameAttributeKey,
856 absl::make_unique<XdsLocalityAttribute>(
857 locality_name->Ref()))
859 ServerAddressWeightAttribute::
860 kServerAddressWeightAttributeKey,
861 absl::make_unique<ServerAddressWeightAttribute>(weight)));
868 RefCountedPtr<LoadBalancingPolicy::Config>
869 XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
870 Json::Object priority_children;
871 Json::Array priority_priorities;
872 // Setting up index to iterate through the discovery mechanisms and keeping
873 // track the discovery_mechanism each priority belongs to.
874 size_t discovery_index = 0;
875 // Setting up num_priorities_remaining to track the priorities in each
876 // discovery_mechanism.
877 size_t num_priorities_remaining_in_discovery =
878 discovery_mechanisms_[discovery_index].num_priorities;
879 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
881 if (!discovery_mechanisms_[discovery_index]
882 .discovery_mechanism->override_child_policy()
884 child_policy = discovery_mechanisms_[discovery_index]
885 .discovery_mechanism->override_child_policy();
887 const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
888 if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
889 const auto& localities = priority_list_[priority].localities;
890 Json::Object weighted_targets;
891 for (const auto& p : localities) {
892 XdsLocalityName* locality_name = p.first;
893 const auto& locality = p.second;
894 // Construct JSON object containing locality name.
895 Json::Object locality_name_json;
896 if (!locality_name->region().empty()) {
897 locality_name_json["region"] = locality_name->region();
899 if (!locality_name->zone().empty()) {
900 locality_name_json["zone"] = locality_name->zone();
902 if (!locality_name->sub_zone().empty()) {
903 locality_name_json["sub_zone"] = locality_name->sub_zone();
905 // Add weighted target entry.
906 weighted_targets[locality_name->AsHumanReadableString()] =
908 {"weight", locality.lb_weight},
912 {"round_robin", Json::Object()},
917 // Construct locality-picking policy.
918 // Start with field from our config and add the "targets" field.
919 child_policy = Json::Array{
921 {"weighted_target_experimental",
923 {"targets", Json::Object()},
927 Json::Object& config =
928 *(*child_policy.mutable_array())[0].mutable_object();
929 auto it = config.begin();
930 GPR_ASSERT(it != config.end());
931 (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
933 auto it = xds_lb_policy.find("RING_HASH");
934 GPR_ASSERT(it != xds_lb_policy.end());
935 Json::Object ring_hash_experimental_policy = it->second.object_value();
936 child_policy = Json::Array{
938 {"ring_hash_experimental", ring_hash_experimental_policy},
943 // Wrap it in the drop policy.
944 Json::Array drop_categories;
945 if (discovery_mechanisms_[discovery_index].drop_config != nullptr) {
946 for (const auto& category : discovery_mechanisms_[discovery_index]
947 .drop_config->drop_category_list()) {
948 drop_categories.push_back(Json::Object{
949 {"category", category.name},
950 {"requests_per_million", category.parts_per_million},
954 const auto lrs_key = discovery_mechanisms_[discovery_index]
955 .discovery_mechanism->GetLrsClusterKey();
956 Json::Object xds_cluster_impl_config = {
957 {"clusterName", std::string(lrs_key.first)},
958 {"childPolicy", std::move(child_policy)},
959 {"dropCategories", std::move(drop_categories)},
960 {"maxConcurrentRequests",
961 config_->discovery_mechanisms()[discovery_index]
962 .max_concurrent_requests},
964 if (!lrs_key.second.empty()) {
965 xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
967 if (config_->discovery_mechanisms()[discovery_index]
968 .lrs_load_reporting_server_name.has_value()) {
969 xds_cluster_impl_config["lrsLoadReportingServerName"] =
970 config_->discovery_mechanisms()[discovery_index]
971 .lrs_load_reporting_server_name.value();
973 Json locality_picking_policy = Json::Array{Json::Object{
974 {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
976 // Add priority entry.
977 const size_t child_number = priority_child_numbers_[priority];
978 std::string child_name = absl::StrCat("child", child_number);
979 priority_priorities.emplace_back(child_name);
980 Json::Object child_config = {
981 {"config", std::move(locality_picking_policy)},
983 if (discovery_mechanisms_[discovery_index]
984 .discovery_mechanism->disable_reresolution()) {
985 child_config["ignore_reresolution_requests"] = true;
987 priority_children[child_name] = std::move(child_config);
988 // Each priority in the priority_list_ should correspond to a priority in a
989 // discovery mechanism in discovery_mechanisms_ (both in the same order).
990 // Keeping track of the discovery_mechanism each priority belongs to.
991 --num_priorities_remaining_in_discovery;
992 while (num_priorities_remaining_in_discovery == 0 &&
993 discovery_index < discovery_mechanisms_.size() - 1) {
995 num_priorities_remaining_in_discovery =
996 discovery_mechanisms_[discovery_index].num_priorities;
999 // There should be matching number of priorities in discovery_mechanisms_ and
1000 // in priority_list_; therefore at the end of looping through all the
1001 // priorities, num_priorities_remaining should be down to 0, and index should
1002 // be the last index in discovery_mechanisms_.
1003 GPR_ASSERT(num_priorities_remaining_in_discovery == 0);
1004 GPR_ASSERT(discovery_index == discovery_mechanisms_.size() - 1);
1005 Json json = Json::Array{Json::Object{
1006 {"priority_experimental",
1008 {"children", std::move(priority_children)},
1009 {"priorities", std::move(priority_priorities)},
1012 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1013 std::string json_str = json.Dump(/*indent=*/1);
1016 "[xds_cluster_resolver_lb %p] generated config for child policy: %s",
1017 this, json_str.c_str());
1019 grpc_error_handle error = GRPC_ERROR_NONE;
1020 RefCountedPtr<LoadBalancingPolicy::Config> config =
1021 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
1022 if (error != GRPC_ERROR_NONE) {
1023 // This should never happen, but if it does, we basically have no
1024 // way to fix it, so we put the channel in TRANSIENT_FAILURE.
1026 "[xds_cluster_resolver_lb %p] error parsing generated child policy "
1028 "will put channel in TRANSIENT_FAILURE: %s",
1029 this, grpc_error_std_string(error).c_str());
1030 absl::Status status = absl::InternalError(
1031 "xds_cluster_resolver LB policy: error parsing generated child policy "
1033 channel_control_helper()->UpdateState(
1034 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1035 absl::make_unique<TransientFailurePicker>(status));
1041 void XdsClusterResolverLb::UpdateChildPolicyLocked() {
1042 if (shutting_down_) return;
1043 UpdateArgs update_args;
1044 update_args.config = CreateChildPolicyConfigLocked();
1045 if (update_args.config == nullptr) return;
1046 update_args.addresses = CreateChildPolicyAddressesLocked();
1047 update_args.args = CreateChildPolicyArgsLocked(args_);
1048 if (child_policy_ == nullptr) {
1049 child_policy_ = CreateChildPolicyLocked(update_args.args);
1051 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1052 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p",
1053 this, child_policy_.get());
1055 child_policy_->UpdateLocked(std::move(update_args));
1058 grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
1059 const grpc_channel_args* args) {
1060 absl::InlinedVector<grpc_arg, 2> new_args = {
1061 // Inhibit client-side health checking, since the balancer does this
1063 grpc_channel_arg_integer_create(
1064 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
1066 if (!is_xds_uri_) new_args.push_back(xds_client_->MakeChannelArg());
1067 return grpc_channel_args_copy_and_add(args, new_args.data(), new_args.size());
1070 OrphanablePtr<LoadBalancingPolicy>
1071 XdsClusterResolverLb::CreateChildPolicyLocked(const grpc_channel_args* args) {
1072 LoadBalancingPolicy::Args lb_policy_args;
1073 lb_policy_args.work_serializer = work_serializer();
1074 lb_policy_args.args = args;
1075 lb_policy_args.channel_control_helper =
1076 absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
1077 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1078 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1079 "priority_experimental", std::move(lb_policy_args));
1080 if (GPR_UNLIKELY(lb_policy == nullptr)) {
1082 "[xds_cluster_resolver_lb %p] failure creating child policy", this);
1085 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1087 "[xds_cluster_resolver_lb %p]: Created new child policy %p", this,
1090 // Add our interested_parties pollset_set to that of the newly created
1091 // child policy. This will make the child policy progress upon activity on
1092 // this policy, which in turn is tied to the application's call.
1093 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1094 interested_parties());
1102 class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
1104 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1105 LoadBalancingPolicy::Args args) const override {
1106 // Find server name.
1107 const char* server_uri =
1108 grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
1109 GPR_ASSERT(server_uri != nullptr);
1110 absl::StatusOr<URI> uri = URI::Parse(server_uri);
1111 GPR_ASSERT(uri.ok() && !uri->path().empty());
1112 absl::string_view server_name = absl::StripPrefix(uri->path(), "/");
1113 // Determine if it's an xds URI.
1114 bool is_xds_uri = uri->scheme() == "xds" || uri->scheme() == "google-c2p";
1116 RefCountedPtr<XdsClient> xds_client =
1117 XdsClient::GetFromChannelArgs(*args.args);
1118 if (xds_client == nullptr) {
1120 grpc_error_handle error = GRPC_ERROR_NONE;
1121 xds_client = XdsClient::GetOrCreate(args.args, &error);
1122 if (error != GRPC_ERROR_NONE) {
1124 "cannot get or create XdsClient to instantiate "
1125 "xds_cluster_resolver LB policy: %s",
1126 grpc_error_std_string(error).c_str());
1127 GRPC_ERROR_UNREF(error);
1132 "XdsClient not present in channel args -- cannot instantiate "
1133 "xds_cluster_resolver LB policy");
1137 return MakeOrphanable<XdsClusterResolverChildHandler>(
1138 std::move(xds_client), std::move(args), server_name, is_xds_uri);
1141 const char* name() const override { return kXdsClusterResolver; }
1143 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1144 const Json& json, grpc_error_handle* error) const override {
1145 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1146 if (json.type() == Json::Type::JSON_NULL) {
1147 // xds_cluster_resolver was mentioned as a policy in the deprecated
1148 // loadBalancingPolicy field or in the client API.
1149 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1150 "field:loadBalancingPolicy error:xds_cluster_resolver policy "
1151 "requires configuration. "
1152 "Please use loadBalancingConfig field of service config instead.");
1155 std::vector<grpc_error_handle> error_list;
1156 std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
1157 discovery_mechanisms;
1158 auto it = json.object_value().find("discoveryMechanisms");
1159 if (it == json.object_value().end()) {
1160 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1161 "field:discoveryMechanisms error:required field missing"));
1162 } else if (it->second.type() != Json::Type::ARRAY) {
1163 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1164 "field:discoveryMechanisms error:type should be array"));
1166 const Json::Array& array = it->second.array_value();
1167 for (size_t i = 0; i < array.size(); ++i) {
1168 XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
1169 std::vector<grpc_error_handle> discovery_mechanism_errors =
1170 ParseDiscoveryMechanism(array[i], &discovery_mechanism);
1171 if (!discovery_mechanism_errors.empty()) {
1172 grpc_error_handle error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1173 absl::StrCat("field:discovery_mechanism element: ", i, " error")
1175 for (grpc_error_handle discovery_mechanism_error :
1176 discovery_mechanism_errors) {
1177 error = grpc_error_add_child(error, discovery_mechanism_error);
1179 error_list.push_back(error);
1181 discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
1184 if (discovery_mechanisms.empty()) {
1185 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1186 "field:discovery_mechanism error:list is missing or empty"));
1188 Json xds_lb_policy = Json::Object{
1189 {"ROUND_ROBIN", Json::Object()},
1191 it = json.object_value().find("xdsLbPolicy");
1192 if (it != json.object_value().end()) {
1193 if (it->second.type() != Json::Type::ARRAY) {
1194 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1195 "field:xdsLbPolicy error:type should be array"));
1197 const Json::Array& array = it->second.array_value();
1198 for (size_t i = 0; i < array.size(); ++i) {
1199 if (array[i].type() != Json::Type::OBJECT) {
1200 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1201 "field:xdsLbPolicy error:element should be of type object"));
1204 const Json::Object& policy = array[i].object_value();
1205 auto policy_it = policy.find("ROUND_ROBIN");
1206 if (policy_it != policy.end()) {
1207 if (policy_it->second.type() != Json::Type::OBJECT) {
1208 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1209 "field:ROUND_ROBIN error:type should be object"));
1213 policy_it = policy.find("RING_HASH");
1214 if (policy_it != policy.end()) {
1215 xds_lb_policy = array[i];
1216 size_t min_ring_size;
1217 size_t max_ring_size;
1218 ParseRingHashLbConfig(policy_it->second, &min_ring_size,
1219 &max_ring_size, &error_list);
1224 // Construct config.
1225 if (error_list.empty()) {
1226 return MakeRefCounted<XdsClusterResolverLbConfig>(
1227 std::move(discovery_mechanisms), std::move(xds_lb_policy));
1229 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
1230 "xds_cluster_resolver_experimental LB policy config", &error_list);
1236 static std::vector<grpc_error_handle> ParseDiscoveryMechanism(
1238 XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
1239 std::vector<grpc_error_handle> error_list;
1240 if (json.type() != Json::Type::OBJECT) {
1241 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1242 "value should be of type object"));
1246 auto it = json.object_value().find("clusterName");
1247 if (it == json.object_value().end()) {
1248 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1249 "field:clusterName error:required field missing"));
1250 } else if (it->second.type() != Json::Type::STRING) {
1251 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1252 "field:clusterName error:type should be string"));
1254 discovery_mechanism->cluster_name = it->second.string_value();
1256 // LRS load reporting server name.
1257 it = json.object_value().find("lrsLoadReportingServerName");
1258 if (it != json.object_value().end()) {
1259 if (it->second.type() != Json::Type::STRING) {
1260 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1261 "field:lrsLoadReportingServerName error:type should be string"));
1263 discovery_mechanism->lrs_load_reporting_server_name.emplace(
1264 it->second.string_value());
1267 // Max concurrent requests.
1268 discovery_mechanism->max_concurrent_requests = 1024;
1269 it = json.object_value().find("max_concurrent_requests");
1270 if (it != json.object_value().end()) {
1271 if (it->second.type() != Json::Type::NUMBER) {
1272 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1273 "field:max_concurrent_requests error:must be of type number"));
1275 discovery_mechanism->max_concurrent_requests =
1276 gpr_parse_nonnegative_int(it->second.string_value().c_str());
1279 // Discovery Mechanism type
1280 it = json.object_value().find("type");
1281 if (it == json.object_value().end()) {
1282 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1283 "field:type error:required field missing"));
1284 } else if (it->second.type() != Json::Type::STRING) {
1285 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1286 "field:type error:type should be string"));
1288 if (it->second.string_value() == "EDS") {
1289 discovery_mechanism->type = XdsClusterResolverLbConfig::
1290 DiscoveryMechanism::DiscoveryMechanismType::EDS;
1291 it = json.object_value().find("edsServiceName");
1292 if (it != json.object_value().end()) {
1293 if (it->second.type() != Json::Type::STRING) {
1294 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1295 "field:edsServiceName error:type should be string"));
1297 discovery_mechanism->eds_service_name = it->second.string_value();
1300 } else if (it->second.string_value() == "LOGICAL_DNS") {
1301 discovery_mechanism->type = XdsClusterResolverLbConfig::
1302 DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
1303 it = json.object_value().find("dnsHostname");
1304 if (it == json.object_value().end()) {
1305 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1306 "field:dnsHostname error:required field missing"));
1307 } else if (it->second.type() != Json::Type::STRING) {
1308 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1309 "field:dnsHostname error:type should be string"));
1311 discovery_mechanism->dns_hostname = it->second.string_value();
1314 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1315 "field:type error:invalid type"));
1321 class XdsClusterResolverChildHandler : public ChildPolicyHandler {
1323 XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
1324 Args args, absl::string_view server_name,
1326 : ChildPolicyHandler(std::move(args),
1327 &grpc_lb_xds_cluster_resolver_trace),
1328 xds_client_(std::move(xds_client)),
1329 server_name_(server_name),
1330 is_xds_uri_(is_xds_uri) {}
1332 bool ConfigChangeRequiresNewPolicyInstance(
1333 LoadBalancingPolicy::Config* old_config,
1334 LoadBalancingPolicy::Config* new_config) const override {
1335 GPR_ASSERT(old_config->name() == kXdsClusterResolver);
1336 GPR_ASSERT(new_config->name() == kXdsClusterResolver);
1337 XdsClusterResolverLbConfig* old_xds_cluster_resolver_config =
1338 static_cast<XdsClusterResolverLbConfig*>(old_config);
1339 XdsClusterResolverLbConfig* new_xds_cluster_resolver_config =
1340 static_cast<XdsClusterResolverLbConfig*>(new_config);
1341 return old_xds_cluster_resolver_config->discovery_mechanisms() !=
1342 new_xds_cluster_resolver_config->discovery_mechanisms();
1345 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1346 const char* /*name*/, LoadBalancingPolicy::Args args) const override {
1347 return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args),
1348 server_name_, is_xds_uri_);
1352 RefCountedPtr<XdsClient> xds_client_;
1353 std::string server_name_;
1360 } // namespace grpc_core
1363 // Plugin registration
1366 void grpc_lb_policy_xds_cluster_resolver_init() {
1367 grpc_core::LoadBalancingPolicyRegistry::Builder::
1368 RegisterLoadBalancingPolicyFactory(
1369 absl::make_unique<grpc_core::XdsClusterResolverLbFactory>());
// Plugin shutdown hook: intentionally empty, this function performs no
// cleanup.
void grpc_lb_policy_xds_cluster_resolver_shutdown() {
}