0052175ae467463551dbd3365beb484d26abbeaf
[platform/upstream/grpc.git] / src / core / lib / iomgr / tcp_server_windows.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include "src/core/lib/iomgr/sockaddr.h"
26
27 #include <inttypes.h>
28 #include <io.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/iomgr/iocp_windows.h"
39 #include "src/core/lib/iomgr/pollset_windows.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_server.h"
44 #include "src/core/lib/iomgr/tcp_windows.h"
45
46 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
47
48 /* one listening port */
49 typedef struct grpc_tcp_listener grpc_tcp_listener;
50 struct grpc_tcp_listener {
51   /* This seemingly magic number comes from AcceptEx's documentation. each
52      address buffer needs to have at least 16 more bytes at their end. */
53   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
54   /* This will hold the socket for the next accept. */
55   SOCKET new_socket;
56   /* The listener winsocket. */
57   grpc_winsocket* socket;
58   /* The actual TCP port number. */
59   int port;
60   unsigned port_index;
61   grpc_tcp_server* server;
62   /* The cached AcceptEx for that port. */
63   LPFN_ACCEPTEX AcceptEx;
64   int shutting_down;
65   int outstanding_calls;
66   /* closure for socket notification of accept being ready */
67   grpc_closure on_accept;
68   /* linked list */
69   struct grpc_tcp_listener* next;
70 };
71
72 /* the overall server */
73 struct grpc_tcp_server {
74   gpr_refcount refs;
75   /* Called whenever accept() succeeds on a server port. */
76   grpc_tcp_server_cb on_accept_cb;
77   void* on_accept_cb_arg;
78
79   gpr_mu mu;
80
81   /* active port count: how many ports are actually still listening */
82   int active_ports;
83
84   /* linked list of server ports */
85   grpc_tcp_listener* head;
86   grpc_tcp_listener* tail;
87
88   /* List of closures passed to shutdown_starting_add(). */
89   grpc_closure_list shutdown_starting;
90
91   /* shutdown callback */
92   grpc_closure* shutdown_complete;
93
94   grpc_channel_args* channel_args;
95 };
96
97 /* Public function. Allocates the proper data structures to hold a
98    grpc_tcp_server. */
99 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
100                                      const grpc_channel_args* args,
101                                      grpc_tcp_server** server) {
102   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
103   s->channel_args = grpc_channel_args_copy(args);
104   gpr_ref_init(&s->refs, 1);
105   gpr_mu_init(&s->mu);
106   s->active_ports = 0;
107   s->on_accept_cb = NULL;
108   s->on_accept_cb_arg = NULL;
109   s->head = NULL;
110   s->tail = NULL;
111   s->shutdown_starting.head = NULL;
112   s->shutdown_starting.tail = NULL;
113   s->shutdown_complete = shutdown_complete;
114   *server = s;
115   return GRPC_ERROR_NONE;
116 }
117
118 static void destroy_server(void* arg, grpc_error* error) {
119   grpc_tcp_server* s = (grpc_tcp_server*)arg;
120
121   /* Now that the accepts have been aborted, we can destroy the sockets.
122      The IOCP won't get notified on these, so we can flag them as already
123      closed by the system. */
124   while (s->head) {
125     grpc_tcp_listener* sp = s->head;
126     s->head = sp->next;
127     sp->next = NULL;
128     grpc_winsocket_destroy(sp->socket);
129     gpr_free(sp);
130   }
131   grpc_channel_args_destroy(s->channel_args);
132   gpr_mu_destroy(&s->mu);
133   gpr_free(s);
134 }
135
136 static void finish_shutdown_locked(grpc_tcp_server* s) {
137   if (s->shutdown_complete != NULL) {
138     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
139                             GRPC_ERROR_NONE);
140   }
141
142   grpc_core::ExecCtx::Run(
143       DEBUG_LOCATION,
144       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
145       GRPC_ERROR_NONE);
146 }
147
148 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
149   gpr_ref_non_zero(&s->refs);
150   return s;
151 }
152
153 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
154                                              grpc_closure* shutdown_starting) {
155   gpr_mu_lock(&s->mu);
156   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
157                            GRPC_ERROR_NONE);
158   gpr_mu_unlock(&s->mu);
159 }
160
161 static void tcp_server_destroy(grpc_tcp_server* s) {
162   grpc_tcp_listener* sp;
163   gpr_mu_lock(&s->mu);
164
165   /* First, shutdown all fd's. This will queue abortion calls for all
166      of the pending accepts due to the normal operation mechanism. */
167   if (s->active_ports == 0) {
168     finish_shutdown_locked(s);
169   } else {
170     for (sp = s->head; sp; sp = sp->next) {
171       sp->shutting_down = 1;
172       grpc_winsocket_shutdown(sp->socket);
173     }
174   }
175   gpr_mu_unlock(&s->mu);
176 }
177
178 static void tcp_server_unref(grpc_tcp_server* s) {
179   if (gpr_unref(&s->refs)) {
180     grpc_tcp_server_shutdown_listeners(s);
181     gpr_mu_lock(&s->mu);
182     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
183     gpr_mu_unlock(&s->mu);
184     tcp_server_destroy(s);
185   }
186 }
187
188 /* Prepare (bind) a recently-created socket for listening. */
189 static grpc_error* prepare_socket(SOCKET sock,
190                                   const grpc_resolved_address* addr,
191                                   int* port) {
192   grpc_resolved_address sockname_temp;
193   grpc_error* error = GRPC_ERROR_NONE;
194   int sockname_temp_len;
195
196   error = grpc_tcp_prepare_socket(sock);
197   if (error != GRPC_ERROR_NONE) {
198     goto failure;
199   }
200
201   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
202       SOCKET_ERROR) {
203     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
204     goto failure;
205   }
206
207   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
208     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
209     goto failure;
210   }
211
212   sockname_temp_len = sizeof(struct sockaddr_storage);
213   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
214                   &sockname_temp_len) == SOCKET_ERROR) {
215     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
216     goto failure;
217   }
218   sockname_temp.len = (size_t)sockname_temp_len;
219
220   *port = grpc_sockaddr_get_port(&sockname_temp);
221   return GRPC_ERROR_NONE;
222
223 failure:
224   GPR_ASSERT(error != GRPC_ERROR_NONE);
225   char* tgtaddr = grpc_sockaddr_to_uri(addr);
226   grpc_error_set_int(
227       grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
228                              "Failed to prepare server socket", &error, 1),
229                          GRPC_ERROR_STR_TARGET_ADDRESS,
230                          grpc_slice_from_copied_string(tgtaddr)),
231       GRPC_ERROR_INT_FD, (intptr_t)sock);
232   gpr_free(tgtaddr);
233   GRPC_ERROR_UNREF(error);
234   if (sock != INVALID_SOCKET) closesocket(sock);
235   return error;
236 }
237
238 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
239   sp->shutting_down = 0;
240   GPR_ASSERT(sp->server->active_ports > 0);
241   if (0 == --sp->server->active_ports) {
242     finish_shutdown_locked(sp->server);
243   }
244 }
245
246 /* In order to do an async accept, we need to create a socket first which
247    will be the one assigned to the new incoming connection. */
248 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
249   SOCKET sock = INVALID_SOCKET;
250   BOOL success;
251   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
252   DWORD bytes_received = 0;
253   grpc_error* error = GRPC_ERROR_NONE;
254
255   if (port->shutting_down) {
256     return GRPC_ERROR_NONE;
257   }
258
259   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
260                    grpc_get_default_wsa_socket_flags());
261   if (sock == INVALID_SOCKET) {
262     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
263     goto failure;
264   }
265
266   error = grpc_tcp_prepare_socket(sock);
267   if (error != GRPC_ERROR_NONE) goto failure;
268
269   /* Start the "accept" asynchronously. */
270   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
271                            addrlen, addrlen, &bytes_received,
272                            &port->socket->read_info.overlapped);
273
274   /* It is possible to get an accept immediately without delay. However, we
275      will still get an IOCP notification for it. So let's just ignore it. */
276   if (!success) {
277     int last_error = WSAGetLastError();
278     if (last_error != ERROR_IO_PENDING) {
279       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
280       goto failure;
281     }
282   }
283
284   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
285      immediately process an accept that happened in the meantime. */
286   port->new_socket = sock;
287   grpc_socket_notify_on_read(port->socket, &port->on_accept);
288   port->outstanding_calls++;
289   return error;
290
291 failure:
292   GPR_ASSERT(error != GRPC_ERROR_NONE);
293   if (sock != INVALID_SOCKET) closesocket(sock);
294   return error;
295 }
296
297 /* Event manager callback when reads are ready. */
298 static void on_accept(void* arg, grpc_error* error) {
299   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
300   SOCKET sock = sp->new_socket;
301   grpc_winsocket_callback_info* info = &sp->socket->read_info;
302   grpc_endpoint* ep = NULL;
303   grpc_resolved_address peer_name;
304   char* peer_name_string;
305   char* fd_name;
306   DWORD transfered_bytes;
307   DWORD flags;
308   BOOL wsa_success;
309   int err;
310
311   gpr_mu_lock(&sp->server->mu);
312
313   peer_name.len = sizeof(struct sockaddr_storage);
314
315   /* The general mechanism for shutting down is to queue abortion calls. While
316      this is necessary in the read/write case, it's useless for the accept
317      case. We only need to adjust the pending callback count */
318   if (error != GRPC_ERROR_NONE) {
319     const char* msg = grpc_error_string(error);
320     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
321
322     gpr_mu_unlock(&sp->server->mu);
323     return;
324   }
325
326   /* The IOCP notified us of a completed operation. Let's grab the results,
327      and act accordingly. */
328   transfered_bytes = 0;
329   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
330                                        &transfered_bytes, FALSE, &flags);
331   if (!wsa_success) {
332     if (!sp->shutting_down) {
333       char* utf8_message = gpr_format_message(WSAGetLastError());
334       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
335       gpr_free(utf8_message);
336     }
337     closesocket(sock);
338   } else {
339     if (!sp->shutting_down) {
340       peer_name_string = NULL;
341       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
342                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
343       if (err) {
344         char* utf8_message = gpr_format_message(WSAGetLastError());
345         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
346         gpr_free(utf8_message);
347       }
348       int peer_name_len = (int)peer_name.len;
349       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
350       peer_name.len = (size_t)peer_name_len;
351       if (!err) {
352         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
353       } else {
354         char* utf8_message = gpr_format_message(WSAGetLastError());
355         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
356         gpr_free(utf8_message);
357       }
358       gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
359       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
360                            sp->server->channel_args, peer_name_string);
361       gpr_free(fd_name);
362       gpr_free(peer_name_string);
363     } else {
364       closesocket(sock);
365     }
366   }
367
368   /* The only time we should call our callback, is where we successfully
369      managed to accept a connection, and created an endpoint. */
370   if (ep) {
371     // Create acceptor.
372     grpc_tcp_server_acceptor* acceptor =
373         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
374     acceptor->from_server = sp->server;
375     acceptor->port_index = sp->port_index;
376     acceptor->fd_index = 0;
377     acceptor->external_connection = false;
378     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
379   }
380   /* As we were notified from the IOCP of one and exactly one accept,
381      the former socked we created has now either been destroy or assigned
382      to the new connection. We need to create a new one for the next
383      connection. */
384   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
385   if (0 == --sp->outstanding_calls) {
386     decrement_active_ports_and_notify_locked(sp);
387   }
388   gpr_mu_unlock(&sp->server->mu);
389 }
390
391 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392                                         const grpc_resolved_address* addr,
393                                         unsigned port_index,
394                                         grpc_tcp_listener** listener) {
395   grpc_tcp_listener* sp = NULL;
396   int port = -1;
397   int status;
398   GUID guid = WSAID_ACCEPTEX;
399   DWORD ioctl_num_bytes;
400   LPFN_ACCEPTEX AcceptEx;
401   grpc_error* error = GRPC_ERROR_NONE;
402
403   /* We need to grab the AcceptEx pointer for that port, as it may be
404      interface-dependent. We'll cache it to avoid doing that again. */
405   status =
406       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
408
409   if (status != 0) {
410     char* utf8_message = gpr_format_message(WSAGetLastError());
411     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412     gpr_free(utf8_message);
413     closesocket(sock);
414     return GRPC_ERROR_NONE;
415   }
416
417   error = prepare_socket(sock, addr, &port);
418   if (error != GRPC_ERROR_NONE) {
419     return error;
420   }
421
422   GPR_ASSERT(port >= 0);
423   gpr_mu_lock(&s->mu);
424   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
426   sp->next = NULL;
427   if (s->head == NULL) {
428     s->head = sp;
429   } else {
430     s->tail->next = sp;
431   }
432   s->tail = sp;
433   sp->server = s;
434   sp->socket = grpc_winsocket_create(sock, "listener");
435   sp->shutting_down = 0;
436   sp->outstanding_calls = 0;
437   sp->AcceptEx = AcceptEx;
438   sp->new_socket = INVALID_SOCKET;
439   sp->port = port;
440   sp->port_index = port_index;
441   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442   GPR_ASSERT(sp->socket);
443   gpr_mu_unlock(&s->mu);
444   *listener = sp;
445
446   return GRPC_ERROR_NONE;
447 }
448
449 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
450                                        const grpc_resolved_address* addr,
451                                        int* port) {
452   grpc_tcp_listener* sp = NULL;
453   SOCKET sock;
454   grpc_resolved_address addr6_v4mapped;
455   grpc_resolved_address wildcard;
456   grpc_resolved_address* allocated_addr = NULL;
457   grpc_resolved_address sockname_temp;
458   unsigned port_index = 0;
459   grpc_error* error = GRPC_ERROR_NONE;
460
461   if (s->tail != NULL) {
462     port_index = s->tail->port_index + 1;
463   }
464
465   /* Check if this is a wildcard port, and if so, try to keep the port the same
466      as some previously created listener. */
467   if (grpc_sockaddr_get_port(addr) == 0) {
468     for (sp = s->head; sp; sp = sp->next) {
469       int sockname_temp_len = sizeof(struct sockaddr_storage);
470       if (0 == getsockname(sp->socket->socket,
471                            (grpc_sockaddr*)sockname_temp.addr,
472                            &sockname_temp_len)) {
473         sockname_temp.len = (size_t)sockname_temp_len;
474         *port = grpc_sockaddr_get_port(&sockname_temp);
475         if (*port > 0) {
476           allocated_addr =
477               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479           grpc_sockaddr_set_port(allocated_addr, *port);
480           addr = allocated_addr;
481           break;
482         }
483       }
484     }
485   }
486
487   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488     addr = &addr6_v4mapped;
489   }
490
491   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492   if (grpc_sockaddr_is_wildcard(addr, port)) {
493     grpc_sockaddr_make_wildcard6(*port, &wildcard);
494
495     addr = &wildcard;
496   }
497
498   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499                    grpc_get_default_wsa_socket_flags());
500   if (sock == INVALID_SOCKET) {
501     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
502     goto done;
503   }
504
505   error = add_socket_to_server(s, sock, addr, port_index, &sp);
506
507 done:
508   gpr_free(allocated_addr);
509
510   if (error != GRPC_ERROR_NONE) {
511     grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
512         "Failed to add port to server", &error, 1);
513     GRPC_ERROR_UNREF(error);
514     error = error_out;
515     *port = -1;
516   } else {
517     GPR_ASSERT(sp != NULL);
518     *port = sp->port;
519   }
520   return error;
521 }
522
523 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
524                              size_t pollset_count,
525                              grpc_tcp_server_cb on_accept_cb,
526                              void* on_accept_cb_arg) {
527   grpc_tcp_listener* sp;
528   GPR_ASSERT(on_accept_cb);
529   gpr_mu_lock(&s->mu);
530   GPR_ASSERT(!s->on_accept_cb);
531   GPR_ASSERT(s->active_ports == 0);
532   s->on_accept_cb = on_accept_cb;
533   s->on_accept_cb_arg = on_accept_cb_arg;
534   for (sp = s->head; sp; sp = sp->next) {
535     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
536     s->active_ports++;
537   }
538   gpr_mu_unlock(&s->mu);
539 }
540
541 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
542                                          unsigned port_index) {
543   return 0;
544 }
545
546 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
547                               unsigned fd_index) {
548   return -1;
549 }
550
551 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
552     grpc_tcp_server* s) {
553   return nullptr;
554 }
555
556 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
557
558 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
559     tcp_server_create,        tcp_server_start,
560     tcp_server_add_port,      tcp_server_create_fd_handler,
561     tcp_server_port_fd_count, tcp_server_port_fd,
562     tcp_server_ref,           tcp_server_shutdown_starting_add,
563     tcp_server_unref,         tcp_server_shutdown_listeners};
564 #endif /* GRPC_WINSOCK_SOCKET */