4cd87825f36a804d32f2ca911476f8a65947d725
[platform/upstream/grpc.git] / test / core / tsi / alts / handshaker / alts_concurrent_connectivity_test.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <fcntl.h>
22 #include <gmock/gmock.h>
23 #include <netinet/in.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <functional>
31 #include <set>
32 #include <thread>
33
34 #include "absl/memory/memory.h"
35 #include "absl/strings/str_cat.h"
36
37 #include <grpc/grpc.h>
38 #include <grpc/grpc_security.h>
39 #include <grpc/slice.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/string_util.h>
43 #include <grpc/support/time.h>
44
45 #include <grpcpp/impl/codegen/service_type.h>
46 #include <grpcpp/server_builder.h>
47
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/host_port.h"
50 #include "src/core/lib/gprpp/thd.h"
51 #include "src/core/lib/iomgr/error.h"
52 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
53 #include "src/core/lib/security/credentials/credentials.h"
54 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
55 #include "src/core/lib/slice/slice_string_helpers.h"
56
57 #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
58 #include "test/core/util/memory_counters.h"
59 #include "test/core/util/port.h"
60 #include "test/core/util/test_config.h"
61
62 #include "test/core/end2end/cq_verifier.h"
63
64 namespace {
65
// Value handed to CreateFakeHandshakerService() as the expected maximum number
// of concurrent RPCs when a FakeHandshakeServer is asked to check concurrency.
constexpr int kFakeHandshakeServerMaxConcurrentStreams = 40;
67
68 void drain_cq(grpc_completion_queue* cq) {
69   grpc_event ev;
70   do {
71     ev = grpc_completion_queue_next(
72         cq, grpc_timeout_milliseconds_to_deadline(5000), nullptr);
73   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
74 }
75
76 grpc_channel* create_secure_channel_for_test(
77     const char* server_addr, const char* fake_handshake_server_addr,
78     int reconnect_backoff_ms) {
79   grpc_alts_credentials_options* alts_options =
80       grpc_alts_credentials_client_options_create();
81   grpc_channel_credentials* channel_creds =
82       grpc_alts_credentials_create_customized(alts_options,
83                                               fake_handshake_server_addr,
84                                               true /* enable_untrusted_alts */);
85   grpc_alts_credentials_options_destroy(alts_options);
86   // The main goal of these tests are to stress concurrent ALTS handshakes,
87   // so we prevent subchnannel sharing.
88   std::vector<grpc_arg> new_args;
89   new_args.push_back(grpc_channel_arg_integer_create(
90       const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
91   if (reconnect_backoff_ms != 0) {
92     new_args.push_back(grpc_channel_arg_integer_create(
93         const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"),
94         reconnect_backoff_ms));
95   }
96   grpc_channel_args* channel_args =
97       grpc_channel_args_copy_and_add(nullptr, new_args.data(), new_args.size());
98   grpc_channel* channel = grpc_secure_channel_create(channel_creds, server_addr,
99                                                      channel_args, nullptr);
100   grpc_channel_args_destroy(channel_args);
101   grpc_channel_credentials_release(channel_creds);
102   return channel;
103 }
104
105 class FakeHandshakeServer {
106  public:
107   explicit FakeHandshakeServer(bool check_num_concurrent_rpcs) {
108     int port = grpc_pick_unused_port_or_die();
109     address_ = grpc_core::JoinHostPort("localhost", port);
110     if (check_num_concurrent_rpcs) {
111       service_ = grpc::gcp::
112           CreateFakeHandshakerService(kFakeHandshakeServerMaxConcurrentStreams /* expected max concurrent rpcs */);
113     } else {
114       service_ = grpc::gcp::CreateFakeHandshakerService(
115           0 /* expected max concurrent rpcs unset */);
116     }
117     grpc::ServerBuilder builder;
118     builder.AddListeningPort(address_.c_str(),
119                              grpc::InsecureServerCredentials());
120     builder.RegisterService(service_.get());
121     // TODO(apolcyn): when removing the global concurrent handshake limiting
122     // queue, set MAX_CONCURRENT_STREAMS on this server.
123     server_ = builder.BuildAndStart();
124     gpr_log(GPR_INFO, "Fake handshaker server listening on %s",
125             address_.c_str());
126   }
127
128   ~FakeHandshakeServer() {
129     server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
130   }
131
132   const char* address() { return address_.c_str(); }
133
134  private:
135   std::string address_;
136   std::unique_ptr<grpc::Service> service_;
137   std::unique_ptr<grpc::Server> server_;
138 };
139
140 class TestServer {
141  public:
142   explicit TestServer()
143       : fake_handshake_server_(true /* check num concurrent rpcs */) {
144     grpc_alts_credentials_options* alts_options =
145         grpc_alts_credentials_server_options_create();
146     grpc_server_credentials* server_creds =
147         grpc_alts_server_credentials_create_customized(
148             alts_options, fake_handshake_server_.address(),
149             true /* enable_untrusted_alts */);
150     grpc_alts_credentials_options_destroy(alts_options);
151     server_ = grpc_server_create(nullptr, nullptr);
152     server_cq_ = grpc_completion_queue_create_for_next(nullptr);
153     grpc_server_register_completion_queue(server_, server_cq_, nullptr);
154     int port = grpc_pick_unused_port_or_die();
155     server_addr_ = grpc_core::JoinHostPort("localhost", port);
156     GPR_ASSERT(grpc_server_add_secure_http2_port(server_, server_addr_.c_str(),
157                                                  server_creds));
158     grpc_server_credentials_release(server_creds);
159     grpc_server_start(server_);
160     gpr_log(GPR_DEBUG, "Start TestServer %p. listen on %s", this,
161             server_addr_.c_str());
162     server_thd_ = absl::make_unique<std::thread>(PollUntilShutdown, this);
163   }
164
165   ~TestServer() {
166     gpr_log(GPR_DEBUG, "Begin dtor of TestServer %p", this);
167     grpc_server_shutdown_and_notify(server_, server_cq_, this);
168     server_thd_->join();
169     grpc_server_destroy(server_);
170     grpc_completion_queue_shutdown(server_cq_);
171     drain_cq(server_cq_);
172     grpc_completion_queue_destroy(server_cq_);
173   }
174
175   const char* address() { return server_addr_.c_str(); }
176
177   static void PollUntilShutdown(const TestServer* self) {
178     grpc_event ev = grpc_completion_queue_next(
179         self->server_cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
180     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
181     GPR_ASSERT(ev.tag == self);
182     gpr_log(GPR_DEBUG, "TestServer %p stop polling", self);
183   }
184
185  private:
186   grpc_server* server_;
187   grpc_completion_queue* server_cq_;
188   std::unique_ptr<std::thread> server_thd_;
189   std::string server_addr_;
190   // Give this test server its own ALTS handshake server
191   // so that we avoid competing for ALTS handshake server resources (e.g.
192   // available HTTP2 streams on a globally shared handshaker subchannel)
193   // with clients that are trying to do mutual ALTS handshakes
194   // with this server (which could "deadlock" mutual handshakes).
195   // TODO(apolcyn): remove this workaround from this test and have
196   // clients/servers share a single fake handshake server if
197   // the underlying issue needs to be fixed.
198   FakeHandshakeServer fake_handshake_server_;
199 };
200
201 class ConnectLoopRunner {
202  public:
203   explicit ConnectLoopRunner(
204       const char* server_address, const char* fake_handshake_server_addr,
205       int per_connect_deadline_seconds, size_t loops,
206       grpc_connectivity_state expected_connectivity_states,
207       int reconnect_backoff_ms)
208       : server_address_(grpc_core::UniquePtr<char>(gpr_strdup(server_address))),
209         fake_handshake_server_addr_(
210             grpc_core::UniquePtr<char>(gpr_strdup(fake_handshake_server_addr))),
211         per_connect_deadline_seconds_(per_connect_deadline_seconds),
212         loops_(loops),
213         expected_connectivity_states_(expected_connectivity_states),
214         reconnect_backoff_ms_(reconnect_backoff_ms) {
215     thd_ = absl::make_unique<std::thread>(ConnectLoop, this);
216   }
217
218   ~ConnectLoopRunner() { thd_->join(); }
219
220   static void ConnectLoop(const ConnectLoopRunner* self) {
221     for (size_t i = 0; i < self->loops_; i++) {
222       gpr_log(GPR_DEBUG, "runner:%p connect_loop begin loop %ld", self, i);
223       grpc_completion_queue* cq =
224           grpc_completion_queue_create_for_next(nullptr);
225       grpc_channel* channel = create_secure_channel_for_test(
226           self->server_address_.get(), self->fake_handshake_server_addr_.get(),
227           self->reconnect_backoff_ms_);
228       // Connect, forcing an ALTS handshake
229       gpr_timespec connect_deadline =
230           grpc_timeout_seconds_to_deadline(self->per_connect_deadline_seconds_);
231       grpc_connectivity_state state =
232           grpc_channel_check_connectivity_state(channel, 1);
233       ASSERT_EQ(state, GRPC_CHANNEL_IDLE);
234       while (state != self->expected_connectivity_states_) {
235         if (self->expected_connectivity_states_ ==
236             GRPC_CHANNEL_TRANSIENT_FAILURE) {
237           ASSERT_NE(state, GRPC_CHANNEL_READY);  // sanity check
238         } else {
239           ASSERT_EQ(self->expected_connectivity_states_, GRPC_CHANNEL_READY);
240         }
241         grpc_channel_watch_connectivity_state(
242             channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, nullptr);
243         grpc_event ev =
244             grpc_completion_queue_next(cq, connect_deadline, nullptr);
245         ASSERT_EQ(ev.type, GRPC_OP_COMPLETE)
246             << "connect_loop runner:" << std::hex << self
247             << " got ev.type:" << ev.type << " i:" << i;
248         ASSERT_TRUE(ev.success);
249         grpc_connectivity_state prev_state = state;
250         state = grpc_channel_check_connectivity_state(channel, 1);
251         if (self->expected_connectivity_states_ ==
252                 GRPC_CHANNEL_TRANSIENT_FAILURE &&
253             prev_state == GRPC_CHANNEL_CONNECTING &&
254             state == GRPC_CHANNEL_CONNECTING) {
255           // Detect a race in state checking: if the watch_connectivity_state
256           // completed from prior state "connecting", this could be because the
257           // channel momentarily entered state "transient failure", which is
258           // what we want. However, if the channel immediately re-enters
259           // "connecting" state, then the new state check might still result in
260           // "connecting". A continuous repeat of this can cause this loop to
261           // never terminate in time. So take this scenario to indicate that the
262           // channel momentarily entered transient failure.
263           break;
264         }
265       }
266       grpc_channel_destroy(channel);
267       grpc_completion_queue_shutdown(cq);
268       drain_cq(cq);
269       grpc_completion_queue_destroy(cq);
270       gpr_log(GPR_DEBUG, "runner:%p connect_loop finished loop %ld", self, i);
271     }
272   }
273
274  private:
275   grpc_core::UniquePtr<char> server_address_;
276   grpc_core::UniquePtr<char> fake_handshake_server_addr_;
277   int per_connect_deadline_seconds_;
278   size_t loops_;
279   grpc_connectivity_state expected_connectivity_states_;
280   std::unique_ptr<std::thread> thd_;
281   int reconnect_backoff_ms_;
282 };
283
284 // Perform a few ALTS handshakes sequentially (using the fake, in-process ALTS
285 // handshake server).
286 TEST(AltsConcurrentConnectivityTest, TestBasicClientServerHandshakes) {
287   FakeHandshakeServer fake_handshake_server(
288       true /* check num concurrent rpcs */);
289   TestServer test_server;
290   {
291     ConnectLoopRunner runner(
292         test_server.address(), fake_handshake_server.address(),
293         5 /* per connect deadline seconds */, 10 /* loops */,
294         GRPC_CHANNEL_READY /* expected connectivity states */,
295         0 /* reconnect_backoff_ms unset */);
296   }
297 }
298
299 /* Run a bunch of concurrent ALTS handshakes on concurrent channels
300  * (using the fake, in-process handshake server). */
301 TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
302   FakeHandshakeServer fake_handshake_server(
303       true /* check num concurrent rpcs */);
304   // Test
305   {
306     TestServer test_server;
307     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
308     size_t num_concurrent_connects = 50;
309     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
310     gpr_log(GPR_DEBUG,
311             "start performing concurrent expected-to-succeed connects");
312     for (size_t i = 0; i < num_concurrent_connects; i++) {
313       connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
314           test_server.address(), fake_handshake_server.address(),
315           15 /* per connect deadline seconds */, 5 /* loops */,
316           GRPC_CHANNEL_READY /* expected connectivity states */,
317           0 /* reconnect_backoff_ms unset */));
318     }
319     connect_loop_runners.clear();
320     gpr_log(GPR_DEBUG,
321             "done performing concurrent expected-to-succeed connects");
322     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
323       gpr_log(GPR_DEBUG, "Test took longer than expected.");
324       abort();
325     }
326   }
327 }
328
329 class FakeTcpServer {
330  public:
331   enum ProcessReadResult {
332     CONTINUE_READING,
333     CLOSE_SOCKET,
334   };
335
336   enum class AcceptMode {
337     kWaitForClientToSendFirstBytes,  // useful for emulating ALTS based
338                                      // grpc servers
339     kEagerlySendSettings,  // useful for emulating insecure grpc servers (e.g.
340                            // ALTS handshake servers)
341   };
342
343   explicit FakeTcpServer(
344       AcceptMode accept_mode,
345       const std::function<ProcessReadResult(int, int, int)>& process_read_cb)
346       : accept_mode_(accept_mode), process_read_cb_(process_read_cb) {
347     port_ = grpc_pick_unused_port_or_die();
348     accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
349     address_ = absl::StrCat("[::]:", port_);
350     GPR_ASSERT(accept_socket_ != -1);
351     if (accept_socket_ == -1) {
352       gpr_log(GPR_ERROR, "Failed to create socket: %d", errno);
353       abort();
354     }
355     int val = 1;
356     if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val,
357                    sizeof(val)) != 0) {
358       gpr_log(GPR_ERROR,
359               "Failed to set SO_REUSEADDR on socket bound to [::1]:%d : %d",
360               port_, errno);
361       abort();
362     }
363     if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
364       gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
365       abort();
366     }
367     sockaddr_in6 addr;
368     memset(&addr, 0, sizeof(addr));
369     addr.sin6_family = AF_INET6;
370     addr.sin6_port = htons(port_);
371     (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
372     if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
373              sizeof(addr)) != 0) {
374       gpr_log(GPR_ERROR, "Failed to bind socket to [::1]:%d : %d", port_,
375               errno);
376       abort();
377     }
378     if (listen(accept_socket_, 100)) {
379       gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
380               port_, errno);
381       abort();
382     }
383     gpr_event_init(&stop_ev_);
384     run_server_loop_thd_ = absl::make_unique<std::thread>(RunServerLoop, this);
385   }
386
387   ~FakeTcpServer() {
388     gpr_log(GPR_DEBUG,
389             "FakeTcpServer stop and "
390             "join server thread");
391     gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
392     run_server_loop_thd_->join();
393     gpr_log(GPR_DEBUG,
394             "FakeTcpServer join server "
395             "thread complete");
396   }
397
398   const char* address() { return address_.c_str(); }
399
400   static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(
401       int bytes_received_size, int read_error, int s) {
402     if (bytes_received_size < 0 && read_error != EAGAIN &&
403         read_error != EWOULDBLOCK) {
404       gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
405               errno);
406       abort();
407     }
408     if (bytes_received_size >= 0) {
409       gpr_log(GPR_DEBUG,
410               "Fake TCP server received %d bytes from peer socket: %d. Close "
411               "the "
412               "connection.",
413               bytes_received_size, s);
414       return CLOSE_SOCKET;
415     }
416     return CONTINUE_READING;
417   }
418
419   static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
420                                                         int read_error, int s) {
421     if (bytes_received_size < 0 && read_error != EAGAIN &&
422         read_error != EWOULDBLOCK) {
423       gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
424               errno);
425       abort();
426     }
427     if (bytes_received_size == 0) {
428       // The peer has shut down the connection.
429       gpr_log(GPR_DEBUG,
430               "Fake TCP server received 0 bytes from peer socket: %d. Close "
431               "the "
432               "connection.",
433               s);
434       return CLOSE_SOCKET;
435     }
436     return CONTINUE_READING;
437   }
438
439   class FakeTcpServerPeer {
440    public:
441     explicit FakeTcpServerPeer(int fd) : fd_(fd) {}
442
443     ~FakeTcpServerPeer() { close(fd_); }
444
445     void MaybeContinueSendingSettings() {
446       // https://tools.ietf.org/html/rfc7540#section-4.1
447       const std::vector<uint8_t> kEmptyHttp2SettingsFrame = {
448           0x00, 0x00, 0x00,       // length
449           0x04,                   // settings type
450           0x00,                   // flags
451           0x00, 0x00, 0x00, 0x00  // stream identifier
452       };
453       if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) {
454         int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
455         int bytes_sent =
456             send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
457                  bytes_to_send, 0);
458         if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
459           gpr_log(GPR_ERROR,
460                   "Fake TCP server encountered unexpected error:%d |%s| "
461                   "sending %d bytes on fd:%d",
462                   errno, strerror(errno), bytes_to_send, fd_);
463           GPR_ASSERT(0);
464         } else if (bytes_sent > 0) {
465           total_bytes_sent_ += bytes_sent;
466           GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
467         }
468       }
469     }
470
471     int fd() { return fd_; }
472
473    private:
474     int fd_;
475     int total_bytes_sent_ = 0;
476   };
477
478   // Run a loop that periodically, every 10 ms:
479   //   1) Checks if there are any new TCP connections to accept.
480   //   2) Checks if any data has arrived yet on established connections,
481   //      and reads from them if so, processing the sockets as configured.
482   static void RunServerLoop(FakeTcpServer* self) {
483     std::set<std::unique_ptr<FakeTcpServerPeer>> peers;
484     while (!gpr_event_get(&self->stop_ev_)) {
485       int p = accept(self->accept_socket_, nullptr, nullptr);
486       if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
487         gpr_log(GPR_ERROR, "Failed to accept connection: %d", errno);
488         abort();
489       }
490       if (p != -1) {
491         gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
492         if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
493           gpr_log(GPR_ERROR,
494                   "Failed to set O_NONBLOCK on peer socket:%d errno:%d", p,
495                   errno);
496           abort();
497         }
498         peers.insert(absl::make_unique<FakeTcpServerPeer>(p));
499       }
500       auto it = peers.begin();
501       while (it != peers.end()) {
502         FakeTcpServerPeer* peer = (*it).get();
503         if (self->accept_mode_ == AcceptMode::kEagerlySendSettings) {
504           peer->MaybeContinueSendingSettings();
505         }
506         char buf[100];
507         int bytes_received_size = recv(peer->fd(), buf, 100, 0);
508         ProcessReadResult r =
509             self->process_read_cb_(bytes_received_size, errno, peer->fd());
510         if (r == CLOSE_SOCKET) {
511           it = peers.erase(it);
512         } else {
513           GPR_ASSERT(r == CONTINUE_READING);
514           it++;
515         }
516       }
517       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
518                                    gpr_time_from_millis(10, GPR_TIMESPAN)));
519     }
520     close(self->accept_socket_);
521   }
522
523  private:
524   int accept_socket_;
525   int port_;
526   gpr_event stop_ev_;
527   std::string address_;
528   std::unique_ptr<std::thread> run_server_loop_thd_;
529   const AcceptMode accept_mode_;
530   std::function<ProcessReadResult(int, int, int)> process_read_cb_;
531 };
532
533 /* This test is intended to make sure that ALTS handshakes we correctly
534  * fail fast when the security handshaker gets an error while reading
535  * from the remote peer, after having earlier sent the first bytes of the
536  * ALTS handshake to the peer, i.e. after getting into the middle of a
537  * handshake. */
538 TEST(AltsConcurrentConnectivityTest,
539      TestHandshakeFailsFastWhenPeerEndpointClosesConnectionAfterAccepting) {
540   // Don't enforce the number of concurrent rpcs for the fake handshake
541   // server in this test, because this test will involve handshake RPCs
542   // getting cancelled. Because there isn't explicit synchronization between
543   // an ALTS handshake client's RECV_STATUS op completing after call
544   // cancellation, and the corresponding fake handshake server's sync
545   // method handler returning, enforcing a limit on the number of active
546   // RPCs at the fake handshake server would be inherently racey.
547   FakeHandshakeServer fake_handshake_server(
548       false /* check num concurrent rpcs */);
549   // The fake_backend_server emulates a secure (ALTS based) gRPC backend. So
550   // it waits for the client to send the first bytes.
551   FakeTcpServer fake_backend_server(
552       FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
553       FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
554   {
555     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
556     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
557     size_t num_concurrent_connects = 100;
558     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
559     for (size_t i = 0; i < num_concurrent_connects; i++) {
560       connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
561           fake_backend_server.address(), fake_handshake_server.address(),
562           10 /* per connect deadline seconds */, 3 /* loops */,
563           GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
564           0 /* reconnect_backoff_ms unset */));
565     }
566     connect_loop_runners.clear();
567     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
568     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
569       gpr_log(GPR_ERROR,
570               "Exceeded test deadline. ALTS handshakes might not be failing "
571               "fast when the peer endpoint closes the connection abruptly");
572       abort();
573     }
574   }
575 }
576
577 /* This test is intended to make sure that ALTS handshakes correctly
578  * fail fast when the ALTS handshake server fails incoming handshakes fast. */
579 TEST(AltsConcurrentConnectivityTest,
580      TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
581   // The fake_handshake_server emulates a broken ALTS handshaker, which
582   // is an insecure server. So send settings to the client eagerly.
583   FakeTcpServer fake_handshake_server(
584       FakeTcpServer::AcceptMode::kEagerlySendSettings,
585       FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
586   // The fake_backend_server emulates a secure (ALTS based) server, so wait
587   // for the client to send the first bytes.
588   FakeTcpServer fake_backend_server(
589       FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
590       FakeTcpServer::CloseSocketUponCloseFromPeer);
591   {
592     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
593     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
594     size_t num_concurrent_connects = 100;
595     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
596     for (size_t i = 0; i < num_concurrent_connects; i++) {
597       connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
598           fake_backend_server.address(), fake_handshake_server.address(),
599           20 /* per connect deadline seconds */, 2 /* loops */,
600           GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
601           0 /* reconnect_backoff_ms unset */));
602     }
603     connect_loop_runners.clear();
604     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
605     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
606       gpr_log(GPR_ERROR,
607               "Exceeded test deadline. ALTS handshakes might not be failing "
608               "fast when the handshake server closes new connections");
609       abort();
610     }
611   }
612 }
613
614 /* This test is intended to make sure that ALTS handshakes correctly
615  * fail fast when the ALTS handshake server is non-responsive, in which case
616  * the overall connection deadline kicks in. */
617 TEST(AltsConcurrentConnectivityTest,
618      TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
619   // fake_handshake_server emulates an insecure server, so send settings first.
620   // It will be unresponsive for the rest of the connection, though.
621   FakeTcpServer fake_handshake_server(
622       FakeTcpServer::AcceptMode::kEagerlySendSettings,
623       FakeTcpServer::CloseSocketUponCloseFromPeer);
624   // fake_backend_server emulates an ALTS based server, so wait for the client
625   // to send the first bytes.
626   FakeTcpServer fake_backend_server(
627       FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
628       FakeTcpServer::CloseSocketUponCloseFromPeer);
629   {
630     gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
631     std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
632     size_t num_concurrent_connects = 100;
633     gpr_log(GPR_DEBUG, "start performing concurrent expected-to-fail connects");
634     for (size_t i = 0; i < num_concurrent_connects; i++) {
635       connect_loop_runners.push_back(absl::make_unique<ConnectLoopRunner>(
636           fake_backend_server.address(), fake_handshake_server.address(),
637           10 /* per connect deadline seconds */, 2 /* loops */,
638           GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */,
639           100 /* reconnect_backoff_ms */));
640     }
641     connect_loop_runners.clear();
642     gpr_log(GPR_DEBUG, "done performing concurrent expected-to-fail connects");
643     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), test_deadline) > 0) {
644       gpr_log(GPR_ERROR,
645               "Exceeded test deadline. ALTS handshakes might not be failing "
646               "fast when the handshake server is non-response timeout occurs");
647       abort();
648     }
649   }
650 }
651
652 }  // namespace
653
654 int main(int argc, char** argv) {
655   ::testing::InitGoogleTest(&argc, argv);
656   grpc::testing::TestEnvironment env(argc, argv);
657   grpc_init();
658   auto result = RUN_ALL_TESTS();
659   grpc_shutdown();
660   return result;
661 }