From 3396b65f15a06ff312e318bc05e502ba402c564e Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 14 Sep 2007 21:51:05 +0000 Subject: [PATCH] simplify rt loops a bit by moving more code into pa_rtpoll. It is now possible to attach "work" functions to a pa_rtpoll_item, which will be called in each loop iteration. This allows us to hide the message processing in the RT loops and to drop the seperate sink_input->process hooks. Basically, only the driver-specific code remains in the RT loops. git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1822 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/modules/module-alsa-sink.c | 21 +++------- src/modules/module-alsa-source.c | 23 +++------- src/modules/module-combine.c | 85 ++++++++++++------------------------- src/modules/module-null-sink.c | 23 +++------- src/modules/module-oss.c | 33 +++------------ src/modules/module-pipe-sink.c | 28 ++++--------- src/modules/module-pipe-source.c | 23 +++------- src/pulsecore/asyncmsgq.c | 21 ++++++++++ src/pulsecore/asyncmsgq.h | 1 + src/pulsecore/rtpoll.c | 91 +++++++++++++++++++++++++++++++++------- src/pulsecore/rtpoll.h | 20 +++++++-- src/pulsecore/sink-input.c | 2 - src/pulsecore/sink-input.h | 7 ---- src/pulsecore/sink.c | 17 -------- src/pulsecore/source-output.c | 2 - src/pulsecore/source-output.h | 7 ---- src/pulsecore/source.c | 18 -------- src/pulsecore/thread-mq.c | 25 ----------- src/pulsecore/thread-mq.h | 3 -- 19 files changed, 180 insertions(+), 270 deletions(-) diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 26c24c8..1bcb30c 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -630,24 +630,13 @@ static void thread_func(void *userdata) { } } - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) + /* Hmm, nothing to do. Let's sleep */ + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - if (ret > 0) - continue; - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) + if (ret == 0) goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); - goto fail; - } - /* Tell ALSA about this and process its response */ if (PA_SINK_OPENED(u->sink->thread_info.state)) { struct pollfd *pollfd; @@ -676,8 +665,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index 9e03729..870f204 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -612,24 +612,13 @@ static void thread_func(void *userdata) { } } - /* Now give the source outputs some to time to process their data */ - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + /* Tell ALSA about this and process its response */ if (PA_SOURCE_OPENED(u->source->thread_info.state)) { struct pollfd *pollfd; @@ -658,8 +647,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index 11707b1..7df04ec 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -139,6 +139,10 @@ enum { SINK_MESSAGE_REMOVE_OUTPUT }; +enum { + SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX +}; + static void output_free(struct output *o); static int output_create_sink_input(struct userdata *u, struct output *o); static int update_master(struct userdata *u, struct output *o); @@ -255,28 +259,17 @@ static void thread_func(void *userdata) { } else pa_rtpoll_set_timer_disabled(u->rtpoll); - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) + /* Hmm, nothing to do. Let's sleep */ + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - if (ret > 0) - continue; - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) + if (ret == 0) goto finish; - if (ret > 0) - continue; - - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); - goto fail; - } } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); @@ -294,10 +287,8 @@ static void request_memblock(struct output *o) { /* If another thread already prepared some data we received * the data over the asyncmsgq, hence let's first process * it. */ - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } + while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) + ; /* Check whether we're now readable */ if (pa_memblockq_is_readable(o->memblockq)) @@ -309,10 +300,8 @@ static void request_memblock(struct output *o) { if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) { /* Maybe there's some data now? */ - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } + while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) + ; /* Ok, now let's prepare some data if we really have to */ while (!pa_memblockq_is_readable(o->memblockq)) { @@ -324,7 +313,7 @@ static void request_memblock(struct output *o) { /* OK, let's send this data to the other threads */ for (j = o->userdata->thread_info.outputs; j; j = j->next) if (j != o && j->sink_input) - pa_asyncmsgq_post(j->asyncmsgq, NULL, 0, NULL, 0, &chunk, NULL); + pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); /* And push it into our own queue */ pa_memblockq_push_align(o->memblockq, &chunk); @@ -362,37 +351,6 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) { } /* Called from I/O thread context */ -static int sink_input_process_cb(pa_sink_input *i) { - struct output *o; - pa_memchunk chunk; - int r = 0; - - pa_sink_input_assert_ref(i); - o = i->userdata; - pa_assert(o); - - /* Move all data in the asyncmsgq into our memblockq */ - - while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) { - if (PA_SINK_OPENED(i->sink->thread_info.state)) - pa_memblockq_push_align(o->memblockq, &chunk); - pa_asyncmsgq_done(o->asyncmsgq, 0); - } - - /* If the sink is suspended, flush our queue */ - if (!PA_SINK_OPENED(i->sink->thread_info.state)) - pa_memblockq_flush(o->memblockq); - - if (o == o->userdata->thread_info.master) { - pa_mutex_lock(o->userdata->mutex); - r = pa_sink_process_inputs(o->userdata->sink); - pa_mutex_unlock(o->userdata->mutex); - } - - return r; -} - -/* Called from I/O thread context */ static void sink_input_attach_cb(pa_sink_input *i) { struct output *o; @@ -401,7 +359,10 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_assert(o); pa_assert(!o->rtpoll_item); - o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(i->sink->rtpoll, PA_RTPOLL_NORMAL, o->asyncmsgq); + o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq( + i->sink->rtpoll, + PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */ + o->asyncmsgq); } /* Called from I/O thread context */ @@ -448,6 +409,15 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 break; } + case SINK_INPUT_MESSAGE_POST: { + + if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state)) + pa_memblockq_push_align(o->memblockq, chunk); + else + pa_memblockq_flush(o->memblockq); + + break; + } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -784,7 +754,6 @@ static int output_create_sink_input(struct userdata *u, struct output *o) { o->sink_input->parent.process_msg = sink_input_process_msg; o->sink_input->peek = sink_input_peek_cb; o->sink_input->drop = sink_input_drop_cb; - o->sink_input->process = sink_input_process_cb; o->sink_input->attach = sink_input_attach_cb; o->sink_input->detach = sink_input_detach_cb; o->sink_input->kill = sink_input_kill_cb; diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index ba2ae6f..04df239 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -145,28 +145,17 @@ static void thread_func(void *userdata) { } else pa_rtpoll_set_timer_disabled(u->rtpoll); - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + + if (ret == 0) + goto finish; } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index 1fd8d2e..037c401 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -999,28 +999,6 @@ static void thread_func(void *userdata) { /* pa_log("loop2"); */ - /* Now give the sink inputs some to time to process their data */ - if (u->sink) { - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - } - - /* Now give the source outputs some to time to process their data */ - if (u->source) { - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - } - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - if (u->fd >= 0) { struct pollfd *pollfd; @@ -1031,11 +1009,12 @@ static void thread_func(void *userdata) { } /* Hmm, nothing to do. Let's sleep */ - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + if (u->fd >= 0) { struct pollfd *pollfd; @@ -1052,8 +1031,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 9594a68..a1bdc8f 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -126,8 +126,8 @@ static void thread_func(void *userdata) { pa_rtpoll_install(u->rtpoll); for (;;) { - int ret; struct pollfd *pollfd; + int ret; pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); @@ -170,36 +170,26 @@ static void thread_func(void *userdata) { } } - /* Now give the sink inputs some to time to process their data */ - if ((ret = pa_sink_process_inputs(u->sink)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0; - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + + if (ret == 0) + goto finish; pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + if (pollfd->revents & ~POLLOUT) { pa_log("FIFO shutdown."); goto fail; } - } + } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index 1b42fcf..382da8f 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -149,26 +149,15 @@ static void thread_func(void *userdata) { } } - /* Now give the source outputs some to time to process their data */ - if ((ret = pa_source_process_outputs(u->source)) < 0) - goto fail; - if (ret > 0) - continue; - - /* Check whether there is a message for us to process */ - if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) - goto finish; - if (ret > 0) - continue; - /* Hmm, nothing to do. Let's sleep */ pollfd->events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0; - if (pa_rtpoll_run(u->rtpoll, 1) < 0) { - pa_log("poll() failed: %s", pa_cstrerror(errno)); + if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0) goto fail; - } + if (ret == 0) + goto finish; + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); if (pollfd->revents & ~POLLIN) { pa_log("FIFO shutdown."); @@ -177,8 +166,8 @@ static void thread_func(void *userdata) { } fail: - /* We have to continue processing messages until we receive the - * SHUTDOWN message */ + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c index e3a1ba9..b365446 100644 --- a/src/pulsecore/asyncmsgq.c +++ b/src/pulsecore/asyncmsgq.c @@ -248,6 +248,27 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { return 0; } +int pa_asyncmsgq_process_one(pa_asyncmsgq *a) { + pa_msgobject *object; + int code; + void *data; + pa_memchunk chunk; + int64_t offset; + int ret; + + pa_assert(PA_REFCNT_VALUE(a) > 0); + + if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0) + return 0; + + pa_asyncmsgq_ref(a); + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); + pa_asyncmsgq_done(a, ret); + pa_asyncmsgq_unref(a); + + return 1; +} + int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h index 55812c6..393bb0b 100644 --- a/src/pulsecore/asyncmsgq.h +++ b/src/pulsecore/asyncmsgq.h @@ -65,6 +65,7 @@ int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **u int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk); void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret); int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code); +int pa_asyncmsgq_process_one(pa_asyncmsgq *a); /* Just for the reading side */ int pa_asyncmsgq_get_fd(pa_asyncmsgq *q); diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c index 659e538..0de8d0c 100644 --- a/src/pulsecore/rtpoll.c +++ b/src/pulsecore/rtpoll.c @@ -53,7 +53,7 @@ struct pa_rtpoll { pa_usec_t period; int scan_for_dead; - int running, installed, rebuild_needed; + int running, installed, rebuild_needed, quit; #ifdef HAVE_PPOLL int rtsig; @@ -76,6 +76,7 @@ struct pa_rtpoll_item { struct pollfd *pollfd; unsigned n_pollfd; + int (*work_cb)(pa_rtpoll_item *i); int (*before_cb)(pa_rtpoll_item *i); void (*after_cb)(pa_rtpoll_item *i); void *userdata; @@ -134,6 +135,7 @@ pa_rtpoll *pa_rtpoll_new(void) { p->installed = 0; p->scan_for_dead = 0; p->rebuild_needed = 0; + p->quit = 0; PA_LLIST_HEAD_INIT(pa_rtpoll_item, p->items); @@ -288,7 +290,6 @@ static void reset_all_revents(pa_rtpoll *p) { int pa_rtpoll_run(pa_rtpoll *p, int wait) { pa_rtpoll_item *i; int r = 0; - int saved_errno = 0; struct timespec timeout; pa_assert(p); @@ -297,6 +298,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { p->running = 1; + /* First, let's do some work */ for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { int k; @@ -306,12 +308,31 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { if (!i->before_cb) continue; - if ((k = i->before_cb(i)) != 0) { + if (p->quit) + goto finish; + + if ((k = i->work_cb(i)) != 0) { + if (k < 0) + r = k; + + goto finish; + } + } + + /* Now let's prepare for entering the sleep */ + for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { + int k = 0; + + if (i->dead) + continue; + + if (!i->before_cb) + continue; + + if (p->quit || (k = i->before_cb(i)) != 0) { /* Hmm, this one doesn't let us enter the poll, so rewind everything */ - reset_all_revents(p); - for (i = i->prev; i; i = i->prev) { if (i->dead) @@ -334,7 +355,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { rtpoll_rebuild(p); /* Calculate timeout */ - if (!wait) { + if (!wait || p->quit) { timeout.tv_sec = 0; timeout.tv_nsec = 0; } else if (p->timer_enabled) { @@ -362,13 +383,14 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { r = poll(p->pollfd, p->n_pollfd_used, p->timer_enabled > 0 ? (timeout.tv_sec*1000) + (timeout.tv_nsec / 1000000) : -1); #endif - if (r < 0) + if (r < 0) { reset_all_revents(p); - if (r < 0 && (errno == EAGAIN || errno == EINTR)) - r = 0; - - saved_errno = r < 0 ? errno : 0; + if (errno == EAGAIN || errno == EINTR) + r = 0; + else + pa_log_error("poll(): %s", pa_cstrerror(errno)); + } if (p->timer_enabled) { if (p->period > 0) { @@ -385,6 +407,7 @@ int pa_rtpoll_run(pa_rtpoll *p, int wait) { p->timer_enabled = 0; } + /* Let's tell everyone that we left the sleep */ for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { if (i->dead) @@ -413,10 +436,7 @@ finish: } } - if (saved_errno != 0) - errno = saved_errno; - - return r; + return r < 0 ? r : !p->quit; } static void update_timer(pa_rtpoll *p) { @@ -528,6 +548,7 @@ pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsi i->userdata = NULL; i->before_cb = NULL; i->after_cb = NULL; + i->work_cb = NULL; for (j = p->items; j; j = j->next) { if (prio <= j->priority) @@ -585,6 +606,13 @@ void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rt i->after_cb = after_cb; } +void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)) { + pa_assert(i); + pa_assert(i->priority < PA_RTPOLL_NEVER); + + i->work_cb = work_cb; +} + void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata) { pa_assert(i); @@ -649,6 +677,32 @@ static void asyncmsgq_after(pa_rtpoll_item *i) { pa_asyncmsgq_after_poll(i->userdata); } +static int asyncmsgq_work(pa_rtpoll_item *i) { + pa_msgobject *object; + int code; + void *data; + pa_memchunk chunk; + int64_t offset; + + pa_assert(i); + + if (pa_asyncmsgq_get(i->userdata, &object, &code, &data, &offset, &chunk, 0) == 0) { + int ret; + + if (!object && code == PA_MESSAGE_SHUTDOWN) { + pa_asyncmsgq_done(i->userdata, 0); + pa_rtpoll_quit(i->rtpoll); + return 1; + } + + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); + pa_asyncmsgq_done(i->userdata, ret); + return 1; + } + + return 0; +} + pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { pa_rtpoll_item *i; struct pollfd *pollfd; @@ -664,7 +718,14 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t i->before_cb = asyncmsgq_before; i->after_cb = asyncmsgq_after; + i->work_cb = asyncmsgq_work; i->userdata = q; return i; } + +void pa_rtpoll_quit(pa_rtpoll *p) { + pa_assert(p); + + p->quit = 1; +} diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h index bef9eed..9a368d3 100644 --- a/src/pulsecore/rtpoll.h +++ b/src/pulsecore/rtpoll.h @@ -69,7 +69,9 @@ void pa_rtpoll_install(pa_rtpoll *p); /* Sleep on the rtpoll until the time event, or any of the fd events * is triggered. If "wait" is 0 we don't sleep but only update the - * struct pollfd. */ + * struct pollfd. Returns negative on error, positive if the loop + * should continue to run, 0 when the loop should be terminated + * cleanly. */ int pa_rtpoll_run(pa_rtpoll *f, int wait); void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, const struct timespec *ts); @@ -86,18 +88,30 @@ void pa_rtpoll_item_free(pa_rtpoll_item *i); * using the pointer and don't save the result anywhere */ struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds); +/* Set the callback that shall be called when there's time to do some work: If the + * callback returns a value > 0, the poll is skipped and the next + * iteraton of the loop will start immediately. */ +void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i)); + /* Set the callback that shall be called immediately before entering - * the sleeping poll: If the callback returns a negative value, the - * poll is skipped. */ + * the sleeping poll: If the callback returns a value > 0, the poll is + * skipped and the next iteraton of the loop will start + * immediately.. */ void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i)); /* Set the callback that shall be called immediately after having * entered the sleeping poll */ void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i)); + + void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata); void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i); pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s); pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q); +/* Requests the loop to exit. Will cause the next iteration of + * pa_rtpoll_run() to return 0 */ +void pa_rtpoll_quit(pa_rtpoll *p); + #endif diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index c33d8e7..2687cfa 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -194,7 +194,6 @@ pa_sink_input* pa_sink_input_new( i->peek = NULL; i->drop = NULL; - i->process = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; @@ -272,7 +271,6 @@ void pa_sink_input_unlink(pa_sink_input *i) { i->peek = NULL; i->drop = NULL; - i->process = NULL; i->kill = NULL; i->get_latency = NULL; i->attach = NULL; diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index e1d89ff..c4e65b5 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -90,13 +90,6 @@ struct pa_sink_input { * peek(), but not necessarily. Called from IO thread context. */ void (*drop) (pa_sink_input *i, size_t length); - /* If non-NULL this function is called in each IO event loop and - * can be used to do additional processing even when the device is - * suspended and peek() is never called. Should return 1 when - * "some work" has been done and the IO event loop should be - * reiterated immediately. Called from IO thread context. */ - int (*process) (pa_sink_input *i); /* may be NULL */ - /* If non-NULL this function is called when the input is first * connected to a sink. Called from IO thread context */ void (*attach) (pa_sink_input *i); /* may be NULL */ diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index b009bc7..a7ed5a4 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -922,20 +922,3 @@ int pa_sink_suspend_all(pa_core *c, int suspend) { return ret; } -int pa_sink_process_inputs(pa_sink *s) { - pa_sink_input *i; - void *state = NULL; - int r; - - pa_sink_assert_ref(s); - - if (!PA_SINK_LINKED(s->thread_info.state)) - return 0; - - while ((i = PA_SINK_INPUT(pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)))) - if (i->process) - if ((r = i->process(i))) - return r; - - return 0; -} diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 4267234..b77a4ae 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -148,7 +148,6 @@ pa_source_output* pa_source_output_new( o->channel_map = data->channel_map; o->push = NULL; - o->process = NULL; o->kill = NULL; o->get_latency = NULL; o->detach = NULL; @@ -204,7 +203,6 @@ void pa_source_output_unlink(pa_source_output*o) { pa_source_update_status(o->source); o->push = NULL; - o->process = NULL; o->kill = NULL; o->get_latency = NULL; o->attach = NULL; diff --git a/src/pulsecore/source-output.h b/src/pulsecore/source-output.h index 2027e37..5059c46 100644 --- a/src/pulsecore/source-output.h +++ b/src/pulsecore/source-output.h @@ -73,13 +73,6 @@ struct pa_source_output { * context. */ void (*push)(pa_source_output *o, const pa_memchunk *chunk); - /* If non-NULL this function is called in each IO event loop and - * can be used to do additional processing even when the device is - * suspended and peek() is never called. Should return 1 when - * "some work" has been done and the IO event loop should be - * reiterated immediately. Called from IO thread context. */ - int (*process) (pa_source_output *o); /* may be NULL */ - /* If non-NULL this function is called when the output is first * connected to a source. Called from IO thread context */ void (*attach) (pa_source_output *o); /* may be NULL */ diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 2f1a5a5..34e023d 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -504,21 +504,3 @@ int pa_source_suspend_all(pa_core *c, int suspend) { return ret; } - -int pa_source_process_outputs(pa_source *s) { - pa_source_output *o; - void *state = NULL; - int r; - - pa_source_assert_ref(s); - - if (!PA_SOURCE_LINKED(s->state)) - return 0; - - while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) - if (o->process) - if ((r = o->process(o))) - return r; - - return 0; -} diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c index 3000246..d572f6e 100644 --- a/src/pulsecore/thread-mq.c +++ b/src/pulsecore/thread-mq.c @@ -110,28 +110,3 @@ void pa_thread_mq_install(pa_thread_mq *q) { pa_thread_mq *pa_thread_mq_get(void) { return PA_STATIC_TLS_GET(thread_mq); } - -int pa_thread_mq_process(pa_thread_mq *q) { - pa_msgobject *object; - int code; - void *data; - pa_memchunk chunk; - int64_t offset; - - pa_assert(q); - - if (pa_asyncmsgq_get(q->inq, &object, &code, &data, &offset, &chunk, 0) == 0) { - int ret; - - if (!object && code == PA_MESSAGE_SHUTDOWN) { - pa_asyncmsgq_done(q->inq, 0); - return -1; - } - - ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); - pa_asyncmsgq_done(q->inq, ret); - return 1; - } - - return 0; -} diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h index 2b1fd68..13b6e01 100644 --- a/src/pulsecore/thread-mq.h +++ b/src/pulsecore/thread-mq.h @@ -43,9 +43,6 @@ void pa_thread_mq_done(pa_thread_mq *q); /* Install the specified pa_thread_mq object for the current thread */ void pa_thread_mq_install(pa_thread_mq *q); -/* Dispatched queued events on the thread side. */ -int pa_thread_mq_process(pa_thread_mq *q); - /* Return the pa_thread_mq object that is set for the current thread */ pa_thread_mq *pa_thread_mq_get(void); -- 2.7.4