conf.define("HAVE_CONFIG_H", 1)
conf.write_config_header('config.h')
+ conf.env.append_value('CCFLAGS', ['-DEV_MULTIPLICITY=0'])
+ conf.env.append_value('CXXFLAGS', ['-DEV_MULTIPLICITY=0'])
+
def build(bld):
libev = bld.new_task_gen("cc", "staticlib")
libev.source = 'ev.c'
# Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in
# /foo/bar/include and /foo/bar/lib directories.
#GNUTLSDIR=/usr
-#
-#
-# Define NO_PREAD if you have a problem with pread() system call (e.g.
-# cygwin.dll before v1.5.22).
-#
-#
-# Define NO_SENDFILE if you have a problem with the sendfile() system call
-#
-#
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
uname_P := $(shell sh -c 'uname -p 2>/dev/null || echo not')
# CFLAGS and LDFLAGS are for the users to override from the command line.
-CFLAGS = -g
+CFLAGS = -g
LDFLAGS =
PREFIX = $(HOME)/local/liboi
#define oi_h
#include <oi_socket.h>
-#include <oi_async.h>
-#include <oi_file.h>
#endif
+++ /dev/null
-#include <stdlib.h> /* malloc() */
-#include <stdio.h> /* perror() */
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h> /* read(), write() */
-#include <fcntl.h>
-#include <errno.h>
-#include <assert.h>
-#include <pthread.h>
-
-#if HAVE_SENDFILE
-# if __linux
-# include <sys/sendfile.h>
-# elif __freebsd
-# include <sys/socket.h>
-# include <sys/uio.h>
-# elif __hpux
-# include <sys/socket.h>
-# elif __solaris /* not yet */
-# include <sys/sendfile.h>
-# else
-# error sendfile support requested but not available
-# endif
-#endif
-
-#include <ev.h>
-#include <oi.h>
-
-#define NWORKERS 4
-/* TODO make adjustable
- * once it is fix sleeping_tasks
- */
-
-static int active_watchers = 0;
-static int active_workers = 0;
-static int readiness_pipe[2] = {-1, -1};
-static oi_queue waiting_tasks;
-static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t finished_lock = PTHREAD_MUTEX_INITIALIZER;
-
-struct worker {
- oi_task *task;
- pthread_t thread;
- pthread_attr_t thread_attr;
-};
-
-/* Sendfile and pread emulation come from Marc Lehmann's libeio and are
- * Copyright (C)2007,2008 Marc Alexander Lehmann.
- * Many ideas of oi_async.* are taken from libeio and in fact, I plan to
- * use libeio once it becomes usable for me. (The problem is issuing tasks
- * from multiple threads.)
- */
-
-#if !HAVE_PREADWRITE
-/*
- * make our pread/pwrite emulation safe against themselves, but not against
- * normal read/write by using a mutex. slows down execution a lot,
- * but that's your problem, not mine.
- */
-static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
-#endif
-
-#if !HAVE_PREADWRITE
-# undef pread
-# undef pwrite
-# define pread eio__pread
-# define pwrite eio__pwrite
-
-static ssize_t
-eio__pread (int fd, void *buf, size_t count, off_t offset)
-{
- ssize_t res;
- off_t ooffset;
-
- pthread_mutex_lock(&preadwritelock);
- ooffset = lseek (fd, 0, SEEK_CUR);
- lseek (fd, offset, SEEK_SET);
- res = read (fd, buf, count);
- lseek (fd, ooffset, SEEK_SET);
- pthread_mutex_unlock(&preadwritelock);
-
- return res;
-}
-
-static ssize_t
-eio__pwrite (int fd, void *buf, size_t count, off_t offset)
-{
- ssize_t res;
- off_t ooffset;
-
- pthread_mutex_lock(&preadwritelock);
- ooffset = lseek (fd, 0, SEEK_CUR);
- lseek (fd, offset, SEEK_SET);
- res = write (fd, buf, count);
- lseek (fd, offset, SEEK_SET);
- pthread_mutex_unlock(&preadwritelock);
-
- return res;
-}
-#endif
-
-
-/* sendfile always needs emulation */
-static ssize_t
-eio__sendfile (int ofd, int ifd, off_t offset, size_t count)
-{
- ssize_t res;
-
- if (!count)
- return 0;
-
-#if HAVE_SENDFILE
-# if __linux
- res = sendfile (ofd, ifd, &offset, count);
-
-# elif __freebsd
- /*
- * Of course, the freebsd sendfile is a dire hack with no thoughts
- * wasted on making it similar to other I/O functions.
- */
- {
- off_t sbytes;
- res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
-
- if (res < 0 && sbytes)
- /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
- res = sbytes;
- }
-
-# elif __hpux
- res = sendfile (ofd, ifd, offset, count, 0, 0);
-
-# elif __solaris
- {
- struct sendfilevec vec;
- size_t sbytes;
-
- vec.sfv_fd = ifd;
- vec.sfv_flag = 0;
- vec.sfv_off = offset;
- vec.sfv_len = count;
-
- res = sendfilev (ofd, &vec, 1, &sbytes);
-
- if (res < 0 && sbytes)
- res = sbytes;
- }
-
-# endif
-#else
- res = -1;
- errno = ENOSYS;
-#endif
-
- if (res < 0
- && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
-#if __solaris
- || errno == EAFNOSUPPORT || errno == EPROTOTYPE
-#endif
- )
- )
- {
- /* emulate sendfile. this is a major pain in the ass */
-/* buffer size for various temporary buffers */
-#define EIO_BUFSIZE 65536
- char *eio_buf = malloc (EIO_BUFSIZE);
- errno = ENOMEM;
- if (!eio_buf)
- return -1;
-
- res = 0;
-
- while (count) {
- ssize_t cnt;
-
- cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
-
- if (cnt <= 0) {
- if (cnt && !res) res = -1;
- break;
- }
-
- cnt = write (ofd, eio_buf, cnt);
-
- if (cnt <= 0) {
- if (cnt && !res) res = -1;
- break;
- }
-
- offset += cnt;
- res += cnt;
- count -= cnt;
- }
-
- free(eio_buf);
- }
-
- return res;
-}
-
-static oi_task*
-queue_shift(pthread_mutex_t *lock, oi_queue *queue)
-{
- oi_queue *last = NULL;
- pthread_mutex_lock(lock);
- if(!oi_queue_empty(queue)) {
- last = oi_queue_last(queue);
- oi_queue_remove(last);
- }
- pthread_mutex_unlock(lock);
-
- if(last == NULL)
- return NULL;
-
- return oi_queue_data(last, oi_task, queue);
-}
-
-#define P1(name,a) { \
- t->params.name.result = name( t->params.name.a ); \
- break; \
-}
-
-#define P2(name,a,b) { \
- t->params.name.result = name( t->params.name.a \
- , t->params.name.b \
- ); \
- break; \
-}
-
-#define P3(name,a,b,c) { \
- t->params.name.result = name( t->params.name.a \
- , t->params.name.b \
- , t->params.name.c \
- ); \
- break; \
-}
-
-#define P4(name,a,b,c,d) { \
- t->params.name.result = name( t->params.name.a \
- , t->params.name.b \
- , t->params.name.c \
- , t->params.name.d \
- ); \
- break; \
-}
-
-static void
-execute_task(oi_task *t)
-{
- errno = 0;
- switch(t->type) {
- case OI_TASK_OPEN: P3(open, pathname, flags, mode);
- case OI_TASK_READ: P3(read, fd, buf, count);
- case OI_TASK_WRITE: P3(write, fd, buf, count);
- case OI_TASK_CLOSE: P1(close, fd);
- case OI_TASK_SLEEP: P1(sleep, seconds);
- case OI_TASK_SENDFILE: P4(eio__sendfile, out_fd, in_fd, offset, count);
- case OI_TASK_GETADDRINFO: P4(getaddrinfo, nodename, servname, hints, res);
- case OI_TASK_LSTAT: P2(lstat, path, buf);
- default:
- assert(0 && "unknown task type");
- break;
- }
- t->errorno = errno;
-}
-
-static void
-attempt_to_get_a_task(struct worker *worker)
-{
- char dummy;
- assert(readiness_pipe[0] > 0);
- int r = read(readiness_pipe[0], &dummy, 1);
- if(r == -1 && (errno != EAGAIN || errno != EINTR)) {
- perror("read(readiness_pipe[0])");
- return;
- }
-
- // 1 pop task from queue
- assert(worker->task == NULL);
- oi_task *task = queue_shift(&queue_lock, &waiting_tasks);
- if(task == NULL) return;
- worker->task = task;
-
- // 2 run task
- execute_task(task);
-
- // 3 notify complition
- oi_async *async = task->async;
- assert(async != NULL);
- pthread_mutex_lock(&finished_lock);
- oi_queue_insert_head(&async->finished_tasks, &task->queue);
- pthread_mutex_unlock(&finished_lock);
- ev_async_send(async->loop, &async->watcher);
- worker->task = NULL;
-
- /* attempt to pull another task */
- return attempt_to_get_a_task(worker);
-}
-
-void *
-worker_loop(void *data)
-{
- int r;
- struct worker *worker = data;
- fd_set readfds;
- FD_ZERO(&readfds);
- FD_SET(readiness_pipe[0], &readfds);
-
- active_workers++;
- assert(active_workers <= NWORKERS);
-
- while(1) {
- r = select(1+readiness_pipe[0], &readfds, 0, 0, 0);
- if(r == -1) break;
- attempt_to_get_a_task(worker);
- }
- active_workers--;
-
- return NULL;
-}
-
-static struct worker*
-worker_new()
-{
- int r;
- struct worker *worker = calloc(sizeof(struct worker), 1);
- if(worker == NULL ) { return NULL; }
-
- worker->task = NULL;
- pthread_attr_setdetachstate(&worker->thread_attr, PTHREAD_CREATE_DETACHED);
-
- r = pthread_create( &worker->thread
- , NULL // &worker->thread_attr
- , worker_loop
- , worker
- );
- if(r != 0) {
- /* TODO: error checking */
- perror("pthread_create");
- goto error;
- }
-
- return worker;
-error:
- free(worker);
- return NULL;
-}
-
-static void
-start_workers()
-{
- assert(readiness_pipe[0] == -1);
- assert(readiness_pipe[1] == -1);
- assert(active_workers == 0);
-
- int r = pipe(readiness_pipe);
- if(r < 0) {
- perror("pipe()");
- assert(0 && "TODO HANDLE ME");
- }
-
- /* set the write end non-blocking */
- int flags = fcntl(readiness_pipe[1], F_GETFL, 0);
- r = fcntl(readiness_pipe[1], F_SETFL, flags | O_NONBLOCK);
- if(r < 0) {
- assert(0 && "error setting pipe to non-blocking?");
- /* TODO error report */
- }
-
- oi_queue_init(&waiting_tasks);
-
- int i;
- for(i = 0; i < NWORKERS; i++)
- worker_new();
-}
-
-/*
-static void
-stop_workers()
-{
- assert(0 && "TODO implement me");
-}
-*/
-
-static void
-on_completion(struct ev_loop *loop, ev_async *watcher, int revents)
-{
- oi_async *async = watcher->data;
- oi_task *task;
-
- while((task = queue_shift(&finished_lock, &async->finished_tasks))) {
- assert(task->active);
- task->active = 0;
- errno = task->errorno;
-# define done_cb(kind) { \
- assert(task->params.kind.cb); \
- task->params.kind.cb(task, task->params.kind.result); \
- break; \
- }
- switch(task->type) {
- case OI_TASK_OPEN: done_cb(open);
- case OI_TASK_READ: done_cb(read);
- case OI_TASK_WRITE: done_cb(write);
- case OI_TASK_CLOSE: done_cb(close);
- case OI_TASK_SLEEP: done_cb(sleep);
- case OI_TASK_SENDFILE: done_cb(eio__sendfile);
- case OI_TASK_GETADDRINFO: done_cb(getaddrinfo);
- case OI_TASK_LSTAT: done_cb(lstat);
- }
- /* the task is possibly freed by callback. do not access it again. */
- }
-}
-
-void
-oi_async_init (oi_async *async)
-{
- ev_async_init(&async->watcher, on_completion);
-
- oi_queue_init(&async->finished_tasks);
- oi_queue_init(&async->new_tasks);
-
- async->watcher.data = async;
-}
-
-static void
-dispatch_tasks(oi_async *async)
-{
- while(!oi_queue_empty(&async->new_tasks)) {
- oi_queue *last = oi_queue_last(&async->new_tasks);
- oi_queue_remove(last);
- oi_task *task = oi_queue_data(last, oi_task, queue);
-
- // 1. add task to task queue.
- pthread_mutex_lock(&queue_lock);
- oi_queue_insert_head(&waiting_tasks, &task->queue);
- pthread_mutex_unlock(&queue_lock);
-
- // 2. write byte to pipe
- char dummy;
- int written = write(readiness_pipe[1], &dummy, 1);
-
- // 3. TODO make sure byte is written
- assert(written == 1);
- }
-}
-
-void
-oi_async_attach (struct ev_loop *loop, oi_async *async)
-{
- if(active_watchers == 0 && active_workers == 0)
- start_workers();
- active_watchers++;
-
- ev_async_start(loop, &async->watcher);
- async->loop = loop;
-
- dispatch_tasks(async);
-}
-
-void
-oi_async_detach (oi_async *async)
-{
- if(async->loop == NULL)
- return;
- ev_async_stop(async->loop, &async->watcher);
- async->loop = NULL;
- active_watchers--;
- if(active_watchers == 0) {
- //stop_workers();
- }
-}
-
-void
-oi_async_submit (oi_async *async, oi_task *task)
-{
- assert(!task->active);
- assert(task->async == NULL);
- task->async = async;
- task->active = 1;
-
- oi_queue_insert_head(&async->new_tasks, &task->queue);
- if(ev_is_active(&async->watcher)) {
- dispatch_tasks(async);
- }
-}
-
+++ /dev/null
-#include <ev.h>
-#include <pthread.h>
-#include <netdb.h>
-#include <oi.h>
-
-#ifndef oi_async_h
-#define oi_async_h
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct oi_async oi_async;
-typedef struct oi_task oi_task;
-
-struct oi_async {
- /* private */
- ev_async watcher;
- struct ev_loop *loop;
-
- oi_queue finished_tasks;
- oi_queue new_tasks;
-
- /* public */
- void *data;
-};
-
-typedef void (*oi_task_int_cb)(oi_task *, int result);
-typedef void (*oi_task_uint_cb)(oi_task *, unsigned int result);
-typedef void (*oi_task_ssize_cb)(oi_task *, ssize_t result);
-
-struct oi_task {
- /* private */
- oi_async *async;
- oi_queue queue;
- int type;
- union {
-
- struct {
- const char *pathname;
- int flags;
- mode_t mode;
- oi_task_int_cb cb;
- int result;
- } open;
-
- struct {
- int fd;
- void *buf;
- size_t count;
- oi_task_ssize_cb cb;
- ssize_t result;
- } read;
-
- struct {
- int fd;
- const void *buf;
- size_t count;
- oi_task_ssize_cb cb;
- ssize_t result;
- } write;
-
- struct {
- int fd;
- oi_task_int_cb cb;
- int result;
- } close;
-
- struct {
- unsigned int seconds;
- oi_task_uint_cb cb;
- unsigned int result;
- } sleep;
-
- struct {
- int out_fd;
- int in_fd;
- off_t offset;
- size_t count;
- oi_task_ssize_cb cb;
- ssize_t result;
- } eio__sendfile;
-
- struct {
- const char *nodename; /* restrict ? */
- const char *servname; /* restrict ? */
- struct addrinfo *hints;
- struct addrinfo **res; /* restrict ? */
- oi_task_int_cb cb;
- int result;
- } getaddrinfo;
-
- struct {
- const char *path;
- struct stat *buf;
- oi_task_int_cb cb;
- int result;
- } lstat;
-
- } params;
-
- /* read-only */
- volatile unsigned active:1;
- int errorno;
-
- /* public */
- void *data;
-};
-
-void oi_async_init (oi_async *);
-void oi_async_attach (struct ev_loop *loop, oi_async *);
-void oi_async_detach (oi_async *);
-void oi_async_submit (oi_async *, oi_task *);
-
-/* To submit a task for async processing
- * (0) allocate memory for your task
- * (1) initialize the task with one of the functions below
- * (2) optionally set the task->data pointer
- * (3) oi_async_submit() the task
- */
-
-enum { OI_TASK_OPEN
- , OI_TASK_READ
- , OI_TASK_WRITE
- , OI_TASK_CLOSE
- , OI_TASK_SLEEP
- , OI_TASK_SENDFILE
- , OI_TASK_GETADDRINFO
- , OI_TASK_LSTAT
- };
-
-#define oi_task_init_common(task, _type) do {\
- (task)->active = 0;\
- (task)->async = NULL;\
- (task)->type = _type;\
-} while(0)
-
-static inline void
-oi_task_init_open(oi_task *t, oi_task_int_cb cb, const char *pathname, int flags, mode_t mode)
-{
- oi_task_init_common(t, OI_TASK_OPEN);
- t->params.open.cb = cb;
- t->params.open.pathname = pathname;
- t->params.open.flags = flags;
- t->params.open.mode = mode;
-}
-
-static inline void
-oi_task_init_read(oi_task *t, oi_task_ssize_cb cb, int fd, void *buf, size_t count)
-{
- oi_task_init_common(t, OI_TASK_READ);
- t->params.read.cb = cb;
- t->params.read.fd = fd;
- t->params.read.buf = buf;
- t->params.read.count = count;
-}
-
-static inline void
-oi_task_init_write(oi_task *t, oi_task_ssize_cb cb, int fd, const void *buf, size_t count)
-{
- oi_task_init_common(t, OI_TASK_WRITE);
- t->params.write.cb = cb;
- t->params.write.fd = fd;
- t->params.write.buf = buf;
- t->params.write.count = count;
-}
-
-static inline void
-oi_task_init_close(oi_task *t, oi_task_int_cb cb, int fd)
-{
- oi_task_init_common(t, OI_TASK_CLOSE);
- t->params.close.cb = cb;
- t->params.close.fd = fd;
-}
-
-static inline void
-oi_task_init_sleep(oi_task *t, oi_task_uint_cb cb, unsigned int seconds)
-{
- oi_task_init_common(t, OI_TASK_SLEEP);
- t->params.sleep.cb = cb;
- t->params.sleep.seconds = seconds;
-}
-
-static inline void
-oi_task_init_sendfile(oi_task *t, oi_task_ssize_cb cb, int out_fd, int in_fd, off_t offset, size_t count)
-{
- oi_task_init_common(t, OI_TASK_SENDFILE);
- t->params.eio__sendfile.cb = cb;
- t->params.eio__sendfile.out_fd = out_fd;
- t->params.eio__sendfile.in_fd = in_fd;
- t->params.eio__sendfile.offset = offset;
- t->params.eio__sendfile.count = count;
-}
-
-static inline void
-oi_task_init_getaddrinfo(oi_task *t, oi_task_int_cb cb, const char *node,
- const char *service, struct addrinfo *hints, struct addrinfo **res)
-{
- oi_task_init_common(t, OI_TASK_GETADDRINFO);
- t->params.getaddrinfo.cb = cb;
- t->params.getaddrinfo.nodename = node;
- t->params.getaddrinfo.servname = service;
- t->params.getaddrinfo.hints = hints;
- t->params.getaddrinfo.res = res;
-}
-
-static inline void
-oi_task_init_lstat(oi_task *t, oi_task_int_cb cb, const char *path, struct stat *buf)
-{
- oi_task_init_common(t, OI_TASK_LSTAT);
- t->params.lstat.cb = cb;
- t->params.lstat.path = path;
- t->params.lstat.buf = buf;
-}
-
-#ifdef __cplusplus
-}
-#endif
-#endif /* oi_async_h */
extern "C" {
#endif
+enum oi_error_domain
+ { OI_ERROR_GNUTLS
+ , OI_ERROR_EV
+ , OI_ERROR_CLOSE
+ , OI_ERROR_SHUTDOWN
+ , OI_ERROR_OPEN
+ , OI_ERROR_SEND
+ , OI_ERROR_RECV
+ , OI_ERROR_WRITE
+ , OI_ERROR_READ
+ , OI_ERROR_SENDFILE
+ };
+
struct oi_error {
- enum { OI_ERROR_GNUTLS
- , OI_ERROR_EV
- , OI_ERROR_CLOSE
- , OI_ERROR_SHUTDOWN
- , OI_ERROR_OPEN
- , OI_ERROR_SEND
- , OI_ERROR_RECV
- , OI_ERROR_WRITE
- , OI_ERROR_READ
- , OI_ERROR_SENDFILE
- } domain;
+ enum oi_error_domain domain;
int code; /* errno */
};
+++ /dev/null
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <assert.h>
-#include <errno.h>
-
-#include <ev.h>
-#include <oi.h>
-
-#define RELEASE_BUF(buf) if(buf->release) { buf->release(buf); }
-#define DRAIN_CB(file) if(file->on_drain) { file->on_drain(file); }
-#define RAISE_ERROR(s, _domain, _code) do { \
- if(s->on_error) { \
- struct oi_error __oi_error; \
- __oi_error.domain = _domain; \
- __oi_error.code = _code; \
- s->on_error(s, __oi_error); \
- } \
-} while(0) \
-
-/* forwards */
-static void dispatch_write_buf (oi_file *file);
-static void maybe_do_read (oi_file *file);
-
-static void
-after_read(oi_task *task, ssize_t recved)
-{
- oi_file *file = task->data;
-
- if(recved == -1) {
- RAISE_ERROR(file, OI_ERROR_READ, errno);
- return;
- }
-
- if(recved == 0)
- oi_file_read_stop(file);
-
- if(file->on_read)
- file->on_read(file, recved);
-
- maybe_do_read(file);
-}
-
-static void
-maybe_do_read(oi_file *file)
-{
- if ( file->read_buffer == NULL
- || file->write_buf != NULL
- || file->write_socket != NULL
- || !oi_queue_empty(&file->write_queue)
- || file->io_task.active
- ) return;
-
- assert(file->fd > 0);
-
- oi_task_init_read ( &file->io_task
- , after_read
- , file->fd
- , file->read_buffer
- , file->read_buffer_size
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
-}
-
-static void
-submit_read (oi_file *file)
-{
-}
-
-int
-oi_file_init (oi_file *file)
-{
- oi_async_init(&file->async);
- file->async.data = file;
-
- oi_queue_init(&file->write_queue);
-
- file->fd = -1;
- file->loop = NULL;
- file->write_buf = NULL;
- file->read_buffer = NULL;
-
- file->on_open = NULL;
- file->on_read = NULL;
- file->on_drain = NULL;
- file->on_error = NULL;
- file->on_close = NULL;
- return 0;
-}
-
-void
-oi_file_read_start (oi_file *file, void *buffer, size_t bufsize)
-{
- file->read_buffer = buffer;
- file->read_buffer_size = bufsize;
- maybe_do_read(file);
-}
-
-void
-oi_file_read_stop (oi_file *file)
-{
- file->read_buffer = NULL;
-}
-
-void
-oi_api_free_buf_with_heap_base(oi_buf *buf)
-{
- free(buf->base);
- free(buf);
-}
-
-static void
-after_open(oi_task *task, int result)
-{
- oi_file *file = task->data;
-
- if(result == -1) {
- RAISE_ERROR(file, OI_ERROR_OPEN, errno);
- return;
- }
-
- file->fd = result;
-
- if(file->on_open) {
- file->on_open(file);
- }
-
- maybe_do_read(file);
-}
-
-int
-oi_file_open_path (oi_file *file, const char *path, int flags, mode_t mode)
-{
- if(file->fd >= 0)
- return -1;
- oi_task_init_open( &file->io_task
- , after_open
- , path
- , flags
- , mode
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
- return 0;
-}
-
-int
-oi_file_open_stdin (oi_file *file)
-{
- if(file->fd >= 0)
- return -1;
- file->fd = STDIN_FILENO;
- if(file->on_open)
- file->on_open(file);
- return 0;
-}
-
-int
-oi_file_open_stdout (oi_file *file)
-{
- if(file->fd >= 0)
- return -1;
- file->fd = STDOUT_FILENO;
- if(file->on_open)
- file->on_open(file);
- return 0;
-}
-
-int
-oi_file_open_stderr (oi_file *file)
-{
- if(file->fd >= 0)
- return -1;
- file->fd = STDERR_FILENO;
- if(file->on_open)
- file->on_open(file);
- return 0;
-}
-
-void
-oi_file_attach (oi_file *file, struct ev_loop *loop)
-{
- oi_async_attach (loop, &file->async);
- file->loop = loop;
-}
-
-void
-oi_file_detach (oi_file *file)
-{
- oi_async_detach (&file->async);
- file->loop = NULL;
-}
-
-static void
-after_write(oi_task *task, ssize_t result)
-{
- oi_file *file = task->data;
-
- if(result == -1) {
- RAISE_ERROR(file, OI_ERROR_WRITE, errno);
- return;
- }
-
- assert(file->write_buf != NULL);
- oi_buf *buf = file->write_buf;
-
- buf->written += result;
- if(buf->written < buf->len) {
- oi_task_init_write ( &file->io_task
- , after_write
- , file->fd
- , buf->base + buf->written
- , buf->len - buf->written
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
- return;
- }
-
- assert(buf->written == buf->len);
-
- RELEASE_BUF(file->write_buf);
- file->write_buf = NULL;
-
- if(oi_queue_empty(&file->write_queue)) {
- DRAIN_CB(file);
- maybe_do_read(file);
- } else {
- dispatch_write_buf(file);
- }
-
- return;
-}
-
-static void
-dispatch_write_buf(oi_file *file)
-{
- if(file->write_buf != NULL)
- return;
- if(oi_queue_empty(&file->write_queue))
- return;
-
- oi_queue *q = oi_queue_last(&file->write_queue);
- oi_queue_remove(q);
- oi_buf *buf = file->write_buf = oi_queue_data(q, oi_buf, queue);
-
- assert(!file->io_task.active);
- oi_task_init_write ( &file->io_task
- , after_write
- , file->fd
- , buf->base + buf->written
- , buf->len - buf->written
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
-}
-
-int
-oi_file_write (oi_file *file, oi_buf *buf)
-{
- if(file->fd < 0)
- return -1;
- if(file->read_buffer)
- return -2;
- /* TODO better business check*/
-
- buf->written = 0;
- oi_queue_insert_head(&file->write_queue, &buf->queue);
- dispatch_write_buf(file);
-
- return 0;
-}
-
-// Writes a string to the file.
-// NOTE: Allocates memory. Avoid for performance applications.
-int
-oi_file_write_simple (oi_file *file, const char *str, size_t len)
-{
- if(file->fd < 0)
- return -1;
- if(file->read_buffer)
- return -2;
- /* TODO better business check*/
-
- oi_buf *buf = malloc(sizeof(oi_buf));
- buf->base = malloc(len);
- memcpy(buf->base, str, len);
- buf->len = len;
- buf->release = oi_api_free_buf_with_heap_base;
-
- oi_file_write(file, buf);
- return 0;
-}
-
-static void
-clear_write_queue(oi_file *file)
-{
- while(!oi_queue_empty(&file->write_queue)) {
- oi_queue *q = oi_queue_last(&file->write_queue);
- oi_queue_remove(q);
- oi_buf *buf = oi_queue_data(q, oi_buf, queue);
- RELEASE_BUF(buf);
- }
-}
-
-static void
-after_close(oi_task *task, int result)
-{
- oi_file *file = task->data;
-
- assert(oi_queue_empty(&file->write_queue));
-
- if(result == -1) {
- RAISE_ERROR(file, OI_ERROR_CLOSE, errno);
- return;
- // TODO try to close again?
- }
-
- file->fd = -1;
- // TODO deinit task_queue, detach thread_pool_result_watcher
-
- if(file->on_close) {
- file->on_close(file);
- }
-
- return;
-}
-
-void
-oi_file_close (oi_file *file)
-{
- assert(file->fd >= 0 && "file not open!");
- clear_write_queue(file);
- oi_task_init_close ( &file->io_task
- , after_close
- , file->fd
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
-}
-
-static void
-after_sendfile(oi_task *task, ssize_t sent)
-{
- oi_file *file = task->data;
- oi_socket *socket = file->write_socket;
- assert(socket != NULL);
- file->write_socket = NULL;
-
- if(sent == -1) {
- RAISE_ERROR(file, OI_ERROR_SENDFILE, errno);
- return;
- }
-
- if(socket->on_drain) {
- socket->on_drain(socket);
- }
-
- maybe_do_read(file);
-}
-
-int
-oi_file_send (oi_file *file, oi_socket *destination, off_t offset, size_t count)
-{
- if(file->fd < 0)
- return -1;
- if(file->read_buffer)
- return -2;
- /* TODO better business check*/
-
- assert(file->write_socket == NULL);
- // (1) make sure the write queue on the socket is cleared.
- //
- // (2)
- //
- file->write_socket = destination;
- oi_task_init_sendfile ( &file->io_task
- , after_sendfile
- , destination->fd
- , file->fd
- , offset
- , count
- );
- file->io_task.data = file;
- oi_async_submit(&file->async, &file->io_task);
-
- return 0;
-}
-
+++ /dev/null
-#include <oi.h>
-#include <ev.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#ifndef oi_file_h
-#define oi_file_h
-
-typedef struct oi_file oi_file;
-
-int oi_file_init (oi_file *);
-
-void oi_file_attach (oi_file *, struct ev_loop *);
-void oi_file_detach (oi_file *);
-
-/* WARNING oi_file_open_path: path argument must be valid until oi_file
- * object is closed and the on_close() callback is made. oi does not strdup
- * the path pointer. */
-int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode);
-int oi_file_open_stdin (oi_file *);
-int oi_file_open_stdout (oi_file *);
-int oi_file_open_stderr (oi_file *);
-
-void oi_file_read_start (oi_file *, void *buffer, size_t bufsize);
-void oi_file_read_stop (oi_file *);
-int oi_file_write (oi_file *, oi_buf *to_write);
-int oi_file_write_simple (oi_file *, const char *, size_t);
-int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length);
-void oi_file_close (oi_file *);
-
-struct oi_file {
- /* private */
- oi_async async;
- oi_task io_task;
- struct ev_loop *loop;
- oi_queue write_queue;
- oi_buf *write_buf; /* TODO this pointer is unnecessary - remove and just look at first element of the queue */
- oi_socket *write_socket;
- void *read_buffer;
- size_t read_buffer_size;
-
- /* read-only */
- int fd;
-
- /* public */
- void (*on_open) (oi_file *);
- void (*on_read) (oi_file *, size_t count);
- void (*on_drain) (oi_file *);
- void (*on_error) (oi_file *, struct oi_error);
- void (*on_close) (oi_file *);
- void *data;
-};
-
-#ifdef __cplusplus
-}
-#endif
-#endif /* oi_file_h */
#include <ev.h>
#include <oi_socket.h>
+#if EV_MULTIPLICITY
+# define SOCKET_LOOP_ socket->loop,
+# define SERVER_LOOP_ server->loop,
+#else
+# define SOCKET_LOOP_
+# define SERVER_LOOP_
+#endif // EV_MULTIPLICITY
+
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
socket->read_action = NULL;
socket->write_action = NULL;
- if(socket->loop) {
- ev_feed_event(socket->loop, &socket->read_watcher, EV_READ);
+ if(socket->attached) {
+ ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
}
return OKAY;
}
}
if(oi_queue_empty(&socket->out_stream)) {
- ev_io_stop(socket->loop, &socket->write_watcher);
+ ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
if(socket->on_drain)
socket->on_drain(socket);
}
ssize_t sent;
if(oi_queue_empty(&socket->out_stream)) {
- ev_io_stop(socket->loop, &socket->write_watcher);
+ ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
assert(socket->secure == FALSE);
if(oi_queue_empty(&socket->out_stream)) {
- ev_io_stop(socket->loop, &socket->write_watcher);
+ ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
* Called by server->connection_watcher.
*/
static void
-on_connection(struct ev_loop *loop, ev_io *watcher, int revents)
+on_connection(EV_P_ ev_io *watcher, int revents)
{
oi_server *server = watcher->data;
// printf("on connection!\n");
assert(server->listening);
+#if EV_MULTIPLICITY
assert(server->loop == loop);
+#endif
assert(&server->connection_watcher == watcher);
if(EV_ERROR & revents) {
socket->server = server;
assign_file_descriptor(socket, fd);
- oi_socket_attach(socket, loop);
+ oi_socket_attach(EV_A_ socket);
}
int
}
void
-oi_server_attach (oi_server *server, struct ev_loop *loop)
+oi_server_attach (EV_P_ oi_server *server)
{
- ev_io_start (loop, &server->connection_watcher);
+ ev_io_start (EV_A_ &server->connection_watcher);
+#if EV_MULTIPLICITY
server->loop = loop;
+#endif
+ server->attached = TRUE;
}
void
oi_server_detach (oi_server *server)
{
- ev_io_stop (server->loop, &server->connection_watcher);
+ ev_io_stop (SERVER_LOOP_ &server->connection_watcher);
+#if EV_MULTIPLICITY
server->loop = NULL;
+#endif
+ server->attached = FALSE;
}
void
oi_server_init(oi_server *server, int backlog)
{
server->backlog = backlog;
+ server->attached = FALSE;
server->listening = FALSE;
server->fd = -1;
server->connection_watcher.data = server;
/* Internal callback. called by socket->timeout_watcher */
static void
-on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
+on_timeout(EV_P_ ev_timer *watcher, int revents)
{
oi_socket *socket = watcher->data;
/* Internal callback. called by socket->read_watcher */
static void
-on_io_event(struct ev_loop *loop, ev_io *watcher, int revents)
+on_io_event(EV_P_ ev_io *watcher, int revents)
{
oi_socket *socket = watcher->data;
close:
release_write_buffer(socket);
- ev_clear_pending (socket->loop, &socket->write_watcher);
- ev_clear_pending (socket->loop, &socket->read_watcher);
- ev_clear_pending (socket->loop, &socket->timeout_watcher);
+ ev_clear_pending (EV_A_ &socket->write_watcher);
+ ev_clear_pending (EV_A_ &socket->read_watcher);
+ ev_clear_pending (EV_A_ &socket->timeout_watcher);
oi_socket_detach(socket);
{
socket->fd = -1;
socket->server = NULL;
+#if EV_MULTIPLICITY
socket->loop = NULL;
+#endif
+ socket->attached = FALSE;
socket->connected = FALSE;
oi_queue_init(&socket->out_stream);
)
{
socket->write_action = secure_half_goodbye;
- if(socket->loop)
- ev_io_start(socket->loop, &socket->write_watcher);
+ if(socket->attached)
+ ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
/* secure servers cannot handle half-closed connections? */
socket->read_action = NULL;
}
- if(socket->loop)
- ev_io_start(socket->loop, &socket->write_watcher);
+ if(socket->attached)
+ ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
void
oi_socket_reset_timeout(oi_socket *socket)
{
- ev_timer_again(socket->loop, &socket->timeout_watcher);
+ ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
}
/**
oi_queue_insert_head(&socket->out_stream, &buf->queue);
buf->written = 0;
- ev_io_start(socket->loop, &socket->write_watcher);
+ // XXX if (socket->attached) ??
+ ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
}
static void
}
void
-oi_socket_attach(oi_socket *socket, struct ev_loop *loop)
+oi_socket_attach(EV_P_ oi_socket *socket)
{
+#if EV_MULTIPLICITY
socket->loop = loop;
+#endif
+ socket->attached = TRUE;
- ev_timer_again(loop, &socket->timeout_watcher);
+ ev_timer_again(EV_A_ &socket->timeout_watcher);
if(socket->read_action)
- ev_io_start(loop, &socket->read_watcher);
+ ev_io_start(EV_A_ &socket->read_watcher);
if(socket->write_action)
- ev_io_start(loop, &socket->write_watcher);
+ ev_io_start(EV_A_ &socket->write_watcher);
/* make sure the io_event happens soon in the case we're being reattached */
- ev_feed_event(loop, &socket->read_watcher, EV_READ);
+ ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
}
void
oi_socket_detach(oi_socket *socket)
{
- if(socket->loop) {
- ev_io_stop(socket->loop, &socket->write_watcher);
- ev_io_stop(socket->loop, &socket->read_watcher);
- ev_timer_stop(socket->loop, &socket->timeout_watcher);
+ if(socket->attached) {
+ ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
+ ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
+ ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
+#if EV_MULTIPLICITY
socket->loop = NULL;
+#endif
+ socket->attached = FALSE;
}
}
void
oi_socket_read_stop (oi_socket *socket)
{
- ev_io_stop(socket->loop, &socket->read_watcher);
- ev_clear_pending (socket->loop, &socket->read_watcher);
+ ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
+ ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher);
}
void
oi_socket_read_start (oi_socket *socket)
{
if(socket->read_action) {
- ev_io_start(socket->loop, &socket->read_watcher);
+ ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
/* XXX feed event? */
}
}
void oi_server_init (oi_server *, int backlog);
int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
-void oi_server_attach (oi_server *, struct ev_loop *loop);
+void oi_server_attach (EV_P_ oi_server *);
void oi_server_detach (oi_server *);
void oi_server_close (oi_server *);
void oi_socket_init (oi_socket *, float timeout);
int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */
int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
-void oi_socket_attach (oi_socket *, struct ev_loop *loop);
+void oi_socket_attach (EV_P_ oi_socket *);
void oi_socket_detach (oi_socket *);
void oi_socket_read_start (oi_socket *);
void oi_socket_read_stop (oi_socket *);
/* read only */
int fd;
int backlog;
+#if EV_MULTIPLICITY
struct ev_loop *loop;
+#endif
+ unsigned attached:1;
unsigned listening:1;
/* private */
struct oi_socket {
/* read only */
int fd;
+#if EV_MULTIPLICITY
struct ev_loop *loop;
+#endif
oi_server *server;
oi_queue out_stream;
size_t written;
+ unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
unsigned wait_for_secure_hangup:1;
main(int argc, const char *argv[])
{
int r;
- struct ev_loop *loop = ev_default_loop(0);
oi_server_init(&server, 1000);
server.on_connection = on_server_connection;
#endif
oi_server_listen(&server, servinfo);
- oi_server_attach(&server, loop);
+ oi_server_attach(EV_DEFAULT_ &server);
int i;
for(i = 0; i < NCONN; i++) {
#endif
r = oi_socket_connect(client, servinfo);
assert(r == 0 && "problem connecting");
- oi_socket_attach(client, loop);
+ oi_socket_attach(EV_DEFAULT_ client);
}
- ev_loop(loop, 0);
+ ev_loop(EV_DEFAULT_ 0);
assert(nconnections == NCONN);
main(int argc, const char *argv[])
{
int r;
- struct ev_loop *loop = ev_default_loop(0);
oi_server server;
oi_socket client;
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
- oi_server_attach(&server, loop);
+ oi_server_attach(EV_DEFAULT_ &server);
- ev_loop(loop, 0);
+ ev_loop(EV_DEFAULT_ 0);
#if TCP
freeaddrinfo(servinfo);
+++ /dev/null
-/* This test uses most of oi's facilities. It starts by opening a new file
- * and writing N characters to it. Once the first chunk is written, it opens
- * that file with another handle
- *
- * /tmp/oi_test_src == /tmp/oi_test_dst
- * | ^
- * | |
- * stream | | write
- * | |
- * V |
- * client ----------> server/connection
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <assert.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <stdint.h>
-
-#ifndef TRUE
-# define TRUE 1
-#endif
-#ifndef FALSE
-# define FALSE 0
-#endif
-
-#include <oi.h>
-#include <oi_file.h>
-
-#define PORT "5555"
-
-static struct ev_loop *loop;
-static oi_file file_src;
-static oi_file file_dst;
-static oi_socket client;
-static oi_server server;
-static oi_socket connection;
-static int got_connection = 0;
-
-static char *src_filename;
-static char *dst_filename;
-
-static void
-file_error(oi_file *_)
-{
- assert(0);
-}
-
-static void
-on_file_dst_open (oi_file *_)
-{
- assert(connection.fd > 0);
- oi_socket_read_start(&connection);
- //printf("file_dst is open\n");
-}
-
-static void
-on_file_dst_close (oi_file *_)
-{
- //printf("file dst closed\n");
- oi_file_detach(&file_dst);
-}
-
-static void
-on_connection_read (oi_socket *_, const void *buf, size_t count)
-{
- assert(file_dst.fd > 0);
- if(count == 0) {
- file_dst.on_drain = oi_file_close;
- oi_socket_close(&connection);
- } else {
- oi_file_write_simple(&file_dst, buf, count);
- }
-}
-
-static void
-on_connection_connect (oi_socket *_)
-{
- oi_file_init(&file_dst);
- file_dst.on_open = on_file_dst_open;
- file_dst.on_close = on_file_dst_close;
- oi_file_open_path(&file_dst, dst_filename, O_WRONLY | O_CREAT, 0644);
- oi_file_attach(&file_dst, loop);
-
- oi_socket_read_stop(&connection);
-}
-
-static void
-on_connection_timeout (oi_socket *_)
-{
- assert(0);
-}
-
-static void
-on_connection_error (oi_socket *_, struct oi_error e)
-{
- assert(0);
-}
-
-static void
-on_connection_close (oi_socket *_)
-{
- oi_server_close(&server);
- oi_server_detach(&server);
-}
-
-static oi_socket*
-on_server_connection(oi_server *_, struct sockaddr *addr, socklen_t len)
-{
- assert(got_connection == FALSE);
- oi_socket_init(&connection, 5.0);
- connection.on_connect = on_connection_connect;
- connection.on_read = on_connection_read;
- connection.on_error = on_connection_error;
- connection.on_close = on_connection_close;
- connection.on_timeout = on_connection_timeout;
- connection.on_drain = oi_socket_close;
- got_connection = TRUE;
- //printf("on server connection\n");
- return &connection;
-}
-
-static void
-on_client_read (oi_socket *_, const void *buf, size_t count)
-{
- assert(0);
-}
-
-static void
-on_client_error (oi_socket *_, struct oi_error e)
-{
- assert(0);
-}
-
-static void
-on_client_connect (oi_socket *_)
-{
- if(file_src.fd > 0) {
- oi_file_send(&file_src, &client, 0, 50*1024);
- }
-}
-
-static void
-on_client_drain (oi_socket *_)
-{
- oi_socket_close(&client);
- oi_file_close(&file_src);
-}
-
-static void
-on_file_src_open (oi_file *_)
-{
- if(client.fd > 0) {
- oi_file_send(&file_src, &client, 0, 50*1024);
- }
-}
-
-static void
-on_client_timeout (oi_socket *_)
-{
- assert(0);
-}
-
-int
-main(int argc, char *argv[])
-{
- int r;
- loop = ev_default_loop(0);
-
- assert(argc == 3);
- src_filename = argv[1];
- dst_filename = argv[2];
-
- assert(strlen(src_filename) > 0);
- assert(strlen(dst_filename) > 0);
-
- oi_server_init(&server, 10);
- server.on_connection = on_server_connection;
-
- struct addrinfo *servinfo;
- struct addrinfo hints;
- memset(&hints, 0, sizeof hints);
-
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
- r = getaddrinfo(NULL, PORT, &hints, &servinfo);
- assert(r == 0);
-
- r = oi_server_listen(&server, servinfo);
- assert(r >= 0 && "problem listening");
- oi_server_attach(&server, loop);
-
- oi_socket_init(&client, 5.0);
- client.on_read = on_client_read;
- client.on_error = on_client_error;
- client.on_connect = on_client_connect;
- client.on_timeout = on_client_timeout;
- //client.on_close = oi_socket_detach;
- client.on_drain = on_client_drain;
- r = oi_socket_connect(&client, servinfo);
- assert(r == 0 && "problem connecting");
- oi_socket_attach(&client, loop);
-
- oi_file_init(&file_src);
- file_src.on_open = on_file_src_open;
- file_src.on_drain = file_error;
- file_src.on_close = oi_file_detach;
- oi_file_open_path(&file_src, src_filename, O_RDONLY, 0700);
- oi_file_attach(&file_src, loop);
-
- ev_loop(loop, 0);
-
- printf("\noi_file: %d bytes\n", sizeof(oi_file));
- printf("oi_socket: %d bytes\n", sizeof(oi_socket));
-
- return 0;
-}
-
on_client_close(oi_socket *socket)
{
//printf("client connection closed\n");
- ev_unloop(socket->loop, EVUNLOOP_ALL);
+ ev_unloop(EV_DEFAULT_ EVUNLOOP_ALL);
}
static oi_socket*
main(int argc, const char *argv[])
{
int r;
- struct ev_loop *loop = ev_default_loop(0);
oi_server server;
oi_socket client;
#endif
r = oi_server_listen(&server, servinfo);
assert(r == 0);
- oi_server_attach(&server, loop);
+ oi_server_attach(EV_DEFAULT_ &server);
oi_socket_init(&client, 5.0);
client.on_read = on_client_read;
r = oi_socket_connect(&client, servinfo);
assert(r == 0 && "problem connecting");
- oi_socket_attach(&client, loop);
+ oi_socket_attach(EV_DEFAULT_ &client);
- ev_loop(loop, 0);
+ ev_loop(EV_DEFAULT_ 0);
assert(successful_ping_count == EXCHANGES + 1);
assert(nconnections == 1);
+++ /dev/null
-#include <unistd.h> /* sleep() */
-#include <stdlib.h> /* malloc(), free() */
-#include <assert.h>
-#include <ev.h>
-#include <oi_async.h>
-
-#define SLEEPS 4
-static int runs = 0;
-
-static void
-done (oi_task *task, unsigned int result)
-{
- assert(result == 0);
- if(++runs == SLEEPS) {
- ev_timer *timer = task->data;
- ev_timer_stop(task->async->loop, timer);
- oi_async_detach(task->async);
- }
- free(task);
-}
-
-static void
-on_timeout(struct ev_loop *loop, ev_timer *w, int events)
-{
- assert(0 && "timeout before all sleeping tasks were complete!");
-}
-
-int
-main()
-{
- struct ev_loop *loop = ev_default_loop(0);
- oi_async async;
- ev_timer timer;
- int i;
-
- oi_async_init(&async);
- for(i = 0; i < SLEEPS; i++) {
- oi_task *task = malloc(sizeof(oi_task));
- oi_task_init_sleep(task, done, 1);
- task->data = &timer;
- oi_async_submit(&async, task);
- }
- oi_async_attach(loop, &async);
-
-
- ev_timer_init (&timer, on_timeout, 1.2, 0.);
- ev_timer_start (loop, &timer);
-
- ev_loop(loop, 0);
-
- assert(runs == SLEEPS);
-
- return 0;
-}
+++ /dev/null
-#include <unistd.h> /* sleep() */
-#include <stdlib.h> /* malloc(), free() */
-#include <stdio.h>
-#include <errno.h>
-#include <assert.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <ev.h>
-#include <oi_file.h>
-
-static oi_file file;
-static oi_file out;
-
-#define READ_BUFSIZE (5)
-static char read_buf[READ_BUFSIZE];
-# define SEP "~~~~~~~~~~~~~~~~~~~~~~\n"
-
-static void
-on_open(oi_file *f)
-{
- oi_file_write_simple(&out, "\n", 1);
- oi_file_write_simple(&out, SEP, sizeof(SEP));
- oi_file_read_start(f, read_buf, READ_BUFSIZE);
-}
-
-static void
-on_close(oi_file *f)
-{
- oi_file_write_simple(&out, SEP, sizeof(SEP));
- oi_file_detach(f);
- out.on_drain = oi_file_detach;
-}
-
-static void
-on_read(oi_file *f, size_t recved)
-{
- if(recved == 0) /* EOF */
- oi_file_close(f);
- else
- oi_file_write_simple(&out, read_buf, recved);
-}
-
-int
-main()
-{
- struct ev_loop *loop = ev_default_loop(0);
-
- oi_file_init(&file);
- file.on_open = on_open;
- file.on_read = on_read;
- file.on_close = on_close;
- oi_file_open_path(&file, "LICENSE", O_RDONLY, 0);
- oi_file_attach(&file, loop);
-
- oi_file_init(&out);
- oi_file_open_stdout(&out);
- oi_file_attach(&out, loop);
-
- ev_loop(loop, 0);
- return 0;
-}
{
int r = oi_server_listen(&server, servinfo);
if(r == 0)
- oi_server_attach(&server, node_loop());
+ oi_server_attach(EV_DEFAULT_UC_ &server);
return r;
}
r = oi_server_listen(&server->server_, address);
if (r != 0)
return ThrowException(String::New("Error listening on port"));
- oi_server_attach(&server->server_, node_loop());
+ oi_server_attach(EV_DEFAULT_UC_ &server->server_);
freeaddrinfo(address);
// no error. return.
if(r == 0 && req->result == 0) {
- oi_socket_attach (&socket->socket_, node_loop());
+ oi_socket_attach (EV_DEFAULT_UC_ &socket->socket_);
return 0;
}
node_fatal_exception (TryCatch &try_catch)
{
ReportException(&try_catch);
- ev_unloop(node_loop(), EVUNLOOP_ALL);
+ ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL);
exit_code = 1;
}
void node_exit (int code)
{
exit_code = code;
- ev_unloop(node_loop(), EVUNLOOP_ALL);
+ ev_unloop(EV_DEFAULT_UC_ EVUNLOOP_ALL);
}
int
main (int argc, char *argv[])
{
+ ev_default_loop(EVFLAG_AUTO); // initialize the default ev loop.
+
// start eio thread pool
ev_async_init(&thread_pool_watcher, thread_pool_cb);
eio_init(thread_pool_want_poll, NULL);
ExecuteString(String::New(native_main), String::New("main.js"));
if (try_catch.HasCaught()) goto native_js_error;
- ev_loop(node_loop(), 0);
+ ev_loop(EV_DEFAULT_UC_ 0);
context.Dispose();
enum encoding {UTF8, RAW};
void node_fatal_exception (v8::TryCatch &try_catch);
-#define node_loop() ev_default_loop(0)
void node_exit (int code);
// call this after creating a new eio event.
ev_timer_init(&watcher_, Timer::OnTimeout, after, repeat);
watcher_.data = this;
- ev_timer_start(node_loop(), &watcher_);
+ ev_timer_start(EV_DEFAULT_UC_ &watcher_);
}
Timer::~Timer ()
{
- ev_timer_stop (node_loop(), &watcher_);
+ ev_timer_stop (EV_DEFAULT_UC_ &watcher_);
handle_->SetInternalField(0, Undefined());
handle_.Dispose();
}