Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / asyncmsgq.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2006 Lennart Poettering
7
8   PulseAudio is free software; you can redistribute it and/or modify
9   it under the terms of the GNU Lesser General Public License as
10   published by the Free Software Foundation; either version 2.1 of the
11   License, or (at your option) any later version.
12
13   PulseAudio is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public
19   License along with PulseAudio; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21   USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/semaphore.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/flist.h>
38 #include <pulse/xmalloc.h>
39
40 #include "asyncmsgq.h"
41
42 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0);
43
44 struct asyncmsgq_item {
45     int code;
46     pa_msgobject *object;
47     void *userdata;
48     pa_free_cb_t free_cb;
49     pa_memchunk memchunk;
50     pa_semaphore *semaphore;
51 };
52
53 struct pa_asyncmsgq {
54     pa_asyncq *asyncq;
55     pa_mutex *mutex; /* only for the writer side */
56
57     struct asyncmsgq_item *current;
58 };
59
60 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
61     pa_asyncmsgq *a;
62
63     a = pa_xnew(pa_asyncmsgq, 1);
64
65     pa_assert_se(a->asyncq = pa_asyncq_new(size));
66     pa_assert_se(a->mutex = pa_mutex_new(0));
67     a->current = NULL;
68     
69     return a;
70 }
71
72 void pa_asyncmsgq_free(pa_asyncmsgq *a) {
73     struct asyncmsgq_item *i;
74     pa_assert(a);
75
76     while ((i = pa_asyncq_pop(a->asyncq, 0))) {
77
78         pa_assert(!i->semaphore);
79
80         if (i->object)
81             pa_msgobject_unref(i->object);
82
83         if (i->memchunk.memblock)
84             pa_memblock_unref(i->object);
85         
86         if (i->userdata_free_cb)
87             i->userdata_free_cb(i->userdata);
88         
89         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
90             pa_xfree(i);
91     }
92
93     pa_asyncq_free(a->asyncq, NULL);
94     pa_mutex_free(a->mutex);
95     pa_xfree(a);
96 }
97
98 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
99     struct asyncmsgq_item *i;
100     pa_assert(a);
101
102     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
103         i = pa_xnew(struct asyncmsgq_item, 1);
104
105     i->code = code;
106     i->object = pa_msgobject_ref(object);
107     i->userdata = (void*) userdata;
108     i->free_cb = free_cb;
109     if (chunk) {
110         pa_assert(chunk->memblock);
111         i->memchunk = *chunk;
112         pa_memblock_ref(i->memchunk.memblock);
113     } else
114         pa_memchunk_reset(&i->memchunk);
115     i->semaphore = NULL;
116
117     /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
118     pa_mutex_lock(a->mutex);
119     pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
120     pa_mutex_unlock(a->mutex);
121 }
122
123 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, const pa_memchunk *chunk) {
124     struct asyncmsgq_item i;
125     pa_assert(a);
126
127     i.code = code;
128     i.object = object;
129     i.userdata = (void*) userdata;
130     i.free_cb = NULL;
131     i.ret = -1;
132     if (chunk) {
133         pa_assert(chunk->memblock);
134         i->memchunk = *chunk;
135     } else
136         pa_memchunk_reset(&i->memchunk);
137     pa_assert_se(i.semaphore = pa_semaphore_new(0));
138
139     /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
140     pa_mutex_lock(a->mutex);
141     pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
142     pa_mutex_unlock(a->mutex);
143
144     pa_semaphore_wait(i.semaphore);
145     pa_semaphore_free(i.semaphore);
146
147     return i.ret;
148 }
149
150 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, pa_memchunk *chunk, int wait) {
151     pa_assert(a);
152     pa_assert(code);
153     pa_assert(!a->current);
154
155     if (!(a->current = pa_asyncq_pop(a->asyncq, wait)))
156         return -1;
157
158     *code = a->current->code;
159     if (userdata)
160         *userdata = a->current->userdata;
161     if (object)
162         *object = a->current->object;
163     if (chunk)
164         *chunk = a->chunk;
165     
166     return 0;
167 }
168
169 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
170     pa_assert(a);
171     pa_assert(a->current);
172
173     if (a->current->semaphore) {
174         a->current->ret = ret;
175         pa_semaphore_post(a->current->semaphore);
176     } else {
177
178         if (a->current->free_cb)
179             a->current->free_cb(a->current->userdata);
180
181         if (a->current->object)
182             pa_msgobject_unref(a->current->object);
183
184         if (a->current->memchunk.memblock)
185             pa_memblock_unref(a->current->memchunk.memblock);
186         
187         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
188             pa_xfree(a->current);
189     }
190
191     a->current = NULL;
192 }
193
194 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
195     int c;
196     pa_assert(a);
197     
198     do {
199         
200         if (pa_asyncmsgq_get(a, NULL, &c, NULL, 1) < 0)
201             return -1;
202
203         pa_asyncmsgq_done(a);
204         
205     } while (c != code);
206
207     return 0;
208 }
209
210 int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
211     pa_assert(a);
212
213     return pa_asyncq_get_fd(a->asyncq);
214 }
215
216 int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
217     pa_assert(a);
218
219     return pa_asyncq_before_poll(a->asyncq);
220 }
221
222 void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
223     pa_assert(a);
224
225     pa_asyncq_after_poll(a->asyncq);
226 }
227
228 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, pa_memchunk *memchunk) {
229     pa_assert(q);
230
231     if (object)
232         return object->msg_process(object, code, userdata, memchunk);
233
234     return 0;
235 }