Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / test / cpp / end2end / service_config_end2end_test.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <algorithm>
20 #include <memory>
21 #include <mutex>
22 #include <random>
23 #include <set>
24 #include <thread>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34 #include <grpcpp/create_channel.h>
35 #include <grpcpp/health_check_service_interface.h>
36 #include <grpcpp/impl/codegen/sync.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
39
40 #include "src/core/ext/filters/client_channel/backup_poller.h"
41 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
42 #include "src/core/ext/filters/client_channel/parse_address.h"
43 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
44 #include "src/core/ext/filters/client_channel/server_address.h"
45 #include "src/core/lib/backoff/backoff.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/iomgr/tcp_client.h"
50 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
51 #include "src/cpp/client/secure_credentials.h"
52 #include "src/cpp/server/secure_server_credentials.h"
53
54 #include "src/proto/grpc/testing/echo.grpc.pb.h"
55 #include "test/core/util/port.h"
56 #include "test/core/util/test_config.h"
57 #include "test/cpp/end2end/test_service_impl.h"
58
59 #include <gmock/gmock.h>
60 #include <gtest/gtest.h>
61
62 using grpc::testing::EchoRequest;
63 using grpc::testing::EchoResponse;
64 using std::chrono::system_clock;
65
66 namespace grpc {
67 namespace testing {
68 namespace {
69
70 // Subclass of TestServiceImpl that increments a request counter for
71 // every call to the Echo RPC.
72 class MyTestServiceImpl : public TestServiceImpl {
73  public:
74   MyTestServiceImpl() : request_count_(0) {}
75
76   Status Echo(ServerContext* context, const EchoRequest* request,
77               EchoResponse* response) override {
78     {
79       grpc::internal::MutexLock lock(&mu_);
80       ++request_count_;
81     }
82     AddClient(context->peer());
83     return TestServiceImpl::Echo(context, request, response);
84   }
85
86   int request_count() {
87     grpc::internal::MutexLock lock(&mu_);
88     return request_count_;
89   }
90
91   void ResetCounters() {
92     grpc::internal::MutexLock lock(&mu_);
93     request_count_ = 0;
94   }
95
96   std::set<grpc::string> clients() {
97     grpc::internal::MutexLock lock(&clients_mu_);
98     return clients_;
99   }
100
101  private:
102   void AddClient(const grpc::string& client) {
103     grpc::internal::MutexLock lock(&clients_mu_);
104     clients_.insert(client);
105   }
106
107   grpc::internal::Mutex mu_;
108   int request_count_;
109   grpc::internal::Mutex clients_mu_;
110   std::set<grpc::string> clients_;
111 };
112
113 class ServiceConfigEnd2endTest : public ::testing::Test {
114  protected:
115   ServiceConfigEnd2endTest()
116       : server_host_("localhost"),
117         kRequestMessage_("Live long and prosper."),
118         creds_(new SecureChannelCredentials(
119             grpc_fake_transport_security_credentials_create())) {
120     // Make the backup poller poll very frequently in order to pick up
121     // updates from all the subchannels's FDs.
122     GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
123   }
124
125   void SetUp() override {
126     grpc_init();
127     response_generator_ =
128         grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
129   }
130
131   void TearDown() override {
132     for (size_t i = 0; i < servers_.size(); ++i) {
133       servers_[i]->Shutdown();
134     }
135     // Explicitly destroy all the members so that we can make sure grpc_shutdown
136     // has finished by the end of this function, and thus all the registered
137     // LB policy factories are removed.
138     stub_.reset();
139     servers_.clear();
140     creds_.reset();
141     grpc_shutdown_blocking();
142   }
143
144   void CreateServers(size_t num_servers,
145                      std::vector<int> ports = std::vector<int>()) {
146     servers_.clear();
147     for (size_t i = 0; i < num_servers; ++i) {
148       int port = 0;
149       if (ports.size() == num_servers) port = ports[i];
150       servers_.emplace_back(new ServerData(port));
151     }
152   }
153
154   void StartServer(size_t index) { servers_[index]->Start(server_host_); }
155
156   void StartServers(size_t num_servers,
157                     std::vector<int> ports = std::vector<int>()) {
158     CreateServers(num_servers, std::move(ports));
159     for (size_t i = 0; i < num_servers; ++i) {
160       StartServer(i);
161     }
162   }
163
164   grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
165     grpc_core::Resolver::Result result;
166     for (const int& port : ports) {
167       char* lb_uri_str;
168       gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
169       grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
170       GPR_ASSERT(lb_uri != nullptr);
171       grpc_resolved_address address;
172       GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
173       result.addresses.emplace_back(address.addr, address.len,
174                                     nullptr /* args */);
175       grpc_uri_destroy(lb_uri);
176       gpr_free(lb_uri_str);
177     }
178     return result;
179   }
180
181   void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
182     grpc_core::ExecCtx exec_ctx;
183     grpc_core::Resolver::Result result = BuildFakeResults(ports);
184     response_generator_->SetResponse(result);
185   }
186
187   void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
188     grpc_core::ExecCtx exec_ctx;
189     grpc_core::Resolver::Result result = BuildFakeResults(ports);
190     result.service_config =
191         grpc_core::ServiceConfig::Create("{}", &result.service_config_error);
192     response_generator_->SetResponse(result);
193   }
194
195   void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
196     grpc_core::ExecCtx exec_ctx;
197     grpc_core::Resolver::Result result = BuildFakeResults(ports);
198     result.service_config =
199         grpc_core::ServiceConfig::Create("{", &result.service_config_error);
200     response_generator_->SetResponse(result);
201   }
202
203   void SetNextResolutionWithServiceConfig(const std::vector<int>& ports,
204                                           const char* svc_cfg) {
205     grpc_core::ExecCtx exec_ctx;
206     grpc_core::Resolver::Result result = BuildFakeResults(ports);
207     result.service_config =
208         grpc_core::ServiceConfig::Create(svc_cfg, &result.service_config_error);
209     response_generator_->SetResponse(result);
210   }
211
212   std::vector<int> GetServersPorts(size_t start_index = 0) {
213     std::vector<int> ports;
214     for (size_t i = start_index; i < servers_.size(); ++i) {
215       ports.push_back(servers_[i]->port_);
216     }
217     return ports;
218   }
219
220   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
221       const std::shared_ptr<Channel>& channel) {
222     return grpc::testing::EchoTestService::NewStub(channel);
223   }
224
225   std::shared_ptr<Channel> BuildChannel() {
226     ChannelArguments args;
227     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
228                     response_generator_.get());
229     return ::grpc::CreateCustomChannel("fake:///", creds_, args);
230   }
231
232   std::shared_ptr<Channel> BuildChannelWithDefaultServiceConfig() {
233     ChannelArguments args;
234     args.SetServiceConfigJSON(ValidDefaultServiceConfig());
235     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
236                     response_generator_.get());
237     return ::grpc::CreateCustomChannel("fake:///", creds_, args);
238   }
239
240   bool SendRpc(
241       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
242       EchoResponse* response = nullptr, int timeout_ms = 1000,
243       Status* result = nullptr, bool wait_for_ready = false) {
244     const bool local_response = (response == nullptr);
245     if (local_response) response = new EchoResponse;
246     EchoRequest request;
247     request.set_message(kRequestMessage_);
248     ClientContext context;
249     context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
250     if (wait_for_ready) context.set_wait_for_ready(true);
251     Status status = stub->Echo(&context, request, response);
252     if (result != nullptr) *result = status;
253     if (local_response) delete response;
254     return status.ok();
255   }
256
257   void CheckRpcSendOk(
258       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
259       const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
260     EchoResponse response;
261     Status status;
262     const bool success =
263         SendRpc(stub, &response, 2000, &status, wait_for_ready);
264     ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
265                          << "\n"
266                          << "Error: " << status.error_message() << " "
267                          << status.error_details();
268     ASSERT_EQ(response.message(), kRequestMessage_)
269         << "From " << location.file() << ":" << location.line();
270     if (!success) abort();
271   }
272
273   void CheckRpcSendFailure(
274       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
275     const bool success = SendRpc(stub);
276     EXPECT_FALSE(success);
277   }
278
279   struct ServerData {
280     int port_;
281     std::unique_ptr<Server> server_;
282     MyTestServiceImpl service_;
283     std::unique_ptr<std::thread> thread_;
284     bool server_ready_ = false;
285     bool started_ = false;
286
287     explicit ServerData(int port = 0) {
288       port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
289     }
290
291     void Start(const grpc::string& server_host) {
292       gpr_log(GPR_INFO, "starting server on port %d", port_);
293       started_ = true;
294       grpc::internal::Mutex mu;
295       grpc::internal::MutexLock lock(&mu);
296       grpc::internal::CondVar cond;
297       thread_.reset(new std::thread(
298           std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
299       cond.WaitUntil(&mu, [this] { return server_ready_; });
300       server_ready_ = false;
301       gpr_log(GPR_INFO, "server startup complete");
302     }
303
304     void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
305                grpc::internal::CondVar* cond) {
306       std::ostringstream server_address;
307       server_address << server_host << ":" << port_;
308       ServerBuilder builder;
309       std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
310           grpc_fake_transport_security_server_credentials_create()));
311       builder.AddListeningPort(server_address.str(), std::move(creds));
312       builder.RegisterService(&service_);
313       server_ = builder.BuildAndStart();
314       grpc::internal::MutexLock lock(mu);
315       server_ready_ = true;
316       cond->Signal();
317     }
318
319     void Shutdown() {
320       if (!started_) return;
321       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
322       thread_->join();
323       started_ = false;
324     }
325
326     void SetServingStatus(const grpc::string& service, bool serving) {
327       server_->GetHealthCheckService()->SetServingStatus(service, serving);
328     }
329   };
330
331   void ResetCounters() {
332     for (const auto& server : servers_) server->service_.ResetCounters();
333   }
334
335   void WaitForServer(
336       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
337       size_t server_idx, const grpc_core::DebugLocation& location,
338       bool ignore_failure = false) {
339     do {
340       if (ignore_failure) {
341         SendRpc(stub);
342       } else {
343         CheckRpcSendOk(stub, location, true);
344       }
345     } while (servers_[server_idx]->service_.request_count() == 0);
346     ResetCounters();
347   }
348
349   bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
350     const gpr_timespec deadline =
351         grpc_timeout_seconds_to_deadline(timeout_seconds);
352     grpc_connectivity_state state;
353     while ((state = channel->GetState(false /* try_to_connect */)) ==
354            GRPC_CHANNEL_READY) {
355       if (!channel->WaitForStateChange(state, deadline)) return false;
356     }
357     return true;
358   }
359
360   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
361     const gpr_timespec deadline =
362         grpc_timeout_seconds_to_deadline(timeout_seconds);
363     grpc_connectivity_state state;
364     while ((state = channel->GetState(true /* try_to_connect */)) !=
365            GRPC_CHANNEL_READY) {
366       if (!channel->WaitForStateChange(state, deadline)) return false;
367     }
368     return true;
369   }
370
371   bool SeenAllServers() {
372     for (const auto& server : servers_) {
373       if (server->service_.request_count() == 0) return false;
374     }
375     return true;
376   }
377
378   // Updates \a connection_order by appending to it the index of the newly
379   // connected server. Must be called after every single RPC.
380   void UpdateConnectionOrder(
381       const std::vector<std::unique_ptr<ServerData>>& servers,
382       std::vector<int>* connection_order) {
383     for (size_t i = 0; i < servers.size(); ++i) {
384       if (servers[i]->service_.request_count() == 1) {
385         // Was the server index known? If not, update connection_order.
386         const auto it =
387             std::find(connection_order->begin(), connection_order->end(), i);
388         if (it == connection_order->end()) {
389           connection_order->push_back(i);
390           return;
391         }
392       }
393     }
394   }
395
396   const char* ValidServiceConfigV1() { return "{\"version\": \"1\"}"; }
397
398   const char* ValidServiceConfigV2() { return "{\"version\": \"2\"}"; }
399
400   const char* ValidDefaultServiceConfig() {
401     return "{\"version\": \"valid_default\"}";
402   }
403
404   const char* InvalidDefaultServiceConfig() {
405     return "{\"version\": \"invalid_default\"}";
406   }
407
408   const grpc::string server_host_;
409   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
410   std::vector<std::unique_ptr<ServerData>> servers_;
411   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
412       response_generator_;
413   const grpc::string kRequestMessage_;
414   std::shared_ptr<ChannelCredentials> creds_;
415 };
416
417 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigTest) {
418   StartServers(1);
419   auto channel = BuildChannel();
420   auto stub = BuildStub(channel);
421   SetNextResolutionNoServiceConfig(GetServersPorts());
422   CheckRpcSendOk(stub, DEBUG_LOCATION);
423   EXPECT_STREQ("", channel->GetServiceConfigJSON().c_str());
424 }
425
426 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigWithDefaultConfigTest) {
427   StartServers(1);
428   auto channel = BuildChannelWithDefaultServiceConfig();
429   auto stub = BuildStub(channel);
430   SetNextResolutionNoServiceConfig(GetServersPorts());
431   CheckRpcSendOk(stub, DEBUG_LOCATION);
432   EXPECT_STREQ(ValidDefaultServiceConfig(),
433                channel->GetServiceConfigJSON().c_str());
434 }
435
436 TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigTest) {
437   StartServers(1);
438   auto channel = BuildChannel();
439   auto stub = BuildStub(channel);
440   SetNextResolutionInvalidServiceConfig(GetServersPorts());
441   CheckRpcSendFailure(stub);
442 }
443
444 TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigWithDefaultConfigTest) {
445   StartServers(1);
446   auto channel = BuildChannelWithDefaultServiceConfig();
447   auto stub = BuildStub(channel);
448   SetNextResolutionInvalidServiceConfig(GetServersPorts());
449   CheckRpcSendOk(stub, DEBUG_LOCATION);
450   EXPECT_STREQ(ValidDefaultServiceConfig(),
451                channel->GetServiceConfigJSON().c_str());
452 }
453
454 TEST_F(ServiceConfigEnd2endTest, ValidServiceConfigUpdatesTest) {
455   StartServers(1);
456   auto channel = BuildChannel();
457   auto stub = BuildStub(channel);
458   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
459   CheckRpcSendOk(stub, DEBUG_LOCATION);
460   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
461   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV2());
462   CheckRpcSendOk(stub, DEBUG_LOCATION);
463   EXPECT_STREQ(ValidServiceConfigV2(), channel->GetServiceConfigJSON().c_str());
464 }
465
466 TEST_F(ServiceConfigEnd2endTest,
467        NoServiceConfigUpdateAfterValidServiceConfigTest) {
468   StartServers(1);
469   auto channel = BuildChannel();
470   auto stub = BuildStub(channel);
471   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
472   CheckRpcSendOk(stub, DEBUG_LOCATION);
473   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
474   SetNextResolutionNoServiceConfig(GetServersPorts());
475   CheckRpcSendOk(stub, DEBUG_LOCATION);
476   EXPECT_STREQ("", channel->GetServiceConfigJSON().c_str());
477 }
478
479 TEST_F(ServiceConfigEnd2endTest,
480        NoServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
481   StartServers(1);
482   auto channel = BuildChannelWithDefaultServiceConfig();
483   auto stub = BuildStub(channel);
484   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
485   CheckRpcSendOk(stub, DEBUG_LOCATION);
486   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
487   SetNextResolutionNoServiceConfig(GetServersPorts());
488   CheckRpcSendOk(stub, DEBUG_LOCATION);
489   EXPECT_STREQ(ValidDefaultServiceConfig(),
490                channel->GetServiceConfigJSON().c_str());
491 }
492
493 TEST_F(ServiceConfigEnd2endTest,
494        InvalidServiceConfigUpdateAfterValidServiceConfigTest) {
495   StartServers(1);
496   auto channel = BuildChannel();
497   auto stub = BuildStub(channel);
498   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
499   CheckRpcSendOk(stub, DEBUG_LOCATION);
500   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
501   SetNextResolutionInvalidServiceConfig(GetServersPorts());
502   CheckRpcSendOk(stub, DEBUG_LOCATION);
503   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
504 }
505
506 TEST_F(ServiceConfigEnd2endTest,
507        InvalidServiceConfigUpdateAfterValidServiceConfigWithDefaultConfigTest) {
508   StartServers(1);
509   auto channel = BuildChannelWithDefaultServiceConfig();
510   auto stub = BuildStub(channel);
511   SetNextResolutionWithServiceConfig(GetServersPorts(), ValidServiceConfigV1());
512   CheckRpcSendOk(stub, DEBUG_LOCATION);
513   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
514   SetNextResolutionInvalidServiceConfig(GetServersPorts());
515   CheckRpcSendOk(stub, DEBUG_LOCATION);
516   EXPECT_STREQ(ValidServiceConfigV1(), channel->GetServiceConfigJSON().c_str());
517 }
518
519 TEST_F(ServiceConfigEnd2endTest,
520        ValidServiceConfigAfterInvalidServiceConfigTest) {
521   StartServers(1);
522   auto channel = BuildChannel();
523   auto stub = BuildStub(channel);
524   SetNextResolutionInvalidServiceConfig(GetServersPorts());
525   CheckRpcSendFailure(stub);
526   SetNextResolutionValidServiceConfig(GetServersPorts());
527   CheckRpcSendOk(stub, DEBUG_LOCATION);
528 }
529
530 TEST_F(ServiceConfigEnd2endTest, NoServiceConfigAfterInvalidServiceConfigTest) {
531   StartServers(1);
532   auto channel = BuildChannel();
533   auto stub = BuildStub(channel);
534   SetNextResolutionInvalidServiceConfig(GetServersPorts());
535   CheckRpcSendFailure(stub);
536   SetNextResolutionNoServiceConfig(GetServersPorts());
537   CheckRpcSendOk(stub, DEBUG_LOCATION);
538   EXPECT_STREQ("", channel->GetServiceConfigJSON().c_str());
539 }
540
541 TEST_F(ServiceConfigEnd2endTest,
542        AnotherInvalidServiceConfigAfterInvalidServiceConfigTest) {
543   StartServers(1);
544   auto channel = BuildChannel();
545   auto stub = BuildStub(channel);
546   SetNextResolutionInvalidServiceConfig(GetServersPorts());
547   CheckRpcSendFailure(stub);
548   SetNextResolutionInvalidServiceConfig(GetServersPorts());
549   CheckRpcSendFailure(stub);
550 }
551
552 }  // namespace
553 }  // namespace testing
554 }  // namespace grpc
555
556 int main(int argc, char** argv) {
557   ::testing::InitGoogleTest(&argc, argv);
558   grpc::testing::TestEnvironment env(argc, argv);
559   const auto result = RUN_ALL_TESTS();
560   return result;
561 }