3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
23 #ifdef GRPC_WINSOCK_SOCKET
25 #include "src/core/lib/iomgr/sockaddr.h"
32 #include "absl/strings/str_cat.h"
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/log_windows.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/iomgr/iocp_windows.h"
43 #include "src/core/lib/iomgr/pollset_windows.h"
44 #include "src/core/lib/iomgr/resolve_address.h"
45 #include "src/core/lib/iomgr/sockaddr_utils.h"
46 #include "src/core/lib/iomgr/socket_windows.h"
47 #include "src/core/lib/iomgr/tcp_server.h"
48 #include "src/core/lib/iomgr/tcp_windows.h"
49 #include "src/core/lib/slice/slice_internal.h"
51 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
53 /* one listening port */
54 typedef struct grpc_tcp_listener grpc_tcp_listener;
55 struct grpc_tcp_listener {
56 /* This seemingly magic number comes from AcceptEx's documentation. each
57 address buffer needs to have at least 16 more bytes at their end. */
58 uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
59 /* This will hold the socket for the next accept. */
61 /* The listener winsocket. */
62 grpc_winsocket* socket;
63 /* The actual TCP port number. */
66 grpc_tcp_server* server;
67 /* The cached AcceptEx for that port. */
68 LPFN_ACCEPTEX AcceptEx;
70 int outstanding_calls;
71 /* closure for socket notification of accept being ready */
72 grpc_closure on_accept;
74 struct grpc_tcp_listener* next;
77 /* the overall server */
78 struct grpc_tcp_server {
80 /* Called whenever accept() succeeds on a server port. */
81 grpc_tcp_server_cb on_accept_cb;
82 void* on_accept_cb_arg;
86 /* active port count: how many ports are actually still listening */
89 /* linked list of server ports */
90 grpc_tcp_listener* head;
91 grpc_tcp_listener* tail;
93 /* List of closures passed to shutdown_starting_add(). */
94 grpc_closure_list shutdown_starting;
96 /* shutdown callback */
97 grpc_closure* shutdown_complete;
99 grpc_channel_args* channel_args;
102 /* Public function. Allocates the proper data structures to hold a
104 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
105 const grpc_channel_args* args,
106 grpc_tcp_server** server) {
107 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
108 s->channel_args = grpc_channel_args_copy(args);
109 gpr_ref_init(&s->refs, 1);
112 s->on_accept_cb = NULL;
113 s->on_accept_cb_arg = NULL;
116 s->shutdown_starting.head = NULL;
117 s->shutdown_starting.tail = NULL;
118 s->shutdown_complete = shutdown_complete;
120 return GRPC_ERROR_NONE;
123 static void destroy_server(void* arg, grpc_error* error) {
124 grpc_tcp_server* s = (grpc_tcp_server*)arg;
126 /* Now that the accepts have been aborted, we can destroy the sockets.
127 The IOCP won't get notified on these, so we can flag them as already
128 closed by the system. */
130 grpc_tcp_listener* sp = s->head;
133 grpc_winsocket_destroy(sp->socket);
136 grpc_channel_args_destroy(s->channel_args);
137 gpr_mu_destroy(&s->mu);
141 static void finish_shutdown_locked(grpc_tcp_server* s) {
142 if (s->shutdown_complete != NULL) {
143 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
147 grpc_core::ExecCtx::Run(
149 GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
153 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
154 gpr_ref_non_zero(&s->refs);
158 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
159 grpc_closure* shutdown_starting) {
161 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
163 gpr_mu_unlock(&s->mu);
166 static void tcp_server_destroy(grpc_tcp_server* s) {
167 grpc_tcp_listener* sp;
170 /* First, shutdown all fd's. This will queue abortion calls for all
171 of the pending accepts due to the normal operation mechanism. */
172 if (s->active_ports == 0) {
173 finish_shutdown_locked(s);
175 for (sp = s->head; sp; sp = sp->next) {
176 sp->shutting_down = 1;
177 grpc_winsocket_shutdown(sp->socket);
180 gpr_mu_unlock(&s->mu);
183 static void tcp_server_unref(grpc_tcp_server* s) {
184 if (gpr_unref(&s->refs)) {
185 grpc_tcp_server_shutdown_listeners(s);
187 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
188 gpr_mu_unlock(&s->mu);
189 tcp_server_destroy(s);
193 /* Prepare (bind) a recently-created socket for listening. */
194 static grpc_error* prepare_socket(SOCKET sock,
195 const grpc_resolved_address* addr,
197 grpc_resolved_address sockname_temp;
198 grpc_error* error = GRPC_ERROR_NONE;
199 int sockname_temp_len;
201 error = grpc_tcp_prepare_socket(sock);
202 if (error != GRPC_ERROR_NONE) {
206 if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
208 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
212 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
213 error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
217 sockname_temp_len = sizeof(struct sockaddr_storage);
218 if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
219 &sockname_temp_len) == SOCKET_ERROR) {
220 error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
223 sockname_temp.len = (size_t)sockname_temp_len;
225 *port = grpc_sockaddr_get_port(&sockname_temp);
226 return GRPC_ERROR_NONE;
229 GPR_ASSERT(error != GRPC_ERROR_NONE);
232 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
233 "Failed to prepare server socket", &error, 1),
234 GRPC_ERROR_STR_TARGET_ADDRESS,
235 grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
236 GRPC_ERROR_INT_FD, (intptr_t)sock);
237 GRPC_ERROR_UNREF(error);
238 if (sock != INVALID_SOCKET) closesocket(sock);
242 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
243 sp->shutting_down = 0;
244 GPR_ASSERT(sp->server->active_ports > 0);
245 if (0 == --sp->server->active_ports) {
246 finish_shutdown_locked(sp->server);
250 /* In order to do an async accept, we need to create a socket first which
251 will be the one assigned to the new incoming connection. */
252 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
253 SOCKET sock = INVALID_SOCKET;
255 DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
256 DWORD bytes_received = 0;
257 grpc_error* error = GRPC_ERROR_NONE;
259 if (port->shutting_down) {
260 return GRPC_ERROR_NONE;
263 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
264 grpc_get_default_wsa_socket_flags());
265 if (sock == INVALID_SOCKET) {
266 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
270 error = grpc_tcp_prepare_socket(sock);
271 if (error != GRPC_ERROR_NONE) goto failure;
273 /* Start the "accept" asynchronously. */
274 success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
275 addrlen, addrlen, &bytes_received,
276 &port->socket->read_info.overlapped);
278 /* It is possible to get an accept immediately without delay. However, we
279 will still get an IOCP notification for it. So let's just ignore it. */
281 int last_error = WSAGetLastError();
282 if (last_error != ERROR_IO_PENDING) {
283 error = GRPC_WSA_ERROR(last_error, "AcceptEx");
288 /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
289 immediately process an accept that happened in the meantime. */
290 port->new_socket = sock;
291 grpc_socket_notify_on_read(port->socket, &port->on_accept);
292 port->outstanding_calls++;
296 GPR_ASSERT(error != GRPC_ERROR_NONE);
297 if (sock != INVALID_SOCKET) closesocket(sock);
301 /* Event manager callback when reads are ready. */
302 static void on_accept(void* arg, grpc_error* error) {
303 grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
304 SOCKET sock = sp->new_socket;
305 grpc_winsocket_callback_info* info = &sp->socket->read_info;
306 grpc_endpoint* ep = NULL;
307 grpc_resolved_address peer_name;
308 DWORD transfered_bytes;
313 gpr_mu_lock(&sp->server->mu);
315 peer_name.len = sizeof(struct sockaddr_storage);
317 /* The general mechanism for shutting down is to queue abortion calls. While
318 this is necessary in the read/write case, it's useless for the accept
319 case. We only need to adjust the pending callback count */
320 if (error != GRPC_ERROR_NONE) {
321 const char* msg = grpc_error_string(error);
322 gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
324 gpr_mu_unlock(&sp->server->mu);
328 /* The IOCP notified us of a completed operation. Let's grab the results,
329 and act accordingly. */
330 transfered_bytes = 0;
331 wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
332 &transfered_bytes, FALSE, &flags);
334 if (!sp->shutting_down) {
335 char* utf8_message = gpr_format_message(WSAGetLastError());
336 gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
337 gpr_free(utf8_message);
341 if (!sp->shutting_down) {
342 err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
343 (char*)&sp->socket->socket, sizeof(sp->socket->socket));
345 char* utf8_message = gpr_format_message(WSAGetLastError());
346 gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
347 gpr_free(utf8_message);
349 int peer_name_len = (int)peer_name.len;
350 err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
351 peer_name.len = (size_t)peer_name_len;
352 std::string peer_name_string;
354 peer_name_string = grpc_sockaddr_to_uri(&peer_name);
356 char* utf8_message = gpr_format_message(WSAGetLastError());
357 gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
358 gpr_free(utf8_message);
360 std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
361 ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
362 sp->server->channel_args, peer_name_string.c_str());
368 /* The only time we should call our callback, is where we successfully
369 managed to accept a connection, and created an endpoint. */
372 grpc_tcp_server_acceptor* acceptor =
373 (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
374 acceptor->from_server = sp->server;
375 acceptor->port_index = sp->port_index;
376 acceptor->fd_index = 0;
377 acceptor->external_connection = false;
378 sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
380 /* As we were notified from the IOCP of one and exactly one accept,
381 the former socked we created has now either been destroy or assigned
382 to the new connection. We need to create a new one for the next
384 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
385 if (0 == --sp->outstanding_calls) {
386 decrement_active_ports_and_notify_locked(sp);
388 gpr_mu_unlock(&sp->server->mu);
391 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392 const grpc_resolved_address* addr,
394 grpc_tcp_listener** listener) {
395 grpc_tcp_listener* sp = NULL;
398 GUID guid = WSAID_ACCEPTEX;
399 DWORD ioctl_num_bytes;
400 LPFN_ACCEPTEX AcceptEx;
401 grpc_error* error = GRPC_ERROR_NONE;
403 /* We need to grab the AcceptEx pointer for that port, as it may be
404 interface-dependent. We'll cache it to avoid doing that again. */
406 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
410 char* utf8_message = gpr_format_message(WSAGetLastError());
411 gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412 gpr_free(utf8_message);
414 return GRPC_ERROR_NONE;
417 error = prepare_socket(sock, addr, &port);
418 if (error != GRPC_ERROR_NONE) {
422 GPR_ASSERT(port >= 0);
424 GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425 sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
427 if (s->head == NULL) {
434 sp->socket = grpc_winsocket_create(sock, "listener");
435 sp->shutting_down = 0;
436 sp->outstanding_calls = 0;
437 sp->AcceptEx = AcceptEx;
438 sp->new_socket = INVALID_SOCKET;
440 sp->port_index = port_index;
441 GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442 GPR_ASSERT(sp->socket);
443 gpr_mu_unlock(&s->mu);
446 return GRPC_ERROR_NONE;
449 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
450 const grpc_resolved_address* addr,
452 grpc_tcp_listener* sp = NULL;
454 grpc_resolved_address addr6_v4mapped;
455 grpc_resolved_address wildcard;
456 grpc_resolved_address* allocated_addr = NULL;
457 grpc_resolved_address sockname_temp;
458 unsigned port_index = 0;
459 grpc_error* error = GRPC_ERROR_NONE;
461 if (s->tail != NULL) {
462 port_index = s->tail->port_index + 1;
465 /* Check if this is a wildcard port, and if so, try to keep the port the same
466 as some previously created listener. */
467 if (grpc_sockaddr_get_port(addr) == 0) {
468 for (sp = s->head; sp; sp = sp->next) {
469 int sockname_temp_len = sizeof(struct sockaddr_storage);
470 if (0 == getsockname(sp->socket->socket,
471 (grpc_sockaddr*)sockname_temp.addr,
472 &sockname_temp_len)) {
473 sockname_temp.len = (size_t)sockname_temp_len;
474 *port = grpc_sockaddr_get_port(&sockname_temp);
477 (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479 grpc_sockaddr_set_port(allocated_addr, *port);
480 addr = allocated_addr;
487 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488 addr = &addr6_v4mapped;
491 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492 if (grpc_sockaddr_is_wildcard(addr, port)) {
493 grpc_sockaddr_make_wildcard6(*port, &wildcard);
498 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499 grpc_get_default_wsa_socket_flags());
500 if (sock == INVALID_SOCKET) {
501 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
505 error = add_socket_to_server(s, sock, addr, port_index, &sp);
508 gpr_free(allocated_addr);
510 if (error != GRPC_ERROR_NONE) {
511 grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
512 "Failed to add port to server", &error, 1);
513 GRPC_ERROR_UNREF(error);
517 GPR_ASSERT(sp != NULL);
523 static void tcp_server_start(grpc_tcp_server* s,
524 const std::vector<grpc_pollset*>* /*pollsets*/,
525 grpc_tcp_server_cb on_accept_cb,
526 void* on_accept_cb_arg) {
527 grpc_tcp_listener* sp;
528 GPR_ASSERT(on_accept_cb);
530 GPR_ASSERT(!s->on_accept_cb);
531 GPR_ASSERT(s->active_ports == 0);
532 s->on_accept_cb = on_accept_cb;
533 s->on_accept_cb_arg = on_accept_cb_arg;
534 for (sp = s->head; sp; sp = sp->next) {
535 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
538 gpr_mu_unlock(&s->mu);
541 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
542 unsigned port_index) {
546 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
551 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
552 grpc_tcp_server* s) {
556 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
558 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
559 tcp_server_create, tcp_server_start,
560 tcp_server_add_port, tcp_server_create_fd_handler,
561 tcp_server_port_fd_count, tcp_server_port_fd,
562 tcp_server_ref, tcp_server_shutdown_starting_add,
563 tcp_server_unref, tcp_server_shutdown_listeners};
564 #endif /* GRPC_WINSOCK_SOCKET */