#define UV_REQ_BUFSML_SIZE (4)
-#define UV_REQ_PRIVATE_FIELDS \
- int write_index; \
- ev_timer timer; \
+#define UV_REQ_PRIVATE_FIELDS /* empty */
+
+#define UV_WRITE_PRIVATE_FIELDS \
ngx_queue_t queue; \
+ int write_index; \
uv_buf_t* bufs; \
int bufcnt; \
uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];
+#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS \
+ ngx_queue_t queue;
+
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
int delayed_error; \
uv_connection_cb connection_cb; \
int accepted_fd; \
- uv_req_t *connect_req; \
- uv_req_t *shutdown_req; \
+ uv_connect_t *connect_req; \
+ uv_shutdown_t *shutdown_req; \
ev_io read_watcher; \
ev_io write_watcher; \
ngx_queue_t write_queue; \
char* base;
} uv_buf_t;
-/*
- * Private uv_pipe_instance state.
- */
-typedef enum {
- UV_PIPEINSTANCE_CONNECTED = 0,
- UV_PIPEINSTANCE_DISCONNECTED,
- UV_PIPEINSTANCE_ACTIVE
-} uv_pipeinstance_state;
-
-/* Used to store active pipe instances inside a linked list. */
-typedef struct uv_pipe_instance_s {
- HANDLE handle;
- uv_pipeinstance_state state;
- struct uv_pipe_instance_s* next;
-} uv_pipe_instance_t;
-
#define UV_REQ_PRIVATE_FIELDS \
union { \
/* Used by I/O operations */ \
size_t queued_bytes; \
}; \
}; \
- int flags; \
uv_err_t error; \
struct uv_req_s* next_req;
+#define UV_WRITE_PRIVATE_FIELDS \
+ /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS \
+ /* empty */
+
+#define UV_SHUTDOWN_PRIVATE_FIELDS \
+ /* empty */
+
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
- uv_req_t* shutdown_req;
+ uv_shutdown_t* shutdown_req;
#define uv_stream_server_fields \
uv_connection_cb connection_cb;
unsigned int reqs_pending; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
- struct uv_req_s read_req; \
+ uv_req_t read_req; \
union { \
struct { uv_stream_connection_fields }; \
struct { uv_stream_server_fields }; \
}; \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
- struct uv_req_s accept_req;
+ struct uv_req_s accept_req; \
#define uv_pipe_server_fields \
char* name; \
- uv_pipe_instance_t* connections; \
- struct uv_req_s accept_reqs[4];
+ struct uv_pipe_accept_s { \
+ UV_REQ_FIELDS \
+ HANDLE pipeHandle; \
+ struct uv_pipe_accept_s* next_pending; \
+ } accept_reqs[4]; \
+ struct uv_pipe_accept_s* pending_accepts;
#define uv_pipe_connection_fields \
- uv_pipe_t* server; \
- uv_pipe_instance_t* connection; \
- uv_pipe_instance_t clientConnection;
+ HANDLE handle;
#define UV_PIPE_PRIVATE_FIELDS \
union { \
#define UV_ASYNC_PRIVATE_FIELDS \
struct uv_req_s async_req; \
+ uv_async_cb async_cb; \
/* char to avoid alignment issues */ \
char volatile async_sent;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
-typedef struct uv_req_s uv_req_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
-
+/* Request types */
+typedef struct uv_req_s uv_req_t;
+typedef struct uv_shutdown_s uv_shutdown_t;
+typedef struct uv_write_s uv_write_t;
+typedef struct uv_connect_s uv_connect_t;
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "uv-unix.h"
*/
typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
-typedef void (*uv_write_cb)(uv_req_t* req, int status);
-typedef void (*uv_connect_cb)(uv_req_t* req, int status);
-typedef void (*uv_shutdown_cb)(uv_req_t* req, int status);
+typedef void (*uv_write_cb)(uv_write_t* req, int status);
+typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
+typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
typedef void (*uv_connection_cb)(uv_handle_t* server, int status);
typedef void (*uv_close_cb)(uv_handle_t* handle);
typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
};
-struct uv_req_s {
- /* read-only */
- uv_req_type type;
- /* public */
- uv_handle_t* handle;
- void *(*cb)(void *);
- void* data;
- /* private */
+#define UV_REQ_FIELDS \
+ /* read-only */ \
+ uv_req_type type; \
+ /* public */ \
+ void* data; \
+ /* private */ \
UV_REQ_PRIVATE_FIELDS
+
+/* Abstract base class of all requests. */
+struct uv_req_s {
+ UV_REQ_FIELDS
};
+
/*
- * Initialize a request for use with uv_write, uv_shutdown, or uv_connect.
+ * Shutdown the outgoing (write) side of a duplex stream. It waits for
+ * pending write requests to complete. The handle should refer to an
+ * initialized stream. req should be an uninitialized shutdown request
+ * struct. The cb is called after shutdown is complete.
*/
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *));
+struct uv_shutdown_s {
+ UV_REQ_FIELDS
+ uv_stream_t* handle;
+ uv_shutdown_cb cb;
+ UV_SHUTDOWN_PRIVATE_FIELDS
+};
-int uv_shutdown(uv_req_t* req);
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb);
#define UV_HANDLE_FIELDS \
int uv_read_stop(uv_stream_t*);
-/* Write data to stream. Buffers are written in order. Example:
+/*
+ * Write data to stream. Buffers are written in order. Example:
*
* uv_buf_t a[] = {
* { .base = "1", .len = 1 },
* uv_write(req, b, 2);
*
*/
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt);
+struct uv_write_s {
+ UV_REQ_FIELDS
+ uv_write_cb cb;
+ uv_stream_t* handle;
+ UV_WRITE_PRIVATE_FIELDS
+};
+
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb);
/*
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in);
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6);
+/*
+ * uv_tcp_connect, uv_tcp_connect6
+ * These functions establish IPv4 and IPv6 TCP connections. Provide an
+ * initialized TCP handle and an uninitialized uv_connect_t*. The callback
+ * will be made when the connection is established.
+ */
+struct uv_connect_s {
+ UV_REQ_FIELDS
+ uv_connect_cb cb;
+ uv_stream_t* handle;
+ UV_CONNECT_PRIVATE_FIELDS
+};
+
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in address, uv_connect_cb cb);
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in6 address, uv_connect_cb cb);
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
/*
* A subclass of uv_stream_t representing a pipe stream or pipe server.
*/
-struct uv_pipe_s {
- UV_HANDLE_FIELDS
- UV_STREAM_FIELDS
- UV_PIPE_PRIVATE_FIELDS
+struct uv_pipe_s {
+ UV_HANDLE_FIELDS
+ UV_STREAM_FIELDS
+ UV_PIPE_PRIVATE_FIELDS
};
int uv_pipe_init(uv_pipe_t* handle);
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb);
-int uv_pipe_connect(uv_req_t* req, const char* name);
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+ const char* name, uv_connect_cb cb);
/*
extern uint64_t uv_hrtime(void);
-/* the presence of this union forces similar struct layout */
+/* the presence of these unions forces similar struct layout */
union uv_any_handle {
uv_tcp_t tcp;
uv_pipe_t pipe;
uv_getaddrinfo_t getaddrinfo;
};
+union uv_any_req {
+ uv_req_t req;
+ uv_write_t write;
+ uv_connect_t connect;
+ uv_shutdown_t shutdown;
+};
+
+
/* Diagnostic counters */
typedef struct {
uint64_t req_init;
static struct uv_ares_data_s ares_data;
+void uv__req_init(uv_req_t*);
void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
void uv__next(EV_P_ ev_idle* watcher, int revents);
static void uv__tcp_connect(uv_tcp_t*);
}
-uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
ngx_queue_t* q;
- uv_req_t* req;
+ uv_write_t* req;
if (ngx_queue_empty(&tcp->write_queue)) {
return NULL;
return NULL;
}
- req = ngx_queue_data(q, struct uv_req_s, queue);
+ req = ngx_queue_data(q, struct uv_write_s, queue);
assert(req);
return req;
static void uv__drain(uv_tcp_t* tcp) {
- uv_req_t* req;
- uv_shutdown_cb cb;
+ uv_shutdown_t* req;
assert(!uv_write_queue_head(tcp));
assert(tcp->write_queue_size == 0);
assert(tcp->shutdown_req);
req = tcp->shutdown_req;
- cb = (uv_shutdown_cb)req->cb;
if (shutdown(tcp->fd, SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
uv_err_new((uv_handle_t*)tcp, errno);
- if (cb) cb(req, -1);
+ if (req->cb) {
+ req->cb(req, -1);
+ }
} else {
uv_err_new((uv_handle_t*)tcp, 0);
uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
- if (cb) cb(req, 0);
+ if (req->cb) {
+ req->cb(req, 0);
+ }
}
}
}
/* On success returns NULL. On error returns a pointer to the write request
* which had the error.
*/
-static uv_req_t* uv__write(uv_tcp_t* tcp) {
- uv_req_t* req;
+static uv_write_t* uv__write(uv_tcp_t* tcp) {
+ uv_write_t* req;
struct iovec* iov;
int iovcnt;
ssize_t n;
return NULL;
}
- assert(req->handle == (uv_handle_t*)tcp);
+ assert(req->handle == (uv_stream_t*)tcp);
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
static void uv__write_callbacks(uv_tcp_t* tcp) {
- uv_write_cb cb;
int callbacks_made = 0;
ngx_queue_t* q;
- uv_req_t* req;
+ uv_write_t* req;
while (!ngx_queue_empty(&tcp->write_completed_queue)) {
/* Pop a req off write_completed_queue. */
q = ngx_queue_head(&tcp->write_completed_queue);
assert(q);
- req = ngx_queue_data(q, struct uv_req_s, queue);
+ req = ngx_queue_data(q, struct uv_write_s, queue);
ngx_queue_remove(q);
- cb = (uv_write_cb) req->cb;
-
/* NOTE: call callback AFTER freeing the request data. */
- if (cb) {
- cb(req, 0);
+ if (req->cb) {
+ req->cb(req, 0);
}
callbacks_made++;
}
-int uv_shutdown(uv_req_t* req) {
- uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
+ uv_tcp_t* tcp = (uv_tcp_t*)handle;
+ assert(handle->type == UV_TCP &&
+ "uv_shutdown (unix) only supports uv_tcp_t right now");
assert(tcp->fd >= 0);
- assert(tcp->type == UV_TCP);
+
+ /* Initialize request */
+ uv__req_init((uv_req_t*)req);
+ req->handle = handle;
+ req->cb = cb;
if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) ||
uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) ||
void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
uv_tcp_t* tcp = watcher->data;
+
+ assert(tcp->type == UV_TCP);
assert(watcher == &tcp->read_watcher ||
watcher == &tcp->write_watcher);
-
assert(tcp->fd >= 0);
-
assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
if (tcp->connect_req) {
}
if (revents & EV_WRITE) {
- uv_req_t* req = uv__write(tcp);
+ uv_write_t* req = uv__write(tcp);
if (req) {
/* Error. Notify the user. */
- uv_write_cb cb = (uv_write_cb) req->cb;
-
- if (cb) {
- cb(req, -1);
+ if (req->cb) {
+ req->cb(req, -1);
}
} else {
uv__write_callbacks(tcp);
*/
static void uv__tcp_connect(uv_tcp_t* tcp) {
int error;
- uv_req_t* req;
- uv_connect_cb connect_cb;
+ uv_connect_t* req = tcp->connect_req;
socklen_t errorsize = sizeof(int);
+ assert(tcp->type == UV_TCP);
assert(tcp->fd >= 0);
-
- req = tcp->connect_req;
assert(req);
if (tcp->delayed_error) {
/* Successful connection */
tcp->connect_req = NULL;
- connect_cb = (uv_connect_cb) req->cb;
- if (connect_cb) {
- connect_cb(req, 0);
+ if (req->cb) {
+ req->cb(req, 0);
}
} else if (error == EINPROGRESS) {
uv_err_new((uv_handle_t*)tcp, error);
tcp->connect_req = NULL;
-
- connect_cb = (uv_connect_cb) req->cb;
- if (connect_cb) {
- connect_cb(req, -1);
+ if (req->cb) {
+ req->cb(req, -1);
}
}
}
-static int uv__connect(uv_req_t* req, struct sockaddr* addr,
- socklen_t addrlen) {
- uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
+static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
+ socklen_t addrlen, uv_connect_cb cb) {
int r;
if (tcp->fd <= 0) {
}
}
+ uv__req_init((uv_req_t*)req);
+ req->cb = cb;
+ req->handle = (uv_stream_t*)tcp;
req->type = UV_CONNECT;
ngx_queue_init(&req->queue);
}
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
- assert(addr.sin_family == AF_INET);
- return uv__connect(req, (struct sockaddr*) &addr,
- sizeof(struct sockaddr_in));
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in address, uv_connect_cb cb) {
+ assert(handle->type == UV_TCP);
+ assert(address.sin_family == AF_INET);
+ return uv__connect(req, handle, (struct sockaddr*) &address,
+ sizeof(struct sockaddr_in), cb);
}
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
- assert(addr.sin6_family == AF_INET6);
- return uv__connect(req, (struct sockaddr*) &addr,
- sizeof(struct sockaddr_in6));
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in6 address, uv_connect_cb cb) {
+ assert(handle->type == UV_TCP);
+ assert(address.sin6_family == AF_INET6);
+ return uv__connect(req, handle, (struct sockaddr*) &address,
+ sizeof(struct sockaddr_in6), cb);
}
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the uv_buf_t array.
*/
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
- uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
- int empty_queue = (tcp->write_queue_size == 0);
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb) {
+ int empty_queue;
+ uv_tcp_t* tcp = (uv_tcp_t*)handle;
+
+ /* Initialize the req */
+ uv__req_init((uv_req_t*) req);
+ req->cb = cb;
+ req->handle = handle;
+ ngx_queue_init(&req->queue);
+
+ assert(handle->type == UV_TCP &&
+ "uv_write (unix) does not yet support other types of streams");
+
+ empty_queue = (tcp->write_queue_size == 0);
assert(tcp->fd >= 0);
ngx_queue_init(&req->queue);
}
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) {
+void uv__req_init(uv_req_t* req) {
uv_counters()->req_init++;
req->type = UV_UNKNOWN_REQ;
- req->cb = cb;
- req->handle = handle;
- ngx_queue_init(&req->queue);
+ req->data = NULL;
}
}
-int uv_pipe_connect(uv_req_t* req, const char* name) {
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+ const char* name, uv_connect_cb cb) {
assert(0 && "implement me");
}
#include <limits.h>
#include <malloc.h>
#include <stdio.h>
+#include <string.h>
#include "uv.h"
#include "uv-common.h"
#define UV_HANDLE_BIND_ERROR 0x1000
#define UV_HANDLE_IPV6 0x2000
#define UV_HANDLE_PIPESERVER 0x4000
-
-/*
- * Private uv_req flags.
- */
-/* The request is currently queued. */
-#define UV_REQ_PENDING 0x01
+#define UV_HANDLE_READ_PENDING 0x8000
/* Binary tree used to keep the list of timers sorted. */
}
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) {
+static void uv_req_init(uv_req_t* req) {
uv_counters()->req_init++;
req->type = UV_UNKNOWN_REQ;
- req->flags = 0;
- req->handle = handle;
- req->cb = cb;
}
static void uv_init_connection(uv_stream_t* handle) {
handle->flags |= UV_HANDLE_CONNECTION;
handle->write_reqs_pending = 0;
- uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL);
+
+ uv_req_init((uv_req_t*) &(handle->read_req));
+ handle->read_req.type = UV_READ;
+ handle->read_req.data = handle;
}
err = uv_new_sys_error(WSAGetLastError());
}
if (handle->shutdown_req->cb) {
- handle->shutdown_req->flags &= ~UV_REQ_PENDING;
if (status == -1) {
uv_last_error_ = err;
}
- ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
+ handle->shutdown_req->cb(handle->shutdown_req, status);
}
handle->reqs_pending--;
}
close_pipe(handle, &status, &err);
if (handle->shutdown_req->cb) {
- handle->shutdown_req->flags &= ~UV_REQ_PENDING;
if (status == -1) {
uv_last_error_ = err;
}
- ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
+ handle->shutdown_req->cb(handle->shutdown_req, status);
}
handle->reqs_pending--;
}
/* Prepare the uv_req structure. */
req = &handle->accept_req;
- assert(!(req->flags & UV_REQ_PENDING));
- req->type = UV_ACCEPT;
- req->flags |= UV_REQ_PENDING;
/* choose family and extension function */
if ((handle->flags & UV_HANDLE_IPV6) != 0) {
handle->accept_socket = accept_socket;
handle->reqs_pending++;
- req->flags |= UV_REQ_PENDING;
}
-static void uv_pipe_queue_accept(uv_pipe_t* handle) {
- uv_req_t* req;
+static void uv_pipe_queue_accept(uv_pipe_t* handle, struct uv_pipe_accept_s* req) {
HANDLE pipeHandle;
- int i;
assert(handle->flags & UV_HANDLE_LISTENING);
+ assert(req->pipeHandle == INVALID_HANDLE_VALUE);
+
+ pipeHandle = CreateNamedPipe(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ NULL);
+
+ if (pipeHandle == INVALID_HANDLE_VALUE) {
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req((uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
- for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
- req = &handle->accept_reqs[i];
- if (!(req->flags & UV_REQ_PENDING)) {
- pipeHandle = CreateNamedPipe(handle->name,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
- PIPE_UNLIMITED_INSTANCES,
- 65536,
- 65536,
- 0,
- NULL);
-
- if (pipeHandle == INVALID_HANDLE_VALUE) {
- continue;
- }
-
- if (CreateIoCompletionPort(pipeHandle,
- uv_iocp_,
- (ULONG_PTR)handle,
- 0) == NULL) {
- continue;
- }
-
- /* Prepare the overlapped structure. */
- memset(&(req->overlapped), 0, sizeof(req->overlapped));
+ if (CreateIoCompletionPort(pipeHandle,
+ uv_iocp_,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req((uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
- if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
- GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
- /* Make this req pending reporting an error. */
- req->error = uv_new_sys_error(GetLastError());
- uv_insert_pending_req(req);
- handle->reqs_pending++;
- continue;
- }
+ /* Prepare the overlapped structure. */
+ memset(&(req->overlapped), 0, sizeof(req->overlapped));
- req->data = pipeHandle;
- req->flags |= UV_REQ_PENDING;
- handle->reqs_pending++;
- }
+ if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
+ GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
+ /* Make this req pending reporting an error. */
+ req->error = uv_new_sys_error(GetLastError());
+ uv_insert_pending_req((uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
}
+
+ req->pipeHandle = pipeHandle;
+ handle->reqs_pending++;
}
DWORD bytes, flags;
assert(handle->flags & UV_HANDLE_READING);
+ assert(!(handle->flags & UV_HANDLE_READ_PENDING));
req = &handle->read_req;
- assert(!(req->flags & UV_REQ_PENDING));
memset(&req->overlapped, 0, sizeof(req->overlapped));
- req->type = UV_READ;
buf.base = (char*) &uv_zero_;
buf.len = 0;
return;
}
- req->flags |= UV_REQ_PENDING;
+ handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
int result;
assert(handle->flags & UV_HANDLE_READING);
- assert(handle->connection);
- assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+ assert(!(handle->flags & UV_HANDLE_READ_PENDING));
+
+ assert(handle->handle != INVALID_HANDLE_VALUE);
req = &handle->read_req;
- assert(!(req->flags & UV_REQ_PENDING));
memset(&req->overlapped, 0, sizeof(req->overlapped));
- req->type = UV_READ;
/* Do 0-read */
- result = ReadFile(handle->connection->handle,
+ result = ReadFile(handle->handle,
&uv_zero_,
0,
NULL,
return;
}
- req->flags |= UV_REQ_PENDING;
+ handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
return -1;
}
+ if (!(handle->flags & UV_HANDLE_BOUND) &&
+ uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
+ return -1;
+
if (listen(handle->socket, backlog) == SOCKET_ERROR) {
uv_set_sys_error(WSAGetLastError());
return -1;
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
- uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL);
+ uv_req_init(&(handle->accept_req));
+ handle->accept_req.type = UV_ACCEPT;
+ handle->accept_req.data = handle;
uv_tcp_queue_accept(handle);
return 0;
static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
- uv_pipe_instance_t* connection = server->connections;
-
/* Find a connection instance that has been connected, but not yet accepted. */
- while (connection) {
- if (connection->state == UV_PIPEINSTANCE_CONNECTED) {
- break;
- }
+ struct uv_pipe_accept_s* req = server->pending_accepts;
- connection = connection->next;
- }
-
- if (!connection) {
+ if (!req) {
/* No valid connections found, so we error out. */
- uv_set_sys_error(UV_ENOTCONN);
+ uv_set_sys_error(WSAEWOULDBLOCK);
return -1;
}
- /* Make the connection instance active */
- connection->state = UV_PIPEINSTANCE_ACTIVE;
-
- /* Assign the connection to the client. */
- client->connection = connection;
- client->server = server;
+ /* Initialize the client handle and copy the pipeHandle to the client */
+ uv_pipe_init(client);
+ uv_init_connection((uv_stream_t*) client);
+ client->handle = req->pipeHandle;
- uv_init_connection((uv_stream_t*)client);
- client->flags |= UV_HANDLE_PIPESERVER;
- uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL);
+ /* Prepare the req to pick up a new connection */
+ server->pending_accepts = req->next_pending;
+ req->next_pending = NULL;
+ req->pipeHandle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(server);
+ uv_pipe_queue_accept(server, req);
}
return 0;
/* If reading was stopped and then started again, there could stell be a */
/* read request pending. */
- if (!(handle->read_req.flags & UV_REQ_PENDING))
+ if (!(handle->flags & UV_HANDLE_READ_PENDING))
uv_tcp_queue_read(handle);
return 0;
/* If reading was stopped and then started again, there could stell be a */
/* read request pending. */
- if (!(handle->read_req.flags & UV_REQ_PENDING))
+ if (!(handle->flags & UV_HANDLE_READ_PENDING))
uv_pipe_queue_read(handle);
return 0;
}
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in address, uv_connect_cb cb) {
int addrsize = sizeof(struct sockaddr_in);
BOOL success;
DWORD bytes;
- uv_tcp_t* handle = (uv_tcp_t*)req->handle;
-
- assert(!(req->flags & UV_REQ_PENDING));
if (handle->flags & UV_HANDLE_BIND_ERROR) {
uv_last_error_ = handle->error;
return -1;
}
- if (addr.sin_family != AF_INET) {
+ if (address.sin_family != AF_INET) {
uv_set_sys_error(WSAEFAULT);
return -1;
}
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
- memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_req_init((uv_req_t*) req);
req->type = UV_CONNECT;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
success = pConnectEx(handle->socket,
- (struct sockaddr*)&addr,
+ (struct sockaddr*) &address,
addrsize,
NULL,
0,
return -1;
}
- req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
return 0;
}
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+ struct sockaddr_in6 address, uv_connect_cb cb) {
int addrsize = sizeof(struct sockaddr_in6);
BOOL success;
DWORD bytes;
- uv_tcp_t* handle = (uv_tcp_t*)req->handle;
if (!uv_allow_ipv6) {
uv_new_sys_error(UV_EAFNOSUPPORT);
return -1;
}
- assert(!(req->flags & UV_REQ_PENDING));
-
if (handle->flags & UV_HANDLE_BIND_ERROR) {
uv_last_error_ = handle->error;
return -1;
}
- if (addr.sin6_family != AF_INET6) {
+ if (address.sin6_family != AF_INET6) {
uv_set_sys_error(WSAEFAULT);
return -1;
}
uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0)
return -1;
- memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_req_init((uv_req_t*) req);
req->type = UV_CONNECT;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
success = pConnectEx6(handle->socket,
- (struct sockaddr*)&addr,
+ (struct sockaddr*) &address,
addrsize,
NULL,
0,
return -1;
}
- req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
return 0;
}
-int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+static int uv_tcp_write(uv_write_t* req, uv_tcp_t* handle, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb) {
int result;
DWORD bytes, err;
- uv_tcp_t* handle = (uv_tcp_t*) req->handle;
- assert(!(req->flags & UV_REQ_PENDING));
-
- if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+ if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(WSAEINVAL);
return -1;
}
- if (req->handle->flags & UV_HANDLE_SHUTTING) {
+ if (handle->flags & UV_HANDLE_SHUTTING) {
uv_set_sys_error(WSAESHUTDOWN);
return -1;
}
- memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_req_init((uv_req_t*) req);
req->type = UV_WRITE;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
result = WSASend(handle->socket,
(WSABUF*)bufs,
handle->write_queue_size += req->queued_bytes;
}
- req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
handle->write_reqs_pending++;
}
-int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
+int uv_pipe_write(uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb) {
int result;
- uv_pipe_t* handle = (uv_pipe_t*) req->handle;
-
- assert(!(req->flags & UV_REQ_PENDING));
if (bufcnt != 1) {
uv_set_sys_error(UV_ENOTSUP);
return -1;
}
- assert(handle->connection);
- assert(handle->connection->handle != INVALID_HANDLE_VALUE);
+ assert(handle->handle != INVALID_HANDLE_VALUE);
- if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+ if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(UV_EINVAL);
return -1;
}
- if (req->handle->flags & UV_HANDLE_SHUTTING) {
+ if (handle->flags & UV_HANDLE_SHUTTING) {
uv_set_sys_error(UV_EOF);
return -1;
}
- memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_req_init((uv_req_t*) req);
req->type = UV_WRITE;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
- result = WriteFile(handle->connection->handle,
+ result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
handle->write_queue_size += req->queued_bytes;
}
- req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
handle->write_reqs_pending++;
}
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
- if (req->handle->type == UV_TCP) {
- return uv_tcp_write(req, bufs, bufcnt);
- } else if (req->handle->type == UV_NAMED_PIPE) {
- return uv_pipe_write(req, bufs, bufcnt);
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb) {
+ if (handle->type == UV_TCP) {
+ return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb);
+ } else if (handle->type == UV_NAMED_PIPE) {
+ return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb);
}
+ uv_set_sys_error(WSAEINVAL);
return -1;
}
-int uv_shutdown(uv_req_t* req) {
- uv_tcp_t* handle = (uv_tcp_t*) req->handle;
- int status = 0;
-
- if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
+ if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(WSAEINVAL);
return -1;
}
return -1;
}
+ uv_req_init((uv_req_t*) req);
req->type = UV_SHUTDOWN;
- req->flags |= UV_REQ_PENDING;
+ req->handle = handle;
+ req->cb = cb;
handle->flags |= UV_HANDLE_SHUTTING;
handle->shutdown_req = req;
assert(handle->type == UV_TCP);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
+ handle->flags &= ~UV_HANDLE_READ_PENDING;
if (req->error.code != UV_OK) {
/* An error occurred doing the 0-read. */
}
/* Post another 0-read if still reading and not closing. */
- if (handle->flags & UV_HANDLE_READING) {
+ if ((handle->flags & UV_HANDLE_READING) &&
+ !(handle->flags & UV_HANDLE_READ_PENDING)) {
uv_tcp_queue_read(handle);
}
}
}
-static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) {
+static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) {
assert(handle->type == UV_TCP);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
handle->write_queue_size -= req->queued_bytes;
if (req->cb) {
static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
assert(handle->type == UV_TCP);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
}
-static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) {
+static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) {
assert(handle->type == UV_TCP);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
if (req->cb) {
if (req->error.code == UV_OK) {
if (setsockopt(handle->socket,
assert(handle->type == UV_NAMED_PIPE);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
+ handle->flags &= ~UV_HANDLE_READ_PENDING;
if (req->error.code != UV_OK) {
/* An error occurred doing the 0-read. */
* This is so that ReadFile doesn't block if the read buffer is empty.
*/
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
- if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+ if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
/* We can't continue processing this read. */
handle->flags &= ~UV_HANDLE_READING;
uv_set_sys_error(GetLastError());
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(buf.len > 0);
- if (ReadFile(handle->connection->handle,
- buf.base,
- buf.len,
- &bytes,
- NULL)) {
+ if (ReadFile(handle->handle,
+ buf.base,
+ buf.len,
+ &bytes,
+ NULL)) {
if (bytes > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, bytes, buf);
err = GetLastError();
if (err == ERROR_NO_DATA) {
/* Read buffer was completely empty, report a 0-byte read. */
- uv_set_sys_error(UV_EAGAIN);
+ uv_set_sys_error(WSAEWOULDBLOCK);
handle->read_cb((uv_stream_t*)handle, 0, buf);
} else {
/* Ouch! serious error. */
/* TODO: if the read callback stops reading we can't start reading again
because the pipe will still be in nowait mode. */
- if (handle->flags & UV_HANDLE_READING) {
+ if ((handle->flags & UV_HANDLE_READING) &&
+ !(handle->flags & UV_HANDLE_READ_PENDING)) {
/* Switch back to blocking mode so that we can use IOCP for 0-reads */
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
- if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
+ if (SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
/* Post another 0-read */
uv_pipe_queue_read(handle);
} else {
}
-static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) {
+static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_write_t* req) {
assert(handle->type == UV_NAMED_PIPE);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
handle->write_queue_size -= req->queued_bytes;
if (req->cb) {
}
-static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) {
- uv_pipe_instance_t* pipeInstance;
+static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* raw_req) {
+ struct uv_pipe_accept_s* req = (struct uv_pipe_accept_s*) raw_req;
assert(handle->type == UV_NAMED_PIPE);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
if (req->error.code == UV_OK) {
- assert(req->data);
-
- /* Create the connection instance and add it to the connections list. */
- pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t));
- if (!pipeInstance) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
- pipeInstance->handle = req->data;
- pipeInstance->state = UV_PIPEINSTANCE_CONNECTED;
- pipeInstance->next = handle->connections;
- handle->connections = pipeInstance;
+ assert(req->pipeHandle != INVALID_HANDLE_VALUE);
- /* Clear the request. */
- req->data = NULL;
- req->flags = 0;
+ req->next_pending = handle->pending_accepts;
+ handle->pending_accepts = req;
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else {
- /* Ignore errors and continue listening */
- if (handle->flags & UV_HANDLE_LISTENING) {
- uv_pipe_queue_accept(handle);
+ if (req->pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ }
+ if (!(handle->flags & UV_HANDLE_CLOSING)) {
+ uv_pipe_queue_accept(handle, req);
}
}
}
-static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) {
+static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) {
assert(handle->type == UV_NAMED_PIPE);
- /* Mark the request non-pending */
- req->flags &= ~UV_REQ_PENDING;
-
if (req->cb) {
if (req->error.code == UV_OK) {
uv_init_connection((uv_stream_t*)handle);
handle->flags = 0;
handle->async_sent = 0;
handle->error = uv_ok_;
+ handle->async_cb = async_cb;
req = &handle->async_req;
- uv_req_init(req, (uv_handle_t*)handle, async_cb);
+ uv_req_init(req);
req->type = UV_WAKEUP;
+ req->data = handle;
uv_refs_++;
assert(req->type == UV_WAKEUP);
handle->async_sent = 0;
- if (req->cb) {
- ((uv_async_cb)req->cb)((uv_async_t*) handle, 0);
+ if (handle->async_cb) {
+ handle->async_cb((uv_async_t*) handle, 0);
}
if (handle->flags & UV_HANDLE_CLOSING) {
uv_want_endgame((uv_handle_t*)handle);
}
-#define DELEGATE_STREAM_REQ(req, method) \
+#define DELEGATE_STREAM_REQ(req, method, handle_at) \
do { \
- switch (req->handle->type) { \
+ switch (((uv_handle_t*) (req)->handle_at)->type) { \
case UV_TCP: \
- uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req); \
+ uv_process_tcp_##method##_req((uv_tcp_t*) ((req)->handle_at), req); \
break; \
\
case UV_NAMED_PIPE: \
- uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req); \
+ uv_process_pipe_##method##_req((uv_pipe_t*) ((req)->handle_at), req); \
break; \
\
default: \
while (req = uv_remove_pending_req()) {
switch (req->type) {
case UV_READ:
- DELEGATE_STREAM_REQ(req, read);
+ DELEGATE_STREAM_REQ(req, read, data);
break;
case UV_WRITE:
- DELEGATE_STREAM_REQ(req, write);
+ DELEGATE_STREAM_REQ((uv_write_t*) req, write, handle);
break;
case UV_ACCEPT:
- DELEGATE_STREAM_REQ(req, accept);
+ DELEGATE_STREAM_REQ(req, accept, data);
break;
case UV_CONNECT:
- DELEGATE_STREAM_REQ(req, connect);
+ DELEGATE_STREAM_REQ((uv_connect_t*) req, connect, handle);
break;
case UV_WAKEUP:
- uv_process_async_wakeup_req((uv_async_t*) req->handle, req);
+ uv_process_async_wakeup_req((uv_async_t*) req->data, req);
break;
case UV_ARES_EVENT_REQ:
- uv_process_ares_event_req((uv_ares_action_t*) req->handle, req);
+ uv_process_ares_event_req((uv_ares_action_t*) req->data, req);
break;
case UV_ARES_CLEANUP_REQ:
- uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req);
+ uv_process_ares_cleanup_req((uv_ares_task_t*) req->data, req);
break;
case UV_GETADDRINFO_REQ:
- uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req);
+ uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->data, req);
break;
default:
selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0;
uv_ares_req = &selhandle->ares_req;
- uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL);
+ uv_req_init(uv_ares_req);
uv_ares_req->type = UV_ARES_EVENT_REQ;
+ uv_ares_req->data = selhandle;
/* post ares needs to called */
if (!PostQueuedCompletionStatus(uv_iocp_,
/* Post request to cleanup the Task */
uv_ares_req = &uv_handle_ares->ares_req;
- uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL);
+ uv_req_init(uv_ares_req);
uv_ares_req->type = UV_ARES_CLEANUP_REQ;
+ uv_ares_req->data = uv_handle_ares;
/* post ares done with socket - finish cleanup when all threads done. */
if (!PostQueuedCompletionStatus(uv_iocp_,
}
/* init request for Post handling */
- uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL);
+ uv_req_init(&handle->getadddrinfo_req);
+ handle->getadddrinfo_req.data = handle;
handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ;
/* Ask thread to run. Treat this as a long operation */
handle->type = UV_NAMED_PIPE;
handle->reqs_pending = 0;
+ handle->pending_accepts = NULL;
uv_counters()->pipe_init++;
/* TODO: make this work with UTF8 name */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
int i;
+ struct uv_pipe_accept_s* req;
if (!name) {
+ uv_set_sys_error(WSAEINVAL);
return -1;
}
- handle->connections = NULL;
-
- /* Initialize accept requests. */
- for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
- handle->accept_reqs[i].flags = 0;
- handle->accept_reqs[i].type = UV_ACCEPT;
- handle->accept_reqs[i].handle = (uv_handle_t*)handle;
- handle->accept_reqs[i].cb = NULL;
- handle->accept_reqs[i].data = NULL;
- uv_counters()->req_init++;
- }
-
/* Make our own copy of the pipe name */
- handle->name = (char*)malloc(MAX_PIPENAME_LEN);
+ handle->name = strdup(name);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
- strcpy(handle->name, name);
- handle->name[255] = '\0';
+
+ for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+ req = &handle->accept_reqs[i];
+ uv_req_init((uv_req_t*) req);
+ req->type = UV_ACCEPT;
+ req->data = handle;
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ req->next_pending = NULL;
+ }
handle->flags |= UV_HANDLE_PIPESERVER;
return 0;
/* Starts listening for connections for the given pipe. */
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
- int i, maxInstances, errno;
- HANDLE pipeHandle;
- uv_pipe_instance_t* pipeInstance;
+ int i, errno;
if (handle->flags & UV_HANDLE_LISTENING ||
handle->flags & UV_HANDLE_READING) {
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
- uv_pipe_queue_accept(handle);
+ for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+ uv_pipe_queue_accept(handle, &handle->accept_reqs[i]);
+ }
+
return 0;
}
/* TODO: make this work with UTF8 name */
-int uv_pipe_connect(uv_req_t* req, const char* name) {
+/* TODO: run this in the thread pool */
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+ const char* name, uv_connect_cb cb) {
int errno;
DWORD mode;
- uv_pipe_t* handle = (uv_pipe_t*)req->handle;
-
- assert(!(req->flags & UV_REQ_PENDING));
+ uv_req_init((uv_req_t*) req);
req->type = UV_CONNECT;
- handle->connection = &handle->clientConnection;
- handle->server = NULL;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+
memset(&req->overlapped, 0, sizeof(req->overlapped));
- handle->clientConnection.handle = CreateFile(name,
- GENERIC_READ | GENERIC_WRITE,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
+ handle->handle = CreateFile(name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
- if (handle->clientConnection.handle == INVALID_HANDLE_VALUE &&
+ if (handle->handle == INVALID_HANDLE_VALUE &&
GetLastError() != ERROR_IO_PENDING) {
errno = GetLastError();
goto error;
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
- if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) {
+ if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) {
errno = GetLastError();
goto error;
}
- if (CreateIoCompletionPort(handle->clientConnection.handle,
+ if (CreateIoCompletionPort(handle->handle,
uv_iocp_,
(ULONG_PTR)handle,
0) == NULL) {
}
req->error = uv_ok_;
- req->flags |= UV_REQ_PENDING;
- handle->connection->state = UV_PIPEINSTANCE_ACTIVE;
- uv_insert_pending_req(req);
+ uv_insert_pending_req((uv_req_t*) req);
handle->reqs_pending++;
return 0;
error:
- close_pipe(handle, NULL, NULL);
+ if (handle->handle != INVALID_HANDLE_VALUE) {
+ CloseHandle(handle->handle);
+ }
req->error = uv_new_sys_error(errno);
- uv_insert_pending_req(req);
+ uv_insert_pending_req((uv_req_t*) req);
handle->reqs_pending++;
- return 0;
+ return -1;
}
/* Cleans up uv_pipe_t (server or connection) and all resources associated with it */
static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
- uv_pipe_instance_t* connection, *next, *cur, **prev;
- HANDLE pipeHandle;
int i;
+ HANDLE pipeHandle;
if (handle->flags & UV_HANDLE_PIPESERVER) {
- if (handle->flags & UV_HANDLE_CONNECTION) {
- /*
- * The handle is for a connection instance on the pipe server.
- * To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t.
- */
-
- connection = handle->connection;
- if (connection && connection->handle != INVALID_HANDLE_VALUE) {
- /* Disconnect the connection intance and return it to pending state. */
- if (DisconnectNamedPipe(connection->handle)) {
- if (status) *status = 0;
- } else {
- if (status) *status = -1;
- if (err) *err = uv_new_sys_error(GetLastError());
- }
-
- connection->state = UV_PIPEINSTANCE_DISCONNECTED;
- connection->handle = NULL;
-
- cur = handle->connections;
- handle->connection = NULL;
- prev = &handle->server->connections;
-
- /* Remove the connection from the list. */
- while (connection) {
- if (cur == connection) {
- *prev = connection->next;
- free(connection);
- break;
- } else {
- prev = &connection->next;
- connection = connection->next;
- }
- }
-
- /* Queue accept now that the instance is in pending state. */
- if (!(handle->server->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(handle->server);
- }
- }
- } else {
- /*
- * The handle is for the pipe server.
- * To clean-up we close every active and pending connection instance.
- */
-
- if (handle->name) {
- free(handle->name);
- handle->name = NULL;
+ for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
+ pipeHandle = handle->accept_reqs[i].pipeHandle;
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipeHandle);
}
-
- connection = handle->connections;
- while (connection) {
- pipeHandle = connection->handle;
-
- if (pipeHandle) {
- DisconnectNamedPipe(pipeHandle);
- CloseHandle(pipeHandle);
- }
-
- next = connection->next;
- free(connection);
- connection = next;
- }
-
- handle->connections = NULL;
-
- for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
- if (handle->accept_reqs[i].flags & UV_REQ_PENDING) {
- pipeHandle = handle->accept_reqs[i].data;
- assert(pipeHandle);
- DisconnectNamedPipe(pipeHandle);
- CloseHandle(pipeHandle);
- handle->accept_reqs[i].flags = 0;
- handle->reqs_pending--;
- }
- }
-
- if (status) *status = 0;
}
+
} else {
- /*
- * The handle is for a connection instance on the pipe client.
- * To clean-up we close the pipe handle.
- */
- connection = handle->connection;
- if (connection && connection->handle != INVALID_HANDLE_VALUE) {
- if (CloseHandle(connection->handle)) {
- connection->state = UV_PIPEINSTANCE_DISCONNECTED;
- handle->connection = NULL;
- if (status) *status = 0;
- } else {
- if (status) *status = -1;
- if (err) *err = uv_new_sys_error(GetLastError());
- }
- }
+ CloseHandle(handle->handle);
}
handle->flags |= UV_HANDLE_SHUT;
int pongs;
int state;
uv_tcp_t tcp;
- uv_req_t connect_req;
- uv_req_t shutdown_req;
+ uv_connect_t connect_req;
+ uv_shutdown_t shutdown_req;
} pinger_t;
typedef struct buf_s {
}
-static void pinger_write_cb(uv_req_t *req, int status) {
+static void pinger_write_cb(uv_write_t* req, int status) {
ASSERT(status == 0);
free(req);
static void pinger_write_ping(pinger_t* pinger) {
- uv_req_t *req;
+ uv_write_t* req;
uv_buf_t buf;
buf.base = (char*)&PING;
buf.len = strlen(PING);
- req = (uv_req_t*)malloc(sizeof(*req));
- uv_req_init(req, (uv_handle_t*)(&pinger->tcp), pinger_write_cb);
-
- if (uv_write(req, &buf, 1)) {
+ req = malloc(sizeof *req);
+ if (uv_write(req, (uv_stream_t*) &pinger->tcp, &buf, 1, pinger_write_cb)) {
FATAL("uv_write failed");
}
}
-static void pinger_shutdown_cb(uv_handle_t* handle, int status) {
+static void pinger_shutdown_cb(uv_shutdown_t* req, int status) {
ASSERT(status == 0);
pinger_shutdown_cb_called++;
if (pinger->state == 0) {
pinger->pongs++;
if (uv_now() - start_time > TIME) {
- uv_req_init(&pinger->shutdown_req, (uv_handle_t*)tcp, pinger_shutdown_cb);
- uv_shutdown(&pinger->shutdown_req);
+ uv_shutdown(&pinger->shutdown_req, (uv_stream_t*) tcp, pinger_shutdown_cb);
break;
} else {
pinger_write_ping(pinger);
}
-static void pinger_connect_cb(uv_req_t *req, int status) {
+static void pinger_connect_cb(uv_connect_t* req, int status) {
pinger_t *pinger = (pinger_t*)req->handle->data;
ASSERT(status == 0);
pinger_write_ping(pinger);
- if (uv_read_start((uv_stream_t*)(req->handle), buf_alloc, pinger_read_cb)) {
+ if (uv_read_start(req->handle, buf_alloc, pinger_read_cb)) {
FATAL("uv_read_start failed");
}
}
pinger->tcp.data = pinger;
- /* We are never doing multiple reads/connects at a time anyway. */
- /* so these handles can be pre-initialized. */
- uv_req_init(&pinger->connect_req, (uv_handle_t*)&pinger->tcp,
- pinger_connect_cb);
-
uv_tcp_bind(&pinger->tcp, client_addr);
- r = uv_tcp_connect(&pinger->connect_req, server_addr);
+
+ r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_connect_cb);
ASSERT(!r);
}
}
-static void write_cb(uv_req_t *req, int status) {
+static void write_cb(uv_write_t* req, int status) {
uv_buf_t* buf = (uv_buf_t*) req->data;
ASSERT(status == 0);
- req_free(req);
+ req_free((uv_req_t*) req);
nsent += sizeof write_buffer;
nsent_total += sizeof write_buffer;
- do_write((uv_stream_t*)req->handle);
+ do_write((uv_stream_t*) req->handle);
}
static void do_write(uv_stream_t* stream) {
- uv_req_t* req;
+ uv_write_t* req;
uv_buf_t buf;
int r;
buf.len = sizeof write_buffer;
while (stream->write_queue_size == 0) {
- req = req_alloc();
- uv_req_init(req, (uv_handle_t*)stream, write_cb);
-
- r = uv_write(req, &buf, 1);
+ req = (uv_write_t*) req_alloc();
+ r = uv_write(req, stream, &buf, 1, write_cb);
ASSERT(r == 0);
}
}
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
int i;
if (status) LOG(uv_strerror(uv_last_error()));
ASSERT(status == 0);
write_sockets++;
- req_free(req);
+ req_free((uv_req_t*) req);
maybe_connect_some();
static void maybe_connect_some() {
- uv_req_t* req;
+ uv_connect_t* req;
uv_tcp_t* tcp;
uv_pipe_t* pipe;
int r;
r = uv_tcp_init(tcp);
ASSERT(r == 0);
- req = req_alloc();
- uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
- r = uv_tcp_connect(req, connect_addr);
+ req = (uv_connect_t*) req_alloc();
+ r = uv_tcp_connect(req, tcp, connect_addr, connect_cb);
ASSERT(r == 0);
} else {
pipe = &pipe_write_handles[max_connect_socket++];
r = uv_pipe_init(pipe);
ASSERT(r == 0);
- req = req_alloc();
- uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
- r = uv_pipe_connect(req, TEST_PIPENAME);
+ req = (uv_connect_t*) req_alloc();
+ r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
ASSERT(r == 0);
#ifdef _WIN32
*/
typedef struct req_list_s {
- uv_req_t uv_req;
+ union uv_any_req uv_req;
struct req_list_s* next;
} req_list_t;
typedef struct {
- uv_req_t req;
+ uv_write_t req;
uv_buf_t buf;
} write_req_t;
static uv_tcp_t server;
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 };
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
write_req_t* wr;
if (status) {
}
-static void after_shutdown(uv_req_t* req, int status) {
- uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+ uv_close((uv_handle_t*) req->handle, on_close);
free(req);
}
}
static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
- write_req_t *wr;
+ write_req_t* wr;
dnshandle* dns = (dnshandle*)handle;
char hdrbuf[DNSREC_LEN];
int hdrbuf_remaining = DNSREC_LEN;
int usingprev = 0;
wr = (write_req_t*) malloc(sizeof *wr);
- uv_req_init(&wr->req, (uv_handle_t*)handle, after_write);
wr->buf.base = (char*)malloc(WRITE_BUF_LEN);
wr->buf.len = 0;
/* send write buffer */
if (wr->buf.len > 0) {
- if (uv_write(&wr->req, &wr->buf, 1)) {
+ if (uv_write((uv_write_t*) &wr->req, handle, &wr->buf, 1, after_write)) {
FATAL("uv_write failed");
}
}
}
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
- uv_req_t* req;
+ uv_shutdown_t* req;
if (nread < 0) {
/* Error or EOF */
free(buf.base);
}
- req = (uv_req_t*) malloc(sizeof *req);
- uv_req_init(req, (uv_handle_t*)handle, after_shutdown);
- uv_shutdown(req);
+ req = malloc(sizeof *req);
+ uv_shutdown(req, handle, after_shutdown);
return;
}
#include <stdlib.h>
typedef struct {
- uv_req_t req;
+ uv_write_t req;
uv_buf_t buf;
} write_req_t;
static uv_pipe_t pipeServer;
static uv_handle_t* server;
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_handle_t*, int status);
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
write_req_t* wr;
if (status) {
}
-static void after_shutdown(uv_req_t* req, int status) {
- uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+ uv_close((uv_handle_t*)req->handle, on_close);
free(req);
}
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
int i;
write_req_t *wr;
- uv_req_t* req;
+ uv_shutdown_t* req;
if (nread < 0) {
/* Error or EOF */
free(buf.base);
}
- req = (uv_req_t*) malloc(sizeof *req);
- uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
- uv_shutdown(req);
+ req = (uv_shutdown_t*) malloc(sizeof *req);
+ uv_shutdown(req, handle, after_shutdown);
return;
}
wr = (write_req_t*) malloc(sizeof *wr);
- uv_req_init(&wr->req, (uv_handle_t*)handle, (void *(*)(void *))after_write);
wr->buf.base = buf.base;
wr->buf.len = nread;
- if (uv_write(&wr->req, &wr->buf, 1)) {
+ if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
FATAL("uv_write failed");
}
}
static uv_tcp_t client;
static uv_timer_t timer;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
static int nested = 0;
static int close_cb_called = 0;
}
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
ASSERT(status == 0);
ASSERT(nested == 0 && "shutdown_cb must be called from a fresh stack");
/* from a fresh stack. */
if (bytes_received == sizeof MESSAGE) {
nested++;
- uv_req_init(&shutdown_req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
puts("Shutdown");
- if (uv_shutdown(&shutdown_req)) {
+ if (uv_shutdown(&shutdown_req, (uv_stream_t*)tcp, shutdown_cb)) {
FATAL("uv_shutdown failed");
}
nested--;
}
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
int r;
ASSERT(status == 0);
}
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
uv_buf_t buf;
puts("Connected. Write some data to echo server...");
buf.base = (char*) &MESSAGE;
buf.len = sizeof MESSAGE;
- uv_req_init(&write_req, req->handle, (void *(*)(void *))write_cb);
-
- if (uv_write(&write_req, &buf, 1)) {
+ if (uv_write(&write_req, (uv_stream_t*)req->handle, &buf, 1, write_cb)) {
FATAL("uv_write failed");
}
puts("Connecting...");
nested++;
- uv_req_init(&connect_req, (uv_handle_t*)&client,
- (void *(*)(void *))connect_cb);
- if (uv_tcp_connect(&connect_req, addr)) {
+ if (uv_tcp_connect(&connect_req, &client, addr, connect_cb)) {
FATAL("uv_tcp_connect failed");
}
nested--;
static uv_tcp_t tcp;
-static uv_req_t req;
+static uv_connect_t req;
static int connect_cb_calls;
static int close_cb_calls;
}
-static void on_connect_with_close(uv_req_t *req, int status) {
- ASSERT(&tcp == (uv_tcp_t*) req->handle);
+static void on_connect_with_close(uv_connect_t *req, int status) {
+ ASSERT((uv_stream_t*) &tcp == req->handle);
ASSERT(status == -1);
ASSERT(uv_last_error().code == UV_ECONNREFUSED);
connect_cb_calls++;
ASSERT(close_cb_calls == 0);
- uv_close(req->handle, on_close);
+ uv_close((uv_handle_t*)req->handle, on_close);
}
-static void on_connect_without_close(uv_req_t *req, int status) {
+static void on_connect_without_close(uv_connect_t *req, int status) {
ASSERT(status == -1);
ASSERT(uv_last_error().code == UV_ECONNREFUSED);
connect_cb_calls++;
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
- uv_req_init(&req, (uv_handle_t*)&tcp, (void *(*)(void *))connect_cb);
-
uv_tcp_bind(&tcp, client_addr);
- r = uv_tcp_connect(&req, server_addr);
+ r = uv_tcp_connect(&req, &tcp, server_addr, connect_cb);
ASSERT(!r);
uv_run();
}
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
int r;
ASSERT(req != NULL);
static void client_connect() {
struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
- uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+ uv_connect_t* connect_req = malloc(sizeof *connect_req);
int r;
ASSERT(client != NULL);
r = uv_tcp_init(client);
ASSERT(r == 0);
- uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
- r = uv_tcp_connect(connect_req, addr);
+ r = uv_tcp_connect(connect_req, client, addr, connect_cb);
ASSERT(r == 0);
}
static uv_tcp_t tcp;
-static uv_req_t connect_req;
+static uv_connect_t connect_req;
static uv_tcp_t tcpServer;
}
-static void after_shutdown(uv_req_t* req, int status) {
- uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+ uv_close((uv_handle_t*) req->handle, on_close);
free(req);
}
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
- uv_req_t* req;
+ uv_shutdown_t* req;
+ int r;
if (buf.base) {
free(buf.base);
}
- req = (uv_req_t*) malloc(sizeof *req);
- uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
- uv_shutdown(req);
+ req = (uv_shutdown_t*) malloc(sizeof *req);
+ r = uv_shutdown(req, handle, after_shutdown);
+ ASSERT(r == 0);
}
}
-static void on_connect(void* req) {
+static void on_connect(uv_connect_t* req, int status) {
struct sockaddr sockname;
int namelen = sizeof(sockname);
- int status;
+ int r;
- status = uv_getsockname(&tcp, &sockname, &namelen);
- if (status != 0) {
+ ASSERT(status == 0);
+
+ r = uv_getsockname(&tcp, &sockname, &namelen);
+ if (r != 0) {
fprintf(stderr, "uv_getsockname error (connector) %d\n", uv_last_error().code);
}
- ASSERT(status == 0);
+ ASSERT(r == 0);
getsocknamecount++;
tcp.data = &connect_req;
ASSERT(!r);
- uv_req_init(&connect_req, (uv_handle_t*)(&tcp), (void *(*)(void *))on_connect);
-
- r = uv_tcp_connect(&connect_req, server_addr);
+ r = uv_tcp_connect(&connect_req, &tcp, server_addr, on_connect);
ASSERT(!r);
}
uv_tcp_t tcp;
uv_pipe_t pipe;
};
- uv_req_t connect_req;
- uv_req_t read_req;
+ uv_connect_t connect_req;
char read_buffer[BUFSIZE];
} pinger_t;
}
-static void pinger_after_write(uv_req_t *req, int status) {
+static void pinger_after_write(uv_write_t *req, int status) {
ASSERT(status == 0);
-
free(req);
}
static void pinger_write_ping(pinger_t* pinger) {
- uv_req_t *req;
+ uv_write_t *req;
uv_buf_t buf;
buf.base = (char*)&PING;
buf.len = strlen(PING);
- req = (uv_req_t*)malloc(sizeof(*req));
- uv_req_init(req, (uv_handle_t*)(&pinger->tcp),
- (void *(*)(void *))pinger_after_write);
+ req = malloc(sizeof(uv_write_t));
- if (uv_write(req, &buf, 1)) {
+ if (uv_write(req, (uv_stream_t*)&pinger->tcp, &buf, 1, pinger_after_write)) {
FATAL("uv_write failed");
}
}
-static void pinger_on_connect(uv_req_t *req, int status) {
+static void pinger_on_connect(uv_connect_t *req, int status) {
pinger_t *pinger = (pinger_t*)req->handle->data;
ASSERT(status == 0);
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
- uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
- (void *(*)(void *))pinger_on_connect);
-
- r = uv_tcp_connect6(&pinger->connect_req, server_addr);
+ r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr,
+ pinger_on_connect);
ASSERT(!r);
}
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
- uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
- (void *(*)(void *))pinger_on_connect);
-
- r = uv_tcp_connect(&pinger->connect_req, server_addr);
+ r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_on_connect);
ASSERT(!r);
}
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
- uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
- (void *(*)(void *))pinger_on_connect);
- r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
+ r = uv_pipe_connect(&pinger->connect_req, &pinger->pipe, TEST_PIPENAME, pinger_on_connect);
ASSERT(!r);
}
static uv_timer_t timer;
static uv_tcp_t tcp;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
static uv_buf_t qbuf;
static int got_q;
static int got_eof;
}
-static void shutdown_cb(uv_req_t *req, int status) {
+static void shutdown_cb(uv_shutdown_t *req, int status) {
ASSERT(req == &shutdown_req);
ASSERT(called_connect_cb == 1);
}
-static void connect_cb(uv_req_t *req, int status) {
+static void connect_cb(uv_connect_t *req, int status) {
ASSERT(status == 0);
ASSERT(req == &connect_req);
* Write the letter 'Q' to gracefully kill the echo-server. This will not
* effect our connection.
*/
- uv_req_init(&write_req, (uv_handle_t*)&tcp, NULL);
- uv_write(&write_req, &qbuf, 1);
+ uv_write(&write_req, (uv_stream_t*) &tcp, &qbuf, 1, NULL);
/* Shutdown our end of the connection. */
- uv_req_init(&shutdown_req, (uv_handle_t*)&tcp, (void *(*)(void *))shutdown_cb);
- uv_shutdown(&shutdown_req);
+ uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb);
called_connect_cb++;
ASSERT(called_shutdown_cb == 0);
r = uv_tcp_init(&tcp);
ASSERT(!r);
- uv_req_init(&connect_req, (uv_handle_t*) &tcp, (void *(*)(void *))connect_cb);
- r = uv_tcp_connect(&connect_req, server_addr);
+ r = uv_tcp_connect(&connect_req, &tcp, server_addr, connect_cb);
ASSERT(!r);
uv_run();
}
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
uv_tcp_t* tcp;
ASSERT(req);
}
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
ASSERT(req != NULL);
if (status) {
}
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
uv_buf_t send_bufs[CHUNKS_PER_WRITE];
uv_tcp_t* tcp;
+ uv_write_t* write_req;
+ uv_shutdown_t* shutdown_req;
int i, j, r;
ASSERT(req != NULL);
bytes_sent += CHUNK_SIZE;
}
- req = (uv_req_t*)malloc(sizeof *req);
- ASSERT(req != NULL);
+ write_req = malloc(sizeof(uv_write_t));
+ ASSERT(write_req != NULL);
- uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))write_cb);
- r = uv_write(req, (uv_buf_t*)&send_bufs, CHUNKS_PER_WRITE);
+ r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs,
+ CHUNKS_PER_WRITE, write_cb);
ASSERT(r == 0);
}
/* Shutdown on drain. FIXME: dealloc req? */
- req = (uv_req_t*) malloc(sizeof(uv_req_t));
- ASSERT(req != NULL);
- uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
- r = uv_shutdown(req);
+ shutdown_req = malloc(sizeof(uv_shutdown_t));
+ ASSERT(shutdown_req != NULL);
+ r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb);
ASSERT(r == 0);
/* Start reading */
- req = (uv_req_t*)malloc(sizeof *req);
- ASSERT(req != NULL);
-
- uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))read_cb);
r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb);
ASSERT(r == 0);
}
TEST_IMPL(tcp_writealot) {
struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
- uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+ uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
int r;
ASSERT(client != NULL);
r = uv_tcp_init(client);
ASSERT(r == 0);
- uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
- r = uv_tcp_connect(connect_req, addr);
+ r = uv_tcp_connect(connect_req, client, addr, connect_cb);
ASSERT(r == 0);
uv_run();
class TCPWrap;
+template <typename T>
class ReqWrap {
public:
- ReqWrap(uv_handle_t* handle, void* callback) {
+ ReqWrap() {
HandleScope scope;
object_ = Persistent<Object>::New(Object::New());
- uv_req_init(&req_, handle, (void* (*)(void*))callback);
- req_.data = this;
}
~ReqWrap() {
+ // Assert that someone has called Dispatched()
+ assert(req_.data == this);
assert(!object_.IsEmpty());
object_.Dispose();
object_.Clear();
}
+ // Call this after the req has been dispatched.
+ void Dispatched() {
+ req_.data = this;
+ }
+
Persistent<Object> object_;
- uv_req_t req_;
+ T req_;
};
+
+typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
+typedef class ReqWrap<uv_write_t> WriteWrap;
+typedef class ReqWrap<uv_connect_t> ConnectWrap;
+
+
class TCPWrap {
public:
return scope.Close(Integer::New(r));
}
- static void AfterWrite(uv_req_t* req, int status) {
- ReqWrap* req_wrap = (ReqWrap*) req->data;
+ static void AfterWrite(uv_write_t* req, int status) {
+ WriteWrap* req_wrap = (WriteWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
HandleScope scope;
length = args[2]->IntegerValue();
}
- // I hate when people program C++ like it was C, and yet I do it too.
- // I'm too lazy to come up with the perfect class hierarchy here. Let's
- // just do some type munging.
- ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
- (void*)AfterWrite);
+ WriteWrap* req_wrap = new WriteWrap();
req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;
- int r = uv_write(&req_wrap->req_, &buf, 1);
+ int r = uv_write(&req_wrap->req_, (uv_stream_t*)&wrap->handle_, &buf, 1,
+ AfterWrite);
+
+ req_wrap->Dispatched();
wrap->UpdateWriteQueueSize();
}
}
- static void AfterConnect(uv_req_t* req, int status) {
- ReqWrap* req_wrap = (ReqWrap*) req->data;
+ static void AfterConnect(uv_connect_t* req, int status) {
+ ConnectWrap* req_wrap = (ConnectWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
HandleScope scope;
// I hate when people program C++ like it was C, and yet I do it too.
// I'm too lazy to come up with the perfect class hierarchy here. Let's
// just do some type munging.
- ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
- (void*)AfterConnect);
+ ConnectWrap* req_wrap = new ConnectWrap();
+
- int r = uv_tcp_connect(&req_wrap->req_, address);
+ int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address,
+ AfterConnect);
+ req_wrap->Dispatched();
if (r) {
SetErrno(uv_last_error().code);
struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port);
- // I hate when people program C++ like it was C, and yet I do it too.
- // I'm too lazy to come up with the perfect class hierarchy here. Let's
- // just do some type munging.
- ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
- (void*)AfterConnect);
+ ConnectWrap* req_wrap = new ConnectWrap();
- int r = uv_tcp_connect6(&req_wrap->req_, address);
+ int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address,
+ AfterConnect);
+
+ req_wrap->Dispatched();
if (r) {
SetErrno(uv_last_error().code);
}
}
- static void AfterShutdown(uv_req_t* req, int status) {
- ReqWrap* req_wrap = (ReqWrap*) req->data;
+ static void AfterShutdown(uv_shutdown_t* req, int status) {
+ ShutdownWrap* req_wrap = (ShutdownWrap*) req->data;
TCPWrap* wrap = (TCPWrap*) req->handle->data;
// The request object should still be there.
UNWRAP
- ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
- (void*)AfterShutdown);
+ ShutdownWrap* req_wrap = new ShutdownWrap();
+
- int r = uv_shutdown(&req_wrap->req_);
+ int r = uv_shutdown(&req_wrap->req_, (uv_stream_t*) &wrap->handle_,
+ AfterShutdown);
+ req_wrap->Dispatched();
if (r) {
SetErrno(uv_last_error().code);
uv_tcp_t handle_;
Persistent<Object> object_;
size_t slab_offset_;
- friend class ReqWrap;
};