handle->reqs_pending = 0;
handle->pending_accepts = NULL;
handle->name = NULL;
+ handle->handle = INVALID_HANDLE_VALUE;
uv_counters()->pipe_init++;
/* Creates a pipe server. */
-/* TODO: make this work with UTF8 name */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
- int i;
+ int i, errno, nameSize;
uv_pipe_accept_t* req;
- if (!name) {
+ if (handle->flags & UV_HANDLE_BOUND) {
uv_set_sys_error(WSAEINVAL);
return -1;
}
- /* Make our own copy of the pipe name */
- handle->name = _strdup(name);
- if (!handle->name) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ if (!name) {
+ uv_set_sys_error(WSAEINVAL);
+ return -1;
}
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
req->next_pending = NULL;
}
+ /* Convert name to UTF16. */
+ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
+ handle->name = (wchar_t*)malloc(nameSize);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
+ uv_set_sys_error(GetLastError());
+ return -1;
+ }
+
+ /*
+ * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
+ * If this fails then there's already a pipe server for the given pipe name.
+ */
+ handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ NULL);
+
+ if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
+ errno = GetLastError();
+ if (errno == ERROR_ACCESS_DENIED) {
+ uv_set_error(UV_EADDRINUSE, errno);
+ handle->error = LOOP->last_error;
+ handle->flags |= UV_HANDLE_BIND_ERROR;
+ } else if (errno == ERROR_PATH_NOT_FOUND || errno == ERROR_INVALID_NAME) {
+ uv_set_error(UV_EADDRNOTAVAIL, errno);
+ } else {
+ uv_set_sys_error(errno);
+ }
+ goto error;
+ }
+
+ if (uv_set_pipe_handle(handle, handle->accept_reqs[0].pipeHandle)) {
+ uv_set_sys_error(GetLastError());
+ goto error;
+ }
+
handle->flags |= UV_HANDLE_PIPESERVER;
+ handle->flags |= UV_HANDLE_BOUND;
+
return 0;
+
+error:
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
+ if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(handle->accept_reqs[0].pipeHandle);
+ handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
+ }
+
+ return -1;
}
assert(handle);
/* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait for the pipe to become available with WaitNamedPipe. */
- while (WaitNamedPipe(handle->name, 30000)) {
+ while (WaitNamedPipeW(handle->name, 30000)) {
/* The pipe is now available, try to connect. */
- pipeHandle = CreateFile(handle->name,
+ pipeHandle = CreateFileW(handle->name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
}
-/* TODO: make this work with UTF8 name */
int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
- int errno;
+ int errno, nameSize;
HANDLE pipeHandle;
handle->handle = INVALID_HANDLE_VALUE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
- pipeHandle = CreateFile(name,
+ /* Convert name to UTF16. */
+ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(wchar_t);
+ handle->name = (wchar_t*)malloc(nameSize);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(wchar_t))) {
+ errno = GetLastError();
+ goto error;
+ }
+
+ pipeHandle = CreateFileW(handle->name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
if (pipeHandle == INVALID_HANDLE_VALUE) {
if (GetLastError() == ERROR_PIPE_BUSY) {
/* Wait for the server to make a pipe instance available. */
- handle->name = _strdup(name);
- if (!handle->name) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
errno = GetLastError();
goto error;
return 0;
error:
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
}
if (handle->name) {
free(handle->name);
- handle->name;
+ handle->name = NULL;
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
pipeHandle = handle->accept_reqs[i].pipeHandle;
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
+ handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
}
}
} else if (handle->handle != INVALID_HANDLE_VALUE) {
CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
}
handle->flags |= UV_HANDLE_SHUT;
}
-static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req) {
- HANDLE pipeHandle;
-
+static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req, BOOL firstInstance) {
assert(handle->flags & UV_HANDLE_LISTENING);
- assert(req->pipeHandle == INVALID_HANDLE_VALUE);
- pipeHandle = CreateNamedPipe(handle->name,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
- PIPE_UNLIMITED_INSTANCES,
- 65536,
- 65536,
- 0,
- NULL);
+ if (!firstInstance) {
+ assert(req->pipeHandle == INVALID_HANDLE_VALUE);
- if (pipeHandle == INVALID_HANDLE_VALUE) {
- req->error = uv_new_sys_error(GetLastError());
- uv_insert_pending_req((uv_req_t*) req);
- handle->reqs_pending++;
- return;
- }
+ req->pipeHandle = CreateNamedPipeW(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ NULL);
- if (CreateIoCompletionPort(pipeHandle,
- LOOP->iocp,
- (ULONG_PTR)handle,
- 0) == NULL) {
- req->error = uv_new_sys_error(GetLastError());
- uv_insert_pending_req((uv_req_t*) req);
- handle->reqs_pending++;
- return;
+ if (req->pipeHandle == INVALID_HANDLE_VALUE) {
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req((uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
+
+ if (uv_set_pipe_handle(handle, req->pipeHandle)) {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req((uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
}
+ assert(req->pipeHandle != INVALID_HANDLE_VALUE);
+
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
- if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) {
+ if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) {
if (GetLastError() == ERROR_PIPE_CONNECTED) {
- req->pipeHandle = pipeHandle;
req->error = uv_ok_;
} else {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(GetLastError());
}
return;
}
- req->pipeHandle = pipeHandle;
handle->reqs_pending++;
}
req->pipeHandle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(server, req);
+ uv_pipe_queue_accept(server, req, FALSE);
}
return 0;
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
int i, errno;
+ if (handle->flags & UV_HANDLE_BIND_ERROR) {
+ LOOP->last_error = handle->error;
+ return -1;
+ }
+
+ if (!(handle->flags & UV_HANDLE_BOUND)) {
+ uv_set_error(UV_ENOTCONN, 0);
+ return -1;
+ }
+
if (handle->flags & UV_HANDLE_LISTENING ||
handle->flags & UV_HANDLE_READING) {
- uv_set_sys_error(UV_EALREADY);
+ uv_set_error(UV_EALREADY, 0);
return -1;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
- uv_set_sys_error(UV_ENOTSUP);
+ uv_set_error(UV_ENOTSUP, 0);
return -1;
}
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
+ /* First pipe handle should have already been created in uv_pipe_bind */
+ assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
+
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
- uv_pipe_queue_accept(handle, &handle->accept_reqs[i]);
+ uv_pipe_queue_accept(handle, &handle->accept_reqs[i], i == 0);
}
return 0;
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
- uv_set_sys_error(UV_EINVAL);
+ uv_set_error(UV_EINVAL, 0);
return -1;
}
if (handle->flags & UV_HANDLE_READING) {
- uv_set_sys_error(UV_EALREADY);
+ uv_set_error(UV_EALREADY, 0);
return -1;
}
if (handle->flags & UV_HANDLE_EOF) {
- uv_set_sys_error(UV_EOF);
+ uv_set_error(UV_EOF, 0);
return -1;
}
int result;
if (bufcnt != 1) {
- uv_set_sys_error(UV_ENOTSUP);
+ uv_set_error(UV_ENOTSUP, 0);
return -1;
}
assert(handle->handle != INVALID_HANDLE_VALUE);
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
- uv_set_sys_error(UV_EINVAL);
+ uv_set_error(UV_EINVAL, 0);
return -1;
}
if (handle->flags & UV_HANDLE_SHUTTING) {
- uv_set_sys_error(UV_EOF);
+ uv_set_error(UV_EOF, 0);
return -1;
}
void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
- DWORD bytes, avail, err, mode;
+ DWORD bytes, avail;
uv_buf_t buf;
assert(handle->type == UV_NAMED_PIPE);
req->pipeHandle = INVALID_HANDLE_VALUE;
}
if (!(handle->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(handle, req);
+ uv_pipe_queue_accept(handle, req, FALSE);
}
}
--- /dev/null
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "task.h"
+
+
+static uv_timer_t timer_handle;
+static uv_idle_t idle_handle;
+
+static int idle_cb_called = 0;
+static int timer_cb_called = 0;
+static int close_cb_called = 0;
+
+
+static void close_cb(uv_handle_t* handle) {
+ close_cb_called++;
+}
+
+
+static void timer_cb(uv_timer_t* handle, int status) {
+ ASSERT(handle == &timer_handle);
+ ASSERT(status == 0);
+
+ uv_close((uv_handle_t*) &idle_handle, close_cb);
+ uv_close((uv_handle_t*) &timer_handle, close_cb);
+
+ timer_cb_called++;
+ LOGF("timer_cb %d\n", timer_cb_called);
+}
+
+
+static void idle_cb(uv_idle_t* handle, int status) {
+ ASSERT(handle == &idle_handle);
+ ASSERT(status == 0);
+
+ idle_cb_called++;
+ LOGF("idle_cb %d\n", idle_cb_called);
+}
+
+
+TEST_IMPL(idle_starvation) {
+ int r;
+
+ uv_init();
+
+ r = uv_idle_init(&idle_handle);
+ ASSERT(r == 0);
+ r = uv_idle_start(&idle_handle, idle_cb);
+ ASSERT(r == 0);
+
+ r = uv_timer_init(&timer_handle);
+ ASSERT(r == 0);
+ r = uv_timer_start(&timer_handle, timer_cb, 50, 0);
+ ASSERT(r == 0);
+
+ r = uv_run();
+ ASSERT(r == 0);
+
+ ASSERT(idle_cb_called > 0);
+ ASSERT(timer_cb_called == 1);
+ ASSERT(close_cb_called == 2);
+
+ return 0;
+}