From: Lennart Poettering Date: Mon, 1 Oct 2007 16:42:59 +0000 (+0000) Subject: update native protocol to make use of pa_memblockq_pop_missing X-Git-Tag: 1.0_branch~2762^2~1^2~46 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=7c1768d4d1a61b998811c5044e16525b05cd88b0;p=profile%2Fivi%2Fpulseaudio.git update native protocol to make use of pa_memblockq_pop_missing git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1924 fefdeb5f-60dc-0310-8127-8f9354f1896f --- diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index c282c17..9ae0f08 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -80,7 +80,7 @@ typedef struct record_stream { connection *connection; uint32_t index; - + pa_source_output *source_output; pa_memblockq *memblockq; size_t fragment_size; @@ -92,10 +92,10 @@ typedef struct output_stream { typedef struct playback_stream { output_stream parent; - + connection *connection; uint32_t index; - + pa_sink_input *sink_input; pa_memblockq *memblockq; int drain_request; @@ -104,7 +104,7 @@ typedef struct playback_stream { int underrun; pa_atomic_t missing; - size_t last_missing; + size_t minreq; /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */ int64_t read_index, write_index; @@ -113,10 +113,10 @@ typedef struct playback_stream { typedef struct upload_stream { output_stream parent; - + connection *connection; uint32_t index; - + pa_memchunk memchunk; size_t length; char *name; @@ -126,7 +126,7 @@ typedef struct upload_stream { struct connection { pa_msgobject parent; - + int authorized; uint32_t version; pa_protocol_native *protocol; @@ -299,7 +299,7 @@ static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { [PA_COMMAND_SUSPEND_SINK] = command_suspend, [PA_COMMAND_SUSPEND_SOURCE] = command_suspend, - + [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream, [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream, [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream, @@ -360,7 +360,7 @@ static upload_stream* upload_stream_new( const char *name, size_t length) { upload_stream *s; - + pa_assert(c); pa_assert(ss); pa_assert(name); @@ -376,7 +376,7 @@ static upload_stream* upload_stream_new( s->length = length; pa_idxset_put(c->output_streams, s, &s->index); - + return s; } @@ -394,7 +394,7 @@ static void record_stream_unlink(record_stream *s) { pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s); s->connection = NULL; - record_stream_unref(s); + record_stream_unref(s); } static void record_stream_free(pa_object *o) { @@ -402,7 +402,7 @@ static void record_stream_free(pa_object *o) { pa_assert(s); record_stream_unlink(s); - + pa_memblockq_free(s->memblockq); pa_xfree(s); } @@ -413,11 +413,11 @@ static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, i if (!s->connection) return -1; - + switch (code) { - + case RECORD_STREAM_MESSAGE_POST_DATA: - + if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { /* pa_log_warn("Failed to push data into output queue."); */ return -1; @@ -512,7 +512,7 @@ static void playback_stream_unlink(playback_stream *s) { pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s); s->connection = NULL; - playback_stream_unref(s); + playback_stream_unref(s); } static void playback_stream_free(pa_object* o) { @@ -520,7 +520,7 @@ static void playback_stream_free(pa_object* o) { pa_assert(s); playback_stream_unlink(s); - + pa_memblockq_free(s->memblockq); pa_xfree(s); } @@ -535,23 +535,26 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, switch (code) { case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: { pa_tagstruct *t; - int32_t l = 0; + uint32_t l = 0; for (;;) { int32_t k; - + if ((k = pa_atomic_load(&s->missing)) <= 0) break; l += k; - + + if (l < s->minreq) + break; + if (pa_atomic_sub(&s->missing, k) <= k) break; } - if (l <= 0) + if (l < s->minreq) break; - + t = pa_tagstruct_new(NULL, 0); pa_tagstruct_putu32(t, PA_COMMAND_REQUEST); pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */ @@ -689,16 +692,16 @@ static playback_stream* playback_stream_new( *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq); *prebuf = (uint32_t) pa_memblockq_get_prebuf(s->memblockq); *minreq = (uint32_t) pa_memblockq_get_minreq(s->memblockq); - *missing = (uint32_t) pa_memblockq_missing(s->memblockq); - + *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq); + + s->minreq = pa_memblockq_get_minreq(s->memblockq); pa_atomic_store(&s->missing, 0); - s->last_missing = *missing; s->drain_request = 0; pa_idxset_put(c->output_streams, s, &s->index); pa_sink_input_put(s->sink_input); - + return s; } @@ -708,9 +711,9 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int6 if (!c->protocol) return -1; - + switch (code) { - + case CONNECTION_MESSAGE_REVOKE: pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata)); break; @@ -751,7 +754,7 @@ static void connection_unlink(connection *c) { c->protocol->core->mainloop->time_free(c->auth_timeout_event); c->auth_timeout_event = NULL; } - + pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c); c->protocol = NULL; connection_unref(c); @@ -759,11 +762,11 @@ static void connection_unlink(connection *c) { static void connection_free(pa_object *o) { connection *c = CONNECTION(o); - + pa_assert(c); connection_unlink(c); - + pa_idxset_free(c->record_streams, NULL, NULL); pa_idxset_free(c->output_streams, NULL, NULL); @@ -776,23 +779,19 @@ static void connection_free(pa_object *o) { /* Called from thread context */ static void request_bytes(playback_stream *s) { - size_t new_missing, delta, previous_missing; - size_t minreq; + size_t m, previous_missing; playback_stream_assert_ref(s); - new_missing = pa_memblockq_missing(s->memblockq); - delta = new_missing > s->last_missing ? new_missing - s->last_missing : 0; - s->last_missing = new_missing; + m = pa_memblockq_pop_missing(s->memblockq); - if (delta <= 0) + if (m <= 0) return; -/* pa_log("request_bytes(%u)", delta); */ - minreq = pa_memblockq_get_minreq(s->memblockq); +/* pa_log("request_bytes(%u)", m); */ - previous_missing = pa_atomic_add(&s->missing, delta); - if (previous_missing < minreq && previous_missing+delta >= minreq) { + previous_missing = pa_atomic_add(&s->missing, m); + if (previous_missing < s->minreq && previous_missing+m >= s->minreq) { pa_assert(pa_thread_mq_get()); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); } @@ -821,7 +820,7 @@ static void send_memblock(connection *c) { schunk.length = r->fragment_size; pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk); - + pa_memblockq_drop(r->memblockq, schunk.length); pa_memblock_unref(schunk.memblock); @@ -865,7 +864,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int switch (code) { - case SINK_INPUT_MESSAGE_SEEK: + case SINK_INPUT_MESSAGE_SEEK: pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata)); request_bytes(s); return 0; @@ -873,6 +872,8 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int case SINK_INPUT_MESSAGE_POST_DATA: { pa_assert(chunk); +/* pa_log("sink input post: %u", chunk->length); */ + if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { pa_log_warn("Failed to push data into queue"); @@ -904,7 +905,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int case SINK_INPUT_MESSAGE_FLUSH: case SINK_INPUT_MESSAGE_PREBUF_FORCE: case SINK_INPUT_MESSAGE_TRIGGER: { - + pa_sink_input *isync; void (*func)(pa_memblockq *bq); @@ -912,11 +913,11 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int case SINK_INPUT_MESSAGE_FLUSH: func = pa_memblockq_flush; break; - + case SINK_INPUT_MESSAGE_PREBUF_FORCE: func = pa_memblockq_prebuf_force; break; - + case SINK_INPUT_MESSAGE_TRIGGER: func = pa_memblockq_prebuf_disable; break; @@ -924,7 +925,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int default: pa_assert_not_reached(); } - + func(s->memblockq); s->underrun = 0; request_bytes(s); @@ -943,17 +944,17 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int ssync->underrun = 0; request_bytes(ssync); } - + return 0; } - case SINK_INPUT_MESSAGE_UPDATE_LATENCY: + case SINK_INPUT_MESSAGE_UPDATE_LATENCY: s->read_index = pa_memblockq_get_read_index(s->memblockq); s->write_index = pa_memblockq_get_write_index(s->memblockq); s->resampled_chunk_length = s->sink_input->thread_info.resampled_chunk.memblock ? s->sink_input->thread_info.resampled_chunk.length : 0; return 0; - + case PA_SINK_INPUT_MESSAGE_SET_STATE: pa_memblockq_prebuf_force(s->memblockq); @@ -993,7 +994,7 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun return -1; } -/* pa_log("peek: %u", chunk->length); */ +/* pa_log("peek: %u", chunk->length); */ request_bytes(s); @@ -1018,7 +1019,7 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) { request_bytes(s); -/* pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */ +/* pa_log("after_drop: %u %u", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */ } static void sink_input_kill_cb(pa_sink_input *i) { @@ -1103,7 +1104,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC pa_sink *sink = NULL; pa_cvolume volume; int corked; - + connection_assert_ref(c); pa_assert(t); @@ -1156,7 +1157,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC pa_tagstruct_putu32(reply, missing); /* pa_log("initial request is %u", missing); */ - + if (c->version >= 9) { /* Since 0.9 we support sending the buffer metrics back to the client */ @@ -1185,25 +1186,25 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); switch (command) { - + case PA_COMMAND_DELETE_PLAYBACK_STREAM: { playback_stream *s; if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) { pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST); return; } - + playback_stream_unlink(s); break; } - + case PA_COMMAND_DELETE_RECORD_STREAM: { record_stream *s; if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) { pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST); return; } - + record_stream_unlink(s); break; } @@ -1215,7 +1216,7 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST); return; } - + upload_stream_unlink(s); break; } @@ -1294,7 +1295,7 @@ static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t connection_assert_ref(c); pa_assert(t); - + if (!pa_tagstruct_eof(t)) { protocol_error(c); return; @@ -1427,7 +1428,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin connection *c = CONNECTION(userdata); const char *name; uint32_t idx = PA_IDXSET_INVALID; - + connection_assert_ref(c); pa_assert(t); @@ -1532,12 +1533,12 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_ CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY); CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY); CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY) - + reply = reply_new(tag); - + latency = pa_sink_get_latency(s->sink_input->sink); - latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec); - + latency += pa_bytes_to_usec(s->resampled_chunk_length, &s->sink_input->sample_spec); + pa_tagstruct_put_usec(reply, latency); pa_tagstruct_put_usec(reply, 0); @@ -1792,7 +1793,7 @@ static void sink_input_fill_tagstruct(connection *c, pa_tagstruct *t, pa_sink_in pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s))); pa_tagstruct_puts(t, s->driver); if (c->version >= 11) - pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s)); + pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s)); } static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) { @@ -1815,7 +1816,7 @@ static void source_output_fill_tagstruct(pa_tagstruct *t, pa_source_output *s) { static void scache_fill_tagstruct(pa_tagstruct *t, pa_scache_entry *e) { pa_assert(t); pa_assert(e); - + pa_tagstruct_putu32(t, e->index); pa_tagstruct_puts(t, e->name); pa_tagstruct_put_cvolume(t, &e->volume); @@ -2073,7 +2074,7 @@ static void command_set_volume( CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID); switch (command) { - + case PA_COMMAND_SET_SINK_VOLUME: if (idx != PA_INVALID_INDEX) sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx); @@ -2087,7 +2088,7 @@ static void command_set_volume( else source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1); break; - + case PA_COMMAND_SET_SINK_INPUT_VOLUME: si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx); break; @@ -2139,7 +2140,7 @@ static void command_set_mute( CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || !name || (*name && pa_utf8_valid(name)), tag, PA_ERR_INVALID); switch (command) { - + case PA_COMMAND_SET_SINK_MUTE: if (idx != PA_INVALID_INDEX) @@ -2227,7 +2228,7 @@ static void command_trigger_or_flush_or_prebuf_playback_stream(PA_GCC_UNUSED pa_ case PA_COMMAND_FLUSH_PLAYBACK_STREAM: pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL); break; - + case PA_COMMAND_PREBUF_PLAYBACK_STREAM: pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL); break; @@ -2275,7 +2276,7 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U connection_assert_ref(c); pa_assert(t); - + if (pa_tagstruct_getu32(t, &idx) < 0 || !pa_tagstruct_eof(t)) { protocol_error(c); @@ -2405,7 +2406,7 @@ static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui connection_assert_ref(c); pa_assert(t); - + if (pa_tagstruct_gets(t, &name) < 0 || pa_tagstruct_gets(t, &argument) < 0 || !pa_tagstruct_eof(t)) { @@ -2562,7 +2563,7 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { connection *c = CONNECTION(userdata); pa_tagstruct *reply; - + connection_assert_ref(c); pa_assert(t); @@ -2680,7 +2681,7 @@ static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa if (idx != PA_INVALID_INDEX) sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx); - else + else sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1); CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY); @@ -2693,9 +2694,9 @@ static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa } else { pa_assert(command == PA_COMMAND_SUSPEND_SOURCE); - + if (idx == PA_INVALID_INDEX && name && !*name) { - + if (pa_source_suspend_all(c->protocol->core, b) < 0) { pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID); return; @@ -2708,7 +2709,7 @@ static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa source = pa_idxset_get_by_index(c->protocol->core->sources, idx); else source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1); - + CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY); if (pa_source_suspend(source, b) < 0) { @@ -2739,7 +2740,7 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_c static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) { connection *c = CONNECTION(userdata); output_stream *stream; - + pa_assert(p); pa_assert(chunk); connection_assert_ref(c); @@ -2752,12 +2753,12 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o if (playback_stream_isinstance(stream)) { playback_stream *ps = PLAYBACK_STREAM(stream); - + if (seek != PA_SEEK_RELATIVE || offset != 0) pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL); - + pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL); - + } else { upload_stream *u = UPLOAD_STREAM(stream); size_t l; @@ -2799,7 +2800,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o static void pstream_die_callback(pa_pstream *p, void *userdata) { connection *c = CONNECTION(userdata); - + pa_assert(p); connection_assert_ref(c); @@ -2818,7 +2819,7 @@ static void pstream_drain_callback(pa_pstream *p, void *userdata) { static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) { pa_thread_mq *q; - + if (!(q = pa_thread_mq_get())) pa_pstream_send_revoke(p, block_id); else @@ -2827,7 +2828,7 @@ static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *user static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) { pa_thread_mq *q; - + if (!(q = pa_thread_mq_get())) pa_pstream_send_release(p, block_id); else @@ -2838,7 +2839,7 @@ static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *use static void client_kill_cb(pa_client *c) { pa_assert(c); - + connection_unlink(CONNECTION(c->userdata)); } @@ -2846,7 +2847,7 @@ static void client_kill_cb(pa_client *c) { static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) { connection *c = CONNECTION(userdata); - + pa_assert(m); pa_assert(tv); connection_assert_ref(c); @@ -3075,7 +3076,7 @@ pa_protocol_native* pa_protocol_native_new_iochannel( pa_iochannel *io, pa_module *m, pa_modargs *ma) { - + pa_protocol_native *p; if (!(p = protocol_new_internal(core, m, ma)))