c->protocol_name = protocol_name;
c->name = name ? strdup(name) : NULL;
c->kill = NULL;
- c->userdata = NULL;
+ c->kill_userdata = NULL;
c->core = core;
r = idxset_put(core->clients, c, &c->index);
free(c->name);
free(c);
}
+
+void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata) {
+ assert(c && kill);
+ c->kill = kill;
+ c->kill_userdata = userdata;
+}
+
+void client_kill(struct client *c) {
+ assert(c);
+ c->kill(c, c->kill_userdata);
+}
+
const char *protocol_name;
- void *userdata;
- void (*kill)(struct client *c);
+ void *kill_userdata;
+ void (*kill)(struct client *c, void *userdata);
struct core *core;
};
struct client *client_new(struct core *c, const char *protocol_name, char *name);
+
+/* This function should be called only by the code that created the client */
void client_free(struct client *c);
+/* The registrant of the client should call this function to set a
+ * callback function which is called when destruction of the client is
+ * requested */
+void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata);
+
+/* Code that didn't create the client should call this function to
+ * request destruction of the client */
+void client_kill(struct client *c);
+
#endif
if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index)))
return sink;
- if (!(sink = idxset_rrobin(c->sinks, NULL)))
+ if (!(sink = idxset_first(c->sinks, &c->default_sink_index)))
return NULL;
fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index);
- c->default_sink_index = sink->index;
return sink;
}
if ((source = idxset_get_by_index(c->sources, c->default_source_index)))
return source;
- if (!(source = idxset_rrobin(c->sources, NULL)))
+ if (!(source = idxset_first(c->sources, &c->default_source_index)))
return NULL;
fprintf(stderr, "Default source vanished, setting to %u\n", source->index);
- c->default_source_index = source->index;
return source;
}
+#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
int (*compare_func)(void *a, void *b);
unsigned hash_table_size, n_entries;
- struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin;
+ struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail;
uint32_t index, start_index, array_size;
};
s->index = 0;
s->start_index = 0;
s->n_entries = 0;
- s->rrobin = NULL;
s->iterate_list_head = s->iterate_list_tail = NULL;
static void extend_array(struct idxset *s, uint32_t index) {
uint32_t i, j, l;
struct idxset_entry** n;
- assert(index >= s->start_index );
+ assert(index >= s->start_index);
- if (index <= s->start_index + s->array_size)
+ if (index < s->start_index + s->array_size)
return;
for (i = 0; i < s->array_size; i++)
}
static struct idxset_entry** array_index(struct idxset*s, uint32_t index) {
-
if (index >= s->start_index + s->array_size)
return NULL;
if (index < s->start_index)
return NULL;
-
+
return s->array + (index - s->start_index);
}
assert(s && e);
/* Remove from array */
- a = array_index(s, s->index);
- assert(a && *a == e);
+ a = array_index(s, e->index);
+ assert(a && *a && *a == e);
*a = NULL;
/* Remove from linked list */
else
s->hash_table[e->hash_value] = e->hash_next;
- if (s->rrobin == e)
- s->rrobin = NULL;
-
free(e);
assert(s->n_entries >= 1);
void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) {
struct idxset_entry *e;
unsigned h;
-
+
assert(s->hash_func);
h = s->hash_func(data) % s->hash_table_size;
}
void* idxset_rrobin(struct idxset *s, uint32_t *index) {
- assert(s);
+ struct idxset_entry **a, *e = NULL;
+ assert(s && index);
+
+ if ((a = array_index(s, *index)) && *a)
+ e = (*a)->iterate_next;
+
+ if (!e)
+ e = s->iterate_list_head;
- if (s->rrobin)
- s->rrobin = s->rrobin->iterate_next;
+ if (!e)
+ return NULL;
- if (!s->rrobin)
- s->rrobin = s->iterate_list_head;
+ if (index)
+ *index = e->index;
+
+ return e->data;
+}
+
+void* idxset_first(struct idxset *s, uint32_t *index) {
+ assert(s);
- if (!s->rrobin)
+ if (!s->iterate_list_head)
return NULL;
if (index)
- *index = s->rrobin->index;
-
- return s->rrobin->data;
+ *index = s->iterate_list_head->index;
+ return s->iterate_list_head->data;
}
+
int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) {
struct idxset_entry *e;
assert(s && func);
void* idxset_remove_by_index(struct idxset*s, uint32_t index);
void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index);
+/* This may be used to iterate through all entries. When called with
+ an invalid index value it returns the first entry, otherwise the
+ next following. The function is best called with *index =
+ IDXSET_VALID first. */
void* idxset_rrobin(struct idxset *s, uint32_t *index);
+/* Return the oldest entry in the idxset */
+void* idxset_first(struct idxset *s, uint32_t *index);
+
int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata);
unsigned idxset_ncontents(struct idxset*s);
i->name = name ? strdup(name) : NULL;
i->sink = s;
i->spec = *spec;
+ i->kill = NULL;
+ i->kill_userdata = NULL;
i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
assert(i->memblockq);
free(i);
}
-void input_stream_notify(struct input_stream *i) {
+void input_stream_notify_sink(struct input_stream *i) {
assert(i);
if (memblockq_is_empty(i->memblockq))
sink_notify(i->sink);
}
+
+void input_stream_set_kill_callback(struct input_stream *i, void (*kill)(struct input_stream*i, void *userdata), void *userdata) {
+ assert(i && kill);
+ i->kill = kill;
+ i->kill_userdata = userdata;
+}
+
+
+void input_stream_kill(struct input_stream*i) {
+ assert(i);
+
+ if (i->kill)
+ i->kill(i, i->kill_userdata);
+}
struct sample_spec spec;
struct memblockq *memblockq;
+
+ void (*kill)(struct input_stream* i, void *userdata);
+ void *kill_userdata;
};
struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name);
void input_stream_free(struct input_stream* i);
-void input_stream_notify(struct input_stream *i);
+/* This function notifies the attached sink that new data is available
+ * in the memblockq */
+void input_stream_notify_sink(struct input_stream *i);
+
+
+/* The registrant of the input stream should call this function to set a
+ * callback function which is called when destruction of the input stream is
+ * requested */
+void input_stream_set_kill_callback(struct input_stream *c, void (*kill)(struct input_stream*i, void *userdata), void *userdata);
+
+/* Code that didn't create the input stream should call this function to
+ * request destruction of it */
+void input_stream_kill(struct input_stream *c);
#endif
q = bq->blocks;
bq->blocks = bq->blocks->next;
+ if (bq->blocks == NULL)
+ bq->blocks_tail = NULL;
memblock_unref(q->chunk.memblock);
free(q);
o->name = name ? strdup(name) : NULL;
o->source = s;
o->spec = *spec;
+ o->kill = NULL;
+ o->kill_userdata = NULL;
o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
assert(o->memblockq);
free(o->name);
free(o);
}
+
+void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata) {
+ assert(i && kill);
+ i->kill = kill;
+ i->kill_userdata = userdata;
+}
+
+
+void output_stream_kill(struct output_stream*i) {
+ assert(i);
+
+ if (i->kill)
+ i->kill(i, i->kill_userdata);
+}
struct sample_spec spec;
struct memblockq *memblockq;
+ void (*kill)(struct output_stream* i, void *userdata);
+ void *kill_userdata;
};
struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name);
void output_stream_free(struct output_stream* o);
+void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata);
+void output_stream_kill(struct output_stream*i);
+
#endif
free(c);
}
+static void destroy_connection(struct connection *c) {
+ assert(c && c->protocol);
+ idxset_remove_by_data(c->protocol->connections, c, NULL);
+ free_connection(c, NULL);
+}
+
+static void istream_kill_cb(struct input_stream *i, void *userdata) {
+ struct connection *c = userdata;
+ assert(i && c);
+ destroy_connection(c);
+}
+
+static void ostream_kill_cb(struct output_stream *o, void *userdata) {
+ struct connection *c = userdata;
+ assert(o && c);
+ destroy_connection(c);
+}
+
+static void client_kill_cb(struct client *client, void*userdata) {
+ struct connection *c= userdata;
+ assert(client && c);
+ destroy_connection(c);
+}
+
static void io_callback(struct iochannel*io, void *userdata) {
struct connection *c = userdata;
assert(io && c);
chunk.index = 0;
memblockq_push(c->istream->memblockq, &chunk, 0);
- input_stream_notify(c->istream);
+ input_stream_notify_sink(c->istream);
memblock_unref(chunk.memblock);
}
return;
fail:
- idxset_remove_by_data(c->protocol->connections, c, NULL);
- free_connection(c, NULL);
+ destroy_connection(c);
}
static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
c->client = client_new(p->core, "SIMPLE", "Client");
assert(c->client);
+ client_set_kill_callback(c->client, client_kill_cb, c);
if (p->mode & PROTOCOL_SIMPLE_RECORD) {
struct source *source;
c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name);
assert(c->ostream);
+ output_stream_set_kill_callback(c->ostream, ostream_kill_cb, c);
}
if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name);
assert(c->istream);
+ input_stream_set_kill_callback(c->istream, istream_kill_cb, c);
}
}
void sink_free(struct sink *s) {
- struct input_stream *i;
+ struct input_stream *i, *j = NULL;
assert(s);
- while ((i = idxset_rrobin(s->input_streams, NULL)))
- input_stream_free(i);
+ while ((i = idxset_first(s->input_streams, NULL))) {
+ assert(i != j);
+ input_stream_kill(i);
+ j = i;
+ }
idxset_free(s->input_streams, NULL, NULL);
idxset_remove_by_data(s->core->sinks, s, NULL);
}
void source_free(struct source *s) {
- struct output_stream *o;
+ struct output_stream *o, *j = NULL;
assert(s);
- while ((o = idxset_rrobin(s->output_streams, NULL)))
+ while ((o = idxset_first(s->output_streams, NULL))) {
+ assert(o != j);
output_stream_free(o);
+ j = o;
+ }
idxset_free(s->output_streams, NULL, NULL);
idxset_remove_by_data(s->core->sources, s, NULL);