3fb8af9328cd263685f1f55506e6ec22878ec650
[platform/upstream/nodejs.git] / deps / uv / src / unix / udp.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31
32 static void uv__udp_run_completed(uv_udp_t* handle);
33 static void uv__udp_run_pending(uv_udp_t* handle);
34 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
35 static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
36 static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
37 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
38 static int uv__send(uv_udp_send_t* req,
39                     uv_udp_t* handle,
40                     uv_buf_t bufs[],
41                     int bufcnt,
42                     struct sockaddr* addr,
43                     socklen_t addrlen,
44                     uv_udp_send_cb send_cb);
45
46
47 void uv__udp_close(uv_udp_t* handle) {
48   uv__io_close(handle->loop, &handle->io_watcher);
49   uv__handle_stop(handle);
50   close(handle->io_watcher.fd);
51   handle->io_watcher.fd = -1;
52 }
53
54
55 void uv__udp_finish_close(uv_udp_t* handle) {
56   uv_udp_send_t* req;
57   ngx_queue_t* q;
58
59   assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
60   assert(handle->io_watcher.fd == -1);
61
62   uv__udp_run_completed(handle);
63
64   while (!ngx_queue_empty(&handle->write_queue)) {
65     q = ngx_queue_head(&handle->write_queue);
66     ngx_queue_remove(q);
67
68     req = ngx_queue_data(q, uv_udp_send_t, queue);
69     uv__req_unregister(handle->loop, req);
70
71     if (req->bufs != req->bufsml)
72       free(req->bufs);
73     req->bufs = NULL;
74
75     if (req->send_cb) {
76       uv__set_artificial_error(handle->loop, UV_ECANCELED);
77       req->send_cb(req, -1);
78     }
79   }
80
81   /* Now tear down the handle. */
82   handle->recv_cb = NULL;
83   handle->alloc_cb = NULL;
84   /* but _do not_ touch close_cb */
85 }
86
87
88 static void uv__udp_run_pending(uv_udp_t* handle) {
89   uv_udp_send_t* req;
90   ngx_queue_t* q;
91   struct msghdr h;
92   ssize_t size;
93
94   while (!ngx_queue_empty(&handle->write_queue)) {
95     q = ngx_queue_head(&handle->write_queue);
96     assert(q != NULL);
97
98     req = ngx_queue_data(q, uv_udp_send_t, queue);
99     assert(req != NULL);
100
101     memset(&h, 0, sizeof h);
102     h.msg_name = &req->addr;
103     h.msg_namelen = (req->addr.sin6_family == AF_INET6 ?
104       sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
105     h.msg_iov = (struct iovec*)req->bufs;
106     h.msg_iovlen = req->bufcnt;
107
108     do {
109       size = sendmsg(handle->io_watcher.fd, &h, 0);
110     }
111     while (size == -1 && errno == EINTR);
112
113     /* TODO try to write once or twice more in the
114      * hope that the socket becomes readable again?
115      */
116     if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
117       break;
118
119     req->status = (size == -1 ? -errno : size);
120
121 #ifndef NDEBUG
122     /* Sanity check. */
123     if (size != -1) {
124       ssize_t nbytes;
125       int i;
126
127       for (nbytes = i = 0; i < req->bufcnt; i++)
128         nbytes += req->bufs[i].len;
129
130       assert(size == nbytes);
131     }
132 #endif
133
134     /* Sending a datagram is an atomic operation: either all data
135      * is written or nothing is (and EMSGSIZE is raised). That is
136      * why we don't handle partial writes. Just pop the request
137      * off the write queue and onto the completed queue, done.
138      */
139     ngx_queue_remove(&req->queue);
140     ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
141   }
142 }
143
144
145 static void uv__udp_run_completed(uv_udp_t* handle) {
146   uv_udp_send_t* req;
147   ngx_queue_t* q;
148
149   while (!ngx_queue_empty(&handle->write_completed_queue)) {
150     q = ngx_queue_head(&handle->write_completed_queue);
151     ngx_queue_remove(q);
152
153     req = ngx_queue_data(q, uv_udp_send_t, queue);
154     uv__req_unregister(handle->loop, req);
155
156     if (req->bufs != req->bufsml)
157       free(req->bufs);
158     req->bufs = NULL;
159
160     if (req->send_cb == NULL)
161       continue;
162
163     /* req->status >= 0 == bytes written
164      * req->status <  0 == errno
165      */
166     if (req->status >= 0) {
167       req->send_cb(req, 0);
168     }
169     else {
170       uv__set_sys_error(handle->loop, -req->status);
171       req->send_cb(req, -1);
172     }
173   }
174 }
175
176
177 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
178   if (revents & UV__POLLIN)
179     uv__udp_recvmsg(loop, w, revents);
180
181   if (revents & UV__POLLOUT)
182     uv__udp_sendmsg(loop, w, revents);
183 }
184
185
186 static void uv__udp_recvmsg(uv_loop_t* loop,
187                             uv__io_t* w,
188                             unsigned int revents) {
189   struct sockaddr_storage peer;
190   struct msghdr h;
191   uv_udp_t* handle;
192   ssize_t nread;
193   uv_buf_t buf;
194   int flags;
195   int count;
196
197   handle = container_of(w, uv_udp_t, io_watcher);
198   assert(handle->type == UV_UDP);
199   assert(revents & UV__POLLIN);
200
201   assert(handle->recv_cb != NULL);
202   assert(handle->alloc_cb != NULL);
203
204   /* Prevent loop starvation when the data comes in as fast as (or faster than)
205    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
206    */
207   count = 32;
208
209   memset(&h, 0, sizeof(h));
210   h.msg_name = &peer;
211
212   do {
213     buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
214     assert(buf.len > 0);
215     assert(buf.base != NULL);
216
217     h.msg_namelen = sizeof(peer);
218     h.msg_iov = (void*) &buf;
219     h.msg_iovlen = 1;
220
221     do {
222       nread = recvmsg(handle->io_watcher.fd, &h, 0);
223     }
224     while (nread == -1 && errno == EINTR);
225
226     if (nread == -1) {
227       if (errno == EAGAIN || errno == EWOULDBLOCK) {
228         uv__set_sys_error(handle->loop, EAGAIN);
229         handle->recv_cb(handle, 0, buf, NULL, 0);
230       }
231       else {
232         uv__set_sys_error(handle->loop, errno);
233         handle->recv_cb(handle, -1, buf, NULL, 0);
234       }
235     }
236     else {
237       flags = 0;
238
239       if (h.msg_flags & MSG_TRUNC)
240         flags |= UV_UDP_PARTIAL;
241
242       handle->recv_cb(handle,
243                       nread,
244                       buf,
245                       (struct sockaddr*)&peer,
246                       flags);
247     }
248   }
249   /* recv_cb callback may decide to pause or close the handle */
250   while (nread != -1
251       && count-- > 0
252       && handle->io_watcher.fd != -1
253       && handle->recv_cb != NULL);
254 }
255
256
257 static void uv__udp_sendmsg(uv_loop_t* loop,
258                             uv__io_t* w,
259                             unsigned int revents) {
260   uv_udp_t* handle;
261
262   handle = container_of(w, uv_udp_t, io_watcher);
263   assert(handle->type == UV_UDP);
264   assert(revents & UV__POLLOUT);
265
266   assert(!ngx_queue_empty(&handle->write_queue)
267       || !ngx_queue_empty(&handle->write_completed_queue));
268
269   /* Write out pending data first. */
270   uv__udp_run_pending(handle);
271
272   /* Drain 'request completed' queue. */
273   uv__udp_run_completed(handle);
274
275   if (!ngx_queue_empty(&handle->write_completed_queue)) {
276     /* Schedule completion callbacks. */
277     uv__io_feed(handle->loop, &handle->io_watcher);
278   }
279   else if (ngx_queue_empty(&handle->write_queue)) {
280     /* Pending queue and completion queue empty, stop watcher. */
281     uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
282
283     if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
284       uv__handle_stop(handle);
285   }
286 }
287
288
289 static int uv__bind(uv_udp_t* handle,
290                     int domain,
291                     struct sockaddr* addr,
292                     socklen_t len,
293                     unsigned flags) {
294   int saved_errno;
295   int status;
296   int yes;
297   int fd;
298
299   saved_errno = errno;
300   status = -1;
301   fd = -1;
302
303   /* Check for bad flags. */
304   if (flags & ~UV_UDP_IPV6ONLY) {
305     uv__set_sys_error(handle->loop, EINVAL);
306     goto out;
307   }
308
309   /* Cannot set IPv6-only mode on non-IPv6 socket. */
310   if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
311     uv__set_sys_error(handle->loop, EINVAL);
312     goto out;
313   }
314
315   if (handle->io_watcher.fd == -1) {
316     if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
317       uv__set_sys_error(handle->loop, errno);
318       goto out;
319     }
320     handle->io_watcher.fd = fd;
321   }
322
323   fd = handle->io_watcher.fd;
324   yes = 1;
325   if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
326     uv__set_sys_error(handle->loop, errno);
327     goto out;
328   }
329
330   /* On the BSDs, SO_REUSEADDR lets you reuse an address that's in the TIME_WAIT
331    * state (i.e. was until recently tied to a socket) while SO_REUSEPORT lets
332    * multiple processes bind to the same address. Yes, it's something of a
333    * misnomer but then again, SO_REUSEADDR was already taken.
334    *
335    * None of the above applies to Linux: SO_REUSEADDR implies SO_REUSEPORT on
336    * Linux and hence it does not have SO_REUSEPORT at all.
337    */
338 #ifdef SO_REUSEPORT
339   yes = 1;
340   if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof yes) == -1) {
341     uv__set_sys_error(handle->loop, errno);
342     goto out;
343   }
344 #endif
345
346   if (flags & UV_UDP_IPV6ONLY) {
347 #ifdef IPV6_V6ONLY
348     yes = 1;
349     if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
350       uv__set_sys_error(handle->loop, errno);
351       goto out;
352     }
353 #else
354     uv__set_sys_error(handle->loop, ENOTSUP);
355     goto out;
356 #endif
357   }
358
359   if (bind(fd, addr, len) == -1) {
360     uv__set_sys_error(handle->loop, errno);
361     goto out;
362   }
363
364   handle->io_watcher.fd = fd;
365   status = 0;
366
367 out:
368   if (status) {
369     close(handle->io_watcher.fd);
370     handle->io_watcher.fd = -1;
371   }
372
373   errno = saved_errno;
374   return status;
375 }
376
377
378 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
379   unsigned char taddr[sizeof(struct sockaddr_in6)];
380   socklen_t addrlen;
381
382   assert(domain == AF_INET || domain == AF_INET6);
383
384   if (handle->io_watcher.fd != -1)
385     return 0;
386
387   switch (domain) {
388   case AF_INET:
389   {
390     struct sockaddr_in* addr = (void*)&taddr;
391     memset(addr, 0, sizeof *addr);
392     addr->sin_family = AF_INET;
393     addr->sin_addr.s_addr = INADDR_ANY;
394     addrlen = sizeof *addr;
395     break;
396   }
397   case AF_INET6:
398   {
399     struct sockaddr_in6* addr = (void*)&taddr;
400     memset(addr, 0, sizeof *addr);
401     addr->sin6_family = AF_INET6;
402     addr->sin6_addr = in6addr_any;
403     addrlen = sizeof *addr;
404     break;
405   }
406   default:
407     assert(0 && "unsupported address family");
408     abort();
409   }
410
411   return uv__bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
412 }
413
414
415 static int uv__send(uv_udp_send_t* req,
416                     uv_udp_t* handle,
417                     uv_buf_t bufs[],
418                     int bufcnt,
419                     struct sockaddr* addr,
420                     socklen_t addrlen,
421                     uv_udp_send_cb send_cb) {
422   assert(bufcnt > 0);
423
424   if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
425     return -1;
426
427   uv__req_init(handle->loop, req, UV_UDP_SEND);
428
429   assert(addrlen <= sizeof(req->addr));
430   memcpy(&req->addr, addr, addrlen);
431   req->send_cb = send_cb;
432   req->handle = handle;
433   req->bufcnt = bufcnt;
434
435   if (bufcnt <= (int) ARRAY_SIZE(req->bufsml)) {
436     req->bufs = req->bufsml;
437   }
438   else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
439     uv__set_sys_error(handle->loop, ENOMEM);
440     return -1;
441   }
442
443   memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
444   ngx_queue_insert_tail(&handle->write_queue, &req->queue);
445   uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
446   uv__handle_start(handle);
447
448   return 0;
449 }
450
451
452 int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
453   uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
454   handle->alloc_cb = NULL;
455   handle->recv_cb = NULL;
456   uv__io_init(&handle->io_watcher, uv__udp_io, -1);
457   ngx_queue_init(&handle->write_queue);
458   ngx_queue_init(&handle->write_completed_queue);
459   return 0;
460 }
461
462
463 int uv__udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
464   return uv__bind(handle,
465                   AF_INET,
466                   (struct sockaddr*)&addr,
467                   sizeof addr,
468                   flags);
469 }
470
471
472 int uv__udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
473   return uv__bind(handle,
474                   AF_INET6,
475                   (struct sockaddr*)&addr,
476                   sizeof addr,
477                   flags);
478 }
479
480
481 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
482   int saved_errno;
483   int status;
484   int yes;
485
486   saved_errno = errno;
487   status = -1;
488
489   /* Check for already active socket. */
490   if (handle->io_watcher.fd != -1) {
491     uv__set_artificial_error(handle->loop, UV_EALREADY);
492     goto out;
493   }
494
495   yes = 1;
496   if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
497     uv__set_sys_error(handle->loop, errno);
498     goto out;
499   }
500
501   /* On the BSDs, SO_REUSEADDR lets you reuse an address that's in the TIME_WAIT
502    * state (i.e. was until recently tied to a socket) while SO_REUSEPORT lets
503    * multiple processes bind to the same address. Yes, it's something of a
504    * misnomer but then again, SO_REUSEADDR was already taken.
505    *
506    * None of the above applies to Linux: SO_REUSEADDR implies SO_REUSEPORT on
507    * Linux and hence it does not have SO_REUSEPORT at all.
508    */
509 #ifdef SO_REUSEPORT
510   yes = 1;
511   if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof yes) == -1) {
512     uv__set_sys_error(handle->loop, errno);
513     goto out;
514   }
515 #endif
516
517   handle->io_watcher.fd = sock;
518   status = 0;
519
520 out:
521   errno = saved_errno;
522   return status;
523 }
524
525
526 int uv_udp_set_membership(uv_udp_t* handle,
527                           const char* multicast_addr,
528                           const char* interface_addr,
529                           uv_membership membership) {
530   struct ip_mreq mreq;
531   int optname;
532
533   memset(&mreq, 0, sizeof mreq);
534
535   if (interface_addr) {
536     mreq.imr_interface.s_addr = inet_addr(interface_addr);
537   } else {
538     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
539   }
540
541   mreq.imr_multiaddr.s_addr = inet_addr(multicast_addr);
542
543   switch (membership) {
544   case UV_JOIN_GROUP:
545     optname = IP_ADD_MEMBERSHIP;
546     break;
547   case UV_LEAVE_GROUP:
548     optname = IP_DROP_MEMBERSHIP;
549     break;
550   default:
551     return uv__set_artificial_error(handle->loop, UV_EINVAL);
552   }
553
554   if (setsockopt(handle->io_watcher.fd,
555                  IPPROTO_IP,
556                  optname,
557                  &mreq,
558                  sizeof(mreq))) {
559     return uv__set_sys_error(handle->loop, errno);
560   }
561
562   return 0;
563 }
564
565
566 static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) {
567 #if defined(__sun)
568   char arg = val;
569 #else
570   int arg = val;
571 #endif
572
573   if (val < 0 || val > 255)
574     return uv__set_sys_error(handle->loop, EINVAL);
575
576   if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, option, &arg, sizeof(arg)))
577     return uv__set_sys_error(handle->loop, errno);
578
579   return 0;
580 }
581
582
583 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
584   if (setsockopt(handle->io_watcher.fd,
585                  SOL_SOCKET,
586                  SO_BROADCAST,
587                  &on,
588                  sizeof(on))) {
589     return uv__set_sys_error(handle->loop, errno);
590   }
591
592   return 0;
593 }
594
595
596 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
597   if (ttl < 1 || ttl > 255)
598     return uv__set_sys_error(handle->loop, EINVAL);
599
600   if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl)))
601     return uv__set_sys_error(handle->loop, errno);
602
603   return 0;
604 }
605
606
607 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
608   return uv__setsockopt_maybe_char(handle, IP_MULTICAST_TTL, ttl);
609 }
610
611
612 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
613   return uv__setsockopt_maybe_char(handle, IP_MULTICAST_LOOP, on);
614 }
615
616
617 int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) {
618   socklen_t socklen;
619   int saved_errno;
620   int rv = 0;
621
622   /* Don't clobber errno. */
623   saved_errno = errno;
624
625   if (handle->io_watcher.fd == -1) {
626     uv__set_sys_error(handle->loop, EINVAL);
627     rv = -1;
628     goto out;
629   }
630
631   /* sizeof(socklen_t) != sizeof(int) on some systems. */
632   socklen = (socklen_t)*namelen;
633
634   if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
635     uv__set_sys_error(handle->loop, errno);
636     rv = -1;
637   } else {
638     *namelen = (int)socklen;
639   }
640
641 out:
642   errno = saved_errno;
643   return rv;
644 }
645
646
647 int uv__udp_send(uv_udp_send_t* req,
648                  uv_udp_t* handle,
649                  uv_buf_t bufs[],
650                  int bufcnt,
651                  struct sockaddr_in addr,
652                  uv_udp_send_cb send_cb) {
653   return uv__send(req,
654                   handle,
655                   bufs,
656                   bufcnt,
657                   (struct sockaddr*)&addr,
658                   sizeof addr,
659                   send_cb);
660 }
661
662
663 int uv__udp_send6(uv_udp_send_t* req,
664                   uv_udp_t* handle,
665                   uv_buf_t bufs[],
666                   int bufcnt,
667                   struct sockaddr_in6 addr,
668                   uv_udp_send_cb send_cb) {
669   return uv__send(req,
670                   handle,
671                   bufs,
672                   bufcnt,
673                   (struct sockaddr*)&addr,
674                   sizeof addr,
675                   send_cb);
676 }
677
678
679 int uv__udp_recv_start(uv_udp_t* handle,
680                        uv_alloc_cb alloc_cb,
681                        uv_udp_recv_cb recv_cb) {
682   if (alloc_cb == NULL || recv_cb == NULL) {
683     uv__set_artificial_error(handle->loop, UV_EINVAL);
684     return -1;
685   }
686
687   if (uv__io_active(&handle->io_watcher, UV__POLLIN)) {
688     uv__set_artificial_error(handle->loop, UV_EALREADY);
689     return -1;
690   }
691
692   if (uv__udp_maybe_deferred_bind(handle, AF_INET))
693     return -1;
694
695   handle->alloc_cb = alloc_cb;
696   handle->recv_cb = recv_cb;
697
698   uv__io_start(handle->loop, &handle->io_watcher, UV__POLLIN);
699   uv__handle_start(handle);
700
701   return 0;
702 }
703
704
705 int uv__udp_recv_stop(uv_udp_t* handle) {
706   uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN);
707
708   if (!uv__io_active(&handle->io_watcher, UV__POLLOUT))
709     uv__handle_stop(handle);
710
711   handle->alloc_cb = NULL;
712   handle->recv_cb = NULL;
713
714   return 0;
715 }