3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "src/core/lib/iomgr/port.h"
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_EV
27 #include <netinet/in.h>
32 #include <sys/socket.h>
36 #include <grpc/grpc.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/sync.h>
40 #include <grpc/support/time.h>
42 #include "src/core/lib/iomgr/ev_posix.h"
43 #include "src/core/lib/iomgr/iomgr.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "test/core/util/test_config.h"
48 static grpc_pollset* g_pollset;
50 /* buffer size used to send and receive data.
51 1024 is the minimal value to set TCP send and receive buffer. */
54 /* Create a test socket with the right properties for testing.
55 port is the TCP port to listen or connect to.
56 Return a socket FD and sockaddr_in. */
57 static void create_test_socket(int port, int* socket_fd,
58 struct sockaddr_in* sin) {
61 int buffer_size_bytes = BUF_SIZE;
64 fd = socket(AF_INET, SOCK_STREAM, 0);
65 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
66 /* Reset the size of socket send buffer to the minimal value to facilitate
67 buffer filling up and triggering notify_on_write */
68 GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
69 GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
70 /* Make fd non-blocking */
71 flags = fcntl(fd, F_GETFL, 0);
72 GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
75 /* Use local address for test */
76 sin->sin_family = AF_INET;
77 sin->sin_addr.s_addr = htonl(0x7f000001);
78 GPR_ASSERT(port >= 0 && port < 65536);
79 sin->sin_port = htons(static_cast<uint16_t>(port));
82 /* Phony gRPC callback */
83 void no_op_cb(void* /*arg*/, int /*success*/) {}
85 /* =======An upload server to test notify_on_read===========
86 The server simply reads and counts a stream of bytes. */
88 /* An upload server. */
90 grpc_fd* em_fd; /* listening fd */
91 ssize_t read_bytes_total; /* total number of received bytes */
92 int done; /* set to 1 when a server finishes serving */
93 grpc_closure listen_closure;
96 static void server_init(server* sv) {
97 sv->read_bytes_total = 0;
101 /* An upload session.
102 Created when a new upload request arrives in the server. */
104 server* sv; /* not owned by a single session */
105 grpc_fd* em_fd; /* fd to read upload bytes */
106 char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
107 grpc_closure session_read_closure;
110 /* Called when an upload session can be safely shutdown.
111 Close session FD and start to shutdown listen FD. */
112 static void session_shutdown_cb(void* arg, /*session */
114 session* se = static_cast<session*>(arg);
116 grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
118 /* Start to shutdown listen fd. */
119 grpc_fd_shutdown(sv->em_fd,
120 GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
123 /* Called when data become readable in a session. */
124 static void session_read_cb(void* arg, /*session */
125 grpc_error_handle error) {
126 session* se = static_cast<session*>(arg);
127 int fd = grpc_fd_wrapped_fd(se->em_fd);
129 ssize_t read_once = 0;
130 ssize_t read_total = 0;
132 if (error != GRPC_ERROR_NONE) {
133 session_shutdown_cb(arg, true);
138 read_once = read(fd, se->read_buf, BUF_SIZE);
139 if (read_once > 0) read_total += read_once;
140 } while (read_once > 0);
141 se->sv->read_bytes_total += read_total;
143 /* read() returns 0 to indicate the TCP connection was closed by the client.
144 read(fd, read_buf, 0) also returns 0 which should never be called as such.
145 It is possible to read nothing due to spurious edge event or data has
146 been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
147 if (read_once == 0) {
148 session_shutdown_cb(arg, true);
149 } else if (read_once == -1) {
150 if (errno == EAGAIN) {
151 /* An edge triggered event is cached in the kernel until next poll.
152 In the current single thread implementation, session_read_cb is called
153 in the polling thread, such that polling only happens after this
154 callback, and will catch read edge event if data is available again
155 before notify_on_read.
156 TODO(chenw): in multi-threaded version, callback and polling can be
157 run in different threads. polling may catch a persist read edge event
158 before notify_on_read is called. */
159 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
161 gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
167 /* Called when the listen FD can be safely shutdown.
168 Close listen FD and signal that server can be shutdown. */
169 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
170 server* sv = static_cast<server*>(arg);
172 grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
177 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
181 /* Called when a new TCP connection request arrives in the listening port. */
182 static void listen_cb(void* arg, /*=sv_arg*/
183 grpc_error_handle error) {
184 server* sv = static_cast<server*>(arg);
188 struct sockaddr_storage ss;
189 socklen_t slen = sizeof(ss);
190 grpc_fd* listen_em_fd = sv->em_fd;
192 if (error != GRPC_ERROR_NONE) {
193 listen_shutdown_cb(arg, 1);
197 fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
198 reinterpret_cast<struct sockaddr*>(&ss), &slen);
200 GPR_ASSERT(fd < FD_SETSIZE);
201 flags = fcntl(fd, F_GETFL, 0);
202 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
203 se = static_cast<session*>(gpr_malloc(sizeof(*se)));
205 se->em_fd = grpc_fd_create(fd, "listener", false);
206 grpc_pollset_add_fd(g_pollset, se->em_fd);
207 GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
208 grpc_schedule_on_exec_ctx);
209 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
211 grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
214 /* Max number of connections pending to be accepted by listen(). */
215 #define MAX_NUM_FD 1024
217 /* Start a test server, return the TCP listening port bound to listen_fd.
218 listen_cb() is registered to be interested in reading from listen_fd.
219 When connection request arrives, listen_cb() is called to accept the
220 connection request. */
221 static int server_start(server* sv) {
224 struct sockaddr_in sin;
227 create_test_socket(port, &fd, &sin);
228 addr_len = sizeof(sin);
229 GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0);
230 GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0);
231 port = ntohs(sin.sin_port);
232 GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
234 sv->em_fd = grpc_fd_create(fd, "server", false);
235 grpc_pollset_add_fd(g_pollset, sv->em_fd);
236 /* Register to be interested in reading from listen_fd. */
237 GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
238 grpc_schedule_on_exec_ctx);
239 grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
244 /* Wait and shutdown a sever. */
245 static void server_wait_and_shutdown(server* sv) {
248 grpc_core::ExecCtx exec_ctx;
249 grpc_pollset_worker* worker = nullptr;
250 GPR_ASSERT(GRPC_LOG_IF_ERROR(
252 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
260 /* ===An upload client to test notify_on_write=== */
262 /* Client write buffer size */
263 #define CLIENT_WRITE_BUF_SIZE 10
264 /* Total number of times that the client fills up the write buffer */
265 #define CLIENT_TOTAL_WRITE_CNT 3
267 /* An upload client. */
270 char write_buf[CLIENT_WRITE_BUF_SIZE];
271 ssize_t write_bytes_total;
272 /* Number of times that the client fills up the write buffer and calls
273 notify_on_write to schedule another write. */
274 int client_write_cnt;
276 int done; /* set to 1 when a client finishes sending */
277 grpc_closure write_closure;
280 static void client_init(client* cl) {
281 memset(cl->write_buf, 0, sizeof(cl->write_buf));
282 cl->write_bytes_total = 0;
283 cl->client_write_cnt = 0;
287 /* Called when a client upload session is ready to shutdown. */
288 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
289 client* cl = static_cast<client*>(arg);
290 grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
293 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
296 /* Write as much as possible, then register notify_on_write. */
297 static void client_session_write(void* arg, /*client */
298 grpc_error_handle error) {
299 client* cl = static_cast<client*>(arg);
300 int fd = grpc_fd_wrapped_fd(cl->em_fd);
301 ssize_t write_once = 0;
303 if (error != GRPC_ERROR_NONE) {
305 client_session_shutdown_cb(arg, 1);
311 write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
312 if (write_once > 0) cl->write_bytes_total += write_once;
313 } while (write_once > 0);
315 if (errno == EAGAIN) {
317 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
318 GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
319 grpc_schedule_on_exec_ctx);
320 grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
321 cl->client_write_cnt++;
323 client_session_shutdown_cb(arg, 1);
327 gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
332 /* Start a client to send a stream of bytes. */
333 static void client_start(client* cl, int port) {
335 struct sockaddr_in sin;
336 create_test_socket(port, &fd, &sin);
337 if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
339 if (errno == EINPROGRESS) {
342 pfd.events = POLLOUT;
344 if (poll(&pfd, 1, -1) == -1) {
345 gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
349 gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
354 cl->em_fd = grpc_fd_create(fd, "client", false);
355 grpc_pollset_add_fd(g_pollset, cl->em_fd);
357 client_session_write(cl, GRPC_ERROR_NONE);
360 /* Wait for the signal to shutdown a client. */
361 static void client_wait_and_shutdown(client* cl) {
364 grpc_pollset_worker* worker = nullptr;
365 grpc_core::ExecCtx exec_ctx;
366 GPR_ASSERT(GRPC_LOG_IF_ERROR(
368 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
376 /* Test grpc_fd. Start an upload server and client, upload a stream of
377 bytes from the client to the server, and verify that the total number of
378 sent bytes is equal to the total number of received bytes. */
379 static void test_grpc_fd(void) {
383 grpc_core::ExecCtx exec_ctx;
386 port = server_start(&sv);
388 client_start(&cl, port);
390 client_wait_and_shutdown(&cl);
391 server_wait_and_shutdown(&sv);
392 GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
393 gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
396 typedef struct fd_change_data {
397 grpc_iomgr_cb_func cb_that_ran;
400 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
402 void destroy_change_data(fd_change_data* /*fdc*/) {}
404 static void first_read_callback(void* arg /* fd_change_data */,
405 grpc_error_handle /*error*/) {
406 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
409 fdc->cb_that_ran = first_read_callback;
411 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
415 static void second_read_callback(void* arg /* fd_change_data */,
416 grpc_error_handle /*error*/) {
417 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
420 fdc->cb_that_ran = second_read_callback;
422 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
426 /* Test that changing the callback we use for notify_on_read actually works.
427 Note that we have two different but almost identical callbacks above -- the
428 point is to have two different function pointers and two different data
429 pointers and make sure that changing both really works. */
430 static void test_grpc_fd_change(void) {
437 grpc_closure first_closure;
438 grpc_closure second_closure;
439 grpc_core::ExecCtx exec_ctx;
441 GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
442 grpc_schedule_on_exec_ctx);
443 GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
444 grpc_schedule_on_exec_ctx);
446 init_change_data(&a);
447 init_change_data(&b);
449 GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
450 flags = fcntl(sv[0], F_GETFL, 0);
451 GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
452 flags = fcntl(sv[1], F_GETFL, 0);
453 GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
455 em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
456 grpc_pollset_add_fd(g_pollset, em_fd);
458 /* Register the first callback, then make its FD readable */
459 grpc_fd_notify_on_read(em_fd, &first_closure);
461 result = write(sv[1], &data, 1);
462 GPR_ASSERT(result == 1);
464 /* And now wait for it to run. */
466 while (a.cb_that_ran == nullptr) {
467 grpc_pollset_worker* worker = nullptr;
468 GPR_ASSERT(GRPC_LOG_IF_ERROR(
470 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
475 GPR_ASSERT(a.cb_that_ran == first_read_callback);
478 /* And drain the socket so we can generate a new read edge */
479 result = read(sv[0], &data, 1);
480 GPR_ASSERT(result == 1);
482 /* Now register a second callback with distinct change data, and do the same
484 grpc_fd_notify_on_read(em_fd, &second_closure);
486 result = write(sv[1], &data, 1);
487 GPR_ASSERT(result == 1);
490 while (b.cb_that_ran == nullptr) {
491 grpc_pollset_worker* worker = nullptr;
492 GPR_ASSERT(GRPC_LOG_IF_ERROR(
494 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
499 /* Except now we verify that second_read_callback ran instead */
500 GPR_ASSERT(b.cb_that_ran == second_read_callback);
503 grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
505 destroy_change_data(&a);
506 destroy_change_data(&b);
510 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
511 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
514 int main(int argc, char** argv) {
515 grpc_closure destroyed;
516 grpc::testing::TestEnvironment env(argc, argv);
519 grpc_core::ExecCtx exec_ctx;
520 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
521 grpc_pollset_init(g_pollset, &g_mu);
523 test_grpc_fd_change();
524 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
525 grpc_schedule_on_exec_ctx);
526 grpc_pollset_shutdown(g_pollset, &destroyed);
527 grpc_core::ExecCtx::Get()->Flush();
534 #else /* GRPC_POSIX_SOCKET_EV */
536 int main(int argc, char** argv) { return 1; }
538 #endif /* GRPC_POSIX_SOCKET_EV */