From d6a22032078bcb260eb2e0dd2ca36235c5251115 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 1 Oct 2007 16:39:04 +0000 Subject: [PATCH] Fix race condition between IO thread creation and pa_sink_put(). Move activation of rtpoll fds when we change the state INIT->IDLE. git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1922 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/modules/module-alsa-sink.c | 8 +- src/modules/module-alsa-source.c | 12 +-- src/modules/module-ladspa-sink.c | 2 +- src/modules/module-oss.c | 176 +++++++++++++++++++++++++-------------- src/modules/module-remap-sink.c | 2 +- src/pulsecore/sink.c | 32 +++---- src/pulsecore/source.c | 28 +++---- 7 files changed, 158 insertions(+), 102 deletions(-) diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 0489fa8..bb78bca 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -450,6 +450,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case PA_SINK_IDLE: case PA_SINK_RUNNING: + if (u->sink->thread_info.state == PA_SINK_INIT) { + if (build_pollfd(u) < 0) + return -1; + } + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { if (unsuspend(u) < 0) return -1; @@ -604,9 +609,6 @@ static void thread_func(void *userdata) { pa_thread_mq_install(&u->thread_mq); pa_rtpoll_install(u->rtpoll); - if (build_pollfd(u) < 0) - goto fail; - for (;;) { int ret; diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index de019ff..0f19a4a 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -437,6 +437,13 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off case PA_SOURCE_IDLE: case PA_SOURCE_RUNNING: + if (u->source->thread_info.state == PA_SOURCE_INIT) { + if (build_pollfd(u) < 0) + return -1; + + snd_pcm_start(u->pcm_handle); + } + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { if (unsuspend(u) < 0) return -1; @@ -591,11 +598,6 @@ static void thread_func(void *userdata) { pa_thread_mq_install(&u->thread_mq); pa_rtpoll_install(u->rtpoll); - if (build_pollfd(u) < 0) - goto fail; - - snd_pcm_start(u->pcm_handle); - for (;;) { int ret; diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c index 9f771c6..29ca89f 100644 --- a/src/modules/module-ladspa-sink.c +++ b/src/modules/module-ladspa-sink.c @@ -114,7 +114,7 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) { pa_sink_assert_ref(s); pa_assert_se(u = s->userdata); - if (PA_SINK_LINKED(state) && u->sink_input) + if (PA_SINK_LINKED(state) && u->sink_input && PA_SINK_INPUT_LINKED(pa_sink_input_get_state(u->sink_input))) pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED); return 0; diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index 82b5b6c..cb975fd 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -110,10 +110,10 @@ struct userdata { size_t frame_size; uint32_t in_fragment_size, out_fragment_size, in_nfrags, out_nfrags, in_hwbuf_size, out_hwbuf_size; - int use_getospace, use_getispace; - int use_getodelay; + pa_bool_t use_getospace, use_getispace; + pa_bool_t use_getodelay; - int sink_suspended, source_suspended; + pa_bool_t sink_suspended, source_suspended; int fd; int mode; @@ -123,7 +123,7 @@ struct userdata { int nfrags, frag_size; - int use_mmap; + pa_bool_t use_mmap; unsigned out_mmap_current, in_mmap_current; void *in_mmap, *out_mmap; pa_memblock **in_mmap_memblocks, **out_mmap_memblocks; @@ -149,7 +149,7 @@ static const char* const valid_modargs[] = { NULL }; -static void trigger(struct userdata *u, int quick) { +static void trigger(struct userdata *u, pa_bool_t quick) { int enable_bits = 0, zero = 0; pa_assert(u); @@ -157,7 +157,7 @@ static void trigger(struct userdata *u, int quick) { if (u->fd < 0) return; -/* pa_log_debug("trigger"); */ + pa_log_debug("trigger"); if (u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) enable_bits |= PCM_ENABLE_INPUT; @@ -165,6 +165,9 @@ static void trigger(struct userdata *u, int quick) { if (u->sink && PA_SINK_OPENED(u->sink->thread_info.state)) enable_bits |= PCM_ENABLE_OUTPUT; + pa_log_debug("trigger: %i", enable_bits); + + if (u->use_mmap) { if (!quick) @@ -327,6 +330,8 @@ static int mmap_read(struct userdata *u) { return -1; } +/* pa_log("... %i", info.blocks); */ + info.blocks += u->in_mmap_saved_nfrags; u->in_mmap_saved_nfrags = 0; @@ -438,6 +443,22 @@ static pa_usec_t io_source_get_latency(struct userdata *u) { return r; } +static void build_pollfd(struct userdata *u) { + struct pollfd *pollfd; + + pa_assert(u); + pa_assert(u->fd >= 0); + + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + pollfd->fd = u->fd; + pollfd->events = 0; + pollfd->revents = 0; +} + static int suspend(struct userdata *u) { pa_assert(u); pa_assert(u->fd >= 0); @@ -493,7 +514,6 @@ static int unsuspend(struct userdata *u) { int frag_size, in_frag_size, out_frag_size; int in_nfrags, out_nfrags; struct audio_buf_info info; - struct pollfd *pollfd; pa_assert(u); pa_assert(u->fd < 0); @@ -575,11 +595,7 @@ static int unsuspend(struct userdata *u) { pa_assert(!u->rtpoll_item); - u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); - pollfd->fd = u->fd; - pollfd->events = 0; - pollfd->revents = 0; + build_pollfd(u); if (u->sink) pa_sink_get_volume(u->sink); @@ -598,7 +614,8 @@ fail: static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; - int do_trigger = 0, ret, quick = 1; + int ret; + pa_bool_t do_trigger = FALSE, quick = TRUE; switch (code) { @@ -629,28 +646,33 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return -1; } - do_trigger = 1; + do_trigger = TRUE; - u->sink_suspended = 1; + u->sink_suspended = TRUE; break; case PA_SINK_IDLE: case PA_SINK_RUNNING: + if (u->sink->thread_info.state == PA_SINK_INIT) { + do_trigger = TRUE; + quick = u->source && PA_SOURCE_OPENED(u->source->thread_info.state); + } + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { if (!u->source || u->source_suspended) { if (unsuspend(u) < 0) return -1; - quick = 0; + quick = FALSE; } - do_trigger = 1; + do_trigger = TRUE; u->out_mmap_current = 0; u->out_mmap_saved_nfrags = 0; - u->sink_suspended = 0; + u->sink_suspended = FALSE; } break; @@ -674,7 +696,8 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SOURCE(o)->userdata; - int do_trigger = 0, ret, quick = 1; + int ret; + int do_trigger = FALSE, quick = TRUE; switch (code) { @@ -703,28 +726,33 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return -1; } - do_trigger = 1; + do_trigger = TRUE; - u->source_suspended = 1; + u->source_suspended = TRUE; break; case PA_SOURCE_IDLE: case PA_SOURCE_RUNNING: + if (u->source->thread_info.state == PA_SOURCE_INIT) { + do_trigger = TRUE; + quick = u->sink && PA_SINK_OPENED(u->sink->thread_info.state); + } + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { if (!u->sink || u->sink_suspended) { if (unsuspend(u) < 0) return -1; - quick = 0; + quick = FALSE; } - do_trigger = 1; + do_trigger = TRUE; u->in_mmap_current = 0; u->in_mmap_saved_nfrags = 0; - u->source_suspended = 0; + u->source_suspended = FALSE; } break; @@ -840,8 +868,6 @@ static void thread_func(void *userdata) { pa_thread_mq_install(&u->thread_mq); pa_rtpoll_install(u->rtpoll); - trigger(u, 0); - for (;;) { int ret; @@ -849,7 +875,7 @@ static void thread_func(void *userdata) { /* Render some data and write it to the dsp */ - if (u->sink && PA_SINK_OPENED(u->sink->thread_info.state) && (revents & POLLOUT)) { + if (u->sink && PA_SINK_OPENED(u->sink->thread_info.state) && ((revents & POLLOUT) || u->use_mmap || u->use_getospace)) { if (u->use_mmap) { @@ -863,7 +889,7 @@ static void thread_func(void *userdata) { } else { ssize_t l; - int loop = 0; + pa_bool_t loop = FALSE, work_done = FALSE; l = u->out_fragment_size; @@ -872,21 +898,35 @@ static void thread_func(void *userdata) { if (ioctl(u->fd, SNDCTL_DSP_GETOSPACE, &info) < 0) { pa_log_info("Device doesn't support SNDCTL_DSP_GETOSPACE: %s", pa_cstrerror(errno)); - u->use_getospace = 0; + u->use_getospace = FALSE; } else { - if (info.bytes >= l) { - l = (info.bytes/l)*l; - loop = 1; - } + l = info.bytes; + + /* We loop only if GETOSPACE worked and we + * actually *know* that we can write more than + * one fragment at a time */ + loop = TRUE; } } - do { + /* Round down to multiples of the fragment size, + * because OSS needs that (at least some versions + * do) */ + l = (l/u->out_fragment_size) * u->out_fragment_size; + + /* Hmm, so poll() signalled us that we can read + * something, but GETOSPACE told us there was nothing? + * Hmm, make the best of it, try to read some data, to + * avoid spinning forever. */ + if (l <= 0 && (revents & POLLOUT)) { + l = u->out_fragment_size; + loop = FALSE; + } + + while (l > 0) { void *p; ssize_t t; - pa_assert(l > 0); - if (u->memchunk.length <= 0) pa_sink_render(u->sink, l, &u->memchunk); @@ -929,17 +969,21 @@ static void thread_func(void *userdata) { l -= t; revents &= ~POLLOUT; + work_done = TRUE; } - } while (loop && l > 0); + if (!loop) + break; + } - continue; + if (work_done) + continue; } } - /* Try to read some data and pass it on to the source driver */ + /* Try to read some data and pass it on to the source driver. */ - if (u->source && PA_SOURCE_OPENED(u->source->thread_info.state) && ((revents & POLLIN))) { + if (u->source && PA_SOURCE_OPENED(u->source->thread_info.state) && ((revents & POLLIN) || u->use_mmap || u->use_getispace)) { if (u->use_mmap) { @@ -956,7 +1000,7 @@ static void thread_func(void *userdata) { void *p; ssize_t l; pa_memchunk memchunk; - int loop = 0; + pa_bool_t loop = FALSE, work_done = FALSE; l = u->in_fragment_size; @@ -965,16 +1009,21 @@ static void thread_func(void *userdata) { if (ioctl(u->fd, SNDCTL_DSP_GETISPACE, &info) < 0) { pa_log_info("Device doesn't support SNDCTL_DSP_GETISPACE: %s", pa_cstrerror(errno)); - u->use_getispace = 0; + u->use_getispace = FALSE; } else { - if (info.bytes >= l) { - l = (info.bytes/l)*l; - loop = 1; - } + l = info.bytes; + loop = TRUE; } } - do { + l = (l/u->in_fragment_size) * u->in_fragment_size; + + if (l <= 0 && (revents & POLLIN)) { + l = u->in_fragment_size; + loop = FALSE; + } + + while (l > 0) { ssize_t t, k; pa_assert(l > 0); @@ -1023,18 +1072,25 @@ static void thread_func(void *userdata) { l -= t; revents &= ~POLLIN; + work_done = TRUE; } - } while (loop && l > 0); - continue; + if (!loop) + break; + } + + if (work_done) + continue; } } -/* pa_log("loop2"); */ +/* pa_log("loop2 revents=%i", revents); */ - if (u->fd >= 0) { + if (u->rtpoll_item) { struct pollfd *pollfd; + pa_assert(u->fd >= 0); + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); pollfd->events = ((u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) ? POLLIN : 0) | @@ -1042,13 +1098,13 @@ static void thread_func(void *userdata) { } /* Hmm, nothing to do. Let's sleep */ - if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) + if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) goto fail; if (ret == 0) goto finish; - if (u->fd >= 0) { + if (u->rtpoll_item) { struct pollfd *pollfd; pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); @@ -1088,7 +1144,6 @@ int pa__init(pa_module*m) { char hwdesc[64], *t; const char *name; int namereg_fail; - struct pollfd *pollfd; pa_assert(m); @@ -1180,11 +1235,8 @@ int pa__init(pa_module*m) { pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); - u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); - pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); - pollfd->fd = fd; - pollfd->events = 0; - pollfd->revents = 0; + u->rtpoll_item = NULL; + build_pollfd(u); if (ioctl(fd, SNDCTL_DSP_GETISPACE, &info) >= 0) { pa_log_info("Input -- %u fragments of size %u.", info.fragstotal, info.fragsize); @@ -1244,7 +1296,7 @@ int pa__init(pa_module*m) { use_mmap ? " via DMA" : "")); pa_xfree(t); u->source->flags = PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY; - u->source->refresh_volume = 1; + u->source->refresh_volume = TRUE; if (use_mmap) u->in_mmap_memblocks = pa_xnew0(pa_memblock*, u->in_nfrags); @@ -1261,7 +1313,7 @@ int pa__init(pa_module*m) { goto go_on; } else { pa_log_warn("mmap(PROT_WRITE) failed, reverting to non-mmap mode: %s", pa_cstrerror(errno)); - u->use_mmap = use_mmap = 0; + u->use_mmap = (use_mmap = FALSE); u->out_mmap = NULL; } } else { @@ -1299,7 +1351,7 @@ int pa__init(pa_module*m) { use_mmap ? " via DMA" : "")); pa_xfree(t); u->sink->flags = PA_SINK_HARDWARE|PA_SINK_LATENCY; - u->sink->refresh_volume = 1; + u->sink->refresh_volume = TRUE; if (use_mmap) u->out_mmap_memblocks = pa_xnew0(pa_memblock*, u->out_nfrags); diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c index d712a04..e863c0c 100644 --- a/src/modules/module-remap-sink.c +++ b/src/modules/module-remap-sink.c @@ -100,7 +100,7 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) { pa_sink_assert_ref(s); pa_assert_se(u = s->userdata); - if (PA_SINK_LINKED(state) && u->sink_input) + if (PA_SINK_LINKED(state) && u->sink_input && PA_SINK_INPUT_LINKED(pa_sink_input_get_state(u->sink_input))) pa_sink_input_cork(u->sink_input, state == PA_SINK_SUSPENDED); return 0; diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 38e7d63..dccb34c 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -147,21 +147,6 @@ pa_sink* pa_sink_new( return s; } -void pa_sink_put(pa_sink* s) { - pa_sink_assert_ref(s); - - pa_assert(s->state == PA_SINK_INIT); - pa_assert(s->asyncmsgq); - pa_assert(s->rtpoll); - - s->thread_info.state = s->state = PA_SINK_IDLE; - - pa_source_put(s->monitor_source); - - pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_NEW_POST], s); -} - static int sink_set_state(pa_sink *s, pa_sink_state_t state) { int ret; @@ -196,6 +181,21 @@ static int sink_set_state(pa_sink *s, pa_sink_state_t state) { return 0; } +void pa_sink_put(pa_sink* s) { + pa_sink_assert_ref(s); + + pa_assert(s->state == PA_SINK_INIT); + pa_assert(s->asyncmsgq); + pa_assert(s->rtpoll); + + pa_assert_se(sink_set_state(s, PA_SINK_IDLE) == 0); + + pa_source_put(s->monitor_source); + + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SINK_NEW_POST], s); +} + void pa_sink_unlink(pa_sink* s) { pa_bool_t linked; pa_sink_input *i, *j = NULL; @@ -806,7 +806,7 @@ unsigned pa_sink_used_by(pa_sink *s) { int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { pa_sink *s = PA_SINK(o); pa_sink_assert_ref(s); - pa_assert(PA_SINK_LINKED(s->thread_info.state)); + pa_assert(s->thread_info.state != PA_SINK_UNLINKED); switch ((pa_sink_message_t) code) { diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 2106edc..9a6902a 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -124,19 +124,6 @@ pa_source* pa_source_new( return s; } -void pa_source_put(pa_source *s) { - pa_source_assert_ref(s); - - pa_assert(s->state == PA_SINK_INIT); - pa_assert(s->rtpoll); - pa_assert(s->asyncmsgq); - - s->thread_info.state = s->state = PA_SOURCE_IDLE; - - pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index); - pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_NEW_POST], s); -} - static int source_set_state(pa_source *s, pa_source_state_t state) { int ret; @@ -171,6 +158,19 @@ static int source_set_state(pa_source *s, pa_source_state_t state) { return 0; } +void pa_source_put(pa_source *s) { + pa_source_assert_ref(s); + + pa_assert(s->state == PA_SINK_INIT); + pa_assert(s->rtpoll); + pa_assert(s->asyncmsgq); + + pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0); + + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index); + pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_NEW_POST], s); +} + void pa_source_unlink(pa_source *s) { pa_bool_t linked; pa_source_output *o, *j = NULL; @@ -460,7 +460,7 @@ unsigned pa_source_used_by(pa_source *s) { int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { pa_source *s = PA_SOURCE(object); pa_source_assert_ref(s); - pa_assert(PA_SOURCE_LINKED(s->thread_info.state)); + pa_assert(s->thread_info.state != PA_SOURCE_UNLINKED); switch ((pa_source_message_t) code) { case PA_SOURCE_MESSAGE_ADD_OUTPUT: { -- 2.7.4