core: be a bit more verbose when registering a sink/source fails
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/source-output.h>
37 #include <pulsecore/namereg.h>
38 #include <pulsecore/core-subscribe.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/sample-util.h>
41
42 #include "source.h"
43
44 #define ABSOLUTE_MIN_LATENCY (500)
45 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
46 #define DEFAULT_FIXED_LATENCY (250*PA_USEC_PER_MSEC)
47
48 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
49
50 static void source_free(pa_object *o);
51
52 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
53     pa_assert(data);
54
55     memset(data, 0, sizeof(*data));
56     data->proplist = pa_proplist_new();
57
58     return data;
59 }
60
61 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
62     pa_assert(data);
63
64     pa_xfree(data->name);
65     data->name = pa_xstrdup(name);
66 }
67
68 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
69     pa_assert(data);
70
71     if ((data->sample_spec_is_set = !!spec))
72         data->sample_spec = *spec;
73 }
74
75 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
76     pa_assert(data);
77
78     if ((data->channel_map_is_set = !!map))
79         data->channel_map = *map;
80 }
81
82 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
83     pa_assert(data);
84
85     if ((data->volume_is_set = !!volume))
86         data->volume = *volume;
87 }
88
89 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
90     pa_assert(data);
91
92     data->muted_is_set = TRUE;
93     data->muted = !!mute;
94 }
95
96 void pa_source_new_data_set_port(pa_source_new_data *data, const char *port) {
97     pa_assert(data);
98
99     pa_xfree(data->active_port);
100     data->active_port = pa_xstrdup(port);
101 }
102
103 void pa_source_new_data_done(pa_source_new_data *data) {
104     pa_assert(data);
105
106     pa_proplist_free(data->proplist);
107
108     if (data->ports) {
109         pa_device_port *p;
110
111         while ((p = pa_hashmap_steal_first(data->ports)))
112             pa_device_port_free(p);
113
114         pa_hashmap_free(data->ports, NULL, NULL);
115     }
116
117     pa_xfree(data->name);
118     pa_xfree(data->active_port);
119 }
120
121 /* Called from main context */
122 static void reset_callbacks(pa_source *s) {
123     pa_assert(s);
124
125     s->set_state = NULL;
126     s->get_volume = NULL;
127     s->set_volume = NULL;
128     s->get_mute = NULL;
129     s->set_mute = NULL;
130     s->update_requested_latency = NULL;
131     s->set_port = NULL;
132 }
133
134 /* Called from main context */
135 pa_source* pa_source_new(
136         pa_core *core,
137         pa_source_new_data *data,
138         pa_source_flags_t flags) {
139
140     pa_source *s;
141     const char *name;
142     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
143     char *pt;
144
145     pa_assert(core);
146     pa_assert(data);
147     pa_assert(data->name);
148
149     s = pa_msgobject_new(pa_source);
150
151     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
152         pa_log_debug("Failed to register name %s.", data->name);
153         pa_xfree(s);
154         return NULL;
155     }
156
157     pa_source_new_data_set_name(data, name);
158
159     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
160         pa_xfree(s);
161         pa_namereg_unregister(core, name);
162         return NULL;
163     }
164
165     /* FIXME, need to free s here on failure */
166
167     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
168     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
169
170     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
171
172     if (!data->channel_map_is_set)
173         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
174
175     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
176     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
177
178     if (!data->volume_is_set)
179         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
180
181     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
182     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
183
184     if (!data->muted_is_set)
185         data->muted = FALSE;
186
187     if (data->card)
188         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
189
190     pa_device_init_description(data->proplist);
191     pa_device_init_icon(data->proplist, FALSE);
192     pa_device_init_intended_roles(data->proplist);
193
194     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
195         pa_xfree(s);
196         pa_namereg_unregister(core, name);
197         return NULL;
198     }
199
200     s->parent.parent.free = source_free;
201     s->parent.process_msg = pa_source_process_msg;
202
203     s->core = core;
204     s->state = PA_SOURCE_INIT;
205     s->flags = flags;
206     s->suspend_cause = 0;
207     s->name = pa_xstrdup(name);
208     s->proplist = pa_proplist_copy(data->proplist);
209     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
210     s->module = data->module;
211     s->card = data->card;
212
213     s->sample_spec = data->sample_spec;
214     s->channel_map = data->channel_map;
215
216     s->outputs = pa_idxset_new(NULL, NULL);
217     s->n_corked = 0;
218     s->monitor_of = NULL;
219
220     s->virtual_volume = data->volume;
221     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
222     s->base_volume = PA_VOLUME_NORM;
223     s->n_volume_steps = PA_VOLUME_NORM+1;
224     s->muted = data->muted;
225     s->refresh_volume = s->refresh_muted = FALSE;
226
227     s->fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
228
229     reset_callbacks(s);
230     s->userdata = NULL;
231
232     s->asyncmsgq = NULL;
233     s->rtpoll = NULL;
234
235     /* As a minor optimization we just steal the list instead of
236      * copying it here */
237     s->ports = data->ports;
238     data->ports = NULL;
239
240     s->active_port = NULL;
241     s->save_port = FALSE;
242
243     if (data->active_port && s->ports)
244         if ((s->active_port = pa_hashmap_get(s->ports, data->active_port)))
245             s->save_port = data->save_port;
246
247     if (!s->active_port && s->ports) {
248         void *state;
249         pa_device_port *p;
250
251         PA_HASHMAP_FOREACH(p, s->ports, state)
252             if (!s->active_port || p->priority > s->active_port->priority)
253                 s->active_port = p;
254     }
255
256     s->save_volume = data->save_volume;
257     s->save_muted = data->save_muted;
258
259     pa_silence_memchunk_get(
260             &core->silence_cache,
261             core->mempool,
262             &s->silence,
263             &s->sample_spec,
264             0);
265
266     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
267     s->thread_info.soft_volume = s->soft_volume;
268     s->thread_info.soft_muted = s->muted;
269     s->thread_info.state = s->state;
270     s->thread_info.max_rewind = 0;
271     s->thread_info.requested_latency_valid = FALSE;
272     s->thread_info.requested_latency = 0;
273     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
274     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
275
276     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
277
278     if (s->card)
279         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
280
281     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
282     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
283                 s->index,
284                 s->name,
285                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
286                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
287                 pt);
288     pa_xfree(pt);
289
290     return s;
291 }
292
293 /* Called from main context */
294 static int source_set_state(pa_source *s, pa_source_state_t state) {
295     int ret;
296     pa_bool_t suspend_change;
297     pa_source_state_t original_state;
298
299     pa_assert(s);
300
301     if (s->state == state)
302         return 0;
303
304     original_state = s->state;
305
306     suspend_change =
307         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
308         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
309
310     if (s->set_state)
311         if ((ret = s->set_state(s, state)) < 0)
312             return ret;
313
314     if (s->asyncmsgq)
315         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
316
317             if (s->set_state)
318                 s->set_state(s, original_state);
319
320             return ret;
321         }
322
323     s->state = state;
324
325     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
326         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
327         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
328     }
329
330     if (suspend_change) {
331         pa_source_output *o;
332         uint32_t idx;
333
334         /* We're suspending or resuming, tell everyone about it */
335
336         for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
337             if (s->state == PA_SOURCE_SUSPENDED &&
338                 (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
339                 pa_source_output_kill(o);
340             else if (o->suspend)
341                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
342     }
343
344
345     return 0;
346 }
347
348 /* Called from main context */
349 void pa_source_put(pa_source *s) {
350     pa_source_assert_ref(s);
351
352     pa_assert(s->state == PA_SOURCE_INIT);
353
354     /* The following fields must be initialized properly when calling _put() */
355     pa_assert(s->asyncmsgq);
356     pa_assert(s->rtpoll);
357     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
358
359     /* Generally, flags should be initialized via pa_source_new(). As
360      * a special exception we allow volume related flags to be set
361      * between _new() and _put(). */
362
363     if (!(s->flags & PA_SOURCE_HW_VOLUME_CTRL))
364         s->flags |= PA_SOURCE_DECIBEL_VOLUME;
365
366     s->thread_info.soft_volume = s->soft_volume;
367     s->thread_info.soft_muted = s->muted;
368
369     pa_assert((s->flags & PA_SOURCE_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SOURCE_DECIBEL_VOLUME));
370     pa_assert(!(s->flags & PA_SOURCE_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
371     pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
372
373     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
374
375     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
376     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
377 }
378
379 /* Called from main context */
380 void pa_source_unlink(pa_source *s) {
381     pa_bool_t linked;
382     pa_source_output *o, *j = NULL;
383
384     pa_assert(s);
385
386     /* See pa_sink_unlink() for a couple of comments how this function
387      * works. */
388
389     linked = PA_SOURCE_IS_LINKED(s->state);
390
391     if (linked)
392         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
393
394     if (s->state != PA_SOURCE_UNLINKED)
395         pa_namereg_unregister(s->core, s->name);
396     pa_idxset_remove_by_data(s->core->sources, s, NULL);
397
398     if (s->card)
399         pa_idxset_remove_by_data(s->card->sources, s, NULL);
400
401     while ((o = pa_idxset_first(s->outputs, NULL))) {
402         pa_assert(o != j);
403         pa_source_output_kill(o);
404         j = o;
405     }
406
407     if (linked)
408         source_set_state(s, PA_SOURCE_UNLINKED);
409     else
410         s->state = PA_SOURCE_UNLINKED;
411
412     reset_callbacks(s);
413
414     if (linked) {
415         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
416         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
417     }
418 }
419
420 /* Called from main context */
421 static void source_free(pa_object *o) {
422     pa_source_output *so;
423     pa_source *s = PA_SOURCE(o);
424
425     pa_assert(s);
426     pa_assert(pa_source_refcnt(s) == 0);
427
428     if (PA_SOURCE_IS_LINKED(s->state))
429         pa_source_unlink(s);
430
431     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
432
433     pa_idxset_free(s->outputs, NULL, NULL);
434
435     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
436         pa_source_output_unref(so);
437
438     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
439
440     if (s->silence.memblock)
441         pa_memblock_unref(s->silence.memblock);
442
443     pa_xfree(s->name);
444     pa_xfree(s->driver);
445
446     if (s->proplist)
447         pa_proplist_free(s->proplist);
448
449     if (s->ports) {
450         pa_device_port *p;
451
452         while ((p = pa_hashmap_steal_first(s->ports)))
453             pa_device_port_free(p);
454
455         pa_hashmap_free(s->ports, NULL, NULL);
456     }
457
458     pa_xfree(s);
459 }
460
461 /* Called from main context */
462 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
463     pa_source_assert_ref(s);
464
465     s->asyncmsgq = q;
466 }
467
468 /* Called from main context */
469 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
470     pa_source_assert_ref(s);
471
472     s->rtpoll = p;
473 }
474
475 /* Called from main context */
476 int pa_source_update_status(pa_source*s) {
477     pa_source_assert_ref(s);
478     pa_assert(PA_SOURCE_IS_LINKED(s->state));
479
480     if (s->state == PA_SOURCE_SUSPENDED)
481         return 0;
482
483     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
484 }
485
486 /* Called from main context */
487 int pa_source_suspend(pa_source *s, pa_bool_t suspend, pa_suspend_cause_t cause) {
488     pa_source_assert_ref(s);
489     pa_assert(PA_SOURCE_IS_LINKED(s->state));
490     pa_assert(cause != 0);
491
492     if (s->monitor_of)
493         return -PA_ERR_NOTSUPPORTED;
494
495     if (suspend)
496         s->suspend_cause |= cause;
497     else
498         s->suspend_cause &= ~cause;
499
500     if ((pa_source_get_state(s) == PA_SOURCE_SUSPENDED) == !!s->suspend_cause)
501         return 0;
502
503     pa_log_debug("Suspend cause of source %s is 0x%04x, %s", s->name, s->suspend_cause, s->suspend_cause ? "suspending" : "resuming");
504
505     if (suspend)
506         return source_set_state(s, PA_SOURCE_SUSPENDED);
507     else
508         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
509 }
510
511 /* Called from main context */
512 int pa_source_sync_suspend(pa_source *s) {
513     pa_sink_state_t state;
514
515     pa_source_assert_ref(s);
516     pa_assert(PA_SOURCE_IS_LINKED(s->state));
517     pa_assert(s->monitor_of);
518
519     state = pa_sink_get_state(s->monitor_of);
520
521     if (state == PA_SINK_SUSPENDED)
522         return source_set_state(s, PA_SOURCE_SUSPENDED);
523
524     pa_assert(PA_SINK_IS_OPENED(state));
525
526     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
527 }
528
529 /* Called from main context */
530 pa_queue *pa_source_move_all_start(pa_source *s, pa_queue *q) {
531     pa_source_output *o, *n;
532     uint32_t idx;
533
534     pa_source_assert_ref(s);
535     pa_assert(PA_SOURCE_IS_LINKED(s->state));
536
537     if (!q)
538         q = pa_queue_new();
539
540     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
541         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
542
543         pa_source_output_ref(o);
544
545         if (pa_source_output_start_move(o) >= 0)
546             pa_queue_push(q, o);
547         else
548             pa_source_output_unref(o);
549     }
550
551     return q;
552 }
553
554 /* Called from main context */
555 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
556     pa_source_output *o;
557
558     pa_source_assert_ref(s);
559     pa_assert(PA_SOURCE_IS_LINKED(s->state));
560     pa_assert(q);
561
562     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
563         if (pa_source_output_finish_move(o, s, save) < 0)
564             pa_source_output_kill(o);
565
566         pa_source_output_unref(o);
567     }
568
569     pa_queue_free(q, NULL, NULL);
570 }
571
572 /* Called from main context */
573 void pa_source_move_all_fail(pa_queue *q) {
574     pa_source_output *o;
575     pa_assert(q);
576
577     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
578         if (pa_hook_fire(&o->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FAIL], o) == PA_HOOK_OK) {
579             pa_source_output_kill(o);
580             pa_source_output_unref(o);
581         }
582     }
583
584     pa_queue_free(q, NULL, NULL);
585 }
586
587 /* Called from IO thread context */
588 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
589     pa_source_output *o;
590     void *state = NULL;
591
592     pa_source_assert_ref(s);
593     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
594
595     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
596         return;
597
598     if (nbytes <= 0)
599         return;
600
601     pa_log_debug("Processing rewind...");
602
603     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
604         pa_source_output_assert_ref(o);
605         pa_source_output_process_rewind(o, nbytes);
606     }
607 }
608
609 /* Called from IO thread context */
610 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
611     pa_source_output *o;
612     void *state = NULL;
613
614     pa_source_assert_ref(s);
615     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
616     pa_assert(chunk);
617
618     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
619         return;
620
621     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
622         pa_memchunk vchunk = *chunk;
623
624         pa_memblock_ref(vchunk.memblock);
625         pa_memchunk_make_writable(&vchunk, 0);
626
627         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
628             pa_silence_memchunk(&vchunk, &s->sample_spec);
629         else
630             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
631
632         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
633             pa_source_output_assert_ref(o);
634
635             if (!o->thread_info.direct_on_input)
636                 pa_source_output_push(o, &vchunk);
637         }
638
639         pa_memblock_unref(vchunk.memblock);
640     } else {
641
642         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
643             pa_source_output_assert_ref(o);
644
645             if (!o->thread_info.direct_on_input)
646                 pa_source_output_push(o, chunk);
647         }
648     }
649 }
650
651 /* Called from IO thread context */
652 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
653     pa_source_assert_ref(s);
654     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
655     pa_source_output_assert_ref(o);
656     pa_assert(o->thread_info.direct_on_input);
657     pa_assert(chunk);
658
659     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
660         return;
661
662     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
663         pa_memchunk vchunk = *chunk;
664
665         pa_memblock_ref(vchunk.memblock);
666         pa_memchunk_make_writable(&vchunk, 0);
667
668         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
669             pa_silence_memchunk(&vchunk, &s->sample_spec);
670         else
671             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
672
673         pa_source_output_push(o, &vchunk);
674
675         pa_memblock_unref(vchunk.memblock);
676     } else
677         pa_source_output_push(o, chunk);
678 }
679
680 /* Called from main thread */
681 pa_usec_t pa_source_get_latency(pa_source *s) {
682     pa_usec_t usec;
683
684     pa_source_assert_ref(s);
685     pa_assert(PA_SOURCE_IS_LINKED(s->state));
686
687     if (s->state == PA_SOURCE_SUSPENDED)
688         return 0;
689
690     if (!(s->flags & PA_SOURCE_LATENCY))
691         return 0;
692
693     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
694
695     return usec;
696 }
697
698 /* Called from IO thread */
699 pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
700     pa_usec_t usec = 0;
701     pa_msgobject *o;
702
703     pa_source_assert_ref(s);
704     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
705
706     /* The returned value is supposed to be in the time domain of the sound card! */
707
708     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
709         return 0;
710
711     if (!(s->flags & PA_SOURCE_LATENCY))
712         return 0;
713
714     o = PA_MSGOBJECT(s);
715
716     /* We probably should make this a proper vtable callback instead of going through process_msg() */
717
718     if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
719         return -1;
720
721     return usec;
722 }
723
724 /* Called from main thread */
725 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume, pa_bool_t save) {
726     pa_cvolume old_virtual_volume;
727     pa_bool_t virtual_volume_changed;
728
729     pa_source_assert_ref(s);
730     pa_assert(PA_SOURCE_IS_LINKED(s->state));
731     pa_assert(volume);
732     pa_assert(pa_cvolume_valid(volume));
733     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
734
735     old_virtual_volume = s->virtual_volume;
736     s->virtual_volume = *volume;
737     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
738     s->save_volume = (!virtual_volume_changed && s->save_volume) || save;
739
740     if (s->set_volume) {
741         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
742         s->set_volume(s);
743     } else
744         s->soft_volume = s->virtual_volume;
745
746     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
747
748     if (virtual_volume_changed)
749         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
750 }
751
752 /* Called from main thread. Only to be called by source implementor */
753 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
754     pa_source_assert_ref(s);
755     pa_assert(volume);
756
757     if (PA_SOURCE_IS_LINKED(s->state))
758         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
759     else
760         s->thread_info.soft_volume = *volume;
761 }
762
763 /* Called from main thread */
764 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
765     pa_source_assert_ref(s);
766     pa_assert(PA_SOURCE_IS_LINKED(s->state));
767
768     if (s->refresh_volume || force_refresh) {
769         pa_cvolume old_virtual_volume = s->virtual_volume;
770
771         if (s->get_volume)
772             s->get_volume(s);
773
774         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
775
776         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume))
777             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
778     }
779
780     return &s->virtual_volume;
781 }
782
783 /* Called from main thread */
784 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume, pa_bool_t save) {
785     pa_source_assert_ref(s);
786
787     /* The source implementor may call this if the volume changed to make sure everyone is notified */
788
789     if (pa_cvolume_equal(&s->virtual_volume, new_volume)) {
790         s->save_volume = s->save_volume || save;
791         return;
792     }
793
794     s->virtual_volume = *new_volume;
795     s->save_volume = save;
796
797     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
798 }
799
800 /* Called from main thread */
801 void pa_source_set_mute(pa_source *s, pa_bool_t mute, pa_bool_t save) {
802     pa_bool_t old_muted;
803
804     pa_source_assert_ref(s);
805     pa_assert(PA_SOURCE_IS_LINKED(s->state));
806
807     old_muted = s->muted;
808     s->muted = mute;
809     s->save_muted = (old_muted == s->muted && s->save_muted) || save;
810
811     if (s->set_mute)
812         s->set_mute(s);
813
814     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
815
816     if (old_muted != s->muted)
817         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
818 }
819
820 /* Called from main thread */
821 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
822     pa_source_assert_ref(s);
823     pa_assert(PA_SOURCE_IS_LINKED(s->state));
824
825     if (s->refresh_muted || force_refresh) {
826         pa_bool_t old_muted = s->muted;
827
828         if (s->get_mute)
829             s->get_mute(s);
830
831         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
832
833         if (old_muted != s->muted) {
834             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
835
836             /* Make sure the soft mute status stays in sync */
837             pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
838         }
839     }
840
841     return s->muted;
842 }
843
844 /* Called from main thread */
845 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted, pa_bool_t save) {
846     pa_source_assert_ref(s);
847
848     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
849
850     if (s->muted == new_muted) {
851         s->save_muted = s->save_muted || save;
852         return;
853     }
854
855     s->muted = new_muted;
856     s->save_muted = save;
857
858     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
859 }
860
861 /* Called from main thread */
862 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
863     pa_source_assert_ref(s);
864
865     if (p)
866         pa_proplist_update(s->proplist, mode, p);
867
868     if (PA_SOURCE_IS_LINKED(s->state)) {
869         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
870         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
871     }
872
873     return TRUE;
874 }
875
876 /* Called from main thread */
877 void pa_source_set_description(pa_source *s, const char *description) {
878     const char *old;
879     pa_source_assert_ref(s);
880
881     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
882         return;
883
884     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
885
886     if (old && description && !strcmp(old, description))
887         return;
888
889     if (description)
890         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
891     else
892         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
893
894     if (PA_SOURCE_IS_LINKED(s->state)) {
895         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
896         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
897     }
898 }
899
900 /* Called from main thread */
901 unsigned pa_source_linked_by(pa_source *s) {
902     pa_source_assert_ref(s);
903     pa_assert(PA_SOURCE_IS_LINKED(s->state));
904
905     return pa_idxset_size(s->outputs);
906 }
907
908 /* Called from main thread */
909 unsigned pa_source_used_by(pa_source *s) {
910     unsigned ret;
911
912     pa_source_assert_ref(s);
913     pa_assert(PA_SOURCE_IS_LINKED(s->state));
914
915     ret = pa_idxset_size(s->outputs);
916     pa_assert(ret >= s->n_corked);
917
918     return ret - s->n_corked;
919 }
920
921 /* Called from main thread */
922 unsigned pa_source_check_suspend(pa_source *s) {
923     unsigned ret;
924     pa_source_output *o;
925     uint32_t idx;
926
927     pa_source_assert_ref(s);
928
929     if (!PA_SOURCE_IS_LINKED(s->state))
930         return 0;
931
932     ret = 0;
933
934     PA_IDXSET_FOREACH(o, s->outputs, idx) {
935         pa_source_output_state_t st;
936
937         st = pa_source_output_get_state(o);
938         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
939
940         if (st == PA_SOURCE_OUTPUT_CORKED)
941             continue;
942
943         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
944             continue;
945
946         ret ++;
947     }
948
949     return ret;
950 }
951
952 /* Called from IO thread, except when it is not */
953 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
954     pa_source *s = PA_SOURCE(object);
955     pa_source_assert_ref(s);
956
957     switch ((pa_source_message_t) code) {
958
959         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
960             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
961
962             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
963
964             if (o->direct_on_input) {
965                 o->thread_info.direct_on_input = o->direct_on_input;
966                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
967             }
968
969             pa_assert(!o->thread_info.attached);
970             o->thread_info.attached = TRUE;
971
972             if (o->attach)
973                 o->attach(o);
974
975             pa_source_output_set_state_within_thread(o, o->state);
976
977             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
978                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
979
980             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
981
982             /* We don't just invalidate the requested latency here,
983              * because if we are in a move we might need to fix up the
984              * requested latency. */
985             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
986
987             return 0;
988         }
989
990         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
991             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
992
993             pa_source_output_set_state_within_thread(o, o->state);
994
995             if (o->detach)
996                 o->detach(o);
997
998             pa_assert(o->thread_info.attached);
999             o->thread_info.attached = FALSE;
1000
1001             if (o->thread_info.direct_on_input) {
1002                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
1003                 o->thread_info.direct_on_input = NULL;
1004             }
1005
1006             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
1007                 pa_source_output_unref(o);
1008
1009             pa_source_invalidate_requested_latency(s);
1010
1011             return 0;
1012         }
1013
1014         case PA_SOURCE_MESSAGE_SET_VOLUME:
1015             s->thread_info.soft_volume = s->soft_volume;
1016             return 0;
1017
1018         case PA_SOURCE_MESSAGE_GET_VOLUME:
1019             return 0;
1020
1021         case PA_SOURCE_MESSAGE_SET_MUTE:
1022             s->thread_info.soft_muted = s->muted;
1023             return 0;
1024
1025         case PA_SOURCE_MESSAGE_GET_MUTE:
1026             return 0;
1027
1028         case PA_SOURCE_MESSAGE_SET_STATE: {
1029
1030             pa_bool_t suspend_change =
1031                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
1032                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
1033
1034             s->thread_info.state = PA_PTR_TO_UINT(userdata);
1035
1036             if (suspend_change) {
1037                 pa_source_output *o;
1038                 void *state = NULL;
1039
1040                 while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1041                     if (o->suspend_within_thread)
1042                         o->suspend_within_thread(o, s->thread_info.state == PA_SOURCE_SUSPENDED);
1043             }
1044
1045
1046             return 0;
1047         }
1048
1049         case PA_SOURCE_MESSAGE_DETACH:
1050
1051             /* Detach all streams */
1052             pa_source_detach_within_thread(s);
1053             return 0;
1054
1055         case PA_SOURCE_MESSAGE_ATTACH:
1056
1057             /* Reattach all streams */
1058             pa_source_attach_within_thread(s);
1059             return 0;
1060
1061         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
1062
1063             pa_usec_t *usec = userdata;
1064             *usec = pa_source_get_requested_latency_within_thread(s);
1065
1066             if (*usec == (pa_usec_t) -1)
1067                 *usec = s->thread_info.max_latency;
1068
1069             return 0;
1070         }
1071
1072         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
1073             pa_usec_t *r = userdata;
1074
1075             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
1076
1077             return 0;
1078         }
1079
1080         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
1081             pa_usec_t *r = userdata;
1082
1083             r[0] = s->thread_info.min_latency;
1084             r[1] = s->thread_info.max_latency;
1085
1086             return 0;
1087         }
1088
1089         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
1090
1091             *((size_t*) userdata) = s->thread_info.max_rewind;
1092             return 0;
1093
1094         case PA_SOURCE_MESSAGE_SET_MAX_REWIND:
1095
1096             pa_source_set_max_rewind_within_thread(s, (size_t) offset);
1097             return 0;
1098
1099         case PA_SOURCE_MESSAGE_GET_LATENCY:
1100
1101             if (s->monitor_of) {
1102                 *((pa_usec_t*) userdata) = 0;
1103                 return 0;
1104             }
1105
1106             /* Implementors need to overwrite this implementation! */
1107             return -1;
1108
1109         case PA_SOURCE_MESSAGE_MAX:
1110             ;
1111     }
1112
1113     return -1;
1114 }
1115
1116 /* Called from main thread */
1117 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend, pa_suspend_cause_t cause) {
1118     uint32_t idx;
1119     pa_source *source;
1120     int ret = 0;
1121
1122     pa_core_assert_ref(c);
1123     pa_assert(cause != 0);
1124
1125     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
1126         int r;
1127
1128         if (source->monitor_of)
1129             continue;
1130
1131         if ((r = pa_source_suspend(source, suspend, cause)) < 0)
1132             ret = r;
1133     }
1134
1135     return ret;
1136 }
1137
1138 /* Called from main thread */
1139 void pa_source_detach(pa_source *s) {
1140     pa_source_assert_ref(s);
1141     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1142
1143     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1144 }
1145
1146 /* Called from main thread */
1147 void pa_source_attach(pa_source *s) {
1148     pa_source_assert_ref(s);
1149     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1150
1151     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1152 }
1153
1154 /* Called from IO thread */
1155 void pa_source_detach_within_thread(pa_source *s) {
1156     pa_source_output *o;
1157     void *state = NULL;
1158
1159     pa_source_assert_ref(s);
1160     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1161
1162     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1163         if (o->detach)
1164             o->detach(o);
1165 }
1166
1167 /* Called from IO thread */
1168 void pa_source_attach_within_thread(pa_source *s) {
1169     pa_source_output *o;
1170     void *state = NULL;
1171
1172     pa_source_assert_ref(s);
1173     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1174
1175     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1176         if (o->attach)
1177             o->attach(o);
1178 }
1179
1180 /* Called from IO thread */
1181 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1182     pa_usec_t result = (pa_usec_t) -1;
1183     pa_source_output *o;
1184     void *state = NULL;
1185
1186     pa_source_assert_ref(s);
1187
1188     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1189         return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
1190
1191     if (s->thread_info.requested_latency_valid)
1192         return s->thread_info.requested_latency;
1193
1194     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1195
1196         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1197             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1198             result = o->thread_info.requested_source_latency;
1199
1200     if (result != (pa_usec_t) -1)
1201         result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency);
1202
1203     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1204         /* Only cache this if we are fully set up */
1205         s->thread_info.requested_latency = result;
1206         s->thread_info.requested_latency_valid = TRUE;
1207     }
1208
1209     return result;
1210 }
1211
1212 /* Called from main thread */
1213 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1214     pa_usec_t usec = 0;
1215
1216     pa_source_assert_ref(s);
1217     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1218
1219     if (s->state == PA_SOURCE_SUSPENDED)
1220         return 0;
1221
1222     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1223
1224     return usec;
1225 }
1226
1227 /* Called from IO thread */
1228 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind) {
1229     pa_source_output *o;
1230     void *state = NULL;
1231
1232     pa_source_assert_ref(s);
1233
1234     if (max_rewind == s->thread_info.max_rewind)
1235         return;
1236
1237     s->thread_info.max_rewind = max_rewind;
1238
1239     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1240         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1241             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1242     }
1243 }
1244
1245 /* Called from main thread */
1246 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1247     pa_source_assert_ref(s);
1248
1249     if (PA_SOURCE_IS_LINKED(s->state))
1250         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MAX_REWIND, NULL, max_rewind, NULL) == 0);
1251     else
1252         pa_source_set_max_rewind_within_thread(s, max_rewind);
1253 }
1254
1255 /* Called from IO thread */
1256 void pa_source_invalidate_requested_latency(pa_source *s) {
1257     pa_source_output *o;
1258     void *state = NULL;
1259
1260     pa_source_assert_ref(s);
1261
1262     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1263         return;
1264
1265     s->thread_info.requested_latency_valid = FALSE;
1266
1267     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1268
1269         if (s->update_requested_latency)
1270             s->update_requested_latency(s);
1271
1272         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1273             if (o->update_source_requested_latency)
1274                 o->update_source_requested_latency(o);
1275     }
1276
1277     if (s->monitor_of)
1278         pa_sink_invalidate_requested_latency(s->monitor_of);
1279 }
1280
1281 /* Called from main thread */
1282 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1283     pa_source_assert_ref(s);
1284
1285     /* min_latency == 0:           no limit
1286      * min_latency anything else:  specified limit
1287      *
1288      * Similar for max_latency */
1289
1290     if (min_latency < ABSOLUTE_MIN_LATENCY)
1291         min_latency = ABSOLUTE_MIN_LATENCY;
1292
1293     if (max_latency <= 0 ||
1294         max_latency > ABSOLUTE_MAX_LATENCY)
1295         max_latency = ABSOLUTE_MAX_LATENCY;
1296
1297     pa_assert(min_latency <= max_latency);
1298
1299     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1300     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1301                max_latency == ABSOLUTE_MAX_LATENCY) ||
1302               (s->flags & PA_SOURCE_DYNAMIC_LATENCY));
1303
1304     if (PA_SOURCE_IS_LINKED(s->state)) {
1305         pa_usec_t r[2];
1306
1307         r[0] = min_latency;
1308         r[1] = max_latency;
1309
1310         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1311     } else
1312         pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
1313 }
1314
1315 /* Called from main thread */
1316 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1317    pa_source_assert_ref(s);
1318    pa_assert(min_latency);
1319    pa_assert(max_latency);
1320
1321    if (PA_SOURCE_IS_LINKED(s->state)) {
1322        pa_usec_t r[2] = { 0, 0 };
1323
1324        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1325
1326        *min_latency = r[0];
1327        *max_latency = r[1];
1328    } else {
1329        *min_latency = s->thread_info.min_latency;
1330        *max_latency = s->thread_info.max_latency;
1331    }
1332 }
1333
1334 /* Called from IO thread, and from main thread before pa_source_put() is called */
1335 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1336     void *state = NULL;
1337
1338     pa_source_assert_ref(s);
1339
1340     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1341     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1342     pa_assert(min_latency <= max_latency);
1343
1344     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1345     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1346                max_latency == ABSOLUTE_MAX_LATENCY) ||
1347               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
1348               s->monitor_of);
1349
1350     s->thread_info.min_latency = min_latency;
1351     s->thread_info.max_latency = max_latency;
1352
1353     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1354         pa_source_output *o;
1355
1356         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1357             if (o->update_source_latency_range)
1358                 o->update_source_latency_range(o);
1359     }
1360
1361     pa_source_invalidate_requested_latency(s);
1362 }
1363
1364 /* Called from main thread, before the source is put */
1365 void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
1366     pa_source_assert_ref(s);
1367
1368     pa_assert(pa_source_get_state(s) == PA_SOURCE_INIT);
1369
1370     if (latency < ABSOLUTE_MIN_LATENCY)
1371         latency = ABSOLUTE_MIN_LATENCY;
1372
1373     if (latency > ABSOLUTE_MAX_LATENCY)
1374         latency = ABSOLUTE_MAX_LATENCY;
1375
1376     s->fixed_latency = latency;
1377 }
1378
1379 /* Called from main thread */
1380 size_t pa_source_get_max_rewind(pa_source *s) {
1381     size_t r;
1382     pa_source_assert_ref(s);
1383
1384     if (!PA_SOURCE_IS_LINKED(s->state))
1385         return s->thread_info.max_rewind;
1386
1387     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1388
1389     return r;
1390 }
1391
1392 /* Called from main context */
1393 int pa_source_set_port(pa_source *s, const char *name, pa_bool_t save) {
1394     pa_device_port *port;
1395
1396     pa_assert(s);
1397
1398     if (!s->set_port) {
1399         pa_log_debug("set_port() operation not implemented for sink %u \"%s\"", s->index, s->name);
1400         return -PA_ERR_NOTIMPLEMENTED;
1401     }
1402
1403     if (!s->ports)
1404         return -PA_ERR_NOENTITY;
1405
1406     if (!(port = pa_hashmap_get(s->ports, name)))
1407         return -PA_ERR_NOENTITY;
1408
1409     if (s->active_port == port) {
1410         s->save_port = s->save_port || save;
1411         return 0;
1412     }
1413
1414     if ((s->set_port(s, port)) < 0)
1415         return -PA_ERR_NOENTITY;
1416
1417     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
1418
1419     pa_log_info("Changed port of source %u \"%s\" to %s", s->index, s->name, port->name);
1420
1421     s->active_port = port;
1422     s->save_port = save;
1423
1424     return 0;
1425 }