Rework memory management to allow shared memory data transfer. The central idea
authorLennart Poettering <lennart@poettering.net>
Fri, 18 Aug 2006 19:55:18 +0000 (19:55 +0000)
committerLennart Poettering <lennart@poettering.net>
Fri, 18 Aug 2006 19:55:18 +0000 (19:55 +0000)
is to allocate all audio memory blocks from a per-process memory pool which is
available as read-only SHM segment to other local processes. Then, instead of
writing the actual audio data to the socket just write references to this
shared memory pool.

To work optimally all memory blocks should now be of type PA_MEMBLOCK_POOL or
PA_MEMBLOCK_POOL_EXTERNAL. The function pa_memblock_new() now generates memory
blocks of this type by default.

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1266 fefdeb5f-60dc-0310-8127-8f9354f1896f

45 files changed:
src/daemon/main.c
src/modules/module-alsa-sink.c
src/modules/module-alsa-source.c
src/modules/module-combine.c
src/modules/module-jack-source.c
src/modules/module-oss-mmap.c
src/modules/module-oss.c
src/modules/module-pipe-source.c
src/modules/module-sine.c
src/modules/module-tunnel.c
src/modules/rtp/module-rtp-recv.c
src/modules/rtp/module-rtp-send.c
src/modules/rtp/rtp.c
src/modules/rtp/rtp.h
src/pulse/context.c
src/pulse/internal.h
src/pulse/stream.c
src/pulsecore/cli-command.c
src/pulsecore/core-scache.c
src/pulsecore/core.c
src/pulsecore/core.h
src/pulsecore/mcalign.c
src/pulsecore/mcalign.h
src/pulsecore/memblock.c
src/pulsecore/memblock.h
src/pulsecore/memblockq.c
src/pulsecore/memblockq.h
src/pulsecore/memchunk.c
src/pulsecore/memchunk.h
src/pulsecore/protocol-esound.c
src/pulsecore/protocol-native.c
src/pulsecore/protocol-simple.c
src/pulsecore/pstream.c
src/pulsecore/pstream.h
src/pulsecore/resampler.c
src/pulsecore/resampler.h
src/pulsecore/sample-util.c
src/pulsecore/sample-util.h
src/pulsecore/sink-input.c
src/pulsecore/sink.c
src/pulsecore/sound-file-stream.c
src/pulsecore/sound-file.c
src/pulsecore/sound-file.h
src/pulsecore/source-output.c
src/pulsecore/source.c

index 38d465f..aada0ad 100644 (file)
@@ -559,7 +559,7 @@ int main(int argc, char *argv[]) {
     mainloop = pa_mainloop_new();
     assert(mainloop);
 
-    c = pa_core_new(pa_mainloop_get_api(mainloop));
+    c = pa_core_new(pa_mainloop_get_api(mainloop), 1);
     assert(c);
     c->is_system_instance = !!conf->system_instance;
 
index 8da3d23..0cebd50 100644 (file)
@@ -492,7 +492,7 @@ int pa__init(pa_core *c, pa_module*m) {
 
     pa_log_info(__FILE__": using %u fragments of size %lu bytes.", periods, (long unsigned)u->fragment_size);
 
-    u->silence.memblock = pa_memblock_new(u->silence.length = u->fragment_size, c->memblock_stat);
+    u->silence.memblock = pa_memblock_new(c->mempool, u->silence.length = u->fragment_size);
     assert(u->silence.memblock);
     pa_silence_memblock(u->silence.memblock, &ss);
     u->silence.index = 0;
index 4a8678c..c3979df 100644 (file)
@@ -151,7 +151,7 @@ static void do_read(struct userdata *u) {
         size_t l;
         
         if (!u->memchunk.memblock) {
-            u->memchunk.memblock = pa_memblock_new(u->memchunk.length = u->fragment_size, u->source->core->memblock_stat);
+            u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size);
             u->memchunk.index = 0;
         }
             
index 008fe6e..5243975 100644 (file)
@@ -235,8 +235,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink, int resample
             pa_frame_size(&u->sink->sample_spec),
             1,
             0,
-            NULL,
-            sink->core->memblock_stat);
+            NULL);
 
     snprintf(t, sizeof(t), "%s: output #%u", u->sink->name, u->n_outputs+1);
 
index 583f3b8..8e65919 100644 (file)
@@ -137,7 +137,7 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
         
         fs = pa_frame_size(&u->source->sample_spec);
 
-        chunk.memblock = pa_memblock_new(chunk.length = u->frames_posted * fs, u->core->memblock_stat);
+        chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);
         chunk.index = 0;
         
         for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {
index c783a2f..75ab9a9 100644 (file)
@@ -162,10 +162,10 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {
             
         chunk.memblock = u->out_memblocks[u->out_current] =
             pa_memblock_new_fixed(
+                    u->core->mempool,
                     (uint8_t*) u->out_mmap+u->out_fragment_size*u->out_current,
                     u->out_fragment_size,
-                    1,
-                    u->core->memblock_stat);
+                    1);
         assert(chunk.memblock);
         chunk.length = chunk.memblock->length;
         chunk.index = 0;
@@ -210,7 +210,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {
         pa_memchunk chunk;
         
         if (!u->in_memblocks[u->in_current]) {
-            chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed((uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1, u->core->memblock_stat);
+            chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1);
             chunk.length = chunk.memblock->length;
             chunk.index = 0;
             
index ce11ee0..b9b80e7 100644 (file)
@@ -217,7 +217,7 @@ static void do_read(struct userdata *u) {
     }
     
     do {
-        memchunk.memblock = pa_memblock_new(l, u->core->memblock_stat);
+        memchunk.memblock = pa_memblock_new(u->core->mempool, l);
         assert(memchunk.memblock);
         if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) {
             pa_memblock_unref(memchunk.memblock);
@@ -503,7 +503,7 @@ int pa__init(pa_core *c, pa_module*m) {
 
     u->out_fragment_size = out_frag_size;
     u->in_fragment_size = in_frag_size;
-    u->silence.memblock = pa_memblock_new(u->silence.length = u->out_fragment_size, u->core->memblock_stat);
+    u->silence.memblock = pa_memblock_new(u->core->mempool, u->silence.length = u->out_fragment_size);
     assert(u->silence.memblock);
     pa_silence_memblock(u->silence.memblock, &ss);
     u->silence.index = 0;
index 5caa60a..43a8dab 100644 (file)
@@ -91,7 +91,7 @@ static void do_read(struct userdata *u) {
     pa_module_set_used(u->module, pa_idxset_size(u->source->outputs));
 
     if (!u->chunk.memblock) {
-        u->chunk.memblock = pa_memblock_new(1024, u->core->memblock_stat);
+        u->chunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF);
         u->chunk.index = chunk.length = 0;
     }
 
index 5ceddce..89c3c60 100644 (file)
@@ -139,7 +139,7 @@ int pa__init(pa_core *c, pa_module*m) {
         goto fail;
     }
     
-    u->memblock = pa_memblock_new(pa_bytes_per_second(&ss), c->memblock_stat);
+    u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss));
     calc_sine(u->memblock->data, u->memblock->length, frequency);
 
     snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);
index 9bb11c0..53bffd3 100644 (file)
@@ -651,7 +651,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
         return;
     }
 
-    u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat);
+    u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
 
     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
index df6f8c1..5d3f3e2 100644 (file)
@@ -150,7 +150,7 @@ static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event
     assert(fd == s->rtp_context.fd);
     assert(flags == PA_IO_EVENT_INPUT);
 
-    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0)
+    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->mempool) < 0)
         return;
 
     if (s->sdp_info.payload != s->rtp_context.payload) {
@@ -312,10 +312,10 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
     s->sink_input->kill = sink_input_kill;
     s->sink_input->get_latency = sink_input_get_latency;
 
-    silence = pa_silence_memblock_new(&s->sink_input->sample_spec,
+    silence = pa_silence_memblock_new(s->userdata->core->mempool,
+                                      &s->sink_input->sample_spec,
                                       (pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))*
-                                      pa_frame_size(&s->sink_input->sample_spec),
-                                      s->userdata->core->memblock_stat);
+                                      pa_frame_size(&s->sink_input->sample_spec));
     
     s->memblockq = pa_memblockq_new(
             0,
@@ -324,8 +324,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
             pa_frame_size(&s->sink_input->sample_spec),
             pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
             0,
-            silence,
-            u->core->memblock_stat);
+            silence);
 
     pa_memblock_unref(silence);
 
index 759aa81..1b85c84 100644 (file)
@@ -297,8 +297,7 @@ int pa__init(pa_core *c, pa_module*m) {
             pa_frame_size(&ss),
             1,
             0,
-            NULL,
-            c->memblock_stat);
+            NULL);
 
     u->mtu = mtu;
     
index ee037d4..8e77c60 100644 (file)
@@ -149,7 +149,7 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame
     return c;
 }
 
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
     int size;
     struct msghdr m;
     struct iovec iov;
@@ -170,7 +170,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
     if (!size)
         return 0;
 
-    chunk->memblock = pa_memblock_new(size, st);
+    chunk->memblock = pa_memblock_new(pool, size);
 
     iov.iov_base = chunk->memblock->data;
     iov.iov_len = size;
index 35fbbd3..123602b 100644 (file)
@@ -41,7 +41,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr
 int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q);
 
 pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool);
 
 void pa_rtp_context_destroy(pa_rtp_context *c);
 
index 34f517f..b353054 100644 (file)
@@ -128,7 +128,7 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
     c->subscribe_callback = NULL;
     c->subscribe_userdata = NULL;
 
-    c->memblock_stat = pa_memblock_stat_new();
+    c->mempool = pa_mempool_new(1);
     c->local = -1;
     c->server_list = NULL;
     c->server = NULL;
@@ -177,7 +177,7 @@ static void context_free(pa_context *c) {
     if (c->playback_streams)
         pa_dynarray_free(c->playback_streams, NULL, NULL);
 
-    pa_memblock_stat_unref(c->memblock_stat);
+    pa_mempool_free(c->mempool);
 
     if (c->conf)
         pa_client_conf_free(c->conf);
@@ -407,7 +407,9 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
     pa_context_ref(c);
     
     assert(!c->pstream);
-    c->pstream = pa_pstream_new(c->mainloop, io, c->memblock_stat);
+    c->pstream = pa_pstream_new(c->mainloop, io, c->mempool);
+
+    pa_pstream_use_shm(c->pstream, 1);
     
     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
index 96028d8..afcfaef 100644 (file)
@@ -69,7 +69,7 @@ struct pa_context {
     pa_context_subscribe_cb_t subscribe_callback;
     void *subscribe_userdata;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     int local;
     int do_autospawn;
index 677df00..180cd09 100644 (file)
@@ -437,8 +437,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
                 pa_frame_size(&s->sample_spec),
                 1,
                 0,
-                NULL,
-                s->context->memblock_stat);
+                NULL);
     }
 
     s->channel_valid = 1;
@@ -604,9 +603,9 @@ int pa_stream_write(
         return 0;
 
     if (free_cb) 
-        chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
+        chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);
     else {
-        chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
+        chunk.memblock = pa_memblock_new(s->context->mempool, length);
         memcpy(chunk.memblock->data, data, length);
     }
         
index f74258d..811b96d 100644 (file)
@@ -100,6 +100,7 @@ static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int
 static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
 
 /* A method table for all available commands */
 
@@ -144,6 +145,7 @@ static const struct command commands[] = {
     { "list-props",              pa_cli_command_list_props,         NULL, 1},
     { "move-sink-input",         pa_cli_command_move_sink_input,    "Move sink input to another sink (args: index, sink)", 3},
     { "move-source-output",      pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
+    { "vacuum",                  pa_cli_command_vacuum,             NULL, 1},
     { NULL, NULL, NULL, 0 }
 };
 
@@ -239,23 +241,32 @@ static int pa_cli_command_source_outputs(pa_core *c, pa_tokenizer *t, pa_strbuf
 
 static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
     char s[256];
+    const pa_mempool_stat *stat;
     assert(c && t);
 
-    pa_bytes_snprint(s, sizeof(s), c->memblock_stat->total_size);
+    stat = pa_mempool_get_stat(c->mempool);
+    
     pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n",
-                     c->memblock_stat->total,
-                     s);
+                     stat->n_allocated,
+                     pa_bytes_snprint(s, sizeof(s), stat->allocated_size));
 
-    pa_bytes_snprint(s, sizeof(s), c->memblock_stat->allocated_size);
     pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n",
-                     c->memblock_stat->allocated,
-                     s);
+                     stat->n_accumulated,
+                     pa_bytes_snprint(s, sizeof(s), stat->accumulated_size));
+
+    pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n",
+                     stat->n_imported,
+                     pa_bytes_snprint(s, sizeof(s), stat->imported_size));
 
-    pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c));
-    pa_strbuf_printf(buf, "Total sample cache size: %s.\n", s);
+    pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n",
+                     stat->n_exported,
+                     pa_bytes_snprint(s, sizeof(s), stat->exported_size));
 
-    pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec);
-    pa_strbuf_printf(buf, "Default sample spec: %s\n", s);
+    pa_strbuf_printf(buf, "Total sample cache size: %s.\n",
+                     pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)));
+
+    pa_strbuf_printf(buf, "Default sample spec: %s\n",
+                     pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec));
 
     pa_strbuf_printf(buf, "Default sink name: %s\n"
                      "Default source name: %s\n",
@@ -731,6 +742,15 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf
     return 0;
 }
 
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+    assert(c);
+    assert(t);
+
+    pa_mempool_vacuum(c->mempool);
+    
+    return 0;
+}
+
 static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
     const char *n, *k;
     pa_sink_input *si;
index 377dd56..ca2408f 100644 (file)
@@ -176,7 +176,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3
         filename = buf;
 #endif
 
-    if (pa_sound_file_load(filename, &ss, &map, &chunk, c->memblock_stat) < 0)
+    if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0)
         return -1;
         
     r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx);
@@ -261,7 +261,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t
         return -1;
 
     if (e->lazy && !e->memchunk.memblock) {
-        if (pa_sound_file_load(e->filename, &e->sample_spec, &e->channel_map, &e->memchunk, c->memblock_stat) < 0)
+        if (pa_sound_file_load(c->mempool, e->filename, &e->sample_spec, &e->channel_map, &e->memchunk) < 0)
             return -1;
 
         pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index);
index 7f2f0f6..5fdeab5 100644 (file)
@@ -44,7 +44,7 @@
 
 #include "core.h"
 
-pa_core* pa_core_new(pa_mainloop_api *m) {
+pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
     pa_core* c;
     
     c = pa_xnew(pa_core, 1);
@@ -78,7 +78,7 @@ pa_core* pa_core_new(pa_mainloop_api *m) {
     PA_LLIST_HEAD_INIT(pa_subscription_event, c->subscription_event_queue);
     c->subscription_event_last = NULL;
 
-    c->memblock_stat = pa_memblock_stat_new();
+    c->mempool = pa_mempool_new(shared);
 
     c->disallow_module_loading = 0;
 
@@ -139,7 +139,7 @@ void pa_core_free(pa_core *c) {
     pa_xfree(c->default_source_name);
     pa_xfree(c->default_sink_name);
 
-    pa_memblock_stat_unref(c->memblock_stat);
+    pa_mempool_free(c->mempool);
 
     pa_property_cleanup(c);
 
index f9fa386..3a34d29 100644 (file)
@@ -67,7 +67,7 @@ struct pa_core {
     PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue);
     pa_subscription_event *subscription_event_last;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     int disallow_module_loading, running_as_daemon;
     int exit_idle_time, module_idle_time, scache_idle_time;
@@ -88,7 +88,7 @@ struct pa_core {
         hook_source_disconnect;
 };
 
-pa_core* pa_core_new(pa_mainloop_api *m);
+pa_core* pa_core_new(pa_mainloop_api *m, int shared);
 void pa_core_free(pa_core*c);
 
 /* Check whether noone is connected to this core */
index 8283a7a..9ede610 100644 (file)
 struct pa_mcalign {
     size_t base;
     pa_memchunk leftover, current;
-    pa_memblock_stat *memblock_stat;
 };
 
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
+pa_mcalign *pa_mcalign_new(size_t base) {
     pa_mcalign *m;
     assert(base);
 
@@ -47,7 +46,6 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
     m->base = base;
     pa_memchunk_reset(&m->leftover);
     pa_memchunk_reset(&m->current);
-    m->memblock_stat = s;
     
     return m;
 }
@@ -100,7 +98,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
                 l = c->length;
 
             /* Can we use the current block? */
-            pa_memchunk_make_writable(&m->leftover, m->memblock_stat, m->base);
+            pa_memchunk_make_writable(&m->leftover, m->base);
 
             memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l);
             m->leftover.length += l;
index 80e3749..94e99e2 100644 (file)
@@ -63,7 +63,7 @@
 
 typedef struct pa_mcalign pa_mcalign;
 
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s);
+pa_mcalign *pa_mcalign_new(size_t base);
 void pa_mcalign_free(pa_mcalign *m);
 
 /* Push a new memchunk into the aligner. The caller of this routine
index 36de17f..4ce1b7c 100644 (file)
 #include <stdlib.h>
 #include <assert.h>
 #include <string.h>
+#include <unistd.h>
 
 #include <pulse/xmalloc.h>
 
+#include <pulsecore/shm.h>
+#include <pulsecore/log.h>
+#include <pulsecore/hashmap.h>
+
 #include "memblock.h"
 
-static void stat_add(pa_memblock*m, pa_memblock_stat *s) {
-    assert(m);
+#define PA_MEMPOOL_SLOTS_MAX 128
+#define PA_MEMPOOL_SLOT_SIZE (16*1024)
 
-    if (!s) {
-        m->stat = NULL;
-        return;
-    }
+#define PA_MEMEXPORT_SLOTS_MAX 128
+
+#define PA_MEMIMPORT_SLOTS_MAX 128
+#define PA_MEMIMPORT_SEGMENTS_MAX 16
+
+struct pa_memimport_segment {
+    pa_memimport *import;
+    pa_shm memory;
+    unsigned n_blocks;
+};
+
+struct pa_memimport {
+    pa_mempool *pool;
+    pa_hashmap *segments;
+    pa_hashmap *blocks;
+
+    /* Called whenever an imported memory block is no longer
+     * needed. */
+    pa_memimport_release_cb_t release_cb;
+    void *userdata;
+
+    PA_LLIST_FIELDS(pa_memimport);
+};
+
+struct memexport_slot {
+    PA_LLIST_FIELDS(struct memexport_slot);
+    pa_memblock *block;
+};
+
+struct pa_memexport {
+    pa_mempool *pool;
+    
+    struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
+    PA_LLIST_HEAD(struct memexport_slot, free_slots);
+    PA_LLIST_HEAD(struct memexport_slot, used_slots);
+    unsigned n_init;
+
+    /* Called whenever a client from which we imported a memory block
+       which we in turn exported to another client dies and we need to
+       revoke the memory block accordingly */
+    pa_memexport_revoke_cb_t revoke_cb;
+    void *userdata;
+
+    PA_LLIST_FIELDS(pa_memexport);
+};
+
+struct mempool_slot {
+    PA_LLIST_FIELDS(struct mempool_slot);
+    /* the actual data follows immediately hereafter */
+};
 
-    m->stat = pa_memblock_stat_ref(s);
-    s->total++;
-    s->allocated++;
-    s->total_size += m->length;
-    s->allocated_size += m->length;
+struct pa_mempool {
+    pa_shm memory;
+    size_t block_size;
+    unsigned n_blocks, n_init;
+
+    PA_LLIST_HEAD(pa_memimport, imports);
+    PA_LLIST_HEAD(pa_memexport, exports);
+
+    /* A list of free slots that may be reused */
+    PA_LLIST_HEAD(struct mempool_slot, free_slots);
+    PA_LLIST_HEAD(struct mempool_slot, used_slots);
+    
+    pa_mempool_stat stat;
+};
+
+static void segment_detach(pa_memimport_segment *seg);
+
+static void stat_add(pa_memblock*b) {
+    assert(b);
+    assert(b->pool);
+
+    b->pool->stat.n_allocated ++;
+    b->pool->stat.n_accumulated ++;
+    b->pool->stat.allocated_size += b->length;
+    b->pool->stat.accumulated_size += b->length;
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        b->pool->stat.n_imported++;
+        b->pool->stat.imported_size += b->length;
+    }
 }
 
-static void stat_remove(pa_memblock *m) {
-    assert(m);
+static void stat_remove(pa_memblock *b) {
+    assert(b);
+    assert(b->pool);
 
-    if (!m->stat)
-        return;
+    assert(b->pool->stat.n_allocated > 0);
+    assert(b->pool->stat.allocated_size >= b->length);
+           
+    b->pool->stat.n_allocated --;
+    b->pool->stat.allocated_size -= b->length;
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        assert(b->pool->stat.n_imported > 0);
+        assert(b->pool->stat.imported_size >= b->length);
+        
+        b->pool->stat.n_imported --;
+        b->pool->stat.imported_size -= b->length;
+    }
+}
 
-    m->stat->total--;
-    m->stat->total_size -= m->length;
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
+
+pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
+    pa_memblock *b;
     
-    pa_memblock_stat_unref(m->stat);
-    m->stat = NULL;
+    assert(p);
+    assert(length > 0);
+    
+    if (!(b = pa_memblock_new_pool(p, length)))
+        b = memblock_new_appended(p, length);
+
+    return b;
 }
 
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)+length);
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
+    pa_memblock *b;
+
+    assert(p);
+    assert(length > 0);
+
+    b = pa_xmalloc(sizeof(pa_memblock) + length);
     b->type = PA_MEMBLOCK_APPENDED;
+    b->read_only = 0;
     b->ref = 1;
     b->length = length;
-    b->data = b+1;
-    b->free_cb = NULL;
-    b->read_only = 0;
-    stat_add(b, s);
+    b->data = (uint8_t*) b + sizeof(pa_memblock);
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_dynamic(void *d, size_t length, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
-    b->type = PA_MEMBLOCK_DYNAMIC;
-    b->ref = 1;
+static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
+    struct mempool_slot *slot;
+    assert(p);
+
+    if (p->free_slots) {
+        slot = p->free_slots;
+        PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot);
+    } else if (p->n_init < p->n_blocks)
+        slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++));
+    else {
+        pa_log_debug(__FILE__": Pool full");
+        p->stat.n_pool_full++;
+        return NULL;
+    }
+
+    PA_LLIST_PREPEND(struct mempool_slot, p->used_slots, slot);
+    return slot;
+}
+
+static void* mempool_slot_data(struct mempool_slot *slot) {
+    assert(slot);
+
+    return (uint8_t*) slot + sizeof(struct mempool_slot);
+}
+
+static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
+    assert(p);
+    assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
+    assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
+
+    return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
+}
+
+static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
+    unsigned idx;
+
+    if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
+        return NULL;
+
+    return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
+}
+
+pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
+    pa_memblock *b = NULL;
+    struct mempool_slot *slot;
+
+    assert(p);
+    assert(length > 0);
+
+    if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
+
+        if (!(slot = mempool_allocate_slot(p)))
+            return NULL;
+        
+        b = mempool_slot_data(slot);
+        b->type = PA_MEMBLOCK_POOL;
+        b->data = (uint8_t*) b + sizeof(pa_memblock);
+        
+    } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
+
+        if (!(slot = mempool_allocate_slot(p)))
+            return NULL;
+        
+        b = pa_xnew(pa_memblock, 1);
+        b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+        b->data = mempool_slot_data(slot);
+    } else {
+        pa_log_debug(__FILE__": Memory block to large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
+        p->stat.n_too_large_for_pool++;
+        return NULL;
+    }
+
     b->length = length;
-    b->data = d;
-    b->free_cb = NULL;
     b->read_only = 0;
-    stat_add(b, s);
+    b->ref = 1;
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_fixed(void *d, size_t length, int read_only, pa_memblock_stat*s) {
-    pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
+pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
+    pa_memblock *b;
+
+    assert(p);
+    assert(d);
+    assert(length > 0);
+
+    b = pa_xnew(pa_memblock, 1);
     b->type = PA_MEMBLOCK_FIXED;
+    b->read_only = read_only;
     b->ref = 1;
     b->length = length;
     b->data = d;
-    b->free_cb = NULL;
-    b->read_only = read_only;
-    stat_add(b, s);
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
-pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s) {
+pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
     pa_memblock *b;
-    assert(d && length && free_cb);
-    b = pa_xmalloc(sizeof(pa_memblock));
+
+    assert(p);
+    assert(d);
+    assert(length > 0);
+    assert(free_cb);
+    
+    b = pa_xnew(pa_memblock, 1);
     b->type = PA_MEMBLOCK_USER;
+    b->read_only = read_only;
     b->ref = 1;
     b->length = length;
     b->data = d;
-    b->free_cb = free_cb;
-    b->read_only = read_only;
-    stat_add(b, s);
+    b->per_type.user.free_cb = free_cb;
+    b->pool = p;
+
+    stat_add(b);
     return b;
 }
 
@@ -122,52 +307,458 @@ void pa_memblock_unref(pa_memblock*b) {
     assert(b);
     assert(b->ref >= 1);
 
-    if ((--(b->ref)) == 0) {
-        stat_remove(b);
+    if ((--(b->ref)) > 0)
+        return;
+    
+    stat_remove(b);
+
+    switch (b->type) {
+        case PA_MEMBLOCK_USER :
+            assert(b->per_type.user.free_cb);
+            b->per_type.user.free_cb(b->data);
+
+            /* Fall through */
+
+        case PA_MEMBLOCK_FIXED:
+        case PA_MEMBLOCK_APPENDED :
+            pa_xfree(b);
+            break;
+
+        case PA_MEMBLOCK_IMPORTED : {
+            pa_memimport_segment *segment;
+
+            segment = b->per_type.imported.segment;
+            assert(segment);
+            assert(segment->import);
+            
+            pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
+            segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata);
+
+            if (-- segment->n_blocks <= 0)
+                segment_detach(segment);
+            
+            pa_xfree(b);
+            break;
+        }
 
-        if (b->type == PA_MEMBLOCK_USER) {
-            assert(b->free_cb);
-            b->free_cb(b->data);
-        } else if (b->type == PA_MEMBLOCK_DYNAMIC)
-            pa_xfree(b->data);
+        case PA_MEMBLOCK_POOL_EXTERNAL:
+        case PA_MEMBLOCK_POOL: {
+            struct mempool_slot *slot;
 
-        pa_xfree(b);
+            slot = mempool_slot_by_ptr(b->pool, b->data);
+            assert(slot);
+            
+            PA_LLIST_REMOVE(struct mempool_slot, b->pool->used_slots, slot);
+            PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot);
+            
+            if (b->type == PA_MEMBLOCK_POOL_EXTERNAL)
+                pa_xfree(b);
+        }
     }
 }
 
+static void memblock_make_local(pa_memblock *b) {
+    assert(b);
+
+    if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
+        struct mempool_slot *slot;
+
+        if ((slot = mempool_allocate_slot(b->pool))) {
+            void *new_data;
+            /* We can move it into a local pool, perfect! */
+            
+            b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+            b->read_only = 0;
+
+            new_data = mempool_slot_data(slot);
+            memcpy(new_data, b->data, b->length);
+            b->data = new_data;
+            return;
+        }
+    }
+
+    /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */
+    b->type = PA_MEMBLOCK_USER;
+    b->per_type.user.free_cb = pa_xfree;
+    b->read_only = 0;
+    b->data = pa_xmemdup(b->data, b->length);
+}
+
 void pa_memblock_unref_fixed(pa_memblock *b) {
-    assert(b && b->ref >= 1 && b->type == PA_MEMBLOCK_FIXED);
+    assert(b);
+    assert(b->ref >= 1);
+    assert(b->type == PA_MEMBLOCK_FIXED);
 
-    if (b->ref == 1)
-        pa_memblock_unref(b);
-    else {
-        b->data = pa_xmemdup(b->data, b->length);
-        b->type = PA_MEMBLOCK_DYNAMIC;
-        b->ref--;
+    if (b->ref > 1)
+        memblock_make_local(b);
+
+    pa_memblock_unref(b);
+}
+
+static void memblock_replace_import(pa_memblock *b) {
+    pa_memimport_segment *seg;
+    
+    assert(b);
+    assert(b->type == PA_MEMBLOCK_IMPORTED);
+
+    assert(b->pool->stat.n_imported > 0);
+    assert(b->pool->stat.imported_size >= b->length);
+    b->pool->stat.n_imported --;
+    b->pool->stat.imported_size -= b->length;
+
+    seg = b->per_type.imported.segment;
+    assert(seg);
+    assert(seg->import);
+
+    pa_hashmap_remove(
+            seg->import->blocks,
+            PA_UINT32_TO_PTR(b->per_type.imported.id));
+
+    memblock_make_local(b);
+
+    if (-- seg->n_blocks <= 0)
+        segment_detach(seg);
+}
+
+pa_mempool* pa_mempool_new(int shared) {
+    size_t ps;
+    pa_mempool *p;
+
+    p = pa_xnew(pa_mempool, 1);
+
+    ps = (size_t) sysconf(_SC_PAGESIZE);
+    
+    p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
+
+    if (p->block_size < ps)
+        p->block_size = ps;
+    
+    p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
+
+    assert(p->block_size > sizeof(struct mempool_slot));
+
+    if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
+        pa_xfree(p);
+        return NULL;
+    }
+
+    p->n_init = 0;
+    
+    PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
+    PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
+    PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots);
+    PA_LLIST_HEAD_INIT(struct mempool_slot, p->used_slots);
+
+    memset(&p->stat, 0, sizeof(p->stat));
+
+    return p;
+}
+
+void pa_mempool_free(pa_mempool *p) {
+    assert(p);
+
+    while (p->imports)
+        pa_memimport_free(p->imports);
+
+    while (p->exports)
+        pa_memexport_free(p->exports);
+
+    if (p->stat.n_allocated > 0)
+        pa_log_warn(__FILE__": WARNING! Memory pool destroyed but not all memory blocks freed!");
+    
+    pa_shm_free(&p->memory);
+    pa_xfree(p);
+}
+
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
+    assert(p);
+
+    return &p->stat;
+}
+
+void pa_mempool_vacuum(pa_mempool *p) {
+    struct mempool_slot *slot;
+    
+    assert(p);
+
+    for (slot = p->free_slots; slot; slot = slot->next) {
+        pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot));
     }
 }
 
-pa_memblock_stat* pa_memblock_stat_new(void) {
-    pa_memblock_stat *s;
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
+    assert(p);
 
-    s = pa_xmalloc(sizeof(pa_memblock_stat));
-    s->ref = 1;
-    s->total = s->total_size = s->allocated = s->allocated_size = 0;
+    if (!p->memory.shared)
+        return -1;
 
-    return s;
+    *id = p->memory.id;
+    
+    return 0;
+}
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
+    pa_memimport *i;
+
+    assert(p);
+    assert(cb);
+    
+    i = pa_xnew(pa_memimport, 1);
+    i->pool = p;
+    i->segments = pa_hashmap_new(NULL, NULL);
+    i->blocks = pa_hashmap_new(NULL, NULL);
+    i->release_cb = cb;
+    i->userdata = userdata;
+    
+    PA_LLIST_PREPEND(pa_memimport, p->imports, i);
+    return i;
 }
 
-void pa_memblock_stat_unref(pa_memblock_stat *s) {
-    assert(s && s->ref >= 1);
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
+
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+    pa_memimport_segment* seg;
 
-    if (!(--(s->ref))) {
-        assert(!s->total);
-        pa_xfree(s);
+    if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+        return NULL;
+
+    seg = pa_xnew(pa_memimport_segment, 1);
+    
+    if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
+        pa_xfree(seg);
+        return NULL;
     }
+
+    seg->import = i;
+    seg->n_blocks = 0;
+    
+    pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
+    return seg;
+}
+
+static void segment_detach(pa_memimport_segment *seg) {
+    assert(seg);
+
+    pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
+    pa_shm_free(&seg->memory);
+    pa_xfree(seg);
+}
+
+void pa_memimport_free(pa_memimport *i) {
+    pa_memexport *e;
+    pa_memblock *b;
+    
+    assert(i);
+
+    /* If we've exported this block further we need to revoke that export */
+    for (e = i->pool->exports; e; e = e->next)
+        memexport_revoke_blocks(e, i);
+
+    while ((b = pa_hashmap_get_first(i->blocks)))
+        memblock_replace_import(b);
+
+    assert(pa_hashmap_size(i->segments) == 0);
+
+    pa_hashmap_free(i->blocks, NULL, NULL);
+    pa_hashmap_free(i->segments, NULL, NULL);
+    
+    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
+    pa_xfree(i);
 }
 
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s) {
-    assert(s);
-    s->ref++;
-    return s;
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+    pa_memblock *b;
+    pa_memimport_segment *seg;
+    
+    assert(i);
+
+    if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
+        return NULL;
+
+    if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id)))) 
+        if (!(seg = segment_attach(i, shm_id)))
+            return NULL;
+
+    if (offset+size > seg->memory.size)
+        return NULL;
+    
+    b = pa_xnew(pa_memblock, 1);
+    b->type = PA_MEMBLOCK_IMPORTED;
+    b->read_only = 1;
+    b->ref = 1;
+    b->length = size;
+    b->data = (uint8_t*) seg->memory.ptr + offset;
+    b->pool = i->pool;
+    b->per_type.imported.id = block_id;
+    b->per_type.imported.segment = seg;
+
+    pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
+
+    seg->n_blocks++;
+    
+    stat_add(b);
+    
+    return b;
+}
+
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
+    pa_memblock *b;
+    assert(i);
+
+    if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))
+        return -1;
+    
+    memblock_replace_import(b);
+    return 0;
+}
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
+    pa_memexport *e;
+    
+    assert(p);
+    assert(cb);
+
+    if (!p->memory.shared)
+        return NULL;
+    
+    e = pa_xnew(pa_memexport, 1);
+    e->pool = p;
+    PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
+    PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
+    e->n_init = 0;
+    e->revoke_cb = cb;
+    e->userdata = userdata;
+    
+    PA_LLIST_PREPEND(pa_memexport, p->exports, e);
+    return e;
+}
+
+void pa_memexport_free(pa_memexport *e) {
+    assert(e);
+
+    while (e->used_slots)
+        pa_memexport_process_release(e, e->used_slots - e->slots);
+
+    PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
+    pa_xfree(e);
+}
+
+int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
+    assert(e);
+
+    if (id >= e->n_init)
+        return -1;
+
+    if (!e->slots[id].block)
+        return -1;
+
+/*     pa_log("Processing release for %u", id); */
+
+    assert(e->pool->stat.n_exported > 0);
+    assert(e->pool->stat.exported_size >= e->slots[id].block->length);
+    
+    e->pool->stat.n_exported --;
+    e->pool->stat.exported_size -= e->slots[id].block->length;
+    
+    pa_memblock_unref(e->slots[id].block);
+    e->slots[id].block = NULL;
+
+    PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
+    PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
+
+    return 0;
+}
+
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
+    struct memexport_slot *slot, *next;
+    assert(e);
+    assert(i);
+
+    for (slot = e->used_slots; slot; slot = next) {
+        uint32_t idx;
+        next = slot->next;
+        
+        if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
+            slot->block->per_type.imported.segment->import != i)
+            continue;
+
+        idx = slot - e->slots;
+        e->revoke_cb(e, idx, e->userdata);
+        pa_memexport_process_release(e, idx);
+    }
+}
+
+static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
+    pa_memblock *n;
+
+    assert(p);
+    assert(b);
+    
+    if (b->type == PA_MEMBLOCK_IMPORTED ||
+        b->type == PA_MEMBLOCK_POOL ||
+        b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
+        assert(b->pool == p);
+        return pa_memblock_ref(b);
+    }
+
+    if (!(n = pa_memblock_new_pool(p, b->length)))
+        return NULL;
+
+    memcpy(n->data, b->data, b->length);
+    return n;
+}
+
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
+    pa_shm *memory;
+    struct memexport_slot *slot;
+    
+    assert(e);
+    assert(b);
+    assert(block_id);
+    assert(shm_id);
+    assert(offset);
+    assert(size);
+    assert(b->pool == e->pool);
+
+    if (!(b = memblock_shared_copy(e->pool, b)))
+        return -1;
+
+    if (e->free_slots) {
+        slot = e->free_slots;
+        PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
+    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) {
+        slot = &e->slots[e->n_init++];
+    } else {
+        pa_memblock_unref(b);
+        return -1;
+    }
+
+    PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
+    slot->block = b;
+    *block_id = slot - e->slots;
+
+/*     pa_log("Got block id %u", *block_id); */
+
+    if (b->type == PA_MEMBLOCK_IMPORTED) {
+        assert(b->per_type.imported.segment);
+        memory = &b->per_type.imported.segment->memory;
+    } else {
+        assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
+        assert(b->pool);
+        memory = &b->pool->memory;
+    }
+        
+    assert(b->data >= memory->ptr);
+    assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size);
+    
+    *shm_id = memory->id;
+    *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr;
+    *size = b->length;
+
+    e->pool->stat.n_exported ++;
+    e->pool->stat.exported_size += b->length;
+
+    return 0;
 }
index 04a0b55..e63e1e0 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef foomemblockhfoo
-#define foomemblockhfoo
+#ifndef foopulsememblockhfoo
+#define foopulsememblockhfoo
 
 /* $Id$ */
 
@@ -25,6 +25,8 @@
 #include <sys/types.h>
 #include <inttypes.h>
 
+#include <pulsecore/llist.h>
+
 /* A pa_memblock is a reference counted memory block. PulseAudio
  * passed references to pa_memblocks around instead of copying
  * data. See pa_memchunk for a structure that describes parts of
 
 /* The type of memory this block points to */
 typedef enum pa_memblock_type {
-    PA_MEMBLOCK_FIXED,     /* data is a pointer to fixed memory that needs not to be freed */
-    PA_MEMBLOCK_APPENDED,  /* The most common kind: the data is appended to the memory block */ 
-    PA_MEMBLOCK_DYNAMIC,   /* data is a pointer to some memory allocated with pa_xmalloc() */
-    PA_MEMBLOCK_USER       /* User supplied memory, to be freed with free_cb */
+    PA_MEMBLOCK_POOL,             /* Memory is part of the memory pool */
+    PA_MEMBLOCK_POOL_EXTERNAL,    /* Data memory is part of the memory pool but the pa_memblock structure itself not */
+    PA_MEMBLOCK_APPENDED,         /* the data is appended to the memory block */ 
+    PA_MEMBLOCK_USER,             /* User supplied memory, to be freed with free_cb */
+    PA_MEMBLOCK_FIXED,            /* data is a pointer to fixed memory that needs not to be freed */
+    PA_MEMBLOCK_IMPORTED,         /* Memory is imported from another process via shm */
 } pa_memblock_type_t;
 
-/* A structure of keeping memory block statistics */
-/* Maintains statistics about memory blocks */
-typedef struct pa_memblock_stat {
-    int ref;
-    unsigned total;
-    unsigned total_size;
-    unsigned allocated;
-    unsigned allocated_size;
-} pa_memblock_stat;
-
-typedef struct pa_memblock {
+typedef struct pa_memblock pa_memblock;
+typedef struct pa_mempool pa_mempool;
+typedef struct pa_mempool_stat pa_mempool_stat;
+typedef struct pa_memimport_segment pa_memimport_segment;
+typedef struct pa_memimport pa_memimport;
+typedef struct pa_memexport pa_memexport;
+
+typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);
+typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata);
+
+struct pa_memblock {
     pa_memblock_type_t type;
-    unsigned ref;  /* the reference counter */
     int read_only; /* boolean */
+    unsigned ref;  /* the reference counter */
     size_t length;
     void *data;
-    void (*free_cb)(void *p);  /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
-    pa_memblock_stat *stat;
-} pa_memblock;
+    pa_mempool *pool;
 
-/* Allocate a new memory block of type PA_MEMBLOCK_APPENDED */
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s);
+    union {
+        struct {
+            void (*free_cb)(void *p);  /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
+        } user;
+            
+        struct  {
+            uint32_t id;
+            pa_memimport_segment *segment;
+        } imported;
+    } per_type;
+};
 
-/* Allocate a new memory block of type PA_MEMBLOCK_DYNAMIC. The pointer data is to be maintained be the memory block */
-pa_memblock *pa_memblock_new_dynamic(void *data, size_t length, pa_memblock_stat*s);
+struct pa_mempool_stat {
+    unsigned n_allocated;
+    unsigned n_accumulated;
+    unsigned n_imported;
+    unsigned n_exported;
+    size_t allocated_size;
+    size_t accumulated_size;
+    size_t imported_size;
+    size_t exported_size;
 
-/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
-pa_memblock *pa_memblock_new_fixed(void *data, size_t length, int read_only, pa_memblock_stat*s);
+    unsigned n_too_large_for_pool;
+    unsigned n_pool_full;
+};
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */
+pa_memblock *pa_memblock_new(pa_mempool *, size_t length);
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL. If the requested size is too large, return NULL */
+pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length);
 
 /* Allocate a new memory block of type PA_MEMBLOCK_USER */
-pa_memblock *pa_memblock_new_user(void *data, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s);
+pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only);
+
+/* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc()  */
+#define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0)
+
+/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
+pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only);
 
 void pa_memblock_unref(pa_memblock*b);
 pa_memblock* pa_memblock_ref(pa_memblock*b);
@@ -79,8 +110,23 @@ references to the memory. This causes the memory to be copied and
 converted into a PA_MEMBLOCK_DYNAMIC type memory block */
 void pa_memblock_unref_fixed(pa_memblock*b);
 
-pa_memblock_stat* pa_memblock_stat_new(void);
-void pa_memblock_stat_unref(pa_memblock_stat *s);
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s);
+/* The memory block manager */
+pa_mempool* pa_mempool_new(int shared);
+void pa_mempool_free(pa_mempool *p);
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
+void pa_mempool_vacuum(pa_mempool *p);
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
+void pa_memimport_free(pa_memimport *i);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
+void pa_memexport_free(pa_memexport *e);
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_process_release(pa_memexport *e, uint32_t id);
 
 #endif
index 822bd66..2fd3885 100644 (file)
@@ -49,7 +49,6 @@ struct pa_memblockq {
     size_t maxlength, tlength, base, prebuf, minreq;
     int64_t read_index, write_index;
     enum { PREBUF, RUNNING } state;
-    pa_memblock_stat *memblock_stat;
     pa_memblock *silence;
     pa_mcalign *mcalign;
 };
@@ -61,8 +60,7 @@ pa_memblockq* pa_memblockq_new(
         size_t base,
         size_t prebuf,
         size_t minreq,
-        pa_memblock *silence,
-        pa_memblock_stat *s) {
+        pa_memblock *silence) {
     
     pa_memblockq* bq;
     
@@ -75,7 +73,6 @@ pa_memblockq* pa_memblockq_new(
 
     bq->base = base;
     bq->read_index = bq->write_index = idx;
-    bq->memblock_stat = s;
 
     pa_log_debug(__FILE__": memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
         (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq);
@@ -586,7 +583,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
         return pa_memblockq_push(bq, chunk);
        
     if (!bq->mcalign)
-        bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
+        bq->mcalign = pa_mcalign_new(bq->base);
 
     if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
         return -1;
index c35b62d..4d701a8 100644 (file)
@@ -69,8 +69,7 @@ pa_memblockq* pa_memblockq_new(
         size_t base,
         size_t prebuf, 
         size_t minreq,
-        pa_memblock *silence,
-        pa_memblock_stat *s);
+        pa_memblock *silence);
 
 void pa_memblockq_free(pa_memblockq*bq);
 
index abfc2ca..bcf0ce0 100644 (file)
 
 #include "memchunk.h"
 
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) {
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
     pa_memblock *n;
     size_t l;
-    assert(c && c->memblock && c->memblock->ref >= 1);
+    
+    assert(c);
+    assert(c->memblock);
+    assert(c->memblock->ref >= 1);
 
     if (c->memblock->ref == 1 && !c->memblock->read_only && c->memblock->length >= c->index+min)
         return;
@@ -44,7 +47,7 @@ void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min)
     if (l < min)
         l = min;
     
-    n = pa_memblock_new(l, s);
+    n = pa_memblock_new(c->memblock->pool, l);
     memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length);
     pa_memblock_unref(c->memblock);
     c->memblock = n;
index 1b26c2e..b8ce624 100644 (file)
@@ -36,7 +36,7 @@ typedef struct pa_memchunk {
 /* Make a memchunk writable, i.e. make sure that the caller may have
  * exclusive access to the memblock and it is not read_only. If needed
  * the memblock in the structure is replaced by a copy. */
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min);
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min);
 
 /* Invalidate a memchunk. This does not free the cotaining memblock,
  * but sets all members to zero. */
index f1a827b..2fadeca 100644 (file)
@@ -377,8 +377,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t
             pa_frame_size(&ss),
             (size_t) -1,
             l/PLAYBACK_BUFFER_FRAGMENTS,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
     c->playback.fragment_size = l/10;
 
@@ -469,8 +468,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co
             pa_frame_size(&ss),
             1,
             0,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2);
     
     c->source_output->push = source_output_push_cb;
@@ -722,7 +720,7 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_
     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
     
     assert(!c->scache.memchunk.memblock);
-    c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat);
+    c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length);
     c->scache.memchunk.index = 0;
     c->scache.memchunk.length = sc_length;
     c->scache.sample_spec = ss;
@@ -941,7 +939,7 @@ static int do_read(struct connection *c) {
             }
         
         if (!c->playback.current_memblock) {
-            c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+            c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
             assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
             c->playback.memblock_index = 0;
         }
index 0b79892..2c9b356 100644 (file)
@@ -348,8 +348,7 @@ static struct record_stream* record_stream_new(
             base = pa_frame_size(ss),
             1,
             0,
-            NULL,
-            c->protocol->core->memblock_stat);
+            NULL);
     assert(s->memblockq);
 
     s->fragment_size = (fragment_size/base)*base;
@@ -448,7 +447,7 @@ static struct playback_stream* playback_stream_new(
         start_index = 0;
     }
     
-    silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+    silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
     
     s->memblockq = pa_memblockq_new(
             start_index,
@@ -457,8 +456,7 @@ static struct playback_stream* playback_stream_new(
             pa_frame_size(ss),
             prebuf,
             minreq,
-            silence,
-            c->protocol->core->memblock_stat);
+            silence);
 
     pa_memblock_unref(silence);
     
@@ -1076,6 +1074,7 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
 static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
     struct connection *c = userdata;
     pa_tagstruct *reply;
+    const pa_mempool_stat *stat;
     assert(c && t);
 
     if (!pa_tagstruct_eof(t)) {
@@ -1085,11 +1084,13 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
 
     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
 
+    stat = pa_mempool_get_stat(c->protocol->core->mempool);
+    
     reply = reply_new(tag);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
-    pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
+    pa_tagstruct_putu32(reply, stat->n_allocated);
+    pa_tagstruct_putu32(reply, stat->allocated_size);
+    pa_tagstruct_putu32(reply, stat->n_accumulated);
+    pa_tagstruct_putu32(reply, stat->accumulated_size);
     pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
     pa_pstream_send_tagstruct(c->pstream, reply);
 }
@@ -2256,7 +2257,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
                 pa_memblock_ref(u->memchunk.memblock);
                 u->length = 0;
             } else {
-                u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
+                u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
                 u->memchunk.index = u->memchunk.length = 0;
             }
         }
@@ -2349,9 +2350,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
     c->client->userdata = c;
     c->client->owner = p->module;
     
-    c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
+    c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
     assert(c->pstream);
 
+    pa_pstream_use_shm(c->pstream, 1);
+
     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
index 3705986..924ee29 100644 (file)
@@ -128,7 +128,7 @@ static int do_read(struct connection *c) {
         }
 
     if (!c->playback.current_memblock) {
-        c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+        c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
         c->playback.memblock_index = 0;
     }
@@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
                 pa_frame_size(&p->sample_spec),
                 (size_t) -1,
                 l/PLAYBACK_BUFFER_FRAGMENTS,
-                NULL,
-                p->core->memblock_stat);
+                NULL);
         assert(c->input_memblockq);
         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
         c->playback.fragment_size = l/10;
@@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
                 pa_frame_size(&p->sample_spec),
                 1,
                 0,
-                NULL,
-                p->core->memblock_stat);
+                NULL);
         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
         pa_source_notify(c->source_output->source);
     }
index 7096d65..421f5de 100644 (file)
 
 #include "pstream.h"
 
+/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
+#define PA_FLAG_SHMDATA    0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE  0xC0000000LU
+#define PA_FLAG_SHMMASK    0xFF000000LU
+#define PA_FLAG_SEEKMASK   0x000000FFLU
+
+/* The sequence descriptor header consists of 5 32bit integers: */
 enum {
     PA_PSTREAM_DESCRIPTOR_LENGTH,
     PA_PSTREAM_DESCRIPTOR_CHANNEL,
     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
-    PA_PSTREAM_DESCRIPTOR_SEEK,
+    PA_PSTREAM_DESCRIPTOR_FLAGS,
     PA_PSTREAM_DESCRIPTOR_MAX
 };
 
+/* If we have an SHM block, this info follows the descriptor */
+enum {
+    PA_PSTREAM_SHM_BLOCKID,
+    PA_PSTREAM_SHM_SHMID,
+    PA_PSTREAM_SHM_INDEX,
+    PA_PSTREAM_SHM_LENGTH,
+    PA_PSTREAM_SHM_MAX
+};
+
 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
 
 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
 #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
 
 struct item_info {
-    enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
+    enum {
+        PA_PSTREAM_ITEM_PACKET,
+        PA_PSTREAM_ITEM_MEMBLOCK,
+        PA_PSTREAM_ITEM_SHMRELEASE,
+        PA_PSTREAM_ITEM_SHMREVOKE
+    } type;
 
-    /* memblock info */
-    pa_memchunk chunk;
-    uint32_t channel;
-    int64_t offset;
-    pa_seek_mode_t seek_mode;
 
     /* packet info */
     pa_packet *packet;
@@ -78,6 +95,15 @@ struct item_info {
     int with_creds;
     pa_creds creds;
 #endif
+
+    /* memblock info */
+    pa_memchunk chunk;
+    uint32_t channel;
+    int64_t offset;
+    pa_seek_mode_t seek_mode;
+
+    /* release/revoke info */
+    uint32_t block_id;
 };
 
 struct pa_pstream {
@@ -91,20 +117,26 @@ struct pa_pstream {
     int dead;
 
     struct {
-        struct item_info* current;
         pa_pstream_descriptor descriptor;
+        struct item_info* current;
+        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
         void *data;
         size_t index;
     } write;
 
     struct {
+        pa_pstream_descriptor descriptor;
         pa_memblock *memblock;
         pa_packet *packet;
-        pa_pstream_descriptor descriptor;
+        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
         void *data;
         size_t index;
     } read;
 
+    int use_shm;
+    pa_memimport *import;
+    pa_memexport *export;
+
     pa_pstream_packet_cb_t recieve_packet_callback;
     void *recieve_packet_callback_userdata;
 
@@ -117,7 +149,7 @@ struct pa_pstream {
     pa_pstream_notify_cb_t die_callback;
     void *die_callback_userdata;
 
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
 #ifdef HAVE_CREDS
     pa_creds read_creds, write_creds;
@@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata)
     do_something(p);
 }
 
-pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
+
+pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
     pa_pstream *p;
+    
+    assert(m);
     assert(io);
+    assert(pool);
 
     p = pa_xnew(pa_pstream, 1);
-    
     p->ref = 1;
     p->io = io;
     pa_iochannel_set_callback(io, io_callback, p);
-
     p->dead = 0;
 
     p->mainloop = m;
@@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta
 
     p->write.current = NULL;
     p->write.index = 0;
-
     p->read.memblock = NULL;
     p->read.packet = NULL;
     p->read.index = 0;
 
     p->recieve_packet_callback = NULL;
     p->recieve_packet_callback_userdata = NULL;
-    
     p->recieve_memblock_callback = NULL;
     p->recieve_memblock_callback_userdata = NULL;
-
     p->drain_callback = NULL;
     p->drain_callback_userdata = NULL;
-
     p->die_callback = NULL;
     p->die_callback_userdata = NULL;
 
-    p->memblock_stat = s;
+    p->mempool = pool;
+
+    p->use_shm = 0;
+    p->export = NULL;
+    p->import = NULL; 
 
     pa_iochannel_socket_set_rcvbuf(io, 1024*8); 
     pa_iochannel_socket_set_sndbuf(io, 1024*8);
@@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) {
     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
         assert(i->chunk.memblock);
         pa_memblock_unref(i->chunk.memblock);
-    } else {
-        assert(i->type == PA_PSTREAM_ITEM_PACKET);
+    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
         assert(i->packet);
         pa_packet_unref(i->packet);
     }
@@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) {
 
 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
     struct item_info *i;
-    assert(p && packet && p->ref >= 1);
+
+    assert(p);
+    assert(p->ref >= 1);
+    assert(packet);
 
     if (p->dead)
         return;
     
-/*     pa_log(__FILE__": push-packet %p", packet); */
-    
     i = pa_xnew(struct item_info, 1);
     i->type = PA_PSTREAM_ITEM_PACKET;
     i->packet = pa_packet_ref(packet);
+    
 #ifdef HAVE_CREDS
     if ((i->with_creds = !!creds))
         i->creds = *creds;
@@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
 
 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
     struct item_info *i;
-    assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
+    
+    assert(p);
+    assert(p->ref >= 1);
+    assert(channel != (uint32_t) -1);
+    assert(chunk);
 
     if (p->dead)
         return;
-    
-/*     pa_log(__FILE__": push-memblock %p", chunk); */
-    
+
     i = pa_xnew(struct item_info, 1);
     i->type = PA_PSTREAM_ITEM_MEMBLOCK;
     i->chunk = *chunk;
@@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
     p->mainloop->defer_enable(p->defer_event, 1);
 }
 
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
+    struct item_info *item;
+    pa_pstream *p = userdata;
+
+    assert(p);
+    assert(p->ref >= 1);
+    
+    if (p->dead)
+        return;
+
+/*     pa_log(__FILE__": Releasing block %u", block_id); */
+
+    item = pa_xnew(struct item_info, 1);
+    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
+    item->block_id = block_id;
+#ifdef HAVE_CREDS
+    item->with_creds = 0;
+#endif
+
+    pa_queue_push(p->send_queue, item);
+    p->mainloop->defer_enable(p->defer_event, 1);
+}
+
+static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
+    struct item_info *item;
+    pa_pstream *p = userdata;
+
+    assert(p);
+    assert(p->ref >= 1);
+    
+    if (p->dead)
+        return;
+
+/*     pa_log(__FILE__": Revoking block %u", block_id); */
+    
+    item = pa_xnew(struct item_info, 1);
+    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
+    item->block_id = block_id;
+#ifdef HAVE_CREDS
+    item->with_creds = 0;
+#endif
+
+    pa_queue_push(p->send_queue, item);
+    p->mainloop->defer_enable(p->defer_event, 1);
+}
+
 static void prepare_next_write_item(pa_pstream *p) {
     assert(p);
 
@@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) {
         return;
     
     p->write.index = 0;
+    p->write.data = NULL;
+
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
     
     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
-        /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
         
         assert(p->write.current->packet);
         p->write.data = p->write.current->packet->data;
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
 
+    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
+
+    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
         
     } else {
-        assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
-        p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+        uint32_t flags;
+        int send_payload = 1;
+        
+        assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+        assert(p->write.current->chunk.memblock);
+        
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
-        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
+
+        flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
+
+        if (p->use_shm) {
+            uint32_t block_id, shm_id;
+            size_t offset, length;
+
+            assert(p->export);
+
+            if (pa_memexport_put(p->export,
+                                 p->write.current->chunk.memblock,
+                                 &block_id,
+                                 &shm_id,
+                                 &offset,
+                                 &length) >= 0) {
+                
+                flags |= PA_FLAG_SHMDATA;
+                send_payload = 0;
+                
+                p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+                p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+                p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+                p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+                
+                p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
+                p->write.data = p->write.shm_info;
+            }
+/*             else */
+/*                 pa_log_warn(__FILE__": Failed to export memory block."); */
+        }
+
+        if (send_payload) {
+            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+            p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+        }
+        
+        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
     }
 
 #ifdef HAVE_CREDS
@@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) {
         p->write_creds = p->write.current->creds;
     
 #endif
-
 }
 
 static int do_write(pa_pstream *p) {
@@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) {
     if (!p->write.current)
         return 0;
 
-    assert(p->write.data);
-
     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
         d = (uint8_t*) p->write.descriptor + p->write.index;
         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
     } else {
+        assert(p->write.data);
+    
         d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
     }
 
+    assert(l > 0);
+        
 #ifdef HAVE_CREDS
     if (p->send_creds_now) {
 
@@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) {
 
     p->write.index += r;
 
-    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
         assert(p->write.current);
         item_free(p->write.current, (void *) 1);
         p->write.current = NULL;
@@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) {
     p->read.index += r;
 
     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+        uint32_t flags, length, channel;
         /* Reading of frame descriptor complete */
 
-        /* Frame size too large */
-        if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
-            pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
+        flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
+            pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled.");
+            return -1;
+        }
+        
+        if (flags == PA_FLAG_SHMRELEASE) {
+
+            /* This is a SHM memblock release frame with no payload */
+
+/*             pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+            
+            assert(p->export);
+            pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+            goto frame_done;
+            
+        } else if (flags == PA_FLAG_SHMREVOKE) {
+
+            /* This is a SHM memblock revoke frame with no payload */
+
+/*             pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+            assert(p->import);
+            pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+            goto frame_done;
+        }
+
+        length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+        
+        if (length > FRAME_SIZE_MAX) {
+            pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length);
             return -1;
         }
         
         assert(!p->read.packet && !p->read.memblock);
 
-        if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
+        channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+        
+        if (channel == (uint32_t) -1) {
+
+            if (flags != 0) {
+                pa_log_warn(__FILE__": Received packet frame with invalid flags value.");
+                return -1;
+            }
+            
             /* Frame is a packet frame */
-            p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
+            p->read.packet = pa_packet_new(length);
             p->read.data = p->read.packet->data;
+            
         } else {
-            /* Frame is a memblock frame */
-            p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
-            p->read.data = p->read.memblock->data;
 
-            if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
-                pa_log_warn(__FILE__": Invalid seek mode");
+            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
+                pa_log_warn(__FILE__": Received memblock frame with invalid seek mode.");
+                return -1;
+            }
+            
+            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+
+                if (length != sizeof(p->read.shm_info)) {
+                    pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length.");
+                    return -1;
+                }
+            
+                /* Frame is a memblock frame referencing an SHM memblock */
+                p->read.data = p->read.shm_info;
+
+            } else if ((flags & PA_FLAG_SHMMASK) == 0) {
+
+                /* Frame is a memblock frame */
+                
+                p->read.memblock = pa_memblock_new(p->mempool, length);
+                p->read.data = p->read.memblock->data;
+            } else {
+                
+                pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value.");
                 return -1;
             }
         }
@@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) {
     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
         /* Frame payload available */
         
-        if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
+        if (p->read.memblock && p->recieve_memblock_callback) {
+
+            /* Is this memblock data? Than pass it to the user */
             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
                 
             if (l > 0) {
@@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) {
                         p,
                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                         offset,
-                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
+                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                         &chunk,
                         p->recieve_memblock_callback_userdata);
                 }
 
                 /* Drop seek info for following callbacks */
-                p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
+                p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
             }
@@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) {
 
         /* Frame complete */
         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+            
             if (p->read.memblock) {
-                assert(!p->read.packet);
-                
+
+                /* This was a memblock frame. We can unref the memblock now */
                 pa_memblock_unref(p->read.memblock);
-                p->read.memblock = NULL;
-            } else {
-                assert(p->read.packet);
+
+            } else if (p->read.packet) {
                 
                 if (p->recieve_packet_callback)
 #ifdef HAVE_CREDS
@@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) {
 #endif
 
                 pa_packet_unref(p->read.packet);
-                p->read.packet = NULL;
+            } else {
+                pa_memblock *b;
+                
+                assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+
+                assert(p->import);
+
+                if (!(b = pa_memimport_get(p->import,
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
+                                          ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+
+                    pa_log_warn(__FILE__": Failed to import memory block.");
+                    return -1;
+                }
+
+                if (p->recieve_memblock_callback) {
+                    int64_t offset;
+                    pa_memchunk chunk;
+                    
+                    chunk.memblock = b;
+                    chunk.index = 0;
+                    chunk.length = b->length;
+
+                    offset = (int64_t) (
+                            (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+                            (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+                    
+                    p->recieve_memblock_callback(
+                            p,
+                            ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+                            offset,
+                            ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+                            &chunk,
+                            p->recieve_memblock_callback_userdata);
+                }
+
+                pa_memblock_unref(b);
             }
 
-            p->read.index = 0;
-#ifdef HAVE_CREDS
-            p->read_creds_valid = 0;
-#endif
+            goto frame_done;
         }
     }
 
-    return 0;   
+    return 0;
+
+frame_done:
+    p->read.memblock = NULL;
+    p->read.packet = NULL;
+    p->read.index = 0;
+
+#ifdef HAVE_CREDS
+    p->read_creds_valid = 0;
+#endif
+
+    return 0;
 }
 
 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
@@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) {
 
     p->dead = 1;
 
+    if (p->import) {
+        pa_memimport_free(p->import);
+        p->import = NULL;
+    }
+
+    if (p->export) {
+        pa_memexport_free(p->export);
+        p->export = NULL;
+    }
+
     if (p->io) {
         pa_iochannel_free(p->io);
         p->io = NULL;
@@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) {
     p->drain_callback = NULL;
     p->recieve_packet_callback = NULL;
     p->recieve_memblock_callback = NULL;
+
+
+}
+
+void pa_pstream_use_shm(pa_pstream *p, int enable) {
+    assert(p);
+    assert(p->ref >= 1);
+
+    p->use_shm = enable;
+
+    if (!p->import)
+        p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
+
+    if (!p->export)
+        p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
 }
index 789e40b..26bb769 100644 (file)
@@ -39,7 +39,7 @@ typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const p
 typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
 typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata);
 
-pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s);
+pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *p);
 void pa_pstream_unref(pa_pstream*p);
 pa_pstream* pa_pstream_ref(pa_pstream*p);
 
@@ -54,6 +54,8 @@ void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void
 
 int pa_pstream_is_pending(pa_pstream *p);
 
+void pa_pstream_use_shm(pa_pstream *p, int enable);
+
 void pa_pstream_close(pa_pstream *p);
 
 #endif
index 23cdf38..7422671 100644 (file)
@@ -42,7 +42,7 @@ struct pa_resampler {
     pa_sample_spec i_ss, o_ss;
     pa_channel_map i_cm, o_cm;
     size_t i_fz, o_fz;
-    pa_memblock_stat *memblock_stat;
+    pa_mempool *mempool;
 
     void (*impl_free)(pa_resampler *r);
     void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
@@ -71,15 +71,16 @@ static int libsamplerate_init(pa_resampler*r);
 static int trivial_init(pa_resampler*r);
 
 pa_resampler* pa_resampler_new(
-    const pa_sample_spec *a,
-    const pa_channel_map *am,
-    const pa_sample_spec *b,
-    const pa_channel_map *bm,
-    pa_memblock_stat *s,
-    pa_resample_method_t resample_method) {
+        pa_mempool *pool,
+        const pa_sample_spec *a,
+        const pa_channel_map *am,
+        const pa_sample_spec *b,
+        const pa_channel_map *bm,
+        pa_resample_method_t resample_method) {
     
     pa_resampler *r = NULL;
 
+    assert(pool);
     assert(a);
     assert(b);
     assert(pa_sample_spec_valid(a));
@@ -88,7 +89,7 @@ pa_resampler* pa_resampler_new(
 
     r = pa_xnew(pa_resampler, 1);
     r->impl_data = NULL;
-    r->memblock_stat = s;
+    r->mempool = pool;
     r->resample_method = resample_method;
 
     r->impl_free = NULL;
@@ -450,7 +451,7 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun
             assert(p);
 
             /* Take the existing buffer and make it a memblock */
-            out->memblock = pa_memblock_new_dynamic(*p, out->length, r->memblock_stat);
+            out->memblock = pa_memblock_new_malloced(r->mempool, *p, out->length);
             *p = NULL;
         }
     } else {
@@ -549,7 +550,7 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
         l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;
         
         out->index = 0;
-        out->memblock = pa_memblock_new(l, r->memblock_stat);
+        out->memblock = pa_memblock_new(r->mempool, l);
         
         for (o_index = 0;; o_index++, u->o_counter++) {
             unsigned j;
index c1199e5..327e24a 100644 (file)
@@ -43,12 +43,12 @@ typedef enum pa_resample_method {
 } pa_resample_method_t;
 
 pa_resampler* pa_resampler_new(
-    const pa_sample_spec *a,
-    const pa_channel_map *am,
-    const pa_sample_spec *b,
-    const pa_channel_map *bm,
-    pa_memblock_stat *s,
-    pa_resample_method_t resample_method);
+        pa_mempool *pool,
+        const pa_sample_spec *a,
+        const pa_channel_map *am,
+        const pa_sample_spec *b,
+        const pa_channel_map *bm,
+        pa_resample_method_t resample_method);
 
 void pa_resampler_free(pa_resampler *r);
 
index 638f806..7f5d8a0 100644 (file)
 #include "sample-util.h"
 #include "endianmacros.h"
 
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) {
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) {
+    assert(pool);
     assert(spec);
 
     if (length == 0)
-        length = pa_bytes_per_second(spec)/10; /* 100 ms */
+        length = pa_bytes_per_second(spec)/20; /* 50 ms */
 
-    return pa_silence_memblock(pa_memblock_new(length, s), spec);
+    return pa_silence_memblock(pa_memblock_new(pool, length), spec);
 }
 
 pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
index 3ebb7e2..6b77079 100644 (file)
@@ -28,7 +28,7 @@
 #include <pulsecore/memchunk.h>
 
 pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec);
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s);
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length);
 void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec);
 void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec);
 
index b3fabad..b5ba9df 100644 (file)
@@ -136,9 +136,9 @@ pa_sink_input* pa_sink_input_new(
         !pa_channel_map_equal(&data->channel_map, &data->sink->channel_map))
         
         if (!(resampler = pa_resampler_new(
+                      core->mempool, 
                       &data->sample_spec, &data->channel_map,
                       &data->sink->sample_spec, &data->sink->channel_map,
-                      core->memblock_stat,
                       data->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return NULL;
@@ -299,7 +299,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
          * while until the old sink has drained its playback buffer */
         
         if (!i->silence_memblock)
-            i->silence_memblock = pa_silence_memblock_new(&i->sink->sample_spec, SILENCE_BUFFER_LENGTH, i->sink->core->memblock_stat);
+            i->silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH);
 
         chunk->memblock = pa_memblock_ref(i->silence_memblock);
         chunk->index = 0;
@@ -338,7 +338,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
 
         /* It might be necessary to adjust the volume here */
         if (do_volume_adj_here && !volume_is_norm) {
-            pa_memchunk_make_writable(&tchunk, i->sink->core->memblock_stat, 0);
+            pa_memchunk_make_writable(&tchunk, 0);
             pa_volume_memchunk(&tchunk, &i->sample_spec, &i->volume);
         }
 
@@ -540,9 +540,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
         /* Okey, we need a new resampler for the new sink */
         
         if (!(new_resampler = pa_resampler_new(
+                      dest->core->mempool,
                       &i->sample_spec, &i->channel_map,
                       &dest->sample_spec, &dest->channel_map,
-                      dest->core->memblock_stat,
                       i->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return -1;
@@ -553,7 +553,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
         pa_usec_t old_latency, new_latency;
         pa_usec_t silence_usec = 0;
 
-        buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL, NULL);
+        buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL);
         
         /* Let's do a little bit of Voodoo for compensating latency
          * differences */
@@ -599,7 +599,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
                 chunk.length = n;
 
                 if (!volume_is_norm) {
-                    pa_memchunk_make_writable(&chunk, origin->core->memblock_stat, 0);
+                    pa_memchunk_make_writable(&chunk, 0);
                     pa_volume_memchunk(&chunk, &origin->sample_spec, &volume);
                 }
 
index 557d5ef..aacb89f 100644 (file)
@@ -298,14 +298,14 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
         pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
         
         if (s->sw_muted || !pa_cvolume_is_norm(&volume)) {
-            pa_memchunk_make_writable(result, s->core->memblock_stat, 0);
+            pa_memchunk_make_writable(result, 0);
             if (s->sw_muted)
                 pa_silence_memchunk(result, &s->sample_spec);
             else
                 pa_volume_memchunk(result, &s->sample_spec, &volume);
         }
     } else {
-        result->memblock = pa_memblock_new(length, s->core->memblock_stat);
+        result->memblock = pa_memblock_new(s->core->mempool, length);
         assert(result->memblock);
 
 /*          pa_log("mixing %i", n);  */
@@ -429,7 +429,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) {
 
     /*** This needs optimization ***/
     
-    result->memblock = pa_memblock_new(result->length = length, s->core->memblock_stat);
+    result->memblock = pa_memblock_new(s->core->mempool, result->length = length);
     result->index = 0;
 
     pa_sink_render_into_full(s, result);
index 6063b93..6782f50 100644 (file)
@@ -75,7 +75,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
         uint32_t fs = pa_frame_size(&i->sample_spec);
         sf_count_t n;
 
-        u->memchunk.memblock = pa_memblock_new(BUF_SIZE, i->sink->core->memblock_stat);
+        u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
         u->memchunk.index = 0;
 
         if (u->readf_function) {
index d11d4b9..256cce4 100644 (file)
@@ -34,7 +34,7 @@
 #include "sound-file.h"
 #include "core-scache.h"
 
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s) {
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk) {
     SNDFILE*sf = NULL;
     SF_INFO sfinfo;
     int ret = -1;
@@ -92,7 +92,7 @@ int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *ma
         goto finish;
     }
 
-    chunk->memblock = pa_memblock_new(l, s);
+    chunk->memblock = pa_memblock_new(pool, l);
     assert(chunk->memblock);
     chunk->index = 0;
     chunk->length = l;
index 0b81d97..7e3c82e 100644 (file)
@@ -26,7 +26,7 @@
 #include <pulse/channelmap.h>
 #include <pulsecore/memchunk.h>
 
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s);
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk);
 
 int pa_sound_file_too_big_to_cache(const char *fname);
 
index 7371474..f9d66f6 100644 (file)
@@ -115,9 +115,9 @@ pa_source_output* pa_source_output_new(
     if (!pa_sample_spec_equal(&data->sample_spec, &data->source->sample_spec) ||
         !pa_channel_map_equal(&data->channel_map, &data->source->channel_map))
         if (!(resampler = pa_resampler_new(
+                      core->mempool,
                       &data->source->sample_spec, &data->source->channel_map,
                       &data->sample_spec, &data->channel_map,
-                      core->memblock_stat,
                       data->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return NULL;
@@ -330,9 +330,9 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
         /* Okey, we need a new resampler for the new sink */
         
         if (!(new_resampler = pa_resampler_new(
+                      dest->core->mempool,
                       &dest->sample_spec, &dest->channel_map,
                       &o->sample_spec, &o->channel_map,
-                      dest->core->memblock_stat,
                       o->resample_method))) {
             pa_log_warn(__FILE__": Unsupported resampling operation.");
             return -1;
index 0d55da4..cb5b103 100644 (file)
@@ -211,7 +211,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
         pa_memchunk vchunk = *chunk;
         
         pa_memblock_ref(vchunk.memblock);
-        pa_memchunk_make_writable(&vchunk, s->core->memblock_stat, 0);
+        pa_memchunk_make_writable(&vchunk, 0);
         if (s->sw_muted)
             pa_silence_memchunk(&vchunk, &s->sample_spec);
         else