3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include <gmock/gmock.h>
28 #include <gtest/gtest.h>
30 #include "absl/memory/memory.h"
31 #include "absl/strings/str_cat.h"
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/atm.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/time.h>
38 #include <grpcpp/channel.h>
39 #include <grpcpp/client_context.h>
40 #include <grpcpp/create_channel.h>
41 #include <grpcpp/health_check_service_interface.h>
42 #include <grpcpp/impl/codegen/sync.h>
43 #include <grpcpp/server.h>
44 #include <grpcpp/server_builder.h>
45 #include <grpcpp/support/validate_service_config.h>
47 #include "src/core/ext/filters/client_channel/backup_poller.h"
48 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
49 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
50 #include "src/core/ext/filters/client_channel/server_address.h"
51 #include "src/core/lib/address_utils/parse_address.h"
52 #include "src/core/lib/backoff/backoff.h"
53 #include "src/core/lib/channel/channel_args.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/iomgr/tcp_client.h"
57 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
58 #include "src/cpp/client/secure_credentials.h"
59 #include "src/cpp/server/secure_server_credentials.h"
60 #include "src/proto/grpc/testing/echo.grpc.pb.h"
61 #include "test/core/util/port.h"
62 #include "test/core/util/resolve_localhost_ip46.h"
63 #include "test/core/util/test_config.h"
64 #include "test/cpp/end2end/test_service_impl.h"
66 using grpc::testing::EchoRequest;
67 using grpc::testing::EchoResponse;
73 // Subclass of TestServiceImpl that increments a request counter for
74 // every call to the Echo RPC.
75 class MyTestServiceImpl : public TestServiceImpl {
77 MyTestServiceImpl() : request_count_(0) {}
79 Status Echo(ServerContext* context, const EchoRequest* request,
80 EchoResponse* response) override {
82 grpc::internal::MutexLock lock(&mu_);
85 AddClient(context->peer());
86 return TestServiceImpl::Echo(context, request, response);
90 grpc::internal::MutexLock lock(&mu_);
91 return request_count_;
94 void ResetCounters() {
95 grpc::internal::MutexLock lock(&mu_);
99 std::set<std::string> clients() {
100 grpc::internal::MutexLock lock(&clients_mu_);
105 void AddClient(const std::string& client) {
106 grpc::internal::MutexLock lock(&clients_mu_);
107 clients_.insert(client);
110 grpc::internal::Mutex mu_;
112 grpc::internal::Mutex clients_mu_;
113 std::set<std::string> clients_;
116 class ServiceConfigEnd2endTest : public ::testing::Test {
118 ServiceConfigEnd2endTest()
119 : server_host_("localhost"),
120 kRequestMessage_("Live long and prosper."),
121 creds_(new SecureChannelCredentials(
122 grpc_fake_transport_security_credentials_create())) {}
124 static void SetUpTestCase() {
125 // Make the backup poller poll very frequently in order to pick up
126 // updates from all the subchannels's FDs.
127 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
130 void SetUp() override {
132 response_generator_ =
133 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
134 bool localhost_resolves_to_ipv4 = false;
135 bool localhost_resolves_to_ipv6 = false;
136 grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
137 &localhost_resolves_to_ipv6);
138 ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
141 void TearDown() override {
142 for (size_t i = 0; i < servers_.size(); ++i) {
143 servers_[i]->Shutdown();
145 // Explicitly destroy all the members so that we can make sure grpc_shutdown
146 // has finished by the end of this function, and thus all the registered
147 // LB policy factories are removed.
154 void CreateServers(size_t num_servers,
155 std::vector<int> ports = std::vector<int>()) {
157 for (size_t i = 0; i < num_servers; ++i) {
159 if (ports.size() == num_servers) port = ports[i];
160 servers_.emplace_back(new ServerData(port));
164 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
166 void StartServers(size_t num_servers,
167 std::vector<int> ports = std::vector<int>()) {
168 CreateServers(num_servers, std::move(ports));
169 for (size_t i = 0; i < num_servers; ++i) {
174 grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
175 grpc_core::Resolver::Result result;
176 for (const int& port : ports) {
177 std::string lb_uri_str =
178 absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port);
179 absl::StatusOr<grpc_core::URI> lb_uri = grpc_core::URI::Parse(lb_uri_str);
180 GPR_ASSERT(lb_uri.ok());
181 grpc_resolved_address address;
182 GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
183 result.addresses.emplace_back(address.addr, address.len,
189 void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
190 grpc_core::ExecCtx exec_ctx;
191 grpc_core::Resolver::Result result = BuildFakeResults(ports);
192 response_generator_->SetResponse(result);
195 void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
196 grpc_core::ExecCtx exec_ctx;
197 grpc_core::Resolver::Result result = BuildFakeResults(ports);
198 result.service_config = grpc_core::ServiceConfig::Create(
199 nullptr, "{}", &result.service_config_error);
200 response_generator_->SetResponse(result);
203 void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
204 grpc_core::ExecCtx exec_ctx;
205 grpc_core::Resolver::Result result = BuildFakeResults(ports);
206 result.service_config = grpc_core::ServiceConfig::Create(
207 nullptr, "{", &result.service_config_error);
208 response_generator_->SetResponse(result);
211 void SetNextResolutionWithServiceConfig(const std::vector<int>& ports,
212 const char* svc_cfg) {
213 grpc_core::ExecCtx exec_ctx;
214 grpc_core::Resolver::Result result = BuildFakeResults(ports);
215 result.service_config = grpc_core::ServiceConfig::Create(
216 nullptr, svc_cfg, &result.service_config_error);
217 response_generator_->SetResponse(result);
220 std::vector<int> GetServersPorts(size_t start_index = 0) {
221 std::vector<int> ports;
222 for (size_t i = start_index; i < servers_.size(); ++i) {
223 ports.push_back(servers_[i]->port_);
228 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
229 const std::shared_ptr<Channel>& channel) {
230 return grpc::testing::EchoTestService::NewStub(channel);
233 std::shared_ptr<Channel> BuildChannel() {
234 ChannelArguments args;
235 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
236 response_generator_.get());
237 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
240 std::shared_ptr<Channel> BuildChannelWithDefaultServiceConfig() {
241 ChannelArguments args;
242 EXPECT_THAT(grpc::experimental::ValidateServiceConfigJSON(
243 ValidDefaultServiceConfig()),
244 ::testing::StrEq(""));
245 args.SetServiceConfigJSON(ValidDefaultServiceConfig());
246 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
247 response_generator_.get());
248 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
251 std::shared_ptr<Channel> BuildChannelWithInvalidDefaultServiceConfig() {
252 ChannelArguments args;
253 EXPECT_THAT(grpc::experimental::ValidateServiceConfigJSON(
254 InvalidDefaultServiceConfig()),
255 ::testing::HasSubstr("JSON parse error"));
256 args.SetServiceConfigJSON(InvalidDefaultServiceConfig());
257 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
258 response_generator_.get());
259 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
263 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
264 EchoResponse* response = nullptr, int timeout_ms = 1000,
265 Status* result = nullptr, bool wait_for_ready = false) {
266 const bool local_response = (response == nullptr);
267 if (local_response) response = new EchoResponse;
269 request.set_message(kRequestMessage_);
270 ClientContext context;
271 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
272 if (wait_for_ready) context.set_wait_for_ready(true);
273 Status status = stub->Echo(&context, request, response);
274 if (result != nullptr) *result = status;
275 if (local_response) delete response;
280 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
281 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
282 EchoResponse response;
285 SendRpc(stub, &response, 2000, &status, wait_for_ready);
286 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
288 << "Error: " << status.error_message() << " "
289 << status.error_details();
290 ASSERT_EQ(response.message(), kRequestMessage_)
291 << "From " << location.file() << ":" << location.line();
292 if (!success) abort();
295 void CheckRpcSendFailure(
296 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
297 const bool success = SendRpc(stub);
298 EXPECT_FALSE(success);
303 std::unique_ptr<Server> server_;
304 MyTestServiceImpl service_;
305 std::unique_ptr<std::thread> thread_;
307 grpc::internal::Mutex mu_;
308 grpc::internal::CondVar cond_;
309 bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
310 bool started_ ABSL_GUARDED_BY(mu_) = false;
312 explicit ServerData(int port = 0)
313 : port_(port > 0 ? port : grpc_pick_unused_port_or_die()) {}
315 void Start(const std::string& server_host) {
316 gpr_log(GPR_INFO, "starting server on port %d", port_);
317 grpc::internal::MutexLock lock(&mu_);
319 thread_ = absl::make_unique<std::thread>(
320 std::bind(&ServerData::Serve, this, server_host));
321 while (!server_ready_) {
324 server_ready_ = false;
325 gpr_log(GPR_INFO, "server startup complete");
328 void Serve(const std::string& server_host) {
329 std::ostringstream server_address;
330 server_address << server_host << ":" << port_;
331 ServerBuilder builder;
332 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
333 grpc_fake_transport_security_server_credentials_create()));
334 builder.AddListeningPort(server_address.str(), std::move(creds));
335 builder.RegisterService(&service_);
336 server_ = builder.BuildAndStart();
337 grpc::internal::MutexLock lock(&mu_);
338 server_ready_ = true;
343 grpc::internal::MutexLock lock(&mu_);
344 if (!started_) return;
345 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
350 void SetServingStatus(const std::string& service, bool serving) {
351 server_->GetHealthCheckService()->SetServingStatus(service, serving);
355 void ResetCounters() {
356 for (const auto& server : servers_) server->service_.ResetCounters();
360 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
361 size_t server_idx, const grpc_core::DebugLocation& location,
362 bool ignore_failure = false) {
364 if (ignore_failure) {
367 CheckRpcSendOk(stub, location, true);
369 } while (servers_[server_idx]->service_.request_count() == 0);
373 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
374 const gpr_timespec deadline =
375 grpc_timeout_seconds_to_deadline(timeout_seconds);
376 grpc_connectivity_state state;
377 while ((state = channel->GetState(false /* try_to_connect */)) ==
378 GRPC_CHANNEL_READY) {
379 if (!channel->WaitForStateChange(state, deadline)) return false;
384 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
385 const gpr_timespec deadline =
386 grpc_timeout_seconds_to_deadline(timeout_seconds);
387 grpc_connectivity_state state;
388 while ((state = channel->GetState(true /* try_to_connect */)) !=
389 GRPC_CHANNEL_READY) {
390 if (!channel->WaitForStateChange(state, deadline)) return false;
395 bool SeenAllServers() {
396 for (const auto& server : servers_) {
397 if (server->service_.request_count() == 0) return false;
402 // Updates \a connection_order by appending to it the index of the newly
403 // connected server. Must be called after every single RPC.
404 void UpdateConnectionOrder(
405 const std::vector<std::unique_ptr<ServerData>>& servers,
406 std::vector<int>* connection_order) {
407 for (size_t i = 0; i < servers.size(); ++i) {
408 if (servers[i]->service_.request_count() == 1) {
409 // Was the server index known? If not, update connection_order.
411 std::find(connection_order->begin(), connection_order->end(), i);
412 if (it == connection_order->end()) {
413 connection_order->push_back(i);
420 const char* ValidServiceConfigV1() { return "{\"version\": \"1\"}"; }
422 const char* ValidServiceConfigV2() { return "{\"version\": \"2\"}"; }
424 const char* ValidDefaultServiceConfig() {
425 return "{\"version\": \"valid_default\"}";
428 const char* InvalidDefaultServiceConfig() {
429 return "{\"version\": \"invalid_default\"";
432 bool ipv6_only_ = false;
433 const std::string server_host_;
434 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
435 std::vector<std::unique_ptr<ServerData>> servers_;
436 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
438 const std::string kRequestMessage_;
439 std::shared_ptr<ChannelCredentials> creds_;
442 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigTest) {
444 auto channel = BuildChannel();
445 auto stub = BuildStub(channel);
446 SetNextResolutionNoServiceConfig(GetServersPorts());
447 CheckRpcSendOk(stub, DEBUG_LOCATION);
448 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
451 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigWithDefaultConfigTest) {
453 auto channel = BuildChannelWithDefaultServiceConfig();
454 auto stub = BuildStub(channel);
455 SetNextResolutionNoServiceConfig(GetServersPorts());
456 CheckRpcSendOk(stub, DEBUG_LOCATION);
457 EXPECT_STREQ(ValidDefaultServiceConfig(),
458 channel->GetServiceConfigJSON().c_str());
461 TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigTest) {
463 auto channel = BuildChannel();
464 auto stub = BuildStub(channel);
465 SetNextResolutionInvalidServiceConfig(GetServersPorts());
466 CheckRpcSendFailure(stub);
469 TEST_F(ServiceConfigEnd2endTest, ValidServiceConfigUpdatesTest) {
471 auto channel = BuildChannel();
472 auto stub = BuildStub(channel);
473 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
474 CheckRpcSendOk(stub, DEBUG_LOCATION);
475 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
476 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV2());
477 CheckRpcSendOk(stub, DEBUG_LOCATION);
478 EXPECT_STREQ(ValidServiceConfigV2(), channel->GetServiceConfigJSON().c_str());
481 TEST_F(ServiceConfigEnd2endTest,
482 NoServiceConfigUpdateAfterValidServiceConfigTest) {
484 auto channel = BuildChannel();
485 auto stub = BuildStub(channel);
486 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
487 CheckRpcSendOk(stub, DEBUG_LOCATION);
488 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
489 SetNextResolutionNoServiceConfig(GetServersPorts());
490 CheckRpcSendOk(stub, DEBUG_LOCATION);
491 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
494 TEST_F(ServiceConfigEnd2endTest,
495 NoServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
497 auto channel = BuildChannelWithDefaultServiceConfig();
498 auto stub = BuildStub(channel);
499 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
500 CheckRpcSendOk(stub, DEBUG_LOCATION);
501 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
502 SetNextResolutionNoServiceConfig(GetServersPorts());
503 CheckRpcSendOk(stub, DEBUG_LOCATION);
504 EXPECT_STREQ(ValidDefaultServiceConfig(),
505 channel->GetServiceConfigJSON().c_str());
508 TEST_F(ServiceConfigEnd2endTest,
509 InvalidServiceConfigUpdateAfterValidServiceConfigTest) {
511 auto channel = BuildChannel();
512 auto stub = BuildStub(channel);
513 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
514 CheckRpcSendOk(stub, DEBUG_LOCATION);
515 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
516 SetNextResolutionInvalidServiceConfig(GetServersPorts());
517 CheckRpcSendOk(stub, DEBUG_LOCATION);
518 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
521 TEST_F(ServiceConfigEnd2endTest,
522 InvalidServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
524 auto channel = BuildChannelWithDefaultServiceConfig();
525 auto stub = BuildStub(channel);
526 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
527 CheckRpcSendOk(stub, DEBUG_LOCATION);
528 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
529 SetNextResolutionInvalidServiceConfig(GetServersPorts());
530 CheckRpcSendOk(stub, DEBUG_LOCATION);
531 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
534 TEST_F(ServiceConfigEnd2endTest,
535 ValidServiceConfigAfterInvalidServiceConfigTest) {
537 auto channel = BuildChannel();
538 auto stub = BuildStub(channel);
539 SetNextResolutionInvalidServiceConfig(GetServersPorts());
540 CheckRpcSendFailure(stub);
541 SetNextResolutionValidServiceConfig(GetServersPorts());
542 CheckRpcSendOk(stub, DEBUG_LOCATION);
545 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigAfterInvalidServiceConfigTest) {
547 auto channel = BuildChannel();
548 auto stub = BuildStub(channel);
549 SetNextResolutionInvalidServiceConfig(GetServersPorts());
550 CheckRpcSendFailure(stub);
551 SetNextResolutionNoServiceConfig(GetServersPorts());
552 CheckRpcSendOk(stub, DEBUG_LOCATION);
553 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
556 TEST_F(ServiceConfigEnd2endTest,
557 AnotherInvalidServiceConfigAfterInvalidServiceConfigTest) {
559 auto channel = BuildChannel();
560 auto stub = BuildStub(channel);
561 SetNextResolutionInvalidServiceConfig(GetServersPorts());
562 CheckRpcSendFailure(stub);
563 SetNextResolutionInvalidServiceConfig(GetServersPorts());
564 CheckRpcSendFailure(stub);
567 TEST_F(ServiceConfigEnd2endTest, InvalidDefaultServiceConfigTest) {
569 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
570 auto stub = BuildStub(channel);
571 // An invalid default service config results in a lame channel which fails all
573 CheckRpcSendFailure(stub);
576 TEST_F(ServiceConfigEnd2endTest,
577 InvalidDefaultServiceConfigTestWithValidServiceConfig) {
579 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
580 auto stub = BuildStub(channel);
581 CheckRpcSendFailure(stub);
582 // An invalid default service config results in a lame channel which fails all
584 SetNextResolutionValidServiceConfig(GetServersPorts());
585 CheckRpcSendFailure(stub);
588 TEST_F(ServiceConfigEnd2endTest,
589 InvalidDefaultServiceConfigTestWithInvalidServiceConfig) {
591 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
592 auto stub = BuildStub(channel);
593 CheckRpcSendFailure(stub);
594 // An invalid default service config results in a lame channel which fails all
596 SetNextResolutionInvalidServiceConfig(GetServersPorts());
597 CheckRpcSendFailure(stub);
600 TEST_F(ServiceConfigEnd2endTest,
601 InvalidDefaultServiceConfigTestWithNoServiceConfig) {
603 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
604 auto stub = BuildStub(channel);
605 CheckRpcSendFailure(stub);
606 // An invalid default service config results in a lame channel which fails all
608 SetNextResolutionNoServiceConfig(GetServersPorts());
609 CheckRpcSendFailure(stub);
613 } // namespace testing
616 int main(int argc, char** argv) {
617 ::testing::InitGoogleTest(&argc, argv);
618 grpc::testing::TestEnvironment env(argc, argv);
619 const auto result = RUN_ALL_TESTS();