afd2a051aadb1cbce6621df3753385c0d44e9909
[platform/upstream/nodejs.git] / deps / uv / src / unix / stream.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <errno.h>
30
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36 #include <limits.h> /* IOV_MAX */
37
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
42
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
45
46 struct uv__stream_select_s {
47   uv_stream_t* stream;
48   uv_thread_t thread;
49   uv_sem_t close_sem;
50   uv_sem_t async_sem;
51   uv_async_t async;
52   int events;
53   int fake_fd;
54   int int_fd;
55   int fd;
56 };
57 #endif /* defined(__APPLE__) */
58
59 static void uv__stream_connect(uv_stream_t*);
60 static void uv__write(uv_stream_t* stream);
61 static void uv__read(uv_stream_t* stream);
62 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
63 static size_t uv__write_req_size(uv_write_t* req);
64
65
66 /* Used by the accept() EMFILE party trick. */
67 static int uv__open_cloexec(const char* path, int flags) {
68   int err;
69   int fd;
70
71 #if defined(__linux__)
72   fd = open(path, flags | UV__O_CLOEXEC);
73   if (fd != -1)
74     return fd;
75
76   if (errno != EINVAL)
77     return -errno;
78
79   /* O_CLOEXEC not supported. */
80 #endif
81
82   fd = open(path, flags);
83   if (fd == -1)
84     return -errno;
85
86   err = uv__cloexec(fd, 1);
87   if (err) {
88     uv__close(fd);
89     return err;
90   }
91
92   return fd;
93 }
94
95
96 static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
97   unsigned int i;
98   size_t bytes;
99
100   bytes = 0;
101   for (i = 0; i < nbufs; i++)
102     bytes += bufs[i].len;
103
104   return bytes;
105 }
106
107
108 void uv__stream_init(uv_loop_t* loop,
109                      uv_stream_t* stream,
110                      uv_handle_type type) {
111   int err;
112
113   uv__handle_init(loop, (uv_handle_t*)stream, type);
114   stream->read_cb = NULL;
115   stream->read2_cb = NULL;
116   stream->alloc_cb = NULL;
117   stream->close_cb = NULL;
118   stream->connection_cb = NULL;
119   stream->connect_req = NULL;
120   stream->shutdown_req = NULL;
121   stream->accepted_fd = -1;
122   stream->delayed_error = 0;
123   QUEUE_INIT(&stream->write_queue);
124   QUEUE_INIT(&stream->write_completed_queue);
125   stream->write_queue_size = 0;
126
127   if (loop->emfile_fd == -1) {
128     err = uv__open_cloexec("/", O_RDONLY);
129     if (err >= 0)
130       loop->emfile_fd = err;
131   }
132
133 #if defined(__APPLE__)
134   stream->select = NULL;
135 #endif /* defined(__APPLE_) */
136
137   uv__io_init(&stream->io_watcher, uv__stream_io, -1);
138 }
139
140
141 #if defined(__APPLE__)
142 static void uv__stream_osx_select(void* arg) {
143   uv_stream_t* stream;
144   uv__stream_select_t* s;
145   char buf[1024];
146   fd_set sread;
147   fd_set swrite;
148   int events;
149   int fd;
150   int r;
151   int max_fd;
152
153   stream = arg;
154   s = stream->select;
155   fd = s->fd;
156
157   if (fd > s->int_fd)
158     max_fd = fd;
159   else
160     max_fd = s->int_fd;
161
162   while (1) {
163     /* Terminate on semaphore */
164     if (uv_sem_trywait(&s->close_sem) == 0)
165       break;
166
167     /* Watch fd using select(2) */
168     FD_ZERO(&sread);
169     FD_ZERO(&swrite);
170
171     if (uv_is_readable(stream))
172       FD_SET(fd, &sread);
173     if (uv_is_writable(stream))
174       FD_SET(fd, &swrite);
175     FD_SET(s->int_fd, &sread);
176
177     /* Wait indefinitely for fd events */
178     r = select(max_fd + 1, &sread, &swrite, NULL, NULL);
179     if (r == -1) {
180       if (errno == EINTR)
181         continue;
182
183       /* XXX: Possible?! */
184       abort();
185     }
186
187     /* Ignore timeouts */
188     if (r == 0)
189       continue;
190
191     /* Empty socketpair's buffer in case of interruption */
192     if (FD_ISSET(s->int_fd, &sread))
193       while (1) {
194         r = read(s->int_fd, buf, sizeof(buf));
195
196         if (r == sizeof(buf))
197           continue;
198
199         if (r != -1)
200           break;
201
202         if (errno == EAGAIN || errno == EWOULDBLOCK)
203           break;
204
205         if (errno == EINTR)
206           continue;
207
208         abort();
209       }
210
211     /* Handle events */
212     events = 0;
213     if (FD_ISSET(fd, &sread))
214       events |= UV__POLLIN;
215     if (FD_ISSET(fd, &swrite))
216       events |= UV__POLLOUT;
217
218     assert(events != 0 || FD_ISSET(s->int_fd, &sread));
219     if (events != 0) {
220       ACCESS_ONCE(int, s->events) = events;
221
222       uv_async_send(&s->async);
223       uv_sem_wait(&s->async_sem);
224
225       /* Should be processed at this stage */
226       assert((s->events == 0) || (stream->flags & UV_CLOSING));
227     }
228   }
229 }
230
231
232 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
233   /* Notify select() thread about state change */
234   uv__stream_select_t* s;
235   int r;
236
237   s = stream->select;
238
239   /* Interrupt select() loop
240    * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
241    * emit read event on other side
242    */
243   do
244     r = write(s->fake_fd, "x", 1);
245   while (r == -1 && errno == EINTR);
246
247   assert(r == 1);
248 }
249
250
251 static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
252   uv__stream_select_t* s;
253   uv_stream_t* stream;
254   int events;
255
256   s = container_of(handle, uv__stream_select_t, async);
257   stream = s->stream;
258
259   /* Get and reset stream's events */
260   events = s->events;
261   ACCESS_ONCE(int, s->events) = 0;
262   uv_sem_post(&s->async_sem);
263
264   assert(events != 0);
265   assert(events == (events & (UV__POLLIN | UV__POLLOUT)));
266
267   /* Invoke callback on event-loop */
268   if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
269     uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);
270
271   if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
272     uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
273 }
274
275
276 static void uv__stream_osx_cb_close(uv_handle_t* async) {
277   uv__stream_select_t* s;
278
279   s = container_of(async, uv__stream_select_t, async);
280   free(s);
281 }
282
283
284 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
285   /*
286    * kqueue doesn't work with some files from /dev mount on osx.
287    * select(2) in separate thread for those fds
288    */
289
290   struct kevent filter[1];
291   struct kevent events[1];
292   struct timespec timeout;
293   uv__stream_select_t* s;
294   int fds[2];
295   int err;
296   int ret;
297   int kq;
298
299   kq = kqueue();
300   if (kq == -1) {
301     perror("(libuv) kqueue()");
302     return -errno;
303   }
304
305   EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
306
307   /* Use small timeout, because we only want to capture EINVALs */
308   timeout.tv_sec = 0;
309   timeout.tv_nsec = 1;
310
311   ret = kevent(kq, filter, 1, events, 1, &timeout);
312   uv__close(kq);
313
314   if (ret == -1)
315     return -errno;
316
317   if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
318     return 0;
319
320   /* At this point we definitely know that this fd won't work with kqueue */
321   s = malloc(sizeof(*s));
322   if (s == NULL)
323     return -ENOMEM;
324
325   s->events = 0;
326   s->fd = *fd;
327
328   err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
329   if (err) {
330     free(s);
331     return err;
332   }
333
334   s->async.flags |= UV__HANDLE_INTERNAL;
335   uv__handle_unref(&s->async);
336
337   if (uv_sem_init(&s->close_sem, 0))
338     goto fatal1;
339
340   if (uv_sem_init(&s->async_sem, 0))
341     goto fatal2;
342
343   /* Create fds for io watcher and to interrupt the select() loop. */
344   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
345     goto fatal3;
346
347   s->fake_fd = fds[0];
348   s->int_fd = fds[1];
349
350   if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))
351     goto fatal4;
352
353   s->stream = stream;
354   stream->select = s;
355   *fd = s->fake_fd;
356
357   return 0;
358
359 fatal4:
360   uv__close(s->fake_fd);
361   uv__close(s->int_fd);
362   s->fake_fd = -1;
363   s->int_fd = -1;
364 fatal3:
365   uv_sem_destroy(&s->async_sem);
366 fatal2:
367   uv_sem_destroy(&s->close_sem);
368 fatal1:
369   uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
370   return -errno;
371 }
372 #endif /* defined(__APPLE__) */
373
374
375 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
376   assert(fd >= 0);
377   stream->flags |= flags;
378
379   if (stream->type == UV_TCP) {
380     if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
381       return -errno;
382
383     /* TODO Use delay the user passed in. */
384     if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
385       return -errno;
386   }
387
388   stream->io_watcher.fd = fd;
389
390   return 0;
391 }
392
393
394 void uv__stream_destroy(uv_stream_t* stream) {
395   uv_write_t* req;
396   QUEUE* q;
397
398   assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
399   assert(stream->flags & UV_CLOSED);
400
401   if (stream->connect_req) {
402     uv__req_unregister(stream->loop, stream->connect_req);
403     stream->connect_req->cb(stream->connect_req, -ECANCELED);
404     stream->connect_req = NULL;
405   }
406
407   while (!QUEUE_EMPTY(&stream->write_queue)) {
408     q = QUEUE_HEAD(&stream->write_queue);
409     QUEUE_REMOVE(q);
410
411     req = QUEUE_DATA(q, uv_write_t, queue);
412     uv__req_unregister(stream->loop, req);
413
414     if (req->bufs != req->bufsml)
415       free(req->bufs);
416     req->bufs = NULL;
417
418     if (req->cb != NULL)
419       req->cb(req, -ECANCELED);
420   }
421
422   while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
423     q = QUEUE_HEAD(&stream->write_completed_queue);
424     QUEUE_REMOVE(q);
425
426     req = QUEUE_DATA(q, uv_write_t, queue);
427     uv__req_unregister(stream->loop, req);
428
429     if (req->bufs != NULL) {
430       stream->write_queue_size -= uv__write_req_size(req);
431       if (req->bufs != req->bufsml)
432         free(req->bufs);
433       req->bufs = NULL;
434     }
435
436     if (req->cb)
437       req->cb(req, req->error);
438   }
439
440   if (stream->shutdown_req) {
441     /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
442      * fait accompli at this point. Maybe we should revisit this in v0.11.
443      * A possible reason for leaving it unchanged is that it informs the
444      * callee that the handle has been destroyed.
445      */
446     uv__req_unregister(stream->loop, stream->shutdown_req);
447     stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
448     stream->shutdown_req = NULL;
449   }
450 }
451
452
453 /* Implements a best effort approach to mitigating accept() EMFILE errors.
454  * We have a spare file descriptor stashed away that we close to get below
455  * the EMFILE limit. Next, we accept all pending connections and close them
456  * immediately to signal the clients that we're overloaded - and we are, but
457  * we still keep on trucking.
458  *
459  * There is one caveat: it's not reliable in a multi-threaded environment.
460  * The file descriptor limit is per process. Our party trick fails if another
461  * thread opens a file or creates a socket in the time window between us
462  * calling close() and accept().
463  */
464 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
465   int err;
466
467   if (loop->emfile_fd == -1)
468     return -EMFILE;
469
470   uv__close(loop->emfile_fd);
471   loop->emfile_fd = -1;
472
473   do {
474     err = uv__accept(accept_fd);
475     if (err >= 0)
476       uv__close(err);
477   } while (err >= 0 || err == -EINTR);
478
479   SAVE_ERRNO(loop->emfile_fd = uv__open_cloexec("/", O_RDONLY));
480   return err;
481 }
482
483
484 #if defined(UV_HAVE_KQUEUE)
485 # define UV_DEC_BACKLOG(w) w->rcount--;
486 #else
487 # define UV_DEC_BACKLOG(w) /* no-op */
488 #endif /* defined(UV_HAVE_KQUEUE) */
489
490
491 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
492   uv_stream_t* stream;
493   int err;
494
495   stream = container_of(w, uv_stream_t, io_watcher);
496   assert(events == UV__POLLIN);
497   assert(stream->accepted_fd == -1);
498   assert(!(stream->flags & UV_CLOSING));
499
500   uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
501
502   /* connection_cb can close the server socket while we're
503    * in the loop so check it on each iteration.
504    */
505   while (uv__stream_fd(stream) != -1) {
506     assert(stream->accepted_fd == -1);
507
508 #if defined(UV_HAVE_KQUEUE)
509     if (w->rcount <= 0)
510       return;
511 #endif /* defined(UV_HAVE_KQUEUE) */
512
513     err = uv__accept(uv__stream_fd(stream));
514     if (err < 0) {
515       if (err == -EAGAIN || err == -EWOULDBLOCK)
516         return;  /* Not an error. */
517
518       if (err == -ECONNABORTED)
519         continue;  /* Ignore. Nothing we can do about that. */
520
521       if (err == -EMFILE || err == -ENFILE) {
522         err = uv__emfile_trick(loop, uv__stream_fd(stream));
523         if (err == -EAGAIN || err == -EWOULDBLOCK)
524           break;
525       }
526
527       stream->connection_cb(stream, err);
528       continue;
529     }
530
531     UV_DEC_BACKLOG(w)
532     stream->accepted_fd = err;
533     stream->connection_cb(stream, 0);
534
535     if (stream->accepted_fd != -1) {
536       /* The user hasn't yet accepted called uv_accept() */
537       uv__io_stop(loop, &stream->io_watcher, UV__POLLIN);
538       return;
539     }
540
541     if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
542       /* Give other processes a chance to accept connections. */
543       struct timespec timeout = { 0, 1 };
544       nanosleep(&timeout, NULL);
545     }
546   }
547 }
548
549
550 #undef UV_DEC_BACKLOG
551
552
553 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
554   int err;
555
556   /* TODO document this */
557   assert(server->loop == client->loop);
558
559   if (server->accepted_fd == -1)
560     return -EAGAIN;
561
562   switch (client->type) {
563     case UV_NAMED_PIPE:
564     case UV_TCP:
565       err = uv__stream_open(client,
566                             server->accepted_fd,
567                             UV_STREAM_READABLE | UV_STREAM_WRITABLE);
568       if (err) {
569         /* TODO handle error */
570         uv__close(server->accepted_fd);
571         server->accepted_fd = -1;
572         return err;
573       }
574       break;
575
576     case UV_UDP:
577       err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
578       if (err) {
579         uv__close(server->accepted_fd);
580         server->accepted_fd = -1;
581         return err;
582       }
583       break;
584
585     default:
586       assert(0);
587   }
588
589   uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
590   server->accepted_fd = -1;
591   return 0;
592 }
593
594
595 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
596   int err;
597
598   err = -EINVAL;
599   switch (stream->type) {
600   case UV_TCP:
601     err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
602     break;
603
604   case UV_NAMED_PIPE:
605     err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
606     break;
607
608   default:
609     assert(0);
610   }
611
612   if (err == 0)
613     uv__handle_start(stream);
614
615   return err;
616 }
617
618
619 static void uv__drain(uv_stream_t* stream) {
620   uv_shutdown_t* req;
621   int err;
622
623   assert(QUEUE_EMPTY(&stream->write_queue));
624   uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
625
626   /* Shutdown? */
627   if ((stream->flags & UV_STREAM_SHUTTING) &&
628       !(stream->flags & UV_CLOSING) &&
629       !(stream->flags & UV_STREAM_SHUT)) {
630     assert(stream->shutdown_req);
631
632     req = stream->shutdown_req;
633     stream->shutdown_req = NULL;
634     stream->flags &= ~UV_STREAM_SHUTTING;
635     uv__req_unregister(stream->loop, req);
636
637     err = 0;
638     if (shutdown(uv__stream_fd(stream), SHUT_WR))
639       err = -errno;
640
641     if (err == 0)
642       stream->flags |= UV_STREAM_SHUT;
643
644     if (req->cb != NULL)
645       req->cb(req, err);
646   }
647 }
648
649
650 static size_t uv__write_req_size(uv_write_t* req) {
651   size_t size;
652
653   assert(req->bufs != NULL);
654   size = uv_count_bufs(req->bufs + req->write_index,
655                        req->nbufs - req->write_index);
656   assert(req->handle->write_queue_size >= size);
657
658   return size;
659 }
660
661
662 static void uv__write_req_finish(uv_write_t* req) {
663   uv_stream_t* stream = req->handle;
664
665   /* Pop the req off tcp->write_queue. */
666   QUEUE_REMOVE(&req->queue);
667
668   /* Only free when there was no error. On error, we touch up write_queue_size
669    * right before making the callback. The reason we don't do that right away
670    * is that a write_queue_size > 0 is our only way to signal to the user that
671    * they should stop writing - which they should if we got an error. Something
672    * to revisit in future revisions of the libuv API.
673    */
674   if (req->error == 0) {
675     if (req->bufs != req->bufsml)
676       free(req->bufs);
677     req->bufs = NULL;
678   }
679
680   /* Add it to the write_completed_queue where it will have its
681    * callback called in the near future.
682    */
683   QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
684   uv__io_feed(stream->loop, &stream->io_watcher);
685 }
686
687
688 static int uv__handle_fd(uv_handle_t* handle) {
689   switch (handle->type) {
690     case UV_NAMED_PIPE:
691     case UV_TCP:
692       return ((uv_stream_t*) handle)->io_watcher.fd;
693
694     case UV_UDP:
695       return ((uv_udp_t*) handle)->io_watcher.fd;
696
697     default:
698       return -1;
699   }
700 }
701
702 static int uv__getiovmax() {
703 #if defined(IOV_MAX)
704   return IOV_MAX;
705 #elif defined(_SC_IOV_MAX)
706   static int iovmax = -1;
707   if (iovmax == -1)
708     iovmax = sysconf(_SC_IOV_MAX);
709   return iovmax;
710 #else
711   return 1024;
712 #endif
713 }
714
715 static void uv__write(uv_stream_t* stream) {
716   struct iovec* iov;
717   QUEUE* q;
718   uv_write_t* req;
719   int iovmax;
720   int iovcnt;
721   ssize_t n;
722
723 start:
724
725   assert(uv__stream_fd(stream) >= 0);
726
727   if (QUEUE_EMPTY(&stream->write_queue))
728     return;
729
730   q = QUEUE_HEAD(&stream->write_queue);
731   req = QUEUE_DATA(q, uv_write_t, queue);
732   assert(req->handle == stream);
733
734   /*
735    * Cast to iovec. We had to have our own uv_buf_t instead of iovec
736    * because Windows's WSABUF is not an iovec.
737    */
738   assert(sizeof(uv_buf_t) == sizeof(struct iovec));
739   iov = (struct iovec*) &(req->bufs[req->write_index]);
740   iovcnt = req->nbufs - req->write_index;
741
742   iovmax = uv__getiovmax();
743
744   /* Limit iov count to avoid EINVALs from writev() */
745   if (iovcnt > iovmax)
746     iovcnt = iovmax;
747
748   /*
749    * Now do the actual writev. Note that we've been updating the pointers
750    * inside the iov each time we write. So there is no need to offset it.
751    */
752
753   if (req->send_handle) {
754     struct msghdr msg;
755     char scratch[64];
756     struct cmsghdr *cmsg;
757     int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
758
759     assert(fd_to_send >= 0);
760
761     msg.msg_name = NULL;
762     msg.msg_namelen = 0;
763     msg.msg_iov = iov;
764     msg.msg_iovlen = iovcnt;
765     msg.msg_flags = 0;
766
767     msg.msg_control = (void*) scratch;
768     msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
769
770     cmsg = CMSG_FIRSTHDR(&msg);
771     cmsg->cmsg_level = SOL_SOCKET;
772     cmsg->cmsg_type = SCM_RIGHTS;
773     cmsg->cmsg_len = msg.msg_controllen;
774
775     /* silence aliasing warning */
776     {
777       void* pv = CMSG_DATA(cmsg);
778       int* pi = pv;
779       *pi = fd_to_send;
780     }
781
782     do {
783       n = sendmsg(uv__stream_fd(stream), &msg, 0);
784     }
785     while (n == -1 && errno == EINTR);
786   } else {
787     do {
788       if (iovcnt == 1) {
789         n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
790       } else {
791         n = writev(uv__stream_fd(stream), iov, iovcnt);
792       }
793     }
794     while (n == -1 && errno == EINTR);
795   }
796
797   if (n < 0) {
798     if (errno != EAGAIN && errno != EWOULDBLOCK) {
799       /* Error */
800       req->error = -errno;
801       uv__write_req_finish(req);
802       uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
803       if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
804         uv__handle_stop(stream);
805       return;
806     } else if (stream->flags & UV_STREAM_BLOCKING) {
807       /* If this is a blocking stream, try again. */
808       goto start;
809     }
810   } else {
811     /* Successful write */
812
813     while (n >= 0) {
814       uv_buf_t* buf = &(req->bufs[req->write_index]);
815       size_t len = buf->len;
816
817       assert(req->write_index < req->nbufs);
818
819       if ((size_t)n < len) {
820         buf->base += n;
821         buf->len -= n;
822         stream->write_queue_size -= n;
823         n = 0;
824
825         /* There is more to write. */
826         if (stream->flags & UV_STREAM_BLOCKING) {
827           /*
828            * If we're blocking then we should not be enabling the write
829            * watcher - instead we need to try again.
830            */
831           goto start;
832         } else {
833           /* Break loop and ensure the watcher is pending. */
834           break;
835         }
836
837       } else {
838         /* Finished writing the buf at index req->write_index. */
839         req->write_index++;
840
841         assert((size_t)n >= len);
842         n -= len;
843
844         assert(stream->write_queue_size >= len);
845         stream->write_queue_size -= len;
846
847         if (req->write_index == req->nbufs) {
848           /* Then we're done! */
849           assert(n == 0);
850           uv__write_req_finish(req);
851           /* TODO: start trying to write the next request. */
852           return;
853         }
854       }
855     }
856   }
857
858   /* Either we've counted n down to zero or we've got EAGAIN. */
859   assert(n == 0 || n == -1);
860
861   /* Only non-blocking streams should use the write_watcher. */
862   assert(!(stream->flags & UV_STREAM_BLOCKING));
863
864   /* We're not done. */
865   uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
866 }
867
868
869 static void uv__write_callbacks(uv_stream_t* stream) {
870   uv_write_t* req;
871   QUEUE* q;
872
873   while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
874     /* Pop a req off write_completed_queue. */
875     q = QUEUE_HEAD(&stream->write_completed_queue);
876     req = QUEUE_DATA(q, uv_write_t, queue);
877     QUEUE_REMOVE(q);
878     uv__req_unregister(stream->loop, req);
879
880     if (req->bufs != NULL) {
881       stream->write_queue_size -= uv__write_req_size(req);
882       if (req->bufs != req->bufsml)
883         free(req->bufs);
884       req->bufs = NULL;
885     }
886
887     /* NOTE: call callback AFTER freeing the request data. */
888     if (req->cb)
889       req->cb(req, req->error);
890   }
891
892   assert(QUEUE_EMPTY(&stream->write_completed_queue));
893
894   /* Write queue drained. */
895   if (QUEUE_EMPTY(&stream->write_queue))
896     uv__drain(stream);
897 }
898
899
900 static uv_handle_type uv__handle_type(int fd) {
901   struct sockaddr_storage ss;
902   socklen_t len;
903   int type;
904
905   memset(&ss, 0, sizeof(ss));
906   len = sizeof(ss);
907
908   if (getsockname(fd, (struct sockaddr*)&ss, &len))
909     return UV_UNKNOWN_HANDLE;
910
911   len = sizeof type;
912
913   if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
914     return UV_UNKNOWN_HANDLE;
915
916   if (type == SOCK_STREAM) {
917     switch (ss.ss_family) {
918       case AF_UNIX:
919         return UV_NAMED_PIPE;
920       case AF_INET:
921       case AF_INET6:
922         return UV_TCP;
923       }
924   }
925
926   if (type == SOCK_DGRAM &&
927       (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
928     return UV_UDP;
929
930   return UV_UNKNOWN_HANDLE;
931 }
932
933
934 static void uv__stream_read_cb(uv_stream_t* stream,
935                                int status,
936                                const uv_buf_t* buf,
937                                uv_handle_type type) {
938   if (stream->read_cb != NULL)
939     stream->read_cb(stream, status, buf);
940   else
941     stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
942 }
943
944
945 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
946   stream->flags |= UV_STREAM_READ_EOF;
947   uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
948   if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
949     uv__handle_stop(stream);
950   uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
951 }
952
953
954 static void uv__read(uv_stream_t* stream) {
955   uv_buf_t buf;
956   ssize_t nread;
957   struct msghdr msg;
958   struct cmsghdr* cmsg;
959   char cmsg_space[64];
960   int count;
961
962   stream->flags &= ~UV_STREAM_READ_PARTIAL;
963
964   /* Prevent loop starvation when the data comes in as fast as (or faster than)
965    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
966    */
967   count = 32;
968
969   /* XXX: Maybe instead of having UV_STREAM_READING we just test if
970    * tcp->read_cb is NULL or not?
971    */
972   while ((stream->read_cb || stream->read2_cb)
973       && (stream->flags & UV_STREAM_READING)
974       && (count-- > 0)) {
975     assert(stream->alloc_cb != NULL);
976
977     stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
978     if (buf.len == 0) {
979       /* User indicates it can't or won't handle the read. */
980       uv__stream_read_cb(stream, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
981       return;
982     }
983
984     assert(buf.base != NULL);
985     assert(uv__stream_fd(stream) >= 0);
986
987     if (stream->read_cb) {
988       do {
989         nread = read(uv__stream_fd(stream), buf.base, buf.len);
990       }
991       while (nread < 0 && errno == EINTR);
992     } else {
993       assert(stream->read2_cb);
994       /* read2_cb uses recvmsg */
995       msg.msg_flags = 0;
996       msg.msg_iov = (struct iovec*) &buf;
997       msg.msg_iovlen = 1;
998       msg.msg_name = NULL;
999       msg.msg_namelen = 0;
1000       /* Set up to receive a descriptor even if one isn't in the message */
1001       msg.msg_controllen = 64;
1002       msg.msg_control = (void*)  cmsg_space;
1003
1004       do {
1005         nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1006       }
1007       while (nread < 0 && errno == EINTR);
1008     }
1009
1010     if (nread < 0) {
1011       /* Error */
1012       if (errno == EAGAIN || errno == EWOULDBLOCK) {
1013         /* Wait for the next one. */
1014         if (stream->flags & UV_STREAM_READING) {
1015           uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
1016         }
1017         uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE);
1018       } else {
1019         /* Error. User should call uv_close(). */
1020         uv__stream_read_cb(stream, -errno, &buf, UV_UNKNOWN_HANDLE);
1021         assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
1022                "stream->read_cb(status=-1) did not call uv_close()");
1023       }
1024       return;
1025     } else if (nread == 0) {
1026       uv__stream_eof(stream, &buf);
1027       return;
1028     } else {
1029       /* Successful read */
1030       ssize_t buflen = buf.len;
1031
1032       if (stream->read_cb) {
1033         stream->read_cb(stream, nread, &buf);
1034       } else {
1035         assert(stream->read2_cb);
1036
1037         /*
1038          * XXX: Some implementations can send multiple file descriptors in a
1039          * single message. We should be using CMSG_NXTHDR() to walk the
1040          * chain to get at them all. This would require changing the API to
1041          * hand these back up the caller, is a pain.
1042          */
1043
1044         for (cmsg = CMSG_FIRSTHDR(&msg);
1045              msg.msg_controllen > 0 && cmsg != NULL;
1046              cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1047
1048           if (cmsg->cmsg_type == SCM_RIGHTS) {
1049             if (stream->accepted_fd != -1) {
1050               fprintf(stderr, "(libuv) ignoring extra FD received\n");
1051             }
1052
1053             /* silence aliasing warning */
1054             {
1055               void* pv = CMSG_DATA(cmsg);
1056               int* pi = pv;
1057               stream->accepted_fd = *pi;
1058             }
1059
1060           } else {
1061             fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
1062                 cmsg->cmsg_type);
1063           }
1064         }
1065
1066
1067         if (stream->accepted_fd >= 0) {
1068           stream->read2_cb((uv_pipe_t*) stream,
1069                            nread,
1070                            &buf,
1071                            uv__handle_type(stream->accepted_fd));
1072         } else {
1073           stream->read2_cb((uv_pipe_t*) stream, nread, &buf, UV_UNKNOWN_HANDLE);
1074         }
1075       }
1076
1077       /* Return if we didn't fill the buffer, there is no more data to read. */
1078       if (nread < buflen) {
1079         stream->flags |= UV_STREAM_READ_PARTIAL;
1080         return;
1081       }
1082     }
1083   }
1084 }
1085
1086
1087 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1088   assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
1089          "uv_shutdown (unix) only supports uv_handle_t right now");
1090
1091   if (!(stream->flags & UV_STREAM_WRITABLE) ||
1092       stream->flags & UV_STREAM_SHUT ||
1093       stream->flags & UV_CLOSED ||
1094       stream->flags & UV_CLOSING) {
1095     return -ENOTCONN;
1096   }
1097
1098   assert(uv__stream_fd(stream) >= 0);
1099
1100   /* Initialize request */
1101   uv__req_init(stream->loop, req, UV_SHUTDOWN);
1102   req->handle = stream;
1103   req->cb = cb;
1104   stream->shutdown_req = req;
1105   stream->flags |= UV_STREAM_SHUTTING;
1106
1107   uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
1108
1109   return 0;
1110 }
1111
1112
1113 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1114   uv_stream_t* stream;
1115
1116   stream = container_of(w, uv_stream_t, io_watcher);
1117
1118   assert(stream->type == UV_TCP ||
1119          stream->type == UV_NAMED_PIPE ||
1120          stream->type == UV_TTY);
1121   assert(!(stream->flags & UV_CLOSING));
1122
1123   if (stream->connect_req) {
1124     uv__stream_connect(stream);
1125     return;
1126   }
1127
1128   assert(uv__stream_fd(stream) >= 0);
1129
1130   /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
1131   if (events & (UV__POLLIN | UV__POLLERR))
1132     uv__read(stream);
1133
1134   if (uv__stream_fd(stream) == -1)
1135     return;  /* read_cb closed stream. */
1136
1137   /* Short-circuit iff POLLHUP is set, the user is still interested in read
1138    * events and uv__read() reported a partial read but not EOF. If the EOF
1139    * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1140    * have to do anything. If the partial read flag is not set, we can't
1141    * report the EOF yet because there is still data to read.
1142    */
1143   if ((events & UV__POLLHUP) &&
1144       (stream->flags & UV_STREAM_READING) &&
1145       (stream->flags & UV_STREAM_READ_PARTIAL) &&
1146       !(stream->flags & UV_STREAM_READ_EOF)) {
1147     uv_buf_t buf = { NULL, 0 };
1148     uv__stream_eof(stream, &buf);
1149   }
1150
1151   if (uv__stream_fd(stream) == -1)
1152     return;  /* read_cb closed stream. */
1153
1154   if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {
1155     uv__write(stream);
1156     uv__write_callbacks(stream);
1157   }
1158 }
1159
1160
1161 /**
1162  * We get called here from directly following a call to connect(2).
1163  * In order to determine if we've errored out or succeeded must call
1164  * getsockopt.
1165  */
1166 static void uv__stream_connect(uv_stream_t* stream) {
1167   int error;
1168   uv_connect_t* req = stream->connect_req;
1169   socklen_t errorsize = sizeof(int);
1170
1171   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1172   assert(req);
1173
1174   if (stream->delayed_error) {
1175     /* To smooth over the differences between unixes errors that
1176      * were reported synchronously on the first connect can be delayed
1177      * until the next tick--which is now.
1178      */
1179     error = stream->delayed_error;
1180     stream->delayed_error = 0;
1181   } else {
1182     /* Normal situation: we need to get the socket error from the kernel. */
1183     assert(uv__stream_fd(stream) >= 0);
1184     getsockopt(uv__stream_fd(stream),
1185                SOL_SOCKET,
1186                SO_ERROR,
1187                &error,
1188                &errorsize);
1189     error = -error;
1190   }
1191
1192   if (error == -EINPROGRESS)
1193     return;
1194
1195   stream->connect_req = NULL;
1196   uv__req_unregister(stream->loop, req);
1197   uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
1198
1199   if (req->cb)
1200     req->cb(req, error);
1201 }
1202
1203
1204 int uv_write2(uv_write_t* req,
1205               uv_stream_t* stream,
1206               const uv_buf_t bufs[],
1207               unsigned int nbufs,
1208               uv_stream_t* send_handle,
1209               uv_write_cb cb) {
1210   int empty_queue;
1211
1212   assert(nbufs > 0);
1213   assert((stream->type == UV_TCP ||
1214           stream->type == UV_NAMED_PIPE ||
1215           stream->type == UV_TTY) &&
1216          "uv_write (unix) does not yet support other types of streams");
1217
1218   if (uv__stream_fd(stream) < 0)
1219     return -EBADF;
1220
1221   if (send_handle) {
1222     if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1223       return -EINVAL;
1224
1225     /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1226      * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1227      * evaluates to a function that operates on a uv_stream_t with a couple of
1228      * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1229      * which works but only by accident.
1230      */
1231     if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1232       return -EBADF;
1233   }
1234
1235   /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1236    * it means there are error-state requests in the write_completed_queue that
1237    * will touch up write_queue_size later, see also uv__write_req_finish().
1238    * We chould check that write_queue is empty instead but that implies making
1239    * a write() syscall when we know that the handle is in error mode.
1240    */
1241   empty_queue = (stream->write_queue_size == 0);
1242
1243   /* Initialize the req */
1244   uv__req_init(stream->loop, req, UV_WRITE);
1245   req->cb = cb;
1246   req->handle = stream;
1247   req->error = 0;
1248   req->send_handle = send_handle;
1249   QUEUE_INIT(&req->queue);
1250
1251   req->bufs = req->bufsml;
1252   if (nbufs > ARRAY_SIZE(req->bufsml))
1253     req->bufs = malloc(nbufs * sizeof(bufs[0]));
1254
1255   if (req->bufs == NULL)
1256     return -ENOMEM;
1257
1258   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1259   req->nbufs = nbufs;
1260   req->write_index = 0;
1261   stream->write_queue_size += uv_count_bufs(bufs, nbufs);
1262
1263   /* Append the request to write_queue. */
1264   QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1265
1266   /* If the queue was empty when this function began, we should attempt to
1267    * do the write immediately. Otherwise start the write_watcher and wait
1268    * for the fd to become writable.
1269    */
1270   if (stream->connect_req) {
1271     /* Still connecting, do nothing. */
1272   }
1273   else if (empty_queue) {
1274     uv__write(stream);
1275   }
1276   else {
1277     /*
1278      * blocking streams should never have anything in the queue.
1279      * if this assert fires then somehow the blocking stream isn't being
1280      * sufficiently flushed in uv__write.
1281      */
1282     assert(!(stream->flags & UV_STREAM_BLOCKING));
1283     uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
1284   }
1285
1286   return 0;
1287 }
1288
1289
1290 /* The buffers to be written must remain valid until the callback is called.
1291  * This is not required for the uv_buf_t array.
1292  */
1293 int uv_write(uv_write_t* req,
1294              uv_stream_t* handle,
1295              const uv_buf_t bufs[],
1296              unsigned int nbufs,
1297              uv_write_cb cb) {
1298   return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1299 }
1300
1301
1302 void uv_try_write_cb(uv_write_t* req, int status) {
1303   /* Should not be called */
1304   abort();
1305 }
1306
1307
1308 int uv_try_write(uv_stream_t* stream, const char* buf, size_t size) {
1309   int r;
1310   int has_pollout;
1311   size_t written;
1312   size_t req_size;
1313   uv_write_t req;
1314   uv_buf_t bufstruct;
1315
1316   /* Connecting or already writing some data */
1317   if (stream->connect_req != NULL || stream->write_queue_size != 0)
1318     return 0;
1319
1320   has_pollout = uv__io_active(&stream->io_watcher, UV__POLLOUT);
1321
1322   bufstruct = uv_buf_init((char*) buf, size);
1323   r = uv_write(&req, stream, &bufstruct, 1, uv_try_write_cb);
1324   if (r != 0)
1325     return r;
1326
1327   /* Remove not written bytes from write queue size */
1328   written = size;
1329   if (req.bufs != NULL)
1330     req_size = uv__write_req_size(&req);
1331   else
1332     req_size = 0;
1333   written -= req_size;
1334   stream->write_queue_size -= req_size;
1335
1336   /* Unqueue request, regardless of immediateness */
1337   QUEUE_REMOVE(&req.queue);
1338   uv__req_unregister(stream->loop, &req);
1339   if (req.bufs != req.bufsml)
1340     free(req.bufs);
1341   req.bufs = NULL;
1342
1343   /* Do not poll for writable, if we wasn't before calling this */
1344   if (!has_pollout)
1345     uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
1346
1347   return (int) written;
1348 }
1349
1350
1351 static int uv__read_start_common(uv_stream_t* stream,
1352                                  uv_alloc_cb alloc_cb,
1353                                  uv_read_cb read_cb,
1354                                  uv_read2_cb read2_cb) {
1355   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1356       stream->type == UV_TTY);
1357
1358   if (stream->flags & UV_CLOSING)
1359     return -EINVAL;
1360
1361   /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
1362    * expresses the desired state of the user.
1363    */
1364   stream->flags |= UV_STREAM_READING;
1365
1366 #if defined(__APPLE__)
1367   /* Notify select() thread about state change */
1368   if (stream->select != NULL)
1369     uv__stream_osx_interrupt_select(stream);
1370 #endif /* defined(__APPLE__) */
1371
1372   /* TODO: try to do the read inline? */
1373   /* TODO: keep track of tcp state. If we've gotten a EOF then we should
1374    * not start the IO watcher.
1375    */
1376   assert(uv__stream_fd(stream) >= 0);
1377   assert(alloc_cb);
1378
1379   stream->read_cb = read_cb;
1380   stream->read2_cb = read2_cb;
1381   stream->alloc_cb = alloc_cb;
1382
1383   uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
1384   uv__handle_start(stream);
1385
1386   return 0;
1387 }
1388
1389
1390 int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
1391     uv_read_cb read_cb) {
1392   return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
1393 }
1394
1395
1396 int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
1397     uv_read2_cb read_cb) {
1398   return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
1399 }
1400
1401
1402 int uv_read_stop(uv_stream_t* stream) {
1403   /* Sanity check. We're going to stop the handle unless it's primed for
1404    * writing but that means there should be some kind of write action in
1405    * progress.
1406    */
1407   assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) ||
1408          !QUEUE_EMPTY(&stream->write_completed_queue) ||
1409          !QUEUE_EMPTY(&stream->write_queue) ||
1410          stream->shutdown_req != NULL ||
1411          stream->connect_req != NULL);
1412
1413   stream->flags &= ~UV_STREAM_READING;
1414   uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
1415   if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
1416     uv__handle_stop(stream);
1417
1418 #if defined(__APPLE__)
1419   /* Notify select() thread about state change */
1420   if (stream->select != NULL)
1421     uv__stream_osx_interrupt_select(stream);
1422 #endif /* defined(__APPLE__) */
1423
1424   stream->read_cb = NULL;
1425   stream->read2_cb = NULL;
1426   stream->alloc_cb = NULL;
1427   return 0;
1428 }
1429
1430
1431 int uv_is_readable(const uv_stream_t* stream) {
1432   return !!(stream->flags & UV_STREAM_READABLE);
1433 }
1434
1435
1436 int uv_is_writable(const uv_stream_t* stream) {
1437   return !!(stream->flags & UV_STREAM_WRITABLE);
1438 }
1439
1440
1441 #if defined(__APPLE__)
1442 int uv___stream_fd(uv_stream_t* handle) {
1443   uv__stream_select_t* s;
1444
1445   assert(handle->type == UV_TCP ||
1446          handle->type == UV_TTY ||
1447          handle->type == UV_NAMED_PIPE);
1448
1449   s = handle->select;
1450   if (s != NULL)
1451     return s->fd;
1452
1453   return handle->io_watcher.fd;
1454 }
1455 #endif /* defined(__APPLE__) */
1456
1457
1458 void uv__stream_close(uv_stream_t* handle) {
1459 #if defined(__APPLE__)
1460   /* Terminate select loop first */
1461   if (handle->select != NULL) {
1462     uv__stream_select_t* s;
1463
1464     s = handle->select;
1465
1466     uv_sem_post(&s->close_sem);
1467     uv_sem_post(&s->async_sem);
1468     uv__stream_osx_interrupt_select(handle);
1469     uv_thread_join(&s->thread);
1470     uv_sem_destroy(&s->close_sem);
1471     uv_sem_destroy(&s->async_sem);
1472     uv__close(s->fake_fd);
1473     uv__close(s->int_fd);
1474     uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1475
1476     handle->select = NULL;
1477   }
1478 #endif /* defined(__APPLE__) */
1479
1480   uv__io_close(handle->loop, &handle->io_watcher);
1481   uv_read_stop(handle);
1482   uv__handle_stop(handle);
1483
1484   if (handle->io_watcher.fd != -1) {
1485     /* Don't close stdio file descriptors.  Nothing good comes from it. */
1486     if (handle->io_watcher.fd > STDERR_FILENO)
1487       uv__close(handle->io_watcher.fd);
1488     handle->io_watcher.fd = -1;
1489   }
1490
1491   if (handle->accepted_fd != -1) {
1492     uv__close(handle->accepted_fd);
1493     handle->accepted_fd = -1;
1494   }
1495
1496   assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
1497 }
1498
1499
1500 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1501   assert(0 && "implement me");
1502   abort();
1503   return 0;
1504 }