3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
39 #include "src/core/ext/filters/client_channel/resolver_registry.h"
40 #include "src/core/ext/filters/client_channel/retry_throttle.h"
41 #include "src/core/ext/filters/client_channel/server_address.h"
42 #include "src/core/ext/filters/client_channel/service_config.h"
43 #include "src/core/ext/filters/client_channel/subchannel.h"
44 #include "src/core/ext/filters/deadline/deadline_filter.h"
45 #include "src/core/lib/backoff/backoff.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/channel/connected_channel.h"
48 #include "src/core/lib/channel/status_util.h"
49 #include "src/core/lib/gpr/string.h"
50 #include "src/core/lib/gprpp/inlined_vector.h"
51 #include "src/core/lib/gprpp/manual_constructor.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/iomgr/combiner.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/polling_entity.h"
56 #include "src/core/lib/profiling/timers.h"
57 #include "src/core/lib/slice/slice_internal.h"
58 #include "src/core/lib/slice/slice_string_helpers.h"
59 #include "src/core/lib/surface/channel.h"
60 #include "src/core/lib/transport/connectivity_state.h"
61 #include "src/core/lib/transport/error_utils.h"
62 #include "src/core/lib/transport/metadata.h"
63 #include "src/core/lib/transport/metadata_batch.h"
64 #include "src/core/lib/transport/static_metadata.h"
65 #include "src/core/lib/transport/status_metadata.h"
70 // ResolvingLoadBalancingPolicy::ResolverResultHandler
73 class ResolvingLoadBalancingPolicy::ResolverResultHandler
74 : public Resolver::ResultHandler {
76 explicit ResolverResultHandler(
77 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
78 : parent_(std::move(parent)) {}
80 ~ResolverResultHandler() {
81 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
82 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
87 void ReturnResult(Resolver::Result result) override {
88 parent_->OnResolverResultChangedLocked(std::move(result));
91 void ReturnError(grpc_error* error) override {
92 parent_->OnResolverError(error);
96 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
100 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
103 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
104 : public LoadBalancingPolicy::ChannelControlHelper {
106 explicit ResolvingControlHelper(
107 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
108 : parent_(std::move(parent)) {}
110 RefCountedPtr<SubchannelInterface> CreateSubchannel(
111 const grpc_channel_args& args) override {
112 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
113 return parent_->channel_control_helper()->CreateSubchannel(args);
116 void UpdateState(grpc_connectivity_state state,
117 std::unique_ptr<SubchannelPicker> picker) override {
118 if (parent_->resolver_ == nullptr) return; // Shutting down.
119 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
122 void RequestReresolution() override {
123 if (parent_->resolver_ == nullptr) return; // Shutting down.
124 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
125 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
128 parent_->resolver_->RequestReresolutionLocked();
131 void AddTraceEvent(TraceSeverity severity, StringView message) override {
132 if (parent_->resolver_ == nullptr) return; // Shutting down.
133 parent_->channel_control_helper()->AddTraceEvent(severity, message);
137 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
141 // ResolvingLoadBalancingPolicy
144 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
145 Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
146 ProcessResolverResultCallback process_resolver_result,
147 void* process_resolver_result_user_data)
148 : LoadBalancingPolicy(std::move(args)),
150 target_uri_(std::move(target_uri)),
151 process_resolver_result_(process_resolver_result),
152 process_resolver_result_user_data_(process_resolver_result_user_data) {
153 GPR_ASSERT(process_resolver_result != nullptr);
154 resolver_ = ResolverRegistry::CreateResolver(
155 target_uri_.get(), args.args, interested_parties(), combiner(),
156 absl::make_unique<ResolverResultHandler>(Ref()));
157 // Since the validity of args has been checked when create the channel,
158 // CreateResolver() must return a non-null result.
159 GPR_ASSERT(resolver_ != nullptr);
160 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
161 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
163 channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING,
164 absl::make_unique<QueuePicker>(Ref()));
165 resolver_->StartLocked();
168 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
169 GPR_ASSERT(resolver_ == nullptr);
170 GPR_ASSERT(lb_policy_ == nullptr);
173 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
174 if (resolver_ != nullptr) {
176 if (lb_policy_ != nullptr) {
177 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
178 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
181 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
182 interested_parties());
188 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
189 if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
192 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
193 if (resolver_ != nullptr) {
194 resolver_->ResetBackoffLocked();
195 resolver_->RequestReresolutionLocked();
197 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
200 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
201 if (resolver_ == nullptr) {
202 GRPC_ERROR_UNREF(error);
205 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
206 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
207 grpc_error_string(error));
209 // If we already have an LB policy from a previous resolution
210 // result, then we continue to let it set the connectivity state.
211 // Otherwise, we go into TRANSIENT_FAILURE.
212 if (lb_policy_ == nullptr) {
213 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
214 "Resolver transient failure", &error, 1);
215 channel_control_helper()->UpdateState(
216 GRPC_CHANNEL_TRANSIENT_FAILURE,
217 absl::make_unique<TransientFailurePicker>(state_error));
219 GRPC_ERROR_UNREF(error);
222 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
223 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
224 Resolver::Result result) {
226 UpdateArgs update_args;
227 update_args.addresses = std::move(result.addresses);
228 update_args.config = std::move(lb_policy_config);
229 // TODO(roth): Once channel args is converted to C++, use std::move() here.
230 update_args.args = result.args;
231 result.args = nullptr;
232 // Create policy if needed.
233 if (lb_policy_ == nullptr) {
234 lb_policy_ = CreateLbPolicyLocked(*update_args.args);
236 // Update the policy.
237 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
238 gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
241 lb_policy_->UpdateLocked(std::move(update_args));
244 // Creates a new LB policy.
245 // Updates trace_strings to indicate what was done.
246 OrphanablePtr<LoadBalancingPolicy>
247 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
248 const grpc_channel_args& args) {
249 LoadBalancingPolicy::Args lb_policy_args;
250 lb_policy_args.combiner = combiner();
251 lb_policy_args.channel_control_helper =
252 absl::make_unique<ResolvingControlHelper>(Ref());
253 lb_policy_args.args = &args;
254 OrphanablePtr<LoadBalancingPolicy> lb_policy =
255 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
256 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
257 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
260 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
261 interested_parties());
265 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
266 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
267 if (!resolution_contains_addresses &&
268 previous_resolution_contained_addresses_) {
269 trace_strings->push_back(gpr_strdup("Address list became empty"));
270 } else if (resolution_contains_addresses &&
271 !previous_resolution_contained_addresses_) {
272 trace_strings->push_back(gpr_strdup("Address list became non-empty"));
274 previous_resolution_contained_addresses_ = resolution_contains_addresses;
277 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
278 TraceStringVector* trace_strings) const {
279 if (!trace_strings->empty()) {
282 gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
284 for (size_t i = 0; i < trace_strings->size(); ++i) {
285 if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
287 gpr_strvec_add(&v, (*trace_strings)[i]);
290 grpc_core::UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
291 channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
292 StringView(message.get()));
293 gpr_strvec_destroy(&v);
297 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
298 Resolver::Result result) {
299 // Handle race conditions.
300 if (resolver_ == nullptr) return;
301 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
302 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
304 // We only want to trace the address resolution in the follow cases:
305 // (a) Address resolution resulted in service config change.
306 // (b) Address resolution that causes number of backends to go from
308 // (c) Address resolution that causes number of backends to go from
310 // (d) Address resolution that causes a new LB policy to be created.
312 // We track a list of strings to eventually be concatenated and traced.
313 TraceStringVector trace_strings;
314 const bool resolution_contains_addresses = result.addresses.size() > 0;
315 // Process the resolver result.
316 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
317 bool service_config_changed = false;
318 char* service_config_error_string = nullptr;
319 if (process_resolver_result_ != nullptr) {
320 grpc_error* service_config_error = GRPC_ERROR_NONE;
321 bool no_valid_service_config = false;
322 service_config_changed = process_resolver_result_(
323 process_resolver_result_user_data_, result, &lb_policy_config,
324 &service_config_error, &no_valid_service_config);
325 if (service_config_error != GRPC_ERROR_NONE) {
326 service_config_error_string =
327 gpr_strdup(grpc_error_string(service_config_error));
328 if (no_valid_service_config) {
329 // We received an invalid service config and we don't have a
330 // fallback service config.
331 OnResolverError(service_config_error);
333 GRPC_ERROR_UNREF(service_config_error);
337 lb_policy_config = child_lb_config_;
339 if (lb_policy_config != nullptr) {
340 // Create or update LB policy, as needed.
341 CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
344 // Add channel trace event.
345 if (service_config_changed) {
346 // TODO(ncteisen): might be worth somehow including a snippet of the
347 // config in the trace, at the risk of bloating the trace logs.
348 trace_strings.push_back(gpr_strdup("Service config changed"));
350 if (service_config_error_string != nullptr) {
351 trace_strings.push_back(service_config_error_string);
352 service_config_error_string = nullptr;
354 MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
356 ConcatenateAndAddChannelTraceLocked(&trace_strings);
359 } // namespace grpc_core