2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
43 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
46 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_MIN_HISTORY (4)
49 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
50 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
53 static void reset_callbacks(pa_stream *s) {
54 s->read_callback = NULL;
55 s->read_userdata = NULL;
56 s->write_callback = NULL;
57 s->write_userdata = NULL;
58 s->state_callback = NULL;
59 s->state_userdata = NULL;
60 s->overflow_callback = NULL;
61 s->overflow_userdata = NULL;
62 s->underflow_callback = NULL;
63 s->underflow_userdata = NULL;
64 s->latency_update_callback = NULL;
65 s->latency_update_userdata = NULL;
66 s->moved_callback = NULL;
67 s->moved_userdata = NULL;
68 s->suspended_callback = NULL;
69 s->suspended_userdata = NULL;
70 s->started_callback = NULL;
71 s->started_userdata = NULL;
72 s->event_callback = NULL;
73 s->event_userdata = NULL;
76 pa_stream *pa_stream_new_with_proplist(
79 const pa_sample_spec *ss,
80 const pa_channel_map *map,
88 pa_assert(PA_REFCNT_VALUE(c) >= 1);
90 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
91 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
92 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
100 s = pa_xnew(pa_stream, 1);
103 s->mainloop = c->mainloop;
105 s->direction = PA_STREAM_NODIRECTION;
106 s->state = PA_STREAM_UNCONNECTED;
109 s->sample_spec = *ss;
110 s->channel_map = *map;
112 s->direct_on_input = PA_INVALID_INDEX;
114 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
116 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
119 s->channel_valid = FALSE;
120 s->syncid = c->csyncid++;
121 s->stream_index = PA_INVALID_INDEX;
123 s->requested_bytes = 0;
124 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
126 /* We initialize der target length here, so that if the user
127 * passes no explicit buffering metrics the default is similar to
128 * what older PA versions provided. */
130 s->buffer_attr.maxlength = (uint32_t) -1;
131 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
132 s->buffer_attr.minreq = (uint32_t) -1;
133 s->buffer_attr.prebuf = (uint32_t) -1;
134 s->buffer_attr.fragsize = (uint32_t) -1;
136 s->device_index = PA_INVALID_INDEX;
137 s->device_name = NULL;
138 s->suspended = FALSE;
140 pa_memchunk_reset(&s->peek_memchunk);
143 s->record_memblockq = NULL;
147 memset(&s->timing_info, 0, sizeof(s->timing_info));
148 s->timing_info_valid = FALSE;
150 s->previous_time = 0;
152 s->read_index_not_before = 0;
153 s->write_index_not_before = 0;
154 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
155 s->write_index_corrections[i].valid = 0;
156 s->current_write_index_correction = 0;
158 s->auto_timing_update_event = NULL;
159 s->auto_timing_update_requested = FALSE;
165 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
166 PA_LLIST_PREPEND(pa_stream, c->streams, s);
172 static void stream_unlink(pa_stream *s) {
179 /* Detach from context */
181 /* Unref all operatio object that point to us */
182 for (o = s->context->operations; o; o = n) {
186 pa_operation_cancel(o);
189 /* Drop all outstanding replies for this stream */
190 if (s->context->pdispatch)
191 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
193 if (s->channel_valid) {
194 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
196 s->channel_valid = FALSE;
199 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
204 if (s->auto_timing_update_event) {
205 pa_assert(s->mainloop);
206 s->mainloop->time_free(s->auto_timing_update_event);
212 static void stream_free(pa_stream *s) {
217 if (s->peek_memchunk.memblock) {
219 pa_memblock_release(s->peek_memchunk.memblock);
220 pa_memblock_unref(s->peek_memchunk.memblock);
223 if (s->record_memblockq)
224 pa_memblockq_free(s->record_memblockq);
227 pa_proplist_free(s->proplist);
230 pa_smoother_free(s->smoother);
232 pa_xfree(s->device_name);
236 void pa_stream_unref(pa_stream *s) {
238 pa_assert(PA_REFCNT_VALUE(s) >= 1);
240 if (PA_REFCNT_DEC(s) <= 0)
244 pa_stream* pa_stream_ref(pa_stream *s) {
246 pa_assert(PA_REFCNT_VALUE(s) >= 1);
252 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
254 pa_assert(PA_REFCNT_VALUE(s) >= 1);
259 pa_context* pa_stream_get_context(pa_stream *s) {
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
266 uint32_t pa_stream_get_index(pa_stream *s) {
268 pa_assert(PA_REFCNT_VALUE(s) >= 1);
270 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
272 return s->stream_index;
275 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
277 pa_assert(PA_REFCNT_VALUE(s) >= 1);
286 if (s->state_callback)
287 s->state_callback(s, s->state_userdata);
289 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
295 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
297 pa_assert(PA_REFCNT_VALUE(s) >= 1);
299 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
302 if (s->state == PA_STREAM_READY &&
303 (force || !s->auto_timing_update_requested)) {
306 /* pa_log("automatically requesting new timing data"); */
308 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
309 pa_operation_unref(o);
310 s->auto_timing_update_requested = TRUE;
314 if (s->auto_timing_update_event) {
316 pa_gettimeofday(&next);
317 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
318 s->mainloop->time_restart(s->auto_timing_update_event, &next);
322 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
323 pa_context *c = userdata;
328 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
331 pa_assert(PA_REFCNT_VALUE(c) >= 1);
335 if (pa_tagstruct_getu32(t, &channel) < 0 ||
336 !pa_tagstruct_eof(t)) {
337 pa_context_fail(c, PA_ERR_PROTOCOL);
341 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
344 if (s->state != PA_STREAM_READY)
347 pa_context_set_error(c, PA_ERR_KILLED);
348 pa_stream_set_state(s, PA_STREAM_FAILED);
354 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
358 pa_assert(!force_start || !force_stop);
363 x = pa_rtclock_usec();
365 if (s->timing_info_valid) {
367 x -= s->timing_info.transport_usec;
369 x += s->timing_info.transport_usec;
371 if (s->direction == PA_STREAM_PLAYBACK)
372 /* it takes a while until the pause/resume is actually
374 x += s->timing_info.sink_usec;
376 /* Data froma while back will be dropped */
377 x -= s->timing_info.source_usec;
380 if (s->suspended || s->corked || force_stop)
381 pa_smoother_pause(s->smoother, x);
382 else if (force_start || s->buffer_attr.prebuf == 0)
383 pa_smoother_resume(s->smoother, x);
385 /* Please note that we have no idea if playback actually started
386 * if prebuf is non-zero! */
389 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
390 pa_context *c = userdata;
397 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
400 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
403 pa_assert(PA_REFCNT_VALUE(c) >= 1);
407 if (c->version < 12) {
408 pa_context_fail(c, PA_ERR_PROTOCOL);
412 if (pa_tagstruct_getu32(t, &channel) < 0 ||
413 pa_tagstruct_getu32(t, &di) < 0 ||
414 pa_tagstruct_gets(t, &dn) < 0 ||
415 pa_tagstruct_get_boolean(t, &suspended) < 0) {
416 pa_context_fail(c, PA_ERR_PROTOCOL);
420 if (c->version >= 13) {
422 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
423 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
424 pa_tagstruct_getu32(t, &fragsize) < 0 ||
425 pa_tagstruct_get_usec(t, &usec) < 0) {
426 pa_context_fail(c, PA_ERR_PROTOCOL);
430 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
431 pa_tagstruct_getu32(t, &tlength) < 0 ||
432 pa_tagstruct_getu32(t, &prebuf) < 0 ||
433 pa_tagstruct_getu32(t, &minreq) < 0 ||
434 pa_tagstruct_get_usec(t, &usec) < 0) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
441 if (!pa_tagstruct_eof(t)) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
446 if (!dn || di == PA_INVALID_INDEX) {
447 pa_context_fail(c, PA_ERR_PROTOCOL);
451 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
454 if (s->state != PA_STREAM_READY)
457 if (c->version >= 13) {
458 if (s->direction == PA_STREAM_RECORD)
459 s->timing_info.configured_source_usec = usec;
461 s->timing_info.configured_sink_usec = usec;
463 s->buffer_attr.maxlength = maxlength;
464 s->buffer_attr.fragsize = fragsize;
465 s->buffer_attr.tlength = tlength;
466 s->buffer_attr.prebuf = prebuf;
467 s->buffer_attr.minreq = minreq;
470 pa_xfree(s->device_name);
471 s->device_name = pa_xstrdup(dn);
472 s->device_index = di;
474 s->suspended = suspended;
476 check_smoother_status(s, TRUE, FALSE, FALSE);
477 request_auto_timing_update(s, TRUE);
479 if (s->moved_callback)
480 s->moved_callback(s, s->moved_userdata);
486 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
487 pa_context *c = userdata;
493 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
496 pa_assert(PA_REFCNT_VALUE(c) >= 1);
500 if (c->version < 12) {
501 pa_context_fail(c, PA_ERR_PROTOCOL);
505 if (pa_tagstruct_getu32(t, &channel) < 0 ||
506 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
507 !pa_tagstruct_eof(t)) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
512 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
515 if (s->state != PA_STREAM_READY)
518 s->suspended = suspended;
520 check_smoother_status(s, TRUE, FALSE, FALSE);
521 request_auto_timing_update(s, TRUE);
523 if (s->suspended_callback)
524 s->suspended_callback(s, s->suspended_userdata);
530 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
531 pa_context *c = userdata;
536 pa_assert(command == PA_COMMAND_STARTED);
539 pa_assert(PA_REFCNT_VALUE(c) >= 1);
543 if (c->version < 13) {
544 pa_context_fail(c, PA_ERR_PROTOCOL);
548 if (pa_tagstruct_getu32(t, &channel) < 0 ||
549 !pa_tagstruct_eof(t)) {
550 pa_context_fail(c, PA_ERR_PROTOCOL);
554 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
557 if (s->state != PA_STREAM_READY)
560 check_smoother_status(s, TRUE, TRUE, FALSE);
561 request_auto_timing_update(s, TRUE);
563 if (s->started_callback)
564 s->started_callback(s, s->started_userdata);
570 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
571 pa_context *c = userdata;
574 pa_proplist *pl = NULL;
578 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
581 pa_assert(PA_REFCNT_VALUE(c) >= 1);
585 if (c->version < 15) {
586 pa_context_fail(c, PA_ERR_PROTOCOL);
590 pl = pa_proplist_new();
592 if (pa_tagstruct_getu32(t, &channel) < 0 ||
593 pa_tagstruct_gets(t, &event) < 0 ||
594 pa_tagstruct_get_proplist(t, pl) < 0 ||
595 !pa_tagstruct_eof(t) || !event) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
600 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
603 if (s->state != PA_STREAM_READY)
606 if (s->event_callback)
607 s->event_callback(s, event, pl, s->event_userdata);
613 pa_proplist_free(pl);
616 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
618 pa_context *c = userdata;
619 uint32_t bytes, channel;
622 pa_assert(command == PA_COMMAND_REQUEST);
625 pa_assert(PA_REFCNT_VALUE(c) >= 1);
629 if (pa_tagstruct_getu32(t, &channel) < 0 ||
630 pa_tagstruct_getu32(t, &bytes) < 0 ||
631 !pa_tagstruct_eof(t)) {
632 pa_context_fail(c, PA_ERR_PROTOCOL);
636 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
639 if (s->state != PA_STREAM_READY)
642 s->requested_bytes += bytes;
644 if (s->requested_bytes > 0 && s->write_callback)
645 s->write_callback(s, s->requested_bytes, s->write_userdata);
651 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
653 pa_context *c = userdata;
657 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
660 pa_assert(PA_REFCNT_VALUE(c) >= 1);
664 if (pa_tagstruct_getu32(t, &channel) < 0 ||
665 !pa_tagstruct_eof(t)) {
666 pa_context_fail(c, PA_ERR_PROTOCOL);
670 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
673 if (s->state != PA_STREAM_READY)
676 if (s->buffer_attr.prebuf > 0)
677 check_smoother_status(s, TRUE, FALSE, TRUE);
679 request_auto_timing_update(s, TRUE);
681 if (command == PA_COMMAND_OVERFLOW) {
682 if (s->overflow_callback)
683 s->overflow_callback(s, s->overflow_userdata);
684 } else if (command == PA_COMMAND_UNDERFLOW) {
685 if (s->underflow_callback)
686 s->underflow_callback(s, s->underflow_userdata);
693 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
695 pa_assert(PA_REFCNT_VALUE(s) >= 1);
697 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
699 if (s->state != PA_STREAM_READY)
703 s->write_index_not_before = s->context->ctag;
705 if (s->timing_info_valid)
706 s->timing_info.write_index_corrupt = TRUE;
708 /* pa_log("write_index invalidated"); */
712 s->read_index_not_before = s->context->ctag;
714 if (s->timing_info_valid)
715 s->timing_info.read_index_corrupt = TRUE;
717 /* pa_log("read_index invalidated"); */
720 request_auto_timing_update(s, TRUE);
723 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
724 pa_stream *s = userdata;
727 pa_assert(PA_REFCNT_VALUE(s) >= 1);
730 request_auto_timing_update(s, FALSE);
734 static void create_stream_complete(pa_stream *s) {
736 pa_assert(PA_REFCNT_VALUE(s) >= 1);
737 pa_assert(s->state == PA_STREAM_CREATING);
739 pa_stream_set_state(s, PA_STREAM_READY);
741 if (s->requested_bytes > 0 && s->write_callback)
742 s->write_callback(s, s->requested_bytes, s->write_userdata);
744 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
746 pa_gettimeofday(&tv);
747 tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
748 pa_assert(!s->auto_timing_update_event);
749 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
751 request_auto_timing_update(s, TRUE);
754 check_smoother_status(s, TRUE, FALSE, FALSE);
757 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
762 if (s->context->version >= 13)
765 /* Version older than 0.9.10 didn't do server side buffer_attr
766 * selection, hence we have to fake it on the client side. */
768 /* We choose fairly conservative values here, to not confuse
769 * old clients with extremely large playback buffers */
771 if (attr->maxlength == (uint32_t) -1)
772 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
774 if (attr->tlength == (uint32_t) -1)
775 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
777 if (attr->minreq == (uint32_t) -1)
778 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
780 if (attr->prebuf == (uint32_t) -1)
781 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
783 if (attr->fragsize == (uint32_t) -1)
784 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
787 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
788 pa_stream *s = userdata;
792 pa_assert(PA_REFCNT_VALUE(s) >= 1);
793 pa_assert(s->state == PA_STREAM_CREATING);
797 if (command != PA_COMMAND_REPLY) {
798 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
801 pa_stream_set_state(s, PA_STREAM_FAILED);
805 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
806 s->channel == PA_INVALID_INDEX ||
807 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
808 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
809 pa_context_fail(s->context, PA_ERR_PROTOCOL);
813 if (s->context->version >= 9) {
814 if (s->direction == PA_STREAM_PLAYBACK) {
815 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
816 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
817 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
818 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
819 pa_context_fail(s->context, PA_ERR_PROTOCOL);
822 } else if (s->direction == PA_STREAM_RECORD) {
823 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
824 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
825 pa_context_fail(s->context, PA_ERR_PROTOCOL);
831 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
834 const char *dn = NULL;
837 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
838 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
839 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
840 pa_tagstruct_gets(t, &dn) < 0 ||
841 pa_tagstruct_get_boolean(t, &suspended) < 0) {
842 pa_context_fail(s->context, PA_ERR_PROTOCOL);
846 if (!dn || s->device_index == PA_INVALID_INDEX ||
847 ss.channels != cm.channels ||
848 !pa_channel_map_valid(&cm) ||
849 !pa_sample_spec_valid(&ss) ||
850 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
851 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
852 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
853 pa_context_fail(s->context, PA_ERR_PROTOCOL);
857 pa_xfree(s->device_name);
858 s->device_name = pa_xstrdup(dn);
859 s->suspended = suspended;
865 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
868 if (pa_tagstruct_get_usec(t, &usec) < 0) {
869 pa_context_fail(s->context, PA_ERR_PROTOCOL);
873 if (s->direction == PA_STREAM_RECORD)
874 s->timing_info.configured_source_usec = usec;
876 s->timing_info.configured_sink_usec = usec;
879 if (!pa_tagstruct_eof(t)) {
880 pa_context_fail(s->context, PA_ERR_PROTOCOL);
884 if (s->direction == PA_STREAM_RECORD) {
885 pa_assert(!s->record_memblockq);
887 s->record_memblockq = pa_memblockq_new(
889 s->buffer_attr.maxlength,
891 pa_frame_size(&s->sample_spec),
898 s->channel_valid = TRUE;
899 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
901 create_stream_complete(s);
907 static int create_stream(
908 pa_stream_direction_t direction,
911 const pa_buffer_attr *attr,
912 pa_stream_flags_t flags,
913 const pa_cvolume *volume,
914 pa_stream *sync_stream) {
918 pa_bool_t volume_set = FALSE;
921 pa_assert(PA_REFCNT_VALUE(s) >= 1);
922 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
924 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
925 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
926 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
927 PA_STREAM_INTERPOLATE_TIMING|
928 PA_STREAM_NOT_MONOTONIC|
929 PA_STREAM_AUTO_TIMING_UPDATE|
930 PA_STREAM_NO_REMAP_CHANNELS|
931 PA_STREAM_NO_REMIX_CHANNELS|
932 PA_STREAM_FIX_FORMAT|
934 PA_STREAM_FIX_CHANNELS|
936 PA_STREAM_VARIABLE_RATE|
937 PA_STREAM_PEAK_DETECT|
938 PA_STREAM_START_MUTED|
939 PA_STREAM_ADJUST_LATENCY|
940 PA_STREAM_EARLY_REQUESTS|
941 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
942 PA_STREAM_START_UNMUTED|
943 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
945 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
946 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
947 /* Althought some of the other flags are not supported on older
948 * version, we don't check for them here, because it doesn't hurt
949 * when they are passed but actually not supported. This makes
950 * client development easier */
952 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
953 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
954 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
955 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
956 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
960 s->direction = direction;
962 s->corked = !!(flags & PA_STREAM_START_CORKED);
965 s->syncid = sync_stream->syncid;
968 s->buffer_attr = *attr;
969 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
971 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
975 pa_smoother_free(s->smoother);
977 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
979 x = pa_rtclock_usec();
980 pa_smoother_set_time_offset(s->smoother, x);
981 pa_smoother_pause(s->smoother, x);
985 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
987 t = pa_tagstruct_command(
989 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
992 if (s->context->version < 13)
993 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
997 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
998 PA_TAG_CHANNEL_MAP, &s->channel_map,
999 PA_TAG_U32, PA_INVALID_INDEX,
1001 PA_TAG_U32, s->buffer_attr.maxlength,
1002 PA_TAG_BOOLEAN, s->corked,
1005 if (s->direction == PA_STREAM_PLAYBACK) {
1010 PA_TAG_U32, s->buffer_attr.tlength,
1011 PA_TAG_U32, s->buffer_attr.prebuf,
1012 PA_TAG_U32, s->buffer_attr.minreq,
1013 PA_TAG_U32, s->syncid,
1016 volume_set = !!volume;
1019 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1021 pa_tagstruct_put_cvolume(t, volume);
1023 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1025 if (s->context->version >= 12) {
1028 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1029 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1030 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1031 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1032 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1033 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1034 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1038 if (s->context->version >= 13) {
1040 if (s->direction == PA_STREAM_PLAYBACK)
1041 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1043 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1047 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1048 PA_TAG_PROPLIST, s->proplist,
1051 if (s->direction == PA_STREAM_RECORD)
1052 pa_tagstruct_putu32(t, s->direct_on_input);
1055 if (s->context->version >= 14) {
1057 if (s->direction == PA_STREAM_PLAYBACK)
1058 pa_tagstruct_put_boolean(t, volume_set);
1060 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1063 if (s->context->version >= 15) {
1065 if (s->direction == PA_STREAM_PLAYBACK)
1066 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1068 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1069 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1072 pa_pstream_send_tagstruct(s->context->pstream, t);
1073 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1075 pa_stream_set_state(s, PA_STREAM_CREATING);
1081 int pa_stream_connect_playback(
1084 const pa_buffer_attr *attr,
1085 pa_stream_flags_t flags,
1087 pa_stream *sync_stream) {
1090 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1092 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1095 int pa_stream_connect_record(
1098 const pa_buffer_attr *attr,
1099 pa_stream_flags_t flags) {
1102 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1104 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1107 int pa_stream_write(
1111 void (*free_cb)(void *p),
1113 pa_seek_mode_t seek) {
1116 pa_seek_mode_t t_seek;
1122 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1125 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1126 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1127 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1128 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1138 while (t_length > 0) {
1142 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1143 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1144 chunk.length = t_length;
1148 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1149 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1151 d = pa_memblock_acquire(chunk.memblock);
1152 memcpy(d, t_data, chunk.length);
1153 pa_memblock_release(chunk.memblock);
1156 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1159 t_seek = PA_SEEK_RELATIVE;
1161 t_data = (const uint8_t*) t_data + chunk.length;
1162 t_length -= chunk.length;
1164 pa_memblock_unref(chunk.memblock);
1167 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1168 free_cb((void*) data);
1170 if (length < s->requested_bytes)
1171 s->requested_bytes -= (uint32_t) length;
1173 s->requested_bytes = 0;
1175 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1177 if (s->direction == PA_STREAM_PLAYBACK) {
1179 /* Update latency request correction */
1180 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1182 if (seek == PA_SEEK_ABSOLUTE) {
1183 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1184 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1185 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1186 } else if (seek == PA_SEEK_RELATIVE) {
1187 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1188 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1190 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1193 /* Update the write index in the already available latency data */
1194 if (s->timing_info_valid) {
1196 if (seek == PA_SEEK_ABSOLUTE) {
1197 s->timing_info.write_index_corrupt = FALSE;
1198 s->timing_info.write_index = offset + (int64_t) length;
1199 } else if (seek == PA_SEEK_RELATIVE) {
1200 if (!s->timing_info.write_index_corrupt)
1201 s->timing_info.write_index += offset + (int64_t) length;
1203 s->timing_info.write_index_corrupt = TRUE;
1206 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1207 request_auto_timing_update(s, TRUE);
1213 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1215 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1219 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1220 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1222 if (!s->peek_memchunk.memblock) {
1224 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1230 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1233 pa_assert(s->peek_data);
1234 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1235 *length = s->peek_memchunk.length;
1239 int pa_stream_drop(pa_stream *s) {
1241 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1243 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1244 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1245 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1247 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1249 /* Fix the simulated local read index */
1250 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1251 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1253 pa_assert(s->peek_data);
1254 pa_memblock_release(s->peek_memchunk.memblock);
1255 pa_memblock_unref(s->peek_memchunk.memblock);
1256 pa_memchunk_reset(&s->peek_memchunk);
1261 size_t pa_stream_writable_size(pa_stream *s) {
1263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1265 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1266 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1268 return s->requested_bytes;
1271 size_t pa_stream_readable_size(pa_stream *s) {
1273 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1275 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1276 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1278 return pa_memblockq_get_length(s->record_memblockq);
1281 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1292 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1294 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1295 pa_tagstruct_putu32(t, s->channel);
1296 pa_pstream_send_tagstruct(s->context->pstream, t);
1297 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1302 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1306 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1307 pa_assert(s->state == PA_STREAM_READY);
1308 pa_assert(s->direction != PA_STREAM_UPLOAD);
1309 pa_assert(s->timing_info_valid);
1310 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1311 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1313 if (s->direction == PA_STREAM_PLAYBACK) {
1314 /* The last byte that was written into the output device
1315 * had this time value associated */
1316 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1318 if (!s->corked && !s->suspended) {
1320 if (!ignore_transport)
1321 /* Because the latency info took a little time to come
1322 * to us, we assume that the real output time is actually
1324 usec += s->timing_info.transport_usec;
1326 /* However, the output device usually maintains a buffer
1327 too, hence the real sample currently played is a little
1329 if (s->timing_info.sink_usec >= usec)
1332 usec -= s->timing_info.sink_usec;
1336 pa_assert(s->direction == PA_STREAM_RECORD);
1338 /* The last byte written into the server side queue had
1339 * this time value associated */
1340 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1342 if (!s->corked && !s->suspended) {
1344 if (!ignore_transport)
1345 /* Add transport latency */
1346 usec += s->timing_info.transport_usec;
1348 /* Add latency of data in device buffer */
1349 usec += s->timing_info.source_usec;
1351 /* If this is a monitor source, we need to correct the
1352 * time by the playback device buffer */
1353 if (s->timing_info.sink_usec >= usec)
1356 usec -= s->timing_info.sink_usec;
1363 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1364 pa_operation *o = userdata;
1365 struct timeval local, remote, now;
1367 pa_bool_t playing = FALSE;
1368 uint64_t underrun_for = 0, playing_for = 0;
1372 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1374 if (!o->context || !o->stream)
1377 i = &o->stream->timing_info;
1379 o->stream->timing_info_valid = FALSE;
1380 i->write_index_corrupt = TRUE;
1381 i->read_index_corrupt = TRUE;
1383 if (command != PA_COMMAND_REPLY) {
1384 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1389 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1390 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1391 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1392 pa_tagstruct_get_timeval(t, &local) < 0 ||
1393 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1394 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1395 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1397 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1401 if (o->context->version >= 13 &&
1402 o->stream->direction == PA_STREAM_PLAYBACK)
1403 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1404 pa_tagstruct_getu64(t, &playing_for) < 0) {
1406 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1411 if (!pa_tagstruct_eof(t)) {
1412 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1415 o->stream->timing_info_valid = TRUE;
1416 i->write_index_corrupt = FALSE;
1417 i->read_index_corrupt = FALSE;
1419 i->playing = (int) playing;
1420 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1422 pa_gettimeofday(&now);
1424 /* Calculcate timestamps */
1425 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1426 /* local and remote seem to have synchronized clocks */
1428 if (o->stream->direction == PA_STREAM_PLAYBACK)
1429 i->transport_usec = pa_timeval_diff(&remote, &local);
1431 i->transport_usec = pa_timeval_diff(&now, &remote);
1433 i->synchronized_clocks = TRUE;
1434 i->timestamp = remote;
1436 /* clocks are not synchronized, let's estimate latency then */
1437 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1438 i->synchronized_clocks = FALSE;
1439 i->timestamp = local;
1440 pa_timeval_add(&i->timestamp, i->transport_usec);
1443 /* Invalidate read and write indexes if necessary */
1444 if (tag < o->stream->read_index_not_before)
1445 i->read_index_corrupt = TRUE;
1447 if (tag < o->stream->write_index_not_before)
1448 i->write_index_corrupt = TRUE;
1450 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1451 /* Write index correction */
1454 uint32_t ctag = tag;
1456 /* Go through the saved correction values and add up the
1457 * total correction.*/
1458 for (n = 0, j = o->stream->current_write_index_correction+1;
1459 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1460 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1462 /* Step over invalid data or out-of-date data */
1463 if (!o->stream->write_index_corrections[j].valid ||
1464 o->stream->write_index_corrections[j].tag < ctag)
1467 /* Make sure that everything is in order */
1468 ctag = o->stream->write_index_corrections[j].tag+1;
1470 /* Now fix the write index */
1471 if (o->stream->write_index_corrections[j].corrupt) {
1472 /* A corrupting seek was made */
1473 i->write_index_corrupt = TRUE;
1474 } else if (o->stream->write_index_corrections[j].absolute) {
1475 /* An absolute seek was made */
1476 i->write_index = o->stream->write_index_corrections[j].value;
1477 i->write_index_corrupt = FALSE;
1478 } else if (!i->write_index_corrupt) {
1479 /* A relative seek was made */
1480 i->write_index += o->stream->write_index_corrections[j].value;
1484 /* Clear old correction entries */
1485 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1486 if (!o->stream->write_index_corrections[n].valid)
1489 if (o->stream->write_index_corrections[n].tag <= tag)
1490 o->stream->write_index_corrections[n].valid = FALSE;
1494 if (o->stream->direction == PA_STREAM_RECORD) {
1495 /* Read index correction */
1497 if (!i->read_index_corrupt)
1498 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1501 /* Update smoother */
1502 if (o->stream->smoother) {
1505 u = x = pa_rtclock_usec() - i->transport_usec;
1507 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1510 /* If we weren't playing then it will take some time
1511 * until the audio will actually come out through the
1512 * speakers. Since we follow that timing here, we need
1513 * to try to fix this up */
1515 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1517 if (su < i->sink_usec)
1518 x += i->sink_usec - su;
1522 pa_smoother_pause(o->stream->smoother, x);
1524 /* Update the smoother */
1525 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1526 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1527 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1530 pa_smoother_resume(o->stream->smoother, x);
1534 o->stream->auto_timing_update_requested = FALSE;
1536 if (o->stream->latency_update_callback)
1537 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1539 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1540 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1541 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1546 pa_operation_done(o);
1547 pa_operation_unref(o);
1550 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1558 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1560 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1561 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1563 if (s->direction == PA_STREAM_PLAYBACK) {
1564 /* Find a place to store the write_index correction data for this entry */
1565 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1567 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1570 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1572 t = pa_tagstruct_command(
1574 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1576 pa_tagstruct_putu32(t, s->channel);
1577 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1579 pa_pstream_send_tagstruct(s->context->pstream, t);
1580 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1582 if (s->direction == PA_STREAM_PLAYBACK) {
1583 /* Fill in initial correction data */
1585 s->current_write_index_correction = cidx;
1587 s->write_index_corrections[cidx].valid = TRUE;
1588 s->write_index_corrections[cidx].absolute = FALSE;
1589 s->write_index_corrections[cidx].corrupt = FALSE;
1590 s->write_index_corrections[cidx].tag = tag;
1591 s->write_index_corrections[cidx].value = 0;
1597 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1598 pa_stream *s = userdata;
1602 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1606 if (command != PA_COMMAND_REPLY) {
1607 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1610 pa_stream_set_state(s, PA_STREAM_FAILED);
1612 } else if (!pa_tagstruct_eof(t)) {
1613 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1617 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1623 int pa_stream_disconnect(pa_stream *s) {
1628 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1630 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1631 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1635 t = pa_tagstruct_command(
1637 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1638 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1640 pa_tagstruct_putu32(t, s->channel);
1641 pa_pstream_send_tagstruct(s->context->pstream, t);
1642 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1648 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1650 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1652 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1655 s->read_callback = cb;
1656 s->read_userdata = userdata;
1659 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1661 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1666 s->write_callback = cb;
1667 s->write_userdata = userdata;
1670 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1672 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1674 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1677 s->state_callback = cb;
1678 s->state_userdata = userdata;
1681 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1683 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1685 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1688 s->overflow_callback = cb;
1689 s->overflow_userdata = userdata;
1692 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1694 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1696 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1699 s->underflow_callback = cb;
1700 s->underflow_userdata = userdata;
1703 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1705 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1707 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1710 s->latency_update_callback = cb;
1711 s->latency_update_userdata = userdata;
1714 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1718 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1721 s->moved_callback = cb;
1722 s->moved_userdata = userdata;
1725 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1727 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1729 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1732 s->suspended_callback = cb;
1733 s->suspended_userdata = userdata;
1736 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1738 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1740 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1743 s->started_callback = cb;
1744 s->started_userdata = userdata;
1747 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1749 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1751 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1754 s->event_callback = cb;
1755 s->event_userdata = userdata;
1758 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1759 pa_operation *o = userdata;
1764 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1769 if (command != PA_COMMAND_REPLY) {
1770 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1774 } else if (!pa_tagstruct_eof(t)) {
1775 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1780 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1781 cb(o->stream, success, o->userdata);
1785 pa_operation_done(o);
1786 pa_operation_unref(o);
1789 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1798 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1802 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1804 t = pa_tagstruct_command(
1806 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1808 pa_tagstruct_putu32(t, s->channel);
1809 pa_tagstruct_put_boolean(t, !!b);
1810 pa_pstream_send_tagstruct(s->context->pstream, t);
1811 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1813 check_smoother_status(s, FALSE, FALSE, FALSE);
1815 /* This might cause the indexes to hang/start again, hence
1816 * let's request a timing update */
1817 request_auto_timing_update(s, TRUE);
1822 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1828 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1830 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1832 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1834 t = pa_tagstruct_command(s->context, command, &tag);
1835 pa_tagstruct_putu32(t, s->channel);
1836 pa_pstream_send_tagstruct(s->context->pstream, t);
1837 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1842 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1846 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1848 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1849 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1851 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1854 if (s->direction == PA_STREAM_PLAYBACK) {
1856 if (s->write_index_corrections[s->current_write_index_correction].valid)
1857 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1859 if (s->buffer_attr.prebuf > 0)
1860 check_smoother_status(s, FALSE, FALSE, TRUE);
1862 /* This will change the write index, but leave the
1863 * read index untouched. */
1864 invalidate_indexes(s, FALSE, TRUE);
1867 /* For record streams this has no influence on the write
1868 * index, but the read index might jump. */
1869 invalidate_indexes(s, TRUE, FALSE);
1874 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1878 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1880 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1881 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1882 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1884 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1887 /* This might cause the read index to hang again, hence
1888 * let's request a timing update */
1889 request_auto_timing_update(s, TRUE);
1894 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1898 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1900 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1901 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1902 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1904 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1907 /* This might cause the read index to start moving again, hence
1908 * let's request a timing update */
1909 request_auto_timing_update(s, TRUE);
1914 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1924 if (s->context->version >= 13) {
1925 pa_proplist *p = pa_proplist_new();
1927 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1928 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1929 pa_proplist_free(p);
1934 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1935 t = pa_tagstruct_command(
1937 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1939 pa_tagstruct_putu32(t, s->channel);
1940 pa_tagstruct_puts(t, name);
1941 pa_pstream_send_tagstruct(s->context->pstream, t);
1942 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1948 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1952 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1954 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1955 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1956 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
1957 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
1958 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
1961 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
1963 usec = calc_time(s, FALSE);
1965 /* Make sure the time runs monotonically */
1966 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
1967 if (usec < s->previous_time)
1968 usec = s->previous_time;
1970 s->previous_time = usec;
1979 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
1981 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1989 if (negative && s->direction == PA_STREAM_RECORD) {
1997 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2003 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2006 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2007 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2008 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2009 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2010 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2012 if ((r = pa_stream_get_time(s, &t)) < 0)
2015 if (s->direction == PA_STREAM_PLAYBACK)
2016 cindex = s->timing_info.write_index;
2018 cindex = s->timing_info.read_index;
2023 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2025 if (s->direction == PA_STREAM_PLAYBACK)
2026 *r_usec = time_counter_diff(s, c, t, negative);
2028 *r_usec = time_counter_diff(s, t, c, negative);
2033 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2035 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2038 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2039 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2041 return &s->timing_info;
2044 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2046 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2048 return &s->sample_spec;
2051 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2053 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2055 return &s->channel_map;
2058 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2060 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2063 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2064 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2066 return &s->buffer_attr;
2069 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2070 pa_operation *o = userdata;
2075 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2080 if (command != PA_COMMAND_REPLY) {
2081 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2086 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2087 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2088 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2089 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2090 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2091 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2094 } else if (o->stream->direction == PA_STREAM_RECORD) {
2095 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2096 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2097 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2102 if (o->stream->context->version >= 13) {
2105 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2106 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2110 if (o->stream->direction == PA_STREAM_RECORD)
2111 o->stream->timing_info.configured_source_usec = usec;
2113 o->stream->timing_info.configured_sink_usec = usec;
2116 if (!pa_tagstruct_eof(t)) {
2117 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2123 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2124 cb(o->stream, success, o->userdata);
2128 pa_operation_done(o);
2129 pa_operation_unref(o);
2133 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2142 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2144 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2146 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2148 t = pa_tagstruct_command(
2150 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2152 pa_tagstruct_putu32(t, s->channel);
2154 pa_tagstruct_putu32(t, attr->maxlength);
2156 if (s->direction == PA_STREAM_PLAYBACK)
2159 PA_TAG_U32, attr->tlength,
2160 PA_TAG_U32, attr->prebuf,
2161 PA_TAG_U32, attr->minreq,
2164 pa_tagstruct_putu32(t, attr->fragsize);
2166 if (s->context->version >= 13)
2167 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2169 if (s->context->version >= 14)
2170 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2172 pa_pstream_send_tagstruct(s->context->pstream, t);
2173 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2175 /* This might cause changes in the read/write indexex, hence let's
2176 * request a timing update */
2177 request_auto_timing_update(s, TRUE);
2182 uint32_t pa_stream_get_device_index(pa_stream *s) {
2184 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2186 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2187 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2188 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2189 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2191 return s->device_index;
2194 const char *pa_stream_get_device_name(pa_stream *s) {
2196 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2201 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2203 return s->device_name;
2206 int pa_stream_is_suspended(pa_stream *s) {
2208 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2210 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2211 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2212 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2214 return s->suspended;
2217 int pa_stream_is_corked(pa_stream *s) {
2219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2221 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2222 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2227 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2228 pa_operation *o = userdata;
2233 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2238 if (command != PA_COMMAND_REPLY) {
2239 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2245 if (!pa_tagstruct_eof(t)) {
2246 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2251 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2252 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2255 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2256 cb(o->stream, success, o->userdata);
2260 pa_operation_done(o);
2261 pa_operation_unref(o);
2265 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2271 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2273 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2274 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2275 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2276 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2277 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2279 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2280 o->private = PA_UINT_TO_PTR(rate);
2282 t = pa_tagstruct_command(
2284 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2286 pa_tagstruct_putu32(t, s->channel);
2287 pa_tagstruct_putu32(t, rate);
2289 pa_pstream_send_tagstruct(s->context->pstream, t);
2290 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2295 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2301 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2304 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2305 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2306 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2308 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2310 t = pa_tagstruct_command(
2312 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2314 pa_tagstruct_putu32(t, s->channel);
2315 pa_tagstruct_putu32(t, (uint32_t) mode);
2316 pa_tagstruct_put_proplist(t, p);
2318 pa_pstream_send_tagstruct(s->context->pstream, t);
2319 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2321 /* Please note that we don't update s->proplist here, because we
2322 * don't export that field */
2327 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2331 const char * const*k;
2334 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2341 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2343 t = pa_tagstruct_command(
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2347 pa_tagstruct_putu32(t, s->channel);
2349 for (k = keys; *k; k++)
2350 pa_tagstruct_puts(t, *k);
2352 pa_tagstruct_puts(t, NULL);
2354 pa_pstream_send_tagstruct(s->context->pstream, t);
2355 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2357 /* Please note that we don't update s->proplist here, because we
2358 * don't export that field */
2363 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2365 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2367 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2368 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2369 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2371 s->direct_on_input = sink_input_idx;
2376 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2381 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2383 return s->direct_on_input;