9d04020ad29b5007d4d9f29e4e11a912f5df6815
[platform/upstream/grpc.git] / test / cpp / end2end / cfstream_test.cc
1 /*
2  *
3  * Copyright 2019 The gRPC Authors
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 #include <algorithm>
22 #include <memory>
23 #include <mutex>
24 #include <random>
25 #include <thread>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/atm.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/time.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 #include <grpcpp/health_check_service_interface.h>
37 #include <grpcpp/server.h>
38 #include <grpcpp/server_builder.h>
39 #include <gtest/gtest.h>
40
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/gpr/env.h"
43
44 #include "src/proto/grpc/testing/echo.grpc.pb.h"
45 #include "test/core/util/port.h"
46 #include "test/core/util/test_config.h"
47 #include "test/cpp/end2end/test_service_impl.h"
48 #include "test/cpp/util/test_credentials_provider.h"
49
50 #ifdef GRPC_CFSTREAM
51 using grpc::ClientAsyncResponseReader;
52 using grpc::testing::EchoRequest;
53 using grpc::testing::EchoResponse;
54 using grpc::testing::RequestParams;
55 using std::chrono::system_clock;
56
57 namespace grpc {
58 namespace testing {
59 namespace {
60
61 struct TestScenario {
62   TestScenario(const std::string& creds_type, const std::string& content)
63       : credentials_type(creds_type), message_content(content) {}
64   const std::string credentials_type;
65   const std::string message_content;
66 };
67
68 class CFStreamTest : public ::testing::TestWithParam<TestScenario> {
69  protected:
70   CFStreamTest()
71       : server_host_("grpctest"),
72         interface_("lo0"),
73         ipv4_address_("10.0.0.1") {}
74
75   void DNSUp() {
76     std::ostringstream cmd;
77     // Add DNS entry for server_host_ in /etc/hosts
78     cmd << "echo '" << ipv4_address_ << "      " << server_host_
79         << "  ' | sudo tee -a /etc/hosts";
80     std::system(cmd.str().c_str());
81   }
82
83   void DNSDown() {
84     std::ostringstream cmd;
85     // Remove DNS entry for server_host_ in /etc/hosts
86     cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts";
87     std::system(cmd.str().c_str());
88   }
89
90   void InterfaceUp() {
91     std::ostringstream cmd;
92     cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_;
93     std::system(cmd.str().c_str());
94   }
95
96   void InterfaceDown() {
97     std::ostringstream cmd;
98     cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_;
99     std::system(cmd.str().c_str());
100   }
101
102   void NetworkUp() {
103     gpr_log(GPR_DEBUG, "Bringing network up");
104     InterfaceUp();
105     DNSUp();
106   }
107
108   void NetworkDown() {
109     gpr_log(GPR_DEBUG, "Bringing network down");
110     InterfaceDown();
111     DNSDown();
112   }
113
114   void SetUp() override {
115     NetworkUp();
116     grpc_init();
117     StartServer();
118   }
119
120   void TearDown() override {
121     NetworkDown();
122     StopServer();
123     grpc_shutdown();
124   }
125
126   void StartServer() {
127     port_ = grpc_pick_unused_port_or_die();
128     server_.reset(new ServerData(port_, GetParam().credentials_type));
129     server_->Start(server_host_);
130   }
131   void StopServer() { server_->Shutdown(); }
132
133   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
134       const std::shared_ptr<Channel>& channel) {
135     return grpc::testing::EchoTestService::NewStub(channel);
136   }
137
138   std::shared_ptr<Channel> BuildChannel() {
139     std::ostringstream server_address;
140     server_address << server_host_ << ":" << port_;
141     ChannelArguments args;
142     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
143         GetParam().credentials_type, &args);
144     return CreateCustomChannel(server_address.str(), channel_creds, args);
145   }
146
147   void SendRpc(
148       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
149       bool expect_success = false) {
150     auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
151     EchoRequest request;
152     auto& msg = GetParam().message_content;
153     request.set_message(msg);
154     ClientContext context;
155     Status status = stub->Echo(&context, request, response.get());
156     if (status.ok()) {
157       gpr_log(GPR_DEBUG, "RPC with succeeded");
158       EXPECT_EQ(msg, response->message());
159     } else {
160       gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
161     }
162     if (expect_success) {
163       EXPECT_TRUE(status.ok());
164     }
165   }
166   void SendAsyncRpc(
167       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
168       RequestParams param = RequestParams()) {
169     EchoRequest request;
170     request.set_message(GetParam().message_content);
171     *request.mutable_param() = std::move(param);
172     AsyncClientCall* call = new AsyncClientCall;
173
174     call->response_reader =
175         stub->PrepareAsyncEcho(&call->context, request, &cq_);
176
177     call->response_reader->StartCall();
178     call->response_reader->Finish(&call->reply, &call->status, (void*)call);
179   }
180
181   void ShutdownCQ() { cq_.Shutdown(); }
182
183   bool CQNext(void** tag, bool* ok) {
184     auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
185     auto ret = cq_.AsyncNext(tag, ok, deadline);
186     if (ret == grpc::CompletionQueue::GOT_EVENT) {
187       return true;
188     } else if (ret == grpc::CompletionQueue::SHUTDOWN) {
189       return false;
190     } else {
191       GPR_ASSERT(ret == grpc::CompletionQueue::TIMEOUT);
192       // This can happen if we hit the Apple CFStream bug which results in the
193       // read stream freezing. We are ignoring hangs and timeouts, but these
194       // tests are still useful as they can catch memory memory corruptions,
195       // crashes and other bugs that don't result in test freeze/timeout.
196       return false;
197     }
198   }
199
200   bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
201     const gpr_timespec deadline =
202         grpc_timeout_seconds_to_deadline(timeout_seconds);
203     grpc_connectivity_state state;
204     while ((state = channel->GetState(false /* try_to_connect */)) ==
205            GRPC_CHANNEL_READY) {
206       if (!channel->WaitForStateChange(state, deadline)) return false;
207     }
208     return true;
209   }
210
211   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) {
212     const gpr_timespec deadline =
213         grpc_timeout_seconds_to_deadline(timeout_seconds);
214     grpc_connectivity_state state;
215     while ((state = channel->GetState(true /* try_to_connect */)) !=
216            GRPC_CHANNEL_READY) {
217       if (!channel->WaitForStateChange(state, deadline)) return false;
218     }
219     return true;
220   }
221
222   struct AsyncClientCall {
223     EchoResponse reply;
224     ClientContext context;
225     Status status;
226     std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
227   };
228
229  private:
230   struct ServerData {
231     int port_;
232     const std::string creds_;
233     std::unique_ptr<Server> server_;
234     TestServiceImpl service_;
235     std::unique_ptr<std::thread> thread_;
236     bool server_ready_ = false;
237
238     ServerData(int port, const std::string& creds)
239         : port_(port), creds_(creds) {}
240
241     void Start(const std::string& server_host) {
242       gpr_log(GPR_INFO, "starting server on port %d", port_);
243       std::mutex mu;
244       std::unique_lock<std::mutex> lock(mu);
245       std::condition_variable cond;
246       thread_.reset(new std::thread(
247           std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
248       cond.wait(lock, [this] { return server_ready_; });
249       server_ready_ = false;
250       gpr_log(GPR_INFO, "server startup complete");
251     }
252
253     void Serve(const std::string& server_host, std::mutex* mu,
254                std::condition_variable* cond) {
255       std::ostringstream server_address;
256       server_address << server_host << ":" << port_;
257       ServerBuilder builder;
258       auto server_creds =
259           GetCredentialsProvider()->GetServerCredentials(creds_);
260       builder.AddListeningPort(server_address.str(), server_creds);
261       builder.RegisterService(&service_);
262       server_ = builder.BuildAndStart();
263       std::lock_guard<std::mutex> lock(*mu);
264       server_ready_ = true;
265       cond->notify_one();
266     }
267
268     void Shutdown(bool join = true) {
269       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
270       if (join) thread_->join();
271     }
272   };
273
274   CompletionQueue cq_;
275   const std::string server_host_;
276   const std::string interface_;
277   const std::string ipv4_address_;
278   std::unique_ptr<ServerData> server_;
279   int port_;
280 };
281
282 std::vector<TestScenario> CreateTestScenarios() {
283   std::vector<TestScenario> scenarios;
284   std::vector<std::string> credentials_types;
285   std::vector<std::string> messages;
286
287   credentials_types.push_back(kInsecureCredentialsType);
288   auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
289   for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
290     credentials_types.push_back(*sec);
291   }
292
293   messages.push_back("🖖");
294   for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024; k *= 32) {
295     std::string big_msg;
296     for (size_t i = 0; i < k * 1024; ++i) {
297       char c = 'a' + (i % 26);
298       big_msg += c;
299     }
300     messages.push_back(big_msg);
301   }
302   for (auto cred = credentials_types.begin(); cred != credentials_types.end();
303        ++cred) {
304     for (auto msg = messages.begin(); msg != messages.end(); msg++) {
305       scenarios.emplace_back(*cred, *msg);
306     }
307   }
308
309   return scenarios;
310 }
311
312 INSTANTIATE_TEST_SUITE_P(CFStreamTest, CFStreamTest,
313                          ::testing::ValuesIn(CreateTestScenarios()));
314
315 // gRPC should automatically detech network flaps (without enabling keepalives)
316 //  when CFStream is enabled
317 TEST_P(CFStreamTest, NetworkTransition) {
318   auto channel = BuildChannel();
319   auto stub = BuildStub(channel);
320   // Channel should be in READY state after we send an RPC
321   SendRpc(stub, /*expect_success=*/true);
322   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
323
324   std::atomic_bool shutdown{false};
325   std::thread sender = std::thread([this, &stub, &shutdown]() {
326     while (true) {
327       if (shutdown.load()) {
328         return;
329       }
330       SendRpc(stub);
331       std::this_thread::sleep_for(std::chrono::milliseconds(1000));
332     }
333   });
334
335   // bring down network
336   NetworkDown();
337
338   // network going down should be detected by cfstream
339   EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
340
341   // bring network interface back up
342   std::this_thread::sleep_for(std::chrono::milliseconds(1000));
343   NetworkUp();
344
345   // channel should reconnect
346   EXPECT_TRUE(WaitForChannelReady(channel.get()));
347   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
348   shutdown.store(true);
349   sender.join();
350 }
351
352 // Network flaps while RPCs are in flight
353 TEST_P(CFStreamTest, NetworkFlapRpcsInFlight) {
354   auto channel = BuildChannel();
355   auto stub = BuildStub(channel);
356   std::atomic_int rpcs_sent{0};
357
358   // Channel should be in READY state after we send some RPCs
359   for (int i = 0; i < 10; ++i) {
360     RequestParams param;
361     param.set_skip_cancelled_check(true);
362     SendAsyncRpc(stub, param);
363     ++rpcs_sent;
364   }
365   EXPECT_TRUE(WaitForChannelReady(channel.get()));
366
367   // Bring down the network
368   NetworkDown();
369
370   std::thread thd = std::thread([this, &rpcs_sent]() {
371     void* got_tag;
372     bool ok = false;
373     bool network_down = true;
374     int total_completions = 0;
375
376     while (CQNext(&got_tag, &ok)) {
377       ++total_completions;
378       GPR_ASSERT(ok);
379       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
380       if (!call->status.ok()) {
381         gpr_log(GPR_DEBUG, "RPC failed with error: %s",
382                 call->status.error_message().c_str());
383         // Bring network up when RPCs start failing
384         if (network_down) {
385           NetworkUp();
386           network_down = false;
387         }
388       } else {
389         gpr_log(GPR_DEBUG, "RPC succeeded");
390       }
391       delete call;
392     }
393     // Remove line below and uncomment the following line after Apple CFStream
394     // bug has been fixed.
395     (void)rpcs_sent;
396     // EXPECT_EQ(total_completions, rpcs_sent);
397   });
398
399   for (int i = 0; i < 100; ++i) {
400     RequestParams param;
401     param.set_skip_cancelled_check(true);
402     SendAsyncRpc(stub, param);
403     std::this_thread::sleep_for(std::chrono::milliseconds(10));
404     ++rpcs_sent;
405   }
406
407   ShutdownCQ();
408
409   thd.join();
410 }
411
412 // Send a bunch of RPCs, some of which are expected to fail.
413 // We should get back a response for all RPCs
414 TEST_P(CFStreamTest, ConcurrentRpc) {
415   auto channel = BuildChannel();
416   auto stub = BuildStub(channel);
417   std::atomic_int rpcs_sent{0};
418   std::thread thd = std::thread([this, &rpcs_sent]() {
419     void* got_tag;
420     bool ok = false;
421     int total_completions = 0;
422
423     while (CQNext(&got_tag, &ok)) {
424       ++total_completions;
425       GPR_ASSERT(ok);
426       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
427       if (!call->status.ok()) {
428         gpr_log(GPR_DEBUG, "RPC failed with error: %s",
429                 call->status.error_message().c_str());
430         // Bring network up when RPCs start failing
431       } else {
432         gpr_log(GPR_DEBUG, "RPC succeeded");
433       }
434       delete call;
435     }
436     // Remove line below and uncomment the following line after Apple CFStream
437     // bug has been fixed.
438     (void)rpcs_sent;
439     // EXPECT_EQ(total_completions, rpcs_sent);
440   });
441
442   for (int i = 0; i < 10; ++i) {
443     if (i % 3 == 0) {
444       RequestParams param;
445       ErrorStatus* error = param.mutable_expected_error();
446       error->set_code(StatusCode::INTERNAL);
447       error->set_error_message("internal error");
448       SendAsyncRpc(stub, param);
449     } else if (i % 5 == 0) {
450       RequestParams param;
451       param.set_echo_metadata(true);
452       DebugInfo* info = param.mutable_debug_info();
453       info->add_stack_entries("stack_entry1");
454       info->add_stack_entries("stack_entry2");
455       info->set_detail("detailed debug info");
456       SendAsyncRpc(stub, param);
457     } else {
458       SendAsyncRpc(stub);
459     }
460     ++rpcs_sent;
461   }
462
463   ShutdownCQ();
464
465   thd.join();
466 }
467
468 }  // namespace
469 }  // namespace testing
470 }  // namespace grpc
471 #endif  // GRPC_CFSTREAM
472
473 int main(int argc, char** argv) {
474   ::testing::InitGoogleTest(&argc, argv);
475   grpc::testing::TestEnvironment env(argc, argv);
476   gpr_setenv("grpc_cfstream", "1");
477   const auto result = RUN_ALL_TESTS();
478   return result;
479 }