Imported Upstream version 1.33.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / xds / eds_drop.cc
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "absl/strings/string_view.h"
20
21 #include <grpc/grpc.h>
22
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
25 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/xds/xds_client.h"
28 #include "src/core/ext/xds/xds_client_stats.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gpr/string.h"
31 #include "src/core/lib/gprpp/orphanable.h"
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/iomgr/work_serializer.h"
34
35 namespace grpc_core {
36
37 TraceFlag grpc_eds_drop_lb_trace(false, "eds_drop_lb");
38
39 namespace {
40
41 constexpr char kEdsDrop[] = "eds_drop_experimental";
42
43 // Config for EDS drop LB policy.
44 class EdsDropLbConfig : public LoadBalancingPolicy::Config {
45  public:
46   EdsDropLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
47                   std::string cluster_name, std::string eds_service_name,
48                   absl::optional<std::string> lrs_load_reporting_server_name,
49                   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)
50       : child_policy_(std::move(child_policy)),
51         cluster_name_(std::move(cluster_name)),
52         eds_service_name_(std::move(eds_service_name)),
53         lrs_load_reporting_server_name_(
54             std::move(lrs_load_reporting_server_name)),
55         drop_config_(std::move(drop_config)) {}
56
57   const char* name() const override { return kEdsDrop; }
58
59   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
60     return child_policy_;
61   }
62   const std::string& cluster_name() const { return cluster_name_; }
63   const std::string& eds_service_name() const { return eds_service_name_; }
64   const absl::optional<std::string>& lrs_load_reporting_server_name() const {
65     return lrs_load_reporting_server_name_;
66   };
67   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const {
68     return drop_config_;
69   }
70
71  private:
72   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
73   std::string cluster_name_;
74   std::string eds_service_name_;
75   absl::optional<std::string> lrs_load_reporting_server_name_;
76   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
77 };
78
79 // EDS Drop LB policy.
80 class EdsDropLb : public LoadBalancingPolicy {
81  public:
82   EdsDropLb(RefCountedPtr<XdsClient> xds_client, Args args);
83
84   const char* name() const override { return kEdsDrop; }
85
86   void UpdateLocked(UpdateArgs args) override;
87   void ExitIdleLocked() override;
88   void ResetBackoffLocked() override;
89
90  private:
91   // A simple wrapper for ref-counting a picker from the child policy.
92   class RefCountedPicker : public RefCounted<RefCountedPicker> {
93    public:
94     explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
95         : picker_(std::move(picker)) {}
96     PickResult Pick(PickArgs args) { return picker_->Pick(args); }
97
98    private:
99     std::unique_ptr<SubchannelPicker> picker_;
100   };
101
102   // A picker that wraps the picker from the child to perform drops.
103   class DropPicker : public SubchannelPicker {
104    public:
105     DropPicker(EdsDropLb* eds_drop_lb, RefCountedPtr<RefCountedPicker> picker)
106         : drop_config_(eds_drop_lb->config_->drop_config()),
107           drop_stats_(eds_drop_lb->drop_stats_),
108           picker_(std::move(picker)) {}
109
110     PickResult Pick(PickArgs args);
111
112    private:
113     RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
114     RefCountedPtr<XdsClusterDropStats> drop_stats_;
115     RefCountedPtr<RefCountedPicker> picker_;
116   };
117
118   class Helper : public ChannelControlHelper {
119    public:
120     explicit Helper(RefCountedPtr<EdsDropLb> eds_drop_policy)
121         : eds_drop_policy_(std::move(eds_drop_policy)) {}
122
123     ~Helper() { eds_drop_policy_.reset(DEBUG_LOCATION, "Helper"); }
124
125     RefCountedPtr<SubchannelInterface> CreateSubchannel(
126         ServerAddress address, const grpc_channel_args& args) override;
127     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
128                      std::unique_ptr<SubchannelPicker> picker) override;
129     void RequestReresolution() override;
130     void AddTraceEvent(TraceSeverity severity,
131                        absl::string_view message) override;
132
133    private:
134     RefCountedPtr<EdsDropLb> eds_drop_policy_;
135   };
136
137   ~EdsDropLb();
138
139   void ShutdownLocked() override;
140
141   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
142       const grpc_channel_args* args);
143   void UpdateChildPolicyLocked(ServerAddressList addresses,
144                                const grpc_channel_args* args);
145
146   void MaybeUpdatePickerLocked();
147
148   // Current config from the resolver.
149   RefCountedPtr<EdsDropLbConfig> config_;
150
151   // Internal state.
152   bool shutting_down_ = false;
153
154   // The xds client.
155   RefCountedPtr<XdsClient> xds_client_;
156
157   // The stats for client-side load reporting.
158   RefCountedPtr<XdsClusterDropStats> drop_stats_;
159
160   OrphanablePtr<LoadBalancingPolicy> child_policy_;
161
162   // Latest state and picker reported by the child policy.
163   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
164   absl::Status status_;
165   RefCountedPtr<RefCountedPicker> picker_;
166 };
167
168 //
169 // EdsDropLb::DropPicker
170 //
171
172 LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick(
173     LoadBalancingPolicy::PickArgs args) {
174   // Handle drop.
175   const std::string* drop_category;
176   if (drop_config_->ShouldDrop(&drop_category)) {
177     if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
178     PickResult result;
179     result.type = PickResult::PICK_COMPLETE;
180     return result;
181   }
182   // If we're not dropping the call, we should always have a child picker.
183   if (picker_ == nullptr) {  // Should never happen.
184     PickResult result;
185     result.type = PickResult::PICK_FAILED;
186     result.error =
187         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
188                                "eds_drop picker not given any child picker"),
189                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
190     return result;
191   }
192   // Not dropping, so delegate to child picker.
193   return picker_->Pick(args);
194 }
195
196 //
197 // EdsDropLb
198 //
199
200 EdsDropLb::EdsDropLb(RefCountedPtr<XdsClient> xds_client, Args args)
201     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
202   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
203     gpr_log(GPR_INFO, "[eds_drop_lb %p] created -- using xds client %p", this,
204             xds_client_.get());
205   }
206 }
207
208 EdsDropLb::~EdsDropLb() {
209   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
210     gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying xds LB policy", this);
211   }
212 }
213
214 void EdsDropLb::ShutdownLocked() {
215   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
216     gpr_log(GPR_INFO, "[eds_drop_lb %p] shutting down", this);
217   }
218   shutting_down_ = true;
219   // Remove the child policy's interested_parties pollset_set from the
220   // xDS policy.
221   if (child_policy_ != nullptr) {
222     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
223                                      interested_parties());
224     child_policy_.reset();
225   }
226   // Drop our ref to the child's picker, in case it's holding a ref to
227   // the child.
228   picker_.reset();
229   drop_stats_.reset();
230   xds_client_.reset();
231 }
232
233 void EdsDropLb::ExitIdleLocked() {
234   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
235 }
236
237 void EdsDropLb::ResetBackoffLocked() {
238   // The XdsClient will have its backoff reset by the xds resolver, so we
239   // don't need to do it here.
240   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
241 }
242
243 void EdsDropLb::UpdateLocked(UpdateArgs args) {
244   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
245     gpr_log(GPR_INFO, "[eds_drop_lb %p] Received update", this);
246   }
247   // Update config.
248   auto old_config = std::move(config_);
249   config_ = std::move(args.config);
250   // Update load reporting if needed.
251   if (old_config == nullptr ||
252       config_->lrs_load_reporting_server_name() !=
253           old_config->lrs_load_reporting_server_name() ||
254       config_->cluster_name() != old_config->cluster_name() ||
255       config_->eds_service_name() != old_config->eds_service_name()) {
256     drop_stats_.reset();
257     if (config_->lrs_load_reporting_server_name().has_value()) {
258       drop_stats_ = xds_client_->AddClusterDropStats(
259           config_->lrs_load_reporting_server_name().value(),
260           config_->cluster_name(), config_->eds_service_name());
261     }
262     MaybeUpdatePickerLocked();
263   }
264   // Update child policy.
265   UpdateChildPolicyLocked(std::move(args.addresses), args.args);
266   args.args = nullptr;
267 }
268
269 void EdsDropLb::MaybeUpdatePickerLocked() {
270   // If we're dropping all calls, report READY, regardless of what (or
271   // whether) the child has reported.
272   if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
273     auto drop_picker = absl::make_unique<DropPicker>(this, picker_);
274     if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
275       gpr_log(GPR_INFO,
276               "[eds_drop_lb %p] updating connectivity (drop all): state=READY "
277               "picker=%p",
278               this, drop_picker.get());
279     }
280     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
281                                           std::move(drop_picker));
282     return;
283   }
284   // Otherwise, update only if we have a child picker.
285   if (picker_ != nullptr) {
286     auto drop_picker = absl::make_unique<DropPicker>(this, picker_);
287     if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
288       gpr_log(GPR_INFO,
289               "[eds_drop_lb %p] updating connectivity: state=%s status=(%s) "
290               "picker=%p",
291               this, ConnectivityStateName(state_), status_.ToString().c_str(),
292               drop_picker.get());
293     }
294     channel_control_helper()->UpdateState(state_, status_,
295                                           std::move(drop_picker));
296   }
297 }
298
299 OrphanablePtr<LoadBalancingPolicy> EdsDropLb::CreateChildPolicyLocked(
300     const grpc_channel_args* args) {
301   LoadBalancingPolicy::Args lb_policy_args;
302   lb_policy_args.work_serializer = work_serializer();
303   lb_policy_args.args = args;
304   lb_policy_args.channel_control_helper =
305       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
306   OrphanablePtr<LoadBalancingPolicy> lb_policy =
307       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
308                                          &grpc_eds_drop_lb_trace);
309   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
310     gpr_log(GPR_INFO, "[eds_drop_lb %p] Created new child policy handler %p",
311             this, lb_policy.get());
312   }
313   // Add our interested_parties pollset_set to that of the newly created
314   // child policy. This will make the child policy progress upon activity on
315   // this policy, which in turn is tied to the application's call.
316   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
317                                    interested_parties());
318   return lb_policy;
319 }
320
321 void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses,
322                                         const grpc_channel_args* args) {
323   // Create policy if needed.
324   if (child_policy_ == nullptr) {
325     child_policy_ = CreateChildPolicyLocked(args);
326   }
327   // Construct update args.
328   UpdateArgs update_args;
329   update_args.addresses = std::move(addresses);
330   update_args.config = config_->child_policy();
331   update_args.args = args;
332   // Update the policy.
333   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
334     gpr_log(GPR_INFO, "[eds_drop_lb %p] Updating child policy handler %p", this,
335             child_policy_.get());
336   }
337   child_policy_->UpdateLocked(std::move(update_args));
338 }
339
340 //
341 // EdsDropLb::Helper
342 //
343
344 RefCountedPtr<SubchannelInterface> EdsDropLb::Helper::CreateSubchannel(
345     ServerAddress address, const grpc_channel_args& args) {
346   if (eds_drop_policy_->shutting_down_) return nullptr;
347   return eds_drop_policy_->channel_control_helper()->CreateSubchannel(
348       std::move(address), args);
349 }
350
351 void EdsDropLb::Helper::UpdateState(grpc_connectivity_state state,
352                                     const absl::Status& status,
353                                     std::unique_ptr<SubchannelPicker> picker) {
354   if (eds_drop_policy_->shutting_down_) return;
355   if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
356     gpr_log(GPR_INFO,
357             "[eds_drop_lb %p] child connectivity state update: state=%s (%s) "
358             "picker=%p",
359             eds_drop_policy_.get(), ConnectivityStateName(state),
360             status.ToString().c_str(), picker.get());
361   }
362   // Save the state and picker.
363   eds_drop_policy_->state_ = state;
364   eds_drop_policy_->status_ = status;
365   eds_drop_policy_->picker_ =
366       MakeRefCounted<RefCountedPicker>(std::move(picker));
367   // Wrap the picker and return it to the channel.
368   eds_drop_policy_->MaybeUpdatePickerLocked();
369 }
370
371 void EdsDropLb::Helper::RequestReresolution() {
372   if (eds_drop_policy_->shutting_down_) return;
373   eds_drop_policy_->channel_control_helper()->RequestReresolution();
374 }
375
376 void EdsDropLb::Helper::AddTraceEvent(TraceSeverity severity,
377                                       absl::string_view message) {
378   if (eds_drop_policy_->shutting_down_) return;
379   eds_drop_policy_->channel_control_helper()->AddTraceEvent(severity, message);
380 }
381
382 //
383 // factory
384 //
385
386 class EdsDropLbFactory : public LoadBalancingPolicyFactory {
387  public:
388   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
389       LoadBalancingPolicy::Args args) const override {
390     grpc_error* error = GRPC_ERROR_NONE;
391     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
392     if (error != GRPC_ERROR_NONE) {
393       gpr_log(GPR_ERROR,
394               "cannot get XdsClient to instantiate eds_drop LB policy: %s",
395               grpc_error_string(error));
396       GRPC_ERROR_UNREF(error);
397       return nullptr;
398     }
399     return MakeOrphanable<EdsDropLb>(std::move(xds_client), std::move(args));
400   }
401
402   const char* name() const override { return kEdsDrop; }
403
404   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
405       const Json& json, grpc_error** error) const override {
406     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
407     if (json.type() == Json::Type::JSON_NULL) {
408       // This policy was configured in the deprecated loadBalancingPolicy
409       // field or in the client API.
410       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
411           "field:loadBalancingPolicy error:eds_drop policy requires "
412           "configuration. Please use loadBalancingConfig field of service "
413           "config instead.");
414       return nullptr;
415     }
416     std::vector<grpc_error*> error_list;
417     // Child policy.
418     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
419     auto it = json.object_value().find("childPolicy");
420     if (it == json.object_value().end()) {
421       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
422           "field:childPolicy error:required field missing"));
423     } else {
424       grpc_error* parse_error = GRPC_ERROR_NONE;
425       child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
426           it->second, &parse_error);
427       if (child_policy == nullptr) {
428         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
429         std::vector<grpc_error*> child_errors;
430         child_errors.push_back(parse_error);
431         error_list.push_back(
432             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
433       }
434     }
435     // Cluster name.
436     std::string cluster_name;
437     it = json.object_value().find("clusterName");
438     if (it == json.object_value().end()) {
439       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
440           "field:clusterName error:required field missing"));
441     } else if (it->second.type() != Json::Type::STRING) {
442       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
443           "field:clusterName error:type should be string"));
444     } else {
445       cluster_name = it->second.string_value();
446     }
447     // EDS service name.
448     std::string eds_service_name;
449     it = json.object_value().find("edsServiceName");
450     if (it != json.object_value().end()) {
451       if (it->second.type() != Json::Type::STRING) {
452         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
453             "field:edsServiceName error:type should be string"));
454       } else {
455         eds_service_name = it->second.string_value();
456       }
457     }
458     // LRS load reporting server name.
459     absl::optional<std::string> lrs_load_reporting_server_name;
460     it = json.object_value().find("lrsLoadReportingServerName");
461     if (it != json.object_value().end()) {
462       if (it->second.type() != Json::Type::STRING) {
463         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
464             "field:lrsLoadReportingServerName error:type should be string"));
465       } else {
466         lrs_load_reporting_server_name = it->second.string_value();
467       }
468     }
469     // Drop config.
470     auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>();
471     it = json.object_value().find("dropCategories");
472     if (it == json.object_value().end()) {
473       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
474           "field:dropCategories error:required field missing"));
475     } else {
476       std::vector<grpc_error*> child_errors =
477           ParseDropCategories(it->second, drop_config.get());
478       if (!child_errors.empty()) {
479         error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
480             "field:dropCategories", &child_errors));
481       }
482     }
483     if (!error_list.empty()) {
484       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
485           "eds_drop_experimental LB policy config", &error_list);
486       return nullptr;
487     }
488     return MakeRefCounted<EdsDropLbConfig>(
489         std::move(child_policy), std::move(cluster_name),
490         std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
491         std::move(drop_config));
492   }
493
494  private:
495   static std::vector<grpc_error*> ParseDropCategories(
496       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
497     std::vector<grpc_error*> error_list;
498     if (json.type() != Json::Type::ARRAY) {
499       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
500           "dropCategories field is not an array"));
501       return error_list;
502     }
503     for (size_t i = 0; i < json.array_value().size(); ++i) {
504       const Json& entry = json.array_value()[i];
505       std::vector<grpc_error*> child_errors =
506           ParseDropCategory(entry, drop_config);
507       if (!child_errors.empty()) {
508         grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
509             absl::StrCat("errors parsing index ", i).c_str());
510         for (size_t i = 0; i < child_errors.size(); ++i) {
511           error = grpc_error_add_child(error, child_errors[i]);
512         }
513         error_list.push_back(error);
514       }
515     }
516     return error_list;
517   }
518
519   static std::vector<grpc_error*> ParseDropCategory(
520       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
521     std::vector<grpc_error*> error_list;
522     if (json.type() != Json::Type::OBJECT) {
523       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
524           "dropCategories entry is not an object"));
525       return error_list;
526     }
527     std::string category;
528     auto it = json.object_value().find("category");
529     if (it == json.object_value().end()) {
530       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
531           "\"category\" field not present"));
532     } else if (it->second.type() != Json::Type::STRING) {
533       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
534           "\"category\" field is not a string"));
535     } else {
536       category = it->second.string_value();
537     }
538     uint32_t requests_per_million = 0;
539     it = json.object_value().find("requests_per_million");
540     if (it == json.object_value().end()) {
541       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
542           "\"requests_per_million\" field is not present"));
543     } else if (it->second.type() != Json::Type::NUMBER) {
544       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
545           "\"requests_per_million\" field is not a number"));
546     } else {
547       requests_per_million =
548           gpr_parse_nonnegative_int(it->second.string_value().c_str());
549     }
550     if (error_list.empty()) {
551       drop_config->AddCategory(std::move(category), requests_per_million);
552     }
553     return error_list;
554   }
555 };
556
557 }  // namespace
558
559 }  // namespace grpc_core
560
561 //
562 // Plugin registration
563 //
564
565 void grpc_lb_policy_eds_drop_init() {
566   grpc_core::LoadBalancingPolicyRegistry::Builder::
567       RegisterLoadBalancingPolicyFactory(
568           absl::make_unique<grpc_core::EdsDropLbFactory>());
569 }
570
571 void grpc_lb_policy_eds_drop_shutdown() {}