From 23d01bb75db12ceaa263fa830b74cf8669ef2dd9 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 5 Aug 2007 00:07:58 +0000 Subject: [PATCH] Modernize pstream.[ch], reintroduce defer event to make things actually work git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1572 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/pstream.c | 154 ++++++++++++++++++++++++++++-------------------- src/pulsecore/pstream.h | 2 +- 2 files changed, 90 insertions(+), 66 deletions(-) diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index f4aab1c..0ffa583 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -28,7 +28,6 @@ #include #include -#include #include #ifdef HAVE_SYS_SOCKET_H @@ -168,8 +167,8 @@ static int do_write(pa_pstream *p); static int do_read(pa_pstream *p); static void do_something(pa_pstream *p) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); pa_pstream_ref(p); @@ -191,31 +190,42 @@ static void do_something(pa_pstream *p) { fail: - p->dead = 1; - if (p->die_callback) p->die_callback(p, p->die_callback_userdata); + pa_pstream_unlink(p); pa_pstream_unref(p); } static void io_callback(pa_iochannel*io, void *userdata) { pa_pstream *p = userdata; - assert(p); - assert(p->io == io); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p->io == io); do_something(p); } +static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) { + pa_pstream *p = userdata; + + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p->defer_event == e); + pa_assert(p->mainloop == m); + + do_something(p); +} + static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata); pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) { pa_pstream *p; - assert(m); - assert(io); - assert(pool); + pa_assert(m); + pa_assert(io); + pa_assert(pool); p = pa_xnew(pa_pstream, 1); PA_REFCNT_INIT(p); @@ -224,9 +234,11 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo p->dead = 0; p->mainloop = m; - + p->defer_event = m->defer_new(m, defer_callback, p); + m->defer_enable(p->defer_event, 0); + p->send_queue = pa_queue_new(); - assert(p->send_queue); + pa_assert(p->send_queue); p->write.current = NULL; p->write.index = 0; @@ -264,13 +276,13 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo static void item_free(void *item, PA_GCC_UNUSED void *p) { struct item_info *i = item; - assert(i); + pa_assert(i); if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) { - assert(i->chunk.memblock); + pa_assert(i->chunk.memblock); pa_memblock_unref(i->chunk.memblock); } else if (i->type == PA_PSTREAM_ITEM_PACKET) { - assert(i->packet); + pa_assert(i->packet); pa_packet_unref(i->packet); } @@ -278,9 +290,9 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) { } static void pstream_free(pa_pstream *p) { - assert(p); + pa_assert(p); - pa_pstream_close(p); + pa_pstream_unlink(p); pa_queue_free(p->send_queue, item_free, NULL); @@ -302,9 +314,9 @@ static void pstream_free(pa_pstream *p) { void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) { struct item_info *i; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); - assert(packet); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(packet); if (p->dead) return; @@ -319,15 +331,17 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre #endif pa_queue_push(p->send_queue, i); + + p->mainloop->defer_enable(p->defer_event, 1); } void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { size_t length, idx; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); - assert(channel != (uint32_t) -1); - assert(chunk); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(channel != (uint32_t) -1); + pa_assert(chunk); if (p->dead) return; @@ -366,8 +380,8 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd struct item_info *item; pa_pstream *p = userdata; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (p->dead) return; @@ -382,14 +396,15 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd #endif pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); } static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { struct item_info *item; pa_pstream *p = userdata; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (p->dead) return; @@ -403,11 +418,12 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda #endif pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); } static void prepare_next_write_item(pa_pstream *p) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->write.current = pa_queue_pop(p->send_queue); @@ -426,7 +442,7 @@ static void prepare_next_write_item(pa_pstream *p) { if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) { - assert(p->write.current->packet); + pa_assert(p->write.current->packet); p->write.data = p->write.current->packet->data; p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length); @@ -444,8 +460,8 @@ static void prepare_next_write_item(pa_pstream *p) { uint32_t flags; int send_payload = 1; - assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); - assert(p->write.current->chunk.memblock); + pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); + pa_assert(p->write.current->chunk.memblock); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32)); @@ -457,7 +473,7 @@ static void prepare_next_write_item(pa_pstream *p) { uint32_t block_id, shm_id; size_t offset, length; - assert(p->export); + pa_assert(p->export); if (pa_memexport_put(p->export, p->write.current->chunk.memblock, @@ -503,8 +519,8 @@ static int do_write(pa_pstream *p) { ssize_t r; pa_memblock *release_memblock = NULL; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (!p->write.current) prepare_next_write_item(p); @@ -516,7 +532,7 @@ static int do_write(pa_pstream *p) { d = (uint8_t*) p->write.descriptor + p->write.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index; } else { - assert(p->write.data || p->write.memchunk.memblock); + pa_assert(p->write.data || p->write.memchunk.memblock); if (p->write.data) d = p->write.data; @@ -529,7 +545,7 @@ static int do_write(pa_pstream *p) { l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); } - assert(l > 0); + pa_assert(l > 0); #ifdef HAVE_CREDS if (p->send_creds_now) { @@ -550,7 +566,7 @@ static int do_write(pa_pstream *p) { p->write.index += r; if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { - assert(p->write.current); + pa_assert(p->write.current); item_free(p->write.current, (void *) 1); p->write.current = NULL; @@ -573,14 +589,14 @@ static int do_read(pa_pstream *p) { size_t l; ssize_t r; pa_memblock *release_memblock = NULL; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) p->read.descriptor + p->read.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index; } else { - assert(p->read.data || p->read.memblock); + pa_assert(p->read.data || p->read.memblock); if (p->read.data) d = p->read.data; @@ -629,7 +645,7 @@ static int do_read(pa_pstream *p) { /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ - assert(p->export); + pa_assert(p->export); pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); goto frame_done; @@ -640,7 +656,7 @@ static int do_read(pa_pstream *p) { /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ - assert(p->import); + pa_assert(p->import); pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); goto frame_done; @@ -653,7 +669,7 @@ static int do_read(pa_pstream *p) { return -1; } - assert(!p->read.packet && !p->read.memblock); + pa_assert(!p->read.packet && !p->read.memblock); channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); @@ -757,9 +773,9 @@ static int do_read(pa_pstream *p) { } else { pa_memblock *b; - assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); - assert(p->import); + pa_assert(p->import); if (!(b = pa_memimport_get(p->import, ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]), @@ -821,32 +837,32 @@ fail: } void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->die_callback = cb; p->die_callback_userdata = userdata; } void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->drain_callback = cb; p->drain_callback_userdata = userdata; } void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->recieve_packet_callback = cb; p->recieve_packet_callback_userdata = userdata; } void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->recieve_memblock_callback = cb; p->recieve_memblock_callback_userdata = userdata; @@ -855,8 +871,8 @@ void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock int pa_pstream_is_pending(pa_pstream *p) { int b; - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (p->dead) b = 0; @@ -867,24 +883,27 @@ int pa_pstream_is_pending(pa_pstream *p) { } void pa_pstream_unref(pa_pstream*p) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); if (PA_REFCNT_DEC(p) <= 0) pstream_free(p); } pa_pstream* pa_pstream_ref(pa_pstream*p) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); PA_REFCNT_INC(p); return p; } -void pa_pstream_close(pa_pstream *p) { - assert(p); +void pa_pstream_unlink(pa_pstream *p) { + pa_assert(p); + if (p->dead) + return; + p->dead = 1; if (p->import) { @@ -902,6 +921,11 @@ void pa_pstream_close(pa_pstream *p) { p->io = NULL; } + if (p->defer_event) { + p->mainloop->defer_free(p->defer_event); + p->defer_event = NULL; + } + p->die_callback = NULL; p->drain_callback = NULL; p->recieve_packet_callback = NULL; @@ -909,8 +933,8 @@ void pa_pstream_close(pa_pstream *p) { } void pa_pstream_use_shm(pa_pstream *p, int enable) { - assert(p); - assert(PA_REFCNT_VALUE(p) > 0); + pa_assert(p); + pa_assert(PA_REFCNT_VALUE(p) > 0); p->use_shm = enable; diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h index 5900ece..544cba4 100644 --- a/src/pulsecore/pstream.h +++ b/src/pulsecore/pstream.h @@ -59,6 +59,6 @@ int pa_pstream_is_pending(pa_pstream *p); void pa_pstream_use_shm(pa_pstream *p, int enable); -void pa_pstream_close(pa_pstream *p); +void pa_pstream_unlink(pa_pstream *p); #endif -- 2.7.4