Imported Upstream version 1.38.0
[platform/upstream/grpc.git] / src / core / lib / iomgr / tcp_server_windows.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include "src/core/lib/iomgr/sockaddr.h"
26
27 #include <inttypes.h>
28 #include <io.h>
29
30 #include <vector>
31
32 #include "absl/strings/str_cat.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/log_windows.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
40
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/iomgr/iocp_windows.h"
44 #include "src/core/lib/iomgr/pollset_windows.h"
45 #include "src/core/lib/iomgr/resolve_address.h"
46 #include "src/core/lib/iomgr/socket_windows.h"
47 #include "src/core/lib/iomgr/tcp_server.h"
48 #include "src/core/lib/iomgr/tcp_windows.h"
49 #include "src/core/lib/slice/slice_internal.h"
50
51 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
52
53 /* one listening port */
54 typedef struct grpc_tcp_listener grpc_tcp_listener;
55 struct grpc_tcp_listener {
56   /* This seemingly magic number comes from AcceptEx's documentation. each
57      address buffer needs to have at least 16 more bytes at their end. */
58   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
59   /* This will hold the socket for the next accept. */
60   SOCKET new_socket;
61   /* The listener winsocket. */
62   grpc_winsocket* socket;
63   /* The actual TCP port number. */
64   int port;
65   unsigned port_index;
66   grpc_tcp_server* server;
67   /* The cached AcceptEx for that port. */
68   LPFN_ACCEPTEX AcceptEx;
69   int shutting_down;
70   int outstanding_calls;
71   /* closure for socket notification of accept being ready */
72   grpc_closure on_accept;
73   /* linked list */
74   struct grpc_tcp_listener* next;
75 };
76
77 /* the overall server */
78 struct grpc_tcp_server {
79   gpr_refcount refs;
80   /* Called whenever accept() succeeds on a server port. */
81   grpc_tcp_server_cb on_accept_cb;
82   void* on_accept_cb_arg;
83
84   gpr_mu mu;
85
86   /* active port count: how many ports are actually still listening */
87   int active_ports;
88
89   /* linked list of server ports */
90   grpc_tcp_listener* head;
91   grpc_tcp_listener* tail;
92
93   /* List of closures passed to shutdown_starting_add(). */
94   grpc_closure_list shutdown_starting;
95
96   /* shutdown callback */
97   grpc_closure* shutdown_complete;
98
99   grpc_channel_args* channel_args;
100 };
101
102 /* Public function. Allocates the proper data structures to hold a
103    grpc_tcp_server. */
104 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
105                                            const grpc_channel_args* args,
106                                            grpc_tcp_server** server) {
107   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
108   s->channel_args = grpc_channel_args_copy(args);
109   gpr_ref_init(&s->refs, 1);
110   gpr_mu_init(&s->mu);
111   s->active_ports = 0;
112   s->on_accept_cb = NULL;
113   s->on_accept_cb_arg = NULL;
114   s->head = NULL;
115   s->tail = NULL;
116   s->shutdown_starting.head = NULL;
117   s->shutdown_starting.tail = NULL;
118   s->shutdown_complete = shutdown_complete;
119   *server = s;
120   return GRPC_ERROR_NONE;
121 }
122
123 static void destroy_server(void* arg, grpc_error_handle error) {
124   grpc_tcp_server* s = (grpc_tcp_server*)arg;
125
126   /* Now that the accepts have been aborted, we can destroy the sockets.
127      The IOCP won't get notified on these, so we can flag them as already
128      closed by the system. */
129   while (s->head) {
130     grpc_tcp_listener* sp = s->head;
131     s->head = sp->next;
132     sp->next = NULL;
133     grpc_winsocket_destroy(sp->socket);
134     gpr_free(sp);
135   }
136   grpc_channel_args_destroy(s->channel_args);
137   gpr_mu_destroy(&s->mu);
138   gpr_free(s);
139 }
140
141 static void finish_shutdown_locked(grpc_tcp_server* s) {
142   if (s->shutdown_complete != NULL) {
143     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
144                             GRPC_ERROR_NONE);
145   }
146
147   grpc_core::ExecCtx::Run(
148       DEBUG_LOCATION,
149       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
150       GRPC_ERROR_NONE);
151 }
152
153 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
154   gpr_ref_non_zero(&s->refs);
155   return s;
156 }
157
158 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
159                                              grpc_closure* shutdown_starting) {
160   gpr_mu_lock(&s->mu);
161   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
162                            GRPC_ERROR_NONE);
163   gpr_mu_unlock(&s->mu);
164 }
165
166 static void tcp_server_destroy(grpc_tcp_server* s) {
167   grpc_tcp_listener* sp;
168   gpr_mu_lock(&s->mu);
169
170   /* First, shutdown all fd's. This will queue abortion calls for all
171      of the pending accepts due to the normal operation mechanism. */
172   if (s->active_ports == 0) {
173     finish_shutdown_locked(s);
174   } else {
175     for (sp = s->head; sp; sp = sp->next) {
176       sp->shutting_down = 1;
177       grpc_winsocket_shutdown(sp->socket);
178     }
179   }
180   gpr_mu_unlock(&s->mu);
181 }
182
183 static void tcp_server_unref(grpc_tcp_server* s) {
184   if (gpr_unref(&s->refs)) {
185     grpc_tcp_server_shutdown_listeners(s);
186     gpr_mu_lock(&s->mu);
187     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
188     gpr_mu_unlock(&s->mu);
189     tcp_server_destroy(s);
190   }
191 }
192
193 /* Prepare (bind) a recently-created socket for listening. */
194 static grpc_error_handle prepare_socket(SOCKET sock,
195                                         const grpc_resolved_address* addr,
196                                         int* port) {
197   grpc_resolved_address sockname_temp;
198   grpc_error_handle error = GRPC_ERROR_NONE;
199   int sockname_temp_len;
200
201   error = grpc_tcp_prepare_socket(sock);
202   if (error != GRPC_ERROR_NONE) {
203     goto failure;
204   }
205
206   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
207       SOCKET_ERROR) {
208     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
209     goto failure;
210   }
211
212   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
213     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
214     goto failure;
215   }
216
217   sockname_temp_len = sizeof(struct sockaddr_storage);
218   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
219                   &sockname_temp_len) == SOCKET_ERROR) {
220     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
221     goto failure;
222   }
223   sockname_temp.len = (size_t)sockname_temp_len;
224
225   *port = grpc_sockaddr_get_port(&sockname_temp);
226   return GRPC_ERROR_NONE;
227
228 failure:
229   GPR_ASSERT(error != GRPC_ERROR_NONE);
230   grpc_error_set_int(
231       grpc_error_set_str(
232           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
233               "Failed to prepare server socket", &error, 1),
234           GRPC_ERROR_STR_TARGET_ADDRESS,
235           grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
236       GRPC_ERROR_INT_FD, (intptr_t)sock);
237   GRPC_ERROR_UNREF(error);
238   if (sock != INVALID_SOCKET) closesocket(sock);
239   return error;
240 }
241
242 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
243   sp->shutting_down = 0;
244   GPR_ASSERT(sp->server->active_ports > 0);
245   if (0 == --sp->server->active_ports) {
246     finish_shutdown_locked(sp->server);
247   }
248 }
249
250 /* In order to do an async accept, we need to create a socket first which
251    will be the one assigned to the new incoming connection. */
252 static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) {
253   SOCKET sock = INVALID_SOCKET;
254   BOOL success;
255   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
256   DWORD bytes_received = 0;
257   grpc_error_handle error = GRPC_ERROR_NONE;
258
259   if (port->shutting_down) {
260     return GRPC_ERROR_NONE;
261   }
262
263   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
264                    grpc_get_default_wsa_socket_flags());
265   if (sock == INVALID_SOCKET) {
266     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
267     goto failure;
268   }
269
270   error = grpc_tcp_prepare_socket(sock);
271   if (error != GRPC_ERROR_NONE) goto failure;
272
273   /* Start the "accept" asynchronously. */
274   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
275                            addrlen, addrlen, &bytes_received,
276                            &port->socket->read_info.overlapped);
277
278   /* It is possible to get an accept immediately without delay. However, we
279      will still get an IOCP notification for it. So let's just ignore it. */
280   if (!success) {
281     int last_error = WSAGetLastError();
282     if (last_error != ERROR_IO_PENDING) {
283       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
284       goto failure;
285     }
286   }
287
288   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
289      immediately process an accept that happened in the meantime. */
290   port->new_socket = sock;
291   grpc_socket_notify_on_read(port->socket, &port->on_accept);
292   port->outstanding_calls++;
293   return error;
294
295 failure:
296   GPR_ASSERT(error != GRPC_ERROR_NONE);
297   if (sock != INVALID_SOCKET) closesocket(sock);
298   return error;
299 }
300
301 /* Event manager callback when reads are ready. */
302 static void on_accept(void* arg, grpc_error_handle error) {
303   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
304   SOCKET sock = sp->new_socket;
305   grpc_winsocket_callback_info* info = &sp->socket->read_info;
306   grpc_endpoint* ep = NULL;
307   grpc_resolved_address peer_name;
308   DWORD transfered_bytes;
309   DWORD flags;
310   BOOL wsa_success;
311   int err;
312
313   gpr_mu_lock(&sp->server->mu);
314
315   peer_name.len = sizeof(struct sockaddr_storage);
316
317   /* The general mechanism for shutting down is to queue abortion calls. While
318      this is necessary in the read/write case, it's useless for the accept
319      case. We only need to adjust the pending callback count */
320   if (error != GRPC_ERROR_NONE) {
321     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
322             grpc_error_std_string(error).c_str());
323
324     gpr_mu_unlock(&sp->server->mu);
325     return;
326   }
327
328   /* The IOCP notified us of a completed operation. Let's grab the results,
329      and act accordingly. */
330   transfered_bytes = 0;
331   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
332                                        &transfered_bytes, FALSE, &flags);
333   if (!wsa_success) {
334     if (!sp->shutting_down) {
335       char* utf8_message = gpr_format_message(WSAGetLastError());
336       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
337       gpr_free(utf8_message);
338     }
339     closesocket(sock);
340   } else {
341     if (!sp->shutting_down) {
342       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
343                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
344       if (err) {
345         char* utf8_message = gpr_format_message(WSAGetLastError());
346         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
347         gpr_free(utf8_message);
348       }
349       int peer_name_len = (int)peer_name.len;
350       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
351       peer_name.len = (size_t)peer_name_len;
352       std::string peer_name_string;
353       if (!err) {
354         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
355       } else {
356         char* utf8_message = gpr_format_message(WSAGetLastError());
357         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
358         gpr_free(utf8_message);
359       }
360       std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
361       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
362                            sp->server->channel_args, peer_name_string.c_str());
363     } else {
364       closesocket(sock);
365     }
366   }
367
368   /* The only time we should call our callback, is where we successfully
369      managed to accept a connection, and created an endpoint. */
370   if (ep) {
371     // Create acceptor.
372     grpc_tcp_server_acceptor* acceptor =
373         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
374     acceptor->from_server = sp->server;
375     acceptor->port_index = sp->port_index;
376     acceptor->fd_index = 0;
377     acceptor->external_connection = false;
378     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
379   }
380   /* As we were notified from the IOCP of one and exactly one accept,
381      the former socked we created has now either been destroy or assigned
382      to the new connection. We need to create a new one for the next
383      connection. */
384   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
385   if (0 == --sp->outstanding_calls) {
386     decrement_active_ports_and_notify_locked(sp);
387   }
388   gpr_mu_unlock(&sp->server->mu);
389 }
390
391 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392                                               const grpc_resolved_address* addr,
393                                               unsigned port_index,
394                                               grpc_tcp_listener** listener) {
395   grpc_tcp_listener* sp = NULL;
396   int port = -1;
397   int status;
398   GUID guid = WSAID_ACCEPTEX;
399   DWORD ioctl_num_bytes;
400   LPFN_ACCEPTEX AcceptEx;
401   grpc_error_handle error = GRPC_ERROR_NONE;
402
403   /* We need to grab the AcceptEx pointer for that port, as it may be
404      interface-dependent. We'll cache it to avoid doing that again. */
405   status =
406       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
408
409   if (status != 0) {
410     char* utf8_message = gpr_format_message(WSAGetLastError());
411     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412     gpr_free(utf8_message);
413     closesocket(sock);
414     return GRPC_ERROR_NONE;
415   }
416
417   error = prepare_socket(sock, addr, &port);
418   if (error != GRPC_ERROR_NONE) {
419     return error;
420   }
421
422   GPR_ASSERT(port >= 0);
423   gpr_mu_lock(&s->mu);
424   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
426   sp->next = NULL;
427   if (s->head == NULL) {
428     s->head = sp;
429   } else {
430     s->tail->next = sp;
431   }
432   s->tail = sp;
433   sp->server = s;
434   sp->socket = grpc_winsocket_create(sock, "listener");
435   sp->shutting_down = 0;
436   sp->outstanding_calls = 0;
437   sp->AcceptEx = AcceptEx;
438   sp->new_socket = INVALID_SOCKET;
439   sp->port = port;
440   sp->port_index = port_index;
441   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442   GPR_ASSERT(sp->socket);
443   gpr_mu_unlock(&s->mu);
444   *listener = sp;
445
446   return GRPC_ERROR_NONE;
447 }
448
449 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
450                                              const grpc_resolved_address* addr,
451                                              int* port) {
452   grpc_tcp_listener* sp = NULL;
453   SOCKET sock;
454   grpc_resolved_address addr6_v4mapped;
455   grpc_resolved_address wildcard;
456   grpc_resolved_address* allocated_addr = NULL;
457   grpc_resolved_address sockname_temp;
458   unsigned port_index = 0;
459   grpc_error_handle error = GRPC_ERROR_NONE;
460
461   if (s->tail != NULL) {
462     port_index = s->tail->port_index + 1;
463   }
464
465   /* Check if this is a wildcard port, and if so, try to keep the port the same
466      as some previously created listener. */
467   if (grpc_sockaddr_get_port(addr) == 0) {
468     for (sp = s->head; sp; sp = sp->next) {
469       int sockname_temp_len = sizeof(struct sockaddr_storage);
470       if (0 == getsockname(sp->socket->socket,
471                            (grpc_sockaddr*)sockname_temp.addr,
472                            &sockname_temp_len)) {
473         sockname_temp.len = (size_t)sockname_temp_len;
474         *port = grpc_sockaddr_get_port(&sockname_temp);
475         if (*port > 0) {
476           allocated_addr =
477               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479           grpc_sockaddr_set_port(allocated_addr, *port);
480           addr = allocated_addr;
481           break;
482         }
483       }
484     }
485   }
486
487   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488     addr = &addr6_v4mapped;
489   }
490
491   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492   if (grpc_sockaddr_is_wildcard(addr, port)) {
493     grpc_sockaddr_make_wildcard6(*port, &wildcard);
494
495     addr = &wildcard;
496   }
497
498   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499                    grpc_get_default_wsa_socket_flags());
500   if (sock == INVALID_SOCKET) {
501     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
502     goto done;
503   }
504
505   error = add_socket_to_server(s, sock, addr, port_index, &sp);
506
507 done:
508   gpr_free(allocated_addr);
509
510   if (error != GRPC_ERROR_NONE) {
511     grpc_error_handle error_out =
512         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
513             "Failed to add port to server", &error, 1);
514     GRPC_ERROR_UNREF(error);
515     error = error_out;
516     *port = -1;
517   } else {
518     GPR_ASSERT(sp != NULL);
519     *port = sp->port;
520   }
521   return error;
522 }
523
524 static void tcp_server_start(grpc_tcp_server* s,
525                              const std::vector<grpc_pollset*>* /*pollsets*/,
526                              grpc_tcp_server_cb on_accept_cb,
527                              void* on_accept_cb_arg) {
528   grpc_tcp_listener* sp;
529   GPR_ASSERT(on_accept_cb);
530   gpr_mu_lock(&s->mu);
531   GPR_ASSERT(!s->on_accept_cb);
532   GPR_ASSERT(s->active_ports == 0);
533   s->on_accept_cb = on_accept_cb;
534   s->on_accept_cb_arg = on_accept_cb_arg;
535   for (sp = s->head; sp; sp = sp->next) {
536     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
537     s->active_ports++;
538   }
539   gpr_mu_unlock(&s->mu);
540 }
541
542 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
543                                          unsigned port_index) {
544   return 0;
545 }
546
547 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
548                               unsigned fd_index) {
549   return -1;
550 }
551
552 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
553     grpc_tcp_server* s) {
554   return nullptr;
555 }
556
557 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
558
559 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
560     tcp_server_create,        tcp_server_start,
561     tcp_server_add_port,      tcp_server_create_fd_handler,
562     tcp_server_port_fd_count, tcp_server_port_fd,
563     tcp_server_ref,           tcp_server_shutdown_starting_add,
564     tcp_server_unref,         tcp_server_shutdown_listeners};
565 #endif /* GRPC_WINSOCK_SOCKET */