X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=test%2Fcpp%2Fend2end%2Fclient_lb_end2end_test.cc;h=7776d8f8f96ad8f19a7cb3ef06415e0c3c69bf4a;hb=754713aa17cd9d27a649dd3a8a1657ddb01bca1a;hp=f6d4a48b6f047a789c7ac8f84dbfb019a590df14;hpb=331872e1a34acd893ecb6c02d9fadb108c5beee2;p=platform%2Fupstream%2Fgrpc.git diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index f6d4a48..7776d8f 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -21,13 +21,16 @@ #include #include #include +#include #include +#include "absl/memory/memory.h" +#include "absl/strings/str_cat.h" + #include #include #include #include -#include #include #include #include @@ -39,20 +42,24 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/global_subchannel_pool.h" -#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/cpp/client/secure_credentials.h" #include "src/cpp/server/secure_server_credentials.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h" #include "test/core/util/port.h" +#include "test/core/util/resolve_localhost_ip46.h" #include "test/core/util/test_config.h" #include "test/core/util/test_lb_policies.h" #include "test/cpp/end2end/test_service_impl.h" @@ -62,7 +69,6 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; -using std::chrono::system_clock; // defined in tcp_client.cc extern grpc_tcp_client_vtable* grpc_tcp_client_impl; @@ -94,15 +100,21 @@ grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay}; // every call to the Echo RPC. class MyTestServiceImpl : public TestServiceImpl { public: - MyTestServiceImpl() : request_count_(0) {} - Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { + const udpa::data::orca::v1::OrcaLoadReport* load_report = nullptr; { grpc::internal::MutexLock lock(&mu_); ++request_count_; + load_report = load_report_; } AddClient(context->peer()); + if (load_report != nullptr) { + // TODO(roth): Once we provide a more standard server-side API for + // populating this data, use that API here. + context->AddTrailingMetadata("x-endpoint-load-metrics-bin", + load_report->SerializeAsString()); + } return TestServiceImpl::Echo(context, request, response); } @@ -116,21 +128,104 @@ class MyTestServiceImpl : public TestServiceImpl { request_count_ = 0; } - std::set clients() { + std::set clients() { grpc::internal::MutexLock lock(&clients_mu_); return clients_; } + void set_load_report(udpa::data::orca::v1::OrcaLoadReport* load_report) { + grpc::internal::MutexLock lock(&mu_); + load_report_ = load_report; + } + private: - void AddClient(const grpc::string& client) { + void AddClient(const std::string& client) { grpc::internal::MutexLock lock(&clients_mu_); clients_.insert(client); } grpc::internal::Mutex mu_; - int request_count_; + int request_count_ = 0; + const udpa::data::orca::v1::OrcaLoadReport* load_report_ = nullptr; grpc::internal::Mutex clients_mu_; - std::set clients_; + std::set clients_; +}; + +class FakeResolverResponseGeneratorWrapper { + public: + explicit FakeResolverResponseGeneratorWrapper(bool ipv6_only) + : ipv6_only_(ipv6_only), + response_generator_(grpc_core::MakeRefCounted< + grpc_core::FakeResolverResponseGenerator>()) {} + + FakeResolverResponseGeneratorWrapper( + FakeResolverResponseGeneratorWrapper&& other) noexcept { + ipv6_only_ = other.ipv6_only_; + response_generator_ = std::move(other.response_generator_); + } + + void SetNextResolution( + const std::vector& ports, const char* service_config_json = nullptr, + const char* attribute_key = nullptr, + std::unique_ptr attribute = + nullptr) { + grpc_core::ExecCtx exec_ctx; + response_generator_->SetResponse( + BuildFakeResults(ipv6_only_, ports, service_config_json, attribute_key, + std::move(attribute))); + } + + void SetNextResolutionUponError(const std::vector& ports) { + grpc_core::ExecCtx exec_ctx; + response_generator_->SetReresolutionResponse( + BuildFakeResults(ipv6_only_, ports)); + } + + void SetFailureOnReresolution() { + grpc_core::ExecCtx exec_ctx; + response_generator_->SetFailureOnReresolution(); + } + + grpc_core::FakeResolverResponseGenerator* Get() const { + return response_generator_.get(); + } + + private: + static grpc_core::Resolver::Result BuildFakeResults( + bool ipv6_only, const std::vector& ports, + const char* service_config_json = nullptr, + const char* attribute_key = nullptr, + std::unique_ptr attribute = + nullptr) { + grpc_core::Resolver::Result result; + for (const int& port : ports) { + std::string lb_uri_str = + absl::StrCat(ipv6_only ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port); + grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true); + GPR_ASSERT(lb_uri != nullptr); + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(lb_uri, &address)); + std::map> + attributes; + if (attribute != nullptr) { + attributes[attribute_key] = attribute->Copy(); + } + result.addresses.emplace_back(address.addr, address.len, + nullptr /* args */, std::move(attributes)); + grpc_uri_destroy(lb_uri); + } + if (service_config_json != nullptr) { + result.service_config = grpc_core::ServiceConfig::Create( + nullptr, service_config_json, &result.service_config_error); + GPR_ASSERT(result.service_config != nullptr); + } + return result; + } + + bool ipv6_only_ = false; + grpc_core::RefCountedPtr + response_generator_; }; class ClientLbEnd2endTest : public ::testing::Test { @@ -141,23 +236,32 @@ class ClientLbEnd2endTest : public ::testing::Test { creds_(new SecureChannelCredentials( grpc_fake_transport_security_credentials_create())) {} + static void SetUpTestCase() { + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); +#if TARGET_OS_IPHONE + // Workaround Apple CFStream bug + gpr_setenv("grpc_cfstream", "0"); +#endif + } + void SetUp() override { grpc_init(); - response_generator_ = - grpc_core::MakeRefCounted(); + bool localhost_resolves_to_ipv4 = false; + bool localhost_resolves_to_ipv6 = false; + grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4, + &localhost_resolves_to_ipv6); + ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6; } void TearDown() override { for (size_t i = 0; i < servers_.size(); ++i) { servers_[i]->Shutdown(); } - // Explicitly destroy all the members so that we can make sure grpc_shutdown - // has finished by the end of this function, and thus all the registered - // LB policy factories are removed. - stub_.reset(); servers_.clear(); creds_.reset(); - grpc_shutdown_blocking(); + grpc_shutdown(); } void CreateServers(size_t num_servers, @@ -180,38 +284,6 @@ class ClientLbEnd2endTest : public ::testing::Test { } } - grpc_core::Resolver::Result BuildFakeResults(const std::vector& ports) { - grpc_core::Resolver::Result result; - for (const int& port : ports) { - char* lb_uri_str; - gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port); - grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); - GPR_ASSERT(lb_uri != nullptr); - grpc_resolved_address address; - GPR_ASSERT(grpc_parse_uri(lb_uri, &address)); - result.addresses.emplace_back(address.addr, address.len, - nullptr /* args */); - grpc_uri_destroy(lb_uri); - gpr_free(lb_uri_str); - } - return result; - } - - void SetNextResolution(const std::vector& ports) { - grpc_core::ExecCtx exec_ctx; - response_generator_->SetResponse(BuildFakeResults(ports)); - } - - void SetNextResolutionUponError(const std::vector& ports) { - grpc_core::ExecCtx exec_ctx; - response_generator_->SetReresolutionResponse(BuildFakeResults(ports)); - } - - void SetFailureOnReresolution() { - grpc_core::ExecCtx exec_ctx; - response_generator_->SetFailureOnReresolution(); - } - std::vector GetServersPorts(size_t start_index = 0) { std::vector ports; for (size_t i = start_index; i < servers_.size(); ++i) { @@ -220,19 +292,24 @@ class ClientLbEnd2endTest : public ::testing::Test { return ports; } + FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() { + return FakeResolverResponseGeneratorWrapper(ipv6_only_); + } + std::unique_ptr BuildStub( const std::shared_ptr& channel) { return grpc::testing::EchoTestService::NewStub(channel); } std::shared_ptr BuildChannel( - const grpc::string& lb_policy_name, + const std::string& lb_policy_name, + const FakeResolverResponseGeneratorWrapper& response_generator, ChannelArguments args = ChannelArguments()) { - if (lb_policy_name.size() > 0) { + if (!lb_policy_name.empty()) { args.SetLoadBalancingPolicyName(lb_policy_name); } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, - response_generator_.get()); + response_generator.Get()); return ::grpc::CreateCustomChannel("fake:///", creds_, args); } @@ -244,9 +321,13 @@ class ClientLbEnd2endTest : public ::testing::Test { if (local_response) response = new EchoResponse; EchoRequest request; request.set_message(kRequestMessage_); + request.mutable_param()->set_echo_metadata(true); ClientContext context; context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); if (wait_for_ready) context.set_wait_for_ready(true); + context.AddMetadata("foo", "1"); + context.AddMetadata("bar", "2"); + context.AddMetadata("baz", "3"); Status status = stub->Echo(&context, request, response); if (result != nullptr) *result = status; if (local_response) delete response; @@ -287,20 +368,20 @@ class ClientLbEnd2endTest : public ::testing::Test { port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); } - void Start(const grpc::string& server_host) { + void Start(const std::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); started_ = true; grpc::internal::Mutex mu; grpc::internal::MutexLock lock(&mu); grpc::internal::CondVar cond; - thread_.reset(new std::thread( - std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); + thread_ = absl::make_unique( + std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); cond.WaitUntil(&mu, [this] { return server_ready_; }); server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } - void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, + void Serve(const std::string& server_host, grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) { std::ostringstream server_address; server_address << server_host << ":" << port_; @@ -322,7 +403,7 @@ class ClientLbEnd2endTest : public ::testing::Test { started_ = false; } - void SetServingStatus(const grpc::string& service, bool serving) { + void SetServingStatus(const std::string& service, bool serving) { server_->GetHealthCheckService()->SetServingStatus(service, serving); } }; @@ -345,26 +426,32 @@ class ClientLbEnd2endTest : public ::testing::Test { ResetCounters(); } - bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + bool WaitForChannelState( + Channel* channel, + const std::function& predicate, + bool try_to_connect = false, int timeout_seconds = 5) { const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_seconds); - grpc_connectivity_state state; - while ((state = channel->GetState(false /* try_to_connect */)) == - GRPC_CHANNEL_READY) { + while (true) { + grpc_connectivity_state state = channel->GetState(try_to_connect); + if (predicate(state)) break; if (!channel->WaitForStateChange(state, deadline)) return false; } return true; } + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + auto predicate = [](grpc_connectivity_state state) { + return state != GRPC_CHANNEL_READY; + }; + return WaitForChannelState(channel, predicate, false, timeout_seconds); + } + bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) { - const gpr_timespec deadline = - grpc_timeout_seconds_to_deadline(timeout_seconds); - grpc_connectivity_state state; - while ((state = channel->GetState(true /* try_to_connect */)) != - GRPC_CHANNEL_READY) { - if (!channel->WaitForStateChange(state, deadline)) return false; - } - return true; + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_READY; + }; + return WaitForChannelState(channel, predicate, true, timeout_seconds); } bool SeenAllServers() { @@ -392,19 +479,18 @@ class ClientLbEnd2endTest : public ::testing::Test { } } - const grpc::string server_host_; - std::unique_ptr stub_; + const std::string server_host_; std::vector> servers_; - grpc_core::RefCountedPtr - response_generator_; - const grpc::string kRequestMessage_; + const std::string kRequestMessage_; std::shared_ptr creds_; + bool ipv6_only_ = false; }; TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) { const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel(""); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("", response_generator); auto stub = BuildStub(channel); // Initial state should be IDLE. EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE); @@ -417,7 +503,7 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) { EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_CONNECTING); // Return a resolver result, which allows the connection attempt to proceed. - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); // We should eventually transition into state READY. EXPECT_TRUE(WaitForChannelReady(channel.get())); } @@ -426,9 +512,11 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel(""); // test that pick first is the default. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel( + "", response_generator); // test that pick first is the default. auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < servers_.size(); ++i) { CheckRpcSendOk(stub, DEBUG_LOCATION); } @@ -448,19 +536,22 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { } TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { - StartServers(1); // Single server - auto channel = BuildChannel(""); // test that pick first is the default. + StartServers(1); // Single server + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel( + "", response_generator); // test that pick first is the default. auto stub = BuildStub(channel); - SetNextResolution({servers_[0]->port_}); + response_generator.SetNextResolution({servers_[0]->port_}); WaitForServer(stub, 0, DEBUG_LOCATION); // Create a new channel and its corresponding PF LB policy, which will pick // the subchannels in READY state from the previous RPC against the same // target (even if it happened over a different channel, because subchannels // are globally reused). Progress should happen without any transition from // this READY state. - auto second_channel = BuildChannel(""); + auto second_response_generator = BuildResolverResponseGenerator(); + auto second_channel = BuildChannel("", second_response_generator); auto second_stub = BuildStub(second_channel); - SetNextResolution({servers_[0]->port_}); + second_response_generator.SetNextResolution({servers_[0]->port_}); CheckRpcSendOk(second_stub, DEBUG_LOCATION); } @@ -473,16 +564,18 @@ TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { grpc_pick_unused_port_or_die()}; CreateServers(2, ports); StartServer(1); - auto channel1 = BuildChannel("pick_first", args); + auto response_generator1 = BuildResolverResponseGenerator(); + auto channel1 = BuildChannel("pick_first", response_generator1, args); auto stub1 = BuildStub(channel1); - SetNextResolution(ports); + response_generator1.SetNextResolution(ports); // Wait for second server to be ready. WaitForServer(stub1, 1, DEBUG_LOCATION); // Create a second channel with the same addresses. Its PF instance // should immediately pick the second subchannel, since it's already // in READY state. - auto channel2 = BuildChannel("pick_first", args); - SetNextResolution(ports); + auto response_generator2 = BuildResolverResponseGenerator(); + auto channel2 = BuildChannel("pick_first", response_generator2, args); + response_generator2.SetNextResolution(ports); // Check that the channel reports READY without waiting for the // initial backoff. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); @@ -494,9 +587,10 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); const std::vector ports = {grpc_pick_unused_port_or_die()}; const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); - auto channel = BuildChannel("pick_first", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // The channel won't become connected (there's no server). ASSERT_FALSE(channel->WaitForConnected( grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2))); @@ -523,9 +617,10 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { constexpr int kMinReconnectBackOffMs = 1000; args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs); const std::vector ports = {grpc_pick_unused_port_or_die()}; - auto channel = BuildChannel("pick_first", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // Make connection delay a 10% longer than it's willing to in order to make // sure we are hitting the codepath that waits for the min reconnect backoff. gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10); @@ -548,9 +643,10 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { constexpr int kInitialBackOffMs = 1000; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); const std::vector ports = {grpc_pick_unused_port_or_die()}; - auto channel = BuildChannel("pick_first", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // The channel won't become connected (there's no server). EXPECT_FALSE( channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); @@ -563,9 +659,11 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); // Reset connection backoff. experimental::ChannelResetConnectionBackoff(channel.get()); - // Wait for connect. Should happen ~immediately. + // Wait for connect. Should happen as soon as the client connects to + // the newly started server, which should be before the initial + // backoff timeout elapses. EXPECT_TRUE( - channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(20))); const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms); @@ -579,9 +677,10 @@ TEST_F(ClientLbEnd2endTest, constexpr int kInitialBackOffMs = 1000; args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); const std::vector ports = {grpc_pick_unused_port_or_die()}; - auto channel = BuildChannel("pick_first", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // Wait for connect, which should fail ~immediately, because the server // is not up. gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT"); @@ -622,21 +721,22 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); std::vector ports; // Perform one RPC against the first server. ports.emplace_back(servers_[0]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(servers_[0]->service_.request_count(), 1); // An empty update will result in the channel going into TRANSIENT_FAILURE. ports.clear(); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET none *******"); grpc_connectivity_state channel_state; do { @@ -648,7 +748,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Next update introduces servers_[1], making the channel recover. ports.clear(); ports.emplace_back(servers_[1]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [1] *******"); WaitForServer(stub, 1, DEBUG_LOCATION); EXPECT_EQ(servers_[0]->service_.request_count(), 0); @@ -656,7 +756,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // And again for servers_[2] ports.clear(); ports.emplace_back(servers_[2]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [2] *******"); WaitForServer(stub, 2, DEBUG_LOCATION); EXPECT_EQ(servers_[0]->service_.request_count(), 0); @@ -670,14 +770,15 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); std::vector ports; // Perform one RPC against the first server. ports.emplace_back(servers_[0]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(servers_[0]->service_.request_count(), 1); @@ -687,7 +788,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { ports.clear(); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[0]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET superset *******"); CheckRpcSendOk(stub, DEBUG_LOCATION); // We stick to the previously connected server. @@ -704,12 +805,14 @@ TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) { StartServers(kNumServers); std::vector ports = GetServersPorts(); // Create two channels that (by default) use the global subchannel pool. - auto channel1 = BuildChannel("pick_first"); + auto response_generator1 = BuildResolverResponseGenerator(); + auto channel1 = BuildChannel("pick_first", response_generator1); auto stub1 = BuildStub(channel1); - SetNextResolution(ports); - auto channel2 = BuildChannel("pick_first"); + response_generator1.SetNextResolution(ports); + auto response_generator2 = BuildResolverResponseGenerator(); + auto channel2 = BuildChannel("pick_first", response_generator2); auto stub2 = BuildStub(channel2); - SetNextResolution(ports); + response_generator2.SetNextResolution(ports); WaitForServer(stub1, 0, DEBUG_LOCATION); // Send one RPC on each channel. CheckRpcSendOk(stub1, DEBUG_LOCATION); @@ -729,12 +832,14 @@ TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) { // Create two channels that use local subchannel pool. ChannelArguments args; args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1); - auto channel1 = BuildChannel("pick_first", args); + auto response_generator1 = BuildResolverResponseGenerator(); + auto channel1 = BuildChannel("pick_first", response_generator1, args); auto stub1 = BuildStub(channel1); - SetNextResolution(ports); - auto channel2 = BuildChannel("pick_first", args); + response_generator1.SetNextResolution(ports); + auto response_generator2 = BuildResolverResponseGenerator(); + auto channel2 = BuildChannel("pick_first", response_generator2, args); auto stub2 = BuildStub(channel2); - SetNextResolution(ports); + response_generator2.SetNextResolution(ports); WaitForServer(stub1, 0, DEBUG_LOCATION); // Send one RPC on each channel. CheckRpcSendOk(stub1, DEBUG_LOCATION); @@ -750,13 +855,14 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { const int kNumUpdates = 1000; const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); std::vector ports = GetServersPorts(); for (size_t i = 0; i < kNumUpdates; ++i) { std::shuffle(ports.begin(), ports.end(), std::mt19937(std::random_device()())); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // We should re-enter core at the end of the loop to give the resolution // setting closure a chance to run. if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); @@ -778,16 +884,17 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { dead_ports.emplace_back(grpc_pick_unused_port_or_die()); } } - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); // The initial resolution only contains dead ports. There won't be any // selected subchannel. Re-resolution will return the same result. - SetNextResolution(dead_ports); + response_generator.SetNextResolution(dead_ports); gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******"); for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub); // Set a re-resolution result that contains reachable ports, so that the // pick_first LB policy can recover soon. - SetNextResolutionUponError(alive_ports); + response_generator.SetNextResolutionUponError(alive_ports); gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******"); WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */); CheckRpcSendOk(stub, DEBUG_LOCATION); @@ -799,9 +906,10 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) { std::vector ports = {grpc_pick_unused_port_or_die()}; StartServers(1, ports); - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******"); WaitForServer(stub, 0, DEBUG_LOCATION); gpr_log(GPR_INFO, "****** STOPPING SERVER ******"); @@ -818,9 +926,10 @@ TEST_F(ClientLbEnd2endTest, grpc_pick_unused_port_or_die()}; CreateServers(2, ports); StartServer(1); - auto channel = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******"); WaitForServer(stub, 1, DEBUG_LOCATION); gpr_log(GPR_INFO, "****** STOPPING SERVER ******"); @@ -834,9 +943,10 @@ TEST_F(ClientLbEnd2endTest, TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { std::vector ports = {grpc_pick_unused_port_or_die()}; StartServers(1, ports); - auto channel_1 = BuildChannel("pick_first"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel_1 = BuildChannel("pick_first", response_generator); auto stub_1 = BuildStub(channel_1); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******"); WaitForServer(stub_1, 0, DEBUG_LOCATION); gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******"); @@ -845,13 +955,10 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { // create a new subchannel and hold a ref to it. StartServers(1, ports); gpr_log(GPR_INFO, "****** SERVER RESTARTED *******"); - auto channel_2 = BuildChannel("pick_first"); + auto response_generator_2 = BuildResolverResponseGenerator(); + auto channel_2 = BuildChannel("pick_first", response_generator_2); auto stub_2 = BuildStub(channel_2); - // TODO(juanlishen): This resolution result will only be visible to channel 2 - // since the response generator is only associated with channel 2 now. We - // should change the response generator to be able to deliver updates to - // multiple channels at once. - SetNextResolution(ports); + response_generator_2.SetNextResolution(ports); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******"); WaitForServer(stub_2, 0, DEBUG_LOCATION, true); gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******"); @@ -877,13 +984,15 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { // Start server, send RPC, and make sure channel is READY. const int kNumServers = 1; StartServers(kNumServers); - auto channel = BuildChannel(""); // pick_first is the default. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("", response_generator); // pick_first is the default. auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Stop server. Channel should go into state IDLE. - SetFailureOnReresolution(); + response_generator.SetFailureOnReresolution(); servers_[0]->Shutdown(); EXPECT_TRUE(WaitForChannelNotReady(channel.get())); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); @@ -891,14 +1000,16 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { } TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { - auto channel = BuildChannel(""); // pick_first is the default. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("", response_generator); // pick_first is the default. auto stub = BuildStub(channel); // Create a number of servers, but only start 1 of them. CreateServers(10); StartServer(0); // Initially resolve to first server and make sure it connects. gpr_log(GPR_INFO, "Phase 1: Connect to first server."); - SetNextResolution({servers_[0]->port_}); + response_generator.SetNextResolution({servers_[0]->port_}); CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Send a resolution update with the remaining servers, none of which are @@ -910,7 +1021,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { gpr_log(GPR_INFO, "Phase 2: Resolver update pointing to remaining " "(not started) servers."); - SetNextResolution(GetServersPorts(1 /* start_index */)); + response_generator.SetNextResolution(GetServersPorts(1 /* start_index */)); // RPCs will continue to be sent to the first server. CheckRpcSendOk(stub, DEBUG_LOCATION); // Now stop the first server, so that the current subchannel list @@ -941,9 +1052,11 @@ TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) { // Start server, send RPC, and make sure channel is READY. const int kNumServers = 1; StartServers(kNumServers); - auto channel = BuildChannel(""); // pick_first is the default. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("", response_generator); // pick_first is the default. auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Stop server. Channel should go into state IDLE. @@ -952,13 +1065,13 @@ TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) { EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); // Now send resolver update that includes no addresses. Channel // should stay in state IDLE. - SetNextResolution({}); + response_generator.SetNextResolution({}); EXPECT_FALSE(channel->WaitForStateChange( GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3))); // Now bring the backend back up and send a non-empty resolver update, // and then try to send an RPC. Channel should go back into state READY. StartServer(0); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); } @@ -967,9 +1080,10 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); // Wait until all backends are ready. do { CheckRpcSendOk(stub, DEBUG_LOCATION); @@ -993,18 +1107,20 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) { StartServers(1); // Single server - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); - SetNextResolution({servers_[0]->port_}); + response_generator.SetNextResolution({servers_[0]->port_}); WaitForServer(stub, 0, DEBUG_LOCATION); // Create a new channel and its corresponding RR LB policy, which will pick // the subchannels in READY state from the previous RPC against the same // target (even if it happened over a different channel, because subchannels // are globally reused). Progress should happen without any transition from // this READY state. - auto second_channel = BuildChannel("round_robin"); + auto second_response_generator = BuildResolverResponseGenerator(); + auto second_channel = BuildChannel("round_robin", second_response_generator); auto second_stub = BuildStub(second_channel); - SetNextResolution({servers_[0]->port_}); + second_response_generator.SetNextResolution({servers_[0]->port_}); CheckRpcSendOk(second_stub, DEBUG_LOCATION); } @@ -1012,13 +1128,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); std::vector ports; // Start with a single server. gpr_log(GPR_INFO, "*** FIRST BACKEND ***"); ports.emplace_back(servers_[0]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); // Send RPCs. They should all go servers_[0] for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); @@ -1030,7 +1147,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { gpr_log(GPR_INFO, "*** SECOND BACKEND ***"); ports.clear(); ports.emplace_back(servers_[1]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0, servers_[1]->service_.request_count()); @@ -1044,7 +1161,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { gpr_log(GPR_INFO, "*** THIRD BACKEND ***"); ports.clear(); ports.emplace_back(servers_[2]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 2, DEBUG_LOCATION); for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(0, servers_[0]->service_.request_count()); @@ -1057,7 +1174,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { ports.emplace_back(servers_[0]->port_); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 1, DEBUG_LOCATION); WaitForServer(stub, 2, DEBUG_LOCATION); @@ -1069,7 +1186,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // An empty update will result in the channel going into TRANSIENT_FAILURE. gpr_log(GPR_INFO, "*** NO BACKENDS ***"); ports.clear(); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); grpc_connectivity_state channel_state; do { channel_state = channel->GetState(true /* try to connect */); @@ -1080,7 +1197,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***"); ports.clear(); ports.emplace_back(servers_[1]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 1, DEBUG_LOCATION); channel_state = channel->GetState(false /* try to connect */); ASSERT_EQ(channel_state, GRPC_CHANNEL_READY); @@ -1091,13 +1208,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); std::vector ports; - // Start with a single server. ports.emplace_back(servers_[0]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); // Send RPCs. They should all go to servers_[0] for (size_t i = 0; i < 10; ++i) SendRpc(stub); @@ -1105,15 +1222,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); servers_[0]->service_.ResetCounters(); - // Shutdown one of the servers to be sent in the update. servers_[1]->Shutdown(); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 2, DEBUG_LOCATION); - // Send three RPCs, one per server. for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub); // The server in shutdown shouldn't receive any. @@ -1124,13 +1239,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); std::vector ports = GetServersPorts(); for (size_t i = 0; i < 1000; ++i) { std::shuffle(ports.begin(), ports.end(), std::mt19937(std::random_device()())); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); } // Check LB policy name for the channel. @@ -1156,9 +1272,10 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { second_ports.push_back(grpc_pick_unused_port_or_die()); } StartServers(kNumServers, first_ports); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); - SetNextResolution(first_ports); + response_generator.SetNextResolution(first_ports); // Send a number of RPCs, which succeed. for (size_t i = 0; i < 100; ++i) { CheckRpcSendOk(stub, DEBUG_LOCATION); @@ -1182,7 +1299,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { StartServers(kNumServers, second_ports); // Don't notify of the update. Wait for the LB policy's re-resolution to // "pull" the new ports. - SetNextResolutionUponError(second_ports); + response_generator.SetNextResolutionUponError(second_ports); gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******"); gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******"); // Client request should eventually (but still fairly soon) succeed. @@ -1195,13 +1312,71 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { ASSERT_GT(gpr_time_cmp(deadline, now), 0); } +TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) { + // Start servers and create channel. Channel should go to READY state. + const int kNumServers = 3; + StartServers(kNumServers); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + EXPECT_TRUE(WaitForChannelReady(channel.get())); + // Now kill the servers. The channel should transition to TRANSIENT_FAILURE. + // TODO(roth): This test should ideally check that even when the + // subchannels are in state CONNECTING for an extended period of time, + // we will still report TRANSIENT_FAILURE. Unfortunately, we don't + // currently have a good way to get a subchannel to report CONNECTING + // for a long period of time, since the servers in this test framework + // are on the loopback interface, which will immediately return a + // "Connection refused" error, so the subchannels will only be in + // CONNECTING state very briefly. When we have time, see if we can + // find a way to fix this. + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE(WaitForChannelState(channel.get(), predicate)); +} + +TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) { + // Create channel and return servers that don't exist. Channel should + // quickly transition into TRANSIENT_FAILURE. + // TODO(roth): This test should ideally check that even when the + // subchannels are in state CONNECTING for an extended period of time, + // we will still report TRANSIENT_FAILURE. Unfortunately, we don't + // currently have a good way to get a subchannel to report CONNECTING + // for a long period of time, since the servers in this test framework + // are on the loopback interface, which will immediately return a + // "Connection refused" error, so the subchannels will only be in + // CONNECTING state very briefly. When we have time, see if we can + // find a way to fix this. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution({ + grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die(), + }); + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true)); +} + TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { const int kNumServers = 3; StartServers(kNumServers); const auto ports = GetServersPorts(); - auto channel = BuildChannel("round_robin"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); - SetNextResolution(ports); + response_generator.SetNextResolution(ports); for (size_t i = 0; i < kNumServers; ++i) { WaitForServer(stub, i, DEBUG_LOCATION); } @@ -1245,9 +1420,10 @@ TEST_F(ClientLbEnd2endTest, args.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"health_check_service_name\"}}"); - auto channel = BuildChannel("round_robin", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution({servers_[0]->port_}); + response_generator.SetNextResolution({servers_[0]->port_}); EXPECT_TRUE(WaitForChannelReady(channel.get())); CheckRpcSendOk(stub, DEBUG_LOCATION); } @@ -1261,9 +1437,10 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) { args.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"health_check_service_name\"}}"); - auto channel = BuildChannel("round_robin", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator, args); auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); // Channel should not become READY, because health checks should be failing. gpr_log(GPR_INFO, "*** initial state: unknown health check service name for " @@ -1325,6 +1502,34 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) { EnableDefaultHealthCheckService(false); } +TEST_F(ClientLbEnd2endTest, + RoundRobinWithHealthCheckingHandlesSubchannelFailure) { + EnableDefaultHealthCheckService(true); + // Start servers. + const int kNumServers = 3; + StartServers(kNumServers); + servers_[0]->SetServingStatus("health_check_service_name", true); + servers_[1]->SetServingStatus("health_check_service_name", true); + servers_[2]->SetServingStatus("health_check_service_name", true); + ChannelArguments args; + args.SetServiceConfigJSON( + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name\"}}"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator, args); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + WaitForServer(stub, 0, DEBUG_LOCATION); + // Stop server 0 and send a new resolver result to ensure that RR + // checks each subchannel's state. + servers_[0]->Shutdown(); + response_generator.SetNextResolution(GetServersPorts()); + // Send a bunch more RPCs. + for (size_t i = 0; i < 100; i++) { + SendRpc(stub); + } +} + TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { EnableDefaultHealthCheckService(true); // Start server. @@ -1335,15 +1540,17 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { args.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"health_check_service_name\"}}"); - auto channel1 = BuildChannel("round_robin", args); + auto response_generator1 = BuildResolverResponseGenerator(); + auto channel1 = BuildChannel("round_robin", response_generator1, args); auto stub1 = BuildStub(channel1); std::vector ports = GetServersPorts(); - SetNextResolution(ports); + response_generator1.SetNextResolution(ports); // Create a channel with health checking enabled but inhibited. args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1); - auto channel2 = BuildChannel("round_robin", args); + auto response_generator2 = BuildResolverResponseGenerator(); + auto channel2 = BuildChannel("round_robin", response_generator2, args); auto stub2 = BuildStub(channel2); - SetNextResolution(ports); + response_generator2.SetNextResolution(ports); // First channel should not become READY, because health checks should be // failing. EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); @@ -1370,19 +1577,21 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { args.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"health_check_service_name\"}}"); - auto channel1 = BuildChannel("round_robin", args); + auto response_generator1 = BuildResolverResponseGenerator(); + auto channel1 = BuildChannel("round_robin", response_generator1, args); auto stub1 = BuildStub(channel1); std::vector ports = GetServersPorts(); - SetNextResolution(ports); + response_generator1.SetNextResolution(ports); // Create a channel with health-checking enabled with a different // service name. ChannelArguments args2; args2.SetServiceConfigJSON( "{\"healthCheckConfig\": " "{\"serviceName\": \"health_check_service_name2\"}}"); - auto channel2 = BuildChannel("round_robin", args2); + auto response_generator2 = BuildResolverResponseGenerator(); + auto channel2 = BuildChannel("round_robin", response_generator2, args2); auto stub2 = BuildStub(channel2); - SetNextResolution(ports); + response_generator2.SetNextResolution(ports); // Allow health checks from channel 2 to succeed. servers_[0]->SetServingStatus("health_check_service_name2", true); // First channel should not become READY, because health checks should be @@ -1401,40 +1610,201 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { EnableDefaultHealthCheckService(false); } +TEST_F(ClientLbEnd2endTest, + RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) { + EnableDefaultHealthCheckService(true); + // Start server. + const int kNumServers = 1; + StartServers(kNumServers); + // Create a channel with health-checking enabled. + const char* kServiceConfigJson = + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name\"}}"; + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + std::vector ports = GetServersPorts(); + response_generator.SetNextResolution(ports, kServiceConfigJson); + servers_[0]->SetServingStatus("health_check_service_name", true); + EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */)); + // Send an update on the channel to change it to use a health checking + // service name that is not being reported as healthy. + const char* kServiceConfigJson2 = + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name2\"}}"; + response_generator.SetNextResolution(ports, kServiceConfigJson2); + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + // Clean up. + EnableDefaultHealthCheckService(false); +} + +TEST_F(ClientLbEnd2endTest, ChannelIdleness) { + // Start server. + const int kNumServers = 1; + StartServers(kNumServers); + // Set max idle time and build the channel. + ChannelArguments args; + args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("", response_generator, args); + auto stub = BuildStub(channel); + // The initial channel state should be IDLE. + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + // After sending RPC, channel state should be READY. + response_generator.SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + // After a period time not using the channel, the channel state should switch + // to IDLE. + gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200)); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + // Sending a new RPC should awake the IDLE channel. + response_generator.SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); +} + +class ClientLbPickArgsTest : public ClientLbEnd2endTest { + protected: + void SetUp() override { + ClientLbEnd2endTest::SetUp(); + current_test_instance_ = this; + } + + static void SetUpTestCase() { + grpc_init(); + grpc_core::RegisterTestPickArgsLoadBalancingPolicy(SavePickArgs); + } + + static void TearDownTestCase() { grpc_shutdown(); } + + const std::vector& args_seen_list() { + grpc::internal::MutexLock lock(&mu_); + return args_seen_list_; + } + + private: + static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) { + ClientLbPickArgsTest* self = current_test_instance_; + grpc::internal::MutexLock lock(&self->mu_); + self->args_seen_list_.emplace_back(args_seen); + } + + static ClientLbPickArgsTest* current_test_instance_; + grpc::internal::Mutex mu_; + std::vector args_seen_list_; +}; + +ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr; + +TEST_F(ClientLbPickArgsTest, Basic) { + const int kNumServers = 1; + StartServers(kNumServers); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("test_pick_args_lb", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true); + // Check LB policy name for the channel. + EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName()); + // There will be two entries, one for the pick tried in state + // CONNECTING and another for the pick tried in state READY. + EXPECT_THAT(args_seen_list(), + ::testing::ElementsAre( + ::testing::AllOf( + ::testing::Field(&grpc_core::PickArgsSeen::path, + "/grpc.testing.EchoTestService/Echo"), + ::testing::Field(&grpc_core::PickArgsSeen::metadata, + ::testing::UnorderedElementsAre( + ::testing::Pair("foo", "1"), + ::testing::Pair("bar", "2"), + ::testing::Pair("baz", "3")))), + ::testing::AllOf( + ::testing::Field(&grpc_core::PickArgsSeen::path, + "/grpc.testing.EchoTestService/Echo"), + ::testing::Field(&grpc_core::PickArgsSeen::metadata, + ::testing::UnorderedElementsAre( + ::testing::Pair("foo", "1"), + ::testing::Pair("bar", "2"), + ::testing::Pair("baz", "3")))))); +} + class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { protected: void SetUp() override { ClientLbEnd2endTest::SetUp(); + current_test_instance_ = this; + } + + static void SetUpTestCase() { + grpc_init(); grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy( - ReportTrailerIntercepted, this); + ReportTrailerIntercepted); } - void TearDown() override { ClientLbEnd2endTest::TearDown(); } + static void TearDownTestCase() { grpc_shutdown(); } int trailers_intercepted() { grpc::internal::MutexLock lock(&mu_); return trailers_intercepted_; } + const grpc_core::MetadataVector& trailing_metadata() { + grpc::internal::MutexLock lock(&mu_); + return trailing_metadata_; + } + + const udpa::data::orca::v1::OrcaLoadReport* backend_load_report() { + grpc::internal::MutexLock lock(&mu_); + return load_report_.get(); + } + private: - static void ReportTrailerIntercepted(void* arg) { - ClientLbInterceptTrailingMetadataTest* self = - static_cast(arg); + static void ReportTrailerIntercepted( + const grpc_core::TrailingMetadataArgsSeen& args_seen) { + const auto* backend_metric_data = args_seen.backend_metric_data; + ClientLbInterceptTrailingMetadataTest* self = current_test_instance_; grpc::internal::MutexLock lock(&self->mu_); self->trailers_intercepted_++; + self->trailing_metadata_ = args_seen.metadata; + if (backend_metric_data != nullptr) { + self->load_report_ = + absl::make_unique(); + self->load_report_->set_cpu_utilization( + backend_metric_data->cpu_utilization); + self->load_report_->set_mem_utilization( + backend_metric_data->mem_utilization); + self->load_report_->set_rps(backend_metric_data->requests_per_second); + for (const auto& p : backend_metric_data->request_cost) { + std::string name = std::string(p.first); + (*self->load_report_->mutable_request_cost())[name] = p.second; + } + for (const auto& p : backend_metric_data->utilization) { + std::string name = std::string(p.first); + (*self->load_report_->mutable_utilization())[name] = p.second; + } + } } + static ClientLbInterceptTrailingMetadataTest* current_test_instance_; grpc::internal::Mutex mu_; int trailers_intercepted_ = 0; + grpc_core::MetadataVector trailing_metadata_; + std::unique_ptr load_report_; }; +ClientLbInterceptTrailingMetadataTest* + ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr; + TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) { const int kNumServers = 1; const int kNumRpcs = 10; StartServers(kNumServers); - auto channel = BuildChannel("intercept_trailing_metadata_lb"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("intercept_trailing_metadata_lb", response_generator); auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < kNumRpcs; ++i) { CheckRpcSendOk(stub, DEBUG_LOCATION); } @@ -1442,6 +1812,14 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) { EXPECT_EQ("intercept_trailing_metadata_lb", channel->GetLoadBalancingPolicyName()); EXPECT_EQ(kNumRpcs, trailers_intercepted()); + EXPECT_THAT(trailing_metadata(), + ::testing::UnorderedElementsAre( + // TODO(roth): Should grpc-status be visible here? + ::testing::Pair("grpc-status", "0"), + ::testing::Pair("user-agent", ::testing::_), + ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"), + ::testing::Pair("baz", "3"))); + EXPECT_EQ(nullptr, backend_load_report()); } TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) { @@ -1464,11 +1842,71 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) { " }\n" " } ]\n" "}"); - auto channel = BuildChannel("intercept_trailing_metadata_lb", args); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("intercept_trailing_metadata_lb", response_generator, args); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + for (size_t i = 0; i < kNumRpcs; ++i) { + CheckRpcSendOk(stub, DEBUG_LOCATION); + } + // Check LB policy name for the channel. + EXPECT_EQ("intercept_trailing_metadata_lb", + channel->GetLoadBalancingPolicyName()); + EXPECT_EQ(kNumRpcs, trailers_intercepted()); + EXPECT_THAT(trailing_metadata(), + ::testing::UnorderedElementsAre( + // TODO(roth): Should grpc-status be visible here? + ::testing::Pair("grpc-status", "0"), + ::testing::Pair("user-agent", ::testing::_), + ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"), + ::testing::Pair("baz", "3"))); + EXPECT_EQ(nullptr, backend_load_report()); +} + +TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { + const int kNumServers = 1; + const int kNumRpcs = 10; + StartServers(kNumServers); + udpa::data::orca::v1::OrcaLoadReport load_report; + load_report.set_cpu_utilization(0.5); + load_report.set_mem_utilization(0.75); + load_report.set_rps(25); + auto* request_cost = load_report.mutable_request_cost(); + (*request_cost)["foo"] = 0.8; + (*request_cost)["bar"] = 1.4; + auto* utilization = load_report.mutable_utilization(); + (*utilization)["baz"] = 1.1; + (*utilization)["quux"] = 0.9; + for (const auto& server : servers_) { + server->service_.set_load_report(&load_report); + } + auto response_generator = BuildResolverResponseGenerator(); + auto channel = + BuildChannel("intercept_trailing_metadata_lb", response_generator); auto stub = BuildStub(channel); - SetNextResolution(GetServersPorts()); + response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < kNumRpcs; ++i) { CheckRpcSendOk(stub, DEBUG_LOCATION); + auto* actual = backend_load_report(); + ASSERT_NE(actual, nullptr); + // TODO(roth): Change this to use EqualsProto() once that becomes + // available in OSS. + EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization()); + EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization()); + EXPECT_EQ(actual->rps(), load_report.rps()); + EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size()); + for (const auto& p : actual->request_cost()) { + auto it = load_report.request_cost().find(p.first); + ASSERT_NE(it, load_report.request_cost().end()); + EXPECT_EQ(it->second, p.second); + } + EXPECT_EQ(actual->utilization().size(), load_report.utilization().size()); + for (const auto& p : actual->utilization()) { + auto it = load_report.utilization().find(p.first); + ASSERT_NE(it, load_report.utilization().end()); + EXPECT_EQ(it->second, p.second); + } } // Check LB policy name for the channel. EXPECT_EQ("intercept_trailing_metadata_lb", @@ -1476,14 +1914,89 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) { EXPECT_EQ(kNumRpcs, trailers_intercepted()); } +class ClientLbAddressTest : public ClientLbEnd2endTest { + protected: + static const char* kAttributeKey; + + class Attribute : public grpc_core::ServerAddress::AttributeInterface { + public: + explicit Attribute(const std::string& str) : str_(str) {} + + std::unique_ptr Copy() const override { + return absl::make_unique(str_); + } + + int Cmp(const AttributeInterface* other) const override { + return str_.compare(static_cast(other)->str_); + } + + std::string ToString() const override { return str_; } + + private: + std::string str_; + }; + + void SetUp() override { + ClientLbEnd2endTest::SetUp(); + current_test_instance_ = this; + } + + static void SetUpTestCase() { + grpc_init(); + grpc_core::RegisterAddressTestLoadBalancingPolicy(SaveAddress); + } + + static void TearDownTestCase() { grpc_shutdown(); } + + const std::vector& addresses_seen() { + grpc::internal::MutexLock lock(&mu_); + return addresses_seen_; + } + + private: + static void SaveAddress(const grpc_core::ServerAddress& address) { + ClientLbAddressTest* self = current_test_instance_; + grpc::internal::MutexLock lock(&self->mu_); + self->addresses_seen_.emplace_back(address.ToString()); + } + + static ClientLbAddressTest* current_test_instance_; + grpc::internal::Mutex mu_; + std::vector addresses_seen_; +}; + +const char* ClientLbAddressTest::kAttributeKey = "attribute_key"; + +ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr; + +TEST_F(ClientLbAddressTest, Basic) { + const int kNumServers = 1; + StartServers(kNumServers); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("address_test_lb", response_generator); + auto stub = BuildStub(channel); + // Addresses returned by the resolver will have attached attributes. + response_generator.SetNextResolution(GetServersPorts(), nullptr, + kAttributeKey, + absl::make_unique("foo")); + CheckRpcSendOk(stub, DEBUG_LOCATION); + // Check LB policy name for the channel. + EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName()); + // Make sure that the attributes wind up on the subchannels. + std::vector expected; + for (const int port : GetServersPorts()) { + expected.emplace_back( + absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", port, + " args={} attributes={", kAttributeKey, "=foo}")); + } + EXPECT_EQ(addresses_seen(), expected); +} + } // namespace } // namespace testing } // namespace grpc int main(int argc, char** argv) { - // Make the backup poller poll very frequently in order to pick up - // updates from all the subchannels's FDs. - GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(argc, argv); const auto result = RUN_ALL_TESTS();