2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
147 pa_memchunk_reset(&s->peek_memchunk);
150 s->record_memblockq = NULL;
153 memset(&s->timing_info, 0, sizeof(s->timing_info));
154 s->timing_info_valid = FALSE;
156 s->previous_time = 0;
158 s->read_index_not_before = 0;
159 s->write_index_not_before = 0;
160 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
161 s->write_index_corrections[i].valid = 0;
162 s->current_write_index_correction = 0;
164 s->auto_timing_update_event = NULL;
165 s->auto_timing_update_requested = FALSE;
166 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
172 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
173 PA_LLIST_PREPEND(pa_stream, c->streams, s);
179 static void stream_unlink(pa_stream *s) {
186 /* Detach from context */
188 /* Unref all operatio object that point to us */
189 for (o = s->context->operations; o; o = n) {
193 pa_operation_cancel(o);
196 /* Drop all outstanding replies for this stream */
197 if (s->context->pdispatch)
198 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200 if (s->channel_valid) {
201 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203 s->channel_valid = FALSE;
206 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
211 if (s->auto_timing_update_event) {
212 pa_assert(s->mainloop);
213 s->mainloop->time_free(s->auto_timing_update_event);
219 static void stream_free(pa_stream *s) {
224 if (s->peek_memchunk.memblock) {
226 pa_memblock_release(s->peek_memchunk.memblock);
227 pa_memblock_unref(s->peek_memchunk.memblock);
230 if (s->record_memblockq)
231 pa_memblockq_free(s->record_memblockq);
234 pa_proplist_free(s->proplist);
237 pa_smoother_free(s->smoother);
239 pa_xfree(s->device_name);
243 void pa_stream_unref(pa_stream *s) {
245 pa_assert(PA_REFCNT_VALUE(s) >= 1);
247 if (PA_REFCNT_DEC(s) <= 0)
251 pa_stream* pa_stream_ref(pa_stream *s) {
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
259 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
266 pa_context* pa_stream_get_context(pa_stream *s) {
268 pa_assert(PA_REFCNT_VALUE(s) >= 1);
273 uint32_t pa_stream_get_index(pa_stream *s) {
275 pa_assert(PA_REFCNT_VALUE(s) >= 1);
277 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
278 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
280 return s->stream_index;
283 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
285 pa_assert(PA_REFCNT_VALUE(s) >= 1);
294 if (s->state_callback)
295 s->state_callback(s, s->state_userdata);
297 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
303 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
305 pa_assert(PA_REFCNT_VALUE(s) >= 1);
307 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
310 if (s->state == PA_STREAM_READY &&
311 (force || !s->auto_timing_update_requested)) {
314 /* pa_log("Automatically requesting new timing data"); */
316 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
317 pa_operation_unref(o);
318 s->auto_timing_update_requested = TRUE;
322 if (s->auto_timing_update_event) {
326 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
328 pa_gettimeofday(&next);
329 pa_timeval_add(&next, s->auto_timing_interval_usec);
330 s->mainloop->time_restart(s->auto_timing_update_event, &next);
332 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
336 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
337 pa_context *c = userdata;
342 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345 pa_assert(PA_REFCNT_VALUE(c) >= 1);
349 if (pa_tagstruct_getu32(t, &channel) < 0 ||
350 !pa_tagstruct_eof(t)) {
351 pa_context_fail(c, PA_ERR_PROTOCOL);
355 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358 if (s->state != PA_STREAM_READY)
361 pa_context_set_error(c, PA_ERR_KILLED);
362 pa_stream_set_state(s, PA_STREAM_FAILED);
368 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
372 pa_assert(!force_start || !force_stop);
377 x = pa_rtclock_now();
379 if (s->timing_info_valid) {
381 x -= s->timing_info.transport_usec;
383 x += s->timing_info.transport_usec;
386 if (s->suspended || s->corked || force_stop)
387 pa_smoother_pause(s->smoother, x);
388 else if (force_start || s->buffer_attr.prebuf == 0)
389 pa_smoother_resume(s->smoother, x, TRUE);
392 /* Please note that we have no idea if playback actually started
393 * if prebuf is non-zero! */
396 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
397 pa_context *c = userdata;
404 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
407 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
410 pa_assert(PA_REFCNT_VALUE(c) >= 1);
414 if (c->version < 12) {
415 pa_context_fail(c, PA_ERR_PROTOCOL);
419 if (pa_tagstruct_getu32(t, &channel) < 0 ||
420 pa_tagstruct_getu32(t, &di) < 0 ||
421 pa_tagstruct_gets(t, &dn) < 0 ||
422 pa_tagstruct_get_boolean(t, &suspended) < 0) {
423 pa_context_fail(c, PA_ERR_PROTOCOL);
427 if (c->version >= 13) {
429 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
430 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
431 pa_tagstruct_getu32(t, &fragsize) < 0 ||
432 pa_tagstruct_get_usec(t, &usec) < 0) {
433 pa_context_fail(c, PA_ERR_PROTOCOL);
437 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
438 pa_tagstruct_getu32(t, &tlength) < 0 ||
439 pa_tagstruct_getu32(t, &prebuf) < 0 ||
440 pa_tagstruct_getu32(t, &minreq) < 0 ||
441 pa_tagstruct_get_usec(t, &usec) < 0) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
448 if (!pa_tagstruct_eof(t)) {
449 pa_context_fail(c, PA_ERR_PROTOCOL);
453 if (!dn || di == PA_INVALID_INDEX) {
454 pa_context_fail(c, PA_ERR_PROTOCOL);
458 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
461 if (s->state != PA_STREAM_READY)
464 if (c->version >= 13) {
465 if (s->direction == PA_STREAM_RECORD)
466 s->timing_info.configured_source_usec = usec;
468 s->timing_info.configured_sink_usec = usec;
470 s->buffer_attr.maxlength = maxlength;
471 s->buffer_attr.fragsize = fragsize;
472 s->buffer_attr.tlength = tlength;
473 s->buffer_attr.prebuf = prebuf;
474 s->buffer_attr.minreq = minreq;
477 pa_xfree(s->device_name);
478 s->device_name = pa_xstrdup(dn);
479 s->device_index = di;
481 s->suspended = suspended;
483 check_smoother_status(s, TRUE, FALSE, FALSE);
484 request_auto_timing_update(s, TRUE);
486 if (s->moved_callback)
487 s->moved_callback(s, s->moved_userdata);
493 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
494 pa_context *c = userdata;
498 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
501 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
504 pa_assert(PA_REFCNT_VALUE(c) >= 1);
508 if (c->version < 15) {
509 pa_context_fail(c, PA_ERR_PROTOCOL);
513 if (pa_tagstruct_getu32(t, &channel) < 0) {
514 pa_context_fail(c, PA_ERR_PROTOCOL);
518 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
519 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520 pa_tagstruct_getu32(t, &fragsize) < 0 ||
521 pa_tagstruct_get_usec(t, &usec) < 0) {
522 pa_context_fail(c, PA_ERR_PROTOCOL);
526 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
527 pa_tagstruct_getu32(t, &tlength) < 0 ||
528 pa_tagstruct_getu32(t, &prebuf) < 0 ||
529 pa_tagstruct_getu32(t, &minreq) < 0 ||
530 pa_tagstruct_get_usec(t, &usec) < 0) {
531 pa_context_fail(c, PA_ERR_PROTOCOL);
536 if (!pa_tagstruct_eof(t)) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
541 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
544 if (s->state != PA_STREAM_READY)
547 if (s->direction == PA_STREAM_RECORD)
548 s->timing_info.configured_source_usec = usec;
550 s->timing_info.configured_sink_usec = usec;
552 s->buffer_attr.maxlength = maxlength;
553 s->buffer_attr.fragsize = fragsize;
554 s->buffer_attr.tlength = tlength;
555 s->buffer_attr.prebuf = prebuf;
556 s->buffer_attr.minreq = minreq;
558 request_auto_timing_update(s, TRUE);
560 if (s->buffer_attr_callback)
561 s->buffer_attr_callback(s, s->buffer_attr_userdata);
567 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
568 pa_context *c = userdata;
574 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
577 pa_assert(PA_REFCNT_VALUE(c) >= 1);
581 if (c->version < 12) {
582 pa_context_fail(c, PA_ERR_PROTOCOL);
586 if (pa_tagstruct_getu32(t, &channel) < 0 ||
587 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
588 !pa_tagstruct_eof(t)) {
589 pa_context_fail(c, PA_ERR_PROTOCOL);
593 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
596 if (s->state != PA_STREAM_READY)
599 s->suspended = suspended;
601 check_smoother_status(s, TRUE, FALSE, FALSE);
602 request_auto_timing_update(s, TRUE);
604 if (s->suspended_callback)
605 s->suspended_callback(s, s->suspended_userdata);
611 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
612 pa_context *c = userdata;
617 pa_assert(command == PA_COMMAND_STARTED);
620 pa_assert(PA_REFCNT_VALUE(c) >= 1);
624 if (c->version < 13) {
625 pa_context_fail(c, PA_ERR_PROTOCOL);
629 if (pa_tagstruct_getu32(t, &channel) < 0 ||
630 !pa_tagstruct_eof(t)) {
631 pa_context_fail(c, PA_ERR_PROTOCOL);
635 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
638 if (s->state != PA_STREAM_READY)
641 check_smoother_status(s, TRUE, TRUE, FALSE);
642 request_auto_timing_update(s, TRUE);
644 if (s->started_callback)
645 s->started_callback(s, s->started_userdata);
651 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
652 pa_context *c = userdata;
655 pa_proplist *pl = NULL;
659 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
662 pa_assert(PA_REFCNT_VALUE(c) >= 1);
666 if (c->version < 15) {
667 pa_context_fail(c, PA_ERR_PROTOCOL);
671 pl = pa_proplist_new();
673 if (pa_tagstruct_getu32(t, &channel) < 0 ||
674 pa_tagstruct_gets(t, &event) < 0 ||
675 pa_tagstruct_get_proplist(t, pl) < 0 ||
676 !pa_tagstruct_eof(t) || !event) {
677 pa_context_fail(c, PA_ERR_PROTOCOL);
681 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
684 if (s->state != PA_STREAM_READY)
687 if (s->event_callback)
688 s->event_callback(s, event, pl, s->event_userdata);
694 pa_proplist_free(pl);
697 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
699 pa_context *c = userdata;
700 uint32_t bytes, channel;
703 pa_assert(command == PA_COMMAND_REQUEST);
706 pa_assert(PA_REFCNT_VALUE(c) >= 1);
710 if (pa_tagstruct_getu32(t, &channel) < 0 ||
711 pa_tagstruct_getu32(t, &bytes) < 0 ||
712 !pa_tagstruct_eof(t)) {
713 pa_context_fail(c, PA_ERR_PROTOCOL);
717 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
720 if (s->state != PA_STREAM_READY)
723 s->requested_bytes += bytes;
725 if (s->requested_bytes > 0 && s->write_callback)
726 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
732 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734 pa_context *c = userdata;
738 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
741 pa_assert(PA_REFCNT_VALUE(c) >= 1);
745 if (pa_tagstruct_getu32(t, &channel) < 0 ||
746 !pa_tagstruct_eof(t)) {
747 pa_context_fail(c, PA_ERR_PROTOCOL);
751 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
754 if (s->state != PA_STREAM_READY)
757 if (s->buffer_attr.prebuf > 0)
758 check_smoother_status(s, TRUE, FALSE, TRUE);
760 request_auto_timing_update(s, TRUE);
762 if (command == PA_COMMAND_OVERFLOW) {
763 if (s->overflow_callback)
764 s->overflow_callback(s, s->overflow_userdata);
765 } else if (command == PA_COMMAND_UNDERFLOW) {
766 if (s->underflow_callback)
767 s->underflow_callback(s, s->underflow_userdata);
774 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
776 pa_assert(PA_REFCNT_VALUE(s) >= 1);
778 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
780 if (s->state != PA_STREAM_READY)
784 s->write_index_not_before = s->context->ctag;
786 if (s->timing_info_valid)
787 s->timing_info.write_index_corrupt = TRUE;
789 /* pa_log("write_index invalidated"); */
793 s->read_index_not_before = s->context->ctag;
795 if (s->timing_info_valid)
796 s->timing_info.read_index_corrupt = TRUE;
798 /* pa_log("read_index invalidated"); */
801 request_auto_timing_update(s, TRUE);
804 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
805 pa_stream *s = userdata;
808 pa_assert(PA_REFCNT_VALUE(s) >= 1);
811 request_auto_timing_update(s, FALSE);
815 static void create_stream_complete(pa_stream *s) {
817 pa_assert(PA_REFCNT_VALUE(s) >= 1);
818 pa_assert(s->state == PA_STREAM_CREATING);
820 pa_stream_set_state(s, PA_STREAM_READY);
822 if (s->requested_bytes > 0 && s->write_callback)
823 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
825 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
827 pa_gettimeofday(&tv);
828 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829 pa_timeval_add(&tv, s->auto_timing_interval_usec);
830 pa_assert(!s->auto_timing_update_event);
831 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
833 request_auto_timing_update(s, TRUE);
836 check_smoother_status(s, TRUE, FALSE, FALSE);
839 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
844 if (s->context->version >= 13)
847 /* Version older than 0.9.10 didn't do server side buffer_attr
848 * selection, hence we have to fake it on the client side. */
850 /* We choose fairly conservative values here, to not confuse
851 * old clients with extremely large playback buffers */
853 if (attr->maxlength == (uint32_t) -1)
854 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
856 if (attr->tlength == (uint32_t) -1)
857 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
859 if (attr->minreq == (uint32_t) -1)
860 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
862 if (attr->prebuf == (uint32_t) -1)
863 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
865 if (attr->fragsize == (uint32_t) -1)
866 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
869 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
870 pa_stream *s = userdata;
871 uint32_t requested_bytes;
875 pa_assert(PA_REFCNT_VALUE(s) >= 1);
876 pa_assert(s->state == PA_STREAM_CREATING);
880 if (command != PA_COMMAND_REPLY) {
881 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
884 pa_stream_set_state(s, PA_STREAM_FAILED);
888 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
889 s->channel == PA_INVALID_INDEX ||
890 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
891 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
892 pa_context_fail(s->context, PA_ERR_PROTOCOL);
896 s->requested_bytes = (int64_t) requested_bytes;
898 if (s->context->version >= 9) {
899 if (s->direction == PA_STREAM_PLAYBACK) {
900 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
901 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
903 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
904 pa_context_fail(s->context, PA_ERR_PROTOCOL);
907 } else if (s->direction == PA_STREAM_RECORD) {
908 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
909 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
910 pa_context_fail(s->context, PA_ERR_PROTOCOL);
916 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
919 const char *dn = NULL;
922 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
923 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
924 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
925 pa_tagstruct_gets(t, &dn) < 0 ||
926 pa_tagstruct_get_boolean(t, &suspended) < 0) {
927 pa_context_fail(s->context, PA_ERR_PROTOCOL);
931 if (!dn || s->device_index == PA_INVALID_INDEX ||
932 ss.channels != cm.channels ||
933 !pa_channel_map_valid(&cm) ||
934 !pa_sample_spec_valid(&ss) ||
935 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
936 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
937 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
938 pa_context_fail(s->context, PA_ERR_PROTOCOL);
942 pa_xfree(s->device_name);
943 s->device_name = pa_xstrdup(dn);
944 s->suspended = suspended;
950 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
953 if (pa_tagstruct_get_usec(t, &usec) < 0) {
954 pa_context_fail(s->context, PA_ERR_PROTOCOL);
958 if (s->direction == PA_STREAM_RECORD)
959 s->timing_info.configured_source_usec = usec;
961 s->timing_info.configured_sink_usec = usec;
964 if (!pa_tagstruct_eof(t)) {
965 pa_context_fail(s->context, PA_ERR_PROTOCOL);
969 if (s->direction == PA_STREAM_RECORD) {
970 pa_assert(!s->record_memblockq);
972 s->record_memblockq = pa_memblockq_new(
974 s->buffer_attr.maxlength,
976 pa_frame_size(&s->sample_spec),
983 s->channel_valid = TRUE;
984 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
986 create_stream_complete(s);
992 static int create_stream(
993 pa_stream_direction_t direction,
996 const pa_buffer_attr *attr,
997 pa_stream_flags_t flags,
998 const pa_cvolume *volume,
999 pa_stream *sync_stream) {
1003 pa_bool_t volume_set = FALSE;
1006 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1007 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1009 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1010 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1011 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1012 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1013 PA_STREAM_INTERPOLATE_TIMING|
1014 PA_STREAM_NOT_MONOTONIC|
1015 PA_STREAM_AUTO_TIMING_UPDATE|
1016 PA_STREAM_NO_REMAP_CHANNELS|
1017 PA_STREAM_NO_REMIX_CHANNELS|
1018 PA_STREAM_FIX_FORMAT|
1020 PA_STREAM_FIX_CHANNELS|
1021 PA_STREAM_DONT_MOVE|
1022 PA_STREAM_VARIABLE_RATE|
1023 PA_STREAM_PEAK_DETECT|
1024 PA_STREAM_START_MUTED|
1025 PA_STREAM_ADJUST_LATENCY|
1026 PA_STREAM_EARLY_REQUESTS|
1027 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1028 PA_STREAM_START_UNMUTED|
1029 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1031 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1032 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1033 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1034 /* Althought some of the other flags are not supported on older
1035 * version, we don't check for them here, because it doesn't hurt
1036 * when they are passed but actually not supported. This makes
1037 * client development easier */
1039 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1040 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1041 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1042 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1043 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1047 s->direction = direction;
1049 s->corked = !!(flags & PA_STREAM_START_CORKED);
1052 s->syncid = sync_stream->syncid;
1055 s->buffer_attr = *attr;
1056 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1058 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1061 x = pa_rtclock_now();
1063 pa_assert(!s->smoother);
1064 s->smoother = pa_smoother_new(
1065 SMOOTHER_ADJUST_TIME,
1066 SMOOTHER_HISTORY_TIME,
1067 !(flags & PA_STREAM_NOT_MONOTONIC),
1069 SMOOTHER_MIN_HISTORY,
1075 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1077 t = pa_tagstruct_command(
1079 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1082 if (s->context->version < 13)
1083 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1087 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1088 PA_TAG_CHANNEL_MAP, &s->channel_map,
1089 PA_TAG_U32, PA_INVALID_INDEX,
1091 PA_TAG_U32, s->buffer_attr.maxlength,
1092 PA_TAG_BOOLEAN, s->corked,
1095 if (s->direction == PA_STREAM_PLAYBACK) {
1100 PA_TAG_U32, s->buffer_attr.tlength,
1101 PA_TAG_U32, s->buffer_attr.prebuf,
1102 PA_TAG_U32, s->buffer_attr.minreq,
1103 PA_TAG_U32, s->syncid,
1106 volume_set = !!volume;
1109 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1111 pa_tagstruct_put_cvolume(t, volume);
1113 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1115 if (s->context->version >= 12) {
1118 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1119 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1120 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1124 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1128 if (s->context->version >= 13) {
1130 if (s->direction == PA_STREAM_PLAYBACK)
1131 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1133 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1137 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1138 PA_TAG_PROPLIST, s->proplist,
1141 if (s->direction == PA_STREAM_RECORD)
1142 pa_tagstruct_putu32(t, s->direct_on_input);
1145 if (s->context->version >= 14) {
1147 if (s->direction == PA_STREAM_PLAYBACK)
1148 pa_tagstruct_put_boolean(t, volume_set);
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1153 if (s->context->version >= 15) {
1155 if (s->direction == PA_STREAM_PLAYBACK)
1156 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1158 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1159 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1162 pa_pstream_send_tagstruct(s->context->pstream, t);
1163 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1165 pa_stream_set_state(s, PA_STREAM_CREATING);
1171 int pa_stream_connect_playback(
1174 const pa_buffer_attr *attr,
1175 pa_stream_flags_t flags,
1177 pa_stream *sync_stream) {
1180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1182 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1185 int pa_stream_connect_record(
1188 const pa_buffer_attr *attr,
1189 pa_stream_flags_t flags) {
1192 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1194 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1197 int pa_stream_write(
1201 void (*free_cb)(void *p),
1203 pa_seek_mode_t seek) {
1206 pa_seek_mode_t t_seek;
1212 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1215 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1216 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1217 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1218 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1219 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1229 while (t_length > 0) {
1233 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1234 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1235 chunk.length = t_length;
1239 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1240 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1242 d = pa_memblock_acquire(chunk.memblock);
1243 memcpy(d, t_data, chunk.length);
1244 pa_memblock_release(chunk.memblock);
1247 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1250 t_seek = PA_SEEK_RELATIVE;
1252 t_data = (const uint8_t*) t_data + chunk.length;
1253 t_length -= chunk.length;
1255 pa_memblock_unref(chunk.memblock);
1258 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1259 free_cb((void*) data);
1261 /* This is obviously wrong since we ignore the seeking index . But
1262 * that's OK, the server side applies the same error */
1263 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1265 if (s->direction == PA_STREAM_PLAYBACK) {
1267 /* Update latency request correction */
1268 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1270 if (seek == PA_SEEK_ABSOLUTE) {
1271 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1272 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1273 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1274 } else if (seek == PA_SEEK_RELATIVE) {
1275 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1276 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1278 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1281 /* Update the write index in the already available latency data */
1282 if (s->timing_info_valid) {
1284 if (seek == PA_SEEK_ABSOLUTE) {
1285 s->timing_info.write_index_corrupt = FALSE;
1286 s->timing_info.write_index = offset + (int64_t) length;
1287 } else if (seek == PA_SEEK_RELATIVE) {
1288 if (!s->timing_info.write_index_corrupt)
1289 s->timing_info.write_index += offset + (int64_t) length;
1291 s->timing_info.write_index_corrupt = TRUE;
1294 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1295 request_auto_timing_update(s, TRUE);
1301 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1303 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1307 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1308 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1309 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1311 if (!s->peek_memchunk.memblock) {
1313 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1319 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1322 pa_assert(s->peek_data);
1323 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1324 *length = s->peek_memchunk.length;
1328 int pa_stream_drop(pa_stream *s) {
1330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1332 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1333 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1334 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1335 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1337 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1339 /* Fix the simulated local read index */
1340 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1341 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1343 pa_assert(s->peek_data);
1344 pa_memblock_release(s->peek_memchunk.memblock);
1345 pa_memblock_unref(s->peek_memchunk.memblock);
1346 pa_memchunk_reset(&s->peek_memchunk);
1351 size_t pa_stream_writable_size(pa_stream *s) {
1353 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1355 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1356 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1357 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1359 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1362 size_t pa_stream_readable_size(pa_stream *s) {
1364 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1366 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1367 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1368 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1370 return pa_memblockq_get_length(s->record_memblockq);
1373 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1385 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1387 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1388 pa_tagstruct_putu32(t, s->channel);
1389 pa_pstream_send_tagstruct(s->context->pstream, t);
1390 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1395 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1399 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1400 pa_assert(s->state == PA_STREAM_READY);
1401 pa_assert(s->direction != PA_STREAM_UPLOAD);
1402 pa_assert(s->timing_info_valid);
1403 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1404 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1406 if (s->direction == PA_STREAM_PLAYBACK) {
1407 /* The last byte that was written into the output device
1408 * had this time value associated */
1409 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1411 if (!s->corked && !s->suspended) {
1413 if (!ignore_transport)
1414 /* Because the latency info took a little time to come
1415 * to us, we assume that the real output time is actually
1417 usec += s->timing_info.transport_usec;
1419 /* However, the output device usually maintains a buffer
1420 too, hence the real sample currently played is a little
1422 if (s->timing_info.sink_usec >= usec)
1425 usec -= s->timing_info.sink_usec;
1429 pa_assert(s->direction == PA_STREAM_RECORD);
1431 /* The last byte written into the server side queue had
1432 * this time value associated */
1433 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1435 if (!s->corked && !s->suspended) {
1437 if (!ignore_transport)
1438 /* Add transport latency */
1439 usec += s->timing_info.transport_usec;
1441 /* Add latency of data in device buffer */
1442 usec += s->timing_info.source_usec;
1444 /* If this is a monitor source, we need to correct the
1445 * time by the playback device buffer */
1446 if (s->timing_info.sink_usec >= usec)
1449 usec -= s->timing_info.sink_usec;
1456 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1457 pa_operation *o = userdata;
1458 struct timeval local, remote, now;
1460 pa_bool_t playing = FALSE;
1461 uint64_t underrun_for = 0, playing_for = 0;
1465 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1467 if (!o->context || !o->stream)
1470 i = &o->stream->timing_info;
1472 o->stream->timing_info_valid = FALSE;
1473 i->write_index_corrupt = TRUE;
1474 i->read_index_corrupt = TRUE;
1476 if (command != PA_COMMAND_REPLY) {
1477 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1482 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1483 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1484 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1485 pa_tagstruct_get_timeval(t, &local) < 0 ||
1486 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1487 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1488 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1490 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1494 if (o->context->version >= 13 &&
1495 o->stream->direction == PA_STREAM_PLAYBACK)
1496 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1497 pa_tagstruct_getu64(t, &playing_for) < 0) {
1499 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1504 if (!pa_tagstruct_eof(t)) {
1505 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1508 o->stream->timing_info_valid = TRUE;
1509 i->write_index_corrupt = FALSE;
1510 i->read_index_corrupt = FALSE;
1512 i->playing = (int) playing;
1513 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1515 pa_gettimeofday(&now);
1517 /* Calculcate timestamps */
1518 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1519 /* local and remote seem to have synchronized clocks */
1521 if (o->stream->direction == PA_STREAM_PLAYBACK)
1522 i->transport_usec = pa_timeval_diff(&remote, &local);
1524 i->transport_usec = pa_timeval_diff(&now, &remote);
1526 i->synchronized_clocks = TRUE;
1527 i->timestamp = remote;
1529 /* clocks are not synchronized, let's estimate latency then */
1530 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1531 i->synchronized_clocks = FALSE;
1532 i->timestamp = local;
1533 pa_timeval_add(&i->timestamp, i->transport_usec);
1536 /* Invalidate read and write indexes if necessary */
1537 if (tag < o->stream->read_index_not_before)
1538 i->read_index_corrupt = TRUE;
1540 if (tag < o->stream->write_index_not_before)
1541 i->write_index_corrupt = TRUE;
1543 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1544 /* Write index correction */
1547 uint32_t ctag = tag;
1549 /* Go through the saved correction values and add up the
1550 * total correction.*/
1551 for (n = 0, j = o->stream->current_write_index_correction+1;
1552 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1553 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1555 /* Step over invalid data or out-of-date data */
1556 if (!o->stream->write_index_corrections[j].valid ||
1557 o->stream->write_index_corrections[j].tag < ctag)
1560 /* Make sure that everything is in order */
1561 ctag = o->stream->write_index_corrections[j].tag+1;
1563 /* Now fix the write index */
1564 if (o->stream->write_index_corrections[j].corrupt) {
1565 /* A corrupting seek was made */
1566 i->write_index_corrupt = TRUE;
1567 } else if (o->stream->write_index_corrections[j].absolute) {
1568 /* An absolute seek was made */
1569 i->write_index = o->stream->write_index_corrections[j].value;
1570 i->write_index_corrupt = FALSE;
1571 } else if (!i->write_index_corrupt) {
1572 /* A relative seek was made */
1573 i->write_index += o->stream->write_index_corrections[j].value;
1577 /* Clear old correction entries */
1578 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1579 if (!o->stream->write_index_corrections[n].valid)
1582 if (o->stream->write_index_corrections[n].tag <= tag)
1583 o->stream->write_index_corrections[n].valid = FALSE;
1587 if (o->stream->direction == PA_STREAM_RECORD) {
1588 /* Read index correction */
1590 if (!i->read_index_corrupt)
1591 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1594 /* Update smoother */
1595 if (o->stream->smoother) {
1598 u = x = pa_rtclock_now() - i->transport_usec;
1600 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1603 /* If we weren't playing then it will take some time
1604 * until the audio will actually come out through the
1605 * speakers. Since we follow that timing here, we need
1606 * to try to fix this up */
1608 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1610 if (su < i->sink_usec)
1611 x += i->sink_usec - su;
1615 pa_smoother_pause(o->stream->smoother, x);
1617 /* Update the smoother */
1618 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1619 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1620 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1623 pa_smoother_resume(o->stream->smoother, x, TRUE);
1627 o->stream->auto_timing_update_requested = FALSE;
1629 if (o->stream->latency_update_callback)
1630 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1632 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1633 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1634 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1639 pa_operation_done(o);
1640 pa_operation_unref(o);
1643 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1651 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1653 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1654 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1655 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1657 if (s->direction == PA_STREAM_PLAYBACK) {
1658 /* Find a place to store the write_index correction data for this entry */
1659 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1661 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1664 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1666 t = pa_tagstruct_command(
1668 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1670 pa_tagstruct_putu32(t, s->channel);
1671 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1673 pa_pstream_send_tagstruct(s->context->pstream, t);
1674 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1676 if (s->direction == PA_STREAM_PLAYBACK) {
1677 /* Fill in initial correction data */
1679 s->current_write_index_correction = cidx;
1681 s->write_index_corrections[cidx].valid = TRUE;
1682 s->write_index_corrections[cidx].absolute = FALSE;
1683 s->write_index_corrections[cidx].corrupt = FALSE;
1684 s->write_index_corrections[cidx].tag = tag;
1685 s->write_index_corrections[cidx].value = 0;
1691 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1692 pa_stream *s = userdata;
1696 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1700 if (command != PA_COMMAND_REPLY) {
1701 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1704 pa_stream_set_state(s, PA_STREAM_FAILED);
1706 } else if (!pa_tagstruct_eof(t)) {
1707 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1711 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1717 int pa_stream_disconnect(pa_stream *s) {
1722 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1724 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1725 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1726 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1730 t = pa_tagstruct_command(
1732 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1733 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1735 pa_tagstruct_putu32(t, s->channel);
1736 pa_pstream_send_tagstruct(s->context->pstream, t);
1737 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1743 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1745 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747 if (pa_detect_fork())
1750 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1753 s->read_callback = cb;
1754 s->read_userdata = userdata;
1757 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1759 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761 if (pa_detect_fork())
1764 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1767 s->write_callback = cb;
1768 s->write_userdata = userdata;
1771 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1773 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775 if (pa_detect_fork())
1778 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1781 s->state_callback = cb;
1782 s->state_userdata = userdata;
1785 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1787 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1789 if (pa_detect_fork())
1792 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1795 s->overflow_callback = cb;
1796 s->overflow_userdata = userdata;
1799 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1801 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1803 if (pa_detect_fork())
1806 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1809 s->underflow_callback = cb;
1810 s->underflow_userdata = userdata;
1813 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1815 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1817 if (pa_detect_fork())
1820 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1823 s->latency_update_callback = cb;
1824 s->latency_update_userdata = userdata;
1827 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1829 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1831 if (pa_detect_fork())
1834 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1837 s->moved_callback = cb;
1838 s->moved_userdata = userdata;
1841 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1843 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1845 if (pa_detect_fork())
1848 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1851 s->suspended_callback = cb;
1852 s->suspended_userdata = userdata;
1855 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1857 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1859 if (pa_detect_fork())
1862 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1865 s->started_callback = cb;
1866 s->started_userdata = userdata;
1869 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1871 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1873 if (pa_detect_fork())
1876 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1879 s->event_callback = cb;
1880 s->event_userdata = userdata;
1883 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1885 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1887 if (pa_detect_fork())
1890 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893 s->buffer_attr_callback = cb;
1894 s->buffer_attr_userdata = userdata;
1897 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1898 pa_operation *o = userdata;
1903 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1908 if (command != PA_COMMAND_REPLY) {
1909 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1913 } else if (!pa_tagstruct_eof(t)) {
1914 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1919 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1920 cb(o->stream, success, o->userdata);
1924 pa_operation_done(o);
1925 pa_operation_unref(o);
1928 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1934 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1936 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1937 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1938 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1942 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1944 t = pa_tagstruct_command(
1946 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1948 pa_tagstruct_putu32(t, s->channel);
1949 pa_tagstruct_put_boolean(t, !!b);
1950 pa_pstream_send_tagstruct(s->context->pstream, t);
1951 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1953 check_smoother_status(s, FALSE, FALSE, FALSE);
1955 /* This might cause the indexes to hang/start again, hence
1956 * let's request a timing update */
1957 request_auto_timing_update(s, TRUE);
1962 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1968 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1971 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1973 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1975 t = pa_tagstruct_command(s->context, command, &tag);
1976 pa_tagstruct_putu32(t, s->channel);
1977 pa_pstream_send_tagstruct(s->context->pstream, t);
1978 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1983 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1987 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1990 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1991 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1993 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1996 if (s->direction == PA_STREAM_PLAYBACK) {
1998 if (s->write_index_corrections[s->current_write_index_correction].valid)
1999 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2001 if (s->buffer_attr.prebuf > 0)
2002 check_smoother_status(s, FALSE, FALSE, TRUE);
2004 /* This will change the write index, but leave the
2005 * read index untouched. */
2006 invalidate_indexes(s, FALSE, TRUE);
2009 /* For record streams this has no influence on the write
2010 * index, but the read index might jump. */
2011 invalidate_indexes(s, TRUE, FALSE);
2016 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2020 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2022 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2023 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2024 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2025 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2027 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2030 /* This might cause the read index to hang again, hence
2031 * let's request a timing update */
2032 request_auto_timing_update(s, TRUE);
2037 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2041 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2043 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2044 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2048 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2051 /* This might cause the read index to start moving again, hence
2052 * let's request a timing update */
2053 request_auto_timing_update(s, TRUE);
2058 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2062 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2065 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2066 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2067 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2069 if (s->context->version >= 13) {
2070 pa_proplist *p = pa_proplist_new();
2072 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2073 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2074 pa_proplist_free(p);
2079 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2080 t = pa_tagstruct_command(
2082 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2084 pa_tagstruct_putu32(t, s->channel);
2085 pa_tagstruct_puts(t, name);
2086 pa_pstream_send_tagstruct(s->context->pstream, t);
2087 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2093 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2097 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2100 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2101 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2102 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2103 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2104 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2107 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2109 usec = calc_time(s, FALSE);
2111 /* Make sure the time runs monotonically */
2112 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2113 if (usec < s->previous_time)
2114 usec = s->previous_time;
2116 s->previous_time = usec;
2125 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2127 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135 if (negative && s->direction == PA_STREAM_RECORD) {
2143 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2149 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2152 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2153 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2154 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2155 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2156 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2157 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2159 if ((r = pa_stream_get_time(s, &t)) < 0)
2162 if (s->direction == PA_STREAM_PLAYBACK)
2163 cindex = s->timing_info.write_index;
2165 cindex = s->timing_info.read_index;
2170 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2172 if (s->direction == PA_STREAM_PLAYBACK)
2173 *r_usec = time_counter_diff(s, c, t, negative);
2175 *r_usec = time_counter_diff(s, t, c, negative);
2180 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2182 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2184 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2185 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2186 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2187 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2189 return &s->timing_info;
2192 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2194 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198 return &s->sample_spec;
2201 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2203 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2207 return &s->channel_map;
2210 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2212 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2214 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2215 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2216 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2218 return &s->buffer_attr;
2221 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2222 pa_operation *o = userdata;
2227 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2232 if (command != PA_COMMAND_REPLY) {
2233 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2238 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2239 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2240 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2241 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2242 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2243 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2246 } else if (o->stream->direction == PA_STREAM_RECORD) {
2247 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2248 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2249 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2254 if (o->stream->context->version >= 13) {
2257 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2258 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2262 if (o->stream->direction == PA_STREAM_RECORD)
2263 o->stream->timing_info.configured_source_usec = usec;
2265 o->stream->timing_info.configured_sink_usec = usec;
2268 if (!pa_tagstruct_eof(t)) {
2269 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2275 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2276 cb(o->stream, success, o->userdata);
2280 pa_operation_done(o);
2281 pa_operation_unref(o);
2285 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2294 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2295 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2296 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2297 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2299 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2301 t = pa_tagstruct_command(
2303 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2305 pa_tagstruct_putu32(t, s->channel);
2307 pa_tagstruct_putu32(t, attr->maxlength);
2309 if (s->direction == PA_STREAM_PLAYBACK)
2312 PA_TAG_U32, attr->tlength,
2313 PA_TAG_U32, attr->prebuf,
2314 PA_TAG_U32, attr->minreq,
2317 pa_tagstruct_putu32(t, attr->fragsize);
2319 if (s->context->version >= 13)
2320 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2322 if (s->context->version >= 14)
2323 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2325 pa_pstream_send_tagstruct(s->context->pstream, t);
2326 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2328 /* This might cause changes in the read/write indexex, hence let's
2329 * request a timing update */
2330 request_auto_timing_update(s, TRUE);
2335 uint32_t pa_stream_get_device_index(pa_stream *s) {
2337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2339 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2340 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2341 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2342 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2343 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2345 return s->device_index;
2348 const char *pa_stream_get_device_name(pa_stream *s) {
2350 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2358 return s->device_name;
2361 int pa_stream_is_suspended(pa_stream *s) {
2363 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2365 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2366 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2367 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2368 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2370 return s->suspended;
2373 int pa_stream_is_corked(pa_stream *s) {
2375 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2377 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2378 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2379 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2384 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2385 pa_operation *o = userdata;
2390 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2395 if (command != PA_COMMAND_REPLY) {
2396 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2402 if (!pa_tagstruct_eof(t)) {
2403 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2408 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2409 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2412 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2413 cb(o->stream, success, o->userdata);
2417 pa_operation_done(o);
2418 pa_operation_unref(o);
2422 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2431 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2432 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2433 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2434 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2435 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2437 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2438 o->private = PA_UINT_TO_PTR(rate);
2440 t = pa_tagstruct_command(
2442 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2444 pa_tagstruct_putu32(t, s->channel);
2445 pa_tagstruct_putu32(t, rate);
2447 pa_pstream_send_tagstruct(s->context->pstream, t);
2448 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2453 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2459 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2461 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2462 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2463 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2467 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2469 t = pa_tagstruct_command(
2471 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2473 pa_tagstruct_putu32(t, s->channel);
2474 pa_tagstruct_putu32(t, (uint32_t) mode);
2475 pa_tagstruct_put_proplist(t, p);
2477 pa_pstream_send_tagstruct(s->context->pstream, t);
2478 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2480 /* Please note that we don't update s->proplist here, because we
2481 * don't export that field */
2486 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2490 const char * const*k;
2493 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2496 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2501 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2503 t = pa_tagstruct_command(
2505 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2507 pa_tagstruct_putu32(t, s->channel);
2509 for (k = keys; *k; k++)
2510 pa_tagstruct_puts(t, *k);
2512 pa_tagstruct_puts(t, NULL);
2514 pa_pstream_send_tagstruct(s->context->pstream, t);
2515 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2517 /* Please note that we don't update s->proplist here, because we
2518 * don't export that field */
2523 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2525 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2527 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2528 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2529 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2530 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2532 s->direct_on_input = sink_input_idx;
2537 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2539 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2541 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2542 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2543 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2545 return s->direct_on_input;