initial commit
authorLennart Poettering <lennart@poettering.net>
Tue, 8 Jun 2004 23:54:24 +0000 (23:54 +0000)
committerLennart Poettering <lennart@poettering.net>
Tue, 8 Jun 2004 23:54:24 +0000 (23:54 +0000)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@3 fefdeb5f-60dc-0310-8127-8f9354f1896f

47 files changed:
src/Makefile [new file with mode: 0644]
src/client.c [new file with mode: 0644]
src/client.h [new file with mode: 0644]
src/core.c [new file with mode: 0644]
src/core.h [new file with mode: 0644]
src/idxset.c [new file with mode: 0644]
src/idxset.h [new file with mode: 0644]
src/inputstream.c [new file with mode: 0644]
src/inputstream.h [new file with mode: 0644]
src/iochannel.c [new file with mode: 0644]
src/iochannel.h [new file with mode: 0644]
src/main.c [new file with mode: 0644]
src/mainloop.c [new file with mode: 0644]
src/mainloop.h [new file with mode: 0644]
src/memblock.c [new file with mode: 0644]
src/memblock.h [new file with mode: 0644]
src/memblockq.c [new file with mode: 0644]
src/memblockq.h [new file with mode: 0644]
src/module.c [new file with mode: 0644]
src/module.h [new file with mode: 0644]
src/oss.c [new file with mode: 0644]
src/outputstream.c [new file with mode: 0644]
src/outputstream.h [new file with mode: 0644]
src/packet.c [new file with mode: 0644]
src/packet.h [new file with mode: 0644]
src/protocol-native-tcp.c [new file with mode: 0644]
src/protocol-native-unix.c [new file with mode: 0644]
src/protocol-native.c [new file with mode: 0644]
src/protocol-native.h [new file with mode: 0644]
src/protocol-simple-tcp.c [new file with mode: 0644]
src/protocol-simple.c [new file with mode: 0644]
src/protocol-simple.h [new file with mode: 0644]
src/pstream.c [new file with mode: 0644]
src/pstream.h [new file with mode: 0644]
src/queue.c [new file with mode: 0644]
src/queue.h [new file with mode: 0644]
src/sample.c [new file with mode: 0644]
src/sample.h [new file with mode: 0644]
src/sink-pipe.c [new file with mode: 0644]
src/sink.c [new file with mode: 0644]
src/sink.h [new file with mode: 0644]
src/socket-server.c [new file with mode: 0644]
src/socket-server.h [new file with mode: 0644]
src/source.c [new file with mode: 0644]
src/source.h [new file with mode: 0644]
src/strbuf.c [new file with mode: 0644]
src/strbuf.h [new file with mode: 0644]

diff --git a/src/Makefile b/src/Makefile
new file mode 100644 (file)
index 0000000..366e84e
--- /dev/null
@@ -0,0 +1,10 @@
+CFLAGS=-Wall -pipe -ansi -D_GNU_SOURCE
+
+all: idxset.o queue.o strbuf.o mainloop.o iochannel.o packet.o \
+       memblock.o sample.o socket-server.o memblockq.o client.o \
+       core.o main.o outputstream.o inputstream.o source.o sink.o \
+       pstream.o protocol-simple.o protocol-simple-tcp.o sink-pipe.o \
+       module.o
+
+clean:
+       rm -f *.o
diff --git a/src/client.c b/src/client.c
new file mode 100644 (file)
index 0000000..56d8573
--- /dev/null
@@ -0,0 +1,32 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "client.h"
+
+struct client *client_new(struct core *core, const char *protocol_name, char *name) {
+    struct client *c;
+    int r;
+    assert(core);
+
+    c = malloc(sizeof(struct client));
+    assert(c);
+    c->protocol_name = protocol_name;
+    c->name = name ? strdup(name) : NULL;
+    c->kill = NULL;
+    c->userdata = NULL;
+    c->core = core;
+
+    r = idxset_put(core->clients, c, &c->index);
+    assert(c->index != IDXSET_INVALID && r >= 0);
+    
+    return c;
+}
+
+void client_free(struct client *c) {
+    assert(c && c->core);
+
+    idxset_remove_by_data(c->core->clients, c, NULL);
+    free(c->name);
+    free(c);
+}
diff --git a/src/client.h b/src/client.h
new file mode 100644 (file)
index 0000000..7128a45
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef fooclienthfoo
+#define fooclienthfoo
+
+#include "core.h"
+
+struct client {
+    char *name;
+    uint32_t index;
+    
+    const char *protocol_name;
+
+    void *userdata;
+    void (*kill)(struct client *c);
+
+    struct core *core;
+};
+
+struct client *client_new(struct core *c, const char *protocol_name, char *name);
+void client_free(struct client *c);
+
+#endif
diff --git a/src/core.c b/src/core.c
new file mode 100644 (file)
index 0000000..7cfa66e
--- /dev/null
@@ -0,0 +1,81 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <stdio.h>
+
+#include "core.h"
+#include "module.h"
+#include "sink.h"
+#include "source.h"
+
+struct core* core_new(struct mainloop *m) {
+    struct core* c;
+    c = malloc(sizeof(struct core));
+    assert(c);
+
+    c->mainloop = m;
+    c->clients = idxset_new(NULL, NULL);
+    c->sinks = idxset_new(NULL, NULL);
+    c->sources = idxset_new(NULL, NULL);
+    c->output_streams = idxset_new(NULL, NULL);
+    c->input_streams = idxset_new(NULL, NULL);
+
+    c->default_source_index = c->default_sink_index = IDXSET_INVALID;
+
+    c->modules = NULL;
+    
+    return c;
+};
+
+void core_free(struct core *c) {
+    assert(c);
+
+    module_unload_all(c);
+    assert(!c->modules);
+    
+    assert(idxset_isempty(c->clients));
+    idxset_free(c->clients, NULL, NULL);
+    
+    assert(idxset_isempty(c->sinks));
+    idxset_free(c->sinks, NULL, NULL);
+
+    assert(idxset_isempty(c->sources));
+    idxset_free(c->sources, NULL, NULL);
+    
+    assert(idxset_isempty(c->output_streams));
+    idxset_free(c->output_streams, NULL, NULL);
+    
+    assert(idxset_isempty(c->input_streams));
+    idxset_free(c->input_streams, NULL, NULL);
+
+    free(c);    
+};
+
+struct sink* core_get_default_sink(struct core *c) {
+    struct sink *sink;
+    assert(c);
+
+    if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index)))
+        return sink;
+
+    if (!(sink = idxset_rrobin(c->sinks, NULL)))
+        return NULL;
+
+    fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index);
+    c->default_sink_index = sink->index;
+    return sink;
+}
+
+struct source* core_get_default_source(struct core *c) {
+    struct source *source;
+    assert(c);
+
+    if ((source = idxset_get_by_index(c->sources, c->default_source_index)))
+        return source;
+
+    if (!(source = idxset_rrobin(c->sources, NULL)))
+        return NULL;
+
+    fprintf(stderr, "Default source vanished, setting to %u\n", source->index);
+    c->default_source_index = source->index;
+    return source;
+}
diff --git a/src/core.h b/src/core.h
new file mode 100644 (file)
index 0000000..649c9db
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef foocorehfoo
+#define foocorehfoo
+
+#include "idxset.h"
+#include "mainloop.h"
+
+struct core {
+    struct mainloop *mainloop;
+
+    struct idxset *clients, *sinks, *sources, *output_streams, *input_streams, *modules;
+
+    uint32_t default_source_index, default_sink_index;
+};
+
+struct core* core_new(struct mainloop *m);
+void core_free(struct core*c);
+
+struct sink* core_get_default_sink(struct core *c);
+struct source* core_get_default_source(struct core *c);
+
+#endif
diff --git a/src/idxset.c b/src/idxset.c
new file mode 100644 (file)
index 0000000..eaea34f
--- /dev/null
@@ -0,0 +1,329 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "idxset.h"
+
+struct idxset_entry {
+    void *data;
+    uint32_t index;
+    unsigned hash_value;
+
+    struct idxset_entry *hash_prev, *hash_next;
+    struct idxset_entry* iterate_prev, *iterate_next;
+};
+
+struct idxset {
+    unsigned (*hash_func) (void *p);
+    int (*compare_func)(void *a, void *b);
+    
+    unsigned hash_table_size, n_entries;
+    struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin;
+    uint32_t index, start_index, array_size;
+};
+
+static unsigned trivial_hash_func(void *p) {
+    return (unsigned) p;
+}
+
+static int trivial_compare_func(void *a, void *b) {
+    return !(a == b);
+}
+
+struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b)) {
+    struct idxset *s;
+
+    s = malloc(sizeof(struct idxset));
+    assert(s);
+    s->hash_func = hash_func ? hash_func : trivial_hash_func;
+    s->compare_func = compare_func ? compare_func : trivial_compare_func;
+    s->hash_table_size = 1023;
+    s->hash_table = malloc(sizeof(struct idxset_entry*)*s->hash_table_size);
+    assert(s->hash_table);
+    s->array = NULL;
+    s->array_size = 0;
+    s->index = 0;
+    s->start_index = 0;
+    s->n_entries = 0;
+
+    s->iterate_list_head = s->iterate_list_tail = NULL;
+
+    return s;
+}
+
+void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata) {
+    assert(s);
+
+    if (free_func) {
+        while (s->iterate_list_head) {
+            struct idxset_entry *e = s->iterate_list_head;
+            s->iterate_list_head = s->iterate_list_head->iterate_next;
+
+            if (free_func)
+                free_func(e->data, userdata);
+            free(e);
+        }
+    }
+
+    free(s->hash_table);
+    free(s->array);
+    free(s);
+}
+
+static struct idxset_entry* hash_scan(struct idxset *s, struct idxset_entry* e, void *p) {
+    assert(p);
+
+    assert(s->compare_func);
+    for (; e; e = e->hash_next)
+        if (s->compare_func(e->data, p))
+            return e;
+
+    return NULL;
+}
+
+static void extend_array(struct idxset *s, uint32_t index) {
+    uint32_t i, j, l;
+    struct idxset_entry** n;
+    assert(index >= s->start_index );
+
+    if (index <= s->start_index + s->array_size)
+        return;
+
+    for (i = 0; i < s->array_size; i++)
+        if (s->array[i])
+            break;
+
+    l = index - s->start_index - i + 100;
+    n = malloc(sizeof(struct hash_table_entry*)*l);
+    assert(n);
+    memset(n, 0, sizeof(struct hash_table_entry*)*l);
+    
+    for (j = 0; j < s->array_size-i; j++)
+        n[j] = s->array[i+j];
+
+    free(s->array);
+    
+    s->array = n;
+    s->array_size = l;
+    s->start_index += i;
+}
+
+static struct idxset_entry** array_index(struct idxset*s, uint32_t index) {
+
+    if (index >= s->start_index + s->array_size)
+        return NULL;
+    
+    if (index < s->start_index)
+        return NULL;
+
+    return s->array + (index - s->start_index);
+}
+
+int idxset_put(struct idxset*s, void *p, uint32_t *index) {
+    unsigned h;
+    struct idxset_entry *e, **a;
+    assert(s && p);
+
+    assert(s->hash_func);
+    h = s->hash_func(p) % s->hash_table_size;
+
+    assert(s->hash_table);
+    if ((e = hash_scan(s, s->hash_table[h], p))) {
+        if (index)
+            *index = e->index;
+        
+        return -1;
+    }
+
+    e = malloc(sizeof(struct idxset_entry));
+    assert(e);
+
+    e->data = p;
+    e->index = s->index++;
+    e->hash_value = h;
+
+    /* Insert into hash table */
+    e->hash_next = s->hash_table[h];
+    e->hash_prev = NULL;
+    if (s->hash_table[h])
+        s->hash_table[h]->hash_prev = e;
+    s->hash_table[h] = e;
+
+    /* Insert into array */
+    extend_array(s, s->index);
+    a = array_index(s, s->index);
+    assert(a && !*a);
+    *a = e;
+
+    /* Insert into linked list */
+    e->iterate_next = NULL;
+    e->iterate_prev = s->iterate_list_tail;
+    if (s->iterate_list_tail) {
+        assert(s->iterate_list_head);
+        s->iterate_list_tail->iterate_next = e;
+    } else {
+        assert(!s->iterate_list_head);
+        s->iterate_list_head = e;
+    }
+    s->iterate_list_tail = e;
+    
+    s->n_entries++;
+    assert(s->n_entries >= 1);
+    
+    if (index)
+        *index = e->index;
+
+    return 0;
+}
+
+void* idxset_get_by_index(struct idxset*s, uint32_t index) {
+    struct idxset_entry **a;
+    assert(s);
+    
+    if (!(a = array_index(s, index)))
+        return NULL;
+
+    return (*a)->data;
+}
+
+void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index) {
+    unsigned h;
+    struct idxset_entry *e;
+    assert(s && p);
+    
+    assert(s->hash_func);
+    h = s->hash_func(p) % s->hash_table_size;
+
+    assert(s->hash_table);
+    if (!(e = hash_scan(s, s->hash_table[h], p)))
+        return NULL;
+
+    if (index)
+        *index = e->index;
+
+    return e->data;
+}
+
+static void remove_entry(struct idxset *s, struct idxset_entry *e) {
+    struct idxset_entry **a;
+    assert(s && e);
+
+    /* Remove from array */
+    a = array_index(s, s->index);
+    assert(a && *a == e);
+    *a = NULL;
+    
+    /* Remove from linked list */
+    if (e->iterate_next)
+        e->iterate_next->iterate_prev = e->iterate_prev;
+    else
+        s->iterate_list_tail = e->iterate_prev;
+    
+    if (e->iterate_prev)
+        e->iterate_prev->iterate_next = e->iterate_next;
+    else
+        s->iterate_list_head = e->iterate_next;
+
+    /* Remove from hash table */
+    if (e->hash_next)
+        e->hash_next->hash_prev = e->hash_prev;
+
+    if (e->hash_prev)
+        e->hash_prev->hash_next = e->hash_next;
+    else
+        s->hash_table[e->hash_value] = e->hash_next;
+
+    if (s->rrobin == e)
+        s->rrobin = NULL;
+    
+    free(e);
+
+    assert(s->n_entries >= 1);
+    s->n_entries--;
+}
+
+void* idxset_remove_by_index(struct idxset*s, uint32_t index) {
+    struct idxset_entry **a;
+    void *data;
+    
+    assert(s);
+
+    if (!(a = array_index(s, index)))
+        return NULL;
+
+    data = (*a)->data;
+    remove_entry(s, *a);
+    
+    return data; 
+}
+
+void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) {
+    struct idxset_entry *e;
+    unsigned h;
+
+    assert(s->hash_func);
+    h = s->hash_func(data) % s->hash_table_size;
+
+    assert(s->hash_table);
+    if (!(e = hash_scan(s, s->hash_table[h], data)))
+        return NULL;
+
+    data = e->data;
+    if (index)
+        *index = e->index;
+
+    remove_entry(s, e);
+
+    return data;
+}
+
+void* idxset_rrobin(struct idxset *s, uint32_t *index) {
+    assert(s && index);
+
+    if (s->rrobin)
+        s->rrobin = s->rrobin->iterate_next;
+    
+    if (!s->rrobin)
+        s->rrobin = s->iterate_list_head;
+
+    if (!s->rrobin)
+        return NULL;
+
+    if (index)
+        *index = s->rrobin->index;
+
+    return s->rrobin->data;
+}
+
+int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) {
+    struct idxset_entry *e;
+    assert(s && func);
+
+    e = s->iterate_list_head;
+    while (e) {
+        int del = 0, r;
+        struct idxset_entry *n = e->iterate_next;
+
+        r = func(e->data, e->index, &del, userdata);
+
+        if (del)
+            remove_entry(s, e);
+
+        if (r < 0)
+            return r;
+
+        e = n;
+    }
+    
+    return 0;
+}
+
+unsigned idxset_ncontents(struct idxset*s) {
+    assert(s);
+    return s->n_entries;
+}
+
+int idxset_isempty(struct idxset *s) {
+    assert(s);
+    return s->n_entries == 0;
+}
diff --git a/src/idxset.h b/src/idxset.h
new file mode 100644 (file)
index 0000000..f649e23
--- /dev/null
@@ -0,0 +1,28 @@
+#ifndef fooidxsethfoo
+#define fooidxsethfoo
+
+#include <inttypes.h>
+
+#define IDXSET_INVALID ((uint32_t) -1)
+
+struct idxset;
+
+struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b));
+void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata);
+
+int idxset_put(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_get_by_index(struct idxset*s, uint32_t index);
+void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_remove_by_index(struct idxset*s, uint32_t index);
+void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_rrobin(struct idxset *s, uint32_t *index);
+
+int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata);
+
+unsigned idxset_ncontents(struct idxset*s);
+int idxset_isempty(struct idxset *s);
+
+#endif
diff --git a/src/inputstream.c b/src/inputstream.c
new file mode 100644 (file)
index 0000000..c7b4b4c
--- /dev/null
@@ -0,0 +1,50 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "inputstream.h"
+
+struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name) {
+    struct input_stream *i;
+    int r;
+    assert(s && spec);
+
+    i = malloc(sizeof(struct input_stream));
+    assert(i);
+    i->name = name ? strdup(name) : NULL;
+    i->sink = s;
+    i->spec = *spec;
+
+    i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
+    assert(i->memblockq);
+    
+    assert(s->core);
+    r = idxset_put(s->core->input_streams, i, &i->index);
+    assert(r == 0 && i->index != IDXSET_INVALID);
+    r = idxset_put(s->input_streams, i, NULL);
+    assert(r == 0);
+    
+    return i;    
+}
+
+void input_stream_free(struct input_stream* i) {
+    assert(i);
+
+    memblockq_free(i->memblockq);
+
+    assert(i->sink && i->sink->core);
+    idxset_remove_by_data(i->sink->core->input_streams, i, NULL);
+    idxset_remove_by_data(i->sink->input_streams, i, NULL);
+    
+    free(i->name);
+    free(i);
+}
+
+void input_stream_notify(struct input_stream *i) {
+    assert(i);
+
+    if (memblockq_is_empty(i->memblockq))
+        return;
+    
+    sink_notify(i->sink);
+}
diff --git a/src/inputstream.h b/src/inputstream.h
new file mode 100644 (file)
index 0000000..0353799
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef fooinputstreamhfoo
+#define fooinputstreamhfoo
+
+#include <inttypes.h>
+
+#include "sink.h"
+#include "sample.h"
+#include "memblockq.h"
+
+struct input_stream {
+    char *name;
+    uint32_t index;
+
+    struct sink *sink;
+    struct sample_spec spec;
+    
+    struct memblockq *memblockq;
+};
+
+struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name);
+void input_stream_free(struct input_stream* i);
+
+void input_stream_notify(struct input_stream *i);
+
+#endif
diff --git a/src/iochannel.c b/src/iochannel.c
new file mode 100644 (file)
index 0000000..db9717a
--- /dev/null
@@ -0,0 +1,158 @@
+#include <assert.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "iochannel.h"
+
+struct iochannel {
+    int ifd, ofd;
+    struct mainloop* mainloop;
+
+    void (*callback)(struct iochannel*io, void *userdata);
+    void*userdata;
+    
+    int readable;
+    int writable;
+
+    struct mainloop_source* input_source, *output_source;
+};
+
+static void enable_mainloop_sources(struct iochannel *io) {
+    assert(io);
+
+    if (io->input_source == io->output_source) {
+        enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL;
+        assert(io->input_source);
+        
+        if (!io->readable)
+            e |= MAINLOOP_IO_EVENT_IN;
+        if (!io->writable)
+            e |= MAINLOOP_IO_EVENT_OUT;
+
+        mainloop_source_io_set_events(io->input_source, e);
+    } else {
+        if (io->input_source)
+            mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN);
+        if (io->output_source)
+            mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT);
+    }
+}
+
+static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) {
+    struct iochannel *io = userdata;
+    int changed;
+    assert(s && fd >= 0 && userdata);
+
+    if (events & MAINLOOP_IO_EVENT_IN && !io->readable) {
+        io->readable = 1;
+        changed = 1;
+    }
+    
+    if (events & MAINLOOP_IO_EVENT_OUT && !io->writable) {
+        io->writable = 1;
+        changed = 1;
+    }
+
+    if (changed) {
+        enable_mainloop_sources(io);
+        
+        if (io->callback)
+            io->callback(io, io->userdata);
+    }
+}
+
+static void make_nonblock_fd(int fd) {
+    int v;
+
+    if ((v = fcntl(fd, F_GETFL)) >= 0)
+        if (!(v & O_NONBLOCK))
+            fcntl(fd, F_SETFL, v|O_NONBLOCK);
+}
+
+struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
+    struct iochannel *io;
+    assert(m && (ifd >= 0 || ofd >= 0));
+
+    io = malloc(sizeof(struct iochannel));
+    io->ifd = ifd;
+    io->ofd = ofd;
+    io->mainloop = m;
+
+    io->userdata = NULL;
+    io->callback = NULL;
+    io->readable = 0;
+    io->writable = 0;
+
+    if (ifd == ofd) {
+        assert(ifd >= 0);
+        make_nonblock_fd(io->ifd);
+        io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io);
+    } else {
+
+        if (ifd >= 0) {
+            make_nonblock_fd(io->ifd);
+            io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io);
+        } else
+            io->input_source = NULL;
+
+        if (ofd >= 0) {
+            make_nonblock_fd(io->ofd);
+            io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io);
+        } else
+            io->output_source = NULL;
+    }
+
+    return io;
+}
+
+void iochannel_free(struct iochannel*io) {
+    assert(io);
+
+    if (io->ifd >= 0)
+        close(io->ifd);
+    if (io->ofd >= 0 && io->ofd != io->ifd)
+        close(io->ofd);
+
+    if (io->input_source)
+        mainloop_source_free(io->input_source);
+    if (io->output_source)
+        mainloop_source_free(io->output_source);
+    
+    free(io);
+}
+
+int iochannel_is_readable(struct iochannel*io) {
+    assert(io);
+    return io->readable;
+}
+
+int iochannel_is_writable(struct iochannel*io) {
+    assert(io);
+    return io->writable;
+}
+
+ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l) {
+    ssize_t r;
+    assert(io && data && l && io->ofd >= 0);
+
+    if ((r = write(io->ofd, data, l)) >= 0) {
+        io->writable = 0;
+        enable_mainloop_sources(io);
+    }
+
+    return r;
+}
+
+ssize_t iochannel_read(struct iochannel*io, void*data, size_t l) {
+    ssize_t r;
+    
+    assert(io && data && l && io->ifd >= 0);
+
+    if ((r = read(io->ifd, data, l)) >= 0) {
+        io->readable = 0;
+        enable_mainloop_sources(io);
+    }
+
+    return r;
+}
diff --git a/src/iochannel.h b/src/iochannel.h
new file mode 100644 (file)
index 0000000..f97fabb
--- /dev/null
@@ -0,0 +1,20 @@
+#ifndef fooiochannelhfoo
+#define fooiochannelhfoo
+
+#include <sys/types.h>
+#include "mainloop.h"
+
+struct iochannel;
+
+struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd);
+void iochannel_free(struct iochannel*io);
+
+ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l);
+ssize_t iochannel_read(struct iochannel*io, void*data, size_t l);
+
+int iochannel_is_readable(struct iochannel*io);
+int iochannel_is_writable(struct iochannel*io);
+
+void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata);
+
+#endif
diff --git a/src/main.c b/src/main.c
new file mode 100644 (file)
index 0000000..3104c26
--- /dev/null
@@ -0,0 +1,26 @@
+#include <stddef.h>
+#include <assert.h>
+
+#include "core.h"
+#include "mainloop.h"
+#include "module.h"
+
+int main(int argc, char *argv[]) {
+    struct mainloop *m;
+    struct core *c;
+
+    m = mainloop_new();
+    assert(m);
+    c = core_new(m);
+    assert(c);
+
+    module_load(c, "sink-pipe", NULL);
+    module_load(c, "protocol-simple-tcp", NULL);
+    
+    mainloop_run(m);
+    
+    core_free(c);
+    mainloop_free(m);
+    
+    return 0;
+}
diff --git a/src/mainloop.c b/src/mainloop.c
new file mode 100644 (file)
index 0000000..d043ce9
--- /dev/null
@@ -0,0 +1,331 @@
+#include <sys/poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "mainloop.h"
+
+struct mainloop_source {
+    struct mainloop_source *next;
+    struct mainloop *mainloop;
+    enum mainloop_source_type type;
+
+    int enabled;
+    int dead;
+    void *userdata;
+
+    struct {
+        int fd;
+        enum mainloop_io_event events;
+        void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata);
+        struct pollfd pollfd;
+    } io;
+    
+    struct  {
+        void (*callback)(struct mainloop_source*s, void *userdata);
+    } prepare;
+    
+    struct  {
+        void (*callback)(struct mainloop_source*s, void *userdata);
+    } idle;
+};
+
+struct mainloop_source_list {
+    struct mainloop_source *sources;
+    int n_sources;
+    int dead_sources;
+};
+
+struct mainloop {
+    struct mainloop_source_list io_sources, prepare_sources, idle_sources;
+    
+    struct pollfd *pollfds;
+    int max_pollfds, n_pollfds;
+    int rebuild_pollfds;
+
+    int quit;
+    int running;
+};
+
+struct mainloop *mainloop_new(void) {
+    struct mainloop *m;
+
+    m = malloc(sizeof(struct mainloop));
+    assert(m);
+    memset(m, 0, sizeof(struct mainloop));
+    
+    return m;
+}
+
+static void free_sources(struct mainloop_source_list *l, int all) {
+    struct mainloop_source *s, *p;
+    assert(l);
+
+    if (!l->dead_sources)
+        return;
+
+    p = NULL;
+    s = l->sources;
+    while (s) {
+        if (all || s->dead) {
+            struct mainloop_source *t = s;
+            s = s->next;
+
+            if (p)
+                p->next = s;
+            else
+                l->sources = s;
+            
+            free(t);
+        } else {
+            p = s;
+            s = s->next;
+        }
+    }
+
+    l->dead_sources = 0;
+
+    if (all) {
+        assert(l->sources);
+        l->n_sources = 0;
+    }
+}
+
+void mainloop_free(struct mainloop* m) {
+    assert(m);
+    free_sources(&m->io_sources, 1);
+    free_sources(&m->prepare_sources, 1);
+    free_sources(&m->idle_sources, 1);
+    free(m->pollfds);
+}
+
+static void rebuild_pollfds(struct mainloop *m) {
+    struct mainloop_source*s;
+    struct pollfd *p;
+    
+    if (m->max_pollfds < m->io_sources.n_sources) {
+        m->max_pollfds = m->io_sources.n_sources*2;
+        m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds);
+    }
+
+    m->n_pollfds = 0;
+    p = m->pollfds;
+    for (s = m->io_sources.sources; s; s = s->next) {
+        assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
+        if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) {
+            *(p++) = s->io.pollfd;
+            m->n_pollfds++;
+        }
+    }
+}
+
+static void dispatch_pollfds(struct mainloop *m) {
+    int i;
+    struct pollfd *p;
+    struct mainloop_source *s;
+    /* This loop assumes that m->sources and m->pollfds have the same
+     * order and that m->pollfds is a subset of m->sources! */
+
+    s = m->io_sources.sources;
+    for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) {
+        for (;;) {
+            assert(s && s->type == MAINLOOP_SOURCE_TYPE_IO);
+            
+            if (p->fd == s->io.fd) {
+                if (!s->dead && s->enabled) {
+                    enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0);
+                    if (e) {
+                        assert(s->io.callback);
+                        s->io.callback(s, s->io.fd, e, s->userdata);
+                    }
+                }
+
+                break;
+            }
+            s = s->next;
+        }
+    }
+}
+
+int mainloop_iterate(struct mainloop *m, int block) {
+    struct mainloop_source *s;
+    int c;
+    assert(m && !m->running);
+    
+    if(m->quit)
+        return m->quit;
+
+    free_sources(&m->io_sources, 0);
+    free_sources(&m->prepare_sources, 0);
+    free_sources(&m->idle_sources, 0);
+
+    for (s = m->prepare_sources.sources; s; s = s->next) {
+        assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_PREPARE);
+        if (s->enabled) {
+            assert(s->prepare.callback);
+            s->prepare.callback(s, s->userdata);
+        }   
+    }
+
+    if (m->rebuild_pollfds)
+        rebuild_pollfds(m);
+
+    m->running = 1;
+
+    if ((c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0)) > 0)
+        dispatch_pollfds(m);
+    else if (c == 0) {
+        for (s = m->idle_sources.sources; s; s = s->next) {
+            assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_IDLE);
+            if (s->enabled) {
+                assert(s->idle.callback);
+                s->idle.callback(s, s->userdata);
+            }
+        }
+    }
+    
+    m->running = 0;
+    return c < 0 ? -1 : 0;
+}
+
+int mainloop_run(struct mainloop *m) {
+    int r;
+    while (!(r = mainloop_iterate(m, 1)));
+    return r;
+}
+
+void mainloop_quit(struct mainloop *m, int r) {
+    assert(m);
+    m->quit = r;
+}
+
+static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) {
+    struct mainloop_source_list *l;
+    
+    switch(type) {
+        case MAINLOOP_SOURCE_TYPE_IO:
+            l = &m->io_sources;
+            break;
+        case MAINLOOP_SOURCE_TYPE_PREPARE:
+            l = &m->prepare_sources;
+            break;
+        case MAINLOOP_SOURCE_TYPE_IDLE:
+            l = &m->idle_sources;
+            break;
+        default:
+            l = NULL;
+            break;
+    }
+    
+    return l;
+}
+
+static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) {
+    struct mainloop_source_list *l;
+    struct mainloop_source* s;
+    assert(m);
+
+    s = malloc(sizeof(struct mainloop_source));
+    assert(s);
+    memset(s, 0, sizeof(struct mainloop_source));
+
+    s->type = type;
+    s->mainloop = m;
+
+    l = get_source_list(m, type);
+    assert(l);
+            
+    s->next = l->sources;
+    l->sources = s;
+    l->n_sources++;
+    return s;
+}
+
+struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) {
+    struct mainloop_source* s;
+    assert(m && fd>=0 && callback);
+
+    s = source_new(m, MAINLOOP_SOURCE_TYPE_IO);
+
+    s->io.fd = fd;
+    s->io.events = event;
+    s->io.callback = callback;
+    s->userdata = userdata;
+    s->io.pollfd.fd = fd;
+    s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0);
+    s->io.pollfd.revents = 0;
+
+    s->enabled = 1;
+
+    m->rebuild_pollfds = 1;
+    return s;
+}
+
+struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
+    struct mainloop_source* s;
+    assert(m && callback);
+
+    s = source_new(m, MAINLOOP_SOURCE_TYPE_PREPARE);
+
+    s->prepare.callback = callback;
+    s->userdata = userdata;
+    s->enabled = 1;
+    return s;
+}
+
+struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
+    struct mainloop_source* s;
+    assert(m && callback);
+
+    s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE);
+
+    s->prepare.callback = callback;
+    s->userdata = userdata;
+    s->enabled = 1;
+    return s;
+}
+
+void mainloop_source_free(struct mainloop_source*s) {
+    struct mainloop_source_list *l;
+    assert(s && !s->dead);
+    s->dead = 1;
+
+    assert(s->mainloop);
+    l = get_source_list(s->mainloop, s->type);
+    assert(l);
+
+    l->n_sources--;
+    l->dead_sources = 1;
+
+    if (s->type == MAINLOOP_SOURCE_TYPE_IO)
+        s->mainloop->rebuild_pollfds = 1;
+}
+
+void mainloop_source_enable(struct mainloop_source*s, int b) {
+    assert(s && !s->dead);
+
+    if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) {
+        assert(s->mainloop);
+        s->mainloop->rebuild_pollfds = 1;
+    }
+
+    s->enabled = b;
+}
+
+void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) {
+    assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO);
+
+    if ((s->io.events && !events) || (!s->io.events && events)) {
+        assert(s->mainloop);
+        s->mainloop->rebuild_pollfds = 1;
+    }
+
+    s->io.events = events;
+    s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0);
+}
+
+struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) {
+    assert(s);
+
+    return s->mainloop;
+}
diff --git a/src/mainloop.h b/src/mainloop.h
new file mode 100644 (file)
index 0000000..72376c7
--- /dev/null
@@ -0,0 +1,38 @@
+#ifndef foomainloophfoo
+#define foomainloophfoo
+
+struct mainloop;
+struct mainloop_source;
+
+enum mainloop_io_event {
+    MAINLOOP_IO_EVENT_NULL = 0,
+    MAINLOOP_IO_EVENT_IN = 1,
+    MAINLOOP_IO_EVENT_OUT = 2,
+    MAINLOOP_IO_EVENT_BOTH = 3
+};
+
+enum mainloop_source_type {
+    MAINLOOP_SOURCE_TYPE_IO,
+    MAINLOOP_SOURCE_TYPE_PREPARE,
+    MAINLOOP_SOURCE_TYPE_IDLE
+};
+
+struct mainloop *mainloop_new(void);
+void mainloop_free(struct mainloop* m);
+
+int mainloop_iterate(struct mainloop *m, int block);
+int mainloop_run(struct mainloop *m);
+void mainloop_quit(struct mainloop *m, int r);
+
+struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata);
+struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
+struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
+
+void mainloop_source_free(struct mainloop_source*s);
+void mainloop_source_enable(struct mainloop_source*s, int b);
+
+void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event);
+
+struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s);
+
+#endif
diff --git a/src/memblock.c b/src/memblock.c
new file mode 100644 (file)
index 0000000..3bef494
--- /dev/null
@@ -0,0 +1,67 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include "memblock.h"
+
+struct memblock *memblock_new(size_t length) {
+    struct memblock *b = malloc(sizeof(struct memblock)+length);
+    b->type = MEMBLOCK_APPENDED;
+    b->ref = 1;
+    b->length = length;
+    b->data = b+1;
+    return b;
+}
+
+struct memblock *memblock_new_fixed(void *d, size_t length) {
+    struct memblock *b = malloc(sizeof(struct memblock));
+    b->type = MEMBLOCK_FIXED;
+    b->ref = 1;
+    b->length = length;
+    b->data = d;
+    return b;
+}
+
+struct memblock *memblock_new_dynamic(void *d, size_t length) {
+    struct memblock *b = malloc(sizeof(struct memblock));
+    b->type = MEMBLOCK_DYNAMIC;
+    b->ref = 1;
+    b->length = length;
+    b->data = d;
+    return b;
+}
+
+struct memblock* memblock_ref(struct memblock*b) {
+    assert(b && b->ref >= 1);
+    b->ref++;
+    return b;
+}
+
+void memblock_unref(struct memblock*b) {
+    assert(b && b->ref >= 1);
+    b->ref--;
+
+    if (b->ref == 0) {
+        if (b->type == MEMBLOCK_DYNAMIC)
+            free(b->data);
+        free(b);
+    }
+}
+
+void memblock_unref_fixed(struct memblock *b) {
+    void *d;
+    
+    assert(b && b->ref >= 1);
+
+    if (b->ref == 1) {
+        memblock_unref(b);
+        return;
+    }
+
+    d = malloc(b->length);
+    assert(d);
+    memcpy(d, b->data, b->length);
+    b->data = d;
+    b->type = MEMBLOCK_DYNAMIC;
+}
+
diff --git a/src/memblock.h b/src/memblock.h
new file mode 100644 (file)
index 0000000..48e8728
--- /dev/null
@@ -0,0 +1,31 @@
+#ifndef foomemblockhfoo
+#define foomemblockhfoo
+
+#include <sys/types.h>
+
+enum memblock_type { MEMBLOCK_FIXED, MEMBLOCK_APPENDED, MEMBLOCK_DYNAMIC };
+
+struct memblock {
+    enum memblock_type type;
+    unsigned ref;
+    size_t length;
+    void *data;
+};
+
+struct memchunk {
+    struct memblock *memblock;
+    size_t index, length;
+};
+
+struct memblock *memblock_new(size_t length);
+struct memblock *memblock_new_fixed(void *data, size_t length);
+struct memblock *memblock_new_dynamic(void *data, size_t length);
+
+void memblock_unref(struct memblock*b);
+struct memblock* memblock_ref(struct memblock*b);
+
+void memblock_unref_fixed(struct memblock*b);
+
+#define memblock_assert_exclusive(b) assert((b)->ref == 1)
+
+#endif
diff --git a/src/memblockq.c b/src/memblockq.c
new file mode 100644 (file)
index 0000000..1424c55
--- /dev/null
@@ -0,0 +1,156 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "memblockq.h"
+
+struct memblock_list {
+    struct memblock_list *next;
+    struct memchunk chunk;
+};
+
+struct memblockq {
+    struct memblock_list *blocks, *blocks_tail;
+    unsigned n_blocks;
+    size_t total_length;
+    size_t maxlength;
+    size_t base;
+};
+
+struct memblockq* memblockq_new(size_t maxlength, size_t base) {
+    struct memblockq* bq;
+    assert(maxlength && base);
+    
+    bq = malloc(sizeof(struct memblockq));
+    assert(bq);
+    bq->blocks = bq->blocks_tail = 0;
+    bq->n_blocks = 0;
+    bq->total_length = 0;
+    bq->base = base;
+    bq->maxlength = ((maxlength+base-1)/base)*base;
+    assert(bq->maxlength >= base);
+    return bq;
+}
+
+void memblockq_free(struct memblockq* bq) {
+    struct memblock_list *l;
+    assert(bq);
+
+    while ((l = bq->blocks)) {
+        bq->blocks = l->next;
+        memblock_unref(l->chunk.memblock);
+        free(l);
+    }
+    
+    free(bq);
+}
+
+void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta) {
+    struct memblock_list *q;
+    assert(bq && chunk && chunk->memblock && chunk->index);
+
+    q = malloc(sizeof(struct memblock_list));
+    assert(q);
+
+    q->chunk = *chunk;
+    memblock_ref(q->chunk.memblock);
+    assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
+    q->next = NULL;
+    
+    if (bq->blocks_tail)
+        bq->blocks_tail->next = q;
+    else
+        bq->blocks = q;
+    
+    bq->blocks_tail = q;
+
+    bq->n_blocks++;
+    bq->total_length += chunk->length;
+
+    memblockq_shorten(bq, bq->maxlength);
+}
+
+int memblockq_peek(struct memblockq* bq, struct memchunk *chunk) {
+    assert(bq && chunk);
+
+    if (!bq->blocks)
+        return -1;
+
+    *chunk = bq->blocks->chunk;
+    memblock_ref(chunk->memblock);
+    return 0;
+}
+
+int memblockq_pop(struct memblockq* bq, struct memchunk *chunk) {
+    struct memblock_list *q;
+    
+    assert(bq && chunk);
+
+    if (!bq->blocks)
+        return -1;
+
+    q = bq->blocks;
+    bq->blocks = bq->blocks->next;
+
+    *chunk = q->chunk;
+
+    bq->n_blocks--;
+    bq->total_length -= chunk->length;
+
+    free(q);
+    return 0;
+}
+
+void memblockq_drop(struct memblockq *bq, size_t length) {
+    assert(bq);
+
+    while (length > 0) {
+        size_t l = length;
+        assert(bq->blocks && bq->total_length >= length);
+        
+        if (l > bq->blocks->chunk.length)
+            l = bq->blocks->chunk.length;
+    
+        bq->blocks->chunk.index += l;
+        bq->blocks->chunk.length -= l;
+        bq->total_length -= l;
+        
+        if (bq->blocks->chunk.length == 0) {
+            struct memblock_list *q;
+            
+            q = bq->blocks;
+            bq->blocks = bq->blocks->next;
+            memblock_unref(q->chunk.memblock);
+            free(q);
+            
+            bq->n_blocks--;
+        }
+
+        length -= l;
+    }
+}
+
+void memblockq_shorten(struct memblockq *bq, size_t length) {
+    size_t l;
+    assert(bq);
+
+    if (bq->total_length <= length)
+        return;
+
+    l = bq->total_length - length;
+    l /= bq->base;
+    l *= bq->base;
+
+    memblockq_drop(bq, l);
+}
+
+
+void memblockq_empty(struct memblockq *bq) {
+    assert(bq);
+    memblockq_shorten(bq, 0);
+}
+
+int memblockq_is_empty(struct memblockq *bq) {
+    assert(bq);
+
+    return bq->total_length >= bq->base;
+}
diff --git a/src/memblockq.h b/src/memblockq.h
new file mode 100644 (file)
index 0000000..75c5e59
--- /dev/null
@@ -0,0 +1,24 @@
+#ifndef foomemblockqhfoo
+#define foomemblockqhfoo
+
+#include <sys/types.h>
+
+#include "memblock.h"
+
+struct memblockq;
+
+struct memblockq* memblockq_new(size_t maxlength, size_t base);
+void memblockq_free(struct memblockq* bq);
+
+void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta);
+
+int memblockq_pop(struct memblockq* bq, struct memchunk *chunk);
+int memblockq_peek(struct memblockq* bq, struct memchunk *chunk);
+void memblockq_drop(struct memblockq *bq, size_t length);
+
+void memblockq_shorten(struct memblockq *bq, size_t length);
+void memblockq_empty(struct memblockq *bq);
+
+int memblockq_is_empty(struct memblockq *bq);
+
+#endif
diff --git a/src/module.c b/src/module.c
new file mode 100644 (file)
index 0000000..bcd0b6c
--- /dev/null
@@ -0,0 +1,98 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include "module.h"
+
+struct module* module_load(struct core *c, const char *name, const char *argument) {
+    struct module *m = NULL;
+    
+    assert(c && name);
+
+    m = malloc(sizeof(struct module));
+    assert(m);
+
+    if (!(m->dl = lt_dlopenext(name)))
+        goto fail;
+
+    if (!(m->init = lt_dlsym(m->dl, "module_init")))
+        goto fail;
+
+    if (!(m->done = lt_dlsym(m->dl, "module_done")))
+        goto fail;
+    
+    m->name = strdup(name);
+    m->argument = argument ? strdup(argument) : NULL;
+    m->userdata = NULL;
+
+    assert(m->init);
+    if (m->init(c, m) < 0)
+        goto fail;
+
+    if (!c->modules)
+        c->modules = idxset_new(NULL, NULL);
+    
+    assert(c->modules);
+    r = idxset_put(c->modules, m, &m->index);
+    assert(r >= 0 && m->index != IDXSET_INVALID);
+    return m;
+    
+fail:
+    if (m) {
+        if (m->dl)
+            lt_dlclose(m->dl);
+
+        free(m);
+    }
+
+    return NULL;
+}
+
+static void module_free(struct module *m) {
+    assert(m && m->done);
+    m->done(c, m);
+
+    lt_dlcose(m->dl);
+    free(m->name);
+    free(m->argument);
+    free(m);
+}
+
+void module_unload(struct core *c, struct module *m) {
+    struct module *m;
+    assert(c && index != IDXSET_INVALID);
+
+    assert(c->modules);
+    if (!(m = idxset_remove_by_data(c->modules, m, NULL)))
+        return;
+
+    module_free(m);
+}
+
+void module_unload_by_index(struct core *c, guint32_t index) {
+    struct module *m;
+    assert(c && index != IDXSET_INVALID);
+
+    assert(c->modules);
+    if (!(m = idxset_remove_by_index(c->modules, index)))
+        return;
+
+    module_free(m);
+}
+
+
+void free_callback(void *p, void *userdata) {
+    struct module *m = p;
+    assert(m);
+    module_free(m);
+}
+
+void module_unload_all(struct core *c) {
+    assert(c);
+
+    if (!c->modules)
+        return;
+
+    idxset_free(c->modules, free_callback, NULL);
+    c->modules = NULL;
+}
+
diff --git a/src/module.h b/src/module.h
new file mode 100644 (file)
index 0000000..d0dfa04
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef foomodulehfoo
+#define foomodulehfoo
+
+#include <inttypes.h>
+#include <ltdl.h>
+
+#include "core.h"
+
+struct module {
+    char *name, *argument;
+    uint32_t index;
+
+    lt_dlhandle *dl;
+    
+    int (*init)(struct core *c, struct module*m);
+    void (*done)(struct core *c, struct module*m);
+
+    void *userdata;
+};
+
+struct module* module_load(struct core *c, const char *name, const char*argument);
+void module_unload(struct core *c, struct module *m);
+void module_unload_by_index(struct core *c, uint32_t index);
+
+void module_unload_all(struct core *c);
+
+#endif
diff --git a/src/oss.c b/src/oss.c
new file mode 100644 (file)
index 0000000..42e6036
--- /dev/null
+++ b/src/oss.c
@@ -0,0 +1,30 @@
+#include "module.h"
+
+struct userdata {
+    struct sink *sink;
+    struct source *source;
+    int fd;
+};
+
+int module_init(struct core *c, struct module*m) {
+    struct userdata *u;
+    assert(c && m);
+
+    u = malloc(sizeof(struct userdata));
+    assert(u);
+    memset(u, 0, sizeof(struct userdata));
+    m->userdata = u;
+
+    return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+    struct userdata *u;
+    assert(c && m);
+
+    u = m->userdata;
+
+    sink_free(u->sink);
+    source_free(u->source);
+    free(u);
+}
diff --git a/src/outputstream.c b/src/outputstream.c
new file mode 100644 (file)
index 0000000..ffec77d
--- /dev/null
@@ -0,0 +1,41 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "outputstream.h"
+
+struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name) {
+    struct output_stream *o;
+    int r;
+    assert(s && spec);
+
+    o = malloc(sizeof(struct output_stream));
+    assert(o);
+    o->name = name ? strdup(name) : NULL;
+    o->source = s;
+    o->spec = *spec;
+
+    o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
+    assert(o->memblockq);
+    
+    assert(s->core);
+    r = idxset_put(s->core->output_streams, o, &o->index);
+    assert(r == 0 && o->index != IDXSET_INVALID);
+    r = idxset_put(s->output_streams, o, NULL);
+    assert(r == 0);
+    
+    return o;    
+}
+
+void output_stream_free(struct output_stream* o) {
+    assert(o);
+
+    memblockq_free(o->memblockq);
+
+    assert(o->source && o->source->core);
+    idxset_remove_by_data(o->source->core->output_streams, o, NULL);
+    idxset_remove_by_data(o->source->output_streams, o, NULL);
+    
+    free(o->name);
+    free(o);
+}
diff --git a/src/outputstream.h b/src/outputstream.h
new file mode 100644 (file)
index 0000000..4105434
--- /dev/null
@@ -0,0 +1,22 @@
+#ifndef foooutputstreamhfoo
+#define foooutputstreamhfoo
+
+#include <inttypes.h>
+#include "source.h"
+#include "sample.h"
+#include "memblockq.h"
+
+struct output_stream {
+    char *name;
+    uint32_t index;
+
+    struct source *source;
+    struct sample_spec spec;
+    
+    struct memblockq *memblockq;
+};
+
+struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name);
+void output_stream_free(struct output_stream* o);
+
+#endif
diff --git a/src/packet.c b/src/packet.c
new file mode 100644 (file)
index 0000000..086e4b2
--- /dev/null
@@ -0,0 +1,29 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "packet.h"
+
+struct packet* packet_new(uint32_t length) {
+    struct packet *p;
+    assert(length);
+    p = malloc(sizeof(struct packet)+length);
+    assert(p);
+
+    p->ref = 1;
+    p->length = length;
+    return p;
+}
+
+struct packet* packet_ref(struct packet *p) {
+    assert(p && p->ref >= 1);
+    p->ref++;
+    return p;
+}
+
+void packet_unref(struct packet *p) {
+    assert(p && p->ref >= 1);
+    p->ref--;
+
+    if (p->ref == 0)
+        free(p);
+}
diff --git a/src/packet.h b/src/packet.h
new file mode 100644 (file)
index 0000000..781c0e6
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef foopackethfoo
+#define foopackethfoo
+
+#include <sys/types.h>
+#include <stdint.h>
+
+struct packet {
+    unsigned ref;
+    size_t length;
+    uint8_t data[];
+};
+
+struct packet* packet_new(uint32_t length);
+
+struct packet* packet_ref(struct packet *p);
+void packet_unref(struct packet *p);
+
+#endif
diff --git a/src/protocol-native-tcp.c b/src/protocol-native-tcp.c
new file mode 100644 (file)
index 0000000..b33f3e1
--- /dev/null
@@ -0,0 +1,19 @@
+#include "module.h"
+
+int module_init(struct core *c, struct module*m) {
+    struct socket_server *s;
+    assert(c && m);
+
+    if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4711)))
+        return -1;
+
+    m->userdata = protocol_native_new(s);
+    assert(m->userdata);
+    return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+    assert(c && m);
+
+    protocol_native_free(m->userdata);
+}
diff --git a/src/protocol-native-unix.c b/src/protocol-native-unix.c
new file mode 100644 (file)
index 0000000..a18965c
--- /dev/null
@@ -0,0 +1,27 @@
+#include "module.h"
+
+int module_init(struct core *c, struct module*m) {
+    struct fn[PATH_MAX];
+    struct socket_server *s;
+    char *t;
+    assert(c && m);
+
+    if (!(t = getenv("TMP")))
+        if (!(t = getenv("TEMP")))
+            t = "/tmp";
+    
+    snprintf(fn, sizeof(fn), "%s/foosock", t);
+             
+    if (!(s = socket_server_new_unix(c->mainloop, fn)))
+        return -1;
+
+    m->userdata = protocol_native_new(s);
+    assert(m->userdata);
+    return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+    assert(c && m);
+
+    protocol_native_free(m->userdata);
+}
diff --git a/src/protocol-native.c b/src/protocol-native.c
new file mode 100644 (file)
index 0000000..bdb6935
--- /dev/null
@@ -0,0 +1,49 @@
+#include "protocol-native.h"
+
+struct protocol_native {
+    struct socket_server*server;
+    struct idxset *connection;
+};
+
+struct stream_info {
+    guint32_t tag;
+    
+    union {
+        struct output_stream *output_stream;
+        struct input_stream *input_stream;
+    }
+};
+
+struct connection {
+    struct client *client;
+    struct serializer *serializer;
+
+    
+};
+
+static void on_connection(struct socket_server *server, struct iochannel *io, void *userdata) {
+    struct protocol_native *p = userdata;
+    assert(server && io && p && p->server == server);
+
+    
+}
+
+struct protocol_native* protocol_native(struct socket_server *server) {
+    struct protocol_native *p;
+    assert(server);
+
+    p = malloc(sizeof(struct protocol_native));
+    assert(p);
+
+    p->server = server;
+    socket_server_set_callback(p->server, callback, p);
+
+    return p;
+}
+
+void protocol_native_free(struct protocol_native *p) {
+    assert(p);
+
+    socket_server_free(p->server);
+    free(p);
+}
diff --git a/src/protocol-native.h b/src/protocol-native.h
new file mode 100644 (file)
index 0000000..bdad03b
--- /dev/null
@@ -0,0 +1,9 @@
+#ifndef fooprotocolnativehfoo
+#define fooprotocolnativehfoo
+
+struct protocol_native;
+
+struct protocol_native* protocol_native(struct socket_server *server);
+void protocol_native_free(struct protocol_native *n);
+
+#endif
diff --git a/src/protocol-simple-tcp.c b/src/protocol-simple-tcp.c
new file mode 100644 (file)
index 0000000..e71d714
--- /dev/null
@@ -0,0 +1,24 @@
+#include <assert.h>
+#include <arpa/inet.h>
+
+#include "module.h"
+#include "socket-server.h"
+#include "protocol-simple.h"
+
+int module_init(struct core *c, struct module*m) {
+    struct socket_server *s;
+    assert(c && m);
+
+    if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4712)))
+        return -1;
+
+    m->userdata = protocol_simple_new(c, s, PROTOCOL_SIMPLE_PLAYBACK);
+    assert(m->userdata);
+    return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+    assert(c && m);
+
+    protocol_simple_free(m->userdata);
+}
diff --git a/src/protocol-simple.c b/src/protocol-simple.c
new file mode 100644 (file)
index 0000000..3335bc1
--- /dev/null
@@ -0,0 +1,173 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+
+#include "inputstream.h"
+#include "outputstream.h"
+#include "protocol-simple.h"
+#include "client.h"
+
+struct connection {
+    struct protocol_simple *protocol;
+    struct iochannel *io;
+    struct input_stream *istream;
+    struct output_stream *ostream;
+    struct client *client;
+};
+
+struct protocol_simple {
+    struct core *core;
+    struct socket_server*server;
+    struct idxset *connections;
+    enum protocol_simple_mode mode;
+};
+
+#define BUFSIZE PIPE_BUF
+
+static void free_connection(void *data, void *userdata) {
+    struct connection *c = data;
+    assert(data);
+    
+    if (c->istream)
+        input_stream_free(c->istream);
+    if (c->ostream)
+        output_stream_free(c->ostream);
+
+    client_free(c->client);
+
+    iochannel_free(c->io);
+    free(c);
+}
+
+static void io_callback(struct iochannel*io, void *userdata) {
+    struct connection *c = userdata;
+    assert(io && c);
+
+    if (c->istream && iochannel_is_readable(io)) {
+        struct memchunk chunk;
+        ssize_t r;
+
+        chunk.memblock = memblock_new(BUFSIZE);
+        assert(chunk.memblock);
+
+        if ((r = iochannel_read(io, chunk.memblock->data, BUFSIZE)) <= 0) {
+            fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno));
+            memblock_unref(chunk.memblock);
+            goto fail;
+        }
+        
+        chunk.memblock->length = r;
+        chunk.length = r;
+        chunk.index = 0;
+        
+        memblockq_push(c->istream->memblockq, &chunk, 0);
+        input_stream_notify(c->istream);
+        memblock_unref(chunk.memblock);
+    }
+
+    if (c->ostream && iochannel_is_writable(io)) {
+        struct memchunk chunk;
+        ssize_t r;
+
+        memblockq_peek(c->ostream->memblockq, &chunk);
+        assert(chunk.memblock && chunk.length);
+
+        if ((r = iochannel_write(io, chunk.memblock->data+chunk.index, chunk.length)) < 0) {
+            fprintf(stderr, "write(): %s\n", strerror(errno));
+            memblock_unref(chunk.memblock);
+            goto fail;
+        }
+        
+        memblockq_drop(c->ostream->memblockq, r);
+        memblock_unref(chunk.memblock);
+    }
+
+    return;
+    
+fail:
+    idxset_remove_by_data(c->protocol->connections, c, NULL);
+    free_connection(c, NULL);
+}
+
+static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
+    struct protocol_simple *p = userdata;
+    struct connection *c = NULL;
+    assert(s && io && p);
+
+    c = malloc(sizeof(struct connection));
+    assert(c);
+    c->io = io;
+    c->istream = NULL;
+    c->ostream = NULL;
+    c->protocol = p;
+    
+    if (p->mode & PROTOCOL_SIMPLE_RECORD) {
+        struct source *source;
+
+        if (!(source = core_get_default_source(p->core))) {
+            fprintf(stderr, "Failed to get default source.\n");
+            goto fail;
+        }
+
+        c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name);
+        assert(c->ostream);
+    }
+
+    if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
+        struct sink *sink;
+
+        if (!(sink = core_get_default_sink(p->core))) {
+            fprintf(stderr, "Failed to get default sink.\n");
+            goto fail;
+        }
+
+        c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name);
+        assert(c->istream);
+    }
+
+    c->client = client_new(p->core, "SIMPLE", "Client");
+    assert(c->client);
+
+    iochannel_set_callback(c->io, io_callback, c);
+    idxset_put(p->connections, c, NULL);
+    return;
+    
+fail:
+    if (c) {
+        if (c->istream)
+            input_stream_free(c->istream);
+        if (c->ostream)
+            output_stream_free(c->ostream);
+
+        iochannel_free(c->io);
+        free(c);
+    }
+}
+
+struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode) {
+    struct protocol_simple* p;
+    assert(core && server && mode <= PROTOCOL_SIMPLE_DUPLEX && mode > 0);
+
+    p = malloc(sizeof(struct protocol_simple));
+    assert(p);
+    p->core = core;
+    p->server = server;
+    p->connections = idxset_new(NULL, NULL);
+    p->mode = mode;
+
+    socket_server_set_callback(p->server, on_connection, p);
+    
+    return p;
+}
+
+
+void protocol_simple_free(struct protocol_simple *p) {
+    assert(p);
+
+    idxset_free(p->connections, free_connection, NULL);
+    socket_server_free(p->server);
+    free(p);
+}
diff --git a/src/protocol-simple.h b/src/protocol-simple.h
new file mode 100644 (file)
index 0000000..f621043
--- /dev/null
@@ -0,0 +1,17 @@
+#ifndef fooprotocolsimplehfoo
+#define fooprotocolsimplehfoo
+
+#include "socket-server.h"
+
+struct protocol_simple;
+
+enum protocol_simple_mode {
+    PROTOCOL_SIMPLE_RECORD = 1,
+    PROTOCOL_SIMPLE_PLAYBACK = 2,
+    PROTOCOL_SIMPLE_DUPLEX = 3
+};
+
+struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode);
+void protocol_simple_free(struct protocol_simple *n);
+
+#endif
diff --git a/src/pstream.c b/src/pstream.c
new file mode 100644 (file)
index 0000000..083ebc2
--- /dev/null
@@ -0,0 +1,359 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include "pstream.h"
+#include "queue.h"
+
+enum pstream_descriptor_index {
+    PSTREAM_DESCRIPTOR_LENGTH,
+    PSTREAM_DESCRIPTOR_CHANNEL,
+    PSTREAM_DESCRIPTOR_DELTA,
+    PSTREAM_DESCRIPTOR_MAX
+};
+
+typedef uint32_t pstream_descriptor[PSTREAM_DESCRIPTOR_MAX];
+
+#define PSTREAM_DESCRIPTOR_SIZE (PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
+#define FRAME_SIZE_MAX (1024*64)
+
+struct item_info {
+    enum { PSTREAM_ITEM_PACKET, PSTREAM_ITEM_MEMBLOCK } type;
+
+    /* memblock info */
+    struct memchunk chunk;
+    uint32_t channel;
+    int32_t delta;
+
+    /* packet info */
+    struct packet *packet;
+};
+
+struct pstream {
+    struct mainloop *mainloop;
+    struct mainloop_source *mainloop_source;
+    struct iochannel *io;
+    struct queue *send_queue;
+
+    int dead;
+
+    struct {
+        struct item_info* current;
+        pstream_descriptor descriptor;
+        void *data;
+        size_t index;
+    } write;
+
+    void (*send_callback) (struct pstream *p, void *userdata);
+    void *send_callback_userdata;
+
+    struct {
+        struct memblock *memblock;
+        struct packet *packet;
+        pstream_descriptor descriptor;
+        void *data;
+        size_t index;
+    } read;
+
+    void (*recieve_packet_callback) (struct pstream *p, struct packet *packet, void *userdata);
+    void *recieve_packet_callback_userdata;
+
+    void (*recieve_memblock_callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata);
+    void *recieve_memblock_callback_userdata;
+};
+
+static void do_write(struct pstream *p);
+static void do_read(struct pstream *p);
+
+static void io_callback(struct iochannel*io, void *userdata) {
+    struct pstream *p = userdata;
+    assert(p && p->io == io);
+    do_write(p);
+    do_read(p);
+}
+
+static void prepare_callback(struct mainloop_source *s, void*userdata) {
+    struct pstream *p = userdata;
+    assert(p && p->mainloop_source == s);
+    do_write(p);
+    do_read(p);
+}
+
+struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
+    struct pstream *p;
+    assert(io);
+
+    p = malloc(sizeof(struct pstream));
+    assert(p);
+
+    p->io = io;
+    iochannel_set_callback(io, io_callback, p);
+
+    p->dead = 0;
+
+    p->mainloop = m;
+    p->mainloop_source = mainloop_source_new_prepare(m, prepare_callback, p);
+    mainloop_source_enable(p->mainloop_source, 0);
+    
+    p->send_queue = queue_new();
+    assert(p->send_queue);
+
+    p->write.current = NULL;
+    p->write.index = 0;
+
+    p->read.memblock = NULL;
+    p->read.packet = NULL;
+    p->read.index = 0;
+
+    p->send_callback = NULL;
+    p->send_callback_userdata = NULL;
+
+    p->recieve_packet_callback = NULL;
+    p->recieve_packet_callback_userdata = NULL;
+    
+    p->recieve_memblock_callback = NULL;
+    p->recieve_memblock_callback_userdata = NULL;
+
+    return p;
+}
+
+static void item_free(void *item, void *p) {
+    struct item_info *i = item;
+    assert(i);
+
+    if (i->type == PSTREAM_ITEM_PACKET) {
+        assert(i->chunk.memblock);
+        memblock_unref(i->chunk.memblock);
+    } else {
+        assert(i->type == PSTREAM_ITEM_MEMBLOCK);
+        assert(i->packet);
+        packet_unref(i->packet);
+    }
+
+    free(i);
+}
+
+void pstream_free(struct pstream *p) {
+    assert(p);
+
+    iochannel_free(p->io);
+    queue_free(p->send_queue, item_free, NULL);
+
+    if (p->write.current)
+        item_free(p->write.current, NULL);
+
+    if (p->read.memblock)
+        memblock_unref(p->read.memblock);
+    
+    if (p->read.packet)
+        packet_unref(p->read.packet);
+
+    mainloop_source_free(p->mainloop_source);
+    free(p);
+}
+
+void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata) {
+    assert(p && callback);
+
+    p->send_callback = callback;
+    p->send_callback_userdata = userdata;
+}
+
+void pstream_send_packet(struct pstream*p, struct packet *packet) {
+    struct item_info *i;
+    assert(p && packet);
+
+    i = malloc(sizeof(struct item_info));
+    assert(i);
+    i->type = PSTREAM_ITEM_PACKET;
+    i->packet = packet;
+
+    queue_push(p->send_queue, i);
+    mainloop_source_enable(p->mainloop_source, 1);
+}
+
+void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
+    struct item_info *i;
+    assert(p && channel && chunk);
+    
+    i = malloc(sizeof(struct item_info));
+    assert(i);
+    i->type = PSTREAM_ITEM_MEMBLOCK;
+    i->chunk = *chunk;
+    i->channel = channel;
+    i->delta = delta;
+
+    queue_push(p->send_queue, i);
+    mainloop_source_enable(p->mainloop_source, 1);
+}
+
+void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
+    assert(p && callback);
+
+    p->recieve_packet_callback = callback;
+    p->recieve_packet_callback_userdata = userdata;
+}
+
+void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata) {
+    assert(p && callback);
+
+    p->recieve_memblock_callback = callback;
+    p->recieve_memblock_callback_userdata = userdata;
+}
+
+static void prepare_next_write_item(struct pstream *p) {
+    assert(p);
+
+    if (!(p->write.current = queue_pop(p->send_queue)))
+        return;
+    
+    p->write.index = 0;
+    
+    if (p->write.current->type == PSTREAM_ITEM_PACKET) {
+        assert(p->write.current->packet);
+        p->write.data = p->write.current->packet->data;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->packet->length;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
+    } else {
+        assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
+        p->write.data = p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->chunk.length;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = p->write.current->channel;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = p->write.current->delta;
+    }
+}
+
+static void do_write(struct pstream *p) {
+    void *d;
+    size_t l;
+    ssize_t r;
+    assert(p);
+
+    mainloop_source_enable(p->mainloop_source, 0);
+
+    if (p->dead || !iochannel_is_writable(p->io))
+        return;
+    
+    if (!p->write.current)
+        prepare_next_write_item(p);
+
+    if (!p->write.current)
+        return;
+
+    assert(p->write.data);
+
+    if (p->write.index < PSTREAM_DESCRIPTOR_SIZE) {
+        d = (void*) p->write.descriptor + p->write.index;
+        l = PSTREAM_DESCRIPTOR_SIZE - p->write.index;
+    } else {
+        d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE;
+        l = p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->write.index - PSTREAM_DESCRIPTOR_SIZE;
+    }
+
+    if ((r = iochannel_write(p->io, d, l)) < 0) {
+        p->dead = 1;
+        return;
+    }
+
+    p->write.index += r;
+
+    if (p->write.index >= PSTREAM_DESCRIPTOR_SIZE+p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) {
+        assert(p->write.current);
+        item_free(p->write.current, (void *) 1);
+        p->write.current = NULL;
+
+        if (p->send_callback && queue_is_empty(p->send_queue))
+            p->send_callback(p, p->send_callback_userdata);
+    }
+}
+
+static void do_read(struct pstream *p) {
+    void *d;
+    size_t l;
+    ssize_t r;
+    assert(p);
+
+    mainloop_source_enable(p->mainloop_source, 0);
+    
+    if (p->dead || !iochannel_is_readable(p->io))
+        return;
+
+    if (p->read.index < PSTREAM_DESCRIPTOR_SIZE) {
+        d = (void*) p->read.descriptor + p->read.index;
+        l = PSTREAM_DESCRIPTOR_SIZE - p->read.index;
+    } else {
+        assert(p->read.data);
+        d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE;
+        l = p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->read.index - PSTREAM_DESCRIPTOR_SIZE;
+    }
+
+    if ((r = iochannel_read(p->io, d, l)) <= 0) {
+        p->dead = 1;
+        return;
+    }
+
+    p->read.index += r;
+
+    if (p->read.index == PSTREAM_DESCRIPTOR_SIZE) {
+        /* Reading of frame descriptor complete */
+
+        /* Frame size too large */
+        if (p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] > FRAME_SIZE_MAX) {
+            p->dead = 1;
+            return;
+        }
+        
+        assert(!p->read.packet && !p->read.memblock);
+
+        if (p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] == 0) {
+            /* Frame is a packet frame */
+            p->read.packet = packet_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]);
+            assert(p->read.packet);
+            p->read.data = p->read.packet->data;
+        } else {
+            /* Frame is a memblock frame */
+            p->read.memblock = memblock_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]);
+            assert(p->read.memblock);
+            p->read.data = p->read.memblock->data;
+        }
+            
+    } else if (p->read.index > PSTREAM_DESCRIPTOR_SIZE) {
+        /* Frame payload available */
+        
+        if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblockd data? Than pass it to the user */
+            size_t l;
+
+            l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
+                
+            if (l > 0) {
+                struct memchunk chunk;
+                
+                chunk.memblock = p->read.memblock;
+                chunk.index = p->read.index - PSTREAM_DESCRIPTOR_SIZE - l;
+                chunk.length = l;
+                
+                p->recieve_memblock_callback(p, p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL], (int32_t) p->read.descriptor[PSTREAM_DESCRIPTOR_DELTA], &chunk, p->recieve_memblock_callback_userdata);
+            }
+        }
+
+        /* Frame complete */
+        if (p->read.index >= p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] + PSTREAM_DESCRIPTOR_SIZE) {
+            if (p->read.memblock) {
+                assert(!p->read.packet);
+                
+                memblock_unref(p->read.memblock);
+                p->read.memblock = NULL;
+            } else {
+                assert(p->read.packet);
+
+                if (p->recieve_packet_callback)
+                    p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
+
+                packet_unref(p->read.packet);
+                p->read.packet = NULL;
+            }
+
+            p->read.index = 0;
+        }
+    }
+}
diff --git a/src/pstream.h b/src/pstream.h
new file mode 100644 (file)
index 0000000..c0b5749
--- /dev/null
@@ -0,0 +1,22 @@
+#ifndef foopstreamhfoo
+#define foopstreamhfoo
+
+#include <inttypes.h>
+
+#include "packet.h"
+#include "memblock.h"
+#include "iochannel.h"
+
+struct pstream;
+
+struct pstream* pstream_new(struct mainloop *m, struct iochannel *io);
+void pstream_free(struct pstream*p);
+
+void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata);
+void pstream_send_packet(struct pstream*p, struct packet *packet);
+void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk);
+
+void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata);
+void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata);
+
+#endif
diff --git a/src/queue.c b/src/queue.c
new file mode 100644 (file)
index 0000000..90823ae
--- /dev/null
@@ -0,0 +1,77 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "queue.h"
+
+struct queue_entry {
+    struct queue_entry *next;
+    void *data;
+};
+
+struct queue {
+    struct queue_entry *front, *back;
+    unsigned length;
+};
+
+struct queue* queue_new(void) {
+    struct queue *q = malloc(sizeof(struct queue));
+    assert(q);
+    q->front = q->back = NULL;
+    q->length = 0;
+    return q;
+}
+
+void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata) {
+    struct queue_entry *e;
+    assert(q);
+
+    e = q->front;
+    while (e) {
+        struct queue_entry *n = e->next;
+
+        if (destroy)
+            destroy(e->data, userdata);
+
+        free(e);
+        e = n;
+    }
+
+    free(q);
+}
+
+void queue_push(struct queue *q, void *p) {
+    struct queue_entry *e;
+
+    e = malloc(sizeof(struct queue_entry));
+
+    e->data = p;
+    e->next = NULL;
+
+    if (q->back)
+        q->back->next = e;
+    else {
+        assert(!q->front);
+        q->front = e;
+    }
+
+    q->back = e;
+    q->length++;
+}
+
+void* queue_pop(struct queue *q) {
+    void *p;
+    struct queue_entry *e;
+    assert(q);
+
+    if (!(e = q->front))
+        return NULL;
+
+    q->front = e->next;
+    if (q->back == e)
+        q->back = NULL;
+
+    p = e->data;
+    free(e);
+
+    return p;
+}
diff --git a/src/queue.h b/src/queue.h
new file mode 100644 (file)
index 0000000..6b371a8
--- /dev/null
@@ -0,0 +1,13 @@
+#ifndef fooqueuehfoo
+#define fooqueuehfoo
+
+struct queue;
+
+struct queue* queue_new(void);
+void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata);
+void queue_push(struct queue *q, void *p);
+void* queue_pop(struct queue *q);
+
+int queue_is_empty(struct queue *q);
+
+#endif
diff --git a/src/sample.c b/src/sample.c
new file mode 100644 (file)
index 0000000..74a5493
--- /dev/null
@@ -0,0 +1,80 @@
+#include <string.h>
+#include <assert.h>
+
+#include "sample.h"
+
+struct sample_spec default_sample_spec = {
+    .format = SAMPLE_S16NE,
+    .rate = 44100,
+    .channels = 2
+};
+
+struct memblock *silence(struct memblock* b, struct sample_spec *spec) {
+    char c;
+    assert(b && spec);
+    memblock_assert_exclusive(b);
+
+    switch (spec->format) {
+        case SAMPLE_U8:
+            c = 127;
+            break;
+        case SAMPLE_S16LE:
+        case SAMPLE_S16BE:
+        case SAMPLE_FLOAT32:
+            c = 0;
+            break;
+        case SAMPLE_ALAW:
+        case SAMPLE_ULAW:
+            c = 80;
+            break;
+    }
+                
+    memset(b->data, c, b->length);
+    return b;
+}
+
+void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec) {
+    int16_t *p, *d;
+    size_t i;
+    assert(target && target->memblock && chunk && chunk->memblock && spec);
+    assert(spec->format == SAMPLE_S16NE);
+    assert((target->length & 1) == 0);
+    
+    d = target->memblock->data + target->index;
+    p = chunk->memblock->data + chunk->index;
+
+    for (i = 0; i < target->length && i < chunk->length; i++) {
+        int32_t r = (int32_t) *d + (int32_t) *p;
+        if (r < -0x8000) r = 0x8000;
+        if (r > 0x7FFF) r = 0x7FFF;
+        *d = (int16_t) r;
+    }
+}
+
+size_t sample_size(struct sample_spec *spec) {
+    assert(spec);
+    size_t b;
+
+    switch (spec->format) {
+        case SAMPLE_U8:
+        case SAMPLE_ULAW:
+        case SAMPLE_ALAW:
+            b = 1;
+            break;
+        case SAMPLE_S16LE:
+        case SAMPLE_S16BE:
+            b = 2;
+            break;
+        case SAMPLE_FLOAT32:
+            b = 4;
+            break;
+    }
+
+    return b * spec->channels;
+}
+
+size_t bytes_per_second(struct sample_spec *spec) {
+    assert(spec);
+    return spec->rate*sample_size(spec);
+}
+
diff --git a/src/sample.h b/src/sample.h
new file mode 100644 (file)
index 0000000..ecbe33f
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef foosamplehfoo
+#define foosamplehfoo
+
+#include <inttypes.h>
+
+#include "memblock.h"
+
+enum sample_format {
+    SAMPLE_U8,
+    SAMPLE_ALAW,
+    SAMPLE_ULAW,
+    SAMPLE_S16LE,
+    SAMPLE_S16BE,
+    SAMPLE_FLOAT32
+};
+
+#define SAMPLE_S16NE SAMPLE_S16LE
+
+struct sample_spec {
+    enum sample_format format;
+    uint32_t rate;
+    uint32_t channels;
+};
+
+#define DEFAULT_SAMPLE_SPEC default_sample_spec
+
+extern struct sample_spec default_sample_spec;
+
+struct memblock *silence(struct memblock* b, struct sample_spec *spec);
+void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec);
+
+size_t bytes_per_second(struct sample_spec *spec);
+size_t sample_size(struct sample_spec *spec);
+
+#endif
diff --git a/src/sink-pipe.c b/src/sink-pipe.c
new file mode 100644 (file)
index 0000000..4a8348f
--- /dev/null
@@ -0,0 +1,155 @@
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <limits.h>
+
+#include "iochannel.h"
+#include "sink.h"
+#include "module.h"
+
+struct userdata {
+    struct sink *sink;
+    struct iochannel *io;
+    struct core *core;
+    struct mainloop_source *mainloop_source;
+
+    struct memchunk memchunk;
+};
+
+static void do_write(struct userdata *u) {
+    ssize_t r;
+    assert(u);
+
+    mainloop_source_enable(u->mainloop_source, 0);
+        
+    if (!iochannel_is_writable(u->io))
+        return;
+
+    if (!u->memchunk.length)
+        if (sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0)
+            return;
+
+    assert(u->memchunk.memblock && u->memchunk.length);
+    
+    if ((r = iochannel_write(u->io, u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) {
+        fprintf(stderr, "write() failed: %s\n", strerror(errno));
+        return;
+    }
+
+    u->memchunk.index += r;
+    u->memchunk.length -= r;
+        
+    if (u->memchunk.length <= 0) {
+        memblock_unref(u->memchunk.memblock);
+        u->memchunk.memblock = NULL;
+    }
+}
+
+static void notify_callback(struct sink*s, void *userdata) {
+    struct userdata *u = userdata;
+    assert(u);
+
+    if (iochannel_is_writable(u->io))
+        mainloop_source_enable(u->mainloop_source, 1);
+}
+
+static void prepare_callback(struct mainloop_source *src, void *userdata) {
+    struct userdata *u = userdata;
+    assert(u);
+    do_write(u);
+}
+
+static void io_callback(struct iochannel *io, void*userdata) {
+    struct userdata *u = userdata;
+    assert(u);
+    do_write(u);
+}
+
+int module_init(struct core *c, struct module*m) {
+    struct userdata *u = NULL;
+    struct stat st;
+    struct sink *sink;
+    char *p;
+    int fd = -1;
+    const static struct sample_spec ss = {
+        .format = SAMPLE_S16NE,
+        .rate = 44100,
+        .channels = 2,
+    };
+    assert(c && m);
+
+    mkfifo((p = m->argument ? m->argument : "/tmp/musicfifo"), 0777);
+
+    if ((fd = open(p, O_RDWR) < 0)) {
+        fprintf(stderr, "open('%s'): %s\n", p, strerror(errno));
+        goto fail;
+    }
+
+    if (fstat(fd, &st) < 0) {
+        fprintf(stderr, "fstat('%s'): %s\n", p, strerror(errno));
+        goto fail;
+    }
+
+    if (!S_ISFIFO(st.st_mode)) {
+        fprintf(stderr, "'%s' is not a FIFO\n", p);
+        goto fail;
+    }
+
+    if (!(sink = sink_new(c, "fifo", &ss))) {
+        fprintf(stderr, "Failed to allocate new sink!\n");
+        goto fail;
+    }
+    
+    u = malloc(sizeof(struct userdata));
+    assert(u);
+
+    u->core = c;
+    u->sink = sink;
+    sink_set_notify_callback(sink, notify_callback, u);
+
+    u->io = iochannel_new(c->mainloop, -1, fd);
+    assert(u->io);
+    iochannel_set_callback(u->io, io_callback, u);
+
+    u->memchunk.memblock = NULL;
+    u->memchunk.length = 0;
+
+    u->mainloop_source = mainloop_source_new_prepare(c->mainloop, prepare_callback, u);
+    assert(u->mainloop_source);
+    mainloop_source_enable(u->mainloop_source, 0);
+    
+    m->userdata = u;
+
+
+    return 0;
+
+fail:
+    if (fd >= 0)
+        close(fd);
+
+    if (u)
+        free(u);
+    
+    return -1;
+}
+
+void module_done(struct core *c, struct module*m) {
+    struct userdata *u;
+    assert(c && m);
+
+    u = m->userdata;
+    assert(u);
+    
+    if (u->memchunk.memblock)
+        memblock_unref(u->memchunk.memblock);
+        
+    sink_free(u->sink);
+    iochannel_free(u->io);
+    mainloop_source_free(u->mainloop_source);
+    free(u);
+}
diff --git a/src/sink.c b/src/sink.c
new file mode 100644 (file)
index 0000000..ac387c7
--- /dev/null
@@ -0,0 +1,217 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "sink.h"
+#include "inputstream.h"
+
+struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) {
+    struct sink *s;
+    char *n = NULL;
+    int r;
+    assert(core && spec);
+
+    s = malloc(sizeof(struct sink));
+    assert(s);
+    
+    s->name = name ? strdup(name) : NULL;
+    r = idxset_put(core->sinks, s, &s->index);
+    assert(s->index != IDXSET_INVALID && r >= 0);
+
+    s->core = core;
+    s->sample_spec = *spec;
+    s->input_streams = idxset_new(NULL, NULL);
+
+    if (name) {
+        n = malloc(strlen(name)+9);
+        sprintf(n, "%s_monitor", name);
+    }
+    
+    s->monitor_source = source_new(core, n, spec);
+    s->volume = 0xFF;
+
+    s->notify_callback = NULL;
+    s->userdata = NULL;
+
+    return s;
+}
+
+void sink_free(struct sink *s) {
+    struct input_stream *i;
+    assert(s);
+
+    idxset_remove_by_data(s->core->sinks, s, NULL);
+    source_free(s->monitor_source);
+
+    while ((i = idxset_rrobin(s->input_streams, NULL)))
+        input_stream_free(i);
+        
+    free(s->name);
+    free(s);
+}
+
+struct pass1_info {
+    size_t maxlength;
+    unsigned count;
+    struct input_stream *last_input_stream;
+};
+
+static int get_max_length(void *p, uint32_t index, int *del, void*userdata) {
+    struct memchunk chunk;
+    struct pass1_info *info = userdata;
+    struct input_stream*i = p;
+    assert(info && i);
+
+    if (memblockq_peek(i->memblockq, &chunk) != 0)
+        return 0;
+
+    assert(chunk.length);
+    
+    if (info->maxlength > chunk.length)
+        info->maxlength = chunk.length;
+
+    info->count++;
+    info->last_input_stream = i;
+
+    return 0;
+}
+
+struct pass2_info {
+    struct memchunk *chunk;
+    struct sample_spec *spec;
+};
+
+static int do_mix(void *p, uint32_t index, int *del, void*userdata) {
+    struct memchunk chunk;
+    struct pass2_info *info = userdata;
+    struct input_stream*i = p;
+    assert(info && info->chunk && info->chunk->memblock && i && info->spec);
+    
+    if (memblockq_peek(i->memblockq, &chunk) != 0)
+        return 0;
+
+    memblock_assert_exclusive(info->chunk->memblock);
+    assert(chunk.length && chunk.length <= info->chunk->memblock->length - info->chunk->index);
+
+    add_clip(info->chunk, &chunk, info->spec);
+    return 0;
+}
+
+int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result) {
+    struct pass1_info pass1_info;
+    struct pass2_info pass2_info;
+    assert(s && target && result);
+    memblock_assert_exclusive(target);
+
+    /* Calculate how many bytes to mix */
+    pass1_info.maxlength = target->length;
+    pass1_info.count = 0;
+    
+    idxset_foreach(s->input_streams, get_max_length, &pass1_info);
+    assert(pass1_info.maxlength);
+
+    /* No data to mix */
+    if (pass1_info.count == 0)
+        return -1;
+    
+    /* A shortcut if only a single input stream is connected */
+    if (pass1_info.count == 1) {
+        struct input_stream *i = pass1_info.last_input_stream;
+        struct memchunk chunk;
+        size_t l;
+
+        assert(i);
+        
+        if (memblockq_peek(i->memblockq, &chunk) != 0)
+            return -1;
+
+        l = target->length < chunk.length ? target->length : chunk.length;
+        memcpy(target->data, result->memblock+result->index, l);
+        target->length = l;
+        memblock_unref(chunk.memblock);
+        memblockq_drop(i->memblockq, l);
+        
+        result->memblock = target;
+        result->length = l;
+        result->index = 0;
+        return 0;
+    }
+
+    /* Do the real mixing */
+    result->memblock = silence(target, &s->sample_spec);
+    result->index = 0;
+    result->length = pass1_info.maxlength;
+    pass2_info.chunk = result;
+    pass2_info.spec = &s->sample_spec;
+    idxset_foreach(s->input_streams, do_mix, &pass2_info);
+
+    assert(s->monitor_source);
+    source_post(s->monitor_source, result);
+    
+    return 0;
+}
+
+int sink_render(struct sink*s, size_t length, struct memchunk *result) {
+    struct pass1_info pass1_info;
+    struct pass2_info pass2_info;
+    assert(s && result);
+
+    if (!length)
+        length = (size_t) -1;
+    
+    /* Calculate how many bytes to mix */
+    pass1_info.maxlength = length;
+    pass1_info.count = 0;
+    
+    idxset_foreach(s->input_streams, get_max_length, &pass1_info);
+    assert(pass1_info.maxlength);
+
+    /* No data to mix */
+    if (pass1_info.count == 0)
+        return -1;
+
+    if (pass1_info.count == 1) {
+        struct input_stream *i = pass1_info.last_input_stream;
+        size_t l;
+
+        assert(i);
+
+        if (memblockq_peek(i->memblockq, result) != 0)
+            return -1;
+
+        l = length < result->length ? length : result->length;
+        result->length = l;
+        memblockq_drop(i->memblockq, l);
+        return 0;
+    }
+
+    /* Do the mixing */
+    result->memblock = silence(memblock_new(result->length), &s->sample_spec);
+    result->index = 0;
+    result->length = pass1_info.maxlength;
+    pass2_info.chunk = result;
+    pass2_info.spec = &s->sample_spec;
+    idxset_foreach(s->input_streams, do_mix, &pass2_info);
+
+    assert(s->monitor_source);
+
+    source_post(s->monitor_source, result);
+    return 0;
+}
+
+void sink_notify(struct sink*s) {
+    assert(s);
+
+    if (s->notify_callback)
+        s->notify_callback(s, s->userdata);
+}
+
+void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata) {
+    assert(s && notify_callback);
+
+    s->notify_callback = notify_callback;
+    s->userdata = userdata;
+}
+
+
diff --git a/src/sink.h b/src/sink.h
new file mode 100644 (file)
index 0000000..a6f9800
--- /dev/null
@@ -0,0 +1,38 @@
+#ifndef foosinkhfoo
+#define foosinkhfoo
+
+struct sink;
+
+#include <inttypes.h>
+
+#include "core.h"
+#include "sample.h"
+#include "idxset.h"
+#include "source.h"
+
+struct sink {
+    char *name;
+    uint32_t index;
+    
+    struct core *core;
+    struct sample_spec sample_spec;
+    struct idxset *input_streams;
+
+    struct source *monitor_source;
+
+    uint8_t volume;
+
+    void (*notify_callback)(struct sink*sink, void *userdata);
+    void *userdata;
+};
+
+struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec);
+void sink_free(struct sink* s);
+
+int sink_render(struct sink*s, size_t length, struct memchunk *result);
+int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result);
+
+void sink_notify(struct sink*s);
+void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata);
+
+#endif
diff --git a/src/socket-server.c b/src/socket-server.c
new file mode 100644 (file)
index 0000000..2a1db9a
--- /dev/null
@@ -0,0 +1,157 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "socket-server.h"
+
+struct socket_server {
+    int fd;
+    char *filename;
+
+    void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);
+    void *userdata;
+
+    struct mainloop_source *mainloop_source;
+};
+
+static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) {
+    struct socket_server *s = userdata;
+    struct iochannel *io;
+    int nfd;
+    assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s);
+
+    if ((nfd = accept(fd, NULL, NULL)) < 0) {
+        fprintf(stderr, "accept(): %s\n", strerror(errno));
+        return;
+    }
+
+    if (!s->on_connection) {
+        close(nfd);
+        return;
+    }
+
+    io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd);
+    assert(io);
+    s->on_connection(s, io, s->userdata);
+}
+
+struct socket_server* socket_server_new(struct mainloop *m, int fd) {
+    struct socket_server *s;
+    assert(m && fd >= 0);
+    
+    s = malloc(sizeof(struct socket_server));
+    assert(s);
+    s->fd = fd;
+    s->filename = NULL;
+    s->on_connection = NULL;
+    s->userdata = NULL;
+
+    s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s);
+    assert(s->mainloop_source);
+    
+    return s;
+}
+
+struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) {
+    int fd = -1;
+    struct sockaddr_un sa;
+    struct socket_server *s;
+    
+    assert(m && filename);
+
+    if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) {
+        fprintf(stderr, "socket(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    sa.sun_family = AF_LOCAL;
+    strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1);
+    sa.sun_path[sizeof(sa.sun_path) - 1] = 0;
+
+    if (bind(fd, (struct sockaddr*) &sa, SUN_LEN(&sa)) < 0) {
+        fprintf(stderr, "bind(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    if (listen(fd, 5) < 0) {
+        fprintf(stderr, "listen(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    s = socket_server_new(m, fd);
+    assert(s);
+
+    s->filename = strdup(filename);
+    assert(s->filename);
+
+    return s;
+                                                                                                                                                                         
+fail:
+    if (fd >= 0)
+        close(fd);
+
+    return NULL;
+}
+
+struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) {
+    int fd = -1;
+    struct sockaddr_in sa;
+
+    assert(m && port);
+
+    if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+        fprintf(stderr, "socket(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    sa.sin_family = AF_INET;
+    sa.sin_port = htons(port);
+    sa.sin_addr.s_addr = htonl(address);
+    
+    if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
+        fprintf(stderr, "bind(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    if (listen(fd, 5) < 0) {
+        fprintf(stderr, "listen(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    return socket_server_new(m, fd);
+    
+fail:
+    if (fd >= 0)
+        close(fd);
+
+    return NULL;
+}
+
+void socket_server_free(struct socket_server*s) {
+    assert(s);
+    close(s->fd);
+
+    if (s->filename) {
+        unlink(s->filename);
+        free(s->filename);
+    }
+
+    mainloop_source_free(s->mainloop_source);
+    
+    free(s);
+}
+
+void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata) {
+    assert(s);
+
+    s->on_connection = on_connection;
+    s->userdata = userdata;
+}
diff --git a/src/socket-server.h b/src/socket-server.h
new file mode 100644 (file)
index 0000000..4814fc6
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef foosocketserverhfoo
+#define foosocketserverhfoo
+
+#include <inttypes.h>
+#include "mainloop.h"
+#include "iochannel.h"
+
+struct socket_server;
+
+struct socket_server* socket_server_new(struct mainloop *m, int fd);
+struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename);
+struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port);
+
+void socket_server_free(struct socket_server*s);
+
+void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata);
+
+#endif
diff --git a/src/source.c b/src/source.c
new file mode 100644 (file)
index 0000000..2f34c46
--- /dev/null
@@ -0,0 +1,58 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "source.h"
+#include "outputstream.h"
+
+struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) {
+    struct source *s;
+    int r;
+    assert(core && spec);
+
+    s = malloc(sizeof(struct source));
+    assert(s);
+
+    s->name = name ? strdup(name) : NULL;
+    r = idxset_put(core->sources, s, &s->index);
+    assert(s->index != IDXSET_INVALID && r >= 0);
+
+    s->core = core;
+    s->sample_spec = *spec;
+    s->output_streams = idxset_new(NULL, NULL);
+
+    s->link_change_callback = NULL;
+    s->userdata = NULL;
+
+    return s;
+}
+
+static void do_free(void *p, void *userdata) {
+    struct output_stream *o = p;
+    assert(o);
+    output_stream_free(o);
+};
+
+void source_free(struct source *s) {
+    assert(s);
+
+    idxset_remove_by_data(s->core->sources, s, NULL);
+    idxset_free(s->output_streams, do_free, NULL);
+    free(s->name);
+    free(s);
+}
+
+static int do_post(void *p, uint32_t index, int *del, void*userdata) {
+    struct memchunk *chunk = userdata;
+    struct output_stream *o = p;
+    assert(o && o->memblockq && index && del && chunk);
+
+    memblockq_push(o->memblockq, chunk, 0);
+    return 0;
+}
+
+void source_post(struct source*s, struct memchunk *chunk) {
+    assert(s && chunk);
+
+    idxset_foreach(s->output_streams, do_post, chunk);
+}
diff --git a/src/source.h b/src/source.h
new file mode 100644 (file)
index 0000000..3beb3f9
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef foosourcehfoo
+#define foosourcehfoo
+
+struct source;
+
+#include <inttypes.h>
+#include "core.h"
+#include "sample.h"
+#include "idxset.h"
+#include "memblock.h"
+
+struct source {
+    char *name;
+    uint32_t index;
+    
+    struct core *core;
+    struct sample_spec sample_spec;
+    struct idxset *output_streams;
+
+    void (*link_change_callback)(struct source*source, void *userdata);
+    void *userdata;
+};
+
+struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec);
+void source_free(struct source *s);
+
+/* Pass a new memory block to all output streams */
+void source_post(struct source*s, struct memchunk *b);
+
+#endif
diff --git a/src/strbuf.c b/src/strbuf.c
new file mode 100644 (file)
index 0000000..7c8b965
--- /dev/null
@@ -0,0 +1,122 @@
+#ifndef foostrbufhfoo
+#define foostrbufhfoo
+
+#include <sys/types.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+struct chunk {
+    struct chunk *next;
+    char text[];
+};
+
+struct strbuf {
+    size_t length;
+    struct chunk *head, *tail;
+};
+
+struct strbuf *strbuf_new(void) {
+    struct strbuf *sb = malloc(sizeof(struct strbuf));
+    assert(sb);
+    sb->length = 0;
+    sb->head = sb->tail = NULL;
+    return sb;
+}
+
+void strbuf_free(struct strbuf *sb) {
+    assert(sb);
+    while (sb->head) {
+        struct chunk *c = sb->head;
+        sb->head = sb->head->next;
+        free(c);
+    }
+
+    free(sb);
+}
+
+char *strbuf_tostring(struct strbuf *sb) {
+    char *t, *e;
+    struct chunk *c;
+    assert(sb);
+
+    t = malloc(sb->length+1);
+    assert(t);
+
+    e = t;
+    *e = 0;
+    for (c = sb->head; c; c = c->next) {
+        strcpy(e, c->text);
+        e = strchr(e, 0);
+    }
+
+    return t;
+}
+
+void strbuf_puts(struct strbuf *sb, const char *t) {
+    struct chunk *c;
+    size_t l;
+    assert(sb && t);
+
+    l = strlen(t);
+    c = malloc(sizeof(struct chunk)+l);
+    assert(c);
+
+    c->next = NULL;
+    strcpy(c->text, t);
+
+    if (sb->tail) {
+        assert(sb->head);
+        sb->tail->next = c;
+    } else {
+        assert(!sb->head);
+        sb->head = c;
+    }
+
+    sb->tail = c;
+    sb->length += l;
+}
+
+int strbuf_printf(struct strbuf *sb, const char *format, ...) {
+    int r, size = 100;
+    struct chunk *c = NULL;
+
+    assert(sb);
+    
+    for(;;) {
+        va_list ap;
+
+        c = realloc(c, sizeof(struct chunk)+size);
+        assert(c);
+
+        va_start(ap, format);
+        r = vsnprintf(c->text, size, format, ap);
+        va_end(ap);
+        
+        if (r > -1 && r < size) {
+            c->next = NULL;
+            
+            if (sb->tail) {
+                assert(sb->head);
+                sb->tail->next = c;
+            } else {
+                assert(!sb->head);
+                sb->head = c;
+            }
+            
+            sb->tail = c;
+            sb->length += r;
+            
+            return r;
+        }
+
+        if (r > -1)    /* glibc 2.1 */
+            size = r+1; 
+        else           /* glibc 2.0 */
+            size *= 2;
+    }
+}
+
+#endif
diff --git a/src/strbuf.h b/src/strbuf.h
new file mode 100644 (file)
index 0000000..6ad582a
--- /dev/null
@@ -0,0 +1,13 @@
+#ifndef foostrbufhfoo
+#define foostrbufhfoo
+
+struct strbuf;
+
+struct strbuf *strbuf_new(void);
+void strbuf_free(struct strbuf *sb);
+char *strbuf_tostring(struct strbuf *sb);
+
+int strbuf_printf(struct strbuf *sb, const char *format, ...);
+void strbuf_puts(struct strbuf *sb, const char *t);
+
+#endif