get rid of 'default' min/max latencies, simplify things by just having absolute bound...
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/source-output.h>
37 #include <pulsecore/namereg.h>
38 #include <pulsecore/core-subscribe.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/sample-util.h>
41
42 #include "source.h"
43
44 #define ABSOLUTE_MIN_LATENCY (500)
45 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
46
47 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
48
49 static void source_free(pa_object *o);
50
51 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
52     pa_assert(data);
53
54     memset(data, 0, sizeof(*data));
55     data->proplist = pa_proplist_new();
56
57     return data;
58 }
59
60 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
61     pa_assert(data);
62
63     pa_xfree(data->name);
64     data->name = pa_xstrdup(name);
65 }
66
67 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
68     pa_assert(data);
69
70     if ((data->sample_spec_is_set = !!spec))
71         data->sample_spec = *spec;
72 }
73
74 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
75     pa_assert(data);
76
77     if ((data->channel_map_is_set = !!map))
78         data->channel_map = *map;
79 }
80
81 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
82     pa_assert(data);
83
84     if ((data->volume_is_set = !!volume))
85         data->volume = *volume;
86 }
87
88 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
89     pa_assert(data);
90
91     data->muted_is_set = TRUE;
92     data->muted = !!mute;
93 }
94
95 void pa_source_new_data_done(pa_source_new_data *data) {
96     pa_assert(data);
97
98     pa_xfree(data->name);
99     pa_proplist_free(data->proplist);
100 }
101
102 /* Called from main context */
103 static void reset_callbacks(pa_source *s) {
104     pa_assert(s);
105
106     s->set_state = NULL;
107     s->get_volume = NULL;
108     s->set_volume = NULL;
109     s->get_mute = NULL;
110     s->set_mute = NULL;
111     s->update_requested_latency = NULL;
112 }
113
114 /* Called from main context */
115 pa_source* pa_source_new(
116         pa_core *core,
117         pa_source_new_data *data,
118         pa_source_flags_t flags) {
119
120     pa_source *s;
121     const char *name;
122     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
123     char *pt;
124
125     pa_assert(core);
126     pa_assert(data);
127     pa_assert(data->name);
128
129     s = pa_msgobject_new(pa_source);
130
131     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
132         pa_xfree(s);
133         return NULL;
134     }
135
136     pa_source_new_data_set_name(data, name);
137
138     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
139         pa_xfree(s);
140         pa_namereg_unregister(core, name);
141         return NULL;
142     }
143
144     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
145     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
146
147     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
148
149     if (!data->channel_map_is_set)
150         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
151
152     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
153     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
154
155     if (!data->volume_is_set)
156         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
157
158     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
159     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
160
161     if (!data->muted_is_set)
162         data->muted = FALSE;
163
164     if (data->card)
165         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
166
167     pa_device_init_description(data->proplist);
168     pa_device_init_icon(data->proplist, FALSE);
169
170     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
171         pa_xfree(s);
172         pa_namereg_unregister(core, name);
173         return NULL;
174     }
175
176     s->parent.parent.free = source_free;
177     s->parent.process_msg = pa_source_process_msg;
178
179     s->core = core;
180     s->state = PA_SOURCE_INIT;
181     s->flags = flags;
182     s->name = pa_xstrdup(name);
183     s->proplist = pa_proplist_copy(data->proplist);
184     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
185     s->module = data->module;
186     s->card = data->card;
187
188     s->sample_spec = data->sample_spec;
189     s->channel_map = data->channel_map;
190
191     s->outputs = pa_idxset_new(NULL, NULL);
192     s->n_corked = 0;
193     s->monitor_of = NULL;
194
195     s->virtual_volume = data->volume;
196     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
197     s->base_volume = PA_VOLUME_NORM;
198     s->n_volume_steps = PA_VOLUME_NORM+1;
199     s->muted = data->muted;
200     s->refresh_volume = s->refresh_muted = FALSE;
201
202     reset_callbacks(s);
203     s->userdata = NULL;
204
205     s->asyncmsgq = NULL;
206     s->rtpoll = NULL;
207
208     pa_silence_memchunk_get(
209             &core->silence_cache,
210             core->mempool,
211             &s->silence,
212             &s->sample_spec,
213             0);
214
215     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
216     s->thread_info.soft_volume = s->soft_volume;
217     s->thread_info.soft_muted = s->muted;
218     s->thread_info.state = s->state;
219     s->thread_info.max_rewind = 0;
220     s->thread_info.requested_latency_valid = FALSE;
221     s->thread_info.requested_latency = 0;
222     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
223     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
224
225     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
226
227     if (s->card)
228         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
229
230     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
231     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
232                 s->index,
233                 s->name,
234                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
235                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
236                 pt);
237     pa_xfree(pt);
238
239     return s;
240 }
241
242 /* Called from main context */
243 static int source_set_state(pa_source *s, pa_source_state_t state) {
244     int ret;
245     pa_bool_t suspend_change;
246     pa_source_state_t original_state;
247
248     pa_assert(s);
249
250     if (s->state == state)
251         return 0;
252
253     original_state = s->state;
254
255     suspend_change =
256         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
257         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
258
259     if (s->set_state)
260         if ((ret = s->set_state(s, state)) < 0)
261             return ret;
262
263     if (s->asyncmsgq)
264         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
265
266             if (s->set_state)
267                 s->set_state(s, original_state);
268
269             return ret;
270         }
271
272     s->state = state;
273
274     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
275         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
276         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
277     }
278
279     if (suspend_change) {
280         pa_source_output *o;
281         uint32_t idx;
282
283         /* We're suspending or resuming, tell everyone about it */
284
285         for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
286             if (s->state == PA_SOURCE_SUSPENDED &&
287                 (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
288                 pa_source_output_kill(o);
289             else if (o->suspend)
290                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
291     }
292
293
294     return 0;
295 }
296
297 /* Called from main context */
298 void pa_source_put(pa_source *s) {
299     pa_source_assert_ref(s);
300
301     pa_assert(s->state == PA_SOURCE_INIT);
302
303     /* The following fields must be initialized properly when calling _put() */
304     pa_assert(s->asyncmsgq);
305     pa_assert(s->rtpoll);
306     pa_assert(!s->thread_info.min_latency || !s->thread_info.max_latency ||
307               s->thread_info.min_latency <= s->thread_info.max_latency);
308
309     if (!(s->flags & PA_SOURCE_HW_VOLUME_CTRL)) {
310         s->flags |= PA_SOURCE_DECIBEL_VOLUME;
311
312         s->thread_info.soft_volume = s->soft_volume;
313         s->thread_info.soft_muted = s->muted;
314     }
315
316     if (s->flags & PA_SOURCE_DECIBEL_VOLUME)
317         s->n_volume_steps = PA_VOLUME_NORM+1;
318
319     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
320
321     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
322     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
323 }
324
325 /* Called from main context */
326 void pa_source_unlink(pa_source *s) {
327     pa_bool_t linked;
328     pa_source_output *o, *j = NULL;
329
330     pa_assert(s);
331
332     /* See pa_sink_unlink() for a couple of comments how this function
333      * works. */
334
335     linked = PA_SOURCE_IS_LINKED(s->state);
336
337     if (linked)
338         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
339
340     if (s->state != PA_SOURCE_UNLINKED)
341         pa_namereg_unregister(s->core, s->name);
342     pa_idxset_remove_by_data(s->core->sources, s, NULL);
343
344     if (s->card)
345         pa_idxset_remove_by_data(s->card->sources, s, NULL);
346
347     while ((o = pa_idxset_first(s->outputs, NULL))) {
348         pa_assert(o != j);
349         pa_source_output_kill(o);
350         j = o;
351     }
352
353     if (linked)
354         source_set_state(s, PA_SOURCE_UNLINKED);
355     else
356         s->state = PA_SOURCE_UNLINKED;
357
358     reset_callbacks(s);
359
360     if (linked) {
361         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
362         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
363     }
364 }
365
366 /* Called from main context */
367 static void source_free(pa_object *o) {
368     pa_source_output *so;
369     pa_source *s = PA_SOURCE(o);
370
371     pa_assert(s);
372     pa_assert(pa_source_refcnt(s) == 0);
373
374     if (PA_SOURCE_IS_LINKED(s->state))
375         pa_source_unlink(s);
376
377     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
378
379     pa_idxset_free(s->outputs, NULL, NULL);
380
381     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
382         pa_source_output_unref(so);
383
384     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
385
386     if (s->silence.memblock)
387         pa_memblock_unref(s->silence.memblock);
388
389     pa_xfree(s->name);
390     pa_xfree(s->driver);
391
392     if (s->proplist)
393         pa_proplist_free(s->proplist);
394
395     pa_xfree(s);
396 }
397
398 /* Called from main context */
399 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
400     pa_source_assert_ref(s);
401
402     s->asyncmsgq = q;
403 }
404
405 /* Called from main context */
406 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
407     pa_source_assert_ref(s);
408
409     s->rtpoll = p;
410 }
411
412 /* Called from main context */
413 int pa_source_update_status(pa_source*s) {
414     pa_source_assert_ref(s);
415     pa_assert(PA_SOURCE_IS_LINKED(s->state));
416
417     if (s->state == PA_SOURCE_SUSPENDED)
418         return 0;
419
420     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
421 }
422
423 /* Called from main context */
424 int pa_source_suspend(pa_source *s, pa_bool_t suspend) {
425     pa_source_assert_ref(s);
426     pa_assert(PA_SOURCE_IS_LINKED(s->state));
427
428     if (s->monitor_of)
429         return -PA_ERR_NOTSUPPORTED;
430
431     if (suspend)
432         return source_set_state(s, PA_SOURCE_SUSPENDED);
433     else
434         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
435 }
436
437 /* Called from main context */
438 int pa_source_sync_suspend(pa_source *s) {
439     pa_sink_state_t state;
440
441     pa_source_assert_ref(s);
442     pa_assert(PA_SOURCE_IS_LINKED(s->state));
443     pa_assert(s->monitor_of);
444
445     state = pa_sink_get_state(s->monitor_of);
446
447     if (state == PA_SINK_SUSPENDED)
448         return source_set_state(s, PA_SOURCE_SUSPENDED);
449
450     pa_assert(PA_SINK_IS_OPENED(state));
451
452     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
453 }
454
455 /* Called from main context */
456 pa_queue *pa_source_move_all_start(pa_source *s) {
457     pa_queue *q;
458     pa_source_output *o, *n;
459     uint32_t idx;
460
461     pa_source_assert_ref(s);
462     pa_assert(PA_SOURCE_IS_LINKED(s->state));
463
464     q = pa_queue_new();
465
466     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
467         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
468
469         if (pa_source_output_start_move(o) >= 0)
470             pa_queue_push(q, pa_source_output_ref(o));
471     }
472
473     return q;
474 }
475
476 /* Called from main context */
477 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
478     pa_source_output *o;
479
480     pa_source_assert_ref(s);
481     pa_assert(PA_SOURCE_IS_LINKED(s->state));
482     pa_assert(q);
483
484     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
485         if (pa_source_output_finish_move(o, s, save) < 0)
486             pa_source_output_kill(o);
487
488         pa_source_output_unref(o);
489     }
490
491     pa_queue_free(q, NULL, NULL);
492 }
493
494 /* Called from main context */
495 void pa_source_move_all_fail(pa_queue *q) {
496     pa_source_output *o;
497     pa_assert(q);
498
499     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
500         if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_OK) {
501             pa_source_output_kill(o);
502             pa_source_output_unref(o);
503         }
504     }
505
506     pa_queue_free(q, NULL, NULL);
507 }
508
509 /* Called from IO thread context */
510 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
511     pa_source_output *o;
512     void *state = NULL;
513
514     pa_source_assert_ref(s);
515     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
516
517     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
518         return;
519
520     if (nbytes <= 0)
521         return;
522
523     pa_log_debug("Processing rewind...");
524
525     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
526         pa_source_output_assert_ref(o);
527         pa_source_output_process_rewind(o, nbytes);
528     }
529 }
530
531 /* Called from IO thread context */
532 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
533     pa_source_output *o;
534     void *state = NULL;
535
536     pa_source_assert_ref(s);
537     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
538     pa_assert(chunk);
539
540     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
541         return;
542
543     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
544         pa_memchunk vchunk = *chunk;
545
546         pa_memblock_ref(vchunk.memblock);
547         pa_memchunk_make_writable(&vchunk, 0);
548
549         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
550             pa_silence_memchunk(&vchunk, &s->sample_spec);
551         else
552             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
553
554         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
555             pa_source_output_assert_ref(o);
556
557             if (!o->thread_info.direct_on_input)
558                 pa_source_output_push(o, &vchunk);
559         }
560
561         pa_memblock_unref(vchunk.memblock);
562     } else {
563
564         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
565             pa_source_output_assert_ref(o);
566
567             if (!o->thread_info.direct_on_input)
568                 pa_source_output_push(o, chunk);
569         }
570     }
571 }
572
573 /* Called from IO thread context */
574 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
575     pa_source_assert_ref(s);
576     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
577     pa_source_output_assert_ref(o);
578     pa_assert(o->thread_info.direct_on_input);
579     pa_assert(chunk);
580
581     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
582         return;
583
584     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
585         pa_memchunk vchunk = *chunk;
586
587         pa_memblock_ref(vchunk.memblock);
588         pa_memchunk_make_writable(&vchunk, 0);
589
590         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
591             pa_silence_memchunk(&vchunk, &s->sample_spec);
592         else
593             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
594
595         pa_source_output_push(o, &vchunk);
596
597         pa_memblock_unref(vchunk.memblock);
598     } else
599         pa_source_output_push(o, chunk);
600 }
601
602 /* Called from main thread */
603 pa_usec_t pa_source_get_latency(pa_source *s) {
604     pa_usec_t usec;
605
606     pa_source_assert_ref(s);
607     pa_assert(PA_SOURCE_IS_LINKED(s->state));
608
609     if (s->state == PA_SOURCE_SUSPENDED)
610         return 0;
611
612     if (!(s->flags & PA_SOURCE_LATENCY))
613         return 0;
614
615     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
616
617     return usec;
618 }
619
620 /* Called from main thread */
621 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume) {
622     pa_cvolume old_virtual_volume;
623     pa_bool_t virtual_volume_changed;
624
625     pa_source_assert_ref(s);
626     pa_assert(PA_SOURCE_IS_LINKED(s->state));
627     pa_assert(volume);
628     pa_assert(pa_cvolume_valid(volume));
629     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
630
631     old_virtual_volume = s->virtual_volume;
632     s->virtual_volume = *volume;
633     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
634
635     if (s->set_volume) {
636         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
637         s->set_volume(s);
638     } else
639         s->soft_volume = s->virtual_volume;
640
641     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
642
643     if (virtual_volume_changed)
644         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
645 }
646
647 /* Called from main thread. Only to be called by source implementor */
648 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
649     pa_source_assert_ref(s);
650     pa_assert(volume);
651
652     if (PA_SOURCE_IS_LINKED(s->state))
653         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
654     else
655         s->thread_info.soft_volume = *volume;
656 }
657
658 /* Called from main thread */
659 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
660     pa_source_assert_ref(s);
661     pa_assert(PA_SOURCE_IS_LINKED(s->state));
662
663     if (s->refresh_volume || force_refresh) {
664         pa_cvolume old_virtual_volume = s->virtual_volume;
665
666         if (s->get_volume)
667             s->get_volume(s);
668
669         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
670
671         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume))
672             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
673     }
674
675     return &s->virtual_volume;
676 }
677
678 /* Called from main thread */
679 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume) {
680     pa_source_assert_ref(s);
681
682     /* The source implementor may call this if the volume changed to make sure everyone is notified */
683
684     if (pa_cvolume_equal(&s->virtual_volume, new_volume))
685         return;
686
687     s->virtual_volume = *new_volume;
688     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
689 }
690
691 /* Called from main thread */
692 void pa_source_set_mute(pa_source *s, pa_bool_t mute) {
693     pa_bool_t old_muted;
694
695     pa_source_assert_ref(s);
696     pa_assert(PA_SOURCE_IS_LINKED(s->state));
697
698     old_muted = s->muted;
699     s->muted = mute;
700
701     if (s->set_mute)
702         s->set_mute(s);
703
704     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
705
706     if (old_muted != s->muted)
707         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
708 }
709
710 /* Called from main thread */
711 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
712     pa_source_assert_ref(s);
713     pa_assert(PA_SOURCE_IS_LINKED(s->state));
714
715     if (s->refresh_muted || force_refresh) {
716         pa_bool_t old_muted = s->muted;
717
718         if (s->get_mute)
719             s->get_mute(s);
720
721         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
722
723         if (old_muted != s->muted)
724             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
725     }
726
727     return s->muted;
728 }
729
730 /* Called from main thread */
731 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted) {
732     pa_source_assert_ref(s);
733
734     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
735
736     if (s->muted == new_muted)
737         return;
738
739     s->muted = new_muted;
740     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
741 }
742
743 /* Called from main thread */
744 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
745     pa_source_assert_ref(s);
746
747     if (p)
748         pa_proplist_update(s->proplist, mode, p);
749
750     if (PA_SOURCE_IS_LINKED(s->state)) {
751         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
752         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
753     }
754
755     return TRUE;
756 }
757
758 /* Called from main thread */
759 void pa_source_set_description(pa_source *s, const char *description) {
760     const char *old;
761     pa_source_assert_ref(s);
762
763     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
764         return;
765
766     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
767
768     if (old && description && !strcmp(old, description))
769         return;
770
771     if (description)
772         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
773     else
774         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
775
776     if (PA_SOURCE_IS_LINKED(s->state)) {
777         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
778         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
779     }
780 }
781
782 /* Called from main thread */
783 unsigned pa_source_linked_by(pa_source *s) {
784     pa_source_assert_ref(s);
785     pa_assert(PA_SOURCE_IS_LINKED(s->state));
786
787     return pa_idxset_size(s->outputs);
788 }
789
790 /* Called from main thread */
791 unsigned pa_source_used_by(pa_source *s) {
792     unsigned ret;
793
794     pa_source_assert_ref(s);
795     pa_assert(PA_SOURCE_IS_LINKED(s->state));
796
797     ret = pa_idxset_size(s->outputs);
798     pa_assert(ret >= s->n_corked);
799
800     return ret - s->n_corked;
801 }
802
803 /* Called from main thread */
804 unsigned pa_source_check_suspend(pa_source *s) {
805     unsigned ret;
806     pa_source_output *o;
807     uint32_t idx;
808
809     pa_source_assert_ref(s);
810
811     if (!PA_SOURCE_IS_LINKED(s->state))
812         return 0;
813
814     ret = 0;
815
816     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx))) {
817         pa_source_output_state_t st;
818
819         st = pa_source_output_get_state(o);
820         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
821
822         if (st == PA_SOURCE_OUTPUT_CORKED)
823             continue;
824
825         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
826             continue;
827
828         ret ++;
829     }
830
831     return ret;
832 }
833
834 /* Called from IO thread, except when it is not */
835 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
836     pa_source *s = PA_SOURCE(object);
837     pa_source_assert_ref(s);
838
839     switch ((pa_source_message_t) code) {
840
841         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
842             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
843
844             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
845
846             if (o->direct_on_input) {
847                 o->thread_info.direct_on_input = o->direct_on_input;
848                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
849             }
850
851             pa_assert(!o->thread_info.attached);
852             o->thread_info.attached = TRUE;
853
854             if (o->attach)
855                 o->attach(o);
856
857             pa_source_output_set_state_within_thread(o, o->state);
858
859             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
860                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
861
862             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
863
864             /* We don't just invalidate the requested latency here,
865              * because if we are in a move we might need to fix up the
866              * requested latency. */
867             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
868
869             return 0;
870         }
871
872         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
873             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
874
875             pa_source_output_set_state_within_thread(o, o->state);
876
877             if (o->detach)
878                 o->detach(o);
879
880             pa_assert(o->thread_info.attached);
881             o->thread_info.attached = FALSE;
882
883             if (o->thread_info.direct_on_input) {
884                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
885                 o->thread_info.direct_on_input = NULL;
886             }
887
888             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
889                 pa_source_output_unref(o);
890
891             pa_source_invalidate_requested_latency(s);
892
893             return 0;
894         }
895
896         case PA_SOURCE_MESSAGE_SET_VOLUME:
897             s->thread_info.soft_volume = s->soft_volume;
898             return 0;
899
900         case PA_SOURCE_MESSAGE_GET_VOLUME:
901             return 0;
902
903         case PA_SOURCE_MESSAGE_SET_MUTE:
904             s->thread_info.soft_muted = s->muted;
905             return 0;
906
907         case PA_SOURCE_MESSAGE_GET_MUTE:
908             return 0;
909
910         case PA_SOURCE_MESSAGE_SET_STATE:
911             s->thread_info.state = PA_PTR_TO_UINT(userdata);
912             return 0;
913
914         case PA_SOURCE_MESSAGE_DETACH:
915
916             /* Detach all streams */
917             pa_source_detach_within_thread(s);
918             return 0;
919
920         case PA_SOURCE_MESSAGE_ATTACH:
921
922             /* Reattach all streams */
923             pa_source_attach_within_thread(s);
924             return 0;
925
926         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
927
928             pa_usec_t *usec = userdata;
929             *usec = pa_source_get_requested_latency_within_thread(s);
930
931             if (*usec == (pa_usec_t) -1)
932                 *usec = s->thread_info.max_latency;
933
934             return 0;
935         }
936
937         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
938             pa_usec_t *r = userdata;
939
940             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
941
942             return 0;
943         }
944
945         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
946             pa_usec_t *r = userdata;
947
948             r[0] = s->thread_info.min_latency;
949             r[1] = s->thread_info.max_latency;
950
951             return 0;
952         }
953
954         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
955
956             *((size_t*) userdata) = s->thread_info.max_rewind;
957             return 0;
958
959         case PA_SOURCE_MESSAGE_GET_LATENCY:
960
961             if (s->monitor_of) {
962                 *((pa_usec_t*) userdata) = 0;
963                 return 0;
964             }
965
966             /* Implementors need to overwrite this implementation! */
967             return -1;
968
969         case PA_SOURCE_MESSAGE_MAX:
970             ;
971     }
972
973     return -1;
974 }
975
976 /* Called from main thread */
977 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend) {
978     uint32_t idx;
979     pa_source *source;
980     int ret = 0;
981
982     pa_core_assert_ref(c);
983
984     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
985         int r;
986
987         if (source->monitor_of)
988             continue;
989
990         if ((r = pa_source_suspend(source, suspend)) < 0)
991             ret = r;
992     }
993
994     return ret;
995 }
996
997 /* Called from main thread */
998 void pa_source_detach(pa_source *s) {
999     pa_source_assert_ref(s);
1000     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1001
1002     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1003 }
1004
1005 /* Called from main thread */
1006 void pa_source_attach(pa_source *s) {
1007     pa_source_assert_ref(s);
1008     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1009
1010     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1011 }
1012
1013 /* Called from IO thread */
1014 void pa_source_detach_within_thread(pa_source *s) {
1015     pa_source_output *o;
1016     void *state = NULL;
1017
1018     pa_source_assert_ref(s);
1019     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1020
1021     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1022         if (o->detach)
1023             o->detach(o);
1024 }
1025
1026 /* Called from IO thread */
1027 void pa_source_attach_within_thread(pa_source *s) {
1028     pa_source_output *o;
1029     void *state = NULL;
1030
1031     pa_source_assert_ref(s);
1032     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1033
1034     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1035         if (o->attach)
1036             o->attach(o);
1037 }
1038
1039 /* Called from IO thread */
1040 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1041     pa_usec_t result = (pa_usec_t) -1;
1042     pa_source_output *o;
1043     void *state = NULL;
1044
1045     pa_source_assert_ref(s);
1046
1047     if (s->thread_info.requested_latency_valid)
1048         return s->thread_info.requested_latency;
1049
1050     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1051
1052         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1053             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1054             result = o->thread_info.requested_source_latency;
1055
1056     if (result != (pa_usec_t) -1) {
1057         if (s->thread_info.max_latency > 0 && result > s->thread_info.max_latency)
1058             result = s->thread_info.max_latency;
1059
1060         if (s->thread_info.min_latency > 0 && result < s->thread_info.min_latency)
1061             result = s->thread_info.min_latency;
1062     }
1063
1064     s->thread_info.requested_latency = result;
1065     s->thread_info.requested_latency_valid = TRUE;
1066
1067     return result;
1068 }
1069
1070 /* Called from main thread */
1071 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1072     pa_usec_t usec;
1073
1074     pa_source_assert_ref(s);
1075     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1076
1077     if (s->state == PA_SOURCE_SUSPENDED)
1078         return 0;
1079
1080     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1081
1082     return usec;
1083 }
1084
1085 /* Called from IO thread */
1086 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1087     pa_source_output *o;
1088     void *state = NULL;
1089
1090     pa_source_assert_ref(s);
1091
1092     if (max_rewind == s->thread_info.max_rewind)
1093         return;
1094
1095     s->thread_info.max_rewind = max_rewind;
1096
1097     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1098         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1099             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1100     }
1101 }
1102
1103 void pa_source_invalidate_requested_latency(pa_source *s) {
1104     pa_source_output *o;
1105     void *state = NULL;
1106
1107     pa_source_assert_ref(s);
1108
1109     s->thread_info.requested_latency_valid = FALSE;
1110
1111     if (s->update_requested_latency)
1112         s->update_requested_latency(s);
1113
1114     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1115         if (o->update_source_requested_latency)
1116             o->update_source_requested_latency(o);
1117
1118     if (s->monitor_of)
1119         pa_sink_invalidate_requested_latency(s->monitor_of);
1120 }
1121
1122 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1123     pa_source_assert_ref(s);
1124
1125     /* min_latency == 0:           no limit
1126      * min_latency anything else:  specified limit
1127      *
1128      * Similar for max_latency */
1129
1130     if (min_latency < ABSOLUTE_MIN_LATENCY)
1131         min_latency = ABSOLUTE_MIN_LATENCY;
1132
1133     if (max_latency <= 0 ||
1134         max_latency > ABSOLUTE_MAX_LATENCY)
1135         max_latency = ABSOLUTE_MAX_LATENCY;
1136
1137     pa_assert(min_latency <= max_latency);
1138
1139     if (PA_SOURCE_IS_LINKED(s->state)) {
1140         pa_usec_t r[2];
1141
1142         r[0] = min_latency;
1143         r[1] = max_latency;
1144
1145         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1146     } else {
1147         s->thread_info.min_latency = min_latency;
1148         s->thread_info.max_latency = max_latency;
1149
1150         s->thread_info.requested_latency_valid = FALSE;
1151     }
1152 }
1153
1154 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1155    pa_source_assert_ref(s);
1156    pa_assert(min_latency);
1157    pa_assert(max_latency);
1158
1159    if (PA_SOURCE_IS_LINKED(s->state)) {
1160        pa_usec_t r[2] = { 0, 0 };
1161
1162        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1163
1164        *min_latency = r[0];
1165        *max_latency = r[1];
1166    } else {
1167        *min_latency = s->thread_info.min_latency;
1168        *max_latency = s->thread_info.max_latency;
1169    }
1170 }
1171
1172 /* Called from IO thread */
1173 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1174     pa_source_output *o;
1175     void *state = NULL;
1176
1177     pa_source_assert_ref(s);
1178
1179     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1180     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1181     pa_assert(min_latency <= max_latency);
1182
1183     s->thread_info.min_latency = min_latency;
1184     s->thread_info.max_latency = max_latency;
1185
1186     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1187         if (o->update_source_latency_range)
1188             o->update_source_latency_range(o);
1189
1190     pa_source_invalidate_requested_latency(s);
1191 }
1192
1193 size_t pa_source_get_max_rewind(pa_source *s) {
1194     size_t r;
1195     pa_source_assert_ref(s);
1196
1197     if (!PA_SOURCE_IS_LINKED(s->state))
1198         return s->thread_info.max_rewind;
1199
1200     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1201
1202     return r;
1203 }