merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulsecore / asyncmsgq.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2006 Lennart Poettering
7
8   PulseAudio is free software; you can redistribute it and/or modify
9   it under the terms of the GNU Lesser General Public License as
10   published by the Free Software Foundation; either version 2.1 of the
11   License, or (at your option) any later version.
12
13   PulseAudio is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public
19   License along with PulseAudio; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21   USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
39
40 #include "asyncmsgq.h"
41
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
43 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
44
45 struct asyncmsgq_item {
46     int code;
47     pa_msgobject *object;
48     void *userdata;
49     pa_free_cb_t free_cb;
50     int64_t offset;
51     pa_memchunk memchunk;
52     pa_semaphore *semaphore;
53     int ret;
54 };
55
56 struct pa_asyncmsgq {
57     PA_REFCNT_DECLARE;
58     pa_asyncq *asyncq;
59     pa_mutex *mutex; /* only for the writer side */
60
61     struct asyncmsgq_item *current;
62 };
63
64 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
65     pa_asyncmsgq *a;
66
67     a = pa_xnew(pa_asyncmsgq, 1);
68
69     PA_REFCNT_INIT(a);
70     pa_assert_se(a->asyncq = pa_asyncq_new(size));
71     pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
72     a->current = NULL;
73
74     return a;
75 }
76
77 static void asyncmsgq_free(pa_asyncmsgq *a) {
78     struct asyncmsgq_item *i;
79     pa_assert(a);
80
81     while ((i = pa_asyncq_pop(a->asyncq, 0))) {
82
83         pa_assert(!i->semaphore);
84
85         if (i->object)
86             pa_msgobject_unref(i->object);
87
88         if (i->memchunk.memblock)
89             pa_memblock_unref(i->memchunk.memblock);
90
91         if (i->free_cb)
92             i->free_cb(i->userdata);
93
94         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
95             pa_xfree(i);
96     }
97
98     pa_asyncq_free(a->asyncq, NULL);
99     pa_mutex_free(a->mutex);
100     pa_xfree(a);
101 }
102
103 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
104     pa_assert(PA_REFCNT_VALUE(q) > 0);
105
106     PA_REFCNT_INC(q);
107     return q;
108 }
109
110 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
111     pa_assert(PA_REFCNT_VALUE(q) > 0);
112
113     if (PA_REFCNT_DEC(q) <= 0)
114         asyncmsgq_free(q);
115 }
116
117 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
118     struct asyncmsgq_item *i;
119     pa_assert(PA_REFCNT_VALUE(a) > 0);
120
121     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
122         i = pa_xnew(struct asyncmsgq_item, 1);
123
124     i->code = code;
125     i->object = object ? pa_msgobject_ref(object) : NULL;
126     i->userdata = (void*) userdata;
127     i->free_cb = free_cb;
128     i->offset = offset;
129     if (chunk) {
130         pa_assert(chunk->memblock);
131         i->memchunk = *chunk;
132         pa_memblock_ref(i->memchunk.memblock);
133     } else
134         pa_memchunk_reset(&i->memchunk);
135     i->semaphore = NULL;
136
137     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
138     pa_mutex_lock(a->mutex);
139     pa_asyncq_post(a->asyncq, i);
140     pa_mutex_unlock(a->mutex);
141 }
142
143 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
144     struct asyncmsgq_item i;
145     pa_assert(PA_REFCNT_VALUE(a) > 0);
146
147     i.code = code;
148     i.object = object;
149     i.userdata = (void*) userdata;
150     i.free_cb = NULL;
151     i.ret = -1;
152     i.offset = offset;
153     if (chunk) {
154         pa_assert(chunk->memblock);
155         i.memchunk = *chunk;
156     } else
157         pa_memchunk_reset(&i.memchunk);
158
159     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
160         i.semaphore = pa_semaphore_new(0);
161
162     pa_assert_se(i.semaphore);
163
164     /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
165     pa_mutex_lock(a->mutex);
166     pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
167     pa_mutex_unlock(a->mutex);
168
169     pa_semaphore_wait(i.semaphore);
170
171     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
172         pa_semaphore_free(i.semaphore);
173
174     return i.ret;
175 }
176
177 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
178     pa_assert(PA_REFCNT_VALUE(a) > 0);
179     pa_assert(!a->current);
180
181     if (!(a->current = pa_asyncq_pop(a->asyncq, wait))) {
182 /*         pa_log("failure"); */
183         return -1;
184     }
185
186 /*     pa_log("success"); */
187
188     if (code)
189         *code = a->current->code;
190     if (userdata)
191         *userdata = a->current->userdata;
192     if (offset)
193         *offset = a->current->offset;
194     if (object) {
195         if ((*object = a->current->object))
196             pa_msgobject_assert_ref(*object);
197     }
198     if (chunk)
199         *chunk = a->current->memchunk;
200
201 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", (void*) a, (void*) a->current->object, a->current->object ? a->current->object->parent.type_name : NULL, a->current->code, (void*) a->current->userdata, (unsigned long) a->current->memchunk.length); */
202
203     return 0;
204 }
205
206 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
207     pa_assert(PA_REFCNT_VALUE(a) > 0);
208     pa_assert(a);
209     pa_assert(a->current);
210
211     if (a->current->semaphore) {
212         a->current->ret = ret;
213         pa_semaphore_post(a->current->semaphore);
214     } else {
215
216         if (a->current->free_cb)
217             a->current->free_cb(a->current->userdata);
218
219         if (a->current->object)
220             pa_msgobject_unref(a->current->object);
221
222         if (a->current->memchunk.memblock)
223             pa_memblock_unref(a->current->memchunk.memblock);
224
225         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
226             pa_xfree(a->current);
227     }
228
229     a->current = NULL;
230 }
231
232 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
233     int c;
234     pa_assert(PA_REFCNT_VALUE(a) > 0);
235
236     pa_asyncmsgq_ref(a);
237
238     do {
239         pa_msgobject *o;
240         void *data;
241         int64_t offset;
242         pa_memchunk chunk;
243         int ret;
244
245         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
246             return -1;
247
248         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
249         pa_asyncmsgq_done(a, ret);
250
251     } while (c != code);
252
253     pa_asyncmsgq_unref(a);
254
255     return 0;
256 }
257
258 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
259     pa_msgobject *object;
260     int code;
261     void *data;
262     pa_memchunk chunk;
263     int64_t offset;
264     int ret;
265
266     pa_assert(PA_REFCNT_VALUE(a) > 0);
267
268     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
269         return 0;
270
271     pa_asyncmsgq_ref(a);
272     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
273     pa_asyncmsgq_done(a, ret);
274     pa_asyncmsgq_unref(a);
275
276     return 1;
277 }
278
279 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
280     pa_assert(PA_REFCNT_VALUE(a) > 0);
281
282     return pa_asyncq_read_fd(a->asyncq);
283 }
284
285 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
286     pa_assert(PA_REFCNT_VALUE(a) > 0);
287
288     return pa_asyncq_read_before_poll(a->asyncq);
289 }
290
291 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
292     pa_assert(PA_REFCNT_VALUE(a) > 0);
293
294     pa_asyncq_read_after_poll(a->asyncq);
295 }
296
297 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
298     pa_assert(PA_REFCNT_VALUE(a) > 0);
299
300     return pa_asyncq_write_fd(a->asyncq);
301 }
302
303 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
304     pa_assert(PA_REFCNT_VALUE(a) > 0);
305
306     pa_asyncq_write_before_poll(a->asyncq);
307 }
308
309 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
310     pa_assert(PA_REFCNT_VALUE(a) > 0);
311
312     pa_asyncq_write_after_poll(a->asyncq);
313 }
314
315 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
316
317     if (object)
318         return object->process_msg(object, code, userdata, offset, memchunk);
319
320     return 0;
321 }