Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / test / core / iomgr / fd_posix_test.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_EV
23
24 #include <ctype.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <netinet/in.h>
28 #include <poll.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <sys/time.h>
34 #include <unistd.h>
35
36 #include <grpc/grpc.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/sync.h>
40 #include <grpc/support/time.h>
41
42 #include "src/core/lib/iomgr/ev_posix.h"
43 #include "src/core/lib/iomgr/iomgr.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "test/core/util/test_config.h"
46
47 static gpr_mu* g_mu;
48 static grpc_pollset* g_pollset;
49
50 /* buffer size used to send and receive data.
51    1024 is the minimal value to set TCP send and receive buffer. */
52 #define BUF_SIZE 1024
53
54 /* Create a test socket with the right properties for testing.
55    port is the TCP port to listen or connect to.
56    Return a socket FD and sockaddr_in. */
57 static void create_test_socket(int port, int* socket_fd,
58                                struct sockaddr_in* sin) {
59   int fd;
60   int one = 1;
61   int buffer_size_bytes = BUF_SIZE;
62   int flags;
63
64   fd = socket(AF_INET, SOCK_STREAM, 0);
65   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
66   /* Reset the size of socket send buffer to the minimal value to facilitate
67      buffer filling up and triggering notify_on_write  */
68   GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
69   GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
70   /* Make fd non-blocking */
71   flags = fcntl(fd, F_GETFL, 0);
72   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
73   *socket_fd = fd;
74
75   /* Use local address for test */
76   sin->sin_family = AF_INET;
77   sin->sin_addr.s_addr = htonl(0x7f000001);
78   GPR_ASSERT(port >= 0 && port < 65536);
79   sin->sin_port = htons(static_cast<uint16_t>(port));
80 }
81
82 /* Phony gRPC callback */
83 void no_op_cb(void* /*arg*/, int /*success*/) {}
84
85 /* =======An upload server to test notify_on_read===========
86    The server simply reads and counts a stream of bytes. */
87
88 /* An upload server. */
89 typedef struct {
90   grpc_fd* em_fd;           /* listening fd */
91   ssize_t read_bytes_total; /* total number of received bytes */
92   int done;                 /* set to 1 when a server finishes serving */
93   grpc_closure listen_closure;
94 } server;
95
96 static void server_init(server* sv) {
97   sv->read_bytes_total = 0;
98   sv->done = 0;
99 }
100
101 /* An upload session.
102    Created when a new upload request arrives in the server. */
103 typedef struct {
104   server* sv;              /* not owned by a single session */
105   grpc_fd* em_fd;          /* fd to read upload bytes */
106   char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
107   grpc_closure session_read_closure;
108 } session;
109
110 /* Called when an upload session can be safely shutdown.
111    Close session FD and start to shutdown listen FD. */
112 static void session_shutdown_cb(void* arg, /*session */
113                                 bool /*success*/) {
114   session* se = static_cast<session*>(arg);
115   server* sv = se->sv;
116   grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
117   gpr_free(se);
118   /* Start to shutdown listen fd. */
119   grpc_fd_shutdown(sv->em_fd,
120                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
121 }
122
123 /* Called when data become readable in a session. */
124 static void session_read_cb(void* arg, /*session */
125                             grpc_error_handle error) {
126   session* se = static_cast<session*>(arg);
127   int fd = grpc_fd_wrapped_fd(se->em_fd);
128
129   ssize_t read_once = 0;
130   ssize_t read_total = 0;
131
132   if (error != GRPC_ERROR_NONE) {
133     session_shutdown_cb(arg, true);
134     return;
135   }
136
137   do {
138     read_once = read(fd, se->read_buf, BUF_SIZE);
139     if (read_once > 0) read_total += read_once;
140   } while (read_once > 0);
141   se->sv->read_bytes_total += read_total;
142
143   /* read() returns 0 to indicate the TCP connection was closed by the client.
144      read(fd, read_buf, 0) also returns 0 which should never be called as such.
145      It is possible to read nothing due to spurious edge event or data has
146      been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
147   if (read_once == 0) {
148     session_shutdown_cb(arg, true);
149   } else if (read_once == -1) {
150     if (errno == EAGAIN) {
151       /* An edge triggered event is cached in the kernel until next poll.
152          In the current single thread implementation, session_read_cb is called
153          in the polling thread, such that polling only happens after this
154          callback, and will catch read edge event if data is available again
155          before notify_on_read.
156          TODO(chenw): in multi-threaded version, callback and polling can be
157          run in different threads. polling may catch a persist read edge event
158          before notify_on_read is called.  */
159       grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
160     } else {
161       gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
162       abort();
163     }
164   }
165 }
166
167 /* Called when the listen FD can be safely shutdown.
168    Close listen FD and signal that server can be shutdown. */
169 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
170   server* sv = static_cast<server*>(arg);
171
172   grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
173
174   gpr_mu_lock(g_mu);
175   sv->done = 1;
176   GPR_ASSERT(
177       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
178   gpr_mu_unlock(g_mu);
179 }
180
181 /* Called when a new TCP connection request arrives in the listening port. */
182 static void listen_cb(void* arg, /*=sv_arg*/
183                       grpc_error_handle error) {
184   server* sv = static_cast<server*>(arg);
185   int fd;
186   int flags;
187   session* se;
188   struct sockaddr_storage ss;
189   socklen_t slen = sizeof(ss);
190   grpc_fd* listen_em_fd = sv->em_fd;
191
192   if (error != GRPC_ERROR_NONE) {
193     listen_shutdown_cb(arg, 1);
194     return;
195   }
196
197   fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
198               reinterpret_cast<struct sockaddr*>(&ss), &slen);
199   GPR_ASSERT(fd >= 0);
200   GPR_ASSERT(fd < FD_SETSIZE);
201   flags = fcntl(fd, F_GETFL, 0);
202   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
203   se = static_cast<session*>(gpr_malloc(sizeof(*se)));
204   se->sv = sv;
205   se->em_fd = grpc_fd_create(fd, "listener", false);
206   grpc_pollset_add_fd(g_pollset, se->em_fd);
207   GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
208                     grpc_schedule_on_exec_ctx);
209   grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
210
211   grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
212 }
213
214 /* Max number of connections pending to be accepted by listen(). */
215 #define MAX_NUM_FD 1024
216
217 /* Start a test server, return the TCP listening port bound to listen_fd.
218    listen_cb() is registered to be interested in reading from listen_fd.
219    When connection request arrives, listen_cb() is called to accept the
220    connection request. */
221 static int server_start(server* sv) {
222   int port = 0;
223   int fd;
224   struct sockaddr_in sin;
225   socklen_t addr_len;
226
227   create_test_socket(port, &fd, &sin);
228   addr_len = sizeof(sin);
229   GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0);
230   GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0);
231   port = ntohs(sin.sin_port);
232   GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
233
234   sv->em_fd = grpc_fd_create(fd, "server", false);
235   grpc_pollset_add_fd(g_pollset, sv->em_fd);
236   /* Register to be interested in reading from listen_fd. */
237   GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
238                     grpc_schedule_on_exec_ctx);
239   grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
240
241   return port;
242 }
243
244 /* Wait and shutdown a sever. */
245 static void server_wait_and_shutdown(server* sv) {
246   gpr_mu_lock(g_mu);
247   while (!sv->done) {
248     grpc_core::ExecCtx exec_ctx;
249     grpc_pollset_worker* worker = nullptr;
250     GPR_ASSERT(GRPC_LOG_IF_ERROR(
251         "pollset_work",
252         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
253     gpr_mu_unlock(g_mu);
254
255     gpr_mu_lock(g_mu);
256   }
257   gpr_mu_unlock(g_mu);
258 }
259
260 /* ===An upload client to test notify_on_write=== */
261
262 /* Client write buffer size */
263 #define CLIENT_WRITE_BUF_SIZE 10
264 /* Total number of times that the client fills up the write buffer */
265 #define CLIENT_TOTAL_WRITE_CNT 3
266
267 /* An upload client. */
268 typedef struct {
269   grpc_fd* em_fd;
270   char write_buf[CLIENT_WRITE_BUF_SIZE];
271   ssize_t write_bytes_total;
272   /* Number of times that the client fills up the write buffer and calls
273      notify_on_write to schedule another write. */
274   int client_write_cnt;
275
276   int done; /* set to 1 when a client finishes sending */
277   grpc_closure write_closure;
278 } client;
279
280 static void client_init(client* cl) {
281   memset(cl->write_buf, 0, sizeof(cl->write_buf));
282   cl->write_bytes_total = 0;
283   cl->client_write_cnt = 0;
284   cl->done = 0;
285 }
286
287 /* Called when a client upload session is ready to shutdown. */
288 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
289   client* cl = static_cast<client*>(arg);
290   grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
291   cl->done = 1;
292   GPR_ASSERT(
293       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
294 }
295
296 /* Write as much as possible, then register notify_on_write. */
297 static void client_session_write(void* arg, /*client */
298                                  grpc_error_handle error) {
299   client* cl = static_cast<client*>(arg);
300   int fd = grpc_fd_wrapped_fd(cl->em_fd);
301   ssize_t write_once = 0;
302
303   if (error != GRPC_ERROR_NONE) {
304     gpr_mu_lock(g_mu);
305     client_session_shutdown_cb(arg, 1);
306     gpr_mu_unlock(g_mu);
307     return;
308   }
309
310   do {
311     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
312     if (write_once > 0) cl->write_bytes_total += write_once;
313   } while (write_once > 0);
314
315   if (errno == EAGAIN) {
316     gpr_mu_lock(g_mu);
317     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
318       GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
319                         grpc_schedule_on_exec_ctx);
320       grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
321       cl->client_write_cnt++;
322     } else {
323       client_session_shutdown_cb(arg, 1);
324     }
325     gpr_mu_unlock(g_mu);
326   } else {
327     gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
328     abort();
329   }
330 }
331
332 /* Start a client to send a stream of bytes. */
333 static void client_start(client* cl, int port) {
334   int fd;
335   struct sockaddr_in sin;
336   create_test_socket(port, &fd, &sin);
337   if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
338       -1) {
339     if (errno == EINPROGRESS) {
340       struct pollfd pfd;
341       pfd.fd = fd;
342       pfd.events = POLLOUT;
343       pfd.revents = 0;
344       if (poll(&pfd, 1, -1) == -1) {
345         gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
346         abort();
347       }
348     } else {
349       gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
350       abort();
351     }
352   }
353
354   cl->em_fd = grpc_fd_create(fd, "client", false);
355   grpc_pollset_add_fd(g_pollset, cl->em_fd);
356
357   client_session_write(cl, GRPC_ERROR_NONE);
358 }
359
360 /* Wait for the signal to shutdown a client. */
361 static void client_wait_and_shutdown(client* cl) {
362   gpr_mu_lock(g_mu);
363   while (!cl->done) {
364     grpc_pollset_worker* worker = nullptr;
365     grpc_core::ExecCtx exec_ctx;
366     GPR_ASSERT(GRPC_LOG_IF_ERROR(
367         "pollset_work",
368         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
369     gpr_mu_unlock(g_mu);
370
371     gpr_mu_lock(g_mu);
372   }
373   gpr_mu_unlock(g_mu);
374 }
375
376 /* Test grpc_fd. Start an upload server and client, upload a stream of
377    bytes from the client to the server, and verify that the total number of
378    sent bytes is equal to the total number of received bytes. */
379 static void test_grpc_fd(void) {
380   server sv;
381   client cl;
382   int port;
383   grpc_core::ExecCtx exec_ctx;
384
385   server_init(&sv);
386   port = server_start(&sv);
387   client_init(&cl);
388   client_start(&cl, port);
389
390   client_wait_and_shutdown(&cl);
391   server_wait_and_shutdown(&sv);
392   GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
393   gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
394 }
395
396 typedef struct fd_change_data {
397   grpc_iomgr_cb_func cb_that_ran;
398 } fd_change_data;
399
400 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
401
402 void destroy_change_data(fd_change_data* /*fdc*/) {}
403
404 static void first_read_callback(void* arg /* fd_change_data */,
405                                 grpc_error_handle /*error*/) {
406   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
407
408   gpr_mu_lock(g_mu);
409   fdc->cb_that_ran = first_read_callback;
410   GPR_ASSERT(
411       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
412   gpr_mu_unlock(g_mu);
413 }
414
415 static void second_read_callback(void* arg /* fd_change_data */,
416                                  grpc_error_handle /*error*/) {
417   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
418
419   gpr_mu_lock(g_mu);
420   fdc->cb_that_ran = second_read_callback;
421   GPR_ASSERT(
422       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
423   gpr_mu_unlock(g_mu);
424 }
425
426 /* Test that changing the callback we use for notify_on_read actually works.
427    Note that we have two different but almost identical callbacks above -- the
428    point is to have two different function pointers and two different data
429    pointers and make sure that changing both really works. */
430 static void test_grpc_fd_change(void) {
431   grpc_fd* em_fd;
432   fd_change_data a, b;
433   int flags;
434   int sv[2];
435   char data;
436   ssize_t result;
437   grpc_closure first_closure;
438   grpc_closure second_closure;
439   grpc_core::ExecCtx exec_ctx;
440
441   GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
442                     grpc_schedule_on_exec_ctx);
443   GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
444                     grpc_schedule_on_exec_ctx);
445
446   init_change_data(&a);
447   init_change_data(&b);
448
449   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
450   flags = fcntl(sv[0], F_GETFL, 0);
451   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
452   flags = fcntl(sv[1], F_GETFL, 0);
453   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
454
455   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
456   grpc_pollset_add_fd(g_pollset, em_fd);
457
458   /* Register the first callback, then make its FD readable */
459   grpc_fd_notify_on_read(em_fd, &first_closure);
460   data = 0;
461   result = write(sv[1], &data, 1);
462   GPR_ASSERT(result == 1);
463
464   /* And now wait for it to run. */
465   gpr_mu_lock(g_mu);
466   while (a.cb_that_ran == nullptr) {
467     grpc_pollset_worker* worker = nullptr;
468     GPR_ASSERT(GRPC_LOG_IF_ERROR(
469         "pollset_work",
470         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
471     gpr_mu_unlock(g_mu);
472
473     gpr_mu_lock(g_mu);
474   }
475   GPR_ASSERT(a.cb_that_ran == first_read_callback);
476   gpr_mu_unlock(g_mu);
477
478   /* And drain the socket so we can generate a new read edge */
479   result = read(sv[0], &data, 1);
480   GPR_ASSERT(result == 1);
481
482   /* Now register a second callback with distinct change data, and do the same
483      thing again. */
484   grpc_fd_notify_on_read(em_fd, &second_closure);
485   data = 0;
486   result = write(sv[1], &data, 1);
487   GPR_ASSERT(result == 1);
488
489   gpr_mu_lock(g_mu);
490   while (b.cb_that_ran == nullptr) {
491     grpc_pollset_worker* worker = nullptr;
492     GPR_ASSERT(GRPC_LOG_IF_ERROR(
493         "pollset_work",
494         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
495     gpr_mu_unlock(g_mu);
496
497     gpr_mu_lock(g_mu);
498   }
499   /* Except now we verify that second_read_callback ran instead */
500   GPR_ASSERT(b.cb_that_ran == second_read_callback);
501   gpr_mu_unlock(g_mu);
502
503   grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
504
505   destroy_change_data(&a);
506   destroy_change_data(&b);
507   close(sv[1]);
508 }
509
510 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
511   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
512 }
513
514 int main(int argc, char** argv) {
515   grpc_closure destroyed;
516   grpc::testing::TestEnvironment env(argc, argv);
517   grpc_init();
518   {
519     grpc_core::ExecCtx exec_ctx;
520     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
521     grpc_pollset_init(g_pollset, &g_mu);
522     test_grpc_fd();
523     test_grpc_fd_change();
524     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
525                       grpc_schedule_on_exec_ctx);
526     grpc_pollset_shutdown(g_pollset, &destroyed);
527     grpc_core::ExecCtx::Get()->Flush();
528     gpr_free(g_pollset);
529   }
530   grpc_shutdown();
531   return 0;
532 }
533
534 #else /* GRPC_POSIX_SOCKET_EV */
535
536 int main(int argc, char** argv) { return 1; }
537
538 #endif /* GRPC_POSIX_SOCKET_EV */