2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
41 #include "fork-detect.h"
/* Bounds for the interval of the automatic timing update timer: the interval
 * starts at AUTO_TIMING_INTERVAL_START_USEC and is doubled on every timer
 * restart until it reaches AUTO_TIMING_INTERVAL_END_USEC. */
44 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
45 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
/* Arguments passed to pa_smoother_new() for streams that are connected with
 * PA_STREAM_INTERPOLATE_TIMING. */
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
/* Convenience constructor: creates a stream without an initial property list
 * by forwarding to pa_stream_new_with_proplist() with a NULL proplist.
 * Returns whatever pa_stream_new_with_proplist() returns. */
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
/* Resets every callback pointer of the stream, together with the userdata
 * pointer that belongs to it, to NULL. */
55 static void reset_callbacks(pa_stream *s) {
/* Data transfer callbacks */
56 s->read_callback = NULL;
57 s->read_userdata = NULL;
58 s->write_callback = NULL;
59 s->write_userdata = NULL;
/* State and notification callbacks */
60 s->state_callback = NULL;
61 s->state_userdata = NULL;
62 s->overflow_callback = NULL;
63 s->overflow_userdata = NULL;
64 s->underflow_callback = NULL;
65 s->underflow_userdata = NULL;
66 s->latency_update_callback = NULL;
67 s->latency_update_userdata = NULL;
68 s->moved_callback = NULL;
69 s->moved_userdata = NULL;
70 s->suspended_callback = NULL;
71 s->suspended_userdata = NULL;
72 s->started_callback = NULL;
73 s->started_userdata = NULL;
74 s->event_callback = NULL;
75 s->event_userdata = NULL;
76 s->buffer_attr_callback = NULL;
77 s->buffer_attr_userdata = NULL;
/* Creates a new, unconnected stream object on context c.
 *
 * The visible validity checks (PA_CHECK_VALIDITY_RETURN_NULL) reject the call
 * with the listed error code when:
 *   - the process has forked (PA_ERR_FORKED),
 *   - ss is NULL or not a valid sample spec (PA_ERR_INVALID),
 *   - the sample format needs a newer protocol version than c->version:
 *     S32 requires >= 12, S24 and S24_32 require >= 15 (PA_ERR_NOTSUPPORTED),
 *   - map is given but invalid, or its channel count differs from
 *     ss->channels (PA_ERR_INVALID),
 *   - neither name nor a PA_PROP_MEDIA_NAME entry in p is supplied
 *     (PA_ERR_INVALID).
 *
 * NOTE(review): parts of this definition (some parameters, the local
 * declarations, a few assignments and the return statement) are not present
 * in this listing; the comments cover only what is visible. */
80 pa_stream *pa_stream_new_with_proplist(
83 const pa_sample_spec *ss,
84 const pa_channel_map *map,
92 pa_assert(PA_REFCNT_VALUE(c) >= 1);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
/* Derive a default channel map from the channel count.
 * NOTE(review): presumably only reached when the caller passed no map; the
 * guarding line is not visible here -- confirm. */
103 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105 s = pa_xnew(pa_stream, 1);
108 s->mainloop = c->mainloop;
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
114 s->sample_spec = *ss;
115 s->channel_map = *map;
117 s->direct_on_input = PA_INVALID_INDEX;
/* Work on a private copy of the caller's proplist, or on a fresh empty one. */
119 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
/* Store the stream name as the PA_PROP_MEDIA_NAME property. */
121 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124 s->channel_valid = FALSE;
/* Each new stream takes the next sync id of the context. */
125 s->syncid = c->csyncid++;
126 s->stream_index = PA_INVALID_INDEX;
128 s->requested_bytes = 0;
129 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131 /* We initialize the target length here, so that if the user
132 * passes no explicit buffering metrics the default is similar to
133 * what older PA versions provided. */
135 s->buffer_attr.maxlength = (uint32_t) -1;
136 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137 s->buffer_attr.minreq = (uint32_t) -1;
138 s->buffer_attr.prebuf = (uint32_t) -1;
139 s->buffer_attr.fragsize = (uint32_t) -1;
141 s->device_index = PA_INVALID_INDEX;
142 s->device_name = NULL;
143 s->suspended = FALSE;
146 pa_memchunk_reset(&s->peek_memchunk);
149 s->record_memblockq = NULL;
152 memset(&s->timing_info, 0, sizeof(s->timing_info));
153 s->timing_info_valid = FALSE;
155 s->previous_time = 0;
157 s->read_index_not_before = 0;
158 s->write_index_not_before = 0;
/* Mark all write index correction slots as unused. */
159 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160 s->write_index_corrections[i].valid = 0;
161 s->current_write_index_correction = 0;
163 s->auto_timing_update_event = NULL;
164 s->auto_timing_update_requested = FALSE;
165 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
171 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
/* Register the new stream in the context's list of streams. */
172 PA_LLIST_PREPEND(pa_stream, c->streams, s);
/* Detaches the stream from its context: cancels operations, drops pending
 * reply handlers registered for the stream, removes the stream from the
 * per-direction channel table and from the context's stream list, and frees
 * the automatic timing update timer.
 *
 * NOTE(review): some lines of this definition are not present in this
 * listing, among them whatever selects which operations get cancelled in the
 * loop below. */
178 static void stream_unlink(pa_stream *s) {
185 /* Detach from context */
187 /* Unref all operation objects that point to us */
188 for (o = s->context->operations; o; o = n) {
192 pa_operation_cancel(o);
195 /* Drop all outstanding replies for this stream */
196 if (s->context->pdispatch)
197 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
/* Clear our slot in the channel -> stream table of the matching direction. */
199 if (s->channel_valid) {
200 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
202 s->channel_valid = FALSE;
205 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
/* Stop the periodic timing update timer, if one was created. */
210 if (s->auto_timing_update_event) {
211 pa_assert(s->mainloop);
212 s->mainloop->time_free(s->auto_timing_update_event);
/* Releases the resources held by the stream: a still-acquired peek memchunk,
 * the record memblockq, the proplist, the smoother and the device name.
 *
 * NOTE(review): some lines of this definition (e.g. further guards and the
 * release of the stream object itself) are not present in this listing. */
218 static void stream_free(pa_stream *s) {
/* A memblock handed out by pa_stream_peek() is still acquired; release it
 * before dropping our reference. */
223 if (s->peek_memchunk.memblock) {
225 pa_memblock_release(s->peek_memchunk.memblock);
226 pa_memblock_unref(s->peek_memchunk.memblock);
229 if (s->record_memblockq)
230 pa_memblockq_free(s->record_memblockq);
233 pa_proplist_free(s->proplist);
236 pa_smoother_free(s->smoother);
238 pa_xfree(s->device_name);
/* Drops one reference to the stream.
 * NOTE(review): the statement executed when the count reaches zero is not
 * present in this listing; presumably the stream is freed -- confirm. */
242 void pa_stream_unref(pa_stream *s) {
244 pa_assert(PA_REFCNT_VALUE(s) >= 1);
246 if (PA_REFCNT_DEC(s) <= 0)
/* Asserts that the stream is still referenced.
 * NOTE(review): the rest of the body is not present in this listing;
 * presumably it takes an additional reference and returns s -- confirm. */
250 pa_stream* pa_stream_ref(pa_stream *s) {
252 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the stream state.
 * NOTE(review): only the reference count assertion is visible in this
 * listing; presumably the function returns s->state -- confirm. */
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Accessor for the context the stream belongs to.
 * NOTE(review): only the reference count assertion is visible in this
 * listing; presumably the function returns s->context -- confirm. */
265 pa_context* pa_stream_get_context(pa_stream *s) {
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
/* Returns s->stream_index.
 *
 * Yields PA_INVALID_INDEX instead when the process has forked
 * (PA_ERR_FORKED) or when the stream is not in the PA_STREAM_READY state
 * (PA_ERR_BADSTATE). */
272 uint32_t pa_stream_get_index(pa_stream *s) {
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
276 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
279 return s->stream_index;
/* Moves the stream to state st and notifies the user through the state
 * callback, if one is installed.
 *
 * NOTE(review): the lines that store the new state and the statement run for
 * the FAILED/TERMINATED case are not present in this listing. */
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
293 if (s->state_callback)
294 s->state_callback(s, s->state_userdata);
/* FAILED and TERMINATED get additional handling (not visible here). */
296 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
/* Asks the server for fresh timing information and reschedules the periodic
 * timing update timer. Only relevant for streams that carry the
 * PA_STREAM_AUTO_TIMING_UPDATE flag.
 *
 * force: issue a request even if s->auto_timing_update_requested is already
 *        set.
 *
 * NOTE(review): some lines of this definition (early return, local
 * declarations, at least one condition in the timer part) are not present in
 * this listing. */
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
304 pa_assert(PA_REFCNT_VALUE(s) >= 1);
306 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
309 if (s->state == PA_STREAM_READY &&
310 (force || !s->auto_timing_update_requested)) {
313 /* pa_log("Automatically requesting new timing data"); */
/* We do not keep the operation object; remember only that a request is
 * in flight. */
315 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316 pa_operation_unref(o);
317 s->auto_timing_update_requested = TRUE;
321 if (s->auto_timing_update_event) {
/* NOTE(review): the condition guarding this reset is not visible here. */
325 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
/* Re-arm the timer to fire one interval from now ... */
327 pa_gettimeofday(&next);
328 pa_timeval_add(&next, s->auto_timing_interval_usec);
329 s->mainloop->time_restart(s->auto_timing_update_event, &next);
/* ... and double the interval for the next round, up to the upper bound. */
331 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * Reads the channel from the packet, looks up the stream in the table that
 * matches the command, sets PA_ERR_KILLED on the context and moves the
 * stream to PA_STREAM_FAILED. A malformed packet fails the whole context
 * with PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): the statements taken when the lookup fails or the stream is
 * not READY are not present in this listing; presumably the notification is
 * ignored in those cases -- confirm. */
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336 pa_context *c = userdata;
341 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
344 pa_assert(PA_REFCNT_VALUE(c) >= 1);
348 if (pa_tagstruct_getu32(t, &channel) < 0 ||
349 !pa_tagstruct_eof(t)) {
350 pa_context_fail(c, PA_ERR_PROTOCOL);
354 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
357 if (s->state != PA_STREAM_READY)
360 pa_context_set_error(c, PA_ERR_KILLED);
361 pa_stream_set_state(s, PA_STREAM_FAILED);
/* Pauses or resumes the timing smoother of the stream at a timestamp x that
 * is derived from the current pa_rtclock_usec() value and, when timing info
 * is valid, shifted by the transport latency and by the sink latency
 * (playback) or source latency.
 *
 * The smoother is paused when the stream is suspended or corked or when
 * force_stop is set; otherwise it is resumed when force_start is set or the
 * stream has no prebuffering (prebuf == 0). force_start and force_stop must
 * not both be set.
 *
 * NOTE(review): the conditions that choose between subtracting and adding
 * the transport latency (presumably based on aposteriori) and between the
 * sink and source adjustment are not present in this listing. */
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371 pa_assert(!force_start || !force_stop);
376 x = pa_rtclock_usec();
378 if (s->timing_info_valid) {
380 x -= s->timing_info.transport_usec;
382 x += s->timing_info.transport_usec;
384 if (s->direction == PA_STREAM_PLAYBACK)
385 /* it takes a while until the pause/resume is actually
387 x += s->timing_info.sink_usec;
389 /* Data from a while back will be dropped */
390 x -= s->timing_info.source_usec;
393 if (s->suspended || s->corked || force_stop)
394 pa_smoother_pause(s->smoother, x);
395 else if (force_start || s->buffer_attr.prebuf == 0)
396 pa_smoother_resume(s->smoother, x, TRUE);
399 /* Please note that we have no idea if playback actually started
400 * if prebuf is non-zero! */
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED (protocol version >= 12 only).
 *
 * Packet layout as parsed below: channel, device index, device name,
 * suspended flag; with protocol version >= 13 additionally the buffer
 * metrics (record: maxlength, fragsize, latency; playback: maxlength,
 * tlength, prebuf, minreq, latency).
 *
 * On success the stream's device name/index, suspended flag and (for
 * version >= 13) buffer_attr and configured latency are updated, the
 * smoother state is re-evaluated, a timing update is requested and the
 * moved callback is invoked. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): local declarations and the statements following the failed
 * checks are not present in this listing. */
403 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
404 pa_context *c = userdata;
411 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
414 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
417 pa_assert(PA_REFCNT_VALUE(c) >= 1);
421 if (c->version < 12) {
422 pa_context_fail(c, PA_ERR_PROTOCOL);
426 if (pa_tagstruct_getu32(t, &channel) < 0 ||
427 pa_tagstruct_getu32(t, &di) < 0 ||
428 pa_tagstruct_gets(t, &dn) < 0 ||
429 pa_tagstruct_get_boolean(t, &suspended) < 0) {
430 pa_context_fail(c, PA_ERR_PROTOCOL);
434 if (c->version >= 13) {
436 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
437 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
438 pa_tagstruct_getu32(t, &fragsize) < 0 ||
439 pa_tagstruct_get_usec(t, &usec) < 0) {
440 pa_context_fail(c, PA_ERR_PROTOCOL);
444 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
445 pa_tagstruct_getu32(t, &tlength) < 0 ||
446 pa_tagstruct_getu32(t, &prebuf) < 0 ||
447 pa_tagstruct_getu32(t, &minreq) < 0 ||
448 pa_tagstruct_get_usec(t, &usec) < 0) {
449 pa_context_fail(c, PA_ERR_PROTOCOL);
455 if (!pa_tagstruct_eof(t)) {
456 pa_context_fail(c, PA_ERR_PROTOCOL);
/* The server must always name a valid target device. */
460 if (!dn || di == PA_INVALID_INDEX) {
461 pa_context_fail(c, PA_ERR_PROTOCOL);
465 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
468 if (s->state != PA_STREAM_READY)
471 if (c->version >= 13) {
472 if (s->direction == PA_STREAM_RECORD)
473 s->timing_info.configured_source_usec = usec;
475 s->timing_info.configured_sink_usec = usec;
/* Fields that were not part of the packet keep their 0 initializer. */
477 s->buffer_attr.maxlength = maxlength;
478 s->buffer_attr.fragsize = fragsize;
479 s->buffer_attr.tlength = tlength;
480 s->buffer_attr.prebuf = prebuf;
481 s->buffer_attr.minreq = minreq;
484 pa_xfree(s->device_name);
485 s->device_name = pa_xstrdup(dn);
486 s->device_index = di;
488 s->suspended = suspended;
490 check_smoother_status(s, TRUE, FALSE, FALSE);
491 request_auto_timing_update(s, TRUE);
493 if (s->moved_callback)
494 s->moved_callback(s, s->moved_userdata);
500 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
501 pa_context *c = userdata;
505 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
508 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
511 pa_assert(PA_REFCNT_VALUE(c) >= 1);
515 if (c->version < 15) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
520 if (pa_tagstruct_getu32(t, &channel) < 0) {
521 pa_context_fail(c, PA_ERR_PROTOCOL);
525 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
526 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
527 pa_tagstruct_getu32(t, &fragsize) < 0 ||
528 pa_tagstruct_get_usec(t, &usec) < 0) {
529 pa_context_fail(c, PA_ERR_PROTOCOL);
533 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
534 pa_tagstruct_getu32(t, &tlength) < 0 ||
535 pa_tagstruct_getu32(t, &prebuf) < 0 ||
536 pa_tagstruct_getu32(t, &minreq) < 0 ||
537 pa_tagstruct_get_usec(t, &usec) < 0) {
538 pa_context_fail(c, PA_ERR_PROTOCOL);
543 if (!pa_tagstruct_eof(t)) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
548 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
551 if (s->state != PA_STREAM_READY)
554 if (s->direction == PA_STREAM_RECORD)
555 s->timing_info.configured_source_usec = usec;
557 s->timing_info.configured_sink_usec = usec;
559 s->buffer_attr.maxlength = maxlength;
560 s->buffer_attr.fragsize = fragsize;
561 s->buffer_attr.tlength = tlength;
562 s->buffer_attr.prebuf = prebuf;
563 s->buffer_attr.minreq = minreq;
565 request_auto_timing_update(s, TRUE);
567 if (s->buffer_attr_callback)
568 s->buffer_attr_callback(s, s->buffer_attr_userdata);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED (protocol version >= 12 only).
 *
 * Reads the channel and the new suspended flag, stores the flag on the
 * stream, re-evaluates the smoother state, requests a timing update and
 * invokes the suspended callback. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): local declarations and the statements following the failed
 * checks are not present in this listing. */
574 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
575 pa_context *c = userdata;
581 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
584 pa_assert(PA_REFCNT_VALUE(c) >= 1);
588 if (c->version < 12) {
589 pa_context_fail(c, PA_ERR_PROTOCOL);
593 if (pa_tagstruct_getu32(t, &channel) < 0 ||
594 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
595 !pa_tagstruct_eof(t)) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
600 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
603 if (s->state != PA_STREAM_READY)
606 s->suspended = suspended;
608 check_smoother_status(s, TRUE, FALSE, FALSE);
609 request_auto_timing_update(s, TRUE);
611 if (s->suspended_callback)
612 s->suspended_callback(s, s->suspended_userdata);
/* Handler for PA_COMMAND_STARTED (protocol version >= 13 only).
 *
 * Reads the channel, looks the stream up among the playback streams, forces
 * the smoother to resume (force_start), requests a timing update and
 * invokes the started callback. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): local declarations and the statements following the failed
 * checks are not present in this listing. */
618 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
619 pa_context *c = userdata;
624 pa_assert(command == PA_COMMAND_STARTED);
627 pa_assert(PA_REFCNT_VALUE(c) >= 1);
631 if (c->version < 13) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
636 if (pa_tagstruct_getu32(t, &channel) < 0 ||
637 !pa_tagstruct_eof(t)) {
638 pa_context_fail(c, PA_ERR_PROTOCOL);
642 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
645 if (s->state != PA_STREAM_READY)
648 check_smoother_status(s, TRUE, TRUE, FALSE);
649 request_auto_timing_update(s, TRUE);
651 if (s->started_callback)
652 s->started_callback(s, s->started_userdata);
/* Handler for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT (protocol version >= 15 only).
 *
 * Reads the channel, the event name and a property list from the packet and
 * passes event name and property list to the stream's event callback. A
 * packet that is malformed or carries no event name fails the context with
 * PA_ERR_PROTOCOL. The temporary property list is released with
 * pa_proplist_free() at the end. userdata is the pa_context.
 *
 * NOTE(review): local declarations, the statements following the failed
 * checks and any guard around the final free are not present in this
 * listing. */
658 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
659 pa_context *c = userdata;
662 pa_proplist *pl = NULL;
666 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
669 pa_assert(PA_REFCNT_VALUE(c) >= 1);
673 if (c->version < 15) {
674 pa_context_fail(c, PA_ERR_PROTOCOL);
678 pl = pa_proplist_new();
680 if (pa_tagstruct_getu32(t, &channel) < 0 ||
681 pa_tagstruct_gets(t, &event) < 0 ||
682 pa_tagstruct_get_proplist(t, pl) < 0 ||
683 !pa_tagstruct_eof(t) || !event) {
684 pa_context_fail(c, PA_ERR_PROTOCOL);
688 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
691 if (s->state != PA_STREAM_READY)
694 if (s->event_callback)
695 s->event_callback(s, event, pl, s->event_userdata);
701 pa_proplist_free(pl);
/* Handler for PA_COMMAND_REQUEST: the server asks for more playback data.
 *
 * Reads the channel and a byte count, adds the byte count to
 * s->requested_bytes and, if the total is positive, invokes the write
 * callback with that total. Malformed packets fail the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): some declarations and the statements following the failed
 * checks are not present in this listing. */
704 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
706 pa_context *c = userdata;
707 uint32_t bytes, channel;
710 pa_assert(command == PA_COMMAND_REQUEST);
713 pa_assert(PA_REFCNT_VALUE(c) >= 1);
717 if (pa_tagstruct_getu32(t, &channel) < 0 ||
718 pa_tagstruct_getu32(t, &bytes) < 0 ||
719 !pa_tagstruct_eof(t)) {
720 pa_context_fail(c, PA_ERR_PROTOCOL);
724 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
727 if (s->state != PA_STREAM_READY)
730 s->requested_bytes += bytes;
732 if (s->requested_bytes > 0 && s->write_callback)
733 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
/* Handler for PA_COMMAND_OVERFLOW and PA_COMMAND_UNDERFLOW on playback
 * streams.
 *
 * Reads the channel, force-stops the smoother when the stream uses
 * prebuffering (prebuf > 0), requests a timing update and invokes the
 * overflow or underflow callback matching the command. Malformed packets
 * fail the context with PA_ERR_PROTOCOL. userdata is the pa_context.
 *
 * NOTE(review): local declarations and the statements following the failed
 * checks are not present in this listing. */
739 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
741 pa_context *c = userdata;
745 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
748 pa_assert(PA_REFCNT_VALUE(c) >= 1);
752 if (pa_tagstruct_getu32(t, &channel) < 0 ||
753 !pa_tagstruct_eof(t)) {
754 pa_context_fail(c, PA_ERR_PROTOCOL);
758 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
761 if (s->state != PA_STREAM_READY)
764 if (s->buffer_attr.prebuf > 0)
765 check_smoother_status(s, TRUE, FALSE, TRUE);
767 request_auto_timing_update(s, TRUE);
769 if (command == PA_COMMAND_OVERFLOW) {
770 if (s->overflow_callback)
771 s->overflow_callback(s, s->overflow_userdata);
772 } else if (command == PA_COMMAND_UNDERFLOW) {
773 if (s->underflow_callback)
774 s->underflow_callback(s, s->underflow_userdata);
/* Marks the locally cached write and/or read index of a READY stream as
 * unreliable: records the context's current tag in *_index_not_before, flags
 * the index in the cached timing info as corrupt, and requests a forced
 * timing update.
 *
 * NOTE(review): the conditions that select the write part and the read part
 * (presumably the w and r arguments) are not present in this listing. */
781 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
783 pa_assert(PA_REFCNT_VALUE(s) >= 1);
785 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
787 if (s->state != PA_STREAM_READY)
791 s->write_index_not_before = s->context->ctag;
793 if (s->timing_info_valid)
794 s->timing_info.write_index_corrupt = TRUE;
796 /* pa_log("write_index invalidated"); */
800 s->read_index_not_before = s->context->ctag;
802 if (s->timing_info_valid)
803 s->timing_info.read_index_corrupt = TRUE;
805 /* pa_log("read_index invalidated"); */
808 request_auto_timing_update(s, TRUE);
/* Timer callback of the automatic timing update event: issues a non-forced
 * timing update request for the stream passed as userdata. m, e and tv are
 * not used in the visible lines. */
811 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
812 pa_stream *s = userdata;
815 pa_assert(PA_REFCNT_VALUE(s) >= 1);
818 request_auto_timing_update(s, FALSE);
/* Final step of stream creation: moves a stream from PA_STREAM_CREATING to
 * PA_STREAM_READY, hands an already pending data request to the write
 * callback, creates the automatic timing update timer for streams with
 * PA_STREAM_AUTO_TIMING_UPDATE and evaluates the smoother state.
 *
 * NOTE(review): some lines of this definition (e.g. a local declaration) are
 * not present in this listing. */
822 static void create_stream_complete(pa_stream *s) {
824 pa_assert(PA_REFCNT_VALUE(s) >= 1);
825 pa_assert(s->state == PA_STREAM_CREATING);
827 pa_stream_set_state(s, PA_STREAM_READY);
829 if (s->requested_bytes > 0 && s->write_callback)
830 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
832 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
/* First expiry is one start interval from now. */
834 pa_gettimeofday(&tv);
835 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
836 pa_timeval_add(&tv, s->auto_timing_interval_usec);
837 pa_assert(!s->auto_timing_update_event);
838 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
840 request_auto_timing_update(s, TRUE);
843 check_smoother_status(s, TRUE, FALSE, FALSE);
/* Replaces every buffer metric in attr that is still (uint32_t) -1 ("not
 * set") with a client-side default; ss is used to convert the 250ms default
 * target length into bytes.
 *
 * NOTE(review): the statement following the version check is not present in
 * this listing; according to the comment below, the defaults are meant for
 * servers older than protocol version 13 only, so presumably the function
 * returns early otherwise -- confirm. */
846 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
851 if (s->context->version >= 13)
854 /* Version older than 0.9.10 didn't do server side buffer_attr
855 * selection, hence we have to fake it on the client side. */
857 /* We choose fairly conservative values here, to not confuse
858 * old clients with extremely large playback buffers */
860 if (attr->maxlength == (uint32_t) -1)
861 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
863 if (attr->tlength == (uint32_t) -1)
864 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
866 if (attr->minreq == (uint32_t) -1)
867 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
869 if (attr->prebuf == (uint32_t) -1)
870 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
872 if (attr->fragsize == (uint32_t) -1)
873 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
876 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
877 pa_stream *s = userdata;
878 uint32_t requested_bytes;
882 pa_assert(PA_REFCNT_VALUE(s) >= 1);
883 pa_assert(s->state == PA_STREAM_CREATING);
887 if (command != PA_COMMAND_REPLY) {
888 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
891 pa_stream_set_state(s, PA_STREAM_FAILED);
895 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
896 s->channel == PA_INVALID_INDEX ||
897 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
898 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
899 pa_context_fail(s->context, PA_ERR_PROTOCOL);
903 s->requested_bytes = (int64_t) requested_bytes;
905 if (s->context->version >= 9) {
906 if (s->direction == PA_STREAM_PLAYBACK) {
907 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
909 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
910 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
911 pa_context_fail(s->context, PA_ERR_PROTOCOL);
914 } else if (s->direction == PA_STREAM_RECORD) {
915 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
916 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
917 pa_context_fail(s->context, PA_ERR_PROTOCOL);
923 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
926 const char *dn = NULL;
929 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
930 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
931 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
932 pa_tagstruct_gets(t, &dn) < 0 ||
933 pa_tagstruct_get_boolean(t, &suspended) < 0) {
934 pa_context_fail(s->context, PA_ERR_PROTOCOL);
938 if (!dn || s->device_index == PA_INVALID_INDEX ||
939 ss.channels != cm.channels ||
940 !pa_channel_map_valid(&cm) ||
941 !pa_sample_spec_valid(&ss) ||
942 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
943 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
944 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
945 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949 pa_xfree(s->device_name);
950 s->device_name = pa_xstrdup(dn);
951 s->suspended = suspended;
957 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
960 if (pa_tagstruct_get_usec(t, &usec) < 0) {
961 pa_context_fail(s->context, PA_ERR_PROTOCOL);
965 if (s->direction == PA_STREAM_RECORD)
966 s->timing_info.configured_source_usec = usec;
968 s->timing_info.configured_sink_usec = usec;
971 if (!pa_tagstruct_eof(t)) {
972 pa_context_fail(s->context, PA_ERR_PROTOCOL);
976 if (s->direction == PA_STREAM_RECORD) {
977 pa_assert(!s->record_memblockq);
979 s->record_memblockq = pa_memblockq_new(
981 s->buffer_attr.maxlength,
983 pa_frame_size(&s->sample_spec),
990 s->channel_valid = TRUE;
991 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
993 create_stream_complete(s);
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the arguments, stores direction and
 * buffer metrics on the stream, optionally sets up the timing smoother,
 * builds the CREATE_PLAYBACK_STREAM / CREATE_RECORD_STREAM request (with the
 * fields the negotiated protocol version understands), sends it, registers
 * pa_create_stream_callback() for the reply and moves the stream to
 * PA_STREAM_CREATING.
 *
 * direction:   PA_STREAM_PLAYBACK or PA_STREAM_RECORD
 * attr:        requested buffer metrics
 * flags:       PA_STREAM_* flags; unknown bits are rejected
 * volume:      initial volume (playback); channel count must match the
 *              stream's sample spec
 * sync_stream: playback stream to synchronize with (playback only)
 *
 * Failed validity checks go through PA_CHECK_VALIDITY with the error code
 * given in each check.
 *
 * NOTE(review): a number of lines of this definition (some parameters, local
 * declarations, several conditions and pa_tagstruct_put() call heads, the
 * return statement) are not present in this listing; the comments cover only
 * what is visible. */
999 static int create_stream(
1000 pa_stream_direction_t direction,
1003 const pa_buffer_attr *attr,
1004 pa_stream_flags_t flags,
1005 const pa_cvolume *volume,
1006 pa_stream *sync_stream) {
1010 pa_bool_t volume_set = FALSE;
1013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1014 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1016 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1017 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1018 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1019 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1020 PA_STREAM_INTERPOLATE_TIMING|
1021 PA_STREAM_NOT_MONOTONIC|
1022 PA_STREAM_AUTO_TIMING_UPDATE|
1023 PA_STREAM_NO_REMAP_CHANNELS|
1024 PA_STREAM_NO_REMIX_CHANNELS|
1025 PA_STREAM_FIX_FORMAT|
1027 PA_STREAM_FIX_CHANNELS|
1028 PA_STREAM_DONT_MOVE|
1029 PA_STREAM_VARIABLE_RATE|
1030 PA_STREAM_PEAK_DETECT|
1031 PA_STREAM_START_MUTED|
1032 PA_STREAM_ADJUST_LATENCY|
1033 PA_STREAM_EARLY_REQUESTS|
1034 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1035 PA_STREAM_START_UNMUTED|
1036 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1038 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1039 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1040 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1041 /* Although some of the other flags are not supported on older
1042 * version, we don't check for them here, because it doesn't hurt
1043 * when they are passed but actually not supported. This makes
1044 * client development easier */
/* Direction specific restrictions: START_MUTED and sync_stream are playback
 * only, PEAK_DETECT is record only; ADJUST_LATENCY and EARLY_REQUESTS are
 * mutually exclusive. */
1046 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1047 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1048 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1049 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1050 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1054 s->direction = direction;
1056 s->corked = !!(flags & PA_STREAM_START_CORKED);
/* NOTE(review): presumably guarded by a check for sync_stream that is not
 * visible here -- confirm. */
1059 s->syncid = sync_stream->syncid;
/* NOTE(review): presumably guarded by a check for attr that is not visible
 * here -- confirm. */
1062 s->buffer_attr = *attr;
1063 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
/* Timing interpolation needs a smoother. */
1065 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1068 x = pa_rtclock_usec();
1070 pa_assert(!s->smoother);
1071 s->smoother = pa_smoother_new(
1072 SMOOTHER_ADJUST_TIME,
1073 SMOOTHER_HISTORY_TIME,
1074 !(flags & PA_STREAM_NOT_MONOTONIC),
1076 SMOOTHER_MIN_HISTORY,
/* Default device from the client configuration, by direction. */
1082 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1084 t = pa_tagstruct_command(
1086 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
/* Servers older than protocol version 13 receive the stream name as a plain
 * string. */
1089 if (s->context->version < 13)
1090 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1094 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1095 PA_TAG_CHANNEL_MAP, &s->channel_map,
1096 PA_TAG_U32, PA_INVALID_INDEX,
1098 PA_TAG_U32, s->buffer_attr.maxlength,
1099 PA_TAG_BOOLEAN, s->corked,
1102 if (s->direction == PA_STREAM_PLAYBACK) {
1107 PA_TAG_U32, s->buffer_attr.tlength,
1108 PA_TAG_U32, s->buffer_attr.prebuf,
1109 PA_TAG_U32, s->buffer_attr.minreq,
1110 PA_TAG_U32, s->syncid,
/* Remember whether the caller supplied a volume before a default one is
 * substituted below. */
1113 volume_set = !!volume;
1116 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1118 pa_tagstruct_put_cvolume(t, volume);
1120 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
/* Fields added in protocol version 12 */
1122 if (s->context->version >= 12) {
1125 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1126 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1127 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1128 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1129 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1130 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1131 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
/* Fields added in protocol version 13 */
1135 if (s->context->version >= 13) {
1137 if (s->direction == PA_STREAM_PLAYBACK)
1138 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1140 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1144 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1145 PA_TAG_PROPLIST, s->proplist,
1148 if (s->direction == PA_STREAM_RECORD)
1149 pa_tagstruct_putu32(t, s->direct_on_input);
/* Fields added in protocol version 14 */
1152 if (s->context->version >= 14) {
1154 if (s->direction == PA_STREAM_PLAYBACK)
1155 pa_tagstruct_put_boolean(t, volume_set);
1157 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
/* Fields added in protocol version 15 */
1160 if (s->context->version >= 15) {
1162 if (s->direction == PA_STREAM_PLAYBACK)
1163 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1165 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1166 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
/* Send the request and wait for the reply in pa_create_stream_callback(). */
1169 pa_pstream_send_tagstruct(s->context->pstream, t);
1170 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1172 pa_stream_set_state(s, PA_STREAM_CREATING);
/* Connects the stream for playback by forwarding all arguments to
 * create_stream() with direction PA_STREAM_PLAYBACK; returns its result.
 *
 * NOTE(review): part of the parameter list is not present in this listing. */
1178 int pa_stream_connect_playback(
1181 const pa_buffer_attr *attr,
1182 pa_stream_flags_t flags,
1184 pa_stream *sync_stream) {
1187 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1189 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
/* Connects the stream for recording by forwarding to create_stream() with
 * direction PA_STREAM_RECORD and neither a volume nor a sync stream; returns
 * its result.
 *
 * NOTE(review): part of the parameter list is not present in this listing. */
1192 int pa_stream_connect_record(
1195 const pa_buffer_attr *attr,
1196 pa_stream_flags_t flags) {
1199 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1201 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
/* Sends length bytes starting at data to the server on a READY playback or
 * upload stream.
 *
 * The data is transmitted in one or more memblocks: if free_cb is given and
 * the connection does not use shared memory, the caller's buffer is wrapped
 * directly in a user memblock; otherwise it is copied in pieces of at most
 * pa_mempool_block_size_max() bytes. When free_cb is given and shared memory
 * is in use, free_cb is called on data once everything has been copied.
 *
 * seek/offset: seek mode and offset of the write; upload streams only accept
 *              PA_SEEK_RELATIVE with offset 0. Only the first memblock is
 *              sent with the caller's seek mode, later ones are relative.
 *
 * Afterwards s->requested_bytes is reduced and, for playback streams, the
 * pending write index correction and the cached timing info are updated; a
 * timing update is requested when the write index is unknown or corrupt.
 *
 * Failed validity checks go through PA_CHECK_VALIDITY with the error code
 * given in each check.
 *
 * NOTE(review): a number of lines of this definition (some parameters, local
 * declarations and initializations, else branches, the return statement)
 * are not present in this listing. */
1204 int pa_stream_write(
1208 void (*free_cb)(void *p),
1210 pa_seek_mode_t seek) {
1213 pa_seek_mode_t t_seek;
1219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1222 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1223 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1224 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1225 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1226 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1236 while (t_length > 0) {
/* Zero-copy path: wrap the caller's buffer in a user memblock. */
1240 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1241 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1242 chunk.length = t_length;
/* Copy path: at most one pool block per iteration. */
1246 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1247 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1249 d = pa_memblock_acquire(chunk.memblock);
1250 memcpy(d, t_data, chunk.length);
1251 pa_memblock_release(chunk.memblock);
1254 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
/* All following pieces are appended relative to the previous one. */
1257 t_seek = PA_SEEK_RELATIVE;
1259 t_data = (const uint8_t*) t_data + chunk.length;
1260 t_length -= chunk.length;
1262 pa_memblock_unref(chunk.memblock);
/* The data was copied above, so the caller's buffer can be released now. */
1265 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1266 free_cb((void*) data);
1268 /* This is obviously wrong since we ignore the seeking index . But
1269 * that's OK, the server side applies the same error */
1270 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1272 if (s->direction == PA_STREAM_PLAYBACK) {
1274 /* Update latency request correction */
1275 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1277 if (seek == PA_SEEK_ABSOLUTE) {
1278 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1279 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1280 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1281 } else if (seek == PA_SEEK_RELATIVE) {
1282 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1283 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1285 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1288 /* Update the write index in the already available latency data */
1289 if (s->timing_info_valid) {
1291 if (seek == PA_SEEK_ABSOLUTE) {
1292 s->timing_info.write_index_corrupt = FALSE;
1293 s->timing_info.write_index = offset + (int64_t) length;
1294 } else if (seek == PA_SEEK_RELATIVE) {
1295 if (!s->timing_info.write_index_corrupt)
1296 s->timing_info.write_index += offset + (int64_t) length;
1298 s->timing_info.write_index_corrupt = TRUE;
1301 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1302 request_auto_timing_update(s, TRUE);
/* Expose the fragment at the head of the record queue without consuming it.
 *
 * When no chunk is currently cached in s->peek_memchunk, one is peeked from
 * s->record_memblockq and its memblock is acquired into s->peek_data;
 * otherwise the cached chunk is reused. *data receives s->peek_data offset
 * by the chunk's index and *length receives the chunk's length.
 *
 * The validity macros require that the process has not forked
 * (PA_ERR_FORKED) and that the stream is READY and a RECORD stream
 * (PA_ERR_BADSTATE). pa_stream_drop() in this file is what releases the
 * acquired memblock again. */
1308 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1310 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1314 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1315 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1316 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
/* Only fetch a new chunk if none is cached from an earlier peek */
1318 if (!s->peek_memchunk.memblock) {
/* NOTE(review): the body of this failure branch is not visible in this
 * listing -- confirm what is reported to the caller when the queue is empty */
1320 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1326 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1329 pa_assert(s->peek_data);
1330 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1331 *length = s->peek_memchunk.length;
/* Discard the fragment previously exposed by pa_stream_peek().
 *
 * Requires (besides the usual fork/READY/RECORD checks) that a chunk is
 * currently cached in s->peek_memchunk; otherwise PA_ERR_BADSTATE is set.
 * The chunk's length is dropped from the record queue, the locally
 * simulated read index is advanced by the same amount, and the memblock
 * acquired by the peek is released and unreferenced. */
1335 int pa_stream_drop(pa_stream *s) {
1337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1339 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1340 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1341 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1342 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1344 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1346 /* Fix the simulated local read index */
1347 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1348 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
/* Undo the acquire/ref taken by pa_stream_peek() and forget the chunk */
1350 pa_assert(s->peek_data);
1351 pa_memblock_release(s->peek_memchunk.memblock);
1352 pa_memblock_unref(s->peek_memchunk.memblock);
1353 pa_memchunk_reset(&s->peek_memchunk);
/* Report how many bytes are currently requested for writing on this stream.
 *
 * Returns s->requested_bytes when it is positive and 0 otherwise. The
 * checks reject forked processes, streams that are not READY, and RECORD
 * streams; (size_t) -1 is the value handed to the validity macro for those
 * failure cases. */
1358 size_t pa_stream_writable_size(pa_stream *s) {
1360 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1362 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1363 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1364 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
/* requested_bytes is signed and may go negative; clamp that to zero */
1366 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
/* Report how many bytes are queued locally for reading on a record stream.
 *
 * Returns the current length of s->record_memblockq. The checks reject
 * forked processes, streams that are not READY, and non-RECORD streams;
 * (size_t) -1 is the value handed to the validity macro for those failure
 * cases. */
1369 size_t pa_stream_readable_size(pa_stream *s) {
1371 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1373 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1374 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1375 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1377 return pa_memblockq_get_length(s->record_memblockq);
/* Ask the server to drain a playback stream.
 *
 * Only valid for READY PLAYBACK streams in a non-forked process. Creates an
 * operation carrying cb/userdata, sends PA_COMMAND_DRAIN_PLAYBACK_STREAM
 * with the stream's channel, and registers pa_stream_simple_ack_callback()
 * for the reply. The reply registration holds its own reference on the
 * operation, dropped through pa_operation_unref.
 * NOTE(review): the declarations and the return statement are not visible
 * in this listing. */
1380 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1386 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1388 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1389 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1390 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1392 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1394 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1395 pa_tagstruct_putu32(t, s->channel);
1396 pa_pstream_send_tagstruct(s->context->pstream, t);
1397 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Derive the current stream time from the cached timing info.
 *
 * Preconditions (asserted): the stream is READY, is not an UPLOAD stream,
 * has valid timing info, and the index used for its direction (read index
 * for playback, write index for record) is not marked corrupt.
 *
 * Playback: starts from the read index converted to usec (negative indexes
 * count as 0). While the stream is neither corked nor suspended, the
 * transport latency is added unless ignore_transport is set, and the sink
 * latency is taken into account.
 *
 * Record: starts from the write index converted to usec. While the stream
 * is neither corked nor suspended, the transport latency (unless
 * ignore_transport) and the source latency are added, and the sink latency
 * is taken into account for monitor sources.
 *
 * NOTE(review): the branches taken when sink_usec >= usec, and the return
 * statement, are not visible in this listing -- confirm against the full
 * file. */
1402 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1406 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407 pa_assert(s->state == PA_STREAM_READY);
1408 pa_assert(s->direction != PA_STREAM_UPLOAD);
1409 pa_assert(s->timing_info_valid);
1410 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1411 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1413 if (s->direction == PA_STREAM_PLAYBACK) {
1414 /* The last byte that was written into the output device
1415 * had this time value associated */
1416 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1418 if (!s->corked && !s->suspended) {
1420 if (!ignore_transport)
1421 /* Because the latency info took a little time to come
1422 * to us, we assume that the real output time is actually
1424 usec += s->timing_info.transport_usec;
1426 /* However, the output device usually maintains a buffer
1427 too, hence the real sample currently played is a little
1429 if (s->timing_info.sink_usec >= usec)
1432 usec -= s->timing_info.sink_usec;
1436 pa_assert(s->direction == PA_STREAM_RECORD);
1438 /* The last byte written into the server side queue had
1439 * this time value associated */
1440 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1442 if (!s->corked && !s->suspended) {
1444 if (!ignore_transport)
1445 /* Add transport latency */
1446 usec += s->timing_info.transport_usec;
1448 /* Add latency of data in device buffer */
1449 usec += s->timing_info.source_usec;
1451 /* If this is a monitor source, we need to correct the
1452 * time by the playback device buffer */
1453 if (s->timing_info.sink_usec >= usec)
1456 usec -= s->timing_info.sink_usec;
/* Reply handler for the latency query sent by pa_stream_update_timing_info().
 *
 * userdata is the pa_operation created for the query. The handler
 *   1. marks the stream's timing info invalid and both indexes corrupt,
 *   2. parses the reply into o->stream->timing_info (any parse error fails
 *      the context with PA_ERR_PROTOCOL),
 *   3. estimates the transport latency from the local/remote timestamps,
 *   4. re-invalidates indexes for replies older than the stream's
 *      *_index_not_before tags,
 *   5. applies the recorded write index corrections (playback) or subtracts
 *      the locally queued data from the read index (record),
 *   6. feeds the smoother, if the stream has one,
 *   7. clears auto_timing_update_requested and invokes the latency update
 *      callback and the per-operation callback.
 *
 * NOTE(review): several short lines (goto/continue statements, else heads,
 * local declarations, some if heads around the smoother pause/resume calls)
 * are not visible in this listing; comments below describe only what is
 * shown. */
1463 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1464 pa_operation *o = userdata;
1465 struct timeval local, remote, now;
1467 pa_bool_t playing = FALSE;
1468 uint64_t underrun_for = 0, playing_for = 0;
1472 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1474 if (!o->context || !o->stream)
1477 i = &o->stream->timing_info;
/* Assume failure until the reply has been parsed completely */
1479 o->stream->timing_info_valid = FALSE;
1480 i->write_index_corrupt = TRUE;
1481 i->read_index_corrupt = TRUE;
1483 if (command != PA_COMMAND_REPLY) {
1484 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1489 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1490 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1491 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1492 pa_tagstruct_get_timeval(t, &local) < 0 ||
1493 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1494 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1495 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1497 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* For playback streams, protocol version >= 13 carries two additional
 * u64 fields: underrun_for and playing_for */
1501 if (o->context->version >= 13 &&
1502 o->stream->direction == PA_STREAM_PLAYBACK)
1503 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1504 pa_tagstruct_getu64(t, &playing_for) < 0) {
1506 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Trailing data after the expected fields is a protocol error */
1511 if (!pa_tagstruct_eof(t)) {
1512 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* The reply parsed cleanly: the data is usable from here on */
1515 o->stream->timing_info_valid = TRUE;
1516 i->write_index_corrupt = FALSE;
1517 i->read_index_corrupt = FALSE;
1519 i->playing = (int) playing;
1520 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1522 pa_gettimeofday(&now);
1524 /* Calculate timestamps */
1525 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1526 /* local and remote seem to have synchronized clocks */
1528 if (o->stream->direction == PA_STREAM_PLAYBACK)
1529 i->transport_usec = pa_timeval_diff(&remote, &local);
1531 i->transport_usec = pa_timeval_diff(&now, &remote);
1533 i->synchronized_clocks = TRUE;
1534 i->timestamp = remote;
1536 /* clocks are not synchronized, let's estimate latency then */
1537 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1538 i->synchronized_clocks = FALSE;
1539 i->timestamp = local;
1540 pa_timeval_add(&i->timestamp, i->transport_usec);
1543 /* Invalidate read and write indexes if necessary */
1544 if (tag < o->stream->read_index_not_before)
1545 i->read_index_corrupt = TRUE;
1547 if (tag < o->stream->write_index_not_before)
1548 i->write_index_corrupt = TRUE;
1550 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1551 /* Write index correction */
1554 uint32_t ctag = tag;
1556 /* Go through the saved correction values and add up the
1557 * total correction.*/
/* NOTE(review): j starts at current_write_index_correction+1 without the
 * modulo that the loop increment applies. If the current slot is the last
 * one, the first iteration indexes write_index_corrections[] at
 * PA_MAX_WRITE_INDEX_CORRECTIONS -- confirm the array bound and consider
 * wrapping the start index as well. */
1558 for (n = 0, j = o->stream->current_write_index_correction+1;
1559 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1560 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1562 /* Step over invalid data or out-of-date data */
1563 if (!o->stream->write_index_corrections[j].valid ||
1564 o->stream->write_index_corrections[j].tag < ctag)
1567 /* Make sure that everything is in order */
1568 ctag = o->stream->write_index_corrections[j].tag+1;
1570 /* Now fix the write index */
1571 if (o->stream->write_index_corrections[j].corrupt) {
1572 /* A corrupting seek was made */
1573 i->write_index_corrupt = TRUE;
1574 } else if (o->stream->write_index_corrections[j].absolute) {
1575 /* An absolute seek was made */
1576 i->write_index = o->stream->write_index_corrections[j].value;
1577 i->write_index_corrupt = FALSE;
1578 } else if (!i->write_index_corrupt) {
1579 /* A relative seek was made */
1580 i->write_index += o->stream->write_index_corrections[j].value;
1584 /* Clear old correction entries */
1585 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1586 if (!o->stream->write_index_corrections[n].valid)
1589 if (o->stream->write_index_corrections[n].tag <= tag)
1590 o->stream->write_index_corrections[n].valid = FALSE;
1594 if (o->stream->direction == PA_STREAM_RECORD) {
1595 /* Read index correction */
/* Data still sitting in the local record queue has not been read yet */
1597 if (!i->read_index_corrupt)
1598 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1601 /* Update smoother */
1602 if (o->stream->smoother) {
/* u is the sample timestamp handed to pa_smoother_put(); x is the
 * timestamp handed to pause/resume and may be shifted further below */
1605 u = x = pa_rtclock_usec() - i->transport_usec;
1607 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1610 /* If we weren't playing then it will take some time
1611 * until the audio will actually come out through the
1612 * speakers. Since we follow that timing here, we need
1613 * to try to fix this up */
1615 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1617 if (su < i->sink_usec)
1618 x += i->sink_usec - su;
1622 pa_smoother_pause(o->stream->smoother, x);
1624 /* Update the smoother */
1625 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1626 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1627 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1630 pa_smoother_resume(o->stream->smoother, x, TRUE);
1634 o->stream->auto_timing_update_requested = FALSE;
1636 if (o->stream->latency_update_callback)
1637 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
/* The per-operation callback receives timing_info_valid as its success flag */
1639 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1640 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1641 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1646 pa_operation_done(o);
1647 pa_operation_unref(o);
/* Send a latency query to the server and arrange for the stream's timing
 * info to be refreshed when the reply arrives.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. For playback
 * streams the next slot of the write_index_corrections ring is reserved
 * first; if that slot is still in use the call fails with PA_ERR_INTERNAL.
 * The request carries the stream channel and the current time of day, and
 * the reply is handled by stream_get_timing_info_callback(). cb/userdata
 * are stored in the returned operation.
 * NOTE(review): declarations, part of the pa_tagstruct_command() argument
 * list and the return statement are not visible in this listing. */
1650 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1660 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1664 if (s->direction == PA_STREAM_PLAYBACK) {
1665 /* Find a place to store the write_index correction data for this entry */
1666 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1668 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1669 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1671 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1673 t = pa_tagstruct_command(
1675 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1677 pa_tagstruct_putu32(t, s->channel);
1678 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1680 pa_pstream_send_tagstruct(s->context->pstream, t);
1681 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1683 if (s->direction == PA_STREAM_PLAYBACK) {
1684 /* Fill in initial correction data */
/* The slot is keyed by the request tag and starts out as a zero relative
 * correction */
1686 s->current_write_index_correction = cidx;
1688 s->write_index_corrections[cidx].valid = TRUE;
1689 s->write_index_corrections[cidx].absolute = FALSE;
1690 s->write_index_corrections[cidx].corrupt = FALSE;
1691 s->write_index_corrections[cidx].tag = tag;
1692 s->write_index_corrections[cidx].value = 0;
/* Reply handler for the delete-stream request sent by pa_stream_disconnect().
 *
 * userdata is the stream itself. An error reply is passed to
 * pa_context_handle_error() and the stream is moved to PA_STREAM_FAILED; a
 * reply with trailing data fails the context with PA_ERR_PROTOCOL; a clean
 * reply moves the stream to PA_STREAM_TERMINATED.
 * NOTE(review): the statements between these steps (jumps, reference
 * handling) are not visible in this listing. */
1698 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1699 pa_stream *s = userdata;
1703 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1707 if (command != PA_COMMAND_REPLY) {
1708 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1711 pa_stream_set_state(s, PA_STREAM_FAILED);
1713 } else if (!pa_tagstruct_eof(t)) {
1714 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1718 pa_stream_set_state(s, PA_STREAM_TERMINATED);
/* Ask the server to delete this stream.
 *
 * Requires a non-forked process, a valid stream channel and a READY
 * context (PA_ERR_BADSTATE otherwise). The delete command is chosen by
 * stream direction (playback, record, or upload for anything else) and
 * carries the stream channel; the reply is handled by
 * pa_stream_disconnect_callback() with the stream as userdata and no free
 * callback.
 * NOTE(review): declarations, part of the pa_tagstruct_command() argument
 * list and the return statement are not visible in this listing. */
1724 int pa_stream_disconnect(pa_stream *s) {
1729 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1731 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1732 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1733 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1737 t = pa_tagstruct_command(
1739 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1740 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1742 pa_tagstruct_putu32(t, s->channel);
1743 pa_pstream_send_tagstruct(s->context->pstream, t);
1744 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
/* Install the read callback: stores cb in s->read_callback and userdata in
 * s->read_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1750 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1752 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1754 if (pa_detect_fork())
1757 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1760 s->read_callback = cb;
1761 s->read_userdata = userdata;
/* Install the write callback: stores cb in s->write_callback and userdata
 * in s->write_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1764 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1766 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1768 if (pa_detect_fork())
1771 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1774 s->write_callback = cb;
1775 s->write_userdata = userdata;
/* Install the state change callback: stores cb in s->state_callback and
 * userdata in s->state_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1778 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1780 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1782 if (pa_detect_fork())
1785 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1788 s->state_callback = cb;
1789 s->state_userdata = userdata;
/* Install the overflow callback: stores cb in s->overflow_callback and
 * userdata in s->overflow_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1792 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1794 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1796 if (pa_detect_fork())
1799 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1802 s->overflow_callback = cb;
1803 s->overflow_userdata = userdata;
/* Install the underflow callback: stores cb in s->underflow_callback and
 * userdata in s->underflow_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1806 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1808 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1810 if (pa_detect_fork())
1813 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1816 s->underflow_callback = cb;
1817 s->underflow_userdata = userdata;
/* Install the latency update callback: stores cb in
 * s->latency_update_callback and userdata in s->latency_update_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1820 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1822 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1824 if (pa_detect_fork())
1827 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1830 s->latency_update_callback = cb;
1831 s->latency_update_userdata = userdata;
/* Install the moved callback: stores cb in s->moved_callback and userdata
 * in s->moved_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1834 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1836 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838 if (pa_detect_fork())
1841 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1844 s->moved_callback = cb;
1845 s->moved_userdata = userdata;
/* Install the suspended callback: stores cb in s->suspended_callback and
 * userdata in s->suspended_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1848 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1850 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1852 if (pa_detect_fork())
1855 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1858 s->suspended_callback = cb;
1859 s->suspended_userdata = userdata;
/* Install the started callback: stores cb in s->started_callback and
 * userdata in s->started_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1862 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1864 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1866 if (pa_detect_fork())
1869 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1872 s->started_callback = cb;
1873 s->started_userdata = userdata;
/* Install the event callback: stores cb in s->event_callback and userdata
 * in s->event_userdata. Unlike the notify-style setters in this file, cb is
 * a pa_stream_event_cb_t.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1876 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1878 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880 if (pa_detect_fork())
1883 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1886 s->event_callback = cb;
1887 s->event_userdata = userdata;
/* Install the buffer attribute change callback: stores cb in
 * s->buffer_attr_callback and userdata in s->buffer_attr_userdata.
 * NOTE(review): the statements guarded by the fork check and by the
 * TERMINATED/FAILED state check are not visible in this listing
 * (presumably early returns -- confirm). */
1890 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1892 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1894 if (pa_detect_fork())
1897 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1900 s->buffer_attr_callback = cb;
1901 s->buffer_attr_userdata = userdata;
/* Generic reply handler for requests that are answered with an empty
 * acknowledgement.
 *
 * userdata is the pa_operation for the request. An error reply is passed to
 * pa_context_handle_error(); a reply with trailing data fails the context
 * with PA_ERR_PROTOCOL. The operation's callback is then invoked as a
 * pa_stream_success_cb_t with the stream, the success flag and the
 * operation's userdata, after which the operation is marked done and the
 * reference held for the reply is dropped.
 * NOTE(review): the declaration and updates of `success`, and the guard
 * around the callback invocation, are not visible in this listing. */
1904 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1905 pa_operation *o = userdata;
1910 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1915 if (command != PA_COMMAND_REPLY) {
1916 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1920 } else if (!pa_tagstruct_eof(t)) {
1921 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1926 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1927 cb(o->stream, success, o->userdata);
1931 pa_operation_done(o);
1932 pa_operation_unref(o);
/* Pause (b != 0) or resume (b == 0) a playback or record stream.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. Sends the
 * direction-specific CORK command with the stream channel and b normalized
 * to a boolean, registers pa_stream_simple_ack_callback() for the reply,
 * updates the smoother status and requests a timing update.
 * NOTE(review): declarations, any local state update between the checks
 * and the request, part of the pa_tagstruct_command() argument list and
 * the return statement are not visible in this listing. */
1935 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1941 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1943 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1944 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1945 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1949 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1951 t = pa_tagstruct_command(
1953 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1955 pa_tagstruct_putu32(t, s->channel);
1956 pa_tagstruct_put_boolean(t, !!b);
1957 pa_pstream_send_tagstruct(s->context->pstream, t);
1958 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1960 check_smoother_status(s, FALSE, FALSE, FALSE);
1962 /* This might cause the indexes to hang/start again, hence
1963 * let's request a timing update */
1964 request_auto_timing_update(s, TRUE);
/* Shared helper: send a command whose only payload is the stream channel
 * and whose reply is a plain acknowledgement.
 *
 * Requires a non-forked process and a READY stream. Creates an operation
 * carrying cb/userdata, sends `command` with s->channel, and registers
 * pa_stream_simple_ack_callback() for the reply.
 * NOTE(review): declarations and the return statement are not visible in
 * this listing. */
1969 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1975 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1977 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1978 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1980 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1982 t = pa_tagstruct_command(s->context, command, &tag);
1983 pa_tagstruct_putu32(t, s->channel);
1984 pa_pstream_send_tagstruct(s->context->pstream, t);
1985 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Flush the stream's buffer on the server.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. Sends the
 * direction-specific FLUSH command through stream_send_simple_command().
 * For playback streams the pending write index correction (if any) is
 * marked corrupt, the smoother status is re-checked when prebuffering is
 * enabled, and the write index is invalidated; for record streams the read
 * index is invalidated instead.
 * NOTE(review): the failure branch after the send, the else head and the
 * return statement are not visible in this listing. */
1990 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1994 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1996 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1997 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1998 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2000 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2003 if (s->direction == PA_STREAM_PLAYBACK) {
/* A timing query that is still in flight can no longer be corrected
 * reliably once the buffer has been flushed */
2005 if (s->write_index_corrections[s->current_write_index_correction].valid)
2006 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2008 if (s->buffer_attr.prebuf > 0)
2009 check_smoother_status(s, FALSE, FALSE, TRUE);
2011 /* This will change the write index, but leave the
2012 * read index untouched. */
2013 invalidate_indexes(s, FALSE, TRUE);
2016 /* For record streams this has no influence on the write
2017 * index, but the read index might jump. */
2018 invalidate_indexes(s, TRUE, FALSE);
/* Re-enable prebuffering on a playback stream.
 *
 * Valid only for READY PLAYBACK streams whose buffer_attr.prebuf is
 * greater than zero, in a non-forked process (PA_ERR_BADSTATE otherwise).
 * Sends PA_COMMAND_PREBUF_PLAYBACK_STREAM through
 * stream_send_simple_command() and requests a timing update.
 * NOTE(review): the failure branch after the send and the return statement
 * are not visible in this listing. */
2023 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2029 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2030 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2031 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2032 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2034 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2037 /* This might cause the read index to hang again, hence
2038 * let's request a timing update */
2039 request_auto_timing_update(s, TRUE);
/* Ask the server to start playback immediately instead of waiting for the
 * prebuffer to fill.
 *
 * Valid only for READY PLAYBACK streams whose buffer_attr.prebuf is
 * greater than zero, in a non-forked process (PA_ERR_BADSTATE otherwise).
 * Sends PA_COMMAND_TRIGGER_PLAYBACK_STREAM through
 * stream_send_simple_command() and requests a timing update.
 * NOTE(review): the failure branch after the send and the return statement
 * are not visible in this listing. */
2044 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2048 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2050 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2051 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2052 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2053 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2055 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2058 /* This might cause the read index to start moving again, hence
2059 * let's request a timing update */
2060 request_auto_timing_update(s, TRUE);
/* Rename the stream on the server.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. With
 * protocol version >= 13 the name is sent as a property list update that
 * sets PA_PROP_MEDIA_NAME (mode PA_UPDATE_REPLACE) via
 * pa_stream_proplist_update(); the temporary property list is freed right
 * after the call. Otherwise the direction-specific SET_*_STREAM_NAME
 * command is sent with the channel and the name, and
 * pa_stream_simple_ack_callback() handles the reply.
 * NOTE(review): declarations, the else head, part of the
 * pa_tagstruct_command() argument list and the return statement are not
 * visible in this listing. */
2065 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2069 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2072 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2073 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2074 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2076 if (s->context->version >= 13) {
2077 pa_proplist *p = pa_proplist_new();
2079 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2080 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2081 pa_proplist_free(p);
/* Legacy path: dedicated set-name command */
2086 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2087 t = pa_tagstruct_command(
2089 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2091 pa_tagstruct_putu32(t, s->channel);
2092 pa_tagstruct_puts(t, name);
2093 pa_pstream_send_tagstruct(s->context->pstream, t);
2094 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
/* Compute the current stream time in microseconds.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. Fails with
 * PA_ERR_NODATA while no timing info is available or while the index
 * relevant for the direction (read index for playback, write index for
 * record) is corrupt. The time comes either from the smoother or from
 * calc_time(); unless PA_STREAM_NOT_MONOTONIC is set, the result is clamped
 * so that it never falls below s->previous_time, which is then updated.
 * NOTE(review): the condition selecting between smoother and calc_time(),
 * the store into *r_usec and the return statement are not visible in this
 * listing. */
2100 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2104 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2106 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2107 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2108 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2109 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2110 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2111 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2114 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2116 usec = calc_time(s, FALSE);
2118 /* Make sure the time runs monotonically */
2119 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2120 if (usec < s->previous_time)
2121 usec = s->previous_time;
2123 s->previous_time = usec;
/* Helper used by pa_stream_get_latency() to turn two time values (a, b)
 * into a latency, with an optional sign flag reported through *negative.
 * NOTE(review): almost the entire body is elided in this listing. The only
 * visible logic is a branch taken when `negative` is non-NULL and the
 * stream is a RECORD stream -- presumably the one case where a negative
 * difference is reported; confirm against the full file. */
2132 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2142 if (negative && s->direction == PA_STREAM_RECORD) {
/* Compute the stream latency in microseconds.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process. Fails with
 * PA_ERR_NODATA while no timing info is available or while the index
 * needed here (write index for playback, read index for record) is
 * corrupt. The current stream time t is obtained from
 * pa_stream_get_time(); the relevant index is converted to a time c; the
 * latency is time_counter_diff(c, t) for playback and
 * time_counter_diff(t, c) for record, with the sign reported through
 * *negative.
 * NOTE(review): declarations, the failure branch after
 * pa_stream_get_time(), any clamping of cindex before the cast to
 * uint64_t, and the return statement are not visible in this listing. */
2150 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2156 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2159 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2160 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2161 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2162 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2163 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2164 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2166 if ((r = pa_stream_get_time(s, &t)) < 0)
/* Playback latency is measured against what was written, record latency
 * against what was read */
2169 if (s->direction == PA_STREAM_PLAYBACK)
2170 cindex = s->timing_info.write_index;
2172 cindex = s->timing_info.read_index;
2177 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2179 if (s->direction == PA_STREAM_PLAYBACK)
2180 *r_usec = time_counter_diff(s, c, t, negative);
2182 *r_usec = time_counter_diff(s, t, c, negative);
/* Return a pointer to the stream's cached timing info (s->timing_info).
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process; fails with
 * PA_ERR_NODATA while no timing info has been received. The returned
 * pointer refers to storage inside the stream object. */
2187 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2189 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2191 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2192 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2193 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2194 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2196 return &s->timing_info;
/* Return a pointer to the stream's sample specification (s->sample_spec).
 * The only validity check is that the process has not forked; the stream
 * state is not checked. The returned pointer refers to storage inside the
 * stream object. */
2199 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2201 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205 return &s->sample_spec;
/* Return a pointer to the stream's channel map (s->channel_map).
 * The only validity check is that the process has not forked; the stream
 * state is not checked. The returned pointer refers to storage inside the
 * stream object. */
2208 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2210 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2214 return &s->channel_map;
2217 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2221 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2222 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2223 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2225 return &s->buffer_attr;
/* Reply handler for the request sent by pa_stream_set_buffer_attr().
 *
 * userdata is the pa_operation for the request. On a regular reply the
 * buffer attributes actually chosen by the server are read back into
 * o->stream->buffer_attr: maxlength, tlength, prebuf and minreq for
 * playback streams; maxlength and fragsize for record streams. With
 * protocol version >= 13 an additional usec value follows and is stored as
 * the configured source (record) or sink (otherwise) latency. Any parse
 * error or trailing data fails the context with PA_ERR_PROTOCOL. Finally
 * the operation's callback is invoked and the operation is completed.
 * NOTE(review): the declaration and updates of `success`, the jump
 * statements and the guard around the callback invocation are not visible
 * in this listing. */
2228 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2229 pa_operation *o = userdata;
2234 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2239 if (command != PA_COMMAND_REPLY) {
2240 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2245 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2246 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2247 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2248 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2249 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2250 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2253 } else if (o->stream->direction == PA_STREAM_RECORD) {
2254 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2255 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2256 pa_context_fail(o->context, PA_ERR_PROTOCOL);
/* Newer servers also report the latency they configured for the device */
2261 if (o->stream->context->version >= 13) {
2264 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2265 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2269 if (o->stream->direction == PA_STREAM_RECORD)
2270 o->stream->timing_info.configured_source_usec = usec;
2272 o->stream->timing_info.configured_sink_usec = usec;
2275 if (!pa_tagstruct_eof(t)) {
2276 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2282 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2283 cb(o->stream, success, o->userdata);
2287 pa_operation_done(o);
2288 pa_operation_unref(o);
/* Request new buffer metrics for the stream.
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process on a
 * connection speaking protocol version >= 12 (PA_ERR_NOTSUPPORTED
 * otherwise). The request carries the channel and attr->maxlength,
 * followed by tlength/prebuf/minreq for playback streams or fragsize
 * otherwise. Protocol >= 13 additionally carries the
 * PA_STREAM_ADJUST_LATENCY flag and protocol >= 14 the
 * PA_STREAM_EARLY_REQUESTS flag, both taken from s->flags. The reply is
 * handled by stream_set_buffer_attr_callback(), and a timing update is
 * requested afterwards.
 * NOTE(review): declarations, parts of the pa_tagstruct_command() and
 * pa_tagstruct_put() argument lists, the else head and the return
 * statement are not visible in this listing. */
2292 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2298 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2301 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2302 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2304 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2306 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2308 t = pa_tagstruct_command(
2310 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2312 pa_tagstruct_putu32(t, s->channel);
2314 pa_tagstruct_putu32(t, attr->maxlength);
2316 if (s->direction == PA_STREAM_PLAYBACK)
2319 PA_TAG_U32, attr->tlength,
2320 PA_TAG_U32, attr->prebuf,
2321 PA_TAG_U32, attr->minreq,
2324 pa_tagstruct_putu32(t, attr->fragsize);
2326 if (s->context->version >= 13)
2327 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2329 if (s->context->version >= 14)
2330 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2332 pa_pstream_send_tagstruct(s->context->pstream, t);
2333 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2335 /* This might cause changes in the read/write indexes, hence let's
2336 * request a timing update */
2337 request_auto_timing_update(s, TRUE);
/* Return the index of the device the stream is attached to
 * (s->device_index).
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process on a
 * connection speaking protocol version >= 12, and only once a device index
 * other than PA_INVALID_INDEX is known. PA_INVALID_INDEX is the value
 * handed to the validity macro for every failure case. */
2342 uint32_t pa_stream_get_device_index(pa_stream *s) {
2344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2346 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2347 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2348 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2352 return s->device_index;
/* Return the name of the device the stream is attached to
 * (s->device_name).
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process on a
 * connection speaking protocol version >= 12, and only once a device name
 * is known (PA_ERR_BADSTATE otherwise). The returned string is the
 * stream's own s->device_name pointer, not a copy. */
2355 const char *pa_stream_get_device_name(pa_stream *s) {
2357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2361 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2363 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2365 return s->device_name;
/* Report whether the stream is currently suspended (s->suspended).
 *
 * Valid for READY, non-UPLOAD streams in a non-forked process on a
 * connection speaking protocol version >= 12; otherwise the validity macro
 * sets PA_ERR_FORKED, PA_ERR_BADSTATE or PA_ERR_NOTSUPPORTED. */
2368 int pa_stream_is_suspended(pa_stream *s) {
2370 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2372 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2373 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2374 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2375 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2377 return s->suspended;
2380 int pa_stream_is_corked(pa_stream *s) {
2382 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2384 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2385 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2386 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2391 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2392 pa_operation *o = userdata;
2397 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2402 if (command != PA_COMMAND_REPLY) {
2403 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2409 if (!pa_tagstruct_eof(t)) {
2410 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2415 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2416 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2419 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2420 cb(o->stream, success, o->userdata);
2424 pa_operation_done(o);
2425 pa_operation_unref(o);
2429 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2435 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2437 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2438 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2439 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2440 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2441 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2442 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2444 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2445 o->private = PA_UINT_TO_PTR(rate);
2447 t = pa_tagstruct_command(
2449 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2451 pa_tagstruct_putu32(t, s->channel);
2452 pa_tagstruct_putu32(t, rate);
2454 pa_pstream_send_tagstruct(s->context->pstream, t);
2455 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2460 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2470 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2471 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2472 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2474 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2476 t = pa_tagstruct_command(
2478 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2480 pa_tagstruct_putu32(t, s->channel);
2481 pa_tagstruct_putu32(t, (uint32_t) mode);
2482 pa_tagstruct_put_proplist(t, p);
2484 pa_pstream_send_tagstruct(s->context->pstream, t);
2485 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2487 /* Please note that we don't update s->proplist here, because we
2488 * don't export that field */
2493 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2497 const char * const*k;
2500 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2502 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2503 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2504 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2505 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2506 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2508 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2510 t = pa_tagstruct_command(
2512 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2514 pa_tagstruct_putu32(t, s->channel);
2516 for (k = keys; *k; k++)
2517 pa_tagstruct_puts(t, *k);
2519 pa_tagstruct_puts(t, NULL);
2521 pa_pstream_send_tagstruct(s->context->pstream, t);
2522 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2524 /* Please note that we don't update s->proplist here, because we
2525 * don't export that field */
2530 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2536 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2537 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2539 s->direct_on_input = sink_input_idx;
2544 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2546 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2548 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2549 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2550 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2552 return s->direct_on_input;