uv: Upgrade to 5462dab
[platform/upstream/nodejs.git] / deps / uv / src / win / pipe.c
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26
27 #include "uv.h"
28 #include "internal.h"
29 #include "handle-inl.h"
30 #include "stream-inl.h"
31 #include "req-inl.h"
32
33
/* A zero-size buffer; it is the buffer argument of the zero-length ReadFile */
/* calls made in this file. */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data */
/* when the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Value uv_pipe_bind() stores in pending_instances when the handle has not */
/* already been flagged as a pipe server. */
static const int default_pending_pipe_instances = 4;
45
/* IPC protocol flags. Each value is a distinct bit, so they can be OR-ed. */
/* NOTE(review): presumably carried in uv_ipc_frame_header_t.flags; confirm */
/* against the IPC read/write code. */
#define UV_IPC_RAW_DATA       0x0001
#define UV_IPC_TCP_SERVER     0x0002
#define UV_IPC_TCP_CONNECTION 0x0004
50
/* IPC frame header. */
typedef struct {
  /* NOTE(review): presumably a combination of the UV_IPC_* flags; confirm. */
  int flags;
  /* NOTE(review): presumably the size in bytes of the raw data that */
  /* accompanies the frame; confirm against the code that fills it in. */
  uint64_t raw_data_length;
} uv_ipc_frame_header_t;
56
/* IPC frame, which contains an imported TCP socket stream. */
typedef struct {
  /* Common frame header; it comes first in the frame. */
  uv_ipc_frame_header_t header;
  /* Winsock protocol info describing the socket being transferred. */
  WSAPROTOCOL_INFOW socket_info;
} uv_ipc_frame_uv_stream;
62
/* Forward declarations of the EOF timer helpers. They are static, so their */
/* definitions live elsewhere in this file. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer, int status);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
69
70
/*
 * Writes a pipe name of the form \\.\pipe\uv\<ptr>-<pid> into `name`.
 *
 * ptr:  value mixed into the name; it is only printed, never dereferenced.
 * name: destination buffer.
 * size: capacity of `name` in bytes.
 *
 * When size is non-zero the result is always NUL-terminated; a name that does
 * not fit is truncated.
 */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  if (size == 0) {
    return;
  }

  /* %p expects a void pointer and %d an int, so convert explicitly. */
  _snprintf(name,
            size,
            "\\\\.\\pipe\\uv\\%p-%d",
            (void*) ptr,
            (int) GetCurrentProcessId());

  /* _snprintf writes no terminator when the output fills the buffer. */
  name[size - 1] = '\0';
}
74
75
76 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
77   uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
78
79   handle->reqs_pending = 0;
80   handle->handle = INVALID_HANDLE_VALUE;
81   handle->name = NULL;
82   handle->ipc_pid = 0;
83   handle->remaining_ipc_rawdata_bytes = 0;
84   handle->pending_ipc_info.socket_info = NULL;
85   handle->pending_ipc_info.tcp_connection = 0;
86   handle->ipc = ipc;
87   handle->non_overlapped_writes_tail = NULL;
88
89   uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
90
91   return 0;
92 }
93
94
95 static void uv_pipe_connection_init(uv_pipe_t* handle) {
96   uv_connection_init((uv_stream_t*) handle);
97   handle->read_req.data = handle;
98   handle->eof_timer = NULL;
99 }
100
101
102 static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
103   HANDLE pipeHandle;
104
105   /*
106    * Assume that we have a duplex pipe first, so attempt to
107    * connect with GENERIC_READ | GENERIC_WRITE.
108    */
109   pipeHandle = CreateFileW(name,
110                            GENERIC_READ | GENERIC_WRITE,
111                            0,
112                            NULL,
113                            OPEN_EXISTING,
114                            FILE_FLAG_OVERLAPPED,
115                            NULL);
116   if (pipeHandle != INVALID_HANDLE_VALUE) {
117     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
118     return pipeHandle;
119   }
120
121   /*
122    * If the pipe is not duplex CreateFileW fails with
123    * ERROR_ACCESS_DENIED.  In that case try to connect
124    * as a read-only or write-only.
125    */
126   if (GetLastError() == ERROR_ACCESS_DENIED) {
127     pipeHandle = CreateFileW(name,
128                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
129                              0,
130                              NULL,
131                              OPEN_EXISTING,
132                              FILE_FLAG_OVERLAPPED,
133                              NULL);
134
135     if (pipeHandle != INVALID_HANDLE_VALUE) {
136       *duplex_flags = UV_HANDLE_READABLE;
137       return pipeHandle;
138     }
139   }
140
141   if (GetLastError() == ERROR_ACCESS_DENIED) {
142     pipeHandle = CreateFileW(name,
143                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
144                              0,
145                              NULL,
146                              OPEN_EXISTING,
147                              FILE_FLAG_OVERLAPPED,
148                              NULL);
149
150     if (pipeHandle != INVALID_HANDLE_VALUE) {
151       *duplex_flags = UV_HANDLE_WRITABLE;
152       return pipeHandle;
153     }
154   }
155
156   return INVALID_HANDLE_VALUE;
157 }
158
159
160 uv_err_t uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
161     char* name, size_t nameSize) {
162   HANDLE pipeHandle;
163   int errorno;
164   uv_err_t err;
165   char* ptr = (char*)handle;
166
167   for (;;) {
168     uv_unique_pipe_name(ptr, name, nameSize);
169
170     pipeHandle = CreateNamedPipeA(name,
171       access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
172       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
173       NULL);
174
175     if (pipeHandle != INVALID_HANDLE_VALUE) {
176       /* No name collisions.  We're done. */
177       break;
178     }
179
180     errorno = GetLastError();
181     if (errorno != ERROR_PIPE_BUSY && errorno != ERROR_ACCESS_DENIED) {
182       err = uv__new_sys_error(errorno);
183       goto error;
184     }
185
186     /* Pipe name collision.  Increment the pointer and try again. */
187     ptr++;
188   }
189
190   if (CreateIoCompletionPort(pipeHandle,
191                              loop->iocp,
192                              (ULONG_PTR)handle,
193                              0) == NULL) {
194     err = uv__new_sys_error(GetLastError());
195     goto error;
196   }
197
198   uv_pipe_connection_init(handle);
199   handle->handle = pipeHandle;
200
201   return uv_ok_;
202
203  error:
204   if (pipeHandle != INVALID_HANDLE_VALUE) {
205     CloseHandle(pipeHandle);
206   }
207
208   return err;
209 }
210
211
212 static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
213     HANDLE pipeHandle, DWORD duplex_flags) {
214   NTSTATUS nt_status;
215   IO_STATUS_BLOCK io_status;
216   FILE_MODE_INFORMATION mode_info;
217   DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
218
219   if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
220     /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
221     /* that is not a pipe. */
222     if (GetLastError() == ERROR_INVALID_PARAMETER) {
223       SetLastError(WSAENOTSOCK);
224     }
225     return -1;
226   }
227
228   /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
229   nt_status = pNtQueryInformationFile(pipeHandle,
230                                       &io_status,
231                                       &mode_info,
232                                       sizeof(mode_info),
233                                       FileModeInformation);
234   if (nt_status != STATUS_SUCCESS) {
235     return -1;
236   }
237
238   if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
239       mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
240     /* Non-overlapped pipe. */
241     handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
242   } else {
243     /* Overlapped pipe.  Try to associate with IOCP. */
244     if (CreateIoCompletionPort(pipeHandle,
245                                loop->iocp,
246                                (ULONG_PTR)handle,
247                                0) == NULL) {
248       handle->flags |= UV_HANDLE_EMULATE_IOCP;
249     }
250   }
251
252   handle->handle = pipeHandle;
253   handle->flags |= duplex_flags;
254
255   return 0;
256 }
257
258
259 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
260   uv_loop_t* loop;
261   uv_pipe_t* handle;
262   uv_shutdown_t* req;
263
264   req = (uv_shutdown_t*) parameter;
265   assert(req);
266   handle = (uv_pipe_t*) req->handle;
267   assert(handle);
268   loop = handle->loop;
269   assert(loop);
270
271   FlushFileBuffers(handle->handle);
272
273   /* Post completed */
274   POST_COMPLETION_FOR_REQ(loop, req);
275
276   return 0;
277 }
278
279
280 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
281   DWORD result;
282   uv_shutdown_t* req;
283   NTSTATUS nt_status;
284   IO_STATUS_BLOCK io_status;
285   FILE_PIPE_LOCAL_INFORMATION pipe_info;
286
287   if ((handle->flags & UV_HANDLE_CONNECTION) &&
288       handle->shutdown_req != NULL &&
289       handle->write_reqs_pending == 0) {
290     req = handle->shutdown_req;
291
292     /* Clear the shutdown_req field so we don't go here again. */
293     handle->shutdown_req = NULL;
294
295     if (handle->flags & UV__HANDLE_CLOSING) {
296       UNREGISTER_HANDLE_REQ(loop, handle, req);
297
298       /* Already closing. Cancel the shutdown. */
299       if (req->cb) {
300         uv__set_artificial_error(loop, UV_ECANCELED);
301         req->cb(req, -1);
302       }
303
304       DECREASE_PENDING_REQ_COUNT(handle);
305       return;
306     }
307
308     /* Try to avoid flushing the pipe buffer in the thread pool. */
309     nt_status = pNtQueryInformationFile(handle->handle,
310                                         &io_status,
311                                         &pipe_info,
312                                         sizeof pipe_info,
313                                         FilePipeLocalInformation);
314
315     if (nt_status != STATUS_SUCCESS) {
316       /* Failure */
317       UNREGISTER_HANDLE_REQ(loop, handle, req);
318
319       handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
320       if (req->cb) {
321         uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status));
322         req->cb(req, -1);
323       }
324
325       DECREASE_PENDING_REQ_COUNT(handle);
326       return;
327     }
328
329     if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
330       /* Short-circuit, no need to call FlushFileBuffers. */
331       uv_insert_pending_req(loop, (uv_req_t*) req);
332       return;
333     }
334
335     /* Run FlushFileBuffers in the thread pool. */
336     result = QueueUserWorkItem(pipe_shutdown_thread_proc,
337                                req,
338                                WT_EXECUTELONGFUNCTION);
339     if (result) {
340       return;
341
342     } else {
343       /* Failure. */
344       UNREGISTER_HANDLE_REQ(loop, handle, req);
345
346       handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
347       if (req->cb) {
348         uv__set_sys_error(loop, GetLastError());
349         req->cb(req, -1);
350       }
351
352       DECREASE_PENDING_REQ_COUNT(handle);
353       return;
354     }
355   }
356
357   if (handle->flags & UV__HANDLE_CLOSING &&
358       handle->reqs_pending == 0) {
359     assert(!(handle->flags & UV_HANDLE_CLOSED));
360
361     if (handle->flags & UV_HANDLE_CONNECTION) {
362       if (handle->pending_ipc_info.socket_info) {
363         free(handle->pending_ipc_info.socket_info);
364         handle->pending_ipc_info.socket_info = NULL;
365       }
366
367       if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
368         if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
369           UnregisterWait(handle->read_req.wait_handle);
370           handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
371         }
372         if (handle->read_req.event_handle) {
373           CloseHandle(handle->read_req.event_handle);
374           handle->read_req.event_handle = NULL;
375         }
376       }
377     }
378
379     if (handle->flags & UV_HANDLE_PIPESERVER) {
380       assert(handle->accept_reqs);
381       free(handle->accept_reqs);
382       handle->accept_reqs = NULL;
383     }
384
385     uv__handle_close(handle);
386   }
387 }
388
389
390 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
391   handle->pending_instances = count;
392   handle->flags |= UV_HANDLE_PIPESERVER;
393 }
394
395
396 /* Creates a pipe server. */
397 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
398   uv_loop_t* loop = handle->loop;
399   int i, errorno, nameSize;
400   uv_pipe_accept_t* req;
401
402   if (handle->flags & UV_HANDLE_BOUND) {
403     uv__set_sys_error(loop, WSAEINVAL);
404     return -1;
405   }
406
407   if (!name) {
408     uv__set_sys_error(loop, WSAEINVAL);
409     return -1;
410   }
411
412   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
413     handle->pending_instances = default_pending_pipe_instances;
414   }
415
416   handle->accept_reqs = (uv_pipe_accept_t*)
417     malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
418   if (!handle->accept_reqs) {
419     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
420   }
421
422   for (i = 0; i < handle->pending_instances; i++) {
423     req = &handle->accept_reqs[i];
424     uv_req_init(loop, (uv_req_t*) req);
425     req->type = UV_ACCEPT;
426     req->data = handle;
427     req->pipeHandle = INVALID_HANDLE_VALUE;
428     req->next_pending = NULL;
429   }
430
431   /* Convert name to UTF16. */
432   nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
433   handle->name = (WCHAR*)malloc(nameSize);
434   if (!handle->name) {
435     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
436   }
437
438   if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
439     uv__set_sys_error(loop, GetLastError());
440     return -1;
441   }
442
443   /*
444    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
445    * If this fails then there's already a pipe server for the given pipe name.
446    */
447   handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
448       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
449       FILE_FLAG_FIRST_PIPE_INSTANCE,
450       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
451       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
452
453   if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
454     errorno = GetLastError();
455     if (errorno == ERROR_ACCESS_DENIED) {
456       uv__set_error(loop, UV_EADDRINUSE, errorno);
457     } else if (errorno == ERROR_PATH_NOT_FOUND || errorno == ERROR_INVALID_NAME) {
458       uv__set_error(loop, UV_EACCES, errorno);
459     } else {
460       uv__set_sys_error(loop, errorno);
461     }
462     goto error;
463   }
464
465   if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
466     uv__set_sys_error(loop, GetLastError());
467     goto error;
468   }
469
470   handle->pending_accepts = NULL;
471   handle->flags |= UV_HANDLE_PIPESERVER;
472   handle->flags |= UV_HANDLE_BOUND;
473
474   return 0;
475
476 error:
477   if (handle->name) {
478     free(handle->name);
479     handle->name = NULL;
480   }
481
482   if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
483     CloseHandle(handle->accept_reqs[0].pipeHandle);
484     handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
485   }
486
487   return -1;
488 }
489
490
491 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
492   uv_loop_t* loop;
493   uv_pipe_t* handle;
494   uv_connect_t* req;
495   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
496   DWORD duplex_flags;
497
498   req = (uv_connect_t*) parameter;
499   assert(req);
500   handle = (uv_pipe_t*) req->handle;
501   assert(handle);
502   loop = handle->loop;
503   assert(loop);
504
505   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
506   /* We wait for the pipe to become available with WaitNamedPipe. */
507   while (WaitNamedPipeW(handle->name, 30000)) {
508     /* The pipe is now available, try to connect. */
509     pipeHandle = open_named_pipe(handle->name, &duplex_flags);
510     if (pipeHandle != INVALID_HANDLE_VALUE) {
511       break;
512     }
513
514     SwitchToThread();
515   }
516
517   if (pipeHandle != INVALID_HANDLE_VALUE &&
518       !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
519     SET_REQ_SUCCESS(req);
520   } else {
521     SET_REQ_ERROR(req, GetLastError());
522   }
523
524   /* Post completed */
525   POST_COMPLETION_FOR_REQ(loop, req);
526
527   return 0;
528 }
529
530
531 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
532     const char* name, uv_connect_cb cb) {
533   uv_loop_t* loop = handle->loop;
534   int errorno, nameSize;
535   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
536   DWORD duplex_flags;
537
538   uv_req_init(loop, (uv_req_t*) req);
539   req->type = UV_CONNECT;
540   req->handle = (uv_stream_t*) handle;
541   req->cb = cb;
542
543   /* Convert name to UTF16. */
544   nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
545   handle->name = (WCHAR*)malloc(nameSize);
546   if (!handle->name) {
547     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
548   }
549
550   if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
551     errorno = GetLastError();
552     goto error;
553   }
554
555   pipeHandle = open_named_pipe(handle->name, &duplex_flags);
556   if (pipeHandle == INVALID_HANDLE_VALUE) {
557     if (GetLastError() == ERROR_PIPE_BUSY) {
558       /* Wait for the server to make a pipe instance available. */
559       if (!QueueUserWorkItem(&pipe_connect_thread_proc,
560                              req,
561                              WT_EXECUTELONGFUNCTION)) {
562         errorno = GetLastError();
563         goto error;
564       }
565
566       REGISTER_HANDLE_REQ(loop, handle, req);
567       handle->reqs_pending++;
568
569       return;
570     }
571
572     errorno = GetLastError();
573     goto error;
574   }
575
576   assert(pipeHandle != INVALID_HANDLE_VALUE);
577
578   if (uv_set_pipe_handle(loop,
579                          (uv_pipe_t*) req->handle,
580                          pipeHandle,
581                          duplex_flags)) {
582     errorno = GetLastError();
583     goto error;
584   }
585
586   SET_REQ_SUCCESS(req);
587   uv_insert_pending_req(loop, (uv_req_t*) req);
588   handle->reqs_pending++;
589   REGISTER_HANDLE_REQ(loop, handle, req);
590   return;
591
592 error:
593   if (handle->name) {
594     free(handle->name);
595     handle->name = NULL;
596   }
597
598   if (pipeHandle != INVALID_HANDLE_VALUE) {
599     CloseHandle(pipeHandle);
600   }
601
602   /* Make this req pending reporting an error. */
603   SET_REQ_ERROR(req, errorno);
604   uv_insert_pending_req(loop, (uv_req_t*) req);
605   handle->reqs_pending++;
606   REGISTER_HANDLE_REQ(loop, handle, req);
607   return;
608 }
609
610
611 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
612 /* with it. */
613 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
614   int i;
615   HANDLE pipeHandle;
616
617   if (handle->name) {
618     free(handle->name);
619     handle->name = NULL;
620   }
621
622   if (handle->flags & UV_HANDLE_PIPESERVER) {
623     for (i = 0; i < handle->pending_instances; i++) {
624       pipeHandle = handle->accept_reqs[i].pipeHandle;
625       if (pipeHandle != INVALID_HANDLE_VALUE) {
626         CloseHandle(pipeHandle);
627         handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
628       }
629     }
630   }
631
632   if (handle->flags & UV_HANDLE_CONNECTION) {
633     handle->flags &= ~UV_HANDLE_WRITABLE;
634     eof_timer_destroy(handle);
635   }
636
637   if ((handle->flags & UV_HANDLE_CONNECTION)
638       && handle->handle != INVALID_HANDLE_VALUE) {
639     CloseHandle(handle->handle);
640     handle->handle = INVALID_HANDLE_VALUE;
641   }
642 }
643
644
645 void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
646   if (handle->flags & UV_HANDLE_READING) {
647     handle->flags &= ~UV_HANDLE_READING;
648     DECREASE_ACTIVE_COUNT(loop, handle);
649   }
650
651   if (handle->flags & UV_HANDLE_LISTENING) {
652     handle->flags &= ~UV_HANDLE_LISTENING;
653     DECREASE_ACTIVE_COUNT(loop, handle);
654   }
655
656   uv_pipe_cleanup(loop, handle);
657
658   if (handle->reqs_pending == 0) {
659     uv_want_endgame(loop, (uv_handle_t*) handle);
660   }
661
662   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
663   uv__handle_closing(handle);
664 }
665
666
667 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
668     uv_pipe_accept_t* req, BOOL firstInstance) {
669   assert(handle->flags & UV_HANDLE_LISTENING);
670
671   if (!firstInstance) {
672     assert(req->pipeHandle == INVALID_HANDLE_VALUE);
673
674     req->pipeHandle = CreateNamedPipeW(handle->name,
675         PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
676         PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
677         PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
678
679     if (req->pipeHandle == INVALID_HANDLE_VALUE) {
680       SET_REQ_ERROR(req, GetLastError());
681       uv_insert_pending_req(loop, (uv_req_t*) req);
682       handle->reqs_pending++;
683       return;
684     }
685
686     if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
687       CloseHandle(req->pipeHandle);
688       req->pipeHandle = INVALID_HANDLE_VALUE;
689       SET_REQ_ERROR(req, GetLastError());
690       uv_insert_pending_req(loop, (uv_req_t*) req);
691       handle->reqs_pending++;
692       return;
693     }
694   }
695
696   assert(req->pipeHandle != INVALID_HANDLE_VALUE);
697
698   /* Prepare the overlapped structure. */
699   memset(&(req->overlapped), 0, sizeof(req->overlapped));
700
701   if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
702       GetLastError() != ERROR_IO_PENDING) {
703     if (GetLastError() == ERROR_PIPE_CONNECTED) {
704       SET_REQ_SUCCESS(req);
705     } else {
706       CloseHandle(req->pipeHandle);
707       req->pipeHandle = INVALID_HANDLE_VALUE;
708       /* Make this req pending reporting an error. */
709       SET_REQ_ERROR(req, GetLastError());
710     }
711     uv_insert_pending_req(loop, (uv_req_t*) req);
712     handle->reqs_pending++;
713     return;
714   }
715
716   handle->reqs_pending++;
717 }
718
719
720 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
721   uv_loop_t* loop = server->loop;
722   uv_pipe_t* pipe_client;
723   uv_pipe_accept_t* req;
724
725   if (server->ipc) {
726     if (!server->pending_ipc_info.socket_info) {
727       /* No valid pending sockets. */
728       uv__set_sys_error(loop, WSAEWOULDBLOCK);
729       return -1;
730     }
731
732     return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
733                          server->pending_ipc_info.tcp_connection);
734   } else {
735     pipe_client = (uv_pipe_t*)client;
736
737     /* Find a connection instance that has been connected, but not yet */
738     /* accepted. */
739     req = server->pending_accepts;
740
741     if (!req) {
742       /* No valid connections found, so we error out. */
743       uv__set_sys_error(loop, WSAEWOULDBLOCK);
744       return -1;
745     }
746
747     /* Initialize the client handle and copy the pipeHandle to the client */
748     uv_pipe_connection_init(pipe_client);
749     pipe_client->handle = req->pipeHandle;
750     pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
751
752     /* Prepare the req to pick up a new connection */
753     server->pending_accepts = req->next_pending;
754     req->next_pending = NULL;
755     req->pipeHandle = INVALID_HANDLE_VALUE;
756
757     if (!(server->flags & UV__HANDLE_CLOSING)) {
758       uv_pipe_queue_accept(loop, server, req, FALSE);
759     }
760   }
761
762   return 0;
763 }
764
765
766 /* Starts listening for connections for the given pipe. */
767 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
768   uv_loop_t* loop = handle->loop;
769   int i;
770
771   if (handle->flags & UV_HANDLE_LISTENING) {
772     handle->connection_cb = cb;
773   }
774
775   if (!(handle->flags & UV_HANDLE_BOUND)) {
776     uv__set_artificial_error(loop, UV_EINVAL);
777     return -1;
778   }
779
780   if (handle->flags & UV_HANDLE_READING) {
781     uv__set_artificial_error(loop, UV_EISCONN);
782     return -1;
783   }
784
785   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
786     uv__set_artificial_error(loop, UV_ENOTSUP);
787     return -1;
788   }
789
790   handle->flags |= UV_HANDLE_LISTENING;
791   INCREASE_ACTIVE_COUNT(loop, handle);
792   handle->connection_cb = cb;
793
794   /* First pipe handle should have already been created in uv_pipe_bind */
795   assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
796
797   for (i = 0; i < handle->pending_instances; i++) {
798     uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
799   }
800
801   return 0;
802 }
803
804
805 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
806   int result;
807   DWORD bytes;
808   uv_read_t* req = (uv_read_t*) parameter;
809   uv_pipe_t* handle = (uv_pipe_t*) req->data;
810   uv_loop_t* loop = handle->loop;
811
812   assert(req != NULL);
813   assert(req->type == UV_READ);
814   assert(handle->type == UV_NAMED_PIPE);
815
816   result = ReadFile(handle->handle,
817                     &uv_zero_,
818                     0,
819                     &bytes,
820                     NULL);
821
822   if (!result) {
823     SET_REQ_ERROR(req, GetLastError());
824   }
825
826   POST_COMPLETION_FOR_REQ(loop, req);
827   return 0;
828 }
829
830
831 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
832   int result;
833   DWORD bytes;
834   uv_write_t* req = (uv_write_t*) parameter;
835   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
836   uv_loop_t* loop = handle->loop;
837
838   assert(req != NULL);
839   assert(req->type == UV_WRITE);
840   assert(handle->type == UV_NAMED_PIPE);
841   assert(req->write_buffer.base);
842
843   result = WriteFile(handle->handle,
844                      req->write_buffer.base,
845                      req->write_buffer.len,
846                      &bytes,
847                      NULL);
848
849   if (!result) {
850     SET_REQ_ERROR(req, GetLastError());
851   }
852
853   POST_COMPLETION_FOR_REQ(loop, req);
854   return 0;
855 }
856
857
858 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
859   uv_read_t* req;
860   uv_tcp_t* handle;
861
862   req = (uv_read_t*) context;
863   assert(req != NULL);
864   handle = (uv_tcp_t*)req->data;
865   assert(handle != NULL);
866   assert(!timed_out);
867
868   if (!PostQueuedCompletionStatus(handle->loop->iocp,
869                                   req->overlapped.InternalHigh,
870                                   0,
871                                   &req->overlapped)) {
872     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
873   }
874 }
875
876
877 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
878   uv_write_t* req;
879   uv_tcp_t* handle;
880
881   req = (uv_write_t*) context;
882   assert(req != NULL);
883   handle = (uv_tcp_t*)req->handle;
884   assert(handle != NULL);
885   assert(!timed_out);
886
887   if (!PostQueuedCompletionStatus(handle->loop->iocp,
888                                   req->overlapped.InternalHigh,
889                                   0,
890                                   &req->overlapped)) {
891     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
892   }
893 }
894
895
896 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
897   uv_read_t* req;
898   int result;
899
900   assert(handle->flags & UV_HANDLE_READING);
901   assert(!(handle->flags & UV_HANDLE_READ_PENDING));
902
903   assert(handle->handle != INVALID_HANDLE_VALUE);
904
905   req = &handle->read_req;
906
907   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
908     if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
909                            req,
910                            WT_EXECUTELONGFUNCTION)) {
911       /* Make this req pending reporting an error. */
912       SET_REQ_ERROR(req, GetLastError());
913       goto error;
914     }
915   } else {
916     memset(&req->overlapped, 0, sizeof(req->overlapped));
917     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
918       req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
919     }
920
921     /* Do 0-read */
922     result = ReadFile(handle->handle,
923                       &uv_zero_,
924                       0,
925                       NULL,
926                       &req->overlapped);
927
928     if (!result && GetLastError() != ERROR_IO_PENDING) {
929       /* Make this req pending reporting an error. */
930       SET_REQ_ERROR(req, GetLastError());
931       goto error;
932     }
933
934     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
935       if (!req->event_handle) {
936         req->event_handle = CreateEvent(NULL, 0, 0, NULL);
937         if (!req->event_handle) {
938           uv_fatal_error(GetLastError(), "CreateEvent");
939         }
940       }
941       if (req->wait_handle == INVALID_HANDLE_VALUE) {
942         if (!RegisterWaitForSingleObject(&req->wait_handle,
943             req->overlapped.hEvent, post_completion_read_wait, (void*) req,
944             INFINITE, WT_EXECUTEINWAITTHREAD)) {
945           SET_REQ_ERROR(req, GetLastError());
946           goto error;
947         }
948       }
949     }
950   }
951
952   /* Start the eof timer if there is one */
953   eof_timer_start(handle);
954   handle->flags |= UV_HANDLE_READ_PENDING;
955   handle->reqs_pending++;
956   return;
957
958 error:
959   uv_insert_pending_req(loop, (uv_req_t*)req);
960   handle->flags |= UV_HANDLE_READ_PENDING;
961   handle->reqs_pending++;
962 }
963
964
965 static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
966     uv_read_cb read_cb, uv_read2_cb read2_cb) {
967   uv_loop_t* loop = handle->loop;
968
969   handle->flags |= UV_HANDLE_READING;
970   INCREASE_ACTIVE_COUNT(loop, handle);
971   handle->read_cb = read_cb;
972   handle->read2_cb = read2_cb;
973   handle->alloc_cb = alloc_cb;
974
975   /* If reading was stopped and then started again, there could still be a */
976   /* read request pending. */
977   if (!(handle->flags & UV_HANDLE_READ_PENDING))
978     uv_pipe_queue_read(loop, handle);
979
980   return 0;
981 }
982
983
984 int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
985     uv_read_cb read_cb) {
986   return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
987 }
988
989
990 int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
991     uv_read2_cb read_cb) {
992   return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
993 }
994
995
996 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
997     uv_write_t* req) {
998   req->next_req = NULL;
999   if (handle->non_overlapped_writes_tail) {
1000     req->next_req =
1001       handle->non_overlapped_writes_tail->next_req;
1002     handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1003     handle->non_overlapped_writes_tail = req;
1004   } else {
1005     req->next_req = (uv_req_t*)req;
1006     handle->non_overlapped_writes_tail = req;
1007   }
1008 }
1009
1010
1011 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1012   uv_write_t* req;
1013
1014   if (handle->non_overlapped_writes_tail) {
1015     req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
1016
1017     if (req == handle->non_overlapped_writes_tail) {
1018       handle->non_overlapped_writes_tail = NULL;
1019     } else {
1020       handle->non_overlapped_writes_tail->next_req =
1021         req->next_req;
1022     }
1023
1024     return req;
1025   } else {
1026     /* queue empty */
1027     return NULL;
1028   }
1029 }
1030
1031
1032 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
1033   uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1034   if (req) {
1035     if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1036                            req,
1037                            WT_EXECUTELONGFUNCTION)) {
1038       uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1039     }
1040   }
1041 }
1042
1043
1044 static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
1045     uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
1046     uv_stream_t* send_handle, uv_write_cb cb) {
1047   int result;
1048   uv_tcp_t* tcp_send_handle;
1049   uv_write_t* ipc_header_req;
1050   uv_ipc_frame_uv_stream ipc_frame;
1051
1052   if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
1053     uv__set_artificial_error(loop, UV_ENOTSUP);
1054     return -1;
1055   }
1056
1057   /* Only TCP handles are supported for sharing. */
1058   if (send_handle && ((send_handle->type != UV_TCP) ||
1059       (!(send_handle->flags & UV_HANDLE_BOUND) &&
1060        !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
1061     uv__set_artificial_error(loop, UV_ENOTSUP);
1062     return -1;
1063   }
1064
1065   assert(handle->handle != INVALID_HANDLE_VALUE);
1066
1067   uv_req_init(loop, (uv_req_t*) req);
1068   req->type = UV_WRITE;
1069   req->handle = (uv_stream_t*) handle;
1070   req->cb = cb;
1071   req->ipc_header = 0;
1072   req->event_handle = NULL;
1073   req->wait_handle = INVALID_HANDLE_VALUE;
1074   memset(&req->overlapped, 0, sizeof(req->overlapped));
1075
1076   if (handle->ipc) {
1077     assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1078     ipc_frame.header.flags = 0;
1079
1080     /* Use the IPC framing protocol. */
1081     if (send_handle) {
1082       tcp_send_handle = (uv_tcp_t*)send_handle;
1083
1084       if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
1085           &ipc_frame.socket_info)) {
1086         return -1;
1087       }
1088       ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
1089
1090       if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
1091         ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
1092       }
1093     }
1094
1095     if (bufcnt == 1) {
1096       ipc_frame.header.flags |= UV_IPC_RAW_DATA;
1097       ipc_frame.header.raw_data_length = bufs[0].len;
1098     }
1099
1100     /*
1101      * Use the provided req if we're only doing a single write.
1102      * If we're doing multiple writes, use ipc_header_write_req to do
1103      * the first write, and then use the provided req for the second write.
1104      */
1105     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1106       ipc_header_req = req;
1107     } else {
1108       /*
1109        * Try to use the preallocated write req if it's available.
1110        * Otherwise allocate a new one.
1111        */
1112       if (handle->ipc_header_write_req.type != UV_WRITE) {
1113         ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
1114       } else {
1115         ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
1116         if (!ipc_header_req) {
1117           uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1118         }
1119       }
1120
1121       uv_req_init(loop, (uv_req_t*) ipc_header_req);
1122       ipc_header_req->type = UV_WRITE;
1123       ipc_header_req->handle = (uv_stream_t*) handle;
1124       ipc_header_req->cb = NULL;
1125       ipc_header_req->ipc_header = 1;
1126     }
1127
1128     /* Write the header or the whole frame. */
1129     memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
1130
1131     result = WriteFile(handle->handle,
1132                         &ipc_frame,
1133                         ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
1134                           sizeof(ipc_frame) : sizeof(ipc_frame.header),
1135                         NULL,
1136                         &ipc_header_req->overlapped);
1137     if (!result && GetLastError() != ERROR_IO_PENDING) {
1138       uv__set_sys_error(loop, GetLastError());
1139       return -1;
1140     }
1141
1142     if (result) {
1143       /* Request completed immediately. */
1144       ipc_header_req->queued_bytes = 0;
1145     } else {
1146       /* Request queued by the kernel. */
1147       ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
1148         sizeof(ipc_frame) : sizeof(ipc_frame.header);
1149       handle->write_queue_size += ipc_header_req->queued_bytes;
1150     }
1151
1152     REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
1153     handle->reqs_pending++;
1154     handle->write_reqs_pending++;
1155
1156     /* If we don't have any raw data to write - we're done. */
1157     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1158       return 0;
1159     }
1160   }
1161
1162   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1163     req->write_buffer = bufs[0];
1164     uv_insert_non_overlapped_write_req(handle, req);
1165     if (handle->write_reqs_pending == 0) {
1166       uv_queue_non_overlapped_write(handle);
1167     }
1168
1169     /* Request queued by the kernel. */
1170     req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1171     handle->write_queue_size += req->queued_bytes;
1172   } else {
1173     result = WriteFile(handle->handle,
1174                        bufs[0].base,
1175                        bufs[0].len,
1176                        NULL,
1177                        &req->overlapped);
1178
1179     if (!result && GetLastError() != ERROR_IO_PENDING) {
1180       uv__set_sys_error(loop, GetLastError());
1181       return -1;
1182     }
1183
1184     if (result) {
1185       /* Request completed immediately. */
1186       req->queued_bytes = 0;
1187     } else {
1188       /* Request queued by the kernel. */
1189       req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1190       handle->write_queue_size += req->queued_bytes;
1191     }
1192
1193     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1194       req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1195       if (!req->event_handle) {
1196         uv_fatal_error(GetLastError(), "CreateEvent");
1197       }
1198       if (!RegisterWaitForSingleObject(&req->wait_handle,
1199           req->overlapped.hEvent, post_completion_write_wait, (void*) req,
1200           INFINITE, WT_EXECUTEINWAITTHREAD)) {
1201         uv__set_sys_error(loop, GetLastError());
1202         return -1;
1203       }
1204     }
1205   }
1206
1207   REGISTER_HANDLE_REQ(loop, handle, req);
1208   handle->reqs_pending++;
1209   handle->write_reqs_pending++;
1210
1211   return 0;
1212 }
1213
1214
1215 int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1216     uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
1217   return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb);
1218 }
1219
1220
1221 int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1222     uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
1223   if (!handle->ipc) {
1224     uv__set_artificial_error(loop, UV_EINVAL);
1225     return -1;
1226   }
1227
1228   return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
1229 }
1230
1231
1232 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1233     uv_buf_t buf) {
1234   /* If there is an eof timer running, we don't need it any more, */
1235   /* so discard it. */
1236   eof_timer_destroy(handle);
1237
1238   handle->flags &= ~UV_HANDLE_READABLE;
1239   uv_read_stop((uv_stream_t*) handle);
1240
1241   uv__set_artificial_error(loop, UV_EOF);
1242   if (handle->read2_cb) {
1243     handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE);
1244   } else {
1245     handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
1246   }
1247 }
1248
1249
1250 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1251     uv_buf_t buf) {
1252   /* If there is an eof timer running, we don't need it any more, */
1253   /* so discard it. */
1254   eof_timer_destroy(handle);
1255
1256   uv_read_stop((uv_stream_t*) handle);
1257
1258   uv__set_sys_error(loop, error);
1259   if (handle->read2_cb) {
1260     handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE);
1261   } else {
1262     handle->read_cb((uv_stream_t*)handle, -1, buf);
1263   }
1264 }
1265
1266
1267 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1268     int error, uv_buf_t buf) {
1269   if (error == ERROR_BROKEN_PIPE) {
1270     uv_pipe_read_eof(loop, handle, buf);
1271   } else {
1272     uv_pipe_read_error(loop, handle, error, buf);
1273   }
1274 }
1275
1276
1277 void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
1278     uv_req_t* req) {
1279   DWORD bytes, avail;
1280   uv_buf_t buf;
1281   uv_ipc_frame_uv_stream ipc_frame;
1282
1283   assert(handle->type == UV_NAMED_PIPE);
1284
1285   handle->flags &= ~UV_HANDLE_READ_PENDING;
1286   eof_timer_stop(handle);
1287
1288   if (!REQ_SUCCESS(req)) {
1289     /* An error occurred doing the 0-read. */
1290     if (handle->flags & UV_HANDLE_READING) {
1291       uv_pipe_read_error_or_eof(loop,
1292                                 handle,
1293                                 GET_REQ_ERROR(req),
1294                                 uv_null_buf_);
1295     }
1296   } else {
1297     /* Do non-blocking reads until the buffer is empty */
1298     while (handle->flags & UV_HANDLE_READING) {
1299       if (!PeekNamedPipe(handle->handle,
1300                           NULL,
1301                           0,
1302                           NULL,
1303                           &avail,
1304                           NULL)) {
1305         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1306         break;
1307       }
1308
1309       if (avail == 0) {
1310         /* There is nothing to read after all. */
1311         break;
1312       }
1313
1314       if (handle->ipc) {
1315         /* Use the IPC framing protocol to read the incoming data. */
1316         if (handle->remaining_ipc_rawdata_bytes == 0) {
1317           /* We're reading a new frame.  First, read the header. */
1318           assert(avail >= sizeof(ipc_frame.header));
1319
1320           if (!ReadFile(handle->handle,
1321                         &ipc_frame.header,
1322                         sizeof(ipc_frame.header),
1323                         &bytes,
1324                         NULL)) {
1325             uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1326               uv_null_buf_);
1327             break;
1328           }
1329
1330           assert(bytes == sizeof(ipc_frame.header));
1331           assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
1332             UV_IPC_TCP_CONNECTION));
1333
1334           if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
1335             assert(avail - sizeof(ipc_frame.header) >=
1336               sizeof(ipc_frame.socket_info));
1337
1338             /* Read the TCP socket info. */
1339             if (!ReadFile(handle->handle,
1340                           &ipc_frame.socket_info,
1341                           sizeof(ipc_frame) - sizeof(ipc_frame.header),
1342                           &bytes,
1343                           NULL)) {
1344               uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1345                 uv_null_buf_);
1346               break;
1347             }
1348
1349             assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
1350
1351             /* Store the pending socket info. */
1352             assert(!handle->pending_ipc_info.socket_info);
1353             handle->pending_ipc_info.socket_info =
1354               (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
1355             if (!handle->pending_ipc_info.socket_info) {
1356               uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1357             }
1358
1359             *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
1360             handle->pending_ipc_info.tcp_connection =
1361               ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
1362           }
1363
1364           if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
1365             handle->remaining_ipc_rawdata_bytes =
1366               ipc_frame.header.raw_data_length;
1367             continue;
1368           }
1369         } else {
1370           avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
1371         }
1372       }
1373
1374       buf = handle->alloc_cb((uv_handle_t*) handle, avail);
1375       assert(buf.len > 0);
1376
1377       if (ReadFile(handle->handle,
1378                    buf.base,
1379                    buf.len,
1380                    &bytes,
1381                    NULL)) {
1382         /* Successful read */
1383         if (handle->ipc) {
1384           assert(handle->remaining_ipc_rawdata_bytes >= bytes);
1385           handle->remaining_ipc_rawdata_bytes =
1386             handle->remaining_ipc_rawdata_bytes - bytes;
1387           if (handle->read2_cb) {
1388             handle->read2_cb(handle, bytes, buf,
1389               handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
1390           } else if (handle->read_cb) {
1391             handle->read_cb((uv_stream_t*)handle, bytes, buf);
1392           }
1393
1394           if (handle->pending_ipc_info.socket_info) {
1395             free(handle->pending_ipc_info.socket_info);
1396             handle->pending_ipc_info.socket_info = NULL;
1397           }
1398         } else {
1399           handle->read_cb((uv_stream_t*)handle, bytes, buf);
1400         }
1401
1402         /* Read again only if bytes == buf.len */
1403         if (bytes <= buf.len) {
1404           break;
1405         }
1406       } else {
1407         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1408         break;
1409       }
1410     }
1411
1412     /* Post another 0-read if still reading and not closing. */
1413     if ((handle->flags & UV_HANDLE_READING) &&
1414         !(handle->flags & UV_HANDLE_READ_PENDING)) {
1415       uv_pipe_queue_read(loop, handle);
1416     }
1417   }
1418
1419   DECREASE_PENDING_REQ_COUNT(handle);
1420 }
1421
1422
1423 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1424     uv_write_t* req) {
1425   assert(handle->type == UV_NAMED_PIPE);
1426
1427   assert(handle->write_queue_size >= req->queued_bytes);
1428   handle->write_queue_size -= req->queued_bytes;
1429
1430   UNREGISTER_HANDLE_REQ(loop, handle, req);
1431
1432   if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1433     if (req->wait_handle != INVALID_HANDLE_VALUE) {
1434       UnregisterWait(req->wait_handle);
1435       req->wait_handle = INVALID_HANDLE_VALUE;
1436     }
1437     if (req->event_handle) {
1438       CloseHandle(req->event_handle);
1439       req->event_handle = NULL;
1440     }
1441   }
1442
1443   if (req->ipc_header) {
1444     if (req == &handle->ipc_header_write_req) {
1445       req->type = UV_UNKNOWN_REQ;
1446     } else {
1447       free(req);
1448     }
1449   } else {
1450     if (req->cb) {
1451       if (!REQ_SUCCESS(req)) {
1452         uv__set_sys_error(loop, GET_REQ_ERROR(req));
1453         ((uv_write_cb)req->cb)(req, -1);
1454       } else {
1455         ((uv_write_cb)req->cb)(req, 0);
1456       }
1457     }
1458   }
1459
1460   handle->write_reqs_pending--;
1461
1462   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1463       handle->non_overlapped_writes_tail) {
1464     assert(handle->write_reqs_pending > 0);
1465     uv_queue_non_overlapped_write(handle);
1466   }
1467
1468   if (handle->shutdown_req != NULL &&
1469       handle->write_reqs_pending == 0) {
1470     uv_want_endgame(loop, (uv_handle_t*)handle);
1471   }
1472
1473   DECREASE_PENDING_REQ_COUNT(handle);
1474 }
1475
1476
1477 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1478     uv_req_t* raw_req) {
1479   uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1480
1481   assert(handle->type == UV_NAMED_PIPE);
1482
1483   if (REQ_SUCCESS(req)) {
1484     assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1485     req->next_pending = handle->pending_accepts;
1486     handle->pending_accepts = req;
1487
1488     if (handle->connection_cb) {
1489       handle->connection_cb((uv_stream_t*)handle, 0);
1490     }
1491   } else {
1492     if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1493       CloseHandle(req->pipeHandle);
1494       req->pipeHandle = INVALID_HANDLE_VALUE;
1495     }
1496     if (!(handle->flags & UV__HANDLE_CLOSING)) {
1497       uv_pipe_queue_accept(loop, handle, req, FALSE);
1498     }
1499   }
1500
1501   DECREASE_PENDING_REQ_COUNT(handle);
1502 }
1503
1504
1505 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1506     uv_connect_t* req) {
1507   assert(handle->type == UV_NAMED_PIPE);
1508
1509   UNREGISTER_HANDLE_REQ(loop, handle, req);
1510
1511   if (req->cb) {
1512     if (REQ_SUCCESS(req)) {
1513       uv_pipe_connection_init(handle);
1514       ((uv_connect_cb)req->cb)(req, 0);
1515     } else {
1516       uv__set_sys_error(loop, GET_REQ_ERROR(req));
1517       ((uv_connect_cb)req->cb)(req, -1);
1518     }
1519   }
1520
1521   DECREASE_PENDING_REQ_COUNT(handle);
1522 }
1523
1524
1525 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1526     uv_shutdown_t* req) {
1527   assert(handle->type == UV_NAMED_PIPE);
1528
1529   UNREGISTER_HANDLE_REQ(loop, handle, req);
1530
1531   /* Initialize and optionally start the eof timer. */
1532   /* This makes no sense if we've already seen EOF. */
1533   if (handle->flags & UV_HANDLE_READABLE) {
1534     eof_timer_init(handle);
1535
1536     /* If reading start the timer right now. */
1537     /* Otherwise uv_pipe_queue_read will start it. */
1538     if (handle->flags & UV_HANDLE_READ_PENDING) {
1539       eof_timer_start(handle);
1540     }
1541   }
1542
1543   if (req->cb) {
1544     req->cb(req, 0);
1545   }
1546
1547   DECREASE_PENDING_REQ_COUNT(handle);
1548 }
1549
1550
1551 static void eof_timer_init(uv_pipe_t* pipe) {
1552   int r;
1553
1554   assert(pipe->eof_timer == NULL);
1555   assert(pipe->flags & UV_HANDLE_CONNECTION);
1556
1557   pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
1558
1559   r = uv_timer_init(pipe->loop, pipe->eof_timer);
1560   assert(r == 0); /* timers can't fail */
1561   pipe->eof_timer->data = pipe;
1562   uv_unref((uv_handle_t*) pipe->eof_timer);
1563 }
1564
1565
1566 static void eof_timer_start(uv_pipe_t* pipe) {
1567   assert(pipe->flags & UV_HANDLE_CONNECTION);
1568
1569   if (pipe->eof_timer != NULL) {
1570     uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
1571   }
1572 }
1573
1574
1575 static void eof_timer_stop(uv_pipe_t* pipe) {
1576   assert(pipe->flags & UV_HANDLE_CONNECTION);
1577
1578   if (pipe->eof_timer != NULL) {
1579     uv_timer_stop(pipe->eof_timer);
1580   }
1581 }
1582
1583
1584 static void eof_timer_cb(uv_timer_t* timer, int status) {
1585   uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
1586   uv_loop_t* loop = timer->loop;
1587
1588   assert(status == 0); /* timers can't fail */
1589   assert(pipe->type == UV_NAMED_PIPE);
1590
1591   /* This should always be true, since we start the timer only */
1592   /* in uv_pipe_queue_read after successfully calling ReadFile, */
1593   /* or in uv_process_pipe_shutdown_req if a read is pending, */
1594   /* and we always immediately stop the timer in */
1595   /* uv_process_pipe_read_req. */
1596   assert(pipe->flags & UV_HANDLE_READ_PENDING);
1597
1598   /* If there are many packets coming off the iocp then the timer callback */
1599   /* may be called before the read request is coming off the queue. */
1600   /* Therefore we check here if the read request has completed but will */
1601   /* be processed later. */
1602   if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
1603       HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
1604     return;
1605   }
1606
1607   /* Force both ends off the pipe. */
1608   CloseHandle(pipe->handle);
1609   pipe->handle = INVALID_HANDLE_VALUE;
1610
1611   /* Stop reading, so the pending read that is going to fail will */
1612   /* not be reported to the user. */
1613   uv_read_stop((uv_stream_t*) pipe);
1614
1615   /* Report the eof and update flags. This will get reported even if the */
1616   /* user stopped reading in the meantime. TODO: is that okay? */
1617   uv_pipe_read_eof(loop, pipe, uv_null_buf_);
1618 }
1619
1620
1621 static void eof_timer_destroy(uv_pipe_t* pipe) {
1622   assert(pipe->flags && UV_HANDLE_CONNECTION);
1623
1624   if (pipe->eof_timer) {
1625     uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
1626     pipe->eof_timer = NULL;
1627   }
1628 }
1629
1630
1631 static void eof_timer_close_cb(uv_handle_t* handle) {
1632   assert(handle->type == UV_TIMER);
1633   free(handle);
1634 }
1635
1636
1637 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
1638   HANDLE os_handle = (HANDLE)_get_osfhandle(file);
1639
1640   if (os_handle == INVALID_HANDLE_VALUE ||
1641       uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
1642     uv__set_sys_error(pipe->loop, WSAEINVAL);
1643     return -1;
1644   }
1645
1646   uv_pipe_connection_init(pipe);
1647   pipe->handle = os_handle;
1648   pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1649
1650   if (pipe->ipc) {
1651     assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1652     pipe->ipc_pid = uv_parent_pid();
1653     assert(pipe->ipc_pid != -1);
1654   }
1655   return 0;
1656 }