Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / test / cpp / end2end / client_lb_end2end_test.cc
index df867e6..7776d8f 100644 (file)
@@ -24,6 +24,7 @@
 #include <string>
 #include <thread>
 
+#include "absl/memory/memory.h"
 #include "absl/strings/str_cat.h"
 
 #include <grpc/grpc.h>
@@ -41,7 +42,6 @@
 
 #include "src/core/ext/filters/client_channel/backup_poller.h"
 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
-#include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 #include "src/core/ext/filters/client_channel/server_address.h"
 #include "src/core/ext/filters/client_channel/service_config.h"
@@ -50,6 +50,7 @@
 #include "src/core/lib/gpr/env.h"
 #include "src/core/lib/gprpp/debug_location.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/parse_address.h"
 #include "src/core/lib/iomgr/tcp_client.h"
 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
 #include "src/cpp/client/secure_credentials.h"
@@ -58,6 +59,7 @@
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 #include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h"
 #include "test/core/util/port.h"
+#include "test/core/util/resolve_localhost_ip46.h"
 #include "test/core/util/test_config.h"
 #include "test/core/util/test_lb_policies.h"
 #include "test/cpp/end2end/test_service_impl.h"
@@ -67,7 +69,6 @@
 
 using grpc::testing::EchoRequest;
 using grpc::testing::EchoResponse;
-using std::chrono::system_clock;
 
 // defined in tcp_client.cc
 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
@@ -152,25 +153,32 @@ class MyTestServiceImpl : public TestServiceImpl {
 
 class FakeResolverResponseGeneratorWrapper {
  public:
-  FakeResolverResponseGeneratorWrapper()
-      : response_generator_(grpc_core::MakeRefCounted<
+  explicit FakeResolverResponseGeneratorWrapper(bool ipv6_only)
+      : ipv6_only_(ipv6_only),
+        response_generator_(grpc_core::MakeRefCounted<
                             grpc_core::FakeResolverResponseGenerator>()) {}
 
   FakeResolverResponseGeneratorWrapper(
       FakeResolverResponseGeneratorWrapper&& other) noexcept {
+    ipv6_only_ = other.ipv6_only_;
     response_generator_ = std::move(other.response_generator_);
   }
 
-  void SetNextResolution(const std::vector<int>& ports,
-                         const char* service_config_json = nullptr) {
+  void SetNextResolution(
+      const std::vector<int>& ports, const char* service_config_json = nullptr,
+      const char* attribute_key = nullptr,
+      std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
+          nullptr) {
     grpc_core::ExecCtx exec_ctx;
     response_generator_->SetResponse(
-        BuildFakeResults(ports, service_config_json));
+        BuildFakeResults(ipv6_only_, ports, service_config_json, attribute_key,
+                         std::move(attribute)));
   }
 
   void SetNextResolutionUponError(const std::vector<int>& ports) {
     grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
+    response_generator_->SetReresolutionResponse(
+        BuildFakeResults(ipv6_only_, ports));
   }
 
   void SetFailureOnReresolution() {
@@ -184,27 +192,38 @@ class FakeResolverResponseGeneratorWrapper {
 
  private:
   static grpc_core::Resolver::Result BuildFakeResults(
-      const std::vector<int>& ports,
-      const char* service_config_json = nullptr) {
+      bool ipv6_only, const std::vector<int>& ports,
+      const char* service_config_json = nullptr,
+      const char* attribute_key = nullptr,
+      std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
+          nullptr) {
     grpc_core::Resolver::Result result;
     for (const int& port : ports) {
-      std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
+      std::string lb_uri_str =
+          absl::StrCat(ipv6_only ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port);
       grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true);
       GPR_ASSERT(lb_uri != nullptr);
       grpc_resolved_address address;
       GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+      std::map<const char*,
+               std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
+          attributes;
+      if (attribute != nullptr) {
+        attributes[attribute_key] = attribute->Copy();
+      }
       result.addresses.emplace_back(address.addr, address.len,
-                                    nullptr /* args */);
+                                    nullptr /* args */, std::move(attributes));
       grpc_uri_destroy(lb_uri);
     }
     if (service_config_json != nullptr) {
       result.service_config = grpc_core::ServiceConfig::Create(
-          service_config_json, &result.service_config_error);
+          nullptr, service_config_json, &result.service_config_error);
       GPR_ASSERT(result.service_config != nullptr);
     }
     return result;
   }
 
+  bool ipv6_only_ = false;
   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
       response_generator_;
 };
@@ -227,7 +246,14 @@ class ClientLbEnd2endTest : public ::testing::Test {
 #endif
   }
 
-  void SetUp() override { grpc_init(); }
+  void SetUp() override {
+    grpc_init();
+    bool localhost_resolves_to_ipv4 = false;
+    bool localhost_resolves_to_ipv6 = false;
+    grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
+                                 &localhost_resolves_to_ipv6);
+    ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
+  }
 
   void TearDown() override {
     for (size_t i = 0; i < servers_.size(); ++i) {
@@ -235,7 +261,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
     }
     servers_.clear();
     creds_.reset();
-    grpc_shutdown_blocking();
+    grpc_shutdown();
   }
 
   void CreateServers(size_t num_servers,
@@ -267,7 +293,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
   }
 
   FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
-    return FakeResolverResponseGeneratorWrapper();
+    return FakeResolverResponseGeneratorWrapper(ipv6_only_);
   }
 
   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
@@ -279,7 +305,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
       const std::string& lb_policy_name,
       const FakeResolverResponseGeneratorWrapper& response_generator,
       ChannelArguments args = ChannelArguments()) {
-    if (lb_policy_name.size() > 0) {
+    if (!lb_policy_name.empty()) {
       args.SetLoadBalancingPolicyName(lb_policy_name);
     }  // else, default to pick first
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
@@ -348,8 +374,8 @@ class ClientLbEnd2endTest : public ::testing::Test {
       grpc::internal::Mutex mu;
       grpc::internal::MutexLock lock(&mu);
       grpc::internal::CondVar cond;
-      thread_.reset(new std::thread(
-          std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
+      thread_ = absl::make_unique<std::thread>(
+          std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
       cond.WaitUntil(&mu, [this] { return server_ready_; });
       server_ready_ = false;
       gpr_log(GPR_INFO, "server startup complete");
@@ -401,7 +427,8 @@ class ClientLbEnd2endTest : public ::testing::Test {
   }
 
   bool WaitForChannelState(
-      Channel* channel, std::function<bool(grpc_connectivity_state)> predicate,
+      Channel* channel,
+      const std::function<bool(grpc_connectivity_state)>& predicate,
       bool try_to_connect = false, int timeout_seconds = 5) {
     const gpr_timespec deadline =
         grpc_timeout_seconds_to_deadline(timeout_seconds);
@@ -456,6 +483,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
   std::vector<std::unique_ptr<ServerData>> servers_;
   const std::string kRequestMessage_;
   std::shared_ptr<ChannelCredentials> creds_;
+  bool ipv6_only_ = false;
 };
 
 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
@@ -1648,7 +1676,7 @@ class ClientLbPickArgsTest : public ClientLbEnd2endTest {
     grpc_core::RegisterTestPickArgsLoadBalancingPolicy(SavePickArgs);
   }
 
-  static void TearDownTestCase() { grpc_shutdown_blocking(); }
+  static void TearDownTestCase() { grpc_shutdown(); }
 
   const std::vector<grpc_core::PickArgsSeen>& args_seen_list() {
     grpc::internal::MutexLock lock(&mu_);
@@ -1714,7 +1742,7 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
         ReportTrailerIntercepted);
   }
 
-  static void TearDownTestCase() { grpc_shutdown_blocking(); }
+  static void TearDownTestCase() { grpc_shutdown(); }
 
   int trailers_intercepted() {
     grpc::internal::MutexLock lock(&mu_);
@@ -1740,7 +1768,8 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
     self->trailers_intercepted_++;
     self->trailing_metadata_ = args_seen.metadata;
     if (backend_metric_data != nullptr) {
-      self->load_report_.reset(new udpa::data::orca::v1::OrcaLoadReport);
+      self->load_report_ =
+          absl::make_unique<udpa::data::orca::v1::OrcaLoadReport>();
       self->load_report_->set_cpu_utilization(
           backend_metric_data->cpu_utilization);
       self->load_report_->set_mem_utilization(
@@ -1748,13 +1777,11 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
       self->load_report_->set_rps(backend_metric_data->requests_per_second);
       for (const auto& p : backend_metric_data->request_cost) {
         std::string name = std::string(p.first);
-        (*self->load_report_->mutable_request_cost())[std::move(name)] =
-            p.second;
+        (*self->load_report_->mutable_request_cost())[name] = p.second;
       }
       for (const auto& p : backend_metric_data->utilization) {
         std::string name = std::string(p.first);
-        (*self->load_report_->mutable_utilization())[std::move(name)] =
-            p.second;
+        (*self->load_report_->mutable_utilization())[name] = p.second;
       }
     }
   }
@@ -1887,6 +1914,84 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
   EXPECT_EQ(kNumRpcs, trailers_intercepted());
 }
 
+class ClientLbAddressTest : public ClientLbEnd2endTest {
+ protected:
+  static const char* kAttributeKey;
+
+  class Attribute : public grpc_core::ServerAddress::AttributeInterface {
+   public:
+    explicit Attribute(const std::string& str) : str_(str) {}
+
+    std::unique_ptr<AttributeInterface> Copy() const override {
+      return absl::make_unique<Attribute>(str_);
+    }
+
+    int Cmp(const AttributeInterface* other) const override {
+      return str_.compare(static_cast<const Attribute*>(other)->str_);
+    }
+
+    std::string ToString() const override { return str_; }
+
+   private:
+    std::string str_;
+  };
+
+  void SetUp() override {
+    ClientLbEnd2endTest::SetUp();
+    current_test_instance_ = this;
+  }
+
+  static void SetUpTestCase() {
+    grpc_init();
+    grpc_core::RegisterAddressTestLoadBalancingPolicy(SaveAddress);
+  }
+
+  static void TearDownTestCase() { grpc_shutdown(); }
+
+  const std::vector<std::string>& addresses_seen() {
+    grpc::internal::MutexLock lock(&mu_);
+    return addresses_seen_;
+  }
+
+ private:
+  static void SaveAddress(const grpc_core::ServerAddress& address) {
+    ClientLbAddressTest* self = current_test_instance_;
+    grpc::internal::MutexLock lock(&self->mu_);
+    self->addresses_seen_.emplace_back(address.ToString());
+  }
+
+  static ClientLbAddressTest* current_test_instance_;
+  grpc::internal::Mutex mu_;
+  std::vector<std::string> addresses_seen_;
+};
+
+const char* ClientLbAddressTest::kAttributeKey = "attribute_key";
+
+ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
+
+TEST_F(ClientLbAddressTest, Basic) {
+  const int kNumServers = 1;
+  StartServers(kNumServers);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("address_test_lb", response_generator);
+  auto stub = BuildStub(channel);
+  // Addresses returned by the resolver will have attached attributes.
+  response_generator.SetNextResolution(GetServersPorts(), nullptr,
+                                       kAttributeKey,
+                                       absl::make_unique<Attribute>("foo"));
+  CheckRpcSendOk(stub, DEBUG_LOCATION);
+  // Check LB policy name for the channel.
+  EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
+  // Make sure that the attributes wind up on the subchannels.
+  std::vector<std::string> expected;
+  for (const int port : GetServersPorts()) {
+    expected.emplace_back(
+        absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", port,
+                     " args={} attributes={", kAttributeKey, "=foo}"));
+  }
+  EXPECT_EQ(addresses_seen(), expected);
+}
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc