+++ /dev/null
-liboi is Copyright (C) 2009 Ryan Dahl.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright
- notice, this list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above
- copyright notice, this list of conditions and the following disclaimer in
- the documentation and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGE.
+++ /dev/null
-liboi is a C library for doing evented I/O. It is intended for building
-efficent internet programs.
-
-liboi is released under the X11 license.
-
-= Feature Summary
-
- * The library has a minimalist design
- - Does not make internal allocations
- - Does not wrap functionality of GnuTLS or libev. The user must use those
- libraries in conjuction with liboi.
- * Supports both server and client sockets.
- * Supports evented file I/O emulation through a thread pool.
- * SSL support
- * Sendfile (file to socket) with emulation on platforms that do not support
- it.
-
-= Building
-
- 1 Edit config.mk. You almost certainly will need to set the EVDIR and
- GNUTLSDIR variables.
- 2 Run "make"
-
-= Documentation
-
- 1 make doc
- 2 man ./oi.3
-
-= Website
-
-http://github.com/ry/liboi
-
-= Author
-
-Ryan Dahl (ry@tinyclouds.org)
-
+++ /dev/null
-# Define EVDIR=/foo/bar if your libev header and library files are in
-# /foo/bar/include and /foo/bar/lib directories.
-EVDIR=$(HOME)/local/libev
-
-# Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in
-# /foo/bar/include and /foo/bar/lib directories.
-#GNUTLSDIR=/usr
-
-uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
-uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
-uname_O := $(shell sh -c 'uname -o 2>/dev/null || echo not')
-uname_R := $(shell sh -c 'uname -r 2>/dev/null || echo not')
-uname_P := $(shell sh -c 'uname -p 2>/dev/null || echo not')
-
-# CFLAGS and LDFLAGS are for the users to override from the command line.
-CFLAGS = -g
-LDFLAGS =
-
-PREFIX = $(HOME)/local/liboi
-
-CC = gcc
-AR = ar
-RM = rm -f
-RANLIB = ranlib
+++ /dev/null
-#ifndef oi_h
-#define oi_h
-
-#include <oi_socket.h>
-
-#endif
+++ /dev/null
-=head1 NAME
-
-liboi - a C library for doing evented I/O.
-
-=head1 SYNOPSIS
-
- #include <oi.h>
-
-=head1 DESCRIPTION
-
-liboi is an object oriented library for doing evented socket and file I/O.
-The API is mostly about registering callbacks to be executed on certain
-events.
-
-Because most systems do not support asynchornous file I/O, the behavior is
-emulated with an internal thread pool. The thread pool is accessed with the
-C<oi_async> and C<oi_task> objects. Typically one will not need to use these
-directly as C<oi_file> wraps that functionality.
-
-=head2 CONVENTIONS
-
-liboi's goal is to be very simple layer above the POSIX API. To that end it
-avoids internal allocations as much as possible. Unless otherwise noted you
-should assume all pointers passed into liboi will remain your responsibility
-to maintain. That means you should not free the data passed into liboi
-until the object in question has completed.
-
-C<oi_socket> and C<oi_file> objects must be attached to an event loop. This
-is completed with the C<*_attach> and C<*_detach> methods. When an object
-is detached, other methods can be called - just the loop will not churn out
-callbacks.
-
-Both C<oi_socket> and C<oi_file> contain a number of callback pointers.
-These are to be set manually after calling their initalization functions.
-All classes include a C<void *data> member which is left for you to use.
-
-=head1 ERROR HANDLING
-
-
-=head1 Sockets
-
-The C<oi_socket> structure represents a socket.
-The callbacks inside C<oi_socket> are
- void (*on_connect) (oi_socket *);
- void (*on_read) (oi_socket *, const void *buf, size_t count);
- void (*on_drain) (oi_socket *);
- void (*on_error) (oi_socket *, struct oi_error e);
- void (*on_close) (oi_socket *);
- void (*on_timeout) (oi_socket *);
-
-A the memory for a socket is released when the C<on_close()> callback is
-made. That is, the user may free the memory for the socket with-in the
-C<on_close()> callback.
-
-=over 4
-
-=item void oi_socket_init (oi_socket *, float timeout);
-
-Initialize a socket. C<timeout> is the number of seconds of inactivity that
-is allowed before the socket closes. The timeout only starts once the
-socket is attached to a loop and open.
-
-A C<timeout> argument of 0.0 signals that no timeout should be used.
-
-After calling this function, register your callbacks manually. Thus your
-code will probably look like this
- oi_socket socket;
- oi_socket_init(&socket, 60.0);
- socket.on_connect = my_on_connect;
- socket.on_read = my_on_read;
- /* etc */
-
-
-=item int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
-
-Open a client connect to the specified address. When the connection is made
-C<socket.on_connect> will be called.
-
-Here is an example of filling in C<addrinfo> for a local TCP connection on
-port 5555:
- struct addrinfo *servinfo;
- struct addrinfo hints;
- memset(&hints, 0, sizeof hints);
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
- r = getaddrinfo(NULL, "5555", &hints, &servinfo);
- assert(r == 0);
- oi_socket_connect(socket, servinfo);
-
-=item void oi_socket_attach (oi_socket *, struct ev_loop *loop);
-
-A socket must be attached to a loop in order before any callbacks will be
-made.
-
-=item void oi_socket_detach (oi_socket *);
-
-Detaching a socket will not close the connection.
-
-=item void oi_socket_read_start (oi_socket *);
-
-This will make the socket start receiving data. When data is received the
-C<socket.on_read> callback is made. The maximum amount of data that can be
-receive at a time is controlled by C<socket.chunksize>.
-
-The buffer returned by C<socket.on_read> is statically allocated exists only
-for the length of the callback. That means if you need to save any of the
-data coming down the line, you must copy it to a new buffer.
-
-Ideally you will have a parser attached to the C<on_read> callback which can
-be interrupted at any time.
-
-C<socket.chunksize> can be changed at any time.
-
-=item void oi_socket_read_stop (oi_socket *);
-
-Stops receiving data. You may receive spurious C<on_read> attempts even
-though the socket reading is stopped - be prepared to handle them.
-
-=item void oi_socket_reset_timeout (oi_socket *);
-
-Reset the timeout to allow the socket to exist for another few seconds
-(however long you specified in the initialization function).
-
-=item void oi_socket_write (oi_socket *socket, oi_buf *buf);
-
-Write the I<buf> to the socket. Each socket has a queue of C<oi_buf> objects
-to be written - this appends the specified buffer to the end of that queue.
-You will be notified when the queue is empty with the C<socket.on_drain()>
-callback. When the socket has written the buffer C<buf.release()> will be
-called. The release callback does not imply that the buffer was successfully
-written.
-
-=item void oi_socket_write_simple (oi_socket *, const char *str, size_t len);
-
-Sometimes you are just hacking around and need to quickly write a string to
-the socket. This convenience function allocates an C<oi_buf> object, and
-C<strdup>s the given string. The allocated buffer will be freed by liboi
-internally.
-
-Most production most applications will use their own memory pool and will
-not need this function.
-
-=item void oi_socket_write_eof (oi_socket *);
-
-This closes the write end of the socket. Further writes are not allowed
-after this.
-
-=item void oi_socket_close (oi_socket *);
-
-Attempts to close the socket.
-
-If the socket is secure, an SSL bye message will be sent.
-SSL recommends that you wait for a bye response from the peer however this
-tends to be overkill for most people. By default liboi will not wait for
-peer to send a matching bye message. If you require this then set
-C<socket.wait_for_secure_hangup> to 1.
-
-When the close is complete C<on_close()> is made. The C<on_close()>
-callback is not made until the program returns to the event loop. This is
-because C<on_close()> may free the socket memory and if C<on_close()> was
-called from C<oi_socket_close()>, then the socket object might unexpectedly
-be gone. To summarize: C<oi_socket_close()> does not call C<on_close()> and
-the socket memory is still accessable immediately after making calling
-C<oi_socket_close()>.
-
-=item void oi_socket_set_secure_session (oi_socket *, gnutls_session_t);
-
-This make a socket use SSL. You must create the GnuTLS session yourself and
-assign its credentials.
-
-=back
-
-=head1 Servers
-
-A server simply listens on an address for new connections. The connections
-come in the form of C<oi_socket> objects. The key is to give a
-C<server.on_connection()> callback which returns an initialized C<oi_socket>.
-The callback looks like this
-
- oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len);
-
-Returning NULL from C<on_connection()> will reject the connection.
-
-=over 4
-
-=item void oi_server_init (oi_server *, int backlog);
-
-Initializes a server object. C<backlog> is the argument given
-internally to C<listen()>. Set the C<server.on_connection()> callback
-after calling this.
-
-=item int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
-
-Listens on the specified address. The server will not accept connections
-until it is attached to a loop, however.
-
-=item void oi_server_attach (oi_server *, struct ev_loop *loop);
-
-Attaches a server to a loop.
-
-=item void oi_server_detach (oi_server *);
-
-Detaches a server to a loop. Does not close the server.
-
-=item void oi_server_close (oi_server *);
-
-Stops the server from listening.
-
-=back
-
-=head1 Files
-
-Files internally use a thread pool to operate without blocking.
-The thread pool is started once a file is attached and it continues until
-program termination.
-
-The following callbacks are used inside of the file object
- void (*on_open) (oi_file *);
- void (*on_read) (oi_file *, size_t count);
- void (*on_drain) (oi_file *);
- void (*on_error) (oi_file *, struct oi_error);
- void (*on_close) (oi_file *);
-
-=over 4
-
-=item int oi_file_init (oi_file *);
-
-Initializes a file object.
-
-=item void oi_file_attach (oi_file *, struct ev_loop *);
-
-Attaches a file object to a loop. If the thread pool has not been started,
-then it is started at this call.
-
-=item void oi_file_detach (oi_file *);
-
-Detaches a file object from the loop.
-
-=item int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode);
-
-Opens a file specified by the path. The C<flag> and C<mode> arguments are
-the same as used by L<open.2>. The C<file.on_open> callback is triggered
-when the file is opened. Returns 0 on success. Returns -1 if the given file
-is already open.
-
-WARNING: path argument must be valid until C<oi_file> object is closed and
-the C<file.on_close()> callback is made. I.E., liboi does not strdup the path
-pointer.
-
-=item int oi_file_open_stdin (oi_file *);
-
-=item int oi_file_open_stdout (oi_file *);
-
-=item int oi_file_open_stderr (oi_file *);
-
-=item void oi_file_read_start (oi_file *, void *buffer, size_t bufsize);
-
-
-
-=item void oi_file_read_stop (oi_file *);
-
-=item int oi_file_write (oi_file *, oi_buf *to_write);
-
-=item int oi_file_write_simple (oi_file *, const char *, size_t);
-
-=item int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length);
-
-=item void oi_file_close (oi_file *);
-
-=over 4
-
-=back
-
-=head1 AUTHOR
-
-Ryan Dahl <ry@tinyclouds.org>
-
+++ /dev/null
-#include <oi_buf.h>
-#include <stdlib.h>
-#include <string.h>
-
-void oi_buf_destroy
- ( oi_buf *buf
- )
-{
- free(buf->base);
- free(buf);
-}
-
-oi_buf * oi_buf_new2
- ( size_t len
- )
-{
- oi_buf *buf = malloc(sizeof(oi_buf));
- if(!buf)
- return NULL;
- buf->base = malloc(len);
- if(!buf->base) {
- free(buf);
- return NULL;
- }
- buf->len = len;
- buf->release = oi_buf_destroy;
- return buf;
-}
-
-oi_buf * oi_buf_new
- ( const char *base
- , size_t len
- )
-{
- oi_buf *buf = oi_buf_new2(len);
- if(!buf)
- return NULL;
- memcpy(buf->base, base, len);
- return buf;
-}
-
+++ /dev/null
-#include <oi_queue.h>
-
-#ifndef oi_buf_h
-#define oi_buf_h
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct oi_buf oi_buf;
-
-struct oi_buf {
- /* public */
- char *base;
- size_t len;
- void (*release) (oi_buf *); /* called when oi is done with the object */
- void *data;
-
- /* private */
- size_t written;
- oi_queue queue;
-};
-
-oi_buf * oi_buf_new (const char* base, size_t len);
-oi_buf * oi_buf_new2 (size_t len);
-void oi_buf_destroy (oi_buf *);
-
-#ifdef __cplusplus
-}
-#endif
-#endif // oi_buf_h
+++ /dev/null
-#ifndef oi_error_h
-#define oi_error_h
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-enum oi_error_domain
- { OI_ERROR_GNUTLS
- , OI_ERROR_EV
- , OI_ERROR_CLOSE
- , OI_ERROR_SHUTDOWN
- , OI_ERROR_OPEN
- , OI_ERROR_SEND
- , OI_ERROR_RECV
- , OI_ERROR_WRITE
- , OI_ERROR_READ
- , OI_ERROR_SENDFILE
- };
-
-struct oi_error {
- enum oi_error_domain domain;
- int code; /* errno */
-};
-
-#ifdef __cplusplus
-}
-#endif
-#endif // oi_error_h
+++ /dev/null
-/* Copyright (C) 2002-2009 Igor Sysoev
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- */
-#ifndef oi_queue_h
-#define oi_queue_h
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include <stddef.h> /* offsetof() */
-
-typedef struct oi_queue oi_queue;
-struct oi_queue {
- oi_queue *prev;
- oi_queue *next;
-};
-
-#define oi_queue_init(q) \
- (q)->prev = q; \
- (q)->next = q
-
-#define oi_queue_empty(h) \
- (h == (h)->prev)
-
-#define oi_queue_insert_head(h, x) \
- (x)->next = (h)->next; \
- (x)->next->prev = x; \
- (x)->prev = h; \
- (h)->next = x
-
-#define oi_queue_head(h) \
- (h)->next
-
-#define oi_queue_last(h) \
- (h)->prev
-
-#define oi_queue_remove(x) \
- (x)->next->prev = (x)->prev; \
- (x)->prev->next = (x)->next; \
- (x)->prev = NULL; \
- (x)->next = NULL
-
-#define oi_queue_data(q, type, link) \
- (type *) ((unsigned char *) q - offsetof(type, link))
-
-#ifdef __cplusplus
-}
-#endif
-#endif // oi_queue_h
+/* Copyright (c) 2008,2009 Ryan Dahl
+ *
+ * oi_queue comes from ngx_queue.h
+ * Copyright (C) 2002-2009 Igor Sysoev
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
-# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
-# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0)
-#endif
+#endif // HAVE_GNUTLS
+
+/* a few forwards
+ * they wont even be defined if not having gnutls
+ * */
+static int secure_full_goodbye (oi_socket *socket);
+static int secure_half_goodbye (oi_socket *socket);
#undef TRUE
#define TRUE 1
#define AGAIN 1
#define ERROR 2
-#define RAISE_ERROR(s, _domain, _code) do { \
- if(s->on_error) { \
- struct oi_error __oi_error; \
- __oi_error.domain = _domain; \
- __oi_error.code = _code; \
- s->on_error(s, __oi_error); \
- } \
-} while(0) \
+void
+oi_buf_destroy (oi_buf *buf)
+{
+ free(buf->base);
+ free(buf);
+}
+
+oi_buf *
+oi_buf_new2 (size_t len)
+{
+ oi_buf *buf = malloc(sizeof(oi_buf));
+ if(!buf)
+ return NULL;
+ buf->base = malloc(len);
+ if(!buf->base) {
+ free(buf);
+ return NULL;
+ }
+ buf->len = len;
+ buf->release = oi_buf_destroy;
+ return buf;
+}
+
+oi_buf *
+oi_buf_new (const char *base, size_t len)
+{
+ oi_buf *buf = oi_buf_new2(len);
+ if(!buf)
+ return NULL;
+ memcpy(buf->base, base, len);
+ return buf;
+}
+
+#define CLOSE_ASAP(socket) do { \
+ if ((socket)->read_action) { \
+ (socket)->read_action = full_close; \
+ } \
+ if ((socket)->write_action) { \
+ (socket)->write_action = full_close; \
+ } \
+} while (0)
static int
full_close(oi_socket *socket)
{
- if(-1 == close(socket->fd) && errno == EINTR) {
- /* TODO fd still open. next loop call close again? */
- assert(0 && "implement me");
- return ERROR;
- }
+ if (close(socket->fd) == -1)
+ return errno == EINTR ? AGAIN : ERROR;
socket->read_action = NULL;
socket->write_action = NULL;
-
- if(socket->attached) {
- ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
- }
return OKAY;
}
half_close(oi_socket *socket)
{
int r = shutdown(socket->fd, SHUT_WR);
-
- if(r == -1) {
- RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno);
+ if (r == -1) {
+ socket->errorno = errno;
+ assert(0 && "Shouldn't get an error on shutdown");
return ERROR;
}
-
socket->write_action = NULL;
-
- /* TODO set timer to zero so we get a callback soon */
return OKAY;
}
+// This is to be called when ever the out_stream is empty
+// and we need to change state.
static void
-update_write_buffer_after_send(oi_socket *socket, ssize_t sent)
+change_state_for_empty_out_stream (oi_socket *socket)
+{
+ /*
+ * a very complicated bunch of close logic!
+ * XXX this is awful. FIXME
+ */
+ if (socket->got_half_close == FALSE) {
+ if (socket->got_full_close == FALSE) {
+ /* Normal situation. Didn't get any close signals. */
+ ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
+ } else {
+ /* Got Full Close. */
+ if (socket->read_action)
+#if HAVE_GNUTLS
+ socket->read_action = socket->secure ? secure_full_goodbye : full_close;
+#else
+ socket->read_action = full_close;
+#endif
+
+ if (socket->write_action)
+#if HAVE_GNUTLS
+ socket->write_action = socket->secure ? secure_full_goodbye : full_close;
+#else
+ socket->write_action = full_close;
+#endif
+ }
+ } else {
+ /* Got Half Close. */
+ if (socket->write_action)
+#if HAVE_GNUTLS
+ socket->write_action = socket->secure ? secure_half_goodbye : half_close;
+#else
+ socket->write_action = half_close;
+#endif
+ }
+}
+
+static void
+update_write_buffer_after_send (oi_socket *socket, ssize_t sent)
{
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
to_write->written += sent;
socket->written += sent;
- if(to_write->written == to_write->len) {
+ if (to_write->written == to_write->len) {
oi_queue_remove(q);
- if(to_write->release) {
+ if (to_write->release) {
to_write->release(to_write);
}
- if(oi_queue_empty(&socket->out_stream)) {
- ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
- if(socket->on_drain)
+ if (oi_queue_empty(&socket->out_stream)) {
+ change_state_for_empty_out_stream(socket);
+ if (socket->on_drain)
socket->on_drain(socket);
}
}
}
-
#if HAVE_GNUTLS
static int secure_socket_send(oi_socket *socket);
static int secure_socket_recv(oi_socket *socket);
#endif
int r = send(socket->fd, buf, len, flags);
- if(r == -1) {
+ if (r == -1) {
gnutls_transport_set_errno(socket->session, errno); /* necessary ? */
}
int r = gnutls_handshake(socket->session);
- if(gnutls_error_is_fatal(r)) {
- RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
+ if (gnutls_error_is_fatal(r)) {
+ socket->gnutls_errorno = r;
return ERROR;
}
- if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
+ if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
oi_socket_reset_timeout(socket);
- if(!socket->connected) {
+ if (!socket->connected) {
socket->connected = TRUE;
- if(socket->on_connect)
- socket->on_connect(socket);
+ if (socket->on_connect) socket->on_connect(socket);
}
- if(socket->read_action)
+ if (socket->read_action)
socket->read_action = secure_socket_recv;
- if(socket->write_action)
+ if (socket->write_action)
socket->write_action = secure_socket_send;
return OKAY;
{
ssize_t sent;
- if(oi_queue_empty(&socket->out_stream)) {
+ if (oi_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
, to_write->len - to_write->written
);
- if(gnutls_error_is_fatal(sent)) {
- RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent);
+ if (gnutls_error_is_fatal(sent)) {
+ socket->gnutls_errorno = sent;
return ERROR;
}
- if(sent == 0)
+ if (sent == 0)
return AGAIN;
oi_socket_reset_timeout(socket);
- if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
- if(GNUTLS_NEED_READ) {
- if(socket->read_action) {
- socket->read_action = secure_socket_send;
- } else {
- /* TODO GnuTLS needs read but already got EOF */
- assert(0 && "needs read but already got EOF");
- return ERROR;
- }
- }
+ if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
return AGAIN;
}
- if(sent > 0) {
+ if (sent > 0) {
/* make sure the callbacks are correct */
- if(socket->read_action)
+ if (socket->read_action)
socket->read_action = secure_socket_recv;
update_write_buffer_after_send(socket, sent);
return OKAY;
static int
secure_socket_recv(oi_socket *socket)
{
- char recv_buffer[TCP_MAXWIN];
- size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize);
+ char recv_buffer[socket->chunksize];
+ size_t recv_buffer_size = socket->chunksize;
ssize_t recved;
assert(socket->secure);
//printf("secure socket recv %d %p\n", recved, socket->on_connect);
- if(gnutls_error_is_fatal(recved)) {
- RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved);
+ if (gnutls_error_is_fatal(recved)) {
+ socket->gnutls_errorno = recved;
return ERROR;
}
- if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
- if(GNUTLS_NEED_WRITE) {
- if(socket->write_action) {
- printf("need write\n");
- socket->write_action = secure_socket_recv;
- } else {
- /* TODO GnuTLS needs send but already closed write end */
- assert(0 && "needs read but cannot");
- return ERROR;
- }
- }
+ if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
return AGAIN;
}
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
* initiated a handshake. In that case the server can only initiate a
* handshake or terminate the connection. */
- if(recved == GNUTLS_E_REHANDSHAKE) {
- if(socket->write_action) {
+ if (recved == GNUTLS_E_REHANDSHAKE) {
+ if (socket->write_action) {
socket->read_action = secure_handshake;
socket->write_action = secure_handshake;
return OKAY;
} else {
- /* TODO */
- assert(0 && "needs read but cannot");
+ socket->read_action = full_close;
+ // set error
return ERROR;
}
}
- if(recved >= 0) {
+ if (recved >= 0) {
/* Got EOF */
- if(recved == 0)
+ if (recved == 0)
socket->read_action = NULL;
- if(socket->write_action)
+ if (socket->write_action)
socket->write_action = secure_socket_send;
- if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
+ if (socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
return OKAY;
}
}
static int
-secure_goodbye(oi_socket *socket, gnutls_close_request_t how)
+secure_full_goodbye (oi_socket *socket)
{
assert(socket->secure);
- int r = gnutls_bye(socket->session, how);
+ int r = gnutls_bye(socket->session, GNUTLS_SHUT_RDWR);
- if(gnutls_error_is_fatal(r)) {
- RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
+ if (gnutls_error_is_fatal(r)) {
+ socket->gnutls_errorno = r;
return ERROR;
}
- if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
+ if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
+ CLOSE_ASAP(socket);
+
return OKAY;
}
static int
-secure_full_goodbye(oi_socket *socket)
+secure_half_goodbye (oi_socket *socket)
{
- int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR);
- if(OKAY == r) {
- return full_close(socket);
- }
- return r;
-}
+ assert(socket->secure);
-static int
-secure_half_goodbye(oi_socket *socket)
-{
- int r = secure_goodbye(socket, GNUTLS_SHUT_WR);
- if(OKAY == r) {
- return half_close(socket);
+ int r = gnutls_bye(socket->session, GNUTLS_SHUT_WR);
+
+ if (gnutls_error_is_fatal(r)) {
+ socket->gnutls_errorno = r;
+ return ERROR;
}
- return r;
+
+ if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
+ return AGAIN;
+
+ if (socket->write_action)
+ socket->write_action = half_close;
+
+ return OKAY;
}
-/* Tells the socket to use transport layer security (SSL). liboi does not
- * want to make any decisions about security requirements, so the
- * majoirty of GnuTLS configuration is left to the user. Only the transport
- * layer of GnuTLS is controlled by liboi.
- *
- * That is, do not use gnutls_transport_* functions.
- * Do use the rest of GnuTLS's API.
- */
void
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
{
#endif /* HAVE GNUTLS */
static int
-socket_send(oi_socket *socket)
+socket_send (oi_socket *socket)
{
ssize_t sent;
assert(socket->secure == FALSE);
- if(oi_queue_empty(&socket->out_stream)) {
+ if (oi_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
, flags
);
- if(sent < 0) {
- switch(errno) {
+ if (sent < 0) {
+ switch (errno) {
+#ifdef EWOULDBLOCK
+ case EWOULDBLOCK:
+#else
case EAGAIN:
+#endif
return AGAIN;
case ECONNREFUSED:
+ socket->errorno = errno;
+ return ERROR;
+
case ECONNRESET:
- socket->write_action = NULL;
- /* TODO maybe just clear write buffer instead of error?
- * They should be able to read still from the socket.
- */
- RAISE_ERROR(socket, OI_ERROR_SEND, errno);
+ socket->errorno = errno;
return ERROR;
default:
perror("send()");
assert(0 && "oi shouldn't let this happen.");
+ socket->errorno = errno;
return ERROR;
}
}
oi_socket_reset_timeout(socket);
- if(!socket->connected) {
+ if (!socket->connected) {
socket->connected = TRUE;
- if(socket->on_connect) { socket->on_connect(socket); }
+ if (socket->on_connect) socket->on_connect(socket);
}
update_write_buffer_after_send(socket, sent);
}
static int
-socket_recv(oi_socket *socket)
+socket_recv (oi_socket *socket)
{
char buf[TCP_MAXWIN];
size_t buf_size = TCP_MAXWIN;
assert(socket->secure == FALSE);
- if(!socket->connected) {
+ if (!socket->connected) {
socket->connected = TRUE;
- if(socket->on_connect) { socket->on_connect(socket); }
+ if (socket->on_connect) socket->on_connect(socket);
return OKAY;
}
recved = recv(socket->fd, buf, buf_size, 0);
- if(recved < 0) {
- switch(errno) {
- case EAGAIN:
+ if (recved < 0) {
+ switch (errno) {
+#ifdef EWOULDBLOCK
+ case EWOULDBLOCK:
+#else
+ case EAGAIN:
+#endif
+ return AGAIN;
+
case EINTR:
return AGAIN;
/* A remote host refused to allow the network connection (typically
* because it is not running the requested service). */
case ECONNREFUSED:
- RAISE_ERROR(socket, OI_ERROR_RECV, errno);
- return ERROR;
-
- case ECONNRESET:
- RAISE_ERROR(socket, OI_ERROR_RECV, errno);
+ socket->errorno = errno;
return ERROR;
default:
oi_socket_reset_timeout(socket);
- if(recved == 0) {
+ if (recved == 0) {
oi_socket_read_stop(socket);
socket->read_action = NULL;
}
/* NOTE: EOF is signaled with recved == 0 on callback */
- if(socket->on_read) { socket->on_read(socket, buf, recved); }
+ if (socket->on_read) { socket->on_read(socket, buf, recved); }
return OKAY;
}
static void
-assign_file_descriptor(oi_socket *socket, int fd)
+assign_file_descriptor (oi_socket *socket, int fd)
{
socket->fd = fd;
socket->write_action = socket_send;
#if HAVE_GNUTLS
- if(socket->secure) {
+ if (socket->secure) {
gnutls_transport_set_lowat(socket->session, 0);
gnutls_transport_set_push_function(socket->session, nosigpipe_push);
gnutls_transport_set_ptr2 ( socket->session
* Called by server->connection_watcher.
*/
static void
-on_connection(EV_P_ ev_io *watcher, int revents)
+on_connection (EV_P_ ev_io *watcher, int revents)
{
oi_server *server = watcher->data;
#endif
assert(&server->connection_watcher == watcher);
- if(EV_ERROR & revents) {
+ if (EV_ERROR & revents) {
oi_server_close(server);
return;
}
/* TODO accept all possible connections? currently: just one */
int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
- if(fd < 0) {
+ if (fd < 0) {
perror("accept()");
return;
}
oi_socket *socket = NULL;
- if(server->on_connection)
+ if (server->on_connection)
socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
- if(socket == NULL) {
+ if (socket == NULL) {
close(fd);
return;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- if(r < 0) {
+ if (r < 0) {
/* TODO error report */
}
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
{
int fd = -1;
- struct linger ling = {0, 0};
assert(server->listening == FALSE);
fd = socket( addrinfo->ai_family
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
- if(fd < 0) {
+ if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- if(r < 0) {
+ if (r < 0) {
perror("fcntl()");
return -1;
}
flags = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
- setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
/* XXX: Sending single byte chunks in a response body? Perhaps there is a
* need to enable the Nagel algorithm dynamically. For now disabling.
void
oi_server_close(oi_server *server)
{
- if(server->listening) {
+ if (server->listening) {
oi_server_detach(server);
close(server->fd);
/* TODO do this on the loop? check return value? */
assert(watcher == &socket->timeout_watcher);
- // printf("on_timeout\n");
-
- if(socket->on_timeout) { socket->on_timeout(socket); }
-
+ // printf("on_timeout\n");
- /* TODD set timer to zero */
- full_close(socket);
+ if (socket->on_timeout) { socket->on_timeout(socket); }
+ // timeout does not automatically kill your connection. you must!
}
static void
release_write_buffer(oi_socket *socket)
{
- while(!oi_queue_empty(&socket->out_stream)) {
+ while (!oi_queue_empty(&socket->out_stream)) {
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *buf = oi_queue_data(q, oi_buf, queue);
oi_queue_remove(q);
- if(buf->release) { buf->release(buf); }
+ if (buf->release) { buf->release(buf); }
}
}
{
oi_socket *socket = watcher->data;
- if(revents & EV_ERROR) {
- RAISE_ERROR(socket, OI_ERROR_EV, 0);
- goto close;
+ if (revents & EV_ERROR) {
+ socket->errorno = 1;
+ CLOSE_ASAP(socket);
}
int r;
- int have_read_event = TRUE;
- int have_write_event = TRUE;
+ int have_read_event = (socket->read_action != NULL);
+ int have_write_event = (socket->write_action != NULL);
- while(have_read_event || have_write_event) {
-
- if(socket->read_action) {
- r = socket->read_action(socket);
- if(r == ERROR) goto close;
- if(r == AGAIN) have_read_event = FALSE;
- } else {
+ while (have_read_event || have_write_event) {
+ /* RECV LOOP - TRY TO CLEAR THE BUFFER */
+ if (socket->read_action == NULL)
have_read_event = FALSE;
- }
+ else {
+ r = socket->read_action(socket);
- if(socket->write_action) {
- r = socket->write_action(socket);
- if(r == ERROR) goto close;
- if(r == AGAIN) have_write_event = FALSE;
- } else {
- have_write_event = FALSE;
+ if (r == AGAIN)
+ have_read_event = FALSE;
+ else if (r == ERROR)
+ CLOSE_ASAP(socket);
}
-
- if(socket->read_watcher.active == FALSE)
- have_read_event = FALSE;
- if(socket->write_watcher.active == FALSE)
+ /* SEND LOOP - TRY TO CLEAR THE BUFFER */
+ if (socket->write_action == NULL)
have_write_event = FALSE;
- }
-
- if(socket->write_action == NULL && socket->read_action == NULL)
- goto close;
+ else {
+ r = socket->write_action(socket);
- return;
+ if (r == AGAIN)
+ have_write_event = FALSE;
+ else if (r == ERROR)
+ CLOSE_ASAP(socket);
+ }
+ }
-close:
- release_write_buffer(socket);
+ // Close when both sides of the stream are closed.
+ if (socket->write_action == NULL && socket->read_action == NULL) {
+ release_write_buffer(socket);
- ev_clear_pending (EV_A_ &socket->write_watcher);
- ev_clear_pending (EV_A_ &socket->read_watcher);
- ev_clear_pending (EV_A_ &socket->timeout_watcher);
+ ev_clear_pending (EV_A_ &socket->write_watcher);
+ ev_clear_pending (EV_A_ &socket->read_watcher);
+ ev_clear_pending (EV_A_ &socket->timeout_watcher);
- oi_socket_detach(socket);
+ oi_socket_detach(socket);
- if(socket->on_close) { socket->on_close(socket); }
- /* WARNING: user can free socket in on_close so no more
- * access beyond this point. */
+ if (socket->on_close) { socket->on_close(socket); }
+ /* WARNING: user can free socket in on_close so no more
+ * access beyond this point. */
+ }
}
/**
ev_init(&socket->read_watcher, on_io_event);
socket->read_watcher.data = socket;
+ socket->got_full_close = FALSE;
+ socket->got_half_close = FALSE;
+
+ socket->errorno = 0;
+
socket->secure = FALSE;
- socket->wait_for_secure_hangup = FALSE;
#if HAVE_GNUTLS
+ socket->gnutls_errorno = 0;
socket->session = NULL;
#endif
- /* TODO higher resolution timer */
ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout);
socket->timeout_watcher.data = socket;
socket->on_connect = NULL;
socket->on_read = NULL;
socket->on_drain = NULL;
- socket->on_error = NULL;
socket->on_timeout = NULL;
}
void
-oi_socket_write_eof (oi_socket *socket)
+oi_socket_close (oi_socket *socket)
{
-#if HAVE_GNUTLS
- /* try to hang up properly for secure connections */
- if(socket->secure)
- {
- if( socket->connected /* completed handshake */
- && socket->write_action /* write end is open */
- )
- {
- socket->write_action = secure_half_goodbye;
- if(socket->attached)
- ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
- return;
- }
- /* secure servers cannot handle half-closed connections? */
- full_close(socket);
- return;
- }
-#endif // HAVE_GNUTLS
-
- if(socket->write_action)
- half_close(socket);
- else
- full_close(socket);
+ socket->got_half_close = TRUE;
+ if (oi_queue_empty(&socket->out_stream))
+ change_state_for_empty_out_stream(socket);
}
void
-oi_socket_close (oi_socket *socket)
+oi_socket_full_close (oi_socket *socket)
{
-#if HAVE_GNUTLS
- /* try to hang up properly for secure connections */
- if( socket->secure
- && socket->connected /* completed handshake */
- && socket->write_action /* write end is open */
- )
- {
- if(socket->wait_for_secure_hangup && socket->read_action) {
- socket->write_action = secure_full_goodbye;
- socket->read_action = secure_full_goodbye;
- } else {
- socket->write_action = secure_half_goodbye;
- socket->read_action = NULL;
- }
-
- if(socket->attached)
- ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
-
- return;
- }
-#endif // HAVE_GNUTLS
-
- full_close(socket);
+ socket->got_full_close = TRUE;
+ if (oi_queue_empty(&socket->out_stream))
+ change_state_for_empty_out_stream(socket);
}
-/*
- * Resets the timeout to stay alive for another socket->timeout seconds
- */
-void
-oi_socket_reset_timeout(oi_socket *socket)
+void oi_socket_force_close (oi_socket *socket)
{
- ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
+ // socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE
+ CLOSE_ASAP(socket);
}
-/**
- * Writes a string to the socket. This is actually sets a watcher which may
- * take multiple iterations to write the entire string.
- */
void
oi_socket_write(oi_socket *socket, oi_buf *buf)
{
- if(socket->write_action == NULL)
- return;
+ assert(socket->write_action != NULL && "Do not write to a closed socket");
+ assert(socket->got_full_close == FALSE && "Do not write to a closing socket");
+ assert(socket->got_half_close == FALSE && "Do not write to a closing socket");
oi_queue_insert_head(&socket->out_stream, &buf->queue);
-
buf->written = 0;
- // XXX if (socket->attached) ??
- ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
+
+ if (socket->attached) {
+ ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
+ }
+}
+
+void
+oi_socket_reset_timeout(oi_socket *socket)
+{
+ ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
}
static void
ev_timer_again(EV_A_ &socket->timeout_watcher);
- if(socket->read_action)
+ if (socket->read_action)
ev_io_start(EV_A_ &socket->read_watcher);
- if(socket->write_action)
+ if (socket->write_action)
ev_io_start(EV_A_ &socket->write_watcher);
-
- /* make sure the io_event happens soon in the case we're being reattached */
- ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
}
void
oi_socket_detach(oi_socket *socket)
{
- if(socket->attached) {
+ if (socket->attached) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
void
oi_socket_read_start (oi_socket *socket)
{
- if(socket->read_action) {
+ if (socket->read_action) {
ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
/* XXX feed event? */
}
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
- if(fd < 0) {
+ if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- if(r < 0) {
+ if (r < 0) {
perror("fcntl()");
return -1;
}
, addrinfo->ai_addrlen
);
- if(r < 0 && errno != EINPROGRESS) {
+ if (r < 0 && errno != EINPROGRESS) {
perror("connect");
close(fd);
return -1;
+/* Copyright (c) 2008,2009 Ryan Dahl
+ *
+ * oi_queue comes from ngx_queue.h
+ * Copyright (C) 2002-2009 Igor Sysoev
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
#include <netdb.h>
#include <ev.h>
-#include <oi_queue.h>
-#include <oi_error.h>
-#include <oi_buf.h>
+#include <stddef.h> /* offsetof() */
#ifndef oi_socket_h
#define oi_socket_h
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
+typedef struct oi_queue oi_queue;
+struct oi_queue {
+ oi_queue *prev;
+ oi_queue *next;
+};
+
+#define oi_queue_init(q) \
+ (q)->prev = q; \
+ (q)->next = q
+
+#define oi_queue_empty(h) \
+ (h == (h)->prev)
+
+#define oi_queue_insert_head(h, x) \
+ (x)->next = (h)->next; \
+ (x)->next->prev = x; \
+ (x)->prev = h; \
+ (h)->next = x
+
+#define oi_queue_head(h) \
+ (h)->next
+
+#define oi_queue_last(h) \
+ (h)->prev
+
+#define oi_queue_remove(x) \
+ (x)->next->prev = (x)->prev; \
+ (x)->prev->next = (x)->next; \
+ (x)->prev = NULL; \
+ (x)->next = NULL
+#define oi_queue_data(q, type, link) \
+ (type *) ((unsigned char *) q - offsetof(type, link))
+
+typedef struct oi_buf oi_buf;
typedef struct oi_server oi_server;
typedef struct oi_socket oi_socket;
+oi_buf * oi_buf_new (const char* base, size_t len);
+oi_buf * oi_buf_new2 (size_t len);
+void oi_buf_destroy (oi_buf *);
+
void oi_server_init (oi_server *, int backlog);
int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
void oi_server_attach (EV_P_ oi_server *);
void oi_server_close (oi_server *);
void oi_socket_init (oi_socket *, float timeout);
- int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */
int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
void oi_socket_attach (EV_P_ oi_socket *);
void oi_socket_detach (oi_socket *);
void oi_socket_read_start (oi_socket *);
void oi_socket_read_stop (oi_socket *);
+
+/* Resets the timeout to stay alive for another socket->timeout seconds
+ */
void oi_socket_reset_timeout (oi_socket *);
+
+/* Writes a buffer to the socket.
+ * (Do not send a NULL oi_buf or a buffer with oi_buf->base == NULL.)
+ */
void oi_socket_write (oi_socket *, oi_buf *);
+
void oi_socket_write_simple (oi_socket *, const char *str, size_t len);
-void oi_socket_write_eof (oi_socket *);
+
+/* Once the write buffer is drained, oi_socket_close will shutdown the
+ * writing end of the socket and will close the read end once the server
+ * replies with an EOF.
+ */
void oi_socket_close (oi_socket *);
+
+/* Do not wait for the server to reply with EOF. This will only be called
+ * once the write buffer is drained.
+ * Warning: For TCP socket, the OS kernel may (should) reply with RST
+ * packets if this is called when data is still being received from the
+ * server.
+ */
+void oi_socket_full_close (oi_socket *);
+
+/* The most extreme measure.
+ * Will not wait for the write queue to complete.
+ */
+void oi_socket_force_close (oi_socket *);
+
+
#if HAVE_GNUTLS
+/* Tells the socket to use transport layer security (SSL). oi_socket does
+ * not want to make any decisions about security requirements, so the
+ * majoirty of GnuTLS configuration is left to the user. Only the transport
+ * layer of GnuTLS is controlled by oi_socket. That is, do not use
+ * gnutls_transport_* functions. Do use the rest of GnuTLS's API.
+ */
void oi_socket_set_secure_session (oi_socket *, gnutls_session_t);
#endif
+struct oi_buf {
+ /* public */
+ char *base;
+ size_t len;
+ void (*release) (oi_buf *); /* called when oi is done with the object */
+ void *data;
+
+ /* private */
+ size_t written;
+ oi_queue queue;
+};
+
struct oi_server {
/* read only */
int fd;
/* public */
oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len);
- void (*on_error) (oi_server *, struct oi_error e);
+ void (*on_error) (oi_server *);
void *data;
};
unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
- unsigned wait_for_secure_hangup:1;
+ unsigned got_full_close:1;
+ unsigned got_half_close:1;
- /* if these are NULL then it means that end of the socket is closed. */
+ /* NULL = that end of the socket is closed. */
int (*read_action) (oi_socket *);
int (*write_action) (oi_socket *);
+ /* ERROR CODES. 0 = no error. Check on_close. */
+ int errorno;
+#if HAVE_GNUTLS
+ int gnutls_errorno;
+#endif
+
/* private */
ev_io write_watcher;
ev_io read_watcher;
void (*on_connect) (oi_socket *);
void (*on_read) (oi_socket *, const void *buf, size_t count);
void (*on_drain) (oi_socket *);
- void (*on_error) (oi_socket *, struct oi_error e);
void (*on_close) (oi_socket *);
void (*on_timeout) (oi_socket *);
void *data;
#include <ev.h>
-#include <oi.h>
+#include <oi_socket.h>
#include <gnutls/gnutls.h>
#define HOST "127.0.0.1"
static void
on_peer_close(oi_socket *socket)
{
+ assert(socket->errorno == 0);
//printf("server connection closed\n");
#if HAVE_GNUTLS
+ assert(socket->gnutls_errorno == 0);
#if SECURE
gnutls_deinit(socket->session);
#endif
assert(0);
}
-static void
-on_client_error(oi_socket *socket, struct oi_error e)
-{
- assert(0);
-}
-
#if HAVE_GNUTLS
#if SECURE
oi_socket_close(socket);
}
-static void
-on_peer_error2(oi_socket *socket, struct oi_error e)
-{
- if(e.domain == OI_ERROR_GNUTLS) return;
- assert(0);
-}
-
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_drain = on_peer_drain;
- socket->on_error = on_peer_error2;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
on_client_connect(oi_socket *socket)
{
//printf("on client connection\n");
- oi_socket_write_eof(socket);
+ oi_socket_close(socket);
}
static void
static void
on_client_read(oi_socket *socket, const void *base, size_t len)
{
+ if (len == 0) {
+ oi_socket_close(socket);
+ return;
+ }
+
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
//printf("client got message: %s\n", buf);
- if(strcmp(buf, "BYE") == 0) {
+ if (strcmp(buf, "BYE") == 0) {
oi_socket_close(socket);
} else {
assert(0);
oi_socket *client = malloc(sizeof(oi_socket));
oi_socket_init(client, TIMEOUT);
client->on_read = on_client_read;
- client->on_error = on_client_error;
client->on_connect = on_client_connect;
client->on_close = on_client_close;
client->on_timeout = on_client_timeout;
#include "test/common.c"
+// timeout must match the timeout in timeout.rb
+#define TIMEOUT 5.0
+
int successful_ping_count;
static void
oi_socket_write_simple(socket, base, len);
}
-static void
-on_peer_error(oi_socket *socket, struct oi_error e)
-{
- assert(0);
-}
-
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
- oi_socket_init(socket, 5.0);
+ oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
- socket->on_error = on_peer_error;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
#define PING "PING"
#define PONG "PONG"
-#define EXCHANGES 100
+#define EXCHANGES 500
+#define TIMEOUT 5.0
int successful_ping_count;
static void
on_peer_read(oi_socket *socket, const void *base, size_t len)
{
- if(len == 0)
+ if (len == 0) {
+ oi_socket_close(socket);
return;
+ }
char buf[2000];
strncpy(buf, base, len);
}
static void
-on_peer_error(oi_socket *socket, struct oi_error e)
-{
- assert(0);
-}
-
-static void
on_client_close(oi_socket *socket)
{
//printf("client connection closed\n");
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
- oi_socket_init(socket, 5.0);
+ oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
- socket->on_error = on_peer_error;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
}
static void
-on_client_connect(oi_socket *socket)
+on_client_connect (oi_socket *socket)
{
//printf("client connected. sending ping\n");
oi_socket_write_simple(socket, PING, sizeof PING);
}
static void
-on_client_read(oi_socket *socket, const void *base, size_t len)
+on_client_read (oi_socket *socket, const void *base, size_t len)
{
+ if(len == 0) {
+ oi_socket_close(socket);
+ return;
+ }
+
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
assert(r == 0);
oi_server_attach(EV_DEFAULT_ &server);
- oi_socket_init(&client, 5.0);
+ oi_socket_init(&client, TIMEOUT);
client.on_read = on_client_read;
- client.on_error = on_client_error;
client.on_connect = on_client_connect;
client.on_close = on_client_close;
client.on_timeout = on_client_timeout;
#define CONNECT_SYMBOL String::NewSymbol("connect")
#define ENCODING_SYMBOL String::NewSymbol("encoding")
#define TIMEOUT_SYMBOL String::NewSymbol("timeout")
-#define SERVER_SYMBOL String::NewSymbol("server")
+#define SERVER_SYMBOL String::NewSymbol("server")
#define PROTOCOL_SYMBOL String::NewSymbol("protocol")
#define PROTOCOL_CLASS_SYMBOL String::NewSymbol("protocol_class")
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "connect", v8Connect);
- NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "send", v8Send);
- NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "sendEOF", v8SendEOF);
+ NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close);
+ NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "fullClose", v8FullClose);
+ NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "forceClose", v8ForceClose);
target->Set(String::NewSymbol("TCPConnection"), constructor_template->GetFunction());
}
socket_.on_connect = Connection::on_connect;
socket_.on_read = Connection::on_read;
socket_.on_drain = Connection::on_drain;
- socket_.on_error = Connection::on_error;
socket_.on_close = Connection::on_close;
socket_.on_timeout = Connection::on_timeout;
socket_.data = this;
}
+Connection::~Connection ()
+{
+ handle_->Delete(SEND_SYMBOL);
+ Close();
+}
+
Local<Object>
Connection::GetProtocol (void)
{
return 0;
}
-
- oi_error e; // TODO better error!
- connection->OnError(e);
+ connection->OnDisconnect();
return 0;
}
}
Handle<Value>
-Connection::v8Send (const Arguments& args)
+Connection::v8FullClose (const Arguments& args)
{
HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
+ connection->FullClose();
+ return Undefined();
+}
- if (args[0] == Null()) {
- oi_socket_write_eof(&connection->socket_);
+Handle<Value>
+Connection::v8ForceClose (const Arguments& args)
+{
+ HandleScope scope;
+ Connection *connection = NODE_UNWRAP(Connection, args.Holder());
+ connection->ForceClose();
+ return Undefined();
+}
+
+
+Handle<Value>
+Connection::v8Send (const Arguments& args)
+{
+ HandleScope scope;
+ Connection *connection = NODE_UNWRAP(Connection, args.Holder());
- } else if (args[0]->IsString()) {
+ if (args[0]->IsString()) {
// utf8 encoding
Local<String> s = args[0]->ToString();
size_t length = s->Utf8Length();
return Undefined();
}
-Handle<Value>
-Connection::v8SendEOF (const Arguments& args)
-{
- HandleScope scope;
- Connection *connection = NODE_UNWRAP(Connection, args.Holder());
- connection->SendEOF();
- return Undefined();
-}
-
void
Connection::OnReceive (const void *buf, size_t len)
{
fatal_exception(try_catch); // XXX is this the right action to take?
}
-void
-Connection::OnError (oi_error e)
-{
- HandleScope scope;
- Local<Object> protocol = GetProtocol();
- Local<Value> callback_v = protocol->Get(ON_ERROR_SYMBOL);
- if (!callback_v->IsFunction()) return;
- Handle<Function> callback = Handle<Function>::Cast(callback_v);
- // TODO call with error arg
- callback->Call(protocol, 0, NULL);
-}
-
#define DEFINE_SIMPLE_CALLBACK(name, symbol) \
void name () \
{ \
oi_server_init(&server_, backlog);
server_.on_connection = Acceptor::on_connection;
- server_.on_error = Acceptor::on_error;
server_.data = this;
}
return connection;
}
-void
-Acceptor::OnError (struct oi_error error)
-{
- HandleScope scope;
-
- Local<Value> callback_v = handle_->Get(ON_ERROR_SYMBOL);
- if (!callback_v->IsFunction()) return;
- Local<Function> callback = Local<Function>::Cast(callback_v);
- callback->Call(handle_, 0, NULL); // TODO args
-}
-
Handle<Value>
Acceptor::v8New (const Arguments& args)
{
static v8::Handle<v8::Value> v8New (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Connect (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Send (const v8::Arguments& args);
- static v8::Handle<v8::Value> v8SendEOF (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Close (const v8::Arguments& args);
+ static v8::Handle<v8::Value> v8FullClose (const v8::Arguments& args);
+ static v8::Handle<v8::Value> v8ForceClose (const v8::Arguments& args);
Connection (v8::Handle<v8::Object> handle, v8::Handle<v8::Function> protocol_class);
- virtual ~Connection () { Close(); }
+ virtual ~Connection ();
int Connect (struct addrinfo *address) { return oi_socket_connect (&socket_, address); }
- void Send (oi_buf *buf) { oi_socket_write (&socket_, buf); }
- void SendEOF (void) { oi_socket_write_eof (&socket_); }
- void Close (void) { oi_socket_close (&socket_); }
+ void Send (oi_buf *buf) { oi_socket_write(&socket_, buf); }
+ void Close (void) { oi_socket_close(&socket_); }
+ void FullClose (void) { oi_socket_full_close(&socket_); }
+ void ForceClose (void) { oi_socket_force_close(&socket_); }
void SetAcceptor (v8::Handle<v8::Object> acceptor_handle);
virtual void OnDrain (void);
virtual void OnEOF (void);
virtual void OnDisconnect (void);
- virtual void OnError (oi_error e);
virtual void OnTimeout (void);
v8::Local<v8::Object> GetProtocol (void);
connection->OnDrain();
}
- static void on_error (oi_socket *s, oi_error e) {
- Connection *connection = static_cast<Connection*> (s->data);
- connection->OnError(e);
- }
-
static void on_close (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDisconnect();
}
virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len);
- virtual void OnError (struct oi_error error);
private:
static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) {
return NULL;
}
- static void on_error (oi_server *s, struct oi_error error) {
- Acceptor *acceptor = static_cast<Acceptor*> (s->data);
- acceptor->OnError (error);
- }
-
oi_server server_;
};
this.onEOF = function () {
puts("ponger: onEOF");
- socket.send("QUIT");
socket.close();
};
socket.send("PING");
} else {
puts("sending FIN");
- socket.sendEOF();
+ socket.close();
}
};
### oi
oi = bld.new_task_gen("cc", "staticlib")
- oi.source = "deps/liboi/oi_socket.c deps/liboi/oi_buf.c"
+ oi.source = "deps/liboi/oi_socket.c"
oi.includes = "deps/liboi/"
oi.name = "oi"
oi.target = "oi"