3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_join.h"
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/sync.h>
37 #include "src/core/ext/filters/client_channel/backup_poller.h"
38 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
39 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
40 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
41 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
42 #include "src/core/ext/filters/client_channel/resolver_registry.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/ext/filters/client_channel/service_config.h"
46 #include "src/core/ext/filters/client_channel/subchannel.h"
47 #include "src/core/ext/filters/deadline/deadline_filter.h"
48 #include "src/core/lib/backoff/backoff.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/connected_channel.h"
51 #include "src/core/lib/channel/status_util.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/iomgr.h"
56 #include "src/core/lib/iomgr/polling_entity.h"
57 #include "src/core/lib/profiling/timers.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/transport/connectivity_state.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/static_metadata.h"
66 #include "src/core/lib/transport/status_metadata.h"
71 // ResolvingLoadBalancingPolicy::ResolverResultHandler
// Result handler handed to the resolver. It keeps a ref to the owning
// ResolvingLoadBalancingPolicy and forwards every resolver callback to it.
74 class ResolvingLoadBalancingPolicy::ResolverResultHandler
75 : public Resolver::ResultHandler {
// Stores the passed-in ref to the parent policy (moved into parent_).
77 explicit ResolverResultHandler(
78 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
79 : parent_(std::move(parent)) {}
// On destruction, logs a "resolver shutdown complete" message when the
// parent's tracer is enabled.
81 ~ResolverResultHandler() {
82 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
83 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
// Passes a new resolver result on to the parent policy.
88 void ReturnResult(Resolver::Result result) override {
89 parent_->OnResolverResultChangedLocked(std::move(result));
// Passes a resolver error on to the parent policy.
92 void ReturnError(grpc_error* error) override {
93 parent_->OnResolverError(error);
// Ref to the parent policy that all callbacks above are forwarded to.
97 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
101 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
// ChannelControlHelper given to the child LB policy. Each method first checks
// parent_->resolver_: when it is null (the parent is shutting down) the call
// is dropped; otherwise the call is handled via the parent policy.
104 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
105 : public LoadBalancingPolicy::ChannelControlHelper {
// Stores the passed-in ref to the parent policy (moved into parent_).
107 explicit ResolvingControlHelper(
108 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
109 : parent_(std::move(parent)) {}
// Returns nullptr while shutting down; otherwise delegates subchannel
// creation to the parent's own channel control helper.
111 RefCountedPtr<SubchannelInterface> CreateSubchannel(
112 const grpc_channel_args& args) override {
113 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
114 return parent_->channel_control_helper()->CreateSubchannel(args);
// Forwards the child's connectivity state and picker to the parent's helper,
// unless shutting down.
117 void UpdateState(grpc_connectivity_state state,
118 std::unique_ptr<SubchannelPicker> picker) override {
119 if (parent_->resolver_ == nullptr) return; // Shutting down.
120 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
// Unlike the other methods, this one is served directly by the parent's
// resolver rather than by the parent's helper: it logs (when tracing is
// enabled) and asks the resolver to re-resolve.
123 void RequestReresolution() override {
124 if (parent_->resolver_ == nullptr) return; // Shutting down.
125 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
126 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
129 parent_->resolver_->RequestReresolutionLocked();
// Forwards a trace event to the parent's helper, unless shutting down.
132 void AddTraceEvent(TraceSeverity severity,
133 absl::string_view message) override {
134 if (parent_->resolver_ == nullptr) return; // Shutting down.
135 parent_->channel_control_helper()->AddTraceEvent(severity, message);
// Ref to the parent policy whose resolver and helper are used above.
139 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
143 // ResolvingLoadBalancingPolicy
// Constructs the policy: creates the resolver for target_uri, reports
// CONNECTING with a queueing picker, and starts name resolution.
//   args: base LoadBalancingPolicy arguments (moved into the base class).
//   tracer: trace flag consulted before each log statement in this file.
//   target_uri: target handed to ResolverRegistry::CreateResolver().
//   process_resolver_result: callback invoked on each resolver result;
//     must be non-null (asserted below).
//   process_resolver_result_user_data: opaque pointer passed back to the
//     callback as its first argument.
146 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
147 Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
148 ProcessResolverResultCallback process_resolver_result,
149 void* process_resolver_result_user_data)
150 : LoadBalancingPolicy(std::move(args)),
152 target_uri_(std::move(target_uri)),
153 process_resolver_result_(process_resolver_result),
154 process_resolver_result_user_data_(process_resolver_result_user_data) {
155 GPR_ASSERT(process_resolver_result != nullptr);
// NOTE(review): args.args is read here after args was passed through
// std::move() in the base-class initializer above. This relies on the
// moved-from Args still holding its args value -- confirm against the
// definition of LoadBalancingPolicy::Args.
// The result handler is given its own ref to this policy.
156 resolver_ = ResolverRegistry::CreateResolver(
157 target_uri_.get(), args.args, interested_parties(), work_serializer(),
158 absl::make_unique<ResolverResultHandler>(Ref()));
159 // Since the validity of args has been checked when creating the channel,
160 // CreateResolver() must return a non-null result.
161 GPR_ASSERT(resolver_ != nullptr);
162 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
163 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
// Report CONNECTING with a QueuePicker before the resolver is started.
165 channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING,
166 absl::make_unique<QueuePicker>(Ref()));
167 resolver_->StartLocked();
// Destructor. Asserts that both the resolver and the child LB policy are
// already null by the time the object is destroyed.
170 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
171 GPR_ASSERT(resolver_ == nullptr);
172 GPR_ASSERT(lb_policy_ == nullptr);
// Shuts the policy down. Acts only when resolver_ is still non-null; if a
// child LB policy exists, logs (when tracing is enabled) and undoes the
// pollset_set linkage that CreateLbPolicyLocked() set up with the same
// arguments.
175 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
176 if (resolver_ != nullptr) {
178 if (lb_policy_ != nullptr) {
179 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
180 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
183 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
184 interested_parties());
// Forwards the exit-idle request to the child LB policy, if one exists;
// otherwise does nothing.
190 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
191 if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
// Resets backoff state. If the resolver exists, resets its backoff and then
// also requests a re-resolution; afterwards forwards the reset to the child
// LB policy, if one exists.
194 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
195 if (resolver_ != nullptr) {
196 resolver_->ResetBackoffLocked();
197 resolver_->RequestReresolutionLocked();
199 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
// Handles an error reported by the resolver (or an invalid service config,
// see OnResolverResultChangedLocked()). The caller's ref to `error` is
// released here via GRPC_ERROR_UNREF.
202 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
// No resolver: the error is only released.
203 if (resolver_ == nullptr) {
204 GRPC_ERROR_UNREF(error);
207 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
208 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
209 grpc_error_string(error));
211 // If we already have an LB policy from a previous resolution
212 // result, then we continue to let it set the connectivity state.
213 // Otherwise, we go into TRANSIENT_FAILURE.
214 if (lb_policy_ == nullptr) {
// state_error references `error` as its single child and is handed to the
// TransientFailurePicker.
215 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
216 "Resolver transient failure", &error, 1);
217 channel_control_helper()->UpdateState(
218 GRPC_CHANNEL_TRANSIENT_FAILURE,
219 absl::make_unique<TransientFailurePicker>(state_error));
221 GRPC_ERROR_UNREF(error);
// Builds an UpdateArgs from the resolver result and the selected LB policy
// config, creates the child LB policy if none exists yet, and pushes the
// update to it.
//   lb_policy_config: config to hand to the child policy.
//   result: resolver result; its addresses are moved out and its args
//     pointer is taken over by the update.
224 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
225 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
226 Resolver::Result result) {
228 UpdateArgs update_args;
229 update_args.addresses = std::move(result.addresses);
230 update_args.config = std::move(lb_policy_config);
231 // TODO(roth): Once channel args is converted to C++, use std::move() here.
// Manual hand-off: copy the pointer, then null it in `result` so that
// `result` no longer refers to the channel args.
232 update_args.args = result.args;
233 result.args = nullptr;
234 // Create policy if needed.
235 if (lb_policy_ == nullptr) {
236 lb_policy_ = CreateLbPolicyLocked(*update_args.args);
238 // Update the policy.
239 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
240 gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
243 lb_policy_->UpdateLocked(std::move(update_args));
246 // Creates a new LB policy.
// The new policy is a ChildPolicyHandler constructed with this policy's work
// serializer, a ResolvingControlHelper holding a ref to this policy, and a
// pointer to `args`. Its interested_parties() pollset_set is linked with this
// policy's interested_parties().
247 OrphanablePtr<LoadBalancingPolicy>
248 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
249 const grpc_channel_args& args) {
250 LoadBalancingPolicy::Args lb_policy_args;
251 lb_policy_args.work_serializer = work_serializer();
252 lb_policy_args.channel_control_helper =
253 absl::make_unique<ResolvingControlHelper>(Ref());
// Only the address of `args` is stored; no copy is made here.
254 lb_policy_args.args = &args;
255 OrphanablePtr<LoadBalancingPolicy> lb_policy =
256 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
257 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
258 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
261 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
262 interested_parties());
// Appends a trace message when the address list flips between empty and
// non-empty compared with the previous resolution, then records the current
// state for the next comparison.
//   resolution_contains_addresses: whether the current result has addresses.
//   trace_strings: output list that a message may be appended to.
266 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
267 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
268 if (!resolution_contains_addresses &&
269 previous_resolution_contained_addresses_) {
270 trace_strings->push_back("Address list became empty");
271 } else if (resolution_contains_addresses &&
272 !previous_resolution_contained_addresses_) {
273 trace_strings->push_back("Address list became non-empty");
// No message is added when the state is unchanged.
275 previous_resolution_contained_addresses_ = resolution_contains_addresses;
// Emits one TRACE_INFO channel trace event of the form
// "Resolution event: <s1>, <s2>, ..." built from trace_strings. Does nothing
// when trace_strings is empty.
278 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
279 const TraceStringVector& trace_strings) const {
280 if (!trace_strings.empty()) {
281 std::string message =
282 absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
283 channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
// Handles a new result from the resolver: runs the result through the
// process_resolver_result_ callback to obtain the LB policy config, creates
// or updates the child LB policy when a config is available, and records a
// channel trace event summarizing what changed.
288 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
289 Resolver::Result result) {
290 // Handle race conditions.
// A null resolver_ means the result is ignored.
291 if (resolver_ == nullptr) return;
292 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
293 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
295 // We only want to trace the address resolution in the following cases:
296 // (a) Address resolution resulted in service config change.
297 // (b) Address resolution that causes number of backends to go from
299 // (c) Address resolution that causes number of backends to go from
301 // (d) Address resolution that causes a new LB policy to be created.
303 // We track a list of strings to eventually be concatenated and traced.
304 TraceStringVector trace_strings;
// Captured before `result` is handed off below.
305 const bool resolution_contains_addresses = result.addresses.size() > 0;
306 // Process the resolver result.
307 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
308 bool service_config_changed = false;
309 std::string service_config_error_string;
310 if (process_resolver_result_ != nullptr) {
311 grpc_error* service_config_error = GRPC_ERROR_NONE;
312 bool no_valid_service_config = false;
// The callback receives the user-data pointer and the result, and fills in
// the three output parameters; its return value is recorded as
// service_config_changed.
313 service_config_changed = process_resolver_result_(
314 process_resolver_result_user_data_, result, &lb_policy_config,
315 &service_config_error, &no_valid_service_config);
316 if (service_config_error != GRPC_ERROR_NONE) {
// Keep the error text for the trace event added further down.
317 service_config_error_string = grpc_error_string(service_config_error);
318 if (no_valid_service_config) {
319 // We received an invalid service config and we don't have a
320 // fallback service config.
321 OnResolverError(service_config_error);
323 GRPC_ERROR_UNREF(service_config_error);
327 lb_policy_config = child_lb_config_;
329 if (lb_policy_config != nullptr) {
330 // Create or update LB policy, as needed.
331 CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
334 // Add channel trace event.
335 if (service_config_changed) {
336 // TODO(ncteisen): might be worth somehow including a snippet of the
337 // config in the trace, at the risk of bloating the trace logs.
338 trace_strings.push_back("Service config changed");
340 if (!service_config_error_string.empty()) {
// Only the c_str() pointer is pushed; service_config_error_string is a local
// of this function and is still alive when the strings are concatenated
// below.
341 trace_strings.push_back(service_config_error_string.c_str());
343 MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
345 ConcatenateAndAddChannelTraceLocked(trace_strings);
348 } // namespace grpc_core