Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / resolving_lb_policy.cc
index 74a5c29..889c20c 100644 (file)
@@ -78,7 +78,7 @@ class ResolvingLoadBalancingPolicy::ResolverResultHandler
       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
       : parent_(std::move(parent)) {}
 
-  ~ResolverResultHandler() {
+  ~ResolverResultHandler() override {
     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
       gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
               parent_.get());
@@ -109,15 +109,17 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
       : parent_(std::move(parent)) {}
 
   RefCountedPtr<SubchannelInterface> CreateSubchannel(
-      const grpc_channel_args& args) override {
+      ServerAddress address, const grpc_channel_args& args) override {
     if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
-    return parent_->channel_control_helper()->CreateSubchannel(args);
+    return parent_->channel_control_helper()->CreateSubchannel(
+        std::move(address), args);
   }
 
-  void UpdateState(grpc_connectivity_state state,
+  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                    std::unique_ptr<SubchannelPicker> picker) override {
     if (parent_->resolver_ == nullptr) return;  // Shutting down.
-    parent_->channel_control_helper()->UpdateState(state, std::move(picker));
+    parent_->channel_control_helper()->UpdateState(state, status,
+                                                   std::move(picker));
   }
 
   void RequestReresolution() override {
@@ -145,14 +147,12 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
 
 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
     Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
-    ProcessResolverResultCallback process_resolver_result,
-    void* process_resolver_result_user_data)
+    ChannelConfigHelper* helper)
     : LoadBalancingPolicy(std::move(args)),
       tracer_(tracer),
       target_uri_(std::move(target_uri)),
-      process_resolver_result_(process_resolver_result),
-      process_resolver_result_user_data_(process_resolver_result_user_data) {
-  GPR_ASSERT(process_resolver_result != nullptr);
+      helper_(helper) {
+  GPR_ASSERT(helper_ != nullptr);
   resolver_ = ResolverRegistry::CreateResolver(
       target_uri_.get(), args.args, interested_parties(), work_serializer(),
       absl::make_unique<ResolverResultHandler>(Ref()));
@@ -162,7 +162,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
     gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
   }
-  channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING,
+  channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
                                         absl::make_unique<QueuePicker>(Ref()));
   resolver_->StartLocked();
 }
@@ -174,6 +174,10 @@ ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
 
 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
   if (resolver_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+      gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
+              resolver_.get());
+    }
     resolver_.reset();
     if (lb_policy_ != nullptr) {
       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
@@ -214,8 +218,9 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
   if (lb_policy_ == nullptr) {
     grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
         "Resolver transient failure", &error, 1);
+    helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
     channel_control_helper()->UpdateState(
-        GRPC_CHANNEL_TRANSIENT_FAILURE,
+        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
         absl::make_unique<TransientFailurePicker>(state_error));
   }
   GRPC_ERROR_UNREF(error);
@@ -228,9 +233,12 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
   UpdateArgs update_args;
   update_args.addresses = std::move(result.addresses);
   update_args.config = std::move(lb_policy_config);
-  // TODO(roth): Once channel args is converted to C++, use std::move() here.
-  update_args.args = result.args;
-  result.args = nullptr;
+  // Remove the config selector from channel args so that we're not holding
+  // unnecessary refs that cause it to be destroyed somewhere other than in the
+  // WorkSerializer.
+  const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
+  update_args.args =
+      grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
   // Create policy if needed.
   if (lb_policy_ == nullptr) {
     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
@@ -302,47 +310,46 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
   //
   // We track a list of strings to eventually be concatenated and traced.
   TraceStringVector trace_strings;
-  const bool resolution_contains_addresses = result.addresses.size() > 0;
-  // Process the resolver result.
-  RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
-  bool service_config_changed = false;
-  std::string service_config_error_string;
-  if (process_resolver_result_ != nullptr) {
-    grpc_error* service_config_error = GRPC_ERROR_NONE;
-    bool no_valid_service_config = false;
-    service_config_changed = process_resolver_result_(
-        process_resolver_result_user_data_, result, &lb_policy_config,
-        &service_config_error, &no_valid_service_config);
-    if (service_config_error != GRPC_ERROR_NONE) {
-      service_config_error_string = grpc_error_string(service_config_error);
-      if (no_valid_service_config) {
-        // We received an invalid service config and we don't have a
-        // fallback service config.
-        OnResolverError(service_config_error);
-      } else {
-        GRPC_ERROR_UNREF(service_config_error);
-      }
-    }
+  MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(),
+                                               &trace_strings);
+  // The result of grpc_error_string() is owned by the error itself.
+  // We're storing that string in trace_strings, so we need to make sure
+  // that the error lives until we're done with the string.
+  grpc_error* service_config_error =
+      GRPC_ERROR_REF(result.service_config_error);
+  if (service_config_error != GRPC_ERROR_NONE) {
+    trace_strings.push_back(grpc_error_string(service_config_error));
+  }
+  // Choose the service config.
+  ChannelConfigHelper::ChooseServiceConfigResult service_config_result;
+  if (helper_ != nullptr) {
+    service_config_result = helper_->ChooseServiceConfig(result);
   } else {
-    lb_policy_config = child_lb_config_;
+    service_config_result.lb_policy_config = child_lb_config_;
   }
-  if (lb_policy_config != nullptr) {
+  if (service_config_result.no_valid_service_config) {
+    // We received an invalid service config and we don't have a
+    // previous service config to fall back to.
+    OnResolverError(GRPC_ERROR_REF(service_config_error));
+    trace_strings.push_back("no valid service config");
+  } else {
     // Create or update LB policy, as needed.
-    CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
-                                 std::move(result));
+    CreateOrUpdateLbPolicyLocked(
+        std::move(service_config_result.lb_policy_config), std::move(result));
+    if (service_config_result.service_config_changed) {
+      // Tell channel to start using new service config for calls.
+      // This needs to happen after the LB policy has been updated, since
+      // the ConfigSelector may need the LB policy to know about new
+      // destinations before it can send RPCs to those destinations.
+      if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls();
+      // TODO(ncteisen): might be worth somehow including a snippet of the
+      // config in the trace, at the risk of bloating the trace logs.
+      trace_strings.push_back("Service config changed");
+    }
   }
   // Add channel trace event.
-  if (service_config_changed) {
-    // TODO(ncteisen): might be worth somehow including a snippet of the
-    // config in the trace, at the risk of bloating the trace logs.
-    trace_strings.push_back("Service config changed");
-  }
-  if (!service_config_error_string.empty()) {
-    trace_strings.push_back(service_config_error_string.c_str());
-  }
-  MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
-                                               &trace_strings);
   ConcatenateAndAddChannelTraceLocked(trace_strings);
+  GRPC_ERROR_UNREF(service_config_error);
 }
 
 }  // namespace grpc_core