merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulsecore / thread-mq.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2006 Lennart Poettering
7
8   PulseAudio is free software; you can redistribute it and/or modify
9   it under the terms of the GNU Lesser General Public License as
10   published by the Free Software Foundation; either version 2.1 of the
11   License, or (at your option) any later version.
12
13   PulseAudio is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public
19   License along with PulseAudio; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21   USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulse/xmalloc.h>
32
33 #include <pulsecore/atomic.h>
34 #include <pulsecore/once.h>
35 #include <pulsecore/log.h>
36 #include <pulsecore/thread.h>
37 #include <pulsecore/semaphore.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/flist.h>
41
42 #include "thread-mq.h"
43
44 PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
45
46 static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
47     pa_thread_mq *q = userdata;
48     pa_asyncmsgq *aq;
49
50     pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
51     pa_assert(events == PA_IO_EVENT_INPUT);
52
53     pa_asyncmsgq_ref(aq = q->outq);
54     pa_asyncmsgq_write_after_poll(aq);
55
56     for (;;) {
57         pa_msgobject *object;
58         int code;
59         void *data;
60         int64_t offset;
61         pa_memchunk chunk;
62
63         /* Check whether there is a message for us to process */
64         while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) == 0) {
65             int ret;
66
67             ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
68             pa_asyncmsgq_done(aq, ret);
69         }
70
71         if (pa_asyncmsgq_read_before_poll(aq) == 0)
72             break;
73     }
74
75     pa_asyncmsgq_unref(aq);
76 }
77
78 static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
79     pa_thread_mq *q = userdata;
80
81     pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
82     pa_assert(events == PA_IO_EVENT_INPUT);
83
84     pa_asyncmsgq_write_after_poll(q->inq);
85     pa_asyncmsgq_write_before_poll(q->inq);
86 }
87
88 void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
89     pa_assert(q);
90     pa_assert(mainloop);
91
92     q->mainloop = mainloop;
93     pa_assert_se(q->inq = pa_asyncmsgq_new(0));
94     pa_assert_se(q->outq = pa_asyncmsgq_new(0));
95
96     pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
97     pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
98
99     pa_asyncmsgq_write_before_poll(q->inq);
100     pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
101
102     pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
103     pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
104 }
105
106 void pa_thread_mq_done(pa_thread_mq *q) {
107     pa_assert(q);
108
109     q->mainloop->io_free(q->read_event);
110     q->mainloop->io_free(q->write_event);
111     q->read_event = q->write_event = NULL;
112
113     pa_asyncmsgq_unref(q->inq);
114     pa_asyncmsgq_unref(q->outq);
115     q->inq = q->outq = NULL;
116
117     q->mainloop = NULL;
118 }
119
120 void pa_thread_mq_install(pa_thread_mq *q) {
121     pa_assert(q);
122
123     pa_assert(!(PA_STATIC_TLS_GET(thread_mq)));
124     PA_STATIC_TLS_SET(thread_mq, q);
125 }
126
127 pa_thread_mq *pa_thread_mq_get(void) {
128     return PA_STATIC_TLS_GET(thread_mq);
129 }