From 5ae4eed52e911fb3e9712e1cd7d6095427515cd1 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 14 Sep 2007 23:26:17 +0000 Subject: [PATCH] Move attaching/detaching from a pa_rtpoll into pa_sink proper, remove it from module-combine git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1823 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/modules/module-combine.c | 44 +++++++++++++++------------------ src/pulsecore/sink-input.h | 5 +++- src/pulsecore/sink.c | 58 ++++++++++++++++++++++++++++++++++++++++++++ src/pulsecore/sink.h | 10 ++++++-- src/pulsecore/source.c | 53 ++++++++++++++++++++++++++++++++++++++++ src/pulsecore/source.h | 10 ++++++-- 6 files changed, 150 insertions(+), 30 deletions(-) diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index 7df04ec..91561ee 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -133,9 +133,7 @@ struct userdata { }; enum { - SINK_MESSAGE_DETACH = PA_SINK_MESSAGE_MAX, - SINK_MESSAGE_ATTACH, - SINK_MESSAGE_ADD_OUTPUT, + SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_REMOVE_OUTPUT }; @@ -358,6 +356,15 @@ static void sink_input_attach_cb(pa_sink_input *i) { o = i->userdata; pa_assert(o); + if (o->userdata->master == o) { + /* Calling these two functions here is safe, because both + * threads that might access this sink input are known to be + * waiting for us. */ + pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq); + pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll); + pa_sink_attach_within_thread(o->userdata->sink); + } + pa_assert(!o->rtpoll_item); o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq( i->sink->rtpoll, @@ -376,6 +383,9 @@ static void sink_input_detach_cb(pa_sink_input *i) { pa_assert(o->rtpoll_item); pa_rtpoll_item_free(o->rtpoll_item); o->rtpoll_item = NULL; + + if (o->userdata->master == o) + pa_sink_detach_from_thread(o->userdata->sink); } /* Called from main context */ @@ -543,36 +553,20 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse break; } - case SINK_MESSAGE_DETACH: { - pa_sink_input *i; - void *state = NULL; + case PA_SINK_MESSAGE_DETACH: /* We're detaching all our input streams artificially, so - * that we can driver our sink from a different sink */ - - while ((i = pa_hashmap_iterate(u->sink->thread_info.inputs, &state, NULL))) - if (i->detach) - i->detach(i); + * that we can drive our sink from a different sink */ u->thread_info.master = NULL; - break; - } - case SINK_MESSAGE_ATTACH: { - pa_sink_input *i; - void *state = NULL; + case PA_SINK_MESSAGE_ATTACH: /* We're attached all our input streams artificially again */ - - while ((i = pa_hashmap_iterate(u->sink->thread_info.inputs, &state, NULL))) - if (i->attach) - i->attach(i); - - u->thread_info.master = data; + u->thread_info.master = data; break; - } case SINK_MESSAGE_ADD_OUTPUT: PA_LLIST_PREPEND(struct output, u->thread_info.outputs, (struct output*) data); @@ -655,7 +649,7 @@ static int update_master(struct userdata *u, struct output *o) { /* Make sure everything is detached from the old thread before we move our stuff to a new thread */ if (u->sink && PA_SINK_LINKED(pa_sink_get_state(u->sink))) - pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_DETACH, NULL, 0, NULL); + pa_sink_detach(u->sink); if (o) { /* If we have a master sink we run our own sink in its thread */ @@ -706,7 +700,7 @@ static int update_master(struct userdata *u, struct output *o) { /* Now attach everything again */ if (u->sink && PA_SINK_LINKED(pa_sink_get_state(u->sink))) - pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_ATTACH, u->master, 0, NULL); + pa_sink_attach(u->sink); return 0; } diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index c4e65b5..a101828 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -91,7 +91,10 @@ struct pa_sink_input { void (*drop) (pa_sink_input *i, size_t length); /* If non-NULL this function is called when the input is first - * connected to a sink. Called from IO thread context */ + * connected to a sink or when the rtpoll/asyncmsgq fields + * change. You usually don't need to implement this function + * unless you rewrite a sink that is piggy-backed onto + * another. Called from IO thread context */ void (*attach) (pa_sink_input *i); /* may be NULL */ /* If non-NULL this function is called when the output is diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index a7ed5a4..9b19188 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -901,6 +901,20 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse s->thread_info.state = PA_PTR_TO_UINT(userdata); return 0; + case PA_SINK_MESSAGE_DETACH: + + /* We're detaching all our input streams so that the + * asyncmsgq and rtpoll fields can be changed without + * problems */ + pa_sink_detach_within_thread(s); + break; + + case PA_SINK_MESSAGE_ATTACH: + + /* Reattach all streams */ + pa_sink_attach_within_thread(s); + break; + case PA_SINK_MESSAGE_GET_LATENCY: case PA_SINK_MESSAGE_MAX: ; @@ -922,3 +936,47 @@ int pa_sink_suspend_all(pa_core *c, int suspend) { return ret; } +void pa_sink_detach(pa_sink *s) { + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->state)); + + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_DETACH, NULL, 0, NULL); +} + +void pa_sink_attach(pa_sink *s) { + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->state)); + + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_ATTACH, NULL, 0, NULL); +} + +void pa_sink_detach_within_thread(pa_sink *s) { + pa_sink_input *i; + void *state = NULL; + + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->thread_info.state)); + + while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) + if (i->detach) + i->detach(i); + + if (s->monitor_source) + pa_source_detach_within_thread(s->monitor_source); +} + +void pa_sink_attach_within_thread(pa_sink *s) { + pa_sink_input *i; + void *state = NULL; + + pa_sink_assert_ref(s); + pa_assert(PA_SINK_LINKED(s->thread_info.state)); + + while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) + if (i->attach) + i->attach(i); + + if (s->monitor_source) + pa_source_attach_within_thread(s->monitor_source); +} + diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index b3fcff4..be883ed 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -122,6 +122,8 @@ typedef enum pa_sink_message { PA_SINK_MESSAGE_SET_STATE, PA_SINK_MESSAGE_PING, PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER, + PA_SINK_MESSAGE_ATTACH, + PA_SINK_MESSAGE_DETACH, PA_SINK_MESSAGE_MAX } pa_sink_message_t; @@ -143,6 +145,9 @@ void pa_sink_set_description(pa_sink *s, const char *description); void pa_sink_set_asyncmsgq(pa_sink *s, pa_asyncmsgq *q); void pa_sink_set_rtpoll(pa_sink *s, pa_rtpoll *p); +void pa_sink_detach(pa_sink *s); +void pa_sink_attach(pa_sink *s); + /* May be called by everyone, from main context */ pa_usec_t pa_sink_get_latency(pa_sink *s); @@ -173,8 +178,9 @@ void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target); void pa_sink_skip(pa_sink *s, size_t length); -int pa_sink_process_inputs(pa_sink *s); - int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); +void pa_sink_attach_within_thread(pa_sink *s); +void pa_sink_detach_within_thread(pa_sink *s); + #endif diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 34e023d..63ff9d7 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -483,6 +483,20 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ case PA_SOURCE_MESSAGE_SET_STATE: s->thread_info.state = PA_PTR_TO_UINT(userdata); return 0; + + case PA_SOURCE_MESSAGE_DETACH: + + /* We're detaching all our output streams so that the + * asyncmsgq and rtpoll fields can be changed without + * problems */ + pa_source_detach_within_thread(s); + break; + + case PA_SOURCE_MESSAGE_ATTACH: + + /* Reattach all streams */ + pa_source_attach_within_thread(s); + break; case PA_SOURCE_MESSAGE_GET_LATENCY: case PA_SOURCE_MESSAGE_MAX: @@ -504,3 +518,42 @@ int pa_source_suspend_all(pa_core *c, int suspend) { return ret; } + +void pa_source_detach(pa_source *s) { + pa_source_assert_ref(s); + pa_assert(PA_SOURCE_LINKED(s->state)); + + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL); +} + +void pa_source_attach(pa_source *s) { + pa_source_assert_ref(s); + pa_assert(PA_SOURCE_LINKED(s->state)); + + pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL); +} + +void pa_source_detach_within_thread(pa_source *s) { + pa_source_output *o; + void *state = NULL; + + pa_source_assert_ref(s); + pa_assert(PA_SOURCE_LINKED(s->thread_info.state)); + + while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) + if (o->detach) + o->detach(o); +} + +void pa_source_attach_within_thread(pa_source *s) { + pa_source_output *o; + void *state = NULL; + + pa_source_assert_ref(s); + pa_assert(PA_SOURCE_LINKED(s->thread_info.state)); + + while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) + if (o->attach) + o->attach(o); + +} diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index ddc6615..0fd1486 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -121,6 +121,8 @@ typedef enum pa_source_message { PA_SOURCE_MESSAGE_GET_LATENCY, PA_SOURCE_MESSAGE_SET_STATE, PA_SOURCE_MESSAGE_PING, + PA_SOURCE_MESSAGE_ATTACH, + PA_SOURCE_MESSAGE_DETACH, PA_SOURCE_MESSAGE_MAX } pa_source_message_t; @@ -142,6 +144,9 @@ void pa_source_set_description(pa_source *s, const char *description); void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q); void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p); +void pa_source_detach(pa_source *s); +void pa_source_attach(pa_source *s); + /* May be called by everyone, from main context */ pa_usec_t pa_source_get_latency(pa_source *s); @@ -164,8 +169,9 @@ unsigned pa_source_used_by(pa_source *s); void pa_source_post(pa_source*s, const pa_memchunk *b); -int pa_source_process_outputs(pa_source *o); - int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk); +void pa_source_attach_within_thread(pa_source *s); +void pa_source_detach_within_thread(pa_source *s); + #endif -- 2.7.4