add suspend_within_thread() callbacks to pa_sink_input/pa_source_output
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/source-output.h>
37 #include <pulsecore/namereg.h>
38 #include <pulsecore/core-subscribe.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/sample-util.h>
41
42 #include "source.h"
43
44 #define ABSOLUTE_MIN_LATENCY (500)
45 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
46
47 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
48
49 static void source_free(pa_object *o);
50
51 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
52     pa_assert(data);
53
54     memset(data, 0, sizeof(*data));
55     data->proplist = pa_proplist_new();
56
57     return data;
58 }
59
60 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
61     pa_assert(data);
62
63     pa_xfree(data->name);
64     data->name = pa_xstrdup(name);
65 }
66
67 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
68     pa_assert(data);
69
70     if ((data->sample_spec_is_set = !!spec))
71         data->sample_spec = *spec;
72 }
73
74 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
75     pa_assert(data);
76
77     if ((data->channel_map_is_set = !!map))
78         data->channel_map = *map;
79 }
80
81 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
82     pa_assert(data);
83
84     if ((data->volume_is_set = !!volume))
85         data->volume = *volume;
86 }
87
88 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
89     pa_assert(data);
90
91     data->muted_is_set = TRUE;
92     data->muted = !!mute;
93 }
94
95 void pa_source_new_data_done(pa_source_new_data *data) {
96     pa_assert(data);
97
98     pa_xfree(data->name);
99     pa_proplist_free(data->proplist);
100 }
101
102 /* Called from main context */
103 static void reset_callbacks(pa_source *s) {
104     pa_assert(s);
105
106     s->set_state = NULL;
107     s->get_volume = NULL;
108     s->set_volume = NULL;
109     s->get_mute = NULL;
110     s->set_mute = NULL;
111     s->update_requested_latency = NULL;
112 }
113
114 /* Called from main context */
115 pa_source* pa_source_new(
116         pa_core *core,
117         pa_source_new_data *data,
118         pa_source_flags_t flags) {
119
120     pa_source *s;
121     const char *name;
122     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
123     char *pt;
124
125     pa_assert(core);
126     pa_assert(data);
127     pa_assert(data->name);
128
129     s = pa_msgobject_new(pa_source);
130
131     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
132         pa_xfree(s);
133         return NULL;
134     }
135
136     pa_source_new_data_set_name(data, name);
137
138     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
139         pa_xfree(s);
140         pa_namereg_unregister(core, name);
141         return NULL;
142     }
143
144     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
145     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
146
147     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
148
149     if (!data->channel_map_is_set)
150         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
151
152     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
153     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
154
155     if (!data->volume_is_set)
156         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
157
158     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
159     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
160
161     if (!data->muted_is_set)
162         data->muted = FALSE;
163
164     if (data->card)
165         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
166
167     pa_device_init_description(data->proplist);
168     pa_device_init_icon(data->proplist, FALSE);
169
170     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
171         pa_xfree(s);
172         pa_namereg_unregister(core, name);
173         return NULL;
174     }
175
176     s->parent.parent.free = source_free;
177     s->parent.process_msg = pa_source_process_msg;
178
179     s->core = core;
180     s->state = PA_SOURCE_INIT;
181     s->flags = flags;
182     s->name = pa_xstrdup(name);
183     s->proplist = pa_proplist_copy(data->proplist);
184     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
185     s->module = data->module;
186     s->card = data->card;
187
188     s->sample_spec = data->sample_spec;
189     s->channel_map = data->channel_map;
190
191     s->outputs = pa_idxset_new(NULL, NULL);
192     s->n_corked = 0;
193     s->monitor_of = NULL;
194
195     s->virtual_volume = data->volume;
196     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
197     s->base_volume = PA_VOLUME_NORM;
198     s->n_volume_steps = PA_VOLUME_NORM+1;
199     s->muted = data->muted;
200     s->refresh_volume = s->refresh_muted = FALSE;
201
202     reset_callbacks(s);
203     s->userdata = NULL;
204
205     s->asyncmsgq = NULL;
206     s->rtpoll = NULL;
207
208     pa_silence_memchunk_get(
209             &core->silence_cache,
210             core->mempool,
211             &s->silence,
212             &s->sample_spec,
213             0);
214
215     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
216     s->thread_info.soft_volume = s->soft_volume;
217     s->thread_info.soft_muted = s->muted;
218     s->thread_info.state = s->state;
219     s->thread_info.max_rewind = 0;
220     s->thread_info.requested_latency_valid = FALSE;
221     s->thread_info.requested_latency = 0;
222     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
223     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
224
225     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
226
227     if (s->card)
228         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
229
230     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
231     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
232                 s->index,
233                 s->name,
234                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
235                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
236                 pt);
237     pa_xfree(pt);
238
239     return s;
240 }
241
242 /* Called from main context */
243 static int source_set_state(pa_source *s, pa_source_state_t state) {
244     int ret;
245     pa_bool_t suspend_change;
246     pa_source_state_t original_state;
247
248     pa_assert(s);
249
250     if (s->state == state)
251         return 0;
252
253     original_state = s->state;
254
255     suspend_change =
256         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
257         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
258
259     if (s->set_state)
260         if ((ret = s->set_state(s, state)) < 0)
261             return ret;
262
263     if (s->asyncmsgq)
264         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
265
266             if (s->set_state)
267                 s->set_state(s, original_state);
268
269             return ret;
270         }
271
272     s->state = state;
273
274     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
275         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
276         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
277     }
278
279     if (suspend_change) {
280         pa_source_output *o;
281         uint32_t idx;
282
283         /* We're suspending or resuming, tell everyone about it */
284
285         for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
286             if (s->state == PA_SOURCE_SUSPENDED &&
287                 (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
288                 pa_source_output_kill(o);
289             else if (o->suspend)
290                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
291     }
292
293
294     return 0;
295 }
296
297 /* Called from main context */
298 void pa_source_put(pa_source *s) {
299     pa_source_assert_ref(s);
300
301     pa_assert(s->state == PA_SOURCE_INIT);
302
303     /* The following fields must be initialized properly when calling _put() */
304     pa_assert(s->asyncmsgq);
305     pa_assert(s->rtpoll);
306     pa_assert(!s->thread_info.min_latency || !s->thread_info.max_latency ||
307               s->thread_info.min_latency <= s->thread_info.max_latency);
308
309     if (!(s->flags & PA_SOURCE_HW_VOLUME_CTRL)) {
310         s->flags |= PA_SOURCE_DECIBEL_VOLUME;
311
312         s->thread_info.soft_volume = s->soft_volume;
313         s->thread_info.soft_muted = s->muted;
314     }
315
316     if (s->flags & PA_SOURCE_DECIBEL_VOLUME)
317         s->n_volume_steps = PA_VOLUME_NORM+1;
318
319     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
320
321     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
322     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
323 }
324
325 /* Called from main context */
326 void pa_source_unlink(pa_source *s) {
327     pa_bool_t linked;
328     pa_source_output *o, *j = NULL;
329
330     pa_assert(s);
331
332     /* See pa_sink_unlink() for a couple of comments how this function
333      * works. */
334
335     linked = PA_SOURCE_IS_LINKED(s->state);
336
337     if (linked)
338         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
339
340     if (s->state != PA_SOURCE_UNLINKED)
341         pa_namereg_unregister(s->core, s->name);
342     pa_idxset_remove_by_data(s->core->sources, s, NULL);
343
344     if (s->card)
345         pa_idxset_remove_by_data(s->card->sources, s, NULL);
346
347     while ((o = pa_idxset_first(s->outputs, NULL))) {
348         pa_assert(o != j);
349         pa_source_output_kill(o);
350         j = o;
351     }
352
353     if (linked)
354         source_set_state(s, PA_SOURCE_UNLINKED);
355     else
356         s->state = PA_SOURCE_UNLINKED;
357
358     reset_callbacks(s);
359
360     if (linked) {
361         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
362         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
363     }
364 }
365
366 /* Called from main context */
367 static void source_free(pa_object *o) {
368     pa_source_output *so;
369     pa_source *s = PA_SOURCE(o);
370
371     pa_assert(s);
372     pa_assert(pa_source_refcnt(s) == 0);
373
374     if (PA_SOURCE_IS_LINKED(s->state))
375         pa_source_unlink(s);
376
377     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
378
379     pa_idxset_free(s->outputs, NULL, NULL);
380
381     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
382         pa_source_output_unref(so);
383
384     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
385
386     if (s->silence.memblock)
387         pa_memblock_unref(s->silence.memblock);
388
389     pa_xfree(s->name);
390     pa_xfree(s->driver);
391
392     if (s->proplist)
393         pa_proplist_free(s->proplist);
394
395     pa_xfree(s);
396 }
397
398 /* Called from main context */
399 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
400     pa_source_assert_ref(s);
401
402     s->asyncmsgq = q;
403 }
404
405 /* Called from main context */
406 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
407     pa_source_assert_ref(s);
408
409     s->rtpoll = p;
410 }
411
412 /* Called from main context */
413 int pa_source_update_status(pa_source*s) {
414     pa_source_assert_ref(s);
415     pa_assert(PA_SOURCE_IS_LINKED(s->state));
416
417     if (s->state == PA_SOURCE_SUSPENDED)
418         return 0;
419
420     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
421 }
422
423 /* Called from main context */
424 int pa_source_suspend(pa_source *s, pa_bool_t suspend) {
425     pa_source_assert_ref(s);
426     pa_assert(PA_SOURCE_IS_LINKED(s->state));
427
428     if (s->monitor_of)
429         return -PA_ERR_NOTSUPPORTED;
430
431     if (suspend)
432         return source_set_state(s, PA_SOURCE_SUSPENDED);
433     else
434         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
435 }
436
437 /* Called from main context */
438 int pa_source_sync_suspend(pa_source *s) {
439     pa_sink_state_t state;
440
441     pa_source_assert_ref(s);
442     pa_assert(PA_SOURCE_IS_LINKED(s->state));
443     pa_assert(s->monitor_of);
444
445     state = pa_sink_get_state(s->monitor_of);
446
447     if (state == PA_SINK_SUSPENDED)
448         return source_set_state(s, PA_SOURCE_SUSPENDED);
449
450     pa_assert(PA_SINK_IS_OPENED(state));
451
452     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
453 }
454
455 /* Called from main context */
456 pa_queue *pa_source_move_all_start(pa_source *s) {
457     pa_queue *q;
458     pa_source_output *o, *n;
459     uint32_t idx;
460
461     pa_source_assert_ref(s);
462     pa_assert(PA_SOURCE_IS_LINKED(s->state));
463
464     q = pa_queue_new();
465
466     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
467         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
468
469         if (pa_source_output_start_move(o) >= 0)
470             pa_queue_push(q, pa_source_output_ref(o));
471     }
472
473     return q;
474 }
475
476 /* Called from main context */
477 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
478     pa_source_output *o;
479
480     pa_source_assert_ref(s);
481     pa_assert(PA_SOURCE_IS_LINKED(s->state));
482     pa_assert(q);
483
484     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
485         if (pa_source_output_finish_move(o, s, save) < 0)
486             pa_source_output_kill(o);
487
488         pa_source_output_unref(o);
489     }
490
491     pa_queue_free(q, NULL, NULL);
492 }
493
494 /* Called from main context */
495 void pa_source_move_all_fail(pa_queue *q) {
496     pa_source_output *o;
497     pa_assert(q);
498
499     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
500         if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_OK) {
501             pa_source_output_kill(o);
502             pa_source_output_unref(o);
503         }
504     }
505
506     pa_queue_free(q, NULL, NULL);
507 }
508
509 /* Called from IO thread context */
510 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
511     pa_source_output *o;
512     void *state = NULL;
513
514     pa_source_assert_ref(s);
515     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
516
517     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
518         return;
519
520     if (nbytes <= 0)
521         return;
522
523     pa_log_debug("Processing rewind...");
524
525     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
526         pa_source_output_assert_ref(o);
527         pa_source_output_process_rewind(o, nbytes);
528     }
529 }
530
531 /* Called from IO thread context */
532 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
533     pa_source_output *o;
534     void *state = NULL;
535
536     pa_source_assert_ref(s);
537     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
538     pa_assert(chunk);
539
540     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
541         return;
542
543     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
544         pa_memchunk vchunk = *chunk;
545
546         pa_memblock_ref(vchunk.memblock);
547         pa_memchunk_make_writable(&vchunk, 0);
548
549         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
550             pa_silence_memchunk(&vchunk, &s->sample_spec);
551         else
552             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
553
554         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
555             pa_source_output_assert_ref(o);
556
557             if (!o->thread_info.direct_on_input)
558                 pa_source_output_push(o, &vchunk);
559         }
560
561         pa_memblock_unref(vchunk.memblock);
562     } else {
563
564         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
565             pa_source_output_assert_ref(o);
566
567             if (!o->thread_info.direct_on_input)
568                 pa_source_output_push(o, chunk);
569         }
570     }
571 }
572
573 /* Called from IO thread context */
574 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
575     pa_source_assert_ref(s);
576     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
577     pa_source_output_assert_ref(o);
578     pa_assert(o->thread_info.direct_on_input);
579     pa_assert(chunk);
580
581     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
582         return;
583
584     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
585         pa_memchunk vchunk = *chunk;
586
587         pa_memblock_ref(vchunk.memblock);
588         pa_memchunk_make_writable(&vchunk, 0);
589
590         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
591             pa_silence_memchunk(&vchunk, &s->sample_spec);
592         else
593             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
594
595         pa_source_output_push(o, &vchunk);
596
597         pa_memblock_unref(vchunk.memblock);
598     } else
599         pa_source_output_push(o, chunk);
600 }
601
602 /* Called from main thread */
603 pa_usec_t pa_source_get_latency(pa_source *s) {
604     pa_usec_t usec;
605
606     pa_source_assert_ref(s);
607     pa_assert(PA_SOURCE_IS_LINKED(s->state));
608
609     if (s->state == PA_SOURCE_SUSPENDED)
610         return 0;
611
612     if (!(s->flags & PA_SOURCE_LATENCY))
613         return 0;
614
615     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
616
617     return usec;
618 }
619
620 /* Called from IO thread */
621 pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
622     pa_usec_t usec = 0;
623     pa_msgobject *o;
624
625     pa_source_assert_ref(s);
626     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
627
628     /* The returned value is supposed to be in the time domain of the sound card! */
629
630     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
631         return 0;
632
633     if (!(s->flags & PA_SOURCE_LATENCY))
634         return 0;
635
636     o = PA_MSGOBJECT(s);
637
638     /* We probably should make this a proper vtable callback instead of going through process_msg() */
639
640     if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
641         return -1;
642
643     return usec;
644 }
645
646 /* Called from main thread */
647 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
648     pa_cvolume old_virtual_volume;
649     pa_bool_t virtual_volume_changed;
650
651     pa_source_assert_ref(s);
652     pa_assert(PA_SOURCE_IS_LINKED(s->state));
653     pa_assert(volume);
654     pa_assert(pa_cvolume_valid(volume));
655     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
656
657     old_virtual_volume = s->virtual_volume;
658     s->virtual_volume = *volume;
659     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
660
661     if (s->set_volume) {
662         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
663         s->set_volume(s);
664     } else
665         s->soft_volume = s->virtual_volume;
666
667     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
668
669     if (virtual_volume_changed)
670         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
671 }
672
673 /* Called from main thread. Only to be called by source implementor */
674 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
675     pa_source_assert_ref(s);
676     pa_assert(volume);
677
678     if (PA_SOURCE_IS_LINKED(s->state))
679         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
680     else
681         s->thread_info.soft_volume = *volume;
682 }
683
684 /* Called from main thread */
685 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
686     pa_source_assert_ref(s);
687     pa_assert(PA_SOURCE_IS_LINKED(s->state));
688
689     if (s->refresh_volume || force_refresh) {
690         pa_cvolume old_virtual_volume = s->virtual_volume;
691
692         if (s->get_volume)
693             s->get_volume(s);
694
695         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
696
697         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume))
698             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
699     }
700
701     return &s->virtual_volume;
702 }
703
704 /* Called from main thread */
705 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume) {
706     pa_source_assert_ref(s);
707
708     /* The source implementor may call this if the volume changed to make sure everyone is notified */
709
710     if (pa_cvolume_equal(&s->virtual_volume, new_volume))
711         return;
712
713     s->virtual_volume = *new_volume;
714     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
715 }
716
717 /* Called from main thread */
718 void pa_source_set_mute(pa_source *s, pa_bool_t mute) {
719     pa_bool_t old_muted;
720
721     pa_source_assert_ref(s);
722     pa_assert(PA_SOURCE_IS_LINKED(s->state));
723
724     old_muted = s->muted;
725     s->muted = mute;
726
727     if (s->set_mute)
728         s->set_mute(s);
729
730     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
731
732     if (old_muted != s->muted)
733         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
734 }
735
736 /* Called from main thread */
737 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
738     pa_source_assert_ref(s);
739     pa_assert(PA_SOURCE_IS_LINKED(s->state));
740
741     if (s->refresh_muted || force_refresh) {
742         pa_bool_t old_muted = s->muted;
743
744         if (s->get_mute)
745             s->get_mute(s);
746
747         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
748
749         if (old_muted != s->muted)
750             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
751     }
752
753     return s->muted;
754 }
755
756 /* Called from main thread */
757 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted) {
758     pa_source_assert_ref(s);
759
760     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
761
762     if (s->muted == new_muted)
763         return;
764
765     s->muted = new_muted;
766     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
767 }
768
769 /* Called from main thread */
770 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
771     pa_source_assert_ref(s);
772
773     if (p)
774         pa_proplist_update(s->proplist, mode, p);
775
776     if (PA_SOURCE_IS_LINKED(s->state)) {
777         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
778         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
779     }
780
781     return TRUE;
782 }
783
784 /* Called from main thread */
785 void pa_source_set_description(pa_source *s, const char *description) {
786     const char *old;
787     pa_source_assert_ref(s);
788
789     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
790         return;
791
792     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
793
794     if (old && description && !strcmp(old, description))
795         return;
796
797     if (description)
798         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
799     else
800         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
801
802     if (PA_SOURCE_IS_LINKED(s->state)) {
803         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
804         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
805     }
806 }
807
808 /* Called from main thread */
809 unsigned pa_source_linked_by(pa_source *s) {
810     pa_source_assert_ref(s);
811     pa_assert(PA_SOURCE_IS_LINKED(s->state));
812
813     return pa_idxset_size(s->outputs);
814 }
815
816 /* Called from main thread */
817 unsigned pa_source_used_by(pa_source *s) {
818     unsigned ret;
819
820     pa_source_assert_ref(s);
821     pa_assert(PA_SOURCE_IS_LINKED(s->state));
822
823     ret = pa_idxset_size(s->outputs);
824     pa_assert(ret >= s->n_corked);
825
826     return ret - s->n_corked;
827 }
828
829 /* Called from main thread */
830 unsigned pa_source_check_suspend(pa_source *s) {
831     unsigned ret;
832     pa_source_output *o;
833     uint32_t idx;
834
835     pa_source_assert_ref(s);
836
837     if (!PA_SOURCE_IS_LINKED(s->state))
838         return 0;
839
840     ret = 0;
841
842     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx))) {
843         pa_source_output_state_t st;
844
845         st = pa_source_output_get_state(o);
846         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
847
848         if (st == PA_SOURCE_OUTPUT_CORKED)
849             continue;
850
851         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
852             continue;
853
854         ret ++;
855     }
856
857     return ret;
858 }
859
860 /* Called from IO thread, except when it is not */
861 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
862     pa_source *s = PA_SOURCE(object);
863     pa_source_assert_ref(s);
864
865     switch ((pa_source_message_t) code) {
866
867         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
868             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
869
870             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
871
872             if (o->direct_on_input) {
873                 o->thread_info.direct_on_input = o->direct_on_input;
874                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
875             }
876
877             pa_assert(!o->thread_info.attached);
878             o->thread_info.attached = TRUE;
879
880             if (o->attach)
881                 o->attach(o);
882
883             pa_source_output_set_state_within_thread(o, o->state);
884
885             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
886                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
887
888             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
889
890             /* We don't just invalidate the requested latency here,
891              * because if we are in a move we might need to fix up the
892              * requested latency. */
893             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
894
895             return 0;
896         }
897
898         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
899             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
900
901             pa_source_output_set_state_within_thread(o, o->state);
902
903             if (o->detach)
904                 o->detach(o);
905
906             pa_assert(o->thread_info.attached);
907             o->thread_info.attached = FALSE;
908
909             if (o->thread_info.direct_on_input) {
910                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
911                 o->thread_info.direct_on_input = NULL;
912             }
913
914             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
915                 pa_source_output_unref(o);
916
917             pa_source_invalidate_requested_latency(s);
918
919             return 0;
920         }
921
922         case PA_SOURCE_MESSAGE_SET_VOLUME:
923             s->thread_info.soft_volume = s->soft_volume;
924             return 0;
925
926         case PA_SOURCE_MESSAGE_GET_VOLUME:
927             return 0;
928
929         case PA_SOURCE_MESSAGE_SET_MUTE:
930             s->thread_info.soft_muted = s->muted;
931             return 0;
932
933         case PA_SOURCE_MESSAGE_GET_MUTE:
934             return 0;
935
936         case PA_SOURCE_MESSAGE_SET_STATE: {
937
938             pa_bool_t suspend_change =
939                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
940                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
941
942             s->thread_info.state = PA_PTR_TO_UINT(userdata);
943
944             if (suspend_change) {
945                 pa_source_output *o;
946                 void *state = NULL;
947
948                 while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
949                     if (o->suspend_within_thread)
950                         o->suspend_within_thread(o, s->thread_info.state == PA_SOURCE_SUSPENDED);
951             }
952
953
954             return 0;
955         }
956
957         case PA_SOURCE_MESSAGE_DETACH:
958
959             /* Detach all streams */
960             pa_source_detach_within_thread(s);
961             return 0;
962
963         case PA_SOURCE_MESSAGE_ATTACH:
964
965             /* Reattach all streams */
966             pa_source_attach_within_thread(s);
967             return 0;
968
969         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
970
971             pa_usec_t *usec = userdata;
972             *usec = pa_source_get_requested_latency_within_thread(s);
973
974             if (*usec == (pa_usec_t) -1)
975                 *usec = s->thread_info.max_latency;
976
977             return 0;
978         }
979
980         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
981             pa_usec_t *r = userdata;
982
983             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
984
985             return 0;
986         }
987
988         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
989             pa_usec_t *r = userdata;
990
991             r[0] = s->thread_info.min_latency;
992             r[1] = s->thread_info.max_latency;
993
994             return 0;
995         }
996
997         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
998
999             *((size_t*) userdata) = s->thread_info.max_rewind;
1000             return 0;
1001
1002         case PA_SOURCE_MESSAGE_SET_MAX_REWIND:
1003
1004             pa_source_set_max_rewind_within_thread(s, (size_t) offset);
1005             return 0;
1006
1007         case PA_SOURCE_MESSAGE_GET_LATENCY:
1008
1009             if (s->monitor_of) {
1010                 *((pa_usec_t*) userdata) = 0;
1011                 return 0;
1012             }
1013
1014             /* Implementors need to overwrite this implementation! */
1015             return -1;
1016
1017         case PA_SOURCE_MESSAGE_MAX:
1018             ;
1019     }
1020
1021     return -1;
1022 }
1023
1024 /* Called from main thread */
1025 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend) {
1026     uint32_t idx;
1027     pa_source *source;
1028     int ret = 0;
1029
1030     pa_core_assert_ref(c);
1031
1032     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
1033         int r;
1034
1035         if (source->monitor_of)
1036             continue;
1037
1038         if ((r = pa_source_suspend(source, suspend)) < 0)
1039             ret = r;
1040     }
1041
1042     return ret;
1043 }
1044
1045 /* Called from main thread */
1046 void pa_source_detach(pa_source *s) {
1047     pa_source_assert_ref(s);
1048     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1049
1050     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1051 }
1052
1053 /* Called from main thread */
1054 void pa_source_attach(pa_source *s) {
1055     pa_source_assert_ref(s);
1056     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1057
1058     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1059 }
1060
1061 /* Called from IO thread */
1062 void pa_source_detach_within_thread(pa_source *s) {
1063     pa_source_output *o;
1064     void *state = NULL;
1065
1066     pa_source_assert_ref(s);
1067     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1068
1069     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1070         if (o->detach)
1071             o->detach(o);
1072 }
1073
1074 /* Called from IO thread */
1075 void pa_source_attach_within_thread(pa_source *s) {
1076     pa_source_output *o;
1077     void *state = NULL;
1078
1079     pa_source_assert_ref(s);
1080     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1081
1082     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1083         if (o->attach)
1084             o->attach(o);
1085 }
1086
1087 /* Called from IO thread */
1088 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1089     pa_usec_t result = (pa_usec_t) -1;
1090     pa_source_output *o;
1091     void *state = NULL;
1092
1093     pa_source_assert_ref(s);
1094
1095     if (s->thread_info.requested_latency_valid)
1096         return s->thread_info.requested_latency;
1097
1098     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1099
1100         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1101             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1102             result = o->thread_info.requested_source_latency;
1103
1104     if (result != (pa_usec_t) -1) {
1105         if (s->thread_info.max_latency > 0 && result > s->thread_info.max_latency)
1106             result = s->thread_info.max_latency;
1107
1108         if (s->thread_info.min_latency > 0 && result < s->thread_info.min_latency)
1109             result = s->thread_info.min_latency;
1110     }
1111
1112     s->thread_info.requested_latency = result;
1113     s->thread_info.requested_latency_valid = TRUE;
1114
1115     return result;
1116 }
1117
1118 /* Called from main thread */
1119 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1120     pa_usec_t usec;
1121
1122     pa_source_assert_ref(s);
1123     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1124
1125     if (s->state == PA_SOURCE_SUSPENDED)
1126         return 0;
1127
1128     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1129
1130     return usec;
1131 }
1132
1133 /* Called from IO thread */
1134 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind) {
1135     pa_source_output *o;
1136     void *state = NULL;
1137
1138     pa_source_assert_ref(s);
1139
1140     if (max_rewind == s->thread_info.max_rewind)
1141         return;
1142
1143     s->thread_info.max_rewind = max_rewind;
1144
1145     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1146         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1147             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1148     }
1149 }
1150
1151 /* Called from main thread */
1152 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1153     pa_source_assert_ref(s);
1154
1155     if (PA_SOURCE_IS_LINKED(s->state))
1156         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MAX_REWIND, NULL, max_rewind, NULL) == 0);
1157     else
1158         pa_source_set_max_rewind_within_thread(s, max_rewind);
1159 }
1160
1161 /* Called from IO thread */
1162 void pa_source_invalidate_requested_latency(pa_source *s) {
1163     pa_source_output *o;
1164     void *state = NULL;
1165
1166     pa_source_assert_ref(s);
1167
1168     s->thread_info.requested_latency_valid = FALSE;
1169
1170     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1171
1172         if (s->update_requested_latency)
1173             s->update_requested_latency(s);
1174
1175         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1176             if (o->update_source_requested_latency)
1177                 o->update_source_requested_latency(o);
1178     }
1179
1180     if (s->monitor_of)
1181         pa_sink_invalidate_requested_latency(s->monitor_of);
1182 }
1183
1184 /* Called from main thread */
1185 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1186     pa_source_assert_ref(s);
1187
1188     /* min_latency == 0:           no limit
1189      * min_latency anything else:  specified limit
1190      *
1191      * Similar for max_latency */
1192
1193     if (min_latency < ABSOLUTE_MIN_LATENCY)
1194         min_latency = ABSOLUTE_MIN_LATENCY;
1195
1196     if (max_latency <= 0 ||
1197         max_latency > ABSOLUTE_MAX_LATENCY)
1198         max_latency = ABSOLUTE_MAX_LATENCY;
1199
1200     pa_assert(min_latency <= max_latency);
1201
1202     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1203     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1204                max_latency == ABSOLUTE_MAX_LATENCY) ||
1205               (s->flags & PA_SOURCE_DYNAMIC_LATENCY));
1206
1207     if (PA_SOURCE_IS_LINKED(s->state)) {
1208         pa_usec_t r[2];
1209
1210         r[0] = min_latency;
1211         r[1] = max_latency;
1212
1213         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1214     } else
1215         pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
1216 }
1217
1218 /* Called from main thread */
1219 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1220    pa_source_assert_ref(s);
1221    pa_assert(min_latency);
1222    pa_assert(max_latency);
1223
1224    if (PA_SOURCE_IS_LINKED(s->state)) {
1225        pa_usec_t r[2] = { 0, 0 };
1226
1227        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1228
1229        *min_latency = r[0];
1230        *max_latency = r[1];
1231    } else {
1232        *min_latency = s->thread_info.min_latency;
1233        *max_latency = s->thread_info.max_latency;
1234    }
1235 }
1236
1237 /* Called from IO thread, and from main thread before pa_source_put() is called */
1238 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1239     void *state = NULL;
1240
1241     pa_source_assert_ref(s);
1242
1243     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1244     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1245     pa_assert(min_latency <= max_latency);
1246
1247     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1248     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1249                max_latency == ABSOLUTE_MAX_LATENCY) ||
1250               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
1251               s->monitor_of);
1252
1253     s->thread_info.min_latency = min_latency;
1254     s->thread_info.max_latency = max_latency;
1255
1256     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1257         pa_source_output *o;
1258
1259         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1260             if (o->update_source_latency_range)
1261                 o->update_source_latency_range(o);
1262     }
1263
1264     pa_source_invalidate_requested_latency(s);
1265 }
1266
1267 /* Called from main thread */
1268 size_t pa_source_get_max_rewind(pa_source *s) {
1269     size_t r;
1270     pa_source_assert_ref(s);
1271
1272     if (!PA_SOURCE_IS_LINKED(s->state))
1273         return s->thread_info.max_rewind;
1274
1275     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1276
1277     return r;
1278 }