Imported Upstream version 3.25.0
[platform/upstream/cmake.git] / Utilities / cmlibuv / src / unix / udp.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42
43 union uv__sockaddr {
44   struct sockaddr_in6 in6;
45   struct sockaddr_in in;
46   struct sockaddr addr;
47 };
48
49 static void uv__udp_run_completed(uv_udp_t* handle);
50 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
51 static void uv__udp_recvmsg(uv_udp_t* handle);
52 static void uv__udp_sendmsg(uv_udp_t* handle);
53 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
54                                        int domain,
55                                        unsigned int flags);
56
57 #if HAVE_MMSG
58
59 #define UV__MMSG_MAXWIDTH 20
60
61 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
62 static void uv__udp_sendmmsg(uv_udp_t* handle);
63
64 static int uv__recvmmsg_avail;
65 static int uv__sendmmsg_avail;
66 static uv_once_t once = UV_ONCE_INIT;
67
68 static void uv__udp_mmsg_init(void) {
69   int ret;
70   int s;
71   s = uv__socket(AF_INET, SOCK_DGRAM, 0);
72   if (s < 0)
73     return;
74   ret = uv__sendmmsg(s, NULL, 0);
75   if (ret == 0 || errno != ENOSYS) {
76     uv__sendmmsg_avail = 1;
77     uv__recvmmsg_avail = 1;
78   } else {
79     ret = uv__recvmmsg(s, NULL, 0);
80     if (ret == 0 || errno != ENOSYS)
81       uv__recvmmsg_avail = 1;
82   }
83   uv__close(s);
84 }
85
86 #endif
87
88 void uv__udp_close(uv_udp_t* handle) {
89   uv__io_close(handle->loop, &handle->io_watcher);
90   uv__handle_stop(handle);
91
92   if (handle->io_watcher.fd != -1) {
93     uv__close(handle->io_watcher.fd);
94     handle->io_watcher.fd = -1;
95   }
96 }
97
98
99 void uv__udp_finish_close(uv_udp_t* handle) {
100   uv_udp_send_t* req;
101   QUEUE* q;
102
103   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
104   assert(handle->io_watcher.fd == -1);
105
106   while (!QUEUE_EMPTY(&handle->write_queue)) {
107     q = QUEUE_HEAD(&handle->write_queue);
108     QUEUE_REMOVE(q);
109
110     req = QUEUE_DATA(q, uv_udp_send_t, queue);
111     req->status = UV_ECANCELED;
112     QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
113   }
114
115   uv__udp_run_completed(handle);
116
117   assert(handle->send_queue_size == 0);
118   assert(handle->send_queue_count == 0);
119
120   /* Now tear down the handle. */
121   handle->recv_cb = NULL;
122   handle->alloc_cb = NULL;
123   /* but _do not_ touch close_cb */
124 }
125
126
127 static void uv__udp_run_completed(uv_udp_t* handle) {
128   uv_udp_send_t* req;
129   QUEUE* q;
130
131   assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
132   handle->flags |= UV_HANDLE_UDP_PROCESSING;
133
134   while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
135     q = QUEUE_HEAD(&handle->write_completed_queue);
136     QUEUE_REMOVE(q);
137
138     req = QUEUE_DATA(q, uv_udp_send_t, queue);
139     uv__req_unregister(handle->loop, req);
140
141     handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
142     handle->send_queue_count--;
143
144     if (req->bufs != req->bufsml)
145       uv__free(req->bufs);
146     req->bufs = NULL;
147
148     if (req->send_cb == NULL)
149       continue;
150
151     /* req->status >= 0 == bytes written
152      * req->status <  0 == errno
153      */
154     if (req->status >= 0)
155       req->send_cb(req, 0);
156     else
157       req->send_cb(req, req->status);
158   }
159
160   if (QUEUE_EMPTY(&handle->write_queue)) {
161     /* Pending queue and completion queue empty, stop watcher. */
162     uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
163     if (!uv__io_active(&handle->io_watcher, POLLIN))
164       uv__handle_stop(handle);
165   }
166
167   handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
168 }
169
170
171 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
172   uv_udp_t* handle;
173
174   handle = container_of(w, uv_udp_t, io_watcher);
175   assert(handle->type == UV_UDP);
176
177   if (revents & POLLIN)
178     uv__udp_recvmsg(handle);
179
180   if (revents & POLLOUT) {
181     uv__udp_sendmsg(handle);
182     uv__udp_run_completed(handle);
183   }
184 }
185
186 #if HAVE_MMSG
187 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
188   struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
189   struct iovec iov[UV__MMSG_MAXWIDTH];
190   struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
191   ssize_t nread;
192   uv_buf_t chunk_buf;
193   size_t chunks;
194   int flags;
195   size_t k;
196
197   /* prepare structures for recvmmsg */
198   chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
199   if (chunks > ARRAY_SIZE(iov))
200     chunks = ARRAY_SIZE(iov);
201   for (k = 0; k < chunks; ++k) {
202     iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
203     iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
204     memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
205     msgs[k].msg_hdr.msg_iov = iov + k;
206     msgs[k].msg_hdr.msg_iovlen = 1;
207     msgs[k].msg_hdr.msg_name = peers + k;
208     msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
209     msgs[k].msg_hdr.msg_control = NULL;
210     msgs[k].msg_hdr.msg_controllen = 0;
211     msgs[k].msg_hdr.msg_flags = 0;
212   }
213
214   do
215     nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks);
216   while (nread == -1 && errno == EINTR);
217
218   if (nread < 1) {
219     if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
220       handle->recv_cb(handle, 0, buf, NULL, 0);
221     else
222       handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
223   } else {
224     /* pass each chunk to the application */
225     for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
226       flags = UV_UDP_MMSG_CHUNK;
227       if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
228         flags |= UV_UDP_PARTIAL;
229
230       chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
231       handle->recv_cb(handle,
232                       msgs[k].msg_len,
233                       &chunk_buf,
234                       msgs[k].msg_hdr.msg_name,
235                       flags);
236     }
237
238     /* one last callback so the original buffer is freed */
239     if (handle->recv_cb != NULL)
240       handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
241   }
242   return nread;
243 }
244 #endif
245
246 static void uv__udp_recvmsg(uv_udp_t* handle) {
247   struct sockaddr_storage peer;
248   struct msghdr h;
249   ssize_t nread;
250   uv_buf_t buf;
251   int flags;
252   int count;
253
254   assert(handle->recv_cb != NULL);
255   assert(handle->alloc_cb != NULL);
256
257   /* Prevent loop starvation when the data comes in as fast as (or faster than)
258    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
259    */
260   count = 32;
261
262   do {
263     buf = uv_buf_init(NULL, 0);
264     handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
265     if (buf.base == NULL || buf.len == 0) {
266       handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
267       return;
268     }
269     assert(buf.base != NULL);
270
271 #if HAVE_MMSG
272     if (uv_udp_using_recvmmsg(handle)) {
273       nread = uv__udp_recvmmsg(handle, &buf);
274       if (nread > 0)
275         count -= nread;
276       continue;
277     }
278 #endif
279
280     memset(&h, 0, sizeof(h));
281     memset(&peer, 0, sizeof(peer));
282     h.msg_name = &peer;
283     h.msg_namelen = sizeof(peer);
284     h.msg_iov = (void*) &buf;
285     h.msg_iovlen = 1;
286
287     do {
288       nread = recvmsg(handle->io_watcher.fd, &h, 0);
289     }
290     while (nread == -1 && errno == EINTR);
291
292     if (nread == -1) {
293       if (errno == EAGAIN || errno == EWOULDBLOCK)
294         handle->recv_cb(handle, 0, &buf, NULL, 0);
295       else
296         handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
297     }
298     else {
299       flags = 0;
300       if (h.msg_flags & MSG_TRUNC)
301         flags |= UV_UDP_PARTIAL;
302
303       handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
304     }
305     count--;
306   }
307   /* recv_cb callback may decide to pause or close the handle */
308   while (nread != -1
309       && count > 0
310       && handle->io_watcher.fd != -1
311       && handle->recv_cb != NULL);
312 }
313
314 #if HAVE_MMSG
315 static void uv__udp_sendmmsg(uv_udp_t* handle) {
316   uv_udp_send_t* req;
317   struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
318   struct uv__mmsghdr *p;
319   QUEUE* q;
320   ssize_t npkts;
321   size_t pkts;
322   size_t i;
323
324   if (QUEUE_EMPTY(&handle->write_queue))
325     return;
326
327 write_queue_drain:
328   for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
329        pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
330        ++pkts, q = QUEUE_HEAD(q)) {
331     assert(q != NULL);
332     req = QUEUE_DATA(q, uv_udp_send_t, queue);
333     assert(req != NULL);
334
335     p = &h[pkts];
336     memset(p, 0, sizeof(*p));
337     if (req->addr.ss_family == AF_UNSPEC) {
338       p->msg_hdr.msg_name = NULL;
339       p->msg_hdr.msg_namelen = 0;
340     } else {
341       p->msg_hdr.msg_name = &req->addr;
342       if (req->addr.ss_family == AF_INET6)
343         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
344       else if (req->addr.ss_family == AF_INET)
345         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
346       else if (req->addr.ss_family == AF_UNIX)
347         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
348       else {
349         assert(0 && "unsupported address family");
350         abort();
351       }
352     }
353     h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
354     h[pkts].msg_hdr.msg_iovlen = req->nbufs;
355   }
356
357   do
358     npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts);
359   while (npkts == -1 && errno == EINTR);
360
361   if (npkts < 1) {
362     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
363       return;
364     for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
365          i < pkts && q != &handle->write_queue;
366          ++i, q = QUEUE_HEAD(&handle->write_queue)) {
367       assert(q != NULL);
368       req = QUEUE_DATA(q, uv_udp_send_t, queue);
369       assert(req != NULL);
370
371       req->status = UV__ERR(errno);
372       QUEUE_REMOVE(&req->queue);
373       QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
374     }
375     uv__io_feed(handle->loop, &handle->io_watcher);
376     return;
377   }
378
379   /* Safety: npkts known to be >0 below. Hence cast from ssize_t
380    * to size_t safe.
381    */
382   for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
383        i < (size_t)npkts && q != &handle->write_queue;
384        ++i, q = QUEUE_HEAD(&handle->write_queue)) {
385     assert(q != NULL);
386     req = QUEUE_DATA(q, uv_udp_send_t, queue);
387     assert(req != NULL);
388
389     req->status = req->bufs[0].len;
390
391     /* Sending a datagram is an atomic operation: either all data
392      * is written or nothing is (and EMSGSIZE is raised). That is
393      * why we don't handle partial writes. Just pop the request
394      * off the write queue and onto the completed queue, done.
395      */
396     QUEUE_REMOVE(&req->queue);
397     QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
398   }
399
400   /* couldn't batch everything, continue sending (jump to avoid stack growth) */
401   if (!QUEUE_EMPTY(&handle->write_queue))
402     goto write_queue_drain;
403   uv__io_feed(handle->loop, &handle->io_watcher);
404   return;
405 }
406 #endif
407
408 static void uv__udp_sendmsg(uv_udp_t* handle) {
409   uv_udp_send_t* req;
410   struct msghdr h;
411   QUEUE* q;
412   ssize_t size;
413
414 #if HAVE_MMSG
415   uv_once(&once, uv__udp_mmsg_init);
416   if (uv__sendmmsg_avail) {
417     uv__udp_sendmmsg(handle);
418     return;
419   }
420 #endif
421
422   while (!QUEUE_EMPTY(&handle->write_queue)) {
423     q = QUEUE_HEAD(&handle->write_queue);
424     assert(q != NULL);
425
426     req = QUEUE_DATA(q, uv_udp_send_t, queue);
427     assert(req != NULL);
428
429     memset(&h, 0, sizeof h);
430     if (req->addr.ss_family == AF_UNSPEC) {
431       h.msg_name = NULL;
432       h.msg_namelen = 0;
433     } else {
434       h.msg_name = &req->addr;
435       if (req->addr.ss_family == AF_INET6)
436         h.msg_namelen = sizeof(struct sockaddr_in6);
437       else if (req->addr.ss_family == AF_INET)
438         h.msg_namelen = sizeof(struct sockaddr_in);
439       else if (req->addr.ss_family == AF_UNIX)
440         h.msg_namelen = sizeof(struct sockaddr_un);
441       else {
442         assert(0 && "unsupported address family");
443         abort();
444       }
445     }
446     h.msg_iov = (struct iovec*) req->bufs;
447     h.msg_iovlen = req->nbufs;
448
449     do {
450       size = sendmsg(handle->io_watcher.fd, &h, 0);
451     } while (size == -1 && errno == EINTR);
452
453     if (size == -1) {
454       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
455         break;
456     }
457
458     req->status = (size == -1 ? UV__ERR(errno) : size);
459
460     /* Sending a datagram is an atomic operation: either all data
461      * is written or nothing is (and EMSGSIZE is raised). That is
462      * why we don't handle partial writes. Just pop the request
463      * off the write queue and onto the completed queue, done.
464      */
465     QUEUE_REMOVE(&req->queue);
466     QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
467     uv__io_feed(handle->loop, &handle->io_watcher);
468   }
469 }
470
471 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
472  * refinements for programs that use multicast.
473  *
474  * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
475  * are different from the BSDs: it _shares_ the port rather than steal it
476  * from the current listener.  While useful, it's not something we can emulate
477  * on other platforms so we don't enable it.
478  *
479  * zOS does not support getsockname with SO_REUSEPORT option when using
480  * AF_UNIX.
481  */
482 static int uv__set_reuse(int fd) {
483   int yes;
484   yes = 1;
485
486 #if defined(SO_REUSEPORT) && defined(__MVS__)
487   struct sockaddr_in sockfd;
488   unsigned int sockfd_len = sizeof(sockfd);
489   if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
490       return UV__ERR(errno);
491   if (sockfd.sin_family == AF_UNIX) {
492     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
493       return UV__ERR(errno);
494   } else {
495     if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
496        return UV__ERR(errno);
497   }
498 #elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__)
499   if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
500     return UV__ERR(errno);
501 #else
502   if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
503     return UV__ERR(errno);
504 #endif
505
506   return 0;
507 }
508
509 /*
510  * The Linux kernel suppresses some ICMP error messages by default for UDP
511  * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
512  * error reporting, hopefully resulting in faster failover to working name
513  * servers.
514  */
515 static int uv__set_recverr(int fd, sa_family_t ss_family) {
516 #if defined(__linux__)
517   int yes;
518
519   yes = 1;
520   if (ss_family == AF_INET) {
521     if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes)))
522       return UV__ERR(errno);
523   } else if (ss_family == AF_INET6) {
524     if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes)))
525        return UV__ERR(errno);
526   }
527 #endif
528   return 0;
529 }
530
531
532 int uv__udp_bind(uv_udp_t* handle,
533                  const struct sockaddr* addr,
534                  unsigned int addrlen,
535                  unsigned int flags) {
536   int err;
537   int yes;
538   int fd;
539
540   /* Check for bad flags. */
541   if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR))
542     return UV_EINVAL;
543
544   /* Cannot set IPv6-only mode on non-IPv6 socket. */
545   if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
546     return UV_EINVAL;
547
548   fd = handle->io_watcher.fd;
549   if (fd == -1) {
550     err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
551     if (err < 0)
552       return err;
553     fd = err;
554     handle->io_watcher.fd = fd;
555   }
556
557   if (flags & UV_UDP_LINUX_RECVERR) {
558     err = uv__set_recverr(fd, addr->sa_family);
559     if (err)
560       return err;
561   }
562
563   if (flags & UV_UDP_REUSEADDR) {
564     err = uv__set_reuse(fd);
565     if (err)
566       return err;
567   }
568
569   if (flags & UV_UDP_IPV6ONLY) {
570 #ifdef IPV6_V6ONLY
571     yes = 1;
572     if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
573       err = UV__ERR(errno);
574       return err;
575     }
576 #else
577     err = UV_ENOTSUP;
578     return err;
579 #endif
580   }
581
582   if (bind(fd, addr, addrlen)) {
583     err = UV__ERR(errno);
584     if (errno == EAFNOSUPPORT)
585       /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
586        * socket created with AF_INET to an AF_INET6 address or vice versa. */
587       err = UV_EINVAL;
588     return err;
589   }
590
591   if (addr->sa_family == AF_INET6)
592     handle->flags |= UV_HANDLE_IPV6;
593
594   handle->flags |= UV_HANDLE_BOUND;
595   return 0;
596 }
597
598
599 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
600                                        int domain,
601                                        unsigned int flags) {
602   union uv__sockaddr taddr;
603   socklen_t addrlen;
604
605   if (handle->io_watcher.fd != -1)
606     return 0;
607
608   switch (domain) {
609   case AF_INET:
610   {
611     struct sockaddr_in* addr = &taddr.in;
612     memset(addr, 0, sizeof *addr);
613     addr->sin_family = AF_INET;
614     addr->sin_addr.s_addr = INADDR_ANY;
615     addrlen = sizeof *addr;
616     break;
617   }
618   case AF_INET6:
619   {
620     struct sockaddr_in6* addr = &taddr.in6;
621     memset(addr, 0, sizeof *addr);
622     addr->sin6_family = AF_INET6;
623     addr->sin6_addr = in6addr_any;
624     addrlen = sizeof *addr;
625     break;
626   }
627   default:
628     assert(0 && "unsupported address family");
629     abort();
630   }
631
632   return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
633 }
634
635
636 int uv__udp_connect(uv_udp_t* handle,
637                     const struct sockaddr* addr,
638                     unsigned int addrlen) {
639   int err;
640
641   err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
642   if (err)
643     return err;
644
645   do {
646     errno = 0;
647     err = connect(handle->io_watcher.fd, addr, addrlen);
648   } while (err == -1 && errno == EINTR);
649
650   if (err)
651     return UV__ERR(errno);
652
653   handle->flags |= UV_HANDLE_UDP_CONNECTED;
654
655   return 0;
656 }
657
658 /* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
659  * Any of uv supported UNIXs kernel should be standardized, but the kernel
660  * implementation logic not same, let's use pseudocode to explain the udp
661  * disconnect behaviors:
662  *
663  * Predefined stubs for pseudocode:
664  *   1. sodisconnect: The function to perform the real udp disconnect
665  *   2. pru_connect: The function to perform the real udp connect
666  *   3. so: The kernel object match with socket fd
667  *   4. addr: The sockaddr parameter from user space
668  *
669  * BSDs:
670  *   if(sodisconnect(so) == 0) { // udp disconnect succeed
671  *     if (addr->sa_len != so->addr->sa_len) return EINVAL;
672  *     if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
673  *     pru_connect(so);
674  *   }
675  *   else return EISCONN;
676  *
677  * z/OS (same with Windows):
678  *   if(addr->sa_len < so->addr->sa_len) return EINVAL;
679  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
680  *
681  * AIX:
682  *   if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
683  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
684  *
685  * Linux,Others:
686  *   if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
687  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
688  */
689 int uv__udp_disconnect(uv_udp_t* handle) {
690     int r;
691 #if defined(__MVS__)
692     struct sockaddr_storage addr;
693 #else
694     struct sockaddr addr;
695 #endif
696
697     memset(&addr, 0, sizeof(addr));
698
699 #if defined(__MVS__)
700     addr.ss_family = AF_UNSPEC;
701 #else
702     addr.sa_family = AF_UNSPEC;
703 #endif
704
705     do {
706       errno = 0;
707 #ifdef __PASE__
708       /* On IBMi a connectionless transport socket can be disconnected by
709        * either setting the addr parameter to NULL or setting the
710        * addr_length parameter to zero, and issuing another connect().
711        * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
712        */
713       r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
714 #else
715       r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
716 #endif
717     } while (r == -1 && errno == EINTR);
718
719     if (r == -1) {
720 #if defined(BSD)  /* The macro BSD is from sys/param.h */
721       if (errno != EAFNOSUPPORT && errno != EINVAL)
722         return UV__ERR(errno);
723 #else
724       return UV__ERR(errno);
725 #endif
726     }
727
728     handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
729     return 0;
730 }
731
732 int uv__udp_send(uv_udp_send_t* req,
733                  uv_udp_t* handle,
734                  const uv_buf_t bufs[],
735                  unsigned int nbufs,
736                  const struct sockaddr* addr,
737                  unsigned int addrlen,
738                  uv_udp_send_cb send_cb) {
739   int err;
740   int empty_queue;
741
742   assert(nbufs > 0);
743
744   if (addr) {
745     err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
746     if (err)
747       return err;
748   }
749
750   /* It's legal for send_queue_count > 0 even when the write_queue is empty;
751    * it means there are error-state requests in the write_completed_queue that
752    * will touch up send_queue_size/count later.
753    */
754   empty_queue = (handle->send_queue_count == 0);
755
756   uv__req_init(handle->loop, req, UV_UDP_SEND);
757   assert(addrlen <= sizeof(req->addr));
758   if (addr == NULL)
759     req->addr.ss_family = AF_UNSPEC;
760   else
761     memcpy(&req->addr, addr, addrlen);
762   req->send_cb = send_cb;
763   req->handle = handle;
764   req->nbufs = nbufs;
765
766   req->bufs = req->bufsml;
767   if (nbufs > ARRAY_SIZE(req->bufsml))
768     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
769
770   if (req->bufs == NULL) {
771     uv__req_unregister(handle->loop, req);
772     return UV_ENOMEM;
773   }
774
775   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
776   handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
777   handle->send_queue_count++;
778   QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
779   uv__handle_start(handle);
780
781   if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
782     uv__udp_sendmsg(handle);
783
784     /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
785      * away. In such cases the `io_watcher` has to be queued for asynchronous
786      * write.
787      */
788     if (!QUEUE_EMPTY(&handle->write_queue))
789       uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
790   } else {
791     uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
792   }
793
794   return 0;
795 }
796
797
798 int uv__udp_try_send(uv_udp_t* handle,
799                      const uv_buf_t bufs[],
800                      unsigned int nbufs,
801                      const struct sockaddr* addr,
802                      unsigned int addrlen) {
803   int err;
804   struct msghdr h;
805   ssize_t size;
806
807   assert(nbufs > 0);
808
809   /* already sending a message */
810   if (handle->send_queue_count != 0)
811     return UV_EAGAIN;
812
813   if (addr) {
814     err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
815     if (err)
816       return err;
817   } else {
818     assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
819   }
820
821   memset(&h, 0, sizeof h);
822   h.msg_name = (struct sockaddr*) addr;
823   h.msg_namelen = addrlen;
824   h.msg_iov = (struct iovec*) bufs;
825   h.msg_iovlen = nbufs;
826
827   do {
828     size = sendmsg(handle->io_watcher.fd, &h, 0);
829   } while (size == -1 && errno == EINTR);
830
831   if (size == -1) {
832     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
833       return UV_EAGAIN;
834     else
835       return UV__ERR(errno);
836   }
837
838   return size;
839 }
840
841
842 static int uv__udp_set_membership4(uv_udp_t* handle,
843                                    const struct sockaddr_in* multicast_addr,
844                                    const char* interface_addr,
845                                    uv_membership membership) {
846   struct ip_mreq mreq;
847   int optname;
848   int err;
849
850   memset(&mreq, 0, sizeof mreq);
851
852   if (interface_addr) {
853     err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
854     if (err)
855       return err;
856   } else {
857     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
858   }
859
860   mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
861
862   switch (membership) {
863   case UV_JOIN_GROUP:
864     optname = IP_ADD_MEMBERSHIP;
865     break;
866   case UV_LEAVE_GROUP:
867     optname = IP_DROP_MEMBERSHIP;
868     break;
869   default:
870     return UV_EINVAL;
871   }
872
873   if (setsockopt(handle->io_watcher.fd,
874                  IPPROTO_IP,
875                  optname,
876                  &mreq,
877                  sizeof(mreq))) {
878 #if defined(__MVS__)
879   if (errno == ENXIO)
880     return UV_ENODEV;
881 #endif
882     return UV__ERR(errno);
883   }
884
885   return 0;
886 }
887
888
889 static int uv__udp_set_membership6(uv_udp_t* handle,
890                                    const struct sockaddr_in6* multicast_addr,
891                                    const char* interface_addr,
892                                    uv_membership membership) {
893   int optname;
894   struct ipv6_mreq mreq;
895   struct sockaddr_in6 addr6;
896
897   memset(&mreq, 0, sizeof mreq);
898
899   if (interface_addr) {
900     if (uv_ip6_addr(interface_addr, 0, &addr6))
901       return UV_EINVAL;
902     mreq.ipv6mr_interface = addr6.sin6_scope_id;
903   } else {
904     mreq.ipv6mr_interface = 0;
905   }
906
907   mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
908
909   switch (membership) {
910   case UV_JOIN_GROUP:
911     optname = IPV6_ADD_MEMBERSHIP;
912     break;
913   case UV_LEAVE_GROUP:
914     optname = IPV6_DROP_MEMBERSHIP;
915     break;
916   default:
917     return UV_EINVAL;
918   }
919
920   if (setsockopt(handle->io_watcher.fd,
921                  IPPROTO_IPV6,
922                  optname,
923                  &mreq,
924                  sizeof(mreq))) {
925 #if defined(__MVS__)
926   if (errno == ENXIO)
927     return UV_ENODEV;
928 #endif
929     return UV__ERR(errno);
930   }
931
932   return 0;
933 }
934
935
936 #if !defined(__OpenBSD__) &&                                        \
937     !defined(__NetBSD__) &&                                         \
938     !defined(__ANDROID__) &&                                        \
939     !defined(__DragonFly__) &&                                      \
940     !defined(__QNX__) &&                                            \
941     !defined(__GNU__)
942 static int uv__udp_set_source_membership4(uv_udp_t* handle,
943                                           const struct sockaddr_in* multicast_addr,
944                                           const char* interface_addr,
945                                           const struct sockaddr_in* source_addr,
946                                           uv_membership membership) {
947   struct ip_mreq_source mreq;
948   int optname;
949   int err;
950
951   err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
952   if (err)
953     return err;
954
955   memset(&mreq, 0, sizeof(mreq));
956
957   if (interface_addr != NULL) {
958     err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
959     if (err)
960       return err;
961   } else {
962     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
963   }
964
965   mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
966   mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
967
968   if (membership == UV_JOIN_GROUP)
969     optname = IP_ADD_SOURCE_MEMBERSHIP;
970   else if (membership == UV_LEAVE_GROUP)
971     optname = IP_DROP_SOURCE_MEMBERSHIP;
972   else
973     return UV_EINVAL;
974
975   if (setsockopt(handle->io_watcher.fd,
976                  IPPROTO_IP,
977                  optname,
978                  &mreq,
979                  sizeof(mreq))) {
980     return UV__ERR(errno);
981   }
982
983   return 0;
984 }
985
986
987 static int uv__udp_set_source_membership6(uv_udp_t* handle,
988                                           const struct sockaddr_in6* multicast_addr,
989                                           const char* interface_addr,
990                                           const struct sockaddr_in6* source_addr,
991                                           uv_membership membership) {
992   struct group_source_req mreq;
993   struct sockaddr_in6 addr6;
994   int optname;
995   int err;
996
997   err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
998   if (err)
999     return err;
1000
1001   memset(&mreq, 0, sizeof(mreq));
1002
1003   if (interface_addr != NULL) {
1004     err = uv_ip6_addr(interface_addr, 0, &addr6);
1005     if (err)
1006       return err;
1007     mreq.gsr_interface = addr6.sin6_scope_id;
1008   } else {
1009     mreq.gsr_interface = 0;
1010   }
1011
1012   STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
1013   STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
1014   memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
1015   memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));
1016
1017   if (membership == UV_JOIN_GROUP)
1018     optname = MCAST_JOIN_SOURCE_GROUP;
1019   else if (membership == UV_LEAVE_GROUP)
1020     optname = MCAST_LEAVE_SOURCE_GROUP;
1021   else
1022     return UV_EINVAL;
1023
1024   if (setsockopt(handle->io_watcher.fd,
1025                  IPPROTO_IPV6,
1026                  optname,
1027                  &mreq,
1028                  sizeof(mreq))) {
1029     return UV__ERR(errno);
1030   }
1031
1032   return 0;
1033 }
1034 #endif
1035
1036
1037 int uv__udp_init_ex(uv_loop_t* loop,
1038                     uv_udp_t* handle,
1039                     unsigned flags,
1040                     int domain) {
1041   int fd;
1042
1043   fd = -1;
1044   if (domain != AF_UNSPEC) {
1045     fd = uv__socket(domain, SOCK_DGRAM, 0);
1046     if (fd < 0)
1047       return fd;
1048   }
1049
1050   uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
1051   handle->alloc_cb = NULL;
1052   handle->recv_cb = NULL;
1053   handle->send_queue_size = 0;
1054   handle->send_queue_count = 0;
1055   uv__io_init(&handle->io_watcher, uv__udp_io, fd);
1056   QUEUE_INIT(&handle->write_queue);
1057   QUEUE_INIT(&handle->write_completed_queue);
1058
1059   return 0;
1060 }
1061
1062
1063 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
1064 #if HAVE_MMSG
1065   if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
1066     uv_once(&once, uv__udp_mmsg_init);
1067     return uv__recvmmsg_avail;
1068   }
1069 #endif
1070   return 0;
1071 }
1072
1073
1074 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
1075   int err;
1076
1077   /* Check for already active socket. */
1078   if (handle->io_watcher.fd != -1)
1079     return UV_EBUSY;
1080
1081   if (uv__fd_exists(handle->loop, sock))
1082     return UV_EEXIST;
1083
1084   err = uv__nonblock(sock, 1);
1085   if (err)
1086     return err;
1087
1088   err = uv__set_reuse(sock);
1089   if (err)
1090     return err;
1091
1092   handle->io_watcher.fd = sock;
1093   if (uv__udp_is_connected(handle))
1094     handle->flags |= UV_HANDLE_UDP_CONNECTED;
1095
1096   return 0;
1097 }
1098
1099
1100 int uv_udp_set_membership(uv_udp_t* handle,
1101                           const char* multicast_addr,
1102                           const char* interface_addr,
1103                           uv_membership membership) {
1104   int err;
1105   struct sockaddr_in addr4;
1106   struct sockaddr_in6 addr6;
1107
1108   if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
1109     err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
1110     if (err)
1111       return err;
1112     return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
1113   } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
1114     err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
1115     if (err)
1116       return err;
1117     return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
1118   } else {
1119     return UV_EINVAL;
1120   }
1121 }
1122
1123
1124 int uv_udp_set_source_membership(uv_udp_t* handle,
1125                                  const char* multicast_addr,
1126                                  const char* interface_addr,
1127                                  const char* source_addr,
1128                                  uv_membership membership) {
1129 #if !defined(__OpenBSD__) &&                                        \
1130     !defined(__NetBSD__) &&                                         \
1131     !defined(__ANDROID__) &&                                        \
1132     !defined(__DragonFly__) &&                                      \
1133     !defined(__QNX__) &&                                            \
1134     !defined(__GNU__)
1135   int err;
1136   union uv__sockaddr mcast_addr;
1137   union uv__sockaddr src_addr;
1138
1139   err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
1140   if (err) {
1141     err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
1142     if (err)
1143       return err;
1144     err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
1145     if (err)
1146       return err;
1147     return uv__udp_set_source_membership6(handle,
1148                                           &mcast_addr.in6,
1149                                           interface_addr,
1150                                           &src_addr.in6,
1151                                           membership);
1152   }
1153
1154   err = uv_ip4_addr(source_addr, 0, &src_addr.in);
1155   if (err)
1156     return err;
1157   return uv__udp_set_source_membership4(handle,
1158                                         &mcast_addr.in,
1159                                         interface_addr,
1160                                         &src_addr.in,
1161                                         membership);
1162 #else
1163   return UV_ENOSYS;
1164 #endif
1165 }
1166
1167
1168 static int uv__setsockopt(uv_udp_t* handle,
1169                          int option4,
1170                          int option6,
1171                          const void* val,
1172                          socklen_t size) {
1173   int r;
1174
1175   if (handle->flags & UV_HANDLE_IPV6)
1176     r = setsockopt(handle->io_watcher.fd,
1177                    IPPROTO_IPV6,
1178                    option6,
1179                    val,
1180                    size);
1181   else
1182     r = setsockopt(handle->io_watcher.fd,
1183                    IPPROTO_IP,
1184                    option4,
1185                    val,
1186                    size);
1187   if (r)
1188     return UV__ERR(errno);
1189
1190   return 0;
1191 }
1192
1193 static int uv__setsockopt_maybe_char(uv_udp_t* handle,
1194                                      int option4,
1195                                      int option6,
1196                                      int val) {
1197 #if defined(__sun) || defined(_AIX) || defined(__MVS__)
1198   char arg = val;
1199 #elif defined(__OpenBSD__)
1200   unsigned char arg = val;
1201 #else
1202   int arg = val;
1203 #endif
1204
1205   if (val < 0 || val > 255)
1206     return UV_EINVAL;
1207
1208   return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
1209 }
1210
1211
1212 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
1213   if (setsockopt(handle->io_watcher.fd,
1214                  SOL_SOCKET,
1215                  SO_BROADCAST,
1216                  &on,
1217                  sizeof(on))) {
1218     return UV__ERR(errno);
1219   }
1220
1221   return 0;
1222 }
1223
1224
1225 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
1226   if (ttl < 1 || ttl > 255)
1227     return UV_EINVAL;
1228
1229 #if defined(__MVS__)
1230   if (!(handle->flags & UV_HANDLE_IPV6))
1231     return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
1232 #endif
1233
1234 /*
1235  * On Solaris and derivatives such as SmartOS, the length of socket options
1236  * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
1237  * so hardcode the size of these options on this platform,
1238  * and use the general uv__setsockopt_maybe_char call on other platforms.
1239  */
1240 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1241     defined(__MVS__) || defined(__QNX__)
1242
1243   return uv__setsockopt(handle,
1244                         IP_TTL,
1245                         IPV6_UNICAST_HOPS,
1246                         &ttl,
1247                         sizeof(ttl));
1248
1249 #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1250            defined(__MVS__) || defined(__QNX__)) */
1251
1252   return uv__setsockopt_maybe_char(handle,
1253                                    IP_TTL,
1254                                    IPV6_UNICAST_HOPS,
1255                                    ttl);
1256
1257 #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1258           defined(__MVS__) || defined(__QNX__) */
1259 }
1260
1261
1262 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
1263 /*
1264  * On Solaris and derivatives such as SmartOS, the length of socket options
1265  * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
1266  * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
1267  * and use the general uv__setsockopt_maybe_char call otherwise.
1268  */
1269 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1270     defined(__MVS__) || defined(__QNX__)
1271   if (handle->flags & UV_HANDLE_IPV6)
1272     return uv__setsockopt(handle,
1273                           IP_MULTICAST_TTL,
1274                           IPV6_MULTICAST_HOPS,
1275                           &ttl,
1276                           sizeof(ttl));
1277 #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1278     defined(__MVS__) || defined(__QNX__) */
1279
1280   return uv__setsockopt_maybe_char(handle,
1281                                    IP_MULTICAST_TTL,
1282                                    IPV6_MULTICAST_HOPS,
1283                                    ttl);
1284 }
1285
1286
1287 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
1288 /*
1289  * On Solaris and derivatives such as SmartOS, the length of socket options
1290  * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
1291  * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
1292  * and use the general uv__setsockopt_maybe_char call otherwise.
1293  */
1294 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1295     defined(__MVS__) || defined(__QNX__)
1296   if (handle->flags & UV_HANDLE_IPV6)
1297     return uv__setsockopt(handle,
1298                           IP_MULTICAST_LOOP,
1299                           IPV6_MULTICAST_LOOP,
1300                           &on,
1301                           sizeof(on));
1302 #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
1303     defined(__MVS__) || defined(__QNX__) */
1304
1305   return uv__setsockopt_maybe_char(handle,
1306                                    IP_MULTICAST_LOOP,
1307                                    IPV6_MULTICAST_LOOP,
1308                                    on);
1309 }
1310
1311 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
1312   struct sockaddr_storage addr_st;
1313   struct sockaddr_in* addr4;
1314   struct sockaddr_in6* addr6;
1315
1316   addr4 = (struct sockaddr_in*) &addr_st;
1317   addr6 = (struct sockaddr_in6*) &addr_st;
1318
1319   if (!interface_addr) {
1320     memset(&addr_st, 0, sizeof addr_st);
1321     if (handle->flags & UV_HANDLE_IPV6) {
1322       addr_st.ss_family = AF_INET6;
1323       addr6->sin6_scope_id = 0;
1324     } else {
1325       addr_st.ss_family = AF_INET;
1326       addr4->sin_addr.s_addr = htonl(INADDR_ANY);
1327     }
1328   } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
1329     /* nothing, address was parsed */
1330   } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
1331     /* nothing, address was parsed */
1332   } else {
1333     return UV_EINVAL;
1334   }
1335
1336   if (addr_st.ss_family == AF_INET) {
1337     if (setsockopt(handle->io_watcher.fd,
1338                    IPPROTO_IP,
1339                    IP_MULTICAST_IF,
1340                    (void*) &addr4->sin_addr,
1341                    sizeof(addr4->sin_addr)) == -1) {
1342       return UV__ERR(errno);
1343     }
1344   } else if (addr_st.ss_family == AF_INET6) {
1345     if (setsockopt(handle->io_watcher.fd,
1346                    IPPROTO_IPV6,
1347                    IPV6_MULTICAST_IF,
1348                    &addr6->sin6_scope_id,
1349                    sizeof(addr6->sin6_scope_id)) == -1) {
1350       return UV__ERR(errno);
1351     }
1352   } else {
1353     assert(0 && "unexpected address family");
1354     abort();
1355   }
1356
1357   return 0;
1358 }
1359
1360 int uv_udp_getpeername(const uv_udp_t* handle,
1361                        struct sockaddr* name,
1362                        int* namelen) {
1363
1364   return uv__getsockpeername((const uv_handle_t*) handle,
1365                              getpeername,
1366                              name,
1367                              namelen);
1368 }
1369
1370 int uv_udp_getsockname(const uv_udp_t* handle,
1371                        struct sockaddr* name,
1372                        int* namelen) {
1373
1374   return uv__getsockpeername((const uv_handle_t*) handle,
1375                              getsockname,
1376                              name,
1377                              namelen);
1378 }
1379
1380
1381 int uv__udp_recv_start(uv_udp_t* handle,
1382                        uv_alloc_cb alloc_cb,
1383                        uv_udp_recv_cb recv_cb) {
1384   int err;
1385
1386   if (alloc_cb == NULL || recv_cb == NULL)
1387     return UV_EINVAL;
1388
1389   if (uv__io_active(&handle->io_watcher, POLLIN))
1390     return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
1391
1392   err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
1393   if (err)
1394     return err;
1395
1396   handle->alloc_cb = alloc_cb;
1397   handle->recv_cb = recv_cb;
1398
1399   uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
1400   uv__handle_start(handle);
1401
1402   return 0;
1403 }
1404
1405
1406 int uv__udp_recv_stop(uv_udp_t* handle) {
1407   uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
1408
1409   if (!uv__io_active(&handle->io_watcher, POLLOUT))
1410     uv__handle_stop(handle);
1411
1412   handle->alloc_cb = NULL;
1413   handle->recv_cb = NULL;
1414
1415   return 0;
1416 }