draining ind native protocol
authorLennart Poettering <lennart@poettering.net>
Wed, 7 Jul 2004 00:22:46 +0000 (00:22 +0000)
committerLennart Poettering <lennart@poettering.net>
Wed, 7 Jul 2004 00:22:46 +0000 (00:22 +0000)
fixes in callback code on object destruction
simple protocol

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@52 fefdeb5f-60dc-0310-8127-8f9354f1896f

38 files changed:
src/Makefile.am
src/client.c
src/core.c
src/iochannel.h
src/main.c
src/memblockq.c
src/memblockq.h
src/module-cli.c
src/module-oss-mmap.c
src/module-oss.c
src/module-pipe-sink.c
src/module-protocol-stub.c
src/module.c
src/module.h
src/pacat-simple.c
src/pacat.c
src/pdispatch.c
src/pdispatch.h
src/polyp.c
src/polyp.h
src/polypdef.h
src/protocol-esound.c
src/protocol-native-spec.h
src/protocol-native.c
src/protocol-simple.c
src/pstream.c
src/pstream.h
src/sample.h
src/simple.c
src/sink.c
src/sinkinput.c
src/socket-client.h
src/socket-server.h
src/source.c
src/sourceoutput.c
src/todo
src/util.c
src/util.h

index 70e7e09..d7002ca 100644 (file)
@@ -220,6 +220,7 @@ libpolyp_la_SOURCES = polyp.c polyp.h \
                protocol-native-spec.h \
                mainloop-api.c mainloop-api.h \
                mainloop.c mainloop.h \
+               mainloop-signal.c mainloop-signal.h \
                idxset.c idxset.h \
                util.c util.h \
                memblock.c memblock.h \
@@ -237,12 +238,13 @@ libpolyp_error_la_CFLAGS = $(AM_CFLAGS)
 
 libpolyp_simple_la_SOURCES = simple.c simple.h 
 libpolyp_simple_la_CFLAGS = $(AM_CFLAGS)
-libpolyp_simple_la_LIBADD = libpolyp.la #libpolyp-error.la
+libpolyp_simple_la_LIBADD = libpolyp.la
+#libpolyp-error.la
 
-pacat_SOURCES = pacat.c       #$(libpolyp_la_SOURCES)
-pacat_LDADD = libpolyp.la
+pacat_SOURCES = pacat.c $(libpolyp_la_SOURCES) $(libpolyp_error_la_SOURCES)
+#pacat_LDADD = libpolyp.la
 pacat_CFLAGS = $(AM_CFLAGS) 
 
-pacat_simple_SOURCES = pacat-simple.c
-pacat_simple_LDADD = libpolyp-simple.a
+pacat_simple_SOURCES = pacat-simple.c $(libpolyp_la_SOURCES) $(libpolyp_simple_la_SOURCES) $(libpolyp_error_la_SOURCES)
+#pacat_simple_LDADD = libpolyp-simple.la libpolyp-error.la
 pacat_simple_CFLAGS = $(AM_CFLAGS)
index 7f1648a..d07f188 100644 (file)
@@ -59,7 +59,7 @@ char *pa_client_list_to_string(struct pa_core *c) {
     pa_strbuf_printf(s, "%u client(s).\n", pa_idxset_ncontents(c->clients));
     
     for (client = pa_idxset_first(c->clients, &index); client; client = pa_idxset_next(c->clients, &index))
-        pa_strbuf_printf(s, "    index: %u, name: <%s>, protocol_name: <%s>\n", client->index, client->name, client->protocol_name);
+        pa_strbuf_printf(s, "    index: %u\n\tname: <%s>\n\tprotocol_name: <%s>\n", client->index, client->name, client->protocol_name);
     
     return pa_strbuf_tostring_free(s);
 }
index 4615903..a1fe7d9 100644 (file)
@@ -7,6 +7,7 @@
 #include "sink.h"
 #include "source.h"
 #include "namereg.h"
+#include "util.h"
 
 struct pa_core* pa_core_new(struct pa_mainloop_api *m) {
     struct pa_core* c;
@@ -24,6 +25,8 @@ struct pa_core* pa_core_new(struct pa_mainloop_api *m) {
 
     c->modules = NULL;
     c->namereg = NULL;
+
+    pa_check_for_sigpipe();
     
     return c;
 };
index c550af1..1a5057d 100644 (file)
@@ -4,6 +4,8 @@
 #include <sys/types.h>
 #include "mainloop-api.h"
 
+/* It is safe to destroy the calling iochannel object from the callback */
+
 struct pa_iochannel;
 
 struct pa_iochannel* pa_iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);
index 88552fe..c7a83fe 100644 (file)
 
 static struct pa_mainloop *mainloop;
 
-static void signal_callback(void *id, int sig, void *userdata) {
+static void exit_signal_callback(void *id, int sig, void *userdata) {
     struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop);
     m->quit(m, 1);
-    fprintf(stderr, "main: got signal.\n");
+    fprintf(stderr, __FILE__": got signal.\n");
+}
+
+static void aux_signal_callback(void *id, int sig, void *userdata) {
+    struct pa_core *c = userdata;
+    assert(c);
+    pa_module_load(c, sig == SIGUSR1 ? "module-cli" : "module-cli-protocol-unix", NULL);
 }
 
 int main(int argc, char *argv[]) {
@@ -30,12 +36,12 @@ int main(int argc, char *argv[]) {
 
     r = pa_signal_init(pa_mainloop_get_api(mainloop));
     assert(r == 0);
-    pa_signal_register(SIGINT, signal_callback, NULL);
+    pa_signal_register(SIGINT, exit_signal_callback, NULL);
     signal(SIGPIPE, SIG_IGN);
 
     c = pa_core_new(pa_mainloop_get_api(mainloop));
     assert(c);
-
+    
     pa_module_load(c, "module-oss", "/dev/dsp");
 /*    pa_module_load(c, "module-pipe-sink", NULL);*/
     pa_module_load(c, "module-simple-protocol-tcp", NULL);
@@ -46,6 +52,9 @@ int main(int argc, char *argv[]) {
     pa_module_load(c, "module-native-protocol-unix", NULL);
     pa_module_load(c, "module-esound-protocol-tcp", NULL);
     pa_module_load(c, "module-cli", NULL);
+
+    pa_signal_register(SIGUSR1, aux_signal_callback, c);
+    pa_signal_register(SIGUSR2, aux_signal_callback, c);
     
     fprintf(stderr, "main: mainloop entry.\n");
     if (pa_mainloop_run(mainloop, &retval) < 0)
index b70a67f..e5dab68 100644 (file)
@@ -15,30 +15,45 @@ struct memblock_list {
 struct pa_memblockq {
     struct memblock_list *blocks, *blocks_tail;
     unsigned n_blocks;
-    size_t total_length, maxlength, base, prebuf;
+    size_t current_length, maxlength, tlength, base, prebuf, minreq;
     int measure_delay;
     uint32_t delay;
     struct pa_mcalign *mcalign;
 };
 
-struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf) {
+struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t base, size_t prebuf, size_t minreq) {
     struct pa_memblockq* bq;
-    assert(maxlength && base);
+    assert(maxlength && base && maxlength);
     
     bq = malloc(sizeof(struct pa_memblockq));
     assert(bq);
     bq->blocks = bq->blocks_tail = 0;
     bq->n_blocks = 0;
-    bq->total_length = 0;
+
+    bq->current_length = 0;
+
+    fprintf(stderr, "memblockq requested: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", maxlength, tlength, base, prebuf, minreq);
+    
     bq->base = base;
+
     bq->maxlength = ((maxlength+base-1)/base)*base;
-    bq->prebuf = prebuf == (size_t) -1 ? bq->maxlength/2 : prebuf;
+    assert(bq->maxlength >= base);
+
+    bq->tlength = ((tlength+base-1)/base)*base;
+    if (bq->tlength == 0 || bq->tlength >= bq->maxlength)
+        bq->tlength = bq->maxlength;
     
+    bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
+    bq->prebuf = (bq->prebuf/base)*base;
     if (bq->prebuf > bq->maxlength)
         bq->prebuf = bq->maxlength;
     
-    assert(bq->maxlength >= base);
+    bq->minreq = (minreq/base)*base;
+    if (bq->minreq == 0)
+        bq->minreq = 1;
 
+    fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
+    
     bq->measure_delay = 0;
     bq->delay = 0;
 
@@ -88,7 +103,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
     bq->blocks_tail = q;
 
     bq->n_blocks++;
-    bq->total_length += chunk->length;
+    bq->current_length += chunk->length;
 
     pa_memblockq_shorten(bq, bq->maxlength);
 }
@@ -96,7 +111,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
 int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) {
     assert(bq && chunk);
 
-    if (!bq->blocks || bq->total_length < bq->prebuf)
+    if (!bq->blocks || bq->current_length < bq->prebuf)
         return -1;
 
     bq->prebuf = 0;
@@ -116,7 +131,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
     
     assert(bq && chunk);
 
-    if (!bq->blocks || bq->total_length < bq->prebuf)
+    if (!bq->blocks || bq->current_length < bq->prebuf)
         return -1;
 
     bq->prebuf = 0;
@@ -127,7 +142,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
     *chunk = q->chunk;
 
     bq->n_blocks--;
-    bq->total_length -= chunk->length;
+    bq->current_length -= chunk->length;
 
     free(q);
     return 0;
@@ -159,7 +174,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
 
     while (length > 0) {
         size_t l = length;
-        assert(bq->blocks && bq->total_length >= length);
+        assert(bq->blocks && bq->current_length >= length);
         
         if (l > bq->blocks->chunk.length)
             l = bq->blocks->chunk.length;
@@ -169,7 +184,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
         
         bq->blocks->chunk.index += l;
         bq->blocks->chunk.length -= l;
-        bq->total_length -= l;
+        bq->current_length -= l;
         
         if (bq->blocks->chunk.length == 0) {
             struct memblock_list *q;
@@ -192,12 +207,12 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) {
     size_t l;
     assert(bq);
 
-    if (bq->total_length <= length)
+    if (bq->current_length <= length)
         return;
 
     fprintf(stderr, "Warning! pa_memblockq_shorten()\n");
     
-    l = bq->total_length - length;
+    l = bq->current_length - length;
     l /= bq->base;
     l *= bq->base;
 
@@ -213,14 +228,13 @@ void pa_memblockq_empty(struct pa_memblockq *bq) {
 int pa_memblockq_is_readable(struct pa_memblockq *bq) {
     assert(bq);
 
-    return bq->total_length >= bq->prebuf;
+    return bq->current_length >= bq->prebuf;
 }
 
 int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) {
     assert(bq);
 
-    assert(length <= bq->maxlength);
-    return bq->total_length + length <= bq->maxlength;
+    return bq->current_length + length <= bq->tlength;
 }
 
 uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
@@ -230,16 +244,20 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
 
 uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) {
     assert(bq);
-    return bq->total_length;
+    return bq->current_length;
 }
 
-uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen) {
-    assert(bq && qlen);
+uint32_t pa_memblockq_missing(struct pa_memblockq *bq) {
+    size_t l;
+    assert(bq);
 
-    if (bq->total_length >= qlen)
+    if (bq->current_length >= bq->tlength)
         return 0;
 
-    return qlen - bq->total_length;
+    l = bq->tlength - bq->current_length;
+    assert(l);
+
+    return (l >= bq->minreq) ? l : 0;
 }
 
 void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta) {
@@ -264,3 +282,8 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *
         delta = 0;
     }
 }
+
+uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq) {
+    assert(bq);
+    return bq->minreq;
+}
index d8b9567..bece4fd 100644 (file)
@@ -8,11 +8,18 @@
 
 struct pa_memblockq;
 
-/* Parameters: the maximum length of the memblock queue, a base value
-for all operations (that is, all byte operations shall work on
-multiples of this base value) and an amount of bytes to prebuffer
-before having pa_memblockq_peek() succeed. */
-struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf);
+/* Parameters:
+   - maxlength: maximum length of queue. If more data is pushed into the queue, data from the front is dropped
+   - length:    the target length of the queue.
+   - base:      a base value for all metrics. Only multiples of this value are popped from the queue
+   - prebuf:    before passing the first byte out, make sure that enough bytes are in the queue
+   - minreq:    pa_memblockq_missing() will only return values greater than this value
+*/
+struct pa_memblockq* pa_memblockq_new(size_t maxlength,
+                                      size_t tlength,
+                                      size_t base,
+                                      size_t prebuf,
+                                      size_t minreq);
 void pa_memblockq_free(struct pa_memblockq*bq);
 
 /* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */
@@ -46,6 +53,9 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq);
 uint32_t pa_memblockq_get_length(struct pa_memblockq *bq);
 
 /* Return how many bytes are missing in queue to the specified fill amount */
-uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen);
+uint32_t pa_memblockq_missing(struct pa_memblockq *bq);
+
+
+uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq);
 
 #endif
index 7306ade..a6e9582 100644 (file)
@@ -14,7 +14,7 @@ static void eof_cb(struct pa_cli*c, void *userdata) {
     pa_module_unload_request(m->core, m);
 }
 
-int module_init(struct pa_core *c, struct pa_module*m) {
+int pa_module_init(struct pa_core *c, struct pa_module*m) {
     struct pa_iochannel *io;
     assert(c && m);
 
@@ -35,7 +35,7 @@ int module_init(struct pa_core *c, struct pa_module*m) {
     return 0;
 }
 
-void module_done(struct pa_core *c, struct pa_module*m) {
+void pa_module_done(struct pa_core *c, struct pa_module*m) {
     assert(c && m);
 
     pa_cli_free(m->userdata);
index ef2b19d..62c2cc2 100644 (file)
@@ -180,7 +180,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) {
     return pa_samples_usec(u->out_fill, &s->sample_spec);
 }
 
-int module_init(struct pa_core *c, struct pa_module*m) {
+int pa_module_init(struct pa_core *c, struct pa_module*m) {
     struct audio_buf_info info;
     struct userdata *u = NULL;
     char *p;
index 310c89c..5ec9d2d 100644 (file)
@@ -111,7 +111,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) {
     return pa_samples_usec(arg, &s->sample_spec);
 }
 
-int module_init(struct pa_core *c, struct pa_module*m) {
+int pa_module_init(struct pa_core *c, struct pa_module*m) {
     struct audio_buf_info info;
     struct userdata *u = NULL;
     char *p;
@@ -224,7 +224,7 @@ fail:
     return -1;
 }
 
-void module_done(struct pa_core *c, struct pa_module*m) {
+void pa_module_done(struct pa_core *c, struct pa_module*m) {
     struct userdata *u;
     assert(c && m);
 
index ea5c15d..efba3b5 100644 (file)
@@ -73,7 +73,7 @@ static void io_callback(struct pa_iochannel *io, void*userdata) {
     do_write(u);
 }
 
-int module_init(struct pa_core *c, struct pa_module*m) {
+int pa_module_init(struct pa_core *c, struct pa_module*m) {
     struct userdata *u = NULL;
     struct stat st;
     char *p;
@@ -137,7 +137,7 @@ fail:
     return -1;
 }
 
-void module_done(struct pa_core *c, struct pa_module*m) {
+void pa_module_done(struct pa_core *c, struct pa_module*m) {
     struct userdata *u;
     assert(c && m);
 
index 713e0ab..885ea4c 100644 (file)
@@ -47,7 +47,7 @@
   #endif
 #endif
 
-int module_init(struct pa_core *c, struct pa_module*m) {
+int pa_module_init(struct pa_core *c, struct pa_module*m) {
     struct pa_socket_server *s;
     assert(c && m);
 
@@ -91,7 +91,7 @@ int module_init(struct pa_core *c, struct pa_module*m) {
     return 0;
 }
 
-void module_done(struct pa_core *c, struct pa_module*m) {
+void pa_module_done(struct pa_core *c, struct pa_module*m) {
     assert(c && m);
 
     protocol_free(m->userdata);
index 468998b..87df3b3 100644 (file)
@@ -23,10 +23,10 @@ struct pa_module* pa_module_load(struct pa_core *c, const char *name, const char
     if (!(m->dl = lt_dlopenext(name)))
         goto fail;
 
-    if (!(m->init = lt_dlsym(m->dl, "module_init")))
+    if (!(m->init = lt_dlsym(m->dl, "pa_module_init")))
         goto fail;
 
-    if (!(m->done = lt_dlsym(m->dl, "module_done")))
+    if (!(m->done = lt_dlsym(m->dl, "pa_module_done")))
         goto fail;
     
     m->userdata = NULL;
@@ -124,7 +124,7 @@ char *pa_module_list_to_string(struct pa_core *c) {
     pa_strbuf_printf(s, "%u module(s) loaded.\n", pa_idxset_ncontents(c->modules));
     
     for (m = pa_idxset_first(c->modules, &index); m; m = pa_idxset_next(c->modules, &index))
-        pa_strbuf_printf(s, "    index: %u, name: <%s>, argument: <%s>\n", m->index, m->name, m->argument);
+        pa_strbuf_printf(s, "    index: %u\n\tname: <%s>\n\targument: <%s>\n", m->index, m->name, m->argument);
     
     return pa_strbuf_tostring_free(s);
 }
index 1cc7d77..2a9cf55 100644 (file)
@@ -30,4 +30,8 @@ char *pa_module_list_to_string(struct pa_core *c);
 void pa_module_unload_request(struct pa_core *c, struct pa_module *m);
 
 
+/* These to following prototypes are for module entrypoints and not implemented by the core */
+int pa_module_init(struct pa_core *c, struct pa_module*m);
+void pa_module_done(struct pa_core *c, struct pa_module*m);
+
 #endif
index 5408221..8b48bdd 100644 (file)
@@ -19,7 +19,7 @@ int main(int argc, char*argv[]) {
     int error;
 
     if (!(s = pa_simple_new(NULL, argv[0], PA_STREAM_PLAYBACK, NULL, "playback", &ss, NULL, &error))) {
-        fprintf(stderr, "Failed to connect to server: %s\n", pa_strerror(error));
+        fprintf(stderr, __FILE__": pa_simple_new() failed: %s\n", pa_strerror(error));
         goto finish;
     }
 
@@ -31,16 +31,16 @@ int main(int argc, char*argv[]) {
             if (r == 0) /* eof */
                 break;
             
-            fprintf(stderr, "read() failed: %s\n", strerror(errno));
+            fprintf(stderr, __FILE__": read() failed: %s\n", strerror(errno));
             goto finish;
         }
 
         if (pa_simple_write(s, buf, r, &error) < 0) {
-            fprintf(stderr, "Failed to write data: %s\n", pa_strerror(error));
+            fprintf(stderr, __FILE__": pa_simple_write() failed: %s\n", pa_strerror(error));
             goto finish;
         }
     }
-    
+
     ret = 0;
 
 finish:
index c69148e..75a94fc 100644 (file)
@@ -1,3 +1,4 @@
+#include <signal.h>
 #include <string.h>
 #include <errno.h>
 #include <unistd.h>
@@ -6,7 +7,9 @@
 #include <stdlib.h>
 
 #include "polyp.h"
+#include "polyp-error.h"
 #include "mainloop.h"
+#include "mainloop-signal.h"
 
 static struct pa_context *context = NULL;
 static struct pa_stream *stream = NULL;
@@ -17,21 +20,29 @@ static size_t buffer_length = 0, buffer_index = 0;
 
 static void* stdin_source = NULL;
 
+static void quit(int ret) {
+    assert(mainloop_api);
+    mainloop_api->quit(mainloop_api, ret);
+}
+
 static void context_die_callback(struct pa_context *c, void *userdata) {
     assert(c);
     fprintf(stderr, "Connection to server shut down, exiting.\n");
-    mainloop_api->quit(mainloop_api, 1);
+    quit(1);
 }
 
 static void stream_die_callback(struct pa_stream *s, void *userdata) {
     assert(s);
     fprintf(stderr, "Stream deleted, exiting.\n");
-    mainloop_api->quit(mainloop_api, 1);
+    quit(1);
 }
 
 static void do_write(size_t length) {
     size_t l;
-    assert(buffer && buffer_length);
+    assert(length);
+
+    if (!buffer || !buffer_length)
+        return;
     
     l = length;
     if (l > buffer_length)
@@ -50,8 +61,9 @@ static void do_write(size_t length) {
 
 static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
     assert(s && length);
-    
-    mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT);
+
+    if (stdin_source)
+        mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT);
 
     if (!buffer)
         return;
@@ -63,13 +75,12 @@ static void stream_complete_callback(struct pa_stream*s, int success, void *user
     assert(s);
 
     if (!success) {
-        fprintf(stderr, "Stream creation failed.\n");
-        mainloop_api->quit(mainloop_api, 1);
+        fprintf(stderr, "Stream creation failed: %s\n", pa_strerror(pa_context_errno(pa_stream_get_context(s))));
+        quit(1);
         return;
     }
 
-    pa_stream_set_die_callback(s, stream_die_callback, NULL);
-    pa_stream_set_write_callback(s, stream_write_callback, NULL);
+    fprintf(stderr, "Stream created.\n");
 }
 
 static void context_complete_callback(struct pa_context *c, int success, void *userdata) {
@@ -82,43 +93,59 @@ static void context_complete_callback(struct pa_context *c, int success, void *u
     assert(c && !stream);
 
     if (!success) {
-        fprintf(stderr, "Connection failed\n");
+        fprintf(stderr, "Connection failed: %s\n", pa_strerror(pa_context_errno(c)));
         goto fail;
     }
+
+    fprintf(stderr, "Connection established.\n");
     
     if (!(stream = pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL))) {
-        fprintf(stderr, "pa_stream_new() failed.\n");
+        fprintf(stderr, "pa_stream_new() failed: %s\n", pa_strerror(pa_context_errno(c)));
         goto fail;
     }
 
+    pa_stream_set_die_callback(stream, stream_die_callback, NULL);
+    pa_stream_set_write_callback(stream, stream_write_callback, NULL);
+    
     return;
     
 fail:
-    mainloop_api->quit(mainloop_api, 1);
+    quit(1);
+}
+
+static void context_drain_complete(struct pa_context*c, void *userdata) {
+    quit(0);
 }
 
 static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
     size_t l, w = 0;
     ssize_t r;
-    assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
+    assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT && stdin_source == id);
 
     if (buffer) {
         mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL);
         return;
     }
 
-    if (!stream || !(l = w = pa_stream_writable_size(stream)))
+    if (!stream || !pa_stream_is_ready(stream) || !(l = w = pa_stream_writable_size(stream)))
         l = 4096;
+    
     buffer = malloc(l);
     assert(buffer);
     if ((r = read(fd, buffer, l)) <= 0) {
-        if (r == 0)
-            mainloop_api->quit(mainloop_api, 0);
-        else {
+        if (r == 0) {
+            fprintf(stderr, "Got EOF.\n");
+            if (pa_context_drain(context, context_drain_complete, NULL) < 0)
+                quit(0);
+            else
+                fprintf(stderr, "Draining connection to server.\n");
+        } else {
             fprintf(stderr, "read() failed: %s\n", strerror(errno));
-            mainloop_api->quit(mainloop_api, 1);
+            quit(1);
         }
 
+        mainloop_api->cancel_io(mainloop_api, stdin_source);
+        stdin_source = NULL;
         return;
     }
 
@@ -129,9 +156,15 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m
         do_write(w);
 }
 
+
+static void exit_signal_callback(void *id, int sig, void *userdata) {
+    fprintf(stderr, "Got SIGINT, exiting.\n");
+    quit(0);
+}
+
 int main(int argc, char *argv[]) {
     struct pa_mainloop* m;
-    int ret = 1;
+    int ret = 1, r;
 
     if (!(m = pa_mainloop_new())) {
         fprintf(stderr, "pa_mainloop_new() failed.\n");
@@ -140,6 +173,11 @@ int main(int argc, char *argv[]) {
 
     mainloop_api = pa_mainloop_get_api(m);
 
+    r = pa_signal_init(mainloop_api);
+    assert(r == 0);
+    pa_signal_register(SIGINT, exit_signal_callback, NULL);
+    signal(SIGPIPE, SIG_IGN);
+    
     if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) {
         fprintf(stderr, "source_io() failed.\n");
         goto quit;
index c2db134..ec45419 100644 (file)
@@ -4,10 +4,26 @@
 #include "pdispatch.h"
 #include "protocol-native-spec.h"
 
+static const char *command_names[PA_COMMAND_MAX] = {
+    [PA_COMMAND_ERROR] = "ERROR",
+    [PA_COMMAND_TIMEOUT] = "TIMEOUT",
+    [PA_COMMAND_REPLY] = "REPLY",
+    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = "CREATE_PLAYBACK_STREAM",
+    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = "DELETE_PLAYBACK_STREAM",
+    [PA_COMMAND_CREATE_RECORD_STREAM] = "CREATE_RECORD_STREAM",
+    [PA_COMMAND_DELETE_RECORD_STREAM] = "DELETE_RECORD_STREAM",
+    [PA_COMMAND_AUTH] = "AUTH",
+    [PA_COMMAND_REQUEST] = "REQUEST",
+    [PA_COMMAND_EXIT] = "EXIT",
+    [PA_COMMAND_SET_NAME] = "SET_NAME",
+    [PA_COMMAND_LOOKUP_SINK] = "LOOKUP_SINK",
+    [PA_COMMAND_LOOKUP_SOURCE] = "LOOKUP_SOURCE",
+};
+
 struct reply_info {
     struct pa_pdispatch *pdispatch;
     struct reply_info *next, *previous;
-    int (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+    void (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
     void *userdata;
     uint32_t tag;
     void *mainloop_timeout;
@@ -18,6 +34,9 @@ struct pa_pdispatch {
     const struct pa_pdispatch_command *command_table;
     unsigned n_commands;
     struct reply_info *replies;
+    void (*drain_callback)(struct pa_pdispatch *pd, void *userdata);
+    void *drain_userdata;
+    int in_use, shall_free;
 };
 
 static void reply_info_free(struct reply_info *r) {
@@ -49,11 +68,21 @@ struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *mainloop, const st
     pd->command_table = table;
     pd->n_commands = entries;
     pd->replies = NULL;
+    pd->drain_callback = NULL;
+    pd->drain_userdata = NULL;
+
+    pd->in_use = pd->shall_free = 0;
     return pd;
 }
 
 void pa_pdispatch_free(struct pa_pdispatch *pd) {
     assert(pd);
+
+    if (pd->in_use) {
+        pd->shall_free = 1;
+        return;
+    }
+    
     while (pd->replies)
         reply_info_free(pd->replies);
     free(pd);
@@ -61,60 +90,61 @@ void pa_pdispatch_free(struct pa_pdispatch *pd) {
 
 int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*packet, void *userdata) {
     uint32_t tag, command;
-    assert(pd && packet);
     struct pa_tagstruct *ts = NULL;
-    assert(pd && packet && packet->data);
+    int ret = -1;
+    assert(pd && packet && packet->data && !pd->in_use);
 
     if (packet->length <= 8)
-        goto fail;
+        goto finish;
 
     ts = pa_tagstruct_new(packet->data, packet->length);
     assert(ts);
     
     if (pa_tagstruct_getu32(ts, &command) < 0 ||
         pa_tagstruct_getu32(ts, &tag) < 0)
-        goto fail;
+        goto finish;
+
+    /*fprintf(stderr, __FILE__": Recieved opcode <%s>\n", command_names[command]);*/
 
     if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
         struct reply_info *r;
-        int done = 0;
 
         for (r = pd->replies; r; r = r->next) {
-            if (r->tag == tag) {
-                int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata);
-                reply_info_free(r);
-                
-                if (ret < 0)
-                    goto fail;
-                
-                done = 1;
+            if (r->tag != tag)
+                continue;
+            
+            pd->in_use = 1;
+            assert(r->callback);
+            r->callback(r->pdispatch, command, tag, ts, r->userdata);
+            pd->in_use = 0;
+            reply_info_free(r);
+            
+            if (pd->shall_free) {
+                pa_pdispatch_free(pd);
                 break;
             }
-        }
 
-        if (!done)
-            goto fail;
+            if (pd->drain_callback && !pa_pdispatch_is_pending(r->pdispatch))
+                pd->drain_callback(r->pdispatch, r->pdispatch->drain_userdata);
+
+            break;
+        }
 
     } else if (pd->command_table && command < pd->n_commands) {
         const struct pa_pdispatch_command *c = pd->command_table+command;
 
-        if (!c->proc)
-            goto fail;
-        
-        if (c->proc(pd, command, tag, ts, userdata) < 0)
-            goto fail;
+        if (c->proc)
+            c->proc(pd, command, tag, ts, userdata);
     } else
-        goto fail;
-    
-    pa_tagstruct_free(ts);    
-        
-    return 0;
+        goto finish;
 
-fail:
+    ret = 0;
+        
+finish:
     if (ts)
         pa_tagstruct_free(ts);    
 
-    return -1;
+    return ret;
 }
 
 static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) {
@@ -123,9 +153,12 @@ static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct ti
 
     r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata);
     reply_info_free(r);
+
+    if (r->pdispatch->drain_callback && !pa_pdispatch_is_pending(r->pdispatch))
+        r->pdispatch->drain_callback(r->pdispatch, r->pdispatch->drain_userdata);
 }
 
-void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) {
+void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) {
     struct reply_info *r;
     struct timeval tv;
     assert(pd && cb);
@@ -149,3 +182,17 @@ void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int time
         r->next->previous = r;
     pd->replies = r;
 }
+
+int pa_pdispatch_is_pending(struct pa_pdispatch *pd) {
+    assert(pd);
+
+    return !!pd->replies;
+}
+
+void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata) {
+    assert(pd);
+    assert(!cb || pa_pdispatch_is_pending(pd));
+
+    pd->drain_callback = cb;
+    pd->drain_userdata = userdata;
+}
index 7368670..35e9382 100644 (file)
@@ -8,8 +8,10 @@
 
 struct pa_pdispatch;
 
+/* It is safe to destroy the calling pdispatch object from all callbacks */
+
 struct pa_pdispatch_command {
-    int (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+    void (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 };
 
 struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *m, const struct pa_pdispatch_command*table, unsigned entries);
@@ -17,6 +19,10 @@ void pa_pdispatch_free(struct pa_pdispatch *pd);
 
 int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*p, void *userdata);
 
-void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata);
+void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata);
+
+int pa_pdispatch_is_pending(struct pa_pdispatch *pd);
+
+void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata);
 
 #endif
index c15d5d9..9af8d46 100644 (file)
 #include "socket-client.h"
 #include "pstream-util.h"
 #include "authkey.h"
+#include "util.h"
 
-#define DEFAULT_QUEUE_LENGTH 10240
-#define DEFAULT_MAX_LENGTH 20480
+#define DEFAULT_MAXLENGTH 20480
+#define DEFAULT_TLENGTH 10240
 #define DEFAULT_PREBUF 4096
+#define DEFAULT_MINREQ 1024
+
 #define DEFAULT_TIMEOUT (5*60)
 #define DEFAULT_SERVER "/tmp/polypaudio/native"
 
@@ -28,25 +31,40 @@ struct pa_context {
     struct pa_stream *first_stream;
     uint32_t ctag;
     uint32_t error;
-    enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_AUTHORIZING, CONTEXT_SETTING_NAME, CONTEXT_READY, CONTEXT_DEAD} state;
+    enum {
+        CONTEXT_UNCONNECTED,
+        CONTEXT_CONNECTING,
+        CONTEXT_AUTHORIZING,
+        CONTEXT_SETTING_NAME,
+        CONTEXT_READY,
+        CONTEXT_DEAD
+    } state;
 
     void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
     void *connect_complete_userdata;
 
+    void (*drain_complete_callback)(struct pa_context*c, void *userdata);
+    void *drain_complete_userdata;
+    
     void (*die_callback)(struct pa_context*c, void *userdata);
     void *die_userdata;
-
+    
     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
 };
 
 struct pa_stream {
     struct pa_context *context;
     struct pa_stream *next, *previous;
+
+    char *name;
+    struct pa_buffer_attr buffer_attr;
+    struct pa_sample_spec sample_spec;
     uint32_t device_index;
     uint32_t channel;
     int channel_valid;
     enum pa_stream_direction direction;
-    enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
+    
+    enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
     uint32_t requested_bytes;
 
     void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
@@ -62,7 +80,7 @@ struct pa_stream {
     void *die_userdata;
 };
 
-static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 
 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_ERROR] = { NULL },
@@ -76,8 +94,9 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
 };
 
 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
-    assert(mainloop && name);
     struct pa_context *c;
+    assert(mainloop && name);
+    
     c = malloc(sizeof(struct pa_context));
     assert(c);
     c->name = strdup(name);
@@ -95,9 +114,13 @@ struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *
     c->connect_complete_callback = NULL;
     c->connect_complete_userdata = NULL;
 
+    c->drain_complete_callback = NULL;
+    c->drain_complete_userdata = NULL;
+
     c->die_callback = NULL;
     c->die_userdata = NULL;
-    
+
+    pa_check_for_sigpipe();
     return c;
 }
 
@@ -121,84 +144,105 @@ void pa_context_free(struct pa_context *c) {
 }
 
 static void stream_dead(struct pa_stream *s) {
+    assert(s);
+    
     if (s->state == STREAM_DEAD)
         return;
-
-    s->state = STREAM_DEAD;
-    if (s->die_callback)
-        s->die_callback(s, s->die_userdata);
+    
+    if (s->state == STREAM_READY) {
+        s->state = STREAM_DEAD;
+        if (s->die_callback)
+            s->die_callback(s, s->die_userdata);
+    } else
+        s->state = STREAM_DEAD;
 }
 
 static void context_dead(struct pa_context *c) {
     struct pa_stream *s;
     assert(c);
     
-    for (s = c->first_stream; s; s = s->next)
-        stream_dead(s);
-
     if (c->state == CONTEXT_DEAD)
         return;
+
+    if (c->pdispatch)
+        pa_pdispatch_free(c->pdispatch);
+    c->pdispatch = NULL;
+    
+    if (c->pstream)
+        pa_pstream_free(c->pstream);
+    c->pstream = NULL;
+    
+    if (c->client)
+        pa_socket_client_free(c->client);
+    c->client = NULL;
     
-    c->state = CONTEXT_DEAD;
-    if (c->die_callback)
-        c->die_callback(c, c->die_userdata);
+    for (s = c->first_stream; s; s = s->next)
+        stream_dead(s);
+
+    if (c->state == CONTEXT_READY) {
+        c->state = CONTEXT_DEAD;
+        if (c->die_callback)
+            c->die_callback(c, c->die_userdata);
+    } else
+        s->state = CONTEXT_DEAD;
 }
 
 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
     struct pa_context *c = userdata;
     assert(p && c);
-
-    assert(c->state != CONTEXT_DEAD);
-    
-    c->state = CONTEXT_DEAD;
-
     context_dead(c);
 }
 
-static int pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
+static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
     struct pa_context *c = userdata;
     assert(p && packet && c);
 
     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
         fprintf(stderr, "polyp.c: invalid packet.\n");
         context_dead(c);
-        return -1;
     }
-
-
-    return 0;
 }
 
-static int pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
     struct pa_context *c = userdata;
     struct pa_stream *s;
     assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
 
     if (!(s = pa_dynarray_get(c->streams, channel)))
-        return 0;
+        return;
 
     if (s->read_callback)
         s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
+}
+
+static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
+    assert(c && t);
     
-    return 0;
+    if (command == PA_COMMAND_ERROR) {
+        if (pa_tagstruct_getu32(t, &c->error) < 0) {
+            c->error = PA_ERROR_PROTOCOL;
+            return -1;
+        }
+
+        return 0;
+    }
+
+    c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
+    return -1;
 }
 
-static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct pa_context *c = userdata;
     assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
 
     if (command != PA_COMMAND_REPLY) {
-        if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &c->error) < 0)
-            c->error = PA_ERROR_PROTOCOL;
-        else if (command == PA_COMMAND_TIMEOUT)
-            c->error = PA_ERROR_TIMEOUT;
-
-        c->state = CONTEXT_DEAD;
+        handle_error(c, command, t);
+        context_dead(c);
 
         if (c->connect_complete_callback)
             c->connect_complete_callback(c, 0, c->connect_complete_userdata);
         
-        return -1;
+        return;
     }
 
     if (c->state == CONTEXT_AUTHORIZING) {
@@ -210,7 +254,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin
         pa_tagstruct_putu32(t, tag = c->ctag++);
         pa_tagstruct_puts(t, c->name);
         pa_pstream_send_tagstruct(c->pstream, t);
-        pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c);
+        pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
     } else {
         assert(c->state == CONTEXT_SETTING_NAME);
         
@@ -220,7 +264,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin
             c->connect_complete_callback(c, 1, c->connect_complete_userdata);
     }
 
-    return 0;
+    return;
 }
 
 static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
@@ -234,7 +278,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i
 
     if (!io) {
         c->error = PA_ERROR_CONNECTIONREFUSED;
-        c->state = CONTEXT_UNCONNECTED;
+        context_dead(c);
 
         if (c->connect_complete_callback)
             c->connect_complete_callback(c, 0, c->connect_complete_userdata);
@@ -257,7 +301,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i
     pa_tagstruct_putu32(t, tag = c->ctag++);
     pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
     pa_pstream_send_tagstruct(c->pstream, t);
-    pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c);
+    pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
     c->state = CONTEXT_AUTHORIZING;
 }
 
@@ -305,7 +349,7 @@ void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_cont
     c->die_userdata = userdata;
 }
 
-static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct pa_stream *s;
     struct pa_context *c = userdata;
     uint32_t bytes, channel;
@@ -315,63 +359,122 @@ static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t t
         pa_tagstruct_getu32(t, &bytes) < 0 ||
         !pa_tagstruct_eof(t)) {
         c->error = PA_ERROR_PROTOCOL;
-        return -1;
+        context_dead(c);
+        return;
     }
     
-    if (!(s = pa_dynarray_get(c->streams, channel))) {
-        c->error = PA_ERROR_PROTOCOL;
-        return -1;
-    }
+    if (!(s = pa_dynarray_get(c->streams, channel)))
+        return;
 
-    /*fprintf(stderr, "Requested %u bytes\n", bytes);*/
+    if (s->state != STREAM_READY)
+        return;
     
     s->requested_bytes += bytes;
     
     if (s->requested_bytes && s->write_callback)
         s->write_callback(s, s->requested_bytes, s->write_userdata);
-
-    return 0;
 }
 
-static int create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
-    int ret = 0;
+static void create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct pa_stream *s = userdata;
     assert(pd && s && s->state == STREAM_CREATING);
 
     if (command != PA_COMMAND_REPLY) {
-        struct pa_context *c = s->context;
-        assert(c);
+        if (handle_error(s->context, command, t) < 0) {
+            context_dead(s->context);
+            return;
+        }
 
-        if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &s->context->error) < 0)
-            s->context->error = PA_ERROR_PROTOCOL;
-        else if (command == PA_COMMAND_TIMEOUT)
-            s->context->error = PA_ERROR_TIMEOUT;
-        
-        ret = -1;
-        goto fail;
+        stream_dead(s);
+        if (s->create_complete_callback)
+            s->create_complete_callback(s, 0, s->create_complete_userdata);
+
+        return;
     }
 
     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
         pa_tagstruct_getu32(t, &s->device_index) < 0 ||
         !pa_tagstruct_eof(t)) {
         s->context->error = PA_ERROR_PROTOCOL;
-        ret = -1;
-        goto fail;
+        context_dead(s->context);
+        return;
     }
 
     s->channel_valid = 1;
     pa_dynarray_put(s->context->streams, s->channel, s);
     
     s->state = STREAM_READY;
-    assert(s->create_complete_callback);
-    s->create_complete_callback(s, 1, s->create_complete_userdata);
-    return 0;
+    if (s->create_complete_callback)
+        s->create_complete_callback(s, 1, s->create_complete_userdata);
+}
+
+static void create_stream(struct pa_stream *s, uint32_t tdev_index) {
+    struct pa_tagstruct *t;
+    uint32_t tag;
+    assert(s);
 
-fail:
-    assert(s->create_complete_callback);
-    s->create_complete_callback(s, 0, s->create_complete_userdata);
-    pa_stream_free(s);
-    return ret;
+    s->state = STREAM_CREATING;
+    
+    t = pa_tagstruct_new(NULL, 0);
+    assert(t);
+    
+    pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
+    pa_tagstruct_putu32(t, tag = s->context->ctag++);
+    pa_tagstruct_puts(t, s->name);
+    pa_tagstruct_put_sample_spec(t, &s->sample_spec);
+    pa_tagstruct_putu32(t, tdev_index);
+    pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
+    pa_tagstruct_putu32(t, s->buffer_attr.tlength);
+    pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
+    pa_tagstruct_putu32(t, s->buffer_attr.minreq);
+
+    pa_pstream_send_tagstruct(s->context->pstream, t);
+    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
+}
+
+static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+    struct pa_stream *s = userdata;
+    uint32_t tdev;
+    assert(pd && s && s->state == STREAM_LOOKING_UP);
+
+    if (command != PA_COMMAND_REPLY) {
+        if (handle_error(s->context, command, t) < 0) {
+            context_dead(s->context);
+            return;
+        }
+
+        stream_dead(s);
+        if (s->create_complete_callback)
+            s->create_complete_callback(s, 0, s->create_complete_userdata);
+        return;
+    }
+
+    if (pa_tagstruct_getu32(t, &tdev) < 0 ||
+        !pa_tagstruct_eof(t)) {
+        s->context->error = PA_ERROR_PROTOCOL;
+        context_dead(s->context);
+        return;
+    }
+    
+    create_stream(s, tdev);
+}
+
+static void lookup_device(struct pa_stream *s, const char *tdev) {
+    struct pa_tagstruct *t;
+    uint32_t tag;
+    assert(s);
+    
+    s->state = STREAM_LOOKING_UP;
+
+    t = pa_tagstruct_new(NULL, 0);
+    assert(t);
+
+    pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE);
+    pa_tagstruct_putu32(t, tag = s->context->ctag++);
+    pa_tagstruct_puts(t, tdev);
+
+    pa_pstream_send_tagstruct(s->context->pstream, t);
+    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s);
 }
 
 struct pa_stream* pa_stream_new(
@@ -385,10 +488,8 @@ struct pa_stream* pa_stream_new(
     void *userdata) {
     
     struct pa_stream *s;
-    struct pa_tagstruct *t;
-    uint32_t tag;
 
-    assert(c && name && ss && c->state == CONTEXT_READY && complete);
+    assert(c && name && ss && c->state == CONTEXT_READY);
     
     s = malloc(sizeof(struct pa_stream));
     assert(s);
@@ -403,42 +504,43 @@ struct pa_stream* pa_stream_new(
     s->create_complete_callback = complete;
     s->create_complete_userdata = NULL;
 
+    s->name = strdup(name);
     s->state = STREAM_CREATING;
     s->requested_bytes = 0;
     s->channel = 0;
     s->channel_valid = 0;
     s->device_index = (uint32_t) -1;
     s->direction = dir;
+    s->sample_spec = *ss;
+    if (attr)
+        s->buffer_attr = *attr;
+    else {
+        s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
+        s->buffer_attr.tlength = DEFAULT_TLENGTH;
+        s->buffer_attr.prebuf = DEFAULT_PREBUF;
+        s->buffer_attr.minreq = DEFAULT_MINREQ;
+    }
 
-    t = pa_tagstruct_new(NULL, 0);
-    assert(t);
-    
-    pa_tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
-    pa_tagstruct_putu32(t, tag = c->ctag++);
-    pa_tagstruct_puts(t, name);
-    pa_tagstruct_put_sample_spec(t, ss);
-    pa_tagstruct_putu32(t, (uint32_t) -1);
-    pa_tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
-    pa_tagstruct_putu32(t, attr ? attr->max_length  : DEFAULT_MAX_LENGTH);
-    pa_tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
-
-    pa_pstream_send_tagstruct(c->pstream, t);
-
-    pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
-    
     s->next = c->first_stream;
     if (s->next)
         s->next->previous = s;
     s->previous = NULL;
     c->first_stream = s;
 
-    return 0;
+    if (dev)
+        lookup_device(s, dev);
+    else
+        create_stream(s, (uint32_t) -1);
+    
+    return s;
 }
 
 void pa_stream_free(struct pa_stream *s) {
     assert(s && s->context);
+
+    free(s->name);
     
-    if (s->channel_valid) {
+    if (s->channel_valid && s->context->state == CONTEXT_READY) {
         struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
         assert(t);
     
@@ -469,7 +571,7 @@ void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stre
 
 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
     struct pa_memchunk chunk;
-    assert(s && s->context && data && length);
+    assert(s && s->context && data && length && s->state == STREAM_READY);
 
     chunk.memblock = pa_memblock_new(length);
     assert(chunk.memblock && chunk.memblock->data);
@@ -489,7 +591,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
 }
 
 size_t pa_stream_writable_size(struct pa_stream *s) {
-    assert(s);
+    assert(s && s->state == STREAM_READY);
     return s->requested_bytes;
 }
 
@@ -512,3 +614,72 @@ void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream
     s->die_callback = cb;
     s->die_userdata = userdata;
 }
+
+int pa_context_is_pending(struct pa_context *c) {
+    assert(c);
+
+    if (c->state != CONTEXT_READY)
+        return 0;
+
+    return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
+}
+
+struct pa_context* pa_stream_get_context(struct pa_stream *p) {
+    assert(p);
+    return p->context;
+}
+
+static void set_dispatch_callbacks(struct pa_context *c);
+
+static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
+    set_dispatch_callbacks(userdata);
+}
+
+static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
+    set_dispatch_callbacks(userdata);
+}
+
+static void set_dispatch_callbacks(struct pa_context *c) {
+    assert(c && c->state == CONTEXT_READY);
+
+    pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
+    pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
+    
+    if (pa_pdispatch_is_pending(c->pdispatch)) {
+        pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
+        return;
+    }
+
+    if (pa_pstream_is_pending(c->pstream)) {
+        pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
+        return;
+    }
+
+    assert(c->drain_complete_callback);
+    c->drain_complete_callback(c, c->drain_complete_userdata);
+}
+
+int pa_context_drain(
+    struct pa_context *c, 
+    void (*complete) (struct pa_context*c, void *userdata),
+    void *userdata) {
+
+    assert(c && c->state == CONTEXT_READY);
+
+    if (complete == NULL) {
+        c->drain_complete_callback = NULL;
+        pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
+        pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
+        return 0;
+    }
+    
+    if (!pa_context_is_pending(c))
+        return -1;
+    
+    c->drain_complete_callback = complete;
+    c->drain_complete_userdata = userdata;
+
+    set_dispatch_callbacks(c);
+
+    return 0;
+}
index 77a6966..25ee3be 100644 (file)
@@ -17,6 +17,11 @@ int pa_context_connect(
     void (*complete) (struct pa_context*c, int success, void *userdata),
     void *userdata);
 
+int pa_context_drain(
+    struct pa_context *c, 
+    void (*complete) (struct pa_context*c, void *userdata),
+    void *userdata);
+
 void pa_context_free(struct pa_context *c);
 
 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata);
@@ -25,6 +30,8 @@ int pa_context_is_dead(struct pa_context *c);
 int pa_context_is_ready(struct pa_context *c);
 int pa_context_errno(struct pa_context *c);
 
+int pa_context_is_pending(struct pa_context *c);
+
 struct pa_stream;
 
 struct pa_stream* pa_stream_new(
@@ -50,4 +57,6 @@ void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_strea
 int pa_stream_is_dead(struct pa_stream *p);
 int pa_stream_is_ready(struct pa_stream*p);
 
+struct pa_context* pa_stream_get_context(struct pa_stream *p);
+
 #endif
index aa6e6bf..80b3cc6 100644 (file)
@@ -9,9 +9,10 @@ enum pa_stream_direction {
 };
 
 struct pa_buffer_attr {
-    uint32_t queue_length;
-    uint32_t max_length;
+    uint32_t maxlength;
+    uint32_t tlength;
     uint32_t prebuf;
+    uint32_t minreq;
 };
 
 
index cd6448f..04006d5 100644 (file)
@@ -17,9 +17,7 @@
 
 #define COOKIE_FILE ".esd_auth"
 
-#define MEMBLOCKQ_LENGTH (10*1204)
-#define MEMBLOCKQ_PREBUF (2*1024)
-#define BUFSIZE (1024)
+#define BUFFER_SECONDS (0.5)
 
 /* This is heavily based on esound's code */
 
@@ -196,6 +194,7 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t
     int format, rate;
     struct pa_sink *sink;
     struct pa_sample_spec ss;
+    size_t l;
     assert(length == (sizeof(int)*2+ESD_NAME_MAX));
     
     format = maybe_swap_endian_32(c->swap_byte_order, *(int*)data);
@@ -217,7 +216,9 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t
     pa_client_rename(c->client, name);
 
     assert(!c->input_memblockq);
-    c->input_memblockq = pa_memblockq_new(MEMBLOCKQ_LENGTH, pa_sample_size(&ss), MEMBLOCKQ_PREBUF);
+
+    l = (size_t) (pa_bytes_per_second(&ss)*BUFFER_SECONDS); 
+    c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&ss), l/2, l/10);
     assert(c->input_memblockq);
 
     assert(!c->sink_input);
@@ -252,7 +253,7 @@ static int esd_proto_get_latency(struct connection *c, const void *data, size_t
         latency = 0;
     else {
         float usec = pa_sink_get_latency(sink);
-        usec += pa_samples_usec(MEMBLOCKQ_LENGTH-BUFSIZE, &sink->sample_spec);
+        usec += BUFFER_SECONDS*1000000*.9;          /* A better estimation would be a good idea! */
         latency = (int) ((usec*44100)/1000000);
     }
     
@@ -452,16 +453,17 @@ static int do_read(struct connection *c) {
     } else if (c->state == ESD_STREAMING_DATA) {
         struct pa_memchunk chunk;
         ssize_t r;
+        size_t l;
 
         assert(c->input_memblockq);
 
-        if (!pa_memblockq_is_writable(c->input_memblockq, BUFSIZE))
+        if (!(l = pa_memblockq_missing(c->input_memblockq)))
             return 0;
 
-        chunk.memblock = pa_memblock_new(BUFSIZE);
+        chunk.memblock = pa_memblock_new(l);
         assert(chunk.memblock && chunk.memblock->data);
 
-        if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) {
+        if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) {
             fprintf(stderr, "protocol-esound.c: read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
             pa_memblock_unref(chunk.memblock);
             return -1;
index 07fc735..7fb9ac4 100644 (file)
@@ -13,6 +13,8 @@ enum {
     PA_COMMAND_REQUEST,
     PA_COMMAND_AUTH,
     PA_COMMAND_SET_NAME,
+    PA_COMMAND_LOOKUP_SINK,
+    PA_COMMAND_LOOKUP_SOURCE,
     PA_COMMAND_MAX
 };
 
index 9463a46..42ff4b5 100644 (file)
@@ -14,6 +14,7 @@
 #include "pdispatch.h"
 #include "pstream-util.h"
 #include "authkey.h"
+#include "namereg.h"
 
 struct connection;
 struct pa_protocol_native;
@@ -28,7 +29,6 @@ struct record_stream {
 struct playback_stream {
     struct connection *connection;
     uint32_t index;
-    size_t qlength;
     struct pa_sink_input *sink_input;
     struct pa_memblockq *memblockq;
     size_t requested_bytes;
@@ -58,11 +58,12 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
 
 static void request_bytes(struct playback_stream*s);
 
-static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
-static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
-static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
-static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
-static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
 
 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_ERROR] = { NULL },
@@ -76,6 +77,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
     [PA_COMMAND_REQUEST] = { NULL },
     [PA_COMMAND_EXIT] = { command_exit },
     [PA_COMMAND_SET_NAME] = { command_set_name },
+    [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
+    [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
 };
 
 /* structure management */
@@ -89,14 +92,17 @@ static void record_stream_free(struct record_stream* r) {
     free(r);
 }
 
-static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
+static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name,
+                                                   size_t maxlength,
+                                                   size_t tlength,
+                                                   size_t prebuf,
+                                                   size_t minreq) {
     struct playback_stream *s;
-    assert(c && sink && ss && name && qlen && maxlength && prebuf);
+    assert(c && sink && ss && name && maxlength);
 
     s = malloc(sizeof(struct playback_stream));
     assert (s);
     s->connection = c;
-    s->qlength = qlen;
     
     s->sink_input = pa_sink_input_new(sink, name, ss);
     assert(s->sink_input);
@@ -106,7 +112,7 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct
     s->sink_input->get_latency = sink_input_get_latency_cb;
     s->sink_input->userdata = s;
     
-    s->memblockq = pa_memblockq_new(maxlength, pa_sample_size(ss), prebuf);
+    s->memblockq = pa_memblockq_new(maxlength, tlength, pa_sample_size(ss), prebuf, minreq);
     assert(s->memblockq);
 
     s->requested_bytes = 0;
@@ -149,13 +155,17 @@ static void request_bytes(struct playback_stream *s) {
     size_t l;
     assert(s);
 
-    if (!(l = pa_memblockq_missing_to(s->memblockq, s->qlength)))
+    if (!(l = pa_memblockq_missing(s->memblockq)))
         return;
 
     if (l <= s->requested_bytes)
         return;
 
     l -= s->requested_bytes;
+
+    if (l < pa_memblockq_get_minreq(s->memblockq))
+        return;
+    
     s->requested_bytes += l;
 
     t = pa_tagstruct_new(NULL, 0);
@@ -166,7 +176,7 @@ static void request_bytes(struct playback_stream *s) {
     pa_tagstruct_putu32(t, l);
     pa_pstream_send_tagstruct(s->connection->pstream, t);
 
-/*    fprintf(stderr, "Requesting %u bytes\n", l);*/
+    /*fprintf(stderr, "Requesting %u bytes\n", l);*/
 }
 
 /*** sinkinput callbacks ***/
@@ -209,10 +219,15 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
 
 /*** pdispatch callbacks ***/
 
-static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void protocol_error(struct connection *c) {
+    fprintf(stderr, __FILE__": protocol error, kicking client\n");
+    connection_free(c);
+}
+
+static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     struct playback_stream *s;
-    size_t maxlength, prebuf, qlength;
+    size_t maxlength, tlength, prebuf, minreq;
     uint32_t sink_index;
     const char *name;
     struct pa_sample_spec ss;
@@ -223,15 +238,18 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
     if (pa_tagstruct_gets(t, &name) < 0 ||
         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
         pa_tagstruct_getu32(t, &sink_index) < 0 ||
-        pa_tagstruct_getu32(t, &qlength) < 0 ||
         pa_tagstruct_getu32(t, &maxlength) < 0 ||
+        pa_tagstruct_getu32(t, &tlength) < 0 ||
         pa_tagstruct_getu32(t, &prebuf) < 0 ||
-        !pa_tagstruct_eof(t))
-        return -1;
+        pa_tagstruct_getu32(t, &minreq) < 0 ||
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
 
     if (!c->authorized) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
-        return 0;
+        return;
     }
 
     if (sink_index == (uint32_t) -1)
@@ -241,12 +259,12 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
 
     if (!sink) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
-        return 0;
+        return;
     }
     
-    if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) {
+    if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
-        return 0;
+        return;
     }
     
     reply = pa_tagstruct_new(NULL, 0);
@@ -258,107 +276,148 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
     pa_tagstruct_putu32(reply, s->sink_input->index);
     pa_pstream_send_tagstruct(c->pstream, reply);
     request_bytes(s);
-    return 0;
 }
 
-static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     uint32_t channel;
     struct playback_stream *s;
     assert(c && t);
     
     if (pa_tagstruct_getu32(t, &channel) < 0 ||
-        !pa_tagstruct_eof(t))
-        return -1;
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
 
     if (!c->authorized) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
-        return 0;
+        return;
     }
     
     if (!(s = pa_idxset_get_by_index(c->playback_streams, channel))) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
-        return 0;
+        return;
     }
 
     pa_pstream_send_simple_ack(c->pstream, tag);
-    return 0;
 }
 
-static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     assert(c && t);
     
-    if (!pa_tagstruct_eof(t))
-        return -1;
+    if (!pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
 
     if (!c->authorized) {
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
-        return 0;
+        return;
     }
     
     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
-    return 0;
+    return;
 }
 
-static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     const void*cookie;
     assert(c && t);
 
     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
-        !pa_tagstruct_eof(t))
-        return -1;
-
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
+        
     if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
         fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n");
         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
-        return 0;
+        return;
     }
 
     c->authorized = 1;
     pa_pstream_send_simple_ack(c->pstream, tag);
-    return 0;
+    return;
 }
 
-static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     const char *name;
     assert(c && t);
 
     if (pa_tagstruct_gets(t, &name) < 0 ||
-        !pa_tagstruct_eof(t))
-        return -1;
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
 
     pa_client_rename(c->client, name);
     pa_pstream_send_simple_ack(c->pstream, tag);
-    return 0;
+    return;
+}
+
+static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
+    const char *name;
+    uint32_t index = PA_IDXSET_INVALID;
+    assert(c && t);
+
+    if (pa_tagstruct_gets(t, &name) < 0 ||
+        !pa_tagstruct_eof(t)) {
+        protocol_error(c);
+        return;
+    }
+
+    if (command == PA_COMMAND_LOOKUP_SINK) {
+        struct pa_sink *sink;
+        if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
+            index = sink->index;
+    } else {
+        struct pa_source *source;
+        assert(command == PA_COMMAND_LOOKUP_SOURCE);
+        if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
+            index = source->index;
+    }
+
+    if (index == PA_IDXSET_INVALID)
+        pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+    else {
+        struct pa_tagstruct *reply;
+        reply = pa_tagstruct_new(NULL, 0);
+        assert(reply);
+        pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
+        pa_tagstruct_putu32(reply, tag);
+        pa_tagstruct_putu32(reply, index);
+        pa_pstream_send_tagstruct(c->pstream, reply);
+    }
 }
 
 /*** pstream callbacks ***/
 
-static int packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
+static void packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
     struct connection *c = userdata;
     assert(p && packet && packet->data && c);
 
     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
         fprintf(stderr, "protocol-native: invalid packet.\n");
-        return -1;
+        connection_free(c);
     }
-    
-    return 0;
 }
 
-static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
+static void memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
     struct connection *c = userdata;
     struct playback_stream *stream;
     assert(p && chunk && userdata);
 
     if (!(stream = pa_idxset_get_by_index(c->playback_streams, channel))) {
         fprintf(stderr, "protocol-native: client sent block for invalid stream.\n");
-        return -1;
+        connection_free(c);
+        return;
     }
 
     if (chunk->length >= stream->requested_bytes)
@@ -371,8 +430,6 @@ static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t del
     pa_sink_notify(stream->sink_input->sink);
 
     /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
-
-    return 0;
 }
 
 static void die_callback(struct pa_pstream *p, void *userdata) {
index 380b180..91eab59 100644 (file)
@@ -28,7 +28,8 @@ struct pa_protocol_simple {
     struct pa_sample_spec sample_spec;
 };
 
-#define BUFSIZE PIPE_BUF
+#define PLAYBACK_BUFFER_SECONDS (.5)
+#define RECORD_BUFFER_SECONDS (5)
 
 static void connection_free(struct connection *c) {
     assert(c);
@@ -52,17 +53,18 @@ static void connection_free(struct connection *c) {
 static int do_read(struct connection *c) {
     struct pa_memchunk chunk;
     ssize_t r;
+    size_t l;
 
     if (!pa_iochannel_is_readable(c->io))
         return 0;
     
-    if (!c->sink_input || !pa_memblockq_is_writable(c->input_memblockq, BUFSIZE))
+    if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
         return 0;
-    
-    chunk.memblock = pa_memblock_new(BUFSIZE);
+
+    chunk.memblock = pa_memblock_new(l);
     assert(chunk.memblock);
 
-    if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) {
+    if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) {
         fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno));
         pa_memblock_unref(chunk.memblock);
         return -1;
@@ -213,8 +215,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo
         c->source_output->kill = source_output_kill_cb;
         c->source_output->userdata = c;
 
-        l = 5*pa_bytes_per_second(&p->sample_spec); /* 5s */
-        c->output_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2);
+        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
+        c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, 0);
     }
 
     if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) {
@@ -234,8 +236,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo
         c->sink_input->get_latency = sink_input_get_latency_cb;
         c->sink_input->userdata = c;
 
-        l = pa_bytes_per_second(&p->sample_spec)/2; /* half a second */
-        c->input_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2);
+        l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
+        c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/10);
     }
 
 
index 1739780..19f83e3 100644 (file)
@@ -35,6 +35,8 @@ struct pa_pstream {
     struct pa_iochannel *io;
     struct pa_queue *send_queue;
 
+    int in_use, shall_free;
+    
     int dead;
     void (*die_callback) (struct pa_pstream *p, void *userdad);
     void *die_callback_userdata;
@@ -46,9 +48,6 @@ struct pa_pstream {
         size_t index;
     } write;
 
-    void (*send_callback) (struct pa_pstream *p, void *userdata);
-    void *send_callback_userdata;
-
     struct {
         struct pa_memblock *memblock;
         struct pa_packet *packet;
@@ -57,34 +56,51 @@ struct pa_pstream {
         size_t index;
     } read;
 
-    int (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
+    void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
     void *recieve_packet_callback_userdata;
 
-    int (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata);
+    void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata);
     void *recieve_memblock_callback_userdata;
+
+    void (*drain_callback)(struct pa_pstream *p, void *userdata);
+    void *drain_userdata;
 };
 
 static void do_write(struct pa_pstream *p);
 static void do_read(struct pa_pstream *p);
 
-static void io_callback(struct pa_iochannel*io, void *userdata) {
-    struct pa_pstream *p = userdata;
-    assert(p && p->io == io);
-
+static void do_something(struct pa_pstream *p) {
+    assert(p && !p->shall_free);
     p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
-    
+
+    p->in_use = 1;
     do_write(p);
+    p->in_use = 0;
+
+    if (p->shall_free) {
+        pa_pstream_free(p);
+        return;
+    }
+    
+    p->in_use = 1;
     do_read(p);
+    p->in_use = 0;
+    if (p->shall_free) {
+        pa_pstream_free(p);
+        return;
+    }
+}
+
+static void io_callback(struct pa_iochannel*io, void *userdata) {
+    struct pa_pstream *p = userdata;
+    assert(p && p->io == io);
+    do_something(p);
 }
 
 static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {
     struct pa_pstream *p = userdata;
     assert(p && p->mainloop_source == id && p->mainloop == m);
-
-    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
-    
-    do_write(p);
-    do_read(p);
+    do_something(p);
 }
 
 struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io) {
@@ -115,15 +131,17 @@ struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel
     p->read.packet = NULL;
     p->read.index = 0;
 
-    p->send_callback = NULL;
-    p->send_callback_userdata = NULL;
-
     p->recieve_packet_callback = NULL;
     p->recieve_packet_callback_userdata = NULL;
     
     p->recieve_memblock_callback = NULL;
     p->recieve_memblock_callback_userdata = NULL;
 
+    p->drain_callback = NULL;
+    p->drain_userdata = NULL;
+
+    p->in_use = p->shall_free = 0;
+
     return p;
 }
 
@@ -146,6 +164,12 @@ static void item_free(void *item, void *p) {
 void pa_pstream_free(struct pa_pstream *p) {
     assert(p);
 
+    if (p->in_use) {
+        /* If this pstream object is used by someone else on the call stack, we have to postpone the freeing */
+        p->dead = p->shall_free = 1;
+        return;
+    }
+
     pa_iochannel_free(p->io);
     pa_queue_free(p->send_queue, item_free, NULL);
 
@@ -162,13 +186,6 @@ void pa_pstream_free(struct pa_pstream *p) {
     free(p);
 }
 
-void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata) {
-    assert(p && callback);
-
-    p->send_callback = callback;
-    p->send_callback_userdata = userdata;
-}
-
 void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
     struct item_info *i;
     assert(p && packet);
@@ -199,14 +216,14 @@ void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t del
     p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
 }
 
-void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) {
+void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) {
     assert(p && callback);
 
     p->recieve_packet_callback = callback;
     p->recieve_packet_callback_userdata = userdata;
 }
 
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) {
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) {
     assert(p && callback);
 
     p->recieve_memblock_callback = callback;
@@ -261,7 +278,7 @@ static void do_write(struct pa_pstream *p) {
         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
     }
 
-    if ((r = pa_iochannel_write(p->io, d, l)) < 0) 
+    if ((r = pa_iochannel_write(p->io, d, l)) < 0)
         goto die;
 
     p->write.index += r;
@@ -271,8 +288,8 @@ static void do_write(struct pa_pstream *p) {
         item_free(p->write.current, (void *) 1);
         p->write.current = NULL;
 
-        if (p->send_callback && pa_queue_is_empty(p->send_queue))
-            p->send_callback(p, p->send_callback_userdata);
+        if (p->drain_callback && !pa_pstream_is_pending(p))
+            p->drain_callback(p, p->drain_userdata);
     }
 
     return;
@@ -341,13 +358,14 @@ static void do_read(struct pa_pstream *p) {
                 chunk.memblock = p->read.memblock;
                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
                 chunk.length = l;
-                
-                if (p->recieve_memblock_callback(p,
-                                                 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
-                                                 (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
-                                                 &chunk,
-                                                 p->recieve_memblock_callback_userdata) < 0)
-                    goto die;
+
+                if (p->recieve_memblock_callback)
+                    p->recieve_memblock_callback(
+                        p,
+                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+                        (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
+                        &chunk,
+                        p->recieve_memblock_callback_userdata);
             }
         }
 
@@ -359,17 +377,13 @@ static void do_read(struct pa_pstream *p) {
                 pa_memblock_unref(p->read.memblock);
                 p->read.memblock = NULL;
             } else {
-                int r = 0;
                 assert(p->read.packet);
-
+                
                 if (p->recieve_packet_callback)
-                    r = p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
+                    p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
 
                 pa_packet_unref(p->read.packet);
                 p->read.packet = NULL;
-
-                if (r < 0)
-                    goto die;
             }
 
             p->read.index = 0;
@@ -390,3 +404,21 @@ void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct p
     p->die_callback = callback;
     p->die_callback_userdata = userdata;
 }
+
+int pa_pstream_is_pending(struct pa_pstream *p) {
+    assert(p);
+
+    if (p->dead)
+        return 0;
+
+    return p->write.current || !pa_queue_is_empty(p->send_queue);
+}
+
+void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata) {
+    assert(p);
+    assert(!cb || pa_pstream_is_pending(p));
+
+    p->drain_callback = cb;
+    p->drain_userdata = userdata;
+}
+
index 0f5975d..011b8d1 100644 (file)
@@ -9,18 +9,22 @@
 #include "mainloop-api.h"
 #include "memchunk.h"
 
+/* It is safe to destroy the calling pstream object from all callbacks */
+
 struct pa_pstream;
 
 struct pa_pstream* pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io);
 void pa_pstream_free(struct pa_pstream*p);
 
-void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata);
 void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet);
 void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk);
 
-void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata);
+void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata);
+void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata);
 
 void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata);
 
+int pa_pstream_is_pending(struct pa_pstream *p);
+
 #endif
index 4131992..df12924 100644 (file)
@@ -32,6 +32,8 @@ uint32_t pa_samples_usec(size_t length, const struct pa_sample_spec *spec);
 int pa_sample_spec_valid(const struct pa_sample_spec *spec);
 int pa_sample_spec_equal(const struct pa_sample_spec*a, const struct pa_sample_spec*b);
 
+
+#define PA_SAMPLE_SNPRINT_MAX_LENGTH 32
 void pa_sample_snprint(char *s, size_t l, const struct pa_sample_spec *spec);
 
 #endif
index c1d1e96..cf31ac5 100644 (file)
@@ -14,6 +14,27 @@ struct pa_simple {
     int dead;
 };
 
+static int iterate(struct pa_simple *p, int block, int *perror) {
+    assert(p && p->context && p->mainloop && perror);
+
+    if (!block && !pa_context_is_pending(p->context))
+        return 0;
+    
+    do {
+        if (pa_context_is_dead(p->context) || (p->stream && pa_stream_is_dead(p->stream))) {
+            *perror = pa_context_errno(p->context);
+            return -1;
+        }
+        
+        if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
+            *perror = PA_ERROR_INTERNAL;
+            return -1;
+        }
+    } while (pa_context_is_pending(p->context));
+
+    return 0;
+}
+
 struct pa_simple* pa_simple_new(
     const char *server,
     const char *name,
@@ -44,26 +65,18 @@ struct pa_simple* pa_simple_new(
         goto fail;
     }
 
+    /* Wait until the context is ready */
     while (!pa_context_is_ready(p->context)) {
-        if (pa_context_is_dead(p->context)) {
-            error = pa_context_errno(p->context);
-            goto fail;
-        }
-        
-        if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0)
+        if (iterate(p, 1, &error) < 0)
             goto fail;
     }
 
     if (!(p->stream = pa_stream_new(p->context, dir, dev, stream_name, ss, attr, NULL, NULL)))
         goto fail;
 
+    /* Wait until the stream is ready */
     while (!pa_stream_is_ready(p->stream)) {
-        if (pa_stream_is_dead(p->stream)) {
-            error = pa_context_errno(p->context);
-            goto fail;
-        }
-
-        if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0)
+        if (iterate(p, 1, &error) < 0)
             goto fail;
     }
 
@@ -96,17 +109,9 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
     while (length > 0) {
         size_t l;
         
-        while (!(l = pa_stream_writable_size(p->stream))) {
-            if (pa_context_is_dead(p->context)) {
-                *perror = pa_context_errno(p->context);
+        while (!(l = pa_stream_writable_size(p->stream)))
+            if (iterate(p, 1, perror) < 0)
                 return -1;
-            }
-            
-            if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
-                *perror = PA_ERROR_INTERNAL;
-                return -1;
-            }
-        }
 
         if (l > length)
             l = length;
@@ -116,9 +121,14 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
         length -= l;
     }
 
+    /* Make sure that no data is pending for write */
+    if (iterate(p, 0, perror) < 0)
+        return -1;
+
     return 0;
 }
 
 int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) {
     assert(0);
 }
+
index 79bf177..4852edc 100644 (file)
@@ -274,8 +274,18 @@ char *pa_sink_list_to_string(struct pa_core *c) {
     default_sink = pa_sink_get_default(c);
     
     for (sink = pa_idxset_first(c->sinks, &index); sink; sink = pa_idxset_next(c->sinks, &index)) {
+        char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
+        pa_sample_snprint(ss, sizeof(ss), &sink->sample_spec);
         assert(sink->monitor_source);
-        pa_strbuf_printf(s, "  %c index: %u, name: <%s>, volume: <0x%04x>, latency: <%u usec>, monitor_source: <%u>\n", sink == default_sink ? '*' : ' ', sink->index, sink->name, (unsigned) sink->volume, pa_sink_get_latency(sink), sink->monitor_source->index);
+        pa_strbuf_printf(
+            s,
+            "  %c index: %u\n\tname: <%s>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tmonitor_source: <%u>\n\tsample_spec: <%s>\n",
+            sink == default_sink ? '*' : ' ',
+            sink->index, sink->name,
+            (unsigned) sink->volume,
+            pa_sink_get_latency(sink),
+            sink->monitor_source->index,
+            ss);
     }
     
     return pa_strbuf_tostring_free(s);
index aa6f332..20ab25e 100644 (file)
@@ -85,13 +85,18 @@ char *pa_sink_input_list_to_string(struct pa_core *c) {
     pa_strbuf_printf(s, "%u sink input(s) available.\n", pa_idxset_ncontents(c->sink_inputs));
 
     for (i = pa_idxset_first(c->sink_inputs, &index); i; i = pa_idxset_next(c->sink_inputs, &index)) {
+        char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
+        pa_sample_snprint(ss, sizeof(ss), &i->sample_spec);
         assert(i->sink);
-        pa_strbuf_printf(s, "    index: %u, name: <%s>, sink: <%u>; volume: <0x%04x>, latency: <%u usec>\n",
-                      i->index,
-                      i->name,
-                      i->sink->index,
-                      (unsigned) i->volume,
-                      pa_sink_input_get_latency(i));
+        pa_strbuf_printf(
+            s,
+            "    index: %u\n\tname: <%s>\n\tsink: <%u>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tsample_spec: <%s>\n",
+            i->index,
+            i->name,
+            i->sink->index,
+            (unsigned) i->volume,
+            pa_sink_input_get_latency(i),
+            ss);
     }
     
     return pa_strbuf_tostring_free(s);
index 76126ae..046cc3a 100644 (file)
@@ -5,6 +5,8 @@
 #include "mainloop-api.h"
 #include "iochannel.h"
 
+/* It is safe to destroy the calling socket_client object from the callback */
+
 struct pa_socket_client;
 
 struct pa_socket_client* pa_socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
index d581fa3..dbce172 100644 (file)
@@ -5,6 +5,8 @@
 #include "mainloop-api.h"
 #include "iochannel.h"
 
+/* It is safe to destroy the calling socket_server object from the callback */
+
 struct pa_socket_server;
 
 struct pa_socket_server* pa_socket_server_new(struct pa_mainloop_api *m, int fd);
index 1c97604..44d7da0 100644 (file)
@@ -111,10 +111,12 @@ char *pa_source_list_to_string(struct pa_core *c) {
     default_source = pa_source_get_default(c);
     
     for (source = pa_idxset_first(c->sources, &index); source; source = pa_idxset_next(c->sources, &index)) {
+        char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
         char mo[256] = "";
         if (source->monitor_of) 
-            snprintf(mo, sizeof(mo), ", monitor_of: <%u>", source->monitor_of->index);
-        pa_strbuf_printf(s, "  %c index: %u, name: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, mo);
+            snprintf(mo, sizeof(mo), "\n\tmonitor_of: <%u>", source->monitor_of->index);
+        pa_sample_snprint(ss, sizeof(ss), &source->sample_spec);
+        pa_strbuf_printf(s, "  %c index: %u\n\tname: <%s>\n\tsample_spec: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, ss, mo);
     }
     
     return pa_strbuf_tostring_free(s);
index 4f9f821..388f122 100644 (file)
@@ -68,11 +68,16 @@ char *pa_source_output_list_to_string(struct pa_core *c) {
     pa_strbuf_printf(s, "%u source outputs(s) available.\n", pa_idxset_ncontents(c->source_outputs));
 
     for (o = pa_idxset_first(c->source_outputs, &index); o; o = pa_idxset_next(c->source_outputs, &index)) {
+        char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
+        pa_sample_snprint(ss, sizeof(ss), &o->sample_spec);
         assert(o->source);
-        pa_strbuf_printf(s, "  %c index: %u, name: <%s>, source: <%u>\n",
-                         o->index,
-                         o->name,
-                         o->source->index);
+        pa_strbuf_printf(
+            s, "  %c index: %u\n\tname: <%s>\n\tsource: <%u>\n\tsample_spec: <%u>\n",
+            o->index,
+            o->name,
+            o->source->index,
+            ss,
+            ss);
     }
     
     return pa_strbuf_tostring_free(s);
index 5b0d893..57040cd 100644 (file)
--- a/src/todo
+++ b/src/todo
@@ -1,7 +1,7 @@
 - recording (general, simple, esound, native)
 - native library/protocol:
        sync() function
-       more functions
+       more functions (esp. latency)
 - simple library
 
 - config parser/cmdline
@@ -9,18 +9,20 @@
 - move more stuff from module-oss[-dma] to liboss-util
 - in module-oss: create source first, than sink
 
+- client field for sinkinput/sourceoutput
+
+- xmms+esound latency testing
+
 - rename files
 - svn-id and license in every file
 - documentation
 
-
-
 -- post 0.1
 - future cancellation
 - client-ui
 - clip cache
 - autoloading/autounloading
-- ldap/rendezvous
+- slp/rendezvous
 - doxygen
 
 drivers:
index 4ade681..3111bd5 100644 (file)
@@ -1,3 +1,4 @@
+#include <signal.h>
 #include <errno.h>
 #include <assert.h>
 #include <string.h>
@@ -51,7 +52,7 @@ void pa_peer_to_string(char *c, size_t l, int fd) {
                          ntohs(sa.in.sin_port));
                 return;
             } else if (sa.sa.sa_family == AF_LOCAL) {
-                snprintf(c, l, "UNIX client for %s", sa.un.sun_path);
+                snprintf(c, l, "UNIX socket client");
                 return;
             }
 
@@ -208,3 +209,15 @@ int pa_unix_socket_remove_stale(const char *fn) {
 
     return 0;
 }
+
+void pa_check_for_sigpipe(void) {
+    struct sigaction sa;
+
+    if (sigaction(SIGPIPE, NULL, &sa) < 0) {
+        fprintf(stderr, __FILE__": sigaction() failed: %s\n", strerror(errno));
+        return;
+    }
+        
+    if (sa.sa_handler == SIG_DFL)
+        fprintf(stderr, "polypaudio: WARNING: SIGPIPE is not trapped. This might cause malfunction!\n");
+}
index 40095e0..ad9916e 100644 (file)
@@ -18,4 +18,6 @@ ssize_t pa_loop_write(int fd, const void*data, size_t size);
 int pa_unix_socket_is_stale(const char *fn);
 int pa_unix_socket_remove_stale(const char *fn);
 
+void pa_check_for_sigpipe(void);
+
 #endif