2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
41 #include "fork-detect.h"
/* Interval added to the current time whenever the automatic timing update
 * timer is armed or restarted (333 ms). */
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
/* The three values below are passed to pa_smoother_new() when a stream is
 * connected with PA_STREAM_INTERPOLATE_TIMING. */
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
/* Convenience constructor: forwards all arguments to
 * pa_stream_new_with_proplist() and passes NULL for the property list. */
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Clears every user-installed callback on the stream, together with the
 * userdata pointer that belongs to it, by setting each pair to NULL. */
54 static void reset_callbacks(pa_stream *s) {
55 s->read_callback = NULL;
56 s->read_userdata = NULL;
57 s->write_callback = NULL;
58 s->write_userdata = NULL;
59 s->state_callback = NULL;
60 s->state_userdata = NULL;
61 s->overflow_callback = NULL;
62 s->overflow_userdata = NULL;
63 s->underflow_callback = NULL;
64 s->underflow_userdata = NULL;
65 s->latency_update_callback = NULL;
66 s->latency_update_userdata = NULL;
67 s->moved_callback = NULL;
68 s->moved_userdata = NULL;
69 s->suspended_callback = NULL;
70 s->suspended_userdata = NULL;
71 s->started_callback = NULL;
72 s->started_userdata = NULL;
73 s->event_callback = NULL;
74 s->event_userdata = NULL;
75 s->buffer_attr_callback = NULL;
76 s->buffer_attr_userdata = NULL;
/* Allocates and initializes a new stream object for context c and links it
 * into the context's stream list. The new stream starts with direction
 * PA_STREAM_NODIRECTION and state PA_STREAM_UNCONNECTED.
 *
 * The visible validity checks require that: pa_detect_fork() reports no
 * fork; ss is non-NULL and valid; the sample format is one the context's
 * protocol version accepts (S32 needs version 12 or later, S24 and S24_32
 * need version 15 or later); a supplied channel map is valid and has as
 * many channels as ss; and a stream name is available either through name
 * or through PA_PROP_MEDIA_NAME in p. Each check names the error code that
 * goes with the failed condition.
 *
 * NOTE(review): this listing omits a number of original lines (part of the
 * parameter list, local declarations, some guards and the return
 * statement). The comments here describe only the statements shown. */
79 pa_stream *pa_stream_new_with_proplist(
82 const pa_sample_spec *ss,
83 const pa_channel_map *map,
91 pa_assert(PA_REFCNT_VALUE(c) >= 1);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
/* Derive a default channel map from the channel count; the condition that
 * guards this line is not visible in this listing. */
102 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
104 s = pa_xnew(pa_stream, 1);
107 s->mainloop = c->mainloop;
109 s->direction = PA_STREAM_NODIRECTION;
110 s->state = PA_STREAM_UNCONNECTED;
113 s->sample_spec = *ss;
114 s->channel_map = *map;
116 s->direct_on_input = PA_INVALID_INDEX;
/* The stream gets its own property list: a copy of p, or an empty one. */
118 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
120 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123 s->channel_valid = FALSE;
/* Each new stream takes the next sync id from the context counter. */
124 s->syncid = c->csyncid++;
125 s->stream_index = PA_INVALID_INDEX;
127 s->requested_bytes = 0;
128 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
130 /* We initialize the target length here, so that if the user
131 * passes no explicit buffering metrics the default is similar to
132 * what older PA versions provided. */
134 s->buffer_attr.maxlength = (uint32_t) -1;
135 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
136 s->buffer_attr.minreq = (uint32_t) -1;
137 s->buffer_attr.prebuf = (uint32_t) -1;
138 s->buffer_attr.fragsize = (uint32_t) -1;
140 s->device_index = PA_INVALID_INDEX;
141 s->device_name = NULL;
142 s->suspended = FALSE;
145 pa_memchunk_reset(&s->peek_memchunk);
148 s->record_memblockq = NULL;
151 memset(&s->timing_info, 0, sizeof(s->timing_info));
152 s->timing_info_valid = FALSE;
154 s->previous_time = 0;
156 s->read_index_not_before = 0;
157 s->write_index_not_before = 0;
/* Mark every write index correction slot as unused. */
158 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
159 s->write_index_corrections[i].valid = 0;
160 s->current_write_index_correction = 0;
162 s->auto_timing_update_event = NULL;
163 s->auto_timing_update_requested = FALSE;
169 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
170 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Detaches the stream from its context. The visible steps are: cancel
 * operations found on the context's operation list, unregister pending
 * replies registered for this stream, clear the stream's slot in the
 * playback or record channel table, remove the stream from the context's
 * stream list and free the automatic timing update timer.
 *
 * NOTE(review): the filter that decides which operations get cancelled and
 * the statements that follow time_free() are not visible in this listing. */
176 static void stream_unlink(pa_stream *s) {
183 /* Detach from context */
185 /* Unref all operation objects that point to us */
186 for (o = s->context->operations; o; o = n) {
190 pa_operation_cancel(o);
193 /* Drop all outstanding replies for this stream */
194 if (s->context->pdispatch)
195 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* Storing NULL at our channel number removes us from the lookup table that
 * the pa_command_*() handlers use. */
197 if (s->channel_valid) {
198 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
200 s->channel_valid = FALSE;
203 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208 if (s->auto_timing_update_event) {
209 pa_assert(s->mainloop);
210 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases the resources owned by the stream that are visible here: a
 * still-held peek memblock, the record queue, the property list, the
 * smoother and the device name string.
 *
 * NOTE(review): any guard around pa_smoother_free() and the release of the
 * stream structure itself are not visible in this listing. */
216 static void stream_free(pa_stream *s) {
/* A chunk handed out by pa_stream_peek() is acquired; release before unref. */
221 if (s->peek_memchunk.memblock) {
223 pa_memblock_release(s->peek_memchunk.memblock);
224 pa_memblock_unref(s->peek_memchunk.memblock);
227 if (s->record_memblockq)
228 pa_memblockq_free(s->record_memblockq);
231 pa_proplist_free(s->proplist);
234 pa_smoother_free(s->smoother);
236 pa_xfree(s->device_name);
/* Drops one reference to the stream. The statement executed when
 * PA_REFCNT_DEC() reports that no reference is left is not visible in this
 * listing (presumably the stream is freed; confirm against the full file). */
240 void pa_stream_unref(pa_stream *s) {
242 pa_assert(PA_REFCNT_VALUE(s) >= 1);
244 if (PA_REFCNT_DEC(s) <= 0)
/* Reference-taking counterpart of pa_stream_unref(). Only the sanity check
 * that the stream still has at least one reference is visible here; the
 * increment and the return statement are not part of this listing. */
248 pa_stream* pa_stream_ref(pa_stream *s) {
250 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor returning a pa_stream_state_t for the stream. Only the reference
 * count assertion is visible; the return statement is not part of this
 * listing. */
256 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
258 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor returning a pa_context pointer for the stream. Only the
 * reference count assertion is visible; the return statement is not part of
 * this listing. */
263 pa_context* pa_stream_get_context(pa_stream *s) {
265 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns the stream index stored in the stream object.
 *
 * Two validity checks come first, each supplying PA_INVALID_INDEX as the
 * fallback value: pa_detect_fork() must report no fork (PA_ERR_FORKED) and
 * the stream must be in state PA_STREAM_READY (PA_ERR_BADSTATE). */
270 uint32_t pa_stream_get_index(pa_stream *s) {
272 pa_assert(PA_REFCNT_VALUE(s) >= 1);
274 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
275 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
277 return s->stream_index;
/* Moves the stream to state st and notifies the user through
 * state_callback when one is installed.
 *
 * NOTE(review): the assignment of the new state, any early exit for an
 * unchanged state and the action taken for PA_STREAM_FAILED or
 * PA_STREAM_TERMINATED are not visible in this listing. */
280 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
282 pa_assert(PA_REFCNT_VALUE(s) >= 1);
291 if (s->state_callback)
292 s->state_callback(s, s->state_userdata);
/* FAILED and TERMINATED get extra handling; its body is not shown here. */
294 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Requests fresh timing information for a stream that uses
 * PA_STREAM_AUTO_TIMING_UPDATE and pushes the periodic timer back.
 *
 * s:     the stream.
 * force: when TRUE a request is sent even if an earlier automatic request
 *        is still flagged as outstanding.
 *
 * A request is only sent while the stream is PA_STREAM_READY. If the timer
 * event exists it is restarted to fire LATENCY_IPOL_INTERVAL_USEC after the
 * current time.
 *
 * NOTE(review): the statement guarded by the flag test below is not visible
 * in this listing (presumably an early return). */
300 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
302 pa_assert(PA_REFCNT_VALUE(s) >= 1);
304 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
307 if (s->state == PA_STREAM_READY &&
308 (force || !s->auto_timing_update_requested)) {
311 /* pa_log("automatically requesting new timing data"); */
/* The operation handle is not needed; only remember that a request is
 * pending. */
313 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
314 pa_operation_unref(o);
315 s->auto_timing_update_requested = TRUE;
319 if (s->auto_timing_update_event) {
321 pa_gettimeofday(&next);
322 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
323 s->mainloop->time_restart(s->auto_timing_update_event, &next);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED; userdata is the context.
 *
 * The packet must contain exactly one u32 (the channel), otherwise the
 * context is failed with PA_ERR_PROTOCOL. The stream is looked up by channel
 * in the table matching the command; the context error is then set to
 * PA_ERR_KILLED and the stream is moved to PA_STREAM_FAILED.
 *
 * NOTE(review): the statements taken when the lookup fails or the stream is
 * not PA_STREAM_READY are not visible in this listing (presumably the
 * notification is ignored). */
327 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
328 pa_context *c = userdata;
333 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
336 pa_assert(PA_REFCNT_VALUE(c) >= 1);
340 if (pa_tagstruct_getu32(t, &channel) < 0 ||
341 !pa_tagstruct_eof(t)) {
342 pa_context_fail(c, PA_ERR_PROTOCOL);
346 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
349 if (s->state != PA_STREAM_READY)
352 pa_context_set_error(c, PA_ERR_KILLED);
353 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the stream's time smoother so that it matches the
 * current suspended/corked state.
 *
 * s:           the stream.
 * aposteriori: see NOTE below; its use is not visible in this listing.
 * force_start: resume even though prebuf is non-zero.
 * force_stop:  pause regardless of the suspended/corked flags.
 * force_start and force_stop must not both be set.
 *
 * The timestamp x handed to the smoother starts as the current rtclock
 * time and is shifted by latency values from the last timing info when
 * that info is valid.
 *
 * NOTE(review): the conditions selecting between the "-=" and "+="
 * transport_usec adjustments, and the else branch before the source_usec
 * line, are not visible in this listing. */
359 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
363 pa_assert(!force_start || !force_stop);
368 x = pa_rtclock_usec();
370 if (s->timing_info_valid) {
372 x -= s->timing_info.transport_usec;
374 x += s->timing_info.transport_usec;
376 if (s->direction == PA_STREAM_PLAYBACK)
377 /* it takes a while until the pause/resume is actually
379 x += s->timing_info.sink_usec;
381 /* Data from a while back will be dropped */
382 x -= s->timing_info.source_usec;
385 if (s->suspended || s->corked || force_stop)
386 pa_smoother_pause(s->smoother, x);
387 else if (force_start || s->buffer_attr.prebuf == 0)
388 pa_smoother_resume(s->smoother, x);
390 /* Please note that we have no idea if playback actually started
391 * if prebuf is non-zero! */
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED; userdata is the context.
 *
 * Packet layout as parsed here: channel (u32), device index (u32), device
 * name (string), suspended (boolean). With protocol version 13 or later the
 * buffer metrics follow: maxlength, fragsize and a latency in usec for
 * record streams; maxlength, tlength, prebuf, minreq and a latency in usec
 * for playback streams. Any parse failure, trailing data, a missing device
 * name, an invalid device index or a protocol version below 12 fails the
 * context with PA_ERR_PROTOCOL.
 *
 * On success the stream's device name, device index, suspended flag and
 * (version 13 or later) buffer attributes and configured latency are
 * updated, the smoother state is re-checked, a timing update is forced and
 * moved_callback is invoked if set.
 *
 * NOTE(review): the else lines between the record and playback variants,
 * and the exits taken for unknown or non-READY streams, are not visible in
 * this listing. */
394 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
395 pa_context *c = userdata;
402 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
405 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
408 pa_assert(PA_REFCNT_VALUE(c) >= 1);
412 if (c->version < 12) {
413 pa_context_fail(c, PA_ERR_PROTOCOL);
417 if (pa_tagstruct_getu32(t, &channel) < 0 ||
418 pa_tagstruct_getu32(t, &di) < 0 ||
419 pa_tagstruct_gets(t, &dn) < 0 ||
420 pa_tagstruct_get_boolean(t, &suspended) < 0) {
421 pa_context_fail(c, PA_ERR_PROTOCOL);
425 if (c->version >= 13) {
427 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
428 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
429 pa_tagstruct_getu32(t, &fragsize) < 0 ||
430 pa_tagstruct_get_usec(t, &usec) < 0) {
431 pa_context_fail(c, PA_ERR_PROTOCOL);
435 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
436 pa_tagstruct_getu32(t, &tlength) < 0 ||
437 pa_tagstruct_getu32(t, &prebuf) < 0 ||
438 pa_tagstruct_getu32(t, &minreq) < 0 ||
439 pa_tagstruct_get_usec(t, &usec) < 0) {
440 pa_context_fail(c, PA_ERR_PROTOCOL);
446 if (!pa_tagstruct_eof(t)) {
447 pa_context_fail(c, PA_ERR_PROTOCOL);
451 if (!dn || di == PA_INVALID_INDEX) {
452 pa_context_fail(c, PA_ERR_PROTOCOL);
456 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
459 if (s->state != PA_STREAM_READY)
462 if (c->version >= 13) {
463 if (s->direction == PA_STREAM_RECORD)
464 s->timing_info.configured_source_usec = usec;
466 s->timing_info.configured_sink_usec = usec;
/* Fields that were not part of the packet keep the 0 they were initialized
 * with in the declaration above. */
468 s->buffer_attr.maxlength = maxlength;
469 s->buffer_attr.fragsize = fragsize;
470 s->buffer_attr.tlength = tlength;
471 s->buffer_attr.prebuf = prebuf;
472 s->buffer_attr.minreq = minreq;
/* dn is copied with pa_xstrdup(); the previous name is freed first. */
475 pa_xfree(s->device_name);
476 s->device_name = pa_xstrdup(dn);
477 s->device_index = di;
479 s->suspended = suspended;
481 check_smoother_status(s, TRUE, FALSE, FALSE);
482 request_auto_timing_update(s, TRUE);
484 if (s->moved_callback)
485 s->moved_callback(s, s->moved_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED; userdata is the context.
 *
 * Packet layout as parsed here: channel (u32), then for record streams
 * maxlength, fragsize and a latency in usec; for playback streams
 * maxlength, tlength, prebuf, minreq and a latency in usec. Any parse
 * failure, trailing data or a protocol version below 15 fails the context
 * with PA_ERR_PROTOCOL.
 *
 * On success the stream's configured latency and buffer attributes are
 * replaced, a timing update is forced and buffer_attr_callback is invoked
 * if set.
 *
 * NOTE(review): the else line between the record and playback variants and
 * the exits taken for unknown or non-READY streams are not visible in this
 * listing. */
491 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
492 pa_context *c = userdata;
496 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
499 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
502 pa_assert(PA_REFCNT_VALUE(c) >= 1);
506 if (c->version < 15) {
507 pa_context_fail(c, PA_ERR_PROTOCOL);
511 if (pa_tagstruct_getu32(t, &channel) < 0) {
512 pa_context_fail(c, PA_ERR_PROTOCOL);
/* Select the record layout by the record variant of THIS command. The
 * assertion above only admits the two BUFFER_ATTR_CHANGED commands, so a
 * comparison against PA_COMMAND_RECORD_STREAM_MOVED could never be true and
 * record notifications would be parsed with the playback layout. */
516 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
517 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
518 pa_tagstruct_getu32(t, &fragsize) < 0 ||
519 pa_tagstruct_get_usec(t, &usec) < 0) {
520 pa_context_fail(c, PA_ERR_PROTOCOL);
524 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
525 pa_tagstruct_getu32(t, &tlength) < 0 ||
526 pa_tagstruct_getu32(t, &prebuf) < 0 ||
527 pa_tagstruct_getu32(t, &minreq) < 0 ||
528 pa_tagstruct_get_usec(t, &usec) < 0) {
529 pa_context_fail(c, PA_ERR_PROTOCOL);
534 if (!pa_tagstruct_eof(t)) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
539 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
542 if (s->state != PA_STREAM_READY)
545 if (s->direction == PA_STREAM_RECORD)
546 s->timing_info.configured_source_usec = usec;
548 s->timing_info.configured_sink_usec = usec;
/* Fields that were not part of the packet keep the 0 they were initialized
 * with in the declaration above. */
550 s->buffer_attr.maxlength = maxlength;
551 s->buffer_attr.fragsize = fragsize;
552 s->buffer_attr.tlength = tlength;
553 s->buffer_attr.prebuf = prebuf;
554 s->buffer_attr.minreq = minreq;
556 request_auto_timing_update(s, TRUE);
558 if (s->buffer_attr_callback)
559 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED; userdata is the context.
 *
 * The packet must hold exactly a channel (u32) and a suspended flag
 * (boolean); a malformed packet or a protocol version below 12 fails the
 * context with PA_ERR_PROTOCOL. The stream's suspended flag is updated, the
 * smoother state is re-checked, a timing update is forced and
 * suspended_callback is invoked if set.
 *
 * NOTE(review): the exits taken for unknown or non-READY streams are not
 * visible in this listing. */
565 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
566 pa_context *c = userdata;
572 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
575 pa_assert(PA_REFCNT_VALUE(c) >= 1);
579 if (c->version < 12) {
580 pa_context_fail(c, PA_ERR_PROTOCOL);
584 if (pa_tagstruct_getu32(t, &channel) < 0 ||
585 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
586 !pa_tagstruct_eof(t)) {
587 pa_context_fail(c, PA_ERR_PROTOCOL);
591 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
594 if (s->state != PA_STREAM_READY)
597 s->suspended = suspended;
599 check_smoother_status(s, TRUE, FALSE, FALSE);
600 request_auto_timing_update(s, TRUE);
602 if (s->suspended_callback)
603 s->suspended_callback(s, s->suspended_userdata);
/* Dispatcher callback for PA_COMMAND_STARTED; userdata is the context.
 *
 * The packet must hold exactly one u32 (the channel); a malformed packet or
 * a protocol version below 13 fails the context with PA_ERR_PROTOCOL. The
 * stream is looked up among the playback streams only. The smoother is then
 * checked with force_start set, a timing update is forced and
 * started_callback is invoked if set.
 *
 * NOTE(review): the exits taken for unknown or non-READY streams are not
 * visible in this listing. */
609 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
610 pa_context *c = userdata;
615 pa_assert(command == PA_COMMAND_STARTED);
618 pa_assert(PA_REFCNT_VALUE(c) >= 1);
622 if (c->version < 13) {
623 pa_context_fail(c, PA_ERR_PROTOCOL);
627 if (pa_tagstruct_getu32(t, &channel) < 0 ||
628 !pa_tagstruct_eof(t)) {
629 pa_context_fail(c, PA_ERR_PROTOCOL);
633 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
636 if (s->state != PA_STREAM_READY)
639 check_smoother_status(s, TRUE, TRUE, FALSE);
640 request_auto_timing_update(s, TRUE);
642 if (s->started_callback)
643 s->started_callback(s, s->started_userdata);
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT; userdata is the context.
 *
 * The packet must hold exactly a channel (u32), a non-NULL event name
 * (string) and a property list; a malformed packet or a protocol version
 * below 15 fails the context with PA_ERR_PROTOCOL. When the stream has an
 * event_callback it receives the event name and the parsed property list.
 * The temporary property list allocated here is freed by the
 * pa_proplist_free() call at the end of the visible code.
 *
 * NOTE(review): the control flow that leads the error and early-exit paths
 * to the final pa_proplist_free() is not visible in this listing. */
649 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650 pa_context *c = userdata;
653 pa_proplist *pl = NULL;
657 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
660 pa_assert(PA_REFCNT_VALUE(c) >= 1);
664 if (c->version < 15) {
665 pa_context_fail(c, PA_ERR_PROTOCOL);
669 pl = pa_proplist_new();
671 if (pa_tagstruct_getu32(t, &channel) < 0 ||
672 pa_tagstruct_gets(t, &event) < 0 ||
673 pa_tagstruct_get_proplist(t, pl) < 0 ||
674 !pa_tagstruct_eof(t) || !event) {
675 pa_context_fail(c, PA_ERR_PROTOCOL);
679 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
682 if (s->state != PA_STREAM_READY)
685 if (s->event_callback)
686 s->event_callback(s, event, pl, s->event_userdata);
692 pa_proplist_free(pl);
/* Dispatcher callback for PA_COMMAND_REQUEST; userdata is the context.
 *
 * The packet must hold exactly a channel (u32) and a byte count (u32),
 * otherwise the context is failed with PA_ERR_PROTOCOL. The byte count is
 * added to the playback stream's requested_bytes, and write_callback (if
 * set) is invoked with the accumulated total whenever that total is
 * positive.
 *
 * NOTE(review): the exits taken for unknown or non-READY streams are not
 * visible in this listing. */
695 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
697 pa_context *c = userdata;
698 uint32_t bytes, channel;
701 pa_assert(command == PA_COMMAND_REQUEST);
704 pa_assert(PA_REFCNT_VALUE(c) >= 1);
708 if (pa_tagstruct_getu32(t, &channel) < 0 ||
709 pa_tagstruct_getu32(t, &bytes) < 0 ||
710 !pa_tagstruct_eof(t)) {
711 pa_context_fail(c, PA_ERR_PROTOCOL);
715 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
718 if (s->state != PA_STREAM_READY)
721 s->requested_bytes += bytes;
723 if (s->requested_bytes > 0 && s->write_callback)
724 s->write_callback(s, s->requested_bytes, s->write_userdata);
/* Dispatcher callback shared by PA_COMMAND_OVERFLOW and
 * PA_COMMAND_UNDERFLOW; userdata is the context.
 *
 * The packet must hold exactly one u32 (the channel), otherwise the context
 * is failed with PA_ERR_PROTOCOL. The stream is looked up among the
 * playback streams. If the stream uses a non-zero prebuf the smoother is
 * checked with force_stop set; a timing update is forced in any case. The
 * overflow or underflow callback matching the command is then invoked if
 * set.
 *
 * NOTE(review): the exits taken for unknown or non-READY streams are not
 * visible in this listing. */
730 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
732 pa_context *c = userdata;
736 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
739 pa_assert(PA_REFCNT_VALUE(c) >= 1);
743 if (pa_tagstruct_getu32(t, &channel) < 0 ||
744 !pa_tagstruct_eof(t)) {
745 pa_context_fail(c, PA_ERR_PROTOCOL);
749 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
752 if (s->state != PA_STREAM_READY)
755 if (s->buffer_attr.prebuf > 0)
756 check_smoother_status(s, TRUE, FALSE, TRUE);
758 request_auto_timing_update(s, TRUE);
760 if (command == PA_COMMAND_OVERFLOW) {
761 if (s->overflow_callback)
762 s->overflow_callback(s, s->overflow_userdata);
763 } else if (command == PA_COMMAND_UNDERFLOW) {
764 if (s->underflow_callback)
765 s->underflow_callback(s, s->underflow_userdata);
/* Marks the locally cached read and/or write index as unreliable and forces
 * a timing update.
 *
 * s: the stream.
 * r: invalidate the read index.
 * w: invalidate the write index.
 *
 * For each invalidated index the context's current tag is recorded in the
 * matching *_index_not_before field and, when timing info is valid, the
 * corresponding *_index_corrupt flag is set.
 *
 * NOTE(review): the "if (w)" / "if (r)" guards and the exit taken for a
 * non-READY stream are not visible in this listing; the mapping of r and w
 * to the two groups below is inferred from the commented-out log lines. */
772 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
774 pa_assert(PA_REFCNT_VALUE(s) >= 1);
776 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
778 if (s->state != PA_STREAM_READY)
782 s->write_index_not_before = s->context->ctag;
784 if (s->timing_info_valid)
785 s->timing_info.write_index_corrupt = TRUE;
787 /* pa_log("write_index invalidated"); */
791 s->read_index_not_before = s->context->ctag;
793 if (s->timing_info_valid)
794 s->timing_info.read_index_corrupt = TRUE;
796 /* pa_log("read_index invalidated"); */
799 request_auto_timing_update(s, TRUE);
/* Main loop timer callback for the automatic timing update event; userdata
 * is the stream. It issues a non-forced update request, so nothing is sent
 * while an earlier automatic request is still flagged as outstanding. */
802 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
803 pa_stream *s = userdata;
806 pa_assert(PA_REFCNT_VALUE(s) >= 1);
809 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: switches the stream from
 * PA_STREAM_CREATING to PA_STREAM_READY, hands any bytes the server already
 * requested to write_callback, sets up the periodic timing update timer for
 * streams using PA_STREAM_AUTO_TIMING_UPDATE and brings the smoother in
 * line with the stream state.
 *
 * NOTE(review): the local declarations and closing braces are not visible
 * in this listing. */
813 static void create_stream_complete(pa_stream *s) {
815 pa_assert(PA_REFCNT_VALUE(s) >= 1);
816 pa_assert(s->state == PA_STREAM_CREATING);
818 pa_stream_set_state(s, PA_STREAM_READY);
820 if (s->requested_bytes > 0 && s->write_callback)
821 s->write_callback(s, s->requested_bytes, s->write_userdata);
823 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
825 pa_gettimeofday(&tv);
/* Use pa_timeval_add() so that the seconds field absorbs any overflow of
 * tv_usec; adding the 333 ms interval directly to tv_usec can leave it at
 * or above one second, which is not a normalized timeval. */
826 pa_timeval_add(&tv, LATENCY_IPOL_INTERVAL_USEC);
827 pa_assert(!s->auto_timing_update_event);
828 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
830 request_auto_timing_update(s, TRUE);
833 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Replaces every buffer attribute that is still (uint32_t) -1 with a
 * concrete default, for servers that do not pick defaults themselves.
 *
 * s:    the stream; only its context's protocol version is consulted.
 * attr: buffer attributes to fill in, modified in place.
 * ss:   sample spec used to convert 250 ms into bytes for tlength.
 *
 * minreq, prebuf and fragsize are derived from tlength, so tlength is
 * resolved before them.
 *
 * NOTE(review): the statement guarded by the version test is not visible in
 * this listing (presumably an early return for version 13 or later). */
836 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
841 if (s->context->version >= 13)
844 /* Version older than 0.9.10 didn't do server side buffer_attr
845 * selection, hence we have to fake it on the client side. */
847 /* We choose fairly conservative values here, to not confuse
848 * old clients with extremely large playback buffers */
850 if (attr->maxlength == (uint32_t) -1)
851 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
853 if (attr->tlength == (uint32_t) -1)
854 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
856 if (attr->minreq == (uint32_t) -1)
857 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
859 if (attr->prebuf == (uint32_t) -1)
860 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
862 if (attr->fragsize == (uint32_t) -1)
863 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
/* Reply handler for the create-stream request; userdata is the stream,
 * which must be in state PA_STREAM_CREATING.
 *
 * A command other than PA_COMMAND_REPLY is passed to
 * pa_context_handle_error() and the stream is moved to PA_STREAM_FAILED.
 * Otherwise the reply is parsed in this order, and every parse failure or
 * inconsistent value fails the context with PA_ERR_PROTOCOL:
 *   - channel; stream index (not for upload streams); requested bytes (not
 *     for record streams);
 *   - version 9 or later: the buffer metrics matching the direction;
 *   - version 12 or later, non-upload: sample spec, channel map, device
 *     index, device name and suspended flag, checked against what the
 *     stream asked for unless the matching PA_STREAM_FIX_* flag is set;
 *   - version 13 or later, non-upload: the configured latency in usec.
 * After that the record queue is created for record streams, the stream is
 * registered under its channel number and create_stream_complete() runs.
 *
 * NOTE(review): local declarations, the exits after each failure and some
 * arguments of pa_memblockq_new() are not visible in this listing. */
866 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
867 pa_stream *s = userdata;
871 pa_assert(PA_REFCNT_VALUE(s) >= 1);
872 pa_assert(s->state == PA_STREAM_CREATING);
876 if (command != PA_COMMAND_REPLY) {
877 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
880 pa_stream_set_state(s, PA_STREAM_FAILED);
884 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
885 s->channel == PA_INVALID_INDEX ||
886 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
887 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
888 pa_context_fail(s->context, PA_ERR_PROTOCOL);
892 if (s->context->version >= 9) {
893 if (s->direction == PA_STREAM_PLAYBACK) {
894 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
895 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
896 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
897 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
898 pa_context_fail(s->context, PA_ERR_PROTOCOL);
901 } else if (s->direction == PA_STREAM_RECORD) {
902 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
903 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
904 pa_context_fail(s->context, PA_ERR_PROTOCOL);
910 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
913 const char *dn = NULL;
916 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
917 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
918 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
919 pa_tagstruct_gets(t, &dn) < 0 ||
920 pa_tagstruct_get_boolean(t, &suspended) < 0) {
921 pa_context_fail(s->context, PA_ERR_PROTOCOL);
/* The server may only deviate from the requested format, rate or channel
 * map when the client allowed it with the matching PA_STREAM_FIX_* flag. */
925 if (!dn || s->device_index == PA_INVALID_INDEX ||
926 ss.channels != cm.channels ||
927 !pa_channel_map_valid(&cm) ||
928 !pa_sample_spec_valid(&ss) ||
929 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
930 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
931 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
932 pa_context_fail(s->context, PA_ERR_PROTOCOL);
936 pa_xfree(s->device_name);
937 s->device_name = pa_xstrdup(dn);
938 s->suspended = suspended;
944 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
947 if (pa_tagstruct_get_usec(t, &usec) < 0) {
948 pa_context_fail(s->context, PA_ERR_PROTOCOL);
952 if (s->direction == PA_STREAM_RECORD)
953 s->timing_info.configured_source_usec = usec;
955 s->timing_info.configured_sink_usec = usec;
958 if (!pa_tagstruct_eof(t)) {
959 pa_context_fail(s->context, PA_ERR_PROTOCOL);
963 if (s->direction == PA_STREAM_RECORD) {
964 pa_assert(!s->record_memblockq);
966 s->record_memblockq = pa_memblockq_new(
968 s->buffer_attr.maxlength,
970 pa_frame_size(&s->sample_spec),
/* From here on the pa_command_*() handlers can find the stream by channel. */
977 s->channel_valid = TRUE;
978 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
980 create_stream_complete(s);
/* Shared implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the request, records the connection
 * parameters on the stream, builds the create-stream command for the
 * server and registers pa_create_stream_callback() for the reply.
 *
 * direction:   PA_STREAM_PLAYBACK or PA_STREAM_RECORD.
 * attr:        optional buffer attributes copied into the stream.
 * flags:       stream flags; only the flags listed in the mask below are
 *              accepted.
 * volume:      optional initial volume; its channel count must match the
 *              stream's sample spec.
 * sync_stream: optional playback stream whose sync id is adopted.
 * (The stream and device name parameters are used in the body but their
 * declarations are not visible in this listing.)
 *
 * The fields appended to the command depend on the context's protocol
 * version (12, 13, 14 and 15 each add fields), so the order of the
 * pa_tagstruct_put*() calls below is part of the wire format.
 *
 * NOTE(review): several original lines are missing from this listing,
 * including guards around the attr, sync_stream, smoother and device
 * selection statements, the pa_tagstruct_put() call heads and the return
 * statement. */
986 static int create_stream(
987 pa_stream_direction_t direction,
990 const pa_buffer_attr *attr,
991 pa_stream_flags_t flags,
992 const pa_cvolume *volume,
993 pa_stream *sync_stream) {
997 pa_bool_t volume_set = FALSE;
1000 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1001 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1003 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1004 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1005 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1006 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1007 PA_STREAM_INTERPOLATE_TIMING|
1008 PA_STREAM_NOT_MONOTONIC|
1009 PA_STREAM_AUTO_TIMING_UPDATE|
1010 PA_STREAM_NO_REMAP_CHANNELS|
1011 PA_STREAM_NO_REMIX_CHANNELS|
1012 PA_STREAM_FIX_FORMAT|
1014 PA_STREAM_FIX_CHANNELS|
1015 PA_STREAM_DONT_MOVE|
1016 PA_STREAM_VARIABLE_RATE|
1017 PA_STREAM_PEAK_DETECT|
1018 PA_STREAM_START_MUTED|
1019 PA_STREAM_ADJUST_LATENCY|
1020 PA_STREAM_EARLY_REQUESTS|
1021 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1022 PA_STREAM_START_UNMUTED|
1023 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1025 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1026 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1027 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1028 /* Although some of the other flags are not supported on older
1029 * version, we don't check for them here, because it doesn't hurt
1030 * when they are passed but actually not supported. This makes
1031 * client development easier */
1033 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1034 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1035 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1036 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
/* ADJUST_LATENCY and EARLY_REQUESTS are mutually exclusive. */
1037 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1041 s->direction = direction;
1043 s->corked = !!(flags & PA_STREAM_START_CORKED);
1046 s->syncid = sync_stream->syncid;
1049 s->buffer_attr = *attr;
1050 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
/* Timing interpolation uses a smoother that starts out paused at the
 * current rtclock time. */
1052 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1056 pa_smoother_free(s->smoother);
1058 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
1060 x = pa_rtclock_usec();
1061 pa_smoother_set_time_offset(s->smoother, x);
1062 pa_smoother_pause(s->smoother, x);
1066 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1068 t = pa_tagstruct_command(
1070 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Before version 13 the media name is sent as a plain string. */
1073 if (s->context->version < 13)
1074 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1078 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1079 PA_TAG_CHANNEL_MAP, &s->channel_map,
1080 PA_TAG_U32, PA_INVALID_INDEX,
1082 PA_TAG_U32, s->buffer_attr.maxlength,
1083 PA_TAG_BOOLEAN, s->corked,
1086 if (s->direction == PA_STREAM_PLAYBACK) {
1091 PA_TAG_U32, s->buffer_attr.tlength,
1092 PA_TAG_U32, s->buffer_attr.prebuf,
1093 PA_TAG_U32, s->buffer_attr.minreq,
1094 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume before the pointer may be
 * replaced by a reset default below. */
1097 volume_set = !!volume;
1100 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1102 pa_tagstruct_put_cvolume(t, volume);
1104 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1106 if (s->context->version >= 12) {
1109 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1110 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1111 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1112 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1113 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1114 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1115 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1119 if (s->context->version >= 13) {
1121 if (s->direction == PA_STREAM_PLAYBACK)
1122 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1124 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1128 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1129 PA_TAG_PROPLIST, s->proplist,
1132 if (s->direction == PA_STREAM_RECORD)
1133 pa_tagstruct_putu32(t, s->direct_on_input);
1136 if (s->context->version >= 14) {
1138 if (s->direction == PA_STREAM_PLAYBACK)
1139 pa_tagstruct_put_boolean(t, volume_set);
1141 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1144 if (s->context->version >= 15) {
1146 if (s->direction == PA_STREAM_PLAYBACK)
1147 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1149 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1153 pa_pstream_send_tagstruct(s->context->pstream, t);
1154 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1156 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connects the stream for playback by delegating to create_stream() with
 * PA_STREAM_PLAYBACK, passing through the device name, buffer attributes,
 * flags, initial volume and sync stream, and returning its result.
 * (Part of the parameter list is not visible in this listing.) */
1162 int pa_stream_connect_playback(
1165 const pa_buffer_attr *attr,
1166 pa_stream_flags_t flags,
1168 pa_stream *sync_stream) {
1171 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1173 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connects the stream for recording by delegating to create_stream() with
 * PA_STREAM_RECORD and returning its result. Record streams take neither an
 * initial volume nor a sync stream, so NULL is passed for both.
 * (Part of the parameter list is not visible in this listing.) */
1176 int pa_stream_connect_record(
1179 const pa_buffer_attr *attr,
1180 pa_stream_flags_t flags) {
1183 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1185 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Sends audio data to the server on a playback or upload stream.
 *
 * free_cb: optional routine for releasing the caller's buffer.
 * seek:    seek mode; upload streams only accept PA_SEEK_RELATIVE with a
 *          zero offset.
 * (The stream, data, length and offset parameters are used in the body but
 * their declarations are not visible in this listing.)
 *
 * The data is sent in a loop. Two ways of building a chunk are visible:
 * wrapping the caller's buffer in a user memblock that carries free_cb
 * (the condition tests free_cb and that the pstream reports no SHM), or
 * copying at most pa_mempool_block_size_max() bytes into a pool memblock.
 * After the first chunk the seek mode is PA_SEEK_RELATIVE. When free_cb is
 * set and the pstream reports SHM, free_cb is called on the caller's buffer
 * after the loop.
 *
 * Afterwards requested_bytes is reduced by length (not below zero) and, for
 * playback streams, the pending write index correction and the cached
 * timing info are advanced or marked corrupt depending on the seek mode.
 *
 * NOTE(review): the else lines separating the two chunk-building paths and
 * the corrupt-marking branches, the initialization of the t_* locals and
 * the return statement are not visible in this listing. */
1188 int pa_stream_write(
1192 void (*free_cb)(void *p),
1194 pa_seek_mode_t seek) {
1197 pa_seek_mode_t t_seek;
1203 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1206 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1207 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1208 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1209 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1210 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1220 while (t_length > 0) {
1224 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1225 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1226 chunk.length = t_length;
1230 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1231 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1233 d = pa_memblock_acquire(chunk.memblock);
1234 memcpy(d, t_data, chunk.length);
1235 pa_memblock_release(chunk.memblock);
1238 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1241 t_seek = PA_SEEK_RELATIVE;
1243 t_data = (const uint8_t*) t_data + chunk.length;
1244 t_length -= chunk.length;
/* Drop our reference to the chunk after handing it to the pstream. */
1246 pa_memblock_unref(chunk.memblock);
1249 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1250 free_cb((void*) data);
1252 if (length < s->requested_bytes)
1253 s->requested_bytes -= (uint32_t) length;
1255 s->requested_bytes = 0;
1257 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1259 if (s->direction == PA_STREAM_PLAYBACK) {
1261 /* Update latency request correction */
1262 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1264 if (seek == PA_SEEK_ABSOLUTE) {
1265 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1266 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1267 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1268 } else if (seek == PA_SEEK_RELATIVE) {
1269 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1270 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1272 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1275 /* Update the write index in the already available latency data */
1276 if (s->timing_info_valid) {
1278 if (seek == PA_SEEK_ABSOLUTE) {
1279 s->timing_info.write_index_corrupt = FALSE;
1280 s->timing_info.write_index = offset + (int64_t) length;
1281 } else if (seek == PA_SEEK_RELATIVE) {
1282 if (!s->timing_info.write_index_corrupt)
1283 s->timing_info.write_index += offset + (int64_t) length;
1285 s->timing_info.write_index_corrupt = TRUE;
/* Without a trustworthy write index, ask the server for fresh timing data. */
1288 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1289 request_auto_timing_update(s, TRUE);
/* Exposes the next chunk of recorded data without removing it from the
 * record queue.
 *
 * s:      a READY record stream.
 * data:   receives a pointer to the first byte of the chunk.
 * length: receives the chunk length in bytes.
 *
 * If no chunk is currently held, one is fetched from the record queue with
 * pa_memblockq_peek() and its memblock is acquired. The chunk stays held in
 * s->peek_memchunk until pa_stream_drop() releases it, so repeated calls
 * return the same chunk.
 *
 * NOTE(review): the handling of a failed pa_memblockq_peek() and the return
 * statement are not visible in this listing. */
1295 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1297 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1301 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1302 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1303 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1305 if (!s->peek_memchunk.memblock) {
1307 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1313 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1316 pa_assert(s->peek_data);
/* The chunk may start part-way into its memblock. */
1317 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1318 *length = s->peek_memchunk.length;
1322 int pa_stream_drop(pa_stream *s) {
1324 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1326 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1327 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1328 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1329 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1331 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1333 /* Fix the simulated local read index */
1334 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1335 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1337 pa_assert(s->peek_data);
1338 pa_memblock_release(s->peek_memchunk.memblock);
1339 pa_memblock_unref(s->peek_memchunk.memblock);
1340 pa_memchunk_reset(&s->peek_memchunk);
1345 size_t pa_stream_writable_size(pa_stream *s) {
1347 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1351 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1353 return s->requested_bytes;
1356 size_t pa_stream_readable_size(pa_stream *s) {
1358 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1360 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1361 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1362 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1364 return pa_memblockq_get_length(s->record_memblockq);
1367 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1373 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1375 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1376 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1377 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1379 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1381 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1382 pa_tagstruct_putu32(t, s->channel);
1383 pa_pstream_send_tagstruct(s->context->pstream, t);
1384 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Derive a stream time in microseconds from the stored timing info.
 * Playback streams start from timing_info.read_index, record streams
 * from timing_info.write_index; a negative index is treated as 0. While
 * the stream is neither corked nor suspended the value is further
 * adjusted by the transport latency (skipped when ignore_transport is
 * set) and by the device latencies. The asserts spell out the
 * preconditions: READY, not an upload stream, valid timing info and a
 * non-corrupt index for the direction in use.
 * NOTE(review): several short lines of this function (including the
 * statements taken when sink_usec >= usec) are not visible in this
 * excerpt -- confirm against the full file. */
1389 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1393 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1394 pa_assert(s->state == PA_STREAM_READY);
1395 pa_assert(s->direction != PA_STREAM_UPLOAD);
1396 pa_assert(s->timing_info_valid);
1397 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1398 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1400 if (s->direction == PA_STREAM_PLAYBACK) {
1401 /* The last byte that was written into the output device
1402 * had this time value associated */
1403 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
/* Latency corrections are applied only while the stream is running */
1405 if (!s->corked && !s->suspended) {
1407 if (!ignore_transport)
1408 /* Because the latency info took a little time to come
1409 * to us, we assume that the real output time is actually
1411 usec += s->timing_info.transport_usec;
1413 /* However, the output device usually maintains a buffer
1414 too, hence the real sample currently played is a little
1416 if (s->timing_info.sink_usec >= usec)
1419 usec -= s->timing_info.sink_usec;
1423 pa_assert(s->direction == PA_STREAM_RECORD);
1425 /* The last byte written into the server side queue had
1426 * this time value associated */
1427 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
/* As for playback, corrections are applied only while running */
1429 if (!s->corked && !s->suspended) {
1431 if (!ignore_transport)
1432 /* Add transport latency */
1433 usec += s->timing_info.transport_usec;
1435 /* Add latency of data in device buffer */
1436 usec += s->timing_info.source_usec;
1438 /* If this is a monitor source, we need to correct the
1439 * time by the playback device buffer */
1440 if (s->timing_info.sink_usec >= usec)
1443 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query sent by
 * pa_stream_update_timing_info(); userdata is the pa_operation.
 *
 * The stream's timing info is first marked invalid with both indexes
 * corrupt. A well-formed reply then refills it: sink/source latency,
 * playing flag, local and remote timestamps, write and read index and,
 * for playback streams on protocol version >= 13, the underrun/playing
 * counters. After that the transport latency is estimated, indexes
 * older than read_index_not_before / write_index_not_before are flagged
 * corrupt, the recorded write index corrections are applied (playback),
 * the read index is reduced by the locally queued data (record) and the
 * smoother, if any, is fed. Finally the latency-update callback and the
 * operation's own callback are invoked and the operation is finished.
 *
 * NOTE(review): several short lines (local declarations, jumps on error
 * paths, some branch keywords) are not visible in this excerpt. */
1450 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1451 pa_operation *o = userdata;
1452 struct timeval local, remote, now;
1454 pa_bool_t playing = FALSE;
1455 uint64_t underrun_for = 0, playing_for = 0;
1459 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Nothing to update if the operation lost its context or stream */
1461 if (!o->context || !o->stream)
1464 i = &o->stream->timing_info;
/* Pessimistic defaults; they are reverted once the reply has been parsed */
1466 o->stream->timing_info_valid = FALSE;
1467 i->write_index_corrupt = TRUE;
1468 i->read_index_corrupt = TRUE;
1470 if (command != PA_COMMAND_REPLY) {
1471 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* Fields are read in the order they appear in the reply */
1476 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1477 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1478 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1479 pa_tagstruct_get_timeval(t, &local) < 0 ||
1480 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1481 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1482 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1484 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* The two extra counters are only read for playback streams when the
 * context version is at least 13 */
1488 if (o->context->version >= 13 &&
1489 o->stream->direction == PA_STREAM_PLAYBACK)
1490 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1491 pa_tagstruct_getu64(t, &playing_for) < 0) {
1493 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Trailing data after the expected fields is a protocol error */
1498 if (!pa_tagstruct_eof(t)) {
1499 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1502 o->stream->timing_info_valid = TRUE;
1503 i->write_index_corrupt = FALSE;
1504 i->read_index_corrupt = FALSE;
1506 i->playing = (int) playing;
/* since_underrun carries playing_for while playing, underrun_for otherwise */
1507 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1509 pa_gettimeofday(&now);
1511 /* Calculate timestamps */
1512 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1513 /* local and remote seem to have synchronized clocks */
1515 if (o->stream->direction == PA_STREAM_PLAYBACK)
1516 i->transport_usec = pa_timeval_diff(&remote, &local);
1518 i->transport_usec = pa_timeval_diff(&now, &remote);
1520 i->synchronized_clocks = TRUE;
1521 i->timestamp = remote;
1523 /* clocks are not synchronized, let's estimate latency then */
1524 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1525 i->synchronized_clocks = FALSE;
1526 i->timestamp = local;
1527 pa_timeval_add(&i->timestamp, i->transport_usec);
1530 /* Invalidate read and write indexes if necessary */
1531 if (tag < o->stream->read_index_not_before)
1532 i->read_index_corrupt = TRUE;
1534 if (tag < o->stream->write_index_not_before)
1535 i->write_index_corrupt = TRUE;
1537 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1538 /* Write index correction */
1541 uint32_t ctag = tag;
1543 /* Go through the saved correction values and add up the
1544 * total correction.*/
/* The walk starts at the slot after the current one and visits every
 * slot of the ring exactly once */
1545 for (n = 0, j = o->stream->current_write_index_correction+1;
1546 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1547 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1549 /* Step over invalid data or out-of-date data */
1550 if (!o->stream->write_index_corrections[j].valid ||
1551 o->stream->write_index_corrections[j].tag < ctag)
1554 /* Make sure that everything is in order */
1555 ctag = o->stream->write_index_corrections[j].tag+1;
1557 /* Now fix the write index */
1558 if (o->stream->write_index_corrections[j].corrupt) {
1559 /* A corrupting seek was made */
1560 i->write_index_corrupt = TRUE;
1561 } else if (o->stream->write_index_corrections[j].absolute) {
1562 /* An absolute seek was made */
1563 i->write_index = o->stream->write_index_corrections[j].value;
1564 i->write_index_corrupt = FALSE;
1565 } else if (!i->write_index_corrupt) {
1566 /* A relative seek was made */
1567 i->write_index += o->stream->write_index_corrections[j].value;
1571 /* Clear old correction entries */
1572 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1573 if (!o->stream->write_index_corrections[n].valid)
1576 if (o->stream->write_index_corrections[n].tag <= tag)
1577 o->stream->write_index_corrections[n].valid = FALSE;
1581 if (o->stream->direction == PA_STREAM_RECORD) {
1582 /* Read index correction */
/* The reported index is reduced by the length of the local record queue */
1584 if (!i->read_index_corrupt)
1585 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1588 /* Update smoother */
1589 if (o->stream->smoother) {
/* u and x both start as "now minus transport latency"; only x is
 * shifted further below */
1592 u = x = pa_rtclock_usec() - i->transport_usec;
1594 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1597 /* If we weren't playing then it will take some time
1598 * until the audio will actually come out through the
1599 * speakers. Since we follow that timing here, we need
1600 * to try to fix this up */
1602 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1604 if (su < i->sink_usec)
1605 x += i->sink_usec - su;
1609 pa_smoother_pause(o->stream->smoother, x);
1611 /* Update the smoother */
1612 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1613 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1614 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1617 pa_smoother_resume(o->stream->smoother, x);
1621 o->stream->auto_timing_update_requested = FALSE;
1623 if (o->stream->latency_update_callback)
1624 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* The operation's callback receives timing_info_valid as its success flag */
1626 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1627 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1628 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1633 pa_operation_done(o);
1634 pa_operation_unref(o);
1637 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1645 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1647 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1648 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1649 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1651 if (s->direction == PA_STREAM_PLAYBACK) {
1652 /* Find a place to store the write_index correction data for this entry */
1653 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1655 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1656 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1658 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1660 t = pa_tagstruct_command(
1662 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1664 pa_tagstruct_putu32(t, s->channel);
1665 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1667 pa_pstream_send_tagstruct(s->context->pstream, t);
1668 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1670 if (s->direction == PA_STREAM_PLAYBACK) {
1671 /* Fill in initial correction data */
1673 s->current_write_index_correction = cidx;
1675 s->write_index_corrections[cidx].valid = TRUE;
1676 s->write_index_corrections[cidx].absolute = FALSE;
1677 s->write_index_corrections[cidx].corrupt = FALSE;
1678 s->write_index_corrections[cidx].tag = tag;
1679 s->write_index_corrections[cidx].value = 0;
/* Reply handler registered by pa_stream_disconnect(); userdata is the
 * stream itself. A command other than PA_COMMAND_REPLY is handed to
 * pa_context_handle_error() and the stream can end up in
 * PA_STREAM_FAILED; a reply with unexpected trailing data fails the
 * context with PA_ERR_PROTOCOL; PA_STREAM_TERMINATED is the state set
 * on the remaining path.
 * NOTE(review): the jumps between these branches are not visible in
 * this excerpt -- confirm the exact control flow against the full file. */
1685 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1686 pa_stream *s = userdata;
1690 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1694 if (command != PA_COMMAND_REPLY) {
1695 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1698 pa_stream_set_state(s, PA_STREAM_FAILED);
1700 } else if (!pa_tagstruct_eof(t)) {
1701 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1705 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Ask the server to delete this stream. The command is chosen by
 * direction (playback, record, otherwise upload), carries the stream's
 * channel, and its reply is handled by pa_stream_disconnect_callback
 * with the stream as userdata. The validity checks require an unforked
 * process, a valid channel and a READY context.
 * NOTE(review): local declarations, the remaining arguments of
 * pa_tagstruct_command() and the return statement are not visible in
 * this excerpt. */
1711 int pa_stream_disconnect(pa_stream *s) {
1716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1719 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1720 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1724 t = pa_tagstruct_command(
1726 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1727 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1729 pa_tagstruct_putu32(t, s->channel);
1730 pa_pstream_send_tagstruct(s->context->pstream, t);
/* No free callback is passed for the userdata here */
1731 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1737 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1739 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1741 if (pa_detect_fork())
1744 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1747 s->read_callback = cb;
1748 s->read_userdata = userdata;
1751 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1753 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1755 if (pa_detect_fork())
1758 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1761 s->write_callback = cb;
1762 s->write_userdata = userdata;
1765 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1767 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1769 if (pa_detect_fork())
1772 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1775 s->state_callback = cb;
1776 s->state_userdata = userdata;
1779 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1781 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1783 if (pa_detect_fork())
1786 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1789 s->overflow_callback = cb;
1790 s->overflow_userdata = userdata;
1793 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1797 if (pa_detect_fork())
1800 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1803 s->underflow_callback = cb;
1804 s->underflow_userdata = userdata;
1807 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1809 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1811 if (pa_detect_fork())
1814 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1817 s->latency_update_callback = cb;
1818 s->latency_update_userdata = userdata;
1821 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1823 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1825 if (pa_detect_fork())
1828 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1831 s->moved_callback = cb;
1832 s->moved_userdata = userdata;
1835 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1837 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839 if (pa_detect_fork())
1842 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1845 s->suspended_callback = cb;
1846 s->suspended_userdata = userdata;
1849 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1851 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1853 if (pa_detect_fork())
1856 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1859 s->started_callback = cb;
1860 s->started_userdata = userdata;
1863 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1865 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1867 if (pa_detect_fork())
1870 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1873 s->event_callback = cb;
1874 s->event_userdata = userdata;
1877 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1879 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1881 if (pa_detect_fork())
1884 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1887 s->buffer_attr_callback = cb;
1888 s->buffer_attr_userdata = userdata;
/* Shared reply handler for requests whose reply is expected to carry no
 * payload; userdata is the pa_operation that was referenced when the
 * reply was registered. A command other than PA_COMMAND_REPLY goes to
 * pa_context_handle_error(); a reply with trailing data fails the
 * context with PA_ERR_PROTOCOL. The operation's callback is called as a
 * pa_stream_success_cb_t with the stream, a success flag and the user
 * data, after which the operation is marked done and unreferenced.
 * NOTE(review): the declaration and updates of `success`, the guard
 * around the callback and the error-path jumps are not visible in this
 * excerpt. */
1891 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1892 pa_operation *o = userdata;
1897 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1902 if (command != PA_COMMAND_REPLY) {
1903 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1907 } else if (!pa_tagstruct_eof(t)) {
1908 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1913 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1914 cb(o->stream, success, o->userdata);
1918 pa_operation_done(o);
1919 pa_operation_unref(o);
/* Send a cork request for a playback or record stream: the command is
 * chosen by direction and carries the channel plus b normalised to a
 * boolean. The reply is handled by pa_stream_simple_ack_callback.
 * Afterwards the smoother status is re-checked and an automatic timing
 * update is requested. The validity checks require an unforked process
 * and a READY stream that is not an upload stream.
 * NOTE(review): local declarations, any local state update preceding
 * the request and the return statement are not visible in this excerpt. */
1922 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1928 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1931 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1932 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1936 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1938 t = pa_tagstruct_command(
1940 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1942 pa_tagstruct_putu32(t, s->channel);
/* !!b maps any non-zero value to 1 before it goes on the wire */
1943 pa_tagstruct_put_boolean(t, !!b);
1944 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The callback gets its own reference to the operation, dropped via
 * pa_operation_unref when the registration is freed */
1945 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1947 check_smoother_status(s, FALSE, FALSE, FALSE);
1949 /* This might cause the indexes to hang/start again, hence
1950 * let's request a timing update */
1951 request_auto_timing_update(s, TRUE);
1956 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1962 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1964 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1965 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1967 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1969 t = pa_tagstruct_command(s->context, command, &tag);
1970 pa_tagstruct_putu32(t, s->channel);
1971 pa_pstream_send_tagstruct(s->context->pstream, t);
1972 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1977 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1981 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1983 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1984 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1985 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1987 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1990 if (s->direction == PA_STREAM_PLAYBACK) {
1992 if (s->write_index_corrections[s->current_write_index_correction].valid)
1993 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1995 if (s->buffer_attr.prebuf > 0)
1996 check_smoother_status(s, FALSE, FALSE, TRUE);
1998 /* This will change the write index, but leave the
1999 * read index untouched. */
2000 invalidate_indexes(s, FALSE, TRUE);
2003 /* For record streams this has no influence on the write
2004 * index, but the read index might jump. */
2005 invalidate_indexes(s, TRUE, FALSE);
2010 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2014 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2016 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2017 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2018 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2019 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2021 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2024 /* This might cause the read index to hang again, hence
2025 * let's request a timing update */
2026 request_auto_timing_update(s, TRUE);
2031 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2035 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2038 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2039 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2040 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2042 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2045 /* This might cause the read index to start moving again, hence
2046 * let's request a timing update */
2047 request_auto_timing_update(s, TRUE);
2052 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2056 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2059 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2060 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2061 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2063 if (s->context->version >= 13) {
2064 pa_proplist *p = pa_proplist_new();
2066 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2067 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2068 pa_proplist_free(p);
2073 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2074 t = pa_tagstruct_command(
2076 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2078 pa_tagstruct_putu32(t, s->channel);
2079 pa_tagstruct_puts(t, name);
2080 pa_pstream_send_tagstruct(s->context->pstream, t);
2081 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Report the current stream time in microseconds. The validity checks
 * require an unforked process, a READY stream that is not an upload
 * stream, valid timing info (PA_ERR_NODATA otherwise) and a non-corrupt
 * read index for playback or write index for record. The time is taken
 * either from the smoother or from calc_time().
 * NOTE(review): the condition choosing between the two sources, the
 * hand-off of the result through r_usec and the return statement are
 * not visible in this excerpt. */
2087 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2091 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2093 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2094 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2095 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2096 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2097 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2098 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2101 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2103 usec = calc_time(s, FALSE);
2105 /* Make sure the time runs monotonically */
/* Skipped when the stream was created with PA_STREAM_NOT_MONOTONIC;
 * previous_time holds the value used for the comparison */
2106 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2107 if (usec < s->previous_time)
2108 usec = s->previous_time;
2110 s->previous_time = usec;
/* Helper that combines the two time values a and b for
 * pa_stream_get_latency(), optionally reporting through *negative.
 * NOTE(review): almost all of the body is missing from this excerpt;
 * only the condition below, which singles out record streams when the
 * caller supplied a `negative` pointer, is visible. Confirm the exact
 * arithmetic against the full file. */
2119 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2121 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2129 if (negative && s->direction == PA_STREAM_RECORD) {
/* Compute the stream latency in microseconds into *r_usec. The current
 * stream time is obtained from pa_stream_get_time(); the write index
 * (playback) or read index (record) is converted to a time with
 * pa_bytes_to_usec(); both values are then passed to
 * time_counter_diff(), with the argument order swapped between
 * playback and record. The validity checks require valid timing info
 * and a non-corrupt write index for playback or read index for record
 * (PA_ERR_NODATA otherwise).
 * NOTE(review): local declarations, the error return after
 * pa_stream_get_time(), any adjustment of cindex before the conversion
 * and the final return are not visible in this excerpt. */
2137 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2143 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2146 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2147 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2148 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2149 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2150 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2151 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2153 if ((r = pa_stream_get_time(s, &t)) < 0)
/* Pick the index that matches the stream direction */
2156 if (s->direction == PA_STREAM_PLAYBACK)
2157 cindex = s->timing_info.write_index;
2159 cindex = s->timing_info.read_index;
2164 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2166 if (s->direction == PA_STREAM_PLAYBACK)
2167 *r_usec = time_counter_diff(s, c, t, negative);
2169 *r_usec = time_counter_diff(s, t, c, negative);
2174 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2176 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2179 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2180 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2181 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2183 return &s->timing_info;
2186 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2188 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2190 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2192 return &s->sample_spec;
2195 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2197 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2201 return &s->channel_map;
2204 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2206 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2208 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2209 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2210 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2212 return &s->buffer_attr;
/* Reply handler registered by pa_stream_set_buffer_attr(); userdata is
 * the pa_operation. A reply is expected to carry the buffer attributes
 * now in effect, which are written straight into the stream's
 * buffer_attr: maxlength, tlength, prebuf and minreq for playback,
 * maxlength and fragsize for record. With context version >= 13 a
 * latency value follows and is stored in the timing info. Any parse
 * failure or trailing data fails the context with PA_ERR_PROTOCOL. The
 * operation's callback is then called with a success flag and the
 * operation is finished.
 * NOTE(review): the declaration and updates of `success`, the local
 * `usec`, branch keywords and error-path jumps are not visible in this
 * excerpt. */
2215 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2216 pa_operation *o = userdata;
2221 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2226 if (command != PA_COMMAND_REPLY) {
2227 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2232 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2233 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2234 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2235 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2236 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2237 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2240 } else if (o->stream->direction == PA_STREAM_RECORD) {
2241 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2242 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2243 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* From version 13 on the reply also carries a latency value */
2248 if (o->stream->context->version >= 13) {
2251 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2252 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Record streams keep it as configured_source_usec; the sink variant is
 * presumably the alternative branch -- confirm against the full file */
2256 if (o->stream->direction == PA_STREAM_RECORD)
2257 o->stream->timing_info.configured_source_usec = usec;
2259 o->stream->timing_info.configured_sink_usec = usec;
2262 if (!pa_tagstruct_eof(t)) {
2263 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2269 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2270 cb(o->stream, success, o->userdata);
2274 pa_operation_done(o);
2275 pa_operation_unref(o);
/* Ask the server to change the stream's buffer attributes. The request
 * uses the direction-specific SET_*_STREAM_BUFFER_ATTR command and
 * carries the channel, attr->maxlength and then tlength, prebuf and
 * minreq for playback or fragsize for record. Depending on the context
 * version the PA_STREAM_ADJUST_LATENCY (>= 13) and
 * PA_STREAM_EARLY_REQUESTS (>= 14) flags are appended as booleans. The
 * reply is handled by stream_set_buffer_attr_callback and a timing
 * update is requested afterwards. The validity checks require an
 * unforked process, a READY non-upload stream and context version >= 12.
 * NOTE(review): local declarations, parts of the multi-line calls, a
 * branch keyword and the return statement are not visible in this
 * excerpt. */
2279 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2285 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2288 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2291 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2293 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2295 t = pa_tagstruct_command(
2297 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2299 pa_tagstruct_putu32(t, s->channel);
2301 pa_tagstruct_putu32(t, attr->maxlength);
2303 if (s->direction == PA_STREAM_PLAYBACK)
2306 PA_TAG_U32, attr->tlength,
2307 PA_TAG_U32, attr->prebuf,
2308 PA_TAG_U32, attr->minreq,
2311 pa_tagstruct_putu32(t, attr->fragsize);
/* The trailing flags are only sent when the context version is high enough */
2313 if (s->context->version >= 13)
2314 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2316 if (s->context->version >= 14)
2317 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2319 pa_pstream_send_tagstruct(s->context->pstream, t);
2320 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2322 /* This might cause changes in the read/write indexes, hence let's
2323 * request a timing update */
2324 request_auto_timing_update(s, TRUE);
2329 uint32_t pa_stream_get_device_index(pa_stream *s) {
2331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2334 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2335 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2336 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2337 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2339 return s->device_index;
2342 const char *pa_stream_get_device_name(pa_stream *s) {
2344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2346 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2347 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2348 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2349 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2350 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2352 return s->device_name;
2355 int pa_stream_is_suspended(pa_stream *s) {
2357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2359 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2361 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2364 return s->suspended;
/* Query the corked state of a stream. The validity checks require an
 * unforked process and a READY stream that is not an upload stream.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it hands back the stream's corked flag -- confirm against
 * the full file. */
2367 int pa_stream_is_corked(pa_stream *s) {
2369 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2371 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2372 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2373 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
/* Reply handler that pa_stream_update_sample_rate() registers through
 * pa_pdispatch_register_reply(). userdata is the pa_operation created by
 * that call.
 *
 * Visible behaviour:
 *  - a command other than PA_COMMAND_REPLY is passed to
 *    pa_context_handle_error();
 *  - a tagstruct that is not at EOF makes the context fail with
 *    PA_ERR_PROTOCOL;
 *  - the rate carried in o->private is stored in the stream's sample spec;
 *  - the user callback held in o->callback is invoked with the stream, the
 *    success flag and o->userdata;
 *  - the operation is marked done and one reference is dropped.
 *
 * NOTE(review): the declaration of 'success', the bodies and else arms of the
 * branches below, and any jump labels are not among the lines shown, so the
 * exact control flow between these statements must be verified. */
2378 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2379 pa_operation *o = userdata;
2384 pa_assert(PA_REFCNT_VALUE(o) >= 1);
/* Anything but a plain reply goes through the context's error handling. */
2389 if (command != PA_COMMAND_REPLY) {
2390 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
/* No further fields are read from the reply, so leftover data is treated as
 * a protocol violation. */
2396 if (!pa_tagstruct_eof(t)) {
2397 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* o->private holds the requested rate, stored there with PA_UINT_TO_PTR by
 * pa_stream_update_sample_rate(). */
2402 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2403 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
/* o->callback is kept as a generic pa_operation_cb_t; cast it back to the
 * success-callback type it was created from. */
2406 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2407 cb(o->stream, success, o->userdata);
2411 pa_operation_done(o);
/* Presumably balances the pa_operation_ref(o) handed over as userdata when
 * this handler was registered; confirm against the pdispatch contract. */
2412 pa_operation_unref(o);
/* Sends a sample-rate update request for stream s to the server.
 *
 * Checks made before anything is sent: pa_detect_fork() reports no fork;
 * rate is in 1..PA_RATE_MAX; s->state is PA_STREAM_READY; s->direction is
 * not PA_STREAM_UPLOAD; s->flags contains PA_STREAM_VARIABLE_RATE; and
 * s->context->version is at least 12. Presumably
 * PA_CHECK_VALIDITY_RETURN_NULL records the given error on s->context and
 * returns NULL when a condition is false (macro defined elsewhere; TODO
 * confirm).
 *
 * cb and userdata are stored in the new pa_operation and used by
 * stream_update_sample_rate_callback() when the reply arrives.
 *
 * NOTE(review): the local declarations, part of the pa_tagstruct_command()
 * argument list and the return statement are not among the lines shown;
 * presumably the operation 'o' is returned to the caller. */
2416 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2422 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2424 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2425 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2426 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2427 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2431 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* Keep the requested rate on the operation so the reply handler can store it
 * in the stream's sample spec. */
2432 o->private = PA_UINT_TO_PTR(rate);
/* The command code depends on the stream direction: record streams use the
 * RECORD variant, every other direction the PLAYBACK variant. */
2434 t = pa_tagstruct_command(
2436 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
/* Payload: the stream's channel followed by the new rate. */
2438 pa_tagstruct_putu32(t, s->channel);
2439 pa_tagstruct_putu32(t, rate);
2441 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation;
 * pa_operation_unref is supplied as the matching free callback. */
2442 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Sends a property-list update request for stream s to the server.
 *
 * Checks made before anything is sent: pa_detect_fork() reports no fork;
 * mode is one of PA_UPDATE_SET, PA_UPDATE_MERGE or PA_UPDATE_REPLACE;
 * s->state is PA_STREAM_READY; s->direction is not PA_STREAM_UPLOAD; and
 * s->context->version is at least 13. Presumably
 * PA_CHECK_VALIDITY_RETURN_NULL records the given error on s->context and
 * returns NULL when a condition is false (macro defined elsewhere; TODO
 * confirm).
 *
 * p is passed to pa_tagstruct_put_proplist() without any check in the lines
 * shown. cb and userdata are stored in the new pa_operation; the reply is
 * handled by pa_stream_simple_ack_callback.
 *
 * NOTE(review): the local declarations, part of the pa_tagstruct_command()
 * argument list and the return statement are not among the lines shown;
 * presumably the operation 'o' is returned to the caller. */
2447 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2453 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2455 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2456 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2457 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2458 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2459 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2461 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction: record streams use the
 * RECORD variant, every other direction the PLAYBACK variant. */
2463 t = pa_tagstruct_command(
2465 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
/* Payload: the stream's channel, the update mode, then the property list. */
2467 pa_tagstruct_putu32(t, s->channel);
2468 pa_tagstruct_putu32(t, (uint32_t) mode);
2469 pa_tagstruct_put_proplist(t, p);
2471 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation;
 * pa_operation_unref is supplied as the matching free callback. */
2472 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2474 /* Please note that we don't update s->proplist here, because we
2475 * don't export that field */
/* Sends a request to the server to remove the listed property keys from
 * stream s.
 *
 * keys must be non-NULL and hold at least one entry; the loop below reads
 * entries until it meets a NULL pointer, so the array has to be
 * NULL-terminated. Further checks: pa_detect_fork() reports no fork;
 * s->state is PA_STREAM_READY; s->direction is not PA_STREAM_UPLOAD; and
 * s->context->version is at least 13. Presumably
 * PA_CHECK_VALIDITY_RETURN_NULL records the given error on s->context and
 * returns NULL when a condition is false (macro defined elsewhere; TODO
 * confirm).
 *
 * cb and userdata are stored in the new pa_operation; the reply is handled
 * by pa_stream_simple_ack_callback.
 *
 * NOTE(review): the other local declarations, part of the
 * pa_tagstruct_command() argument list and the return statement are not
 * among the lines shown; presumably the operation 'o' is returned. */
2480 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2484 const char * const*k;
2487 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
/* An empty key list is rejected as invalid. */
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2491 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2492 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2493 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2495 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
/* The command code depends on the stream direction: record streams use the
 * RECORD variant, every other direction the PLAYBACK variant. */
2497 t = pa_tagstruct_command(
2499 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2501 pa_tagstruct_putu32(t, s->channel);
/* One string per key, in the order given by the caller. */
2503 for (k = keys; *k; k++)
2504 pa_tagstruct_puts(t, *k);
/* A NULL string follows the keys; presumably it marks the end of the list
 * for the receiver. Confirm against the protocol definition. */
2506 pa_tagstruct_puts(t, NULL);
2508 pa_pstream_send_tagstruct(s->context->pstream, t);
/* The reply handler gets its own reference to the operation;
 * pa_operation_unref is supplied as the matching free callback. */
2509 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2511 /* Please note that we don't update s->proplist here, because we
2512 * don't export that field */
/* Stores sink_input_idx in s->direct_on_input. Nothing is sent to the server
 * in the lines shown.
 *
 * Checks made first: pa_detect_fork() reports no fork; sink_input_idx is not
 * PA_INVALID_INDEX; s->state is PA_STREAM_UNCONNECTED, so this must be
 * called before the stream is connected; and s->context->version is at
 * least 13. Presumably PA_CHECK_VALIDITY records the given error on
 * s->context and returns a negative value when a condition is false (macro
 * defined elsewhere; TODO confirm).
 *
 * NOTE(review): the success return statement is not among the lines shown;
 * presumably it returns 0. */
2517 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2519 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2522 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2523 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2524 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2526 s->direct_on_input = sink_input_idx;
/* Returns s->direct_on_input, the value stored by
 * pa_stream_set_monitor_stream().
 *
 * Checks made first: pa_detect_fork() reports no fork; s->direct_on_input is
 * not PA_INVALID_INDEX; and s->context->version is at least 13. The stream
 * state is not checked here. Each check hands an error code and
 * PA_INVALID_INDEX to PA_CHECK_VALIDITY_RETURN_ANY; presumably the macro
 * records the error on s->context and returns PA_INVALID_INDEX when the
 * condition is false (macro defined elsewhere; TODO confirm). */
2531 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2533 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2535 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
/* An unset index is reported as a bad state, not returned as a value. */
2536 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2537 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2539 return s->direct_on_input;