/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
32 static void uv__udp_run_completed(uv_udp_t* handle);
33 static void uv__udp_run_pending(uv_udp_t* handle);
34 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
35 static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
36 static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
37 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
38 static int uv__send(uv_udp_send_t* req,
42 struct sockaddr* addr,
44 uv_udp_send_cb send_cb);
47 void uv__udp_close(uv_udp_t* handle) {
48 uv__io_close(handle->loop, &handle->io_watcher);
49 uv__handle_stop(handle);
50 close(handle->io_watcher.fd);
51 handle->io_watcher.fd = -1;
55 void uv__udp_finish_close(uv_udp_t* handle) {
59 assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
60 assert(handle->io_watcher.fd == -1);
62 uv__udp_run_completed(handle);
64 while (!ngx_queue_empty(&handle->write_queue)) {
65 q = ngx_queue_head(&handle->write_queue);
68 req = ngx_queue_data(q, uv_udp_send_t, queue);
69 uv__req_unregister(handle->loop, req);
71 if (req->bufs != req->bufsml)
76 uv__set_artificial_error(handle->loop, UV_ECANCELED);
77 req->send_cb(req, -1);
81 /* Now tear down the handle. */
82 handle->recv_cb = NULL;
83 handle->alloc_cb = NULL;
84 /* but _do not_ touch close_cb */
88 static void uv__udp_run_pending(uv_udp_t* handle) {
94 while (!ngx_queue_empty(&handle->write_queue)) {
95 q = ngx_queue_head(&handle->write_queue);
98 req = ngx_queue_data(q, uv_udp_send_t, queue);
101 memset(&h, 0, sizeof h);
102 h.msg_name = &req->addr;
103 h.msg_namelen = (req->addr.sin6_family == AF_INET6 ?
104 sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
105 h.msg_iov = (struct iovec*)req->bufs;
106 h.msg_iovlen = req->bufcnt;
109 size = sendmsg(handle->io_watcher.fd, &h, 0);
111 while (size == -1 && errno == EINTR);
113 /* TODO try to write once or twice more in the
114 * hope that the socket becomes readable again?
116 if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
119 req->status = (size == -1 ? -errno : size);
127 for (nbytes = i = 0; i < req->bufcnt; i++)
128 nbytes += req->bufs[i].len;
130 assert(size == nbytes);
134 /* Sending a datagram is an atomic operation: either all data
135 * is written or nothing is (and EMSGSIZE is raised). That is
136 * why we don't handle partial writes. Just pop the request
137 * off the write queue and onto the completed queue, done.
139 ngx_queue_remove(&req->queue);
140 ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
145 static void uv__udp_run_completed(uv_udp_t* handle) {
149 while (!ngx_queue_empty(&handle->write_completed_queue)) {
150 q = ngx_queue_head(&handle->write_completed_queue);
153 req = ngx_queue_data(q, uv_udp_send_t, queue);
154 uv__req_unregister(handle->loop, req);
156 if (req->bufs != req->bufsml)
160 if (req->send_cb == NULL)
163 /* req->status >= 0 == bytes written
164 * req->status < 0 == errno
166 if (req->status >= 0) {
167 req->send_cb(req, 0);
170 uv__set_sys_error(handle->loop, -req->status);
171 req->send_cb(req, -1);
177 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
178 if (revents & UV__POLLIN)
179 uv__udp_recvmsg(loop, w, revents);
181 if (revents & UV__POLLOUT)
182 uv__udp_sendmsg(loop, w, revents);
186 static void uv__udp_recvmsg(uv_loop_t* loop,
188 unsigned int revents) {
189 struct sockaddr_storage peer;
197 handle = container_of(w, uv_udp_t, io_watcher);
198 assert(handle->type == UV_UDP);
199 assert(revents & UV__POLLIN);
201 assert(handle->recv_cb != NULL);
202 assert(handle->alloc_cb != NULL);
204 /* Prevent loop starvation when the data comes in as fast as (or faster than)
205 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
209 memset(&h, 0, sizeof(h));
213 buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
215 assert(buf.base != NULL);
217 h.msg_namelen = sizeof(peer);
218 h.msg_iov = (void*) &buf;
222 nread = recvmsg(handle->io_watcher.fd, &h, 0);
224 while (nread == -1 && errno == EINTR);
227 if (errno == EAGAIN || errno == EWOULDBLOCK) {
228 uv__set_sys_error(handle->loop, EAGAIN);
229 handle->recv_cb(handle, 0, buf, NULL, 0);
232 uv__set_sys_error(handle->loop, errno);
233 handle->recv_cb(handle, -1, buf, NULL, 0);
239 if (h.msg_flags & MSG_TRUNC)
240 flags |= UV_UDP_PARTIAL;
242 handle->recv_cb(handle,
245 (struct sockaddr*)&peer,
249 /* recv_cb callback may decide to pause or close the handle */
252 && handle->io_watcher.fd != -1
253 && handle->recv_cb != NULL);
257 static void uv__udp_sendmsg(uv_loop_t* loop,
259 unsigned int revents) {
262 handle = container_of(w, uv_udp_t, io_watcher);
263 assert(handle->type == UV_UDP);
264 assert(revents & UV__POLLOUT);
266 assert(!ngx_queue_empty(&handle->write_queue)
267 || !ngx_queue_empty(&handle->write_completed_queue));
269 /* Write out pending data first. */
270 uv__udp_run_pending(handle);
272 /* Drain 'request completed' queue. */
273 uv__udp_run_completed(handle);
275 if (!ngx_queue_empty(&handle->write_completed_queue)) {
276 /* Schedule completion callbacks. */
277 uv__io_feed(handle->loop, &handle->io_watcher);
279 else if (ngx_queue_empty(&handle->write_queue)) {
280 /* Pending queue and completion queue empty, stop watcher. */
281 uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
283 if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
284 uv__handle_stop(handle);
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
 * refinements for programs that use multicast.
 *
 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
 * are different from the BSDs: it _shares_ the port rather than steal it
 * from the current listener. While useful, it's not something we can emulate
 * on other platforms so we don't enable it.
 *
 * Returns 0 on success, a negated errno on failure.
 */
static int uv__set_reuse(int fd) {
  int yes;

  yes = 1;
#if defined(SO_REUSEPORT) && !defined(__linux__)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return -errno;
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return -errno;
#endif

  return 0;
}
314 static int uv__bind(uv_udp_t* handle,
316 struct sockaddr* addr,
329 /* Check for bad flags. */
330 if (flags & ~UV_UDP_IPV6ONLY) {
331 uv__set_sys_error(handle->loop, EINVAL);
335 /* Cannot set IPv6-only mode on non-IPv6 socket. */
336 if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
337 uv__set_sys_error(handle->loop, EINVAL);
341 if (handle->io_watcher.fd == -1) {
342 if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
343 uv__set_sys_error(handle->loop, errno);
346 handle->io_watcher.fd = fd;
349 fd = handle->io_watcher.fd;
350 err = uv__set_reuse(fd);
352 uv__set_sys_error(handle->loop, -err);
356 if (flags & UV_UDP_IPV6ONLY) {
359 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
360 uv__set_sys_error(handle->loop, errno);
364 uv__set_sys_error(handle->loop, ENOTSUP);
369 if (bind(fd, addr, len) == -1) {
370 uv__set_sys_error(handle->loop, errno);
374 handle->io_watcher.fd = fd;
379 close(handle->io_watcher.fd);
380 handle->io_watcher.fd = -1;
388 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
389 unsigned char taddr[sizeof(struct sockaddr_in6)];
392 assert(domain == AF_INET || domain == AF_INET6);
394 if (handle->io_watcher.fd != -1)
400 struct sockaddr_in* addr = (void*)&taddr;
401 memset(addr, 0, sizeof *addr);
402 addr->sin_family = AF_INET;
403 addr->sin_addr.s_addr = INADDR_ANY;
404 addrlen = sizeof *addr;
409 struct sockaddr_in6* addr = (void*)&taddr;
410 memset(addr, 0, sizeof *addr);
411 addr->sin6_family = AF_INET6;
412 addr->sin6_addr = in6addr_any;
413 addrlen = sizeof *addr;
417 assert(0 && "unsupported address family");
421 return uv__bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
425 static int uv__send(uv_udp_send_t* req,
429 struct sockaddr* addr,
431 uv_udp_send_cb send_cb) {
434 if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
437 uv__req_init(handle->loop, req, UV_UDP_SEND);
439 assert(addrlen <= sizeof(req->addr));
440 memcpy(&req->addr, addr, addrlen);
441 req->send_cb = send_cb;
442 req->handle = handle;
443 req->bufcnt = bufcnt;
445 if (bufcnt <= (int) ARRAY_SIZE(req->bufsml)) {
446 req->bufs = req->bufsml;
448 else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
449 uv__set_sys_error(handle->loop, ENOMEM);
453 memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
454 ngx_queue_insert_tail(&handle->write_queue, &req->queue);
455 uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
456 uv__handle_start(handle);
462 int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
463 uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
464 handle->alloc_cb = NULL;
465 handle->recv_cb = NULL;
466 uv__io_init(&handle->io_watcher, uv__udp_io, -1);
467 ngx_queue_init(&handle->write_queue);
468 ngx_queue_init(&handle->write_completed_queue);
473 int uv__udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
474 return uv__bind(handle,
476 (struct sockaddr*)&addr,
482 int uv__udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
483 return uv__bind(handle,
485 (struct sockaddr*)&addr,
491 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
499 /* Check for already active socket. */
500 if (handle->io_watcher.fd != -1) {
501 uv__set_artificial_error(handle->loop, UV_EALREADY);
506 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
507 uv__set_sys_error(handle->loop, errno);
511 /* On the BSDs, SO_REUSEADDR lets you reuse an address that's in the TIME_WAIT
512 * state (i.e. was until recently tied to a socket) while SO_REUSEPORT lets
513 * multiple processes bind to the same address. Yes, it's something of a
514 * misnomer but then again, SO_REUSEADDR was already taken.
516 * None of the above applies to Linux: SO_REUSEADDR implies SO_REUSEPORT on
517 * Linux and hence it does not have SO_REUSEPORT at all.
521 if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof yes) == -1) {
522 uv__set_sys_error(handle->loop, errno);
527 handle->io_watcher.fd = sock;
536 int uv_udp_set_membership(uv_udp_t* handle,
537 const char* multicast_addr,
538 const char* interface_addr,
539 uv_membership membership) {
543 memset(&mreq, 0, sizeof mreq);
545 if (interface_addr) {
546 mreq.imr_interface.s_addr = inet_addr(interface_addr);
548 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
551 mreq.imr_multiaddr.s_addr = inet_addr(multicast_addr);
553 switch (membership) {
555 optname = IP_ADD_MEMBERSHIP;
558 optname = IP_DROP_MEMBERSHIP;
561 return uv__set_artificial_error(handle->loop, UV_EINVAL);
564 if (setsockopt(handle->io_watcher.fd,
569 return uv__set_sys_error(handle->loop, errno);
576 static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) {
583 if (val < 0 || val > 255)
584 return uv__set_sys_error(handle->loop, EINVAL);
586 if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, option, &arg, sizeof(arg)))
587 return uv__set_sys_error(handle->loop, errno);
593 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
594 if (setsockopt(handle->io_watcher.fd,
599 return uv__set_sys_error(handle->loop, errno);
606 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
607 if (ttl < 1 || ttl > 255)
608 return uv__set_sys_error(handle->loop, EINVAL);
610 if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(ttl)))
611 return uv__set_sys_error(handle->loop, errno);
617 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
618 return uv__setsockopt_maybe_char(handle, IP_MULTICAST_TTL, ttl);
622 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
623 return uv__setsockopt_maybe_char(handle, IP_MULTICAST_LOOP, on);
627 int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) {
632 /* Don't clobber errno. */
635 if (handle->io_watcher.fd == -1) {
636 uv__set_sys_error(handle->loop, EINVAL);
641 /* sizeof(socklen_t) != sizeof(int) on some systems. */
642 socklen = (socklen_t)*namelen;
644 if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
645 uv__set_sys_error(handle->loop, errno);
648 *namelen = (int)socklen;
657 int uv__udp_send(uv_udp_send_t* req,
661 struct sockaddr_in addr,
662 uv_udp_send_cb send_cb) {
667 (struct sockaddr*)&addr,
673 int uv__udp_send6(uv_udp_send_t* req,
677 struct sockaddr_in6 addr,
678 uv_udp_send_cb send_cb) {
683 (struct sockaddr*)&addr,
689 int uv__udp_recv_start(uv_udp_t* handle,
690 uv_alloc_cb alloc_cb,
691 uv_udp_recv_cb recv_cb) {
692 if (alloc_cb == NULL || recv_cb == NULL) {
693 uv__set_artificial_error(handle->loop, UV_EINVAL);
697 if (uv__io_active(&handle->io_watcher, UV__POLLIN)) {
698 uv__set_artificial_error(handle->loop, UV_EALREADY);
702 if (uv__udp_maybe_deferred_bind(handle, AF_INET))
705 handle->alloc_cb = alloc_cb;
706 handle->recv_cb = recv_cb;
708 uv__io_start(handle->loop, &handle->io_watcher, UV__POLLIN);
709 uv__handle_start(handle);
715 int uv__udp_recv_stop(uv_udp_t* handle) {
716 uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN);
718 if (!uv__io_active(&handle->io_watcher, UV__POLLOUT))
719 uv__handle_stop(handle);
721 handle->alloc_cb = NULL;
722 handle->recv_cb = NULL;