1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
30 #include "handle-inl.h"
31 #include "stream-inl.h"
35 /* A zero-size buffer for use by uv_pipe_read */
36 static char uv_zero_[] = "";
39 static const uv_buf_t uv_null_buf_ = { 0, NULL };
41 /* The timeout that the pipe will wait for the remote end to write data */
42 /* when the local ends wants to shut it down. */
43 static const int64_t eof_timeout = 50; /* ms */
45 static const int default_pending_pipe_instances = 4;
47 /* IPC protocol flags. */
48 #define UV_IPC_RAW_DATA 0x0001
49 #define UV_IPC_TCP_SERVER 0x0002
50 #define UV_IPC_TCP_CONNECTION 0x0004
52 /* IPC frame header. */
55 uint64_t raw_data_length;
56 } uv_ipc_frame_header_t;
58 /* IPC frame, which contains an imported TCP socket stream. */
60 uv_ipc_frame_header_t header;
61 WSAPROTOCOL_INFOW socket_info;
62 } uv_ipc_frame_uv_stream;
64 static void eof_timer_init(uv_pipe_t* pipe);
65 static void eof_timer_start(uv_pipe_t* pipe);
66 static void eof_timer_stop(uv_pipe_t* pipe);
67 static void eof_timer_cb(uv_timer_t* timer, int status);
68 static void eof_timer_destroy(uv_pipe_t* pipe);
69 static void eof_timer_close_cb(uv_handle_t* handle);
/* Formats a process-unique pipe name ("\\.\pipe\uv\<ptr>-<pid>") into
 * `name`, writing at most `size` bytes. Uniqueness comes from combining
 * the handle's address with the current process id.
 * NOTE(review): the closing brace was elided in this extraction. */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", ptr, GetCurrentProcessId());
/* Initializes a uv_pipe_t handle on the given loop. `ipc` selects the
 * framing protocol used to pass TCP handles between processes (see the
 * UV_IPC_* flags above).
 * NOTE(review): some field initializations (e.g. the `ipc` flag itself)
 * and the return statement were elided in this extraction. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->reqs_pending = 0;
  handle->handle = INVALID_HANDLE_VALUE;
  handle->remaining_ipc_rawdata_bytes = 0;
  /* No shared socket pending yet. */
  handle->pending_ipc_info.socket_info = NULL;
  handle->pending_ipc_info.tcp_connection = 0;
  /* Empty queue of non-overlapped (thread-pool) writes. */
  handle->non_overlapped_writes_tail = NULL;
  uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
/* Common initialization for a pipe that represents an established
 * connection (client side, or the server side of an accepted pipe). */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_connection_init((uv_stream_t*) handle);
  handle->read_req.data = handle;  /* lets the read req find its pipe */
  handle->eof_timer = NULL;        /* created lazily by eof_timer_init */
/* Opens the named pipe `name` as a client, trying duplex access first and
 * degrading to read-only / write-only on ERROR_ACCESS_DENIED. On success
 * sets *duplex_flags to the UV_HANDLE_READABLE/WRITABLE combination that
 * was actually obtained. Returns INVALID_HANDLE_VALUE on failure.
 * NOTE(review): several CreateFileW argument lines, success returns and
 * closing braces were elided in this extraction. */
static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
  /*
   * Assume that we have a duplex pipe first, so attempt to
   * connect with GENERIC_READ | GENERIC_WRITE.
   */
  pipeHandle = CreateFileW(name,
                           GENERIC_READ | GENERIC_WRITE,
                           FILE_FLAG_OVERLAPPED,
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  /*
   * If the pipe is not duplex CreateFileW fails with
   * ERROR_ACCESS_DENIED. In that case try to connect
   * as a read-only or write-only.
   */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    /* FILE_WRITE_ATTRIBUTES is still needed for SetNamedPipeHandleState. */
    pipeHandle = CreateFileW(name,
                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
                             FILE_FLAG_OVERLAPPED,
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_READABLE;
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    pipeHandle = CreateFileW(name,
                             GENERIC_WRITE | FILE_READ_ATTRIBUTES,
                             FILE_FLAG_OVERLAPPED,
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_WRITABLE;
  /* All attempts failed. */
  return INVALID_HANDLE_VALUE;
/* Creates a single-instance named pipe server for stdio redirection of a
 * child process. Retries with a new unique name on collision, associates
 * the handle with the loop's IOCP and stores it on `handle`. The chosen
 * name is returned to the caller through `name`.
 * NOTE(review): the retry loop, error-exit labels and closing braces were
 * elided in this extraction. */
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
    char* name, size_t nameSize) {
  char* ptr = (char*)handle;
  uv_unique_pipe_name(ptr, name, nameSize);
  /* FIRST_PIPE_INSTANCE makes a name collision fail instead of joining an
   * existing server. */
  pipeHandle = CreateNamedPipeA(name,
      access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    /* No name collisions. We're done. */
  err = GetLastError();
  if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
  /* Pipe name collision. Increment the pointer and try again. */
  if (CreateIoCompletionPort(pipeHandle,
  err = GetLastError();
  uv_pipe_connection_init(handle);
  handle->handle = pipeHandle;
  /* Error path: don't leak the partially-created pipe handle. */
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
/* Takes ownership of `pipeHandle`: switches it to byte mode, detects
 * whether it was opened with FILE_FLAG_OVERLAPPED, and either associates
 * it with the loop's IOCP or marks it for thread-pool (non-overlapped)
 * emulation. `duplex_flags` (readable/writable bits) are OR-ed onto the
 * handle's flags.
 * NOTE(review): several NtQueryInformationFile arguments, return
 * statements and closing braces were elided in this extraction. */
static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
    HANDLE pipeHandle, DWORD duplex_flags) {
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
    /* that is not a pipe. */
    if (GetLastError() == ERROR_INVALID_PARAMETER) {
      SetLastError(WSAENOTSOCK);
  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
    /* Overlapped pipe. Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
      /* IOCP association failed; fall back to event-based emulation. */
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
  handle->handle = pipeHandle;
  handle->flags |= duplex_flags;
/* Thread-pool worker for uv_shutdown: blocks on FlushFileBuffers so the
 * remote end receives all pending data, then posts the shutdown req back
 * to the loop's IOCP for completion on the loop thread.
 * NOTE(review): local declarations and the return were elided in this
 * extraction. */
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
  req = (uv_shutdown_t*) parameter;
  handle = (uv_pipe_t*) req->handle;
  /* May block; that's why this runs in the thread pool. */
  FlushFileBuffers(handle->handle);
  POST_COMPLETION_FOR_REQ(loop, req);
/* Runs deferred work for a pipe handle: completes a pending shutdown once
 * all writes have drained (flushing via the thread pool only when the
 * kernel pipe buffer is non-empty), and finishes closing the handle once
 * all requests are done — freeing pending IPC socket info, emulated-IOCP
 * wait/event handles and the accept request array.
 * NOTE(review): `else` keywords, `return` statements, some call arguments
 * and closing braces were elided in this extraction; read the structure
 * accordingly. */
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->shutdown_req != NULL &&
      handle->write_reqs_pending == 0) {
    req = handle->shutdown_req;
    /* Clear the shutdown_req field so we don't go here again. */
    handle->shutdown_req = NULL;
    if (handle->flags & UV__HANDLE_CLOSING) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);
      /* Already closing. Cancel the shutdown. */
      req->cb(req, UV_ECANCELED);
      DECREASE_PENDING_REQ_COUNT(handle);
    /* Try to avoid flushing the pipe buffer in the thread pool. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        FilePipeLocalInformation);
    if (nt_status != STATUS_SUCCESS) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);
      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      err = pRtlNtStatusToDosError(nt_status);
      req->cb(req, uv_translate_sys_error(err));
      DECREASE_PENDING_REQ_COUNT(handle);
    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Short-circuit, no need to call FlushFileBuffers. */
      uv_insert_pending_req(loop, (uv_req_t*) req);
    /* Run FlushFileBuffers in the thread pool. */
    result = QueueUserWorkItem(pipe_shutdown_thread_proc,
                               WT_EXECUTELONGFUNCTION);
    /* (on QueueUserWorkItem failure:) */
      UNREGISTER_HANDLE_REQ(loop, handle, req);
      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      err = GetLastError();
      req->cb(req, uv_translate_sys_error(err));
      DECREASE_PENDING_REQ_COUNT(handle);
  if (handle->flags & UV__HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));
    if (handle->flags & UV_HANDLE_CONNECTION) {
      /* Release a shared socket that was received but never accepted. */
      if (handle->pending_ipc_info.socket_info) {
        free(handle->pending_ipc_info.socket_info);
        handle->pending_ipc_info.socket_info = NULL;
      /* Tear down the emulated-IOCP wait registration and event. */
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
          UnregisterWait(handle->read_req.wait_handle);
          handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
        if (handle->read_req.event_handle) {
          CloseHandle(handle->read_req.event_handle);
          handle->read_req.event_handle = NULL;
    if (handle->flags & UV_HANDLE_PIPESERVER) {
      assert(handle->accept_reqs);
      free(handle->accept_reqs);
      handle->accept_reqs = NULL;
    uv__handle_close(handle);
/* Sets how many server pipe instances uv_pipe_bind will pre-create,
 * overriding default_pending_pipe_instances. Must be called before bind. */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  handle->pending_instances = count;
  handle->flags |= UV_HANDLE_PIPESERVER;
/* Creates a pipe server. */
/* Allocates the accept-request array, converts `name` to UTF-16, creates
 * the first pipe instance with FILE_FLAG_FIRST_PIPE_INSTANCE (so an
 * existing server with the same name is detected) and marks the handle
 * bound. Remaining instances are created lazily by uv_pipe_listen.
 * NOTE(review): several `return` statements, null checks and the `error:`
 * cleanup label were elided in this extraction. */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  uv_loop_t* loop = handle->loop;
  int i, err, nameSize;
  uv_pipe_accept_t* req;
  if (handle->flags & UV_HANDLE_BOUND) {
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    handle->pending_instances = default_pending_pipe_instances;
  handle->accept_reqs = (uv_pipe_accept_t*)
      malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
  if (!handle->accept_reqs) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  for (i = 0; i < handle->pending_instances; i++) {
    req = &handle->accept_reqs[i];
    uv_req_init(loop, (uv_req_t*) req);
    req->type = UV_ACCEPT;
    req->pipeHandle = INVALID_HANDLE_VALUE;
    req->next_pending = NULL;
  /* Convert name to UTF16. */
  nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*)malloc(nameSize);
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
    return uv_translate_sys_error(GetLastError());
  /*
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
   * If this fails then there's already a pipe server for the given pipe name.
   */
  handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
      PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
      FILE_FLAG_FIRST_PIPE_INSTANCE,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
      PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
  if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
      err = WSAEACCES; /* Translates to UV_EACCES. */
  if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
    err = GetLastError();
  handle->pending_accepts = NULL;
  handle->flags |= UV_HANDLE_PIPESERVER;
  handle->flags |= UV_HANDLE_BOUND;
  /* Error path: release the first instance if it was created. */
  if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(handle->accept_reqs[0].pipeHandle);
    handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
  return uv_translate_sys_error(err);
/* Thread-pool worker used when the initial client connect hit
 * ERROR_PIPE_BUSY: repeatedly waits (30s per round) for a free server
 * instance, retries the open, records success/error on the connect req
 * and posts it back to the loop's IOCP.
 * NOTE(review): local declarations, a `break`, an `else` and the return
 * were elided in this extraction. */
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  req = (uv_connect_t*) parameter;
  handle = (uv_pipe_t*) req->handle;
  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
  /* We wait for the pipe to become available with WaitNamedPipe. */
  while (WaitNamedPipeW(handle->name, 30000)) {
    /* The pipe is now available, try to connect. */
    pipeHandle = open_named_pipe(handle->name, &duplex_flags);
    if (pipeHandle != INVALID_HANDLE_VALUE) {
  if (pipeHandle != INVALID_HANDLE_VALUE &&
      !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
    SET_REQ_SUCCESS(req);
    /* (else:) propagate the last error to the connect req. */
    SET_REQ_ERROR(req, GetLastError());
  POST_COMPLETION_FOR_REQ(loop, req);
/* Starts connecting `handle` to the named pipe `name`. Fast path: open the
 * pipe synchronously and queue a success completion. If all server
 * instances are busy (ERROR_PIPE_BUSY) the wait+retry is pushed to the
 * thread pool. On failure the req is queued carrying the error, so `cb`
 * is always invoked from the loop.
 * NOTE(review): `return` statements, the `error:` label, null checks and
 * some call arguments were elided in this extraction. */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_CONNECT;
  req->handle = (uv_stream_t*) handle;
  /* Convert name to UTF16. */
  nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*)malloc(nameSize);
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
    err = GetLastError();
  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             WT_EXECUTELONGFUNCTION)) {
        err = GetLastError();
      /* Thread queued; completion will be posted by the worker. */
      REGISTER_HANDLE_REQ(loop, handle, req);
      handle->reqs_pending++;
    err = GetLastError();
  assert(pipeHandle != INVALID_HANDLE_VALUE);
  if (uv_set_pipe_handle(loop,
                         (uv_pipe_t*) req->handle,
    err = GetLastError();
  /* Synchronous success: queue the completion. */
  SET_REQ_SUCCESS(req);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  /* Error path: close the half-opened handle, then report via the req. */
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
/* Cleans up uv_pipe_t (server or connection) and all resources associated */
/* with it: closes every pre-created server instance, clears writability,
 * destroys the EOF timer and closes the connection handle itself.
 * NOTE(review): local declarations, name-freeing code and closing braces
 * were elided in this extraction. */
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
  if (handle->flags & UV_HANDLE_PIPESERVER) {
    for (i = 0; i < handle->pending_instances; i++) {
      pipeHandle = handle->accept_reqs[i].pipeHandle;
      if (pipeHandle != INVALID_HANDLE_VALUE) {
        CloseHandle(pipeHandle);
        handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
  if (handle->flags & UV_HANDLE_CONNECTION) {
    handle->flags &= ~UV_HANDLE_WRITABLE;
    eof_timer_destroy(handle);
  if ((handle->flags & UV_HANDLE_CONNECTION)
      && handle->handle != INVALID_HANDLE_VALUE) {
    CloseHandle(handle->handle);
    handle->handle = INVALID_HANDLE_VALUE;
/* Begins closing a pipe handle: stops reading/listening (adjusting the
 * loop's active count), releases OS resources via uv_pipe_cleanup, and
 * schedules the endgame immediately when no requests are outstanding. */
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  if (handle->flags & UV_HANDLE_READING) {
    handle->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  uv_pipe_cleanup(loop, handle);
  if (handle->reqs_pending == 0) {
    /* Nothing in flight; finish the close on the next loop iteration. */
    uv_want_endgame(loop, (uv_handle_t*) handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(handle);
/* Arms one accept request: creates a fresh server pipe instance (except
 * for the first instance, created in uv_pipe_bind) and issues an
 * overlapped ConnectNamedPipe. Any failure is reported by queuing the req
 * as an already-completed error.
 * NOTE(review): `return` statements, an `else if`/`else` and closing
 * braces were elided in this extraction. */
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  assert(handle->flags & UV_HANDLE_LISTENING);
  if (!firstInstance) {
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    req->pipeHandle = CreateNamedPipeW(handle->name,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
    if (req->pipeHandle == INVALID_HANDLE_VALUE) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
    if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
  assert(req->pipeHandle != INVALID_HANDLE_VALUE);
  /* Prepare the overlapped structure. */
  memset(&(req->overlapped), 0, sizeof(req->overlapped));
  if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
      GetLastError() != ERROR_IO_PENDING) {
    if (GetLastError() == ERROR_PIPE_CONNECTED) {
      /* Client raced us and is already connected: report success now. */
      SET_REQ_SUCCESS(req);
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
    uv_insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
  handle->reqs_pending++;
/* Accepts onto `client` either (a) a TCP socket received over an IPC pipe
 * (via uv_tcp_import) or (b) a pending pipe connection. Returns
 * WSAEWOULDBLOCK when nothing is pending. After a pipe accept the freed
 * accept req is re-armed unless the server is closing.
 * NOTE(review): the `if (server->ipc)` branch structure, some null checks
 * and the return were elided in this extraction. */
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  if (!server->pending_ipc_info.socket_info) {
    /* No valid pending sockets. */
    return WSAEWOULDBLOCK;
  return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
      server->pending_ipc_info.tcp_connection);
  pipe_client = (uv_pipe_t*)client;
  /* Find a connection instance that has been connected, but not yet */
  /* accepted. */
  req = server->pending_accepts;
  /* (when req == NULL:) */
  /* No valid connections found, so we error out. */
  return WSAEWOULDBLOCK;
  /* Initialize the client handle and copy the pipeHandle to the client */
  uv_pipe_connection_init(pipe_client);
  pipe_client->handle = req->pipeHandle;
  pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  /* Prepare the req to pick up a new connection */
  server->pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->pipeHandle = INVALID_HANDLE_VALUE;
  if (!(server->flags & UV__HANDLE_CLOSING)) {
    uv_pipe_queue_accept(loop, server, req, FALSE);
/* Starts listening for connections for the given pipe. */
/* Validates state (must be bound, a pipe server, and not reading), stores
 * the connection callback and arms one accept request per pre-created
 * instance. Calling it again just replaces the callback.
 * NOTE(review): early returns for the error cases and the loop index
 * declaration were elided in this extraction. */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
  uv_loop_t* loop = handle->loop;
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->connection_cb = cb;
  if (!(handle->flags & UV_HANDLE_BOUND)) {
  if (handle->flags & UV_HANDLE_READING) {
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    return ERROR_NOT_SUPPORTED;
  handle->flags |= UV_HANDLE_LISTENING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->connection_cb = cb;
  /* First pipe handle should have already been created in uv_pipe_bind */
  assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
  for (i = 0; i < handle->pending_instances; i++) {
    uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
/* Thread-pool worker for non-overlapped pipes: performs a blocking
 * zero-byte ReadFile (see uv_zero_) so completion merely signals that
 * data is available, then posts the read req back to the loop's IOCP.
 * NOTE(review): the remaining ReadFile arguments, the result check and
 * return were elided in this extraction. */
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
  uv_read_t* req = (uv_read_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
  uv_loop_t* loop = handle->loop;
  assert(req->type == UV_READ);
  assert(handle->type == UV_NAMED_PIPE);
  result = ReadFile(handle->handle,
  /* (on failure:) */
    SET_REQ_ERROR(req, GetLastError());
  POST_COMPLETION_FOR_REQ(loop, req);
/* Thread-pool worker for non-overlapped pipes: performs the blocking
 * WriteFile for the single queued buffer and posts the write req back to
 * the loop's IOCP.
 * NOTE(review): the remaining WriteFile arguments, the result check and
 * return were elided in this extraction. */
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
  uv_write_t* req = (uv_write_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
  uv_loop_t* loop = handle->loop;
  assert(req->type == UV_WRITE);
  assert(handle->type == UV_NAMED_PIPE);
  assert(req->write_buffer.base);
  result = WriteFile(handle->handle,
                     req->write_buffer.base,
                     req->write_buffer.len,
  /* (on failure:) */
    SET_REQ_ERROR(req, GetLastError());
  POST_COMPLETION_FOR_REQ(loop, req);
/* Wait callback for emulated-IOCP reads: when the read's event fires,
 * forward the completed overlapped req to the loop's IOCP manually.
 * NOTE(review): `handle` is declared as uv_tcp_t* here even though this
 * is the pipe module — only handle->loop is used, so any stream layout
 * with a leading loop field would do; verify against the struct layout. */
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
  req = (uv_read_t*) context;
  handle = (uv_tcp_t*)req->data;
  assert(handle != NULL);
  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->overlapped.InternalHigh,
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
/* Wait callback for emulated-IOCP writes: mirrors
 * post_completion_read_wait but resolves the handle through req->handle.
 * NOTE(review): assert on !timed_out and remaining PostQueuedCompletionStatus
 * arguments were elided in this extraction. */
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
  req = (uv_write_t*) context;
  handle = (uv_tcp_t*)req->handle;
  assert(handle != NULL);
  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->overlapped.InternalHigh,
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
/* Queues the zero-byte read that drives the pipe read loop. Non-overlapped
 * pipes delegate to the thread pool; overlapped pipes issue an async
 * ReadFile, optionally registering an event wait when IOCP must be
 * emulated. Starts the EOF timer and marks the read pending.
 * NOTE(review): the `error:` label, `goto`s, ReadFile argument lines and
 * closing braces were elided in this extraction; note the visible code at
 * the `EMULATE_IOCP` hEvent-tagging line reads req->event_handle before
 * the later creation site — confirm ordering against upstream. */
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
  assert(handle->handle != INVALID_HANDLE_VALUE);
  req = &handle->read_req;
  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
    memset(&req->overlapped, 0, sizeof(req->overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* Low bit set tells the kernel not to post to the (absent) IOCP. */
      req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    result = ReadFile(handle->handle,
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!req->event_handle) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (!req->event_handle) {
          uv_fatal_error(GetLastError(), "CreateEvent");
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
        if (!RegisterWaitForSingleObject(&req->wait_handle,
            req->overlapped.hEvent, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
          SET_REQ_ERROR(req, GetLastError());
  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  /* (error path:) queue the req so the error is reported from the loop. */
  uv_insert_pending_req(loop, (uv_req_t*)req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
/* Shared implementation behind uv_pipe_read_start/uv_pipe_read2_start:
 * records the callbacks, marks the handle reading and queues the first
 * read unless one is already pending. */
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb, uv_read2_cb read2_cb) {
  uv_loop_t* loop = handle->loop;
  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->read_cb = read_cb;
  handle->read2_cb = read2_cb;
  handle->alloc_cb = alloc_cb;
  /* If reading was stopped and then started again, there could still be a */
  /* read request pending. */
  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    uv_pipe_queue_read(loop, handle);
/* Public entry: start reading with a plain read callback (no handle
 * passing). */
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
/* Public entry: start reading with a read2 callback that can also receive
 * handles sent over the IPC pipe. */
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read2_cb read_cb) {
  return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
/* Appends a write req to the handle's circular singly-linked queue of
 * non-overlapped writes (tail points at the last element; tail->next_req
 * is the head).
 * NOTE(review): the parameter line, part of the `req->next_req =` chain
 * assignment and the `else` were elided in this extraction. */
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
  req->next_req = NULL;
  if (handle->non_overlapped_writes_tail) {
    /* (continuation of `req->next_req = ...`:) */
        handle->non_overlapped_writes_tail->next_req;
    handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
    handle->non_overlapped_writes_tail = req;
    /* (else: queue was empty — req becomes the single circular element) */
    req->next_req = (uv_req_t*)req;
    handle->non_overlapped_writes_tail = req;
/* Pops the head (tail->next_req) from the circular non-overlapped write
 * queue; emptying the queue resets the tail to NULL. Returns the removed
 * req (NULL when empty, per the elided else branch).
 * NOTE(review): the unlink assignment's right-hand side, the NULL return
 * and closing braces were elided in this extraction. */
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
  if (handle->non_overlapped_writes_tail) {
    req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
    if (req == handle->non_overlapped_writes_tail) {
      /* Last element: the queue becomes empty. */
      handle->non_overlapped_writes_tail = NULL;
      /* (else: bypass the removed head) */
      handle->non_overlapped_writes_tail->next_req =
/* Dequeues the next non-overlapped write and hands it to the thread pool
 * (uv_pipe_writefile_thread_proc). Failure to queue is fatal.
 * NOTE(review): the null check on `req` and the work-item argument line
 * were elided in this extraction. */
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
/* Core pipe write. Handles four modes: (1) an optional IPC frame carrying
 * a duplicated TCP socket and/or a raw-data length, written synchronously
 * via an event-signalled overlapped WriteFile; then the payload via
 * (2) blocking write on a non-overlapped pipe, (3) queued thread-pool
 * write on a non-overlapped pipe, (4) blocking-overlapped write, or the
 * default async overlapped write (with event emulation when the handle is
 * not on the IOCP). Only a single buffer is supported (nbufs == 1, or 0
 * with a send_handle).
 * NOTE(review): many lines are elided in this extraction — parameter
 * lines, `if (handle->ipc)` / `else` keywords, WriteFile argument lines,
 * returns and closing braces. Read the visible statements as a skeleton. */
static int uv_pipe_write_impl(uv_loop_t* loop,
    const uv_buf_t bufs[],
    uv_stream_t* send_handle,
  uv_tcp_t* tcp_send_handle;
  uv_write_t* ipc_header_req;
  uv_ipc_frame_uv_stream ipc_frame;
  if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
    return ERROR_NOT_SUPPORTED;
  /* Only TCP handles are supported for sharing. */
  if (send_handle && ((send_handle->type != UV_TCP) ||
      (!(send_handle->flags & UV_HANDLE_BOUND) &&
       !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
    return ERROR_NOT_SUPPORTED;
  assert(handle->handle != INVALID_HANDLE_VALUE);
  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_WRITE;
  req->handle = (uv_stream_t*) handle;
  req->ipc_header = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;
  memset(&req->overlapped, 0, sizeof(req->overlapped));
  /* (IPC branch:) IPC requires overlapped I/O. */
  assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
  ipc_frame.header.flags = 0;
  /* Use the IPC framing protocol. */
  /* Duplicate the socket into the receiving process. */
  tcp_send_handle = (uv_tcp_t*)send_handle;
  err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
      &ipc_frame.socket_info);
  ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
  if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
    ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
  /* (when there is payload data:) */
  ipc_frame.header.flags |= UV_IPC_RAW_DATA;
  ipc_frame.header.raw_data_length = bufs[0].len;
  /*
   * Use the provided req if we're only doing a single write.
   * If we're doing multiple writes, use ipc_header_write_req to do
   * the first write, and then use the provided req for the second write.
   */
  if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
    ipc_header_req = req;
    /*
     * Try to use the preallocated write req if it's available.
     * Otherwise allocate a new one.
     */
    if (handle->ipc_header_write_req.type != UV_WRITE) {
      ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
      /* (else:) */
      ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
      if (!ipc_header_req) {
        uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
    uv_req_init(loop, (uv_req_t*) ipc_header_req);
    ipc_header_req->type = UV_WRITE;
    ipc_header_req->handle = (uv_stream_t*) handle;
    ipc_header_req->cb = NULL;
    ipc_header_req->ipc_header = 1;
  /* Write the header or the whole frame. */
  memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
  /* Using overlapped IO, but wait for completion before returning.
     This write is blocking because ipc_frame is on stack. */
  ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
  if (!ipc_header_req->overlapped.hEvent) {
    uv_fatal_error(GetLastError(), "CreateEvent");
  result = WriteFile(handle->handle,
                     ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
                       sizeof(ipc_frame) : sizeof(ipc_frame.header),
                     &ipc_header_req->overlapped);
  if (!result && GetLastError() != ERROR_IO_PENDING) {
    err = GetLastError();
    CloseHandle(ipc_header_req->overlapped.hEvent);
  /* Request not completed immediately. Wait for it.*/
  if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
    err = GetLastError();
    CloseHandle(ipc_header_req->overlapped.hEvent);
  /* Header is on the wire; it never counts toward write_queue_size. */
  ipc_header_req->queued_bytes = 0;
  CloseHandle(ipc_header_req->overlapped.hEvent);
  ipc_header_req->overlapped.hEvent = NULL;
  REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
  handle->reqs_pending++;
  handle->write_reqs_pending++;
  /* If we don't have any raw data to write - we're done. */
  if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Blocking write on a non-overlapped handle: plain synchronous call. */
    result = WriteFile(handle->handle,
    /* Request completed immediately. */
    req->queued_bytes = 0;
    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    req->write_buffer = bufs[0];
    uv_insert_non_overlapped_write_req(handle, req);
    if (handle->write_reqs_pending == 0) {
      uv_queue_non_overlapped_write(handle);
    /* Request queued by the kernel. */
    req->queued_bytes = uv_count_bufs(bufs, nbufs);
    handle->write_queue_size += req->queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!req->overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    result = WriteFile(handle->handle,
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(req->overlapped.hEvent);
    /* Request completed immediately. */
    req->queued_bytes = 0;
    /* Request queued by the kernel. */
    /* NOTE(review): this waits on ipc_header_req->overlapped.hEvent, but
     * in the non-IPC blocking-write path ipc_header_req appears to be
     * uninitialized and req->overlapped.hEvent is the event created just
     * above — looks like it should be req's event; verify against
     * upstream libuv history before changing. */
    if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
      err = GetLastError();
      CloseHandle(ipc_header_req->overlapped.hEvent);
      return uv_translate_sys_error(err);
    CloseHandle(req->overlapped.hEvent);
    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    /* (else: default asynchronous overlapped write) */
    result = WriteFile(handle->handle,
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    /* Request completed immediately. */
    req->queued_bytes = 0;
    /* Request queued by the kernel. */
    req->queued_bytes = uv_count_bufs(bufs, nbufs);
    handle->write_queue_size += req->queued_bytes;
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (!req->event_handle) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    if (!RegisterWaitForSingleObject(&req->wait_handle,
        req->overlapped.hEvent, post_completion_write_wait, (void*) req,
        INFINITE, WT_EXECUTEINWAITTHREAD)) {
      return GetLastError();
  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->write_reqs_pending++;
/* Public entry: plain pipe write (no handle passing).
 * NOTE(review): the remaining parameter lines were elided in this
 * extraction. */
int uv_pipe_write(uv_loop_t* loop,
    const uv_buf_t bufs[],
  return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
/* Public entry: pipe write that additionally sends `send_handle` over an
 * IPC pipe.
 * NOTE(review): the remaining parameter lines and the ipc-precondition
 * check were elided in this extraction. */
int uv_pipe_write2(uv_loop_t* loop,
    const uv_buf_t bufs[],
    uv_stream_t* send_handle,
  return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
/* Reports end-of-file to the user: cancels the EOF timer, clears
 * readability, stops reading and invokes whichever read callback is set
 * with UV_EOF and an empty buffer.
 * NOTE(review): the `buf` parameter line and the `else` between the two
 * callback invocations were elided in this extraction. */
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
  /* If there is an eof timer running, we don't need it any more, */
  /* so discard it. */
  eof_timer_destroy(handle);
  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop((uv_stream_t*) handle);
  if (handle->read2_cb) {
    handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
    /* (else:) */
    handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
/* Reports a read error to the user: cancels the EOF timer, stops reading
 * and invokes the read/read2 callback with the libuv-translated error.
 * NOTE(review): the `buf` parameter line, remaining read2_cb arguments
 * and the `else` were elided in this extraction. */
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
  /* If there is an eof timer running, we don't need it any more, */
  /* so discard it. */
  eof_timer_destroy(handle);
  uv_read_stop((uv_stream_t*) handle);
  if (handle->read2_cb) {
    handle->read2_cb(handle,
        uv_translate_sys_error(error),
    /* (else:) */
    handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
/* Dispatches a failed read: ERROR_BROKEN_PIPE means the remote end closed
 * (treated as EOF); every other Win32 error is a genuine read error.
 * NOTE(review): the `else` keyword and closing braces were elided in this
 * extraction. */
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
    int error, uv_buf_t buf) {
  if (error == ERROR_BROKEN_PIPE) {
    uv_pipe_read_eof(loop, handle, buf);
    /* (else:) */
    uv_pipe_read_error(loop, handle, error, buf);
/* Completion handler for the 0-byte "signal" read posted by
 * uv_pipe_queue_read().  Drains the pipe with synchronous, non-blocking
 * reads, speaking the IPC framing protocol when the pipe carries IPC
 * traffic.  NOTE(review): this listing is elided; the embedded numbers
 * are the original file's line numbers and some lines (the uv_req_t*
 * req parameter, DWORD bytes/avail and uv_buf_t buf locals, closing
 * braces) are not shown -- verify against the full file. */
1348 void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
1352 uv_ipc_frame_uv_stream ipc_frame;
1354 assert(handle->type == UV_NAMED_PIPE);
/* The pending 0-read has completed: clear the flag and stop the eof
 * timer that uv_pipe_queue_read may have started. */
1356 handle->flags &= ~UV_HANDLE_READ_PENDING;
1357 eof_timer_stop(handle);
1359 if (!REQ_SUCCESS(req)) {
1360 /* An error occurred doing the 0-read. */
/* Only report it if the user is still reading; uv_read_stop may have
 * been called in the meantime. */
1361 if (handle->flags & UV_HANDLE_READING) {
1362 uv_pipe_read_error_or_eof(loop,
1368 /* Do non-blocking reads until the buffer is empty */
1369 while (handle->flags & UV_HANDLE_READING) {
/* PeekNamedPipe tells us how many bytes are available without
 * consuming them; failure here is treated as error-or-eof. */
1370 if (!PeekNamedPipe(handle->handle,
1376 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1381 /* There is nothing to read after all. */
1386 /* Use the IPC framing protocol to read the incoming data. */
/* remaining_ipc_rawdata_bytes == 0 means the previous frame has been
 * fully consumed, so the next bytes must be a frame header. */
1387 if (handle->remaining_ipc_rawdata_bytes == 0) {
1388 /* We're reading a new frame. First, read the header. */
/* Assumes the writer sent the header atomically, so all of it is
 * available once any of it is -- TODO confirm against the write side. */
1389 assert(avail >= sizeof(ipc_frame.header));
1391 if (!ReadFile(handle->handle,
1393 sizeof(ipc_frame.header),
1396 uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1401 assert(bytes == sizeof(ipc_frame.header));
/* Reject flag bits outside the known UV_IPC_* set. */
1402 assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
1403 UV_IPC_TCP_CONNECTION));
1405 if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
1406 assert(avail - sizeof(ipc_frame.header) >=
1407 sizeof(ipc_frame.socket_info));
1409 /* Read the TCP socket info. */
1410 if (!ReadFile(handle->handle,
1411 &ipc_frame.socket_info,
1412 sizeof(ipc_frame) - sizeof(ipc_frame.header),
1415 uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1420 assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
1422 /* Store the pending socket info. */
/* Only one pending imported socket at a time is supported; the
 * previous one must have been consumed (and freed) already. */
1423 assert(!handle->pending_ipc_info.socket_info);
1424 handle->pending_ipc_info.socket_info =
1425 (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
1426 if (!handle->pending_ipc_info.socket_info) {
1427 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1430 *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
1431 handle->pending_ipc_info.tcp_connection =
1432 ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
/* A frame may carry raw data after the header; remember how much so
 * subsequent loop iterations deliver it to the user. */
1435 if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
1436 handle->remaining_ipc_rawdata_bytes =
1437 ipc_frame.header.raw_data_length;
/* Mid-frame: never read past the end of the current frame's raw data. */
1441 avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
/* Ask the user for a buffer; a zero-length buffer means ENOBUFS. */
1445 handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
1447 if (handle->read2_cb) {
1448 handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
1449 } else if (handle->read_cb) {
1450 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1454 assert(buf.base != NULL);
1456 if (ReadFile(handle->handle,
1461 /* Successful read */
1463 assert(handle->remaining_ipc_rawdata_bytes >= bytes);
1464 handle->remaining_ipc_rawdata_bytes =
1465 handle->remaining_ipc_rawdata_bytes - bytes;
/* read2_cb additionally reports whether an imported TCP handle is
 * pending alongside this chunk of data. */
1466 if (handle->read2_cb) {
1467 handle->read2_cb(handle, bytes, &buf,
1468 handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
1469 } else if (handle->read_cb) {
1470 handle->read_cb((uv_stream_t*)handle, bytes, &buf);
/* The callback has had its chance to accept the pending socket;
 * release the stored protocol info either way. */
1473 if (handle->pending_ipc_info.socket_info) {
1474 free(handle->pending_ipc_info.socket_info);
1475 handle->pending_ipc_info.socket_info = NULL;
/* Non-IPC pipe: plain data delivery. */
1478 handle->read_cb((uv_stream_t*)handle, bytes, &buf);
1481 /* Read again only if bytes == buf.len */
/* NOTE(review): bytes <= buf.len always holds after a successful
 * ReadFile, so this always breaks; more data is picked up by the next
 * posted 0-read.  Matches upstream libuv -- do not "fix" blindly. */
1482 if (bytes <= buf.len) {
1486 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1491 /* Post another 0-read if still reading and not closing. */
1492 if ((handle->flags & UV_HANDLE_READING) &&
1493 !(handle->flags & UV_HANDLE_READ_PENDING)) {
1494 uv_pipe_queue_read(loop, handle);
1498 DECREASE_PENDING_REQ_COUNT(handle);
/* Completion handler for a pipe write request: releases per-request
 * resources, invokes the user's write callback with the translated
 * result, and drives queued non-overlapped writes and pending shutdown.
 * NOTE(review): elided listing -- the uv_write_t* req parameter, the
 * int err local, and several else-branches are not shown; verify
 * against the full file. */
1502 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1506 assert(handle->type == UV_NAMED_PIPE);
/* The bytes of this request no longer count against the queue size. */
1508 assert(handle->write_queue_size >= req->queued_bytes);
1509 handle->write_queue_size -= req->queued_bytes;
1511 UNREGISTER_HANDLE_REQ(loop, handle, req);
/* When IOCP is emulated with RegisterWaitForSingleObject, tear down
 * the wait registration and the event backing this request. */
1513 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1514 if (req->wait_handle != INVALID_HANDLE_VALUE) {
1515 UnregisterWait(req->wait_handle);
1516 req->wait_handle = INVALID_HANDLE_VALUE;
1518 if (req->event_handle) {
1519 CloseHandle(req->event_handle);
1520 req->event_handle = NULL;
/* IPC frame-header writes are internal: the embedded request is just
 * marked free; a heap-allocated one is freed (elided branch). No user
 * callback is made for them. */
1524 if (req->ipc_header) {
1525 if (req == &handle->ipc_header_write_req) {
1526 req->type = UV_UNKNOWN_REQ;
/* Regular write: report the request's result to the user callback. */
1532 err = GET_REQ_ERROR(req);
1533 req->cb(req, uv_translate_sys_error(err));
1537 handle->write_reqs_pending--;
/* Non-overlapped pipes serialize writes; kick off the next queued one. */
1539 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1540 handle->non_overlapped_writes_tail) {
1541 assert(handle->write_reqs_pending > 0);
1542 uv_queue_non_overlapped_write(handle);
/* A pending shutdown can proceed once the last write has drained. */
1545 if (handle->shutdown_req != NULL &&
1546 handle->write_reqs_pending == 0) {
1547 uv_want_endgame(loop, (uv_handle_t*)handle);
1550 DECREASE_PENDING_REQ_COUNT(handle);
1554 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1555 uv_req_t* raw_req) {
1556 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1558 assert(handle->type == UV_NAMED_PIPE);
1560 if (REQ_SUCCESS(req)) {
1561 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1562 req->next_pending = handle->pending_accepts;
1563 handle->pending_accepts = req;
1565 if (handle->connection_cb) {
1566 handle->connection_cb((uv_stream_t*)handle, 0);
1569 if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1570 CloseHandle(req->pipeHandle);
1571 req->pipeHandle = INVALID_HANDLE_VALUE;
1573 if (!(handle->flags & UV__HANDLE_CLOSING)) {
1574 uv_pipe_queue_accept(loop, handle, req, FALSE);
1578 DECREASE_PENDING_REQ_COUNT(handle);
1582 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1583 uv_connect_t* req) {
1586 assert(handle->type == UV_NAMED_PIPE);
1588 UNREGISTER_HANDLE_REQ(loop, handle, req);
1592 if (REQ_SUCCESS(req)) {
1593 uv_pipe_connection_init(handle);
1595 err = GET_REQ_ERROR(req);
1597 req->cb(req, uv_translate_sys_error(err));
1600 DECREASE_PENDING_REQ_COUNT(handle);
1604 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1605 uv_shutdown_t* req) {
1606 assert(handle->type == UV_NAMED_PIPE);
1608 UNREGISTER_HANDLE_REQ(loop, handle, req);
1610 if (handle->flags & UV_HANDLE_READABLE) {
1611 /* Initialize and optionally start the eof timer. Only do this if the */
1612 /* pipe is readable and we haven't seen EOF come in ourselves. */
1613 eof_timer_init(handle);
1615 /* If reading start the timer right now. */
1616 /* Otherwise uv_pipe_queue_read will start it. */
1617 if (handle->flags & UV_HANDLE_READ_PENDING) {
1618 eof_timer_start(handle);
1622 /* This pipe is not readable. We can just close it to let the other end */
1623 /* know that we're done writing. */
1624 CloseHandle(handle->handle);
1625 handle->handle = INVALID_HANDLE_VALUE;
1632 DECREASE_PENDING_REQ_COUNT(handle);
1636 static void eof_timer_init(uv_pipe_t* pipe) {
1639 assert(pipe->eof_timer == NULL);
1640 assert(pipe->flags & UV_HANDLE_CONNECTION);
1642 pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
1644 r = uv_timer_init(pipe->loop, pipe->eof_timer);
1645 assert(r == 0); /* timers can't fail */
1646 pipe->eof_timer->data = pipe;
1647 uv_unref((uv_handle_t*) pipe->eof_timer);
1651 static void eof_timer_start(uv_pipe_t* pipe) {
1652 assert(pipe->flags & UV_HANDLE_CONNECTION);
1654 if (pipe->eof_timer != NULL) {
1655 uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
1660 static void eof_timer_stop(uv_pipe_t* pipe) {
1661 assert(pipe->flags & UV_HANDLE_CONNECTION);
1663 if (pipe->eof_timer != NULL) {
1664 uv_timer_stop(pipe->eof_timer);
1669 static void eof_timer_cb(uv_timer_t* timer, int status) {
1670 uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
1671 uv_loop_t* loop = timer->loop;
1673 assert(status == 0); /* timers can't fail */
1674 assert(pipe->type == UV_NAMED_PIPE);
1676 /* This should always be true, since we start the timer only */
1677 /* in uv_pipe_queue_read after successfully calling ReadFile, */
1678 /* or in uv_process_pipe_shutdown_req if a read is pending, */
1679 /* and we always immediately stop the timer in */
1680 /* uv_process_pipe_read_req. */
1681 assert(pipe->flags & UV_HANDLE_READ_PENDING);
1683 /* If there are many packets coming off the iocp then the timer callback */
1684 /* may be called before the read request is coming off the queue. */
1685 /* Therefore we check here if the read request has completed but will */
1686 /* be processed later. */
1687 if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
1688 HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
1692 /* Force both ends off the pipe. */
1693 CloseHandle(pipe->handle);
1694 pipe->handle = INVALID_HANDLE_VALUE;
1696 /* Stop reading, so the pending read that is going to fail will */
1697 /* not be reported to the user. */
1698 uv_read_stop((uv_stream_t*) pipe);
1700 /* Report the eof and update flags. This will get reported even if the */
1701 /* user stopped reading in the meantime. TODO: is that okay? */
1702 uv_pipe_read_eof(loop, pipe, uv_null_buf_);
1706 static void eof_timer_destroy(uv_pipe_t* pipe) {
1707 assert(pipe->flags && UV_HANDLE_CONNECTION);
1709 if (pipe->eof_timer) {
1710 uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
1711 pipe->eof_timer = NULL;
1716 static void eof_timer_close_cb(uv_handle_t* handle) {
1717 assert(handle->type == UV_TIMER);
1722 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
1723 HANDLE os_handle = (HANDLE)_get_osfhandle(file);
1725 if (os_handle == INVALID_HANDLE_VALUE ||
1726 uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
1730 uv_pipe_connection_init(pipe);
1731 pipe->handle = os_handle;
1732 pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1735 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1736 pipe->ipc_pid = uv_parent_pid();
1737 assert(pipe->ipc_pid != -1);