uv: Upgrade to v0.10.19
[platform/upstream/nodejs.git] / deps / uv / src / unix / udp.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31
32 static void uv__udp_run_completed(uv_udp_t* handle);
33 static void uv__udp_run_pending(uv_udp_t* handle);
34 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
35 static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
36 static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
37 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
38 static int uv__send(uv_udp_send_t* req,
39                     uv_udp_t* handle,
40                     uv_buf_t bufs[],
41                     int bufcnt,
42                     struct sockaddr* addr,
43                     socklen_t addrlen,
44                     uv_udp_send_cb send_cb);
45
46
47 void uv__udp_close(uv_udp_t* handle) {
48   uv__io_close(handle->loop, &handle->io_watcher);
49   uv__handle_stop(handle);
50   close(handle->io_watcher.fd);
51   handle->io_watcher.fd = -1;
52 }
53
54
55 void uv__udp_finish_close(uv_udp_t* handle) {
56   uv_udp_send_t* req;
57   ngx_queue_t* q;
58
59   assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
60   assert(handle->io_watcher.fd == -1);
61
62   uv__udp_run_completed(handle);
63
64   while (!ngx_queue_empty(&handle->write_queue)) {
65     q = ngx_queue_head(&handle->write_queue);
66     ngx_queue_remove(q);
67
68     req = ngx_queue_data(q, uv_udp_send_t, queue);
69     uv__req_unregister(handle->loop, req);
70
71     if (req->bufs != req->bufsml)
72       free(req->bufs);
73     req->bufs = NULL;
74
75     if (req->send_cb) {
76       uv__set_artificial_error(handle->loop, UV_ECANCELED);
77       req->send_cb(req, -1);
78     }
79   }
80
81   /* Now tear down the handle. */
82   handle->recv_cb = NULL;
83   handle->alloc_cb = NULL;
84   /* but _do not_ touch close_cb */
85 }
86
87
88 static void uv__udp_run_pending(uv_udp_t* handle) {
89   uv_udp_send_t* req;
90   ngx_queue_t* q;
91   struct msghdr h;
92   ssize_t size;
93
94   while (!ngx_queue_empty(&handle->write_queue)) {
95     q = ngx_queue_head(&handle->write_queue);
96     assert(q != NULL);
97
98     req = ngx_queue_data(q, uv_udp_send_t, queue);
99     assert(req != NULL);
100
101     memset(&h, 0, sizeof h);
102     h.msg_name = &req->addr;
103     h.msg_namelen = (req->addr.sin6_family == AF_INET6 ?
104       sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
105     h.msg_iov = (struct iovec*)req->bufs;
106     h.msg_iovlen = req->bufcnt;
107
108     do {
109       size = sendmsg(handle->io_watcher.fd, &h, 0);
110     }
111     while (size == -1 && errno == EINTR);
112
113     /* TODO try to write once or twice more in the
114      * hope that the socket becomes readable again?
115      */
116     if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
117       break;
118
119     req->status = (size == -1 ? -errno : size);
120
121 #ifndef NDEBUG
122     /* Sanity check. */
123     if (size != -1) {
124       ssize_t nbytes;
125       int i;
126
127       for (nbytes = i = 0; i < req->bufcnt; i++)
128         nbytes += req->bufs[i].len;
129
130       assert(size == nbytes);
131     }
132 #endif
133
134     /* Sending a datagram is an atomic operation: either all data
135      * is written or nothing is (and EMSGSIZE is raised). That is
136      * why we don't handle partial writes. Just pop the request
137      * off the write queue and onto the completed queue, done.
138      */
139     ngx_queue_remove(&req->queue);
140     ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
141   }
142 }
143
144
145 static void uv__udp_run_completed(uv_udp_t* handle) {
146   uv_udp_send_t* req;
147   ngx_queue_t* q;
148
149   while (!ngx_queue_empty(&handle->write_completed_queue)) {
150     q = ngx_queue_head(&handle->write_completed_queue);
151     ngx_queue_remove(q);
152
153     req = ngx_queue_data(q, uv_udp_send_t, queue);
154     uv__req_unregister(handle->loop, req);
155
156     if (req->bufs != req->bufsml)
157       free(req->bufs);
158     req->bufs = NULL;
159
160     if (req->send_cb == NULL)
161       continue;
162
163     /* req->status >= 0 == bytes written
164      * req->status <  0 == errno
165      */
166     if (req->status >= 0) {
167       req->send_cb(req, 0);
168     }
169     else {
170       uv__set_sys_error(handle->loop, -req->status);
171       req->send_cb(req, -1);
172     }
173   }
174 }
175
176
177 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
178   if (revents & UV__POLLIN)
179     uv__udp_recvmsg(loop, w, revents);
180
181   if (revents & UV__POLLOUT)
182     uv__udp_sendmsg(loop, w, revents);
183 }
184
185
186 static void uv__udp_recvmsg(uv_loop_t* loop,
187                             uv__io_t* w,
188                             unsigned int revents) {
189   struct sockaddr_storage peer;
190   struct msghdr h;
191   uv_udp_t* handle;
192   ssize_t nread;
193   uv_buf_t buf;
194   int flags;
195   int count;
196
197   handle = container_of(w, uv_udp_t, io_watcher);
198   assert(handle->type == UV_UDP);
199   assert(revents & UV__POLLIN);
200
201   assert(handle->recv_cb != NULL);
202   assert(handle->alloc_cb != NULL);
203
204   /* Prevent loop starvation when the data comes in as fast as (or faster than)
205    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
206    */
207   count = 32;
208
209   memset(&h, 0, sizeof(h));
210   h.msg_name = &peer;
211
212   do {
213     buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
214     assert(buf.len > 0);
215     assert(buf.base != NULL);
216
217     h.msg_namelen = sizeof(peer);
218     h.msg_iov = (void*) &buf;
219     h.msg_iovlen = 1;
220
221     do {
222       nread = recvmsg(handle->io_watcher.fd, &h, 0);
223     }
224     while (nread == -1 && errno == EINTR);
225
226     if (nread == -1) {
227       if (errno == EAGAIN || errno == EWOULDBLOCK) {
228         uv__set_sys_error(handle->loop, EAGAIN);
229         handle->recv_cb(handle, 0, buf, NULL, 0);
230       }
231       else {
232         uv__set_sys_error(handle->loop, errno);
233         handle->recv_cb(handle, -1, buf, NULL, 0);
234       }
235     }
236     else {
237       flags = 0;
238
239       if (h.msg_flags & MSG_TRUNC)
240         flags |= UV_UDP_PARTIAL;
241
242       handle->recv_cb(handle,
243                       nread,
244                       buf,
245                       (struct sockaddr*)&peer,
246                       flags);
247     }
248   }
249   /* recv_cb callback may decide to pause or close the handle */
250   while (nread != -1
251       && count-- > 0
252       && handle->io_watcher.fd != -1
253       && handle->recv_cb != NULL);
254 }
255
256
257 static void uv__udp_sendmsg(uv_loop_t* loop,
258                             uv__io_t* w,
259                             unsigned int revents) {
260   uv_udp_t* handle;
261
262   handle = container_of(w, uv_udp_t, io_watcher);
263   assert(handle->type == UV_UDP);
264   assert(revents & UV__POLLOUT);
265
266   assert(!ngx_queue_empty(&handle->write_queue)
267       || !ngx_queue_empty(&handle->write_completed_queue));
268
269   /* Write out pending data first. */
270   uv__udp_run_pending(handle);
271
272   /* Drain 'request completed' queue. */
273   uv__udp_run_completed(handle);
274
275   if (!ngx_queue_empty(&handle->write_completed_queue)) {
276     /* Schedule completion callbacks. */
277     uv__io_feed(handle->loop, &handle->io_watcher);
278   }
279   else if (ngx_queue_empty(&handle->write_queue)) {
280     /* Pending queue and completion queue empty, stop watcher. */
281     uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
282
283     if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
284       uv__handle_stop(handle);
285   }
286 }
287
288
289 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
290  * refinements for programs that use multicast.
291  *
292  * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
293  * are different from the BSDs: it _shares_ the port rather than steal it
294  * from the current listener.  While useful, it's not something we can emulate
295  * on other platforms so we don't enable it.
296  */
297 static int uv__set_reuse(int fd) {
298   int yes;
299
300 #if defined(SO_REUSEPORT) && !defined(__linux__)
301   yes = 1;
302   if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
303     return -errno;
304 #else
305   yes = 1;
306   if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
307     return -errno;
308 #endif
309
310   return 0;
311 }
312
313
314 static int uv__bind(uv_udp_t* handle,
315                     int domain,
316                     struct sockaddr* addr,
317                     socklen_t len,
318                     unsigned flags) {
319   int saved_errno;
320   int status;
321   int err;
322   int yes;
323   int fd;
324
325   saved_errno = errno;
326   status = -1;
327   fd = -1;
328
329   /* Check for bad flags. */
330   if (flags & ~UV_UDP_IPV6ONLY) {
331     uv__set_sys_error(handle->loop, EINVAL);
332     goto out;
333   }
334
335   /* Cannot set IPv6-only mode on non-IPv6 socket. */
336   if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
337     uv__set_sys_error(handle->loop, EINVAL);
338     goto out;
339   }
340
341   if (handle->io_watcher.fd == -1) {
342     if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
343       uv__set_sys_error(handle->loop, errno);
344       goto out;
345     }
346     handle->io_watcher.fd = fd;
347   }
348
349   fd = handle->io_watcher.fd;
350   err = uv__set_reuse(fd);
351   if (err) {
352     uv__set_sys_error(handle->loop, -err);
353     goto out;
354   }
355
356   if (flags & UV_UDP_IPV6ONLY) {
357 #ifdef IPV6_V6ONLY
358     yes = 1;
359     if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
360       uv__set_sys_error(handle->loop, errno);
361       goto out;
362     }
363 #else
364     uv__set_sys_error(handle->loop, ENOTSUP);
365     goto out;
366 #endif
367   }
368
369   if (bind(fd, addr, len) == -1) {
370     uv__set_sys_error(handle->loop, errno);
371     goto out;
372   }
373
374   handle->io_watcher.fd = fd;
375   status = 0;
376
377 out:
378   if (status) {
379     close(handle->io_watcher.fd);
380     handle->io_watcher.fd = -1;
381   }
382
383   errno = saved_errno;
384   return status;
385 }
386
387
388 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
389   unsigned char taddr[sizeof(struct sockaddr_in6)];
390   socklen_t addrlen;
391
392   assert(domain == AF_INET || domain == AF_INET6);
393
394   if (handle->io_watcher.fd != -1)
395     return 0;
396
397   switch (domain) {
398   case AF_INET:
399   {
400     struct sockaddr_in* addr = (void*)&taddr;
401     memset(addr, 0, sizeof *addr);
402     addr->sin_family = AF_INET;
403     addr->sin_addr.s_addr = INADDR_ANY;
404     addrlen = sizeof *addr;
405     break;
406   }
407   case AF_INET6:
408   {
409     struct sockaddr_in6* addr = (void*)&taddr;
410     memset(addr, 0, sizeof *addr);
411     addr->sin6_family = AF_INET6;
412     addr->sin6_addr = in6addr_any;
413     addrlen = sizeof *addr;
414     break;
415   }
416   default:
417     assert(0 && "unsupported address family");
418     abort();
419   }
420
421   return uv__bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
422 }
423
424
425 static int uv__send(uv_udp_send_t* req,
426                     uv_udp_t* handle,
427                     uv_buf_t bufs[],
428                     int bufcnt,
429                     struct sockaddr* addr,
430                     socklen_t addrlen,
431                     uv_udp_send_cb send_cb) {
432   assert(bufcnt > 0);
433
434   if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
435     return -1;
436
437   uv__req_init(handle->loop, req, UV_UDP_SEND);
438
439   assert(addrlen <= sizeof(req->addr));
440   memcpy(&req->addr, addr, addrlen);
441   req->send_cb = send_cb;
442   req->handle = handle;
443   req->bufcnt = bufcnt;
444
445   if (bufcnt <= (int) ARRAY_SIZE(req->bufsml)) {
446     req->bufs = req->bufsml;
447   }
448   else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
449     uv__set_sys_error(handle->loop, ENOMEM);
450     return -1;
451   }
452
453   memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
454   ngx_queue_insert_tail(&handle->write_queue, &req->queue);
455   uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
456   uv__handle_start(handle);
457
458   return 0;
459 }
460
461
462 int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
463   uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
464   handle->alloc_cb = NULL;
465   handle->recv_cb = NULL;
466   uv__io_init(&handle->io_watcher, uv__udp_io, -1);
467   ngx_queue_init(&handle->write_queue);
468   ngx_queue_init(&handle->write_completed_queue);
469   return 0;
470 }
471
472
473 int uv__udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
474   return uv__bind(handle,
475                   AF_INET,
476                   (struct sockaddr*)&addr,
477                   sizeof addr,
478                   flags);
479 }
480
481
482 int uv__udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
483   return uv__bind(handle,
484                   AF_INET6,
485                   (struct sockaddr*)&addr,
486                   sizeof addr,
487                   flags);
488 }
489
490
491 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
492   int saved_errno;
493   int status;
494   int yes;
495
496   saved_errno = errno;
497   status = -1;
498
499   /* Check for already active socket. */
500   if (handle->io_watcher.fd != -1) {
501     uv__set_artificial_error(handle->loop, UV_EALREADY);
502     goto out;
503   }
504
505   yes = 1;
506   if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
507     uv__set_sys_error(handle->loop, errno);
508     goto out;
509   }
510
511   /* On the BSDs, SO_REUSEADDR lets you reuse an address that's in the TIME_WAIT
512    * state (i.e. was until recently tied to a socket) while SO_REUSEPORT lets
513    * multiple processes bind to the same address. Yes, it's something of a
514    * misnomer but then again, SO_REUSEADDR was already taken.
515    *
516    * None of the above applies to Linux: SO_REUSEADDR implies SO_REUSEPORT on
517    * Linux and hence it does not have SO_REUSEPORT at all.
518    */
519 #ifdef SO_REUSEPORT
520   yes = 1;
521   if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof yes) == -1) {
522     uv__set_sys_error(handle->loop, errno);
523     goto out;
524   }
525 #endif
526
527   handle->io_watcher.fd = sock;
528   status = 0;
529
530 out:
531   errno = saved_errno;
532   return status;
533 }
534
535
536 int uv_udp_set_membership(uv_udp_t* handle,
537                           const char* multicast_addr,
538                           const char* interface_addr,
539                           uv_membership membership) {
540   struct ip_mreq mreq;
541   int optname;
542
543   memset(&mreq, 0, sizeof mreq);
544
545   if (interface_addr) {
546     mreq.imr_interface.s_addr = inet_addr(interface_addr);
547   } else {
548     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
549   }
550
551   mreq.imr_multiaddr.s_addr = inet_addr(multicast_addr);
552
553   switch (membership) {
554   case UV_JOIN_GROUP:
555     optname = IP_ADD_MEMBERSHIP;
556     break;
557   case UV_LEAVE_GROUP:
558     optname = IP_DROP_MEMBERSHIP;
559     break;
560   default:
561     return uv__set_artificial_error(handle->loop, UV_EINVAL);
562   }
563
564   if (setsockopt(handle->io_watcher.fd,
565                  IPPROTO_IP,
566                  optname,
567                  &mreq,
568                  sizeof(mreq))) {
569     return uv__set_sys_error(handle->loop, errno);
570   }
571
572   return 0;
573 }
574
575
576 static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) {
577 #if defined(__sun)
578   char arg = val;
579 #else
580   int arg = val;
581 #endif
582
583   if (val < 0 || val > 255)
584     return uv__set_sys_error(handle->loop, EINVAL);
585
586   if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, option, &arg, sizeof(arg)))
587     return uv__set_sys_error(handle->loop, errno);
588
589   return 0;
590 }
591
592
593 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
594   if (setsockopt(handle->io_watcher.fd,
595                  SOL_SOCKET,
596                  SO_BROADCAST,
597                  &on,
598                  sizeof(on))) {
599     return uv__set_sys_error(handle->loop, errno);
600   }
601
602   return 0;
603 }
604
605
606 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
607   if (ttl < 1 || ttl > 255)
608     return uv__set_sys_error(handle->loop, EINVAL);
609
610   if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl)))
611     return uv__set_sys_error(handle->loop, errno);
612
613   return 0;
614 }
615
616
617 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
618   return uv__setsockopt_maybe_char(handle, IP_MULTICAST_TTL, ttl);
619 }
620
621
622 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
623   return uv__setsockopt_maybe_char(handle, IP_MULTICAST_LOOP, on);
624 }
625
626
627 int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) {
628   socklen_t socklen;
629   int saved_errno;
630   int rv = 0;
631
632   /* Don't clobber errno. */
633   saved_errno = errno;
634
635   if (handle->io_watcher.fd == -1) {
636     uv__set_sys_error(handle->loop, EINVAL);
637     rv = -1;
638     goto out;
639   }
640
641   /* sizeof(socklen_t) != sizeof(int) on some systems. */
642   socklen = (socklen_t)*namelen;
643
644   if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
645     uv__set_sys_error(handle->loop, errno);
646     rv = -1;
647   } else {
648     *namelen = (int)socklen;
649   }
650
651 out:
652   errno = saved_errno;
653   return rv;
654 }
655
656
657 int uv__udp_send(uv_udp_send_t* req,
658                  uv_udp_t* handle,
659                  uv_buf_t bufs[],
660                  int bufcnt,
661                  struct sockaddr_in addr,
662                  uv_udp_send_cb send_cb) {
663   return uv__send(req,
664                   handle,
665                   bufs,
666                   bufcnt,
667                   (struct sockaddr*)&addr,
668                   sizeof addr,
669                   send_cb);
670 }
671
672
673 int uv__udp_send6(uv_udp_send_t* req,
674                   uv_udp_t* handle,
675                   uv_buf_t bufs[],
676                   int bufcnt,
677                   struct sockaddr_in6 addr,
678                   uv_udp_send_cb send_cb) {
679   return uv__send(req,
680                   handle,
681                   bufs,
682                   bufcnt,
683                   (struct sockaddr*)&addr,
684                   sizeof addr,
685                   send_cb);
686 }
687
688
689 int uv__udp_recv_start(uv_udp_t* handle,
690                        uv_alloc_cb alloc_cb,
691                        uv_udp_recv_cb recv_cb) {
692   if (alloc_cb == NULL || recv_cb == NULL) {
693     uv__set_artificial_error(handle->loop, UV_EINVAL);
694     return -1;
695   }
696
697   if (uv__io_active(&handle->io_watcher, UV__POLLIN)) {
698     uv__set_artificial_error(handle->loop, UV_EALREADY);
699     return -1;
700   }
701
702   if (uv__udp_maybe_deferred_bind(handle, AF_INET))
703     return -1;
704
705   handle->alloc_cb = alloc_cb;
706   handle->recv_cb = recv_cb;
707
708   uv__io_start(handle->loop, &handle->io_watcher, UV__POLLIN);
709   uv__handle_start(handle);
710
711   return 0;
712 }
713
714
715 int uv__udp_recv_stop(uv_udp_t* handle) {
716   uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN);
717
718   if (!uv__io_active(&handle->io_watcher, UV__POLLOUT))
719     uv__handle_stop(handle);
720
721   handle->alloc_cb = NULL;
722   handle->recv_cb = NULL;
723
724   return 0;
725 }