core: move rtpoll to thread_info sub structure
[platform/upstream/pulseaudio.git] / src / pulsecore / source.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <pulse/utf8.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/timeval.h>
34 #include <pulse/util.h>
35
36 #include <pulsecore/core-util.h>
37 #include <pulsecore/source-output.h>
38 #include <pulsecore/namereg.h>
39 #include <pulsecore/core-subscribe.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/sample-util.h>
42
43 #include "source.h"
44
45 #define ABSOLUTE_MIN_LATENCY (500)
46 #define ABSOLUTE_MAX_LATENCY (10*PA_USEC_PER_SEC)
47 #define DEFAULT_FIXED_LATENCY (250*PA_USEC_PER_MSEC)
48
49 static PA_DEFINE_CHECK_TYPE(pa_source, pa_msgobject);
50
51 static void source_free(pa_object *o);
52
53 pa_source_new_data* pa_source_new_data_init(pa_source_new_data *data) {
54     pa_assert(data);
55
56     pa_zero(*data);
57     data->proplist = pa_proplist_new();
58
59     return data;
60 }
61
62 void pa_source_new_data_set_name(pa_source_new_data *data, const char *name) {
63     pa_assert(data);
64
65     pa_xfree(data->name);
66     data->name = pa_xstrdup(name);
67 }
68
69 void pa_source_new_data_set_sample_spec(pa_source_new_data *data, const pa_sample_spec *spec) {
70     pa_assert(data);
71
72     if ((data->sample_spec_is_set = !!spec))
73         data->sample_spec = *spec;
74 }
75
76 void pa_source_new_data_set_channel_map(pa_source_new_data *data, const pa_channel_map *map) {
77     pa_assert(data);
78
79     if ((data->channel_map_is_set = !!map))
80         data->channel_map = *map;
81 }
82
83 void pa_source_new_data_set_volume(pa_source_new_data *data, const pa_cvolume *volume) {
84     pa_assert(data);
85
86     if ((data->volume_is_set = !!volume))
87         data->volume = *volume;
88 }
89
90 void pa_source_new_data_set_muted(pa_source_new_data *data, pa_bool_t mute) {
91     pa_assert(data);
92
93     data->muted_is_set = TRUE;
94     data->muted = !!mute;
95 }
96
97 void pa_source_new_data_set_port(pa_source_new_data *data, const char *port) {
98     pa_assert(data);
99
100     pa_xfree(data->active_port);
101     data->active_port = pa_xstrdup(port);
102 }
103
104 void pa_source_new_data_done(pa_source_new_data *data) {
105     pa_assert(data);
106
107     pa_proplist_free(data->proplist);
108
109     if (data->ports) {
110         pa_device_port *p;
111
112         while ((p = pa_hashmap_steal_first(data->ports)))
113             pa_device_port_free(p);
114
115         pa_hashmap_free(data->ports, NULL, NULL);
116     }
117
118     pa_xfree(data->name);
119     pa_xfree(data->active_port);
120 }
121
122 /* Called from main context */
123 static void reset_callbacks(pa_source *s) {
124     pa_assert(s);
125
126     s->set_state = NULL;
127     s->get_volume = NULL;
128     s->set_volume = NULL;
129     s->get_mute = NULL;
130     s->set_mute = NULL;
131     s->update_requested_latency = NULL;
132     s->set_port = NULL;
133 }
134
135 /* Called from main context */
136 pa_source* pa_source_new(
137         pa_core *core,
138         pa_source_new_data *data,
139         pa_source_flags_t flags) {
140
141     pa_source *s;
142     const char *name;
143     char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
144     char *pt;
145
146     pa_assert(core);
147     pa_assert(data);
148     pa_assert(data->name);
149     pa_assert_ctl_context();
150
151     s = pa_msgobject_new(pa_source);
152
153     if (!(name = pa_namereg_register(core, data->name, PA_NAMEREG_SOURCE, s, data->namereg_fail))) {
154         pa_log_debug("Failed to register name %s.", data->name);
155         pa_xfree(s);
156         return NULL;
157     }
158
159     pa_source_new_data_set_name(data, name);
160
161     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_NEW], data) < 0) {
162         pa_xfree(s);
163         pa_namereg_unregister(core, name);
164         return NULL;
165     }
166
167     /* FIXME, need to free s here on failure */
168
169     pa_return_null_if_fail(!data->driver || pa_utf8_valid(data->driver));
170     pa_return_null_if_fail(data->name && pa_utf8_valid(data->name) && data->name[0]);
171
172     pa_return_null_if_fail(data->sample_spec_is_set && pa_sample_spec_valid(&data->sample_spec));
173
174     if (!data->channel_map_is_set)
175         pa_return_null_if_fail(pa_channel_map_init_auto(&data->channel_map, data->sample_spec.channels, PA_CHANNEL_MAP_DEFAULT));
176
177     pa_return_null_if_fail(pa_channel_map_valid(&data->channel_map));
178     pa_return_null_if_fail(data->channel_map.channels == data->sample_spec.channels);
179
180     if (!data->volume_is_set)
181         pa_cvolume_reset(&data->volume, data->sample_spec.channels);
182
183     pa_return_null_if_fail(pa_cvolume_valid(&data->volume));
184     pa_return_null_if_fail(data->volume.channels == data->sample_spec.channels);
185
186     if (!data->muted_is_set)
187         data->muted = FALSE;
188
189     if (data->card)
190         pa_proplist_update(data->proplist, PA_UPDATE_MERGE, data->card->proplist);
191
192     pa_device_init_description(data->proplist);
193     pa_device_init_icon(data->proplist, FALSE);
194     pa_device_init_intended_roles(data->proplist);
195
196     if (pa_hook_fire(&core->hooks[PA_CORE_HOOK_SOURCE_FIXATE], data) < 0) {
197         pa_xfree(s);
198         pa_namereg_unregister(core, name);
199         return NULL;
200     }
201
202     s->parent.parent.free = source_free;
203     s->parent.process_msg = pa_source_process_msg;
204
205     s->core = core;
206     s->state = PA_SOURCE_INIT;
207     s->flags = flags;
208     s->suspend_cause = 0;
209     s->name = pa_xstrdup(name);
210     s->proplist = pa_proplist_copy(data->proplist);
211     s->driver = pa_xstrdup(pa_path_get_filename(data->driver));
212     s->module = data->module;
213     s->card = data->card;
214
215     s->sample_spec = data->sample_spec;
216     s->channel_map = data->channel_map;
217
218     s->outputs = pa_idxset_new(NULL, NULL);
219     s->n_corked = 0;
220     s->monitor_of = NULL;
221
222     s->virtual_volume = data->volume;
223     pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
224     s->base_volume = PA_VOLUME_NORM;
225     s->n_volume_steps = PA_VOLUME_NORM+1;
226     s->muted = data->muted;
227     s->refresh_volume = s->refresh_muted = FALSE;
228
229     s->fixed_latency = flags & PA_SOURCE_DYNAMIC_LATENCY ? 0 : DEFAULT_FIXED_LATENCY;
230
231     reset_callbacks(s);
232     s->userdata = NULL;
233
234     s->asyncmsgq = NULL;
235
236     /* As a minor optimization we just steal the list instead of
237      * copying it here */
238     s->ports = data->ports;
239     data->ports = NULL;
240
241     s->active_port = NULL;
242     s->save_port = FALSE;
243
244     if (data->active_port && s->ports)
245         if ((s->active_port = pa_hashmap_get(s->ports, data->active_port)))
246             s->save_port = data->save_port;
247
248     if (!s->active_port && s->ports) {
249         void *state;
250         pa_device_port *p;
251
252         PA_HASHMAP_FOREACH(p, s->ports, state)
253             if (!s->active_port || p->priority > s->active_port->priority)
254                 s->active_port = p;
255     }
256
257     s->save_volume = data->save_volume;
258     s->save_muted = data->save_muted;
259
260     pa_silence_memchunk_get(
261             &core->silence_cache,
262             core->mempool,
263             &s->silence,
264             &s->sample_spec,
265             0);
266
267     s->thread_info.rtpoll = NULL;
268     s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
269     s->thread_info.soft_volume = s->soft_volume;
270     s->thread_info.soft_muted = s->muted;
271     s->thread_info.state = s->state;
272     s->thread_info.max_rewind = 0;
273     s->thread_info.requested_latency_valid = FALSE;
274     s->thread_info.requested_latency = 0;
275     s->thread_info.min_latency = ABSOLUTE_MIN_LATENCY;
276     s->thread_info.max_latency = ABSOLUTE_MAX_LATENCY;
277
278     pa_assert_se(pa_idxset_put(core->sources, s, &s->index) >= 0);
279
280     if (s->card)
281         pa_assert_se(pa_idxset_put(s->card->sources, s, NULL) >= 0);
282
283     pt = pa_proplist_to_string_sep(s->proplist, "\n    ");
284     pa_log_info("Created source %u \"%s\" with sample spec %s and channel map %s\n    %s",
285                 s->index,
286                 s->name,
287                 pa_sample_spec_snprint(st, sizeof(st), &s->sample_spec),
288                 pa_channel_map_snprint(cm, sizeof(cm), &s->channel_map),
289                 pt);
290     pa_xfree(pt);
291
292     return s;
293 }
294
295 /* Called from main context */
296 static int source_set_state(pa_source *s, pa_source_state_t state) {
297     int ret;
298     pa_bool_t suspend_change;
299     pa_source_state_t original_state;
300
301     pa_assert(s);
302     pa_assert_ctl_context();
303
304     if (s->state == state)
305         return 0;
306
307     original_state = s->state;
308
309     suspend_change =
310         (original_state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(state)) ||
311         (PA_SOURCE_IS_OPENED(original_state) && state == PA_SOURCE_SUSPENDED);
312
313     if (s->set_state)
314         if ((ret = s->set_state(s, state)) < 0)
315             return ret;
316
317     if (s->asyncmsgq)
318         if ((ret = pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL)) < 0) {
319
320             if (s->set_state)
321                 s->set_state(s, original_state);
322
323             return ret;
324         }
325
326     s->state = state;
327
328     if (state != PA_SOURCE_UNLINKED) { /* if we enter UNLINKED state pa_source_unlink() will fire the apropriate events */
329         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_STATE_CHANGED], s);
330         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
331     }
332
333     if (suspend_change) {
334         pa_source_output *o;
335         uint32_t idx;
336
337         /* We're suspending or resuming, tell everyone about it */
338
339         PA_IDXSET_FOREACH(o, s->outputs, idx)
340             if (s->state == PA_SOURCE_SUSPENDED &&
341                 (o->flags & PA_SOURCE_OUTPUT_KILL_ON_SUSPEND))
342                 pa_source_output_kill(o);
343             else if (o->suspend)
344                 o->suspend(o, state == PA_SOURCE_SUSPENDED);
345     }
346
347     return 0;
348 }
349
350 /* Called from main context */
351 void pa_source_put(pa_source *s) {
352     pa_source_assert_ref(s);
353     pa_assert_ctl_context();
354
355     pa_assert(s->state == PA_SOURCE_INIT);
356
357     /* The following fields must be initialized properly when calling _put() */
358     pa_assert(s->asyncmsgq);
359     pa_assert(s->thread_info.min_latency <= s->thread_info.max_latency);
360
361     /* Generally, flags should be initialized via pa_source_new(). As
362      * a special exception we allow volume related flags to be set
363      * between _new() and _put(). */
364
365     if (!(s->flags & PA_SOURCE_HW_VOLUME_CTRL))
366         s->flags |= PA_SOURCE_DECIBEL_VOLUME;
367
368     s->thread_info.soft_volume = s->soft_volume;
369     s->thread_info.soft_muted = s->muted;
370
371     pa_assert((s->flags & PA_SOURCE_HW_VOLUME_CTRL) || (s->base_volume == PA_VOLUME_NORM && s->flags & PA_SOURCE_DECIBEL_VOLUME));
372     pa_assert(!(s->flags & PA_SOURCE_DECIBEL_VOLUME) || s->n_volume_steps == PA_VOLUME_NORM+1);
373     pa_assert(!(s->flags & PA_SOURCE_DYNAMIC_LATENCY) == (s->fixed_latency != 0));
374
375     pa_assert_se(source_set_state(s, PA_SOURCE_IDLE) == 0);
376
377     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index);
378     pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PUT], s);
379 }
380
381 /* Called from main context */
382 void pa_source_unlink(pa_source *s) {
383     pa_bool_t linked;
384     pa_source_output *o, *j = NULL;
385
386     pa_assert(s);
387     pa_assert_ctl_context();
388
389     /* See pa_sink_unlink() for a couple of comments how this function
390      * works. */
391
392     linked = PA_SOURCE_IS_LINKED(s->state);
393
394     if (linked)
395         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], s);
396
397     if (s->state != PA_SOURCE_UNLINKED)
398         pa_namereg_unregister(s->core, s->name);
399     pa_idxset_remove_by_data(s->core->sources, s, NULL);
400
401     if (s->card)
402         pa_idxset_remove_by_data(s->card->sources, s, NULL);
403
404     while ((o = pa_idxset_first(s->outputs, NULL))) {
405         pa_assert(o != j);
406         pa_source_output_kill(o);
407         j = o;
408     }
409
410     if (linked)
411         source_set_state(s, PA_SOURCE_UNLINKED);
412     else
413         s->state = PA_SOURCE_UNLINKED;
414
415     reset_callbacks(s);
416
417     if (linked) {
418         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
419         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK_POST], s);
420     }
421 }
422
423 /* Called from main context */
424 static void source_free(pa_object *o) {
425     pa_source_output *so;
426     pa_source *s = PA_SOURCE(o);
427
428     pa_assert(s);
429     pa_assert_ctl_context();
430     pa_assert(pa_source_refcnt(s) == 0);
431
432     if (PA_SOURCE_IS_LINKED(s->state))
433         pa_source_unlink(s);
434
435     pa_log_info("Freeing source %u \"%s\"", s->index, s->name);
436
437     pa_idxset_free(s->outputs, NULL, NULL);
438
439     while ((so = pa_hashmap_steal_first(s->thread_info.outputs)))
440         pa_source_output_unref(so);
441
442     pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
443
444     if (s->silence.memblock)
445         pa_memblock_unref(s->silence.memblock);
446
447     pa_xfree(s->name);
448     pa_xfree(s->driver);
449
450     if (s->proplist)
451         pa_proplist_free(s->proplist);
452
453     if (s->ports) {
454         pa_device_port *p;
455
456         while ((p = pa_hashmap_steal_first(s->ports)))
457             pa_device_port_free(p);
458
459         pa_hashmap_free(s->ports, NULL, NULL);
460     }
461
462     pa_xfree(s);
463 }
464
465 /* Called from main context */
466 void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
467     pa_source_assert_ref(s);
468     pa_assert_ctl_context();
469
470     s->asyncmsgq = q;
471 }
472
473 /* Called from main context */
474 void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
475     pa_source_assert_ref(s);
476     pa_source_assert_io_context(s);
477
478     s->thread_info.rtpoll = p;
479 }
480
481 /* Called from main context */
482 int pa_source_update_status(pa_source*s) {
483     pa_source_assert_ref(s);
484     pa_assert_ctl_context();
485     pa_assert(PA_SOURCE_IS_LINKED(s->state));
486
487     if (s->state == PA_SOURCE_SUSPENDED)
488         return 0;
489
490     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
491 }
492
493 /* Called from main context */
494 int pa_source_suspend(pa_source *s, pa_bool_t suspend, pa_suspend_cause_t cause) {
495     pa_source_assert_ref(s);
496     pa_assert_ctl_context();
497     pa_assert(PA_SOURCE_IS_LINKED(s->state));
498     pa_assert(cause != 0);
499
500     if (s->monitor_of)
501         return -PA_ERR_NOTSUPPORTED;
502
503     if (suspend)
504         s->suspend_cause |= cause;
505     else
506         s->suspend_cause &= ~cause;
507
508     if ((pa_source_get_state(s) == PA_SOURCE_SUSPENDED) == !!s->suspend_cause)
509         return 0;
510
511     pa_log_debug("Suspend cause of source %s is 0x%04x, %s", s->name, s->suspend_cause, s->suspend_cause ? "suspending" : "resuming");
512
513     if (suspend)
514         return source_set_state(s, PA_SOURCE_SUSPENDED);
515     else
516         return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
517 }
518
519 /* Called from main context */
520 int pa_source_sync_suspend(pa_source *s) {
521     pa_sink_state_t state;
522
523     pa_source_assert_ref(s);
524     pa_assert_ctl_context();
525     pa_assert(PA_SOURCE_IS_LINKED(s->state));
526     pa_assert(s->monitor_of);
527
528     state = pa_sink_get_state(s->monitor_of);
529
530     if (state == PA_SINK_SUSPENDED)
531         return source_set_state(s, PA_SOURCE_SUSPENDED);
532
533     pa_assert(PA_SINK_IS_OPENED(state));
534
535     return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
536 }
537
538 /* Called from main context */
539 pa_queue *pa_source_move_all_start(pa_source *s, pa_queue *q) {
540     pa_source_output *o, *n;
541     uint32_t idx;
542
543     pa_source_assert_ref(s);
544     pa_assert_ctl_context();
545     pa_assert(PA_SOURCE_IS_LINKED(s->state));
546
547     if (!q)
548         q = pa_queue_new();
549
550     for (o = PA_SOURCE_OUTPUT(pa_idxset_first(s->outputs, &idx)); o; o = n) {
551         n = PA_SOURCE_OUTPUT(pa_idxset_next(s->outputs, &idx));
552
553         pa_source_output_ref(o);
554
555         if (pa_source_output_start_move(o) >= 0)
556             pa_queue_push(q, o);
557         else
558             pa_source_output_unref(o);
559     }
560
561     return q;
562 }
563
564 /* Called from main context */
565 void pa_source_move_all_finish(pa_source *s, pa_queue *q, pa_bool_t save) {
566     pa_source_output *o;
567
568     pa_source_assert_ref(s);
569     pa_assert_ctl_context();
570     pa_assert(PA_SOURCE_IS_LINKED(s->state));
571     pa_assert(q);
572
573     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
574         if (pa_source_output_finish_move(o, s, save) < 0)
575             pa_source_output_fail_move(o);
576
577         pa_source_output_unref(o);
578     }
579
580     pa_queue_free(q, NULL, NULL);
581 }
582
583 /* Called from main context */
584 void pa_source_move_all_fail(pa_queue *q) {
585     pa_source_output *o;
586
587     pa_assert_ctl_context();
588     pa_assert(q);
589
590     while ((o = PA_SOURCE_OUTPUT(pa_queue_pop(q)))) {
591         pa_source_output_fail_move(o);
592         pa_source_output_unref(o);
593     }
594
595     pa_queue_free(q, NULL, NULL);
596 }
597
598 /* Called from IO thread context */
599 void pa_source_process_rewind(pa_source *s, size_t nbytes) {
600     pa_source_output *o;
601     void *state = NULL;
602
603     pa_source_assert_ref(s);
604     pa_source_assert_io_context(s);
605     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
606
607     if (nbytes <= 0)
608         return;
609
610     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
611         return;
612
613     pa_log_debug("Processing rewind...");
614
615     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state) {
616         pa_source_output_assert_ref(o);
617         pa_source_output_process_rewind(o, nbytes);
618     }
619 }
620
621 /* Called from IO thread context */
622 void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
623     pa_source_output *o;
624     void *state = NULL;
625
626     pa_source_assert_ref(s);
627     pa_source_assert_io_context(s);
628     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
629     pa_assert(chunk);
630
631     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
632         return;
633
634     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
635         pa_memchunk vchunk = *chunk;
636
637         pa_memblock_ref(vchunk.memblock);
638         pa_memchunk_make_writable(&vchunk, 0);
639
640         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
641             pa_silence_memchunk(&vchunk, &s->sample_spec);
642         else
643             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
644
645         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
646             pa_source_output_assert_ref(o);
647
648             if (!o->thread_info.direct_on_input)
649                 pa_source_output_push(o, &vchunk);
650         }
651
652         pa_memblock_unref(vchunk.memblock);
653     } else {
654
655         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
656             pa_source_output_assert_ref(o);
657
658             if (!o->thread_info.direct_on_input)
659                 pa_source_output_push(o, chunk);
660         }
661     }
662 }
663
664 /* Called from IO thread context */
665 void pa_source_post_direct(pa_source*s, pa_source_output *o, const pa_memchunk *chunk) {
666     pa_source_assert_ref(s);
667     pa_source_assert_io_context(s);
668     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
669     pa_source_output_assert_ref(o);
670     pa_assert(o->thread_info.direct_on_input);
671     pa_assert(chunk);
672
673     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
674         return;
675
676     if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&s->thread_info.soft_volume)) {
677         pa_memchunk vchunk = *chunk;
678
679         pa_memblock_ref(vchunk.memblock);
680         pa_memchunk_make_writable(&vchunk, 0);
681
682         if (s->thread_info.soft_muted || pa_cvolume_is_muted(&s->thread_info.soft_volume))
683             pa_silence_memchunk(&vchunk, &s->sample_spec);
684         else
685             pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
686
687         pa_source_output_push(o, &vchunk);
688
689         pa_memblock_unref(vchunk.memblock);
690     } else
691         pa_source_output_push(o, chunk);
692 }
693
694 /* Called from main thread */
695 pa_usec_t pa_source_get_latency(pa_source *s) {
696     pa_usec_t usec;
697
698     pa_source_assert_ref(s);
699     pa_assert_ctl_context();
700     pa_assert(PA_SOURCE_IS_LINKED(s->state));
701
702     if (s->state == PA_SOURCE_SUSPENDED)
703         return 0;
704
705     if (!(s->flags & PA_SOURCE_LATENCY))
706         return 0;
707
708     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) == 0);
709
710     return usec;
711 }
712
713 /* Called from IO thread */
714 pa_usec_t pa_source_get_latency_within_thread(pa_source *s) {
715     pa_usec_t usec = 0;
716     pa_msgobject *o;
717
718     pa_source_assert_ref(s);
719     pa_source_assert_io_context(s);
720     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
721
722     /* The returned value is supposed to be in the time domain of the sound card! */
723
724     if (s->thread_info.state == PA_SOURCE_SUSPENDED)
725         return 0;
726
727     if (!(s->flags & PA_SOURCE_LATENCY))
728         return 0;
729
730     o = PA_MSGOBJECT(s);
731
732     /* We probably should make this a proper vtable callback instead of going through process_msg() */
733
734     if (o->process_msg(o, PA_SOURCE_MESSAGE_GET_LATENCY, &usec, 0, NULL) < 0)
735         return -1;
736
737     return usec;
738 }
739
740 /* Called from main thread */
741 void pa_source_set_volume(pa_source *s, const pa_cvolume *volume, pa_bool_t save) {
742     pa_cvolume old_virtual_volume;
743     pa_bool_t virtual_volume_changed;
744
745     pa_source_assert_ref(s);
746     pa_assert_ctl_context();
747     pa_assert(PA_SOURCE_IS_LINKED(s->state));
748     pa_assert(volume);
749     pa_assert(pa_cvolume_valid(volume));
750     pa_assert(pa_cvolume_compatible(volume, &s->sample_spec));
751
752     old_virtual_volume = s->virtual_volume;
753     s->virtual_volume = *volume;
754     virtual_volume_changed = !pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume);
755     s->save_volume = (!virtual_volume_changed && s->save_volume) || save;
756
757     if (s->set_volume) {
758         pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
759         s->set_volume(s);
760     } else
761         s->soft_volume = s->virtual_volume;
762
763     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
764
765     if (virtual_volume_changed)
766         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
767 }
768
769 /* Called from main thread. Only to be called by source implementor */
770 void pa_source_set_soft_volume(pa_source *s, const pa_cvolume *volume) {
771     pa_source_assert_ref(s);
772     pa_assert_ctl_context();
773     pa_assert(volume);
774
775     if (PA_SOURCE_IS_LINKED(s->state))
776         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_VOLUME, NULL, 0, NULL) == 0);
777     else
778         s->thread_info.soft_volume = *volume;
779 }
780
781 /* Called from main thread */
782 const pa_cvolume *pa_source_get_volume(pa_source *s, pa_bool_t force_refresh) {
783     pa_source_assert_ref(s);
784     pa_assert_ctl_context();
785     pa_assert(PA_SOURCE_IS_LINKED(s->state));
786
787     if (s->refresh_volume || force_refresh) {
788         pa_cvolume old_virtual_volume = s->virtual_volume;
789
790         if (s->get_volume)
791             s->get_volume(s);
792
793         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_VOLUME, NULL, 0, NULL) == 0);
794
795         if (!pa_cvolume_equal(&old_virtual_volume, &s->virtual_volume)) {
796             s->save_volume = TRUE;
797             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
798         }
799     }
800
801     return &s->virtual_volume;
802 }
803
804 /* Called from main thread */
805 void pa_source_volume_changed(pa_source *s, const pa_cvolume *new_volume) {
806     pa_source_assert_ref(s);
807     pa_assert_ctl_context();
808     pa_assert(PA_SOURCE_IS_LINKED(s->state));
809
810     /* The source implementor may call this if the volume changed to make sure everyone is notified */
811
812     if (pa_cvolume_equal(&s->virtual_volume, new_volume))
813         return;
814
815     s->virtual_volume = *new_volume;
816     s->save_volume = TRUE;
817
818     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
819 }
820
821 /* Called from main thread */
822 void pa_source_set_mute(pa_source *s, pa_bool_t mute, pa_bool_t save) {
823     pa_bool_t old_muted;
824
825     pa_source_assert_ref(s);
826     pa_assert_ctl_context();
827     pa_assert(PA_SOURCE_IS_LINKED(s->state));
828
829     old_muted = s->muted;
830     s->muted = mute;
831     s->save_muted = (old_muted == s->muted && s->save_muted) || save;
832
833     if (s->set_mute)
834         s->set_mute(s);
835
836     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
837
838     if (old_muted != s->muted)
839         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
840 }
841
842 /* Called from main thread */
843 pa_bool_t pa_source_get_mute(pa_source *s, pa_bool_t force_refresh) {
844     pa_source_assert_ref(s);
845     pa_assert_ctl_context();
846     pa_assert(PA_SOURCE_IS_LINKED(s->state));
847
848     if (s->refresh_muted || force_refresh) {
849         pa_bool_t old_muted = s->muted;
850
851         if (s->get_mute)
852             s->get_mute(s);
853
854         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MUTE, NULL, 0, NULL) == 0);
855
856         if (old_muted != s->muted) {
857             s->save_muted = TRUE;
858
859             pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
860
861             /* Make sure the soft mute status stays in sync */
862             pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MUTE, NULL, 0, NULL) == 0);
863         }
864     }
865
866     return s->muted;
867 }
868
869 /* Called from main thread */
870 void pa_source_mute_changed(pa_source *s, pa_bool_t new_muted) {
871     pa_source_assert_ref(s);
872     pa_assert_ctl_context();
873     pa_assert(PA_SOURCE_IS_LINKED(s->state));
874
875     /* The source implementor may call this if the mute state changed to make sure everyone is notified */
876
877     if (s->muted == new_muted)
878         return;
879
880     s->muted = new_muted;
881     s->save_muted = TRUE;
882
883     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
884 }
885
886 /* Called from main thread */
887 pa_bool_t pa_source_update_proplist(pa_source *s, pa_update_mode_t mode, pa_proplist *p) {
888     pa_source_assert_ref(s);
889     pa_assert_ctl_context();
890
891     if (p)
892         pa_proplist_update(s->proplist, mode, p);
893
894     if (PA_SOURCE_IS_LINKED(s->state)) {
895         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
896         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
897     }
898
899     return TRUE;
900 }
901
902 /* Called from main thread */
903 /* FIXME -- this should be dropped and be merged into pa_source_update_proplist() */
904 void pa_source_set_description(pa_source *s, const char *description) {
905     const char *old;
906     pa_source_assert_ref(s);
907     pa_assert_ctl_context();
908
909     if (!description && !pa_proplist_contains(s->proplist, PA_PROP_DEVICE_DESCRIPTION))
910         return;
911
912     old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
913
914     if (old && description && pa_streq(old, description))
915         return;
916
917     if (description)
918         pa_proplist_sets(s->proplist, PA_PROP_DEVICE_DESCRIPTION, description);
919     else
920         pa_proplist_unset(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
921
922     if (PA_SOURCE_IS_LINKED(s->state)) {
923         pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
924         pa_hook_fire(&s->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], s);
925     }
926 }
927
928 /* Called from main thread */
929 unsigned pa_source_linked_by(pa_source *s) {
930     pa_source_assert_ref(s);
931     pa_assert(PA_SOURCE_IS_LINKED(s->state));
932     pa_assert_ctl_context();
933
934     return pa_idxset_size(s->outputs);
935 }
936
937 /* Called from main thread */
938 unsigned pa_source_used_by(pa_source *s) {
939     unsigned ret;
940
941     pa_source_assert_ref(s);
942     pa_assert(PA_SOURCE_IS_LINKED(s->state));
943     pa_assert_ctl_context();
944
945     ret = pa_idxset_size(s->outputs);
946     pa_assert(ret >= s->n_corked);
947
948     return ret - s->n_corked;
949 }
950
951 /* Called from main thread */
952 unsigned pa_source_check_suspend(pa_source *s) {
953     unsigned ret;
954     pa_source_output *o;
955     uint32_t idx;
956
957     pa_source_assert_ref(s);
958     pa_assert_ctl_context();
959
960     if (!PA_SOURCE_IS_LINKED(s->state))
961         return 0;
962
963     ret = 0;
964
965     PA_IDXSET_FOREACH(o, s->outputs, idx) {
966         pa_source_output_state_t st;
967
968         st = pa_source_output_get_state(o);
969         pa_assert(PA_SOURCE_OUTPUT_IS_LINKED(st));
970
971         if (st == PA_SOURCE_OUTPUT_CORKED)
972             continue;
973
974         if (o->flags & PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND)
975             continue;
976
977         ret ++;
978     }
979
980     return ret;
981 }
982
983 /* Called from IO thread, except when it is not */
984 int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
985     pa_source *s = PA_SOURCE(object);
986     pa_source_assert_ref(s);
987
988     switch ((pa_source_message_t) code) {
989
990         case PA_SOURCE_MESSAGE_ADD_OUTPUT: {
991             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
992
993             pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
994
995             if (o->direct_on_input) {
996                 o->thread_info.direct_on_input = o->direct_on_input;
997                 pa_hashmap_put(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index), o);
998             }
999
1000             pa_assert(!o->thread_info.attached);
1001             o->thread_info.attached = TRUE;
1002
1003             if (o->attach)
1004                 o->attach(o);
1005
1006             pa_source_output_set_state_within_thread(o, o->state);
1007
1008             if (o->thread_info.requested_source_latency != (pa_usec_t) -1)
1009                 pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
1010
1011             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1012
1013             /* We don't just invalidate the requested latency here,
1014              * because if we are in a move we might need to fix up the
1015              * requested latency. */
1016             pa_source_output_set_requested_latency_within_thread(o, o->thread_info.requested_source_latency);
1017
1018             return 0;
1019         }
1020
1021         case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
1022             pa_source_output *o = PA_SOURCE_OUTPUT(userdata);
1023
1024             pa_source_output_set_state_within_thread(o, o->state);
1025
1026             if (o->detach)
1027                 o->detach(o);
1028
1029             pa_assert(o->thread_info.attached);
1030             o->thread_info.attached = FALSE;
1031
1032             if (o->thread_info.direct_on_input) {
1033                 pa_hashmap_remove(o->thread_info.direct_on_input->thread_info.direct_outputs, PA_UINT32_TO_PTR(o->index));
1034                 o->thread_info.direct_on_input = NULL;
1035             }
1036
1037             if (pa_hashmap_remove(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index)))
1038                 pa_source_output_unref(o);
1039
1040             pa_source_invalidate_requested_latency(s);
1041
1042             return 0;
1043         }
1044
1045         case PA_SOURCE_MESSAGE_SET_VOLUME:
1046             s->thread_info.soft_volume = s->soft_volume;
1047             return 0;
1048
1049         case PA_SOURCE_MESSAGE_GET_VOLUME:
1050             return 0;
1051
1052         case PA_SOURCE_MESSAGE_SET_MUTE:
1053             s->thread_info.soft_muted = s->muted;
1054             return 0;
1055
1056         case PA_SOURCE_MESSAGE_GET_MUTE:
1057             return 0;
1058
1059         case PA_SOURCE_MESSAGE_SET_STATE: {
1060
1061             pa_bool_t suspend_change =
1062                 (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
1063                 (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
1064
1065             s->thread_info.state = PA_PTR_TO_UINT(userdata);
1066
1067             if (suspend_change) {
1068                 pa_source_output *o;
1069                 void *state = NULL;
1070
1071                 while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1072                     if (o->suspend_within_thread)
1073                         o->suspend_within_thread(o, s->thread_info.state == PA_SOURCE_SUSPENDED);
1074             }
1075
1076
1077             return 0;
1078         }
1079
1080         case PA_SOURCE_MESSAGE_DETACH:
1081
1082             /* Detach all streams */
1083             pa_source_detach_within_thread(s);
1084             return 0;
1085
1086         case PA_SOURCE_MESSAGE_ATTACH:
1087
1088             /* Reattach all streams */
1089             pa_source_attach_within_thread(s);
1090             return 0;
1091
1092         case PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY: {
1093
1094             pa_usec_t *usec = userdata;
1095             *usec = pa_source_get_requested_latency_within_thread(s);
1096
1097             if (*usec == (pa_usec_t) -1)
1098                 *usec = s->thread_info.max_latency;
1099
1100             return 0;
1101         }
1102
1103         case PA_SOURCE_MESSAGE_SET_LATENCY_RANGE: {
1104             pa_usec_t *r = userdata;
1105
1106             pa_source_set_latency_range_within_thread(s, r[0], r[1]);
1107
1108             return 0;
1109         }
1110
1111         case PA_SOURCE_MESSAGE_GET_LATENCY_RANGE: {
1112             pa_usec_t *r = userdata;
1113
1114             r[0] = s->thread_info.min_latency;
1115             r[1] = s->thread_info.max_latency;
1116
1117             return 0;
1118         }
1119
1120         case PA_SOURCE_MESSAGE_GET_MAX_REWIND:
1121
1122             *((size_t*) userdata) = s->thread_info.max_rewind;
1123             return 0;
1124
1125         case PA_SOURCE_MESSAGE_SET_MAX_REWIND:
1126
1127             pa_source_set_max_rewind_within_thread(s, (size_t) offset);
1128             return 0;
1129
1130         case PA_SOURCE_MESSAGE_GET_LATENCY:
1131
1132             if (s->monitor_of) {
1133                 *((pa_usec_t*) userdata) = 0;
1134                 return 0;
1135             }
1136
1137             /* Implementors need to overwrite this implementation! */
1138             return -1;
1139
1140         case PA_SOURCE_MESSAGE_MAX:
1141             ;
1142     }
1143
1144     return -1;
1145 }
1146
1147 /* Called from main thread */
1148 int pa_source_suspend_all(pa_core *c, pa_bool_t suspend, pa_suspend_cause_t cause) {
1149     uint32_t idx;
1150     pa_source *source;
1151     int ret = 0;
1152
1153     pa_core_assert_ref(c);
1154     pa_assert_ctl_context();
1155     pa_assert(cause != 0);
1156
1157     for (source = PA_SOURCE(pa_idxset_first(c->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(c->sources, &idx))) {
1158         int r;
1159
1160         if (source->monitor_of)
1161             continue;
1162
1163         if ((r = pa_source_suspend(source, suspend, cause)) < 0)
1164             ret = r;
1165     }
1166
1167     return ret;
1168 }
1169
1170 /* Called from main thread */
1171 void pa_source_detach(pa_source *s) {
1172     pa_source_assert_ref(s);
1173     pa_assert_ctl_context();
1174     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1175
1176     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_DETACH, NULL, 0, NULL) == 0);
1177 }
1178
1179 /* Called from main thread */
1180 void pa_source_attach(pa_source *s) {
1181     pa_source_assert_ref(s);
1182     pa_assert_ctl_context();
1183     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1184
1185     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_ATTACH, NULL, 0, NULL) == 0);
1186 }
1187
1188 /* Called from IO thread */
1189 void pa_source_detach_within_thread(pa_source *s) {
1190     pa_source_output *o;
1191     void *state = NULL;
1192
1193     pa_source_assert_ref(s);
1194     pa_source_assert_io_context(s);
1195     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1196
1197     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1198         if (o->detach)
1199             o->detach(o);
1200 }
1201
1202 /* Called from IO thread */
1203 void pa_source_attach_within_thread(pa_source *s) {
1204     pa_source_output *o;
1205     void *state = NULL;
1206
1207     pa_source_assert_ref(s);
1208     pa_source_assert_io_context(s);
1209     pa_assert(PA_SOURCE_IS_LINKED(s->thread_info.state));
1210
1211     PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1212         if (o->attach)
1213             o->attach(o);
1214 }
1215
1216 /* Called from IO thread */
1217 pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
1218     pa_usec_t result = (pa_usec_t) -1;
1219     pa_source_output *o;
1220     void *state = NULL;
1221
1222     pa_source_assert_ref(s);
1223     pa_source_assert_io_context(s);
1224
1225     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1226         return PA_CLAMP(s->fixed_latency, s->thread_info.min_latency, s->thread_info.max_latency);
1227
1228     if (s->thread_info.requested_latency_valid)
1229         return s->thread_info.requested_latency;
1230
1231     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1232
1233         if (o->thread_info.requested_source_latency != (pa_usec_t) -1 &&
1234             (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
1235             result = o->thread_info.requested_source_latency;
1236
1237     if (result != (pa_usec_t) -1)
1238         result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency);
1239
1240     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1241         /* Only cache this if we are fully set up */
1242         s->thread_info.requested_latency = result;
1243         s->thread_info.requested_latency_valid = TRUE;
1244     }
1245
1246     return result;
1247 }
1248
1249 /* Called from main thread */
1250 pa_usec_t pa_source_get_requested_latency(pa_source *s) {
1251     pa_usec_t usec = 0;
1252
1253     pa_source_assert_ref(s);
1254     pa_assert_ctl_context();
1255     pa_assert(PA_SOURCE_IS_LINKED(s->state));
1256
1257     if (s->state == PA_SOURCE_SUSPENDED)
1258         return 0;
1259
1260     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, &usec, 0, NULL) == 0);
1261
1262     return usec;
1263 }
1264
1265 /* Called from IO thread */
1266 void pa_source_set_max_rewind_within_thread(pa_source *s, size_t max_rewind) {
1267     pa_source_output *o;
1268     void *state = NULL;
1269
1270     pa_source_assert_ref(s);
1271     pa_source_assert_io_context(s);
1272
1273     if (max_rewind == s->thread_info.max_rewind)
1274         return;
1275
1276     s->thread_info.max_rewind = max_rewind;
1277
1278     if (PA_SOURCE_IS_LINKED(s->thread_info.state))
1279         PA_HASHMAP_FOREACH(o, s->thread_info.outputs, state)
1280             pa_source_output_update_max_rewind(o, s->thread_info.max_rewind);
1281 }
1282
1283 /* Called from main thread */
1284 void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
1285     pa_source_assert_ref(s);
1286     pa_assert_ctl_context();
1287
1288     if (PA_SOURCE_IS_LINKED(s->state))
1289         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_MAX_REWIND, NULL, max_rewind, NULL) == 0);
1290     else
1291         pa_source_set_max_rewind_within_thread(s, max_rewind);
1292 }
1293
1294 /* Called from IO thread */
1295 void pa_source_invalidate_requested_latency(pa_source *s) {
1296     pa_source_output *o;
1297     void *state = NULL;
1298
1299     pa_source_assert_ref(s);
1300     pa_source_assert_io_context(s);
1301
1302     if (!(s->flags & PA_SOURCE_DYNAMIC_LATENCY))
1303         return;
1304
1305     s->thread_info.requested_latency_valid = FALSE;
1306
1307     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1308
1309         if (s->update_requested_latency)
1310             s->update_requested_latency(s);
1311
1312         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1313             if (o->update_source_requested_latency)
1314                 o->update_source_requested_latency(o);
1315     }
1316
1317     if (s->monitor_of)
1318         pa_sink_invalidate_requested_latency(s->monitor_of);
1319 }
1320
1321 /* Called from main thread */
1322 void pa_source_set_latency_range(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1323     pa_source_assert_ref(s);
1324     pa_assert_ctl_context();
1325
1326     /* min_latency == 0:           no limit
1327      * min_latency anything else:  specified limit
1328      *
1329      * Similar for max_latency */
1330
1331     if (min_latency < ABSOLUTE_MIN_LATENCY)
1332         min_latency = ABSOLUTE_MIN_LATENCY;
1333
1334     if (max_latency <= 0 ||
1335         max_latency > ABSOLUTE_MAX_LATENCY)
1336         max_latency = ABSOLUTE_MAX_LATENCY;
1337
1338     pa_assert(min_latency <= max_latency);
1339
1340     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1341     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1342                max_latency == ABSOLUTE_MAX_LATENCY) ||
1343               (s->flags & PA_SOURCE_DYNAMIC_LATENCY));
1344
1345     if (PA_SOURCE_IS_LINKED(s->state)) {
1346         pa_usec_t r[2];
1347
1348         r[0] = min_latency;
1349         r[1] = max_latency;
1350
1351         pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_SET_LATENCY_RANGE, r, 0, NULL) == 0);
1352     } else
1353         pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
1354 }
1355
1356 /* Called from main thread */
1357 void pa_source_get_latency_range(pa_source *s, pa_usec_t *min_latency, pa_usec_t *max_latency) {
1358    pa_source_assert_ref(s);
1359    pa_assert_ctl_context();
1360    pa_assert(min_latency);
1361    pa_assert(max_latency);
1362
1363    if (PA_SOURCE_IS_LINKED(s->state)) {
1364        pa_usec_t r[2] = { 0, 0 };
1365
1366        pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_LATENCY_RANGE, r, 0, NULL) == 0);
1367
1368        *min_latency = r[0];
1369        *max_latency = r[1];
1370    } else {
1371        *min_latency = s->thread_info.min_latency;
1372        *max_latency = s->thread_info.max_latency;
1373    }
1374 }
1375
1376 /* Called from IO thread, and from main thread before pa_source_put() is called */
1377 void pa_source_set_latency_range_within_thread(pa_source *s, pa_usec_t min_latency, pa_usec_t max_latency) {
1378     void *state = NULL;
1379
1380     pa_source_assert_ref(s);
1381     pa_source_assert_io_context(s);
1382
1383     pa_assert(min_latency >= ABSOLUTE_MIN_LATENCY);
1384     pa_assert(max_latency <= ABSOLUTE_MAX_LATENCY);
1385     pa_assert(min_latency <= max_latency);
1386
1387     /* Hmm, let's see if someone forgot to set PA_SOURCE_DYNAMIC_LATENCY here... */
1388     pa_assert((min_latency == ABSOLUTE_MIN_LATENCY &&
1389                max_latency == ABSOLUTE_MAX_LATENCY) ||
1390               (s->flags & PA_SOURCE_DYNAMIC_LATENCY) ||
1391               s->monitor_of);
1392
1393     s->thread_info.min_latency = min_latency;
1394     s->thread_info.max_latency = max_latency;
1395
1396     if (PA_SOURCE_IS_LINKED(s->thread_info.state)) {
1397         pa_source_output *o;
1398
1399         while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
1400             if (o->update_source_latency_range)
1401                 o->update_source_latency_range(o);
1402     }
1403
1404     pa_source_invalidate_requested_latency(s);
1405 }
1406
1407 /* Called from main thread, before the source is put */
1408 void pa_source_set_fixed_latency(pa_source *s, pa_usec_t latency) {
1409     pa_source_assert_ref(s);
1410     pa_assert_ctl_context();
1411
1412     pa_assert(pa_source_get_state(s) == PA_SOURCE_INIT);
1413
1414     if (latency < ABSOLUTE_MIN_LATENCY)
1415         latency = ABSOLUTE_MIN_LATENCY;
1416
1417     if (latency > ABSOLUTE_MAX_LATENCY)
1418         latency = ABSOLUTE_MAX_LATENCY;
1419
1420     s->fixed_latency = latency;
1421 }
1422
1423 /* Called from main thread */
1424 size_t pa_source_get_max_rewind(pa_source *s) {
1425     size_t r;
1426     pa_assert_ctl_context();
1427     pa_source_assert_ref(s);
1428
1429     if (!PA_SOURCE_IS_LINKED(s->state))
1430         return s->thread_info.max_rewind;
1431
1432     pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_GET_MAX_REWIND, &r, 0, NULL) == 0);
1433
1434     return r;
1435 }
1436
1437 /* Called from main context */
1438 int pa_source_set_port(pa_source *s, const char *name, pa_bool_t save) {
1439     pa_device_port *port;
1440
1441     pa_assert(s);
1442     pa_assert_ctl_context();
1443
1444     if (!s->set_port) {
1445         pa_log_debug("set_port() operation not implemented for source %u \"%s\"", s->index, s->name);
1446         return -PA_ERR_NOTIMPLEMENTED;
1447     }
1448
1449     if (!s->ports)
1450         return -PA_ERR_NOENTITY;
1451
1452     if (!(port = pa_hashmap_get(s->ports, name)))
1453         return -PA_ERR_NOENTITY;
1454
1455     if (s->active_port == port) {
1456         s->save_port = s->save_port || save;
1457         return 0;
1458     }
1459
1460     if ((s->set_port(s, port)) < 0)
1461         return -PA_ERR_NOENTITY;
1462
1463     pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE, s->index);
1464
1465     pa_log_info("Changed port of source %u \"%s\" to %s", s->index, s->name, port->name);
1466
1467     s->active_port = port;
1468     s->save_port = save;
1469
1470     return 0;
1471 }