From 1cecd46d9573d7bbe1a4e53b469b232a86e47b2a Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sat, 11 Aug 2007 23:46:51 +0000 Subject: [PATCH] Resurrect ability to move streams between sinks git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1649 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/sink-input.c | 334 ++++++++++++++++++++++----------------------- src/pulsecore/sink-input.h | 9 +- src/pulsecore/sink.c | 67 +++++++++ src/pulsecore/sink.h | 1 + 4 files changed, 239 insertions(+), 172 deletions(-) diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index 77b95fe..5fd1da4 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -202,7 +202,7 @@ pa_sink_input* pa_sink_input_new( pa_atomic_store(&i->thread_info.drained, 1); i->thread_info.sample_spec = i->sample_spec; i->thread_info.silence_memblock = NULL; -/* i->thread_info.move_silence = 0; */ + i->thread_info.move_silence = 0; pa_memchunk_reset(&i->thread_info.resampled_chunk); i->thread_info.resampler = resampler; i->thread_info.volume = i->volume; @@ -336,6 +336,7 @@ pa_usec_t pa_sink_input_get_latency(pa_sink_input *i) { return r; } +/* Called from thread context */ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) { int ret = -1; int do_volume_adj_here; @@ -350,24 +351,24 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING || i->thread_info.state == PA_SINK_INPUT_DRAINED); -/* if (i->thread_info.move_silence > 0) { */ -/* size_t l; */ + if (i->thread_info.move_silence > 0) { + size_t l; -/* /\* We have just been moved and shall play some silence for a */ -/* * while until the old sink has drained its playback buffer *\/ */ + /* We have just been moved and shall play some silence for a + * while until the old sink has drained its playback buffer */ -/* if (!i->thread_info.silence_memblock) */ -/* i->thread_info.silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH); */ + if (!i->thread_info.silence_memblock) + i->thread_info.silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH); -/* chunk->memblock = pa_memblock_ref(i->thread_info.silence_memblock); */ -/* chunk->index = 0; */ -/* l = pa_memblock_get_length(chunk->memblock); */ -/* chunk->length = i->move_silence < l ? i->move_silence : l; */ + chunk->memblock = pa_memblock_ref(i->thread_info.silence_memblock); + chunk->index = 0; + l = pa_memblock_get_length(chunk->memblock); + chunk->length = i->thread_info.move_silence < l ? i->thread_info.move_silence : l; -/* ret = 0; */ -/* do_volume_adj_here = 1; */ -/* goto finish; */ -/* } */ + ret = 0; + do_volume_adj_here = 1; + goto finish; + } if (!i->thread_info.resampler) { do_volume_adj_here = 0; /* FIXME??? */ @@ -437,36 +438,25 @@ finish: return ret; } +/* Called from thread context */ void pa_sink_input_drop(pa_sink_input *i, size_t length) { pa_sink_input_assert_ref(i); pa_assert(length > 0); -/* if (i->move_silence > 0) { */ - -/* if (chunk) { */ -/* size_t l; */ - -/* l = pa_memblock_get_length(i->silence_memblock); */ - -/* if (chunk->memblock != i->silence_memblock || */ -/* chunk->index != 0 || */ -/* (chunk->memblock && (chunk->length != (l < i->move_silence ? l : i->move_silence)))) */ -/* return; */ + if (i->thread_info.move_silence > 0) { -/* } */ + pa_assert(i->thread_info.move_silence >= length); -/* pa_assert(i->move_silence >= length); */ + i->thread_info.move_silence -= length; -/* i->move_silence -= length; */ - -/* if (i->move_silence <= 0) { */ -/* pa_assert(i->silence_memblock); */ -/* pa_memblock_unref(i->silence_memblock); */ -/* i->silence_memblock = NULL; */ -/* } */ + if (i->thread_info.move_silence <= 0) { + pa_assert(i->thread_info.silence_memblock); + pa_memblock_unref(i->thread_info.silence_memblock); + i->thread_info.silence_memblock = NULL; + } -/* return; */ -/* } */ + return; + } if (i->thread_info.resampled_chunk.memblock) { size_t l = length; @@ -509,11 +499,14 @@ void pa_sink_input_drop(pa_sink_input *i, size_t length) { length -= l; } else { + size_t l; + /* Hmmm, peeking failed, so let's at least drop * the right amount of data */ - - if (i->drop) - i->drop(i, pa_resampler_request(i->thread_info.resampler, length)); + + if ((l = pa_resampler_request(i->thread_info.resampler, length)) > 0) + if (i->drop) + i->drop(i, l); break; } @@ -609,166 +602,166 @@ pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i) { } int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { -/* pa_resampler *new_resampler = NULL; */ -/* pa_memblockq *buffer = NULL; */ -/* pa_sink *origin; */ + pa_resampler *new_resampler = NULL; + pa_sink *origin; + pa_usec_t silence_usec = 0; + pa_sink_input_move_info info; pa_sink_input_assert_ref(i); pa_sink_assert_ref(dest); - return -1; + origin = i->sink; -/* origin = i->sink; */ + if (dest == origin) + return 0; -/* if (dest == origin) */ -/* return 0; */ + if (i->sync_next || i->sync_prev) { + pa_log_warn("Moving synchronised streams not supported."); + return -1; + } -/* if (pa_idxset_size(dest->inputs) >= PA_MAX_INPUTS_PER_SINK) { */ -/* pa_log_warn("Failed to move sink input: too many inputs per sink."); */ -/* return -1; */ -/* } */ + if (pa_idxset_size(dest->inputs) >= PA_MAX_INPUTS_PER_SINK) { + pa_log_warn("Failed to move sink input: too many inputs per sink."); + return -1; + } -/* if (i->resampler && */ -/* pa_sample_spec_equal(&origin->sample_spec, &dest->sample_spec) && */ -/* pa_channel_map_equal(&origin->channel_map, &dest->channel_map)) */ + if (i->thread_info.resampler && + pa_sample_spec_equal(&origin->sample_spec, &dest->sample_spec) && + pa_channel_map_equal(&origin->channel_map, &dest->channel_map)) -/* /\* Try to reuse the old resampler if possible *\/ */ -/* new_resampler = i->resampler; */ + /* Try to reuse the old resampler if possible */ + new_resampler = i->thread_info.resampler; -/* else if ((i->flags & PA_SINK_INPUT_VARIABLE_RATE) || */ -/* !pa_sample_spec_equal(&i->sample_spec, &dest->sample_spec) || */ -/* !pa_channel_map_equal(&i->channel_map, &dest->channel_map)) { */ + else if ((i->flags & PA_SINK_INPUT_VARIABLE_RATE) || + !pa_sample_spec_equal(&i->sample_spec, &dest->sample_spec) || + !pa_channel_map_equal(&i->channel_map, &dest->channel_map)) { -/* /\* Okey, we need a new resampler for the new sink *\/ */ + /* Okey, we need a new resampler for the new sink */ -/* if (!(new_resampler = pa_resampler_new( */ -/* dest->core->mempool, */ -/* &i->sample_spec, &i->channel_map, */ -/* &dest->sample_spec, &dest->channel_map, */ -/* i->resample_method))) { */ -/* pa_log_warn("Unsupported resampling operation."); */ -/* return -1; */ -/* } */ -/* } */ + if (!(new_resampler = pa_resampler_new( + dest->core->mempool, + &i->sample_spec, &i->channel_map, + &dest->sample_spec, &dest->channel_map, + i->resample_method))) { + pa_log_warn("Unsupported resampling operation."); + return -1; + } + } -/* if (!immediately) { */ -/* pa_usec_t old_latency, new_latency; */ -/* pa_usec_t silence_usec = 0; */ + pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE], i); -/* buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL); */ - -/* /\* Let's do a little bit of Voodoo for compensating latency */ -/* * differences *\/ */ + memset(&info, 0, sizeof(info)); + info.sink_input = i; + + if (!immediately) { + pa_usec_t old_latency, new_latency; -/* old_latency = pa_sink_get_latency(origin); */ -/* new_latency = pa_sink_get_latency(dest); */ + /* Let's do a little bit of Voodoo for compensating latency + * differences. We assume that the accuracy for our + * estimations is still good enough, even though we do these + * operations non-atomic. */ -/* /\* The already resampled data should go to the old sink *\/ */ - -/* if (old_latency >= new_latency) { */ + old_latency = pa_sink_get_latency(origin); + new_latency = pa_sink_get_latency(dest); -/* /\* The latency of the old sink is larger than the latency */ -/* * of the new sink. Therefore to compensate for the */ -/* * difference we to play silence on the new one for a */ -/* * while *\/ */ + /* The already resampled data should go to the old sink */ -/* silence_usec = old_latency - new_latency; */ + if (old_latency >= new_latency) { -/* } else { */ -/* size_t l; */ -/* int volume_is_norm; */ + /* The latency of the old sink is larger than the latency + * of the new sink. Therefore to compensate for the + * difference we to play silence on the new one for a + * while */ -/* /\* The latency of new sink is larger than the latency of */ -/* * the old sink. Therefore we have to precompute a little */ -/* * and make sure that this is still played on the old */ -/* * sink, until we can play the first sample on the new */ -/* * sink.*\/ */ + silence_usec = old_latency - new_latency; -/* l = pa_usec_to_bytes(new_latency - old_latency, &origin->sample_spec); */ + } else { -/* volume_is_norm = pa_cvolume_is_norm(&i->volume); */ + /* The latency of new sink is larger than the latency of + * the old sink. Therefore we have to precompute a little + * and make sure that this is still played on the old + * sink, until we can play the first sample on the new + * sink.*/ -/* while (l > 0) { */ -/* pa_memchunk chunk; */ -/* pa_cvolume volume; */ -/* size_t n; */ + info.buffer_bytes = pa_usec_to_bytes(new_latency - old_latency, &origin->sample_spec); + } -/* if (pa_sink_input_peek(i, &chunk, &volume) < 0) */ -/* break; */ + /* Okey, let's move it */ + + if (info.buffer_bytes > 0) { + + info.ghost_sink_input = pa_memblockq_sink_input_new( + origin, + "Ghost Stream", + &origin->sample_spec, + &origin->channel_map, + NULL, + NULL); + + info.buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL); + } + } -/* n = chunk.length > l ? l : chunk.length; */ -/* pa_sink_input_drop(i, &chunk, n); */ -/* chunk.length = n; */ + pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER, &info, 0, NULL); -/* if (!volume_is_norm) { */ -/* pa_memchunk_make_writable(&chunk, 0); */ -/* pa_volume_memchunk(&chunk, &origin->sample_spec, &volume); */ -/* } */ + if (info.ghost_sink_input) { + /* Basically, do what pa_sink_input_put() does ...*/ + pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, info.ghost_sink_input->index); + pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_PUT], info.ghost_sink_input); + pa_sink_input_unref(info.ghost_sink_input); + } + + pa_idxset_remove_by_data(origin->inputs, i, NULL); + pa_idxset_put(dest->inputs, i, NULL); + i->sink = dest; + + /* Replace resampler */ + if (new_resampler != i->thread_info.resampler) { + if (i->thread_info.resampler) + pa_resampler_free(i->thread_info.resampler); + i->thread_info.resampler = new_resampler; + + /* if the resampler changed, the silence memblock is + * probably invalid now, too */ + if (i->thread_info.silence_memblock) { + pa_memblock_unref(i->thread_info.silence_memblock); + i->thread_info.silence_memblock = NULL; + } + } -/* if (pa_memblockq_push(buffer, &chunk) < 0) { */ -/* pa_memblock_unref(chunk.memblock); */ -/* break; */ -/* } */ + /* Dump already resampled data */ + if (i->thread_info.resampled_chunk.memblock) { + /* Hmm, this data has already been added to the ghost queue, presumably, hence let's sleep a little bit longer */ + silence_usec += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &origin->sample_spec); + pa_memblock_unref(i->thread_info.resampled_chunk.memblock); + pa_memchunk_reset(&i->thread_info.resampled_chunk); + } -/* pa_memblock_unref(chunk.memblock); */ -/* l -= n; */ -/* } */ -/* } */ + /* Calculate the new sleeping time */ + if (immediately) + i->thread_info.move_silence = 0; + else + i->thread_info.move_silence = pa_usec_to_bytes( + pa_bytes_to_usec(i->thread_info.move_silence, &i->sample_spec) + + silence_usec, + &i->sample_spec); -/* if (i->resampled_chunk.memblock) { */ + pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); + + pa_sink_update_status(origin); + pa_sink_update_status(dest); -/* /\* There is still some data left in the already resampled */ -/* * memory block. Hence, let's output it on the old sink */ -/* * and sleep so long on the new sink *\/ */ - -/* pa_memblockq_push(buffer, &i->resampled_chunk); */ -/* silence_usec += pa_bytes_to_usec(i->resampled_chunk.length, &origin->sample_spec); */ -/* } */ - -/* /\* Calculate the new sleeping time *\/ */ -/* i->move_silence = pa_usec_to_bytes( */ -/* pa_bytes_to_usec(i->move_silence, &i->sample_spec) + */ -/* silence_usec, */ -/* &i->sample_spec); */ -/* } */ - -/* /\* Okey, let's move it *\/ */ -/* pa_idxset_remove_by_data(origin->inputs, i, NULL); */ -/* pa_idxset_put(dest->inputs, i, NULL); */ -/* i->sink = dest; */ - -/* /\* Replace resampler *\/ */ -/* if (new_resampler != i->resampler) { */ -/* if (i->resampler) */ -/* pa_resampler_free(i->resampler); */ -/* i->resampler = new_resampler; */ - -/* /\* if the resampler changed, the silence memblock is */ -/* * probably invalid now, too *\/ */ -/* if (i->silence_memblock) { */ -/* pa_memblock_unref(i->silence_memblock); */ -/* i->silence_memblock = NULL; */ -/* } */ -/* } */ + pa_hook_fire(&i->sink->core->hooks[PA_CORE_HOOK_SINK_INPUT_MOVE_POST], i); + + pa_log_debug("Successfully moved sink input %i from %s to %s.", i->index, origin->name, dest->name); + + /* Notify everyone */ + pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); -/* /\* Dump already resampled data *\/ */ -/* if (i->resampled_chunk.memblock) { */ -/* pa_memblock_unref(i->resampled_chunk.memblock); */ -/* i->resampled_chunk.memblock = NULL; */ -/* i->resampled_chunk.index = i->resampled_chunk.length = 0; */ -/* } */ - -/* /\* Notify everyone *\/ */ -/* pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); */ -/* pa_sink_notify(i->sink); */ - -/* /\* Ok, now let's feed the precomputed buffer to the old sink *\/ */ -/* if (buffer) */ -/* pa_play_memblockq(origin, "Ghost Stream", &origin->sample_spec, &origin->channel_map, buffer, NULL); */ - -/* return 0; */ + return 0; } +/* Called from thread context */ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { pa_sink_input *i = PA_SINK_INPUT(o); @@ -789,19 +782,18 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t if (i->thread_info.resampled_chunk.memblock) *r += pa_bytes_to_usec(i->thread_info.resampled_chunk.length, &i->sink->sample_spec); -/* if (i->move_silence) */ -/* r += pa_bytes_to_usec(i->move_silence, &i->sink->sample_spec); */ + if (i->thread_info.move_silence) + *r += pa_bytes_to_usec(i->thread_info.move_silence, &i->sink->sample_spec); return 0; } - case PA_SINK_INPUT_MESSAGE_SET_RATE: { + case PA_SINK_INPUT_MESSAGE_SET_RATE: i->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); pa_resampler_set_input_rate(i->thread_info.resampler, PA_PTR_TO_UINT(userdata)); return 0; - } case PA_SINK_INPUT_MESSAGE_SET_STATE: { pa_sink_input *ssync; diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 5a48418..d728d46 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -95,7 +95,7 @@ struct pa_sink_input { /* Some silence to play before the actual data. This is used to * compensate for latency differences when moving a sink input * "hot" between sinks. */ - /* size_t move_silence; */ + size_t move_silence; pa_memblock *silence_memblock; /* may be NULL */ pa_sink_input *sync_prev, *sync_next; @@ -188,4 +188,11 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) void pa_sink_input_drop(pa_sink_input *i, size_t length); int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); +typedef struct pa_sink_input_move_info { + pa_sink_input *sink_input; + pa_sink_input *ghost_sink_input; + pa_memblockq *buffer; + size_t buffer_bytes; +} pa_sink_input_move_info; + #endif diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 929542c..df08ff6 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -42,6 +42,7 @@ #include #include #include +#include #include "sink.h" @@ -721,6 +722,72 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse return 0; } + case PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER: { + pa_sink_input_move_info *info = userdata; + int volume_is_norm; + + /* We don't support moving synchronized streams. */ + pa_assert(!info->sink_input->sync_prev); + pa_assert(!info->sink_input->sync_next); + pa_assert(!info->sink_input->thread_info.sync_next); + pa_assert(!info->sink_input->thread_info.sync_prev); + + if (info->ghost_sink_input) { + pa_assert(info->buffer_bytes > 0); + pa_assert(info->buffer); + + volume_is_norm = pa_cvolume_is_norm(&info->sink_input->thread_info.volume); + + pa_log_debug("Buffering %u bytes ...", info->buffer_bytes); + + while (info->buffer_bytes > 0) { + pa_memchunk memchunk; + pa_cvolume volume; + size_t n; + + if (pa_sink_input_peek(info->sink_input, &memchunk, &volume) < 0) + break; + + n = memchunk.length > info->buffer_bytes ? info->buffer_bytes : memchunk.length; + pa_sink_input_drop(info->sink_input, n); + memchunk.length = n; + + if (!volume_is_norm) { + pa_memchunk_make_writable(&memchunk, 0); + pa_volume_memchunk(&memchunk, &s->sample_spec, &volume); + } + + if (pa_memblockq_push(info->buffer, &memchunk) < 0) { + pa_memblock_unref(memchunk.memblock); + break; + } + + pa_memblock_unref(memchunk.memblock); + info->buffer_bytes -= n; + } + + /* Add the remaining already resampled chunk to the buffer */ + if (info->sink_input->thread_info.resampled_chunk.memblock) + pa_memblockq_push(info->buffer, &info->sink_input->thread_info.resampled_chunk); + + pa_memblockq_sink_input_set_queue(info->ghost_sink_input, info->buffer); + + pa_log_debug("Buffered %u bytes ...", pa_memblockq_get_length(info->buffer)); + } + + /* Let's remove the sink input ...*/ + if (pa_hashmap_remove(s->thread_info.inputs, PA_UINT32_TO_PTR(info->sink_input->index))) + pa_sink_input_unref(info->sink_input); + + /* .. and add the ghost sink input instead */ + if (info->ghost_sink_input) { + pa_hashmap_put(s->thread_info.inputs, PA_UINT32_TO_PTR(info->ghost_sink_input->index), pa_sink_input_ref(info->ghost_sink_input)); + info->ghost_sink_input->thread_info.sync_prev = info->ghost_sink_input->thread_info.sync_next = NULL; + } + + return 0; + } + case PA_SINK_MESSAGE_SET_VOLUME: s->thread_info.soft_volume = *((pa_cvolume*) userdata); return 0; diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index 494bb6a..8a6fa23 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -110,6 +110,7 @@ typedef enum pa_sink_message { PA_SINK_MESSAGE_GET_LATENCY, PA_SINK_MESSAGE_SET_STATE, PA_SINK_MESSAGE_PING, + PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER, PA_SINK_MESSAGE_MAX } pa_sink_message_t; -- 2.7.4