3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
23 #ifdef GRPC_WINSOCK_SOCKET
25 #include "src/core/lib/iomgr/sockaddr.h"
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/iomgr/iocp_windows.h"
39 #include "src/core/lib/iomgr/pollset_windows.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_server.h"
44 #include "src/core/lib/iomgr/tcp_windows.h"
46 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
48 /* one listening port */
49 typedef struct grpc_tcp_listener grpc_tcp_listener;
50 struct grpc_tcp_listener {
51 /* This seemingly magic number comes from AcceptEx's documentation. each
52 address buffer needs to have at least 16 more bytes at their end. */
53 uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
54 /* This will hold the socket for the next accept. */
56 /* The listener winsocket. */
57 grpc_winsocket* socket;
58 /* The actual TCP port number. */
61 grpc_tcp_server* server;
62 /* The cached AcceptEx for that port. */
63 LPFN_ACCEPTEX AcceptEx;
65 int outstanding_calls;
66 /* closure for socket notification of accept being ready */
67 grpc_closure on_accept;
69 struct grpc_tcp_listener* next;
72 /* the overall server */
73 struct grpc_tcp_server {
75 /* Called whenever accept() succeeds on a server port. */
76 grpc_tcp_server_cb on_accept_cb;
77 void* on_accept_cb_arg;
81 /* active port count: how many ports are actually still listening */
84 /* linked list of server ports */
85 grpc_tcp_listener* head;
86 grpc_tcp_listener* tail;
88 /* List of closures passed to shutdown_starting_add(). */
89 grpc_closure_list shutdown_starting;
91 /* shutdown callback */
92 grpc_closure* shutdown_complete;
94 grpc_channel_args* channel_args;
97 /* Public function. Allocates the proper data structures to hold a
99 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
100 const grpc_channel_args* args,
101 grpc_tcp_server** server) {
102 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
103 s->channel_args = grpc_channel_args_copy(args);
104 gpr_ref_init(&s->refs, 1);
107 s->on_accept_cb = NULL;
108 s->on_accept_cb_arg = NULL;
111 s->shutdown_starting.head = NULL;
112 s->shutdown_starting.tail = NULL;
113 s->shutdown_complete = shutdown_complete;
115 return GRPC_ERROR_NONE;
118 static void destroy_server(void* arg, grpc_error* error) {
119 grpc_tcp_server* s = (grpc_tcp_server*)arg;
121 /* Now that the accepts have been aborted, we can destroy the sockets.
122 The IOCP won't get notified on these, so we can flag them as already
123 closed by the system. */
125 grpc_tcp_listener* sp = s->head;
128 grpc_winsocket_destroy(sp->socket);
131 grpc_channel_args_destroy(s->channel_args);
132 gpr_mu_destroy(&s->mu);
136 static void finish_shutdown_locked(grpc_tcp_server* s) {
137 if (s->shutdown_complete != NULL) {
138 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
142 grpc_core::ExecCtx::Run(
144 GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
148 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
149 gpr_ref_non_zero(&s->refs);
153 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
154 grpc_closure* shutdown_starting) {
156 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
158 gpr_mu_unlock(&s->mu);
161 static void tcp_server_destroy(grpc_tcp_server* s) {
162 grpc_tcp_listener* sp;
165 /* First, shutdown all fd's. This will queue abortion calls for all
166 of the pending accepts due to the normal operation mechanism. */
167 if (s->active_ports == 0) {
168 finish_shutdown_locked(s);
170 for (sp = s->head; sp; sp = sp->next) {
171 sp->shutting_down = 1;
172 grpc_winsocket_shutdown(sp->socket);
175 gpr_mu_unlock(&s->mu);
178 static void tcp_server_unref(grpc_tcp_server* s) {
179 if (gpr_unref(&s->refs)) {
180 grpc_tcp_server_shutdown_listeners(s);
182 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
183 gpr_mu_unlock(&s->mu);
184 tcp_server_destroy(s);
188 /* Prepare (bind) a recently-created socket for listening. */
189 static grpc_error* prepare_socket(SOCKET sock,
190 const grpc_resolved_address* addr,
192 grpc_resolved_address sockname_temp;
193 grpc_error* error = GRPC_ERROR_NONE;
194 int sockname_temp_len;
196 error = grpc_tcp_prepare_socket(sock);
197 if (error != GRPC_ERROR_NONE) {
201 if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
203 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
207 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
208 error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
212 sockname_temp_len = sizeof(struct sockaddr_storage);
213 if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
214 &sockname_temp_len) == SOCKET_ERROR) {
215 error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
218 sockname_temp.len = (size_t)sockname_temp_len;
220 *port = grpc_sockaddr_get_port(&sockname_temp);
221 return GRPC_ERROR_NONE;
224 GPR_ASSERT(error != GRPC_ERROR_NONE);
225 char* tgtaddr = grpc_sockaddr_to_uri(addr);
227 grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
228 "Failed to prepare server socket", &error, 1),
229 GRPC_ERROR_STR_TARGET_ADDRESS,
230 grpc_slice_from_copied_string(tgtaddr)),
231 GRPC_ERROR_INT_FD, (intptr_t)sock);
233 GRPC_ERROR_UNREF(error);
234 if (sock != INVALID_SOCKET) closesocket(sock);
238 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
239 sp->shutting_down = 0;
240 GPR_ASSERT(sp->server->active_ports > 0);
241 if (0 == --sp->server->active_ports) {
242 finish_shutdown_locked(sp->server);
246 /* In order to do an async accept, we need to create a socket first which
247 will be the one assigned to the new incoming connection. */
248 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
249 SOCKET sock = INVALID_SOCKET;
251 DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
252 DWORD bytes_received = 0;
253 grpc_error* error = GRPC_ERROR_NONE;
255 if (port->shutting_down) {
256 return GRPC_ERROR_NONE;
259 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
260 grpc_get_default_wsa_socket_flags());
261 if (sock == INVALID_SOCKET) {
262 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
266 error = grpc_tcp_prepare_socket(sock);
267 if (error != GRPC_ERROR_NONE) goto failure;
269 /* Start the "accept" asynchronously. */
270 success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
271 addrlen, addrlen, &bytes_received,
272 &port->socket->read_info.overlapped);
274 /* It is possible to get an accept immediately without delay. However, we
275 will still get an IOCP notification for it. So let's just ignore it. */
277 int last_error = WSAGetLastError();
278 if (last_error != ERROR_IO_PENDING) {
279 error = GRPC_WSA_ERROR(last_error, "AcceptEx");
284 /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
285 immediately process an accept that happened in the meantime. */
286 port->new_socket = sock;
287 grpc_socket_notify_on_read(port->socket, &port->on_accept);
288 port->outstanding_calls++;
292 GPR_ASSERT(error != GRPC_ERROR_NONE);
293 if (sock != INVALID_SOCKET) closesocket(sock);
297 /* Event manager callback when reads are ready. */
298 static void on_accept(void* arg, grpc_error* error) {
299 grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
300 SOCKET sock = sp->new_socket;
301 grpc_winsocket_callback_info* info = &sp->socket->read_info;
302 grpc_endpoint* ep = NULL;
303 grpc_resolved_address peer_name;
304 char* peer_name_string;
306 DWORD transfered_bytes;
311 gpr_mu_lock(&sp->server->mu);
313 peer_name.len = sizeof(struct sockaddr_storage);
315 /* The general mechanism for shutting down is to queue abortion calls. While
316 this is necessary in the read/write case, it's useless for the accept
317 case. We only need to adjust the pending callback count */
318 if (error != GRPC_ERROR_NONE) {
319 const char* msg = grpc_error_string(error);
320 gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
322 gpr_mu_unlock(&sp->server->mu);
326 /* The IOCP notified us of a completed operation. Let's grab the results,
327 and act accordingly. */
328 transfered_bytes = 0;
329 wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
330 &transfered_bytes, FALSE, &flags);
332 if (!sp->shutting_down) {
333 char* utf8_message = gpr_format_message(WSAGetLastError());
334 gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
335 gpr_free(utf8_message);
339 if (!sp->shutting_down) {
340 peer_name_string = NULL;
341 err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
342 (char*)&sp->socket->socket, sizeof(sp->socket->socket));
344 char* utf8_message = gpr_format_message(WSAGetLastError());
345 gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
346 gpr_free(utf8_message);
348 int peer_name_len = (int)peer_name.len;
349 err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
350 peer_name.len = (size_t)peer_name_len;
352 peer_name_string = grpc_sockaddr_to_uri(&peer_name);
354 char* utf8_message = gpr_format_message(WSAGetLastError());
355 gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
356 gpr_free(utf8_message);
358 gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
359 ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
360 sp->server->channel_args, peer_name_string);
362 gpr_free(peer_name_string);
368 /* The only time we should call our callback, is where we successfully
369 managed to accept a connection, and created an endpoint. */
372 grpc_tcp_server_acceptor* acceptor =
373 (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
374 acceptor->from_server = sp->server;
375 acceptor->port_index = sp->port_index;
376 acceptor->fd_index = 0;
377 acceptor->external_connection = false;
378 sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
380 /* As we were notified from the IOCP of one and exactly one accept,
381 the former socked we created has now either been destroy or assigned
382 to the new connection. We need to create a new one for the next
384 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
385 if (0 == --sp->outstanding_calls) {
386 decrement_active_ports_and_notify_locked(sp);
388 gpr_mu_unlock(&sp->server->mu);
391 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392 const grpc_resolved_address* addr,
394 grpc_tcp_listener** listener) {
395 grpc_tcp_listener* sp = NULL;
398 GUID guid = WSAID_ACCEPTEX;
399 DWORD ioctl_num_bytes;
400 LPFN_ACCEPTEX AcceptEx;
401 grpc_error* error = GRPC_ERROR_NONE;
403 /* We need to grab the AcceptEx pointer for that port, as it may be
404 interface-dependent. We'll cache it to avoid doing that again. */
406 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
410 char* utf8_message = gpr_format_message(WSAGetLastError());
411 gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412 gpr_free(utf8_message);
414 return GRPC_ERROR_NONE;
417 error = prepare_socket(sock, addr, &port);
418 if (error != GRPC_ERROR_NONE) {
422 GPR_ASSERT(port >= 0);
424 GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425 sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
427 if (s->head == NULL) {
434 sp->socket = grpc_winsocket_create(sock, "listener");
435 sp->shutting_down = 0;
436 sp->outstanding_calls = 0;
437 sp->AcceptEx = AcceptEx;
438 sp->new_socket = INVALID_SOCKET;
440 sp->port_index = port_index;
441 GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442 GPR_ASSERT(sp->socket);
443 gpr_mu_unlock(&s->mu);
446 return GRPC_ERROR_NONE;
449 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
450 const grpc_resolved_address* addr,
452 grpc_tcp_listener* sp = NULL;
454 grpc_resolved_address addr6_v4mapped;
455 grpc_resolved_address wildcard;
456 grpc_resolved_address* allocated_addr = NULL;
457 grpc_resolved_address sockname_temp;
458 unsigned port_index = 0;
459 grpc_error* error = GRPC_ERROR_NONE;
461 if (s->tail != NULL) {
462 port_index = s->tail->port_index + 1;
465 /* Check if this is a wildcard port, and if so, try to keep the port the same
466 as some previously created listener. */
467 if (grpc_sockaddr_get_port(addr) == 0) {
468 for (sp = s->head; sp; sp = sp->next) {
469 int sockname_temp_len = sizeof(struct sockaddr_storage);
470 if (0 == getsockname(sp->socket->socket,
471 (grpc_sockaddr*)sockname_temp.addr,
472 &sockname_temp_len)) {
473 sockname_temp.len = (size_t)sockname_temp_len;
474 *port = grpc_sockaddr_get_port(&sockname_temp);
477 (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479 grpc_sockaddr_set_port(allocated_addr, *port);
480 addr = allocated_addr;
487 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488 addr = &addr6_v4mapped;
491 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492 if (grpc_sockaddr_is_wildcard(addr, port)) {
493 grpc_sockaddr_make_wildcard6(*port, &wildcard);
498 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499 grpc_get_default_wsa_socket_flags());
500 if (sock == INVALID_SOCKET) {
501 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
505 error = add_socket_to_server(s, sock, addr, port_index, &sp);
508 gpr_free(allocated_addr);
510 if (error != GRPC_ERROR_NONE) {
511 grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
512 "Failed to add port to server", &error, 1);
513 GRPC_ERROR_UNREF(error);
517 GPR_ASSERT(sp != NULL);
523 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
524 size_t pollset_count,
525 grpc_tcp_server_cb on_accept_cb,
526 void* on_accept_cb_arg) {
527 grpc_tcp_listener* sp;
528 GPR_ASSERT(on_accept_cb);
530 GPR_ASSERT(!s->on_accept_cb);
531 GPR_ASSERT(s->active_ports == 0);
532 s->on_accept_cb = on_accept_cb;
533 s->on_accept_cb_arg = on_accept_cb_arg;
534 for (sp = s->head; sp; sp = sp->next) {
535 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
538 gpr_mu_unlock(&s->mu);
541 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
542 unsigned port_index) {
546 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
551 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
552 grpc_tcp_server* s) {
556 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
558 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
559 tcp_server_create, tcp_server_start,
560 tcp_server_add_port, tcp_server_create_fd_handler,
561 tcp_server_port_fd_count, tcp_server_port_fd,
562 tcp_server_ref, tcp_server_shutdown_starting_add,
563 tcp_server_unref, tcp_server_shutdown_listeners};
564 #endif /* GRPC_WINSOCK_SOCKET */