2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
44 #include "fork-detect.h"
/* Interval of the automatic timing update timer: it starts at 10ms and is
 * doubled each time the timer is re-armed, capped at 1500ms. */
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Parameters passed to pa_smoother_new() when a stream is connected with
 * PA_STREAM_INTERPOLATE_TIMING. */
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
/* Convenience wrapper: creates a stream without a property list by
 * forwarding to pa_stream_new_with_proplist() with a NULL proplist. */
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Clears every user-supplied callback of the stream together with its
 * associated userdata pointer. */
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
/* Creates a new, unconnected stream object on a context.
 *
 * The sample spec is mandatory and must be valid. A channel map, if given,
 * must be valid and have the same channel count as the sample spec. Either
 * name or a PA_PROP_MEDIA_NAME entry in the proplist must be supplied.
 * Each validity check below names the error code used when it fails. */
83 pa_stream *pa_stream_new_with_proplist(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
/* S32 samples require protocol version 12; S24 and S24-in-32 require 15. */
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
/* Derive a default channel map for the sample spec's channel count; this
 * fails with PA_ERR_INVALID when no default mapping exists. */
106 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
108 s = pa_xnew(pa_stream, 1);
111 s->mainloop = c->mainloop;
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
117 s->sample_spec = *ss;
118 s->channel_map = *map;
120 s->direct_on_input = PA_INVALID_INDEX;
/* The stream keeps its own property list: a copy of p, or a fresh one. */
122 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
124 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
127 s->channel_valid = FALSE;
/* Each stream takes the next sync id from the context's counter. */
128 s->syncid = c->csyncid++;
129 s->stream_index = PA_INVALID_INDEX;
131 s->requested_bytes = 0;
132 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
134 /* We initialize the target length here, so that if the user
135 * passes no explicit buffering metrics the default is similar to
136 * what older PA versions provided. */
/* (uint32_t) -1 marks a buffer attribute as not set. */
138 s->buffer_attr.maxlength = (uint32_t) -1;
139 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140 s->buffer_attr.minreq = (uint32_t) -1;
141 s->buffer_attr.prebuf = (uint32_t) -1;
142 s->buffer_attr.fragsize = (uint32_t) -1;
144 s->device_index = PA_INVALID_INDEX;
145 s->device_name = NULL;
146 s->suspended = FALSE;
149 s->write_memblock = NULL;
150 s->write_data = NULL;
152 pa_memchunk_reset(&s->peek_memchunk);
154 s->record_memblockq = NULL;
156 memset(&s->timing_info, 0, sizeof(s->timing_info));
157 s->timing_info_valid = FALSE;
159 s->previous_time = 0;
161 s->read_index_not_before = 0;
162 s->write_index_not_before = 0;
163 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164 s->write_index_corrections[i].valid = 0;
165 s->current_write_index_correction = 0;
167 s->auto_timing_update_event = NULL;
168 s->auto_timing_update_requested = FALSE;
169 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
175 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
/* Register the new stream in the context's stream list. */
176 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Detaches the stream from its context: walks the context's operation list
 * cancelling operations, drops reply handlers registered for the stream,
 * removes it from the per-direction channel hashmap and from the context's
 * stream list, and frees the auto timing update timer. */
182 static void stream_unlink(pa_stream *s) {
189 /* Detach from context */
191 /* Unref all operation objects that point to us */
192 for (o = s->context->operations; o; o = n) {
196 pa_operation_cancel(o);
199 /* Drop all outstanding replies for this stream */
200 if (s->context->pdispatch)
201 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* The stream is keyed by its channel in the hashmap matching its direction. */
203 if (s->channel_valid) {
204 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
206 s->channel_valid = FALSE;
209 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* The timer is released through the mainloop API the stream was created with. */
214 if (s->auto_timing_update_event) {
215 pa_assert(s->mainloop);
216 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases the resources owned by the stream: a write memblock still pending
 * from pa_stream_begin_write(), the peeked record chunk, the record queue,
 * the property list, the smoother and the device name. */
222 static void stream_free(pa_stream *s) {
227 if (s->write_memblock) {
228 pa_memblock_release(s->write_memblock);
/* Drop the reference on the memblock itself. write_data is only the data
 * pointer returned by pa_memblock_acquire() and must never be handed to
 * pa_memblock_unref(); pa_stream_cancel_write() uses the same pairing. */
229 pa_memblock_unref(s->write_memblock);
232 if (s->peek_memchunk.memblock) {
234 pa_memblock_release(s->peek_memchunk.memblock);
235 pa_memblock_unref(s->peek_memchunk.memblock);
238 if (s->record_memblockq)
239 pa_memblockq_free(s->record_memblockq);
242 pa_proplist_free(s->proplist);
245 pa_smoother_free(s->smoother);
247 pa_xfree(s->device_name);
/* Drops one reference from the stream; the branch below is taken when the
 * reference count has reached zero. */
251 void pa_stream_unref(pa_stream *s) {
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
255 if (PA_REFCNT_DEC(s) <= 0)
/* Reference-count accessor for the stream; requires that the caller already
 * holds at least one reference. */
259 pa_stream* pa_stream_ref(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the stream's state (return type pa_stream_state_t); the
 * stream must be referenced at least once. */
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
269 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the context the stream belongs to; the stream must be
 * referenced at least once. */
274 pa_context* pa_stream_get_context(pa_stream *s) {
276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns the server-side index of the stream.
 *
 * Yields PA_INVALID_INDEX (with PA_ERR_FORKED or PA_ERR_BADSTATE set on the
 * context) when called after a fork or while the stream is not READY. */
281 uint32_t pa_stream_get_index(pa_stream *s) {
283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
288 return s->stream_index;
/* State setter for the stream: notifies the registered state callback and
 * applies additional handling when st is PA_STREAM_FAILED or
 * PA_STREAM_TERMINATED. */
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
293 pa_assert(PA_REFCNT_VALUE(s) >= 1);
302 if (s->state_callback)
303 s->state_callback(s, s->state_userdata);
305 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Asks the server for fresh timing info and re-arms the automatic timing
 * update timer. Only relevant for streams created with
 * PA_STREAM_AUTO_TIMING_UPDATE.
 *
 * force: issue a request even when one is already marked as outstanding. */
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
313 pa_assert(PA_REFCNT_VALUE(s) >= 1);
315 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
318 if (s->state == PA_STREAM_READY &&
319 (force || !s->auto_timing_update_requested)) {
322 /* pa_log("Automatically requesting new timing data"); */
/* The operation handle is not needed here; only remember that a request is
 * in flight. */
324 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325 pa_operation_unref(o);
326 s->auto_timing_update_requested = TRUE;
330 if (s->auto_timing_update_event) {
332 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
/* Re-arm the timer, then back off: the interval doubles on every re-arm
 * until it reaches AUTO_TIMING_INTERVAL_END_USEC. */
334 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
336 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED. Reads the channel from the tagstruct,
 * looks the stream up in the hashmap matching the command, records
 * PA_ERR_KILLED on the context and moves the stream to PA_STREAM_FAILED.
 * A malformed packet fails the whole context with PA_ERR_PROTOCOL. */
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341 pa_context *c = userdata;
346 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
349 pa_assert(PA_REFCNT_VALUE(c) >= 1);
353 if (pa_tagstruct_getu32(t, &channel) < 0 ||
354 !pa_tagstruct_eof(t)) {
355 pa_context_fail(c, PA_ERR_PROTOCOL);
359 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
362 if (s->state != PA_STREAM_READY)
365 pa_context_set_error(c, PA_ERR_KILLED);
366 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the stream's time smoother according to the stream's
 * current status.
 *
 * The reference time is the current rtclock time, adjusted by the known
 * transport latency when timing info is valid. The smoother is paused when
 * the stream is suspended or corked, or when force_stop is set; it is a
 * candidate for resuming when force_start is set or prebuf is 0.
 * force_start and force_stop must not both be set. */
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
376 pa_assert(!force_start || !force_stop);
381 x = pa_rtclock_now();
383 if (s->timing_info_valid) {
385 x -= s->timing_info.transport_usec;
387 x += s->timing_info.transport_usec;
390 if (s->suspended || s->corked || force_stop)
391 pa_smoother_pause(s->smoother, x);
392 else if (force_start || s->buffer_attr.prebuf == 0) {
394 if (!s->timing_info_valid &&
398 s->context->version >= 13) {
400 /* If the server supports STARTED events we take them as
401 * indications when audio really starts/stops playing, if
402 * we don't have any timing info yet -- instead of trying
403 * to be smart and guessing the server time. Otherwise the
404 * unknown transport delay adds too much noise to our time
410 pa_smoother_resume(s->smoother, x, TRUE);
413 /* Please note that we have no idea if playback actually started
414 * if prebuf is non-zero! */
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED (protocol version 12 and later).
 *
 * Parses the new device index and name and the suspended flag; from
 * version 13 on also the buffer metrics and configured latency. The stream
 * is then updated, the smoother state is re-checked, a timing update is
 * forced and the moved callback is invoked. Malformed packets fail the
 * context with PA_ERR_PROTOCOL. */
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418 pa_context *c = userdata;
425 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
428 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
431 pa_assert(PA_REFCNT_VALUE(c) >= 1);
435 if (c->version < 12) {
436 pa_context_fail(c, PA_ERR_PROTOCOL);
440 if (pa_tagstruct_getu32(t, &channel) < 0 ||
441 pa_tagstruct_getu32(t, &di) < 0 ||
442 pa_tagstruct_gets(t, &dn) < 0 ||
443 pa_tagstruct_get_boolean(t, &suspended) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
448 if (c->version >= 13) {
/* Record streams carry maxlength, fragsize and latency; playback streams
 * carry maxlength, tlength, prebuf, minreq and latency. */
450 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453 pa_tagstruct_get_usec(t, &usec) < 0) {
454 pa_context_fail(c, PA_ERR_PROTOCOL);
458 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459 pa_tagstruct_getu32(t, &tlength) < 0 ||
460 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461 pa_tagstruct_getu32(t, &minreq) < 0 ||
462 pa_tagstruct_get_usec(t, &usec) < 0) {
463 pa_context_fail(c, PA_ERR_PROTOCOL);
469 if (!pa_tagstruct_eof(t)) {
470 pa_context_fail(c, PA_ERR_PROTOCOL);
/* A move notification without a valid target device is a protocol error. */
474 if (!dn || di == PA_INVALID_INDEX) {
475 pa_context_fail(c, PA_ERR_PROTOCOL);
479 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
482 if (s->state != PA_STREAM_READY)
485 if (c->version >= 13) {
486 if (s->direction == PA_STREAM_RECORD)
487 s->timing_info.configured_source_usec = usec;
489 s->timing_info.configured_sink_usec = usec;
/* Fields that were not part of the packet keep their 0 initializer. */
491 s->buffer_attr.maxlength = maxlength;
492 s->buffer_attr.fragsize = fragsize;
493 s->buffer_attr.tlength = tlength;
494 s->buffer_attr.prebuf = prebuf;
495 s->buffer_attr.minreq = minreq;
/* Replace the stored device name with a private copy of the new one. */
498 pa_xfree(s->device_name);
499 s->device_name = pa_xstrdup(dn);
500 s->device_index = di;
502 s->suspended = suspended;
504 check_smoother_status(s, TRUE, FALSE, FALSE);
505 request_auto_timing_update(s, TRUE);
507 if (s->moved_callback)
508 s->moved_callback(s, s->moved_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED (protocol version 15 and later).
 *
 * Parses the channel, the new buffer metrics and the configured latency,
 * stores them on the stream, forces a timing update and invokes the
 * buffer_attr callback. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. */
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515 pa_context *c = userdata;
519 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
522 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
525 pa_assert(PA_REFCNT_VALUE(c) >= 1);
529 if (c->version < 15) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
534 if (pa_tagstruct_getu32(t, &channel) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Record streams carry maxlength, fragsize and latency; playback streams
 * carry maxlength, tlength, prebuf, minreq and latency. The comparison must
 * use the BUFFER_ATTR_CHANGED command: this handler never receives
 * PA_COMMAND_RECORD_STREAM_MOVED (see the assertion above), so testing for
 * it would parse record packets with the playback layout. */
539 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
540 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541 pa_tagstruct_getu32(t, &fragsize) < 0 ||
542 pa_tagstruct_get_usec(t, &usec) < 0) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
547 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548 pa_tagstruct_getu32(t, &tlength) < 0 ||
549 pa_tagstruct_getu32(t, &prebuf) < 0 ||
550 pa_tagstruct_getu32(t, &minreq) < 0 ||
551 pa_tagstruct_get_usec(t, &usec) < 0) {
552 pa_context_fail(c, PA_ERR_PROTOCOL);
557 if (!pa_tagstruct_eof(t)) {
558 pa_context_fail(c, PA_ERR_PROTOCOL);
562 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
565 if (s->state != PA_STREAM_READY)
568 if (s->direction == PA_STREAM_RECORD)
569 s->timing_info.configured_source_usec = usec;
571 s->timing_info.configured_sink_usec = usec;
/* Fields that were not part of the packet keep their 0 initializer. */
573 s->buffer_attr.maxlength = maxlength;
574 s->buffer_attr.fragsize = fragsize;
575 s->buffer_attr.tlength = tlength;
576 s->buffer_attr.prebuf = prebuf;
577 s->buffer_attr.minreq = minreq;
579 request_auto_timing_update(s, TRUE);
581 if (s->buffer_attr_callback)
582 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED (protocol version 12 and later).
 * Stores the new suspended flag on the stream, re-checks the smoother
 * state, forces a timing update and invokes the suspended callback.
 * Malformed packets fail the context with PA_ERR_PROTOCOL. */
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589 pa_context *c = userdata;
595 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
598 pa_assert(PA_REFCNT_VALUE(c) >= 1);
602 if (c->version < 12) {
603 pa_context_fail(c, PA_ERR_PROTOCOL);
607 if (pa_tagstruct_getu32(t, &channel) < 0 ||
608 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609 !pa_tagstruct_eof(t)) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
614 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
617 if (s->state != PA_STREAM_READY)
620 s->suspended = suspended;
622 check_smoother_status(s, TRUE, FALSE, FALSE);
623 request_auto_timing_update(s, TRUE);
625 if (s->suspended_callback)
626 s->suspended_callback(s, s->suspended_userdata);
/* Dispatcher callback for PA_COMMAND_STARTED (protocol version 13 and
 * later). Looks the stream up among the playback streams, forces the
 * smoother to start, forces a timing update and invokes the started
 * callback. Malformed packets fail the context with PA_ERR_PROTOCOL. */
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633 pa_context *c = userdata;
638 pa_assert(command == PA_COMMAND_STARTED);
641 pa_assert(PA_REFCNT_VALUE(c) >= 1);
645 if (c->version < 13) {
646 pa_context_fail(c, PA_ERR_PROTOCOL);
650 if (pa_tagstruct_getu32(t, &channel) < 0 ||
651 !pa_tagstruct_eof(t)) {
652 pa_context_fail(c, PA_ERR_PROTOCOL);
656 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
659 if (s->state != PA_STREAM_READY)
/* force_start is set: the server told us that playback is running. */
662 check_smoother_status(s, TRUE, TRUE, FALSE);
663 request_auto_timing_update(s, TRUE);
665 if (s->started_callback)
666 s->started_callback(s, s->started_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT (protocol version 15 and later).
 * Parses the channel, the event name and an accompanying property list and
 * passes them to the stream's event callback. A packet that is malformed
 * or lacks an event name fails the context with PA_ERR_PROTOCOL. */
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
676 pa_proplist *pl = NULL;
680 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
683 pa_assert(PA_REFCNT_VALUE(c) >= 1);
687 if (c->version < 15) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Temporary property list that receives the event's properties. */
692 pl = pa_proplist_new();
694 if (pa_tagstruct_getu32(t, &channel) < 0 ||
695 pa_tagstruct_gets(t, &event) < 0 ||
696 pa_tagstruct_get_proplist(t, pl) < 0 ||
697 !pa_tagstruct_eof(t) || !event) {
698 pa_context_fail(c, PA_ERR_PROTOCOL);
702 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
705 if (s->state != PA_STREAM_READY)
708 if (s->event_callback)
709 s->event_callback(s, event, pl, s->event_userdata);
715 pa_proplist_free(pl);
/* Dispatcher callback for PA_COMMAND_REQUEST: the server asks for more
 * playback data. Adds the requested byte count to the stream's
 * requested_bytes counter and, if the total is positive, invokes the write
 * callback with it. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. */
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
720 pa_context *c = userdata;
721 uint32_t bytes, channel;
724 pa_assert(command == PA_COMMAND_REQUEST);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
731 if (pa_tagstruct_getu32(t, &channel) < 0 ||
732 pa_tagstruct_getu32(t, &bytes) < 0 ||
733 !pa_tagstruct_eof(t)) {
734 pa_context_fail(c, PA_ERR_PROTOCOL);
738 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741 if (s->state != PA_STREAM_READY)
744 s->requested_bytes += bytes;
746 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
748 if (s->requested_bytes > 0 && s->write_callback)
749 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Dispatcher callback shared by PA_COMMAND_OVERFLOW and
 * PA_COMMAND_UNDERFLOW. Looks the stream up among the playback streams,
 * stops the smoother when the stream uses prebuffering, forces a timing
 * update and invokes the overflow or underflow callback matching the
 * command. Malformed packets fail the context with PA_ERR_PROTOCOL. */
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
761 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
764 pa_assert(PA_REFCNT_VALUE(c) >= 1);
768 if (pa_tagstruct_getu32(t, &channel) < 0 ||
769 !pa_tagstruct_eof(t)) {
770 pa_context_fail(c, PA_ERR_PROTOCOL);
774 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
777 if (s->state != PA_STREAM_READY)
/* force_stop is set here; this applies to both commands. */
780 if (s->buffer_attr.prebuf > 0)
781 check_smoother_status(s, TRUE, FALSE, TRUE);
783 request_auto_timing_update(s, TRUE);
785 if (command == PA_COMMAND_OVERFLOW) {
786 if (s->overflow_callback)
787 s->overflow_callback(s, s->overflow_userdata);
788 } else if (command == PA_COMMAND_UNDERFLOW) {
789 if (s->underflow_callback)
790 s->underflow_callback(s, s->underflow_userdata);
/* Marks the stream's cached write and/or read index as unreliable and
 * forces a timing update.
 *
 * The context's current tag is stored as the "not before" mark of the
 * affected index and, if timing info is valid, the matching *_index_corrupt
 * flag is set. r and w presumably select the read and write index
 * respectively -- the guarding conditions are not visible here. */
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
799 pa_assert(PA_REFCNT_VALUE(s) >= 1);
801 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
803 if (s->state != PA_STREAM_READY)
807 s->write_index_not_before = s->context->ctag;
809 if (s->timing_info_valid)
810 s->timing_info.write_index_corrupt = TRUE;
812 /* pa_log("write_index invalidated"); */
816 s->read_index_not_before = s->context->ctag;
818 if (s->timing_info_valid)
819 s->timing_info.read_index_corrupt = TRUE;
821 /* pa_log("read_index invalidated"); */
824 request_auto_timing_update(s, TRUE);
/* Timer callback of the automatic timing update event. userdata is the
 * stream; a non-forced timing update request is issued for it. */
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828 pa_stream *s = userdata;
831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
834 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: moves the stream from CREATING to READY,
 * hands an already pending data request to the write callback, sets up the
 * automatic timing update timer if PA_STREAM_AUTO_TIMING_UPDATE was
 * requested, and brings the smoother in line with the stream status. */
838 static void create_stream_complete(pa_stream *s) {
840 pa_assert(PA_REFCNT_VALUE(s) >= 1);
841 pa_assert(s->state == PA_STREAM_CREATING);
843 pa_stream_set_state(s, PA_STREAM_READY);
845 if (s->requested_bytes > 0 && s->write_callback)
846 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
848 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
/* The timer starts at the shortest interval; request_auto_timing_update()
 * lengthens it on each re-arm. */
849 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850 pa_assert(!s->auto_timing_update_event);
851 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
853 request_auto_timing_update(s, TRUE);
856 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Adjusts the buffer attributes (and flags) before they are sent to the
 * server.
 *
 * If the environment variable PULSE_LATENCY_MSEC is set, it is parsed as a
 * positive number of milliseconds: tlength and fragsize are derived from
 * it, the other metrics are reset to "unset", and PA_STREAM_ADJUST_LATENCY
 * is added to the flags. Unparsable values are only logged.
 *
 * The defaults at the end are for servers with protocol version below 13,
 * which did not select buffer attributes themselves. */
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
865 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
868 if (pa_atou(e, &ms) < 0 || ms <= 0)
869 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
871 attr->maxlength = (uint32_t) -1;
872 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873 attr->minreq = (uint32_t) -1;
874 attr->prebuf = (uint32_t) -1;
875 attr->fragsize = attr->tlength;
879 *flags |= PA_STREAM_ADJUST_LATENCY;
882 if (s->context->version >= 13)
885 /* Version older than 0.9.10 didn't do server side buffer_attr
886 * selection, hence we have to fake it on the client side. */
888 /* We choose fairly conservative values here, to not confuse
889 * old clients with extremely large playback buffers */
/* Only attributes still marked as unset ((uint32_t) -1) get a default. */
891 if (attr->maxlength == (uint32_t) -1)
892 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
894 if (attr->tlength == (uint32_t) -1)
895 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
897 if (attr->minreq == (uint32_t) -1)
898 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
900 if (attr->prebuf == (uint32_t) -1)
901 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
903 if (attr->fragsize == (uint32_t) -1)
904 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
/* Reply handler for the create-stream request sent by create_stream().
 * userdata is the stream, which must be in the CREATING state.
 *
 * An error reply moves the stream to PA_STREAM_FAILED. A success reply is
 * parsed field by field, with the set of fields depending on the stream
 * direction and the protocol version; any malformed or inconsistent reply
 * fails the context with PA_ERR_PROTOCOL. On success the stream is
 * registered under its channel and creation is completed. */
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908 pa_stream *s = userdata;
909 uint32_t requested_bytes = 0;
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914 pa_assert(s->state == PA_STREAM_CREATING);
918 if (command != PA_COMMAND_REPLY) {
919 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
922 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Upload streams have no stream index in the reply; record streams have no
 * initial request size. */
926 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927 s->channel == PA_INVALID_INDEX ||
928 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
929 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930 pa_context_fail(s->context, PA_ERR_PROTOCOL);
934 s->requested_bytes = (int64_t) requested_bytes;
/* From version 9 on the server reports the buffer metrics it selected. */
936 if (s->context->version >= 9) {
937 if (s->direction == PA_STREAM_PLAYBACK) {
938 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942 pa_context_fail(s->context, PA_ERR_PROTOCOL);
945 } else if (s->direction == PA_STREAM_RECORD) {
946 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* From version 12 on the reply carries the sample spec, channel map, device
 * and suspended state of the stream as created by the server. */
954 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
957 const char *dn = NULL;
960 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963 pa_tagstruct_gets(t, &dn) < 0 ||
964 pa_tagstruct_get_boolean(t, &suspended) < 0) {
965 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* The reported format, rate and channel map may differ from what was
 * requested only if the matching PA_STREAM_FIX_* flag was set. */
969 if (!dn || s->device_index == PA_INVALID_INDEX ||
970 ss.channels != cm.channels ||
971 !pa_channel_map_valid(&cm) ||
972 !pa_sample_spec_valid(&ss) ||
973 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976 pa_context_fail(s->context, PA_ERR_PROTOCOL);
980 pa_xfree(s->device_name);
981 s->device_name = pa_xstrdup(dn);
982 s->suspended = suspended;
/* From version 13 on the configured device latency is reported as well. */
988 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
991 if (pa_tagstruct_get_usec(t, &usec) < 0) {
992 pa_context_fail(s->context, PA_ERR_PROTOCOL);
996 if (s->direction == PA_STREAM_RECORD)
997 s->timing_info.configured_source_usec = usec;
999 s->timing_info.configured_sink_usec = usec;
1002 if (!pa_tagstruct_eof(t)) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* Record streams get a queue for incoming data, sized from the negotiated
 * maxlength. */
1007 if (s->direction == PA_STREAM_RECORD) {
1008 pa_assert(!s->record_memblockq);
1010 s->record_memblockq = pa_memblockq_new(
1012 s->buffer_attr.maxlength,
1014 pa_frame_size(&s->sample_spec),
/* Register the stream under its channel so the command handlers find it. */
1021 s->channel_valid = TRUE;
1022 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1024 create_stream_complete(s);
/* Common implementation of pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the arguments, prepares the stream
 * (direction, sync id, buffer attributes, corked state, smoother), builds
 * the create-stream request for the server's protocol version, sends it
 * and moves the stream to PA_STREAM_CREATING. The reply is handled by
 * pa_create_stream_callback().
 *
 * Validation failures set the error code named in the respective check on
 * the context. */
1030 static int create_stream(
1031 pa_stream_direction_t direction,
1034 const pa_buffer_attr *attr,
1035 pa_stream_flags_t flags,
1036 const pa_cvolume *volume,
1037 pa_stream *sync_stream) {
1041 pa_bool_t volume_set = FALSE;
1044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1047 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
/* Reject any flag bit outside the set known to this client. */
1050 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051 PA_STREAM_INTERPOLATE_TIMING|
1052 PA_STREAM_NOT_MONOTONIC|
1053 PA_STREAM_AUTO_TIMING_UPDATE|
1054 PA_STREAM_NO_REMAP_CHANNELS|
1055 PA_STREAM_NO_REMIX_CHANNELS|
1056 PA_STREAM_FIX_FORMAT|
1058 PA_STREAM_FIX_CHANNELS|
1059 PA_STREAM_DONT_MOVE|
1060 PA_STREAM_VARIABLE_RATE|
1061 PA_STREAM_PEAK_DETECT|
1062 PA_STREAM_START_MUTED|
1063 PA_STREAM_ADJUST_LATENCY|
1064 PA_STREAM_EARLY_REQUESTS|
1065 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066 PA_STREAM_START_UNMUTED|
1067 PA_STREAM_FAIL_ON_SUSPEND|
1068 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1070 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1071 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1072 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1073 /* Although some of the other flags are not supported on older
1074 * versions, we don't check for them here, because it doesn't hurt
1075 * when they are passed but actually not supported. This makes
1076 * client development easier */
/* START_MUTED is playback-only and PEAK_DETECT is record-only; a volume
 * must match the stream's channel count; stream synchronisation is only
 * possible between playback streams; ADJUST_LATENCY and EARLY_REQUESTS are
 * mutually exclusive. */
1078 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1079 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1080 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1081 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1082 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1086 s->direction = direction;
/* Synchronised streams share one sync id. */
1089 s->syncid = sync_stream->syncid;
/* Start from the caller's buffer attributes, then let patch_buffer_attr()
 * apply overrides and defaults. */
1092 s->buffer_attr = *attr;
1093 patch_buffer_attr(s, &s->buffer_attr, &flags);
1096 s->corked = !!(flags & PA_STREAM_START_CORKED);
/* Timing interpolation needs a smoother; it is created here. */
1098 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1101 x = pa_rtclock_now();
1103 pa_assert(!s->smoother);
1104 s->smoother = pa_smoother_new(
1105 SMOOTHER_ADJUST_TIME,
1106 SMOOTHER_HISTORY_TIME,
1107 !(flags & PA_STREAM_NOT_MONOTONIC),
1109 SMOOTHER_MIN_HISTORY,
/* Use the default sink or source from the client configuration as the
 * device. */
1115 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1117 t = pa_tagstruct_command(
1119 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Servers older than version 13 take the stream name as a separate field;
 * newer ones get it as part of the property list sent further below. */
1122 if (s->context->version < 13)
1123 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1127 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1128 PA_TAG_CHANNEL_MAP, &s->channel_map,
1129 PA_TAG_U32, PA_INVALID_INDEX,
1131 PA_TAG_U32, s->buffer_attr.maxlength,
1132 PA_TAG_BOOLEAN, s->corked,
1135 if (s->direction == PA_STREAM_PLAYBACK) {
1140 PA_TAG_U32, s->buffer_attr.tlength,
1141 PA_TAG_U32, s->buffer_attr.prebuf,
1142 PA_TAG_U32, s->buffer_attr.minreq,
1143 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume; the request always
 * carries one. */
1146 volume_set = !!volume;
1149 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1151 pa_tagstruct_put_cvolume(t, volume);
1153 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
/* The fields below are only appended for servers that speak at least the
 * protocol version checked in each condition. */
1155 if (s->context->version >= 12) {
1158 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1159 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1160 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1161 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1162 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1163 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1164 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1168 if (s->context->version >= 13) {
1170 if (s->direction == PA_STREAM_PLAYBACK)
1171 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1173 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1177 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1178 PA_TAG_PROPLIST, s->proplist,
1181 if (s->direction == PA_STREAM_RECORD)
1182 pa_tagstruct_putu32(t, s->direct_on_input);
1185 if (s->context->version >= 14) {
1187 if (s->direction == PA_STREAM_PLAYBACK)
1188 pa_tagstruct_put_boolean(t, volume_set);
1190 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1193 if (s->context->version >= 15) {
/* Tells the server whether the mute state was given explicitly at all. */
1195 if (s->direction == PA_STREAM_PLAYBACK)
1196 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1198 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1199 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1202 if (s->context->version >= 17) {
1204 if (s->direction == PA_STREAM_PLAYBACK)
1205 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
/* Send the request and register the handler for its reply. */
1209 pa_pstream_send_tagstruct(s->context->pstream, t);
1210 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1212 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connects the stream to a sink for playback. Thin wrapper that forwards
 * all arguments to create_stream() with direction PA_STREAM_PLAYBACK and
 * returns its result. */
1218 int pa_stream_connect_playback(
1221 const pa_buffer_attr *attr,
1222 pa_stream_flags_t flags,
1223 const pa_cvolume *volume,
1224 pa_stream *sync_stream) {
1227 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1229 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connects the stream to a source for recording. Thin wrapper that forwards
 * to create_stream() with direction PA_STREAM_RECORD; record streams take
 * neither a volume nor a sync stream, so NULL is passed for both. */
1232 int pa_stream_connect_record(
1235 const pa_buffer_attr *attr,
1236 pa_stream_flags_t flags) {
1239 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1241 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Hands out a library-allocated buffer that the caller can fill and then
 * pass to pa_stream_write().
 *
 * Requires a READY playback or upload stream, a non-NULL data pointer and a
 * non-NULL, non-zero *nbytes. On return *data points into the stream's
 * write memblock and *nbytes holds that memblock's length. A memblock left
 * over from an earlier call is reused instead of allocating a new one. */
1244 int pa_stream_begin_write(
1250 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1252 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1253 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1254 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1255 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1256 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
/* (size_t) -1 is treated specially: only explicit sizes are looked at
 * against the pool's maximum block size and the frame size. */
1258 if (*nbytes != (size_t) -1) {
1261 m = pa_mempool_block_size_max(s->context->mempool);
1262 fs = pa_frame_size(&s->sample_spec);
1269 if (!s->write_memblock) {
1270 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1271 s->write_data = pa_memblock_acquire(s->write_memblock);
1274 *data = s->write_data;
1275 *nbytes = pa_memblock_get_length(s->write_memblock);
/* Gives back the buffer obtained from pa_stream_begin_write() without
 * writing it.
 *
 * Requires a READY playback or upload stream that currently holds a write
 * memblock (PA_ERR_BADSTATE otherwise). The memblock is released and
 * unreferenced and both write pointers are reset to NULL. */
1280 int pa_stream_cancel_write(
1284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1286 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1287 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1288 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1289 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1291 pa_assert(s->write_data);
1293 pa_memblock_release(s->write_memblock);
1294 pa_memblock_unref(s->write_memblock);
1295 s->write_memblock = NULL;
1296 s->write_data = NULL;
/* Queue length bytes starting at data for sending on the stream's channel.
 *
 * Two paths are visible:
 *  - a buffer from pa_stream_begin_write() is outstanding: the stream's
 *    write memblock is sent directly, without copying;
 *  - otherwise the data is sent in one or more memblocks, either wrapping
 *    the caller's memory (free_cb given and the pstream does not use SHM) or
 *    copying it into pool blocks.
 * Afterwards the local bookkeeping (requested_bytes, write index
 * corrections, cached timing info) is updated to reflect the write.
 * NOTE(review): the declarations of s, data, length and offset are not
 * visible in this excerpt. */
1301 int pa_stream_write(
1305 pa_free_cb_t free_cb,
1307 pa_seek_mode_t seek) {
1310 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Preconditions: not forked, stream READY, playback or upload direction and
 * a seek mode no greater than PA_SEEK_RELATIVE_END. */
1313 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1314 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1315 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1316 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
/* Non-playback (upload) streams accept only a relative seek with offset 0. */
1317 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
/* While a begin_write buffer is outstanding, [data, data + length) has to
 * lie inside that buffer. */
1318 PA_CHECK_VALIDITY(s->context,
1319 !s->write_memblock ||
1320 ((data >= s->write_data) &&
1321 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
/* A free callback cannot be combined with an outstanding begin_write
 * buffer. */
1323 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1325 if (s->write_memblock) {
1328 /* pa_stream_begin_write() was called before */
1330 pa_memblock_release(s->write_memblock);
/* The chunk refers to the stream's own block; its index is the distance of
 * data from the start of the mapped buffer. */
1332 chunk.memblock = s->write_memblock;
1333 chunk.index = (const char *) data - (const char *) s->write_data;
1334 chunk.length = length;
/* The stream forgets the block; the reference it held is dropped via
 * chunk.memblock once the chunk has been handed to the pstream. */
1336 s->write_memblock = NULL;
1337 s->write_data = NULL;
1339 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1340 pa_memblock_unref(chunk.memblock);
/* Working copies: they are advanced chunk by chunk in the loop below while
 * the original arguments stay untouched for the bookkeeping further down. */
1343 pa_seek_mode_t t_seek = seek;
1344 int64_t t_offset = offset;
1345 size_t t_length = length;
1346 const void *t_data = data;
1348 /* pa_stream_begin_write() was not called before */
1350 while (t_length > 0) {
/* With a free callback and no SHM transport the caller's memory is wrapped
 * in a user memblock covering all remaining bytes, with free_cb attached. */
1355 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1356 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1357 chunk.length = t_length;
/* Otherwise copy at most one maximum-size pool block per iteration. */
1361 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1362 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1364 d = pa_memblock_acquire(chunk.memblock);
1365 memcpy(d, t_data, chunk.length);
1366 pa_memblock_release(chunk.memblock);
1369 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* Every chunk after the first one is sent with a relative seek.
 * NOTE(review): the matching update of t_offset is not visible here. */
1372 t_seek = PA_SEEK_RELATIVE;
1374 t_data = (const uint8_t*) t_data + chunk.length;
1375 t_length -= chunk.length;
1377 pa_memblock_unref(chunk.memblock);
/* With SHM the data was copied above, so the caller's buffer is handed to
 * free_cb right away. */
1380 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1381 free_cb((void*) data);
1384 /* This is obviously wrong since we ignore the seeking index. But
1385 * that's OK, the server side applies the same error */
1386 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1388 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
/* Only playback streams track a write index. An absolute seek pins the
 * index to offset + length and clears the corrupt flag, a relative seek
 * advances a non-corrupt index by offset + length. The remaining assignment
 * marks the index as corrupt.
 * NOTE(review): the branch keyword guarding that last assignment is not
 * visible; presumably it is the else for all other seek modes. */
1390 if (s->direction == PA_STREAM_PLAYBACK) {
1392 /* Update latency request correction */
1393 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1395 if (seek == PA_SEEK_ABSOLUTE) {
1396 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1397 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1398 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1399 } else if (seek == PA_SEEK_RELATIVE) {
1400 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1401 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1403 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1406 /* Update the write index in the already available latency data */
1407 if (s->timing_info_valid) {
1409 if (seek == PA_SEEK_ABSOLUTE) {
1410 s->timing_info.write_index_corrupt = FALSE;
1411 s->timing_info.write_index = offset + (int64_t) length;
1412 } else if (seek == PA_SEEK_RELATIVE) {
1413 if (!s->timing_info.write_index_corrupt)
1414 s->timing_info.write_index += offset + (int64_t) length;
1416 s->timing_info.write_index_corrupt = TRUE;
/* Without a usable write index, ask for fresh timing data. */
1419 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1420 request_auto_timing_update(s, TRUE);
/* Expose the next fragment of recorded data without removing it from the
 * record queue. On the path shown here *data points into the fragment
 * (peek_data plus the chunk index) and *length is the chunk length. The
 * fragment stays acquired in s->peek_memchunk until pa_stream_drop(). */
1426 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Preconditions: not forked, stream READY, record direction. */
1432 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
/* Only fetch a new chunk when none is held from an earlier peek.
 * NOTE(review): what happens when pa_memblockq_peek() fails is not visible
 * in this excerpt; presumably an empty result is reported - confirm. */
1436 if (!s->peek_memchunk.memblock) {
1438 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1444 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1447 pa_assert(s->peek_data);
1448 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1449 *length = s->peek_memchunk.length;
/* Discard the fragment previously exposed by pa_stream_peek(). Requires a
 * READY record stream that currently holds a peeked chunk (PA_ERR_BADSTATE
 * otherwise). */
1453 int pa_stream_drop(pa_stream *s) {
1455 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1457 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1458 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1459 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1460 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
/* Remove exactly the peeked number of bytes from the record queue. */
1462 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1464 /* Fix the simulated local read index */
1465 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1466 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
/* Undo the acquire done in pa_stream_peek(), drop the chunk's reference and
 * reset the chunk so that the next peek fetches a new one. */
1468 pa_assert(s->peek_data);
1469 pa_memblock_release(s->peek_memchunk.memblock);
1470 pa_memblock_unref(s->peek_memchunk.memblock);
1471 pa_memchunk_reset(&s->peek_memchunk);
/* Return how many bytes are currently requested from the client, i.e.
 * s->requested_bytes clamped to zero from below. Each failed check passes
 * (size_t) -1 as the value to return. Valid for READY streams of any
 * direction except record. */
1476 size_t pa_stream_writable_size(pa_stream *s) {
1478 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1480 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1481 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1482 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
/* requested_bytes is signed and may be negative; never report less than 0. */
1484 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
/* Return the number of bytes currently held in the stream's record queue.
 * Each failed check passes (size_t) -1 as the value to return. Valid for
 * READY record streams only. */
1487 size_t pa_stream_readable_size(pa_stream *s) {
1489 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1491 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1492 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1493 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1495 return pa_memblockq_get_length(s->record_memblockq);
/* Send PA_COMMAND_DRAIN_PLAYBACK_STREAM for the stream's channel. cb and
 * userdata are stored in a new operation object o, and the server's reply is
 * routed to pa_stream_simple_ack_callback(), which receives its own
 * reference to o. Valid for READY playback streams only.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably o is returned to the caller. */
1498 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1504 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1506 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1507 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1508 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1510 /* Ask for a timing update before we cork/uncork to get the best
1511 * accuracy for the transport latency suitable for the
1512 * check_smoother_status() call in the started callback */
1513 request_auto_timing_update(s, TRUE);
1515 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The drain request carries nothing but the channel index. */
1517 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1518 pa_tagstruct_putu32(t, s->channel);
1519 pa_pstream_send_tagstruct(s->context->pstream, t);
1520 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1522 /* This might cause the read index to continue again, hence
1523 * let's request a timing update */
1524 request_auto_timing_update(s, TRUE);
/* Compute the stream time in microseconds from the cached timing info.
 *
 * Playback: starts from the read index converted to time. While the stream
 * is neither corked nor suspended, the transport latency is added (unless
 * ignore_transport is set) and the sink latency is taken off again.
 * Record: starts from the write index converted to time. While the stream
 * is neither corked nor suspended, the transport latency (unless
 * ignore_transport is set) and the source latency are added and the sink
 * latency is taken off.
 * Negative indexes are treated as 0 before conversion.
 *
 * The caller must guarantee what the assertions below state: a READY,
 * non-upload stream with valid timing info whose relevant index is not
 * corrupt.
 * NOTE(review): the statements executed when sink_usec >= usec are not
 * visible in this excerpt; presumably the result is clamped to 0 instead of
 * wrapping around - confirm. The return statement is not visible either. */
1529 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1533 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1534 pa_assert(s->state == PA_STREAM_READY);
1535 pa_assert(s->direction != PA_STREAM_UPLOAD);
1536 pa_assert(s->timing_info_valid);
1537 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1538 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1540 if (s->direction == PA_STREAM_PLAYBACK) {
1541 /* The last byte that was written into the output device
1542 * had this time value associated */
1543 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
/* Latency corrections only apply while the stream is actually running. */
1545 if (!s->corked && !s->suspended) {
1547 if (!ignore_transport)
1548 /* Because the latency info took a little time to come
1549 * to us, we assume that the real output time is actually
1551 usec += s->timing_info.transport_usec;
1553 /* However, the output device usually maintains a buffer
1554 too, hence the real sample currently played is a little
1556 if (s->timing_info.sink_usec >= usec)
1559 usec -= s->timing_info.sink_usec;
1563 pa_assert(s->direction == PA_STREAM_RECORD);
1565 /* The last byte written into the server side queue had
1566 * this time value associated */
1567 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
/* Latency corrections only apply while the stream is actually running. */
1569 if (!s->corked && !s->suspended) {
1571 if (!ignore_transport)
1572 /* Add transport latency */
1573 usec += s->timing_info.transport_usec;
1575 /* Add latency of data in device buffer */
1576 usec += s->timing_info.source_usec;
1578 /* If this is a monitor source, we need to correct the
1579 * time by the playback device buffer */
1580 if (s->timing_info.sink_usec >= usec)
1583 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query sent by
 * pa_stream_update_timing_info(). userdata is the operation object created
 * there.
 *
 * Steps visible below:
 *  1. mark the cached timing info as invalid and both indexes as corrupt;
 *  2. parse the reply into o->stream->timing_info, treating a malformed
 *     reply as a protocol error on the context;
 *  3. derive the transport latency and timestamp from the local and remote
 *     time stamps carried in the reply;
 *  4. playback: apply the write index corrections recorded since the query
 *     was sent; record: subtract the locally queued bytes from the read
 *     index;
 *  5. feed the smoother, if the stream has one;
 *  6. call the latency update callback and the per-operation callback.
 * NOTE(review): the error exits (presumably jumps to the cleanup at the end)
 * and several local declarations are not visible in this excerpt. */
1590 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1591 pa_operation *o = userdata;
1592 struct timeval local, remote, now;
1594 pa_bool_t playing = FALSE;
1595 uint64_t underrun_for = 0, playing_for = 0;
1599 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* The operation may have lost its context or stream in the meantime. */
1601 if (!o->context || !o->stream)
1604 i = &o->stream->timing_info;
/* Be pessimistic until the reply has been parsed completely. */
1606 o->stream->timing_info_valid = FALSE;
1607 i->write_index_corrupt = TRUE;
1608 i->read_index_corrupt = TRUE;
1610 if (command != PA_COMMAND_REPLY) {
1611 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* Fixed part of the reply: sink latency, source latency, playing flag, the
 * two time stamps and both indexes. */
1616 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1617 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1618 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1619 pa_tagstruct_get_timeval(t, &local) < 0 ||
1620 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1621 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1622 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1624 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Two extra counters are read for playback streams when the context
 * version is at least 13. */
1628 if (o->context->version >= 13 &&
1629 o->stream->direction == PA_STREAM_PLAYBACK)
1630 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1631 pa_tagstruct_getu64(t, &playing_for) < 0) {
1633 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Trailing data after the expected fields is a protocol error, too. */
1638 if (!pa_tagstruct_eof(t)) {
1639 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1642 o->stream->timing_info_valid = TRUE;
1643 i->write_index_corrupt = FALSE;
1644 i->read_index_corrupt = FALSE;
1646 i->playing = (int) playing;
1647 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1649 pa_gettimeofday(&now);
1651 /* Calculate timestamps */
1652 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1653 /* local and remote seem to have synchronized clocks */
1655 if (o->stream->direction == PA_STREAM_PLAYBACK)
1656 i->transport_usec = pa_timeval_diff(&remote, &local);
1658 i->transport_usec = pa_timeval_diff(&now, &remote);
1660 i->synchronized_clocks = TRUE;
1661 i->timestamp = remote;
1663 /* clocks are not synchronized, let's estimate latency then */
1664 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1665 i->synchronized_clocks = FALSE;
1666 i->timestamp = local;
1667 pa_timeval_add(&i->timestamp, i->transport_usec);
1670 /* Invalidate read and write indexes if necessary */
1671 if (tag < o->stream->read_index_not_before)
1672 i->read_index_corrupt = TRUE;
1674 if (tag < o->stream->write_index_not_before)
1675 i->write_index_corrupt = TRUE;
1677 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1678 /* Write index correction */
/* ctag is the lowest tag still accepted; it only moves forward, so that
 * entries are applied in tag order, starting with this reply's tag. */
1681 uint32_t ctag = tag;
1683 /* Go through the saved correction values and add up the
1684 * total correction.*/
1685 for (n = 0, j = o->stream->current_write_index_correction+1;
1686 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1687 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1689 /* Step over invalid data or out-of-date data */
1690 if (!o->stream->write_index_corrections[j].valid ||
1691 o->stream->write_index_corrections[j].tag < ctag)
1694 /* Make sure that everything is in order */
1695 ctag = o->stream->write_index_corrections[j].tag+1;
1697 /* Now fix the write index */
1698 if (o->stream->write_index_corrections[j].corrupt) {
1699 /* A corrupting seek was made */
1700 i->write_index_corrupt = TRUE;
1701 } else if (o->stream->write_index_corrections[j].absolute) {
1702 /* An absolute seek was made */
1703 i->write_index = o->stream->write_index_corrections[j].value;
1704 i->write_index_corrupt = FALSE;
1705 } else if (!i->write_index_corrupt) {
1706 /* A relative seek was made */
1707 i->write_index += o->stream->write_index_corrections[j].value;
1711 /* Clear old correction entries */
1712 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1713 if (!o->stream->write_index_corrections[n].valid)
1716 if (o->stream->write_index_corrections[n].tag <= tag)
1717 o->stream->write_index_corrections[n].valid = FALSE;
1721 if (o->stream->direction == PA_STREAM_RECORD) {
1722 /* Read index correction */
/* Bytes still waiting in the local record queue have not been read by the
 * application yet, so they are taken off the read index. */
1724 if (!i->read_index_corrupt)
1725 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1728 /* Update smoother */
1729 if (o->stream->smoother) {
/* u is the time the sample is attributed to: now minus the transport
 * latency. x starts at the same value and may be moved further below. */
1732 u = x = pa_rtclock_now() - i->transport_usec;
1734 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1737 /* If we weren't playing then it will take some time
1738 * until the audio will actually come out through the
1739 * speakers. Since we follow that timing here, we need
1740 * to try to fix this up */
1742 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1744 if (su < i->sink_usec)
1745 x += i->sink_usec - su;
/* NOTE(review): the conditions selecting between the pause and the resume
 * call below are not visible in this excerpt. */
1749 pa_smoother_pause(o->stream->smoother, x);
1751 /* Update the smoother */
1752 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1753 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1754 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1757 pa_smoother_resume(o->stream->smoother, x, TRUE);
/* The pending automatic update, if any, is satisfied by this reply. */
1761 o->stream->auto_timing_update_requested = FALSE;
1763 if (o->stream->latency_update_callback)
1764 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* o->stream is tested again here, after the latency update callback has
 * run; the second argument tells the callback whether valid timing info is
 * available. */
1766 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1767 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1768 cb(o->stream, o->stream->timing_info_valid, o->userdata);
/* Mark the operation as finished and drop the reference that was handed to
 * this callback as userdata. */
1773 pa_operation_done(o);
1774 pa_operation_unref(o);
/* Ask the server for fresh timing information. The command is
 * PA_COMMAND_GET_PLAYBACK_LATENCY or PA_COMMAND_GET_RECORD_LATENCY, chosen
 * by the stream direction, and the reply is handled by
 * stream_get_timing_info_callback(). cb and userdata are stored in the
 * operation object o. Valid for READY streams that are not upload streams.
 *
 * For playback streams a slot in the write_index_corrections ring is
 * reserved first; the query fails with PA_ERR_INTERNAL when the next slot is
 * still in use.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably o is returned to the caller. */
1777 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1785 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1787 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1788 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1789 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1791 if (s->direction == PA_STREAM_PLAYBACK) {
1792 /* Find a place to store the write_index correction data for this entry */
1793 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1795 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1798 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1800 t = pa_tagstruct_command(
1802 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
/* The request carries the channel index and the current local time. */
1804 pa_tagstruct_putu32(t, s->channel);
1805 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1807 pa_pstream_send_tagstruct(s->context->pstream, t);
1808 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1810 if (s->direction == PA_STREAM_PLAYBACK) {
1811 /* Fill in initial correction data */
/* The slot is only claimed now that the request has been sent and its tag
 * is known. It starts as a relative correction of 0 bytes. */
1813 s->current_write_index_correction = cidx;
1815 s->write_index_corrections[cidx].valid = TRUE;
1816 s->write_index_corrections[cidx].absolute = FALSE;
1817 s->write_index_corrections[cidx].corrupt = FALSE;
1818 s->write_index_corrections[cidx].tag = tag;
1819 s->write_index_corrections[cidx].value = 0;
/* Reply handler for the delete-stream command sent by
 * pa_stream_disconnect(); userdata is the stream. An error reply moves the
 * stream to PA_STREAM_FAILED, a reply with unexpected trailing data fails
 * the whole context with PA_ERR_PROTOCOL, and the stream is set to
 * PA_STREAM_TERMINATED at the end of the visible body.
 * NOTE(review): the statements following the failed checks (presumably
 * jumps to a cleanup label) are not visible in this excerpt. */
1825 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1826 pa_stream *s = userdata;
1830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1834 if (command != PA_COMMAND_REPLY) {
1835 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1838 pa_stream_set_state(s, PA_STREAM_FAILED);
1840 } else if (!pa_tagstruct_eof(t)) {
1841 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1845 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Ask the server to delete the stream. The command depends on the stream
 * direction (playback, record or upload) and carries only the channel
 * index; the reply is handled by pa_stream_disconnect_callback() with the
 * stream itself as userdata. Requires a valid channel and a context in
 * state PA_CONTEXT_READY (PA_ERR_BADSTATE otherwise). */
1851 int pa_stream_disconnect(pa_stream *s) {
1856 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1858 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1859 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1860 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1864 t = pa_tagstruct_command(
1866 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1867 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1869 pa_tagstruct_putu32(t, s->channel);
1870 pa_pstream_send_tagstruct(s->context->pstream, t);
1871 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
/* Store cb and userdata as the stream's read callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1877 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1879 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1881 if (pa_detect_fork())
1884 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1887 s->read_callback = cb;
1888 s->read_userdata = userdata;
/* Store cb and userdata as the stream's write callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1891 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1893 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1895 if (pa_detect_fork())
1898 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1901 s->write_callback = cb;
1902 s->write_userdata = userdata;
/* Store cb and userdata as the stream's state change callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1905 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1907 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1909 if (pa_detect_fork())
1912 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1915 s->state_callback = cb;
1916 s->state_userdata = userdata;
/* Store cb and userdata as the stream's overflow callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1919 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1921 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1923 if (pa_detect_fork())
1926 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1929 s->overflow_callback = cb;
1930 s->overflow_userdata = userdata;
/* Store cb and userdata as the stream's underflow callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1933 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1935 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1937 if (pa_detect_fork())
1940 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1943 s->underflow_callback = cb;
1944 s->underflow_userdata = userdata;
/* Store cb and userdata as the stream's latency update callback, the one
 * invoked from stream_get_timing_info_callback() after new timing data has
 * been processed.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1947 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1949 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1951 if (pa_detect_fork())
1954 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1957 s->latency_update_callback = cb;
1958 s->latency_update_userdata = userdata;
/* Store cb and userdata as the stream's moved callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1961 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1963 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1965 if (pa_detect_fork())
1968 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1971 s->moved_callback = cb;
1972 s->moved_userdata = userdata;
/* Store cb and userdata as the stream's suspended callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1975 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1977 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1979 if (pa_detect_fork())
1982 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1985 s->suspended_callback = cb;
1986 s->suspended_userdata = userdata;
/* Store cb and userdata as the stream's started callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
1989 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1991 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1993 if (pa_detect_fork())
1996 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1999 s->started_callback = cb;
2000 s->started_userdata = userdata;
/* Store cb and userdata as the stream's event callback. Unlike the other
 * setters in this group, the callback type is pa_stream_event_cb_t.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
2003 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2007 if (pa_detect_fork())
2010 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2013 s->event_callback = cb;
2014 s->event_userdata = userdata;
/* Store cb and userdata as the stream's buffer attribute callback.
 * NOTE(review): the statements guarded by the two checks below are not
 * visible in this excerpt; presumably both return early, so that nothing is
 * stored after a fork or on a terminated or failed stream - confirm. */
2017 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2019 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2021 if (pa_detect_fork())
2024 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2027 s->buffer_attr_callback = cb;
2028 s->buffer_attr_userdata = userdata;
/* Generic reply handler for commands whose reply carries no payload (it is
 * registered by drain, cork, set_name and stream_send_simple_command() in
 * this file). userdata is the operation object. The operation's callback is
 * invoked with a success flag, then the operation is marked done and the
 * reference handed to this callback is dropped.
 * NOTE(review): the declaration and updates of success, the condition
 * guarding the callback invocation and the error exits are not visible in
 * this excerpt. */
2031 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2032 pa_operation *o = userdata;
2037 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2042 if (command != PA_COMMAND_REPLY) {
2043 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* A reply that carries any data at all is a protocol error. */
2047 } else if (!pa_tagstruct_eof(t)) {
2048 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2053 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2054 cb(o->stream, success, o->userdata);
2058 pa_operation_done(o);
2059 pa_operation_unref(o);
/* Send a cork request for the stream. The command is
 * PA_COMMAND_CORK_PLAYBACK_STREAM or PA_COMMAND_CORK_RECORD_STREAM, chosen
 * by the stream direction; b is normalised to a boolean (!!b) before it is
 * sent. The reply is handled by pa_stream_simple_ack_callback(). A timing
 * update is requested both before and after the command. Valid for READY
 * streams that are not upload streams.
 * NOTE(review): lines between the first timing request and the creation of
 * the operation, and the return statement, are not visible in this
 * excerpt. */
2062 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2068 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2070 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2071 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2072 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2074 /* Ask for a timing update before we cork/uncork to get the best
2075 * accuracy for the transport latency suitable for the
2076 * check_smoother_status() call in the started callback */
2077 request_auto_timing_update(s, TRUE);
2081 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2083 t = pa_tagstruct_command(
2085 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2087 pa_tagstruct_putu32(t, s->channel);
2088 pa_tagstruct_put_boolean(t, !!b);
2089 pa_pstream_send_tagstruct(s->context->pstream, t);
2090 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2092 check_smoother_status(s, FALSE, FALSE, FALSE);
2094 /* This might cause the indexes to hang/start again, hence let's
2095 * request a timing update, after the cork/uncork, too */
2096 request_auto_timing_update(s, TRUE);
/* Helper shared by flush, prebuf and trigger: send `command` with the
 * stream's channel index as its only argument and route the reply to
 * pa_stream_simple_ack_callback(). cb and userdata are stored in the new
 * operation object o. Valid for READY streams only.
 * NOTE(review): the return statement is not visible in this excerpt;
 * the callers in this file test the result against NULL, so presumably o is
 * returned. */
2101 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2107 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2112 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2114 t = pa_tagstruct_command(s->context, command, &tag);
2115 pa_tagstruct_putu32(t, s->channel);
2116 pa_pstream_send_tagstruct(s->context->pstream, t);
2117 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Send a flush request for the stream. The command is
 * PA_COMMAND_FLUSH_PLAYBACK_STREAM or PA_COMMAND_FLUSH_RECORD_STREAM, chosen
 * by the stream direction, and is sent through
 * stream_send_simple_command(). After a successful send the locally cached
 * indexes that the flush may change are invalidated. Valid for READY
 * streams that are not upload streams.
 * NOTE(review): the statement executed when sending fails and the final
 * return are not visible in this excerpt. */
2122 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2128 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2129 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2130 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2132 /* Ask for a timing update *before* the flush, so that the
2133 * transport usec is as up to date as possible when we get the
2134 * underflow message and update the smoother status*/
2135 request_auto_timing_update(s, TRUE);
2137 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2140 if (s->direction == PA_STREAM_PLAYBACK) {
/* A timing query that is still in flight can no longer be corrected
 * reliably, so its correction entry is marked corrupt. */
2142 if (s->write_index_corrections[s->current_write_index_correction].valid)
2143 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
/* The smoother status is only re-checked for streams that use
 * prebuffering. */
2145 if (s->buffer_attr.prebuf > 0)
2146 check_smoother_status(s, FALSE, FALSE, TRUE);
2148 /* This will change the write index, but leave the
2149 * read index untouched. */
2150 invalidate_indexes(s, FALSE, TRUE);
2153 /* For record streams this has no influence on the write
2154 * index, but the read index might jump. */
2155 invalidate_indexes(s, TRUE, FALSE);
2157 /* Note that we do not update requested_bytes here. This is
2158 * because we cannot really know how data actually was dropped
2159 * from the write index due to this. This 'error' will be applied
2160 * by both client and server and hence we should be fine. */
/* Send PA_COMMAND_PREBUF_PLAYBACK_STREAM through
 * stream_send_simple_command(). Only valid for READY playback streams whose
 * buffer attributes have prebuf > 0 (PA_ERR_BADSTATE otherwise). A timing
 * update is requested both before and after the command.
 * NOTE(review): the statement executed when sending fails and the final
 * return are not visible in this excerpt. */
2165 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2169 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2171 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2172 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2173 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2174 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2176 /* Ask for a timing update before we cork/uncork to get the best
2177 * accuracy for the transport latency suitable for the
2178 * check_smoother_status() call in the started callback */
2179 request_auto_timing_update(s, TRUE);
2181 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2184 /* This might cause the read index to hang again, hence
2185 * let's request a timing update */
2186 request_auto_timing_update(s, TRUE);
/* Send PA_COMMAND_TRIGGER_PLAYBACK_STREAM through
 * stream_send_simple_command(). Only valid for READY playback streams whose
 * buffer attributes have prebuf > 0 (PA_ERR_BADSTATE otherwise). A timing
 * update is requested both before and after the command.
 * NOTE(review): the statement executed when sending fails and the final
 * return are not visible in this excerpt. */
2191 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2195 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2202 /* Ask for a timing update before we cork/uncork to get the best
2203 * accuracy for the transport latency suitable for the
2204 * check_smoother_status() call in the started callback */
2205 request_auto_timing_update(s, TRUE);
2207 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2210 /* This might cause the read index to start moving again, hence
2211 * let's request a timing update */
2212 request_auto_timing_update(s, TRUE);
/* Rename the stream. Two protocol variants are visible:
 *  - context version >= 13: the name is set as PA_PROP_MEDIA_NAME in a
 *    temporary property list that is sent via pa_stream_proplist_update()
 *    with PA_UPDATE_REPLACE and freed afterwards;
 *  - otherwise a dedicated set-name command (record or playback variant) is
 *    sent with the channel index and the name, and the reply is handled by
 *    pa_stream_simple_ack_callback().
 * Valid for READY streams that are not upload streams.
 * NOTE(review): the else keyword separating the two variants and the return
 * statement are not visible in this excerpt. */
2217 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2221 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2224 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2225 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2226 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2228 if (s->context->version >= 13) {
2229 pa_proplist *p = pa_proplist_new();
2231 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2232 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2233 pa_proplist_free(p);
2238 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2239 t = pa_tagstruct_command(
2241 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2243 pa_tagstruct_putu32(t, s->channel);
2244 pa_tagstruct_puts(t, name);
2245 pa_pstream_send_tagstruct(s->context->pstream, t);
2246 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Compute the current stream time in microseconds. The value comes either
 * from the smoother (queried at the current rtclock time) or from
 * calc_time() including transport latency. Unless the stream was created
 * with PA_STREAM_NOT_MONOTONIC, the result never goes below the previously
 * computed time.
 *
 * Fails with PA_ERR_NODATA when no valid timing info is cached or when the
 * index relevant for the direction (read index for playback, write index
 * for record) is marked corrupt.
 * NOTE(review): the condition choosing between smoother and calc_time(),
 * and the store into *r_usec, are not visible in this excerpt. */
2252 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2258 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2259 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2260 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2261 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2262 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2263 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2266 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2268 usec = calc_time(s, FALSE);
2270 /* Make sure the time runs monotonically */
2271 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2272 if (usec < s->previous_time)
2273 usec = s->previous_time;
2275 s->previous_time = usec;
/* Helper used by pa_stream_get_latency() to turn two time values a and b
 * into a latency plus an optional sign flag.
 * NOTE(review): almost all of the body is not visible in this excerpt. The
 * only visible logic is a branch that is taken when the caller supplied a
 * negative pointer and the stream is a record stream; presumably a negative
 * difference is only reported for record streams - confirm against the full
 * source. */
2284 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2286 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2294 if (negative && s->direction == PA_STREAM_RECORD) {
/* Compute the stream latency in microseconds. The stream time t from
 * pa_stream_get_time() is compared with the time c that corresponds to the
 * counter index: the write index for playback, the read index for record.
 * The difference is formed by time_counter_diff(), as (c, t) for playback
 * and (t, c) for record, and stored in *r_usec; negative is passed through
 * to that helper.
 *
 * Fails with PA_ERR_NODATA when no valid timing info is cached or when the
 * counter index for the direction is marked corrupt. An error from
 * pa_stream_get_time() is detected by the r < 0 test.
 * NOTE(review): the statement following that test, any clamping of cindex
 * before the conversion and the final return are not visible in this
 * excerpt. */
2302 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2308 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2311 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2312 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2313 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2314 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2315 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2316 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2318 if ((r = pa_stream_get_time(s, &t)) < 0)
2321 if (s->direction == PA_STREAM_PLAYBACK)
2322 cindex = s->timing_info.write_index;
2324 cindex = s->timing_info.read_index;
2329 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2331 if (s->direction == PA_STREAM_PLAYBACK)
2332 *r_usec = time_counter_diff(s, c, t, negative);
2334 *r_usec = time_counter_diff(s, t, c, negative);
/* Return a pointer to the timing info cached inside the stream object (not
 * a copy). Requires a READY stream that is not an upload stream
 * (PA_ERR_BADSTATE) and valid cached timing data (PA_ERR_NODATA). */
2339 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2341 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2346 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2348 return &s->timing_info;
/* Return a pointer to the sample specification stored inside the stream
 * object (not a copy). The only check besides the refcount assertion is the
 * fork guard; the stream state is not examined. */
2351 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2353 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2357 return &s->sample_spec;
/* Return a pointer to the channel map stored in s (s->channel_map itself,
 * not a copy). The only validity check is the fork check (PA_ERR_FORKED);
 * no particular stream state is required. */
2360 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2364 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2366 return &s->channel_map;
/* Return a pointer to the buffer attributes stored in s (s->buffer_attr
 * itself, not a copy).
 * Requirements checked below: the process has not forked (PA_ERR_FORKED),
 * the stream is READY and not an upload stream (PA_ERR_BADSTATE), and the
 * context's protocol version is at least 9 (PA_ERR_NOTSUPPORTED). */
2369 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2371 pa_assert(PA_REFCNT_VALUE(s) >= 1);
     /* Fork check added for consistency with the other stream accessors in
      * this file, all of which refuse to operate after a fork. */
     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2373 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2374 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2375 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2377 return &s->buffer_attr;
/* Reply handler that pa_stream_set_buffer_attr() registers with the
 * pdispatch. 'userdata' is the pa_operation created for the request; 't'
 * is the tagstruct carrying the server's answer.
 *
 * Visible flow:
 *   - anything other than PA_COMMAND_REPLY is passed to
 *     pa_context_handle_error();
 *   - a reply is parsed into o->stream->buffer_attr: maxlength, tlength,
 *     prebuf, minreq for playback streams; maxlength, fragsize for record
 *     streams;
 *   - when the context's version is >= 13 an additional usec value is read
 *     and stored as the configured source (record) or sink (otherwise)
 *     latency in the stream's timing info;
 *   - any parse failure or trailing data fails the context with
 *     PA_ERR_PROTOCOL;
 *   - the user callback is invoked with the stream, the success flag and
 *     the user data, then the operation is marked done and unreferenced.
 *
 * NOTE(review): local declarations, the jumps taken after an error and the
 * guard around the user callback are not visible in this listing. */
2380 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2381 pa_operation *o = userdata;
2386 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2391 if (command != PA_COMMAND_REPLY) {
2392 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2397 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2398 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2399 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2400 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2401 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2402 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2405 } else if (o->stream->direction == PA_STREAM_RECORD) {
2406 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2407 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2408 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* The extra latency field is only read when the context's version is 13 or
 * newer. */
2413 if (o->stream->context->version >= 13) {
2416 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2417 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2421 if (o->stream->direction == PA_STREAM_RECORD)
2422 o->stream->timing_info.configured_source_usec = usec;
2424 o->stream->timing_info.configured_sink_usec = usec;
/* Leftover data after the expected fields is treated as a protocol error. */
2427 if (!pa_tagstruct_eof(t)) {
2428 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2434 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2435 cb(o->stream, success, o->userdata);
2439 pa_operation_done(o);
2440 pa_operation_unref(o);
/* Ask the server to change the buffer attributes of stream s.
 *
 * attr      requested buffer metrics; they are run through
 *           patch_buffer_attr() on the local 'copy' before being sent.
 * cb        success callback stored in the returned operation.
 * userdata  opaque pointer handed to cb.
 *
 * Requirements checked below: no fork (PA_ERR_FORKED), stream READY and not
 * an upload stream (PA_ERR_BADSTATE), context version >= 12
 * (PA_ERR_NOTSUPPORTED).
 *
 * The request uses the record or playback variant of the command depending
 * on the stream direction and carries: channel, maxlength, then either
 * tlength/prebuf/minreq (playback) or fragsize (otherwise), followed by the
 * ADJUST_LATENCY flag for version >= 13 and the EARLY_REQUESTS flag for
 * version >= 14. The reply is handled by stream_set_buffer_attr_callback().
 *
 * NOTE(review): several short lines (local declarations, the copy of *attr
 * into 'copy', the final return) are not visible in this listing. */
2444 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2448 pa_buffer_attr copy;
2451 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2454 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2455 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2456 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2459 /* Ask for a timing update before we cork/uncork to get the best
2460 * accuracy for the transport latency suitable for the
2461 * check_smoother_status() call in the started callback */
2462 request_auto_timing_update(s, TRUE);
2464 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2466 t = pa_tagstruct_command(
2468 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2470 pa_tagstruct_putu32(t, s->channel);
/* Pass the address of the local 'copy'. The listing had a literal copyright
 * sign here, i.e. the text "&copy" swallowed by an HTML entity decoder. */
2473 patch_buffer_attr(s, &copy, NULL);
2476 pa_tagstruct_putu32(t, attr->maxlength);
2478 if (s->direction == PA_STREAM_PLAYBACK)
2481 PA_TAG_U32, attr->tlength,
2482 PA_TAG_U32, attr->prebuf,
2483 PA_TAG_U32, attr->minreq,
2486 pa_tagstruct_putu32(t, attr->fragsize);
2488 if (s->context->version >= 13)
2489 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2491 if (s->context->version >= 14)
2492 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2494 pa_pstream_send_tagstruct(s->context->pstream, t);
2495 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2497 /* This might cause changes in the read/write indexes, hence let's
2498 * request a timing update */
2499 request_auto_timing_update(s, TRUE);
/* Return the device index recorded in s (s->device_index).
 * Every failed check below is given PA_INVALID_INDEX as its return value.
 * Checks: no fork (PA_ERR_FORKED), stream READY and not an upload stream
 * (PA_ERR_BADSTATE), context version >= 12 (PA_ERR_NOTSUPPORTED), and a
 * device index that is actually set, i.e. not PA_INVALID_INDEX
 * (PA_ERR_BADSTATE). */
2504 uint32_t pa_stream_get_device_index(pa_stream *s) {
2506 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2508 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2509 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2510 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2511 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2512 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2514 return s->device_index;
/* Return the device name recorded in s. The returned pointer is
 * s->device_name itself, not a copy.
 * Checks: no fork (PA_ERR_FORKED), stream READY and not an upload stream
 * (PA_ERR_BADSTATE), context version >= 12 (PA_ERR_NOTSUPPORTED), and a
 * non-NULL device name (PA_ERR_BADSTATE). */
2517 const char *pa_stream_get_device_name(pa_stream *s) {
2519 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2522 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2523 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2524 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2527 return s->device_name;
/* Return the 'suspended' flag stored in s.
 * Checks: no fork (PA_ERR_FORKED), stream READY and not an upload stream
 * (PA_ERR_BADSTATE), context version >= 12 (PA_ERR_NOTSUPPORTED).
 * The value a failed check produces is determined by PA_CHECK_VALIDITY,
 * which is defined outside this listing -- presumably a negative error
 * indication; confirm. */
2530 int pa_stream_is_suspended(pa_stream *s) {
2532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2536 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2537 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2539 return s->suspended;
/* Query whether stream s is corked.
 * Checks: no fork (PA_ERR_FORKED), stream READY and not an upload stream
 * (PA_ERR_BADSTATE). Unlike pa_stream_is_suspended() there is no protocol
 * version check.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably it returns the stream's corked flag -- confirm against the
 * complete file. */
2542 int pa_stream_is_corked(pa_stream *s) {
2544 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2546 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2547 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2548 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* Reply handler that pa_stream_update_sample_rate() registers with the
 * pdispatch. 'userdata' is the pa_operation of the request; the requested
 * rate travels in o->private.
 *
 * Visible flow:
 *   - anything other than PA_COMMAND_REPLY is passed to
 *     pa_context_handle_error();
 *   - a reply must carry no payload, otherwise the context is failed with
 *     PA_ERR_PROTOCOL;
 *   - the rate from o->private is written into the stream's sample spec,
 *     which is then asserted to be valid;
 *   - the user callback is invoked with the stream, the success flag and
 *     the user data, then the operation is marked done and unreferenced.
 *
 * NOTE(review): the block structure (closing braces, jumps after errors,
 * the guard around the user callback) is not visible in this listing, so
 * it cannot be told from here whether the rate is stored only on success. */
2553 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2554 pa_operation *o = userdata;
2559 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2564 if (command != PA_COMMAND_REPLY) {
2565 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2571 if (!pa_tagstruct_eof(t)) {
2572 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2577 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2578 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2581 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2582 cb(o->stream, success, o->userdata);
2586 pa_operation_done(o);
2587 pa_operation_unref(o);
/* Ask the server to change the sample rate of stream s to 'rate'.
 *
 * rate      new rate; must be > 0 and <= PA_RATE_MAX (PA_ERR_INVALID).
 * cb        success callback stored in the returned operation.
 * userdata  opaque pointer handed to cb.
 *
 * Further checks: no fork (PA_ERR_FORKED), stream READY, not an upload
 * stream and created with PA_STREAM_VARIABLE_RATE (PA_ERR_BADSTATE),
 * context version >= 12 (PA_ERR_NOTSUPPORTED).
 *
 * The request uses the record or playback variant of the command depending
 * on the stream direction and carries the channel and the new rate. The
 * reply is handled by stream_update_sample_rate_callback().
 *
 * NOTE(review): local declarations and the final return are not visible in
 * this listing. */
2591 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2597 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2602 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2603 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2604 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2606 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Stash the requested rate in the operation; the reply callback reads it
 * back with PA_PTR_TO_UINT(). */
2607 o->private = PA_UINT_TO_PTR(rate);
2609 t = pa_tagstruct_command(
2611 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2613 pa_tagstruct_putu32(t, s->channel);
2614 pa_tagstruct_putu32(t, rate);
2616 pa_pstream_send_tagstruct(s->context->pstream, t);
2617 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Send a property list update for stream s to the server.
 *
 * mode      must be PA_UPDATE_SET, PA_UPDATE_MERGE or PA_UPDATE_REPLACE
 *           (PA_ERR_INVALID otherwise).
 * p         property list written into the request.
 * cb        success callback stored in the returned operation.
 * userdata  opaque pointer handed to cb.
 *
 * Further checks: no fork (PA_ERR_FORKED), stream READY and not an upload
 * stream (PA_ERR_BADSTATE), context version >= 13 (PA_ERR_NOTSUPPORTED).
 *
 * The request uses the record or playback variant of the command depending
 * on the stream direction and carries the channel, the mode and the
 * property list. The reply is handled by pa_stream_simple_ack_callback().
 *
 * NOTE(review): local declarations and the final return are not visible in
 * this listing. */
2622 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2628 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2630 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2631 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2632 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2634 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2636 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2638 t = pa_tagstruct_command(
2640 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2642 pa_tagstruct_putu32(t, s->channel);
2643 pa_tagstruct_putu32(t, (uint32_t) mode);
2644 pa_tagstruct_put_proplist(t, p);
2646 pa_pstream_send_tagstruct(s->context->pstream, t);
2647 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2649 /* Please note that we don't update s->proplist here, because we
2650 * don't export that field */
/* Ask the server to remove the given property keys from stream s.
 *
 * keys      NULL-terminated array of key strings; it must be non-NULL and
 *           contain at least one entry (PA_ERR_INVALID otherwise).
 * cb        success callback stored in the returned operation.
 * userdata  opaque pointer handed to cb.
 *
 * Further checks: no fork (PA_ERR_FORKED), stream READY and not an upload
 * stream (PA_ERR_BADSTATE), context version >= 13 (PA_ERR_NOTSUPPORTED).
 *
 * The request uses the record or playback variant of the command depending
 * on the stream direction and carries the channel followed by each key and
 * a terminating NULL string. The reply is handled by
 * pa_stream_simple_ack_callback().
 *
 * NOTE(review): local declarations and the final return are not visible in
 * this listing. */
2655 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2659 const char * const*k;
2662 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2664 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2666 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2667 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2668 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2670 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2672 t = pa_tagstruct_command(
2674 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2676 pa_tagstruct_putu32(t, s->channel);
/* Write every key in order; the list on the wire is closed by a NULL
 * string. */
2678 for (k = keys; *k; k++)
2679 pa_tagstruct_puts(t, *k);
2681 pa_tagstruct_puts(t, NULL);
2683 pa_pstream_send_tagstruct(s->context->pstream, t);
2684 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2686 /* Please note that we don't update s->proplist here, because we
2687 * don't export that field */
/* Record sink_input_idx in s->direct_on_input.
 * This is only permitted while the stream is still unconnected.
 * Checks: no fork (PA_ERR_FORKED), sink_input_idx is not PA_INVALID_INDEX
 * (PA_ERR_INVALID), stream state is PA_STREAM_UNCONNECTED (PA_ERR_BADSTATE),
 * context version >= 13 (PA_ERR_NOTSUPPORTED).
 * NOTE(review): the return statement is not visible in this listing. */
2692 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2694 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2696 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2697 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2698 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2699 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2701 s->direct_on_input = sink_input_idx;
/* Return the index stored in s->direct_on_input (the value written by
 * pa_stream_set_monitor_stream()).
 * Every failed check below is given PA_INVALID_INDEX as its return value.
 * Checks: no fork (PA_ERR_FORKED), an index has actually been set, i.e.
 * direct_on_input is not PA_INVALID_INDEX (PA_ERR_BADSTATE), context
 * version >= 13 (PA_ERR_NOTSUPPORTED). */
2706 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2708 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2710 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2711 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2712 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2714 return s->direct_on_input;