/***
  This file is part of PulseAudio.

  Copyright 2006 Lennart Poettering

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
***/
#include <stdint.h>

#include <pulse/xmalloc.h>

#include <pulsecore/atomic.h>
#include <pulsecore/once.h>
#include <pulsecore/log.h>
#include <pulsecore/thread.h>
#include <pulsecore/semaphore.h>
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
#include <pulsecore/flist.h>

#include "thread-mq.h"
/* Per-thread slot recording which pa_thread_mq belongs to the calling
 * thread. Written by pa_thread_mq_install() and read by pa_thread_mq_get().
 * NOTE(review): declared with the NO_FREE variant, so presumably nothing is
 * run on the stored pointer at thread exit -- confirm in pulsecore/thread.h. */
PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
46 static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
47 pa_thread_mq *q = userdata;
50 pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
51 pa_assert(events == PA_IO_EVENT_INPUT);
53 pa_asyncmsgq_ref(aq = q->outq);
54 pa_asyncmsgq_write_after_poll(aq);
63 /* Check whether there is a message for us to process */
64 while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) == 0) {
67 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
68 pa_asyncmsgq_done(aq, ret);
71 if (pa_asyncmsgq_read_before_poll(aq) == 0)
75 pa_asyncmsgq_unref(aq);
78 static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
79 pa_thread_mq *q = userdata;
81 pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
82 pa_assert(events == PA_IO_EVENT_INPUT);
84 pa_asyncmsgq_write_after_poll(q->inq);
85 pa_asyncmsgq_write_before_poll(q->inq);
88 void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
92 q->mainloop = mainloop;
93 pa_assert_se(q->inq = pa_asyncmsgq_new(0));
94 pa_assert_se(q->outq = pa_asyncmsgq_new(0));
96 pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
97 pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
99 pa_asyncmsgq_write_before_poll(q->inq);
100 pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
102 pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
103 pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
106 void pa_thread_mq_done(pa_thread_mq *q) {
109 q->mainloop->io_free(q->read_event);
110 q->mainloop->io_free(q->write_event);
111 q->read_event = q->write_event = NULL;
113 pa_asyncmsgq_unref(q->inq);
114 pa_asyncmsgq_unref(q->outq);
115 q->inq = q->outq = NULL;
120 void pa_thread_mq_install(pa_thread_mq *q) {
123 pa_assert(!(PA_STATIC_TLS_GET(thread_mq)));
124 PA_STATIC_TLS_SET(thread_mq, q);
127 pa_thread_mq *pa_thread_mq_get(void) {
128 return PA_STATIC_TLS_GET(thread_mq);