/*
 * Feature test macros must be defined before the FIRST system header is
 * included; glibc evaluates them once in <features.h>, so defining
 * _GNU_SOURCE after <stdio.h> et al. (as the previous version did) has
 * no effect.
 */
#define _GNU_SOURCE /* O_CLOEXEC, accept4() */

#include "uv-common.h"
#include "uv-eio.h"

#include <stddef.h> /* NULL */
#include <stdio.h> /* printf */
#include <stdlib.h>
#include <string.h> /* strdup, memset -- was missing */
#include <errno.h>
#include <assert.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h> /* PATH_MAX */

void uv__req_init(uv_req_t*);
void uv__next(EV_P_ ev_idle* watcher, int revents);
static int uv__stream_open(uv_stream_t*, int fd);
static void uv__finish_close(uv_handle_t* handle);
static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error);

static uv_write_t* uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_connect(uv_stream_t*);
static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);

#ifndef __GNUC__
#define __attribute__(a)
#endif

/* Unused on systems that support O_CLOEXEC, SOCK_CLOEXEC, etc. */
static int uv__cloexec(int fd, int set) __attribute__((unused));
static int uv__nonblock(int fd, int set) __attribute__((unused));

static int uv__socket(int domain, int type, int protocol);
static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);

size_t uv__strlcpy(char* dst, const char* src, size_t size);
/* flags */
enum {
int uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
uv_tcp_t* tcp;
+ uv_pipe_t* pipe;
uv_async_t* async;
uv_timer_t* timer;
ev_timer_stop(EV_DEFAULT_ &timer->timer_watcher);
break;
+ case UV_NAMED_PIPE:
+ pipe = (uv_pipe_t*)handle;
+ if (pipe->pipe_fname) {
+ /*
+ * Unlink the file system entity before closing the file descriptor.
+ * Doing it the other way around introduces a race where our process
+ * unlinks a socket with the same name that's just been created by
+ * another thread or process.
+ */
+ unlink(pipe->pipe_fname);
+ free((void*)pipe->pipe_fname);
+ }
+ uv_read_stop((uv_stream_t*)pipe);
+ ev_io_stop(EV_DEFAULT_ &pipe->write_watcher);
+ break;
+
default:
assert(0);
return -1;
ngx_queue_init(&tcp->write_completed_queue);
tcp->write_queue_size = 0;
- ev_init(&tcp->read_watcher, uv__tcp_io);
+ ev_init(&tcp->read_watcher, uv__stream_io);
tcp->read_watcher.data = tcp;
- ev_init(&tcp->write_watcher, uv__tcp_io);
+ ev_init(&tcp->write_watcher, uv__stream_io);
tcp->write_watcher.data = tcp;
assert(ngx_queue_empty(&tcp->write_queue));
/* Bind `tcp` to `addr`, creating the socket on demand.
 *
 * EADDRINUSE is not reported immediately: it is parked in
 * `tcp->delayed_error` and surfaced on the next operation, smoothing
 * over platform differences in when bind errors show up.
 *
 * Returns 0 on success, -1 on socket/bind failure, -2 when the stream
 * could not be set up.  The caller's errno is preserved.
 */
int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) {
  int stored_errno;
  int rc;
  int sockfd;

  stored_errno = errno;
  rc = -1;

  /* Lazily create the socket if the handle does not have one yet. */
  if (tcp->fd <= 0) {
    sockfd = uv__socket(domain, SOCK_STREAM, 0);
    if (sockfd == -1) {
      uv_err_new((uv_handle_t*)tcp, errno);
      goto done;
    }

    if (uv__stream_open((uv_stream_t*)tcp, sockfd)) {
      rc = -2;
      close(sockfd);
      goto done;
    }
  }

  assert(tcp->fd >= 0);

  tcp->delayed_error = 0;
  if (bind(tcp->fd, addr, addrsize) == -1) {
    if (errno == EADDRINUSE) {
      /* Defer; reported on the next connect/listen. */
      tcp->delayed_error = errno;
    } else {
      uv_err_new((uv_handle_t*)tcp, errno);
      goto done;
    }
  }

  rc = 0;

done:
  errno = stored_errno;
  return rc;
}
}
-int uv_tcp_open(uv_tcp_t* tcp, int fd) {
- int yes;
- int r;
+static int uv__stream_open(uv_stream_t* stream, int fd) {
+ socklen_t yes;
assert(fd >= 0);
- tcp->fd = fd;
+ stream->fd = fd;
- /* Set non-blocking. */
+ /* Reuse the port address if applicable. */
yes = 1;
- r = fcntl(fd, F_SETFL, O_NONBLOCK);
- assert(r == 0);
-
- /* Reuse the port address. */
- r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
- assert(r == 0);
+ if (stream->type == UV_TCP
+ && setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
+ uv_err_new((uv_handle_t*)stream, errno);
+ return -1;
+ }
/* Associate the fd with each ev_io watcher. */
- ev_io_set(&tcp->read_watcher, fd, EV_READ);
- ev_io_set(&tcp->write_watcher, fd, EV_WRITE);
+ ev_io_set(&stream->read_watcher, fd, EV_READ);
+ ev_io_set(&stream->write_watcher, fd, EV_WRITE);
- /* These should have been set up by uv_tcp_init. */
- assert(tcp->next_watcher.data == tcp);
- assert(tcp->write_watcher.data == tcp);
- assert(tcp->read_watcher.data == tcp);
- assert(tcp->read_watcher.cb == uv__tcp_io);
- assert(tcp->write_watcher.cb == uv__tcp_io);
+ /* These should have been set up by uv_tcp_init or uv_pipe_init. */
+ assert(stream->read_watcher.cb == uv__stream_io);
+ assert(stream->write_watcher.cb == uv__stream_io);
return 0;
}
int fd;
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(struct sockaddr_storage);
- uv_tcp_t* tcp = watcher->data;
+ uv_stream_t* stream = watcher->data;
- assert(watcher == &tcp->read_watcher ||
- watcher == &tcp->write_watcher);
+ assert(watcher == &stream->read_watcher ||
+ watcher == &stream->write_watcher);
assert(revents == EV_READ);
- assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+ assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
- if (tcp->accepted_fd >= 0) {
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+ if (stream->accepted_fd >= 0) {
+ ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
return;
}
while (1) {
- assert(tcp->accepted_fd < 0);
- fd = accept(tcp->fd, (struct sockaddr*)&addr, &addrlen);
+ assert(stream->accepted_fd < 0);
+ fd = accept(stream->fd, (struct sockaddr*)&addr, &addrlen);
if (fd < 0) {
if (errno == EAGAIN) {
/* TODO special trick. unlock reserved socket, accept, close. */
return;
} else {
- uv_err_new((uv_handle_t*)tcp, errno);
- tcp->connection_cb((uv_handle_t*)tcp, -1);
+ uv_err_new((uv_handle_t*)stream, errno);
+ stream->connection_cb((uv_handle_t*)stream, -1);
}
} else {
- tcp->accepted_fd = fd;
- tcp->connection_cb((uv_handle_t*)tcp, 0);
- if (tcp->accepted_fd >= 0) {
+ stream->accepted_fd = fd;
+ stream->connection_cb((uv_handle_t*)stream, 0);
+ if (stream->accepted_fd >= 0) {
/* The user hasn't yet accepted called uv_accept() */
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
+ ev_io_stop(EV_DEFAULT_ &stream->read_watcher);
return;
}
}
/* Take ownership of the pending connection on `server` and open it as
 * `client`.  Intended to be called from the connection callback.
 *
 * Returns 0 on success; -1 when no connection is pending (EAGAIN) or
 * the accepted fd could not be attached.  The caller's errno is
 * preserved.
 */
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
  uv_stream_t* streamServer;
  uv_stream_t* streamClient;
  int saved_errno;
  int status;

  saved_errno = errno;
  status = -1;

  streamServer = (uv_stream_t*)server;
  streamClient = (uv_stream_t*)client;

  if (streamServer->accepted_fd < 0) {
    /* Nothing pending to accept. */
    uv_err_new(server, EAGAIN);
    goto out;
  }

  if (uv__stream_open(streamClient, streamServer->accepted_fd)) {
    /* TODO handle error */
    /* BUG FIX: close the descriptor *before* forgetting it.  The old
     * code reset accepted_fd to -1 first and then called close(-1),
     * leaking the accepted connection.
     */
    close(streamServer->accepted_fd);
    streamServer->accepted_fd = -1;
    goto out;
  }

  ev_io_start(EV_DEFAULT_ &streamServer->read_watcher);
  streamServer->accepted_fd = -1;
  status = 0;

out:
  errno = saved_errno;
  return status;
}
void uv__finish_close(uv_handle_t* handle) {
- uv_tcp_t* tcp;
-
assert(uv_flag_is_set(handle, UV_CLOSING));
assert(!uv_flag_is_set(handle, UV_CLOSED));
uv_flag_set(handle, UV_CLOSED);
switch (handle->type) {
- case UV_TCP:
- /* XXX Is it necessary to stop these watchers here? weren't they
- * supposed to be stopped in uv_close()?
- */
- tcp = (uv_tcp_t*)handle;
- ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
- ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
-
- assert(!ev_is_active(&tcp->read_watcher));
- assert(!ev_is_active(&tcp->write_watcher));
-
- close(tcp->fd);
- tcp->fd = -1;
-
- if (tcp->accepted_fd >= 0) {
- close(tcp->accepted_fd);
- tcp->accepted_fd = -1;
- }
- break;
-
case UV_PREPARE:
assert(!ev_is_active(&((uv_prepare_t*)handle)->prepare_watcher));
break;
assert(!ev_is_active(&((uv_timer_t*)handle)->timer_watcher));
break;
+ case UV_NAMED_PIPE:
+ case UV_TCP:
+ {
+ uv_stream_t* stream;
+
+ stream = (uv_stream_t*)handle;
+
+ assert(!ev_is_active(&stream->read_watcher));
+ assert(!ev_is_active(&stream->write_watcher));
+
+ close(stream->fd);
+ stream->fd = -1;
+
+ if (stream->accepted_fd >= 0) {
+ close(stream->accepted_fd);
+ stream->accepted_fd = -1;
+ }
+ break;
+ }
+
default:
assert(0);
break;
}
-uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_stream_t* stream) {
ngx_queue_t* q;
uv_write_t* req;
- if (ngx_queue_empty(&tcp->write_queue)) {
+ if (ngx_queue_empty(&stream->write_queue)) {
return NULL;
}
- q = ngx_queue_head(&tcp->write_queue);
+ q = ngx_queue_head(&stream->write_queue);
if (!q) {
return NULL;
}
}
-static void uv__drain(uv_tcp_t* tcp) {
+static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;
- assert(!uv_write_queue_head(tcp));
- assert(tcp->write_queue_size == 0);
+ assert(!uv_write_queue_head(stream));
+ assert(stream->write_queue_size == 0);
- ev_io_stop(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_stop(EV_DEFAULT_ &stream->write_watcher);
/* Shutdown? */
- if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUTTING) &&
- !uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING) &&
- !uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT)) {
- assert(tcp->shutdown_req);
+ if (uv_flag_is_set((uv_handle_t*)stream, UV_SHUTTING) &&
+ !uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING) &&
+ !uv_flag_is_set((uv_handle_t*)stream, UV_SHUT)) {
+ assert(stream->shutdown_req);
- req = tcp->shutdown_req;
+ req = stream->shutdown_req;
- if (shutdown(tcp->fd, SHUT_WR)) {
+ if (shutdown(stream->fd, SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
if (req->cb) {
req->cb(req, -1);
}
} else {
- uv_err_new((uv_handle_t*)tcp, 0);
- uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
+ uv_err_new((uv_handle_t*)stream, 0);
+ uv_flag_set((uv_handle_t*)stream, UV_SHUT);
if (req->cb) {
req->cb(req, 0);
}
/* On success returns NULL. On error returns a pointer to the write request
* which had the error.
*/
-static uv_write_t* uv__write(uv_tcp_t* tcp) {
+static uv_write_t* uv__write(uv_stream_t* stream) {
uv_write_t* req;
struct iovec* iov;
int iovcnt;
ssize_t n;
- assert(tcp->fd >= 0);
+ assert(stream->fd >= 0);
/* TODO: should probably while(1) here until EAGAIN */
/* Get the request at the head of the queue. */
- req = uv_write_queue_head(tcp);
+ req = uv_write_queue_head(stream);
if (!req) {
- assert(tcp->write_queue_size == 0);
+ assert(stream->write_queue_size == 0);
return NULL;
}
- assert(req->handle == (uv_stream_t*)tcp);
+ assert(req->handle == stream);
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
if (iovcnt == 1) {
- n = write(tcp->fd, iov[0].iov_base, iov[0].iov_len);
+ n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
}
else {
- n = writev(tcp->fd, iov, iovcnt);
+ n = writev(stream->fd, iov, iovcnt);
}
if (n < 0) {
if (errno != EAGAIN) {
/* Error */
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
return req;
}
} else {
if (n < len) {
buf->base += n;
buf->len -= n;
- tcp->write_queue_size -= n;
+ stream->write_queue_size -= n;
n = 0;
/* There is more to write. Break and ensure the watcher is pending. */
assert(n >= len);
n -= len;
- assert(tcp->write_queue_size >= len);
- tcp->write_queue_size -= len;
+ assert(stream->write_queue_size >= len);
+ stream->write_queue_size -= len;
if (req->write_index == req->bufcnt) {
/* Then we're done! */
* callback called in the near future.
* TODO: start trying to write the next request.
*/
- ngx_queue_insert_tail(&tcp->write_completed_queue, &req->queue);
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
return NULL;
}
}
assert(n == 0 || n == -1);
/* We're not done. */
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
return NULL;
}
-static void uv__write_callbacks(uv_tcp_t* tcp) {
+static void uv__write_callbacks(uv_stream_t* stream) {
int callbacks_made = 0;
ngx_queue_t* q;
uv_write_t* req;
- while (!ngx_queue_empty(&tcp->write_completed_queue)) {
+ while (!ngx_queue_empty(&stream->write_completed_queue)) {
/* Pop a req off write_completed_queue. */
- q = ngx_queue_head(&tcp->write_completed_queue);
+ q = ngx_queue_head(&stream->write_completed_queue);
assert(q);
req = ngx_queue_data(q, struct uv_write_s, queue);
ngx_queue_remove(q);
callbacks_made++;
}
- assert(ngx_queue_empty(&tcp->write_completed_queue));
+ assert(ngx_queue_empty(&stream->write_completed_queue));
/* Write queue drained. */
- if (!uv_write_queue_head(tcp)) {
- uv__drain(tcp);
+ if (!uv_write_queue_head(stream)) {
+ uv__drain(stream);
}
}
-void uv__read(uv_tcp_t* tcp) {
+static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
struct iovec* iov;
ssize_t nread;
/* XXX: Maybe instead of having UV_READING we just test if
* tcp->read_cb is NULL or not?
*/
- while (tcp->read_cb && uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
- assert(tcp->alloc_cb);
- buf = tcp->alloc_cb((uv_stream_t*)tcp, 64 * 1024);
+ while (stream->read_cb && uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+ assert(stream->alloc_cb);
+ buf = stream->alloc_cb(stream, 64 * 1024);
assert(buf.len > 0);
assert(buf.base);
iov = (struct iovec*) &buf;
- nread = read(tcp->fd, buf.base, buf.len);
+ nread = read(stream->fd, buf.base, buf.len);
if (nread < 0) {
/* Error */
if (errno == EAGAIN) {
/* Wait for the next one. */
- if (uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
- ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
+ if (uv_flag_is_set((uv_handle_t*)stream, UV_READING)) {
+ ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
}
- uv_err_new((uv_handle_t*)tcp, EAGAIN);
- tcp->read_cb((uv_stream_t*)tcp, 0, buf);
+ uv_err_new((uv_handle_t*)stream, EAGAIN);
+ stream->read_cb(stream, 0, buf);
return;
} else {
/* Error. User should call uv_close(). */
- uv_err_new((uv_handle_t*)tcp, errno);
- tcp->read_cb((uv_stream_t*)tcp, -1, buf);
- assert(!ev_is_active(&tcp->read_watcher));
+ uv_err_new((uv_handle_t*)stream, errno);
+ stream->read_cb(stream, -1, buf);
+ assert(!ev_is_active(&stream->read_watcher));
return;
}
} else if (nread == 0) {
/* EOF */
- uv_err_new_artificial((uv_handle_t*)tcp, UV_EOF);
- ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher);
- tcp->read_cb((uv_stream_t*)tcp, -1, buf);
+ uv_err_new_artificial((uv_handle_t*)stream, UV_EOF);
+ ev_io_stop(EV_DEFAULT_UC_ &stream->read_watcher);
+ stream->read_cb(stream, -1, buf);
return;
} else {
/* Successful read */
- tcp->read_cb((uv_stream_t*)tcp, nread, buf);
+ stream->read_cb(stream, nread, buf);
}
}
}
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
uv_tcp_t* tcp = (uv_tcp_t*)handle;
- assert(handle->type == UV_TCP &&
- "uv_shutdown (unix) only supports uv_tcp_t right now");
+ assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+ && "uv_shutdown (unix) only supports uv_tcp_t right now");
assert(tcp->fd >= 0);
/* Initialize request */
}
-void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
- uv_tcp_t* tcp = watcher->data;
+static void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
+ uv_stream_t* stream = watcher->data;
- assert(tcp->type == UV_TCP);
- assert(watcher == &tcp->read_watcher ||
- watcher == &tcp->write_watcher);
- assert(tcp->fd >= 0);
- assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
+ assert(stream->type == UV_TCP ||
+ stream->type == UV_NAMED_PIPE);
+ assert(watcher == &stream->read_watcher ||
+ watcher == &stream->write_watcher);
+ assert(stream->fd >= 0);
+ assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING));
- if (tcp->connect_req) {
- uv__tcp_connect(tcp);
+ if (stream->connect_req) {
+ uv__stream_connect(stream);
} else {
if (revents & EV_READ) {
- uv__read(tcp);
+ uv__read((uv_stream_t*)stream);
}
if (revents & EV_WRITE) {
- uv_write_t* req = uv__write(tcp);
+ uv_write_t* req = uv__write(stream);
if (req) {
/* Error. Notify the user. */
if (req->cb) {
req->cb(req, -1);
}
} else {
- uv__write_callbacks(tcp);
+ uv__write_callbacks(stream);
}
}
}
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
-static void uv__tcp_connect(uv_tcp_t* tcp) {
+static void uv__stream_connect(uv_stream_t* stream) {
int error;
- uv_connect_t* req = tcp->connect_req;
+ uv_connect_t* req = stream->connect_req;
socklen_t errorsize = sizeof(int);
- assert(tcp->type == UV_TCP);
- assert(tcp->fd >= 0);
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+ assert(stream->fd >= 0);
assert(req);
- if (tcp->delayed_error) {
+ if (stream->delayed_error) {
/* To smooth over the differences between unixes errors that
* were reported synchronously on the first connect can be delayed
* until the next tick--which is now.
*/
- error = tcp->delayed_error;
- tcp->delayed_error = 0;
+ error = stream->delayed_error;
+ stream->delayed_error = 0;
} else {
/* Normal situation: we need to get the socket error from the kernel. */
- getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
+ getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
}
if (!error) {
- ev_io_start(EV_DEFAULT_ &tcp->read_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->read_watcher);
/* Successful connection */
- tcp->connect_req = NULL;
+ stream->connect_req = NULL;
if (req->cb) {
req->cb(req, 0);
}
return;
} else {
/* Error */
- uv_err_new((uv_handle_t*)tcp, error);
+ uv_err_new((uv_handle_t*)stream, error);
- tcp->connect_req = NULL;
+ stream->connect_req = NULL;
if (req->cb) {
req->cb(req, -1);
}
}
-static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
- socklen_t addrlen, uv_connect_cb cb) {
+static int uv__connect(uv_connect_t* req,
+ uv_stream_t* stream,
+ struct sockaddr* addr,
+ socklen_t addrlen,
+ uv_connect_cb cb) {
+
+ int sockfd;
int r;
- if (tcp->fd <= 0) {
- int fd = socket(addr->sa_family, SOCK_STREAM, 0);
+ if (stream->fd <= 0) {
+ if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
- if (fd < 0) {
- uv_err_new((uv_handle_t*)tcp, errno);
+ }
+
+ if (sockfd < 0) {
+ uv_err_new((uv_handle_t*)stream, errno);
return -1;
}
- if (uv_tcp_open(tcp, fd)) {
- close(fd);
+ if (uv__stream_open(stream, sockfd)) {
+ close(sockfd);
return -2;
}
}
uv__req_init((uv_req_t*)req);
req->cb = cb;
- req->handle = (uv_stream_t*)tcp;
+ req->handle = stream;
req->type = UV_CONNECT;
ngx_queue_init(&req->queue);
- if (tcp->connect_req) {
- uv_err_new((uv_handle_t*)tcp, EALREADY);
+ if (stream->connect_req) {
+ uv_err_new((uv_handle_t*)stream, EALREADY);
return -1;
}
- if (tcp->type != UV_TCP) {
- uv_err_new((uv_handle_t*)tcp, ENOTSOCK);
+ if (stream->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)stream, ENOTSOCK);
return -1;
}
- tcp->connect_req = req;
+ stream->connect_req = req;
- r = connect(tcp->fd, addr, addrlen);
+ r = connect(stream->fd, addr, addrlen);
- tcp->delayed_error = 0;
+ stream->delayed_error = 0;
if (r != 0 && errno != EINPROGRESS) {
switch (errno) {
* wait.
*/
case ECONNREFUSED:
- tcp->delayed_error = errno;
+ stream->delayed_error = errno;
break;
default:
- uv_err_new((uv_handle_t*)tcp, errno);
+ uv_err_new((uv_handle_t*)stream, errno);
return -1;
}
}
- assert(tcp->write_watcher.data == tcp);
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ assert(stream->write_watcher.data == stream);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
- if (tcp->delayed_error) {
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ if (stream->delayed_error) {
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
}
return 0;
}
-int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
- struct sockaddr_in address, uv_connect_cb cb) {
- assert(handle->type == UV_TCP);
- assert(address.sin_family == AF_INET);
- return uv__connect(req, handle, (struct sockaddr*) &address,
- sizeof(struct sockaddr_in), cb);
+int uv_tcp_connect(uv_connect_t* req,
+ uv_tcp_t* handle,
+ struct sockaddr_in address,
+ uv_connect_cb cb) {
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+ status = -1;
+
+ if (handle->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ if (address.sin_family != AF_INET) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ status = uv__connect(req,
+ (uv_stream_t*)handle,
+ (struct sockaddr*)&address,
+ sizeof address,
+ cb);
+
+out:
+ errno = saved_errno;
+ return status;
}
-int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
- struct sockaddr_in6 address, uv_connect_cb cb) {
- assert(handle->type == UV_TCP);
- assert(address.sin6_family == AF_INET6);
- return uv__connect(req, handle, (struct sockaddr*) &address,
- sizeof(struct sockaddr_in6), cb);
+int uv_tcp_connect6(uv_connect_t* req,
+ uv_tcp_t* handle,
+ struct sockaddr_in6 address,
+ uv_connect_cb cb) {
+ int saved_errno;
+ int status;
+
+ saved_errno = errno;
+ status = -1;
+
+ if (handle->type != UV_TCP) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ if (address.sin6_family != AF_INET6) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ status = uv__connect(req,
+ (uv_stream_t*)handle,
+ (struct sockaddr*)&address,
+ sizeof address,
+ cb);
+
+out:
+ errno = saved_errno;
+ return status;
}
*/
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
+ uv_stream_t* stream;
int empty_queue;
- uv_tcp_t* tcp = (uv_tcp_t*)handle;
+
+ stream = (uv_stream_t*)handle;
/* Initialize the req */
uv__req_init((uv_req_t*) req);
req->handle = handle;
ngx_queue_init(&req->queue);
- assert(handle->type == UV_TCP &&
- "uv_write (unix) does not yet support other types of streams");
+ assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
+ && "uv_write (unix) does not yet support other types of streams");
- empty_queue = (tcp->write_queue_size == 0);
- assert(tcp->fd >= 0);
+ empty_queue = (stream->write_queue_size == 0);
+ assert(stream->fd >= 0);
ngx_queue_init(&req->queue);
req->type = UV_WRITE;
*/
req->write_index = 0;
- tcp->write_queue_size += uv__buf_count(bufs, bufcnt);
+ stream->write_queue_size += uv__buf_count(bufs, bufcnt);
/* Append the request to write_queue. */
- ngx_queue_insert_tail(&tcp->write_queue, &req->queue);
+ ngx_queue_insert_tail(&stream->write_queue, &req->queue);
- assert(!ngx_queue_empty(&tcp->write_queue));
- assert(tcp->write_watcher.cb == uv__tcp_io);
- assert(tcp->write_watcher.data == tcp);
- assert(tcp->write_watcher.fd == tcp->fd);
+ assert(!ngx_queue_empty(&stream->write_queue));
+ assert(stream->write_watcher.cb == uv__stream_io);
+ assert(stream->write_watcher.data == stream);
+ assert(stream->write_watcher.fd == stream->fd);
/* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait
* for the fd to become writable.
*/
if (empty_queue) {
- if (uv__write(tcp)) {
+ if (uv__write(stream)) {
/* Error. uv_last_error has been set. */
return -1;
}
* means we need to make the callback. The callback can only be done on a
* fresh stack so we feed the event loop in order to service it.
*/
- if (ngx_queue_empty(&tcp->write_queue)) {
- ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE);
+ if (ngx_queue_empty(&stream->write_queue)) {
+ ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE);
} else {
/* Otherwise there is data to write - so we should wait for the file
* descriptor to become writable.
*/
- ev_io_start(EV_DEFAULT_ &tcp->write_watcher);
+ ev_io_start(EV_DEFAULT_ &stream->write_watcher);
}
return 0;
/* Start delivering data from the stream: record the user's callbacks,
 * mark the handle as reading, and arm the read watcher.  Always
 * returns 0.
 */
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(stream->fd >= 0);
  assert(alloc_cb);

  /* UV_READING reflects the user's *desired* state, independent of the
   * actual state of the stream.
   */
  uv_flag_set((uv_handle_t*)stream, UV_READING);

  /* TODO: try to do the read inline? */
  /* TODO: keep track of stream state. If we've gotten a EOF then we
   * should not start the IO watcher.
   */
  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  /* uv_tcp_init/uv_pipe_init installed the watcher callback. */
  assert(stream->read_watcher.cb == uv__stream_io);

  ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
  return 0;
}
/* Initialize a named-pipe handle.  Always returns 0. */
int uv_pipe_init(uv_pipe_t* handle) {
  /* BUG FIX: was `memset(handle, 0, sizeof handle)` -- the size of the
   * POINTER (4/8 bytes), leaving most of the struct uninitialized.
   * Zero the whole object.
   */
  memset(handle, 0, sizeof *handle);

  uv__handle_init((uv_handle_t*)handle, UV_NAMED_PIPE);
  uv_counters()->pipe_init++;

  handle->type = UV_NAMED_PIPE;
  handle->pipe_fname = NULL; /* Only set by the listener. */

  ev_init(&handle->write_watcher, uv__stream_io);
  ev_init(&handle->read_watcher, uv__stream_io);
  handle->write_watcher.data = handle;
  handle->read_watcher.data = handle;
  handle->fd = -1;

  ngx_queue_init(&handle->write_completed_queue);
  ngx_queue_init(&handle->write_queue);

  return 0;
}
/* Bind a named pipe (AF_UNIX socket) to the file system path `name`.
 *
 * On success the handle owns a strdup'ed copy of `name` (freed and
 * unlinked by uv_close).  Returns 0 on success, -1 on error with
 * uv_last_error set; the caller's errno is preserved.
 */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  struct sockaddr_un sun;
  int saved_errno;
  int sockfd;
  int status;
  int bound;

  saved_errno = errno;
  sockfd = -1;
  status = -1;
  bound = 0;

  /* Make a copy of the file name, it outlives this function's scope. */
  if ((name = strdup(name)) == NULL) {
    uv_err_new((uv_handle_t*)handle, ENOMEM);
    goto out;
  }

  if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
    uv_err_new((uv_handle_t*)handle, errno);
    goto out;
  }

  memset(&sun, 0, sizeof sun);
  uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
  sun.sun_family = AF_UNIX;

  if (bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
#ifdef DONT_RACE_ME_BRO
    /*
     * Try to bind the socket. Note that we explicitly don't act
     * on EADDRINUSE. Unlinking and trying to bind again opens
     * a window for races with other threads and processes.
     */
    uv_err_new((uv_handle_t*)handle, errno);
    goto out;
#else
    /*
     * Try to re-purpose the socket. This is a potential race window.
     */
    if (errno != EADDRINUSE
        || unlink(name) == -1
        || bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
      uv_err_new((uv_handle_t*)handle, errno);
      goto out;
    }
#endif
  }
  bound = 1;

  /* Success. */
  handle->pipe_fname = name; /* Is a strdup'ed copy. */
  handle->fd = sockfd;
  status = 0;

out:
  /* Clean up on error. */
  if (status) {
    if (bound) {
      /* unlink() before close() to avoid races. */
      unlink(name);
    }
    if (sockfd != -1) {
      /* BUG FIX: only close a descriptor we actually opened; the old
       * code called close(-1) when strdup() or uv__socket() failed.
       */
      close(sockfd);
    }
    free((void*)name); /* free(NULL) is a no-op. */
  }

  errno = saved_errno;
  return status;
}
/* Start accepting connections on a previously bound pipe handle.
 * Returns 0 on success, -1 on error (uv_last_error is set); the
 * caller's errno is preserved.
 */
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
  int stored_errno;
  int rc;

  stored_errno = errno;

  rc = listen(handle->fd, SOMAXCONN);
  if (rc == -1) {
    uv_err_new((uv_handle_t*)handle, errno);
  } else {
    /* Watch for incoming connections. */
    handle->connection_cb = cb;
    ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ);
    ev_io_start(EV_DEFAULT_ &handle->read_watcher);
  }

  errno = stored_errno;
  return rc;
}
+
+
+int uv_pipe_connect(uv_connect_t* req,
+ uv_pipe_t* handle,
+ const char* name,
+ uv_connect_cb cb) {
+ struct sockaddr_un sun;
+ int saved_errno;
+ int sockfd;
+ int status;
+
+ saved_errno = errno;
+ sockfd = -1;
+ status = -1;
+
+ if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+
+ memset(&sun, 0, sizeof sun);
+ uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path));
+ sun.sun_family = AF_UNIX;
+
+ /* We don't check for EINPROGRESS. Think about it: the socket
+ * is either there or not.
+ */
+ if (connect(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ close(sockfd);
+ goto out;
+ }
+
+ handle->fd = sockfd;
+ ev_io_init(&handle->read_watcher, uv__stream_io, sockfd, EV_READ);
+ ev_io_init(&handle->write_watcher, uv__stream_io, sockfd, EV_WRITE);
+ ev_io_start(EV_DEFAULT_ &handle->read_watcher);
+ ev_io_start(EV_DEFAULT_ &handle->write_watcher);
+
+ status = 0;
+
+out:
+ uv__req_init((uv_req_t*)req);
+ req->handle = (uv_stream_t*)handle;
+ req->type = UV_CONNECT;
+ req->cb = cb;
+ ngx_queue_init(&req->queue);
+
+ if (cb) {
+ cb(req, status);
+ }
+
+ /* Mimic the Windows pipe implementation, always
+ * return 0 and let the callback handle errors.
+ */
+ errno = saved_errno;
+ return 0;
}
-int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
- const char* name, uv_connect_cb cb) {
- assert(0 && "implement me");
+/* TODO merge with uv__server_io()? */
+static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
+ struct sockaddr_un sun;
+ uv_pipe_t* pipe;
+ int saved_errno;
+ int sockfd;
+
+ saved_errno = errno;
+ pipe = watcher->data;
+
+ assert(pipe->type == UV_NAMED_PIPE);
+ assert(pipe->pipe_fname != NULL);
+
+ sockfd = uv__accept(pipe->fd, (struct sockaddr *)&sun, sizeof sun);
+ if (sockfd == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ assert(0 && "EAGAIN on uv__accept(pipefd)");
+ } else {
+ uv_err_new((uv_handle_t*)pipe, errno);
+ }
+ } else {
+ pipe->accepted_fd = sockfd;
+ pipe->connection_cb((uv_handle_t*)pipe, 0);
+ if (pipe->accepted_fd == sockfd) {
+ /* The user hasn't yet accepted called uv_accept() */
+ ev_io_stop(EV_DEFAULT_ &pipe->read_watcher);
+ }
+ }
+
+ errno = saved_errno;
+}
+
+
/* Open a socket in non-blocking close-on-exec mode, atomically if possible. */
static int uv__socket(int domain, int type, int protocol) {
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
  /* Single syscall: no window where the fd lacks the flags. */
  return socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
#else
  int fd;

  fd = socket(domain, type, protocol);
  if (fd == -1)
    return -1;

  /* Apply the flags after the fact; undo everything on failure. */
  if (uv__nonblock(fd, 1) == 0 && uv__cloexec(fd, 1) == 0)
    return fd;

  close(fd);
  return -1;
#endif
}
+
+
/* Accept a pending connection and put the new descriptor in
 * close-on-exec, non-blocking mode, atomically where accept4() exists.
 * Returns the new fd, or -1 with errno set.
 */
static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t slen) {
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
  return accept4(sockfd, saddr, &slen, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
  int fd;

  fd = accept(sockfd, saddr, &slen);
  if (fd == -1)
    return -1;

  /* Apply the flags after the fact; undo everything on failure. */
  if (uv__cloexec(fd, 1) == 0 && uv__nonblock(fd, 1) == 0)
    return fd;

  close(fd);
  return -1;
#endif
}
+
+
/* Turn O_NONBLOCK on (set != 0) or off (set == 0) for `fd`.
 * Returns 0 on success, -1 on fcntl failure.
 */
static int uv__nonblock(int fd, int set) {
  int flags;

  flags = fcntl(fd, F_GETFL);
  if (flags == -1)
    return -1;

  flags = set ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);

  return fcntl(fd, F_SETFL, flags) == -1 ? -1 : 0;
}
+
+
/* Turn FD_CLOEXEC on (set != 0) or off (set == 0) for `fd`.
 * Returns 0 on success, -1 on fcntl failure.
 */
static int uv__cloexec(int fd, int set) {
  int flags;

  flags = fcntl(fd, F_GETFD);
  if (flags == -1)
    return -1;

  flags = set ? (flags | FD_CLOEXEC) : (flags & ~FD_CLOEXEC);

  return fcntl(fd, F_SETFD, flags) == -1 ? -1 : 0;
}
+
+
/* TODO move to uv-common.c? */
/* Copy at most `size - 1` bytes of `src` into `dst` and always
 * NUL-terminate the result (when size > 0), like BSD strlcpy.
 *
 * Returns the number of bytes copied, not counting the terminator.
 *
 * BUG FIX: the previous version never decremented `size` inside its
 * copy loop, so the copy was not actually bounded by the destination
 * size (a buffer overflow for long `src`), and its early return
 * computed `org - src` -- a negative difference that wraps to a huge
 * value when converted to size_t.
 */
size_t uv__strlcpy(char* dst, const char* src, size_t size) {
  size_t copied;

  if (size == 0) {
    return 0;
  }

  for (copied = 0; copied + 1 < size && src[copied] != '\0'; copied++) {
    dst[copied] = src[copied];
  }
  dst[copied] = '\0';

  return copied;
}