From f7171e86caef4f71f58d4f65d9cada4e53a19396 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 10 Aug 2007 22:01:17 +0000 Subject: [PATCH] Wrap two pa_asyncmsq in a new pa_thread_mq object for bidirectional, lock-free communication between a main loop and a thread git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1622 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile.am | 1 + src/pulsecore/thread-mq.c | 119 ++++++++++++++++++++++++++++++++++++++++++++++ src/pulsecore/thread-mq.h | 49 +++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 src/pulsecore/thread-mq.c create mode 100644 src/pulsecore/thread-mq.h diff --git a/src/Makefile.am b/src/Makefile.am index 6cb3f28..2d3af07 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -662,6 +662,7 @@ libpulsecore_la_SOURCES += \ pulsecore/flist.c pulsecore/flist.h \ pulsecore/asyncmsgq.c pulsecore/asyncmsgqq.h \ pulsecore/asyncq.c pulsecore/asyncq.h \ + pulsecore/thread-mq.c pulsecore/thread-mq.h \ pulsecore/fdsem.c pulsecore/fdsem.h \ pulsecore/object.c pulsecore/object.h \ pulsecore/msgobject.c pulsecore/msgobject.h \ diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c new file mode 100644 index 0000000..224a14c --- /dev/null +++ b/src/pulsecore/thread-mq.c @@ -0,0 +1,119 @@ +/* $Id$ */ + +/*** + This file is part of PulseAudio. + + Copyright 2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "thread-mq.h" + +static pa_once once = PA_ONCE_INIT; +static pa_tls *tls; + +static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { + pa_thread_mq *q = userdata; + + pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd); + pa_assert(events == PA_IO_EVENT_INPUT); + + pa_asyncmsgq_after_poll(q->outq); + + for (;;) { + pa_msgobject *object; + int code; + void *data; + int64_t offset; + pa_memchunk chunk; + + /* Check whether there is a message for us to process */ + while (pa_asyncmsgq_get(q->outq, &object, &code, &data, &offset, &chunk, 0) == 0) { + int ret; + + ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); + pa_asyncmsgq_done(q->outq, ret); + } + + if (pa_asyncmsgq_before_poll(q->outq) == 0) + break; + } +} + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { + pa_assert(q); + pa_assert(mainloop); + + q->mainloop = mainloop; + pa_assert_se(q->inq = pa_asyncmsgq_new(0)); + pa_assert_se(q->outq = pa_asyncmsgq_new(0)); + + pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0); + pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q)); +} + +void pa_thread_mq_done(pa_thread_mq *q) { + pa_assert(q); + + q->mainloop->io_free(q->io_event); + q->io_event = NULL; + + pa_asyncmsgq_after_poll(q->outq); + pa_asyncmsgq_free(q->inq); + pa_asyncmsgq_free(q->outq); + q->inq = q->outq = NULL; + + q->mainloop = NULL; +} + +static void init_tls(void) { + tls = pa_tls_new(NULL); +} + +void pa_thread_mq_install(pa_thread_mq *q) { + pa_assert(q); + + pa_run_once(&once, init_tls); + pa_tls_set(tls, q); +} + +pa_thread_mq *pa_thread_mq_get(void) { + pa_thread_mq *q; + + pa_run_once(&once, init_tls); + pa_assert_se(q = pa_tls_get(tls)); + return q; +} + diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h new file mode 100644 index 0000000..13b6e01 --- /dev/null +++ b/src/pulsecore/thread-mq.h @@ -0,0 +1,49 @@ +#ifndef foopulsethreadmqhfoo +#define foopulsethreadmqhfoo + +/* $Id$ */ + +/*** + This file is part of PulseAudio. + + Copyright 2004-2006 Lennart Poettering + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation; either version 2.1 of the + License, or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#include +#include + +/* Two way communication between a thread and a mainloop. Before the + * thread is started a pa_pthread_mq should be initialized and than + * attached to the thread using pa_thread_mq_install(). */ + +typedef struct pa_thread_mq { + pa_mainloop_api *mainloop; + pa_asyncmsgq *inq, *outq; + pa_io_event *io_event; +} pa_thread_mq; + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop); +void pa_thread_mq_done(pa_thread_mq *q); + +/* Install the specified pa_thread_mq object for the current thread */ +void pa_thread_mq_install(pa_thread_mq *q); + +/* Return the pa_thread_mq object that is set for the current thread */ +pa_thread_mq *pa_thread_mq_get(void); + +#endif -- 2.7.4