#include <pulse/def.h>
#include <pulse/timeval.h>
+#include <pulse/rtclock.h>
#include <pulse/xmalloc.h>
+#include <pulse/fork-detect.h>
#include <pulsecore/pstream-util.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/macro.h>
-#include <pulsecore/rtclock.h>
+#include <pulsecore/core-rtclock.h>
+#include <pulsecore/core-util.h>
-#include "fork-detect.h"
#include "internal.h"
+#include "stream.h"
-#define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
+#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
+#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
s->started_userdata = NULL;
s->event_callback = NULL;
s->event_userdata = NULL;
+ s->buffer_attr_callback = NULL;
+ s->buffer_attr_userdata = NULL;
}
-pa_stream *pa_stream_new_with_proplist(
+static pa_stream *pa_stream_new_with_proplist_internal(
pa_context *c,
const char *name,
const pa_sample_spec *ss,
const pa_channel_map *map,
+ pa_format_info * const *formats,
pa_proplist *p) {
pa_stream *s;
int i;
- pa_channel_map tmap;
pa_assert(c);
pa_assert(PA_REFCNT_VALUE(c) >= 1);
+ pa_assert((ss == NULL && map == NULL) || formats == NULL);
PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
- PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
- PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
- PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
- PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
- PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
- if (!map)
- PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
-
s = pa_xnew(pa_stream, 1);
PA_REFCNT_INIT(s);
s->context = c;
s->state = PA_STREAM_UNCONNECTED;
s->flags = 0;
- s->sample_spec = *ss;
- s->channel_map = *map;
+ if (ss)
+ s->sample_spec = *ss;
+ else
+ s->sample_spec.format = PA_SAMPLE_INVALID;
+
+ if (map)
+ s->channel_map = *map;
+ else
+ pa_channel_map_init(&s->channel_map);
+
+ s->n_formats = 0;
+ if (formats) {
+ for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
+ s->n_formats++;
+ s->req_formats[i] = pa_format_info_copy(formats[i]);
+ }
+ /* Make sure the input array was NULL-terminated */
+ pa_assert(formats[i] == NULL);
+ }
+
+ /* We'll get the final negotiated format after connecting */
+ s->format = NULL;
s->direct_on_input = PA_INVALID_INDEX;
* what older PA versions provided. */
s->buffer_attr.maxlength = (uint32_t) -1;
- s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
+ if (ss)
+ s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
+ else {
+ /* FIXME: We assume a worst-case compressed format corresponding to
+ * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
+ pa_sample_spec tmp_ss = {
+ .format = PA_SAMPLE_S16NE,
+ .rate = 48000,
+ .channels = 2,
+ };
+ s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
+ }
s->buffer_attr.minreq = (uint32_t) -1;
s->buffer_attr.prebuf = (uint32_t) -1;
s->buffer_attr.fragsize = (uint32_t) -1;
s->suspended = FALSE;
s->corked = FALSE;
+ s->write_memblock = NULL;
+ s->write_data = NULL;
+
pa_memchunk_reset(&s->peek_memchunk);
s->peek_data = NULL;
-
s->record_memblockq = NULL;
-
memset(&s->timing_info, 0, sizeof(s->timing_info));
s->timing_info_valid = FALSE;
s->auto_timing_update_event = NULL;
s->auto_timing_update_requested = FALSE;
+ s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
reset_callbacks(s);
return s;
}
+pa_stream *pa_stream_new_with_proplist(
+ pa_context *c,
+ const char *name,
+ const pa_sample_spec *ss,
+ const pa_channel_map *map,
+ pa_proplist *p) {
+
+ pa_channel_map tmap;
+
+ PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
+
+ if (!map)
+ PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
+
+ return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
+}
+
+pa_stream *pa_stream_new_extended(
+ pa_context *c,
+ const char *name,
+ pa_format_info * const *formats,
+ pa_proplist *p) {
+
+ PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
+
+ return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
+}
+
static void stream_unlink(pa_stream *s) {
pa_operation *o, *n;
pa_assert(s);
pa_pdispatch_unregister_reply(s->context->pdispatch, s);
if (s->channel_valid) {
- pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
+ pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
s->channel = 0;
s->channel_valid = FALSE;
}
}
static void stream_free(pa_stream *s) {
+ unsigned int i;
+
pa_assert(s);
stream_unlink(s);
+ if (s->write_memblock) {
+ pa_memblock_release(s->write_memblock);
+ pa_memblock_unref(s->write_data);
+ }
+
if (s->peek_memchunk.memblock) {
if (s->peek_data)
pa_memblock_release(s->peek_memchunk.memblock);
if (s->smoother)
pa_smoother_free(s->smoother);
+ for (i = 0; i < s->n_formats; i++)
+ pa_xfree(s->req_formats[i]);
+
pa_xfree(s->device_name);
pa_xfree(s);
}
(force || !s->auto_timing_update_requested)) {
pa_operation *o;
-/* pa_log("automatically requesting new timing data"); */
+/* pa_log("Automatically requesting new timing data"); */
if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
pa_operation_unref(o);
}
if (s->auto_timing_update_event) {
- struct timeval next;
- pa_gettimeofday(&next);
- pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
- s->mainloop->time_restart(s->auto_timing_update_event, &next);
+ if (s->suspended && !force) {
+ pa_assert(s->mainloop);
+ s->mainloop->time_free(s->auto_timing_update_event);
+ s->auto_timing_update_event = NULL;
+ } else {
+ if (force)
+ s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
+
+ pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
+
+ s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
+ }
}
}
goto finish;
}
- if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
+ if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
if (!s->smoother)
return;
- x = pa_rtclock_usec();
+ x = pa_rtclock_now();
if (s->timing_info_valid) {
if (aposteriori)
x -= s->timing_info.transport_usec;
else
x += s->timing_info.transport_usec;
-
- if (s->direction == PA_STREAM_PLAYBACK)
- /* it takes a while until the pause/resume is actually
- * audible */
- x += s->timing_info.sink_usec;
- else
- /* Data froma while back will be dropped */
- x -= s->timing_info.source_usec;
}
if (s->suspended || s->corked || force_stop)
pa_smoother_pause(s->smoother, x);
- else if (force_start || s->buffer_attr.prebuf == 0)
- pa_smoother_resume(s->smoother, x);
+ else if (force_start || s->buffer_attr.prebuf == 0) {
+
+ if (!s->timing_info_valid &&
+ !aposteriori &&
+ !force_start &&
+ !force_stop &&
+ s->context->version >= 13) {
+
+ /* If the server supports STARTED events we take them as
+ * indications when audio really starts/stops playing, if
+ * we don't have any timing info yet -- instead of trying
+ * to be smart and guessing the server time. Otherwise the
+ * unknown transport delay add too much noise to our time
+ * calculations. */
+
+ return;
+ }
+
+ pa_smoother_resume(s->smoother, x, TRUE);
+ }
/* Please note that we have no idea if playback actually started
* if prebuf is non-zero! */
}
+static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
+
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_context *c = userdata;
pa_stream *s;
const char *dn;
pa_bool_t suspended;
uint32_t di;
- pa_usec_t usec;
+ pa_usec_t usec = 0;
uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
pa_assert(pd);
goto finish;
}
- if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
+ if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
s->suspended = suspended;
+ if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
+ s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
+ s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
+ request_auto_timing_update(s, TRUE);
+ }
+
check_smoother_status(s, TRUE, FALSE, FALSE);
request_auto_timing_update(s, TRUE);
pa_context_unref(c);
}
+void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+ pa_context *c = userdata;
+ pa_stream *s;
+ uint32_t channel;
+ pa_usec_t usec = 0;
+ uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
+
+ pa_assert(pd);
+ pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
+ pa_assert(t);
+ pa_assert(c);
+ pa_assert(PA_REFCNT_VALUE(c) >= 1);
+
+ pa_context_ref(c);
+
+ if (c->version < 15) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+
+ if (pa_tagstruct_getu32(t, &channel) < 0) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+
+ if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
+ if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
+ pa_tagstruct_getu32(t, &fragsize) < 0 ||
+ pa_tagstruct_get_usec(t, &usec) < 0) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+ } else {
+ if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
+ pa_tagstruct_getu32(t, &tlength) < 0 ||
+ pa_tagstruct_getu32(t, &prebuf) < 0 ||
+ pa_tagstruct_getu32(t, &minreq) < 0 ||
+ pa_tagstruct_get_usec(t, &usec) < 0) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+ }
+
+ if (!pa_tagstruct_eof(t)) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+
+ if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
+ goto finish;
+
+ if (s->state != PA_STREAM_READY)
+ goto finish;
+
+ if (s->direction == PA_STREAM_RECORD)
+ s->timing_info.configured_source_usec = usec;
+ else
+ s->timing_info.configured_sink_usec = usec;
+
+ s->buffer_attr.maxlength = maxlength;
+ s->buffer_attr.fragsize = fragsize;
+ s->buffer_attr.tlength = tlength;
+ s->buffer_attr.prebuf = prebuf;
+ s->buffer_attr.minreq = minreq;
+
+ request_auto_timing_update(s, TRUE);
+
+ if (s->buffer_attr_callback)
+ s->buffer_attr_callback(s, s->buffer_attr_userdata);
+
+finish:
+ pa_context_unref(c);
+}
+
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_context *c = userdata;
pa_stream *s;
goto finish;
}
- if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
+ if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
s->suspended = suspended;
+ if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
+ s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
+ s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
+ request_auto_timing_update(s, TRUE);
+ }
+
check_smoother_status(s, TRUE, FALSE, FALSE);
request_auto_timing_update(s, TRUE);
goto finish;
}
- if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
goto finish;
}
- if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
+ if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
goto finish;
+ if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
+ /* Let client know what the running time was when the stream had to be
+ * killed */
+ pa_usec_t time;
+ if (pa_stream_get_time(s, &time) == 0)
+ pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
+ }
+
if (s->event_callback)
s->event_callback(s, event, pl, s->event_userdata);
goto finish;
}
- if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
s->requested_bytes += bytes;
+ /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
+
if (s->requested_bytes > 0 && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
finish:
pa_context_unref(c);
goto finish;
}
- if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
goto finish;
if (s->state != PA_STREAM_READY)
s->underflow_callback(s, s->underflow_userdata);
}
- finish:
+finish:
pa_context_unref(c);
}
request_auto_timing_update(s, TRUE);
}
-static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
+static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
pa_stream *s = userdata;
pa_assert(s);
pa_stream_set_state(s, PA_STREAM_READY);
if (s->requested_bytes > 0 && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
- struct timeval tv;
- pa_gettimeofday(&tv);
- tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
+ s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
pa_assert(!s->auto_timing_update_event);
- s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
+ s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
request_auto_timing_update(s, TRUE);
}
check_smoother_status(s, TRUE, FALSE, FALSE);
}
-static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
+static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
+ const char *e;
+
pa_assert(s);
pa_assert(attr);
- pa_assert(ss);
+
+ if ((e = getenv("PULSE_LATENCY_MSEC"))) {
+ uint32_t ms;
+
+ if (pa_atou(e, &ms) < 0 || ms <= 0)
+ pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
+ else {
+ attr->maxlength = (uint32_t) -1;
+ attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
+ attr->minreq = (uint32_t) -1;
+ attr->prebuf = (uint32_t) -1;
+ attr->fragsize = attr->tlength;
+ }
+
+ if (flags)
+ *flags |= PA_STREAM_ADJUST_LATENCY;
+ }
if (s->context->version >= 13)
return;
attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
if (attr->tlength == (uint32_t) -1)
- attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
+ attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
if (attr->minreq == (uint32_t) -1)
attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_stream *s = userdata;
+ uint32_t requested_bytes = 0;
pa_assert(pd);
pa_assert(s);
if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
s->channel == PA_INVALID_INDEX ||
- ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
- ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
+ ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
+ ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
+ s->requested_bytes = (int64_t) requested_bytes;
+
if (s->context->version >= 9) {
if (s->direction == PA_STREAM_PLAYBACK) {
if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
ss.channels != cm.channels ||
!pa_channel_map_valid(&cm) ||
!pa_sample_spec_valid(&ss) ||
- (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
- (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
- (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
+ (s->n_formats == 0 && (
+ (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
+ (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
+ (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
s->timing_info.configured_sink_usec = usec;
}
+ if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
+ pa_format_info *f = pa_format_info_new();
+ pa_tagstruct_get_format_info(t, f);
+
+ if (pa_format_info_valid(f))
+ s->format = f;
+ else {
+ pa_format_info_free(f);
+ if (s->n_formats > 0) {
+ /* We used the extended API, so we should have got back a proper format */
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
+ goto finish;
+ }
+ }
+ }
+
if (!pa_tagstruct_eof(t)) {
pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
s->channel_valid = TRUE;
- pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
+ pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
create_stream_complete(s);
pa_tagstruct *t;
uint32_t tag;
pa_bool_t volume_set = FALSE;
+ uint32_t i;
pa_assert(s);
pa_assert(PA_REFCNT_VALUE(s) >= 1);
PA_STREAM_EARLY_REQUESTS|
PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
PA_STREAM_START_UNMUTED|
- PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
+ PA_STREAM_FAIL_ON_SUSPEND|
+ PA_STREAM_RELATIVE_VOLUME|
+ PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
+
PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
- PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
pa_stream_ref(s);
s->direction = direction;
- s->flags = flags;
- s->corked = !!(flags & PA_STREAM_START_CORKED);
if (sync_stream)
s->syncid = sync_stream->syncid;
if (attr)
s->buffer_attr = *attr;
- automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
+ patch_buffer_attr(s, &s->buffer_attr, &flags);
+
+ s->flags = flags;
+ s->corked = !!(flags & PA_STREAM_START_CORKED);
if (flags & PA_STREAM_INTERPOLATE_TIMING) {
pa_usec_t x;
- if (s->smoother)
- pa_smoother_free(s->smoother);
-
- s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
-
- x = pa_rtclock_usec();
- pa_smoother_set_time_offset(s->smoother, x);
- pa_smoother_pause(s->smoother, x);
+ x = pa_rtclock_now();
+
+ pa_assert(!s->smoother);
+ s->smoother = pa_smoother_new(
+ SMOOTHER_ADJUST_TIME,
+ SMOOTHER_HISTORY_TIME,
+ !(flags & PA_STREAM_NOT_MONOTONIC),
+ TRUE,
+ SMOOTHER_MIN_HISTORY,
+ x,
+ TRUE);
}
if (!dev)
volume_set = !!volume;
- if (!volume)
- volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
+ if (!volume) {
+ if (pa_sample_spec_valid(&s->sample_spec))
+ volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
+ else {
+ /* This is not really relevant, since no volume was set, and
+ * the real number of channels is embedded in the format_info
+ * structure */
+ volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
+ }
+ }
pa_tagstruct_put_cvolume(t, volume);
} else
pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
}
+ if (s->context->version >= 17) {
+
+ if (s->direction == PA_STREAM_PLAYBACK)
+ pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
+
+ }
+
+ if (s->context->version >= 18) {
+
+ if (s->direction == PA_STREAM_PLAYBACK)
+ pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
+ }
+
+ if (s->context->version >= 21) {
+
+ if (s->direction == PA_STREAM_PLAYBACK) {
+ pa_tagstruct_putu8(t, s->n_formats);
+ for (i = 0; i < s->n_formats; i++)
+ pa_tagstruct_put_format_info(t, s->req_formats[i]);
+ }
+ }
+
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
const char *dev,
const pa_buffer_attr *attr,
pa_stream_flags_t flags,
- pa_cvolume *volume,
+ const pa_cvolume *volume,
pa_stream *sync_stream) {
pa_assert(s);
return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
}
+int pa_stream_begin_write(
+ pa_stream *s,
+ void **data,
+ size_t *nbytes) {
+
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
+
+ if (*nbytes != (size_t) -1) {
+ size_t m, fs;
+
+ m = pa_mempool_block_size_max(s->context->mempool);
+ fs = pa_frame_size(&s->sample_spec);
+
+ m = (m / fs) * fs;
+ if (*nbytes > m)
+ *nbytes = m;
+ }
+
+ if (!s->write_memblock) {
+ s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
+ s->write_data = pa_memblock_acquire(s->write_memblock);
+ }
+
+ *data = s->write_data;
+ *nbytes = pa_memblock_get_length(s->write_memblock);
+
+ return 0;
+}
+
+int pa_stream_cancel_write(
+ pa_stream *s) {
+
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
+
+ pa_assert(s->write_data);
+
+ pa_memblock_release(s->write_memblock);
+ pa_memblock_unref(s->write_memblock);
+ s->write_memblock = NULL;
+ s->write_data = NULL;
+
+ return 0;
+}
+
int pa_stream_write(
pa_stream *s,
const void *data,
size_t length,
- void (*free_cb)(void *p),
+ pa_free_cb_t free_cb,
int64_t offset,
pa_seek_mode_t seek) {
- pa_memchunk chunk;
- pa_seek_mode_t t_seek;
- int64_t t_offset;
- size_t t_length;
- const void *t_data;
-
pa_assert(s);
pa_assert(PA_REFCNT_VALUE(s) >= 1);
pa_assert(data);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context,
+ !s->write_memblock ||
+ ((data >= s->write_data) &&
+ ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
+ PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
- if (length <= 0)
- return 0;
+ if (s->write_memblock) {
+ pa_memchunk chunk;
- t_seek = seek;
- t_offset = offset;
- t_length = length;
- t_data = data;
+ /* pa_stream_write_begin() was called before */
- while (t_length > 0) {
+ pa_memblock_release(s->write_memblock);
- chunk.index = 0;
+ chunk.memblock = s->write_memblock;
+ chunk.index = (const char *) data - (const char *) s->write_data;
+ chunk.length = length;
- if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
- chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
- chunk.length = t_length;
- } else {
- void *d;
+ s->write_memblock = NULL;
+ s->write_data = NULL;
- chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
- chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
+ pa_memblock_unref(chunk.memblock);
- d = pa_memblock_acquire(chunk.memblock);
- memcpy(d, t_data, chunk.length);
- pa_memblock_release(chunk.memblock);
- }
+ } else {
+ pa_seek_mode_t t_seek = seek;
+ int64_t t_offset = offset;
+ size_t t_length = length;
+ const void *t_data = data;
- pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+ /* pa_stream_write_begin() was not called before */
- t_offset = 0;
- t_seek = PA_SEEK_RELATIVE;
+ while (t_length > 0) {
+ pa_memchunk chunk;
- t_data = (const uint8_t*) t_data + chunk.length;
- t_length -= chunk.length;
+ chunk.index = 0;
- pa_memblock_unref(chunk.memblock);
- }
+ if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
+ chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
+ chunk.length = t_length;
+ } else {
+ void *d;
- if (free_cb && pa_pstream_get_shm(s->context->pstream))
- free_cb((void*) data);
+ chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
+ chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
- if (length < s->requested_bytes)
- s->requested_bytes -= (uint32_t) length;
- else
- s->requested_bytes = 0;
+ d = pa_memblock_acquire(chunk.memblock);
+ memcpy(d, t_data, chunk.length);
+ pa_memblock_release(chunk.memblock);
+ }
+
+ pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
+
+ t_offset = 0;
+ t_seek = PA_SEEK_RELATIVE;
+
+ t_data = (const uint8_t*) t_data + chunk.length;
+ t_length -= chunk.length;
+
+ pa_memblock_unref(chunk.memblock);
+ }
+
+ if (free_cb && pa_pstream_get_shm(s->context->pstream))
+ free_cb((void*) data);
+ }
- /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
+ /* This is obviously wrong since we ignore the seeking index . But
+ * that's OK, the server side applies the same error */
+ s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
+
+ /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
if (s->direction == PA_STREAM_PLAYBACK) {
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
- return s->requested_bytes;
+ return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
}
size_t pa_stream_readable_size(pa_stream *s) {
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
+ /* Ask for a timing update before we cork/uncork to get the best
+ * accuracy for the transport latency suitable for the
+ * check_smoother_status() call in the started callback */
+ request_auto_timing_update(s, TRUE);
+
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
+ /* This might cause the read index to conitnue again, hence
+ * let's request a timing update */
+ request_auto_timing_update(s, TRUE);
+
return o;
}
i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
}
- /* Update smoother */
- if (o->stream->smoother) {
+ /* Update smoother if we're not corked */
+ if (o->stream->smoother && !o->stream->corked) {
pa_usec_t u, x;
- u = x = pa_rtclock_usec() - i->transport_usec;
+ u = x = pa_rtclock_now() - i->transport_usec;
if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
pa_usec_t su;
pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
if (i->playing)
- pa_smoother_resume(o->stream->smoother, x);
+ pa_smoother_resume(o->stream->smoother, x, TRUE);
}
}
s->event_userdata = userdata;
}
+void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ if (pa_detect_fork())
+ return;
+
+ if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
+ return;
+
+ s->buffer_attr_callback = cb;
+ s->buffer_attr_userdata = userdata;
+}
+
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_operation *o = userdata;
int success = 1;
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ /* Ask for a timing update before we cork/uncork to get the best
+ * accuracy for the transport latency suitable for the
+ * check_smoother_status() call in the started callback */
+ request_auto_timing_update(s, TRUE);
+
s->corked = b;
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
check_smoother_status(s, FALSE, FALSE, FALSE);
- /* This might cause the indexes to hang/start again, hence
- * let's request a timing update */
+ /* This might cause the indexes to hang/start again, hence let's
+ * request a timing update, after the cork/uncork, too */
request_auto_timing_update(s, TRUE);
return o;
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ /* Ask for a timing update *before* the flush, so that the
+ * transport usec is as up to date as possible when we get the
+ * underflow message and update the smoother status*/
+ request_auto_timing_update(s, TRUE);
+
if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
return NULL;
* index, but the read index might jump. */
invalidate_indexes(s, TRUE, FALSE);
+ /* Note that we do not update requested_bytes here. This is
+ * because we cannot really know how data actually was dropped
+ * from the write index due to this. This 'error' will be applied
+ * by both client and server and hence we should be fine. */
+
return o;
}
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
+ /* Ask for a timing update before we cork/uncork to get the best
+ * accuracy for the transport latency suitable for the
+ * check_smoother_status() call in the started callback */
+ request_auto_timing_update(s, TRUE);
+
if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
return NULL;
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
+ /* Ask for a timing update before we cork/uncork to get the best
+ * accuracy for the transport latency suitable for the
+ * check_smoother_status() call in the started callback */
+ request_auto_timing_update(s, TRUE);
+
if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
return NULL;
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
if (s->smoother)
- usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
+ usec = pa_smoother_get(s->smoother, pa_rtclock_now());
else
usec = calc_time(s, FALSE);
return &s->channel_map;
}
+const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
+ pa_assert(s);
+ pa_assert(PA_REFCNT_VALUE(s) >= 1);
+
+ /* We don't have the format till routing is done */
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
+
+ return s->format;
+}
const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
pa_assert(s);
pa_assert(PA_REFCNT_VALUE(s) >= 1);
pa_operation *o;
pa_tagstruct *t;
uint32_t tag;
+ pa_buffer_attr copy;
pa_assert(s);
pa_assert(PA_REFCNT_VALUE(s) >= 1);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
+ /* Ask for a timing update before we cork/uncork to get the best
+ * accuracy for the transport latency suitable for the
+ * check_smoother_status() call in the started callback */
+ request_auto_timing_update(s, TRUE);
+
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
t = pa_tagstruct_command(
&tag);
pa_tagstruct_putu32(t, s->channel);
+ copy = *attr;
+ patch_buffer_attr(s, ©, NULL);
+ attr = ©
+
pa_tagstruct_putu32(t, attr->maxlength);
if (s->direction == PA_STREAM_PLAYBACK)