2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
44 #include "fork-detect.h"
/* Interval bounds for automatic timing updates: the interval starts at 10ms
 * and request_auto_timing_update() doubles it after each timer restart,
 * capped at 1500ms. */
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Arguments handed to pa_smoother_new() in create_stream() when
 * PA_STREAM_INTERPOLATE_TIMING is requested. */
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
/* Convenience constructor: forwards to pa_stream_new_with_proplist() with a
 * NULL property list and returns its result unchanged. */
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Clears every user-installed callback on the stream, together with the
 * userdata pointer that belongs to it. */
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
/* Allocates a new, unconnected stream object on context c.
 *
 * The sample spec must be valid, the channel map (if given) must be valid and
 * have as many channels as the sample spec, and a media name must be supplied
 * either via name or via PA_PROP_MEDIA_NAME in the property list. The stream
 * starts in PA_STREAM_UNCONNECTED with no direction and is prepended to the
 * context's stream list. */
83 pa_stream *pa_stream_new_with_proplist(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Argument checks; each names the condition that must hold and the error
 * code reported when it does not. */
97 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
/* S32 formats need c->version >= 12; S24 and S24-in-32 need c->version >= 15. */
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
/* Derive a default channel map for ss->channels into tmap; fails with
 * PA_ERR_INVALID if no default map can be produced. */
106 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
108 s = pa_xnew(pa_stream, 1);
111 s->mainloop = c->mainloop;
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
117 s->sample_spec = *ss;
118 s->channel_map = *map;
120 s->direct_on_input = PA_INVALID_INDEX;
/* The stream owns its own property list: a copy of p, or a fresh empty one. */
122 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
124 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
127 s->channel_valid = FALSE;
/* Each new stream takes the next sync id from the context's counter. */
128 s->syncid = c->csyncid++;
129 s->stream_index = PA_INVALID_INDEX;
131 s->requested_bytes = 0;
132 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
134 /* We initialize the target length here, so that if the user
135 * passes no explicit buffering metrics the default is similar to
136 * what older PA versions provided. */
138 s->buffer_attr.maxlength = (uint32_t) -1;
139 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140 s->buffer_attr.minreq = (uint32_t) -1;
141 s->buffer_attr.prebuf = (uint32_t) -1;
142 s->buffer_attr.fragsize = (uint32_t) -1;
144 s->device_index = PA_INVALID_INDEX;
145 s->device_name = NULL;
146 s->suspended = FALSE;
149 s->write_memblock = NULL;
150 s->write_data = NULL;
152 pa_memchunk_reset(&s->peek_memchunk);
154 s->record_memblockq = NULL;
156 memset(&s->timing_info, 0, sizeof(s->timing_info));
157 s->timing_info_valid = FALSE;
159 s->previous_time = 0;
161 s->read_index_not_before = 0;
162 s->write_index_not_before = 0;
/* Mark every write index correction slot as unused. */
163 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164 s->write_index_corrections[i].valid = 0;
165 s->current_write_index_correction = 0;
167 s->auto_timing_update_event = NULL;
168 s->auto_timing_update_requested = FALSE;
169 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
175 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Detaches the stream from its context: cancels operations that refer to it,
 * unregisters pending replies, removes it from the per-direction channel
 * hashmap and from the context's stream list, and frees the automatic
 * timing update timer. */
182 static void stream_unlink(pa_stream *s) {
189 /* Detach from context */
191 /* Unref all operation objects that point to us */
192 for (o = s->context->operations; o; o = n) {
196 pa_operation_cancel(o);
199 /* Drop all outstanding replies for this stream */
200 if (s->context->pdispatch)
201 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* Only streams with a server-assigned channel are registered in a hashmap;
 * playback streams live in playback_streams, all others in record_streams. */
203 if (s->channel_valid) {
204 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
206 s->channel_valid = FALSE;
209 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* The timer is released through the mainloop API that the stream recorded. */
214 if (s->auto_timing_update_event) {
215 pa_assert(s->mainloop);
216 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases the resources held by a stream: a pending write block, the peeked
 * record chunk, the record queue, the property list, the smoother and the
 * device name string.
 *
 * Fix: the pending write block is now unreferenced through
 * s->write_memblock. s->write_data is the raw data pointer obtained from
 * pa_memblock_acquire() in pa_stream_begin_write(), not a pa_memblock, so it
 * must not be handed to pa_memblock_unref(). This matches the
 * release-then-unref sequence used by pa_stream_cancel_write(). */
222 static void stream_free(pa_stream *s) {
227 if (s->write_memblock) {
/* Undo the acquire first, then drop our reference on the block itself. */
228 pa_memblock_release(s->write_memblock);
229 pa_memblock_unref(s->write_memblock);
232 if (s->peek_memchunk.memblock) {
234 pa_memblock_release(s->peek_memchunk.memblock);
235 pa_memblock_unref(s->peek_memchunk.memblock);
238 if (s->record_memblockq)
239 pa_memblockq_free(s->record_memblockq);
242 pa_proplist_free(s->proplist);
245 pa_smoother_free(s->smoother);
247 pa_xfree(s->device_name);
/* Drops one reference on the stream; the decremented count is tested for
 * having reached zero (or below). */
251 void pa_stream_unref(pa_stream *s) {
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
255 if (PA_REFCNT_DEC(s) <= 0)
/* Reference-counting counterpart of pa_stream_unref(); requires that the
 * stream still has at least one reference. */
259 pa_stream* pa_stream_ref(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the stream's state (pa_stream_state_t); requires a live
 * (referenced) stream. */
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
269 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the context the stream belongs to; requires a live
 * (referenced) stream. */
274 pa_context* pa_stream_get_context(pa_stream *s) {
276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns the stream's server-side index (s->stream_index).
 * Yields PA_INVALID_INDEX, with PA_ERR_FORKED or PA_ERR_BADSTATE reported on
 * the context, if a fork was detected or the stream is not PA_STREAM_READY. */
281 uint32_t pa_stream_get_index(pa_stream *s) {
283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
288 return s->stream_index;
/* Switches the stream to state st. The user's state callback, if one is
 * installed, is invoked with its userdata; the terminal states
 * PA_STREAM_FAILED and PA_STREAM_TERMINATED are then handled specially. */
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
293 pa_assert(PA_REFCNT_VALUE(s) >= 1);
302 if (s->state_callback)
303 s->state_callback(s, s->state_userdata);
305 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Issues a timing info request on behalf of the automatic timing update
 * feature and re-arms its timer.
 *
 * Does nothing of interest unless the stream was created with
 * PA_STREAM_AUTO_TIMING_UPDATE. A request is sent only while the stream is
 * READY and either force is set or no automatic request is outstanding. */
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
313 pa_assert(PA_REFCNT_VALUE(s) >= 1);
315 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
318 if (s->state == PA_STREAM_READY &&
319 (force || !s->auto_timing_update_requested)) {
322 /* pa_log("Automatically requesting new timing data"); */
/* The operation handle is not kept; only the "requested" flag is remembered. */
324 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325 pa_operation_unref(o);
326 s->auto_timing_update_requested = TRUE;
330 if (s->auto_timing_update_event) {
332 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
/* Re-arm the timer relative to now, then back off: the next interval is
 * twice as long, capped at AUTO_TIMING_INTERVAL_END_USEC. */
334 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
336 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED; userdata is the pa_context.
 *
 * Reads the channel number from the tagstruct (a malformed message fails the
 * whole context with PA_ERR_PROTOCOL), looks the stream up in the hashmap
 * that matches the command's direction, records PA_ERR_KILLED on the context
 * and moves the stream to PA_STREAM_FAILED. */
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341 pa_context *c = userdata;
346 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
349 pa_assert(PA_REFCNT_VALUE(c) >= 1);
353 if (pa_tagstruct_getu32(t, &channel) < 0 ||
354 !pa_tagstruct_eof(t)) {
355 pa_context_fail(c, PA_ERR_PROTOCOL);
359 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
362 if (s->state != PA_STREAM_READY)
365 pa_context_set_error(c, PA_ERR_KILLED);
366 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the stream's time smoother to reflect whether audio is
 * expected to be moving.
 *
 * force_start and force_stop are mutually exclusive. The reference time x
 * starts as pa_rtclock_now() and, when timing info is valid, is shifted by
 * timing_info.transport_usec (subtracted on one path, added on the other).
 * The smoother is paused when the stream is suspended, corked or force_stop
 * is set; otherwise the resume path is considered when force_start is set or
 * the stream uses no prebuffering (prebuf == 0). */
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
376 pa_assert(!force_start || !force_stop);
381 x = pa_rtclock_now();
383 if (s->timing_info_valid) {
385 x -= s->timing_info.transport_usec;
387 x += s->timing_info.transport_usec;
390 if (s->suspended || s->corked || force_stop)
391 pa_smoother_pause(s->smoother, x);
392 else if (force_start || s->buffer_attr.prebuf == 0) {
394 if (!s->timing_info_valid &&
398 s->context->version >= 13) {
400 /* If the server supports STARTED events we take them as
401 * indications when audio really starts/stops playing, if
402 * we don't have any timing info yet -- instead of trying
403 * to be smart and guessing the server time. Otherwise the
404 * unknown transport delay add too much noise to our time
410 pa_smoother_resume(s->smoother, x, TRUE);
413 /* Please note that we have no idea if playback actually started
414 * if prebuf is non-zero! */
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED; userdata is the pa_context.
 *
 * The message carries channel, new device index, device name and suspended
 * flag. From protocol version 13 on it also carries the new buffer metrics
 * and the configured latency. Record messages use maxlength/fragsize,
 * playback messages use maxlength/tlength/prebuf/minreq. Any malformed
 * message, or one arriving from a server older than version 12, fails the
 * context with PA_ERR_PROTOCOL. */
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418 pa_context *c = userdata;
/* Zero defaults, so fields absent from the message are stored as 0 below. */
425 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
428 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
431 pa_assert(PA_REFCNT_VALUE(c) >= 1);
435 if (c->version < 12) {
436 pa_context_fail(c, PA_ERR_PROTOCOL);
440 if (pa_tagstruct_getu32(t, &channel) < 0 ||
441 pa_tagstruct_getu32(t, &di) < 0 ||
442 pa_tagstruct_gets(t, &dn) < 0 ||
443 pa_tagstruct_get_boolean(t, &suspended) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
448 if (c->version >= 13) {
450 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453 pa_tagstruct_get_usec(t, &usec) < 0) {
454 pa_context_fail(c, PA_ERR_PROTOCOL);
458 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459 pa_tagstruct_getu32(t, &tlength) < 0 ||
460 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461 pa_tagstruct_getu32(t, &minreq) < 0 ||
462 pa_tagstruct_get_usec(t, &usec) < 0) {
463 pa_context_fail(c, PA_ERR_PROTOCOL);
469 if (!pa_tagstruct_eof(t)) {
470 pa_context_fail(c, PA_ERR_PROTOCOL);
/* A move must name a real device. */
474 if (!dn || di == PA_INVALID_INDEX) {
475 pa_context_fail(c, PA_ERR_PROTOCOL);
479 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
482 if (s->state != PA_STREAM_READY)
485 if (c->version >= 13) {
486 if (s->direction == PA_STREAM_RECORD)
487 s->timing_info.configured_source_usec = usec;
489 s->timing_info.configured_sink_usec = usec;
491 s->buffer_attr.maxlength = maxlength;
492 s->buffer_attr.fragsize = fragsize;
493 s->buffer_attr.tlength = tlength;
494 s->buffer_attr.prebuf = prebuf;
495 s->buffer_attr.minreq = minreq;
/* Replace the cached device name with a private copy of the new one. */
498 pa_xfree(s->device_name);
499 s->device_name = pa_xstrdup(dn);
500 s->device_index = di;
502 s->suspended = suspended;
/* The suspended state may have changed with the move, so re-evaluate the
 * smoother and force a fresh timing update before notifying the user. */
504 check_smoother_status(s, TRUE, FALSE, FALSE);
505 request_auto_timing_update(s, TRUE);
507 if (s->moved_callback)
508 s->moved_callback(s, s->moved_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED; userdata is the pa_context.
 *
 * Parses the channel followed by the new buffer metrics and configured
 * latency. Record messages carry maxlength/fragsize, playback messages carry
 * maxlength/tlength/prebuf/minreq. It then stores them on the stream, forces
 * a timing update and invokes the user's buffer_attr callback. Malformed
 * messages, or messages from a server older than version 15, fail the
 * context with PA_ERR_PROTOCOL.
 *
 * Fix: the record/playback layout is now selected by comparing against
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED. The previous comparison against
 * PA_COMMAND_RECORD_STREAM_MOVED could never be true here (see the assert on
 * command), so record messages were parsed with the playback layout. */
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515 pa_context *c = userdata;
/* Zero defaults, so fields absent from the message are stored as 0 below. */
519 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
522 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
525 pa_assert(PA_REFCNT_VALUE(c) >= 1);
529 if (c->version < 15) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
534 if (pa_tagstruct_getu32(t, &channel) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
539 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
540 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541 pa_tagstruct_getu32(t, &fragsize) < 0 ||
542 pa_tagstruct_get_usec(t, &usec) < 0) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
547 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548 pa_tagstruct_getu32(t, &tlength) < 0 ||
549 pa_tagstruct_getu32(t, &prebuf) < 0 ||
550 pa_tagstruct_getu32(t, &minreq) < 0 ||
551 pa_tagstruct_get_usec(t, &usec) < 0) {
552 pa_context_fail(c, PA_ERR_PROTOCOL);
557 if (!pa_tagstruct_eof(t)) {
558 pa_context_fail(c, PA_ERR_PROTOCOL);
562 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
565 if (s->state != PA_STREAM_READY)
/* The configured latency belongs to the source for record streams and to
 * the sink otherwise. */
568 if (s->direction == PA_STREAM_RECORD)
569 s->timing_info.configured_source_usec = usec;
571 s->timing_info.configured_sink_usec = usec;
573 s->buffer_attr.maxlength = maxlength;
574 s->buffer_attr.fragsize = fragsize;
575 s->buffer_attr.tlength = tlength;
576 s->buffer_attr.prebuf = prebuf;
577 s->buffer_attr.minreq = minreq;
579 request_auto_timing_update(s, TRUE);
581 if (s->buffer_attr_callback)
582 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED; userdata is the pa_context.
 *
 * Reads channel and suspended flag, records the flag on the matching stream,
 * re-evaluates the smoother, forces a timing update and invokes the user's
 * suspended callback. Malformed messages, or messages from a server older
 * than version 12, fail the context with PA_ERR_PROTOCOL. */
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589 pa_context *c = userdata;
595 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
598 pa_assert(PA_REFCNT_VALUE(c) >= 1);
602 if (c->version < 12) {
603 pa_context_fail(c, PA_ERR_PROTOCOL);
607 if (pa_tagstruct_getu32(t, &channel) < 0 ||
608 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609 !pa_tagstruct_eof(t)) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
614 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
617 if (s->state != PA_STREAM_READY)
620 s->suspended = suspended;
622 check_smoother_status(s, TRUE, FALSE, FALSE);
623 request_auto_timing_update(s, TRUE);
625 if (s->suspended_callback)
626 s->suspended_callback(s, s->suspended_userdata);
/* Dispatcher callback for PA_COMMAND_STARTED; userdata is the pa_context.
 *
 * Only playback streams are looked up. For the matching stream the smoother
 * is re-evaluated with force_start set, a timing update is forced and the
 * user's started callback is invoked. Malformed messages, or messages from a
 * server older than version 13, fail the context with PA_ERR_PROTOCOL. */
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633 pa_context *c = userdata;
638 pa_assert(command == PA_COMMAND_STARTED);
641 pa_assert(PA_REFCNT_VALUE(c) >= 1);
645 if (c->version < 13) {
646 pa_context_fail(c, PA_ERR_PROTOCOL);
650 if (pa_tagstruct_getu32(t, &channel) < 0 ||
651 !pa_tagstruct_eof(t)) {
652 pa_context_fail(c, PA_ERR_PROTOCOL);
656 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
659 if (s->state != PA_STREAM_READY)
662 check_smoother_status(s, TRUE, TRUE, FALSE);
663 request_auto_timing_update(s, TRUE);
665 if (s->started_callback)
666 s->started_callback(s, s->started_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT; userdata is the pa_context.
 *
 * Parses channel, event name and an attached property list, then passes the
 * event name and property list to the stream's event callback. A missing
 * event name, a malformed message, or a message from a server older than
 * version 15 fails the context with PA_ERR_PROTOCOL. */
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
676 pa_proplist *pl = NULL;
680 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
683 pa_assert(PA_REFCNT_VALUE(c) >= 1);
687 if (c->version < 15) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Scratch property list that receives the event's properties; it is freed
 * again at the end of this function. */
692 pl = pa_proplist_new();
694 if (pa_tagstruct_getu32(t, &channel) < 0 ||
695 pa_tagstruct_gets(t, &event) < 0 ||
696 pa_tagstruct_get_proplist(t, pl) < 0 ||
697 !pa_tagstruct_eof(t) || !event) {
698 pa_context_fail(c, PA_ERR_PROTOCOL);
702 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
705 if (s->state != PA_STREAM_READY)
708 if (s->event_callback)
709 s->event_callback(s, event, pl, s->event_userdata);
715 pa_proplist_free(pl);
/* Dispatcher callback for PA_COMMAND_REQUEST; userdata is the pa_context.
 *
 * The server asks for `bytes` more bytes on a playback channel. The amount
 * is added to the stream's requested_bytes counter and, if the counter is
 * positive and a write callback is installed, the callback is invoked with
 * the accumulated total. Malformed messages fail the context with
 * PA_ERR_PROTOCOL. */
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
720 pa_context *c = userdata;
721 uint32_t bytes, channel;
724 pa_assert(command == PA_COMMAND_REQUEST);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
731 if (pa_tagstruct_getu32(t, &channel) < 0 ||
732 pa_tagstruct_getu32(t, &bytes) < 0 ||
733 !pa_tagstruct_eof(t)) {
734 pa_context_fail(c, PA_ERR_PROTOCOL);
738 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741 if (s->state != PA_STREAM_READY)
744 s->requested_bytes += bytes;
746 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
748 if (s->requested_bytes > 0 && s->write_callback)
749 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Dispatcher callback for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW;
 * userdata is the pa_context.
 *
 * Looks up the playback stream for the channel, forces a timing update and
 * invokes the overflow or underflow callback that matches the command.
 * Malformed messages fail the context with PA_ERR_PROTOCOL. */
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
761 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
764 pa_assert(PA_REFCNT_VALUE(c) >= 1);
768 if (pa_tagstruct_getu32(t, &channel) < 0 ||
769 !pa_tagstruct_eof(t)) {
770 pa_context_fail(c, PA_ERR_PROTOCOL);
774 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
777 if (s->state != PA_STREAM_READY)
/* With prebuffering enabled the smoother is re-evaluated with force_stop set. */
780 if (s->buffer_attr.prebuf > 0)
781 check_smoother_status(s, TRUE, FALSE, TRUE);
783 request_auto_timing_update(s, TRUE);
785 if (command == PA_COMMAND_OVERFLOW) {
786 if (s->overflow_callback)
787 s->overflow_callback(s, s->overflow_userdata);
788 } else if (command == PA_COMMAND_UNDERFLOW) {
789 if (s->underflow_callback)
790 s->underflow_callback(s, s->underflow_userdata);
/* Marks the cached read and/or write index of the stream as unreliable.
 *
 * For each index the context's current tag is stored in the matching
 * *_index_not_before field and, if timing info is currently valid, the
 * corresponding *_index_corrupt flag is set. A timing update is then forced.
 * r and w presumably select the read and write index respectively; the
 * guarding conditions are not visible here. */
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
799 pa_assert(PA_REFCNT_VALUE(s) >= 1);
801 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
803 if (s->state != PA_STREAM_READY)
807 s->write_index_not_before = s->context->ctag;
809 if (s->timing_info_valid)
810 s->timing_info.write_index_corrupt = TRUE;
812 /* pa_log("write_index invalidated"); */
816 s->read_index_not_before = s->context->ctag;
818 if (s->timing_info_valid)
819 s->timing_info.read_index_corrupt = TRUE;
821 /* pa_log("read_index invalidated"); */
824 request_auto_timing_update(s, TRUE);
/* Timer callback of the automatic timing update event; userdata is the
 * stream. Requests a timing update without forcing it, so nothing is sent
 * while an earlier automatic request is still outstanding. */
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828 pa_stream *s = userdata;
831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
834 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: moves a CREATING stream to READY, hands any
 * initially requested bytes to the write callback, sets up the automatic
 * timing update timer if requested and syncs the smoother state. */
838 static void create_stream_complete(pa_stream *s) {
840 pa_assert(PA_REFCNT_VALUE(s) >= 1);
841 pa_assert(s->state == PA_STREAM_CREATING);
843 pa_stream_set_state(s, PA_STREAM_READY);
845 if (s->requested_bytes > 0 && s->write_callback)
846 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
848 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
/* Start at the shortest interval; the timer must not exist yet. */
849 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850 pa_assert(!s->auto_timing_update_event);
851 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
853 request_auto_timing_update(s, TRUE);
856 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Adjusts the buffer attributes (and possibly the stream flags) that will be
 * sent to the server.
 *
 * If the environment variable PULSE_LATENCY_MSEC is set, its value is parsed
 * as a number of milliseconds; a value that fails to parse or is not
 * positive is reported with a debug message. The override sets tlength and
 * fragsize to that many milliseconds of audio, leaves the other fields at
 * (uint32_t) -1, and PA_STREAM_ADJUST_LATENCY is OR-ed into *flags.
 *
 * The server version is then tested against 13, and the remaining code
 * replaces every field still set to (uint32_t) -1 with a client-side
 * default. */
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
865 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
868 if (pa_atou(e, &ms) < 0 || ms <= 0)
869 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
871 attr->maxlength = (uint32_t) -1;
872 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873 attr->minreq = (uint32_t) -1;
874 attr->prebuf = (uint32_t) -1;
875 attr->fragsize = attr->tlength;
879 *flags |= PA_STREAM_ADJUST_LATENCY;
882 if (s->context->version >= 13)
885 /* Version older than 0.9.10 didn't do server side buffer_attr
886 * selection, hence we have to fake it on the client side. */
888 /* We choose fairly conservative values here, to not confuse
889 * old clients with extremely large playback buffers */
891 if (attr->maxlength == (uint32_t) -1)
892 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
894 if (attr->tlength == (uint32_t) -1)
895 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
897 if (attr->minreq == (uint32_t) -1)
898 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
900 if (attr->prebuf == (uint32_t) -1)
901 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
903 if (attr->fragsize == (uint32_t) -1)
904 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
/* Reply handler for the create-stream request sent by create_stream();
 * userdata is the stream, which must be in PA_STREAM_CREATING.
 *
 * An error reply moves the stream to PA_STREAM_FAILED. A success reply is
 * parsed field by field, with the set of fields depending on the stream
 * direction and the server's protocol version (>= 9: buffer metrics,
 * >= 12: negotiated sample spec, channel map and device, >= 13: configured
 * latency). Any parse or consistency failure fails the context with
 * PA_ERR_PROTOCOL. On success the stream is registered under its channel
 * and creation is completed. */
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908 pa_stream *s = userdata;
909 uint32_t requested_bytes = 0;
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914 pa_assert(s->state == PA_STREAM_CREATING);
918 if (command != PA_COMMAND_REPLY) {
919 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
922 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Channel first; the stream index is absent for upload streams and the
 * initial request size is absent for record streams. */
926 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927 s->channel == PA_INVALID_INDEX ||
928 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
929 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930 pa_context_fail(s->context, PA_ERR_PROTOCOL);
934 s->requested_bytes = (int64_t) requested_bytes;
/* Buffer metrics actually chosen by the server are read straight into
 * s->buffer_attr. */
936 if (s->context->version >= 9) {
937 if (s->direction == PA_STREAM_PLAYBACK) {
938 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942 pa_context_fail(s->context, PA_ERR_PROTOCOL);
945 } else if (s->direction == PA_STREAM_RECORD) {
946 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948 pa_context_fail(s->context, PA_ERR_PROTOCOL);
954 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
957 const char *dn = NULL;
960 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963 pa_tagstruct_gets(t, &dn) < 0 ||
964 pa_tagstruct_get_boolean(t, &suspended) < 0) {
965 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* The server may only deviate from the requested format, rate or channel
 * map if the matching PA_STREAM_FIX_* flag allowed it. */
969 if (!dn || s->device_index == PA_INVALID_INDEX ||
970 ss.channels != cm.channels ||
971 !pa_channel_map_valid(&cm) ||
972 !pa_sample_spec_valid(&ss) ||
973 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976 pa_context_fail(s->context, PA_ERR_PROTOCOL);
980 pa_xfree(s->device_name);
981 s->device_name = pa_xstrdup(dn);
982 s->suspended = suspended;
988 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
991 if (pa_tagstruct_get_usec(t, &usec) < 0) {
992 pa_context_fail(s->context, PA_ERR_PROTOCOL);
996 if (s->direction == PA_STREAM_RECORD)
997 s->timing_info.configured_source_usec = usec;
999 s->timing_info.configured_sink_usec = usec;
1002 if (!pa_tagstruct_eof(t)) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* Record streams get a local queue for incoming audio. */
1007 if (s->direction == PA_STREAM_RECORD) {
1008 pa_assert(!s->record_memblockq);
1010 s->record_memblockq = pa_memblockq_new(
1012 s->buffer_attr.maxlength,
1014 pa_frame_size(&s->sample_spec),
/* Register the stream under its channel so the command handlers can find it. */
1021 s->channel_valid = TRUE;
1022 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1024 create_stream_complete(s);
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the stream state, the flags and the remaining arguments, stores
 * the connection parameters on the stream, optionally creates the timing
 * smoother, then builds and sends the create-stream request. The request
 * layout depends on the direction and on the server's protocol version
 * (extra fields are appended for versions >= 12, 13, 14, 15, 17 and 18).
 * The reply is routed to pa_create_stream_callback() and the stream enters
 * PA_STREAM_CREATING. */
1030 static int create_stream(
1031 pa_stream_direction_t direction,
1034 const pa_buffer_attr *attr,
1035 pa_stream_flags_t flags,
1036 const pa_cvolume *volume,
1037 pa_stream *sync_stream) {
1041 pa_bool_t volume_set = FALSE;
1044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1047 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
/* Reject any flag bit outside the known set. */
1050 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051 PA_STREAM_INTERPOLATE_TIMING|
1052 PA_STREAM_NOT_MONOTONIC|
1053 PA_STREAM_AUTO_TIMING_UPDATE|
1054 PA_STREAM_NO_REMAP_CHANNELS|
1055 PA_STREAM_NO_REMIX_CHANNELS|
1056 PA_STREAM_FIX_FORMAT|
1058 PA_STREAM_FIX_CHANNELS|
1059 PA_STREAM_DONT_MOVE|
1060 PA_STREAM_VARIABLE_RATE|
1061 PA_STREAM_PEAK_DETECT|
1062 PA_STREAM_START_MUTED|
1063 PA_STREAM_ADJUST_LATENCY|
1064 PA_STREAM_EARLY_REQUESTS|
1065 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066 PA_STREAM_START_UNMUTED|
1067 PA_STREAM_FAIL_ON_SUSPEND|
1068 PA_STREAM_RELATIVE_VOLUME|
1069 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1072 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1073 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1074 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1075 /* Although some of the other flags are not supported on older
1076 * version, we don't check for them here, because it doesn't hurt
1077 * when they are passed but actually not supported. This makes
1078 * client development easier */
/* Direction-specific restrictions: START_MUTED is playback-only, PEAK_DETECT
 * is record-only, sync streams must both be playback streams, and
 * ADJUST_LATENCY and EARLY_REQUESTS may not be combined. */
1080 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1081 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1082 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1083 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1084 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1088 s->direction = direction;
1091 s->syncid = sync_stream->syncid;
1094 s->buffer_attr = *attr;
1095 patch_buffer_attr(s, &s->buffer_attr, &flags);
1098 s->corked = !!(flags & PA_STREAM_START_CORKED);
1100 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1103 x = pa_rtclock_now();
1105 pa_assert(!s->smoother);
1106 s->smoother = pa_smoother_new(
1107 SMOOTHER_ADJUST_TIME,
1108 SMOOTHER_HISTORY_TIME,
1109 !(flags & PA_STREAM_NOT_MONOTONIC),
1111 SMOOTHER_MIN_HISTORY,
/* Device taken from the client configuration: default sink for playback,
 * default source for record. */
1117 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1119 t = pa_tagstruct_command(
1121 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Servers older than version 13 receive the media name as a plain string;
 * newer ones receive the whole property list further below. */
1124 if (s->context->version < 13)
1125 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1129 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1130 PA_TAG_CHANNEL_MAP, &s->channel_map,
1131 PA_TAG_U32, PA_INVALID_INDEX,
1133 PA_TAG_U32, s->buffer_attr.maxlength,
1134 PA_TAG_BOOLEAN, s->corked,
1137 if (s->direction == PA_STREAM_PLAYBACK) {
1142 PA_TAG_U32, s->buffer_attr.tlength,
1143 PA_TAG_U32, s->buffer_attr.prebuf,
1144 PA_TAG_U32, s->buffer_attr.minreq,
1145 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume; pa_cvolume_reset() fills
 * in a reset volume for the stream's channel count. */
1148 volume_set = !!volume;
1151 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1153 pa_tagstruct_put_cvolume(t, volume);
1155 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1157 if (s->context->version >= 12) {
1160 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1161 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1162 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1163 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1164 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1165 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1166 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1170 if (s->context->version >= 13) {
1172 if (s->direction == PA_STREAM_PLAYBACK)
1173 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1175 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1179 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1180 PA_TAG_PROPLIST, s->proplist,
1183 if (s->direction == PA_STREAM_RECORD)
1184 pa_tagstruct_putu32(t, s->direct_on_input);
1187 if (s->context->version >= 14) {
1189 if (s->direction == PA_STREAM_PLAYBACK)
1190 pa_tagstruct_put_boolean(t, volume_set);
1192 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1195 if (s->context->version >= 15) {
/* This boolean is true when either START_MUTED or START_UNMUTED was given. */
1197 if (s->direction == PA_STREAM_PLAYBACK)
1198 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1200 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1201 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1204 if (s->context->version >= 17) {
1206 if (s->direction == PA_STREAM_PLAYBACK)
1207 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1211 if (s->context->version >= 18) {
1213 if (s->direction == PA_STREAM_PLAYBACK)
1214 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
/* Send the request and route its reply (matched by tag) to
 * pa_create_stream_callback(). */
1217 pa_pstream_send_tagstruct(s->context->pstream, t);
1218 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1220 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connects the stream for playback: forwards all arguments to
 * create_stream() with direction PA_STREAM_PLAYBACK and returns its result. */
1226 int pa_stream_connect_playback(
1229 const pa_buffer_attr *attr,
1230 pa_stream_flags_t flags,
1231 const pa_cvolume *volume,
1232 pa_stream *sync_stream) {
1235 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1237 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connects the stream for recording: forwards to create_stream() with
 * direction PA_STREAM_RECORD and neither a volume nor a sync stream, and
 * returns its result. */
1240 int pa_stream_connect_record(
1243 const pa_buffer_attr *attr,
1244 pa_stream_flags_t flags) {
1247 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1249 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Hands the caller a buffer from the context's memory pool to write audio
 * data into.
 *
 * The stream must be READY and be a playback or upload stream; data and
 * nbytes must be non-NULL and *nbytes must be non-zero ((size_t) -1 is
 * treated as a special value). If no write block is pending, one is
 * allocated and its data pointer acquired; a pending block is reused. On
 * return *data points at the block's data and *nbytes holds the block's
 * length. */
1252 int pa_stream_begin_write(
1258 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1260 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1261 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1262 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1263 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1264 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
/* An explicit size request is examined against the pool's maximum block size
 * (m) and the frame size (fs). */
1266 if (*nbytes != (size_t) -1) {
1269 m = pa_mempool_block_size_max(s->context->mempool);
1270 fs = pa_frame_size(&s->sample_spec);
1277 if (!s->write_memblock) {
1278 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
/* write_data stays acquired until pa_stream_write() or
 * pa_stream_cancel_write() releases the block. */
1279 s->write_data = pa_memblock_acquire(s->write_memblock);
1282 *data = s->write_data;
1283 *nbytes = pa_memblock_get_length(s->write_memblock);
/* Abandons a buffer obtained from pa_stream_begin_write() without writing it.
 *
 * The stream must be READY, be a playback or upload stream, and have a
 * pending write block (otherwise PA_ERR_BADSTATE). The block's data mapping
 * is released, the block is unreferenced and both cached pointers are
 * cleared. */
1288 int pa_stream_cancel_write(
1292 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1294 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1295 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1296 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1297 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1299 pa_assert(s->write_data);
1301 pa_memblock_release(s->write_memblock);
1302 pa_memblock_unref(s->write_memblock);
1303 s->write_memblock = NULL;
1304 s->write_data = NULL;
/* Queue audio data for sending to the server.
 *
 * Two paths exist:
 *  - s->write_memblock is set, i.e. a buffer from pa_stream_begin_write()
 *    is pending: data must lie inside that buffer and free_cb must be
 *    NULL. The block is released, wrapped in a chunk that starts at data's
 *    offset within the block, and sent in one piece.
 *  - otherwise the data is sent in a loop. With a free_cb and no SHM on
 *    the pstream the caller's memory is wrapped via pa_memblock_new_user();
 *    else it is copied into pool blocks of at most
 *    pa_mempool_block_size_max() bytes each. With SHM in use, free_cb is
 *    invoked on data by this function itself.
 *
 * Afterwards requested_bytes is reduced and, for playback streams, the
 * locally tracked write index (pending correction slot and cached timing
 * info) is updated to reflect the write. */
1309 int pa_stream_write(
1313 pa_free_cb_t free_cb,
1315 pa_seek_mode_t seek) {
1318 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1321 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1322 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1323 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1324 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
/* Non-playback (upload) streams only accept plain appends: a relative seek
 * with a zero offset. */
1325 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1326 PA_CHECK_VALIDITY(s->context,
1327 !s->write_memblock ||
1328 ((data >= s->write_data) &&
1329 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1331 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1333 if (s->write_memblock) {
1336 /* pa_stream_begin_write() was called before */
1338 pa_memblock_release(s->write_memblock);
1340 chunk.memblock = s->write_memblock;
1341 chunk.index = (const char *) data - (const char *) s->write_data;
1342 chunk.length = length;
1344 s->write_memblock = NULL;
1345 s->write_data = NULL;
1347 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1348 pa_memblock_unref(chunk.memblock);
1351 pa_seek_mode_t t_seek = seek;
1352 int64_t t_offset = offset;
1353 size_t t_length = length;
1354 const void *t_data = data;
1356 /* pa_stream_begin_write() was not called before */
1358 while (t_length > 0) {
1363 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1364 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1365 chunk.length = t_length;
1369 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1370 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1372 d = pa_memblock_acquire(chunk.memblock);
1373 memcpy(d, t_data, chunk.length);
1374 pa_memblock_release(chunk.memblock);
1377 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* Any chunk after the first is sent with PA_SEEK_RELATIVE */
1380 t_seek = PA_SEEK_RELATIVE;
1382 t_data = (const uint8_t*) t_data + chunk.length;
1383 t_length -= chunk.length;
1385 pa_memblock_unref(chunk.memblock);
/* With SHM the data was copied above, so the caller's buffer is handed
 * back through free_cb here. */
1388 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1389 free_cb((void*) data);
1392 /* This is obviously wrong since we ignore the seeking index. But
1393 * that's OK, the server side applies the same error */
1394 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1396 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1398 if (s->direction == PA_STREAM_PLAYBACK) {
1400 /* Update latency request correction */
1401 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1403 if (seek == PA_SEEK_ABSOLUTE) {
1404 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1405 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1406 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1407 } else if (seek == PA_SEEK_RELATIVE) {
1408 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1409 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1411 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1414 /* Update the write index in the already available latency data */
1415 if (s->timing_info_valid) {
1417 if (seek == PA_SEEK_ABSOLUTE) {
1418 s->timing_info.write_index_corrupt = FALSE;
1419 s->timing_info.write_index = offset + (int64_t) length;
1420 } else if (seek == PA_SEEK_RELATIVE) {
1421 if (!s->timing_info.write_index_corrupt)
1422 s->timing_info.write_index += offset + (int64_t) length;
1424 s->timing_info.write_index_corrupt = TRUE;
/* Without a usable write index, ask the server for fresh timing data */
1427 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1428 request_auto_timing_update(s, TRUE);
/* Give the caller a look at the next fragment of recorded data without
 * consuming it.
 *
 * Requires no fork, a READY stream and PA_STREAM_RECORD direction. If no
 * chunk is currently held in s->peek_memchunk, one is fetched from
 * record_memblockq with pa_memblockq_peek() and its memory is acquired.
 * *data then points at the chunk's payload (peek_data plus the chunk index)
 * and *length is the chunk length. The chunk stays held until
 * pa_stream_drop() releases it. */
1434 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1436 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1440 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1441 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1442 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1444 if (!s->peek_memchunk.memblock) {
1446 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1452 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1455 pa_assert(s->peek_data);
1456 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1457 *length = s->peek_memchunk.length;
/* Discard the fragment currently exposed by pa_stream_peek().
 *
 * Requires a held peek chunk (PA_ERR_BADSTATE otherwise). The chunk's
 * length is dropped from record_memblockq, the locally simulated read
 * index is advanced by the same amount when it is valid and not corrupt,
 * and the chunk's memory is released, unreferenced and reset. */
1461 int pa_stream_drop(pa_stream *s) {
1463 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1465 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1466 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1467 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1468 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1470 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1472 /* Fix the simulated local read index */
1473 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1474 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1476 pa_assert(s->peek_data);
1477 pa_memblock_release(s->peek_memchunk.memblock);
1478 pa_memblock_unref(s->peek_memchunk.memblock);
1479 pa_memchunk_reset(&s->peek_memchunk);
/* Number of bytes the server has currently requested from this stream.
 *
 * Returns s->requested_bytes when it is positive and 0 otherwise. The
 * validity checks (no fork, READY state, any direction except RECORD) are
 * given (size_t) -1 as their failure value. */
1484 size_t pa_stream_writable_size(pa_stream *s) {
1486 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1488 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1489 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1490 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1492 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
/* Number of recorded bytes waiting in the stream's local record queue.
 *
 * Returns pa_memblockq_get_length() of record_memblockq. The validity
 * checks (no fork, READY state, RECORD direction) are given (size_t) -1 as
 * their failure value. */
1495 size_t pa_stream_readable_size(pa_stream *s) {
1497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1499 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1500 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1501 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1503 return pa_memblockq_get_length(s->record_memblockq);
/* Ask the server to drain a playback stream.
 *
 * Only valid on a READY playback stream. A new operation carrying
 * cb/userdata is created, PA_COMMAND_DRAIN_PLAYBACK_STREAM is sent for the
 * stream's channel, and pa_stream_simple_ack_callback() is registered for
 * the reply with its own reference on the operation. Timing updates are
 * requested both before and after the command is queued. */
1506 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1512 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1514 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1515 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1516 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1518 /* Ask for a timing update before we send the drain request to get
1519 * the best accuracy for the transport latency suitable for the
1520 * check_smoother_status() call in the started callback */
1521 request_auto_timing_update(s, TRUE);
1523 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1525 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1526 pa_tagstruct_putu32(t, s->channel);
1527 pa_pstream_send_tagstruct(s->context->pstream, t);
1528 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1530 /* This might cause the read index to continue again, hence
1531 * let's request a timing update */
1532 request_auto_timing_update(s, TRUE);
/* Derive the current stream time, in microseconds, from the cached timing
 * info.
 *
 * Preconditions (asserted): stream READY, not an upload stream, timing info
 * valid, and the index relevant for the direction (read index for
 * playback, write index for record) not marked corrupt.
 *
 * Playback: starts from the read index converted to time (negative indexes
 * count as 0). While the stream is neither corked nor suspended the
 * transport latency is added unless ignore_transport is set, and the sink
 * latency is taken off again, with a guard for the case where it is at
 * least as large as the value computed so far.
 *
 * Record: starts from the write index converted to time. While neither
 * corked nor suspended, the transport latency (unless ignored) and the
 * source latency are added, and the sink latency is taken off with the same
 * guard, which matters for monitor sources. */
1537 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1541 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1542 pa_assert(s->state == PA_STREAM_READY);
1543 pa_assert(s->direction != PA_STREAM_UPLOAD);
1544 pa_assert(s->timing_info_valid);
1545 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1546 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1548 if (s->direction == PA_STREAM_PLAYBACK) {
1549 /* The last byte that was written into the output device
1550 * had this time value associated */
1551 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1553 if (!s->corked && !s->suspended) {
1555 if (!ignore_transport)
1556 /* Because the latency info took a little time to come
1557 * to us, we assume that the real output time is actually
1559 usec += s->timing_info.transport_usec;
1561 /* However, the output device usually maintains a buffer
1562 too, hence the real sample currently played is a little
1564 if (s->timing_info.sink_usec >= usec)
1567 usec -= s->timing_info.sink_usec;
1571 pa_assert(s->direction == PA_STREAM_RECORD);
1573 /* The last byte written into the server side queue had
1574 * this time value associated */
1575 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1577 if (!s->corked && !s->suspended) {
1579 if (!ignore_transport)
1580 /* Add transport latency */
1581 usec += s->timing_info.transport_usec;
1583 /* Add latency of data in device buffer */
1584 usec += s->timing_info.source_usec;
1586 /* If this is a monitor source, we need to correct the
1587 * time by the playback device buffer */
1588 if (s->timing_info.sink_usec >= usec)
1591 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query issued by
 * pa_stream_update_timing_info(); userdata is the pending pa_operation.
 *
 * Steps, in order:
 *  1. Mark the cached timing info invalid and both indexes corrupt.
 *  2. Parse the reply (sink/source latency, playing flag, two timestamps,
 *     write and read index, plus two counters for playback streams on
 *     protocol version 13 or later); malformed replies fail the context
 *     with PA_ERR_PROTOCOL.
 *  3. Mark the info valid again and estimate the transport latency from
 *     the timestamps.
 *  4. Re-invalidate indexes whose "not before" tag is newer than this
 *     reply, then apply the recorded write index corrections (playback) or
 *     subtract the locally queued bytes from the read index (record).
 *  5. Feed the smoother unless the stream is corked.
 *  6. Clear auto_timing_update_requested, fire the latency update callback
 *     and the operation's own callback, then finish and unref the
 *     operation. */
1598 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1599 pa_operation *o = userdata;
1600 struct timeval local, remote, now;
1602 pa_bool_t playing = FALSE;
1603 uint64_t underrun_for = 0, playing_for = 0;
1607 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1609 if (!o->context || !o->stream)
1612 i = &o->stream->timing_info;
/* Treat the cached info as unusable until the reply has been parsed */
1614 o->stream->timing_info_valid = FALSE;
1615 i->write_index_corrupt = TRUE;
1616 i->read_index_corrupt = TRUE;
1618 if (command != PA_COMMAND_REPLY) {
1619 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1624 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1625 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1626 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1627 pa_tagstruct_get_timeval(t, &local) < 0 ||
1628 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1629 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1630 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1632 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Playback replies from protocol version 13 on carry two more counters */
1636 if (o->context->version >= 13 &&
1637 o->stream->direction == PA_STREAM_PLAYBACK)
1638 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1639 pa_tagstruct_getu64(t, &playing_for) < 0) {
1641 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1646 if (!pa_tagstruct_eof(t)) {
1647 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1650 o->stream->timing_info_valid = TRUE;
1651 i->write_index_corrupt = FALSE;
1652 i->read_index_corrupt = FALSE;
1654 i->playing = (int) playing;
1655 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1657 pa_gettimeofday(&now);
1659 /* Calculate timestamps */
1660 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1661 /* local and remote seem to have synchronized clocks */
1663 if (o->stream->direction == PA_STREAM_PLAYBACK)
1664 i->transport_usec = pa_timeval_diff(&remote, &local);
1666 i->transport_usec = pa_timeval_diff(&now, &remote);
1668 i->synchronized_clocks = TRUE;
1669 i->timestamp = remote;
1671 /* clocks are not synchronized, let's estimate latency then */
1672 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1673 i->synchronized_clocks = FALSE;
1674 i->timestamp = local;
1675 pa_timeval_add(&i->timestamp, i->transport_usec);
1678 /* Invalidate read and write indexes if necessary */
1679 if (tag < o->stream->read_index_not_before)
1680 i->read_index_corrupt = TRUE;
1682 if (tag < o->stream->write_index_not_before)
1683 i->write_index_corrupt = TRUE;
1685 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1686 /* Write index correction */
1689 uint32_t ctag = tag;
1691 /* Go through the saved correction values and add up the
1692 * total correction.*/
1693 for (n = 0, j = o->stream->current_write_index_correction+1;
1694 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1695 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1697 /* Step over invalid data or out-of-date data */
1698 if (!o->stream->write_index_corrections[j].valid ||
1699 o->stream->write_index_corrections[j].tag < ctag)
1702 /* Make sure that everything is in order */
1703 ctag = o->stream->write_index_corrections[j].tag+1;
1705 /* Now fix the write index */
1706 if (o->stream->write_index_corrections[j].corrupt) {
1707 /* A corrupting seek was made */
1708 i->write_index_corrupt = TRUE;
1709 } else if (o->stream->write_index_corrections[j].absolute) {
1710 /* An absolute seek was made */
1711 i->write_index = o->stream->write_index_corrections[j].value;
1712 i->write_index_corrupt = FALSE;
1713 } else if (!i->write_index_corrupt) {
1714 /* A relative seek was made */
1715 i->write_index += o->stream->write_index_corrections[j].value;
1719 /* Clear old correction entries */
1720 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1721 if (!o->stream->write_index_corrections[n].valid)
1724 if (o->stream->write_index_corrections[n].tag <= tag)
1725 o->stream->write_index_corrections[n].valid = FALSE;
1729 if (o->stream->direction == PA_STREAM_RECORD) {
1730 /* Read index correction */
1732 if (!i->read_index_corrupt)
1733 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1736 /* Update smoother if we're not corked */
1737 if (o->stream->smoother && !o->stream->corked) {
1740 u = x = pa_rtclock_now() - i->transport_usec;
1742 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1745 /* If we weren't playing then it will take some time
1746 * until the audio will actually come out through the
1747 * speakers. Since we follow that timing here, we need
1748 * to try to fix this up */
1750 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1752 if (su < i->sink_usec)
1753 x += i->sink_usec - su;
1757 pa_smoother_pause(o->stream->smoother, x);
1759 /* Update the smoother */
1760 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1761 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1762 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1765 pa_smoother_resume(o->stream->smoother, x, TRUE);
1769 o->stream->auto_timing_update_requested = FALSE;
1771 if (o->stream->latency_update_callback)
1772 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* The operation's callback receives timing_info_valid as its success flag */
1774 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1775 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1776 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1781 pa_operation_done(o);
1782 pa_operation_unref(o);
/* Request fresh timing information from the server.
 *
 * Valid on READY streams that are not upload streams. For playback streams
 * the next slot of the write_index_corrections ring is reserved first; if
 * that slot is still marked valid there are too many outstanding queries
 * and the call fails with PA_ERR_INTERNAL.
 *
 * The request (PA_COMMAND_GET_PLAYBACK_LATENCY or
 * PA_COMMAND_GET_RECORD_LATENCY) carries the channel and the current time
 * of day; the reply is handled by stream_get_timing_info_callback(). After
 * sending, the reserved slot is initialised as valid, relative,
 * non-corrupt, tagged with the request's tag and with a correction of 0. */
1785 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1793 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1795 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1799 if (s->direction == PA_STREAM_PLAYBACK) {
1800 /* Find a place to store the write_index correction data for this entry */
1801 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1803 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1804 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1806 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1808 t = pa_tagstruct_command(
1810 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1812 pa_tagstruct_putu32(t, s->channel);
1813 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1815 pa_pstream_send_tagstruct(s->context->pstream, t);
1816 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1818 if (s->direction == PA_STREAM_PLAYBACK) {
1819 /* Fill in initial correction data */
1821 s->current_write_index_correction = cidx;
1823 s->write_index_corrections[cidx].valid = TRUE;
1824 s->write_index_corrections[cidx].absolute = FALSE;
1825 s->write_index_corrections[cidx].corrupt = FALSE;
1826 s->write_index_corrections[cidx].tag = tag;
1827 s->write_index_corrections[cidx].value = 0;
/* Reply handler for the delete-stream command sent by
 * pa_stream_disconnect(); userdata is the stream itself.
 *
 * An error reply is passed to pa_context_handle_error() and leads to
 * PA_STREAM_FAILED; a reply with unexpected trailing data fails the whole
 * context with PA_ERR_PROTOCOL; PA_STREAM_TERMINATED is the state set on
 * the remaining path. */
1833 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1834 pa_stream *s = userdata;
1838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1842 if (command != PA_COMMAND_REPLY) {
1843 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1846 pa_stream_set_state(s, PA_STREAM_FAILED);
1848 } else if (!pa_tagstruct_eof(t)) {
1849 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1853 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Ask the server to delete this stream.
 *
 * Requires no fork, a valid channel and a READY context. The delete
 * command is chosen by direction (playback, record, or upload for anything
 * else) and sent with the stream's channel; the reply is handled by
 * pa_stream_disconnect_callback(), which is registered with the stream as
 * userdata and no free callback. */
1859 int pa_stream_disconnect(pa_stream *s) {
1864 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1866 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1867 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1868 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1872 t = pa_tagstruct_command(
1874 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1875 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1877 pa_tagstruct_putu32(t, s->channel);
1878 pa_pstream_send_tagstruct(s->context->pstream, t);
1879 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
/* Install cb and userdata as the stream's read callback.
 *
 * The fork check and the TERMINATED/FAILED state check guard the
 * assignment (presumably by returning early - confirm), so a dead stream
 * keeps its cleared callbacks. */
1885 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1887 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1889 if (pa_detect_fork())
1892 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1895 s->read_callback = cb;
1896 s->read_userdata = userdata;
/* Install cb and userdata as the stream's write callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1899 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1901 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1903 if (pa_detect_fork())
1906 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1909 s->write_callback = cb;
1910 s->write_userdata = userdata;
/* Install cb and userdata as the stream's state change callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1913 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1915 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1917 if (pa_detect_fork())
1920 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1923 s->state_callback = cb;
1924 s->state_userdata = userdata;
/* Install cb and userdata as the stream's overflow callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1927 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1929 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1931 if (pa_detect_fork())
1934 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1937 s->overflow_callback = cb;
1938 s->overflow_userdata = userdata;
/* Install cb and userdata as the stream's underflow callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1941 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1943 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1945 if (pa_detect_fork())
1948 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1951 s->underflow_callback = cb;
1952 s->underflow_userdata = userdata;
/* Install cb and userdata as the stream's latency update callback, which
 * stream_get_timing_info_callback() invokes after processing a timing
 * reply.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1955 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1957 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1959 if (pa_detect_fork())
1962 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1965 s->latency_update_callback = cb;
1966 s->latency_update_userdata = userdata;
/* Install cb and userdata as the stream's "moved" callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1969 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1971 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1973 if (pa_detect_fork())
1976 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1979 s->moved_callback = cb;
1980 s->moved_userdata = userdata;
/* Install cb and userdata as the stream's "suspended" callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1983 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1985 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987 if (pa_detect_fork())
1990 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1993 s->suspended_callback = cb;
1994 s->suspended_userdata = userdata;
/* Install cb and userdata as the stream's "started" callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
1997 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1999 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2001 if (pa_detect_fork())
2004 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2007 s->started_callback = cb;
2008 s->started_userdata = userdata;
/* Install cb and userdata as the stream's event callback. Unlike most of
 * the other setters this one takes a pa_stream_event_cb_t.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
2011 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015 if (pa_detect_fork())
2018 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2021 s->event_callback = cb;
2022 s->event_userdata = userdata;
/* Install cb and userdata as the stream's buffer attribute change
 * callback.
 *
 * Guarded by the same fork and TERMINATED/FAILED checks as the other
 * callback setters (presumably returning early - confirm). */
2025 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2029 if (pa_detect_fork())
2032 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2035 s->buffer_attr_callback = cb;
2036 s->buffer_attr_userdata = userdata;
/* Generic reply handler for stream commands whose reply carries no
 * payload; userdata is the pending pa_operation.
 *
 * An error reply goes through pa_context_handle_error(); a reply with
 * trailing data fails the context with PA_ERR_PROTOCOL. The operation's
 * callback is invoked with the stream, a success flag and the operation's
 * userdata, after which the operation is marked done and this handler's
 * reference on it is dropped. */
2039 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2040 pa_operation *o = userdata;
2045 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2050 if (command != PA_COMMAND_REPLY) {
2051 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2055 } else if (!pa_tagstruct_eof(t)) {
2056 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2061 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2062 cb(o->stream, success, o->userdata);
2066 pa_operation_done(o);
2067 pa_operation_unref(o);
/* Pause (b non-zero) or resume (b zero) a stream.
 *
 * Valid on READY streams that are not upload streams. The cork command for
 * the stream's direction is sent with the channel and b normalised to a
 * boolean; the reply is handled by pa_stream_simple_ack_callback(). Timing
 * updates are requested both before and after the command, and the
 * smoother status is re-evaluated in between. */
2070 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2076 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2078 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2079 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2080 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2082 /* Ask for a timing update before we cork/uncork to get the best
2083 * accuracy for the transport latency suitable for the
2084 * check_smoother_status() call in the started callback */
2085 request_auto_timing_update(s, TRUE);
2089 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2091 t = pa_tagstruct_command(
2093 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2095 pa_tagstruct_putu32(t, s->channel);
2096 pa_tagstruct_put_boolean(t, !!b);
2097 pa_pstream_send_tagstruct(s->context->pstream, t);
2098 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2100 check_smoother_status(s, FALSE, FALSE, FALSE);
2102 /* This might cause the indexes to hang/start again, hence let's
2103 * request a timing update, after the cork/uncork, too */
2104 request_auto_timing_update(s, TRUE);
/* Shared helper for commands whose only argument is the stream's channel.
 *
 * After the fork and READY checks it creates an operation for cb/userdata,
 * sends `command` with the channel, and registers
 * pa_stream_simple_ack_callback() for the reply with its own reference on
 * the operation. Used by pa_stream_flush(), pa_stream_prebuf() and
 * pa_stream_trigger(). */
2109 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2115 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2117 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2120 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2122 t = pa_tagstruct_command(s->context, command, &tag);
2123 pa_tagstruct_putu32(t, s->channel);
2124 pa_pstream_send_tagstruct(s->context->pstream, t);
2125 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Flush the stream's buffer on the server.
 *
 * Valid on READY streams that are not upload streams. A timing update is
 * requested first, then the flush command for the stream's direction is
 * sent through stream_send_simple_command().
 *
 * Playback: the pending write index correction (if any) is marked corrupt,
 * the smoother status is re-checked when prebuffering is enabled, and the
 * write index is invalidated. Record: the read index is invalidated
 * instead. requested_bytes is deliberately left untouched. */
2130 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2136 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2137 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2138 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2140 /* Ask for a timing update *before* the flush, so that the
2141 * transport usec is as up to date as possible when we get the
2142 * underflow message and update the smoother status */
2143 request_auto_timing_update(s, TRUE);
2145 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2148 if (s->direction == PA_STREAM_PLAYBACK) {
2150 if (s->write_index_corrections[s->current_write_index_correction].valid)
2151 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2153 if (s->buffer_attr.prebuf > 0)
2154 check_smoother_status(s, FALSE, FALSE, TRUE);
2156 /* This will change the write index, but leave the
2157 * read index untouched. */
2158 invalidate_indexes(s, FALSE, TRUE);
2161 /* For record streams this has no influence on the write
2162 * index, but the read index might jump. */
2163 invalidate_indexes(s, TRUE, FALSE);
2165 /* Note that we do not update requested_bytes here. This is
2166 * because we cannot really know how much data actually was dropped
2167 * from the write index due to this. This 'error' will be applied
2168 * by both client and server and hence we should be fine. */
/* Re-enable prebuffering on a playback stream.
 *
 * Valid only on READY playback streams whose buffer_attr.prebuf is greater
 * than zero. Sends PA_COMMAND_PREBUF_PLAYBACK_STREAM through
 * stream_send_simple_command(), with timing updates requested before and
 * after the command. */
2173 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2177 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2179 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2180 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2181 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2182 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2184 /* Ask for a timing update before we send the prebuf command to get
2185 * the best accuracy for the transport latency suitable for the
2186 * check_smoother_status() call in the started callback */
2187 request_auto_timing_update(s, TRUE);
2189 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2192 /* This might cause the read index to hang again, hence
2193 * let's request a timing update */
2194 request_auto_timing_update(s, TRUE);
/* Ask the server to start playback of a playback stream immediately.
 *
 * Valid only on READY playback streams whose buffer_attr.prebuf is greater
 * than zero. Sends PA_COMMAND_TRIGGER_PLAYBACK_STREAM through
 * stream_send_simple_command(), with timing updates requested before and
 * after the command. */
2199 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2203 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2206 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2207 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2208 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2210 /* Ask for a timing update before we send the trigger command to get
2211 * the best accuracy for the transport latency suitable for the
2212 * check_smoother_status() call in the started callback */
2213 request_auto_timing_update(s, TRUE);
2215 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2218 /* This might cause the read index to start moving again, hence
2219 * let's request a timing update */
2220 request_auto_timing_update(s, TRUE);
/* Rename the stream on the server.
 *
 * Valid on READY streams that are not upload streams. With protocol
 * version 13 or later the name is set by updating the PA_PROP_MEDIA_NAME
 * property through pa_stream_proplist_update() using a temporary proplist.
 * Older servers instead get the dedicated set-name command for the
 * stream's direction, acknowledged via pa_stream_simple_ack_callback(). */
2225 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2229 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2232 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2236 if (s->context->version >= 13) {
2237 pa_proplist *p = pa_proplist_new();
2239 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2240 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2241 pa_proplist_free(p);
2246 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2247 t = pa_tagstruct_command(
2249 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2251 pa_tagstruct_putu32(t, s->channel);
2252 pa_tagstruct_puts(t, name);
2253 pa_pstream_send_tagstruct(s->context->pstream, t);
2254 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Report the current stream time in microseconds.
 *
 * Fails with PA_ERR_NODATA while no valid timing info is cached or while
 * the index relevant for the direction (read index for playback, write
 * index for record) is marked corrupt.
 *
 * The value comes either from the smoother, queried at the current
 * rtclock time, or from calc_time() - presumably depending on whether the
 * stream has a smoother (confirm). Unless PA_STREAM_NOT_MONOTONIC is set
 * the result is clamped so that it never falls below the previously
 * reported time, which is remembered in s->previous_time. */
2260 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2264 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2266 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2267 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2268 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2269 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2270 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2271 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2274 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2276 usec = calc_time(s, FALSE);
2278 /* Make sure the time runs monotonically */
2279 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2280 if (usec < s->previous_time)
2281 usec = s->previous_time;
2283 s->previous_time = usec;
/* Helper used by pa_stream_get_latency() to turn two time values, a and b,
 * into a latency. `negative` may be NULL; the check below only acts on it
 * for record streams. */
2292 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2294 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2302 if (negative && s->direction == PA_STREAM_RECORD) {
/* Report the stream's total latency in microseconds.
 *
 * Fails with PA_ERR_NODATA while no valid timing info is cached or while
 * the index on the client's side of the buffer (write index for playback,
 * read index for record) is marked corrupt; errors from
 * pa_stream_get_time() are checked as well.
 *
 * The latency is the distance between the current stream time t and the
 * time c that corresponds to that index: c relative to t for playback,
 * t relative to c for record, both computed by time_counter_diff(), which
 * also receives the caller's `negative` pointer. */
2310 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2319 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2320 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2321 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2322 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2323 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2324 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2326 if ((r = pa_stream_get_time(s, &t)) < 0)
2329 if (s->direction == PA_STREAM_PLAYBACK)
2330 cindex = s->timing_info.write_index;
2332 cindex = s->timing_info.read_index;
2337 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2339 if (s->direction == PA_STREAM_PLAYBACK)
2340 *r_usec = time_counter_diff(s, c, t, negative);
2342 *r_usec = time_counter_diff(s, t, c, negative);
/* Return a pointer to the stream's cached timing info.
 *
 * The pointer refers to storage inside the stream object. The checks
 * require no fork, a READY stream that is not an upload stream, and valid
 * timing info (PA_ERR_NODATA otherwise). */
2347 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2351 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2356 return &s->timing_info;
/* Return a pointer to the stream's sample spec, which is stored inside the
 * stream object. Only the fork check is applied; no particular stream
 * state is required. */
2359 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365 return &s->sample_spec;
/* Return a pointer to the channel map stored inside the stream object. As
 * with the sample spec accessor, only the fork check is applied. */
2368 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2370 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2372 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2374 return &s->channel_map;
/* Return a pointer to the buffer attributes stored inside the stream object
 * (no copy is made). The validity checks require: no fork detected, stream in
 * READY state, not an upload stream, and a context protocol version of at
 * least 9 (PA_ERR_NOTSUPPORTED otherwise). */
2377 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Same fork guard as every other public accessor in this file: the context
 * must not be used from a forked child. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2385 return &s->buffer_attr;
/* Reply handler registered by pa_stream_set_buffer_attr(). userdata is the
 * pa_operation that was referenced when the reply was registered. The
 * handler copies the buffer metrics carried in the reply into
 * o->stream->buffer_attr, invokes the operation's callback and then finishes
 * and unreferences the operation. Any parse failure is reported through
 * pa_context_fail() with PA_ERR_PROTOCOL. */
2388 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2389 pa_operation *o = userdata;
2394 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than PA_COMMAND_REPLY is handed to
 * pa_context_handle_error(). */
2399 if (command != PA_COMMAND_REPLY) {
2400 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* Playback replies carry maxlength, tlength, prebuf and minreq, read in
 * that order. */
2405 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2406 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2407 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2408 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2409 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2410 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Record replies carry only maxlength and fragsize. */
2413 } else if (o->stream->direction == PA_STREAM_RECORD) {
2414 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2415 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2416 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* From protocol version 13 on the reply additionally carries a usec value.
 * It is stored as configured_source_usec for record streams and as
 * configured_sink_usec in the other case (the "else" line is not part of
 * this excerpt). */
2421 if (o->stream->context->version >= 13) {
2424 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2425 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2429 if (o->stream->direction == PA_STREAM_RECORD)
2430 o->stream->timing_info.configured_source_usec = usec;
2432 o->stream->timing_info.configured_sink_usec = usec;
/* Data left over after the expected fields is a protocol error. */
2435 if (!pa_tagstruct_eof(t)) {
2436 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Report the outcome to the callback stored in the operation; it was stored
 * as a generic pa_operation_cb_t, hence the cast back. */
2442 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2443 cb(o->stream, success, o->userdata);
2447 pa_operation_done(o);
2448 pa_operation_unref(o);
/* Ask the server to change the buffer metrics of a connected stream.
 *
 * s:        stream; must be READY, must not be an upload stream, and the
 *           context protocol version must be at least 12.
 * attr:     requested buffer attributes.
 * cb:       completion callback, stored in the returned operation.
 * userdata: opaque pointer stored alongside cb.
 *
 * The reply is handled by stream_set_buffer_attr_callback(), which is
 * registered with its own reference to the operation. A timing update is
 * requested both before and after the command is sent. */
2452 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2456 pa_buffer_attr copy;
2459 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2462 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2463 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2467 /* Ask for a timing update before we cork/uncork to get the best
2468 * accuracy for the transport latency suitable for the
2469 * check_smoother_status() call in the started callback */
2470 request_auto_timing_update(s, TRUE);
2472 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction. */
2474 t = pa_tagstruct_command(
2476 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2478 pa_tagstruct_putu32(t, s->channel);
/* patch_buffer_attr() works on the local "copy", not on the caller's const
 * attr (copy is presumably initialised from *attr on a line that is not part
 * of this excerpt). */
2481 patch_buffer_attr(s, &copy, NULL);
2484 pa_tagstruct_putu32(t, attr->maxlength);
/* Playback streams send tlength, prebuf and minreq; the fragsize line below
 * belongs to the other branch (its "else" is not part of this excerpt). */
2486 if (s->direction == PA_STREAM_PLAYBACK)
2489 PA_TAG_U32, attr->tlength,
2490 PA_TAG_U32, attr->prebuf,
2491 PA_TAG_U32, attr->minreq,
2494 pa_tagstruct_putu32(t, attr->fragsize);
/* Newer protocol versions also transmit the latency-related stream flags:
 * ADJUST_LATENCY from version 13, EARLY_REQUESTS from version 14. */
2496 if (s->context->version >= 13)
2497 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2499 if (s->context->version >= 14)
2500 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2502 pa_pstream_send_tagstruct(s->context->pstream, t);
2503 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2505 /* This might cause changes in the read/write indexes, hence let's
2506 * request a timing update */
2507 request_auto_timing_update(s, TRUE);
/* Return the device index recorded in the stream object. Every validity
 * check names PA_INVALID_INDEX as its failure value. Required: no fork
 * detected, stream READY, not an upload stream, protocol version >= 12, and
 * a device index that is actually set (PA_ERR_BADSTATE while it still equals
 * PA_INVALID_INDEX). */
2512 uint32_t pa_stream_get_device_index(pa_stream *s) {
2514 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2516 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2517 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2518 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2519 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2520 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2522 return s->device_index;
/* Return the device name recorded in the stream object; the returned pointer
 * refers to s->device_name itself, not to a copy. Required: no fork
 * detected, stream READY, not an upload stream, protocol version >= 12, and
 * a non-NULL device name (PA_ERR_BADSTATE otherwise). */
2525 const char *pa_stream_get_device_name(pa_stream *s) {
2527 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2529 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2530 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2533 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2535 return s->device_name;
/* Return the stream's "suspended" field. Required: no fork detected, stream
 * READY, not an upload stream, protocol version >= 12
 * (PA_ERR_NOTSUPPORTED otherwise). */
2538 int pa_stream_is_suspended(pa_stream *s) {
2540 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2542 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2543 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2544 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2545 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2547 return s->suspended;
/* Query whether the stream is corked. Required: no fork detected, stream
 * READY, not an upload stream. Unlike pa_stream_is_suspended() there is no
 * protocol version check. The return statement is not part of this excerpt;
 * presumably it yields the stream's corked flag -- confirm against the full
 * source. */
2550 int pa_stream_is_corked(pa_stream *s) {
2552 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2554 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2555 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2556 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* Reply handler registered by pa_stream_update_sample_rate(). userdata is the
 * pa_operation that was referenced when the reply was registered; the
 * requested rate travels in o->private. */
2561 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2562 pa_operation *o = userdata;
2567 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than PA_COMMAND_REPLY is handed to
 * pa_context_handle_error(). */
2572 if (command != PA_COMMAND_REPLY) {
2573 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* The reply is expected to be empty; leftover data is a protocol error. */
2579 if (!pa_tagstruct_eof(t)) {
2580 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Adopt the rate that pa_stream_update_sample_rate() stashed in the
 * operation, and verify that the resulting sample spec is still valid. */
2585 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2586 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
/* Report the outcome to the callback stored in the operation; it was stored
 * as a generic pa_operation_cb_t, hence the cast back. */
2589 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2590 cb(o->stream, success, o->userdata);
2594 pa_operation_done(o);
2595 pa_operation_unref(o);
/* Ask the server to change the sample rate of a connected stream.
 *
 * s:        stream; must be READY, must not be an upload stream, must have
 *           been created with PA_STREAM_VARIABLE_RATE, and the context
 *           protocol version must be at least 12.
 * rate:     new rate; must be greater than 0 and at most PA_RATE_MAX
 *           (PA_ERR_INVALID otherwise).
 * cb:       completion callback, stored in the operation.
 * userdata: opaque pointer stored alongside cb.
 *
 * The local sample spec is not touched here; it is updated by
 * stream_update_sample_rate_callback() once the reply arrives. */
2599 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2605 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2607 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2608 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2609 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2610 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2611 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2612 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2614 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Stash the requested rate in the operation so the reply handler can apply
 * it to the stream's sample spec. */
2615 o->private = PA_UINT_TO_PTR(rate);
/* The command code depends on the stream direction. */
2617 t = pa_tagstruct_command(
2619 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2621 pa_tagstruct_putu32(t, s->channel);
2622 pa_tagstruct_putu32(t, rate);
/* The reply registration takes its own reference to the operation and names
 * pa_operation_unref as the matching free callback. */
2624 pa_pstream_send_tagstruct(s->context->pstream, t);
2625 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Send a property list update for a connected stream to the server.
 *
 * s:        stream; must be READY, must not be an upload stream, and the
 *           context protocol version must be at least 13.
 * mode:     must be PA_UPDATE_SET, PA_UPDATE_MERGE or PA_UPDATE_REPLACE
 *           (PA_ERR_INVALID otherwise).
 * p:        property list written into the command.
 * cb:       completion callback, stored in the operation.
 * userdata: opaque pointer stored alongside cb.
 *
 * The reply is handled by pa_stream_simple_ack_callback(). */
2630 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2636 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2638 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2640 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2644 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction; the payload is the
 * channel, the update mode and the property list, in that order. */
2646 t = pa_tagstruct_command(
2648 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2650 pa_tagstruct_putu32(t, s->channel);
2651 pa_tagstruct_putu32(t, (uint32_t) mode);
2652 pa_tagstruct_put_proplist(t, p);
2654 pa_pstream_send_tagstruct(s->context->pstream, t);
2655 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2657 /* Please note that we don't update s->proplist here, because we
2658 * don't export that field */
/* Ask the server to remove the given property keys from a connected stream.
 *
 * s:        stream; must be READY, must not be an upload stream, and the
 *           context protocol version must be at least 13.
 * keys:     NULL-terminated array of key names; must be non-NULL and contain
 *           at least one entry (PA_ERR_INVALID otherwise).
 * cb:       completion callback, stored in the operation.
 * userdata: opaque pointer stored alongside cb.
 *
 * The reply is handled by pa_stream_simple_ack_callback(). */
2663 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2667 const char * const*k;
2670 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2672 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2673 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2674 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2675 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2676 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2678 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction. */
2680 t = pa_tagstruct_command(
2682 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2684 pa_tagstruct_putu32(t, s->channel);
/* Write every key up to the terminating NULL entry of the array, then a
 * NULL string after the last key. */
2686 for (k = keys; *k; k++)
2687 pa_tagstruct_puts(t, *k);
2689 pa_tagstruct_puts(t, NULL);
2691 pa_pstream_send_tagstruct(s->context->pstream, t);
2692 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2694 /* Please note that we don't update s->proplist here, because we
2695 * don't export that field */
/* Record the index of a sink input in s->direct_on_input. This is a purely
 * local setting: nothing is sent to the server here. Required: no fork
 * detected, sink_input_idx different from PA_INVALID_INDEX (PA_ERR_INVALID
 * otherwise), stream still UNCONNECTED, and a context protocol version of at
 * least 13. */
2700 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2702 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2704 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2705 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2706 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2707 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2709 s->direct_on_input = sink_input_idx;
/* Return the sink input index stored by pa_stream_set_monitor_stream().
 * Every validity check names PA_INVALID_INDEX as its failure value.
 * Required: no fork detected, an index that has actually been set
 * (PA_ERR_BADSTATE while it equals PA_INVALID_INDEX), and a context protocol
 * version of at least 13. */
2714 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2718 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2719 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2720 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2722 return s->direct_on_input;