3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "absl/strings/str_cat.h"
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/atm.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/time.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 #include <grpcpp/create_channel.h>
37 #include <grpcpp/health_check_service_interface.h>
38 #include <grpcpp/impl/codegen/sync.h>
39 #include <grpcpp/server.h>
40 #include <grpcpp/server_builder.h>
41 #include <grpcpp/support/validate_service_config.h>
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
45 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
46 #include "src/core/ext/filters/client_channel/server_address.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/gprpp/debug_location.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/iomgr/parse_address.h"
52 #include "src/core/lib/iomgr/tcp_client.h"
53 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
54 #include "src/cpp/client/secure_credentials.h"
55 #include "src/cpp/server/secure_server_credentials.h"
57 #include "src/proto/grpc/testing/echo.grpc.pb.h"
58 #include "test/core/util/port.h"
59 #include "test/core/util/test_config.h"
60 #include "test/cpp/end2end/test_service_impl.h"
62 #include <gmock/gmock.h>
63 #include <gtest/gtest.h>
65 using grpc::testing::EchoRequest;
66 using grpc::testing::EchoResponse;
67 using std::chrono::system_clock;
73 // Subclass of TestServiceImpl that increments a request counter for
74 // every call to the Echo RPC.
75 class MyTestServiceImpl : public TestServiceImpl {
77 MyTestServiceImpl() : request_count_(0) {}
79 Status Echo(ServerContext* context, const EchoRequest* request,
80 EchoResponse* response) override {
82 grpc::internal::MutexLock lock(&mu_);
85 AddClient(context->peer());
86 return TestServiceImpl::Echo(context, request, response);
90 grpc::internal::MutexLock lock(&mu_);
91 return request_count_;
94 void ResetCounters() {
95 grpc::internal::MutexLock lock(&mu_);
99 std::set<std::string> clients() {
100 grpc::internal::MutexLock lock(&clients_mu_);
105 void AddClient(const std::string& client) {
106 grpc::internal::MutexLock lock(&clients_mu_);
107 clients_.insert(client);
110 grpc::internal::Mutex mu_;
112 grpc::internal::Mutex clients_mu_;
113 std::set<std::string> clients_;
116 class ServiceConfigEnd2endTest : public ::testing::Test {
118 ServiceConfigEnd2endTest()
119 : server_host_("localhost"),
120 kRequestMessage_("Live long and prosper."),
121 creds_(new SecureChannelCredentials(
122 grpc_fake_transport_security_credentials_create())) {}
124 static void SetUpTestCase() {
125 // Make the backup poller poll very frequently in order to pick up
126 // updates from all the subchannels's FDs.
127 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
130 void SetUp() override {
132 response_generator_ =
133 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
136 void TearDown() override {
137 for (size_t i = 0; i < servers_.size(); ++i) {
138 servers_[i]->Shutdown();
140 // Explicitly destroy all the members so that we can make sure grpc_shutdown
141 // has finished by the end of this function, and thus all the registered
142 // LB policy factories are removed.
146 grpc_shutdown_blocking();
149 void CreateServers(size_t num_servers,
150 std::vector<int> ports = std::vector<int>()) {
152 for (size_t i = 0; i < num_servers; ++i) {
154 if (ports.size() == num_servers) port = ports[i];
155 servers_.emplace_back(new ServerData(port));
159 void StartServer(size_t index) { servers_[index]->Start(server_host_); }
161 void StartServers(size_t num_servers,
162 std::vector<int> ports = std::vector<int>()) {
163 CreateServers(num_servers, std::move(ports));
164 for (size_t i = 0; i < num_servers; ++i) {
169 grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
170 grpc_core::Resolver::Result result;
171 for (const int& port : ports) {
172 std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
173 grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str.c_str(), true);
174 GPR_ASSERT(lb_uri != nullptr);
175 grpc_resolved_address address;
176 GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
177 result.addresses.emplace_back(address.addr, address.len,
179 grpc_uri_destroy(lb_uri);
184 void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
185 grpc_core::ExecCtx exec_ctx;
186 grpc_core::Resolver::Result result = BuildFakeResults(ports);
187 response_generator_->SetResponse(result);
190 void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
191 grpc_core::ExecCtx exec_ctx;
192 grpc_core::Resolver::Result result = BuildFakeResults(ports);
193 result.service_config = grpc_core::ServiceConfig::Create(
194 nullptr, "{}", &result.service_config_error);
195 response_generator_->SetResponse(result);
198 void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
199 grpc_core::ExecCtx exec_ctx;
200 grpc_core::Resolver::Result result = BuildFakeResults(ports);
201 result.service_config = grpc_core::ServiceConfig::Create(
202 nullptr, "{", &result.service_config_error);
203 response_generator_->SetResponse(result);
206 void SetNextResolutionWithServiceConfig(const std::vector<int>& ports,
207 const char* svc_cfg) {
208 grpc_core::ExecCtx exec_ctx;
209 grpc_core::Resolver::Result result = BuildFakeResults(ports);
210 result.service_config = grpc_core::ServiceConfig::Create(
211 nullptr, svc_cfg, &result.service_config_error);
212 response_generator_->SetResponse(result);
215 std::vector<int> GetServersPorts(size_t start_index = 0) {
216 std::vector<int> ports;
217 for (size_t i = start_index; i < servers_.size(); ++i) {
218 ports.push_back(servers_[i]->port_);
223 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
224 const std::shared_ptr<Channel>& channel) {
225 return grpc::testing::EchoTestService::NewStub(channel);
228 std::shared_ptr<Channel> BuildChannel() {
229 ChannelArguments args;
230 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
231 response_generator_.get());
232 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
235 std::shared_ptr<Channel> BuildChannelWithDefaultServiceConfig() {
236 ChannelArguments args;
237 EXPECT_THAT(grpc::experimental::ValidateServiceConfigJSON(
238 ValidDefaultServiceConfig()),
239 ::testing::StrEq(""));
240 args.SetServiceConfigJSON(ValidDefaultServiceConfig());
241 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
242 response_generator_.get());
243 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
246 std::shared_ptr<Channel> BuildChannelWithInvalidDefaultServiceConfig() {
247 ChannelArguments args;
248 EXPECT_THAT(grpc::experimental::ValidateServiceConfigJSON(
249 InvalidDefaultServiceConfig()),
250 ::testing::HasSubstr("JSON parse error"));
251 args.SetServiceConfigJSON(InvalidDefaultServiceConfig());
252 args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
253 response_generator_.get());
254 return ::grpc::CreateCustomChannel("fake:///", creds_, args);
258 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
259 EchoResponse* response = nullptr, int timeout_ms = 1000,
260 Status* result = nullptr, bool wait_for_ready = false) {
261 const bool local_response = (response == nullptr);
262 if (local_response) response = new EchoResponse;
264 request.set_message(kRequestMessage_);
265 ClientContext context;
266 context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
267 if (wait_for_ready) context.set_wait_for_ready(true);
268 Status status = stub->Echo(&context, request, response);
269 if (result != nullptr) *result = status;
270 if (local_response) delete response;
275 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
276 const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
277 EchoResponse response;
280 SendRpc(stub, &response, 2000, &status, wait_for_ready);
281 ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
283 << "Error: " << status.error_message() << " "
284 << status.error_details();
285 ASSERT_EQ(response.message(), kRequestMessage_)
286 << "From " << location.file() << ":" << location.line();
287 if (!success) abort();
290 void CheckRpcSendFailure(
291 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
292 const bool success = SendRpc(stub);
293 EXPECT_FALSE(success);
298 std::unique_ptr<Server> server_;
299 MyTestServiceImpl service_;
300 std::unique_ptr<std::thread> thread_;
301 bool server_ready_ = false;
302 bool started_ = false;
304 explicit ServerData(int port = 0) {
305 port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
308 void Start(const std::string& server_host) {
309 gpr_log(GPR_INFO, "starting server on port %d", port_);
311 grpc::internal::Mutex mu;
312 grpc::internal::MutexLock lock(&mu);
313 grpc::internal::CondVar cond;
314 thread_.reset(new std::thread(
315 std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
316 cond.WaitUntil(&mu, [this] { return server_ready_; });
317 server_ready_ = false;
318 gpr_log(GPR_INFO, "server startup complete");
321 void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
322 grpc::internal::CondVar* cond) {
323 std::ostringstream server_address;
324 server_address << server_host << ":" << port_;
325 ServerBuilder builder;
326 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
327 grpc_fake_transport_security_server_credentials_create()));
328 builder.AddListeningPort(server_address.str(), std::move(creds));
329 builder.RegisterService(&service_);
330 server_ = builder.BuildAndStart();
331 grpc::internal::MutexLock lock(mu);
332 server_ready_ = true;
337 if (!started_) return;
338 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
343 void SetServingStatus(const std::string& service, bool serving) {
344 server_->GetHealthCheckService()->SetServingStatus(service, serving);
348 void ResetCounters() {
349 for (const auto& server : servers_) server->service_.ResetCounters();
353 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
354 size_t server_idx, const grpc_core::DebugLocation& location,
355 bool ignore_failure = false) {
357 if (ignore_failure) {
360 CheckRpcSendOk(stub, location, true);
362 } while (servers_[server_idx]->service_.request_count() == 0);
366 bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
367 const gpr_timespec deadline =
368 grpc_timeout_seconds_to_deadline(timeout_seconds);
369 grpc_connectivity_state state;
370 while ((state = channel->GetState(false /* try_to_connect */)) ==
371 GRPC_CHANNEL_READY) {
372 if (!channel->WaitForStateChange(state, deadline)) return false;
377 bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
378 const gpr_timespec deadline =
379 grpc_timeout_seconds_to_deadline(timeout_seconds);
380 grpc_connectivity_state state;
381 while ((state = channel->GetState(true /* try_to_connect */)) !=
382 GRPC_CHANNEL_READY) {
383 if (!channel->WaitForStateChange(state, deadline)) return false;
388 bool SeenAllServers() {
389 for (const auto& server : servers_) {
390 if (server->service_.request_count() == 0) return false;
395 // Updates \a connection_order by appending to it the index of the newly
396 // connected server. Must be called after every single RPC.
397 void UpdateConnectionOrder(
398 const std::vector<std::unique_ptr<ServerData>>& servers,
399 std::vector<int>* connection_order) {
400 for (size_t i = 0; i < servers.size(); ++i) {
401 if (servers[i]->service_.request_count() == 1) {
402 // Was the server index known? If not, update connection_order.
404 std::find(connection_order->begin(), connection_order->end(), i);
405 if (it == connection_order->end()) {
406 connection_order->push_back(i);
413 const char* ValidServiceConfigV1() { return "{\"version\": \"1\"}"; }
415 const char* ValidServiceConfigV2() { return "{\"version\": \"2\"}"; }
417 const char* ValidDefaultServiceConfig() {
418 return "{\"version\": \"valid_default\"}";
421 const char* InvalidDefaultServiceConfig() {
422 return "{\"version\": \"invalid_default\"";
425 const std::string server_host_;
426 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
427 std::vector<std::unique_ptr<ServerData>> servers_;
428 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
430 const std::string kRequestMessage_;
431 std::shared_ptr<ChannelCredentials> creds_;
434 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigTest) {
436 auto channel = BuildChannel();
437 auto stub = BuildStub(channel);
438 SetNextResolutionNoServiceConfig(GetServersPorts());
439 CheckRpcSendOk(stub, DEBUG_LOCATION);
440 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
443 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigWithDefaultConfigTest) {
445 auto channel = BuildChannelWithDefaultServiceConfig();
446 auto stub = BuildStub(channel);
447 SetNextResolutionNoServiceConfig(GetServersPorts());
448 CheckRpcSendOk(stub, DEBUG_LOCATION);
449 EXPECT_STREQ(ValidDefaultServiceConfig(),
450 channel->GetServiceConfigJSON().c_str());
453 TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigTest) {
455 auto channel = BuildChannel();
456 auto stub = BuildStub(channel);
457 SetNextResolutionInvalidServiceConfig(GetServersPorts());
458 CheckRpcSendFailure(stub);
461 TEST_F(ServiceConfigEnd2endTest, ValidServiceConfigUpdatesTest) {
463 auto channel = BuildChannel();
464 auto stub = BuildStub(channel);
465 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
466 CheckRpcSendOk(stub, DEBUG_LOCATION);
467 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
468 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV2());
469 CheckRpcSendOk(stub, DEBUG_LOCATION);
470 EXPECT_STREQ(ValidServiceConfigV2(), channel->GetServiceConfigJSON().c_str());
473 TEST_F(ServiceConfigEnd2endTest,
474 NoServiceConfigUpdateAfterValidServiceConfigTest) {
476 auto channel = BuildChannel();
477 auto stub = BuildStub(channel);
478 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
479 CheckRpcSendOk(stub, DEBUG_LOCATION);
480 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
481 SetNextResolutionNoServiceConfig(GetServersPorts());
482 CheckRpcSendOk(stub, DEBUG_LOCATION);
483 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
486 TEST_F(ServiceConfigEnd2endTest,
487 NoServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
489 auto channel = BuildChannelWithDefaultServiceConfig();
490 auto stub = BuildStub(channel);
491 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
492 CheckRpcSendOk(stub, DEBUG_LOCATION);
493 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
494 SetNextResolutionNoServiceConfig(GetServersPorts());
495 CheckRpcSendOk(stub, DEBUG_LOCATION);
496 EXPECT_STREQ(ValidDefaultServiceConfig(),
497 channel->GetServiceConfigJSON().c_str());
500 TEST_F(ServiceConfigEnd2endTest,
501 InvalidServiceConfigUpdateAfterValidServiceConfigTest) {
503 auto channel = BuildChannel();
504 auto stub = BuildStub(channel);
505 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
506 CheckRpcSendOk(stub, DEBUG_LOCATION);
507 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
508 SetNextResolutionInvalidServiceConfig(GetServersPorts());
509 CheckRpcSendOk(stub, DEBUG_LOCATION);
510 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
513 TEST_F(ServiceConfigEnd2endTest,
514 InvalidServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
516 auto channel = BuildChannelWithDefaultServiceConfig();
517 auto stub = BuildStub(channel);
518 SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
519 CheckRpcSendOk(stub, DEBUG_LOCATION);
520 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
521 SetNextResolutionInvalidServiceConfig(GetServersPorts());
522 CheckRpcSendOk(stub, DEBUG_LOCATION);
523 EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
526 TEST_F(ServiceConfigEnd2endTest,
527 ValidServiceConfigAfterInvalidServiceConfigTest) {
529 auto channel = BuildChannel();
530 auto stub = BuildStub(channel);
531 SetNextResolutionInvalidServiceConfig(GetServersPorts());
532 CheckRpcSendFailure(stub);
533 SetNextResolutionValidServiceConfig(GetServersPorts());
534 CheckRpcSendOk(stub, DEBUG_LOCATION);
537 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigAfterInvalidServiceConfigTest) {
539 auto channel = BuildChannel();
540 auto stub = BuildStub(channel);
541 SetNextResolutionInvalidServiceConfig(GetServersPorts());
542 CheckRpcSendFailure(stub);
543 SetNextResolutionNoServiceConfig(GetServersPorts());
544 CheckRpcSendOk(stub, DEBUG_LOCATION);
545 EXPECT_STREQ("{}", channel->GetServiceConfigJSON().c_str());
548 TEST_F(ServiceConfigEnd2endTest,
549 AnotherInvalidServiceConfigAfterInvalidServiceConfigTest) {
551 auto channel = BuildChannel();
552 auto stub = BuildStub(channel);
553 SetNextResolutionInvalidServiceConfig(GetServersPorts());
554 CheckRpcSendFailure(stub);
555 SetNextResolutionInvalidServiceConfig(GetServersPorts());
556 CheckRpcSendFailure(stub);
559 TEST_F(ServiceConfigEnd2endTest, InvalidDefaultServiceConfigTest) {
561 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
562 auto stub = BuildStub(channel);
563 // An invalid default service config results in a lame channel which fails all
565 CheckRpcSendFailure(stub);
568 TEST_F(ServiceConfigEnd2endTest,
569 InvalidDefaultServiceConfigTestWithValidServiceConfig) {
571 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
572 auto stub = BuildStub(channel);
573 CheckRpcSendFailure(stub);
574 // An invalid default service config results in a lame channel which fails all
576 SetNextResolutionValidServiceConfig(GetServersPorts());
577 CheckRpcSendFailure(stub);
580 TEST_F(ServiceConfigEnd2endTest,
581 InvalidDefaultServiceConfigTestWithInvalidServiceConfig) {
583 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
584 auto stub = BuildStub(channel);
585 CheckRpcSendFailure(stub);
586 // An invalid default service config results in a lame channel which fails all
588 SetNextResolutionInvalidServiceConfig(GetServersPorts());
589 CheckRpcSendFailure(stub);
592 TEST_F(ServiceConfigEnd2endTest,
593 InvalidDefaultServiceConfigTestWithNoServiceConfig) {
595 auto channel = BuildChannelWithInvalidDefaultServiceConfig();
596 auto stub = BuildStub(channel);
597 CheckRpcSendFailure(stub);
598 // An invalid default service config results in a lame channel which fails all
600 SetNextResolutionNoServiceConfig(GetServersPorts());
601 CheckRpcSendFailure(stub);
605 } // namespace testing
608 int main(int argc, char** argv) {
609 ::testing::InitGoogleTest(&argc, argv);
610 grpc::testing::TestEnvironment env(argc, argv);
611 const auto result = RUN_ALL_TESTS();