#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
+#include "src/proto/grpc/testing/xds/cds_for_test.grpc.pb.h"
#include "src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h"
#include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
using std::chrono::system_clock;
+using ::envoy::api::v2::Cluster;
using ::envoy::api::v2::ClusterLoadAssignment;
using ::envoy::api::v2::DiscoveryRequest;
using ::envoy::api::v2::DiscoveryResponse;
using ::envoy::service::load_stats::v2::LoadStatsResponse;
using ::envoy::service::load_stats::v2::UpstreamLocalityStats;
+constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
constexpr char kEdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region";
constexpr char kBootstrapFile[] =
"{\n"
- " \"xds_server\": {\n"
- " \"server_uri\": \"fake:///lb\",\n"
- " \"channel_creds\": [\n"
- " {\n"
- " \"type\": \"fake\"\n"
- " }\n"
- " ]\n"
- " },\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"fake:///lb\",\n"
+ " \"channel_creds\": [\n"
+ " {\n"
+ " \"type\": \"fake\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ],\n"
" \"node\": {\n"
" \"id\": \"xds_end2end_test\",\n"
" \"cluster\": \"test\",\n"
constexpr char kBootstrapFileBad[] =
"{\n"
- " \"xds_server\": {\n"
- " \"server_uri\": \"fake:///wrong_lb\",\n"
- " \"channel_creds\": [\n"
- " {\n"
- " \"type\": \"fake\"\n"
- " }\n"
- " ]\n"
- " },\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"fake:///wrong_lb\",\n"
+ " \"channel_creds\": [\n"
+ " {\n"
+ " \"type\": \"fake\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ],\n"
" \"node\": {\n"
" }\n"
"}\n";
std::map<grpc::string, uint64_t> dropped_requests_;
};
-// Only the EDS functionality is implemented.
+// TODO(roth): Change this service to a real fake.
class AdsServiceImpl : public AdsService {
public:
+ enum ResponseState {
+ NOT_SENT,
+ SENT,
+ ACKED,
+ NACKED,
+ };
+
struct ResponseArgs {
struct Locality {
Locality(const grpc::string& sub_zone, std::vector<int> ports,
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
using ResponseDelayPair = std::pair<DiscoveryResponse, int>;
+ AdsServiceImpl(bool enable_load_reporting) {
+ default_cluster_.set_name("application_target_name");
+ default_cluster_.set_type(envoy::api::v2::Cluster::EDS);
+ default_cluster_.mutable_eds_cluster_config()
+ ->mutable_eds_config()
+ ->mutable_ads();
+ default_cluster_.set_lb_policy(envoy::api::v2::Cluster::ROUND_ROBIN);
+ if (enable_load_reporting) {
+ default_cluster_.mutable_lrs_server()->mutable_self();
+ }
+ cds_response_data_ = {
+ {"application_target_name", default_cluster_},
+ };
+ }
+
+ void HandleCdsRequest(DiscoveryRequest* request, Stream* stream) {
+ gpr_log(GPR_INFO, "ADS[%p]: received CDS request '%s'", this,
+ request->DebugString().c_str());
+ const std::string version_str = "version_1";
+ const std::string nonce_str = "nonce_1";
+ grpc_core::MutexLock lock(&ads_mu_);
+ if (cds_response_state_ == NOT_SENT) {
+ DiscoveryResponse response;
+ response.set_type_url(kCdsTypeUrl);
+ response.set_version_info(version_str);
+ response.set_nonce(nonce_str);
+ for (const auto& cluster_name : request->resource_names()) {
+ auto iter = cds_response_data_.find(cluster_name);
+ if (iter == cds_response_data_.end()) continue;
+ response.add_resources()->PackFrom(iter->second);
+ }
+ stream->Write(response);
+ cds_response_state_ = SENT;
+ } else if (cds_response_state_ == SENT) {
+ GPR_ASSERT(!request->response_nonce().empty());
+ cds_response_state_ =
+ request->version_info() == version_str ? ACKED : NACKED;
+ }
+ }
+
+ void HandleEdsRequest(DiscoveryRequest* request, Stream* stream) {
+ gpr_log(GPR_INFO, "ADS[%p]: received EDS request '%s'", this,
+ request->DebugString().c_str());
+ IncreaseRequestCount();
+ std::vector<ResponseDelayPair> responses_and_delays;
+ {
+ grpc_core::MutexLock lock(&ads_mu_);
+ responses_and_delays = eds_responses_and_delays_;
+ }
+ // Send response.
+ for (const auto& p : responses_and_delays) {
+ const DiscoveryResponse& response = p.first;
+ const int delay_ms = p.second;
+ gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
+ if (delay_ms > 0) {
+ gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
+ }
+ gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
+ response.DebugString().c_str());
+ IncreaseResponseCount();
+ stream->Write(response);
+ }
+ }
+
Status StreamAggregatedResources(ServerContext* context,
Stream* stream) override {
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
// Balancer shouldn't receive the call credentials metadata.
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
context->client_metadata().end());
- // Read request.
+ // Keep servicing requests until the EDS response has been sent back.
DiscoveryRequest request;
- if (!stream->Read(&request)) return;
- IncreaseRequestCount();
- gpr_log(GPR_INFO, "ADS[%p]: received initial message '%s'", this,
- request.DebugString().c_str());
- // Send response.
- std::vector<ResponseDelayPair> responses_and_delays;
- {
- grpc_core::MutexLock lock(&ads_mu_);
- responses_and_delays = responses_and_delays_;
- }
- for (const auto& response_and_delay : responses_and_delays) {
- SendResponse(stream, response_and_delay.first,
- response_and_delay.second);
+ // TODO(roth): For each supported type, we currently only handle one
+ // request without replying to any new requests (for ACK/NACK or new
+ // resource names). It's not causing a big problem now but should be
+ // fixed.
+ bool eds_sent = false;
+ while (!eds_sent || cds_response_state_ == SENT) {
+ if (!stream->Read(&request)) return;
+ if (request.type_url() == kCdsTypeUrl) {
+ HandleCdsRequest(&request, stream);
+ } else if (request.type_url() == kEdsTypeUrl) {
+ HandleEdsRequest(&request, stream);
+ eds_sent = true;
+ }
}
// Wait until notified done.
grpc_core::MutexLock lock(&ads_mu_);
return Status::OK;
}
- void add_response(const DiscoveryResponse& response, int send_after_ms) {
+ Cluster GetDefaultCluster() const { return default_cluster_; }
+
+ void SetCdsResponse(
+ std::map<std::string /*cluster_name*/, Cluster> cds_response_data) {
+ cds_response_data_ = std::move(cds_response_data);
+ }
+
+ ResponseState cds_response_state() {
+ grpc_core::MutexLock lock(&ads_mu_);
+ return cds_response_state_;
+ }
+
+ void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) {
grpc_core::MutexLock lock(&ads_mu_);
- responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+ eds_responses_and_delays_.push_back(
+ std::make_pair(response, send_after_ms));
}
void Start() {
grpc_core::MutexLock lock(&ads_mu_);
ads_done_ = false;
- responses_and_delays_.clear();
+ eds_responses_and_delays_.clear();
}
void Shutdown() {
{
grpc_core::MutexLock lock(&ads_mu_);
NotifyDoneWithAdsCallLocked();
- responses_and_delays_.clear();
+ eds_responses_and_delays_.clear();
}
gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
}
static DiscoveryResponse BuildResponse(const ResponseArgs& args) {
ClusterLoadAssignment assignment;
- assignment.set_cluster_name("service name");
+ assignment.set_cluster_name("application_target_name");
for (const auto& locality : args.locality_list) {
auto* endpoints = assignment.add_endpoints();
endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
}
private:
- void SendResponse(Stream* stream, const DiscoveryResponse& response,
- int delay_ms) {
- gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
- if (delay_ms > 0) {
- gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
- }
- gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
- response.DebugString().c_str());
- IncreaseResponseCount();
- stream->Write(response);
- }
-
grpc_core::CondVar ads_cond_;
// Protect the members below.
grpc_core::Mutex ads_mu_;
bool ads_done_ = false;
- std::vector<ResponseDelayPair> responses_and_delays_;
+ // CDS response data.
+ Cluster default_cluster_;
+ std::map<std::string /*cluster_name*/, Cluster> cds_response_data_;
+ ResponseState cds_response_state_ = NOT_SENT;
+ // EDS response data.
+ std::vector<ResponseDelayPair> eds_responses_and_delays_;
};
class LrsServiceImpl : public LrsService {
class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
protected:
XdsEnd2endTest(size_t num_backends, size_t num_balancers,
- int client_load_reporting_interval_seconds)
+ int client_load_reporting_interval_seconds = 100)
: server_host_("localhost"),
num_backends_(num_backends),
num_balancers_(num_balancers),
// Start the load balancers.
for (size_t i = 0; i < num_balancers_; ++i) {
balancers_.emplace_back(
- new BalancerServerThread(client_load_reporting_interval_seconds_));
+ new BalancerServerThread(GetParam().enable_load_reporting()
+ ? client_load_reporting_interval_seconds_
+ : 0));
balancers_.back()->Start(server_host_);
}
ResetStub();
// If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution()
// will inject the xds channel's response generator via the parent's
- // reponse generator.
+ // response generator.
//
// In contrast, if we are using the xds resolver, then the parent
// channel never uses a response generator, and we inject the xds
void ScheduleResponseForBalancer(size_t i, const DiscoveryResponse& response,
int delay_ms) {
- balancers_[i]->ads_service()->add_response(response, delay_ms);
+ balancers_[i]->ads_service()->AddEdsResponse(response, delay_ms);
}
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
class BalancerServerThread : public ServerThread {
public:
explicit BalancerServerThread(int client_load_reporting_interval = 0)
- : lrs_service_(client_load_reporting_interval) {}
+ : ads_service_(client_load_reporting_interval > 0),
+ lrs_service_(client_load_reporting_interval) {}
AdsServiceImpl* ads_service() { return &ads_service_; }
LrsServiceImpl* lrs_service() { return &lrs_service_; }
class BasicTest : public XdsEnd2endTest {
public:
- BasicTest() : XdsEnd2endTest(4, 1, 0) {}
+ BasicTest() : XdsEnd2endTest(4, 1) {}
};
// Tests that the balancer sends the correct response to the client, and the
"");
}
+using CdsTest = BasicTest;
+
+// Tests that CDS client should send an ACK upon correct CDS response.
+TEST_P(CdsTest, Vanilla) {
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ SendRpc();
+ EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+ AdsServiceImpl::ACKED);
+}
+
+// Tests that CDS client should send a NACK if the cluster type in CDS response
+// is other than EDS.
+TEST_P(CdsTest, WrongClusterType) {
+ auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+ cluster.set_type(envoy::api::v2::Cluster::STATIC);
+ balancers_[0]->ads_service()->SetCdsResponse(
+ {{"application_target_name", std::move(cluster)}});
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ SendRpc();
+ EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+ AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the eds_config in CDS response is
+// other than ADS.
+TEST_P(CdsTest, WrongEdsConfig) {
+ auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+ cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
+ balancers_[0]->ads_service()->SetCdsResponse(
+ {{"application_target_name", std::move(cluster)}});
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ SendRpc();
+ EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+ AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the lb_policy in CDS response is
+// other than ROUND_ROBIN.
+TEST_P(CdsTest, WrongLbPolicy) {
+ auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+ cluster.set_lb_policy(envoy::api::v2::Cluster::LEAST_REQUEST);
+ balancers_[0]->ads_service()->SetCdsResponse(
+ {{"application_target_name", std::move(cluster)}});
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ SendRpc();
+ EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+ AdsServiceImpl::NACKED);
+}
+
+// Tests that CDS client should send a NACK if the lrs_server in CDS response is
+// other than SELF.
+TEST_P(CdsTest, WrongLrsServer) {
+ auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
+ cluster.mutable_lrs_server()->mutable_ads();
+ balancers_[0]->ads_service()->SetCdsResponse(
+ {{"application_target_name", std::move(cluster)}});
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ SendRpc();
+ EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+ AdsServiceImpl::NACKED);
+}
+
using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
WaitForBackend(3, false);
for (size_t i = 0; i < 3; ++i) {
- EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+ EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
// The ADS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
WaitForBackend(1, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 1) continue;
- EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+ EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
// The ADS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
WaitForBackend(1, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 1) continue;
- EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+ EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
StartBackend(0);
WaitForBackend(0);
WaitForBackend(2, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 2) continue;
- EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+ EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
// The ADS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 1000);
WaitForBackend(3, false);
for (size_t i = 0; i < 3; ++i) {
- EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
+ EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
WaitForBackend(1);
CheckRpcSendOk(kNumRpcs);
class BalancerUpdateTest : public XdsEnd2endTest {
public:
- BalancerUpdateTest() : XdsEnd2endTest(4, 3, 0) {}
+ BalancerUpdateTest() : XdsEnd2endTest(4, 3) {}
};
// Tests that the old LB call is still used after the balancer address update as
return info.param.AsString();
}
-// TODO(juanlishen): Load reporting disabled is currently tested only with DNS
-// resolver. Once we implement CDS, test it via the xds resolver too.
-
INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
::testing::Values(TestType(false, true),
TestType(false, false),
+ TestType(true, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
::testing::Values(TestType(false, true),
TestType(false, false),
+ TestType(true, false),
+ TestType(true, true)),
+ &TestTypeName);
+
+// CDS depends on XdsResolver.
+INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest,
+ ::testing::Values(TestType(true, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
::testing::Values(TestType(false, true),
TestType(false, false),
+ TestType(true, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest,
::testing::Values(TestType(false, true),
TestType(false, false),
+ TestType(true, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest,
::testing::Values(TestType(false, true),
TestType(false, false),
+ TestType(true, false),
TestType(true, true)),
&TestTypeName);