1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
31 #include <sys/types.h>
32 #include <sys/socket.h>
36 #include <limits.h> /* IOV_MAX */
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
46 struct uv__stream_select_s {
61 #endif /* defined(__APPLE__) */
/* Forward declarations for file-local helpers that are defined further down
 * in this file. */
63 static void uv__stream_connect(uv_stream_t*);
64 static void uv__write(uv_stream_t* stream);
65 static void uv__read(uv_stream_t* stream);
66 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
67 static void uv__write_callbacks(uv_stream_t* stream);
68 static size_t uv__write_req_size(uv_write_t* req);
/* Initialize the stream-specific part of a handle.
 *
 * Registers the handle with `loop` through uv__handle_init(), clears every
 * callback and pending-request pointer, marks that no accepted or queued
 * descriptors exist, empties both write queues and finally initializes the
 * I/O watcher with uv__stream_io as its callback and -1 as the last argument.
 *
 * Side effect: while loop->emfile_fd is still -1 a spare descriptor is opened
 * with uv__open_cloexec() and recorded in loop->emfile_fd.
 * NOTE(review): the error handling around those opens is not visible in this
 * excerpt.
 */
71 void uv__stream_init(uv_loop_t* loop,
73 uv_handle_type type) {
76 uv__handle_init(loop, (uv_handle_t*)stream, type);
77 stream->read_cb = NULL;
78 stream->alloc_cb = NULL;
79 stream->close_cb = NULL;
80 stream->connection_cb = NULL;
81 stream->connect_req = NULL;
82 stream->shutdown_req = NULL;
83 stream->accepted_fd = -1;
84 stream->queued_fds = NULL;
85 stream->delayed_error = 0;
86 QUEUE_INIT(&stream->write_queue);
87 QUEUE_INIT(&stream->write_completed_queue);
88 stream->write_queue_size = 0;
/* Open the spare descriptor of the loop if that has not happened yet. */
90 if (loop->emfile_fd == -1) {
91 err = uv__open_cloexec("/dev/null", O_RDONLY);
93 /* In the rare case that "/dev/null" isn't mounted open "/"
96 err = uv__open_cloexec("/", O_RDONLY);
98 loop->emfile_fd = err;
101 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
102 stream->select = NULL;
103 #endif /* defined(__APPLE__) */
105 uv__io_init(&stream->io_watcher, uv__stream_io, -1);
/* Wake up the select() helper thread so that it notices a state change.
 *
 * Only has an effect when built with __APPLE__ defined and CMAKE_BOOTSTRAP
 * undefined; every other configuration compiles this to a no-op.  The wake-up
 * is a one-byte write to s->fake_fd that is retried while it fails with EINTR.
 */
109 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
110 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
111 /* Notify select() thread about state change */
112 uv__stream_select_t* s;
119 /* Interrupt select() loop
120 * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
121 * emit read event on other side
124 r = write(s->fake_fd, "x", 1);
125 while (r == -1 && errno == EINTR);
128 #else /* !defined(__APPLE__) */
129 /* No-op on any other platform */
130 #endif /* !defined(__APPLE__) */
134 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
/* Body of the helper thread started by uv__stream_try_select().
 *
 * select()s on the stream fd (for the directions the I/O watcher is currently
 * active for) and on s->int_fd, the descriptor used to interrupt the wait.
 * Observed events are published in s->events and handed to the loop thread
 * with uv_async_send(); the thread then waits on s->async_sem until they have
 * been processed.  A successful uv_sem_trywait() on s->close_sem terminates
 * the thread.
 */
135 static void uv__stream_osx_select(void* arg) {
137 uv__stream_select_t* s;
154 /* Terminate on semaphore */
155 if (uv_sem_trywait(&s->close_sem) == 0)
158 /* Watch fd using select(2) */
159 memset(s->sread, 0, s->sread_sz);
160 memset(s->swrite, 0, s->swrite_sz);
162 if (uv__io_active(&stream->io_watcher, POLLIN))
163 FD_SET(fd, s->sread);
164 if (uv__io_active(&stream->io_watcher, POLLOUT))
165 FD_SET(fd, s->swrite);
166 FD_SET(s->int_fd, s->sread);
168 /* Wait indefinitely for fd events */
169 r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
174 /* XXX: Possible?! */
178 /* Ignore timeouts */
182 /* Empty socketpair's buffer in case of interruption */
183 if (FD_ISSET(s->int_fd, s->sread))
185 r = read(s->int_fd, buf, sizeof(buf));
187 if (r == sizeof(buf))
193 if (errno == EAGAIN || errno == EWOULDBLOCK)
204 if (FD_ISSET(fd, s->sread))
206 if (FD_ISSET(fd, s->swrite))
/* Either the watched fd reported something or the wait was interrupted. */
209 assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
211 ACCESS_ONCE(int, s->events) = events;
/* Hand the events to the loop thread and wait until it is done with them. */
213 uv_async_send(&s->async);
214 uv_sem_wait(&s->async_sem);
216 /* Should be processed at this stage */
217 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
/* uv_async_t callback of the select() helper; invoked on the event loop.
 *
 * Fetches the events published in s->events and resets the field to 0, then
 * forwards POLLIN and POLLOUT to uv__stream_io() when the I/O watcher is
 * still active for that direction, and finally posts s->async_sem.
 */
223 static void uv__stream_osx_select_cb(uv_async_t* handle) {
224 uv__stream_select_t* s;
228 s = container_of(handle, uv__stream_select_t, async);
231 /* Get and reset stream's events */
233 ACCESS_ONCE(int, s->events) = 0;
/* Only POLLIN and POLLOUT are expected from the helper thread. */
236 assert(events == (events & (POLLIN | POLLOUT)));
238 /* Invoke callback on event-loop */
239 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
240 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
242 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
243 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
245 if (stream->flags & UV_HANDLE_CLOSING)
248 /* NOTE: It is important to do it here, otherwise `select()` might be called
249 * before the actual `uv__read()`, leading to the blocking syscall
251 uv_sem_post(&s->async_sem);
255 static void uv__stream_osx_cb_close(uv_handle_t* async) {
256 uv__stream_select_t* s;
258 s = container_of(async, uv__stream_select_t, async);
263 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
265 * kqueue doesn't work with some files from /dev mount on osx.
266 * select(2) in separate thread for those fds
269 struct kevent filter[1];
270 struct kevent events[1];
271 struct timespec timeout;
272 uv__stream_select_t* s;
284 perror("(libuv) kqueue()");
285 return UV__ERR(errno);
288 EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
290 /* Use small timeout, because we only want to capture EINVALs */
295 ret = kevent(kq, filter, 1, events, 1, &timeout);
296 while (ret == -1 && errno == EINTR);
301 return UV__ERR(errno);
/* Anything other than an EINVAL error event is not treated as a kqueue
 * incompatibility. */
303 if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
306 /* At this point we definitely know that this fd won't work with kqueue */
309 * Create fds for io watcher and to interrupt the select() loop.
310 * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
312 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
313 return UV__ERR(errno);
/* Bytes needed for a descriptor set covering 0..max_fd, rounded up to a whole
 * number of 32-bit words. */
319 sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
320 swrite_sz = sread_sz;
/* One allocation holds the bookkeeping struct followed by both fd sets. */
322 s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
330 s->sread = (fd_set*) ((char*) s + sizeof(*s));
331 s->sread_sz = sread_sz;
332 s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
333 s->swrite_sz = swrite_sz;
335 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
337 goto failed_async_init;
339 s->async.flags |= UV_HANDLE_INTERNAL;
340 uv__handle_unref(&s->async);
342 err = uv_sem_init(&s->close_sem, 0);
344 goto failed_close_sem_init;
346 err = uv_sem_init(&s->async_sem, 0);
348 goto failed_async_sem_init;
/* The helper thread runs uv__stream_osx_select() with the stream as its
 * argument. */
358 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
360 goto failed_thread_create;
/* Error unwinding: the labels below are the targets of the goto statements
 * above and release what had been set up before the failing step. */
364 failed_thread_create:
366 stream->select = NULL;
369 uv_sem_destroy(&s->async_sem);
371 failed_async_sem_init:
372 uv_sem_destroy(&s->close_sem);
374 failed_close_sem_init:
377 uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
389 #endif /* defined(__APPLE__) */
/* Attach descriptor `fd` to `stream` and merge `flags` into stream->flags.
 *
 * For UV_TCP streams the UV_HANDLE_TCP_NODELAY and UV_HANDLE_TCP_KEEPALIVE
 * flags are applied to the socket (keepalive uses a fixed delay of 60, see
 * the TODO below).  Failures of the visible socket calls are reported as
 * UV__ERR(errno).  On __APPLE__ builds SO_OOBINLINE is set as well.
 */
392 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
393 #if defined(__APPLE__)
/* Guard: the watcher is expected to hold no fd (-1) or this very fd.
 * NOTE(review): the action taken when the guard trips is not visible here. */
397 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
401 stream->flags |= flags;
403 if (stream->type == UV_TCP) {
404 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
405 return UV__ERR(errno);
407 /* TODO Use delay the user passed in. */
408 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
409 uv__tcp_keepalive(fd, 1, 60)) {
410 return UV__ERR(errno);
414 #if defined(__APPLE__)
416 if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
419 return UV__ERR(errno);
423 stream->io_watcher.fd = fd;
/* Drain stream->write_queue: every pending request is taken from the head of
 * the queue and appended to stream->write_completed_queue.
 * NOTE(review): `error` is presumably stored on each request; that assignment
 * is not visible in this excerpt.
 */
429 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
432 while (!QUEUE_EMPTY(&stream->write_queue)) {
433 q = QUEUE_HEAD(&stream->write_queue);
436 req = QUEUE_DATA(q, uv_write_t, queue);
439 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
/* Final teardown of a closed stream.
 *
 * Preconditions (asserted): the I/O watcher is inactive for POLLIN and POLLOUT
 * and UV_HANDLE_CLOSED is set.  A pending connect request and a pending
 * shutdown request are unregistered and completed with UV_ECANCELED; queued
 * writes are flushed with UV_ECANCELED and their callbacks are run.
 * Afterwards write_queue_size is asserted to be 0.
 */
444 void uv__stream_destroy(uv_stream_t* stream) {
445 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
446 assert(stream->flags & UV_HANDLE_CLOSED);
448 if (stream->connect_req) {
449 uv__req_unregister(stream->loop, stream->connect_req);
450 stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
451 stream->connect_req = NULL;
/* Cancel everything that is still queued, then run the write callbacks. */
454 uv__stream_flush_write_queue(stream, UV_ECANCELED);
455 uv__write_callbacks(stream);
457 if (stream->shutdown_req) {
458 /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
459 * fait accompli at this point. Maybe we should revisit this in v0.11.
460 * A possible reason for leaving it unchanged is that it informs the
461 * callee that the handle has been destroyed.
463 uv__req_unregister(stream->loop, stream->shutdown_req);
464 stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
465 stream->shutdown_req = NULL;
468 assert(stream->write_queue_size == 0);
472 /* Implements a best effort approach to mitigating accept() EMFILE errors.
473 * We have a spare file descriptor stashed away that we close to get below
474 * the EMFILE limit. Next, we accept all pending connections and close them
475 * immediately to signal the clients that we're overloaded - and we are, but
476 * we still keep on trucking.
478 * There is one caveat: it's not reliable in a multi-threaded environment.
479 * The file descriptor limit is per process. Our party trick fails if another
480 * thread opens a file or creates a socket in the time window between us
481 * calling close() and accept().
483 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
487 if (loop->emfile_fd == -1)
490 uv__close(loop->emfile_fd);
491 loop->emfile_fd = -1;
494 err = uv__accept(accept_fd);
497 } while (err >= 0 || err == UV_EINTR);
499 emfile_fd = uv__open_cloexec("/", O_RDONLY);
501 loop->emfile_fd = emfile_fd;
507 #if defined(UV_HAVE_KQUEUE)
508 # define UV_DEC_BACKLOG(w) w->rcount--;
510 # define UV_DEC_BACKLOG(w) /* no-op */
511 #endif /* defined(UV_HAVE_KQUEUE) */
/* I/O watcher callback for a listening stream.
 *
 * Accepts connections for as long as the stream still has a valid fd.  Each
 * accepted descriptor is stored in stream->accepted_fd and announced through
 * connection_cb(stream, 0); an error is passed to connection_cb as well.
 * EAGAIN/EWOULDBLOCK ends the callback silently, ECONNABORTED is skipped and
 * EMFILE/ENFILE goes through uv__emfile_trick() first.  When accepted_fd is
 * still set after the callback, POLLIN watching is stopped.
 */
514 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
518 stream = container_of(w, uv_stream_t, io_watcher);
519 assert(events & POLLIN);
520 assert(stream->accepted_fd == -1);
521 assert(!(stream->flags & UV_HANDLE_CLOSING));
523 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
525 /* connection_cb can close the server socket while we're
526 * in the loop so check it on each iteration.
528 while (uv__stream_fd(stream) != -1) {
529 assert(stream->accepted_fd == -1);
531 #if defined(UV_HAVE_KQUEUE)
534 #endif /* defined(UV_HAVE_KQUEUE) */
536 err = uv__accept(uv__stream_fd(stream));
538 if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
539 return; /* Not an error. */
541 if (err == UV_ECONNABORTED)
542 continue; /* Ignore. Nothing we can do about that. */
544 if (err == UV_EMFILE || err == UV_ENFILE) {
545 err = uv__emfile_trick(loop, uv__stream_fd(stream));
546 if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
550 stream->connection_cb(stream, err);
555 stream->accepted_fd = err;
556 stream->connection_cb(stream, 0);
558 if (stream->accepted_fd != -1) {
559 /* The user hasn't called uv_accept() yet */
560 uv__io_stop(loop, &stream->io_watcher, POLLIN);
564 if (stream->type == UV_TCP &&
565 (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
566 /* Give other processes a chance to accept connections. */
567 struct timespec timeout = { 0, 1 };
568 nanosleep(&timeout, NULL);
574 #undef UV_DEC_BACKLOG
/* Transfer the connection held in server->accepted_fd to `client`.
 *
 * Both handles must belong to the same loop (asserted).  Depending on
 * client->type the descriptor is opened with uv__stream_open() (readable and
 * writable) or with uv_udp_open(); the visible error paths close the
 * descriptor.  Afterwards the next descriptor from server->queued_fds, if
 * any, becomes the new accepted_fd; the code also resets accepted_fd to -1
 * and restarts the POLLIN watcher of the server.
 * NOTE(review): the branch structure and return values are only partly
 * visible in this excerpt.
 */
577 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
580 assert(server->loop == client->loop);
582 if (server->accepted_fd == -1)
585 switch (client->type) {
588 err = uv__stream_open(client,
590 UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
592 /* TODO handle error */
593 uv__close(server->accepted_fd);
599 err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
601 uv__close(server->accepted_fd);
610 client->flags |= UV_HANDLE_BOUND;
613 /* Process queued fds */
614 if (server->queued_fds != NULL) {
615 uv__stream_queued_fds_t* queued_fds;
617 queued_fds = server->queued_fds;
620 server->accepted_fd = queued_fds->fds[0];
/* Drop the consumed entry: free the queue when it becomes empty, otherwise
 * shift the remaining descriptors down. */
623 assert(queued_fds->offset > 0);
624 if (--queued_fds->offset == 0) {
625 uv__free(queued_fds);
626 server->queued_fds = NULL;
629 memmove(queued_fds->fds,
631 queued_fds->offset * sizeof(*queued_fds->fds));
634 server->accepted_fd = -1;
636 uv__io_start(server->loop, &server->io_watcher, POLLIN);
/* Start listening on `stream`: dispatches on stream->type to uv_tcp_listen()
 * or uv_pipe_listen(), passing `backlog` and `cb` through, and calls
 * uv__handle_start() on the stream.
 * NOTE(review): the handling of other stream types and of a failed listen
 * call is not visible in this excerpt.
 */
642 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
645 switch (stream->type) {
647 err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
651 err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
659 uv__handle_start(stream);
/* Called once the write queue has been emptied (asserted).
 *
 * Stops watching for POLLOUT and, when a shutdown was requested
 * (UV_HANDLE_SHUTTING set while neither UV_HANDLE_CLOSING nor UV_HANDLE_SHUT
 * is), detaches and unregisters the shutdown request and performs
 * shutdown(fd, SHUT_WR).  A failure is recorded as UV__ERR(errno);
 * UV_HANDLE_SHUT is set by the code below (its exact condition is not
 * visible in this excerpt).
 */
665 static void uv__drain(uv_stream_t* stream) {
669 assert(QUEUE_EMPTY(&stream->write_queue));
670 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
671 uv__stream_osx_interrupt_select(stream);
674 if ((stream->flags & UV_HANDLE_SHUTTING) &&
675 !(stream->flags & UV_HANDLE_CLOSING) &&
676 !(stream->flags & UV_HANDLE_SHUT)) {
677 assert(stream->shutdown_req);
679 req = stream->shutdown_req;
680 stream->shutdown_req = NULL;
681 stream->flags &= ~UV_HANDLE_SHUTTING;
682 uv__req_unregister(stream->loop, req);
685 if (shutdown(uv__stream_fd(stream), SHUT_WR))
686 err = UV__ERR(errno);
689 stream->flags |= UV_HANDLE_SHUT;
/* Write `n` iovecs starting at `vec` to `fd`.  A plain write() of the first
 * buffer and a writev() of the whole vector are both present; the condition
 * choosing between them is not visible in this excerpt (presumably n == 1).
 */
697 static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
699 return write(fd, vec->iov_base, vec->iov_len);
701 return writev(fd, vec, n);
/* Compute the number of bytes `req` still has to write, i.e. the total length
 * of the buffers from req->write_index onwards.  Asserts that req->bufs is
 * non-NULL and that write_queue_size of the handle is at least that large.
 */
705 static size_t uv__write_req_size(uv_write_t* req) {
708 assert(req->bufs != NULL);
709 size = uv__count_bufs(req->bufs + req->write_index,
710 req->nbufs - req->write_index);
711 assert(req->handle->write_queue_size >= size);
717 /* Returns 1 if all write request data has been written, or 0 if there is still
718 * more data to write.
720 * Note: the return value only says something about the *current* request.
721 * There may still be other write requests sitting in the queue.
 *
 * `n` is the number of bytes that were just written: it is subtracted from
 * stream->write_queue_size and req->write_index is advanced past every
 * buffer that has been consumed completely.
723 static int uv__write_req_update(uv_stream_t* stream,
729 assert(n <= stream->write_queue_size);
730 stream->write_queue_size -= n;
732 buf = req->bufs + req->write_index;
735 len = n < buf->len ? n : buf->len;
738 buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
742 req->write_index = buf - req->bufs;
744 return req->write_index == req->nbufs;
/* Retire a write request: unlink it from the queue it is on, append it to
 * stream->write_completed_queue and feed the I/O watcher so that the
 * completion callback is run soon.
 */
748 static void uv__write_req_finish(uv_write_t* req) {
749 uv_stream_t* stream = req->handle;
751 /* Pop the req off tcp->write_queue. */
752 QUEUE_REMOVE(&req->queue);
754 /* Only free when there was no error. On error, we touch up write_queue_size
755 * right before making the callback. The reason we don't do that right away
756 * is that a write_queue_size > 0 is our only way to signal to the user that
757 * they should stop writing - which they should if we got an error. Something
758 * to revisit in future revisions of the libuv API.
760 if (req->error == 0) {
761 if (req->bufs != req->bufsml)
766 /* Add it to the write_completed_queue where it will have its
767 * callback called in the near future.
769 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
770 uv__io_feed(stream->loop, &stream->io_watcher);
774 static int uv__handle_fd(uv_handle_t* handle) {
775 switch (handle->type) {
778 return ((uv_stream_t*) handle)->io_watcher.fd;
781 return ((uv_udp_t*) handle)->io_watcher.fd;
788 static int uv__try_write(uv_stream_t* stream,
789 const uv_buf_t bufs[],
791 uv_stream_t* send_handle) {
798 * Cast to iovec. We had to have our own uv_buf_t instead of iovec
799 * because Windows's WSABUF is not an iovec.
801 iov = (struct iovec*) bufs;
804 iovmax = uv__getiovmax();
806 /* Limit iov count to avoid EINVALs from writev() */
811 * Now do the actual writev. Note that we've been updating the pointers
812 * inside the iov each time we write. So there is no need to offset it.
/* A handle is being passed along: its fd travels as SCM_RIGHTS ancillary
 * data of a sendmsg() call. */
814 if (send_handle != NULL) {
817 struct cmsghdr *cmsg;
820 struct cmsghdr alias;
823 if (uv__is_closing(send_handle))
826 fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
828 memset(&scratch, 0, sizeof(scratch));
830 assert(fd_to_send >= 0);
835 msg.msg_iovlen = iovcnt;
838 msg.msg_control = &scratch.alias;
839 msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
841 cmsg = CMSG_FIRSTHDR(&msg);
842 cmsg->cmsg_level = SOL_SOCKET;
843 cmsg->cmsg_type = SCM_RIGHTS;
844 cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
846 /* silence aliasing warning */
848 void* pv = CMSG_DATA(cmsg);
854 n = sendmsg(uv__stream_fd(stream), &msg, 0);
855 while (n == -1 && errno == EINTR);
/* No handle to pass: ordinary (vectored) write, retried on EINTR. */
858 n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
859 while (n == -1 && errno == EINTR);
/* EAGAIN, EWOULDBLOCK and ENOBUFS are tested separately from other errors. */
865 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
869 /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
870 * have a bug where a race condition causes the kernel to return EPROTOTYPE
871 * because the socket isn't fully constructed. It's probably the result of
872 * the peer closing the connection and that is why libuv translates it to
873 * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
874 * away but some VPN software causes the same behavior except the error is
875 * permanent, not transient, turning the retry mechanism into an infinite
876 * loop. See https://github.com/libuv/libuv/pull/482.
878 if (errno == EPROTOTYPE)
879 return UV_ECONNRESET;
880 #endif /* __APPLE__ */
882 return UV__ERR(errno);
/* Try to write the request at the head of stream->write_queue.
 *
 * Requires a valid stream fd (asserted); an empty queue is tested for first.
 * The result of uv__try_write() drives what follows: a request whose data is
 * fully written is retired with uv__write_req_finish(), a result other than
 * UV_EAGAIN is tested separately, and the visible code either starts the
 * POLLOUT watcher or retires the request and stops the watcher.
 * NOTE(review): the control flow between these steps is only partly visible
 * in this excerpt.
 */
885 static void uv__write(uv_stream_t* stream) {
890 assert(uv__stream_fd(stream) >= 0);
893 if (QUEUE_EMPTY(&stream->write_queue))
896 q = QUEUE_HEAD(&stream->write_queue);
897 req = QUEUE_DATA(q, uv_write_t, queue);
898 assert(req->handle == stream);
900 n = uv__try_write(stream,
901 &(req->bufs[req->write_index]),
902 req->nbufs - req->write_index,
905 /* Ensure the handle isn't sent again in case this is a partial write. */
907 req->send_handle = NULL;
908 if (uv__write_req_update(stream, req, n)) {
909 uv__write_req_finish(req);
910 return; /* TODO(bnoordhuis) Start trying to write the next request. */
912 } else if (n != UV_EAGAIN)
915 /* If this is a blocking stream, try again. */
916 if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
919 /* We're not done. */
920 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
922 /* Notify select() thread about state change */
923 uv__stream_osx_interrupt_select(stream);
929 /* XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events */
930 uv__write_req_finish(req);
931 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
932 uv__stream_osx_interrupt_select(stream);
/* Run the completion callbacks of all requests on write_completed_queue.
 *
 * An empty queue is tested for first.  The queue is moved to a local list
 * (pq) before it is walked.  Each request is unregistered; when it still owns
 * buffers (req->bufs != NULL) the bytes it accounts for are removed from
 * write_queue_size.  Finally req->cb(req, req->error) is invoked.
 */
936 static void uv__write_callbacks(uv_stream_t* stream) {
941 if (QUEUE_EMPTY(&stream->write_completed_queue))
944 QUEUE_MOVE(&stream->write_completed_queue, &pq);
946 while (!QUEUE_EMPTY(&pq)) {
947 /* Pop a req off write_completed_queue. */
949 req = QUEUE_DATA(q, uv_write_t, queue);
951 uv__req_unregister(stream->loop, req);
953 if (req->bufs != NULL) {
954 stream->write_queue_size -= uv__write_req_size(req);
955 if (req->bufs != req->bufsml)
960 /* NOTE: call callback AFTER freeing the request data. */
962 req->cb(req, req->error);
/* Determine the libuv handle type of socket `fd` from getsockname() and the
 * SO_TYPE socket option.  Returns UV_UNKNOWN_HANDLE when either query fails
 * and as the final fallback; SOCK_STREAM sockets are classified by address
 * family, and SOCK_DGRAM sockets are only considered for AF_INET/AF_INET6.
 */
967 uv_handle_type uv__handle_type(int fd) {
968 struct sockaddr_storage ss;
973 memset(&ss, 0, sizeof(ss));
976 if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
977 return UV_UNKNOWN_HANDLE;
981 if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
982 return UV_UNKNOWN_HANDLE;
984 if (type == SOCK_STREAM) {
985 #if defined(_AIX) || defined(__DragonFly__)
986 /* on AIX/DragonFly the getsockname call returns an empty sa structure
987 * for sockets of type AF_UNIX. For all other types it will
988 * return a properly filled in structure.
991 return UV_NAMED_PIPE;
993 switch (ss.ss_family) {
995 return UV_NAMED_PIPE;
1002 if (type == SOCK_DGRAM &&
1003 (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
1006 return UV_UNKNOWN_HANDLE;
1010 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
1011 stream->flags |= UV_HANDLE_READ_EOF;
1012 stream->flags &= ~UV_HANDLE_READING;
1013 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1014 uv__handle_stop(stream);
1015 uv__stream_osx_interrupt_select(stream);
1016 stream->read_cb(stream, UV_EOF, buf);
1020 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
1021 uv__stream_queued_fds_t* queued_fds;
1022 unsigned int queue_size;
1024 queued_fds = stream->queued_fds;
1025 if (queued_fds == NULL) {
1027 queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
1028 sizeof(*queued_fds));
1029 if (queued_fds == NULL)
1031 queued_fds->size = queue_size;
1032 queued_fds->offset = 0;
1033 stream->queued_fds = queued_fds;
1036 } else if (queued_fds->size == queued_fds->offset) {
1037 queue_size = queued_fds->size + 8;
1038 queued_fds = uv__realloc(queued_fds,
1039 (queue_size - 1) * sizeof(*queued_fds->fds) +
1040 sizeof(*queued_fds));
1043 * Allocation failure, report back.
1044 * NOTE: if it is fatal - sockets will be closed in uv__stream_close
1046 if (queued_fds == NULL)
1048 queued_fds->size = queue_size;
1049 stream->queued_fds = queued_fds;
1052 /* Put fd in a queue */
1053 queued_fds->fds[queued_fds->offset++] = fd;
/* Number of descriptors, and the matching size in bytes, used to dimension
 * the control-message buffer (cmsg_space) in uv__read(). */
1059 #if defined(__PASE__)
1060 /* on IBMi PASE the control message length can not exceed 256. */
1061 # define UV__CMSG_FD_COUNT 60
1063 # define UV__CMSG_FD_COUNT 64
1065 #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
/* Extract file descriptors from the control messages of `msg`.
 *
 * Control messages whose type is not SCM_RIGHTS are reported on stderr.  For
 * SCM_RIGHTS messages the number of descriptors is derived from cmsg_len.
 * A descriptor goes into stream->accepted_fd when that slot is free (-1);
 * otherwise it is queued with uv__stream_queue_fd().
 */
1068 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
1069 struct cmsghdr* cmsg;
1071 for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
1080 if (cmsg->cmsg_type != SCM_RIGHTS) {
1081 fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1086 /* silence aliasing warning */
1087 pv = CMSG_DATA(cmsg);
1090 /* Count available fds */
1091 start = (char*) cmsg;
1092 end = (char*) cmsg + cmsg->cmsg_len;
1094 while (start + CMSG_LEN(count * sizeof(*pi)) < end)
/* The payload must consist of a whole number of descriptors. */
1096 assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
1098 for (i = 0; i < count; i++) {
1099 /* Already has accepted fd, queue now */
1100 if (stream->accepted_fd != -1) {
1101 err = uv__stream_queue_fd(stream, pi[i]);
1104 for (; i < count; i++)
1109 stream->accepted_fd = pi[i];
1119 # pragma clang diagnostic push
1120 # pragma clang diagnostic ignored "-Wgnu-folding-constant"
1121 # pragma clang diagnostic ignored "-Wvla-extension"
/* Read from the stream and deliver the data through read_cb.
 *
 * Loops while a read_cb is installed and UV_HANDLE_READING is set.  A buffer
 * is requested from alloc_cb with a suggested size of 64 KiB; an empty buffer
 * is reported to read_cb as UV_ENOBUFS.  Data is read with read(), or with
 * uv__recvmsg() on the ipc path so that control messages can be received.
 * EAGAIN/EWOULDBLOCK restarts the POLLIN watcher, other errors are reported
 * to read_cb, a zero-length read is handled by uv__stream_eof() and a read
 * shorter than the buffer sets UV_HANDLE_READ_PARTIAL.
 */
1124 static void uv__read(uv_stream_t* stream) {
1128 char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
1133 stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1135 /* Prevent loop starvation when the data comes in as fast as (or faster than)
1136 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1140 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1142 /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1143 * tcp->read_cb is NULL or not?
1145 while (stream->read_cb
1146 && (stream->flags & UV_HANDLE_READING)
1148 assert(stream->alloc_cb != NULL);
1150 buf = uv_buf_init(NULL, 0);
1151 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1152 if (buf.base == NULL || buf.len == 0) {
1153 /* User indicates it can't or won't handle the read. */
1154 stream->read_cb(stream, UV_ENOBUFS, &buf);
1158 assert(buf.base != NULL);
1159 assert(uv__stream_fd(stream) >= 0);
1163 nread = read(uv__stream_fd(stream), buf.base, buf.len);
1165 while (nread < 0 && errno == EINTR);
1167 /* ipc uses recvmsg */
1169 msg.msg_iov = (struct iovec*) &buf;
1171 msg.msg_name = NULL;
1172 msg.msg_namelen = 0;
1173 /* Set up to receive a descriptor even if one isn't in the message */
1174 msg.msg_controllen = sizeof(cmsg_space);
1175 msg.msg_control = cmsg_space;
1178 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1180 while (nread < 0 && errno == EINTR);
1185 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1186 /* Wait for the next one. */
1187 if (stream->flags & UV_HANDLE_READING) {
1188 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1189 uv__stream_osx_interrupt_select(stream);
1191 stream->read_cb(stream, 0, &buf);
1192 #if defined(__CYGWIN__) || defined(__MSYS__)
1193 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1194 uv__stream_eof(stream, &buf);
1198 /* Error. User should call uv_close(). */
1199 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1200 stream->read_cb(stream, UV__ERR(errno), &buf);
1201 if (stream->flags & UV_HANDLE_READING) {
1202 stream->flags &= ~UV_HANDLE_READING;
1203 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1204 uv__handle_stop(stream);
1205 uv__stream_osx_interrupt_select(stream);
1209 } else if (nread == 0) {
1210 uv__stream_eof(stream, &buf);
1213 /* Successful read */
1214 ssize_t buflen = buf.len;
1217 err = uv__stream_recv_cmsg(stream, &msg);
1219 stream->read_cb(stream, err, &buf);
1224 #if defined(__MVS__)
1225 if (is_ipc && msg.msg_controllen > 0) {
1233 msg.msg_iov = (struct iovec*) &blankbuf;
1236 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1237 err = uv__stream_recv_cmsg(stream, &msg);
1239 stream->read_cb(stream, err, &buf);
1243 } while (nread == 0 && msg.msg_controllen > 0);
1247 stream->read_cb(stream, nread, &buf);
1249 /* Return if we didn't fill the buffer, there is no more data to read. */
1250 if (nread < buflen) {
1251 stream->flags |= UV_HANDLE_READ_PARTIAL;
1260 # pragma clang diagnostic pop
1263 #undef UV__CMSG_FD_COUNT
1264 #undef UV__CMSG_FD_SIZE
/* Request a shutdown of the write side of `stream`.
 *
 * Only TCP, TTY and named-pipe streams are accepted (asserted).  The request
 * is refused when the stream is not writable, is already shut or shutting
 * down, or is closing (the returned error code is not visible in this
 * excerpt).  Otherwise `req` is initialized and stored as
 * stream->shutdown_req, UV_HANDLE_SHUTTING is set, UV_HANDLE_WRITABLE is
 * cleared and the POLLOUT watcher is started.
 * NOTE(review): `cb` is presumably stored on the request; not visible here.
 */
1267 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1268 assert(stream->type == UV_TCP ||
1269 stream->type == UV_TTY ||
1270 stream->type == UV_NAMED_PIPE);
1272 if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1273 stream->flags & UV_HANDLE_SHUT ||
1274 stream->flags & UV_HANDLE_SHUTTING ||
1275 uv__is_closing(stream)) {
1279 assert(uv__stream_fd(stream) >= 0);
1281 /* Initialize request */
1282 uv__req_init(stream->loop, req, UV_SHUTDOWN);
1283 req->handle = stream;
1285 stream->shutdown_req = req;
1286 stream->flags |= UV_HANDLE_SHUTTING;
1287 stream->flags &= ~UV_HANDLE_WRITABLE;
1289 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1290 uv__stream_osx_interrupt_select(stream);
/* I/O watcher callback shared by TCP, named-pipe and TTY streams.
 *
 * While a connect request is pending the event is routed to
 * uv__stream_connect().  Otherwise POLLIN/POLLERR/POLLHUP events drive the
 * read side and POLLOUT/POLLERR/POLLHUP events the write side, with checks in
 * between for the stream fd having become -1 inside a callback.
 */
1296 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1297 uv_stream_t* stream;
1299 stream = container_of(w, uv_stream_t, io_watcher);
1301 assert(stream->type == UV_TCP ||
1302 stream->type == UV_NAMED_PIPE ||
1303 stream->type == UV_TTY);
1304 assert(!(stream->flags & UV_HANDLE_CLOSING));
1306 if (stream->connect_req) {
1307 uv__stream_connect(stream);
1311 assert(uv__stream_fd(stream) >= 0);
1313 /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1314 if (events & (POLLIN | POLLERR | POLLHUP))
1317 if (uv__stream_fd(stream) == -1)
1318 return; /* read_cb closed stream. */
1320 /* Short-circuit iff POLLHUP is set, the user is still interested in read
1321 * events and uv__read() reported a partial read but not EOF. If the EOF
1322 * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1323 * have to do anything. If the partial read flag is not set, we can't
1324 * report the EOF yet because there is still data to read.
1326 if ((events & POLLHUP) &&
1327 (stream->flags & UV_HANDLE_READING) &&
1328 (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1329 !(stream->flags & UV_HANDLE_READ_EOF)) {
1330 uv_buf_t buf = { NULL, 0 };
1331 uv__stream_eof(stream, &buf);
1334 if (uv__stream_fd(stream) == -1)
1335 return; /* read_cb closed stream. */
1337 if (events & (POLLOUT | POLLERR | POLLHUP)) {
1339 uv__write_callbacks(stream);
1341 /* Write queue drained. */
1342 if (QUEUE_EMPTY(&stream->write_queue))
/* Complete a pending connect request.
 *
 * The result is either stream->delayed_error or is queried with getsockopt().
 * EINPROGRESS is tested separately (presumably: keep waiting).  Otherwise the
 * request is detached and unregistered, POLLOUT watching is stopped when the
 * connect failed or nothing is queued for writing, and the request callback
 * is invoked with the result.  The code at the end flushes queued writes with
 * UV_ECANCELED and runs their callbacks (its condition is not visible in this
 * excerpt).
 */
1349 * We get called here from directly following a call to connect(2).
1350 * In order to determine if we've errored out or succeeded must call
1353 static void uv__stream_connect(uv_stream_t* stream) {
1355 uv_connect_t* req = stream->connect_req;
1356 socklen_t errorsize = sizeof(int);
1358 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1361 if (stream->delayed_error) {
1362 /* To smooth over the differences between unixes errors that
1363 * were reported synchronously on the first connect can be delayed
1364 * until the next tick--which is now.
1366 error = stream->delayed_error;
1367 stream->delayed_error = 0;
1369 /* Normal situation: we need to get the socket error from the kernel. */
1370 assert(uv__stream_fd(stream) >= 0);
1371 getsockopt(uv__stream_fd(stream),
1376 error = UV__ERR(error);
1379 if (error == UV__ERR(EINPROGRESS))
1382 stream->connect_req = NULL;
1383 uv__req_unregister(stream->loop, req);
1385 if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1386 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1390 req->cb(req, error);
1392 if (uv__stream_fd(stream) == -1)
1396 uv__stream_flush_write_queue(stream, UV_ECANCELED);
1397 uv__write_callbacks(stream);
/* Validate the arguments of a write before it is attempted.
 *
 * Checks that the stream has a descriptor and is writable.  When a handle is
 * to be sent, the stream has to be a named pipe with ipc set and the handle
 * has to have a valid fd.
 * NOTE(review): the error codes returned for the individual checks are not
 * visible in this excerpt.
 */
1402 static int uv__check_before_write(uv_stream_t* stream,
1404 uv_stream_t* send_handle) {
1406 assert((stream->type == UV_TCP ||
1407 stream->type == UV_NAMED_PIPE ||
1408 stream->type == UV_TTY) &&
1409 "uv_write (unix) does not yet support other types of streams");
1411 if (uv__stream_fd(stream) < 0)
1414 if (!(stream->flags & UV_HANDLE_WRITABLE))
1417 if (send_handle != NULL) {
1418 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1421 /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1422 * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1423 * evaluates to a function that operates on a uv_stream_t with a couple of
1424 * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1425 * which works but only by accident.
1427 if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1430 #if defined(__CYGWIN__) || defined(__MSYS__)
1431 /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1432 See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
/* Queue a write of `bufs` on `stream`, optionally passing `send_handle`.
 *
 * After uv__check_before_write() the request is initialized, the buffer
 * descriptors are copied (into a heap allocation when they do not fit in
 * req->bufsml), write_queue_size grows by the payload size and the request
 * is appended to write_queue.  Nothing more happens while a connect is
 * pending; otherwise the write is attempted right away when the queue was
 * empty on entry, or the POLLOUT watcher is started.
 */
1440 int uv_write2(uv_write_t* req,
1441 uv_stream_t* stream,
1442 const uv_buf_t bufs[],
1444 uv_stream_t* send_handle,
1449 err = uv__check_before_write(stream, nbufs, send_handle);
1453 /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1454 * it means there are error-state requests in the write_completed_queue that
1455 * will touch up write_queue_size later, see also uv__write_req_finish().
1456 * We could check that write_queue is empty instead but that implies making
1457 * a write() syscall when we know that the handle is in error mode.
1459 empty_queue = (stream->write_queue_size == 0);
1461 /* Initialize the req */
1462 uv__req_init(stream->loop, req, UV_WRITE);
1464 req->handle = stream;
1466 req->send_handle = send_handle;
1467 QUEUE_INIT(&req->queue);
/* Use the inline array unless more descriptors are needed than it can hold. */
1469 req->bufs = req->bufsml;
1470 if (nbufs > ARRAY_SIZE(req->bufsml))
1471 req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1473 if (req->bufs == NULL)
1476 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1478 req->write_index = 0;
1479 stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1481 /* Append the request to write_queue. */
1482 QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1484 /* If the queue was empty when this function began, we should attempt to
1485 * do the write immediately. Otherwise start the write_watcher and wait
1486 * for the fd to become writable.
1488 if (stream->connect_req) {
1489 /* Still connecting, do nothing. */
1491 else if (empty_queue) {
1496 * blocking streams should never have anything in the queue.
1497 * if this assert fires then somehow the blocking stream isn't being
1498 * sufficiently flushed in uv__write.
1500 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1501 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1502 uv__stream_osx_interrupt_select(stream);
1509 /* The buffers to be written must remain valid until the callback is called.
1510 * This is not required for the uv_buf_t array.
1512 int uv_write(uv_write_t* req,
1513 uv_stream_t* handle,
1514 const uv_buf_t bufs[],
1517 return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1521 int uv_try_write(uv_stream_t* stream,
1522 const uv_buf_t bufs[],
1523 unsigned int nbufs) {
1524 return uv_try_write2(stream, bufs, nbufs, NULL);
1528 int uv_try_write2(uv_stream_t* stream,
1529 const uv_buf_t bufs[],
1531 uv_stream_t* send_handle) {
1534 /* Connecting or already writing some data */
1535 if (stream->connect_req != NULL || stream->write_queue_size != 0)
1538 err = uv__check_before_write(stream, nbufs, NULL);
1542 return uv__try_write(stream, bufs, nbufs, send_handle);
1546 int uv__read_start(uv_stream_t* stream,
1547 uv_alloc_cb alloc_cb,
1548 uv_read_cb read_cb) {
1549 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1550 stream->type == UV_TTY);
1552 /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
1553 * just expresses the desired state of the user. */
1554 stream->flags |= UV_HANDLE_READING;
1555 stream->flags &= ~UV_HANDLE_READ_EOF;
1557 /* TODO: try to do the read inline? */
1558 assert(uv__stream_fd(stream) >= 0);
1561 stream->read_cb = read_cb;
1562 stream->alloc_cb = alloc_cb;
1564 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1565 uv__handle_start(stream);
1566 uv__stream_osx_interrupt_select(stream);
1572 int uv_read_stop(uv_stream_t* stream) {
1573 if (!(stream->flags & UV_HANDLE_READING))
1576 stream->flags &= ~UV_HANDLE_READING;
1577 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1578 uv__handle_stop(stream);
1579 uv__stream_osx_interrupt_select(stream);
1581 stream->read_cb = NULL;
1582 stream->alloc_cb = NULL;
1587 int uv_is_readable(const uv_stream_t* stream) {
1588 return !!(stream->flags & UV_HANDLE_READABLE);
1592 int uv_is_writable(const uv_stream_t* stream) {
1593 return !!(stream->flags & UV_HANDLE_WRITABLE);
#if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
/* Return the file descriptor backing `handle`: the fd stored in the select
 * state when the handle has one attached, the io watcher's fd otherwise.
 */
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* select_state;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  select_state = handle->select;

  return select_state != NULL ? select_state->fd : handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */
1614 void uv__stream_close(uv_stream_t* handle) {
1616 uv__stream_queued_fds_t* queued_fds;
1618 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
1619 /* Terminate select loop first */
1620 if (handle->select != NULL) {
1621 uv__stream_select_t* s;
1625 uv_sem_post(&s->close_sem);
1626 uv_sem_post(&s->async_sem);
1627 uv__stream_osx_interrupt_select(handle);
1628 uv_thread_join(&s->thread);
1629 uv_sem_destroy(&s->close_sem);
1630 uv_sem_destroy(&s->async_sem);
1631 uv__close(s->fake_fd);
1632 uv__close(s->int_fd);
1633 uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1635 handle->select = NULL;
1637 #endif /* defined(__APPLE__) */
1639 uv__io_close(handle->loop, &handle->io_watcher);
1640 uv_read_stop(handle);
1641 uv__handle_stop(handle);
1642 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1644 if (handle->io_watcher.fd != -1) {
1645 /* Don't close stdio file descriptors. Nothing good comes from it. */
1646 if (handle->io_watcher.fd > STDERR_FILENO)
1647 uv__close(handle->io_watcher.fd);
1648 handle->io_watcher.fd = -1;
1651 if (handle->accepted_fd != -1) {
1652 uv__close(handle->accepted_fd);
1653 handle->accepted_fd = -1;
1656 /* Close all queued fds */
1657 if (handle->queued_fds != NULL) {
1658 queued_fds = handle->queued_fds;
1659 for (i = 0; i < queued_fds->offset; i++)
1660 uv__close(queued_fds->fds[i]);
1661 uv__free(handle->queued_fds);
1662 handle->queued_fds = NULL;
1665 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1669 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1670 /* Don't need to check the file descriptor, uv__nonblock()
1671 * will fail with EBADF if it's not valid.
1673 return uv__nonblock(uv__stream_fd(handle), !blocking);