Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / lib / iomgr / tcp_server_windows.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <inttypes.h>
26 #include <io.h>
27
28 #include <vector>
29
30 #include "absl/strings/str_cat.h"
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/log_windows.h>
35 #include <grpc/support/string_util.h>
36 #include <grpc/support/sync.h>
37 #include <grpc/support/time.h>
38
39 #include "src/core/lib/address_utils/sockaddr_utils.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/iomgr/iocp_windows.h"
42 #include "src/core/lib/iomgr/pollset_windows.h"
43 #include "src/core/lib/iomgr/resolve_address.h"
44 #include "src/core/lib/iomgr/sockaddr.h"
45 #include "src/core/lib/iomgr/socket_windows.h"
46 #include "src/core/lib/iomgr/tcp_server.h"
47 #include "src/core/lib/iomgr/tcp_windows.h"
48 #include "src/core/lib/slice/slice_internal.h"
49
50 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
51
52 /* one listening port */
53 typedef struct grpc_tcp_listener grpc_tcp_listener;
54 struct grpc_tcp_listener {
55   /* This seemingly magic number comes from AcceptEx's documentation. each
56      address buffer needs to have at least 16 more bytes at their end. */
57   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
58   /* This will hold the socket for the next accept. */
59   SOCKET new_socket;
60   /* The listener winsocket. */
61   grpc_winsocket* socket;
62   /* The actual TCP port number. */
63   int port;
64   unsigned port_index;
65   grpc_tcp_server* server;
66   /* The cached AcceptEx for that port. */
67   LPFN_ACCEPTEX AcceptEx;
68   int shutting_down;
69   int outstanding_calls;
70   /* closure for socket notification of accept being ready */
71   grpc_closure on_accept;
72   /* linked list */
73   struct grpc_tcp_listener* next;
74 };
75
76 /* the overall server */
77 struct grpc_tcp_server {
78   gpr_refcount refs;
79   /* Called whenever accept() succeeds on a server port. */
80   grpc_tcp_server_cb on_accept_cb;
81   void* on_accept_cb_arg;
82
83   gpr_mu mu;
84
85   /* active port count: how many ports are actually still listening */
86   int active_ports;
87
88   /* linked list of server ports */
89   grpc_tcp_listener* head;
90   grpc_tcp_listener* tail;
91
92   /* List of closures passed to shutdown_starting_add(). */
93   grpc_closure_list shutdown_starting;
94
95   /* shutdown callback */
96   grpc_closure* shutdown_complete;
97
98   grpc_channel_args* channel_args;
99   grpc_slice_allocator_factory* slice_allocator_factory;
100 };
101
102 /* Public function. Allocates the proper data structures to hold a
103    grpc_tcp_server. */
104 static grpc_error_handle tcp_server_create(
105     grpc_closure* shutdown_complete, const grpc_channel_args* args,
106     grpc_slice_allocator_factory* slice_allocator_factory,
107     grpc_tcp_server** server) {
108   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
109   s->channel_args = grpc_channel_args_copy(args);
110   gpr_ref_init(&s->refs, 1);
111   gpr_mu_init(&s->mu);
112   s->active_ports = 0;
113   s->on_accept_cb = NULL;
114   s->on_accept_cb_arg = NULL;
115   s->head = NULL;
116   s->tail = NULL;
117   s->shutdown_starting.head = NULL;
118   s->shutdown_starting.tail = NULL;
119   s->shutdown_complete = shutdown_complete;
120   s->slice_allocator_factory = slice_allocator_factory;
121   *server = s;
122   return GRPC_ERROR_NONE;
123 }
124
125 static void destroy_server(void* arg, grpc_error_handle error) {
126   grpc_tcp_server* s = (grpc_tcp_server*)arg;
127
128   /* Now that the accepts have been aborted, we can destroy the sockets.
129      The IOCP won't get notified on these, so we can flag them as already
130      closed by the system. */
131   while (s->head) {
132     grpc_tcp_listener* sp = s->head;
133     s->head = sp->next;
134     sp->next = NULL;
135     grpc_winsocket_destroy(sp->socket);
136     gpr_free(sp);
137   }
138   grpc_channel_args_destroy(s->channel_args);
139   gpr_mu_destroy(&s->mu);
140   gpr_free(s);
141 }
142
143 static void finish_shutdown_locked(grpc_tcp_server* s) {
144   if (s->shutdown_complete != NULL) {
145     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
146                             GRPC_ERROR_NONE);
147   }
148
149   grpc_core::ExecCtx::Run(
150       DEBUG_LOCATION,
151       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
152       GRPC_ERROR_NONE);
153 }
154
155 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
156   gpr_ref_non_zero(&s->refs);
157   return s;
158 }
159
160 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
161                                              grpc_closure* shutdown_starting) {
162   gpr_mu_lock(&s->mu);
163   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
164                            GRPC_ERROR_NONE);
165   gpr_mu_unlock(&s->mu);
166 }
167
168 static void tcp_server_destroy(grpc_tcp_server* s) {
169   grpc_tcp_listener* sp;
170   gpr_mu_lock(&s->mu);
171   grpc_slice_allocator_factory_destroy(s->slice_allocator_factory);
172   /* First, shutdown all fd's. This will queue abortion calls for all
173      of the pending accepts due to the normal operation mechanism. */
174   if (s->active_ports == 0) {
175     finish_shutdown_locked(s);
176   } else {
177     for (sp = s->head; sp; sp = sp->next) {
178       sp->shutting_down = 1;
179       grpc_winsocket_shutdown(sp->socket);
180     }
181   }
182   gpr_mu_unlock(&s->mu);
183 }
184
185 static void tcp_server_unref(grpc_tcp_server* s) {
186   if (gpr_unref(&s->refs)) {
187     grpc_tcp_server_shutdown_listeners(s);
188     gpr_mu_lock(&s->mu);
189     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
190     gpr_mu_unlock(&s->mu);
191     tcp_server_destroy(s);
192   }
193 }
194
195 /* Prepare (bind) a recently-created socket for listening. */
196 static grpc_error_handle prepare_socket(SOCKET sock,
197                                         const grpc_resolved_address* addr,
198                                         int* port) {
199   grpc_resolved_address sockname_temp;
200   grpc_error_handle error = GRPC_ERROR_NONE;
201   int sockname_temp_len;
202
203   error = grpc_tcp_prepare_socket(sock);
204   if (error != GRPC_ERROR_NONE) {
205     goto failure;
206   }
207
208   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
209       SOCKET_ERROR) {
210     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
211     goto failure;
212   }
213
214   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
215     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
216     goto failure;
217   }
218
219   sockname_temp_len = sizeof(struct sockaddr_storage);
220   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
221                   &sockname_temp_len) == SOCKET_ERROR) {
222     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
223     goto failure;
224   }
225   sockname_temp.len = (size_t)sockname_temp_len;
226
227   *port = grpc_sockaddr_get_port(&sockname_temp);
228   return GRPC_ERROR_NONE;
229
230 failure:
231   GPR_ASSERT(error != GRPC_ERROR_NONE);
232   grpc_error_set_int(
233       grpc_error_set_str(
234           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
235               "Failed to prepare server socket", &error, 1),
236           GRPC_ERROR_STR_TARGET_ADDRESS,
237           grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
238       GRPC_ERROR_INT_FD, (intptr_t)sock);
239   GRPC_ERROR_UNREF(error);
240   if (sock != INVALID_SOCKET) closesocket(sock);
241   return error;
242 }
243
244 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
245   sp->shutting_down = 0;
246   GPR_ASSERT(sp->server->active_ports > 0);
247   if (0 == --sp->server->active_ports) {
248     finish_shutdown_locked(sp->server);
249   }
250 }
251
252 /* In order to do an async accept, we need to create a socket first which
253    will be the one assigned to the new incoming connection. */
254 static grpc_error_handle start_accept_locked(grpc_tcp_listener* port) {
255   SOCKET sock = INVALID_SOCKET;
256   BOOL success;
257   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
258   DWORD bytes_received = 0;
259   grpc_error_handle error = GRPC_ERROR_NONE;
260
261   if (port->shutting_down) {
262     return GRPC_ERROR_NONE;
263   }
264
265   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
266                    grpc_get_default_wsa_socket_flags());
267   if (sock == INVALID_SOCKET) {
268     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
269     goto failure;
270   }
271
272   error = grpc_tcp_prepare_socket(sock);
273   if (error != GRPC_ERROR_NONE) goto failure;
274
275   /* Start the "accept" asynchronously. */
276   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
277                            addrlen, addrlen, &bytes_received,
278                            &port->socket->read_info.overlapped);
279
280   /* It is possible to get an accept immediately without delay. However, we
281      will still get an IOCP notification for it. So let's just ignore it. */
282   if (!success) {
283     int last_error = WSAGetLastError();
284     if (last_error != ERROR_IO_PENDING) {
285       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
286       goto failure;
287     }
288   }
289
290   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
291      immediately process an accept that happened in the meantime. */
292   port->new_socket = sock;
293   grpc_socket_notify_on_read(port->socket, &port->on_accept);
294   port->outstanding_calls++;
295   return error;
296
297 failure:
298   GPR_ASSERT(error != GRPC_ERROR_NONE);
299   if (sock != INVALID_SOCKET) closesocket(sock);
300   return error;
301 }
302
303 /* Event manager callback when reads are ready. */
304 static void on_accept(void* arg, grpc_error_handle error) {
305   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
306   SOCKET sock = sp->new_socket;
307   grpc_winsocket_callback_info* info = &sp->socket->read_info;
308   grpc_endpoint* ep = NULL;
309   grpc_resolved_address peer_name;
310   DWORD transfered_bytes;
311   DWORD flags;
312   BOOL wsa_success;
313   int err;
314
315   gpr_mu_lock(&sp->server->mu);
316
317   peer_name.len = sizeof(struct sockaddr_storage);
318
319   /* The general mechanism for shutting down is to queue abortion calls. While
320      this is necessary in the read/write case, it's useless for the accept
321      case. We only need to adjust the pending callback count */
322   if (error != GRPC_ERROR_NONE) {
323     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
324             grpc_error_std_string(error).c_str());
325
326     gpr_mu_unlock(&sp->server->mu);
327     return;
328   }
329   /* The IOCP notified us of a completed operation. Let's grab the results,
330      and act accordingly. */
331   transfered_bytes = 0;
332   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
333                                        &transfered_bytes, FALSE, &flags);
334   if (!wsa_success) {
335     if (!sp->shutting_down) {
336       char* utf8_message = gpr_format_message(WSAGetLastError());
337       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
338       gpr_free(utf8_message);
339     }
340     closesocket(sock);
341   } else {
342     if (!sp->shutting_down) {
343       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
344                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
345       if (err) {
346         char* utf8_message = gpr_format_message(WSAGetLastError());
347         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
348         gpr_free(utf8_message);
349       }
350       int peer_name_len = (int)peer_name.len;
351       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
352       peer_name.len = (size_t)peer_name_len;
353       std::string peer_name_string;
354       if (!err) {
355         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
356       } else {
357         char* utf8_message = gpr_format_message(WSAGetLastError());
358         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
359         gpr_free(utf8_message);
360       }
361       std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
362       ep = grpc_tcp_create(
363           grpc_winsocket_create(sock, fd_name.c_str()),
364           sp->server->channel_args, peer_name_string.c_str(),
365           grpc_slice_allocator_factory_create_slice_allocator(
366               sp->server->slice_allocator_factory, peer_name_string));
367     } else {
368       closesocket(sock);
369     }
370   }
371
372   /* The only time we should call our callback, is where we successfully
373      managed to accept a connection, and created an endpoint. */
374   if (ep) {
375     // Create acceptor.
376     grpc_tcp_server_acceptor* acceptor =
377         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
378     acceptor->from_server = sp->server;
379     acceptor->port_index = sp->port_index;
380     acceptor->fd_index = 0;
381     acceptor->external_connection = false;
382     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
383   }
384   /* As we were notified from the IOCP of one and exactly one accept,
385      the former socked we created has now either been destroy or assigned
386      to the new connection. We need to create a new one for the next
387      connection. */
388   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
389   if (0 == --sp->outstanding_calls) {
390     decrement_active_ports_and_notify_locked(sp);
391   }
392   gpr_mu_unlock(&sp->server->mu);
393 }
394
395 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
396                                               const grpc_resolved_address* addr,
397                                               unsigned port_index,
398                                               grpc_tcp_listener** listener) {
399   grpc_tcp_listener* sp = NULL;
400   int port = -1;
401   int status;
402   GUID guid = WSAID_ACCEPTEX;
403   DWORD ioctl_num_bytes;
404   LPFN_ACCEPTEX AcceptEx;
405   grpc_error_handle error = GRPC_ERROR_NONE;
406
407   /* We need to grab the AcceptEx pointer for that port, as it may be
408      interface-dependent. We'll cache it to avoid doing that again. */
409   status =
410       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
411                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
412
413   if (status != 0) {
414     char* utf8_message = gpr_format_message(WSAGetLastError());
415     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
416     gpr_free(utf8_message);
417     closesocket(sock);
418     return GRPC_ERROR_NONE;
419   }
420
421   error = prepare_socket(sock, addr, &port);
422   if (error != GRPC_ERROR_NONE) {
423     return error;
424   }
425
426   GPR_ASSERT(port >= 0);
427   gpr_mu_lock(&s->mu);
428   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
429   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
430   sp->next = NULL;
431   if (s->head == NULL) {
432     s->head = sp;
433   } else {
434     s->tail->next = sp;
435   }
436   s->tail = sp;
437   sp->server = s;
438   sp->socket = grpc_winsocket_create(sock, "listener");
439   sp->shutting_down = 0;
440   sp->outstanding_calls = 0;
441   sp->AcceptEx = AcceptEx;
442   sp->new_socket = INVALID_SOCKET;
443   sp->port = port;
444   sp->port_index = port_index;
445   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
446   GPR_ASSERT(sp->socket);
447   gpr_mu_unlock(&s->mu);
448   *listener = sp;
449
450   return GRPC_ERROR_NONE;
451 }
452
453 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
454                                              const grpc_resolved_address* addr,
455                                              int* port) {
456   grpc_tcp_listener* sp = NULL;
457   SOCKET sock;
458   grpc_resolved_address addr6_v4mapped;
459   grpc_resolved_address wildcard;
460   grpc_resolved_address* allocated_addr = NULL;
461   grpc_resolved_address sockname_temp;
462   unsigned port_index = 0;
463   grpc_error_handle error = GRPC_ERROR_NONE;
464
465   if (s->tail != NULL) {
466     port_index = s->tail->port_index + 1;
467   }
468
469   /* Check if this is a wildcard port, and if so, try to keep the port the same
470      as some previously created listener. */
471   if (grpc_sockaddr_get_port(addr) == 0) {
472     for (sp = s->head; sp; sp = sp->next) {
473       int sockname_temp_len = sizeof(struct sockaddr_storage);
474       if (0 == getsockname(sp->socket->socket,
475                            (grpc_sockaddr*)sockname_temp.addr,
476                            &sockname_temp_len)) {
477         sockname_temp.len = (size_t)sockname_temp_len;
478         *port = grpc_sockaddr_get_port(&sockname_temp);
479         if (*port > 0) {
480           allocated_addr =
481               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
482           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
483           grpc_sockaddr_set_port(allocated_addr, *port);
484           addr = allocated_addr;
485           break;
486         }
487       }
488     }
489   }
490
491   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
492     addr = &addr6_v4mapped;
493   }
494
495   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
496   if (grpc_sockaddr_is_wildcard(addr, port)) {
497     grpc_sockaddr_make_wildcard6(*port, &wildcard);
498
499     addr = &wildcard;
500   }
501
502   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
503                    grpc_get_default_wsa_socket_flags());
504   if (sock == INVALID_SOCKET) {
505     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
506     goto done;
507   }
508
509   error = add_socket_to_server(s, sock, addr, port_index, &sp);
510
511 done:
512   gpr_free(allocated_addr);
513
514   if (error != GRPC_ERROR_NONE) {
515     grpc_error_handle error_out =
516         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
517             "Failed to add port to server", &error, 1);
518     GRPC_ERROR_UNREF(error);
519     error = error_out;
520     *port = -1;
521   } else {
522     GPR_ASSERT(sp != NULL);
523     *port = sp->port;
524   }
525   return error;
526 }
527
528 static void tcp_server_start(grpc_tcp_server* s,
529                              const std::vector<grpc_pollset*>* /*pollsets*/,
530                              grpc_tcp_server_cb on_accept_cb,
531                              void* on_accept_cb_arg) {
532   grpc_tcp_listener* sp;
533   GPR_ASSERT(on_accept_cb);
534   gpr_mu_lock(&s->mu);
535   GPR_ASSERT(!s->on_accept_cb);
536   GPR_ASSERT(s->active_ports == 0);
537   s->on_accept_cb = on_accept_cb;
538   s->on_accept_cb_arg = on_accept_cb_arg;
539   for (sp = s->head; sp; sp = sp->next) {
540     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
541     s->active_ports++;
542   }
543   gpr_mu_unlock(&s->mu);
544 }
545
546 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
547                                          unsigned port_index) {
548   return 0;
549 }
550
551 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
552                               unsigned fd_index) {
553   return -1;
554 }
555
556 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
557     grpc_tcp_server* s) {
558   return nullptr;
559 }
560
561 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
562
563 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
564     tcp_server_create,        tcp_server_start,
565     tcp_server_add_port,      tcp_server_create_fd_handler,
566     tcp_server_port_fd_count, tcp_server_port_fd,
567     tcp_server_ref,           tcp_server_shutdown_starting_add,
568     tcp_server_unref,         tcp_server_shutdown_listeners};
569 #endif /* GRPC_WINSOCK_SOCKET */