1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
28 #include "handle-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
38 /* A zero-size buffer for use by uv_pipe_read */
39 static char uv_zero_[] = "";
/* Placeholder buffer descriptor (len 0, base NULL) used where a uv_buf_t is
 * required but no data is carried. */
42 static const uv_buf_t uv_null_buf_ = { 0, NULL };
44 /* The timeout that the pipe will wait for the remote end to write data when
45 * the local ends wants to shut it down. */
46 static const int64_t eof_timeout = 50; /* ms */
/* Number of simultaneous pending accept requests a pipe server queues when
 * the user did not call uv_pipe_pending_instances(). */
48 static const int default_pending_pipe_instances = 4;
/* Required Windows named-pipe name prefix; pipe_prefix_len excludes the
 * trailing NUL. */
51 static char pipe_prefix[] = "\\\\?\\pipe";
52 static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
54 /* IPC incoming xfer queue item. */
/* Describes one socket received over the IPC pipe, queued until the user
 * accepts it. NOTE(review): the struct opener and its queue-link member are
 * not visible in this excerpt. */
56 uv__ipc_socket_xfer_type_t xfer_type;
57 uv__ipc_socket_xfer_info_t xfer_info;
59 } uv__ipc_xfer_queue_item_t;
61 /* IPC frame header flags. */
62 /* clang-format off */
/* Frame carries a data payload. */
64 UV__IPC_FRAME_HAS_DATA = 0x01,
/* Frame carries a transferred socket (WSAPROTOCOL_INFO-style payload). */
65 UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
/* Qualifies HAS_SOCKET_XFER: the transferred socket is a connected TCP
 * connection rather than a listening socket. */
66 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
67 /* These are combinations of the flags above. */
68 UV__IPC_FRAME_XFER_FLAGS = 0x06,
69 UV__IPC_FRAME_VALID_FLAGS = 0x07
73 /* IPC frame header. */
/* On-the-wire header preceding every IPC frame. Layout is part of the IPC
 * protocol; see the STATIC_ASSERTs below that pin its size. */
76 uint32_t reserved1; /* Ignored. */
77 uint32_t data_length; /* Must be zero if there is no data. */
78 uint32_t reserved2; /* Must be zero. */
79 } uv__ipc_frame_header_t;
81 /* To implement the IPC protocol correctly, these structures must have exactly
/* Compile-time layout checks: both ends of the pipe (possibly different
 * processes/builds) must agree on these sizes byte-for-byte. */
83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
86 /* Coalesced write request. */
/* When multiple user buffers are copied into one heap allocation for a single
 * WriteFile, this pairs the internal write request with the user's request so
 * the user callback can be fired on completion. */
88 uv_write_t req; /* Internal heap-allocated write request. */
89 uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
90 } uv__coalesced_write_t;
/* Forward declarations for the EOF-timer helpers. The EOF timer gives the
 * remote end a short grace period (eof_timeout) to finish writing before the
 * local end tears the connection down. */
93 static void eof_timer_init(uv_pipe_t* pipe);
94 static void eof_timer_start(uv_pipe_t* pipe);
95 static void eof_timer_stop(uv_pipe_t* pipe);
96 static void eof_timer_cb(uv_timer_t* timer);
97 static void eof_timer_destroy(uv_pipe_t* pipe);
98 static void eof_timer_close_cb(uv_handle_t* handle);
/* Writes a pipe name into `name` that is unique within this machine:
 * it combines a caller-supplied pointer value (used as entropy) with the
 * current process id. `size` bounds the output; snprintf NUL-terminates. */
101 static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
102 snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
/* Initializes a uv_pipe_t handle on `loop`. `ipc` selects IPC mode, which
 * enables socket transfer over the pipe. Performs no I/O and cannot block. */
106 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
107 uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
109 handle->reqs_pending = 0;
110 handle->handle = INVALID_HANDLE_VALUE;
/* IPC-connection state: remote pid unknown, no partial data frame, and an
 * empty queue of received (transferred) sockets. */
112 handle->pipe.conn.ipc_remote_pid = 0;
113 handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
114 QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
115 handle->pipe.conn.ipc_xfer_queue_length = 0;
117 handle->pipe.conn.non_overlapped_writes_tail = NULL;
/* Marks `handle` as a connection (not a server) and resets per-connection
 * read state. The read request's data pointer is used by worker threads to
 * find the owning handle. */
123 static void uv__pipe_connection_init(uv_pipe_t* handle) {
124 assert(!(handle->flags & UV_HANDLE_PIPESERVER));
125 uv__connection_init((uv_stream_t*) handle);
126 handle->read_req.data = handle;
127 handle->pipe.conn.eof_timer = NULL;
/* Opens an existing named pipe by name, negotiating the widest access that
 * the server grants. On success returns the pipe HANDLE and sets
 * *duplex_flags to the readable/writable capabilities obtained; on failure
 * returns INVALID_HANDLE_VALUE (caller inspects GetLastError()). */
131 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
135 * Assume that we have a duplex pipe first, so attempt to
136 * connect with GENERIC_READ | GENERIC_WRITE.
138 pipeHandle = CreateFileW(name,
139 GENERIC_READ | GENERIC_WRITE,
143 FILE_FLAG_OVERLAPPED,
145 if (pipeHandle != INVALID_HANDLE_VALUE) {
146 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
151 * If the pipe is not duplex CreateFileW fails with
152 * ERROR_ACCESS_DENIED. In that case try to connect
153 * as a read-only or write-only.
155 if (GetLastError() == ERROR_ACCESS_DENIED) {
/* Read-only attempt. FILE_WRITE_ATTRIBUTES is still requested so the pipe
 * mode can later be changed with SetNamedPipeHandleState. */
156 pipeHandle = CreateFileW(name,
157 GENERIC_READ | FILE_WRITE_ATTRIBUTES,
161 FILE_FLAG_OVERLAPPED,
164 if (pipeHandle != INVALID_HANDLE_VALUE) {
165 *duplex_flags = UV_HANDLE_READABLE;
/* Write-only attempt. FILE_READ_ATTRIBUTES is needed to probe pipe state
 * (e.g. during shutdown). */
170 if (GetLastError() == ERROR_ACCESS_DENIED) {
171 pipeHandle = CreateFileW(name,
172 GENERIC_WRITE | FILE_READ_ATTRIBUTES,
176 FILE_FLAG_OVERLAPPED,
179 if (pipeHandle != INVALID_HANDLE_VALUE) {
180 *duplex_flags = UV_HANDLE_WRITABLE;
/* All three access modes failed. */
185 return INVALID_HANDLE_VALUE;
/* Closes the OS resource behind `pipe`. If no CRT file descriptor was
 * attached (fd == -1) the raw HANDLE is closed directly; otherwise the fd
 * presumably owns the handle and is closed elsewhere — TODO confirm, the
 * else-branch is not visible in this excerpt. The fd > 2 assert guards
 * against accidentally closing stdio. */
189 static void close_pipe(uv_pipe_t* pipe) {
190 assert(pipe->u.fd == -1 || pipe->u.fd > 2);
191 if (pipe->u.fd == -1)
192 CloseHandle(pipe->handle);
197 pipe->handle = INVALID_HANDLE_VALUE;
/* Creates a server end of an anonymous pipe pair with a randomized unique
 * name (written into `name`). FILE_FLAG_FIRST_PIPE_INSTANCE guarantees we
 * never attach to somebody else's existing pipe. On success stores the
 * handle through pipeHandle_ptr. `random` seeds the unique-name generator. */
201 static int uv__pipe_server(
202 HANDLE* pipeHandle_ptr, DWORD access,
203 char* name, size_t nameSize, char* random) {
208 uv__unique_pipe_name(random, name, nameSize);
210 pipeHandle = CreateNamedPipeA(name,
211 access | FILE_FLAG_FIRST_PIPE_INSTANCE,
212 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
215 if (pipeHandle != INVALID_HANDLE_VALUE) {
216 /* No name collisions. We're done. */
220 err = GetLastError();
/* ERROR_PIPE_BUSY / ERROR_ACCESS_DENIED mean the randomized name already
 * exists; any other error is fatal for this attempt. */
221 if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
225 /* Pipe name collision. Increment the random number and try again. */
229 *pipeHandle_ptr = pipeHandle;
/* Error path: release the handle if one was created. */
234 if (pipeHandle != INVALID_HANDLE_VALUE)
235 CloseHandle(pipeHandle);
/* Creates a connected server/client named-pipe pair. server/client flags
 * select direction (UV_READABLE_PIPE / UV_WRITABLE_PIPE) and overlapped mode
 * (UV_NONBLOCK_PIPE); `inherit_client` makes the client handle inheritable
 * by child processes. Both handles are returned through the out pointers;
 * on failure any partially created handle is closed. */
241 static int uv__create_pipe_pair(
242 HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
243 unsigned int server_flags, unsigned int client_flags,
244 int inherit_client, char* random) {
245 /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
247 SECURITY_ATTRIBUTES sa;
254 server_pipe = INVALID_HANDLE_VALUE;
255 client_pipe = INVALID_HANDLE_VALUE;
/* Translate libuv pipe flags into CreateNamedPipe access bits. */
258 if (server_flags & UV_READABLE_PIPE)
259 server_access |= PIPE_ACCESS_INBOUND;
260 if (server_flags & UV_WRITABLE_PIPE)
261 server_access |= PIPE_ACCESS_OUTBOUND;
262 if (server_flags & UV_NONBLOCK_PIPE)
263 server_access |= FILE_FLAG_OVERLAPPED;
/* WRITE_DAC lets uv_pipe_chmod() adjust the DACL later. */
264 server_access |= WRITE_DAC;
/* Translate flags into CreateFile access bits for the client end. The
 * *_ATTRIBUTES rights are always added so either side can query/set pipe
 * state regardless of direction. */
267 if (client_flags & UV_READABLE_PIPE)
268 client_access |= GENERIC_READ;
270 client_access |= FILE_READ_ATTRIBUTES;
271 if (client_flags & UV_WRITABLE_PIPE)
272 client_access |= GENERIC_WRITE;
274 client_access |= FILE_WRITE_ATTRIBUTES;
275 client_access |= WRITE_DAC;
277 /* Create server pipe handle. */
278 err = uv__pipe_server(&server_pipe,
286 /* Create client pipe handle. */
287 sa.nLength = sizeof sa;
288 sa.lpSecurityDescriptor = NULL;
289 sa.bInheritHandle = inherit_client;
291 client_pipe = CreateFileA(pipe_name,
296 (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
298 if (client_pipe == INVALID_HANDLE_VALUE) {
299 err = GetLastError();
304 /* Validate that the pipe was opened in the right mode. */
308 r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
310 assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
/* Diagnostic only: in release builds execution continues past this. */
312 fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
317 /* Do a blocking ConnectNamedPipe. This should not block because we have
318 * both ends of the pipe created. */
319 if (!ConnectNamedPipe(server_pipe, NULL)) {
/* ERROR_PIPE_CONNECTED means the client raced ahead and is already
 * connected — that is success, not failure. */
320 if (GetLastError() != ERROR_PIPE_CONNECTED) {
321 err = GetLastError();
326 *client_pipe_ptr = client_pipe;
327 *server_pipe_ptr = server_pipe;
/* Error path: close whichever handles were created. */
331 if (server_pipe != INVALID_HANDLE_VALUE)
332 CloseHandle(server_pipe);
334 if (client_pipe != INVALID_HANDLE_VALUE)
335 CloseHandle(client_pipe);
/* Public uv_pipe(): creates an anonymous pipe and returns both ends as CRT
 * file descriptors in fds[0] (read) and fds[1] (write). read_flags and
 * write_flags may add UV_NONBLOCK_PIPE per end. */
341 int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
347 /* Make the server side the inbound (read) end, */
348 /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
349 /* TODO: better source of local randomness than &fds? */
350 read_flags |= UV_READABLE_PIPE;
351 write_flags |= UV_WRITABLE_PIPE;
352 err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
/* Wrap the raw HANDLEs in CRT descriptors; _open_osfhandle transfers
 * ownership of the handle to the fd on success. */
355 temp[0] = _open_osfhandle((intptr_t) readh, 0);
357 if (errno == UV_EMFILE)
365 temp[1] = _open_osfhandle((intptr_t) writeh, 0);
367 if (errno == UV_EMFILE)
/* Creates the pipe pair used for a child process's stdio. The server end is
 * wired into `parent_pipe` (a libuv handle on `loop`, registered with the
 * loop's IOCP); the client end is returned through child_pipe_ptr for the
 * child to inherit. */
381 int uv__create_stdio_pipe_pair(uv_loop_t* loop,
382 uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
383 /* The parent_pipe is always the server_pipe and kept by libuv.
384 * The child_pipe is always the client_pipe and is passed to the child.
385 * The flags are specified with respect to their usage in the child. */
388 unsigned int server_flags;
389 unsigned int client_flags;
392 uv__pipe_connection_init(parent_pipe);
394 server_pipe = INVALID_HANDLE_VALUE;
395 client_pipe = INVALID_HANDLE_VALUE;
399 if (flags & UV_READABLE_PIPE) {
400 /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
401 * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
402 * the state of the write buffer when we're trying to shutdown the pipe. */
403 server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
404 client_flags |= UV_READABLE_PIPE;
406 if (flags & UV_WRITABLE_PIPE) {
407 server_flags |= UV_READABLE_PIPE;
408 client_flags |= UV_WRITABLE_PIPE;
/* The parent (libuv) side always uses overlapped I/O. */
410 server_flags |= UV_NONBLOCK_PIPE;
411 if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
412 client_flags |= UV_NONBLOCK_PIPE;
/* inherit_client=1: the client handle must be inheritable by the child. */
415 err = uv__create_pipe_pair(&server_pipe, &client_pipe,
416 server_flags, client_flags, 1, (char*) server_pipe);
420 if (CreateIoCompletionPort(server_pipe,
422 (ULONG_PTR) parent_pipe,
424 err = GetLastError();
428 parent_pipe->handle = server_pipe;
429 *child_pipe_ptr = client_pipe;
431 /* The server end is now readable and/or writable. */
/* Note the deliberate inversion: if the CHILD reads, the PARENT writes,
 * and vice versa. */
432 if (flags & UV_READABLE_PIPE)
433 parent_pipe->flags |= UV_HANDLE_WRITABLE;
434 if (flags & UV_WRITABLE_PIPE)
435 parent_pipe->flags |= UV_HANDLE_READABLE;
/* Error path: close whichever handles were created. */
440 if (server_pipe != INVALID_HANDLE_VALUE)
441 CloseHandle(server_pipe);
443 if (client_pipe != INVALID_HANDLE_VALUE)
444 CloseHandle(client_pipe);
/* Attaches an already-open pipe HANDLE to a uv_pipe_t connection handle:
 * forces byte-read/wait mode, distinguishes non-overlapped pipes (serviced
 * via the thread pool) from overlapped ones (associated with the loop's
 * IOCP), and records the readable/writable `duplex_flags` on the handle.
 * Returns 0 on success or a libuv error code.
 *
 * Fixes relative to the previous revision:
 *  - `¤t_mode` was a mis-encoded `&current_mode` (an HTML-entity
 *    mangling) and could not compile.
 *  - the NtQueryInformationFile failure path returned
 *    uv_translate_sys_error(err) where `err` is only assigned in the earlier
 *    SetNamedPipeHandleState failure branch and may be uninitialized; it now
 *    translates the actual NTSTATUS via pRtlNtStatusToDosError(). */
450 static int uv__set_pipe_handle(uv_loop_t* loop,
454 DWORD duplex_flags) {
456 IO_STATUS_BLOCK io_status;
457 FILE_MODE_INFORMATION mode_info;
458 DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
459 DWORD current_mode = 0;
462 assert(handle->flags & UV_HANDLE_CONNECTION);
463 assert(!(handle->flags & UV_HANDLE_PIPESERVER));
464 if (handle->flags & UV_HANDLE_CLOSING)
466 if (handle->handle != INVALID_HANDLE_VALUE)
469 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
470 err = GetLastError();
471 if (err == ERROR_ACCESS_DENIED) {
473 * SetNamedPipeHandleState can fail if the handle doesn't have either
474 * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
475 * But if the handle already has the desired wait and blocking modes
478 if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
480 return uv_translate_sys_error(GetLastError());
481 } else if (current_mode & PIPE_NOWAIT) {
485 /* If this returns ERROR_INVALID_PARAMETER we probably opened
486 * something that is not a pipe. */
487 if (err == ERROR_INVALID_PARAMETER) {
490 return uv_translate_sys_error(err);
494 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
495 nt_status = pNtQueryInformationFile(pipeHandle,
499 FileModeInformation);
500 if (nt_status != STATUS_SUCCESS) {
501 return uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
504 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
505 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
506 /* Non-overlapped pipe. */
507 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
508 handle->pipe.conn.readfile_thread_handle = NULL;
509 InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
511 /* Overlapped pipe. Try to associate with IOCP. */
512 if (CreateIoCompletionPort(pipeHandle,
/* IOCP association failed: fall back to emulating completion events. */
516 handle->flags |= UV_HANDLE_EMULATE_IOCP;
520 handle->handle = pipeHandle;
522 handle->flags |= duplex_flags;
/* Creates one server pipe instance for accept request `req` and associates
 * it with the loop's IOCP. `firstInstance` adds
 * FILE_FLAG_FIRST_PIPE_INSTANCE so binding fails if the name is taken.
 * Returns FALSE (0) on CreateNamedPipeW failure. */
528 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
529 uv_pipe_accept_t* req, BOOL firstInstance) {
530 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
533 CreateNamedPipeW(handle->name,
534 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
535 (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
536 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
537 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
539 if (req->pipeHandle == INVALID_HANDLE_VALUE) {
543 /* Associate it with IOCP so we can get events. */
544 if (CreateIoCompletionPort(req->pipeHandle,
/* IOCP association failing here is unrecoverable. */
548 uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
551 /* Stash a handle in the server object for use from places such as
552 * getsockname and chmod. As we transfer ownership of these to client
553 * objects, we'll allocate new ones here. */
554 handle->handle = req->pipeHandle;
/* Thread-pool worker for uv__pipe_shutdown: blocks in FlushFileBuffers until
 * the reader has drained all pending writes, then posts the shutdown request
 * back to the loop's IOCP for completion on the loop thread. */
560 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
565 req = (uv_shutdown_t*) parameter;
567 handle = (uv_pipe_t*) req->handle;
572 FlushFileBuffers(handle->handle);
575 POST_COMPLETION_FOR_REQ(loop, req);
/* Starts a graceful shutdown of the write side of a pipe connection. If all
 * written data has already been consumed the request completes immediately;
 * otherwise FlushFileBuffers (which blocks) is delegated to the thread pool
 * via pipe_shutdown_thread_proc. */
581 void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
584 IO_STATUS_BLOCK io_status;
585 FILE_PIPE_LOCAL_INFORMATION pipe_info;
587 assert(handle->flags & UV_HANDLE_CONNECTION);
589 assert(handle->stream.conn.write_reqs_pending == 0);
590 SET_REQ_SUCCESS(req);
/* Closing handles skip the flush entirely. */
592 if (handle->flags & UV_HANDLE_CLOSING) {
593 uv__insert_pending_req(loop, (uv_req_t*) req);
597 /* Try to avoid flushing the pipe buffer in the thread pool. */
598 nt_status = pNtQueryInformationFile(handle->handle,
602 FilePipeLocalInformation);
604 if (nt_status != STATUS_SUCCESS) {
605 SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
/* Restore writability so the handle isn't left half shut down. */
606 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
607 uv__insert_pending_req(loop, (uv_req_t*) req);
611 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
612 /* Short-circuit, no need to call FlushFileBuffers:
613 * all writes have been read. */
614 uv__insert_pending_req(loop, (uv_req_t*) req);
618 /* Run FlushFileBuffers in the thread pool. */
619 result = QueueUserWorkItem(pipe_shutdown_thread_proc,
621 WT_EXECUTELONGFUNCTION);
/* Could not queue the worker: report the error on the request. */
623 SET_REQ_ERROR(req, GetLastError());
624 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
625 uv__insert_pending_req(loop, (uv_req_t*) req);
/* Final teardown once all requests on a closing pipe have completed: frees
 * queued-but-unaccepted transferred sockets, releases IOCP-emulation
 * resources, frees server accept requests, and completes the close. */
631 void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
632 uv__ipc_xfer_queue_item_t* xfer_queue_item;
634 assert(handle->reqs_pending == 0);
635 assert(handle->flags & UV_HANDLE_CLOSING);
636 assert(!(handle->flags & UV_HANDLE_CLOSED));
638 if (handle->flags & UV_HANDLE_CONNECTION) {
639 /* Free pending sockets */
640 while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
644 q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
646 xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
648 /* Materialize socket and close it */
/* The transferred socket only exists as a WSAPROTOCOL_INFO blob until
 * WSASocketW() turns it into a real socket we can close. */
649 socket = WSASocketW(FROM_PROTOCOL_INFO,
652 &xfer_queue_item->xfer_info.socket_info,
654 WSA_FLAG_OVERLAPPED);
655 uv__free(xfer_queue_item);
657 if (socket != INVALID_SOCKET)
660 handle->pipe.conn.ipc_xfer_queue_length = 0;
/* Tear down the wait/event objects used for IOCP emulation. */
662 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
663 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
664 UnregisterWait(handle->read_req.wait_handle);
665 handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
667 if (handle->read_req.event_handle != NULL) {
668 CloseHandle(handle->read_req.event_handle);
669 handle->read_req.event_handle = NULL;
673 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
674 DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
677 if (handle->flags & UV_HANDLE_PIPESERVER) {
678 assert(handle->pipe.serv.accept_reqs);
679 uv__free(handle->pipe.serv.accept_reqs);
680 handle->pipe.serv.accept_reqs = NULL;
683 uv__handle_close(handle);
/* Sets how many accept requests a pipe server keeps outstanding. Ignored
 * once the handle is bound (instances were already allocated); also marks
 * the handle as a server. */
687 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
688 if (handle->flags & UV_HANDLE_BOUND)
690 handle->pipe.serv.pending_instances = count;
691 handle->flags |= UV_HANDLE_PIPESERVER;
695 /* Creates a pipe server. */
/* Binds `handle` to the UTF-8 pipe `name`: allocates the accept-request
 * array, converts the name to UTF-16, and creates the first pipe instance
 * with FILE_FLAG_FIRST_PIPE_INSTANCE to detect name collisions. */
696 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
697 uv_loop_t* loop = handle->loop;
698 int i, err, nameSize;
699 uv_pipe_accept_t* req;
701 if (handle->flags & UV_HANDLE_BOUND) {
708 if (uv__is_closing(handle)) {
/* If uv_pipe_pending_instances() was never called, apply the default. */
711 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
712 handle->pipe.serv.pending_instances = default_pending_pipe_instances;
715 handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
716 uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
717 if (!handle->pipe.serv.accept_reqs) {
718 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
721 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
722 req = &handle->pipe.serv.accept_reqs[i];
723 UV_REQ_INIT(req, UV_ACCEPT);
725 req->pipeHandle = INVALID_HANDLE_VALUE;
726 req->next_pending = NULL;
729 /* Convert name to UTF16. */
/* First MultiByteToWideChar call sizes the buffer (includes the NUL). */
730 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
731 handle->name = uv__malloc(nameSize);
733 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
736 if (!MultiByteToWideChar(CP_UTF8,
741 nameSize / sizeof(WCHAR))) {
742 err = GetLastError();
747 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
748 * If this fails then there's already a pipe server for the given pipe name.
750 if (!pipe_alloc_accept(loop,
752 &handle->pipe.serv.accept_reqs[0],
754 err = GetLastError();
/* Map Win32 errors onto the Winsock codes libuv translates to
 * UV_EADDRINUSE / UV_EACCES for cross-platform consistency. */
755 if (err == ERROR_ACCESS_DENIED) {
756 err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
757 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
758 err = WSAEACCES; /* Translates to UV_EACCES. */
763 handle->pipe.serv.pending_accepts = NULL;
764 handle->flags |= UV_HANDLE_PIPESERVER;
765 handle->flags |= UV_HANDLE_BOUND;
/* Error path: release the converted name. */
771 uv__free(handle->name);
775 return uv_translate_sys_error(err);
/* Thread-pool worker used when uv_pipe_connect hit ERROR_PIPE_BUSY: waits
 * (up to 30s per WaitNamedPipeW call) for a server instance to become
 * available, retries the open, then posts the connect request back to the
 * loop's IOCP with the result. */
779 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
783 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
786 req = (uv_connect_t*) parameter;
788 handle = (uv_pipe_t*) req->handle;
793 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
794 * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
795 while (WaitNamedPipeW(handle->name, 30000)) {
796 /* The pipe is now available, try to connect. */
797 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
798 if (pipeHandle != INVALID_HANDLE_VALUE)
/* Success: stash the opened handle and its capabilities on the request;
 * they are consumed on the loop thread. */
804 if (pipeHandle != INVALID_HANDLE_VALUE) {
805 SET_REQ_SUCCESS(req);
806 req->u.connect.pipeHandle = pipeHandle;
807 req->u.connect.duplex_flags = duplex_flags;
809 SET_REQ_ERROR(req, GetLastError());
813 POST_COMPLETION_FOR_REQ(loop, req);
/* Connects `handle` to the pipe server named by UTF-8 `name`. Completes
 * asynchronously: either immediately via a pending request, or — when the
 * server is busy — via pipe_connect_thread_proc in the thread pool. Errors
 * are also delivered asynchronously through `cb`. */
819 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
820 const char* name, uv_connect_cb cb) {
821 uv_loop_t* loop = handle->loop;
823 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
826 UV_REQ_INIT(req, UV_CONNECT);
827 req->handle = (uv_stream_t*) handle;
829 req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
830 req->u.connect.duplex_flags = 0;
/* A server handle cannot connect out; a handle with an existing
 * connection cannot connect again. */
832 if (handle->flags & UV_HANDLE_PIPESERVER) {
833 err = ERROR_INVALID_PARAMETER;
836 if (handle->flags & UV_HANDLE_CONNECTION) {
837 err = ERROR_PIPE_BUSY;
840 uv__pipe_connection_init(handle);
842 /* Convert name to UTF16. */
843 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
844 handle->name = uv__malloc(nameSize);
846 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
849 if (!MultiByteToWideChar(CP_UTF8,
854 nameSize / sizeof(WCHAR))) {
855 err = GetLastError();
859 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
860 if (pipeHandle == INVALID_HANDLE_VALUE) {
861 if (GetLastError() == ERROR_PIPE_BUSY) {
862 /* Wait for the server to make a pipe instance available. */
863 if (!QueueUserWorkItem(&pipe_connect_thread_proc,
865 WT_EXECUTELONGFUNCTION)) {
866 err = GetLastError();
/* Worker queued: the request completes later from the worker. */
870 REGISTER_HANDLE_REQ(loop, handle, req);
871 handle->reqs_pending++;
876 err = GetLastError();
/* Immediate success: hand the opened handle to the pending request. */
880 req->u.connect.pipeHandle = pipeHandle;
881 req->u.connect.duplex_flags = duplex_flags;
882 SET_REQ_SUCCESS(req);
883 uv__insert_pending_req(loop, (uv_req_t*) req);
884 handle->reqs_pending++;
885 REGISTER_HANDLE_REQ(loop, handle, req);
/* Error path: free the name, close any opened handle, and deliver the
 * error asynchronously through the request. */
890 uv__free(handle->name);
894 if (pipeHandle != INVALID_HANDLE_VALUE)
895 CloseHandle(pipeHandle);
897 /* Make this req pending reporting an error. */
898 SET_REQ_ERROR(req, err);
899 uv__insert_pending_req(loop, (uv_req_t*) req);
900 handle->reqs_pending++;
901 REGISTER_HANDLE_REQ(loop, handle, req);
/* Cancels an in-flight read on `handle`. Overlapped reads are cancelled with
 * CancelIoEx; non-overlapped reads are blocked in ReadFile on a thread-pool
 * thread and must be cancelled with CancelSynchronousIo, coordinated through
 * readfile_thread_handle / readfile_thread_lock. Idempotent. */
906 void uv__pipe_interrupt_read(uv_pipe_t* handle) {
909 if (!(handle->flags & UV_HANDLE_READ_PENDING))
910 return; /* No pending reads. */
911 if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
912 return; /* Already cancelled. */
913 if (handle->handle == INVALID_HANDLE_VALUE)
914 return; /* Pipe handle closed. */
916 if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
917 /* Cancel asynchronous read. */
918 r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
/* ERROR_NOT_FOUND means the read already completed — that's fine. */
919 assert(r || GetLastError() == ERROR_NOT_FOUND);
922 /* Cancel synchronous read (which is happening in the thread pool). */
924 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
926 EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
928 thread = *thread_ptr;
929 if (thread == NULL) {
930 /* The thread pool thread has not yet reached the point of blocking, we
931 * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
932 *thread_ptr = INVALID_HANDLE_VALUE;
935 /* Spin until the thread has acknowledged (by setting the thread to
936 * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
937 while (thread != INVALID_HANDLE_VALUE) {
938 r = CancelSynchronousIo(thread);
939 assert(r || GetLastError() == ERROR_NOT_FOUND);
940 SwitchToThread(); /* Yield thread. */
941 thread = *thread_ptr;
945 LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
948 /* Set flag to indicate that read has been cancelled. */
949 handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
/* Stops reading on `handle`: clears the READING flag, drops the loop's
 * active-handle refcount, and interrupts any read already in flight. */
953 void uv__pipe_read_stop(uv_pipe_t* handle) {
954 handle->flags &= ~UV_HANDLE_READING;
955 DECREASE_ACTIVE_COUNT(handle->loop, handle);
956 uv__pipe_interrupt_read(handle);
960 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
/* Entry point for uv_close() on a pipe: stops reading/listening, marks the
 * handle closing, interrupts pending reads, closes server instance handles,
 * destroys the EOF timer, and queues the endgame once no requests remain. */
962 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
966 if (handle->flags & UV_HANDLE_READING) {
967 handle->flags &= ~UV_HANDLE_READING;
968 DECREASE_ACTIVE_COUNT(loop, handle);
971 if (handle->flags & UV_HANDLE_LISTENING) {
972 handle->flags &= ~UV_HANDLE_LISTENING;
973 DECREASE_ACTIVE_COUNT(loop, handle);
976 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
978 uv__handle_closing(handle);
980 uv__pipe_interrupt_read(handle);
/* The UTF-16 name allocated at bind/connect time. */
983 uv__free(handle->name);
/* Server: close every not-yet-accepted instance handle. */
987 if (handle->flags & UV_HANDLE_PIPESERVER) {
988 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
989 pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
990 if (pipeHandle != INVALID_HANDLE_VALUE) {
991 CloseHandle(pipeHandle);
992 handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
995 handle->handle = INVALID_HANDLE_VALUE;
998 if (handle->flags & UV_HANDLE_CONNECTION) {
999 eof_timer_destroy(handle);
1002 if ((handle->flags & UV_HANDLE_CONNECTION)
1003 && handle->handle != INVALID_HANDLE_VALUE) {
1004 /* This will eventually destroy the write queue for us too. */
/* Endgame runs immediately only when nothing is outstanding; otherwise
 * it is triggered when the last pending request completes. */
1008 if (handle->reqs_pending == 0)
1009 uv__want_endgame(loop, (uv_handle_t*) handle);
/* Arms one accept request: (re)creates the pipe instance if needed and
 * starts an overlapped ConnectNamedPipe. Failures are reported by inserting
 * the request as pending with an error; every path bumps reqs_pending. */
1013 static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
1014 uv_pipe_accept_t* req, BOOL firstInstance) {
1015 assert(handle->flags & UV_HANDLE_LISTENING);
/* The first instance was already allocated in uv_pipe_bind. */
1017 if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
1018 SET_REQ_ERROR(req, GetLastError());
1019 uv__insert_pending_req(loop, (uv_req_t*) req);
1020 handle->reqs_pending++;
1024 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1026 /* Prepare the overlapped structure. */
1027 memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
1029 if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
1030 GetLastError() != ERROR_IO_PENDING) {
/* ERROR_PIPE_CONNECTED: a client connected between creation and this
 * call — treat as an immediately successful accept. */
1031 if (GetLastError() == ERROR_PIPE_CONNECTED) {
1032 SET_REQ_SUCCESS(req);
1034 CloseHandle(req->pipeHandle);
1035 req->pipeHandle = INVALID_HANDLE_VALUE;
1036 /* Make this req pending reporting an error. */
1037 SET_REQ_ERROR(req, GetLastError());
1039 uv__insert_pending_req(loop, (uv_req_t*) req);
1040 handle->reqs_pending++;
1044 /* Wait for completion via IOCP */
1045 handle->reqs_pending++;
/* Accepts a pending connection (or, for IPC pipes with a TCP client, a
 * transferred socket) from `server` into `client`. Returns 0 on success or
 * WSAEWOULDBLOCK when nothing is ready. */
1049 int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
1050 uv_loop_t* loop = server->loop;
1051 uv_pipe_t* pipe_client;
1052 uv_pipe_accept_t* req;
1054 uv__ipc_xfer_queue_item_t* item;
/* IPC path: pop a transferred socket off the xfer queue and import it
 * into the uv_tcp_t client. */
1058 if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
1059 /* No valid pending sockets. */
1060 return WSAEWOULDBLOCK;
1063 q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
1065 server->pipe.conn.ipc_xfer_queue_length--;
1066 item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
1068 err = uv__tcp_xfer_import(
1069 (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
/* Plain pipe path: hand a connected instance to the pipe client. */
1076 pipe_client = (uv_pipe_t*) client;
1077 uv__pipe_connection_init(pipe_client);
1079 /* Find a connection instance that has been connected, but not yet
1081 req = server->pipe.serv.pending_accepts;
1084 /* No valid connections found, so we error out. */
1085 return WSAEWOULDBLOCK;
1088 /* Initialize the client handle and copy the pipeHandle to the client */
1089 pipe_client->handle = req->pipeHandle;
1090 pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1092 /* Prepare the req to pick up a new connection */
1093 server->pipe.serv.pending_accepts = req->next_pending;
1094 req->next_pending = NULL;
1095 req->pipeHandle = INVALID_HANDLE_VALUE;
/* Ownership of the instance handle moved to the client; the server's
 * stashed handle is no longer valid. */
1097 server->handle = INVALID_HANDLE_VALUE;
1098 if (!(server->flags & UV_HANDLE_CLOSING)) {
1099 uv__pipe_queue_accept(loop, server, req, FALSE);
1107 /* Starts listening for connections for the given pipe. */
/* Requires a bound pipe server; arms all pending_instances accept requests.
 * Calling it again on a listening handle only replaces the callback. */
1108 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1109 uv_loop_t* loop = handle->loop;
1112 if (handle->flags & UV_HANDLE_LISTENING) {
1113 handle->stream.serv.connection_cb = cb;
1116 if (!(handle->flags & UV_HANDLE_BOUND)) {
1120 if (handle->flags & UV_HANDLE_READING) {
1124 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1125 return ERROR_NOT_SUPPORTED;
1132 handle->flags |= UV_HANDLE_LISTENING;
1133 INCREASE_ACTIVE_COUNT(loop, handle);
1134 handle->stream.serv.connection_cb = cb;
1136 /* First pipe handle should have already been created in uv_pipe_bind */
1137 assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1139 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1140 uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
/* Thread-pool worker for reads on non-overlapped pipes: performs a blocking
 * zero-byte ReadFile (which returns when data becomes available without
 * consuming it), then posts the read request to the loop's IOCP. Publishes
 * its own thread handle via readfile_thread_handle so
 * uv__pipe_interrupt_read can target CancelSynchronousIo at it. */
1147 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1148 uv_read_t* req = (uv_read_t*) arg;
1149 uv_pipe_t* handle = (uv_pipe_t*) req->data;
1150 uv_loop_t* loop = handle->loop;
1151 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1152 CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1157 assert(req->type == UV_READ);
1158 assert(handle->type == UV_NAMED_PIPE);
1162 /* Create a handle to the current thread. */
/* GetCurrentThread() returns a pseudo-handle; DuplicateHandle turns it
 * into a real handle another thread can use with CancelSynchronousIo. */
1163 if (!DuplicateHandle(GetCurrentProcess(),
1165 GetCurrentProcess(),
1169 DUPLICATE_SAME_ACCESS)) {
1170 err = GetLastError();
1174 /* The lock needs to be held when thread handle is modified. */
1175 EnterCriticalSection(lock);
1176 if (*thread_ptr == INVALID_HANDLE_VALUE) {
1177 /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1178 err = ERROR_OPERATION_ABORTED;
1180 /* Let main thread know which worker thread is doing the blocking read. */
1181 assert(*thread_ptr == NULL);
1182 *thread_ptr = thread;
1184 LeaveCriticalSection(lock);
1189 /* Block the thread until data is available on the pipe, or the read is
1191 if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1192 err = GetLastError();
1194 /* Let the main thread know the worker is past the point of blocking. */
1195 assert(thread == *thread_ptr);
1196 *thread_ptr = INVALID_HANDLE_VALUE;
1198 /* Briefly acquire the mutex. Since the main thread holds the lock while it
1199 * is spinning trying to cancel this thread's I/O, we will block here until
1200 * it stops doing that. */
1201 EnterCriticalSection(lock);
1202 LeaveCriticalSection(lock);
1205 /* Close the handle to the current thread. */
1206 CloseHandle(thread);
1209 /* Set request status and post a completion record to the IOCP. */
1211 SET_REQ_ERROR(req, err);
1213 SET_REQ_SUCCESS(req);
1214 POST_COMPLETION_FOR_REQ(loop, req);
/* Thread-pool worker that performs a blocking WriteFile for writes on
 * non-overlapped pipes, then posts the write request to the loop's IOCP
 * with its success/error status. */
1220 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1223 uv_write_t* req = (uv_write_t*) parameter;
1224 uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1225 uv_loop_t* loop = handle->loop;
1227 assert(req != NULL);
1228 assert(req->type == UV_WRITE);
1229 assert(handle->type == UV_NAMED_PIPE);
1231 result = WriteFile(handle->handle,
1232 req->write_buffer.base,
1233 req->write_buffer.len,
1238 SET_REQ_ERROR(req, GetLastError());
1241 POST_COMPLETION_FOR_REQ(loop, req);
/* RegisterWaitForSingleObject callback for emulated-IOCP reads: when the
 * read's event is signaled, forward the completed OVERLAPPED to the loop's
 * IOCP so it is processed like a native completion. */
1246 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1250 req = (uv_read_t*) context;
1251 assert(req != NULL);
1252 handle = (uv_tcp_t*)req->data;
1253 assert(handle != NULL);
/* InternalHigh carries the transferred byte count. */
1256 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1257 req->u.io.overlapped.InternalHigh,
1259 &req->u.io.overlapped)) {
1260 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
/* RegisterWaitForSingleObject callback for emulated-IOCP writes: mirrors
 * post_completion_read_wait but resolves the handle through req->handle. */
1265 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1269 req = (uv_write_t*) context;
1270 assert(req != NULL);
1271 handle = (uv_tcp_t*)req->handle;
1272 assert(handle != NULL);
/* InternalHigh carries the transferred byte count. */
1275 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1276 req->u.io.overlapped.InternalHigh,
1278 &req->u.io.overlapped)) {
1279 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
/* Starts a zero-byte read on `handle` to learn when data is available.
 * Non-overlapped pipes delegate to the thread pool; overlapped pipes issue
 * an async ReadFile (with an event-based wait when emulating IOCP). Every
 * path ends with READ_PENDING set and reqs_pending incremented. */
1284 static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1288 assert(handle->flags & UV_HANDLE_READING);
1289 assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1291 assert(handle->handle != INVALID_HANDLE_VALUE);
1293 req = &handle->read_req;
1295 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1296 handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
1297 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1299 WT_EXECUTELONGFUNCTION)) {
1300 /* Make this req pending reporting an error. */
1301 SET_REQ_ERROR(req, GetLastError());
1305 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1306 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1307 assert(req->event_handle != NULL);
/* Low bit set on hEvent tells the kernel NOT to queue a completion
 * packet to the IOCP; completion is delivered via the wait below. */
1308 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
/* Zero-byte overlapped read: completes when data arrives. */
1312 result = ReadFile(handle->handle,
1316 &req->u.io.overlapped);
1318 if (!result && GetLastError() != ERROR_IO_PENDING) {
1319 /* Make this req pending reporting an error. */
1320 SET_REQ_ERROR(req, GetLastError());
1324 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1325 if (req->wait_handle == INVALID_HANDLE_VALUE) {
1326 if (!RegisterWaitForSingleObject(&req->wait_handle,
1327 req->event_handle, post_completion_read_wait, (void*) req,
1328 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1329 SET_REQ_ERROR(req, GetLastError());
1336 /* Start the eof timer if there is one */
1337 eof_timer_start(handle);
1338 handle->flags |= UV_HANDLE_READ_PENDING;
1339 handle->reqs_pending++;
/* Error path: deliver the failed request through the pending queue. */
1343 uv__insert_pending_req(loop, (uv_req_t*)req);
1344 handle->flags |= UV_HANDLE_READ_PENDING;
1345 handle->reqs_pending++;
/* Starts reading on `handle`: records the callbacks, marks the handle
 * active, lazily creates the IOCP-emulation event, and queues the first
 * zero-byte read unless one is already pending. */
1349 int uv__pipe_read_start(uv_pipe_t* handle,
1350 uv_alloc_cb alloc_cb,
1351 uv_read_cb read_cb) {
1352 uv_loop_t* loop = handle->loop;
1354 handle->flags |= UV_HANDLE_READING;
1355 INCREASE_ACTIVE_COUNT(loop, handle);
1356 handle->read_cb = read_cb;
1357 handle->alloc_cb = alloc_cb;
1359 /* If reading was stopped and then started again, there could still be a read
1360 * request pending. */
1361 if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1362 if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1363 handle->read_req.event_handle == NULL) {
/* Auto-reset, initially unsignaled event for emulated completions. */
1364 handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1365 if (handle->read_req.event_handle == NULL) {
1366 uv_fatal_error(GetLastError(), "CreateEvent");
1369 uv__pipe_queue_read(loop, handle);
/* Append a write request to the handle's queue of non-overlapped writes.
 * The queue is a circular singly-linked list addressed by its tail:
 * tail->next_req is the head, and a single element points at itself. */
1376 static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
1378 req->next_req = NULL;
1379 if (handle->pipe.conn.non_overlapped_writes_tail) {
/* Non-empty: link the new req between the old tail and the head. */
1381 handle->pipe.conn.non_overlapped_writes_tail->next_req;
1382 handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1383 handle->pipe.conn.non_overlapped_writes_tail = req;
/* Empty: the sole element is both head and tail, pointing at itself. */
1385 req->next_req = (uv_req_t*)req;
1386 handle->pipe.conn.non_overlapped_writes_tail = req;
/* Pop the head of the circular non-overlapped write queue (the element
 * after the tail). Returns NULL when the queue is empty (the NULL-return
 * branch is not visible in this listing — presumably in the gap). */
1391 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1394 if (handle->pipe.conn.non_overlapped_writes_tail) {
1395 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1397 if (req == handle->pipe.conn.non_overlapped_writes_tail) {
/* Single-element list: removing the head empties the queue. */
1398 handle->pipe.conn.non_overlapped_writes_tail = NULL;
/* Unlink the head by pointing the tail at the next element. */
1400 handle->pipe.conn.non_overlapped_writes_tail->next_req =
/* Dequeue the next pending non-overlapped write and execute it on a
 * thread-pool worker (WriteFile on a synchronous pipe handle blocks, so
 * it must not run on the loop thread). Queue failure is fatal. */
1412 static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
1413 uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1415 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1417 WT_EXECUTELONGFUNCTION)) {
1418 uv_fatal_error(GetLastError(), "QueueUserWorkItem");
/* Coalesce a multi-buffer write into one heap allocation so a single
 * WriteFile call can transmit it. On success, *req_out points at the
 * replacement (heap-resident) uv_write_t and *write_buf_out at the packed
 * data; the caller owns the allocation, which is freed when the write
 * completes (see uv__process_pipe_write_req). Returns 0 on success or a
 * Windows/WSA error code. */
1424 static int uv__build_coalesced_write_req(uv_write_t* user_req,
1425 const uv_buf_t bufs[],
1427 uv_write_t** req_out,
1428 uv_buf_t* write_buf_out) {
1429 /* Pack into a single heap-allocated buffer:
1430 * (a) a uv_write_t structure where libuv stores the actual state.
1431 * (b) a pointer to the original uv_write_t.
1432 * (c) data from all `bufs` entries.
1435 size_t heap_buffer_length, heap_buffer_offset;
1436 uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
1437 char* data_start; /* (c) */
1441 /* Compute combined size of all combined buffers from `bufs`. */
1443 for (i = 0; i < nbufs; i++)
1444 data_length += bufs[i].len;
1446 /* The total combined size of data buffers should not exceed UINT32_MAX,
1447 * because WriteFile() won't accept buffers larger than that. */
1448 if (data_length > UINT32_MAX)
1449 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1451 /* Compute heap buffer size. */
1452 heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
1453 data_length; /* (c) */
1455 /* Allocate buffer. */
1456 heap_buffer = uv__malloc(heap_buffer_length);
1457 if (heap_buffer == NULL)
1458 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1460 /* Copy uv_write_t information to the buffer. */
1461 coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
1462 coalesced_write_req->req = *user_req; /* copy (a) */
1463 coalesced_write_req->req.coalesced = 1;
1464 coalesced_write_req->user_req = user_req; /* copy (b) */
1465 heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
1467 /* Copy data buffers to the heap buffer. */
1468 data_start = &heap_buffer[heap_buffer_offset];
1469 for (i = 0; i < nbufs; i++) {
1470 memcpy(&heap_buffer[heap_buffer_offset],
1472 bufs[i].len); /* copy (c) */
1473 heap_buffer_offset += bufs[i].len; /* offset (c) */
1475 assert(heap_buffer_offset == heap_buffer_length);
1477 /* Set out arguments and return. */
1478 *req_out = &coalesced_write_req->req;
1479 *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
/* Low-level pipe write. Chooses one of four strategies based on handle
 * flags:
 *   1. blocking + non-overlapped: synchronous WriteFile, completion posted
 *      immediately;
 *   2. non-overlapped: queue onto the non-overlapped write list, serviced
 *      one at a time on the thread pool;
 *   3. blocking (overlapped handle): overlapped WriteFile, then wait on
 *      the event before returning;
 *   4. default: plain overlapped WriteFile driven by the IOCP (or by a
 *      registered wait when emulating IOCP).
 * With `copy_always` set (or nbufs > 1), data is coalesced into a heap
 * buffer so the caller's buffers need not outlive the call. Returns 0 on
 * success or a Windows error code.
 * NOTE(review): many interior lines are missing from this listing. */
1484 static int uv__pipe_write_data(uv_loop_t* loop,
1487 const uv_buf_t bufs[],
1495 assert(handle->handle != INVALID_HANDLE_VALUE);
1497 UV_REQ_INIT(req, UV_WRITE);
1498 req->handle = (uv_stream_t*) handle;
1499 req->send_handle = NULL;
1501 /* Private fields. */
1503 req->event_handle = NULL;
1504 req->wait_handle = INVALID_HANDLE_VALUE;
1506 /* Prepare the overlapped structure. */
1507 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1508 if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
1509 req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1510 if (req->event_handle == NULL) {
1511 uv_fatal_error(GetLastError(), "CreateEvent");
/* Low bit set on hEvent suppresses the IOCP completion packet for this
 * OVERLAPPED (documented Windows behavior). */
1513 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1515 req->write_buffer = uv_null_buf_;
/* Select what to write: nothing, the single user buffer, or a coalesced
 * copy of all buffers. */
1518 /* Write empty buffer. */
1519 write_buf = uv_null_buf_;
1520 } else if (nbufs == 1 && !copy_always) {
1521 /* Write directly from bufs[0]. */
1522 write_buf = bufs[0];
1524 /* Coalesce all `bufs` into one big buffer. This also creates a new
1525 * write-request structure that replaces the old one. */
1526 err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
/* Strategy 1: blocking write on a non-overlapped pipe handle. */
1531 if ((handle->flags &
1532 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1533 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1536 WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
1539 err = GetLastError();
1542 /* Request completed immediately. */
1543 req->u.io.queued_bytes = 0;
1546 REGISTER_HANDLE_REQ(loop, handle, req);
1547 handle->reqs_pending++;
1548 handle->stream.conn.write_reqs_pending++;
1549 POST_COMPLETION_FOR_REQ(loop, req);
/* Strategy 2: non-blocking write on a non-overlapped pipe handle. */
1551 } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1552 req->write_buffer = write_buf;
1553 uv__insert_non_overlapped_write_req(handle, req);
/* Only kick the worker when no write is in flight; completion of each
 * write dispatches the next (see uv__process_pipe_write_req). */
1554 if (handle->stream.conn.write_reqs_pending == 0) {
1555 uv__queue_non_overlapped_write(handle);
1558 /* Request queued by the kernel. */
1559 req->u.io.queued_bytes = write_buf.len;
1560 handle->write_queue_size += req->u.io.queued_bytes;
/* Strategy 3: blocking write on an overlapped handle. */
1561 } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1562 /* Using overlapped IO, but wait for completion before returning */
1563 result = WriteFile(handle->handle,
1567 &req->u.io.overlapped);
1569 if (!result && GetLastError() != ERROR_IO_PENDING) {
1570 err = GetLastError();
1571 CloseHandle(req->event_handle);
1572 req->event_handle = NULL;
1577 /* Request completed immediately. */
1578 req->u.io.queued_bytes = 0;
1580 /* Request queued by the kernel. */
1581 req->u.io.queued_bytes = write_buf.len;
1582 handle->write_queue_size += req->u.io.queued_bytes;
1583 if (WaitForSingleObject(req->event_handle, INFINITE) !=
1585 err = GetLastError();
1586 CloseHandle(req->event_handle);
1587 req->event_handle = NULL;
1591 CloseHandle(req->event_handle);
1592 req->event_handle = NULL;
1594 REGISTER_HANDLE_REQ(loop, handle, req);
1595 handle->reqs_pending++;
1596 handle->stream.conn.write_reqs_pending++;
/* Strategy 4: standard overlapped write, completion via IOCP. */
1599 result = WriteFile(handle->handle,
1603 &req->u.io.overlapped);
1605 if (!result && GetLastError() != ERROR_IO_PENDING) {
1606 return GetLastError();
1610 /* Request completed immediately. */
1611 req->u.io.queued_bytes = 0;
1613 /* Request queued by the kernel. */
1614 req->u.io.queued_bytes = write_buf.len;
1615 handle->write_queue_size += req->u.io.queued_bytes;
1618 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1619 if (!RegisterWaitForSingleObject(&req->wait_handle,
1620 req->event_handle, post_completion_write_wait, (void*) req,
1621 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1622 return GetLastError();
1627 REGISTER_HANDLE_REQ(loop, handle, req);
1628 handle->reqs_pending++;
1629 handle->stream.conn.write_reqs_pending++;
/* Return the pid of the process at the remote end of the IPC pipe,
 * lazily defaulting to the current process when both ends live in the
 * same process and the field was never set. */
1635 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1636 DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1638 /* If the both ends of the IPC pipe are owned by the same process,
1639 * the remote end pid may not yet be set. If so, do it here.
1640 * TODO: this is weird; it'd probably better to use a handshake. */
1642 *pid = GetCurrentProcessId();
/* Write an IPC frame: a uv__ipc_frame_header_t, optionally followed by
 * socket-transfer info (when handing off a TCP handle) and/or user data.
 * Buffers are assembled into `bufs` (on the stack when small enough) and
 * written with copy_always=1, since the header and xfer info live on this
 * stack frame. Returns 0 or a Windows/WSA error code. */
1648 int uv__pipe_write_ipc(uv_loop_t* loop,
1651 const uv_buf_t data_bufs[],
1652 size_t data_buf_count,
1653 uv_stream_t* send_handle,
1655 uv_buf_t stack_bufs[6];
1657 size_t buf_count, buf_index;
1658 uv__ipc_frame_header_t frame_header;
1659 uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
1660 uv__ipc_socket_xfer_info_t xfer_info;
1661 uint64_t data_length;
1665 /* Compute the combined size of data buffers. */
1667 for (i = 0; i < data_buf_count; i++)
1668 data_length += data_bufs[i].len;
1669 if (data_length > UINT32_MAX)
1670 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1672 /* Prepare the frame's socket xfer payload. */
1673 if (send_handle != NULL) {
1674 uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
1676 /* Verify that `send_handle` it is indeed a tcp handle. */
1677 if (send_tcp_handle->type != UV_TCP)
1678 return ERROR_NOT_SUPPORTED;
1680 /* Export the tcp handle. */
1681 err = uv__tcp_xfer_export(send_tcp_handle,
1682 uv__pipe_get_ipc_remote_pid(handle),
1689 /* Compute the number of uv_buf_t's required. */
1690 buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
1691 if (send_handle != NULL)
1692 buf_count += 1; /* One extra for the socket xfer information. */
1694 /* Use the on-stack buffer array if it is big enough; otherwise allocate
1695 * space for it on the heap. */
1696 if (buf_count < ARRAY_SIZE(stack_bufs)) {
1697 /* Use on-stack buffer array. */
1700 /* Use heap-allocated buffer array. */
1701 bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
1703 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1707 /* Initialize frame header and add it to the buffers list. */
1708 memset(&frame_header, 0, sizeof frame_header);
1709 bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
1711 if (send_handle != NULL) {
1712 /* Add frame header flags. */
1713 switch (xfer_type) {
1714 case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
1715 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
1716 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
1718 case UV__IPC_SOCKET_XFER_TCP_SERVER:
1719 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
1722 assert(0); /* Unreachable. */
1724 /* Add xfer info buffer. */
1725 bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
1728 if (data_length > 0) {
1729 /* Update frame header. */
1730 frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
1731 frame_header.data_length = (uint32_t) data_length;
1732 /* Add data buffers to buffers list. */
1733 for (i = 0; i < data_buf_count; i++)
1734 bufs[buf_index++] = data_bufs[i];
1737 /* Write buffers. We set the `always_copy` flag, so it is not a problem that
1738 * some of the written data lives on the stack. */
1739 err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
1741 /* If we had to heap-allocate the bufs array, free it now. */
1742 if (bufs != stack_bufs) {
/* Public write entry point: dispatch to the IPC framing path for IPC
 * pipes, or write the raw bytes directly otherwise. Sending a handle is
 * only possible over an IPC pipe. */
1750 int uv__pipe_write(uv_loop_t* loop,
1753 const uv_buf_t bufs[],
1755 uv_stream_t* send_handle,
1758 /* IPC pipe write: use framing protocol. */
1759 return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1761 /* Non-IPC pipe write: put data on the wire directly. */
1762 assert(send_handle == NULL);
1763 return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
/* Deliver end-of-file to the user: tear down the eof timer, stop
 * reading, and invoke read_cb with UV_EOF. */
1768 static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1770 /* If there is an eof timer running, we don't need it any more, so discard
1772 eof_timer_destroy(handle);
1774 uv_read_stop((uv_stream_t*) handle);
1776 handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
/* Deliver a read error to the user: tear down the eof timer, stop
 * reading, and invoke read_cb with the translated libuv error code. */
1780 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1782 /* If there is an eof timer running, we don't need it any more, so discard
1784 eof_timer_destroy(handle);
1786 uv_read_stop((uv_stream_t*) handle);
1788 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
/* Route a failed read: ERROR_BROKEN_PIPE means the remote end closed and
 * is reported as EOF; anything else is a genuine error. */
1792 static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1793 int error, uv_buf_t buf) {
1794 if (error == ERROR_BROKEN_PIPE) {
1795 uv__pipe_read_eof(loop, handle, buf);
1797 uv__pipe_read_error(loop, handle, error, buf);
/* Store a received socket-transfer descriptor on the handle's IPC xfer
 * queue, to be consumed later by uv_accept/uv_pipe_pending_*. The queue
 * item copies xfer_info by value; allocation failure is fatal. */
1802 static void uv__pipe_queue_ipc_xfer_info(
1804 uv__ipc_socket_xfer_type_t xfer_type,
1805 uv__ipc_socket_xfer_info_t* xfer_info) {
1806 uv__ipc_xfer_queue_item_t* item;
1808 item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1810 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1812 item->xfer_type = xfer_type;
1813 item->xfer_info = *xfer_info;
1815 QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1816 handle->pipe.conn.ipc_xfer_queue_length++;
1820 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1821 * encountered before the requested number of bytes are read, an error is
/* Loops on ReadFile until `count` bytes have accumulated in `buffer`.
 * Returns 0 on success or the Windows error from the failing ReadFile. */
1823 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1824 DWORD bytes_read, bytes_read_now;
1827 while (bytes_read < count) {
1829 (char*) buffer + bytes_read,
1833 return GetLastError();
1836 bytes_read += bytes_read_now;
1839 assert(bytes_read == count);
/* Perform one user-visible data read: ask alloc_cb for a buffer, read at
 * most min(buffer length, max_bytes) bytes, and hand the result to
 * read_cb. Returns the number of bytes consumed, or 0 to signal the
 * caller's read loop to stop (error/EOF already reported). */
1844 static DWORD uv__pipe_read_data(uv_loop_t* loop,
1846 DWORD suggested_bytes,
1851 /* Ask the user for a buffer to read data into. */
1852 buf = uv_buf_init(NULL, 0);
1853 handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
1854 if (buf.base == NULL || buf.len == 0) {
1855 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1856 return 0; /* Break out of read loop. */
1859 /* Ensure we read at most the smaller of:
1860 * (a) the length of the user-allocated buffer.
1861 * (b) the maximum data length as specified by the `max_bytes` argument.
1863 if (max_bytes > buf.len)
1864 max_bytes = buf.len;
1866 /* Read into the user buffer. */
1867 if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
1868 uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1869 return 0; /* Break out of read loop. */
1872 /* Call the read callback. */
1873 handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
/* Read from an IPC-mode pipe. Either continues draining the data payload
 * of the current frame (payload_remaining > 0), or parses the next frame:
 * validate the header, optionally read and queue transferred-socket info,
 * and record how much payload data follows. Returns the number of bytes
 * consumed from the pipe, or 0 to break the caller's read loop after an
 * error has been reported. */
1879 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1880 uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1883 if (*data_remaining > 0) {
1884 /* Read frame data payload. */
1886 uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1887 *data_remaining -= bytes_read;
1891 /* Start of a new IPC frame. */
1892 uv__ipc_frame_header_t frame_header;
1893 uint32_t xfer_flags;
1894 uv__ipc_socket_xfer_type_t xfer_type;
1895 uv__ipc_socket_xfer_info_t xfer_info;
1897 /* Read the IPC frame header. */
1898 err = uv__pipe_read_exactly(
1899 handle->handle, &frame_header, sizeof frame_header);
1903 /* Validate that flags are valid. */
1904 if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
1906 /* Validate that reserved2 is zero. */
1907 if (frame_header.reserved2 != 0)
1910 /* Parse xfer flags. */
1911 xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
1912 if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
1913 /* Socket coming -- determine the type. */
1914 xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
1915 ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
1916 : UV__IPC_SOCKET_XFER_TCP_SERVER;
1917 } else if (xfer_flags == 0) {
1919 xfer_type = UV__IPC_SOCKET_XFER_NONE;
1921 /* Invalid flags. */
1925 /* Parse data frame information. */
1926 if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
1927 *data_remaining = frame_header.data_length;
1928 } else if (frame_header.data_length != 0) {
1929 /* Data length greater than zero but data flag not set -- invalid. */
1933 /* If no socket xfer info follows, return here. Data will be read in a
1934 * subsequent invocation of uv__pipe_read_ipc(). */
1935 if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
1936 return sizeof frame_header; /* Number of bytes read. */
1938 /* Read transferred socket information. */
1939 err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
1943 /* Store the pending socket info. */
1944 uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
1946 /* Return number of bytes read. */
1947 return sizeof frame_header + sizeof xfer_info;
/* Shared error exit (label not visible): malformed frames abort the
 * connection. */
1951 /* Invalid frame. */
1952 err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
1955 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1956 return 0; /* Break out of read loop. */
/* IOCP completion handler for the zero-read request. A successful
 * zero-read means data is waiting in the kernel buffer: peek how much,
 * drain it via the IPC or raw read path, then re-arm the next zero-read
 * if the user is still reading. */
1960 void uv__process_pipe_read_req(uv_loop_t* loop,
1963 assert(handle->type == UV_NAMED_PIPE);
1965 handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
1966 DECREASE_PENDING_REQ_COUNT(handle);
1967 eof_timer_stop(handle);
1969 /* At this point, we're done with bookkeeping. If the user has stopped
1970 * reading the pipe in the meantime, there is nothing left to do, since there
1971 * is no callback that we can call. */
1972 if (!(handle->flags & UV_HANDLE_READING))
1975 if (!REQ_SUCCESS(req)) {
1976 /* An error occurred doing the zero-read. */
1977 DWORD err = GET_REQ_ERROR(req);
1979 /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
1980 * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
1981 * the user; we'll start a new zero-read at the end of this function. */
1982 if (err != ERROR_OPERATION_ABORTED)
1983 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1986 /* The zero-read completed without error, indicating there is data
1987 * available in the kernel buffer. */
1990 /* Get the number of bytes available. */
1992 if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
1993 uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1995 /* Read until we've either read all the bytes available, or the 'reading'
1996 * flag is cleared. */
1997 while (avail > 0 && handle->flags & UV_HANDLE_READING) {
1998 /* Depending on the type of pipe, read either IPC frames or raw data. */
2000 handle->ipc ? uv__pipe_read_ipc(loop, handle)
2001 : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
2003 /* If no bytes were read, treat this as an indication that an error
2004 * occurred, and break out of the read loop. */
2005 if (bytes_read == 0)
2008 /* It is possible that more bytes were read than we thought were
2009 * available. To prevent `avail` from underflowing, break out of the loop
2010 * if this is the case. */
2011 if (bytes_read > avail)
2014 /* Recompute the number of bytes available. */
2015 avail -= bytes_read;
2019 /* Start another zero-read request if necessary. */
2020 if ((handle->flags & UV_HANDLE_READING) &&
2021 !(handle->flags & UV_HANDLE_READ_PENDING)) {
2022 uv__pipe_queue_read(loop, handle);
/* IOCP completion handler for a write request: release emulation
 * resources, unwrap coalesced writes back to the user's uv_write_t,
 * invoke the user callback, dispatch the next queued non-overlapped
 * write, and run a deferred shutdown once all writes have drained. */
2027 void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
2031 assert(handle->type == UV_NAMED_PIPE);
2033 assert(handle->write_queue_size >= req->u.io.queued_bytes);
2034 handle->write_queue_size -= req->u.io.queued_bytes;
2036 UNREGISTER_HANDLE_REQ(loop, handle, req);
2038 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
2039 if (req->wait_handle != INVALID_HANDLE_VALUE) {
2040 UnregisterWait(req->wait_handle);
2041 req->wait_handle = INVALID_HANDLE_VALUE;
2043 if (req->event_handle) {
2044 CloseHandle(req->event_handle);
2045 req->event_handle = NULL;
2049 err = GET_REQ_ERROR(req);
2051 /* If this was a coalesced write, extract pointer to the user_provided
2052 * uv_write_t structure so we can pass the expected pointer to the callback,
2053 * then free the heap-allocated write req. */
2054 if (req->coalesced) {
2055 uv__coalesced_write_t* coalesced_write =
2056 container_of(req, uv__coalesced_write_t, req);
2057 req = coalesced_write->user_req;
2058 uv__free(coalesced_write);
2061 req->cb(req, uv_translate_sys_error(err));
2064 handle->stream.conn.write_reqs_pending--;
/* Writes on non-overlapped pipes are serialized: completion of one
 * dispatches the next from the queue. */
2066 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
2067 handle->pipe.conn.non_overlapped_writes_tail) {
2068 assert(handle->stream.conn.write_reqs_pending > 0);
2069 uv__queue_non_overlapped_write(handle);
2072 if (handle->stream.conn.write_reqs_pending == 0)
2073 if (handle->flags & UV_HANDLE_SHUTTING)
2074 uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
2076 DECREASE_PENDING_REQ_COUNT(handle);
/* IOCP completion handler for a server-side accept request: on success,
 * push the request onto the pending-accepts list and notify the user's
 * connection callback; on failure, close the pipe instance and re-queue
 * the accept (unless the handle is closing). */
2080 void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
2081 uv_req_t* raw_req) {
2082 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
2084 assert(handle->type == UV_NAMED_PIPE);
2086 if (handle->flags & UV_HANDLE_CLOSING) {
2087 /* The req->pipeHandle should be freed already in uv__pipe_close(). */
2088 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
2089 DECREASE_PENDING_REQ_COUNT(handle);
2093 if (REQ_SUCCESS(req)) {
2094 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
2095 req->next_pending = handle->pipe.serv.pending_accepts;
2096 handle->pipe.serv.pending_accepts = req;
2098 if (handle->stream.serv.connection_cb) {
2099 handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
/* Failure path: discard this pipe instance and try accepting again. */
2102 if (req->pipeHandle != INVALID_HANDLE_VALUE) {
2103 CloseHandle(req->pipeHandle);
2104 req->pipeHandle = INVALID_HANDLE_VALUE;
2106 if (!(handle->flags & UV_HANDLE_CLOSING)) {
2107 uv__pipe_queue_accept(loop, handle, req, FALSE);
2111 DECREASE_PENDING_REQ_COUNT(handle);
/* IOCP completion handler for a client connect request: on success, bind
 * the connected OS handle to the uv_pipe_t (closing it if binding fails);
 * on failure, translate the stored error. The user callback invocation is
 * in lines not visible here. */
2115 void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
2116 uv_connect_t* req) {
2121 assert(handle->type == UV_NAMED_PIPE);
2123 UNREGISTER_HANDLE_REQ(loop, handle, req);
2126 if (REQ_SUCCESS(req)) {
2127 pipeHandle = req->u.connect.pipeHandle;
2128 duplex_flags = req->u.connect.duplex_flags;
2129 err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
2131 CloseHandle(pipeHandle);
2133 err = uv_translate_sys_error(GET_REQ_ERROR(req));
2139 DECREASE_PENDING_REQ_COUNT(handle);
/* IOCP completion handler for a shutdown request. If the pipe is still
 * readable, keep it open and arm the eof timer so we wait (bounded) for
 * the remote end's data/EOF; otherwise the write end can simply be
 * closed. The shutdown callback invocation is in lines not visible. */
2144 void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
2145 uv_shutdown_t* req) {
2148 assert(handle->type == UV_NAMED_PIPE);
2150 /* Clear the shutdown_req field so we don't go here again. */
2151 handle->stream.conn.shutdown_req = NULL;
2152 handle->flags &= ~UV_HANDLE_SHUTTING;
2153 UNREGISTER_HANDLE_REQ(loop, handle, req);
2155 if (handle->flags & UV_HANDLE_CLOSING) {
2156 /* Already closing. Cancel the shutdown. */
2158 } else if (!REQ_SUCCESS(req)) {
2159 /* An error occurred in trying to shutdown gracefully. */
2160 err = uv_translate_sys_error(GET_REQ_ERROR(req));
2162 if (handle->flags & UV_HANDLE_READABLE) {
2163 /* Initialize and optionally start the eof timer. Only do this if the pipe
2164 * is readable and we haven't seen EOF come in ourselves. */
2165 eof_timer_init(handle);
2167 /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
2169 if (handle->flags & UV_HANDLE_READ_PENDING) {
2170 eof_timer_start(handle);
2174 /* This pipe is not readable. We can just close it to let the other end
2175 * know that we're done writing. */
2184 DECREASE_PENDING_REQ_COUNT(handle);
/* Allocate and initialize the per-connection EOF timer (used during
 * shutdown to bound the wait for remote EOF; see `eof_timeout`). The
 * timer is unref'd so it never keeps the loop alive on its own. */
2188 static void eof_timer_init(uv_pipe_t* pipe) {
2191 assert(pipe->pipe.conn.eof_timer == NULL);
2192 assert(pipe->flags & UV_HANDLE_CONNECTION);
2194 pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2196 r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2197 assert(r == 0); /* timers can't fail */
2199 pipe->pipe.conn.eof_timer->data = pipe;
2200 uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
/* (Re)start the EOF timer, if one was created, with the fixed
 * `eof_timeout` delay and no repeat. No-op when no timer exists. */
2204 static void eof_timer_start(uv_pipe_t* pipe) {
2205 assert(pipe->flags & UV_HANDLE_CONNECTION);
2207 if (pipe->pipe.conn.eof_timer != NULL) {
2208 uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
/* Stop the EOF timer without destroying it; no-op when none exists. */
2213 static void eof_timer_stop(uv_pipe_t* pipe) {
2214 assert(pipe->flags & UV_HANDLE_CONNECTION);
2216 if (pipe->pipe.conn.eof_timer != NULL) {
2217 uv_timer_stop(pipe->pipe.conn.eof_timer);
/* EOF timer expiry: the remote end did not send data/EOF within
 * `eof_timeout` after our shutdown, so stop waiting, tear the pipe down,
 * and report EOF to the user. */
2222 static void eof_timer_cb(uv_timer_t* timer) {
2223 uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2224 uv_loop_t* loop = timer->loop;
2226 assert(pipe->type == UV_NAMED_PIPE);
2228 /* This should always be true, since we start the timer only in
2229 * uv__pipe_queue_read after successfully calling ReadFile, or in
2230 * uv__process_pipe_shutdown_req if a read is pending, and we always
2231 * immediately stop the timer in uv__process_pipe_read_req. */
2232 assert(pipe->flags & UV_HANDLE_READ_PENDING);
2234 /* If there are many packets coming off the iocp then the timer callback may
2235 * be called before the read request is coming off the queue. Therefore we
2236 * check here if the read request has completed but will be processed later.
2238 if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2239 HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2243 /* Force both ends off the pipe. */
2246 /* Stop reading, so the pending read that is going to fail will not be
2247 * reported to the user. */
2248 uv_read_stop((uv_stream_t*) pipe);
2250 /* Report the eof and update flags. This will get reported even if the user
2251 * stopped reading in the meantime. TODO: is that okay? */
2252 uv__pipe_read_eof(loop, pipe, uv_null_buf_);
/* Close and disown the EOF timer; the allocation is freed later in
 * eof_timer_close_cb. No-op when no timer exists. */
2256 static void eof_timer_destroy(uv_pipe_t* pipe) {
2257 assert(pipe->flags & UV_HANDLE_CONNECTION);
2259 if (pipe->pipe.conn.eof_timer) {
2260 uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2261 pipe->pipe.conn.eof_timer = NULL;
/* uv_close callback for the EOF timer; frees the heap-allocated timer
 * (the uv__free call is in a line not visible here). */
2266 static void eof_timer_close_cb(uv_handle_t* handle) {
2267 assert(handle->type == UV_TIMER);
/* Associate an existing file descriptor (e.g. an inherited stdio fd) with
 * a uv_pipe_t. The OS handle is duplicated so the original fd can be
 * forgotten, access rights are queried via NtQueryInformationFile to set
 * readable/writable flags, and the duplicate is bound to the handle.
 * Returns 0 or a libuv error code. */
2272 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
2273 HANDLE os_handle = uv__get_osfhandle(file);
2275 IO_STATUS_BLOCK io_status;
2276 FILE_ACCESS_INFORMATION access;
2277 DWORD duplex_flags = 0;
2280 if (os_handle == INVALID_HANDLE_VALUE)
/* A pipe server or an already-connected pipe cannot be re-opened. */
2282 if (pipe->flags & UV_HANDLE_PIPESERVER)
2284 if (pipe->flags & UV_HANDLE_CONNECTION)
2287 uv__pipe_connection_init(pipe);
2289 /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
2290 * underlying OS handle and forget about the original fd.
2291 * We could also opt to use the original OS handle and just never close it,
2292 * but then there would be no reliable way to cancel pending read operations
2296 if (!DuplicateHandle(INVALID_HANDLE_VALUE,
2298 INVALID_HANDLE_VALUE,
2302 DUPLICATE_SAME_ACCESS))
2303 return uv_translate_sys_error(GetLastError());
2304 assert(os_handle != INVALID_HANDLE_VALUE);
2308 /* Determine what kind of permissions we have on this handle.
2309 * Cygwin opens the pipe in message mode, but we can support it,
2310 * just query the access flags and set the stream flags accordingly.
2312 nt_status = pNtQueryInformationFile(os_handle,
2316 FileAccessInformation);
2317 if (nt_status != STATUS_SUCCESS)
2321 if (!(access.AccessFlags & FILE_WRITE_DATA) ||
2322 !(access.AccessFlags & FILE_READ_DATA)) {
2327 if (access.AccessFlags & FILE_WRITE_DATA)
2328 duplex_flags |= UV_HANDLE_WRITABLE;
2329 if (access.AccessFlags & FILE_READ_DATA)
2330 duplex_flags |= UV_HANDLE_READABLE;
2332 err = uv__set_pipe_handle(pipe->loop,
/* Binding failed: release the duplicated handle before reporting. */
2339 CloseHandle(os_handle);
/* IPC setup (condition line not visible): the remote end of an inherited
 * IPC pipe is presumably the parent process — TODO confirm. */
2344 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
2345 pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
2346 assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
/* Produce the UTF-8 name of the pipe into `buffer` (capacity in *size;
 * on success *size is updated to the name length, on ENOBUFS to the
 * required capacity). Uses the cached wide-char handle->name when
 * available; otherwise queries the kernel (NtQueryInformationFile with
 * FileNameInformation) and prepends the "\\\\?\\pipe" prefix.
 * NOTE(review): many interior lines are missing from this listing. */
2352 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2354 IO_STATUS_BLOCK io_status;
2355 FILE_NAME_INFORMATION tmp_name_info;
2356 FILE_NAME_INFORMATION* name_info;
2358 unsigned int addrlen;
2359 unsigned int name_size;
2360 unsigned int name_len;
2366 if (handle->name != NULL) {
2367 /* The user might try to query the name before we are connected,
2368 * and this is just easier to return the cached value if we have it. */
2369 name_buf = handle->name;
2370 name_len = wcslen(name_buf);
2372 /* check how much space we need */
2373 addrlen = WideCharToMultiByte(CP_UTF8,
2383 err = uv_translate_sys_error(GetLastError());
2385 } else if (addrlen >= *size) {
2386 *size = addrlen + 1;
/* Second conversion performs the actual UTF-16 -> UTF-8 copy. */
2391 addrlen = WideCharToMultiByte(CP_UTF8,
2401 err = uv_translate_sys_error(GetLastError());
2406 buffer[addrlen] = '\0';
2411 if (handle->handle == INVALID_HANDLE_VALUE) {
2416 /* NtQueryInformationFile will block if another thread is performing a
2417 * blocking operation on the queried handle. If the pipe handle is
2418 * synchronous, there may be a worker thread currently calling ReadFile() on
2419 * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2421 if (handle->flags & UV_HANDLE_CONNECTION &&
2422 handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2423 uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
/* First query with an on-stack struct; on overflow, retry with a heap
 * buffer sized from the reported FileNameLength. */
2426 nt_status = pNtQueryInformationFile(handle->handle,
2429 sizeof tmp_name_info,
2430 FileNameInformation);
2431 if (nt_status == STATUS_BUFFER_OVERFLOW) {
2432 name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2433 name_info = uv__malloc(name_size);
2440 nt_status = pNtQueryInformationFile(handle->handle,
2444 FileNameInformation);
2447 if (nt_status != STATUS_SUCCESS) {
2449 err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2454 /* the struct on stack was used */
2455 name_buf = tmp_name_info.FileName;
2456 name_len = tmp_name_info.FileNameLength;
2458 name_buf = name_info->FileName;
2459 name_len = name_info->FileNameLength;
2462 if (name_len == 0) {
/* FileNameLength is in bytes; convert to WCHAR count. */
2468 name_len /= sizeof(WCHAR);
2470 /* check how much space we need */
2471 addrlen = WideCharToMultiByte(CP_UTF8,
2481 err = uv_translate_sys_error(GetLastError());
2483 } else if (pipe_prefix_len + addrlen >= *size) {
2484 /* "\\\\.\\pipe" + name */
2485 *size = pipe_prefix_len + addrlen + 1;
2490 memcpy(buffer, pipe_prefix, pipe_prefix_len);
2491 addrlen = WideCharToMultiByte(CP_UTF8,
2495 buffer+pipe_prefix_len,
2496 *size-pipe_prefix_len,
2501 err = uv_translate_sys_error(GetLastError());
2505 addrlen += pipe_prefix_len;
2507 buffer[addrlen] = '\0';
/* Cleanup path (label not visible): release the heap name buffer. */
2512 uv__free(name_info);
/* Number of handles pending acceptance on an IPC pipe (length of the
 * xfer queue); the non-IPC branch is in lines not visible here. */
2519 int uv_pipe_pending_count(uv_pipe_t* handle) {
2522 return handle->pipe.conn.ipc_xfer_queue_length;
/* Name of the local end: only meaningful for a bound (server) pipe;
 * other states fall through to error returns in lines not visible. */
2526 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2527 if (handle->flags & UV_HANDLE_BOUND)
2528 return uv__pipe_getname(handle, buffer, size);
2530 if (handle->flags & UV_HANDLE_CONNECTION ||
2531 handle->handle != INVALID_HANDLE_VALUE) {
/* Name of the remote end. Mirrors Unix semantics: a bound (server)
 * handle has no peer; a connected handle reports the pipe name. */
2540 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2541 /* emulate unix behaviour */
2542 if (handle->flags & UV_HANDLE_BOUND)
2545 if (handle->handle != INVALID_HANDLE_VALUE)
2546 return uv__pipe_getname(handle, buffer, size);
2548 if (handle->flags & UV_HANDLE_CONNECTION) {
2549 if (handle->name != NULL)
2550 return uv__pipe_getname(handle, buffer, size);
/* Type of the next pending handle on an IPC pipe; UV_UNKNOWN_HANDLE when
 * nothing is queued (the positive-case return is in lines not visible). */
2557 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2559 return UV_UNKNOWN_HANDLE;
2560 if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2561 return UV_UNKNOWN_HANDLE;
/* Grant the "Everyone" (world) SID read and/or write access to the pipe:
 * fetch the current DACL, merge in an EXPLICIT_ACCESS entry for the
 * requested mode, and write the updated DACL back. `mode` must be
 * UV_READABLE, UV_WRITABLE, or both. Allocated SID/ACL/SD resources are
 * released on the cleanup path at the end. */
2566 int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2567 SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
2568 PACL old_dacl, new_dacl;
2569 PSECURITY_DESCRIPTOR sd;
2574 if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2577 if (mode != UV_READABLE &&
2578 mode != UV_WRITABLE &&
2579 mode != (UV_WRITABLE | UV_READABLE))
/* Build the well-known "Everyone" SID (S-1-1-0). */
2582 if (!AllocateAndInitializeSid(&sid_world,
2585 0, 0, 0, 0, 0, 0, 0,
2587 error = GetLastError();
2591 if (GetSecurityInfo(handle->handle,
2593 DACL_SECURITY_INFORMATION,
2599 error = GetLastError();
/* Note the crossed pairing: READABLE grants GENERIC_READ plus
 * FILE_WRITE_ATTRIBUTES, WRITABLE grants GENERIC_WRITE plus
 * FILE_READ_ATTRIBUTES. Presumably intentional so either mode can stat
 * the pipe — TODO confirm. */
2603 memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2604 if (mode & UV_READABLE)
2605 ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2606 if (mode & UV_WRITABLE)
2607 ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2608 ea.grfAccessPermissions |= SYNCHRONIZE;
2609 ea.grfAccessMode = SET_ACCESS;
2610 ea.grfInheritance = NO_INHERITANCE;
2611 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2612 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2613 ea.Trustee.ptstrName = (LPTSTR)everyone;
2615 if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2616 error = GetLastError();
2620 if (SetSecurityInfo(handle->handle,
2622 DACL_SECURITY_INFORMATION,
2627 error = GetLastError();
/* Cleanup path (labels not visible): release ACL and security
 * descriptor allocated by the Win32 security APIs. */
2634 LocalFree((HLOCAL) new_dacl);
2636 LocalFree((HLOCAL) sd);
2640 return uv_translate_sys_error(error);