3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
23 #ifdef GRPC_WINSOCK_SOCKET
30 #include "absl/strings/str_cat.h"
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/log_windows.h>
35 #include <grpc/support/string_util.h>
36 #include <grpc/support/sync.h>
37 #include <grpc/support/time.h>
39 #include "src/core/lib/address_utils/sockaddr_utils.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/iomgr/iocp_windows.h"
42 #include "src/core/lib/iomgr/pollset_windows.h"
43 #include "src/core/lib/iomgr/resolve_address.h"
44 #include "src/core/lib/iomgr/sockaddr.h"
45 #include "src/core/lib/iomgr/socket_windows.h"
46 #include "src/core/lib/iomgr/tcp_server.h"
47 #include "src/core/lib/iomgr/tcp_windows.h"
48 #include "src/core/lib/slice/slice_internal.h"
50 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
52 /* one listening port */
/* Per-port listener state: the listening winsocket, the AcceptEx output
   buffer and cached AcceptEx pointer used by start_accept_locked(), the
   on_accept closure, and the link to the next listener. */
53 typedef struct grpc_tcp_listener grpc_tcp_listener;
54 struct grpc_tcp_listener {
55 /* This seemingly magic number comes from AcceptEx's documentation. each
56 address buffer needs to have at least 16 more bytes at their end. */
/* Handed to AcceptEx as its output buffer by start_accept_locked(), which
   passes sizeof(grpc_sockaddr_in6) + 16 for both address lengths. */
57 uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
58 /* This will hold the socket for the next accept. */
60 /* The listener winsocket. */
61 grpc_winsocket* socket;
62 /* The actual TCP port number. */
/* Server this listener is attached to; on_accept() takes server->mu and
   invokes server->on_accept_cb through this pointer. */
65 grpc_tcp_server* server;
66 /* The cached AcceptEx for that port. */
67 LPFN_ACCEPTEX AcceptEx;
/* Incremented by start_accept_locked() each time it arms an accept and
   decremented by on_accept() once a notification has been handled. */
69 int outstanding_calls;
70 /* closure for socket notification of accept being ready */
71 grpc_closure on_accept;
/* Next listener in the server's list (walked as s->head, then sp->next). */
73 struct grpc_tcp_listener* next;
76 /* the overall server */
/* Server-wide state shared by all listeners: the accept callback, the
   listener list, shutdown bookkeeping, and the resources handed to
   tcp_server_create(). */
77 struct grpc_tcp_server {
79 /* Called whenever accept() succeeds on a server port. */
80 grpc_tcp_server_cb on_accept_cb;
/* Passed back as the first argument of on_accept_cb. */
81 void* on_accept_cb_arg;
85 /* active port count: how many ports are actually still listening */
88 /* linked list of server ports */
89 grpc_tcp_listener* head;
90 grpc_tcp_listener* tail;
92 /* List of closures passed to shutdown_starting_add(). */
93 grpc_closure_list shutdown_starting;
95 /* shutdown callback */
/* May be NULL; finish_shutdown_locked() only runs it when non-NULL. */
96 grpc_closure* shutdown_complete;
/* Copy made by grpc_channel_args_copy() in tcp_server_create() and
   released by destroy_server(). */
98 grpc_channel_args* channel_args;
/* Stored as given by tcp_server_create(); on_accept() uses it to create a
   slice allocator per endpoint and tcp_server_destroy() destroys it. */
99 grpc_slice_allocator_factory* slice_allocator_factory;
102 /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. */
/* Allocates a grpc_tcp_server with gpr_malloc() and initialises it: one
   reference, a private copy of args, no accept callback yet, an empty
   shutdown_starting list, and the caller's shutdown_complete closure and
   slice_allocator_factory stored as given. Returns GRPC_ERROR_NONE.
   NOTE(review): server is presumably the out-parameter that receives the
   new object - confirm. */
104 static grpc_error_handle tcp_server_create(
105 grpc_closure* shutdown_complete, const grpc_channel_args* args,
106 grpc_slice_allocator_factory* slice_allocator_factory,
107 grpc_tcp_server** server) {
108 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
/* The server keeps its own copy; destroy_server() releases it. */
109 s->channel_args = grpc_channel_args_copy(args);
110 gpr_ref_init(&s->refs, 1);
/* The callback stays NULL until tcp_server_start(); add_socket_to_server()
   asserts on that to enforce "add ports before starting". */
113 s->on_accept_cb = NULL;
114 s->on_accept_cb_arg = NULL;
117 s->shutdown_starting.head = NULL;
118 s->shutdown_starting.tail = NULL;
119 s->shutdown_complete = shutdown_complete;
120 s->slice_allocator_factory = slice_allocator_factory;
122 return GRPC_ERROR_NONE;
/* Closure body scheduled by finish_shutdown_locked(); arg is the
   grpc_tcp_server being torn down. Destroys listener winsocket(s), the
   server's channel-args copy and its mutex.
   NOTE(review): error is not referenced by the statements shown here. */
125 static void destroy_server(void* arg, grpc_error_handle error) {
126 grpc_tcp_server* s = (grpc_tcp_server*)arg;
128 /* Now that the accepts have been aborted, we can destroy the sockets.
129 The IOCP won't get notified on these, so we can flag them as already
130 closed by the system. */
132 grpc_tcp_listener* sp = s->head;
135 grpc_winsocket_destroy(sp->socket);
138 grpc_channel_args_destroy(s->channel_args);
139 gpr_mu_destroy(&s->mu);
/* Final shutdown step: runs s->shutdown_complete on the ExecCtx when one
   was supplied, then schedules destroy_server(s) through a newly created
   closure.
   NOTE(review): the _locked suffix suggests the caller holds s->mu -
   confirm. */
143 static void finish_shutdown_locked(grpc_tcp_server* s) {
144 if (s->shutdown_complete != NULL) {
145 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
149 grpc_core::ExecCtx::Run(
151 GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
/* Takes one additional reference on s using gpr_ref_non_zero().
   NOTE(review): the return type is grpc_tcp_server*, so this presumably
   hands s back to the caller - confirm. */
155 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
156 gpr_ref_non_zero(&s->refs);
/* Appends shutdown_starting to s->shutdown_starting; tcp_server_unref()
   runs that list when the last reference is dropped.
   NOTE(review): s->mu is released at the end, so it is presumably acquired
   before the append - confirm. */
160 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
161 grpc_closure* shutdown_starting) {
163 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
165 gpr_mu_unlock(&s->mu);
/* Starts tearing the server down; called from tcp_server_unref() once the
   last reference is gone. Destroys the slice allocator factory, completes
   shutdown right away when no port is active, and marks listeners as
   shutting down while shutting down their winsockets.
   NOTE(review): the listener loop presumably runs only when active_ports is
   non-zero, and s->mu is presumably taken before the unlock at the end -
   confirm both. */
168 static void tcp_server_destroy(grpc_tcp_server* s) {
169 grpc_tcp_listener* sp;
171 grpc_slice_allocator_factory_destroy(s->slice_allocator_factory);
172 /* First, shutdown all fd's. This will queue abortion calls for all
173 of the pending accepts due to the normal operation mechanism. */
174 if (s->active_ports == 0) {
175 finish_shutdown_locked(s);
177 for (sp = s->head; sp; sp = sp->next) {
/* start_accept_locked() and on_accept() both check this flag. */
178 sp->shutting_down = 1;
179 grpc_winsocket_shutdown(sp->socket);
182 gpr_mu_unlock(&s->mu);
/* Drops one reference on s. When gpr_unref() reports the last reference
   was released: shuts down the listeners, runs every closure queued in
   s->shutdown_starting, and calls tcp_server_destroy(s).
   NOTE(review): s->mu is released after RunList, so it is presumably taken
   just before it - confirm. */
185 static void tcp_server_unref(grpc_tcp_server* s) {
186 if (gpr_unref(&s->refs)) {
187 grpc_tcp_server_shutdown_listeners(s);
189 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
190 gpr_mu_unlock(&s->mu);
191 tcp_server_destroy(s);
195 /* Prepare (bind) a recently-created socket for listening. */
/* Steps: grpc_tcp_prepare_socket(), bind() to addr, listen() with
   SOMAXCONN, then getsockname() to learn the port actually bound, which is
   written to *port. Returns GRPC_ERROR_NONE on success. The failure tail
   wraps the underlying error in "Failed to prepare server socket",
   annotated with the target address and the socket value, and closes the
   socket. */
196 static grpc_error_handle prepare_socket(SOCKET sock,
197 const grpc_resolved_address* addr,
199 grpc_resolved_address sockname_temp;
200 grpc_error_handle error = GRPC_ERROR_NONE;
201 int sockname_temp_len;
203 error = grpc_tcp_prepare_socket(sock);
204 if (error != GRPC_ERROR_NONE) {
208 if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
210 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
214 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
215 error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
/* Read the bound address back so the caller gets the real port number. */
219 sockname_temp_len = sizeof(struct sockaddr_storage);
220 if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
221 &sockname_temp_len) == SOCKET_ERROR) {
222 error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
225 sockname_temp.len = (size_t)sockname_temp_len;
227 *port = grpc_sockaddr_get_port(&sockname_temp);
228 return GRPC_ERROR_NONE;
/* Failure tail: an error must have been recorded by one of the steps. */
231 GPR_ASSERT(error != GRPC_ERROR_NONE);
234 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
235 "Failed to prepare server socket", &error, 1),
236 GRPC_ERROR_STR_TARGET_ADDRESS,
237 grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
238 GRPC_ERROR_INT_FD, (intptr_t)sock);
/* Drop this function's reference to the underlying error after wrapping. */
239 GRPC_ERROR_UNREF(error);
240 if (sock != INVALID_SOCKET) closesocket(sock);
/* Called by on_accept() when a listener's outstanding_calls reaches zero.
   Clears sp->shutting_down, decrements the server's active_ports (after
   asserting it is positive) and, when the count reaches zero, calls
   finish_shutdown_locked() on the server. */
244 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
245 sp->shutting_down = 0;
246 GPR_ASSERT(sp->server->active_ports > 0);
247 if (0 == --sp->server->active_ports) {
248 finish_shutdown_locked(sp->server);
252 /* In order to do an async accept, we need to create a socket first which
253 will be the one assigned to the new incoming connection. */
/* Arms one asynchronous accept on the listener `port`: creates and prepares
   an AF_INET6 stream socket, issues AcceptEx on the listening socket with
   it, stores it in port->new_socket, registers port->on_accept for the read
   notification and bumps port->outstanding_calls. Returns GRPC_ERROR_NONE
   immediately when the listener is shutting down. On the failure path the
   new socket is closed. */
254 static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) {
255 SOCKET sock = INVALID_SOCKET;
/* Size of each address slot inside port->addresses. */
257 DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
258 DWORD bytes_received = 0;
259 grpc_error_handle error = GRPC_ERROR_NONE;
261 if (port->shutting_down) {
262 return GRPC_ERROR_NONE;
265 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
266 grpc_get_default_wsa_socket_flags());
267 if (sock == INVALID_SOCKET) {
268 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
272 error = grpc_tcp_prepare_socket(sock);
273 if (error != GRPC_ERROR_NONE) goto failure;
275 /* Start the "accept" asynchronously. */
/* A receive length of 0 is passed, and completion is reported through the
   listener's read_info overlapped structure. */
276 success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
277 addrlen, addrlen, &bytes_received,
278 &port->socket->read_info.overlapped);
280 /* It is possible to get an accept immediately without delay. However, we
281 will still get an IOCP notification for it. So let's just ignore it. */
/* ERROR_IO_PENDING means the accept was queued; any other code is an
   error. */
283 int last_error = WSAGetLastError();
284 if (last_error != ERROR_IO_PENDING) {
285 error = GRPC_WSA_ERROR(last_error, "AcceptEx");
290 /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
291 immediately process an accept that happened in the meantime. */
292 port->new_socket = sock;
293 grpc_socket_notify_on_read(port->socket, &port->on_accept);
294 port->outstanding_calls++;
/* Failure tail: an error must be set; release the socket created above. */
298 GPR_ASSERT(error != GRPC_ERROR_NONE);
299 if (sock != INVALID_SOCKET) closesocket(sock);
303 /* Event manager callback when reads are ready. */
/* Closure body registered by add_socket_to_server(); arg is the
   grpc_tcp_listener. Under sp->server->mu it reads the result of the
   pending AcceptEx, finishes setting up the accepted socket
   (SO_UPDATE_ACCEPT_CONTEXT, getpeername), wraps it in a grpc_endpoint,
   hands the endpoint to the server's on_accept_cb together with a
   heap-allocated acceptor, and arms the next accept. When `error` is set it
   logs "Skipping on_accept due to error" and releases the mutex without
   reading any accept result. */
304 static void on_accept(void* arg, grpc_error_handle error) {
305 grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
/* Socket that start_accept_locked() created for this accept. */
306 SOCKET sock = sp->new_socket;
307 grpc_winsocket_callback_info* info = &sp->socket->read_info;
308 grpc_endpoint* ep = NULL;
309 grpc_resolved_address peer_name;
310 DWORD transfered_bytes;
315 gpr_mu_lock(&sp->server->mu);
317 peer_name.len = sizeof(struct sockaddr_storage);
319 /* The general mechanism for shutting down is to queue abortion calls. While
320 this is necessary in the read/write case, it's useless for the accept
321 case. We only need to adjust the pending callback count */
322 if (error != GRPC_ERROR_NONE) {
323 gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
324 grpc_error_std_string(error).c_str());
326 gpr_mu_unlock(&sp->server->mu);
329 /* The IOCP notified us of a completed operation. Let's grab the results,
330 and act accordingly. */
331 transfered_bytes = 0;
/* NOTE(review): the overlapped structure belongs to the listening socket
   (sp->socket->read_info) while the handle passed here is the accepted
   socket - confirm this is the intended socket argument. */
332 wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
333 &transfered_bytes, FALSE, &flags);
/* Failures are only logged when the listener is not shutting down. */
335 if (!sp->shutting_down) {
336 char* utf8_message = gpr_format_message(WSAGetLastError());
337 gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
338 gpr_free(utf8_message);
342 if (!sp->shutting_down) {
/* Associates the accepted socket with the listening socket's handle. */
343 err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
344 (char*)&sp->socket->socket, sizeof(sp->socket->socket));
346 char* utf8_message = gpr_format_message(WSAGetLastError());
347 gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
348 gpr_free(utf8_message);
/* Resolve the peer address; its URI form names the fd and the endpoint. */
350 int peer_name_len = (int)peer_name.len;
351 err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
352 peer_name.len = (size_t)peer_name_len;
353 std::string peer_name_string;
355 peer_name_string = grpc_sockaddr_to_uri(&peer_name);
357 char* utf8_message = gpr_format_message(WSAGetLastError());
358 gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
359 gpr_free(utf8_message);
361 std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
362 ep = grpc_tcp_create(
363 grpc_winsocket_create(sock, fd_name.c_str()),
364 sp->server->channel_args, peer_name_string.c_str(),
365 grpc_slice_allocator_factory_create_slice_allocator(
366 sp->server->slice_allocator_factory, peer_name_string));
372 /* The only time we should call our callback, is where we successfully
373 managed to accept a connection, and created an endpoint. */
/* The acceptor is heap-allocated and passed to the callback.
   NOTE(review): it is not freed in the code shown here, so ownership
   presumably passes to the callback - confirm. */
376 grpc_tcp_server_acceptor* acceptor =
377 (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
378 acceptor->from_server = sp->server;
379 acceptor->port_index = sp->port_index;
380 acceptor->fd_index = 0;
381 acceptor->external_connection = false;
382 sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
384 /* As we were notified from the IOCP of one and exactly one accept,
385 the former socked we created has now either been destroy or assigned
386 to the new connection. We need to create a new one for the next
388 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
389 if (0 == --sp->outstanding_calls) {
390 decrement_active_ports_and_notify_locked(sp);
392 gpr_mu_unlock(&sp->server->mu);
/* Turns an already-created socket into a listener of server s: fetches the
   AcceptEx extension pointer for the socket, binds and listens through
   prepare_socket(), allocates a grpc_tcp_listener and initialises its
   fields (winsocket wrapper, AcceptEx pointer, port_index, on_accept
   closure). Asserts that the server has not been started yet
   (s->on_accept_cb must still be unset).
   NOTE(review): the path that logs the WSAIoctl failure returns
   GRPC_ERROR_NONE, while tcp_server_add_port() asserts sp != NULL after a
   success return - confirm that reporting success here is intended.
   NOTE(review): that log line says "on_connect error" although the failing
   call is WSAIoctl. */
395 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
396 const grpc_resolved_address* addr,
398 grpc_tcp_listener** listener) {
399 grpc_tcp_listener* sp = NULL;
402 GUID guid = WSAID_ACCEPTEX;
403 DWORD ioctl_num_bytes;
404 LPFN_ACCEPTEX AcceptEx;
405 grpc_error_handle error = GRPC_ERROR_NONE;
407 /* We need to grab the AcceptEx pointer for that port, as it may be
408 interface-dependent. We'll cache it to avoid doing that again. */
410 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
411 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
414 char* utf8_message = gpr_format_message(WSAGetLastError());
415 gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
416 gpr_free(utf8_message);
418 return GRPC_ERROR_NONE;
/* Bind and listen; port receives the port number actually bound. */
421 error = prepare_socket(sock, addr, &port);
422 if (error != GRPC_ERROR_NONE) {
426 GPR_ASSERT(port >= 0);
428 GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
429 sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
431 if (s->head == NULL) {
438 sp->socket = grpc_winsocket_create(sock, "listener");
439 sp->shutting_down = 0;
440 sp->outstanding_calls = 0;
441 sp->AcceptEx = AcceptEx;
/* No accept socket exists until start_accept_locked() creates one. */
442 sp->new_socket = INVALID_SOCKET;
444 sp->port_index = port_index;
445 GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
446 GPR_ASSERT(sp->socket);
447 gpr_mu_unlock(&s->mu);
450 return GRPC_ERROR_NONE;
/* Adds a listening port for addr to server s. The new listener gets
   port_index = tail's port_index + 1 (0 for the first one). When addr
   carries port 0, the ports of existing listeners are consulted so the new
   listener can reuse one of them. The address is converted to its
   v4-mapped IPv6 form when applicable, wildcard addresses are replaced by
   the IPv6 wildcard, an AF_INET6 socket is created and handed to
   add_socket_to_server(). Any failure is wrapped in
   "Failed to add port to server". */
453 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
454 const grpc_resolved_address* addr,
456 grpc_tcp_listener* sp = NULL;
458 grpc_resolved_address addr6_v4mapped;
459 grpc_resolved_address wildcard;
/* Heap copy of addr, made only on the port-reuse path; freed further down. */
460 grpc_resolved_address* allocated_addr = NULL;
461 grpc_resolved_address sockname_temp;
462 unsigned port_index = 0;
463 grpc_error_handle error = GRPC_ERROR_NONE;
465 if (s->tail != NULL) {
466 port_index = s->tail->port_index + 1;
469 /* Check if this is a wildcard port, and if so, try to keep the port the same
470 as some previously created listener. */
471 if (grpc_sockaddr_get_port(addr) == 0) {
472 for (sp = s->head; sp; sp = sp->next) {
473 int sockname_temp_len = sizeof(struct sockaddr_storage);
474 if (0 == getsockname(sp->socket->socket,
475 (grpc_sockaddr*)sockname_temp.addr,
476 &sockname_temp_len)) {
477 sockname_temp.len = (size_t)sockname_temp_len;
478 *port = grpc_sockaddr_get_port(&sockname_temp);
481 (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
482 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
483 grpc_sockaddr_set_port(allocated_addr, *port);
484 addr = allocated_addr;
/* The socket below is AF_INET6, so IPv4 input is used in v4-mapped form. */
491 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
492 addr = &addr6_v4mapped;
495 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
496 if (grpc_sockaddr_is_wildcard(addr, port)) {
497 grpc_sockaddr_make_wildcard6(*port, &wildcard);
502 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
503 grpc_get_default_wsa_socket_flags());
504 if (sock == INVALID_SOCKET) {
505 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
509 error = add_socket_to_server(s, sock, addr, port_index, &sp);
/* Safe when no copy was made: allocated_addr is then still NULL. */
512 gpr_free(allocated_addr);
514 if (error != GRPC_ERROR_NONE) {
515 grpc_error_handle error_out =
516 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
517 "Failed to add port to server", &error, 1);
518 GRPC_ERROR_UNREF(error);
522 GPR_ASSERT(sp != NULL);
/* Installs the accept callback and arms one asynchronous accept on every
   listener. Asserts that on_accept_cb is non-NULL, that no callback was
   installed before, and that active_ports is still 0. The pollsets
   parameter is unnamed and unused. Each start_accept_locked() result is
   wrapped in GPR_ASSERT(GRPC_LOG_IF_ERROR(...)), so a failure to arm an
   accept is treated as fatal (assuming GRPC_LOG_IF_ERROR yields false on
   error - confirm). */
528 static void tcp_server_start(grpc_tcp_server* s,
529 const std::vector<grpc_pollset*>* /*pollsets*/,
530 grpc_tcp_server_cb on_accept_cb,
531 void* on_accept_cb_arg) {
532 grpc_tcp_listener* sp;
533 GPR_ASSERT(on_accept_cb);
535 GPR_ASSERT(!s->on_accept_cb);
536 GPR_ASSERT(s->active_ports == 0);
537 s->on_accept_cb = on_accept_cb;
538 s->on_accept_cb_arg = on_accept_cb_arg;
539 for (sp = s->head; sp; sp = sp->next) {
540 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
543 gpr_mu_unlock(&s->mu);
/* Registered in grpc_windows_tcp_server_vtable. Takes the server and a
   port index and returns an unsigned count.
   NOTE(review): confirm the value returned on Windows. */
546 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
547 unsigned port_index) {
/* Registered in grpc_windows_tcp_server_vtable. Takes the server and a
   port index and returns an int.
   NOTE(review): confirm the value returned on Windows. */
551 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
/* Registered in grpc_windows_tcp_server_vtable. Returns a
   grpc_core::TcpServerFdHandler pointer for server s.
   NOTE(review): confirm what is returned on Windows. */
556 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
557 grpc_tcp_server* s) {
/* No-op in this implementation: the body is empty. Registered as the last
   entry of grpc_windows_tcp_server_vtable. */
561 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
/* Function table exposing the Windows implementations defined in this
   file. The initializer is positional, so the entries must stay in the
   same order as the members of grpc_tcp_server_vtable (presumably declared
   in src/core/lib/iomgr/tcp_server.h - confirm when adding entries). */
563 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
564 tcp_server_create, tcp_server_start,
565 tcp_server_add_port, tcp_server_create_fd_handler,
566 tcp_server_port_fd_count, tcp_server_port_fd,
567 tcp_server_ref, tcp_server_shutdown_starting_add,
568 tcp_server_unref, tcp_server_shutdown_listeners};
569 #endif /* GRPC_WINSOCK_SOCKET */