52e2b9a1d96abc55b0680dacd4bf86dbd4f02759
[platform/upstream/cmake.git] / Utilities / cmlibuv / src / unix / stream.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <errno.h>
30
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36 #include <limits.h> /* IOV_MAX */
37
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
42
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
45
46 struct uv__stream_select_s {
47   uv_stream_t* stream;
48   uv_thread_t thread;
49   uv_sem_t close_sem;
50   uv_sem_t async_sem;
51   uv_async_t async;
52   int events;
53   int fake_fd;
54   int int_fd;
55   int fd;
56   fd_set* sread;
57   size_t sread_sz;
58   fd_set* swrite;
59   size_t swrite_sz;
60 };
61 #endif /* defined(__APPLE__) */
62
63 static void uv__stream_connect(uv_stream_t*);
64 static void uv__write(uv_stream_t* stream);
65 static void uv__read(uv_stream_t* stream);
66 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
67 static void uv__write_callbacks(uv_stream_t* stream);
68 static size_t uv__write_req_size(uv_write_t* req);
69
70
71 void uv__stream_init(uv_loop_t* loop,
72                      uv_stream_t* stream,
73                      uv_handle_type type) {
74   int err;
75
76   uv__handle_init(loop, (uv_handle_t*)stream, type);
77   stream->read_cb = NULL;
78   stream->alloc_cb = NULL;
79   stream->close_cb = NULL;
80   stream->connection_cb = NULL;
81   stream->connect_req = NULL;
82   stream->shutdown_req = NULL;
83   stream->accepted_fd = -1;
84   stream->queued_fds = NULL;
85   stream->delayed_error = 0;
86   QUEUE_INIT(&stream->write_queue);
87   QUEUE_INIT(&stream->write_completed_queue);
88   stream->write_queue_size = 0;
89
90   if (loop->emfile_fd == -1) {
91     err = uv__open_cloexec("/dev/null", O_RDONLY);
92     if (err < 0)
93         /* In the rare case that "/dev/null" isn't mounted open "/"
94          * instead.
95          */
96         err = uv__open_cloexec("/", O_RDONLY);
97     if (err >= 0)
98       loop->emfile_fd = err;
99   }
100
101 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
102   stream->select = NULL;
103 #endif /* defined(__APPLE_) */
104
105   uv__io_init(&stream->io_watcher, uv__stream_io, -1);
106 }
107
108
109 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
110 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
111   /* Notify select() thread about state change */
112   uv__stream_select_t* s;
113   int r;
114
115   s = stream->select;
116   if (s == NULL)
117     return;
118
119   /* Interrupt select() loop
120    * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
121    * emit read event on other side
122    */
123   do
124     r = write(s->fake_fd, "x", 1);
125   while (r == -1 && errno == EINTR);
126
127   assert(r == 1);
128 #else  /* !defined(__APPLE__) */
129   /* No-op on any other platform */
130 #endif  /* !defined(__APPLE__) */
131 }
132
133
134 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
135 static void uv__stream_osx_select(void* arg) {
136   uv_stream_t* stream;
137   uv__stream_select_t* s;
138   char buf[1024];
139   int events;
140   int fd;
141   int r;
142   int max_fd;
143
144   stream = arg;
145   s = stream->select;
146   fd = s->fd;
147
148   if (fd > s->int_fd)
149     max_fd = fd;
150   else
151     max_fd = s->int_fd;
152
153   for (;;) {
154     /* Terminate on semaphore */
155     if (uv_sem_trywait(&s->close_sem) == 0)
156       break;
157
158     /* Watch fd using select(2) */
159     memset(s->sread, 0, s->sread_sz);
160     memset(s->swrite, 0, s->swrite_sz);
161
162     if (uv__io_active(&stream->io_watcher, POLLIN))
163       FD_SET(fd, s->sread);
164     if (uv__io_active(&stream->io_watcher, POLLOUT))
165       FD_SET(fd, s->swrite);
166     FD_SET(s->int_fd, s->sread);
167
168     /* Wait indefinitely for fd events */
169     r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
170     if (r == -1) {
171       if (errno == EINTR)
172         continue;
173
174       /* XXX: Possible?! */
175       abort();
176     }
177
178     /* Ignore timeouts */
179     if (r == 0)
180       continue;
181
182     /* Empty socketpair's buffer in case of interruption */
183     if (FD_ISSET(s->int_fd, s->sread))
184       for (;;) {
185         r = read(s->int_fd, buf, sizeof(buf));
186
187         if (r == sizeof(buf))
188           continue;
189
190         if (r != -1)
191           break;
192
193         if (errno == EAGAIN || errno == EWOULDBLOCK)
194           break;
195
196         if (errno == EINTR)
197           continue;
198
199         abort();
200       }
201
202     /* Handle events */
203     events = 0;
204     if (FD_ISSET(fd, s->sread))
205       events |= POLLIN;
206     if (FD_ISSET(fd, s->swrite))
207       events |= POLLOUT;
208
209     assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
210     if (events != 0) {
211       ACCESS_ONCE(int, s->events) = events;
212
213       uv_async_send(&s->async);
214       uv_sem_wait(&s->async_sem);
215
216       /* Should be processed at this stage */
217       assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
218     }
219   }
220 }
221
222
223 static void uv__stream_osx_select_cb(uv_async_t* handle) {
224   uv__stream_select_t* s;
225   uv_stream_t* stream;
226   int events;
227
228   s = container_of(handle, uv__stream_select_t, async);
229   stream = s->stream;
230
231   /* Get and reset stream's events */
232   events = s->events;
233   ACCESS_ONCE(int, s->events) = 0;
234
235   assert(events != 0);
236   assert(events == (events & (POLLIN | POLLOUT)));
237
238   /* Invoke callback on event-loop */
239   if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
240     uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
241
242   if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
243     uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
244
245   if (stream->flags & UV_HANDLE_CLOSING)
246     return;
247
248   /* NOTE: It is important to do it here, otherwise `select()` might be called
249    * before the actual `uv__read()`, leading to the blocking syscall
250    */
251   uv_sem_post(&s->async_sem);
252 }
253
254
255 static void uv__stream_osx_cb_close(uv_handle_t* async) {
256   uv__stream_select_t* s;
257
258   s = container_of(async, uv__stream_select_t, async);
259   uv__free(s);
260 }
261
262
263 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
264   /*
265    * kqueue doesn't work with some files from /dev mount on osx.
266    * select(2) in separate thread for those fds
267    */
268
269   struct kevent filter[1];
270   struct kevent events[1];
271   struct timespec timeout;
272   uv__stream_select_t* s;
273   int fds[2];
274   int err;
275   int ret;
276   int kq;
277   int old_fd;
278   int max_fd;
279   size_t sread_sz;
280   size_t swrite_sz;
281
282   kq = kqueue();
283   if (kq == -1) {
284     perror("(libuv) kqueue()");
285     return UV__ERR(errno);
286   }
287
288   EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
289
290   /* Use small timeout, because we only want to capture EINVALs */
291   timeout.tv_sec = 0;
292   timeout.tv_nsec = 1;
293
294   do
295     ret = kevent(kq, filter, 1, events, 1, &timeout);
296   while (ret == -1 && errno == EINTR);
297
298   uv__close(kq);
299
300   if (ret == -1)
301     return UV__ERR(errno);
302
303   if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
304     return 0;
305
306   /* At this point we definitely know that this fd won't work with kqueue */
307
308   /*
309    * Create fds for io watcher and to interrupt the select() loop.
310    * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
311    */
312   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
313     return UV__ERR(errno);
314
315   max_fd = *fd;
316   if (fds[1] > max_fd)
317     max_fd = fds[1];
318
319   sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
320   swrite_sz = sread_sz;
321
322   s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
323   if (s == NULL) {
324     err = UV_ENOMEM;
325     goto failed_malloc;
326   }
327
328   s->events = 0;
329   s->fd = *fd;
330   s->sread = (fd_set*) ((char*) s + sizeof(*s));
331   s->sread_sz = sread_sz;
332   s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
333   s->swrite_sz = swrite_sz;
334
335   err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
336   if (err)
337     goto failed_async_init;
338
339   s->async.flags |= UV_HANDLE_INTERNAL;
340   uv__handle_unref(&s->async);
341
342   err = uv_sem_init(&s->close_sem, 0);
343   if (err != 0)
344     goto failed_close_sem_init;
345
346   err = uv_sem_init(&s->async_sem, 0);
347   if (err != 0)
348     goto failed_async_sem_init;
349
350   s->fake_fd = fds[0];
351   s->int_fd = fds[1];
352
353   old_fd = *fd;
354   s->stream = stream;
355   stream->select = s;
356   *fd = s->fake_fd;
357
358   err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
359   if (err != 0)
360     goto failed_thread_create;
361
362   return 0;
363
364 failed_thread_create:
365   s->stream = NULL;
366   stream->select = NULL;
367   *fd = old_fd;
368
369   uv_sem_destroy(&s->async_sem);
370
371 failed_async_sem_init:
372   uv_sem_destroy(&s->close_sem);
373
374 failed_close_sem_init:
375   uv__close(fds[0]);
376   uv__close(fds[1]);
377   uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
378   return err;
379
380 failed_async_init:
381   uv__free(s);
382
383 failed_malloc:
384   uv__close(fds[0]);
385   uv__close(fds[1]);
386
387   return err;
388 }
389 #endif /* defined(__APPLE__) */
390
391
392 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
393 #if defined(__APPLE__)
394   int enable;
395 #endif
396
397   if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
398     return UV_EBUSY;
399
400   assert(fd >= 0);
401   stream->flags |= flags;
402
403   if (stream->type == UV_TCP) {
404     if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
405       return UV__ERR(errno);
406
407     /* TODO Use delay the user passed in. */
408     if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
409         uv__tcp_keepalive(fd, 1, 60)) {
410       return UV__ERR(errno);
411     }
412   }
413
414 #if defined(__APPLE__)
415   enable = 1;
416   if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
417       errno != ENOTSOCK &&
418       errno != EINVAL) {
419     return UV__ERR(errno);
420   }
421 #endif
422
423   stream->io_watcher.fd = fd;
424
425   return 0;
426 }
427
428
429 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
430   uv_write_t* req;
431   QUEUE* q;
432   while (!QUEUE_EMPTY(&stream->write_queue)) {
433     q = QUEUE_HEAD(&stream->write_queue);
434     QUEUE_REMOVE(q);
435
436     req = QUEUE_DATA(q, uv_write_t, queue);
437     req->error = error;
438
439     QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
440   }
441 }
442
443
444 void uv__stream_destroy(uv_stream_t* stream) {
445   assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
446   assert(stream->flags & UV_HANDLE_CLOSED);
447
448   if (stream->connect_req) {
449     uv__req_unregister(stream->loop, stream->connect_req);
450     stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
451     stream->connect_req = NULL;
452   }
453
454   uv__stream_flush_write_queue(stream, UV_ECANCELED);
455   uv__write_callbacks(stream);
456
457   if (stream->shutdown_req) {
458     /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
459      * fait accompli at this point. Maybe we should revisit this in v0.11.
460      * A possible reason for leaving it unchanged is that it informs the
461      * callee that the handle has been destroyed.
462      */
463     uv__req_unregister(stream->loop, stream->shutdown_req);
464     stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
465     stream->shutdown_req = NULL;
466   }
467
468   assert(stream->write_queue_size == 0);
469 }
470
471
472 /* Implements a best effort approach to mitigating accept() EMFILE errors.
473  * We have a spare file descriptor stashed away that we close to get below
474  * the EMFILE limit. Next, we accept all pending connections and close them
475  * immediately to signal the clients that we're overloaded - and we are, but
476  * we still keep on trucking.
477  *
478  * There is one caveat: it's not reliable in a multi-threaded environment.
479  * The file descriptor limit is per process. Our party trick fails if another
480  * thread opens a file or creates a socket in the time window between us
481  * calling close() and accept().
482  */
483 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
484   int err;
485   int emfile_fd;
486
487   if (loop->emfile_fd == -1)
488     return UV_EMFILE;
489
490   uv__close(loop->emfile_fd);
491   loop->emfile_fd = -1;
492
493   do {
494     err = uv__accept(accept_fd);
495     if (err >= 0)
496       uv__close(err);
497   } while (err >= 0 || err == UV_EINTR);
498
499   emfile_fd = uv__open_cloexec("/", O_RDONLY);
500   if (emfile_fd >= 0)
501     loop->emfile_fd = emfile_fd;
502
503   return err;
504 }
505
506
507 #if defined(UV_HAVE_KQUEUE)
508 # define UV_DEC_BACKLOG(w) w->rcount--;
509 #else
510 # define UV_DEC_BACKLOG(w) /* no-op */
511 #endif /* defined(UV_HAVE_KQUEUE) */
512
513
514 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
515   uv_stream_t* stream;
516   int err;
517
518   stream = container_of(w, uv_stream_t, io_watcher);
519   assert(events & POLLIN);
520   assert(stream->accepted_fd == -1);
521   assert(!(stream->flags & UV_HANDLE_CLOSING));
522
523   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
524
525   /* connection_cb can close the server socket while we're
526    * in the loop so check it on each iteration.
527    */
528   while (uv__stream_fd(stream) != -1) {
529     assert(stream->accepted_fd == -1);
530
531 #if defined(UV_HAVE_KQUEUE)
532     if (w->rcount <= 0)
533       return;
534 #endif /* defined(UV_HAVE_KQUEUE) */
535
536     err = uv__accept(uv__stream_fd(stream));
537     if (err < 0) {
538       if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
539         return;  /* Not an error. */
540
541       if (err == UV_ECONNABORTED)
542         continue;  /* Ignore. Nothing we can do about that. */
543
544       if (err == UV_EMFILE || err == UV_ENFILE) {
545         err = uv__emfile_trick(loop, uv__stream_fd(stream));
546         if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
547           break;
548       }
549
550       stream->connection_cb(stream, err);
551       continue;
552     }
553
554     UV_DEC_BACKLOG(w)
555     stream->accepted_fd = err;
556     stream->connection_cb(stream, 0);
557
558     if (stream->accepted_fd != -1) {
559       /* The user hasn't yet accepted called uv_accept() */
560       uv__io_stop(loop, &stream->io_watcher, POLLIN);
561       return;
562     }
563
564     if (stream->type == UV_TCP &&
565         (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
566       /* Give other processes a chance to accept connections. */
567       struct timespec timeout = { 0, 1 };
568       nanosleep(&timeout, NULL);
569     }
570   }
571 }
572
573
574 #undef UV_DEC_BACKLOG
575
576
577 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
578   int err;
579
580   assert(server->loop == client->loop);
581
582   if (server->accepted_fd == -1)
583     return UV_EAGAIN;
584
585   switch (client->type) {
586     case UV_NAMED_PIPE:
587     case UV_TCP:
588       err = uv__stream_open(client,
589                             server->accepted_fd,
590                             UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
591       if (err) {
592         /* TODO handle error */
593         uv__close(server->accepted_fd);
594         goto done;
595       }
596       break;
597
598     case UV_UDP:
599       err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
600       if (err) {
601         uv__close(server->accepted_fd);
602         goto done;
603       }
604       break;
605
606     default:
607       return UV_EINVAL;
608   }
609
610   client->flags |= UV_HANDLE_BOUND;
611
612 done:
613   /* Process queued fds */
614   if (server->queued_fds != NULL) {
615     uv__stream_queued_fds_t* queued_fds;
616
617     queued_fds = server->queued_fds;
618
619     /* Read first */
620     server->accepted_fd = queued_fds->fds[0];
621
622     /* All read, free */
623     assert(queued_fds->offset > 0);
624     if (--queued_fds->offset == 0) {
625       uv__free(queued_fds);
626       server->queued_fds = NULL;
627     } else {
628       /* Shift rest */
629       memmove(queued_fds->fds,
630               queued_fds->fds + 1,
631               queued_fds->offset * sizeof(*queued_fds->fds));
632     }
633   } else {
634     server->accepted_fd = -1;
635     if (err == 0)
636       uv__io_start(server->loop, &server->io_watcher, POLLIN);
637   }
638   return err;
639 }
640
641
642 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
643   int err;
644
645   switch (stream->type) {
646   case UV_TCP:
647     err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
648     break;
649
650   case UV_NAMED_PIPE:
651     err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
652     break;
653
654   default:
655     err = UV_EINVAL;
656   }
657
658   if (err == 0)
659     uv__handle_start(stream);
660
661   return err;
662 }
663
664
665 static void uv__drain(uv_stream_t* stream) {
666   uv_shutdown_t* req;
667   int err;
668
669   assert(QUEUE_EMPTY(&stream->write_queue));
670   uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
671   uv__stream_osx_interrupt_select(stream);
672
673   /* Shutdown? */
674   if ((stream->flags & UV_HANDLE_SHUTTING) &&
675       !(stream->flags & UV_HANDLE_CLOSING) &&
676       !(stream->flags & UV_HANDLE_SHUT)) {
677     assert(stream->shutdown_req);
678
679     req = stream->shutdown_req;
680     stream->shutdown_req = NULL;
681     stream->flags &= ~UV_HANDLE_SHUTTING;
682     uv__req_unregister(stream->loop, req);
683
684     err = 0;
685     if (shutdown(uv__stream_fd(stream), SHUT_WR))
686       err = UV__ERR(errno);
687
688     if (err == 0)
689       stream->flags |= UV_HANDLE_SHUT;
690
691     if (req->cb != NULL)
692       req->cb(req, err);
693   }
694 }
695
696
697 static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
698   if (n == 1)
699     return write(fd, vec->iov_base, vec->iov_len);
700   else
701     return writev(fd, vec, n);
702 }
703
704
705 static size_t uv__write_req_size(uv_write_t* req) {
706   size_t size;
707
708   assert(req->bufs != NULL);
709   size = uv__count_bufs(req->bufs + req->write_index,
710                         req->nbufs - req->write_index);
711   assert(req->handle->write_queue_size >= size);
712
713   return size;
714 }
715
716
717 /* Returns 1 if all write request data has been written, or 0 if there is still
718  * more data to write.
719  *
720  * Note: the return value only says something about the *current* request.
721  * There may still be other write requests sitting in the queue.
722  */
723 static int uv__write_req_update(uv_stream_t* stream,
724                                 uv_write_t* req,
725                                 size_t n) {
726   uv_buf_t* buf;
727   size_t len;
728
729   assert(n <= stream->write_queue_size);
730   stream->write_queue_size -= n;
731
732   buf = req->bufs + req->write_index;
733
734   do {
735     len = n < buf->len ? n : buf->len;
736     buf->base += len;
737     buf->len -= len;
738     buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
739     n -= len;
740   } while (n > 0);
741
742   req->write_index = buf - req->bufs;
743
744   return req->write_index == req->nbufs;
745 }
746
747
748 static void uv__write_req_finish(uv_write_t* req) {
749   uv_stream_t* stream = req->handle;
750
751   /* Pop the req off tcp->write_queue. */
752   QUEUE_REMOVE(&req->queue);
753
754   /* Only free when there was no error. On error, we touch up write_queue_size
755    * right before making the callback. The reason we don't do that right away
756    * is that a write_queue_size > 0 is our only way to signal to the user that
757    * they should stop writing - which they should if we got an error. Something
758    * to revisit in future revisions of the libuv API.
759    */
760   if (req->error == 0) {
761     if (req->bufs != req->bufsml)
762       uv__free(req->bufs);
763     req->bufs = NULL;
764   }
765
766   /* Add it to the write_completed_queue where it will have its
767    * callback called in the near future.
768    */
769   QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
770   uv__io_feed(stream->loop, &stream->io_watcher);
771 }
772
773
774 static int uv__handle_fd(uv_handle_t* handle) {
775   switch (handle->type) {
776     case UV_NAMED_PIPE:
777     case UV_TCP:
778       return ((uv_stream_t*) handle)->io_watcher.fd;
779
780     case UV_UDP:
781       return ((uv_udp_t*) handle)->io_watcher.fd;
782
783     default:
784       return -1;
785   }
786 }
787
788 static int uv__try_write(uv_stream_t* stream,
789                          const uv_buf_t bufs[],
790                          unsigned int nbufs,
791                          uv_stream_t* send_handle) {
792   struct iovec* iov;
793   int iovmax;
794   int iovcnt;
795   ssize_t n;
796
797   /*
798    * Cast to iovec. We had to have our own uv_buf_t instead of iovec
799    * because Windows's WSABUF is not an iovec.
800    */
801   iov = (struct iovec*) bufs;
802   iovcnt = nbufs;
803
804   iovmax = uv__getiovmax();
805
806   /* Limit iov count to avoid EINVALs from writev() */
807   if (iovcnt > iovmax)
808     iovcnt = iovmax;
809
810   /*
811    * Now do the actual writev. Note that we've been updating the pointers
812    * inside the iov each time we write. So there is no need to offset it.
813    */
814   if (send_handle != NULL) {
815     int fd_to_send;
816     struct msghdr msg;
817     struct cmsghdr *cmsg;
818     union {
819       char data[64];
820       struct cmsghdr alias;
821     } scratch;
822
823     if (uv__is_closing(send_handle))
824       return UV_EBADF;
825
826     fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
827
828     memset(&scratch, 0, sizeof(scratch));
829
830     assert(fd_to_send >= 0);
831
832     msg.msg_name = NULL;
833     msg.msg_namelen = 0;
834     msg.msg_iov = iov;
835     msg.msg_iovlen = iovcnt;
836     msg.msg_flags = 0;
837
838     msg.msg_control = &scratch.alias;
839     msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
840
841     cmsg = CMSG_FIRSTHDR(&msg);
842     cmsg->cmsg_level = SOL_SOCKET;
843     cmsg->cmsg_type = SCM_RIGHTS;
844     cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
845
846     /* silence aliasing warning */
847     {
848       void* pv = CMSG_DATA(cmsg);
849       int* pi = pv;
850       *pi = fd_to_send;
851     }
852
853     do
854       n = sendmsg(uv__stream_fd(stream), &msg, 0);
855     while (n == -1 && errno == EINTR);
856   } else {
857     do
858       n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
859     while (n == -1 && errno == EINTR);
860   }
861
862   if (n >= 0)
863     return n;
864
865   if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
866     return UV_EAGAIN;
867
868 #ifdef __APPLE__
869   /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too -
870    * have a bug where a race condition causes the kernel to return EPROTOTYPE
871    * because the socket isn't fully constructed. It's probably the result of
872    * the peer closing the connection and that is why libuv translates it to
873    * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
874    * away but some VPN software causes the same behavior except the error is
875    * permanent, not transient, turning the retry mechanism into an infinite
876    * loop. See https://github.com/libuv/libuv/pull/482.
877    */
878   if (errno == EPROTOTYPE)
879     return UV_ECONNRESET;
880 #endif  /* __APPLE__ */
881
882   return UV__ERR(errno);
883 }
884
885 static void uv__write(uv_stream_t* stream) {
886   QUEUE* q;
887   uv_write_t* req;
888   ssize_t n;
889
890   assert(uv__stream_fd(stream) >= 0);
891
892   for (;;) {
893     if (QUEUE_EMPTY(&stream->write_queue))
894       return;
895
896     q = QUEUE_HEAD(&stream->write_queue);
897     req = QUEUE_DATA(q, uv_write_t, queue);
898     assert(req->handle == stream);
899
900     n = uv__try_write(stream,
901                       &(req->bufs[req->write_index]),
902                       req->nbufs - req->write_index,
903                       req->send_handle);
904
905     /* Ensure the handle isn't sent again in case this is a partial write. */
906     if (n >= 0) {
907       req->send_handle = NULL;
908       if (uv__write_req_update(stream, req, n)) {
909         uv__write_req_finish(req);
910         return;  /* TODO(bnoordhuis) Start trying to write the next request. */
911       }
912     } else if (n != UV_EAGAIN)
913       break;
914
915     /* If this is a blocking stream, try again. */
916     if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
917       continue;
918
919     /* We're not done. */
920     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
921
922     /* Notify select() thread about state change */
923     uv__stream_osx_interrupt_select(stream);
924
925     return;
926   }
927
928   req->error = n;
929   /* XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events */
930   uv__write_req_finish(req);
931   uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
932   uv__stream_osx_interrupt_select(stream);
933 }
934
935
936 static void uv__write_callbacks(uv_stream_t* stream) {
937   uv_write_t* req;
938   QUEUE* q;
939   QUEUE pq;
940
941   if (QUEUE_EMPTY(&stream->write_completed_queue))
942     return;
943
944   QUEUE_MOVE(&stream->write_completed_queue, &pq);
945
946   while (!QUEUE_EMPTY(&pq)) {
947     /* Pop a req off write_completed_queue. */
948     q = QUEUE_HEAD(&pq);
949     req = QUEUE_DATA(q, uv_write_t, queue);
950     QUEUE_REMOVE(q);
951     uv__req_unregister(stream->loop, req);
952
953     if (req->bufs != NULL) {
954       stream->write_queue_size -= uv__write_req_size(req);
955       if (req->bufs != req->bufsml)
956         uv__free(req->bufs);
957       req->bufs = NULL;
958     }
959
960     /* NOTE: call callback AFTER freeing the request data. */
961     if (req->cb)
962       req->cb(req, req->error);
963   }
964 }
965
966
967 uv_handle_type uv__handle_type(int fd) {
968   struct sockaddr_storage ss;
969   socklen_t sslen;
970   socklen_t len;
971   int type;
972
973   memset(&ss, 0, sizeof(ss));
974   sslen = sizeof(ss);
975
976   if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
977     return UV_UNKNOWN_HANDLE;
978
979   len = sizeof type;
980
981   if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
982     return UV_UNKNOWN_HANDLE;
983
984   if (type == SOCK_STREAM) {
985 #if defined(_AIX) || defined(__DragonFly__)
986     /* on AIX/DragonFly the getsockname call returns an empty sa structure
987      * for sockets of type AF_UNIX.  For all other types it will
988      * return a properly filled in structure.
989      */
990     if (sslen == 0)
991       return UV_NAMED_PIPE;
992 #endif
993     switch (ss.ss_family) {
994       case AF_UNIX:
995         return UV_NAMED_PIPE;
996       case AF_INET:
997       case AF_INET6:
998         return UV_TCP;
999       }
1000   }
1001
1002   if (type == SOCK_DGRAM &&
1003       (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
1004     return UV_UDP;
1005
1006   return UV_UNKNOWN_HANDLE;
1007 }
1008
1009
1010 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
1011   stream->flags |= UV_HANDLE_READ_EOF;
1012   stream->flags &= ~UV_HANDLE_READING;
1013   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1014   uv__handle_stop(stream);
1015   uv__stream_osx_interrupt_select(stream);
1016   stream->read_cb(stream, UV_EOF, buf);
1017 }
1018
1019
1020 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
1021   uv__stream_queued_fds_t* queued_fds;
1022   unsigned int queue_size;
1023
1024   queued_fds = stream->queued_fds;
1025   if (queued_fds == NULL) {
1026     queue_size = 8;
1027     queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
1028                             sizeof(*queued_fds));
1029     if (queued_fds == NULL)
1030       return UV_ENOMEM;
1031     queued_fds->size = queue_size;
1032     queued_fds->offset = 0;
1033     stream->queued_fds = queued_fds;
1034
1035     /* Grow */
1036   } else if (queued_fds->size == queued_fds->offset) {
1037     queue_size = queued_fds->size + 8;
1038     queued_fds = uv__realloc(queued_fds,
1039                              (queue_size - 1) * sizeof(*queued_fds->fds) +
1040                               sizeof(*queued_fds));
1041
1042     /*
1043      * Allocation failure, report back.
1044      * NOTE: if it is fatal - sockets will be closed in uv__stream_close
1045      */
1046     if (queued_fds == NULL)
1047       return UV_ENOMEM;
1048     queued_fds->size = queue_size;
1049     stream->queued_fds = queued_fds;
1050   }
1051
1052   /* Put fd in a queue */
1053   queued_fds->fds[queued_fds->offset++] = fd;
1054
1055   return 0;
1056 }
1057
1058
1059 #if defined(__PASE__)
1060 /* on IBMi PASE the control message length can not exceed 256. */
1061 # define UV__CMSG_FD_COUNT 60
1062 #else
1063 # define UV__CMSG_FD_COUNT 64
1064 #endif
1065 #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
1066
1067
1068 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
1069   struct cmsghdr* cmsg;
1070
1071   for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
1072     char* start;
1073     char* end;
1074     int err;
1075     void* pv;
1076     int* pi;
1077     unsigned int i;
1078     unsigned int count;
1079
1080     if (cmsg->cmsg_type != SCM_RIGHTS) {
1081       fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1082           cmsg->cmsg_type);
1083       continue;
1084     }
1085
1086     /* silence aliasing warning */
1087     pv = CMSG_DATA(cmsg);
1088     pi = pv;
1089
1090     /* Count available fds */
1091     start = (char*) cmsg;
1092     end = (char*) cmsg + cmsg->cmsg_len;
1093     count = 0;
1094     while (start + CMSG_LEN(count * sizeof(*pi)) < end)
1095       count++;
1096     assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
1097
1098     for (i = 0; i < count; i++) {
1099       /* Already has accepted fd, queue now */
1100       if (stream->accepted_fd != -1) {
1101         err = uv__stream_queue_fd(stream, pi[i]);
1102         if (err != 0) {
1103           /* Close rest */
1104           for (; i < count; i++)
1105             uv__close(pi[i]);
1106           return err;
1107         }
1108       } else {
1109         stream->accepted_fd = pi[i];
1110       }
1111     }
1112   }
1113
1114   return 0;
1115 }
1116
1117
1118 #ifdef __clang__
1119 # pragma clang diagnostic push
1120 # pragma clang diagnostic ignored "-Wgnu-folding-constant"
1121 # pragma clang diagnostic ignored "-Wvla-extension"
1122 #endif
1123
1124 static void uv__read(uv_stream_t* stream) {
1125   uv_buf_t buf;
1126   ssize_t nread;
1127   struct msghdr msg;
1128   char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
1129   int count;
1130   int err;
1131   int is_ipc;
1132
1133   stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1134
1135   /* Prevent loop starvation when the data comes in as fast as (or faster than)
1136    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1137    */
1138   count = 32;
1139
1140   is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1141
1142   /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1143    * tcp->read_cb is NULL or not?
1144    */
1145   while (stream->read_cb
1146       && (stream->flags & UV_HANDLE_READING)
1147       && (count-- > 0)) {
1148     assert(stream->alloc_cb != NULL);
1149
1150     buf = uv_buf_init(NULL, 0);
1151     stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1152     if (buf.base == NULL || buf.len == 0) {
1153       /* User indicates it can't or won't handle the read. */
1154       stream->read_cb(stream, UV_ENOBUFS, &buf);
1155       return;
1156     }
1157
1158     assert(buf.base != NULL);
1159     assert(uv__stream_fd(stream) >= 0);
1160
1161     if (!is_ipc) {
1162       do {
1163         nread = read(uv__stream_fd(stream), buf.base, buf.len);
1164       }
1165       while (nread < 0 && errno == EINTR);
1166     } else {
1167       /* ipc uses recvmsg */
1168       msg.msg_flags = 0;
1169       msg.msg_iov = (struct iovec*) &buf;
1170       msg.msg_iovlen = 1;
1171       msg.msg_name = NULL;
1172       msg.msg_namelen = 0;
1173       /* Set up to receive a descriptor even if one isn't in the message */
1174       msg.msg_controllen = sizeof(cmsg_space);
1175       msg.msg_control = cmsg_space;
1176
1177       do {
1178         nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1179       }
1180       while (nread < 0 && errno == EINTR);
1181     }
1182
1183     if (nread < 0) {
1184       /* Error */
1185       if (errno == EAGAIN || errno == EWOULDBLOCK) {
1186         /* Wait for the next one. */
1187         if (stream->flags & UV_HANDLE_READING) {
1188           uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1189           uv__stream_osx_interrupt_select(stream);
1190         }
1191         stream->read_cb(stream, 0, &buf);
1192 #if defined(__CYGWIN__) || defined(__MSYS__)
1193       } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1194         uv__stream_eof(stream, &buf);
1195         return;
1196 #endif
1197       } else {
1198         /* Error. User should call uv_close(). */
1199         stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1200         stream->read_cb(stream, UV__ERR(errno), &buf);
1201         if (stream->flags & UV_HANDLE_READING) {
1202           stream->flags &= ~UV_HANDLE_READING;
1203           uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1204           uv__handle_stop(stream);
1205           uv__stream_osx_interrupt_select(stream);
1206         }
1207       }
1208       return;
1209     } else if (nread == 0) {
1210       uv__stream_eof(stream, &buf);
1211       return;
1212     } else {
1213       /* Successful read */
1214       ssize_t buflen = buf.len;
1215
1216       if (is_ipc) {
1217         err = uv__stream_recv_cmsg(stream, &msg);
1218         if (err != 0) {
1219           stream->read_cb(stream, err, &buf);
1220           return;
1221         }
1222       }
1223
1224 #if defined(__MVS__)
1225       if (is_ipc && msg.msg_controllen > 0) {
1226         uv_buf_t blankbuf;
1227         int nread;
1228         struct iovec *old;
1229
1230         blankbuf.base = 0;
1231         blankbuf.len = 0;
1232         old = msg.msg_iov;
1233         msg.msg_iov = (struct iovec*) &blankbuf;
1234         nread = 0;
1235         do {
1236           nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1237           err = uv__stream_recv_cmsg(stream, &msg);
1238           if (err != 0) {
1239             stream->read_cb(stream, err, &buf);
1240             msg.msg_iov = old;
1241             return;
1242           }
1243         } while (nread == 0 && msg.msg_controllen > 0);
1244         msg.msg_iov = old;
1245       }
1246 #endif
1247       stream->read_cb(stream, nread, &buf);
1248
1249       /* Return if we didn't fill the buffer, there is no more data to read. */
1250       if (nread < buflen) {
1251         stream->flags |= UV_HANDLE_READ_PARTIAL;
1252         return;
1253       }
1254     }
1255   }
1256 }
1257
1258
1259 #ifdef __clang__
1260 # pragma clang diagnostic pop
1261 #endif
1262
1263 #undef UV__CMSG_FD_COUNT
1264 #undef UV__CMSG_FD_SIZE
1265
1266
1267 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1268   assert(stream->type == UV_TCP ||
1269          stream->type == UV_TTY ||
1270          stream->type == UV_NAMED_PIPE);
1271
1272   if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1273       stream->flags & UV_HANDLE_SHUT ||
1274       stream->flags & UV_HANDLE_SHUTTING ||
1275       uv__is_closing(stream)) {
1276     return UV_ENOTCONN;
1277   }
1278
1279   assert(uv__stream_fd(stream) >= 0);
1280
1281   /* Initialize request */
1282   uv__req_init(stream->loop, req, UV_SHUTDOWN);
1283   req->handle = stream;
1284   req->cb = cb;
1285   stream->shutdown_req = req;
1286   stream->flags |= UV_HANDLE_SHUTTING;
1287   stream->flags &= ~UV_HANDLE_WRITABLE;
1288
1289   uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1290   uv__stream_osx_interrupt_select(stream);
1291
1292   return 0;
1293 }
1294
1295
1296 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1297   uv_stream_t* stream;
1298
1299   stream = container_of(w, uv_stream_t, io_watcher);
1300
1301   assert(stream->type == UV_TCP ||
1302          stream->type == UV_NAMED_PIPE ||
1303          stream->type == UV_TTY);
1304   assert(!(stream->flags & UV_HANDLE_CLOSING));
1305
1306   if (stream->connect_req) {
1307     uv__stream_connect(stream);
1308     return;
1309   }
1310
1311   assert(uv__stream_fd(stream) >= 0);
1312
1313   /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1314   if (events & (POLLIN | POLLERR | POLLHUP))
1315     uv__read(stream);
1316
1317   if (uv__stream_fd(stream) == -1)
1318     return;  /* read_cb closed stream. */
1319
1320   /* Short-circuit iff POLLHUP is set, the user is still interested in read
1321    * events and uv__read() reported a partial read but not EOF. If the EOF
1322    * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1323    * have to do anything. If the partial read flag is not set, we can't
1324    * report the EOF yet because there is still data to read.
1325    */
1326   if ((events & POLLHUP) &&
1327       (stream->flags & UV_HANDLE_READING) &&
1328       (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1329       !(stream->flags & UV_HANDLE_READ_EOF)) {
1330     uv_buf_t buf = { NULL, 0 };
1331     uv__stream_eof(stream, &buf);
1332   }
1333
1334   if (uv__stream_fd(stream) == -1)
1335     return;  /* read_cb closed stream. */
1336
1337   if (events & (POLLOUT | POLLERR | POLLHUP)) {
1338     uv__write(stream);
1339     uv__write_callbacks(stream);
1340
1341     /* Write queue drained. */
1342     if (QUEUE_EMPTY(&stream->write_queue))
1343       uv__drain(stream);
1344   }
1345 }
1346
1347
1348 /**
1349  * We get called here from directly following a call to connect(2).
1350  * In order to determine if we've errored out or succeeded must call
1351  * getsockopt.
1352  */
1353 static void uv__stream_connect(uv_stream_t* stream) {
1354   int error;
1355   uv_connect_t* req = stream->connect_req;
1356   socklen_t errorsize = sizeof(int);
1357
1358   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1359   assert(req);
1360
1361   if (stream->delayed_error) {
1362     /* To smooth over the differences between unixes errors that
1363      * were reported synchronously on the first connect can be delayed
1364      * until the next tick--which is now.
1365      */
1366     error = stream->delayed_error;
1367     stream->delayed_error = 0;
1368   } else {
1369     /* Normal situation: we need to get the socket error from the kernel. */
1370     assert(uv__stream_fd(stream) >= 0);
1371     getsockopt(uv__stream_fd(stream),
1372                SOL_SOCKET,
1373                SO_ERROR,
1374                &error,
1375                &errorsize);
1376     error = UV__ERR(error);
1377   }
1378
1379   if (error == UV__ERR(EINPROGRESS))
1380     return;
1381
1382   stream->connect_req = NULL;
1383   uv__req_unregister(stream->loop, req);
1384
1385   if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1386     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1387   }
1388
1389   if (req->cb)
1390     req->cb(req, error);
1391
1392   if (uv__stream_fd(stream) == -1)
1393     return;
1394
1395   if (error < 0) {
1396     uv__stream_flush_write_queue(stream, UV_ECANCELED);
1397     uv__write_callbacks(stream);
1398   }
1399 }
1400
1401
1402 static int uv__check_before_write(uv_stream_t* stream,
1403                                   unsigned int nbufs,
1404                                   uv_stream_t* send_handle) {
1405   assert(nbufs > 0);
1406   assert((stream->type == UV_TCP ||
1407           stream->type == UV_NAMED_PIPE ||
1408           stream->type == UV_TTY) &&
1409          "uv_write (unix) does not yet support other types of streams");
1410
1411   if (uv__stream_fd(stream) < 0)
1412     return UV_EBADF;
1413
1414   if (!(stream->flags & UV_HANDLE_WRITABLE))
1415     return UV_EPIPE;
1416
1417   if (send_handle != NULL) {
1418     if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1419       return UV_EINVAL;
1420
1421     /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1422      * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1423      * evaluates to a function that operates on a uv_stream_t with a couple of
1424      * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1425      * which works but only by accident.
1426      */
1427     if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1428       return UV_EBADF;
1429
1430 #if defined(__CYGWIN__) || defined(__MSYS__)
1431     /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1432        See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
1433     return UV_ENOSYS;
1434 #endif
1435   }
1436
1437   return 0;
1438 }
1439
1440 int uv_write2(uv_write_t* req,
1441               uv_stream_t* stream,
1442               const uv_buf_t bufs[],
1443               unsigned int nbufs,
1444               uv_stream_t* send_handle,
1445               uv_write_cb cb) {
1446   int empty_queue;
1447   int err;
1448
1449   err = uv__check_before_write(stream, nbufs, send_handle);
1450   if (err < 0)
1451     return err;
1452
1453   /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1454    * it means there are error-state requests in the write_completed_queue that
1455    * will touch up write_queue_size later, see also uv__write_req_finish().
1456    * We could check that write_queue is empty instead but that implies making
1457    * a write() syscall when we know that the handle is in error mode.
1458    */
1459   empty_queue = (stream->write_queue_size == 0);
1460
1461   /* Initialize the req */
1462   uv__req_init(stream->loop, req, UV_WRITE);
1463   req->cb = cb;
1464   req->handle = stream;
1465   req->error = 0;
1466   req->send_handle = send_handle;
1467   QUEUE_INIT(&req->queue);
1468
1469   req->bufs = req->bufsml;
1470   if (nbufs > ARRAY_SIZE(req->bufsml))
1471     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1472
1473   if (req->bufs == NULL)
1474     return UV_ENOMEM;
1475
1476   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1477   req->nbufs = nbufs;
1478   req->write_index = 0;
1479   stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1480
1481   /* Append the request to write_queue. */
1482   QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1483
1484   /* If the queue was empty when this function began, we should attempt to
1485    * do the write immediately. Otherwise start the write_watcher and wait
1486    * for the fd to become writable.
1487    */
1488   if (stream->connect_req) {
1489     /* Still connecting, do nothing. */
1490   }
1491   else if (empty_queue) {
1492     uv__write(stream);
1493   }
1494   else {
1495     /*
1496      * blocking streams should never have anything in the queue.
1497      * if this assert fires then somehow the blocking stream isn't being
1498      * sufficiently flushed in uv__write.
1499      */
1500     assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1501     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1502     uv__stream_osx_interrupt_select(stream);
1503   }
1504
1505   return 0;
1506 }
1507
1508
1509 /* The buffers to be written must remain valid until the callback is called.
1510  * This is not required for the uv_buf_t array.
1511  */
1512 int uv_write(uv_write_t* req,
1513              uv_stream_t* handle,
1514              const uv_buf_t bufs[],
1515              unsigned int nbufs,
1516              uv_write_cb cb) {
1517   return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1518 }
1519
1520
1521 int uv_try_write(uv_stream_t* stream,
1522                  const uv_buf_t bufs[],
1523                  unsigned int nbufs) {
1524   return uv_try_write2(stream, bufs, nbufs, NULL);
1525 }
1526
1527
1528 int uv_try_write2(uv_stream_t* stream,
1529                   const uv_buf_t bufs[],
1530                   unsigned int nbufs,
1531                   uv_stream_t* send_handle) {
1532   int err;
1533
1534   /* Connecting or already writing some data */
1535   if (stream->connect_req != NULL || stream->write_queue_size != 0)
1536     return UV_EAGAIN;
1537
1538   err = uv__check_before_write(stream, nbufs, NULL);
1539   if (err < 0)
1540     return err;
1541
1542   return uv__try_write(stream, bufs, nbufs, send_handle);
1543 }
1544
1545
1546 int uv__read_start(uv_stream_t* stream,
1547                    uv_alloc_cb alloc_cb,
1548                    uv_read_cb read_cb) {
1549   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1550       stream->type == UV_TTY);
1551
1552   /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
1553    * just expresses the desired state of the user. */
1554   stream->flags |= UV_HANDLE_READING;
1555   stream->flags &= ~UV_HANDLE_READ_EOF;
1556
1557   /* TODO: try to do the read inline? */
1558   assert(uv__stream_fd(stream) >= 0);
1559   assert(alloc_cb);
1560
1561   stream->read_cb = read_cb;
1562   stream->alloc_cb = alloc_cb;
1563
1564   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1565   uv__handle_start(stream);
1566   uv__stream_osx_interrupt_select(stream);
1567
1568   return 0;
1569 }
1570
1571
1572 int uv_read_stop(uv_stream_t* stream) {
1573   if (!(stream->flags & UV_HANDLE_READING))
1574     return 0;
1575
1576   stream->flags &= ~UV_HANDLE_READING;
1577   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1578   uv__handle_stop(stream);
1579   uv__stream_osx_interrupt_select(stream);
1580
1581   stream->read_cb = NULL;
1582   stream->alloc_cb = NULL;
1583   return 0;
1584 }
1585
1586
1587 int uv_is_readable(const uv_stream_t* stream) {
1588   return !!(stream->flags & UV_HANDLE_READABLE);
1589 }
1590
1591
1592 int uv_is_writable(const uv_stream_t* stream) {
1593   return !!(stream->flags & UV_HANDLE_WRITABLE);
1594 }
1595
1596
1597 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
1598 int uv___stream_fd(const uv_stream_t* handle) {
1599   const uv__stream_select_t* s;
1600
1601   assert(handle->type == UV_TCP ||
1602          handle->type == UV_TTY ||
1603          handle->type == UV_NAMED_PIPE);
1604
1605   s = handle->select;
1606   if (s != NULL)
1607     return s->fd;
1608
1609   return handle->io_watcher.fd;
1610 }
1611 #endif /* defined(__APPLE__) */
1612
1613
1614 void uv__stream_close(uv_stream_t* handle) {
1615   unsigned int i;
1616   uv__stream_queued_fds_t* queued_fds;
1617
1618 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
1619   /* Terminate select loop first */
1620   if (handle->select != NULL) {
1621     uv__stream_select_t* s;
1622
1623     s = handle->select;
1624
1625     uv_sem_post(&s->close_sem);
1626     uv_sem_post(&s->async_sem);
1627     uv__stream_osx_interrupt_select(handle);
1628     uv_thread_join(&s->thread);
1629     uv_sem_destroy(&s->close_sem);
1630     uv_sem_destroy(&s->async_sem);
1631     uv__close(s->fake_fd);
1632     uv__close(s->int_fd);
1633     uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1634
1635     handle->select = NULL;
1636   }
1637 #endif /* defined(__APPLE__) */
1638
1639   uv__io_close(handle->loop, &handle->io_watcher);
1640   uv_read_stop(handle);
1641   uv__handle_stop(handle);
1642   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1643
1644   if (handle->io_watcher.fd != -1) {
1645     /* Don't close stdio file descriptors.  Nothing good comes from it. */
1646     if (handle->io_watcher.fd > STDERR_FILENO)
1647       uv__close(handle->io_watcher.fd);
1648     handle->io_watcher.fd = -1;
1649   }
1650
1651   if (handle->accepted_fd != -1) {
1652     uv__close(handle->accepted_fd);
1653     handle->accepted_fd = -1;
1654   }
1655
1656   /* Close all queued fds */
1657   if (handle->queued_fds != NULL) {
1658     queued_fds = handle->queued_fds;
1659     for (i = 0; i < queued_fds->offset; i++)
1660       uv__close(queued_fds->fds[i]);
1661     uv__free(handle->queued_fds);
1662     handle->queued_fds = NULL;
1663   }
1664
1665   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1666 }
1667
1668
1669 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1670   /* Don't need to check the file descriptor, uv__nonblock()
1671    * will fail with EBADF if it's not valid.
1672    */
1673   return uv__nonblock(uv__stream_fd(handle), !blocking);
1674 }