2 This file is part of PulseAudio.
4 Copyright 2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
27 #include <pulse/xmalloc.h>
29 #include <pulsecore/macro.h>
30 #include <pulsecore/log.h>
31 #include <pulsecore/semaphore.h>
32 #include <pulsecore/macro.h>
33 #include <pulsecore/mutex.h>
34 #include <pulsecore/flist.h>
36 #include "asyncmsgq.h"
38 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
39 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
41 struct asyncmsgq_item {
48 pa_semaphore *semaphore;
55 pa_mutex *mutex; /* only for the writer side */
57 struct asyncmsgq_item *current;
60 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
64 asyncq = pa_asyncq_new(size);
68 a = pa_xnew(pa_asyncmsgq, 1);
72 pa_assert_se(a->mutex = pa_mutex_new(false, true));
78 static void asyncmsgq_free(pa_asyncmsgq *a) {
79 struct asyncmsgq_item *i;
82 while ((i = pa_asyncq_pop(a->asyncq, false))) {
84 pa_assert(!i->semaphore);
87 pa_msgobject_unref(i->object);
89 if (i->memchunk.memblock)
90 pa_memblock_unref(i->memchunk.memblock);
93 i->free_cb(i->userdata);
95 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
99 pa_asyncq_free(a->asyncq, NULL);
100 pa_mutex_free(a->mutex);
104 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
105 pa_assert(PA_REFCNT_VALUE(q) > 0);
111 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
112 pa_assert(PA_REFCNT_VALUE(q) > 0);
114 if (PA_REFCNT_DEC(q) <= 0)
118 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
119 struct asyncmsgq_item *i;
120 pa_assert(PA_REFCNT_VALUE(a) > 0);
122 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
123 i = pa_xnew(struct asyncmsgq_item, 1);
126 i->object = object ? pa_msgobject_ref(object) : NULL;
127 i->userdata = (void*) userdata;
128 i->free_cb = free_cb;
131 pa_assert(chunk->memblock);
132 i->memchunk = *chunk;
133 pa_memblock_ref(i->memchunk.memblock);
135 pa_memchunk_reset(&i->memchunk);
138 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
139 pa_mutex_lock(a->mutex);
140 pa_asyncq_post(a->asyncq, i);
141 pa_mutex_unlock(a->mutex);
144 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
145 struct asyncmsgq_item i;
146 pa_assert(PA_REFCNT_VALUE(a) > 0);
150 i.userdata = (void*) userdata;
155 pa_assert(chunk->memblock);
158 pa_memchunk_reset(&i.memchunk);
160 if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
161 i.semaphore = pa_semaphore_new(0);
163 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
164 pa_mutex_lock(a->mutex);
165 pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
166 pa_mutex_unlock(a->mutex);
168 pa_semaphore_wait(i.semaphore);
170 if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
171 pa_semaphore_free(i.semaphore);
176 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
177 pa_assert(PA_REFCNT_VALUE(a) > 0);
178 pa_assert(!a->current);
180 if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
181 /* pa_log("failure"); */
185 /* pa_log("success"); */
188 *code = a->current->code;
190 *userdata = a->current->userdata;
192 *offset = a->current->offset;
194 if ((*object = a->current->object))
195 pa_msgobject_assert_ref(*object);
198 *chunk = a->current->memchunk;
200 /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
202 /* (void*) a->current->object, */
203 /* a->current->object ? a->current->object->parent.type_name : NULL, */
204 /* a->current->code, */
205 /* (void*) a->current->userdata, */
206 /* (unsigned long) a->current->memchunk.length); */
211 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
212 pa_assert(PA_REFCNT_VALUE(a) > 0);
214 pa_assert(a->current);
216 if (a->current->semaphore) {
217 a->current->ret = ret;
218 pa_semaphore_post(a->current->semaphore);
221 if (a->current->free_cb)
222 a->current->free_cb(a->current->userdata);
224 if (a->current->object)
225 pa_msgobject_unref(a->current->object);
227 if (a->current->memchunk.memblock)
228 pa_memblock_unref(a->current->memchunk.memblock);
230 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
231 pa_xfree(a->current);
237 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
239 pa_assert(PA_REFCNT_VALUE(a) > 0);
250 if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
253 ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
254 pa_asyncmsgq_done(a, ret);
258 pa_asyncmsgq_unref(a);
263 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
264 pa_msgobject *object;
271 pa_assert(PA_REFCNT_VALUE(a) > 0);
273 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
277 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
278 pa_asyncmsgq_done(a, ret);
279 pa_asyncmsgq_unref(a);
284 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
285 pa_assert(PA_REFCNT_VALUE(a) > 0);
287 return pa_asyncq_read_fd(a->asyncq);
290 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
291 pa_assert(PA_REFCNT_VALUE(a) > 0);
293 return pa_asyncq_read_before_poll(a->asyncq);
296 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
297 pa_assert(PA_REFCNT_VALUE(a) > 0);
299 pa_asyncq_read_after_poll(a->asyncq);
302 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
303 pa_assert(PA_REFCNT_VALUE(a) > 0);
305 return pa_asyncq_write_fd(a->asyncq);
308 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
309 pa_assert(PA_REFCNT_VALUE(a) > 0);
311 pa_asyncq_write_before_poll(a->asyncq);
314 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
315 pa_assert(PA_REFCNT_VALUE(a) > 0);
317 pa_asyncq_write_after_poll(a->asyncq);
320 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
323 return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
328 void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
329 pa_assert(PA_REFCNT_VALUE(a) > 0);
332 pa_msgobject *object;
339 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
343 pa_asyncmsgq_done(a, -1);
348 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
349 pa_asyncmsgq_done(a, ret);
350 pa_asyncmsgq_unref(a);
354 bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
355 pa_assert(PA_REFCNT_VALUE(a) > 0);