2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
44 #include "fork-detect.h"
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
83 pa_stream *pa_stream_new_with_proplist(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
106 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
108 s = pa_xnew(pa_stream, 1);
111 s->mainloop = c->mainloop;
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
117 s->sample_spec = *ss;
118 s->channel_map = *map;
120 s->direct_on_input = PA_INVALID_INDEX;
122 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
124 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
127 s->channel_valid = FALSE;
128 s->syncid = c->csyncid++;
129 s->stream_index = PA_INVALID_INDEX;
131 s->requested_bytes = 0;
132 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
134 /* We initialize der target length here, so that if the user
135 * passes no explicit buffering metrics the default is similar to
136 * what older PA versions provided. */
138 s->buffer_attr.maxlength = (uint32_t) -1;
139 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140 s->buffer_attr.minreq = (uint32_t) -1;
141 s->buffer_attr.prebuf = (uint32_t) -1;
142 s->buffer_attr.fragsize = (uint32_t) -1;
144 s->device_index = PA_INVALID_INDEX;
145 s->device_name = NULL;
146 s->suspended = FALSE;
149 s->write_memblock = NULL;
150 s->write_data = NULL;
152 pa_memchunk_reset(&s->peek_memchunk);
154 s->record_memblockq = NULL;
156 memset(&s->timing_info, 0, sizeof(s->timing_info));
157 s->timing_info_valid = FALSE;
159 s->previous_time = 0;
161 s->read_index_not_before = 0;
162 s->write_index_not_before = 0;
163 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164 s->write_index_corrections[i].valid = 0;
165 s->current_write_index_correction = 0;
167 s->auto_timing_update_event = NULL;
168 s->auto_timing_update_requested = FALSE;
169 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
175 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176 PA_LLIST_PREPEND(pa_stream, c->streams, s);
182 static void stream_unlink(pa_stream *s) {
189 /* Detach from context */
191 /* Unref all operatio object that point to us */
192 for (o = s->context->operations; o; o = n) {
196 pa_operation_cancel(o);
199 /* Drop all outstanding replies for this stream */
200 if (s->context->pdispatch)
201 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
203 if (s->channel_valid) {
204 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
206 s->channel_valid = FALSE;
209 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
214 if (s->auto_timing_update_event) {
215 pa_assert(s->mainloop);
216 s->mainloop->time_free(s->auto_timing_update_event);
222 static void stream_free(pa_stream *s) {
227 if (s->write_memblock) {
228 pa_memblock_release(s->write_memblock);
229 pa_memblock_unref(s->write_data);
232 if (s->peek_memchunk.memblock) {
234 pa_memblock_release(s->peek_memchunk.memblock);
235 pa_memblock_unref(s->peek_memchunk.memblock);
238 if (s->record_memblockq)
239 pa_memblockq_free(s->record_memblockq);
242 pa_proplist_free(s->proplist);
245 pa_smoother_free(s->smoother);
247 pa_xfree(s->device_name);
251 void pa_stream_unref(pa_stream *s) {
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
255 if (PA_REFCNT_DEC(s) <= 0)
259 pa_stream* pa_stream_ref(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
269 pa_assert(PA_REFCNT_VALUE(s) >= 1);
274 pa_context* pa_stream_get_context(pa_stream *s) {
276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
281 uint32_t pa_stream_get_index(pa_stream *s) {
283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
288 return s->stream_index;
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
293 pa_assert(PA_REFCNT_VALUE(s) >= 1);
302 if (s->state_callback)
303 s->state_callback(s, s->state_userdata);
305 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
313 pa_assert(PA_REFCNT_VALUE(s) >= 1);
315 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
318 if (s->state == PA_STREAM_READY &&
319 (force || !s->auto_timing_update_requested)) {
322 /* pa_log("Automatically requesting new timing data"); */
324 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325 pa_operation_unref(o);
326 s->auto_timing_update_requested = TRUE;
330 if (s->auto_timing_update_event) {
332 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
334 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
336 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341 pa_context *c = userdata;
346 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
349 pa_assert(PA_REFCNT_VALUE(c) >= 1);
353 if (pa_tagstruct_getu32(t, &channel) < 0 ||
354 !pa_tagstruct_eof(t)) {
355 pa_context_fail(c, PA_ERR_PROTOCOL);
359 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
362 if (s->state != PA_STREAM_READY)
365 pa_context_set_error(c, PA_ERR_KILLED);
366 pa_stream_set_state(s, PA_STREAM_FAILED);
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
376 pa_assert(!force_start || !force_stop);
381 x = pa_rtclock_now();
383 if (s->timing_info_valid) {
385 x -= s->timing_info.transport_usec;
387 x += s->timing_info.transport_usec;
390 if (s->suspended || s->corked || force_stop)
391 pa_smoother_pause(s->smoother, x);
392 else if (force_start || s->buffer_attr.prebuf == 0) {
394 if (!s->timing_info_valid &&
398 s->context->version >= 13) {
400 /* If the server supports STARTED events we take them as
401 * indications when audio really starts/stops playing, if
402 * we don't have any timing info yet -- instead of trying
403 * to be smart and guessing the server time. Otherwise the
404 * unknown transport delay add too much noise to our time
410 pa_smoother_resume(s->smoother, x, TRUE);
413 /* Please note that we have no idea if playback actually started
414 * if prebuf is non-zero! */
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418 pa_context *c = userdata;
425 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
428 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
431 pa_assert(PA_REFCNT_VALUE(c) >= 1);
435 if (c->version < 12) {
436 pa_context_fail(c, PA_ERR_PROTOCOL);
440 if (pa_tagstruct_getu32(t, &channel) < 0 ||
441 pa_tagstruct_getu32(t, &di) < 0 ||
442 pa_tagstruct_gets(t, &dn) < 0 ||
443 pa_tagstruct_get_boolean(t, &suspended) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
448 if (c->version >= 13) {
450 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453 pa_tagstruct_get_usec(t, &usec) < 0) {
454 pa_context_fail(c, PA_ERR_PROTOCOL);
458 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459 pa_tagstruct_getu32(t, &tlength) < 0 ||
460 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461 pa_tagstruct_getu32(t, &minreq) < 0 ||
462 pa_tagstruct_get_usec(t, &usec) < 0) {
463 pa_context_fail(c, PA_ERR_PROTOCOL);
469 if (!pa_tagstruct_eof(t)) {
470 pa_context_fail(c, PA_ERR_PROTOCOL);
474 if (!dn || di == PA_INVALID_INDEX) {
475 pa_context_fail(c, PA_ERR_PROTOCOL);
479 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
482 if (s->state != PA_STREAM_READY)
485 if (c->version >= 13) {
486 if (s->direction == PA_STREAM_RECORD)
487 s->timing_info.configured_source_usec = usec;
489 s->timing_info.configured_sink_usec = usec;
491 s->buffer_attr.maxlength = maxlength;
492 s->buffer_attr.fragsize = fragsize;
493 s->buffer_attr.tlength = tlength;
494 s->buffer_attr.prebuf = prebuf;
495 s->buffer_attr.minreq = minreq;
498 pa_xfree(s->device_name);
499 s->device_name = pa_xstrdup(dn);
500 s->device_index = di;
502 s->suspended = suspended;
504 check_smoother_status(s, TRUE, FALSE, FALSE);
505 request_auto_timing_update(s, TRUE);
507 if (s->moved_callback)
508 s->moved_callback(s, s->moved_userdata);
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515 pa_context *c = userdata;
519 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
522 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
525 pa_assert(PA_REFCNT_VALUE(c) >= 1);
529 if (c->version < 15) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
534 if (pa_tagstruct_getu32(t, &channel) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
539 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
540 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541 pa_tagstruct_getu32(t, &fragsize) < 0 ||
542 pa_tagstruct_get_usec(t, &usec) < 0) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
547 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548 pa_tagstruct_getu32(t, &tlength) < 0 ||
549 pa_tagstruct_getu32(t, &prebuf) < 0 ||
550 pa_tagstruct_getu32(t, &minreq) < 0 ||
551 pa_tagstruct_get_usec(t, &usec) < 0) {
552 pa_context_fail(c, PA_ERR_PROTOCOL);
557 if (!pa_tagstruct_eof(t)) {
558 pa_context_fail(c, PA_ERR_PROTOCOL);
562 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
565 if (s->state != PA_STREAM_READY)
568 if (s->direction == PA_STREAM_RECORD)
569 s->timing_info.configured_source_usec = usec;
571 s->timing_info.configured_sink_usec = usec;
573 s->buffer_attr.maxlength = maxlength;
574 s->buffer_attr.fragsize = fragsize;
575 s->buffer_attr.tlength = tlength;
576 s->buffer_attr.prebuf = prebuf;
577 s->buffer_attr.minreq = minreq;
579 request_auto_timing_update(s, TRUE);
581 if (s->buffer_attr_callback)
582 s->buffer_attr_callback(s, s->buffer_attr_userdata);
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589 pa_context *c = userdata;
595 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
598 pa_assert(PA_REFCNT_VALUE(c) >= 1);
602 if (c->version < 12) {
603 pa_context_fail(c, PA_ERR_PROTOCOL);
607 if (pa_tagstruct_getu32(t, &channel) < 0 ||
608 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609 !pa_tagstruct_eof(t)) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
614 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
617 if (s->state != PA_STREAM_READY)
620 s->suspended = suspended;
622 check_smoother_status(s, TRUE, FALSE, FALSE);
623 request_auto_timing_update(s, TRUE);
625 if (s->suspended_callback)
626 s->suspended_callback(s, s->suspended_userdata);
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633 pa_context *c = userdata;
638 pa_assert(command == PA_COMMAND_STARTED);
641 pa_assert(PA_REFCNT_VALUE(c) >= 1);
645 if (c->version < 13) {
646 pa_context_fail(c, PA_ERR_PROTOCOL);
650 if (pa_tagstruct_getu32(t, &channel) < 0 ||
651 !pa_tagstruct_eof(t)) {
652 pa_context_fail(c, PA_ERR_PROTOCOL);
656 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
659 if (s->state != PA_STREAM_READY)
662 check_smoother_status(s, TRUE, TRUE, FALSE);
663 request_auto_timing_update(s, TRUE);
665 if (s->started_callback)
666 s->started_callback(s, s->started_userdata);
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
676 pa_proplist *pl = NULL;
680 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
683 pa_assert(PA_REFCNT_VALUE(c) >= 1);
687 if (c->version < 15) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
692 pl = pa_proplist_new();
694 if (pa_tagstruct_getu32(t, &channel) < 0 ||
695 pa_tagstruct_gets(t, &event) < 0 ||
696 pa_tagstruct_get_proplist(t, pl) < 0 ||
697 !pa_tagstruct_eof(t) || !event) {
698 pa_context_fail(c, PA_ERR_PROTOCOL);
702 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
705 if (s->state != PA_STREAM_READY)
708 if (s->event_callback)
709 s->event_callback(s, event, pl, s->event_userdata);
715 pa_proplist_free(pl);
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
720 pa_context *c = userdata;
721 uint32_t bytes, channel;
724 pa_assert(command == PA_COMMAND_REQUEST);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
731 if (pa_tagstruct_getu32(t, &channel) < 0 ||
732 pa_tagstruct_getu32(t, &bytes) < 0 ||
733 !pa_tagstruct_eof(t)) {
734 pa_context_fail(c, PA_ERR_PROTOCOL);
738 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741 if (s->state != PA_STREAM_READY)
744 s->requested_bytes += bytes;
746 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
748 if (s->requested_bytes > 0 && s->write_callback)
749 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
761 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
764 pa_assert(PA_REFCNT_VALUE(c) >= 1);
768 if (pa_tagstruct_getu32(t, &channel) < 0 ||
769 !pa_tagstruct_eof(t)) {
770 pa_context_fail(c, PA_ERR_PROTOCOL);
774 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
777 if (s->state != PA_STREAM_READY)
780 if (s->buffer_attr.prebuf > 0)
781 check_smoother_status(s, TRUE, FALSE, TRUE);
783 request_auto_timing_update(s, TRUE);
785 if (command == PA_COMMAND_OVERFLOW) {
786 if (s->overflow_callback)
787 s->overflow_callback(s, s->overflow_userdata);
788 } else if (command == PA_COMMAND_UNDERFLOW) {
789 if (s->underflow_callback)
790 s->underflow_callback(s, s->underflow_userdata);
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
799 pa_assert(PA_REFCNT_VALUE(s) >= 1);
801 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
803 if (s->state != PA_STREAM_READY)
807 s->write_index_not_before = s->context->ctag;
809 if (s->timing_info_valid)
810 s->timing_info.write_index_corrupt = TRUE;
812 /* pa_log("write_index invalidated"); */
816 s->read_index_not_before = s->context->ctag;
818 if (s->timing_info_valid)
819 s->timing_info.read_index_corrupt = TRUE;
821 /* pa_log("read_index invalidated"); */
824 request_auto_timing_update(s, TRUE);
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828 pa_stream *s = userdata;
831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
834 request_auto_timing_update(s, FALSE);
838 static void create_stream_complete(pa_stream *s) {
840 pa_assert(PA_REFCNT_VALUE(s) >= 1);
841 pa_assert(s->state == PA_STREAM_CREATING);
843 pa_stream_set_state(s, PA_STREAM_READY);
845 if (s->requested_bytes > 0 && s->write_callback)
846 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
848 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
849 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850 pa_assert(!s->auto_timing_update_event);
851 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
853 request_auto_timing_update(s, TRUE);
856 check_smoother_status(s, TRUE, FALSE, FALSE);
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
865 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
868 if (pa_atou(e, &ms) < 0 || ms <= 0)
869 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
871 attr->maxlength = (uint32_t) -1;
872 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873 attr->minreq = (uint32_t) -1;
874 attr->prebuf = (uint32_t) -1;
875 attr->fragsize = attr->tlength;
879 *flags |= PA_STREAM_ADJUST_LATENCY;
882 if (s->context->version >= 13)
885 /* Version older than 0.9.10 didn't do server side buffer_attr
886 * selection, hence we have to fake it on the client side. */
888 /* We choose fairly conservative values here, to not confuse
889 * old clients with extremely large playback buffers */
891 if (attr->maxlength == (uint32_t) -1)
892 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
894 if (attr->tlength == (uint32_t) -1)
895 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
897 if (attr->minreq == (uint32_t) -1)
898 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
900 if (attr->prebuf == (uint32_t) -1)
901 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
903 if (attr->fragsize == (uint32_t) -1)
904 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908 pa_stream *s = userdata;
909 uint32_t requested_bytes = 0;
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914 pa_assert(s->state == PA_STREAM_CREATING);
918 if (command != PA_COMMAND_REPLY) {
919 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
922 pa_stream_set_state(s, PA_STREAM_FAILED);
926 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927 s->channel == PA_INVALID_INDEX ||
928 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
929 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930 pa_context_fail(s->context, PA_ERR_PROTOCOL);
934 s->requested_bytes = (int64_t) requested_bytes;
936 if (s->context->version >= 9) {
937 if (s->direction == PA_STREAM_PLAYBACK) {
938 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942 pa_context_fail(s->context, PA_ERR_PROTOCOL);
945 } else if (s->direction == PA_STREAM_RECORD) {
946 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948 pa_context_fail(s->context, PA_ERR_PROTOCOL);
954 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
957 const char *dn = NULL;
960 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963 pa_tagstruct_gets(t, &dn) < 0 ||
964 pa_tagstruct_get_boolean(t, &suspended) < 0) {
965 pa_context_fail(s->context, PA_ERR_PROTOCOL);
969 if (!dn || s->device_index == PA_INVALID_INDEX ||
970 ss.channels != cm.channels ||
971 !pa_channel_map_valid(&cm) ||
972 !pa_sample_spec_valid(&ss) ||
973 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976 pa_context_fail(s->context, PA_ERR_PROTOCOL);
980 pa_xfree(s->device_name);
981 s->device_name = pa_xstrdup(dn);
982 s->suspended = suspended;
988 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
991 if (pa_tagstruct_get_usec(t, &usec) < 0) {
992 pa_context_fail(s->context, PA_ERR_PROTOCOL);
996 if (s->direction == PA_STREAM_RECORD)
997 s->timing_info.configured_source_usec = usec;
999 s->timing_info.configured_sink_usec = usec;
1002 if (!pa_tagstruct_eof(t)) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1007 if (s->direction == PA_STREAM_RECORD) {
1008 pa_assert(!s->record_memblockq);
1010 s->record_memblockq = pa_memblockq_new(
1012 s->buffer_attr.maxlength,
1014 pa_frame_size(&s->sample_spec),
1021 s->channel_valid = TRUE;
1022 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1024 create_stream_complete(s);
1030 static int create_stream(
1031 pa_stream_direction_t direction,
1034 const pa_buffer_attr *attr,
1035 pa_stream_flags_t flags,
1036 const pa_cvolume *volume,
1037 pa_stream *sync_stream) {
1041 pa_bool_t volume_set = FALSE;
1044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1047 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1050 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051 PA_STREAM_INTERPOLATE_TIMING|
1052 PA_STREAM_NOT_MONOTONIC|
1053 PA_STREAM_AUTO_TIMING_UPDATE|
1054 PA_STREAM_NO_REMAP_CHANNELS|
1055 PA_STREAM_NO_REMIX_CHANNELS|
1056 PA_STREAM_FIX_FORMAT|
1058 PA_STREAM_FIX_CHANNELS|
1059 PA_STREAM_DONT_MOVE|
1060 PA_STREAM_VARIABLE_RATE|
1061 PA_STREAM_PEAK_DETECT|
1062 PA_STREAM_START_MUTED|
1063 PA_STREAM_ADJUST_LATENCY|
1064 PA_STREAM_EARLY_REQUESTS|
1065 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066 PA_STREAM_START_UNMUTED|
1067 PA_STREAM_FAIL_ON_SUSPEND|
1068 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1070 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1071 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1072 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1073 /* Althought some of the other flags are not supported on older
1074 * version, we don't check for them here, because it doesn't hurt
1075 * when they are passed but actually not supported. This makes
1076 * client development easier */
1078 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1079 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1080 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1081 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1082 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1086 s->direction = direction;
1089 s->syncid = sync_stream->syncid;
1092 s->buffer_attr = *attr;
1093 patch_buffer_attr(s, &s->buffer_attr, &flags);
1096 s->corked = !!(flags & PA_STREAM_START_CORKED);
1098 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1101 x = pa_rtclock_now();
1103 pa_assert(!s->smoother);
1104 s->smoother = pa_smoother_new(
1105 SMOOTHER_ADJUST_TIME,
1106 SMOOTHER_HISTORY_TIME,
1107 !(flags & PA_STREAM_NOT_MONOTONIC),
1109 SMOOTHER_MIN_HISTORY,
1115 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1117 t = pa_tagstruct_command(
1119 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1122 if (s->context->version < 13)
1123 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1127 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1128 PA_TAG_CHANNEL_MAP, &s->channel_map,
1129 PA_TAG_U32, PA_INVALID_INDEX,
1131 PA_TAG_U32, s->buffer_attr.maxlength,
1132 PA_TAG_BOOLEAN, s->corked,
1135 if (s->direction == PA_STREAM_PLAYBACK) {
1140 PA_TAG_U32, s->buffer_attr.tlength,
1141 PA_TAG_U32, s->buffer_attr.prebuf,
1142 PA_TAG_U32, s->buffer_attr.minreq,
1143 PA_TAG_U32, s->syncid,
1146 volume_set = !!volume;
1149 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1151 pa_tagstruct_put_cvolume(t, volume);
1153 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1155 if (s->context->version >= 12) {
1158 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1159 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1160 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1161 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1162 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1163 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1164 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1168 if (s->context->version >= 13) {
1170 if (s->direction == PA_STREAM_PLAYBACK)
1171 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1173 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1177 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1178 PA_TAG_PROPLIST, s->proplist,
1181 if (s->direction == PA_STREAM_RECORD)
1182 pa_tagstruct_putu32(t, s->direct_on_input);
1185 if (s->context->version >= 14) {
1187 if (s->direction == PA_STREAM_PLAYBACK)
1188 pa_tagstruct_put_boolean(t, volume_set);
1190 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1193 if (s->context->version >= 15) {
1195 if (s->direction == PA_STREAM_PLAYBACK)
1196 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1198 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1199 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1202 if (s->context->version >= 17) {
1204 if (s->direction == PA_STREAM_PLAYBACK)
1205 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1209 pa_pstream_send_tagstruct(s->context->pstream, t);
1210 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1212 pa_stream_set_state(s, PA_STREAM_CREATING);
1218 int pa_stream_connect_playback(
1221 const pa_buffer_attr *attr,
1222 pa_stream_flags_t flags,
1223 const pa_cvolume *volume,
1224 pa_stream *sync_stream) {
1227 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1229 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1232 int pa_stream_connect_record(
1235 const pa_buffer_attr *attr,
1236 pa_stream_flags_t flags) {
1239 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1241 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1244 int pa_stream_begin_write(
1250 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1252 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1253 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1254 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1255 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1256 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1258 if (*nbytes != (size_t) -1) {
1261 m = pa_mempool_block_size_max(s->context->mempool);
1262 fs = pa_frame_size(&s->sample_spec);
1269 if (!s->write_memblock) {
1270 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1271 s->write_data = pa_memblock_acquire(s->write_memblock);
1274 *data = s->write_data;
1275 *nbytes = pa_memblock_get_length(s->write_memblock);
1280 int pa_stream_cancel_write(
1284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1286 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1287 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1288 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1289 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1291 pa_assert(s->write_data);
1293 pa_memblock_release(s->write_memblock);
1294 pa_memblock_unref(s->write_memblock);
1295 s->write_memblock = NULL;
1296 s->write_data = NULL;
1301 int pa_stream_write(
1305 pa_free_cb_t free_cb,
1307 pa_seek_mode_t seek) {
1310 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1313 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1314 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1315 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1316 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1317 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1318 PA_CHECK_VALIDITY(s->context,
1319 !s->write_memblock ||
1320 ((data >= s->write_data) &&
1321 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1323 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1325 if (s->write_memblock) {
1328 /* pa_stream_write_begin() was called before */
1330 pa_memblock_release(s->write_memblock);
1332 chunk.memblock = s->write_memblock;
1333 chunk.index = (const char *) data - (const char *) s->write_data;
1334 chunk.length = length;
1336 s->write_memblock = NULL;
1337 s->write_data = NULL;
1339 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1340 pa_memblock_unref(chunk.memblock);
1343 pa_seek_mode_t t_seek = seek;
1344 int64_t t_offset = offset;
1345 size_t t_length = length;
1346 const void *t_data = data;
1348 /* pa_stream_write_begin() was not called before */
1350 while (t_length > 0) {
1355 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1356 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1357 chunk.length = t_length;
1361 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1362 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1364 d = pa_memblock_acquire(chunk.memblock);
1365 memcpy(d, t_data, chunk.length);
1366 pa_memblock_release(chunk.memblock);
1369 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1372 t_seek = PA_SEEK_RELATIVE;
1374 t_data = (const uint8_t*) t_data + chunk.length;
1375 t_length -= chunk.length;
1377 pa_memblock_unref(chunk.memblock);
1380 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1381 free_cb((void*) data);
1384 /* This is obviously wrong since we ignore the seeking index . But
1385 * that's OK, the server side applies the same error */
1386 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1388 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1390 if (s->direction == PA_STREAM_PLAYBACK) {
1392 /* Update latency request correction */
1393 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1395 if (seek == PA_SEEK_ABSOLUTE) {
1396 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1397 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1398 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1399 } else if (seek == PA_SEEK_RELATIVE) {
1400 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1401 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1403 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1406 /* Update the write index in the already available latency data */
1407 if (s->timing_info_valid) {
1409 if (seek == PA_SEEK_ABSOLUTE) {
1410 s->timing_info.write_index_corrupt = FALSE;
1411 s->timing_info.write_index = offset + (int64_t) length;
1412 } else if (seek == PA_SEEK_RELATIVE) {
1413 if (!s->timing_info.write_index_corrupt)
1414 s->timing_info.write_index += offset + (int64_t) length;
1416 s->timing_info.write_index_corrupt = TRUE;
1419 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1420 request_auto_timing_update(s, TRUE);
1426 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1432 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1436 if (!s->peek_memchunk.memblock) {
1438 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1444 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1447 pa_assert(s->peek_data);
1448 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1449 *length = s->peek_memchunk.length;
1453 int pa_stream_drop(pa_stream *s) {
1455 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1457 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1458 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1459 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1460 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1462 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1464 /* Fix the simulated local read index */
1465 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1466 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1468 pa_assert(s->peek_data);
1469 pa_memblock_release(s->peek_memchunk.memblock);
1470 pa_memblock_unref(s->peek_memchunk.memblock);
1471 pa_memchunk_reset(&s->peek_memchunk);
1476 size_t pa_stream_writable_size(pa_stream *s) {
1478 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1480 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1481 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1482 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1484 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1487 size_t pa_stream_readable_size(pa_stream *s) {
1489 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1491 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1492 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1493 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1495 return pa_memblockq_get_length(s->record_memblockq);
1498 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1504 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1506 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1507 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1508 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1510 /* Ask for a timing update before we cork/uncork to get the best
1511 * accuracy for the transport latency suitable for the
1512 * check_smoother_status() call in the started callback */
1513 request_auto_timing_update(s, TRUE);
1515 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1517 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1518 pa_tagstruct_putu32(t, s->channel);
1519 pa_pstream_send_tagstruct(s->context->pstream, t);
1520 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1522 /* This might cause the read index to conitnue again, hence
1523 * let's request a timing update */
1524 request_auto_timing_update(s, TRUE);
1529 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1533 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1534 pa_assert(s->state == PA_STREAM_READY);
1535 pa_assert(s->direction != PA_STREAM_UPLOAD);
1536 pa_assert(s->timing_info_valid);
1537 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1538 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1540 if (s->direction == PA_STREAM_PLAYBACK) {
1541 /* The last byte that was written into the output device
1542 * had this time value associated */
1543 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1545 if (!s->corked && !s->suspended) {
1547 if (!ignore_transport)
1548 /* Because the latency info took a little time to come
1549 * to us, we assume that the real output time is actually
1551 usec += s->timing_info.transport_usec;
1553 /* However, the output device usually maintains a buffer
1554 too, hence the real sample currently played is a little
1556 if (s->timing_info.sink_usec >= usec)
1559 usec -= s->timing_info.sink_usec;
1563 pa_assert(s->direction == PA_STREAM_RECORD);
1565 /* The last byte written into the server side queue had
1566 * this time value associated */
1567 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1569 if (!s->corked && !s->suspended) {
1571 if (!ignore_transport)
1572 /* Add transport latency */
1573 usec += s->timing_info.transport_usec;
1575 /* Add latency of data in device buffer */
1576 usec += s->timing_info.source_usec;
1578 /* If this is a monitor source, we need to correct the
1579 * time by the playback device buffer */
1580 if (s->timing_info.sink_usec >= usec)
1583 usec -= s->timing_info.sink_usec;
1590 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1591 pa_operation *o = userdata;
1592 struct timeval local, remote, now;
1594 pa_bool_t playing = FALSE;
1595 uint64_t underrun_for = 0, playing_for = 0;
1599 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1601 if (!o->context || !o->stream)
1604 i = &o->stream->timing_info;
1606 o->stream->timing_info_valid = FALSE;
1607 i->write_index_corrupt = TRUE;
1608 i->read_index_corrupt = TRUE;
1610 if (command != PA_COMMAND_REPLY) {
1611 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1616 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1617 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1618 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1619 pa_tagstruct_get_timeval(t, &local) < 0 ||
1620 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1621 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1622 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1624 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1628 if (o->context->version >= 13 &&
1629 o->stream->direction == PA_STREAM_PLAYBACK)
1630 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1631 pa_tagstruct_getu64(t, &playing_for) < 0) {
1633 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1638 if (!pa_tagstruct_eof(t)) {
1639 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1642 o->stream->timing_info_valid = TRUE;
1643 i->write_index_corrupt = FALSE;
1644 i->read_index_corrupt = FALSE;
1646 i->playing = (int) playing;
1647 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1649 pa_gettimeofday(&now);
1651 /* Calculcate timestamps */
1652 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1653 /* local and remote seem to have synchronized clocks */
1655 if (o->stream->direction == PA_STREAM_PLAYBACK)
1656 i->transport_usec = pa_timeval_diff(&remote, &local);
1658 i->transport_usec = pa_timeval_diff(&now, &remote);
1660 i->synchronized_clocks = TRUE;
1661 i->timestamp = remote;
1663 /* clocks are not synchronized, let's estimate latency then */
1664 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1665 i->synchronized_clocks = FALSE;
1666 i->timestamp = local;
1667 pa_timeval_add(&i->timestamp, i->transport_usec);
1670 /* Invalidate read and write indexes if necessary */
1671 if (tag < o->stream->read_index_not_before)
1672 i->read_index_corrupt = TRUE;
1674 if (tag < o->stream->write_index_not_before)
1675 i->write_index_corrupt = TRUE;
1677 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1678 /* Write index correction */
1681 uint32_t ctag = tag;
1683 /* Go through the saved correction values and add up the
1684 * total correction.*/
1685 for (n = 0, j = o->stream->current_write_index_correction+1;
1686 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1687 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1689 /* Step over invalid data or out-of-date data */
1690 if (!o->stream->write_index_corrections[j].valid ||
1691 o->stream->write_index_corrections[j].tag < ctag)
1694 /* Make sure that everything is in order */
1695 ctag = o->stream->write_index_corrections[j].tag+1;
1697 /* Now fix the write index */
1698 if (o->stream->write_index_corrections[j].corrupt) {
1699 /* A corrupting seek was made */
1700 i->write_index_corrupt = TRUE;
1701 } else if (o->stream->write_index_corrections[j].absolute) {
1702 /* An absolute seek was made */
1703 i->write_index = o->stream->write_index_corrections[j].value;
1704 i->write_index_corrupt = FALSE;
1705 } else if (!i->write_index_corrupt) {
1706 /* A relative seek was made */
1707 i->write_index += o->stream->write_index_corrections[j].value;
1711 /* Clear old correction entries */
1712 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1713 if (!o->stream->write_index_corrections[n].valid)
1716 if (o->stream->write_index_corrections[n].tag <= tag)
1717 o->stream->write_index_corrections[n].valid = FALSE;
1721 if (o->stream->direction == PA_STREAM_RECORD) {
1722 /* Read index correction */
1724 if (!i->read_index_corrupt)
1725 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1728 /* Update smoother */
1729 if (o->stream->smoother) {
1732 u = x = pa_rtclock_now() - i->transport_usec;
1734 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1737 /* If we weren't playing then it will take some time
1738 * until the audio will actually come out through the
1739 * speakers. Since we follow that timing here, we need
1740 * to try to fix this up */
1742 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1744 if (su < i->sink_usec)
1745 x += i->sink_usec - su;
1749 pa_smoother_pause(o->stream->smoother, x);
1751 /* Update the smoother */
1752 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1753 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1754 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1757 pa_smoother_resume(o->stream->smoother, x, TRUE);
1761 o->stream->auto_timing_update_requested = FALSE;
1763 if (o->stream->latency_update_callback)
1764 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1766 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1767 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1768 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1773 pa_operation_done(o);
1774 pa_operation_unref(o);
1777 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1785 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1787 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1788 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1789 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1791 if (s->direction == PA_STREAM_PLAYBACK) {
1792 /* Find a place to store the write_index correction data for this entry */
1793 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1795 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1798 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1800 t = pa_tagstruct_command(
1802 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1804 pa_tagstruct_putu32(t, s->channel);
1805 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1807 pa_pstream_send_tagstruct(s->context->pstream, t);
1808 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1810 if (s->direction == PA_STREAM_PLAYBACK) {
1811 /* Fill in initial correction data */
1813 s->current_write_index_correction = cidx;
1815 s->write_index_corrections[cidx].valid = TRUE;
1816 s->write_index_corrections[cidx].absolute = FALSE;
1817 s->write_index_corrections[cidx].corrupt = FALSE;
1818 s->write_index_corrections[cidx].tag = tag;
1819 s->write_index_corrections[cidx].value = 0;
1825 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1826 pa_stream *s = userdata;
1830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1834 if (command != PA_COMMAND_REPLY) {
1835 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1838 pa_stream_set_state(s, PA_STREAM_FAILED);
1840 } else if (!pa_tagstruct_eof(t)) {
1841 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1845 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1851 int pa_stream_disconnect(pa_stream *s) {
1856 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1858 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1859 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1860 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1864 t = pa_tagstruct_command(
1866 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1867 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1869 pa_tagstruct_putu32(t, s->channel);
1870 pa_pstream_send_tagstruct(s->context->pstream, t);
1871 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1877 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1879 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1881 if (pa_detect_fork())
1884 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1887 s->read_callback = cb;
1888 s->read_userdata = userdata;
1891 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1893 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1895 if (pa_detect_fork())
1898 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1901 s->write_callback = cb;
1902 s->write_userdata = userdata;
1905 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1907 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1909 if (pa_detect_fork())
1912 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1915 s->state_callback = cb;
1916 s->state_userdata = userdata;
1919 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1921 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1923 if (pa_detect_fork())
1926 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1929 s->overflow_callback = cb;
1930 s->overflow_userdata = userdata;
1933 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1935 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1937 if (pa_detect_fork())
1940 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1943 s->underflow_callback = cb;
1944 s->underflow_userdata = userdata;
1947 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1949 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1951 if (pa_detect_fork())
1954 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1957 s->latency_update_callback = cb;
1958 s->latency_update_userdata = userdata;
1961 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1963 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1965 if (pa_detect_fork())
1968 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1971 s->moved_callback = cb;
1972 s->moved_userdata = userdata;
1975 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1977 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1979 if (pa_detect_fork())
1982 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1985 s->suspended_callback = cb;
1986 s->suspended_userdata = userdata;
1989 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1991 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1993 if (pa_detect_fork())
1996 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1999 s->started_callback = cb;
2000 s->started_userdata = userdata;
2003 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2007 if (pa_detect_fork())
2010 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2013 s->event_callback = cb;
2014 s->event_userdata = userdata;
2017 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2019 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2021 if (pa_detect_fork())
2024 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2027 s->buffer_attr_callback = cb;
2028 s->buffer_attr_userdata = userdata;
2031 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2032 pa_operation *o = userdata;
2037 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2042 if (command != PA_COMMAND_REPLY) {
2043 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2047 } else if (!pa_tagstruct_eof(t)) {
2048 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2053 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2054 cb(o->stream, success, o->userdata);
2058 pa_operation_done(o);
2059 pa_operation_unref(o);
2062 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2068 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2070 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2071 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2072 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2074 /* Ask for a timing update before we cork/uncork to get the best
2075 * accuracy for the transport latency suitable for the
2076 * check_smoother_status() call in the started callback */
2077 request_auto_timing_update(s, TRUE);
2081 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2083 t = pa_tagstruct_command(
2085 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2087 pa_tagstruct_putu32(t, s->channel);
2088 pa_tagstruct_put_boolean(t, !!b);
2089 pa_pstream_send_tagstruct(s->context->pstream, t);
2090 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2092 check_smoother_status(s, FALSE, FALSE, FALSE);
2094 /* This might cause the indexes to hang/start again, hence let's
2095 * request a timing update, after the cork/uncork, too */
2096 request_auto_timing_update(s, TRUE);
2101 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2107 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2112 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2114 t = pa_tagstruct_command(s->context, command, &tag);
2115 pa_tagstruct_putu32(t, s->channel);
2116 pa_pstream_send_tagstruct(s->context->pstream, t);
2117 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2122 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2128 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2129 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2130 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2132 /* Ask for a timing update *before* the flush, so that the
2133 * transport usec is as up to date as possible when we get the
2134 * underflow message and update the smoother status*/
2135 request_auto_timing_update(s, TRUE);
2137 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2140 if (s->direction == PA_STREAM_PLAYBACK) {
2142 if (s->write_index_corrections[s->current_write_index_correction].valid)
2143 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2145 if (s->buffer_attr.prebuf > 0)
2146 check_smoother_status(s, FALSE, FALSE, TRUE);
2148 /* This will change the write index, but leave the
2149 * read index untouched. */
2150 invalidate_indexes(s, FALSE, TRUE);
2153 /* For record streams this has no influence on the write
2154 * index, but the read index might jump. */
2155 invalidate_indexes(s, TRUE, FALSE);
2160 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2164 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2166 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2167 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2168 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2169 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2171 /* Ask for a timing update before we cork/uncork to get the best
2172 * accuracy for the transport latency suitable for the
2173 * check_smoother_status() call in the started callback */
2174 request_auto_timing_update(s, TRUE);
2176 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2179 /* This might cause the read index to hang again, hence
2180 * let's request a timing update */
2181 request_auto_timing_update(s, TRUE);
2186 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2190 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2192 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2193 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2194 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2195 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2197 /* Ask for a timing update before we cork/uncork to get the best
2198 * accuracy for the transport latency suitable for the
2199 * check_smoother_status() call in the started callback */
2200 request_auto_timing_update(s, TRUE);
2202 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2205 /* This might cause the read index to start moving again, hence
2206 * let's request a timing update */
2207 request_auto_timing_update(s, TRUE);
2212 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2216 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2219 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2220 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2221 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2223 if (s->context->version >= 13) {
2224 pa_proplist *p = pa_proplist_new();
2226 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2227 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2228 pa_proplist_free(p);
2233 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2234 t = pa_tagstruct_command(
2236 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2238 pa_tagstruct_putu32(t, s->channel);
2239 pa_tagstruct_puts(t, name);
2240 pa_pstream_send_tagstruct(s->context->pstream, t);
2241 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2247 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2253 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2254 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2255 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2256 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2257 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2258 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2261 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2263 usec = calc_time(s, FALSE);
2265 /* Make sure the time runs monotonically */
2266 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2267 if (usec < s->previous_time)
2268 usec = s->previous_time;
2270 s->previous_time = usec;
2279 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2289 if (negative && s->direction == PA_STREAM_RECORD) {
2297 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2303 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2306 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2307 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2308 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2309 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2310 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2311 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2313 if ((r = pa_stream_get_time(s, &t)) < 0)
2316 if (s->direction == PA_STREAM_PLAYBACK)
2317 cindex = s->timing_info.write_index;
2319 cindex = s->timing_info.read_index;
2324 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2326 if (s->direction == PA_STREAM_PLAYBACK)
2327 *r_usec = time_counter_diff(s, c, t, negative);
2329 *r_usec = time_counter_diff(s, t, c, negative);
2334 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2336 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2340 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2341 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2343 return &s->timing_info;
2346 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2348 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 return &s->sample_spec;
2355 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2361 return &s->channel_map;
2364 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2366 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2368 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2369 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2370 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2372 return &s->buffer_attr;
2375 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2376 pa_operation *o = userdata;
2381 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2386 if (command != PA_COMMAND_REPLY) {
2387 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2392 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2393 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2394 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2395 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2396 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2397 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2400 } else if (o->stream->direction == PA_STREAM_RECORD) {
2401 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2402 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2403 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2408 if (o->stream->context->version >= 13) {
2411 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2412 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2416 if (o->stream->direction == PA_STREAM_RECORD)
2417 o->stream->timing_info.configured_source_usec = usec;
2419 o->stream->timing_info.configured_sink_usec = usec;
2422 if (!pa_tagstruct_eof(t)) {
2423 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2429 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2430 cb(o->stream, success, o->userdata);
2434 pa_operation_done(o);
2435 pa_operation_unref(o);
2439 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2443 pa_buffer_attr copy;
2446 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2449 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2450 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2451 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2452 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2454 /* Ask for a timing update before we cork/uncork to get the best
2455 * accuracy for the transport latency suitable for the
2456 * check_smoother_status() call in the started callback */
2457 request_auto_timing_update(s, TRUE);
2459 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2461 t = pa_tagstruct_command(
2463 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2465 pa_tagstruct_putu32(t, s->channel);
2468 patch_buffer_attr(s, ©, NULL);
2471 pa_tagstruct_putu32(t, attr->maxlength);
2473 if (s->direction == PA_STREAM_PLAYBACK)
2476 PA_TAG_U32, attr->tlength,
2477 PA_TAG_U32, attr->prebuf,
2478 PA_TAG_U32, attr->minreq,
2481 pa_tagstruct_putu32(t, attr->fragsize);
2483 if (s->context->version >= 13)
2484 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2486 if (s->context->version >= 14)
2487 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2489 pa_pstream_send_tagstruct(s->context->pstream, t);
2490 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2492 /* This might cause changes in the read/write indexex, hence let's
2493 * request a timing update */
2494 request_auto_timing_update(s, TRUE);
2499 uint32_t pa_stream_get_device_index(pa_stream *s) {
2501 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2503 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2504 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2505 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2506 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2507 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2509 return s->device_index;
2512 const char *pa_stream_get_device_name(pa_stream *s) {
2514 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2516 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2517 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2520 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2522 return s->device_name;
2525 int pa_stream_is_suspended(pa_stream *s) {
2527 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2529 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2530 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2534 return s->suspended;
2537 int pa_stream_is_corked(pa_stream *s) {
2539 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2541 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2542 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2543 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2548 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2549 pa_operation *o = userdata;
2554 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2559 if (command != PA_COMMAND_REPLY) {
2560 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2566 if (!pa_tagstruct_eof(t)) {
2567 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2572 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2573 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2576 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2577 cb(o->stream, success, o->userdata);
2581 pa_operation_done(o);
2582 pa_operation_unref(o);
2586 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2592 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2594 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2595 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2596 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2597 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2598 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2601 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2602 o->private = PA_UINT_TO_PTR(rate);
2604 t = pa_tagstruct_command(
2606 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2608 pa_tagstruct_putu32(t, s->channel);
2609 pa_tagstruct_putu32(t, rate);
2611 pa_pstream_send_tagstruct(s->context->pstream, t);
2612 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2617 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2623 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2625 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2627 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2628 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2629 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2631 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2633 t = pa_tagstruct_command(
2635 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2637 pa_tagstruct_putu32(t, s->channel);
2638 pa_tagstruct_putu32(t, (uint32_t) mode);
2639 pa_tagstruct_put_proplist(t, p);
2641 pa_pstream_send_tagstruct(s->context->pstream, t);
2642 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2644 /* Please note that we don't update s->proplist here, because we
2645 * don't export that field */
2650 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2654 const char * const*k;
2657 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2659 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2660 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2665 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2667 t = pa_tagstruct_command(
2669 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2671 pa_tagstruct_putu32(t, s->channel);
2673 for (k = keys; *k; k++)
2674 pa_tagstruct_puts(t, *k);
2676 pa_tagstruct_puts(t, NULL);
2678 pa_pstream_send_tagstruct(s->context->pstream, t);
2679 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2681 /* Please note that we don't update s->proplist here, because we
2682 * don't export that field */
2687 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2689 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2691 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2692 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2693 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2694 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2696 s->direct_on_input = sink_input_idx;
2701 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2703 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2705 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2706 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2707 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2709 return s->direct_on_input;