3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "absl/memory/memory.h"
28 #include "absl/strings/str_cat.h"
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/time.h>
35 #include <grpcpp/channel.h>
36 #include <grpcpp/client_context.h>
37 #include <grpcpp/create_channel.h>
38 #include <grpcpp/health_check_service_interface.h>
39 #include <grpcpp/impl/codegen/sync.h>
40 #include <grpcpp/server.h>
41 #include <grpcpp/server_builder.h>
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
45 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
46 #include "src/core/ext/filters/client_channel/server_address.h"
47 #include "src/core/ext/filters/client_channel/service_config.h"
48 #include "src/core/lib/backoff/backoff.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/gpr/env.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/ref_counted_ptr.h"
53 #include "src/core/lib/iomgr/parse_address.h"
54 #include "src/core/lib/iomgr/tcp_client.h"
55 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
56 #include "src/cpp/client/secure_credentials.h"
57 #include "src/cpp/server/secure_server_credentials.h"
59 #include "src/proto/grpc/testing/echo.grpc.pb.h"
60 #include "src/proto/grpc/testing/xds/orca_load_report_for_test.pb.h"
61 #include "test/core/util/port.h"
62 #include "test/core/util/test_config.h"
63 #include "test/core/util/test_lb_policies.h"
64 #include "test/cpp/end2end/test_service_impl.h"
66 #include <gmock/gmock.h>
67 #include <gtest/gtest.h>
69 using grpc::testing::EchoRequest;
70 using grpc::testing::EchoResponse;
71 using std::chrono::system_clock;
73 // defined in tcp_client.cc
74 extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
76 static grpc_tcp_client_vtable* default_client_impl;
82 gpr_atm g_connection_delay_ms;
84 void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
85 grpc_pollset_set* interested_parties,
86 const grpc_channel_args* channel_args,
87 const grpc_resolved_address* addr,
88 grpc_millis deadline) {
89 const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
91 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
93 default_client_impl->connect(closure, ep, interested_parties, channel_args,
94 addr, deadline + delay_ms);
97 grpc_tcp_client_vtable delayed_connect = {tcp_client_connect_with_delay};  // TCP client vtable whose connect sleeps g_connection_delay_ms first; installed via grpc_set_tcp_client_impl().
99 // Subclass of TestServiceImpl that, for every call to the Echo RPC, counts the
100 // request, records the caller's peer, and attaches any configured ORCA load report.
101 class MyTestServiceImpl : public TestServiceImpl {
103 Status Echo(ServerContext* context, const EchoRequest* request,
104 EchoResponse* response) override {
105 const udpa::data::orca::v1::OrcaLoadReport* load_report = nullptr;
107 grpc::internal::MutexLock lock(&mu_);
109 load_report = load_report_;
111 AddClient(context->peer());
112 if (load_report != nullptr) {
113 // TODO(roth): Once we provide a more standard server-side API for
114 // populating this data, use that API here.
115 context->AddTrailingMetadata("x-endpoint-load-metrics-bin",
116 load_report->SerializeAsString());
118 return TestServiceImpl::Echo(context, request, response);
121 int request_count() {
122 grpc::internal::MutexLock lock(&mu_);
123 return request_count_;
126 void ResetCounters() {
127 grpc::internal::MutexLock lock(&mu_);
131 std::set<std::string> clients() {
132 grpc::internal::MutexLock lock(&clients_mu_);
136 void set_load_report(udpa::data::orca::v1::OrcaLoadReport* load_report) {
137 grpc::internal::MutexLock lock(&mu_);
138 load_report_ = load_report;
142 void AddClient(const std::string& client) {
143 grpc::internal::MutexLock lock(&clients_mu_);
144 clients_.insert(client);
147 grpc::internal::Mutex mu_;
148 int request_count_ = 0;
149 const udpa::data::orca::v1::OrcaLoadReport* load_report_ = nullptr;
150 grpc::internal::Mutex clients_mu_;
151 std::set<std::string> clients_;
154 class FakeResolverResponseGeneratorWrapper {
156 FakeResolverResponseGeneratorWrapper()
157 : response_generator_(grpc_core::MakeRefCounted<
158 grpc_core::FakeResolverResponseGenerator>()) {}
160 FakeResolverResponseGeneratorWrapper(
161 FakeResolverResponseGeneratorWrapper&& other) noexcept {
162 response_generator_ = std::move(other.response_generator_);
165 void SetNextResolution(
166 const std::vector<int>& ports, const char* service_config_json = nullptr,
167 const char* attribute_key = nullptr,
168 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
170 grpc_core::ExecCtx exec_ctx;
171 response_generator_->SetResponse(BuildFakeResults(
172 ports, service_config_json, attribute_key, std::move(attribute)));
175 void SetNextResolutionUponError(const std::vector<int>& ports) {
176 grpc_core::ExecCtx exec_ctx;
177 response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
180 void SetFailureOnReresolution() {
181 grpc_core::ExecCtx exec_ctx;
182 response_generator_->SetFailureOnReresolution();
185 grpc_core::FakeResolverResponseGenerator* Get() const {
186 return response_generator_.get();
190 static grpc_core::Resolver::Result BuildFakeResults(
191 const std::vector<int>& ports, const char* service_config_json = nullptr,
192 const char* attribute_key = nullptr,
193 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
195 grpc_core::Resolver::Result result;
196 for (const int& port : ports) {
197 std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
198 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true);
199 GPR_ASSERT(lb_uri != nullptr);
200 grpc_resolved_address address;
201 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
202 std::map<const char*,
203 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
205 if (attribute != nullptr) {
206 attributes[attribute_key] = attribute->Copy();
208 result.addresses.emplace_back(address.addr, address.len,
209 nullptr /* args */, std::move(attributes));
210 grpc_uri_destroy(lb_uri);
212 if (service_config_json != nullptr) {
213 result.service_config = grpc_core::ServiceConfig::Create(
214 nullptr, service_config_json, &result.service_config_error);
215 GPR_ASSERT(result.service_config != nullptr);
220 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
224 class ClientLbEnd2endTest : public ::testing::Test {
226 ClientLbEnd2endTest()
227 : server_host_("localhost"),
228 kRequestMessage_("Live long and prosper."),
229 creds_(new SecureChannelCredentials(
230 grpc_fake_transport_security_credentials_create())) {}
232 static void SetUpTestCase() {
233 // Make the backup poller poll very frequently in order to pick up
234 // updates from all the subchannels' FDs.
235 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
237 // Work around an Apple CFStream bug by setting grpc_cfstream=0.
238 gpr_setenv("grpc_cfstream", "0");
242 void SetUp() override { grpc_init(); }
244 void TearDown() override {
245 for (size_t i = 0; i < servers_.size(); ++i) {
246 servers_[i]->Shutdown();
253 void CreateServers(size_t num_servers,
254 std::vector<int> ports = std::vector<int>()) {
256 for (size_t i = 0; i < num_servers; ++i) {
258 if (ports.size() == num_servers) port = ports[i];
259 servers_.emplace_back(new ServerData(port));
263 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
265 void StartServers(size_t num_servers,
266 std::vector<int> ports = std::vector<int>()) {
267 CreateServers(num_servers, std::move(ports));
268 for (size_t i = 0; i < num_servers; ++i) {
273 std::vector<int> GetServersPorts(size_t start_index = 0) {
274 std::vector<int> ports;
275 for (size_t i = start_index; i < servers_.size(); ++i) {
276 ports.push_back(servers_[i]->port_);
281 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
282 return FakeResolverResponseGeneratorWrapper();
285 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
286 const std::shared_ptr<Channel>& channel) {
287 return grpc::testing::EchoTestService::NewStub(channel);
290 std::shared_ptr<Channel> BuildChannel(
291 const std::string& lb_policy_name,
292 const FakeResolverResponseGeneratorWrapper& response_generator,
293 ChannelArguments args = ChannelArguments()) {
294 if (!lb_policy_name.empty()) {
295 args.SetLoadBalancingPolicyName(lb_policy_name);
296 } // else, default to pick first
297 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
298 response_generator.Get());
299 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
303 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
304 EchoResponse* response = nullptr, int timeout_ms = 1000,
305 Status* result = nullptr, bool wait_for_ready = false) {
306 const bool local_response = (response == nullptr);
307 if (local_response) response = new EchoResponse;
309 request.set_message(kRequestMessage_);
310 request.mutable_param()->set_echo_metadata(true);
311 ClientContext context;
312 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
313 if (wait_for_ready) context.set_wait_for_ready(true);
314 context.AddMetadata("foo", "1");
315 context.AddMetadata("bar", "2");
316 context.AddMetadata("baz", "3");
317 Status status = stub->Echo(&context, request, response);
318 if (result != nullptr) *result = status;
319 if (local_response) delete response;
324 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
325 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
326 EchoResponse response;
329 SendRpc(stub, &response, 2000, &status, wait_for_ready);
330 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
332 << "Error: " << status.error_message() << " "
333 << status.error_details();
334 ASSERT_EQ(response.message(), kRequestMessage_)
335 << "From " << location.file() << ":" << location.line();
336 if (!success) abort();
339 void CheckRpcSendFailure(
340 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
341 const bool success = SendRpc(stub);
342 EXPECT_FALSE(success);
347 std::unique_ptr<Server> server_;
348 MyTestServiceImpl service_;
349 std::unique_ptr<std::thread> thread_;
350 bool server_ready_ = false;
351 bool started_ = false;
353 explicit ServerData(int port = 0) {
354 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
357 void Start(const std::string& server_host) {
358 gpr_log(GPR_INFO, "starting server on port %d", port_);
360 grpc::internal::Mutex mu;
361 grpc::internal::MutexLock lock(&mu);
362 grpc::internal::CondVar cond;
363 thread_ = absl::make_unique<std::thread>(
364 std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
365 cond.WaitUntil(&mu, [this] { return server_ready_; });
366 server_ready_ = false;
367 gpr_log(GPR_INFO, "server startup complete");
370 void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
371 grpc::internal::CondVar* cond) {
372 std::ostringstream server_address;
373 server_address << server_host << ":" << port_;
374 ServerBuilder builder;
375 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
376 grpc_fake_transport_security_server_credentials_create()));
377 builder.AddListeningPort(server_address.str(), std::move(creds));
378 builder.RegisterService(&service_);
379 server_ = builder.BuildAndStart();
380 grpc::internal::MutexLock lock(mu);
381 server_ready_ = true;
386 if (!started_) return;
387 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
392 void SetServingStatus(const std::string& service, bool serving) {
393 server_->GetHealthCheckService()->SetServingStatus(service, serving);
397 void ResetCounters() {
398 for (const auto& server : servers_) server->service_.ResetCounters();
402 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
403 size_t server_idx, const grpc_core::DebugLocation& location,
404 bool ignore_failure = false) {
406 if (ignore_failure) {
409 CheckRpcSendOk(stub, location, true);
411 } while (servers_[server_idx]->service_.request_count() == 0);
415 bool WaitForChannelState(
417 const std::function<bool(grpc_connectivity_state)>& predicate,
418 bool try_to_connect = false, int timeout_seconds = 5) {
419 const gpr_timespec deadline =
420 grpc_timeout_seconds_to_deadline(timeout_seconds);
422 grpc_connectivity_state state = channel->GetState(try_to_connect);
423 if (predicate(state)) break;
424 if (!channel->WaitForStateChange(state, deadline)) return false;
429 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
430 auto predicate = [](grpc_connectivity_state state) {
431 return state != GRPC_CHANNEL_READY;
433 return WaitForChannelState(channel, predicate, false, timeout_seconds);
436 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
437 auto predicate = [](grpc_connectivity_state state) {
438 return state == GRPC_CHANNEL_READY;
440 return WaitForChannelState(channel, predicate, true, timeout_seconds);
443 bool SeenAllServers() {
444 for (const auto& server : servers_) {
445 if (server->service_.request_count() == 0) return false;
450 // Updates \a connection_order by appending to it the index of the newly
451 // connected server. Must be called after every single RPC.
452 void UpdateConnectionOrder(
453 const std::vector<std::unique_ptr<ServerData>>& servers,
454 std::vector<int>* connection_order) {
455 for (size_t i = 0; i < servers.size(); ++i) {
456 if (servers[i]->service_.request_count() == 1) {
457 // Was the server index known? If not, update connection_order.
459 std::find(connection_order->begin(), connection_order->end(), i);
460 if (it == connection_order->end()) {
461 connection_order->push_back(i);
468 const std::string server_host_;
469 std::vector<std::unique_ptr<ServerData>> servers_;
470 const std::string kRequestMessage_;
471 std::shared_ptr<ChannelCredentials> creds_;
474 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
475 const int kNumServers = 3;
476 StartServers(kNumServers);
477 auto response_generator = BuildResolverResponseGenerator();
478 auto channel = BuildChannel("", response_generator);
479 auto stub = BuildStub(channel);
480 // Initial state should be IDLE.
481 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
482 // Tell the channel to try to connect.
483 // Note that this call also returns IDLE, since the state change has
484 // not yet occurred; it just gets triggered by this call.
485 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
486 // Now that the channel is trying to connect, we should be in state
488 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
489 GRPC_CHANNEL_CONNECTING);
490 // Return a resolver result, which allows the connection attempt to proceed.
491 response_generator.SetNextResolution(GetServersPorts());
492 // We should eventually transition into state READY.
493 EXPECT_TRUE(WaitForChannelReady(channel.get()));
496 TEST_F(ClientLbEnd2endTest, PickFirst) {
497 // Start servers and send one RPC per server.
498 const int kNumServers = 3;
499 StartServers(kNumServers);
500 auto response_generator = BuildResolverResponseGenerator();
501 auto channel = BuildChannel(
502 "", response_generator); // test that pick first is the default.
503 auto stub = BuildStub(channel);
504 response_generator.SetNextResolution(GetServersPorts());
505 for (size_t i = 0; i < servers_.size(); ++i) {
506 CheckRpcSendOk(stub, DEBUG_LOCATION);
508 // All requests should have gone to a single server.
510 for (size_t i = 0; i < servers_.size(); ++i) {
511 const int request_count = servers_[i]->service_.request_count();
512 if (request_count == kNumServers) {
515 EXPECT_EQ(0, request_count);
519 // Check LB policy name for the channel.
520 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
523 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
524 StartServers(1); // Single server
525 auto response_generator = BuildResolverResponseGenerator();
526 auto channel = BuildChannel(
527 "", response_generator); // test that pick first is the default.
528 auto stub = BuildStub(channel);
529 response_generator.SetNextResolution({servers_[0]->port_});
530 WaitForServer(stub, 0, DEBUG_LOCATION);
531 // Create a new channel and its corresponding PF LB policy, which will pick
532 // the subchannels in READY state from the previous RPC against the same
533 // target (even if it happened over a different channel, because subchannels
534 // are globally reused). Progress should happen without any transition from
536 auto second_response_generator = BuildResolverResponseGenerator();
537 auto second_channel = BuildChannel("", second_response_generator);
538 auto second_stub = BuildStub(second_channel);
539 second_response_generator.SetNextResolution({servers_[0]->port_});
540 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
543 TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
544 ChannelArguments args;
545 constexpr int kInitialBackOffMs = 5000;
546 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
547 // Create 2 servers, but start only the second one.
548 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
549 grpc_pick_unused_port_or_die()};
550 CreateServers(2, ports);
552 auto response_generator1 = BuildResolverResponseGenerator();
553 auto channel1 = BuildChannel("pick_first", response_generator1, args);
554 auto stub1 = BuildStub(channel1);
555 response_generator1.SetNextResolution(ports);
556 // Wait for second server to be ready.
557 WaitForServer(stub1, 1, DEBUG_LOCATION);
558 // Create a second channel with the same addresses. Its PF instance
559 // should immediately pick the second subchannel, since it's already
561 auto response_generator2 = BuildResolverResponseGenerator();
562 auto channel2 = BuildChannel("pick_first", response_generator2, args);
563 response_generator2.SetNextResolution(ports);
564 // Check that the channel reports READY without waiting for the
566 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
569 TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
570 ChannelArguments args;
571 constexpr int kInitialBackOffMs = 100;
572 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
573 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
574 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
575 auto response_generator = BuildResolverResponseGenerator();
576 auto channel = BuildChannel("pick_first", response_generator, args);
577 auto stub = BuildStub(channel);
578 response_generator.SetNextResolution(ports);
579 // The channel won't become connected (there's no server).
580 ASSERT_FALSE(channel->WaitForConnected(
581 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
582 // Bring up a server on the chosen port.
583 StartServers(1, ports);
585 ASSERT_TRUE(channel->WaitForConnected(
586 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
587 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
588 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
589 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
590 // We should have waited at least kInitialBackOffMs. We subtract one to
591 // account for test and precision accuracy drift.
592 EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
593 // But not much more (bounded below by 1.10x the initial backoff).
596 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
600 TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
601 ChannelArguments args;
602 constexpr int kMinReconnectBackOffMs = 1000;
603 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
604 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
605 auto response_generator = BuildResolverResponseGenerator();
606 auto channel = BuildChannel("pick_first", response_generator, args);
607 auto stub = BuildStub(channel);
608 response_generator.SetNextResolution(ports);
609 // Make each connection attempt take 10% longer than the min reconnect backoff,
610 // so that we are sure to hit the codepath that waits for the min reconnect backoff.
611 gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
612 default_client_impl = grpc_tcp_client_impl;
613 grpc_set_tcp_client_impl(&delayed_connect);
614 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
615 channel->WaitForConnected(
616 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
617 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
618 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
619 gpr_log(GPR_DEBUG, "Waited %" PRId64 " ms", waited_ms);
620 // We should have waited at least kMinReconnectBackOffMs. We subtract one to
621 // account for test and precision accuracy drift.
622 EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
623 gpr_atm_rel_store(&g_connection_delay_ms, 0);
626 TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
627 ChannelArguments args;
628 constexpr int kInitialBackOffMs = 1000;
629 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
630 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
631 auto response_generator = BuildResolverResponseGenerator();
632 auto channel = BuildChannel("pick_first", response_generator, args);
633 auto stub = BuildStub(channel);
634 response_generator.SetNextResolution(ports);
635 // The channel won't become connected (there's no server).
637 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
638 // Bring up a server on the chosen port.
639 StartServers(1, ports);
640 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
641 // Wait for connect, but not long enough. This proves that we're
642 // being throttled by initial backoff.
644 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
645 // Reset connection backoff.
646 experimental::ChannelResetConnectionBackoff(channel.get());
647 // Wait for connect. Should happen as soon as the client connects to
648 // the newly started server, which should be before the initial
649 // backoff timeout elapses.
651 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(20)));
652 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
653 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
654 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
655 // We should have waited less than kInitialBackOffMs.
656 EXPECT_LT(waited_ms, kInitialBackOffMs);
659 TEST_F(ClientLbEnd2endTest,
660 PickFirstResetConnectionBackoffNextAttemptStartsImmediately) {
661 ChannelArguments args;
662 constexpr int kInitialBackOffMs = 1000;
663 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
664 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
665 auto response_generator = BuildResolverResponseGenerator();
666 auto channel = BuildChannel("pick_first", response_generator, args);
667 auto stub = BuildStub(channel);
668 response_generator.SetNextResolution(ports);
669 // Wait for connect, which should fail ~immediately, because the server
671 gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
673 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
674 // Reset connection backoff.
675 // Note that the time at which the third attempt will be started is
676 // actually computed at this point, so we record the start time here.
677 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
678 const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
679 experimental::ChannelResetConnectionBackoff(channel.get());
680 // Trigger a second connection attempt. This should also fail
681 // ~immediately, but the retry should be scheduled for
682 // kInitialBackOffMs instead of applying the multiplier.
683 gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT");
685 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
686 // Bring up a server on the chosen port.
687 gpr_log(GPR_INFO, "=== STARTING BACKEND");
688 StartServers(1, ports);
689 // Wait for connect. Should happen within kInitialBackOffMs.
690 // Give an extra 100ms to account for the time spent in the second and
691 // third connection attempts themselves (since what we really want to
692 // measure is the time between the two). As long as this is less than
693 // the 1.6x increase we would see if the backoff state was not reset
694 // properly, the test is still proving that the backoff was reset.
695 constexpr int kWaitMs = kInitialBackOffMs + 100;
696 gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT");
697 EXPECT_TRUE(channel->WaitForConnected(
698 grpc_timeout_milliseconds_to_deadline(kWaitMs)));
699 const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
700 const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
701 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
702 EXPECT_LT(waited_ms, kWaitMs);
705 TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
706 // Start servers and build a pick_first channel to them.
707 const int kNumServers = 3;
708 StartServers(kNumServers);
709 auto response_generator = BuildResolverResponseGenerator();
710 auto channel = BuildChannel("pick_first", response_generator);
711 auto stub = BuildStub(channel);
713 std::vector<int> ports;
715 // Perform one RPC against the first server.
716 ports.emplace_back(servers_[0]->port_);
717 response_generator.SetNextResolution(ports);
718 gpr_log(GPR_INFO, "****** SET [0] *******");
719 CheckRpcSendOk(stub, DEBUG_LOCATION);
720 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
722 // An empty update will result in the channel going into TRANSIENT_FAILURE.
724 response_generator.SetNextResolution(ports);
725 gpr_log(GPR_INFO, "****** SET none *******");
726 grpc_connectivity_state channel_state;
728 channel_state = channel->GetState(true /* try to connect */);
729 } while (channel_state == GRPC_CHANNEL_READY);
730 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
731 servers_[0]->service_.ResetCounters();
733 // Next update introduces servers_[1], making the channel recover.
735 ports.emplace_back(servers_[1]->port_);
736 response_generator.SetNextResolution(ports);
737 gpr_log(GPR_INFO, "****** SET [1] *******");
738 WaitForServer(stub, 1, DEBUG_LOCATION);
739 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
741 // And again for servers_[2]
743 ports.emplace_back(servers_[2]->port_);
744 response_generator.SetNextResolution(ports);
745 gpr_log(GPR_INFO, "****** SET [2] *******");
746 WaitForServer(stub, 2, DEBUG_LOCATION);
747 EXPECT_EQ(servers_[0]->service_.request_count(), 0);
748 EXPECT_EQ(servers_[1]->service_.request_count(), 0);
750 // Check LB policy name for the channel.
751 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
754 TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
755 // Start servers and build a pick_first channel to them.
756 const int kNumServers = 3;
757 StartServers(kNumServers);
758 auto response_generator = BuildResolverResponseGenerator();
759 auto channel = BuildChannel("pick_first", response_generator);
760 auto stub = BuildStub(channel);
762 std::vector<int> ports;
764 // Perform one RPC against the first server.
765 ports.emplace_back(servers_[0]->port_);
766 response_generator.SetNextResolution(ports);
767 gpr_log(GPR_INFO, "****** SET [0] *******");
768 CheckRpcSendOk(stub, DEBUG_LOCATION);
769 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
770 servers_[0]->service_.ResetCounters();
772 // Send a superset update that still includes servers_[0].
774 ports.emplace_back(servers_[1]->port_);
775 ports.emplace_back(servers_[0]->port_);
776 response_generator.SetNextResolution(ports);
777 gpr_log(GPR_INFO, "****** SET superset *******");
778 CheckRpcSendOk(stub, DEBUG_LOCATION);
779 // We stick to the previously connected server.
780 WaitForServer(stub, 0, DEBUG_LOCATION);
781 EXPECT_EQ(0, servers_[1]->service_.request_count());
783 // Check LB policy name for the channel.
784 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
787 TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
789 const int kNumServers = 1;
790 StartServers(kNumServers);
791 std::vector<int> ports = GetServersPorts();
792 // Create two channels that (by default) use the global subchannel pool.
793 auto response_generator1 = BuildResolverResponseGenerator();
794 auto channel1 = BuildChannel("pick_first", response_generator1);
795 auto stub1 = BuildStub(channel1);
796 response_generator1.SetNextResolution(ports);
797 auto response_generator2 = BuildResolverResponseGenerator();
798 auto channel2 = BuildChannel("pick_first", response_generator2);
799 auto stub2 = BuildStub(channel2);
800 response_generator2.SetNextResolution(ports);
801 WaitForServer(stub1, 0, DEBUG_LOCATION);
802 // Send one RPC on each channel.
803 CheckRpcSendOk(stub1, DEBUG_LOCATION);
804 CheckRpcSendOk(stub2, DEBUG_LOCATION);
805 // The server receives two requests.
806 EXPECT_EQ(2, servers_[0]->service_.request_count());
807 // The two requests are from the same client port, because the two channels
808 // share subchannels via the global subchannel pool.
809 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
812 TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
814 const int kNumServers = 1;
815 StartServers(kNumServers);
816 std::vector<int> ports = GetServersPorts();
817 // Create two channels that use local subchannel pool.
818 ChannelArguments args;
819 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
820 auto response_generator1 = BuildResolverResponseGenerator();
821 auto channel1 = BuildChannel("pick_first", response_generator1, args);
822 auto stub1 = BuildStub(channel1);
823 response_generator1.SetNextResolution(ports);
824 auto response_generator2 = BuildResolverResponseGenerator();
825 auto channel2 = BuildChannel("pick_first", response_generator2, args);
826 auto stub2 = BuildStub(channel2);
827 response_generator2.SetNextResolution(ports);
828 WaitForServer(stub1, 0, DEBUG_LOCATION);
829 // Send one RPC on each channel.
830 CheckRpcSendOk(stub1, DEBUG_LOCATION);
831 CheckRpcSendOk(stub2, DEBUG_LOCATION);
832 // The server receives two requests.
833 EXPECT_EQ(2, servers_[0]->service_.request_count());
834 // The two requests are from two client ports, because the two channels didn't
835 // share subchannels with each other.
836 EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
839 TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
840 const int kNumUpdates = 1000;
841 const int kNumServers = 3;
842 StartServers(kNumServers);
843 auto response_generator = BuildResolverResponseGenerator();
844 auto channel = BuildChannel("pick_first", response_generator);
845 auto stub = BuildStub(channel);
846 std::vector<int> ports = GetServersPorts();
847 for (size_t i = 0; i < kNumUpdates; ++i) {
848 std::shuffle(ports.begin(), ports.end(),
849 std::mt19937(std::random_device()()));
850 response_generator.SetNextResolution(ports);
851 // We should re-enter core at the end of the loop to give the resolution
852 // setting closure a chance to run.
853 if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
855 // Check LB policy name for the channel.
856 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
859 TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
860 // Prepare the ports for up servers and down servers.
861 const int kNumServers = 3;
862 const int kNumAliveServers = 1;
863 StartServers(kNumAliveServers);
864 std::vector<int> alive_ports, dead_ports;
865 for (size_t i = 0; i < kNumServers; ++i) {
866 if (i < kNumAliveServers) {
867 alive_ports.emplace_back(servers_[i]->port_);
869 dead_ports.emplace_back(grpc_pick_unused_port_or_die());
872 auto response_generator = BuildResolverResponseGenerator();
873 auto channel = BuildChannel("pick_first", response_generator);
874 auto stub = BuildStub(channel);
875 // The initial resolution only contains dead ports. There won't be any
876 // selected subchannel. Re-resolution will return the same result.
877 response_generator.SetNextResolution(dead_ports);
878 gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
879 for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
880 // Set a re-resolution result that contains reachable ports, so that the
881 // pick_first LB policy can recover soon.
882 response_generator.SetNextResolutionUponError(alive_ports);
883 gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
884 WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
885 CheckRpcSendOk(stub, DEBUG_LOCATION);
886 EXPECT_EQ(servers_[0]->service_.request_count(), 1);
887 // Check LB policy name for the channel.
888 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
891 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
892 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
893 StartServers(1, ports);
894 auto response_generator = BuildResolverResponseGenerator();
895 auto channel = BuildChannel("pick_first", response_generator);
896 auto stub = BuildStub(channel);
897 response_generator.SetNextResolution(ports);
898 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
899 WaitForServer(stub, 0, DEBUG_LOCATION);
900 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
901 servers_[0]->Shutdown();
902 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
903 gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
904 StartServers(1, ports);
905 WaitForServer(stub, 0, DEBUG_LOCATION);
908 TEST_F(ClientLbEnd2endTest,
909 PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
910 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
911 grpc_pick_unused_port_or_die()};
912 CreateServers(2, ports);
914 auto response_generator = BuildResolverResponseGenerator();
915 auto channel = BuildChannel("pick_first", response_generator);
916 auto stub = BuildStub(channel);
917 response_generator.SetNextResolution(ports);
918 gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
919 WaitForServer(stub, 1, DEBUG_LOCATION);
920 gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
921 servers_[1]->Shutdown();
922 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
923 gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
924 StartServers(2, ports);
925 WaitForServer(stub, 0, DEBUG_LOCATION);
928 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
929 std::vector<int> ports = {grpc_pick_unused_port_or_die()};
930 StartServers(1, ports);
931 auto response_generator = BuildResolverResponseGenerator();
932 auto channel_1 = BuildChannel("pick_first", response_generator);
933 auto stub_1 = BuildStub(channel_1);
934 response_generator.SetNextResolution(ports);
935 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
936 WaitForServer(stub_1, 0, DEBUG_LOCATION);
937 gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
938 servers_[0]->Shutdown();
939 // Channel 1 will receive a re-resolution containing the same server. It will
940 // create a new subchannel and hold a ref to it.
941 StartServers(1, ports);
942 gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
943 auto response_generator_2 = BuildResolverResponseGenerator();
944 auto channel_2 = BuildChannel("pick_first", response_generator_2);
945 auto stub_2 = BuildStub(channel_2);
946 response_generator_2.SetNextResolution(ports);
947 gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
948 WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
949 gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
950 servers_[0]->Shutdown();
951 // Wait until the disconnection has triggered the connectivity notification.
952 // Otherwise, the subchannel may be picked for next call but will fail soon.
953 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
954 // Channel 2 will also receive a re-resolution containing the same server.
955 // Both channels will ref the same subchannel that failed.
956 StartServers(1, ports);
957 gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
958 gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
959 // The first call after the server restart will succeed.
960 CheckRpcSendOk(stub_2, DEBUG_LOCATION);
961 gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
962 // Check LB policy name for the channel.
963 EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
964 // Check LB policy name for the channel.
965 EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
968 TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
969 // Start server, send RPC, and make sure channel is READY.
970 const int kNumServers = 1;
971 StartServers(kNumServers);
972 auto response_generator = BuildResolverResponseGenerator();
974 BuildChannel("", response_generator); // pick_first is the default.
975 auto stub = BuildStub(channel);
976 response_generator.SetNextResolution(GetServersPorts());
977 CheckRpcSendOk(stub, DEBUG_LOCATION);
978 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
979 // Stop server. Channel should go into state IDLE.
980 response_generator.SetFailureOnReresolution();
981 servers_[0]->Shutdown();
982 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
983 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
987 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
988 auto response_generator = BuildResolverResponseGenerator();
990 BuildChannel("", response_generator); // pick_first is the default.
991 auto stub = BuildStub(channel);
992 // Create a number of servers, but only start 1 of them.
995 // Initially resolve to first server and make sure it connects.
996 gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
997 response_generator.SetNextResolution({servers_[0]->port_});
998 CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
999 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1000 // Send a resolution update with the remaining servers, none of which are
1001 // running yet, so the update will stay pending. Note that it's important
1002 // to have multiple servers here, or else the test will be flaky; with only
1003 // one server, the pending subchannel list has already gone into
1004 // TRANSIENT_FAILURE due to hitting the end of the list by the time we
1007 "Phase 2: Resolver update pointing to remaining "
1008 "(not started) servers.");
1009 response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
1010 // RPCs will continue to be sent to the first server.
1011 CheckRpcSendOk(stub, DEBUG_LOCATION);
1012 // Now stop the first server, so that the current subchannel list
1013 // fails. This should cause us to immediately swap over to the
1014 // pending list, even though it's not yet connected. The state should
1015 // be set to CONNECTING, since that's what the pending subchannel list
1016 // was doing when we swapped over.
1017 gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
1018 servers_[0]->Shutdown();
1019 WaitForChannelNotReady(channel.get());
1020 // TODO(roth): This should always return CONNECTING, but it's flaky
1021 // between that and TRANSIENT_FAILURE. I suspect that this problem
1022 // will go away once we move the backoff code out of the subchannel
1023 // and into the LB policies.
1024 EXPECT_THAT(channel->GetState(false),
1025 ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
1026 GRPC_CHANNEL_TRANSIENT_FAILURE));
1027 // Now start the second server.
1028 gpr_log(GPR_INFO, "Phase 4: Starting second server.");
1030 // The channel should go to READY state and RPCs should go to the
1032 WaitForChannelReady(channel.get());
1033 WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
1036 TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
1037 // Start server, send RPC, and make sure channel is READY.
1038 const int kNumServers = 1;
1039 StartServers(kNumServers);
1040 auto response_generator = BuildResolverResponseGenerator();
1042 BuildChannel("", response_generator); // pick_first is the default.
1043 auto stub = BuildStub(channel);
1044 response_generator.SetNextResolution(GetServersPorts());
1045 CheckRpcSendOk(stub, DEBUG_LOCATION);
1046 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1047 // Stop server. Channel should go into state IDLE.
1048 servers_[0]->Shutdown();
1049 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1050 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1051 // Now send resolver update that includes no addresses. Channel
1052 // should stay in state IDLE.
1053 response_generator.SetNextResolution({});
1054 EXPECT_FALSE(channel->WaitForStateChange(
1055 GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
1056 // Now bring the backend back up and send a non-empty resolver update,
1057 // and then try to send an RPC. Channel should go back into state READY.
1059 response_generator.SetNextResolution(GetServersPorts());
1060 CheckRpcSendOk(stub, DEBUG_LOCATION);
1061 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1064 TEST_F(ClientLbEnd2endTest, RoundRobin) {
1065 // Start servers and send one RPC per server.
1066 const int kNumServers = 3;
1067 StartServers(kNumServers);
1068 auto response_generator = BuildResolverResponseGenerator();
1069 auto channel = BuildChannel("round_robin", response_generator);
1070 auto stub = BuildStub(channel);
1071 response_generator.SetNextResolution(GetServersPorts());
1072 // Wait until all backends are ready.
1074 CheckRpcSendOk(stub, DEBUG_LOCATION);
1075 } while (!SeenAllServers());
1077 // "Sync" to the end of the list. Next sequence of picks will start at the
1078 // first server (index 0).
1079 WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
1080 std::vector<int> connection_order;
1081 for (size_t i = 0; i < servers_.size(); ++i) {
1082 CheckRpcSendOk(stub, DEBUG_LOCATION);
1083 UpdateConnectionOrder(servers_, &connection_order);
1085 // Backends should be iterated over in the order in which the addresses were
1087 const auto expected = std::vector<int>{0, 1, 2};
1088 EXPECT_EQ(expected, connection_order);
1089 // Check LB policy name for the channel.
1090 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1093 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
1094 StartServers(1); // Single server
1095 auto response_generator = BuildResolverResponseGenerator();
1096 auto channel = BuildChannel("round_robin", response_generator);
1097 auto stub = BuildStub(channel);
1098 response_generator.SetNextResolution({servers_[0]->port_});
1099 WaitForServer(stub, 0, DEBUG_LOCATION);
1100 // Create a new channel and its corresponding RR LB policy, which will pick
1101 // the subchannels in READY state from the previous RPC against the same
1102 // target (even if it happened over a different channel, because subchannels
1103 // are globally reused). Progress should happen without any transition from
1104 // this READY state.
1105 auto second_response_generator = BuildResolverResponseGenerator();
1106 auto second_channel = BuildChannel("round_robin", second_response_generator);
1107 auto second_stub = BuildStub(second_channel);
1108 second_response_generator.SetNextResolution({servers_[0]->port_});
1109 CheckRpcSendOk(second_stub, DEBUG_LOCATION);
1112 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
1113 // Start servers and send one RPC per server.
1114 const int kNumServers = 3;
1115 StartServers(kNumServers);
1116 auto response_generator = BuildResolverResponseGenerator();
1117 auto channel = BuildChannel("round_robin", response_generator);
1118 auto stub = BuildStub(channel);
1119 std::vector<int> ports;
1120 // Start with a single server.
1121 gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1122 ports.emplace_back(servers_[0]->port_);
1123 response_generator.SetNextResolution(ports);
1124 WaitForServer(stub, 0, DEBUG_LOCATION);
1125 // Send RPCs. They should all go servers_[0]
1126 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1127 EXPECT_EQ(10, servers_[0]->service_.request_count());
1128 EXPECT_EQ(0, servers_[1]->service_.request_count());
1129 EXPECT_EQ(0, servers_[2]->service_.request_count());
1130 servers_[0]->service_.ResetCounters();
1131 // And now for the second server.
1132 gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1134 ports.emplace_back(servers_[1]->port_);
1135 response_generator.SetNextResolution(ports);
1136 // Wait until update has been processed, as signaled by the second backend
1137 // receiving a request.
1138 EXPECT_EQ(0, servers_[1]->service_.request_count());
1139 WaitForServer(stub, 1, DEBUG_LOCATION);
1140 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1141 EXPECT_EQ(0, servers_[0]->service_.request_count());
1142 EXPECT_EQ(10, servers_[1]->service_.request_count());
1143 EXPECT_EQ(0, servers_[2]->service_.request_count());
1144 servers_[1]->service_.ResetCounters();
1145 // ... and for the last server.
1146 gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1148 ports.emplace_back(servers_[2]->port_);
1149 response_generator.SetNextResolution(ports);
1150 WaitForServer(stub, 2, DEBUG_LOCATION);
1151 for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1152 EXPECT_EQ(0, servers_[0]->service_.request_count());
1153 EXPECT_EQ(0, servers_[1]->service_.request_count());
1154 EXPECT_EQ(10, servers_[2]->service_.request_count());
1155 servers_[2]->service_.ResetCounters();
1156 // Back to all servers.
1157 gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1159 ports.emplace_back(servers_[0]->port_);
1160 ports.emplace_back(servers_[1]->port_);
1161 ports.emplace_back(servers_[2]->port_);
1162 response_generator.SetNextResolution(ports);
1163 WaitForServer(stub, 0, DEBUG_LOCATION);
1164 WaitForServer(stub, 1, DEBUG_LOCATION);
1165 WaitForServer(stub, 2, DEBUG_LOCATION);
1166 // Send three RPCs, one per server.
1167 for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
1168 EXPECT_EQ(1, servers_[0]->service_.request_count());
1169 EXPECT_EQ(1, servers_[1]->service_.request_count());
1170 EXPECT_EQ(1, servers_[2]->service_.request_count());
1171 // An empty update will result in the channel going into TRANSIENT_FAILURE.
1172 gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1174 response_generator.SetNextResolution(ports);
1175 grpc_connectivity_state channel_state;
1177 channel_state = channel->GetState(true /* try to connect */);
1178 } while (channel_state == GRPC_CHANNEL_READY);
1179 ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
1180 servers_[0]->service_.ResetCounters();
1181 // Next update introduces servers_[1], making the channel recover.
1182 gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1184 ports.emplace_back(servers_[1]->port_);
1185 response_generator.SetNextResolution(ports);
1186 WaitForServer(stub, 1, DEBUG_LOCATION);
1187 channel_state = channel->GetState(false /* try to connect */);
1188 ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
1189 // Check LB policy name for the channel.
1190 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1193 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
1194 const int kNumServers = 3;
1195 StartServers(kNumServers);
1196 auto response_generator = BuildResolverResponseGenerator();
1197 auto channel = BuildChannel("round_robin", response_generator);
1198 auto stub = BuildStub(channel);
1199 std::vector<int> ports;
1200 // Start with a single server.
1201 ports.emplace_back(servers_[0]->port_);
1202 response_generator.SetNextResolution(ports);
1203 WaitForServer(stub, 0, DEBUG_LOCATION);
1204 // Send RPCs. They should all go to servers_[0]
1205 for (size_t i = 0; i < 10; ++i) SendRpc(stub);
1206 EXPECT_EQ(10, servers_[0]->service_.request_count());
1207 EXPECT_EQ(0, servers_[1]->service_.request_count());
1208 EXPECT_EQ(0, servers_[2]->service_.request_count());
1209 servers_[0]->service_.ResetCounters();
1210 // Shutdown one of the servers to be sent in the update.
1211 servers_[1]->Shutdown();
1212 ports.emplace_back(servers_[1]->port_);
1213 ports.emplace_back(servers_[2]->port_);
1214 response_generator.SetNextResolution(ports);
1215 WaitForServer(stub, 0, DEBUG_LOCATION);
1216 WaitForServer(stub, 2, DEBUG_LOCATION);
1217 // Send three RPCs, one per server.
1218 for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
1219 // The server in shutdown shouldn't receive any.
1220 EXPECT_EQ(0, servers_[1]->service_.request_count());
1223 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
1224 // Start servers and send one RPC per server.
1225 const int kNumServers = 3;
1226 StartServers(kNumServers);
1227 auto response_generator = BuildResolverResponseGenerator();
1228 auto channel = BuildChannel("round_robin", response_generator);
1229 auto stub = BuildStub(channel);
1230 std::vector<int> ports = GetServersPorts();
1231 for (size_t i = 0; i < 1000; ++i) {
1232 std::shuffle(ports.begin(), ports.end(),
1233 std::mt19937(std::random_device()()));
1234 response_generator.SetNextResolution(ports);
1235 if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
1237 // Check LB policy name for the channel.
1238 EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1241 TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
1242 // TODO(dgq): replicate the way internal testing exercises the concurrent
1243 // update provisions of RR.
1246 TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
1247 // Start servers and send one RPC per server.
1248 const int kNumServers = 3;
1249 std::vector<int> first_ports;
1250 std::vector<int> second_ports;
1251 first_ports.reserve(kNumServers);
1252 for (int i = 0; i < kNumServers; ++i) {
1253 first_ports.push_back(grpc_pick_unused_port_or_die());
1255 second_ports.reserve(kNumServers);
1256 for (int i = 0; i < kNumServers; ++i) {
1257 second_ports.push_back(grpc_pick_unused_port_or_die());
1259 StartServers(kNumServers, first_ports);
1260 auto response_generator = BuildResolverResponseGenerator();
1261 auto channel = BuildChannel("round_robin", response_generator);
1262 auto stub = BuildStub(channel);
1263 response_generator.SetNextResolution(first_ports);
1264 // Send a number of RPCs, which succeed.
1265 for (size_t i = 0; i < 100; ++i) {
1266 CheckRpcSendOk(stub, DEBUG_LOCATION);
1269 gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
1270 for (size_t i = 0; i < servers_.size(); ++i) {
1271 servers_[i]->Shutdown();
1273 gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
1274 gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
1275 // Client requests should fail. Send enough to tickle all subchannels.
1276 for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
1277 gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
1278 // Bring servers back up on a different set of ports. We need to do this to be
1279 // sure that the eventual success is *not* due to subchannel reconnection
1280 // attempts and that an actual re-resolution has happened as a result of the
1281 // RR policy going into transient failure when all its subchannels become
1282 // unavailable (in transient failure as well).
1283 gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
1284 StartServers(kNumServers, second_ports);
1285 // Don't notify of the update. Wait for the LB policy's re-resolution to
1286 // "pull" the new ports.
1287 response_generator.SetNextResolutionUponError(second_ports);
1288 gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
1289 gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
1290 // Client request should eventually (but still fairly soon) succeed.
1291 const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
1292 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1293 while (gpr_time_cmp(deadline, now) > 0) {
1294 if (SendRpc(stub)) break;
1295 now = gpr_now(GPR_CLOCK_MONOTONIC);
1297 ASSERT_GT(gpr_time_cmp(deadline, now), 0);
1300 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
1301 // Start servers and create channel. Channel should go to READY state.
1302 const int kNumServers = 3;
1303 StartServers(kNumServers);
1304 auto response_generator = BuildResolverResponseGenerator();
1305 auto channel = BuildChannel("round_robin", response_generator);
1306 auto stub = BuildStub(channel);
1307 response_generator.SetNextResolution(GetServersPorts());
1308 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1309 // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
1310 // TODO(roth): This test should ideally check that even when the
1311 // subchannels are in state CONNECTING for an extended period of time,
1312 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1313 // currently have a good way to get a subchannel to report CONNECTING
1314 // for a long period of time, since the servers in this test framework
1315 // are on the loopback interface, which will immediately return a
1316 // "Connection refused" error, so the subchannels will only be in
1317 // CONNECTING state very briefly. When we have time, see if we can
1318 // find a way to fix this.
1319 for (size_t i = 0; i < servers_.size(); ++i) {
1320 servers_[i]->Shutdown();
1322 auto predicate = [](grpc_connectivity_state state) {
1323 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1325 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
1328 TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
1329 // Create channel and return servers that don't exist. Channel should
1330 // quickly transition into TRANSIENT_FAILURE.
1331 // TODO(roth): This test should ideally check that even when the
1332 // subchannels are in state CONNECTING for an extended period of time,
1333 // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
1334 // currently have a good way to get a subchannel to report CONNECTING
1335 // for a long period of time, since the servers in this test framework
1336 // are on the loopback interface, which will immediately return a
1337 // "Connection refused" error, so the subchannels will only be in
1338 // CONNECTING state very briefly. When we have time, see if we can
1339 // find a way to fix this.
1340 auto response_generator = BuildResolverResponseGenerator();
1341 auto channel = BuildChannel("round_robin", response_generator);
1342 auto stub = BuildStub(channel);
1343 response_generator.SetNextResolution({
1344 grpc_pick_unused_port_or_die(),
1345 grpc_pick_unused_port_or_die(),
1346 grpc_pick_unused_port_or_die(),
1348 for (size_t i = 0; i < servers_.size(); ++i) {
1349 servers_[i]->Shutdown();
1351 auto predicate = [](grpc_connectivity_state state) {
1352 return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
1354 EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
1357 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
1358 const int kNumServers = 3;
1359 StartServers(kNumServers);
1360 const auto ports = GetServersPorts();
1361 auto response_generator = BuildResolverResponseGenerator();
1362 auto channel = BuildChannel("round_robin", response_generator);
1363 auto stub = BuildStub(channel);
1364 response_generator.SetNextResolution(ports);
1365 for (size_t i = 0; i < kNumServers; ++i) {
1366 WaitForServer(stub, i, DEBUG_LOCATION);
1368 for (size_t i = 0; i < servers_.size(); ++i) {
1369 CheckRpcSendOk(stub, DEBUG_LOCATION);
1370 EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1372 // One request should have gone to each server.
1373 for (size_t i = 0; i < servers_.size(); ++i) {
1374 EXPECT_EQ(1, servers_[i]->service_.request_count());
1376 const auto pre_death = servers_[0]->service_.request_count();
1377 // Kill the first server.
1378 servers_[0]->Shutdown();
1379 // Client request still succeed. May need retrying if RR had returned a pick
1380 // before noticing the change in the server's connectivity.
1381 while (!SendRpc(stub)) {
1382 } // Retry until success.
1383 // Send a bunch of RPCs that should succeed.
1384 for (int i = 0; i < 10 * kNumServers; ++i) {
1385 CheckRpcSendOk(stub, DEBUG_LOCATION);
1387 const auto post_death = servers_[0]->service_.request_count();
1388 // No requests have gone to the deceased server.
1389 EXPECT_EQ(pre_death, post_death);
1390 // Bring the first server back up.
1392 // Requests should start arriving at the first server either right away (if
1393 // the server managed to start before the RR policy retried the subchannel) or
1394 // after the subchannel retry delay otherwise (RR's subchannel retried before
1395 // the server was fully back up).
1396 WaitForServer(stub, 0, DEBUG_LOCATION);
1399 // If health checking is required by client but health checking service
1400 // is not running on the server, the channel should be treated as healthy.
1401 TEST_F(ClientLbEnd2endTest,
1402 RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
1403 StartServers(1); // Single server
1404 ChannelArguments args;
1405 args.SetServiceConfigJSON(
1406 "{\"healthCheckConfig\": "
1407 "{\"serviceName\": \"health_check_service_name\"}}");
1408 auto response_generator = BuildResolverResponseGenerator();
1409 auto channel = BuildChannel("round_robin", response_generator, args);
1410 auto stub = BuildStub(channel);
1411 response_generator.SetNextResolution({servers_[0]->port_});
1412 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1413 CheckRpcSendOk(stub, DEBUG_LOCATION);
1416 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
1417 EnableDefaultHealthCheckService(true);
1419 const int kNumServers = 3;
1420 StartServers(kNumServers);
1421 ChannelArguments args;
1422 args.SetServiceConfigJSON(
1423 "{\"healthCheckConfig\": "
1424 "{\"serviceName\": \"health_check_service_name\"}}");
1425 auto response_generator = BuildResolverResponseGenerator();
1426 auto channel = BuildChannel("round_robin", response_generator, args);
1427 auto stub = BuildStub(channel);
1428 response_generator.SetNextResolution(GetServersPorts());
1429 // Channel should not become READY, because health checks should be failing.
1431 "*** initial state: unknown health check service name for "
1433 EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1434 // Now set one of the servers to be healthy.
1435 // The channel should become healthy and all requests should go to
1436 // the healthy server.
1437 gpr_log(GPR_INFO, "*** server 0 healthy");
1438 servers_[0]->SetServingStatus("health_check_service_name", true);
1439 EXPECT_TRUE(WaitForChannelReady(channel.get()));
1440 for (int i = 0; i < 10; ++i) {
1441 CheckRpcSendOk(stub, DEBUG_LOCATION);
1443 EXPECT_EQ(10, servers_[0]->service_.request_count());
1444 EXPECT_EQ(0, servers_[1]->service_.request_count());
1445 EXPECT_EQ(0, servers_[2]->service_.request_count());
1446 // Now set a second server to be healthy.
1447 gpr_log(GPR_INFO, "*** server 2 healthy");
1448 servers_[2]->SetServingStatus("health_check_service_name", true);
1449 WaitForServer(stub, 2, DEBUG_LOCATION);
1450 for (int i = 0; i < 10; ++i) {
1451 CheckRpcSendOk(stub, DEBUG_LOCATION);
1453 EXPECT_EQ(5, servers_[0]->service_.request_count());
1454 EXPECT_EQ(0, servers_[1]->service_.request_count());
1455 EXPECT_EQ(5, servers_[2]->service_.request_count());
1456 // Now set the remaining server to be healthy.
1457 gpr_log(GPR_INFO, "*** server 1 healthy");
1458 servers_[1]->SetServingStatus("health_check_service_name", true);
1459 WaitForServer(stub, 1, DEBUG_LOCATION);
1460 for (int i = 0; i < 9; ++i) {
1461 CheckRpcSendOk(stub, DEBUG_LOCATION);
1463 EXPECT_EQ(3, servers_[0]->service_.request_count());
1464 EXPECT_EQ(3, servers_[1]->service_.request_count());
1465 EXPECT_EQ(3, servers_[2]->service_.request_count());
1466 // Now set one server to be unhealthy again. Then wait until the
1467 // unhealthiness has hit the client. We know that the client will see
1468 // this when we send kNumServers requests and one of the remaining servers
1469 // sees two of the requests.
1470 gpr_log(GPR_INFO, "*** server 0 unhealthy");
1471 servers_[0]->SetServingStatus("health_check_service_name", false);
1474 for (int i = 0; i < kNumServers; ++i) {
1475 CheckRpcSendOk(stub, DEBUG_LOCATION);
1477 } while (servers_[1]->service_.request_count() != 2 &&
1478 servers_[2]->service_.request_count() != 2);
1479 // Now set the remaining two servers to be unhealthy. Make sure the
1480 // channel leaves READY state and that RPCs fail.
1481 gpr_log(GPR_INFO, "*** all servers unhealthy");
1482 servers_[1]->SetServingStatus("health_check_service_name", false);
1483 servers_[2]->SetServingStatus("health_check_service_name", false);
1484 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1485 CheckRpcSendFailure(stub);
1487 EnableDefaultHealthCheckService(false);
1490 TEST_F(ClientLbEnd2endTest,
1491 RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
1492 EnableDefaultHealthCheckService(true);
1494 const int kNumServers = 3;
1495 StartServers(kNumServers);
1496 servers_[0]->SetServingStatus("health_check_service_name", true);
1497 servers_[1]->SetServingStatus("health_check_service_name", true);
1498 servers_[2]->SetServingStatus("health_check_service_name", true);
1499 ChannelArguments args;
1500 args.SetServiceConfigJSON(
1501 "{\"healthCheckConfig\": "
1502 "{\"serviceName\": \"health_check_service_name\"}}");
1503 auto response_generator = BuildResolverResponseGenerator();
1504 auto channel = BuildChannel("round_robin", response_generator, args);
1505 auto stub = BuildStub(channel);
1506 response_generator.SetNextResolution(GetServersPorts());
1507 WaitForServer(stub, 0, DEBUG_LOCATION);
1508 // Stop server 0 and send a new resolver result to ensure that RR
1509 // checks each subchannel's state.
1510 servers_[0]->Shutdown();
1511 response_generator.SetNextResolution(GetServersPorts());
1512 // Send a bunch more RPCs.
1513 for (size_t i = 0; i < 100; i++) {
1518 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
1519 EnableDefaultHealthCheckService(true);
1521 const int kNumServers = 1;
1522 StartServers(kNumServers);
1523 // Create a channel with health-checking enabled.
1524 ChannelArguments args;
1525 args.SetServiceConfigJSON(
1526 "{\"healthCheckConfig\": "
1527 "{\"serviceName\": \"health_check_service_name\"}}");
1528 auto response_generator1 = BuildResolverResponseGenerator();
1529 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1530 auto stub1 = BuildStub(channel1);
1531 std::vector<int> ports = GetServersPorts();
1532 response_generator1.SetNextResolution(ports);
1533 // Create a channel with health checking enabled but inhibited.
1534 args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
1535 auto response_generator2 = BuildResolverResponseGenerator();
1536 auto channel2 = BuildChannel("round_robin", response_generator2, args);
1537 auto stub2 = BuildStub(channel2);
1538 response_generator2.SetNextResolution(ports);
1539 // First channel should not become READY, because health checks should be
1541 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1542 CheckRpcSendFailure(stub1);
1543 // Second channel should be READY.
1544 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1545 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1546 // Enable health checks on the backend and wait for channel 1 to succeed.
1547 servers_[0]->SetServingStatus("health_check_service_name", true);
1548 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1549 // Check that we created only one subchannel to the backend.
1550 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1552 EnableDefaultHealthCheckService(false);
1555 TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
1556 EnableDefaultHealthCheckService(true);
1558 const int kNumServers = 1;
1559 StartServers(kNumServers);
1560 // Create a channel with health-checking enabled.
1561 ChannelArguments args;
1562 args.SetServiceConfigJSON(
1563 "{\"healthCheckConfig\": "
1564 "{\"serviceName\": \"health_check_service_name\"}}");
1565 auto response_generator1 = BuildResolverResponseGenerator();
1566 auto channel1 = BuildChannel("round_robin", response_generator1, args);
1567 auto stub1 = BuildStub(channel1);
1568 std::vector<int> ports = GetServersPorts();
1569 response_generator1.SetNextResolution(ports);
1570 // Create a channel with health-checking enabled with a different
1572 ChannelArguments args2;
1573 args2.SetServiceConfigJSON(
1574 "{\"healthCheckConfig\": "
1575 "{\"serviceName\": \"health_check_service_name2\"}}");
1576 auto response_generator2 = BuildResolverResponseGenerator();
1577 auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1578 auto stub2 = BuildStub(channel2);
1579 response_generator2.SetNextResolution(ports);
1580 // Allow health checks from channel 2 to succeed.
1581 servers_[0]->SetServingStatus("health_check_service_name2", true);
1582 // First channel should not become READY, because health checks should be
1584 EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1585 CheckRpcSendFailure(stub1);
1586 // Second channel should be READY.
1587 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1588 CheckRpcSendOk(stub2, DEBUG_LOCATION);
1589 // Enable health checks for channel 1 and wait for it to succeed.
1590 servers_[0]->SetServingStatus("health_check_service_name", true);
1591 CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
1592 // Check that we created only one subchannel to the backend.
1593 EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1595 EnableDefaultHealthCheckService(false);
1598 TEST_F(ClientLbEnd2endTest,
1599 RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) {
1600 EnableDefaultHealthCheckService(true);
1602 const int kNumServers = 1;
1603 StartServers(kNumServers);
1604 // Create a channel with health-checking enabled.
1605 const char* kServiceConfigJson =
1606 "{\"healthCheckConfig\": "
1607 "{\"serviceName\": \"health_check_service_name\"}}";
1608 auto response_generator = BuildResolverResponseGenerator();
1609 auto channel = BuildChannel("round_robin", response_generator);
1610 auto stub = BuildStub(channel);
1611 std::vector<int> ports = GetServersPorts();
1612 response_generator.SetNextResolution(ports, kServiceConfigJson);
1613 servers_[0]->SetServingStatus("health_check_service_name", true);
1614 EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
1615 // Send an update on the channel to change it to use a health checking
1616 // service name that is not being reported as healthy.
1617 const char* kServiceConfigJson2 =
1618 "{\"healthCheckConfig\": "
1619 "{\"serviceName\": \"health_check_service_name2\"}}";
1620 response_generator.SetNextResolution(ports, kServiceConfigJson2);
1621 EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1623 EnableDefaultHealthCheckService(false);
1626 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
1628 const int kNumServers = 1;
1629 StartServers(kNumServers);
1630 // Set max idle time and build the channel.
1631 ChannelArguments args;
1632 args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
1633 auto response_generator = BuildResolverResponseGenerator();
1634 auto channel = BuildChannel("", response_generator, args);
1635 auto stub = BuildStub(channel);
1636 // The initial channel state should be IDLE.
1637 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1638 // After sending RPC, channel state should be READY.
1639 response_generator.SetNextResolution(GetServersPorts());
1640 CheckRpcSendOk(stub, DEBUG_LOCATION);
1641 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1642 // After a period time not using the channel, the channel state should switch
1644 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
1645 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1646 // Sending a new RPC should awake the IDLE channel.
1647 response_generator.SetNextResolution(GetServersPorts());
1648 CheckRpcSendOk(stub, DEBUG_LOCATION);
1649 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1652 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
1654 void SetUp() override {
1655 ClientLbEnd2endTest::SetUp();
1656 current_test_instance_ = this;
1659 static void SetUpTestCase() {
1661 grpc_core::RegisterTestPickArgsLoadBalancingPolicy(SavePickArgs);
1664 static void TearDownTestCase() { grpc_shutdown(); }
1666 const std::vector<grpc_core::PickArgsSeen>& args_seen_list() {
1667 grpc::internal::MutexLock lock(&mu_);
1668 return args_seen_list_;
1672 static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
1673 ClientLbPickArgsTest* self = current_test_instance_;
1674 grpc::internal::MutexLock lock(&self->mu_);
1675 self->args_seen_list_.emplace_back(args_seen);
1678 static ClientLbPickArgsTest* current_test_instance_;
1679 grpc::internal::Mutex mu_;
1680 std::vector<grpc_core::PickArgsSeen> args_seen_list_;
1683 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
1685 TEST_F(ClientLbPickArgsTest, Basic) {
1686 const int kNumServers = 1;
1687 StartServers(kNumServers);
1688 auto response_generator = BuildResolverResponseGenerator();
1689 auto channel = BuildChannel("test_pick_args_lb", response_generator);
1690 auto stub = BuildStub(channel);
1691 response_generator.SetNextResolution(GetServersPorts());
1692 CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true);
1693 // Check LB policy name for the channel.
1694 EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
1695 // There will be two entries, one for the pick tried in state
1696 // CONNECTING and another for the pick tried in state READY.
1697 EXPECT_THAT(args_seen_list(),
1698 ::testing::ElementsAre(
1700 ::testing::Field(&grpc_core::PickArgsSeen::path,
1701 "/grpc.testing.EchoTestService/Echo"),
1702 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1703 ::testing::UnorderedElementsAre(
1704 ::testing::Pair("foo", "1"),
1705 ::testing::Pair("bar", "2"),
1706 ::testing::Pair("baz", "3")))),
1708 ::testing::Field(&grpc_core::PickArgsSeen::path,
1709 "/grpc.testing.EchoTestService/Echo"),
1710 ::testing::Field(&grpc_core::PickArgsSeen::metadata,
1711 ::testing::UnorderedElementsAre(
1712 ::testing::Pair("foo", "1"),
1713 ::testing::Pair("bar", "2"),
1714 ::testing::Pair("baz", "3"))))));
1717 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
1719 void SetUp() override {
1720 ClientLbEnd2endTest::SetUp();
1721 current_test_instance_ = this;
1724 static void SetUpTestCase() {
1726 grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
1727 ReportTrailerIntercepted);
1730 static void TearDownTestCase() { grpc_shutdown(); }
1732 int trailers_intercepted() {
1733 grpc::internal::MutexLock lock(&mu_);
1734 return trailers_intercepted_;
1737 const grpc_core::MetadataVector& trailing_metadata() {
1738 grpc::internal::MutexLock lock(&mu_);
1739 return trailing_metadata_;
1742 const udpa::data::orca::v1::OrcaLoadReport* backend_load_report() {
1743 grpc::internal::MutexLock lock(&mu_);
1744 return load_report_.get();
1748 static void ReportTrailerIntercepted(
1749 const grpc_core::TrailingMetadataArgsSeen& args_seen) {
1750 const auto* backend_metric_data = args_seen.backend_metric_data;
1751 ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
1752 grpc::internal::MutexLock lock(&self->mu_);
1753 self->trailers_intercepted_++;
1754 self->trailing_metadata_ = args_seen.metadata;
1755 if (backend_metric_data != nullptr) {
1756 self->load_report_ =
1757 absl::make_unique<udpa::data::orca::v1::OrcaLoadReport>();
1758 self->load_report_->set_cpu_utilization(
1759 backend_metric_data->cpu_utilization);
1760 self->load_report_->set_mem_utilization(
1761 backend_metric_data->mem_utilization);
1762 self->load_report_->set_rps(backend_metric_data->requests_per_second);
1763 for (const auto& p : backend_metric_data->request_cost) {
1764 std::string name = std::string(p.first);
1765 (*self->load_report_->mutable_request_cost())[name] = p.second;
1767 for (const auto& p : backend_metric_data->utilization) {
1768 std::string name = std::string(p.first);
1769 (*self->load_report_->mutable_utilization())[name] = p.second;
1774 static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
1775 grpc::internal::Mutex mu_;
1776 int trailers_intercepted_ = 0;
1777 grpc_core::MetadataVector trailing_metadata_;
1778 std::unique_ptr<udpa::data::orca::v1::OrcaLoadReport> load_report_;
1781 ClientLbInterceptTrailingMetadataTest*
1782 ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
1784 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
1785 const int kNumServers = 1;
1786 const int kNumRpcs = 10;
1787 StartServers(kNumServers);
1788 auto response_generator = BuildResolverResponseGenerator();
1790 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1791 auto stub = BuildStub(channel);
1792 response_generator.SetNextResolution(GetServersPorts());
1793 for (size_t i = 0; i < kNumRpcs; ++i) {
1794 CheckRpcSendOk(stub, DEBUG_LOCATION);
1796 // Check LB policy name for the channel.
1797 EXPECT_EQ("intercept_trailing_metadata_lb",
1798 channel->GetLoadBalancingPolicyName());
1799 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1800 EXPECT_THAT(trailing_metadata(),
1801 ::testing::UnorderedElementsAre(
1802 // TODO(roth): Should grpc-status be visible here?
1803 ::testing::Pair("grpc-status", "0"),
1804 ::testing::Pair("user-agent", ::testing::_),
1805 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1806 ::testing::Pair("baz", "3")));
1807 EXPECT_EQ(nullptr, backend_load_report());
1810 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
1811 const int kNumServers = 1;
1812 const int kNumRpcs = 10;
1813 StartServers(kNumServers);
1814 ChannelArguments args;
1815 args.SetServiceConfigJSON(
1817 " \"methodConfig\": [ {\n"
1819 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
1821 " \"retryPolicy\": {\n"
1822 " \"maxAttempts\": 3,\n"
1823 " \"initialBackoff\": \"1s\",\n"
1824 " \"maxBackoff\": \"120s\",\n"
1825 " \"backoffMultiplier\": 1.6,\n"
1826 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
1830 auto response_generator = BuildResolverResponseGenerator();
1832 BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
1833 auto stub = BuildStub(channel);
1834 response_generator.SetNextResolution(GetServersPorts());
1835 for (size_t i = 0; i < kNumRpcs; ++i) {
1836 CheckRpcSendOk(stub, DEBUG_LOCATION);
1838 // Check LB policy name for the channel.
1839 EXPECT_EQ("intercept_trailing_metadata_lb",
1840 channel->GetLoadBalancingPolicyName());
1841 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1842 EXPECT_THAT(trailing_metadata(),
1843 ::testing::UnorderedElementsAre(
1844 // TODO(roth): Should grpc-status be visible here?
1845 ::testing::Pair("grpc-status", "0"),
1846 ::testing::Pair("user-agent", ::testing::_),
1847 ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
1848 ::testing::Pair("baz", "3")));
1849 EXPECT_EQ(nullptr, backend_load_report());
1852 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
1853 const int kNumServers = 1;
1854 const int kNumRpcs = 10;
1855 StartServers(kNumServers);
1856 udpa::data::orca::v1::OrcaLoadReport load_report;
1857 load_report.set_cpu_utilization(0.5);
1858 load_report.set_mem_utilization(0.75);
1859 load_report.set_rps(25);
1860 auto* request_cost = load_report.mutable_request_cost();
1861 (*request_cost)["foo"] = 0.8;
1862 (*request_cost)["bar"] = 1.4;
1863 auto* utilization = load_report.mutable_utilization();
1864 (*utilization)["baz"] = 1.1;
1865 (*utilization)["quux"] = 0.9;
1866 for (const auto& server : servers_) {
1867 server->service_.set_load_report(&load_report);
1869 auto response_generator = BuildResolverResponseGenerator();
1871 BuildChannel("intercept_trailing_metadata_lb", response_generator);
1872 auto stub = BuildStub(channel);
1873 response_generator.SetNextResolution(GetServersPorts());
1874 for (size_t i = 0; i < kNumRpcs; ++i) {
1875 CheckRpcSendOk(stub, DEBUG_LOCATION);
1876 auto* actual = backend_load_report();
1877 ASSERT_NE(actual, nullptr);
1878 // TODO(roth): Change this to use EqualsProto() once that becomes
1879 // available in OSS.
1880 EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
1881 EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
1882 EXPECT_EQ(actual->rps(), load_report.rps());
1883 EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
1884 for (const auto& p : actual->request_cost()) {
1885 auto it = load_report.request_cost().find(p.first);
1886 ASSERT_NE(it, load_report.request_cost().end());
1887 EXPECT_EQ(it->second, p.second);
1889 EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
1890 for (const auto& p : actual->utilization()) {
1891 auto it = load_report.utilization().find(p.first);
1892 ASSERT_NE(it, load_report.utilization().end());
1893 EXPECT_EQ(it->second, p.second);
1896 // Check LB policy name for the channel.
1897 EXPECT_EQ("intercept_trailing_metadata_lb",
1898 channel->GetLoadBalancingPolicyName());
1899 EXPECT_EQ(kNumRpcs, trailers_intercepted());
1902 class ClientLbAddressTest : public ClientLbEnd2endTest {
1904 static const char* kAttributeKey;
1906 class Attribute : public grpc_core::ServerAddress::AttributeInterface {
1908 explicit Attribute(const std::string& str) : str_(str) {}
1910 std::unique_ptr<AttributeInterface> Copy() const override {
1911 return absl::make_unique<Attribute>(str_);
1914 int Cmp(const AttributeInterface* other) const override {
1915 return str_.compare(static_cast<const Attribute*>(other)->str_);
1918 std::string ToString() const override { return str_; }
1924 void SetUp() override {
1925 ClientLbEnd2endTest::SetUp();
1926 current_test_instance_ = this;
1929 static void SetUpTestCase() {
1931 grpc_core::RegisterAddressTestLoadBalancingPolicy(SaveAddress);
1934 static void TearDownTestCase() { grpc_shutdown(); }
1936 const std::vector<std::string>& addresses_seen() {
1937 grpc::internal::MutexLock lock(&mu_);
1938 return addresses_seen_;
1942 static void SaveAddress(const grpc_core::ServerAddress& address) {
1943 ClientLbAddressTest* self = current_test_instance_;
1944 grpc::internal::MutexLock lock(&self->mu_);
1945 self->addresses_seen_.emplace_back(address.ToString());
1948 static ClientLbAddressTest* current_test_instance_;
1949 grpc::internal::Mutex mu_;
1950 std::vector<std::string> addresses_seen_;
1953 const char* ClientLbAddressTest::kAttributeKey = "attribute_key";
1955 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
1957 TEST_F(ClientLbAddressTest, Basic) {
1958 const int kNumServers = 1;
1959 StartServers(kNumServers);
1960 auto response_generator = BuildResolverResponseGenerator();
1961 auto channel = BuildChannel("address_test_lb", response_generator);
1962 auto stub = BuildStub(channel);
1963 // Addresses returned by the resolver will have attached attributes.
1964 response_generator.SetNextResolution(GetServersPorts(), nullptr,
1966 absl::make_unique<Attribute>("foo"));
1967 CheckRpcSendOk(stub, DEBUG_LOCATION);
1968 // Check LB policy name for the channel.
1969 EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
1970 // Make sure that the attributes wind up on the subchannels.
1971 std::vector<std::string> expected;
1972 for (const int port : GetServersPorts()) {
1973 expected.emplace_back(absl::StrCat(
1974 "127.0.0.1:", port, " args={} attributes={", kAttributeKey, "=foo}"));
1976 EXPECT_EQ(addresses_seen(), expected);
1980 } // namespace testing
1983 int main(int argc, char** argv) {
1984 ::testing::InitGoogleTest(&argc, argv);
1985 grpc::testing::TestEnvironment env(argc, argv);
1986 const auto result = RUN_ALL_TESTS();