3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
38 #include "src/core/ext/filters/client_channel/resolver_registry.h"
39 #include "src/core/ext/filters/client_channel/retry_throttle.h"
40 #include "src/core/ext/filters/client_channel/server_address.h"
41 #include "src/core/ext/filters/client_channel/service_config.h"
42 #include "src/core/ext/filters/client_channel/subchannel.h"
43 #include "src/core/ext/filters/deadline/deadline_filter.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/connected_channel.h"
47 #include "src/core/lib/channel/status_util.h"
48 #include "src/core/lib/gpr/string.h"
49 #include "src/core/lib/gprpp/inlined_vector.h"
50 #include "src/core/lib/gprpp/manual_constructor.h"
51 #include "src/core/lib/gprpp/mutex_lock.h"
52 #include "src/core/lib/iomgr/combiner.h"
53 #include "src/core/lib/iomgr/iomgr.h"
54 #include "src/core/lib/iomgr/polling_entity.h"
55 #include "src/core/lib/profiling/timers.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/slice/slice_string_helpers.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 #include "src/core/lib/transport/status_metadata.h"
69 // ResolvingLoadBalancingPolicy::ResolverResultHandler
72 class ResolvingLoadBalancingPolicy::ResolverResultHandler
73 : public Resolver::ResultHandler {
75 explicit ResolverResultHandler(
76 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
77 : parent_(std::move(parent)) {}
79 ~ResolverResultHandler() {
80 if (parent_->tracer_->enabled()) {
81 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
86 void ReturnResult(Resolver::Result result) override {
87 parent_->OnResolverResultChangedLocked(std::move(result));
90 void ReturnError(grpc_error* error) override {
91 parent_->OnResolverError(error);
95 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
99 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
// Channel-control helper handed to each child LB policy created by the
// resolving policy.  It remembers which child it was given to (child_) and
// uses that to filter the child's requests: a request is forwarded to the
// parent's own helper only while child_ is the parent's current
// (lb_policy_) or pending (pending_lb_policy_) child.
102 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
103 : public LoadBalancingPolicy::ChannelControlHelper {
105 explicit ResolvingControlHelper(
106 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
107 : parent_(std::move(parent)) {}
// Forwards subchannel creation to the parent's helper.  Yields nullptr when
// the parent no longer has a resolver, or when the calling child is neither
// the current nor the pending policy.
109 Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
110 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
111 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
112 return parent_->channel_control_helper()->CreateSubchannel(args);
// Forwards channel creation to the parent's helper, with the same
// nullptr-returning guards as CreateSubchannel().
115 grpc_channel* CreateChannel(const char* target,
116 const grpc_channel_args& args) override {
117 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
118 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
119 return parent_->channel_control_helper()->CreateChannel(target, args);
// Handles a connectivity-state report from the child.  state_error is
// released on every path that does not forward it to the parent's helper.
122 void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
123 UniquePtr<SubchannelPicker> picker) override {
// No resolver on the parent: drop the report.
124 if (parent_->resolver_ == nullptr) {
126 GRPC_ERROR_UNREF(state_error);
129 // If this request is from the pending child policy, ignore it until
130 // it reports READY, at which point we swap it into place.
131 if (CalledByPendingChild()) {
132 if (parent_->tracer_->enabled()) {
134 "resolving_lb=%p helper=%p: pending child policy %p reports "
136 parent_.get(), this, child_,
137 grpc_connectivity_state_name(state));
139 if (state != GRPC_CHANNEL_READY) {
140 GRPC_ERROR_UNREF(state_error);
// Pending child is READY: detach the old current policy's pollset set from
// the parent's, then promote the pending policy.  The pointer swap is done
// while holding lb_policy_mu_.
143 grpc_pollset_set_del_pollset_set(
144 parent_->lb_policy_->interested_parties(),
145 parent_->interested_parties());
146 MutexLock lock(&parent_->lb_policy_mu_);
147 parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
148 } else if (!CalledByCurrentChild()) {
149 // This request is from an outdated child, so ignore it.
150 GRPC_ERROR_UNREF(state_error);
// Report accepted: pass it up to the parent's helper.
153 parent_->channel_control_helper()->UpdateState(state, state_error,
// Asks the parent's resolver to re-resolve, if the parent still has one.
157 void RequestReresolution() override {
158 // If there is a pending child policy, ignore re-resolution requests
159 // from the current child policy (or any outdated child).
160 if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
163 if (parent_->tracer_->enabled()) {
164 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
167 if (parent_->resolver_ != nullptr) {
168 parent_->resolver_->RequestReresolutionLocked();
// Records the child policy this helper belongs to.  The Called-By checks
// below assert that this has been set.
172 void set_child(LoadBalancingPolicy* child) { child_ = child; }
// True if child_ is the parent's pending policy.
175 bool CalledByPendingChild() const {
176 GPR_ASSERT(child_ != nullptr);
177 return child_ == parent_->pending_lb_policy_.get();
// True if child_ is the parent's current policy.
180 bool CalledByCurrentChild() const {
181 GPR_ASSERT(child_ != nullptr);
182 return child_ == parent_->lb_policy_.get();
// Ref to the owning resolving policy.
185 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
// Raw pointer used only for identity comparison against the parent's
// current and pending policies.
186 LoadBalancingPolicy* child_ = nullptr;
190 // ResolvingLoadBalancingPolicy
193 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
194 Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
195 UniquePtr<char> child_policy_name, RefCountedPtr<Config> child_lb_config,
197 : LoadBalancingPolicy(std::move(args)),
199 target_uri_(std::move(target_uri)),
200 child_policy_name_(std::move(child_policy_name)),
201 child_lb_config_(std::move(child_lb_config)) {
202 GPR_ASSERT(child_policy_name_ != nullptr);
203 // Don't fetch service config, since this ctor is for use in nested LB
204 // policies, not at the top level, and we only fetch the service
205 // config at the top level.
206 grpc_arg arg = grpc_channel_arg_integer_create(
207 const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
208 grpc_channel_args* new_args =
209 grpc_channel_args_copy_and_add(args.args, &arg, 1);
210 *error = Init(*new_args);
211 grpc_channel_args_destroy(new_args);
214 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
215 Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
216 ProcessResolverResultCallback process_resolver_result,
217 void* process_resolver_result_user_data, grpc_error** error)
218 : LoadBalancingPolicy(std::move(args)),
220 target_uri_(std::move(target_uri)),
221 process_resolver_result_(process_resolver_result),
222 process_resolver_result_user_data_(process_resolver_result_user_data) {
223 GPR_ASSERT(process_resolver_result != nullptr);
224 gpr_mu_init(&lb_policy_mu_);
225 *error = Init(*args.args);
228 grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
229 resolver_ = ResolverRegistry::CreateResolver(
230 target_uri_.get(), &args, interested_parties(), combiner(),
231 UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
232 if (resolver_ == nullptr) {
233 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
235 // Return our picker to the channel.
236 channel_control_helper()->UpdateState(
237 GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
238 UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
239 return GRPC_ERROR_NONE;
242 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
243 GPR_ASSERT(resolver_ == nullptr);
244 GPR_ASSERT(lb_policy_ == nullptr);
245 gpr_mu_destroy(&lb_policy_mu_);
// Shuts the policy down.  Work is only done while resolver_ is still set.
// Holding lb_policy_mu_, each existing child policy (current, then pending)
// has its interested_parties removed from this policy's pollset set, with a
// trace line logged when tracing is enabled.
248 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
249 if (resolver_ != nullptr) {
// Taken because the child-policy pointers are also read outside the
// combiner (see FillChildRefsForChannelz()).
251 MutexLock lock(&lb_policy_mu_);
// Current child policy.
252 if (lb_policy_ != nullptr) {
253 if (tracer_->enabled()) {
254 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
257 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
258 interested_parties());
// Pending child policy, if a policy switch was in flight.
261 if (pending_lb_policy_ != nullptr) {
262 if (tracer_->enabled()) {
263 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
264 this, pending_lb_policy_.get());
266 grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
267 interested_parties());
268 pending_lb_policy_.reset();
273 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
274 if (lb_policy_ != nullptr) {
275 lb_policy_->ExitIdleLocked();
276 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
278 if (!started_resolving_ && resolver_ != nullptr) {
279 StartResolvingLocked();
284 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
285 if (resolver_ != nullptr) {
286 resolver_->ResetBackoffLocked();
287 resolver_->RequestReresolutionLocked();
289 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
290 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
293 void ResolvingLoadBalancingPolicy::FillChildRefsForChannelz(
294 channelz::ChildRefsList* child_subchannels,
295 channelz::ChildRefsList* child_channels) {
296 // Delegate to the lb_policy_ to fill the children subchannels.
297 // This must be done holding lb_policy_mu_, since this method does not
298 // run in the combiner.
299 MutexLock lock(&lb_policy_mu_);
300 if (lb_policy_ != nullptr) {
301 lb_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
303 if (pending_lb_policy_ != nullptr) {
304 pending_lb_policy_->FillChildRefsForChannelz(child_subchannels,
309 void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
310 if (tracer_->enabled()) {
311 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
313 GPR_ASSERT(!started_resolving_);
314 started_resolving_ = true;
315 channel_control_helper()->UpdateState(
316 GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
317 UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
318 resolver_->StartLocked();
321 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
322 if (resolver_ == nullptr) {
323 GRPC_ERROR_UNREF(error);
326 if (tracer_->enabled()) {
327 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
328 grpc_error_string(error));
330 // If we already have an LB policy from a previous resolution
331 // result, then we continue to let it set the connectivity state.
332 // Otherwise, we go into TRANSIENT_FAILURE.
333 if (lb_policy_ == nullptr) {
334 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
335 "Resolver transient failure", &error, 1);
336 channel_control_helper()->UpdateState(
337 GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error),
338 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
340 GRPC_ERROR_UNREF(error);
// Applies a resolver result to the child policies: decides whether a new
// child policy named lb_policy_name must be created (stored in lb_policy_
// or pending_lb_policy_), then sends the addresses, config and channel args
// from `result` to the most recently created child via UpdateLocked().
// trace_strings is passed through to CreateLbPolicyLocked().
343 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
344 const char* lb_policy_name, RefCountedPtr<Config> lb_policy_config,
345 Resolver::Result result, TraceStringVector* trace_strings) {
346 // If the child policy name changes, we need to create a new child
347 // policy. When this happens, we leave lb_policy_ as-is and store
348 // the new child policy in pending_lb_policy_. Once the new child
349 // policy transitions into state READY, we swap it into lb_policy_,
350 // replacing the original child policy. So pending_lb_policy_ is
351 // non-null only between when we apply an update that changes the child
352 // policy name and when the new child reports state READY.
354 // Updates can arrive at any point during this transition. We always
355 // apply updates relative to the most recently created child policy,
356 // even if the most recent one is still in pending_lb_policy_. This
357 // is true both when applying the updates to an existing child policy
358 // and when determining whether we need to create a new policy.
360 // As a result of this, there are several cases to consider here:
362 // 1. We have no existing child policy (i.e., we have started up but
363 // have not yet created a child policy from a resolver result;
364 // in this case, both lb_policy_ and
365 // pending_lb_policy_ are null). In this case, we create a
366 // new child policy and store it in lb_policy_.
368 // 2. We have an existing child policy and have no pending child policy
369 // from a previous update (i.e., either there has not been a
370 // previous update that changed the policy name, or we have already
371 // finished swapping in the new policy; in this case, lb_policy_
372 // is non-null but pending_lb_policy_ is null). In this case:
373 // a. If lb_policy_->name() equals lb_policy_name, then we
374 // update the existing child policy.
375 // b. If lb_policy_->name() does not equal lb_policy_name,
376 // we create a new policy. The policy will be stored in
377 // pending_lb_policy_ and will later be swapped into
378 // lb_policy_ by the helper when the new child transitions
381 // 3. We have an existing child policy and have a pending child policy
382 // from a previous update (i.e., a previous update set
383 // pending_lb_policy_ as per case 2b above and that policy has
384 // not yet transitioned into state READY and been swapped into
385 // lb_policy_; in this case, both lb_policy_ and
386 // pending_lb_policy_ are non-null). In this case:
387 // a. If pending_lb_policy_->name() equals lb_policy_name,
388 // then we update the existing pending child policy.
389 // b. If pending_lb_policy_->name() does not equal
390 // lb_policy_name, then we create a new policy. The new
391 // policy is stored in pending_lb_policy_ (replacing the one
392 // that was there before, which will be immediately shut down)
393 // and will later be swapped into lb_policy_ by the helper
394 // when the new child transitions into state READY.
// The three operands below correspond to cases 1, 2b and 3b respectively.
395 const bool create_policy =
397 lb_policy_ == nullptr ||
399 (pending_lb_policy_ == nullptr &&
400 strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
402 (pending_lb_policy_ != nullptr &&
403 strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
404 LoadBalancingPolicy* policy_to_update = nullptr;
406 // Cases 1, 2b, and 3b: create a new child policy.
407 // If lb_policy_ is null, we set it (case 1), else we set
408 // pending_lb_policy_ (cases 2b and 3b).
409 if (tracer_->enabled()) {
410 gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
411 lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
414 CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
// Pick the slot to fill; the store itself happens under lb_policy_mu_.
415 auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
417 MutexLock lock(&lb_policy_mu_);
418 lb_policy = std::move(new_policy);
420 policy_to_update = lb_policy.get();
422 // Cases 2a and 3a: update an existing policy.
423 // If we have a pending child policy, send the update to the pending
424 // policy (case 3a), else send it to the current policy (case 2a).
425 policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
// NOTE(review): CreateLbPolicyLocked() logs a "could not create LB policy"
// path; if that path yields a null policy, this assertion would fire after
// the slot above has already been overwritten -- confirm.
428 GPR_ASSERT(policy_to_update != nullptr);
429 // Update the policy.
430 if (tracer_->enabled()) {
431 gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
432 policy_to_update == pending_lb_policy_.get() ? "pending " : "",
435 UpdateArgs update_args;
436 update_args.addresses = std::move(result.addresses);
437 update_args.config = std::move(lb_policy_config);
438 // TODO(roth): Once channel args is converted to C++, use std::move() here.
// Manual hand-off of the channel args pointer from `result` to update_args.
439 update_args.args = result.args;
440 result.args = nullptr;
441 policy_to_update->UpdateLocked(std::move(update_args));
444 // Creates a new LB policy.
445 // Updates trace_strings to indicate what was done.
//
// lb_policy_name selects the policy in LoadBalancingPolicyRegistry; `args`
// are the channel args given to it.  The new policy gets this policy's
// combiner and its own ResolvingControlHelper.  Strings are appended to
// trace_strings only when channelz_node() is non-null.
446 OrphanablePtr<LoadBalancingPolicy>
447 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
448 const char* lb_policy_name, const grpc_channel_args& args,
449 TraceStringVector* trace_strings) {
// A raw pointer to the helper is kept so that set_child() can be called
// after ownership has been handed to lb_policy_args below.
450 ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
451 LoadBalancingPolicy::Args lb_policy_args;
452 lb_policy_args.combiner = combiner();
453 lb_policy_args.channel_control_helper =
454 UniquePtr<ChannelControlHelper>(helper);
455 lb_policy_args.args = &args;
456 OrphanablePtr<LoadBalancingPolicy> lb_policy =
457 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
458 lb_policy_name, std::move(lb_policy_args));
// Creation failed: log it and, with channelz enabled, record a trace string.
459 if (GPR_UNLIKELY(lb_policy == nullptr)) {
460 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
461 if (channelz_node() != nullptr) {
463 gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
464 trace_strings->push_back(str);
// Tell the helper which child it serves so it can recognize that child's
// calls.
468 helper->set_child(lb_policy.get());
469 if (tracer_->enabled()) {
470 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
471 this, lb_policy_name, lb_policy.get());
473 if (channelz_node() != nullptr) {
475 gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
476 trace_strings->push_back(str);
478 // Propagate channelz node.
479 auto* channelz = channelz_node();
480 if (channelz != nullptr) {
481 lb_policy->set_channelz_node(channelz->Ref());
// Add the child's interested_parties to this policy's pollset set.
483 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
484 interested_parties());
488 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
489 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
490 if (!resolution_contains_addresses &&
491 previous_resolution_contained_addresses_) {
492 trace_strings->push_back(gpr_strdup("Address list became empty"));
493 } else if (resolution_contains_addresses &&
494 !previous_resolution_contained_addresses_) {
495 trace_strings->push_back(gpr_strdup("Address list became non-empty"));
497 previous_resolution_contained_addresses_ = resolution_contains_addresses;
// Joins the collected trace strings into one message of the form
// "Resolution event: a, b, ..." and records it on the channelz node as an
// Info-severity trace event.  Does nothing when trace_strings is empty.
// channelz_node() is dereferenced without a check here; the visible caller
// only invokes this when it is non-null.
500 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
501 TraceStringVector* trace_strings) const {
502 if (!trace_strings->empty()) {
505 gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
507 for (size_t i = 0; i < trace_strings->size(); ++i) {
// Separator goes before every entry except the first.
508 if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
// The stored pointer itself is added (no copy).
// NOTE(review): presumably v then owns it and gpr_strvec_destroy() frees
// it -- confirm.
510 gpr_strvec_add(&v, (*trace_strings)[i]);
514 flat = gpr_strvec_flatten(&v, &flat_len);
// The flattened buffer is wrapped in a slice that uses gpr_free as its
// destroy function.
515 channelz_node()->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
516 grpc_slice_new(flat, flat_len, gpr_free));
517 gpr_strvec_destroy(&v);
521 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
522 Resolver::Result result) {
523 // Handle race conditions.
524 if (resolver_ == nullptr) return;
525 if (tracer_->enabled()) {
526 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
528 // We only want to trace the address resolution in the follow cases:
529 // (a) Address resolution resulted in service config change.
530 // (b) Address resolution that causes number of backends to go from
532 // (c) Address resolution that causes number of backends to go from
534 // (d) Address resolution that causes a new LB policy to be created.
536 // We track a list of strings to eventually be concatenated and traced.
537 TraceStringVector trace_strings;
538 const bool resolution_contains_addresses = result.addresses.size() > 0;
539 // Process the resolver result.
540 const char* lb_policy_name = nullptr;
541 RefCountedPtr<Config> lb_policy_config;
542 bool service_config_changed = false;
543 if (process_resolver_result_ != nullptr) {
544 service_config_changed =
545 process_resolver_result_(process_resolver_result_user_data_, &result,
546 &lb_policy_name, &lb_policy_config);
548 lb_policy_name = child_policy_name_.get();
549 lb_policy_config = child_lb_config_;
551 GPR_ASSERT(lb_policy_name != nullptr);
552 // Create or update LB policy, as needed.
553 CreateOrUpdateLbPolicyLocked(lb_policy_name, std::move(lb_policy_config),
554 std::move(result), &trace_strings);
555 // Add channel trace event.
556 if (channelz_node() != nullptr) {
557 if (service_config_changed) {
558 // TODO(ncteisen): might be worth somehow including a snippet of the
559 // config in the trace, at the risk of bloating the trace logs.
560 trace_strings.push_back(gpr_strdup("Service config changed"));
562 MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
564 ConcatenateAndAddChannelTraceLocked(&trace_strings);
568 } // namespace grpc_core