4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
40 #include "asyncmsgq.h"
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0);
44 struct asyncmsgq_item {
50 pa_semaphore *semaphore;
55 pa_mutex *mutex; /* only for the writer side */
57 struct asyncmsgq_item *current;
60 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
63 a = pa_xnew(pa_asyncmsgq, 1);
65 pa_assert_se(a->asyncq = pa_asyncq_new(size));
66 pa_assert_se(a->mutex = pa_mutex_new(0));
72 void pa_asyncmsgq_free(pa_asyncmsgq *a) {
73 struct asyncmsgq_item *i;
76 while ((i = pa_asyncq_pop(a->asyncq, 0))) {
78 pa_assert(!i->semaphore);
81 pa_msgobject_unref(i->object);
83 if (i->memchunk.memblock)
84 pa_memblock_unref(i->object);
86 if (i->userdata_free_cb)
87 i->userdata_free_cb(i->userdata);
89 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
93 pa_asyncq_free(a->asyncq, NULL);
94 pa_mutex_free(a->mutex);
98 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
99 struct asyncmsgq_item *i;
102 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
103 i = pa_xnew(struct asyncmsgq_item, 1);
106 i->object = pa_msgobject_ref(object);
107 i->userdata = (void*) userdata;
108 i->free_cb = free_cb;
110 pa_assert(chunk->memblock);
111 i->memchunk = *chunk;
112 pa_memblock_ref(i->memchunk.memblock);
114 pa_memchunk_reset(&i->memchunk);
117 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
118 pa_mutex_lock(a->mutex);
119 pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
120 pa_mutex_unlock(a->mutex);
123 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk) {
124 struct asyncmsgq_item i;
129 i.userdata = (void*) userdata;
133 pa_assert(chunk->memblock);
134 i->memchunk = *chunk;
136 pa_memchunk_reset(&i->memchunk);
137 pa_assert_se(i.semaphore = pa_semaphore_new(0));
139 /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
140 pa_mutex_lock(a->mutex);
141 pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
142 pa_mutex_unlock(a->mutex);
144 pa_semaphore_wait(i.semaphore);
145 pa_semaphore_free(i.semaphore);
150 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, pa_memchunk *chunk, int wait) {
153 pa_assert(!a->current);
155 if (!(a->current = pa_asyncq_pop(a->asyncq, wait)))
158 *code = a->current->code;
160 *userdata = a->current->userdata;
162 *object = a->current->object;
169 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
171 pa_assert(a->current);
173 if (a->current->semaphore) {
174 a->current->ret = ret;
175 pa_semaphore_post(a->current->semaphore);
178 if (a->current->free_cb)
179 a->current->free_cb(a->current->userdata);
181 if (a->current->object)
182 pa_msgobject_unref(a->current->object);
184 if (a->current->memchunk.memblock)
185 pa_memblock_unref(a->current->memchunk.memblock);
187 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
188 pa_xfree(a->current);
194 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
200 if (pa_asyncmsgq_get(a, NULL, &c, NULL, 1) < 0)
203 pa_asyncmsgq_done(a);
210 int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
213 return pa_asyncq_get_fd(a->asyncq);
216 int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
219 return pa_asyncq_before_poll(a->asyncq);
222 void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
225 pa_asyncq_after_poll(a->asyncq);
228 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
232 return object->msg_process(object, code, userdata, memchunk);