From c40c1682be62ccccedf626b1d9e335efe7a1101a Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Thu, 20 Sep 2007 20:30:03 +0000 Subject: [PATCH] maintain the attach status in a boolean variable 'attach' accessible from the IO thread for sink_inputs/source_outputs git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1872 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/sink-input.c | 78 ++++++++++++++++++++++--------------------- src/pulsecore/sink-input.h | 2 ++ src/pulsecore/sink.c | 67 ++++++++++++++++++++----------------- src/pulsecore/source-output.c | 17 ++++++---- src/pulsecore/source-output.h | 2 ++ src/pulsecore/source.c | 32 ++++++++++-------- 6 files changed, 109 insertions(+), 89 deletions(-) diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 3993e33..c06da13 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -189,9 +189,9 @@ pa_sink_input* pa_sink_input_new( if (data->sync_base->sync_next) data->sync_base->sync_next->sync_prev = i; data->sync_base->sync_next = i; - } else + } else i->sync_next = i->sync_prev = NULL; - + i->peek = NULL; i->drop = NULL; i->kill = NULL; @@ -205,11 +205,12 @@ pa_sink_input* pa_sink_input_new( pa_atomic_store(&i->thread_info.drained, 1); i->thread_info.sample_spec = i->sample_spec; i->thread_info.silence_memblock = NULL; - i->thread_info.move_silence = 0; + i->thread_info.move_silence = 0; pa_memchunk_reset(&i->thread_info.resampled_chunk); i->thread_info.resampler = resampler; i->thread_info.volume = i->volume; i->thread_info.muted = i->muted; + i->thread_info.attached = FALSE; pa_assert_se(pa_idxset_put(core->sink_inputs, pa_sink_input_ref(i), &i->index) == 0); pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0); @@ -249,7 +250,7 @@ static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) { update_n_corked(i, state); i->state = state; - + for (ssync = i->sync_prev; ssync; ssync = ssync->sync_prev) { update_n_corked(ssync, state); ssync->state = state; @@ -258,7 +259,7 @@ static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) { update_n_corked(ssync, state); ssync->state = state; } - + return 0; } @@ -267,14 +268,14 @@ void pa_sink_input_unlink(pa_sink_input *i) { pa_assert(PA_SINK_INPUT_LINKED(i->state)); pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_UNLINK], i); - + if (i->sync_prev) i->sync_prev->sync_next = i->sync_next; if (i->sync_next) i->sync_next->sync_prev = i->sync_prev; - + i->sync_prev = i->sync_next = NULL; - + pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, 0, NULL); pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL); pa_idxset_remove_by_data(i->sink->inputs, i, NULL); @@ -283,7 +284,7 @@ void pa_sink_input_unlink(pa_sink_input *i) { sink_input_set_state(i, PA_SINK_INPUT_UNLINKED); pa_sink_update_status(i->sink); - + i->peek = NULL; i->drop = NULL; i->kill = NULL; @@ -308,6 +309,8 @@ static void sink_input_free(pa_object *o) { pa_log_info("Freeing output %u \"%s\"", i->index, i->name); + pa_assert(!i->thread_info.attached); + if (i->thread_info.resampled_chunk.memblock) pa_memblock_unref(i->thread_info.resampled_chunk.memblock); @@ -393,7 +396,7 @@ int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_c block_size_max = pa_mempool_block_size_max(i->sink->core->mempool); if (length > block_size_max) length = pa_frame_align(block_size_max, &i->sink->sample_spec); - + if (i->thread_info.move_silence > 0) { size_t l; @@ -437,7 +440,7 @@ int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_c rmbs = pa_resampler_max_block_size(i->thread_info.resampler); if (l > rmbs) l = rmbs; - + if ((ret = i->peek(i, l, &tchunk)) < 0) goto finish; @@ -451,7 +454,7 @@ int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_c /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { pa_memchunk_make_writable(&tchunk, 0); - + if (i->thread_info.muted) pa_silence_memchunk(&tchunk, &i->thread_info.sample_spec); else @@ -529,7 +532,7 @@ void pa_sink_input_drop(pa_sink_input *i, size_t length) { i->thread_info.resampled_chunk.index += l; i->thread_info.resampled_chunk.length -= l; - + if (i->thread_info.resampled_chunk.length <= 0) { pa_memblock_unref(i->thread_info.resampled_chunk.memblock); pa_memchunk_reset(&i->thread_info.resampled_chunk); @@ -539,7 +542,7 @@ void pa_sink_input_drop(pa_sink_input *i, size_t length) { } if (length > 0) { - + if (i->thread_info.resampler) { /* So, we have a resampler. To avoid discontinuities we * have to actually read all data that could be read and @@ -548,31 +551,31 @@ void pa_sink_input_drop(pa_sink_input *i, size_t length) { while (length > 0) { pa_memchunk chunk; pa_cvolume volume; - + if (pa_sink_input_peek(i, length, &chunk, &volume) >= 0) { size_t l; - + pa_memblock_unref(chunk.memblock); l = chunk.length; if (l > length) l = length; - + pa_sink_input_drop(i, l); length -= l; - + } else { size_t l; - + l = pa_resampler_request(i->thread_info.resampler, length); - + /* Hmmm, peeking failed, so let's at least drop * the right amount of data */ if (l > 0) if (i->drop) i->drop(i, l); - + break; } } @@ -728,7 +731,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { memset(&info, 0, sizeof(info)); info.sink_input = i; - + if (!immediately) { pa_usec_t old_latency, new_latency; @@ -763,9 +766,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { } /* Okey, let's move it */ - + if (info.buffer_bytes > 0) { - + info.ghost_sink_input = pa_memblockq_sink_input_new( origin, "Ghost Stream", @@ -777,7 +780,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { info.ghost_sink_input->thread_info.state = info.ghost_sink_input->state = PA_SINK_INPUT_RUNNING; info.ghost_sink_input->thread_info.volume = info.ghost_sink_input->volume; info.ghost_sink_input->thread_info.muted = info.ghost_sink_input->muted; - + info.buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL); } } @@ -786,12 +789,12 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { if (info.ghost_sink_input) { /* Basically, do what pa_sink_input_put() does ...*/ - + pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, info.ghost_sink_input->index); pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_PUT], info.ghost_sink_input); pa_sink_input_unref(info.ghost_sink_input); } - + pa_idxset_remove_by_data(origin->inputs, i, NULL); pa_idxset_put(dest->inputs, i, NULL); i->sink = dest; @@ -828,14 +831,14 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { &dest->sample_spec); pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); - + pa_sink_update_status(origin); pa_sink_update_status(dest); pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE_POST], i); - + pa_log_debug("Successfully moved sink input %i from %s to %s.", i->index, origin->name, dest->name); - + /* Notify everyone */ pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); @@ -864,13 +867,13 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t if (i->thread_info.resampled_chunk.memblock) *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec); - if (i->thread_info.move_silence) - *r += pa_bytes_to_usec(i->thread_info.move_silence, &i->sink->sample_spec); + if (i->thread_info.move_silence) + *r += pa_bytes_to_usec(i->thread_info.move_silence, &i->sink->sample_spec); return 0; } - case PA_SINK_INPUT_MESSAGE_SET_RATE: + case PA_SINK_INPUT_MESSAGE_SET_RATE: i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); pa_resampler_set_input_rate(i->thread_info.resampler, PA_PTR_TO_UINT(userdata)); @@ -879,11 +882,11 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t case PA_SINK_INPUT_MESSAGE_SET_STATE: { pa_sink_input *ssync; - + if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) && (i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING)) pa_atomic_store(&i->thread_info.drained, 1); - + i->thread_info.state = PA_PTR_TO_UINT(userdata); for (ssync = i->thread_info.sync_prev; ssync; ssync = ssync->thread_info.sync_prev) { @@ -892,14 +895,14 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t pa_atomic_store(&ssync->thread_info.drained, 1); ssync->thread_info.state = PA_PTR_TO_UINT(userdata); } - + for (ssync = i->thread_info.sync_next; ssync; ssync = ssync->thread_info.sync_next) { if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) && (ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING)) pa_atomic_store(&ssync->thread_info.drained, 1); ssync->thread_info.state = PA_PTR_TO_UINT(userdata); } - + return 0; } } @@ -915,4 +918,3 @@ pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i) { return i->state; } - diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 152c24e..fec431f 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -124,6 +124,8 @@ struct pa_sink_input { struct { pa_sink_input_state_t state; pa_atomic_t drained; + + pa_bool_t attached; /* True only between ->attach() and ->detach() calls */ pa_sample_spec sample_spec; diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 886d744..81258e7 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -155,7 +155,7 @@ void pa_sink_put(pa_sink* s) { pa_assert(s->rtpoll); s->thread_info.state = s->state = PA_SINK_IDLE; - + pa_source_put(s->monitor_source); pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); @@ -164,7 +164,7 @@ void pa_sink_put(pa_sink* s) { static int sink_set_state(pa_sink *s, pa_sink_state_t state) { int ret; - + pa_assert(s); if (s->state == state) @@ -177,14 +177,14 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) { (PA_SINK_OPENED(s->state) && state == PA_SINK_SUSPENDED)) { pa_sink_input *i; uint32_t idx; - + /* We're suspending or resuming, tell everyone about it */ - + for (i = PA_SINK_INPUT(pa_idxset_first(s->inputs, &idx)); i; i = PA_SINK_INPUT(pa_idxset_next(s->inputs, &idx))) if (i->suspend) i->suspend(i, state == PA_SINK_SUSPENDED); } - + if (s->set_state) if ((ret = s->set_state(s, state)) < 0) return -1; @@ -206,7 +206,7 @@ void pa_sink_unlink(pa_sink* s) { pa_assert(PA_SINK_LINKED(s->state)); pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_UNLINK], s); - + pa_namereg_unregister(s->core, s->name); pa_idxset_remove_by_data(s->core->sinks, s, NULL); @@ -254,12 +254,12 @@ static void sink_free(pa_object *o) { while ((i = pa_hashmap_steal_first(s->thread_info.inputs))) pa_sink_input_unref(i); - + pa_hashmap_free(s->thread_info.inputs, NULL, NULL); if (s->silence) pa_memblock_unref(s->silence); - + pa_xfree(s->name); pa_xfree(s->description); pa_xfree(s->driver); @@ -401,7 +401,7 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) { pa_mix_info info[MAX_MIX_CHANNELS]; unsigned n; size_t block_size_max; - + pa_sink_assert_ref(s); pa_assert(PA_SINK_OPENED(s->thread_info.state)); pa_assert(pa_frame_aligned(length, &s->sample_spec)); @@ -417,7 +417,7 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) { length = pa_frame_align(block_size_max, &s->sample_spec); pa_assert(length > 0); - + n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, length, info, MAX_MIX_CHANNELS) : 0; if (n == 0) { @@ -595,10 +595,10 @@ void pa_sink_skip(pa_sink *s, size_t length) { pa_assert(PA_SINK_OPENED(s->thread_info.state)); pa_assert(length > 0); pa_assert(pa_frame_aligned(length, &s->sample_spec)); - + if (pa_source_used_by(s->monitor_source)) { pa_memchunk chunk; - + /* If something is connected to our monitor source, we have to * pass valid data to it */ @@ -609,7 +609,7 @@ void pa_sink_skip(pa_sink *s, size_t length) { pa_assert(chunk.length <= length); length -= chunk.length; } - + } else { /* Ok, noone cares about the rendered data, so let's not even render it */ @@ -792,7 +792,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse pa_assert(PA_SINK_LINKED(s->thread_info.state)); switch ((pa_sink_message_t) code) { - + case PA_SINK_MESSAGE_ADD_INPUT: { pa_sink_input *i = PA_SINK_INPUT(userdata); pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index), pa_sink_input_ref(i)); @@ -813,6 +813,9 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse i->thread_info.sync_next->thread_info.sync_prev = i; } + pa_assert(!i->thread_info.attached); + i->thread_info.attached = TRUE; + if (i->attach) i->attach(i); @@ -824,14 +827,17 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse if (i->detach) i->detach(i); - + + pa_assert(i->thread_info.attached); + i->thread_info.attached = FALSE; + /* Since the caller sleeps in pa_sink_input_unlink(), * we can safely access data outside of thread_info even * though it is mutable */ pa_assert(!i->thread_info.sync_prev); pa_assert(!i->thread_info.sync_next); - + if (i->thread_info.sync_prev) { i->thread_info.sync_prev->thread_info.sync_next = i->thread_info.sync_prev->sync_next; i->thread_info.sync_prev = NULL; @@ -841,10 +847,10 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse i->thread_info.sync_next->thread_info.sync_prev = i->thread_info.sync_next->sync_prev; i->thread_info.sync_next = NULL; } - + if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(i->index))) pa_sink_input_unref(i); - + return 0; } @@ -857,37 +863,37 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse pa_assert(!info->sink_input->sync_next); pa_assert(!info->sink_input->thread_info.sync_next); pa_assert(!info->sink_input->thread_info.sync_prev); - + if (info->ghost_sink_input) { pa_assert(info->buffer_bytes > 0); pa_assert(info->buffer); - + volume_is_norm = pa_cvolume_is_norm(&info->sink_input->thread_info.volume); pa_log_debug("Buffering %lu bytes ...", (unsigned long) info->buffer_bytes); - + while (info->buffer_bytes > 0) { pa_memchunk memchunk; pa_cvolume volume; size_t n; - + if (pa_sink_input_peek(info->sink_input, info->buffer_bytes, &memchunk, &volume) < 0) break; - + n = memchunk.length > info->buffer_bytes ? info->buffer_bytes : memchunk.length; pa_sink_input_drop(info->sink_input, n); memchunk.length = n; - + if (!volume_is_norm) { pa_memchunk_make_writable(&memchunk, 0); pa_volume_memchunk(&memchunk, &s->sample_spec, &volume); } - + if (pa_memblockq_push(info->buffer, &memchunk) < 0) { pa_memblock_unref(memchunk.memblock); break; } - + pa_memblock_unref(memchunk.memblock); info->buffer_bytes -= n; } @@ -910,7 +916,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(info->ghost_sink_input->index), pa_sink_input_ref(info->ghost_sink_input)); info->ghost_sink_input->thread_info.sync_prev = info->ghost_sink_input->thread_info.sync_next = NULL; } - + return 0; } @@ -934,7 +940,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse return 0; case PA_SINK_MESSAGE_SET_STATE: - + s->thread_info.state = PA_PTR_TO_UINT(userdata); return 0; @@ -951,7 +957,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse /* Reattach all streams */ pa_sink_attach_within_thread(s); break; - + case PA_SINK_MESSAGE_GET_LATENCY: case PA_SINK_MESSAGE_MAX: ; @@ -964,7 +970,7 @@ int pa_sink_suspend_all(pa_core *c, pa_bool_t suspend) { pa_sink *sink; uint32_t idx; int ret = 0; - + pa_core_assert_ref(c); for (sink = PA_SINK(pa_idxset_first(c->sinks, &idx)); sink; sink = PA_SINK(pa_idxset_next(c->sinks, &idx))) @@ -1016,4 +1022,3 @@ void pa_sink_attach_within_thread(pa_sink *s) { if (s->monitor_source) pa_source_attach_within_thread(s->monitor_source); } - diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index df5dc8c..34eef8b 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -156,6 +156,7 @@ pa_source_output* pa_source_output_new( o->userdata = NULL; o->thread_info.state = o->state; + o->thread_info.attached = FALSE; o->thread_info.sample_spec = o->sample_spec; o->thread_info.resampler = resampler; @@ -186,9 +187,9 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t pa_assert_se(o->source->n_corked -- >= 1); else if (o->state != PA_SOURCE_OUTPUT_CORKED && state == PA_SOURCE_OUTPUT_CORKED) o->source->n_corked++; - + o->state = state; - + return 0; } @@ -197,7 +198,7 @@ void pa_source_output_unlink(pa_source_output*o) { pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state)); pa_hook_fire(&o->source->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK], o); - + pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); pa_idxset_remove_by_data(o->source->core->source_outputs, o, NULL); @@ -231,6 +232,8 @@ static void source_output_free(pa_object* mo) { pa_log_info("Freeing output %u \"%s\"", o->index, o->name); + pa_assert(!o->thread_info.attached); + if (o->thread_info.resampler) pa_resampler_free(o->thread_info.resampler); @@ -258,7 +261,7 @@ void pa_source_output_put(pa_source_output *o) { void pa_source_output_kill(pa_source_output*o) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state)); - + if (o->kill) o->kill(o); } @@ -357,7 +360,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { pa_source_output_assert_ref(o); pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state)); pa_source_assert_ref(dest); - + origin = o->source; if (dest == origin) @@ -399,7 +402,7 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { /* Okey, let's move it */ pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); - + pa_idxset_remove_by_data(origin->outputs, o, NULL); pa_idxset_put(dest->outputs, o, NULL); o->source = dest; @@ -448,7 +451,7 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int return 0; } - + } return -1; diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index 96ded86..60552d4 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -99,6 +99,8 @@ struct pa_source_output { struct { pa_source_output_state_t state; + + pa_bool_t attached; /* True only between ->attach() and ->detach() calls */ pa_sample_spec sample_spec; diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 315c2ce..8f9cbc4 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -139,7 +139,7 @@ void pa_source_put(pa_source *s) { static int source_set_state(pa_source *s, pa_source_state_t state) { int ret; - + pa_assert(s); if (s->state == state) @@ -147,14 +147,14 @@ static int source_set_state(pa_source *s, pa_source_state_t state) { if (state == PA_SOURCE_SUSPENDED && !(s->flags & PA_SOURCE_CAN_SUSPEND)) return -1; - + if ((s->state == PA_SOURCE_SUSPENDED && PA_SOURCE_OPENED(state)) || (PA_SOURCE_OPENED(s->state) && state == PA_SOURCE_SUSPENDED)) { pa_source_output *o; uint32_t idx; - + /* We're suspending or resuming, tell everyone about it */ - + for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx))) if (o->suspend) o->suspend(o, state == PA_SINK_SUSPENDED); @@ -180,7 +180,7 @@ void pa_source_unlink(pa_source *s) { pa_assert(s); pa_assert(PA_SOURCE_LINKED(s->state)); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s); pa_namereg_unregister(s->core, s->name); pa_idxset_remove_by_data(s->core->sources, s, NULL); @@ -202,7 +202,7 @@ void pa_source_unlink(pa_source *s) { pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s); } static void source_free(pa_object *o) { @@ -221,7 +221,7 @@ static void source_free(pa_object *o) { while ((so = pa_hashmap_steal_first(s->thread_info.outputs))) pa_source_output_unref(so); - + pa_hashmap_free(s->thread_info.outputs, NULL, NULL); pa_xfree(s->name); @@ -267,7 +267,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) { if (s->thread_info.state != PA_SOURCE_RUNNING) return; - + if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) { pa_memchunk vchunk = *chunk; @@ -436,7 +436,7 @@ unsigned pa_source_linked_by(pa_source *s) { unsigned pa_source_used_by(pa_source *s) { unsigned ret; - + pa_source_assert_ref(s); pa_assert(PA_SOURCE_LINKED(s->state)); @@ -456,9 +456,12 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ pa_source_output *o = PA_SOURCE_OUTPUT(userdata); pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o)); + pa_assert(!o->thread_info.attached); + o->thread_info.attached = TRUE; + if (o->attach) o->attach(o); - + return 0; } @@ -468,6 +471,9 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ if (o->detach) o->detach(o); + pa_assert(o->thread_info.attached); + o->thread_info.attached = FALSE; + if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index))) pa_source_output_unref(o); @@ -510,7 +516,7 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ /* Reattach all streams */ pa_source_attach_within_thread(s); break; - + case PA_SOURCE_MESSAGE_GET_LATENCY: case PA_SOURCE_MESSAGE_MAX: ; @@ -523,9 +529,9 @@ int pa_source_suspend_all(pa_core *c, pa_bool_t suspend) { uint32_t idx; pa_source *source; int ret = 0; - + pa_core_assert_ref(c); - + for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) ret -= pa_source_suspend(source, suspend) < 0; -- 2.7.4