3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34 #include <grpcpp/create_channel.h>
35 #include <grpcpp/health_check_service_interface.h>
36 #include <grpcpp/impl/codegen/sync.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
40 #include "src/core/ext/filters/client_channel/backup_poller.h"
41 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
42 #include "src/core/ext/filters/client_channel/parse_address.h"
43 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/lib/backoff/backoff.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/iomgr/tcp_client.h"
50 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
51 #include "src/cpp/client/secure_credentials.h"
52 #include "src/cpp/server/secure_server_credentials.h"
54 #include "src/proto/grpc/testing/echo.grpc.pb.h"
55 #include "test/core/util/port.h"
56 #include "test/core/util/test_config.h"
57 #include "test/core/util/test_lb_policies.h"
58 #include "test/cpp/end2end/test_service_impl.h"
60 #include <gmock/gmock.h>
61 #include <gtest/gtest.h>
63 using grpc::testing::EchoRequest;
64 using grpc::testing::EchoResponse;
65 using std::chrono::system_clock;
67 // defined in tcp_client.cc
68 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
// TCP client vtable that tcp_client_connect_with_delay() forwards to. A test
// assigns it from grpc_tcp_client_impl before installing the delaying vtable.
70 static grpc_tcp_client_vtable* default_client_impl;
// Artificial connect delay in milliseconds. Read with an acquire load by
// tcp_client_connect_with_delay() and written by tests with release stores.
76 gpr_atm g_connection_delay_ms;
// Connect hook that injects an artificial delay into TCP connection attempts.
// It loads g_connection_delay_ms, sleeps based on that value, and then
// forwards the request unchanged to default_client_impl->connect(), except
// that the deadline is pushed back by the same number of milliseconds.
78 void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
79 grpc_pollset_set* interested_parties,
80 const grpc_channel_args* channel_args,
81 const grpc_resolved_address* addr,
82 grpc_millis deadline) {
83 const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
85 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
// Extend the deadline by the injected delay before delegating.
87 default_client_impl->connect(closure, ep, interested_parties, channel_args,
88 addr, deadline + delay_ms);
// TCP client vtable whose connect entry is tcp_client_connect_with_delay.
// Tests install it with grpc_set_tcp_client_impl().
91 grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay};
93 // Subclass of TestServiceImpl that increments a request counter for
94 // every call to the Echo RPC.
// It also records the peer string of every caller (see AddClient/clients).
95 class MyTestServiceImpl : public TestServiceImpl {
97 MyTestServiceImpl() : request_count_(0) {}
// Echo handler: takes mu_, records the caller's peer via AddClient(), and
// delegates to TestServiceImpl::Echo to produce the actual response.
99 Status Echo(ServerContext* context, const EchoRequest* request,
100 EchoResponse* response) override {
102 grpc::internal::MutexLock lock(&mu_);
105 AddClient(context->peer());
106 return TestServiceImpl::Echo(context, request, response);
// Returns the current value of request_count_, read under mu_.
109 int request_count() {
110 grpc::internal::MutexLock lock(&mu_);
111 return request_count_;
// Resets the service's counters; runs under mu_.
114 void ResetCounters() {
115 grpc::internal::MutexLock lock(&mu_);
// Accessor for the set of recorded client peers; runs under clients_mu_.
119 std::set<grpc::string> clients() {
120 grpc::internal::MutexLock lock(&clients_mu_);
// Records `client` in clients_ under clients_mu_. clients_ is a std::set, so
// a peer that calls repeatedly is stored only once.
125 void AddClient(const grpc::string& client) {
126 grpc::internal::MutexLock lock(&clients_mu_);
127 clients_.insert(client);
// Held while request_count_ is accessed.
130 grpc::internal::Mutex mu_;
// Held while clients_ is accessed.
132 grpc::internal::Mutex clients_mu_;
133 std::set<grpc::string> clients_;
// Convenience wrapper around a ref-counted
// grpc_core::FakeResolverResponseGenerator. It lets tests describe resolver
// results as plain lists of local port numbers.
136 class FakeResolverResponseGeneratorWrapper {
// Creates a fresh response generator.
138 FakeResolverResponseGeneratorWrapper()
139 : response_generator_(grpc_core::MakeRefCounted<
140 grpc_core::FakeResolverResponseGenerator>()) {}
// Move constructor: takes over the generator held by `other`.
142 FakeResolverResponseGeneratorWrapper(
143 FakeResolverResponseGeneratorWrapper&& other) {
144 response_generator_ = std::move(other.response_generator_);
// Hands the generator a result built from `ports` via SetResponse(). An
// ExecCtx is created on the stack for the duration of the call.
147 void SetNextResolution(const std::vector<int>& ports) {
148 grpc_core::ExecCtx exec_ctx;
149 response_generator_->SetResponse(BuildFakeResults(ports));
// Same as SetNextResolution(), but registers the result through
// SetReresolutionResponse().
152 void SetNextResolutionUponError(const std::vector<int>& ports) {
153 grpc_core::ExecCtx exec_ctx;
154 response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
// Forwards to the generator's SetFailureOnReresolution().
157 void SetFailureOnReresolution() {
158 grpc_core::ExecCtx exec_ctx;
159 response_generator_->SetFailureOnReresolution();
// Returns the raw generator pointer; the wrapper keeps its own reference.
162 grpc_core::FakeResolverResponseGenerator* Get() const {
163 return response_generator_.get();
// Builds a resolver result with one address per entry in `ports`. Each port
// is formatted as "ipv4:127.0.0.1:<port>", parsed as a URI, converted to a
// grpc_resolved_address (both steps are asserted), and appended to
// result.addresses.
167 static grpc_core::Resolver::Result BuildFakeResults(
168 const std::vector<int>& ports) {
169 grpc_core::Resolver::Result result;
170 for (const int& port : ports) {
172 gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
173 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
174 GPR_ASSERT(lb_uri != nullptr);
175 grpc_resolved_address address;
176 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
177 result.addresses.emplace_back(address.addr, address.len,
// Release the temporaries allocated for this port.
179 grpc_uri_destroy(lb_uri);
180 gpr_free(lb_uri_str);
185 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
// Test fixture for client-side load-balancing end2end tests. It owns a list
// of echo servers (servers_) and offers helpers to build channels and stubs
// driven by a fake resolver, to send RPCs, and to wait for channel state.
189 class ClientLbEnd2endTest : public ::testing::Test {
// Uses "localhost" as the server host, a fixed request message, and channel
// credentials built from fake transport security credentials.
191 ClientLbEnd2endTest()
192 : server_host_("localhost"),
193 kRequestMessage_("Live long and prosper."),
194 creds_(new SecureChannelCredentials(
195 grpc_fake_transport_security_credentials_create())) {}
197 static void SetUpTestCase() {
198 // Make the backup poller poll very frequently in order to pick up
199 // updates from all the subchannels' FDs.
200 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
// Initializes gRPC before each test.
203 void SetUp() override { grpc_init(); }
// Shuts down every server, then blocks until gRPC shutdown has completed.
205 void TearDown() override {
206 for (size_t i = 0; i < servers_.size(); ++i) {
207 servers_[i]->Shutdown();
209 // Explicitly destroy all the members so that we can make sure grpc_shutdown
210 // has finished by the end of this function, and thus all the registered
211 // LB policy factories are removed.
214 grpc_shutdown_blocking();
// Creates `num_servers` ServerData entries in servers_ without starting them.
// When exactly `num_servers` ports are supplied, server i is given ports[i].
217 void CreateServers(size_t num_servers,
218 std::vector<int> ports = std::vector<int>()) {
220 for (size_t i = 0; i < num_servers; ++i) {
222 if (ports.size() == num_servers) port = ports[i];
223 servers_.emplace_back(new ServerData(port));
// Starts servers_[index], listening on server_host_.
227 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
// Creates `num_servers` servers via CreateServers() and then loops over them
// (presumably starting each one; confirm against the loop body).
229 void StartServers(size_t num_servers,
230 std::vector<int> ports = std::vector<int>()) {
231 CreateServers(num_servers, std::move(ports));
232 for (size_t i = 0; i < num_servers; ++i) {
// Collects the ports of servers_[start_index..end) in order.
237 std::vector<int> GetServersPorts(size_t start_index = 0) {
238 std::vector<int> ports;
239 for (size_t i = start_index; i < servers_.size(); ++i) {
240 ports.push_back(servers_[i]->port_);
// Returns a new fake-resolver response generator wrapper.
245 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
246 return FakeResolverResponseGeneratorWrapper();
// Creates an EchoTestService stub bound to `channel`.
249 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
250 const std::shared_ptr<Channel>& channel) {
251 return grpc::testing::EchoTestService::NewStub(channel);
// Creates a channel to "fake:///" using creds_. A non-empty `lb_policy_name`
// is set on `args`; the response generator is handed to the fake resolver
// through the GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR pointer argument.
254 std::shared_ptr<Channel> BuildChannel(
255 const grpc::string& lb_policy_name,
256 const FakeResolverResponseGeneratorWrapper& response_generator,
257 ChannelArguments args = ChannelArguments()) {
258 if (lb_policy_name.size() > 0) {
259 args.SetLoadBalancingPolicyName(lb_policy_name);
260 } // else, default to pick first
261 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
262 response_generator.Get());
263 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
// SendRpc parameters and body: issues one Echo RPC carrying kRequestMessage_
// with a deadline `timeout_ms` from now. `response` and `result` are optional
// outputs.
267 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
268 EchoResponse* response = nullptr, int timeout_ms = 1000,
269 Status* result = nullptr, bool wait_for_ready = false) {
// Use a scratch response object when the caller did not supply one.
270 const bool local_response = (response == nullptr);
271 if (local_response) response = new EchoResponse;
273 request.set_message(kRequestMessage_);
274 ClientContext context;
275 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
276 if (wait_for_ready) context.set_wait_for_ready(true);
277 Status status = stub->Echo(&context, request, response);
// Hand the status back only if the caller asked for it.
278 if (result != nullptr) *result = status;
// Free the scratch response allocated above.
279 if (local_response) delete response;
// CheckRpcSendOk parameters and body: sends one RPC with a 2000 ms timeout
// and asserts that it succeeded and echoed kRequestMessage_. `location`
// identifies the call site in failure messages.
284 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
285 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
286 EchoResponse response;
289 SendRpc(stub, &response, 2000, &status, wait_for_ready);
290 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
292 << "Error: " << status.error_message() << " "
293 << status.error_details();
294 ASSERT_EQ(response.message(), kRequestMessage_)
295 << "From " << location.file() << ":" << location.line();
296 if (!success) abort();
// Sends one RPC with default settings and expects it to fail.
299 void CheckRpcSendFailure(
300 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
301 const bool success = SendRpc(stub);
302 EXPECT_FALSE(success);
// Per-server state: the server object, its echo service, the thread that runs
// Serve(), and the readiness/started flags.
307 std::unique_ptr<Server> server_;
308 MyTestServiceImpl service_;
309 std::unique_ptr<std::thread> thread_;
310 bool server_ready_ = false;
311 bool started_ = false;
// Uses `port` when it is positive; otherwise picks an unused port.
313 explicit ServerData(int port = 0) {
314 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
// Spawns a thread running Serve() and blocks until it reports readiness
// through server_ready_, which is then cleared again.
317 void Start(const grpc::string& server_host) {
318 gpr_log(GPR_INFO, "starting server on port %d", port_);
320 grpc::internal::Mutex mu;
321 grpc::internal::MutexLock lock(&mu);
322 grpc::internal::CondVar cond;
323 thread_.reset(new std::thread(
324 std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
325 cond.WaitUntil(&mu, [this] { return server_ready_; });
326 server_ready_ = false;
327 gpr_log(GPR_INFO, "server startup complete");
// Builds and starts a server on "<server_host>:<port_>" with fake transport
// security server credentials and service_ registered, then sets
// server_ready_ under `mu`.
330 void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
331 grpc::internal::CondVar* cond) {
332 std::ostringstream server_address;
333 server_address << server_host << ":" << port_;
334 ServerBuilder builder;
335 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
336 grpc_fake_transport_security_server_credentials_create()));
337 builder.AddListeningPort(server_address.str(), std::move(creds));
338 builder.RegisterService(&service_);
339 server_ = builder.BuildAndStart();
340 grpc::internal::MutexLock lock(mu);
341 server_ready_ = true;
// Shutdown body: a no-op unless the server was started; otherwise the server
// is shut down with a deadline built from a 0 ms timeout.
346 if (!started_) return;
347 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
// Sets the serving status of `service` on the server's health check service.
352 void SetServingStatus(const grpc::string& service, bool serving) {
353 server_->GetHealthCheckService()->SetServingStatus(service, serving);
// Resets the request counters of every server's service.
357 void ResetCounters() {
358 for (const auto& server : servers_) server->service_.ResetCounters();
// WaitForServer parameters and body: keeps issuing RPCs until
// servers_[server_idx] has handled at least one request. The RPC visible here
// is sent with wait_for_ready=true and must succeed.
362 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
363 size_t server_idx, const grpc_core::DebugLocation& location,
364 bool ignore_failure = false) {
366 if (ignore_failure) {
369 CheckRpcSendOk(stub, location, true);
371 } while (servers_[server_idx]->service_.request_count() == 0);
// Waits, without triggering a connection attempt, while the channel reports
// READY. Returns false if `timeout_seconds` elapses first.
375 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
376 const gpr_timespec deadline =
377 grpc_timeout_seconds_to_deadline(timeout_seconds);
378 grpc_connectivity_state state;
379 while ((state = channel->GetState(false /* try_to_connect */)) ==
380 GRPC_CHANNEL_READY) {
381 if (!channel->WaitForStateChange(state, deadline)) return false;
// Asks the channel to connect and waits while it is not READY. Returns false
// if `timeout_seconds` elapses first.
386 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
387 const gpr_timespec deadline =
388 grpc_timeout_seconds_to_deadline(timeout_seconds);
389 grpc_connectivity_state state;
390 while ((state = channel->GetState(true /* try_to_connect */)) !=
391 GRPC_CHANNEL_READY) {
392 if (!channel->WaitForStateChange(state, deadline)) return false;
// Returns false as soon as a server with zero handled requests is found.
397 bool SeenAllServers() {
398 for (const auto& server : servers_) {
399 if (server->service_.request_count() == 0) return false;
404 // Updates \a connection_order by appending to it the index of the newly
405 // connected server. Must be called after every single RPC.
406 void UpdateConnectionOrder(
407 const std::vector<std::unique_ptr<ServerData>>& servers,
408 std::vector<int>* connection_order) {
409 for (size_t i = 0; i < servers.size(); ++i) {
// A count of exactly 1 marks a server that has just received its first RPC.
410 if (servers[i]->service_.request_count() == 1) {
411 // Was the server index known? If not, update connection_order.
413 std::find(connection_order->begin(), connection_order->end(), i);
414 if (it == connection_order->end()) {
415 connection_order->push_back(i);
// Host name that servers listen on.
422 const grpc::string server_host_;
// Servers managed by this fixture.
423 std::vector<std::unique_ptr<ServerData>> servers_;
// Payload sent in every Echo request.
424 const grpc::string kRequestMessage_;
// Channel credentials shared by all channels built by BuildChannel().
425 std::shared_ptr<ChannelCredentials> creds_;
// Checks the channel's connectivity-state sequence: IDLE before any connect
// request, CONNECTING after a connect request while the resolver has not yet
// returned a result, and READY once the resolver returns the server ports.
428 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
429 const int kNumServers = 3;
430 StartServers(kNumServers);
431 auto response_generator = BuildResolverResponseGenerator();
432 auto channel = BuildChannel("", response_generator);
433 auto stub = BuildStub(channel);
434 // Initial state should be IDLE.
435 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
436 // Tell the channel to try to connect.
437 // Note that this call also returns IDLE, since the state change has
438 // not yet occurred; it just gets triggered by this call.
439 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
440 // Now that the channel is trying to connect, we should be in state
// CONNECTING.
442 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
443 GRPC_CHANNEL_CONNECTING);
444 // Return a resolver result, which allows the connection attempt to proceed.
445 response_generator.SetNextResolution(GetServersPorts());
446 // We should eventually transition into state READY.
447 EXPECT_TRUE(WaitForChannelReady(channel.get()));
// With no LB policy name given, the channel should use pick_first: all RPCs
// land on a single server and the channel reports "pick_first" as its policy.
450 TEST_F(ClientLbEnd2endTest, PickFirst) {
451 // Start servers and send one RPC per server.
452 const int kNumServers = 3;
453 StartServers(kNumServers);
454 auto response_generator = BuildResolverResponseGenerator();
455 auto channel = BuildChannel(
456 "", response_generator); // test that pick first is the default.
457 auto stub = BuildStub(channel);
458 response_generator.SetNextResolution(GetServersPorts());
459 for (size_t i = 0; i < servers_.size(); ++i) {
460 CheckRpcSendOk(stub, DEBUG_LOCATION);
462 // All requests should have gone to a single server.
464 for (size_t i = 0; i < servers_.size(); ++i) {
465 const int request_count = servers_[i]->service_.request_count();
// A server either handled all kNumServers requests or none of them.
466 if (request_count == kNumServers) {
469 EXPECT_EQ(0, request_count);
473 // Check LB policy name for the channel.
474 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Connects one channel to a single server, then checks that a second channel
// to the same address can complete an RPC.
477 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
478 StartServers(1); // Single server
479 auto response_generator = BuildResolverResponseGenerator();
480 auto channel = BuildChannel(
481 "", response_generator); // test that pick first is the default.
482 auto stub = BuildStub(channel);
483 response_generator.SetNextResolution({servers_[0]->port_});
484 WaitForServer(stub, 0, DEBUG_LOCATION);
485 // Create a new channel and its corresponding PF LB policy, which will pick
486 // the subchannels in READY state from the previous RPC against the same
487 // target (even if it happened over a different channel, because subchannels
488 // are globally reused). Progress should happen without any transition from
490 auto second_response_generator = BuildResolverResponseGenerator();
491 auto second_channel = BuildChannel("", second_response_generator);
492 auto second_stub = BuildStub(second_channel);
493 second_response_generator.SetNextResolution({servers_[0]->port_});
494 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
// With a 5000 ms initial reconnect backoff configured, a second channel to
// the same two addresses must still become READY within one second.
497 TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
498 ChannelArguments args;
499 constexpr int kInitialBackOffMs = 5000;
500 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
501 // Create 2 servers, but start only the second one.
502 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
503 grpc_pick_unused_port_or_die()};
504 CreateServers(2, ports);
506 auto response_generator1 = BuildResolverResponseGenerator();
507 auto channel1 = BuildChannel("pick_first", response_generator1, args);
508 auto stub1 = BuildStub(channel1);
509 response_generator1.SetNextResolution(ports);
510 // Wait for second server to be ready.
511 WaitForServer(stub1, 1, DEBUG_LOCATION);
512 // Create a second channel with the same addresses. Its PF instance
513 // should immediately pick the second subchannel, since it's already
// in state READY.
515 auto response_generator2 = BuildResolverResponseGenerator();
516 auto channel2 = BuildChannel("pick_first", response_generator2, args);
517 response_generator2.SetNextResolution(ports);
518 // Check that the channel reports READY without waiting for the
// initial backoff (the timeout below is well under kInitialBackOffMs).
520 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
// Verifies the initial reconnect backoff. The first wait fails because no
// server is listening; after the server starts the channel connects, and the
// elapsed time since t0 must be at least kInitialBackOffMs.
523 TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
524 ChannelArguments args;
525 constexpr int kInitialBackOffMs = 100;
526 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
527 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
528 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
529 auto response_generator = BuildResolverResponseGenerator();
530 auto channel = BuildChannel("pick_first", response_generator, args);
531 auto stub = BuildStub(channel);
532 response_generator.SetNextResolution(ports);
533 // The channel won't become connected (there's no server).
534 ASSERT_FALSE(channel->WaitForConnected(
535 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
536 // Bring up a server on the chosen port.
537 StartServers(1, ports);
539 ASSERT_TRUE(channel->WaitForConnected(
540 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
541 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
542 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
543 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
544 // We should have waited at least kInitialBackOffMs. We subtract one to
545 // account for test and precision accuracy drift.
546 EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
547 // But not much more.
550 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
// Verifies the minimum reconnect backoff. Connection attempts are slowed down
// through the delayed_connect TCP client vtable, and the wait for connection
// must take at least kMinReconnectBackOffMs.
// NOTE(review): delayed_connect stays installed after this test; only
// g_connection_delay_ms is reset to 0 at the end.
554 TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
555 ChannelArguments args;
556 constexpr int kMinReconnectBackOffMs = 1000;
557 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
558 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
559 auto response_generator = BuildResolverResponseGenerator();
560 auto channel = BuildChannel("pick_first", response_generator, args);
561 auto stub = BuildStub(channel);
562 response_generator.SetNextResolution(ports);
563 // Make the connection delay 10% longer than kMinReconnectBackOffMs to make
564 // sure we are hitting the codepath that waits for the min reconnect backoff.
565 gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
// Remember the current TCP client implementation so the delaying hook can
// forward to it, then install the delaying hook.
566 default_client_impl = grpc_tcp_client_impl;
567 grpc_set_tcp_client_impl(&delayed_connect);
568 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
// The result of the wait is ignored; only the elapsed time is checked.
569 channel->WaitForConnected(
570 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
571 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
572 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
573 gpr_log(GPR_DEBUG, "Waited %" PRId64 " ms", waited_ms);
574 // We should have waited at least kMinReconnectBackOffMs. We subtract one to
575 // account for test and precision accuracy drift.
576 EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
// Disable the artificial delay for subsequent tests.
577 gpr_atm_rel_store(&g_connection_delay_ms, 0);
// Verifies that experimental::ChannelResetConnectionBackoff() lets the
// channel connect in less than kInitialBackOffMs after earlier attempts
// failed.
580 TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
581 ChannelArguments args;
582 constexpr int kInitialBackOffMs = 1000;
583 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
584 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
585 auto response_generator = BuildResolverResponseGenerator();
586 auto channel = BuildChannel("pick_first", response_generator, args);
587 auto stub = BuildStub(channel);
588 response_generator.SetNextResolution(ports);
589 // The channel won't become connected (there's no server).
591 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
592 // Bring up a server on the chosen port.
593 StartServers(1, ports);
594 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
595 // Wait for connect, but not long enough. This proves that we're
596 // being throttled by initial backoff.
598 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
599 // Reset connection backoff.
600 experimental::ChannelResetConnectionBackoff(channel.get());
601 // Wait for connect. Should happen ~immediately.
603 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
604 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
605 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
606 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
607 // We should have waited less than kInitialBackOffMs.
608 EXPECT_LT(waited_ms, kInitialBackOffMs);
// Verifies that after a backoff reset the retry following a failed attempt is
// scheduled kInitialBackOffMs later, with no multiplier applied. The channel
// must connect within kInitialBackOffMs + 100 ms of the reset.
611 TEST_F(ClientLbEnd2endTest,
612 PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
613 ChannelArguments args;
614 constexpr int kInitialBackOffMs = 1000;
615 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
616 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
617 auto response_generator = BuildResolverResponseGenerator();
618 auto channel = BuildChannel("pick_first", response_generator, args);
619 auto stub = BuildStub(channel);
620 response_generator.SetNextResolution(ports);
621 // Wait for connect, which should fail ~immediately, because the server
// is not running yet.
623 gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
625 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
626 // Reset connection backoff.
627 // Note that the time at which the third attempt will be started is
628 // actually computed at this point, so we record the start time here.
629 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
630 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
631 experimental::ChannelResetConnectionBackoff(channel.get());
632 // Trigger a second connection attempt. This should also fail
633 // ~immediately, but the retry should be scheduled for
634 // kInitialBackOffMs instead of applying the multiplier.
635 gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
637 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
638 // Bring up a server on the chosen port.
639 gpr_log(GPR_INFO, "=== STARTING BACKEND");
640 StartServers(1, ports);
641 // Wait for connect. Should happen within kInitialBackOffMs.
642 // Give an extra 100ms to account for the time spent in the second and
643 // third connection attempts themselves (since what we really want to
644 // measure is the time between the two). As long as this is less than
645 // the 1.6x increase we would see if the backoff state was not reset
646 // properly, the test is still proving that the backoff was reset.
647 constexpr int kWaitMs = kInitialBackOffMs + 100;
648 gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
649 EXPECT_TRUE(channel->WaitForConnected(
650 grpc_timeout_milliseconds_to_deadline(kWaitMs)));
651 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
652 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
653 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
654 EXPECT_LT(waited_ms, kWaitMs);
// Drives pick_first through a series of resolver updates (per the log
// markers: [0], none, [1], [2]) and checks after each update which servers
// received requests.
657 TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
658 // Start servers and send one RPC per server.
659 const int kNumServers = 3;
660 StartServers(kNumServers);
661 auto response_generator = BuildResolverResponseGenerator();
662 auto channel = BuildChannel("pick_first", response_generator);
663 auto stub = BuildStub(channel);
665 std::vector<int> ports;
667 // Perform one RPC against the first server.
668 ports.emplace_back(servers_[0]->port_);
669 response_generator.SetNextResolution(ports);
670 gpr_log(GPR_INFO, "****** SET [0] *******");
671 CheckRpcSendOk(stub, DEBUG_LOCATION);
672 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
674 // An empty update will result in the channel going into TRANSIENT_FAILURE.
676 response_generator.SetNextResolution(ports);
677 gpr_log(GPR_INFO, "****** SET none *******");
678 grpc_connectivity_state channel_state;
// Poll the channel state until it is no longer READY.
680 channel_state = channel->GetState(true /* try to connect */);
681 } while (channel_state == GRPC_CHANNEL_READY);
682 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
683 servers_[0]->service_.ResetCounters();
685 // Next update introduces servers_[1], making the channel recover.
687 ports.emplace_back(servers_[1]->port_);
688 response_generator.SetNextResolution(ports);
689 gpr_log(GPR_INFO, "****** SET [1] *******");
690 WaitForServer(stub, 1, DEBUG_LOCATION);
691 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
693 // And again for servers_[2]
695 ports.emplace_back(servers_[2]->port_);
696 response_generator.SetNextResolution(ports);
697 gpr_log(GPR_INFO, "****** SET [2] *******");
698 WaitForServer(stub, 2, DEBUG_LOCATION);
699 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
700 EXPECT_EQ(servers_[1]->service_.request_count(), 0);
702 // Check LB policy name for the channel.
703 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// After connecting to servers_[0], pushes an update that lists servers_[1]
// ahead of servers_[0] and checks that traffic stays on servers_[0].
706 TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
707 // Start servers and send one RPC per server.
708 const int kNumServers = 3;
709 StartServers(kNumServers);
710 auto response_generator = BuildResolverResponseGenerator();
711 auto channel = BuildChannel("pick_first", response_generator);
712 auto stub = BuildStub(channel);
714 std::vector<int> ports;
716 // Perform one RPC against the first server.
717 ports.emplace_back(servers_[0]->port_);
718 response_generator.SetNextResolution(ports);
719 gpr_log(GPR_INFO, "****** SET [0] *******");
720 CheckRpcSendOk(stub, DEBUG_LOCATION);
721 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
722 servers_[0]->service_.ResetCounters();
724 // Send a superset update
726 ports.emplace_back(servers_[1]->port_);
727 ports.emplace_back(servers_[0]->port_);
728 response_generator.SetNextResolution(ports);
729 gpr_log(GPR_INFO, "****** SET superset *******");
730 CheckRpcSendOk(stub, DEBUG_LOCATION);
731 // We stick to the previously connected server.
732 WaitForServer(stub, 0, DEBUG_LOCATION);
733 EXPECT_EQ(0, servers_[1]->service_.request_count());
735 // Check LB policy name for the channel.
736 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Two default-configured pick_first channels to the same server: the server
// should see two requests but only one distinct client peer.
739 TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
741 const int kNumServers = 1;
742 StartServers(kNumServers);
743 std::vector<int> ports = GetServersPorts();
744 // Create two channels that (by default) use the global subchannel pool.
745 auto response_generator1 = BuildResolverResponseGenerator();
746 auto channel1 = BuildChannel("pick_first", response_generator1);
747 auto stub1 = BuildStub(channel1);
748 response_generator1.SetNextResolution(ports);
749 auto response_generator2 = BuildResolverResponseGenerator();
750 auto channel2 = BuildChannel("pick_first", response_generator2);
751 auto stub2 = BuildStub(channel2);
752 response_generator2.SetNextResolution(ports);
753 WaitForServer(stub1, 0, DEBUG_LOCATION);
754 // Send one RPC on each channel.
755 CheckRpcSendOk(stub1, DEBUG_LOCATION);
756 CheckRpcSendOk(stub2, DEBUG_LOCATION);
757 // The server receives two requests.
758 EXPECT_EQ(2, servers_[0]->service_.request_count());
759 // The two requests are from the same client port, because the two channels
760 // share subchannels via the global subchannel pool.
761 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Two pick_first channels created with GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL=1
// to the same server: the server should see two requests from two distinct
// client peers.
764 TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
766 const int kNumServers = 1;
767 StartServers(kNumServers);
768 std::vector<int> ports = GetServersPorts();
769 // Create two channels that use local subchannel pool.
770 ChannelArguments args;
771 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
772 auto response_generator1 = BuildResolverResponseGenerator();
773 auto channel1 = BuildChannel("pick_first", response_generator1, args);
774 auto stub1 = BuildStub(channel1);
775 response_generator1.SetNextResolution(ports);
776 auto response_generator2 = BuildResolverResponseGenerator();
777 auto channel2 = BuildChannel("pick_first", response_generator2, args);
778 auto stub2 = BuildStub(channel2);
779 response_generator2.SetNextResolution(ports);
780 WaitForServer(stub1, 0, DEBUG_LOCATION);
781 // Send one RPC on each channel.
782 CheckRpcSendOk(stub1, DEBUG_LOCATION);
783 CheckRpcSendOk(stub2, DEBUG_LOCATION);
784 // The server receives two requests.
785 EXPECT_EQ(2, servers_[0]->service_.request_count());
786 // The two requests are from two client ports, because the two channels didn't
787 // share subchannels with each other.
788 EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
// Pushes kNumUpdates randomly shuffled address lists through the resolver,
// sending an RPC after every 10th update and expecting each one to succeed.
791 TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
792 const int kNumUpdates = 1000;
793 const int kNumServers = 3;
794 StartServers(kNumServers);
795 auto response_generator = BuildResolverResponseGenerator();
796 auto channel = BuildChannel("pick_first", response_generator);
797 auto stub = BuildStub(channel);
798 std::vector<int> ports = GetServersPorts();
799 for (size_t i = 0; i < kNumUpdates; ++i) {
// A freshly seeded generator is used for every shuffle.
800 std::shuffle(ports.begin(), ports.end(),
801 std::mt19937(std::random_device()()));
802 response_generator.SetNextResolution(ports);
803 // We should re-enter core at the end of the loop to give the resolution
804 // setting closure a chance to run.
805 if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
807 // Check LB policy name for the channel.
808 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Starts with a resolution containing only unreachable ports, so RPCs fail.
// It then registers a re-resolution result containing the live server and
// expects the channel to recover.
811 TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
812 // Prepare the ports for up servers and down servers.
813 const int kNumServers = 3;
814 const int kNumAliveServers = 1;
815 StartServers(kNumAliveServers);
816 std::vector<int> alive_ports, dead_ports;
817 for (size_t i = 0; i < kNumServers; ++i) {
818 if (i < kNumAliveServers) {
819 alive_ports.emplace_back(servers_[i]->port_);
// Dead ports are unused ports that nothing is started on.
821 dead_ports.emplace_back(grpc_pick_unused_port_or_die());
824 auto response_generator = BuildResolverResponseGenerator();
825 auto channel = BuildChannel("pick_first", response_generator);
826 auto stub = BuildStub(channel);
827 // The initial resolution only contains dead ports. There won't be any
828 // selected subchannel. Re-resolution will return the same result.
829 response_generator.SetNextResolution(dead_ports);
830 gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
831 for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
832 // Set a re-resolution result that contains reachable ports, so that the
833 // pick_first LB policy can recover soon.
834 response_generator.SetNextResolutionUponError(alive_ports);
835 gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
836 WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
837 CheckRpcSendOk(stub, DEBUG_LOCATION);
838 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
839 // Check LB policy name for the channel.
840 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Connects, shuts the server down, waits for the channel to leave READY, and
// restarts the server on the same port. RPCs must reach it again even though
// the test pushes no new resolver result.
843 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
844 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
845 StartServers(1, ports);
846 auto response_generator = BuildResolverResponseGenerator();
847 auto channel = BuildChannel("pick_first", response_generator);
848 auto stub = BuildStub(channel);
849 response_generator.SetNextResolution(ports);
850 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
851 WaitForServer(stub, 0, DEBUG_LOCATION);
852 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
853 servers_[0]->Shutdown();
854 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
855 gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
856 StartServers(1, ports);
857 WaitForServer(stub, 0, DEBUG_LOCATION);
// Checks that, after the connected backend (servers_[1]) goes away and both
// servers are then started, pick_first ends up on servers_[0], the first
// entry of the address list, without a new resolver result being set.
860 TEST_F(ClientLbEnd2endTest,
861 PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
862 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
863 grpc_pick_unused_port_or_die()};
864 CreateServers(2, ports);
// NOTE(review): the statement that starts only servers_[1] before the first
// connection is not visible in this listing -- confirm against the original.
866 auto response_generator = BuildResolverResponseGenerator();
867 auto channel = BuildChannel("pick_first", response_generator);
868 auto stub = BuildStub(channel);
869 response_generator.SetNextResolution(ports);
870 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
// The initial connection is expected to land on the second address.
871 WaitForServer(stub, 1, DEBUG_LOCATION);
872 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
873 servers_[1]->Shutdown();
874 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
875 gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
876 StartServers(2, ports);
// With both servers up, traffic is expected on the first address.
877 WaitForServer(stub, 0, DEBUG_LOCATION);
// Drives two pick_first channels that resolve to the same port through two
// server restarts, and checks that channel 2's first RPC after the final
// restart succeeds.
880 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
881 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
882 StartServers(1, ports);
883 auto response_generator = BuildResolverResponseGenerator();
884 auto channel_1 = BuildChannel("pick_first", response_generator);
885 auto stub_1 = BuildStub(channel_1);
886 response_generator.SetNextResolution(ports);
887 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
888 WaitForServer(stub_1, 0, DEBUG_LOCATION);
889 gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
890 servers_[0]->Shutdown();
891 // Channel 1 will receive a re-resolution containing the same server. It will
892 // create a new subchannel and hold a ref to it.
893 StartServers(1, ports);
894 gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
// Channel 2 gets its own response generator but the same port list.
895 auto response_generator_2 = BuildResolverResponseGenerator();
896 auto channel_2 = BuildChannel("pick_first", response_generator_2);
897 auto stub_2 = BuildStub(channel_2);
898 response_generator_2.SetNextResolution(ports);
899 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
900 WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
901 gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
902 servers_[0]->Shutdown();
903 // Wait until the disconnection has triggered the connectivity notification.
904 // Otherwise, the subchannel may be picked for next call but will fail soon.
905 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
906 // Channel 2 will also receive a re-resolution containing the same server.
907 // Both channels will ref the same subchannel that failed.
908 StartServers(1, ports);
909 gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
910 gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
911 // The first call after the server restart will succeed.
912 CheckRpcSendOk(stub_2, DEBUG_LOCATION);
913 gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
914 // Check LB policy name for the channel.
915 EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
916 // Check LB policy name for the channel.
917 EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
// Checks that a READY channel using the default (pick_first) policy reports
// IDLE after its only server shuts down, with the response generator set to
// fail on re-resolution.
920 TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
921 // Start server, send RPC, and make sure channel is READY.
922 const int kNumServers = 1;
923 StartServers(kNumServers);
924 auto response_generator = BuildResolverResponseGenerator();
// NOTE(review): the declaration that receives this BuildChannel() result
// (`channel`, used below) is not visible in this listing.
926 BuildChannel("", response_generator); // pick_first is the default.
927 auto stub = BuildStub(channel);
928 response_generator.SetNextResolution(GetServersPorts());
929 CheckRpcSendOk(stub, DEBUG_LOCATION);
// GetState(false): query the state without requesting a connection attempt.
930 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
931 // Stop server. Channel should go into state IDLE.
932 response_generator.SetFailureOnReresolution();
933 servers_[0]->Shutdown();
934 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
935 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Four-phase scenario for the default (pick_first) policy: connect to the
// first server, push a resolver update whose servers are not running (so it
// stays pending), stop the first server, then check that the channel becomes
// usable again and RPCs reach servers_[1].
// NOTE(review): several statements of this test (server creation and
// start-up, the `channel` declaration, the start of one logging call) are not
// visible in this listing -- confirm against the original file.
939 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
940 auto response_generator = BuildResolverResponseGenerator();
942 BuildChannel("", response_generator); // pick_first is the default.
943 auto stub = BuildStub(channel);
944 // Create a number of servers, but only start 1 of them.
947 // Initially resolve to first server and make sure it connects.
948 gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
949 response_generator.SetNextResolution({servers_[0]->port_});
950 CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
951 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
952 // Send a resolution update with the remaining servers, none of which are
953 // running yet, so the update will stay pending. Note that it's important
954 // to have multiple servers here, or else the test will be flaky; with only
955 // one server, the pending subchannel list has already gone into
956 // TRANSIENT_FAILURE due to hitting the end of the list by the time we
// NOTE(review): the end of the comment above and the opening of the logging
// call for the following string literals are not visible in this listing.
959 "Phase 2: Resolver update pointing to remaining "
960 "(not started) servers.");
961 response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
962 // RPCs will continue to be sent to the first server.
963 CheckRpcSendOk(stub, DEBUG_LOCATION);
964 // Now stop the first server, so that the current subchannel list
965 // fails. This should cause us to immediately swap over to the
966 // pending list, even though it's not yet connected. The state should
967 // be set to CONNECTING, since that's what the pending subchannel list
968 // was doing when we swapped over.
969 gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
970 servers_[0]->Shutdown();
// NOTE(review): the result of this wait is not checked, whereas other tests
// in this file wrap the same call in EXPECT_TRUE.
971 WaitForChannelNotReady(channel.get());
972 // TODO(roth): This should always return CONNECTING, but it's flaky
973 // between that and TRANSIENT_FAILURE. I suspect that this problem
974 // will go away once we move the backoff code out of the subchannel
975 // and into the LB policies.
976 EXPECT_THAT(channel->GetState(false),
977 ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
978 GRPC_CHANNEL_TRANSIENT_FAILURE));
979 // Now start the second server.
980 gpr_log(GPR_INFO, "Phase 4: Starting second server.");
982 // The channel should go to READY state and RPCs should go to the
// NOTE(review): the result of this wait is likewise not checked.
984 WaitForChannelReady(channel.get());
985 WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
// Checks that an IDLE channel using the default (pick_first) policy stays
// IDLE when it receives a resolver update with no addresses, and returns to
// READY once a non-empty update is set and an RPC is sent.
988 TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
989 // Start server, send RPC, and make sure channel is READY.
990 const int kNumServers = 1;
991 StartServers(kNumServers);
992 auto response_generator = BuildResolverResponseGenerator();
// NOTE(review): the declaration that receives this BuildChannel() result
// (`channel`, used below) is not visible in this listing.
994 BuildChannel("", response_generator); // pick_first is the default.
995 auto stub = BuildStub(channel);
996 response_generator.SetNextResolution(GetServersPorts());
997 CheckRpcSendOk(stub, DEBUG_LOCATION);
998 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
999 // Stop server. Channel should go into state IDLE.
1000 servers_[0]->Shutdown();
1001 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1002 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1003 // Now send resolver update that includes no addresses. Channel
1004 // should stay in state IDLE.
1005 response_generator.SetNextResolution({});
// EXPECT_FALSE: no state change away from IDLE may occur within the 3-second
// deadline.
1006 EXPECT_FALSE(channel->WaitForStateChange(
1007 GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
1008 // Now bring the backend back up and send a non-empty resolver update,
1009 // and then try to send an RPC. Channel should go back into state READY.
// NOTE(review): the statement that restarts the backend is not visible in
// this listing.
1011 response_generator.SetNextResolution(GetServersPorts());
1012 CheckRpcSendOk(stub, DEBUG_LOCATION);
1013 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Basic round_robin check: once all three backends have been seen, one RPC
// per backend is sent and the observed connection order is expected to be
// {0, 1, 2}.
1016 TEST_F(ClientLbEnd2endTest, RoundRobin) {
1017 // Start servers and send one RPC per server.
1018 const int kNumServers = 3;
1019 StartServers(kNumServers);
1020 auto response_generator = BuildResolverResponseGenerator();
1021 auto channel = BuildChannel("round_robin", response_generator);
1022 auto stub = BuildStub(channel);
1023 response_generator.SetNextResolution(GetServersPorts());
1024 // Wait until all backends are ready.
// NOTE(review): the opening `do {` of this loop is not visible in this
// listing.
1026 CheckRpcSendOk(stub, DEBUG_LOCATION);
1027 } while (!SeenAllServers());
1029 // "Sync" to the end of the list. Next sequence of picks will start at the
1030 // first server (index 0).
1031 WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
1032 std::vector<int> connection_order;
// One RPC per backend, recording the order in which backends are hit.
1033 for (size_t i = 0; i < servers_.size(); ++i) {
1034 CheckRpcSendOk(stub, DEBUG_LOCATION);
1035 UpdateConnectionOrder(servers_, &connection_order);
1037 // Backends should be iterated over in the order in which the addresses were
1039 const auto expected = std::vector<int>{0, 1, 2};
1040 EXPECT_EQ(expected, connection_order);
1041 // Check LB policy name for the channel.
1042 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Checks that a second round_robin channel to the same backend can complete
// an RPC right after a first channel has already connected to that backend.
1045 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
1046 StartServers(1); // Single server
1047 auto response_generator = BuildResolverResponseGenerator();
1048 auto channel = BuildChannel("round_robin", response_generator);
1049 auto stub = BuildStub(channel);
1050 response_generator.SetNextResolution({servers_[0]->port_});
1051 WaitForServer(stub, 0, DEBUG_LOCATION);
1052 // Create a new channel and its corresponding RR LB policy, which will pick
1053 // the subchannels in READY state from the previous RPC against the same
1054 // target (even if it happened over a different channel, because subchannels
1055 // are globally reused). Progress should happen without any transition from
1056 // this READY state.
1057 auto second_response_generator = BuildResolverResponseGenerator();
1058 auto second_channel = BuildChannel("round_robin", second_response_generator);
1059 auto second_stub = BuildStub(second_channel);
1060 second_response_generator.SetNextResolution({servers_[0]->port_});
1061 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
// Walks a round_robin channel through a sequence of resolver updates (single
// backends, all backends, no backends, back to one backend) and checks after
// each update that requests reach exactly the backends in the current list.
// NOTE(review): the per-phase expectations imply that `ports` is reset
// between phases; no such statements are visible in this listing -- confirm
// against the original file.
1064 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
1065 // Start servers and send one RPC per server.
1066 const int kNumServers = 3;
1067 StartServers(kNumServers);
1068 auto response_generator = BuildResolverResponseGenerator();
1069 auto channel = BuildChannel("round_robin", response_generator);
1070 auto stub = BuildStub(channel);
1071 std::vector<int> ports;
1072 // Start with a single server.
1073 gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1074 ports.emplace_back(servers_[0]->port_);
1075 response_generator.SetNextResolution(ports);
1076 WaitForServer(stub, 0, DEBUG_LOCATION);
1077 // Send RPCs. They should all go servers_[0]
1078 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1079 EXPECT_EQ(10, servers_[0]->service_.request_count());
1080 EXPECT_EQ(0, servers_[1]->service_.request_count());
1081 EXPECT_EQ(0, servers_[2]->service_.request_count());
1082 servers_[0]->service_.ResetCounters();
1083 // And now for the second server.
1084 gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1086 ports.emplace_back(servers_[1]->port_);
1087 response_generator.SetNextResolution(ports);
1088 // Wait until update has been processed, as signaled by the second backend
1089 // receiving a request.
1090 EXPECT_EQ(0, servers_[1]->service_.request_count());
1091 WaitForServer(stub, 1, DEBUG_LOCATION);
1092 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1093 EXPECT_EQ(0, servers_[0]->service_.request_count());
1094 EXPECT_EQ(10, servers_[1]->service_.request_count());
1095 EXPECT_EQ(0, servers_[2]->service_.request_count());
1096 servers_[1]->service_.ResetCounters();
1097 // ... and for the last server.
1098 gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1100 ports.emplace_back(servers_[2]->port_);
1101 response_generator.SetNextResolution(ports);
1102 WaitForServer(stub, 2, DEBUG_LOCATION);
1103 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1104 EXPECT_EQ(0, servers_[0]->service_.request_count());
1105 EXPECT_EQ(0, servers_[1]->service_.request_count());
1106 EXPECT_EQ(10, servers_[2]->service_.request_count());
1107 servers_[2]->service_.ResetCounters();
1108 // Back to all servers.
1109 gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1111 ports.emplace_back(servers_[0]->port_);
1112 ports.emplace_back(servers_[1]->port_);
1113 ports.emplace_back(servers_[2]->port_);
1114 response_generator.SetNextResolution(ports);
1115 WaitForServer(stub, 0, DEBUG_LOCATION);
1116 WaitForServer(stub, 1, DEBUG_LOCATION);
1117 WaitForServer(stub, 2, DEBUG_LOCATION);
1118 // Send three RPCs, one per server.
1119 for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1120 EXPECT_EQ(1, servers_[0]->service_.request_count());
1121 EXPECT_EQ(1, servers_[1]->service_.request_count());
1122 EXPECT_EQ(1, servers_[2]->service_.request_count());
1123 // An empty update will result in the channel going into TRANSIENT_FAILURE.
1124 gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1126 response_generator.SetNextResolution(ports);
1127 grpc_connectivity_state channel_state;
// Poll the state (requesting a connection attempt) until the channel is no
// longer READY. NOTE(review): the opening `do {` is not visible in this
// listing.
1129 channel_state = channel->GetState(true /* try to connect */);
1130 } while (channel_state == GRPC_CHANNEL_READY);
1131 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
1132 servers_[0]->service_.ResetCounters();
1133 // Next update introduces servers_[1], making the channel recover.
1134 gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1136 ports.emplace_back(servers_[1]->port_);
1137 response_generator.SetNextResolution(ports);
1138 WaitForServer(stub, 1, DEBUG_LOCATION);
1139 channel_state = channel->GetState(false /* try to connect */);
1140 ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
1141 // Check LB policy name for the channel.
1142 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Sends a round_robin channel a resolver update that includes a backend that
// has already been shut down, and checks that this backend receives no
// requests while the other two keep serving.
1145 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
1146 const int kNumServers = 3;
1147 StartServers(kNumServers);
1148 auto response_generator = BuildResolverResponseGenerator();
1149 auto channel = BuildChannel("round_robin", response_generator);
1150 auto stub = BuildStub(channel);
1151 std::vector<int> ports;
1153 // Start with a single server.
1154 ports.emplace_back(servers_[0]->port_);
1155 response_generator.SetNextResolution(ports);
1156 WaitForServer(stub, 0, DEBUG_LOCATION);
1157 // Send RPCs. They should all go to servers_[0]
// NOTE(review): SendRpc() results are ignored here; a failed RPC would only
// surface through the request-count expectations below.
1158 for (size_t i = 0; i < 10; ++i) SendRpc(stub);
1159 EXPECT_EQ(10, servers_[0]->service_.request_count());
1160 EXPECT_EQ(0, servers_[1]->service_.request_count());
1161 EXPECT_EQ(0, servers_[2]->service_.request_count());
1162 servers_[0]->service_.ResetCounters();
1164 // Shutdown one of the servers to be sent in the update.
1165 servers_[1]->Shutdown();
// `ports` still holds servers_[0]; the update therefore lists all three
// backends, one of which (servers_[1]) is down.
1166 ports.emplace_back(servers_[1]->port_);
1167 ports.emplace_back(servers_[2]->port_);
1168 response_generator.SetNextResolution(ports);
1169 WaitForServer(stub, 0, DEBUG_LOCATION);
1170 WaitForServer(stub, 2, DEBUG_LOCATION);
1172 // Send three RPCs, one per server.
// NOTE(review): size_t index compared against the int constant kNumServers
// (signed/unsigned comparison).
1173 for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
1174 // The server in shutdown shouldn't receive any.
1175 EXPECT_EQ(0, servers_[1]->service_.request_count());
// Stress test: pushes 1000 resolver updates, each a random permutation of the
// same three ports, to a round_robin channel and checks on every tenth
// iteration that an RPC still succeeds.
1178 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
1179 // Start servers and send one RPC per server.
1180 const int kNumServers = 3;
1181 StartServers(kNumServers);
1182 auto response_generator = BuildResolverResponseGenerator();
1183 auto channel = BuildChannel("round_robin", response_generator);
1184 auto stub = BuildStub(channel);
1185 std::vector<int> ports = GetServersPorts();
1186 for (size_t i = 0; i < 1000; ++i) {
// A fresh std::mt19937 seeded from std::random_device is constructed on every
// iteration, so the permutation sequence is not reproducible across runs.
1187 std::shuffle(ports.begin(), ports.end(),
1188 std::mt19937(std::random_device()()));
1189 response_generator.SetNextResolution(ports);
1190 if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
1192 // Check LB policy name for the channel.
1193 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
// Placeholder: no statements are visible in this test body, only the TODO
// below, so it currently exercises nothing.
1196 TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
1197 // TODO(dgq): replicate the way internal testing exercises the concurrent
1198 // update provisions of RR.
// Checks that round_robin recovers through re-resolution: after every backend
// is shut down and restarted on a different set of ports, an RPC must succeed
// within 5 seconds once the new ports are offered as the re-resolution result.
1201 TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
1202 // Start servers and send one RPC per server.
1203 const int kNumServers = 3;
// Two disjoint port sets are picked up front: one for the initial servers and
// one for the restarted servers.
1204 std::vector<int> first_ports;
1205 std::vector<int> second_ports;
1206 first_ports.reserve(kNumServers);
1207 for (int i = 0; i < kNumServers; ++i) {
1208 first_ports.push_back(grpc_pick_unused_port_or_die());
1210 second_ports.reserve(kNumServers);
1211 for (int i = 0; i < kNumServers; ++i) {
1212 second_ports.push_back(grpc_pick_unused_port_or_die());
1214 StartServers(kNumServers, first_ports);
1215 auto response_generator = BuildResolverResponseGenerator();
1216 auto channel = BuildChannel("round_robin", response_generator);
1217 auto stub = BuildStub(channel);
1218 response_generator.SetNextResolution(first_ports);
1219 // Send a number of RPCs, which succeed.
1220 for (size_t i = 0; i < 100; ++i) {
1221 CheckRpcSendOk(stub, DEBUG_LOCATION);
1224 gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
1225 for (size_t i = 0; i < servers_.size(); ++i) {
1226 servers_[i]->Shutdown();
1228 gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
1229 gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
1230 // Client requests should fail. Send enough to tickle all subchannels.
1231 for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
1232 gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
1233 // Bring servers back up on a different set of ports. We need to do this to be
1234 // sure that the eventual success is *not* due to subchannel reconnection
1235 // attempts and that an actual re-resolution has happened as a result of the
1236 // RR policy going into transient failure when all its subchannels become
1237 // unavailable (in transient failure as well).
1238 gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
1239 StartServers(kNumServers, second_ports);
1240 // Don't notify of the update. Wait for the LB policy's re-resolution to
1241 // "pull" the new ports.
1242 response_generator.SetNextResolutionUponError(second_ports);
1243 gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
1244 gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
1245 // Client request should eventually (but still fairly soon) succeed.
// Retry RPCs until one succeeds or the 5-second deadline passes.
1246 const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
1247 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1248 while (gpr_time_cmp(deadline, now) > 0) {
1249 if (SendRpc(stub)) break;
1250 now = gpr_now(GPR_CLOCK_MONOTONIC);
// `now` is refreshed only after a failed attempt, so it still precedes the
// deadline exactly when the loop exited through `break` (an RPC succeeded).
1252 ASSERT_GT(gpr_time_cmp(deadline, now), 0);
// Checks round_robin behaviour when one of three backends dies and later
// returns: while servers_[0] is down it must receive no requests and RPCs
// must keep succeeding; afterwards requests are expected at servers_[0] again.
1255 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
1256 const int kNumServers = 3;
1257 StartServers(kNumServers);
1258 const auto ports = GetServersPorts();
1259 auto response_generator = BuildResolverResponseGenerator();
1260 auto channel = BuildChannel("round_robin", response_generator);
1261 auto stub = BuildStub(channel);
1262 response_generator.SetNextResolution(ports);
// NOTE(review): size_t index compared against the int constant kNumServers
// (signed/unsigned comparison).
1263 for (size_t i = 0; i < kNumServers; ++i) {
1264 WaitForServer(stub, i, DEBUG_LOCATION);
1266 for (size_t i = 0; i < servers_.size(); ++i) {
1267 CheckRpcSendOk(stub, DEBUG_LOCATION);
1268 EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1270 // One request should have gone to each server.
1271 for (size_t i = 0; i < servers_.size(); ++i) {
1272 EXPECT_EQ(1, servers_[i]->service_.request_count());
// Snapshot the first backend's request count to compare after its shutdown.
1274 const auto pre_death = servers_[0]->service_.request_count();
1275 // Kill the first server.
1276 servers_[0]->Shutdown();
1277 // Client request still succeed. May need retrying if RR had returned a pick
1278 // before noticing the change in the server's connectivity.
// NOTE(review): this retry loop has no deadline or attempt limit; if RPCs
// never succeed it spins until something external stops the test.
1279 while (!SendRpc(stub)) {
1280 } // Retry until success.
1281 // Send a bunch of RPCs that should succeed.
1282 for (int i = 0; i < 10 * kNumServers; ++i) {
1283 CheckRpcSendOk(stub, DEBUG_LOCATION);
1285 const auto post_death = servers_[0]->service_.request_count();
1286 // No requests have gone to the deceased server.
1287 EXPECT_EQ(pre_death, post_death);
1288 // Bring the first server back up.
// NOTE(review): the statement that restarts servers_[0] is not visible in
// this listing.
1290 // Requests should start arriving at the first server either right away (if
1291 // the server managed to start before the RR policy retried the subchannel) or
1292 // after the subchannel retry delay otherwise (RR's subchannel retried before
1293 // the server was fully back up).
1294 WaitForServer(stub, 0, DEBUG_LOCATION);
1297 // If health checking is required by client but health checking service
1298 // is not running on the server, the channel should be treated as healthy.
// The channel's service config requests health checking for
// "health_check_service_name"; the test expects the channel to become READY
// and an RPC to succeed.
1299 TEST_F(ClientLbEnd2endTest,
1300 RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
1301 StartServers(1); // Single server
1302 ChannelArguments args;
1303 args.SetServiceConfigJSON(
1304 "{\"healthCheckConfig\": "
1305 "{\"serviceName\": \"health_check_service_name\"}}");
1306 auto response_generator = BuildResolverResponseGenerator();
1307 auto channel = BuildChannel("round_robin", response_generator, args);
1308 auto stub = BuildStub(channel);
1309 response_generator.SetNextResolution({servers_[0]->port_});
1310 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1311 CheckRpcSendOk(stub, DEBUG_LOCATION);
// Exercises round_robin with client-side health checking against three
// backends whose serving status for "health_check_service_name" is toggled
// one at a time; after each change the test checks channel readiness and how
// requests are distributed across the backends.
// NOTE(review): a few statements (the start of one logging call, a counter
// reset and a `do {`) are not visible in this listing -- confirm against the
// original file.
1314 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
1315 EnableDefaultHealthCheckService(true);
1317 const int kNumServers = 3;
1318 StartServers(kNumServers);
1319 ChannelArguments args;
1320 args.SetServiceConfigJSON(
1321 "{\"healthCheckConfig\": "
1322 "{\"serviceName\": \"health_check_service_name\"}}");
1323 auto response_generator = BuildResolverResponseGenerator();
1324 auto channel = BuildChannel("round_robin", response_generator, args);
1325 auto stub = BuildStub(channel);
1326 response_generator.SetNextResolution(GetServersPorts());
1327 // Channel should not become READY, because health checks should be failing.
1329 "*** initial state: unknown health check service name for "
1331 EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1332 // Now set one of the servers to be healthy.
1333 // The channel should become healthy and all requests should go to
1334 // the healthy server.
1335 gpr_log(GPR_INFO, "*** server 0 healthy");
1336 servers_[0]->SetServingStatus("health_check_service_name", true);
1337 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1338 for (int i = 0; i < 10; ++i) {
1339 CheckRpcSendOk(stub, DEBUG_LOCATION);
1341 EXPECT_EQ(10, servers_[0]->service_.request_count());
1342 EXPECT_EQ(0, servers_[1]->service_.request_count());
1343 EXPECT_EQ(0, servers_[2]->service_.request_count());
1344 // Now set a second server to be healthy.
1345 gpr_log(GPR_INFO, "*** server 2 healthy");
1346 servers_[2]->SetServingStatus("health_check_service_name", true);
1347 WaitForServer(stub, 2, DEBUG_LOCATION);
1348 for (int i = 0; i < 10; ++i) {
1349 CheckRpcSendOk(stub, DEBUG_LOCATION);
// Ten RPCs are expected to split evenly across the two healthy backends.
1351 EXPECT_EQ(5, servers_[0]->service_.request_count());
1352 EXPECT_EQ(0, servers_[1]->service_.request_count());
1353 EXPECT_EQ(5, servers_[2]->service_.request_count());
1354 // Now set the remaining server to be healthy.
1355 gpr_log(GPR_INFO, "*** server 1 healthy");
1356 servers_[1]->SetServingStatus("health_check_service_name", true);
1357 WaitForServer(stub, 1, DEBUG_LOCATION);
1358 for (int i = 0; i < 9; ++i) {
1359 CheckRpcSendOk(stub, DEBUG_LOCATION);
// Nine RPCs are expected to split evenly across all three backends.
1361 EXPECT_EQ(3, servers_[0]->service_.request_count());
1362 EXPECT_EQ(3, servers_[1]->service_.request_count());
1363 EXPECT_EQ(3, servers_[2]->service_.request_count());
1364 // Now set one server to be unhealthy again. Then wait until the
1365 // unhealthiness has hit the client. We know that the client will see
1366 // this when we send kNumServers requests and one of the remaining servers
1367 // sees two of the requests.
1368 gpr_log(GPR_INFO, "*** server 0 unhealthy");
1369 servers_[0]->SetServingStatus("health_check_service_name", false);
1372 for (int i = 0; i < kNumServers; ++i) {
1373 CheckRpcSendOk(stub, DEBUG_LOCATION);
// The loop repeats while neither remaining backend has seen exactly two
// requests, i.e. it exits as soon as either one has.
1375 } while (servers_[1]->service_.request_count() != 2 &&
1376 servers_[2]->service_.request_count() != 2);
1377 // Now set the remaining two servers to be unhealthy. Make sure the
1378 // channel leaves READY state and that RPCs fail.
1379 gpr_log(GPR_INFO, "*** all servers unhealthy");
1380 servers_[1]->SetServingStatus("health_check_service_name", false);
1381 servers_[2]->SetServingStatus("health_check_service_name", false);
1382 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1383 CheckRpcSendFailure(stub);
// Undo the EnableDefaultHealthCheckService(true) call made at the top.
1385 EnableDefaultHealthCheckService(false);
// Two round_robin channels target the same backend with the same health-check
// service config, but the second also sets GRPC_ARG_INHIBIT_HEALTH_CHECKING.
// The first channel must stay not-READY until the backend reports the service
// as serving; the second must be READY regardless.
1388 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
1389 EnableDefaultHealthCheckService(true);
1391 const int kNumServers = 1;
1392 StartServers(kNumServers);
1393 // Create a channel with health-checking enabled.
1394 ChannelArguments args;
1395 args.SetServiceConfigJSON(
1396 "{\"healthCheckConfig\": "
1397 "{\"serviceName\": \"health_check_service_name\"}}");
1398 auto response_generator1 = BuildResolverResponseGenerator();
1399 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1400 auto stub1 = BuildStub(channel1);
1401 std::vector<int> ports = GetServersPorts();
1402 response_generator1.SetNextResolution(ports);
1403 // Create a channel with health checking enabled but inhibited.
// `args` is reused, so channel 2 keeps the same service config plus this flag.
1404 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1405 auto response_generator2 = BuildResolverResponseGenerator();
1406 auto channel2 = BuildChannel("round_robin", response_generator2, args);
1407 auto stub2 = BuildStub(channel2);
1408 response_generator2.SetNextResolution(ports);
1409 // First channel should not become READY, because health checks should be
1411 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1412 CheckRpcSendFailure(stub1);
1413 // Second channel should be READY.
1414 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1415 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1416 // Enable health checks on the backend and wait for channel 1 to succeed.
1417 servers_[0]->SetServingStatus("health_check_service_name", true);
1418 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1419 // Check that we created only one subchannel to the backend.
1420 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Undo the EnableDefaultHealthCheckService(true) call made at the top.
1422 EnableDefaultHealthCheckService(false);
// Two round_robin channels target the same backend but health-check different
// service names. Only the second name is marked serving at first, so channel 2
// must be READY while channel 1 is not; channel 1 must succeed once its own
// service name is marked serving too.
1425 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
1426 EnableDefaultHealthCheckService(true);
1428 const int kNumServers = 1;
1429 StartServers(kNumServers);
1430 // Create a channel with health-checking enabled.
1431 ChannelArguments args;
1432 args.SetServiceConfigJSON(
1433 "{\"healthCheckConfig\": "
1434 "{\"serviceName\": \"health_check_service_name\"}}");
1435 auto response_generator1 = BuildResolverResponseGenerator();
1436 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1437 auto stub1 = BuildStub(channel1);
1438 std::vector<int> ports = GetServersPorts();
1439 response_generator1.SetNextResolution(ports);
1440 // Create a channel with health-checking enabled with a different
1442 ChannelArguments args2;
1443 args2.SetServiceConfigJSON(
1444 "{\"healthCheckConfig\": "
1445 "{\"serviceName\": \"health_check_service_name2\"}}");
1446 auto response_generator2 = BuildResolverResponseGenerator();
1447 auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1448 auto stub2 = BuildStub(channel2);
1449 response_generator2.SetNextResolution(ports);
1450 // Allow health checks from channel 2 to succeed.
1451 servers_[0]->SetServingStatus("health_check_service_name2", true);
1452 // First channel should not become READY, because health checks should be
1454 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1455 CheckRpcSendFailure(stub1);
1456 // Second channel should be READY.
1457 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1458 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1459 // Enable health checks for channel 1 and wait for it to succeed.
1460 servers_[0]->SetServingStatus("health_check_service_name", true);
1461 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1462 // Check that we created only one subchannel to the backend.
1463 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Undo the EnableDefaultHealthCheckService(true) call made at the top.
1465 EnableDefaultHealthCheckService(false);
// Checks the client idle timeout: a channel built with
// GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS = 100 starts IDLE, becomes READY after an
// RPC, is expected to be IDLE again after 120 ms without use, and returns to
// READY on the next RPC.
1468 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
1470 const int kNumServers = 1;
1471 StartServers(kNumServers);
1472 // Set max idle time and build the channel.
1473 ChannelArguments args;
1474 args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 100);
1475 auto response_generator = BuildResolverResponseGenerator();
1476 auto channel = BuildChannel("", response_generator, args);
1477 auto stub = BuildStub(channel);
1478 // The initial channel state should be IDLE.
1479 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1480 // After sending RPC, channel state should be READY.
1481 response_generator.SetNextResolution(GetServersPorts());
1482 CheckRpcSendOk(stub, DEBUG_LOCATION);
1483 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1484 // After a period time not using the channel, the channel state should switch
// NOTE(review): the sleep exceeds the configured idle timeout by only 20 ms
// (120 vs 100); this may be timing-sensitive on a loaded machine -- confirm.
1486 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(120));
1487 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1488 // Sending a new RPC should awake the IDLE channel.
1489 response_generator.SetNextResolution(GetServersPorts());
1490 CheckRpcSendOk(stub, DEBUG_LOCATION);
1491 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Fixture extending ClientLbEnd2endTest that registers the
// intercept-recv-trailing-metadata LB policy with a callback and counts, under
// a mutex, how many times that callback is invoked.
// NOTE(review): access specifiers and closing braces are not visible in this
// listing.
1494 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
// Runs the base fixture's set-up, then registers the LB policy, passing
// `this` as the argument later handed to ReportTrailerIntercepted().
1496 void SetUp() override {
1497 ClientLbEnd2endTest::SetUp();
1498 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
1499 ReportTrailerIntercepted, this);
1502 void TearDown() override { ClientLbEnd2endTest::TearDown(); }
// Returns the current callback count, read while holding mu_.
1504 int trailers_intercepted() {
1505 grpc::internal::MutexLock lock(&mu_);
1506 return trailers_intercepted_;
// Callback for the registered policy: `arg` is cast back to the fixture
// pointer supplied in SetUp() and its counter is incremented under mu_.
1510 static void ReportTrailerIntercepted(void* arg) {
1511 ClientLbInterceptTrailingMetadataTest* self =
1512 static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
1513 grpc::internal::MutexLock lock(&self->mu_);
1514 self->trailers_intercepted_++;
// mu_ is held for every read and write of trailers_intercepted_ in this class.
1517 grpc::internal::Mutex mu_;
1518 int trailers_intercepted_ = 0;
// With no retry policy configured, sends kNumRpcs successful RPCs through the
// "intercept_trailing_metadata_lb" policy and expects the trailer callback to
// have fired exactly kNumRpcs times.
1521 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
1522 const int kNumServers = 1;
1523 const int kNumRpcs = 10;
1524 StartServers(kNumServers);
1525 auto response_generator = BuildResolverResponseGenerator();
// NOTE(review): the declaration that receives this BuildChannel() result
// (`channel`, used below) is not visible in this listing.
1527 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1528 auto stub = BuildStub(channel);
1529 response_generator.SetNextResolution(GetServersPorts());
// NOTE(review): size_t index compared against the int constant kNumRpcs
// (signed/unsigned comparison).
1530 for (size_t i = 0; i < kNumRpcs; ++i) {
1531 CheckRpcSendOk(stub, DEBUG_LOCATION);
1533 // Check LB policy name for the channel.
1534 EXPECT_EQ("intercept_trailing_metadata_lb",
1535 channel->GetLoadBalancingPolicyName());
1536 EXPECT_EQ(kNumRpcs, trailers_intercepted());
// Same check as the retries-disabled variant, but the channel's service
// config carries a retryPolicy (maxAttempts 3, retryable on ABORTED) for
// grpc.testing.EchoTestService. The trailer callback is still expected to
// fire exactly once per RPC.
1539 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
1540 const int kNumServers = 1;
1541 const int kNumRpcs = 10;
1542 StartServers(kNumServers);
1543 ChannelArguments args;
// NOTE(review): several fragments of this JSON literal and the `channel`
// declaration are not visible in this listing -- confirm against the original.
1544 args.SetServiceConfigJSON(
1546 " \"methodConfig\": [ {\n"
1548 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
1550 " \"retryPolicy\": {\n"
1551 " \"maxAttempts\": 3,\n"
1552 " \"initialBackoff\": \"1s\",\n"
1553 " \"maxBackoff\": \"120s\",\n"
1554 " \"backoffMultiplier\": 1.6,\n"
1555 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
1559 auto response_generator = BuildResolverResponseGenerator();
1561 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
1562 auto stub = BuildStub(channel);
1563 response_generator.SetNextResolution(GetServersPorts());
// NOTE(review): size_t index compared against the int constant kNumRpcs
// (signed/unsigned comparison).
1564 for (size_t i = 0; i < kNumRpcs; ++i) {
1565 CheckRpcSendOk(stub, DEBUG_LOCATION);
1567 // Check LB policy name for the channel.
1568 EXPECT_EQ("intercept_trailing_metadata_lb",
1569 channel->GetLoadBalancingPolicyName());
1570 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1574 } // namespace testing
1577 int main(int argc, char** argv) {
1578 ::testing::InitGoogleTest(&argc, argv);
1579 grpc::testing::TestEnvironment env(argc, argv);
1580 const auto result = RUN_ALL_TESTS();