1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
/* Map the RFC 3493 names IPV6_JOIN_GROUP/IPV6_LEAVE_GROUP onto the older
 * IPV6_ADD_MEMBERSHIP/IPV6_DROP_MEMBERSHIP names used below, on platforms
 * that only define the former.
 * NOTE(review): this listing is fragmentary (the embedded original line
 * numbers have gaps); the matching #endif lines were elided. */
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
/* Presumably members of union uv__sockaddr (opening/closing lines elided) —
 * used later via taddr.in / taddr.in6 / taddr.addr; TODO confirm. */
44 struct sockaddr_in6 in6;
45 struct sockaddr_in in;
/* Forward declarations for the static helpers defined later in this file. */
49 static void uv__udp_run_completed(uv_udp_t* handle);
50 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
51 static void uv__udp_recvmsg(uv_udp_t* handle);
52 static void uv__udp_sendmsg(uv_udp_t* handle);
53 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
/* Maximum number of datagrams handled per recvmmsg()/sendmmsg() batch. */
59 #define UV__MMSG_MAXWIDTH 20
61 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
62 static void uv__udp_sendmmsg(uv_udp_t* handle);
/* Runtime-probed availability flags for recvmmsg/sendmmsg; written once
 * from uv__udp_mmsg_init() under the `once` guard. */
64 static int uv__recvmmsg_avail;
65 static int uv__sendmmsg_avail;
66 static uv_once_t once = UV_ONCE_INIT;
/* One-shot probe (run under uv_once) for kernel sendmmsg/recvmmsg support:
 * issue zero-length calls on a throwaway AF_INET datagram socket and treat
 * any outcome other than ENOSYS as "available".
 * NOTE(review): socket close/error-handling lines are elided in this
 * listing. */
68 static void uv__udp_mmsg_init(void) {
71 s = uv__socket(AF_INET, SOCK_DGRAM, 0);
74 ret = uv__sendmmsg(s, NULL, 0);
75 if (ret == 0 || errno != ENOSYS) {
/* sendmmsg working implies recvmmsg works too (same kernel feature set —
 * presumably; TODO confirm against upstream libuv). */
76 uv__sendmmsg_avail = 1;
77 uv__recvmmsg_avail = 1;
79 ret = uv__recvmmsg(s, NULL, 0);
80 if (ret == 0 || errno != ENOSYS)
81 uv__recvmmsg_avail = 1;
/* Close path: stop I/O watching on the handle, deactivate it, and close the
 * underlying fd if one is open (marking it -1 so later code sees "closed"). */
88 void uv__udp_close(uv_udp_t* handle) {
89 uv__io_close(handle->loop, &handle->io_watcher);
90 uv__handle_stop(handle);
92 if (handle->io_watcher.fd != -1) {
93 uv__close(handle->io_watcher.fd);
94 handle->io_watcher.fd = -1;
/* Final teardown after close: cancel every queued send request with
 * UV_ECANCELED, flush their callbacks via uv__udp_run_completed(), and
 * clear the receive callbacks. Asserts both that the watcher is already
 * inactive/closed and that the send accounting drained to zero. */
99 void uv__udp_finish_close(uv_udp_t* handle) {
103 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
104 assert(handle->io_watcher.fd == -1);
/* Move every pending request from write_queue to write_completed_queue
 * with a cancellation status; callbacks fire in run_completed below. */
106 while (!QUEUE_EMPTY(&handle->write_queue)) {
107 q = QUEUE_HEAD(&handle->write_queue);
110 req = QUEUE_DATA(q, uv_udp_send_t, queue);
111 req->status = UV_ECANCELED;
112 QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
115 uv__udp_run_completed(handle);
117 assert(handle->send_queue_size == 0);
118 assert(handle->send_queue_count == 0);
120 /* Now tear down the handle. */
121 handle->recv_cb = NULL;
122 handle->alloc_cb = NULL;
123 /* but _do not_ touch close_cb */
/* Drain write_completed_queue: for each finished (or cancelled) send request,
 * update the queued-bytes/queued-count accounting, free any heap-allocated
 * iovec array, and invoke the user's send callback. The PROCESSING flag
 * guards against re-entrancy (uv__udp_send checks it before sending inline).
 * When both queues end up empty the POLLOUT watcher is stopped. */
127 static void uv__udp_run_completed(uv_udp_t* handle) {
131 assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
132 handle->flags |= UV_HANDLE_UDP_PROCESSING;
134 while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
135 q = QUEUE_HEAD(&handle->write_completed_queue);
138 req = QUEUE_DATA(q, uv_udp_send_t, queue);
139 uv__req_unregister(handle->loop, req);
/* Undo the accounting done when the request was queued in uv__udp_send. */
141 handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
142 handle->send_queue_count--;
/* bufs was heap-allocated only when it didn't fit in the inline bufsml
 * array (the uv__free call itself is among the elided lines). */
144 if (req->bufs != req->bufsml)
148 if (req->send_cb == NULL)
151 /* req->status >= 0 == bytes written
152 * req->status < 0 == errno
154 if (req->status >= 0)
155 req->send_cb(req, 0);
157 req->send_cb(req, req->status);
160 if (QUEUE_EMPTY(&handle->write_queue)) {
161 /* Pending queue and completion queue empty, stop watcher. */
162 uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
163 if (!uv__io_active(&handle->io_watcher, POLLIN))
164 uv__handle_stop(handle);
167 handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
/* Event-loop I/O callback for UDP handles: dispatch POLLIN to the receive
 * path and POLLOUT to the send path, flushing completed sends afterwards. */
171 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
174 handle = container_of(w, uv_udp_t, io_watcher);
175 assert(handle->type == UV_UDP);
177 if (revents & POLLIN)
178 uv__udp_recvmsg(handle);
180 if (revents & POLLOUT) {
181 uv__udp_sendmsg(handle);
182 uv__udp_run_completed(handle);
/* Batched receive via recvmmsg(): slice the user-supplied buffer into
 * UV__UDP_DGRAM_MAXSIZE chunks (up to UV__MMSG_MAXWIDTH), receive into all
 * of them with one syscall, then deliver each datagram to recv_cb with
 * UV_UDP_MMSG_CHUNK set, followed by one final zero-length callback with
 * UV_UDP_MMSG_FREE so the caller can release the original buffer.
 * sockaddr_in6 is used as peer storage since it is large enough for both
 * IPv4 and IPv6 peers. */
187 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
188 struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
189 struct iovec iov[UV__MMSG_MAXWIDTH];
190 struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
197 /* prepare structures for recvmmsg */
198 chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
199 if (chunks > ARRAY_SIZE(iov))
200 chunks = ARRAY_SIZE(iov);
201 for (k = 0; k < chunks; ++k) {
202 iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
203 iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
204 memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
205 msgs[k].msg_hdr.msg_iov = iov + k;
206 msgs[k].msg_hdr.msg_iovlen = 1;
207 msgs[k].msg_hdr.msg_name = peers + k;
208 msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
209 msgs[k].msg_hdr.msg_control = NULL;
210 msgs[k].msg_hdr.msg_controllen = 0;
211 msgs[k].msg_hdr.msg_flags = 0;
/* Retry the syscall if interrupted by a signal (the do-line of this
 * do/while is among the elided lines). */
215 nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks);
216 while (nread == -1 && errno == EINTR);
/* Error path: EAGAIN/EWOULDBLOCK (or 0 datagrams) is "nothing to read",
 * reported as nread 0; anything else is a real error. */
219 if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
220 handle->recv_cb(handle, 0, buf, NULL, 0);
222 handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
224 /* pass each chunk to the application */
/* recv_cb may call uv_udp_recv_stop(), which NULLs recv_cb — hence the
 * recv_cb != NULL re-check each iteration. */
225 for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
226 flags = UV_UDP_MMSG_CHUNK;
227 if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
228 flags |= UV_UDP_PARTIAL;
230 chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
231 handle->recv_cb(handle,
234 msgs[k].msg_hdr.msg_name,
238 /* one last callback so the original buffer is freed */
239 if (handle->recv_cb != NULL)
240 handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
/* Receive loop driven by POLLIN: ask the user for a buffer via alloc_cb,
 * then read one datagram with recvmsg() (or a batch via uv__udp_recvmmsg()
 * when enabled) and hand it to recv_cb with the peer address. Loops while
 * data keeps arriving, bounded to avoid starving the rest of the loop
 * (the loop header/count lines are elided in this listing). */
246 static void uv__udp_recvmsg(uv_udp_t* handle) {
247 struct sockaddr_storage peer;
254 assert(handle->recv_cb != NULL);
255 assert(handle->alloc_cb != NULL);
257 /* Prevent loop starvation when the data comes in as fast as (or faster than)
258 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
263 buf = uv_buf_init(NULL, 0);
264 handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
/* The user declined to provide a buffer: report ENOBUFS and stop. */
265 if (buf.base == NULL || buf.len == 0) {
266 handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
269 assert(buf.base != NULL);
/* Batched path when recvmmsg is both requested and available. */
272 if (uv_udp_using_recvmmsg(handle)) {
273 nread = uv__udp_recvmmsg(handle, &buf);
280 memset(&h, 0, sizeof(h));
281 memset(&peer, 0, sizeof(peer));
283 h.msg_namelen = sizeof(peer);
284 h.msg_iov = (void*) &buf;
/* Retry on EINTR (the do-line of this do/while is elided). */
288 nread = recvmsg(handle->io_watcher.fd, &h, 0);
290 while (nread == -1 && errno == EINTR);
293 if (errno == EAGAIN || errno == EWOULDBLOCK)
294 handle->recv_cb(handle, 0, &buf, NULL, 0);
296 handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
/* MSG_TRUNC means the datagram didn't fit the buffer: flag partial read. */
300 if (h.msg_flags & MSG_TRUNC)
301 flags |= UV_UDP_PARTIAL;
303 handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
307 /* recv_cb callback may decide to pause or close the handle */
310 && handle->io_watcher.fd != -1
311 && handle->recv_cb != NULL);
/* Batched send via sendmmsg(): pack up to UV__MMSG_MAXWIDTH queued requests
 * into one syscall. On EAGAIN-style errors it simply returns and waits for
 * POLLOUT; on hard errors it fails all packed requests with that errno; on
 * success it moves the first npkts requests to the completed queue and, if
 * the write queue is still non-empty, jumps back (write_queue_drain label
 * is among the elided lines) to send the next batch. */
315 static void uv__udp_sendmmsg(uv_udp_t* handle) {
317 struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
318 struct uv__mmsghdr *p;
324 if (QUEUE_EMPTY(&handle->write_queue))
/* Walk the write queue and fill one mmsghdr per request. Note this loop
 * advances with QUEUE_HEAD(q) (next element), unlike the drain loops
 * below which re-read the queue head after each removal. */
328 for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
329 pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
330 ++pkts, q = QUEUE_HEAD(q)) {
332 req = QUEUE_DATA(q, uv_udp_send_t, queue);
336 memset(p, 0, sizeof(*p));
/* AF_UNSPEC marks a connected-socket send: no destination address. */
337 if (req->addr.ss_family == AF_UNSPEC) {
338 p->msg_hdr.msg_name = NULL;
339 p->msg_hdr.msg_namelen = 0;
341 p->msg_hdr.msg_name = &req->addr;
342 if (req->addr.ss_family == AF_INET6)
343 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
344 else if (req->addr.ss_family == AF_INET)
345 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
346 else if (req->addr.ss_family == AF_UNIX)
347 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
349 assert(0 && "unsupported address family");
353 h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
354 h[pkts].msg_hdr.msg_iovlen = req->nbufs;
/* Retry on EINTR (do-line elided). */
358 npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts);
359 while (npkts == -1 && errno == EINTR);
/* Transient "try again later" errors: return and wait for POLLOUT. */
362 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
/* Hard error: fail every request we packed with the same errno. */
364 for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
365 i < pkts && q != &handle->write_queue;
366 ++i, q = QUEUE_HEAD(&handle->write_queue)) {
368 req = QUEUE_DATA(q, uv_udp_send_t, queue);
371 req->status = UV__ERR(errno);
372 QUEUE_REMOVE(&req->queue);
373 QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
375 uv__io_feed(handle->loop, &handle->io_watcher);
379 /* Safety: npkts known to be >0 below. Hence cast from ssize_t
382 for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
383 i < (size_t)npkts && q != &handle->write_queue;
384 ++i, q = QUEUE_HEAD(&handle->write_queue)) {
386 req = QUEUE_DATA(q, uv_udp_send_t, queue);
/* NOTE(review): status is set from bufs[0].len only — presumably the
 * accepted convention for mmsg sends; TODO confirm against upstream. */
389 req->status = req->bufs[0].len;
391 /* Sending a datagram is an atomic operation: either all data
392 * is written or nothing is (and EMSGSIZE is raised). That is
393 * why we don't handle partial writes. Just pop the request
394 * off the write queue and onto the completed queue, done.
396 QUEUE_REMOVE(&req->queue);
397 QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
400 /* couldn't batch everything, continue sending (jump to avoid stack growth) */
401 if (!QUEUE_EMPTY(&handle->write_queue))
402 goto write_queue_drain;
403 uv__io_feed(handle->loop, &handle->io_watcher);
/* Send path: delegate to the batched sendmmsg variant when the kernel
 * supports it (probed once via uv__udp_mmsg_init), otherwise send queued
 * requests one at a time with sendmsg(). Completed/failed requests are
 * moved to write_completed_queue and their callbacks scheduled via
 * uv__io_feed. */
408 static void uv__udp_sendmsg(uv_udp_t* handle) {
415 uv_once(&once, uv__udp_mmsg_init);
416 if (uv__sendmmsg_avail) {
417 uv__udp_sendmmsg(handle);
422 while (!QUEUE_EMPTY(&handle->write_queue)) {
423 q = QUEUE_HEAD(&handle->write_queue);
426 req = QUEUE_DATA(q, uv_udp_send_t, queue);
429 memset(&h, 0, sizeof h);
/* AF_UNSPEC marks a connected-socket send: leave msg_name NULL/0. */
430 if (req->addr.ss_family == AF_UNSPEC) {
434 h.msg_name = &req->addr;
435 if (req->addr.ss_family == AF_INET6)
436 h.msg_namelen = sizeof(struct sockaddr_in6);
437 else if (req->addr.ss_family == AF_INET)
438 h.msg_namelen = sizeof(struct sockaddr_in);
439 else if (req->addr.ss_family == AF_UNIX)
440 h.msg_namelen = sizeof(struct sockaddr_un);
442 assert(0 && "unsupported address family");
446 h.msg_iov = (struct iovec*) req->bufs;
447 h.msg_iovlen = req->nbufs;
/* Retry on EINTR (do-line elided). */
450 size = sendmsg(handle->io_watcher.fd, &h, 0);
451 } while (size == -1 && errno == EINTR);
/* Transient errors: keep the request queued, wait for POLLOUT. */
454 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
458 req->status = (size == -1 ? UV__ERR(errno) : size);
460 /* Sending a datagram is an atomic operation: either all data
461 * is written or nothing is (and EMSGSIZE is raised). That is
462 * why we don't handle partial writes. Just pop the request
463 * off the write queue and onto the completed queue, done.
465 QUEUE_REMOVE(&req->queue);
466 QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
467 uv__io_feed(handle->loop, &handle->io_watcher);
471 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
472 * refinements for programs that use multicast.
474 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
475 * are different from the BSDs: it _shares_ the port rather than steal it
476 * from the current listener. While useful, it's not something we can emulate
477 * on other platforms so we don't enable it.
479 * zOS does not support getsockname with SO_REUSEPORT option when using
/* Enable address reuse on fd, choosing SO_REUSEPORT vs SO_REUSEADDR per
 * platform as described above. Returns 0 or a negative UV error. */
482 static int uv__set_reuse(int fd) {
486 #if defined(SO_REUSEPORT) && defined(__MVS__)
/* z/OS: AF_UNIX sockets must use SO_REUSEADDR (SO_REUSEPORT breaks
 * getsockname there), so inspect the socket's family first. */
487 struct sockaddr_in sockfd;
488 unsigned int sockfd_len = sizeof(sockfd);
489 if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
490 return UV__ERR(errno);
491 if (sockfd.sin_family == AF_UNIX) {
492 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
493 return UV__ERR(errno);
495 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
496 return UV__ERR(errno);
498 #elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__)
499 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
500 return UV__ERR(errno);
/* Fallback (including Linux): plain SO_REUSEADDR. */
502 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
503 return UV__ERR(errno);
510 * The Linux kernel suppresses some ICMP error messages by default for UDP
511 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
512 * error reporting, hopefully resulting in faster failover to working name
/* Linux-only: opt into extended ICMP error reporting for the socket's
 * address family. A no-op elsewhere (the non-Linux return is elided). */
515 static int uv__set_recverr(int fd, sa_family_t ss_family) {
516 #if defined(__linux__)
520 if (ss_family == AF_INET) {
521 if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes)))
522 return UV__ERR(errno);
523 } else if (ss_family == AF_INET6) {
524 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes)))
525 return UV__ERR(errno);
/* Bind the handle to addr, creating the socket lazily if needed.
 * flags may combine UV_UDP_IPV6ONLY, UV_UDP_REUSEADDR and
 * UV_UDP_LINUX_RECVERR; each is applied via setsockopt before bind().
 * Returns 0 or a negative UV error. */
532 int uv__udp_bind(uv_udp_t* handle,
533 const struct sockaddr* addr,
534 unsigned int addrlen,
535 unsigned int flags) {
540 /* Check for bad flags. */
541 if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR))
544 /* Cannot set IPv6-only mode on non-IPv6 socket. */
545 if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
/* Create the fd on first use (the fd == -1 guard is elided). */
548 fd = handle->io_watcher.fd;
550 err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
554 handle->io_watcher.fd = fd;
557 if (flags & UV_UDP_LINUX_RECVERR) {
558 err = uv__set_recverr(fd, addr->sa_family);
563 if (flags & UV_UDP_REUSEADDR) {
564 err = uv__set_reuse(fd);
569 if (flags & UV_UDP_IPV6ONLY) {
572 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
573 err = UV__ERR(errno);
582 if (bind(fd, addr, addrlen)) {
583 err = UV__ERR(errno);
584 if (errno == EAFNOSUPPORT)
585 /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
586 * socket created with AF_INET to an AF_INET6 address or vice versa. */
/* Record the address family and bound state on the handle. */
591 if (addr->sa_family == AF_INET6)
592 handle->flags |= UV_HANDLE_IPV6;
594 handle->flags |= UV_HANDLE_BOUND;
/* If the handle has no socket yet, bind it to the wildcard address
 * (INADDR_ANY / in6addr_any) of the requested family with the given flags;
 * if a socket already exists this is a no-op. Used by send/recv/membership
 * paths that need a bound socket implicitly. */
599 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
601 unsigned int flags) {
602 union uv__sockaddr taddr;
/* Already have a socket: nothing to do (the early return is elided). */
605 if (handle->io_watcher.fd != -1)
611 struct sockaddr_in* addr = &taddr.in;
612 memset(addr, 0, sizeof *addr);
613 addr->sin_family = AF_INET;
614 addr->sin_addr.s_addr = INADDR_ANY;
615 addrlen = sizeof *addr;
620 struct sockaddr_in6* addr = &taddr.in6;
621 memset(addr, 0, sizeof *addr);
622 addr->sin6_family = AF_INET6;
623 addr->sin6_addr = in6addr_any;
624 addrlen = sizeof *addr;
628 assert(0 && "unsupported address family");
632 return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
/* Connect the UDP socket to a fixed peer: bind implicitly if needed, then
 * connect() (retrying on EINTR) and mark the handle connected so sends can
 * omit the destination address. */
636 int uv__udp_connect(uv_udp_t* handle,
637 const struct sockaddr* addr,
638 unsigned int addrlen) {
641 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
/* Retry on EINTR (do-line elided). */
647 err = connect(handle->io_watcher.fd, addr, addrlen);
648 } while (err == -1 && errno == EINTR);
651 return UV__ERR(errno);
653 handle->flags |= UV_HANDLE_UDP_CONNECTED;
658 /* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
659 * Any of uv supported UNIXs kernel should be standardized, but the kernel
660 * implementation logic not same, let's use pseudocode to explain the udp
661 * disconnect behaviors:
663 * Predefined stubs for pseudocode:
664 * 1. sodisconnect: The function to perform the real udp disconnect
665 * 2. pru_connect: The function to perform the real udp connect
666 * 3. so: The kernel object match with socket fd
667 * 4. addr: The sockaddr parameter from user space
670 * if(sodisconnect(so) == 0) { // udp disconnect succeed
671 * if (addr->sa_len != so->addr->sa_len) return EINVAL;
672 * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
675 * else return EISCONN;
677 * z/OS (same with Windows):
678 * if(addr->sa_len < so->addr->sa_len) return EINVAL;
679 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
682 * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
683 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
686 * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
687 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
/* Undo uv__udp_connect: connect() to an AF_UNSPEC address (NULL/0 on IBMi)
 * and clear the connected flag. BSDs may return EAFNOSUPPORT/EINVAL even
 * on a successful disconnect (see pseudocode above), so those are ignored
 * there. The per-platform #if/#else markers are elided in this listing. */
689 int uv__udp_disconnect(uv_udp_t* handle) {
692 struct sockaddr_storage addr;
694 struct sockaddr addr;
697 memset(&addr, 0, sizeof(addr));
700 addr.ss_family = AF_UNSPEC;
702 addr.sa_family = AF_UNSPEC;
708 /* On IBMi a connectionless transport socket can be disconnected by
709 * either setting the addr parameter to NULL or setting the
710 * addr_length parameter to zero, and issuing another connect().
711 * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
713 r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
715 r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
717 } while (r == -1 && errno == EINTR);
720 #if defined(BSD) /* The macro BSD is from sys/param.h */
721 if (errno != EAFNOSUPPORT && errno != EINVAL)
722 return UV__ERR(errno);
724 return UV__ERR(errno);
728 handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
/* Queue an async send request: copy the destination address and buffer
 * descriptors into req (heap-allocating bufs only when they exceed the
 * inline bufsml array), update the send-queue accounting, and either try
 * to send immediately (queue was empty and we're not inside the completion
 * loop) or arm POLLOUT to send later. Returns 0 or a negative UV error. */
732 int uv__udp_send(uv_udp_send_t* req,
734 const uv_buf_t bufs[],
736 const struct sockaddr* addr,
737 unsigned int addrlen,
738 uv_udp_send_cb send_cb) {
745 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
750 /* It's legal for send_queue_count > 0 even when the write_queue is empty;
751 * it means there are error-state requests in the write_completed_queue that
752 * will touch up send_queue_size/count later.
754 empty_queue = (handle->send_queue_count == 0);
756 uv__req_init(handle->loop, req, UV_UDP_SEND);
757 assert(addrlen <= sizeof(req->addr));
/* addr == NULL (connected send) is encoded as AF_UNSPEC in req->addr;
 * the sendmsg paths check for that marker. */
759 req->addr.ss_family = AF_UNSPEC;
761 memcpy(&req->addr, addr, addrlen);
762 req->send_cb = send_cb;
763 req->handle = handle;
766 req->bufs = req->bufsml;
767 if (nbufs > ARRAY_SIZE(req->bufsml))
768 req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
770 if (req->bufs == NULL) {
771 uv__req_unregister(handle->loop, req);
775 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
776 handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
777 handle->send_queue_count++;
778 QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
779 uv__handle_start(handle);
781 if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
782 uv__udp_sendmsg(handle);
784 /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
785 * away. In such cases the `io_watcher` has to be queued for asynchronous
788 if (!QUEUE_EMPTY(&handle->write_queue))
789 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
791 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
/* Synchronous best-effort send: refuses (the UV_EAGAIN return is elided)
 * when requests are already queued, binds implicitly if needed, then does
 * one sendmsg() and returns the byte count or a negative UV error. Never
 * queues anything. */
798 int uv__udp_try_send(uv_udp_t* handle,
799 const uv_buf_t bufs[],
801 const struct sockaddr* addr,
802 unsigned int addrlen) {
809 /* already sending a message */
810 if (handle->send_queue_count != 0)
814 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
/* addr == NULL requires a previously connected socket. */
818 assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
821 memset(&h, 0, sizeof h);
822 h.msg_name = (struct sockaddr*) addr;
823 h.msg_namelen = addrlen;
824 h.msg_iov = (struct iovec*) bufs;
825 h.msg_iovlen = nbufs;
/* Retry on EINTR (do-line elided). */
828 size = sendmsg(handle->io_watcher.fd, &h, 0);
829 } while (size == -1 && errno == EINTR);
832 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
835 return UV__ERR(errno);
/* Join or leave an IPv4 multicast group via IP_ADD/DROP_MEMBERSHIP.
 * interface_addr selects the local interface (INADDR_ANY when NULL).
 * Returns 0 or a negative UV error. */
842 static int uv__udp_set_membership4(uv_udp_t* handle,
843 const struct sockaddr_in* multicast_addr,
844 const char* interface_addr,
845 uv_membership membership) {
850 memset(&mreq, 0, sizeof mreq);
852 if (interface_addr) {
853 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
857 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
860 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
/* Map the libuv membership enum to the socket option (case labels and the
 * default UV_EINVAL branch are elided in this listing). */
862 switch (membership) {
864 optname = IP_ADD_MEMBERSHIP;
867 optname = IP_DROP_MEMBERSHIP;
873 if (setsockopt(handle->io_watcher.fd,
882 return UV__ERR(errno);
/* Join or leave an IPv6 multicast group via IPV6_ADD/DROP_MEMBERSHIP
 * (aliased to JOIN/LEAVE_GROUP at the top of this file where needed).
 * The interface is chosen by scope id, parsed from interface_addr. */
889 static int uv__udp_set_membership6(uv_udp_t* handle,
890 const struct sockaddr_in6* multicast_addr,
891 const char* interface_addr,
892 uv_membership membership) {
894 struct ipv6_mreq mreq;
895 struct sockaddr_in6 addr6;
897 memset(&mreq, 0, sizeof mreq);
899 if (interface_addr) {
900 if (uv_ip6_addr(interface_addr, 0, &addr6))
902 mreq.ipv6mr_interface = addr6.sin6_scope_id;
904 mreq.ipv6mr_interface = 0;
907 mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
/* Case labels / default branch are elided in this listing. */
909 switch (membership) {
911 optname = IPV6_ADD_MEMBERSHIP;
914 optname = IPV6_DROP_MEMBERSHIP;
920 if (setsockopt(handle->io_watcher.fd,
929 return UV__ERR(errno);
/* Source-specific IPv4 multicast join/leave (SSM), compiled out on
 * platforms without ip_mreq_source support (guard below). Binds the
 * handle to the IPv4 wildcard first if it has no socket yet. */
936 #if !defined(__OpenBSD__) && \
937 !defined(__NetBSD__) && \
938 !defined(__ANDROID__) && \
939 !defined(__DragonFly__) && \
940 !defined(__QNX__) && \
942 static int uv__udp_set_source_membership4(uv_udp_t* handle,
943 const struct sockaddr_in* multicast_addr,
944 const char* interface_addr,
945 const struct sockaddr_in* source_addr,
946 uv_membership membership) {
947 struct ip_mreq_source mreq;
951 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
955 memset(&mreq, 0, sizeof(mreq));
957 if (interface_addr != NULL) {
958 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
962 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
965 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
966 mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
968 if (membership == UV_JOIN_GROUP)
969 optname = IP_ADD_SOURCE_MEMBERSHIP;
970 else if (membership == UV_LEAVE_GROUP)
971 optname = IP_DROP_SOURCE_MEMBERSHIP;
975 if (setsockopt(handle->io_watcher.fd,
980 return UV__ERR(errno);
/* Source-specific IPv6 multicast join/leave via the protocol-independent
 * MCAST_JOIN/LEAVE_SOURCE_GROUP options, which take full sockaddrs in a
 * group_source_req (hence the size static-asserts and memcpys below).
 * Binds the handle to the IPv6 wildcard first if it has no socket yet. */
987 static int uv__udp_set_source_membership6(uv_udp_t* handle,
988 const struct sockaddr_in6* multicast_addr,
989 const char* interface_addr,
990 const struct sockaddr_in6* source_addr,
991 uv_membership membership) {
992 struct group_source_req mreq;
993 struct sockaddr_in6 addr6;
997 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
1001 memset(&mreq, 0, sizeof(mreq));
1003 if (interface_addr != NULL) {
1004 err = uv_ip6_addr(interface_addr, 0, &addr6);
1007 mreq.gsr_interface = addr6.sin6_scope_id;
1009 mreq.gsr_interface = 0;
1012 STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
1013 STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
1014 memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
1015 memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));
1017 if (membership == UV_JOIN_GROUP)
1018 optname = MCAST_JOIN_SOURCE_GROUP;
1019 else if (membership == UV_LEAVE_GROUP)
1020 optname = MCAST_LEAVE_SOURCE_GROUP;
1024 if (setsockopt(handle->io_watcher.fd,
1029 return UV__ERR(errno);
/* Initialize a uv_udp_t: optionally create the socket up-front when a
 * concrete address family is requested (AF_UNSPEC defers creation), then
 * set up callbacks, queue-size accounting, the I/O watcher and the two
 * send queues. */
1037 int uv__udp_init_ex(uv_loop_t* loop,
1044 if (domain != AF_UNSPEC) {
1045 fd = uv__socket(domain, SOCK_DGRAM, 0);
1050 uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
1051 handle->alloc_cb = NULL;
1052 handle->recv_cb = NULL;
1053 handle->send_queue_size = 0;
1054 handle->send_queue_count = 0;
1055 uv__io_init(&handle->io_watcher, uv__udp_io, fd);
1056 QUEUE_INIT(&handle->write_queue);
1057 QUEUE_INIT(&handle->write_completed_queue);
/* True when the user opted into batched receive (UV_HANDLE_UDP_RECVMMSG)
 * and the kernel actually supports recvmmsg (probed once). */
1063 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
1065 if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
1066 uv_once(&once, uv__udp_mmsg_init);
1067 return uv__recvmmsg_avail;
/* Adopt an existing socket fd into the handle: reject handles that already
 * have a socket or fds already registered with the loop, switch the fd to
 * non-blocking, enable address reuse, and record connectedness. */
1074 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
1077 /* Check for already active socket. */
1078 if (handle->io_watcher.fd != -1)
1081 if (uv__fd_exists(handle->loop, sock))
1084 err = uv__nonblock(sock, 1);
1088 err = uv__set_reuse(sock);
1092 handle->io_watcher.fd = sock;
1093 if (uv__udp_is_connected(handle))
1094 handle->flags |= UV_HANDLE_UDP_CONNECTED;
/* Public membership API: parse multicast_addr to decide IPv4 vs IPv6,
 * bind the handle to the matching wildcard if needed, and delegate to
 * the family-specific helper. */
1100 int uv_udp_set_membership(uv_udp_t* handle,
1101 const char* multicast_addr,
1102 const char* interface_addr,
1103 uv_membership membership) {
1105 struct sockaddr_in addr4;
1106 struct sockaddr_in6 addr6;
1108 if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
1109 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
1112 return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
1113 } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
1114 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
1117 return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
/* Public source-specific membership API: parse the multicast address to
 * pick the family, then parse the source address in the same family and
 * delegate to the matching helper. Compiled out (guard below) on platforms
 * lacking SSM support; the UV_ENOSYS fallback is among the elided lines. */
1124 int uv_udp_set_source_membership(uv_udp_t* handle,
1125 const char* multicast_addr,
1126 const char* interface_addr,
1127 const char* source_addr,
1128 uv_membership membership) {
1129 #if !defined(__OpenBSD__) && \
1130 !defined(__NetBSD__) && \
1131 !defined(__ANDROID__) && \
1132 !defined(__DragonFly__) && \
1133 !defined(__QNX__) && \
1136 union uv__sockaddr mcast_addr;
1137 union uv__sockaddr src_addr;
1139 err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
/* Not IPv4 — try IPv6 for both group and source. */
1141 err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
1144 err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
1147 return uv__udp_set_source_membership6(handle,
1154 err = uv_ip4_addr(source_addr, 0, &src_addr.in);
1157 return uv__udp_set_source_membership4(handle,
/* Apply a socket option using the IPv6 variant when the handle is an IPv6
 * socket, the IPv4 variant otherwise (option level/name arguments are
 * among the elided lines). Returns 0 or a negative UV error. */
1168 static int uv__setsockopt(uv_udp_t* handle,
1175 if (handle->flags & UV_HANDLE_IPV6)
1176 r = setsockopt(handle->io_watcher.fd,
1182 r = setsockopt(handle->io_watcher.fd,
1188 return UV__ERR(errno);
/* Like uv__setsockopt but for options whose argument width differs by
 * platform: char on some systems (e.g. OpenBSD below), int elsewhere.
 * val must fit in a byte regardless. */
1193 static int uv__setsockopt_maybe_char(uv_udp_t* handle,
1197 #if defined(__sun) || defined(_AIX) || defined(__MVS__)
1199 #elif defined(__OpenBSD__)
1200 unsigned char arg = val;
1205 if (val < 0 || val > 255)
1208 return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
/* Toggle SO_BROADCAST on the socket (option arguments elided in listing). */
1212 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
1213 if (setsockopt(handle->io_watcher.fd,
1218 return UV__ERR(errno);
/* Set the unicast TTL (IP_TTL / IPV6_UNICAST_HOPS). Valid range 1-255.
 * Platforms that require sizeof(int) option arguments take the direct
 * uv__setsockopt path; others go through the maybe-char wrapper. */
1225 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
1226 if (ttl < 1 || ttl > 255)
1229 #if defined(__MVS__)
1230 if (!(handle->flags & UV_HANDLE_IPV6))
1231 return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */
1235 * On Solaris and derivatives such as SmartOS, the length of socket options
1236 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
1237 * so hardcode the size of these options on this platform,
1238 * and use the general uv__setsockopt_maybe_char call on other platforms.
1240 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1241 defined(__MVS__) || defined(__QNX__)
1243 return uv__setsockopt(handle,
1249 #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1250 defined(__MVS__) || defined(__QNX__)) */
1252 return uv__setsockopt_maybe_char(handle,
1257 #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1258 defined(__MVS__) || defined(__QNX__) */
/* Set the multicast TTL (IP_MULTICAST_TTL / IPV6_MULTICAST_HOPS); IPv6 on
 * the listed platforms needs a sizeof(int) argument, everything else goes
 * through the maybe-char wrapper. */
1262 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
1264 * On Solaris and derivatives such as SmartOS, the length of socket options
1265 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
1266 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
1267 * and use the general uv__setsockopt_maybe_char call otherwise.
1269 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1270 defined(__MVS__) || defined(__QNX__)
1271 if (handle->flags & UV_HANDLE_IPV6)
1272 return uv__setsockopt(handle,
1274 IPV6_MULTICAST_HOPS,
1277 #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1278 defined(__MVS__) || defined(__QNX__) */
1280 return uv__setsockopt_maybe_char(handle,
1282 IPV6_MULTICAST_HOPS,
/* Toggle multicast loopback (IP_MULTICAST_LOOP / IPV6_MULTICAST_LOOP);
 * same per-platform argument-width split as the multicast-TTL setter. */
1287 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
1289 * On Solaris and derivatives such as SmartOS, the length of socket options
1290 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
1291 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
1292 * and use the general uv__setsockopt_maybe_char call otherwise.
1294 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1295 defined(__MVS__) || defined(__QNX__)
1296 if (handle->flags & UV_HANDLE_IPV6)
1297 return uv__setsockopt(handle,
1299 IPV6_MULTICAST_LOOP,
1302 #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
1303 defined(__MVS__) || defined(__QNX__) */
1305 return uv__setsockopt_maybe_char(handle,
1307 IPV6_MULTICAST_LOOP,
/* Select the outgoing interface for multicast. NULL means "any": wildcard
 * address in the handle's family. Otherwise the string is parsed as IPv4
 * (interface chosen by its address, IP_MULTICAST_IF) or IPv6 (chosen by
 * scope id, IPV6_MULTICAST_IF). The option-name arguments and the
 * invalid-address error return are among the elided lines. */
1311 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
1312 struct sockaddr_storage addr_st;
1313 struct sockaddr_in* addr4;
1314 struct sockaddr_in6* addr6;
/* Both views alias the same storage; ss_family decides which is live. */
1316 addr4 = (struct sockaddr_in*) &addr_st;
1317 addr6 = (struct sockaddr_in6*) &addr_st;
1319 if (!interface_addr) {
1320 memset(&addr_st, 0, sizeof addr_st);
1321 if (handle->flags & UV_HANDLE_IPV6) {
1322 addr_st.ss_family = AF_INET6;
1323 addr6->sin6_scope_id = 0;
1325 addr_st.ss_family = AF_INET;
1326 addr4->sin_addr.s_addr = htonl(INADDR_ANY);
1328 } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
1329 /* nothing, address was parsed */
1330 } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
1331 /* nothing, address was parsed */
1336 if (addr_st.ss_family == AF_INET) {
1337 if (setsockopt(handle->io_watcher.fd,
1340 (void*) &addr4->sin_addr,
1341 sizeof(addr4->sin_addr)) == -1) {
1342 return UV__ERR(errno);
1344 } else if (addr_st.ss_family == AF_INET6) {
1345 if (setsockopt(handle->io_watcher.fd,
1348 &addr6->sin6_scope_id,
1349 sizeof(addr6->sin6_scope_id)) == -1) {
1350 return UV__ERR(errno);
1353 assert(0 && "unexpected address family");
/* Thin wrapper: fetch the connected peer's address via the shared
 * uv__getsockpeername helper (remaining arguments elided in listing). */
1360 int uv_udp_getpeername(const uv_udp_t* handle,
1361 struct sockaddr* name,
1364 return uv__getsockpeername((const uv_handle_t*) handle,
/* Thin wrapper: fetch the socket's local address via the shared
 * uv__getsockpeername helper (remaining arguments elided in listing). */
1370 int uv_udp_getsockname(const uv_udp_t* handle,
1371 struct sockaddr* name,
1374 return uv__getsockpeername((const uv_handle_t*) handle,
/* Begin receiving datagrams: validate callbacks, refuse if already reading,
 * bind to the IPv4 wildcard if the handle has no socket yet, then install
 * the callbacks and arm the POLLIN watcher. */
1381 int uv__udp_recv_start(uv_udp_t* handle,
1382 uv_alloc_cb alloc_cb,
1383 uv_udp_recv_cb recv_cb) {
1386 if (alloc_cb == NULL || recv_cb == NULL)
1389 if (uv__io_active(&handle->io_watcher, POLLIN))
1390 return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */
1392 err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
1396 handle->alloc_cb = alloc_cb;
1397 handle->recv_cb = recv_cb;
1399 uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
1400 uv__handle_start(handle);
/* Stop receiving: disarm POLLIN, deactivate the handle if no sends are
 * pending either, and clear the callbacks (the recv loops check
 * recv_cb != NULL to notice a stop issued from inside a callback). */
1406 int uv__udp_recv_stop(uv_udp_t* handle) {
1407 uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
1409 if (!uv__io_active(&handle->io_watcher, POLLOUT))
1410 uv__handle_stop(handle);
1412 handle->alloc_cb = NULL;
1413 handle->recv_cb = NULL;