Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / asyncmsgq.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2006 Lennart Poettering
5
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <unistd.h>
27 #include <errno.h>
28
29 #include <pulsecore/atomic.h>
30 #include <pulsecore/log.h>
31 #include <pulsecore/thread.h>
32 #include <pulsecore/semaphore.h>
33 #include <pulsecore/macro.h>
34 #include <pulsecore/core-util.h>
35 #include <pulsecore/flist.h>
36 #include <pulse/xmalloc.h>
37
38 #include "asyncmsgq.h"
39
40 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
41 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
42
43 struct asyncmsgq_item {
44     int code;
45     pa_msgobject *object;
46     void *userdata;
47     pa_free_cb_t free_cb;
48     int64_t offset;
49     pa_memchunk memchunk;
50     pa_semaphore *semaphore;
51     int ret;
52 };
53
54 struct pa_asyncmsgq {
55     PA_REFCNT_DECLARE;
56     pa_asyncq *asyncq;
57     pa_mutex *mutex; /* only for the writer side */
58
59     struct asyncmsgq_item *current;
60 };
61
62 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
63     pa_asyncmsgq *a;
64
65     a = pa_xnew(pa_asyncmsgq, 1);
66
67     PA_REFCNT_INIT(a);
68     pa_assert_se(a->asyncq = pa_asyncq_new(size));
69     pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
70     a->current = NULL;
71
72     return a;
73 }
74
75 static void asyncmsgq_free(pa_asyncmsgq *a) {
76     struct asyncmsgq_item *i;
77     pa_assert(a);
78
79     while ((i = pa_asyncq_pop(a->asyncq, 0))) {
80
81         pa_assert(!i->semaphore);
82
83         if (i->object)
84             pa_msgobject_unref(i->object);
85
86         if (i->memchunk.memblock)
87             pa_memblock_unref(i->memchunk.memblock);
88
89         if (i->free_cb)
90             i->free_cb(i->userdata);
91
92         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
93             pa_xfree(i);
94     }
95
96     pa_asyncq_free(a->asyncq, NULL);
97     pa_mutex_free(a->mutex);
98     pa_xfree(a);
99 }
100
101 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
102     pa_assert(PA_REFCNT_VALUE(q) > 0);
103
104     PA_REFCNT_INC(q);
105     return q;
106 }
107
108 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
109     pa_assert(PA_REFCNT_VALUE(q) > 0);
110
111     if (PA_REFCNT_DEC(q) <= 0)
112         asyncmsgq_free(q);
113 }
114
115 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
116     struct asyncmsgq_item *i;
117     pa_assert(PA_REFCNT_VALUE(a) > 0);
118
119     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
120         i = pa_xnew(struct asyncmsgq_item, 1);
121
122     i->code = code;
123     i->object = object ? pa_msgobject_ref(object) : NULL;
124     i->userdata = (void*) userdata;
125     i->free_cb = free_cb;
126     i->offset = offset;
127     if (chunk) {
128         pa_assert(chunk->memblock);
129         i->memchunk = *chunk;
130         pa_memblock_ref(i->memchunk.memblock);
131     } else
132         pa_memchunk_reset(&i->memchunk);
133     i->semaphore = NULL;
134
135     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
136     pa_mutex_lock(a->mutex);
137     pa_asyncq_post(a->asyncq, i);
138     pa_mutex_unlock(a->mutex);
139 }
140
141 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
142     struct asyncmsgq_item i;
143     pa_assert(PA_REFCNT_VALUE(a) > 0);
144
145     i.code = code;
146     i.object = object;
147     i.userdata = (void*) userdata;
148     i.free_cb = NULL;
149     i.ret = -1;
150     i.offset = offset;
151     if (chunk) {
152         pa_assert(chunk->memblock);
153         i.memchunk = *chunk;
154     } else
155         pa_memchunk_reset(&i.memchunk);
156
157     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
158         i.semaphore = pa_semaphore_new(0);
159
160     pa_assert_se(i.semaphore);
161
162     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
163     pa_mutex_lock(a->mutex);
164     pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
165     pa_mutex_unlock(a->mutex);
166
167     pa_semaphore_wait(i.semaphore);
168
169     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
170         pa_semaphore_free(i.semaphore);
171
172     return i.ret;
173 }
174
175 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
176     pa_assert(PA_REFCNT_VALUE(a) > 0);
177     pa_assert(!a->current);
178
179     if (!(a->current = pa_asyncq_pop(a->asyncq, wait))) {
180 /*         pa_log("failure"); */
181         return -1;
182     }
183
184 /*     pa_log("success"); */
185
186     if (code)
187         *code = a->current->code;
188     if (userdata)
189         *userdata = a->current->userdata;
190     if (offset)
191         *offset = a->current->offset;
192     if (object) {
193         if ((*object = a->current->object))
194             pa_msgobject_assert_ref(*object);
195     }
196     if (chunk)
197         *chunk = a->current->memchunk;
198
199 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
200 /*                  (void*) a, */
201 /*                  (void*) a->current->object, */
202 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
203 /*                  a->current->code, */
204 /*                  (void*) a->current->userdata, */
205 /*                  (unsigned long) a->current->memchunk.length); */
206
207     return 0;
208 }
209
210 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
211     pa_assert(PA_REFCNT_VALUE(a) > 0);
212     pa_assert(a);
213     pa_assert(a->current);
214
215     if (a->current->semaphore) {
216         a->current->ret = ret;
217         pa_semaphore_post(a->current->semaphore);
218     } else {
219
220         if (a->current->free_cb)
221             a->current->free_cb(a->current->userdata);
222
223         if (a->current->object)
224             pa_msgobject_unref(a->current->object);
225
226         if (a->current->memchunk.memblock)
227             pa_memblock_unref(a->current->memchunk.memblock);
228
229         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
230             pa_xfree(a->current);
231     }
232
233     a->current = NULL;
234 }
235
236 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
237     int c;
238     pa_assert(PA_REFCNT_VALUE(a) > 0);
239
240     pa_asyncmsgq_ref(a);
241
242     do {
243         pa_msgobject *o;
244         void *data;
245         int64_t offset;
246         pa_memchunk chunk;
247         int ret;
248
249         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, 1) < 0)
250             return -1;
251
252         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
253         pa_asyncmsgq_done(a, ret);
254
255     } while (c != code);
256
257     pa_asyncmsgq_unref(a);
258
259     return 0;
260 }
261
262 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
263     pa_msgobject *object;
264     int code;
265     void *data;
266     pa_memchunk chunk;
267     int64_t offset;
268     int ret;
269
270     pa_assert(PA_REFCNT_VALUE(a) > 0);
271
272     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, 0) < 0)
273         return 0;
274
275     pa_asyncmsgq_ref(a);
276     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
277     pa_asyncmsgq_done(a, ret);
278     pa_asyncmsgq_unref(a);
279
280     return 1;
281 }
282
283 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
284     pa_assert(PA_REFCNT_VALUE(a) > 0);
285
286     return pa_asyncq_read_fd(a->asyncq);
287 }
288
289 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
290     pa_assert(PA_REFCNT_VALUE(a) > 0);
291
292     return pa_asyncq_read_before_poll(a->asyncq);
293 }
294
295 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
296     pa_assert(PA_REFCNT_VALUE(a) > 0);
297
298     pa_asyncq_read_after_poll(a->asyncq);
299 }
300
301 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
302     pa_assert(PA_REFCNT_VALUE(a) > 0);
303
304     return pa_asyncq_write_fd(a->asyncq);
305 }
306
307 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
308     pa_assert(PA_REFCNT_VALUE(a) > 0);
309
310     pa_asyncq_write_before_poll(a->asyncq);
311 }
312
313 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
314     pa_assert(PA_REFCNT_VALUE(a) > 0);
315
316     pa_asyncq_write_after_poll(a->asyncq);
317 }
318
319 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
320
321     if (object)
322         return object->process_msg(object, code, userdata, offset, memchunk);
323
324     return 0;
325 }