1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
28 #include "../uv-common.h"
32 /* A zero-size buffer for use by uv_pipe_read */
33 static char uv_zero_[] = "";
36 static const uv_buf_t uv_null_buf_ = { 0, NULL };
38 /* The timeout that the pipe will wait for the remote end to write data */
39 /* when the local ends wants to shut it down. */
40 static const int64_t eof_timeout = 50; /* ms */
42 static const int default_pending_pipe_instances = 4;
44 /* IPC protocol flags. */
45 #define UV_IPC_RAW_DATA 0x0001
46 #define UV_IPC_UV_STREAM 0x0002
48 /* IPC frame header. */
51 uint64_t raw_data_length;
52 } uv_ipc_frame_header_t;
54 /* IPC frame, which contains an imported TCP socket stream. */
56 uv_ipc_frame_header_t header;
57 WSAPROTOCOL_INFOW socket_info;
58 } uv_ipc_frame_uv_stream;
60 static void eof_timer_init(uv_pipe_t* pipe);
61 static void eof_timer_start(uv_pipe_t* pipe);
62 static void eof_timer_stop(uv_pipe_t* pipe);
63 static void eof_timer_cb(uv_timer_t* timer, int status);
64 static void eof_timer_destroy(uv_pipe_t* pipe);
65 static void eof_timer_close_cb(uv_handle_t* handle);
68 static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
69 _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", ptr, GetCurrentProcessId());
73 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
74 uv_stream_init(loop, (uv_stream_t*)handle);
76 handle->type = UV_NAMED_PIPE;
77 handle->reqs_pending = 0;
78 handle->handle = INVALID_HANDLE_VALUE;
81 handle->remaining_ipc_rawdata_bytes = 0;
82 handle->pending_socket_info = NULL;
84 handle->non_overlapped_writes_tail = NULL;
86 uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
88 loop->counters.pipe_init++;
94 static void uv_pipe_connection_init(uv_pipe_t* handle) {
95 uv_connection_init((uv_stream_t*) handle);
96 handle->read_req.data = handle;
97 handle->eof_timer = NULL;
101 int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
102 char* name, size_t nameSize) {
106 char* ptr = (char*)handle;
109 uv_unique_pipe_name(ptr, name, nameSize);
111 pipeHandle = CreateNamedPipeA(name,
112 access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
113 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
116 if (pipeHandle != INVALID_HANDLE_VALUE) {
117 /* No name collisions. We're done. */
121 errno = GetLastError();
122 if (errno != ERROR_PIPE_BUSY && errno != ERROR_ACCESS_DENIED) {
123 uv__set_sys_error(loop, errno);
128 /* Pipe name collision. Increment the pointer and try again. */
132 if (CreateIoCompletionPort(pipeHandle,
136 uv__set_sys_error(loop, GetLastError());
141 uv_pipe_connection_init(handle);
142 handle->handle = pipeHandle;
146 if (err && pipeHandle != INVALID_HANDLE_VALUE) {
147 CloseHandle(pipeHandle);
154 static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
157 IO_STATUS_BLOCK io_status;
158 FILE_MODE_INFORMATION mode_info;
159 DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
161 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
165 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
166 nt_status = pNtQueryInformationFile(pipeHandle,
170 FileModeInformation);
171 if (nt_status != STATUS_SUCCESS) {
175 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
176 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
177 /* Non-overlapped pipe. */
178 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
180 /* Overlapped pipe. Try to associate with IOCP. */
181 if (CreateIoCompletionPort(pipeHandle,
185 handle->flags |= UV_HANDLE_EMULATE_IOCP;
193 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
199 req = (uv_shutdown_t*) parameter;
201 handle = (uv_pipe_t*) req->handle;
206 FlushFileBuffers(handle->handle);
209 POST_COMPLETION_FOR_REQ(loop, req);
215 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
216 unsigned int uv_alloced;
220 IO_STATUS_BLOCK io_status;
221 FILE_PIPE_LOCAL_INFORMATION pipe_info;
223 if (handle->flags & UV_HANDLE_SHUTTING &&
224 !(handle->flags & UV_HANDLE_SHUT) &&
225 handle->write_reqs_pending == 0) {
226 req = handle->shutdown_req;
228 /* Try to avoid flushing the pipe buffer in the thread pool. */
229 nt_status = pNtQueryInformationFile(handle->handle,
233 FilePipeLocalInformation);
235 if (nt_status != STATUS_SUCCESS) {
237 handle->flags &= ~UV_HANDLE_SHUTTING;
239 uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status));
242 DECREASE_PENDING_REQ_COUNT(handle);
246 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
247 handle->flags |= UV_HANDLE_SHUT;
249 /* Short-circuit, no need to call FlushFileBuffers. */
250 uv_insert_pending_req(loop, (uv_req_t*) req);
254 /* Run FlushFileBuffers in the thread pool. */
255 result = QueueUserWorkItem(pipe_shutdown_thread_proc,
257 WT_EXECUTELONGFUNCTION);
259 /* Mark the handle as shut now to avoid going through this again. */
260 handle->flags |= UV_HANDLE_SHUT;
265 handle->flags &= ~UV_HANDLE_SHUTTING;
267 uv__set_sys_error(loop, GetLastError());
270 DECREASE_PENDING_REQ_COUNT(handle);
275 if (handle->flags & UV_HANDLE_CLOSING &&
276 handle->reqs_pending == 0) {
277 assert(!(handle->flags & UV_HANDLE_CLOSED));
278 handle->flags |= UV_HANDLE_CLOSED;
280 if (handle->flags & UV_HANDLE_CONNECTION) {
281 if (handle->pending_socket_info) {
282 free(handle->pending_socket_info);
283 handle->pending_socket_info = NULL;
286 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
287 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
288 UnregisterWait(handle->read_req.wait_handle);
289 handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
291 if (handle->read_req.event_handle) {
292 CloseHandle(handle->read_req.event_handle);
293 handle->read_req.event_handle = NULL;
298 if (handle->flags & UV_HANDLE_PIPESERVER) {
299 assert(handle->accept_reqs);
300 free(handle->accept_reqs);
301 handle->accept_reqs = NULL;
304 /* Remember the state of this flag because the close callback is */
305 /* allowed to clobber or free the handle's memory */
306 uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED;
308 if (handle->close_cb) {
309 handle->close_cb((uv_handle_t*)handle);
321 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
322 handle->pending_instances = count;
323 handle->flags |= UV_HANDLE_PIPESERVER;
327 /* Creates a pipe server. */
328 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
329 uv_loop_t* loop = handle->loop;
330 int i, errno, nameSize;
331 uv_pipe_accept_t* req;
333 if (handle->flags & UV_HANDLE_BOUND) {
334 uv__set_sys_error(loop, WSAEINVAL);
339 uv__set_sys_error(loop, WSAEINVAL);
343 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
344 handle->pending_instances = default_pending_pipe_instances;
347 handle->accept_reqs = (uv_pipe_accept_t*)
348 malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
349 if (!handle->accept_reqs) {
350 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
353 for (i = 0; i < handle->pending_instances; i++) {
354 req = &handle->accept_reqs[i];
355 uv_req_init(loop, (uv_req_t*) req);
356 req->type = UV_ACCEPT;
358 req->pipeHandle = INVALID_HANDLE_VALUE;
359 req->next_pending = NULL;
362 /* Convert name to UTF16. */
363 nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
364 handle->name = (wchar_t*)malloc(nameSize);
366 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
369 if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
370 uv__set_sys_error(loop, GetLastError());
375 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
376 * If this fails then there's already a pipe server for the given pipe name.
378 handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
379 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
380 FILE_FLAG_FIRST_PIPE_INSTANCE,
381 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
382 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
384 if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
385 errno = GetLastError();
386 if (errno == ERROR_ACCESS_DENIED) {
387 uv__set_error(loop, UV_EADDRINUSE, errno);
388 } else if (errno == ERROR_PATH_NOT_FOUND || errno == ERROR_INVALID_NAME) {
389 uv__set_error(loop, UV_EACCES, errno);
391 uv__set_sys_error(loop, errno);
396 if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle)) {
397 uv__set_sys_error(loop, GetLastError());
401 handle->pending_accepts = NULL;
402 handle->flags |= UV_HANDLE_PIPESERVER;
403 handle->flags |= UV_HANDLE_BOUND;
413 if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
414 CloseHandle(handle->accept_reqs[0].pipeHandle);
415 handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
422 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
423 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
429 req = (uv_connect_t*) parameter;
431 handle = (uv_pipe_t*) req->handle;
436 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
437 /* We wait for the pipe to become available with WaitNamedPipe. */
438 while (WaitNamedPipeW(handle->name, 30000)) {
439 /* The pipe is now available, try to connect. */
440 pipeHandle = CreateFileW(handle->name,
441 GENERIC_READ | GENERIC_WRITE,
445 FILE_FLAG_OVERLAPPED,
448 if (pipeHandle != INVALID_HANDLE_VALUE) {
455 if (pipeHandle != INVALID_HANDLE_VALUE &&
456 !uv_set_pipe_handle(loop, handle, pipeHandle)) {
457 handle->handle = pipeHandle;
458 SET_REQ_SUCCESS(req);
460 SET_REQ_ERROR(req, GetLastError());
464 POST_COMPLETION_FOR_REQ(loop, req);
470 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
471 const char* name, uv_connect_cb cb) {
472 uv_loop_t* loop = handle->loop;
476 handle->handle = INVALID_HANDLE_VALUE;
478 uv_req_init(loop, (uv_req_t*) req);
479 req->type = UV_CONNECT;
480 req->handle = (uv_stream_t*) handle;
483 /* Convert name to UTF16. */
484 nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
485 handle->name = (wchar_t*)malloc(nameSize);
487 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
490 if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
491 errno = GetLastError();
495 pipeHandle = CreateFileW(handle->name,
496 GENERIC_READ | GENERIC_WRITE,
500 FILE_FLAG_OVERLAPPED,
503 if (pipeHandle == INVALID_HANDLE_VALUE) {
504 if (GetLastError() == ERROR_PIPE_BUSY) {
505 /* Wait for the server to make a pipe instance available. */
506 if (!QueueUserWorkItem(&pipe_connect_thread_proc,
508 WT_EXECUTELONGFUNCTION)) {
509 errno = GetLastError();
513 handle->reqs_pending++;
518 errno = GetLastError();
522 if (uv_set_pipe_handle(loop, (uv_pipe_t*)req->handle, pipeHandle)) {
523 errno = GetLastError();
527 handle->handle = pipeHandle;
529 SET_REQ_SUCCESS(req);
530 uv_insert_pending_req(loop, (uv_req_t*) req);
531 handle->reqs_pending++;
540 if (pipeHandle != INVALID_HANDLE_VALUE) {
541 CloseHandle(pipeHandle);
544 /* Make this req pending reporting an error. */
545 SET_REQ_ERROR(req, errno);
546 uv_insert_pending_req(loop, (uv_req_t*) req);
547 handle->reqs_pending++;
552 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
554 void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
563 if (handle->flags & UV_HANDLE_PIPESERVER) {
564 for (i = 0; i < handle->pending_instances; i++) {
565 pipeHandle = handle->accept_reqs[i].pipeHandle;
566 if (pipeHandle != INVALID_HANDLE_VALUE) {
567 CloseHandle(pipeHandle);
568 handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
573 if (handle->flags & UV_HANDLE_CONNECTION) {
574 eof_timer_destroy(handle);
577 if ((handle->flags & UV_HANDLE_CONNECTION)
578 && handle->handle != INVALID_HANDLE_VALUE) {
579 CloseHandle(handle->handle);
580 handle->handle = INVALID_HANDLE_VALUE;
583 handle->flags |= UV_HANDLE_SHUT;
587 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
588 uv_pipe_accept_t* req, BOOL firstInstance) {
589 assert(handle->flags & UV_HANDLE_LISTENING);
591 if (!firstInstance) {
592 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
594 req->pipeHandle = CreateNamedPipeW(handle->name,
595 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
596 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
597 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
599 if (req->pipeHandle == INVALID_HANDLE_VALUE) {
600 SET_REQ_ERROR(req, GetLastError());
601 uv_insert_pending_req(loop, (uv_req_t*) req);
602 handle->reqs_pending++;
606 if (uv_set_pipe_handle(loop, handle, req->pipeHandle)) {
607 CloseHandle(req->pipeHandle);
608 req->pipeHandle = INVALID_HANDLE_VALUE;
609 SET_REQ_ERROR(req, GetLastError());
610 uv_insert_pending_req(loop, (uv_req_t*) req);
611 handle->reqs_pending++;
616 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
618 /* Prepare the overlapped structure. */
619 memset(&(req->overlapped), 0, sizeof(req->overlapped));
621 if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
622 GetLastError() != ERROR_IO_PENDING) {
623 if (GetLastError() == ERROR_PIPE_CONNECTED) {
624 SET_REQ_SUCCESS(req);
626 CloseHandle(req->pipeHandle);
627 req->pipeHandle = INVALID_HANDLE_VALUE;
628 /* Make this req pending reporting an error. */
629 SET_REQ_ERROR(req, GetLastError());
631 uv_insert_pending_req(loop, (uv_req_t*) req);
632 handle->reqs_pending++;
636 handle->reqs_pending++;
640 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
641 uv_loop_t* loop = server->loop;
642 uv_pipe_t* pipe_client;
643 uv_pipe_accept_t* req;
646 if (!server->pending_socket_info) {
647 /* No valid pending sockets. */
648 uv__set_sys_error(loop, WSAEWOULDBLOCK);
652 return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
654 pipe_client = (uv_pipe_t*)client;
656 /* Find a connection instance that has been connected, but not yet */
658 req = server->pending_accepts;
661 /* No valid connections found, so we error out. */
662 uv__set_sys_error(loop, WSAEWOULDBLOCK);
666 /* Initialize the client handle and copy the pipeHandle to the client */
667 uv_pipe_connection_init(pipe_client);
668 pipe_client->handle = req->pipeHandle;
670 /* Prepare the req to pick up a new connection */
671 server->pending_accepts = req->next_pending;
672 req->next_pending = NULL;
673 req->pipeHandle = INVALID_HANDLE_VALUE;
675 if (!(server->flags & UV_HANDLE_CLOSING)) {
676 uv_pipe_queue_accept(loop, server, req, FALSE);
684 /* Starts listening for connections for the given pipe. */
685 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
686 uv_loop_t* loop = handle->loop;
690 if (!(handle->flags & UV_HANDLE_BOUND)) {
691 uv__set_artificial_error(loop, UV_EINVAL);
695 if (handle->flags & UV_HANDLE_LISTENING ||
696 handle->flags & UV_HANDLE_READING) {
697 uv__set_artificial_error(loop, UV_EALREADY);
701 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
702 uv__set_artificial_error(loop, UV_ENOTSUP);
706 handle->flags |= UV_HANDLE_LISTENING;
707 handle->connection_cb = cb;
709 /* First pipe handle should have already been created in uv_pipe_bind */
710 assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
712 for (i = 0; i < handle->pending_instances; i++) {
713 uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
720 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
723 uv_read_t* req = (uv_read_t*) parameter;
724 uv_pipe_t* handle = (uv_pipe_t*) req->data;
725 uv_loop_t* loop = handle->loop;
728 assert(req->type == UV_READ);
729 assert(handle->type == UV_NAMED_PIPE);
731 result = ReadFile(handle->handle,
738 SET_REQ_ERROR(req, GetLastError());
741 POST_COMPLETION_FOR_REQ(loop, req);
746 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
749 uv_write_t* req = (uv_write_t*) parameter;
750 uv_pipe_t* handle = (uv_pipe_t*) req->handle;
751 uv_loop_t* loop = handle->loop;
754 assert(req->type == UV_WRITE);
755 assert(handle->type == UV_NAMED_PIPE);
756 assert(req->write_buffer.base);
758 result = WriteFile(handle->handle,
759 req->write_buffer.base,
760 req->write_buffer.len,
765 SET_REQ_ERROR(req, GetLastError());
768 POST_COMPLETION_FOR_REQ(loop, req);
773 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
777 req = (uv_read_t*) context;
779 handle = (uv_tcp_t*)req->data;
780 assert(handle != NULL);
783 if (!PostQueuedCompletionStatus(handle->loop->iocp,
784 req->overlapped.InternalHigh,
787 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
792 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
796 req = (uv_write_t*) context;
798 handle = (uv_tcp_t*)req->handle;
799 assert(handle != NULL);
802 if (!PostQueuedCompletionStatus(handle->loop->iocp,
803 req->overlapped.InternalHigh,
806 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
811 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
815 assert(handle->flags & UV_HANDLE_READING);
816 assert(!(handle->flags & UV_HANDLE_READ_PENDING));
818 assert(handle->handle != INVALID_HANDLE_VALUE);
820 req = &handle->read_req;
822 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
823 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
825 WT_EXECUTELONGFUNCTION)) {
826 /* Make this req pending reporting an error. */
827 SET_REQ_ERROR(req, GetLastError());
831 memset(&req->overlapped, 0, sizeof(req->overlapped));
832 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
833 req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
837 result = ReadFile(handle->handle,
843 if (!result && GetLastError() != ERROR_IO_PENDING) {
844 /* Make this req pending reporting an error. */
845 SET_REQ_ERROR(req, GetLastError());
849 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
850 if (!req->event_handle) {
851 req->event_handle = CreateEvent(NULL, 0, 0, NULL);
852 if (!req->event_handle) {
853 uv_fatal_error(GetLastError(), "CreateEvent");
856 if (req->wait_handle == INVALID_HANDLE_VALUE) {
857 if (!RegisterWaitForSingleObject(&req->wait_handle,
858 req->overlapped.hEvent, post_completion_read_wait, (void*) req,
859 INFINITE, WT_EXECUTEINWAITTHREAD)) {
860 SET_REQ_ERROR(req, GetLastError());
867 /* Start the eof timer if there is one */
868 eof_timer_start(handle);
869 handle->flags |= UV_HANDLE_READ_PENDING;
870 handle->reqs_pending++;
874 uv_insert_pending_req(loop, (uv_req_t*)req);
875 handle->flags |= UV_HANDLE_READ_PENDING;
876 handle->reqs_pending++;
880 static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
881 uv_read_cb read_cb, uv_read2_cb read2_cb) {
882 uv_loop_t* loop = handle->loop;
884 if (!(handle->flags & UV_HANDLE_CONNECTION)) {
885 uv__set_artificial_error(loop, UV_EINVAL);
889 if (handle->flags & UV_HANDLE_READING) {
890 uv__set_artificial_error(loop, UV_EALREADY);
894 if (handle->flags & UV_HANDLE_EOF) {
895 uv__set_artificial_error(loop, UV_EOF);
899 handle->flags |= UV_HANDLE_READING;
900 handle->read_cb = read_cb;
901 handle->read2_cb = read2_cb;
902 handle->alloc_cb = alloc_cb;
904 /* If reading was stopped and then started again, there could still be a */
905 /* read request pending. */
906 if (!(handle->flags & UV_HANDLE_READ_PENDING))
907 uv_pipe_queue_read(loop, handle);
913 int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
914 uv_read_cb read_cb) {
915 return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
919 int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
920 uv_read2_cb read_cb) {
921 return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
925 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
927 req->next_req = NULL;
928 if (handle->non_overlapped_writes_tail) {
930 handle->non_overlapped_writes_tail->next_req;
931 handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
932 handle->non_overlapped_writes_tail = req;
934 req->next_req = (uv_req_t*)req;
935 handle->non_overlapped_writes_tail = req;
940 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
943 if (handle->non_overlapped_writes_tail) {
944 req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
946 if (req == handle->non_overlapped_writes_tail) {
947 handle->non_overlapped_writes_tail = NULL;
949 handle->non_overlapped_writes_tail->next_req =
961 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
962 uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
964 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
966 WT_EXECUTELONGFUNCTION)) {
967 uv_fatal_error(GetLastError(), "QueueUserWorkItem");
973 static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
974 uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
975 uv_stream_t* send_handle, uv_write_cb cb) {
977 uv_tcp_t* tcp_send_handle;
978 uv_write_t* ipc_header_req;
979 uv_ipc_frame_uv_stream ipc_frame;
981 if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
982 uv__set_artificial_error(loop, UV_ENOTSUP);
986 /* Only TCP server handles are supported for sharing. */
987 if (send_handle && (send_handle->type != UV_TCP ||
988 send_handle->flags & UV_HANDLE_CONNECTION)) {
989 uv__set_artificial_error(loop, UV_ENOTSUP);
993 assert(handle->handle != INVALID_HANDLE_VALUE);
995 if (!(handle->flags & UV_HANDLE_CONNECTION)) {
996 uv__set_artificial_error(loop, UV_EINVAL);
1000 if (handle->flags & UV_HANDLE_SHUTTING) {
1001 uv__set_artificial_error(loop, UV_EOF);
1005 uv_req_init(loop, (uv_req_t*) req);
1006 req->type = UV_WRITE;
1007 req->handle = (uv_stream_t*) handle;
1009 req->ipc_header = 0;
1010 req->event_handle = NULL;
1011 req->wait_handle = INVALID_HANDLE_VALUE;
1012 memset(&req->overlapped, 0, sizeof(req->overlapped));
1015 assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1016 ipc_frame.header.flags = 0;
1018 /* Use the IPC framing protocol. */
1020 tcp_send_handle = (uv_tcp_t*)send_handle;
1022 if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
1023 &ipc_frame.socket_info)) {
1026 ipc_frame.header.flags |= UV_IPC_UV_STREAM;
1030 ipc_frame.header.flags |= UV_IPC_RAW_DATA;
1031 ipc_frame.header.raw_data_length = bufs[0].len;
1035 * Use the provided req if we're only doing a single write.
1036 * If we're doing multiple writes, use ipc_header_write_req to do
1037 * the first write, and then use the provided req for the second write.
1039 if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1040 ipc_header_req = req;
1043 * Try to use the preallocated write req if it's available.
1044 * Otherwise allocate a new one.
1046 if (handle->ipc_header_write_req.type != UV_WRITE) {
1047 ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
1049 ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
1050 if (!ipc_header_req) {
1051 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1055 uv_req_init(loop, (uv_req_t*) ipc_header_req);
1056 ipc_header_req->type = UV_WRITE;
1057 ipc_header_req->handle = (uv_stream_t*) handle;
1058 ipc_header_req->cb = NULL;
1059 ipc_header_req->ipc_header = 1;
1062 /* Write the header or the whole frame. */
1063 memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
1065 result = WriteFile(handle->handle,
1067 ipc_frame.header.flags & UV_IPC_UV_STREAM ?
1068 sizeof(ipc_frame) : sizeof(ipc_frame.header),
1070 &ipc_header_req->overlapped);
1071 if (!result && GetLastError() != ERROR_IO_PENDING) {
1072 uv__set_sys_error(loop, GetLastError());
1077 /* Request completed immediately. */
1078 ipc_header_req->queued_bytes = 0;
1080 /* Request queued by the kernel. */
1081 ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ?
1082 sizeof(ipc_frame) : sizeof(ipc_frame.header);
1083 handle->write_queue_size += req->queued_bytes;
1086 if (handle->write_reqs_pending == 0) {
1090 handle->reqs_pending++;
1091 handle->write_reqs_pending++;
1093 /* If we don't have any raw data to write - we're done. */
1094 if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1099 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1100 req->write_buffer = bufs[0];
1101 uv_insert_non_overlapped_write_req(handle, req);
1102 if (handle->write_reqs_pending == 0) {
1103 uv_queue_non_overlapped_write(handle);
1106 /* Request queued by the kernel. */
1107 req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1108 handle->write_queue_size += req->queued_bytes;
1110 result = WriteFile(handle->handle,
1116 if (!result && GetLastError() != ERROR_IO_PENDING) {
1117 uv__set_sys_error(loop, GetLastError());
1122 /* Request completed immediately. */
1123 req->queued_bytes = 0;
1125 /* Request queued by the kernel. */
1126 req->queued_bytes = uv_count_bufs(bufs, bufcnt);
1127 handle->write_queue_size += req->queued_bytes;
1130 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1131 req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1132 if (!req->event_handle) {
1133 uv_fatal_error(GetLastError(), "CreateEvent");
1135 if (!RegisterWaitForSingleObject(&req->wait_handle,
1136 req->overlapped.hEvent, post_completion_write_wait, (void*) req,
1137 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1138 uv__set_sys_error(loop, GetLastError());
1144 if (handle->write_reqs_pending == 0) {
1148 handle->reqs_pending++;
1149 handle->write_reqs_pending++;
1155 int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1156 uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
1157 return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb);
1161 int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
1162 uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
1164 uv__set_artificial_error(loop, UV_EINVAL);
1168 return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
1172 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1174 /* If there is an eof timer running, we don't need it any more, */
1175 /* so discard it. */
1176 eof_timer_destroy(handle);
1178 handle->flags |= UV_HANDLE_EOF;
1179 uv_read_stop((uv_stream_t*) handle);
1181 uv__set_artificial_error(loop, UV_EOF);
1182 if (handle->read2_cb) {
1183 handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE);
1185 handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
1190 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1192 /* If there is an eof timer running, we don't need it any more, */
1193 /* so discard it. */
1194 eof_timer_destroy(handle);
1196 uv_read_stop((uv_stream_t*) handle);
1198 uv__set_sys_error(loop, error);
1199 if (handle->read2_cb) {
1200 handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE);
1202 handle->read_cb((uv_stream_t*)handle, -1, buf);
1207 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1208 int error, uv_buf_t buf) {
1209 if (error == ERROR_BROKEN_PIPE) {
1210 uv_pipe_read_eof(loop, handle, buf);
1212 uv_pipe_read_error(loop, handle, error, buf);
1217 void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
1221 uv_ipc_frame_uv_stream ipc_frame;
1223 assert(handle->type == UV_NAMED_PIPE);
1225 handle->flags &= ~UV_HANDLE_READ_PENDING;
1226 eof_timer_stop(handle);
1228 if (!REQ_SUCCESS(req)) {
1229 /* An error occurred doing the 0-read. */
1230 if (handle->flags & UV_HANDLE_READING) {
1231 uv_pipe_read_error_or_eof(loop,
1237 /* Do non-blocking reads until the buffer is empty */
1238 while (handle->flags & UV_HANDLE_READING) {
1239 if (!PeekNamedPipe(handle->handle,
1245 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1250 /* There is nothing to read after all. */
1255 /* Use the IPC framing protocol to read the incoming data. */
1256 if (handle->remaining_ipc_rawdata_bytes == 0) {
1257 /* We're reading a new frame. First, read the header. */
1258 assert(avail >= sizeof(ipc_frame.header));
1260 if (!ReadFile(handle->handle,
1262 sizeof(ipc_frame.header),
1265 uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1270 assert(bytes == sizeof(ipc_frame.header));
1271 assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
1273 if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
1274 assert(avail - sizeof(ipc_frame.header) >=
1275 sizeof(ipc_frame.socket_info));
1277 /* Read the TCP socket info. */
1278 if (!ReadFile(handle->handle,
1279 &ipc_frame.socket_info,
1280 sizeof(ipc_frame) - sizeof(ipc_frame.header),
1283 uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1288 assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
1290 /* Store the pending socket info. */
1291 assert(!handle->pending_socket_info);
1292 handle->pending_socket_info =
1293 (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
1294 if (!handle->pending_socket_info) {
1295 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
1298 *(handle->pending_socket_info) = ipc_frame.socket_info;
1301 if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
1302 handle->remaining_ipc_rawdata_bytes =
1303 ipc_frame.header.raw_data_length;
1307 avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
1311 buf = handle->alloc_cb((uv_handle_t*) handle, avail);
1312 assert(buf.len > 0);
1314 if (ReadFile(handle->handle,
1319 /* Successful read */
1321 assert(handle->remaining_ipc_rawdata_bytes >= bytes);
1322 handle->remaining_ipc_rawdata_bytes =
1323 handle->remaining_ipc_rawdata_bytes - bytes;
1324 if (handle->read2_cb) {
1325 handle->read2_cb(handle, bytes, buf,
1326 handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
1327 } else if (handle->read_cb) {
1328 handle->read_cb((uv_stream_t*)handle, bytes, buf);
1331 if (handle->pending_socket_info) {
1332 free(handle->pending_socket_info);
1333 handle->pending_socket_info = NULL;
1336 handle->read_cb((uv_stream_t*)handle, bytes, buf);
1339 /* Read again only if bytes == buf.len */
1340 if (bytes <= buf.len) {
1344 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1349 /* Post another 0-read if still reading and not closing. */
1350 if ((handle->flags & UV_HANDLE_READING) &&
1351 !(handle->flags & UV_HANDLE_READ_PENDING)) {
1352 uv_pipe_queue_read(loop, handle);
1356 DECREASE_PENDING_REQ_COUNT(handle);
1360 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1362 assert(handle->type == UV_NAMED_PIPE);
1364 assert(handle->write_queue_size >= req->queued_bytes);
1365 handle->write_queue_size -= req->queued_bytes;
1367 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1368 if (req->wait_handle != INVALID_HANDLE_VALUE) {
1369 UnregisterWait(req->wait_handle);
1370 req->wait_handle = INVALID_HANDLE_VALUE;
1372 if (req->event_handle) {
1373 CloseHandle(req->event_handle);
1374 req->event_handle = NULL;
1378 if (req->ipc_header) {
1379 if (req == &handle->ipc_header_write_req) {
1380 req->type = UV_UNKNOWN_REQ;
1386 if (!REQ_SUCCESS(req)) {
1387 uv__set_sys_error(loop, GET_REQ_ERROR(req));
1388 ((uv_write_cb)req->cb)(req, -1);
1390 ((uv_write_cb)req->cb)(req, 0);
1395 handle->write_reqs_pending--;
1397 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1398 handle->non_overlapped_writes_tail) {
1399 assert(handle->write_reqs_pending > 0);
1400 uv_queue_non_overlapped_write(handle);
1403 if (handle->write_reqs_pending == 0) {
1407 if (handle->write_reqs_pending == 0 &&
1408 handle->flags & UV_HANDLE_SHUTTING) {
1409 uv_want_endgame(loop, (uv_handle_t*)handle);
1412 DECREASE_PENDING_REQ_COUNT(handle);
1416 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1417 uv_req_t* raw_req) {
1418 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1420 assert(handle->type == UV_NAMED_PIPE);
1422 if (REQ_SUCCESS(req)) {
1423 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1424 req->next_pending = handle->pending_accepts;
1425 handle->pending_accepts = req;
1427 if (handle->connection_cb) {
1428 handle->connection_cb((uv_stream_t*)handle, 0);
1431 if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1432 CloseHandle(req->pipeHandle);
1433 req->pipeHandle = INVALID_HANDLE_VALUE;
1435 if (!(handle->flags & UV_HANDLE_CLOSING)) {
1436 uv_pipe_queue_accept(loop, handle, req, FALSE);
1440 DECREASE_PENDING_REQ_COUNT(handle);
1444 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1445 uv_connect_t* req) {
1446 assert(handle->type == UV_NAMED_PIPE);
1449 if (REQ_SUCCESS(req)) {
1450 uv_pipe_connection_init(handle);
1451 ((uv_connect_cb)req->cb)(req, 0);
1453 uv__set_sys_error(loop, GET_REQ_ERROR(req));
1454 ((uv_connect_cb)req->cb)(req, -1);
1458 DECREASE_PENDING_REQ_COUNT(handle);
1462 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1463 uv_shutdown_t* req) {
1464 assert(handle->type == UV_NAMED_PIPE);
1466 /* Initialize and optionally start the eof timer. */
1467 /* This makes no sense if we've already seen EOF. */
1468 if (!(handle->flags & UV_HANDLE_EOF)) {
1469 eof_timer_init(handle);
1471 /* If reading start the timer right now. */
1472 /* Otherwise uv_pipe_queue_read will start it. */
1473 if (handle->flags & UV_HANDLE_READ_PENDING) {
1474 eof_timer_start(handle);
1482 DECREASE_PENDING_REQ_COUNT(handle);
1486 static void eof_timer_init(uv_pipe_t* pipe) {
1489 assert(pipe->eof_timer == NULL);
1490 assert(pipe->flags & UV_HANDLE_CONNECTION);
1492 pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
1494 r = uv_timer_init(pipe->loop, pipe->eof_timer);
1495 assert(r == 0); /* timers can't fail */
1496 pipe->eof_timer->data = pipe;
1500 static void eof_timer_start(uv_pipe_t* pipe) {
1501 assert(pipe->flags & UV_HANDLE_CONNECTION);
1503 if (pipe->eof_timer != NULL) {
1504 uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
1509 static void eof_timer_stop(uv_pipe_t* pipe) {
1510 assert(pipe->flags & UV_HANDLE_CONNECTION);
1512 if (pipe->eof_timer != NULL) {
1513 uv_timer_stop(pipe->eof_timer);
1518 static void eof_timer_cb(uv_timer_t* timer, int status) {
1519 uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
1520 uv_loop_t* loop = timer->loop;
1522 assert(status == 0); /* timers can't fail */
1523 assert(pipe->type == UV_NAMED_PIPE);
1525 /* This should always be true, since we start the timer only */
1526 /* in uv_pipe_queue_read after successfully calling ReadFile, */
1527 /* or in uv_process_pipe_shutdown_req if a read is pending, */
1528 /* and we always immediately stop the timer in */
1529 /* uv_process_pipe_read_req. */
1530 assert(pipe->flags & UV_HANDLE_READ_PENDING) ;
1532 /* If there are many packets coming off the iocp then the timer callback */
1533 /* may be called before the read request is coming off the queue. */
1534 /* Therefore we check here if the read request has completed but will */
1535 /* be processed later. */
1536 if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
1537 HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
1541 /* Force both ends off the pipe. */
1542 CloseHandle(pipe->handle);
1543 pipe->handle = INVALID_HANDLE_VALUE;
1545 /* Stop reading, so the pending read that is going to fail will */
1546 /* not be reported to the user. */
1547 uv_read_stop((uv_stream_t*) pipe);
1549 /* Report the eof and update flags. This will get reported even if the */
1550 /* user stopped reading in the meantime. TODO: is that okay? */
1551 uv_pipe_read_eof(loop, pipe, uv_null_buf_);
1555 static void eof_timer_destroy(uv_pipe_t* pipe) {
1556 assert(pipe->flags && UV_HANDLE_CONNECTION);
1558 if (pipe->eof_timer) {
1559 uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
1560 pipe->eof_timer = NULL;
1565 static void eof_timer_close_cb(uv_handle_t* handle) {
1566 assert(handle->type == UV_TIMER);
1571 void uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
1572 HANDLE os_handle = (HANDLE)_get_osfhandle(file);
1574 if (os_handle == INVALID_HANDLE_VALUE ||
1575 uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) {
1579 uv_pipe_connection_init(pipe);
1580 pipe->handle = os_handle;
1583 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1584 pipe->ipc_pid = uv_parent_pid();
1585 assert(pipe->ipc_pid != -1);