2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
41 #include "fork-detect.h"
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 static void reset_callbacks(pa_stream *s) {
55 s->read_callback = NULL;
56 s->read_userdata = NULL;
57 s->write_callback = NULL;
58 s->write_userdata = NULL;
59 s->state_callback = NULL;
60 s->state_userdata = NULL;
61 s->overflow_callback = NULL;
62 s->overflow_userdata = NULL;
63 s->underflow_callback = NULL;
64 s->underflow_userdata = NULL;
65 s->latency_update_callback = NULL;
66 s->latency_update_userdata = NULL;
67 s->moved_callback = NULL;
68 s->moved_userdata = NULL;
69 s->suspended_callback = NULL;
70 s->suspended_userdata = NULL;
71 s->started_callback = NULL;
72 s->started_userdata = NULL;
73 s->event_callback = NULL;
74 s->event_userdata = NULL;
77 pa_stream *pa_stream_new_with_proplist(
80 const pa_sample_spec *ss,
81 const pa_channel_map *map,
89 pa_assert(PA_REFCNT_VALUE(c) >= 1);
91 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
92 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
102 s = pa_xnew(pa_stream, 1);
105 s->mainloop = c->mainloop;
107 s->direction = PA_STREAM_NODIRECTION;
108 s->state = PA_STREAM_UNCONNECTED;
111 s->sample_spec = *ss;
112 s->channel_map = *map;
114 s->direct_on_input = PA_INVALID_INDEX;
116 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
118 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
121 s->channel_valid = FALSE;
122 s->syncid = c->csyncid++;
123 s->stream_index = PA_INVALID_INDEX;
125 s->requested_bytes = 0;
126 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
128 /* We initialize der target length here, so that if the user
129 * passes no explicit buffering metrics the default is similar to
130 * what older PA versions provided. */
132 s->buffer_attr.maxlength = (uint32_t) -1;
133 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
134 s->buffer_attr.minreq = (uint32_t) -1;
135 s->buffer_attr.prebuf = (uint32_t) -1;
136 s->buffer_attr.fragsize = (uint32_t) -1;
138 s->device_index = PA_INVALID_INDEX;
139 s->device_name = NULL;
140 s->suspended = FALSE;
142 pa_memchunk_reset(&s->peek_memchunk);
145 s->record_memblockq = NULL;
149 memset(&s->timing_info, 0, sizeof(s->timing_info));
150 s->timing_info_valid = FALSE;
152 s->previous_time = 0;
154 s->read_index_not_before = 0;
155 s->write_index_not_before = 0;
156 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
157 s->write_index_corrections[i].valid = 0;
158 s->current_write_index_correction = 0;
160 s->auto_timing_update_event = NULL;
161 s->auto_timing_update_requested = FALSE;
167 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
168 PA_LLIST_PREPEND(pa_stream, c->streams, s);
174 static void stream_unlink(pa_stream *s) {
181 /* Detach from context */
183 /* Unref all operatio object that point to us */
184 for (o = s->context->operations; o; o = n) {
188 pa_operation_cancel(o);
191 /* Drop all outstanding replies for this stream */
192 if (s->context->pdispatch)
193 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
195 if (s->channel_valid) {
196 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
198 s->channel_valid = FALSE;
201 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
206 if (s->auto_timing_update_event) {
207 pa_assert(s->mainloop);
208 s->mainloop->time_free(s->auto_timing_update_event);
214 static void stream_free(pa_stream *s) {
219 if (s->peek_memchunk.memblock) {
221 pa_memblock_release(s->peek_memchunk.memblock);
222 pa_memblock_unref(s->peek_memchunk.memblock);
225 if (s->record_memblockq)
226 pa_memblockq_free(s->record_memblockq);
229 pa_proplist_free(s->proplist);
232 pa_smoother_free(s->smoother);
234 pa_xfree(s->device_name);
238 void pa_stream_unref(pa_stream *s) {
240 pa_assert(PA_REFCNT_VALUE(s) >= 1);
242 if (PA_REFCNT_DEC(s) <= 0)
246 pa_stream* pa_stream_ref(pa_stream *s) {
248 pa_assert(PA_REFCNT_VALUE(s) >= 1);
254 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
261 pa_context* pa_stream_get_context(pa_stream *s) {
263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
268 uint32_t pa_stream_get_index(pa_stream *s) {
270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
272 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
275 return s->stream_index;
278 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
280 pa_assert(PA_REFCNT_VALUE(s) >= 1);
289 if (s->state_callback)
290 s->state_callback(s, s->state_userdata);
292 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
298 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
300 pa_assert(PA_REFCNT_VALUE(s) >= 1);
302 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
305 if (s->state == PA_STREAM_READY &&
306 (force || !s->auto_timing_update_requested)) {
309 /* pa_log("automatically requesting new timing data"); */
311 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
312 pa_operation_unref(o);
313 s->auto_timing_update_requested = TRUE;
317 if (s->auto_timing_update_event) {
319 pa_gettimeofday(&next);
320 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
321 s->mainloop->time_restart(s->auto_timing_update_event, &next);
325 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
326 pa_context *c = userdata;
331 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
334 pa_assert(PA_REFCNT_VALUE(c) >= 1);
338 if (pa_tagstruct_getu32(t, &channel) < 0 ||
339 !pa_tagstruct_eof(t)) {
340 pa_context_fail(c, PA_ERR_PROTOCOL);
344 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
347 if (s->state != PA_STREAM_READY)
350 pa_context_set_error(c, PA_ERR_KILLED);
351 pa_stream_set_state(s, PA_STREAM_FAILED);
357 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
361 pa_assert(!force_start || !force_stop);
366 x = pa_rtclock_usec();
368 if (s->timing_info_valid) {
370 x -= s->timing_info.transport_usec;
372 x += s->timing_info.transport_usec;
374 if (s->direction == PA_STREAM_PLAYBACK)
375 /* it takes a while until the pause/resume is actually
377 x += s->timing_info.sink_usec;
379 /* Data froma while back will be dropped */
380 x -= s->timing_info.source_usec;
383 if (s->suspended || s->corked || force_stop)
384 pa_smoother_pause(s->smoother, x);
385 else if (force_start || s->buffer_attr.prebuf == 0)
386 pa_smoother_resume(s->smoother, x);
388 /* Please note that we have no idea if playback actually started
389 * if prebuf is non-zero! */
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393 pa_context *c = userdata;
400 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
403 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
406 pa_assert(PA_REFCNT_VALUE(c) >= 1);
410 if (c->version < 12) {
411 pa_context_fail(c, PA_ERR_PROTOCOL);
415 if (pa_tagstruct_getu32(t, &channel) < 0 ||
416 pa_tagstruct_getu32(t, &di) < 0 ||
417 pa_tagstruct_gets(t, &dn) < 0 ||
418 pa_tagstruct_get_boolean(t, &suspended) < 0) {
419 pa_context_fail(c, PA_ERR_PROTOCOL);
423 if (c->version >= 13) {
425 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428 pa_tagstruct_get_usec(t, &usec) < 0) {
429 pa_context_fail(c, PA_ERR_PROTOCOL);
433 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434 pa_tagstruct_getu32(t, &tlength) < 0 ||
435 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436 pa_tagstruct_getu32(t, &minreq) < 0 ||
437 pa_tagstruct_get_usec(t, &usec) < 0) {
438 pa_context_fail(c, PA_ERR_PROTOCOL);
444 if (!pa_tagstruct_eof(t)) {
445 pa_context_fail(c, PA_ERR_PROTOCOL);
449 if (!dn || di == PA_INVALID_INDEX) {
450 pa_context_fail(c, PA_ERR_PROTOCOL);
454 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
457 if (s->state != PA_STREAM_READY)
460 if (c->version >= 13) {
461 if (s->direction == PA_STREAM_RECORD)
462 s->timing_info.configured_source_usec = usec;
464 s->timing_info.configured_sink_usec = usec;
466 s->buffer_attr.maxlength = maxlength;
467 s->buffer_attr.fragsize = fragsize;
468 s->buffer_attr.tlength = tlength;
469 s->buffer_attr.prebuf = prebuf;
470 s->buffer_attr.minreq = minreq;
473 pa_xfree(s->device_name);
474 s->device_name = pa_xstrdup(dn);
475 s->device_index = di;
477 s->suspended = suspended;
479 check_smoother_status(s, TRUE, FALSE, FALSE);
480 request_auto_timing_update(s, TRUE);
482 if (s->moved_callback)
483 s->moved_callback(s, s->moved_userdata);
489 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
496 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
499 pa_assert(PA_REFCNT_VALUE(c) >= 1);
503 if (c->version < 12) {
504 pa_context_fail(c, PA_ERR_PROTOCOL);
508 if (pa_tagstruct_getu32(t, &channel) < 0 ||
509 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
510 !pa_tagstruct_eof(t)) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
515 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
518 if (s->state != PA_STREAM_READY)
521 s->suspended = suspended;
523 check_smoother_status(s, TRUE, FALSE, FALSE);
524 request_auto_timing_update(s, TRUE);
526 if (s->suspended_callback)
527 s->suspended_callback(s, s->suspended_userdata);
533 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
534 pa_context *c = userdata;
539 pa_assert(command == PA_COMMAND_STARTED);
542 pa_assert(PA_REFCNT_VALUE(c) >= 1);
546 if (c->version < 13) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
551 if (pa_tagstruct_getu32(t, &channel) < 0 ||
552 !pa_tagstruct_eof(t)) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
557 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
560 if (s->state != PA_STREAM_READY)
563 check_smoother_status(s, TRUE, TRUE, FALSE);
564 request_auto_timing_update(s, TRUE);
566 if (s->started_callback)
567 s->started_callback(s, s->started_userdata);
573 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574 pa_context *c = userdata;
577 pa_proplist *pl = NULL;
581 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
584 pa_assert(PA_REFCNT_VALUE(c) >= 1);
588 if (c->version < 15) {
589 pa_context_fail(c, PA_ERR_PROTOCOL);
593 pl = pa_proplist_new();
595 if (pa_tagstruct_getu32(t, &channel) < 0 ||
596 pa_tagstruct_gets(t, &event) < 0 ||
597 pa_tagstruct_get_proplist(t, pl) < 0 ||
598 !pa_tagstruct_eof(t) || !event) {
599 pa_context_fail(c, PA_ERR_PROTOCOL);
603 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
606 if (s->state != PA_STREAM_READY)
609 if (s->event_callback)
610 s->event_callback(s, event, pl, s->event_userdata);
616 pa_proplist_free(pl);
619 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
621 pa_context *c = userdata;
622 uint32_t bytes, channel;
625 pa_assert(command == PA_COMMAND_REQUEST);
628 pa_assert(PA_REFCNT_VALUE(c) >= 1);
632 if (pa_tagstruct_getu32(t, &channel) < 0 ||
633 pa_tagstruct_getu32(t, &bytes) < 0 ||
634 !pa_tagstruct_eof(t)) {
635 pa_context_fail(c, PA_ERR_PROTOCOL);
639 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
642 if (s->state != PA_STREAM_READY)
645 s->requested_bytes += bytes;
647 if (s->requested_bytes > 0 && s->write_callback)
648 s->write_callback(s, s->requested_bytes, s->write_userdata);
654 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
656 pa_context *c = userdata;
660 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
663 pa_assert(PA_REFCNT_VALUE(c) >= 1);
667 if (pa_tagstruct_getu32(t, &channel) < 0 ||
668 !pa_tagstruct_eof(t)) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
673 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
676 if (s->state != PA_STREAM_READY)
679 if (s->buffer_attr.prebuf > 0)
680 check_smoother_status(s, TRUE, FALSE, TRUE);
682 request_auto_timing_update(s, TRUE);
684 if (command == PA_COMMAND_OVERFLOW) {
685 if (s->overflow_callback)
686 s->overflow_callback(s, s->overflow_userdata);
687 } else if (command == PA_COMMAND_UNDERFLOW) {
688 if (s->underflow_callback)
689 s->underflow_callback(s, s->underflow_userdata);
696 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
700 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
702 if (s->state != PA_STREAM_READY)
706 s->write_index_not_before = s->context->ctag;
708 if (s->timing_info_valid)
709 s->timing_info.write_index_corrupt = TRUE;
711 /* pa_log("write_index invalidated"); */
715 s->read_index_not_before = s->context->ctag;
717 if (s->timing_info_valid)
718 s->timing_info.read_index_corrupt = TRUE;
720 /* pa_log("read_index invalidated"); */
723 request_auto_timing_update(s, TRUE);
726 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
727 pa_stream *s = userdata;
730 pa_assert(PA_REFCNT_VALUE(s) >= 1);
733 request_auto_timing_update(s, FALSE);
737 static void create_stream_complete(pa_stream *s) {
739 pa_assert(PA_REFCNT_VALUE(s) >= 1);
740 pa_assert(s->state == PA_STREAM_CREATING);
742 pa_stream_set_state(s, PA_STREAM_READY);
744 if (s->requested_bytes > 0 && s->write_callback)
745 s->write_callback(s, s->requested_bytes, s->write_userdata);
747 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
749 pa_gettimeofday(&tv);
750 tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
751 pa_assert(!s->auto_timing_update_event);
752 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
754 request_auto_timing_update(s, TRUE);
757 check_smoother_status(s, TRUE, FALSE, FALSE);
760 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
765 if (s->context->version >= 13)
768 /* Version older than 0.9.10 didn't do server side buffer_attr
769 * selection, hence we have to fake it on the client side. */
771 /* We choose fairly conservative values here, to not confuse
772 * old clients with extremely large playback buffers */
774 if (attr->maxlength == (uint32_t) -1)
775 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
777 if (attr->tlength == (uint32_t) -1)
778 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
780 if (attr->minreq == (uint32_t) -1)
781 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
783 if (attr->prebuf == (uint32_t) -1)
784 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
786 if (attr->fragsize == (uint32_t) -1)
787 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
790 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
791 pa_stream *s = userdata;
795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
796 pa_assert(s->state == PA_STREAM_CREATING);
800 if (command != PA_COMMAND_REPLY) {
801 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
804 pa_stream_set_state(s, PA_STREAM_FAILED);
808 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
809 s->channel == PA_INVALID_INDEX ||
810 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
811 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
812 pa_context_fail(s->context, PA_ERR_PROTOCOL);
816 if (s->context->version >= 9) {
817 if (s->direction == PA_STREAM_PLAYBACK) {
818 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
819 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
820 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
821 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
822 pa_context_fail(s->context, PA_ERR_PROTOCOL);
825 } else if (s->direction == PA_STREAM_RECORD) {
826 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
827 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
828 pa_context_fail(s->context, PA_ERR_PROTOCOL);
834 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
837 const char *dn = NULL;
840 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
841 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
842 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
843 pa_tagstruct_gets(t, &dn) < 0 ||
844 pa_tagstruct_get_boolean(t, &suspended) < 0) {
845 pa_context_fail(s->context, PA_ERR_PROTOCOL);
849 if (!dn || s->device_index == PA_INVALID_INDEX ||
850 ss.channels != cm.channels ||
851 !pa_channel_map_valid(&cm) ||
852 !pa_sample_spec_valid(&ss) ||
853 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
854 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
855 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
856 pa_context_fail(s->context, PA_ERR_PROTOCOL);
860 pa_xfree(s->device_name);
861 s->device_name = pa_xstrdup(dn);
862 s->suspended = suspended;
868 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
871 if (pa_tagstruct_get_usec(t, &usec) < 0) {
872 pa_context_fail(s->context, PA_ERR_PROTOCOL);
876 if (s->direction == PA_STREAM_RECORD)
877 s->timing_info.configured_source_usec = usec;
879 s->timing_info.configured_sink_usec = usec;
882 if (!pa_tagstruct_eof(t)) {
883 pa_context_fail(s->context, PA_ERR_PROTOCOL);
887 if (s->direction == PA_STREAM_RECORD) {
888 pa_assert(!s->record_memblockq);
890 s->record_memblockq = pa_memblockq_new(
892 s->buffer_attr.maxlength,
894 pa_frame_size(&s->sample_spec),
901 s->channel_valid = TRUE;
902 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
904 create_stream_complete(s);
910 static int create_stream(
911 pa_stream_direction_t direction,
914 const pa_buffer_attr *attr,
915 pa_stream_flags_t flags,
916 const pa_cvolume *volume,
917 pa_stream *sync_stream) {
921 pa_bool_t volume_set = FALSE;
924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
925 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
927 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
928 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
929 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
930 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
931 PA_STREAM_INTERPOLATE_TIMING|
932 PA_STREAM_NOT_MONOTONIC|
933 PA_STREAM_AUTO_TIMING_UPDATE|
934 PA_STREAM_NO_REMAP_CHANNELS|
935 PA_STREAM_NO_REMIX_CHANNELS|
936 PA_STREAM_FIX_FORMAT|
938 PA_STREAM_FIX_CHANNELS|
940 PA_STREAM_VARIABLE_RATE|
941 PA_STREAM_PEAK_DETECT|
942 PA_STREAM_START_MUTED|
943 PA_STREAM_ADJUST_LATENCY|
944 PA_STREAM_EARLY_REQUESTS|
945 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
946 PA_STREAM_START_UNMUTED|
947 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
949 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
950 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
951 /* Althought some of the other flags are not supported on older
952 * version, we don't check for them here, because it doesn't hurt
953 * when they are passed but actually not supported. This makes
954 * client development easier */
956 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
957 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
958 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
959 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
960 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
964 s->direction = direction;
966 s->corked = !!(flags & PA_STREAM_START_CORKED);
969 s->syncid = sync_stream->syncid;
972 s->buffer_attr = *attr;
973 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
975 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
979 pa_smoother_free(s->smoother);
981 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
983 x = pa_rtclock_usec();
984 pa_smoother_set_time_offset(s->smoother, x);
985 pa_smoother_pause(s->smoother, x);
989 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
991 t = pa_tagstruct_command(
993 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
996 if (s->context->version < 13)
997 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1001 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1002 PA_TAG_CHANNEL_MAP, &s->channel_map,
1003 PA_TAG_U32, PA_INVALID_INDEX,
1005 PA_TAG_U32, s->buffer_attr.maxlength,
1006 PA_TAG_BOOLEAN, s->corked,
1009 if (s->direction == PA_STREAM_PLAYBACK) {
1014 PA_TAG_U32, s->buffer_attr.tlength,
1015 PA_TAG_U32, s->buffer_attr.prebuf,
1016 PA_TAG_U32, s->buffer_attr.minreq,
1017 PA_TAG_U32, s->syncid,
1020 volume_set = !!volume;
1023 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1025 pa_tagstruct_put_cvolume(t, volume);
1027 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1029 if (s->context->version >= 12) {
1032 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1033 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1034 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1035 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1036 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1037 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1038 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1042 if (s->context->version >= 13) {
1044 if (s->direction == PA_STREAM_PLAYBACK)
1045 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1047 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1051 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1052 PA_TAG_PROPLIST, s->proplist,
1055 if (s->direction == PA_STREAM_RECORD)
1056 pa_tagstruct_putu32(t, s->direct_on_input);
1059 if (s->context->version >= 14) {
1061 if (s->direction == PA_STREAM_PLAYBACK)
1062 pa_tagstruct_put_boolean(t, volume_set);
1064 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1067 if (s->context->version >= 15) {
1069 if (s->direction == PA_STREAM_PLAYBACK)
1070 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1072 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1073 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1076 pa_pstream_send_tagstruct(s->context->pstream, t);
1077 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1079 pa_stream_set_state(s, PA_STREAM_CREATING);
1085 int pa_stream_connect_playback(
1088 const pa_buffer_attr *attr,
1089 pa_stream_flags_t flags,
1091 pa_stream *sync_stream) {
1094 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1096 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1099 int pa_stream_connect_record(
1102 const pa_buffer_attr *attr,
1103 pa_stream_flags_t flags) {
1106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1108 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1111 int pa_stream_write(
1115 void (*free_cb)(void *p),
1117 pa_seek_mode_t seek) {
1120 pa_seek_mode_t t_seek;
1126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1129 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1130 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1131 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1132 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1133 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1143 while (t_length > 0) {
1147 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1148 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1149 chunk.length = t_length;
1153 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1154 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1156 d = pa_memblock_acquire(chunk.memblock);
1157 memcpy(d, t_data, chunk.length);
1158 pa_memblock_release(chunk.memblock);
1161 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1164 t_seek = PA_SEEK_RELATIVE;
1166 t_data = (const uint8_t*) t_data + chunk.length;
1167 t_length -= chunk.length;
1169 pa_memblock_unref(chunk.memblock);
1172 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1173 free_cb((void*) data);
1175 if (length < s->requested_bytes)
1176 s->requested_bytes -= (uint32_t) length;
1178 s->requested_bytes = 0;
1180 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1182 if (s->direction == PA_STREAM_PLAYBACK) {
1184 /* Update latency request correction */
1185 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1187 if (seek == PA_SEEK_ABSOLUTE) {
1188 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1189 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1190 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1191 } else if (seek == PA_SEEK_RELATIVE) {
1192 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1193 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1195 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1198 /* Update the write index in the already available latency data */
1199 if (s->timing_info_valid) {
1201 if (seek == PA_SEEK_ABSOLUTE) {
1202 s->timing_info.write_index_corrupt = FALSE;
1203 s->timing_info.write_index = offset + (int64_t) length;
1204 } else if (seek == PA_SEEK_RELATIVE) {
1205 if (!s->timing_info.write_index_corrupt)
1206 s->timing_info.write_index += offset + (int64_t) length;
1208 s->timing_info.write_index_corrupt = TRUE;
1211 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1212 request_auto_timing_update(s, TRUE);
1218 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1220 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1224 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1225 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1226 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1228 if (!s->peek_memchunk.memblock) {
1230 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1236 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1239 pa_assert(s->peek_data);
1240 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1241 *length = s->peek_memchunk.length;
1245 int pa_stream_drop(pa_stream *s) {
1247 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1249 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1250 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1251 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1252 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1254 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1256 /* Fix the simulated local read index */
1257 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1258 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1260 pa_assert(s->peek_data);
1261 pa_memblock_release(s->peek_memchunk.memblock);
1262 pa_memblock_unref(s->peek_memchunk.memblock);
1263 pa_memchunk_reset(&s->peek_memchunk);
1268 size_t pa_stream_writable_size(pa_stream *s) {
1270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1272 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1274 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1276 return s->requested_bytes;
1279 size_t pa_stream_readable_size(pa_stream *s) {
1281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1287 return pa_memblockq_get_length(s->record_memblockq);
1290 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1296 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1298 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1300 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1302 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1304 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1305 pa_tagstruct_putu32(t, s->channel);
1306 pa_pstream_send_tagstruct(s->context->pstream, t);
1307 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1312 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1317 pa_assert(s->state == PA_STREAM_READY);
1318 pa_assert(s->direction != PA_STREAM_UPLOAD);
1319 pa_assert(s->timing_info_valid);
1320 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1321 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1323 if (s->direction == PA_STREAM_PLAYBACK) {
1324 /* The last byte that was written into the output device
1325 * had this time value associated */
1326 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1328 if (!s->corked && !s->suspended) {
1330 if (!ignore_transport)
1331 /* Because the latency info took a little time to come
1332 * to us, we assume that the real output time is actually
1334 usec += s->timing_info.transport_usec;
1336 /* However, the output device usually maintains a buffer
1337 too, hence the real sample currently played is a little
1339 if (s->timing_info.sink_usec >= usec)
1342 usec -= s->timing_info.sink_usec;
1346 pa_assert(s->direction == PA_STREAM_RECORD);
1348 /* The last byte written into the server side queue had
1349 * this time value associated */
1350 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1352 if (!s->corked && !s->suspended) {
1354 if (!ignore_transport)
1355 /* Add transport latency */
1356 usec += s->timing_info.transport_usec;
1358 /* Add latency of data in device buffer */
1359 usec += s->timing_info.source_usec;
1361 /* If this is a monitor source, we need to correct the
1362 * time by the playback device buffer */
1363 if (s->timing_info.sink_usec >= usec)
1366 usec -= s->timing_info.sink_usec;
1373 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1374 pa_operation *o = userdata;
1375 struct timeval local, remote, now;
1377 pa_bool_t playing = FALSE;
1378 uint64_t underrun_for = 0, playing_for = 0;
1382 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1384 if (!o->context || !o->stream)
1387 i = &o->stream->timing_info;
1389 o->stream->timing_info_valid = FALSE;
1390 i->write_index_corrupt = TRUE;
1391 i->read_index_corrupt = TRUE;
1393 if (command != PA_COMMAND_REPLY) {
1394 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1399 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1400 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1401 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1402 pa_tagstruct_get_timeval(t, &local) < 0 ||
1403 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1404 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1405 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1407 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1411 if (o->context->version >= 13 &&
1412 o->stream->direction == PA_STREAM_PLAYBACK)
1413 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1414 pa_tagstruct_getu64(t, &playing_for) < 0) {
1416 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1421 if (!pa_tagstruct_eof(t)) {
1422 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1425 o->stream->timing_info_valid = TRUE;
1426 i->write_index_corrupt = FALSE;
1427 i->read_index_corrupt = FALSE;
1429 i->playing = (int) playing;
1430 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1432 pa_gettimeofday(&now);
1434 /* Calculcate timestamps */
1435 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1436 /* local and remote seem to have synchronized clocks */
1438 if (o->stream->direction == PA_STREAM_PLAYBACK)
1439 i->transport_usec = pa_timeval_diff(&remote, &local);
1441 i->transport_usec = pa_timeval_diff(&now, &remote);
1443 i->synchronized_clocks = TRUE;
1444 i->timestamp = remote;
1446 /* clocks are not synchronized, let's estimate latency then */
1447 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1448 i->synchronized_clocks = FALSE;
1449 i->timestamp = local;
1450 pa_timeval_add(&i->timestamp, i->transport_usec);
1453 /* Invalidate read and write indexes if necessary */
1454 if (tag < o->stream->read_index_not_before)
1455 i->read_index_corrupt = TRUE;
1457 if (tag < o->stream->write_index_not_before)
1458 i->write_index_corrupt = TRUE;
1460 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1461 /* Write index correction */
1464 uint32_t ctag = tag;
1466 /* Go through the saved correction values and add up the
1467 * total correction.*/
1468 for (n = 0, j = o->stream->current_write_index_correction+1;
1469 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1470 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1472 /* Step over invalid data or out-of-date data */
1473 if (!o->stream->write_index_corrections[j].valid ||
1474 o->stream->write_index_corrections[j].tag < ctag)
1477 /* Make sure that everything is in order */
1478 ctag = o->stream->write_index_corrections[j].tag+1;
1480 /* Now fix the write index */
1481 if (o->stream->write_index_corrections[j].corrupt) {
1482 /* A corrupting seek was made */
1483 i->write_index_corrupt = TRUE;
1484 } else if (o->stream->write_index_corrections[j].absolute) {
1485 /* An absolute seek was made */
1486 i->write_index = o->stream->write_index_corrections[j].value;
1487 i->write_index_corrupt = FALSE;
1488 } else if (!i->write_index_corrupt) {
1489 /* A relative seek was made */
1490 i->write_index += o->stream->write_index_corrections[j].value;
1494 /* Clear old correction entries */
1495 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1496 if (!o->stream->write_index_corrections[n].valid)
1499 if (o->stream->write_index_corrections[n].tag <= tag)
1500 o->stream->write_index_corrections[n].valid = FALSE;
1504 if (o->stream->direction == PA_STREAM_RECORD) {
1505 /* Read index correction */
1507 if (!i->read_index_corrupt)
1508 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1511 /* Update smoother */
1512 if (o->stream->smoother) {
1515 u = x = pa_rtclock_usec() - i->transport_usec;
1517 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1520 /* If we weren't playing then it will take some time
1521 * until the audio will actually come out through the
1522 * speakers. Since we follow that timing here, we need
1523 * to try to fix this up */
1525 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1527 if (su < i->sink_usec)
1528 x += i->sink_usec - su;
1532 pa_smoother_pause(o->stream->smoother, x);
1534 /* Update the smoother */
1535 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1536 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1537 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1540 pa_smoother_resume(o->stream->smoother, x);
1544 o->stream->auto_timing_update_requested = FALSE;
1546 if (o->stream->latency_update_callback)
1547 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1549 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1550 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1551 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1556 pa_operation_done(o);
1557 pa_operation_unref(o);
1560 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1568 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1574 if (s->direction == PA_STREAM_PLAYBACK) {
1575 /* Find a place to store the write_index correction data for this entry */
1576 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1578 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1579 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1581 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1583 t = pa_tagstruct_command(
1585 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1587 pa_tagstruct_putu32(t, s->channel);
1588 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1590 pa_pstream_send_tagstruct(s->context->pstream, t);
1591 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1593 if (s->direction == PA_STREAM_PLAYBACK) {
1594 /* Fill in initial correction data */
1596 s->current_write_index_correction = cidx;
1598 s->write_index_corrections[cidx].valid = TRUE;
1599 s->write_index_corrections[cidx].absolute = FALSE;
1600 s->write_index_corrections[cidx].corrupt = FALSE;
1601 s->write_index_corrections[cidx].tag = tag;
1602 s->write_index_corrections[cidx].value = 0;
1608 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1609 pa_stream *s = userdata;
1613 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1617 if (command != PA_COMMAND_REPLY) {
1618 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1621 pa_stream_set_state(s, PA_STREAM_FAILED);
1623 } else if (!pa_tagstruct_eof(t)) {
1624 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1628 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1634 int pa_stream_disconnect(pa_stream *s) {
1639 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1641 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1642 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1643 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1647 t = pa_tagstruct_command(
1649 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1650 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1652 pa_tagstruct_putu32(t, s->channel);
1653 pa_pstream_send_tagstruct(s->context->pstream, t);
1654 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1660 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1662 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1664 if (pa_detect_fork())
1667 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1670 s->read_callback = cb;
1671 s->read_userdata = userdata;
1674 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1676 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1678 if (pa_detect_fork())
1681 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1684 s->write_callback = cb;
1685 s->write_userdata = userdata;
1688 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1690 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1692 if (pa_detect_fork())
1695 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1698 s->state_callback = cb;
1699 s->state_userdata = userdata;
1702 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1704 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1706 if (pa_detect_fork())
1709 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1712 s->overflow_callback = cb;
1713 s->overflow_userdata = userdata;
1716 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1718 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1720 if (pa_detect_fork())
1723 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1726 s->underflow_callback = cb;
1727 s->underflow_userdata = userdata;
1730 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1734 if (pa_detect_fork())
1737 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1740 s->latency_update_callback = cb;
1741 s->latency_update_userdata = userdata;
1744 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1746 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1748 if (pa_detect_fork())
1751 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1754 s->moved_callback = cb;
1755 s->moved_userdata = userdata;
1758 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1760 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762 if (pa_detect_fork())
1765 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1768 s->suspended_callback = cb;
1769 s->suspended_userdata = userdata;
1772 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1774 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776 if (pa_detect_fork())
1779 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1782 s->started_callback = cb;
1783 s->started_userdata = userdata;
1786 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1788 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1790 if (pa_detect_fork())
1793 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1796 s->event_callback = cb;
1797 s->event_userdata = userdata;
1800 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801 pa_operation *o = userdata;
1806 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1811 if (command != PA_COMMAND_REPLY) {
1812 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1816 } else if (!pa_tagstruct_eof(t)) {
1817 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1822 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1823 cb(o->stream, success, o->userdata);
1827 pa_operation_done(o);
1828 pa_operation_unref(o);
1831 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1837 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1840 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1841 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1845 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1847 t = pa_tagstruct_command(
1849 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1851 pa_tagstruct_putu32(t, s->channel);
1852 pa_tagstruct_put_boolean(t, !!b);
1853 pa_pstream_send_tagstruct(s->context->pstream, t);
1854 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1856 check_smoother_status(s, FALSE, FALSE, FALSE);
1858 /* This might cause the indexes to hang/start again, hence
1859 * let's request a timing update */
1860 request_auto_timing_update(s, TRUE);
1865 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1871 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1873 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1874 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1876 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1878 t = pa_tagstruct_command(s->context, command, &tag);
1879 pa_tagstruct_putu32(t, s->channel);
1880 pa_pstream_send_tagstruct(s->context->pstream, t);
1881 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1886 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1896 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1899 if (s->direction == PA_STREAM_PLAYBACK) {
1901 if (s->write_index_corrections[s->current_write_index_correction].valid)
1902 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1904 if (s->buffer_attr.prebuf > 0)
1905 check_smoother_status(s, FALSE, FALSE, TRUE);
1907 /* This will change the write index, but leave the
1908 * read index untouched. */
1909 invalidate_indexes(s, FALSE, TRUE);
1912 /* For record streams this has no influence on the write
1913 * index, but the read index might jump. */
1914 invalidate_indexes(s, TRUE, FALSE);
1919 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1923 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1926 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1927 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1928 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1930 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1933 /* This might cause the read index to hang again, hence
1934 * let's request a timing update */
1935 request_auto_timing_update(s, TRUE);
1940 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1944 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1946 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1947 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1948 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1949 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1951 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1954 /* This might cause the read index to start moving again, hence
1955 * let's request a timing update */
1956 request_auto_timing_update(s, TRUE);
1961 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1972 if (s->context->version >= 13) {
1973 pa_proplist *p = pa_proplist_new();
1975 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1976 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1977 pa_proplist_free(p);
1982 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1983 t = pa_tagstruct_command(
1985 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1987 pa_tagstruct_putu32(t, s->channel);
1988 pa_tagstruct_puts(t, name);
1989 pa_pstream_send_tagstruct(s->context->pstream, t);
1990 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1996 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2000 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2002 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2003 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2004 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2005 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2006 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2007 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2010 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2012 usec = calc_time(s, FALSE);
2014 /* Make sure the time runs monotonically */
2015 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2016 if (usec < s->previous_time)
2017 usec = s->previous_time;
2019 s->previous_time = usec;
2028 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2030 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2038 if (negative && s->direction == PA_STREAM_RECORD) {
2046 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2052 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2055 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2056 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2057 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2058 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2059 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2060 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2062 if ((r = pa_stream_get_time(s, &t)) < 0)
2065 if (s->direction == PA_STREAM_PLAYBACK)
2066 cindex = s->timing_info.write_index;
2068 cindex = s->timing_info.read_index;
2073 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2075 if (s->direction == PA_STREAM_PLAYBACK)
2076 *r_usec = time_counter_diff(s, c, t, negative);
2078 *r_usec = time_counter_diff(s, t, c, negative);
2083 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2085 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2087 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2088 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2090 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2092 return &s->timing_info;
2095 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2097 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2101 return &s->sample_spec;
2104 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110 return &s->channel_map;
2113 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2115 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2117 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2119 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2121 return &s->buffer_attr;
2124 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2125 pa_operation *o = userdata;
2130 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2135 if (command != PA_COMMAND_REPLY) {
2136 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2141 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2142 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2143 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2144 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2145 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2146 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2149 } else if (o->stream->direction == PA_STREAM_RECORD) {
2150 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2151 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2152 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2157 if (o->stream->context->version >= 13) {
2160 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2161 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2165 if (o->stream->direction == PA_STREAM_RECORD)
2166 o->stream->timing_info.configured_source_usec = usec;
2168 o->stream->timing_info.configured_sink_usec = usec;
2171 if (!pa_tagstruct_eof(t)) {
2172 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2178 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2179 cb(o->stream, success, o->userdata);
2183 pa_operation_done(o);
2184 pa_operation_unref(o);
2188 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2194 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2202 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2204 t = pa_tagstruct_command(
2206 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2208 pa_tagstruct_putu32(t, s->channel);
2210 pa_tagstruct_putu32(t, attr->maxlength);
2212 if (s->direction == PA_STREAM_PLAYBACK)
2215 PA_TAG_U32, attr->tlength,
2216 PA_TAG_U32, attr->prebuf,
2217 PA_TAG_U32, attr->minreq,
2220 pa_tagstruct_putu32(t, attr->fragsize);
2222 if (s->context->version >= 13)
2223 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2225 if (s->context->version >= 14)
2226 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2228 pa_pstream_send_tagstruct(s->context->pstream, t);
2229 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2231 /* This might cause changes in the read/write indexex, hence let's
2232 * request a timing update */
2233 request_auto_timing_update(s, TRUE);
2238 uint32_t pa_stream_get_device_index(pa_stream *s) {
2240 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2242 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2243 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2244 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2245 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2246 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2248 return s->device_index;
2251 const char *pa_stream_get_device_name(pa_stream *s) {
2253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2255 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2256 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2257 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2258 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2259 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2261 return s->device_name;
2264 int pa_stream_is_suspended(pa_stream *s) {
2266 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2268 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2269 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2270 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2271 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2273 return s->suspended;
2276 int pa_stream_is_corked(pa_stream *s) {
2278 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2280 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2281 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2282 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2287 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2288 pa_operation *o = userdata;
2293 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2298 if (command != PA_COMMAND_REPLY) {
2299 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2305 if (!pa_tagstruct_eof(t)) {
2306 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2311 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2312 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2315 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2316 cb(o->stream, success, o->userdata);
2320 pa_operation_done(o);
2321 pa_operation_unref(o);
2325 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2335 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2340 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2341 o->private = PA_UINT_TO_PTR(rate);
2343 t = pa_tagstruct_command(
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2347 pa_tagstruct_putu32(t, s->channel);
2348 pa_tagstruct_putu32(t, rate);
2350 pa_pstream_send_tagstruct(s->context->pstream, t);
2351 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2356 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2364 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2366 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2367 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2368 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2370 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2372 t = pa_tagstruct_command(
2374 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2376 pa_tagstruct_putu32(t, s->channel);
2377 pa_tagstruct_putu32(t, (uint32_t) mode);
2378 pa_tagstruct_put_proplist(t, p);
2380 pa_pstream_send_tagstruct(s->context->pstream, t);
2381 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2383 /* Please note that we don't update s->proplist here, because we
2384 * don't export that field */
2389 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2393 const char * const*k;
2396 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2398 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2399 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2400 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2404 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2406 t = pa_tagstruct_command(
2408 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2410 pa_tagstruct_putu32(t, s->channel);
2412 for (k = keys; *k; k++)
2413 pa_tagstruct_puts(t, *k);
2415 pa_tagstruct_puts(t, NULL);
2417 pa_pstream_send_tagstruct(s->context->pstream, t);
2418 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2420 /* Please note that we don't update s->proplist here, because we
2421 * don't export that field */
2426 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2430 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2431 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2432 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2433 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2435 s->direct_on_input = sink_input_idx;
2440 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2444 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2445 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2446 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2448 return s->direct_on_input;