3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "absl/strings/str_cat.h"
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/atm.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 #include <grpcpp/create_channel.h>
37 #include <grpcpp/health_check_service_interface.h>
38 #include <grpcpp/impl/codegen/sync.h>
39 #include <grpcpp/server.h>
40 #include <grpcpp/server_builder.h>
42 #include "src/core/ext/filters/client_channel/backup_poller.h"
43 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
44 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
45 #include "src/core/ext/filters/client_channel/server_address.h"
46 #include "src/core/ext/filters/client_channel/service_config.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/gpr/env.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/iomgr/parse_address.h"
53 #include "src/core/lib/iomgr/tcp_client.h"
54 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
55 #include "src/cpp/client/secure_credentials.h"
56 #include "src/cpp/server/secure_server_credentials.h"
58 #include "src/proto/grpc/testing/echo.grpc.pb.h"
59 #include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/test_config.h"
62 #include "test/core/util/test_lb_policies.h"
63 #include "test/cpp/end2end/test_service_impl.h"
65 #include <gmock/gmock.h>
66 #include <gtest/gtest.h>
68 using grpc::testing::EchoRequest;
69 using grpc::testing::EchoResponse;
70 using std::chrono::system_clock;
72 // defined in tcp_client.cc
73 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
75 static grpc_tcp_client_vtable* default_client_impl;
81 gpr_atm g_connection_delay_ms;
83 void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
84 grpc_pollset_set* interested_parties,
85 const grpc_channel_args* channel_args,
86 const grpc_resolved_address* addr,
87 grpc_millis deadline) {
88 const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
90 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
92 default_client_impl->connect(closure, ep, interested_parties, channel_args,
93 addr, deadline + delay_ms);
96 grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay};
98 // Subclass of TestServiceImpl that increments a request counter for
99 // every call to the Echo RPC.
100 class MyTestServiceImpl : public TestServiceImpl {
102 Status Echo(ServerContext* context, const EchoRequest* request,
103 EchoResponse* response) override {
104 const udpa::data::orca::v1::OrcaLoadReport* load_report = nullptr;
106 grpc::internal::MutexLock lock(&mu_);
108 load_report = load_report_;
110 AddClient(context->peer());
111 if (load_report != nullptr) {
112 // TODO(roth): Once we provide a more standard server-side API for
113 // populating this data, use that API here.
114 context->AddTrailingMetadata("x-endpoint-load-metrics-bin",
115 load_report->SerializeAsString());
117 return TestServiceImpl::Echo(context, request, response);
120 int request_count() {
121 grpc::internal::MutexLock lock(&mu_);
122 return request_count_;
125 void ResetCounters() {
126 grpc::internal::MutexLock lock(&mu_);
130 std::set<std::string> clients() {
131 grpc::internal::MutexLock lock(&clients_mu_);
135 void set_load_report(udpa::data::orca::v1::OrcaLoadReport* load_report) {
136 grpc::internal::MutexLock lock(&mu_);
137 load_report_ = load_report;
141 void AddClient(const std::string& client) {
142 grpc::internal::MutexLock lock(&clients_mu_);
143 clients_.insert(client);
146 grpc::internal::Mutex mu_;
147 int request_count_ = 0;
148 const udpa::data::orca::v1::OrcaLoadReport* load_report_ = nullptr;
149 grpc::internal::Mutex clients_mu_;
150 std::set<std::string> clients_;
153 class FakeResolverResponseGeneratorWrapper {
155 FakeResolverResponseGeneratorWrapper()
156 : response_generator_(grpc_core::MakeRefCounted<
157 grpc_core::FakeResolverResponseGenerator>()) {}
159 FakeResolverResponseGeneratorWrapper(
160 FakeResolverResponseGeneratorWrapper&& other) noexcept {
161 response_generator_ = std::move(other.response_generator_);
164 void SetNextResolution(const std::vector<int>& ports,
165 const char* service_config_json = nullptr) {
166 grpc_core::ExecCtx exec_ctx;
167 response_generator_->SetResponse(
168 BuildFakeResults(ports, service_config_json));
171 void SetNextResolutionUponError(const std::vector<int>& ports) {
172 grpc_core::ExecCtx exec_ctx;
173 response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
176 void SetFailureOnReresolution() {
177 grpc_core::ExecCtx exec_ctx;
178 response_generator_->SetFailureOnReresolution();
181 grpc_core::FakeResolverResponseGenerator* Get() const {
182 return response_generator_.get();
186 static grpc_core::Resolver::Result BuildFakeResults(
187 const std::vector<int>& ports,
188 const char* service_config_json = nullptr) {
189 grpc_core::Resolver::Result result;
190 for (const int& port : ports) {
191 std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
192 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true);
193 GPR_ASSERT(lb_uri != nullptr);
194 grpc_resolved_address address;
195 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
196 result.addresses.emplace_back(address.addr, address.len,
198 grpc_uri_destroy(lb_uri);
200 if (service_config_json != nullptr) {
201 result.service_config = grpc_core::ServiceConfig::Create(
202 service_config_json, &result.service_config_error);
203 GPR_ASSERT(result.service_config != nullptr);
208 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
212 class ClientLbEnd2endTest : public ::testing::Test {
214 ClientLbEnd2endTest()
215 : server_host_("localhost"),
216 kRequestMessage_("Live long and prosper."),
217 creds_(new SecureChannelCredentials(
218 grpc_fake_transport_security_credentials_create())) {}
220 static void SetUpTestCase() {
221 // Make the backup poller poll very frequently in order to pick up
222 // updates from all the subchannels's FDs.
223 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
225 // Workaround Apple CFStream bug
226 gpr_setenv("grpc_cfstream", "0");
230 void SetUp() override { grpc_init(); }
232 void TearDown() override {
233 for (size_t i = 0; i < servers_.size(); ++i) {
234 servers_[i]->Shutdown();
238 grpc_shutdown_blocking();
241 void CreateServers(size_t num_servers,
242 std::vector<int> ports = std::vector<int>()) {
244 for (size_t i = 0; i < num_servers; ++i) {
246 if (ports.size() == num_servers) port = ports[i];
247 servers_.emplace_back(new ServerData(port));
251 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
253 void StartServers(size_t num_servers,
254 std::vector<int> ports = std::vector<int>()) {
255 CreateServers(num_servers, std::move(ports));
256 for (size_t i = 0; i < num_servers; ++i) {
261 std::vector<int> GetServersPorts(size_t start_index = 0) {
262 std::vector<int> ports;
263 for (size_t i = start_index; i < servers_.size(); ++i) {
264 ports.push_back(servers_[i]->port_);
269 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
270 return FakeResolverResponseGeneratorWrapper();
273 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
274 const std::shared_ptr<Channel>& channel) {
275 return grpc::testing::EchoTestService::NewStub(channel);
278 std::shared_ptr<Channel> BuildChannel(
279 const std::string& lb_policy_name,
280 const FakeResolverResponseGeneratorWrapper& response_generator,
281 ChannelArguments args = ChannelArguments()) {
282 if (lb_policy_name.size() > 0) {
283 args.SetLoadBalancingPolicyName(lb_policy_name);
284 } // else, default to pick first
285 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
286 response_generator.Get());
287 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
291 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
292 EchoResponse* response = nullptr, int timeout_ms = 1000,
293 Status* result = nullptr, bool wait_for_ready = false) {
294 const bool local_response = (response == nullptr);
295 if (local_response) response = new EchoResponse;
297 request.set_message(kRequestMessage_);
298 request.mutable_param()->set_echo_metadata(true);
299 ClientContext context;
300 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
301 if (wait_for_ready) context.set_wait_for_ready(true);
302 context.AddMetadata("foo", "1");
303 context.AddMetadata("bar", "2");
304 context.AddMetadata("baz", "3");
305 Status status = stub->Echo(&context, request, response);
306 if (result != nullptr) *result = status;
307 if (local_response) delete response;
312 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
313 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
314 EchoResponse response;
317 SendRpc(stub, &response, 2000, &status, wait_for_ready);
318 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
320 << "Error: " << status.error_message() << " "
321 << status.error_details();
322 ASSERT_EQ(response.message(), kRequestMessage_)
323 << "From " << location.file() << ":" << location.line();
324 if (!success) abort();
327 void CheckRpcSendFailure(
328 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
329 const bool success = SendRpc(stub);
330 EXPECT_FALSE(success);
335 std::unique_ptr<Server> server_;
336 MyTestServiceImpl service_;
337 std::unique_ptr<std::thread> thread_;
338 bool server_ready_ = false;
339 bool started_ = false;
341 explicit ServerData(int port = 0) {
342 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
345 void Start(const std::string& server_host) {
346 gpr_log(GPR_INFO, "starting server on port %d", port_);
348 grpc::internal::Mutex mu;
349 grpc::internal::MutexLock lock(&mu);
350 grpc::internal::CondVar cond;
351 thread_.reset(new std::thread(
352 std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
353 cond.WaitUntil(&mu, [this] { return server_ready_; });
354 server_ready_ = false;
355 gpr_log(GPR_INFO, "server startup complete");
358 void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
359 grpc::internal::CondVar* cond) {
360 std::ostringstream server_address;
361 server_address << server_host << ":" << port_;
362 ServerBuilder builder;
363 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
364 grpc_fake_transport_security_server_credentials_create()));
365 builder.AddListeningPort(server_address.str(), std::move(creds));
366 builder.RegisterService(&service_);
367 server_ = builder.BuildAndStart();
368 grpc::internal::MutexLock lock(mu);
369 server_ready_ = true;
374 if (!started_) return;
375 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
380 void SetServingStatus(const std::string& service, bool serving) {
381 server_->GetHealthCheckService()->SetServingStatus(service, serving);
385 void ResetCounters() {
386 for (const auto& server : servers_) server->service_.ResetCounters();
390 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
391 size_t server_idx, const grpc_core::DebugLocation& location,
392 bool ignore_failure = false) {
394 if (ignore_failure) {
397 CheckRpcSendOk(stub, location, true);
399 } while (servers_[server_idx]->service_.request_count() == 0);
403 bool WaitForChannelState(
404 Channel* channel, std::function<bool(grpc_connectivity_state)> predicate,
405 bool try_to_connect = false, int timeout_seconds = 5) {
406 const gpr_timespec deadline =
407 grpc_timeout_seconds_to_deadline(timeout_seconds);
409 grpc_connectivity_state state = channel->GetState(try_to_connect);
410 if (predicate(state)) break;
411 if (!channel->WaitForStateChange(state, deadline)) return false;
416 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
417 auto predicate = [](grpc_connectivity_state state) {
418 return state != GRPC_CHANNEL_READY;
420 return WaitForChannelState(channel, predicate, false, timeout_seconds);
423 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
424 auto predicate = [](grpc_connectivity_state state) {
425 return state == GRPC_CHANNEL_READY;
427 return WaitForChannelState(channel, predicate, true, timeout_seconds);
430 bool SeenAllServers() {
431 for (const auto& server : servers_) {
432 if (server->service_.request_count() == 0) return false;
437 // Updates \a connection_order by appending to it the index of the newly
438 // connected server. Must be called after every single RPC.
439 void UpdateConnectionOrder(
440 const std::vector<std::unique_ptr<ServerData>>& servers,
441 std::vector<int>* connection_order) {
442 for (size_t i = 0; i < servers.size(); ++i) {
443 if (servers[i]->service_.request_count() == 1) {
444 // Was the server index known? If not, update connection_order.
446 std::find(connection_order->begin(), connection_order->end(), i);
447 if (it == connection_order->end()) {
448 connection_order->push_back(i);
455 const std::string server_host_;
456 std::vector<std::unique_ptr<ServerData>> servers_;
457 const std::string kRequestMessage_;
458 std::shared_ptr<ChannelCredentials> creds_;
461 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
462 const int kNumServers = 3;
463 StartServers(kNumServers);
464 auto response_generator = BuildResolverResponseGenerator();
465 auto channel = BuildChannel("", response_generator);
466 auto stub = BuildStub(channel);
467 // Initial state should be IDLE.
468 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
469 // Tell the channel to try to connect.
470 // Note that this call also returns IDLE, since the state change has
471 // not yet occurred; it just gets triggered by this call.
472 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
473 // Now that the channel is trying to connect, we should be in state
475 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
476 GRPC_CHANNEL_CONNECTING);
477 // Return a resolver result, which allows the connection attempt to proceed.
478 response_generator.SetNextResolution(GetServersPorts());
479 // We should eventually transition into state READY.
480 EXPECT_TRUE(WaitForChannelReady(channel.get()));
483 TEST_F(ClientLbEnd2endTest, PickFirst) {
484 // Start servers and send one RPC per server.
485 const int kNumServers = 3;
486 StartServers(kNumServers);
487 auto response_generator = BuildResolverResponseGenerator();
488 auto channel = BuildChannel(
489 "", response_generator); // test that pick first is the default.
490 auto stub = BuildStub(channel);
491 response_generator.SetNextResolution(GetServersPorts());
492 for (size_t i = 0; i < servers_.size(); ++i) {
493 CheckRpcSendOk(stub, DEBUG_LOCATION);
495 // All requests should have gone to a single server.
497 for (size_t i = 0; i < servers_.size(); ++i) {
498 const int request_count = servers_[i]->service_.request_count();
499 if (request_count == kNumServers) {
502 EXPECT_EQ(0, request_count);
506 // Check LB policy name for the channel.
507 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
510 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
511 StartServers(1); // Single server
512 auto response_generator = BuildResolverResponseGenerator();
513 auto channel = BuildChannel(
514 "", response_generator); // test that pick first is the default.
515 auto stub = BuildStub(channel);
516 response_generator.SetNextResolution({servers_[0]->port_});
517 WaitForServer(stub, 0, DEBUG_LOCATION);
518 // Create a new channel and its corresponding PF LB policy, which will pick
519 // the subchannels in READY state from the previous RPC against the same
520 // target (even if it happened over a different channel, because subchannels
521 // are globally reused). Progress should happen without any transition from
523 auto second_response_generator = BuildResolverResponseGenerator();
524 auto second_channel = BuildChannel("", second_response_generator);
525 auto second_stub = BuildStub(second_channel);
526 second_response_generator.SetNextResolution({servers_[0]->port_});
527 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
530 TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
531 ChannelArguments args;
532 constexpr int kInitialBackOffMs = 5000;
533 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
534 // Create 2 servers, but start only the second one.
535 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
536 grpc_pick_unused_port_or_die()};
537 CreateServers(2, ports);
539 auto response_generator1 = BuildResolverResponseGenerator();
540 auto channel1 = BuildChannel("pick_first", response_generator1, args);
541 auto stub1 = BuildStub(channel1);
542 response_generator1.SetNextResolution(ports);
543 // Wait for second server to be ready.
544 WaitForServer(stub1, 1, DEBUG_LOCATION);
545 // Create a second channel with the same addresses. Its PF instance
546 // should immediately pick the second subchannel, since it's already
548 auto response_generator2 = BuildResolverResponseGenerator();
549 auto channel2 = BuildChannel("pick_first", response_generator2, args);
550 response_generator2.SetNextResolution(ports);
551 // Check that the channel reports READY without waiting for the
553 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
556 TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
557 ChannelArguments args;
558 constexpr int kInitialBackOffMs = 100;
559 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
560 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
561 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
562 auto response_generator = BuildResolverResponseGenerator();
563 auto channel = BuildChannel("pick_first", response_generator, args);
564 auto stub = BuildStub(channel);
565 response_generator.SetNextResolution(ports);
566 // The channel won't become connected (there's no server).
567 ASSERT_FALSE(channel->WaitForConnected(
568 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
569 // Bring up a server on the chosen port.
570 StartServers(1, ports);
572 ASSERT_TRUE(channel->WaitForConnected(
573 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
574 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
575 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
576 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
577 // We should have waited at least kInitialBackOffMs. We substract one to
578 // account for test and precision accuracy drift.
579 EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
580 // But not much more.
583 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
587 TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
588 ChannelArguments args;
589 constexpr int kMinReconnectBackOffMs = 1000;
590 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
591 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
592 auto response_generator = BuildResolverResponseGenerator();
593 auto channel = BuildChannel("pick_first", response_generator, args);
594 auto stub = BuildStub(channel);
595 response_generator.SetNextResolution(ports);
596 // Make connection delay a 10% longer than it's willing to in order to make
597 // sure we are hitting the codepath that waits for the min reconnect backoff.
598 gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
599 default_client_impl = grpc_tcp_client_impl;
600 grpc_set_tcp_client_impl(&delayed_connect);
601 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
602 channel->WaitForConnected(
603 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
604 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
605 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
606 gpr_log(GPR_DEBUG, "Waited %" PRId64 " ms", waited_ms);
607 // We should have waited at least kMinReconnectBackOffMs. We substract one to
608 // account for test and precision accuracy drift.
609 EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
610 gpr_atm_rel_store(&g_connection_delay_ms, 0);
613 TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
614 ChannelArguments args;
615 constexpr int kInitialBackOffMs = 1000;
616 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
617 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
618 auto response_generator = BuildResolverResponseGenerator();
619 auto channel = BuildChannel("pick_first", response_generator, args);
620 auto stub = BuildStub(channel);
621 response_generator.SetNextResolution(ports);
622 // The channel won't become connected (there's no server).
624 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
625 // Bring up a server on the chosen port.
626 StartServers(1, ports);
627 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
628 // Wait for connect, but not long enough. This proves that we're
629 // being throttled by initial backoff.
631 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
632 // Reset connection backoff.
633 experimental::ChannelResetConnectionBackoff(channel.get());
634 // Wait for connect. Should happen as soon as the client connects to
635 // the newly started server, which should be before the initial
636 // backoff timeout elapses.
638 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(20)));
639 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
640 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
641 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
642 // We should have waited less than kInitialBackOffMs.
643 EXPECT_LT(waited_ms, kInitialBackOffMs);
646 TEST_F(ClientLbEnd2endTest,
647 PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
648 ChannelArguments args;
649 constexpr int kInitialBackOffMs = 1000;
650 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
651 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
652 auto response_generator = BuildResolverResponseGenerator();
653 auto channel = BuildChannel("pick_first", response_generator, args);
654 auto stub = BuildStub(channel);
655 response_generator.SetNextResolution(ports);
656 // Wait for connect, which should fail ~immediately, because the server
658 gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
660 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
661 // Reset connection backoff.
662 // Note that the time at which the third attempt will be started is
663 // actually computed at this point, so we record the start time here.
664 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
665 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
666 experimental::ChannelResetConnectionBackoff(channel.get());
667 // Trigger a second connection attempt. This should also fail
668 // ~immediately, but the retry should be scheduled for
669 // kInitialBackOffMs instead of applying the multiplier.
670 gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
672 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
673 // Bring up a server on the chosen port.
674 gpr_log(GPR_INFO, "=== STARTING BACKEND");
675 StartServers(1, ports);
676 // Wait for connect. Should happen within kInitialBackOffMs.
677 // Give an extra 100ms to account for the time spent in the second and
678 // third connection attempts themselves (since what we really want to
679 // measure is the time between the two). As long as this is less than
680 // the 1.6x increase we would see if the backoff state was not reset
681 // properly, the test is still proving that the backoff was reset.
682 constexpr int kWaitMs = kInitialBackOffMs + 100;
683 gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
684 EXPECT_TRUE(channel->WaitForConnected(
685 grpc_timeout_milliseconds_to_deadline(kWaitMs)));
686 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
687 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
688 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
689 EXPECT_LT(waited_ms, kWaitMs);
692 TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
693 // Start servers and send one RPC per server.
694 const int kNumServers = 3;
695 StartServers(kNumServers);
696 auto response_generator = BuildResolverResponseGenerator();
697 auto channel = BuildChannel("pick_first", response_generator);
698 auto stub = BuildStub(channel);
700 std::vector<int> ports;
702 // Perform one RPC against the first server.
703 ports.emplace_back(servers_[0]->port_);
704 response_generator.SetNextResolution(ports);
705 gpr_log(GPR_INFO, "****** SET [0] *******");
706 CheckRpcSendOk(stub, DEBUG_LOCATION);
707 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
709 // An empty update will result in the channel going into TRANSIENT_FAILURE.
711 response_generator.SetNextResolution(ports);
712 gpr_log(GPR_INFO, "****** SET none *******");
713 grpc_connectivity_state channel_state;
715 channel_state = channel->GetState(true /* try to connect */);
716 } while (channel_state == GRPC_CHANNEL_READY);
717 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
718 servers_[0]->service_.ResetCounters();
720 // Next update introduces servers_[1], making the channel recover.
722 ports.emplace_back(servers_[1]->port_);
723 response_generator.SetNextResolution(ports);
724 gpr_log(GPR_INFO, "****** SET [1] *******");
725 WaitForServer(stub, 1, DEBUG_LOCATION);
726 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
728 // And again for servers_[2]
730 ports.emplace_back(servers_[2]->port_);
731 response_generator.SetNextResolution(ports);
732 gpr_log(GPR_INFO, "****** SET [2] *******");
733 WaitForServer(stub, 2, DEBUG_LOCATION);
734 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
735 EXPECT_EQ(servers_[1]->service_.request_count(), 0);
737 // Check LB policy name for the channel.
738 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
741 TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
742 // Start servers and send one RPC per server.
743 const int kNumServers = 3;
744 StartServers(kNumServers);
745 auto response_generator = BuildResolverResponseGenerator();
746 auto channel = BuildChannel("pick_first", response_generator);
747 auto stub = BuildStub(channel);
749 std::vector<int> ports;
751 // Perform one RPC against the first server.
752 ports.emplace_back(servers_[0]->port_);
753 response_generator.SetNextResolution(ports);
754 gpr_log(GPR_INFO, "****** SET [0] *******");
755 CheckRpcSendOk(stub, DEBUG_LOCATION);
756 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
757 servers_[0]->service_.ResetCounters();
759 // Send and superset update
761 ports.emplace_back(servers_[1]->port_);
762 ports.emplace_back(servers_[0]->port_);
763 response_generator.SetNextResolution(ports);
764 gpr_log(GPR_INFO, "****** SET superset *******");
765 CheckRpcSendOk(stub, DEBUG_LOCATION);
766 // We stick to the previously connected server.
767 WaitForServer(stub, 0, DEBUG_LOCATION);
768 EXPECT_EQ(0, servers_[1]->service_.request_count());
770 // Check LB policy name for the channel.
771 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
774 TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
776 const int kNumServers = 1;
777 StartServers(kNumServers);
778 std::vector<int> ports = GetServersPorts();
779 // Create two channels that (by default) use the global subchannel pool.
780 auto response_generator1 = BuildResolverResponseGenerator();
781 auto channel1 = BuildChannel("pick_first", response_generator1);
782 auto stub1 = BuildStub(channel1);
783 response_generator1.SetNextResolution(ports);
784 auto response_generator2 = BuildResolverResponseGenerator();
785 auto channel2 = BuildChannel("pick_first", response_generator2);
786 auto stub2 = BuildStub(channel2);
787 response_generator2.SetNextResolution(ports);
788 WaitForServer(stub1, 0, DEBUG_LOCATION);
789 // Send one RPC on each channel.
790 CheckRpcSendOk(stub1, DEBUG_LOCATION);
791 CheckRpcSendOk(stub2, DEBUG_LOCATION);
792 // The server receives two requests.
793 EXPECT_EQ(2, servers_[0]->service_.request_count());
794 // The two requests are from the same client port, because the two channels
795 // share subchannels via the global subchannel pool.
796 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
799 TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
801 const int kNumServers = 1;
802 StartServers(kNumServers);
803 std::vector<int> ports = GetServersPorts();
804 // Create two channels that use local subchannel pool.
805 ChannelArguments args;
806 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
807 auto response_generator1 = BuildResolverResponseGenerator();
808 auto channel1 = BuildChannel("pick_first", response_generator1, args);
809 auto stub1 = BuildStub(channel1);
810 response_generator1.SetNextResolution(ports);
811 auto response_generator2 = BuildResolverResponseGenerator();
812 auto channel2 = BuildChannel("pick_first", response_generator2, args);
813 auto stub2 = BuildStub(channel2);
814 response_generator2.SetNextResolution(ports);
815 WaitForServer(stub1, 0, DEBUG_LOCATION);
816 // Send one RPC on each channel.
817 CheckRpcSendOk(stub1, DEBUG_LOCATION);
818 CheckRpcSendOk(stub2, DEBUG_LOCATION);
819 // The server receives two requests.
820 EXPECT_EQ(2, servers_[0]->service_.request_count());
821 // The two requests are from two client ports, because the two channels didn't
822 // share subchannels with each other.
823 EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
826 TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
827 const int kNumUpdates = 1000;
828 const int kNumServers = 3;
829 StartServers(kNumServers);
830 auto response_generator = BuildResolverResponseGenerator();
831 auto channel = BuildChannel("pick_first", response_generator);
832 auto stub = BuildStub(channel);
833 std::vector<int> ports = GetServersPorts();
834 for (size_t i = 0; i < kNumUpdates; ++i) {
835 std::shuffle(ports.begin(), ports.end(),
836 std::mt19937(std::random_device()()));
837 response_generator.SetNextResolution(ports);
838 // We should re-enter core at the end of the loop to give the resolution
839 // setting closure a chance to run.
840 if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
842 // Check LB policy name for the channel.
843 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
846 TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
847 // Prepare the ports for up servers and down servers.
848 const int kNumServers = 3;
849 const int kNumAliveServers = 1;
850 StartServers(kNumAliveServers);
851 std::vector<int> alive_ports, dead_ports;
852 for (size_t i = 0; i < kNumServers; ++i) {
853 if (i < kNumAliveServers) {
854 alive_ports.emplace_back(servers_[i]->port_);
856 dead_ports.emplace_back(grpc_pick_unused_port_or_die());
859 auto response_generator = BuildResolverResponseGenerator();
860 auto channel = BuildChannel("pick_first", response_generator);
861 auto stub = BuildStub(channel);
862 // The initial resolution only contains dead ports. There won't be any
863 // selected subchannel. Re-resolution will return the same result.
864 response_generator.SetNextResolution(dead_ports);
865 gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
866 for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
867 // Set a re-resolution result that contains reachable ports, so that the
868 // pick_first LB policy can recover soon.
869 response_generator.SetNextResolutionUponError(alive_ports);
870 gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
871 WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
872 CheckRpcSendOk(stub, DEBUG_LOCATION);
873 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
874 // Check LB policy name for the channel.
875 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
878 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
879 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
880 StartServers(1, ports);
881 auto response_generator = BuildResolverResponseGenerator();
882 auto channel = BuildChannel("pick_first", response_generator);
883 auto stub = BuildStub(channel);
884 response_generator.SetNextResolution(ports);
885 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
886 WaitForServer(stub, 0, DEBUG_LOCATION);
887 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
888 servers_[0]->Shutdown();
889 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
890 gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
891 StartServers(1, ports);
892 WaitForServer(stub, 0, DEBUG_LOCATION);
895 TEST_F(ClientLbEnd2endTest,
896 PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
897 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
898 grpc_pick_unused_port_or_die()};
899 CreateServers(2, ports);
901 auto response_generator = BuildResolverResponseGenerator();
902 auto channel = BuildChannel("pick_first", response_generator);
903 auto stub = BuildStub(channel);
904 response_generator.SetNextResolution(ports);
905 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
906 WaitForServer(stub, 1, DEBUG_LOCATION);
907 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
908 servers_[1]->Shutdown();
909 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
910 gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
911 StartServers(2, ports);
912 WaitForServer(stub, 0, DEBUG_LOCATION);
915 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
916 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
917 StartServers(1, ports);
918 auto response_generator = BuildResolverResponseGenerator();
919 auto channel_1 = BuildChannel("pick_first", response_generator);
920 auto stub_1 = BuildStub(channel_1);
921 response_generator.SetNextResolution(ports);
922 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
923 WaitForServer(stub_1, 0, DEBUG_LOCATION);
924 gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
925 servers_[0]->Shutdown();
926 // Channel 1 will receive a re-resolution containing the same server. It will
927 // create a new subchannel and hold a ref to it.
928 StartServers(1, ports);
929 gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
930 auto response_generator_2 = BuildResolverResponseGenerator();
931 auto channel_2 = BuildChannel("pick_first", response_generator_2);
932 auto stub_2 = BuildStub(channel_2);
933 response_generator_2.SetNextResolution(ports);
934 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
935 WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
936 gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
937 servers_[0]->Shutdown();
938 // Wait until the disconnection has triggered the connectivity notification.
939 // Otherwise, the subchannel may be picked for next call but will fail soon.
940 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
941 // Channel 2 will also receive a re-resolution containing the same server.
942 // Both channels will ref the same subchannel that failed.
943 StartServers(1, ports);
944 gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
945 gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
946 // The first call after the server restart will succeed.
947 CheckRpcSendOk(stub_2, DEBUG_LOCATION);
948 gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
949 // Check LB policy name for the channel.
950 EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
951 // Check LB policy name for the channel.
952 EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
955 TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
956 // Start server, send RPC, and make sure channel is READY.
957 const int kNumServers = 1;
958 StartServers(kNumServers);
959 auto response_generator = BuildResolverResponseGenerator();
961 BuildChannel("", response_generator); // pick_first is the default.
962 auto stub = BuildStub(channel);
963 response_generator.SetNextResolution(GetServersPorts());
964 CheckRpcSendOk(stub, DEBUG_LOCATION);
965 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
966 // Stop server. Channel should go into state IDLE.
967 response_generator.SetFailureOnReresolution();
968 servers_[0]->Shutdown();
969 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
970 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
974 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
975 auto response_generator = BuildResolverResponseGenerator();
977 BuildChannel("", response_generator); // pick_first is the default.
978 auto stub = BuildStub(channel);
979 // Create a number of servers, but only start 1 of them.
982 // Initially resolve to first server and make sure it connects.
983 gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
984 response_generator.SetNextResolution({servers_[0]->port_});
985 CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
986 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
987 // Send a resolution update with the remaining servers, none of which are
988 // running yet, so the update will stay pending. Note that it's important
989 // to have multiple servers here, or else the test will be flaky; with only
990 // one server, the pending subchannel list has already gone into
991 // TRANSIENT_FAILURE due to hitting the end of the list by the time we
994 "Phase 2: Resolver update pointing to remaining "
995 "(not started) servers.");
996 response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
997 // RPCs will continue to be sent to the first server.
998 CheckRpcSendOk(stub, DEBUG_LOCATION);
999 // Now stop the first server, so that the current subchannel list
1000 // fails. This should cause us to immediately swap over to the
1001 // pending list, even though it's not yet connected. The state should
1002 // be set to CONNECTING, since that's what the pending subchannel list
1003 // was doing when we swapped over.
1004 gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
1005 servers_[0]->Shutdown();
1006 WaitForChannelNotReady(channel.get());
1007 // TODO(roth): This should always return CONNECTING, but it's flaky
1008 // between that and TRANSIENT_FAILURE. I suspect that this problem
1009 // will go away once we move the backoff code out of the subchannel
1010 // and into the LB policies.
1011 EXPECT_THAT(channel->GetState(false),
1012 ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
1013 GRPC_CHANNEL_TRANSIENT_FAILURE));
1014 // Now start the second server.
1015 gpr_log(GPR_INFO, "Phase 4: Starting second server.");
1017 // The channel should go to READY state and RPCs should go to the
1019 WaitForChannelReady(channel.get());
1020 WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
1023 TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
1024 // Start server, send RPC, and make sure channel is READY.
1025 const int kNumServers = 1;
1026 StartServers(kNumServers);
1027 auto response_generator = BuildResolverResponseGenerator();
1029 BuildChannel("", response_generator); // pick_first is the default.
1030 auto stub = BuildStub(channel);
1031 response_generator.SetNextResolution(GetServersPorts());
1032 CheckRpcSendOk(stub, DEBUG_LOCATION);
1033 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1034 // Stop server. Channel should go into state IDLE.
1035 servers_[0]->Shutdown();
1036 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1037 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1038 // Now send resolver update that includes no addresses. Channel
1039 // should stay in state IDLE.
1040 response_generator.SetNextResolution({});
1041 EXPECT_FALSE(channel->WaitForStateChange(
1042 GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
1043 // Now bring the backend back up and send a non-empty resolver update,
1044 // and then try to send an RPC. Channel should go back into state READY.
1046 response_generator.SetNextResolution(GetServersPorts());
1047 CheckRpcSendOk(stub, DEBUG_LOCATION);
1048 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1051 TEST_F(ClientLbEnd2endTest, RoundRobin) {
1052 // Start servers and send one RPC per server.
1053 const int kNumServers = 3;
1054 StartServers(kNumServers);
1055 auto response_generator = BuildResolverResponseGenerator();
1056 auto channel = BuildChannel("round_robin", response_generator);
1057 auto stub = BuildStub(channel);
1058 response_generator.SetNextResolution(GetServersPorts());
1059 // Wait until all backends are ready.
1061 CheckRpcSendOk(stub, DEBUG_LOCATION);
1062 } while (!SeenAllServers());
1064 // "Sync" to the end of the list. Next sequence of picks will start at the
1065 // first server (index 0).
1066 WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
1067 std::vector<int> connection_order;
1068 for (size_t i = 0; i < servers_.size(); ++i) {
1069 CheckRpcSendOk(stub, DEBUG_LOCATION);
1070 UpdateConnectionOrder(servers_, &connection_order);
1072 // Backends should be iterated over in the order in which the addresses were
1074 const auto expected = std::vector<int>{0, 1, 2};
1075 EXPECT_EQ(expected, connection_order);
1076 // Check LB policy name for the channel.
1077 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1080 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
1081 StartServers(1); // Single server
1082 auto response_generator = BuildResolverResponseGenerator();
1083 auto channel = BuildChannel("round_robin", response_generator);
1084 auto stub = BuildStub(channel);
1085 response_generator.SetNextResolution({servers_[0]->port_});
1086 WaitForServer(stub, 0, DEBUG_LOCATION);
1087 // Create a new channel and its corresponding RR LB policy, which will pick
1088 // the subchannels in READY state from the previous RPC against the same
1089 // target (even if it happened over a different channel, because subchannels
1090 // are globally reused). Progress should happen without any transition from
1091 // this READY state.
1092 auto second_response_generator = BuildResolverResponseGenerator();
1093 auto second_channel = BuildChannel("round_robin", second_response_generator);
1094 auto second_stub = BuildStub(second_channel);
1095 second_response_generator.SetNextResolution({servers_[0]->port_});
1096 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
1099 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
1100 // Start servers and send one RPC per server.
1101 const int kNumServers = 3;
1102 StartServers(kNumServers);
1103 auto response_generator = BuildResolverResponseGenerator();
1104 auto channel = BuildChannel("round_robin", response_generator);
1105 auto stub = BuildStub(channel);
1106 std::vector<int> ports;
1107 // Start with a single server.
1108 gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1109 ports.emplace_back(servers_[0]->port_);
1110 response_generator.SetNextResolution(ports);
1111 WaitForServer(stub, 0, DEBUG_LOCATION);
1112 // Send RPCs. They should all go servers_[0]
1113 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1114 EXPECT_EQ(10, servers_[0]->service_.request_count());
1115 EXPECT_EQ(0, servers_[1]->service_.request_count());
1116 EXPECT_EQ(0, servers_[2]->service_.request_count());
1117 servers_[0]->service_.ResetCounters();
1118 // And now for the second server.
1119 gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1121 ports.emplace_back(servers_[1]->port_);
1122 response_generator.SetNextResolution(ports);
1123 // Wait until update has been processed, as signaled by the second backend
1124 // receiving a request.
1125 EXPECT_EQ(0, servers_[1]->service_.request_count());
1126 WaitForServer(stub, 1, DEBUG_LOCATION);
1127 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1128 EXPECT_EQ(0, servers_[0]->service_.request_count());
1129 EXPECT_EQ(10, servers_[1]->service_.request_count());
1130 EXPECT_EQ(0, servers_[2]->service_.request_count());
1131 servers_[1]->service_.ResetCounters();
1132 // ... and for the last server.
1133 gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1135 ports.emplace_back(servers_[2]->port_);
1136 response_generator.SetNextResolution(ports);
1137 WaitForServer(stub, 2, DEBUG_LOCATION);
1138 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1139 EXPECT_EQ(0, servers_[0]->service_.request_count());
1140 EXPECT_EQ(0, servers_[1]->service_.request_count());
1141 EXPECT_EQ(10, servers_[2]->service_.request_count());
1142 servers_[2]->service_.ResetCounters();
1143 // Back to all servers.
1144 gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1146 ports.emplace_back(servers_[0]->port_);
1147 ports.emplace_back(servers_[1]->port_);
1148 ports.emplace_back(servers_[2]->port_);
1149 response_generator.SetNextResolution(ports);
1150 WaitForServer(stub, 0, DEBUG_LOCATION);
1151 WaitForServer(stub, 1, DEBUG_LOCATION);
1152 WaitForServer(stub, 2, DEBUG_LOCATION);
1153 // Send three RPCs, one per server.
1154 for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1155 EXPECT_EQ(1, servers_[0]->service_.request_count());
1156 EXPECT_EQ(1, servers_[1]->service_.request_count());
1157 EXPECT_EQ(1, servers_[2]->service_.request_count());
1158 // An empty update will result in the channel going into TRANSIENT_FAILURE.
1159 gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1161 response_generator.SetNextResolution(ports);
1162 grpc_connectivity_state channel_state;
1164 channel_state = channel->GetState(true /* try to connect */);
1165 } while (channel_state == GRPC_CHANNEL_READY);
1166 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
1167 servers_[0]->service_.ResetCounters();
1168 // Next update introduces servers_[1], making the channel recover.
1169 gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1171 ports.emplace_back(servers_[1]->port_);
1172 response_generator.SetNextResolution(ports);
1173 WaitForServer(stub, 1, DEBUG_LOCATION);
1174 channel_state = channel->GetState(false /* try to connect */);
1175 ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
1176 // Check LB policy name for the channel.
1177 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1180 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
1181 const int kNumServers = 3;
1182 StartServers(kNumServers);
1183 auto response_generator = BuildResolverResponseGenerator();
1184 auto channel = BuildChannel("round_robin", response_generator);
1185 auto stub = BuildStub(channel);
1186 std::vector<int> ports;
1187 // Start with a single server.
1188 ports.emplace_back(servers_[0]->port_);
1189 response_generator.SetNextResolution(ports);
1190 WaitForServer(stub, 0, DEBUG_LOCATION);
1191 // Send RPCs. They should all go to servers_[0]
1192 for (size_t i = 0; i < 10; ++i) SendRpc(stub);
1193 EXPECT_EQ(10, servers_[0]->service_.request_count());
1194 EXPECT_EQ(0, servers_[1]->service_.request_count());
1195 EXPECT_EQ(0, servers_[2]->service_.request_count());
1196 servers_[0]->service_.ResetCounters();
1197 // Shutdown one of the servers to be sent in the update.
1198 servers_[1]->Shutdown();
1199 ports.emplace_back(servers_[1]->port_);
1200 ports.emplace_back(servers_[2]->port_);
1201 response_generator.SetNextResolution(ports);
1202 WaitForServer(stub, 0, DEBUG_LOCATION);
1203 WaitForServer(stub, 2, DEBUG_LOCATION);
1204 // Send three RPCs, one per server.
1205 for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
1206 // The server in shutdown shouldn't receive any.
1207 EXPECT_EQ(0, servers_[1]->service_.request_count());
1210 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
1211 // Start servers and send one RPC per server.
1212 const int kNumServers = 3;
1213 StartServers(kNumServers);
1214 auto response_generator = BuildResolverResponseGenerator();
1215 auto channel = BuildChannel("round_robin", response_generator);
1216 auto stub = BuildStub(channel);
1217 std::vector<int> ports = GetServersPorts();
1218 for (size_t i = 0; i < 1000; ++i) {
1219 std::shuffle(ports.begin(), ports.end(),
1220 std::mt19937(std::random_device()()));
1221 response_generator.SetNextResolution(ports);
1222 if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
1224 // Check LB policy name for the channel.
1225 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1228 TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
1229 // TODO(dgq): replicate the way internal testing exercises the concurrent
1230 // update provisions of RR.
1233 TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
1234 // Start servers and send one RPC per server.
1235 const int kNumServers = 3;
1236 std::vector<int> first_ports;
1237 std::vector<int> second_ports;
1238 first_ports.reserve(kNumServers);
1239 for (int i = 0; i < kNumServers; ++i) {
1240 first_ports.push_back(grpc_pick_unused_port_or_die());
1242 second_ports.reserve(kNumServers);
1243 for (int i = 0; i < kNumServers; ++i) {
1244 second_ports.push_back(grpc_pick_unused_port_or_die());
1246 StartServers(kNumServers, first_ports);
1247 auto response_generator = BuildResolverResponseGenerator();
1248 auto channel = BuildChannel("round_robin", response_generator);
1249 auto stub = BuildStub(channel);
1250 response_generator.SetNextResolution(first_ports);
1251 // Send a number of RPCs, which succeed.
1252 for (size_t i = 0; i < 100; ++i) {
1253 CheckRpcSendOk(stub, DEBUG_LOCATION);
1256 gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
1257 for (size_t i = 0; i < servers_.size(); ++i) {
1258 servers_[i]->Shutdown();
1260 gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
1261 gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
1262 // Client requests should fail. Send enough to tickle all subchannels.
1263 for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
1264 gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
1265 // Bring servers back up on a different set of ports. We need to do this to be
1266 // sure that the eventual success is *not* due to subchannel reconnection
1267 // attempts and that an actual re-resolution has happened as a result of the
1268 // RR policy going into transient failure when all its subchannels become
1269 // unavailable (in transient failure as well).
1270 gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
1271 StartServers(kNumServers, second_ports);
1272 // Don't notify of the update. Wait for the LB policy's re-resolution to
1273 // "pull" the new ports.
1274 response_generator.SetNextResolutionUponError(second_ports);
1275 gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
1276 gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
1277 // Client request should eventually (but still fairly soon) succeed.
1278 const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
1279 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1280 while (gpr_time_cmp(deadline, now) > 0) {
1281 if (SendRpc(stub)) break;
1282 now = gpr_now(GPR_CLOCK_MONOTONIC);
1284 ASSERT_GT(gpr_time_cmp(deadline, now), 0);
1287 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
1288 // Start servers and create channel. Channel should go to READY state.
1289 const int kNumServers = 3;
1290 StartServers(kNumServers);
1291 auto response_generator = BuildResolverResponseGenerator();
1292 auto channel = BuildChannel("round_robin", response_generator);
1293 auto stub = BuildStub(channel);
1294 response_generator.SetNextResolution(GetServersPorts());
1295 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1296 // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
1297 // TODO(roth): This test should ideally check that even when the
1298 // subchannels are in state CONNECTING for an extended period of time,
1299 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1300 // currently have a good way to get a subchannel to report CONNECTING
1301 // for a long period of time, since the servers in this test framework
1302 // are on the loopback interface, which will immediately return a
1303 // "Connection refused" error, so the subchannels will only be in
1304 // CONNECTING state very briefly. When we have time, see if we can
1305 // find a way to fix this.
1306 for (size_t i = 0; i < servers_.size(); ++i) {
1307 servers_[i]->Shutdown();
1309 auto predicate = [](grpc_connectivity_state state) {
1310 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1312 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
1315 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
1316 // Create channel and return servers that don't exist. Channel should
1317 // quickly transition into TRANSIENT_FAILURE.
1318 // TODO(roth): This test should ideally check that even when the
1319 // subchannels are in state CONNECTING for an extended period of time,
1320 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1321 // currently have a good way to get a subchannel to report CONNECTING
1322 // for a long period of time, since the servers in this test framework
1323 // are on the loopback interface, which will immediately return a
1324 // "Connection refused" error, so the subchannels will only be in
1325 // CONNECTING state very briefly. When we have time, see if we can
1326 // find a way to fix this.
1327 auto response_generator = BuildResolverResponseGenerator();
1328 auto channel = BuildChannel("round_robin", response_generator);
1329 auto stub = BuildStub(channel);
1330 response_generator.SetNextResolution({
1331 grpc_pick_unused_port_or_die(),
1332 grpc_pick_unused_port_or_die(),
1333 grpc_pick_unused_port_or_die(),
1335 for (size_t i = 0; i < servers_.size(); ++i) {
1336 servers_[i]->Shutdown();
1338 auto predicate = [](grpc_connectivity_state state) {
1339 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1341 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
1344 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
1345 const int kNumServers = 3;
1346 StartServers(kNumServers);
1347 const auto ports = GetServersPorts();
1348 auto response_generator = BuildResolverResponseGenerator();
1349 auto channel = BuildChannel("round_robin", response_generator);
1350 auto stub = BuildStub(channel);
1351 response_generator.SetNextResolution(ports);
1352 for (size_t i = 0; i < kNumServers; ++i) {
1353 WaitForServer(stub, i, DEBUG_LOCATION);
1355 for (size_t i = 0; i < servers_.size(); ++i) {
1356 CheckRpcSendOk(stub, DEBUG_LOCATION);
1357 EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1359 // One request should have gone to each server.
1360 for (size_t i = 0; i < servers_.size(); ++i) {
1361 EXPECT_EQ(1, servers_[i]->service_.request_count());
1363 const auto pre_death = servers_[0]->service_.request_count();
1364 // Kill the first server.
1365 servers_[0]->Shutdown();
1366 // Client request still succeed. May need retrying if RR had returned a pick
1367 // before noticing the change in the server's connectivity.
1368 while (!SendRpc(stub)) {
1369 } // Retry until success.
1370 // Send a bunch of RPCs that should succeed.
1371 for (int i = 0; i < 10 * kNumServers; ++i) {
1372 CheckRpcSendOk(stub, DEBUG_LOCATION);
1374 const auto post_death = servers_[0]->service_.request_count();
1375 // No requests have gone to the deceased server.
1376 EXPECT_EQ(pre_death, post_death);
1377 // Bring the first server back up.
1379 // Requests should start arriving at the first server either right away (if
1380 // the server managed to start before the RR policy retried the subchannel) or
1381 // after the subchannel retry delay otherwise (RR's subchannel retried before
1382 // the server was fully back up).
1383 WaitForServer(stub, 0, DEBUG_LOCATION);
1386 // If health checking is required by client but health checking service
1387 // is not running on the server, the channel should be treated as healthy.
1388 TEST_F(ClientLbEnd2endTest,
1389 RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
1390 StartServers(1); // Single server
1391 ChannelArguments args;
1392 args.SetServiceConfigJSON(
1393 "{\"healthCheckConfig\": "
1394 "{\"serviceName\": \"health_check_service_name\"}}");
1395 auto response_generator = BuildResolverResponseGenerator();
1396 auto channel = BuildChannel("round_robin", response_generator, args);
1397 auto stub = BuildStub(channel);
1398 response_generator.SetNextResolution({servers_[0]->port_});
1399 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1400 CheckRpcSendOk(stub, DEBUG_LOCATION);
1403 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
1404 EnableDefaultHealthCheckService(true);
1406 const int kNumServers = 3;
1407 StartServers(kNumServers);
1408 ChannelArguments args;
1409 args.SetServiceConfigJSON(
1410 "{\"healthCheckConfig\": "
1411 "{\"serviceName\": \"health_check_service_name\"}}");
1412 auto response_generator = BuildResolverResponseGenerator();
1413 auto channel = BuildChannel("round_robin", response_generator, args);
1414 auto stub = BuildStub(channel);
1415 response_generator.SetNextResolution(GetServersPorts());
1416 // Channel should not become READY, because health checks should be failing.
1418 "*** initial state: unknown health check service name for "
1420 EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1421 // Now set one of the servers to be healthy.
1422 // The channel should become healthy and all requests should go to
1423 // the healthy server.
1424 gpr_log(GPR_INFO, "*** server 0 healthy");
1425 servers_[0]->SetServingStatus("health_check_service_name", true);
1426 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1427 for (int i = 0; i < 10; ++i) {
1428 CheckRpcSendOk(stub, DEBUG_LOCATION);
1430 EXPECT_EQ(10, servers_[0]->service_.request_count());
1431 EXPECT_EQ(0, servers_[1]->service_.request_count());
1432 EXPECT_EQ(0, servers_[2]->service_.request_count());
1433 // Now set a second server to be healthy.
1434 gpr_log(GPR_INFO, "*** server 2 healthy");
1435 servers_[2]->SetServingStatus("health_check_service_name", true);
1436 WaitForServer(stub, 2, DEBUG_LOCATION);
1437 for (int i = 0; i < 10; ++i) {
1438 CheckRpcSendOk(stub, DEBUG_LOCATION);
1440 EXPECT_EQ(5, servers_[0]->service_.request_count());
1441 EXPECT_EQ(0, servers_[1]->service_.request_count());
1442 EXPECT_EQ(5, servers_[2]->service_.request_count());
1443 // Now set the remaining server to be healthy.
1444 gpr_log(GPR_INFO, "*** server 1 healthy");
1445 servers_[1]->SetServingStatus("health_check_service_name", true);
1446 WaitForServer(stub, 1, DEBUG_LOCATION);
1447 for (int i = 0; i < 9; ++i) {
1448 CheckRpcSendOk(stub, DEBUG_LOCATION);
1450 EXPECT_EQ(3, servers_[0]->service_.request_count());
1451 EXPECT_EQ(3, servers_[1]->service_.request_count());
1452 EXPECT_EQ(3, servers_[2]->service_.request_count());
1453 // Now set one server to be unhealthy again. Then wait until the
1454 // unhealthiness has hit the client. We know that the client will see
1455 // this when we send kNumServers requests and one of the remaining servers
1456 // sees two of the requests.
1457 gpr_log(GPR_INFO, "*** server 0 unhealthy");
1458 servers_[0]->SetServingStatus("health_check_service_name", false);
1461 for (int i = 0; i < kNumServers; ++i) {
1462 CheckRpcSendOk(stub, DEBUG_LOCATION);
1464 } while (servers_[1]->service_.request_count() != 2 &&
1465 servers_[2]->service_.request_count() != 2);
1466 // Now set the remaining two servers to be unhealthy. Make sure the
1467 // channel leaves READY state and that RPCs fail.
1468 gpr_log(GPR_INFO, "*** all servers unhealthy");
1469 servers_[1]->SetServingStatus("health_check_service_name", false);
1470 servers_[2]->SetServingStatus("health_check_service_name", false);
1471 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1472 CheckRpcSendFailure(stub);
1474 EnableDefaultHealthCheckService(false);
1477 TEST_F(ClientLbEnd2endTest,
1478 RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
1479 EnableDefaultHealthCheckService(true);
1481 const int kNumServers = 3;
1482 StartServers(kNumServers);
1483 servers_[0]->SetServingStatus("health_check_service_name", true);
1484 servers_[1]->SetServingStatus("health_check_service_name", true);
1485 servers_[2]->SetServingStatus("health_check_service_name", true);
1486 ChannelArguments args;
1487 args.SetServiceConfigJSON(
1488 "{\"healthCheckConfig\": "
1489 "{\"serviceName\": \"health_check_service_name\"}}");
1490 auto response_generator = BuildResolverResponseGenerator();
1491 auto channel = BuildChannel("round_robin", response_generator, args);
1492 auto stub = BuildStub(channel);
1493 response_generator.SetNextResolution(GetServersPorts());
1494 WaitForServer(stub, 0, DEBUG_LOCATION);
1495 // Stop server 0 and send a new resolver result to ensure that RR
1496 // checks each subchannel's state.
1497 servers_[0]->Shutdown();
1498 response_generator.SetNextResolution(GetServersPorts());
1499 // Send a bunch more RPCs.
1500 for (size_t i = 0; i < 100; i++) {
1505 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
1506 EnableDefaultHealthCheckService(true);
1508 const int kNumServers = 1;
1509 StartServers(kNumServers);
1510 // Create a channel with health-checking enabled.
1511 ChannelArguments args;
1512 args.SetServiceConfigJSON(
1513 "{\"healthCheckConfig\": "
1514 "{\"serviceName\": \"health_check_service_name\"}}");
1515 auto response_generator1 = BuildResolverResponseGenerator();
1516 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1517 auto stub1 = BuildStub(channel1);
1518 std::vector<int> ports = GetServersPorts();
1519 response_generator1.SetNextResolution(ports);
1520 // Create a channel with health checking enabled but inhibited.
1521 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1522 auto response_generator2 = BuildResolverResponseGenerator();
1523 auto channel2 = BuildChannel("round_robin", response_generator2, args);
1524 auto stub2 = BuildStub(channel2);
1525 response_generator2.SetNextResolution(ports);
1526 // First channel should not become READY, because health checks should be
1528 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1529 CheckRpcSendFailure(stub1);
1530 // Second channel should be READY.
1531 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1532 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1533 // Enable health checks on the backend and wait for channel 1 to succeed.
1534 servers_[0]->SetServingStatus("health_check_service_name", true);
1535 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1536 // Check that we created only one subchannel to the backend.
1537 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1539 EnableDefaultHealthCheckService(false);
1542 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
1543 EnableDefaultHealthCheckService(true);
1545 const int kNumServers = 1;
1546 StartServers(kNumServers);
1547 // Create a channel with health-checking enabled.
1548 ChannelArguments args;
1549 args.SetServiceConfigJSON(
1550 "{\"healthCheckConfig\": "
1551 "{\"serviceName\": \"health_check_service_name\"}}");
1552 auto response_generator1 = BuildResolverResponseGenerator();
1553 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1554 auto stub1 = BuildStub(channel1);
1555 std::vector<int> ports = GetServersPorts();
1556 response_generator1.SetNextResolution(ports);
1557 // Create a channel with health-checking enabled with a different
1559 ChannelArguments args2;
1560 args2.SetServiceConfigJSON(
1561 "{\"healthCheckConfig\": "
1562 "{\"serviceName\": \"health_check_service_name2\"}}");
1563 auto response_generator2 = BuildResolverResponseGenerator();
1564 auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1565 auto stub2 = BuildStub(channel2);
1566 response_generator2.SetNextResolution(ports);
1567 // Allow health checks from channel 2 to succeed.
1568 servers_[0]->SetServingStatus("health_check_service_name2", true);
1569 // First channel should not become READY, because health checks should be
1571 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1572 CheckRpcSendFailure(stub1);
1573 // Second channel should be READY.
1574 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1575 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1576 // Enable health checks for channel 1 and wait for it to succeed.
1577 servers_[0]->SetServingStatus("health_check_service_name", true);
1578 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1579 // Check that we created only one subchannel to the backend.
1580 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1582 EnableDefaultHealthCheckService(false);
1585 TEST_F(ClientLbEnd2endTest,
1586 RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) {
1587 EnableDefaultHealthCheckService(true);
1589 const int kNumServers = 1;
1590 StartServers(kNumServers);
1591 // Create a channel with health-checking enabled.
1592 const char* kServiceConfigJson =
1593 "{\"healthCheckConfig\": "
1594 "{\"serviceName\": \"health_check_service_name\"}}";
1595 auto response_generator = BuildResolverResponseGenerator();
1596 auto channel = BuildChannel("round_robin", response_generator);
1597 auto stub = BuildStub(channel);
1598 std::vector<int> ports = GetServersPorts();
1599 response_generator.SetNextResolution(ports, kServiceConfigJson);
1600 servers_[0]->SetServingStatus("health_check_service_name", true);
1601 EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
1602 // Send an update on the channel to change it to use a health checking
1603 // service name that is not being reported as healthy.
1604 const char* kServiceConfigJson2 =
1605 "{\"healthCheckConfig\": "
1606 "{\"serviceName\": \"health_check_service_name2\"}}";
1607 response_generator.SetNextResolution(ports, kServiceConfigJson2);
1608 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1610 EnableDefaultHealthCheckService(false);
1613 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
1615 const int kNumServers = 1;
1616 StartServers(kNumServers);
1617 // Set max idle time and build the channel.
1618 ChannelArguments args;
1619 args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
1620 auto response_generator = BuildResolverResponseGenerator();
1621 auto channel = BuildChannel("", response_generator, args);
1622 auto stub = BuildStub(channel);
1623 // The initial channel state should be IDLE.
1624 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1625 // After sending RPC, channel state should be READY.
1626 response_generator.SetNextResolution(GetServersPorts());
1627 CheckRpcSendOk(stub, DEBUG_LOCATION);
1628 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1629 // After a period time not using the channel, the channel state should switch
1631 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
1632 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1633 // Sending a new RPC should awake the IDLE channel.
1634 response_generator.SetNextResolution(GetServersPorts());
1635 CheckRpcSendOk(stub, DEBUG_LOCATION);
1636 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1639 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
1641 void SetUp() override {
1642 ClientLbEnd2endTest::SetUp();
1643 current_test_instance_ = this;
1646 static void SetUpTestCase() {
1648 grpc_core::RegisterTestPickArgsLoadBalancingPolicy(SavePickArgs);
1651 static void TearDownTestCase() { grpc_shutdown_blocking(); }
1653 const std::vector<grpc_core::PickArgsSeen>& args_seen_list() {
1654 grpc::internal::MutexLock lock(&mu_);
1655 return args_seen_list_;
1659 static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
1660 ClientLbPickArgsTest* self = current_test_instance_;
1661 grpc::internal::MutexLock lock(&self->mu_);
1662 self->args_seen_list_.emplace_back(args_seen);
1665 static ClientLbPickArgsTest* current_test_instance_;
1666 grpc::internal::Mutex mu_;
1667 std::vector<grpc_core::PickArgsSeen> args_seen_list_;
1670 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
1672 TEST_F(ClientLbPickArgsTest, Basic) {
1673 const int kNumServers = 1;
1674 StartServers(kNumServers);
1675 auto response_generator = BuildResolverResponseGenerator();
1676 auto channel = BuildChannel("test_pick_args_lb", response_generator);
1677 auto stub = BuildStub(channel);
1678 response_generator.SetNextResolution(GetServersPorts());
1679 CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true);
1680 // Check LB policy name for the channel.
1681 EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
1682 // There will be two entries, one for the pick tried in state
1683 // CONNECTING and another for the pick tried in state READY.
1684 EXPECT_THAT(args_seen_list(),
1685 ::testing::ElementsAre(
1687 ::testing::Field(&grpc_core::PickArgsSeen::path,
1688 "/grpc.testing.EchoTestService/Echo"),
1689 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1690 ::testing::UnorderedElementsAre(
1691 ::testing::Pair("foo", "1"),
1692 ::testing::Pair("bar", "2"),
1693 ::testing::Pair("baz", "3")))),
1695 ::testing::Field(&grpc_core::PickArgsSeen::path,
1696 "/grpc.testing.EchoTestService/Echo"),
1697 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1698 ::testing::UnorderedElementsAre(
1699 ::testing::Pair("foo", "1"),
1700 ::testing::Pair("bar", "2"),
1701 ::testing::Pair("baz", "3"))))));
1704 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
1706 void SetUp() override {
1707 ClientLbEnd2endTest::SetUp();
1708 current_test_instance_ = this;
1711 static void SetUpTestCase() {
1713 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
1714 ReportTrailerIntercepted);
1717 static void TearDownTestCase() { grpc_shutdown_blocking(); }
1719 int trailers_intercepted() {
1720 grpc::internal::MutexLock lock(&mu_);
1721 return trailers_intercepted_;
1724 const grpc_core::MetadataVector& trailing_metadata() {
1725 grpc::internal::MutexLock lock(&mu_);
1726 return trailing_metadata_;
1729 const udpa::data::orca::v1::OrcaLoadReport* backend_load_report() {
1730 grpc::internal::MutexLock lock(&mu_);
1731 return load_report_.get();
1735 static void ReportTrailerIntercepted(
1736 const grpc_core::TrailingMetadataArgsSeen& args_seen) {
1737 const auto* backend_metric_data = args_seen.backend_metric_data;
1738 ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
1739 grpc::internal::MutexLock lock(&self->mu_);
1740 self->trailers_intercepted_++;
1741 self->trailing_metadata_ = args_seen.metadata;
1742 if (backend_metric_data != nullptr) {
1743 self->load_report_.reset(new udpa::data::orca::v1::OrcaLoadReport);
1744 self->load_report_->set_cpu_utilization(
1745 backend_metric_data->cpu_utilization);
1746 self->load_report_->set_mem_utilization(
1747 backend_metric_data->mem_utilization);
1748 self->load_report_->set_rps(backend_metric_data->requests_per_second);
1749 for (const auto& p : backend_metric_data->request_cost) {
1750 std::string name = std::string(p.first);
1751 (*self->load_report_->mutable_request_cost())[std::move(name)] =
1754 for (const auto& p : backend_metric_data->utilization) {
1755 std::string name = std::string(p.first);
1756 (*self->load_report_->mutable_utilization())[std::move(name)] =
1762 static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
1763 grpc::internal::Mutex mu_;
1764 int trailers_intercepted_ = 0;
1765 grpc_core::MetadataVector trailing_metadata_;
1766 std::unique_ptr<udpa::data::orca::v1::OrcaLoadReport> load_report_;
1769 ClientLbInterceptTrailingMetadataTest*
1770 ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
1772 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
1773 const int kNumServers = 1;
1774 const int kNumRpcs = 10;
1775 StartServers(kNumServers);
1776 auto response_generator = BuildResolverResponseGenerator();
1778 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1779 auto stub = BuildStub(channel);
1780 response_generator.SetNextResolution(GetServersPorts());
1781 for (size_t i = 0; i < kNumRpcs; ++i) {
1782 CheckRpcSendOk(stub, DEBUG_LOCATION);
1784 // Check LB policy name for the channel.
1785 EXPECT_EQ("intercept_trailing_metadata_lb",
1786 channel->GetLoadBalancingPolicyName());
1787 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1788 EXPECT_THAT(trailing_metadata(),
1789 ::testing::UnorderedElementsAre(
1790 // TODO(roth): Should grpc-status be visible here?
1791 ::testing::Pair("grpc-status", "0"),
1792 ::testing::Pair("user-agent", ::testing::_),
1793 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1794 ::testing::Pair("baz", "3")));
1795 EXPECT_EQ(nullptr, backend_load_report());
1798 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
1799 const int kNumServers = 1;
1800 const int kNumRpcs = 10;
1801 StartServers(kNumServers);
1802 ChannelArguments args;
1803 args.SetServiceConfigJSON(
1805 " \"methodConfig\": [ {\n"
1807 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
1809 " \"retryPolicy\": {\n"
1810 " \"maxAttempts\": 3,\n"
1811 " \"initialBackoff\": \"1s\",\n"
1812 " \"maxBackoff\": \"120s\",\n"
1813 " \"backoffMultiplier\": 1.6,\n"
1814 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
1818 auto response_generator = BuildResolverResponseGenerator();
1820 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
1821 auto stub = BuildStub(channel);
1822 response_generator.SetNextResolution(GetServersPorts());
1823 for (size_t i = 0; i < kNumRpcs; ++i) {
1824 CheckRpcSendOk(stub, DEBUG_LOCATION);
1826 // Check LB policy name for the channel.
1827 EXPECT_EQ("intercept_trailing_metadata_lb",
1828 channel->GetLoadBalancingPolicyName());
1829 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1830 EXPECT_THAT(trailing_metadata(),
1831 ::testing::UnorderedElementsAre(
1832 // TODO(roth): Should grpc-status be visible here?
1833 ::testing::Pair("grpc-status", "0"),
1834 ::testing::Pair("user-agent", ::testing::_),
1835 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1836 ::testing::Pair("baz", "3")));
1837 EXPECT_EQ(nullptr, backend_load_report());
1840 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
1841 const int kNumServers = 1;
1842 const int kNumRpcs = 10;
1843 StartServers(kNumServers);
1844 udpa::data::orca::v1::OrcaLoadReport load_report;
1845 load_report.set_cpu_utilization(0.5);
1846 load_report.set_mem_utilization(0.75);
1847 load_report.set_rps(25);
1848 auto* request_cost = load_report.mutable_request_cost();
1849 (*request_cost)["foo"] = 0.8;
1850 (*request_cost)["bar"] = 1.4;
1851 auto* utilization = load_report.mutable_utilization();
1852 (*utilization)["baz"] = 1.1;
1853 (*utilization)["quux"] = 0.9;
1854 for (const auto& server : servers_) {
1855 server->service_.set_load_report(&load_report);
1857 auto response_generator = BuildResolverResponseGenerator();
1859 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1860 auto stub = BuildStub(channel);
1861 response_generator.SetNextResolution(GetServersPorts());
1862 for (size_t i = 0; i < kNumRpcs; ++i) {
1863 CheckRpcSendOk(stub, DEBUG_LOCATION);
1864 auto* actual = backend_load_report();
1865 ASSERT_NE(actual, nullptr);
1866 // TODO(roth): Change this to use EqualsProto() once that becomes
1867 // available in OSS.
1868 EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
1869 EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
1870 EXPECT_EQ(actual->rps(), load_report.rps());
1871 EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
1872 for (const auto& p : actual->request_cost()) {
1873 auto it = load_report.request_cost().find(p.first);
1874 ASSERT_NE(it, load_report.request_cost().end());
1875 EXPECT_EQ(it->second, p.second);
1877 EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
1878 for (const auto& p : actual->utilization()) {
1879 auto it = load_report.utilization().find(p.first);
1880 ASSERT_NE(it, load_report.utilization().end());
1881 EXPECT_EQ(it->second, p.second);
1884 // Check LB policy name for the channel.
1885 EXPECT_EQ("intercept_trailing_metadata_lb",
1886 channel->GetLoadBalancingPolicyName());
1887 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1891 } // namespace testing
1894 int main(int argc, char** argv) {
1895 ::testing::InitGoogleTest(&argc, argv);
1896 grpc::testing::TestEnvironment env(argc, argv);
1897 const auto result = RUN_ALL_TESTS();