3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34 #include <grpcpp/create_channel.h>
35 #include <grpcpp/health_check_service_interface.h>
36 #include <grpcpp/impl/codegen/sync.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
40 #include "src/core/ext/filters/client_channel/backup_poller.h"
41 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
42 #include "src/core/ext/filters/client_channel/parse_address.h"
43 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/ext/filters/client_channel/service_config.h"
46 #include "src/core/lib/backoff/backoff.h"
47 #include "src/core/lib/channel/channel_args.h"
48 #include "src/core/lib/gpr/env.h"
49 #include "src/core/lib/gprpp/debug_location.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/iomgr/tcp_client.h"
52 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
53 #include "src/cpp/client/secure_credentials.h"
54 #include "src/cpp/server/secure_server_credentials.h"
56 #include "src/proto/grpc/testing/echo.grpc.pb.h"
57 #include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h"
58 #include "test/core/util/port.h"
59 #include "test/core/util/test_config.h"
60 #include "test/core/util/test_lb_policies.h"
61 #include "test/cpp/end2end/test_service_impl.h"
63 #include <gmock/gmock.h>
64 #include <gtest/gtest.h>
66 using grpc::testing::EchoRequest;
67 using grpc::testing::EchoResponse;
68 using std::chrono::system_clock;
70 // defined in tcp_client.cc
// The active TCP client vtable. Tests read it to remember the real
// implementation and replace it via grpc_set_tcp_client_impl().
71 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
// Implementation that tcp_client_connect_with_delay() forwards to after its
// injected sleep. A test must assign it before installing delayed_connect.
73 static grpc_tcp_client_vtable* default_client_impl;
// Artificial per-connect delay in milliseconds. tcp_client_connect_with_delay()
// reads it with acquire semantics; tests write it with release semantics.
79 gpr_atm g_connection_delay_ms;
// Replacement TCP connect hook used by delayed_connect. It sleeps for
// g_connection_delay_ms and then forwards the attempt to default_client_impl.
// The deadline passed on is extended by the same delay, presumably so the
// injected sleep does not eat into the caller's connect budget.
// NOTE(review): if default_client_impl ever points at delayed_connect itself,
// this function calls itself without bound.
81 void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
82 grpc_pollset_set* interested_parties,
83 const grpc_channel_args* channel_args,
84 const grpc_resolved_address* addr,
85 grpc_millis deadline) {
// NOTE(review): gpr_atm_acq_load returns gpr_atm; storing it in an int narrows.
86 const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
// Blocks the calling thread for the whole injected delay.
88 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
90 default_client_impl->connect(closure, ep, interested_parties, channel_args,
91 addr, deadline + delay_ms);
94 grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay};
96 // Subclass of TestServiceImpl that increments a request counter for
97 // every call to the Echo RPC.
98 class MyTestServiceImpl : public TestServiceImpl {
// Echo() records the caller's peer via AddClient(). If a load report has been
// installed with set_load_report(), it is attached serialized under the
// "x-endpoint-load-metrics-bin" trailing-metadata key. The call is then
// delegated to TestServiceImpl::Echo().
100 Status Echo(ServerContext* context, const EchoRequest* request,
101 EchoResponse* response) override {
// Local copy of load_report_, read while holding mu_.
102 const udpa::data::orca::v1::OrcaLoadReport* load_report = nullptr;
104 grpc::internal::MutexLock lock(&mu_);
106 load_report = load_report_;
108 AddClient(context->peer());
109 if (load_report != nullptr) {
110 // TODO(roth): Once we provide a more standard server-side API for
111 // populating this data, use that API here.
112 context->AddTrailingMetadata("x-endpoint-load-metrics-bin",
113 load_report->SerializeAsString());
115 return TestServiceImpl::Echo(context, request, response);
// Returns request_count_, read under mu_.
118 int request_count() {
119 grpc::internal::MutexLock lock(&mu_);
120 return request_count_;
// Resets the state guarded by mu_. The reset statements themselves are not
// visible in this listing (presumably request_count_ = 0).
123 void ResetCounters() {
124 grpc::internal::MutexLock lock(&mu_);
// Returns the peers recorded by AddClient(), locking clients_mu_. The return
// statement is not visible here; presumably it returns a copy of clients_.
128 std::set<grpc::string> clients() {
129 grpc::internal::MutexLock lock(&clients_mu_);
// Installs the report that Echo() attaches to its responses. Only the pointer
// is stored.
// NOTE(review): the caller must keep *load_report alive while Echo() may run.
// The parameter could also be const, to match load_report_.
133 void set_load_report(udpa::data::orca::v1::OrcaLoadReport* load_report) {
134 grpc::internal::MutexLock lock(&mu_);
135 load_report_ = load_report;
// Records a peer address, under clients_mu_.
139 void AddClient(const grpc::string& client) {
140 grpc::internal::MutexLock lock(&clients_mu_);
141 clients_.insert(client);
// Per the accessors in this class, mu_ guards request_count_ and load_report_,
// and clients_mu_ guards clients_.
144 grpc::internal::Mutex mu_;
145 int request_count_ = 0;
146 const udpa::data::orca::v1::OrcaLoadReport* load_report_ = nullptr;
147 grpc::internal::Mutex clients_mu_;
148 std::set<grpc::string> clients_;
// Owns a reference to a FakeResolverResponseGenerator and feeds it resolver
// results that point at 127.0.0.1 ports.
151 class FakeResolverResponseGeneratorWrapper {
// Creates a fresh generator.
153 FakeResolverResponseGeneratorWrapper()
154 : response_generator_(grpc_core::MakeRefCounted<
155 grpc_core::FakeResolverResponseGenerator>()) {}
// Move constructor: takes over other's generator reference.
// NOTE(review): the member could be initialized in the mem-initializer list
// instead of being default-constructed and then assigned.
157 FakeResolverResponseGeneratorWrapper(
158 FakeResolverResponseGeneratorWrapper&& other) {
159 response_generator_ = std::move(other.response_generator_);
// Sets the next resolution result: one address per port, plus an optional
// service config. An ExecCtx is created for the duration of the call.
162 void SetNextResolution(const std::vector<int>& ports,
163 const char* service_config_json = nullptr) {
164 grpc_core::ExecCtx exec_ctx;
165 response_generator_->SetResponse(
166 BuildFakeResults(ports, service_config_json));
// Sets the result handed out when the channel asks for re-resolution.
169 void SetNextResolutionUponError(const std::vector<int>& ports) {
170 grpc_core::ExecCtx exec_ctx;
171 response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
// Makes the next re-resolution request fail.
174 void SetFailureOnReresolution() {
175 grpc_core::ExecCtx exec_ctx;
176 response_generator_->SetFailureOnReresolution();
// Non-owning access to the generator; this wrapper keeps the reference.
179 grpc_core::FakeResolverResponseGenerator* Get() const {
180 return response_generator_.get();
// Builds a Resolver::Result with one ipv4:127.0.0.1:<port> address per entry
// of ports, in order. When service_config_json is non-null, a parsed service
// config is attached. GPR_ASSERTs that every URI parses and that the service
// config is created.
184 static grpc_core::Resolver::Result BuildFakeResults(
185 const std::vector<int>& ports,
186 const char* service_config_json = nullptr) {
187 grpc_core::Resolver::Result result;
188 for (const int& port : ports) {
190 gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
191 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
192 GPR_ASSERT(lb_uri != nullptr);
193 grpc_resolved_address address;
194 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
195 result.addresses.emplace_back(address.addr, address.len,
197 grpc_uri_destroy(lb_uri);
198 gpr_free(lb_uri_str);
200 if (service_config_json != nullptr) {
201 result.service_config = grpc_core::ServiceConfig::Create(
202 service_config_json, &result.service_config_error);
203 GPR_ASSERT(result.service_config != nullptr);
// The owned generator reference.
208 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
// Test fixture: starts local backends (ServerData) and builds channels that
// resolve through a fake resolver.
212 class ClientLbEnd2endTest : public ::testing::Test {
// Backends listen on "localhost". Every channel built by this fixture uses
// fake transport-security credentials.
214 ClientLbEnd2endTest()
215 : server_host_("localhost"),
216 kRequestMessage_("Live long and prosper."),
217 creds_(new SecureChannelCredentials(
218 grpc_fake_transport_security_credentials_create())) {}
// One-time, process-wide configuration applied before the suite's first test.
220 static void SetUpTestCase() {
221 // Make the backup poller poll very frequently in order to pick up
222 // updates from all the subchannels' FDs.
223 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
225 // Workaround Apple CFStream bug: disable CFStream via the env var.
226 gpr_setenv("grpc_cfstream", "0");
230 void SetUp() override { grpc_init(); }
// Stops every backend created by the test, then shuts gRPC down.
232 void TearDown() override {
233 for (size_t i = 0; i < servers_.size(); ++i) {
234 servers_[i]->Shutdown();
236 // Explicitly destroy all the members so that we can make sure grpc_shutdown
237 // has finished by the end of this function, and thus all the registered
238 // LB policy factories are removed.
241 grpc_shutdown_blocking();
// Appends num_servers ServerData entries to servers_ without starting them.
// If ports has exactly num_servers entries, server i uses ports[i]. Otherwise
// each server gets the default port value declared on a line not visible here
// (presumably 0, which makes ServerData pick an unused port).
244 void CreateServers(size_t num_servers,
245 std::vector<int> ports = std::vector<int>()) {
247 for (size_t i = 0; i < num_servers; ++i) {
249 if (ports.size() == num_servers) port = ports[i];
250 servers_.emplace_back(new ServerData(port));
254 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
// Creates num_servers backends via CreateServers() (same ports semantics) and
// then starts backends by index.
256 void StartServers(size_t num_servers,
257 std::vector<int> ports = std::vector<int>()) {
258 CreateServers(num_servers, std::move(ports));
259 for (size_t i = 0; i < num_servers; ++i) {
// Collects port_ of servers_[start_index], servers_[start_index + 1], ... in
// order.
264 std::vector<int> GetServersPorts(size_t start_index = 0) {
265 std::vector<int> ports;
266 for (size_t i = start_index; i < servers_.size(); ++i) {
267 ports.push_back(servers_[i]->port_);
272 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
273 return FakeResolverResponseGeneratorWrapper();
276 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
277 const std::shared_ptr<Channel>& channel) {
278 return grpc::testing::EchoTestService::NewStub(channel);
281 std::shared_ptr<Channel> BuildChannel(
282 const grpc::string& lb_policy_name,
283 const FakeResolverResponseGeneratorWrapper& response_generator,
284 ChannelArguments args = ChannelArguments()) {
285 if (lb_policy_name.size() > 0) {
286 args.SetLoadBalancingPolicyName(lb_policy_name);
287 } // else, default to pick first
288 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
289 response_generator.Get());
290 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
// (Function head not visible in this listing.) Sends one Echo RPC carrying
// kRequestMessage_ with a deadline timeout_ms from now. When `response` is
// null, a temporary EchoResponse is allocated and freed here. When `result`
// is non-null, the call's Status is copied into it.
294 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
295 EchoResponse* response = nullptr, int timeout_ms = 1000,
296 Status* result = nullptr, bool wait_for_ready = false) {
// NOTE(review): a stack-allocated EchoResponse (or std::unique_ptr) would
// avoid this manual new/delete pair.
297 const bool local_response = (response == nullptr);
298 if (local_response) response = new EchoResponse;
300 request.set_message(kRequestMessage_);
301 ClientContext context;
302 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
303 if (wait_for_ready) context.set_wait_for_ready(true);
304 Status status = stub->Echo(&context, request, response);
305 if (result != nullptr) *result = status;
306 if (local_response) delete response;
// (Function head not visible in this listing.) Sends an RPC with a 2000 ms
// deadline and asserts that it succeeded and echoed kRequestMessage_.
// Failures are reported against the caller-supplied DebugLocation.
// Note that ASSERT_* here only returns from this helper, not from the test.
311 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
312 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
313 EchoResponse response;
316 SendRpc(stub, &response, 2000, &status, wait_for_ready);
317 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
319 << "Error: " << status.error_message() << " "
320 << status.error_details();
321 ASSERT_EQ(response.message(), kRequestMessage_)
322 << "From " << location.file() << ":" << location.line();
// NOTE(review): ASSERT_TRUE(success) above already returns from this helper
// when success is false, so this abort() is unreachable.
323 if (!success) abort();
326 void CheckRpcSendFailure(
327 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
328 const bool success = SendRpc(stub);
329 EXPECT_FALSE(success);
// (ServerData header not visible in this listing.) State for one backend.
334 std::unique_ptr<Server> server_;
335 MyTestServiceImpl service_;
// Thread that runs Serve().
336 std::unique_ptr<std::thread> thread_;
// Set by Serve() under the startup mutex; Start() waits on it.
337 bool server_ready_ = false;
338 bool started_ = false;
// A non-positive port means "pick an unused port".
340 explicit ServerData(int port = 0) {
341 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
// Runs Serve() on a new thread and blocks until it reports readiness through
// cond. mu and cond are locals of this frame. This is presumably safe because
// Start() cannot return before Serve() has set server_ready_ while holding mu.
344 void Start(const grpc::string& server_host) {
345 gpr_log(GPR_INFO, "starting server on port %d", port_);
347 grpc::internal::Mutex mu;
348 grpc::internal::MutexLock lock(&mu);
349 grpc::internal::CondVar cond;
350 thread_.reset(new std::thread(
351 std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
352 cond.WaitUntil(&mu, [this] { return server_ready_; });
// Reset so that a later Start() waits again.
353 server_ready_ = false;
354 gpr_log(GPR_INFO, "server startup complete");
// Builds and starts a server on server_host:port_ with fake transport-security
// credentials, then marks it ready under *mu.
357 void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
358 grpc::internal::CondVar* cond) {
359 std::ostringstream server_address;
360 server_address << server_host << ":" << port_;
361 ServerBuilder builder;
362 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
363 grpc_fake_transport_security_server_credentials_create()));
364 builder.AddListeningPort(server_address.str(), std::move(creds));
365 builder.RegisterService(&service_);
366 server_ = builder.BuildAndStart();
367 grpc::internal::MutexLock lock(mu);
368 server_ready_ = true;
// (Shutdown() head not visible.) No-op unless started_; otherwise shuts the
// server down with an immediate (0 ms) deadline.
373 if (!started_) return;
374 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
379 void SetServingStatus(const grpc::string& service, bool serving) {
380 server_->GetHealthCheckService()->SetServingStatus(service, serving);
384 void ResetCounters() {
385 for (const auto& server : servers_) server->service_.ResetCounters();
// (Function head not visible.) Keeps sending RPCs until servers_[server_idx]
// has recorded at least one request. When ignore_failure is false, each RPC
// goes through CheckRpcSendOk() with wait_for_ready set.
// NOTE(review): there is no overall deadline; if that backend never receives
// traffic, this loops forever.
389 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
390 size_t server_idx, const grpc_core::DebugLocation& location,
391 bool ignore_failure = false) {
393 if (ignore_failure) {
396 CheckRpcSendOk(stub, location, true);
398 } while (servers_[server_idx]->service_.request_count() == 0);
// Repeatedly samples the channel's state until `predicate` holds. Returns
// false if no state change arrives before a single deadline computed once,
// timeout_seconds from entry. try_to_connect is passed to every GetState()
// call.
402 bool WaitForChannelState(
403 Channel* channel, std::function<bool(grpc_connectivity_state)> predicate,
404 bool try_to_connect = false, int timeout_seconds = 5) {
405 const gpr_timespec deadline =
406 grpc_timeout_seconds_to_deadline(timeout_seconds);
408 grpc_connectivity_state state = channel->GetState(try_to_connect);
409 if (predicate(state)) break;
410 if (!channel->WaitForStateChange(state, deadline)) return false;
// Waits, without triggering connection attempts, until the channel leaves
// READY.
415 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
416 auto predicate = [](grpc_connectivity_state state) {
417 return state != GRPC_CHANNEL_READY;
419 return WaitForChannelState(channel, predicate, false, timeout_seconds);
// Waits until the channel is READY. Passes try_to_connect=true so that an
// idle channel is asked to connect.
422 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
423 auto predicate = [](grpc_connectivity_state state) {
424 return state == GRPC_CHANNEL_READY;
426 return WaitForChannelState(channel, predicate, true, timeout_seconds);
// Returns false as soon as some backend has a zero request count.
429 bool SeenAllServers() {
430 for (const auto& server : servers_) {
431 if (server->service_.request_count() == 0) return false;
436 // Updates \a connection_order by appending to it the index of the newly
437 // connected server. Must be called after every single RPC.
// A server counts as "newly connected" when its request count is exactly 1
// and its index is not yet in connection_order.
// NOTE(review): push_back(i) narrows the size_t index into an int.
438 void UpdateConnectionOrder(
439 const std::vector<std::unique_ptr<ServerData>>& servers,
440 std::vector<int>* connection_order) {
441 for (size_t i = 0; i < servers.size(); ++i) {
442 if (servers[i]->service_.request_count() == 1) {
443 // Was the server index known? If not, update connection_order.
445 std::find(connection_order->begin(), connection_order->end(), i);
446 if (it == connection_order->end()) {
447 connection_order->push_back(i);
// Host every backend listens on (set to "localhost" by the constructor).
454 const grpc::string server_host_;
// Backends created by CreateServers()/StartServers(), indexed by creation order.
455 std::vector<std::unique_ptr<ServerData>> servers_;
// Payload sent by SendRpc() and expected back by CheckRpcSendOk().
456 const grpc::string kRequestMessage_;
// Channel credentials shared by every channel created through BuildChannel().
457 std::shared_ptr<ChannelCredentials> creds_;
// Checks that a channel asked to connect reports CONNECTING while it still
// has no resolver result, and reaches READY once a result is provided.
460 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
461 const int kNumServers = 3;
462 StartServers(kNumServers);
463 auto response_generator = BuildResolverResponseGenerator();
464 auto channel = BuildChannel("", response_generator);
465 auto stub = BuildStub(channel);
466 // Initial state should be IDLE.
467 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
468 // Tell the channel to try to connect.
469 // Note that this call also returns IDLE, since the state change has
470 // not yet occurred; it just gets triggered by this call.
471 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
472 // Now that the channel is trying to connect, we should be in state
// (i.e. CONNECTING, even though no resolver result has been returned yet.)
474 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
475 GRPC_CHANNEL_CONNECTING);
476 // Return a resolver result, which allows the connection attempt to proceed.
477 response_generator.SetNextResolution(GetServersPorts());
478 // We should eventually transition into state READY.
479 EXPECT_TRUE(WaitForChannelReady(channel.get()));
// Checks that a channel with no LB policy name uses pick_first, so every RPC
// lands on a single backend.
482 TEST_F(ClientLbEnd2endTest, PickFirst) {
483 // Start servers and send one RPC per server.
484 const int kNumServers = 3;
485 StartServers(kNumServers);
486 auto response_generator = BuildResolverResponseGenerator();
487 auto channel = BuildChannel(
488 "", response_generator); // test that pick first is the default.
489 auto stub = BuildStub(channel);
490 response_generator.SetNextResolution(GetServersPorts());
491 for (size_t i = 0; i < servers_.size(); ++i) {
492 CheckRpcSendOk(stub, DEBUG_LOCATION);
494 // All requests should have gone to a single server.
496 for (size_t i = 0; i < servers_.size(); ++i) {
497 const int request_count = servers_[i]->service_.request_count();
498 if (request_count == kNumServers) {
501 EXPECT_EQ(0, request_count);
505 // Check LB policy name for the channel.
506 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Checks that a second channel to an already-connected target can complete an
// RPC right away.
509 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
510 StartServers(1); // Single server
511 auto response_generator = BuildResolverResponseGenerator();
512 auto channel = BuildChannel(
513 "", response_generator); // test that pick first is the default.
514 auto stub = BuildStub(channel);
515 response_generator.SetNextResolution({servers_[0]->port_});
516 WaitForServer(stub, 0, DEBUG_LOCATION);
517 // Create a new channel and its corresponding PF LB policy, which will pick
518 // the subchannels in READY state from the previous RPC against the same
519 // target (even if it happened over a different channel, because subchannels
520 // are globally reused). Progress should happen without any transition from
522 auto second_response_generator = BuildResolverResponseGenerator();
523 auto second_channel = BuildChannel("", second_response_generator);
524 auto second_stub = BuildStub(second_channel);
525 second_response_generator.SetNextResolution({servers_[0]->port_});
526 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
// The 5 s initial reconnect backoff, combined with the 1 s READY timeout at
// the end, means channel2 can only pass if it becomes READY without sitting
// through a backoff period.
529 TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
530 ChannelArguments args;
531 constexpr int kInitialBackOffMs = 5000;
532 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
533 // Create 2 servers, but start only the second one.
534 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
535 grpc_pick_unused_port_or_die()};
536 CreateServers(2, ports);
538 auto response_generator1 = BuildResolverResponseGenerator();
539 auto channel1 = BuildChannel("pick_first", response_generator1, args);
540 auto stub1 = BuildStub(channel1);
541 response_generator1.SetNextResolution(ports);
542 // Wait for second server to be ready.
543 WaitForServer(stub1, 1, DEBUG_LOCATION);
544 // Create a second channel with the same addresses. Its PF instance
545 // should immediately pick the second subchannel, since it's already
547 auto response_generator2 = BuildResolverResponseGenerator();
548 auto channel2 = BuildChannel("pick_first", response_generator2, args);
549 response_generator2.SetNextResolution(ports);
550 // Check that the channel reports READY without waiting for the
552 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
// Checks that, with no backend available at first, the channel does not
// connect within twice the initial backoff. Once a backend appears, the time
// to connect is at least the initial backoff.
555 TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
556 ChannelArguments args;
557 constexpr int kInitialBackOffMs = 100;
558 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
559 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
560 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
561 auto response_generator = BuildResolverResponseGenerator();
562 auto channel = BuildChannel("pick_first", response_generator, args);
563 auto stub = BuildStub(channel);
564 response_generator.SetNextResolution(ports);
565 // The channel won't become connected (there's no server).
566 ASSERT_FALSE(channel->WaitForConnected(
567 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
568 // Bring up a server on the chosen port.
569 StartServers(1, ports);
571 ASSERT_TRUE(channel->WaitForConnected(
572 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
573 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
574 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
575 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
576 // We should have waited at least kInitialBackOffMs. We subtract one to
577 // account for test and precision accuracy drift.
578 EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
579 // But not much more.
582 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
586 TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
587 ChannelArguments args;
588 constexpr int kMinReconnectBackOffMs = 1000;
589 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
590 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
591 auto response_generator = BuildResolverResponseGenerator();
592 auto channel = BuildChannel("pick_first", response_generator, args);
593 auto stub = BuildStub(channel);
594 response_generator.SetNextResolution(ports);
595 // Make connection delay a 10% longer than it's willing to in order to make
596 // sure we are hitting the codepath that waits for the min reconnect backoff.
597 gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
598 default_client_impl = grpc_tcp_client_impl;
599 grpc_set_tcp_client_impl(&delayed_connect);
600 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
601 channel->WaitForConnected(
602 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
603 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
604 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
605 gpr_log(GPR_DEBUG, "Waited %" PRId64 " ms", waited_ms);
606 // We should have waited at least kMinReconnectBackOffMs. We substract one to
607 // account for test and precision accuracy drift.
608 EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
609 gpr_atm_rel_store(&g_connection_delay_ms, 0);
// Checks that ChannelResetConnectionBackoff() lets a throttled channel connect
// in less than the 1 s initial backoff once a backend is available.
612 TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
613 ChannelArguments args;
614 constexpr int kInitialBackOffMs = 1000;
615 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
616 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
617 auto response_generator = BuildResolverResponseGenerator();
618 auto channel = BuildChannel("pick_first", response_generator, args);
619 auto stub = BuildStub(channel);
620 response_generator.SetNextResolution(ports);
621 // The channel won't become connected (there's no server).
623 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
624 // Bring up a server on the chosen port.
625 StartServers(1, ports);
626 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
627 // Wait for connect, but not long enough. This proves that we're
628 // being throttled by initial backoff.
630 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
631 // Reset connection backoff.
632 experimental::ChannelResetConnectionBackoff(channel.get());
633 // Wait for connect. Should happen ~immediately.
635 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
636 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
637 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
638 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
639 // We should have waited less than kInitialBackOffMs.
640 EXPECT_LT(waited_ms, kInitialBackOffMs);
// Checks that after a backoff reset, the retry following the next failed
// attempt is scheduled with the initial backoff rather than a multiplied one.
// This is measured as connecting within kInitialBackOffMs + 100 ms of the reset.
643 TEST_F(ClientLbEnd2endTest,
644 PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
645 ChannelArguments args;
646 constexpr int kInitialBackOffMs = 1000;
647 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
648 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
649 auto response_generator = BuildResolverResponseGenerator();
650 auto channel = BuildChannel("pick_first", response_generator, args);
651 auto stub = BuildStub(channel);
652 response_generator.SetNextResolution(ports);
653 // Wait for connect, which should fail ~immediately, because the server
655 gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
657 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
658 // Reset connection backoff.
659 // Note that the time at which the third attempt will be started is
660 // actually computed at this point, so we record the start time here.
661 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
662 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
663 experimental::ChannelResetConnectionBackoff(channel.get());
664 // Trigger a second connection attempt. This should also fail
665 // ~immediately, but the retry should be scheduled for
666 // kInitialBackOffMs instead of applying the multiplier.
667 gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
669 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
670 // Bring up a server on the chosen port.
671 gpr_log(GPR_INFO, "=== STARTING BACKEND");
672 StartServers(1, ports);
673 // Wait for connect. Should happen within kInitialBackOffMs.
674 // Give an extra 100ms to account for the time spent in the second and
675 // third connection attempts themselves (since what we really want to
676 // measure is the time between the two). As long as this is less than
677 // the 1.6x increase we would see if the backoff state was not reset
678 // properly, the test is still proving that the backoff was reset.
679 constexpr int kWaitMs = kInitialBackOffMs + 100;
680 gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
681 EXPECT_TRUE(channel->WaitForConnected(
682 grpc_timeout_milliseconds_to_deadline(kWaitMs)));
683 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
684 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
685 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
686 EXPECT_LT(waited_ms, kWaitMs);
// Drives pick_first through a series of resolver updates: [0], then an empty
// list, then a list adding servers_[1], then one adding servers_[2]. After
// each non-empty update, the newly introduced backend must receive the
// traffic.
689 TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
690 // Start servers and send one RPC per server.
691 const int kNumServers = 3;
692 StartServers(kNumServers);
693 auto response_generator = BuildResolverResponseGenerator();
694 auto channel = BuildChannel("pick_first", response_generator);
695 auto stub = BuildStub(channel);
697 std::vector<int> ports;
699 // Perform one RPC against the first server.
700 ports.emplace_back(servers_[0]->port_);
701 response_generator.SetNextResolution(ports);
702 gpr_log(GPR_INFO, "****** SET [0] *******");
703 CheckRpcSendOk(stub, DEBUG_LOCATION);
704 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
706 // An empty update will result in the channel going into TRANSIENT_FAILURE.
708 response_generator.SetNextResolution(ports);
709 gpr_log(GPR_INFO, "****** SET none *******");
// NOTE(review): the GetState() polling below has no deadline of its own.
710 grpc_connectivity_state channel_state;
712 channel_state = channel->GetState(true /* try to connect */);
713 } while (channel_state == GRPC_CHANNEL_READY);
714 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
715 servers_[0]->service_.ResetCounters();
717 // Next update introduces servers_[1], making the channel recover.
719 ports.emplace_back(servers_[1]->port_);
720 response_generator.SetNextResolution(ports);
721 gpr_log(GPR_INFO, "****** SET [1] *******");
722 WaitForServer(stub, 1, DEBUG_LOCATION);
723 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
725 // And again for servers_[2]
727 ports.emplace_back(servers_[2]->port_);
728 response_generator.SetNextResolution(ports);
729 gpr_log(GPR_INFO, "****** SET [2] *******");
730 WaitForServer(stub, 2, DEBUG_LOCATION);
731 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
732 EXPECT_EQ(servers_[1]->service_.request_count(), 0);
734 // Check LB policy name for the channel.
735 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Checks that when an update is a superset of the current list, pick_first
// keeps using its already-connected backend (servers_[0]) instead of
// switching to the new first entry (servers_[1]).
738 TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
739 // Start servers; only servers_[0] is ever expected to receive RPCs.
740 const int kNumServers = 3;
741 StartServers(kNumServers);
742 auto response_generator = BuildResolverResponseGenerator();
743 auto channel = BuildChannel("pick_first", response_generator);
744 auto stub = BuildStub(channel);
746 std::vector<int> ports;
748 // Perform one RPC against the first server.
749 ports.emplace_back(servers_[0]->port_);
750 response_generator.SetNextResolution(ports);
751 gpr_log(GPR_INFO, "****** SET [0] *******");
752 CheckRpcSendOk(stub, DEBUG_LOCATION);
753 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
754 servers_[0]->service_.ResetCounters();
756 // Send a superset update.
758 ports.emplace_back(servers_[1]->port_);
759 ports.emplace_back(servers_[0]->port_);
760 response_generator.SetNextResolution(ports);
761 gpr_log(GPR_INFO, "****** SET superset *******");
762 CheckRpcSendOk(stub, DEBUG_LOCATION);
763 // We stick to the previously connected server.
764 WaitForServer(stub, 0, DEBUG_LOCATION);
765 EXPECT_EQ(0, servers_[1]->service_.request_count());
767 // Check LB policy name for the channel.
768 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Checks that two channels using the default (global) subchannel pool reach
// the backend from a single client address, i.e. they share a subchannel.
771 TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
773 const int kNumServers = 1;
774 StartServers(kNumServers);
775 std::vector<int> ports = GetServersPorts();
776 // Create two channels that (by default) use the global subchannel pool.
777 auto response_generator1 = BuildResolverResponseGenerator();
778 auto channel1 = BuildChannel("pick_first", response_generator1);
779 auto stub1 = BuildStub(channel1);
780 response_generator1.SetNextResolution(ports);
781 auto response_generator2 = BuildResolverResponseGenerator();
782 auto channel2 = BuildChannel("pick_first", response_generator2);
783 auto stub2 = BuildStub(channel2);
784 response_generator2.SetNextResolution(ports);
785 WaitForServer(stub1, 0, DEBUG_LOCATION);
786 // Send one RPC on each channel.
787 CheckRpcSendOk(stub1, DEBUG_LOCATION);
788 CheckRpcSendOk(stub2, DEBUG_LOCATION);
789 // The server receives two requests.
790 EXPECT_EQ(2, servers_[0]->service_.request_count());
791 // The two requests are from the same client port, because the two channels
792 // share subchannels via the global subchannel pool.
793 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Counterpart of PickFirstGlobalSubchannelPool: with
// GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL set, the two channels must reach the
// backend from two distinct client addresses.
796 TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
798 const int kNumServers = 1;
799 StartServers(kNumServers);
800 std::vector<int> ports = GetServersPorts();
801 // Create two channels that use local subchannel pool.
802 ChannelArguments args;
803 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
804 auto response_generator1 = BuildResolverResponseGenerator();
805 auto channel1 = BuildChannel("pick_first", response_generator1, args);
806 auto stub1 = BuildStub(channel1);
807 response_generator1.SetNextResolution(ports);
808 auto response_generator2 = BuildResolverResponseGenerator();
809 auto channel2 = BuildChannel("pick_first", response_generator2, args);
810 auto stub2 = BuildStub(channel2);
811 response_generator2.SetNextResolution(ports);
812 WaitForServer(stub1, 0, DEBUG_LOCATION);
813 // Send one RPC on each channel.
814 CheckRpcSendOk(stub1, DEBUG_LOCATION);
815 CheckRpcSendOk(stub2, DEBUG_LOCATION);
816 // The server receives two requests.
817 EXPECT_EQ(2, servers_[0]->service_.request_count());
818 // The two requests are from two client ports, because the two channels didn't
819 // share subchannels with each other.
820 EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
// Stress test: 1000 resolver updates with the backend list shuffled each time,
// plus an RPC after every 10th update.
// NOTE(review): `i` (size_t) is compared with kNumUpdates (int). A fresh
// std::mt19937 seeded from std::random_device is also built on every
// iteration; one engine hoisted out of the loop would suffice.
823 TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
824 const int kNumUpdates = 1000;
825 const int kNumServers = 3;
826 StartServers(kNumServers);
827 auto response_generator = BuildResolverResponseGenerator();
828 auto channel = BuildChannel("pick_first", response_generator);
829 auto stub = BuildStub(channel);
830 std::vector<int> ports = GetServersPorts();
831 for (size_t i = 0; i < kNumUpdates; ++i) {
832 std::shuffle(ports.begin(), ports.end(),
833 std::mt19937(std::random_device()()));
834 response_generator.SetNextResolution(ports);
835 // We should re-enter core at the end of the loop to give the resolution
836 // setting closure a chance to run.
837 if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
839 // Check LB policy name for the channel.
840 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
// Checks that pick_first recovers through re-resolution when the initial
// result contains only unreachable addresses.
843 TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
844 // Prepare the ports for up servers and down servers.
845 const int kNumServers = 3;
846 const int kNumAliveServers = 1;
847 StartServers(kNumAliveServers);
848 std::vector<int> alive_ports, dead_ports;
849 for (size_t i = 0; i < kNumServers; ++i) {
850 if (i < kNumAliveServers) {
851 alive_ports.emplace_back(servers_[i]->port_);
853 dead_ports.emplace_back(grpc_pick_unused_port_or_die());
856 auto response_generator = BuildResolverResponseGenerator();
857 auto channel = BuildChannel("pick_first", response_generator);
858 auto stub = BuildStub(channel);
859 // The initial resolution only contains dead ports. There won't be any
860 // selected subchannel. Re-resolution will return the same result.
861 response_generator.SetNextResolution(dead_ports);
862 gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
863 for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
864 // Set a re-resolution result that contains reachable ports, so that the
865 // pick_first LB policy can recover soon.
866 response_generator.SetNextResolutionUponError(alive_ports);
867 gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
868 WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
869 CheckRpcSendOk(stub, DEBUG_LOCATION);
870 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
871 // Check LB policy name for the channel.
872 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
875 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
876 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
877 StartServers(1, ports);
878 auto response_generator = BuildResolverResponseGenerator();
879 auto channel = BuildChannel("pick_first", response_generator);
880 auto stub = BuildStub(channel);
881 response_generator.SetNextResolution(ports);
882 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
883 WaitForServer(stub, 0, DEBUG_LOCATION);
884 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
885 servers_[0]->Shutdown();
886 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
887 gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
888 StartServers(1, ports);
889 WaitForServer(stub, 0, DEBUG_LOCATION);
892 TEST_F(ClientLbEnd2endTest,
893 PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
894 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
895 grpc_pick_unused_port_or_die()};
896 CreateServers(2, ports);
898 auto response_generator = BuildResolverResponseGenerator();
899 auto channel = BuildChannel("pick_first", response_generator);
900 auto stub = BuildStub(channel);
901 response_generator.SetNextResolution(ports);
902 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
903 WaitForServer(stub, 1, DEBUG_LOCATION);
904 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
905 servers_[1]->Shutdown();
906 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
907 gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
908 StartServers(2, ports);
909 WaitForServer(stub, 0, DEBUG_LOCATION);
912 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
913 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
914 StartServers(1, ports);
915 auto response_generator = BuildResolverResponseGenerator();
916 auto channel_1 = BuildChannel("pick_first", response_generator);
917 auto stub_1 = BuildStub(channel_1);
918 response_generator.SetNextResolution(ports);
919 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
920 WaitForServer(stub_1, 0, DEBUG_LOCATION);
921 gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
922 servers_[0]->Shutdown();
923 // Channel 1 will receive a re-resolution containing the same server. It will
924 // create a new subchannel and hold a ref to it.
925 StartServers(1, ports);
926 gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
927 auto response_generator_2 = BuildResolverResponseGenerator();
928 auto channel_2 = BuildChannel("pick_first", response_generator_2);
929 auto stub_2 = BuildStub(channel_2);
930 response_generator_2.SetNextResolution(ports);
931 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
932 WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
933 gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
934 servers_[0]->Shutdown();
935 // Wait until the disconnection has triggered the connectivity notification.
936 // Otherwise, the subchannel may be picked for next call but will fail soon.
937 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
938 // Channel 2 will also receive a re-resolution containing the same server.
939 // Both channels will ref the same subchannel that failed.
940 StartServers(1, ports);
941 gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
942 gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
943 // The first call after the server restart will succeed.
944 CheckRpcSendOk(stub_2, DEBUG_LOCATION);
945 gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
946 // Check LB policy name for the channel.
947 EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
948 // Check LB policy name for the channel.
949 EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
952 TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
953 // Start server, send RPC, and make sure channel is READY.
954 const int kNumServers = 1;
955 StartServers(kNumServers);
956 auto response_generator = BuildResolverResponseGenerator();
958 BuildChannel("", response_generator); // pick_first is the default.
959 auto stub = BuildStub(channel);
960 response_generator.SetNextResolution(GetServersPorts());
961 CheckRpcSendOk(stub, DEBUG_LOCATION);
962 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
963 // Stop server. Channel should go into state IDLE.
964 response_generator.SetFailureOnReresolution();
965 servers_[0]->Shutdown();
966 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
967 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
971 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
972 auto response_generator = BuildResolverResponseGenerator();
974 BuildChannel("", response_generator); // pick_first is the default.
975 auto stub = BuildStub(channel);
976 // Create a number of servers, but only start 1 of them.
979 // Initially resolve to first server and make sure it connects.
980 gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
981 response_generator.SetNextResolution({servers_[0]->port_});
982 CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
983 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
984 // Send a resolution update with the remaining servers, none of which are
985 // running yet, so the update will stay pending. Note that it's important
986 // to have multiple servers here, or else the test will be flaky; with only
987 // one server, the pending subchannel list has already gone into
988 // TRANSIENT_FAILURE due to hitting the end of the list by the time we
991 "Phase 2: Resolver update pointing to remaining "
992 "(not started) servers.");
993 response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
994 // RPCs will continue to be sent to the first server.
995 CheckRpcSendOk(stub, DEBUG_LOCATION);
996 // Now stop the first server, so that the current subchannel list
997 // fails. This should cause us to immediately swap over to the
998 // pending list, even though it's not yet connected. The state should
999 // be set to CONNECTING, since that's what the pending subchannel list
1000 // was doing when we swapped over.
1001 gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
1002 servers_[0]->Shutdown();
1003 WaitForChannelNotReady(channel.get());
1004 // TODO(roth): This should always return CONNECTING, but it's flaky
1005 // between that and TRANSIENT_FAILURE. I suspect that this problem
1006 // will go away once we move the backoff code out of the subchannel
1007 // and into the LB policies.
1008 EXPECT_THAT(channel->GetState(false),
1009 ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
1010 GRPC_CHANNEL_TRANSIENT_FAILURE));
1011 // Now start the second server.
1012 gpr_log(GPR_INFO, "Phase 4: Starting second server.");
1014 // The channel should go to READY state and RPCs should go to the
1016 WaitForChannelReady(channel.get());
1017 WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
1020 TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
1021 // Start server, send RPC, and make sure channel is READY.
1022 const int kNumServers = 1;
1023 StartServers(kNumServers);
1024 auto response_generator = BuildResolverResponseGenerator();
1026 BuildChannel("", response_generator); // pick_first is the default.
1027 auto stub = BuildStub(channel);
1028 response_generator.SetNextResolution(GetServersPorts());
1029 CheckRpcSendOk(stub, DEBUG_LOCATION);
1030 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1031 // Stop server. Channel should go into state IDLE.
1032 servers_[0]->Shutdown();
1033 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1034 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1035 // Now send resolver update that includes no addresses. Channel
1036 // should stay in state IDLE.
1037 response_generator.SetNextResolution({});
1038 EXPECT_FALSE(channel->WaitForStateChange(
1039 GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
1040 // Now bring the backend back up and send a non-empty resolver update,
1041 // and then try to send an RPC. Channel should go back into state READY.
1043 response_generator.SetNextResolution(GetServersPorts());
1044 CheckRpcSendOk(stub, DEBUG_LOCATION);
1045 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1048 TEST_F(ClientLbEnd2endTest, RoundRobin) {
1049 // Start servers and send one RPC per server.
1050 const int kNumServers = 3;
1051 StartServers(kNumServers);
1052 auto response_generator = BuildResolverResponseGenerator();
1053 auto channel = BuildChannel("round_robin", response_generator);
1054 auto stub = BuildStub(channel);
1055 response_generator.SetNextResolution(GetServersPorts());
1056 // Wait until all backends are ready.
1058 CheckRpcSendOk(stub, DEBUG_LOCATION);
1059 } while (!SeenAllServers());
1061 // "Sync" to the end of the list. Next sequence of picks will start at the
1062 // first server (index 0).
1063 WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
1064 std::vector<int> connection_order;
1065 for (size_t i = 0; i < servers_.size(); ++i) {
1066 CheckRpcSendOk(stub, DEBUG_LOCATION);
1067 UpdateConnectionOrder(servers_, &connection_order);
1069 // Backends should be iterated over in the order in which the addresses were
1071 const auto expected = std::vector<int>{0, 1, 2};
1072 EXPECT_EQ(expected, connection_order);
1073 // Check LB policy name for the channel.
1074 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1077 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
1078 StartServers(1); // Single server
1079 auto response_generator = BuildResolverResponseGenerator();
1080 auto channel = BuildChannel("round_robin", response_generator);
1081 auto stub = BuildStub(channel);
1082 response_generator.SetNextResolution({servers_[0]->port_});
1083 WaitForServer(stub, 0, DEBUG_LOCATION);
1084 // Create a new channel and its corresponding RR LB policy, which will pick
1085 // the subchannels in READY state from the previous RPC against the same
1086 // target (even if it happened over a different channel, because subchannels
1087 // are globally reused). Progress should happen without any transition from
1088 // this READY state.
1089 auto second_response_generator = BuildResolverResponseGenerator();
1090 auto second_channel = BuildChannel("round_robin", second_response_generator);
1091 auto second_stub = BuildStub(second_channel);
1092 second_response_generator.SetNextResolution({servers_[0]->port_});
1093 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
1096 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
1097 // Start servers and send one RPC per server.
1098 const int kNumServers = 3;
1099 StartServers(kNumServers);
1100 auto response_generator = BuildResolverResponseGenerator();
1101 auto channel = BuildChannel("round_robin", response_generator);
1102 auto stub = BuildStub(channel);
1103 std::vector<int> ports;
1104 // Start with a single server.
1105 gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1106 ports.emplace_back(servers_[0]->port_);
1107 response_generator.SetNextResolution(ports);
1108 WaitForServer(stub, 0, DEBUG_LOCATION);
1109 // Send RPCs. They should all go servers_[0]
1110 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1111 EXPECT_EQ(10, servers_[0]->service_.request_count());
1112 EXPECT_EQ(0, servers_[1]->service_.request_count());
1113 EXPECT_EQ(0, servers_[2]->service_.request_count());
1114 servers_[0]->service_.ResetCounters();
1115 // And now for the second server.
1116 gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1118 ports.emplace_back(servers_[1]->port_);
1119 response_generator.SetNextResolution(ports);
1120 // Wait until update has been processed, as signaled by the second backend
1121 // receiving a request.
1122 EXPECT_EQ(0, servers_[1]->service_.request_count());
1123 WaitForServer(stub, 1, DEBUG_LOCATION);
1124 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1125 EXPECT_EQ(0, servers_[0]->service_.request_count());
1126 EXPECT_EQ(10, servers_[1]->service_.request_count());
1127 EXPECT_EQ(0, servers_[2]->service_.request_count());
1128 servers_[1]->service_.ResetCounters();
1129 // ... and for the last server.
1130 gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1132 ports.emplace_back(servers_[2]->port_);
1133 response_generator.SetNextResolution(ports);
1134 WaitForServer(stub, 2, DEBUG_LOCATION);
1135 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1136 EXPECT_EQ(0, servers_[0]->service_.request_count());
1137 EXPECT_EQ(0, servers_[1]->service_.request_count());
1138 EXPECT_EQ(10, servers_[2]->service_.request_count());
1139 servers_[2]->service_.ResetCounters();
1140 // Back to all servers.
1141 gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1143 ports.emplace_back(servers_[0]->port_);
1144 ports.emplace_back(servers_[1]->port_);
1145 ports.emplace_back(servers_[2]->port_);
1146 response_generator.SetNextResolution(ports);
1147 WaitForServer(stub, 0, DEBUG_LOCATION);
1148 WaitForServer(stub, 1, DEBUG_LOCATION);
1149 WaitForServer(stub, 2, DEBUG_LOCATION);
1150 // Send three RPCs, one per server.
1151 for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1152 EXPECT_EQ(1, servers_[0]->service_.request_count());
1153 EXPECT_EQ(1, servers_[1]->service_.request_count());
1154 EXPECT_EQ(1, servers_[2]->service_.request_count());
1155 // An empty update will result in the channel going into TRANSIENT_FAILURE.
1156 gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1158 response_generator.SetNextResolution(ports);
1159 grpc_connectivity_state channel_state;
1161 channel_state = channel->GetState(true /* try to connect */);
1162 } while (channel_state == GRPC_CHANNEL_READY);
1163 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
1164 servers_[0]->service_.ResetCounters();
1165 // Next update introduces servers_[1], making the channel recover.
1166 gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1168 ports.emplace_back(servers_[1]->port_);
1169 response_generator.SetNextResolution(ports);
1170 WaitForServer(stub, 1, DEBUG_LOCATION);
1171 channel_state = channel->GetState(false /* try to connect */);
1172 ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
1173 // Check LB policy name for the channel.
1174 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1177 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
1178 const int kNumServers = 3;
1179 StartServers(kNumServers);
1180 auto response_generator = BuildResolverResponseGenerator();
1181 auto channel = BuildChannel("round_robin", response_generator);
1182 auto stub = BuildStub(channel);
1183 std::vector<int> ports;
1184 // Start with a single server.
1185 ports.emplace_back(servers_[0]->port_);
1186 response_generator.SetNextResolution(ports);
1187 WaitForServer(stub, 0, DEBUG_LOCATION);
1188 // Send RPCs. They should all go to servers_[0]
1189 for (size_t i = 0; i < 10; ++i) SendRpc(stub);
1190 EXPECT_EQ(10, servers_[0]->service_.request_count());
1191 EXPECT_EQ(0, servers_[1]->service_.request_count());
1192 EXPECT_EQ(0, servers_[2]->service_.request_count());
1193 servers_[0]->service_.ResetCounters();
1194 // Shutdown one of the servers to be sent in the update.
1195 servers_[1]->Shutdown();
1196 ports.emplace_back(servers_[1]->port_);
1197 ports.emplace_back(servers_[2]->port_);
1198 response_generator.SetNextResolution(ports);
1199 WaitForServer(stub, 0, DEBUG_LOCATION);
1200 WaitForServer(stub, 2, DEBUG_LOCATION);
1201 // Send three RPCs, one per server.
1202 for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
1203 // The server in shutdown shouldn't receive any.
1204 EXPECT_EQ(0, servers_[1]->service_.request_count());
1207 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
1208 // Start servers and send one RPC per server.
1209 const int kNumServers = 3;
1210 StartServers(kNumServers);
1211 auto response_generator = BuildResolverResponseGenerator();
1212 auto channel = BuildChannel("round_robin", response_generator);
1213 auto stub = BuildStub(channel);
1214 std::vector<int> ports = GetServersPorts();
1215 for (size_t i = 0; i < 1000; ++i) {
1216 std::shuffle(ports.begin(), ports.end(),
1217 std::mt19937(std::random_device()()));
1218 response_generator.SetNextResolution(ports);
1219 if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
1221 // Check LB policy name for the channel.
1222 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1225 TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
1226 // TODO(dgq): replicate the way internal testing exercises the concurrent
1227 // update provisions of RR.
1230 TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
1231 // Start servers and send one RPC per server.
1232 const int kNumServers = 3;
1233 std::vector<int> first_ports;
1234 std::vector<int> second_ports;
1235 first_ports.reserve(kNumServers);
1236 for (int i = 0; i < kNumServers; ++i) {
1237 first_ports.push_back(grpc_pick_unused_port_or_die());
1239 second_ports.reserve(kNumServers);
1240 for (int i = 0; i < kNumServers; ++i) {
1241 second_ports.push_back(grpc_pick_unused_port_or_die());
1243 StartServers(kNumServers, first_ports);
1244 auto response_generator = BuildResolverResponseGenerator();
1245 auto channel = BuildChannel("round_robin", response_generator);
1246 auto stub = BuildStub(channel);
1247 response_generator.SetNextResolution(first_ports);
1248 // Send a number of RPCs, which succeed.
1249 for (size_t i = 0; i < 100; ++i) {
1250 CheckRpcSendOk(stub, DEBUG_LOCATION);
1253 gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
1254 for (size_t i = 0; i < servers_.size(); ++i) {
1255 servers_[i]->Shutdown();
1257 gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
1258 gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
1259 // Client requests should fail. Send enough to tickle all subchannels.
1260 for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
1261 gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
1262 // Bring servers back up on a different set of ports. We need to do this to be
1263 // sure that the eventual success is *not* due to subchannel reconnection
1264 // attempts and that an actual re-resolution has happened as a result of the
1265 // RR policy going into transient failure when all its subchannels become
1266 // unavailable (in transient failure as well).
1267 gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
1268 StartServers(kNumServers, second_ports);
1269 // Don't notify of the update. Wait for the LB policy's re-resolution to
1270 // "pull" the new ports.
1271 response_generator.SetNextResolutionUponError(second_ports);
1272 gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
1273 gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
1274 // Client request should eventually (but still fairly soon) succeed.
1275 const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
1276 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1277 while (gpr_time_cmp(deadline, now) > 0) {
1278 if (SendRpc(stub)) break;
1279 now = gpr_now(GPR_CLOCK_MONOTONIC);
1281 ASSERT_GT(gpr_time_cmp(deadline, now), 0);
1284 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
1285 // Start servers and create channel. Channel should go to READY state.
1286 const int kNumServers = 3;
1287 StartServers(kNumServers);
1288 auto response_generator = BuildResolverResponseGenerator();
1289 auto channel = BuildChannel("round_robin", response_generator);
1290 auto stub = BuildStub(channel);
1291 response_generator.SetNextResolution(GetServersPorts());
1292 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1293 // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
1294 // TODO(roth): This test should ideally check that even when the
1295 // subchannels are in state CONNECTING for an extended period of time,
1296 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1297 // currently have a good way to get a subchannel to report CONNECTING
1298 // for a long period of time, since the servers in this test framework
1299 // are on the loopback interface, which will immediately return a
1300 // "Connection refused" error, so the subchannels will only be in
1301 // CONNECTING state very briefly. When we have time, see if we can
1302 // find a way to fix this.
1303 for (size_t i = 0; i < servers_.size(); ++i) {
1304 servers_[i]->Shutdown();
1306 auto predicate = [](grpc_connectivity_state state) {
1307 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1309 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
1312 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
1313 // Create channel and return servers that don't exist. Channel should
1314 // quickly transition into TRANSIENT_FAILURE.
1315 // TODO(roth): This test should ideally check that even when the
1316 // subchannels are in state CONNECTING for an extended period of time,
1317 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1318 // currently have a good way to get a subchannel to report CONNECTING
1319 // for a long period of time, since the servers in this test framework
1320 // are on the loopback interface, which will immediately return a
1321 // "Connection refused" error, so the subchannels will only be in
1322 // CONNECTING state very briefly. When we have time, see if we can
1323 // find a way to fix this.
1324 auto response_generator = BuildResolverResponseGenerator();
1325 auto channel = BuildChannel("round_robin", response_generator);
1326 auto stub = BuildStub(channel);
1327 response_generator.SetNextResolution({
1328 grpc_pick_unused_port_or_die(),
1329 grpc_pick_unused_port_or_die(),
1330 grpc_pick_unused_port_or_die(),
1332 for (size_t i = 0; i < servers_.size(); ++i) {
1333 servers_[i]->Shutdown();
1335 auto predicate = [](grpc_connectivity_state state) {
1336 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1338 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
1341 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
1342 const int kNumServers = 3;
1343 StartServers(kNumServers);
1344 const auto ports = GetServersPorts();
1345 auto response_generator = BuildResolverResponseGenerator();
1346 auto channel = BuildChannel("round_robin", response_generator);
1347 auto stub = BuildStub(channel);
1348 response_generator.SetNextResolution(ports);
1349 for (size_t i = 0; i < kNumServers; ++i) {
1350 WaitForServer(stub, i, DEBUG_LOCATION);
1352 for (size_t i = 0; i < servers_.size(); ++i) {
1353 CheckRpcSendOk(stub, DEBUG_LOCATION);
1354 EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1356 // One request should have gone to each server.
1357 for (size_t i = 0; i < servers_.size(); ++i) {
1358 EXPECT_EQ(1, servers_[i]->service_.request_count());
1360 const auto pre_death = servers_[0]->service_.request_count();
1361 // Kill the first server.
1362 servers_[0]->Shutdown();
1363 // Client request still succeed. May need retrying if RR had returned a pick
1364 // before noticing the change in the server's connectivity.
1365 while (!SendRpc(stub)) {
1366 } // Retry until success.
1367 // Send a bunch of RPCs that should succeed.
1368 for (int i = 0; i < 10 * kNumServers; ++i) {
1369 CheckRpcSendOk(stub, DEBUG_LOCATION);
1371 const auto post_death = servers_[0]->service_.request_count();
1372 // No requests have gone to the deceased server.
1373 EXPECT_EQ(pre_death, post_death);
1374 // Bring the first server back up.
1376 // Requests should start arriving at the first server either right away (if
1377 // the server managed to start before the RR policy retried the subchannel) or
1378 // after the subchannel retry delay otherwise (RR's subchannel retried before
1379 // the server was fully back up).
1380 WaitForServer(stub, 0, DEBUG_LOCATION);
1383 // If health checking is required by client but health checking service
1384 // is not running on the server, the channel should be treated as healthy.
1385 TEST_F(ClientLbEnd2endTest,
1386 RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
1387 StartServers(1); // Single server
1388 ChannelArguments args;
1389 args.SetServiceConfigJSON(
1390 "{\"healthCheckConfig\": "
1391 "{\"serviceName\": \"health_check_service_name\"}}");
1392 auto response_generator = BuildResolverResponseGenerator();
1393 auto channel = BuildChannel("round_robin", response_generator, args);
1394 auto stub = BuildStub(channel);
1395 response_generator.SetNextResolution({servers_[0]->port_});
1396 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1397 CheckRpcSendOk(stub, DEBUG_LOCATION);
1400 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
1401 EnableDefaultHealthCheckService(true);
1403 const int kNumServers = 3;
1404 StartServers(kNumServers);
1405 ChannelArguments args;
1406 args.SetServiceConfigJSON(
1407 "{\"healthCheckConfig\": "
1408 "{\"serviceName\": \"health_check_service_name\"}}");
1409 auto response_generator = BuildResolverResponseGenerator();
1410 auto channel = BuildChannel("round_robin", response_generator, args);
1411 auto stub = BuildStub(channel);
1412 response_generator.SetNextResolution(GetServersPorts());
1413 // Channel should not become READY, because health checks should be failing.
1415 "*** initial state: unknown health check service name for "
1417 EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1418 // Now set one of the servers to be healthy.
1419 // The channel should become healthy and all requests should go to
1420 // the healthy server.
1421 gpr_log(GPR_INFO, "*** server 0 healthy");
1422 servers_[0]->SetServingStatus("health_check_service_name", true);
1423 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1424 for (int i = 0; i < 10; ++i) {
1425 CheckRpcSendOk(stub, DEBUG_LOCATION);
1427 EXPECT_EQ(10, servers_[0]->service_.request_count());
1428 EXPECT_EQ(0, servers_[1]->service_.request_count());
1429 EXPECT_EQ(0, servers_[2]->service_.request_count());
1430 // Now set a second server to be healthy.
1431 gpr_log(GPR_INFO, "*** server 2 healthy");
1432 servers_[2]->SetServingStatus("health_check_service_name", true);
1433 WaitForServer(stub, 2, DEBUG_LOCATION);
1434 for (int i = 0; i < 10; ++i) {
1435 CheckRpcSendOk(stub, DEBUG_LOCATION);
1437 EXPECT_EQ(5, servers_[0]->service_.request_count());
1438 EXPECT_EQ(0, servers_[1]->service_.request_count());
1439 EXPECT_EQ(5, servers_[2]->service_.request_count());
1440 // Now set the remaining server to be healthy.
1441 gpr_log(GPR_INFO, "*** server 1 healthy");
1442 servers_[1]->SetServingStatus("health_check_service_name", true);
1443 WaitForServer(stub, 1, DEBUG_LOCATION);
1444 for (int i = 0; i < 9; ++i) {
1445 CheckRpcSendOk(stub, DEBUG_LOCATION);
1447 EXPECT_EQ(3, servers_[0]->service_.request_count());
1448 EXPECT_EQ(3, servers_[1]->service_.request_count());
1449 EXPECT_EQ(3, servers_[2]->service_.request_count());
1450 // Now set one server to be unhealthy again. Then wait until the
1451 // unhealthiness has hit the client. We know that the client will see
1452 // this when we send kNumServers requests and one of the remaining servers
1453 // sees two of the requests.
1454 gpr_log(GPR_INFO, "*** server 0 unhealthy");
1455 servers_[0]->SetServingStatus("health_check_service_name", false);
1458 for (int i = 0; i < kNumServers; ++i) {
1459 CheckRpcSendOk(stub, DEBUG_LOCATION);
1461 } while (servers_[1]->service_.request_count() != 2 &&
1462 servers_[2]->service_.request_count() != 2);
1463 // Now set the remaining two servers to be unhealthy. Make sure the
1464 // channel leaves READY state and that RPCs fail.
1465 gpr_log(GPR_INFO, "*** all servers unhealthy");
1466 servers_[1]->SetServingStatus("health_check_service_name", false);
1467 servers_[2]->SetServingStatus("health_check_service_name", false);
1468 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1469 CheckRpcSendFailure(stub);
1471 EnableDefaultHealthCheckService(false);
1474 TEST_F(ClientLbEnd2endTest,
1475 RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
1476 EnableDefaultHealthCheckService(true);
1478 const int kNumServers = 3;
1479 StartServers(kNumServers);
1480 servers_[0]->SetServingStatus("health_check_service_name", true);
1481 servers_[1]->SetServingStatus("health_check_service_name", true);
1482 servers_[2]->SetServingStatus("health_check_service_name", true);
1483 ChannelArguments args;
1484 args.SetServiceConfigJSON(
1485 "{\"healthCheckConfig\": "
1486 "{\"serviceName\": \"health_check_service_name\"}}");
1487 auto response_generator = BuildResolverResponseGenerator();
1488 auto channel = BuildChannel("round_robin", response_generator, args);
1489 auto stub = BuildStub(channel);
1490 response_generator.SetNextResolution(GetServersPorts());
1491 WaitForServer(stub, 0, DEBUG_LOCATION);
1492 // Stop server 0 and send a new resolver result to ensure that RR
1493 // checks each subchannel's state.
1494 servers_[0]->Shutdown();
1495 response_generator.SetNextResolution(GetServersPorts());
1496 // Send a bunch more RPCs.
1497 for (size_t i = 0; i < 100; i++) {
1502 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
1503 EnableDefaultHealthCheckService(true);
1505 const int kNumServers = 1;
1506 StartServers(kNumServers);
1507 // Create a channel with health-checking enabled.
1508 ChannelArguments args;
1509 args.SetServiceConfigJSON(
1510 "{\"healthCheckConfig\": "
1511 "{\"serviceName\": \"health_check_service_name\"}}");
1512 auto response_generator1 = BuildResolverResponseGenerator();
1513 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1514 auto stub1 = BuildStub(channel1);
1515 std::vector<int> ports = GetServersPorts();
1516 response_generator1.SetNextResolution(ports);
1517 // Create a channel with health checking enabled but inhibited.
1518 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1519 auto response_generator2 = BuildResolverResponseGenerator();
1520 auto channel2 = BuildChannel("round_robin", response_generator2, args);
1521 auto stub2 = BuildStub(channel2);
1522 response_generator2.SetNextResolution(ports);
1523 // First channel should not become READY, because health checks should be
1525 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1526 CheckRpcSendFailure(stub1);
1527 // Second channel should be READY.
1528 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1529 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1530 // Enable health checks on the backend and wait for channel 1 to succeed.
1531 servers_[0]->SetServingStatus("health_check_service_name", true);
1532 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1533 // Check that we created only one subchannel to the backend.
1534 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1536 EnableDefaultHealthCheckService(false);
1539 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
1540 EnableDefaultHealthCheckService(true);
1542 const int kNumServers = 1;
1543 StartServers(kNumServers);
1544 // Create a channel with health-checking enabled.
1545 ChannelArguments args;
1546 args.SetServiceConfigJSON(
1547 "{\"healthCheckConfig\": "
1548 "{\"serviceName\": \"health_check_service_name\"}}");
1549 auto response_generator1 = BuildResolverResponseGenerator();
1550 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1551 auto stub1 = BuildStub(channel1);
1552 std::vector<int> ports = GetServersPorts();
1553 response_generator1.SetNextResolution(ports);
1554 // Create a channel with health-checking enabled with a different
1556 ChannelArguments args2;
1557 args2.SetServiceConfigJSON(
1558 "{\"healthCheckConfig\": "
1559 "{\"serviceName\": \"health_check_service_name2\"}}");
1560 auto response_generator2 = BuildResolverResponseGenerator();
1561 auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1562 auto stub2 = BuildStub(channel2);
1563 response_generator2.SetNextResolution(ports);
1564 // Allow health checks from channel 2 to succeed.
1565 servers_[0]->SetServingStatus("health_check_service_name2", true);
1566 // First channel should not become READY, because health checks should be
1568 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1569 CheckRpcSendFailure(stub1);
1570 // Second channel should be READY.
1571 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1572 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1573 // Enable health checks for channel 1 and wait for it to succeed.
1574 servers_[0]->SetServingStatus("health_check_service_name", true);
1575 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1576 // Check that we created only one subchannel to the backend.
1577 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1579 EnableDefaultHealthCheckService(false);
1582 TEST_F(ClientLbEnd2endTest,
1583 RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) {
1584 EnableDefaultHealthCheckService(true);
1586 const int kNumServers = 1;
1587 StartServers(kNumServers);
1588 // Create a channel with health-checking enabled.
1589 const char* kServiceConfigJson =
1590 "{\"healthCheckConfig\": "
1591 "{\"serviceName\": \"health_check_service_name\"}}";
1592 auto response_generator = BuildResolverResponseGenerator();
1593 auto channel = BuildChannel("round_robin", response_generator);
1594 auto stub = BuildStub(channel);
1595 std::vector<int> ports = GetServersPorts();
1596 response_generator.SetNextResolution(ports, kServiceConfigJson);
1597 servers_[0]->SetServingStatus("health_check_service_name", true);
1598 EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
1599 // Send an update on the channel to change it to use a health checking
1600 // service name that is not being reported as healthy.
1601 const char* kServiceConfigJson2 =
1602 "{\"healthCheckConfig\": "
1603 "{\"serviceName\": \"health_check_service_name2\"}}";
1604 response_generator.SetNextResolution(ports, kServiceConfigJson2);
1605 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1607 EnableDefaultHealthCheckService(false);
1610 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
1612 const int kNumServers = 1;
1613 StartServers(kNumServers);
1614 // Set max idle time and build the channel.
1615 ChannelArguments args;
1616 args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
1617 auto response_generator = BuildResolverResponseGenerator();
1618 auto channel = BuildChannel("", response_generator, args);
1619 auto stub = BuildStub(channel);
1620 // The initial channel state should be IDLE.
1621 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1622 // After sending RPC, channel state should be READY.
1623 response_generator.SetNextResolution(GetServersPorts());
1624 CheckRpcSendOk(stub, DEBUG_LOCATION);
1625 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1626 // After a period time not using the channel, the channel state should switch
1628 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
1629 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1630 // Sending a new RPC should awake the IDLE channel.
1631 response_generator.SetNextResolution(GetServersPorts());
1632 CheckRpcSendOk(stub, DEBUG_LOCATION);
1633 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1636 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
1638 void SetUp() override {
1639 ClientLbEnd2endTest::SetUp();
1640 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
1641 ReportTrailerIntercepted, this);
1644 void TearDown() override { ClientLbEnd2endTest::TearDown(); }
1646 int trailers_intercepted() {
1647 grpc::internal::MutexLock lock(&mu_);
1648 return trailers_intercepted_;
1651 const udpa::data::orca::v1::OrcaLoadReport* backend_load_report() {
1652 grpc::internal::MutexLock lock(&mu_);
1653 return load_report_.get();
1657 static void ReportTrailerIntercepted(
1658 void* arg, const grpc_core::LoadBalancingPolicy::BackendMetricData*
1659 backend_metric_data) {
1660 ClientLbInterceptTrailingMetadataTest* self =
1661 static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
1662 grpc::internal::MutexLock lock(&self->mu_);
1663 self->trailers_intercepted_++;
1664 if (backend_metric_data != nullptr) {
1665 self->load_report_.reset(new udpa::data::orca::v1::OrcaLoadReport);
1666 self->load_report_->set_cpu_utilization(
1667 backend_metric_data->cpu_utilization);
1668 self->load_report_->set_mem_utilization(
1669 backend_metric_data->mem_utilization);
1670 self->load_report_->set_rps(backend_metric_data->requests_per_second);
1671 for (const auto& p : backend_metric_data->request_cost) {
1672 grpc_core::UniquePtr<char> name = p.first.dup();
1673 (*self->load_report_->mutable_request_cost())[name.get()] = p.second;
1675 for (const auto& p : backend_metric_data->utilization) {
1676 grpc_core::UniquePtr<char> name = p.first.dup();
1677 (*self->load_report_->mutable_utilization())[name.get()] = p.second;
1682 grpc::internal::Mutex mu_;
1683 int trailers_intercepted_ = 0;
1684 std::unique_ptr<udpa::data::orca::v1::OrcaLoadReport> load_report_;
1687 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
1688 const int kNumServers = 1;
1689 const int kNumRpcs = 10;
1690 StartServers(kNumServers);
1691 auto response_generator = BuildResolverResponseGenerator();
1693 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1694 auto stub = BuildStub(channel);
1695 response_generator.SetNextResolution(GetServersPorts());
1696 for (size_t i = 0; i < kNumRpcs; ++i) {
1697 CheckRpcSendOk(stub, DEBUG_LOCATION);
1699 // Check LB policy name for the channel.
1700 EXPECT_EQ("intercept_trailing_metadata_lb",
1701 channel->GetLoadBalancingPolicyName());
1702 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1703 EXPECT_EQ(nullptr, backend_load_report());
1706 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
1707 const int kNumServers = 1;
1708 const int kNumRpcs = 10;
1709 StartServers(kNumServers);
1710 ChannelArguments args;
1711 args.SetServiceConfigJSON(
1713 " \"methodConfig\": [ {\n"
1715 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
1717 " \"retryPolicy\": {\n"
1718 " \"maxAttempts\": 3,\n"
1719 " \"initialBackoff\": \"1s\",\n"
1720 " \"maxBackoff\": \"120s\",\n"
1721 " \"backoffMultiplier\": 1.6,\n"
1722 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
1726 auto response_generator = BuildResolverResponseGenerator();
1728 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
1729 auto stub = BuildStub(channel);
1730 response_generator.SetNextResolution(GetServersPorts());
1731 for (size_t i = 0; i < kNumRpcs; ++i) {
1732 CheckRpcSendOk(stub, DEBUG_LOCATION);
1734 // Check LB policy name for the channel.
1735 EXPECT_EQ("intercept_trailing_metadata_lb",
1736 channel->GetLoadBalancingPolicyName());
1737 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1738 EXPECT_EQ(nullptr, backend_load_report());
1741 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
1742 const int kNumServers = 1;
1743 const int kNumRpcs = 10;
1744 StartServers(kNumServers);
1745 udpa::data::orca::v1::OrcaLoadReport load_report;
1746 load_report.set_cpu_utilization(0.5);
1747 load_report.set_mem_utilization(0.75);
1748 load_report.set_rps(25);
1749 auto* request_cost = load_report.mutable_request_cost();
1750 (*request_cost)["foo"] = 0.8;
1751 (*request_cost)["bar"] = 1.4;
1752 auto* utilization = load_report.mutable_utilization();
1753 (*utilization)["baz"] = 1.1;
1754 (*utilization)["quux"] = 0.9;
1755 for (const auto& server : servers_) {
1756 server->service_.set_load_report(&load_report);
1758 auto response_generator = BuildResolverResponseGenerator();
1760 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1761 auto stub = BuildStub(channel);
1762 response_generator.SetNextResolution(GetServersPorts());
1763 for (size_t i = 0; i < kNumRpcs; ++i) {
1764 CheckRpcSendOk(stub, DEBUG_LOCATION);
1765 auto* actual = backend_load_report();
1766 ASSERT_NE(actual, nullptr);
1767 // TODO(roth): Change this to use EqualsProto() once that becomes
1768 // available in OSS.
1769 EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
1770 EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
1771 EXPECT_EQ(actual->rps(), load_report.rps());
1772 EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
1773 for (const auto& p : actual->request_cost()) {
1774 auto it = load_report.request_cost().find(p.first);
1775 ASSERT_NE(it, load_report.request_cost().end());
1776 EXPECT_EQ(it->second, p.second);
1778 EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
1779 for (const auto& p : actual->utilization()) {
1780 auto it = load_report.utilization().find(p.first);
1781 ASSERT_NE(it, load_report.utilization().end());
1782 EXPECT_EQ(it->second, p.second);
1785 // Check LB policy name for the channel.
1786 EXPECT_EQ("intercept_trailing_metadata_lb",
1787 channel->GetLoadBalancingPolicyName());
1788 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1792 } // namespace testing
1795 int main(int argc, char** argv) {
1796 ::testing::InitGoogleTest(&argc, argv);
1797 grpc::testing::TestEnvironment env(argc, argv);
1798 const auto result = RUN_ALL_TESTS();