2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
83 static pa_stream *pa_stream_new_with_proplist_internal(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 s = pa_xnew(pa_stream, 1);
106 s->mainloop = c->mainloop;
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
113 s->sample_spec = *ss;
115 s->sample_spec.format = PA_SAMPLE_INVALID;
118 s->channel_map = *map;
120 pa_channel_map_init(&s->channel_map);
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 /* We'll get the final negotiated format after connecting */
132 s->direct_on_input = PA_INVALID_INDEX;
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146 /* We initialize der target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
150 s->buffer_attr.maxlength = (uint32_t) -1;
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
172 s->write_memblock = NULL;
173 s->write_data = NULL;
175 pa_memchunk_reset(&s->peek_memchunk);
177 s->record_memblockq = NULL;
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
182 s->previous_time = 0;
184 s->read_index_not_before = 0;
185 s->write_index_not_before = 0;
186 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187 s->write_index_corrections[i].valid = 0;
188 s->current_write_index_correction = 0;
190 s->auto_timing_update_event = NULL;
191 s->auto_timing_update_requested = FALSE;
192 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
198 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199 PA_LLIST_PREPEND(pa_stream, c->streams, s);
205 pa_stream *pa_stream_new_with_proplist(
208 const pa_sample_spec *ss,
209 const pa_channel_map *map,
214 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
215 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
226 pa_stream *pa_stream_new_extended(
229 pa_format_info * const *formats,
230 unsigned int n_formats,
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
238 static void stream_unlink(pa_stream *s) {
245 /* Detach from context */
247 /* Unref all operatio object that point to us */
248 for (o = s->context->operations; o; o = n) {
252 pa_operation_cancel(o);
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel_valid = FALSE;
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
278 static void stream_free(pa_stream *s) {
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
287 pa_memblock_unref(s->write_data);
290 if (s->peek_memchunk.memblock) {
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
300 pa_proplist_free(s->proplist);
303 pa_smoother_free(s->smoother);
305 for (i = 0; i < s->n_formats; i++)
306 pa_format_info_free(s->req_formats[i]);
309 pa_format_info_free(s->format);
311 pa_xfree(s->device_name);
315 void pa_stream_unref(pa_stream *s) {
317 pa_assert(PA_REFCNT_VALUE(s) >= 1);
319 if (PA_REFCNT_DEC(s) <= 0)
323 pa_stream* pa_stream_ref(pa_stream *s) {
325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
338 pa_context* pa_stream_get_context(pa_stream *s) {
340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345 uint32_t pa_stream_get_index(pa_stream *s) {
347 pa_assert(PA_REFCNT_VALUE(s) >= 1);
349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
352 return s->stream_index;
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
366 if (s->state_callback)
367 s->state_callback(s, s->state_userdata);
369 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
379 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382 if (s->state == PA_STREAM_READY &&
383 (force || !s->auto_timing_update_requested)) {
386 /* pa_log("Automatically requesting new timing data"); */
388 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389 pa_operation_unref(o);
390 s->auto_timing_update_requested = TRUE;
394 if (s->auto_timing_update_event) {
395 if (s->suspended && !force) {
396 pa_assert(s->mainloop);
397 s->mainloop->time_free(s->auto_timing_update_event);
398 s->auto_timing_update_event = NULL;
401 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
403 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
405 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411 pa_context *c = userdata;
416 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419 pa_assert(PA_REFCNT_VALUE(c) >= 1);
423 if (pa_tagstruct_getu32(t, &channel) < 0 ||
424 !pa_tagstruct_eof(t)) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
429 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432 if (s->state != PA_STREAM_READY)
435 pa_context_set_error(c, PA_ERR_KILLED);
436 pa_stream_set_state(s, PA_STREAM_FAILED);
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
446 pa_assert(!force_start || !force_stop);
451 x = pa_rtclock_now();
453 if (s->timing_info_valid) {
455 x -= s->timing_info.transport_usec;
457 x += s->timing_info.transport_usec;
460 if (s->suspended || s->corked || force_stop)
461 pa_smoother_pause(s->smoother, x);
462 else if (force_start || s->buffer_attr.prebuf == 0) {
464 if (!s->timing_info_valid &&
468 s->context->version >= 13) {
470 /* If the server supports STARTED events we take them as
471 * indications when audio really starts/stops playing, if
472 * we don't have any timing info yet -- instead of trying
473 * to be smart and guessing the server time. Otherwise the
474 * unknown transport delay add too much noise to our time
480 pa_smoother_resume(s->smoother, x, TRUE);
483 /* Please note that we have no idea if playback actually started
484 * if prebuf is non-zero! */
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
497 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
500 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503 pa_assert(PA_REFCNT_VALUE(c) >= 1);
507 if (c->version < 12) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
512 if (pa_tagstruct_getu32(t, &channel) < 0 ||
513 pa_tagstruct_getu32(t, &di) < 0 ||
514 pa_tagstruct_gets(t, &dn) < 0 ||
515 pa_tagstruct_get_boolean(t, &suspended) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
520 if (c->version >= 13) {
522 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525 pa_tagstruct_get_usec(t, &usec) < 0) {
526 pa_context_fail(c, PA_ERR_PROTOCOL);
530 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531 pa_tagstruct_getu32(t, &tlength) < 0 ||
532 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533 pa_tagstruct_getu32(t, &minreq) < 0 ||
534 pa_tagstruct_get_usec(t, &usec) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
541 if (!pa_tagstruct_eof(t)) {
542 pa_context_fail(c, PA_ERR_PROTOCOL);
546 if (!dn || di == PA_INVALID_INDEX) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
551 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554 if (s->state != PA_STREAM_READY)
557 if (c->version >= 13) {
558 if (s->direction == PA_STREAM_RECORD)
559 s->timing_info.configured_source_usec = usec;
561 s->timing_info.configured_sink_usec = usec;
563 s->buffer_attr.maxlength = maxlength;
564 s->buffer_attr.fragsize = fragsize;
565 s->buffer_attr.tlength = tlength;
566 s->buffer_attr.prebuf = prebuf;
567 s->buffer_attr.minreq = minreq;
570 pa_xfree(s->device_name);
571 s->device_name = pa_xstrdup(dn);
572 s->device_index = di;
574 s->suspended = suspended;
576 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579 request_auto_timing_update(s, TRUE);
582 check_smoother_status(s, TRUE, FALSE, FALSE);
583 request_auto_timing_update(s, TRUE);
585 if (s->moved_callback)
586 s->moved_callback(s, s->moved_userdata);
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593 pa_context *c = userdata;
597 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
600 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603 pa_assert(PA_REFCNT_VALUE(c) >= 1);
607 if (c->version < 15) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
612 if (pa_tagstruct_getu32(t, &channel) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
617 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
618 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619 pa_tagstruct_getu32(t, &fragsize) < 0 ||
620 pa_tagstruct_get_usec(t, &usec) < 0) {
621 pa_context_fail(c, PA_ERR_PROTOCOL);
625 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626 pa_tagstruct_getu32(t, &tlength) < 0 ||
627 pa_tagstruct_getu32(t, &prebuf) < 0 ||
628 pa_tagstruct_getu32(t, &minreq) < 0 ||
629 pa_tagstruct_get_usec(t, &usec) < 0) {
630 pa_context_fail(c, PA_ERR_PROTOCOL);
635 if (!pa_tagstruct_eof(t)) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
640 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643 if (s->state != PA_STREAM_READY)
646 if (s->direction == PA_STREAM_RECORD)
647 s->timing_info.configured_source_usec = usec;
649 s->timing_info.configured_sink_usec = usec;
651 s->buffer_attr.maxlength = maxlength;
652 s->buffer_attr.fragsize = fragsize;
653 s->buffer_attr.tlength = tlength;
654 s->buffer_attr.prebuf = prebuf;
655 s->buffer_attr.minreq = minreq;
657 request_auto_timing_update(s, TRUE);
659 if (s->buffer_attr_callback)
660 s->buffer_attr_callback(s, s->buffer_attr_userdata);
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667 pa_context *c = userdata;
673 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676 pa_assert(PA_REFCNT_VALUE(c) >= 1);
680 if (c->version < 12) {
681 pa_context_fail(c, PA_ERR_PROTOCOL);
685 if (pa_tagstruct_getu32(t, &channel) < 0 ||
686 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687 !pa_tagstruct_eof(t)) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
692 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695 if (s->state != PA_STREAM_READY)
698 s->suspended = suspended;
700 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703 request_auto_timing_update(s, TRUE);
706 check_smoother_status(s, TRUE, FALSE, FALSE);
707 request_auto_timing_update(s, TRUE);
709 if (s->suspended_callback)
710 s->suspended_callback(s, s->suspended_userdata);
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_context *c = userdata;
722 pa_assert(command == PA_COMMAND_STARTED);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
729 if (c->version < 13) {
730 pa_context_fail(c, PA_ERR_PROTOCOL);
734 if (pa_tagstruct_getu32(t, &channel) < 0 ||
735 !pa_tagstruct_eof(t)) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
740 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743 if (s->state != PA_STREAM_READY)
746 check_smoother_status(s, TRUE, TRUE, FALSE);
747 request_auto_timing_update(s, TRUE);
749 if (s->started_callback)
750 s->started_callback(s, s->started_userdata);
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
760 pa_proplist *pl = NULL;
764 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767 pa_assert(PA_REFCNT_VALUE(c) >= 1);
771 if (c->version < 15) {
772 pa_context_fail(c, PA_ERR_PROTOCOL);
776 pl = pa_proplist_new();
778 if (pa_tagstruct_getu32(t, &channel) < 0 ||
779 pa_tagstruct_gets(t, &event) < 0 ||
780 pa_tagstruct_get_proplist(t, pl) < 0 ||
781 !pa_tagstruct_eof(t) || !event) {
782 pa_context_fail(c, PA_ERR_PROTOCOL);
786 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789 if (s->state != PA_STREAM_READY)
792 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793 /* Let client know what the running time was when the stream had to be
796 if (pa_stream_get_time(s, &time) == 0)
797 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
800 if (s->event_callback)
801 s->event_callback(s, event, pl, s->event_userdata);
807 pa_proplist_free(pl);
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
812 pa_context *c = userdata;
813 uint32_t bytes, channel;
816 pa_assert(command == PA_COMMAND_REQUEST);
819 pa_assert(PA_REFCNT_VALUE(c) >= 1);
823 if (pa_tagstruct_getu32(t, &channel) < 0 ||
824 pa_tagstruct_getu32(t, &bytes) < 0 ||
825 !pa_tagstruct_eof(t)) {
826 pa_context_fail(c, PA_ERR_PROTOCOL);
830 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
833 if (s->state != PA_STREAM_READY)
836 s->requested_bytes += bytes;
838 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
840 if (s->requested_bytes > 0 && s->write_callback)
841 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
847 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
849 pa_context *c = userdata;
853 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
856 pa_assert(PA_REFCNT_VALUE(c) >= 1);
860 if (pa_tagstruct_getu32(t, &channel) < 0 ||
861 !pa_tagstruct_eof(t)) {
862 pa_context_fail(c, PA_ERR_PROTOCOL);
866 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
869 if (s->state != PA_STREAM_READY)
872 if (s->buffer_attr.prebuf > 0)
873 check_smoother_status(s, TRUE, FALSE, TRUE);
875 request_auto_timing_update(s, TRUE);
877 if (command == PA_COMMAND_OVERFLOW) {
878 if (s->overflow_callback)
879 s->overflow_callback(s, s->overflow_userdata);
880 } else if (command == PA_COMMAND_UNDERFLOW) {
881 if (s->underflow_callback)
882 s->underflow_callback(s, s->underflow_userdata);
889 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
891 pa_assert(PA_REFCNT_VALUE(s) >= 1);
893 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
895 if (s->state != PA_STREAM_READY)
899 s->write_index_not_before = s->context->ctag;
901 if (s->timing_info_valid)
902 s->timing_info.write_index_corrupt = TRUE;
904 /* pa_log("write_index invalidated"); */
908 s->read_index_not_before = s->context->ctag;
910 if (s->timing_info_valid)
911 s->timing_info.read_index_corrupt = TRUE;
913 /* pa_log("read_index invalidated"); */
916 request_auto_timing_update(s, TRUE);
919 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
920 pa_stream *s = userdata;
923 pa_assert(PA_REFCNT_VALUE(s) >= 1);
926 request_auto_timing_update(s, FALSE);
930 static void create_stream_complete(pa_stream *s) {
932 pa_assert(PA_REFCNT_VALUE(s) >= 1);
933 pa_assert(s->state == PA_STREAM_CREATING);
935 pa_stream_set_state(s, PA_STREAM_READY);
937 if (s->requested_bytes > 0 && s->write_callback)
938 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
940 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
941 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
942 pa_assert(!s->auto_timing_update_event);
943 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
945 request_auto_timing_update(s, TRUE);
948 check_smoother_status(s, TRUE, FALSE, FALSE);
951 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
957 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
960 if (pa_atou(e, &ms) < 0 || ms <= 0)
961 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
963 attr->maxlength = (uint32_t) -1;
964 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
965 attr->minreq = (uint32_t) -1;
966 attr->prebuf = (uint32_t) -1;
967 attr->fragsize = attr->tlength;
971 *flags |= PA_STREAM_ADJUST_LATENCY;
974 if (s->context->version >= 13)
977 /* Version older than 0.9.10 didn't do server side buffer_attr
978 * selection, hence we have to fake it on the client side. */
980 /* We choose fairly conservative values here, to not confuse
981 * old clients with extremely large playback buffers */
983 if (attr->maxlength == (uint32_t) -1)
984 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
986 if (attr->tlength == (uint32_t) -1)
987 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
989 if (attr->minreq == (uint32_t) -1)
990 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
992 if (attr->prebuf == (uint32_t) -1)
993 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
995 if (attr->fragsize == (uint32_t) -1)
996 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
999 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1000 pa_stream *s = userdata;
1001 uint32_t requested_bytes = 0;
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(s->state == PA_STREAM_CREATING);
1010 if (command != PA_COMMAND_REPLY) {
1011 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1014 pa_stream_set_state(s, PA_STREAM_FAILED);
1018 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1019 s->channel == PA_INVALID_INDEX ||
1020 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1021 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1022 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1026 s->requested_bytes = (int64_t) requested_bytes;
1028 if (s->context->version >= 9) {
1029 if (s->direction == PA_STREAM_PLAYBACK) {
1030 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1031 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1032 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1033 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1034 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1037 } else if (s->direction == PA_STREAM_RECORD) {
1038 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1039 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1040 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1046 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1049 const char *dn = NULL;
1050 pa_bool_t suspended;
1052 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1053 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1054 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1055 pa_tagstruct_gets(t, &dn) < 0 ||
1056 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1057 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1061 if (!dn || s->device_index == PA_INVALID_INDEX ||
1062 ss.channels != cm.channels ||
1063 !pa_channel_map_valid(&cm) ||
1064 !pa_sample_spec_valid(&ss) ||
1065 (s->n_formats == 0 && (
1066 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1067 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1068 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1069 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1073 pa_xfree(s->device_name);
1074 s->device_name = pa_xstrdup(dn);
1075 s->suspended = suspended;
1077 s->channel_map = cm;
1078 s->sample_spec = ss;
1081 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1084 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1085 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1089 if (s->direction == PA_STREAM_RECORD)
1090 s->timing_info.configured_source_usec = usec;
1092 s->timing_info.configured_sink_usec = usec;
1095 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1096 || s->context->version >= 22) {
1098 pa_format_info *f = pa_format_info_new();
1099 pa_tagstruct_get_format_info(t, f);
1101 if (pa_format_info_valid(f))
1104 pa_format_info_free(f);
1105 if (s->n_formats > 0) {
1106 /* We used the extended API, so we should have got back a proper format */
1107 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1113 if (!pa_tagstruct_eof(t)) {
1114 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1118 if (s->direction == PA_STREAM_RECORD) {
1119 pa_assert(!s->record_memblockq);
1121 s->record_memblockq = pa_memblockq_new(
1123 s->buffer_attr.maxlength,
1125 pa_frame_size(&s->sample_spec),
1132 s->channel_valid = TRUE;
1133 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1135 create_stream_complete(s);
1141 static int create_stream(
1142 pa_stream_direction_t direction,
1145 const pa_buffer_attr *attr,
1146 pa_stream_flags_t flags,
1147 const pa_cvolume *volume,
1148 pa_stream *sync_stream) {
1152 pa_bool_t volume_set = FALSE;
1156 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1157 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1159 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1160 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1161 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1162 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1163 PA_STREAM_INTERPOLATE_TIMING|
1164 PA_STREAM_NOT_MONOTONIC|
1165 PA_STREAM_AUTO_TIMING_UPDATE|
1166 PA_STREAM_NO_REMAP_CHANNELS|
1167 PA_STREAM_NO_REMIX_CHANNELS|
1168 PA_STREAM_FIX_FORMAT|
1170 PA_STREAM_FIX_CHANNELS|
1171 PA_STREAM_DONT_MOVE|
1172 PA_STREAM_VARIABLE_RATE|
1173 PA_STREAM_PEAK_DETECT|
1174 PA_STREAM_START_MUTED|
1175 PA_STREAM_ADJUST_LATENCY|
1176 PA_STREAM_EARLY_REQUESTS|
1177 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1178 PA_STREAM_START_UNMUTED|
1179 PA_STREAM_FAIL_ON_SUSPEND|
1180 PA_STREAM_RELATIVE_VOLUME|
1181 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1184 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1185 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1186 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1187 /* Althought some of the other flags are not supported on older
1188 * version, we don't check for them here, because it doesn't hurt
1189 * when they are passed but actually not supported. This makes
1190 * client development easier */
1192 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1193 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1194 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1195 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1196 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1200 s->direction = direction;
1203 s->syncid = sync_stream->syncid;
1206 s->buffer_attr = *attr;
1207 patch_buffer_attr(s, &s->buffer_attr, &flags);
1210 s->corked = !!(flags & PA_STREAM_START_CORKED);
1212 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1215 x = pa_rtclock_now();
1217 pa_assert(!s->smoother);
1218 s->smoother = pa_smoother_new(
1219 SMOOTHER_ADJUST_TIME,
1220 SMOOTHER_HISTORY_TIME,
1221 !(flags & PA_STREAM_NOT_MONOTONIC),
1223 SMOOTHER_MIN_HISTORY,
1229 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1231 t = pa_tagstruct_command(
1233 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1236 if (s->context->version < 13)
1237 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1241 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1242 PA_TAG_CHANNEL_MAP, &s->channel_map,
1243 PA_TAG_U32, PA_INVALID_INDEX,
1245 PA_TAG_U32, s->buffer_attr.maxlength,
1246 PA_TAG_BOOLEAN, s->corked,
1249 if (s->direction == PA_STREAM_PLAYBACK) {
1254 PA_TAG_U32, s->buffer_attr.tlength,
1255 PA_TAG_U32, s->buffer_attr.prebuf,
1256 PA_TAG_U32, s->buffer_attr.minreq,
1257 PA_TAG_U32, s->syncid,
1260 volume_set = !!volume;
1263 if (pa_sample_spec_valid(&s->sample_spec))
1264 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1266 /* This is not really relevant, since no volume was set, and
1267 * the real number of channels is embedded in the format_info
1269 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1273 pa_tagstruct_put_cvolume(t, volume);
1275 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1277 if (s->context->version >= 12) {
1280 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1281 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1282 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1283 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1284 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1285 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1286 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1290 if (s->context->version >= 13) {
1292 if (s->direction == PA_STREAM_PLAYBACK)
1293 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1295 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1299 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1300 PA_TAG_PROPLIST, s->proplist,
1303 if (s->direction == PA_STREAM_RECORD)
1304 pa_tagstruct_putu32(t, s->direct_on_input);
1307 if (s->context->version >= 14) {
1309 if (s->direction == PA_STREAM_PLAYBACK)
1310 pa_tagstruct_put_boolean(t, volume_set);
1312 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1315 if (s->context->version >= 15) {
1317 if (s->direction == PA_STREAM_PLAYBACK)
1318 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1320 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1321 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1324 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1325 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1327 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1328 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1330 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1331 || s->context->version >= 22) {
1333 pa_tagstruct_putu8(t, s->n_formats);
1334 for (i = 0; i < s->n_formats; i++)
1335 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1338 pa_pstream_send_tagstruct(s->context->pstream, t);
1339 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1341 pa_stream_set_state(s, PA_STREAM_CREATING);
1347 int pa_stream_connect_playback(
1350 const pa_buffer_attr *attr,
1351 pa_stream_flags_t flags,
1352 const pa_cvolume *volume,
1353 pa_stream *sync_stream) {
1356 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1358 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1361 int pa_stream_connect_record(
1364 const pa_buffer_attr *attr,
1365 pa_stream_flags_t flags) {
1368 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1370 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1373 int pa_stream_begin_write(
1379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1381 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1382 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1383 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1384 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1385 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1387 if (*nbytes != (size_t) -1) {
1390 m = pa_mempool_block_size_max(s->context->mempool);
1391 fs = pa_frame_size(&s->sample_spec);
1398 if (!s->write_memblock) {
1399 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1400 s->write_data = pa_memblock_acquire(s->write_memblock);
1403 *data = s->write_data;
1404 *nbytes = pa_memblock_get_length(s->write_memblock);
1409 int pa_stream_cancel_write(
1413 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1415 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1416 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1417 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1418 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1420 pa_assert(s->write_data);
1422 pa_memblock_release(s->write_memblock);
1423 pa_memblock_unref(s->write_memblock);
1424 s->write_memblock = NULL;
1425 s->write_data = NULL;
1430 int pa_stream_write(
1434 pa_free_cb_t free_cb,
1436 pa_seek_mode_t seek) {
1439 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1442 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1443 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1444 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1445 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1446 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1447 PA_CHECK_VALIDITY(s->context,
1448 !s->write_memblock ||
1449 ((data >= s->write_data) &&
1450 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1452 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1454 if (s->write_memblock) {
1457 /* pa_stream_write_begin() was called before */
1459 pa_memblock_release(s->write_memblock);
1461 chunk.memblock = s->write_memblock;
1462 chunk.index = (const char *) data - (const char *) s->write_data;
1463 chunk.length = length;
1465 s->write_memblock = NULL;
1466 s->write_data = NULL;
1468 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1469 pa_memblock_unref(chunk.memblock);
1472 pa_seek_mode_t t_seek = seek;
1473 int64_t t_offset = offset;
1474 size_t t_length = length;
1475 const void *t_data = data;
1477 /* pa_stream_write_begin() was not called before */
1479 while (t_length > 0) {
1484 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1485 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1486 chunk.length = t_length;
1490 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1491 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1493 d = pa_memblock_acquire(chunk.memblock);
1494 memcpy(d, t_data, chunk.length);
1495 pa_memblock_release(chunk.memblock);
1498 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1501 t_seek = PA_SEEK_RELATIVE;
1503 t_data = (const uint8_t*) t_data + chunk.length;
1504 t_length -= chunk.length;
1506 pa_memblock_unref(chunk.memblock);
1509 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1510 free_cb((void*) data);
1513 /* This is obviously wrong since we ignore the seeking index . But
1514 * that's OK, the server side applies the same error */
1515 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1517 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1519 if (s->direction == PA_STREAM_PLAYBACK) {
1521 /* Update latency request correction */
1522 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1524 if (seek == PA_SEEK_ABSOLUTE) {
1525 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1526 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1527 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1528 } else if (seek == PA_SEEK_RELATIVE) {
1529 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1530 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1532 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1535 /* Update the write index in the already available latency data */
1536 if (s->timing_info_valid) {
1538 if (seek == PA_SEEK_ABSOLUTE) {
1539 s->timing_info.write_index_corrupt = FALSE;
1540 s->timing_info.write_index = offset + (int64_t) length;
1541 } else if (seek == PA_SEEK_RELATIVE) {
1542 if (!s->timing_info.write_index_corrupt)
1543 s->timing_info.write_index += offset + (int64_t) length;
1545 s->timing_info.write_index_corrupt = TRUE;
1548 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1549 request_auto_timing_update(s, TRUE);
1555 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1557 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1561 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1562 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1563 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1565 if (!s->peek_memchunk.memblock) {
1567 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1573 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1576 pa_assert(s->peek_data);
1577 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1578 *length = s->peek_memchunk.length;
1582 int pa_stream_drop(pa_stream *s) {
1584 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1586 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1587 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1588 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1589 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1591 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1593 /* Fix the simulated local read index */
1594 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1595 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1597 pa_assert(s->peek_data);
1598 pa_memblock_release(s->peek_memchunk.memblock);
1599 pa_memblock_unref(s->peek_memchunk.memblock);
1600 pa_memchunk_reset(&s->peek_memchunk);
1605 size_t pa_stream_writable_size(pa_stream *s) {
1607 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1609 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1610 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1611 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1613 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1616 size_t pa_stream_readable_size(pa_stream *s) {
1618 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1620 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1621 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1622 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1624 return pa_memblockq_get_length(s->record_memblockq);
1627 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1633 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1636 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1637 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1639 /* Ask for a timing update before we cork/uncork to get the best
1640 * accuracy for the transport latency suitable for the
1641 * check_smoother_status() call in the started callback */
1642 request_auto_timing_update(s, TRUE);
1644 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1646 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1647 pa_tagstruct_putu32(t, s->channel);
1648 pa_pstream_send_tagstruct(s->context->pstream, t);
1649 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1651 /* This might cause the read index to conitnue again, hence
1652 * let's request a timing update */
1653 request_auto_timing_update(s, TRUE);
1658 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1662 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663 pa_assert(s->state == PA_STREAM_READY);
1664 pa_assert(s->direction != PA_STREAM_UPLOAD);
1665 pa_assert(s->timing_info_valid);
1666 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1667 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1669 if (s->direction == PA_STREAM_PLAYBACK) {
1670 /* The last byte that was written into the output device
1671 * had this time value associated */
1672 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1674 if (!s->corked && !s->suspended) {
1676 if (!ignore_transport)
1677 /* Because the latency info took a little time to come
1678 * to us, we assume that the real output time is actually
1680 usec += s->timing_info.transport_usec;
1682 /* However, the output device usually maintains a buffer
1683 too, hence the real sample currently played is a little
1685 if (s->timing_info.sink_usec >= usec)
1688 usec -= s->timing_info.sink_usec;
1692 pa_assert(s->direction == PA_STREAM_RECORD);
1694 /* The last byte written into the server side queue had
1695 * this time value associated */
1696 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1698 if (!s->corked && !s->suspended) {
1700 if (!ignore_transport)
1701 /* Add transport latency */
1702 usec += s->timing_info.transport_usec;
1704 /* Add latency of data in device buffer */
1705 usec += s->timing_info.source_usec;
1707 /* If this is a monitor source, we need to correct the
1708 * time by the playback device buffer */
1709 if (s->timing_info.sink_usec >= usec)
1712 usec -= s->timing_info.sink_usec;
1719 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1720 pa_operation *o = userdata;
1721 struct timeval local, remote, now;
1723 pa_bool_t playing = FALSE;
1724 uint64_t underrun_for = 0, playing_for = 0;
1728 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1730 if (!o->context || !o->stream)
1733 i = &o->stream->timing_info;
1735 o->stream->timing_info_valid = FALSE;
1736 i->write_index_corrupt = TRUE;
1737 i->read_index_corrupt = TRUE;
1739 if (command != PA_COMMAND_REPLY) {
1740 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1745 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1746 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1747 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1748 pa_tagstruct_get_timeval(t, &local) < 0 ||
1749 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1750 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1751 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1753 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1757 if (o->context->version >= 13 &&
1758 o->stream->direction == PA_STREAM_PLAYBACK)
1759 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1760 pa_tagstruct_getu64(t, &playing_for) < 0) {
1762 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1767 if (!pa_tagstruct_eof(t)) {
1768 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1771 o->stream->timing_info_valid = TRUE;
1772 i->write_index_corrupt = FALSE;
1773 i->read_index_corrupt = FALSE;
1775 i->playing = (int) playing;
1776 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1778 pa_gettimeofday(&now);
1780 /* Calculcate timestamps */
1781 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1782 /* local and remote seem to have synchronized clocks */
1784 if (o->stream->direction == PA_STREAM_PLAYBACK)
1785 i->transport_usec = pa_timeval_diff(&remote, &local);
1787 i->transport_usec = pa_timeval_diff(&now, &remote);
1789 i->synchronized_clocks = TRUE;
1790 i->timestamp = remote;
1792 /* clocks are not synchronized, let's estimate latency then */
1793 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1794 i->synchronized_clocks = FALSE;
1795 i->timestamp = local;
1796 pa_timeval_add(&i->timestamp, i->transport_usec);
1799 /* Invalidate read and write indexes if necessary */
1800 if (tag < o->stream->read_index_not_before)
1801 i->read_index_corrupt = TRUE;
1803 if (tag < o->stream->write_index_not_before)
1804 i->write_index_corrupt = TRUE;
1806 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1807 /* Write index correction */
1810 uint32_t ctag = tag;
1812 /* Go through the saved correction values and add up the
1813 * total correction.*/
1814 for (n = 0, j = o->stream->current_write_index_correction+1;
1815 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1816 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1818 /* Step over invalid data or out-of-date data */
1819 if (!o->stream->write_index_corrections[j].valid ||
1820 o->stream->write_index_corrections[j].tag < ctag)
1823 /* Make sure that everything is in order */
1824 ctag = o->stream->write_index_corrections[j].tag+1;
1826 /* Now fix the write index */
1827 if (o->stream->write_index_corrections[j].corrupt) {
1828 /* A corrupting seek was made */
1829 i->write_index_corrupt = TRUE;
1830 } else if (o->stream->write_index_corrections[j].absolute) {
1831 /* An absolute seek was made */
1832 i->write_index = o->stream->write_index_corrections[j].value;
1833 i->write_index_corrupt = FALSE;
1834 } else if (!i->write_index_corrupt) {
1835 /* A relative seek was made */
1836 i->write_index += o->stream->write_index_corrections[j].value;
1840 /* Clear old correction entries */
1841 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1842 if (!o->stream->write_index_corrections[n].valid)
1845 if (o->stream->write_index_corrections[n].tag <= tag)
1846 o->stream->write_index_corrections[n].valid = FALSE;
1850 if (o->stream->direction == PA_STREAM_RECORD) {
1851 /* Read index correction */
1853 if (!i->read_index_corrupt)
1854 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1857 /* Update smoother if we're not corked */
1858 if (o->stream->smoother && !o->stream->corked) {
1861 u = x = pa_rtclock_now() - i->transport_usec;
1863 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1866 /* If we weren't playing then it will take some time
1867 * until the audio will actually come out through the
1868 * speakers. Since we follow that timing here, we need
1869 * to try to fix this up */
1871 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1873 if (su < i->sink_usec)
1874 x += i->sink_usec - su;
1878 pa_smoother_pause(o->stream->smoother, x);
1880 /* Update the smoother */
1881 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1882 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1883 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1886 pa_smoother_resume(o->stream->smoother, x, TRUE);
1890 o->stream->auto_timing_update_requested = FALSE;
1892 if (o->stream->latency_update_callback)
1893 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1895 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1896 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1897 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1902 pa_operation_done(o);
1903 pa_operation_unref(o);
1906 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1914 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1917 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1918 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1920 if (s->direction == PA_STREAM_PLAYBACK) {
1921 /* Find a place to store the write_index correction data for this entry */
1922 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1924 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1925 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1927 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1929 t = pa_tagstruct_command(
1931 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1933 pa_tagstruct_putu32(t, s->channel);
1934 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1936 pa_pstream_send_tagstruct(s->context->pstream, t);
1937 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1939 if (s->direction == PA_STREAM_PLAYBACK) {
1940 /* Fill in initial correction data */
1942 s->current_write_index_correction = cidx;
1944 s->write_index_corrections[cidx].valid = TRUE;
1945 s->write_index_corrections[cidx].absolute = FALSE;
1946 s->write_index_corrections[cidx].corrupt = FALSE;
1947 s->write_index_corrections[cidx].tag = tag;
1948 s->write_index_corrections[cidx].value = 0;
1954 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1955 pa_stream *s = userdata;
1959 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1963 if (command != PA_COMMAND_REPLY) {
1964 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1967 pa_stream_set_state(s, PA_STREAM_FAILED);
1969 } else if (!pa_tagstruct_eof(t)) {
1970 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1974 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1980 int pa_stream_disconnect(pa_stream *s) {
1985 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1988 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1989 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1993 t = pa_tagstruct_command(
1995 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1996 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1998 pa_tagstruct_putu32(t, s->channel);
1999 pa_pstream_send_tagstruct(s->context->pstream, t);
2000 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2006 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2008 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2010 if (pa_detect_fork())
2013 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2016 s->read_callback = cb;
2017 s->read_userdata = userdata;
2020 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2022 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2024 if (pa_detect_fork())
2027 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2030 s->write_callback = cb;
2031 s->write_userdata = userdata;
2034 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2036 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2038 if (pa_detect_fork())
2041 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2044 s->state_callback = cb;
2045 s->state_userdata = userdata;
2048 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2050 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2052 if (pa_detect_fork())
2055 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2058 s->overflow_callback = cb;
2059 s->overflow_userdata = userdata;
2062 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2064 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2066 if (pa_detect_fork())
2069 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2072 s->underflow_callback = cb;
2073 s->underflow_userdata = userdata;
2076 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2078 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2080 if (pa_detect_fork())
2083 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2086 s->latency_update_callback = cb;
2087 s->latency_update_userdata = userdata;
2090 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2092 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2094 if (pa_detect_fork())
2097 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2100 s->moved_callback = cb;
2101 s->moved_userdata = userdata;
2104 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108 if (pa_detect_fork())
2111 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2114 s->suspended_callback = cb;
2115 s->suspended_userdata = userdata;
2118 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2120 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2122 if (pa_detect_fork())
2125 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2128 s->started_callback = cb;
2129 s->started_userdata = userdata;
2132 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2136 if (pa_detect_fork())
2139 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2142 s->event_callback = cb;
2143 s->event_userdata = userdata;
2146 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2148 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2150 if (pa_detect_fork())
2153 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2156 s->buffer_attr_callback = cb;
2157 s->buffer_attr_userdata = userdata;
2160 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2161 pa_operation *o = userdata;
2166 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2171 if (command != PA_COMMAND_REPLY) {
2172 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2176 } else if (!pa_tagstruct_eof(t)) {
2177 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2182 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2183 cb(o->stream, success, o->userdata);
2187 pa_operation_done(o);
2188 pa_operation_unref(o);
2191 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2197 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2201 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2203 /* Ask for a timing update before we cork/uncork to get the best
2204 * accuracy for the transport latency suitable for the
2205 * check_smoother_status() call in the started callback */
2206 request_auto_timing_update(s, TRUE);
2210 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2212 t = pa_tagstruct_command(
2214 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2216 pa_tagstruct_putu32(t, s->channel);
2217 pa_tagstruct_put_boolean(t, !!b);
2218 pa_pstream_send_tagstruct(s->context->pstream, t);
2219 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2221 check_smoother_status(s, FALSE, FALSE, FALSE);
2223 /* This might cause the indexes to hang/start again, hence let's
2224 * request a timing update, after the cork/uncork, too */
2225 request_auto_timing_update(s, TRUE);
2230 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2236 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2238 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2239 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2241 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2243 t = pa_tagstruct_command(s->context, command, &tag);
2244 pa_tagstruct_putu32(t, s->channel);
2245 pa_pstream_send_tagstruct(s->context->pstream, t);
2246 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2251 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2255 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2257 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2258 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2259 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2261 /* Ask for a timing update *before* the flush, so that the
2262 * transport usec is as up to date as possible when we get the
2263 * underflow message and update the smoother status*/
2264 request_auto_timing_update(s, TRUE);
2266 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2269 if (s->direction == PA_STREAM_PLAYBACK) {
2271 if (s->write_index_corrections[s->current_write_index_correction].valid)
2272 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2274 if (s->buffer_attr.prebuf > 0)
2275 check_smoother_status(s, FALSE, FALSE, TRUE);
2277 /* This will change the write index, but leave the
2278 * read index untouched. */
2279 invalidate_indexes(s, FALSE, TRUE);
2282 /* For record streams this has no influence on the write
2283 * index, but the read index might jump. */
2284 invalidate_indexes(s, TRUE, FALSE);
2286 /* Note that we do not update requested_bytes here. This is
2287 * because we cannot really know how data actually was dropped
2288 * from the write index due to this. This 'error' will be applied
2289 * by both client and server and hence we should be fine. */
2294 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2298 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2300 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2301 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2302 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2305 /* Ask for a timing update before we cork/uncork to get the best
2306 * accuracy for the transport latency suitable for the
2307 * check_smoother_status() call in the started callback */
2308 request_auto_timing_update(s, TRUE);
2310 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2313 /* This might cause the read index to hang again, hence
2314 * let's request a timing update */
2315 request_auto_timing_update(s, TRUE);
2320 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2324 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2326 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2327 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2328 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2331 /* Ask for a timing update before we cork/uncork to get the best
2332 * accuracy for the transport latency suitable for the
2333 * check_smoother_status() call in the started callback */
2334 request_auto_timing_update(s, TRUE);
2336 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2339 /* This might cause the read index to start moving again, hence
2340 * let's request a timing update */
2341 request_auto_timing_update(s, TRUE);
2346 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2350 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2357 if (s->context->version >= 13) {
2358 pa_proplist *p = pa_proplist_new();
2360 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2361 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2362 pa_proplist_free(p);
2367 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2368 t = pa_tagstruct_command(
2370 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2372 pa_tagstruct_putu32(t, s->channel);
2373 pa_tagstruct_puts(t, name);
2374 pa_pstream_send_tagstruct(s->context->pstream, t);
2375 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2381 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2385 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2387 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2388 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2389 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2390 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2391 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2392 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2395 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2397 usec = calc_time(s, FALSE);
2399 /* Make sure the time runs monotonically */
2400 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2401 if (usec < s->previous_time)
2402 usec = s->previous_time;
2404 s->previous_time = usec;
2413 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2415 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2423 if (negative && s->direction == PA_STREAM_RECORD) {
2431 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2437 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2440 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2443 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2444 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2445 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2447 if ((r = pa_stream_get_time(s, &t)) < 0)
2450 if (s->direction == PA_STREAM_PLAYBACK)
2451 cindex = s->timing_info.write_index;
2453 cindex = s->timing_info.read_index;
2458 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2460 if (s->direction == PA_STREAM_PLAYBACK)
2461 *r_usec = time_counter_diff(s, c, t, negative);
2463 *r_usec = time_counter_diff(s, t, c, negative);
2468 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2470 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2472 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2473 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2474 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2475 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2477 return &s->timing_info;
2480 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2482 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2484 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2486 return &s->sample_spec;
2489 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2491 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2493 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495 return &s->channel_map;
2498 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2500 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2502 /* We don't have the format till routing is done */
2503 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2504 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2508 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2510 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2512 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2513 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2514 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2516 return &s->buffer_attr;
2519 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2520 pa_operation *o = userdata;
2525 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2530 if (command != PA_COMMAND_REPLY) {
2531 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2536 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2537 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2538 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2539 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2540 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2541 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2544 } else if (o->stream->direction == PA_STREAM_RECORD) {
2545 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2546 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2547 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2552 if (o->stream->context->version >= 13) {
2555 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2556 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2560 if (o->stream->direction == PA_STREAM_RECORD)
2561 o->stream->timing_info.configured_source_usec = usec;
2563 o->stream->timing_info.configured_sink_usec = usec;
2566 if (!pa_tagstruct_eof(t)) {
2567 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2573 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2574 cb(o->stream, success, o->userdata);
2578 pa_operation_done(o);
2579 pa_operation_unref(o);
2583 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2587 pa_buffer_attr copy;
2590 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2593 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2594 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2595 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2596 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2598 /* Ask for a timing update before we cork/uncork to get the best
2599 * accuracy for the transport latency suitable for the
2600 * check_smoother_status() call in the started callback */
2601 request_auto_timing_update(s, TRUE);
2603 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2605 t = pa_tagstruct_command(
2607 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2609 pa_tagstruct_putu32(t, s->channel);
2612 patch_buffer_attr(s, ©, NULL);
2615 pa_tagstruct_putu32(t, attr->maxlength);
2617 if (s->direction == PA_STREAM_PLAYBACK)
2620 PA_TAG_U32, attr->tlength,
2621 PA_TAG_U32, attr->prebuf,
2622 PA_TAG_U32, attr->minreq,
2625 pa_tagstruct_putu32(t, attr->fragsize);
2627 if (s->context->version >= 13)
2628 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2630 if (s->context->version >= 14)
2631 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2633 pa_pstream_send_tagstruct(s->context->pstream, t);
2634 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2636 /* This might cause changes in the read/write indexex, hence let's
2637 * request a timing update */
2638 request_auto_timing_update(s, TRUE);
2643 uint32_t pa_stream_get_device_index(pa_stream *s) {
2645 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2647 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2648 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2649 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2650 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2651 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2653 return s->device_index;
2656 const char *pa_stream_get_device_name(pa_stream *s) {
2658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2664 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2666 return s->device_name;
2669 int pa_stream_is_suspended(pa_stream *s) {
2671 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2673 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2674 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2675 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2676 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2678 return s->suspended;
2681 int pa_stream_is_corked(pa_stream *s) {
2683 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2685 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2686 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2687 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2692 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2693 pa_operation *o = userdata;
2698 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2703 if (command != PA_COMMAND_REPLY) {
2704 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2710 if (!pa_tagstruct_eof(t)) {
2711 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2716 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2717 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2720 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2721 cb(o->stream, success, o->userdata);
2725 pa_operation_done(o);
2726 pa_operation_unref(o);
2730 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2736 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2738 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2739 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2740 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2741 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2742 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2745 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2746 o->private = PA_UINT_TO_PTR(rate);
2748 t = pa_tagstruct_command(
2750 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2752 pa_tagstruct_putu32(t, s->channel);
2753 pa_tagstruct_putu32(t, rate);
2755 pa_pstream_send_tagstruct(s->context->pstream, t);
2756 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2761 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2767 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2770 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2772 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2773 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2775 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2777 t = pa_tagstruct_command(
2779 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2781 pa_tagstruct_putu32(t, s->channel);
2782 pa_tagstruct_putu32(t, (uint32_t) mode);
2783 pa_tagstruct_put_proplist(t, p);
2785 pa_pstream_send_tagstruct(s->context->pstream, t);
2786 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2788 /* Please note that we don't update s->proplist here, because we
2789 * don't export that field */
2794 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2798 const char * const*k;
2801 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2803 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2804 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2805 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2806 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2807 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2809 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2811 t = pa_tagstruct_command(
2813 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2815 pa_tagstruct_putu32(t, s->channel);
2817 for (k = keys; *k; k++)
2818 pa_tagstruct_puts(t, *k);
2820 pa_tagstruct_puts(t, NULL);
2822 pa_pstream_send_tagstruct(s->context->pstream, t);
2823 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2825 /* Please note that we don't update s->proplist here, because we
2826 * don't export that field */
2831 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2833 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2835 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2836 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2837 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2838 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2840 s->direct_on_input = sink_input_idx;
2845 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2847 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2849 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2850 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2851 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2853 return s->direct_on_input;