3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
37 #include "src/core/ext/filters/client_channel/backup_poller.h"
38 #include "src/core/ext/filters/client_channel/parse_address.h"
39 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
40 #include "src/core/ext/filters/client_channel/server_address.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/tmpfile.h"
43 #include "src/core/lib/gprpp/map.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/gprpp/sync.h"
46 #include "src/core/lib/iomgr/sockaddr.h"
47 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
48 #include "src/cpp/client/secure_credentials.h"
49 #include "src/cpp/server/secure_server_credentials.h"
51 #include "test/core/util/port.h"
52 #include "test/core/util/test_config.h"
53 #include "test/cpp/end2end/test_service_impl.h"
55 #include "src/proto/grpc/testing/echo.grpc.pb.h"
56 #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
57 #include "src/proto/grpc/testing/xds/cds_for_test.grpc.pb.h"
58 #include "src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h"
59 #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
61 #include <gmock/gmock.h>
62 #include <gtest/gtest.h>
64 // TODO(dgq): Other scenarios in need of testing:
65 // - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
66 // - Test reception of invalid serverlist
67 // - Test against a non-LB server.
68 // - Random LB server closing the stream unexpectedly.
// Findings from end to end testing to be covered here:
// - Handling of LB servers restart, including reconnection after backing-off
//   retries.
// - Destruction of load balanced channel (and therefore of xds instance)
//   while:
//   1) the internal LB call is still active. This should work by virtue
//   of the weak reference the LB call holds. The call should be terminated as
//   part of the xds shutdown process.
//   2) the retry timer is active. Again, the weak reference it holds should
//   prevent a premature call to \a glb_destroy.
85 using std::chrono::system_clock;
87 using ::envoy::api::v2::Cluster;
88 using ::envoy::api::v2::ClusterLoadAssignment;
89 using ::envoy::api::v2::DiscoveryRequest;
90 using ::envoy::api::v2::DiscoveryResponse;
91 using ::envoy::api::v2::FractionalPercent;
92 using ::envoy::service::discovery::v2::AggregatedDiscoveryService;
93 using ::envoy::service::load_stats::v2::ClusterStats;
94 using ::envoy::service::load_stats::v2::LoadReportingService;
95 using ::envoy::service::load_stats::v2::LoadStatsRequest;
96 using ::envoy::service::load_stats::v2::LoadStatsResponse;
97 using ::envoy::service::load_stats::v2::UpstreamLocalityStats;
99 constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
100 constexpr char kEdsTypeUrl[] =
101 "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
102 constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region";
103 constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone";
104 constexpr char kLbDropType[] = "lb";
105 constexpr char kThrottleDropType[] = "throttle";
106 constexpr int kDefaultLocalityWeight = 3;
107 constexpr int kDefaultLocalityPriority = 0;
109 constexpr char kBootstrapFile[] =
111 " \"xds_servers\": [\n"
113 " \"server_uri\": \"fake:///lb\",\n"
114 " \"channel_creds\": [\n"
116 " \"type\": \"fake\"\n"
122 " \"id\": \"xds_end2end_test\",\n"
123 " \"cluster\": \"test\",\n"
125 " \"foo\": \"bar\"\n"
128 " \"region\": \"corp\",\n"
129 " \"zone\": \"svl\",\n"
130 " \"subzone\": \"mp3\"\n"
135 constexpr char kBootstrapFileBad[] =
137 " \"xds_servers\": [\n"
139 " \"server_uri\": \"fake:///wrong_lb\",\n"
140 " \"channel_creds\": [\n"
142 " \"type\": \"fake\"\n"
151 char* g_bootstrap_file;
152 char* g_bootstrap_file_bad;
154 void WriteBootstrapFiles() {
155 char* bootstrap_file;
156 FILE* out = gpr_tmpfile("xds_bootstrap", &bootstrap_file);
157 fputs(kBootstrapFile, out);
159 g_bootstrap_file = bootstrap_file;
160 out = gpr_tmpfile("xds_bootstrap_bad", &bootstrap_file);
161 fputs(kBootstrapFileBad, out);
163 g_bootstrap_file_bad = bootstrap_file;
166 // Helper class to minimize the number of unique ports we use for this test.
170 if (idx_ >= ports_.size()) {
171 ports_.push_back(grpc_pick_unused_port_or_die());
173 return ports_[idx_++];
176 void Reset() { idx_ = 0; }
179 std::vector<int> ports_;
183 PortSaver* g_port_saver = nullptr;
185 template <typename ServiceType>
186 class CountedService : public ServiceType {
188 size_t request_count() {
189 grpc_core::MutexLock lock(&mu_);
190 return request_count_;
193 size_t response_count() {
194 grpc_core::MutexLock lock(&mu_);
195 return response_count_;
198 void IncreaseResponseCount() {
199 grpc_core::MutexLock lock(&mu_);
202 void IncreaseRequestCount() {
203 grpc_core::MutexLock lock(&mu_);
207 void ResetCounters() {
208 grpc_core::MutexLock lock(&mu_);
214 grpc_core::Mutex mu_;
217 size_t request_count_ = 0;
218 size_t response_count_ = 0;
221 using BackendService = CountedService<TestServiceImpl>;
222 using AdsService = CountedService<AggregatedDiscoveryService::Service>;
223 using LrsService = CountedService<LoadReportingService::Service>;
225 const char g_kCallCredsMdKey[] = "Balancer should not ...";
226 const char g_kCallCredsMdValue[] = "... receive me";
228 class BackendServiceImpl : public BackendService {
230 BackendServiceImpl() {}
232 Status Echo(ServerContext* context, const EchoRequest* request,
233 EchoResponse* response) override {
234 // Backend should receive the call credentials metadata.
235 auto call_credentials_entry =
236 context->client_metadata().find(g_kCallCredsMdKey);
237 EXPECT_NE(call_credentials_entry, context->client_metadata().end());
238 if (call_credentials_entry != context->client_metadata().end()) {
239 EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
241 IncreaseRequestCount();
242 const auto status = TestServiceImpl::Echo(context, request, response);
243 IncreaseResponseCount();
244 AddClient(context->peer());
251 std::set<grpc::string> clients() {
252 grpc_core::MutexLock lock(&clients_mu_);
257 void AddClient(const grpc::string& client) {
258 grpc_core::MutexLock lock(&clients_mu_);
259 clients_.insert(client);
262 grpc_core::Mutex mu_;
263 grpc_core::Mutex clients_mu_;
264 std::set<grpc::string> clients_;
269 struct LocalityStats {
270 // Converts from proto message class.
271 LocalityStats(const UpstreamLocalityStats& upstream_locality_stats)
272 : total_successful_requests(
273 upstream_locality_stats.total_successful_requests()),
274 total_requests_in_progress(
275 upstream_locality_stats.total_requests_in_progress()),
276 total_error_requests(upstream_locality_stats.total_error_requests()),
277 total_issued_requests(
278 upstream_locality_stats.total_issued_requests()) {}
280 uint64_t total_successful_requests;
281 uint64_t total_requests_in_progress;
282 uint64_t total_error_requests;
283 uint64_t total_issued_requests;
286 // Converts from proto message class.
287 ClientStats(const ClusterStats& cluster_stats)
288 : total_dropped_requests_(cluster_stats.total_dropped_requests()) {
289 for (const auto& input_locality_stats :
290 cluster_stats.upstream_locality_stats()) {
291 locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
292 LocalityStats(input_locality_stats));
294 for (const auto& input_dropped_requests :
295 cluster_stats.dropped_requests()) {
296 dropped_requests_.emplace(input_dropped_requests.category(),
297 input_dropped_requests.dropped_count());
301 uint64_t total_successful_requests() const {
303 for (auto& p : locality_stats_) {
304 sum += p.second.total_successful_requests;
308 uint64_t total_requests_in_progress() const {
310 for (auto& p : locality_stats_) {
311 sum += p.second.total_requests_in_progress;
315 uint64_t total_error_requests() const {
317 for (auto& p : locality_stats_) {
318 sum += p.second.total_error_requests;
322 uint64_t total_issued_requests() const {
324 for (auto& p : locality_stats_) {
325 sum += p.second.total_issued_requests;
329 uint64_t total_dropped_requests() const { return total_dropped_requests_; }
330 uint64_t dropped_requests(const grpc::string& category) const {
331 auto iter = dropped_requests_.find(category);
332 GPR_ASSERT(iter != dropped_requests_.end());
337 std::map<grpc::string, LocalityStats> locality_stats_;
338 uint64_t total_dropped_requests_;
339 std::map<grpc::string, uint64_t> dropped_requests_;
342 // TODO(roth): Change this service to a real fake.
343 class AdsServiceImpl : public AdsService {
352 struct ResponseArgs {
354 Locality(const grpc::string& sub_zone, std::vector<int> ports,
355 int lb_weight = kDefaultLocalityWeight,
356 int priority = kDefaultLocalityPriority,
357 std::vector<envoy::api::v2::HealthStatus> health_statuses = {})
358 : sub_zone(std::move(sub_zone)),
359 ports(std::move(ports)),
360 lb_weight(lb_weight),
362 health_statuses(std::move(health_statuses)) {}
364 const grpc::string sub_zone;
365 std::vector<int> ports;
368 std::vector<envoy::api::v2::HealthStatus> health_statuses;
371 ResponseArgs() = default;
372 explicit ResponseArgs(std::vector<Locality> locality_list)
373 : locality_list(std::move(locality_list)) {}
375 std::vector<Locality> locality_list;
376 std::map<grpc::string, uint32_t> drop_categories;
377 FractionalPercent::DenominatorType drop_denominator =
378 FractionalPercent::MILLION;
381 using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
382 using ResponseDelayPair = std::pair<DiscoveryResponse, int>;
384 AdsServiceImpl(bool enable_load_reporting) {
385 default_cluster_.set_name("application_target_name");
386 default_cluster_.set_type(envoy::api::v2::Cluster::EDS);
387 default_cluster_.mutable_eds_cluster_config()
388 ->mutable_eds_config()
390 default_cluster_.set_lb_policy(envoy::api::v2::Cluster::ROUND_ROBIN);
391 if (enable_load_reporting) {
392 default_cluster_.mutable_lrs_server()->mutable_self();
394 cds_response_data_ = {
395 {"application_target_name", default_cluster_},
399 void HandleCdsRequest(DiscoveryRequest* request, Stream* stream) {
400 gpr_log(GPR_INFO, "ADS[%p]: received CDS request '%s'", this,
401 request->DebugString().c_str());
402 const std::string version_str = "version_1";
403 const std::string nonce_str = "nonce_1";
404 grpc_core::MutexLock lock(&ads_mu_);
405 if (cds_response_state_ == NOT_SENT) {
406 DiscoveryResponse response;
407 response.set_type_url(kCdsTypeUrl);
408 response.set_version_info(version_str);
409 response.set_nonce(nonce_str);
410 for (const auto& cluster_name : request->resource_names()) {
411 auto iter = cds_response_data_.find(cluster_name);
412 if (iter == cds_response_data_.end()) continue;
413 response.add_resources()->PackFrom(iter->second);
415 stream->Write(response);
416 cds_response_state_ = SENT;
417 } else if (cds_response_state_ == SENT) {
418 GPR_ASSERT(!request->response_nonce().empty());
419 cds_response_state_ =
420 request->version_info() == version_str ? ACKED : NACKED;
424 void HandleEdsRequest(DiscoveryRequest* request, Stream* stream) {
425 gpr_log(GPR_INFO, "ADS[%p]: received EDS request '%s'", this,
426 request->DebugString().c_str());
427 IncreaseRequestCount();
428 std::vector<ResponseDelayPair> responses_and_delays;
430 grpc_core::MutexLock lock(&ads_mu_);
431 responses_and_delays = eds_responses_and_delays_;
434 for (const auto& p : responses_and_delays) {
435 const DiscoveryResponse& response = p.first;
436 const int delay_ms = p.second;
437 gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
439 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
441 gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
442 response.DebugString().c_str());
443 IncreaseResponseCount();
444 stream->Write(response);
448 Status StreamAggregatedResources(ServerContext* context,
449 Stream* stream) override {
450 gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
453 grpc_core::MutexLock lock(&ads_mu_);
454 if (ads_done_) return;
456 // Balancer shouldn't receive the call credentials metadata.
457 EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
458 context->client_metadata().end());
459 // Keep servicing requests until the EDS response has been sent back.
460 DiscoveryRequest request;
461 // TODO(roth): For each supported type, we currently only handle one
462 // request without replying to any new requests (for ACK/NACK or new
463 // resource names). It's not causing a big problem now but should be
465 bool eds_sent = false;
466 while (!eds_sent || cds_response_state_ == SENT) {
467 if (!stream->Read(&request)) return;
468 if (request.type_url() == kCdsTypeUrl) {
469 HandleCdsRequest(&request, stream);
470 } else if (request.type_url() == kEdsTypeUrl) {
471 HandleEdsRequest(&request, stream);
475 // Wait until notified done.
476 grpc_core::MutexLock lock(&ads_mu_);
477 ads_cond_.WaitUntil(&ads_mu_, [this] { return ads_done_; });
479 gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this);
483 Cluster GetDefaultCluster() const { return default_cluster_; }
486 std::map<std::string /*cluster_name*/, Cluster> cds_response_data) {
487 cds_response_data_ = std::move(cds_response_data);
490 ResponseState cds_response_state() {
491 grpc_core::MutexLock lock(&ads_mu_);
492 return cds_response_state_;
495 void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) {
496 grpc_core::MutexLock lock(&ads_mu_);
497 eds_responses_and_delays_.push_back(
498 std::make_pair(response, send_after_ms));
502 grpc_core::MutexLock lock(&ads_mu_);
504 eds_responses_and_delays_.clear();
509 grpc_core::MutexLock lock(&ads_mu_);
510 NotifyDoneWithAdsCallLocked();
511 eds_responses_and_delays_.clear();
513 gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
516 static DiscoveryResponse BuildResponse(const ResponseArgs& args) {
517 ClusterLoadAssignment assignment;
518 assignment.set_cluster_name("application_target_name");
519 for (const auto& locality : args.locality_list) {
520 auto* endpoints = assignment.add_endpoints();
521 endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
522 endpoints->set_priority(locality.priority);
523 endpoints->mutable_locality()->set_region(kDefaultLocalityRegion);
524 endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
525 endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
526 for (size_t i = 0; i < locality.ports.size(); ++i) {
527 const int& port = locality.ports[i];
528 auto* lb_endpoints = endpoints->add_lb_endpoints();
529 if (locality.health_statuses.size() > i &&
530 locality.health_statuses[i] !=
531 envoy::api::v2::HealthStatus::UNKNOWN) {
532 lb_endpoints->set_health_status(locality.health_statuses[i]);
534 auto* endpoint = lb_endpoints->mutable_endpoint();
535 auto* address = endpoint->mutable_address();
536 auto* socket_address = address->mutable_socket_address();
537 socket_address->set_address("127.0.0.1");
538 socket_address->set_port_value(port);
541 if (!args.drop_categories.empty()) {
542 auto* policy = assignment.mutable_policy();
543 for (const auto& p : args.drop_categories) {
544 const grpc::string& name = p.first;
545 const uint32_t parts_per_million = p.second;
546 auto* drop_overload = policy->add_drop_overloads();
547 drop_overload->set_category(name);
548 auto* drop_percentage = drop_overload->mutable_drop_percentage();
549 drop_percentage->set_numerator(parts_per_million);
550 drop_percentage->set_denominator(args.drop_denominator);
553 DiscoveryResponse response;
554 response.set_type_url(kEdsTypeUrl);
555 response.add_resources()->PackFrom(assignment);
559 void NotifyDoneWithAdsCall() {
560 grpc_core::MutexLock lock(&ads_mu_);
561 NotifyDoneWithAdsCallLocked();
564 void NotifyDoneWithAdsCallLocked() {
567 ads_cond_.Broadcast();
572 grpc_core::CondVar ads_cond_;
573 // Protect the members below.
574 grpc_core::Mutex ads_mu_;
575 bool ads_done_ = false;
576 // CDS response data.
577 Cluster default_cluster_;
578 std::map<std::string /*cluster_name*/, Cluster> cds_response_data_;
579 ResponseState cds_response_state_ = NOT_SENT;
580 // EDS response data.
581 std::vector<ResponseDelayPair> eds_responses_and_delays_;
584 class LrsServiceImpl : public LrsService {
586 using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
588 explicit LrsServiceImpl(int client_load_reporting_interval_seconds)
589 : client_load_reporting_interval_seconds_(
590 client_load_reporting_interval_seconds) {}
592 Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
593 gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
595 LoadStatsRequest request;
596 if (stream->Read(&request)) {
597 if (client_load_reporting_interval_seconds_ > 0) {
598 IncreaseRequestCount();
600 LoadStatsResponse response;
601 auto server_name = request.cluster_stats()[0].cluster_name();
602 GPR_ASSERT(server_name != "");
603 response.add_clusters(server_name);
604 response.mutable_load_reporting_interval()->set_seconds(
605 client_load_reporting_interval_seconds_);
606 stream->Write(response);
607 IncreaseResponseCount();
610 if (stream->Read(&request)) {
611 gpr_log(GPR_INFO, "LRS[%p]: received client load report message '%s'",
612 this, request.DebugString().c_str());
613 GPR_ASSERT(request.cluster_stats().size() == 1);
614 const ClusterStats& cluster_stats = request.cluster_stats()[0];
615 // We need to acquire the lock here in order to prevent the notify_one
616 // below from firing before its corresponding wait is executed.
617 grpc_core::MutexLock lock(&load_report_mu_);
618 GPR_ASSERT(client_stats_ == nullptr);
619 client_stats_.reset(new ClientStats(cluster_stats));
620 load_report_ready_ = true;
621 load_report_cond_.Signal();
624 // Wait until notified done.
625 grpc_core::MutexLock lock(&lrs_mu_);
626 lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done; });
628 gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
634 load_report_ready_ = false;
635 client_stats_.reset();
640 grpc_core::MutexLock lock(&lrs_mu_);
641 NotifyDoneWithLrsCallLocked();
643 gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
646 ClientStats* WaitForLoadReport() {
647 grpc_core::MutexLock lock(&load_report_mu_);
648 load_report_cond_.WaitUntil(&load_report_mu_,
649 [this] { return load_report_ready_; });
650 load_report_ready_ = false;
651 return client_stats_.get();
654 void NotifyDoneWithLrsCall() {
655 grpc_core::MutexLock lock(&lrs_mu_);
656 NotifyDoneWithLrsCallLocked();
659 void NotifyDoneWithLrsCallLocked() {
667 const int client_load_reporting_interval_seconds_;
669 grpc_core::CondVar lrs_cv_;
671 grpc_core::Mutex lrs_mu_;
672 bool lrs_done = false;
674 grpc_core::CondVar load_report_cond_;
675 // Protect the members below.
676 grpc_core::Mutex load_report_mu_;
677 std::unique_ptr<ClientStats> client_stats_;
678 bool load_report_ready_ = false;
683 TestType(bool use_xds_resolver, bool enable_load_reporting)
684 : use_xds_resolver_(use_xds_resolver),
685 enable_load_reporting_(enable_load_reporting) {}
687 bool use_xds_resolver() const { return use_xds_resolver_; }
688 bool enable_load_reporting() const { return enable_load_reporting_; }
690 grpc::string AsString() const {
691 grpc::string retval = (use_xds_resolver_ ? "XdsResolver" : "FakeResolver");
692 if (enable_load_reporting_) retval += "WithLoadReporting";
697 const bool use_xds_resolver_;
698 const bool enable_load_reporting_;
701 class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
703 XdsEnd2endTest(size_t num_backends, size_t num_balancers,
704 int client_load_reporting_interval_seconds = 100)
705 : server_host_("localhost"),
706 num_backends_(num_backends),
707 num_balancers_(num_balancers),
708 client_load_reporting_interval_seconds_(
709 client_load_reporting_interval_seconds) {}
711 static void SetUpTestCase() {
712 // Make the backup poller poll very frequently in order to pick up
    // updates from all the subchannels' FDs.
714 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
716 // Workaround Apple CFStream bug
717 gpr_setenv("grpc_cfstream", "0");
722 static void TearDownTestCase() { grpc_shutdown(); }
724 void SetUp() override {
725 gpr_setenv("GRPC_XDS_BOOTSTRAP", g_bootstrap_file);
726 g_port_saver->Reset();
727 response_generator_ =
728 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
729 lb_channel_response_generator_ =
730 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
731 // Start the backends.
732 for (size_t i = 0; i < num_backends_; ++i) {
733 backends_.emplace_back(new BackendServerThread);
734 backends_.back()->Start(server_host_);
736 // Start the load balancers.
737 for (size_t i = 0; i < num_balancers_; ++i) {
738 balancers_.emplace_back(
739 new BalancerServerThread(GetParam().enable_load_reporting()
740 ? client_load_reporting_interval_seconds_
742 balancers_.back()->Start(server_host_);
747 void TearDown() override {
748 ShutdownAllBackends();
749 for (auto& balancer : balancers_) balancer->Shutdown();
752 void StartAllBackends() {
753 for (auto& backend : backends_) backend->Start(server_host_);
756 void StartBackend(size_t index) { backends_[index]->Start(server_host_); }
758 void ShutdownAllBackends() {
759 for (auto& backend : backends_) backend->Shutdown();
762 void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
764 void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
765 const grpc::string& expected_targets = "") {
766 ChannelArguments args;
767 // TODO(juanlishen): Add setter to ChannelArguments.
768 if (fallback_timeout > 0) {
769 args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout);
771 if (failover_timeout > 0) {
772 args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout);
774 // If the parent channel is using the fake resolver, we inject the
775 // response generator for the parent here, and then SetNextResolution()
776 // will inject the xds channel's response generator via the parent's
777 // response generator.
779 // In contrast, if we are using the xds resolver, then the parent
780 // channel never uses a response generator, and we inject the xds
781 // channel's response generator here.
782 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
783 GetParam().use_xds_resolver()
784 ? lb_channel_response_generator_.get()
785 : response_generator_.get());
786 if (!expected_targets.empty()) {
787 args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
789 grpc::string scheme =
790 GetParam().use_xds_resolver() ? "xds-experimental" : "fake";
791 std::ostringstream uri;
792 uri << scheme << ":///" << kApplicationTargetName_;
793 // TODO(dgq): templatize tests to run everything using both secure and
794 // insecure channel credentials.
795 grpc_channel_credentials* channel_creds =
796 grpc_fake_transport_security_credentials_create();
797 grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
798 g_kCallCredsMdKey, g_kCallCredsMdValue, false);
799 std::shared_ptr<ChannelCredentials> creds(
800 new SecureChannelCredentials(grpc_composite_channel_credentials_create(
801 channel_creds, call_creds, nullptr)));
803 channel_creds->Unref();
804 channel_ = ::grpc::CreateCustomChannel(uri.str(), creds, args);
805 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
808 void ResetBackendCounters() {
809 for (auto& backend : backends_) backend->backend_service()->ResetCounters();
812 bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0) {
813 if (stop_index == 0) stop_index = backends_.size();
814 for (size_t i = start_index; i < stop_index; ++i) {
815 if (backends_[i]->backend_service()->request_count() == 0) return false;
820 void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
822 const Status status = SendRpc();
826 if (status.error_message() == "Call dropped by load balancing policy") {
835 std::tuple<int, int, int> WaitForAllBackends(size_t start_index = 0,
836 size_t stop_index = 0) {
841 while (!SeenAllBackends(start_index, stop_index)) {
842 SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
844 ResetBackendCounters();
846 "Performed %d warm up requests against the backends. "
847 "%d succeeded, %d failed, %d dropped.",
848 num_total, num_ok, num_failure, num_drops);
849 return std::make_tuple(num_ok, num_failure, num_drops);
852 void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
853 gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
854 static_cast<unsigned long>(backend_idx));
857 } while (backends_[backend_idx]->backend_service()->request_count() == 0);
858 if (reset_counters) ResetBackendCounters();
859 gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
860 static_cast<unsigned long>(backend_idx));
863 grpc_core::ServerAddressList CreateAddressListFromPortList(
864 const std::vector<int>& ports) {
865 grpc_core::ServerAddressList addresses;
866 for (int port : ports) {
868 gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
869 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
870 GPR_ASSERT(lb_uri != nullptr);
871 grpc_resolved_address address;
872 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
873 addresses.emplace_back(address.addr, address.len, nullptr);
874 grpc_uri_destroy(lb_uri);
875 gpr_free(lb_uri_str);
880 void SetNextResolution(const std::vector<int>& ports,
881 grpc_core::FakeResolverResponseGenerator*
882 lb_channel_response_generator = nullptr) {
883 if (GetParam().use_xds_resolver()) return; // Not used with xds resolver.
884 grpc_core::ExecCtx exec_ctx;
885 grpc_core::Resolver::Result result;
886 result.addresses = CreateAddressListFromPortList(ports);
887 grpc_error* error = GRPC_ERROR_NONE;
888 const char* service_config_json =
889 GetParam().enable_load_reporting()
890 ? kDefaultServiceConfig_
891 : kDefaultServiceConfigWithoutLoadReporting_;
892 result.service_config =
893 grpc_core::ServiceConfig::Create(service_config_json, &error);
894 GRPC_ERROR_UNREF(error);
895 grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
896 lb_channel_response_generator == nullptr
897 ? lb_channel_response_generator_.get()
898 : lb_channel_response_generator);
899 result.args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
900 response_generator_->SetResponse(std::move(result));
903 void SetNextResolutionForLbChannelAllBalancers(
904 const char* service_config_json = nullptr,
905 grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator =
907 std::vector<int> ports;
908 for (size_t i = 0; i < balancers_.size(); ++i) {
909 ports.emplace_back(balancers_[i]->port());
911 SetNextResolutionForLbChannel(ports, service_config_json,
912 lb_channel_response_generator);
915 void SetNextResolutionForLbChannel(
916 const std::vector<int>& ports, const char* service_config_json = nullptr,
917 grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator =
919 grpc_core::ExecCtx exec_ctx;
920 grpc_core::Resolver::Result result;
921 result.addresses = CreateAddressListFromPortList(ports);
922 if (service_config_json != nullptr) {
923 grpc_error* error = GRPC_ERROR_NONE;
924 result.service_config =
925 grpc_core::ServiceConfig::Create(service_config_json, &error);
926 GRPC_ERROR_UNREF(error);
928 if (lb_channel_response_generator == nullptr) {
929 lb_channel_response_generator = lb_channel_response_generator_.get();
931 lb_channel_response_generator->SetResponse(std::move(result));
934 void SetNextReresolutionResponse(const std::vector<int>& ports) {
935 grpc_core::ExecCtx exec_ctx;
936 grpc_core::Resolver::Result result;
937 result.addresses = CreateAddressListFromPortList(ports);
938 response_generator_->SetReresolutionResponse(std::move(result));
941 const std::vector<int> GetBackendPorts(size_t start_index = 0,
942 size_t stop_index = 0) const {
943 if (stop_index == 0) stop_index = backends_.size();
944 std::vector<int> backend_ports;
945 for (size_t i = start_index; i < stop_index; ++i) {
946 backend_ports.push_back(backends_[i]->port());
948 return backend_ports;
951 void ScheduleResponseForBalancer(size_t i, const DiscoveryResponse& response,
953 balancers_[i]->ads_service()->AddEdsResponse(response, delay_ms);
956 Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
957 bool wait_for_ready = false) {
958 const bool local_response = (response == nullptr);
959 if (local_response) response = new EchoResponse;
961 request.set_message(kRequestMessage_);
962 ClientContext context;
963 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
964 if (wait_for_ready) context.set_wait_for_ready(true);
965 Status status = stub_->Echo(&context, request, response);
966 if (local_response) delete response;
970 void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000,
971 bool wait_for_ready = false) {
972 for (size_t i = 0; i < times; ++i) {
973 EchoResponse response;
974 const Status status = SendRpc(&response, timeout_ms, wait_for_ready);
975 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
976 << " message=" << status.error_message();
977 EXPECT_EQ(response.message(), kRequestMessage_);
981 void CheckRpcSendFailure() {
982 const Status status = SendRpc();
983 EXPECT_FALSE(status.ok());
988 ServerThread() : port_(g_port_saver->GetPort()) {}
989 virtual ~ServerThread(){};
991 void Start(const grpc::string& server_host) {
992 gpr_log(GPR_INFO, "starting %s server on port %d", Type(), port_);
993 GPR_ASSERT(!running_);
997 // We need to acquire the lock here in order to prevent the notify_one
998 // by ServerThread::Serve from firing before the wait below is hit.
999 grpc_core::MutexLock lock(&mu);
1000 grpc_core::CondVar cond;
1001 thread_.reset(new std::thread(
1002 std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)));
1004 gpr_log(GPR_INFO, "%s server startup complete", Type());
1007 void Serve(const grpc::string& server_host, grpc_core::Mutex* mu,
1008 grpc_core::CondVar* cond) {
1009 // We need to acquire the lock here in order to prevent the notify_one
1010 // below from firing before its corresponding wait is executed.
1011 grpc_core::MutexLock lock(mu);
1012 std::ostringstream server_address;
1013 server_address << server_host << ":" << port_;
1014 ServerBuilder builder;
1015 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
1016 grpc_fake_transport_security_server_credentials_create()));
1017 builder.AddListeningPort(server_address.str(), creds);
1018 RegisterAllServices(&builder);
1019 server_ = builder.BuildAndStart();
1024 if (!running_) return;
1025 gpr_log(GPR_INFO, "%s about to shutdown", Type());
1026 ShutdownAllServices();
1027 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
1029 gpr_log(GPR_INFO, "%s shutdown completed", Type());
1033 int port() const { return port_; }
1036 virtual void RegisterAllServices(ServerBuilder* builder) = 0;
1037 virtual void StartAllServices() = 0;
1038 virtual void ShutdownAllServices() = 0;
1040 virtual const char* Type() = 0;
// The running server; assigned in Serve().
1043 std::unique_ptr<Server> server_;
// The thread executing Serve(); created in Start().
1044 std::unique_ptr<std::thread> thread_;
// Start() asserts this is false; shutdown is skipped while it is false.
1045 bool running_ = false;
// ServerThread that hosts a single BackendServiceImpl.
1048 class BackendServerThread : public ServerThread {
// Gives tests access to the hosted service, e.g. to read request_count().
1050 BackendServiceImpl* backend_service() { return &backend_service_; }
1053 void RegisterAllServices(ServerBuilder* builder) override {
1054 builder->RegisterService(&backend_service_);
1057 void StartAllServices() override { backend_service_.Start(); }
1059 void ShutdownAllServices() override { backend_service_.Shutdown(); }
// Label that appears in ServerThread's log messages.
1061 const char* Type() override { return "Backend"; }
1063 BackendServiceImpl backend_service_;
// ServerThread that hosts both an ADS service and an LRS service.
1066 class BalancerServerThread : public ServerThread {
// The ADS service is constructed with a flag that is true only when
// `client_load_reporting_interval` is positive; the LRS service receives the
// interval value itself.
1068 explicit BalancerServerThread(int client_load_reporting_interval = 0)
1069 : ads_service_(client_load_reporting_interval > 0),
1070 lrs_service_(client_load_reporting_interval) {}
// Accessors used by tests to inspect or configure the hosted services.
1072 AdsServiceImpl* ads_service() { return &ads_service_; }
1073 LrsServiceImpl* lrs_service() { return &lrs_service_; }
1076 void RegisterAllServices(ServerBuilder* builder) override {
1077 builder->RegisterService(&ads_service_);
1078 builder->RegisterService(&lrs_service_);
1081 void StartAllServices() override {
1082 ads_service_.Start();
1083 lrs_service_.Start();
1086 void ShutdownAllServices() override {
1087 ads_service_.Shutdown();
1088 lrs_service_.Shutdown();
// Label that appears in ServerThread's log messages.
1091 const char* Type() override { return "Balancer"; }
1093 AdsServiceImpl ads_service_;
1094 LrsServiceImpl lrs_service_;
// Fixture state shared by the tests below.
1097 const grpc::string server_host_;
1098 const size_t num_backends_;
1099 const size_t num_balancers_;
1100 const int client_load_reporting_interval_seconds_;
1101 std::shared_ptr<Channel> channel_;
1102 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
// Server threads; tests index these as backends_[i] and balancers_[i].
1103 std::vector<std::unique_ptr<BackendServerThread>> backends_;
1104 std::vector<std::unique_ptr<BalancerServerThread>> balancers_;
1105 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
1106 response_generator_;
1107 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
1108 lb_channel_response_generator_;
// Echo responses are compared against this message.
1109 const grpc::string kRequestMessage_ = "Live long and prosper.";
1110 const grpc::string kApplicationTargetName_ = "application_target_name";
// Service config JSON whose loadBalancingConfig lists an unknown policy
// ("does_not_exist") ahead of "xds_experimental", and which sets
// lrsLoadReportingServerName to the empty string.
1111 const char* kDefaultServiceConfig_ =
1113 " \"loadBalancingConfig\":[\n"
1114 " { \"does_not_exist\":{} },\n"
1115 " { \"xds_experimental\":{\n"
1116 " \"lrsLoadReportingServerName\": \"\"\n"
// Variant of the default service config intended for use without load
// reporting.
1120 const char* kDefaultServiceConfigWithoutLoadReporting_ =
1122 " \"loadBalancingConfig\":[\n"
1123 " { \"does_not_exist\":{} },\n"
1124 " { \"xds_experimental\":{\n"
// Fixture for the basic tests.
1130 class BasicTest : public XdsEnd2endTest {
// NOTE(review): the arguments are presumably (num_backends, num_balancers);
// the tests below use backends_[0..3] and balancers_[0]. Confirm against the
// XdsEnd2endTest constructor.
1132 BasicTest() : XdsEnd2endTest(4, 1) {}
1135 // Tests that the balancer sends the correct response to the client, and the
1136 // client sends RPCs to the backends using the default child policy.
1137 TEST_P(BasicTest, Vanilla) {
1138 SetNextResolution({});
1139 SetNextResolutionForLbChannelAllBalancers();
1140 const size_t kNumRpcsPerAddress = 100;
1141 AdsServiceImpl::ResponseArgs args({
1142 {"locality0", GetBackendPorts()},
1144 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1145 // Make sure that trying to connect works without a call.
1146 channel_->GetState(true /* try_to_connect */);
1147 // We need to wait for all backends to come online.
1148 WaitForAllBackends();
1149 // Send kNumRpcsPerAddress RPCs per server.
1150 CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
1151 // Each backend should have gotten 100 requests.
1152 for (size_t i = 0; i < backends_.size(); ++i) {
1153 EXPECT_EQ(kNumRpcsPerAddress,
1154 backends_[i]->backend_service()->request_count());
1156 // The ADS service got a single request, and sent a single response.
1157 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1158 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1159 // Check LB policy name for the channel.
// The expected name depends on whether the test parameter selects the xds
// resolver.
1161 (GetParam().use_xds_resolver() ? "cds_experimental" : "xds_experimental"),
1162 channel_->GetLoadBalancingPolicyName());
// Tests that when the EDS response carries a DRAINING health status, RPCs are
// spread only over the backends from index 1 onward.
// NOTE(review): the DRAINING status presumably applies to backend 0 - confirm
// against AdsServiceImpl::ResponseArgs::Locality.
1165 TEST_P(BasicTest, IgnoresUnhealthyEndpoints) {
1166 SetNextResolution({});
1167 SetNextResolutionForLbChannelAllBalancers();
1168 const size_t kNumRpcsPerAddress = 100;
1169 AdsServiceImpl::ResponseArgs args({
1172 kDefaultLocalityWeight,
1173 kDefaultLocalityPriority,
1174 {envoy::api::v2::HealthStatus::DRAINING}},
1176 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1177 // Make sure that trying to connect works without a call.
1178 channel_->GetState(true /* try_to_connect */);
1179 // We need to wait for the backends from index 1 onward to come online.
1180 WaitForAllBackends(/*start_index=*/1);
1181 // Send kNumRpcsPerAddress RPCs per server, not counting backend 0.
1182 CheckRpcSendOk(kNumRpcsPerAddress * (num_backends_ - 1));
1183 // Each backend from index 1 onward should have gotten 100 requests.
1184 for (size_t i = 1; i < backends_.size(); ++i) {
1185 EXPECT_EQ(kNumRpcsPerAddress,
1186 backends_[i]->backend_service()->request_count());
1188 // The ADS service got a single request, and sent a single response.
1189 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1190 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1193 // Tests that subchannel sharing works when the same backend is listed multiple
// times.
1195 TEST_P(BasicTest, SameBackendListedMultipleTimes) {
1196 SetNextResolution({});
1197 SetNextResolutionForLbChannelAllBalancers();
1198 // Same backend listed twice.
1199 std::vector<int> ports(2, backends_[0]->port());
1200 AdsServiceImpl::ResponseArgs args({
1201 {"locality0", ports},
1203 const size_t kNumRpcsPerAddress = 10;
1204 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1205 // We need to wait for the backend to come online.
1207 // Send kNumRpcsPerAddress RPCs per server.
1208 CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
1209 // Backend should have gotten 20 requests.
1210 EXPECT_EQ(kNumRpcsPerAddress * ports.size(),
1211 backends_[0]->backend_service()->request_count());
1212 // And they should have come from a single client port, because of
1213 // subchannel sharing.
1214 EXPECT_EQ(1UL, backends_[0]->backend_service()->clients().size());
1217 // Tests that RPCs will be blocked until a non-empty serverlist is received.
1218 TEST_P(BasicTest, InitiallyEmptyServerlist) {
1219 SetNextResolution({});
1220 SetNextResolutionForLbChannelAllBalancers();
// The call deadline is twice the delay before the populated serverlist is
// sent; both scale with grpc_test_slowdown_factor().
1221 const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
1222 const int kCallDeadlineMs = kServerlistDelayMs * 2;
1223 // First response is an empty serverlist, sent right away.
1224 AdsServiceImpl::ResponseArgs::Locality empty_locality("locality0", {});
1225 AdsServiceImpl::ResponseArgs args({
1228 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1229 // Send non-empty serverlist only after kServerlistDelayMs.
1230 args = AdsServiceImpl::ResponseArgs({
1231 {"locality0", GetBackendPorts()},
1233 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
1234 kServerlistDelayMs);
1235 const auto t0 = system_clock::now();
1236 // Client will block: LB will initially send empty serverlist.
1237 CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
1238 const auto ellapsed_ms =
1239 std::chrono::duration_cast<std::chrono::milliseconds>(
1240 system_clock::now() - t0);
1241 // but eventually, the LB sends a serverlist update that allows the call to
1242 // proceed. The call delay must be larger than the delay in sending the
1243 // populated serverlist; it stays under kCallDeadlineMs because that is the
1244 // timeout passed to CheckRpcSendOk above.
1245 EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
1246 // The ADS service got a single request.
1247 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1248 // and sent two responses.
1249 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
1252 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
1253 // all the servers are unreachable.
1254 TEST_P(BasicTest, AllServersUnreachableFailFast) {
1255 SetNextResolution({});
1256 SetNextResolutionForLbChannelAllBalancers();
1257 const size_t kNumUnreachableServers = 5;
// Ports taken from g_port_saver; this test starts no server on any of them.
1258 std::vector<int> ports;
1259 for (size_t i = 0; i < kNumUnreachableServers; ++i) {
1260 ports.push_back(g_port_saver->GetPort());
1262 AdsServiceImpl::ResponseArgs args({
1263 {"locality0", ports},
1265 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1266 const Status status = SendRpc();
1267 // The error shouldn't be DEADLINE_EXCEEDED.
1268 EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
1269 // The ADS service got a single request, and sent a single response.
1270 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1271 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1274 // Tests that RPCs fail when the backends are down, and will succeed again after
1275 // the backends are restarted.
1276 TEST_P(BasicTest, BackendsRestart) {
1277 SetNextResolution({});
1278 SetNextResolutionForLbChannelAllBalancers();
1279 AdsServiceImpl::ResponseArgs args({
1280 {"locality0", GetBackendPorts()},
1282 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1283 WaitForAllBackends();
1284 // Stop backends. RPCs should fail.
1285 ShutdownAllBackends();
1286 CheckRpcSendFailure();
1287 // Restart all backends. RPCs should start succeeding again.
// The follow-up RPC is sent with wait_for_ready set and a 2000 ms timeout.
1289 CheckRpcSendOk(1 /* times */, 2000 /* timeout_ms */,
1290 true /* wait_for_ready */);
// The secure naming tests reuse the BasicTest fixture unchanged.
1293 using SecureNamingTest = BasicTest;
1295 // Tests that secure naming check passes if target name is expected.
1296 TEST_P(SecureNamingTest, TargetNameIsExpected) {
1297 // TODO(juanlishen): Use separate fake creds for the balancer channel.
// NOTE(review): the third ResetStub argument is presumably the list of
// expected target names - confirm against ResetStub.
1298 ResetStub(0, 0, kApplicationTargetName_ + ";lb");
1299 SetNextResolution({});
1300 SetNextResolutionForLbChannel({balancers_[0]->port()});
1301 const size_t kNumRpcsPerAddress = 100;
1302 AdsServiceImpl::ResponseArgs args({
1303 {"locality0", GetBackendPorts()},
1305 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1306 // Make sure that trying to connect works without a call.
1307 channel_->GetState(true /* try_to_connect */);
1308 // We need to wait for all backends to come online.
1309 WaitForAllBackends();
1310 // Send kNumRpcsPerAddress RPCs per server.
1311 CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
1312 // Each backend should have gotten 100 requests.
1313 for (size_t i = 0; i < backends_.size(); ++i) {
1314 EXPECT_EQ(kNumRpcsPerAddress,
1315 backends_[i]->backend_service()->request_count());
1317 // The ADS service got a single request, and sent a single response.
1318 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1319 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1322 // Tests that secure naming check fails if target name is unexpected.
1323 TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
// Point the xDS bootstrap environment variable at g_bootstrap_file_bad for
// this test.
1324 gpr_setenv("GRPC_XDS_BOOTSTRAP", g_bootstrap_file_bad);
// Run the death test in gtest's "threadsafe" style.
1325 ::testing::FLAGS_gtest_death_test_style = "threadsafe";
1326 // Make sure that we blow up (via abort() from the security connector) when
1327 // the name from the balancer doesn't match expectations.
1328 ASSERT_DEATH_IF_SUPPORTED(
1330 ResetStub(0, 0, kApplicationTargetName_ + ";lb");
1331 SetNextResolution({});
1332 SetNextResolutionForLbChannel({balancers_[0]->port()});
1333 channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
// The CDS tests reuse the BasicTest fixture unchanged.
1338 using CdsTest = BasicTest;
1340 // Tests that CDS client should send an ACK upon correct CDS response.
1341 TEST_P(CdsTest, Vanilla) {
1342 SetNextResolution({});
1343 SetNextResolutionForLbChannelAllBalancers();
// The ADS service records whether the client ACKed or NACKed the CDS
// response; the default CDS response is left untouched here.
1345 EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
1346 AdsServiceImpl::ACKED);
1349 // Tests that CDS client should send a NACK if the cluster type in CDS response
1350 // is other than EDS.
1351 TEST_P(CdsTest, WrongClusterType) {
// Start from the default cluster and change only its type to STATIC.
1352 auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
1353 cluster.set_type(envoy::api::v2::Cluster::STATIC);
1354 balancers_[0]->ads_service()->SetCdsResponse(
1355 {{"application_target_name", std::move(cluster)}});
1356 SetNextResolution({});
1357 SetNextResolutionForLbChannelAllBalancers();
1359 EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
1360 AdsServiceImpl::NACKED);
1363 // Tests that CDS client should send a NACK if the eds_config in CDS response is
// one the client does not accept (this test sets it to `self`).
1365 TEST_P(CdsTest, WrongEdsConfig) {
// Start from the default cluster and switch its EDS config source to `self`.
1366 auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
1367 cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
1368 balancers_[0]->ads_service()->SetCdsResponse(
1369 {{"application_target_name", std::move(cluster)}});
1370 SetNextResolution({});
1371 SetNextResolutionForLbChannelAllBalancers();
1373 EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
1374 AdsServiceImpl::NACKED);
1377 // Tests that CDS client should send a NACK if the lb_policy in CDS response is
1378 // other than ROUND_ROBIN.
1379 TEST_P(CdsTest, WrongLbPolicy) {
// Start from the default cluster and change only its LB policy to
// LEAST_REQUEST.
1380 auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
1381 cluster.set_lb_policy(envoy::api::v2::Cluster::LEAST_REQUEST);
1382 balancers_[0]->ads_service()->SetCdsResponse(
1383 {{"application_target_name", std::move(cluster)}});
1384 SetNextResolution({});
1385 SetNextResolutionForLbChannelAllBalancers();
1387 EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
1388 AdsServiceImpl::NACKED);
1391 // Tests that CDS client should send a NACK if the lrs_server in CDS response is
// one the client does not accept (this test sets it to `ads`).
1393 TEST_P(CdsTest, WrongLrsServer) {
// Start from the default cluster and switch its LRS server config to `ads`.
1394 auto cluster = balancers_[0]->ads_service()->GetDefaultCluster();
1395 cluster.mutable_lrs_server()->mutable_ads();
1396 balancers_[0]->ads_service()->SetCdsResponse(
1397 {{"application_target_name", std::move(cluster)}});
1398 SetNextResolution({});
1399 SetNextResolutionForLbChannelAllBalancers();
1401 EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
1402 AdsServiceImpl::NACKED);
// The locality map tests reuse the BasicTest fixture unchanged.
1405 using LocalityMapTest = BasicTest;
1407 // Tests that the localities in a locality map are picked according to their
// weights.
1409 TEST_P(LocalityMapTest, WeightedRoundRobin) {
1410 SetNextResolution({});
1411 SetNextResolutionForLbChannelAllBalancers();
1412 const size_t kNumRpcs = 5000;
1413 const int kLocalityWeight0 = 2;
1414 const int kLocalityWeight1 = 8;
1415 const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
// Expected share of traffic for each locality: its weight over the total
// weight.
1416 const double kLocalityWeightRate0 =
1417 static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
1418 const double kLocalityWeightRate1 =
1419 static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
1420 // ADS response contains 2 localities, each of which contains 1 backend.
1421 AdsServiceImpl::ResponseArgs args({
1422 {"locality0", GetBackendPorts(0, 1), kLocalityWeight0},
1423 {"locality1", GetBackendPorts(1, 2), kLocalityWeight1},
1425 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1426 // Wait for both backends to be ready.
1427 WaitForAllBackends(0, 2);
1428 // Send kNumRpcs RPCs.
1429 CheckRpcSendOk(kNumRpcs);
1430 // The locality picking rates should be roughly equal to the expectation.
1431 const double locality_picked_rate_0 =
1432 static_cast<double>(backends_[0]->backend_service()->request_count()) /
1434 const double locality_picked_rate_1 =
1435 static_cast<double>(backends_[1]->backend_service()->request_count()) /
// Each observed rate may deviate from its expected rate by up to 20%
// (relative).
1437 const double kErrorTolerance = 0.2;
1438 EXPECT_THAT(locality_picked_rate_0,
1440 ::testing::Ge(kLocalityWeightRate0 * (1 - kErrorTolerance)),
1441 ::testing::Le(kLocalityWeightRate0 * (1 + kErrorTolerance))));
1442 EXPECT_THAT(locality_picked_rate_1,
1444 ::testing::Ge(kLocalityWeightRate1 * (1 - kErrorTolerance)),
1445 ::testing::Le(kLocalityWeightRate1 * (1 + kErrorTolerance))));
1446 // The ADS service got a single request, and sent a single response.
1447 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1448 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1451 // Tests that the locality map can work properly even when it contains a large
1452 // number of localities.
1453 TEST_P(LocalityMapTest, StressTest) {
1454 SetNextResolution({});
1455 SetNextResolutionForLbChannelAllBalancers();
1456 const size_t kNumLocalities = 100;
1457 // The first ADS response contains kNumLocalities localities, each of which
1458 // contains backend 0.
1459 AdsServiceImpl::ResponseArgs args;
1460 for (size_t i = 0; i < kNumLocalities; ++i) {
// Localities are named "locality0" .. "locality<kNumLocalities - 1>".
1461 grpc::string name = "locality" + std::to_string(i);
1462 AdsServiceImpl::ResponseArgs::Locality locality(name,
1463 {backends_[0]->port()});
1464 args.locality_list.emplace_back(std::move(locality));
1466 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1467 // The second ADS response contains 1 locality, which contains backend 1.
1468 args = AdsServiceImpl::ResponseArgs({
1469 {"locality0", GetBackendPorts(1, 2)},
1471 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
1473 // Wait until backend 0 is ready, before which kNumLocalities localities are
1474 // received and handled by the xds policy.
1475 WaitForBackend(0, /*reset_counters=*/false);
// Backend 1 must not have seen traffic while only the first response is in
// effect.
1476 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1477 // Wait until backend 1 is ready, before which kNumLocalities localities are
1478 // removed by the xds policy.
1480 // The ADS service got a single request.
1481 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1482 // and sent two responses.
1483 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
1486 // Tests that the localities in a locality map are picked correctly after update
1487 // (addition, modification, deletion).
1488 TEST_P(LocalityMapTest, UpdateMap) {
1489 SetNextResolution({});
1490 SetNextResolutionForLbChannelAllBalancers();
1491 const size_t kNumRpcs = 1000;
1492 // The locality weight for the first 3 localities.
1493 const std::vector<int> kLocalityWeights0 = {2, 3, 4};
1494 const double kTotalLocalityWeight0 =
1495 std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0);
// Expected traffic share per locality under the first response, indexed by
// backend.
1496 std::vector<double> locality_weight_rate_0;
1497 for (int weight : kLocalityWeights0) {
1498 locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
1500 // Delete the first locality, keep the second locality, change the third
1501 // locality's weight from 4 to 2, and add a new locality with weight 6.
1502 const std::vector<int> kLocalityWeights1 = {3, 2, 6};
1503 const double kTotalLocalityWeight1 =
1504 std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0);
// Expected traffic share per locality under the second response. The leading
// placeholder keeps the vector indexed by backend number.
1505 std::vector<double> locality_weight_rate_1 = {
1506 0 /* placeholder for locality 0 */};
1507 for (int weight : kLocalityWeights1) {
1508 locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
1510 AdsServiceImpl::ResponseArgs args({
1511 {"locality0", GetBackendPorts(0, 1), 2},
1512 {"locality1", GetBackendPorts(1, 2), 3},
1513 {"locality2", GetBackendPorts(2, 3), 4},
1515 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1516 args = AdsServiceImpl::ResponseArgs({
1517 {"locality1", GetBackendPorts(1, 2), 3},
1518 {"locality2", GetBackendPorts(2, 3), 2},
1519 {"locality3", GetBackendPorts(3, 4), 6},
// The second response is scheduled with a delay argument of 5000.
1521 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 5000);
1522 // Wait for the first 3 backends to be ready.
1523 WaitForAllBackends(0, 3);
1524 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1525 // Send kNumRpcs RPCs.
1526 CheckRpcSendOk(kNumRpcs);
1527 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1528 // The picking rates of the first 3 backends should be roughly equal to the
// expected rates in locality_weight_rate_0.
1530 std::vector<double> locality_picked_rates;
1531 for (size_t i = 0; i < 3; ++i) {
1532 locality_picked_rates.push_back(
1533 static_cast<double>(backends_[i]->backend_service()->request_count()) /
1536 const double kErrorTolerance = 0.2;
1537 for (size_t i = 0; i < 3; ++i) {
1539 locality_picked_rates[i],
1541 ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
1542 ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
1544 // Backend 3 hasn't received any request.
1545 EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
1546 // The ADS service got a single request, and sent a single response.
1547 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1548 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1549 // Wait until the locality update has been processed, as signaled by backend 3
1550 // receiving a request.
1552 gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1553 // Send kNumRpcs RPCs.
1554 CheckRpcSendOk(kNumRpcs);
1555 gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1556 // Backend 0 no longer receives any request.
1557 EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
1558 // The picking rates of the last 3 backends should be roughly equal to the
// expected rates in locality_weight_rate_1.
1560 locality_picked_rates = {0 /* placeholder for backend 0 */};
1561 for (size_t i = 1; i < 4; ++i) {
1562 locality_picked_rates.push_back(
1563 static_cast<double>(backends_[i]->backend_service()->request_count()) /
1566 for (size_t i = 1; i < 4; ++i) {
1568 locality_picked_rates[i],
1570 ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
1571 ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
1573 // The ADS service got a single request.
1574 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1575 // and sent two responses.
1576 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
// Fixture for the failover tests; it re-creates the stub with custom
// arguments on construction.
1579 class FailoverTest : public BasicTest {
// NOTE(review): the second argument (100) is presumably a failover timeout in
// milliseconds - confirm against ResetStub.
1581 FailoverTest() { ResetStub(0, 100, ""); }
1584 // Localities with the highest priority are used when multiple priorities exist.
1585 TEST_P(FailoverTest, ChooseHighestPriority) {
1586 SetNextResolution({});
1587 SetNextResolutionForLbChannelAllBalancers();
// The last field of each entry is the locality's priority. locality3 has the
// numerically lowest value (0) and is the one expected to receive traffic.
1588 AdsServiceImpl::ResponseArgs args({
1589 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
1590 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
1591 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
1592 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
1594 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1595 WaitForBackend(3, false);
// Backends 0-2 must not have received any request.
1596 for (size_t i = 0; i < 3; ++i) {
1597 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1599 // The ADS service got a single request, and sent a single response.
1600 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1601 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1604 // If the higher priority localities are not reachable, failover to the highest
1605 // priority among the rest.
1606 TEST_P(FailoverTest, Failover) {
1607 SetNextResolution({});
1608 SetNextResolutionForLbChannelAllBalancers();
1609 AdsServiceImpl::ResponseArgs args({
1610 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
1611 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
1612 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
1613 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
1617 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1618 WaitForBackend(1, false);
// Only backend 1 is expected to have received requests.
1619 for (size_t i = 0; i < 4; ++i) {
1620 if (i == 1) continue;
1621 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1623 // The ADS service got a single request, and sent a single response.
1624 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1625 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1628 // If a locality with higher priority than the current one becomes ready,
// traffic is expected to move to it.
1630 TEST_P(FailoverTest, SwitchBackToHigherPriority) {
1631 SetNextResolution({});
1632 SetNextResolutionForLbChannelAllBalancers();
1633 const size_t kNumRpcs = 100;
1634 AdsServiceImpl::ResponseArgs args({
1635 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
1636 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
1637 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
1638 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
1642 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1643 WaitForBackend(1, false);
// At this point only backend 1 is expected to have received requests.
1644 for (size_t i = 0; i < 4; ++i) {
1645 if (i == 1) continue;
1646 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
// All kNumRpcs RPCs of this batch are expected to land on backend 0.
1650 CheckRpcSendOk(kNumRpcs);
1651 EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
1652 // The ADS service got a single request, and sent a single response.
1653 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1654 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1657 // The first update only contains unavailable priorities. The second update
1658 // contains available priorities.
1659 TEST_P(FailoverTest, UpdateInitialUnavailable) {
1660 SetNextResolution({});
1661 SetNextResolutionForLbChannelAllBalancers();
1662 AdsServiceImpl::ResponseArgs args({
1663 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0},
1664 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1},
1666 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
// The second response adds locality2 and locality3 at lower priorities.
1667 args = AdsServiceImpl::ResponseArgs({
1668 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0},
1669 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1},
1670 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 2},
1671 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3},
1675 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 1000);
// The failure loop below runs until 500 ms from now (wall-clock time).
1676 gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1677 gpr_time_from_millis(500, GPR_TIMESPAN));
1678 // Send 0.5 second worth of RPCs.
1680 CheckRpcSendFailure();
1681 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
1682 WaitForBackend(2, false);
// Only backend 2 is expected to have received requests.
1683 for (size_t i = 0; i < 4; ++i) {
1684 if (i == 2) continue;
1685 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1687 // The ADS service got a single request, and sent two responses.
1688 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1689 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
1692 // Tests that after the localities' priorities are updated, we still choose the
1693 // highest READY priority with the updated localities.
1694 TEST_P(FailoverTest, UpdatePriority) {
1695 SetNextResolution({});
1696 SetNextResolutionForLbChannelAllBalancers();
1697 const size_t kNumRpcs = 100;
// First response: locality3 has priority 0.
1698 AdsServiceImpl::ResponseArgs args({
1699 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
1700 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
1701 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
1702 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
1704 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
// Second response: the same localities with reshuffled priorities; locality1
// now has priority 0.
1705 args = AdsServiceImpl::ResponseArgs({
1706 {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 2},
1707 {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 0},
1708 {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 1},
1709 {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3},
1711 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 1000);
1712 WaitForBackend(3, false);
// Under the first response, backends 0-2 must not have received any request.
1713 for (size_t i = 0; i < 3; ++i) {
1714 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
// All kNumRpcs RPCs of this batch are expected to land on backend 1.
1717 CheckRpcSendOk(kNumRpcs);
1718 EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
1719 // The ADS service got a single request, and sent two responses.
1720 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1721 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
// The drop tests reuse the BasicTest fixture unchanged.
1724 using DropTest = BasicTest;
1726 // Tests that RPCs are dropped according to the drop config.
1727 TEST_P(DropTest, Vanilla) {
1728 SetNextResolution({});
1729 SetNextResolutionForLbChannelAllBalancers();
1730 const size_t kNumRpcs = 5000;
1731 const uint32_t kDropPerMillionForLb = 100000;
1732 const uint32_t kDropPerMillionForThrottle = 200000;
1733 const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1734 const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
// Combined drop probability: a call is dropped by the LB category, or passes
// it and is then dropped by the throttle category.
1735 const double KDropRateForLbAndThrottle =
1736 kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1737 // The ADS response contains two drop categories.
1738 AdsServiceImpl::ResponseArgs args({
1739 {"locality0", GetBackendPorts()},
1741 args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1742 {kThrottleDropType, kDropPerMillionForThrottle}};
1743 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1744 WaitForAllBackends();
1745 // Send kNumRpcs RPCs and count the drops.
1746 size_t num_drops = 0;
1747 for (size_t i = 0; i < kNumRpcs; ++i) {
1748 EchoResponse response;
1749 const Status status = SendRpc(&response);
// The status message below is what identifies a dropped call.
1751 status.error_message() == "Call dropped by load balancing policy") {
1754 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1755 << " message=" << status.error_message();
1756 EXPECT_EQ(response.message(), kRequestMessage_);
1759 // The drop rate should be roughly equal to the expectation.
1760 const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
// The observed rate may deviate from the expected rate by up to 20%
// (relative).
1761 const double kErrorTolerance = 0.2;
1765 ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
1766 ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
1767 // The ADS service got a single request, and sent a single response.
1768 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1769 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1772 // Tests that drop config is converted correctly from per hundred.
1773 TEST_P(DropTest, DropPerHundred) {
1774 SetNextResolution({});
1775 SetNextResolutionForLbChannelAllBalancers();
1776 const size_t kNumRpcs = 5000;
// 10 per hundred, i.e. an expected drop rate of 0.1.
1777 const uint32_t kDropPerHundredForLb = 10;
1778 const double kDropRateForLb = kDropPerHundredForLb / 100.0;
1779 // The ADS response contains one drop category.
1780 AdsServiceImpl::ResponseArgs args({
1781 {"locality0", GetBackendPorts()},
1783 args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
// Tell the response builder that the drop numerator is expressed per hundred.
1784 args.drop_denominator = FractionalPercent::HUNDRED;
1785 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1786 WaitForAllBackends();
1787 // Send kNumRpcs RPCs and count the drops.
1788 size_t num_drops = 0;
1789 for (size_t i = 0; i < kNumRpcs; ++i) {
1790 EchoResponse response;
1791 const Status status = SendRpc(&response);
// The status message below is what identifies a dropped call.
1793 status.error_message() == "Call dropped by load balancing policy") {
1796 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1797 << " message=" << status.error_message();
1798 EXPECT_EQ(response.message(), kRequestMessage_);
1801 // The drop rate should be roughly equal to the expectation.
1802 const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1803 const double kErrorTolerance = 0.2;
1806 ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
1807 ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
1808 // The ADS service got a single request, and sent a single response.
1809 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1810 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1813 // Tests that drop config is converted correctly from per ten thousand.
1814 TEST_P(DropTest, DropPerTenThousand) {
1815 SetNextResolution({});
1816 SetNextResolutionForLbChannelAllBalancers();
1817 const size_t kNumRpcs = 5000;
// 1000 per ten thousand, i.e. an expected drop rate of 0.1.
1818 const uint32_t kDropPerTenThousandForLb = 1000;
1819 const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
1820 // The ADS response contains one drop category.
1821 AdsServiceImpl::ResponseArgs args({
1822 {"locality0", GetBackendPorts()},
1824 args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
// Tell the response builder that the drop numerator is expressed per ten
// thousand.
1825 args.drop_denominator = FractionalPercent::TEN_THOUSAND;
1826 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1827 WaitForAllBackends();
1828 // Send kNumRpcs RPCs and count the drops.
1829 size_t num_drops = 0;
1830 for (size_t i = 0; i < kNumRpcs; ++i) {
1831 EchoResponse response;
1832 const Status status = SendRpc(&response);
// The status message below is what identifies a dropped call.
1834 status.error_message() == "Call dropped by load balancing policy") {
1837 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1838 << " message=" << status.error_message();
1839 EXPECT_EQ(response.message(), kRequestMessage_);
1842 // The drop rate should be roughly equal to the expectation.
1843 const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1844 const double kErrorTolerance = 0.2;
1847 ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
1848 ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
1849 // The ADS service got a single request, and sent a single response.
1850 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1851 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
1854 // Tests that drop is working correctly after update.
// Two ADS responses are scheduled on balancer 0: the first (delay 0) has one
// drop category, the second (delay 10000) has two. The test measures the drop
// rate for a first batch, keeps sending until the cumulative rate crosses the
// midpoint between the two expected rates, then measures a second batch.
// NOTE(review): this copy carries embedded line numbers and appears to be
// missing several short lines (drop-check openers, counter updates, closing
// braces); confirm against the upstream file.
1855 TEST_P(DropTest, Update) {
1856 SetNextResolution({});
1857 SetNextResolutionForLbChannelAllBalancers();
1858 const size_t kNumRpcs = 1000;
1859 const uint32_t kDropPerMillionForLb = 100000;
1860 const uint32_t kDropPerMillionForThrottle = 200000;
1861 const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1862 const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
// Combined rate when the throttle category is applied to the calls that were
// not already dropped by the LB category: 0.1 + 0.9 * 0.2 = 0.28.
// NOTE(review): this name starts with an upper-case 'K', unlike the other
// kXxx constants in this test.
1863 const double KDropRateForLbAndThrottle =
1864 kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1865 // The first ADS response contains one drop category.
1866 AdsServiceImpl::ResponseArgs args({
1867 {"locality0", GetBackendPorts()},
1869 args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
1870 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1871 // The second ADS response contains two drop categories.
1872 // TODO(juanlishen): Change the ADS response sending to deterministic style
1873 // (e.g., by using condition variable) so that we can shorten the test
// duration.
1875 args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1876 {kThrottleDropType, kDropPerMillionForThrottle}};
1877 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 10000);
1878 WaitForAllBackends();
1879 // Send kNumRpcs RPCs and count the drops.
1880 size_t num_drops = 0;
1881 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1882 for (size_t i = 0; i < kNumRpcs; ++i) {
1883 EchoResponse response;
1884 const Status status = SendRpc(&response);
1886 status.error_message() == "Call dropped by load balancing policy") {
1889 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1890 << " message=" << status.error_message();
1891 EXPECT_EQ(response.message(), kRequestMessage_);
1894 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1895 // The drop rate should be roughly equal to the expectation.
1896 double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1897 const double kErrorTolerance = 0.3;
1900 ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
1901 ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
1902 // Wait until the drop rate increases to the middle of the two configs, which
1903 // implies that the update has been in effect.
1904 const double kDropRateThreshold =
1905 (kDropRateForLb + KDropRateForLbAndThrottle) / 2;
// num_rpcs is the divisor of the cumulative rate recomputed in the loop below.
// NOTE(review): its increment is not visible in this copy — confirm.
1906 size_t num_rpcs = kNumRpcs;
1907 while (seen_drop_rate < kDropRateThreshold) {
1908 EchoResponse response;
1909 const Status status = SendRpc(&response);
1912 status.error_message() == "Call dropped by load balancing policy") {
1915 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1916 << " message=" << status.error_message();
1917 EXPECT_EQ(response.message(), kRequestMessage_);
1919 seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
1921 // Send kNumRpcs RPCs and count the drops.
// NOTE(review): the rate computed after this batch divides num_drops by
// kNumRpcs, so num_drops presumably has to be reset here; no reset is visible
// in this copy — confirm.
1923 gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1924 for (size_t i = 0; i < kNumRpcs; ++i) {
1925 EchoResponse response;
1926 const Status status = SendRpc(&response);
1928 status.error_message() == "Call dropped by load balancing policy") {
1931 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1932 << " message=" << status.error_message();
1933 EXPECT_EQ(response.message(), kRequestMessage_);
1936 gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1937 // The new drop rate should be roughly equal to the expectation.
1938 seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1942 ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
1943 ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
1944 // The ADS service got a single request,
1945 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1946 // and sent two responses
1947 EXPECT_EQ(2U, balancers_[0]->ads_service()->response_count());
1950 // Tests that all the RPCs are dropped if any drop category drops 100%.
// The throttle category is configured at 1000000 per million, so every one of
// the kNumRpcs RPCs is expected to fail with the drop error message.
1951 TEST_P(DropTest, DropAll) {
1952 SetNextResolution({});
1953 SetNextResolutionForLbChannelAllBalancers();
1954 const size_t kNumRpcs = 1000;
1955 const uint32_t kDropPerMillionForLb = 100000;
1956 const uint32_t kDropPerMillionForThrottle = 1000000;
1957 // The ADS response contains two drop categories.
1958 AdsServiceImpl::ResponseArgs args({
1959 {"locality0", GetBackendPorts()},
1961 args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1962 {kThrottleDropType, kDropPerMillionForThrottle}};
1963 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
1964 // Send kNumRpcs RPCs and all of them are dropped.
1965 for (size_t i = 0; i < kNumRpcs; ++i) {
1966 EchoResponse response;
1967 const Status status = SendRpc(&response);
// Both conditions must hold: the RPC failed, and it failed with the drop
// message rather than some other error.
1968 EXPECT_TRUE(!status.ok() && status.error_message() ==
1969 "Call dropped by load balancing policy");
1971 // The ADS service got a single request, and sent a single response.
1972 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
1973 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
// FallbackTest reuses the BasicTest fixture under a separate name; the
// fallback TEST_P cases that follow are declared against this alias.
1976 using FallbackTest = BasicTest;
1978 // Tests that RPCs are handled by the fallback backends before the serverlist is
1979 // received, but will be handled by the serverlist after it's received.
// The first half of backends_ is returned by the resolver (fallback set); the
// remaining backends are only announced by the balancer, whose response is
// delayed by kServerlistDelayMs.
1980 TEST_P(FallbackTest, Vanilla) {
// The fallback timeout (200 ms) is shorter than the serverlist delay (500 ms);
// both are scaled by the test slowdown factor.
1981 const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
1982 const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
1983 const size_t kNumBackendsInResolution = backends_.size() / 2;
1984 ResetStub(kFallbackTimeoutMs);
1985 SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution));
1986 SetNextResolutionForLbChannelAllBalancers();
1987 // Send non-empty serverlist only after kServerlistDelayMs.
1988 AdsServiceImpl::ResponseArgs args({
1989 {"locality0", GetBackendPorts(kNumBackendsInResolution)},
1991 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
1992 kServerlistDelayMs);
1993 // Wait until all the fallback backends are reachable.
1994 WaitForAllBackends(0 /* start_index */,
1995 kNumBackendsInResolution /* stop_index */);
1996 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1997 CheckRpcSendOk(kNumBackendsInResolution);
1998 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1999 // Fallback is used: each backend returned by the resolver should have
2000 // gotten one request.
2001 for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
2002 EXPECT_EQ(1U, backends_[i]->backend_service()->request_count());
2004 for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
2005 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
2007 // Wait until the serverlist reception has been processed and all backends
2008 // in the serverlist are reachable.
// NOTE(review): the expectations after the second batch require the per-backend
// counters to start from zero; presumably WaitForAllBackends resets them —
// confirm against the helper.
2009 WaitForAllBackends(kNumBackendsInResolution /* start_index */);
2010 gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
2011 CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
2012 gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
2013 // Serverlist is used: each backend returned by the balancer should
2014 // have gotten one request.
2015 for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
2016 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
2018 for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
2019 EXPECT_EQ(1U, backends_[i]->backend_service()->request_count());
2021 // The ADS service got a single request, and sent a single response.
2022 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2023 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2026 // Tests that RPCs are handled by the updated fallback backends before
2027 // serverlist is received,
// and by the backends listed in the serverlist once it has been received.
// backends_ is split into three ranges: the initial resolver result, the
// updated resolver result, and the rest, which only the balancer announces.
2028 TEST_P(FallbackTest, Update) {
2029 const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
2030 const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
2031 const size_t kNumBackendsInResolution = backends_.size() / 3;
2032 const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
2033 ResetStub(kFallbackTimeoutMs);
2034 SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution));
2035 SetNextResolutionForLbChannelAllBalancers();
2036 // Send non-empty serverlist only after kServerlistDelayMs.
2037 AdsServiceImpl::ResponseArgs args({
2038 {"locality0", GetBackendPorts(kNumBackendsInResolution +
2039 kNumBackendsInResolutionUpdate)},
2041 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
2042 kServerlistDelayMs);
2043 // Wait until all the fallback backends are reachable.
2044 WaitForAllBackends(0 /* start_index */,
2045 kNumBackendsInResolution /* stop_index */);
2046 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
2047 CheckRpcSendOk(kNumBackendsInResolution);
2048 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
2049 // Fallback is used: each backend returned by the resolver should have
2050 // gotten one request.
2051 for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
2052 EXPECT_EQ(1U, backends_[i]->backend_service()->request_count());
2054 for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
2055 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
// Replace the resolver result with the second range of backends.
2057 SetNextResolution(GetBackendPorts(
2058 kNumBackendsInResolution,
2059 kNumBackendsInResolution + kNumBackendsInResolutionUpdate));
2060 // Wait until the resolution update has been processed and all the new
2061 // fallback backends are reachable.
2062 WaitForAllBackends(kNumBackendsInResolution /* start_index */,
2063 kNumBackendsInResolution +
2064 kNumBackendsInResolutionUpdate /* stop_index */);
2065 gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
2066 CheckRpcSendOk(kNumBackendsInResolutionUpdate);
2067 gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
2068 // The resolution update is used: each backend in the resolution update should
2069 // have gotten one request.
2070 for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
2071 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
2073 for (size_t i = kNumBackendsInResolution;
2074 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
2075 EXPECT_EQ(1U, backends_[i]->backend_service()->request_count());
2077 for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
2078 i < backends_.size(); ++i) {
2079 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
2081 // Wait until the serverlist reception has been processed and all backends
2082 // in the serverlist are reachable.
2083 WaitForAllBackends(kNumBackendsInResolution +
2084 kNumBackendsInResolutionUpdate /* start_index */);
2085 gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
2086 CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
2087 kNumBackendsInResolutionUpdate);
2088 gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
2089 // Serverlist is used: each backend returned by the balancer should
2090 // have gotten one request.
// NOTE(review): the opening line of the loop below (its init clause) is not
// present in this copy; confirm against the upstream file.
2092 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
2093 EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
2095 for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
2096 i < backends_.size(); ++i) {
2097 EXPECT_EQ(1U, backends_[i]->backend_service()->request_count());
2099 // The ADS service got a single request, and sent a single response.
2100 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2101 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2104 // Tests that fallback will kick in immediately if the balancer channel fails.
// The fallback timeout is 10000 ms (scaled by the slowdown factor) while the
// RPC below uses a 1000 ms timeout, so the RPC can only succeed if fallback
// starts before the fallback timer would fire.
2105 TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) {
2106 const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
2107 ResetStub(kFallbackTimeoutMs);
2108 // Return an unreachable balancer and one fallback backend.
2109 SetNextResolution({backends_[0]->port()});
2110 SetNextResolutionForLbChannel({g_port_saver->GetPort()});
2111 // Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
2113 CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
2114 /* wait_for_ready */ false);
2117 // Tests that fallback will kick in immediately if the balancer call fails.
// Same timing argument as the channel-failure case: the 1000 ms RPC timeout is
// shorter than the 10000 ms fallback timeout, so success implies early fallback.
2118 TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
2119 const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
2120 ResetStub(kFallbackTimeoutMs);
2121 // Return one balancer and one fallback backend.
2122 SetNextResolution({backends_[0]->port()});
2123 SetNextResolutionForLbChannelAllBalancers();
2124 // Balancer drops call without sending a serverlist.
2125 balancers_[0]->ads_service()->NotifyDoneWithAdsCall();
2126 // Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
2128 CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
2129 /* wait_for_ready */ false);
2132 // Tests that fallback mode is entered if balancer response is received but the
2133 // backends can't be reached.
2134 TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
2135 const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor();
2136 ResetStub(kFallbackTimeoutMs);
// backends_[0] is the only fallback backend handed out by the resolver.
2137 SetNextResolution({backends_[0]->port()});
2138 SetNextResolutionForLbChannelAllBalancers();
2139 // Send a serverlist that only contains an unreachable backend before fallback
// timeout.
2141 AdsServiceImpl::ResponseArgs args({
2142 {"locality0", {g_port_saver->GetPort()}},
2144 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2145 // Because no child policy is ready before fallback timeout, we enter fallback
// mode.
// NOTE(review): no statement follows this comment in this copy; the check that
// the fallback backend serves traffic is presumably missing — confirm against
// the upstream file.
2150 // Tests that fallback mode is exited if the balancer tells the client to drop
// all calls.
2152 TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
2153 // Return an unreachable balancer and one fallback backend.
2154 SetNextResolution({backends_[0]->port()});
2155 SetNextResolutionForLbChannel({g_port_saver->GetPort()});
2156 // Enter fallback mode because the LB channel fails to connect.
// NOTE(review): no statement follows the comment above in this copy; a wait on
// the fallback backend is presumably missing — confirm against upstream.
2158 // Return a new balancer that sends a response to drop all calls.
2159 AdsServiceImpl::ResponseArgs args({
2160 {"locality0", GetBackendPorts()},
// 1000000 per million, i.e. every call is dropped.
2162 args.drop_categories = {{kLbDropType, 1000000}};
2163 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2164 SetNextResolutionForLbChannelAllBalancers();
2165 // Send RPCs until failure.
// The loop gives up after a 5000 ms wall-clock deadline even if no RPC has
// failed; the final CheckRpcSendFailure() then asserts the failure.
2166 gpr_timespec deadline = gpr_time_add(
2167 gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(5000, GPR_TIMESPAN));
2169 auto status = SendRpc();
2170 if (!status.ok()) break;
2171 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
2172 CheckRpcSendFailure();
2175 // Tests that fallback mode is exited if the child policy becomes ready.
// NOTE(review): "Rready" in the test name looks like a typo for "Ready";
// renaming it would change the registered test name, so it is left as is.
// NOTE(review): several comments below describe steps (entering fallback,
// stopping and restarting backend 1, sending RPCs in the timed loop) whose
// statements are not present in this copy; confirm against the upstream file.
2176 TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) {
2177 // Return an unreachable balancer and one fallback backend.
2178 SetNextResolution({backends_[0]->port()});
2179 SetNextResolutionForLbChannel({g_port_saver->GetPort()});
2180 // Enter fallback mode because the LB channel fails to connect.
2182 // Return a new balancer that sends a dead backend.
2184 AdsServiceImpl::ResponseArgs args({
2185 {"locality0", {backends_[1]->port()}},
2187 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2188 SetNextResolutionForLbChannelAllBalancers();
2189 // The state (TRANSIENT_FAILURE) update from the child policy will be ignored
2190 // because we are still in fallback mode.
2191 gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2192 gpr_time_from_millis(500, GPR_TIMESPAN));
2193 // Send 0.5 second worth of RPCs.
2196 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
2197 // After the backend is restarted, the child policy will eventually be READY,
2198 // and we will exit fallback mode.
2201 // We have exited fallback mode, so calls will go to the child policy
// exclusively.
2203 CheckRpcSendOk(100);
// None of the 100 RPCs may reach the fallback backend; all must reach the
// backend announced by the balancer.
2204 EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
2205 EXPECT_EQ(100U, backends_[1]->backend_service()->request_count());
// Fixture for the balancer-update tests.
// NOTE(review): the constructor arguments are presumably 4 backends and 3
// balancers (the tests using this fixture index balancers_[0..2]); confirm
// against the XdsEnd2endTest constructor.
2208 class BalancerUpdateTest : public XdsEnd2endTest {
2210 BalancerUpdateTest() : XdsEnd2endTest(4, 3) {}
2213 // Tests that the old LB call is still used after the balancer address update as
2214 // long as that call is still alive.
// Balancer 0 announces backend 0 and balancer 1 announces backend 1, so
// traffic reaching backend 1 would reveal a switch to balancer 1.
// NOTE(review): some comments below describe steps (waiting for the first
// backend, sending the RPC batches) whose statements are not present in this
// copy; confirm against the upstream file.
2215 TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
2216 SetNextResolution({});
2217 SetNextResolutionForLbChannelAllBalancers();
2218 AdsServiceImpl::ResponseArgs args({
2219 {"locality0", {backends_[0]->port()}},
2221 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2222 args = AdsServiceImpl::ResponseArgs({
2223 {"locality0", {backends_[1]->port()}},
2225 ScheduleResponseForBalancer(1, AdsServiceImpl::BuildResponse(args), 0);
2226 // Wait until the first backend is ready.
2228 // Send 10 requests.
2229 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
2231 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
2232 // All 10 requests should have gone to the first backend.
2233 EXPECT_EQ(10U, backends_[0]->backend_service()->request_count());
2234 // The ADS service of balancer 0 got a single request, and sent a single
// response. Balancers 1 and 2 must not have been contacted.
2236 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2237 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2238 EXPECT_EQ(0U, balancers_[1]->ads_service()->request_count());
2239 EXPECT_EQ(0U, balancers_[1]->ads_service()->response_count());
2240 EXPECT_EQ(0U, balancers_[2]->ads_service()->request_count());
2241 EXPECT_EQ(0U, balancers_[2]->ads_service()->response_count());
2242 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
// Point the LB channel at balancer 1 only.
2243 SetNextResolutionForLbChannel({balancers_[1]->port()});
2244 gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
2245 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2246 gpr_timespec deadline = gpr_time_add(
2247 gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
2248 // Send 10 seconds worth of RPCs
2251 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
2252 // The current LB call is still working, so xds continued using it to the
2253 // first balancer, which doesn't assign the second backend.
2254 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2255 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2256 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2257 EXPECT_EQ(0U, balancers_[1]->ads_service()->request_count());
2258 EXPECT_EQ(0U, balancers_[1]->ads_service()->response_count());
2259 EXPECT_EQ(0U, balancers_[2]->ads_service()->request_count());
2260 EXPECT_EQ(0U, balancers_[2]->ads_service()->response_count());
2263 // Tests that the old LB call is still used after multiple balancer address
2264 // updates as long as that call is still alive. Send an update with the same set
2265 // of LBs as the one in SetUp() in order to verify that the LB channel inside
2266 // xds keeps the initial connection (which by definition is also present in the
// update).
// NOTE(review): some comments below describe steps (waiting for the first
// backend, sending the RPC batches) whose statements are not present in this
// copy; confirm against the upstream file.
2268 TEST_P(BalancerUpdateTest, Repeated) {
2269 SetNextResolution({});
2270 SetNextResolutionForLbChannelAllBalancers();
2271 AdsServiceImpl::ResponseArgs args({
2272 {"locality0", {backends_[0]->port()}},
2274 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2275 args = AdsServiceImpl::ResponseArgs({
2276 {"locality0", {backends_[1]->port()}},
2278 ScheduleResponseForBalancer(1, AdsServiceImpl::BuildResponse(args), 0);
2279 // Wait until the first backend is ready.
2281 // Send 10 requests.
2282 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
2284 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
2285 // All 10 requests should have gone to the first backend.
2286 EXPECT_EQ(10U, backends_[0]->backend_service()->request_count());
2287 // The ADS service of balancer 0 got a single request, and sent a single
// response. Balancers 1 and 2 must not have been contacted.
2289 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2290 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2291 EXPECT_EQ(0U, balancers_[1]->ads_service()->request_count());
2292 EXPECT_EQ(0U, balancers_[1]->ads_service()->response_count());
2293 EXPECT_EQ(0U, balancers_[2]->ads_service()->request_count());
2294 EXPECT_EQ(0U, balancers_[2]->ads_service()->response_count());
// First update: all three balancers.
2295 std::vector<int> ports;
2296 ports.emplace_back(balancers_[0]->port());
2297 ports.emplace_back(balancers_[1]->port());
2298 ports.emplace_back(balancers_[2]->port());
2299 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
2300 SetNextResolutionForLbChannel(ports);
2301 gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
2302 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2303 gpr_timespec deadline = gpr_time_add(
2304 gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
2305 // Send 10 seconds worth of RPCs
2308 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
2309 // xds continued using the original LB call to the first balancer, which
2310 // doesn't assign the second backend.
2311 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
// Second update: balancers 0 and 1.
// NOTE(review): no reset of `ports` is visible before these appends, so as
// shown they would extend the three entries added earlier; a clearing
// statement is presumably missing from this copy — confirm.
2313 ports.emplace_back(balancers_[0]->port());
2314 ports.emplace_back(balancers_[1]->port());
2315 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
2316 SetNextResolutionForLbChannel(ports);
2317 gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
2318 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2319 deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2320 gpr_time_from_millis(10000, GPR_TIMESPAN));
2321 // Send 10 seconds worth of RPCs
2324 } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
2325 // xds continued using the original LB call to the first balancer, which
2326 // doesn't assign the second backend.
2327 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2330 // Tests that if the balancer is down, the RPCs will still be sent to the
2331 // backends according to the last balancer response, until a new balancer is
// reachable.
// NOTE(review): some comments below describe steps (sending the RPC batches,
// waiting for the second backend) whose statements are not present in this
// copy; confirm against the upstream file.
2333 TEST_P(BalancerUpdateTest, DeadUpdate) {
2334 SetNextResolution({});
// Only balancer 0 is published initially.
2335 SetNextResolutionForLbChannel({balancers_[0]->port()});
2336 AdsServiceImpl::ResponseArgs args({
2337 {"locality0", {backends_[0]->port()}},
2339 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2340 args = AdsServiceImpl::ResponseArgs({
2341 {"locality0", {backends_[1]->port()}},
2343 ScheduleResponseForBalancer(1, AdsServiceImpl::BuildResponse(args), 0);
2344 // Start servers and send 10 RPCs per server.
2345 gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
2347 gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
2348 // All 10 requests should have gone to the first backend.
2349 EXPECT_EQ(10U, backends_[0]->backend_service()->request_count());
2351 gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
2352 balancers_[0]->Shutdown();
2353 gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
2354 // This is serviced by the existing child policy.
2355 gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
2357 gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
2358 // All 10 requests should again have gone to the first backend.
// The counter is cumulative here: 10 from the first batch plus 10 more.
2359 EXPECT_EQ(20U, backends_[0]->backend_service()->request_count());
2360 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2361 // The ADS service of balancer 0 got a single request, and sent a single
// response. Balancers 1 and 2 must not have been contacted yet.
2363 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2364 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2365 EXPECT_EQ(0U, balancers_[1]->ads_service()->request_count());
2366 EXPECT_EQ(0U, balancers_[1]->ads_service()->response_count());
2367 EXPECT_EQ(0U, balancers_[2]->ads_service()->request_count());
2368 EXPECT_EQ(0U, balancers_[2]->ads_service()->response_count());
2369 gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
2370 SetNextResolutionForLbChannel({balancers_[1]->port()});
2371 gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
2372 // Wait until update has been processed, as signaled by the second backend
2373 // receiving a request. In the meantime, the client continues to be serviced
2374 // (by the first backend) without interruption.
2375 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
2377 // This is serviced by the updated RR policy
// Reset so the count below covers only the third batch.
2378 backends_[1]->backend_service()->ResetCounters();
2379 gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
2381 gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
2382 // All 10 requests should have gone to the second backend.
2383 EXPECT_EQ(10U, backends_[1]->backend_service()->request_count());
2384 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2385 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2386 // The second balancer, published as part of the first update, may end up
2387 // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
2388 // firing races with the arrival of the update containing the second
// balancer.
2390 EXPECT_GE(balancers_[1]->ads_service()->request_count(), 1U);
2391 EXPECT_GE(balancers_[1]->ads_service()->response_count(), 1U);
2392 EXPECT_LE(balancers_[1]->ads_service()->request_count(), 2U);
2393 EXPECT_LE(balancers_[1]->ads_service()->response_count(), 2U);
2394 EXPECT_EQ(0U, balancers_[2]->ads_service()->request_count());
2395 EXPECT_EQ(0U, balancers_[2]->ads_service()->response_count());
2398 // The re-resolution tests are deferred because they rely on fallback mode,
2399 // which is not supported yet.
2401 // TODO(juanlishen): Add TEST_P(BalancerUpdateTest, ReresolveDeadBackend).
2403 // TODO(juanlishen): Add TEST_P(UpdatesWithClientLoadReportingTest,
2404 // ReresolveDeadBalancer)
// Fixture for the client load reporting tests.
// NOTE(review): the constructor arguments are presumably 4 backends and 1
// balancer (the tests using this fixture only touch balancers_[0]); the
// meaning of the third argument (3) is not visible here — confirm against the
// XdsEnd2endTest constructor.
2406 class ClientLoadReportingTest : public XdsEnd2endTest {
2408 ClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {}
2411 // Tests that the load report received at the balancer is correct.
// After the warm-up performed by WaitForAllBackends(), kNumRpcsPerAddress RPCs
// per backend are sent; the report must account for those plus the successful
// warm-up RPCs (num_ok), with no errors, drops or in-progress calls.
2412 TEST_P(ClientLoadReportingTest, Vanilla) {
2413 SetNextResolution({});
2414 SetNextResolutionForLbChannel({balancers_[0]->port()});
2415 const size_t kNumRpcsPerAddress = 100;
2416 // TODO(juanlishen): Partition the backends after multiple localities is
// NOTE(review): the TODO sentence above is cut off in this copy.
2418 AdsServiceImpl::ResponseArgs args({
2419 {"locality0", GetBackendPorts()},
2421 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2422 // Wait until all backends are ready.
// NOTE(review): num_ok and num_drops are assigned by std::tie below, but their
// declarations are not visible in this copy — confirm against upstream.
2424 int num_failure = 0;
2426 std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
2427 // Send kNumRpcsPerAddress RPCs per server.
2428 CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
2429 // Each backend should have gotten 100 requests.
2430 for (size_t i = 0; i < backends_.size(); ++i) {
2431 EXPECT_EQ(kNumRpcsPerAddress,
2432 backends_[i]->backend_service()->request_count());
2434 // The ADS service got a single request, and sent a single response.
2435 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2436 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
2437 // The LRS service got a single request, and sent a single response.
2438 EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count());
2439 EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count());
2440 // The load report received at the balancer should be correct.
2441 ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
2442 EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
2443 client_stats->total_successful_requests());
2444 EXPECT_EQ(0U, client_stats->total_requests_in_progress());
2445 EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
2446 client_stats->total_issued_requests());
2447 EXPECT_EQ(0U, client_stats->total_error_requests());
2448 EXPECT_EQ(0U, client_stats->total_dropped_requests());
2451 // Tests that if the balancer restarts, the client load report contains the
2452 // stats before and after the restart correctly.
// The first half of backends_ is announced before the restart and the rest
// after it; num_started accumulates every RPC issued after the first load
// report and is compared against the second report.
2453 TEST_P(ClientLoadReportingTest, BalancerRestart) {
2454 SetNextResolution({});
2455 SetNextResolutionForLbChannel({balancers_[0]->port()});
2456 const size_t kNumBackendsFirstPass = backends_.size() / 2;
2457 const size_t kNumBackendsSecondPass =
2458 backends_.size() - kNumBackendsFirstPass;
2459 AdsServiceImpl::ResponseArgs args({
2460 {"locality0", GetBackendPorts(0, kNumBackendsFirstPass)},
2462 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2463 // Wait until all backends returned by the balancer are ready.
// NOTE(review): num_ok and num_drops are assigned by std::tie below, but their
// declarations are not visible in this copy — confirm against upstream.
2465 int num_failure = 0;
2467 std::tie(num_ok, num_failure, num_drops) =
2468 WaitForAllBackends(/* start_index */ 0,
2469 /* stop_index */ kNumBackendsFirstPass);
// First report: only the successful warm-up RPCs have been issued so far.
2470 ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
2471 EXPECT_EQ(static_cast<size_t>(num_ok),
2472 client_stats->total_successful_requests());
2473 EXPECT_EQ(0U, client_stats->total_requests_in_progress());
2474 EXPECT_EQ(0U, client_stats->total_error_requests());
2475 EXPECT_EQ(0U, client_stats->total_dropped_requests());
2476 // Shut down the balancer.
2477 balancers_[0]->Shutdown();
2478 // We should continue using the last EDS response we received from the
2479 // balancer before it was shut down.
2480 // Note: We need to use WaitForAllBackends() here instead of just
2481 // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
2482 // shuts down, the XdsClient will generate an error to the
2483 // ServiceConfigWatcher, which will cause the xds resolver to send a
2484 // no-op update to the LB policy. When this update gets down to the
2485 // round_robin child policy for the locality, it will generate a new
2486 // subchannel list, which resets the start index randomly. So we need
2487 // to be a little more permissive here to avoid spurious failures.
2488 ResetBackendCounters();
// Element 0 of the returned tuple is the count assigned to num_ok elsewhere in
// this test.
2489 int num_started = std::get<0>(WaitForAllBackends(
2490 /* start_index */ 0, /* stop_index */ kNumBackendsFirstPass));
2491 // Now restart the balancer, this time pointing to the new backends.
2492 balancers_[0]->Start(server_host_);
2493 args = AdsServiceImpl::ResponseArgs({
2494 {"locality0", GetBackendPorts(kNumBackendsFirstPass)},
2496 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
2497 // Wait for queries to start going to one of the new backends.
2498 // This tells us that we're now using the new serverlist.
2499 std::tie(num_ok, num_failure, num_drops) =
2500 WaitForAllBackends(/* start_index */ kNumBackendsFirstPass);
2501 num_started += num_ok + num_failure + num_drops;
2502 // Send one RPC per backend.
2503 CheckRpcSendOk(kNumBackendsSecondPass);
2504 num_started += kNumBackendsSecondPass;
2505 // Check client stats.
2506 client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
2507 EXPECT_EQ(num_started, client_stats->total_successful_requests());
2508 EXPECT_EQ(0U, client_stats->total_requests_in_progress());
2509 EXPECT_EQ(0U, client_stats->total_error_requests());
2510 EXPECT_EQ(0U, client_stats->total_dropped_requests());
// Fixture used by the drop-aware client load reporting test.
// It only forwards (4, 1, 20) to the XdsEnd2endTest constructor. Presumably
// these are the backend count, the balancer count and the client load
// reporting interval in seconds -- TODO confirm against XdsEnd2endTest, whose
// definition is outside this view.
// NOTE(review): the access specifier and the closing "};" of this class are
// not visible in this view of the file.
2513 class ClientLoadReportingWithDropTest : public XdsEnd2endTest {
2515 ClientLoadReportingWithDropTest() : XdsEnd2endTest(4, 1, 20) {}
2518 // Tests that the drop stats are correctly reported by client load reporting.
// Flow: schedule an ADS response that has one locality ("locality0") with all
// backend ports plus two drop categories, wait until every backend has been
// reached, send kNumRpcs RPCs, then compare the observed drops with the load
// report received by the balancer's LRS service.
// NOTE(review): several short lines of this test are missing from this view
// (e.g. the declarations of num_ok / num_drops and parts of the drop-counting
// branch). The comments below describe only what is visible.
2519 TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
2520 SetNextResolution({});
2521 SetNextResolutionForLbChannelAllBalancers();
2522 const size_t kNumRpcs = 3000;
// Drop rates are configured in parts per million and converted to fractions
// below: 100000 -> 0.1 for the LB category, 200000 -> 0.2 for throttle.
2523 const uint32_t kDropPerMillionForLb = 100000;
2524 const uint32_t kDropPerMillionForThrottle = 200000;
2525 const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
2526 const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
// Combined drop probability when the throttle rate applies only to the
// fraction of RPCs not already dropped by the LB category:
// 0.1 + (1 - 0.1) * 0.2 = 0.28.
// NOTE(review): this constant is spelled with a capital 'K', unlike the other
// k-prefixed constants in this test.
2527 const double KDropRateForLbAndThrottle =
2528 kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
2529 // The ADS response contains two drop categories.
2530 AdsServiceImpl::ResponseArgs args({
2531 {"locality0", GetBackendPorts()},
2533 args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
2534 {kThrottleDropType, kDropPerMillionForThrottle}};
2535 ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
// Warm-up: WaitForAllBackends() yields an (ok, failure, drops) tuple. Their
// sum is kept as num_warmup so these RPCs can be included in the totals
// checked against the load report further down.
2537 int num_failure = 0;
2539 std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
2540 const size_t num_warmup = num_ok + num_failure + num_drops;
2541 // Send kNumRpcs RPCs and count the drops.
2542 for (size_t i = 0; i < kNumRpcs; ++i) {
2543 EchoResponse response;
2544 const Status status = SendRpc(&response);
// An RPC whose error message equals the string below is treated as a drop;
// the start of this condition and the body of the drop branch are not visible
// here. The visible expectations that follow require the RPC to have succeeded
// and the response to echo kRequestMessage_.
2546 status.error_message() == "Call dropped by load balancing policy") {
2549 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
2550 << " message=" << status.error_message();
2551 EXPECT_EQ(response.message(), kRequestMessage_);
2554 // The drop rate should be roughly equal to the expectation.
// NOTE(review): num_drops was assigned from the warm-up result above. Whether
// it is reset before the loop is not visible here, so the numerator may
// include warm-up drops while the denominator is kNumRpcs only -- confirm.
2555 const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
// Relative tolerance: every bound below is the expected value scaled by
// (1 - kErrorTolerance) and (1 + kErrorTolerance), i.e. +/-20%.
2556 const double kErrorTolerance = 0.2;
2560 ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
2561 ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
2562 // Check client stats.
2563 ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
2564 EXPECT_EQ(num_drops, client_stats->total_dropped_requests());
// Per-category expectations are computed over all RPCs issued by this test
// (warm-up plus the kNumRpcs loop). The throttle bound is scaled by
// (1 - kDropRateForLb), i.e. the fraction not already dropped by the LB
// category.
2565 const size_t total_rpc = num_warmup + kNumRpcs;
2567 client_stats->dropped_requests(kLbDropType),
2569 ::testing::Ge(total_rpc * kDropRateForLb * (1 - kErrorTolerance)),
2570 ::testing::Le(total_rpc * kDropRateForLb * (1 + kErrorTolerance))));
2571 EXPECT_THAT(client_stats->dropped_requests(kThrottleDropType),
2573 ::testing::Ge(total_rpc * (1 - kDropRateForLb) *
2574 kDropRateForThrottle * (1 - kErrorTolerance)),
2575 ::testing::Le(total_rpc * (1 - kDropRateForLb) *
2576 kDropRateForThrottle * (1 + kErrorTolerance))));
2577 // The ADS service got a single request, and sent a single response.
2578 EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count());
2579 EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
// Returns the display name for one parameterized test instance: the result of
// calling AsString() on the instance's TestType parameter.
// Presumably passed as the name-generator argument of the
// INSTANTIATE_TEST_SUITE_P invocations in this file -- that argument is not
// visible in this view, so confirm against the full file.
2582 grpc::string TestTypeName(const ::testing::TestParamInfo<TestType>& info) {
2583 return info.param.AsString();
// Parameterized suite instantiations, all under the "XdsTest" prefix.
// Each TestType(a, b) value is one configuration a suite is run with. Judging
// from the restriction comments below, `a` selects the xds resolver and `b`
// enables load reporting -- TODO confirm against TestType's constructor, which
// is outside this view.
// NOTE(review): the final argument line of every invocation is missing from
// this view of the file.

// BasicTest: all four (a, b) combinations.
2586 INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
2587 ::testing::Values(TestType(false, true),
2588 TestType(false, false),
2589 TestType(true, false),
2590 TestType(true, true)),
// SecureNamingTest: all four (a, b) combinations.
2593 INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
2594 ::testing::Values(TestType(false, true),
2595 TestType(false, false),
2596 TestType(true, false),
2597 TestType(true, true)),
2600 // CDS depends on XdsResolver.
// Hence only the configurations whose first argument is true are listed.
2601 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest,
2602 ::testing::Values(TestType(true, false),
2603 TestType(true, true)),
// LocalityMapTest: all four (a, b) combinations.
2606 INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
2607 ::testing::Values(TestType(false, true),
2608 TestType(false, false),
2609 TestType(true, false),
2610 TestType(true, true)),
// FailoverTest: all four (a, b) combinations.
2613 INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest,
2614 ::testing::Values(TestType(false, true),
2615 TestType(false, false),
2616 TestType(true, false),
2617 TestType(true, true)),
// DropTest: all four (a, b) combinations.
2620 INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest,
2621 ::testing::Values(TestType(false, true),
2622 TestType(false, false),
2623 TestType(true, false),
2624 TestType(true, true)),
2627 // Fallback does not work with xds resolver.
// Hence only the configurations whose first argument is false are listed.
2628 INSTANTIATE_TEST_SUITE_P(XdsTest, FallbackTest,
2629 ::testing::Values(TestType(false, true),
2630 TestType(false, false)),
// BalancerUpdateTest: three combinations; TestType(true, false) is not listed.
// NOTE(review): no comment in view explains the omission -- confirm that it is
// intentional.
2633 INSTANTIATE_TEST_SUITE_P(XdsTest, BalancerUpdateTest,
2634 ::testing::Values(TestType(false, true),
2635 TestType(false, false),
2636 TestType(true, true)),
2639 // Load reporting tests are not run with load reporting disabled.
// Hence only the configurations whose second argument is true are listed.
2640 INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingTest,
2641 ::testing::Values(TestType(false, true),
2642 TestType(true, true)),
2645 // Load reporting tests are not run with load reporting disabled.
// Hence only the configurations whose second argument is true are listed.
2646 INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingWithDropTest,
2647 ::testing::Values(TestType(false, true),
2648 TestType(true, true)),
2652 } // namespace testing
2655 int main(int argc, char** argv) {
2656 grpc::testing::TestEnvironment env(argc, argv);
2657 ::testing::InitGoogleTest(&argc, argv);
2658 grpc::testing::WriteBootstrapFiles();
2659 grpc::testing::g_port_saver = new grpc::testing::PortSaver();
2660 const auto result = RUN_ALL_TESTS();