2 // Copyright 2018 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <grpc/support/port_platform.h>
19 #include "absl/strings/string_view.h"
21 #include <grpc/grpc.h>
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
25 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
27 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/ext/xds/xds_client_stats.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/env.h"
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
// Trace flag for this LB policy, constructed with `false` and the name
// "xds_cluster_impl_lb". The code in this file checks it with
// GRPC_TRACE_FLAG_ENABLED() before emitting gpr_log() messages.
40 TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
45 // global circuit breaker atomic map
// Registry of CallCounter objects keyed by (cluster, eds_service_name).
// map_ stores raw CallCounter pointers: GetOrCreate() hands out ref-counted
// pointers to callers, and ~CallCounter() erases the counter's own entry.
48 class CircuitBreakerCallCounterMap {
// Pair type used as Key throughout this class (first = cluster, second =
// eds_service_name, per the inline comments).
// NOTE(review): the line introducing the `Key` alias is not visible in this
// excerpt.
51 std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
// Ref-counted counter of concurrent requests for one Key.
53 class CallCounter : public RefCounted<CallCounter> {
55 explicit CallCounter(Key key) : key_(std::move(key)) {}
// Defined out of line; removes this counter from the global map.
56 ~CallCounter() override;
// Adds 1 to the counter and returns whatever FetchAdd(1) returns.
// NOTE(review): presumably the value *before* the increment -- confirm
// against the Atomic<> wrapper, since Picker::Pick() compares it to a limit.
58 uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
// Subtracts 1 from the counter.
59 void Decrement() { concurrent_requests_.FetchSub(1); }
// Number of requests currently counted; starts at 0.
63 Atomic<uint32_t> concurrent_requests_{0};
// Returns a ref to the counter for (cluster, eds_service_name), creating a
// new one when no live counter is registered.
66 RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
67 const std::string& eds_service_name);
// Non-owning pointers; an entry's value may be nullptr transiently inside
// GetOrCreate().
71 std::map<Key, CallCounter*> map_;
// Global map instance. It is allocated in
// grpc_lb_policy_xds_cluster_impl_init() and deleted in
// grpc_lb_policy_xds_cluster_impl_shutdown(); it is nullptr before init.
74 CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
// Returns a ref-counted CallCounter for (cluster, eds_service_name).
//
// A key that is not yet in map_ gets a placeholder entry with a nullptr
// value. An existing entry is revived with RefIfNonZero(). When that leaves
// `result` null, a new CallCounter is created and its raw pointer is stored
// in the map entry.
//
// NOTE(review): this excerpt does not show the mutex acquisition, the
// delimiter separating the "not found" branch from the RefIfNonZero() call,
// or the return statement -- confirm them against the full file. As shown,
// RefIfNonZero() must not run on the freshly inserted nullptr entry.
76 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
77 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
78 const std::string& eds_service_name) {
79 Key key(cluster, eds_service_name);
80 RefCountedPtr<CallCounter> result;
82 auto it = map_.find(key);
83 if (it == map_.end()) {
// Reserve the slot now; the pointer is filled in below.
84 it = map_.insert({key, nullptr}).first;
// Take a ref only if the existing counter is still live.
86 result = it->second->RefIfNonZero();
88 if (result == nullptr) {
// `key` is moved into the new counter; the map keeps its own copy of the key.
89 result = MakeRefCounted<CallCounter>(std::move(key));
90 it->second = result.get();
// Removes this counter's entry from the global map while holding the map's
// mutex. The entry is erased only if it still points at this object, because
// GetOrCreate() may already have replaced it with a newer counter for the
// same key.
// NOTE(review): dereferences g_call_counter_map unconditionally, so every
// CallCounter must be destroyed before the map is deleted at shutdown.
95 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
96 MutexLock lock(&g_call_counter_map->mu_);
97 auto it = g_call_counter_map->map_.find(key_);
98 if (it != g_call_counter_map->map_.end() && it->second == this) {
99 g_call_counter_map->map_.erase(it);
// Name of this LB policy; returned by name() on the config, the policy and
// the factory defined in this file.
107 constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
109 // TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
110 // removed once circuit breaking feature is fully integrated and enabled by
// Reads the GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING environment variable and
// returns true only when it parses successfully as a boolean and the parsed
// value is true. Any parse failure yields false.
// NOTE(review): the declaration of `parsed_value` and any release of the
// string returned by gpr_getenv() are not visible in this excerpt -- confirm
// the value is freed in the full file.
112 bool XdsCircuitBreakingEnabled() {
113 char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
115 bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
117 return parse_succeeded && parsed_value;
120 // Config for xDS Cluster Impl LB policy.
// Value holder built by XdsClusterImplLbFactory::ParseLoadBalancingConfig().
// Every field is set once in the constructor and exposed through a const
// accessor; no setters are visible.
121 class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
// Takes all arguments by value and moves them into the members.
123 XdsClusterImplLbConfig(
124 RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
125 std::string cluster_name, std::string eds_service_name,
126 absl::optional<std::string> lrs_load_reporting_server_name,
127 uint32_t max_concurrent_requests,
128 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)
129 : child_policy_(std::move(child_policy)),
130 cluster_name_(std::move(cluster_name)),
131 eds_service_name_(std::move(eds_service_name)),
132 lrs_load_reporting_server_name_(
133 std::move(lrs_load_reporting_server_name)),
134 max_concurrent_requests_(max_concurrent_requests),
135 drop_config_(std::move(drop_config)) {}
// Policy name this config belongs to.
137 const char* name() const override { return kXdsClusterImpl; }
// Returns a new ref to the child policy config (copy of the RefCountedPtr).
139 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
140 return child_policy_;
142 const std::string& cluster_name() const { return cluster_name_; }
143 const std::string& eds_service_name() const { return eds_service_name_; }
// Empty optional means no LRS server name was present in the JSON config.
144 const absl::optional<std::string>& lrs_load_reporting_server_name() const {
145 return lrs_load_reporting_server_name_;
// NOTE(review): the top-level `const` on this by-value return type has no
// effect for callers; plain `uint32_t` would be equivalent.
147 const uint32_t max_concurrent_requests() const {
148 return max_concurrent_requests_;
// Accessor for the drop configuration.
// NOTE(review): the body line is not visible in this excerpt; presumably it
// returns drop_config_.
150 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const {
155 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
156 std::string cluster_name_;
157 std::string eds_service_name_;
158 absl::optional<std::string> lrs_load_reporting_server_name_;
159 uint32_t max_concurrent_requests_;
160 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
163 // xDS Cluster Impl LB policy.
// Wraps a single child policy. The child's picker is wrapped in a Picker
// that applies configured drops and the concurrent-request limit, and, when
// an LRS server name is configured, records drop and per-locality call stats
// through the XdsClient.
164 class XdsClusterImplLb : public LoadBalancingPolicy {
166 XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
168 const char* name() const override { return kXdsClusterImpl; }
170 void UpdateLocked(UpdateArgs args) override;
171 void ExitIdleLocked() override;
172 void ResetBackoffLocked() override;
// Subchannel wrapper that carries a locality-stats object next to the
// wrapped subchannel. Helper::CreateSubchannel() creates it and
// Picker::Pick() reads the stats back through locality_stats().
175 class StatsSubchannelWrapper : public DelegatingSubchannel {
177 StatsSubchannelWrapper(
178 RefCountedPtr<SubchannelInterface> wrapped_subchannel,
179 RefCountedPtr<XdsClusterLocalityStats> locality_stats)
180 : DelegatingSubchannel(std::move(wrapped_subchannel)),
181 locality_stats_(std::move(locality_stats)) {}
// Returns a borrowed pointer; the wrapper keeps the owning ref.
183 XdsClusterLocalityStats* locality_stats() const {
184 return locality_stats_.get();
188 RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
191 // A simple wrapper for ref-counting a picker from the child policy.
// It lets the policy and each Picker it creates share one child picker.
192 class RefCountedPicker : public RefCounted<RefCountedPicker> {
194 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
195 : picker_(std::move(picker)) {}
// Forwards directly to the wrapped child picker.
196 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
199 std::unique_ptr<SubchannelPicker> picker_;
202 // A picker that wraps the picker from the child to perform drops.
// Its members are copies taken from the policy in the Picker constructor,
// so a Picker does not read the policy object after construction.
203 class Picker : public SubchannelPicker {
205 Picker(XdsClusterImplLb* xds_cluster_impl_lb,
206 RefCountedPtr<RefCountedPicker> picker);
208 PickResult Pick(PickArgs args) override;
211 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
// Result of XdsCircuitBreakingEnabled() at construction time.
212 bool xds_circuit_breaking_enabled_;
213 uint32_t max_concurrent_requests_;
214 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
// Null when no drop stats were created (no LRS server name configured).
215 RefCountedPtr<XdsClusterDropStats> drop_stats_;
// Child picker; may be null (Pick() handles that case explicitly).
216 RefCountedPtr<RefCountedPicker> picker_;
// ChannelControlHelper given to the child policy. It holds a ref to the
// parent policy and forwards to the parent's channel_control_helper().
219 class Helper : public ChannelControlHelper {
221 explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
222 : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}
// Releases the parent ref with the same "Helper" tag used when it was taken.
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is the Helper destructor.
225 xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper");
228 RefCountedPtr<SubchannelInterface> CreateSubchannel(
229 ServerAddress address, const grpc_channel_args& args) override;
230 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
231 std::unique_ptr<SubchannelPicker> picker) override;
232 void RequestReresolution() override;
233 void AddTraceEvent(TraceSeverity severity,
234 absl::string_view message) override;
237 RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
240 ~XdsClusterImplLb() override;
242 void ShutdownLocked() override;
// Builds the child policy handler and links its pollset_set to ours.
244 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
245 const grpc_channel_args* args);
// Creates the child if needed and pushes addresses/config/args to it.
246 void UpdateChildPolicyLocked(ServerAddressList addresses,
247 const grpc_channel_args* args);
// Re-wraps the current child picker and reports it to the channel.
249 void MaybeUpdatePickerLocked();
251 // Current config from the resolver.
252 RefCountedPtr<XdsClusterImplLbConfig> config_;
254 // Current concurrent number of requests.
255 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
// Set in ShutdownLocked(); the Helper methods return early once it is true.
258 bool shutting_down_ = false;
261 RefCountedPtr<XdsClient> xds_client_;
263 // The stats for client-side load reporting.
264 RefCountedPtr<XdsClusterDropStats> drop_stats_;
266 OrphanablePtr<LoadBalancingPolicy> child_policy_;
268 // Latest state and picker reported by the child policy.
269 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
270 absl::Status status_;
271 RefCountedPtr<RefCountedPicker> picker_;
275 // XdsClusterImplLb::Picker
// Copies everything Pick() needs out of the policy: the call counter, the
// concurrent-request limit and drop config from the current config, and the
// drop stats. XdsCircuitBreakingEnabled() is evaluated once here, so the
// environment variable is not re-read on each pick.
// `picker` is the child picker to delegate to and may be null.
278 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
279 RefCountedPtr<RefCountedPicker> picker)
280 : call_counter_(xds_cluster_impl_lb->call_counter_),
281 xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
282 max_concurrent_requests_(
283 xds_cluster_impl_lb->config_->max_concurrent_requests()),
284 drop_config_(xds_cluster_impl_lb->config_->drop_config()),
285 drop_stats_(xds_cluster_impl_lb->drop_stats_),
286 picker_(std::move(picker)) {
287 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
288 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
289 xds_cluster_impl_lb, this);
// Performs one pick. Visible order of checks:
//   1. drop_config_->ShouldDrop(): a dropped call is reported to drop_stats_
//      (when set) and the result type is PICK_COMPLETE.
//   2. Circuit breaking: the call counter is incremented; if the feature is
//      enabled and the value returned by Increment() is >=
//      max_concurrent_requests_, the increment is undone and the drop is
//      recorded as uncategorized.
//   3. A null child picker produces PICK_FAILED with GRPC_STATUS_INTERNAL.
//   4. Otherwise the pick is delegated to the child picker.
// Every visible path that does not hand the call to a subchannel calls
// Decrement(); for a completed pick with a subchannel the decrement happens
// later, in the recv_trailing_metadata_ready callback installed below.
// NOTE(review): several lines of this function (local `result`
// declarations, returns, closing/else delimiters) are not visible in this
// excerpt, so the exact branch boundaries should be confirmed in the full
// file.
293 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
294 LoadBalancingPolicy::PickArgs args) {
// NOTE(review): drop_config_ is dereferenced without a null check here,
// while MaybeUpdatePickerLocked() null-checks config_->drop_config().
296 const std::string* drop_category;
297 if (drop_config_->ShouldDrop(&drop_category)) {
298 if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
300 result.type = PickResult::PICK_COMPLETE;
303 // Handle circuit breaking.
// The increment happens whether or not circuit breaking is enabled.
304 uint32_t current = call_counter_->Increment();
305 if (xds_circuit_breaking_enabled_) {
306 // Check and see if we exceeded the max concurrent requests count.
// NOTE(review): `current` is whatever FetchAdd(1) returned; whether a limit
// of N admits N or N-1 concurrent calls depends on that being the
// pre-increment value -- confirm.
307 if (current >= max_concurrent_requests_) {
308 call_counter_->Decrement();
309 if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
311 result.type = PickResult::PICK_COMPLETE;
315 // If we're not dropping the call, we should always have a child picker.
316 if (picker_ == nullptr) { // Should never happen.
318 result.type = PickResult::PICK_FAILED;
319 result.error = grpc_error_set_int(
320 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
321 "xds_cluster_impl picker not given any child picker"),
322 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
323 call_counter_->Decrement();
326 // Not dropping, so delegate to child picker.
327 PickResult result = picker_->Pick(args);
328 if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
329 XdsClusterLocalityStats* locality_stats = nullptr;
330 if (drop_stats_ != nullptr) { // If load reporting is enabled.
// The cast relies on the subchannel having been created as a
// StatsSubchannelWrapper by Helper::CreateSubchannel(), which wraps whenever
// an LRS server name is configured.
331 auto* subchannel_wrapper =
332 static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
333 // Handle load reporting.
// Takes a manual ref that the callback below releases with Unref().
334 locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
335 // Record a call started.
336 locality_stats->AddCallStarted();
337 // Unwrap subchannel to pass back up the stack.
338 result.subchannel = subchannel_wrapper->wrapped_subchannel();
340 // Intercept the recv_trailing_metadata op to record call completion.
// Manual ref on the call counter, released in the callback with the same
// "call" tag.
341 auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
342 auto original_recv_trailing_metadata_ready =
343 result.recv_trailing_metadata_ready;
344 result.recv_trailing_metadata_ready =
345 // Note: This callback does not run in either the control plane
346 // work serializer or in the data plane mutex.
347 [locality_stats, original_recv_trailing_metadata_ready, call_counter](
348 grpc_error* error, MetadataInterface* metadata,
349 CallState* call_state) {
350 // Record call completion for load reporting.
351 if (locality_stats != nullptr) {
352 const bool call_failed = error != GRPC_ERROR_NONE;
353 locality_stats->AddCallFinished(call_failed);
354 locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
356 // Decrement number of calls in flight.
357 call_counter->Decrement();
358 call_counter->Unref(DEBUG_LOCATION, "call");
359 // Invoke the original recv_trailing_metadata_ready callback, if any.
360 if (original_recv_trailing_metadata_ready != nullptr) {
361 original_recv_trailing_metadata_ready(error, metadata, call_state);
365 // TODO(roth): We should ideally also record call failures here in the case
366 // where a pick fails. This is challenging, because we don't know which
367 // picks are for wait_for_ready RPCs or how many times we'll return a
368 // failure for the same wait_for_ready RPC.
// Undoes the increment for picks that did not complete with a subchannel.
// NOTE(review): presumably the else branch of the PICK_COMPLETE check above;
// the delimiter is not visible in this excerpt.
369 call_counter_->Decrement();
// Constructor: forwards `args` to the LoadBalancingPolicy base, stores the
// XdsClient ref, and logs creation when the trace flag is enabled. No config
// or child policy exists until the first UpdateLocked().
378 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
380 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
381 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
382 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
383 this, xds_client_.get());
// Destructor: the only visible work is a trace message, emitted when the
// trace flag is enabled.
387 XdsClusterImplLb::~XdsClusterImplLb() {
388 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
390 "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
// Shuts the policy down: sets shutting_down_ (after which the Helper
// callbacks return early), detaches the child policy's pollset_set from this
// policy's interested_parties(), and releases the child policy.
// NOTE(review): the statements following the final comment below (releasing
// the picker and any stats/client refs) are not visible in this excerpt.
395 void XdsClusterImplLb::ShutdownLocked() {
396 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
397 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
399 shutting_down_ = true;
400 // Remove the child policy's interested_parties pollset_set from the
402 if (child_policy_ != nullptr) {
403 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
404 interested_parties());
405 child_policy_.reset();
407 // Drop our ref to the child's picker, in case it's holding a ref to
// Forwards the exit-idle request to the child policy, if one exists.
414 void XdsClusterImplLb::ExitIdleLocked() {
415 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
// Forwards the backoff reset to the child policy, if one exists. The
// XdsClient is deliberately not touched here (see the comment below).
418 void XdsClusterImplLb::ResetBackoffLocked() {
419 // The XdsClient will have its backoff reset by the xds resolver, so we
420 // don't need to do it here.
421 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
// Applies a new config from the parent.
//
// On the first update (config_ was null) it creates the drop stats, when an
// LRS server name is configured, and obtains the shared call counter for
// (cluster_name, eds_service_name). On later updates it asserts that cluster
// name, EDS service name and LRS server name are unchanged. Finally it
// forwards addresses and channel args to the child policy.
//
// NOTE(review): the delimiter separating the initial-update branch from the
// GPR_ASSERT checks is not visible in this excerpt; the asserts dereference
// old_config, which is null on the initial update, so they must only run on
// later updates.
424 void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
425 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
426 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
// Swap in the new config, keeping the previous one for comparison.
429 const bool is_initial_update = config_ == nullptr;
430 auto old_config = std::move(config_);
431 config_ = std::move(args.config);
432 // On initial update, create drop stats.
433 if (is_initial_update) {
434 if (config_->lrs_load_reporting_server_name().has_value()) {
435 drop_stats_ = xds_client_->AddClusterDropStats(
436 config_->lrs_load_reporting_server_name().value(),
437 config_->cluster_name(), config_->eds_service_name());
439 call_counter_ = g_call_counter_map->GetOrCreate(
440 config_->cluster_name(), config_->eds_service_name());
442 // Cluster name, EDS service name, and LRS server name should never
443 // change, because the EDS policy above us should be swapped out if
445 GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
446 GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
447 GPR_ASSERT(config_->lrs_load_reporting_server_name() ==
448 old_config->lrs_load_reporting_server_name());
450 // Update picker if max_concurrent_requests has changed.
// NOTE(review): each Picker also copies drop_config() at construction, but
// a change to the drop config alone does not trigger a picker rebuild here;
// the new drops take effect only when the child next reports state.
// Confirm this is intended.
451 if (is_initial_update || config_->max_concurrent_requests() !=
452 old_config->max_concurrent_requests()) {
453 MaybeUpdatePickerLocked();
455 // Update child policy.
456 UpdateChildPolicyLocked(std::move(args.addresses), args.args);
// Builds a new Picker around the current child picker and reports it to the
// channel through channel_control_helper()->UpdateState().
//
// When the drop config says drop_all(), the state reported is READY with an
// OK status, even if picker_ is still null. Otherwise a picker is reported
// only if the child has already supplied one, using the child's latest
// state_ and status_.
// NOTE(review): the statement ending the drop-all branch is not visible in
// this excerpt; presumably it returns so that only one update is sent.
460 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
461 // If we're dropping all calls, report READY, regardless of what (or
462 // whether) the child has reported.
463 if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
464 auto drop_picker = absl::make_unique<Picker>(this, picker_);
465 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
467 "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
470 this, drop_picker.get());
472 channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
473 std::move(drop_picker));
476 // Otherwise, update only if we have a child picker.
477 if (picker_ != nullptr) {
478 auto drop_picker = absl::make_unique<Picker>(this, picker_);
479 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
481 "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
484 this, ConnectivityStateName(state_), status_.ToString().c_str(),
487 channel_control_helper()->UpdateState(state_, status_,
488 std::move(drop_picker));
// Creates the child policy as a ChildPolicyHandler. The child shares this
// policy's work serializer and receives a Helper that holds a ref to this
// policy (tagged "Helper"). The child's interested_parties pollset_set is
// then linked to this policy's.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably lb_policy is returned.
492 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
493 const grpc_channel_args* args) {
494 LoadBalancingPolicy::Args lb_policy_args;
495 lb_policy_args.work_serializer = work_serializer();
496 lb_policy_args.args = args;
497 lb_policy_args.channel_control_helper =
498 absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
499 OrphanablePtr<LoadBalancingPolicy> lb_policy =
500 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
501 &grpc_xds_cluster_impl_lb_trace);
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
504 "[xds_cluster_impl_lb %p] Created new child policy handler %p",
505 this, lb_policy.get());
507 // Add our interested_parties pollset_set to that of the newly created
508 // child policy. This will make the child policy progress upon activity on
509 // this policy, which in turn is tied to the application's call.
510 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
511 interested_parties());
// Creates the child policy on first use, then sends it an update made of the
// given addresses, the child config from config_, and the channel args.
// `addresses` is consumed (moved into the update).
515 void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses,
516 const grpc_channel_args* args) {
517 // Create policy if needed.
518 if (child_policy_ == nullptr) {
519 child_policy_ = CreateChildPolicyLocked(args);
521 // Construct update args.
522 UpdateArgs update_args;
523 update_args.addresses = std::move(addresses);
524 update_args.config = config_->child_policy();
525 update_args.args = args;
526 // Update the policy.
527 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
529 "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
530 child_policy_.get());
532 child_policy_->UpdateLocked(std::move(update_args));
536 // XdsClusterImplLb::Helper
// Creates a subchannel on behalf of the child policy by delegating to the
// parent's channel_control_helper(). Returns nullptr once the parent is
// shutting down.
//
// When an LRS server name is configured, the locality name is read from the
// address attribute kXdsLocalityNameAttributeKey (left null if the attribute
// is absent), a locality-stats object is obtained from the XdsClient, and
// the new subchannel is returned wrapped in a StatsSubchannelWrapper.
// Picker::Pick() later casts picked subchannels back to that wrapper type.
539 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
540 ServerAddress address, const grpc_channel_args& args) {
541 if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
542 // If load reporting is enabled, wrap the subchannel such that it
543 // includes the locality stats object, which will be used by the EdsPicker.
544 if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name()
546 RefCountedPtr<XdsLocalityName> locality_name;
547 auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
548 if (attribute != nullptr) {
// NOTE(review): assumes any attribute stored under this key is an
// XdsLocalityAttribute -- confirm against where the attribute is set.
549 const auto* locality_attr =
550 static_cast<const XdsLocalityAttribute*>(attribute);
551 locality_name = locality_attr->locality_name();
553 RefCountedPtr<XdsClusterLocalityStats> locality_stats =
554 xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
555 *xds_cluster_impl_policy_->config_
556 ->lrs_load_reporting_server_name(),
557 xds_cluster_impl_policy_->config_->cluster_name(),
558 xds_cluster_impl_policy_->config_->eds_service_name(),
559 std::move(locality_name));
560 return MakeRefCounted<StatsSubchannelWrapper>(
561 xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
562 std::move(address), args),
563 std::move(locality_stats));
565 // Load reporting not enabled, so don't wrap the subchannel.
566 return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
567 std::move(address), args);
// Receives a state update from the child policy. Ignored while the parent
// is shutting down. Otherwise the state, status and picker are stored on the
// parent (the picker inside a RefCountedPicker so it can be shared), and
// MaybeUpdatePickerLocked() publishes a wrapped picker to the channel.
570 void XdsClusterImplLb::Helper::UpdateState(
571 grpc_connectivity_state state, const absl::Status& status,
572 std::unique_ptr<SubchannelPicker> picker) {
573 if (xds_cluster_impl_policy_->shutting_down_) return;
574 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
576 "[xds_cluster_impl_lb %p] child connectivity state update: "
579 xds_cluster_impl_policy_.get(), ConnectivityStateName(state),
580 status.ToString().c_str(), picker.get());
582 // Save the state and picker.
583 xds_cluster_impl_policy_->state_ = state;
584 xds_cluster_impl_policy_->status_ = status;
585 xds_cluster_impl_policy_->picker_ =
586 MakeRefCounted<RefCountedPicker>(std::move(picker));
587 // Wrap the picker and return it to the channel.
588 xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
// Passes the child's re-resolution request up to the parent's
// channel_control_helper(), unless the parent is shutting down.
591 void XdsClusterImplLb::Helper::RequestReresolution() {
592 if (xds_cluster_impl_policy_->shutting_down_) return;
593 xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
// Passes the child's trace event up to the parent's
// channel_control_helper(), unless the parent is shutting down.
596 void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
597 absl::string_view message) {
598 if (xds_cluster_impl_policy_->shutting_down_) return;
599 xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
// Factory registered under kXdsClusterImpl. It creates XdsClusterImplLb
// instances and parses the policy's JSON config into XdsClusterImplLbConfig.
607 class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
// Obtains the shared XdsClient and builds the policy around it. If
// XdsClient::GetOrCreate() reports an error, the error is logged and
// released.
// NOTE(review): the statement after GRPC_ERROR_UNREF is not visible in this
// excerpt; presumably the function returns nullptr on that path.
609 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
610 LoadBalancingPolicy::Args args) const override {
611 grpc_error* error = GRPC_ERROR_NONE;
612 RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
613 if (error != GRPC_ERROR_NONE) {
616 "cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s",
617 grpc_error_string(error));
618 GRPC_ERROR_UNREF(error);
621 return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
625 const char* name() const override { return kXdsClusterImpl; }
// Parses the JSON config object. Keys read here:
//   "childPolicy"                (required; parsed via the LB registry)
//   "clusterName"                (required; string)
//   "edsServiceName"             (optional; string)
//   "lrsLoadReportingServerName" (optional; string)
//   "maxConcurrentRequests"      (optional; number; default 1024)
//   "dropCategories"             (required; array, see ParseDropCategories)
// Problems are accumulated in error_list so that several can be reported at
// once; if any were found, *error is set to a combined error.
// NOTE(review): the early-return statements are not visible in this excerpt;
// presumably nullptr is returned whenever *error is set.
627 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
628 const Json& json, grpc_error** error) const override {
629 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
630 if (json.type() == Json::Type::JSON_NULL) {
631 // This policy was configured in the deprecated loadBalancingPolicy
632 // field or in the client API.
633 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
634 "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
635 "configuration. Please use loadBalancingConfig field of service "
639 std::vector<grpc_error*> error_list;
// Child policy: required.
641 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
642 auto it = json.object_value().find("childPolicy");
643 if (it == json.object_value().end()) {
644 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
645 "field:childPolicy error:required field missing"));
647 grpc_error* parse_error = GRPC_ERROR_NONE;
648 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
649 it->second, &parse_error);
650 if (child_policy == nullptr) {
651 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
652 std::vector<grpc_error*> child_errors;
653 child_errors.push_back(parse_error);
654 error_list.push_back(
655 GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
// Cluster name: required string.
659 std::string cluster_name;
660 it = json.object_value().find("clusterName");
661 if (it == json.object_value().end()) {
662 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
663 "field:clusterName error:required field missing"));
664 } else if (it->second.type() != Json::Type::STRING) {
665 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
666 "field:clusterName error:type should be string"));
668 cluster_name = it->second.string_value();
// EDS service name: optional string; stays empty when absent.
671 std::string eds_service_name;
672 it = json.object_value().find("edsServiceName");
673 if (it != json.object_value().end()) {
674 if (it->second.type() != Json::Type::STRING) {
675 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
676 "field:edsServiceName error:type should be string"));
678 eds_service_name = it->second.string_value();
681 // LRS load reporting server name.
682 absl::optional<std::string> lrs_load_reporting_server_name;
683 it = json.object_value().find("lrsLoadReportingServerName");
684 if (it != json.object_value().end()) {
685 if (it->second.type() != Json::Type::STRING) {
686 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
687 "field:lrsLoadReportingServerName error:type should be string"));
689 lrs_load_reporting_server_name = it->second.string_value();
692 // Max concurrent requests.
693 uint32_t max_concurrent_requests = 1024;
694 it = json.object_value().find("maxConcurrentRequests");
695 if (it != json.object_value().end()) {
696 if (it->second.type() != Json::Type::NUMBER) {
// NOTE(review): this message names the field "max_concurrent_requests",
// but the JSON key looked up above is "maxConcurrentRequests".
697 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
698 "field:max_concurrent_requests error:must be of type number"));
// NOTE(review): the result of gpr_parse_nonnegative_int() is stored
// directly in a uint32_t with no failure check. Presumably the helper
// signals failure with a negative value, which would wrap to a very large
// limit -- confirm and consider validating.
700 max_concurrent_requests =
701 gpr_parse_nonnegative_int(it->second.string_value().c_str());
// Drop categories: required.
705 auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>();
706 it = json.object_value().find("dropCategories");
707 if (it == json.object_value().end()) {
708 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
709 "field:dropCategories error:required field missing"));
711 std::vector<grpc_error*> child_errors =
712 ParseDropCategories(it->second, drop_config.get());
713 if (!child_errors.empty()) {
714 error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
715 "field:dropCategories", &child_errors));
718 if (!error_list.empty()) {
719 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
720 "xds_cluster_impl_experimental LB policy config", &error_list);
723 return MakeRefCounted<XdsClusterImplLbConfig>(
724 std::move(child_policy), std::move(cluster_name),
725 std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
726 max_concurrent_requests, std::move(drop_config));
// Parses the "dropCategories" array, adding each valid entry to
// drop_config. Returns one error per failing index (with that entry's
// errors attached as children), or a single error if `json` is not an array.
730 static std::vector<grpc_error*> ParseDropCategories(
731 const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
732 std::vector<grpc_error*> error_list;
733 if (json.type() != Json::Type::ARRAY) {
734 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
735 "dropCategories field is not an array"));
738 for (size_t i = 0; i < json.array_value().size(); ++i) {
739 const Json& entry = json.array_value()[i];
740 std::vector<grpc_error*> child_errors =
741 ParseDropCategory(entry, drop_config);
742 if (!child_errors.empty()) {
743 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
744 absl::StrCat("errors parsing index ", i).c_str());
// NOTE(review): this inner `i` shadows the outer loop index. The outer
// value is not used again inside this block, so the shadowing looks
// harmless, but a distinct name would be clearer.
745 for (size_t i = 0; i < child_errors.size(); ++i) {
746 error = grpc_error_add_child(error, child_errors[i]);
748 error_list.push_back(error);
// Parses one drop-category object with a string "category" and a numeric
// "requests_per_million". The category is added to drop_config only when no
// error was recorded for this entry.
754 static std::vector<grpc_error*> ParseDropCategory(
755 const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
756 std::vector<grpc_error*> error_list;
757 if (json.type() != Json::Type::OBJECT) {
758 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
759 "dropCategories entry is not an object"));
762 std::string category;
763 auto it = json.object_value().find("category");
764 if (it == json.object_value().end()) {
765 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
766 "\"category\" field not present"));
767 } else if (it->second.type() != Json::Type::STRING) {
768 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
769 "\"category\" field is not a string"));
771 category = it->second.string_value();
773 uint32_t requests_per_million = 0;
774 it = json.object_value().find("requests_per_million");
775 if (it == json.object_value().end()) {
776 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
777 "\"requests_per_million\" field is not present"));
778 } else if (it->second.type() != Json::Type::NUMBER) {
779 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
780 "\"requests_per_million\" field is not a number"));
// NOTE(review): same unchecked gpr_parse_nonnegative_int() -> uint32_t
// conversion as for maxConcurrentRequests above -- confirm failure handling.
782 requests_per_million =
783 gpr_parse_nonnegative_int(it->second.string_value().c_str());
785 if (error_list.empty()) {
786 drop_config->AddCategory(std::move(category), requests_per_million);
794 } // namespace grpc_core
797 // Plugin registration
// Plugin init hook: allocates the global call-counter map and registers the
// XdsClusterImplLbFactory with the LB policy registry.
// NOTE(review): the map is allocated with a raw `new` and overwrites
// g_call_counter_map unconditionally, so calling init twice without an
// intervening shutdown would leak the earlier map.
800 void grpc_lb_policy_xds_cluster_impl_init() {
801 grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
802 grpc_core::LoadBalancingPolicyRegistry::Builder::
803 RegisterLoadBalancingPolicyFactory(
804 absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
// Plugin shutdown hook: deletes the global call-counter map created by
// grpc_lb_policy_xds_cluster_impl_init().
// NOTE(review): the pointer is not reset to nullptr after the delete, and
// ~CallCounter() dereferences g_call_counter_map, so any CallCounter that
// outlives this call would touch freed memory -- confirm destruction order.
807 void grpc_lb_policy_xds_cluster_impl_shutdown() {
808 delete grpc_core::g_call_counter_map;