2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
/* Bounds of the automatic timing-update interval: the interval is reset to
 * the START value and doubled up to at most the END value elsewhere in this
 * file. */
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Arguments passed to pa_smoother_new() when a stream is connected with
 * PA_STREAM_INTERPOLATE_TIMING. */
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
/* Create a stream without a property list: forwards all arguments to
 * pa_stream_new_with_proplist() with a NULL proplist and returns its result. */
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Clear every user-installed callback on the stream together with its
 * associated userdata pointer, so that no callback is invoked afterwards. */
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
/* Shared constructor behind pa_stream_new_with_proplist() and
 * pa_stream_new_extended().
 *
 * The caller supplies either a sample spec / channel map pair or a list of
 * format infos, never both (asserted below). The function fails with
 * PA_ERR_FORKED after a fork and with PA_ERR_INVALID when neither a name nor
 * a PA_PROP_MEDIA_NAME property is given. Otherwise it allocates the stream,
 * fills in unconnected defaults and links it into the context's stream list.
 *
 * NOTE(review): several lines of this function (remaining parameters, local
 * declarations, if/else heads, the return) are not visible in this listing. */
83 static pa_stream *pa_stream_new_with_proplist_internal(
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Either the PCM description (ss/map) or the format list may be used. */
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
/* A media name must come from the argument or from the proplist. */
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 s = pa_xnew(pa_stream, 1);
106 s->mainloop = c->mainloop;
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
/* NOTE(review): the conditions choosing between copying *ss / *map and the
 * invalid/initialised fallbacks are not visible here - presumably they test
 * ss and map for NULL; confirm against the full file. */
113 s->sample_spec = *ss;
115 s->sample_spec.format = PA_SAMPLE_INVALID;
118 s->channel_map = *map;
120 pa_channel_map_init(&s->channel_map);
/* Keep private copies of the requested formats. */
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 /* We'll get the final negotiated format after connecting */
132 s->direct_on_input = PA_INVALID_INDEX;
/* The stream owns its own proplist: a copy of p, or a fresh empty one. */
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139 s->channel_valid = FALSE;
/* Each new stream takes the next sync id from the context's counter. */
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146 /* We initialize the target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
150 s->buffer_attr.maxlength = (uint32_t) -1;
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
/* (uint32_t) -1 is the sentinel that patch_buffer_attr() later replaces
 * with concrete defaults for old servers. */
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
172 s->write_memblock = NULL;
173 s->write_data = NULL;
175 pa_memchunk_reset(&s->peek_memchunk);
177 s->record_memblockq = NULL;
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
182 s->previous_time = 0;
184 s->read_index_not_before = 0;
185 s->write_index_not_before = 0;
/* Mark all write-index correction slots as unused. */
186 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187 s->write_index_corrections[i].valid = 0;
188 s->current_write_index_correction = 0;
190 s->auto_timing_update_event = NULL;
191 s->auto_timing_update_requested = FALSE;
192 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
198 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Create a PCM stream described by a sample spec and an optional channel
 * map, with an optional property list.
 *
 * Returns NULL (via PA_CHECK_VALIDITY_RETURN_NULL) with PA_ERR_INVALID when
 * the sample spec is missing/invalid or the channel map is invalid or does
 * not match the channel count, and with PA_ERR_NOTSUPPORTED when the sample
 * format is newer than the context's protocol version allows. On success the
 * work is delegated to pa_stream_new_with_proplist_internal() with no format
 * list.
 *
 * NOTE(review): some parameters and local declarations are not visible in
 * this listing. */
205 pa_stream *pa_stream_new_with_proplist(
208 const pa_sample_spec *ss,
209 const pa_channel_map *map,
214 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
/* S32 samples need version >= 12; S24 and S24_32 need version >= 15. */
215 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
/* Build a default map for the channel count into the local tmap.
 * NOTE(review): the guarding condition is not visible - presumably this
 * only runs when no map was passed; confirm. */
221 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
/* Create a stream described by a list of format infos instead of a sample
 * spec / channel map. Fails with PA_ERR_NOTSUPPORTED (returning NULL) when
 * the context's protocol version is below 21; otherwise delegates to
 * pa_stream_new_with_proplist_internal() with NULL ss and map.
 *
 * NOTE(review): some parameters are not visible in this listing. */
226 pa_stream *pa_stream_new_extended(
229 pa_format_info * const *formats,
230 unsigned int n_formats,
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
/* Detach the stream from its context: cancel operations, unregister pending
 * replies, remove the channel from the context's playback/record map, remove
 * the stream from the context's stream list and free the auto-timing timer.
 *
 * NOTE(review): several lines (local declarations, the loop body that selects
 * which operations to cancel, early return, final cleanup) are not visible in
 * this listing. */
238 static void stream_unlink(pa_stream *s) {
245 /* Detach from context */
247 /* Unref all operation objects that point to us */
248 for (o = s->context->operations; o; o = n) {
252 pa_operation_cancel(o);
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* The channel is only registered in a hashmap once it is valid. */
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel_valid = FALSE;
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* Stop the periodic timing update, if one was scheduled. */
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
/* Release the resources owned by the stream: the pending write block, the
 * peeked record chunk, the record queue, the proplist, the smoother, the
 * requested and negotiated format infos and the device name.
 *
 * NOTE(review): several lines (local declarations, guarding conditions for
 * some of the frees, the final free of the stream itself) are not visible in
 * this listing. */
278 static void stream_free(pa_stream *s) {
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
/* Drop the reference on the memblock itself. The original unref'ed
 * s->write_data, which is a different field from the block that was just
 * released; the peek_memchunk branch below shows the intended pairing of
 * release and unref on the same memblock. */
287 pa_memblock_unref(s->write_memblock);
290 if (s->peek_memchunk.memblock) {
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
300 pa_proplist_free(s->proplist);
303 pa_smoother_free(s->smoother);
/* Free the copies made of the requested formats at construction time. */
305 for (i = 0; i < s->n_formats; i++)
306 pa_format_info_free(s->req_formats[i]);
309 pa_format_info_free(s->format);
311 pa_xfree(s->device_name);
/* Drop one reference to the stream. The reference count must be at least 1
 * on entry; something happens once the decrement reaches zero or below.
 * NOTE(review): the action taken at that point is not visible in this
 * listing - presumably stream_free(); confirm. */
315 void pa_stream_unref(pa_stream *s) {
317 pa_assert(PA_REFCNT_VALUE(s) >= 1);
319 if (PA_REFCNT_DEC(s) <= 0)
/* Reference-taking counterpart of pa_stream_unref(); returns a pa_stream
 * pointer. Asserts that the stream still has at least one reference.
 * NOTE(review): the increment and the return statement are not visible in
 * this listing. */
323 pa_stream* pa_stream_ref(pa_stream *s) {
325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor returning a pa_stream_state_t for the stream. Asserts that the
 * stream still has at least one reference.
 * NOTE(review): the return statement is not visible in this listing. */
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor returning a pa_context pointer for the stream. Asserts that the
 * stream still has at least one reference.
 * NOTE(review): the return statement is not visible in this listing. */
338 pa_context* pa_stream_get_context(pa_stream *s) {
340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Return the stream index stored on the stream.
 *
 * Returns PA_INVALID_INDEX and sets the context error to PA_ERR_FORKED when
 * a fork is detected, or to PA_ERR_BADSTATE when the stream is not in the
 * PA_STREAM_READY state. */
345 uint32_t pa_stream_get_index(pa_stream *s) {
347 pa_assert(PA_REFCNT_VALUE(s) >= 1);
349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
352 return s->stream_index;
/* Move the stream to state st and notify the user's state callback, if one
 * is installed. Extra handling applies when the new state is FAILED or
 * TERMINATED.
 * NOTE(review): the state assignment itself and the body of the final
 * condition are not visible in this listing. */
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
366 if (s->state_callback)
367 s->state_callback(s, s->state_userdata);
369 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Request fresh timing info for a stream that uses
 * PA_STREAM_AUTO_TIMING_UPDATE and reschedule the periodic timer.
 *
 * A request is only sent while the stream is READY, and only if force is set
 * or no automatic request is already outstanding. If a timer event exists it
 * is freed when the stream is suspended and force is not set; otherwise the
 * timer is restarted and the interval is doubled, capped at
 * AUTO_TIMING_INTERVAL_END_USEC.
 *
 * NOTE(review): the statement guarded by the flag check, the else head and
 * the condition around the interval reset are not visible in this listing. */
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
379 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
382 if (s->state == PA_STREAM_READY &&
383 (force || !s->auto_timing_update_requested)) {
386 /* pa_log("Automatically requesting new timing data"); */
/* The operation handle is not needed; only remember that one is pending. */
388 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389 pa_operation_unref(o);
390 s->auto_timing_update_requested = TRUE;
394 if (s->auto_timing_update_event) {
/* No periodic updates while suspended, unless the caller forces one. */
395 if (s->suspended && !force) {
396 pa_assert(s->mainloop);
397 s->mainloop->time_free(s->auto_timing_update_event);
398 s->auto_timing_update_event = NULL;
401 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
403 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
/* Back off: double the interval up to the configured maximum. */
405 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Dispatcher callback for PLAYBACK/RECORD_STREAM_KILLED. userdata is the
 * context.
 *
 * Reads the channel from the tagstruct (a malformed message fails the whole
 * context with PA_ERR_PROTOCOL), looks the stream up in the playback or
 * record map matching the command, records PA_ERR_KILLED on the context and
 * moves the stream to PA_STREAM_FAILED.
 *
 * NOTE(review): local declarations and the statements following the lookup
 * and state checks (presumably early exits) are not visible in this listing. */
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411 pa_context *c = userdata;
416 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
419 pa_assert(PA_REFCNT_VALUE(c) >= 1);
423 if (pa_tagstruct_getu32(t, &channel) < 0 ||
424 !pa_tagstruct_eof(t)) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
429 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
432 if (s->state != PA_STREAM_READY)
435 pa_context_set_error(c, PA_ERR_KILLED);
436 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pause or resume the stream's time smoother according to the stream state.
 *
 * The reference time x starts as the current rtclock time and, when timing
 * info is valid, is shifted by the transport latency in one direction or the
 * other. The smoother is paused when the stream is suspended or corked or
 * when force_stop is set; it is considered for resuming when force_start is
 * set or prebuf is 0. force_start and force_stop must not both be set.
 *
 * NOTE(review): the conditions selecting between subtracting and adding the
 * transport latency, parts of the STARTED-event condition and the use of
 * aposteriori are not visible in this listing. */
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
446 pa_assert(!force_start || !force_stop);
451 x = pa_rtclock_now();
453 if (s->timing_info_valid) {
455 x -= s->timing_info.transport_usec;
457 x += s->timing_info.transport_usec;
460 if (s->suspended || s->corked || force_stop)
461 pa_smoother_pause(s->smoother, x);
462 else if (force_start || s->buffer_attr.prebuf == 0) {
464 if (!s->timing_info_valid &&
468 s->context->version >= 13) {
470 /* If the server supports STARTED events we take them as
471 * indications when audio really starts/stops playing, if
472 * we don't have any timing info yet -- instead of trying
473 * to be smart and guessing the server time. Otherwise the
474 * unknown transport delay adds too much noise to our time
480 pa_smoother_resume(s->smoother, x, TRUE);
483 /* Please note that we have no idea if playback actually started
484 * if prebuf is non-zero! */
/* Forward declaration: the timer callback is defined further down but is
 * already needed by the command handlers below when they create the timer. */
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
/* Dispatcher callback for PLAYBACK/RECORD_STREAM_MOVED. userdata is the
 * context.
 *
 * The command is only accepted from protocol version 12 on. The message
 * carries channel, device index, device name and suspended flag; from
 * version 13 on it additionally carries buffer metrics and a latency value
 * (record: maxlength, fragsize; playback: maxlength, tlength, prebuf,
 * minreq). Any parse problem, trailing data, missing device name or invalid
 * device index fails the context with PA_ERR_PROTOCOL.
 *
 * For a READY stream the new device, suspended state, latency and buffer
 * metrics are stored, the auto-timing timer is (re)created when needed, the
 * smoother is re-evaluated and the user's moved callback is invoked.
 *
 * NOTE(review): local declarations, else heads and the statements after the
 * failure/lookup checks are not visible in this listing. */
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
/* Metrics not present for the stream direction stay 0. */
497 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
500 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
503 pa_assert(PA_REFCNT_VALUE(c) >= 1);
507 if (c->version < 12) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
512 if (pa_tagstruct_getu32(t, &channel) < 0 ||
513 pa_tagstruct_getu32(t, &di) < 0 ||
514 pa_tagstruct_gets(t, &dn) < 0 ||
515 pa_tagstruct_get_boolean(t, &suspended) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
520 if (c->version >= 13) {
/* Record and playback messages carry different buffer metrics. */
522 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525 pa_tagstruct_get_usec(t, &usec) < 0) {
526 pa_context_fail(c, PA_ERR_PROTOCOL);
530 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531 pa_tagstruct_getu32(t, &tlength) < 0 ||
532 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533 pa_tagstruct_getu32(t, &minreq) < 0 ||
534 pa_tagstruct_get_usec(t, &usec) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
541 if (!pa_tagstruct_eof(t)) {
542 pa_context_fail(c, PA_ERR_PROTOCOL);
546 if (!dn || di == PA_INVALID_INDEX) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
551 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
554 if (s->state != PA_STREAM_READY)
557 if (c->version >= 13) {
558 if (s->direction == PA_STREAM_RECORD)
559 s->timing_info.configured_source_usec = usec;
561 s->timing_info.configured_sink_usec = usec;
563 s->buffer_attr.maxlength = maxlength;
564 s->buffer_attr.fragsize = fragsize;
565 s->buffer_attr.tlength = tlength;
566 s->buffer_attr.prebuf = prebuf;
567 s->buffer_attr.minreq = minreq;
/* Replace the stored device name with a private copy of the new one. */
570 pa_xfree(s->device_name);
571 s->device_name = pa_xstrdup(dn);
572 s->device_index = di;
574 s->suspended = suspended;
/* The timer is freed while suspended (see request_auto_timing_update());
 * recreate it when the stream is no longer suspended. */
576 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579 request_auto_timing_update(s, TRUE);
582 check_smoother_status(s, TRUE, FALSE, FALSE);
583 request_auto_timing_update(s, TRUE);
585 if (s->moved_callback)
586 s->moved_callback(s, s->moved_userdata);
/* Dispatcher callback for PLAYBACK/RECORD_BUFFER_ATTR_CHANGED. userdata is
 * the context.
 *
 * The command is only accepted from protocol version 15 on. The message
 * carries the channel followed by the new buffer metrics and a latency value
 * (record: maxlength, fragsize; playback: maxlength, tlength, prebuf,
 * minreq). Any parse problem or trailing data fails the context with
 * PA_ERR_PROTOCOL.
 *
 * For a READY stream the latency and buffer metrics are stored, a timing
 * update is requested and the user's buffer_attr callback is invoked.
 *
 * NOTE(review): local declarations, else heads and the statements after the
 * failure/lookup checks are not visible in this listing. */
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593 pa_context *c = userdata;
/* Metrics not present for the stream direction stay 0. */
597 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
600 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
603 pa_assert(PA_REFCNT_VALUE(c) >= 1);
607 if (c->version < 15) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
612 if (pa_tagstruct_getu32(t, &channel) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Select the record layout by the command this handler actually receives.
 * The original compared against PA_COMMAND_RECORD_STREAM_MOVED, which the
 * assert above rules out, so record messages were parsed with the playback
 * layout. */
617 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
618 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619 pa_tagstruct_getu32(t, &fragsize) < 0 ||
620 pa_tagstruct_get_usec(t, &usec) < 0) {
621 pa_context_fail(c, PA_ERR_PROTOCOL);
625 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626 pa_tagstruct_getu32(t, &tlength) < 0 ||
627 pa_tagstruct_getu32(t, &prebuf) < 0 ||
628 pa_tagstruct_getu32(t, &minreq) < 0 ||
629 pa_tagstruct_get_usec(t, &usec) < 0) {
630 pa_context_fail(c, PA_ERR_PROTOCOL);
635 if (!pa_tagstruct_eof(t)) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
640 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
643 if (s->state != PA_STREAM_READY)
646 if (s->direction == PA_STREAM_RECORD)
647 s->timing_info.configured_source_usec = usec;
649 s->timing_info.configured_sink_usec = usec;
651 s->buffer_attr.maxlength = maxlength;
652 s->buffer_attr.fragsize = fragsize;
653 s->buffer_attr.tlength = tlength;
654 s->buffer_attr.prebuf = prebuf;
655 s->buffer_attr.minreq = minreq;
657 request_auto_timing_update(s, TRUE);
659 if (s->buffer_attr_callback)
660 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Dispatcher callback for PLAYBACK/RECORD_STREAM_SUSPENDED. userdata is the
 * context.
 *
 * The command is only accepted from protocol version 12 on. The message
 * carries the channel and the new suspended flag; a malformed message fails
 * the context with PA_ERR_PROTOCOL. For a READY stream the flag is stored,
 * the auto-timing timer is recreated when the stream is no longer suspended,
 * the smoother is re-evaluated and the user's suspended callback is invoked.
 *
 * NOTE(review): local declarations and the statements after the
 * failure/lookup checks are not visible in this listing. */
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667 pa_context *c = userdata;
673 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
676 pa_assert(PA_REFCNT_VALUE(c) >= 1);
680 if (c->version < 12) {
681 pa_context_fail(c, PA_ERR_PROTOCOL);
685 if (pa_tagstruct_getu32(t, &channel) < 0 ||
686 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687 !pa_tagstruct_eof(t)) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
692 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
695 if (s->state != PA_STREAM_READY)
698 s->suspended = suspended;
/* The timer is freed while suspended (see request_auto_timing_update());
 * recreate it when the stream is no longer suspended. */
700 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703 request_auto_timing_update(s, TRUE);
706 check_smoother_status(s, TRUE, FALSE, FALSE);
707 request_auto_timing_update(s, TRUE);
709 if (s->suspended_callback)
710 s->suspended_callback(s, s->suspended_userdata);
/* Dispatcher callback for PA_COMMAND_STARTED. userdata is the context.
 *
 * The command is only accepted from protocol version 13 on and only concerns
 * playback streams. The message carries just the channel; a malformed
 * message fails the context with PA_ERR_PROTOCOL. For a READY stream the
 * smoother is force-started, a timing update is requested and the user's
 * started callback is invoked.
 *
 * NOTE(review): local declarations and the statements after the
 * failure/lookup checks are not visible in this listing. */
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_context *c = userdata;
722 pa_assert(command == PA_COMMAND_STARTED);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
729 if (c->version < 13) {
730 pa_context_fail(c, PA_ERR_PROTOCOL);
734 if (pa_tagstruct_getu32(t, &channel) < 0 ||
735 !pa_tagstruct_eof(t)) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
740 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
743 if (s->state != PA_STREAM_READY)
/* force_start is TRUE here: the server told us playback really started. */
746 check_smoother_status(s, TRUE, TRUE, FALSE);
747 request_auto_timing_update(s, TRUE);
749 if (s->started_callback)
750 s->started_callback(s, s->started_userdata);
/* Dispatcher callback for PLAYBACK/RECORD_STREAM_EVENT. userdata is the
 * context.
 *
 * The command is only accepted from protocol version 15 on. The message
 * carries the channel, an event name and a property list; a malformed
 * message or a NULL event name fails the context with PA_ERR_PROTOCOL. For a
 * READY stream the user's event callback receives the event name and the
 * property list. The temporary property list is freed before returning.
 *
 * NOTE(review): local declarations, the cleanup label and the statements
 * after the failure/lookup checks are not visible in this listing. */
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
760 pa_proplist *pl = NULL;
764 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
767 pa_assert(PA_REFCNT_VALUE(c) >= 1);
771 if (c->version < 15) {
772 pa_context_fail(c, PA_ERR_PROTOCOL);
776 pl = pa_proplist_new();
778 if (pa_tagstruct_getu32(t, &channel) < 0 ||
779 pa_tagstruct_gets(t, &event) < 0 ||
780 pa_tagstruct_get_proplist(t, pl) < 0 ||
781 !pa_tagstruct_eof(t) || !event) {
782 pa_context_fail(c, PA_ERR_PROTOCOL);
786 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
789 if (s->state != PA_STREAM_READY)
792 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793 /* Let client know what the running time was when the stream had to be killed */
794 pa_usec_t stream_time;
/* The property is only added when the stream time could be determined. */
795 if (pa_stream_get_time(s, &stream_time) == 0)
796 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
799 if (s->event_callback)
800 s->event_callback(s, event, pl, s->event_userdata);
806 pa_proplist_free(pl);
/* Dispatcher callback for PA_COMMAND_REQUEST. userdata is the context.
 *
 * The message carries a channel and a byte count; a malformed message fails
 * the context with PA_ERR_PROTOCOL. For a READY playback stream the byte
 * count is added to requested_bytes and, if the total is positive, the
 * user's write callback is invoked with that total.
 *
 * NOTE(review): some local declarations and the statements after the
 * failure/lookup checks are not visible in this listing. */
809 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811 pa_context *c = userdata;
812 uint32_t bytes, channel;
815 pa_assert(command == PA_COMMAND_REQUEST);
818 pa_assert(PA_REFCNT_VALUE(c) >= 1);
822 if (pa_tagstruct_getu32(t, &channel) < 0 ||
823 pa_tagstruct_getu32(t, &bytes) < 0 ||
824 !pa_tagstruct_eof(t)) {
825 pa_context_fail(c, PA_ERR_PROTOCOL);
829 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
832 if (s->state != PA_STREAM_READY)
835 s->requested_bytes += bytes;
837 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
/* requested_bytes is checked for > 0 before being cast to size_t. */
839 if (s->requested_bytes > 0 && s->write_callback)
840 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Dispatcher callback shared by PA_COMMAND_OVERFLOW and
 * PA_COMMAND_UNDERFLOW. userdata is the context.
 *
 * The message carries just the channel; a malformed message fails the
 * context with PA_ERR_PROTOCOL. For a READY playback stream the smoother is
 * force-stopped when prebuf is non-zero, a timing update is requested and
 * the overflow or underflow callback matching the command is invoked.
 *
 * NOTE(review): local declarations and the statements after the
 * failure/lookup checks are not visible in this listing. */
846 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
848 pa_context *c = userdata;
852 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
855 pa_assert(PA_REFCNT_VALUE(c) >= 1);
859 if (pa_tagstruct_getu32(t, &channel) < 0 ||
860 !pa_tagstruct_eof(t)) {
861 pa_context_fail(c, PA_ERR_PROTOCOL);
865 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
868 if (s->state != PA_STREAM_READY)
871 if (s->buffer_attr.prebuf > 0)
872 check_smoother_status(s, TRUE, FALSE, TRUE);
874 request_auto_timing_update(s, TRUE);
876 if (command == PA_COMMAND_OVERFLOW) {
877 if (s->overflow_callback)
878 s->overflow_callback(s, s->overflow_userdata);
879 } else if (command == PA_COMMAND_UNDERFLOW) {
880 if (s->underflow_callback)
881 s->underflow_callback(s, s->underflow_userdata);
/* Mark the cached write and/or read index of the stream as no longer
 * trustworthy and request fresh timing data.
 *
 * For each invalidated index the current context tag is recorded as the
 * "not before" tag, and the index is flagged corrupt when timing info is
 * currently valid.
 *
 * NOTE(review): the conditions on r and w and the statement after the state
 * check are not visible in this listing - presumably w guards the write
 * part and r the read part; confirm. */
888 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
892 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
894 if (s->state != PA_STREAM_READY)
898 s->write_index_not_before = s->context->ctag;
900 if (s->timing_info_valid)
901 s->timing_info.write_index_corrupt = TRUE;
903 /* pa_log("write_index invalidated"); */
907 s->read_index_not_before = s->context->ctag;
909 if (s->timing_info_valid)
910 s->timing_info.read_index_corrupt = TRUE;
912 /* pa_log("read_index invalidated"); */
915 request_auto_timing_update(s, TRUE);
/* Timer callback of the periodic timing update; userdata is the stream.
 * Issues a non-forced update request, so nothing is sent while an earlier
 * automatic request is still outstanding. */
918 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
919 pa_stream *s = userdata;
922 pa_assert(PA_REFCNT_VALUE(s) >= 1);
925 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: move the stream from CREATING to READY,
 * hand any bytes already requested to the write callback, start the
 * periodic timing update when PA_STREAM_AUTO_TIMING_UPDATE is set and
 * evaluate the smoother state. */
929 static void create_stream_complete(pa_stream *s) {
931 pa_assert(PA_REFCNT_VALUE(s) >= 1);
932 pa_assert(s->state == PA_STREAM_CREATING);
934 pa_stream_set_state(s, PA_STREAM_READY);
936 if (s->requested_bytes > 0 && s->write_callback)
937 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
939 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
940 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
/* No timer may exist yet for a stream that is only now becoming ready. */
941 pa_assert(!s->auto_timing_update_event);
942 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
944 request_auto_timing_update(s, TRUE);
947 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Adjust the buffer attributes (and flags) before they are sent to the
 * server.
 *
 * If the environment variable PULSE_LATENCY_MSEC is set and parses as a
 * positive number, the attributes are overridden to request that latency and
 * PA_STREAM_ADJUST_LATENCY is added to *flags; a value that cannot be parsed
 * is only logged. After a check for protocol version >= 13, every attribute
 * still at the (uint32_t) -1 sentinel is replaced by a concrete default.
 *
 * NOTE(review): local declarations, the else head and the statement guarded
 * by the version check (presumably an early return) are not visible in this
 * listing. */
950 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
956 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
959 if (pa_atou(e, &ms) < 0 || ms <= 0)
960 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
962 attr->maxlength = (uint32_t) -1;
963 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
964 attr->minreq = (uint32_t) -1;
965 attr->prebuf = (uint32_t) -1;
966 attr->fragsize = attr->tlength;
970 *flags |= PA_STREAM_ADJUST_LATENCY;
973 if (s->context->version >= 13)
976 /* Version older than 0.9.10 didn't do server side buffer_attr
977 * selection, hence we have to fake it on the client side. */
979 /* We choose fairly conservative values here, to not confuse
980 * old clients with extremely large playback buffers */
982 if (attr->maxlength == (uint32_t) -1)
983 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
985 if (attr->tlength == (uint32_t) -1)
986 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
988 if (attr->minreq == (uint32_t) -1)
989 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
991 if (attr->prebuf == (uint32_t) -1)
992 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
994 if (attr->fragsize == (uint32_t) -1)
995 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
/* Reply handler for the create-stream command. userdata is the stream,
 * which must be in the CREATING state.
 *
 * A non-reply command is treated as an error and the stream is moved to
 * FAILED. A reply is parsed field by field, with the set of fields depending
 * on the stream direction and the context's protocol version: channel,
 * stream index, requested bytes, buffer metrics (>= 9), negotiated sample
 * spec / channel map / device / suspended flag (>= 12), configured latency
 * (>= 13) and the negotiated format info (>= 21 for playback, >= 22 for
 * all). Any parse problem or inconsistent value fails the context with
 * PA_ERR_PROTOCOL.
 *
 * On success a record queue is created for record streams, the stream is
 * registered under its channel and create_stream_complete() is called.
 *
 * NOTE(review): several lines (local declarations, else heads, the
 * statements after failures, the storing of the parsed format, the cleanup
 * label) are not visible in this listing. */
998 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
999 pa_stream *s = userdata;
1000 uint32_t requested_bytes = 0;
1004 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1005 pa_assert(s->state == PA_STREAM_CREATING);
1009 if (command != PA_COMMAND_REPLY) {
1010 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1013 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Upload streams have no stream index; record streams get no initial
 * request. */
1017 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1018 s->channel == PA_INVALID_INDEX ||
1019 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1020 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1021 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1025 s->requested_bytes = (int64_t) requested_bytes;
/* The server reports the buffer metrics it actually chose. */
1027 if (s->context->version >= 9) {
1028 if (s->direction == PA_STREAM_PLAYBACK) {
1029 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1030 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1031 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1032 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1033 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1036 } else if (s->direction == PA_STREAM_RECORD) {
1037 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1038 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1039 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1045 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1048 const char *dn = NULL;
1049 pa_bool_t suspended;
1051 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1052 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1053 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1054 pa_tagstruct_gets(t, &dn) < 0 ||
1055 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1056 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* For streams created without a format list, the server may only deviate
 * from the requested format, rate or channel map when the matching
 * PA_STREAM_FIX_* flag was set. */
1060 if (!dn || s->device_index == PA_INVALID_INDEX ||
1061 ss.channels != cm.channels ||
1062 !pa_channel_map_valid(&cm) ||
1063 !pa_sample_spec_valid(&ss) ||
1064 (s->n_formats == 0 && (
1065 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1066 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1067 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1068 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1072 pa_xfree(s->device_name);
1073 s->device_name = pa_xstrdup(dn);
1074 s->suspended = suspended;
1076 s->channel_map = cm;
1077 s->sample_spec = ss;
1080 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1083 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1084 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1088 if (s->direction == PA_STREAM_RECORD)
1089 s->timing_info.configured_source_usec = usec;
1091 s->timing_info.configured_sink_usec = usec;
1094 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1095 || s->context->version >= 22) {
/* NOTE(review): the return value of the parse call is not checked here;
 * the result is judged by pa_format_info_valid() instead. */
1097 pa_format_info *f = pa_format_info_new();
1098 pa_tagstruct_get_format_info(t, f);
1100 if (pa_format_info_valid(f))
1103 pa_format_info_free(f);
1104 if (s->n_formats > 0) {
1105 /* We used the extended API, so we should have got back a proper format */
1106 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1112 if (!pa_tagstruct_eof(t)) {
1113 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1117 if (s->direction == PA_STREAM_RECORD) {
1118 pa_assert(!s->record_memblockq);
1120 s->record_memblockq = pa_memblockq_new(
1122 s->buffer_attr.maxlength,
1124 pa_frame_size(&s->sample_spec),
/* From here on the channel is registered and must be removed again on
 * unlink. */
1131 s->channel_valid = TRUE;
1132 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1134 create_stream_complete(s);
1140 static int create_stream(
1141 pa_stream_direction_t direction,
1144 const pa_buffer_attr *attr,
1145 pa_stream_flags_t flags,
1146 const pa_cvolume *volume,
1147 pa_stream *sync_stream) {
1151 pa_bool_t volume_set = !!volume;
1156 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1157 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1159 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1160 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1161 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1162 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1163 PA_STREAM_INTERPOLATE_TIMING|
1164 PA_STREAM_NOT_MONOTONIC|
1165 PA_STREAM_AUTO_TIMING_UPDATE|
1166 PA_STREAM_NO_REMAP_CHANNELS|
1167 PA_STREAM_NO_REMIX_CHANNELS|
1168 PA_STREAM_FIX_FORMAT|
1170 PA_STREAM_FIX_CHANNELS|
1171 PA_STREAM_DONT_MOVE|
1172 PA_STREAM_VARIABLE_RATE|
1173 PA_STREAM_PEAK_DETECT|
1174 PA_STREAM_START_MUTED|
1175 PA_STREAM_ADJUST_LATENCY|
1176 PA_STREAM_EARLY_REQUESTS|
1177 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1178 PA_STREAM_START_UNMUTED|
1179 PA_STREAM_FAIL_ON_SUSPEND|
1180 PA_STREAM_RELATIVE_VOLUME|
1181 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1184 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1185 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1186 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1187 /* Althought some of the other flags are not supported on older
1188 * version, we don't check for them here, because it doesn't hurt
1189 * when they are passed but actually not supported. This makes
1190 * client development easier */
1192 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1193 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1194 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1195 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1196 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1200 s->direction = direction;
1203 s->syncid = sync_stream->syncid;
1206 s->buffer_attr = *attr;
1207 patch_buffer_attr(s, &s->buffer_attr, &flags);
1210 s->corked = !!(flags & PA_STREAM_START_CORKED);
1212 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1215 x = pa_rtclock_now();
1217 pa_assert(!s->smoother);
1218 s->smoother = pa_smoother_new(
1219 SMOOTHER_ADJUST_TIME,
1220 SMOOTHER_HISTORY_TIME,
1221 !(flags & PA_STREAM_NOT_MONOTONIC),
1223 SMOOTHER_MIN_HISTORY,
1229 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1231 t = pa_tagstruct_command(
1233 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1236 if (s->context->version < 13)
1237 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1241 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1242 PA_TAG_CHANNEL_MAP, &s->channel_map,
1243 PA_TAG_U32, PA_INVALID_INDEX,
1245 PA_TAG_U32, s->buffer_attr.maxlength,
1246 PA_TAG_BOOLEAN, s->corked,
1250 if (pa_sample_spec_valid(&s->sample_spec))
1251 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1253 /* This is not really relevant, since no volume was set, and
1254 * the real number of channels is embedded in the format_info
1256 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1260 if (s->direction == PA_STREAM_PLAYBACK) {
1263 PA_TAG_U32, s->buffer_attr.tlength,
1264 PA_TAG_U32, s->buffer_attr.prebuf,
1265 PA_TAG_U32, s->buffer_attr.minreq,
1266 PA_TAG_U32, s->syncid,
1269 pa_tagstruct_put_cvolume(t, volume);
1271 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1273 if (s->context->version >= 12) {
1276 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1277 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1278 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1279 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1280 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1281 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1282 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1286 if (s->context->version >= 13) {
1288 if (s->direction == PA_STREAM_PLAYBACK)
1289 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1291 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1295 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1296 PA_TAG_PROPLIST, s->proplist,
1299 if (s->direction == PA_STREAM_RECORD)
1300 pa_tagstruct_putu32(t, s->direct_on_input);
1303 if (s->context->version >= 14) {
1305 if (s->direction == PA_STREAM_PLAYBACK)
1306 pa_tagstruct_put_boolean(t, volume_set);
1308 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1311 if (s->context->version >= 15) {
1313 if (s->direction == PA_STREAM_PLAYBACK)
1314 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1316 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1317 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1320 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1321 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1323 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1324 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1326 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1327 || s->context->version >= 22) {
1329 pa_tagstruct_putu8(t, s->n_formats);
1330 for (i = 0; i < s->n_formats; i++)
1331 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1334 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1335 pa_tagstruct_put_cvolume(t, volume);
1336 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1337 pa_tagstruct_put_boolean(t, volume_set);
1338 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1340 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1343 pa_pstream_send_tagstruct(s->context->pstream, t);
1344 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1346 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Public entry point for connecting a stream for playback.
 *
 * Asserts that the stream's reference count is at least 1, then forwards the
 * arguments to create_stream() with the direction fixed to
 * PA_STREAM_PLAYBACK and returns create_stream()'s result unchanged. */
1352 int pa_stream_connect_playback(
1355 const pa_buffer_attr *attr,
1356 pa_stream_flags_t flags,
1357 const pa_cvolume *volume,
1358 pa_stream *sync_stream) {
1361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1363 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Public entry point for connecting a stream for recording.
 *
 * Asserts that the stream's reference count is at least 1, then forwards to
 * create_stream() with the direction fixed to PA_STREAM_RECORD. The volume
 * and sync_stream arguments of create_stream() are passed as NULL. The
 * result of create_stream() is returned unchanged. */
1366 int pa_stream_connect_record(
1369 const pa_buffer_attr *attr,
1370 pa_stream_flags_t flags) {
1373 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1375 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Hands the caller a buffer it can fill before calling pa_stream_write().
 *
 * Validity checks (with the error code reported for each):
 *   - not running in a forked child              (PA_ERR_FORKED)
 *   - stream is in PA_STREAM_READY               (PA_ERR_BADSTATE)
 *   - direction is PLAYBACK or UPLOAD            (PA_ERR_BADSTATE)
 *   - data is non-NULL                           (PA_ERR_INVALID)
 *   - nbytes is non-NULL and *nbytes is non-zero (PA_ERR_INVALID)
 *
 * On the way out, *data is set to s->write_data and *nbytes to the length of
 * s->write_memblock. */
1378 int pa_stream_begin_write(
1384 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1386 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1387 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1388 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1389 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1390 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
/* (size_t) -1 is treated specially: only other values enter this branch,
 * which looks up the pool's maximum block size (m) and the frame size (fs).
 * NOTE(review): the statements that consume m and fs are not visible here;
 * presumably they clamp/round *nbytes -- confirm. */
1392 if (*nbytes != (size_t) -1) {
1395 m = pa_mempool_block_size_max(s->context->mempool);
1396 fs = pa_frame_size(&s->sample_spec);
/* Allocate and acquire a block only when none is pending; an already
 * pending block is handed out again as is. */
1403 if (!s->write_memblock) {
1404 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1405 s->write_data = pa_memblock_acquire(s->write_memblock);
1408 *data = s->write_data;
1409 *nbytes = pa_memblock_get_length(s->write_memblock);
/* Discards the write buffer that is pending in s->write_memblock.
 *
 * Validity checks (with the error code reported for each):
 *   - not running in a forked child     (PA_ERR_FORKED)
 *   - stream is in PA_STREAM_READY      (PA_ERR_BADSTATE)
 *   - direction is PLAYBACK or UPLOAD   (PA_ERR_BADSTATE)
 *   - a write memblock is pending       (PA_ERR_BADSTATE)
 *
 * The block is released (undoing the earlier acquire), unreferenced, and
 * both s->write_memblock and s->write_data are reset to NULL. */
1414 int pa_stream_cancel_write(
1418 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1420 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1421 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1422 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1423 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1425 pa_assert(s->write_data);
1427 pa_memblock_release(s->write_memblock);
1428 pa_memblock_unref(s->write_memblock);
1429 s->write_memblock = NULL;
1430 s->write_data = NULL;
/* Sends `length` bytes starting at `data` to the server on the stream's
 * channel, positioned according to `offset` and `seek`.
 *
 * Two paths exist:
 *   - a block obtained through pa_stream_begin_write() is pending: that block
 *     is sent directly (no copy) and the pending state is cleared;
 *   - otherwise the data is sent in one or more chunks created inside the
 *     loop below.
 *
 * After sending, s->requested_bytes is reduced, and for playback streams the
 * current write index correction entry and the cached timing info are
 * updated to reflect the write.
 *
 * Validity checks visible here: not forked, stream READY, direction PLAYBACK
 * or UPLOAD, seek <= PA_SEEK_RELATIVE_END, non-playback streams may only use
 * PA_SEEK_RELATIVE with offset 0, `data`/`length` must lie within a pending
 * write block, and free_cb must not be combined with a pending write block. */
1435 int pa_stream_write(
1439 pa_free_cb_t free_cb,
1441 pa_seek_mode_t seek) {
1444 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1447 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1448 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1449 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1450 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1451 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
/* With a pending write block, [data, data + length) has to lie inside it. */
1452 PA_CHECK_VALIDITY(s->context,
1453 !s->write_memblock ||
1454 ((data >= s->write_data) &&
1455 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1457 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1459 if (s->write_memblock) {
1462 /* pa_stream_begin_write() was called before */
1464 pa_memblock_release(s->write_memblock);
/* The chunk refers to the caller's region inside the pending block:
 * index is the byte distance of `data` from the start of the block. */
1466 chunk.memblock = s->write_memblock;
1467 chunk.index = (const char *) data - (const char *) s->write_data;
1468 chunk.length = length;
1470 s->write_memblock = NULL;
1471 s->write_data = NULL;
1473 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1474 pa_memblock_unref(chunk.memblock);
/* Working copies: the loop below advances them chunk by chunk while the
 * original arguments stay available for the bookkeeping afterwards. */
1477 pa_seek_mode_t t_seek = seek;
1478 int64_t t_offset = offset;
1479 size_t t_length = length;
1480 const void *t_data = data;
1482 /* pa_stream_begin_write() was not called before */
1484 while (t_length > 0) {
/* With a free_cb and a pstream that does not use SHM, the caller's
 * memory is wrapped as a user memblock covering all remaining bytes. */
1489 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1490 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1491 chunk.length = t_length;
/* Copy path: at most pa_mempool_block_size_max() bytes are copied into a
 * freshly allocated block per iteration. */
1495 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1496 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1498 d = pa_memblock_acquire(chunk.memblock);
1499 memcpy(d, t_data, chunk.length);
1500 pa_memblock_release(chunk.memblock);
1503 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* Chunks after the first one are positioned relative to the write index. */
1506 t_seek = PA_SEEK_RELATIVE;
1508 t_data = (const uint8_t*) t_data + chunk.length;
1509 t_length -= chunk.length;
1511 pa_memblock_unref(chunk.memblock);
/* With SHM the user-memory path above is never taken, so the caller's
 * buffer is handed to free_cb right away. */
1514 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1515 free_cb((void*) data);
1518 /* This is obviously wrong since we ignore the seeking index. But
1519 * that's OK, the server side applies the same error */
1520 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1522 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1524 if (s->direction == PA_STREAM_PLAYBACK) {
1526 /* Update latency request correction */
1527 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1529 if (seek == PA_SEEK_ABSOLUTE) {
1530 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1531 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1532 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1533 } else if (seek == PA_SEEK_RELATIVE) {
1534 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1535 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1537 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1540 /* Update the write index in the already available latency data */
1541 if (s->timing_info_valid) {
1543 if (seek == PA_SEEK_ABSOLUTE) {
1544 s->timing_info.write_index_corrupt = FALSE;
1545 s->timing_info.write_index = offset + (int64_t) length;
1546 } else if (seek == PA_SEEK_RELATIVE) {
1547 if (!s->timing_info.write_index_corrupt)
1548 s->timing_info.write_index += offset + (int64_t) length;
1550 s->timing_info.write_index_corrupt = TRUE;
/* Without usable timing data, ask the server for a fresh update. */
1553 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1554 request_auto_timing_update(s, TRUE);
/* Exposes the next chunk of recorded data without removing it from the
 * record queue.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * RECORD (both PA_ERR_BADSTATE).
 *
 * On the visible success path *data points at the chunk's payload (acquired
 * block pointer plus chunk index) and *length holds the chunk length. The
 * chunk stays cached in s->peek_memchunk, so repeated calls return the same
 * data until it is dropped with pa_stream_drop(). */
1560 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1562 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1566 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1567 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1568 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
/* Only fetch from the queue when no chunk is currently cached. */
1570 if (!s->peek_memchunk.memblock) {
/* NOTE(review): what happens when the queue peek fails is not visible in
 * this listing -- confirm against the full file. */
1572 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1578 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1581 pa_assert(s->peek_data);
1582 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1583 *length = s->peek_memchunk.length;
/* Removes the chunk cached in s->peek_memchunk from the record queue.
 *
 * Validity checks: not forked (PA_ERR_FORKED); stream READY, direction
 * RECORD and a cached peek chunk present (all PA_ERR_BADSTATE).
 *
 * The chunk's length is dropped from s->record_memblockq, the locally
 * simulated read index is advanced by the same amount (only when the timing
 * info is valid and the read index is not marked corrupt), and the cached
 * block is released, unreferenced and reset. */
1587 int pa_stream_drop(pa_stream *s) {
1589 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1591 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1592 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1593 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1594 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1596 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1598 /* Fix the simulated local read index */
1599 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1600 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1602 pa_assert(s->peek_data);
1603 pa_memblock_release(s->peek_memchunk.memblock);
1604 pa_memblock_unref(s->peek_memchunk.memblock);
1605 pa_memchunk_reset(&s->peek_memchunk);
/* Returns the number of bytes currently requested from this client.
 *
 * s->requested_bytes may be zero or negative; in that case 0 is returned,
 * otherwise its value cast to size_t.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * not RECORD (both PA_ERR_BADSTATE). Each check passes (size_t) -1 to
 * PA_CHECK_VALIDITY_RETURN_ANY as the value for the failure case. */
1610 size_t pa_stream_writable_size(pa_stream *s) {
1612 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1615 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1616 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1618 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
/* Returns the number of bytes currently held in the stream's record queue
 * (s->record_memblockq).
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * RECORD (both PA_ERR_BADSTATE). Each check passes (size_t) -1 to
 * PA_CHECK_VALIDITY_RETURN_ANY as the value for the failure case. */
1621 size_t pa_stream_readable_size(pa_stream *s) {
1623 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1625 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1626 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1627 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1629 return pa_memblockq_get_length(s->record_memblockq);
/* Sends PA_COMMAND_DRAIN_PLAYBACK_STREAM for the stream's channel.
 *
 * A pa_operation carrying cb/userdata is created, and
 * pa_stream_simple_ack_callback is registered for the reply with its own
 * reference on that operation (dropped through pa_operation_unref). Timing
 * updates are requested both before and after the command is sent.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * PLAYBACK (both PA_ERR_BADSTATE). */
1632 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1638 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1640 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1644 /* Ask for a timing update before the drain request is sent to get the best
1645 * accuracy for the transport latency suitable for the
1646 * check_smoother_status() call in the started callback */
1647 request_auto_timing_update(s, TRUE);
1649 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1651 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1652 pa_tagstruct_putu32(t, s->channel);
1653 pa_pstream_send_tagstruct(s->context->pstream, t);
1654 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1656 /* This might cause the read index to continue again, hence
1657 * let's request a timing update */
1658 request_auto_timing_update(s, TRUE);
/* Derives the current stream time, in microseconds, from the cached timing
 * info in s->timing_info.
 *
 * Preconditions (asserted): stream READY, direction not UPLOAD, timing info
 * valid, and the index used for the stream's direction (read index for
 * playback, write index for record) not marked corrupt.
 *
 * Playback: starts from read_index converted to time (a negative index is
 * treated as 0). While the stream is neither corked nor suspended,
 * transport_usec is added unless ignore_transport is set, and sink_usec is
 * subtracted; the sink_usec >= usec comparison guards that subtraction.
 *
 * Record: starts from write_index converted to time (negative treated as 0).
 * While neither corked nor suspended, transport_usec (unless
 * ignore_transport) and source_usec are added, and sink_usec is subtracted
 * behind the same kind of guard.
 *
 * NOTE(review): the statement executed when sink_usec >= usec and the final
 * return are not visible in this listing -- confirm against the full file. */
1663 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1667 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1668 pa_assert(s->state == PA_STREAM_READY);
1669 pa_assert(s->direction != PA_STREAM_UPLOAD);
1670 pa_assert(s->timing_info_valid);
1671 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1672 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1674 if (s->direction == PA_STREAM_PLAYBACK) {
1675 /* The last byte that was written into the output device
1676 * had this time value associated */
1677 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1679 if (!s->corked && !s->suspended) {
1681 if (!ignore_transport)
1682 /* Because the latency info took a little time to come
1683 * to us, we assume that the real output time is actually
1685 usec += s->timing_info.transport_usec;
1687 /* However, the output device usually maintains a buffer
1688 too, hence the real sample currently played is a little
1690 if (s->timing_info.sink_usec >= usec)
1693 usec -= s->timing_info.sink_usec;
1697 pa_assert(s->direction == PA_STREAM_RECORD);
1699 /* The last byte written into the server side queue had
1700 * this time value associated */
1701 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1703 if (!s->corked && !s->suspended) {
1705 if (!ignore_transport)
1706 /* Add transport latency */
1707 usec += s->timing_info.transport_usec;
1709 /* Add latency of data in device buffer */
1710 usec += s->timing_info.source_usec;
1712 /* If this is a monitor source, we need to correct the
1713 * time by the playback device buffer */
1714 if (s->timing_info.sink_usec >= usec)
1717 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query sent by pa_stream_update_timing_info().
 * `userdata` is the pa_operation created there.
 *
 * Steps, in order:
 *   1. mark the stream's timing info invalid and both indexes corrupt;
 *   2. on an error reply hand it to pa_context_handle_error(); otherwise
 *      parse the reply fields, failing the context with PA_ERR_PROTOCOL on
 *      any parse error or trailing data;
 *   3. mark the timing info valid again and compute transport_usec and the
 *      timestamp, depending on whether the local and remote clocks look
 *      synchronized;
 *   4. invalidate indexes for replies older than the stream's
 *      read/write_index_not_before tags;
 *   5. playback: replay the saved write index corrections and expire the
 *      ones covered by this reply; record: subtract the locally queued
 *      bytes from the read index;
 *   6. feed the smoother (when present and the stream is not corked);
 *   7. clear auto_timing_update_requested and invoke the latency update
 *      callback;
 *   8. invoke the operation's callback with the validity flag, then mark the
 *      operation done and drop the reference held for this handler. */
1724 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1725 pa_operation *o = userdata;
1726 struct timeval local, remote, now;
1728 pa_bool_t playing = FALSE;
1729 uint64_t underrun_for = 0, playing_for = 0;
1733 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1735 if (!o->context || !o->stream)
1738 i = &o->stream->timing_info;
/* Pessimistic default: everything is invalid until the reply has been
 * parsed completely (the flags are flipped back further down). */
1740 o->stream->timing_info_valid = FALSE;
1741 i->write_index_corrupt = TRUE;
1742 i->read_index_corrupt = TRUE;
1744 if (command != PA_COMMAND_REPLY) {
1745 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1750 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1751 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1752 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1753 pa_tagstruct_get_timeval(t, &local) < 0 ||
1754 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1755 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1756 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1758 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* For playback streams with context->version >= 13 two more u64 fields
 * (underrun_for, playing_for) are read from the reply. */
1762 if (o->context->version >= 13 &&
1763 o->stream->direction == PA_STREAM_PLAYBACK)
1764 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1765 pa_tagstruct_getu64(t, &playing_for) < 0) {
1767 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1772 if (!pa_tagstruct_eof(t)) {
1773 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1776 o->stream->timing_info_valid = TRUE;
1777 i->write_index_corrupt = FALSE;
1778 i->read_index_corrupt = FALSE;
1780 i->playing = (int) playing;
1781 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1783 pa_gettimeofday(&now);
1785 /* Calculate timestamps */
1786 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1787 /* local and remote seem to have synchronized clocks */
1789 if (o->stream->direction == PA_STREAM_PLAYBACK)
1790 i->transport_usec = pa_timeval_diff(&remote, &local);
1792 i->transport_usec = pa_timeval_diff(&now, &remote);
1794 i->synchronized_clocks = TRUE;
1795 i->timestamp = remote;
1797 /* clocks are not synchronized, let's estimate latency then */
1798 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1799 i->synchronized_clocks = FALSE;
1800 i->timestamp = local;
1801 pa_timeval_add(&i->timestamp, i->transport_usec);
1804 /* Invalidate read and write indexes if necessary */
1805 if (tag < o->stream->read_index_not_before)
1806 i->read_index_corrupt = TRUE;
1808 if (tag < o->stream->write_index_not_before)
1809 i->write_index_corrupt = TRUE;
1811 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1812 /* Write index correction */
1815 uint32_t ctag = tag;
1817 /* Go through the saved correction values and add up the
1818 * total correction. */
/* The walk starts one past the current slot and visits every slot of the
 * ring exactly once. */
1819 for (n = 0, j = o->stream->current_write_index_correction+1;
1820 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1821 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1823 /* Step over invalid data or out-of-date data */
1824 if (!o->stream->write_index_corrections[j].valid ||
1825 o->stream->write_index_corrections[j].tag < ctag)
1828 /* Make sure that everything is in order */
1829 ctag = o->stream->write_index_corrections[j].tag+1;
1831 /* Now fix the write index */
1832 if (o->stream->write_index_corrections[j].corrupt) {
1833 /* A corrupting seek was made */
1834 i->write_index_corrupt = TRUE;
1835 } else if (o->stream->write_index_corrections[j].absolute) {
1836 /* An absolute seek was made */
1837 i->write_index = o->stream->write_index_corrections[j].value;
1838 i->write_index_corrupt = FALSE;
1839 } else if (!i->write_index_corrupt) {
1840 /* A relative seek was made */
1841 i->write_index += o->stream->write_index_corrections[j].value;
1845 /* Clear old correction entries */
1846 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1847 if (!o->stream->write_index_corrections[n].valid)
1850 if (o->stream->write_index_corrections[n].tag <= tag)
1851 o->stream->write_index_corrections[n].valid = FALSE;
1855 if (o->stream->direction == PA_STREAM_RECORD) {
1856 /* Read index correction */
1858 if (!i->read_index_corrupt)
1859 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1862 /* Update smoother if we're not corked */
1863 if (o->stream->smoother && !o->stream->corked) {
/* u is the time passed to pa_smoother_put(); x starts out equal to it,
 * may be shifted below, and is used for pause/resume. */
1866 u = x = pa_rtclock_now() - i->transport_usec;
1868 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1871 /* If we weren't playing then it will take some time
1872 * until the audio will actually come out through the
1873 * speakers. Since we follow that timing here, we need
1874 * to try to fix this up */
1876 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1878 if (su < i->sink_usec)
1879 x += i->sink_usec - su;
1883 pa_smoother_pause(o->stream->smoother, x);
1885 /* Update the smoother */
1886 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1887 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1888 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1891 pa_smoother_resume(o->stream->smoother, x, TRUE);
1895 o->stream->auto_timing_update_requested = FALSE;
1897 if (o->stream->latency_update_callback)
1898 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* The operation's callback is only invoked while the stream is READY. */
1900 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1901 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1902 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1907 pa_operation_done(o);
1908 pa_operation_unref(o);
/* Asks the server for fresh latency/timing data for this stream.
 *
 * Sends PA_COMMAND_GET_PLAYBACK_LATENCY or PA_COMMAND_GET_RECORD_LATENCY
 * (chosen by direction) carrying the channel and the current time of day,
 * and registers stream_get_timing_info_callback for the reply with its own
 * reference on the new operation.
 *
 * For playback streams the next slot in the write index correction ring is
 * reserved for this query. If that slot is still valid, the call fails with
 * PA_ERR_INTERNAL before anything is sent. After sending, the slot becomes
 * the current one and is initialized with the query's tag and a zero value.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * not UPLOAD (both PA_ERR_BADSTATE). */
1911 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1919 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1925 if (s->direction == PA_STREAM_PLAYBACK) {
1926 /* Find a place to store the write_index correction data for this entry */
1927 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1929 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1930 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1932 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1934 t = pa_tagstruct_command(
1936 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1938 pa_tagstruct_putu32(t, s->channel);
1939 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1941 pa_pstream_send_tagstruct(s->context->pstream, t);
1942 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1944 if (s->direction == PA_STREAM_PLAYBACK) {
1945 /* Fill in initial correction data */
1947 s->current_write_index_correction = cidx;
1949 s->write_index_corrections[cidx].valid = TRUE;
1950 s->write_index_corrections[cidx].absolute = FALSE;
1951 s->write_index_corrections[cidx].corrupt = FALSE;
1952 s->write_index_corrections[cidx].tag = tag;
1953 s->write_index_corrections[cidx].value = 0;
/* Reply handler for the delete-stream command sent by pa_stream_disconnect().
 * `userdata` is the stream itself.
 *
 * An error reply is handed to pa_context_handle_error() and the stream is
 * moved to PA_STREAM_FAILED. A reply with unexpected trailing data fails the
 * context with PA_ERR_PROTOCOL. PA_STREAM_TERMINATED is the remaining state
 * transition.
 *
 * NOTE(review): the jumps that separate these branches are not visible in
 * this listing; presumably each failure branch skips the TERMINATED
 * transition -- confirm against the full file. */
1959 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1960 pa_stream *s = userdata;
1964 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968 if (command != PA_COMMAND_REPLY) {
1969 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1972 pa_stream_set_state(s, PA_STREAM_FAILED);
1974 } else if (!pa_tagstruct_eof(t)) {
1975 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1979 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Asks the server to delete this stream.
 *
 * The command is chosen by direction: PA_COMMAND_DELETE_PLAYBACK_STREAM,
 * PA_COMMAND_DELETE_RECORD_STREAM, or PA_COMMAND_DELETE_UPLOAD_STREAM for
 * anything else. It carries the stream's channel, and
 * pa_stream_disconnect_callback is registered for the reply with the stream
 * as userdata.
 *
 * Validity checks: not forked (PA_ERR_FORKED); the stream has a valid
 * channel and the context is PA_CONTEXT_READY (both PA_ERR_BADSTATE). */
1985 int pa_stream_disconnect(pa_stream *s) {
1990 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1993 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1994 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1998 t = pa_tagstruct_command(
2000 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2001 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2003 pa_tagstruct_putu32(t, s->channel);
2004 pa_pstream_send_tagstruct(s->context->pstream, t);
2005 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
/* Stores cb and userdata as the stream's read callback
 * (s->read_callback / s->read_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2011 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015 if (pa_detect_fork())
2018 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2021 s->read_callback = cb;
2022 s->read_userdata = userdata;
/* Stores cb and userdata as the stream's write callback
 * (s->write_callback / s->write_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2025 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2029 if (pa_detect_fork())
2032 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2035 s->write_callback = cb;
2036 s->write_userdata = userdata;
/* Stores cb and userdata as the stream's state change callback
 * (s->state_callback / s->state_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2039 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2041 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2043 if (pa_detect_fork())
2046 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2049 s->state_callback = cb;
2050 s->state_userdata = userdata;
/* Stores cb and userdata as the stream's overflow callback
 * (s->overflow_callback / s->overflow_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2053 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2055 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2057 if (pa_detect_fork())
2060 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2063 s->overflow_callback = cb;
2064 s->overflow_userdata = userdata;
/* Stores cb and userdata as the stream's underflow callback
 * (s->underflow_callback / s->underflow_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2067 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2069 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2071 if (pa_detect_fork())
2074 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2077 s->underflow_callback = cb;
2078 s->underflow_userdata = userdata;
/* Stores cb and userdata as the stream's latency update callback
 * (s->latency_update_callback / s->latency_update_userdata). This is the
 * callback that stream_get_timing_info_callback() invokes after processing
 * a timing reply.
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2081 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2083 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2085 if (pa_detect_fork())
2088 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2091 s->latency_update_callback = cb;
2092 s->latency_update_userdata = userdata;
/* Stores cb and userdata as the stream's "moved" callback
 * (s->moved_callback / s->moved_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2095 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2097 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099 if (pa_detect_fork())
2102 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2105 s->moved_callback = cb;
2106 s->moved_userdata = userdata;
/* Stores cb and userdata as the stream's "suspended" callback
 * (s->suspended_callback / s->suspended_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2109 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2111 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2113 if (pa_detect_fork())
2116 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2119 s->suspended_callback = cb;
2120 s->suspended_userdata = userdata;
/* Stores cb and userdata as the stream's "started" callback
 * (s->started_callback / s->started_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2123 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2125 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127 if (pa_detect_fork())
2130 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2133 s->started_callback = cb;
2134 s->started_userdata = userdata;
/* Stores cb and userdata as the stream's event callback
 * (s->event_callback / s->event_userdata). Unlike the notify-style setters
 * in this file, cb has the type pa_stream_event_cb_t.
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2137 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2141 if (pa_detect_fork())
2144 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2147 s->event_callback = cb;
2148 s->event_userdata = userdata;
/* Stores cb and userdata as the stream's buffer attribute callback
 * (s->buffer_attr_callback / s->buffer_attr_userdata).
 *
 * The assignment is preceded by a pa_detect_fork() test and by a test for
 * the PA_STREAM_TERMINATED / PA_STREAM_FAILED states.
 * NOTE(review): the bodies of those two tests are not visible here;
 * presumably they return early and leave the callback untouched -- confirm. */
2151 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2153 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2155 if (pa_detect_fork())
2158 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2161 s->buffer_attr_callback = cb;
2162 s->buffer_attr_userdata = userdata;
/* Generic reply handler for stream commands whose reply carries no payload.
 * `userdata` is the pa_operation registered together with the command.
 *
 * An error reply is handed to pa_context_handle_error(); a reply with
 * trailing data fails the context with PA_ERR_PROTOCOL. The operation's
 * callback is invoked as a pa_stream_success_cb_t with the stream, the
 * `success` flag and the operation's userdata. Finally the operation is
 * marked done and the reference held for this handler is dropped.
 *
 * NOTE(review): how `success` is initialized/cleared and the guard around
 * the callback invocation are not visible in this listing -- confirm. */
2165 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2166 pa_operation *o = userdata;
2171 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2176 if (command != PA_COMMAND_REPLY) {
2177 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2181 } else if (!pa_tagstruct_eof(t)) {
2182 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2187 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2188 cb(o->stream, success, o->userdata);
2192 pa_operation_done(o);
2193 pa_operation_unref(o);
/* Sends a cork/uncork request for the stream.
 *
 * The command is PA_COMMAND_CORK_PLAYBACK_STREAM for playback streams and
 * PA_COMMAND_CORK_RECORD_STREAM otherwise. It carries the channel and `b`
 * normalized to a boolean. pa_stream_simple_ack_callback is registered for
 * the reply with its own reference on the new operation. Timing updates are
 * requested before and after the command, and check_smoother_status() is
 * called once the command has been queued.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * not UPLOAD (both PA_ERR_BADSTATE). */
2196 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2206 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2208 /* Ask for a timing update before we cork/uncork to get the best
2209 * accuracy for the transport latency suitable for the
2210 * check_smoother_status() call in the started callback */
2211 request_auto_timing_update(s, TRUE);
2215 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2217 t = pa_tagstruct_command(
2219 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2221 pa_tagstruct_putu32(t, s->channel);
2222 pa_tagstruct_put_boolean(t, !!b);
2223 pa_pstream_send_tagstruct(s->context->pstream, t);
2224 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2226 check_smoother_status(s, FALSE, FALSE, FALSE);
2228 /* This might cause the indexes to hang/start again, hence let's
2229 * request a timing update, after the cork/uncork, too */
2230 request_auto_timing_update(s, TRUE);
/* Shared helper for stream commands whose only argument is the channel.
 *
 * Creates a pa_operation carrying cb/userdata, sends `command` with the
 * stream's channel, and registers pa_stream_simple_ack_callback for the
 * reply with its own reference on the operation (dropped through
 * pa_operation_unref).
 *
 * Validity checks: not forked (PA_ERR_FORKED) and stream READY
 * (PA_ERR_BADSTATE). Direction checks are left to the callers. */
2235 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2241 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2243 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2244 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2246 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2248 t = pa_tagstruct_command(s->context, command, &tag);
2249 pa_tagstruct_putu32(t, s->channel);
2250 pa_pstream_send_tagstruct(s->context->pstream, t);
2251 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Sends a flush request for the stream through stream_send_simple_command().
 *
 * The command is PA_COMMAND_FLUSH_PLAYBACK_STREAM for playback streams and
 * PA_COMMAND_FLUSH_RECORD_STREAM otherwise. A timing update is requested
 * before the command is sent.
 *
 * Playback bookkeeping after the command: the current write index correction
 * entry (if valid) is marked corrupt, check_smoother_status() is called when
 * prebuf > 0, and invalidate_indexes(s, FALSE, TRUE) is called. The other
 * visible call is invalidate_indexes(s, TRUE, FALSE), described by its
 * comment as the record case.
 *
 * Validity checks: not forked (PA_ERR_FORKED), stream READY and direction
 * not UPLOAD (both PA_ERR_BADSTATE). */
2256 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2262 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2263 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2264 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2266 /* Ask for a timing update *before* the flush, so that the
2267 * transport usec is as up to date as possible when we get the
2268 * underflow message and update the smoother status */
2269 request_auto_timing_update(s, TRUE);
2271 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2274 if (s->direction == PA_STREAM_PLAYBACK) {
2276 if (s->write_index_corrections[s->current_write_index_correction].valid)
2277 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2279 if (s->buffer_attr.prebuf > 0)
2280 check_smoother_status(s, FALSE, FALSE, TRUE);
2282 /* This will change the write index, but leave the
2283 * read index untouched. */
2284 invalidate_indexes(s, FALSE, TRUE);
2287 /* For record streams this has no influence on the write
2288 * index, but the read index might jump. */
2289 invalidate_indexes(s, TRUE, FALSE);
2291 /* Note that we do not update requested_bytes here. This is
2292 * because we cannot really know how much data was actually dropped
2293 * from the write index due to this. This 'error' will be applied
2294 * by both client and server and hence we should be fine. */
/* Sends PA_COMMAND_PREBUF_PLAYBACK_STREAM through
 * stream_send_simple_command(), with timing updates requested before and
 * after the command.
 *
 * Validity checks: not forked (PA_ERR_FORKED); stream READY, direction
 * PLAYBACK and buffer_attr.prebuf > 0 (all PA_ERR_BADSTATE). */
2299 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2303 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2305 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2306 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2307 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2310 /* Ask for a timing update before the prebuf request is sent to get the best
2311 * accuracy for the transport latency suitable for the
2312 * check_smoother_status() call in the started callback */
2313 request_auto_timing_update(s, TRUE);
2315 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2318 /* This might cause the read index to hang again, hence
2319 * let's request a timing update */
2320 request_auto_timing_update(s, TRUE);
/* Sends PA_COMMAND_TRIGGER_PLAYBACK_STREAM through
 * stream_send_simple_command(), with timing updates requested before and
 * after the command.
 *
 * Validity checks: not forked (PA_ERR_FORKED); stream READY, direction
 * PLAYBACK and buffer_attr.prebuf > 0 (all PA_ERR_BADSTATE). */
2325 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2331 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2332 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2336 /* Ask for a timing update before the trigger request is sent to get the best
2337 * accuracy for the transport latency suitable for the
2338 * check_smoother_status() call in the started callback */
2339 request_auto_timing_update(s, TRUE);
2341 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2344 /* This might cause the read index to start moving again, hence
2345 * let's request a timing update */
2346 request_auto_timing_update(s, TRUE);
/* pa_stream_set_name: ask the server to rename stream s to `name`; cb/userdata
 * are attached to the resulting operation.
 * Two request shapes are visible: a proplist update when the context's
 * protocol version is >= 13, and a dedicated SET_*_STREAM_NAME command.
 * NOTE(review): the line joining the two shapes (presumably an `else`), the
 * local declarations and the return are not visible here — confirm. */
2351 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2355 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: process not forked, stream READY, not an upload stream. */
2358 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362 if (s->context->version >= 13) {
/* Wrap the name in a temporary proplist under PA_PROP_MEDIA_NAME, submit
 * it with PA_UPDATE_REPLACE, then free the temporary. */
2363 pa_proplist *p = pa_proplist_new();
2365 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2366 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2367 pa_proplist_free(p);
/* Dedicated rename command: record vs. playback variant is chosen from
 * s->direction; payload is the stream channel followed by the name. */
2372 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2373 t = pa_tagstruct_command(
2375 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2377 pa_tagstruct_putu32(t, s->channel);
2378 pa_tagstruct_puts(t, name);
2379 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler is given its own reference on o, with
 * pa_operation_unref registered as the matching free callback. */
2380 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* pa_stream_get_time: compute the stream's current time in microseconds.
 * The value comes either from the smoother (pa_smoother_get at the current
 * pa_rtclock_now()) or from calc_time(s, FALSE).
 * NOTE(review): the condition choosing between the two sources, the store
 * into *r_usec and the return are not visible in this view. */
2386 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2390 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, timing info
 * available; additionally the read index must not be flagged corrupt for
 * playback, nor the write index for record (both report PA_ERR_NODATA). */
2392 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2393 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2394 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2395 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2396 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2397 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2400 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2402 usec = calc_time(s, FALSE);
2404 /* Make sure the time runs monotonically */
2405 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
/* Clamp to the last reported value so the result never moves backwards,
 * then remember it for the next call. */
2406 if (usec < s->previous_time)
2407 usec = s->previous_time;
2409 s->previous_time = usec;
/* time_counter_diff: file-local helper taking two pa_usec_t counters (a, b)
 * and an optional `negative` out-flag.
 * NOTE(review): almost all of the body is missing from this view; only the
 * branch taken when `negative` is non-NULL on a record stream is visible.
 * Presumably it yields |a - b| with the sign reported via *negative —
 * confirm against the full source. */
2418 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2428 if (negative && s->direction == PA_STREAM_RECORD) {
/* pa_stream_get_latency: derive the stream latency in microseconds into
 * *r_usec, using `negative` as the sign out-flag passed to
 * time_counter_diff().
 * The latency is the difference between the stream time from
 * pa_stream_get_time() and the byte index (write index for playback, read
 * index in the other visible branch) converted to time with the stream's
 * sample spec.
 * NOTE(review): local declarations, the `else` lines, the error return under
 * the pa_stream_get_time() check and the final return are not visible. */
2436 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, timing info
 * available; the write index must not be flagged corrupt for playback, nor
 * the read index for record (both report PA_ERR_NODATA). */
2445 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2446 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2447 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2448 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2449 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2450 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
/* A negative result from pa_stream_get_time() is treated as failure. */
2452 if ((r = pa_stream_get_time(s, &t)) < 0)
2455 if (s->direction == PA_STREAM_PLAYBACK)
2456 cindex = s->timing_info.write_index;
2458 cindex = s->timing_info.read_index;
/* Convert the byte index into a time value using the stream sample spec. */
2463 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
/* Argument order is swapped between the two directions: (c, t) for
 * playback, (t, c) otherwise. */
2465 if (s->direction == PA_STREAM_PLAYBACK)
2466 *r_usec = time_counter_diff(s, c, t, negative);
2468 *r_usec = time_counter_diff(s, t, c, negative);
/* pa_stream_get_timing_info: return a pointer to the timing info stored
 * inside the stream object (s->timing_info); the caller does not receive a
 * copy. */
2473 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2475 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, and timing info
 * marked valid (PA_ERR_NODATA otherwise). */
2477 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2478 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2479 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2480 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2482 return &s->timing_info;
/* pa_stream_get_sample_spec: return a pointer to the sample spec stored in
 * the stream object. The only guard is the fork check; no stream-state
 * check is made. */
2485 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2487 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2491 return &s->sample_spec;
/* pa_stream_get_channel_map: return a pointer to the channel map stored in
 * the stream object. The only guard is the fork check; no stream-state
 * check is made. */
2494 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2496 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500 return &s->channel_map;
/* pa_stream_get_format_info: accessor for the stream's format info, guarded
 * by a READY-state check and a fork check.
 * NOTE(review): the return statement is not visible in this view.
 * NOTE(review): unlike the sibling accessors in this file, the state check
 * runs before the fork check, so a forked caller with a non-READY stream
 * gets PA_ERR_BADSTATE rather than PA_ERR_FORKED — confirm whether this
 * ordering is intentional. */
2503 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2505 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2507 /* We don't have the format till routing is done */
2508 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2509 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2513 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2515 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2517 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2521 return &s->buffer_attr;
/* stream_set_buffer_attr_callback: reply handler registered by
 * pa_stream_set_buffer_attr(). userdata is the pa_operation.
 * It parses the server's answer into o->stream->buffer_attr and
 * o->stream->timing_info, invokes the user's success callback, then finishes
 * and unreferences the operation.
 * NOTE(review): the `success` variable, the early-exit statements after each
 * failure and any labels are not visible in this view. */
2524 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2525 pa_operation *o = userdata;
2530 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than PA_COMMAND_REPLY is routed through the context's
 * error handler. */
2535 if (command != PA_COMMAND_REPLY) {
2536 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* Playback reply carries maxlength, tlength, prebuf, minreq (in that
 * order); a read failure fails the context with PA_ERR_PROTOCOL. */
2541 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2542 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2543 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2544 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2545 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2546 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Record reply carries maxlength and fragsize. */
2549 } else if (o->stream->direction == PA_STREAM_RECORD) {
2550 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2551 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2552 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* From protocol version 13 on, one usec value follows; it is stored as the
 * configured source latency for record streams, sink latency otherwise. */
2557 if (o->stream->context->version >= 13) {
2560 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2561 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2565 if (o->stream->direction == PA_STREAM_RECORD)
2566 o->stream->timing_info.configured_source_usec = usec;
2568 o->stream->timing_info.configured_sink_usec = usec;
/* Trailing data after the expected fields is a protocol error. */
2571 if (!pa_tagstruct_eof(t)) {
2572 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Hand the outcome to the user's callback stored on the operation. */
2578 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2579 cb(o->stream, success, o->userdata);
2583 pa_operation_done(o);
2584 pa_operation_unref(o);
2588 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2592 pa_buffer_attr copy;
2595 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2598 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2603 /* Ask for a timing update before we cork/uncork to get the best
2604 * accuracy for the transport latency suitable for the
2605 * check_smoother_status() call in the started callback */
2606 request_auto_timing_update(s, TRUE);
2608 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2610 t = pa_tagstruct_command(
2612 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2614 pa_tagstruct_putu32(t, s->channel);
2617 patch_buffer_attr(s, ©, NULL);
2620 pa_tagstruct_putu32(t, attr->maxlength);
2622 if (s->direction == PA_STREAM_PLAYBACK)
2625 PA_TAG_U32, attr->tlength,
2626 PA_TAG_U32, attr->prebuf,
2627 PA_TAG_U32, attr->minreq,
2630 pa_tagstruct_putu32(t, attr->fragsize);
2632 if (s->context->version >= 13)
2633 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2635 if (s->context->version >= 14)
2636 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2638 pa_pstream_send_tagstruct(s->context->pstream, t);
2639 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2641 /* This might cause changes in the read/write indexex, hence let's
2642 * request a timing update */
2643 request_auto_timing_update(s, TRUE);
/* pa_stream_get_device_index: return s->device_index.
 * Every failing guard yields PA_INVALID_INDEX as the return value. */
2648 uint32_t pa_stream_get_device_index(pa_stream *s) {
2650 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, protocol version
 * >= 12, and a device index that is actually set. */
2652 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2653 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2654 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2655 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2656 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2658 return s->device_index;
/* pa_stream_get_device_name: return s->device_name, the string pointer held
 * by the stream object (no copy is made here). */
2661 const char *pa_stream_get_device_name(pa_stream *s) {
2663 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, protocol version
 * >= 12, and a non-NULL device name. */
2665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2666 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2667 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2668 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2669 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2671 return s->device_name;
/* pa_stream_is_suspended: return the stream's `suspended` field as an int. */
2674 int pa_stream_is_suspended(pa_stream *s) {
2676 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, stream READY, not an upload stream, protocol version
 * >= 12. */
2678 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2679 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2680 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2681 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2683 return s->suspended;
/* pa_stream_is_corked: query whether the stream is corked.
 * Guards: not forked, stream READY, not an upload stream. Unlike
 * pa_stream_is_suspended() there is no protocol-version guard.
 * NOTE(review): the return statement is not visible in this view. */
2686 int pa_stream_is_corked(pa_stream *s) {
2688 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2690 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2691 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2692 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* stream_update_sample_rate_callback: reply handler registered by
 * pa_stream_update_sample_rate(). userdata is the pa_operation, whose
 * `private` field carries the requested rate.
 * It commits the new rate to the stream's sample spec, invokes the user's
 * success callback, then finishes and unreferences the operation.
 * NOTE(review): the `success` variable and the early-exit statements after
 * each failure are not visible in this view. */
2697 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2698 pa_operation *o = userdata;
2703 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than PA_COMMAND_REPLY is routed through the context's
 * error handler. */
2708 if (command != PA_COMMAND_REPLY) {
2709 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* The reply is expected to carry no payload. */
2715 if (!pa_tagstruct_eof(t)) {
2716 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Recover the rate stashed in o->private and store it in the stream's
 * sample spec, which must still validate afterwards. */
2721 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2722 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2725 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2726 cb(o->stream, success, o->userdata);
2730 pa_operation_done(o);
2731 pa_operation_unref(o);
/* pa_stream_update_sample_rate: ask the server to change the sample rate of
 * stream s to `rate`; cb/userdata are attached to the resulting operation.
 * The local sample spec is not touched here; it is updated by
 * stream_update_sample_rate_callback() when the reply arrives.
 * NOTE(review): local declarations, some pa_tagstruct_command() arguments
 * and the return are not visible in this view. */
2735 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2741 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, rate in (0, PA_RATE_MAX], stream READY, not an upload
 * stream, stream created with PA_STREAM_VARIABLE_RATE, protocol >= 12. */
2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2746 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2747 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2750 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Stash the requested rate on the operation for the reply handler. */
2751 o->private = PA_UINT_TO_PTR(rate);
/* Record vs. playback command variant is chosen from s->direction; the
 * payload is the stream channel followed by the rate. */
2753 t = pa_tagstruct_command(
2755 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2757 pa_tagstruct_putu32(t, s->channel);
2758 pa_tagstruct_putu32(t, rate);
2760 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler is given its own reference on o, with
 * pa_operation_unref registered as the matching free callback. */
2761 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* pa_stream_proplist_update: send property list p to the server for stream s
 * with the given update mode; cb/userdata are attached to the resulting
 * operation and the reply is handled by pa_stream_simple_ack_callback.
 * NOTE(review): local declarations, some pa_tagstruct_command() arguments
 * and the return are not visible in this view. */
2766 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2772 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, mode is one of SET/MERGE/REPLACE, stream READY, not
 * an upload stream, protocol version >= 13. */
2774 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2775 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2776 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2777 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2778 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2780 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Record vs. playback command variant is chosen from s->direction; the
 * payload is the stream channel, the mode and the proplist. */
2782 t = pa_tagstruct_command(
2784 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2786 pa_tagstruct_putu32(t, s->channel);
2787 pa_tagstruct_putu32(t, (uint32_t) mode);
2788 pa_tagstruct_put_proplist(t, p);
2790 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler is given its own reference on o, with
 * pa_operation_unref registered as the matching free callback. */
2791 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2793 /* Please note that we don't update s->proplist here, because we
2794 * don't export that field */
/* pa_stream_proplist_remove: ask the server to remove the listed property
 * keys from stream s. `keys` is a NULL-terminated array with at least one
 * entry; cb/userdata are attached to the resulting operation and the reply
 * is handled by pa_stream_simple_ack_callback.
 * NOTE(review): some local declarations, some pa_tagstruct_command()
 * arguments and the return are not visible in this view. */
2799 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2803 const char * const*k;
2806 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, keys non-NULL with a non-NULL first entry, stream
 * READY, not an upload stream, protocol version >= 13. */
2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2814 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Record vs. playback command variant is chosen from s->direction. */
2816 t = pa_tagstruct_command(
2818 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2820 pa_tagstruct_putu32(t, s->channel);
/* Append every key in order, then a NULL string after the last key. */
2822 for (k = keys; *k; k++)
2823 pa_tagstruct_puts(t, *k);
2825 pa_tagstruct_puts(t, NULL);
2827 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler is given its own reference on o, with
 * pa_operation_unref registered as the matching free callback. */
2828 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2830 /* Please note that we don't update s->proplist here, because we
2831 * don't export that field */
/* pa_stream_set_monitor_stream: record sink_input_idx in s->direct_on_input.
 * This is purely local state; nothing is sent to the server here.
 * NOTE(review): the return statement is not visible in this view. */
2836 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, a valid index, stream still UNCONNECTED (i.e. this
 * must be called before the stream is connected), protocol version >= 13. */
2840 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2841 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2842 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2843 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2845 s->direct_on_input = sink_input_idx;
/* pa_stream_get_monitor_stream: return s->direct_on_input, the value stored
 * by pa_stream_set_monitor_stream().
 * Every failing guard yields PA_INVALID_INDEX as the return value. */
2850 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2852 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Guards: not forked, a monitor index has been set, protocol version
 * >= 13. */
2854 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2855 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2856 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2858 return s->direct_on_input;