2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
42 #include "fork-detect.h"
45 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
46 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
48 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
49 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
50 #define SMOOTHER_MIN_HISTORY (4)
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
81 pa_stream *pa_stream_new_with_proplist(
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106 s = pa_xnew(pa_stream, 1);
109 s->mainloop = c->mainloop;
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
115 s->sample_spec = *ss;
116 s->channel_map = *map;
118 s->direct_on_input = PA_INVALID_INDEX;
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
147 s->write_memblock = NULL;
148 s->write_data = NULL;
150 pa_memchunk_reset(&s->peek_memchunk);
152 s->record_memblockq = NULL;
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
157 s->previous_time = 0;
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
180 static void stream_unlink(pa_stream *s) {
187 /* Detach from context */
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
194 pa_operation_cancel(o);
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201 if (s->channel_valid) {
202 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
204 s->channel_valid = FALSE;
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
220 static void stream_free(pa_stream *s) {
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
230 if (s->peek_memchunk.memblock) {
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
240 pa_proplist_free(s->proplist);
243 pa_smoother_free(s->smoother);
245 pa_xfree(s->device_name);
249 void pa_stream_unref(pa_stream *s) {
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253 if (PA_REFCNT_DEC(s) <= 0)
257 pa_stream* pa_stream_ref(pa_stream *s) {
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
272 pa_context* pa_stream_get_context(pa_stream *s) {
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
279 uint32_t pa_stream_get_index(pa_stream *s) {
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286 return s->stream_index;
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
320 /* pa_log("Automatically requesting new timing data"); */
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
328 if (s->auto_timing_update_event) {
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
357 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
360 if (s->state != PA_STREAM_READY)
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
374 pa_assert(!force_start || !force_stop);
379 x = pa_rtclock_now();
381 if (s->timing_info_valid) {
383 x -= s->timing_info.transport_usec;
385 x += s->timing_info.transport_usec;
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0)
391 pa_smoother_resume(s->smoother, x, TRUE);
394 /* Please note that we have no idea if playback actually started
395 * if prebuf is non-zero! */
398 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
399 pa_context *c = userdata;
406 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
409 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
412 pa_assert(PA_REFCNT_VALUE(c) >= 1);
416 if (c->version < 12) {
417 pa_context_fail(c, PA_ERR_PROTOCOL);
421 if (pa_tagstruct_getu32(t, &channel) < 0 ||
422 pa_tagstruct_getu32(t, &di) < 0 ||
423 pa_tagstruct_gets(t, &dn) < 0 ||
424 pa_tagstruct_get_boolean(t, &suspended) < 0) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
429 if (c->version >= 13) {
431 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
432 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
433 pa_tagstruct_getu32(t, &fragsize) < 0 ||
434 pa_tagstruct_get_usec(t, &usec) < 0) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
439 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
440 pa_tagstruct_getu32(t, &tlength) < 0 ||
441 pa_tagstruct_getu32(t, &prebuf) < 0 ||
442 pa_tagstruct_getu32(t, &minreq) < 0 ||
443 pa_tagstruct_get_usec(t, &usec) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
450 if (!pa_tagstruct_eof(t)) {
451 pa_context_fail(c, PA_ERR_PROTOCOL);
455 if (!dn || di == PA_INVALID_INDEX) {
456 pa_context_fail(c, PA_ERR_PROTOCOL);
460 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
463 if (s->state != PA_STREAM_READY)
466 if (c->version >= 13) {
467 if (s->direction == PA_STREAM_RECORD)
468 s->timing_info.configured_source_usec = usec;
470 s->timing_info.configured_sink_usec = usec;
472 s->buffer_attr.maxlength = maxlength;
473 s->buffer_attr.fragsize = fragsize;
474 s->buffer_attr.tlength = tlength;
475 s->buffer_attr.prebuf = prebuf;
476 s->buffer_attr.minreq = minreq;
479 pa_xfree(s->device_name);
480 s->device_name = pa_xstrdup(dn);
481 s->device_index = di;
483 s->suspended = suspended;
485 check_smoother_status(s, TRUE, FALSE, FALSE);
486 request_auto_timing_update(s, TRUE);
488 if (s->moved_callback)
489 s->moved_callback(s, s->moved_userdata);
495 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
500 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
503 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
506 pa_assert(PA_REFCNT_VALUE(c) >= 1);
510 if (c->version < 15) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
515 if (pa_tagstruct_getu32(t, &channel) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
520 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
521 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
522 pa_tagstruct_getu32(t, &fragsize) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
528 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
529 pa_tagstruct_getu32(t, &tlength) < 0 ||
530 pa_tagstruct_getu32(t, &prebuf) < 0 ||
531 pa_tagstruct_getu32(t, &minreq) < 0 ||
532 pa_tagstruct_get_usec(t, &usec) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
538 if (!pa_tagstruct_eof(t)) {
539 pa_context_fail(c, PA_ERR_PROTOCOL);
543 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
546 if (s->state != PA_STREAM_READY)
549 if (s->direction == PA_STREAM_RECORD)
550 s->timing_info.configured_source_usec = usec;
552 s->timing_info.configured_sink_usec = usec;
554 s->buffer_attr.maxlength = maxlength;
555 s->buffer_attr.fragsize = fragsize;
556 s->buffer_attr.tlength = tlength;
557 s->buffer_attr.prebuf = prebuf;
558 s->buffer_attr.minreq = minreq;
560 request_auto_timing_update(s, TRUE);
562 if (s->buffer_attr_callback)
563 s->buffer_attr_callback(s, s->buffer_attr_userdata);
569 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
570 pa_context *c = userdata;
576 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
579 pa_assert(PA_REFCNT_VALUE(c) >= 1);
583 if (c->version < 12) {
584 pa_context_fail(c, PA_ERR_PROTOCOL);
588 if (pa_tagstruct_getu32(t, &channel) < 0 ||
589 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
590 !pa_tagstruct_eof(t)) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
595 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
598 if (s->state != PA_STREAM_READY)
601 s->suspended = suspended;
603 check_smoother_status(s, TRUE, FALSE, FALSE);
604 request_auto_timing_update(s, TRUE);
606 if (s->suspended_callback)
607 s->suspended_callback(s, s->suspended_userdata);
613 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
614 pa_context *c = userdata;
619 pa_assert(command == PA_COMMAND_STARTED);
622 pa_assert(PA_REFCNT_VALUE(c) >= 1);
626 if (c->version < 13) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
631 if (pa_tagstruct_getu32(t, &channel) < 0 ||
632 !pa_tagstruct_eof(t)) {
633 pa_context_fail(c, PA_ERR_PROTOCOL);
637 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640 if (s->state != PA_STREAM_READY)
643 check_smoother_status(s, TRUE, TRUE, FALSE);
644 request_auto_timing_update(s, TRUE);
646 if (s->started_callback)
647 s->started_callback(s, s->started_userdata);
653 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
654 pa_context *c = userdata;
657 pa_proplist *pl = NULL;
661 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
664 pa_assert(PA_REFCNT_VALUE(c) >= 1);
668 if (c->version < 15) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
673 pl = pa_proplist_new();
675 if (pa_tagstruct_getu32(t, &channel) < 0 ||
676 pa_tagstruct_gets(t, &event) < 0 ||
677 pa_tagstruct_get_proplist(t, pl) < 0 ||
678 !pa_tagstruct_eof(t) || !event) {
679 pa_context_fail(c, PA_ERR_PROTOCOL);
683 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
686 if (s->state != PA_STREAM_READY)
689 if (s->event_callback)
690 s->event_callback(s, event, pl, s->event_userdata);
696 pa_proplist_free(pl);
699 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
701 pa_context *c = userdata;
702 uint32_t bytes, channel;
705 pa_assert(command == PA_COMMAND_REQUEST);
708 pa_assert(PA_REFCNT_VALUE(c) >= 1);
712 if (pa_tagstruct_getu32(t, &channel) < 0 ||
713 pa_tagstruct_getu32(t, &bytes) < 0 ||
714 !pa_tagstruct_eof(t)) {
715 pa_context_fail(c, PA_ERR_PROTOCOL);
719 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
722 if (s->state != PA_STREAM_READY)
725 s->requested_bytes += bytes;
727 if (s->requested_bytes > 0 && s->write_callback)
728 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
734 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
736 pa_context *c = userdata;
740 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
743 pa_assert(PA_REFCNT_VALUE(c) >= 1);
747 if (pa_tagstruct_getu32(t, &channel) < 0 ||
748 !pa_tagstruct_eof(t)) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
753 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
756 if (s->state != PA_STREAM_READY)
759 if (s->buffer_attr.prebuf > 0)
760 check_smoother_status(s, TRUE, FALSE, TRUE);
762 request_auto_timing_update(s, TRUE);
764 if (command == PA_COMMAND_OVERFLOW) {
765 if (s->overflow_callback)
766 s->overflow_callback(s, s->overflow_userdata);
767 } else if (command == PA_COMMAND_UNDERFLOW) {
768 if (s->underflow_callback)
769 s->underflow_callback(s, s->underflow_userdata);
776 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
778 pa_assert(PA_REFCNT_VALUE(s) >= 1);
780 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
782 if (s->state != PA_STREAM_READY)
786 s->write_index_not_before = s->context->ctag;
788 if (s->timing_info_valid)
789 s->timing_info.write_index_corrupt = TRUE;
791 /* pa_log("write_index invalidated"); */
795 s->read_index_not_before = s->context->ctag;
797 if (s->timing_info_valid)
798 s->timing_info.read_index_corrupt = TRUE;
800 /* pa_log("read_index invalidated"); */
803 request_auto_timing_update(s, TRUE);
806 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
807 pa_stream *s = userdata;
810 pa_assert(PA_REFCNT_VALUE(s) >= 1);
813 request_auto_timing_update(s, FALSE);
817 static void create_stream_complete(pa_stream *s) {
819 pa_assert(PA_REFCNT_VALUE(s) >= 1);
820 pa_assert(s->state == PA_STREAM_CREATING);
822 pa_stream_set_state(s, PA_STREAM_READY);
824 if (s->requested_bytes > 0 && s->write_callback)
825 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
827 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
828 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
829 pa_assert(!s->auto_timing_update_event);
830 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
832 request_auto_timing_update(s, TRUE);
835 check_smoother_status(s, TRUE, FALSE, FALSE);
838 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
843 if (s->context->version >= 13)
846 /* Version older than 0.9.10 didn't do server side buffer_attr
847 * selection, hence we have to fake it on the client side. */
849 /* We choose fairly conservative values here, to not confuse
850 * old clients with extremely large playback buffers */
852 if (attr->maxlength == (uint32_t) -1)
853 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
855 if (attr->tlength == (uint32_t) -1)
856 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
858 if (attr->minreq == (uint32_t) -1)
859 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
861 if (attr->prebuf == (uint32_t) -1)
862 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
864 if (attr->fragsize == (uint32_t) -1)
865 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
868 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
869 pa_stream *s = userdata;
870 uint32_t requested_bytes = 0;
874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
875 pa_assert(s->state == PA_STREAM_CREATING);
879 if (command != PA_COMMAND_REPLY) {
880 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
883 pa_stream_set_state(s, PA_STREAM_FAILED);
887 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
888 s->channel == PA_INVALID_INDEX ||
889 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
890 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
891 pa_context_fail(s->context, PA_ERR_PROTOCOL);
895 s->requested_bytes = (int64_t) requested_bytes;
897 if (s->context->version >= 9) {
898 if (s->direction == PA_STREAM_PLAYBACK) {
899 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
900 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
901 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
902 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
903 pa_context_fail(s->context, PA_ERR_PROTOCOL);
906 } else if (s->direction == PA_STREAM_RECORD) {
907 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
908 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
909 pa_context_fail(s->context, PA_ERR_PROTOCOL);
915 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
918 const char *dn = NULL;
921 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
922 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
923 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
924 pa_tagstruct_gets(t, &dn) < 0 ||
925 pa_tagstruct_get_boolean(t, &suspended) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
930 if (!dn || s->device_index == PA_INVALID_INDEX ||
931 ss.channels != cm.channels ||
932 !pa_channel_map_valid(&cm) ||
933 !pa_sample_spec_valid(&ss) ||
934 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
935 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
936 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
937 pa_context_fail(s->context, PA_ERR_PROTOCOL);
941 pa_xfree(s->device_name);
942 s->device_name = pa_xstrdup(dn);
943 s->suspended = suspended;
949 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
952 if (pa_tagstruct_get_usec(t, &usec) < 0) {
953 pa_context_fail(s->context, PA_ERR_PROTOCOL);
957 if (s->direction == PA_STREAM_RECORD)
958 s->timing_info.configured_source_usec = usec;
960 s->timing_info.configured_sink_usec = usec;
963 if (!pa_tagstruct_eof(t)) {
964 pa_context_fail(s->context, PA_ERR_PROTOCOL);
968 if (s->direction == PA_STREAM_RECORD) {
969 pa_assert(!s->record_memblockq);
971 s->record_memblockq = pa_memblockq_new(
973 s->buffer_attr.maxlength,
975 pa_frame_size(&s->sample_spec),
982 s->channel_valid = TRUE;
983 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
985 create_stream_complete(s);
991 static int create_stream(
992 pa_stream_direction_t direction,
995 const pa_buffer_attr *attr,
996 pa_stream_flags_t flags,
997 const pa_cvolume *volume,
998 pa_stream *sync_stream) {
1002 pa_bool_t volume_set = FALSE;
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1008 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1009 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1010 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1011 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1012 PA_STREAM_INTERPOLATE_TIMING|
1013 PA_STREAM_NOT_MONOTONIC|
1014 PA_STREAM_AUTO_TIMING_UPDATE|
1015 PA_STREAM_NO_REMAP_CHANNELS|
1016 PA_STREAM_NO_REMIX_CHANNELS|
1017 PA_STREAM_FIX_FORMAT|
1019 PA_STREAM_FIX_CHANNELS|
1020 PA_STREAM_DONT_MOVE|
1021 PA_STREAM_VARIABLE_RATE|
1022 PA_STREAM_PEAK_DETECT|
1023 PA_STREAM_START_MUTED|
1024 PA_STREAM_ADJUST_LATENCY|
1025 PA_STREAM_EARLY_REQUESTS|
1026 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1027 PA_STREAM_START_UNMUTED|
1028 PA_STREAM_FAIL_ON_SUSPEND|
1029 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1031 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1032 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1033 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1034 /* Althought some of the other flags are not supported on older
1035 * version, we don't check for them here, because it doesn't hurt
1036 * when they are passed but actually not supported. This makes
1037 * client development easier */
1039 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1040 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1041 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1042 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1043 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1047 s->direction = direction;
1049 s->corked = !!(flags & PA_STREAM_START_CORKED);
1052 s->syncid = sync_stream->syncid;
1055 s->buffer_attr = *attr;
1056 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1058 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1061 x = pa_rtclock_now();
1063 pa_assert(!s->smoother);
1064 s->smoother = pa_smoother_new(
1065 SMOOTHER_ADJUST_TIME,
1066 SMOOTHER_HISTORY_TIME,
1067 !(flags & PA_STREAM_NOT_MONOTONIC),
1069 SMOOTHER_MIN_HISTORY,
1075 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1077 t = pa_tagstruct_command(
1079 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1082 if (s->context->version < 13)
1083 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1087 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1088 PA_TAG_CHANNEL_MAP, &s->channel_map,
1089 PA_TAG_U32, PA_INVALID_INDEX,
1091 PA_TAG_U32, s->buffer_attr.maxlength,
1092 PA_TAG_BOOLEAN, s->corked,
1095 if (s->direction == PA_STREAM_PLAYBACK) {
1100 PA_TAG_U32, s->buffer_attr.tlength,
1101 PA_TAG_U32, s->buffer_attr.prebuf,
1102 PA_TAG_U32, s->buffer_attr.minreq,
1103 PA_TAG_U32, s->syncid,
1106 volume_set = !!volume;
1109 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1111 pa_tagstruct_put_cvolume(t, volume);
1113 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1115 if (s->context->version >= 12) {
1118 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1119 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1120 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1121 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1122 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1123 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1124 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1128 if (s->context->version >= 13) {
1130 if (s->direction == PA_STREAM_PLAYBACK)
1131 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1133 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1137 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1138 PA_TAG_PROPLIST, s->proplist,
1141 if (s->direction == PA_STREAM_RECORD)
1142 pa_tagstruct_putu32(t, s->direct_on_input);
1145 if (s->context->version >= 14) {
1147 if (s->direction == PA_STREAM_PLAYBACK)
1148 pa_tagstruct_put_boolean(t, volume_set);
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1153 if (s->context->version >= 15) {
1155 if (s->direction == PA_STREAM_PLAYBACK)
1156 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1158 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1159 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1162 if (s->context->version >= 17) {
1164 if (s->direction == PA_STREAM_PLAYBACK)
1165 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1169 pa_pstream_send_tagstruct(s->context->pstream, t);
1170 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1172 pa_stream_set_state(s, PA_STREAM_CREATING);
1178 int pa_stream_connect_playback(
1181 const pa_buffer_attr *attr,
1182 pa_stream_flags_t flags,
1183 const pa_cvolume *volume,
1184 pa_stream *sync_stream) {
1187 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1189 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1192 int pa_stream_connect_record(
1195 const pa_buffer_attr *attr,
1196 pa_stream_flags_t flags) {
1199 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1201 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1204 int pa_stream_begin_write(
1210 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1212 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1213 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1214 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1215 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1216 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1218 if (*nbytes != (size_t) -1) {
1221 m = pa_mempool_block_size_max(s->context->mempool);
1222 fs = pa_frame_size(&s->sample_spec);
1229 if (!s->write_memblock) {
1230 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1231 s->write_data = pa_memblock_acquire(s->write_memblock);
1234 *data = s->write_data;
1235 *nbytes = pa_memblock_get_length(s->write_memblock);
1240 int pa_stream_cancel_write(
1244 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1246 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1247 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1248 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1249 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1251 pa_assert(s->write_data);
1253 pa_memblock_release(s->write_memblock);
1254 pa_memblock_unref(s->write_memblock);
1255 s->write_memblock = NULL;
1256 s->write_data = NULL;
1261 int pa_stream_write(
1265 pa_free_cb_t free_cb,
1267 pa_seek_mode_t seek) {
1270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1273 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1274 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1275 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1276 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1277 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1278 PA_CHECK_VALIDITY(s->context,
1279 !s->write_memblock ||
1280 ((data >= s->write_data) &&
1281 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1283 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1285 if (s->write_memblock) {
1288 /* pa_stream_write_begin() was called before */
1290 pa_memblock_release(s->write_memblock);
1292 chunk.memblock = s->write_memblock;
1293 chunk.index = (const char *) data - (const char *) s->write_data;
1294 chunk.length = length;
1296 s->write_memblock = NULL;
1297 s->write_data = NULL;
1299 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1300 pa_memblock_unref(chunk.memblock);
1303 pa_seek_mode_t t_seek = seek;
1304 int64_t t_offset = offset;
1305 size_t t_length = length;
1306 const void *t_data = data;
1308 /* pa_stream_write_begin() was not called before */
1310 while (t_length > 0) {
1315 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1316 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1317 chunk.length = t_length;
1321 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1322 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1324 d = pa_memblock_acquire(chunk.memblock);
1325 memcpy(d, t_data, chunk.length);
1326 pa_memblock_release(chunk.memblock);
1329 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1332 t_seek = PA_SEEK_RELATIVE;
1334 t_data = (const uint8_t*) t_data + chunk.length;
1335 t_length -= chunk.length;
1337 pa_memblock_unref(chunk.memblock);
1340 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1341 free_cb((void*) data);
1344 /* This is obviously wrong since we ignore the seeking index . But
1345 * that's OK, the server side applies the same error */
1346 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1348 if (s->direction == PA_STREAM_PLAYBACK) {
1350 /* Update latency request correction */
1351 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1353 if (seek == PA_SEEK_ABSOLUTE) {
1354 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1355 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1356 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1357 } else if (seek == PA_SEEK_RELATIVE) {
1358 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1359 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1361 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1364 /* Update the write index in the already available latency data */
1365 if (s->timing_info_valid) {
1367 if (seek == PA_SEEK_ABSOLUTE) {
1368 s->timing_info.write_index_corrupt = FALSE;
1369 s->timing_info.write_index = offset + (int64_t) length;
1370 } else if (seek == PA_SEEK_RELATIVE) {
1371 if (!s->timing_info.write_index_corrupt)
1372 s->timing_info.write_index += offset + (int64_t) length;
1374 s->timing_info.write_index_corrupt = TRUE;
1377 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1378 request_auto_timing_update(s, TRUE);
1384 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1386 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1390 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1391 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1392 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1394 if (!s->peek_memchunk.memblock) {
1396 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1402 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1405 pa_assert(s->peek_data);
1406 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1407 *length = s->peek_memchunk.length;
1411 int pa_stream_drop(pa_stream *s) {
1413 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1415 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1416 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1417 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1418 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1420 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1422 /* Fix the simulated local read index */
1423 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1424 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1426 pa_assert(s->peek_data);
1427 pa_memblock_release(s->peek_memchunk.memblock);
1428 pa_memblock_unref(s->peek_memchunk.memblock);
1429 pa_memchunk_reset(&s->peek_memchunk);
1434 size_t pa_stream_writable_size(pa_stream *s) {
1436 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1438 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1439 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1440 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1442 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1445 size_t pa_stream_readable_size(pa_stream *s) {
1447 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1449 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1450 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1451 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1453 return pa_memblockq_get_length(s->record_memblockq);
1456 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1462 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1465 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1466 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1468 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1470 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1471 pa_tagstruct_putu32(t, s->channel);
1472 pa_pstream_send_tagstruct(s->context->pstream, t);
1473 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1478 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1482 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1483 pa_assert(s->state == PA_STREAM_READY);
1484 pa_assert(s->direction != PA_STREAM_UPLOAD);
1485 pa_assert(s->timing_info_valid);
1486 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1487 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1489 if (s->direction == PA_STREAM_PLAYBACK) {
1490 /* The last byte that was written into the output device
1491 * had this time value associated */
1492 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1494 if (!s->corked && !s->suspended) {
1496 if (!ignore_transport)
1497 /* Because the latency info took a little time to come
1498 * to us, we assume that the real output time is actually
1500 usec += s->timing_info.transport_usec;
1502 /* However, the output device usually maintains a buffer
1503 too, hence the real sample currently played is a little
1505 if (s->timing_info.sink_usec >= usec)
1508 usec -= s->timing_info.sink_usec;
1512 pa_assert(s->direction == PA_STREAM_RECORD);
1514 /* The last byte written into the server side queue had
1515 * this time value associated */
1516 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1518 if (!s->corked && !s->suspended) {
1520 if (!ignore_transport)
1521 /* Add transport latency */
1522 usec += s->timing_info.transport_usec;
1524 /* Add latency of data in device buffer */
1525 usec += s->timing_info.source_usec;
1527 /* If this is a monitor source, we need to correct the
1528 * time by the playback device buffer */
1529 if (s->timing_info.sink_usec >= usec)
1532 usec -= s->timing_info.sink_usec;
1539 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1540 pa_operation *o = userdata;
1541 struct timeval local, remote, now;
1543 pa_bool_t playing = FALSE;
1544 uint64_t underrun_for = 0, playing_for = 0;
1548 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1550 if (!o->context || !o->stream)
1553 i = &o->stream->timing_info;
1555 o->stream->timing_info_valid = FALSE;
1556 i->write_index_corrupt = TRUE;
1557 i->read_index_corrupt = TRUE;
1559 if (command != PA_COMMAND_REPLY) {
1560 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1565 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1566 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1567 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1568 pa_tagstruct_get_timeval(t, &local) < 0 ||
1569 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1570 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1571 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1573 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1577 if (o->context->version >= 13 &&
1578 o->stream->direction == PA_STREAM_PLAYBACK)
1579 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1580 pa_tagstruct_getu64(t, &playing_for) < 0) {
1582 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1587 if (!pa_tagstruct_eof(t)) {
1588 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1591 o->stream->timing_info_valid = TRUE;
1592 i->write_index_corrupt = FALSE;
1593 i->read_index_corrupt = FALSE;
1595 i->playing = (int) playing;
1596 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1598 pa_gettimeofday(&now);
1600 /* Calculcate timestamps */
1601 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1602 /* local and remote seem to have synchronized clocks */
1604 if (o->stream->direction == PA_STREAM_PLAYBACK)
1605 i->transport_usec = pa_timeval_diff(&remote, &local);
1607 i->transport_usec = pa_timeval_diff(&now, &remote);
1609 i->synchronized_clocks = TRUE;
1610 i->timestamp = remote;
1612 /* clocks are not synchronized, let's estimate latency then */
1613 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1614 i->synchronized_clocks = FALSE;
1615 i->timestamp = local;
1616 pa_timeval_add(&i->timestamp, i->transport_usec);
1619 /* Invalidate read and write indexes if necessary */
1620 if (tag < o->stream->read_index_not_before)
1621 i->read_index_corrupt = TRUE;
1623 if (tag < o->stream->write_index_not_before)
1624 i->write_index_corrupt = TRUE;
1626 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1627 /* Write index correction */
1630 uint32_t ctag = tag;
1632 /* Go through the saved correction values and add up the
1633 * total correction.*/
1634 for (n = 0, j = o->stream->current_write_index_correction+1;
1635 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1636 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1638 /* Step over invalid data or out-of-date data */
1639 if (!o->stream->write_index_corrections[j].valid ||
1640 o->stream->write_index_corrections[j].tag < ctag)
1643 /* Make sure that everything is in order */
1644 ctag = o->stream->write_index_corrections[j].tag+1;
1646 /* Now fix the write index */
1647 if (o->stream->write_index_corrections[j].corrupt) {
1648 /* A corrupting seek was made */
1649 i->write_index_corrupt = TRUE;
1650 } else if (o->stream->write_index_corrections[j].absolute) {
1651 /* An absolute seek was made */
1652 i->write_index = o->stream->write_index_corrections[j].value;
1653 i->write_index_corrupt = FALSE;
1654 } else if (!i->write_index_corrupt) {
1655 /* A relative seek was made */
1656 i->write_index += o->stream->write_index_corrections[j].value;
1660 /* Clear old correction entries */
1661 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1662 if (!o->stream->write_index_corrections[n].valid)
1665 if (o->stream->write_index_corrections[n].tag <= tag)
1666 o->stream->write_index_corrections[n].valid = FALSE;
1670 if (o->stream->direction == PA_STREAM_RECORD) {
1671 /* Read index correction */
1673 if (!i->read_index_corrupt)
1674 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1677 /* Update smoother */
1678 if (o->stream->smoother) {
1681 u = x = pa_rtclock_now() - i->transport_usec;
1683 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1686 /* If we weren't playing then it will take some time
1687 * until the audio will actually come out through the
1688 * speakers. Since we follow that timing here, we need
1689 * to try to fix this up */
1691 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1693 if (su < i->sink_usec)
1694 x += i->sink_usec - su;
1698 pa_smoother_pause(o->stream->smoother, x);
1700 /* Update the smoother */
1701 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1702 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1703 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1706 pa_smoother_resume(o->stream->smoother, x, TRUE);
1710 o->stream->auto_timing_update_requested = FALSE;
1712 if (o->stream->latency_update_callback)
1713 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1715 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1716 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1717 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1722 pa_operation_done(o);
1723 pa_operation_unref(o);
1726 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1734 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1736 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1737 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1738 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1740 if (s->direction == PA_STREAM_PLAYBACK) {
1741 /* Find a place to store the write_index correction data for this entry */
1742 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1744 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1747 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1749 t = pa_tagstruct_command(
1751 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1753 pa_tagstruct_putu32(t, s->channel);
1754 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1756 pa_pstream_send_tagstruct(s->context->pstream, t);
1757 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1759 if (s->direction == PA_STREAM_PLAYBACK) {
1760 /* Fill in initial correction data */
1762 s->current_write_index_correction = cidx;
1764 s->write_index_corrections[cidx].valid = TRUE;
1765 s->write_index_corrections[cidx].absolute = FALSE;
1766 s->write_index_corrections[cidx].corrupt = FALSE;
1767 s->write_index_corrections[cidx].tag = tag;
1768 s->write_index_corrections[cidx].value = 0;
1774 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1775 pa_stream *s = userdata;
1779 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1783 if (command != PA_COMMAND_REPLY) {
1784 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1787 pa_stream_set_state(s, PA_STREAM_FAILED);
1789 } else if (!pa_tagstruct_eof(t)) {
1790 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1794 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1800 int pa_stream_disconnect(pa_stream *s) {
1805 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1807 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1808 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1809 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1813 t = pa_tagstruct_command(
1815 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1816 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1818 pa_tagstruct_putu32(t, s->channel);
1819 pa_pstream_send_tagstruct(s->context->pstream, t);
1820 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1826 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1828 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1830 if (pa_detect_fork())
1833 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1836 s->read_callback = cb;
1837 s->read_userdata = userdata;
1840 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1842 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1844 if (pa_detect_fork())
1847 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1850 s->write_callback = cb;
1851 s->write_userdata = userdata;
1854 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1856 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1858 if (pa_detect_fork())
1861 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1864 s->state_callback = cb;
1865 s->state_userdata = userdata;
1868 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1870 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1872 if (pa_detect_fork())
1875 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1878 s->overflow_callback = cb;
1879 s->overflow_userdata = userdata;
1882 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1884 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1886 if (pa_detect_fork())
1889 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1892 s->underflow_callback = cb;
1893 s->underflow_userdata = userdata;
1896 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1898 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1900 if (pa_detect_fork())
1903 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1906 s->latency_update_callback = cb;
1907 s->latency_update_userdata = userdata;
1910 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1912 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1914 if (pa_detect_fork())
1917 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1920 s->moved_callback = cb;
1921 s->moved_userdata = userdata;
1924 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1926 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1928 if (pa_detect_fork())
1931 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1934 s->suspended_callback = cb;
1935 s->suspended_userdata = userdata;
1938 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1940 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942 if (pa_detect_fork())
1945 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1948 s->started_callback = cb;
1949 s->started_userdata = userdata;
1952 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1954 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1956 if (pa_detect_fork())
1959 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1962 s->event_callback = cb;
1963 s->event_userdata = userdata;
1966 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1968 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1970 if (pa_detect_fork())
1973 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1976 s->buffer_attr_callback = cb;
1977 s->buffer_attr_userdata = userdata;
1980 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1981 pa_operation *o = userdata;
1986 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1991 if (command != PA_COMMAND_REPLY) {
1992 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1996 } else if (!pa_tagstruct_eof(t)) {
1997 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2002 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2003 cb(o->stream, success, o->userdata);
2007 pa_operation_done(o);
2008 pa_operation_unref(o);
2011 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2017 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2019 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2020 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2021 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2025 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2027 t = pa_tagstruct_command(
2029 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2031 pa_tagstruct_putu32(t, s->channel);
2032 pa_tagstruct_put_boolean(t, !!b);
2033 pa_pstream_send_tagstruct(s->context->pstream, t);
2034 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2036 check_smoother_status(s, FALSE, FALSE, FALSE);
2038 /* This might cause the indexes to hang/start again, hence
2039 * let's request a timing update */
2040 request_auto_timing_update(s, TRUE);
2045 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2051 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2053 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2054 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2056 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2058 t = pa_tagstruct_command(s->context, command, &tag);
2059 pa_tagstruct_putu32(t, s->channel);
2060 pa_pstream_send_tagstruct(s->context->pstream, t);
2061 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2066 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2070 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2072 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2073 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2074 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2076 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2079 if (s->direction == PA_STREAM_PLAYBACK) {
2081 if (s->write_index_corrections[s->current_write_index_correction].valid)
2082 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2084 if (s->buffer_attr.prebuf > 0)
2085 check_smoother_status(s, FALSE, FALSE, TRUE);
2087 /* This will change the write index, but leave the
2088 * read index untouched. */
2089 invalidate_indexes(s, FALSE, TRUE);
2092 /* For record streams this has no influence on the write
2093 * index, but the read index might jump. */
2094 invalidate_indexes(s, TRUE, FALSE);
2099 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2103 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2105 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2106 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2107 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2110 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2113 /* This might cause the read index to hang again, hence
2114 * let's request a timing update */
2115 request_auto_timing_update(s, TRUE);
2120 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2124 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2126 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2127 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2128 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2129 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2131 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2134 /* This might cause the read index to start moving again, hence
2135 * let's request a timing update */
2136 request_auto_timing_update(s, TRUE);
2141 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2145 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2148 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2149 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2150 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2152 if (s->context->version >= 13) {
2153 pa_proplist *p = pa_proplist_new();
2155 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2156 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2157 pa_proplist_free(p);
2162 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2163 t = pa_tagstruct_command(
2165 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2167 pa_tagstruct_putu32(t, s->channel);
2168 pa_tagstruct_puts(t, name);
2169 pa_pstream_send_tagstruct(s->context->pstream, t);
2170 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2176 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2180 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2182 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2183 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2184 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2185 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2186 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2187 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2190 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2192 usec = calc_time(s, FALSE);
2194 /* Make sure the time runs monotonically */
2195 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2196 if (usec < s->previous_time)
2197 usec = s->previous_time;
2199 s->previous_time = usec;
2208 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2210 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2218 if (negative && s->direction == PA_STREAM_RECORD) {
2226 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2232 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2235 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2236 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2237 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2238 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2239 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2240 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2242 if ((r = pa_stream_get_time(s, &t)) < 0)
2245 if (s->direction == PA_STREAM_PLAYBACK)
2246 cindex = s->timing_info.write_index;
2248 cindex = s->timing_info.read_index;
2253 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2255 if (s->direction == PA_STREAM_PLAYBACK)
2256 *r_usec = time_counter_diff(s, c, t, negative);
2258 *r_usec = time_counter_diff(s, t, c, negative);
2263 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2265 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2267 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2268 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2269 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2270 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2272 return &s->timing_info;
2275 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2277 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2281 return &s->sample_spec;
2284 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2286 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2288 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2290 return &s->channel_map;
2293 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2295 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2297 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2298 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2301 return &s->buffer_attr;
2304 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2305 pa_operation *o = userdata;
2310 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2315 if (command != PA_COMMAND_REPLY) {
2316 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2321 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2322 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2323 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2324 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2325 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2326 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2329 } else if (o->stream->direction == PA_STREAM_RECORD) {
2330 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2331 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2332 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2337 if (o->stream->context->version >= 13) {
2340 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2341 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2345 if (o->stream->direction == PA_STREAM_RECORD)
2346 o->stream->timing_info.configured_source_usec = usec;
2348 o->stream->timing_info.configured_sink_usec = usec;
2351 if (!pa_tagstruct_eof(t)) {
2352 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2358 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2359 cb(o->stream, success, o->userdata);
2363 pa_operation_done(o);
2364 pa_operation_unref(o);
2368 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2377 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2380 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2382 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2384 t = pa_tagstruct_command(
2386 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2388 pa_tagstruct_putu32(t, s->channel);
2390 pa_tagstruct_putu32(t, attr->maxlength);
2392 if (s->direction == PA_STREAM_PLAYBACK)
2395 PA_TAG_U32, attr->tlength,
2396 PA_TAG_U32, attr->prebuf,
2397 PA_TAG_U32, attr->minreq,
2400 pa_tagstruct_putu32(t, attr->fragsize);
2402 if (s->context->version >= 13)
2403 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2405 if (s->context->version >= 14)
2406 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2408 pa_pstream_send_tagstruct(s->context->pstream, t);
2409 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2411 /* This might cause changes in the read/write indexex, hence let's
2412 * request a timing update */
2413 request_auto_timing_update(s, TRUE);
2418 uint32_t pa_stream_get_device_index(pa_stream *s) {
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2422 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2423 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2424 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2425 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2426 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2428 return s->device_index;
2431 const char *pa_stream_get_device_name(pa_stream *s) {
2433 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2435 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2436 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2437 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2438 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2439 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2441 return s->device_name;
2444 int pa_stream_is_suspended(pa_stream *s) {
2446 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2448 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2449 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2450 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2451 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2453 return s->suspended;
2456 int pa_stream_is_corked(pa_stream *s) {
2458 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2460 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2461 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2462 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2467 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2468 pa_operation *o = userdata;
2473 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2478 if (command != PA_COMMAND_REPLY) {
2479 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2485 if (!pa_tagstruct_eof(t)) {
2486 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2491 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2492 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2495 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2496 cb(o->stream, success, o->userdata);
2500 pa_operation_done(o);
2501 pa_operation_unref(o);
2505 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2511 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2513 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2514 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2515 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2516 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2517 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2518 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2520 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2521 o->private = PA_UINT_TO_PTR(rate);
2523 t = pa_tagstruct_command(
2525 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2527 pa_tagstruct_putu32(t, s->channel);
2528 pa_tagstruct_putu32(t, rate);
2530 pa_pstream_send_tagstruct(s->context->pstream, t);
2531 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2536 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2542 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2544 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2545 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2546 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2547 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2548 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2550 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2552 t = pa_tagstruct_command(
2554 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2556 pa_tagstruct_putu32(t, s->channel);
2557 pa_tagstruct_putu32(t, (uint32_t) mode);
2558 pa_tagstruct_put_proplist(t, p);
2560 pa_pstream_send_tagstruct(s->context->pstream, t);
2561 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2563 /* Please note that we don't update s->proplist here, because we
2564 * don't export that field */
2569 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2573 const char * const*k;
2576 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2578 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2579 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2580 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2581 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2582 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2584 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2586 t = pa_tagstruct_command(
2588 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2590 pa_tagstruct_putu32(t, s->channel);
2592 for (k = keys; *k; k++)
2593 pa_tagstruct_puts(t, *k);
2595 pa_tagstruct_puts(t, NULL);
2597 pa_pstream_send_tagstruct(s->context->pstream, t);
2598 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2600 /* Please note that we don't update s->proplist here, because we
2601 * don't export that field */
2606 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2608 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2610 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2611 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2612 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2613 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2615 s->direct_on_input = sink_input_idx;
2620 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2622 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2624 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2625 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2626 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2628 return s->direct_on_input;