core: make sure we fix up flags/monitor flags already in pa_sink_new() instead of...
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/source-output.h>
37 #include <pulsecore/namereg.h>
38 #include <pulsecore/core-subscribe.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/sample-util.h>
41
42 #include "source.h"
43
44 #define ABSOLUTE_MIN_LATENCY (500)
45 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
46 #define DEFAULT_FIXED_LATENCY (250*PA_USEC_PER_MSEC)
47
48 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
49
50 static void source_free(pa_object *o);
51
52 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
53     pa_assert(data);
54
55     memset(data, 0, sizeof(*data));
56     data->proplist = pa_proplist_new();
57
58     return data;
59 }
60
61 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
62     pa_assert(data);
63
64     pa_xfree(data->name);
65     data->name = pa_xstrdup(name);
66 }
67
68 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
69     pa_assert(data);
70
71     if ((data->sample_spec_is_set = !!spec))
72         data->sample_spec = *spec;
73 }
74
75 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
76     pa_assert(data);
77
78     if ((data->channel_map_is_set = !!map))
79         data->channel_map = *map;
80 }
81
82 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
83     pa_assert(data);
84
85     if ((data->volume_is_set = !!volume))
86         data->volume = *volume;
87 }
88
89 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
90     pa_assert(data);
91
92     data->muted_is_set = TRUE;
93     data->muted = !!mute;
94 }
95
96 void pa_source_new_data_done(pa_source_new_data *data) {
97     pa_assert(data);
98
99     pa_xfree(data->name);
100     pa_proplist_free(data->proplist);
101 }
102
103 /* Called from main context */
104 static void reset_callbacks(pa_source *s) {
105     pa_assert(s);
106
107     s->set_state = NULL;
108     s->get_volume = NULL;
109     s->set_volume = NULL;
110     s->get_mute = NULL;
111     s->set_mute = NULL;
112     s->update_requested_latency = NULL;
113 }
114
115 /* Called from main context */
116 pa_source* pa_source_new(
117         pa_core *core,
118         pa_source_new_data *data,
119         pa_source_flags_t flags) {
120
121     pa_source *s;
122     const char *name;
123     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
124     char *pt;
125
126     pa_assert(core);
127     pa_assert(data);
128     pa_assert(data->name);
129
130     s = pa_msgobject_new(pa_source);
131
132     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
133         pa_xfree(s);
134         return NULL;
135     }
136
137     pa_source_new_data_set_name(data, name);
138
139     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
140         pa_xfree(s);
141         pa_namereg_unregister(core, name);
142         return NULL;
143     }
144
145     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
146     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
147
148     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
149
150     if (!data->channel_map_is_set)
151         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
152
153     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
154     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
155
156     if (!data->volume_is_set)
157         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
158
159     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
160     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
161
162     if (!data->muted_is_set)
163         data->muted = FALSE;
164
165     if (data->card)
166         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
167
168     pa_device_init_description(data->proplist);
169     pa_device_init_icon(data->proplist, FALSE);
170
171     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
172         pa_xfree(s);
173         pa_namereg_unregister(core, name);
174         return NULL;
175     }
176
177     if (!(flags & PA_SOURCE_HW_VOLUME_CTRL))
178         flags |= PA_SOURCE_DECIBEL_VOLUME;
179
180     s->parent.parent.free = source_free;
181     s->parent.process_msg = pa_source_process_msg;
182
183     s->core = core;
184     s->state = PA_SOURCE_INIT;
185     s->flags = flags;
186     s->name = pa_xstrdup(name);
187     s->proplist = pa_proplist_copy(data->proplist);
188     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
189     s->module = data->module;
190     s->card = data->card;
191
192     s->sample_spec = data->sample_spec;
193     s->channel_map = data->channel_map;
194
195     s->outputs = pa_idxset_new(NULL, NULL);
196     s->n_corked = 0;
197     s->monitor_of = NULL;
198
199     s->virtual_volume = data->volume;
200     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
201     s->base_volume = PA_VOLUME_NORM;
202     s->n_volume_steps = PA_VOLUME_NORM+1;
203     s->muted = data->muted;
204     s->refresh_volume = s->refresh_muted = FALSE;
205
206     s->fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
207
208     reset_callbacks(s);
209     s->userdata = NULL;
210
211     s->asyncmsgq = NULL;
212     s->rtpoll = NULL;
213
214     pa_silence_memchunk_get(
215             &core->silence_cache,
216             core->mempool,
217             &s->silence,
218             &s->sample_spec,
219             0);
220
221     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
222     s->thread_info.soft_volume = s->soft_volume;
223     s->thread_info.soft_muted = s->muted;
224     s->thread_info.state = s->state;
225     s->thread_info.max_rewind = 0;
226     s->thread_info.requested_latency_valid = FALSE;
227     s->thread_info.requested_latency = 0;
228     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
229     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
230
231     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
232
233     if (s->card)
234         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
235
236     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
237     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
238                 s->index,
239                 s->name,
240                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
241                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
242                 pt);
243     pa_xfree(pt);
244
245     return s;
246 }
247
248 /* Called from main context */
249 static int source_set_state(pa_source *s, pa_source_state_t state) {
250     int ret;
251     pa_bool_t suspend_change;
252     pa_source_state_t original_state;
253
254     pa_assert(s);
255
256     if (s->state == state)
257         return 0;
258
259     original_state = s->state;
260
261     suspend_change =
262         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
263         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
264
265     if (s->set_state)
266         if ((ret = s->set_state(s, state)) < 0)
267             return ret;
268
269     if (s->asyncmsgq)
270         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
271
272             if (s->set_state)
273                 s->set_state(s, original_state);
274
275             return ret;
276         }
277
278     s->state = state;
279
280     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
281         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
282         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
283     }
284
285     if (suspend_change) {
286         pa_source_output *o;
287         uint32_t idx;
288
289         /* We're suspending or resuming, tell everyone about it */
290
291         for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
292             if (s->state == PA_SOURCE_SUSPENDED &&
293                 (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
294                 pa_source_output_kill(o);
295             else if (o->suspend)
296                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
297     }
298
299
300     return 0;
301 }
302
303 /* Called from main context */
304 void pa_source_put(pa_source *s) {
305     pa_source_assert_ref(s);
306
307     pa_assert(s->state == PA_SOURCE_INIT);
308
309     /* The following fields must be initialized properly when calling _put() */
310     pa_assert(s->asyncmsgq);
311     pa_assert(s->rtpoll);
312     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
313
314     s->thread_info.soft_volume = s->soft_volume;
315     s->thread_info.soft_muted = s->muted;
316
317     pa_assert((s->flags & PA_SOURCE_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SOURCE_DECIBEL_VOLUME));
318     pa_assert(!(s->flags & PA_SOURCE_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
319     pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
320
321     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
322
323     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
324     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
325 }
326
327 /* Called from main context */
328 void pa_source_unlink(pa_source *s) {
329     pa_bool_t linked;
330     pa_source_output *o, *j = NULL;
331
332     pa_assert(s);
333
334     /* See pa_sink_unlink() for a couple of comments how this function
335      * works. */
336
337     linked = PA_SOURCE_IS_LINKED(s->state);
338
339     if (linked)
340         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
341
342     if (s->state != PA_SOURCE_UNLINKED)
343         pa_namereg_unregister(s->core, s->name);
344     pa_idxset_remove_by_data(s->core->sources, s, NULL);
345
346     if (s->card)
347         pa_idxset_remove_by_data(s->card->sources, s, NULL);
348
349     while ((o = pa_idxset_first(s->outputs, NULL))) {
350         pa_assert(o != j);
351         pa_source_output_kill(o);
352         j = o;
353     }
354
355     if (linked)
356         source_set_state(s, PA_SOURCE_UNLINKED);
357     else
358         s->state = PA_SOURCE_UNLINKED;
359
360     reset_callbacks(s);
361
362     if (linked) {
363         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
364         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
365     }
366 }
367
368 /* Called from main context */
369 static void source_free(pa_object *o) {
370     pa_source_output *so;
371     pa_source *s = PA_SOURCE(o);
372
373     pa_assert(s);
374     pa_assert(pa_source_refcnt(s) == 0);
375
376     if (PA_SOURCE_IS_LINKED(s->state))
377         pa_source_unlink(s);
378
379     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
380
381     pa_idxset_free(s->outputs, NULL, NULL);
382
383     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
384         pa_source_output_unref(so);
385
386     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
387
388     if (s->silence.memblock)
389         pa_memblock_unref(s->silence.memblock);
390
391     pa_xfree(s->name);
392     pa_xfree(s->driver);
393
394     if (s->proplist)
395         pa_proplist_free(s->proplist);
396
397     pa_xfree(s);
398 }
399
400 /* Called from main context */
401 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
402     pa_source_assert_ref(s);
403
404     s->asyncmsgq = q;
405 }
406
407 /* Called from main context */
408 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
409     pa_source_assert_ref(s);
410
411     s->rtpoll = p;
412 }
413
414 /* Called from main context */
415 int pa_source_update_status(pa_source*s) {
416     pa_source_assert_ref(s);
417     pa_assert(PA_SOURCE_IS_LINKED(s->state));
418
419     if (s->state == PA_SOURCE_SUSPENDED)
420         return 0;
421
422     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
423 }
424
425 /* Called from main context */
426 int pa_source_suspend(pa_source *s, pa_bool_t suspend) {
427     pa_source_assert_ref(s);
428     pa_assert(PA_SOURCE_IS_LINKED(s->state));
429
430     if (s->monitor_of)
431         return -PA_ERR_NOTSUPPORTED;
432
433     if (suspend)
434         return source_set_state(s, PA_SOURCE_SUSPENDED);
435     else
436         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
437 }
438
439 /* Called from main context */
440 int pa_source_sync_suspend(pa_source *s) {
441     pa_sink_state_t state;
442
443     pa_source_assert_ref(s);
444     pa_assert(PA_SOURCE_IS_LINKED(s->state));
445     pa_assert(s->monitor_of);
446
447     state = pa_sink_get_state(s->monitor_of);
448
449     if (state == PA_SINK_SUSPENDED)
450         return source_set_state(s, PA_SOURCE_SUSPENDED);
451
452     pa_assert(PA_SINK_IS_OPENED(state));
453
454     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
455 }
456
457 /* Called from main context */
458 pa_queue *pa_source_move_all_start(pa_source *s) {
459     pa_queue *q;
460     pa_source_output *o, *n;
461     uint32_t idx;
462
463     pa_source_assert_ref(s);
464     pa_assert(PA_SOURCE_IS_LINKED(s->state));
465
466     q = pa_queue_new();
467
468     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
469         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
470
471         pa_source_output_ref(o);
472
473         if (pa_source_output_start_move(o) >= 0)
474             pa_queue_push(q, o);
475         else
476             pa_source_output_unref(o);
477     }
478
479     return q;
480 }
481
482 /* Called from main context */
483 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
484     pa_source_output *o;
485
486     pa_source_assert_ref(s);
487     pa_assert(PA_SOURCE_IS_LINKED(s->state));
488     pa_assert(q);
489
490     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
491         if (pa_source_output_finish_move(o, s, save) < 0)
492             pa_source_output_kill(o);
493
494         pa_source_output_unref(o);
495     }
496
497     pa_queue_free(q, NULL, NULL);
498 }
499
500 /* Called from main context */
501 void pa_source_move_all_fail(pa_queue *q) {
502     pa_source_output *o;
503     pa_assert(q);
504
505     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
506         if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_OK) {
507             pa_source_output_kill(o);
508             pa_source_output_unref(o);
509         }
510     }
511
512     pa_queue_free(q, NULL, NULL);
513 }
514
515 /* Called from IO thread context */
516 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
517     pa_source_output *o;
518     void *state = NULL;
519
520     pa_source_assert_ref(s);
521     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
522
523     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
524         return;
525
526     if (nbytes <= 0)
527         return;
528
529     pa_log_debug("Processing rewind...");
530
531     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
532         pa_source_output_assert_ref(o);
533         pa_source_output_process_rewind(o, nbytes);
534     }
535 }
536
537 /* Called from IO thread context */
538 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
539     pa_source_output *o;
540     void *state = NULL;
541
542     pa_source_assert_ref(s);
543     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
544     pa_assert(chunk);
545
546     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
547         return;
548
549     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
550         pa_memchunk vchunk = *chunk;
551
552         pa_memblock_ref(vchunk.memblock);
553         pa_memchunk_make_writable(&vchunk, 0);
554
555         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
556             pa_silence_memchunk(&vchunk, &s->sample_spec);
557         else
558             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
559
560         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
561             pa_source_output_assert_ref(o);
562
563             if (!o->thread_info.direct_on_input)
564                 pa_source_output_push(o, &vchunk);
565         }
566
567         pa_memblock_unref(vchunk.memblock);
568     } else {
569
570         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
571             pa_source_output_assert_ref(o);
572
573             if (!o->thread_info.direct_on_input)
574                 pa_source_output_push(o, chunk);
575         }
576     }
577 }
578
579 /* Called from IO thread context */
580 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
581     pa_source_assert_ref(s);
582     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
583     pa_source_output_assert_ref(o);
584     pa_assert(o->thread_info.direct_on_input);
585     pa_assert(chunk);
586
587     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
588         return;
589
590     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
591         pa_memchunk vchunk = *chunk;
592
593         pa_memblock_ref(vchunk.memblock);
594         pa_memchunk_make_writable(&vchunk, 0);
595
596         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
597             pa_silence_memchunk(&vchunk, &s->sample_spec);
598         else
599             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
600
601         pa_source_output_push(o, &vchunk);
602
603         pa_memblock_unref(vchunk.memblock);
604     } else
605         pa_source_output_push(o, chunk);
606 }
607
608 /* Called from main thread */
609 pa_usec_t pa_source_get_latency(pa_source *s) {
610     pa_usec_t usec;
611
612     pa_source_assert_ref(s);
613     pa_assert(PA_SOURCE_IS_LINKED(s->state));
614
615     if (s->state == PA_SOURCE_SUSPENDED)
616         return 0;
617
618     if (!(s->flags & PA_SOURCE_LATENCY))
619         return 0;
620
621     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
622
623     return usec;
624 }
625
626 /* Called from IO thread */
627 pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
628     pa_usec_t usec = 0;
629     pa_msgobject *o;
630
631     pa_source_assert_ref(s);
632     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
633
634     /* The returned value is supposed to be in the time domain of the sound card! */
635
636     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
637         return 0;
638
639     if (!(s->flags & PA_SOURCE_LATENCY))
640         return 0;
641
642     o = PA_MSGOBJECT(s);
643
644     /* We probably should make this a proper vtable callback instead of going through process_msg() */
645
646     if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
647         return -1;
648
649     return usec;
650 }
651
652 /* Called from main thread */
653 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
654     pa_cvolume old_virtual_volume;
655     pa_bool_t virtual_volume_changed;
656
657     pa_source_assert_ref(s);
658     pa_assert(PA_SOURCE_IS_LINKED(s->state));
659     pa_assert(volume);
660     pa_assert(pa_cvolume_valid(volume));
661     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
662
663     old_virtual_volume = s->virtual_volume;
664     s->virtual_volume = *volume;
665     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
666
667     if (s->set_volume) {
668         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
669         s->set_volume(s);
670     } else
671         s->soft_volume = s->virtual_volume;
672
673     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
674
675     if (virtual_volume_changed)
676         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
677 }
678
679 /* Called from main thread. Only to be called by source implementor */
680 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
681     pa_source_assert_ref(s);
682     pa_assert(volume);
683
684     if (PA_SOURCE_IS_LINKED(s->state))
685         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
686     else
687         s->thread_info.soft_volume = *volume;
688 }
689
690 /* Called from main thread */
691 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
692     pa_source_assert_ref(s);
693     pa_assert(PA_SOURCE_IS_LINKED(s->state));
694
695     if (s->refresh_volume || force_refresh) {
696         pa_cvolume old_virtual_volume = s->virtual_volume;
697
698         if (s->get_volume)
699             s->get_volume(s);
700
701         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
702
703         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume))
704             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
705     }
706
707     return &s->virtual_volume;
708 }
709
710 /* Called from main thread */
711 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume) {
712     pa_source_assert_ref(s);
713
714     /* The source implementor may call this if the volume changed to make sure everyone is notified */
715
716     if (pa_cvolume_equal(&s->virtual_volume, new_volume))
717         return;
718
719     s->virtual_volume = *new_volume;
720     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
721 }
722
723 /* Called from main thread */
724 void pa_source_set_mute(pa_source *s, pa_bool_t mute) {
725     pa_bool_t old_muted;
726
727     pa_source_assert_ref(s);
728     pa_assert(PA_SOURCE_IS_LINKED(s->state));
729
730     old_muted = s->muted;
731     s->muted = mute;
732
733     if (s->set_mute)
734         s->set_mute(s);
735
736     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
737
738     if (old_muted != s->muted)
739         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
740 }
741
742 /* Called from main thread */
743 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
744     pa_source_assert_ref(s);
745     pa_assert(PA_SOURCE_IS_LINKED(s->state));
746
747     if (s->refresh_muted || force_refresh) {
748         pa_bool_t old_muted = s->muted;
749
750         if (s->get_mute)
751             s->get_mute(s);
752
753         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
754
755         if (old_muted != s->muted)
756             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
757     }
758
759     return s->muted;
760 }
761
762 /* Called from main thread */
763 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted) {
764     pa_source_assert_ref(s);
765
766     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
767
768     if (s->muted == new_muted)
769         return;
770
771     s->muted = new_muted;
772     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
773 }
774
775 /* Called from main thread */
776 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
777     pa_source_assert_ref(s);
778
779     if (p)
780         pa_proplist_update(s->proplist, mode, p);
781
782     if (PA_SOURCE_IS_LINKED(s->state)) {
783         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
784         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
785     }
786
787     return TRUE;
788 }
789
790 /* Called from main thread */
791 void pa_source_set_description(pa_source *s, const char *description) {
792     const char *old;
793     pa_source_assert_ref(s);
794
795     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
796         return;
797
798     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
799
800     if (old && description && !strcmp(old, description))
801         return;
802
803     if (description)
804         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
805     else
806         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
807
808     if (PA_SOURCE_IS_LINKED(s->state)) {
809         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
810         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
811     }
812 }
813
814 /* Called from main thread */
815 unsigned pa_source_linked_by(pa_source *s) {
816     pa_source_assert_ref(s);
817     pa_assert(PA_SOURCE_IS_LINKED(s->state));
818
819     return pa_idxset_size(s->outputs);
820 }
821
822 /* Called from main thread */
823 unsigned pa_source_used_by(pa_source *s) {
824     unsigned ret;
825
826     pa_source_assert_ref(s);
827     pa_assert(PA_SOURCE_IS_LINKED(s->state));
828
829     ret = pa_idxset_size(s->outputs);
830     pa_assert(ret >= s->n_corked);
831
832     return ret - s->n_corked;
833 }
834
835 /* Called from main thread */
836 unsigned pa_source_check_suspend(pa_source *s) {
837     unsigned ret;
838     pa_source_output *o;
839     uint32_t idx;
840
841     pa_source_assert_ref(s);
842
843     if (!PA_SOURCE_IS_LINKED(s->state))
844         return 0;
845
846     ret = 0;
847
848     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx))) {
849         pa_source_output_state_t st;
850
851         st = pa_source_output_get_state(o);
852         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
853
854         if (st == PA_SOURCE_OUTPUT_CORKED)
855             continue;
856
857         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
858             continue;
859
860         ret ++;
861     }
862
863     return ret;
864 }
865
866 /* Called from IO thread, except when it is not */
867 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
868     pa_source *s = PA_SOURCE(object);
869     pa_source_assert_ref(s);
870
871     switch ((pa_source_message_t) code) {
872
873         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
874             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
875
876             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
877
878             if (o->direct_on_input) {
879                 o->thread_info.direct_on_input = o->direct_on_input;
880                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
881             }
882
883             pa_assert(!o->thread_info.attached);
884             o->thread_info.attached = TRUE;
885
886             if (o->attach)
887                 o->attach(o);
888
889             pa_source_output_set_state_within_thread(o, o->state);
890
891             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
892                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
893
894             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
895
896             /* We don't just invalidate the requested latency here,
897              * because if we are in a move we might need to fix up the
898              * requested latency. */
899             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
900
901             return 0;
902         }
903
904         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
905             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
906
907             pa_source_output_set_state_within_thread(o, o->state);
908
909             if (o->detach)
910                 o->detach(o);
911
912             pa_assert(o->thread_info.attached);
913             o->thread_info.attached = FALSE;
914
915             if (o->thread_info.direct_on_input) {
916                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
917                 o->thread_info.direct_on_input = NULL;
918             }
919
920             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
921                 pa_source_output_unref(o);
922
923             pa_source_invalidate_requested_latency(s);
924
925             return 0;
926         }
927
928         case PA_SOURCE_MESSAGE_SET_VOLUME:
929             s->thread_info.soft_volume = s->soft_volume;
930             return 0;
931
932         case PA_SOURCE_MESSAGE_GET_VOLUME:
933             return 0;
934
935         case PA_SOURCE_MESSAGE_SET_MUTE:
936             s->thread_info.soft_muted = s->muted;
937             return 0;
938
939         case PA_SOURCE_MESSAGE_GET_MUTE:
940             return 0;
941
942         case PA_SOURCE_MESSAGE_SET_STATE: {
943
944             pa_bool_t suspend_change =
945                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
946                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
947
948             s->thread_info.state = PA_PTR_TO_UINT(userdata);
949
950             if (suspend_change) {
951                 pa_source_output *o;
952                 void *state = NULL;
953
954                 while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
955                     if (o->suspend_within_thread)
956                         o->suspend_within_thread(o, s->thread_info.state == PA_SOURCE_SUSPENDED);
957             }
958
959
960             return 0;
961         }
962
963         case PA_SOURCE_MESSAGE_DETACH:
964
965             /* Detach all streams */
966             pa_source_detach_within_thread(s);
967             return 0;
968
969         case PA_SOURCE_MESSAGE_ATTACH:
970
971             /* Reattach all streams */
972             pa_source_attach_within_thread(s);
973             return 0;
974
975         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
976
977             pa_usec_t *usec = userdata;
978             *usec = pa_source_get_requested_latency_within_thread(s);
979
980             if (*usec == (pa_usec_t) -1)
981                 *usec = s->thread_info.max_latency;
982
983             return 0;
984         }
985
986         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
987             pa_usec_t *r = userdata;
988
989             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
990
991             return 0;
992         }
993
994         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
995             pa_usec_t *r = userdata;
996
997             r[0] = s->thread_info.min_latency;
998             r[1] = s->thread_info.max_latency;
999
1000             return 0;
1001         }
1002
1003         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
1004
1005             *((size_t*) userdata) = s->thread_info.max_rewind;
1006             return 0;
1007
1008         case PA_SOURCE_MESSAGE_SET_MAX_REWIND:
1009
1010             pa_source_set_max_rewind_within_thread(s, (size_t) offset);
1011             return 0;
1012
1013         case PA_SOURCE_MESSAGE_GET_LATENCY:
1014
1015             if (s->monitor_of) {
1016                 *((pa_usec_t*) userdata) = 0;
1017                 return 0;
1018             }
1019
1020             /* Implementors need to overwrite this implementation! */
1021             return -1;
1022
1023         case PA_SOURCE_MESSAGE_MAX:
1024             ;
1025     }
1026
1027     return -1;
1028 }
1029
1030 /* Called from main thread */
1031 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend) {
1032     uint32_t idx;
1033     pa_source *source;
1034     int ret = 0;
1035
1036     pa_core_assert_ref(c);
1037
1038     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
1039         int r;
1040
1041         if (source->monitor_of)
1042             continue;
1043
1044         if ((r = pa_source_suspend(source, suspend)) < 0)
1045             ret = r;
1046     }
1047
1048     return ret;
1049 }
1050
1051 /* Called from main thread */
1052 void pa_source_detach(pa_source *s) {
1053     pa_source_assert_ref(s);
1054     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1055
1056     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1057 }
1058
1059 /* Called from main thread */
1060 void pa_source_attach(pa_source *s) {
1061     pa_source_assert_ref(s);
1062     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1063
1064     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1065 }
1066
1067 /* Called from IO thread */
1068 void pa_source_detach_within_thread(pa_source *s) {
1069     pa_source_output *o;
1070     void *state = NULL;
1071
1072     pa_source_assert_ref(s);
1073     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1074
1075     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1076         if (o->detach)
1077             o->detach(o);
1078 }
1079
1080 /* Called from IO thread */
1081 void pa_source_attach_within_thread(pa_source *s) {
1082     pa_source_output *o;
1083     void *state = NULL;
1084
1085     pa_source_assert_ref(s);
1086     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1087
1088     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1089         if (o->attach)
1090             o->attach(o);
1091 }
1092
1093 /* Called from IO thread */
1094 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1095     pa_usec_t result = (pa_usec_t) -1;
1096     pa_source_output *o;
1097     void *state = NULL;
1098
1099     pa_source_assert_ref(s);
1100
1101     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1102         return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
1103
1104     if (s->thread_info.requested_latency_valid)
1105         return s->thread_info.requested_latency;
1106
1107     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1108
1109         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1110             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1111             result = o->thread_info.requested_source_latency;
1112
1113     if (result != (pa_usec_t) -1)
1114         result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency);
1115
1116     s->thread_info.requested_latency = result;
1117     s->thread_info.requested_latency_valid = TRUE;
1118
1119     return result;
1120 }
1121
1122 /* Called from main thread */
1123 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1124     pa_usec_t usec = 0;
1125
1126     pa_source_assert_ref(s);
1127     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1128
1129     if (s->state == PA_SOURCE_SUSPENDED)
1130         return 0;
1131
1132     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1133
1134     return usec;
1135 }
1136
1137 /* Called from IO thread */
1138 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind) {
1139     pa_source_output *o;
1140     void *state = NULL;
1141
1142     pa_source_assert_ref(s);
1143
1144     if (max_rewind == s->thread_info.max_rewind)
1145         return;
1146
1147     s->thread_info.max_rewind = max_rewind;
1148
1149     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1150         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1151             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1152     }
1153 }
1154
1155 /* Called from main thread */
1156 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1157     pa_source_assert_ref(s);
1158
1159     if (PA_SOURCE_IS_LINKED(s->state))
1160         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MAX_REWIND, NULL, max_rewind, NULL) == 0);
1161     else
1162         pa_source_set_max_rewind_within_thread(s, max_rewind);
1163 }
1164
1165 /* Called from IO thread */
1166 void pa_source_invalidate_requested_latency(pa_source *s) {
1167     pa_source_output *o;
1168     void *state = NULL;
1169
1170     pa_source_assert_ref(s);
1171
1172     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1173         return;
1174
1175     s->thread_info.requested_latency_valid = FALSE;
1176
1177     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1178
1179         if (s->update_requested_latency)
1180             s->update_requested_latency(s);
1181
1182         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1183             if (o->update_source_requested_latency)
1184                 o->update_source_requested_latency(o);
1185     }
1186
1187     if (s->monitor_of)
1188         pa_sink_invalidate_requested_latency(s->monitor_of);
1189 }
1190
1191 /* Called from main thread */
1192 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1193     pa_source_assert_ref(s);
1194
1195     /* min_latency == 0:           no limit
1196      * min_latency anything else:  specified limit
1197      *
1198      * Similar for max_latency */
1199
1200     if (min_latency < ABSOLUTE_MIN_LATENCY)
1201         min_latency = ABSOLUTE_MIN_LATENCY;
1202
1203     if (max_latency <= 0 ||
1204         max_latency > ABSOLUTE_MAX_LATENCY)
1205         max_latency = ABSOLUTE_MAX_LATENCY;
1206
1207     pa_assert(min_latency <= max_latency);
1208
1209     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1210     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1211                max_latency == ABSOLUTE_MAX_LATENCY) ||
1212               (s->flags & PA_SOURCE_DYNAMIC_LATENCY));
1213
1214     if (PA_SOURCE_IS_LINKED(s->state)) {
1215         pa_usec_t r[2];
1216
1217         r[0] = min_latency;
1218         r[1] = max_latency;
1219
1220         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1221     } else
1222         pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
1223 }
1224
1225 /* Called from main thread */
1226 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1227    pa_source_assert_ref(s);
1228    pa_assert(min_latency);
1229    pa_assert(max_latency);
1230
1231    if (PA_SOURCE_IS_LINKED(s->state)) {
1232        pa_usec_t r[2] = { 0, 0 };
1233
1234        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1235
1236        *min_latency = r[0];
1237        *max_latency = r[1];
1238    } else {
1239        *min_latency = s->thread_info.min_latency;
1240        *max_latency = s->thread_info.max_latency;
1241    }
1242 }
1243
1244 /* Called from IO thread, and from main thread before pa_source_put() is called */
1245 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1246     void *state = NULL;
1247
1248     pa_source_assert_ref(s);
1249
1250     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1251     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1252     pa_assert(min_latency <= max_latency);
1253
1254     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1255     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1256                max_latency == ABSOLUTE_MAX_LATENCY) ||
1257               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
1258               s->monitor_of);
1259
1260     s->thread_info.min_latency = min_latency;
1261     s->thread_info.max_latency = max_latency;
1262
1263     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1264         pa_source_output *o;
1265
1266         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1267             if (o->update_source_latency_range)
1268                 o->update_source_latency_range(o);
1269     }
1270
1271     pa_source_invalidate_requested_latency(s);
1272 }
1273
1274 /* Called from main thread, before the source is put */
1275 void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
1276     pa_source_assert_ref(s);
1277
1278     pa_assert(pa_source_get_state(s) == PA_SOURCE_INIT);
1279
1280     if (latency < ABSOLUTE_MIN_LATENCY)
1281         latency = ABSOLUTE_MIN_LATENCY;
1282
1283     if (latency > ABSOLUTE_MAX_LATENCY)
1284         latency = ABSOLUTE_MAX_LATENCY;
1285
1286     s->fixed_latency = latency;
1287 }
1288
1289 /* Called from main thread */
1290 size_t pa_source_get_max_rewind(pa_source *s) {
1291     size_t r;
1292     pa_source_assert_ref(s);
1293
1294     if (!PA_SOURCE_IS_LINKED(s->state))
1295         return s->thread_info.max_rewind;
1296
1297     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1298
1299     return r;
1300 }