2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
83 static pa_stream *pa_stream_new_with_proplist_internal(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
96 pa_assert((ss == NULL && map == NULL) || formats == NULL);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101 s = pa_xnew(pa_stream, 1);
104 s->mainloop = c->mainloop;
106 s->direction = PA_STREAM_NODIRECTION;
107 s->state = PA_STREAM_UNCONNECTED;
111 s->sample_spec = *ss;
113 s->sample_spec.format = PA_SAMPLE_INVALID;
116 s->channel_map = *map;
118 pa_channel_map_init(&s->channel_map);
122 for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
124 s->req_formats[i] = pa_format_info_copy(formats[i]);
126 /* Make sure the input array was NULL-terminated */
127 pa_assert(formats[i] == NULL);
130 /* We'll get the final negotiated format after connecting */
133 s->direct_on_input = PA_INVALID_INDEX;
135 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
137 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
140 s->channel_valid = FALSE;
141 s->syncid = c->csyncid++;
142 s->stream_index = PA_INVALID_INDEX;
144 s->requested_bytes = 0;
145 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147 /* We initialize der target length here, so that if the user
148 * passes no explicit buffering metrics the default is similar to
149 * what older PA versions provided. */
151 s->buffer_attr.maxlength = (uint32_t) -1;
153 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
155 /* FIXME: We assume a worst-case compressed format corresponding to
156 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157 pa_sample_spec tmp_ss = {
158 .format = PA_SAMPLE_S16NE,
162 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
164 s->buffer_attr.minreq = (uint32_t) -1;
165 s->buffer_attr.prebuf = (uint32_t) -1;
166 s->buffer_attr.fragsize = (uint32_t) -1;
168 s->device_index = PA_INVALID_INDEX;
169 s->device_name = NULL;
170 s->suspended = FALSE;
173 s->write_memblock = NULL;
174 s->write_data = NULL;
176 pa_memchunk_reset(&s->peek_memchunk);
178 s->record_memblockq = NULL;
180 memset(&s->timing_info, 0, sizeof(s->timing_info));
181 s->timing_info_valid = FALSE;
183 s->previous_time = 0;
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
206 pa_stream *pa_stream_new_with_proplist(
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
227 pa_stream *pa_stream_new_extended(
230 pa_format_info * const *formats,
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
238 static void stream_unlink(pa_stream *s) {
245 /* Detach from context */
247 /* Unref all operatio object that point to us */
248 for (o = s->context->operations; o; o = n) {
252 pa_operation_cancel(o);
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel_valid = FALSE;
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
278 static void stream_free(pa_stream *s) {
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
287 pa_memblock_unref(s->write_data);
290 if (s->peek_memchunk.memblock) {
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
300 pa_proplist_free(s->proplist);
303 pa_smoother_free(s->smoother);
305 for (i = 0; i < s->n_formats; i++)
306 pa_xfree(s->req_formats[i]);
308 pa_xfree(s->device_name);
312 void pa_stream_unref(pa_stream *s) {
314 pa_assert(PA_REFCNT_VALUE(s) >= 1);
316 if (PA_REFCNT_DEC(s) <= 0)
320 pa_stream* pa_stream_ref(pa_stream *s) {
322 pa_assert(PA_REFCNT_VALUE(s) >= 1);
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
335 pa_context* pa_stream_get_context(pa_stream *s) {
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
342 uint32_t pa_stream_get_index(pa_stream *s) {
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
346 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
349 return s->stream_index;
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
363 if (s->state_callback)
364 s->state_callback(s, s->state_userdata);
366 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
376 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
379 if (s->state == PA_STREAM_READY &&
380 (force || !s->auto_timing_update_requested)) {
383 /* pa_log("Automatically requesting new timing data"); */
385 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386 pa_operation_unref(o);
387 s->auto_timing_update_requested = TRUE;
391 if (s->auto_timing_update_event) {
393 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
395 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
397 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
401 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
402 pa_context *c = userdata;
407 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
410 pa_assert(PA_REFCNT_VALUE(c) >= 1);
414 if (pa_tagstruct_getu32(t, &channel) < 0 ||
415 !pa_tagstruct_eof(t)) {
416 pa_context_fail(c, PA_ERR_PROTOCOL);
420 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
423 if (s->state != PA_STREAM_READY)
426 pa_context_set_error(c, PA_ERR_KILLED);
427 pa_stream_set_state(s, PA_STREAM_FAILED);
433 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
437 pa_assert(!force_start || !force_stop);
442 x = pa_rtclock_now();
444 if (s->timing_info_valid) {
446 x -= s->timing_info.transport_usec;
448 x += s->timing_info.transport_usec;
451 if (s->suspended || s->corked || force_stop)
452 pa_smoother_pause(s->smoother, x);
453 else if (force_start || s->buffer_attr.prebuf == 0) {
455 if (!s->timing_info_valid &&
459 s->context->version >= 13) {
461 /* If the server supports STARTED events we take them as
462 * indications when audio really starts/stops playing, if
463 * we don't have any timing info yet -- instead of trying
464 * to be smart and guessing the server time. Otherwise the
465 * unknown transport delay add too much noise to our time
471 pa_smoother_resume(s->smoother, x, TRUE);
474 /* Please note that we have no idea if playback actually started
475 * if prebuf is non-zero! */
478 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
479 pa_context *c = userdata;
486 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
489 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
492 pa_assert(PA_REFCNT_VALUE(c) >= 1);
496 if (c->version < 12) {
497 pa_context_fail(c, PA_ERR_PROTOCOL);
501 if (pa_tagstruct_getu32(t, &channel) < 0 ||
502 pa_tagstruct_getu32(t, &di) < 0 ||
503 pa_tagstruct_gets(t, &dn) < 0 ||
504 pa_tagstruct_get_boolean(t, &suspended) < 0) {
505 pa_context_fail(c, PA_ERR_PROTOCOL);
509 if (c->version >= 13) {
511 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
512 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
513 pa_tagstruct_getu32(t, &fragsize) < 0 ||
514 pa_tagstruct_get_usec(t, &usec) < 0) {
515 pa_context_fail(c, PA_ERR_PROTOCOL);
519 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520 pa_tagstruct_getu32(t, &tlength) < 0 ||
521 pa_tagstruct_getu32(t, &prebuf) < 0 ||
522 pa_tagstruct_getu32(t, &minreq) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
530 if (!pa_tagstruct_eof(t)) {
531 pa_context_fail(c, PA_ERR_PROTOCOL);
535 if (!dn || di == PA_INVALID_INDEX) {
536 pa_context_fail(c, PA_ERR_PROTOCOL);
540 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
543 if (s->state != PA_STREAM_READY)
546 if (c->version >= 13) {
547 if (s->direction == PA_STREAM_RECORD)
548 s->timing_info.configured_source_usec = usec;
550 s->timing_info.configured_sink_usec = usec;
552 s->buffer_attr.maxlength = maxlength;
553 s->buffer_attr.fragsize = fragsize;
554 s->buffer_attr.tlength = tlength;
555 s->buffer_attr.prebuf = prebuf;
556 s->buffer_attr.minreq = minreq;
559 pa_xfree(s->device_name);
560 s->device_name = pa_xstrdup(dn);
561 s->device_index = di;
563 s->suspended = suspended;
565 check_smoother_status(s, TRUE, FALSE, FALSE);
566 request_auto_timing_update(s, TRUE);
568 if (s->moved_callback)
569 s->moved_callback(s, s->moved_userdata);
575 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
576 pa_context *c = userdata;
580 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
583 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
586 pa_assert(PA_REFCNT_VALUE(c) >= 1);
590 if (c->version < 15) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
595 if (pa_tagstruct_getu32(t, &channel) < 0) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
600 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
601 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
602 pa_tagstruct_getu32(t, &fragsize) < 0 ||
603 pa_tagstruct_get_usec(t, &usec) < 0) {
604 pa_context_fail(c, PA_ERR_PROTOCOL);
608 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
609 pa_tagstruct_getu32(t, &tlength) < 0 ||
610 pa_tagstruct_getu32(t, &prebuf) < 0 ||
611 pa_tagstruct_getu32(t, &minreq) < 0 ||
612 pa_tagstruct_get_usec(t, &usec) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
618 if (!pa_tagstruct_eof(t)) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
623 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
626 if (s->state != PA_STREAM_READY)
629 if (s->direction == PA_STREAM_RECORD)
630 s->timing_info.configured_source_usec = usec;
632 s->timing_info.configured_sink_usec = usec;
634 s->buffer_attr.maxlength = maxlength;
635 s->buffer_attr.fragsize = fragsize;
636 s->buffer_attr.tlength = tlength;
637 s->buffer_attr.prebuf = prebuf;
638 s->buffer_attr.minreq = minreq;
640 request_auto_timing_update(s, TRUE);
642 if (s->buffer_attr_callback)
643 s->buffer_attr_callback(s, s->buffer_attr_userdata);
649 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650 pa_context *c = userdata;
656 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
659 pa_assert(PA_REFCNT_VALUE(c) >= 1);
663 if (c->version < 12) {
664 pa_context_fail(c, PA_ERR_PROTOCOL);
668 if (pa_tagstruct_getu32(t, &channel) < 0 ||
669 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
670 !pa_tagstruct_eof(t)) {
671 pa_context_fail(c, PA_ERR_PROTOCOL);
675 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
678 if (s->state != PA_STREAM_READY)
681 s->suspended = suspended;
683 check_smoother_status(s, TRUE, FALSE, FALSE);
684 request_auto_timing_update(s, TRUE);
686 if (s->suspended_callback)
687 s->suspended_callback(s, s->suspended_userdata);
693 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694 pa_context *c = userdata;
699 pa_assert(command == PA_COMMAND_STARTED);
702 pa_assert(PA_REFCNT_VALUE(c) >= 1);
706 if (c->version < 13) {
707 pa_context_fail(c, PA_ERR_PROTOCOL);
711 if (pa_tagstruct_getu32(t, &channel) < 0 ||
712 !pa_tagstruct_eof(t)) {
713 pa_context_fail(c, PA_ERR_PROTOCOL);
717 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
720 if (s->state != PA_STREAM_READY)
723 check_smoother_status(s, TRUE, TRUE, FALSE);
724 request_auto_timing_update(s, TRUE);
726 if (s->started_callback)
727 s->started_callback(s, s->started_userdata);
733 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734 pa_context *c = userdata;
737 pa_proplist *pl = NULL;
741 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
744 pa_assert(PA_REFCNT_VALUE(c) >= 1);
748 if (c->version < 15) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
753 pl = pa_proplist_new();
755 if (pa_tagstruct_getu32(t, &channel) < 0 ||
756 pa_tagstruct_gets(t, &event) < 0 ||
757 pa_tagstruct_get_proplist(t, pl) < 0 ||
758 !pa_tagstruct_eof(t) || !event) {
759 pa_context_fail(c, PA_ERR_PROTOCOL);
763 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
766 if (s->state != PA_STREAM_READY)
769 if (s->event_callback)
770 s->event_callback(s, event, pl, s->event_userdata);
776 pa_proplist_free(pl);
779 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
781 pa_context *c = userdata;
782 uint32_t bytes, channel;
785 pa_assert(command == PA_COMMAND_REQUEST);
788 pa_assert(PA_REFCNT_VALUE(c) >= 1);
792 if (pa_tagstruct_getu32(t, &channel) < 0 ||
793 pa_tagstruct_getu32(t, &bytes) < 0 ||
794 !pa_tagstruct_eof(t)) {
795 pa_context_fail(c, PA_ERR_PROTOCOL);
799 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
802 if (s->state != PA_STREAM_READY)
805 s->requested_bytes += bytes;
807 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
809 if (s->requested_bytes > 0 && s->write_callback)
810 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
816 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
818 pa_context *c = userdata;
822 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
825 pa_assert(PA_REFCNT_VALUE(c) >= 1);
829 if (pa_tagstruct_getu32(t, &channel) < 0 ||
830 !pa_tagstruct_eof(t)) {
831 pa_context_fail(c, PA_ERR_PROTOCOL);
835 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
838 if (s->state != PA_STREAM_READY)
841 if (s->buffer_attr.prebuf > 0)
842 check_smoother_status(s, TRUE, FALSE, TRUE);
844 request_auto_timing_update(s, TRUE);
846 if (command == PA_COMMAND_OVERFLOW) {
847 if (s->overflow_callback)
848 s->overflow_callback(s, s->overflow_userdata);
849 } else if (command == PA_COMMAND_UNDERFLOW) {
850 if (s->underflow_callback)
851 s->underflow_callback(s, s->underflow_userdata);
858 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
860 pa_assert(PA_REFCNT_VALUE(s) >= 1);
862 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
864 if (s->state != PA_STREAM_READY)
868 s->write_index_not_before = s->context->ctag;
870 if (s->timing_info_valid)
871 s->timing_info.write_index_corrupt = TRUE;
873 /* pa_log("write_index invalidated"); */
877 s->read_index_not_before = s->context->ctag;
879 if (s->timing_info_valid)
880 s->timing_info.read_index_corrupt = TRUE;
882 /* pa_log("read_index invalidated"); */
885 request_auto_timing_update(s, TRUE);
888 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
889 pa_stream *s = userdata;
892 pa_assert(PA_REFCNT_VALUE(s) >= 1);
895 request_auto_timing_update(s, FALSE);
899 static void create_stream_complete(pa_stream *s) {
901 pa_assert(PA_REFCNT_VALUE(s) >= 1);
902 pa_assert(s->state == PA_STREAM_CREATING);
904 pa_stream_set_state(s, PA_STREAM_READY);
906 if (s->requested_bytes > 0 && s->write_callback)
907 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
909 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
910 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
911 pa_assert(!s->auto_timing_update_event);
912 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
914 request_auto_timing_update(s, TRUE);
917 check_smoother_status(s, TRUE, FALSE, FALSE);
920 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
926 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
929 if (pa_atou(e, &ms) < 0 || ms <= 0)
930 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
932 attr->maxlength = (uint32_t) -1;
933 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
934 attr->minreq = (uint32_t) -1;
935 attr->prebuf = (uint32_t) -1;
936 attr->fragsize = attr->tlength;
940 *flags |= PA_STREAM_ADJUST_LATENCY;
943 if (s->context->version >= 13)
946 /* Version older than 0.9.10 didn't do server side buffer_attr
947 * selection, hence we have to fake it on the client side. */
949 /* We choose fairly conservative values here, to not confuse
950 * old clients with extremely large playback buffers */
952 if (attr->maxlength == (uint32_t) -1)
953 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
955 if (attr->tlength == (uint32_t) -1)
956 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
958 if (attr->minreq == (uint32_t) -1)
959 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
961 if (attr->prebuf == (uint32_t) -1)
962 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
964 if (attr->fragsize == (uint32_t) -1)
965 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
968 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
969 pa_stream *s = userdata;
970 uint32_t requested_bytes = 0;
974 pa_assert(PA_REFCNT_VALUE(s) >= 1);
975 pa_assert(s->state == PA_STREAM_CREATING);
979 if (command != PA_COMMAND_REPLY) {
980 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
983 pa_stream_set_state(s, PA_STREAM_FAILED);
987 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
988 s->channel == PA_INVALID_INDEX ||
989 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
990 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
991 pa_context_fail(s->context, PA_ERR_PROTOCOL);
995 s->requested_bytes = (int64_t) requested_bytes;
997 if (s->context->version >= 9) {
998 if (s->direction == PA_STREAM_PLAYBACK) {
999 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1000 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1001 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1002 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1006 } else if (s->direction == PA_STREAM_RECORD) {
1007 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1008 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1009 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1015 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1018 const char *dn = NULL;
1019 pa_bool_t suspended;
1021 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1022 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1023 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1024 pa_tagstruct_gets(t, &dn) < 0 ||
1025 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1026 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1030 if (!dn || s->device_index == PA_INVALID_INDEX ||
1031 ss.channels != cm.channels ||
1032 !pa_channel_map_valid(&cm) ||
1033 !pa_sample_spec_valid(&ss) ||
1034 (s->n_formats == 0 && (
1035 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1036 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1037 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1038 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1042 pa_xfree(s->device_name);
1043 s->device_name = pa_xstrdup(dn);
1044 s->suspended = suspended;
1046 s->channel_map = cm;
1047 s->sample_spec = ss;
1050 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1053 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1054 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1058 if (s->direction == PA_STREAM_RECORD)
1059 s->timing_info.configured_source_usec = usec;
1061 s->timing_info.configured_sink_usec = usec;
1064 if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1065 pa_format_info *f = pa_format_info_new();
1066 pa_tagstruct_get_format_info(t, f);
1068 if (pa_format_info_valid(f))
1071 pa_format_info_free(f);
1072 if (s->n_formats > 0) {
1073 /* We used the extended API, so we should have got back a proper format */
1074 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1080 if (!pa_tagstruct_eof(t)) {
1081 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1085 if (s->direction == PA_STREAM_RECORD) {
1086 pa_assert(!s->record_memblockq);
1088 s->record_memblockq = pa_memblockq_new(
1090 s->buffer_attr.maxlength,
1092 pa_frame_size(&s->sample_spec),
1099 s->channel_valid = TRUE;
1100 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1102 create_stream_complete(s);
1108 static int create_stream(
1109 pa_stream_direction_t direction,
1112 const pa_buffer_attr *attr,
1113 pa_stream_flags_t flags,
1114 const pa_cvolume *volume,
1115 pa_stream *sync_stream) {
1119 pa_bool_t volume_set = FALSE;
1123 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1124 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1126 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1127 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1128 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1129 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1130 PA_STREAM_INTERPOLATE_TIMING|
1131 PA_STREAM_NOT_MONOTONIC|
1132 PA_STREAM_AUTO_TIMING_UPDATE|
1133 PA_STREAM_NO_REMAP_CHANNELS|
1134 PA_STREAM_NO_REMIX_CHANNELS|
1135 PA_STREAM_FIX_FORMAT|
1137 PA_STREAM_FIX_CHANNELS|
1138 PA_STREAM_DONT_MOVE|
1139 PA_STREAM_VARIABLE_RATE|
1140 PA_STREAM_PEAK_DETECT|
1141 PA_STREAM_START_MUTED|
1142 PA_STREAM_ADJUST_LATENCY|
1143 PA_STREAM_EARLY_REQUESTS|
1144 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1145 PA_STREAM_START_UNMUTED|
1146 PA_STREAM_FAIL_ON_SUSPEND|
1147 PA_STREAM_RELATIVE_VOLUME|
1148 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1151 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1152 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1153 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1154 /* Althought some of the other flags are not supported on older
1155 * version, we don't check for them here, because it doesn't hurt
1156 * when they are passed but actually not supported. This makes
1157 * client development easier */
1159 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1160 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1161 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1162 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1163 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1167 s->direction = direction;
1170 s->syncid = sync_stream->syncid;
1173 s->buffer_attr = *attr;
1174 patch_buffer_attr(s, &s->buffer_attr, &flags);
1177 s->corked = !!(flags & PA_STREAM_START_CORKED);
1179 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1182 x = pa_rtclock_now();
1184 pa_assert(!s->smoother);
1185 s->smoother = pa_smoother_new(
1186 SMOOTHER_ADJUST_TIME,
1187 SMOOTHER_HISTORY_TIME,
1188 !(flags & PA_STREAM_NOT_MONOTONIC),
1190 SMOOTHER_MIN_HISTORY,
1196 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1198 t = pa_tagstruct_command(
1200 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1203 if (s->context->version < 13)
1204 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1208 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1209 PA_TAG_CHANNEL_MAP, &s->channel_map,
1210 PA_TAG_U32, PA_INVALID_INDEX,
1212 PA_TAG_U32, s->buffer_attr.maxlength,
1213 PA_TAG_BOOLEAN, s->corked,
1216 if (s->direction == PA_STREAM_PLAYBACK) {
1221 PA_TAG_U32, s->buffer_attr.tlength,
1222 PA_TAG_U32, s->buffer_attr.prebuf,
1223 PA_TAG_U32, s->buffer_attr.minreq,
1224 PA_TAG_U32, s->syncid,
1227 volume_set = !!volume;
1230 if (pa_sample_spec_valid(&s->sample_spec))
1231 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1233 /* This is not really relevant, since no volume was set, and
1234 * the real number of channels is embedded in the format_info
1236 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1240 pa_tagstruct_put_cvolume(t, volume);
1242 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1244 if (s->context->version >= 12) {
1247 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1248 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1249 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1250 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1251 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1252 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1253 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1257 if (s->context->version >= 13) {
1259 if (s->direction == PA_STREAM_PLAYBACK)
1260 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1262 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1266 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1267 PA_TAG_PROPLIST, s->proplist,
1270 if (s->direction == PA_STREAM_RECORD)
1271 pa_tagstruct_putu32(t, s->direct_on_input);
1274 if (s->context->version >= 14) {
1276 if (s->direction == PA_STREAM_PLAYBACK)
1277 pa_tagstruct_put_boolean(t, volume_set);
1279 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1282 if (s->context->version >= 15) {
1284 if (s->direction == PA_STREAM_PLAYBACK)
1285 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1287 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1288 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1291 if (s->context->version >= 17) {
1293 if (s->direction == PA_STREAM_PLAYBACK)
1294 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1298 if (s->context->version >= 18) {
1300 if (s->direction == PA_STREAM_PLAYBACK)
1301 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1304 if (s->context->version >= 21) {
1306 if (s->direction == PA_STREAM_PLAYBACK) {
1307 pa_tagstruct_putu8(t, s->n_formats);
1308 for (i = 0; i < s->n_formats; i++)
1309 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1313 pa_pstream_send_tagstruct(s->context->pstream, t);
1314 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1316 pa_stream_set_state(s, PA_STREAM_CREATING);
1322 int pa_stream_connect_playback(
1325 const pa_buffer_attr *attr,
1326 pa_stream_flags_t flags,
1327 const pa_cvolume *volume,
1328 pa_stream *sync_stream) {
1331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1333 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1336 int pa_stream_connect_record(
1339 const pa_buffer_attr *attr,
1340 pa_stream_flags_t flags) {
1343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1345 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1348 int pa_stream_begin_write(
1354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1356 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1357 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1358 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1359 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1360 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1362 if (*nbytes != (size_t) -1) {
1365 m = pa_mempool_block_size_max(s->context->mempool);
1366 fs = pa_frame_size(&s->sample_spec);
1373 if (!s->write_memblock) {
1374 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1375 s->write_data = pa_memblock_acquire(s->write_memblock);
1378 *data = s->write_data;
1379 *nbytes = pa_memblock_get_length(s->write_memblock);
1384 int pa_stream_cancel_write(
1388 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1390 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1391 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1392 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1393 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1395 pa_assert(s->write_data);
1397 pa_memblock_release(s->write_memblock);
1398 pa_memblock_unref(s->write_memblock);
1399 s->write_memblock = NULL;
1400 s->write_data = NULL;
1405 int pa_stream_write(
1409 pa_free_cb_t free_cb,
1411 pa_seek_mode_t seek) {
1414 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1417 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1418 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1419 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1420 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1421 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1422 PA_CHECK_VALIDITY(s->context,
1423 !s->write_memblock ||
1424 ((data >= s->write_data) &&
1425 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1427 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1429 if (s->write_memblock) {
1432 /* pa_stream_write_begin() was called before */
1434 pa_memblock_release(s->write_memblock);
1436 chunk.memblock = s->write_memblock;
1437 chunk.index = (const char *) data - (const char *) s->write_data;
1438 chunk.length = length;
1440 s->write_memblock = NULL;
1441 s->write_data = NULL;
1443 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1444 pa_memblock_unref(chunk.memblock);
1447 pa_seek_mode_t t_seek = seek;
1448 int64_t t_offset = offset;
1449 size_t t_length = length;
1450 const void *t_data = data;
1452 /* pa_stream_write_begin() was not called before */
1454 while (t_length > 0) {
1459 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1460 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1461 chunk.length = t_length;
1465 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1466 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1468 d = pa_memblock_acquire(chunk.memblock);
1469 memcpy(d, t_data, chunk.length);
1470 pa_memblock_release(chunk.memblock);
1473 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1476 t_seek = PA_SEEK_RELATIVE;
1478 t_data = (const uint8_t*) t_data + chunk.length;
1479 t_length -= chunk.length;
1481 pa_memblock_unref(chunk.memblock);
1484 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1485 free_cb((void*) data);
1488 /* This is obviously wrong since we ignore the seeking index . But
1489 * that's OK, the server side applies the same error */
1490 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1492 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1494 if (s->direction == PA_STREAM_PLAYBACK) {
1496 /* Update latency request correction */
1497 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1499 if (seek == PA_SEEK_ABSOLUTE) {
1500 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1501 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1502 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1503 } else if (seek == PA_SEEK_RELATIVE) {
1504 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1505 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1507 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1510 /* Update the write index in the already available latency data */
1511 if (s->timing_info_valid) {
1513 if (seek == PA_SEEK_ABSOLUTE) {
1514 s->timing_info.write_index_corrupt = FALSE;
1515 s->timing_info.write_index = offset + (int64_t) length;
1516 } else if (seek == PA_SEEK_RELATIVE) {
1517 if (!s->timing_info.write_index_corrupt)
1518 s->timing_info.write_index += offset + (int64_t) length;
1520 s->timing_info.write_index_corrupt = TRUE;
1523 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1524 request_auto_timing_update(s, TRUE);
1530 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1536 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1537 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1538 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1540 if (!s->peek_memchunk.memblock) {
1542 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1548 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1551 pa_assert(s->peek_data);
1552 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1553 *length = s->peek_memchunk.length;
1557 int pa_stream_drop(pa_stream *s) {
1559 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1561 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1562 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1563 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1564 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1566 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1568 /* Fix the simulated local read index */
1569 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1570 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1572 pa_assert(s->peek_data);
1573 pa_memblock_release(s->peek_memchunk.memblock);
1574 pa_memblock_unref(s->peek_memchunk.memblock);
1575 pa_memchunk_reset(&s->peek_memchunk);
1580 size_t pa_stream_writable_size(pa_stream *s) {
1582 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1584 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1585 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1586 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1588 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1591 size_t pa_stream_readable_size(pa_stream *s) {
1593 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1595 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1596 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1597 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1599 return pa_memblockq_get_length(s->record_memblockq);
1602 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1608 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1610 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1611 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1612 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1614 /* Ask for a timing update before we cork/uncork to get the best
1615 * accuracy for the transport latency suitable for the
1616 * check_smoother_status() call in the started callback */
1617 request_auto_timing_update(s, TRUE);
1619 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1621 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1622 pa_tagstruct_putu32(t, s->channel);
1623 pa_pstream_send_tagstruct(s->context->pstream, t);
1624 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1626 /* This might cause the read index to conitnue again, hence
1627 * let's request a timing update */
1628 request_auto_timing_update(s, TRUE);
1633 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1637 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1638 pa_assert(s->state == PA_STREAM_READY);
1639 pa_assert(s->direction != PA_STREAM_UPLOAD);
1640 pa_assert(s->timing_info_valid);
1641 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1642 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1644 if (s->direction == PA_STREAM_PLAYBACK) {
1645 /* The last byte that was written into the output device
1646 * had this time value associated */
1647 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1649 if (!s->corked && !s->suspended) {
1651 if (!ignore_transport)
1652 /* Because the latency info took a little time to come
1653 * to us, we assume that the real output time is actually
1655 usec += s->timing_info.transport_usec;
1657 /* However, the output device usually maintains a buffer
1658 too, hence the real sample currently played is a little
1660 if (s->timing_info.sink_usec >= usec)
1663 usec -= s->timing_info.sink_usec;
1667 pa_assert(s->direction == PA_STREAM_RECORD);
1669 /* The last byte written into the server side queue had
1670 * this time value associated */
1671 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1673 if (!s->corked && !s->suspended) {
1675 if (!ignore_transport)
1676 /* Add transport latency */
1677 usec += s->timing_info.transport_usec;
1679 /* Add latency of data in device buffer */
1680 usec += s->timing_info.source_usec;
1682 /* If this is a monitor source, we need to correct the
1683 * time by the playback device buffer */
1684 if (s->timing_info.sink_usec >= usec)
1687 usec -= s->timing_info.sink_usec;
1694 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1695 pa_operation *o = userdata;
1696 struct timeval local, remote, now;
1698 pa_bool_t playing = FALSE;
1699 uint64_t underrun_for = 0, playing_for = 0;
1703 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1705 if (!o->context || !o->stream)
1708 i = &o->stream->timing_info;
1710 o->stream->timing_info_valid = FALSE;
1711 i->write_index_corrupt = TRUE;
1712 i->read_index_corrupt = TRUE;
1714 if (command != PA_COMMAND_REPLY) {
1715 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1720 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1721 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1722 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1723 pa_tagstruct_get_timeval(t, &local) < 0 ||
1724 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1725 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1726 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1728 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1732 if (o->context->version >= 13 &&
1733 o->stream->direction == PA_STREAM_PLAYBACK)
1734 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1735 pa_tagstruct_getu64(t, &playing_for) < 0) {
1737 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1742 if (!pa_tagstruct_eof(t)) {
1743 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1746 o->stream->timing_info_valid = TRUE;
1747 i->write_index_corrupt = FALSE;
1748 i->read_index_corrupt = FALSE;
1750 i->playing = (int) playing;
1751 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1753 pa_gettimeofday(&now);
1755 /* Calculcate timestamps */
1756 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1757 /* local and remote seem to have synchronized clocks */
1759 if (o->stream->direction == PA_STREAM_PLAYBACK)
1760 i->transport_usec = pa_timeval_diff(&remote, &local);
1762 i->transport_usec = pa_timeval_diff(&now, &remote);
1764 i->synchronized_clocks = TRUE;
1765 i->timestamp = remote;
1767 /* clocks are not synchronized, let's estimate latency then */
1768 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1769 i->synchronized_clocks = FALSE;
1770 i->timestamp = local;
1771 pa_timeval_add(&i->timestamp, i->transport_usec);
1774 /* Invalidate read and write indexes if necessary */
1775 if (tag < o->stream->read_index_not_before)
1776 i->read_index_corrupt = TRUE;
1778 if (tag < o->stream->write_index_not_before)
1779 i->write_index_corrupt = TRUE;
1781 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1782 /* Write index correction */
1785 uint32_t ctag = tag;
1787 /* Go through the saved correction values and add up the
1788 * total correction.*/
1789 for (n = 0, j = o->stream->current_write_index_correction+1;
1790 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1791 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1793 /* Step over invalid data or out-of-date data */
1794 if (!o->stream->write_index_corrections[j].valid ||
1795 o->stream->write_index_corrections[j].tag < ctag)
1798 /* Make sure that everything is in order */
1799 ctag = o->stream->write_index_corrections[j].tag+1;
1801 /* Now fix the write index */
1802 if (o->stream->write_index_corrections[j].corrupt) {
1803 /* A corrupting seek was made */
1804 i->write_index_corrupt = TRUE;
1805 } else if (o->stream->write_index_corrections[j].absolute) {
1806 /* An absolute seek was made */
1807 i->write_index = o->stream->write_index_corrections[j].value;
1808 i->write_index_corrupt = FALSE;
1809 } else if (!i->write_index_corrupt) {
1810 /* A relative seek was made */
1811 i->write_index += o->stream->write_index_corrections[j].value;
1815 /* Clear old correction entries */
1816 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1817 if (!o->stream->write_index_corrections[n].valid)
1820 if (o->stream->write_index_corrections[n].tag <= tag)
1821 o->stream->write_index_corrections[n].valid = FALSE;
1825 if (o->stream->direction == PA_STREAM_RECORD) {
1826 /* Read index correction */
1828 if (!i->read_index_corrupt)
1829 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1832 /* Update smoother if we're not corked */
1833 if (o->stream->smoother && !o->stream->corked) {
1836 u = x = pa_rtclock_now() - i->transport_usec;
1838 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1841 /* If we weren't playing then it will take some time
1842 * until the audio will actually come out through the
1843 * speakers. Since we follow that timing here, we need
1844 * to try to fix this up */
1846 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1848 if (su < i->sink_usec)
1849 x += i->sink_usec - su;
1853 pa_smoother_pause(o->stream->smoother, x);
1855 /* Update the smoother */
1856 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1857 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1858 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1861 pa_smoother_resume(o->stream->smoother, x, TRUE);
1865 o->stream->auto_timing_update_requested = FALSE;
1867 if (o->stream->latency_update_callback)
1868 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1870 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1871 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1872 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1877 pa_operation_done(o);
1878 pa_operation_unref(o);
1881 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1889 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1891 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1895 if (s->direction == PA_STREAM_PLAYBACK) {
1896 /* Find a place to store the write_index correction data for this entry */
1897 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1899 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1900 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1902 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1904 t = pa_tagstruct_command(
1906 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1908 pa_tagstruct_putu32(t, s->channel);
1909 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1911 pa_pstream_send_tagstruct(s->context->pstream, t);
1912 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1914 if (s->direction == PA_STREAM_PLAYBACK) {
1915 /* Fill in initial correction data */
1917 s->current_write_index_correction = cidx;
1919 s->write_index_corrections[cidx].valid = TRUE;
1920 s->write_index_corrections[cidx].absolute = FALSE;
1921 s->write_index_corrections[cidx].corrupt = FALSE;
1922 s->write_index_corrections[cidx].tag = tag;
1923 s->write_index_corrections[cidx].value = 0;
1929 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1930 pa_stream *s = userdata;
1934 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1938 if (command != PA_COMMAND_REPLY) {
1939 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1942 pa_stream_set_state(s, PA_STREAM_FAILED);
1944 } else if (!pa_tagstruct_eof(t)) {
1945 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1949 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1955 int pa_stream_disconnect(pa_stream *s) {
1960 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1962 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1963 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1964 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1968 t = pa_tagstruct_command(
1970 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1971 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1973 pa_tagstruct_putu32(t, s->channel);
1974 pa_pstream_send_tagstruct(s->context->pstream, t);
1975 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1981 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1983 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1985 if (pa_detect_fork())
1988 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991 s->read_callback = cb;
1992 s->read_userdata = userdata;
1995 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1997 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1999 if (pa_detect_fork())
2002 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2005 s->write_callback = cb;
2006 s->write_userdata = userdata;
2009 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2011 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2013 if (pa_detect_fork())
2016 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2019 s->state_callback = cb;
2020 s->state_userdata = userdata;
2023 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2025 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2027 if (pa_detect_fork())
2030 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2033 s->overflow_callback = cb;
2034 s->overflow_userdata = userdata;
2037 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2039 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2041 if (pa_detect_fork())
2044 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2047 s->underflow_callback = cb;
2048 s->underflow_userdata = userdata;
2051 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2053 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2055 if (pa_detect_fork())
2058 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2061 s->latency_update_callback = cb;
2062 s->latency_update_userdata = userdata;
2065 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2067 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2069 if (pa_detect_fork())
2072 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2075 s->moved_callback = cb;
2076 s->moved_userdata = userdata;
2079 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2081 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2083 if (pa_detect_fork())
2086 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2089 s->suspended_callback = cb;
2090 s->suspended_userdata = userdata;
2093 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2095 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2097 if (pa_detect_fork())
2100 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2103 s->started_callback = cb;
2104 s->started_userdata = userdata;
2107 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2109 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2111 if (pa_detect_fork())
2114 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2117 s->event_callback = cb;
2118 s->event_userdata = userdata;
2121 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2123 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2125 if (pa_detect_fork())
2128 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2131 s->buffer_attr_callback = cb;
2132 s->buffer_attr_userdata = userdata;
2135 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2136 pa_operation *o = userdata;
2141 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2146 if (command != PA_COMMAND_REPLY) {
2147 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2151 } else if (!pa_tagstruct_eof(t)) {
2152 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2157 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2158 cb(o->stream, success, o->userdata);
2162 pa_operation_done(o);
2163 pa_operation_unref(o);
2166 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2172 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2174 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2175 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2176 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2178 /* Ask for a timing update before we cork/uncork to get the best
2179 * accuracy for the transport latency suitable for the
2180 * check_smoother_status() call in the started callback */
2181 request_auto_timing_update(s, TRUE);
2185 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2187 t = pa_tagstruct_command(
2189 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2191 pa_tagstruct_putu32(t, s->channel);
2192 pa_tagstruct_put_boolean(t, !!b);
2193 pa_pstream_send_tagstruct(s->context->pstream, t);
2194 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2196 check_smoother_status(s, FALSE, FALSE, FALSE);
2198 /* This might cause the indexes to hang/start again, hence let's
2199 * request a timing update, after the cork/uncork, too */
2200 request_auto_timing_update(s, TRUE);
2205 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2211 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2213 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2214 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2216 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2218 t = pa_tagstruct_command(s->context, command, &tag);
2219 pa_tagstruct_putu32(t, s->channel);
2220 pa_pstream_send_tagstruct(s->context->pstream, t);
2221 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2226 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2230 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2232 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2236 /* Ask for a timing update *before* the flush, so that the
2237 * transport usec is as up to date as possible when we get the
2238 * underflow message and update the smoother status*/
2239 request_auto_timing_update(s, TRUE);
2241 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2244 if (s->direction == PA_STREAM_PLAYBACK) {
2246 if (s->write_index_corrections[s->current_write_index_correction].valid)
2247 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2249 if (s->buffer_attr.prebuf > 0)
2250 check_smoother_status(s, FALSE, FALSE, TRUE);
2252 /* This will change the write index, but leave the
2253 * read index untouched. */
2254 invalidate_indexes(s, FALSE, TRUE);
2257 /* For record streams this has no influence on the write
2258 * index, but the read index might jump. */
2259 invalidate_indexes(s, TRUE, FALSE);
2261 /* Note that we do not update requested_bytes here. This is
2262 * because we cannot really know how data actually was dropped
2263 * from the write index due to this. This 'error' will be applied
2264 * by both client and server and hence we should be fine. */
2269 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2273 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2275 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2276 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2277 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2278 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2280 /* Ask for a timing update before we cork/uncork to get the best
2281 * accuracy for the transport latency suitable for the
2282 * check_smoother_status() call in the started callback */
2283 request_auto_timing_update(s, TRUE);
2285 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2288 /* This might cause the read index to hang again, hence
2289 * let's request a timing update */
2290 request_auto_timing_update(s, TRUE);
2295 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2299 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2301 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2302 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2304 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2306 /* Ask for a timing update before we cork/uncork to get the best
2307 * accuracy for the transport latency suitable for the
2308 * check_smoother_status() call in the started callback */
2309 request_auto_timing_update(s, TRUE);
2311 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2314 /* This might cause the read index to start moving again, hence
2315 * let's request a timing update */
2316 request_auto_timing_update(s, TRUE);
2321 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2328 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2332 if (s->context->version >= 13) {
2333 pa_proplist *p = pa_proplist_new();
2335 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2336 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2337 pa_proplist_free(p);
2342 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2343 t = pa_tagstruct_command(
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2347 pa_tagstruct_putu32(t, s->channel);
2348 pa_tagstruct_puts(t, name);
2349 pa_pstream_send_tagstruct(s->context->pstream, t);
2350 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2356 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2360 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2362 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2363 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2364 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2365 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2366 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2367 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2370 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2372 usec = calc_time(s, FALSE);
2374 /* Make sure the time runs monotonically */
2375 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2376 if (usec < s->previous_time)
2377 usec = s->previous_time;
2379 s->previous_time = usec;
2388 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2390 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2398 if (negative && s->direction == PA_STREAM_RECORD) {
2406 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2412 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2419 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2420 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2422 if ((r = pa_stream_get_time(s, &t)) < 0)
2425 if (s->direction == PA_STREAM_PLAYBACK)
2426 cindex = s->timing_info.write_index;
2428 cindex = s->timing_info.read_index;
2433 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2435 if (s->direction == PA_STREAM_PLAYBACK)
2436 *r_usec = time_counter_diff(s, c, t, negative);
2438 *r_usec = time_counter_diff(s, t, c, negative);
2443 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2445 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2447 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2448 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2449 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2450 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2452 return &s->timing_info;
2455 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2457 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2459 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2461 return &s->sample_spec;
2464 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2470 return &s->channel_map;
2473 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2475 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2477 /* We don't have the format till routing is done */
2478 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2479 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2483 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2485 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2488 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2491 return &s->buffer_attr;
2494 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2495 pa_operation *o = userdata;
2500 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2505 if (command != PA_COMMAND_REPLY) {
2506 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2511 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2512 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2513 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2514 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2515 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2516 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2519 } else if (o->stream->direction == PA_STREAM_RECORD) {
2520 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2521 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2522 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2527 if (o->stream->context->version >= 13) {
2530 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2531 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2535 if (o->stream->direction == PA_STREAM_RECORD)
2536 o->stream->timing_info.configured_source_usec = usec;
2538 o->stream->timing_info.configured_sink_usec = usec;
2541 if (!pa_tagstruct_eof(t)) {
2542 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2548 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2549 cb(o->stream, success, o->userdata);
2553 pa_operation_done(o);
2554 pa_operation_unref(o);
2558 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2562 pa_buffer_attr copy;
2565 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2569 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2573 /* Ask for a timing update before we cork/uncork to get the best
2574 * accuracy for the transport latency suitable for the
2575 * check_smoother_status() call in the started callback */
2576 request_auto_timing_update(s, TRUE);
2578 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2580 t = pa_tagstruct_command(
2582 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2584 pa_tagstruct_putu32(t, s->channel);
2587 patch_buffer_attr(s, ©, NULL);
2590 pa_tagstruct_putu32(t, attr->maxlength);
2592 if (s->direction == PA_STREAM_PLAYBACK)
2595 PA_TAG_U32, attr->tlength,
2596 PA_TAG_U32, attr->prebuf,
2597 PA_TAG_U32, attr->minreq,
2600 pa_tagstruct_putu32(t, attr->fragsize);
2602 if (s->context->version >= 13)
2603 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2605 if (s->context->version >= 14)
2606 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2608 pa_pstream_send_tagstruct(s->context->pstream, t);
2609 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2611 /* This might cause changes in the read/write indexex, hence let's
2612 * request a timing update */
2613 request_auto_timing_update(s, TRUE);
2618 uint32_t pa_stream_get_device_index(pa_stream *s) {
2620 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2622 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2623 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2624 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2625 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2626 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2628 return s->device_index;
2631 const char *pa_stream_get_device_name(pa_stream *s) {
2633 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2636 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2637 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2638 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2639 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2641 return s->device_name;
2644 int pa_stream_is_suspended(pa_stream *s) {
2646 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2648 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2649 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2650 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2651 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2653 return s->suspended;
2656 int pa_stream_is_corked(pa_stream *s) {
2658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2661 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2667 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2668 pa_operation *o = userdata;
2673 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2678 if (command != PA_COMMAND_REPLY) {
2679 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2685 if (!pa_tagstruct_eof(t)) {
2686 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2691 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2692 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2695 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2696 cb(o->stream, success, o->userdata);
2700 pa_operation_done(o);
2701 pa_operation_unref(o);
2705 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2711 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2715 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2716 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2717 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2718 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2720 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2721 o->private = PA_UINT_TO_PTR(rate);
2723 t = pa_tagstruct_command(
2725 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2727 pa_tagstruct_putu32(t, s->channel);
2728 pa_tagstruct_putu32(t, rate);
2730 pa_pstream_send_tagstruct(s->context->pstream, t);
2731 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2736 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2742 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2746 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2747 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2750 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2752 t = pa_tagstruct_command(
2754 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2756 pa_tagstruct_putu32(t, s->channel);
2757 pa_tagstruct_putu32(t, (uint32_t) mode);
2758 pa_tagstruct_put_proplist(t, p);
2760 pa_pstream_send_tagstruct(s->context->pstream, t);
2761 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2763 /* Please note that we don't update s->proplist here, because we
2764 * don't export that field */
2769 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2773 const char * const*k;
2776 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2778 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2779 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2780 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2781 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2782 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2784 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2786 t = pa_tagstruct_command(
2788 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2790 pa_tagstruct_putu32(t, s->channel);
2792 for (k = keys; *k; k++)
2793 pa_tagstruct_puts(t, *k);
2795 pa_tagstruct_puts(t, NULL);
2797 pa_pstream_send_tagstruct(s->context->pstream, t);
2798 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2800 /* Please note that we don't update s->proplist here, because we
2801 * don't export that field */
2806 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2808 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2810 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2811 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2812 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2813 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2815 s->direct_on_input = sink_input_idx;
2820 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2822 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2824 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2825 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2826 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2828 return s->direct_on_input;