Imported Upstream version 3.25.0
[platform/upstream/cmake.git] / Utilities / cmlibuv / src / unix / stream.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <errno.h>
30
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36 #include <limits.h> /* IOV_MAX */
37
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
42
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
45
46 struct uv__stream_select_s {
47   uv_stream_t* stream;
48   uv_thread_t thread;
49   uv_sem_t close_sem;
50   uv_sem_t async_sem;
51   uv_async_t async;
52   int events;
53   int fake_fd;
54   int int_fd;
55   int fd;
56   fd_set* sread;
57   size_t sread_sz;
58   fd_set* swrite;
59   size_t swrite_sz;
60 };
61 #endif /* defined(__APPLE__) */
62
63 static void uv__stream_connect(uv_stream_t*);
64 static void uv__write(uv_stream_t* stream);
65 static void uv__read(uv_stream_t* stream);
66 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
67 static void uv__write_callbacks(uv_stream_t* stream);
68 static size_t uv__write_req_size(uv_write_t* req);
69 static void uv__drain(uv_stream_t* stream);
70
71
72 void uv__stream_init(uv_loop_t* loop,
73                      uv_stream_t* stream,
74                      uv_handle_type type) {
75   int err;
76
77   uv__handle_init(loop, (uv_handle_t*)stream, type);
78   stream->read_cb = NULL;
79   stream->alloc_cb = NULL;
80   stream->close_cb = NULL;
81   stream->connection_cb = NULL;
82   stream->connect_req = NULL;
83   stream->shutdown_req = NULL;
84   stream->accepted_fd = -1;
85   stream->queued_fds = NULL;
86   stream->delayed_error = 0;
87   QUEUE_INIT(&stream->write_queue);
88   QUEUE_INIT(&stream->write_completed_queue);
89   stream->write_queue_size = 0;
90
91   if (loop->emfile_fd == -1) {
92     err = uv__open_cloexec("/dev/null", O_RDONLY);
93     if (err < 0)
94         /* In the rare case that "/dev/null" isn't mounted open "/"
95          * instead.
96          */
97         err = uv__open_cloexec("/", O_RDONLY);
98     if (err >= 0)
99       loop->emfile_fd = err;
100   }
101
102 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
103   stream->select = NULL;
104 #endif /* defined(__APPLE_) */
105
106   uv__io_init(&stream->io_watcher, uv__stream_io, -1);
107 }
108
109
110 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
111 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
112   /* Notify select() thread about state change */
113   uv__stream_select_t* s;
114   int r;
115
116   s = stream->select;
117   if (s == NULL)
118     return;
119
120   /* Interrupt select() loop
121    * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
122    * emit read event on other side
123    */
124   do
125     r = write(s->fake_fd, "x", 1);
126   while (r == -1 && errno == EINTR);
127
128   assert(r == 1);
129 #else  /* !defined(__APPLE__) */
130   /* No-op on any other platform */
131 #endif  /* !defined(__APPLE__) */
132 }
133
134
135 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
136 static void uv__stream_osx_select(void* arg) {
137   uv_stream_t* stream;
138   uv__stream_select_t* s;
139   char buf[1024];
140   int events;
141   int fd;
142   int r;
143   int max_fd;
144
145   stream = arg;
146   s = stream->select;
147   fd = s->fd;
148
149   if (fd > s->int_fd)
150     max_fd = fd;
151   else
152     max_fd = s->int_fd;
153
154   for (;;) {
155     /* Terminate on semaphore */
156     if (uv_sem_trywait(&s->close_sem) == 0)
157       break;
158
159     /* Watch fd using select(2) */
160     memset(s->sread, 0, s->sread_sz);
161     memset(s->swrite, 0, s->swrite_sz);
162
163     if (uv__io_active(&stream->io_watcher, POLLIN))
164       FD_SET(fd, s->sread);
165     if (uv__io_active(&stream->io_watcher, POLLOUT))
166       FD_SET(fd, s->swrite);
167     FD_SET(s->int_fd, s->sread);
168
169     /* Wait indefinitely for fd events */
170     r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
171     if (r == -1) {
172       if (errno == EINTR)
173         continue;
174
175       /* XXX: Possible?! */
176       abort();
177     }
178
179     /* Ignore timeouts */
180     if (r == 0)
181       continue;
182
183     /* Empty socketpair's buffer in case of interruption */
184     if (FD_ISSET(s->int_fd, s->sread))
185       for (;;) {
186         r = read(s->int_fd, buf, sizeof(buf));
187
188         if (r == sizeof(buf))
189           continue;
190
191         if (r != -1)
192           break;
193
194         if (errno == EAGAIN || errno == EWOULDBLOCK)
195           break;
196
197         if (errno == EINTR)
198           continue;
199
200         abort();
201       }
202
203     /* Handle events */
204     events = 0;
205     if (FD_ISSET(fd, s->sread))
206       events |= POLLIN;
207     if (FD_ISSET(fd, s->swrite))
208       events |= POLLOUT;
209
210     assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
211     if (events != 0) {
212       ACCESS_ONCE(int, s->events) = events;
213
214       uv_async_send(&s->async);
215       uv_sem_wait(&s->async_sem);
216
217       /* Should be processed at this stage */
218       assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
219     }
220   }
221 }
222
223
224 static void uv__stream_osx_select_cb(uv_async_t* handle) {
225   uv__stream_select_t* s;
226   uv_stream_t* stream;
227   int events;
228
229   s = container_of(handle, uv__stream_select_t, async);
230   stream = s->stream;
231
232   /* Get and reset stream's events */
233   events = s->events;
234   ACCESS_ONCE(int, s->events) = 0;
235
236   assert(events != 0);
237   assert(events == (events & (POLLIN | POLLOUT)));
238
239   /* Invoke callback on event-loop */
240   if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
241     uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
242
243   if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
244     uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
245
246   if (stream->flags & UV_HANDLE_CLOSING)
247     return;
248
249   /* NOTE: It is important to do it here, otherwise `select()` might be called
250    * before the actual `uv__read()`, leading to the blocking syscall
251    */
252   uv_sem_post(&s->async_sem);
253 }
254
255
256 static void uv__stream_osx_cb_close(uv_handle_t* async) {
257   uv__stream_select_t* s;
258
259   s = container_of(async, uv__stream_select_t, async);
260   uv__free(s);
261 }
262
263
264 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
265   /*
266    * kqueue doesn't work with some files from /dev mount on osx.
267    * select(2) in separate thread for those fds
268    */
269
270   struct kevent filter[1];
271   struct kevent events[1];
272   struct timespec timeout;
273   uv__stream_select_t* s;
274   int fds[2];
275   int err;
276   int ret;
277   int kq;
278   int old_fd;
279   int max_fd;
280   size_t sread_sz;
281   size_t swrite_sz;
282
283   kq = kqueue();
284   if (kq == -1) {
285     perror("(libuv) kqueue()");
286     return UV__ERR(errno);
287   }
288
289   EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
290
291   /* Use small timeout, because we only want to capture EINVALs */
292   timeout.tv_sec = 0;
293   timeout.tv_nsec = 1;
294
295   do
296     ret = kevent(kq, filter, 1, events, 1, &timeout);
297   while (ret == -1 && errno == EINTR);
298
299   uv__close(kq);
300
301   if (ret == -1)
302     return UV__ERR(errno);
303
304   if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
305     return 0;
306
307   /* At this point we definitely know that this fd won't work with kqueue */
308
309   /*
310    * Create fds for io watcher and to interrupt the select() loop.
311    * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
312    */
313   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
314     return UV__ERR(errno);
315
316   max_fd = *fd;
317   if (fds[1] > max_fd)
318     max_fd = fds[1];
319
320   sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
321   swrite_sz = sread_sz;
322
323   s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
324   if (s == NULL) {
325     err = UV_ENOMEM;
326     goto failed_malloc;
327   }
328
329   s->events = 0;
330   s->fd = *fd;
331   s->sread = (fd_set*) ((char*) s + sizeof(*s));
332   s->sread_sz = sread_sz;
333   s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
334   s->swrite_sz = swrite_sz;
335
336   err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
337   if (err)
338     goto failed_async_init;
339
340   s->async.flags |= UV_HANDLE_INTERNAL;
341   uv__handle_unref(&s->async);
342
343   err = uv_sem_init(&s->close_sem, 0);
344   if (err != 0)
345     goto failed_close_sem_init;
346
347   err = uv_sem_init(&s->async_sem, 0);
348   if (err != 0)
349     goto failed_async_sem_init;
350
351   s->fake_fd = fds[0];
352   s->int_fd = fds[1];
353
354   old_fd = *fd;
355   s->stream = stream;
356   stream->select = s;
357   *fd = s->fake_fd;
358
359   err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
360   if (err != 0)
361     goto failed_thread_create;
362
363   return 0;
364
365 failed_thread_create:
366   s->stream = NULL;
367   stream->select = NULL;
368   *fd = old_fd;
369
370   uv_sem_destroy(&s->async_sem);
371
372 failed_async_sem_init:
373   uv_sem_destroy(&s->close_sem);
374
375 failed_close_sem_init:
376   uv__close(fds[0]);
377   uv__close(fds[1]);
378   uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
379   return err;
380
381 failed_async_init:
382   uv__free(s);
383
384 failed_malloc:
385   uv__close(fds[0]);
386   uv__close(fds[1]);
387
388   return err;
389 }
390 #endif /* defined(__APPLE__) */
391
392
393 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
394 #if defined(__APPLE__)
395   int enable;
396 #endif
397
398   if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
399     return UV_EBUSY;
400
401   assert(fd >= 0);
402   stream->flags |= flags;
403
404   if (stream->type == UV_TCP) {
405     if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
406       return UV__ERR(errno);
407
408     /* TODO Use delay the user passed in. */
409     if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
410         uv__tcp_keepalive(fd, 1, 60)) {
411       return UV__ERR(errno);
412     }
413   }
414
415 #if defined(__APPLE__)
416   enable = 1;
417   if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
418       errno != ENOTSOCK &&
419       errno != EINVAL) {
420     return UV__ERR(errno);
421   }
422 #endif
423
424   stream->io_watcher.fd = fd;
425
426   return 0;
427 }
428
429
430 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
431   uv_write_t* req;
432   QUEUE* q;
433   while (!QUEUE_EMPTY(&stream->write_queue)) {
434     q = QUEUE_HEAD(&stream->write_queue);
435     QUEUE_REMOVE(q);
436
437     req = QUEUE_DATA(q, uv_write_t, queue);
438     req->error = error;
439
440     QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
441   }
442 }
443
444
445 void uv__stream_destroy(uv_stream_t* stream) {
446   assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
447   assert(stream->flags & UV_HANDLE_CLOSED);
448
449   if (stream->connect_req) {
450     uv__req_unregister(stream->loop, stream->connect_req);
451     stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
452     stream->connect_req = NULL;
453   }
454
455   uv__stream_flush_write_queue(stream, UV_ECANCELED);
456   uv__write_callbacks(stream);
457   uv__drain(stream);
458
459   assert(stream->write_queue_size == 0);
460 }
461
462
463 /* Implements a best effort approach to mitigating accept() EMFILE errors.
464  * We have a spare file descriptor stashed away that we close to get below
465  * the EMFILE limit. Next, we accept all pending connections and close them
466  * immediately to signal the clients that we're overloaded - and we are, but
467  * we still keep on trucking.
468  *
469  * There is one caveat: it's not reliable in a multi-threaded environment.
470  * The file descriptor limit is per process. Our party trick fails if another
471  * thread opens a file or creates a socket in the time window between us
472  * calling close() and accept().
473  */
474 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
475   int err;
476   int emfile_fd;
477
478   if (loop->emfile_fd == -1)
479     return UV_EMFILE;
480
481   uv__close(loop->emfile_fd);
482   loop->emfile_fd = -1;
483
484   do {
485     err = uv__accept(accept_fd);
486     if (err >= 0)
487       uv__close(err);
488   } while (err >= 0 || err == UV_EINTR);
489
490   emfile_fd = uv__open_cloexec("/", O_RDONLY);
491   if (emfile_fd >= 0)
492     loop->emfile_fd = emfile_fd;
493
494   return err;
495 }
496
497
498 #if defined(UV_HAVE_KQUEUE)
499 # define UV_DEC_BACKLOG(w) w->rcount--;
500 #else
501 # define UV_DEC_BACKLOG(w) /* no-op */
502 #endif /* defined(UV_HAVE_KQUEUE) */
503
504
505 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
506   uv_stream_t* stream;
507   int err;
508
509   stream = container_of(w, uv_stream_t, io_watcher);
510   assert(events & POLLIN);
511   assert(stream->accepted_fd == -1);
512   assert(!(stream->flags & UV_HANDLE_CLOSING));
513
514   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
515
516   /* connection_cb can close the server socket while we're
517    * in the loop so check it on each iteration.
518    */
519   while (uv__stream_fd(stream) != -1) {
520     assert(stream->accepted_fd == -1);
521
522 #if defined(UV_HAVE_KQUEUE)
523     if (w->rcount <= 0)
524       return;
525 #endif /* defined(UV_HAVE_KQUEUE) */
526
527     err = uv__accept(uv__stream_fd(stream));
528     if (err < 0) {
529       if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
530         return;  /* Not an error. */
531
532       if (err == UV_ECONNABORTED)
533         continue;  /* Ignore. Nothing we can do about that. */
534
535       if (err == UV_EMFILE || err == UV_ENFILE) {
536         err = uv__emfile_trick(loop, uv__stream_fd(stream));
537         if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
538           break;
539       }
540
541       stream->connection_cb(stream, err);
542       continue;
543     }
544
545     UV_DEC_BACKLOG(w)
546     stream->accepted_fd = err;
547     stream->connection_cb(stream, 0);
548
549     if (stream->accepted_fd != -1) {
550       /* The user hasn't yet accepted called uv_accept() */
551       uv__io_stop(loop, &stream->io_watcher, POLLIN);
552       return;
553     }
554
555     if (stream->type == UV_TCP &&
556         (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
557       /* Give other processes a chance to accept connections. */
558       struct timespec timeout = { 0, 1 };
559       nanosleep(&timeout, NULL);
560     }
561   }
562 }
563
564
565 #undef UV_DEC_BACKLOG
566
567
568 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
569   int err;
570
571   assert(server->loop == client->loop);
572
573   if (server->accepted_fd == -1)
574     return UV_EAGAIN;
575
576   switch (client->type) {
577     case UV_NAMED_PIPE:
578     case UV_TCP:
579       err = uv__stream_open(client,
580                             server->accepted_fd,
581                             UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
582       if (err) {
583         /* TODO handle error */
584         uv__close(server->accepted_fd);
585         goto done;
586       }
587       break;
588
589     case UV_UDP:
590       err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
591       if (err) {
592         uv__close(server->accepted_fd);
593         goto done;
594       }
595       break;
596
597     default:
598       return UV_EINVAL;
599   }
600
601   client->flags |= UV_HANDLE_BOUND;
602
603 done:
604   /* Process queued fds */
605   if (server->queued_fds != NULL) {
606     uv__stream_queued_fds_t* queued_fds;
607
608     queued_fds = server->queued_fds;
609
610     /* Read first */
611     server->accepted_fd = queued_fds->fds[0];
612
613     /* All read, free */
614     assert(queued_fds->offset > 0);
615     if (--queued_fds->offset == 0) {
616       uv__free(queued_fds);
617       server->queued_fds = NULL;
618     } else {
619       /* Shift rest */
620       memmove(queued_fds->fds,
621               queued_fds->fds + 1,
622               queued_fds->offset * sizeof(*queued_fds->fds));
623     }
624   } else {
625     server->accepted_fd = -1;
626     if (err == 0)
627       uv__io_start(server->loop, &server->io_watcher, POLLIN);
628   }
629   return err;
630 }
631
632
633 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
634   int err;
635   if (uv__is_closing(stream)) {
636     return UV_EINVAL;
637   }
638   switch (stream->type) {
639   case UV_TCP:
640     err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
641     break;
642
643   case UV_NAMED_PIPE:
644     err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
645     break;
646
647   default:
648     err = UV_EINVAL;
649   }
650
651   if (err == 0)
652     uv__handle_start(stream);
653
654   return err;
655 }
656
657
658 static void uv__drain(uv_stream_t* stream) {
659   uv_shutdown_t* req;
660   int err;
661
662   assert(QUEUE_EMPTY(&stream->write_queue));
663   if (!(stream->flags & UV_HANDLE_CLOSING)) {
664     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
665     uv__stream_osx_interrupt_select(stream);
666   }
667
668   if (!(stream->flags & UV_HANDLE_SHUTTING))
669     return;
670
671   req = stream->shutdown_req;
672   assert(req);
673
674   if ((stream->flags & UV_HANDLE_CLOSING) ||
675       !(stream->flags & UV_HANDLE_SHUT)) {
676     stream->shutdown_req = NULL;
677     stream->flags &= ~UV_HANDLE_SHUTTING;
678     uv__req_unregister(stream->loop, req);
679
680     err = 0;
681     if (stream->flags & UV_HANDLE_CLOSING)
682       /* The user destroyed the stream before we got to do the shutdown. */
683       err = UV_ECANCELED;
684     else if (shutdown(uv__stream_fd(stream), SHUT_WR))
685       err = UV__ERR(errno);
686     else /* Success. */
687       stream->flags |= UV_HANDLE_SHUT;
688
689     if (req->cb != NULL)
690       req->cb(req, err);
691   }
692 }
693
694
695 static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
696   if (n == 1)
697     return write(fd, vec->iov_base, vec->iov_len);
698   else
699     return writev(fd, vec, n);
700 }
701
702
703 static size_t uv__write_req_size(uv_write_t* req) {
704   size_t size;
705
706   assert(req->bufs != NULL);
707   size = uv__count_bufs(req->bufs + req->write_index,
708                         req->nbufs - req->write_index);
709   assert(req->handle->write_queue_size >= size);
710
711   return size;
712 }
713
714
715 /* Returns 1 if all write request data has been written, or 0 if there is still
716  * more data to write.
717  *
718  * Note: the return value only says something about the *current* request.
719  * There may still be other write requests sitting in the queue.
720  */
721 static int uv__write_req_update(uv_stream_t* stream,
722                                 uv_write_t* req,
723                                 size_t n) {
724   uv_buf_t* buf;
725   size_t len;
726
727   assert(n <= stream->write_queue_size);
728   stream->write_queue_size -= n;
729
730   buf = req->bufs + req->write_index;
731
732   do {
733     len = n < buf->len ? n : buf->len;
734     buf->base += len;
735     buf->len -= len;
736     buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
737     n -= len;
738   } while (n > 0);
739
740   req->write_index = buf - req->bufs;
741
742   return req->write_index == req->nbufs;
743 }
744
745
746 static void uv__write_req_finish(uv_write_t* req) {
747   uv_stream_t* stream = req->handle;
748
749   /* Pop the req off tcp->write_queue. */
750   QUEUE_REMOVE(&req->queue);
751
752   /* Only free when there was no error. On error, we touch up write_queue_size
753    * right before making the callback. The reason we don't do that right away
754    * is that a write_queue_size > 0 is our only way to signal to the user that
755    * they should stop writing - which they should if we got an error. Something
756    * to revisit in future revisions of the libuv API.
757    */
758   if (req->error == 0) {
759     if (req->bufs != req->bufsml)
760       uv__free(req->bufs);
761     req->bufs = NULL;
762   }
763
764   /* Add it to the write_completed_queue where it will have its
765    * callback called in the near future.
766    */
767   QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
768   uv__io_feed(stream->loop, &stream->io_watcher);
769 }
770
771
772 static int uv__handle_fd(uv_handle_t* handle) {
773   switch (handle->type) {
774     case UV_NAMED_PIPE:
775     case UV_TCP:
776       return ((uv_stream_t*) handle)->io_watcher.fd;
777
778     case UV_UDP:
779       return ((uv_udp_t*) handle)->io_watcher.fd;
780
781     default:
782       return -1;
783   }
784 }
785
786 static int uv__try_write(uv_stream_t* stream,
787                          const uv_buf_t bufs[],
788                          unsigned int nbufs,
789                          uv_stream_t* send_handle) {
790   struct iovec* iov;
791   int iovmax;
792   int iovcnt;
793   ssize_t n;
794
795   /*
796    * Cast to iovec. We had to have our own uv_buf_t instead of iovec
797    * because Windows's WSABUF is not an iovec.
798    */
799   iov = (struct iovec*) bufs;
800   iovcnt = nbufs;
801
802   iovmax = uv__getiovmax();
803
804   /* Limit iov count to avoid EINVALs from writev() */
805   if (iovcnt > iovmax)
806     iovcnt = iovmax;
807
808   /*
809    * Now do the actual writev. Note that we've been updating the pointers
810    * inside the iov each time we write. So there is no need to offset it.
811    */
812   if (send_handle != NULL) {
813     int fd_to_send;
814     struct msghdr msg;
815     struct cmsghdr *cmsg;
816     union {
817       char data[64];
818       struct cmsghdr alias;
819     } scratch;
820
821     if (uv__is_closing(send_handle))
822       return UV_EBADF;
823
824     fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
825
826     memset(&scratch, 0, sizeof(scratch));
827
828     assert(fd_to_send >= 0);
829
830     msg.msg_name = NULL;
831     msg.msg_namelen = 0;
832     msg.msg_iov = iov;
833     msg.msg_iovlen = iovcnt;
834     msg.msg_flags = 0;
835
836     msg.msg_control = &scratch.alias;
837     msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
838
839     cmsg = CMSG_FIRSTHDR(&msg);
840     cmsg->cmsg_level = SOL_SOCKET;
841     cmsg->cmsg_type = SCM_RIGHTS;
842     cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
843
844     /* silence aliasing warning */
845     {
846       void* pv = CMSG_DATA(cmsg);
847       int* pi = pv;
848       *pi = fd_to_send;
849     }
850
851     do
852       n = sendmsg(uv__stream_fd(stream), &msg, 0);
853     while (n == -1 && errno == EINTR);
854   } else {
855     do
856       n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
857     while (n == -1 && errno == EINTR);
858   }
859
860   if (n >= 0)
861     return n;
862
863   if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
864     return UV_EAGAIN;
865
866 #ifdef __APPLE__
867   /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too -
868    * have a bug where a race condition causes the kernel to return EPROTOTYPE
869    * because the socket isn't fully constructed. It's probably the result of
870    * the peer closing the connection and that is why libuv translates it to
871    * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
872    * away but some VPN software causes the same behavior except the error is
873    * permanent, not transient, turning the retry mechanism into an infinite
874    * loop. See https://github.com/libuv/libuv/pull/482.
875    */
876   if (errno == EPROTOTYPE)
877     return UV_ECONNRESET;
878 #endif  /* __APPLE__ */
879
880   return UV__ERR(errno);
881 }
882
883 static void uv__write(uv_stream_t* stream) {
884   QUEUE* q;
885   uv_write_t* req;
886   ssize_t n;
887
888   assert(uv__stream_fd(stream) >= 0);
889
890   for (;;) {
891     if (QUEUE_EMPTY(&stream->write_queue))
892       return;
893
894     q = QUEUE_HEAD(&stream->write_queue);
895     req = QUEUE_DATA(q, uv_write_t, queue);
896     assert(req->handle == stream);
897
898     n = uv__try_write(stream,
899                       &(req->bufs[req->write_index]),
900                       req->nbufs - req->write_index,
901                       req->send_handle);
902
903     /* Ensure the handle isn't sent again in case this is a partial write. */
904     if (n >= 0) {
905       req->send_handle = NULL;
906       if (uv__write_req_update(stream, req, n)) {
907         uv__write_req_finish(req);
908         return;  /* TODO(bnoordhuis) Start trying to write the next request. */
909       }
910     } else if (n != UV_EAGAIN)
911       break;
912
913     /* If this is a blocking stream, try again. */
914     if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
915       continue;
916
917     /* We're not done. */
918     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
919
920     /* Notify select() thread about state change */
921     uv__stream_osx_interrupt_select(stream);
922
923     return;
924   }
925
926   req->error = n;
927   uv__write_req_finish(req);
928   uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
929   uv__stream_osx_interrupt_select(stream);
930 }
931
932
933 static void uv__write_callbacks(uv_stream_t* stream) {
934   uv_write_t* req;
935   QUEUE* q;
936   QUEUE pq;
937
938   if (QUEUE_EMPTY(&stream->write_completed_queue))
939     return;
940
941   QUEUE_MOVE(&stream->write_completed_queue, &pq);
942
943   while (!QUEUE_EMPTY(&pq)) {
944     /* Pop a req off write_completed_queue. */
945     q = QUEUE_HEAD(&pq);
946     req = QUEUE_DATA(q, uv_write_t, queue);
947     QUEUE_REMOVE(q);
948     uv__req_unregister(stream->loop, req);
949
950     if (req->bufs != NULL) {
951       stream->write_queue_size -= uv__write_req_size(req);
952       if (req->bufs != req->bufsml)
953         uv__free(req->bufs);
954       req->bufs = NULL;
955     }
956
957     /* NOTE: call callback AFTER freeing the request data. */
958     if (req->cb)
959       req->cb(req, req->error);
960   }
961 }
962
963
964 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
965   stream->flags |= UV_HANDLE_READ_EOF;
966   stream->flags &= ~UV_HANDLE_READING;
967   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
968   uv__handle_stop(stream);
969   uv__stream_osx_interrupt_select(stream);
970   stream->read_cb(stream, UV_EOF, buf);
971 }
972
973
974 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
975   uv__stream_queued_fds_t* queued_fds;
976   unsigned int queue_size;
977
978   queued_fds = stream->queued_fds;
979   if (queued_fds == NULL) {
980     queue_size = 8;
981     queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
982                             sizeof(*queued_fds));
983     if (queued_fds == NULL)
984       return UV_ENOMEM;
985     queued_fds->size = queue_size;
986     queued_fds->offset = 0;
987     stream->queued_fds = queued_fds;
988
989     /* Grow */
990   } else if (queued_fds->size == queued_fds->offset) {
991     queue_size = queued_fds->size + 8;
992     queued_fds = uv__realloc(queued_fds,
993                              (queue_size - 1) * sizeof(*queued_fds->fds) +
994                               sizeof(*queued_fds));
995
996     /*
997      * Allocation failure, report back.
998      * NOTE: if it is fatal - sockets will be closed in uv__stream_close
999      */
1000     if (queued_fds == NULL)
1001       return UV_ENOMEM;
1002     queued_fds->size = queue_size;
1003     stream->queued_fds = queued_fds;
1004   }
1005
1006   /* Put fd in a queue */
1007   queued_fds->fds[queued_fds->offset++] = fd;
1008
1009   return 0;
1010 }
1011
1012
1013 #if defined(__PASE__)
1014 /* on IBMi PASE the control message length can not exceed 256. */
1015 # define UV__CMSG_FD_COUNT 60
1016 #else
1017 # define UV__CMSG_FD_COUNT 64
1018 #endif
1019 #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
1020
1021
1022 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
1023   struct cmsghdr* cmsg;
1024
1025   for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
1026     char* start;
1027     char* end;
1028     int err;
1029     void* pv;
1030     int* pi;
1031     unsigned int i;
1032     unsigned int count;
1033
1034     if (cmsg->cmsg_type != SCM_RIGHTS) {
1035       fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1036           cmsg->cmsg_type);
1037       continue;
1038     }
1039
1040     /* silence aliasing warning */
1041     pv = CMSG_DATA(cmsg);
1042     pi = pv;
1043
1044     /* Count available fds */
1045     start = (char*) cmsg;
1046     end = (char*) cmsg + cmsg->cmsg_len;
1047     count = 0;
1048     while (start + CMSG_LEN(count * sizeof(*pi)) < end)
1049       count++;
1050     assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
1051
1052     for (i = 0; i < count; i++) {
1053       /* Already has accepted fd, queue now */
1054       if (stream->accepted_fd != -1) {
1055         err = uv__stream_queue_fd(stream, pi[i]);
1056         if (err != 0) {
1057           /* Close rest */
1058           for (; i < count; i++)
1059             uv__close(pi[i]);
1060           return err;
1061         }
1062       } else {
1063         stream->accepted_fd = pi[i];
1064       }
1065     }
1066   }
1067
1068   return 0;
1069 }
1070
1071
1072 #ifdef __clang__
1073 # pragma clang diagnostic push
1074 # pragma clang diagnostic ignored "-Wgnu-folding-constant"
1075 # pragma clang diagnostic ignored "-Wvla-extension"
1076 #endif
1077
1078 static void uv__read(uv_stream_t* stream) {
1079   uv_buf_t buf;
1080   ssize_t nread;
1081   struct msghdr msg;
1082   char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
1083   int count;
1084   int err;
1085   int is_ipc;
1086
1087   stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1088
1089   /* Prevent loop starvation when the data comes in as fast as (or faster than)
1090    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1091    */
1092   count = 32;
1093
1094   is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1095
1096   /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1097    * tcp->read_cb is NULL or not?
1098    */
1099   while (stream->read_cb
1100       && (stream->flags & UV_HANDLE_READING)
1101       && (count-- > 0)) {
1102     assert(stream->alloc_cb != NULL);
1103
1104     buf = uv_buf_init(NULL, 0);
1105     stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1106     if (buf.base == NULL || buf.len == 0) {
1107       /* User indicates it can't or won't handle the read. */
1108       stream->read_cb(stream, UV_ENOBUFS, &buf);
1109       return;
1110     }
1111
1112     assert(buf.base != NULL);
1113     assert(uv__stream_fd(stream) >= 0);
1114
1115     if (!is_ipc) {
1116       do {
1117         nread = read(uv__stream_fd(stream), buf.base, buf.len);
1118       }
1119       while (nread < 0 && errno == EINTR);
1120     } else {
1121       /* ipc uses recvmsg */
1122       msg.msg_flags = 0;
1123       msg.msg_iov = (struct iovec*) &buf;
1124       msg.msg_iovlen = 1;
1125       msg.msg_name = NULL;
1126       msg.msg_namelen = 0;
1127       /* Set up to receive a descriptor even if one isn't in the message */
1128       msg.msg_controllen = sizeof(cmsg_space);
1129       msg.msg_control = cmsg_space;
1130
1131       do {
1132         nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1133       }
1134       while (nread < 0 && errno == EINTR);
1135     }
1136
1137     if (nread < 0) {
1138       /* Error */
1139       if (errno == EAGAIN || errno == EWOULDBLOCK) {
1140         /* Wait for the next one. */
1141         if (stream->flags & UV_HANDLE_READING) {
1142           uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1143           uv__stream_osx_interrupt_select(stream);
1144         }
1145         stream->read_cb(stream, 0, &buf);
1146 #if defined(__CYGWIN__) || defined(__MSYS__)
1147       } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1148         uv__stream_eof(stream, &buf);
1149         return;
1150 #endif
1151       } else {
1152         /* Error. User should call uv_close(). */
1153         stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1154         stream->read_cb(stream, UV__ERR(errno), &buf);
1155         if (stream->flags & UV_HANDLE_READING) {
1156           stream->flags &= ~UV_HANDLE_READING;
1157           uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1158           uv__handle_stop(stream);
1159           uv__stream_osx_interrupt_select(stream);
1160         }
1161       }
1162       return;
1163     } else if (nread == 0) {
1164       uv__stream_eof(stream, &buf);
1165       return;
1166     } else {
1167       /* Successful read */
1168       ssize_t buflen = buf.len;
1169
1170       if (is_ipc) {
1171         err = uv__stream_recv_cmsg(stream, &msg);
1172         if (err != 0) {
1173           stream->read_cb(stream, err, &buf);
1174           return;
1175         }
1176       }
1177
1178 #if defined(__MVS__)
1179       if (is_ipc && msg.msg_controllen > 0) {
1180         uv_buf_t blankbuf;
1181         int nread;
1182         struct iovec *old;
1183
1184         blankbuf.base = 0;
1185         blankbuf.len = 0;
1186         old = msg.msg_iov;
1187         msg.msg_iov = (struct iovec*) &blankbuf;
1188         nread = 0;
1189         do {
1190           nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1191           err = uv__stream_recv_cmsg(stream, &msg);
1192           if (err != 0) {
1193             stream->read_cb(stream, err, &buf);
1194             msg.msg_iov = old;
1195             return;
1196           }
1197         } while (nread == 0 && msg.msg_controllen > 0);
1198         msg.msg_iov = old;
1199       }
1200 #endif
1201       stream->read_cb(stream, nread, &buf);
1202
1203       /* Return if we didn't fill the buffer, there is no more data to read. */
1204       if (nread < buflen) {
1205         stream->flags |= UV_HANDLE_READ_PARTIAL;
1206         return;
1207       }
1208     }
1209   }
1210 }
1211
1212
1213 #ifdef __clang__
1214 # pragma clang diagnostic pop
1215 #endif
1216
1217 #undef UV__CMSG_FD_COUNT
1218 #undef UV__CMSG_FD_SIZE
1219
1220
1221 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1222   assert(stream->type == UV_TCP ||
1223          stream->type == UV_TTY ||
1224          stream->type == UV_NAMED_PIPE);
1225
1226   if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1227       stream->flags & UV_HANDLE_SHUT ||
1228       stream->flags & UV_HANDLE_SHUTTING ||
1229       uv__is_closing(stream)) {
1230     return UV_ENOTCONN;
1231   }
1232
1233   assert(uv__stream_fd(stream) >= 0);
1234
1235   /* Initialize request. The `shutdown(2)` call will always be deferred until
1236    * `uv__drain`, just before the callback is run. */
1237   uv__req_init(stream->loop, req, UV_SHUTDOWN);
1238   req->handle = stream;
1239   req->cb = cb;
1240   stream->shutdown_req = req;
1241   stream->flags |= UV_HANDLE_SHUTTING;
1242   stream->flags &= ~UV_HANDLE_WRITABLE;
1243
1244   if (QUEUE_EMPTY(&stream->write_queue))
1245     uv__io_feed(stream->loop, &stream->io_watcher);
1246
1247   return 0;
1248 }
1249
1250
1251 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1252   uv_stream_t* stream;
1253
1254   stream = container_of(w, uv_stream_t, io_watcher);
1255
1256   assert(stream->type == UV_TCP ||
1257          stream->type == UV_NAMED_PIPE ||
1258          stream->type == UV_TTY);
1259   assert(!(stream->flags & UV_HANDLE_CLOSING));
1260
1261   if (stream->connect_req) {
1262     uv__stream_connect(stream);
1263     return;
1264   }
1265
1266   assert(uv__stream_fd(stream) >= 0);
1267
1268   /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1269   if (events & (POLLIN | POLLERR | POLLHUP))
1270     uv__read(stream);
1271
1272   if (uv__stream_fd(stream) == -1)
1273     return;  /* read_cb closed stream. */
1274
1275   /* Short-circuit iff POLLHUP is set, the user is still interested in read
1276    * events and uv__read() reported a partial read but not EOF. If the EOF
1277    * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1278    * have to do anything. If the partial read flag is not set, we can't
1279    * report the EOF yet because there is still data to read.
1280    */
1281   if ((events & POLLHUP) &&
1282       (stream->flags & UV_HANDLE_READING) &&
1283       (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1284       !(stream->flags & UV_HANDLE_READ_EOF)) {
1285     uv_buf_t buf = { NULL, 0 };
1286     uv__stream_eof(stream, &buf);
1287   }
1288
1289   if (uv__stream_fd(stream) == -1)
1290     return;  /* read_cb closed stream. */
1291
1292   if (events & (POLLOUT | POLLERR | POLLHUP)) {
1293     uv__write(stream);
1294     uv__write_callbacks(stream);
1295
1296     /* Write queue drained. */
1297     if (QUEUE_EMPTY(&stream->write_queue))
1298       uv__drain(stream);
1299   }
1300 }
1301
1302
1303 /**
1304  * We get called here from directly following a call to connect(2).
1305  * In order to determine if we've errored out or succeeded must call
1306  * getsockopt.
1307  */
1308 static void uv__stream_connect(uv_stream_t* stream) {
1309   int error;
1310   uv_connect_t* req = stream->connect_req;
1311   socklen_t errorsize = sizeof(int);
1312
1313   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1314   assert(req);
1315
1316   if (stream->delayed_error) {
1317     /* To smooth over the differences between unixes errors that
1318      * were reported synchronously on the first connect can be delayed
1319      * until the next tick--which is now.
1320      */
1321     error = stream->delayed_error;
1322     stream->delayed_error = 0;
1323   } else {
1324     /* Normal situation: we need to get the socket error from the kernel. */
1325     assert(uv__stream_fd(stream) >= 0);
1326     getsockopt(uv__stream_fd(stream),
1327                SOL_SOCKET,
1328                SO_ERROR,
1329                &error,
1330                &errorsize);
1331     error = UV__ERR(error);
1332   }
1333
1334   if (error == UV__ERR(EINPROGRESS))
1335     return;
1336
1337   stream->connect_req = NULL;
1338   uv__req_unregister(stream->loop, req);
1339
1340   if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1341     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1342   }
1343
1344   if (req->cb)
1345     req->cb(req, error);
1346
1347   if (uv__stream_fd(stream) == -1)
1348     return;
1349
1350   if (error < 0) {
1351     uv__stream_flush_write_queue(stream, UV_ECANCELED);
1352     uv__write_callbacks(stream);
1353   }
1354 }
1355
1356
1357 static int uv__check_before_write(uv_stream_t* stream,
1358                                   unsigned int nbufs,
1359                                   uv_stream_t* send_handle) {
1360   assert(nbufs > 0);
1361   assert((stream->type == UV_TCP ||
1362           stream->type == UV_NAMED_PIPE ||
1363           stream->type == UV_TTY) &&
1364          "uv_write (unix) does not yet support other types of streams");
1365
1366   if (uv__stream_fd(stream) < 0)
1367     return UV_EBADF;
1368
1369   if (!(stream->flags & UV_HANDLE_WRITABLE))
1370     return UV_EPIPE;
1371
1372   if (send_handle != NULL) {
1373     if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1374       return UV_EINVAL;
1375
1376     /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1377      * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1378      * evaluates to a function that operates on a uv_stream_t with a couple of
1379      * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1380      * which works but only by accident.
1381      */
1382     if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1383       return UV_EBADF;
1384
1385 #if defined(__CYGWIN__) || defined(__MSYS__)
1386     /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1387        See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
1388     return UV_ENOSYS;
1389 #endif
1390   }
1391
1392   return 0;
1393 }
1394
1395 int uv_write2(uv_write_t* req,
1396               uv_stream_t* stream,
1397               const uv_buf_t bufs[],
1398               unsigned int nbufs,
1399               uv_stream_t* send_handle,
1400               uv_write_cb cb) {
1401   int empty_queue;
1402   int err;
1403
1404   err = uv__check_before_write(stream, nbufs, send_handle);
1405   if (err < 0)
1406     return err;
1407
1408   /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1409    * it means there are error-state requests in the write_completed_queue that
1410    * will touch up write_queue_size later, see also uv__write_req_finish().
1411    * We could check that write_queue is empty instead but that implies making
1412    * a write() syscall when we know that the handle is in error mode.
1413    */
1414   empty_queue = (stream->write_queue_size == 0);
1415
1416   /* Initialize the req */
1417   uv__req_init(stream->loop, req, UV_WRITE);
1418   req->cb = cb;
1419   req->handle = stream;
1420   req->error = 0;
1421   req->send_handle = send_handle;
1422   QUEUE_INIT(&req->queue);
1423
1424   req->bufs = req->bufsml;
1425   if (nbufs > ARRAY_SIZE(req->bufsml))
1426     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1427
1428   if (req->bufs == NULL)
1429     return UV_ENOMEM;
1430
1431   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1432   req->nbufs = nbufs;
1433   req->write_index = 0;
1434   stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1435
1436   /* Append the request to write_queue. */
1437   QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1438
1439   /* If the queue was empty when this function began, we should attempt to
1440    * do the write immediately. Otherwise start the write_watcher and wait
1441    * for the fd to become writable.
1442    */
1443   if (stream->connect_req) {
1444     /* Still connecting, do nothing. */
1445   }
1446   else if (empty_queue) {
1447     uv__write(stream);
1448   }
1449   else {
1450     /*
1451      * blocking streams should never have anything in the queue.
1452      * if this assert fires then somehow the blocking stream isn't being
1453      * sufficiently flushed in uv__write.
1454      */
1455     assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1456     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1457     uv__stream_osx_interrupt_select(stream);
1458   }
1459
1460   return 0;
1461 }
1462
1463
1464 /* The buffers to be written must remain valid until the callback is called.
1465  * This is not required for the uv_buf_t array.
1466  */
1467 int uv_write(uv_write_t* req,
1468              uv_stream_t* handle,
1469              const uv_buf_t bufs[],
1470              unsigned int nbufs,
1471              uv_write_cb cb) {
1472   return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1473 }
1474
1475
1476 int uv_try_write(uv_stream_t* stream,
1477                  const uv_buf_t bufs[],
1478                  unsigned int nbufs) {
1479   return uv_try_write2(stream, bufs, nbufs, NULL);
1480 }
1481
1482
1483 int uv_try_write2(uv_stream_t* stream,
1484                   const uv_buf_t bufs[],
1485                   unsigned int nbufs,
1486                   uv_stream_t* send_handle) {
1487   int err;
1488
1489   /* Connecting or already writing some data */
1490   if (stream->connect_req != NULL || stream->write_queue_size != 0)
1491     return UV_EAGAIN;
1492
1493   err = uv__check_before_write(stream, nbufs, NULL);
1494   if (err < 0)
1495     return err;
1496
1497   return uv__try_write(stream, bufs, nbufs, send_handle);
1498 }
1499
1500
1501 int uv__read_start(uv_stream_t* stream,
1502                    uv_alloc_cb alloc_cb,
1503                    uv_read_cb read_cb) {
1504   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1505       stream->type == UV_TTY);
1506
1507   /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
1508    * just expresses the desired state of the user. */
1509   stream->flags |= UV_HANDLE_READING;
1510   stream->flags &= ~UV_HANDLE_READ_EOF;
1511
1512   /* TODO: try to do the read inline? */
1513   assert(uv__stream_fd(stream) >= 0);
1514   assert(alloc_cb);
1515
1516   stream->read_cb = read_cb;
1517   stream->alloc_cb = alloc_cb;
1518
1519   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1520   uv__handle_start(stream);
1521   uv__stream_osx_interrupt_select(stream);
1522
1523   return 0;
1524 }
1525
1526
1527 int uv_read_stop(uv_stream_t* stream) {
1528   if (!(stream->flags & UV_HANDLE_READING))
1529     return 0;
1530
1531   stream->flags &= ~UV_HANDLE_READING;
1532   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1533   uv__handle_stop(stream);
1534   uv__stream_osx_interrupt_select(stream);
1535
1536   stream->read_cb = NULL;
1537   stream->alloc_cb = NULL;
1538   return 0;
1539 }
1540
1541
1542 int uv_is_readable(const uv_stream_t* stream) {
1543   return !!(stream->flags & UV_HANDLE_READABLE);
1544 }
1545
1546
1547 int uv_is_writable(const uv_stream_t* stream) {
1548   return !!(stream->flags & UV_HANDLE_WRITABLE);
1549 }
1550
1551
1552 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
1553 int uv___stream_fd(const uv_stream_t* handle) {
1554   const uv__stream_select_t* s;
1555
1556   assert(handle->type == UV_TCP ||
1557          handle->type == UV_TTY ||
1558          handle->type == UV_NAMED_PIPE);
1559
1560   s = handle->select;
1561   if (s != NULL)
1562     return s->fd;
1563
1564   return handle->io_watcher.fd;
1565 }
1566 #endif /* defined(__APPLE__) */
1567
1568
1569 void uv__stream_close(uv_stream_t* handle) {
1570   unsigned int i;
1571   uv__stream_queued_fds_t* queued_fds;
1572
1573 #if defined(__APPLE__) && !defined(CMAKE_BOOTSTRAP)
1574   /* Terminate select loop first */
1575   if (handle->select != NULL) {
1576     uv__stream_select_t* s;
1577
1578     s = handle->select;
1579
1580     uv_sem_post(&s->close_sem);
1581     uv_sem_post(&s->async_sem);
1582     uv__stream_osx_interrupt_select(handle);
1583     uv_thread_join(&s->thread);
1584     uv_sem_destroy(&s->close_sem);
1585     uv_sem_destroy(&s->async_sem);
1586     uv__close(s->fake_fd);
1587     uv__close(s->int_fd);
1588     uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1589
1590     handle->select = NULL;
1591   }
1592 #endif /* defined(__APPLE__) */
1593
1594   uv__io_close(handle->loop, &handle->io_watcher);
1595   uv_read_stop(handle);
1596   uv__handle_stop(handle);
1597   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1598
1599   if (handle->io_watcher.fd != -1) {
1600     /* Don't close stdio file descriptors.  Nothing good comes from it. */
1601     if (handle->io_watcher.fd > STDERR_FILENO)
1602       uv__close(handle->io_watcher.fd);
1603     handle->io_watcher.fd = -1;
1604   }
1605
1606   if (handle->accepted_fd != -1) {
1607     uv__close(handle->accepted_fd);
1608     handle->accepted_fd = -1;
1609   }
1610
1611   /* Close all queued fds */
1612   if (handle->queued_fds != NULL) {
1613     queued_fds = handle->queued_fds;
1614     for (i = 0; i < queued_fds->offset; i++)
1615       uv__close(queued_fds->fds[i]);
1616     uv__free(handle->queued_fds);
1617     handle->queued_fds = NULL;
1618   }
1619
1620   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1621 }
1622
1623
1624 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1625   /* Don't need to check the file descriptor, uv__nonblock()
1626    * will fail with EBADF if it's not valid.
1627    */
1628   return uv__nonblock(uv__stream_fd(handle), !blocking);
1629 }