From 1f9ce59969e5e778f3baa287fa9c7918fde1b0c9 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 6 Aug 2007 21:47:53 +0000 Subject: [PATCH] port esound protocol to new lock-free core git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1585 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile.am | 81 +++--- src/pulsecore/protocol-esound.c | 555 +++++++++++++++++++++++++--------------- 2 files changed, 392 insertions(+), 244 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 8fc0f19..0f2cdec 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -723,9 +723,8 @@ modlibexec_LTLIBRARIES = \ libstrlist.la \ libprotocol-simple.la \ libprotocol-http.la \ - libprotocol-native.la - -# libprotocol-esound.la + libprotocol-native.la \ + libprotocol-esound.la # We need to emulate sendmsg/recvmsg to support this on Win32 if !OS_IS_WIN32 @@ -881,8 +880,8 @@ modlibexec_LTLIBRARIES += \ module-http-protocol-tcp.la \ module-sine.la \ module-native-protocol-tcp.la \ - module-native-protocol-fd.la -# module-esound-protocol-tcp.la \ + module-native-protocol-fd.la \ + module-esound-protocol-tcp.la # module-combine.la \ # module-tunnel-sink.la \ # module-tunnel-source.la \ @@ -900,8 +899,8 @@ modlibexec_LTLIBRARIES += \ module-cli-protocol-unix.la \ module-simple-protocol-unix.la \ module-http-protocol-unix.la \ - module-native-protocol-unix.la -# module-esound-protocol-unix.la + module-native-protocol-unix.la \ + module-esound-protocol-unix.la endif if HAVE_MKFIFO @@ -910,11 +909,11 @@ modlibexec_LTLIBRARIES += \ module-pipe-source.la endif -#if !OS_IS_WIN32 -#modlibexec_LTLIBRARIES += \ -# module-esound-compat-spawnfd.la \ -# module-esound-compat-spawnpid.la -#endif +if !OS_IS_WIN32 +modlibexec_LTLIBRARIES += \ + module-esound-compat-spawnfd.la \ + module-esound-compat-spawnpid.la +endif if HAVE_REGEX modlibexec_LTLIBRARIES += \ @@ -940,10 +939,10 @@ modlibexec_LTLIBRARIES += \ module-alsa-source.la endif -if HAVE_SOLARIS -modlibexec_LTLIBRARIES += \ - module-solaris.la -endif +#if HAVE_SOLARIS +#modlibexec_LTLIBRARIES += \ +# module-solaris.la +#endif if HAVE_AVAHI modlibexec_LTLIBRARIES += \ @@ -974,10 +973,10 @@ pulselibexec_PROGRAMS = \ gconf-helper endif -if OS_IS_WIN32 -modlibexec_LTLIBRARIES += \ - module-waveout.la -endif +#if OS_IS_WIN32 +#modlibexec_LTLIBRARIES += \ +# module-waveout.la +#endif if HAVE_HAL modlibexec_LTLIBRARIES += \ @@ -1099,23 +1098,23 @@ module_native_protocol_fd_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-n # EsounD protocol -#module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c -#module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) -#module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version -#module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la +module_esound_protocol_tcp_la_SOURCES = modules/module-protocol-stub.c +module_esound_protocol_tcp_la_CFLAGS = -DUSE_TCP_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) +module_esound_protocol_tcp_la_LDFLAGS = -module -avoid-version +module_esound_protocol_tcp_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la -#module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c -#module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) -#module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version -#module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la +module_esound_protocol_unix_la_SOURCES = modules/module-protocol-stub.c +module_esound_protocol_unix_la_CFLAGS = -DUSE_UNIX_SOCKETS -DUSE_PROTOCOL_ESOUND $(AM_CFLAGS) +module_esound_protocol_unix_la_LDFLAGS = -module -avoid-version +module_esound_protocol_unix_la_LIBADD = $(AM_LIBADD) libpulsecore.la libprotocol-esound.la libsocket-server.la libsocket-util.la -#module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c -#module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version -#module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la +module_esound_compat_spawnfd_la_SOURCES = modules/module-esound-compat-spawnfd.c +module_esound_compat_spawnfd_la_LDFLAGS = -module -avoid-version +module_esound_compat_spawnfd_la_LIBADD = $(AM_LIBADD) libpulsecore.la -#module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c -#module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version -#module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la +module_esound_compat_spawnpid_la_SOURCES = modules/module-esound-compat-spawnpid.c +module_esound_compat_spawnpid_la_LDFLAGS = -module -avoid-version +module_esound_compat_spawnpid_la_LIBADD = $(AM_LIBADD) libpulsecore.la #module_esound_sink_la_SOURCES = modules/module-esound-sink.c #module_esound_sink_la_LDFLAGS = -module -avoid-version @@ -1201,9 +1200,9 @@ module_alsa_source_la_CFLAGS = $(AM_CFLAGS) $(ASOUNDLIB_CFLAGS) # Solaris -module_solaris_la_SOURCES = modules/module-solaris.c -module_solaris_la_LDFLAGS = -module -avoid-version -module_solaris_la_LIBADD = $(AM_LIBADD) libiochannel.la +#module_solaris_la_SOURCES = modules/module-solaris.c +#module_solaris_la_LDFLAGS = -module -avoid-version +#module_solaris_la_LIBADD = $(AM_LIBADD) libiochannel.la # Avahi @@ -1228,10 +1227,10 @@ module_mmkbd_evdev_la_CFLAGS = $(AM_CFLAGS) # Windows waveout -module_waveout_la_SOURCES = modules/module-waveout.c -module_waveout_la_LDFLAGS = -module -avoid-version -module_waveout_la_LIBADD = $(AM_LIBADD) libpulsecore.la -lwinmm -module_waveout_la_CFLAGS = $(AM_CFLAGS) +#module_waveout_la_SOURCES = modules/module-waveout.c +#module_waveout_la_LDFLAGS = -module -avoid-version +#module_waveout_la_LIBADD = $(AM_LIBADD) libpulsecore.la -lwinmm +#module_waveout_la_CFLAGS = $(AM_CFLAGS) # Hardware autodetection module module_detect_la_SOURCES = modules/module-detect.c diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index fe0b879..7460714 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include @@ -53,6 +52,7 @@ #include #include #include +#include #include "endianmacros.h" @@ -77,7 +77,9 @@ /* This is heavily based on esound's code */ -struct connection { +typedef struct connection { + pa_msgobject parent; + uint32_t index; int dead; pa_protocol_esound *protocol; @@ -100,6 +102,7 @@ struct connection { struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; + pa_atomic_t missing; } playback; struct { @@ -109,46 +112,62 @@ struct connection { } scache; pa_time_event *auth_timeout_event; -}; +} connection; + +PA_DECLARE_CLASS(connection); +#define CONNECTION(o) (connection_cast(o)) +static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject); struct pa_protocol_esound { - int public; pa_module *module; pa_core *core; + int public; pa_socket_server *server; pa_idxset *connections; + char *sink_name, *source_name; unsigned n_player; uint8_t esd_key[ESD_KEY_LEN]; pa_ip_acl *auth_ip_acl; }; +enum { + SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */ + SINK_INPUT_MESSAGE_DISABLE_PREBUF +}; + +enum { + CONNECTION_MESSAGE_REQUEST_DATA, + CONNECTION_MESSAGE_POST_DATA, + CONNECTION_MESSAGE_UNLINK_CONNECTION +}; + typedef struct proto_handler { size_t data_length; - int (*proc)(struct connection *c, esd_proto_t request, const void *data, size_t length); + int (*proc)(connection *c, esd_proto_t request, const void *data, size_t length); const char *description; } esd_proto_handler_info_t; -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length); +static void sink_input_drop_cb(pa_sink_input *i, size_t length); static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk); static void sink_input_kill_cb(pa_sink_input *i); -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i); +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); static pa_usec_t source_output_get_latency_cb(pa_source_output *o); static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk); static void source_output_kill_cb(pa_source_output *o); -static int esd_proto_connect(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_play(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_record(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_get_latency(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_server_info(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_all_info(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_pan(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_cache(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_get_id(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_standby_or_resume(struct connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length); /* the big map of protocol handler info */ static struct proto_handler proto_map[ESD_PROTO_MAX] = { @@ -185,25 +204,56 @@ static struct proto_handler proto_map[ESD_PROTO_MAX] = { { 0, esd_proto_get_latency, "get latency" } }; -static void connection_free(struct connection *c) { - assert(c); - pa_idxset_remove_by_data(c->protocol->connections, c, NULL); - - if (c->state == ESD_STREAMING_DATA) - c->protocol->n_player--; +static void connection_unlink(connection *c) { + pa_assert(c); - pa_client_free(c->client); + if (!c->protocol) + return; if (c->sink_input) { pa_sink_input_disconnect(c->sink_input); pa_sink_input_unref(c->sink_input); + c->sink_input = NULL; } if (c->source_output) { pa_source_output_disconnect(c->source_output); pa_source_output_unref(c->source_output); + c->source_output = NULL; } + if (c->client) { + pa_client_free(c->client); + c->client = NULL; + } + + if (c->state == ESD_STREAMING_DATA) + c->protocol->n_player--; + + if (c->io) { + pa_iochannel_free(c->io); + c->io = NULL; + } + + if (c->defer_event) { + c->protocol->core->mainloop->defer_free(c->defer_event); + c->defer_event = NULL; + } + + if (c->auth_timeout_event) { + c->protocol->core->mainloop->time_free(c->auth_timeout_event); + c->auth_timeout_event = NULL; + } + + pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c); + c->protocol = NULL; + connection_unref(c); +} + +static void connection_free(pa_object *obj) { + connection *c = CONNECTION(obj); + pa_assert(c); + if (c->input_memblockq) pa_memblockq_free(c->input_memblockq); if (c->output_memblockq) @@ -215,54 +265,44 @@ static void connection_free(struct connection *c) { pa_xfree(c->read_data); pa_xfree(c->write_data); - if (c->io) - pa_iochannel_free(c->io); - - if (c->defer_event) - c->protocol->core->mainloop->defer_free(c->defer_event); - if (c->scache.memchunk.memblock) pa_memblock_unref(c->scache.memchunk.memblock); pa_xfree(c->scache.name); - if (c->auth_timeout_event) - c->protocol->core->mainloop->time_free(c->auth_timeout_event); - pa_xfree(c->original_name); pa_xfree(c); } -static void connection_write_prepare(struct connection *c, size_t length) { +static void connection_write_prepare(connection *c, size_t length) { size_t t; - assert(c); + pa_assert(c); t = c->write_data_length+length; if (c->write_data_alloc < t) c->write_data = pa_xrealloc(c->write_data, c->write_data_alloc = t); - assert(c->write_data); + pa_assert(c->write_data); } -static void connection_write(struct connection *c, const void *data, size_t length) { +static void connection_write(connection *c, const void *data, size_t length) { size_t i; - assert(c); + pa_assert(c); - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); c->protocol->core->mainloop->defer_enable(c->defer_event, 1); connection_write_prepare(c, length); - assert(c->write_data); + pa_assert(c->write_data); i = c->write_data_length; c->write_data_length += length; - memcpy((char*)c->write_data + i, data, length); + memcpy((uint8_t*) c->write_data + i, data, length); } static void format_esd2native(int format, int swap_bytes, pa_sample_spec *ss) { - assert(ss); + pa_assert(ss); ss->channels = ((format & ESD_MASK_CHAN) == ESD_STEREO) ? 2 : 1; if ((format & ESD_MASK_BITS) == ESD_BITS16) @@ -289,11 +329,13 @@ static int format_native2esd(pa_sample_spec *ss) { /*** esound commands ***/ -static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_connect(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { uint32_t ekey; int ok; - assert(length == (ESD_KEY_LEN + sizeof(uint32_t))); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == (ESD_KEY_LEN + sizeof(uint32_t))); if (!c->authorized) { if (memcmp(data, c->protocol->esd_key, ESD_KEY_LEN) != 0) { @@ -316,7 +358,7 @@ static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t req else if (ekey == ESD_SWAP_ENDIAN_KEY) c->swap_byte_order = 1; else { - pa_log("client sent invalid endian key"); + pa_log_warn("Client sent invalid endian key"); return -1; } @@ -325,7 +367,7 @@ static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t req return 0; } -static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { char name[ESD_NAME_MAX], *utf8_name; int32_t format, rate; pa_sample_spec ss; @@ -333,15 +375,17 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t pa_sink *sink = NULL; pa_sink_input_new_data sdata; - assert(c && length == (sizeof(int32_t)*2+ESD_NAME_MAX)); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX)); memcpy(&format, data, sizeof(int32_t)); format = MAYBE_INT32_SWAP(c->swap_byte_order, format); - data = (const char*)data + sizeof(int32_t); + data = (const char*) data + sizeof(int32_t); memcpy(&rate, data, sizeof(int32_t)); rate = MAYBE_INT32_SWAP(c->swap_byte_order, rate); - data = (const char*)data + sizeof(int32_t); + data = (const char*) data + sizeof(int32_t); ss.rate = rate; format_esd2native(format, c->swap_byte_order, &ss); @@ -362,7 +406,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t c->original_name = pa_xstrdup(name); - assert(!c->sink_input && !c->input_memblockq); + pa_assert(!c->sink_input && !c->input_memblockq); pa_sink_input_new_data_init(&sdata); sdata.sink = sink; @@ -385,22 +429,26 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t l/PLAYBACK_BUFFER_FRAGMENTS, NULL); pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); - c->playback.fragment_size = l/10; + c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; + c->sink_input->parent.process_msg = sink_input_process_msg; c->sink_input->peek = sink_input_peek_cb; c->sink_input->drop = sink_input_drop_cb; c->sink_input->kill = sink_input_kill_cb; - c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->userdata = c; c->state = ESD_STREAMING_DATA; c->protocol->n_player++; + pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); + + pa_sink_input_put(c->sink_input); + return 0; } -static int esd_proto_stream_record(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length) { char name[ESD_NAME_MAX], *utf8_name; int32_t format, rate; pa_source *source = NULL; @@ -408,15 +456,17 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co size_t l; pa_source_output_new_data sdata; - assert(c && length == (sizeof(int32_t)*2+ESD_NAME_MAX)); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX)); memcpy(&format, data, sizeof(int32_t)); format = MAYBE_INT32_SWAP(c->swap_byte_order, format); - data = (const char*)data + sizeof(int32_t); + data = (const char*) data + sizeof(int32_t); memcpy(&rate, data, sizeof(int32_t)); rate = MAYBE_INT32_SWAP(c->swap_byte_order, rate); - data = (const char*)data + sizeof(int32_t); + data = (const char*) data + sizeof(int32_t); ss.rate = rate; format_esd2native(format, c->swap_byte_order, &ss); @@ -436,7 +486,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co return -1; } } else { - assert(request == ESD_PROTO_STREAM_REC); + pa_assert(request == ESD_PROTO_STREAM_REC); if (c->protocol->source_name) { if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, 1))) { @@ -455,7 +505,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co c->original_name = pa_xstrdup(name); - assert(!c->output_memblockq && !c->source_output); + pa_assert(!c->output_memblockq && !c->source_output); pa_source_output_new_data_init(&sdata); sdata.source = source; @@ -488,14 +538,18 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co c->protocol->n_player++; + pa_source_output_put(c->source_output); + return 0; } -static int esd_proto_get_latency(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_get_latency(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { pa_sink *sink; int32_t latency; - assert(c && !data && length == 0); + connection_ref(c); + pa_assert(!data); + pa_assert(length == 0); if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) latency = 0; @@ -509,12 +563,14 @@ static int esd_proto_get_latency(struct connection *c, PA_GCC_UNUSED esd_proto_t return 0; } -static int esd_proto_server_info(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_server_info(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { int32_t rate = 44100, format = ESD_STEREO|ESD_BITS16; int32_t response; pa_sink *sink; - assert(c && data && length == sizeof(int32_t)); + connection_ref(c); + pa_assert(data); + pa_assert(length == sizeof(int32_t)); if ((sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) { rate = sink->sample_spec.rate; @@ -533,14 +589,16 @@ static int esd_proto_server_info(struct connection *c, PA_GCC_UNUSED esd_proto_t return 0; } -static int esd_proto_all_info(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length) { size_t t, k, s; - struct connection *conn; + connection *conn; uint32_t idx = PA_IDXSET_INVALID; unsigned nsamples; char terminator[sizeof(int32_t)*6+ESD_NAME_MAX]; - assert(c && data && length == sizeof(int32_t)); + connection_ref(c); + pa_assert(data); + pa_assert(length == sizeof(int32_t)); if (esd_proto_server_info(c, request, data, length) < 0) return -1; @@ -561,7 +619,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v if (conn->state != ESD_STREAMING_DATA) continue; - assert(t >= k*2+s); + pa_assert(t >= k*2+s); if (conn->sink_input) { pa_cvolume volume = *pa_sink_input_get_volume(conn->sink_input); @@ -602,7 +660,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v t -= k; } - assert(t == s*(nsamples+1)+k); + pa_assert(t == s*(nsamples+1)+k); t -= k; connection_write(c, terminator, k); @@ -615,7 +673,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v int32_t id, rate, lvolume, rvolume, format, len; char name[ESD_NAME_MAX]; - assert(t >= s*2); + pa_assert(t >= s*2); /* id */ id = MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1)); @@ -653,19 +711,21 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v } } - assert(t == s); + pa_assert(t == s); connection_write(c, terminator, s); return 0; } -static int esd_proto_stream_pan(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_pan(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { int32_t ok; uint32_t idx, lvolume, rvolume; - struct connection *conn; + connection *conn; - assert(c && data && length == sizeof(int32_t)*3); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == sizeof(int32_t)*3); memcpy(&idx, data, sizeof(uint32_t)); idx = MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1; @@ -694,13 +754,15 @@ static int esd_proto_stream_pan(struct connection *c, PA_GCC_UNUSED esd_proto_t return 0; } -static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { pa_sample_spec ss; int32_t format, rate, sc_length; uint32_t idx; char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1]; - assert(c && data && length == (ESD_NAME_MAX+3*sizeof(int32_t))); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == (ESD_NAME_MAX+3*sizeof(int32_t))); memcpy(&format, data, sizeof(int32_t)); format = MAYBE_INT32_SWAP(c->swap_byte_order, format); @@ -727,12 +789,12 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_ CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name."); - assert(!c->scache.memchunk.memblock); + pa_assert(!c->scache.memchunk.memblock); c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length); c->scache.memchunk.index = 0; c->scache.memchunk.length = sc_length; c->scache.sample_spec = ss; - assert(!c->scache.name); + pa_assert(!c->scache.name); c->scache.name = pa_xstrdup(name); c->state = ESD_CACHING_SAMPLE; @@ -745,12 +807,14 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_ return 0; } -static int esd_proto_sample_get_id(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_get_id(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { int32_t ok; uint32_t idx; char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1]; - assert(c && data && length == ESD_NAME_MAX); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == ESD_NAME_MAX); strcpy(name, SCACHE_PREFIX); strncpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); @@ -767,12 +831,14 @@ static int esd_proto_sample_get_id(struct connection *c, PA_GCC_UNUSED esd_proto return 0; } -static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length) { int32_t ok; const char *name; uint32_t idx; - assert(c && data && length == sizeof(int32_t)); + connection_assert_ref(c); + pa_assert(data); + pa_assert(length == sizeof(int32_t)); memcpy(&idx, data, sizeof(uint32_t)); idx = MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1; @@ -787,7 +853,7 @@ static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t reque if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM) >= 0) ok = idx + 1; } else { - assert(request == ESD_PROTO_SAMPLE_FREE); + pa_assert(request == ESD_PROTO_SAMPLE_FREE); if (pa_scache_remove_item(c->protocol->core, name) >= 0) ok = idx + 1; @@ -799,9 +865,11 @@ static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t reque return 0; } -static int esd_proto_standby_or_resume(struct connection *c, PA_GCC_UNUSED esd_proto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t length) { +static int esd_proto_standby_or_resume(connection *c, PA_GCC_UNUSED esd_proto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t length) { int32_t ok; + connection_assert_ref(c); + connection_write_prepare(c, sizeof(int32_t) * 2); ok = 1; @@ -814,20 +882,21 @@ static int esd_proto_standby_or_resume(struct connection *c, PA_GCC_UNUSED esd_p /*** client callbacks ***/ static void client_kill_cb(pa_client *c) { - assert(c && c->userdata); - connection_free(c->userdata); + pa_assert(c); + + connection_unlink(CONNECTION(c->userdata)); } /*** pa_iochannel callbacks ***/ -static int do_read(struct connection *c) { - assert(c && c->io); +static int do_read(connection *c) { + connection_assert_ref(c); /* pa_log("READ"); */ if (c->state == ESD_NEXT_REQUEST) { ssize_t r; - assert(c->read_data_length < sizeof(c->request)); + pa_assert(c->read_data_length < sizeof(c->request)); if ((r = pa_iochannel_read(c->io, ((uint8_t*) &c->request) + c->read_data_length, sizeof(c->request) - c->read_data_length)) <= 0) { pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"); @@ -862,7 +931,7 @@ static int do_read(struct connection *c) { } else { if (c->read_data_alloc < handler->data_length) c->read_data = pa_xrealloc(c->read_data, c->read_data_alloc = handler->data_length); - assert(c->read_data); + pa_assert(c->read_data); c->state = ESD_NEEDS_REQDATA; c->read_data_length = 0; @@ -873,18 +942,21 @@ static int do_read(struct connection *c) { ssize_t r; struct proto_handler *handler = proto_map+c->request; - assert(handler->proc); + pa_assert(handler->proc); - assert(c->read_data && c->read_data_length < handler->data_length); + pa_assert(c->read_data && c->read_data_length < handler->data_length); if ((r = pa_iochannel_read(c->io, (uint8_t*) c->read_data + c->read_data_length, handler->data_length - c->read_data_length)) <= 0) { + if (errno == EINTR || errno == EAGAIN) + return 0; + pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"); return -1; } if ((c->read_data_length += r) >= handler->data_length) { size_t l = c->read_data_length; - assert(handler->proc); + pa_assert(handler->proc); c->state = ESD_NEXT_REQUEST; c->read_data_length = 0; @@ -896,22 +968,24 @@ static int do_read(struct connection *c) { ssize_t r; void *p; - assert(c->scache.memchunk.memblock); - assert(c->scache.name); - assert(c->scache.memchunk.index < c->scache.memchunk.length); + pa_assert(c->scache.memchunk.memblock); + pa_assert(c->scache.name); + pa_assert(c->scache.memchunk.index < c->scache.memchunk.length); p = pa_memblock_acquire(c->scache.memchunk.memblock); - - if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) { - pa_memblock_release(c->scache.memchunk.memblock); + r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index); + pa_memblock_release(c->scache.memchunk.memblock); + + if (r <= 0) { + if (errno == EINTR || errno == EAGAIN) + return 0; + pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"); return -1; } - pa_memblock_release(c->scache.memchunk.memblock); - c->scache.memchunk.index += r; - assert(c->scache.memchunk.index <= c->scache.memchunk.length); + pa_assert(c->scache.memchunk.index <= c->scache.memchunk.length); if (c->scache.memchunk.index == c->scache.memchunk.length) { uint32_t idx; @@ -938,11 +1012,11 @@ static int do_read(struct connection *c) { size_t l; void *p; - assert(c->input_memblockq); + pa_assert(c->input_memblockq); /* pa_log("STREAMING_DATA"); */ - if (!(l = pa_memblockq_missing(c->input_memblockq))) + if (!(l = pa_atomic_load(&c->playback.missing))) return 0; if (l > c->playback.fragment_size) @@ -956,47 +1030,50 @@ static int do_read(struct connection *c) { } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); - assert(c->playback.current_memblock); - assert(pa_memblock_get_length(c->playback.current_memblock) >= l); + pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2)); c->playback.memblock_index = 0; } p = pa_memblock_acquire(c->playback.current_memblock); + r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l); + pa_memblock_release(c->playback.current_memblock); + + if (r <= 0) { + + if (errno == EINTR || errno == EAGAIN) + return 0; - if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l)) <= 0) { - pa_memblock_release(c->playback.current_memblock); pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"); return -1; } - pa_memblock_release(c->playback.current_memblock); chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; - assert(chunk.memblock); c->playback.memblock_index += r; - assert(c->input_memblockq); - pa_memblockq_push_align(c->input_memblockq, &chunk); - assert(c->sink_input); - pa_sink_notify(c->sink_input->sink); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL); + pa_atomic_sub(&c->playback.missing, r); } return 0; } -static int do_write(struct connection *c) { - assert(c && c->io); +static int do_write(connection *c) { + connection_assert_ref(c); /* pa_log("WRITE"); */ if (c->write_data_length) { ssize_t r; - assert(c->write_data_index < c->write_data_length); + pa_assert(c->write_data_index < c->write_data_length); if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 0; + pa_log("write(): %s", pa_cstrerror(errno)); return -1; } @@ -1009,37 +1086,36 @@ static int do_write(struct connection *c) { ssize_t r; void *p; - assert(c->output_memblockq); if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) return 0; - assert(chunk.memblock); - assert(chunk.length); + pa_assert(chunk.memblock); + pa_assert(chunk.length); p = pa_memblock_acquire(chunk.memblock); + r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length); + pa_memblock_release(chunk.memblock); + + pa_memblock_unref(chunk.memblock); + + if (r < 0) { - if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { - pa_memblock_release(chunk.memblock); - pa_memblock_unref(chunk.memblock); + if (errno == EINTR || errno == EAGAIN) + return 0; + pa_log("write(): %s", pa_cstrerror(errno)); return -1; } - pa_memblock_release(chunk.memblock); - - pa_memblockq_drop(c->output_memblockq, &chunk, r); - pa_memblock_unref(chunk.memblock); - - pa_source_notify(c->source_output->source); + pa_memblockq_drop(c->output_memblockq, r); } return 0; } -static void do_work(struct connection *c) { - assert(c); +static void do_work(connection *c) { + connection_assert_ref(c); - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); c->protocol->core->mainloop->defer_enable(c->defer_event, 0); if (c->dead) @@ -1070,117 +1146,188 @@ fail: pa_iochannel_free(c->io); c->io = NULL; - pa_memblockq_prebuf_disable(c->input_memblockq); - pa_sink_notify(c->sink_input->sink); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL); } else - connection_free(c); + connection_unlink(c); } static void io_callback(pa_iochannel*io, void *userdata) { - struct connection *c = userdata; - assert(io && c && c->io == io); + connection *c = CONNECTION(userdata); + + connection_assert_ref(c); + pa_assert(io); do_work(c); } -/*** defer callback ***/ - static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) { - struct connection *c = userdata; - assert(a && c && c->defer_event == e); + connection *c = CONNECTION(userdata); -/* pa_log("DEFER"); */ + connection_assert_ref(c); + pa_assert(e); do_work(c); } +static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) { + connection *c = CONNECTION(o); + connection_assert_ref(c); + + switch (code) { + case CONNECTION_MESSAGE_REQUEST_DATA: + do_work(c); + break; + + case CONNECTION_MESSAGE_POST_DATA: +/* pa_log("got data %u", chunk->length); */ + pa_memblockq_push_align(c->output_memblockq, chunk); + do_work(c); + break; + + case CONNECTION_MESSAGE_UNLINK_CONNECTION: + connection_unlink(c); + break; + } + + return 0; +} + /*** sink_input callbacks ***/ -static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { - struct connection*c; - assert(i && i->userdata && chunk); - c = i->userdata; +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { + pa_sink_input *i = PA_SINK_INPUT(o); + connection*c; - if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); - if (c->dead) - connection_free(c); + switch (code) { - return -1; - } + case SINK_INPUT_MESSAGE_POST_DATA: { + pa_assert(chunk); - return 0; + /* New data from the main loop */ + pa_memblockq_push_align(c->input_memblockq, chunk); + +/* pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */ + + return 0; + } + + case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { + pa_memblockq_prebuf_disable(c->input_memblockq); + return 0; + } + + case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { + pa_usec_t *r = userdata; + + *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); + + /* Fall through, the default handler will add in the extra + * latency added by the resampler */ + } + + default: + return pa_sink_input_process_msg(o, code, userdata, offset, chunk); + } } -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { - struct connection*c = i->userdata; - assert(i && c && length); -/* pa_log("DROP"); */ +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { + connection*c; + int r; + + pa_assert(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + pa_assert(chunk); + + if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0 && c->dead) + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + + return r; +} + +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { + connection*c; + size_t old, new; - pa_memblockq_drop(c->input_memblockq, chunk, length); + pa_assert(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + pa_assert(length); - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); + /* pa_log("DROP"); */ - if (!c->dead) - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + old = pa_memblockq_missing(c->input_memblockq); + pa_memblockq_drop(c->input_memblockq, length); + new = pa_memblockq_missing(c->input_memblockq); -/* assert(pa_memblockq_get_length(c->input_memblockq) > 2048); */ + if (new > old) { + if (pa_atomic_add(&c->playback.missing, new - old) <= 0) + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + } } static void sink_input_kill_cb(pa_sink_input *i) { - assert(i && i->userdata); - connection_free((struct connection *) i->userdata); -} + pa_sink_input_assert_ref(i); -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) { - struct connection*c = i->userdata; - assert(i && c); - return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); + connection_unlink(CONNECTION(i->userdata)); } /*** source_output callbacks ***/ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { - struct connection *c = o->userdata; - assert(o && c && chunk); - - pa_memblockq_push(c->output_memblockq, chunk); + connection *c; - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); + pa_assert(o); + c = CONNECTION(o->userdata); + pa_assert(c); + pa_assert(chunk); - if (!c->dead) - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL); } static void source_output_kill_cb(pa_source_output *o) { - assert(o && o->userdata); - connection_free((struct connection *) o->userdata); + pa_source_output_assert_ref(o); + + connection_unlink(CONNECTION(o->userdata)); } static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { - struct connection*c = o->userdata; - assert(o && c); + connection*c; + + pa_assert(o); + c = CONNECTION(o->userdata); + pa_assert(c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); } /*** socket server callback ***/ static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) { - struct connection *c = userdata; - assert(m && tv && c && c->auth_timeout_event == e); + connection *c = CONNECTION(userdata); + + pa_assert(m); + pa_assert(tv); + connection_assert_ref(c); + pa_assert(c->auth_timeout_event == e); if (!c->authorized) - connection_free(c); + connection_unlink(c); } static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { - struct connection *c; + connection *c; pa_protocol_esound *p = userdata; char cname[256], pname[128]; - assert(s && io && p); + + pa_assert(s); + pa_assert(io); + pa_assert(p); if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) { pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); @@ -1188,16 +1335,16 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) return; } - c = pa_xnew(struct connection, 1); + c = pa_msgobject_new(connection); + c->parent.parent.free = connection_free; + c->parent.process_msg = connection_process_msg; c->protocol = p; c->io = io; pa_iochannel_set_callback(c->io, io_callback, c); pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); pa_snprintf(cname, sizeof(cname), "EsounD client (%s)", pname); - assert(p->core); c->client = pa_client_new(p->core, __FILE__, cname); - assert(c->client); c->client->owner = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; @@ -1224,6 +1371,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->playback.current_memblock = NULL; c->playback.memblock_index = 0; c->playback.fragment_size = 0; + pa_atomic_store(&c->playback.missing, 0); c->scache.memchunk.length = c->scache.memchunk.index = 0; c->scache.memchunk.memblock = NULL; @@ -1245,7 +1393,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->auth_timeout_event = NULL; c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); - assert(c->defer_event); p->core->mainloop->defer_enable(c->defer_event, 0); pa_idxset_put(p->connections, c, &c->index); @@ -1254,22 +1401,22 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) /*** entry points ***/ pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { - pa_protocol_esound *p; + pa_protocol_esound *p = NULL; int public = 0; const char *acl; - assert(core); - assert(server); - assert(m); - assert(ma); - - p = pa_xnew(pa_protocol_esound, 1); + pa_assert(core); + pa_assert(server); + pa_assert(m); + pa_assert(ma); if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) { pa_log("auth-anonymous= expects a boolean argument."); goto fail; } + p = pa_xnew(pa_protocol_esound, 1); + if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", DEFAULT_COOKIE_FILE), p->esd_key, sizeof(p->esd_key)) < 0) goto fail; @@ -1282,13 +1429,12 @@ pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *serve } else p->auth_ip_acl = NULL; + p->core = core; p->module = m; p->public = public; p->server = server; pa_socket_server_set_callback(p->server, on_connection, p); - p->core = core; p->connections = pa_idxset_new(NULL, NULL); - assert(p->connections); p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); @@ -1302,17 +1448,20 @@ fail: } void pa_protocol_esound_free(pa_protocol_esound *p) { - struct connection *c; - assert(p); + connection *c; + pa_assert(p); while ((c = pa_idxset_first(p->connections, NULL))) - connection_free(c); - + connection_unlink(c); pa_idxset_free(p->connections, NULL, NULL); + pa_socket_server_unref(p->server); if (p->auth_ip_acl) pa_ip_acl_free(p->auth_ip_acl); + pa_xfree(p->sink_name); + pa_xfree(p->source_name); + pa_xfree(p); } -- 2.7.4