2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
147 s->write_memblock = NULL;
148 s->write_data = NULL;
150 pa_memchunk_reset(&s->peek_memchunk);
152 s->record_memblockq = NULL;
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
157 s->previous_time = 0;
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
180 static void stream_unlink(pa_stream *s) {
187 /* Detach from context */
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
194 pa_operation_cancel(o);
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201 if (s->channel_valid) {
202 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
204 s->channel_valid = FALSE;
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
220 static void stream_free(pa_stream *s) {
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
230 if (s->peek_memchunk.memblock) {
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
240 pa_proplist_free(s->proplist);
243 pa_smoother_free(s->smoother);
245 pa_xfree(s->device_name);
249 void pa_stream_unref(pa_stream *s) {
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253 if (PA_REFCNT_DEC(s) <= 0)
257 pa_stream* pa_stream_ref(pa_stream *s) {
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
272 pa_context* pa_stream_get_context(pa_stream *s) {
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
279 uint32_t pa_stream_get_index(pa_stream *s) {
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286 return s->stream_index;
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
320 /* pa_log("Automatically requesting new timing data"); */
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
328 if (s->auto_timing_update_event) {
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
357 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
360 if (s->state != PA_STREAM_READY)
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
374 pa_assert(!force_start || !force_stop);
379 x = pa_rtclock_now();
381 if (s->timing_info_valid) {
383 x -= s->timing_info.transport_usec;
385 x += s->timing_info.transport_usec;
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0)
391 pa_smoother_resume(s->smoother, x, TRUE);
394 /* Please note that we have no idea if playback actually started
395 * if prebuf is non-zero! */
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399 pa_context *c = userdata;
406 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
409 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
412 pa_assert(PA_REFCNT_VALUE(c) >= 1);
416 if (c->version < 12) {
417 pa_context_fail(c, PA_ERR_PROTOCOL);
421 if (pa_tagstruct_getu32(t, &channel) < 0 ||
422 pa_tagstruct_getu32(t, &di) < 0 ||
423 pa_tagstruct_gets(t, &dn) < 0 ||
424 pa_tagstruct_get_boolean(t, &suspended) < 0) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
429 if (c->version >= 13) {
431 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434 pa_tagstruct_get_usec(t, &usec) < 0) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
439 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440 pa_tagstruct_getu32(t, &tlength) < 0 ||
441 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442 pa_tagstruct_getu32(t, &minreq) < 0 ||
443 pa_tagstruct_get_usec(t, &usec) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
450 if (!pa_tagstruct_eof(t)) {
451 pa_context_fail(c, PA_ERR_PROTOCOL);
455 if (!dn || di == PA_INVALID_INDEX) {
456 pa_context_fail(c, PA_ERR_PROTOCOL);
460 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
463 if (s->state != PA_STREAM_READY)
466 if (c->version >= 13) {
467 if (s->direction == PA_STREAM_RECORD)
468 s->timing_info.configured_source_usec = usec;
470 s->timing_info.configured_sink_usec = usec;
472 s->buffer_attr.maxlength = maxlength;
473 s->buffer_attr.fragsize = fragsize;
474 s->buffer_attr.tlength = tlength;
475 s->buffer_attr.prebuf = prebuf;
476 s->buffer_attr.minreq = minreq;
479 pa_xfree(s->device_name);
480 s->device_name = pa_xstrdup(dn);
481 s->device_index = di;
483 s->suspended = suspended;
485 check_smoother_status(s, TRUE, FALSE, FALSE);
486 request_auto_timing_update(s, TRUE);
488 if (s->moved_callback)
489 s->moved_callback(s, s->moved_userdata);
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
500 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
503 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
506 pa_assert(PA_REFCNT_VALUE(c) >= 1);
510 if (c->version < 15) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
515 if (pa_tagstruct_getu32(t, &channel) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
520 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522 pa_tagstruct_getu32(t, &fragsize) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
528 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529 pa_tagstruct_getu32(t, &tlength) < 0 ||
530 pa_tagstruct_getu32(t, &prebuf) < 0 ||
531 pa_tagstruct_getu32(t, &minreq) < 0 ||
532 pa_tagstruct_get_usec(t, &usec) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
538 if (!pa_tagstruct_eof(t)) {
539 pa_context_fail(c, PA_ERR_PROTOCOL);
543 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
546 if (s->state != PA_STREAM_READY)
549 if (s->direction == PA_STREAM_RECORD)
550 s->timing_info.configured_source_usec = usec;
552 s->timing_info.configured_sink_usec = usec;
554 s->buffer_attr.maxlength = maxlength;
555 s->buffer_attr.fragsize = fragsize;
556 s->buffer_attr.tlength = tlength;
557 s->buffer_attr.prebuf = prebuf;
558 s->buffer_attr.minreq = minreq;
560 request_auto_timing_update(s, TRUE);
562 if (s->buffer_attr_callback)
563 s->buffer_attr_callback(s, s->buffer_attr_userdata);
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570 pa_context *c = userdata;
576 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
579 pa_assert(PA_REFCNT_VALUE(c) >= 1);
583 if (c->version < 12) {
584 pa_context_fail(c, PA_ERR_PROTOCOL);
588 if (pa_tagstruct_getu32(t, &channel) < 0 ||
589 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590 !pa_tagstruct_eof(t)) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
595 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
598 if (s->state != PA_STREAM_READY)
601 s->suspended = suspended;
603 check_smoother_status(s, TRUE, FALSE, FALSE);
604 request_auto_timing_update(s, TRUE);
606 if (s->suspended_callback)
607 s->suspended_callback(s, s->suspended_userdata);
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614 pa_context *c = userdata;
619 pa_assert(command == PA_COMMAND_STARTED);
622 pa_assert(PA_REFCNT_VALUE(c) >= 1);
626 if (c->version < 13) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
631 if (pa_tagstruct_getu32(t, &channel) < 0 ||
632 !pa_tagstruct_eof(t)) {
633 pa_context_fail(c, PA_ERR_PROTOCOL);
637 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640 if (s->state != PA_STREAM_READY)
643 check_smoother_status(s, TRUE, TRUE, FALSE);
644 request_auto_timing_update(s, TRUE);
646 if (s->started_callback)
647 s->started_callback(s, s->started_userdata);
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654 pa_context *c = userdata;
657 pa_proplist *pl = NULL;
661 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
664 pa_assert(PA_REFCNT_VALUE(c) >= 1);
668 if (c->version < 15) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
673 pl = pa_proplist_new();
675 if (pa_tagstruct_getu32(t, &channel) < 0 ||
676 pa_tagstruct_gets(t, &event) < 0 ||
677 pa_tagstruct_get_proplist(t, pl) < 0 ||
678 !pa_tagstruct_eof(t) || !event) {
679 pa_context_fail(c, PA_ERR_PROTOCOL);
683 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
686 if (s->state != PA_STREAM_READY)
689 if (s->event_callback)
690 s->event_callback(s, event, pl, s->event_userdata);
696 pa_proplist_free(pl);
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
701 pa_context *c = userdata;
702 uint32_t bytes, channel;
705 pa_assert(command == PA_COMMAND_REQUEST);
708 pa_assert(PA_REFCNT_VALUE(c) >= 1);
712 if (pa_tagstruct_getu32(t, &channel) < 0 ||
713 pa_tagstruct_getu32(t, &bytes) < 0 ||
714 !pa_tagstruct_eof(t)) {
715 pa_context_fail(c, PA_ERR_PROTOCOL);
719 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
722 if (s->state != PA_STREAM_READY)
725 s->requested_bytes += bytes;
727 if (s->requested_bytes > 0 && s->write_callback)
728 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
736 pa_context *c = userdata;
740 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
743 pa_assert(PA_REFCNT_VALUE(c) >= 1);
747 if (pa_tagstruct_getu32(t, &channel) < 0 ||
748 !pa_tagstruct_eof(t)) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
753 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
756 if (s->state != PA_STREAM_READY)
759 if (s->buffer_attr.prebuf > 0)
760 check_smoother_status(s, TRUE, FALSE, TRUE);
762 request_auto_timing_update(s, TRUE);
764 if (command == PA_COMMAND_OVERFLOW) {
765 if (s->overflow_callback)
766 s->overflow_callback(s, s->overflow_userdata);
767 } else if (command == PA_COMMAND_UNDERFLOW) {
768 if (s->underflow_callback)
769 s->underflow_callback(s, s->underflow_userdata);
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
778 pa_assert(PA_REFCNT_VALUE(s) >= 1);
780 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
782 if (s->state != PA_STREAM_READY)
786 s->write_index_not_before = s->context->ctag;
788 if (s->timing_info_valid)
789 s->timing_info.write_index_corrupt = TRUE;
791 /* pa_log("write_index invalidated"); */
795 s->read_index_not_before = s->context->ctag;
797 if (s->timing_info_valid)
798 s->timing_info.read_index_corrupt = TRUE;
800 /* pa_log("read_index invalidated"); */
803 request_auto_timing_update(s, TRUE);
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807 pa_stream *s = userdata;
810 pa_assert(PA_REFCNT_VALUE(s) >= 1);
813 request_auto_timing_update(s, FALSE);
817 static void create_stream_complete(pa_stream *s) {
819 pa_assert(PA_REFCNT_VALUE(s) >= 1);
820 pa_assert(s->state == PA_STREAM_CREATING);
822 pa_stream_set_state(s, PA_STREAM_READY);
824 if (s->requested_bytes > 0 && s->write_callback)
825 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
827 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829 pa_assert(!s->auto_timing_update_event);
830 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
832 request_auto_timing_update(s, TRUE);
835 check_smoother_status(s, TRUE, FALSE, FALSE);
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
843 if (s->context->version >= 13)
846 /* Version older than 0.9.10 didn't do server side buffer_attr
847 * selection, hence we have to fake it on the client side. */
849 /* We choose fairly conservative values here, to not confuse
850 * old clients with extremely large playback buffers */
852 if (attr->maxlength == (uint32_t) -1)
853 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
855 if (attr->tlength == (uint32_t) -1)
856 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
858 if (attr->minreq == (uint32_t) -1)
859 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
861 if (attr->prebuf == (uint32_t) -1)
862 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
864 if (attr->fragsize == (uint32_t) -1)
865 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869 pa_stream *s = userdata;
870 uint32_t requested_bytes;
874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
875 pa_assert(s->state == PA_STREAM_CREATING);
879 if (command != PA_COMMAND_REPLY) {
880 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
883 pa_stream_set_state(s, PA_STREAM_FAILED);
887 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888 s->channel == PA_INVALID_INDEX ||
889 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
890 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891 pa_context_fail(s->context, PA_ERR_PROTOCOL);
895 s->requested_bytes = (int64_t) requested_bytes;
897 if (s->context->version >= 9) {
898 if (s->direction == PA_STREAM_PLAYBACK) {
899 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903 pa_context_fail(s->context, PA_ERR_PROTOCOL);
906 } else if (s->direction == PA_STREAM_RECORD) {
907 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909 pa_context_fail(s->context, PA_ERR_PROTOCOL);
915 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
918 const char *dn = NULL;
921 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924 pa_tagstruct_gets(t, &dn) < 0 ||
925 pa_tagstruct_get_boolean(t, &suspended) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
930 if (!dn || s->device_index == PA_INVALID_INDEX ||
931 ss.channels != cm.channels ||
932 !pa_channel_map_valid(&cm) ||
933 !pa_sample_spec_valid(&ss) ||
934 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937 pa_context_fail(s->context, PA_ERR_PROTOCOL);
941 pa_xfree(s->device_name);
942 s->device_name = pa_xstrdup(dn);
943 s->suspended = suspended;
949 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
952 if (pa_tagstruct_get_usec(t, &usec) < 0) {
953 pa_context_fail(s->context, PA_ERR_PROTOCOL);
957 if (s->direction == PA_STREAM_RECORD)
958 s->timing_info.configured_source_usec = usec;
960 s->timing_info.configured_sink_usec = usec;
963 if (!pa_tagstruct_eof(t)) {
964 pa_context_fail(s->context, PA_ERR_PROTOCOL);
968 if (s->direction == PA_STREAM_RECORD) {
969 pa_assert(!s->record_memblockq);
971 s->record_memblockq = pa_memblockq_new(
973 s->buffer_attr.maxlength,
975 pa_frame_size(&s->sample_spec),
982 s->channel_valid = TRUE;
983 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
985 create_stream_complete(s);
991 static int create_stream(
992 pa_stream_direction_t direction,
995 const pa_buffer_attr *attr,
996 pa_stream_flags_t flags,
997 const pa_cvolume *volume,
998 pa_stream *sync_stream) {
1002 pa_bool_t volume_set = FALSE;
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1008 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012 PA_STREAM_INTERPOLATE_TIMING|
1013 PA_STREAM_NOT_MONOTONIC|
1014 PA_STREAM_AUTO_TIMING_UPDATE|
1015 PA_STREAM_NO_REMAP_CHANNELS|
1016 PA_STREAM_NO_REMIX_CHANNELS|
1017 PA_STREAM_FIX_FORMAT|
1019 PA_STREAM_FIX_CHANNELS|
1020 PA_STREAM_DONT_MOVE|
1021 PA_STREAM_VARIABLE_RATE|
1022 PA_STREAM_PEAK_DETECT|
1023 PA_STREAM_START_MUTED|
1024 PA_STREAM_ADJUST_LATENCY|
1025 PA_STREAM_EARLY_REQUESTS|
1026 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027 PA_STREAM_START_UNMUTED|
1028 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
1030 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1031 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1032 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1033 /* Althought some of the other flags are not supported on older
1034 * version, we don't check for them here, because it doesn't hurt
1035 * when they are passed but actually not supported. This makes
1036 * client development easier */
1038 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1039 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1040 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1041 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1042 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1046 s->direction = direction;
1048 s->corked = !!(flags & PA_STREAM_START_CORKED);
1051 s->syncid = sync_stream->syncid;
1054 s->buffer_attr = *attr;
1055 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1057 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1060 x = pa_rtclock_now();
1062 pa_assert(!s->smoother);
1063 s->smoother = pa_smoother_new(
1064 SMOOTHER_ADJUST_TIME,
1065 SMOOTHER_HISTORY_TIME,
1066 !(flags & PA_STREAM_NOT_MONOTONIC),
1068 SMOOTHER_MIN_HISTORY,
1074 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1076 t = pa_tagstruct_command(
1078 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1081 if (s->context->version < 13)
1082 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1086 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1087 PA_TAG_CHANNEL_MAP, &s->channel_map,
1088 PA_TAG_U32, PA_INVALID_INDEX,
1090 PA_TAG_U32, s->buffer_attr.maxlength,
1091 PA_TAG_BOOLEAN, s->corked,
1094 if (s->direction == PA_STREAM_PLAYBACK) {
1099 PA_TAG_U32, s->buffer_attr.tlength,
1100 PA_TAG_U32, s->buffer_attr.prebuf,
1101 PA_TAG_U32, s->buffer_attr.minreq,
1102 PA_TAG_U32, s->syncid,
1105 volume_set = !!volume;
1108 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1110 pa_tagstruct_put_cvolume(t, volume);
1112 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1114 if (s->context->version >= 12) {
1117 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1118 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1119 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1120 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1127 if (s->context->version >= 13) {
1129 if (s->direction == PA_STREAM_PLAYBACK)
1130 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1132 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1136 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1137 PA_TAG_PROPLIST, s->proplist,
1140 if (s->direction == PA_STREAM_RECORD)
1141 pa_tagstruct_putu32(t, s->direct_on_input);
1144 if (s->context->version >= 14) {
1146 if (s->direction == PA_STREAM_PLAYBACK)
1147 pa_tagstruct_put_boolean(t, volume_set);
1149 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1152 if (s->context->version >= 15) {
1154 if (s->direction == PA_STREAM_PLAYBACK)
1155 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1157 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1158 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1161 pa_pstream_send_tagstruct(s->context->pstream, t);
1162 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1164 pa_stream_set_state(s, PA_STREAM_CREATING);
1170 int pa_stream_connect_playback(
1173 const pa_buffer_attr *attr,
1174 pa_stream_flags_t flags,
1176 pa_stream *sync_stream) {
1179 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1181 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1184 int pa_stream_connect_record(
1187 const pa_buffer_attr *attr,
1188 pa_stream_flags_t flags) {
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1193 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1196 int pa_stream_begin_write(
1202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1204 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1205 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1206 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1207 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1208 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1210 if (!s->write_memblock) {
1211 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1212 s->write_data = pa_memblock_acquire(s->write_memblock);
1215 *data = s->write_data;
1216 *nbytes = pa_memblock_get_length(s->write_memblock);
1221 int pa_stream_cancel_write(
1225 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1227 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1228 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1229 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1230 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1232 pa_assert(s->write_data);
1234 pa_memblock_release(s->write_memblock);
1235 pa_memblock_unref(s->write_memblock);
1236 s->write_memblock = NULL;
1237 s->write_data = NULL;
1242 int pa_stream_write(
1246 pa_free_cb_t free_cb,
1248 pa_seek_mode_t seek) {
1251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1254 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1255 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1256 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1257 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1258 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1259 PA_CHECK_VALIDITY(s->context,
1260 !s->write_memblock ||
1261 ((data >= s->write_data) &&
1262 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1264 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1266 if (s->write_memblock) {
1269 /* pa_stream_write_begin() was called before */
1271 pa_memblock_release(s->write_memblock);
1273 chunk.memblock = s->write_memblock;
1274 chunk.index = (const char *) data - (const char *) s->write_data;
1275 chunk.length = length;
1277 s->write_memblock = NULL;
1278 s->write_data = NULL;
1280 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1281 pa_memblock_unref(chunk.memblock);
1284 pa_seek_mode_t t_seek = seek;
1285 int64_t t_offset = offset;
1286 size_t t_length = length;
1287 const void *t_data = data;
1289 /* pa_stream_write_begin() was not called before */
1291 while (t_length > 0) {
1296 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1297 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1298 chunk.length = t_length;
1302 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1303 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1305 d = pa_memblock_acquire(chunk.memblock);
1306 memcpy(d, t_data, chunk.length);
1307 pa_memblock_release(chunk.memblock);
1310 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1313 t_seek = PA_SEEK_RELATIVE;
1315 t_data = (const uint8_t*) t_data + chunk.length;
1316 t_length -= chunk.length;
1318 pa_memblock_unref(chunk.memblock);
1321 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1322 free_cb((void*) data);
1325 /* This is obviously wrong since we ignore the seeking index . But
1326 * that's OK, the server side applies the same error */
1327 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1329 if (s->direction == PA_STREAM_PLAYBACK) {
1331 /* Update latency request correction */
1332 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1334 if (seek == PA_SEEK_ABSOLUTE) {
1335 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1336 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1337 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1338 } else if (seek == PA_SEEK_RELATIVE) {
1339 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1340 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1342 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1345 /* Update the write index in the already available latency data */
1346 if (s->timing_info_valid) {
1348 if (seek == PA_SEEK_ABSOLUTE) {
1349 s->timing_info.write_index_corrupt = FALSE;
1350 s->timing_info.write_index = offset + (int64_t) length;
1351 } else if (seek == PA_SEEK_RELATIVE) {
1352 if (!s->timing_info.write_index_corrupt)
1353 s->timing_info.write_index += offset + (int64_t) length;
1355 s->timing_info.write_index_corrupt = TRUE;
1358 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1359 request_auto_timing_update(s, TRUE);
1365 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1367 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1371 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1372 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1373 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1375 if (!s->peek_memchunk.memblock) {
1377 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1383 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1386 pa_assert(s->peek_data);
1387 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1388 *length = s->peek_memchunk.length;
1392 int pa_stream_drop(pa_stream *s) {
1394 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1396 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1397 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1398 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1399 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1401 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1403 /* Fix the simulated local read index */
1404 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1405 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1407 pa_assert(s->peek_data);
1408 pa_memblock_release(s->peek_memchunk.memblock);
1409 pa_memblock_unref(s->peek_memchunk.memblock);
1410 pa_memchunk_reset(&s->peek_memchunk);
1415 size_t pa_stream_writable_size(pa_stream *s) {
1417 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1419 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1420 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1421 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1423 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1426 size_t pa_stream_readable_size(pa_stream *s) {
1428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1430 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1431 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1432 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1434 return pa_memblockq_get_length(s->record_memblockq);
1437 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1443 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1445 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1446 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1447 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1449 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1451 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1452 pa_tagstruct_putu32(t, s->channel);
1453 pa_pstream_send_tagstruct(s->context->pstream, t);
1454 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1459 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1463 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1464 pa_assert(s->state == PA_STREAM_READY);
1465 pa_assert(s->direction != PA_STREAM_UPLOAD);
1466 pa_assert(s->timing_info_valid);
1467 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1468 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1470 if (s->direction == PA_STREAM_PLAYBACK) {
1471 /* The last byte that was written into the output device
1472 * had this time value associated */
1473 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1475 if (!s->corked && !s->suspended) {
1477 if (!ignore_transport)
1478 /* Because the latency info took a little time to come
1479 * to us, we assume that the real output time is actually
1481 usec += s->timing_info.transport_usec;
1483 /* However, the output device usually maintains a buffer
1484 too, hence the real sample currently played is a little
1486 if (s->timing_info.sink_usec >= usec)
1489 usec -= s->timing_info.sink_usec;
1493 pa_assert(s->direction == PA_STREAM_RECORD);
1495 /* The last byte written into the server side queue had
1496 * this time value associated */
1497 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1499 if (!s->corked && !s->suspended) {
1501 if (!ignore_transport)
1502 /* Add transport latency */
1503 usec += s->timing_info.transport_usec;
1505 /* Add latency of data in device buffer */
1506 usec += s->timing_info.source_usec;
1508 /* If this is a monitor source, we need to correct the
1509 * time by the playback device buffer */
1510 if (s->timing_info.sink_usec >= usec)
1513 usec -= s->timing_info.sink_usec;
1520 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1521 pa_operation *o = userdata;
1522 struct timeval local, remote, now;
1524 pa_bool_t playing = FALSE;
1525 uint64_t underrun_for = 0, playing_for = 0;
1529 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1531 if (!o->context || !o->stream)
1534 i = &o->stream->timing_info;
1536 o->stream->timing_info_valid = FALSE;
1537 i->write_index_corrupt = TRUE;
1538 i->read_index_corrupt = TRUE;
1540 if (command != PA_COMMAND_REPLY) {
1541 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1546 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1547 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1548 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1549 pa_tagstruct_get_timeval(t, &local) < 0 ||
1550 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1551 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1552 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1554 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1558 if (o->context->version >= 13 &&
1559 o->stream->direction == PA_STREAM_PLAYBACK)
1560 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1561 pa_tagstruct_getu64(t, &playing_for) < 0) {
1563 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1568 if (!pa_tagstruct_eof(t)) {
1569 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1572 o->stream->timing_info_valid = TRUE;
1573 i->write_index_corrupt = FALSE;
1574 i->read_index_corrupt = FALSE;
1576 i->playing = (int) playing;
1577 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1579 pa_gettimeofday(&now);
1581 /* Calculcate timestamps */
1582 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1583 /* local and remote seem to have synchronized clocks */
1585 if (o->stream->direction == PA_STREAM_PLAYBACK)
1586 i->transport_usec = pa_timeval_diff(&remote, &local);
1588 i->transport_usec = pa_timeval_diff(&now, &remote);
1590 i->synchronized_clocks = TRUE;
1591 i->timestamp = remote;
1593 /* clocks are not synchronized, let's estimate latency then */
1594 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1595 i->synchronized_clocks = FALSE;
1596 i->timestamp = local;
1597 pa_timeval_add(&i->timestamp, i->transport_usec);
1600 /* Invalidate read and write indexes if necessary */
1601 if (tag < o->stream->read_index_not_before)
1602 i->read_index_corrupt = TRUE;
1604 if (tag < o->stream->write_index_not_before)
1605 i->write_index_corrupt = TRUE;
1607 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1608 /* Write index correction */
1611 uint32_t ctag = tag;
1613 /* Go through the saved correction values and add up the
1614 * total correction.*/
1615 for (n = 0, j = o->stream->current_write_index_correction+1;
1616 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1617 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1619 /* Step over invalid data or out-of-date data */
1620 if (!o->stream->write_index_corrections[j].valid ||
1621 o->stream->write_index_corrections[j].tag < ctag)
1624 /* Make sure that everything is in order */
1625 ctag = o->stream->write_index_corrections[j].tag+1;
1627 /* Now fix the write index */
1628 if (o->stream->write_index_corrections[j].corrupt) {
1629 /* A corrupting seek was made */
1630 i->write_index_corrupt = TRUE;
1631 } else if (o->stream->write_index_corrections[j].absolute) {
1632 /* An absolute seek was made */
1633 i->write_index = o->stream->write_index_corrections[j].value;
1634 i->write_index_corrupt = FALSE;
1635 } else if (!i->write_index_corrupt) {
1636 /* A relative seek was made */
1637 i->write_index += o->stream->write_index_corrections[j].value;
1641 /* Clear old correction entries */
1642 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1643 if (!o->stream->write_index_corrections[n].valid)
1646 if (o->stream->write_index_corrections[n].tag <= tag)
1647 o->stream->write_index_corrections[n].valid = FALSE;
1651 if (o->stream->direction == PA_STREAM_RECORD) {
1652 /* Read index correction */
1654 if (!i->read_index_corrupt)
1655 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1658 /* Update smoother */
1659 if (o->stream->smoother) {
1662 u = x = pa_rtclock_now() - i->transport_usec;
1664 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1667 /* If we weren't playing then it will take some time
1668 * until the audio will actually come out through the
1669 * speakers. Since we follow that timing here, we need
1670 * to try to fix this up */
1672 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1674 if (su < i->sink_usec)
1675 x += i->sink_usec - su;
1679 pa_smoother_pause(o->stream->smoother, x);
1681 /* Update the smoother */
1682 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1683 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1684 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1687 pa_smoother_resume(o->stream->smoother, x, TRUE);
1691 o->stream->auto_timing_update_requested = FALSE;
1693 if (o->stream->latency_update_callback)
1694 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1696 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1697 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1698 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1703 pa_operation_done(o);
1704 pa_operation_unref(o);
1707 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1715 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1717 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1718 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1719 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1721 if (s->direction == PA_STREAM_PLAYBACK) {
1722 /* Find a place to store the write_index correction data for this entry */
1723 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1725 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1726 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1728 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1730 t = pa_tagstruct_command(
1732 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1734 pa_tagstruct_putu32(t, s->channel);
1735 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1737 pa_pstream_send_tagstruct(s->context->pstream, t);
1738 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1740 if (s->direction == PA_STREAM_PLAYBACK) {
1741 /* Fill in initial correction data */
1743 s->current_write_index_correction = cidx;
1745 s->write_index_corrections[cidx].valid = TRUE;
1746 s->write_index_corrections[cidx].absolute = FALSE;
1747 s->write_index_corrections[cidx].corrupt = FALSE;
1748 s->write_index_corrections[cidx].tag = tag;
1749 s->write_index_corrections[cidx].value = 0;
1755 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1756 pa_stream *s = userdata;
1760 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1764 if (command != PA_COMMAND_REPLY) {
1765 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1768 pa_stream_set_state(s, PA_STREAM_FAILED);
1770 } else if (!pa_tagstruct_eof(t)) {
1771 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1775 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1781 int pa_stream_disconnect(pa_stream *s) {
1786 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1788 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1789 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1790 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1794 t = pa_tagstruct_command(
1796 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1797 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1799 pa_tagstruct_putu32(t, s->channel);
1800 pa_pstream_send_tagstruct(s->context->pstream, t);
1801 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1807 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1809 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1811 if (pa_detect_fork())
1814 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1817 s->read_callback = cb;
1818 s->read_userdata = userdata;
1821 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1823 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1825 if (pa_detect_fork())
1828 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1831 s->write_callback = cb;
1832 s->write_userdata = userdata;
1835 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1837 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839 if (pa_detect_fork())
1842 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1845 s->state_callback = cb;
1846 s->state_userdata = userdata;
1849 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1851 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1853 if (pa_detect_fork())
1856 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1859 s->overflow_callback = cb;
1860 s->overflow_userdata = userdata;
1863 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1865 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1867 if (pa_detect_fork())
1870 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1873 s->underflow_callback = cb;
1874 s->underflow_userdata = userdata;
1877 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1879 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1881 if (pa_detect_fork())
1884 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1887 s->latency_update_callback = cb;
1888 s->latency_update_userdata = userdata;
1891 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1893 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1895 if (pa_detect_fork())
1898 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1901 s->moved_callback = cb;
1902 s->moved_userdata = userdata;
1905 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1907 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1909 if (pa_detect_fork())
1912 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1915 s->suspended_callback = cb;
1916 s->suspended_userdata = userdata;
1919 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1921 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1923 if (pa_detect_fork())
1926 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1929 s->started_callback = cb;
1930 s->started_userdata = userdata;
1933 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1935 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1937 if (pa_detect_fork())
1940 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1943 s->event_callback = cb;
1944 s->event_userdata = userdata;
1947 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1949 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1951 if (pa_detect_fork())
1954 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1957 s->buffer_attr_callback = cb;
1958 s->buffer_attr_userdata = userdata;
1961 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1962 pa_operation *o = userdata;
1967 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1972 if (command != PA_COMMAND_REPLY) {
1973 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1977 } else if (!pa_tagstruct_eof(t)) {
1978 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1983 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1984 cb(o->stream, success, o->userdata);
1988 pa_operation_done(o);
1989 pa_operation_unref(o);
1992 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1998 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2001 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2002 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2006 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2008 t = pa_tagstruct_command(
2010 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2012 pa_tagstruct_putu32(t, s->channel);
2013 pa_tagstruct_put_boolean(t, !!b);
2014 pa_pstream_send_tagstruct(s->context->pstream, t);
2015 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2017 check_smoother_status(s, FALSE, FALSE, FALSE);
2019 /* This might cause the indexes to hang/start again, hence
2020 * let's request a timing update */
2021 request_auto_timing_update(s, TRUE);
2026 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2032 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2034 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2035 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2037 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2039 t = pa_tagstruct_command(s->context, command, &tag);
2040 pa_tagstruct_putu32(t, s->channel);
2041 pa_pstream_send_tagstruct(s->context->pstream, t);
2042 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2047 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2051 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2053 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2054 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2055 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2057 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2060 if (s->direction == PA_STREAM_PLAYBACK) {
2062 if (s->write_index_corrections[s->current_write_index_correction].valid)
2063 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2065 if (s->buffer_attr.prebuf > 0)
2066 check_smoother_status(s, FALSE, FALSE, TRUE);
2068 /* This will change the write index, but leave the
2069 * read index untouched. */
2070 invalidate_indexes(s, FALSE, TRUE);
2073 /* For record streams this has no influence on the write
2074 * index, but the read index might jump. */
2075 invalidate_indexes(s, TRUE, FALSE);
2080 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2084 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2087 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2088 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2091 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2094 /* This might cause the read index to hang again, hence
2095 * let's request a timing update */
2096 request_auto_timing_update(s, TRUE);
2101 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2105 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2110 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2112 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2115 /* This might cause the read index to start moving again, hence
2116 * let's request a timing update */
2117 request_auto_timing_update(s, TRUE);
2122 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2129 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2130 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2131 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2133 if (s->context->version >= 13) {
2134 pa_proplist *p = pa_proplist_new();
2136 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2137 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2138 pa_proplist_free(p);
2143 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2144 t = pa_tagstruct_command(
2146 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2148 pa_tagstruct_putu32(t, s->channel);
2149 pa_tagstruct_puts(t, name);
2150 pa_pstream_send_tagstruct(s->context->pstream, t);
2151 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2157 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2161 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2163 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2164 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2165 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2166 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2167 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2168 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2171 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2173 usec = calc_time(s, FALSE);
2175 /* Make sure the time runs monotonically */
2176 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2177 if (usec < s->previous_time)
2178 usec = s->previous_time;
2180 s->previous_time = usec;
2189 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2199 if (negative && s->direction == PA_STREAM_RECORD) {
2207 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2213 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2216 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2217 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2218 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2219 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2220 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2221 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2223 if ((r = pa_stream_get_time(s, &t)) < 0)
2226 if (s->direction == PA_STREAM_PLAYBACK)
2227 cindex = s->timing_info.write_index;
2229 cindex = s->timing_info.read_index;
2234 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2236 if (s->direction == PA_STREAM_PLAYBACK)
2237 *r_usec = time_counter_diff(s, c, t, negative);
2239 *r_usec = time_counter_diff(s, t, c, negative);
2244 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2246 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2248 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2249 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2250 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2251 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2253 return &s->timing_info;
2256 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2258 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2260 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2262 return &s->sample_spec;
2265 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2269 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2271 return &s->channel_map;
2274 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2278 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2279 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2280 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2282 return &s->buffer_attr;
2285 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2286 pa_operation *o = userdata;
2291 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2296 if (command != PA_COMMAND_REPLY) {
2297 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2302 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2303 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2304 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2305 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2306 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2307 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2310 } else if (o->stream->direction == PA_STREAM_RECORD) {
2311 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2312 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2313 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2318 if (o->stream->context->version >= 13) {
2321 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2322 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2326 if (o->stream->direction == PA_STREAM_RECORD)
2327 o->stream->timing_info.configured_source_usec = usec;
2329 o->stream->timing_info.configured_sink_usec = usec;
2332 if (!pa_tagstruct_eof(t)) {
2333 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2339 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2340 cb(o->stream, success, o->userdata);
2344 pa_operation_done(o);
2345 pa_operation_unref(o);
2349 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2355 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2358 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2363 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2365 t = pa_tagstruct_command(
2367 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2369 pa_tagstruct_putu32(t, s->channel);
2371 pa_tagstruct_putu32(t, attr->maxlength);
2373 if (s->direction == PA_STREAM_PLAYBACK)
2376 PA_TAG_U32, attr->tlength,
2377 PA_TAG_U32, attr->prebuf,
2378 PA_TAG_U32, attr->minreq,
2381 pa_tagstruct_putu32(t, attr->fragsize);
2383 if (s->context->version >= 13)
2384 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2386 if (s->context->version >= 14)
2387 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2389 pa_pstream_send_tagstruct(s->context->pstream, t);
2390 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2392 /* This might cause changes in the read/write indexex, hence let's
2393 * request a timing update */
2394 request_auto_timing_update(s, TRUE);
2399 uint32_t pa_stream_get_device_index(pa_stream *s) {
2401 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2403 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2404 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2405 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2406 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2407 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2409 return s->device_index;
2412 const char *pa_stream_get_device_name(pa_stream *s) {
2414 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2416 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2417 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2419 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2420 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2422 return s->device_name;
2425 int pa_stream_is_suspended(pa_stream *s) {
2427 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2429 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2430 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2431 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2432 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2434 return s->suspended;
2437 int pa_stream_is_corked(pa_stream *s) {
2439 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2441 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2442 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2443 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2448 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2449 pa_operation *o = userdata;
2454 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2459 if (command != PA_COMMAND_REPLY) {
2460 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2466 if (!pa_tagstruct_eof(t)) {
2467 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2472 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2473 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2476 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2477 cb(o->stream, success, o->userdata);
2481 pa_operation_done(o);
2482 pa_operation_unref(o);
2486 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2492 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2494 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2496 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2501 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2502 o->private = PA_UINT_TO_PTR(rate);
2504 t = pa_tagstruct_command(
2506 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2508 pa_tagstruct_putu32(t, s->channel);
2509 pa_tagstruct_putu32(t, rate);
2511 pa_pstream_send_tagstruct(s->context->pstream, t);
2512 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2517 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2523 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2526 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2527 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2528 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2529 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2531 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2533 t = pa_tagstruct_command(
2535 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2537 pa_tagstruct_putu32(t, s->channel);
2538 pa_tagstruct_putu32(t, (uint32_t) mode);
2539 pa_tagstruct_put_proplist(t, p);
2541 pa_pstream_send_tagstruct(s->context->pstream, t);
2542 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2544 /* Please note that we don't update s->proplist here, because we
2545 * don't export that field */
2550 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2554 const char * const*k;
2557 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2559 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2560 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2561 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2562 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2563 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2565 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2567 t = pa_tagstruct_command(
2569 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2571 pa_tagstruct_putu32(t, s->channel);
2573 for (k = keys; *k; k++)
2574 pa_tagstruct_puts(t, *k);
2576 pa_tagstruct_puts(t, NULL);
2578 pa_pstream_send_tagstruct(s->context->pstream, t);
2579 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2581 /* Please note that we don't update s->proplist here, because we
2582 * don't export that field */
2587 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2589 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2591 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2592 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2593 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2594 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2596 s->direct_on_input = sink_input_idx;
2601 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2603 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2605 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2606 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2607 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2609 return s->direct_on_input;