1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
31 #include <sys/types.h>
32 #include <sys/socket.h>
36 #include <limits.h> /* IOV_MAX */
#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

/* Per-stream state for the select(2) fallback thread used on OS X for fds
 * that kqueue cannot watch.  NOTE(review): the field list is elided in this
 * chunk; fields referenced later include fake_fd, int_fd, async, thread,
 * close_sem, async_sem and events. */
struct uv__stream_select_s {
#endif /* defined(__APPLE__) */
/* Forward declarations for the static helpers defined later in this file. */
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static size_t uv__write_req_size(uv_write_t* req);
/* Used by the accept() EMFILE party trick. */
/* Open `path` with the close-on-exec flag set.  On Linux the flag is set
 * atomically at open(2) time; elsewhere it is applied afterwards with
 * uv__cloexec().  NOTE(review): error-path lines are elided in this chunk. */
static int uv__open_cloexec(const char* path, int flags) {
#if defined(__linux__)
  /* Atomic CLOEXEC avoids a race with fork()/exec() in other threads. */
  fd = open(path, flags | UV__O_CLOEXEC);
  /* O_CLOEXEC not supported. */
  fd = open(path, flags);
  /* Fall back to a separate fcntl()-style call to set the flag. */
  err = uv__cloexec(fd, 1);
/* Return the total number of bytes held in bufs[0..nbufs-1]. */
static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
  for (i = 0; i < nbufs; i++)
    bytes += bufs[i].len;
/* Common initialization for every stream handle (TCP, pipe, TTY): clear all
 * callbacks and request pointers, init both write queues, lazily reserve the
 * loop's spare fd for the EMFILE trick, and set up the I/O watcher with no
 * fd yet (-1). */
void uv__stream_init(uv_loop_t* loop,
                     uv_handle_type type) {
  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->read2_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;  /* No pending accepted connection. */
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  /* Stash a spare descriptor for uv__emfile_trick(), once per loop. */
  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/", O_RDONLY);
      loop->emfile_fd = err;

#if defined(__APPLE__)
  stream->select = NULL;  /* select(2) fallback thread not started. */
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
#if defined(__APPLE__)
/* Body of the helper thread that watches a stream's fd with select(2) when
 * kqueue refuses to monitor it.  Loops until close_sem is posted; on each
 * wakeup it publishes the observed events and signals the loop thread via
 * the internal async handle, then waits for acknowledgement. */
static void uv__stream_osx_select(void* arg) {
  uv__stream_select_t* s;

    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)

    /* Watch fd using select(2) */
    if (uv_is_readable(stream))
    if (uv_is_writable(stream))
    /* int_fd is the thread's end of the interruption socketpair. */
    FD_SET(s->int_fd, &sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, &sread, &swrite, NULL, NULL);
      /* XXX: Possible?! */
      /* Ignore timeouts */

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, &sread))
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
        if (errno == EAGAIN || errno == EWOULDBLOCK)

    /* Translate select() results into libuv poll events. */
    if (FD_ISSET(fd, &sread))
      events |= UV__POLLIN;
    if (FD_ISSET(fd, &swrite))
      events |= UV__POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, &sread));

    /* Publish events for the loop thread, wake it, and wait until it has
     * consumed them before polling again. */
    ACCESS_ONCE(int, s->events) = events;

    uv_async_send(&s->async);
    uv_sem_wait(&s->async_sem);

    /* Should be processed at this stage */
    assert((s->events == 0) || (stream->flags & UV_CLOSING));
/* Wake the select(2) helper thread by writing a single byte into its
 * socketpair; the write is retried on EINTR. */
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
  /* Notify select() thread about state change */
  uv__stream_select_t* s;

  /* Interrupt select() loop
   * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
   * emit read event on other side
   */
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);
/* Async callback executed on the loop thread: consume the events published
 * by the select(2) thread and dispatch them through uv__stream_io(). */
static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
  uv__stream_select_t* s;

  s = container_of(handle, uv__stream_select_t, async);

  /* Get and reset stream's events */
  ACCESS_ONCE(int, s->events) = 0;
  uv_sem_post(&s->async_sem);  /* Unblock the select() thread. */

  /* Only POLLIN/POLLOUT may be reported by the select() thread. */
  assert(events == (events & (UV__POLLIN | UV__POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);

  if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
/* Close callback for the internal async handle: recover the containing
 * uv__stream_select_t so its storage can be released. */
static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
/* Probe whether *fd is usable with kqueue; if kevent() flags it with EINVAL,
 * set up the select(2) fallback instead: allocate per-stream state, an
 * internal async handle, two semaphores, a socketpair and a watcher thread. */
int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from /dev mount on osx.
   * select(2) in separate thread for those fds
   */
  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;

    perror("(libuv) kqueue()");

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  ret = kevent(kq, filter, 1, events, 1, &timeout);

  /* fd is kqueue-compatible unless kevent reported an EINVAL error event. */
  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)

  /* At this point we definitely know that this fd won't work with kqueue */
  s = malloc(sizeof(*s));

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);

  /* Keep the helper async handle out of the user's visible handle count. */
  s->async.flags |= UV__HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  if (uv_sem_init(&s->close_sem, 0))

  if (uv_sem_init(&s->async_sem, 0))

  /* Create fds for io watcher and to interrupt the select() loop. */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))

  if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))

  /* Error path: release resources in reverse order of acquisition. */
  uv__close(s->fake_fd);
  uv__close(s->int_fd);

  uv_sem_destroy(&s->async_sem);

  uv_sem_destroy(&s->close_sem);

  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
#endif /* defined(__APPLE__) */
/* Bind fd to the stream's I/O watcher and apply the socket options implied
 * by the stream's flags (TCP_NODELAY / keepalive for TCP handles). */
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))

    /* TODO Use delay the user passed in. */
    /* 60 seconds is the hard-coded keepalive delay for now. */
    if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))

  stream->io_watcher.fd = fd;
/* Final teardown of a closed stream: cancel the pending connect request,
 * drain both write queues (invoking each callback with -ECANCELED or the
 * request's stored error), and cancel the pending shutdown request. */
void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
  assert(stream->flags & UV_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, -ECANCELED);
    stream->connect_req = NULL;

  /* Requests that never hit the wire are cancelled outright. */
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);

    req = QUEUE_DATA(q, uv_write_t, queue);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != req->bufsml)  /* Buffer array was heap-allocated. */

    req->cb(req, -ECANCELED);

  /* Completed requests still owe their callback and their share of the
   * queued-bytes accounting. */
  while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
    q = QUEUE_HEAD(&stream->write_completed_queue);

    req = QUEUE_DATA(q, uv_write_t, queue);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)

    req->cb(req, req->error);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
    stream->shutdown_req = NULL;
/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  if (loop->emfile_fd == -1)  /* No spare fd stashed; nothing we can do. */

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  /* Accept-and-drop every pending connection. */
    err = uv__accept(accept_fd);
  } while (err >= 0 || err == -EINTR);

  /* Re-arm the spare fd; SAVE_ERRNO preserves errno for the caller. */
  SAVE_ERRNO(loop->emfile_fd = uv__open_cloexec("/", O_RDONLY));
/* On kqueue platforms kevent() reports the number of pending connections;
 * decrement that count as we accept them.  Elsewhere it is a no-op. */
#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */
/* I/O callback for listening streams: accept pending connections one at a
 * time and hand each to connection_cb through stream->accepted_fd.  Stops
 * polling while a connection is waiting for the user to call uv_accept(). */
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events == UV__POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));

    if (err == -EAGAIN || err == -EWOULDBLOCK)
      return; /* Not an error. */

    if (err == -ECONNABORTED)
      continue; /* Ignore. Nothing we can do about that. */

    if (err == -EMFILE || err == -ENFILE) {
      /* Out of descriptors: shed load with the spare-fd trick. */
      err = uv__emfile_trick(loop, uv__stream_fd(stream));
      if (err == -EAGAIN || err == -EWOULDBLOCK)

      stream->connection_cb(stream, err);

    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(); stop polling until then. */
      uv__io_stop(loop, &stream->io_watcher, UV__POLLIN);

    if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
550 #undef UV_DEC_BACKLOG
/* Transfer the server's pending accepted_fd into `client`, opening it either
 * as a stream or (for handles passed over IPC) as a UDP handle. */
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  /* TODO document this */
  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)  /* Nothing pending to accept. */

  switch (client->type) {
      err = uv__stream_open(client,
                            UV_STREAM_READABLE | UV_STREAM_WRITABLE);
        /* TODO handle error */
        uv__close(server->accepted_fd);
        server->accepted_fd = -1;

      /* UDP handles can be received over an IPC pipe; see uv_write2(). */
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
        uv__close(server->accepted_fd);
        server->accepted_fd = -1;

  /* Resume polling so the next pending connection can be picked up. */
  uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
  server->accepted_fd = -1;
/* Start listening for incoming connections; dispatches on the concrete
 * stream type (TCP or pipe) and activates the handle on success. */
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  switch (stream->type) {
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);

    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);

  uv__handle_start(stream);
/* Called once the write queue is empty: stop POLLOUT polling and, when a
 * shutdown was requested and the stream is neither closing nor already shut,
 * perform shutdown(2) on the write side and complete the request. */
static void uv__drain(uv_stream_t* stream) {
  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);

  if ((stream->flags & UV_STREAM_SHUTTING) &&
      !(stream->flags & UV_CLOSING) &&
      !(stream->flags & UV_STREAM_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_STREAM_SHUTTING;
    uv__req_unregister(stream->loop, req);

    /* Half-close: no more writes, reads still possible. */
    if (shutdown(uv__stream_fd(stream), SHUT_WR))

      stream->flags |= UV_STREAM_SHUT;
/* Number of bytes of `req` still unwritten: the buffers from write_index
 * onward.  Can never exceed the handle's total queued byte count. */
static size_t uv__write_req_size(uv_write_t* req) {
  assert(req->bufs != NULL);
  size = uv_count_bufs(req->bufs + req->write_index,
                       req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);
/* Move a finished write request from the write queue to the
 * write_completed_queue and schedule its callback via uv__io_feed(). */
static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
/* Return the underlying file descriptor of a stream or UDP handle. */
static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
      return ((uv_stream_t*) handle)->io_watcher.fd;

      return ((uv_udp_t*) handle)->io_watcher.fd;
/* Maximum iovec count accepted by writev()/sendmsg().  Cached after the
 * first call on platforms that expose it via sysconf(_SC_IOV_MAX). */
static int uv__getiovmax() {
#elif defined(_SC_IOV_MAX)
  static int iovmax = -1;
    iovmax = sysconf(_SC_IOV_MAX);
/* Flush the head of stream->write_queue to the fd.  Uses sendmsg() with an
 * SCM_RIGHTS control message when a handle is being passed over an IPC pipe,
 * plain write()/writev() otherwise.  Syscalls are retried on EINTR; EAGAIN
 * re-arms the POLLOUT watcher; hard errors finish the request immediately. */
static void uv__write(uv_stream_t* stream) {
  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (req->send_handle) {
    /* Pass the handle's fd as SCM_RIGHTS ancillary data. */
    struct cmsghdr *cmsg;
    int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    assert(fd_to_send >= 0);

    msg.msg_iovlen = iovcnt;

    msg.msg_control = (void*) scratch;
    msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = msg.msg_controllen;

    /* silence aliasing warning */
      void* pv = CMSG_DATA(cmsg);

      n = sendmsg(uv__stream_fd(stream), &msg, 0);

    while (n == -1 && errno == EINTR);  /* Retry interrupted syscalls. */

      /* Single buffer: write(2) is cheaper than writev(2). */
      n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);

      n = writev(uv__stream_fd(stream), iov, iovcnt);

    while (n == -1 && errno == EINTR);

    if (errno != EAGAIN && errno != EWOULDBLOCK) {
      /* Hard error: finish the request and stop write polling. */
      uv__write_req_finish(req);
      uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
      if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
        uv__handle_stop(stream);

  } else if (stream->flags & UV_STREAM_BLOCKING) {
    /* If this is a blocking stream, try again. */

  /* Successful write */
      uv_buf_t* buf = &(req->bufs[req->write_index]);
      size_t len = buf->len;

      assert(req->write_index < req->nbufs);

      if ((size_t)n < len) {
        /* Partial write of this buffer: account for the consumed bytes. */
        stream->write_queue_size -= n;

        /* There is more to write. */
        if (stream->flags & UV_STREAM_BLOCKING) {
          /*
           * If we're blocking then we should not be enabling the write
           * watcher - instead we need to try again.
           */

        /* Break loop and ensure the watcher is pending. */

        /* Finished writing the buf at index req->write_index. */

        assert((size_t)n >= len);

        assert(stream->write_queue_size >= len);
        stream->write_queue_size -= len;

        if (req->write_index == req->nbufs) {
          /* Then we're done! */

          uv__write_req_finish(req);
          /* TODO: start trying to write the next request. */

  /* Either we've counted n down to zero or we've got EAGAIN. */
  assert(n == 0 || n == -1);

  /* Only non-blocking streams should use the write_watcher. */
  assert(!(stream->flags & UV_STREAM_BLOCKING));

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
/* Deliver the user callbacks for every request on write_completed_queue,
 * settling the deferred write_queue_size accounting for errored requests. */
static void uv__write_callbacks(uv_stream_t* stream) {
  while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&stream->write_completed_queue);
    req = QUEUE_DATA(q, uv_write_t, queue);

    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      /* Errored request: its bytes were left in write_queue_size on
       * purpose; see uv__write_req_finish(). */
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)

    /* NOTE: call callback AFTER freeing the request data. */
      req->cb(req, req->error);

  assert(QUEUE_EMPTY(&stream->write_completed_queue));

  /* Write queue drained. */
  if (QUEUE_EMPTY(&stream->write_queue))
/* Infer the libuv handle type of a socket fd from its address family
 * (getsockname) and socket type (SO_TYPE); UV_UNKNOWN_HANDLE otherwise. */
static uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;

  memset(&ss, 0, sizeof(ss));

  if (getsockname(fd, (struct sockaddr*)&ss, &len))
    return UV_UNKNOWN_HANDLE;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
    switch (ss.ss_family) {
        return UV_NAMED_PIPE;  /* AF_UNIX stream socket. */

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))

  return UV_UNKNOWN_HANDLE;
/* Dispatch a read result to whichever callback the user registered:
 * read_cb for plain streams, read2_cb for IPC pipes (also carries the type
 * of a received handle, if any). */
static void uv__stream_read_cb(uv_stream_t* stream,
                               uv_handle_type type) {
  if (stream->read_cb != NULL)
    stream->read_cb(stream, status, buf);
    stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
/* Mark the stream as having hit end-of-file, stop read polling (and the
 * handle itself if no write polling remains), then report UV_EOF. */
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_STREAM_READ_EOF;
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
  if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
    uv__handle_stop(stream);
  uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
/* Pull data from the fd and deliver it via read_cb/read2_cb.  Uses read(2)
 * for plain streams and recvmsg(2) for IPC pipes so that SCM_RIGHTS fds can
 * be received.  Loops until EAGAIN, EOF, error, or a partial read. */
static void uv__read(uv_stream_t* stream) {
  struct cmsghdr* cmsg;

  stream->flags &= ~UV_STREAM_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */

  /* XXX: Maybe instead of having UV_STREAM_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while ((stream->read_cb || stream->read2_cb)
      && (stream->flags & UV_STREAM_READING)
    assert(stream->alloc_cb != NULL);

    /* Ask the user for a buffer; 64 KiB is the suggested size. */
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);

      /* User indicates it can't or won't handle the read. */
      uv__stream_read_cb(stream, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (stream->read_cb) {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);

      while (nread < 0 && errno == EINTR);  /* Retry interrupted reads. */

      assert(stream->read2_cb);
      /* read2_cb uses recvmsg */

      msg.msg_iov = (struct iovec*) &buf;

      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = 64;
      msg.msg_control = (void*) cmsg_space;

        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);

      while (nread < 0 && errno == EINTR);

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_STREAM_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);

        /* nread == 0 here means "no data right now", not EOF. */
        uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE);

        /* Error. User should call uv_close(). */
        uv__stream_read_cb(stream, -errno, &buf, UV_UNKNOWN_HANDLE);

        assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
               "stream->read_cb(status=-1) did not call uv_close()");

    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);

      /* Successful read */
      ssize_t buflen = buf.len;

      if (stream->read_cb) {
        stream->read_cb(stream, nread, &buf);

        assert(stream->read2_cb);

        /*
         * XXX: Some implementations can send multiple file descriptors in a
         * single message. We should be using CMSG_NXTHDR() to walk the
         * chain to get at them all. This would require changing the API to
         * hand these back up the caller, is a pain.
         */
        for (cmsg = CMSG_FIRSTHDR(&msg);
             msg.msg_controllen > 0 && cmsg != NULL;
             cmsg = CMSG_NXTHDR(&msg, cmsg)) {

          if (cmsg->cmsg_type == SCM_RIGHTS) {
            if (stream->accepted_fd != -1) {
              /* Only one received fd is kept per read; extras are dropped. */
              fprintf(stderr, "(libuv) ignoring extra FD received\n");

            /* silence aliasing warning */
              void* pv = CMSG_DATA(cmsg);

              stream->accepted_fd = *pi;

            fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",

        if (stream->accepted_fd >= 0) {
          stream->read2_cb((uv_pipe_t*) stream,
                           uv__handle_type(stream->accepted_fd));

          stream->read2_cb((uv_pipe_t*) stream, nread, &buf, UV_UNKNOWN_HANDLE);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_STREAM_READ_PARTIAL;
/* Request a write-side shutdown once all queued writes have drained; the
 * actual shutdown(2) happens in uv__drain(). */
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
         "uv_shutdown (unix) only supports uv_handle_t right now");

  /* Reject if the stream is not writable or is already shut/closing. */
  if (!(stream->flags & UV_STREAM_WRITABLE) ||
      stream->flags & UV_STREAM_SHUT ||
      stream->flags & UV_CLOSED ||
      stream->flags & UV_CLOSING) {

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;

  stream->shutdown_req = req;
  stream->flags |= UV_STREAM_SHUTTING;

  /* POLLOUT wakes us up so uv__drain() can run when the queue empties. */
  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
/* Main I/O callback for connected streams: finishes pending connects, runs
 * reads on POLLIN/POLLERR, synthesizes EOF on POLLHUP after a partial read,
 * and runs writes plus completed-write callbacks on POLLOUT/POLLERR/POLLHUP. */
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
  if (events & (UV__POLLIN | UV__POLLERR))

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & UV__POLLHUP) &&
      (stream->flags & UV_STREAM_READING) &&
      (stream->flags & UV_STREAM_READ_PARTIAL) &&
      !(stream->flags & UV_STREAM_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {

    uv__write_callbacks(stream);
/*
 * We get called here from directly following a call to connect(2).
 * In order to determine if we've errored out or succeeded must call
 * getsockopt(SO_ERROR) — or report the delayed error recorded earlier.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes errors that
     * were reported synchronously on the first connect can be delayed
     * until the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;

    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),

  if (error == -EINPROGRESS)  /* Still connecting; keep waiting. */

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);

    req->cb(req, error);
/* Queue a write request, optionally passing `send_handle` over an IPC pipe,
 * and start writing immediately when nothing was previously queued. */
int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              uv_stream_t* send_handle,
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)

    /* Handle passing is only valid on IPC-enabled pipes. */
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);

  req->handle = stream;

  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  /* Small writes use the inline bufsml array; larger ones go to the heap. */
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));

  req->write_index = 0;
  stream->write_queue_size += uv_count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */

  else if (empty_queue) {

    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_STREAM_BLOCKING));
    uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
/* Plain write: delegates to uv_write2() with no handle to send. */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
/* Placeholder write callback for uv_try_write(); the request is removed
 * from the queue before the loop could ever invoke it. */
void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
/* Attempt a synchronous, non-queuing write: piggy-back on uv_write() with a
 * stack-allocated request, then immediately unqueue it and report how many
 * bytes actually went out. */
int uv_try_write(uv_stream_t* stream, const char* buf, size_t size) {
  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)

  /* Remember the watcher state so we can restore it afterwards. */
  has_pollout = uv__io_active(&stream->io_watcher, UV__POLLOUT);

  bufstruct = uv_buf_init((char*) buf, size);
  r = uv_write(&req, stream, &bufstruct, 1, uv_try_write_cb);

  /* Remove not written bytes from write queue size */
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);

  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue request, regardless of immediateness */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)

  /* Do not poll for writable, if we weren't before calling this */
    uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);

  return (int) written;
/* Shared implementation of uv_read_start()/uv_read2_start(): record the
 * user's callbacks, set the READING flag, and start POLLIN polling.
 * Exactly one of read_cb/read2_cb is non-NULL, per the two entry points. */
static int uv__read_start_common(uv_stream_t* stream,
                                 uv_alloc_cb alloc_cb,
                                 uv_read2_cb read2_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_CLOSING)

  /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
   * expresses the desired state of the user.
   */
  stream->flags |= UV_STREAM_READING;

#if defined(__APPLE__)
  /* Notify select() thread about state change */
  if (stream->select != NULL)
    uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);

  stream->read_cb = read_cb;
  stream->read2_cb = read2_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
  uv__handle_start(stream);
/* Start reading with a plain read callback (no handle passing). */
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
/* Start reading on an IPC pipe with a read2 callback that can also receive
 * handles sent over the pipe. */
int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
                   uv_read2_cb read_cb) {
  return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
/* Stop reading: clear the READING flag, stop POLLIN polling (and the handle
 * itself when no write activity remains), and drop the read callbacks. */
int uv_read_stop(uv_stream_t* stream) {
  /* Sanity check. We're going to stop the handle unless it's primed for
   * writing but that means there should be some kind of write action in
   * progress.
   */
  assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) ||
         !QUEUE_EMPTY(&stream->write_completed_queue) ||
         !QUEUE_EMPTY(&stream->write_queue) ||
         stream->shutdown_req != NULL ||
         stream->connect_req != NULL);

  stream->flags &= ~UV_STREAM_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
  if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
    uv__handle_stop(stream);

#if defined(__APPLE__)
  /* Notify select() thread about state change */
  if (stream->select != NULL)
    uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */

  stream->read_cb = NULL;
  stream->read2_cb = NULL;
  stream->alloc_cb = NULL;
1431 int uv_is_readable(const uv_stream_t* stream) {
1432 return !!(stream->flags & UV_STREAM_READABLE);
1436 int uv_is_writable(const uv_stream_t* stream) {
1437 return !!(stream->flags & UV_STREAM_WRITABLE);
#if defined(__APPLE__)
/* OS X out-of-line implementation of uv__stream_fd(): when the select(2)
 * fallback is active the watcher fd belongs to the helper machinery, so the
 * select state must be consulted; otherwise fall back to io_watcher.fd. */
int uv___stream_fd(uv_stream_t* handle) {
  uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  return handle->io_watcher.fd;
#endif /* defined(__APPLE__) */
/* Close a stream: shut down the OS X select(2) helper (if any), stop all
 * I/O, close the fd (unless it is a stdio fd) and any pending accepted fd. */
void uv__stream_close(uv_stream_t* handle) {
#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    /* Post both semaphores and poke the socketpair so the helper thread
     * wakes up, sees close_sem and exits; then reap it and its resources. */
    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;

  assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
1500 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1501 assert(0 && "implement me");