3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_join.h"
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/sync.h>
37 #include "src/core/ext/filters/client_channel/backup_poller.h"
38 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
39 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
40 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
41 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
42 #include "src/core/ext/filters/client_channel/resolver_registry.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/ext/filters/client_channel/service_config.h"
46 #include "src/core/ext/filters/client_channel/subchannel.h"
47 #include "src/core/ext/filters/deadline/deadline_filter.h"
48 #include "src/core/lib/backoff/backoff.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/connected_channel.h"
51 #include "src/core/lib/channel/status_util.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/iomgr.h"
56 #include "src/core/lib/iomgr/polling_entity.h"
57 #include "src/core/lib/profiling/timers.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/transport/connectivity_state.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/static_metadata.h"
66 #include "src/core/lib/transport/status_metadata.h"
71 // ResolvingLoadBalancingPolicy::ResolverResultHandler
// Resolver::ResultHandler implementation that forwards resolver output to
// the owning ResolvingLoadBalancingPolicy. It holds a RefCountedPtr to that
// policy (parent_) for as long as the handler exists.
74 class ResolvingLoadBalancingPolicy::ResolverResultHandler
75 : public Resolver::ResultHandler {
// Stores the ref to the parent policy that the caller hands over.
77 explicit ResolverResultHandler(
78 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
79 : parent_(std::move(parent)) {}
// When the parent's trace flag is enabled, logs that resolver shutdown has
// completed.
81 ~ResolverResultHandler() override {
82 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
83 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
// Passes a new resolution result on to the parent policy.
88 void ReturnResult(Resolver::Result result) override {
89 parent_->OnResolverResultChangedLocked(std::move(result));
// Passes a resolver error on to the parent policy.
92 void ReturnError(grpc_error* error) override {
93 parent_->OnResolverError(error);
// The policy that receives results and errors from this handler.
97 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
101 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
// ChannelControlHelper that sits between a child LB policy and the parent
// policy's own channel_control_helper(). Every method first checks
// parent_->resolver_: when it is null (the "shutting down" case) the call is
// dropped; otherwise it is forwarded.
104 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
105 : public LoadBalancingPolicy::ChannelControlHelper {
// Stores the ref to the parent policy that the caller hands over.
107 explicit ResolvingControlHelper(
108 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
109 : parent_(std::move(parent)) {}
// Returns nullptr when the parent has no resolver; otherwise delegates
// subchannel creation to the parent's channel control helper.
111 RefCountedPtr<SubchannelInterface> CreateSubchannel(
112 ServerAddress address, const grpc_channel_args& args) override {
113 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
114 return parent_->channel_control_helper()->CreateSubchannel(
115 std::move(address), args);
// Forwards a connectivity state update to the parent's channel control
// helper, unless the parent has no resolver.
118 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
119 std::unique_ptr<SubchannelPicker> picker) override {
120 if (parent_->resolver_ == nullptr) return; // Shutting down.
121 parent_->channel_control_helper()->UpdateState(state, status,
// Asks the parent's resolver to re-resolve, logging first when tracing is
// enabled. No-op when the parent has no resolver.
125 void RequestReresolution() override {
126 if (parent_->resolver_ == nullptr) return; // Shutting down.
127 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
128 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
131 parent_->resolver_->RequestReresolutionLocked();
// Forwards a trace event to the parent's channel control helper, unless the
// parent has no resolver.
134 void AddTraceEvent(TraceSeverity severity,
135 absl::string_view message) override {
136 if (parent_->resolver_ == nullptr) return; // Shutting down.
137 parent_->channel_control_helper()->AddTraceEvent(severity, message);
// The policy whose resolver_ and channel control helper are used above.
141 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
145 // ResolvingLoadBalancingPolicy
// Constructs the policy, creates the resolver for target_uri, reports
// CONNECTING with a queueing picker, and then starts the resolver.
//
// args:       base LoadBalancingPolicy arguments; args.args is also given to
//             the resolver as its channel args.
// tracer:     trace flag consulted before every log statement in this class.
// target_uri: URI handed to ResolverRegistry::CreateResolver().
// helper:     must be non-null (asserted below).
148 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
149 Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
150 ChannelConfigHelper* helper)
151 : LoadBalancingPolicy(std::move(args)),
153 target_uri_(std::move(target_uri)),
155 GPR_ASSERT(helper_ != nullptr);
// NOTE(review): args.args is read below after args was passed through
// std::move() to the base-class constructor. This relies on that member
// still being readable after the move -- confirm against the definition of
// LoadBalancingPolicy::Args.
// The result handler is given its own ref to this policy.
156 resolver_ = ResolverRegistry::CreateResolver(
157 target_uri_.get(), args.args, interested_parties(), work_serializer(),
158 absl::make_unique<ResolverResultHandler>(Ref()));
159 // Since the validity of args has been checked when creating the channel,
160 // CreateResolver() must return a non-null result.
161 GPR_ASSERT(resolver_ != nullptr);
162 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
163 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
// Report CONNECTING, with a QueuePicker holding a ref to this policy,
// before the resolver is started.
165 channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
166 absl::make_unique<QueuePicker>(Ref()));
167 resolver_->StartLocked();
// Asserts that both the resolver and the child LB policy are already null by
// the time the object is destroyed.
170 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
171 GPR_ASSERT(resolver_ == nullptr);
172 GPR_ASSERT(lb_policy_ == nullptr);
// Shutdown handling for the resolver and the child LB policy. Each of the
// two is handled only if it is currently non-null, and a trace line is
// logged for it when tracing is enabled.
175 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
176 if (resolver_ != nullptr) {
177 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
178 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
182 if (lb_policy_ != nullptr) {
183 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
184 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
// Detach the child policy's pollset_set from this policy's pollset_set;
// this mirrors the add performed in CreateLbPolicyLocked().
187 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
188 interested_parties());
// Forwards ExitIdleLocked() to the child LB policy; does nothing when no
// child policy exists.
194 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
195 if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
// Resets backoff in the resolver and in the child LB policy, skipping
// whichever of the two is currently null.
198 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
199 if (resolver_ != nullptr) {
// After resetting the resolver's backoff, also request a re-resolution.
200 resolver_->ResetBackoffLocked();
201 resolver_->RequestReresolutionLocked();
203 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
// Handles an error reported by the resolver.
//
// error: the resolver's error. This function releases it with
//        GRPC_ERROR_UNREF, both when there is no resolver and at the end of
//        normal handling.
206 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
// No resolver: just drop the error.
207 if (resolver_ == nullptr) {
208 GRPC_ERROR_UNREF(error);
211 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
212 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
213 grpc_error_string(error));
215 // If we already have an LB policy from a previous resolution
216 // result, then we continue to let it set the connectivity state.
217 // Otherwise, we go into TRANSIENT_FAILURE.
218 if (lb_policy_ == nullptr) {
// Wrap the resolver error in a new "Resolver transient failure" error.
219 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
220 "Resolver transient failure", &error, 1);
// The helper is given an additional ref to state_error; state_error itself
// is passed to the TransientFailurePicker constructor below (presumably the
// picker takes over that ref -- confirm against TransientFailurePicker).
221 helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
222 channel_control_helper()->UpdateState(
223 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
224 absl::make_unique<TransientFailurePicker>(state_error));
226 GRPC_ERROR_UNREF(error);
// Builds an UpdateArgs from a resolver result and pushes it to the child LB
// policy, creating the child first if none exists yet.
//
// lb_policy_config: becomes the config of the update.
// result:           resolver result; its addresses are moved into the
//                   update and its channel args are copied with the config
//                   selector arg removed.
229 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
230 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
231 Resolver::Result result) {
233 UpdateArgs update_args;
234 update_args.addresses = std::move(result.addresses);
235 update_args.config = std::move(lb_policy_config);
236 // Remove the config selector from channel args so that we're not holding
237 // unnecessary refs that cause it to be destroyed somewhere other than in the
239 const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
241 grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
242 // Create policy if needed.
243 if (lb_policy_ == nullptr) {
244 lb_policy_ = CreateLbPolicyLocked(*update_args.args);
246 // Update the policy.
247 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
248 gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
251 lb_policy_->UpdateLocked(std::move(update_args));
254 // Creates a new LB policy.
// The new policy is a ChildPolicyHandler built with this policy's work
// serializer, a ResolvingControlHelper holding a ref to this policy, and a
// pointer to the supplied channel args.
//
// args: channel args for the new policy. Only their address is stored in
//       the construction arguments, not a copy.
255 OrphanablePtr<LoadBalancingPolicy>
256 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
257 const grpc_channel_args& args) {
258 LoadBalancingPolicy::Args lb_policy_args;
259 lb_policy_args.work_serializer = work_serializer();
260 lb_policy_args.channel_control_helper =
261 absl::make_unique<ResolvingControlHelper>(Ref());
262 lb_policy_args.args = &args;
263 OrphanablePtr<LoadBalancingPolicy> lb_policy =
264 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
265 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
266 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
// Attach the new policy's pollset_set to this policy's pollset_set.
269 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
270 interested_parties());
// Appends a trace string when the address list switches between empty and
// non-empty relative to the previous resolution, then records the current
// state for the next call.
//
// resolution_contains_addresses: whether the new result has any addresses.
// trace_strings: receives "Address list became empty" or
//                "Address list became non-empty" on a transition; left
//                untouched when there is no transition.
274 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
275 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
276 if (!resolution_contains_addresses &&
277 previous_resolution_contained_addresses_) {
278 trace_strings->push_back("Address list became empty");
279 } else if (resolution_contains_addresses &&
280 !previous_resolution_contained_addresses_) {
281 trace_strings->push_back("Address list became non-empty");
283 previous_resolution_contained_addresses_ = resolution_contains_addresses;
// Emits a single TRACE_INFO channel trace event of the form
// "Resolution event: <s1>, <s2>, ..." built from trace_strings. Does nothing
// when trace_strings is empty.
286 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
287 const TraceStringVector& trace_strings) const {
288 if (!trace_strings.empty()) {
289 std::string message =
290 absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
291 channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
// Processes a new result from the resolver: chooses the service config,
// creates or updates the child LB policy (or reports an error when there is
// no valid service config), and records one channel trace event summarizing
// what changed.
//
// result: the resolver result. It is moved into
//         CreateOrUpdateLbPolicyLocked() on that path.
296 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
297 Resolver::Result result) {
298 // Handle race conditions.
299 if (resolver_ == nullptr) return;
300 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
301 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
303 // We only want to trace the address resolution in the following cases:
304 // (a) Address resolution resulted in service config change.
305 // (b) Address resolution that causes number of backends to go from
307 // (c) Address resolution that causes number of backends to go from
309 // (d) Address resolution that causes a new LB policy to be created.
311 // We track a list of strings to eventually be concatenated and traced.
312 TraceStringVector trace_strings;
313 MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(),
315 // The result of grpc_error_string() is owned by the error itself.
316 // We're storing that string in trace_strings, so we need to make sure
317 // that the error lives until we're done with the string.
318 grpc_error* service_config_error =
319 GRPC_ERROR_REF(result.service_config_error);
320 if (service_config_error != GRPC_ERROR_NONE) {
321 trace_strings.push_back(grpc_error_string(service_config_error));
323 // Choose the service config.
// NOTE(review): the constructor asserts helper_ != nullptr, so the null
// checks on helper_ in this function look purely defensive -- confirm.
324 ChannelConfigHelper::ChooseServiceConfigResult service_config_result;
325 if (helper_ != nullptr) {
326 service_config_result = helper_->ChooseServiceConfig(result);
328 service_config_result.lb_policy_config = child_lb_config_;
330 if (service_config_result.no_valid_service_config) {
331 // We received an invalid service config and we don't have a
332 // previous service config to fall back to.
// OnResolverError() unrefs the error it receives, so it is given its own
// ref; the ref taken above is released at the end of this function.
333 OnResolverError(GRPC_ERROR_REF(service_config_error));
334 trace_strings.push_back("no valid service config");
336 // Create or update LB policy, as needed.
337 CreateOrUpdateLbPolicyLocked(
338 std::move(service_config_result.lb_policy_config), std::move(result));
339 if (service_config_result.service_config_changed) {
340 // Tell channel to start using new service config for calls.
341 // This needs to happen after the LB policy has been updated, since
342 // the ConfigSelector may need the LB policy to know about new
343 // destinations before it can send RPCs to those destinations.
344 if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls();
345 // TODO(ncteisen): might be worth somehow including a snippet of the
346 // config in the trace, at the risk of bloating the trace logs.
347 trace_strings.push_back("Service config changed");
350 // Add channel trace event.
351 ConcatenateAndAddChannelTraceLocked(trace_strings);
// trace_strings is no longer needed past this point, so the error whose
// string it may reference can be released.
352 GRPC_ERROR_UNREF(service_config_error);
355 } // namespace grpc_core