3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
24 #include <grpc/support/port_platform.h>
26 #include "src/core/lib/iomgr/port.h"
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
35 #include <sys/socket.h>
37 #include <sys/types.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/string_util.h>
43 #include <grpc/support/sync.h>
44 #include <grpc/support/time.h>
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/memory.h"
49 #include "src/core/lib/iomgr/exec_ctx.h"
50 #include "src/core/lib/iomgr/resolve_address.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/sockaddr_utils.h"
53 #include "src/core/lib/iomgr/socket_utils_posix.h"
54 #include "src/core/lib/iomgr/tcp_posix.h"
55 #include "src/core/lib/iomgr/tcp_server.h"
56 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
57 #include "src/core/lib/iomgr/unix_sockets_posix.h"
// Creates a TCP server object: zero-allocates a grpc_tcp_server, applies the
// GRPC_ARG_ALLOW_REUSEPORT and GRPC_ARG_EXPAND_WILDCARD_ADDRS channel args,
// and initializes the remaining bookkeeping fields.
//   shutdown_complete: closure stored on the server; finish_shutdown schedules
//                      it when it is non-null.
//   args:              channel args; nullptr is treated as an empty list. A
//                      copy is stored in s->channel_args.
//   server:            out parameter. NOTE(review): the store through this
//                      pointer is not shown below; presumably it receives the
//                      new server - confirm.
// Returns GRPC_ERROR_NONE on success, or a static-string error when either
// recognized arg is present with a type other than GRPC_ARG_INTEGER.
59 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
60 const grpc_channel_args* args,
61 grpc_tcp_server** server) {
63 static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
// Defaults: reuseport follows platform support, wildcard expansion is off.
64 s->so_reuseport = grpc_is_socket_reuse_port_supported();
65 s->expand_wildcard_addrs = false;
66 for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
67 if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
68 if (args->args[i].type == GRPC_ARG_INTEGER) {
// The arg can only narrow the default: the result is still gated on platform
// support, so a non-zero value cannot force reuseport on.
69 s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
70 (args->args[i].value.integer != 0);
73 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
74 " must be an integer");
76 } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
77 if (args->args[i].type == GRPC_ARG_INTEGER) {
78 s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
81 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
82 GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
// The server starts out holding exactly one reference.
86 gpr_ref_init(&s->refs, 1);
89 s->destroyed_ports = 0;
91 s->shutdown_starting.head = nullptr;
92 s->shutdown_starting.tail = nullptr;
93 s->shutdown_complete = shutdown_complete;
94 s->on_accept_cb = nullptr;
95 s->on_accept_cb_arg = nullptr;
99 s->channel_args = grpc_channel_args_copy(args);
100 s->fd_handler = nullptr;
// Counter that on_read and ExternalConnectionHandler bump (modulo
// pollset_count) to choose a pollset for each accepted connection.
101 gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
103 return GRPC_ERROR_NONE;
// Last step of server teardown. Asserts that s->shutdown is set, schedules the
// shutdown_complete closure (if one was supplied) with GRPC_ERROR_NONE, then
// destroys the mutex, the copied channel args and the fd handler.
106 static void finish_shutdown(grpc_tcp_server* s) {
108 GPR_ASSERT(s->shutdown);
109 gpr_mu_unlock(&s->mu);
// shutdown_complete is whatever tcp_server_create was given, so it may be null.
110 if (s->shutdown_complete != nullptr) {
111 GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
114 gpr_mu_destroy(&s->mu);
// NOTE(review): sp starts at the head of the listener list; presumably the
// listeners are released from here - confirm against the full function.
117 grpc_tcp_listener* sp = s->head;
121 grpc_channel_args_destroy(s->channel_args);
// fd_handler is nullptr unless tcp_server_create_fd_handler was called.
122 grpc_core::Delete(s->fd_handler);
// Closure callback registered by deactivated_all_ports as the on-done closure
// of each orphaned listener fd.
//   server: the grpc_tcp_server that owns the listener (passed as void*).
//   error:  not read by the visible code.
// Bumps destroyed_ports and compares it with nports; s->mu is released on both
// branches.
127 static void destroyed_port(void* server, grpc_error* error) {
128 grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
130 s->destroyed_ports++;
// Equality means this was the last outstanding port.
131 if (s->destroyed_ports == s->nports) {
132 gpr_mu_unlock(&s->mu);
// Otherwise the count must still be below nports; overshooting is a bug.
135 GPR_ASSERT(s->destroyed_ports < s->nports);
136 gpr_mu_unlock(&s->mu);
140 /* Called when all listening endpoints have been shut down, so no further
141 events will be received on them; at this point it is safe to destroy them. */
// Tears down every listener of a server that is already shut down (asserted).
// For each listener it calls grpc_unlink_if_unix_domain_socket on the listener
// address, binds destroyed_port (with s as its argument) to the listener
// destroyed_closure, and orphans the listener fd with that closure.
// destroyed_port then counts the completions.
143 static void deactivated_all_ports(grpc_tcp_server* s) {
144 /* delete ALL the things */
147 GPR_ASSERT(s->shutdown);
150 grpc_tcp_listener* sp;
151 for (sp = s->head; sp; sp = sp->next) {
152 grpc_unlink_if_unix_domain_socket(&sp->addr);
153 GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
154 grpc_schedule_on_exec_ctx);
// The string literal is the reason tag handed to grpc_fd_orphan.
155 grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
156 "tcp_listener_shutdown");
158 gpr_mu_unlock(&s->mu);
160 gpr_mu_unlock(&s->mu);
// Starts destruction of the server; called by tcp_server_unref once the last
// reference is gone. Asserts that the server has not been shut down already.
// When ports are still active, every listener fd is given a Server destroyed
// error (per the existing comment this shuts the fds down). The
// deactivated_all_ports(s) call at the end follows an unlock of s->mu.
// NOTE(review): presumably that direct call is the no-active-ports path, with
// on_read making the call otherwise once active_ports reaches zero - confirm.
165 static void tcp_server_destroy(grpc_tcp_server* s) {
168 GPR_ASSERT(!s->shutdown);
171 /* shutdown all fd's */
172 if (s->active_ports) {
173 grpc_tcp_listener* sp;
174 for (sp = s->head; sp; sp = sp->next) {
176 sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
178 gpr_mu_unlock(&s->mu);
180 gpr_mu_unlock(&s->mu);
181 deactivated_all_ports(s);
185 /* event manager callback when reads are ready */
// Read-readiness callback for one listening socket.
//   arg: the grpc_tcp_listener whose fd became readable.
//   err: checked against GRPC_ERROR_NONE before any accept is attempted.
// Accepts connections with grpc_accept4, wraps each accepted fd in a grpc_fd,
// assigns it to one of the server pollsets in rotation, and passes a new TCP
// endpoint plus a freshly allocated acceptor to the server on_accept_cb.
// The tail of the function decrements active_ports and calls
// deactivated_all_ports when that count hits zero on a shut-down server.
186 static void on_read(void* arg, grpc_error* err) {
187 grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
188 grpc_pollset* read_notifier_pollset;
189 if (err != GRPC_ERROR_NONE) {
193 /* loop until accept4 returns EAGAIN, and then re-arm notification */
195 grpc_resolved_address addr;
198 memset(&addr, 0, sizeof(addr));
199 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// NOTE(review): the two trailing 1 arguments to grpc_accept4 presumably request
// a non-blocking, close-on-exec socket - confirm against its declaration.
200 /* Note: If we ever decide to return this address to the user, remember to
201 strip off the ::ffff:0.0.0.0/96 prefix first. */
202 int fd = grpc_accept4(sp->fd, &addr, 1, 1);
// Re-arm read notification on the listening fd (see the loop comment above).
208 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
// accept4 failures are only logged while the listeners have not been shut
// down; shutdown_listeners is read under the server mutex.
211 gpr_mu_lock(&sp->server->mu);
212 if (!sp->server->shutdown_listeners) {
213 gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
215 /* if we have shutdown listeners, accept4 could fail, and we
216 needn't notify users */
218 gpr_mu_unlock(&sp->server->mu);
223 /* For UNIX sockets, the accept call might not fill up the member sun_path
224 * of sockaddr_un, so explicitly call getsockname to get it. */
225 if (grpc_is_unix_socket(&addr)) {
226 memset(&addr, 0, sizeof(addr));
227 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
228 if (getsockname(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
230 gpr_log(GPR_ERROR, "Failed getsockname: %s", strerror(errno));
236 grpc_set_socket_no_sigpipe_if_possible(fd);
// The peer address, rendered as a URI, is used both in the fd name and as the
// address string given to grpc_tcp_create.
238 addr_str = grpc_sockaddr_to_uri(&addr);
239 gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
241 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
242 gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
245 grpc_fd* fdobj = grpc_fd_create(fd, name, true);
// Rotate through the pollsets: atomically bump next_pollset_to_assign and wrap
// by pollset_count.
// NOTE(review): pollset_count is used as a modulus here and is stored without
// a check in tcp_server_start; confirm it can never be zero.
247 read_notifier_pollset =
248 sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
249 &sp->server->next_pollset_to_assign, 1)) %
250 sp->server->pollset_count];
252 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
// Describe where the connection came from for the accept callback.
// NOTE(review): the acceptor comes from gpr_malloc and only the four fields
// below are assigned here; listener_fd and pending_data (which
// ExternalConnectionHandler sets) stay uninitialized on this path. Confirm
// that consumers read them only when external_connection is true.
255 grpc_tcp_server_acceptor* acceptor =
256 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
257 acceptor->from_server = sp->server;
258 acceptor->port_index = sp->port_index;
259 acceptor->fd_index = sp->fd_index;
260 acceptor->external_connection = false;
262 sp->server->on_accept_cb(
263 sp->server->on_accept_cb_arg,
264 grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
265 read_notifier_pollset, acceptor);
271 GPR_UNREACHABLE_CODE(return );
// This listener is no longer active. If it was the last active port and the
// server has been shut down, finish tearing down all ports; the mutex is
// released before that call.
274 gpr_mu_lock(&sp->server->mu);
275 if (0 == --sp->server->active_ports && sp->server->shutdown) {
276 gpr_mu_unlock(&sp->server->mu);
277 deactivated_all_ports(sp->server);
279 gpr_mu_unlock(&sp->server->mu);
283 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
// Adds listeners for a wildcard address on server s.
// If grpc_tcp_server_have_ifaddrs() is true and expand_wildcard_addrs is set,
// the work is delegated to grpc_tcp_server_add_all_local_addrs. Otherwise the
// IPv6 wildcard is tried first; the IPv4 wildcard is also tried unless the
// IPv6 socket reported GRPC_DSMODE_DUALSTACK or GRPC_DSMODE_IPV4.
// *out_port is set to the port of whichever listener was added.
// Returns GRPC_ERROR_NONE after logging and releasing any per-family error, or
// a Failed to add any wildcard listeners error carrying both per-family errors
// as children (both are asserted to be set on that path).
284 static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
288 grpc_resolved_address wild4;
289 grpc_resolved_address wild6;
290 unsigned fd_index = 0;
291 grpc_dualstack_mode dsmode;
292 grpc_tcp_listener* sp = nullptr;
293 grpc_tcp_listener* sp2 = nullptr;
294 grpc_error* v6_err = GRPC_ERROR_NONE;
295 grpc_error* v4_err = GRPC_ERROR_NONE;
298 if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
299 return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
303 grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
304 /* Try listening on IPv6 first. */
305 if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
306 &dsmode, &sp)) == GRPC_ERROR_NONE) {
// Remember the port the IPv6 listener got so the IPv4 attempt below asks for
// the same one.
308 requested_port = *out_port = sp->port;
309 if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
310 return GRPC_ERROR_NONE;
313 /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
314 grpc_sockaddr_set_port(&wild4, requested_port);
315 if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
316 &dsmode, &sp2)) == GRPC_ERROR_NONE) {
317 *out_port = sp2->port;
// Partial success: each per-family failure is logged and its error released
// before GRPC_ERROR_NONE is returned.
324 if (v6_err != GRPC_ERROR_NONE) {
326 "Failed to add :: listener, "
327 "the environment may not support IPv6: %s",
328 grpc_error_string(v6_err));
329 GRPC_ERROR_UNREF(v6_err);
331 if (v4_err != GRPC_ERROR_NONE) {
333 "Failed to add 0.0.0.0 listener, "
334 "the environment may not support IPv4: %s",
335 grpc_error_string(v4_err));
336 GRPC_ERROR_UNREF(v4_err);
338 return GRPC_ERROR_NONE;
// Total failure: both attempts failed, so both errors become children of the
// root error.
340 grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
341 "Failed to add any wildcard listeners");
342 GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
343 root_err = grpc_error_add_child(root_err, v6_err);
344 root_err = grpc_error_add_child(root_err, v4_err);
// Creates count extra listening sockets for the same address as listener and
// links each one in as a sibling of listener. tcp_server_start calls this on
// its so_reuseport path with count = pollset_count - 1.
//   listener: the existing listener to clone.
//   count:    number of clones to create.
// Returns the first error from socket creation or preparation, otherwise
// GRPC_ERROR_NONE.
349 static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
350 grpc_tcp_listener* sp = nullptr;
// Shift the fd_index of the siblings that already follow listener so the
// clones can take the indices directly after it.
355 for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
356 l->fd_index += count;
359 for (unsigned i = 0; i < count; i++) {
362 grpc_dualstack_mode dsmode;
363 err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
365 if (err != GRPC_ERROR_NONE) return err;
// NOTE(review): if preparing the socket fails the function returns without a
// visible close of fd; confirm grpc_tcp_server_prepare_socket releases it.
366 err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
368 if (err != GRPC_ERROR_NONE) return err;
// Every clone counts as one more port on the server.
369 listener->server->nports++;
370 grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
// NOTE(review): i is unsigned but is formatted with %d.
371 gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
372 sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
373 sp->next = listener->next;
375 /* sp (the new listener) is a sibling of 'listener' (the original
378 sp->sibling = listener->sibling;
379 listener->sibling = sp;
380 sp->server = listener->server;
382 sp->emfd = grpc_fd_create(fd, name, true);
383 memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
385 sp->port_index = listener->port_index;
// Clones receive descending indices: listener fd_index + count for the first
// clone down to listener fd_index + 1 for the last.
386 sp->fd_index = listener->fd_index + count - i;
387 GPR_ASSERT(sp->emfd);
// Move the server tail pointer forward to the real end of the listener list.
388 while (listener->server->tail->next != nullptr) {
389 listener->server->tail = listener->server->tail->next;
395 return GRPC_ERROR_NONE;
// Adds a listening port for addr to server s.
//   - The new port_index is one more than the port_index of the current tail
//     listener, or 0 when the server has no listeners yet.
//   - grpc_unlink_if_unix_domain_socket is called on addr before binding.
//   - When the requested port is 0, existing listeners are consulted so the
//     new listener can reuse a port that is already in use (see the comment in
//     the body).
//   - Wildcard addresses are handed to add_wildcard_addrs_to_server; any other
//     address is converted to its v4-mapped IPv6 form when that conversion
//     applies and then added with grpc_tcp_server_add_addr.
// On success *out_port receives the port of the new listener.
398 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
399 const grpc_resolved_address* addr,
401 grpc_tcp_listener* sp;
402 grpc_resolved_address sockname_temp;
403 grpc_resolved_address addr6_v4mapped;
404 int requested_port = grpc_sockaddr_get_port(addr);
405 unsigned port_index = 0;
406 grpc_dualstack_mode dsmode;
409 if (s->tail != nullptr) {
410 port_index = s->tail->port_index + 1;
412 grpc_unlink_if_unix_domain_socket(addr);
414 /* Check if this is a wildcard port, and if so, try to keep the port the same
415 as some previously created listener. */
416 if (requested_port == 0) {
417 for (sp = s->head; sp; sp = sp->next) {
419 static_cast<socklen_t>(sizeof(struct sockaddr_storage));
422 reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
423 &sockname_temp.len)) {
424 int used_port = grpc_sockaddr_get_port(&sockname_temp);
// addr points to const data, so the port is patched into a local copy and addr
// is redirected to that copy.
426 memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
427 grpc_sockaddr_set_port(&sockname_temp, used_port);
428 requested_port = used_port;
429 addr = &sockname_temp;
435 if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
436 return add_wildcard_addrs_to_server(s, port_index, requested_port,
// For non-wildcard addresses, use the v4-mapped IPv6 form when available.
439 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
440 addr = &addr6_v4mapped;
442 if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
444 *out_port = sp->port;
449 /* Return listener at port_index or NULL. Should only be called with s->mu held. */
// Walks the listener list from s->head, counting only entries that are not
// siblings. port_index is zero-based: the innermost condition becomes true on
// the first non-sibling listener for which the running count exceeds
// port_index.
// NOTE(review): the return statements are not shown below; presumably that
// listener is returned on a match and a null pointer otherwise - confirm.
451 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
452 unsigned port_index) {
453 unsigned num_ports = 0;
454 grpc_tcp_listener* sp;
455 for (sp = s->head; sp; sp = sp->next) {
456 if (!sp->is_sibling) {
457 if (++num_ports > port_index) {
// Reports how many fds belong to the port at port_index. Finds the first
// listener for that port with get_port_index, then follows the sibling chain.
// s->mu is released before the function ends.
// NOTE(review): the per-sibling increment of num_fds and the return statement
// are not shown below - confirm against the full function.
465 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
466 unsigned num_fds = 0;
468 grpc_tcp_listener* sp = get_port_index(s, port_index);
469 for (; sp; sp = sp->sibling) {
472 gpr_mu_unlock(&s->mu);
// Looks up the fd at position fd_index within the port at port_index. Starts
// from the listener returned by get_port_index and follows the sibling chain,
// decrementing fd_index at each step. s->mu is released on both exits.
// NOTE(review): the returned values are not shown below; presumably the
// matching listener fd on success and a negative value otherwise - confirm.
476 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
479 grpc_tcp_listener* sp = get_port_index(s, port_index);
480 for (; sp; sp = sp->sibling, --fd_index) {
482 gpr_mu_unlock(&s->mu);
486 gpr_mu_unlock(&s->mu);
// Starts accepting connections on every listener of s.
//   pollsets / pollset_count: pollsets the listener fds are added to. The
//       array pointer is stored on the server, not copied.
//       NOTE(review): presumably the array must outlive the server - confirm.
//   on_accept_cb / on_accept_cb_arg: callback invoked for each accepted
//       connection, and its user argument.
// Asserts that a callback is supplied, that none was installed before, and
// that no ports are active yet.
// NOTE(review): pollset_count is stored unchecked and later used as a modulus
// in on_read and ExternalConnectionHandler; confirm callers always pass at
// least one pollset.
490 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
491 size_t pollset_count,
492 grpc_tcp_server_cb on_accept_cb,
493 void* on_accept_cb_arg) {
495 grpc_tcp_listener* sp;
496 GPR_ASSERT(on_accept_cb);
498 GPR_ASSERT(!s->on_accept_cb);
499 GPR_ASSERT(s->active_ports == 0);
500 s->on_accept_cb = on_accept_cb;
501 s->on_accept_cb_arg = on_accept_cb_arg;
502 s->pollsets = pollsets;
503 s->pollset_count = pollset_count;
505 while (sp != nullptr) {
// Reuseport path (never taken for unix socket addresses): clone the listener
// pollset_count - 1 times; a clone_port failure is logged and then trips the
// assertion.
506 if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
508 GPR_ASSERT(GRPC_LOG_IF_ERROR(
509 "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
510 for (i = 0; i < pollset_count; i++) {
511 grpc_pollset_add_fd(pollsets[i], sp->emfd);
512 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
513 grpc_schedule_on_exec_ctx);
514 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
// Shared-fd path: the listener fd is added to the pollsets and on_read is
// armed as its read callback.
519 for (i = 0; i < pollset_count; i++) {
520 grpc_pollset_add_fd(pollsets[i], sp->emfd);
522 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
523 grpc_schedule_on_exec_ctx);
524 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
529 gpr_mu_unlock(&s->mu);
// Takes an additional reference on s via gpr_ref_non_zero.
// NOTE(review): by its name that helper expects the count to be non-zero
// already, so s must still be alive; the return statement is not shown below
// and presumably hands s back - confirm.
532 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
533 gpr_ref_non_zero(&s->refs);
// Appends shutdown_starting to the closure list s->shutdown_starting.
// tcp_server_unref schedules that whole list when the last reference to the
// server is dropped.
537 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
538 grpc_closure* shutdown_starting) {
540 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
542 gpr_mu_unlock(&s->mu);
// Drops one reference on s. When gpr_unref reports that this was the last
// one, the listeners are shut down, the closures queued through
// tcp_server_shutdown_starting_add are scheduled, and the server is destroyed
// with tcp_server_destroy.
545 static void tcp_server_unref(grpc_tcp_server* s) {
546 if (gpr_unref(&s->refs)) {
547 grpc_tcp_server_shutdown_listeners(s);
549 GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
550 gpr_mu_unlock(&s->mu);
551 tcp_server_destroy(s);
// Sets s->shutdown_listeners and, if any ports are active, shuts down every
// listener fd with a Server shutdown error. on_read reads the same flag to
// decide whether an accept4 failure should be logged.
555 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
557 s->shutdown_listeners = true;
558 /* shutdown all fd's */
559 if (s->active_ports) {
560 grpc_tcp_listener* sp;
561 for (sp = s->head; sp; sp = sp->next) {
562 grpc_fd_shutdown(sp->emfd,
563 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
566 gpr_mu_unlock(&s->mu);
// TcpServerFdHandler implementation that hands an already-connected fd to the
// server s_ it was constructed with.
// Handle(listener_fd, fd, buf):
//   - reads the peer address of fd with getpeername and logs on failure,
//   - wraps fd in a grpc_fd named after the peer address URI,
//   - assigns it to one of the server pollsets in rotation,
//   - fills an acceptor marked external_connection, with port_index and
//     fd_index set to -1, listener_fd recorded and buf stored as pending_data,
//   - invokes the server on_accept_cb with a new TCP endpoint for the fd.
570 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
572 explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
574 // TODO(yangg) resolve duplicate code with on_read
575 void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
576 grpc_pollset* read_notifier_pollset;
577 grpc_resolved_address addr;
580 memset(&addr, 0, sizeof(addr));
581 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// Handle creates its own ExecCtx for the duration of the call.
582 grpc_core::ExecCtx exec_ctx;
584 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
586 gpr_log(GPR_ERROR, "Failed getpeername: %s", strerror(errno));
590 grpc_set_socket_no_sigpipe_if_possible(fd);
591 addr_str = grpc_sockaddr_to_uri(&addr);
592 gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
// NOTE(review): on_read tests the same trace flag through the
// GRPC_TRACE_FLAG_ENABLED macro; consider using one form in both places.
593 if (grpc_tcp_trace.enabled()) {
594 gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
597 grpc_fd* fdobj = grpc_fd_create(fd, name, true);
// Same pollset rotation as on_read: bump the shared counter and wrap it.
598 read_notifier_pollset =
599 s_->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
600 &s_->next_pollset_to_assign, 1)) %
602 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
603 grpc_tcp_server_acceptor* acceptor =
604 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
605 acceptor->from_server = s_;
// Both indices are set to -1 for external connections (on_read copies them
// from its listener instead).
606 acceptor->port_index = -1;
607 acceptor->fd_index = -1;
608 acceptor->external_connection = true;
609 acceptor->listener_fd = listener_fd;
610 acceptor->pending_data = buf;
611 s_->on_accept_cb(s_->on_accept_cb_arg,
612 grpc_tcp_create(fdobj, s_->channel_args, addr_str),
613 read_notifier_pollset, acceptor);
// Allocates an ExternalConnectionHandler bound to s, stores it in
// s->fd_handler and returns it. finish_shutdown deletes s->fd_handler, so the
// caller does not own the returned pointer.
// NOTE(review): a second call overwrites s->fd_handler without deleting the
// previous handler; confirm this is called at most once per server.
623 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
624 grpc_tcp_server* s) {
625 s->fd_handler = grpc_core::New<ExternalConnectionHandler>(s);
626 return s->fd_handler;
// Function table that exposes the POSIX implementations in this file through
// grpc_tcp_server_vtable. The initializer is positional, so the entries must
// stay in the same order as the fields of grpc_tcp_server_vtable, which is
// declared outside this file.
629 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
630 tcp_server_create, tcp_server_start,
631 tcp_server_add_port, tcp_server_create_fd_handler,
632 tcp_server_port_fd_count, tcp_server_port_fd,
633 tcp_server_ref, tcp_server_shutdown_starting_add,
634 tcp_server_unref, tcp_server_shutdown_listeners};
636 #endif /* GRPC_POSIX_SOCKET_TCP_SERVER */