2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
/* Bounds of the automatic timing-update timer interval: the interval is
 * (re)set to the START value and doubled after every restart of the timer,
 * but never grows beyond the END value. */
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Arguments passed to pa_smoother_new() when a stream is connected with
 * PA_STREAM_INTERPOLATE_TIMING. */
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
/* Creates a new, not yet connected stream object on context c.
 *
 * The arguments are validated first; a failed check reports the given error
 * code through PA_CHECK_VALIDITY_RETURN_NULL. On success a pa_stream is
 * allocated, its fields are set to their "unconnected" defaults and the
 * stream is prepended to c->streams.
 *
 * NOTE(review): several short lines of this function (remaining parameters,
 * local declarations, some guards and the final return) are not visible in
 * this extract. */
81 pa_stream *pa_stream_new_with_proplist(
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
/* Argument validation. 32 bit samples need protocol version >= 12, the 24 bit
 * formats need version >= 15. An explicit channel map has to be valid and to
 * match the channel count of ss. A media name must come either from the name
 * argument or from the PA_PROP_MEDIA_NAME entry of p. */
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
/* Derive a default channel map for ss->channels channels into tmap.
 * NOTE(review): presumably only done when no map was passed; the guarding
 * condition is not visible here. */
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
/* The stream gets its own property list: a copy of p, or an empty one. */
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
/* NOTE(review): presumably guarded by a check that name is non-NULL. */
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125 s->channel_valid = FALSE;
/* Each new stream takes the next sync id of the context. */
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize the target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
147 pa_memchunk_reset(&s->peek_memchunk);
150 s->record_memblockq = NULL;
/* No timing information is available yet. */
153 memset(&s->timing_info, 0, sizeof(s->timing_info));
154 s->timing_info_valid = FALSE;
156 s->previous_time = 0;
158 s->read_index_not_before = 0;
159 s->write_index_not_before = 0;
/* Mark all write index correction slots as unused. */
160 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
161 s->write_index_corrections[i].valid = 0;
162 s->current_write_index_correction = 0;
164 s->auto_timing_update_event = NULL;
165 s->auto_timing_update_requested = FALSE;
166 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
172 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
173 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Detaches stream s from its context: cancels operations, unregisters
 * outstanding replies from the context's pdispatch, clears the stream's
 * channel slot, removes it from the context's stream list and frees the
 * automatic timing-update timer.
 *
 * NOTE(review): some short lines (local declarations, the test that selects
 * which operations are cancelled, and the statements after the timer is
 * freed) are not visible in this extract. */
179 static void stream_unlink(pa_stream *s) {
186 /* Detach from context */
188 /* Unref all operation objects that point to us */
189 for (o = s->context->operations; o; o = n) {
193 pa_operation_cancel(o);
196 /* Drop all outstanding replies for this stream */
197 if (s->context->pdispatch)
198 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* Clear this stream's entry in the playback or record channel table. */
200 if (s->channel_valid) {
201 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203 s->channel_valid = FALSE;
206 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* Free the automatic timing-update timer, if one was created. */
211 if (s->auto_timing_update_event) {
212 pa_assert(s->mainloop);
213 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases the resources owned by stream s: a still-peeked record chunk, the
 * record queue, the property list, the smoother and the device name.
 *
 * NOTE(review): short lines (e.g. a possible guard around the smoother and
 * the release of the stream structure itself) are not visible in this
 * extract. */
219 static void stream_free(pa_stream *s) {
/* A chunk handed out by pa_stream_peek() but never dropped is still
 * acquired, so release it before dropping the reference. */
224 if (s->peek_memchunk.memblock) {
226 pa_memblock_release(s->peek_memchunk.memblock);
227 pa_memblock_unref(s->peek_memchunk.memblock);
230 if (s->record_memblockq)
231 pa_memblockq_free(s->record_memblockq);
234 pa_proplist_free(s->proplist);
237 pa_smoother_free(s->smoother);
239 pa_xfree(s->device_name);
/* Drops one reference to stream s.
 * NOTE(review): the statement executed once the count reaches zero is not
 * visible in this extract; presumably the stream is freed. */
243 void pa_stream_unref(pa_stream *s) {
245 pa_assert(PA_REFCNT_VALUE(s) >= 1);
247 if (PA_REFCNT_DEC(s) <= 0)
/* Takes a reference on stream s; the stream must currently be alive.
 * NOTE(review): the increment and the return statement are not visible in
 * this extract; presumably s itself is returned. */
251 pa_stream* pa_stream_ref(pa_stream *s) {
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the state of stream s; the stream must currently be alive.
 * NOTE(review): the return statement is not visible in this extract;
 * presumably s->state is returned. */
259 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the context of stream s; the stream must currently be alive.
 * NOTE(review): the return statement is not visible in this extract;
 * presumably s->context is returned. */
266 pa_context* pa_stream_get_context(pa_stream *s) {
268 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns the stream index of stream s.
 *
 * Yields PA_INVALID_INDEX, with the context error set to PA_ERR_FORKED or
 * PA_ERR_BADSTATE, when called after a fork or while the stream is not in
 * the PA_STREAM_READY state. */
273 uint32_t pa_stream_get_index(pa_stream *s) {
275 pa_assert(PA_REFCNT_VALUE(s) >= 1);
277 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
278 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
280 return s->stream_index;
/* Moves stream s to state st and notifies the user's state callback.
 *
 * NOTE(review): the state assignment itself and the statement guarded by the
 * FAILED/TERMINATED test are not visible in this extract; presumably the
 * stream is unlinked from its context in those two final states. */
283 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
285 pa_assert(PA_REFCNT_VALUE(s) >= 1);
294 if (s->state_callback)
295 s->state_callback(s, s->state_userdata);
297 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Requests fresh timing information for stream s and re-arms the automatic
 * timing-update timer.
 *
 * force: request an update even if an automatic one is already outstanding.
 *
 * NOTE(review): some short lines (early return, local declaration, the
 * condition around the interval reset) are not visible in this extract. */
303 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
305 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Only streams created with PA_STREAM_AUTO_TIMING_UPDATE are of interest;
 * the guarded statement is presumably an early return. */
307 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
310 if (s->state == PA_STREAM_READY &&
311 (force || !s->auto_timing_update_requested)) {
314 /* pa_log("Automatically requesting new timing data"); */
/* The operation object is not needed here, so its reference is dropped
 * right away; only the fact that a request is outstanding is remembered. */
316 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
317 pa_operation_unref(o);
318 s->auto_timing_update_requested = TRUE;
/* Re-arm the timer at now + interval, then double the interval for the next
 * round, capped at AUTO_TIMING_INTERVAL_END_USEC.
 * NOTE(review): the reset to the start interval is presumably conditional on
 * force; the condition is not visible here. */
322 if (s->auto_timing_update_event) {
324 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
326 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
328 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * Reads the channel from t, looks up the stream in the playback or record
 * table of the context passed as userdata and, if that stream is READY, sets
 * the context error to PA_ERR_KILLED and moves the stream to
 * PA_STREAM_FAILED. A malformed message fails the whole context with
 * PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path) are
 * not visible in this extract. */
332 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
333 pa_context *c = userdata;
338 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
341 pa_assert(PA_REFCNT_VALUE(c) >= 1);
345 if (pa_tagstruct_getu32(t, &channel) < 0 ||
346 !pa_tagstruct_eof(t)) {
347 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Unknown channels and streams that are not READY are presumably ignored. */
351 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
354 if (s->state != PA_STREAM_READY)
357 pa_context_set_error(c, PA_ERR_KILLED);
358 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the timing smoother of stream s according to the
 * stream's suspended/corked state and the two force flags.
 *
 * force_start and force_stop must not both be set.
 *
 * NOTE(review): short lines (local declarations, an early return for streams
 * without a smoother, and the condition choosing between the two transport
 * adjustments) are not visible in this extract; the choice presumably
 * depends on aposteriori. */
364 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
368 pa_assert(!force_start || !force_stop);
/* Reference time for the smoother: now, shifted by the transport latency
 * when timing information is available. */
373 x = pa_rtclock_now();
375 if (s->timing_info_valid) {
377 x -= s->timing_info.transport_usec;
379 x += s->timing_info.transport_usec;
382 if (s->suspended || s->corked || force_stop)
383 pa_smoother_pause(s->smoother, x);
384 else if (force_start || s->buffer_attr.prebuf == 0)
385 pa_smoother_resume(s->smoother, x, TRUE);
388 /* Please note that we have no idea if playback actually started
389 * if prebuf is non-zero! */
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Parses channel, device index, device name and suspended flag from t. For
 * protocol version >= 13 it also reads the new buffer metrics and the
 * configured latency. The values are stored in the stream, the smoother and
 * the timing data are refreshed, and the user's moved callback is invoked.
 * Requires protocol version >= 12; any protocol violation fails the context
 * with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, else branches, jumps to the
 * exit path) are not visible in this extract. */
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393 pa_context *c = userdata;
400 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
403 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
406 pa_assert(PA_REFCNT_VALUE(c) >= 1);
410 if (c->version < 12) {
411 pa_context_fail(c, PA_ERR_PROTOCOL);
415 if (pa_tagstruct_getu32(t, &channel) < 0 ||
416 pa_tagstruct_getu32(t, &di) < 0 ||
417 pa_tagstruct_gets(t, &dn) < 0 ||
418 pa_tagstruct_get_boolean(t, &suspended) < 0) {
419 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Since version 13 the message also carries the buffer metrics. The record
 * variant has maxlength, fragsize and latency; the other variant has
 * maxlength, tlength, prebuf, minreq and latency. */
423 if (c->version >= 13) {
425 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428 pa_tagstruct_get_usec(t, &usec) < 0) {
429 pa_context_fail(c, PA_ERR_PROTOCOL);
433 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434 pa_tagstruct_getu32(t, &tlength) < 0 ||
435 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436 pa_tagstruct_getu32(t, &minreq) < 0 ||
437 pa_tagstruct_get_usec(t, &usec) < 0) {
438 pa_context_fail(c, PA_ERR_PROTOCOL);
444 if (!pa_tagstruct_eof(t)) {
445 pa_context_fail(c, PA_ERR_PROTOCOL);
/* A move must name a valid target device. */
449 if (!dn || di == PA_INVALID_INDEX) {
450 pa_context_fail(c, PA_ERR_PROTOCOL);
454 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
457 if (s->state != PA_STREAM_READY)
460 if (c->version >= 13) {
461 if (s->direction == PA_STREAM_RECORD)
462 s->timing_info.configured_source_usec = usec;
464 s->timing_info.configured_sink_usec = usec;
466 s->buffer_attr.maxlength = maxlength;
467 s->buffer_attr.fragsize = fragsize;
468 s->buffer_attr.tlength = tlength;
469 s->buffer_attr.prebuf = prebuf;
470 s->buffer_attr.minreq = minreq;
/* Replace the remembered device name with a private copy of the new one. */
473 pa_xfree(s->device_name);
474 s->device_name = pa_xstrdup(dn);
475 s->device_index = di;
477 s->suspended = suspended;
479 check_smoother_status(s, TRUE, FALSE, FALSE);
480 request_auto_timing_update(s, TRUE);
482 if (s->moved_callback)
483 s->moved_callback(s, s->moved_userdata);
489 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
494 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
497 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
500 pa_assert(PA_REFCNT_VALUE(c) >= 1);
504 if (c->version < 15) {
505 pa_context_fail(c, PA_ERR_PROTOCOL);
509 if (pa_tagstruct_getu32(t, &channel) < 0) {
510 pa_context_fail(c, PA_ERR_PROTOCOL);
514 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
515 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
516 pa_tagstruct_getu32(t, &fragsize) < 0 ||
517 pa_tagstruct_get_usec(t, &usec) < 0) {
518 pa_context_fail(c, PA_ERR_PROTOCOL);
522 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
523 pa_tagstruct_getu32(t, &tlength) < 0 ||
524 pa_tagstruct_getu32(t, &prebuf) < 0 ||
525 pa_tagstruct_getu32(t, &minreq) < 0 ||
526 pa_tagstruct_get_usec(t, &usec) < 0) {
527 pa_context_fail(c, PA_ERR_PROTOCOL);
532 if (!pa_tagstruct_eof(t)) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
537 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
540 if (s->state != PA_STREAM_READY)
543 if (s->direction == PA_STREAM_RECORD)
544 s->timing_info.configured_source_usec = usec;
546 s->timing_info.configured_sink_usec = usec;
548 s->buffer_attr.maxlength = maxlength;
549 s->buffer_attr.fragsize = fragsize;
550 s->buffer_attr.tlength = tlength;
551 s->buffer_attr.prebuf = prebuf;
552 s->buffer_attr.minreq = minreq;
554 request_auto_timing_update(s, TRUE);
556 if (s->buffer_attr_callback)
557 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * Reads channel and suspended flag from t, records the new flag in the
 * matching READY stream, refreshes the smoother and the timing data and
 * invokes the user's suspended callback. Requires protocol version >= 12;
 * any protocol violation fails the context with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path) are
 * not visible in this extract. */
563 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
564 pa_context *c = userdata;
570 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
573 pa_assert(PA_REFCNT_VALUE(c) >= 1);
577 if (c->version < 12) {
578 pa_context_fail(c, PA_ERR_PROTOCOL);
582 if (pa_tagstruct_getu32(t, &channel) < 0 ||
583 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
584 !pa_tagstruct_eof(t)) {
585 pa_context_fail(c, PA_ERR_PROTOCOL);
589 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
592 if (s->state != PA_STREAM_READY)
595 s->suspended = suspended;
/* The smoother must be paused or resumed to follow the new state. */
597 check_smoother_status(s, TRUE, FALSE, FALSE);
598 request_auto_timing_update(s, TRUE);
600 if (s->suspended_callback)
601 s->suspended_callback(s, s->suspended_userdata);
/* Handler for PA_COMMAND_STARTED.
 *
 * Reads the channel from t, looks up the playback stream and, if it is
 * READY, force-starts the smoother, requests a timing update and invokes the
 * user's started callback. Requires protocol version >= 13; any protocol
 * violation fails the context with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path) are
 * not visible in this extract. */
607 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
608 pa_context *c = userdata;
613 pa_assert(command == PA_COMMAND_STARTED);
616 pa_assert(PA_REFCNT_VALUE(c) >= 1);
620 if (c->version < 13) {
621 pa_context_fail(c, PA_ERR_PROTOCOL);
625 if (pa_tagstruct_getu32(t, &channel) < 0 ||
626 !pa_tagstruct_eof(t)) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
631 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
634 if (s->state != PA_STREAM_READY)
/* force_start is set: the smoother is resumed unless the stream is
 * suspended or corked. */
637 check_smoother_status(s, TRUE, TRUE, FALSE);
638 request_auto_timing_update(s, TRUE);
640 if (s->started_callback)
641 s->started_callback(s, s->started_userdata);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Reads channel, event name and a property list from t and passes the event
 * to the event callback of the matching READY stream. The property list is
 * a temporary object that is freed before the handler returns. Requires
 * protocol version >= 15; any protocol violation, including a missing event
 * name, fails the context with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path, a
 * possible NULL guard around the final free) are not visible in this
 * extract. */
647 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
648 pa_context *c = userdata;
651 pa_proplist *pl = NULL;
655 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
658 pa_assert(PA_REFCNT_VALUE(c) >= 1);
662 if (c->version < 15) {
663 pa_context_fail(c, PA_ERR_PROTOCOL);
667 pl = pa_proplist_new();
669 if (pa_tagstruct_getu32(t, &channel) < 0 ||
670 pa_tagstruct_gets(t, &event) < 0 ||
671 pa_tagstruct_get_proplist(t, pl) < 0 ||
672 !pa_tagstruct_eof(t) || !event) {
673 pa_context_fail(c, PA_ERR_PROTOCOL);
677 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
680 if (s->state != PA_STREAM_READY)
683 if (s->event_callback)
684 s->event_callback(s, event, pl, s->event_userdata);
690 pa_proplist_free(pl);
/* Handler for PA_COMMAND_REQUEST.
 *
 * Reads channel and byte count from t, adds the count to the
 * requested_bytes counter of the matching READY playback stream and, if the
 * counter is then positive, invokes the user's write callback with it. A
 * malformed message fails the context with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path) are
 * not visible in this extract. */
693 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
695 pa_context *c = userdata;
696 uint32_t bytes, channel;
699 pa_assert(command == PA_COMMAND_REQUEST);
702 pa_assert(PA_REFCNT_VALUE(c) >= 1);
706 if (pa_tagstruct_getu32(t, &channel) < 0 ||
707 pa_tagstruct_getu32(t, &bytes) < 0 ||
708 !pa_tagstruct_eof(t)) {
709 pa_context_fail(c, PA_ERR_PROTOCOL);
713 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
716 if (s->state != PA_STREAM_READY)
719 s->requested_bytes += bytes;
/* requested_bytes is compared against zero because pa_stream_write()
 * subtracts from it and may drive it negative. */
721 if (s->requested_bytes > 0 && s->write_callback)
722 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW.
 *
 * Reads the channel from t, looks up the playback stream and, if it is
 * READY, force-stops the smoother when prebuffering is enabled, requests a
 * timing update and invokes the user's overflow or underflow callback. A
 * malformed message fails the context with PA_ERR_PROTOCOL.
 *
 * NOTE(review): short lines (local declarations, jumps to the exit path) are
 * not visible in this extract. */
728 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
730 pa_context *c = userdata;
734 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
737 pa_assert(PA_REFCNT_VALUE(c) >= 1);
741 if (pa_tagstruct_getu32(t, &channel) < 0 ||
742 !pa_tagstruct_eof(t)) {
743 pa_context_fail(c, PA_ERR_PROTOCOL);
747 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
750 if (s->state != PA_STREAM_READY)
/* The smoother is only force-stopped when prebuf is non-zero. */
753 if (s->buffer_attr.prebuf > 0)
754 check_smoother_status(s, TRUE, FALSE, TRUE);
756 request_auto_timing_update(s, TRUE);
758 if (command == PA_COMMAND_OVERFLOW) {
759 if (s->overflow_callback)
760 s->overflow_callback(s, s->overflow_userdata);
761 } else if (command == PA_COMMAND_UNDERFLOW) {
762 if (s->underflow_callback)
763 s->underflow_callback(s, s->underflow_userdata);
/* Marks the cached write and/or read index of stream s as unreliable and
 * requests fresh timing data.
 *
 * For an invalidated index the current context tag is stored in the matching
 * *_index_not_before field, and the corrupt flag of any valid timing info is
 * set.
 *
 * NOTE(review): short lines (early return, the tests on r and w) are not
 * visible in this extract; presumably w guards the write index part and r
 * the read index part. */
770 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
772 pa_assert(PA_REFCNT_VALUE(s) >= 1);
774 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
776 if (s->state != PA_STREAM_READY)
780 s->write_index_not_before = s->context->ctag;
782 if (s->timing_info_valid)
783 s->timing_info.write_index_corrupt = TRUE;
785 /* pa_log("write_index invalidated"); */
789 s->read_index_not_before = s->context->ctag;
791 if (s->timing_info_valid)
792 s->timing_info.read_index_corrupt = TRUE;
794 /* pa_log("read_index invalidated"); */
797 request_auto_timing_update(s, TRUE);
/* Timer callback of the automatic timing-update event: issues a non-forced
 * timing update request for the stream passed as userdata.
 * NOTE(review): short lines around the call are not visible in this
 * extract. */
800 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
801 pa_stream *s = userdata;
804 pa_assert(PA_REFCNT_VALUE(s) >= 1);
807 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: moves stream s from CREATING to READY,
 * passes any already requested bytes to the write callback, starts the
 * automatic timing-update timer if the stream asked for it, and brings the
 * smoother in line with the stream state. */
811 static void create_stream_complete(pa_stream *s) {
813 pa_assert(PA_REFCNT_VALUE(s) >= 1);
814 pa_assert(s->state == PA_STREAM_CREATING);
816 pa_stream_set_state(s, PA_STREAM_READY);
818 if (s->requested_bytes > 0 && s->write_callback)
819 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Create the timer with the initial interval and request the first update
 * right away. */
821 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
822 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
823 pa_assert(!s->auto_timing_update_event);
824 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
826 request_auto_timing_update(s, TRUE);
829 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Replaces every buffer metric in attr that is still (uint32_t) -1 by a
 * client-side default computed for sample spec ss.
 *
 * NOTE(review): the statement guarded by the version test is not visible in
 * this extract; presumably an early return, since servers with protocol
 * version >= 13 select the metrics themselves (see the comment below). */
832 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
837 if (s->context->version >= 13)
840 /* Version older than 0.9.10 didn't do server side buffer_attr
841 * selection, hence we have to fake it on the client side. */
843 /* We choose fairly conservative values here, to not confuse
844 * old clients with extremely large playback buffers */
846 if (attr->maxlength == (uint32_t) -1)
847 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
849 if (attr->tlength == (uint32_t) -1)
850 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
852 if (attr->minreq == (uint32_t) -1)
853 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
855 if (attr->prebuf == (uint32_t) -1)
856 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
858 if (attr->fragsize == (uint32_t) -1)
859 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
862 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
863 pa_stream *s = userdata;
864 uint32_t requested_bytes;
868 pa_assert(PA_REFCNT_VALUE(s) >= 1);
869 pa_assert(s->state == PA_STREAM_CREATING);
873 if (command != PA_COMMAND_REPLY) {
874 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
877 pa_stream_set_state(s, PA_STREAM_FAILED);
881 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
882 s->channel == PA_INVALID_INDEX ||
883 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
884 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
885 pa_context_fail(s->context, PA_ERR_PROTOCOL);
889 s->requested_bytes = (int64_t) requested_bytes;
891 if (s->context->version >= 9) {
892 if (s->direction == PA_STREAM_PLAYBACK) {
893 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
894 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
895 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
896 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
897 pa_context_fail(s->context, PA_ERR_PROTOCOL);
900 } else if (s->direction == PA_STREAM_RECORD) {
901 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
903 pa_context_fail(s->context, PA_ERR_PROTOCOL);
909 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
912 const char *dn = NULL;
915 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
916 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
917 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
918 pa_tagstruct_gets(t, &dn) < 0 ||
919 pa_tagstruct_get_boolean(t, &suspended) < 0) {
920 pa_context_fail(s->context, PA_ERR_PROTOCOL);
924 if (!dn || s->device_index == PA_INVALID_INDEX ||
925 ss.channels != cm.channels ||
926 !pa_channel_map_valid(&cm) ||
927 !pa_sample_spec_valid(&ss) ||
928 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
929 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
930 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
931 pa_context_fail(s->context, PA_ERR_PROTOCOL);
935 pa_xfree(s->device_name);
936 s->device_name = pa_xstrdup(dn);
937 s->suspended = suspended;
943 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
946 if (pa_tagstruct_get_usec(t, &usec) < 0) {
947 pa_context_fail(s->context, PA_ERR_PROTOCOL);
951 if (s->direction == PA_STREAM_RECORD)
952 s->timing_info.configured_source_usec = usec;
954 s->timing_info.configured_sink_usec = usec;
957 if (!pa_tagstruct_eof(t)) {
958 pa_context_fail(s->context, PA_ERR_PROTOCOL);
962 if (s->direction == PA_STREAM_RECORD) {
963 pa_assert(!s->record_memblockq);
965 s->record_memblockq = pa_memblockq_new(
967 s->buffer_attr.maxlength,
969 pa_frame_size(&s->sample_spec),
976 s->channel_valid = TRUE;
977 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
979 create_stream_complete(s);
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the arguments, stores direction,
 * cork state, sync id and buffer metrics in the stream, optionally creates
 * the timing smoother, builds the protocol request for the server and moves
 * the stream to PA_STREAM_CREATING. The reply is handled by
 * pa_create_stream_callback().
 *
 * Failed checks report PA_ERR_FORKED, PA_ERR_BADSTATE, PA_ERR_INVALID or
 * PA_ERR_NOTSUPPORTED through PA_CHECK_VALIDITY.
 *
 * NOTE(review): many short lines (remaining parameters, local declarations,
 * the heads of several tagstruct calls, else branches and the return) are not
 * visible in this extract. */
985 static int create_stream(
986 pa_stream_direction_t direction,
989 const pa_buffer_attr *attr,
990 pa_stream_flags_t flags,
991 const pa_cvolume *volume,
992 pa_stream *sync_stream) {
996 pa_bool_t volume_set = FALSE;
999 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1000 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1002 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1003 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1004 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1005 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1006 PA_STREAM_INTERPOLATE_TIMING|
1007 PA_STREAM_NOT_MONOTONIC|
1008 PA_STREAM_AUTO_TIMING_UPDATE|
1009 PA_STREAM_NO_REMAP_CHANNELS|
1010 PA_STREAM_NO_REMIX_CHANNELS|
1011 PA_STREAM_FIX_FORMAT|
1013 PA_STREAM_FIX_CHANNELS|
1014 PA_STREAM_DONT_MOVE|
1015 PA_STREAM_VARIABLE_RATE|
1016 PA_STREAM_PEAK_DETECT|
1017 PA_STREAM_START_MUTED|
1018 PA_STREAM_ADJUST_LATENCY|
1019 PA_STREAM_EARLY_REQUESTS|
1020 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1021 PA_STREAM_START_UNMUTED|
1022 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1024 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1025 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1026 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1027 /* Although some of the other flags are not supported on older
1028 * version, we don't check for them here, because it doesn't hurt
1029 * when they are passed but actually not supported. This makes
1030 * client development easier */
/* Direction-specific restrictions: START_MUTED is playback only, PEAK_DETECT
 * is record only, a volume must match the channel count, only playback
 * streams can be synced with each other, and ADJUST_LATENCY and
 * EARLY_REQUESTS are mutually exclusive. */
1032 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1033 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1034 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1035 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1036 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1040 s->direction = direction;
1042 s->corked = !!(flags & PA_STREAM_START_CORKED);
/* NOTE(review): the next two assignments are presumably guarded by checks
 * that sync_stream and attr are non-NULL; the guards are not visible here. */
1045 s->syncid = sync_stream->syncid;
1048 s->buffer_attr = *attr;
1049 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
/* Timing interpolation needs a smoother; it is created here. */
1051 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1054 x = pa_rtclock_now();
1056 pa_assert(!s->smoother);
1057 s->smoother = pa_smoother_new(
1058 SMOOTHER_ADJUST_TIME,
1059 SMOOTHER_HISTORY_TIME,
1060 !(flags & PA_STREAM_NOT_MONOTONIC),
1062 SMOOTHER_MIN_HISTORY,
/* Fall back to the configured default sink or source.
 * NOTE(review): presumably only when the caller passed no device name. */
1068 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1070 t = pa_tagstruct_command(
1072 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Before version 13 the media name is sent as a plain string. */
1075 if (s->context->version < 13)
1076 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1080 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1081 PA_TAG_CHANNEL_MAP, &s->channel_map,
1082 PA_TAG_U32, PA_INVALID_INDEX,
1084 PA_TAG_U32, s->buffer_attr.maxlength,
1085 PA_TAG_BOOLEAN, s->corked,
1088 if (s->direction == PA_STREAM_PLAYBACK) {
1093 PA_TAG_U32, s->buffer_attr.tlength,
1094 PA_TAG_U32, s->buffer_attr.prebuf,
1095 PA_TAG_U32, s->buffer_attr.minreq,
1096 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume; a reset volume is sent in
 * its place otherwise (presumably only when volume is NULL). */
1099 volume_set = !!volume;
1102 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1104 pa_tagstruct_put_cvolume(t, volume);
1106 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
/* Fields added by later protocol versions are appended only when the server
 * speaks that version. */
1108 if (s->context->version >= 12) {
1111 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1112 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1113 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1114 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1115 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1116 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1117 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1121 if (s->context->version >= 13) {
1123 if (s->direction == PA_STREAM_PLAYBACK)
1124 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1126 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1130 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1131 PA_TAG_PROPLIST, s->proplist,
1134 if (s->direction == PA_STREAM_RECORD)
1135 pa_tagstruct_putu32(t, s->direct_on_input);
1138 if (s->context->version >= 14) {
1140 if (s->direction == PA_STREAM_PLAYBACK)
1141 pa_tagstruct_put_boolean(t, volume_set);
1143 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1146 if (s->context->version >= 15) {
1148 if (s->direction == PA_STREAM_PLAYBACK)
1149 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1151 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1152 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
/* Send the request and register the reply handler for its tag. */
1155 pa_pstream_send_tagstruct(s->context->pstream, t);
1156 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1158 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connects stream s for playback by forwarding all arguments to
 * create_stream() with direction PA_STREAM_PLAYBACK and returning its
 * result.
 * NOTE(review): some parameter lines are not visible in this extract. */
1164 int pa_stream_connect_playback(
1167 const pa_buffer_attr *attr,
1168 pa_stream_flags_t flags,
1170 pa_stream *sync_stream) {
1173 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1175 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connects stream s for recording by forwarding the arguments to
 * create_stream() with direction PA_STREAM_RECORD, without volume and
 * without sync stream, and returning its result.
 * NOTE(review): some parameter lines are not visible in this extract. */
1178 int pa_stream_connect_record(
1181 const pa_buffer_attr *attr,
1182 pa_stream_flags_t flags) {
1185 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1187 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Sends audio data of a playback or upload stream to the server.
 *
 * The data is sent as one or more memblocks. Afterwards the local
 * requested_bytes counter and, for playback streams, the cached write index
 * information are updated.
 *
 * The stream must be READY and must be a playback or upload stream; seeking
 * is only permitted on playback streams. Failed checks report PA_ERR_FORKED,
 * PA_ERR_BADSTATE or PA_ERR_INVALID through PA_CHECK_VALIDITY.
 *
 * NOTE(review): many short lines (remaining parameters, local declarations,
 * initialisation of the t_* working copies, else branches and the return)
 * are not visible in this extract. */
1190 int pa_stream_write(
1194 void (*free_cb)(void *p),
1196 pa_seek_mode_t seek) {
1199 pa_seek_mode_t t_seek;
1205 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1208 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1209 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1210 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1211 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1212 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1222 while (t_length > 0) {
/* Without SHM and with a free callback the caller's buffer is handed to
 * pa_memblock_new_user() as a single block, together with free_cb.
 * Otherwise the data is copied into pool memblocks of at most the pool's
 * maximum block size. */
1226 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1227 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1228 chunk.length = t_length;
1232 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1233 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1235 d = pa_memblock_acquire(chunk.memblock);
1236 memcpy(d, t_data, chunk.length);
1237 pa_memblock_release(chunk.memblock);
1240 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* Only the first block carries the caller's seek mode; every following
 * block is sent with PA_SEEK_RELATIVE. */
1243 t_seek = PA_SEEK_RELATIVE;
1245 t_data = (const uint8_t*) t_data + chunk.length;
1246 t_length -= chunk.length;
1248 pa_memblock_unref(chunk.memblock);
/* In the SHM case the data was copied above, so the caller's buffer can be
 * released right away. */
1251 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1252 free_cb((void*) data);
1254 /* This is obviously wrong since we ignore the seeking index. But
1255 * that's OK, the server side applies the same error */
1256 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1258 if (s->direction == PA_STREAM_PLAYBACK) {
1260 /* Update latency request correction */
/* Absolute seeks define the write index exactly, relative seeks advance a
 * still trustworthy value.
 * NOTE(review): the final "corrupt = TRUE" assignment presumably sits in an
 * else branch for the remaining seek modes; the else line is not visible. */
1261 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1263 if (seek == PA_SEEK_ABSOLUTE) {
1264 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1265 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1266 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1267 } else if (seek == PA_SEEK_RELATIVE) {
1268 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1269 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1271 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1274 /* Update the write index in the already available latency data */
1275 if (s->timing_info_valid) {
1277 if (seek == PA_SEEK_ABSOLUTE) {
1278 s->timing_info.write_index_corrupt = FALSE;
1279 s->timing_info.write_index = offset + (int64_t) length;
1280 } else if (seek == PA_SEEK_RELATIVE) {
1281 if (!s->timing_info.write_index_corrupt)
1282 s->timing_info.write_index += offset + (int64_t) length;
1284 s->timing_info.write_index_corrupt = TRUE;
/* Without a trustworthy write index, ask the server for fresh timing data. */
1287 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1288 request_auto_timing_update(s, TRUE);
1294 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1296 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1300 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1301 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1302 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1304 if (!s->peek_memchunk.memblock) {
1306 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1312 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1315 pa_assert(s->peek_data);
1316 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1317 *length = s->peek_memchunk.length;
1321 int pa_stream_drop(pa_stream *s) {
1323 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1325 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1326 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1327 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1328 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1330 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1332 /* Fix the simulated local read index */
1333 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1334 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1336 pa_assert(s->peek_data);
1337 pa_memblock_release(s->peek_memchunk.memblock);
1338 pa_memblock_unref(s->peek_memchunk.memblock);
1339 pa_memchunk_reset(&s->peek_memchunk);
1344 size_t pa_stream_writable_size(pa_stream *s) {
1346 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1348 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1352 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1355 size_t pa_stream_readable_size(pa_stream *s) {
1357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1359 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1360 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1361 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1363 return pa_memblockq_get_length(s->record_memblockq);
1366 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1372 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1374 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1375 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1376 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1378 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1380 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1381 pa_tagstruct_putu32(t, s->channel);
1382 pa_pstream_send_tagstruct(s->context->pstream, t);
1383 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1388 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1392 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1393 pa_assert(s->state == PA_STREAM_READY);
1394 pa_assert(s->direction != PA_STREAM_UPLOAD);
1395 pa_assert(s->timing_info_valid);
1396 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1397 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1399 if (s->direction == PA_STREAM_PLAYBACK) {
1400 /* The last byte that was written into the output device
1401 * had this time value associated */
1402 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1404 if (!s->corked && !s->suspended) {
1406 if (!ignore_transport)
1407 /* Because the latency info took a little time to come
1408 * to us, we assume that the real output time is actually
1410 usec += s->timing_info.transport_usec;
1412 /* However, the output device usually maintains a buffer
1413 too, hence the real sample currently played is a little
1415 if (s->timing_info.sink_usec >= usec)
1418 usec -= s->timing_info.sink_usec;
1422 pa_assert(s->direction == PA_STREAM_RECORD);
1424 /* The last byte written into the server side queue had
1425 * this time value associated */
1426 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1428 if (!s->corked && !s->suspended) {
1430 if (!ignore_transport)
1431 /* Add transport latency */
1432 usec += s->timing_info.transport_usec;
1434 /* Add latency of data in device buffer */
1435 usec += s->timing_info.source_usec;
1437 /* If this is a monitor source, we need to correct the
1438 * time by the playback device buffer */
1439 if (s->timing_info.sink_usec >= usec)
1442 usec -= s->timing_info.sink_usec;
1449 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1450 pa_operation *o = userdata;
1451 struct timeval local, remote, now;
1453 pa_bool_t playing = FALSE;
1454 uint64_t underrun_for = 0, playing_for = 0;
1458 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1460 if (!o->context || !o->stream)
1463 i = &o->stream->timing_info;
1465 o->stream->timing_info_valid = FALSE;
1466 i->write_index_corrupt = TRUE;
1467 i->read_index_corrupt = TRUE;
1469 if (command != PA_COMMAND_REPLY) {
1470 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1475 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1476 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1477 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1478 pa_tagstruct_get_timeval(t, &local) < 0 ||
1479 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1480 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1481 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1483 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1487 if (o->context->version >= 13 &&
1488 o->stream->direction == PA_STREAM_PLAYBACK)
1489 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1490 pa_tagstruct_getu64(t, &playing_for) < 0) {
1492 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1497 if (!pa_tagstruct_eof(t)) {
1498 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1501 o->stream->timing_info_valid = TRUE;
1502 i->write_index_corrupt = FALSE;
1503 i->read_index_corrupt = FALSE;
1505 i->playing = (int) playing;
1506 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1508 pa_gettimeofday(&now);
1510 /* Calculcate timestamps */
1511 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1512 /* local and remote seem to have synchronized clocks */
1514 if (o->stream->direction == PA_STREAM_PLAYBACK)
1515 i->transport_usec = pa_timeval_diff(&remote, &local);
1517 i->transport_usec = pa_timeval_diff(&now, &remote);
1519 i->synchronized_clocks = TRUE;
1520 i->timestamp = remote;
1522 /* clocks are not synchronized, let's estimate latency then */
1523 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1524 i->synchronized_clocks = FALSE;
1525 i->timestamp = local;
1526 pa_timeval_add(&i->timestamp, i->transport_usec);
1529 /* Invalidate read and write indexes if necessary */
1530 if (tag < o->stream->read_index_not_before)
1531 i->read_index_corrupt = TRUE;
1533 if (tag < o->stream->write_index_not_before)
1534 i->write_index_corrupt = TRUE;
1536 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1537 /* Write index correction */
1540 uint32_t ctag = tag;
1542 /* Go through the saved correction values and add up the
1543 * total correction.*/
1544 for (n = 0, j = o->stream->current_write_index_correction+1;
1545 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1546 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1548 /* Step over invalid data or out-of-date data */
1549 if (!o->stream->write_index_corrections[j].valid ||
1550 o->stream->write_index_corrections[j].tag < ctag)
1553 /* Make sure that everything is in order */
1554 ctag = o->stream->write_index_corrections[j].tag+1;
1556 /* Now fix the write index */
1557 if (o->stream->write_index_corrections[j].corrupt) {
1558 /* A corrupting seek was made */
1559 i->write_index_corrupt = TRUE;
1560 } else if (o->stream->write_index_corrections[j].absolute) {
1561 /* An absolute seek was made */
1562 i->write_index = o->stream->write_index_corrections[j].value;
1563 i->write_index_corrupt = FALSE;
1564 } else if (!i->write_index_corrupt) {
1565 /* A relative seek was made */
1566 i->write_index += o->stream->write_index_corrections[j].value;
1570 /* Clear old correction entries */
1571 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1572 if (!o->stream->write_index_corrections[n].valid)
1575 if (o->stream->write_index_corrections[n].tag <= tag)
1576 o->stream->write_index_corrections[n].valid = FALSE;
1580 if (o->stream->direction == PA_STREAM_RECORD) {
1581 /* Read index correction */
1583 if (!i->read_index_corrupt)
1584 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1587 /* Update smoother */
1588 if (o->stream->smoother) {
1591 u = x = pa_rtclock_now() - i->transport_usec;
1593 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1596 /* If we weren't playing then it will take some time
1597 * until the audio will actually come out through the
1598 * speakers. Since we follow that timing here, we need
1599 * to try to fix this up */
1601 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1603 if (su < i->sink_usec)
1604 x += i->sink_usec - su;
1608 pa_smoother_pause(o->stream->smoother, x);
1610 /* Update the smoother */
1611 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1612 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1613 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1616 pa_smoother_resume(o->stream->smoother, x, TRUE);
1620 o->stream->auto_timing_update_requested = FALSE;
1622 if (o->stream->latency_update_callback)
1623 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1625 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1626 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1627 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1632 pa_operation_done(o);
1633 pa_operation_unref(o);
1636 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1644 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1647 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1648 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1650 if (s->direction == PA_STREAM_PLAYBACK) {
1651 /* Find a place to store the write_index correction data for this entry */
1652 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1654 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1655 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1657 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1659 t = pa_tagstruct_command(
1661 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1663 pa_tagstruct_putu32(t, s->channel);
1664 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1666 pa_pstream_send_tagstruct(s->context->pstream, t);
1667 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1669 if (s->direction == PA_STREAM_PLAYBACK) {
1670 /* Fill in initial correction data */
1672 s->current_write_index_correction = cidx;
1674 s->write_index_corrections[cidx].valid = TRUE;
1675 s->write_index_corrections[cidx].absolute = FALSE;
1676 s->write_index_corrections[cidx].corrupt = FALSE;
1677 s->write_index_corrections[cidx].tag = tag;
1678 s->write_index_corrections[cidx].value = 0;
1684 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1685 pa_stream *s = userdata;
1689 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1693 if (command != PA_COMMAND_REPLY) {
1694 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1697 pa_stream_set_state(s, PA_STREAM_FAILED);
1699 } else if (!pa_tagstruct_eof(t)) {
1700 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1704 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1710 int pa_stream_disconnect(pa_stream *s) {
1715 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1718 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1719 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1723 t = pa_tagstruct_command(
1725 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1726 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1728 pa_tagstruct_putu32(t, s->channel);
1729 pa_pstream_send_tagstruct(s->context->pstream, t);
1730 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1736 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1738 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1740 if (pa_detect_fork())
1743 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1746 s->read_callback = cb;
1747 s->read_userdata = userdata;
1750 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1752 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1754 if (pa_detect_fork())
1757 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1760 s->write_callback = cb;
1761 s->write_userdata = userdata;
1764 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1766 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1768 if (pa_detect_fork())
1771 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1774 s->state_callback = cb;
1775 s->state_userdata = userdata;
1778 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1780 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1782 if (pa_detect_fork())
1785 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1788 s->overflow_callback = cb;
1789 s->overflow_userdata = userdata;
1792 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1794 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1796 if (pa_detect_fork())
1799 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1802 s->underflow_callback = cb;
1803 s->underflow_userdata = userdata;
1806 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1808 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1810 if (pa_detect_fork())
1813 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1816 s->latency_update_callback = cb;
1817 s->latency_update_userdata = userdata;
1820 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1822 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1824 if (pa_detect_fork())
1827 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1830 s->moved_callback = cb;
1831 s->moved_userdata = userdata;
1834 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1836 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838 if (pa_detect_fork())
1841 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1844 s->suspended_callback = cb;
1845 s->suspended_userdata = userdata;
1848 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1850 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1852 if (pa_detect_fork())
1855 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1858 s->started_callback = cb;
1859 s->started_userdata = userdata;
1862 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1864 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1866 if (pa_detect_fork())
1869 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1872 s->event_callback = cb;
1873 s->event_userdata = userdata;
1876 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1878 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880 if (pa_detect_fork())
1883 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1886 s->buffer_attr_callback = cb;
1887 s->buffer_attr_userdata = userdata;
1890 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1891 pa_operation *o = userdata;
1896 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1901 if (command != PA_COMMAND_REPLY) {
1902 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1906 } else if (!pa_tagstruct_eof(t)) {
1907 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1912 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1913 cb(o->stream, success, o->userdata);
1917 pa_operation_done(o);
1918 pa_operation_unref(o);
1921 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1927 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1929 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1930 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1931 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1935 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1937 t = pa_tagstruct_command(
1939 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1941 pa_tagstruct_putu32(t, s->channel);
1942 pa_tagstruct_put_boolean(t, !!b);
1943 pa_pstream_send_tagstruct(s->context->pstream, t);
1944 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1946 check_smoother_status(s, FALSE, FALSE, FALSE);
1948 /* This might cause the indexes to hang/start again, hence
1949 * let's request a timing update */
1950 request_auto_timing_update(s, TRUE);
1955 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1961 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1963 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1964 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1966 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1968 t = pa_tagstruct_command(s->context, command, &tag);
1969 pa_tagstruct_putu32(t, s->channel);
1970 pa_pstream_send_tagstruct(s->context->pstream, t);
1971 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1976 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1980 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1982 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1983 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1984 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1986 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1989 if (s->direction == PA_STREAM_PLAYBACK) {
1991 if (s->write_index_corrections[s->current_write_index_correction].valid)
1992 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1994 if (s->buffer_attr.prebuf > 0)
1995 check_smoother_status(s, FALSE, FALSE, TRUE);
1997 /* This will change the write index, but leave the
1998 * read index untouched. */
1999 invalidate_indexes(s, FALSE, TRUE);
2002 /* For record streams this has no influence on the write
2003 * index, but the read index might jump. */
2004 invalidate_indexes(s, TRUE, FALSE);
2009 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2016 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2017 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2018 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2020 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2023 /* This might cause the read index to hang again, hence
2024 * let's request a timing update */
2025 request_auto_timing_update(s, TRUE);
2030 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2034 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2037 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2039 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2041 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2044 /* This might cause the read index to start moving again, hence
2045 * let's request a timing update */
2046 request_auto_timing_update(s, TRUE);
2051 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2055 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2058 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2059 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2060 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2062 if (s->context->version >= 13) {
2063 pa_proplist *p = pa_proplist_new();
2065 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2066 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2067 pa_proplist_free(p);
2072 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2073 t = pa_tagstruct_command(
2075 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2077 pa_tagstruct_putu32(t, s->channel);
2078 pa_tagstruct_puts(t, name);
2079 pa_pstream_send_tagstruct(s->context->pstream, t);
2080 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2086 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2090 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2092 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2093 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2094 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2095 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2096 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2097 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2100 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2102 usec = calc_time(s, FALSE);
2104 /* Make sure the time runs monotonically */
2105 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2106 if (usec < s->previous_time)
2107 usec = s->previous_time;
2109 s->previous_time = usec;
2118 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2120 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2128 if (negative && s->direction == PA_STREAM_RECORD) {
2136 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2142 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2145 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2146 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2147 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2148 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2149 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2150 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2152 if ((r = pa_stream_get_time(s, &t)) < 0)
2155 if (s->direction == PA_STREAM_PLAYBACK)
2156 cindex = s->timing_info.write_index;
2158 cindex = s->timing_info.read_index;
2163 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2165 if (s->direction == PA_STREAM_PLAYBACK)
2166 *r_usec = time_counter_diff(s, c, t, negative);
2168 *r_usec = time_counter_diff(s, t, c, negative);
2173 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2175 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2177 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2178 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2179 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2180 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2182 return &s->timing_info;
2185 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2187 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2189 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2191 return &s->sample_spec;
2194 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2196 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200 return &s->channel_map;
2203 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2205 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2207 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2208 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2209 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2211 return &s->buffer_attr;
2214 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2215 pa_operation *o = userdata;
2220 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2225 if (command != PA_COMMAND_REPLY) {
2226 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2231 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2232 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2233 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2234 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2235 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2236 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2239 } else if (o->stream->direction == PA_STREAM_RECORD) {
2240 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2241 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2242 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2247 if (o->stream->context->version >= 13) {
2250 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2251 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2255 if (o->stream->direction == PA_STREAM_RECORD)
2256 o->stream->timing_info.configured_source_usec = usec;
2258 o->stream->timing_info.configured_sink_usec = usec;
2261 if (!pa_tagstruct_eof(t)) {
2262 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2268 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2269 cb(o->stream, success, o->userdata);
2273 pa_operation_done(o);
2274 pa_operation_unref(o);
2278 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2287 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2288 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2292 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2294 t = pa_tagstruct_command(
2296 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2298 pa_tagstruct_putu32(t, s->channel);
2300 pa_tagstruct_putu32(t, attr->maxlength);
2302 if (s->direction == PA_STREAM_PLAYBACK)
2305 PA_TAG_U32, attr->tlength,
2306 PA_TAG_U32, attr->prebuf,
2307 PA_TAG_U32, attr->minreq,
2310 pa_tagstruct_putu32(t, attr->fragsize);
2312 if (s->context->version >= 13)
2313 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2315 if (s->context->version >= 14)
2316 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2318 pa_pstream_send_tagstruct(s->context->pstream, t);
2319 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2321 /* This might cause changes in the read/write indexex, hence let's
2322 * request a timing update */
2323 request_auto_timing_update(s, TRUE);
2328 uint32_t pa_stream_get_device_index(pa_stream *s) {
2330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2332 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2333 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2334 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2335 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2336 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2338 return s->device_index;
2341 const char *pa_stream_get_device_name(pa_stream *s) {
2343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2346 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2347 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2348 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2349 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2351 return s->device_name;
2354 int pa_stream_is_suspended(pa_stream *s) {
2356 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2358 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2363 return s->suspended;
/* Query whether the stream is corked.
 *
 * Only the precondition checks are visible in this listing; the statement
 * that produces the result is not shown. NOTE(review): confirm which field
 * is returned before relying on this comment.
 *
 * Preconditions, checked with PA_CHECK_VALIDITY() (defined elsewhere):
 *   - pa_detect_fork() reports false      (PA_ERR_FORKED)
 *   - the stream state is PA_STREAM_READY  (PA_ERR_BADSTATE)
 *   - the stream is not an upload stream   (PA_ERR_BADSTATE)
 * Unlike pa_stream_is_suspended(), no context version check is visible
 * here.
 */
2366 int pa_stream_is_corked(pa_stream *s) {
/* The stream must still hold at least one reference. */
2368 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2370 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2371 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2372 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* Reply handler for a sample rate update request.
 *
 * userdata is the pa_operation that pa_stream_update_sample_rate()
 * registered; its `private` field carries the requested rate.
 *
 * NOTE(review): several lines of this function (the declaration of
 * `success`, the bodies/else-branches of the conditionals and any jump
 * targets) are not visible in this listing, so the exact control flow
 * between the statements below must be confirmed against the full file.
 */
2377 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2378 pa_operation *o = userdata;
/* The operation must still hold at least one reference. */
2383 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything other than a plain reply is handed to the context's error
 * handling. */
2388 if (command != PA_COMMAND_REPLY) {
2389 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* The tagstruct is expected to be exhausted at this point; if it is not,
 * the context is failed with a protocol error. */
2395 if (!pa_tagstruct_eof(t)) {
2396 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Store the rate that was stashed in o->private into the stream's sample
 * spec, and make sure the resulting spec is still valid. */
2401 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2402 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
/* Invoke the operation's callback as a stream success callback. */
2405 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2406 cb(o->stream, success, o->userdata);
/* Mark the operation as done and drop a reference to it. */
2410 pa_operation_done(o);
2411 pa_operation_unref(o);
/* Ask the server to change the sample rate of stream s to `rate`.
 *
 * cb/userdata are attached to the new operation; the reply is handled by
 * stream_update_sample_rate_callback().
 *
 * Preconditions, checked with PA_CHECK_VALIDITY_RETURN_NULL() (defined
 * elsewhere; presumably records the error on s->context and returns NULL):
 *   - pa_detect_fork() reports false               (PA_ERR_FORKED)
 *   - 0 < rate <= PA_RATE_MAX                       (PA_ERR_INVALID)
 *   - the stream state is PA_STREAM_READY           (PA_ERR_BADSTATE)
 *   - the stream is not an upload stream            (PA_ERR_BADSTATE)
 *   - the stream has the PA_STREAM_VARIABLE_RATE flag (PA_ERR_BADSTATE)
 *   - the context version is at least 12            (PA_ERR_NOTSUPPORTED)
 *
 * NOTE(review): the local declarations (o, t, tag), part of the
 * pa_tagstruct_command() argument list and the return statement are not
 * visible in this listing.
 */
2415 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
/* The stream must still hold at least one reference. */
2421 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2423 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2425 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2426 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2427 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2430 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Stash the requested rate in the operation; the reply callback reads it
 * back with PA_PTR_TO_UINT(o->private). */
2431 o->private = PA_UINT_TO_PTR(rate);
/* Record streams and all other streams use different command codes. */
2433 t = pa_tagstruct_command(
2435 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
/* Payload: the stream's channel followed by the new rate. */
2437 pa_tagstruct_putu32(t, s->channel);
2438 pa_tagstruct_putu32(t, rate);
/* Send the request and register the reply handler. The handler is given
 * its own reference to the operation, with pa_operation_unref as the
 * associated free callback. */
2440 pa_pstream_send_tagstruct(s->context->pstream, t);
2441 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Send a property list update for stream s to the server.
 *
 * mode selects how p is applied and must be one of PA_UPDATE_SET,
 * PA_UPDATE_MERGE or PA_UPDATE_REPLACE. cb/userdata are attached to the
 * new operation; the reply is handled by pa_stream_simple_ack_callback.
 *
 * Preconditions, checked with PA_CHECK_VALIDITY_RETURN_NULL() (defined
 * elsewhere; presumably records the error on s->context and returns NULL):
 *   - pa_detect_fork() reports false       (PA_ERR_FORKED)
 *   - mode is one of the three values above (PA_ERR_INVALID)
 *   - the stream state is PA_STREAM_READY   (PA_ERR_BADSTATE)
 *   - the stream is not an upload stream    (PA_ERR_BADSTATE)
 *   - the context version is at least 13    (PA_ERR_NOTSUPPORTED)
 *
 * NOTE(review): the local declarations (o, t, tag), part of the
 * pa_tagstruct_command() argument list and the return statement are not
 * visible in this listing.
 */
2446 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
/* The stream must still hold at least one reference. */
2452 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2454 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2455 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2456 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2458 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2460 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Record streams and all other streams use different command codes. */
2462 t = pa_tagstruct_command(
2464 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
/* Payload: the stream's channel, the update mode, then the property
 * list itself. */
2466 pa_tagstruct_putu32(t, s->channel);
2467 pa_tagstruct_putu32(t, (uint32_t) mode);
2468 pa_tagstruct_put_proplist(t, p);
/* Send the request and register the reply handler. The handler is given
 * its own reference to the operation, with pa_operation_unref as the
 * associated free callback. */
2470 pa_pstream_send_tagstruct(s->context->pstream, t);
2471 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2473 /* Please note that we don't update s->proplist here, because we
2474 * don't export that field */
/* Ask the server to remove the listed property keys from stream s.
 *
 * keys is a NULL-terminated array of strings; it must be non-NULL and
 * contain at least one entry. cb/userdata are attached to the new
 * operation; the reply is handled by pa_stream_simple_ack_callback.
 *
 * Preconditions, checked with PA_CHECK_VALIDITY_RETURN_NULL() (defined
 * elsewhere; presumably records the error on s->context and returns NULL):
 *   - pa_detect_fork() reports false      (PA_ERR_FORKED)
 *   - keys and keys[0] are both non-NULL   (PA_ERR_INVALID)
 *   - the stream state is PA_STREAM_READY  (PA_ERR_BADSTATE)
 *   - the stream is not an upload stream   (PA_ERR_BADSTATE)
 *   - the context version is at least 13   (PA_ERR_NOTSUPPORTED)
 *
 * NOTE(review): the remaining local declarations (o, t, tag), part of the
 * pa_tagstruct_command() argument list and the return statement are not
 * visible in this listing.
 */
2479 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
/* Cursor used to walk the keys array. */
2483 const char * const*k;
/* The stream must still hold at least one reference. */
2486 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2488 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2491 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2492 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2494 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Record streams and all other streams use different command codes. */
2496 t = pa_tagstruct_command(
2498 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2500 pa_tagstruct_putu32(t, s->channel);
/* Append every key up to (not including) the terminating NULL entry... */
2502 for (k = keys; *k; k++)
2503 pa_tagstruct_puts(t, *k);
/* ...then append a NULL string to mark the end of the key list. */
2505 pa_tagstruct_puts(t, NULL);
/* Send the request and register the reply handler. The handler is given
 * its own reference to the operation, with pa_operation_unref as the
 * associated free callback. */
2507 pa_pstream_send_tagstruct(s->context->pstream, t);
2508 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2510 /* Please note that we don't update s->proplist here, because we
2511 * don't export that field */
/* Store sink_input_idx in s->direct_on_input.
 *
 * This is purely local state: nothing is sent to the server in the lines
 * visible here, and the stream must not have been connected yet.
 *
 * Preconditions, checked with PA_CHECK_VALIDITY() (defined elsewhere; it
 * takes no fallback value, so the value returned on failure is decided by
 * the macro -- confirm against its definition):
 *   - pa_detect_fork() reports false              (PA_ERR_FORKED)
 *   - sink_input_idx is not PA_INVALID_INDEX       (PA_ERR_INVALID)
 *   - the stream state is PA_STREAM_UNCONNECTED    (PA_ERR_BADSTATE)
 *   - the context version is at least 13           (PA_ERR_NOTSUPPORTED)
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
2516 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
/* The stream must still hold at least one reference. */
2518 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2520 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2521 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2522 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2523 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2525 s->direct_on_input = sink_input_idx;
/* Return the index stored in s->direct_on_input (the value written by
 * pa_stream_set_monitor_stream()).
 *
 * Preconditions are checked with PA_CHECK_VALIDITY_RETURN_ANY(), which is
 * handed PA_INVALID_INDEX as the fallback value (macro defined elsewhere;
 * presumably it records the error on s->context and returns the fallback
 * when the condition is false -- confirm against its definition):
 *   - pa_detect_fork() reports false                 (PA_ERR_FORKED)
 *   - s->direct_on_input is not PA_INVALID_INDEX      (PA_ERR_BADSTATE)
 *   - the context version is at least 13              (PA_ERR_NOTSUPPORTED)
 */
2530 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
/* The stream must still hold at least one reference. */
2532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
/* An index that was never set is reported as a bad state. */
2535 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2536 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2538 return s->direct_on_input;