Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolving_lb_policy.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_join.h"
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/sync.h>
36
37 #include "src/core/ext/filters/client_channel/backup_poller.h"
38 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
39 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
40 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
41 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
42 #include "src/core/ext/filters/client_channel/resolver_registry.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/ext/filters/client_channel/service_config.h"
46 #include "src/core/ext/filters/client_channel/subchannel.h"
47 #include "src/core/ext/filters/deadline/deadline_filter.h"
48 #include "src/core/lib/backoff/backoff.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/connected_channel.h"
51 #include "src/core/lib/channel/status_util.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/iomgr.h"
56 #include "src/core/lib/iomgr/polling_entity.h"
57 #include "src/core/lib/profiling/timers.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/transport/connectivity_state.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/static_metadata.h"
66 #include "src/core/lib/transport/status_metadata.h"
67
68 namespace grpc_core {
69
70 //
71 // ResolvingLoadBalancingPolicy::ResolverResultHandler
72 //
73
74 class ResolvingLoadBalancingPolicy::ResolverResultHandler
75     : public Resolver::ResultHandler {
76  public:
77   explicit ResolverResultHandler(
78       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
79       : parent_(std::move(parent)) {}
80
81   ~ResolverResultHandler() override {
82     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
83       gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
84               parent_.get());
85     }
86   }
87
88   void ReturnResult(Resolver::Result result) override {
89     parent_->OnResolverResultChangedLocked(std::move(result));
90   }
91
92   void ReturnError(grpc_error* error) override {
93     parent_->OnResolverError(error);
94   }
95
96  private:
97   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
98 };
99
100 //
101 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
102 //
103
104 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
105     : public LoadBalancingPolicy::ChannelControlHelper {
106  public:
107   explicit ResolvingControlHelper(
108       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
109       : parent_(std::move(parent)) {}
110
111   RefCountedPtr<SubchannelInterface> CreateSubchannel(
112       ServerAddress address, const grpc_channel_args& args) override {
113     if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
114     return parent_->channel_control_helper()->CreateSubchannel(
115         std::move(address), args);
116   }
117
118   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
119                    std::unique_ptr<SubchannelPicker> picker) override {
120     if (parent_->resolver_ == nullptr) return;  // Shutting down.
121     parent_->channel_control_helper()->UpdateState(state, status,
122                                                    std::move(picker));
123   }
124
125   void RequestReresolution() override {
126     if (parent_->resolver_ == nullptr) return;  // Shutting down.
127     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
128       gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
129               parent_.get());
130     }
131     parent_->resolver_->RequestReresolutionLocked();
132   }
133
134   void AddTraceEvent(TraceSeverity severity,
135                      absl::string_view message) override {
136     if (parent_->resolver_ == nullptr) return;  // Shutting down.
137     parent_->channel_control_helper()->AddTraceEvent(severity, message);
138   }
139
140  private:
141   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
142 };
143
144 //
145 // ResolvingLoadBalancingPolicy
146 //
147
148 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
149     Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
150     ChannelConfigHelper* helper)
151     : LoadBalancingPolicy(std::move(args)),
152       tracer_(tracer),
153       target_uri_(std::move(target_uri)),
154       helper_(helper) {
155   GPR_ASSERT(helper_ != nullptr);
156   resolver_ = ResolverRegistry::CreateResolver(
157       target_uri_.get(), args.args, interested_parties(), work_serializer(),
158       absl::make_unique<ResolverResultHandler>(Ref()));
159   // Since the validity of args has been checked when create the channel,
160   // CreateResolver() must return a non-null result.
161   GPR_ASSERT(resolver_ != nullptr);
162   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
163     gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
164   }
165   channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
166                                         absl::make_unique<QueuePicker>(Ref()));
167   resolver_->StartLocked();
168 }
169
170 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
171   GPR_ASSERT(resolver_ == nullptr);
172   GPR_ASSERT(lb_policy_ == nullptr);
173 }
174
175 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
176   if (resolver_ != nullptr) {
177     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
178       gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
179               resolver_.get());
180     }
181     resolver_.reset();
182     if (lb_policy_ != nullptr) {
183       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
184         gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
185                 lb_policy_.get());
186       }
187       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
188                                        interested_parties());
189       lb_policy_.reset();
190     }
191   }
192 }
193
194 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
195   if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
196 }
197
198 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
199   if (resolver_ != nullptr) {
200     resolver_->ResetBackoffLocked();
201     resolver_->RequestReresolutionLocked();
202   }
203   if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
204 }
205
206 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
207   if (resolver_ == nullptr) {
208     GRPC_ERROR_UNREF(error);
209     return;
210   }
211   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
212     gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
213             grpc_error_string(error));
214   }
215   // If we already have an LB policy from a previous resolution
216   // result, then we continue to let it set the connectivity state.
217   // Otherwise, we go into TRANSIENT_FAILURE.
218   if (lb_policy_ == nullptr) {
219     grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
220         "Resolver transient failure", &error, 1);
221     helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
222     channel_control_helper()->UpdateState(
223         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
224         absl::make_unique<TransientFailurePicker>(state_error));
225   }
226   GRPC_ERROR_UNREF(error);
227 }
228
229 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
230     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
231     Resolver::Result result) {
232   // Construct update.
233   UpdateArgs update_args;
234   update_args.addresses = std::move(result.addresses);
235   update_args.config = std::move(lb_policy_config);
236   // Remove the config selector from channel args so that we're not holding
237   // unnecessary refs that cause it to be destroyed somewhere other than in the
238   // WorkSerializer.
239   const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
240   update_args.args =
241       grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
242   // Create policy if needed.
243   if (lb_policy_ == nullptr) {
244     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
245   }
246   // Update the policy.
247   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
248     gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
249             lb_policy_.get());
250   }
251   lb_policy_->UpdateLocked(std::move(update_args));
252 }
253
254 // Creates a new LB policy.
255 OrphanablePtr<LoadBalancingPolicy>
256 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
257     const grpc_channel_args& args) {
258   LoadBalancingPolicy::Args lb_policy_args;
259   lb_policy_args.work_serializer = work_serializer();
260   lb_policy_args.channel_control_helper =
261       absl::make_unique<ResolvingControlHelper>(Ref());
262   lb_policy_args.args = &args;
263   OrphanablePtr<LoadBalancingPolicy> lb_policy =
264       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
265   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
266     gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
267             lb_policy.get());
268   }
269   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
270                                    interested_parties());
271   return lb_policy;
272 }
273
274 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
275     bool resolution_contains_addresses, TraceStringVector* trace_strings) {
276   if (!resolution_contains_addresses &&
277       previous_resolution_contained_addresses_) {
278     trace_strings->push_back("Address list became empty");
279   } else if (resolution_contains_addresses &&
280              !previous_resolution_contained_addresses_) {
281     trace_strings->push_back("Address list became non-empty");
282   }
283   previous_resolution_contained_addresses_ = resolution_contains_addresses;
284 }
285
286 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
287     const TraceStringVector& trace_strings) const {
288   if (!trace_strings.empty()) {
289     std::string message =
290         absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
291     channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
292                                             message);
293   }
294 }
295
296 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
297     Resolver::Result result) {
298   // Handle race conditions.
299   if (resolver_ == nullptr) return;
300   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
301     gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
302   }
303   // We only want to trace the address resolution in the follow cases:
304   // (a) Address resolution resulted in service config change.
305   // (b) Address resolution that causes number of backends to go from
306   //     zero to non-zero.
307   // (c) Address resolution that causes number of backends to go from
308   //     non-zero to zero.
309   // (d) Address resolution that causes a new LB policy to be created.
310   //
311   // We track a list of strings to eventually be concatenated and traced.
312   TraceStringVector trace_strings;
313   MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(),
314                                                &trace_strings);
315   // The result of grpc_error_string() is owned by the error itself.
316   // We're storing that string in trace_strings, so we need to make sure
317   // that the error lives until we're done with the string.
318   grpc_error* service_config_error =
319       GRPC_ERROR_REF(result.service_config_error);
320   if (service_config_error != GRPC_ERROR_NONE) {
321     trace_strings.push_back(grpc_error_string(service_config_error));
322   }
323   // Choose the service config.
324   ChannelConfigHelper::ChooseServiceConfigResult service_config_result;
325   if (helper_ != nullptr) {
326     service_config_result = helper_->ChooseServiceConfig(result);
327   } else {
328     service_config_result.lb_policy_config = child_lb_config_;
329   }
330   if (service_config_result.no_valid_service_config) {
331     // We received an invalid service config and we don't have a
332     // previous service config to fall back to.
333     OnResolverError(GRPC_ERROR_REF(service_config_error));
334     trace_strings.push_back("no valid service config");
335   } else {
336     // Create or update LB policy, as needed.
337     CreateOrUpdateLbPolicyLocked(
338         std::move(service_config_result.lb_policy_config), std::move(result));
339     if (service_config_result.service_config_changed) {
340       // Tell channel to start using new service config for calls.
341       // This needs to happen after the LB policy has been updated, since
342       // the ConfigSelector may need the LB policy to know about new
343       // destinations before it can send RPCs to those destinations.
344       if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls();
345       // TODO(ncteisen): might be worth somehow including a snippet of the
346       // config in the trace, at the risk of bloating the trace logs.
347       trace_strings.push_back("Service config changed");
348     }
349   }
350   // Add channel trace event.
351   ConcatenateAndAddChannelTraceLocked(trace_strings);
352   GRPC_ERROR_UNREF(service_config_error);
353 }
354
355 }  // namespace grpc_core