core: introduce pa_{sink_input|source_output}_fail_move()
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/source-output.h>
38 #include <pulsecore/namereg.h>
39 #include <pulsecore/core-subscribe.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/sample-util.h>
42
43 #include "source.h"
44
45 #define ABSOLUTE_MIN_LATENCY (500)
46 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
47 #define DEFAULT_FIXED_LATENCY (250*PA_USEC_PER_MSEC)
48
49 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
50
51 static void source_free(pa_object *o);
52
53 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
54     pa_assert(data);
55
56     pa_zero(*data);
57     data->proplist = pa_proplist_new();
58
59     return data;
60 }
61
62 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
63     pa_assert(data);
64
65     pa_xfree(data->name);
66     data->name = pa_xstrdup(name);
67 }
68
69 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
70     pa_assert(data);
71
72     if ((data->sample_spec_is_set = !!spec))
73         data->sample_spec = *spec;
74 }
75
76 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
77     pa_assert(data);
78
79     if ((data->channel_map_is_set = !!map))
80         data->channel_map = *map;
81 }
82
83 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
84     pa_assert(data);
85
86     if ((data->volume_is_set = !!volume))
87         data->volume = *volume;
88 }
89
90 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
91     pa_assert(data);
92
93     data->muted_is_set = TRUE;
94     data->muted = !!mute;
95 }
96
97 void pa_source_new_data_set_port(pa_source_new_data *data, const char *port) {
98     pa_assert(data);
99
100     pa_xfree(data->active_port);
101     data->active_port = pa_xstrdup(port);
102 }
103
104 void pa_source_new_data_done(pa_source_new_data *data) {
105     pa_assert(data);
106
107     pa_proplist_free(data->proplist);
108
109     if (data->ports) {
110         pa_device_port *p;
111
112         while ((p = pa_hashmap_steal_first(data->ports)))
113             pa_device_port_free(p);
114
115         pa_hashmap_free(data->ports, NULL, NULL);
116     }
117
118     pa_xfree(data->name);
119     pa_xfree(data->active_port);
120 }
121
122 /* Called from main context */
123 static void reset_callbacks(pa_source *s) {
124     pa_assert(s);
125
126     s->set_state = NULL;
127     s->get_volume = NULL;
128     s->set_volume = NULL;
129     s->get_mute = NULL;
130     s->set_mute = NULL;
131     s->update_requested_latency = NULL;
132     s->set_port = NULL;
133 }
134
135 /* Called from main context */
136 pa_source* pa_source_new(
137         pa_core *core,
138         pa_source_new_data *data,
139         pa_source_flags_t flags) {
140
141     pa_source *s;
142     const char *name;
143     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
144     char *pt;
145
146     pa_assert(core);
147     pa_assert(data);
148     pa_assert(data->name);
149     pa_assert_ctl_context();
150
151     s = pa_msgobject_new(pa_source);
152
153     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
154         pa_log_debug("Failed to register name %s.", data->name);
155         pa_xfree(s);
156         return NULL;
157     }
158
159     pa_source_new_data_set_name(data, name);
160
161     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
162         pa_xfree(s);
163         pa_namereg_unregister(core, name);
164         return NULL;
165     }
166
167     /* FIXME, need to free s here on failure */
168
169     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
170     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
171
172     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
173
174     if (!data->channel_map_is_set)
175         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
176
177     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
178     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
179
180     if (!data->volume_is_set)
181         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
182
183     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
184     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
185
186     if (!data->muted_is_set)
187         data->muted = FALSE;
188
189     if (data->card)
190         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
191
192     pa_device_init_description(data->proplist);
193     pa_device_init_icon(data->proplist, FALSE);
194     pa_device_init_intended_roles(data->proplist);
195
196     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
197         pa_xfree(s);
198         pa_namereg_unregister(core, name);
199         return NULL;
200     }
201
202     s->parent.parent.free = source_free;
203     s->parent.process_msg = pa_source_process_msg;
204
205     s->core = core;
206     s->state = PA_SOURCE_INIT;
207     s->flags = flags;
208     s->suspend_cause = 0;
209     s->name = pa_xstrdup(name);
210     s->proplist = pa_proplist_copy(data->proplist);
211     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
212     s->module = data->module;
213     s->card = data->card;
214
215     s->sample_spec = data->sample_spec;
216     s->channel_map = data->channel_map;
217
218     s->outputs = pa_idxset_new(NULL, NULL);
219     s->n_corked = 0;
220     s->monitor_of = NULL;
221
222     s->virtual_volume = data->volume;
223     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
224     s->base_volume = PA_VOLUME_NORM;
225     s->n_volume_steps = PA_VOLUME_NORM+1;
226     s->muted = data->muted;
227     s->refresh_volume = s->refresh_muted = FALSE;
228
229     s->fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
230
231     reset_callbacks(s);
232     s->userdata = NULL;
233
234     s->asyncmsgq = NULL;
235     s->rtpoll = NULL;
236
237     /* As a minor optimization we just steal the list instead of
238      * copying it here */
239     s->ports = data->ports;
240     data->ports = NULL;
241
242     s->active_port = NULL;
243     s->save_port = FALSE;
244
245     if (data->active_port && s->ports)
246         if ((s->active_port = pa_hashmap_get(s->ports, data->active_port)))
247             s->save_port = data->save_port;
248
249     if (!s->active_port && s->ports) {
250         void *state;
251         pa_device_port *p;
252
253         PA_HASHMAP_FOREACH(p, s->ports, state)
254             if (!s->active_port || p->priority > s->active_port->priority)
255                 s->active_port = p;
256     }
257
258     s->save_volume = data->save_volume;
259     s->save_muted = data->save_muted;
260
261     pa_silence_memchunk_get(
262             &core->silence_cache,
263             core->mempool,
264             &s->silence,
265             &s->sample_spec,
266             0);
267
268     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
269     s->thread_info.soft_volume = s->soft_volume;
270     s->thread_info.soft_muted = s->muted;
271     s->thread_info.state = s->state;
272     s->thread_info.max_rewind = 0;
273     s->thread_info.requested_latency_valid = FALSE;
274     s->thread_info.requested_latency = 0;
275     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
276     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
277
278     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
279
280     if (s->card)
281         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
282
283     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
284     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
285                 s->index,
286                 s->name,
287                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
288                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
289                 pt);
290     pa_xfree(pt);
291
292     return s;
293 }
294
295 /* Called from main context */
296 static int source_set_state(pa_source *s, pa_source_state_t state) {
297     int ret;
298     pa_bool_t suspend_change;
299     pa_source_state_t original_state;
300
301     pa_assert(s);
302     pa_assert_ctl_context();
303
304     if (s->state == state)
305         return 0;
306
307     original_state = s->state;
308
309     suspend_change =
310         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
311         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
312
313     if (s->set_state)
314         if ((ret = s->set_state(s, state)) < 0)
315             return ret;
316
317     if (s->asyncmsgq)
318         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
319
320             if (s->set_state)
321                 s->set_state(s, original_state);
322
323             return ret;
324         }
325
326     s->state = state;
327
328     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
329         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
330         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
331     }
332
333     if (suspend_change) {
334         pa_source_output *o;
335         uint32_t idx;
336
337         /* We're suspending or resuming, tell everyone about it */
338
339         for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx)))
340             if (s->state == PA_SOURCE_SUSPENDED &&
341                 (o->flags & PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND))
342                 pa_source_output_kill(o);
343             else if (o->suspend)
344                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
345     }
346
347
348     return 0;
349 }
350
351 /* Called from main context */
352 void pa_source_put(pa_source *s) {
353     pa_source_assert_ref(s);
354     pa_assert_ctl_context();
355
356     pa_assert(s->state == PA_SOURCE_INIT);
357
358     /* The following fields must be initialized properly when calling _put() */
359     pa_assert(s->asyncmsgq);
360     pa_assert(s->rtpoll);
361     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
362
363     /* Generally, flags should be initialized via pa_source_new(). As
364      * a special exception we allow volume related flags to be set
365      * between _new() and _put(). */
366
367     if (!(s->flags & PA_SOURCE_HW_VOLUME_CTRL))
368         s->flags |= PA_SOURCE_DECIBEL_VOLUME;
369
370     s->thread_info.soft_volume = s->soft_volume;
371     s->thread_info.soft_muted = s->muted;
372
373     pa_assert((s->flags & PA_SOURCE_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SOURCE_DECIBEL_VOLUME));
374     pa_assert(!(s->flags & PA_SOURCE_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
375     pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
376
377     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
378
379     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
380     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
381 }
382
383 /* Called from main context */
384 void pa_source_unlink(pa_source *s) {
385     pa_bool_t linked;
386     pa_source_output *o, *j = NULL;
387
388     pa_assert(s);
389     pa_assert_ctl_context();
390
391     /* See pa_sink_unlink() for a couple of comments how this function
392      * works. */
393
394     linked = PA_SOURCE_IS_LINKED(s->state);
395
396     if (linked)
397         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
398
399     if (s->state != PA_SOURCE_UNLINKED)
400         pa_namereg_unregister(s->core, s->name);
401     pa_idxset_remove_by_data(s->core->sources, s, NULL);
402
403     if (s->card)
404         pa_idxset_remove_by_data(s->card->sources, s, NULL);
405
406     while ((o = pa_idxset_first(s->outputs, NULL))) {
407         pa_assert(o != j);
408         pa_source_output_kill(o);
409         j = o;
410     }
411
412     if (linked)
413         source_set_state(s, PA_SOURCE_UNLINKED);
414     else
415         s->state = PA_SOURCE_UNLINKED;
416
417     reset_callbacks(s);
418
419     if (linked) {
420         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
421         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
422     }
423 }
424
425 /* Called from main context */
426 static void source_free(pa_object *o) {
427     pa_source_output *so;
428     pa_source *s = PA_SOURCE(o);
429
430     pa_assert(s);
431     pa_assert_ctl_context();
432     pa_assert(pa_source_refcnt(s) == 0);
433
434     if (PA_SOURCE_IS_LINKED(s->state))
435         pa_source_unlink(s);
436
437     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
438
439     pa_idxset_free(s->outputs, NULL, NULL);
440
441     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
442         pa_source_output_unref(so);
443
444     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
445
446     if (s->silence.memblock)
447         pa_memblock_unref(s->silence.memblock);
448
449     pa_xfree(s->name);
450     pa_xfree(s->driver);
451
452     if (s->proplist)
453         pa_proplist_free(s->proplist);
454
455     if (s->ports) {
456         pa_device_port *p;
457
458         while ((p = pa_hashmap_steal_first(s->ports)))
459             pa_device_port_free(p);
460
461         pa_hashmap_free(s->ports, NULL, NULL);
462     }
463
464     pa_xfree(s);
465 }
466
467 /* Called from main context */
468 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
469     pa_assert_ctl_context();
470     pa_source_assert_ref(s);
471
472     s->asyncmsgq = q;
473 }
474
475 /* Called from main context */
476 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
477     pa_assert_ctl_context();
478     pa_source_assert_ref(s);
479
480     s->rtpoll = p;
481 }
482
483 /* Called from main context */
484 int pa_source_update_status(pa_source*s) {
485     pa_source_assert_ref(s);
486     pa_assert_ctl_context();
487     pa_assert(PA_SOURCE_IS_LINKED(s->state));
488
489     if (s->state == PA_SOURCE_SUSPENDED)
490         return 0;
491
492     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
493 }
494
495 /* Called from main context */
496 int pa_source_suspend(pa_source *s, pa_bool_t suspend, pa_suspend_cause_t cause) {
497     pa_source_assert_ref(s);
498     pa_assert_ctl_context();
499     pa_assert(PA_SOURCE_IS_LINKED(s->state));
500     pa_assert(cause != 0);
501
502     if (s->monitor_of)
503         return -PA_ERR_NOTSUPPORTED;
504
505     if (suspend)
506         s->suspend_cause |= cause;
507     else
508         s->suspend_cause &= ~cause;
509
510     if ((pa_source_get_state(s) == PA_SOURCE_SUSPENDED) == !!s->suspend_cause)
511         return 0;
512
513     pa_log_debug("Suspend cause of source %s is 0x%04x, %s", s->name, s->suspend_cause, s->suspend_cause ? "suspending" : "resuming");
514
515     if (suspend)
516         return source_set_state(s, PA_SOURCE_SUSPENDED);
517     else
518         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
519 }
520
521 /* Called from main context */
522 int pa_source_sync_suspend(pa_source *s) {
523     pa_sink_state_t state;
524
525     pa_source_assert_ref(s);
526     pa_assert_ctl_context();
527     pa_assert(PA_SOURCE_IS_LINKED(s->state));
528     pa_assert(s->monitor_of);
529
530     state = pa_sink_get_state(s->monitor_of);
531
532     if (state == PA_SINK_SUSPENDED)
533         return source_set_state(s, PA_SOURCE_SUSPENDED);
534
535     pa_assert(PA_SINK_IS_OPENED(state));
536
537     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
538 }
539
540 /* Called from main context */
541 pa_queue *pa_source_move_all_start(pa_source *s, pa_queue *q) {
542     pa_source_output *o, *n;
543     uint32_t idx;
544
545     pa_source_assert_ref(s);
546     pa_assert_ctl_context();
547     pa_assert(PA_SOURCE_IS_LINKED(s->state));
548
549     if (!q)
550         q = pa_queue_new();
551
552     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
553         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
554
555         pa_source_output_ref(o);
556
557         if (pa_source_output_start_move(o) >= 0)
558             pa_queue_push(q, o);
559         else
560             pa_source_output_unref(o);
561     }
562
563     return q;
564 }
565
566 /* Called from main context */
567 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
568     pa_source_output *o;
569
570     pa_source_assert_ref(s);
571     pa_assert_ctl_context();
572     pa_assert(PA_SOURCE_IS_LINKED(s->state));
573     pa_assert(q);
574
575     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
576         if (pa_source_output_finish_move(o, s, save) < 0)
577             pa_source_output_fail_move(o);
578
579         pa_source_output_unref(o);
580     }
581
582     pa_queue_free(q, NULL, NULL);
583 }
584
585 /* Called from main context */
586 void pa_source_move_all_fail(pa_queue *q) {
587     pa_source_output *o;
588
589     pa_assert_ctl_context();
590     pa_assert(q);
591
592     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
593         pa_source_output_fail_move(o);
594         pa_source_output_unref(o);
595     }
596
597     pa_queue_free(q, NULL, NULL);
598 }
599
600 /* Called from IO thread context */
601 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
602     pa_source_output *o;
603     void *state = NULL;
604
605     pa_source_assert_ref(s);
606     pa_source_assert_io_context(s);
607     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
608
609     if (nbytes <= 0)
610         return;
611
612     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
613         return;
614
615     pa_log_debug("Processing rewind...");
616
617     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state) {
618         pa_source_output_assert_ref(o);
619         pa_source_output_process_rewind(o, nbytes);
620     }
621 }
622
623 /* Called from IO thread context */
624 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
625     pa_source_output *o;
626     void *state = NULL;
627
628     pa_source_assert_ref(s);
629     pa_source_assert_io_context(s);
630     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
631     pa_assert(chunk);
632
633     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
634         return;
635
636     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
637         pa_memchunk vchunk = *chunk;
638
639         pa_memblock_ref(vchunk.memblock);
640         pa_memchunk_make_writable(&vchunk, 0);
641
642         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
643             pa_silence_memchunk(&vchunk, &s->sample_spec);
644         else
645             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
646
647         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
648             pa_source_output_assert_ref(o);
649
650             if (!o->thread_info.direct_on_input)
651                 pa_source_output_push(o, &vchunk);
652         }
653
654         pa_memblock_unref(vchunk.memblock);
655     } else {
656
657         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
658             pa_source_output_assert_ref(o);
659
660             if (!o->thread_info.direct_on_input)
661                 pa_source_output_push(o, chunk);
662         }
663     }
664 }
665
666 /* Called from IO thread context */
667 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
668     pa_source_assert_ref(s);
669     pa_source_assert_io_context(s);
670     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
671     pa_source_output_assert_ref(o);
672     pa_assert(o->thread_info.direct_on_input);
673     pa_assert(chunk);
674
675     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
676         return;
677
678     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
679         pa_memchunk vchunk = *chunk;
680
681         pa_memblock_ref(vchunk.memblock);
682         pa_memchunk_make_writable(&vchunk, 0);
683
684         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
685             pa_silence_memchunk(&vchunk, &s->sample_spec);
686         else
687             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
688
689         pa_source_output_push(o, &vchunk);
690
691         pa_memblock_unref(vchunk.memblock);
692     } else
693         pa_source_output_push(o, chunk);
694 }
695
696 /* Called from main thread */
697 pa_usec_t pa_source_get_latency(pa_source *s) {
698     pa_usec_t usec;
699
700     pa_source_assert_ref(s);
701     pa_assert_ctl_context();
702     pa_assert(PA_SOURCE_IS_LINKED(s->state));
703
704     if (s->state == PA_SOURCE_SUSPENDED)
705         return 0;
706
707     if (!(s->flags & PA_SOURCE_LATENCY))
708         return 0;
709
710     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
711
712     return usec;
713 }
714
715 /* Called from IO thread */
716 pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
717     pa_usec_t usec = 0;
718     pa_msgobject *o;
719
720     pa_source_assert_ref(s);
721     pa_source_assert_io_context(s);
722     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
723
724     /* The returned value is supposed to be in the time domain of the sound card! */
725
726     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
727         return 0;
728
729     if (!(s->flags & PA_SOURCE_LATENCY))
730         return 0;
731
732     o = PA_MSGOBJECT(s);
733
734     /* We probably should make this a proper vtable callback instead of going through process_msg() */
735
736     if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
737         return -1;
738
739     return usec;
740 }
741
742 /* Called from main thread */
743 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume, pa_bool_t save) {
744     pa_cvolume old_virtual_volume;
745     pa_bool_t virtual_volume_changed;
746
747     pa_source_assert_ref(s);
748     pa_assert_ctl_context();
749     pa_assert(PA_SOURCE_IS_LINKED(s->state));
750     pa_assert(volume);
751     pa_assert(pa_cvolume_valid(volume));
752     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
753
754     old_virtual_volume = s->virtual_volume;
755     s->virtual_volume = *volume;
756     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
757     s->save_volume = (!virtual_volume_changed && s->save_volume) || save;
758
759     if (s->set_volume) {
760         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
761         s->set_volume(s);
762     } else
763         s->soft_volume = s->virtual_volume;
764
765     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
766
767     if (virtual_volume_changed)
768         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
769 }
770
771 /* Called from main thread. Only to be called by source implementor */
772 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
773     pa_source_assert_ref(s);
774     pa_assert_ctl_context();
775     pa_assert(volume);
776
777     if (PA_SOURCE_IS_LINKED(s->state))
778         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
779     else
780         s->thread_info.soft_volume = *volume;
781 }
782
783 /* Called from main thread */
784 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
785     pa_source_assert_ref(s);
786     pa_assert_ctl_context();
787     pa_assert(PA_SOURCE_IS_LINKED(s->state));
788
789     if (s->refresh_volume || force_refresh) {
790         pa_cvolume old_virtual_volume = s->virtual_volume;
791
792         if (s->get_volume)
793             s->get_volume(s);
794
795         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
796
797         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume)) {
798             s->save_volume = TRUE;
799             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
800         }
801     }
802
803     return &s->virtual_volume;
804 }
805
806 /* Called from main thread */
807 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume) {
808     pa_source_assert_ref(s);
809     pa_assert_ctl_context();
810     pa_assert(PA_SOURCE_IS_LINKED(s->state));
811
812     /* The source implementor may call this if the volume changed to make sure everyone is notified */
813
814     if (pa_cvolume_equal(&s->virtual_volume, new_volume))
815         return;
816
817     s->virtual_volume = *new_volume;
818     s->save_volume = TRUE;
819
820     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
821 }
822
823 /* Called from main thread */
824 void pa_source_set_mute(pa_source *s, pa_bool_t mute, pa_bool_t save) {
825     pa_bool_t old_muted;
826
827     pa_source_assert_ref(s);
828     pa_assert_ctl_context();
829     pa_assert(PA_SOURCE_IS_LINKED(s->state));
830
831     old_muted = s->muted;
832     s->muted = mute;
833     s->save_muted = (old_muted == s->muted && s->save_muted) || save;
834
835     if (s->set_mute)
836         s->set_mute(s);
837
838     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
839
840     if (old_muted != s->muted)
841         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
842 }
843
844 /* Called from main thread */
845 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
846     pa_source_assert_ref(s);
847     pa_assert_ctl_context();
848     pa_assert(PA_SOURCE_IS_LINKED(s->state));
849
850     if (s->refresh_muted || force_refresh) {
851         pa_bool_t old_muted = s->muted;
852
853         if (s->get_mute)
854             s->get_mute(s);
855
856         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
857
858         if (old_muted != s->muted) {
859             s->save_muted = TRUE;
860
861             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
862
863             /* Make sure the soft mute status stays in sync */
864             pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
865         }
866     }
867
868     return s->muted;
869 }
870
871 /* Called from main thread */
872 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted) {
873     pa_source_assert_ref(s);
874     pa_assert_ctl_context();
875     pa_assert(PA_SOURCE_IS_LINKED(s->state));
876
877     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
878
879     if (s->muted == new_muted)
880         return;
881
882     s->muted = new_muted;
883     s->save_muted = TRUE;
884
885     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
886 }
887
888 /* Called from main thread */
889 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
890     pa_source_assert_ref(s);
891     pa_assert_ctl_context();
892
893     if (p)
894         pa_proplist_update(s->proplist, mode, p);
895
896     if (PA_SOURCE_IS_LINKED(s->state)) {
897         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
898         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
899     }
900
901     return TRUE;
902 }
903
904 /* Called from main thread */
905 /* FIXME -- this should be dropped and be merged into pa_source_update_proplist() */
906 void pa_source_set_description(pa_source *s, const char *description) {
907     const char *old;
908     pa_source_assert_ref(s);
909     pa_assert_ctl_context();
910
911     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
912         return;
913
914     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
915
916     if (old && description && pa_streq(old, description))
917         return;
918
919     if (description)
920         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
921     else
922         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
923
924     if (PA_SOURCE_IS_LINKED(s->state)) {
925         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
926         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
927     }
928 }
929
930 /* Called from main thread */
931 unsigned pa_source_linked_by(pa_source *s) {
932     pa_source_assert_ref(s);
933     pa_assert(PA_SOURCE_IS_LINKED(s->state));
934     pa_assert_ctl_context();
935
936     return pa_idxset_size(s->outputs);
937 }
938
939 /* Called from main thread */
940 unsigned pa_source_used_by(pa_source *s) {
941     unsigned ret;
942
943     pa_source_assert_ref(s);
944     pa_assert(PA_SOURCE_IS_LINKED(s->state));
945     pa_assert_ctl_context();
946
947     ret = pa_idxset_size(s->outputs);
948     pa_assert(ret >= s->n_corked);
949
950     return ret - s->n_corked;
951 }
952
953 /* Called from main thread */
954 unsigned pa_source_check_suspend(pa_source *s) {
955     unsigned ret;
956     pa_source_output *o;
957     uint32_t idx;
958
959     pa_source_assert_ref(s);
960     pa_assert_ctl_context();
961
962     if (!PA_SOURCE_IS_LINKED(s->state))
963         return 0;
964
965     ret = 0;
966
967     PA_IDXSET_FOREACH(o, s->outputs, idx) {
968         pa_source_output_state_t st;
969
970         st = pa_source_output_get_state(o);
971         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
972
973         if (st == PA_SOURCE_OUTPUT_CORKED)
974             continue;
975
976         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
977             continue;
978
979         ret ++;
980     }
981
982     return ret;
983 }
984
985 /* Called from IO thread, except when it is not */
986 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
987     pa_source *s = PA_SOURCE(object);
988     pa_source_assert_ref(s);
989
990     switch ((pa_source_message_t) code) {
991
992         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
993             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
994
995             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
996
997             if (o->direct_on_input) {
998                 o->thread_info.direct_on_input = o->direct_on_input;
999                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
1000             }
1001
1002             pa_assert(!o->thread_info.attached);
1003             o->thread_info.attached = TRUE;
1004
1005             if (o->attach)
1006                 o->attach(o);
1007
1008             pa_source_output_set_state_within_thread(o, o->state);
1009
1010             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
1011                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
1012
1013             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1014
1015             /* We don't just invalidate the requested latency here,
1016              * because if we are in a move we might need to fix up the
1017              * requested latency. */
1018             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
1019
1020             return 0;
1021         }
1022
1023         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
1024             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
1025
1026             pa_source_output_set_state_within_thread(o, o->state);
1027
1028             if (o->detach)
1029                 o->detach(o);
1030
1031             pa_assert(o->thread_info.attached);
1032             o->thread_info.attached = FALSE;
1033
1034             if (o->thread_info.direct_on_input) {
1035                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
1036                 o->thread_info.direct_on_input = NULL;
1037             }
1038
1039             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
1040                 pa_source_output_unref(o);
1041
1042             pa_source_invalidate_requested_latency(s);
1043
1044             return 0;
1045         }
1046
1047         case PA_SOURCE_MESSAGE_SET_VOLUME:
1048             s->thread_info.soft_volume = s->soft_volume;
1049             return 0;
1050
1051         case PA_SOURCE_MESSAGE_GET_VOLUME:
1052             return 0;
1053
1054         case PA_SOURCE_MESSAGE_SET_MUTE:
1055             s->thread_info.soft_muted = s->muted;
1056             return 0;
1057
1058         case PA_SOURCE_MESSAGE_GET_MUTE:
1059             return 0;
1060
1061         case PA_SOURCE_MESSAGE_SET_STATE: {
1062
1063             pa_bool_t suspend_change =
1064                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
1065                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
1066
1067             s->thread_info.state = PA_PTR_TO_UINT(userdata);
1068
1069             if (suspend_change) {
1070                 pa_source_output *o;
1071                 void *state = NULL;
1072
1073                 while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1074                     if (o->suspend_within_thread)
1075                         o->suspend_within_thread(o, s->thread_info.state == PA_SOURCE_SUSPENDED);
1076             }
1077
1078
1079             return 0;
1080         }
1081
1082         case PA_SOURCE_MESSAGE_DETACH:
1083
1084             /* Detach all streams */
1085             pa_source_detach_within_thread(s);
1086             return 0;
1087
1088         case PA_SOURCE_MESSAGE_ATTACH:
1089
1090             /* Reattach all streams */
1091             pa_source_attach_within_thread(s);
1092             return 0;
1093
1094         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
1095
1096             pa_usec_t *usec = userdata;
1097             *usec = pa_source_get_requested_latency_within_thread(s);
1098
1099             if (*usec == (pa_usec_t) -1)
1100                 *usec = s->thread_info.max_latency;
1101
1102             return 0;
1103         }
1104
1105         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
1106             pa_usec_t *r = userdata;
1107
1108             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
1109
1110             return 0;
1111         }
1112
1113         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
1114             pa_usec_t *r = userdata;
1115
1116             r[0] = s->thread_info.min_latency;
1117             r[1] = s->thread_info.max_latency;
1118
1119             return 0;
1120         }
1121
1122         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
1123
1124             *((size_t*) userdata) = s->thread_info.max_rewind;
1125             return 0;
1126
1127         case PA_SOURCE_MESSAGE_SET_MAX_REWIND:
1128
1129             pa_source_set_max_rewind_within_thread(s, (size_t) offset);
1130             return 0;
1131
1132         case PA_SOURCE_MESSAGE_GET_LATENCY:
1133
1134             if (s->monitor_of) {
1135                 *((pa_usec_t*) userdata) = 0;
1136                 return 0;
1137             }
1138
1139             /* Implementors need to overwrite this implementation! */
1140             return -1;
1141
1142         case PA_SOURCE_MESSAGE_MAX:
1143             ;
1144     }
1145
1146     return -1;
1147 }
1148
1149 /* Called from main thread */
1150 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend, pa_suspend_cause_t cause) {
1151     uint32_t idx;
1152     pa_source *source;
1153     int ret = 0;
1154
1155     pa_core_assert_ref(c);
1156     pa_assert_ctl_context();
1157     pa_assert(cause != 0);
1158
1159     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
1160         int r;
1161
1162         if (source->monitor_of)
1163             continue;
1164
1165         if ((r = pa_source_suspend(source, suspend, cause)) < 0)
1166             ret = r;
1167     }
1168
1169     return ret;
1170 }
1171
1172 /* Called from main thread */
1173 void pa_source_detach(pa_source *s) {
1174     pa_source_assert_ref(s);
1175     pa_assert_ctl_context();
1176     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1177
1178     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1179 }
1180
1181 /* Called from main thread */
1182 void pa_source_attach(pa_source *s) {
1183     pa_source_assert_ref(s);
1184     pa_assert_ctl_context();
1185     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1186
1187     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1188 }
1189
1190 /* Called from IO thread */
1191 void pa_source_detach_within_thread(pa_source *s) {
1192     pa_source_output *o;
1193     void *state = NULL;
1194
1195     pa_source_assert_ref(s);
1196     pa_source_assert_io_context(s);
1197     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1198
1199     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1200         if (o->detach)
1201             o->detach(o);
1202 }
1203
1204 /* Called from IO thread */
1205 void pa_source_attach_within_thread(pa_source *s) {
1206     pa_source_output *o;
1207     void *state = NULL;
1208
1209     pa_source_assert_ref(s);
1210     pa_source_assert_io_context(s);
1211     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1212
1213     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1214         if (o->attach)
1215             o->attach(o);
1216 }
1217
1218 /* Called from IO thread */
1219 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1220     pa_usec_t result = (pa_usec_t) -1;
1221     pa_source_output *o;
1222     void *state = NULL;
1223
1224     pa_source_assert_ref(s);
1225     pa_source_assert_io_context(s);
1226
1227     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1228         return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
1229
1230     if (s->thread_info.requested_latency_valid)
1231         return s->thread_info.requested_latency;
1232
1233     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1234
1235         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1236             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1237             result = o->thread_info.requested_source_latency;
1238
1239     if (result != (pa_usec_t) -1)
1240         result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency);
1241
1242     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1243         /* Only cache this if we are fully set up */
1244         s->thread_info.requested_latency = result;
1245         s->thread_info.requested_latency_valid = TRUE;
1246     }
1247
1248     return result;
1249 }
1250
1251 /* Called from main thread */
1252 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1253     pa_usec_t usec = 0;
1254
1255     pa_source_assert_ref(s);
1256     pa_assert_ctl_context();
1257     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1258
1259     if (s->state == PA_SOURCE_SUSPENDED)
1260         return 0;
1261
1262     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1263
1264     return usec;
1265 }
1266
1267 /* Called from IO thread */
1268 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind) {
1269     pa_source_output *o;
1270     void *state = NULL;
1271
1272     pa_source_assert_ref(s);
1273     pa_source_assert_io_context(s);
1274
1275     if (max_rewind == s->thread_info.max_rewind)
1276         return;
1277
1278     s->thread_info.max_rewind = max_rewind;
1279
1280     if (PA_SOURCE_IS_LINKED(s->thread_info.state))
1281         PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1282             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1283 }
1284
1285 /* Called from main thread */
1286 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1287     pa_source_assert_ref(s);
1288     pa_assert_ctl_context();
1289
1290     if (PA_SOURCE_IS_LINKED(s->state))
1291         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MAX_REWIND, NULL, max_rewind, NULL) == 0);
1292     else
1293         pa_source_set_max_rewind_within_thread(s, max_rewind);
1294 }
1295
1296 /* Called from IO thread */
1297 void pa_source_invalidate_requested_latency(pa_source *s) {
1298     pa_source_output *o;
1299     void *state = NULL;
1300
1301     pa_source_assert_ref(s);
1302     pa_source_assert_io_context(s);
1303
1304     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1305         return;
1306
1307     s->thread_info.requested_latency_valid = FALSE;
1308
1309     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1310
1311         if (s->update_requested_latency)
1312             s->update_requested_latency(s);
1313
1314         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1315             if (o->update_source_requested_latency)
1316                 o->update_source_requested_latency(o);
1317     }
1318
1319     if (s->monitor_of)
1320         pa_sink_invalidate_requested_latency(s->monitor_of);
1321 }
1322
1323 /* Called from main thread */
1324 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1325     pa_source_assert_ref(s);
1326     pa_assert_ctl_context();
1327
1328     /* min_latency == 0:           no limit
1329      * min_latency anything else:  specified limit
1330      *
1331      * Similar for max_latency */
1332
1333     if (min_latency < ABSOLUTE_MIN_LATENCY)
1334         min_latency = ABSOLUTE_MIN_LATENCY;
1335
1336     if (max_latency <= 0 ||
1337         max_latency > ABSOLUTE_MAX_LATENCY)
1338         max_latency = ABSOLUTE_MAX_LATENCY;
1339
1340     pa_assert(min_latency <= max_latency);
1341
1342     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1343     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1344                max_latency == ABSOLUTE_MAX_LATENCY) ||
1345               (s->flags & PA_SOURCE_DYNAMIC_LATENCY));
1346
1347     if (PA_SOURCE_IS_LINKED(s->state)) {
1348         pa_usec_t r[2];
1349
1350         r[0] = min_latency;
1351         r[1] = max_latency;
1352
1353         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1354     } else
1355         pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
1356 }
1357
1358 /* Called from main thread */
1359 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1360    pa_source_assert_ref(s);
1361    pa_assert_ctl_context();
1362    pa_assert(min_latency);
1363    pa_assert(max_latency);
1364
1365    if (PA_SOURCE_IS_LINKED(s->state)) {
1366        pa_usec_t r[2] = { 0, 0 };
1367
1368        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1369
1370        *min_latency = r[0];
1371        *max_latency = r[1];
1372    } else {
1373        *min_latency = s->thread_info.min_latency;
1374        *max_latency = s->thread_info.max_latency;
1375    }
1376 }
1377
1378 /* Called from IO thread, and from main thread before pa_source_put() is called */
1379 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1380     void *state = NULL;
1381
1382     pa_source_assert_ref(s);
1383     pa_source_assert_io_context(s);
1384
1385     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1386     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1387     pa_assert(min_latency <= max_latency);
1388
1389     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1390     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1391                max_latency == ABSOLUTE_MAX_LATENCY) ||
1392               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
1393               s->monitor_of);
1394
1395     s->thread_info.min_latency = min_latency;
1396     s->thread_info.max_latency = max_latency;
1397
1398     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1399         pa_source_output *o;
1400
1401         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1402             if (o->update_source_latency_range)
1403                 o->update_source_latency_range(o);
1404     }
1405
1406     pa_source_invalidate_requested_latency(s);
1407 }
1408
1409 /* Called from main thread, before the source is put */
1410 void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
1411     pa_source_assert_ref(s);
1412     pa_assert_ctl_context();
1413
1414     pa_assert(pa_source_get_state(s) == PA_SOURCE_INIT);
1415
1416     if (latency < ABSOLUTE_MIN_LATENCY)
1417         latency = ABSOLUTE_MIN_LATENCY;
1418
1419     if (latency > ABSOLUTE_MAX_LATENCY)
1420         latency = ABSOLUTE_MAX_LATENCY;
1421
1422     s->fixed_latency = latency;
1423 }
1424
1425 /* Called from main thread */
1426 size_t pa_source_get_max_rewind(pa_source *s) {
1427     size_t r;
1428     pa_assert_ctl_context();
1429     pa_source_assert_ref(s);
1430
1431     if (!PA_SOURCE_IS_LINKED(s->state))
1432         return s->thread_info.max_rewind;
1433
1434     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1435
1436     return r;
1437 }
1438
1439 /* Called from main context */
1440 int pa_source_set_port(pa_source *s, const char *name, pa_bool_t save) {
1441     pa_device_port *port;
1442
1443     pa_assert(s);
1444     pa_assert_ctl_context();
1445
1446     if (!s->set_port) {
1447         pa_log_debug("set_port() operation not implemented for source %u \"%s\"", s->index, s->name);
1448         return -PA_ERR_NOTIMPLEMENTED;
1449     }
1450
1451     if (!s->ports)
1452         return -PA_ERR_NOENTITY;
1453
1454     if (!(port = pa_hashmap_get(s->ports, name)))
1455         return -PA_ERR_NOENTITY;
1456
1457     if (s->active_port == port) {
1458         s->save_port = s->save_port || save;
1459         return 0;
1460     }
1461
1462     if ((s->set_port(s, port)) < 0)
1463         return -PA_ERR_NOENTITY;
1464
1465     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
1466
1467     pa_log_info("Changed port of source %u \"%s\" to %s", s->index, s->name, port->name);
1468
1469     s->active_port = port;
1470     s->save_port = save;
1471
1472     return 0;
1473 }