2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
147 s->write_memblock = NULL;
148 s->write_data = NULL;
150 pa_memchunk_reset(&s->peek_memchunk);
152 s->record_memblockq = NULL;
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
157 s->previous_time = 0;
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
180 static void stream_unlink(pa_stream *s) {
187 /* Detach from context */
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
194 pa_operation_cancel(o);
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201 if (s->channel_valid) {
202 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
204 s->channel_valid = FALSE;
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
220 static void stream_free(pa_stream *s) {
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
230 if (s->peek_memchunk.memblock) {
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
240 pa_proplist_free(s->proplist);
243 pa_smoother_free(s->smoother);
245 pa_xfree(s->device_name);
249 void pa_stream_unref(pa_stream *s) {
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253 if (PA_REFCNT_DEC(s) <= 0)
257 pa_stream* pa_stream_ref(pa_stream *s) {
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
272 pa_context* pa_stream_get_context(pa_stream *s) {
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
279 uint32_t pa_stream_get_index(pa_stream *s) {
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286 return s->stream_index;
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
320 /* pa_log("Automatically requesting new timing data"); */
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
328 if (s->auto_timing_update_event) {
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
357 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
360 if (s->state != PA_STREAM_READY)
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
374 pa_assert(!force_start || !force_stop);
379 x = pa_rtclock_now();
381 if (s->timing_info_valid) {
383 x -= s->timing_info.transport_usec;
385 x += s->timing_info.transport_usec;
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0) {
392 if (!s->timing_info_valid &&
396 s->context->version >= 13) {
398 /* If the server supports STARTED events we take them as
399 * indications when audio really starts/stops playing, if
400 * we don't have any timing info yet -- instead of trying
401 * to be smart and guessing the server time. Otherwise the
402 * unknown transport delay add too much noise to our time
408 pa_smoother_resume(s->smoother, x, TRUE);
411 /* Please note that we have no idea if playback actually started
412 * if prebuf is non-zero! */
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416 pa_context *c = userdata;
423 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
426 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
429 pa_assert(PA_REFCNT_VALUE(c) >= 1);
433 if (c->version < 12) {
434 pa_context_fail(c, PA_ERR_PROTOCOL);
438 if (pa_tagstruct_getu32(t, &channel) < 0 ||
439 pa_tagstruct_getu32(t, &di) < 0 ||
440 pa_tagstruct_gets(t, &dn) < 0 ||
441 pa_tagstruct_get_boolean(t, &suspended) < 0) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
446 if (c->version >= 13) {
448 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451 pa_tagstruct_get_usec(t, &usec) < 0) {
452 pa_context_fail(c, PA_ERR_PROTOCOL);
456 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457 pa_tagstruct_getu32(t, &tlength) < 0 ||
458 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459 pa_tagstruct_getu32(t, &minreq) < 0 ||
460 pa_tagstruct_get_usec(t, &usec) < 0) {
461 pa_context_fail(c, PA_ERR_PROTOCOL);
467 if (!pa_tagstruct_eof(t)) {
468 pa_context_fail(c, PA_ERR_PROTOCOL);
472 if (!dn || di == PA_INVALID_INDEX) {
473 pa_context_fail(c, PA_ERR_PROTOCOL);
477 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
480 if (s->state != PA_STREAM_READY)
483 if (c->version >= 13) {
484 if (s->direction == PA_STREAM_RECORD)
485 s->timing_info.configured_source_usec = usec;
487 s->timing_info.configured_sink_usec = usec;
489 s->buffer_attr.maxlength = maxlength;
490 s->buffer_attr.fragsize = fragsize;
491 s->buffer_attr.tlength = tlength;
492 s->buffer_attr.prebuf = prebuf;
493 s->buffer_attr.minreq = minreq;
496 pa_xfree(s->device_name);
497 s->device_name = pa_xstrdup(dn);
498 s->device_index = di;
500 s->suspended = suspended;
502 check_smoother_status(s, TRUE, FALSE, FALSE);
503 request_auto_timing_update(s, TRUE);
505 if (s->moved_callback)
506 s->moved_callback(s, s->moved_userdata);
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513 pa_context *c = userdata;
517 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
520 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
523 pa_assert(PA_REFCNT_VALUE(c) >= 1);
527 if (c->version < 15) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
532 if (pa_tagstruct_getu32(t, &channel) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
537 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539 pa_tagstruct_getu32(t, &fragsize) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
545 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546 pa_tagstruct_getu32(t, &tlength) < 0 ||
547 pa_tagstruct_getu32(t, &prebuf) < 0 ||
548 pa_tagstruct_getu32(t, &minreq) < 0 ||
549 pa_tagstruct_get_usec(t, &usec) < 0) {
550 pa_context_fail(c, PA_ERR_PROTOCOL);
555 if (!pa_tagstruct_eof(t)) {
556 pa_context_fail(c, PA_ERR_PROTOCOL);
560 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
563 if (s->state != PA_STREAM_READY)
566 if (s->direction == PA_STREAM_RECORD)
567 s->timing_info.configured_source_usec = usec;
569 s->timing_info.configured_sink_usec = usec;
571 s->buffer_attr.maxlength = maxlength;
572 s->buffer_attr.fragsize = fragsize;
573 s->buffer_attr.tlength = tlength;
574 s->buffer_attr.prebuf = prebuf;
575 s->buffer_attr.minreq = minreq;
577 request_auto_timing_update(s, TRUE);
579 if (s->buffer_attr_callback)
580 s->buffer_attr_callback(s, s->buffer_attr_userdata);
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587 pa_context *c = userdata;
593 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
596 pa_assert(PA_REFCNT_VALUE(c) >= 1);
600 if (c->version < 12) {
601 pa_context_fail(c, PA_ERR_PROTOCOL);
605 if (pa_tagstruct_getu32(t, &channel) < 0 ||
606 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607 !pa_tagstruct_eof(t)) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
612 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
615 if (s->state != PA_STREAM_READY)
618 s->suspended = suspended;
620 check_smoother_status(s, TRUE, FALSE, FALSE);
621 request_auto_timing_update(s, TRUE);
623 if (s->suspended_callback)
624 s->suspended_callback(s, s->suspended_userdata);
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631 pa_context *c = userdata;
636 pa_assert(command == PA_COMMAND_STARTED);
639 pa_assert(PA_REFCNT_VALUE(c) >= 1);
643 if (c->version < 13) {
644 pa_context_fail(c, PA_ERR_PROTOCOL);
648 if (pa_tagstruct_getu32(t, &channel) < 0 ||
649 !pa_tagstruct_eof(t)) {
650 pa_context_fail(c, PA_ERR_PROTOCOL);
654 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
657 if (s->state != PA_STREAM_READY)
660 check_smoother_status(s, TRUE, TRUE, FALSE);
661 request_auto_timing_update(s, TRUE);
663 if (s->started_callback)
664 s->started_callback(s, s->started_userdata);
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671 pa_context *c = userdata;
674 pa_proplist *pl = NULL;
678 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
681 pa_assert(PA_REFCNT_VALUE(c) >= 1);
685 if (c->version < 15) {
686 pa_context_fail(c, PA_ERR_PROTOCOL);
690 pl = pa_proplist_new();
692 if (pa_tagstruct_getu32(t, &channel) < 0 ||
693 pa_tagstruct_gets(t, &event) < 0 ||
694 pa_tagstruct_get_proplist(t, pl) < 0 ||
695 !pa_tagstruct_eof(t) || !event) {
696 pa_context_fail(c, PA_ERR_PROTOCOL);
700 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
703 if (s->state != PA_STREAM_READY)
706 if (s->event_callback)
707 s->event_callback(s, event, pl, s->event_userdata);
713 pa_proplist_free(pl);
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718 pa_context *c = userdata;
719 uint32_t bytes, channel;
722 pa_assert(command == PA_COMMAND_REQUEST);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
729 if (pa_tagstruct_getu32(t, &channel) < 0 ||
730 pa_tagstruct_getu32(t, &bytes) < 0 ||
731 !pa_tagstruct_eof(t)) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
736 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
739 if (s->state != PA_STREAM_READY)
742 s->requested_bytes += bytes;
744 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
746 if (s->requested_bytes > 0 && s->write_callback)
747 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
753 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
755 pa_context *c = userdata;
759 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
762 pa_assert(PA_REFCNT_VALUE(c) >= 1);
766 if (pa_tagstruct_getu32(t, &channel) < 0 ||
767 !pa_tagstruct_eof(t)) {
768 pa_context_fail(c, PA_ERR_PROTOCOL);
772 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
775 if (s->state != PA_STREAM_READY)
778 if (s->buffer_attr.prebuf > 0)
779 check_smoother_status(s, TRUE, FALSE, TRUE);
781 request_auto_timing_update(s, TRUE);
783 if (command == PA_COMMAND_OVERFLOW) {
784 if (s->overflow_callback)
785 s->overflow_callback(s, s->overflow_userdata);
786 } else if (command == PA_COMMAND_UNDERFLOW) {
787 if (s->underflow_callback)
788 s->underflow_callback(s, s->underflow_userdata);
795 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
797 pa_assert(PA_REFCNT_VALUE(s) >= 1);
799 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
801 if (s->state != PA_STREAM_READY)
805 s->write_index_not_before = s->context->ctag;
807 if (s->timing_info_valid)
808 s->timing_info.write_index_corrupt = TRUE;
810 /* pa_log("write_index invalidated"); */
814 s->read_index_not_before = s->context->ctag;
816 if (s->timing_info_valid)
817 s->timing_info.read_index_corrupt = TRUE;
819 /* pa_log("read_index invalidated"); */
822 request_auto_timing_update(s, TRUE);
825 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
826 pa_stream *s = userdata;
829 pa_assert(PA_REFCNT_VALUE(s) >= 1);
832 request_auto_timing_update(s, FALSE);
836 static void create_stream_complete(pa_stream *s) {
838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
839 pa_assert(s->state == PA_STREAM_CREATING);
841 pa_stream_set_state(s, PA_STREAM_READY);
843 if (s->requested_bytes > 0 && s->write_callback)
844 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
846 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
847 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
848 pa_assert(!s->auto_timing_update_event);
849 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
851 request_auto_timing_update(s, TRUE);
854 check_smoother_status(s, TRUE, FALSE, FALSE);
857 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
862 if (s->context->version >= 13)
865 /* Version older than 0.9.10 didn't do server side buffer_attr
866 * selection, hence we have to fake it on the client side. */
868 /* We choose fairly conservative values here, to not confuse
869 * old clients with extremely large playback buffers */
871 if (attr->maxlength == (uint32_t) -1)
872 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
874 if (attr->tlength == (uint32_t) -1)
875 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
877 if (attr->minreq == (uint32_t) -1)
878 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
880 if (attr->prebuf == (uint32_t) -1)
881 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
883 if (attr->fragsize == (uint32_t) -1)
884 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
887 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
888 pa_stream *s = userdata;
889 uint32_t requested_bytes = 0;
893 pa_assert(PA_REFCNT_VALUE(s) >= 1);
894 pa_assert(s->state == PA_STREAM_CREATING);
898 if (command != PA_COMMAND_REPLY) {
899 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
902 pa_stream_set_state(s, PA_STREAM_FAILED);
906 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
907 s->channel == PA_INVALID_INDEX ||
908 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
909 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
910 pa_context_fail(s->context, PA_ERR_PROTOCOL);
914 s->requested_bytes = (int64_t) requested_bytes;
916 if (s->context->version >= 9) {
917 if (s->direction == PA_STREAM_PLAYBACK) {
918 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
919 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
920 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
921 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
922 pa_context_fail(s->context, PA_ERR_PROTOCOL);
925 } else if (s->direction == PA_STREAM_RECORD) {
926 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
927 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
928 pa_context_fail(s->context, PA_ERR_PROTOCOL);
934 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
937 const char *dn = NULL;
940 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
941 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
942 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
943 pa_tagstruct_gets(t, &dn) < 0 ||
944 pa_tagstruct_get_boolean(t, &suspended) < 0) {
945 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949 if (!dn || s->device_index == PA_INVALID_INDEX ||
950 ss.channels != cm.channels ||
951 !pa_channel_map_valid(&cm) ||
952 !pa_sample_spec_valid(&ss) ||
953 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
954 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
955 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
956 pa_context_fail(s->context, PA_ERR_PROTOCOL);
960 pa_xfree(s->device_name);
961 s->device_name = pa_xstrdup(dn);
962 s->suspended = suspended;
968 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
971 if (pa_tagstruct_get_usec(t, &usec) < 0) {
972 pa_context_fail(s->context, PA_ERR_PROTOCOL);
976 if (s->direction == PA_STREAM_RECORD)
977 s->timing_info.configured_source_usec = usec;
979 s->timing_info.configured_sink_usec = usec;
982 if (!pa_tagstruct_eof(t)) {
983 pa_context_fail(s->context, PA_ERR_PROTOCOL);
987 if (s->direction == PA_STREAM_RECORD) {
988 pa_assert(!s->record_memblockq);
990 s->record_memblockq = pa_memblockq_new(
992 s->buffer_attr.maxlength,
994 pa_frame_size(&s->sample_spec),
1001 s->channel_valid = TRUE;
1002 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1004 create_stream_complete(s);
1010 static int create_stream(
1011 pa_stream_direction_t direction,
1014 const pa_buffer_attr *attr,
1015 pa_stream_flags_t flags,
1016 const pa_cvolume *volume,
1017 pa_stream *sync_stream) {
1021 pa_bool_t volume_set = FALSE;
1024 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1025 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1027 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1028 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1029 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1030 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1031 PA_STREAM_INTERPOLATE_TIMING|
1032 PA_STREAM_NOT_MONOTONIC|
1033 PA_STREAM_AUTO_TIMING_UPDATE|
1034 PA_STREAM_NO_REMAP_CHANNELS|
1035 PA_STREAM_NO_REMIX_CHANNELS|
1036 PA_STREAM_FIX_FORMAT|
1038 PA_STREAM_FIX_CHANNELS|
1039 PA_STREAM_DONT_MOVE|
1040 PA_STREAM_VARIABLE_RATE|
1041 PA_STREAM_PEAK_DETECT|
1042 PA_STREAM_START_MUTED|
1043 PA_STREAM_ADJUST_LATENCY|
1044 PA_STREAM_EARLY_REQUESTS|
1045 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1046 PA_STREAM_START_UNMUTED|
1047 PA_STREAM_FAIL_ON_SUSPEND|
1048 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1050 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1051 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1052 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1053 /* Althought some of the other flags are not supported on older
1054 * version, we don't check for them here, because it doesn't hurt
1055 * when they are passed but actually not supported. This makes
1056 * client development easier */
1058 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1059 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1060 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1061 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1062 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1066 s->direction = direction;
1068 s->corked = !!(flags & PA_STREAM_START_CORKED);
1071 s->syncid = sync_stream->syncid;
1074 s->buffer_attr = *attr;
1075 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1077 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1080 x = pa_rtclock_now();
1082 pa_assert(!s->smoother);
1083 s->smoother = pa_smoother_new(
1084 SMOOTHER_ADJUST_TIME,
1085 SMOOTHER_HISTORY_TIME,
1086 !(flags & PA_STREAM_NOT_MONOTONIC),
1088 SMOOTHER_MIN_HISTORY,
1094 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1096 t = pa_tagstruct_command(
1098 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1101 if (s->context->version < 13)
1102 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1106 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1107 PA_TAG_CHANNEL_MAP, &s->channel_map,
1108 PA_TAG_U32, PA_INVALID_INDEX,
1110 PA_TAG_U32, s->buffer_attr.maxlength,
1111 PA_TAG_BOOLEAN, s->corked,
1114 if (s->direction == PA_STREAM_PLAYBACK) {
1119 PA_TAG_U32, s->buffer_attr.tlength,
1120 PA_TAG_U32, s->buffer_attr.prebuf,
1121 PA_TAG_U32, s->buffer_attr.minreq,
1122 PA_TAG_U32, s->syncid,
1125 volume_set = !!volume;
1128 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1130 pa_tagstruct_put_cvolume(t, volume);
1132 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1134 if (s->context->version >= 12) {
1137 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1138 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1139 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1140 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1141 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1142 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1143 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1147 if (s->context->version >= 13) {
1149 if (s->direction == PA_STREAM_PLAYBACK)
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1152 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1156 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1157 PA_TAG_PROPLIST, s->proplist,
1160 if (s->direction == PA_STREAM_RECORD)
1161 pa_tagstruct_putu32(t, s->direct_on_input);
1164 if (s->context->version >= 14) {
1166 if (s->direction == PA_STREAM_PLAYBACK)
1167 pa_tagstruct_put_boolean(t, volume_set);
1169 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1172 if (s->context->version >= 15) {
1174 if (s->direction == PA_STREAM_PLAYBACK)
1175 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1177 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1178 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1181 if (s->context->version >= 17) {
1183 if (s->direction == PA_STREAM_PLAYBACK)
1184 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1188 pa_pstream_send_tagstruct(s->context->pstream, t);
1189 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1191 pa_stream_set_state(s, PA_STREAM_CREATING);
1197 int pa_stream_connect_playback(
1200 const pa_buffer_attr *attr,
1201 pa_stream_flags_t flags,
1202 const pa_cvolume *volume,
1203 pa_stream *sync_stream) {
1206 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1208 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1211 int pa_stream_connect_record(
1214 const pa_buffer_attr *attr,
1215 pa_stream_flags_t flags) {
1218 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1220 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1223 int pa_stream_begin_write(
1229 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1231 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1232 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1233 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1234 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1235 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1237 if (*nbytes != (size_t) -1) {
1240 m = pa_mempool_block_size_max(s->context->mempool);
1241 fs = pa_frame_size(&s->sample_spec);
1248 if (!s->write_memblock) {
1249 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1250 s->write_data = pa_memblock_acquire(s->write_memblock);
1253 *data = s->write_data;
1254 *nbytes = pa_memblock_get_length(s->write_memblock);
1259 int pa_stream_cancel_write(
1263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1265 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1266 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1267 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1268 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1270 pa_assert(s->write_data);
1272 pa_memblock_release(s->write_memblock);
1273 pa_memblock_unref(s->write_memblock);
1274 s->write_memblock = NULL;
1275 s->write_data = NULL;
1280 int pa_stream_write(
1284 pa_free_cb_t free_cb,
1286 pa_seek_mode_t seek) {
1289 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1292 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1293 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1294 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1295 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1296 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1297 PA_CHECK_VALIDITY(s->context,
1298 !s->write_memblock ||
1299 ((data >= s->write_data) &&
1300 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1302 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1304 if (s->write_memblock) {
1307 /* pa_stream_write_begin() was called before */
1309 pa_memblock_release(s->write_memblock);
1311 chunk.memblock = s->write_memblock;
1312 chunk.index = (const char *) data - (const char *) s->write_data;
1313 chunk.length = length;
1315 s->write_memblock = NULL;
1316 s->write_data = NULL;
1318 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1319 pa_memblock_unref(chunk.memblock);
1322 pa_seek_mode_t t_seek = seek;
1323 int64_t t_offset = offset;
1324 size_t t_length = length;
1325 const void *t_data = data;
1327 /* pa_stream_write_begin() was not called before */
1329 while (t_length > 0) {
1334 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1335 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1336 chunk.length = t_length;
1340 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1341 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1343 d = pa_memblock_acquire(chunk.memblock);
1344 memcpy(d, t_data, chunk.length);
1345 pa_memblock_release(chunk.memblock);
1348 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1351 t_seek = PA_SEEK_RELATIVE;
1353 t_data = (const uint8_t*) t_data + chunk.length;
1354 t_length -= chunk.length;
1356 pa_memblock_unref(chunk.memblock);
1359 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1360 free_cb((void*) data);
1363 /* This is obviously wrong since we ignore the seeking index . But
1364 * that's OK, the server side applies the same error */
1365 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1367 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1369 if (s->direction == PA_STREAM_PLAYBACK) {
1371 /* Update latency request correction */
1372 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1374 if (seek == PA_SEEK_ABSOLUTE) {
1375 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1376 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1377 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1378 } else if (seek == PA_SEEK_RELATIVE) {
1379 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1380 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1382 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1385 /* Update the write index in the already available latency data */
1386 if (s->timing_info_valid) {
1388 if (seek == PA_SEEK_ABSOLUTE) {
1389 s->timing_info.write_index_corrupt = FALSE;
1390 s->timing_info.write_index = offset + (int64_t) length;
1391 } else if (seek == PA_SEEK_RELATIVE) {
1392 if (!s->timing_info.write_index_corrupt)
1393 s->timing_info.write_index += offset + (int64_t) length;
1395 s->timing_info.write_index_corrupt = TRUE;
1398 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1399 request_auto_timing_update(s, TRUE);
1405 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1407 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1411 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1412 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1413 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1415 if (!s->peek_memchunk.memblock) {
1417 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1423 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1426 pa_assert(s->peek_data);
1427 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1428 *length = s->peek_memchunk.length;
1432 int pa_stream_drop(pa_stream *s) {
1434 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1436 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1437 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1438 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1439 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1441 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1443 /* Fix the simulated local read index */
1444 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1445 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1447 pa_assert(s->peek_data);
1448 pa_memblock_release(s->peek_memchunk.memblock);
1449 pa_memblock_unref(s->peek_memchunk.memblock);
1450 pa_memchunk_reset(&s->peek_memchunk);
1455 size_t pa_stream_writable_size(pa_stream *s) {
1457 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1459 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1460 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1461 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1463 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1466 size_t pa_stream_readable_size(pa_stream *s) {
1468 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1470 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1471 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1472 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1474 return pa_memblockq_get_length(s->record_memblockq);
1477 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1483 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1485 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1486 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1489 /* Ask for a timing update before we cork/uncork to get the best
1490 * accuracy for the transport latency suitable for the
1491 * check_smoother_status() call in the started callback */
1492 request_auto_timing_update(s, TRUE);
1494 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1496 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1497 pa_tagstruct_putu32(t, s->channel);
1498 pa_pstream_send_tagstruct(s->context->pstream, t);
1499 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1501 /* This might cause the read index to conitnue again, hence
1502 * let's request a timing update */
1503 request_auto_timing_update(s, TRUE);
1508 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1512 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1513 pa_assert(s->state == PA_STREAM_READY);
1514 pa_assert(s->direction != PA_STREAM_UPLOAD);
1515 pa_assert(s->timing_info_valid);
1516 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1517 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1519 if (s->direction == PA_STREAM_PLAYBACK) {
1520 /* The last byte that was written into the output device
1521 * had this time value associated */
1522 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1524 if (!s->corked && !s->suspended) {
1526 if (!ignore_transport)
1527 /* Because the latency info took a little time to come
1528 * to us, we assume that the real output time is actually
1530 usec += s->timing_info.transport_usec;
1532 /* However, the output device usually maintains a buffer
1533 too, hence the real sample currently played is a little
1535 if (s->timing_info.sink_usec >= usec)
1538 usec -= s->timing_info.sink_usec;
1542 pa_assert(s->direction == PA_STREAM_RECORD);
1544 /* The last byte written into the server side queue had
1545 * this time value associated */
1546 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1548 if (!s->corked && !s->suspended) {
1550 if (!ignore_transport)
1551 /* Add transport latency */
1552 usec += s->timing_info.transport_usec;
1554 /* Add latency of data in device buffer */
1555 usec += s->timing_info.source_usec;
1557 /* If this is a monitor source, we need to correct the
1558 * time by the playback device buffer */
1559 if (s->timing_info.sink_usec >= usec)
1562 usec -= s->timing_info.sink_usec;
1569 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1570 pa_operation *o = userdata;
1571 struct timeval local, remote, now;
1573 pa_bool_t playing = FALSE;
1574 uint64_t underrun_for = 0, playing_for = 0;
1578 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1580 if (!o->context || !o->stream)
1583 i = &o->stream->timing_info;
1585 o->stream->timing_info_valid = FALSE;
1586 i->write_index_corrupt = TRUE;
1587 i->read_index_corrupt = TRUE;
1589 if (command != PA_COMMAND_REPLY) {
1590 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1595 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1596 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1597 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1598 pa_tagstruct_get_timeval(t, &local) < 0 ||
1599 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1600 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1601 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1603 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1607 if (o->context->version >= 13 &&
1608 o->stream->direction == PA_STREAM_PLAYBACK)
1609 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1610 pa_tagstruct_getu64(t, &playing_for) < 0) {
1612 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1617 if (!pa_tagstruct_eof(t)) {
1618 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1621 o->stream->timing_info_valid = TRUE;
1622 i->write_index_corrupt = FALSE;
1623 i->read_index_corrupt = FALSE;
1625 i->playing = (int) playing;
1626 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1628 pa_gettimeofday(&now);
1630 /* Calculcate timestamps */
1631 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1632 /* local and remote seem to have synchronized clocks */
1634 if (o->stream->direction == PA_STREAM_PLAYBACK)
1635 i->transport_usec = pa_timeval_diff(&remote, &local);
1637 i->transport_usec = pa_timeval_diff(&now, &remote);
1639 i->synchronized_clocks = TRUE;
1640 i->timestamp = remote;
1642 /* clocks are not synchronized, let's estimate latency then */
1643 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1644 i->synchronized_clocks = FALSE;
1645 i->timestamp = local;
1646 pa_timeval_add(&i->timestamp, i->transport_usec);
1649 /* Invalidate read and write indexes if necessary */
1650 if (tag < o->stream->read_index_not_before)
1651 i->read_index_corrupt = TRUE;
1653 if (tag < o->stream->write_index_not_before)
1654 i->write_index_corrupt = TRUE;
1656 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1657 /* Write index correction */
1660 uint32_t ctag = tag;
1662 /* Go through the saved correction values and add up the
1663 * total correction.*/
1664 for (n = 0, j = o->stream->current_write_index_correction+1;
1665 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1666 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1668 /* Step over invalid data or out-of-date data */
1669 if (!o->stream->write_index_corrections[j].valid ||
1670 o->stream->write_index_corrections[j].tag < ctag)
1673 /* Make sure that everything is in order */
1674 ctag = o->stream->write_index_corrections[j].tag+1;
1676 /* Now fix the write index */
1677 if (o->stream->write_index_corrections[j].corrupt) {
1678 /* A corrupting seek was made */
1679 i->write_index_corrupt = TRUE;
1680 } else if (o->stream->write_index_corrections[j].absolute) {
1681 /* An absolute seek was made */
1682 i->write_index = o->stream->write_index_corrections[j].value;
1683 i->write_index_corrupt = FALSE;
1684 } else if (!i->write_index_corrupt) {
1685 /* A relative seek was made */
1686 i->write_index += o->stream->write_index_corrections[j].value;
1690 /* Clear old correction entries */
1691 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1692 if (!o->stream->write_index_corrections[n].valid)
1695 if (o->stream->write_index_corrections[n].tag <= tag)
1696 o->stream->write_index_corrections[n].valid = FALSE;
1700 if (o->stream->direction == PA_STREAM_RECORD) {
1701 /* Read index correction */
1703 if (!i->read_index_corrupt)
1704 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1707 /* Update smoother */
1708 if (o->stream->smoother) {
1711 u = x = pa_rtclock_now() - i->transport_usec;
1713 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1716 /* If we weren't playing then it will take some time
1717 * until the audio will actually come out through the
1718 * speakers. Since we follow that timing here, we need
1719 * to try to fix this up */
1721 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1723 if (su < i->sink_usec)
1724 x += i->sink_usec - su;
1728 pa_smoother_pause(o->stream->smoother, x);
1730 /* Update the smoother */
1731 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1732 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1733 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1736 pa_smoother_resume(o->stream->smoother, x, TRUE);
1740 o->stream->auto_timing_update_requested = FALSE;
1742 if (o->stream->latency_update_callback)
1743 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1745 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1746 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1747 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1752 pa_operation_done(o);
1753 pa_operation_unref(o);
1756 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1764 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1766 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1767 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1770 if (s->direction == PA_STREAM_PLAYBACK) {
1771 /* Find a place to store the write_index correction data for this entry */
1772 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1774 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1775 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1777 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1779 t = pa_tagstruct_command(
1781 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1783 pa_tagstruct_putu32(t, s->channel);
1784 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1786 pa_pstream_send_tagstruct(s->context->pstream, t);
1787 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1789 if (s->direction == PA_STREAM_PLAYBACK) {
1790 /* Fill in initial correction data */
1792 s->current_write_index_correction = cidx;
1794 s->write_index_corrections[cidx].valid = TRUE;
1795 s->write_index_corrections[cidx].absolute = FALSE;
1796 s->write_index_corrections[cidx].corrupt = FALSE;
1797 s->write_index_corrections[cidx].tag = tag;
1798 s->write_index_corrections[cidx].value = 0;
1804 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1805 pa_stream *s = userdata;
1809 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1813 if (command != PA_COMMAND_REPLY) {
1814 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1817 pa_stream_set_state(s, PA_STREAM_FAILED);
1819 } else if (!pa_tagstruct_eof(t)) {
1820 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1824 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1830 int pa_stream_disconnect(pa_stream *s) {
1835 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1837 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1838 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1839 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1843 t = pa_tagstruct_command(
1845 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1846 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1848 pa_tagstruct_putu32(t, s->channel);
1849 pa_pstream_send_tagstruct(s->context->pstream, t);
1850 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1856 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1858 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1860 if (pa_detect_fork())
1863 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1866 s->read_callback = cb;
1867 s->read_userdata = userdata;
1870 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1872 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1874 if (pa_detect_fork())
1877 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1880 s->write_callback = cb;
1881 s->write_userdata = userdata;
1884 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1886 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888 if (pa_detect_fork())
1891 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1894 s->state_callback = cb;
1895 s->state_userdata = userdata;
1898 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1900 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902 if (pa_detect_fork())
1905 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1908 s->overflow_callback = cb;
1909 s->overflow_userdata = userdata;
1912 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916 if (pa_detect_fork())
1919 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1922 s->underflow_callback = cb;
1923 s->underflow_userdata = userdata;
1926 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930 if (pa_detect_fork())
1933 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1936 s->latency_update_callback = cb;
1937 s->latency_update_userdata = userdata;
1940 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944 if (pa_detect_fork())
1947 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1950 s->moved_callback = cb;
1951 s->moved_userdata = userdata;
1954 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958 if (pa_detect_fork())
1961 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1964 s->suspended_callback = cb;
1965 s->suspended_userdata = userdata;
1968 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1970 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972 if (pa_detect_fork())
1975 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1978 s->started_callback = cb;
1979 s->started_userdata = userdata;
1982 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1984 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986 if (pa_detect_fork())
1989 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1992 s->event_callback = cb;
1993 s->event_userdata = userdata;
1996 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1998 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000 if (pa_detect_fork())
2003 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2006 s->buffer_attr_callback = cb;
2007 s->buffer_attr_userdata = userdata;
2010 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2011 pa_operation *o = userdata;
2016 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2021 if (command != PA_COMMAND_REPLY) {
2022 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2026 } else if (!pa_tagstruct_eof(t)) {
2027 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2032 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2033 cb(o->stream, success, o->userdata);
2037 pa_operation_done(o);
2038 pa_operation_unref(o);
2041 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2047 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2050 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2051 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2053 /* Ask for a timing update before we cork/uncork to get the best
2054 * accuracy for the transport latency suitable for the
2055 * check_smoother_status() call in the started callback */
2056 request_auto_timing_update(s, TRUE);
2060 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2062 t = pa_tagstruct_command(
2064 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2066 pa_tagstruct_putu32(t, s->channel);
2067 pa_tagstruct_put_boolean(t, !!b);
2068 pa_pstream_send_tagstruct(s->context->pstream, t);
2069 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2071 check_smoother_status(s, FALSE, FALSE, FALSE);
2073 /* This might cause the indexes to hang/start again, hence let's
2074 * request a timing update, after the cork/uncork, too */
2075 request_auto_timing_update(s, TRUE);
2080 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2086 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2091 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2093 t = pa_tagstruct_command(s->context, command, &tag);
2094 pa_tagstruct_putu32(t, s->channel);
2095 pa_pstream_send_tagstruct(s->context->pstream, t);
2096 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2101 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2105 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2111 /* Ask for a timing update *before* the flush, so that the
2112 * transport usec is as up to date as possible when we get the
2113 * underflow message and update the smoother status*/
2114 request_auto_timing_update(s, TRUE);
2116 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2119 if (s->direction == PA_STREAM_PLAYBACK) {
2121 if (s->write_index_corrections[s->current_write_index_correction].valid)
2122 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2124 if (s->buffer_attr.prebuf > 0)
2125 check_smoother_status(s, FALSE, FALSE, TRUE);
2127 /* This will change the write index, but leave the
2128 * read index untouched. */
2129 invalidate_indexes(s, FALSE, TRUE);
2132 /* For record streams this has no influence on the write
2133 * index, but the read index might jump. */
2134 invalidate_indexes(s, TRUE, FALSE);
2139 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2143 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2145 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2146 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2147 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2148 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2150 /* Ask for a timing update before we cork/uncork to get the best
2151 * accuracy for the transport latency suitable for the
2152 * check_smoother_status() call in the started callback */
2153 request_auto_timing_update(s, TRUE);
2155 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2158 /* This might cause the read index to hang again, hence
2159 * let's request a timing update */
2160 request_auto_timing_update(s, TRUE);
2165 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2169 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2171 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2172 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2173 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2174 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2176 /* Ask for a timing update before we cork/uncork to get the best
2177 * accuracy for the transport latency suitable for the
2178 * check_smoother_status() call in the started callback */
2179 request_auto_timing_update(s, TRUE);
2181 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2184 /* This might cause the read index to start moving again, hence
2185 * let's request a timing update */
2186 request_auto_timing_update(s, TRUE);
2191 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2195 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2202 if (s->context->version >= 13) {
2203 pa_proplist *p = pa_proplist_new();
2205 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2206 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2207 pa_proplist_free(p);
2212 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2213 t = pa_tagstruct_command(
2215 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2217 pa_tagstruct_putu32(t, s->channel);
2218 pa_tagstruct_puts(t, name);
2219 pa_pstream_send_tagstruct(s->context->pstream, t);
2220 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2226 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2230 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2232 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2236 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2237 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2240 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2242 usec = calc_time(s, FALSE);
2244 /* Make sure the time runs monotonically */
2245 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2246 if (usec < s->previous_time)
2247 usec = s->previous_time;
2249 s->previous_time = usec;
2258 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2268 if (negative && s->direction == PA_STREAM_RECORD) {
2276 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2282 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2285 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2286 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2287 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2288 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2289 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2290 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2292 if ((r = pa_stream_get_time(s, &t)) < 0)
2295 if (s->direction == PA_STREAM_PLAYBACK)
2296 cindex = s->timing_info.write_index;
2298 cindex = s->timing_info.read_index;
2303 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2305 if (s->direction == PA_STREAM_PLAYBACK)
2306 *r_usec = time_counter_diff(s, c, t, negative);
2308 *r_usec = time_counter_diff(s, t, c, negative);
2313 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2315 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2317 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2318 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2319 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2320 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2322 return &s->timing_info;
2325 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2327 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2331 return &s->sample_spec;
2334 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2336 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2340 return &s->channel_map;
2343 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2345 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2347 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2348 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2349 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2351 return &s->buffer_attr;
2354 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2355 pa_operation *o = userdata;
2360 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2365 if (command != PA_COMMAND_REPLY) {
2366 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2371 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2372 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2373 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2374 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2375 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2376 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2379 } else if (o->stream->direction == PA_STREAM_RECORD) {
2380 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2381 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2382 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2387 if (o->stream->context->version >= 13) {
2390 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2391 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2395 if (o->stream->direction == PA_STREAM_RECORD)
2396 o->stream->timing_info.configured_source_usec = usec;
2398 o->stream->timing_info.configured_sink_usec = usec;
2401 if (!pa_tagstruct_eof(t)) {
2402 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2408 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2409 cb(o->stream, success, o->userdata);
2413 pa_operation_done(o);
2414 pa_operation_unref(o);
2418 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2424 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2427 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2432 /* Ask for a timing update before we cork/uncork to get the best
2433 * accuracy for the transport latency suitable for the
2434 * check_smoother_status() call in the started callback */
2435 request_auto_timing_update(s, TRUE);
2437 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2439 t = pa_tagstruct_command(
2441 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2443 pa_tagstruct_putu32(t, s->channel);
2445 pa_tagstruct_putu32(t, attr->maxlength);
2447 if (s->direction == PA_STREAM_PLAYBACK)
2450 PA_TAG_U32, attr->tlength,
2451 PA_TAG_U32, attr->prebuf,
2452 PA_TAG_U32, attr->minreq,
2455 pa_tagstruct_putu32(t, attr->fragsize);
2457 if (s->context->version >= 13)
2458 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2460 if (s->context->version >= 14)
2461 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2463 pa_pstream_send_tagstruct(s->context->pstream, t);
2464 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2466 /* This might cause changes in the read/write indexex, hence let's
2467 * request a timing update */
2468 request_auto_timing_update(s, TRUE);
2473 uint32_t pa_stream_get_device_index(pa_stream *s) {
2475 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2477 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2478 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2479 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2480 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2481 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2483 return s->device_index;
2486 const char *pa_stream_get_device_name(pa_stream *s) {
2488 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2491 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2492 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2493 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2494 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2496 return s->device_name;
2499 int pa_stream_is_suspended(pa_stream *s) {
2501 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2503 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2504 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2505 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2506 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2508 return s->suspended;
2511 int pa_stream_is_corked(pa_stream *s) {
2513 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2516 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2517 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2522 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2523 pa_operation *o = userdata;
2528 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2533 if (command != PA_COMMAND_REPLY) {
2534 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2540 if (!pa_tagstruct_eof(t)) {
2541 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2546 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2547 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2550 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2551 cb(o->stream, success, o->userdata);
2555 pa_operation_done(o);
2556 pa_operation_unref(o);
2560 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2566 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2569 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2573 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2575 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2576 o->private = PA_UINT_TO_PTR(rate);
2578 t = pa_tagstruct_command(
2580 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2582 pa_tagstruct_putu32(t, s->channel);
2583 pa_tagstruct_putu32(t, rate);
2585 pa_pstream_send_tagstruct(s->context->pstream, t);
2586 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2591 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2597 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2602 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2603 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2605 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2607 t = pa_tagstruct_command(
2609 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2611 pa_tagstruct_putu32(t, s->channel);
2612 pa_tagstruct_putu32(t, (uint32_t) mode);
2613 pa_tagstruct_put_proplist(t, p);
2615 pa_pstream_send_tagstruct(s->context->pstream, t);
2616 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2618 /* Please note that we don't update s->proplist here, because we
2619 * don't export that field */
2624 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2628 const char * const*k;
2631 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2634 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2636 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2637 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2639 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2641 t = pa_tagstruct_command(
2643 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2645 pa_tagstruct_putu32(t, s->channel);
2647 for (k = keys; *k; k++)
2648 pa_tagstruct_puts(t, *k);
2650 pa_tagstruct_puts(t, NULL);
2652 pa_pstream_send_tagstruct(s->context->pstream, t);
2653 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2655 /* Please note that we don't update s->proplist here, because we
2656 * don't export that field */
2661 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2663 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2666 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2667 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2668 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2670 s->direct_on_input = sink_input_idx;
2675 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2677 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2679 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2680 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2681 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2683 return s->direct_on_input;