3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
38 #include "src/core/ext/filters/client_channel/resolver_registry.h"
39 #include "src/core/ext/filters/client_channel/retry_throttle.h"
40 #include "src/core/ext/filters/client_channel/server_address.h"
41 #include "src/core/ext/filters/client_channel/service_config.h"
42 #include "src/core/ext/filters/client_channel/subchannel.h"
43 #include "src/core/ext/filters/deadline/deadline_filter.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/connected_channel.h"
47 #include "src/core/lib/channel/status_util.h"
48 #include "src/core/lib/gpr/string.h"
49 #include "src/core/lib/gprpp/inlined_vector.h"
50 #include "src/core/lib/gprpp/manual_constructor.h"
51 #include "src/core/lib/gprpp/sync.h"
52 #include "src/core/lib/iomgr/combiner.h"
53 #include "src/core/lib/iomgr/iomgr.h"
54 #include "src/core/lib/iomgr/polling_entity.h"
55 #include "src/core/lib/profiling/timers.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/slice/slice_string_helpers.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 #include "src/core/lib/transport/status_metadata.h"
69 // ResolvingLoadBalancingPolicy::ResolverResultHandler
// Resolver::ResultHandler that forwards resolver results and errors to the
// owning ResolvingLoadBalancingPolicy. Holds a strong ref (parent_) to that
// policy, so the policy stays alive for as long as this handler exists.
72 class ResolvingLoadBalancingPolicy::ResolverResultHandler
73 : public Resolver::ResultHandler {
75 explicit ResolverResultHandler(
76 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
77 : parent_(std::move(parent)) {}
// Logs (when the parent's tracer is enabled) that resolver shutdown is
// complete. NOTE(review): presumably the resolver destroys its result
// handler when it shuts down — confirm against Resolver.
79 ~ResolverResultHandler() {
80 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
81 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
// Hands a new resolver result (by move) to the parent policy.
86 void ReturnResult(Resolver::Result result) override {
87 parent_->OnResolverResultChangedLocked(std::move(result));
// Hands a resolver error to the parent policy, which takes ownership of
// `error` (OnResolverError() unrefs it on every path).
90 void ReturnError(grpc_error* error) override {
91 parent_->OnResolverError(error);
// Strong ref to the owning policy.
95 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
99 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
// ChannelControlHelper handed to each child LB policy created by
// CreateLbPolicyLocked(). Requests are forwarded to the parent's own
// channel_control_helper(). Subchannel/channel creation and state updates
// are dropped in two cases:
// - the parent is shutting down (resolver_ == nullptr);
// - they come from a child that is neither the parent's current
//   (lb_policy_) nor pending (pending_lb_policy_) child.
102 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
103 : public LoadBalancingPolicy::ChannelControlHelper {
105 explicit ResolvingControlHelper(
106 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
107 : parent_(std::move(parent)) {}
// Returns nullptr instead of creating a subchannel when shutting down or
// when called by an outdated child.
109 Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
110 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
111 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
112 return parent_->channel_control_helper()->CreateSubchannel(args);
// Same filtering as CreateSubchannel(), applied to channel creation.
115 grpc_channel* CreateChannel(const char* target,
116 const grpc_channel_args& args) override {
117 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
118 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
119 return parent_->channel_control_helper()->CreateChannel(target, args);
// Propagates a child's connectivity state and picker to the channel.
// Updates from the current child are forwarded directly. Pending and
// outdated children are handled below.
122 void UpdateState(grpc_connectivity_state state,
123 UniquePtr<SubchannelPicker> picker) override {
124 if (parent_->resolver_ == nullptr) return; // Shutting down.
125 // If this request is from the pending child policy, ignore it until
126 // it reports READY, at which point we swap it into place.
127 if (CalledByPendingChild()) {
128 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
130 "resolving_lb=%p helper=%p: pending child policy %p reports "
132 parent_.get(), this, child_,
133 grpc_connectivity_state_name(state));
135 if (state != GRPC_CHANNEL_READY) return;
// Detach the outgoing current child's pollset_set from ours before it is
// replaced by the pending child.
136 grpc_pollset_set_del_pollset_set(
137 parent_->lb_policy_->interested_parties(),
138 parent_->interested_parties());
// The swap is done under lb_policy_mu_ because FillChildRefsForChannelz()
// reads these members outside the combiner.
139 MutexLock lock(&parent_->lb_policy_mu_);
140 parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
141 } else if (!CalledByCurrentChild()) {
142 // This request is from an outdated child, so ignore it.
// Reached for the current child, or for a pending child that just reported
// READY and was promoted above.
145 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
// Asks the parent's resolver to re-resolve. The request is skipped if a
// pending child exists and did not make it, or if the parent is shutting
// down.
148 void RequestReresolution() override {
149 // If there is a pending child policy, ignore re-resolution requests
150 // from the current child policy (or any outdated child).
151 if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
154 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
155 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
// resolver_ is null once the parent is shutting down.
158 if (parent_->resolver_ != nullptr) {
159 parent_->resolver_->RequestReresolutionLocked();
// Records the child policy this helper serves. Must be called before either
// Called*Child() predicate below, since both assert that child_ is set.
163 void set_child(LoadBalancingPolicy* child) { child_ = child; }
// True if this helper's child is the parent's pending child policy.
166 bool CalledByPendingChild() const {
167 GPR_ASSERT(child_ != nullptr);
168 return child_ == parent_->pending_lb_policy_.get();
// True if this helper's child is the parent's current child policy.
171 bool CalledByCurrentChild() const {
172 GPR_ASSERT(child_ != nullptr);
173 return child_ == parent_->lb_policy_.get();
// Strong ref to the owning policy.
176 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
// Non-owning pointer to the child this helper was created for. It is set
// via set_child() only after the child was successfully created.
177 LoadBalancingPolicy* child_ = nullptr;
181 // ResolvingLoadBalancingPolicy
184 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
185 Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
186 UniquePtr<char> child_policy_name,
187 RefCountedPtr<ParsedLoadBalancingConfig> child_lb_config,
189 : LoadBalancingPolicy(std::move(args)),
191 target_uri_(std::move(target_uri)),
192 child_policy_name_(std::move(child_policy_name)),
193 child_lb_config_(std::move(child_lb_config)) {
194 GPR_ASSERT(child_policy_name_ != nullptr);
195 // Don't fetch service config, since this ctor is for use in nested LB
196 // policies, not at the top level, and we only fetch the service
197 // config at the top level.
198 grpc_arg arg = grpc_channel_arg_integer_create(
199 const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
200 grpc_channel_args* new_args =
201 grpc_channel_args_copy_and_add(args.args, &arg, 1);
202 *error = Init(*new_args);
203 grpc_channel_args_destroy(new_args);
// Top-level constructor. For every resolver result,
// process_resolver_result (with process_resolver_result_user_data) is
// invoked from OnResolverResultChangedLocked(). It determines the child
// policy name and config and whether the service config changed.
// *error receives the result of Init(): GRPC_ERROR_NONE on success.
206 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
207 Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
208 ProcessResolverResultCallback process_resolver_result,
209 void* process_resolver_result_user_data, grpc_error** error)
210 : LoadBalancingPolicy(std::move(args)),
212 target_uri_(std::move(target_uri)),
213 process_resolver_result_(process_resolver_result),
214 process_resolver_result_user_data_(process_resolver_result_user_data) {
215 GPR_ASSERT(process_resolver_result != nullptr);
// Paired with gpr_mu_destroy() in the destructor.
216 gpr_mu_init(&lb_policy_mu_);
// args.args is read after std::move(args) in the initializer list. This
// relies on Args' move leaving the raw channel-args pointer intact.
217 *error = Init(*args.args);
// Shared constructor tail. It creates the resolver for target_uri_ and gives
// the resolver a ResolverResultHandler that holds a ref to this policy.
// It then reports IDLE with a QueuePicker to the channel.
// Returns an error (without reporting any state) if the resolver registry
// could not create a resolver for the target; otherwise GRPC_ERROR_NONE.
220 grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
221 resolver_ = ResolverRegistry::CreateResolver(
222 target_uri_.get(), &args, interested_parties(), combiner(),
223 UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
224 if (resolver_ == nullptr) {
225 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
227 // Return our picker to the channel.
228 channel_control_helper()->UpdateState(
229 GRPC_CHANNEL_IDLE, UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
230 return GRPC_ERROR_NONE;
// Asserts that the resolver and the current child policy were already
// released before destruction, then destroys lb_policy_mu_. The mutex
// therefore must have been initialized by the constructor that built this
// object.
233 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
234 GPR_ASSERT(resolver_ == nullptr);
235 GPR_ASSERT(lb_policy_ == nullptr);
236 gpr_mu_destroy(&lb_policy_mu_);
// Tears down the current and pending child policies, detaching their
// pollset_sets from ours. It only acts while resolver_ is still set.
// NOTE(review): the statements that reset resolver_ and lb_policy_ are not
// visible in this copy of the function. Confirm they are present, since the
// destructor asserts both are null.
239 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
240 if (resolver_ != nullptr) {
// Child pointers are changed under lb_policy_mu_ because
// FillChildRefsForChannelz() reads them outside the combiner.
242 MutexLock lock(&lb_policy_mu_);
243 if (lb_policy_ != nullptr) {
244 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
245 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
248 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
249 interested_parties());
252 if (pending_lb_policy_ != nullptr) {
253 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
254 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
255 this, pending_lb_policy_.get());
// Detach the pending child's pollset_set before releasing it.
257 grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
258 interested_parties());
259 pending_lb_policy_.reset();
// Leaves IDLE. If a child policy exists, ExitIdle is forwarded to it and to
// any pending child. Name resolution is started if it has not started yet
// and the resolver still exists.
// NOTE(review): the line between the two `if` statements is missing from
// this copy. It cannot be confirmed here whether resolution is started only
// when there is no child policy (i.e. in an `else` branch).
264 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
265 if (lb_policy_ != nullptr) {
266 lb_policy_->ExitIdleLocked();
267 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
269 if (!started_resolving_ && resolver_ != nullptr) {
270 StartResolvingLocked();
// If a resolver exists, resets its backoff and asks it to re-resolve
// immediately. Then resets backoff in the current and pending child
// policies, where present.
275 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
276 if (resolver_ != nullptr) {
277 resolver_->ResetBackoffLocked();
278 resolver_->RequestReresolutionLocked();
280 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
281 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
// Collects channelz child refs (subchannels and channels) from the current
// and pending child policies into the supplied lists.
284 void ResolvingLoadBalancingPolicy::FillChildRefsForChannelz(
285 channelz::ChildRefsList* child_subchannels,
286 channelz::ChildRefsList* child_channels) {
287 // Delegate to lb_policy_ (and pending_lb_policy_, if any) to fill in the child subchannels and channels.
288 // This must be done holding lb_policy_mu_, since this method does not
289 // run in the combiner.
290 MutexLock lock(&lb_policy_mu_);
291 if (lb_policy_ != nullptr) {
292 lb_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
294 if (pending_lb_policy_ != nullptr) {
295 pending_lb_policy_->FillChildRefsForChannelz(child_subchannels,
// Starts the resolver. This may happen only once, which is enforced by the
// assertion on started_resolving_. While waiting for the first result, it
// reports CONNECTING with a QueuePicker to the channel.
300 void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
301 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
302 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
304 GPR_ASSERT(!started_resolving_);
305 started_resolving_ = true;
306 channel_control_helper()->UpdateState(
307 GRPC_CHANNEL_CONNECTING,
308 UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
309 resolver_->StartLocked();
// Handles a resolver failure. Takes ownership of `error`, which is unreffed
// on every path. The error is ignored after shutdown (resolver_ == nullptr).
// If no child policy exists yet, reports TRANSIENT_FAILURE with a
// TransientFailurePicker built from an error that references `error`.
312 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
313 if (resolver_ == nullptr) {
314 GRPC_ERROR_UNREF(error);
317 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
318 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
319 grpc_error_string(error));
321 // If we already have an LB policy from a previous resolution
322 // result, then we continue to let it set the connectivity state.
323 // Otherwise, we go into TRANSIENT_FAILURE.
324 if (lb_policy_ == nullptr) {
// NOTE(review): presumably the picker takes ownership of state_error —
// confirm against TransientFailurePicker.
325 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
326 "Resolver transient failure", &error, 1);
327 channel_control_helper()->UpdateState(
328 GRPC_CHANNEL_TRANSIENT_FAILURE,
329 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
331 GRPC_ERROR_UNREF(error);
// Applies a resolver result to the child policy named lb_policy_name. A new
// child is created when needed (see the cases below); otherwise the newest
// existing child is updated. `result` is consumed: its addresses and args
// are handed to the child via UpdateArgs. trace_strings may gain entries
// describing what was done.
334 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
335 const char* lb_policy_name,
336 RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config,
337 Resolver::Result result, TraceStringVector* trace_strings) {
338 // If the child policy name changes, we need to create a new child
339 // policy. When this happens, we leave lb_policy_ as-is and store
340 // the new child policy in pending_lb_policy_. Once the new child
341 // policy transitions into state READY, we swap it into lb_policy_,
342 // replacing the original child policy. So pending_lb_policy_ is
343 // non-null only between when we apply an update that changes the child
344 // policy name and when the new child reports state READY.
//
346 // Updates can arrive at any point during this transition. We always
347 // apply updates relative to the most recently created child policy,
348 // even if the most recent one is still in pending_lb_policy_. This
349 // is true both when applying the updates to an existing child policy
350 // and when determining whether we need to create a new policy.
//
352 // As a result of this, there are several cases to consider here:
//
354 // 1. We have no existing child policy (i.e., no resolver result has
355 // yet caused a child policy to be created; in this case, both
356 // lb_policy_ and pending_lb_policy_ are null). In this case, we
357 // create a new child policy and store it in lb_policy_.
358 //
360 // 2. We have an existing child policy and have no pending child policy
361 // from a previous update (i.e., either there has not been a
362 // previous update that changed the policy name, or we have already
363 // finished swapping in the new policy; in this case, lb_policy_
364 // is non-null but pending_lb_policy_ is null). In this case:
365 // a. If lb_policy_->name() equals lb_policy_name, then we
366 // update the existing child policy.
367 // b. If lb_policy_->name() does not equal lb_policy_name,
368 // we create a new policy. The policy will be stored in
369 // pending_lb_policy_ and will later be swapped into
370 // lb_policy_ by the helper when the new child transitions
//          into state READY.
//
373 // 3. We have an existing child policy and have a pending child policy
374 // from a previous update (i.e., a previous update set
375 // pending_lb_policy_ as per case 2b above and that policy has
376 // not yet transitioned into state READY and been swapped into
377 // lb_policy_; in this case, both lb_policy_ and
378 // pending_lb_policy_ are non-null). In this case:
379 // a. If pending_lb_policy_->name() equals lb_policy_name,
380 // then we update the existing pending child policy.
381 // b. If pending_lb_policy_->name() does not equal
382 // lb_policy_name, then we create a new policy. The new
383 // policy is stored in pending_lb_policy_ (replacing the one
384 // that was there before, which will be immediately shut down)
385 // and will later be swapped into lb_policy_ by the helper
386 // when the new child transitions into state READY.
387 const bool create_policy =
// case 1
389 lb_policy_ == nullptr ||
// case 2b
391 (pending_lb_policy_ == nullptr &&
392 strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
// case 3b
394 (pending_lb_policy_ != nullptr &&
395 strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
396 LoadBalancingPolicy* policy_to_update = nullptr;
398 // Cases 1, 2b, and 3b: create a new child policy.
399 // If lb_policy_ is null, we set it (case 1), else we set
400 // pending_lb_policy_ (cases 2b and 3b).
401 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
402 gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
403 lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
406 CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
// Reference to whichever member will own the new policy.
407 auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
// Publish under lb_policy_mu_, since FillChildRefsForChannelz() reads these
// members outside the combiner.
409 MutexLock lock(&lb_policy_mu_);
410 lb_policy = std::move(new_policy);
412 policy_to_update = lb_policy.get();
414 // Cases 2a and 3a: update an existing policy.
415 // If we have a pending child policy, send the update to the pending
416 // policy (case 3a), else send it to the current policy (case 2a).
417 policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
// NOTE(review): CreateLbPolicyLocked() appears to return null when the
// registry cannot create lb_policy_name. In that case this assertion aborts
// the process. Confirm that only registered policy names can reach here.
420 GPR_ASSERT(policy_to_update != nullptr);
421 // Update the policy.
422 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
423 gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
424 policy_to_update == pending_lb_policy_.get() ? "pending " : "",
427 UpdateArgs update_args;
428 update_args.addresses = std::move(result.addresses);
429 update_args.config = std::move(lb_policy_config);
430 // TODO(roth): Once channel args is converted to C++, use std::move() here.
431 update_args.args = result.args;
// Clear result.args so the args are owned only by update_args. Presumably
// Result's destructor would otherwise free them.
432 result.args = nullptr;
433 policy_to_update->UpdateLocked(std::move(update_args));
436 // Creates a new LB policy named lb_policy_name, with its own
437 // ResolvingControlHelper. Updates trace_strings (when channelz is enabled) to indicate what was done.
// Returns null if the registry cannot create the policy. On success, it
// propagates the channelz node and links the new policy's pollset_set into
// ours.
438 OrphanablePtr<LoadBalancingPolicy>
439 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
440 const char* lb_policy_name, const grpc_channel_args& args,
441 TraceStringVector* trace_strings) {
// `helper` is owned by lb_policy_args (and thus by the new policy). The raw
// pointer is kept only to call set_child() once creation succeeds.
442 ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
443 LoadBalancingPolicy::Args lb_policy_args;
444 lb_policy_args.combiner = combiner();
445 lb_policy_args.channel_control_helper =
446 UniquePtr<ChannelControlHelper>(helper);
447 lb_policy_args.args = &args;
448 OrphanablePtr<LoadBalancingPolicy> lb_policy =
449 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
450 lb_policy_name, std::move(lb_policy_args));
451 if (GPR_UNLIKELY(lb_policy == nullptr)) {
452 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
453 if (channelz_node() != nullptr) {
455 gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
456 trace_strings->push_back(str);
460 helper->set_child(lb_policy.get());
461 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
462 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
463 this, lb_policy_name, lb_policy.get());
465 if (channelz_node() != nullptr) {
467 gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
468 trace_strings->push_back(str);
470 // Propagate channelz node.
471 auto* channelz = channelz_node();
472 if (channelz != nullptr) {
473 lb_policy->set_channelz_node(channelz->Ref());
// Link the child's pollset_set into ours; it is removed again on promotion
// or shutdown.
475 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
476 interested_parties());
// Appends a channel-trace string when the address list goes from non-empty
// to empty or from empty to non-empty, compared with the previous resolution.
// Then records the current state for the next call.
480 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
481 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
482 if (!resolution_contains_addresses &&
483 previous_resolution_contained_addresses_) {
484 trace_strings->push_back(gpr_strdup("Address list became empty"));
485 } else if (resolution_contains_addresses &&
486 !previous_resolution_contained_addresses_) {
487 trace_strings->push_back(gpr_strdup("Address list became non-empty"));
489 previous_resolution_contained_addresses_ = resolution_contains_addresses;
// Joins the collected trace strings into one
// "Resolution event: a, b, ..." channelz Info event. It does nothing if
// there are no strings. channelz_node() is dereferenced without a check, so
// callers must ensure it is non-null. The strings in *trace_strings are
// added to the strvec `v`; NOTE(review): presumably gpr_strvec_destroy()
// frees them, so the caller must not free them again.
492 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
493 TraceStringVector* trace_strings) const {
494 if (!trace_strings->empty()) {
497 gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
499 for (size_t i = 0; i < trace_strings->size(); ++i) {
500 if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
502 gpr_strvec_add(&v, (*trace_strings)[i]);
506 flat = gpr_strvec_flatten(&v, &flat_len);
// The slice takes the flattened buffer and frees it with gpr_free.
507 channelz_node()->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
508 grpc_slice_new(flat, flat_len, gpr_free));
509 gpr_strvec_destroy(&v);
// Handles a new resolver result. The result is ignored after shutdown.
// The child policy name and config come from one of two sources:
// - process_resolver_result_ (top-level constructor);
// - child_policy_name_/child_lb_config_ (when no callback is set).
// The child policy is then created or updated, and a channelz trace event
// is emitted when channelz is enabled.
513 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
514 Resolver::Result result) {
515 // Handle race conditions.
516 if (resolver_ == nullptr) return;
517 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
518 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
520 // We only want to trace the address resolution in the following cases:
521 // (a) Address resolution resulted in service config change.
522 // (b) Address resolution that causes number of backends to go from
//     zero to non-zero.
524 // (c) Address resolution that causes number of backends to go from
//     non-zero to zero.
526 // (d) Address resolution that causes a new LB policy to be created.
528 // We track a list of strings to eventually be concatenated and traced.
529 TraceStringVector trace_strings;
530 const bool resolution_contains_addresses = result.addresses.size() > 0;
531 // Process the resolver result.
532 const char* lb_policy_name = nullptr;
533 RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config;
534 bool service_config_changed = false;
535 char* service_config_error_string = nullptr;
536 if (process_resolver_result_ != nullptr) {
537 grpc_error* service_config_error = GRPC_ERROR_NONE;
538 service_config_changed = process_resolver_result_(
539 process_resolver_result_user_data_, result, &lb_policy_name,
540 &lb_policy_config, &service_config_error);
541 if (service_config_error != GRPC_ERROR_NONE) {
542 service_config_error_string =
543 gpr_strdup(grpc_error_string(service_config_error));
544 if (lb_policy_name == nullptr) {
545 // A null lb_policy_name indicates that we received an
546 // invalid service config and we don't have a fallback service config.
// OnResolverError() takes ownership of (and unrefs) service_config_error.
// NOTE(review): the GRPC_ERROR_UNREF below must therefore run only when
// lb_policy_name != nullptr. The line separating them is not visible in
// this copy; confirm it is an `else`, or the error is unreffed twice.
547 OnResolverError(service_config_error);
549 GRPC_ERROR_UNREF(service_config_error);
// No callback (nested-policy constructor): use the configured child policy.
553 lb_policy_name = child_policy_name_.get();
554 lb_policy_config = child_lb_config_;
556 if (lb_policy_name != nullptr) {
557 // Create or update LB policy, as needed.
558 CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config,
559 std::move(result), &trace_strings);
561 // Add channel trace event.
562 if (channelz_node() != nullptr) {
563 if (service_config_changed) {
564 // TODO(ncteisen): might be worth somehow including a snippet of the
565 // config in the trace, at the risk of bloating the trace logs.
566 trace_strings.push_back(gpr_strdup("Service config changed"));
568 if (service_config_error_string != nullptr) {
// Ownership of the string moves to trace_strings.
569 trace_strings.push_back(service_config_error_string);
570 service_config_error_string = nullptr;
572 MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
574 ConcatenateAndAddChannelTraceLocked(&trace_strings);
// Frees the error string if it was not handed to trace_strings above.
576 gpr_free(service_config_error_string);
579 } // namespace grpc_core