pa_atomic_t missing;
size_t last_missing;
+
+ /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
+ int64_t read_index, write_index;
+ size_t resampled_chunk_length;
} playback_stream;
typedef struct upload_stream {
SINK_INPUT_MESSAGE_FLUSH,
SINK_INPUT_MESSAGE_TRIGGER,
SINK_INPUT_MESSAGE_SEEK,
- SINK_INPUT_MESSAGE_PREBUF_FORCE
+ SINK_INPUT_MESSAGE_PREBUF_FORCE,
+ SINK_INPUT_MESSAGE_UPDATE_LATENCY
};
enum {
pa_xfree(c);
}
+/* Called from thread context */
static void request_bytes(playback_stream *s) {
size_t new_missing, delta, previous_missing;
-
/* pa_log("request_bytes()"); */
playback_stream_assert_ref(s);
pa_pstream_send_tagstruct(r->connection->pstream, t);
}
-/*** sinkinput callbacks ***/
+/*** sink input callbacks ***/
+/* Called from thread context */
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o);
playback_stream *s;
return 0;
}
+
+ case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
+
+ s->read_index = pa_memblockq_get_read_index(s->memblockq);
+ s->write_index = pa_memblockq_get_write_index(s->memblockq);
+ s->resampled_chunk_length = s->sink_input->thread_info.resampled_chunk.memblock ? s->sink_input->thread_info.resampled_chunk.length : 0;
+ return 0;
case PA_SINK_INPUT_MESSAGE_SET_STATE:
return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
}
+/* Called from thread context */
static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
playback_stream *s;
return 0;
}
+/* Called from thread context */
static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
playback_stream *s;
/*** source_output callbacks ***/
+/* Called from thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
record_stream *s;
s = pa_idxset_get_by_index(c->output_streams, idx);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
-
+ CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
+
reply = reply_new(tag);
-
+
latency = pa_sink_get_latency(s->sink_input->sink);
-/* if (s->sink_input->resampled_chunk.memblock) */ /* FIXME*/
-/* latency += pa_bytes_to_usec(s->sink_input->resampled_chunk.length, &s->sink_input->sample_spec); */
+ latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec);
+
pa_tagstruct_put_usec(reply, latency);
pa_tagstruct_put_usec(reply, 0);
pa_tagstruct_put_boolean(reply, pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
pa_tagstruct_put_timeval(reply, &tv);
pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
- pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
- pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
+ pa_tagstruct_puts64(reply, s->write_index);
+ pa_tagstruct_puts64(reply, s->read_index);
pa_pstream_send_tagstruct(c->pstream, reply);
}