2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
/* Bounds of the automatic timing update interval. The interval is set to
 * the START value when a stream becomes ready and is doubled on every
 * reschedule in request_auto_timing_update() until it is capped at the
 * END value. */
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Parameters handed to pa_smoother_new() in create_stream() when the
 * caller requests PA_STREAM_INTERPOLATE_TIMING. */
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
/* Convenience constructor: creates a stream without a caller-supplied
 * property list by forwarding to pa_stream_new_with_proplist() with a
 * NULL proplist. Returns whatever that function returns. */
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Clears every user-installed callback of the stream together with its
 * associated userdata pointer, so that no client code is invoked for this
 * stream afterwards. */
58 static void reset_callbacks(pa_stream *s) {
/* Data flow callbacks */
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
/* State and buffer condition callbacks */
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
/* Server-initiated notification callbacks */
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
/* Shared constructor behind pa_stream_new_with_proplist() and
 * pa_stream_new_extended().
 *
 * The caller passes either a sample spec/channel map pair or an array of
 * format infos, never both (asserted below). The check macros report
 * PA_ERR_FORKED when a fork is detected, and PA_ERR_INVALID when neither
 * a name nor a PA_PROP_MEDIA_NAME entry in the proplist is available.
 * The new stream starts UNCONNECTED with no direction and is linked into
 * the context's stream list. */
83 static pa_stream *pa_stream_new_with_proplist_internal(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* ss/map and formats are mutually exclusive ways to describe the stream */
96 pa_assert((ss == NULL && map == NULL) || formats == NULL);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
101 s = pa_xnew(pa_stream, 1);
104 s->mainloop = c->mainloop;
106 s->direction = PA_STREAM_NODIRECTION;
107 s->state = PA_STREAM_UNCONNECTED;
/* NOTE(review): the next four assignments are alternatives (copy the
 * caller's value vs. initialize an invalid/empty one); presumably they
 * are selected by whether ss and map were supplied -- confirm. */
111 s->sample_spec = *ss;
113 s->sample_spec.format = PA_SAMPLE_INVALID;
116 s->channel_map = *map;
118 pa_channel_map_init(&s->channel_map);
/* Keep private copies of up to PA_MAX_FORMATS requested formats */
122 for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
124 s->req_formats[i] = pa_format_info_copy(formats[i]);
126 /* Make sure the input array was NULL-terminated */
127 pa_assert(formats[i] == NULL);
130 /* We'll get the final negotiated format after connecting */
133 s->direct_on_input = PA_INVALID_INDEX;
/* The stream owns its proplist: copy the caller's or start with an empty one */
135 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
137 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
140 s->channel_valid = FALSE;
/* Each new stream takes the context's next sync id */
141 s->syncid = c->csyncid++;
142 s->stream_index = PA_INVALID_INDEX;
144 s->requested_bytes = 0;
145 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147 /* We initialize the target length here, so that if the user
148 * passes no explicit buffering metrics the default is similar to
149 * what older PA versions provided. */
/* (uint32_t) -1 marks a buffer metric as "not set"; patch_buffer_attr()
 * replaces such values with defaults for old servers. */
151 s->buffer_attr.maxlength = (uint32_t) -1;
153 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
155 /* FIXME: We assume a worst-case compressed format corresponding to
156 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157 pa_sample_spec tmp_ss = {
158 .format = PA_SAMPLE_S16NE,
162 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
164 s->buffer_attr.minreq = (uint32_t) -1;
165 s->buffer_attr.prebuf = (uint32_t) -1;
166 s->buffer_attr.fragsize = (uint32_t) -1;
/* No device is known until the server answers the create request */
168 s->device_index = PA_INVALID_INDEX;
169 s->device_name = NULL;
170 s->suspended = FALSE;
173 s->write_memblock = NULL;
174 s->write_data = NULL;
176 pa_memchunk_reset(&s->peek_memchunk);
178 s->record_memblockq = NULL;
/* Timing state: nothing has been received from the server yet */
180 memset(&s->timing_info, 0, sizeof(s->timing_info));
181 s->timing_info_valid = FALSE;
183 s->previous_time = 0;
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Creates a PCM stream described by a sample spec and an optional channel
 * map, with an optional property list.
 *
 * The sample spec must be valid. S32 formats need protocol version >= 12,
 * S24 and S24-in-32 formats need version >= 15 (PA_ERR_NOTSUPPORTED
 * otherwise). A supplied channel map must be valid and have the same
 * channel count as the sample spec (PA_ERR_INVALID otherwise). */
206 pa_stream *pa_stream_new_with_proplist(
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
/* Derive a default channel map for ss->channels into the local tmap;
 * the assignment inside the check is intentional.
 * NOTE(review): presumably only executed when no map was passed -- confirm. */
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
/* formats == NULL: this is the sample-spec based creation path */
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
/* Creates a stream described by an array of format infos instead of a
 * sample spec. Requires protocol version >= 21; older contexts are
 * rejected with PA_ERR_NOTSUPPORTED. */
227 pa_stream *pa_stream_new_extended(
230 pa_format_info * const *formats,
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
/* ss and map are NULL: the format array fully describes the stream */
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
/* Detaches the stream from its context: cancels the context's operations
 * that refer to it, drops pending replies registered for it, removes it
 * from the per-direction channel hashmap and from the context's stream
 * list, and frees the automatic timing update timer. */
238 static void stream_unlink(pa_stream *s) {
245 /* Detach from context */
247 /* Unref all operation objects that point to us */
/* n holds the successor so the walk survives cancelling o */
248 for (o = s->context->operations; o; o = n) {
252 pa_operation_cancel(o);
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* Only streams that got a channel from the server are in a hashmap */
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel_valid = FALSE;
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* Stop the periodic timing update timer, if one was created */
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases every resource owned by the stream: the pending write
 * memblock, the peeked record chunk, the record queue, the property
 * list, the timing smoother, the requested format infos and the device
 * name. */
278 static void stream_free(pa_stream *s) {
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
/* Unref the memblock itself. s->write_data is only the data pointer
 * returned by pa_memblock_acquire() and is not a memblock, so it must
 * never be handed to pa_memblock_unref(). */
287 pa_memblock_unref(s->write_memblock);
290 if (s->peek_memchunk.memblock) {
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
300 pa_proplist_free(s->proplist);
303 pa_smoother_free(s->smoother);
/* The entries were created with pa_format_info_copy(), so they have to
 * be destroyed with pa_format_info_free(); a plain pa_xfree() would
 * release only the struct and leak whatever the format info owns. */
305 for (i = 0; i < s->n_formats; i++)
306 pa_format_info_free(s->req_formats[i]);
308 pa_xfree(s->device_name);
/* Drops one reference to the stream. The caller must still hold a
 * reference (asserted). */
312 void pa_stream_unref(pa_stream *s) {
314 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* True once the last reference is gone.
 * NOTE(review): presumably the stream is destroyed via stream_free()
 * in that case -- confirm. */
316 if (PA_REFCNT_DEC(s) <= 0)
/* Reference-counting counterpart of pa_stream_unref(). The stream must
 * already have at least one reference when this is called (asserted). */
320 pa_stream* pa_stream_ref(pa_stream *s) {
322 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the stream's current pa_stream_state_t. The stream must
 * be referenced (asserted). */
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the context this stream belongs to. The stream must be
 * referenced (asserted). */
335 pa_context* pa_stream_get_context(pa_stream *s) {
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns the server-side index of the stream.
 *
 * Yields PA_INVALID_INDEX and sets PA_ERR_FORKED on the context when a
 * fork was detected, or PA_ERR_BADSTATE when the stream is not in the
 * PA_STREAM_READY state. */
342 uint32_t pa_stream_get_index(pa_stream *s) {
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
346 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
349 return s->stream_index;
/* Moves the stream to state st and notifies the client through the state
 * callback. FAILED and TERMINATED are treated specially afterwards. */
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Tell the client about the state change */
363 if (s->state_callback)
364 s->state_callback(s, s->state_userdata);
/* Terminal states.
 * NOTE(review): presumably the stream is unlinked from its context
 * here -- confirm. */
366 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Asks the server for fresh timing information on streams that use
 * PA_STREAM_AUTO_TIMING_UPDATE and reschedules the periodic timer.
 *
 * force: issue a request even if one is already outstanding. The timer
 * interval is doubled on each reschedule, capped at
 * AUTO_TIMING_INTERVAL_END_USEC. */
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Nothing to do for streams that did not opt in */
376 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
/* Only one automatic request is kept in flight unless forced */
379 if (s->state == PA_STREAM_READY &&
380 (force || !s->auto_timing_update_requested)) {
383 /* pa_log("Automatically requesting new timing data"); */
/* The operation object is not needed here, so its reference is dropped
 * right away. */
385 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386 pa_operation_unref(o);
387 s->auto_timing_update_requested = TRUE;
391 if (s->auto_timing_update_event) {
/* NOTE(review): presumably the reset to the start interval applies only
 * to forced updates -- confirm. */
393 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
395 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
/* Back off exponentially up to the configured maximum */
397 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED. userdata is the pa_context.
 *
 * Reads the channel from the tagstruct, looks up the stream in the
 * matching hashmap and, if it is READY, records PA_ERR_KILLED on the
 * context and puts the stream into PA_STREAM_FAILED. A malformed message
 * fails the whole context with PA_ERR_PROTOCOL. */
401 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
402 pa_context *c = userdata;
407 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
410 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* The payload is exactly one u32: the channel */
414 if (pa_tagstruct_getu32(t, &channel) < 0 ||
415 !pa_tagstruct_eof(t)) {
416 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Unknown channels and streams that are not READY are ignored */
420 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
423 if (s->state != PA_STREAM_READY)
426 pa_context_set_error(c, PA_ERR_KILLED);
427 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the stream's timing smoother according to the
 * current stream status.
 *
 * force_start and force_stop are mutually exclusive (asserted). The
 * smoother is paused when the stream is suspended or corked, or when
 * force_stop is set. It is resumed when force_start is set or the stream
 * has no prebuffering (prebuf == 0).
 * NOTE(review): aposteriori presumably selects whether the transport
 * latency is subtracted from or added to the reference time -- confirm. */
433 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
437 pa_assert(!force_start || !force_stop);
/* Reference time for the smoother, corrected by the transport latency
 * once timing info is available */
442 x = pa_rtclock_now();
444 if (s->timing_info_valid) {
446 x -= s->timing_info.transport_usec;
448 x += s->timing_info.transport_usec;
451 if (s->suspended || s->corked || force_stop)
452 pa_smoother_pause(s->smoother, x);
453 else if (force_start || s->buffer_attr.prebuf == 0) {
455 if (!s->timing_info_valid &&
459 s->context->version >= 13) {
461 /* If the server supports STARTED events we take them as
462 * indications when audio really starts/stops playing, if
463 * we don't have any timing info yet -- instead of trying
464 * to be smart and guessing the server time. Otherwise the
465 * unknown transport delay add too much noise to our time
471 pa_smoother_resume(s->smoother, x, TRUE);
474 /* Please note that we have no idea if playback actually started
475 * if prebuf is non-zero! */
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED (protocol >= 12). userdata is the
 * pa_context.
 *
 * Parses the channel, the new device index and name and the suspended
 * flag. With protocol >= 13 it also parses the new buffer metrics and the
 * configured latency. The values are stored on the matching READY stream,
 * the smoother and timing data are refreshed, and the moved callback is
 * invoked. Any protocol violation fails the context with
 * PA_ERR_PROTOCOL. */
478 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
479 pa_context *c = userdata;
/* Metrics the server does not send for a given direction stay 0 */
486 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
489 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
492 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Servers older than protocol 12 must not send this message */
496 if (c->version < 12) {
497 pa_context_fail(c, PA_ERR_PROTOCOL);
501 if (pa_tagstruct_getu32(t, &channel) < 0 ||
502 pa_tagstruct_getu32(t, &di) < 0 ||
503 pa_tagstruct_gets(t, &dn) < 0 ||
504 pa_tagstruct_get_boolean(t, &suspended) < 0) {
505 pa_context_fail(c, PA_ERR_PROTOCOL);
509 if (c->version >= 13) {
/* Record streams carry maxlength/fragsize, playback streams carry
 * maxlength/tlength/prebuf/minreq; both end with the latency */
511 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
512 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
513 pa_tagstruct_getu32(t, &fragsize) < 0 ||
514 pa_tagstruct_get_usec(t, &usec) < 0) {
515 pa_context_fail(c, PA_ERR_PROTOCOL);
519 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520 pa_tagstruct_getu32(t, &tlength) < 0 ||
521 pa_tagstruct_getu32(t, &prebuf) < 0 ||
522 pa_tagstruct_getu32(t, &minreq) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Trailing data is a protocol error */
530 if (!pa_tagstruct_eof(t)) {
531 pa_context_fail(c, PA_ERR_PROTOCOL);
/* A move must name a real target device */
535 if (!dn || di == PA_INVALID_INDEX) {
536 pa_context_fail(c, PA_ERR_PROTOCOL);
540 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
543 if (s->state != PA_STREAM_READY)
546 if (c->version >= 13) {
547 if (s->direction == PA_STREAM_RECORD)
548 s->timing_info.configured_source_usec = usec;
550 s->timing_info.configured_sink_usec = usec;
552 s->buffer_attr.maxlength = maxlength;
553 s->buffer_attr.fragsize = fragsize;
554 s->buffer_attr.tlength = tlength;
555 s->buffer_attr.prebuf = prebuf;
556 s->buffer_attr.minreq = minreq;
/* Replace the cached device identity with a private copy of the name */
559 pa_xfree(s->device_name);
560 s->device_name = pa_xstrdup(dn);
561 s->device_index = di;
563 s->suspended = suspended;
/* The suspended state may have changed, so re-evaluate the smoother and
 * force a fresh timing update */
565 check_smoother_status(s, TRUE, FALSE, FALSE);
566 request_auto_timing_update(s, TRUE);
568 if (s->moved_callback)
569 s->moved_callback(s, s->moved_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED (protocol >= 15). userdata is the
 * pa_context.
 *
 * Parses the channel, the new buffer metrics and the configured latency,
 * stores them on the matching READY stream, forces a timing update and
 * invokes the buffer_attr callback. Any protocol violation fails the
 * context with PA_ERR_PROTOCOL. */
575 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
576 pa_context *c = userdata;
/* Metrics the server does not send for a given direction stay 0 */
580 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
583 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
586 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Servers older than protocol 15 must not send this message */
590 if (c->version < 15) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
595 if (pa_tagstruct_getu32(t, &channel) < 0) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Select the record layout (maxlength, fragsize, latency) by the record
 * variant of THIS command. Comparing against
 * PA_COMMAND_RECORD_STREAM_MOVED could never match, because the assert
 * above restricts command to the two BUFFER_ATTR_CHANGED values, and
 * record messages would then be parsed with the playback layout. */
600 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
601 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
602 pa_tagstruct_getu32(t, &fragsize) < 0 ||
603 pa_tagstruct_get_usec(t, &usec) < 0) {
604 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Playback layout: maxlength, tlength, prebuf, minreq, latency */
608 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
609 pa_tagstruct_getu32(t, &tlength) < 0 ||
610 pa_tagstruct_getu32(t, &prebuf) < 0 ||
611 pa_tagstruct_getu32(t, &minreq) < 0 ||
612 pa_tagstruct_get_usec(t, &usec) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Trailing data is a protocol error */
618 if (!pa_tagstruct_eof(t)) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Unknown channels and streams that are not READY are ignored */
623 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
626 if (s->state != PA_STREAM_READY)
629 if (s->direction == PA_STREAM_RECORD)
630 s->timing_info.configured_source_usec = usec;
632 s->timing_info.configured_sink_usec = usec;
634 s->buffer_attr.maxlength = maxlength;
635 s->buffer_attr.fragsize = fragsize;
636 s->buffer_attr.tlength = tlength;
637 s->buffer_attr.prebuf = prebuf;
638 s->buffer_attr.minreq = minreq;
640 request_auto_timing_update(s, TRUE);
642 if (s->buffer_attr_callback)
643 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED (protocol >= 12). userdata is the
 * pa_context.
 *
 * Stores the new suspended flag on the matching READY stream, re-checks
 * the smoother, forces a timing update and invokes the suspended
 * callback. Any protocol violation fails the context with
 * PA_ERR_PROTOCOL. */
649 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650 pa_context *c = userdata;
656 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
659 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Servers older than protocol 12 must not send this message */
663 if (c->version < 12) {
664 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Payload: channel and suspended flag, nothing else */
668 if (pa_tagstruct_getu32(t, &channel) < 0 ||
669 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
670 !pa_tagstruct_eof(t)) {
671 pa_context_fail(c, PA_ERR_PROTOCOL);
675 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
678 if (s->state != PA_STREAM_READY)
681 s->suspended = suspended;
/* check_smoother_status() pauses the smoother while s->suspended is set */
683 check_smoother_status(s, TRUE, FALSE, FALSE);
684 request_auto_timing_update(s, TRUE);
686 if (s->suspended_callback)
687 s->suspended_callback(s, s->suspended_userdata);
/* Dispatcher callback for PA_COMMAND_STARTED (protocol >= 13). userdata
 * is the pa_context.
 *
 * Looks up the playback stream for the received channel and, if it is
 * READY, force-starts the smoother, forces a timing update and invokes
 * the started callback. Any protocol violation fails the context with
 * PA_ERR_PROTOCOL. */
693 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694 pa_context *c = userdata;
699 pa_assert(command == PA_COMMAND_STARTED);
702 pa_assert(PA_REFCNT_VALUE(c) >= 1);
706 if (c->version < 13) {
707 pa_context_fail(c, PA_ERR_PROTOCOL);
711 if (pa_tagstruct_getu32(t, &channel) < 0 ||
712 !pa_tagstruct_eof(t)) {
713 pa_context_fail(c, PA_ERR_PROTOCOL);
/* This handler only consults the playback stream table */
717 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
720 if (s->state != PA_STREAM_READY)
/* force_start = TRUE: the server just told us audio is running */
723 check_smoother_status(s, TRUE, TRUE, FALSE);
724 request_auto_timing_update(s, TRUE);
726 if (s->started_callback)
727 s->started_callback(s, s->started_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT (protocol >= 15). userdata is the
 * pa_context.
 *
 * Parses the channel, the event name and an attached property list, and
 * passes the event name and property list to the event callback of the
 * matching READY stream. For PA_STREAM_EVENT_FORMAT_LOST the current stream time is added
 * to the property list under "stream-time" when it can be determined.
 * The temporary property list is freed before returning. */
733 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734 pa_context *c = userdata;
737 pa_proplist *pl = NULL;
741 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
744 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Servers older than protocol 15 must not send this message */
748 if (c->version < 15) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Receives the properties attached to the event */
753 pl = pa_proplist_new();
/* A missing event name is treated as a protocol error as well */
755 if (pa_tagstruct_getu32(t, &channel) < 0 ||
756 pa_tagstruct_gets(t, &event) < 0 ||
757 pa_tagstruct_get_proplist(t, pl) < 0 ||
758 !pa_tagstruct_eof(t) || !event) {
759 pa_context_fail(c, PA_ERR_PROTOCOL);
763 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
766 if (s->state != PA_STREAM_READY)
769 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
770 /* Let client know what the running time was when the stream had to be
773 if (pa_stream_get_time(s, &time) == 0)
774 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
777 if (s->event_callback)
778 s->event_callback(s, event, pl, s->event_userdata);
784 pa_proplist_free(pl);
/* Dispatcher callback for PA_COMMAND_REQUEST: the server asks for more
 * playback data. userdata is the pa_context.
 *
 * Adds the requested byte count to the stream's requested_bytes counter
 * and, if the counter is positive, invokes the write callback with the
 * total. Any protocol violation fails the context with PA_ERR_PROTOCOL. */
787 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
789 pa_context *c = userdata;
790 uint32_t bytes, channel;
793 pa_assert(command == PA_COMMAND_REQUEST);
796 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Payload: channel and number of requested bytes, nothing else */
800 if (pa_tagstruct_getu32(t, &channel) < 0 ||
801 pa_tagstruct_getu32(t, &bytes) < 0 ||
802 !pa_tagstruct_eof(t)) {
803 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Requests only exist for playback streams */
807 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
810 if (s->state != PA_STREAM_READY)
813 s->requested_bytes += bytes;
815 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
/* The counter is checked for > 0 before the client is asked to write */
817 if (s->requested_bytes > 0 && s->write_callback)
818 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Dispatcher callback shared by PA_COMMAND_OVERFLOW and
 * PA_COMMAND_UNDERFLOW. userdata is the pa_context.
 *
 * For the matching READY playback stream it force-stops the smoother
 * when prebuffering is enabled, forces a timing update and then invokes
 * the overflow or underflow callback depending on the command. Any
 * protocol violation fails the context with PA_ERR_PROTOCOL. */
824 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
826 pa_context *c = userdata;
830 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
833 pa_assert(PA_REFCNT_VALUE(c) >= 1);
837 if (pa_tagstruct_getu32(t, &channel) < 0 ||
838 !pa_tagstruct_eof(t)) {
839 pa_context_fail(c, PA_ERR_PROTOCOL);
843 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
846 if (s->state != PA_STREAM_READY)
/* force_stop = TRUE, but only for streams with prebuffering enabled */
849 if (s->buffer_attr.prebuf > 0)
850 check_smoother_status(s, TRUE, FALSE, TRUE);
852 request_auto_timing_update(s, TRUE);
/* Dispatch to the callback that belongs to the received command */
854 if (command == PA_COMMAND_OVERFLOW) {
855 if (s->overflow_callback)
856 s->overflow_callback(s, s->overflow_userdata);
857 } else if (command == PA_COMMAND_UNDERFLOW) {
858 if (s->underflow_callback)
859 s->underflow_callback(s, s->underflow_userdata);
/* Marks the cached read and/or write index of the stream as unreliable
 * and forces a timing update.
 *
 * The current context tag is stored as the "not before" mark of the
 * affected index, and already received timing info is flagged as corrupt
 * for that index. Streams that are not READY are left alone.
 * NOTE(review): presumably r selects the read index part and w the write
 * index part -- confirm. */
866 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
868 pa_assert(PA_REFCNT_VALUE(s) >= 1);
870 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
872 if (s->state != PA_STREAM_READY)
/* Write index */
876 s->write_index_not_before = s->context->ctag;
878 if (s->timing_info_valid)
879 s->timing_info.write_index_corrupt = TRUE;
881 /* pa_log("write_index invalidated"); */
/* Read index */
885 s->read_index_not_before = s->context->ctag;
887 if (s->timing_info_valid)
888 s->timing_info.read_index_corrupt = TRUE;
890 /* pa_log("read_index invalidated"); */
893 request_auto_timing_update(s, TRUE);
/* Time event callback of the automatic timing update timer created in
 * create_stream_complete(). userdata is the pa_stream. Triggers a
 * non-forced timing update, which also re-arms the timer. */
896 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
897 pa_stream *s = userdata;
900 pa_assert(PA_REFCNT_VALUE(s) >= 1);
903 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: switches a CREATING stream to READY,
 * hands an initial write request to the client if the server already
 * asked for data, starts the automatic timing update timer when
 * PA_STREAM_AUTO_TIMING_UPDATE is set, and syncs the smoother. */
907 static void create_stream_complete(pa_stream *s) {
909 pa_assert(PA_REFCNT_VALUE(s) >= 1);
910 pa_assert(s->state == PA_STREAM_CREATING);
912 pa_stream_set_state(s, PA_STREAM_READY);
/* The create reply may already carry an initial request for data */
914 if (s->requested_bytes > 0 && s->write_callback)
915 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
917 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
918 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
/* The timer must not exist yet; it is created exactly once here */
919 pa_assert(!s->auto_timing_update_event);
920 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
922 request_auto_timing_update(s, TRUE);
925 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Adjusts the buffer metrics in attr (and possibly flags) before they
 * are sent to the server.
 *
 * 1. If $PULSE_LATENCY_MSEC holds a positive integer, the metrics are
 *    overridden so that tlength and fragsize correspond to that many
 *    milliseconds of audio in the stream's sample spec, and
 *    PA_STREAM_ADJUST_LATENCY is added to *flags.
 * 2. For servers older than protocol 13, every metric still left at
 *    (uint32_t) -1 is replaced by a client-side default. */
928 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
934 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
/* Unparsable or zero values are only logged */
937 if (pa_atou(e, &ms) < 0 || ms <= 0)
938 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
940 attr->maxlength = (uint32_t) -1;
941 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
942 attr->minreq = (uint32_t) -1;
943 attr->prebuf = (uint32_t) -1;
944 attr->fragsize = attr->tlength;
948 *flags |= PA_STREAM_ADJUST_LATENCY;
/* NOTE(review): presumably returns early here, since newer servers pick
 * defaults themselves -- confirm. */
951 if (s->context->version >= 13)
954 /* Version older than 0.9.10 didn't do server side buffer_attr
955 * selection, hence we have to fake it on the client side. */
957 /* We choose fairly conservative values here, to not confuse
958 * old clients with extremely large playback buffers */
960 if (attr->maxlength == (uint32_t) -1)
961 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
963 if (attr->tlength == (uint32_t) -1)
964 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
966 if (attr->minreq == (uint32_t) -1)
967 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
969 if (attr->prebuf == (uint32_t) -1)
970 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
972 if (attr->fragsize == (uint32_t) -1)
973 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
/* Reply handler for the create playback/record stream request sent by
 * create_stream(). userdata is the pa_stream, which must be CREATING.
 *
 * On an error reply the stream goes to PA_STREAM_FAILED. On success the
 * reply is parsed field by field, with the set of fields depending on
 * the negotiated protocol version and the stream direction: channel,
 * stream index, initially requested bytes, buffer metrics (>= 9), sample
 * spec / channel map / device / suspended flag (>= 12), configured
 * latency (>= 13) and negotiated format (>= 21, playback only). Any
 * malformed or inconsistent reply fails the context with
 * PA_ERR_PROTOCOL. Finally the stream is registered in the channel
 * hashmap and create_stream_complete() is called. */
976 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
977 pa_stream *s = userdata;
978 uint32_t requested_bytes = 0;
982 pa_assert(PA_REFCNT_VALUE(s) >= 1);
983 pa_assert(s->state == PA_STREAM_CREATING);
/* Anything but a REPLY means the server refused the request */
987 if (command != PA_COMMAND_REPLY) {
988 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
991 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Upload streams have no stream index; record streams carry no initial
 * byte request */
995 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
996 s->channel == PA_INVALID_INDEX ||
997 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
998 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
999 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1003 s->requested_bytes = (int64_t) requested_bytes;
/* Protocol >= 9: the server reports the buffer metrics it chose */
1005 if (s->context->version >= 9) {
1006 if (s->direction == PA_STREAM_PLAYBACK) {
1007 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1008 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1009 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1010 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1011 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1014 } else if (s->direction == PA_STREAM_RECORD) {
1015 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1016 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1017 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* Protocol >= 12: the server reports the final sample spec, channel map
 * and the device the stream was attached to */
1023 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1026 const char *dn = NULL;
1027 pa_bool_t suspended;
1029 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1030 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1031 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1032 pa_tagstruct_gets(t, &dn) < 0 ||
1033 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1034 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* For streams created from a sample spec (n_formats == 0) the server may
 * only deviate from the requested format, rate or channel map when the
 * corresponding PA_STREAM_FIX_* flag was set */
1038 if (!dn || s->device_index == PA_INVALID_INDEX ||
1039 ss.channels != cm.channels ||
1040 !pa_channel_map_valid(&cm) ||
1041 !pa_sample_spec_valid(&ss) ||
1042 (s->n_formats == 0 && (
1043 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1044 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1045 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1046 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1050 pa_xfree(s->device_name);
1051 s->device_name = pa_xstrdup(dn);
1052 s->suspended = suspended;
1054 s->channel_map = cm;
1055 s->sample_spec = ss;
/* Protocol >= 13: configured device latency */
1058 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1061 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1062 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1066 if (s->direction == PA_STREAM_RECORD)
1067 s->timing_info.configured_source_usec = usec;
1069 s->timing_info.configured_sink_usec = usec;
/* Protocol >= 21: negotiated format of playback streams.
 * NOTE(review): the return value of pa_tagstruct_get_format_info() is
 * not checked; a failed read is only caught indirectly through
 * pa_format_info_valid() -- confirm this is intended. */
1072 if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1073 pa_format_info *f = pa_format_info_new();
1074 pa_tagstruct_get_format_info(t, f);
1076 if (pa_format_info_valid(f))
1079 pa_format_info_free(f);
1080 if (s->n_formats > 0) {
1081 /* We used the extended API, so we should have got back a proper format */
1082 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* Trailing data is a protocol error */
1088 if (!pa_tagstruct_eof(t)) {
1089 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* Record streams need a queue for the data arriving from the server */
1093 if (s->direction == PA_STREAM_RECORD) {
1094 pa_assert(!s->record_memblockq);
1096 s->record_memblockq = pa_memblockq_new(
1098 s->buffer_attr.maxlength,
1100 pa_frame_size(&s->sample_spec),
/* From now on server messages for this channel can be routed to s */
1107 s->channel_valid = TRUE;
1108 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1110 create_stream_complete(s);
/* Common implementation of pa_stream_connect_playback() and
 * pa_stream_connect_record().
 *
 * Validates the stream state, the flags and their combination with the
 * direction, volume and sync stream; stores direction, buffer metrics
 * and corked state on the stream; optionally creates the timing
 * smoother; then builds and sends the create stream command, whose
 * layout depends on the negotiated protocol version. The reply is
 * handled by pa_create_stream_callback() and the stream is put into
 * PA_STREAM_CREATING.
 *
 * Validation failures are reported through PA_CHECK_VALIDITY with
 * PA_ERR_FORKED, PA_ERR_BADSTATE, PA_ERR_INVALID or
 * PA_ERR_NOTSUPPORTED. */
1116 static int create_stream(
1117 pa_stream_direction_t direction,
1120 const pa_buffer_attr *attr,
1121 pa_stream_flags_t flags,
1122 const pa_cvolume *volume,
1123 pa_stream *sync_stream) {
1127 pa_bool_t volume_set = FALSE;
1131 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1132 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1134 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1135 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1136 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1137 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1138 PA_STREAM_INTERPOLATE_TIMING|
1139 PA_STREAM_NOT_MONOTONIC|
1140 PA_STREAM_AUTO_TIMING_UPDATE|
1141 PA_STREAM_NO_REMAP_CHANNELS|
1142 PA_STREAM_NO_REMIX_CHANNELS|
1143 PA_STREAM_FIX_FORMAT|
1145 PA_STREAM_FIX_CHANNELS|
1146 PA_STREAM_DONT_MOVE|
1147 PA_STREAM_VARIABLE_RATE|
1148 PA_STREAM_PEAK_DETECT|
1149 PA_STREAM_START_MUTED|
1150 PA_STREAM_ADJUST_LATENCY|
1151 PA_STREAM_EARLY_REQUESTS|
1152 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1153 PA_STREAM_START_UNMUTED|
1154 PA_STREAM_FAIL_ON_SUSPEND|
1155 PA_STREAM_RELATIVE_VOLUME|
1156 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1159 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1160 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1161 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1162 /* Although some of the other flags are not supported on older
1163 * version, we don't check for them here, because it doesn't hurt
1164 * when they are passed but actually not supported. This makes
1165 * client development easier */
/* START_MUTED is a playback-only flag, PEAK_DETECT a record-only flag */
1167 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1168 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1169 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1170 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
/* ADJUST_LATENCY and EARLY_REQUESTS are mutually exclusive */
1171 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1175 s->direction = direction;
/* Synchronized streams share the sync id of the stream they follow */
1178 s->syncid = sync_stream->syncid;
1181 s->buffer_attr = *attr;
/* May override the metrics from the environment and may add
 * PA_STREAM_ADJUST_LATENCY to flags */
1182 patch_buffer_attr(s, &s->buffer_attr, &flags);
1185 s->corked = !!(flags & PA_STREAM_START_CORKED);
/* A smoother is only needed when the client wants interpolated timing */
1187 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1190 x = pa_rtclock_now();
1192 pa_assert(!s->smoother);
1193 s->smoother = pa_smoother_new(
1194 SMOOTHER_ADJUST_TIME,
1195 SMOOTHER_HISTORY_TIME,
1196 !(flags & PA_STREAM_NOT_MONOTONIC),
1198 SMOOTHER_MIN_HISTORY,
/* NOTE(review): presumably only used when the caller passed no device
 * name -- confirm. */
1204 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1206 t = pa_tagstruct_command(
1208 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Before protocol 13 the media name is sent as a plain string; from 13
 * on the whole proplist is sent further below */
1211 if (s->context->version < 13)
1212 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1216 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1217 PA_TAG_CHANNEL_MAP, &s->channel_map,
1218 PA_TAG_U32, PA_INVALID_INDEX,
1220 PA_TAG_U32, s->buffer_attr.maxlength,
1221 PA_TAG_BOOLEAN, s->corked,
1224 if (s->direction == PA_STREAM_PLAYBACK) {
1229 PA_TAG_U32, s->buffer_attr.tlength,
1230 PA_TAG_U32, s->buffer_attr.prebuf,
1231 PA_TAG_U32, s->buffer_attr.minreq,
1232 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume; a default one is
 * substituted below when none was given */
1235 volume_set = !!volume;
1238 if (pa_sample_spec_valid(&s->sample_spec))
1239 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1241 /* This is not really relevant, since no volume was set, and
1242 * the real number of channels is embedded in the format_info
1244 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1248 pa_tagstruct_put_cvolume(t, volume);
1250 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1252 if (s->context->version >= 12) {
1255 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1256 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1257 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1258 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1259 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1260 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1261 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1265 if (s->context->version >= 13) {
1267 if (s->direction == PA_STREAM_PLAYBACK)
1268 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1270 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1274 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1275 PA_TAG_PROPLIST, s->proplist,
1278 if (s->direction == PA_STREAM_RECORD)
1279 pa_tagstruct_putu32(t, s->direct_on_input);
1282 if (s->context->version >= 14) {
1284 if (s->direction == PA_STREAM_PLAYBACK)
1285 pa_tagstruct_put_boolean(t, volume_set);
1287 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1290 if (s->context->version >= 15) {
/* Tells the server whether the client expressed any mute preference */
1292 if (s->direction == PA_STREAM_PLAYBACK)
1293 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1295 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1296 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1299 if (s->context->version >= 17) {
1301 if (s->direction == PA_STREAM_PLAYBACK)
1302 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1306 if (s->context->version >= 18) {
1308 if (s->direction == PA_STREAM_PLAYBACK)
1309 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
/* Protocol >= 21: playback streams send their requested formats */
1312 if (s->context->version >= 21) {
1314 if (s->direction == PA_STREAM_PLAYBACK) {
1315 pa_tagstruct_putu8(t, s->n_formats);
1316 for (i = 0; i < s->n_formats; i++)
1317 pa_tagstruct_put_format_info(t, s->req_formats[i]);
/* Send the request and register the reply handler for its tag */
1321 pa_pstream_send_tagstruct(s->context->pstream, t);
1322 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1324 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connect the stream for playback.
 *
 * After asserting that the stream still holds at least one reference,
 * this forwards to create_stream() with PA_STREAM_PLAYBACK as the
 * direction and passes dev, attr, flags, volume and sync_stream through
 * unchanged. The return value is whatever create_stream() returns. */
1330 int pa_stream_connect_playback(
1333 const pa_buffer_attr *attr,
1334 pa_stream_flags_t flags,
1335 const pa_cvolume *volume,
1336 pa_stream *sync_stream) {
1339 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1341 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connect the stream for recording.
 *
 * Forwards to create_stream() with PA_STREAM_RECORD as the direction.
 * No volume and no sync stream are supplied for record streams (both
 * arguments are passed as NULL). The return value is whatever
 * create_stream() returns. */
1344 int pa_stream_connect_record(
1347 const pa_buffer_attr *attr,
1348 pa_stream_flags_t flags) {
1351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1353 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Hand the caller a buffer it can fill and later pass to
 * pa_stream_write().
 *
 * On success *data points at the stream's write buffer and *nbytes is
 * set to the length of the underlying memblock. A buffer left over from
 * an earlier call (s->write_memblock) is reused instead of allocating a
 * new one.
 *
 * Validity checks (error code is the last macro argument):
 *   PA_ERR_FORKED   - a fork was detected
 *   PA_ERR_BADSTATE - stream not READY, or not a playback/upload stream
 *   PA_ERR_INVALID  - data is NULL, nbytes is NULL, or *nbytes is 0 */
1356 int pa_stream_begin_write(
1362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1364 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1365 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1366 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1367 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1368 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
/* (size_t) -1 is treated specially: the size computation below only
 * runs when the caller asked for an explicit number of bytes. */
1370 if (*nbytes != (size_t) -1) {
/* m comes from pa_mempool_block_size_max() on the context's pool, fs
 * from pa_frame_size() on the stream's sample spec.
 * NOTE(review): the statements that consume m and fs are not visible
 * in this listing - confirm how *nbytes is adjusted. */
1373 m = pa_mempool_block_size_max(s->context->mempool);
1374 fs = pa_frame_size(&s->sample_spec);
/* Allocate and acquire a fresh block only if none is pending. */
1381 if (!s->write_memblock) {
1382 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1383 s->write_data = pa_memblock_acquire(s->write_memblock);
1386 *data = s->write_data;
1387 *nbytes = pa_memblock_get_length(s->write_memblock);
/* Abandon a write buffer obtained from pa_stream_begin_write() without
 * sending it.
 *
 * Requires a pending buffer (s->write_memblock set), otherwise the
 * check fails with PA_ERR_BADSTATE. The block is released, its
 * reference dropped, and both write_memblock and write_data are
 * cleared so that the next pa_stream_begin_write() allocates anew. */
1392 int pa_stream_cancel_write(
1396 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1398 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1399 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1400 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1401 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1403 pa_assert(s->write_data);
1405 pa_memblock_release(s->write_memblock);
1406 pa_memblock_unref(s->write_memblock);
1407 s->write_memblock = NULL;
1408 s->write_data = NULL;
/* Send length bytes starting at data to the server on this stream's
 * channel, positioned according to offset and seek.
 *
 * Two paths:
 *  - A buffer from pa_stream_begin_write() is pending
 *    (s->write_memblock set): data/length must lie inside that buffer
 *    and free_cb must be NULL. The block is sent as a single chunk and
 *    the pending-buffer state is cleared.
 *  - No buffer is pending: the data is sent in a loop. When free_cb is
 *    given and the pstream does not use SHM, the caller's memory is
 *    wrapped in a user memblock; otherwise it is copied into pool
 *    memblocks of at most pa_mempool_block_size_max() bytes each.
 *    In the SHM case free_cb is invoked on data once the loop is done.
 *
 * Afterwards requested_bytes is reduced, and for playback streams the
 * pending write index correction and the cached timing info are
 * updated; a timing update is requested if the write index is unknown.
 *
 * Upload streams only accept seek == PA_SEEK_RELATIVE with offset 0. */
1413 int pa_stream_write(
1417 pa_free_cb_t free_cb,
1419 pa_seek_mode_t seek) {
1422 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1425 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1426 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1427 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1428 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1429 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
/* With a pending write buffer, [data, data+length) must fall entirely
 * within it. */
1430 PA_CHECK_VALIDITY(s->context,
1431 !s->write_memblock ||
1432 ((data >= s->write_data) &&
1433 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
/* A free callback makes no sense for memory we own ourselves. */
1435 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1437 if (s->write_memblock) {
1440 /* pa_stream_begin_write() was called before */
1442 pa_memblock_release(s->write_memblock);
1444 chunk.memblock = s->write_memblock;
/* data may point anywhere inside the buffer; index is its byte offset
 * from the start of the block. */
1445 chunk.index = (const char *) data - (const char *) s->write_data;
1446 chunk.length = length;
1448 s->write_memblock = NULL;
1449 s->write_data = NULL;
1451 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1452 pa_memblock_unref(chunk.memblock);
1455 pa_seek_mode_t t_seek = seek;
1456 int64_t t_offset = offset;
1457 size_t t_length = length;
1458 const void *t_data = data;
1460 /* pa_stream_begin_write() was not called before */
1462 while (t_length > 0) {
/* Without SHM the caller's memory can be referenced directly and is
 * sent in one piece. */
1467 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1468 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1469 chunk.length = t_length;
/* Otherwise copy, one pool-sized block per iteration. */
1473 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1474 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1476 d = pa_memblock_acquire(chunk.memblock);
1477 memcpy(d, t_data, chunk.length);
1478 pa_memblock_release(chunk.memblock);
1481 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* Follow-up chunks are sent with a relative seek. */
1484 t_seek = PA_SEEK_RELATIVE;
1486 t_data = (const uint8_t*) t_data + chunk.length;
1487 t_length -= chunk.length;
1489 pa_memblock_unref(chunk.memblock);
/* In the SHM case the data was copied above, so the caller's buffer
 * can be handed back to free_cb right away. */
1492 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1493 free_cb((void*) data);
1496 /* This is obviously wrong since we ignore the seeking index. But
1497 * that's OK, the server side applies the same error */
1498 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1500 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1502 if (s->direction == PA_STREAM_PLAYBACK) {
1504 /* Update latency request correction */
1505 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1507 if (seek == PA_SEEK_ABSOLUTE) {
1508 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1509 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1510 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1511 } else if (seek == PA_SEEK_RELATIVE) {
1512 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1513 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1515 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1518 /* Update the write index in the already available latency data */
1519 if (s->timing_info_valid) {
1521 if (seek == PA_SEEK_ABSOLUTE) {
1522 s->timing_info.write_index_corrupt = FALSE;
1523 s->timing_info.write_index = offset + (int64_t) length;
1524 } else if (seek == PA_SEEK_RELATIVE) {
1525 if (!s->timing_info.write_index_corrupt)
1526 s->timing_info.write_index += offset + (int64_t) length;
1528 s->timing_info.write_index_corrupt = TRUE;
/* No usable write index locally: ask the server for fresh timing. */
1531 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1532 request_auto_timing_update(s, TRUE);
/* Expose the next chunk of recorded data without removing it from the
 * record queue.
 *
 * If no chunk is currently held in s->peek_memchunk, one is peeked from
 * s->record_memblockq and its memblock acquired. On return *data points
 * at the chunk's payload (peek_data plus the chunk index) and *length
 * holds the chunk length. Only valid on READY record streams.
 *
 * NOTE(review): what happens when pa_memblockq_peek() fails (empty
 * queue) is not visible in this listing. */
1538 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1540 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1544 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1545 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1546 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1548 if (!s->peek_memchunk.memblock) {
1550 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1556 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1559 pa_assert(s->peek_data);
1560 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1561 *length = s->peek_memchunk.length;
/* Discard the chunk currently held in s->peek_memchunk.
 *
 * Requires a held chunk (PA_ERR_BADSTATE otherwise). The chunk's length
 * is dropped from the record queue, the locally simulated read index is
 * advanced by the same amount when it is trustworthy, and the memblock
 * is released, unreferenced and the chunk reset. */
1565 int pa_stream_drop(pa_stream *s) {
1567 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1569 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1570 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1571 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1572 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1574 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1576 /* Fix the simulated local read index */
1577 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1578 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1580 pa_assert(s->peek_data);
1581 pa_memblock_release(s->peek_memchunk.memblock);
1582 pa_memblock_unref(s->peek_memchunk.memblock);
1583 pa_memchunk_reset(&s->peek_memchunk);
/* Return how many bytes are currently requested from the client.
 *
 * The result is s->requested_bytes when that counter is positive and 0
 * otherwise. The validity checks pass (size_t) -1 as the value to hand
 * back on failure (fork detected, stream not READY, or record stream). */
1588 size_t pa_stream_writable_size(pa_stream *s) {
1590 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1592 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1593 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1594 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1596 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
/* Return the number of bytes queued in the stream's record queue
 * (pa_memblockq_get_length() of s->record_memblockq).
 *
 * The validity checks pass (size_t) -1 as the value to hand back on
 * failure (fork detected, stream not READY, or not a record stream). */
1599 size_t pa_stream_readable_size(pa_stream *s) {
1601 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1603 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1604 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1605 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1607 return pa_memblockq_get_length(s->record_memblockq);
/* Ask the server to drain a playback stream.
 *
 * Sends PA_COMMAND_DRAIN_PLAYBACK_STREAM for the stream's channel and
 * registers pa_stream_simple_ack_callback() for the reply; cb/userdata
 * are attached to the new operation. Timing updates are requested both
 * before and after the command is sent. Only valid on READY playback
 * streams. */
1610 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1616 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1618 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1619 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1620 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1622 /* Ask for a timing update before we drain to get the best
1623 * accuracy for the transport latency suitable for the
1624 * check_smoother_status() call in the started callback */
1625 request_auto_timing_update(s, TRUE);
1627 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1629 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1630 pa_tagstruct_putu32(t, s->channel);
1631 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler holds its own reference on the operation, dropped
 * through the registered free callback. */
1632 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1634 /* This might cause the read index to continue again, hence
1635 * let's request a timing update */
1636 request_auto_timing_update(s, TRUE);
/* Compute the current stream time in microseconds from the cached
 * timing info (no smoothing).
 *
 * Playback streams start from timing_info.read_index, record streams
 * from timing_info.write_index; a negative index is treated as 0 before
 * conversion with pa_bytes_to_usec(). While the stream is neither
 * corked nor suspended the value is adjusted by transport_usec (skipped
 * when ignore_transport is set), by sink_usec, and for record streams
 * additionally by source_usec.
 *
 * Preconditions (asserted): stream READY, not an upload stream, timing
 * info valid, and the index used for this direction not corrupt.
 *
 * NOTE(review): the branch taken when sink_usec >= usec is not visible
 * in this listing - confirm against the full file. */
1641 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1645 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646 pa_assert(s->state == PA_STREAM_READY);
1647 pa_assert(s->direction != PA_STREAM_UPLOAD);
1648 pa_assert(s->timing_info_valid);
1649 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1650 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1652 if (s->direction == PA_STREAM_PLAYBACK) {
1653 /* The last byte that was written into the output device
1654 * had this time value associated */
1655 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1657 if (!s->corked && !s->suspended) {
1659 if (!ignore_transport)
1660 /* Because the latency info took a little time to come
1661 * to us, we assume that the real output time is actually
1663 usec += s->timing_info.transport_usec;
1665 /* However, the output device usually maintains a buffer
1666 too, hence the real sample currently played is a little
1668 if (s->timing_info.sink_usec >= usec)
1671 usec -= s->timing_info.sink_usec;
1675 pa_assert(s->direction == PA_STREAM_RECORD);
1677 /* The last byte written into the server side queue had
1678 * this time value associated */
1679 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1681 if (!s->corked && !s->suspended) {
1683 if (!ignore_transport)
1684 /* Add transport latency */
1685 usec += s->timing_info.transport_usec;
1687 /* Add latency of data in device buffer */
1688 usec += s->timing_info.source_usec;
1690 /* If this is a monitor source, we need to correct the
1691 * time by the playback device buffer */
1692 if (s->timing_info.sink_usec >= usec)
1695 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query issued by
 * pa_stream_update_timing_info(); userdata is the pa_operation.
 *
 * Steps, in order:
 *  1. Mark the stream's timing info invalid and both indexes corrupt
 *     until a reply has been parsed successfully.
 *  2. Parse the reply into o->stream->timing_info (sink/source latency,
 *     playing flag, local/remote timestamps, write/read index, and for
 *     playback streams on protocol >= 13 the underrun/playing counters).
 *  3. Estimate the transport latency from the timestamps.
 *  4. Re-invalidate indexes for replies older than the stream's
 *     *_index_not_before tags, then replay the write index corrections
 *     recorded since the query (playback) or subtract locally queued
 *     data from the read index (record).
 *  5. Feed the smoother, clear auto_timing_update_requested, and invoke
 *     the latency update callback and the operation's own callback. */
1702 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1703 pa_operation *o = userdata;
1704 struct timeval local, remote, now;
1706 pa_bool_t playing = FALSE;
1707 uint64_t underrun_for = 0, playing_for = 0;
1711 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1713 if (!o->context || !o->stream)
1716 i = &o->stream->timing_info;
/* Assume the worst until the reply has been validated. */
1718 o->stream->timing_info_valid = FALSE;
1719 i->write_index_corrupt = TRUE;
1720 i->read_index_corrupt = TRUE;
1722 if (command != PA_COMMAND_REPLY) {
1723 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1728 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1729 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1730 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1731 pa_tagstruct_get_timeval(t, &local) < 0 ||
1732 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1733 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1734 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1736 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* The two extra counters are only read for playback streams when the
 * context's protocol version is at least 13. */
1740 if (o->context->version >= 13 &&
1741 o->stream->direction == PA_STREAM_PLAYBACK)
1742 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1743 pa_tagstruct_getu64(t, &playing_for) < 0) {
1745 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Trailing data in the reply is a protocol error. */
1750 if (!pa_tagstruct_eof(t)) {
1751 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1754 o->stream->timing_info_valid = TRUE;
1755 i->write_index_corrupt = FALSE;
1756 i->read_index_corrupt = FALSE;
1758 i->playing = (int) playing;
1759 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1761 pa_gettimeofday(&now);
1763 /* Calculate timestamps */
1764 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1765 /* local and remote seem to have synchronized clocks */
1767 if (o->stream->direction == PA_STREAM_PLAYBACK)
1768 i->transport_usec = pa_timeval_diff(&remote, &local);
1770 i->transport_usec = pa_timeval_diff(&now, &remote);
1772 i->synchronized_clocks = TRUE;
1773 i->timestamp = remote;
1775 /* clocks are not synchronized, let's estimate latency then */
1776 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1777 i->synchronized_clocks = FALSE;
1778 i->timestamp = local;
1779 pa_timeval_add(&i->timestamp, i->transport_usec);
1782 /* Invalidate read and write indexes if necessary */
1783 if (tag < o->stream->read_index_not_before)
1784 i->read_index_corrupt = TRUE;
1786 if (tag < o->stream->write_index_not_before)
1787 i->write_index_corrupt = TRUE;
1789 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1790 /* Write index correction */
1793 uint32_t ctag = tag;
1795 /* Go through the saved correction values and add up the
1796 * total correction.*/
/* The ring is walked starting at the slot after the current one and
 * visits every slot exactly once. */
1797 for (n = 0, j = o->stream->current_write_index_correction+1;
1798 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1799 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1801 /* Step over invalid data or out-of-date data */
1802 if (!o->stream->write_index_corrections[j].valid ||
1803 o->stream->write_index_corrections[j].tag < ctag)
1806 /* Make sure that everything is in order */
1807 ctag = o->stream->write_index_corrections[j].tag+1;
1809 /* Now fix the write index */
1810 if (o->stream->write_index_corrections[j].corrupt) {
1811 /* A corrupting seek was made */
1812 i->write_index_corrupt = TRUE;
1813 } else if (o->stream->write_index_corrections[j].absolute) {
1814 /* An absolute seek was made */
1815 i->write_index = o->stream->write_index_corrections[j].value;
1816 i->write_index_corrupt = FALSE;
1817 } else if (!i->write_index_corrupt) {
1818 /* A relative seek was made */
1819 i->write_index += o->stream->write_index_corrections[j].value;
1823 /* Clear old correction entries */
1824 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1825 if (!o->stream->write_index_corrections[n].valid)
1828 if (o->stream->write_index_corrections[n].tag <= tag)
1829 o->stream->write_index_corrections[n].valid = FALSE;
1833 if (o->stream->direction == PA_STREAM_RECORD) {
1834 /* Read index correction */
1836 if (!i->read_index_corrupt)
1837 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1840 /* Update smoother if we're not corked */
1841 if (o->stream->smoother && !o->stream->corked) {
/* u and x both start as "now minus transport latency"; x may be pushed
 * further below while u is the timestamp fed to pa_smoother_put(). */
1844 u = x = pa_rtclock_now() - i->transport_usec;
1846 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1849 /* If we weren't playing then it will take some time
1850 * until the audio will actually come out through the
1851 * speakers. Since we follow that timing here, we need
1852 * to try to fix this up */
1854 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1856 if (su < i->sink_usec)
1857 x += i->sink_usec - su;
1861 pa_smoother_pause(o->stream->smoother, x);
1863 /* Update the smoother */
1864 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1865 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1866 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1869 pa_smoother_resume(o->stream->smoother, x, TRUE);
1873 o->stream->auto_timing_update_requested = FALSE;
1875 if (o->stream->latency_update_callback)
1876 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* The user callback is only invoked while the stream is still READY;
 * its success argument is the timing_info_valid flag. */
1878 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1879 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1880 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1885 pa_operation_done(o);
1886 pa_operation_unref(o);
/* Request fresh timing information from the server.
 *
 * Sends PA_COMMAND_GET_PLAYBACK_LATENCY or PA_COMMAND_GET_RECORD_LATENCY
 * (by stream direction) together with the current local time, and
 * registers stream_get_timing_info_callback() for the reply; cb and
 * userdata are attached to the new operation.
 *
 * For playback streams a slot in the write_index_corrections ring is
 * reserved for this query so that writes performed while the query is
 * in flight can be applied to the reply. If the next slot is still in
 * use the check fails with PA_ERR_INTERNAL.
 *
 * Only valid on READY streams that are not upload streams. */
1889 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1897 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1899 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1900 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1901 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1903 if (s->direction == PA_STREAM_PLAYBACK) {
1904 /* Find a place to store the write_index correction data for this entry */
1905 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1907 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1908 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1910 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1912 t = pa_tagstruct_command(
1914 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1916 pa_tagstruct_putu32(t, s->channel);
1917 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1919 pa_pstream_send_tagstruct(s->context->pstream, t);
1920 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1922 if (s->direction == PA_STREAM_PLAYBACK) {
1923 /* Fill in initial correction data */
/* The slot is keyed by the request tag so the reply handler can match
 * corrections to this particular query. */
1925 s->current_write_index_correction = cidx;
1927 s->write_index_corrections[cidx].valid = TRUE;
1928 s->write_index_corrections[cidx].absolute = FALSE;
1929 s->write_index_corrections[cidx].corrupt = FALSE;
1930 s->write_index_corrections[cidx].tag = tag;
1931 s->write_index_corrections[cidx].value = 0;
/* Reply handler for the delete-stream command sent by
 * pa_stream_disconnect(); userdata is the stream.
 *
 * A non-reply command is routed through pa_context_handle_error() and
 * the stream is put into PA_STREAM_FAILED. A reply with trailing data
 * fails the context with PA_ERR_PROTOCOL. Otherwise the stream is moved
 * to PA_STREAM_TERMINATED. */
1937 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1938 pa_stream *s = userdata;
1942 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1946 if (command != PA_COMMAND_REPLY) {
1947 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1950 pa_stream_set_state(s, PA_STREAM_FAILED);
1952 } else if (!pa_tagstruct_eof(t)) {
1953 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1957 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Ask the server to delete this stream.
 *
 * The delete command is chosen by direction: playback, record, or (for
 * anything else) upload. The stream's channel is sent as the only
 * argument and pa_stream_disconnect_callback() handles the reply.
 *
 * Validity checks: no fork detected (PA_ERR_FORKED), the stream has a
 * valid channel and the context is READY (PA_ERR_BADSTATE). */
1963 int pa_stream_disconnect(pa_stream *s) {
1968 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1970 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1971 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1972 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1976 t = pa_tagstruct_command(
1978 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1979 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1981 pa_tagstruct_putu32(t, s->channel);
1982 pa_pstream_send_tagstruct(s->context->pstream, t);
1983 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
/* Install cb/userdata as the stream's read callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
1989 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1991 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1993 if (pa_detect_fork())
1996 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1999 s->read_callback = cb;
2000 s->read_userdata = userdata;
/* Install cb/userdata as the stream's write callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2003 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2007 if (pa_detect_fork())
2010 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2013 s->write_callback = cb;
2014 s->write_userdata = userdata;
/* Install cb/userdata as the stream's state change callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2017 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2019 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2021 if (pa_detect_fork())
2024 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2027 s->state_callback = cb;
2028 s->state_userdata = userdata;
/* Install cb/userdata as the stream's overflow callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2031 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2033 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2035 if (pa_detect_fork())
2038 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2041 s->overflow_callback = cb;
2042 s->overflow_userdata = userdata;
/* Install cb/userdata as the stream's underflow callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2045 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2047 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049 if (pa_detect_fork())
2052 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2055 s->underflow_callback = cb;
2056 s->underflow_userdata = userdata;
/* Install cb/userdata as the stream's latency update callback, which
 * stream_get_timing_info_callback() invokes after processing a timing
 * reply.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2059 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2061 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2063 if (pa_detect_fork())
2066 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2069 s->latency_update_callback = cb;
2070 s->latency_update_userdata = userdata;
/* Install cb/userdata as the stream's "moved" callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2073 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2075 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2077 if (pa_detect_fork())
2080 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2083 s->moved_callback = cb;
2084 s->moved_userdata = userdata;
/* Install cb/userdata as the stream's "suspended" callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2087 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2089 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2091 if (pa_detect_fork())
2094 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2097 s->suspended_callback = cb;
2098 s->suspended_userdata = userdata;
/* Install cb/userdata as the stream's "started" callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2101 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2103 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2105 if (pa_detect_fork())
2108 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2111 s->started_callback = cb;
2112 s->started_userdata = userdata;
/* Install cb/userdata as the stream's event callback. Unlike the other
 * setters in this group, cb has type pa_stream_event_cb_t.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2115 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2117 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2119 if (pa_detect_fork())
2122 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2125 s->event_callback = cb;
2126 s->event_userdata = userdata;
/* Install cb/userdata as the stream's buffer attribute callback.
 *
 * The assignment is preceded by checks for a detected fork and for a
 * TERMINATED or FAILED stream; presumably the function bails out in
 * those cases (the guarded statements are not visible in this listing). */
2129 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2131 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2133 if (pa_detect_fork())
2136 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2139 s->buffer_attr_callback = cb;
2140 s->buffer_attr_userdata = userdata;
/* Generic reply handler for commands whose reply carries no payload;
 * userdata is the pa_operation.
 *
 * A non-reply command is routed through pa_context_handle_error(); a
 * reply with trailing data fails the context with PA_ERR_PROTOCOL.
 * The operation's callback, cast to pa_stream_success_cb_t, is called
 * with the stream, a success flag and the operation's userdata, after
 * which the operation is marked done and the handler's reference on it
 * is dropped.
 *
 * NOTE(review): the declaration and updates of the success flag and the
 * condition guarding the callback are not visible in this listing. */
2143 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2144 pa_operation *o = userdata;
2149 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2154 if (command != PA_COMMAND_REPLY) {
2155 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2159 } else if (!pa_tagstruct_eof(t)) {
2160 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2165 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2166 cb(o->stream, success, o->userdata);
2170 pa_operation_done(o);
2171 pa_operation_unref(o);
/* Cork (b non-zero) or uncork (b zero) the stream.
 *
 * Sends PA_COMMAND_CORK_PLAYBACK_STREAM or PA_COMMAND_CORK_RECORD_STREAM
 * (by stream direction) with the channel and the normalised boolean,
 * and registers pa_stream_simple_ack_callback() for the reply. Timing
 * updates are requested before and after, and the smoother status is
 * re-checked in between. Only valid on READY streams that are not
 * upload streams. */
2174 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2182 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2183 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2184 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2186 /* Ask for a timing update before we cork/uncork to get the best
2187 * accuracy for the transport latency suitable for the
2188 * check_smoother_status() call in the started callback */
2189 request_auto_timing_update(s, TRUE);
2193 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2195 t = pa_tagstruct_command(
2197 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2199 pa_tagstruct_putu32(t, s->channel);
/* !! collapses any non-zero b to 1 before it goes on the wire. */
2200 pa_tagstruct_put_boolean(t, !!b);
2201 pa_pstream_send_tagstruct(s->context->pstream, t);
2202 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2204 check_smoother_status(s, FALSE, FALSE, FALSE);
2206 /* This might cause the indexes to hang/start again, hence let's
2207 * request a timing update, after the cork/uncork, too */
2208 request_auto_timing_update(s, TRUE);
/* Shared helper for commands whose only argument is the stream channel.
 *
 * Creates an operation carrying cb/userdata, sends the given command
 * with s->channel, and registers pa_stream_simple_ack_callback() for
 * the reply. Used in this file by pa_stream_flush(), pa_stream_prebuf()
 * and pa_stream_trigger(). The validity checks hand back NULL when a
 * fork was detected or the stream is not READY. */
2213 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2221 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2222 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2224 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2226 t = pa_tagstruct_command(s->context, command, &tag);
2227 pa_tagstruct_putu32(t, s->channel);
2228 pa_pstream_send_tagstruct(s->context->pstream, t);
2229 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Flush the stream's buffer on the server.
 *
 * Sends PA_COMMAND_FLUSH_PLAYBACK_STREAM or
 * PA_COMMAND_FLUSH_RECORD_STREAM via stream_send_simple_command(). The
 * locally tracked indexes can no longer be trusted afterwards: for
 * playback the pending write index correction is marked corrupt and the
 * write index invalidated, for record the read index is invalidated.
 * Only valid on READY streams that are not upload streams. */
2234 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2238 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2240 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2241 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2242 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2244 /* Ask for a timing update *before* the flush, so that the
2245 * transport usec is as up to date as possible when we get the
2246 * underflow message and update the smoother status*/
2247 request_auto_timing_update(s, TRUE);
2249 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2252 if (s->direction == PA_STREAM_PLAYBACK) {
2254 if (s->write_index_corrections[s->current_write_index_correction].valid)
2255 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
/* The smoother status is only re-checked when prebuffering is enabled
 * for this stream. */
2257 if (s->buffer_attr.prebuf > 0)
2258 check_smoother_status(s, FALSE, FALSE, TRUE);
2260 /* This will change the write index, but leave the
2261 * read index untouched. */
2262 invalidate_indexes(s, FALSE, TRUE);
2265 /* For record streams this has no influence on the write
2266 * index, but the read index might jump. */
2267 invalidate_indexes(s, TRUE, FALSE);
2269 /* Note that we do not update requested_bytes here. This is
2270 * because we cannot really know how much data actually was dropped
2271 * from the write index due to this. This 'error' will be applied
2272 * by both client and server and hence we should be fine. */
/* Send PA_COMMAND_PREBUF_PLAYBACK_STREAM for this stream via
 * stream_send_simple_command().
 *
 * Only valid on READY playback streams whose buffer_attr.prebuf is
 * greater than zero (PA_ERR_BADSTATE otherwise). Timing updates are
 * requested before and after the command. */
2277 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2283 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2284 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2285 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2286 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2288 /* Ask for a timing update before we send the prebuf command to
2289 * get the best accuracy for the transport latency suitable for the
2290 * check_smoother_status() call in the started callback */
2291 request_auto_timing_update(s, TRUE);
2293 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2296 /* This might cause the read index to hang again, hence
2297 * let's request a timing update */
2298 request_auto_timing_update(s, TRUE);
/* Send PA_COMMAND_TRIGGER_PLAYBACK_STREAM for this stream via
 * stream_send_simple_command().
 *
 * Only valid on READY playback streams whose buffer_attr.prebuf is
 * greater than zero (PA_ERR_BADSTATE otherwise). Timing updates are
 * requested before and after the command. */
2303 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2307 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2310 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2311 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2312 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2314 /* Ask for a timing update before we send the trigger command to
2315 * get the best accuracy for the transport latency suitable for the
2316 * check_smoother_status() call in the started callback */
2317 request_auto_timing_update(s, TRUE);
2319 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2322 /* This might cause the read index to start moving again, hence
2323 * let's request a timing update */
2324 request_auto_timing_update(s, TRUE);
/* Rename the stream.
 *
 * When the context's protocol version is at least 13 the name is set
 * as the PA_PROP_MEDIA_NAME property through
 * pa_stream_proplist_update() with PA_UPDATE_REPLACE, using a temporary
 * proplist that is freed again right after. The remaining code sends
 * the dedicated PA_COMMAND_SET_RECORD_STREAM_NAME or
 * PA_COMMAND_SET_PLAYBACK_STREAM_NAME command with channel and name.
 *
 * Only valid on READY streams that are not upload streams. */
2329 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2340 if (s->context->version >= 13) {
2341 pa_proplist *p = pa_proplist_new();
2343 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2344 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2345 pa_proplist_free(p);
2350 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2351 t = pa_tagstruct_command(
2353 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2355 pa_tagstruct_putu32(t, s->channel);
2356 pa_tagstruct_puts(t, name);
2357 pa_pstream_send_tagstruct(s->context->pstream, t);
2358 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Return the current stream time in microseconds.
 *
 * The value comes either from the smoother (pa_smoother_get() at the
 * current rtclock time) or from calc_time() including transport
 * latency. Unless the stream was created with PA_STREAM_NOT_MONOTONIC,
 * a value smaller than the previously reported one is replaced by
 * s->previous_time.
 *
 * Validity checks: PA_ERR_FORKED, PA_ERR_BADSTATE (not READY, or upload
 * stream), PA_ERR_NODATA (no valid timing info, or the index relevant
 * for this direction is corrupt: read index for playback, write index
 * for record).
 *
 * NOTE(review): the condition selecting smoother vs. calc_time() and
 * the store into r_usec are not visible in this listing. */
2364 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2368 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2370 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2371 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2372 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2373 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2374 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2375 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2378 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2380 usec = calc_time(s, FALSE);
2382 /* Make sure the time runs monotonically */
2383 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2384 if (usec < s->previous_time)
2385 usec = s->previous_time;
2387 s->previous_time = usec;
/* File-local helper taking two time values and an optional "negative"
 * out-flag; pa_stream_get_latency() assigns its result to *r_usec.
 *
 * NOTE(review): only the signature, the refcount assertion and one
 * condition are visible in this listing; the arithmetic and the return
 * statement are not shown, so nothing further is documented here. */
2396 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2398 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* This branch is taken only when the caller passed a negative pointer
 * and the stream is a record stream. */
2406 if (negative && s->direction == PA_STREAM_RECORD) {
/* Compute the stream latency in microseconds into *r_usec.
 *
 * The latency is derived from the stream time (pa_stream_get_time()) and
 * the position of the stream's own index converted to time: the write
 * index for playback, the read index otherwise. negative is forwarded to
 * time_counter_diff().
 *
 * Preconditions mirror pa_stream_get_time(), except that here the index
 * that must not be corrupt is the write index for playback and the read
 * index for record (PA_ERR_NODATA otherwise).
 *
 * NOTE(review): local declarations, the else keywords and the return
 * statements are not visible in this listing. */
2414 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2423 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2425 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2426 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2427 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2428 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
/* A negative result from pa_stream_get_time() is treated as failure. */
2430 if ((r = pa_stream_get_time(s, &t)) < 0)
/* Pick the index this side of the stream advances. */
2433 if (s->direction == PA_STREAM_PLAYBACK)
2434 cindex = s->timing_info.write_index;
2436 cindex = s->timing_info.read_index;
/* Convert the byte index into time using the stream's sample spec. */
2441 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
/* The argument order is swapped between playback (index time, stream
 * time) and the other direction (stream time, index time). */
2443 if (s->direction == PA_STREAM_PLAYBACK)
2444 *r_usec = time_counter_diff(s, c, t, negative);
2446 *r_usec = time_counter_diff(s, t, c, negative);
/* Return a pointer to the timing info stored inside the stream object.
 *
 * The guards require that the process has not forked (PA_ERR_FORKED),
 * that the stream is PA_STREAM_READY and not an upload stream
 * (PA_ERR_BADSTATE), and that timing info has been marked valid
 * (PA_ERR_NODATA). The returned pointer refers to s->timing_info itself,
 * not to a copy. */
2451 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2453 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2455 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2456 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2458 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2460 return &s->timing_info;
/* Return a pointer to the sample spec stored inside the stream object.
 *
 * The only guard is the fork check (PA_ERR_FORKED); no particular stream
 * state is required. The pointer refers to s->sample_spec itself. */
2463 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2465 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469 return &s->sample_spec;
/* Return a pointer to the channel map stored inside the stream object.
 *
 * The only guard is the fork check (PA_ERR_FORKED); no particular stream
 * state is required. The pointer refers to s->channel_map itself. */
2472 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2474 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2478 return &s->channel_map;
/* Accessor for the stream's format info.
 *
 * Requires the stream to be PA_STREAM_READY (PA_ERR_BADSTATE otherwise)
 * and the process not to have forked (PA_ERR_FORKED).
 *
 * NOTE(review): unlike the sibling accessors, the state check runs before
 * the fork check here, so a forked process with a non-ready stream gets
 * PA_ERR_BADSTATE rather than PA_ERR_FORKED -- confirm this is intended.
 * NOTE(review): the return statement is not visible in this listing. */
2481 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2483 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2485 /* We don't have the format till routing is done */
2486 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
/* Return a pointer to the buffer attributes stored inside the stream.
 *
 * Guards: the process must not have forked (PA_ERR_FORKED), the stream
 * must be PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE), and
 * the context's protocol version must be at least 9
 * (PA_ERR_NOTSUPPORTED). The pointer refers to s->buffer_attr itself,
 * not to a copy. */
2491 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2493 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Fork guard added so this accessor rejects use after fork() in the same
 * way as the other stream entry points in this file. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2496 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2499 return &s->buffer_attr;
/* Reply handler for the "set buffer attr" command.
 *
 * userdata is the pa_operation that issued the request. On a regular
 * reply the server-chosen buffer metrics are read from t straight into
 * o->stream->buffer_attr, and (protocol version >= 13) the configured
 * latency is stored in the stream's timing info. Any malformed reply
 * fails the whole context with PA_ERR_PROTOCOL. Finally the user callback
 * is invoked and the operation is completed and unreferenced.
 *
 * NOTE(review): the declaration/assignment of `success`, the goto/return
 * statements and the closing braces are not visible in this listing. */
2502 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2503 pa_operation *o = userdata;
2508 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than a plain reply is routed through the context's
 * generic error handling. */
2513 if (command != PA_COMMAND_REPLY) {
2514 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* Playback replies carry maxlength, tlength, prebuf and minreq ... */
2519 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2520 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2521 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2522 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2523 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2524 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* ... record replies carry maxlength and fragsize only. */
2527 } else if (o->stream->direction == PA_STREAM_RECORD) {
2528 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2529 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2530 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* From protocol version 13 on the reply also carries a usec value, which
 * is stored as the configured source (record) or sink latency. */
2535 if (o->stream->context->version >= 13) {
2538 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2539 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2543 if (o->stream->direction == PA_STREAM_RECORD)
2544 o->stream->timing_info.configured_source_usec = usec;
2546 o->stream->timing_info.configured_sink_usec = usec;
/* Trailing data after the expected fields is a protocol violation. */
2549 if (!pa_tagstruct_eof(t)) {
2550 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Hand the outcome to the user's success callback (presumably only when
 * one was registered -- the guarding condition is not shown). */
2556 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2557 cb(o->stream, success, o->userdata);
2561 pa_operation_done(o);
2562 pa_operation_unref(o);
/* Ask the server to change the buffer metrics of a connected stream.
 *
 * s        - stream to reconfigure; must be PA_STREAM_READY and not an
 *            upload stream, the process must not have forked, and the
 *            context's protocol version must be at least 12
 *            (PA_ERR_NOTSUPPORTED otherwise).
 * attr     - requested buffer attributes.
 * cb,
 * userdata - completion callback and its argument; the reply is handled
 *            by stream_set_buffer_attr_callback().
 *
 * A timing update is requested both before and after the command is
 * queued.
 *
 * NOTE(review): several short lines (remaining local declarations, the
 * statements that initialise `copy` from *attr and re-point attr, parts
 * of the pa_tagstruct_command()/pa_tagstruct_put() calls, the else, the
 * return) are not visible in this listing -- confirm against the full
 * file. */
2566 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2570 pa_buffer_attr copy;
2573 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2576 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2577 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2578 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2579 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2581 /* Ask for a timing update before we cork/uncork to get the best
2582 * accuracy for the transport latency suitable for the
2583 * check_smoother_status() call in the started callback */
2584 request_auto_timing_update(s, TRUE);
2586 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction. */
2588 t = pa_tagstruct_command(
2590 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2592 pa_tagstruct_putu32(t, s->channel);
/* Patch the local copy, not the caller's const attr. The argument had
 * been mangled into the single character U+00A9 (an HTML-entity decoding
 * of "&copy"); the address of the local `copy` is what is meant. */
2595 patch_buffer_attr(s, &copy, NULL);
2598 pa_tagstruct_putu32(t, attr->maxlength);
/* Playback streams send tlength/prebuf/minreq; otherwise fragsize. */
2600 if (s->direction == PA_STREAM_PLAYBACK)
2603 PA_TAG_U32, attr->tlength,
2604 PA_TAG_U32, attr->prebuf,
2605 PA_TAG_U32, attr->minreq,
2608 pa_tagstruct_putu32(t, attr->fragsize);
/* Newer protocol versions additionally carry the stream's
 * adjust-latency (>= 13) and early-requests (>= 14) flags. */
2610 if (s->context->version >= 13)
2611 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2613 if (s->context->version >= 14)
2614 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2616 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation, dropped via
 * pa_operation_unref when the registration is freed. */
2617 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2619 /* This might cause changes in the read/write indexes, hence let's
2620 * request a timing update */
2621 request_auto_timing_update(s, TRUE);
/* Return the index of the device the stream is attached to.
 *
 * Every guard passes PA_INVALID_INDEX as the value to hand back on
 * failure. The guards require: no fork (PA_ERR_FORKED), stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE), protocol
 * version >= 12 (PA_ERR_NOTSUPPORTED), and a device index that has
 * actually been set (PA_ERR_BADSTATE). */
2626 uint32_t pa_stream_get_device_index(pa_stream *s) {
2628 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2630 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2631 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2632 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2633 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2634 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2636 return s->device_index;
/* Return the name of the device the stream is attached to.
 *
 * The guards require: no fork (PA_ERR_FORKED), stream is PA_STREAM_READY
 * and not an upload stream (PA_ERR_BADSTATE), protocol version >= 12
 * (PA_ERR_NOTSUPPORTED), and a non-NULL stored device name
 * (PA_ERR_BADSTATE). The returned string is the stream's own
 * s->device_name pointer, not a copy. */
2639 const char *pa_stream_get_device_name(pa_stream *s) {
2641 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2643 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2644 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2645 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2646 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2647 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2649 return s->device_name;
/* Report the stream's stored "suspended" flag.
 *
 * The guards require: no fork (PA_ERR_FORKED), stream is PA_STREAM_READY
 * and not an upload stream (PA_ERR_BADSTATE), and protocol version >= 12
 * (PA_ERR_NOTSUPPORTED). On success the value of s->suspended is
 * returned. */
2652 int pa_stream_is_suspended(pa_stream *s) {
2654 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2656 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2657 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2658 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2659 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2661 return s->suspended;
/* Query whether the stream is corked.
 *
 * The guards require: no fork (PA_ERR_FORKED) and a stream that is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE). Unlike
 * pa_stream_is_suspended() there is no protocol version requirement.
 *
 * NOTE(review): the return statement is not visible in this listing. */
2664 int pa_stream_is_corked(pa_stream *s) {
2666 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2668 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2669 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2670 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* Reply handler for the "update sample rate" command.
 *
 * userdata is the pa_operation that issued the request; the requested
 * rate travels in o->private. Once the reply has been accepted the rate
 * is written into the stream's sample spec, the user callback is invoked,
 * and the operation is completed and unreferenced.
 *
 * NOTE(review): the declaration/assignment of `success`, the goto/return
 * statements and the closing braces are not visible in this listing. */
2675 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2676 pa_operation *o = userdata;
2681 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than a plain reply is routed through the context's
 * generic error handling. */
2686 if (command != PA_COMMAND_REPLY) {
2687 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* The reply is expected to carry no payload. */
2693 if (!pa_tagstruct_eof(t)) {
2694 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Adopt the rate that was stashed in the operation when the request was
 * sent; the resulting spec must still be valid. */
2699 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2700 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
/* Hand the outcome to the user's success callback (presumably only when
 * one was registered -- the guarding condition is not shown). */
2703 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2704 cb(o->stream, success, o->userdata);
2708 pa_operation_done(o);
2709 pa_operation_unref(o);
/* Ask the server to change the sample rate of a connected stream.
 *
 * s        - stream; must be PA_STREAM_READY, not an upload stream, and
 *            created with PA_STREAM_VARIABLE_RATE (PA_ERR_BADSTATE
 *            otherwise).
 * rate     - new rate; must satisfy 0 < rate <= PA_RATE_MAX
 *            (PA_ERR_INVALID otherwise).
 * cb,
 * userdata - completion callback and its argument.
 *
 * Also requires no fork (PA_ERR_FORKED) and protocol version >= 12
 * (PA_ERR_NOTSUPPORTED). The stream's own sample spec is not touched
 * here; stream_update_sample_rate_callback() applies the rate when the
 * reply arrives.
 *
 * NOTE(review): local declarations, parts of the pa_tagstruct_command()
 * call and the return statement are not visible in this listing. */
2713 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2719 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2721 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2722 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2723 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2724 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2725 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2726 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2728 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Stash the requested rate in the operation so the reply handler can
 * read it back. */
2729 o->private = PA_UINT_TO_PTR(rate);
/* The command code depends on the stream direction. */
2731 t = pa_tagstruct_command(
2733 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2735 pa_tagstruct_putu32(t, s->channel);
2736 pa_tagstruct_putu32(t, rate);
2738 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation, dropped via
 * pa_operation_unref when the registration is freed. */
2739 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Send a property-list update for the stream to the server.
 *
 * s        - stream; must be PA_STREAM_READY and not an upload stream
 *            (PA_ERR_BADSTATE otherwise).
 * mode     - one of PA_UPDATE_SET, PA_UPDATE_MERGE or PA_UPDATE_REPLACE;
 *            anything else is rejected with PA_ERR_INVALID.
 * p        - properties to send; serialised into the request.
 * cb,
 * userdata - completion callback and its argument.
 *
 * Also requires no fork (PA_ERR_FORKED) and protocol version >= 13
 * (PA_ERR_NOTSUPPORTED).
 *
 * NOTE(review): local declarations, parts of the pa_tagstruct_command()
 * call and the return statement are not visible in this listing. */
2744 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2750 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2752 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2753 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2754 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2755 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2756 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2758 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction; the payload is the
 * channel, the update mode and the serialised property list. */
2760 t = pa_tagstruct_command(
2762 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2764 pa_tagstruct_putu32(t, s->channel);
2765 pa_tagstruct_putu32(t, (uint32_t) mode);
2766 pa_tagstruct_put_proplist(t, p);
2768 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation, dropped via
 * pa_operation_unref when the registration is freed. */
2769 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2771 /* Please note that we don't update s->proplist here, because we
2772 * don't export that field */
/* Ask the server to remove the given property keys from the stream.
 *
 * s        - stream; must be PA_STREAM_READY and not an upload stream
 *            (PA_ERR_BADSTATE otherwise).
 * keys     - NULL-terminated array of key strings; must be non-NULL and
 *            contain at least one key (PA_ERR_INVALID otherwise).
 * cb,
 * userdata - completion callback and its argument.
 *
 * Also requires no fork (PA_ERR_FORKED) and protocol version >= 13
 * (PA_ERR_NOTSUPPORTED).
 *
 * NOTE(review): remaining local declarations, parts of the
 * pa_tagstruct_command() call and the return statement are not visible
 * in this listing. */
2777 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2781 const char * const*k;
2784 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2786 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2787 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2788 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2789 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2790 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2792 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction. */
2794 t = pa_tagstruct_command(
2796 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2798 pa_tagstruct_putu32(t, s->channel);
/* Serialise every key up to the array's NULL terminator ... */
2800 for (k = keys; *k; k++)
2801 pa_tagstruct_puts(t, *k);
/* ... then write a NULL string to mark the end of the key list. */
2803 pa_tagstruct_puts(t, NULL);
2805 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation, dropped via
 * pa_operation_unref when the registration is freed. */
2806 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2808 /* Please note that we don't update s->proplist here, because we
2809 * don't export that field */
/* Record which sink input this stream should monitor.
 *
 * This only stores sink_input_idx in s->direct_on_input; nothing is sent
 * to the server here. It is accepted only while the stream is still
 * PA_STREAM_UNCONNECTED (PA_ERR_BADSTATE otherwise). sink_input_idx must
 * not be PA_INVALID_INDEX (PA_ERR_INVALID); also requires no fork
 * (PA_ERR_FORKED) and protocol version >= 13 (PA_ERR_NOTSUPPORTED).
 *
 * NOTE(review): the return statement is not visible in this listing. */
2814 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2816 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2818 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2819 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2820 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2821 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2823 s->direct_on_input = sink_input_idx;
/* Return the sink input index stored in s->direct_on_input.
 *
 * Every guard passes PA_INVALID_INDEX as the value to hand back on
 * failure. The guards require: no fork (PA_ERR_FORKED), a monitor index
 * that has actually been set (PA_ERR_BADSTATE), and protocol version
 * >= 13 (PA_ERR_NOTSUPPORTED). No particular stream state is required. */
2828 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2832 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2833 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2834 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2836 return s->direct_on_input;