Git init
[framework/multimedia/pulseaudio.git] / src / pulsecore / asyncmsgq.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2006 Lennart Poettering
5
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <unistd.h>
27 #include <errno.h>
28
29 #include <pulse/xmalloc.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/macro.h>
33 #include <pulsecore/log.h>
34 #include <pulsecore/thread.h>
35 #include <pulsecore/semaphore.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/core-util.h>
38 #include <pulsecore/flist.h>
39
40 #include "asyncmsgq.h"
41
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
43 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
44
45 struct asyncmsgq_item {
46     int code;
47     pa_msgobject *object;
48     void *userdata;
49     pa_free_cb_t free_cb;
50     int64_t offset;
51     pa_memchunk memchunk;
52     pa_semaphore *semaphore;
53     int ret;
54 };
55
56 struct pa_asyncmsgq {
57     PA_REFCNT_DECLARE;
58     pa_asyncq *asyncq;
59     pa_mutex *mutex; /* only for the writer side */
60
61     struct asyncmsgq_item *current;
62 };
63
64 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
65     pa_asyncmsgq *a;
66
67     a = pa_xnew(pa_asyncmsgq, 1);
68
69     PA_REFCNT_INIT(a);
70     pa_assert_se(a->asyncq = pa_asyncq_new(size));
71     pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
72     a->current = NULL;
73
74     return a;
75 }
76
77 static void asyncmsgq_free(pa_asyncmsgq *a) {
78     struct asyncmsgq_item *i;
79     pa_assert(a);
80
81     while ((i = pa_asyncq_pop(a->asyncq, FALSE))) {
82
83         pa_assert(!i->semaphore);
84
85         if (i->object)
86             pa_msgobject_unref(i->object);
87
88         if (i->memchunk.memblock)
89             pa_memblock_unref(i->memchunk.memblock);
90
91         if (i->free_cb)
92             i->free_cb(i->userdata);
93
94         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
95             pa_xfree(i);
96     }
97
98     pa_asyncq_free(a->asyncq, NULL);
99     pa_mutex_free(a->mutex);
100     pa_xfree(a);
101 }
102
103 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
104     pa_assert(PA_REFCNT_VALUE(q) > 0);
105
106     PA_REFCNT_INC(q);
107     return q;
108 }
109
110 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
111     pa_assert(PA_REFCNT_VALUE(q) > 0);
112
113     if (PA_REFCNT_DEC(q) <= 0)
114         asyncmsgq_free(q);
115 }
116
117 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
118     struct asyncmsgq_item *i;
119     pa_assert(PA_REFCNT_VALUE(a) > 0);
120
121     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
122         i = pa_xnew(struct asyncmsgq_item, 1);
123
124     i->code = code;
125     i->object = object ? pa_msgobject_ref(object) : NULL;
126     i->userdata = (void*) userdata;
127     i->free_cb = free_cb;
128     i->offset = offset;
129     if (chunk) {
130         pa_assert(chunk->memblock);
131         i->memchunk = *chunk;
132         pa_memblock_ref(i->memchunk.memblock);
133     } else
134         pa_memchunk_reset(&i->memchunk);
135     i->semaphore = NULL;
136
137     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
138     pa_mutex_lock(a->mutex);
139     pa_asyncq_post(a->asyncq, i);
140     pa_mutex_unlock(a->mutex);
141 }
142
143 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
144     struct asyncmsgq_item i;
145     pa_assert(PA_REFCNT_VALUE(a) > 0);
146
147     i.code = code;
148     i.object = object;
149     i.userdata = (void*) userdata;
150     i.free_cb = NULL;
151     i.ret = -1;
152     i.offset = offset;
153     if (chunk) {
154         pa_assert(chunk->memblock);
155         i.memchunk = *chunk;
156     } else
157         pa_memchunk_reset(&i.memchunk);
158
159     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
160         i.semaphore = pa_semaphore_new(0);
161
162     pa_assert_se(i.semaphore);
163
164     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
165     pa_mutex_lock(a->mutex);
166     pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
167     pa_mutex_unlock(a->mutex);
168
169     pa_semaphore_wait(i.semaphore);
170
171     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
172         pa_semaphore_free(i.semaphore);
173
174     return i.ret;
175 }
176
177 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait_op) {
178     pa_assert(PA_REFCNT_VALUE(a) > 0);
179     pa_assert(!a->current);
180
181     if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
182 /*         pa_log("failure"); */
183         return -1;
184     }
185
186 /*     pa_log("success"); */
187
188     if (code)
189         *code = a->current->code;
190     if (userdata)
191         *userdata = a->current->userdata;
192     if (offset)
193         *offset = a->current->offset;
194     if (object) {
195         if ((*object = a->current->object))
196             pa_msgobject_assert_ref(*object);
197     }
198     if (chunk)
199         *chunk = a->current->memchunk;
200
201 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
202 /*                  (void*) a, */
203 /*                  (void*) a->current->object, */
204 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
205 /*                  a->current->code, */
206 /*                  (void*) a->current->userdata, */
207 /*                  (unsigned long) a->current->memchunk.length); */
208
209     return 0;
210 }
211
212 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
213     pa_assert(PA_REFCNT_VALUE(a) > 0);
214     pa_assert(a);
215     pa_assert(a->current);
216
217     if (a->current->semaphore) {
218         a->current->ret = ret;
219         pa_semaphore_post(a->current->semaphore);
220     } else {
221
222         if (a->current->free_cb)
223             a->current->free_cb(a->current->userdata);
224
225         if (a->current->object)
226             pa_msgobject_unref(a->current->object);
227
228         if (a->current->memchunk.memblock)
229             pa_memblock_unref(a->current->memchunk.memblock);
230
231         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
232             pa_xfree(a->current);
233     }
234
235     a->current = NULL;
236 }
237
238 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
239     int c;
240     pa_assert(PA_REFCNT_VALUE(a) > 0);
241
242     pa_asyncmsgq_ref(a);
243
244     do {
245         pa_msgobject *o;
246         void *data;
247         int64_t offset;
248         pa_memchunk chunk;
249         int ret;
250
251         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, TRUE) < 0)
252             return -1;
253
254         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
255         pa_asyncmsgq_done(a, ret);
256
257     } while (c != code);
258
259     pa_asyncmsgq_unref(a);
260
261     return 0;
262 }
263
264 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
265     pa_msgobject *object;
266     int code;
267     void *data;
268     pa_memchunk chunk;
269     int64_t offset;
270     int ret;
271
272     pa_assert(PA_REFCNT_VALUE(a) > 0);
273
274     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
275         return 0;
276
277     pa_asyncmsgq_ref(a);
278     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
279     pa_asyncmsgq_done(a, ret);
280     pa_asyncmsgq_unref(a);
281
282     return 1;
283 }
284
285 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
286     pa_assert(PA_REFCNT_VALUE(a) > 0);
287
288     return pa_asyncq_read_fd(a->asyncq);
289 }
290
291 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
292     pa_assert(PA_REFCNT_VALUE(a) > 0);
293
294     return pa_asyncq_read_before_poll(a->asyncq);
295 }
296
297 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
298     pa_assert(PA_REFCNT_VALUE(a) > 0);
299
300     pa_asyncq_read_after_poll(a->asyncq);
301 }
302
303 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
304     pa_assert(PA_REFCNT_VALUE(a) > 0);
305
306     return pa_asyncq_write_fd(a->asyncq);
307 }
308
309 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
310     pa_assert(PA_REFCNT_VALUE(a) > 0);
311
312     pa_asyncq_write_before_poll(a->asyncq);
313 }
314
315 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
316     pa_assert(PA_REFCNT_VALUE(a) > 0);
317
318     pa_asyncq_write_after_poll(a->asyncq);
319 }
320
321 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
322
323     if (object)
324         return object->process_msg(object, code, userdata, offset, memchunk);
325
326     return 0;
327 }
328
329 void pa_asyncmsgq_flush(pa_asyncmsgq *a, pa_bool_t run) {
330     pa_assert(PA_REFCNT_VALUE(a) > 0);
331
332     for (;;) {
333         pa_msgobject *object;
334         int code;
335         void *data;
336         int64_t offset;
337         pa_memchunk chunk;
338         int ret;
339
340         if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
341             return;
342
343         if (!run) {
344             pa_asyncmsgq_done(a, -1);
345             continue;
346         }
347
348         pa_asyncmsgq_ref(a);
349         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
350         pa_asyncmsgq_done(a, ret);
351         pa_asyncmsgq_unref(a);
352     }
353 }
354
355 pa_bool_t pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
356     pa_assert(PA_REFCNT_VALUE(a) > 0);
357
358     return !!a->current;
359 }