OBJS += src/unix/error.o
OBJS += src/unix/fs.o
OBJS += src/unix/loop.o
+OBJS += src/unix/loop-watcher.o
OBJS += src/unix/pipe.o
OBJS += src/unix/poll.o
OBJS += src/unix/process.o
#define UV_UDP_SEND_PRIVATE_FIELDS \
ngx_queue_t queue; \
- struct sockaddr_storage addr; \
- socklen_t addrlen; \
- uv_buf_t* bufs; \
+ struct sockaddr_in6 addr; \
int bufcnt; \
+ uv_buf_t* bufs; \
ssize_t status; \
uv_udp_send_cb send_cb; \
uv_buf_t bufsml[UV_REQ_BUFSML_SIZE]; \
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
- int fd; \
int flags; \
uv_handle_t* next_pending; \
uv__io_t write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \
- int delayed_error; \
uv_connection_cb connection_cb; \
+ int delayed_error; \
int accepted_fd; \
- int blocking;
+ int fd; \
/* UV_TCP */
/* UV_UDP */
#define UV_UDP_PRIVATE_FIELDS \
+ int fd; \
uv_alloc_cb alloc_cb; \
uv_udp_recv_cb recv_cb; \
uv__io_t read_watcher; \
/* UV_POLL */
#define UV_POLL_PRIVATE_FIELDS \
+ int fd; \
uv__io_t io_watcher;
struct uv_fs_event_s* rbe_parent; \
int rbe_color; \
} node; \
- uv_fs_event_cb cb;
+ uv_fs_event_cb cb; \
+ int fd; \
#elif defined(__APPLE__) \
|| defined(__FreeBSD__) \
ev_io event_watcher; \
uv_fs_event_cb cb; \
int fflags; \
+ int fd;
#elif defined(__sun)
# define UV_FS_EVENT_PRIVATE_FIELDS \
ev_io event_watcher; \
uv_fs_event_cb cb; \
- file_obj_t fo;
+ file_obj_t fo; \
+ int fd;
#else /* !PORT_SOURCE_FILE */
# define UV_FS_EVENT_PRIVATE_FIELDS
#endif
/*
* This function starts the event loop. It blocks until the reference count
- * of the loop drops to zero.
+ * of the loop drops to zero. Always returns zero.
*/
-UV_EXTERN int uv_run (uv_loop_t*);
+UV_EXTERN int uv_run(uv_loop_t*);
/*
- * This function polls for new events without blocking.
+ * Poll for new events once. Note that this function blocks if there are no
+ * pending events. Returns zero when done (no active handles or requests left),
+ * or non-zero if more events are expected (meaning you should call
+ * uv_run_once() again sometime in the future).
*/
-UV_EXTERN int uv_run_once (uv_loop_t*);
+UV_EXTERN int uv_run_once(uv_loop_t*);
/*
* Manually modify the event loop's reference count. Useful if the user wants
#endif
#define UV_REQ_FIELDS \
- /* read-only */ \
- uv_req_type type; \
/* public */ \
void* data; \
UV_REQ_EXTRA_FIELDS \
/* private */ \
- UV_REQ_PRIVATE_FIELDS
+ UV_REQ_PRIVATE_FIELDS \
+ /* read-only */ \
+ uv_req_type type; \
/* Abstract base class of all requests. */
struct uv_req_s {
# define UV_HANDLE_EXTRA_FIELDS
#endif
-#define UV_HANDLE_FIELDS \
- /* read-only */ \
- uv_loop_t* loop; \
- uv_handle_type type; \
- /* public */ \
- uv_close_cb close_cb; \
- void* data; \
- UV_HANDLE_EXTRA_FIELDS \
- /* private */ \
- UV_HANDLE_PRIVATE_FIELDS
+#define UV_HANDLE_FIELDS \
+ /* read-only */ \
+ uv_loop_t* loop; \
+ /* public */ \
+ uv_close_cb close_cb; \
+ void* data; \
+ /* read-only */ \
+ uv_handle_type type; \
+ /* private */ \
+ UV_HANDLE_PRIVATE_FIELDS \
+ UV_HANDLE_EXTRA_FIELDS \
/* The abstract base class of all handles. */
struct uv_handle_s {
/* uv_fs_t is a subclass of uv_req_t */
struct uv_fs_s {
UV_REQ_FIELDS
- uv_loop_t* loop;
uv_fs_type fs_type;
+ uv_loop_t* loop;
uv_fs_cb cb;
ssize_t result;
void* ptr;
int uv_run_once(uv_loop_t* loop) {
- uv__run(loop);
- return 0;
+ return uv__run(loop);
}
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
- assert(!ev_is_active(&((uv_stream_t*)handle)->read_watcher));
- assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher));
+ assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
+ assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;
uv_getaddrinfo_t* req = req_->data;
struct addrinfo *res = req->res;
#if __sun
+ uv_getaddrinfo_t* handle = req->data;
size_t hostlen = strlen(handle->hostname);
#endif
}
-int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t slen) {
+int uv__accept(int sockfd) {
int peerfd;
assert(sockfd >= 0);
while (1) {
#if __linux__
- peerfd = uv__accept4(sockfd, saddr, &slen, UV__SOCK_NONBLOCK|UV__SOCK_CLOEXEC);
+ peerfd = uv__accept4(sockfd,
+ NULL,
+ NULL,
+ UV__SOCK_NONBLOCK|UV__SOCK_CLOEXEC);
+
if (peerfd != -1)
break;
break;
#endif
- if ((peerfd = accept(sockfd, saddr, &slen)) == -1) {
+ peerfd = accept(sockfd, NULL, NULL);
+
+ if (peerfd == -1) {
if (errno == EINTR)
continue;
else
/* flags */
enum {
- UV_CLOSING = 0x01, /* uv_close() called but not finished. */
- UV_CLOSED = 0x02, /* close(2) finished. */
+ UV_CLOSING = 0x01, /* uv_close() called but not finished. */
+ UV_CLOSED = 0x02, /* close(2) finished. */
UV_STREAM_READING = 0x04, /* uv_read_start() called. */
UV_STREAM_SHUTTING = 0x08, /* uv_shutdown() called but not complete. */
UV_STREAM_SHUT = 0x10, /* Write side closed. */
UV_STREAM_READABLE = 0x20, /* The stream is readable */
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
- UV_TCP_NODELAY = 0x080, /* Disable Nagle. */
- UV_TCP_KEEPALIVE = 0x100, /* Turn on keep-alive. */
- UV_TIMER_REPEAT = 0x100,
- UV__PENDING = 0x800
+ UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
+ UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
+ UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */
+ UV_TIMER_REPEAT = 0x100,
+ UV__PENDING = 0x800
};
inline static int uv__has_pending_handles(const uv_loop_t* loop) {
int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream);
void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
-int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
+int uv__accept(int sockfd);
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
socklen_t addrlen, uv_connect_cb cb);
close(loop->fs_fd);
#endif
}
-
-
-#define X(name, type) \
- int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
- uv__handle_init(loop, (uv_handle_t*)handle, type); \
- loop->counters.name##_init++; \
- handle->name##_cb = NULL; \
- return 0; \
- } \
- int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
- if (uv__is_active(handle)) return 0; \
- ngx_queue_insert_head(&handle->loop->name##_handles, &handle->queue); \
- handle->name##_cb = cb; \
- uv__handle_start(handle); \
- return 0; \
- } \
- int uv_##name##_stop(uv_##name##_t* handle) { \
- if (!uv__is_active(handle)) return 0; \
- ngx_queue_remove(&handle->queue); \
- uv__handle_stop(handle); \
- return 0; \
- } \
- void uv__run_##name(uv_loop_t* loop) { \
- uv_##name##_t* h; \
- ngx_queue_t* q; \
- ngx_queue_foreach(q, &loop->name##_handles) { \
- h = ngx_queue_data(q, uv_##name##_t, queue); \
- if (h->name##_cb) h->name##_cb(h, 0); \
- } \
- } \
- void uv__##name##_close(uv_##name##_t* handle) { \
- uv_##name##_stop(handle); \
- }
-X(idle, UV_IDLE)
-X(check, UV_CHECK)
-X(prepare, UV_PREPARE)
-#undef X
uv_strlcpy(saddr.sun_path, pipe_fname, sizeof(saddr.sun_path));
saddr.sun_family = AF_UNIX;
- if (bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr) == -1) {
- /* On EADDRINUSE:
- *
- * We hold the file lock so there is no other process listening
- * on the socket. Ergo, it's stale - remove it.
- *
- * This assumes that the other process uses locking too
- * but that's a good enough assumption for now.
- */
- if (errno != EADDRINUSE
- || unlink(pipe_fname) == -1
- || bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr) == -1) {
- /* Convert ENOENT to EACCES for compatibility with Windows. */
- uv__set_sys_error(handle->loop, (errno == ENOENT) ? EACCES : errno);
- goto out;
- }
+ if (bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr)) {
+ /* Convert ENOENT to EACCES for compatibility with Windows. */
+ uv__set_sys_error(handle->loop, (errno == ENOENT) ? EACCES : errno);
+ goto out;
}
bound = 1;
/* TODO merge with uv__server_io()? */
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
- struct sockaddr_un saddr;
uv_pipe_t* pipe;
int saved_errno;
int sockfd;
assert(pipe->type == UV_NAMED_PIPE);
- sockfd = uv__accept(pipe->fd, (struct sockaddr *)&saddr, sizeof saddr);
+ sockfd = uv__accept(pipe->fd);
if (sockfd == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
uv__set_sys_error(pipe->loop, errno);
if (pevents & UV_WRITABLE)
events |= UV__IO_WRITE;
+ uv__io_stop(handle->loop, &handle->io_watcher);
uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
uv__io_start(handle->loop, &handle->io_watcher);
stream->accepted_fd = -1;
stream->fd = -1;
stream->delayed_error = 0;
- stream->blocking = 0;
ngx_queue_init(&stream->write_queue);
ngx_queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;
void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
int fd;
- struct sockaddr_storage addr;
uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher);
assert(events == UV__IO_READ);
*/
while (stream->fd != -1) {
assert(stream->accepted_fd < 0);
- fd = uv__accept(stream->fd, (struct sockaddr*)&addr, sizeof addr);
+ fd = uv__accept(stream->fd);
if (fd < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
stream->write_queue_size -= uv__write_req_size(req);
uv__write_req_finish(req);
return;
- } else if (stream->blocking) {
+ } else if (stream->flags & UV_STREAM_BLOCKING) {
/* If this is a blocking stream, try again. */
goto start;
}
n = 0;
/* There is more to write. */
- if (stream->blocking) {
+ if (stream->flags & UV_STREAM_BLOCKING) {
/*
* If we're blocking then we should not be enabling the write
* watcher - instead we need to try again.
assert(n == 0 || n == -1);
/* Only non-blocking streams should use the write_watcher. */
- assert(!stream->blocking);
+ assert(!(stream->flags & UV_STREAM_BLOCKING));
/* We're not done. */
uv__io_start(stream->loop, &stream->write_watcher);
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
}
- assert(!ev_is_active(&stream->read_watcher));
+ assert(!uv__io_active(&stream->read_watcher));
return;
}
/* EOF */
uv__set_artificial_error(stream->loop, UV_EOF);
uv__io_stop(stream->loop, &stream->read_watcher);
- if (!ev_is_active(&stream->write_watcher))
+ if (!uv__io_active(&stream->write_watcher))
uv__handle_stop(stream);
if (stream->read_cb) {
 * if this assert fires then somehow the blocking stream isn't being
 * sufficiently flushed in uv__write.
 */
- assert(!stream->blocking);
+ assert(!(stream->flags & UV_STREAM_BLOCKING));
uv__io_start(stream->loop, &stream->write_watcher);
}
handle->accepted_fd = -1;
}
- assert(!ev_is_active(&handle->read_watcher));
- assert(!ev_is_active(&handle->write_watcher));
+ assert(!uv__io_active(&handle->read_watcher));
+ assert(!uv__io_active(&handle->write_watcher));
}
} else {
/* Note: writable tty we set to blocking mode. */
uv__stream_open((uv_stream_t*)tty, fd, UV_STREAM_WRITABLE);
- tty->blocking = 1;
+ tty->flags |= UV_STREAM_BLOCKING;
}
loop->counters.tty_init++;
}
-static void uv__udp_start_read_watcher(uv_udp_t* handle) {
- uv__udp_start_watcher(handle,
- &handle->read_watcher,
- uv__udp_recvmsg,
- UV__IO_READ);
-}
-
-
-static void uv__udp_start_write_watcher(uv_udp_t* handle) {
- uv__udp_start_watcher(handle,
- &handle->write_watcher,
- uv__udp_sendmsg,
- UV__IO_WRITE);
-}
-
-
-static void uv__udp_stop_read_watcher(uv_udp_t* handle) {
- uv__udp_stop_watcher(handle, &handle->read_watcher);
-}
-
-
-static void uv__udp_stop_write_watcher(uv_udp_t* handle) {
- uv__udp_stop_watcher(handle, &handle->write_watcher);
-}
-
-
void uv__udp_close(uv_udp_t* handle) {
- uv__udp_stop_write_watcher(handle);
- uv__udp_stop_read_watcher(handle);
+ uv__udp_stop_watcher(handle, &handle->write_watcher);
+ uv__udp_stop_watcher(handle, &handle->read_watcher);
close(handle->fd);
handle->fd = -1;
}
memset(&h, 0, sizeof h);
h.msg_name = &req->addr;
- h.msg_namelen = req->addrlen;
+ h.msg_namelen = (req->addr.sin6_family == AF_INET6 ?
+ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
h.msg_iov = (struct iovec*)req->bufs;
h.msg_iovlen = req->bufcnt;
handle = container_of(w, uv_udp_t, read_watcher);
assert(handle->type == UV_UDP);
- assert(revents & EV_READ);
+ assert(revents & UV__IO_READ);
assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);
handle = container_of(w, uv_udp_t, write_watcher);
assert(handle->type == UV_UDP);
- assert(revents & EV_WRITE);
+ assert(revents & UV__IO_WRITE);
assert(!ngx_queue_empty(&handle->write_queue)
|| !ngx_queue_empty(&handle->write_completed_queue));
}
else if (ngx_queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
- uv__udp_stop_write_watcher(handle);
+ uv__udp_stop_watcher(handle, &handle->write_watcher);
}
}
uv__req_init(handle->loop, req, UV_UDP_SEND);
+ assert(addrlen <= sizeof(req->addr));
memcpy(&req->addr, addr, addrlen);
- req->addrlen = addrlen;
req->send_cb = send_cb;
req->handle = handle;
req->bufcnt = bufcnt;
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
ngx_queue_insert_tail(&handle->write_queue, &req->queue);
- uv__udp_start_write_watcher(handle);
+
+ uv__udp_start_watcher(handle,
+ &handle->write_watcher,
+ uv__udp_sendmsg,
+ UV__IO_WRITE);
return 0;
}
handle->alloc_cb = alloc_cb;
handle->recv_cb = recv_cb;
- uv__udp_start_read_watcher(handle);
+
+ uv__udp_start_watcher(handle,
+ &handle->read_watcher,
+ uv__udp_recvmsg,
+ UV__IO_READ);
return 0;
}
int uv_udp_recv_stop(uv_udp_t* handle) {
- uv__udp_stop_read_watcher(handle);
+ uv__udp_stop_watcher(handle, &handle->read_watcher);
handle->alloc_cb = NULL;
handle->recv_cb = NULL;
return 0;
} else {
UV_LOOP_ONCE(loop, uv_poll);
}
- return 0;
+ return UV_LOOP_ALIVE(loop);
}
*/
BENCHMARK_DECLARE (sizes)
+BENCHMARK_DECLARE (loop_count)
BENCHMARK_DECLARE (ping_pongs)
BENCHMARK_DECLARE (tcp_write_batch)
BENCHMARK_DECLARE (tcp4_pound_100)
TASK_LIST_START
BENCHMARK_ENTRY (sizes)
+ BENCHMARK_ENTRY (loop_count)
BENCHMARK_ENTRY (ping_pongs)
BENCHMARK_HELPER (ping_pongs, tcp4_echo_server)
LOGF("uv_shutdown_t: %u bytes\n", (unsigned int) sizeof(uv_shutdown_t));
LOGF("uv_write_t: %u bytes\n", (unsigned int) sizeof(uv_write_t));
LOGF("uv_connect_t: %u bytes\n", (unsigned int) sizeof(uv_connect_t));
+ LOGF("uv_udp_send_t: %u bytes\n", (unsigned int) sizeof(uv_udp_send_t));
LOGF("uv_tcp_t: %u bytes\n", (unsigned int) sizeof(uv_tcp_t));
LOGF("uv_pipe_t: %u bytes\n", (unsigned int) sizeof(uv_pipe_t));
LOGF("uv_tty_t: %u bytes\n", (unsigned int) sizeof(uv_tty_t));
TEST_IMPL(fs_symlink_dir) {
uv_fs_t req;
int r;
- char src_path_buf[PATHMAX];
char* test_dir;
/* set-up */
uv_fs_req_cleanup(&req);
#ifdef _WIN32
- strcpy(src_path_buf, "\\\\?\\");
- uv_cwd(src_path_buf + 4, sizeof(src_path_buf)/sizeof(src_path_buf[0]));
- strcat(src_path_buf, "\\test_dir\\");
- test_dir = src_path_buf;
+ {
+ static char src_path_buf[PATHMAX];
+ strcpy(src_path_buf, "\\\\?\\");
+ uv_cwd(src_path_buf + 4, sizeof(src_path_buf));
+ strcat(src_path_buf, "\\test_dir\\");
+ test_dir = src_path_buf;
+ }
#else
test_dir = "test_dir";
#endif
r = uv_pipe_init(uv_default_loop(), &ctx.send.pipe, 1);
ASSERT(r == 0);
+#ifndef _WIN32
+ /* Clean up stale socket from previous test run. */
+ remove(TEST_PIPENAME);
+#endif
+
r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME);
ASSERT(r == 0);
TEST_DECLARE (platform_output)
TEST_DECLARE (callback_order)
+TEST_DECLARE (run_once)
TEST_DECLARE (tty)
TEST_DECLARE (stdio_over_pipes)
TEST_DECLARE (ipc_listen_before_write)
#if 0
TEST_ENTRY (callback_order)
#endif
+ TEST_ENTRY (run_once)
TEST_ENTRY (pipe_connect_bad_name)
TEST_ENTRY (pipe_connect_to_file)
#ifdef _WIN32
# define BAD_PIPENAME "bad-pipe"
+# define UNLINK_PIPE(name)
#else
# define BAD_PIPENAME "/path/to/unix/socket/that/really/should/not/be/there"
+# define UNLINK_PIPE(name) remove(name)
#endif
uv_pipe_t server1, server2;
int r;
+ UNLINK_PIPE(TEST_PIPENAME);
+
r = uv_pipe_init(uv_default_loop(), &server1, 0);
ASSERT(r == 0);
r = uv_pipe_bind(&server1, TEST_PIPENAME);
#include "uv.h"
#include "task.h"
-static idle_counter = 0;
+#define NUM_TICKS 64
+
+static uv_idle_t idle_handle;
+static int idle_counter;
+
static void idle_cb(uv_idle_t* handle, int status) {
- ASSERT(handle != NULL);
+ ASSERT(handle == &idle_handle);
ASSERT(status == 0);
- idle_counter ++;
+
+ if (++idle_counter == NUM_TICKS)
+ uv_idle_stop(handle);
}
TEST_IMPL(run_once) {
- int n;
- uv_idle_t h;
- uv_idle_init(uv_default_loop(), &h);
- uv_idle_start(&h, idle_cb);
- for (n = 0; n < 500; n++) {
- uv_run_once(uv_default_loop());
- }
- ASSERT(n == 500);
+ uv_idle_init(uv_default_loop(), &idle_handle);
+ uv_idle_start(&idle_handle, idle_cb);
+
+ while (uv_run_once(uv_default_loop()));
+ ASSERT(idle_counter == NUM_TICKS);
+
return 0;
}
'src/unix/fs.c',
'src/unix/internal.h',
'src/unix/loop.c',
+ 'src/unix/loop-watcher.c',
'src/unix/pipe.c',
'src/unix/poll.c',
'src/unix/process.c',
'test/test-poll.c',
'test/test-process-title.c',
'test/test-ref.c',
+ 'test/test-run-once.c',
'test/test-shutdown-close.c',
'test/test-shutdown-eof.c',
'test/test-spawn.c',
'test/benchmark-ares.c',
'test/benchmark-getaddrinfo.c',
'test/benchmark-list.h',
+ 'test/benchmark-loop-count.c',
'test/benchmark-ping-pongs.c',
'test/benchmark-pound.c',
'test/benchmark-pump.c',