Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / test / cpp / end2end / xds_end2end_test.cc
index 2cc704d..847dec9 100644 (file)
@@ -54,6 +54,7 @@
 
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
+#include "src/proto/grpc/testing/xds/cds_for_test.grpc.pb.h"
 #include "src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h"
 #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
 
@@ -83,6 +84,7 @@ namespace {
 
 using std::chrono::system_clock;
 
+using ::envoy::api::v2::Cluster;
 using ::envoy::api::v2::ClusterLoadAssignment;
 using ::envoy::api::v2::DiscoveryRequest;
 using ::envoy::api::v2::DiscoveryResponse;
@@ -94,6 +96,7 @@ using ::envoy::service::load_stats::v2::LoadStatsRequest;
 using ::envoy::service::load_stats::v2::LoadStatsResponse;
 using ::envoy::service::load_stats::v2::UpstreamLocalityStats;
 
+constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
 constexpr char kEdsTypeUrl[] =
     "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
 constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region";
@@ -105,14 +108,16 @@ constexpr int kDefaultLocalityPriority = 0;
 
 constexpr char kBootstrapFile[] =
     "{\n"
-    "  \"xds_server\": {\n"
-    "    \"server_uri\": \"fake:///lb\",\n"
-    "    \"channel_creds\": [\n"
-    "      {\n"
-    "        \"type\": \"fake\"\n"
-    "      }\n"
-    "    ]\n"
-    "  },\n"
+    "  \"xds_servers\": [\n"
+    "    {\n"
+    "      \"server_uri\": \"fake:///lb\",\n"
+    "      \"channel_creds\": [\n"
+    "        {\n"
+    "          \"type\": \"fake\"\n"
+    "        }\n"
+    "      ]\n"
+    "    }\n"
+    "  ],\n"
     "  \"node\": {\n"
     "    \"id\": \"xds_end2end_test\",\n"
     "    \"cluster\": \"test\",\n"
@@ -129,14 +134,16 @@ constexpr char kBootstrapFile[] =
 
 constexpr char kBootstrapFileBad[] =
     "{\n"
-    "  \"xds_server\": {\n"
-    "    \"server_uri\": \"fake:///wrong_lb\",\n"
-    "    \"channel_creds\": [\n"
-    "      {\n"
-    "        \"type\": \"fake\"\n"
-    "      }\n"
-    "    ]\n"
-    "  },\n"
+    "  \"xds_servers\": [\n"
+    "    {\n"
+    "      \"server_uri\": \"fake:///wrong_lb\",\n"
+    "      \"channel_creds\": [\n"
+    "        {\n"
+    "          \"type\": \"fake\"\n"
+    "        }\n"
+    "      ]\n"
+    "    }\n"
+    "  ],\n"
     "  \"node\": {\n"
     "  }\n"
     "}\n";
@@ -332,9 +339,16 @@ class ClientStats {
   std::map<grpc::string, uint64_t> dropped_requests_;
 };
 
-// Only the EDS functionality is implemented.
+// TODO(roth): Change this service to a real fake.
 class AdsServiceImpl : public AdsService {
  public:
+  enum ResponseState {
+    NOT_SENT,
+    SENT,
+    ACKED,
+    NACKED,
+  };
+
   struct ResponseArgs {
     struct Locality {
       Locality(const grpc::string& sub_zone, std::vector<int> ports,
@@ -367,6 +381,70 @@ class AdsServiceImpl : public AdsService {
   using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
   using ResponseDelayPair = std::pair<DiscoveryResponse, int>;
 
+  AdsServiceImpl(bool enable_load_reporting) {
+    default_cluster_.set_name("application_target_name");
+    default_cluster_.set_type(envoy::api::v2::Cluster::EDS);
+    default_cluster_.mutable_eds_cluster_config()
+        ->mutable_eds_config()
+        ->mutable_ads();
+    default_cluster_.set_lb_policy(envoy::api::v2::Cluster::ROUND_ROBIN);
+    if (enable_load_reporting) {
+      default_cluster_.mutable_lrs_server()->mutable_self();
+    }
+    cds_response_data_ = {
+        {"application_target_name", default_cluster_},
+    };
+  }
+
+  void HandleCdsRequest(DiscoveryRequest* request, Stream* stream) {
+    gpr_log(GPR_INFO, "ADS[%p]: received CDS request '%s'", this,
+            request->DebugString().c_str());
+    const std::string version_str = "version_1";
+    const std::string nonce_str = "nonce_1";
+    grpc_core::MutexLock lock(&ads_mu_);
+    if (cds_response_state_ == NOT_SENT) {
+      DiscoveryResponse response;
+      response.set_type_url(kCdsTypeUrl);
+      response.set_version_info(version_str);
+      response.set_nonce(nonce_str);
+      for (const auto& cluster_name : request->resource_names()) {
+        auto iter = cds_response_data_.find(cluster_name);
+        if (iter == cds_response_data_.end()) continue;
+        response.add_resources()->PackFrom(iter->second);
+      }
+      stream->Write(response);
+      cds_response_state_ = SENT;
+    } else if (cds_response_state_ == SENT) {
+      GPR_ASSERT(!request->response_nonce().empty());
+      cds_response_state_ =
+          request->version_info() == version_str ? ACKED : NACKED;
+    }
+  }
+
+  void HandleEdsRequest(DiscoveryRequest* request, Stream* stream) {
+    gpr_log(GPR_INFO, "ADS[%p]: received EDS request '%s'", this,
+            request->DebugString().c_str());
+    IncreaseRequestCount();
+    std::vector<ResponseDelayPair> responses_and_delays;
+    {
+      grpc_core::MutexLock lock(&ads_mu_);
+      responses_and_delays = eds_responses_and_delays_;
+    }
+    // Send response.
+    for (const auto& p : responses_and_delays) {
+      const DiscoveryResponse& response = p.first;
+      const int delay_ms = p.second;
+      gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
+      if (delay_ms > 0) {
+        gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
+      }
+      gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
+              response.DebugString().c_str());
+      IncreaseResponseCount();
+      stream->Write(response);
+    }
+  }
+
   Status StreamAggregatedResources(ServerContext* context,
                                    Stream* stream) override {
     gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
@@ -378,21 +456,21 @@ class AdsServiceImpl : public AdsService {
       // Balancer shouldn't receive the call credentials metadata.
       EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
                 context->client_metadata().end());
-      // Read request.
+      // Keep servicing requests until the EDS response has been sent back.
       DiscoveryRequest request;
-      if (!stream->Read(&request)) return;
-      IncreaseRequestCount();
-      gpr_log(GPR_INFO, "ADS[%p]: received initial message '%s'", this,
-              request.DebugString().c_str());
-      // Send response.
-      std::vector<ResponseDelayPair> responses_and_delays;
-      {
-        grpc_core::MutexLock lock(&ads_mu_);
-        responses_and_delays = responses_and_delays_;
-      }
-      for (const auto& response_and_delay : responses_and_delays) {
-        SendResponse(stream, response_and_delay.first,
-                     response_and_delay.second);
+      // TODO(roth): For each supported type, we currently only handle one
+      // request without replying to any new requests (for ACK/NACK or new
+      // resource names). It's not causing a big problem now but should be
+      // fixed.
+      bool eds_sent = false;
+      while (!eds_sent || cds_response_state_ == SENT) {
+        if (!stream->Read(&request)) return;
+        if (request.type_url() == kCdsTypeUrl) {
+          HandleCdsRequest(&request, stream);
+        } else if (request.type_url() == kEdsTypeUrl) {
+          HandleEdsRequest(&request, stream);
+          eds_sent = true;
+        }
       }
       // Wait until notified done.
       grpc_core::MutexLock lock(&ads_mu_);
@@ -402,29 +480,42 @@ class AdsServiceImpl : public AdsService {
     return Status::OK;
   }
 
-  void add_response(const DiscoveryResponse& response, int send_after_ms) {
+  Cluster GetDefaultCluster() const { return default_cluster_; }
+
+  void SetCdsResponse(
+      std::map<std::string /*cluster_name*/, Cluster> cds_response_data) {
+    cds_response_data_ = std::move(cds_response_data);
+  }
+
+  ResponseState cds_response_state() {
+    grpc_core::MutexLock lock(&ads_mu_);
+    return cds_response_state_;
+  }
+
+  void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) {
     grpc_core::MutexLock lock(&ads_mu_);
-    responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+    eds_responses_and_delays_.push_back(
+        std::make_pair(response, send_after_ms));
   }
 
   void Start() {
     grpc_core::MutexLock lock(&ads_mu_);
     ads_done_ = false;
-    responses_and_delays_.clear();
+    eds_responses_and_delays_.clear();
   }
 
   void Shutdown() {
     {
       grpc_core::MutexLock lock(&ads_mu_);
       NotifyDoneWithAdsCallLocked();
-      responses_and_delays_.clear();
+      eds_responses_and_delays_.clear();
     }
     gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
   }
 
   static DiscoveryResponse BuildResponse(const ResponseArgs& args) {
     ClusterLoadAssignment assignment;
-    assignment.set_cluster_name("service name");
+    assignment.set_cluster_name("application_target_name");
     for (const auto& locality : args.locality_list) {
       auto* endpoints = assignment.add_endpoints();
       endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
@@ -478,23 +569,16 @@ class AdsServiceImpl : public AdsService {
   }
 
  private:
-  void SendResponse(Stream* stream, const DiscoveryResponse& response,
-                    int delay_ms) {
-    gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
-    if (delay_ms > 0) {
-      gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
-    }
-    gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
-            response.DebugString().c_str());
-    IncreaseResponseCount();
-    stream->Write(response);
-  }
-
   grpc_core::CondVar ads_cond_;
   // Protect the members below.
   grpc_core::Mutex ads_mu_;
   bool ads_done_ = false;
-  std::vector<ResponseDelayPair> responses_and_delays_;
+  // CDS response data.
+  Cluster default_cluster_;
+  std::map<std::string /*cluster_name*/, Cluster> cds_response_data_;
+  ResponseState cds_response_state_ = NOT_SENT;
+  // EDS response data.
+  std::vector<ResponseDelayPair> eds_responses_and_delays_;
 };
 
 class LrsServiceImpl : public LrsService {
@@ -617,7 +701,7 @@ class TestType {
 class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
  protected:
   XdsEnd2endTest(size_t num_backends, size_t num_balancers,
-                 int client_load_reporting_interval_seconds)
+                 int client_load_reporting_interval_seconds = 100)
       : server_host_("localhost"),
         num_backends_(num_backends),
         num_balancers_(num_balancers),
@@ -652,7 +736,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     // Start the load balancers.
     for (size_t i = 0; i < num_balancers_; ++i) {
       balancers_.emplace_back(
-          new BalancerServerThread(client_load_reporting_interval_seconds_));
+          new BalancerServerThread(GetParam().enable_load_reporting()
+                                       ? client_load_reporting_interval_seconds_
+                                       : 0));
       balancers_.back()->Start(server_host_);
     }
     ResetStub();
@@ -688,7 +774,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     // If the parent channel is using the fake resolver, we inject the
     // response generator for the parent here, and then SetNextResolution()
     // will inject the xds channel's response generator via the parent's
-    // reponse generator.
+    // response generator.
     //
     // In contrast, if we are using the xds resolver, then the parent
     // channel never uses a response generator, and we inject the xds
@@ -864,7 +950,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
 
   void ScheduleResponseForBalancer(size_t i, const DiscoveryResponse& response,
                                    int delay_ms) {
-    balancers_[i]->ads_service()->add_response(response, delay_ms);
+    balancers_[i]->ads_service()->AddEdsResponse(response, delay_ms);
   }
 
   Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
@@ -980,7 +1066,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
   class BalancerServerThread : public ServerThread {
    public:
     explicit BalancerServerThread(int client_load_reporting_interval = 0)
-        : lrs_service_(client_load_reporting_interval) {}
+        : ads_service_(client_load_reporting_interval > 0),
+          lrs_service_(client_load_reporting_interval) {}
 
     AdsServiceImpl* ads_service() { return &ads_service_; }
     LrsServiceImpl* lrs_service() { return &lrs_service_; }
@@ -1042,7 +1129,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
 
 class BasicTest : public XdsEnd2endTest {
  public:
-  BasicTest() : XdsEnd2endTest(4, 1, 0) {}
+  BasicTest() : XdsEnd2endTest(4, 1) {}
 };
 
 // Tests that the balancer sends the correct response to the client, and the
@@ -1248,6 +1335,73 @@ TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
       "");
 }
 
+using CdsTest = BasicTest;
+
+// Tests that CDS client should send an ACK upon correct CDS response.
+TEST_P(CdsTest, Vanilla) {
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  SendRpc();
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::ACKED);
+}
+
+// Tests that CDS client should send a NACK if the cluster type in CDS response
+// is other than EDS.
+TEST_P(CdsTest, WrongClusterType) {
+  auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+  cluster.set_type(envoy::api::v2::Cluster::STATIC);
+  balancers_[0]->ads_service()->SetCdsResponse(
+      {{"application_target_name", std::move(cluster)}});
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  SendRpc();
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the eds_config in CDS response is
+// other than ADS.
+TEST_P(CdsTest, WrongEdsConfig) {
+  auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+  cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
+  balancers_[0]->ads_service()->SetCdsResponse(
+      {{"application_target_name", std::move(cluster)}});
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  SendRpc();
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the lb_policy in CDS response is
+// other than ROUND_ROBIN.
+TEST_P(CdsTest, WrongLbPolicy) {
+  auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+  cluster.set_lb_policy(envoy::api::v2::Cluster::LEAST_REQUEST);
+  balancers_[0]->ads_service()->SetCdsResponse(
+      {{"application_target_name", std::move(cluster)}});
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  SendRpc();
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the lrs_server in CDS response is
+// other than SELF.
+TEST_P(CdsTest, WrongLrsServer) {
+  auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+  cluster.mutable_lrs_server()->mutable_ads();
+  balancers_[0]->ads_service()->SetCdsResponse(
+      {{"application_target_name", std::move(cluster)}});
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  SendRpc();
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::NACKED);
+}
+
 using LocalityMapTest = BasicTest;
 
 // Tests that the localities in a locality map are picked according to their
@@ -1440,7 +1594,7 @@ TEST_P(FailoverTest, ChooseHighestPriority) {
   ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
   WaitForBackend(3, false);
   for (size_t i = 0; i < 3; ++i) {
-    EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
   }
   // The ADS service got a single request, and sent a single response.
   EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
@@ -1464,7 +1618,7 @@ TEST_P(FailoverTest, Failover) {
   WaitForBackend(1, false);
   for (size_t i = 0; i < 4; ++i) {
     if (i == 1) continue;
-    EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
   }
   // The ADS service got a single request, and sent a single response.
   EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
@@ -1489,7 +1643,7 @@ TEST_P(FailoverTest, SwitchBackToHigherPriority) {
   WaitForBackend(1, false);
   for (size_t i = 0; i < 4; ++i) {
     if (i == 1) continue;
-    EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
   }
   StartBackend(0);
   WaitForBackend(0);
@@ -1528,7 +1682,7 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) {
   WaitForBackend(2, false);
   for (size_t i = 0; i < 4; ++i) {
     if (i == 2) continue;
-    EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
   }
   // The ADS service got a single request, and sent a single response.
   EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
@@ -1557,7 +1711,7 @@ TEST_P(FailoverTest, UpdatePriority) {
   ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 1000);
   WaitForBackend(3, false);
   for (size_t i = 0; i < 3; ++i) {
-    EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
   }
   WaitForBackend(1);
   CheckRpcSendOk(kNumRpcs);
@@ -2053,7 +2207,7 @@ TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) {
 
 class BalancerUpdateTest : public XdsEnd2endTest {
  public:
-  BalancerUpdateTest() : XdsEnd2endTest(4, 3, 0) {}
+  BalancerUpdateTest() : XdsEnd2endTest(4, 3) {}
 };
 
 // Tests that the old LB call is still used after the balancer address update as
@@ -2429,36 +2583,44 @@ grpc::string TestTypeName(const ::testing::TestParamInfo<TestType>& info) {
   return info.param.AsString();
 }
 
-// TODO(juanlishen): Load reporting disabled is currently tested only with DNS
-// resolver.  Once we implement CDS, test it via the xds resolver too.
-
 INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
+                                           TestType(true, false),
                                            TestType(true, true)),
                          &TestTypeName);
 
 INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
+                                           TestType(true, false),
+                                           TestType(true, true)),
+                         &TestTypeName);
+
+// CDS depends on XdsResolver.
+INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest,
+                         ::testing::Values(TestType(true, false),
                                            TestType(true, true)),
                          &TestTypeName);
 
 INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
+                                           TestType(true, false),
                                            TestType(true, true)),
                          &TestTypeName);
 
 INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
+                                           TestType(true, false),
                                            TestType(true, true)),
                          &TestTypeName);
 
 INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
+                                           TestType(true, false),
                                            TestType(true, true)),
                          &TestTypeName);