Imported Upstream version 1.28.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolving_lb_policy.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
39 #include "src/core/ext/filters/client_channel/resolver_registry.h"
40 #include "src/core/ext/filters/client_channel/retry_throttle.h"
41 #include "src/core/ext/filters/client_channel/server_address.h"
42 #include "src/core/ext/filters/client_channel/service_config.h"
43 #include "src/core/ext/filters/client_channel/subchannel.h"
44 #include "src/core/ext/filters/deadline/deadline_filter.h"
45 #include "src/core/lib/backoff/backoff.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/channel/connected_channel.h"
48 #include "src/core/lib/channel/status_util.h"
49 #include "src/core/lib/gpr/string.h"
50 #include "src/core/lib/gprpp/inlined_vector.h"
51 #include "src/core/lib/gprpp/manual_constructor.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/iomgr/combiner.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/polling_entity.h"
56 #include "src/core/lib/profiling/timers.h"
57 #include "src/core/lib/slice/slice_internal.h"
58 #include "src/core/lib/slice/slice_string_helpers.h"
59 #include "src/core/lib/surface/channel.h"
60 #include "src/core/lib/transport/connectivity_state.h"
61 #include "src/core/lib/transport/error_utils.h"
62 #include "src/core/lib/transport/metadata.h"
63 #include "src/core/lib/transport/metadata_batch.h"
64 #include "src/core/lib/transport/static_metadata.h"
65 #include "src/core/lib/transport/status_metadata.h"
66
67 namespace grpc_core {
68
69 //
70 // ResolvingLoadBalancingPolicy::ResolverResultHandler
71 //
72
73 class ResolvingLoadBalancingPolicy::ResolverResultHandler
74     : public Resolver::ResultHandler {
75  public:
76   explicit ResolverResultHandler(
77       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
78       : parent_(std::move(parent)) {}
79
80   ~ResolverResultHandler() {
81     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
82       gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
83               parent_.get());
84     }
85   }
86
87   void ReturnResult(Resolver::Result result) override {
88     parent_->OnResolverResultChangedLocked(std::move(result));
89   }
90
91   void ReturnError(grpc_error* error) override {
92     parent_->OnResolverError(error);
93   }
94
95  private:
96   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
97 };
98
99 //
100 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
101 //
102
103 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
104     : public LoadBalancingPolicy::ChannelControlHelper {
105  public:
106   explicit ResolvingControlHelper(
107       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
108       : parent_(std::move(parent)) {}
109
110   RefCountedPtr<SubchannelInterface> CreateSubchannel(
111       const grpc_channel_args& args) override {
112     if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
113     return parent_->channel_control_helper()->CreateSubchannel(args);
114   }
115
116   void UpdateState(grpc_connectivity_state state,
117                    std::unique_ptr<SubchannelPicker> picker) override {
118     if (parent_->resolver_ == nullptr) return;  // Shutting down.
119     parent_->channel_control_helper()->UpdateState(state, std::move(picker));
120   }
121
122   void RequestReresolution() override {
123     if (parent_->resolver_ == nullptr) return;  // Shutting down.
124     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
125       gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
126               parent_.get());
127     }
128     parent_->resolver_->RequestReresolutionLocked();
129   }
130
131   void AddTraceEvent(TraceSeverity severity, StringView message) override {
132     if (parent_->resolver_ == nullptr) return;  // Shutting down.
133     parent_->channel_control_helper()->AddTraceEvent(severity, message);
134   }
135
136  private:
137   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
138 };
139
140 //
141 // ResolvingLoadBalancingPolicy
142 //
143
144 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
145     Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
146     ProcessResolverResultCallback process_resolver_result,
147     void* process_resolver_result_user_data)
148     : LoadBalancingPolicy(std::move(args)),
149       tracer_(tracer),
150       target_uri_(std::move(target_uri)),
151       process_resolver_result_(process_resolver_result),
152       process_resolver_result_user_data_(process_resolver_result_user_data) {
153   GPR_ASSERT(process_resolver_result != nullptr);
154   resolver_ = ResolverRegistry::CreateResolver(
155       target_uri_.get(), args.args, interested_parties(), combiner(),
156       absl::make_unique<ResolverResultHandler>(Ref()));
157   // Since the validity of args has been checked when create the channel,
158   // CreateResolver() must return a non-null result.
159   GPR_ASSERT(resolver_ != nullptr);
160   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
161     gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
162   }
163   channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING,
164                                         absl::make_unique<QueuePicker>(Ref()));
165   resolver_->StartLocked();
166 }
167
168 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
169   GPR_ASSERT(resolver_ == nullptr);
170   GPR_ASSERT(lb_policy_ == nullptr);
171 }
172
173 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
174   if (resolver_ != nullptr) {
175     resolver_.reset();
176     if (lb_policy_ != nullptr) {
177       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
178         gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
179                 lb_policy_.get());
180       }
181       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
182                                        interested_parties());
183       lb_policy_.reset();
184     }
185   }
186 }
187
188 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
189   if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
190 }
191
192 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
193   if (resolver_ != nullptr) {
194     resolver_->ResetBackoffLocked();
195     resolver_->RequestReresolutionLocked();
196   }
197   if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
198 }
199
200 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
201   if (resolver_ == nullptr) {
202     GRPC_ERROR_UNREF(error);
203     return;
204   }
205   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
206     gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
207             grpc_error_string(error));
208   }
209   // If we already have an LB policy from a previous resolution
210   // result, then we continue to let it set the connectivity state.
211   // Otherwise, we go into TRANSIENT_FAILURE.
212   if (lb_policy_ == nullptr) {
213     grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
214         "Resolver transient failure", &error, 1);
215     channel_control_helper()->UpdateState(
216         GRPC_CHANNEL_TRANSIENT_FAILURE,
217         absl::make_unique<TransientFailurePicker>(state_error));
218   }
219   GRPC_ERROR_UNREF(error);
220 }
221
222 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
223     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
224     Resolver::Result result) {
225   // Construct update.
226   UpdateArgs update_args;
227   update_args.addresses = std::move(result.addresses);
228   update_args.config = std::move(lb_policy_config);
229   // TODO(roth): Once channel args is converted to C++, use std::move() here.
230   update_args.args = result.args;
231   result.args = nullptr;
232   // Create policy if needed.
233   if (lb_policy_ == nullptr) {
234     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
235   }
236   // Update the policy.
237   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
238     gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
239             lb_policy_.get());
240   }
241   lb_policy_->UpdateLocked(std::move(update_args));
242 }
243
244 // Creates a new LB policy.
245 // Updates trace_strings to indicate what was done.
246 OrphanablePtr<LoadBalancingPolicy>
247 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
248     const grpc_channel_args& args) {
249   LoadBalancingPolicy::Args lb_policy_args;
250   lb_policy_args.combiner = combiner();
251   lb_policy_args.channel_control_helper =
252       absl::make_unique<ResolvingControlHelper>(Ref());
253   lb_policy_args.args = &args;
254   OrphanablePtr<LoadBalancingPolicy> lb_policy =
255       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
256   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
257     gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
258             lb_policy.get());
259   }
260   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
261                                    interested_parties());
262   return lb_policy;
263 }
264
265 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
266     bool resolution_contains_addresses, TraceStringVector* trace_strings) {
267   if (!resolution_contains_addresses &&
268       previous_resolution_contained_addresses_) {
269     trace_strings->push_back(gpr_strdup("Address list became empty"));
270   } else if (resolution_contains_addresses &&
271              !previous_resolution_contained_addresses_) {
272     trace_strings->push_back(gpr_strdup("Address list became non-empty"));
273   }
274   previous_resolution_contained_addresses_ = resolution_contains_addresses;
275 }
276
277 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
278     TraceStringVector* trace_strings) const {
279   if (!trace_strings->empty()) {
280     gpr_strvec v;
281     gpr_strvec_init(&v);
282     gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
283     bool is_first = 1;
284     for (size_t i = 0; i < trace_strings->size(); ++i) {
285       if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
286       is_first = false;
287       gpr_strvec_add(&v, (*trace_strings)[i]);
288     }
289     size_t len = 0;
290     grpc_core::UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
291     channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
292                                             StringView(message.get()));
293     gpr_strvec_destroy(&v);
294   }
295 }
296
297 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
298     Resolver::Result result) {
299   // Handle race conditions.
300   if (resolver_ == nullptr) return;
301   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
302     gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
303   }
304   // We only want to trace the address resolution in the follow cases:
305   // (a) Address resolution resulted in service config change.
306   // (b) Address resolution that causes number of backends to go from
307   //     zero to non-zero.
308   // (c) Address resolution that causes number of backends to go from
309   //     non-zero to zero.
310   // (d) Address resolution that causes a new LB policy to be created.
311   //
312   // We track a list of strings to eventually be concatenated and traced.
313   TraceStringVector trace_strings;
314   const bool resolution_contains_addresses = result.addresses.size() > 0;
315   // Process the resolver result.
316   RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
317   bool service_config_changed = false;
318   char* service_config_error_string = nullptr;
319   if (process_resolver_result_ != nullptr) {
320     grpc_error* service_config_error = GRPC_ERROR_NONE;
321     bool no_valid_service_config = false;
322     service_config_changed = process_resolver_result_(
323         process_resolver_result_user_data_, result, &lb_policy_config,
324         &service_config_error, &no_valid_service_config);
325     if (service_config_error != GRPC_ERROR_NONE) {
326       service_config_error_string =
327           gpr_strdup(grpc_error_string(service_config_error));
328       if (no_valid_service_config) {
329         // We received an invalid service config and we don't have a
330         // fallback service config.
331         OnResolverError(service_config_error);
332       } else {
333         GRPC_ERROR_UNREF(service_config_error);
334       }
335     }
336   } else {
337     lb_policy_config = child_lb_config_;
338   }
339   if (lb_policy_config != nullptr) {
340     // Create or update LB policy, as needed.
341     CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
342                                  std::move(result));
343   }
344   // Add channel trace event.
345   if (service_config_changed) {
346     // TODO(ncteisen): might be worth somehow including a snippet of the
347     // config in the trace, at the risk of bloating the trace logs.
348     trace_strings.push_back(gpr_strdup("Service config changed"));
349   }
350   if (service_config_error_string != nullptr) {
351     trace_strings.push_back(service_config_error_string);
352     service_config_error_string = nullptr;
353   }
354   MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
355                                                &trace_strings);
356   ConcatenateAndAddChannelTraceLocked(&trace_strings);
357 }
358
359 }  // namespace grpc_core