2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
41 #include "fork-detect.h"
44 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
45 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
47 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_MIN_HISTORY (4)
51 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
52 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
55 static void reset_callbacks(pa_stream *s) {
56 s->read_callback = NULL;
57 s->read_userdata = NULL;
58 s->write_callback = NULL;
59 s->write_userdata = NULL;
60 s->state_callback = NULL;
61 s->state_userdata = NULL;
62 s->overflow_callback = NULL;
63 s->overflow_userdata = NULL;
64 s->underflow_callback = NULL;
65 s->underflow_userdata = NULL;
66 s->latency_update_callback = NULL;
67 s->latency_update_userdata = NULL;
68 s->moved_callback = NULL;
69 s->moved_userdata = NULL;
70 s->suspended_callback = NULL;
71 s->suspended_userdata = NULL;
72 s->started_callback = NULL;
73 s->started_userdata = NULL;
74 s->event_callback = NULL;
75 s->event_userdata = NULL;
76 s->buffer_attr_callback = NULL;
77 s->buffer_attr_userdata = NULL;
80 pa_stream *pa_stream_new_with_proplist(
83 const pa_sample_spec *ss,
84 const pa_channel_map *map,
92 pa_assert(PA_REFCNT_VALUE(c) >= 1);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105 s = pa_xnew(pa_stream, 1);
108 s->mainloop = c->mainloop;
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
114 s->sample_spec = *ss;
115 s->channel_map = *map;
117 s->direct_on_input = PA_INVALID_INDEX;
119 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124 s->channel_valid = FALSE;
125 s->syncid = c->csyncid++;
126 s->stream_index = PA_INVALID_INDEX;
128 s->requested_bytes = 0;
129 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131 /* We initialize der target length here, so that if the user
132 * passes no explicit buffering metrics the default is similar to
133 * what older PA versions provided. */
135 s->buffer_attr.maxlength = (uint32_t) -1;
136 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
137 s->buffer_attr.minreq = (uint32_t) -1;
138 s->buffer_attr.prebuf = (uint32_t) -1;
139 s->buffer_attr.fragsize = (uint32_t) -1;
141 s->device_index = PA_INVALID_INDEX;
142 s->device_name = NULL;
143 s->suspended = FALSE;
146 pa_memchunk_reset(&s->peek_memchunk);
149 s->record_memblockq = NULL;
152 memset(&s->timing_info, 0, sizeof(s->timing_info));
153 s->timing_info_valid = FALSE;
155 s->previous_time = 0;
157 s->read_index_not_before = 0;
158 s->write_index_not_before = 0;
159 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
160 s->write_index_corrections[i].valid = 0;
161 s->current_write_index_correction = 0;
163 s->auto_timing_update_event = NULL;
164 s->auto_timing_update_requested = FALSE;
165 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
171 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
172 PA_LLIST_PREPEND(pa_stream, c->streams, s);
178 static void stream_unlink(pa_stream *s) {
185 /* Detach from context */
187 /* Unref all operatio object that point to us */
188 for (o = s->context->operations; o; o = n) {
192 pa_operation_cancel(o);
195 /* Drop all outstanding replies for this stream */
196 if (s->context->pdispatch)
197 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
199 if (s->channel_valid) {
200 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
202 s->channel_valid = FALSE;
205 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
210 if (s->auto_timing_update_event) {
211 pa_assert(s->mainloop);
212 s->mainloop->time_free(s->auto_timing_update_event);
218 static void stream_free(pa_stream *s) {
223 if (s->peek_memchunk.memblock) {
225 pa_memblock_release(s->peek_memchunk.memblock);
226 pa_memblock_unref(s->peek_memchunk.memblock);
229 if (s->record_memblockq)
230 pa_memblockq_free(s->record_memblockq);
233 pa_proplist_free(s->proplist);
236 pa_smoother_free(s->smoother);
238 pa_xfree(s->device_name);
242 void pa_stream_unref(pa_stream *s) {
244 pa_assert(PA_REFCNT_VALUE(s) >= 1);
246 if (PA_REFCNT_DEC(s) <= 0)
250 pa_stream* pa_stream_ref(pa_stream *s) {
252 pa_assert(PA_REFCNT_VALUE(s) >= 1);
258 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
265 pa_context* pa_stream_get_context(pa_stream *s) {
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
272 uint32_t pa_stream_get_index(pa_stream *s) {
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
276 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
277 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
279 return s->stream_index;
282 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
284 pa_assert(PA_REFCNT_VALUE(s) >= 1);
293 if (s->state_callback)
294 s->state_callback(s, s->state_userdata);
296 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
302 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
304 pa_assert(PA_REFCNT_VALUE(s) >= 1);
306 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
309 if (s->state == PA_STREAM_READY &&
310 (force || !s->auto_timing_update_requested)) {
313 /* pa_log("Automatically requesting new timing data"); */
315 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
316 pa_operation_unref(o);
317 s->auto_timing_update_requested = TRUE;
321 if (s->auto_timing_update_event) {
325 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
327 pa_gettimeofday(&next);
328 pa_timeval_add(&next, s->auto_timing_interval_usec);
329 s->mainloop->time_restart(s->auto_timing_update_event, &next);
331 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
336 pa_context *c = userdata;
341 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
344 pa_assert(PA_REFCNT_VALUE(c) >= 1);
348 if (pa_tagstruct_getu32(t, &channel) < 0 ||
349 !pa_tagstruct_eof(t)) {
350 pa_context_fail(c, PA_ERR_PROTOCOL);
354 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
357 if (s->state != PA_STREAM_READY)
360 pa_context_set_error(c, PA_ERR_KILLED);
361 pa_stream_set_state(s, PA_STREAM_FAILED);
367 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371 pa_assert(!force_start || !force_stop);
376 x = pa_rtclock_usec();
378 if (s->timing_info_valid) {
380 x -= s->timing_info.transport_usec;
382 x += s->timing_info.transport_usec;
384 if (s->direction == PA_STREAM_PLAYBACK)
385 /* it takes a while until the pause/resume is actually
387 x += s->timing_info.sink_usec;
389 /* Data froma while back will be dropped */
390 x -= s->timing_info.source_usec;
393 if (s->suspended || s->corked || force_stop)
394 pa_smoother_pause(s->smoother, x);
395 else if (force_start || s->buffer_attr.prebuf == 0)
396 pa_smoother_resume(s->smoother, x);
398 /* Please note that we have no idea if playback actually started
399 * if prebuf is non-zero! */
402 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
403 pa_context *c = userdata;
410 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
413 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
416 pa_assert(PA_REFCNT_VALUE(c) >= 1);
420 if (c->version < 12) {
421 pa_context_fail(c, PA_ERR_PROTOCOL);
425 if (pa_tagstruct_getu32(t, &channel) < 0 ||
426 pa_tagstruct_getu32(t, &di) < 0 ||
427 pa_tagstruct_gets(t, &dn) < 0 ||
428 pa_tagstruct_get_boolean(t, &suspended) < 0) {
429 pa_context_fail(c, PA_ERR_PROTOCOL);
433 if (c->version >= 13) {
435 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
436 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
437 pa_tagstruct_getu32(t, &fragsize) < 0 ||
438 pa_tagstruct_get_usec(t, &usec) < 0) {
439 pa_context_fail(c, PA_ERR_PROTOCOL);
443 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
444 pa_tagstruct_getu32(t, &tlength) < 0 ||
445 pa_tagstruct_getu32(t, &prebuf) < 0 ||
446 pa_tagstruct_getu32(t, &minreq) < 0 ||
447 pa_tagstruct_get_usec(t, &usec) < 0) {
448 pa_context_fail(c, PA_ERR_PROTOCOL);
454 if (!pa_tagstruct_eof(t)) {
455 pa_context_fail(c, PA_ERR_PROTOCOL);
459 if (!dn || di == PA_INVALID_INDEX) {
460 pa_context_fail(c, PA_ERR_PROTOCOL);
464 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
467 if (s->state != PA_STREAM_READY)
470 if (c->version >= 13) {
471 if (s->direction == PA_STREAM_RECORD)
472 s->timing_info.configured_source_usec = usec;
474 s->timing_info.configured_sink_usec = usec;
476 s->buffer_attr.maxlength = maxlength;
477 s->buffer_attr.fragsize = fragsize;
478 s->buffer_attr.tlength = tlength;
479 s->buffer_attr.prebuf = prebuf;
480 s->buffer_attr.minreq = minreq;
483 pa_xfree(s->device_name);
484 s->device_name = pa_xstrdup(dn);
485 s->device_index = di;
487 s->suspended = suspended;
489 check_smoother_status(s, TRUE, FALSE, FALSE);
490 request_auto_timing_update(s, TRUE);
492 if (s->moved_callback)
493 s->moved_callback(s, s->moved_userdata);
499 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
500 pa_context *c = userdata;
504 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
507 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
510 pa_assert(PA_REFCNT_VALUE(c) >= 1);
514 if (c->version < 15) {
515 pa_context_fail(c, PA_ERR_PROTOCOL);
519 if (pa_tagstruct_getu32(t, &channel) < 0) {
520 pa_context_fail(c, PA_ERR_PROTOCOL);
524 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
525 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
526 pa_tagstruct_getu32(t, &fragsize) < 0 ||
527 pa_tagstruct_get_usec(t, &usec) < 0) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
532 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
533 pa_tagstruct_getu32(t, &tlength) < 0 ||
534 pa_tagstruct_getu32(t, &prebuf) < 0 ||
535 pa_tagstruct_getu32(t, &minreq) < 0 ||
536 pa_tagstruct_get_usec(t, &usec) < 0) {
537 pa_context_fail(c, PA_ERR_PROTOCOL);
542 if (!pa_tagstruct_eof(t)) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
547 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
550 if (s->state != PA_STREAM_READY)
553 if (s->direction == PA_STREAM_RECORD)
554 s->timing_info.configured_source_usec = usec;
556 s->timing_info.configured_sink_usec = usec;
558 s->buffer_attr.maxlength = maxlength;
559 s->buffer_attr.fragsize = fragsize;
560 s->buffer_attr.tlength = tlength;
561 s->buffer_attr.prebuf = prebuf;
562 s->buffer_attr.minreq = minreq;
564 request_auto_timing_update(s, TRUE);
566 if (s->buffer_attr_callback)
567 s->buffer_attr_callback(s, s->buffer_attr_userdata);
573 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574 pa_context *c = userdata;
580 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
583 pa_assert(PA_REFCNT_VALUE(c) >= 1);
587 if (c->version < 12) {
588 pa_context_fail(c, PA_ERR_PROTOCOL);
592 if (pa_tagstruct_getu32(t, &channel) < 0 ||
593 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
594 !pa_tagstruct_eof(t)) {
595 pa_context_fail(c, PA_ERR_PROTOCOL);
599 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
602 if (s->state != PA_STREAM_READY)
605 s->suspended = suspended;
607 check_smoother_status(s, TRUE, FALSE, FALSE);
608 request_auto_timing_update(s, TRUE);
610 if (s->suspended_callback)
611 s->suspended_callback(s, s->suspended_userdata);
617 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
618 pa_context *c = userdata;
623 pa_assert(command == PA_COMMAND_STARTED);
626 pa_assert(PA_REFCNT_VALUE(c) >= 1);
630 if (c->version < 13) {
631 pa_context_fail(c, PA_ERR_PROTOCOL);
635 if (pa_tagstruct_getu32(t, &channel) < 0 ||
636 !pa_tagstruct_eof(t)) {
637 pa_context_fail(c, PA_ERR_PROTOCOL);
641 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
644 if (s->state != PA_STREAM_READY)
647 check_smoother_status(s, TRUE, TRUE, FALSE);
648 request_auto_timing_update(s, TRUE);
650 if (s->started_callback)
651 s->started_callback(s, s->started_userdata);
657 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
658 pa_context *c = userdata;
661 pa_proplist *pl = NULL;
665 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
668 pa_assert(PA_REFCNT_VALUE(c) >= 1);
672 if (c->version < 15) {
673 pa_context_fail(c, PA_ERR_PROTOCOL);
677 pl = pa_proplist_new();
679 if (pa_tagstruct_getu32(t, &channel) < 0 ||
680 pa_tagstruct_gets(t, &event) < 0 ||
681 pa_tagstruct_get_proplist(t, pl) < 0 ||
682 !pa_tagstruct_eof(t) || !event) {
683 pa_context_fail(c, PA_ERR_PROTOCOL);
687 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
690 if (s->state != PA_STREAM_READY)
693 if (s->event_callback)
694 s->event_callback(s, event, pl, s->event_userdata);
700 pa_proplist_free(pl);
703 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
705 pa_context *c = userdata;
706 uint32_t bytes, channel;
709 pa_assert(command == PA_COMMAND_REQUEST);
712 pa_assert(PA_REFCNT_VALUE(c) >= 1);
716 if (pa_tagstruct_getu32(t, &channel) < 0 ||
717 pa_tagstruct_getu32(t, &bytes) < 0 ||
718 !pa_tagstruct_eof(t)) {
719 pa_context_fail(c, PA_ERR_PROTOCOL);
723 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
726 if (s->state != PA_STREAM_READY)
729 s->requested_bytes += bytes;
731 if (s->requested_bytes > 0 && s->write_callback)
732 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
738 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
740 pa_context *c = userdata;
744 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
747 pa_assert(PA_REFCNT_VALUE(c) >= 1);
751 if (pa_tagstruct_getu32(t, &channel) < 0 ||
752 !pa_tagstruct_eof(t)) {
753 pa_context_fail(c, PA_ERR_PROTOCOL);
757 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
760 if (s->state != PA_STREAM_READY)
763 if (s->buffer_attr.prebuf > 0)
764 check_smoother_status(s, TRUE, FALSE, TRUE);
766 request_auto_timing_update(s, TRUE);
768 if (command == PA_COMMAND_OVERFLOW) {
769 if (s->overflow_callback)
770 s->overflow_callback(s, s->overflow_userdata);
771 } else if (command == PA_COMMAND_UNDERFLOW) {
772 if (s->underflow_callback)
773 s->underflow_callback(s, s->underflow_userdata);
780 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
782 pa_assert(PA_REFCNT_VALUE(s) >= 1);
784 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
786 if (s->state != PA_STREAM_READY)
790 s->write_index_not_before = s->context->ctag;
792 if (s->timing_info_valid)
793 s->timing_info.write_index_corrupt = TRUE;
795 /* pa_log("write_index invalidated"); */
799 s->read_index_not_before = s->context->ctag;
801 if (s->timing_info_valid)
802 s->timing_info.read_index_corrupt = TRUE;
804 /* pa_log("read_index invalidated"); */
807 request_auto_timing_update(s, TRUE);
810 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
811 pa_stream *s = userdata;
814 pa_assert(PA_REFCNT_VALUE(s) >= 1);
817 request_auto_timing_update(s, FALSE);
821 static void create_stream_complete(pa_stream *s) {
823 pa_assert(PA_REFCNT_VALUE(s) >= 1);
824 pa_assert(s->state == PA_STREAM_CREATING);
826 pa_stream_set_state(s, PA_STREAM_READY);
828 if (s->requested_bytes > 0 && s->write_callback)
829 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
831 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
833 pa_gettimeofday(&tv);
834 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
835 pa_timeval_add(&tv, s->auto_timing_interval_usec);
836 pa_assert(!s->auto_timing_update_event);
837 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
839 request_auto_timing_update(s, TRUE);
842 check_smoother_status(s, TRUE, FALSE, FALSE);
845 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
850 if (s->context->version >= 13)
853 /* Version older than 0.9.10 didn't do server side buffer_attr
854 * selection, hence we have to fake it on the client side. */
856 /* We choose fairly conservative values here, to not confuse
857 * old clients with extremely large playback buffers */
859 if (attr->maxlength == (uint32_t) -1)
860 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
862 if (attr->tlength == (uint32_t) -1)
863 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
865 if (attr->minreq == (uint32_t) -1)
866 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
868 if (attr->prebuf == (uint32_t) -1)
869 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
871 if (attr->fragsize == (uint32_t) -1)
872 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
875 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
876 pa_stream *s = userdata;
877 uint32_t requested_bytes;
881 pa_assert(PA_REFCNT_VALUE(s) >= 1);
882 pa_assert(s->state == PA_STREAM_CREATING);
886 if (command != PA_COMMAND_REPLY) {
887 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
890 pa_stream_set_state(s, PA_STREAM_FAILED);
894 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
895 s->channel == PA_INVALID_INDEX ||
896 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
897 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
898 pa_context_fail(s->context, PA_ERR_PROTOCOL);
902 s->requested_bytes = (int64_t) requested_bytes;
904 if (s->context->version >= 9) {
905 if (s->direction == PA_STREAM_PLAYBACK) {
906 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
907 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
909 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
910 pa_context_fail(s->context, PA_ERR_PROTOCOL);
913 } else if (s->direction == PA_STREAM_RECORD) {
914 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
915 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
916 pa_context_fail(s->context, PA_ERR_PROTOCOL);
922 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
925 const char *dn = NULL;
928 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
929 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
930 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
931 pa_tagstruct_gets(t, &dn) < 0 ||
932 pa_tagstruct_get_boolean(t, &suspended) < 0) {
933 pa_context_fail(s->context, PA_ERR_PROTOCOL);
937 if (!dn || s->device_index == PA_INVALID_INDEX ||
938 ss.channels != cm.channels ||
939 !pa_channel_map_valid(&cm) ||
940 !pa_sample_spec_valid(&ss) ||
941 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
942 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
943 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
944 pa_context_fail(s->context, PA_ERR_PROTOCOL);
948 pa_xfree(s->device_name);
949 s->device_name = pa_xstrdup(dn);
950 s->suspended = suspended;
956 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
959 if (pa_tagstruct_get_usec(t, &usec) < 0) {
960 pa_context_fail(s->context, PA_ERR_PROTOCOL);
964 if (s->direction == PA_STREAM_RECORD)
965 s->timing_info.configured_source_usec = usec;
967 s->timing_info.configured_sink_usec = usec;
970 if (!pa_tagstruct_eof(t)) {
971 pa_context_fail(s->context, PA_ERR_PROTOCOL);
975 if (s->direction == PA_STREAM_RECORD) {
976 pa_assert(!s->record_memblockq);
978 s->record_memblockq = pa_memblockq_new(
980 s->buffer_attr.maxlength,
982 pa_frame_size(&s->sample_spec),
989 s->channel_valid = TRUE;
990 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
992 create_stream_complete(s);
998 static int create_stream(
999 pa_stream_direction_t direction,
1002 const pa_buffer_attr *attr,
1003 pa_stream_flags_t flags,
1004 const pa_cvolume *volume,
1005 pa_stream *sync_stream) {
1009 pa_bool_t volume_set = FALSE;
1012 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1013 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1015 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1016 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1017 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1018 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1019 PA_STREAM_INTERPOLATE_TIMING|
1020 PA_STREAM_NOT_MONOTONIC|
1021 PA_STREAM_AUTO_TIMING_UPDATE|
1022 PA_STREAM_NO_REMAP_CHANNELS|
1023 PA_STREAM_NO_REMIX_CHANNELS|
1024 PA_STREAM_FIX_FORMAT|
1026 PA_STREAM_FIX_CHANNELS|
1027 PA_STREAM_DONT_MOVE|
1028 PA_STREAM_VARIABLE_RATE|
1029 PA_STREAM_PEAK_DETECT|
1030 PA_STREAM_START_MUTED|
1031 PA_STREAM_ADJUST_LATENCY|
1032 PA_STREAM_EARLY_REQUESTS|
1033 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1034 PA_STREAM_START_UNMUTED|
1035 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1037 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1038 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1039 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1040 /* Althought some of the other flags are not supported on older
1041 * version, we don't check for them here, because it doesn't hurt
1042 * when they are passed but actually not supported. This makes
1043 * client development easier */
1045 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1046 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1047 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1048 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1049 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1053 s->direction = direction;
1055 s->corked = !!(flags & PA_STREAM_START_CORKED);
1058 s->syncid = sync_stream->syncid;
1061 s->buffer_attr = *attr;
1062 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1064 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1068 pa_smoother_free(s->smoother);
1070 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
1072 x = pa_rtclock_usec();
1073 pa_smoother_set_time_offset(s->smoother, x);
1074 pa_smoother_pause(s->smoother, x);
1078 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1080 t = pa_tagstruct_command(
1082 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1085 if (s->context->version < 13)
1086 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1090 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1091 PA_TAG_CHANNEL_MAP, &s->channel_map,
1092 PA_TAG_U32, PA_INVALID_INDEX,
1094 PA_TAG_U32, s->buffer_attr.maxlength,
1095 PA_TAG_BOOLEAN, s->corked,
1098 if (s->direction == PA_STREAM_PLAYBACK) {
1103 PA_TAG_U32, s->buffer_attr.tlength,
1104 PA_TAG_U32, s->buffer_attr.prebuf,
1105 PA_TAG_U32, s->buffer_attr.minreq,
1106 PA_TAG_U32, s->syncid,
1109 volume_set = !!volume;
1112 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1114 pa_tagstruct_put_cvolume(t, volume);
1116 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1118 if (s->context->version >= 12) {
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1124 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1125 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1126 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1127 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1131 if (s->context->version >= 13) {
1133 if (s->direction == PA_STREAM_PLAYBACK)
1134 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1136 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1140 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1141 PA_TAG_PROPLIST, s->proplist,
1144 if (s->direction == PA_STREAM_RECORD)
1145 pa_tagstruct_putu32(t, s->direct_on_input);
1148 if (s->context->version >= 14) {
1150 if (s->direction == PA_STREAM_PLAYBACK)
1151 pa_tagstruct_put_boolean(t, volume_set);
1153 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1156 if (s->context->version >= 15) {
1158 if (s->direction == PA_STREAM_PLAYBACK)
1159 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1161 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1162 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1165 pa_pstream_send_tagstruct(s->context->pstream, t);
1166 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1168 pa_stream_set_state(s, PA_STREAM_CREATING);
1174 int pa_stream_connect_playback(
1177 const pa_buffer_attr *attr,
1178 pa_stream_flags_t flags,
1180 pa_stream *sync_stream) {
1183 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1185 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1188 int pa_stream_connect_record(
1191 const pa_buffer_attr *attr,
1192 pa_stream_flags_t flags) {
1195 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1197 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1200 int pa_stream_write(
1204 void (*free_cb)(void *p),
1206 pa_seek_mode_t seek) {
1209 pa_seek_mode_t t_seek;
1215 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1218 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1219 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1220 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1221 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1222 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1232 while (t_length > 0) {
1236 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1237 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1238 chunk.length = t_length;
1242 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1243 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1245 d = pa_memblock_acquire(chunk.memblock);
1246 memcpy(d, t_data, chunk.length);
1247 pa_memblock_release(chunk.memblock);
1250 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1253 t_seek = PA_SEEK_RELATIVE;
1255 t_data = (const uint8_t*) t_data + chunk.length;
1256 t_length -= chunk.length;
1258 pa_memblock_unref(chunk.memblock);
1261 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1262 free_cb((void*) data);
1264 /* This is obviously wrong since we ignore the seeking index . But
1265 * that's OK, the server side applies the same error */
1266 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1268 if (s->direction == PA_STREAM_PLAYBACK) {
1270 /* Update latency request correction */
1271 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1273 if (seek == PA_SEEK_ABSOLUTE) {
1274 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1275 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1276 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1277 } else if (seek == PA_SEEK_RELATIVE) {
1278 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1279 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1281 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1284 /* Update the write index in the already available latency data */
1285 if (s->timing_info_valid) {
1287 if (seek == PA_SEEK_ABSOLUTE) {
1288 s->timing_info.write_index_corrupt = FALSE;
1289 s->timing_info.write_index = offset + (int64_t) length;
1290 } else if (seek == PA_SEEK_RELATIVE) {
1291 if (!s->timing_info.write_index_corrupt)
1292 s->timing_info.write_index += offset + (int64_t) length;
1294 s->timing_info.write_index_corrupt = TRUE;
1297 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1298 request_auto_timing_update(s, TRUE);
1304 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1306 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1310 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1311 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1312 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1314 if (!s->peek_memchunk.memblock) {
1316 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1322 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1325 pa_assert(s->peek_data);
1326 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1327 *length = s->peek_memchunk.length;
1331 int pa_stream_drop(pa_stream *s) {
1333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1335 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1336 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1337 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1338 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1340 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1342 /* Fix the simulated local read index */
1343 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1344 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1346 pa_assert(s->peek_data);
1347 pa_memblock_release(s->peek_memchunk.memblock);
1348 pa_memblock_unref(s->peek_memchunk.memblock);
1349 pa_memchunk_reset(&s->peek_memchunk);
1354 size_t pa_stream_writable_size(pa_stream *s) {
1356 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1358 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1359 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1360 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1362 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1365 size_t pa_stream_readable_size(pa_stream *s) {
1367 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1369 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1370 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1371 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1373 return pa_memblockq_get_length(s->record_memblockq);
1376 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1382 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1384 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1385 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1386 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1388 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1390 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1391 pa_tagstruct_putu32(t, s->channel);
1392 pa_pstream_send_tagstruct(s->context->pstream, t);
1393 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1398 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1402 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1403 pa_assert(s->state == PA_STREAM_READY);
1404 pa_assert(s->direction != PA_STREAM_UPLOAD);
1405 pa_assert(s->timing_info_valid);
1406 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1407 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1409 if (s->direction == PA_STREAM_PLAYBACK) {
1410 /* The last byte that was written into the output device
1411 * had this time value associated */
1412 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1414 if (!s->corked && !s->suspended) {
1416 if (!ignore_transport)
1417 /* Because the latency info took a little time to come
1418 * to us, we assume that the real output time is actually
1420 usec += s->timing_info.transport_usec;
1422 /* However, the output device usually maintains a buffer
1423 too, hence the real sample currently played is a little
1425 if (s->timing_info.sink_usec >= usec)
1428 usec -= s->timing_info.sink_usec;
1432 pa_assert(s->direction == PA_STREAM_RECORD);
1434 /* The last byte written into the server side queue had
1435 * this time value associated */
1436 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1438 if (!s->corked && !s->suspended) {
1440 if (!ignore_transport)
1441 /* Add transport latency */
1442 usec += s->timing_info.transport_usec;
1444 /* Add latency of data in device buffer */
1445 usec += s->timing_info.source_usec;
1447 /* If this is a monitor source, we need to correct the
1448 * time by the playback device buffer */
1449 if (s->timing_info.sink_usec >= usec)
1452 usec -= s->timing_info.sink_usec;
1459 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1460 pa_operation *o = userdata;
1461 struct timeval local, remote, now;
1463 pa_bool_t playing = FALSE;
1464 uint64_t underrun_for = 0, playing_for = 0;
1468 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1470 if (!o->context || !o->stream)
1473 i = &o->stream->timing_info;
1475 o->stream->timing_info_valid = FALSE;
1476 i->write_index_corrupt = TRUE;
1477 i->read_index_corrupt = TRUE;
1479 if (command != PA_COMMAND_REPLY) {
1480 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1485 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1486 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1487 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1488 pa_tagstruct_get_timeval(t, &local) < 0 ||
1489 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1490 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1491 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1493 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1497 if (o->context->version >= 13 &&
1498 o->stream->direction == PA_STREAM_PLAYBACK)
1499 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1500 pa_tagstruct_getu64(t, &playing_for) < 0) {
1502 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1507 if (!pa_tagstruct_eof(t)) {
1508 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1511 o->stream->timing_info_valid = TRUE;
1512 i->write_index_corrupt = FALSE;
1513 i->read_index_corrupt = FALSE;
1515 i->playing = (int) playing;
1516 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1518 pa_gettimeofday(&now);
1520 /* Calculcate timestamps */
1521 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1522 /* local and remote seem to have synchronized clocks */
1524 if (o->stream->direction == PA_STREAM_PLAYBACK)
1525 i->transport_usec = pa_timeval_diff(&remote, &local);
1527 i->transport_usec = pa_timeval_diff(&now, &remote);
1529 i->synchronized_clocks = TRUE;
1530 i->timestamp = remote;
1532 /* clocks are not synchronized, let's estimate latency then */
1533 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1534 i->synchronized_clocks = FALSE;
1535 i->timestamp = local;
1536 pa_timeval_add(&i->timestamp, i->transport_usec);
1539 /* Invalidate read and write indexes if necessary */
1540 if (tag < o->stream->read_index_not_before)
1541 i->read_index_corrupt = TRUE;
1543 if (tag < o->stream->write_index_not_before)
1544 i->write_index_corrupt = TRUE;
1546 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1547 /* Write index correction */
1550 uint32_t ctag = tag;
1552 /* Go through the saved correction values and add up the
1553 * total correction.*/
1554 for (n = 0, j = o->stream->current_write_index_correction+1;
1555 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1556 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1558 /* Step over invalid data or out-of-date data */
1559 if (!o->stream->write_index_corrections[j].valid ||
1560 o->stream->write_index_corrections[j].tag < ctag)
1563 /* Make sure that everything is in order */
1564 ctag = o->stream->write_index_corrections[j].tag+1;
1566 /* Now fix the write index */
1567 if (o->stream->write_index_corrections[j].corrupt) {
1568 /* A corrupting seek was made */
1569 i->write_index_corrupt = TRUE;
1570 } else if (o->stream->write_index_corrections[j].absolute) {
1571 /* An absolute seek was made */
1572 i->write_index = o->stream->write_index_corrections[j].value;
1573 i->write_index_corrupt = FALSE;
1574 } else if (!i->write_index_corrupt) {
1575 /* A relative seek was made */
1576 i->write_index += o->stream->write_index_corrections[j].value;
1580 /* Clear old correction entries */
1581 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1582 if (!o->stream->write_index_corrections[n].valid)
1585 if (o->stream->write_index_corrections[n].tag <= tag)
1586 o->stream->write_index_corrections[n].valid = FALSE;
1590 if (o->stream->direction == PA_STREAM_RECORD) {
1591 /* Read index correction */
1593 if (!i->read_index_corrupt)
1594 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1597 /* Update smoother */
1598 if (o->stream->smoother) {
1601 u = x = pa_rtclock_usec() - i->transport_usec;
1603 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1606 /* If we weren't playing then it will take some time
1607 * until the audio will actually come out through the
1608 * speakers. Since we follow that timing here, we need
1609 * to try to fix this up */
1611 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1613 if (su < i->sink_usec)
1614 x += i->sink_usec - su;
1618 pa_smoother_pause(o->stream->smoother, x);
1620 /* Update the smoother */
1621 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1622 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1623 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1626 pa_smoother_resume(o->stream->smoother, x);
1630 o->stream->auto_timing_update_requested = FALSE;
1632 if (o->stream->latency_update_callback)
1633 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1635 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1636 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1637 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1642 pa_operation_done(o);
1643 pa_operation_unref(o);
1646 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1654 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1656 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1657 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1658 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1660 if (s->direction == PA_STREAM_PLAYBACK) {
1661 /* Find a place to store the write_index correction data for this entry */
1662 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1664 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1667 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1669 t = pa_tagstruct_command(
1671 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1673 pa_tagstruct_putu32(t, s->channel);
1674 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1676 pa_pstream_send_tagstruct(s->context->pstream, t);
1677 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1679 if (s->direction == PA_STREAM_PLAYBACK) {
1680 /* Fill in initial correction data */
1682 s->current_write_index_correction = cidx;
1684 s->write_index_corrections[cidx].valid = TRUE;
1685 s->write_index_corrections[cidx].absolute = FALSE;
1686 s->write_index_corrections[cidx].corrupt = FALSE;
1687 s->write_index_corrections[cidx].tag = tag;
1688 s->write_index_corrections[cidx].value = 0;
1694 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1695 pa_stream *s = userdata;
1699 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1703 if (command != PA_COMMAND_REPLY) {
1704 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1707 pa_stream_set_state(s, PA_STREAM_FAILED);
1709 } else if (!pa_tagstruct_eof(t)) {
1710 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1714 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1720 int pa_stream_disconnect(pa_stream *s) {
1725 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1727 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1728 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1729 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1733 t = pa_tagstruct_command(
1735 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1736 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1738 pa_tagstruct_putu32(t, s->channel);
1739 pa_pstream_send_tagstruct(s->context->pstream, t);
1740 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1746 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1748 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1750 if (pa_detect_fork())
1753 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1756 s->read_callback = cb;
1757 s->read_userdata = userdata;
1760 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1762 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1764 if (pa_detect_fork())
1767 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1770 s->write_callback = cb;
1771 s->write_userdata = userdata;
1774 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1776 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1778 if (pa_detect_fork())
1781 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1784 s->state_callback = cb;
1785 s->state_userdata = userdata;
1788 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1790 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1792 if (pa_detect_fork())
1795 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1798 s->overflow_callback = cb;
1799 s->overflow_userdata = userdata;
1802 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1804 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1806 if (pa_detect_fork())
1809 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1812 s->underflow_callback = cb;
1813 s->underflow_userdata = userdata;
1816 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1818 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1820 if (pa_detect_fork())
1823 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1826 s->latency_update_callback = cb;
1827 s->latency_update_userdata = userdata;
1830 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1832 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1834 if (pa_detect_fork())
1837 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1840 s->moved_callback = cb;
1841 s->moved_userdata = userdata;
1844 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1846 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1848 if (pa_detect_fork())
1851 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1854 s->suspended_callback = cb;
1855 s->suspended_userdata = userdata;
1858 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1860 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1862 if (pa_detect_fork())
1865 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1868 s->started_callback = cb;
1869 s->started_userdata = userdata;
1872 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1876 if (pa_detect_fork())
1879 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1882 s->event_callback = cb;
1883 s->event_userdata = userdata;
1886 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1888 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1890 if (pa_detect_fork())
1893 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1896 s->buffer_attr_callback = cb;
1897 s->buffer_attr_userdata = userdata;
1900 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1901 pa_operation *o = userdata;
1906 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1911 if (command != PA_COMMAND_REPLY) {
1912 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1916 } else if (!pa_tagstruct_eof(t)) {
1917 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1922 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1923 cb(o->stream, success, o->userdata);
1927 pa_operation_done(o);
1928 pa_operation_unref(o);
1931 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1937 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1939 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1940 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1941 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1945 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1947 t = pa_tagstruct_command(
1949 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1951 pa_tagstruct_putu32(t, s->channel);
1952 pa_tagstruct_put_boolean(t, !!b);
1953 pa_pstream_send_tagstruct(s->context->pstream, t);
1954 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1956 check_smoother_status(s, FALSE, FALSE, FALSE);
1958 /* This might cause the indexes to hang/start again, hence
1959 * let's request a timing update */
1960 request_auto_timing_update(s, TRUE);
1965 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1971 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1973 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1974 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1976 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1978 t = pa_tagstruct_command(s->context, command, &tag);
1979 pa_tagstruct_putu32(t, s->channel);
1980 pa_pstream_send_tagstruct(s->context->pstream, t);
1981 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1986 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1990 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1993 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1994 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1996 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1999 if (s->direction == PA_STREAM_PLAYBACK) {
2001 if (s->write_index_corrections[s->current_write_index_correction].valid)
2002 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2004 if (s->buffer_attr.prebuf > 0)
2005 check_smoother_status(s, FALSE, FALSE, TRUE);
2007 /* This will change the write index, but leave the
2008 * read index untouched. */
2009 invalidate_indexes(s, FALSE, TRUE);
2012 /* For record streams this has no influence on the write
2013 * index, but the read index might jump. */
2014 invalidate_indexes(s, TRUE, FALSE);
2019 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2023 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2025 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2026 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2027 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2028 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2030 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2033 /* This might cause the read index to hang again, hence
2034 * let's request a timing update */
2035 request_auto_timing_update(s, TRUE);
2040 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2047 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2049 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2051 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2054 /* This might cause the read index to start moving again, hence
2055 * let's request a timing update */
2056 request_auto_timing_update(s, TRUE);
2061 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2065 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2068 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2069 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2070 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2072 if (s->context->version >= 13) {
2073 pa_proplist *p = pa_proplist_new();
2075 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2076 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2077 pa_proplist_free(p);
2082 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2083 t = pa_tagstruct_command(
2085 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2087 pa_tagstruct_putu32(t, s->channel);
2088 pa_tagstruct_puts(t, name);
2089 pa_pstream_send_tagstruct(s->context->pstream, t);
2090 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2096 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2100 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2102 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2103 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2104 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2105 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2106 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2107 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2110 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2112 usec = calc_time(s, FALSE);
2114 /* Make sure the time runs monotonically */
2115 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2116 if (usec < s->previous_time)
2117 usec = s->previous_time;
2119 s->previous_time = usec;
2128 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2130 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2138 if (negative && s->direction == PA_STREAM_RECORD) {
2146 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2152 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2155 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2156 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2157 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2158 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2159 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2160 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2162 if ((r = pa_stream_get_time(s, &t)) < 0)
2165 if (s->direction == PA_STREAM_PLAYBACK)
2166 cindex = s->timing_info.write_index;
2168 cindex = s->timing_info.read_index;
2173 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2175 if (s->direction == PA_STREAM_PLAYBACK)
2176 *r_usec = time_counter_diff(s, c, t, negative);
2178 *r_usec = time_counter_diff(s, t, c, negative);
2183 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2185 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2187 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2188 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2189 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2190 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2192 return &s->timing_info;
2195 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2197 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2201 return &s->sample_spec;
2204 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2206 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2208 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2210 return &s->channel_map;
2213 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2215 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2217 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2218 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2219 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2221 return &s->buffer_attr;
2224 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2225 pa_operation *o = userdata;
2230 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2235 if (command != PA_COMMAND_REPLY) {
2236 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2241 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2242 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2243 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2244 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2245 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2246 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2249 } else if (o->stream->direction == PA_STREAM_RECORD) {
2250 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2251 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2252 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2257 if (o->stream->context->version >= 13) {
2260 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2261 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2265 if (o->stream->direction == PA_STREAM_RECORD)
2266 o->stream->timing_info.configured_source_usec = usec;
2268 o->stream->timing_info.configured_sink_usec = usec;
2271 if (!pa_tagstruct_eof(t)) {
2272 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2278 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2279 cb(o->stream, success, o->userdata);
2283 pa_operation_done(o);
2284 pa_operation_unref(o);
2288 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2294 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2297 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2298 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2300 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2302 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2304 t = pa_tagstruct_command(
2306 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2308 pa_tagstruct_putu32(t, s->channel);
2310 pa_tagstruct_putu32(t, attr->maxlength);
2312 if (s->direction == PA_STREAM_PLAYBACK)
2315 PA_TAG_U32, attr->tlength,
2316 PA_TAG_U32, attr->prebuf,
2317 PA_TAG_U32, attr->minreq,
2320 pa_tagstruct_putu32(t, attr->fragsize);
2322 if (s->context->version >= 13)
2323 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2325 if (s->context->version >= 14)
2326 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2328 pa_pstream_send_tagstruct(s->context->pstream, t);
2329 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2331 /* This might cause changes in the read/write indexex, hence let's
2332 * request a timing update */
2333 request_auto_timing_update(s, TRUE);
2338 uint32_t pa_stream_get_device_index(pa_stream *s) {
2340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2343 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2344 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2345 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2346 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2348 return s->device_index;
2351 const char *pa_stream_get_device_name(pa_stream *s) {
2353 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2357 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2358 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2361 return s->device_name;
2364 int pa_stream_is_suspended(pa_stream *s) {
2366 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2368 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2369 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2370 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2371 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2373 return s->suspended;
2376 int pa_stream_is_corked(pa_stream *s) {
2378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2381 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2387 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2388 pa_operation *o = userdata;
2393 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2398 if (command != PA_COMMAND_REPLY) {
2399 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2405 if (!pa_tagstruct_eof(t)) {
2406 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2411 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2412 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2415 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2416 cb(o->stream, success, o->userdata);
2420 pa_operation_done(o);
2421 pa_operation_unref(o);
2425 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2431 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2433 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2434 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2435 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2436 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2437 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2438 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2440 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2441 o->private = PA_UINT_TO_PTR(rate);
2443 t = pa_tagstruct_command(
2445 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2447 pa_tagstruct_putu32(t, s->channel);
2448 pa_tagstruct_putu32(t, rate);
2450 pa_pstream_send_tagstruct(s->context->pstream, t);
2451 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2456 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2462 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2465 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2466 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2467 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2470 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2472 t = pa_tagstruct_command(
2474 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2476 pa_tagstruct_putu32(t, s->channel);
2477 pa_tagstruct_putu32(t, (uint32_t) mode);
2478 pa_tagstruct_put_proplist(t, p);
2480 pa_pstream_send_tagstruct(s->context->pstream, t);
2481 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2483 /* Please note that we don't update s->proplist here, because we
2484 * don't export that field */
2489 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2493 const char * const*k;
2496 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2500 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2504 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2506 t = pa_tagstruct_command(
2508 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2510 pa_tagstruct_putu32(t, s->channel);
2512 for (k = keys; *k; k++)
2513 pa_tagstruct_puts(t, *k);
2515 pa_tagstruct_puts(t, NULL);
2517 pa_pstream_send_tagstruct(s->context->pstream, t);
2518 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2520 /* Please note that we don't update s->proplist here, because we
2521 * don't export that field */
2526 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2528 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2530 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2531 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2532 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2533 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2535 s->direct_on_input = sink_input_idx;
2540 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2542 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2544 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2545 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2546 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2548 return s->direct_on_input;