3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "absl/strings/str_cat.h"
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/atm.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 #include <grpcpp/create_channel.h>
37 #include <grpcpp/health_check_service_interface.h>
38 #include <grpcpp/impl/codegen/sync.h>
39 #include <grpcpp/server.h>
40 #include <grpcpp/server_builder.h>
42 #include "src/core/ext/filters/client_channel/backup_poller.h"
43 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
44 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
45 #include "src/core/ext/filters/client_channel/server_address.h"
46 #include "src/core/ext/filters/client_channel/service_config.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/gpr/env.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/iomgr/parse_address.h"
53 #include "src/core/lib/iomgr/tcp_client.h"
54 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
55 #include "src/cpp/client/secure_credentials.h"
56 #include "src/cpp/server/secure_server_credentials.h"
58 #include "src/proto/grpc/testing/echo.grpc.pb.h"
59 #include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/test_config.h"
62 #include "test/core/util/test_lb_policies.h"
63 #include "test/cpp/end2end/test_service_impl.h"
65 #include <gmock/gmock.h>
66 #include <gtest/gtest.h>
68 using grpc::testing::EchoRequest;
69 using grpc::testing::EchoResponse;
70 using std::chrono::system_clock;
72 // defined in tcp_client.cc
73 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
// TCP client vtable that tcp_client_connect_with_delay() forwards to.
// PickFirstBackOffMinReconnect assigns grpc_tcp_client_impl to it before
// installing the delaying vtable.
75 static grpc_tcp_client_vtable* default_client_impl;
// Delay, in milliseconds, that tcp_client_connect_with_delay() loads with
// acquire semantics; tests store to it with gpr_atm_rel_store().
81 gpr_atm g_connection_delay_ms;
// Connect hook used as the sole initializer of the delayed_connect vtable
// below. It loads g_connection_delay_ms, calls gpr_sleep_until() with a
// deadline that many milliseconds from now, and then forwards every argument
// to default_client_impl->connect(), extending the deadline by the same
// number of milliseconds.
// default_client_impl is dereferenced without a null check, so it must be
// assigned before this hook can run.
83 void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
84 grpc_pollset_set* interested_parties,
85 const grpc_channel_args* channel_args,
86 const grpc_resolved_address* addr,
87 grpc_millis deadline) {
// The gpr_atm value is narrowed to int here.
88 const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
90 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
// Push the deadline out by the time spent sleeping.
92 default_client_impl->connect(closure, ep, interested_parties, channel_args,
93 addr, deadline + delay_ms);
// Vtable initialized with the delaying hook above; tests install it with
// grpc_set_tcp_client_impl().
96 grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay};
98 // Subclass of TestServiceImpl that increments a request counter for
99 // every call to the Echo RPC.
// It also records the peer string of every caller and, when a load report
// has been set, attaches its serialized form as trailing metadata.
100 class MyTestServiceImpl : public TestServiceImpl {
// Echo handler: snapshots load_report_ under mu_, records context->peer(),
// optionally adds the "x-endpoint-load-metrics-bin" trailing metadata, and
// then delegates to TestServiceImpl::Echo().
102 Status Echo(ServerContext* context, const EchoRequest* request,
103 EchoResponse* response) override {
104 const udpa::data::orca::v1::OrcaLoadReport* load_report = nullptr;
106 grpc::internal::MutexLock lock(&mu_);
// Copy the pointer into a local so it can be used after the lock.
108 load_report = load_report_;
110 AddClient(context->peer());
111 if (load_report != nullptr) {
112 // TODO(roth): Once we provide a more standard server-side API for
113 // populating this data, use that API here.
114 context->AddTrailingMetadata("x-endpoint-load-metrics-bin",
115 load_report->SerializeAsString());
117 return TestServiceImpl::Echo(context, request, response);
// Returns request_count_, read while holding mu_.
120 int request_count() {
121 grpc::internal::MutexLock lock(&mu_);
122 return request_count_;
// Takes mu_ for the duration of the reset.
125 void ResetCounters() {
126 grpc::internal::MutexLock lock(&mu_);
// Takes clients_mu_; the set of peers is returned by value.
130 std::set<std::string> clients() {
131 grpc::internal::MutexLock lock(&clients_mu_);
// Stores the pointer as given (no copy is made). Echo() later dereferences
// it, so the report must stay alive while it is set.
135 void set_load_report(udpa::data::orca::v1::OrcaLoadReport* load_report) {
136 grpc::internal::MutexLock lock(&mu_);
137 load_report_ = load_report;
// Inserts `client` into clients_ under clients_mu_; duplicates collapse
// because clients_ is a std::set.
141 void AddClient(const std::string& client) {
142 grpc::internal::MutexLock lock(&clients_mu_);
143 clients_.insert(client);
// mu_ is held around the accesses to request_count_ and load_report_ above.
146 grpc::internal::Mutex mu_;
147 int request_count_ = 0;
// Not owned by this class.
148 const udpa::data::orca::v1::OrcaLoadReport* load_report_ = nullptr;
// clients_mu_ is held around the accesses to clients_ above.
149 grpc::internal::Mutex clients_mu_;
150 std::set<std::string> clients_;
// Test helper that holds a ref-counted FakeResolverResponseGenerator and
// builds resolver results from lists of local ports.
153 class FakeResolverResponseGeneratorWrapper {
// Creates a fresh generator.
155 FakeResolverResponseGeneratorWrapper()
156 : response_generator_(grpc_core::MakeRefCounted<
157 grpc_core::FakeResolverResponseGenerator>()) {}
// Move constructor: takes over the other wrapper's generator reference.
159 FakeResolverResponseGeneratorWrapper(
160 FakeResolverResponseGeneratorWrapper&& other) noexcept {
161 response_generator_ = std::move(other.response_generator_);
// Builds a result with BuildFakeResults() and passes it to the generator's
// SetResponse(). A grpc_core::ExecCtx lives on the stack during the call.
164 void SetNextResolution(
165 const std::vector<int>& ports, const char* service_config_json = nullptr,
166 const char* attribute_key = nullptr,
167 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
169 grpc_core::ExecCtx exec_ctx;
170 response_generator_->SetResponse(BuildFakeResults(
171 ports, service_config_json, attribute_key, std::move(attribute)));
// Registers a result for `ports` through SetReresolutionResponse().
174 void SetNextResolutionUponError(const std::vector<int>& ports) {
175 grpc_core::ExecCtx exec_ctx;
176 response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
// Forwards to the generator's SetFailureOnReresolution().
179 void SetFailureOnReresolution() {
180 grpc_core::ExecCtx exec_ctx;
181 response_generator_->SetFailureOnReresolution();
// Non-owning pointer to the wrapped generator; BuildChannel() places it in
// the channel args.
184 grpc_core::FakeResolverResponseGenerator* Get() const {
185 return response_generator_.get();
// Builds a resolver result with one address per entry in `ports`, each
// parsed from "ipv4:127.0.0.1:<port>".
// - attribute: when non-null, a copy is attached to every address under
//   attribute_key.
// - service_config_json: when non-null, it is parsed into
//   result.service_config, which is asserted to be non-null.
189 static grpc_core::Resolver::Result BuildFakeResults(
190 const std::vector<int>& ports, const char* service_config_json = nullptr,
191 const char* attribute_key = nullptr,
192 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
194 grpc_core::Resolver::Result result;
195 for (const int& port : ports) {
196 std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
197 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true);
198 GPR_ASSERT(lb_uri != nullptr);
199 grpc_resolved_address address;
200 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
// The map is keyed by const char*, so keys are ordered and compared by
// pointer value rather than by string contents.
201 std::map<const char*,
202 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
204 if (attribute != nullptr) {
// Each address gets its own copy of the attribute.
205 attributes[attribute_key] = attribute->Copy();
207 result.addresses.emplace_back(address.addr, address.len,
208 nullptr /* args */, std::move(attributes));
// The parsed URI is released once the address has been added.
209 grpc_uri_destroy(lb_uri);
211 if (service_config_json != nullptr) {
212 result.service_config = grpc_core::ServiceConfig::Create(
213 nullptr, service_config_json, &result.service_config_error);
214 GPR_ASSERT(result.service_config != nullptr);
219 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
// Fixture for the client-side load-balancing end-to-end tests. It owns a
// list of in-process backend servers (ServerData) and provides helpers to
// build channels on the "fake:///" target, send Echo RPCs and wait for
// channel connectivity states.
223 class ClientLbEnd2endTest : public ::testing::Test {
// Uses "localhost" as the server host and fake transport security
// credentials for every channel.
225 ClientLbEnd2endTest()
226 : server_host_("localhost"),
227 kRequestMessage_("Live long and prosper."),
228 creds_(new SecureChannelCredentials(
229 grpc_fake_transport_security_credentials_create())) {}
231 static void SetUpTestCase() {
232 // Make the backup poller poll very frequently in order to pick up
233 // updates from all the subchannels' FDs.
234 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
236 // Workaround Apple CFStream bug
237 gpr_setenv("grpc_cfstream", "0");
// grpc_init() here is paired with grpc_shutdown_blocking() in TearDown().
241 void SetUp() override { grpc_init(); }
// Shuts down every server before shutting gRPC down.
243 void TearDown() override {
244 for (size_t i = 0; i < servers_.size(); ++i) {
245 servers_[i]->Shutdown();
249 grpc_shutdown_blocking();
// Appends num_servers ServerData entries to servers_. ports[i] is used for
// the i-th new server only when ports.size() == num_servers.
252 void CreateServers(size_t num_servers,
253 std::vector<int> ports = std::vector<int>()) {
255 for (size_t i = 0; i < num_servers; ++i) {
257 if (ports.size() == num_servers) port = ports[i];
258 servers_.emplace_back(new ServerData(port));
// Starts servers_[index] on server_host_.
262 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
// Creates the servers via CreateServers() and then loops over them.
264 void StartServers(size_t num_servers,
265 std::vector<int> ports = std::vector<int>()) {
266 CreateServers(num_servers, std::move(ports));
267 for (size_t i = 0; i < num_servers; ++i) {
// Collects the ports of servers_[start_index..] in order.
272 std::vector<int> GetServersPorts(size_t start_index = 0) {
273 std::vector<int> ports;
274 for (size_t i = start_index; i < servers_.size(); ++i) {
275 ports.push_back(servers_[i]->port_);
// Returns a new wrapper around a fresh fake-resolver response generator.
280 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
281 return FakeResolverResponseGeneratorWrapper();
// Creates an EchoTestService stub on `channel`.
284 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
285 const std::shared_ptr<Channel>& channel) {
286 return grpc::testing::EchoTestService::NewStub(channel);
// Creates a channel for target "fake:///" using creds_. The LB policy name
// is set only when lb_policy_name is non-empty, and the response generator
// is passed to the fake resolver through a pointer channel arg.
289 std::shared_ptr<Channel> BuildChannel(
290 const std::string& lb_policy_name,
291 const FakeResolverResponseGeneratorWrapper& response_generator,
292 ChannelArguments args = ChannelArguments()) {
293 if (lb_policy_name.size() > 0) {
294 args.SetLoadBalancingPolicyName(lb_policy_name);
295 } // else, default to pick first
296 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
297 response_generator.Get());
298 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
// Parameters of the RPC-sending helper that CheckRpcSendFailure() calls as
// SendRpc(stub):
// - response: receives the reply; a temporary is used when null.
// - timeout_ms: RPC deadline, relative to now.
// - result: when non-null, receives the RPC's Status.
// - wait_for_ready: when true, the RPC is marked wait-for-ready.
302 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
303 EchoResponse* response = nullptr, int timeout_ms = 1000,
304 Status* result = nullptr, bool wait_for_ready = false) {
305 const bool local_response = (response == nullptr);
// Manually allocated scratch response; released by the delete below.
// NOTE(review): a stack-local EchoResponse would avoid the manual
// new/delete pairing.
306 if (local_response) response = new EchoResponse;
308 request.set_message(kRequestMessage_);
309 request.mutable_param()->set_echo_metadata(true);
310 ClientContext context;
311 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
312 if (wait_for_ready) context.set_wait_for_ready(true);
313 context.AddMetadata("foo", "1");
314 context.AddMetadata("bar", "2");
315 context.AddMetadata("baz", "3");
316 Status status = stub->Echo(&context, request, response);
317 if (result != nullptr) *result = status;
318 if (local_response) delete response;
// Parameters of the success-checking helper that tests call as
// CheckRpcSendOk(stub, DEBUG_LOCATION). It sends one RPC with a 2000 ms
// timeout and asserts both success and that the echoed message equals
// kRequestMessage_. `location` is only used in the failure messages.
323 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
324 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
325 EchoResponse response;
328 SendRpc(stub, &response, 2000, &status, wait_for_ready);
329 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
331 << "Error: " << status.error_message() << " "
332 << status.error_details();
333 ASSERT_EQ(response.message(), kRequestMessage_)
334 << "From " << location.file() << ":" << location.line();
// NOTE(review): a failing ASSERT_TRUE(success) above already returns from
// this function, so this abort() looks unreachable; confirm.
335 if (!success) abort();
// Sends one RPC with default arguments and expects it to fail.
338 void CheckRpcSendFailure(
339 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
340 const bool success = SendRpc(stub);
341 EXPECT_FALSE(success);
// State of one backend server (ServerData): the gRPC server, the service
// it exposes, the thread that starts it, and startup flags.
346 std::unique_ptr<Server> server_;
347 MyTestServiceImpl service_;
348 std::unique_ptr<std::thread> thread_;
349 bool server_ready_ = false;
350 bool started_ = false;
// A port <= 0 means "pick an unused port".
352 explicit ServerData(int port = 0) {
353 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
// Launches Serve() on a new thread and blocks until server_ready_ is set,
// then clears the flag again.
356 void Start(const std::string& server_host) {
357 gpr_log(GPR_INFO, "starting server on port %d", port_);
// mu and cond are locals shared with the Serve() thread by pointer.
359 grpc::internal::Mutex mu;
360 grpc::internal::MutexLock lock(&mu);
361 grpc::internal::CondVar cond;
362 thread_.reset(new std::thread(
363 std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
364 cond.WaitUntil(&mu, [this] { return server_ready_; });
365 server_ready_ = false;
366 gpr_log(GPR_INFO, "server startup complete");
// Thread body: builds and starts a server listening on
// "<server_host>:<port_>" with fake transport security server credentials
// and service_ registered, then sets server_ready_ while holding *mu.
369 void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
370 grpc::internal::CondVar* cond) {
371 std::ostringstream server_address;
372 server_address << server_host << ":" << port_;
373 ServerBuilder builder;
374 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
375 grpc_fake_transport_security_server_credentials_create()));
376 builder.AddListeningPort(server_address.str(), std::move(creds));
377 builder.RegisterService(&service_);
378 server_ = builder.BuildAndStart();
379 grpc::internal::MutexLock lock(mu);
380 server_ready_ = true;
// Shutdown path: a no-op for a server that was never started; otherwise
// the server is shut down with a deadline of "now".
385 if (!started_) return;
386 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
// Sets the serving status of `service` on the server's health check
// service.
391 void SetServingStatus(const std::string& service, bool serving) {
392 server_->GetHealthCheckService()->SetServingStatus(service, serving);
// Resets the counters of every server's service.
396 void ResetCounters() {
397 for (const auto& server : servers_) server->service_.ResetCounters();
// Parameters of the helper that tests call as
// WaitForServer(stub, idx, DEBUG_LOCATION). It keeps issuing RPCs while
// servers_[server_idx] reports a request count of 0.
401 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
402 size_t server_idx, const grpc_core::DebugLocation& location,
403 bool ignore_failure = false) {
405 if (ignore_failure) {
// Wait-for-ready RPC that must succeed.
408 CheckRpcSendOk(stub, location, true);
410 } while (servers_[server_idx]->service_.request_count() == 0);
// Polls the channel state until predicate(state) holds. Returns false when
// WaitForStateChange() reports that the deadline (timeout_seconds from
// now) passed first. try_to_connect is forwarded to Channel::GetState().
414 bool WaitForChannelState(
415 Channel* channel, std::function<bool(grpc_connectivity_state)> predicate,
416 bool try_to_connect = false, int timeout_seconds = 5) {
417 const gpr_timespec deadline =
418 grpc_timeout_seconds_to_deadline(timeout_seconds);
420 grpc_connectivity_state state = channel->GetState(try_to_connect);
421 if (predicate(state)) break;
422 if (!channel->WaitForStateChange(state, deadline)) return false;
// Waits for any state other than READY, without requesting a connection.
427 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
428 auto predicate = [](grpc_connectivity_state state) {
429 return state != GRPC_CHANNEL_READY;
431 return WaitForChannelState(channel, predicate, false, timeout_seconds);
// Waits for READY, requesting a connection while polling.
434 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
435 auto predicate = [](grpc_connectivity_state state) {
436 return state == GRPC_CHANNEL_READY;
438 return WaitForChannelState(channel, predicate, true, timeout_seconds);
// Returns false as soon as a server with a request count of 0 is found.
441 bool SeenAllServers() {
442 for (const auto& server : servers_) {
443 if (server->service_.request_count() == 0) return false;
448 // Updates \a connection_order by appending to it the index of the newly
449 // connected server. Must be called after every single RPC.
450 void UpdateConnectionOrder(
451 const std::vector<std::unique_ptr<ServerData>>& servers,
452 std::vector<int>* connection_order) {
453 for (size_t i = 0; i < servers.size(); ++i) {
// Only servers with a request count of exactly 1 are considered.
454 if (servers[i]->service_.request_count() == 1) {
455 // Was the server index known? If not, update connection_order.
457 std::find(connection_order->begin(), connection_order->end(), i);
458 if (it == connection_order->end()) {
459 connection_order->push_back(i);
// Host passed to every server's Start().
466 const std::string server_host_;
// Backend servers created by CreateServers().
467 std::vector<std::unique_ptr<ServerData>> servers_;
// Message sent in every request and expected back in the response.
468 const std::string kRequestMessage_;
// Channel credentials used by BuildChannel().
469 std::shared_ptr<ChannelCredentials> creds_;
// Checks the connectivity-state sequence of a default-policy channel: IDLE
// both before and at the moment a connection is requested, CONNECTING
// afterwards while no resolver result has been supplied yet, and READY once
// a result is delivered.
472 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
473 const int kNumServers = 3;
474 StartServers(kNumServers);
475 auto response_generator = BuildResolverResponseGenerator();
476 auto channel = BuildChannel("", response_generator);
477 auto stub = BuildStub(channel);
478 // Initial state should be IDLE.
479 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
480 // Tell the channel to try to connect.
481 // Note that this call also returns IDLE, since the state change has
482 // not yet occurred; it just gets triggered by this call.
483 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
484 // Now that the channel is trying to connect, we should be in state
// CONNECTING.
486 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
487 GRPC_CHANNEL_CONNECTING);
488 // Return a resolver result, which allows the connection attempt to proceed.
489 response_generator.SetNextResolution(GetServersPorts());
490 // We should eventually transition into state READY.
491 EXPECT_TRUE(WaitForChannelReady(channel.get()));
// Builds a channel without naming an LB policy, sends kNumServers RPCs and
// checks the per-server request counts, then checks that the channel
// reports "pick_first" as its LB policy.
494 TEST_F(ClientLbEnd2endTest, PickFirst) {
495 // Start servers and send as many RPCs as there are servers.
496 const int kNumServers = 3;
497 StartServers(kNumServers);
498 auto response_generator = BuildResolverResponseGenerator();
499 auto channel = BuildChannel(
500 "", response_generator); // test that pick first is the default.
501 auto stub = BuildStub(channel);
502 response_generator.SetNextResolution(GetServersPorts());
503 for (size_t i = 0; i < servers_.size(); ++i) {
504 CheckRpcSendOk(stub, DEBUG_LOCATION);
506 // All requests should have gone to a single server.
508 for (size_t i = 0; i < servers_.size(); ++i) {
509 const int request_count = servers_[i]->service_.request_count();
// A server either received every request or none at all.
510 if (request_count == kNumServers) {
513 EXPECT_EQ(0, request_count);
517 // Check LB policy name for the channel.
518 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Connects one channel to a single server, then creates a second channel to
// the same address and checks that an RPC on it succeeds.
521 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
522 StartServers(1); // Single server
523 auto response_generator = BuildResolverResponseGenerator();
524 auto channel = BuildChannel(
525 "", response_generator); // test that pick first is the default.
526 auto stub = BuildStub(channel);
527 response_generator.SetNextResolution({servers_[0]->port_});
528 WaitForServer(stub, 0, DEBUG_LOCATION);
529 // Create a new channel and its corresponding PF LB policy, which will pick
530 // the subchannels in READY state from the previous RPC against the same
531 // target (even if it happened over a different channel, because subchannels
532 // are globally reused). Progress should happen without any transition from
// this READY state.
534 auto second_response_generator = BuildResolverResponseGenerator();
535 auto second_channel = BuildChannel("", second_response_generator);
536 auto second_stub = BuildStub(second_channel);
537 second_response_generator.SetNextResolution({servers_[0]->port_});
538 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
// With a 5000 ms initial reconnect backoff configured, a second pick_first
// channel over the same two addresses must report READY within 1 second
// after a first channel has already reached the second server.
541 TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
542 ChannelArguments args;
543 constexpr int kInitialBackOffMs = 5000;
544 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
545 // Create 2 servers, but start only the second one.
546 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
547 grpc_pick_unused_port_or_die()};
548 CreateServers(2, ports);
550 auto response_generator1 = BuildResolverResponseGenerator();
551 auto channel1 = BuildChannel("pick_first", response_generator1, args);
552 auto stub1 = BuildStub(channel1);
553 response_generator1.SetNextResolution(ports);
554 // Wait for second server to be ready.
555 WaitForServer(stub1, 1, DEBUG_LOCATION);
556 // Create a second channel with the same addresses. Its PF instance
557 // should immediately pick the second subchannel, since it's already
// in state READY.
559 auto response_generator2 = BuildResolverResponseGenerator();
560 auto channel2 = BuildChannel("pick_first", response_generator2, args);
561 response_generator2.SetNextResolution(ports);
562 // Check that the channel reports READY without waiting for the
// initial backoff.
564 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
// Measures the time from just before channel creation until the channel
// connects to a server that is started late, and expects it to be at least
// kInitialBackOffMs (minus 1 ms of tolerance).
567 TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
568 ChannelArguments args;
569 constexpr int kInitialBackOffMs = 100;
570 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
571 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
// Start of the measured interval.
572 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
573 auto response_generator = BuildResolverResponseGenerator();
574 auto channel = BuildChannel("pick_first", response_generator, args);
575 auto stub = BuildStub(channel);
576 response_generator.SetNextResolution(ports);
577 // The channel won't become connected (there's no server).
578 ASSERT_FALSE(channel->WaitForConnected(
579 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
580 // Bring up a server on the chosen port.
581 StartServers(1, ports);
583 ASSERT_TRUE(channel->WaitForConnected(
584 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
585 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
586 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
587 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
588 // We should have waited at least kInitialBackOffMs. We subtract one to
589 // account for test and precision accuracy drift.
590 EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
591 // But not much more.
594 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
// Installs the delayed_connect TCP client with a delay of 110% of
// kMinReconnectBackOffMs, then expects WaitForConnected() to take at least
// kMinReconnectBackOffMs (minus 1 ms of tolerance).
598 TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
599 ChannelArguments args;
600 constexpr int kMinReconnectBackOffMs = 1000;
601 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
602 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
603 auto response_generator = BuildResolverResponseGenerator();
604 auto channel = BuildChannel("pick_first", response_generator, args);
605 auto stub = BuildStub(channel);
606 response_generator.SetNextResolution(ports);
607 // Make the connection delay 10% longer than the min reconnect backoff to make
608 // sure we are hitting the codepath that waits for the min reconnect backoff.
609 gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
// Remember the current TCP client so the delaying hook can forward to it.
610 default_client_impl = grpc_tcp_client_impl;
611 grpc_set_tcp_client_impl(&delayed_connect);
612 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
613 channel->WaitForConnected(
614 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
615 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
616 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
617 gpr_log(GPR_DEBUG, "Waited %" PRId64 " ms", waited_ms);
618 // We should have waited at least kMinReconnectBackOffMs. We subtract one to
619 // account for test and precision accuracy drift.
620 EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
// Only the delay is reset to 0 here.
// NOTE(review): delayed_connect itself is not uninstalled in the code
// shown; confirm that leaving it in place for later tests is intended.
621 gpr_atm_rel_store(&g_connection_delay_ms, 0);
// Checks that experimental::ChannelResetConnectionBackoff() lets a channel
// connect to a late-started server in less than the configured initial
// backoff (kInitialBackOffMs).
624 TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
625 ChannelArguments args;
626 constexpr int kInitialBackOffMs = 1000;
627 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
628 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
629 auto response_generator = BuildResolverResponseGenerator();
630 auto channel = BuildChannel("pick_first", response_generator, args);
631 auto stub = BuildStub(channel);
632 response_generator.SetNextResolution(ports);
633 // The channel won't become connected (there's no server).
635 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
636 // Bring up a server on the chosen port.
637 StartServers(1, ports);
// Start of the measured interval.
638 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
639 // Wait for connect, but not long enough. This proves that we're
640 // being throttled by initial backoff.
642 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
643 // Reset connection backoff.
644 experimental::ChannelResetConnectionBackoff(channel.get());
645 // Wait for connect. Should happen as soon as the client connects to
646 // the newly started server, which should be before the initial
647 // backoff timeout elapses.
649 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(20)));
650 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
651 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
652 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
653 // We should have waited less than kInitialBackOffMs.
654 EXPECT_LT(waited_ms, kInitialBackOffMs);
// Checks that after ChannelResetConnectionBackoff() the next retry is
// scheduled using kInitialBackOffMs: measured from the reset, the channel
// must connect to the late-started server in less than kWaitMs.
657 TEST_F(ClientLbEnd2endTest,
658 PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
659 ChannelArguments args;
660 constexpr int kInitialBackOffMs = 1000;
661 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
662 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
663 auto response_generator = BuildResolverResponseGenerator();
664 auto channel = BuildChannel("pick_first", response_generator, args);
665 auto stub = BuildStub(channel);
666 response_generator.SetNextResolution(ports);
667 // Wait for connect, which should fail ~immediately, because the server
// is not up.
669 gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
671 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
672 // Reset connection backoff.
673 // Note that the time at which the third attempt will be started is
674 // actually computed at this point, so we record the start time here.
675 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
676 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
677 experimental::ChannelResetConnectionBackoff(channel.get());
678 // Trigger a second connection attempt. This should also fail
679 // ~immediately, but the retry should be scheduled for
680 // kInitialBackOffMs instead of applying the multiplier.
681 gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
683 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
684 // Bring up a server on the chosen port.
685 gpr_log(GPR_INFO, "=== STARTING BACKEND");
686 StartServers(1, ports);
687 // Wait for connect. Should happen within kInitialBackOffMs.
688 // Give an extra 100ms to account for the time spent in the second and
689 // third connection attempts themselves (since what we really want to
690 // measure is the time between the two). As long as this is less than
691 // the 1.6x increase we would see if the backoff state was not reset
692 // properly, the test is still proving that the backoff was reset.
693 constexpr int kWaitMs = kInitialBackOffMs + 100;
694 gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
695 EXPECT_TRUE(channel->WaitForConnected(
696 grpc_timeout_milliseconds_to_deadline(kWaitMs)));
697 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
698 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
699 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
700 EXPECT_LT(waited_ms, kWaitMs);
// Walks a pick_first channel through a series of resolver updates (logged
// as SET [0], SET none, SET [1], SET [2]) and checks which servers have
// received requests after each step.
703 TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
704 // Start servers and send one RPC per server.
705 const int kNumServers = 3;
706 StartServers(kNumServers);
707 auto response_generator = BuildResolverResponseGenerator();
708 auto channel = BuildChannel("pick_first", response_generator);
709 auto stub = BuildStub(channel);
711 std::vector<int> ports;
713 // Perform one RPC against the first server.
714 ports.emplace_back(servers_[0]->port_);
715 response_generator.SetNextResolution(ports);
716 gpr_log(GPR_INFO, "****** SET [0] *******");
717 CheckRpcSendOk(stub, DEBUG_LOCATION);
718 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
720 // An empty update will result in the channel going into TRANSIENT_FAILURE.
722 response_generator.SetNextResolution(ports);
723 gpr_log(GPR_INFO, "****** SET none *******");
724 grpc_connectivity_state channel_state;
// Poll until the channel is no longer READY.
726 channel_state = channel->GetState(true /* try to connect */);
727 } while (channel_state == GRPC_CHANNEL_READY);
728 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
729 servers_[0]->service_.ResetCounters();
731 // Next update introduces servers_[1], making the channel recover.
733 ports.emplace_back(servers_[1]->port_);
734 response_generator.SetNextResolution(ports);
735 gpr_log(GPR_INFO, "****** SET [1] *******");
736 WaitForServer(stub, 1, DEBUG_LOCATION);
737 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
739 // And again for servers_[2]
741 ports.emplace_back(servers_[2]->port_);
742 response_generator.SetNextResolution(ports);
743 gpr_log(GPR_INFO, "****** SET [2] *******");
744 WaitForServer(stub, 2, DEBUG_LOCATION);
745 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
746 EXPECT_EQ(servers_[1]->service_.request_count(), 0);
748 // Check LB policy name for the channel.
749 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Connects to servers_[0], then sends a resolver update that also lists
// servers_[1], and expects that no request reaches servers_[1].
752 TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
753 // Start servers and send one RPC per server.
754 const int kNumServers = 3;
755 StartServers(kNumServers);
756 auto response_generator = BuildResolverResponseGenerator();
757 auto channel = BuildChannel("pick_first", response_generator);
758 auto stub = BuildStub(channel);
760 std::vector<int> ports;
762 // Perform one RPC against the first server.
763 ports.emplace_back(servers_[0]->port_);
764 response_generator.SetNextResolution(ports);
765 gpr_log(GPR_INFO, "****** SET [0] *******");
766 CheckRpcSendOk(stub, DEBUG_LOCATION);
767 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
768 servers_[0]->service_.ResetCounters();
770 // Send a superset update
// servers_[1] is listed ahead of servers_[0] in the new address list.
772 ports.emplace_back(servers_[1]->port_);
773 ports.emplace_back(servers_[0]->port_);
774 response_generator.SetNextResolution(ports);
775 gpr_log(GPR_INFO, "****** SET superset *******");
776 CheckRpcSendOk(stub, DEBUG_LOCATION);
777 // We stick to the previously connected server.
778 WaitForServer(stub, 0, DEBUG_LOCATION);
779 EXPECT_EQ(0, servers_[1]->service_.request_count());
781 // Check LB policy name for the channel.
782 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Two channels built with default channel args send to the same server;
// the server must count two requests coming from one distinct peer.
785 TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
787 const int kNumServers = 1;
788 StartServers(kNumServers);
789 std::vector<int> ports = GetServersPorts();
790 // Create two channels that (by default) use the global subchannel pool.
791 auto response_generator1 = BuildResolverResponseGenerator();
792 auto channel1 = BuildChannel("pick_first", response_generator1);
793 auto stub1 = BuildStub(channel1);
794 response_generator1.SetNextResolution(ports);
795 auto response_generator2 = BuildResolverResponseGenerator();
796 auto channel2 = BuildChannel("pick_first", response_generator2);
797 auto stub2 = BuildStub(channel2);
798 response_generator2.SetNextResolution(ports);
799 WaitForServer(stub1, 0, DEBUG_LOCATION);
800 // Send one RPC on each channel.
801 CheckRpcSendOk(stub1, DEBUG_LOCATION);
802 CheckRpcSendOk(stub2, DEBUG_LOCATION);
803 // The server receives two requests.
804 EXPECT_EQ(2, servers_[0]->service_.request_count());
805 // The two requests are from the same client port, because the two channels
806 // share subchannels via the global subchannel pool.
807 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Two channels built with GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL set to 1 send
// to the same server; the server must count two requests coming from two
// distinct peers.
810 TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
812 const int kNumServers = 1;
813 StartServers(kNumServers);
814 std::vector<int> ports = GetServersPorts();
815 // Create two channels that use local subchannel pool.
816 ChannelArguments args;
817 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
818 auto response_generator1 = BuildResolverResponseGenerator();
819 auto channel1 = BuildChannel("pick_first", response_generator1, args);
820 auto stub1 = BuildStub(channel1);
821 response_generator1.SetNextResolution(ports);
822 auto response_generator2 = BuildResolverResponseGenerator();
823 auto channel2 = BuildChannel("pick_first", response_generator2, args);
824 auto stub2 = BuildStub(channel2);
825 response_generator2.SetNextResolution(ports);
826 WaitForServer(stub1, 0, DEBUG_LOCATION);
827 // Send one RPC on each channel.
828 CheckRpcSendOk(stub1, DEBUG_LOCATION);
829 CheckRpcSendOk(stub2, DEBUG_LOCATION);
830 // The server receives two requests.
831 EXPECT_EQ(2, servers_[0]->service_.request_count());
832 // The two requests are from two client ports, because the two channels didn't
833 // share subchannels with each other.
834 EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
// Pushes kNumUpdates resolver updates, each a freshly shuffled list of the
// same three ports, and checks that an RPC succeeds after every tenth
// update.
837 TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
838 const int kNumUpdates = 1000;
839 const int kNumServers = 3;
840 StartServers(kNumServers);
841 auto response_generator = BuildResolverResponseGenerator();
842 auto channel = BuildChannel("pick_first", response_generator);
843 auto stub = BuildStub(channel);
844 std::vector<int> ports = GetServersPorts();
845 for (size_t i = 0; i < kNumUpdates; ++i) {
// A new randomly seeded engine is constructed for every shuffle.
846 std::shuffle(ports.begin(), ports.end(),
847 std::mt19937(std::random_device()()));
848 response_generator.SetNextResolution(ports);
849 // We should re-enter core at the end of the loop to give the resolution
850 // setting closure a chance to run.
851 if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
853 // Check LB policy name for the channel.
854 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Starts with a resolution that only contains unused ports and expects
// RPCs to fail, then supplies the live server's port as the re-resolution
// result and expects the channel to recover.
857 TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
858 // Prepare the ports for up servers and down servers.
859 const int kNumServers = 3;
860 const int kNumAliveServers = 1;
861 StartServers(kNumAliveServers);
862 std::vector<int> alive_ports, dead_ports;
863 for (size_t i = 0; i < kNumServers; ++i) {
864 if (i < kNumAliveServers) {
865 alive_ports.emplace_back(servers_[i]->port_);
// Dead ports are unused ports that no server is started on.
867 dead_ports.emplace_back(grpc_pick_unused_port_or_die());
870 auto response_generator = BuildResolverResponseGenerator();
871 auto channel = BuildChannel("pick_first", response_generator);
872 auto stub = BuildStub(channel);
873 // The initial resolution only contains dead ports. There won't be any
874 // selected subchannel. Re-resolution will return the same result.
875 response_generator.SetNextResolution(dead_ports);
876 gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
877 for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
878 // Set a re-resolution result that contains reachable ports, so that the
879 // pick_first LB policy can recover soon.
880 response_generator.SetNextResolutionUponError(alive_ports);
881 gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
882 WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
883 CheckRpcSendOk(stub, DEBUG_LOCATION);
884 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
885 // Check LB policy name for the channel.
886 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Connects a pick_first channel to one server on a fixed port, shuts the
// server down, expects the channel to leave READY, then restarts a server on
// the same port and waits for RPCs to reach it again. SetNextResolution is
// called only once, before the initial connection.
889 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
// The port is chosen up front so the restarted server can reuse it.
890 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
891 StartServers(1, ports);
892 auto response_generator = BuildResolverResponseGenerator();
893 auto channel = BuildChannel("pick_first", response_generator);
894 auto stub = BuildStub(channel);
895 response_generator.SetNextResolution(ports);
896 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
897 WaitForServer(stub, 0, DEBUG_LOCATION);
898 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
899 servers_[0]->Shutdown();
900 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
901 gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
902 StartServers(1, ports);
903 WaitForServer(stub, 0, DEBUG_LOCATION);
// With two addresses in the resolver result, first waits for traffic on
// servers_[1], shuts that server down, then starts both servers and expects
// traffic to arrive at servers_[0], the first address in the list. No new
// resolver result is sent after the initial one.
906 TEST_F(ClientLbEnd2endTest,
907 PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
908 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
909 grpc_pick_unused_port_or_die()};
910 CreateServers(2, ports);
912 auto response_generator = BuildResolverResponseGenerator();
913 auto channel = BuildChannel("pick_first", response_generator);
914 auto stub = BuildStub(channel);
915 response_generator.SetNextResolution(ports);
916 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
// NOTE(review): initial traffic is expected on index 1, which presumably
// means only that server is running at this point - confirm.
917 WaitForServer(stub, 1, DEBUG_LOCATION);
918 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
919 servers_[1]->Shutdown();
920 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
921 gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
922 StartServers(2, ports);
923 WaitForServer(stub, 0, DEBUG_LOCATION);
// Two pick_first channels are resolved to the same single port. Channel 1
// connects, the server is shut down and restarted; channel 2 connects, the
// server is shut down and restarted again, and an RPC on channel 2 must then
// succeed. Both channels must still report "pick_first" as their LB policy.
926 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
927 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
928 StartServers(1, ports);
929 auto response_generator = BuildResolverResponseGenerator();
930 auto channel_1 = BuildChannel("pick_first", response_generator);
931 auto stub_1 = BuildStub(channel_1);
932 response_generator.SetNextResolution(ports);
933 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
934 WaitForServer(stub_1, 0, DEBUG_LOCATION);
935 gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
936 servers_[0]->Shutdown();
937 // Channel 1 will receive a re-resolution containing the same server. It will
938 // create a new subchannel and hold a ref to it.
939 StartServers(1, ports);
940 gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
941 auto response_generator_2 = BuildResolverResponseGenerator();
942 auto channel_2 = BuildChannel("pick_first", response_generator_2);
943 auto stub_2 = BuildStub(channel_2);
944 response_generator_2.SetNextResolution(ports);
945 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
// The trailing `true` is the argument labelled ignore_failure at other
// WaitForServer call sites in this file.
946 WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
947 gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
948 servers_[0]->Shutdown();
949 // Wait until the disconnection has triggered the connectivity notification.
950 // Otherwise, the subchannel may be picked for next call but will fail soon.
951 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
952 // Channel 2 will also receive a re-resolution containing the same server.
953 // Both channels will ref the same subchannel that failed.
954 StartServers(1, ports);
955 gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
956 gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
957 // The first call after the server restart will succeed.
958 CheckRpcSendOk(stub_2, DEBUG_LOCATION);
959 gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
960 // Check LB policy name for the channel.
961 EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
962 // Check LB policy name for the channel.
963 EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
// After a successful RPC the channel must report READY. Once the server is
// shut down, with the resolver set to fail on re-resolution, the channel must
// leave READY and report IDLE.
966 TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
967 // Start server, send RPC, and make sure channel is READY.
968 const int kNumServers = 1;
969 StartServers(kNumServers);
970 auto response_generator = BuildResolverResponseGenerator();
// An empty policy name is passed; per the trailing comment this selects
// the default policy, pick_first.
972 BuildChannel("", response_generator); // pick_first is the default.
973 auto stub = BuildStub(channel);
974 response_generator.SetNextResolution(GetServersPorts());
975 CheckRpcSendOk(stub, DEBUG_LOCATION);
// GetState(false): the argument is labelled "try to connect" at another
// call site in this file, so the state is read without requesting a
// connection attempt.
976 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
977 // Stop server. Channel should go into state IDLE.
978 response_generator.SetFailureOnReresolution();
979 servers_[0]->Shutdown();
980 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
981 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Exercises pick_first when a resolver update is still pending and the
// currently selected server goes away. Phases, as logged below:
//   1. connect to servers_[0];
//   2. send an update listing the servers from index 1 onwards;
//   3. shut down servers_[0] and accept CONNECTING or TRANSIENT_FAILURE;
//   4. wait for READY and for traffic to reach servers_[1].
985 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
986 auto response_generator = BuildResolverResponseGenerator();
988 BuildChannel("", response_generator); // pick_first is the default.
989 auto stub = BuildStub(channel);
990 // Create a number of servers, but only start 1 of them.
993 // Initially resolve to first server and make sure it connects.
994 gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
995 response_generator.SetNextResolution({servers_[0]->port_});
996 CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
997 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
998 // Send a resolution update with the remaining servers, none of which are
999 // running yet, so the update will stay pending. Note that it's important
1000 // to have multiple servers here, or else the test will be flaky; with only
1001 // one server, the pending subchannel list has already gone into
1002 // TRANSIENT_FAILURE due to hitting the end of the list by the time we
1005 "Phase 2: Resolver update pointing to remaining "
1006 "(not started) servers.");
1007 response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
1008 // RPCs will continue to be sent to the first server.
1009 CheckRpcSendOk(stub, DEBUG_LOCATION);
1010 // Now stop the first server, so that the current subchannel list
1011 // fails. This should cause us to immediately swap over to the
1012 // pending list, even though it's not yet connected. The state should
1013 // be set to CONNECTING, since that's what the pending subchannel list
1014 // was doing when we swapped over.
1015 gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
1016 servers_[0]->Shutdown();
// The result of this wait is not asserted; the state check that follows
// is the actual expectation.
1017 WaitForChannelNotReady(channel.get());
1018 // TODO(roth): This should always return CONNECTING, but it's flaky
1019 // between that and TRANSIENT_FAILURE. I suspect that this problem
1020 // will go away once we move the backoff code out of the subchannel
1021 // and into the LB policies.
1022 EXPECT_THAT(channel->GetState(false),
1023 ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
1024 GRPC_CHANNEL_TRANSIENT_FAILURE));
1025 // Now start the second server.
1026 gpr_log(GPR_INFO, "Phase 4: Starting second server.");
1028 // The channel should go to READY state and RPCs should go to the
// The result of WaitForChannelReady is likewise not asserted here.
1030 WaitForChannelReady(channel.get());
1031 WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
// Verifies that a channel which went IDLE after a disconnect stays IDLE when
// it receives a resolver update with no addresses, and that a later non-empty
// update followed by an RPC brings it back to READY.
1034 TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
1035 // Start server, send RPC, and make sure channel is READY.
1036 const int kNumServers = 1;
1037 StartServers(kNumServers);
1038 auto response_generator = BuildResolverResponseGenerator();
1040 BuildChannel("", response_generator); // pick_first is the default.
1041 auto stub = BuildStub(channel);
1042 response_generator.SetNextResolution(GetServersPorts());
1043 CheckRpcSendOk(stub, DEBUG_LOCATION);
1044 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1045 // Stop server. Channel should go into state IDLE.
1046 servers_[0]->Shutdown();
1047 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1048 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1049 // Now send resolver update that includes no addresses. Channel
1050 // should stay in state IDLE.
1051 response_generator.SetNextResolution({});
// EXPECT_FALSE: no transition away from IDLE may be observed before the
// 3-second deadline passed to WaitForStateChange.
1052 EXPECT_FALSE(channel->WaitForStateChange(
1053 GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
1054 // Now bring the backend back up and send a non-empty resolver update,
1055 // and then try to send an RPC. Channel should go back into state READY.
1057 response_generator.SetNextResolution(GetServersPorts());
1058 CheckRpcSendOk(stub, DEBUG_LOCATION);
1059 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Sends RPCs over a round_robin channel until every server has been seen,
// syncs to the last server, then sends one RPC per server and expects the
// recorded connection order to be {0, 1, 2}.
1062 TEST_F(ClientLbEnd2endTest, RoundRobin) {
1063 // Start servers and send one RPC per server.
1064 const int kNumServers = 3;
1065 StartServers(kNumServers);
1066 auto response_generator = BuildResolverResponseGenerator();
1067 auto channel = BuildChannel("round_robin", response_generator);
1068 auto stub = BuildStub(channel);
1069 response_generator.SetNextResolution(GetServersPorts());
1070 // Wait until all backends are ready.
1072 CheckRpcSendOk(stub, DEBUG_LOCATION);
1073 } while (!SeenAllServers());
1075 // "Sync" to the end of the list. Next sequence of picks will start at the
1076 // first server (index 0).
1077 WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
1078 std::vector<int> connection_order;
// After each RPC, UpdateConnectionOrder is given the server list and the
// order vector so it can record which server was reached.
1079 for (size_t i = 0; i < servers_.size(); ++i) {
1080 CheckRpcSendOk(stub, DEBUG_LOCATION);
1081 UpdateConnectionOrder(servers_, &connection_order);
1083 // Backends should be iterated over in the order in which the addresses were
1085 const auto expected = std::vector<int>{0, 1, 2};
1086 EXPECT_EQ(expected, connection_order);
1087 // Check LB policy name for the channel.
1088 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Brings one round_robin channel up against a single server, then creates a
// second round_robin channel resolved to the same port and expects an RPC on
// the second channel to succeed.
1091 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
1092 StartServers(1); // Single server
1093 auto response_generator = BuildResolverResponseGenerator();
1094 auto channel = BuildChannel("round_robin", response_generator);
1095 auto stub = BuildStub(channel);
1096 response_generator.SetNextResolution({servers_[0]->port_});
1097 WaitForServer(stub, 0, DEBUG_LOCATION);
1098 // Create a new channel and its corresponding RR LB policy, which will pick
1099 // the subchannels in READY state from the previous RPC against the same
1100 // target (even if it happened over a different channel, because subchannels
1101 // are globally reused). Progress should happen without any transition from
1102 // this READY state.
1103 auto second_response_generator = BuildResolverResponseGenerator();
1104 auto second_channel = BuildChannel("round_robin", second_response_generator);
1105 auto second_stub = BuildStub(second_channel);
1106 second_response_generator.SetNextResolution({servers_[0]->port_});
1107 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
// Walks a round_robin channel through a sequence of resolver updates (logged
// as FIRST / SECOND / THIRD BACKEND, ALL BACKENDS, NO BACKENDS and BACK TO
// SECOND BACKEND) and checks the per-server request counts after each step.
1110 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
1111 // Start servers and send one RPC per server.
1112 const int kNumServers = 3;
1113 StartServers(kNumServers);
1114 auto response_generator = BuildResolverResponseGenerator();
1115 auto channel = BuildChannel("round_robin", response_generator);
1116 auto stub = BuildStub(channel);
1117 std::vector<int> ports;
1118 // Start with a single server.
1119 gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1120 ports.emplace_back(servers_[0]->port_);
1121 response_generator.SetNextResolution(ports);
1122 WaitForServer(stub, 0, DEBUG_LOCATION);
1123 // Send RPCs. They should all go to servers_[0]
// NOTE(review): the exact counts asserted throughout this test presumably
// rely on WaitForServer leaving the request counters at zero - confirm.
1124 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1125 EXPECT_EQ(10, servers_[0]->service_.request_count());
1126 EXPECT_EQ(0, servers_[1]->service_.request_count());
1127 EXPECT_EQ(0, servers_[2]->service_.request_count());
1128 servers_[0]->service_.ResetCounters();
1129 // And now for the second server.
1130 gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
// NOTE(review): the 0/10/0 counts expected below imply that `ports` holds
// only servers_[1] when this update is sent - confirm.
1132 ports.emplace_back(servers_[1]->port_);
1133 response_generator.SetNextResolution(ports);
1134 // Wait until update has been processed, as signaled by the second backend
1135 // receiving a request.
1136 EXPECT_EQ(0, servers_[1]->service_.request_count());
1137 WaitForServer(stub, 1, DEBUG_LOCATION);
1138 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1139 EXPECT_EQ(0, servers_[0]->service_.request_count());
1140 EXPECT_EQ(10, servers_[1]->service_.request_count());
1141 EXPECT_EQ(0, servers_[2]->service_.request_count());
1142 servers_[1]->service_.ResetCounters();
1143 // ... and for the last server.
1144 gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1146 ports.emplace_back(servers_[2]->port_);
1147 response_generator.SetNextResolution(ports);
1148 WaitForServer(stub, 2, DEBUG_LOCATION);
1149 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1150 EXPECT_EQ(0, servers_[0]->service_.request_count());
1151 EXPECT_EQ(0, servers_[1]->service_.request_count());
1152 EXPECT_EQ(10, servers_[2]->service_.request_count());
1153 servers_[2]->service_.ResetCounters();
1154 // Back to all servers.
1155 gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1157 ports.emplace_back(servers_[0]->port_);
1158 ports.emplace_back(servers_[1]->port_);
1159 ports.emplace_back(servers_[2]->port_);
1160 response_generator.SetNextResolution(ports);
1161 WaitForServer(stub, 0, DEBUG_LOCATION);
1162 WaitForServer(stub, 1, DEBUG_LOCATION);
1163 WaitForServer(stub, 2, DEBUG_LOCATION);
1164 // Send three RPCs, one per server.
1165 for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1166 EXPECT_EQ(1, servers_[0]->service_.request_count());
1167 EXPECT_EQ(1, servers_[1]->service_.request_count());
1168 EXPECT_EQ(1, servers_[2]->service_.request_count());
1169 // An empty update will result in the channel going into TRANSIENT_FAILURE.
1170 gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1172 response_generator.SetNextResolution(ports);
1173 grpc_connectivity_state channel_state;
// The channel state is polled (requesting a connection attempt each time)
// until it is no longer READY; the loop has no deadline of its own.
1175 channel_state = channel->GetState(true /* try to connect */);
1176 } while (channel_state == GRPC_CHANNEL_READY);
1177 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
1178 servers_[0]->service_.ResetCounters();
1179 // Next update introduces servers_[1], making the channel recover.
1180 gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1182 ports.emplace_back(servers_[1]->port_);
1183 response_generator.SetNextResolution(ports);
1184 WaitForServer(stub, 1, DEBUG_LOCATION);
// Here the state is read with `false`, i.e. without requesting a
// connection attempt.
1185 channel_state = channel->GetState(false /* try to connect */);
1186 ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
1187 // Check LB policy name for the channel.
1188 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Adds a server that has been shut down (servers_[1]) to the round_robin
// address list together with servers_[2], and checks that the shut-down
// server receives no requests.
1191 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
1192 const int kNumServers = 3;
1193 StartServers(kNumServers);
1194 auto response_generator = BuildResolverResponseGenerator();
1195 auto channel = BuildChannel("round_robin", response_generator);
1196 auto stub = BuildStub(channel);
1197 std::vector<int> ports;
1198 // Start with a single server.
1199 ports.emplace_back(servers_[0]->port_);
1200 response_generator.SetNextResolution(ports);
1201 WaitForServer(stub, 0, DEBUG_LOCATION);
1202 // Send RPCs. They should all go to servers_[0]
// SendRpc's result is ignored here; the request counts below are the
// actual check.
1203 for (size_t i = 0; i < 10; ++i) SendRpc(stub);
1204 EXPECT_EQ(10, servers_[0]->service_.request_count());
1205 EXPECT_EQ(0, servers_[1]->service_.request_count());
1206 EXPECT_EQ(0, servers_[2]->service_.request_count());
1207 servers_[0]->service_.ResetCounters();
1208 // Shutdown one of the servers to be sent in the update.
1209 servers_[1]->Shutdown();
// `ports` already holds servers_[0]; the two additions make the update
// list all three servers, including the one that was just shut down.
1210 ports.emplace_back(servers_[1]->port_);
1211 ports.emplace_back(servers_[2]->port_);
1212 response_generator.SetNextResolution(ports);
1213 WaitForServer(stub, 0, DEBUG_LOCATION);
1214 WaitForServer(stub, 2, DEBUG_LOCATION);
1215 // Send three RPCs, one per server.
1216 for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
1217 // The server in shutdown shouldn't receive any.
1218 EXPECT_EQ(0, servers_[1]->service_.request_count());
// Sends 1000 randomly shuffled address lists to a round_robin channel and
// checks that RPCs keep succeeding while the updates are applied. Finishes by
// checking the reported LB policy name.
1221 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
1222 // Start servers and send one RPC per server.
1223 const int kNumServers = 3;
1224 StartServers(kNumServers);
1225 auto response_generator = BuildResolverResponseGenerator();
1226 auto channel = BuildChannel("round_robin", response_generator);
1227 auto stub = BuildStub(channel);
1228 std::vector<int> ports = GetServersPorts();
1229 for (size_t i = 0; i < 1000; ++i) {
// Each shuffle uses a newly constructed std::mt19937 seeded from
// std::random_device.
1230 std::shuffle(ports.begin(), ports.end(),
1231 std::mt19937(std::random_device()()));
1232 response_generator.SetNextResolution(ports);
// An RPC is sent on iterations 0, 10, 20, ... - that is, starting right
// after the very first update.
1233 if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
1235 // Check LB policy name for the channel.
1236 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Placeholder test: the body shown here contains only a TODO and makes no
// assertions.
1239 TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
1240 // TODO(dgq): replicate the way internal testing exercises the concurrent
1241 // update provisions of RR.
// Starts servers on first_ports, sends 100 successful RPCs, shuts all servers
// down and expects RPCs to fail, then restarts servers on second_ports and
// hands those ports to the resolver through SetNextResolutionUponError. An
// RPC must succeed before a 5-second deadline expires.
1244 TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
1245 // Start servers and send one RPC per server.
1246 const int kNumServers = 3;
1247 std::vector<int> first_ports;
1248 std::vector<int> second_ports;
// Both port sets are picked up front, kNumServers ports each.
1249 first_ports.reserve(kNumServers);
1250 for (int i = 0; i < kNumServers; ++i) {
1251 first_ports.push_back(grpc_pick_unused_port_or_die());
1253 second_ports.reserve(kNumServers);
1254 for (int i = 0; i < kNumServers; ++i) {
1255 second_ports.push_back(grpc_pick_unused_port_or_die());
1257 StartServers(kNumServers, first_ports);
1258 auto response_generator = BuildResolverResponseGenerator();
1259 auto channel = BuildChannel("round_robin", response_generator);
1260 auto stub = BuildStub(channel);
1261 response_generator.SetNextResolution(first_ports);
1262 // Send a number of RPCs, which succeed.
1263 for (size_t i = 0; i < 100; ++i) {
1264 CheckRpcSendOk(stub, DEBUG_LOCATION);
1267 gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
1268 for (size_t i = 0; i < servers_.size(); ++i) {
1269 servers_[i]->Shutdown();
1271 gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
1272 gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
1273 // Client requests should fail. Send enough to tickle all subchannels.
1274 for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
1275 gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
1276 // Bring servers back up on a different set of ports. We need to do this to be
1277 // sure that the eventual success is *not* due to subchannel reconnection
1278 // attempts and that an actual re-resolution has happened as a result of the
1279 // RR policy going into transient failure when all its subchannels become
1280 // unavailable (in transient failure as well).
1281 gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
1282 StartServers(kNumServers, second_ports);
1283 // Don't notify of the update. Wait for the LB policy's re-resolution to
1284 // "pull" the new ports.
1285 response_generator.SetNextResolutionUponError(second_ports);
1286 gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
1287 gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
1288 // Client request should eventually (but still fairly soon) succeed.
1289 const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
1290 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// Retry until an RPC succeeds or the deadline passes. `now` is refreshed
// only after a failed attempt, so a success leaves it before the deadline.
1291 while (gpr_time_cmp(deadline, now) > 0) {
1292 if (SendRpc(stub)) break;
1293 now = gpr_now(GPR_CLOCK_MONOTONIC);
// Fails if the loop ended because the deadline was reached.
1295 ASSERT_GT(gpr_time_cmp(deadline, now), 0);
// Brings a round_robin channel to READY against three servers, shuts every
// server down and expects the channel to reach TRANSIENT_FAILURE.
1298 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
1299 // Start servers and create channel. Channel should go to READY state.
1300 const int kNumServers = 3;
1301 StartServers(kNumServers);
1302 auto response_generator = BuildResolverResponseGenerator();
1303 auto channel = BuildChannel("round_robin", response_generator);
1304 auto stub = BuildStub(channel);
1305 response_generator.SetNextResolution(GetServersPorts());
1306 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1307 // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
1308 // TODO(roth): This test should ideally check that even when the
1309 // subchannels are in state CONNECTING for an extended period of time,
1310 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1311 // currently have a good way to get a subchannel to report CONNECTING
1312 // for a long period of time, since the servers in this test framework
1313 // are on the loopback interface, which will immediately return a
1314 // "Connection refused" error, so the subchannels will only be in
1315 // CONNECTING state very briefly. When we have time, see if we can
1316 // find a way to fix this.
1317 for (size_t i = 0; i < servers_.size(); ++i) {
1318 servers_[i]->Shutdown();
// The predicate accepts only TRANSIENT_FAILURE.
1320 auto predicate = [](grpc_connectivity_state state) {
1321 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1323 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
// Resolves a round_robin channel to three freshly picked unused ports and
// expects the channel to reach TRANSIENT_FAILURE.
1326 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
1327 // Create channel and return servers that don't exist. Channel should
1328 // quickly transition into TRANSIENT_FAILURE.
1329 // TODO(roth): This test should ideally check that even when the
1330 // subchannels are in state CONNECTING for an extended period of time,
1331 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1332 // currently have a good way to get a subchannel to report CONNECTING
1333 // for a long period of time, since the servers in this test framework
1334 // are on the loopback interface, which will immediately return a
1335 // "Connection refused" error, so the subchannels will only be in
1336 // CONNECTING state very briefly. When we have time, see if we can
1337 // find a way to fix this.
1338 auto response_generator = BuildResolverResponseGenerator();
1339 auto channel = BuildChannel("round_robin", response_generator);
1340 auto stub = BuildStub(channel);
1341 response_generator.SetNextResolution({
1342 grpc_pick_unused_port_or_die(),
1343 grpc_pick_unused_port_or_die(),
1344 grpc_pick_unused_port_or_die(),
// NOTE(review): no server is started in the visible part of this test, so
// this loop likely iterates over an empty servers_ - confirm whether it is
// still needed.
1346 for (size_t i = 0; i < servers_.size(); ++i) {
1347 servers_[i]->Shutdown();
1349 auto predicate = [](grpc_connectivity_state state) {
1350 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
// NOTE(review): the trailing `true` is presumably a "try to connect" flag
// for WaitForChannelState - confirm against its declaration.
1352 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
// With three servers behind a round_robin channel, shuts down servers_[0],
// checks that subsequent RPCs succeed without any reaching the stopped
// server, and finally waits for traffic to arrive at servers_[0] again.
1355 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
1356 const int kNumServers = 3;
1357 StartServers(kNumServers);
1358 const auto ports = GetServersPorts();
1359 auto response_generator = BuildResolverResponseGenerator();
1360 auto channel = BuildChannel("round_robin", response_generator);
1361 auto stub = BuildStub(channel);
1362 response_generator.SetNextResolution(ports);
1363 for (size_t i = 0; i < kNumServers; ++i) {
1364 WaitForServer(stub, i, DEBUG_LOCATION);
// The i-th RPC is expected to land on servers_[i]; the streamed message
// identifies the backend when the expectation fails.
1366 for (size_t i = 0; i < servers_.size(); ++i) {
1367 CheckRpcSendOk(stub, DEBUG_LOCATION);
1368 EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1370 // One request should have gone to each server.
1371 for (size_t i = 0; i < servers_.size(); ++i) {
1372 EXPECT_EQ(1, servers_[i]->service_.request_count());
// Snapshot the count so it can be compared after the server is stopped.
1374 const auto pre_death = servers_[0]->service_.request_count();
1375 // Kill the first server.
1376 servers_[0]->Shutdown();
1377 // Client request still succeed. May need retrying if RR had returned a pick
1378 // before noticing the change in the server's connectivity.
// NOTE(review): this retry loop has no deadline; it repeats until SendRpc
// returns true.
1379 while (!SendRpc(stub)) {
1380 } // Retry until success.
1381 // Send a bunch of RPCs that should succeed.
1382 for (int i = 0; i < 10 * kNumServers; ++i) {
1383 CheckRpcSendOk(stub, DEBUG_LOCATION);
1385 const auto post_death = servers_[0]->service_.request_count();
1386 // No requests have gone to the deceased server.
1387 EXPECT_EQ(pre_death, post_death);
1388 // Bring the first server back up.
1390 // Requests should start arriving at the first server either right away (if
1391 // the server managed to start before the RR policy retried the subchannel) or
1392 // after the subchannel retry delay otherwise (RR's subchannel retried before
1393 // the server was fully back up).
1394 WaitForServer(stub, 0, DEBUG_LOCATION);
1397 // If health checking is required by client but health checking service
1398 // is not running on the server, the channel should be treated as healthy.
// The test passes when the channel reaches READY and one RPC succeeds.
1399 TEST_F(ClientLbEnd2endTest,
1400 RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
1401 StartServers(1); // Single server
1402 ChannelArguments args;
// The service config requests client-side health checking for the service
// name "health_check_service_name".
1403 args.SetServiceConfigJSON(
1404 "{\"healthCheckConfig\": "
1405 "{\"serviceName\": \"health_check_service_name\"}}");
1406 auto response_generator = BuildResolverResponseGenerator();
1407 auto channel = BuildChannel("round_robin", response_generator, args);
1408 auto stub = BuildStub(channel);
1409 response_generator.SetNextResolution({servers_[0]->port_});
1410 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1411 CheckRpcSendOk(stub, DEBUG_LOCATION);
// Walks three servers through serving-status changes for
// "health_check_service_name" and checks how a round_robin channel configured
// with that health-check service name distributes RPCs at each step. The
// default health check service is enabled at the start and disabled again at
// the end of the test.
1414 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
1415 EnableDefaultHealthCheckService(true);
1417 const int kNumServers = 3;
1418 StartServers(kNumServers);
1419 ChannelArguments args;
1420 args.SetServiceConfigJSON(
1421 "{\"healthCheckConfig\": "
1422 "{\"serviceName\": \"health_check_service_name\"}}");
1423 auto response_generator = BuildResolverResponseGenerator();
1424 auto channel = BuildChannel("round_robin", response_generator, args);
1425 auto stub = BuildStub(channel);
1426 response_generator.SetNextResolution(GetServersPorts());
1427 // Channel should not become READY, because health checks should be failing.
1429 "*** initial state: unknown health check service name for "
// NOTE(review): the second argument (1) is presumably a timeout in seconds
// - confirm against WaitForChannelReady's declaration.
1431 EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1432 // Now set one of the servers to be healthy.
1433 // The channel should become healthy and all requests should go to
1434 // the healthy server.
1435 gpr_log(GPR_INFO, "*** server 0 healthy");
1436 servers_[0]->SetServingStatus("health_check_service_name", true);
1437 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1438 for (int i = 0; i < 10; ++i) {
1439 CheckRpcSendOk(stub, DEBUG_LOCATION);
1441 EXPECT_EQ(10, servers_[0]->service_.request_count());
1442 EXPECT_EQ(0, servers_[1]->service_.request_count());
1443 EXPECT_EQ(0, servers_[2]->service_.request_count());
1444 // Now set a second server to be healthy.
1445 gpr_log(GPR_INFO, "*** server 2 healthy");
1446 servers_[2]->SetServingStatus("health_check_service_name", true);
// NOTE(review): the 5/0/5 and 3/3/3 counts asserted below presumably rely
// on WaitForServer leaving the request counters at zero - confirm.
1447 WaitForServer(stub, 2, DEBUG_LOCATION);
1448 for (int i = 0; i < 10; ++i) {
1449 CheckRpcSendOk(stub, DEBUG_LOCATION);
1451 EXPECT_EQ(5, servers_[0]->service_.request_count());
1452 EXPECT_EQ(0, servers_[1]->service_.request_count());
1453 EXPECT_EQ(5, servers_[2]->service_.request_count());
1454 // Now set the remaining server to be healthy.
1455 gpr_log(GPR_INFO, "*** server 1 healthy");
1456 servers_[1]->SetServingStatus("health_check_service_name", true);
1457 WaitForServer(stub, 1, DEBUG_LOCATION);
// Nine RPCs across three healthy servers: three each is expected.
1458 for (int i = 0; i < 9; ++i) {
1459 CheckRpcSendOk(stub, DEBUG_LOCATION);
1461 EXPECT_EQ(3, servers_[0]->service_.request_count());
1462 EXPECT_EQ(3, servers_[1]->service_.request_count());
1463 EXPECT_EQ(3, servers_[2]->service_.request_count());
1464 // Now set one server to be unhealthy again. Then wait until the
1465 // unhealthiness has hit the client. We know that the client will see
1466 // this when we send kNumServers requests and one of the remaining servers
1467 // sees two of the requests.
1468 gpr_log(GPR_INFO, "*** server 0 unhealthy");
1469 servers_[0]->SetServingStatus("health_check_service_name", false);
1472 for (int i = 0; i < kNumServers; ++i) {
1473 CheckRpcSendOk(stub, DEBUG_LOCATION);
// Keep repeating while neither servers_[1] nor servers_[2] has a request
// count of exactly 2.
1475 } while (servers_[1]->service_.request_count() != 2 &&
1476 servers_[2]->service_.request_count() != 2);
1477 // Now set the remaining two servers to be unhealthy. Make sure the
1478 // channel leaves READY state and that RPCs fail.
1479 gpr_log(GPR_INFO, "*** all servers unhealthy");
1480 servers_[1]->SetServingStatus("health_check_service_name", false);
1481 servers_[2]->SetServingStatus("health_check_service_name", false);
1482 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1483 CheckRpcSendFailure(stub);
1485 EnableDefaultHealthCheckService(false);
// Marks all three servers as serving "health_check_service_name", connects a
// health-checking round_robin channel, then shuts down servers_[0] and
// re-sends the same address list before issuing a further batch of RPCs.
1488 TEST_F(ClientLbEnd2endTest,
1489 RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
1490 EnableDefaultHealthCheckService(true);
1492 const int kNumServers = 3;
1493 StartServers(kNumServers);
1494 servers_[0]->SetServingStatus("health_check_service_name", true);
1495 servers_[1]->SetServingStatus("health_check_service_name", true);
1496 servers_[2]->SetServingStatus("health_check_service_name", true);
1497 ChannelArguments args;
1498 args.SetServiceConfigJSON(
1499 "{\"healthCheckConfig\": "
1500 "{\"serviceName\": \"health_check_service_name\"}}");
1501 auto response_generator = BuildResolverResponseGenerator();
1502 auto channel = BuildChannel("round_robin", response_generator, args);
1503 auto stub = BuildStub(channel);
1504 response_generator.SetNextResolution(GetServersPorts());
1505 WaitForServer(stub, 0, DEBUG_LOCATION);
1506 // Stop server 0 and send a new resolver result to ensure that RR
1507 // checks each subchannel's state.
1508 servers_[0]->Shutdown();
// The update carries the same ports as before, including the port of the
// server that was just shut down.
1509 response_generator.SetNextResolution(GetServersPorts());
1510 // Send a bunch more RPCs.
1511 for (size_t i = 0; i < 100; i++) {
// Creates two round_robin channels to the same server with the same
// health-check service config; the second additionally sets
// GRPC_ARG_INHIBIT_HEALTH_CHECKING. Channel 1 must stay not-READY until the
// server reports the service as serving, while channel 2 must become READY
// straight away. Finally checks that the server saw exactly one client.
1516 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
1517 EnableDefaultHealthCheckService(true);
1519 const int kNumServers = 1;
1520 StartServers(kNumServers);
1521 // Create a channel with health-checking enabled.
1522 ChannelArguments args;
1523 args.SetServiceConfigJSON(
1524 "{\"healthCheckConfig\": "
1525 "{\"serviceName\": \"health_check_service_name\"}}");
1526 auto response_generator1 = BuildResolverResponseGenerator();
1527 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1528 auto stub1 = BuildStub(channel1);
1529 std::vector<int> ports = GetServersPorts();
1530 response_generator1.SetNextResolution(ports);
1531 // Create a channel with health checking enabled but inhibited.
// `args` is reused: channel 2 gets the same service config plus the
// inhibit flag set here. Channel 1 was already built from `args`.
1532 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1533 auto response_generator2 = BuildResolverResponseGenerator();
1534 auto channel2 = BuildChannel("round_robin", response_generator2, args);
1535 auto stub2 = BuildStub(channel2);
1536 response_generator2.SetNextResolution(ports);
1537 // First channel should not become READY, because health checks should be
1539 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1540 CheckRpcSendFailure(stub1);
1541 // Second channel should be READY.
1542 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1543 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1544 // Enable health checks on the backend and wait for channel 1 to succeed.
1545 servers_[0]->SetServingStatus("health_check_service_name", true);
1546 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1547 // Check that we created only one subchannel to the backend.
1548 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1550 EnableDefaultHealthCheckService(false);
// Checks that the health check service name is applied per channel. Two
// round_robin channels resolve to the same single backend but use different
// health check service names ("health_check_service_name" and
// "health_check_service_name2"); each channel is expected to become usable
// only once its own name has been marked serving on the backend.
1553 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
// The default health check service is switched on before the server starts.
1554 EnableDefaultHealthCheckService(true);
1556 const int kNumServers = 1;
1557 StartServers(kNumServers);
1558 // Create a channel with health-checking enabled.
1559 ChannelArguments args;
1560 args.SetServiceConfigJSON(
1561 "{\"healthCheckConfig\": "
1562 "{\"serviceName\": \"health_check_service_name\"}}");
1563 auto response_generator1 = BuildResolverResponseGenerator();
1564 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1565 auto stub1 = BuildStub(channel1);
// The same port list is handed to both channels' resolvers.
1566 std::vector<int> ports = GetServersPorts();
1567 response_generator1.SetNextResolution(ports);
1568 // Create a channel with health-checking enabled with a different
// health check service name ("health_check_service_name2").
1570 ChannelArguments args2;
1571 args2.SetServiceConfigJSON(
1572 "{\"healthCheckConfig\": "
1573 "{\"serviceName\": \"health_check_service_name2\"}}");
1574 auto response_generator2 = BuildResolverResponseGenerator();
1575 auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1576 auto stub2 = BuildStub(channel2);
1577 response_generator2.SetNextResolution(ports);
1578 // Allow health checks from channel 2 to succeed.
1579 servers_[0]->SetServingStatus("health_check_service_name2", true);
1580 // First channel should not become READY, because health checks should be
// failing: only "health_check_service_name2" has been marked serving so far.
1582 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1583 CheckRpcSendFailure(stub1);
1584 // Second channel should be READY.
1585 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1586 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1587 // Enable health checks for channel 1 and wait for it to succeed.
1588 servers_[0]->SetServingStatus("health_check_service_name", true);
1589 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1590 // Check that we created only one subchannel to the backend.
// NOTE(review): this relies on clients().size() reflecting the number of
// distinct connections the backend saw — confirm against the service helper.
1591 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Switch the default health check service back off; presumably so later
// tests are not affected by it — confirm.
1593 EnableDefaultHealthCheckService(false);
// Checks that changing the health check service name through a later resolver
// update takes effect on a channel that is already READY. The channel first
// uses "health_check_service_name" (marked serving) and then receives a
// service config naming "health_check_service_name2", which is never marked
// serving in this test, so the channel is expected to leave READY.
1596 TEST_F(ClientLbEnd2endTest,
1597 RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) {
// The default health check service is switched on before the server starts.
1598 EnableDefaultHealthCheckService(true);
1600 const int kNumServers = 1;
1601 StartServers(kNumServers);
1602 // Create a channel with health-checking enabled.
// Unlike the per-channel tests, the service config is delivered with the
// resolver result rather than through channel args.
1603 const char* kServiceConfigJson =
1604 "{\"healthCheckConfig\": "
1605 "{\"serviceName\": \"health_check_service_name\"}}";
1606 auto response_generator = BuildResolverResponseGenerator();
1607 auto channel = BuildChannel("round_robin", response_generator);
1608 auto stub = BuildStub(channel);
1609 std::vector<int> ports = GetServersPorts();
1610 response_generator.SetNextResolution(ports, kServiceConfigJson);
// Mark the first service name as serving so the channel can become READY.
1611 servers_[0]->SetServingStatus("health_check_service_name", true);
1612 EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
1613 // Send an update on the channel to change it to use a health checking
1614 // service name that is not being reported as healthy.
1615 const char* kServiceConfigJson2 =
1616 "{\"healthCheckConfig\": "
1617 "{\"serviceName\": \"health_check_service_name2\"}}";
// Same ports, new service config: only the health check name changes.
1618 response_generator.SetNextResolution(ports, kServiceConfigJson2);
1619 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
// Switch the default health check service back off; presumably so later
// tests are not affected by it — confirm.
1621 EnableDefaultHealthCheckService(false);
// Checks the channel idle timeout: a channel built with
// GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS = 1000 starts IDLE, becomes READY after an
// RPC, falls back to IDLE after 1200 ms without use, and becomes READY again
// on the next RPC.
1624 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
1626 const int kNumServers = 1;
1627 StartServers(kNumServers);
1628 // Set max idle time and build the channel.
1629 ChannelArguments args;
1630 args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
1631 auto response_generator = BuildResolverResponseGenerator();
// An empty policy name is passed; presumably this selects the default LB
// policy — confirm against BuildChannel.
1632 auto channel = BuildChannel("", response_generator, args);
1633 auto stub = BuildStub(channel);
1634 // The initial channel state should be IDLE.
// GetState(false): query the state without asking the channel to connect.
1635 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1636 // After sending RPC, channel state should be READY.
1637 response_generator.SetNextResolution(GetServersPorts());
1638 CheckRpcSendOk(stub, DEBUG_LOCATION);
1639 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1640 // After a period time not using the channel, the channel state should switch
// to IDLE: the 1200 ms sleep below exceeds the 1000 ms idle timeout set above.
1642 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
1643 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1644 // Sending a new RPC should awake the IDLE channel.
// A fresh resolver result is supplied again before the RPC.
1645 response_generator.SetNextResolution(GetServersPorts());
1646 CheckRpcSendOk(stub, DEBUG_LOCATION);
1647 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Fixture that registers the test pick-args LB policy with SavePickArgs as its
// callback and collects every grpc_core::PickArgsSeen passed to that callback
// in args_seen_list_, guarded by mu_.
1650 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
// Runs the base fixture setup, then publishes this fixture through the static
// pointer so the static callback can reach it.
1652 void SetUp() override {
1653 ClientLbEnd2endTest::SetUp();
1654 current_test_instance_ = this;
// Registers the policy with SavePickArgs as the callback.
1657 static void SetUpTestCase() {
1659 grpc_core::RegisterTestPickArgsLoadBalancingPolicy(SavePickArgs);
1662 static void TearDownTestCase() { grpc_shutdown_blocking(); }
// Returns the recorded pick arguments.
// NOTE(review): the lock is a local object, so it no longer protects the
// vector once the reference has been returned; this looks safe only if no
// pick can be recorded while the caller inspects the list — confirm.
1664 const std::vector<grpc_core::PickArgsSeen>& args_seen_list() {
1665 grpc::internal::MutexLock lock(&mu_);
1666 return args_seen_list_;
// Callback given to the policy registration: appends a copy of `args_seen`
// to the current fixture's list under its mutex.
// NOTE(review): dereferences current_test_instance_ without a null check, so
// it assumes SetUp() has already run — confirm.
1670 static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
1671 ClientLbPickArgsTest* self = current_test_instance_;
1672 grpc::internal::MutexLock lock(&self->mu_);
1673 self->args_seen_list_.emplace_back(args_seen);
// Fixture currently running; assigned in SetUp().
1676 static ClientLbPickArgsTest* current_test_instance_;
// Guards args_seen_list_.
1677 grpc::internal::Mutex mu_;
1678 std::vector<grpc_core::PickArgsSeen> args_seen_list_;
// Out-of-class definition of the static fixture pointer; null until a test's
// SetUp() assigns it.
1681 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
// Sends one wait-for-ready RPC through a channel using the "test_pick_args_lb"
// policy and checks the path and metadata that were reported for each pick.
1683 TEST_F(ClientLbPickArgsTest, Basic) {
1684 const int kNumServers = 1;
1685 StartServers(kNumServers);
1686 auto response_generator = BuildResolverResponseGenerator();
1687 auto channel = BuildChannel("test_pick_args_lb", response_generator);
1688 auto stub = BuildStub(channel);
1689 response_generator.SetNextResolution(GetServersPorts());
1690 CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true);
1691 // Check LB policy name for the channel.
1692 EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
1693 // There will be two entries, one for the pick tried in state
1694 // CONNECTING and another for the pick tried in state READY.
// Both entries are expected to carry the Echo method path and the same three
// metadata pairs; metadata order is not significant (UnorderedElementsAre).
// NOTE(review): the foo/bar/baz metadata is presumably attached by the RPC
// helper used above — confirm.
1695 EXPECT_THAT(args_seen_list(),
1696 ::testing::ElementsAre(
1698 ::testing::Field(&grpc_core::PickArgsSeen::path,
1699 "/grpc.testing.EchoTestService/Echo"),
1700 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1701 ::testing::UnorderedElementsAre(
1702 ::testing::Pair("foo", "1"),
1703 ::testing::Pair("bar", "2"),
1704 ::testing::Pair("baz", "3")))),
1706 ::testing::Field(&grpc_core::PickArgsSeen::path,
1707 "/grpc.testing.EchoTestService/Echo"),
1708 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1709 ::testing::UnorderedElementsAre(
1710 ::testing::Pair("foo", "1"),
1711 ::testing::Pair("bar", "2"),
1712 ::testing::Pair("baz", "3"))))));
// Fixture that registers the intercept-recv-trailing-metadata test LB policy
// with ReportTrailerIntercepted as its callback. The callback records, under
// mu_, how many times it ran, the metadata from the latest call, and (when
// backend metric data is present) an OrcaLoadReport built from it.
1715 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
// Runs the base fixture setup, then publishes this fixture through the static
// pointer so the static callback can reach it.
1717 void SetUp() override {
1718 ClientLbEnd2endTest::SetUp();
1719 current_test_instance_ = this;
// Registers the policy with ReportTrailerIntercepted as the callback.
1722 static void SetUpTestCase() {
1724 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
1725 ReportTrailerIntercepted);
1728 static void TearDownTestCase() { grpc_shutdown_blocking(); }
// Number of callback invocations recorded so far (returned by value).
1730 int trailers_intercepted() {
1731 grpc::internal::MutexLock lock(&mu_);
1732 return trailers_intercepted_;
// Metadata stored by the most recent callback invocation.
// NOTE(review): returns a reference to mutex-guarded state; the lock is a
// local object and is gone once the caller uses the reference — confirm no
// callback can run concurrently with the caller.
1735 const grpc_core::MetadataVector& trailing_metadata() {
1736 grpc::internal::MutexLock lock(&mu_);
1737 return trailing_metadata_;
// Load report built by the callback, or nullptr if none has been stored.
// NOTE(review): the raw pointer refers to an object owned by load_report_,
// which the callback replaces via reset(); the pointer is only valid until
// the next callback that carries backend metric data — confirm callers do
// not hold it across RPCs.
1740 const udpa::data::orca::v1::OrcaLoadReport* backend_load_report() {
1741 grpc::internal::MutexLock lock(&mu_);
1742 return load_report_.get();
// Callback given to the policy registration. Under the fixture's mutex it
// increments the counter, overwrites the stored metadata, and, when
// backend_metric_data is non-null, replaces load_report_ with a new report
// populated from that data.
// NOTE(review): dereferences current_test_instance_ without a null check, so
// it assumes SetUp() has already run — confirm.
1746 static void ReportTrailerIntercepted(
1747 const grpc_core::TrailingMetadataArgsSeen& args_seen) {
1748 const auto* backend_metric_data = args_seen.backend_metric_data;
1749 ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
1750 grpc::internal::MutexLock lock(&self->mu_);
1751 self->trailers_intercepted_++;
1752 self->trailing_metadata_ = args_seen.metadata;
1753 if (backend_metric_data != nullptr) {
1754 self->load_report_.reset(new udpa::data::orca::v1::OrcaLoadReport);
1755 self->load_report_->set_cpu_utilization(
1756 backend_metric_data->cpu_utilization);
1757 self->load_report_->set_mem_utilization(
1758 backend_metric_data->mem_utilization);
1759 self->load_report_->set_rps(backend_metric_data->requests_per_second);
// The map keys are converted to std::string before being used as keys of the
// report's request_cost and utilization maps.
1760 for (const auto& p : backend_metric_data->request_cost) {
1761 std::string name = std::string(p.first);
1762 (*self->load_report_->mutable_request_cost())[std::move(name)] =
1765 for (const auto& p : backend_metric_data->utilization) {
1766 std::string name = std::string(p.first);
1767 (*self->load_report_->mutable_utilization())[std::move(name)] =
// Fixture currently running; assigned in SetUp().
1773 static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
// Guards the three recorded fields below.
1774 grpc::internal::Mutex mu_;
1775 int trailers_intercepted_ = 0;
1776 grpc_core::MetadataVector trailing_metadata_;
1777 std::unique_ptr<udpa::data::orca::v1::OrcaLoadReport> load_report_;
// Out-of-class definition of the static fixture pointer; null until a test's
// SetUp() assigns it.
1780 ClientLbInterceptTrailingMetadataTest*
1781 ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
// Sends kNumRpcs RPCs through the "intercept_trailing_metadata_lb" policy
// without any retry configuration and checks that the callback ran once per
// RPC, saw the expected trailing metadata, and received no backend load
// report.
1783 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
1784 const int kNumServers = 1;
1785 const int kNumRpcs = 10;
1786 StartServers(kNumServers);
1787 auto response_generator = BuildResolverResponseGenerator();
1789 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1790 auto stub = BuildStub(channel);
1791 response_generator.SetNextResolution(GetServersPorts());
// NOTE(review): `i` is size_t while kNumRpcs is int, so the loop condition
// compares unsigned with signed.
1792 for (size_t i = 0; i < kNumRpcs; ++i) {
1793 CheckRpcSendOk(stub, DEBUG_LOCATION);
1795 // Check LB policy name for the channel.
1796 EXPECT_EQ("intercept_trailing_metadata_lb",
1797 channel->GetLoadBalancingPolicyName());
// One callback invocation is expected per RPC.
1798 EXPECT_EQ(kNumRpcs, trailers_intercepted());
// Only the metadata from the most recent callback is retained by the fixture
// accessor used here; the user-agent value is not checked (wildcard matcher).
1799 EXPECT_THAT(trailing_metadata(),
1800 ::testing::UnorderedElementsAre(
1801 // TODO(roth): Should grpc-status be visible here?
1802 ::testing::Pair("grpc-status", "0"),
1803 ::testing::Pair("user-agent", ::testing::_),
1804 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1805 ::testing::Pair("baz", "3")));
// No load report was configured on the server in this test.
1806 EXPECT_EQ(nullptr, backend_load_report());
// Same checks as the retries-disabled case, but the channel is built with a
// service config that contains a retryPolicy (maxAttempts 3, retryable status
// ABORTED) for grpc.testing.EchoTestService. The expectations are unchanged:
// one callback invocation per RPC, the expected trailing metadata, and no
// backend load report.
1809 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
1810 const int kNumServers = 1;
1811 const int kNumRpcs = 10;
1812 StartServers(kNumServers);
1813 ChannelArguments args;
1814 args.SetServiceConfigJSON(
1816 " \"methodConfig\": [ {\n"
1818 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
1820 " \"retryPolicy\": {\n"
1821 " \"maxAttempts\": 3,\n"
1822 " \"initialBackoff\": \"1s\",\n"
1823 " \"maxBackoff\": \"120s\",\n"
1824 " \"backoffMultiplier\": 1.6,\n"
1825 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
1829 auto response_generator = BuildResolverResponseGenerator();
1831 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
1832 auto stub = BuildStub(channel);
1833 response_generator.SetNextResolution(GetServersPorts());
// NOTE(review): `i` is size_t while kNumRpcs is int, so the loop condition
// compares unsigned with signed.
1834 for (size_t i = 0; i < kNumRpcs; ++i) {
1835 CheckRpcSendOk(stub, DEBUG_LOCATION);
1837 // Check LB policy name for the channel.
1838 EXPECT_EQ("intercept_trailing_metadata_lb",
1839 channel->GetLoadBalancingPolicyName());
// Still exactly one callback invocation per RPC; presumably no retry is
// triggered because every RPC succeeds — confirm.
1840 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1841 EXPECT_THAT(trailing_metadata(),
1842 ::testing::UnorderedElementsAre(
1843 // TODO(roth): Should grpc-status be visible here?
1844 ::testing::Pair("grpc-status", "0"),
1845 ::testing::Pair("user-agent", ::testing::_),
1846 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1847 ::testing::Pair("baz", "3")));
// No load report was configured on the server in this test.
1848 EXPECT_EQ(nullptr, backend_load_report());
// Configures every server with a fixed OrcaLoadReport and checks, after each
// RPC, that the report captured by the trailing-metadata callback matches it
// field by field (scalar fields plus both maps).
1851 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
1852 const int kNumServers = 1;
1853 const int kNumRpcs = 10;
1854 StartServers(kNumServers);
// Reference report that the servers are told to use.
1855 udpa::data::orca::v1::OrcaLoadReport load_report;
1856 load_report.set_cpu_utilization(0.5);
1857 load_report.set_mem_utilization(0.75);
1858 load_report.set_rps(25);
1859 auto* request_cost = load_report.mutable_request_cost();
1860 (*request_cost)["foo"] = 0.8;
1861 (*request_cost)["bar"] = 1.4;
1862 auto* utilization = load_report.mutable_utilization();
1863 (*utilization)["baz"] = 1.1;
1864 (*utilization)["quux"] = 0.9;
// The servers receive a pointer to the local report, which stays alive for
// the rest of the test body.
1865 for (const auto& server : servers_) {
1866 server->service_.set_load_report(&load_report);
1868 auto response_generator = BuildResolverResponseGenerator();
1870 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1871 auto stub = BuildStub(channel);
1872 response_generator.SetNextResolution(GetServersPorts());
// NOTE(review): `i` is size_t while kNumRpcs is int, so the loop condition
// compares unsigned with signed.
1873 for (size_t i = 0; i < kNumRpcs; ++i) {
1874 CheckRpcSendOk(stub, DEBUG_LOCATION);
// ASSERT (not EXPECT) so that a missing report stops before `actual` is
// dereferenced below.
1875 auto* actual = backend_load_report();
1876 ASSERT_NE(actual, nullptr);
1877 // TODO(roth): Change this to use EqualsProto() once that becomes
1878 // available in OSS.
1879 EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
1880 EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
1881 EXPECT_EQ(actual->rps(), load_report.rps());
// Map comparison: same size, and every received entry exists in the
// reference report with the same value.
1882 EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
1883 for (const auto& p : actual->request_cost()) {
1884 auto it = load_report.request_cost().find(p.first);
1885 ASSERT_NE(it, load_report.request_cost().end());
1886 EXPECT_EQ(it->second, p.second);
1888 EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
1889 for (const auto& p : actual->utilization()) {
1890 auto it = load_report.utilization().find(p.first);
1891 ASSERT_NE(it, load_report.utilization().end());
1892 EXPECT_EQ(it->second, p.second);
1895 // Check LB policy name for the channel.
1896 EXPECT_EQ("intercept_trailing_metadata_lb",
1897 channel->GetLoadBalancingPolicyName());
// One callback invocation is expected per RPC.
1898 EXPECT_EQ(kNumRpcs, trailers_intercepted());
// Fixture that registers the address-test LB policy with SaveAddress as its
// callback and records the string form of every ServerAddress passed to that
// callback in addresses_seen_, guarded by mu_.
1901 class ClientLbAddressTest : public ClientLbEnd2endTest {
// Key under which the test attribute is attached to addresses; defined out of
// class.
1903 static const char* kAttributeKey;
// Minimal ServerAddress attribute that wraps a single string.
1905 class Attribute : public grpc_core::ServerAddress::AttributeInterface {
1907 explicit Attribute(const std::string& str) : str_(str) {}
// Returns a new Attribute holding the same string.
1909 std::unique_ptr<AttributeInterface> Copy() const override {
1910 return absl::make_unique<Attribute>(str_);
// Orders attributes by their wrapped strings.
// NOTE(review): the static_cast assumes `other` is always an Attribute; there
// is no type check here — confirm callers only compare like with like.
1913 int Cmp(const AttributeInterface* other) const override {
1914 return str_.compare(static_cast<const Attribute*>(other)->str_);
1917 std::string ToString() const override { return str_; }
// Runs the base fixture setup, then publishes this fixture through the static
// pointer so the static callback can reach it.
1923 void SetUp() override {
1924 ClientLbEnd2endTest::SetUp();
1925 current_test_instance_ = this;
// Registers the policy with SaveAddress as the callback.
1928 static void SetUpTestCase() {
1930 grpc_core::RegisterAddressTestLoadBalancingPolicy(SaveAddress);
1933 static void TearDownTestCase() { grpc_shutdown_blocking(); }
// Returns the recorded address strings.
// NOTE(review): returns a reference to mutex-guarded state; the lock is a
// local object and is gone once the caller uses the reference — confirm no
// callback can run concurrently with the caller.
1935 const std::vector<std::string>& addresses_seen() {
1936 grpc::internal::MutexLock lock(&mu_);
1937 return addresses_seen_;
// Callback given to the policy registration: appends address.ToString() to
// the current fixture's list under its mutex.
// NOTE(review): dereferences current_test_instance_ without a null check, so
// it assumes SetUp() has already run — confirm.
1941 static void SaveAddress(const grpc_core::ServerAddress& address) {
1942 ClientLbAddressTest* self = current_test_instance_;
1943 grpc::internal::MutexLock lock(&self->mu_);
1944 self->addresses_seen_.emplace_back(address.ToString());
// Fixture currently running; assigned in SetUp().
1947 static ClientLbAddressTest* current_test_instance_;
// Guards addresses_seen_.
1948 grpc::internal::Mutex mu_;
1949 std::vector<std::string> addresses_seen_;
// Out-of-class definitions of the fixture's static members: the attribute key
// string and the fixture pointer (null until a test's SetUp() assigns it).
1952 const char* ClientLbAddressTest::kAttributeKey = "attribute_key";
1954 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
// Resolves addresses that carry an Attribute("foo"), sends one RPC through the
// "address_test_lb" policy, and checks that the address strings reported to
// the fixture callback include that attribute.
1956 TEST_F(ClientLbAddressTest, Basic) {
1957 const int kNumServers = 1;
1958 StartServers(kNumServers);
1959 auto response_generator = BuildResolverResponseGenerator();
1960 auto channel = BuildChannel("address_test_lb", response_generator);
1961 auto stub = BuildStub(channel);
1962 // Addresses returned by the resolver will have attached attributes.
1963 response_generator.SetNextResolution(GetServersPorts(), nullptr,
1965 absl::make_unique<Attribute>("foo"));
1966 CheckRpcSendOk(stub, DEBUG_LOCATION);
1967 // Check LB policy name for the channel.
1968 EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
1969 // Make sure that the attributes wind up on the subchannels.
// One expected string per server port, in the exact format compared against
// the recorded ServerAddress::ToString() output below.
1970 std::vector<std::string> expected;
1971 for (const int port : GetServersPorts()) {
1972 expected.emplace_back(absl::StrCat(
1973 "127.0.0.1:", port, " args={} attributes={", kAttributeKey, "=foo}"));
1975 EXPECT_EQ(addresses_seen(), expected);
1979 } // namespace testing
1982 int main(int argc, char** argv) {
1983 ::testing::InitGoogleTest(&argc, argv);
1984 grpc::testing::TestEnvironment env(argc, argv);
1985 const auto result = RUN_ALL_TESTS();