uv: upgrade to e7758e1
[platform/upstream/nodejs.git] / deps / uv / src / win / pipe.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26
27 #include "uv.h"
28 #include "../uv-common.h"
29 #include "internal.h"
30
31
/*
 * A zero-size buffer. It is handed to ReadFile as the destination of the
 * zero-byte reads issued by uv_pipe_queue_read and
 * uv_pipe_zero_readfile_thread_proc.
 */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data */
/* when the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/*
 * Number of accept requests uv_pipe_bind allocates when the handle does not
 * already carry UV_HANDLE_PIPESERVER (i.e. uv_pipe_pending_instances was not
 * called first).
 */
static const int default_pending_pipe_instances = 4;
43
/* IPC protocol flags, stored in uv_ipc_frame_header_t.flags. */
/* UV_IPC_RAW_DATA:  the frame header is followed by raw_data_length bytes. */
/* UV_IPC_UV_STREAM: the frame carries a duplicated socket (socket_info). */
#define UV_IPC_RAW_DATA   0x0001
#define UV_IPC_UV_STREAM  0x0002

/* IPC frame header. */
typedef struct {
  /* Combination of the UV_IPC_* flags above. */
  int flags;
  /* Length of the raw data buffer; set when UV_IPC_RAW_DATA is present. */
  uint64_t raw_data_length;
} uv_ipc_frame_header_t;

/* IPC frame, which contains an imported TCP socket stream. */
typedef struct {
  uv_ipc_frame_header_t header;
  /* Filled in by uv_tcp_duplicate_socket before the frame is written. */
  WSAPROTOCOL_INFOW socket_info;
} uv_ipc_frame_uv_stream;
59
/*
 * Forward declarations of the EOF timer helpers. The functions below call
 * eof_timer_start (when a read is queued) and eof_timer_destroy (when the
 * pipe is closed) before their definitions appear.
 */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer, int status);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
66
67
/*
 * Formats a candidate pipe name of the form "\\.\pipe\uv\<ptr>-<pid>" into
 * `name`, a buffer of `size` bytes. Only the numeric value of `ptr` is used;
 * it is never dereferenced.
 *
 * _snprintf does not write a terminating NUL when the formatted output does
 * not fit, so the buffer is terminated explicitly. A zero-sized buffer is
 * left untouched.
 */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  if (size == 0) {
    return;
  }

  /* %p expects a void pointer. */
  _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", (void*) ptr,
      GetCurrentProcessId());
  name[size - 1] = '\0';
}
71
72
73 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
74   uv_stream_init(loop, (uv_stream_t*)handle);
75
76   handle->type = UV_NAMED_PIPE;
77   handle->reqs_pending = 0;
78   handle->handle = INVALID_HANDLE_VALUE;
79   handle->name = NULL;
80   handle->ipc_pid = 0;
81   handle->remaining_ipc_rawdata_bytes = 0;
82   handle->pending_socket_info = NULL;
83   handle->ipc = ipc;
84   handle->non_overlapped_writes_tail = NULL;
85
86   uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
87
88   loop->counters.pipe_init++;
89
90   return 0;
91 }
92
93
94 static void uv_pipe_connection_init(uv_pipe_t* handle) {
95   uv_connection_init((uv_stream_t*) handle);
96   handle->read_req.data = handle;
97   handle->eof_timer = NULL;
98 }
99
100
101 int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
102     char* name, size_t nameSize) {
103   HANDLE pipeHandle;
104   int errno;
105   int err;
106   char* ptr = (char*)handle;
107
108   while (TRUE) {
109     uv_unique_pipe_name(ptr, name, nameSize);
110
111     pipeHandle = CreateNamedPipeA(name,
112       access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
113       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
114       NULL);
115
116     if (pipeHandle != INVALID_HANDLE_VALUE) {
117       /* No name collisions.  We're done. */
118       break;
119     }
120
121     errno = GetLastError();
122     if (errno != ERROR_PIPE_BUSY && errno != ERROR_ACCESS_DENIED) {
123       uv__set_sys_error(loop, errno);
124       err = -1;
125       goto done;
126     }
127
128     /* Pipe name collision.  Increment the pointer and try again. */
129     ptr++;
130   }
131
132   if (CreateIoCompletionPort(pipeHandle,
133                              loop->iocp,
134                              (ULONG_PTR)handle,
135                              0) == NULL) {
136     uv__set_sys_error(loop, GetLastError());
137     err = -1;
138     goto done;
139   }
140
141   uv_pipe_connection_init(handle);
142   handle->handle = pipeHandle;
143   err = 0;
144
145 done:
146   if (err && pipeHandle != INVALID_HANDLE_VALUE) {
147     CloseHandle(pipeHandle);
148   }
149
150   return err;
151 }
152
153
154 static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
155     HANDLE pipeHandle) {
156   NTSTATUS nt_status;
157   IO_STATUS_BLOCK io_status;
158   FILE_MODE_INFORMATION mode_info;
159   DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
160
161   if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
162     return -1;
163   }
164
165   /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
166   nt_status = pNtQueryInformationFile(pipeHandle,
167                                       &io_status,
168                                       &mode_info,
169                                       sizeof(mode_info),
170                                       FileModeInformation);
171   if (nt_status != STATUS_SUCCESS) {
172     return -1;
173   }
174
175   if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
176       mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
177     /* Non-overlapped pipe. */
178     handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
179   } else {
180     /* Overlapped pipe.  Try to associate with IOCP. */
181     if (CreateIoCompletionPort(pipeHandle,
182                                loop->iocp,
183                                (ULONG_PTR)handle,
184                                0) == NULL) {
185       handle->flags |= UV_HANDLE_EMULATE_IOCP;
186     }
187   }
188
189   return 0;
190 }
191
192
193 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
194   int errno;
195   uv_loop_t* loop;
196   uv_pipe_t* handle;
197   uv_shutdown_t* req;
198
199   req = (uv_shutdown_t*) parameter;
200   assert(req);
201   handle = (uv_pipe_t*) req->handle;
202   assert(handle);
203   loop = handle->loop;
204   assert(loop);
205
206   FlushFileBuffers(handle->handle);
207
208   /* Post completed */
209   POST_COMPLETION_FOR_REQ(loop, req);
210
211   return 0;
212 }
213
214
215 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
216   unsigned int uv_alloced;
217   DWORD result;
218   uv_shutdown_t* req;
219   NTSTATUS nt_status;
220   IO_STATUS_BLOCK io_status;
221   FILE_PIPE_LOCAL_INFORMATION pipe_info;
222
223   if (handle->flags & UV_HANDLE_SHUTTING &&
224       !(handle->flags & UV_HANDLE_SHUT) &&
225       handle->write_reqs_pending == 0) {
226     req = handle->shutdown_req;
227
228     /* Try to avoid flushing the pipe buffer in the thread pool. */
229     nt_status = pNtQueryInformationFile(handle->handle,
230                                         &io_status,
231                                         &pipe_info,
232                                         sizeof pipe_info,
233                                         FilePipeLocalInformation);
234
235     if (nt_status != STATUS_SUCCESS) {
236       /* Failure */
237       handle->flags &= ~UV_HANDLE_SHUTTING;
238       if (req->cb) {
239         uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status));
240         req->cb(req, -1);
241       }
242       DECREASE_PENDING_REQ_COUNT(handle);
243       return;
244     }
245
246     if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
247       handle->flags |= UV_HANDLE_SHUT;
248
249       /* Short-circuit, no need to call FlushFileBuffers. */
250       uv_insert_pending_req(loop, (uv_req_t*) req);
251       return;
252     }
253
254     /* Run FlushFileBuffers in the thread pool. */
255     result = QueueUserWorkItem(pipe_shutdown_thread_proc,
256                                req,
257                                WT_EXECUTELONGFUNCTION);
258     if (result) {
259       /* Mark the handle as shut now to avoid going through this again. */
260       handle->flags |= UV_HANDLE_SHUT;
261       return;
262
263     } else {
264       /* Failure. */
265       handle->flags &= ~UV_HANDLE_SHUTTING;
266       if (req->cb) {
267         uv__set_sys_error(loop, GetLastError());
268         req->cb(req, -1);
269       }
270       DECREASE_PENDING_REQ_COUNT(handle);
271       return;
272     }
273   }
274
275   if (handle->flags & UV_HANDLE_CLOSING &&
276       handle->reqs_pending == 0) {
277     assert(!(handle->flags & UV_HANDLE_CLOSED));
278     handle->flags |= UV_HANDLE_CLOSED;
279
280     if (handle->flags & UV_HANDLE_CONNECTION) {
281       if (handle->pending_socket_info) {
282         free(handle->pending_socket_info);
283         handle->pending_socket_info = NULL;
284       }
285       
286       if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
287         if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
288           UnregisterWait(handle->read_req.wait_handle);
289           handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
290         }
291         if (handle->read_req.event_handle) {
292           CloseHandle(handle->read_req.event_handle);
293           handle->read_req.event_handle = NULL;
294         }
295       }
296     }
297
298     if (handle->flags & UV_HANDLE_PIPESERVER) {
299       assert(handle->accept_reqs);
300       free(handle->accept_reqs);
301       handle->accept_reqs = NULL;
302     }
303
304     /* Remember the state of this flag because the close callback is */
305     /* allowed to clobber or free the handle's memory */
306     uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED;
307
308     if (handle->close_cb) {
309       handle->close_cb((uv_handle_t*)handle);
310     }
311
312     if (uv_alloced) {
313       free(handle);
314     }
315
316     uv_unref(loop);
317   }
318 }
319
320
321 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
322   handle->pending_instances = count;
323   handle->flags |= UV_HANDLE_PIPESERVER;
324 }
325
326
327 /* Creates a pipe server. */
328 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
329   uv_loop_t* loop = handle->loop;
330   int i, errno, nameSize;
331   uv_pipe_accept_t* req;
332
333   if (handle->flags & UV_HANDLE_BOUND) {
334     uv__set_sys_error(loop, WSAEINVAL);
335     return -1;
336   }
337
338   if (!name) {
339     uv__set_sys_error(loop, WSAEINVAL);
340     return -1;
341   }
342
343   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
344     handle->pending_instances = default_pending_pipe_instances;
345   }
346
347   handle->accept_reqs = (uv_pipe_accept_t*)
348     malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
349   if (!handle->accept_reqs) {
350     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
351   }
352
353   for (i = 0; i < handle->pending_instances; i++) {
354     req = &handle->accept_reqs[i];
355     uv_req_init(loop, (uv_req_t*) req);
356     req->type = UV_ACCEPT;
357     req->data = handle;
358     req->pipeHandle = INVALID_HANDLE_VALUE;
359     req->next_pending = NULL;
360   }
361
362   /* Convert name to UTF16. */
363   nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
364   handle->name = (wchar_t*)malloc(nameSize);
365   if (!handle->name) {
366     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
367   }
368
369   if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
370     uv__set_sys_error(loop, GetLastError());
371     return -1;
372   }
373
374   /*
375    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
376    * If this fails then there's already a pipe server for the given pipe name.
377    */
378   handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
379       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
380       FILE_FLAG_FIRST_PIPE_INSTANCE,
381       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
382       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
383
384   if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
385     errno = GetLastError();
386     if (errno == ERROR_ACCESS_DENIED) {
387       uv__set_error(loop, UV_EADDRINUSE, errno);
388     } else if (errno == ERROR_PATH_NOT_FOUND || errno == ERROR_INVALID_NAME) {
389       uv__set_error(loop, UV_EACCES, errno);
390     } else {
391       uv__set_sys_error(loop, errno);
392     }
393     goto error;
394   }
395
396   if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle)) {
397     uv__set_sys_error(loop, GetLastError());
398     goto error;
399   }
400
401   handle->pending_accepts = NULL;
402   handle->flags |= UV_HANDLE_PIPESERVER;
403   handle->flags |= UV_HANDLE_BOUND;
404
405   return 0;
406
407 error:
408   if (handle->name) {
409     free(handle->name);
410     handle->name = NULL;
411   }
412
413   if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
414     CloseHandle(handle->accept_reqs[0].pipeHandle);
415     handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
416   }
417
418   return -1;
419 }
420
421
422 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
423   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
424   int errno;
425   uv_loop_t* loop;
426   uv_pipe_t* handle;
427   uv_connect_t* req;
428
429   req = (uv_connect_t*) parameter;
430   assert(req);
431   handle = (uv_pipe_t*) req->handle;
432   assert(handle);
433   loop = handle->loop;
434   assert(loop);
435
436   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
437   /* We wait for the pipe to become available with WaitNamedPipe. */
438   while (WaitNamedPipeW(handle->name, 30000)) {
439     /* The pipe is now available, try to connect. */
440     pipeHandle = CreateFileW(handle->name,
441                             GENERIC_READ | GENERIC_WRITE,
442                             0,
443                             NULL,
444                             OPEN_EXISTING,
445                             FILE_FLAG_OVERLAPPED,
446                             NULL);
447
448     if (pipeHandle != INVALID_HANDLE_VALUE) {
449       break;
450     }
451
452     SwitchToThread();
453   }
454
455   if (pipeHandle != INVALID_HANDLE_VALUE &&
456       !uv_set_pipe_handle(loop, handle, pipeHandle)) {
457     handle->handle = pipeHandle;
458     SET_REQ_SUCCESS(req);
459   } else {
460     SET_REQ_ERROR(req, GetLastError());
461   }
462
463   /* Post completed */
464   POST_COMPLETION_FOR_REQ(loop, req);
465
466   return 0;
467 }
468
469
470 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
471     const char* name, uv_connect_cb cb) {
472   uv_loop_t* loop = handle->loop;
473   int errno, nameSize;
474   HANDLE pipeHandle;
475
476   handle->handle = INVALID_HANDLE_VALUE;
477
478   uv_req_init(loop, (uv_req_t*) req);
479   req->type = UV_CONNECT;
480   req->handle = (uv_stream_t*) handle;
481   req->cb = cb;
482
483   /* Convert name to UTF16. */
484   nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
485   handle->name = (wchar_t*)malloc(nameSize);
486   if (!handle->name) {
487     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
488   }
489
490   if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
491     errno = GetLastError();
492     goto error;
493   }
494
495   pipeHandle = CreateFileW(handle->name,
496                           GENERIC_READ | GENERIC_WRITE,
497                           0,
498                           NULL,
499                           OPEN_EXISTING,
500                           FILE_FLAG_OVERLAPPED,
501                           NULL);
502
503   if (pipeHandle == INVALID_HANDLE_VALUE) {
504     if (GetLastError() == ERROR_PIPE_BUSY) {
505       /* Wait for the server to make a pipe instance available. */
506       if (!QueueUserWorkItem(&pipe_connect_thread_proc,
507                              req,
508                              WT_EXECUTELONGFUNCTION)) {
509         errno = GetLastError();
510         goto error;
511       }
512
513       handle->reqs_pending++;
514
515       return;
516     }
517
518     errno = GetLastError();
519     goto error;
520   }
521
522   if (uv_set_pipe_handle(loop, (uv_pipe_t*)req->handle, pipeHandle)) {
523     errno = GetLastError();
524     goto error;
525   }
526
527   handle->handle = pipeHandle;
528
529   SET_REQ_SUCCESS(req);
530   uv_insert_pending_req(loop, (uv_req_t*) req);
531   handle->reqs_pending++;
532   return;
533
534 error:
535   if (handle->name) {
536     free(handle->name);
537     handle->name = NULL;
538   }
539
540   if (pipeHandle != INVALID_HANDLE_VALUE) {
541     CloseHandle(pipeHandle);
542   }
543
544   /* Make this req pending reporting an error. */
545   SET_REQ_ERROR(req, errno);
546   uv_insert_pending_req(loop, (uv_req_t*) req);
547   handle->reqs_pending++;
548   return;
549 }
550
551
552 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
553 /* with it. */
554 void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
555   int i;
556   HANDLE pipeHandle;
557
558   if (handle->name) {
559     free(handle->name);
560     handle->name = NULL;
561   }
562
563   if (handle->flags & UV_HANDLE_PIPESERVER) {
564     for (i = 0; i < handle->pending_instances; i++) {
565       pipeHandle = handle->accept_reqs[i].pipeHandle;
566       if (pipeHandle != INVALID_HANDLE_VALUE) {
567         CloseHandle(pipeHandle);
568         handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
569       }
570     }
571   }
572
573   if (handle->flags & UV_HANDLE_CONNECTION) {
574     eof_timer_destroy(handle);
575   }
576
577   if ((handle->flags & UV_HANDLE_CONNECTION)
578       && handle->handle != INVALID_HANDLE_VALUE) {
579     CloseHandle(handle->handle);
580     handle->handle = INVALID_HANDLE_VALUE;
581   }
582
583   handle->flags |= UV_HANDLE_SHUT;
584 }
585
586
587 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
588     uv_pipe_accept_t* req, BOOL firstInstance) {
589   assert(handle->flags & UV_HANDLE_LISTENING);
590
591   if (!firstInstance) {
592     assert(req->pipeHandle == INVALID_HANDLE_VALUE);
593
594     req->pipeHandle = CreateNamedPipeW(handle->name,
595         PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
596         PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
597         PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
598
599     if (req->pipeHandle == INVALID_HANDLE_VALUE) {
600       SET_REQ_ERROR(req, GetLastError());
601       uv_insert_pending_req(loop, (uv_req_t*) req);
602       handle->reqs_pending++;
603       return;
604     }
605
606     if (uv_set_pipe_handle(loop, handle, req->pipeHandle)) {
607       CloseHandle(req->pipeHandle);
608       req->pipeHandle = INVALID_HANDLE_VALUE;
609       SET_REQ_ERROR(req, GetLastError());
610       uv_insert_pending_req(loop, (uv_req_t*) req);
611       handle->reqs_pending++;
612       return;
613     }
614   }
615
616   assert(req->pipeHandle != INVALID_HANDLE_VALUE);
617
618   /* Prepare the overlapped structure. */
619   memset(&(req->overlapped), 0, sizeof(req->overlapped));
620
621   if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
622       GetLastError() != ERROR_IO_PENDING) {
623     if (GetLastError() == ERROR_PIPE_CONNECTED) {
624       SET_REQ_SUCCESS(req);
625     } else {
626       CloseHandle(req->pipeHandle);
627       req->pipeHandle = INVALID_HANDLE_VALUE;
628       /* Make this req pending reporting an error. */
629       SET_REQ_ERROR(req, GetLastError());
630     }
631     uv_insert_pending_req(loop, (uv_req_t*) req);
632     handle->reqs_pending++;
633     return;
634   }
635
636   handle->reqs_pending++;
637 }
638
639
640 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
641   uv_loop_t* loop = server->loop;
642   uv_pipe_t* pipe_client;
643   uv_pipe_accept_t* req;
644
645   if (server->ipc) {
646     if (!server->pending_socket_info) {
647       /* No valid pending sockets. */
648       uv__set_sys_error(loop, WSAEWOULDBLOCK);
649       return -1;
650     }
651
652     return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
653   } else {
654     pipe_client = (uv_pipe_t*)client;
655
656     /* Find a connection instance that has been connected, but not yet */
657     /* accepted. */
658     req = server->pending_accepts;
659
660     if (!req) {
661       /* No valid connections found, so we error out. */
662       uv__set_sys_error(loop, WSAEWOULDBLOCK);
663       return -1;
664     }
665
666     /* Initialize the client handle and copy the pipeHandle to the client */
667     uv_pipe_connection_init(pipe_client);
668     pipe_client->handle = req->pipeHandle;
669
670     /* Prepare the req to pick up a new connection */
671     server->pending_accepts = req->next_pending;
672     req->next_pending = NULL;
673     req->pipeHandle = INVALID_HANDLE_VALUE;
674
675     if (!(server->flags & UV_HANDLE_CLOSING)) {
676       uv_pipe_queue_accept(loop, server, req, FALSE);
677     }
678   }
679
680   return 0;
681 }
682
683
684 /* Starts listening for connections for the given pipe. */
685 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
686   uv_loop_t* loop = handle->loop;
687
688   int i, errno;
689
690   if (!(handle->flags & UV_HANDLE_BOUND)) {
691     uv__set_artificial_error(loop, UV_EINVAL);
692     return -1;
693   }
694
695   if (handle->flags & UV_HANDLE_LISTENING ||
696       handle->flags & UV_HANDLE_READING) {
697     uv__set_artificial_error(loop, UV_EALREADY);
698     return -1;
699   }
700
701   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
702     uv__set_artificial_error(loop, UV_ENOTSUP);
703     return -1;
704   }
705
706   handle->flags |= UV_HANDLE_LISTENING;
707   handle->connection_cb = cb;
708
709   /* First pipe handle should have already been created in uv_pipe_bind */
710   assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
711
712   for (i = 0; i < handle->pending_instances; i++) {
713     uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
714   }
715
716   return 0;
717 }
718
719
720 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
721   int result;
722   DWORD bytes;
723   uv_read_t* req = (uv_read_t*) parameter;
724   uv_pipe_t* handle = (uv_pipe_t*) req->data;
725   uv_loop_t* loop = handle->loop;
726
727   assert(req != NULL);
728   assert(req->type == UV_READ);
729   assert(handle->type == UV_NAMED_PIPE);
730
731   result = ReadFile(handle->handle,
732                     &uv_zero_,
733                     0,
734                     &bytes,
735                     NULL);
736
737   if (!result) {
738     SET_REQ_ERROR(req, GetLastError());
739   }
740
741   POST_COMPLETION_FOR_REQ(loop, req);
742   return 0;
743 }
744
745
746 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
747   int result;
748   DWORD bytes;
749   uv_write_t* req = (uv_write_t*) parameter;
750   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
751   uv_loop_t* loop = handle->loop;
752
753   assert(req != NULL);
754   assert(req->type == UV_WRITE);
755   assert(handle->type == UV_NAMED_PIPE);
756   assert(req->write_buffer.base);
757
758   result = WriteFile(handle->handle,
759                      req->write_buffer.base,
760                      req->write_buffer.len,
761                      &bytes,
762                      NULL);
763
764   if (!result) {
765     SET_REQ_ERROR(req, GetLastError());
766   }
767
768   POST_COMPLETION_FOR_REQ(loop, req);
769   return 0;
770 }
771
772
773 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
774   uv_read_t* req;
775   uv_tcp_t* handle;
776
777   req = (uv_read_t*) context;
778   assert(req != NULL);
779   handle = (uv_tcp_t*)req->data;
780   assert(handle != NULL);
781   assert(!timed_out);
782
783   if (!PostQueuedCompletionStatus(handle->loop->iocp,
784                                   req->overlapped.InternalHigh,
785                                   0,
786                                   &req->overlapped)) {
787     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
788   }
789 }
790
791
792 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
793   uv_write_t* req;
794   uv_tcp_t* handle;
795
796   req = (uv_write_t*) context;
797   assert(req != NULL);
798   handle = (uv_tcp_t*)req->handle;
799   assert(handle != NULL);
800   assert(!timed_out);
801
802   if (!PostQueuedCompletionStatus(handle->loop->iocp,
803                                   req->overlapped.InternalHigh,
804                                   0,
805                                   &req->overlapped)) {
806     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
807   }
808 }
809
810
811 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
812   uv_read_t* req;
813   int result;
814
815   assert(handle->flags & UV_HANDLE_READING);
816   assert(!(handle->flags & UV_HANDLE_READ_PENDING));
817
818   assert(handle->handle != INVALID_HANDLE_VALUE);
819
820   req = &handle->read_req;
821
822   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
823     if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
824                            req,
825                            WT_EXECUTELONGFUNCTION)) {
826       /* Make this req pending reporting an error. */
827       SET_REQ_ERROR(req, GetLastError());
828       goto error;
829     } 
830   } else {
831     memset(&req->overlapped, 0, sizeof(req->overlapped));
832     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
833       req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
834     }
835
836     /* Do 0-read */
837     result = ReadFile(handle->handle,
838                       &uv_zero_,
839                       0,
840                       NULL,
841                       &req->overlapped);
842
843     if (!result && GetLastError() != ERROR_IO_PENDING) {
844       /* Make this req pending reporting an error. */
845       SET_REQ_ERROR(req, GetLastError());
846       goto error;
847     }
848
849     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
850       if (!req->event_handle) {
851         req->event_handle = CreateEvent(NULL, 0, 0, NULL);
852         if (!req->event_handle) {
853           uv_fatal_error(GetLastError(), "CreateEvent");
854         }
855       }
856       if (req->wait_handle == INVALID_HANDLE_VALUE) {
857         if (!RegisterWaitForSingleObject(&req->wait_handle,
858             req->overlapped.hEvent, post_completion_read_wait, (void*) req,
859             INFINITE, WT_EXECUTEINWAITTHREAD)) {
860           SET_REQ_ERROR(req, GetLastError());
861           goto error;
862         }
863       }
864     }
865   }
866
867   /* Start the eof timer if there is one */
868   eof_timer_start(handle);
869   handle->flags |= UV_HANDLE_READ_PENDING;
870   handle->reqs_pending++;
871   return;
872
873 error:
874   uv_insert_pending_req(loop, (uv_req_t*)req);
875   handle->flags |= UV_HANDLE_READ_PENDING;
876   handle->reqs_pending++;
877 }
878
879
880 static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
881     uv_read_cb read_cb, uv_read2_cb read2_cb) {
882   uv_loop_t* loop = handle->loop;
883
884   if (!(handle->flags & UV_HANDLE_CONNECTION)) {
885     uv__set_artificial_error(loop, UV_EINVAL);
886     return -1;
887   }
888
889   if (handle->flags & UV_HANDLE_READING) {
890     uv__set_artificial_error(loop, UV_EALREADY);
891     return -1;
892   }
893
894   if (handle->flags & UV_HANDLE_EOF) {
895     uv__set_artificial_error(loop, UV_EOF);
896     return -1;
897   }
898
899   handle->flags |= UV_HANDLE_READING;
900   handle->read_cb = read_cb;
901   handle->read2_cb = read2_cb;
902   handle->alloc_cb = alloc_cb;
903
904   /* If reading was stopped and then started again, there could still be a */
905   /* read request pending. */
906   if (!(handle->flags & UV_HANDLE_READ_PENDING))
907     uv_pipe_queue_read(loop, handle);
908
909   return 0;
910 }
911
912
913 int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
914     uv_read_cb read_cb) {
915   return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
916 }
917
918
919 int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
920     uv_read2_cb read_cb) {
921   return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
922 }
923
924
925 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
926     uv_write_t* req) {
927   req->next_req = NULL;
928   if (handle->non_overlapped_writes_tail) {
929     req->next_req =
930       handle->non_overlapped_writes_tail->next_req;
931     handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
932     handle->non_overlapped_writes_tail = req;
933   } else {
934     req->next_req = (uv_req_t*)req;
935     handle->non_overlapped_writes_tail = req;
936   }
937 }
938
939
940 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
941   uv_write_t* req;
942
943   if (handle->non_overlapped_writes_tail) {
944     req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
945
946     if (req == handle->non_overlapped_writes_tail) {
947       handle->non_overlapped_writes_tail = NULL;
948     } else {
949       handle->non_overlapped_writes_tail->next_req =
950         req->next_req;
951     }
952
953     return req;
954   } else {
955     /* queue empty */
956     return NULL;
957   }
958 }
959
960
961 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
962   uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
963   if (req) {
964     if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
965                            req,
966                            WT_EXECUTELONGFUNCTION)) {
967       uv_fatal_error(GetLastError(), "QueueUserWorkItem");
968     }
969   }
970 }
971
972
973 static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
974     uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
975     uv_stream_t* send_handle, uv_write_cb cb) {
976   int result;
977   uv_tcp_t* tcp_send_handle;
978   uv_write_t* ipc_header_req;
979   uv_ipc_frame_uv_stream ipc_frame;
980
981   if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
982     uv__set_artificial_error(loop, UV_ENOTSUP);
983     return -1;
984   }
985
986   /* Only TCP server handles are supported for sharing. */
987   if (send_handle && (send_handle->type != UV_TCP ||
988       send_handle->flags & UV_HANDLE_CONNECTION)) {
989     uv__set_artificial_error(loop, UV_ENOTSUP);
990     return -1;
991   }
992
993   assert(handle->handle != INVALID_HANDLE_VALUE);
994
995   if (!(handle->flags & UV_HANDLE_CONNECTION)) {
996     uv__set_artificial_error(loop, UV_EINVAL);
997     return -1;
998   }
999
1000   if (handle->flags & UV_HANDLE_SHUTTING) {
1001     uv__set_artificial_error(loop, UV_EOF);
1002     return -1;
1003   }
1004
1005   uv_req_init(loop, (uv_req_t*) req);
1006   req->type = UV_WRITE;
1007   req->handle = (uv_stream_t*) handle;
1008   req->cb = cb;
1009   req->ipc_header = 0;
1010   req->event_handle = NULL;
1011   req->wait_handle = INVALID_HANDLE_VALUE;
1012   memset(&req->overlapped, 0, sizeof(req->overlapped));
1013
1014   if (handle->ipc) {
1015     assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1016     ipc_frame.header.flags = 0;
1017
1018     /* Use the IPC framing protocol. */
1019     if (send_handle) {
1020       tcp_send_handle = (uv_tcp_t*)send_handle;
1021
1022       if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
1023           &ipc_frame.socket_info)) {
1024         return -1;
1025       }
1026       ipc_frame.header.flags |= UV_IPC_UV_STREAM;
1027     }
1028
1029     if (bufcnt == 1) {
1030       ipc_frame.header.flags |= UV_IPC_RAW_DATA;
1031       ipc_frame.header.raw_data_length = bufs[0].len;
1032     }
1033
1034     /* 
1035      * Use the provided req if we're only doing a single write.
1036      * If we're doing multiple writes, use ipc_header_write_req to do
1037      * the first write, and then use the provided req for the second write.
1038      */
1039     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1040       ipc_header_req = req;
1041     } else {
1042       /* 
1043        * Try to use the preallocated write req if it's available.
1044        * Otherwise allocate a new one.
1045        */
1046       if (handle->ipc_header_write_req.type != UV_WRITE) {
1047         ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
1048       } else {
1049         ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
1050         if (!ipc_header_req) {
1051           uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1052         }
1053       }
1054
1055       uv_req_init(loop, (uv_req_t*) ipc_header_req);
1056       ipc_header_req->type = UV_WRITE;
1057       ipc_header_req->handle = (uv_stream_t*) handle;
1058       ipc_header_req->cb = NULL;
1059       ipc_header_req->ipc_header = 1;
1060     }
1061
1062     /* Write the header or the whole frame. */
1063     memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
1064
1065     result = WriteFile(handle->handle,
1066                         &ipc_frame,
1067                         ipc_frame.header.flags & UV_IPC_UV_STREAM ?
1068                           sizeof(ipc_frame) : sizeof(ipc_frame.header),
1069                         NULL,
1070                         &ipc_header_req->overlapped);
1071     if (!result && GetLastError() != ERROR_IO_PENDING) {
1072       uv__set_sys_error(loop, GetLastError());
1073       return -1;
1074     }
1075
1076     if (result) {
1077       /* Request completed immediately. */
1078       ipc_header_req->queued_bytes = 0;
1079     } else {
1080       /* Request queued by the kernel. */
1081       ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ?
1082         sizeof(ipc_frame) : sizeof(ipc_frame.header);
1083       handle->write_queue_size += req->queued_bytes;
1084     }
1085
1086     if (handle->write_reqs_pending == 0) {
1087       uv_ref(loop);
1088     }
1089
1090     handle->reqs_pending++;
1091     handle->write_reqs_pending++;
1092
1093     /* If we don't have any raw data to write - we're done. */
1094     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1095       return 0;
1096     }
1097   }
1098
1099   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1100     req->write_buffer = bufs[0];
1101     uv_insert_non_overlapped_write_req(handle, req);
1102     if (handle->write_reqs_pending == 0) {
1103       uv_queue_non_overlapped_write(handle);
1104     }
1105
1106     /* Request queued by the kernel. */
1107     req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1108     handle->write_queue_size += req->queued_bytes;
1109   } else {
1110     result = WriteFile(handle->handle,
1111                        bufs[0].base,
1112                        bufs[0].len,
1113                        NULL,
1114                        &req->overlapped);
1115
1116     if (!result && GetLastError() != ERROR_IO_PENDING) {
1117       uv__set_sys_error(loop, GetLastError());
1118       return -1;
1119     }
1120
1121     if (result) {
1122       /* Request completed immediately. */
1123       req->queued_bytes = 0;
1124     } else {
1125       /* Request queued by the kernel. */
1126       req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1127       handle->write_queue_size += req->queued_bytes;
1128     }
1129
1130     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1131       req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1132       if (!req->event_handle) {
1133         uv_fatal_error(GetLastError(), "CreateEvent");
1134       }
1135       if (!RegisterWaitForSingleObject(&req->wait_handle,
1136           req->overlapped.hEvent, post_completion_write_wait, (void*) req,
1137           INFINITE, WT_EXECUTEINWAITTHREAD)) {
1138         uv__set_sys_error(loop, GetLastError());
1139         return -1;
1140       }
1141     }
1142   }
1143
1144   if (handle->write_reqs_pending == 0) {
1145     uv_ref(loop);
1146   }
1147
1148   handle->reqs_pending++;
1149   handle->write_reqs_pending++;
1150
1151   return 0;
1152 }
1153
1154
1155 int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1156     uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
1157   return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb);
1158 }
1159
1160
1161 int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1162     uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
1163   if (!handle->ipc) {
1164     uv__set_artificial_error(loop, UV_EINVAL);
1165     return -1;
1166   }
1167
1168   return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
1169 }
1170
1171
1172 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1173     uv_buf_t buf) {
1174   /* If there is an eof timer running, we don't need it any more, */
1175   /* so discard it. */
1176   eof_timer_destroy(handle);
1177
1178   handle->flags |= UV_HANDLE_EOF;
1179   uv_read_stop((uv_stream_t*) handle);
1180
1181   uv__set_artificial_error(loop, UV_EOF);
1182   if (handle->read2_cb) {
1183     handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE);
1184   } else {
1185     handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
1186   }
1187 }
1188
1189
1190 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1191     uv_buf_t buf) {
1192   /* If there is an eof timer running, we don't need it any more, */
1193   /* so discard it. */
1194   eof_timer_destroy(handle);
1195
1196   uv_read_stop((uv_stream_t*) handle);
1197
1198   uv__set_sys_error(loop, error);
1199   if (handle->read2_cb) {
1200     handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE);
1201   } else {
1202     handle->read_cb((uv_stream_t*)handle, -1, buf);
1203   }
1204 }
1205
1206
1207 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1208     int error, uv_buf_t buf) {
1209   if (error == ERROR_BROKEN_PIPE) {
1210     uv_pipe_read_eof(loop, handle, buf);
1211   } else {
1212     uv_pipe_read_error(loop, handle, error, buf);
1213   }
1214 }
1215
1216
1217 void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
1218     uv_req_t* req) {
1219   DWORD bytes, avail;
1220   uv_buf_t buf;
1221   uv_ipc_frame_uv_stream ipc_frame;
1222
1223   assert(handle->type == UV_NAMED_PIPE);
1224
1225   handle->flags &= ~UV_HANDLE_READ_PENDING;
1226   eof_timer_stop(handle);
1227
1228   if (!REQ_SUCCESS(req)) {
1229     /* An error occurred doing the 0-read. */
1230     if (handle->flags & UV_HANDLE_READING) {
1231       uv_pipe_read_error_or_eof(loop,
1232                                 handle,
1233                                 GET_REQ_ERROR(req),
1234                                 uv_null_buf_);
1235     }
1236   } else {
1237     /* Do non-blocking reads until the buffer is empty */
1238     while (handle->flags & UV_HANDLE_READING) {
1239       if (!PeekNamedPipe(handle->handle,
1240                           NULL,
1241                           0,
1242                           NULL,
1243                           &avail,
1244                           NULL)) {
1245         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1246         break;
1247       }
1248
1249       if (avail == 0) {
1250         /* There is nothing to read after all. */
1251         break;
1252       }
1253
1254       if (handle->ipc) {
1255         /* Use the IPC framing protocol to read the incoming data. */
1256         if (handle->remaining_ipc_rawdata_bytes == 0) {
1257           /* We're reading a new frame.  First, read the header. */
1258           assert(avail >= sizeof(ipc_frame.header));
1259
1260           if (!ReadFile(handle->handle,
1261                         &ipc_frame.header,
1262                         sizeof(ipc_frame.header),
1263                         &bytes,
1264                         NULL)) {
1265             uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1266               uv_null_buf_);
1267             break;
1268           }
1269
1270           assert(bytes == sizeof(ipc_frame.header));
1271           assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
1272
1273           if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
1274             assert(avail - sizeof(ipc_frame.header) >=
1275               sizeof(ipc_frame.socket_info));
1276
1277             /* Read the TCP socket info. */
1278             if (!ReadFile(handle->handle,
1279                           &ipc_frame.socket_info,
1280                           sizeof(ipc_frame) - sizeof(ipc_frame.header),
1281                           &bytes,
1282                           NULL)) {
1283               uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1284                 uv_null_buf_);
1285               break;
1286             }
1287
1288             assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
1289
1290             /* Store the pending socket info. */
1291             assert(!handle->pending_socket_info);
1292             handle->pending_socket_info =
1293               (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
1294             if (!handle->pending_socket_info) {
1295               uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1296             }
1297
1298             *(handle->pending_socket_info) = ipc_frame.socket_info;
1299           }
1300
1301           if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
1302             handle->remaining_ipc_rawdata_bytes =
1303               ipc_frame.header.raw_data_length;
1304             continue;
1305           }
1306         } else {
1307           avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
1308         }
1309       }
1310
1311       buf = handle->alloc_cb((uv_handle_t*) handle, avail);
1312       assert(buf.len > 0);
1313
1314       if (ReadFile(handle->handle,
1315                    buf.base,
1316                    buf.len,
1317                    &bytes,
1318                    NULL)) {
1319         /* Successful read */
1320         if (handle->ipc) {
1321           assert(handle->remaining_ipc_rawdata_bytes >= bytes);
1322           handle->remaining_ipc_rawdata_bytes = 
1323             handle->remaining_ipc_rawdata_bytes - bytes;
1324           if (handle->read2_cb) {
1325             handle->read2_cb(handle, bytes, buf,
1326               handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
1327           } else if (handle->read_cb) {
1328             handle->read_cb((uv_stream_t*)handle, bytes, buf);
1329           }
1330
1331           if (handle->pending_socket_info) {
1332             free(handle->pending_socket_info);
1333             handle->pending_socket_info = NULL;
1334           }
1335         } else {
1336           handle->read_cb((uv_stream_t*)handle, bytes, buf);
1337         }
1338
1339         /* Read again only if bytes == buf.len */
1340         if (bytes <= buf.len) {
1341           break;
1342         }
1343       } else {
1344         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1345         break;
1346       }
1347     }
1348
1349     /* Post another 0-read if still reading and not closing. */
1350     if ((handle->flags & UV_HANDLE_READING) &&
1351         !(handle->flags & UV_HANDLE_READ_PENDING)) {
1352       uv_pipe_queue_read(loop, handle);
1353     }
1354   }
1355
1356   DECREASE_PENDING_REQ_COUNT(handle);
1357 }
1358
1359
1360 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1361     uv_write_t* req) {
1362   assert(handle->type == UV_NAMED_PIPE);
1363
1364   assert(handle->write_queue_size >= req->queued_bytes);
1365   handle->write_queue_size -= req->queued_bytes;
1366
1367   if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1368     if (req->wait_handle != INVALID_HANDLE_VALUE) {
1369       UnregisterWait(req->wait_handle);
1370       req->wait_handle = INVALID_HANDLE_VALUE;
1371     }
1372     if (req->event_handle) {
1373       CloseHandle(req->event_handle);
1374       req->event_handle = NULL;
1375     }
1376   }
1377
1378   if (req->ipc_header) {
1379     if (req == &handle->ipc_header_write_req) {
1380       req->type = UV_UNKNOWN_REQ;
1381     } else {
1382       free(req);
1383     }
1384   } else {
1385     if (req->cb) {
1386       if (!REQ_SUCCESS(req)) {
1387         uv__set_sys_error(loop, GET_REQ_ERROR(req));
1388         ((uv_write_cb)req->cb)(req, -1);
1389       } else {
1390         ((uv_write_cb)req->cb)(req, 0);
1391       }
1392     }
1393   }
1394
1395   handle->write_reqs_pending--;
1396
1397   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1398       handle->non_overlapped_writes_tail) {
1399     assert(handle->write_reqs_pending > 0);
1400     uv_queue_non_overlapped_write(handle);
1401   }
1402
1403   if (handle->write_reqs_pending == 0) {
1404     uv_unref(loop);
1405   }
1406
1407   if (handle->write_reqs_pending == 0 &&
1408       handle->flags & UV_HANDLE_SHUTTING) {
1409     uv_want_endgame(loop, (uv_handle_t*)handle);
1410   }
1411
1412   DECREASE_PENDING_REQ_COUNT(handle);
1413 }
1414
1415
1416 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1417     uv_req_t* raw_req) {
1418   uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1419
1420   assert(handle->type == UV_NAMED_PIPE);
1421
1422   if (REQ_SUCCESS(req)) {
1423     assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1424     req->next_pending = handle->pending_accepts;
1425     handle->pending_accepts = req;
1426
1427     if (handle->connection_cb) {
1428       handle->connection_cb((uv_stream_t*)handle, 0);
1429     }
1430   } else {
1431     if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1432       CloseHandle(req->pipeHandle);
1433       req->pipeHandle = INVALID_HANDLE_VALUE;
1434     }
1435     if (!(handle->flags & UV_HANDLE_CLOSING)) {
1436       uv_pipe_queue_accept(loop, handle, req, FALSE);
1437     }
1438   }
1439
1440   DECREASE_PENDING_REQ_COUNT(handle);
1441 }
1442
1443
1444 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1445     uv_connect_t* req) {
1446   assert(handle->type == UV_NAMED_PIPE);
1447
1448   if (req->cb) {
1449     if (REQ_SUCCESS(req)) {
1450       uv_pipe_connection_init(handle);
1451       ((uv_connect_cb)req->cb)(req, 0);
1452     } else {
1453       uv__set_sys_error(loop, GET_REQ_ERROR(req));
1454       ((uv_connect_cb)req->cb)(req, -1);
1455     }
1456   }
1457
1458   DECREASE_PENDING_REQ_COUNT(handle);
1459 }
1460
1461
1462 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1463     uv_shutdown_t* req) {
1464   assert(handle->type == UV_NAMED_PIPE);
1465
1466   /* Initialize and optionally start the eof timer. */
1467   /* This makes no sense if we've already seen EOF. */
1468   if (!(handle->flags & UV_HANDLE_EOF)) {
1469     eof_timer_init(handle);
1470
1471     /* If reading start the timer right now. */
1472     /* Otherwise uv_pipe_queue_read will start it. */
1473     if (handle->flags & UV_HANDLE_READ_PENDING) {
1474       eof_timer_start(handle);
1475     }
1476   }
1477
1478   if (req->cb) {
1479     req->cb(req, 0);
1480   }
1481
1482   DECREASE_PENDING_REQ_COUNT(handle);
1483 }
1484
1485
1486 static void eof_timer_init(uv_pipe_t* pipe) {
1487   int r;
1488
1489   assert(pipe->eof_timer == NULL);
1490   assert(pipe->flags & UV_HANDLE_CONNECTION);
1491
1492   pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
1493
1494   r = uv_timer_init(pipe->loop, pipe->eof_timer);
1495   assert(r == 0); /* timers can't fail */
1496   pipe->eof_timer->data = pipe;
1497 }
1498
1499
1500 static void eof_timer_start(uv_pipe_t* pipe) {
1501   assert(pipe->flags & UV_HANDLE_CONNECTION);
1502
1503   if (pipe->eof_timer != NULL) {
1504     uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
1505   }
1506 }
1507
1508
1509 static void eof_timer_stop(uv_pipe_t* pipe) {
1510   assert(pipe->flags & UV_HANDLE_CONNECTION);
1511
1512   if (pipe->eof_timer != NULL) {
1513     uv_timer_stop(pipe->eof_timer);
1514   }
1515 }
1516
1517
1518 static void eof_timer_cb(uv_timer_t* timer, int status) {
1519   uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
1520   uv_loop_t* loop = timer->loop;
1521
1522   assert(status == 0); /* timers can't fail */
1523   assert(pipe->type == UV_NAMED_PIPE);
1524
1525   /* This should always be true, since we start the timer only */
1526   /* in uv_pipe_queue_read after successfully calling ReadFile, */
1527   /* or in uv_process_pipe_shutdown_req if a read is pending, */
1528   /* and we always immediately stop the timer in */
1529   /* uv_process_pipe_read_req. */
1530   assert(pipe->flags & UV_HANDLE_READ_PENDING) ;
1531
1532   /* If there are many packets coming off the iocp then the timer callback */
1533   /* may be called before the read request is coming off the queue. */
1534   /* Therefore we check here if the read request has completed but will */
1535   /* be processed later. */
1536   if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
1537       HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
1538     return;
1539   }
1540
1541   /* Force both ends off the pipe. */
1542   CloseHandle(pipe->handle);
1543   pipe->handle = INVALID_HANDLE_VALUE;
1544
1545   /* Stop reading, so the pending read that is going to fail will */
1546   /* not be reported to the user. */
1547   uv_read_stop((uv_stream_t*) pipe);
1548
1549   /* Report the eof and update flags. This will get reported even if the */
1550   /* user stopped reading in the meantime. TODO: is that okay? */
1551   uv_pipe_read_eof(loop, pipe, uv_null_buf_);
1552 }
1553
1554
1555 static void eof_timer_destroy(uv_pipe_t* pipe) {
1556   assert(pipe->flags && UV_HANDLE_CONNECTION);
1557
1558   if (pipe->eof_timer) {
1559     uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
1560     pipe->eof_timer = NULL;
1561   }
1562 }
1563
1564
1565 static void eof_timer_close_cb(uv_handle_t* handle) {
1566   assert(handle->type == UV_TIMER);
1567   free(handle);
1568 }
1569
1570
1571 void uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
1572   HANDLE os_handle = (HANDLE)_get_osfhandle(file);
1573
1574   if (os_handle == INVALID_HANDLE_VALUE ||
1575       uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) {
1576     return;
1577   }
1578
1579   uv_pipe_connection_init(pipe);
1580   pipe->handle = os_handle;
1581
1582   if (pipe->ipc) {
1583     assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1584     pipe->ipc_pid = uv_parent_pid();
1585     assert(pipe->ipc_pid != -1);
1586   }
1587 }