Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / memblock.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as
11   published by the Free Software Foundation; either version 2.1 of the
12   License, or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   Lesser General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public
20   License along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <string.h>
33 #include <unistd.h>
34
35 #include <pulse/xmalloc.h>
36 #include <pulse/def.h>
37
38 #include <pulsecore/shm.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/hashmap.h>
41 #include <pulsecore/semaphore.h>
42 #include <pulsecore/macro.h>
43 #include <pulsecore/flist.h>
44
45 #include "memblock.h"
46
47 #define PA_MEMPOOL_SLOTS_MAX 128
48 #define PA_MEMPOOL_SLOT_SIZE (16*1024)
49
50 #define PA_MEMEXPORT_SLOTS_MAX 128
51
52 #define PA_MEMIMPORT_SLOTS_MAX 128
53 #define PA_MEMIMPORT_SEGMENTS_MAX 16
54
55 struct pa_memblock {
56     PA_REFCNT_DECLARE; /* the reference counter */
57     pa_mempool *pool;
58
59     pa_memblock_type_t type;
60     int read_only; /* boolean */
61
62     pa_atomic_ptr_t data;
63     size_t length;
64
65     pa_atomic_t n_acquired;
66     pa_atomic_t please_signal;
67
68     union {
69         struct {
70             /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
71             pa_free_cb_t free_cb;
72         } user;
73
74         struct  {
75             uint32_t id;
76             pa_memimport_segment *segment;
77         } imported;
78     } per_type;
79 };
80
81 struct pa_memimport_segment {
82     pa_memimport *import;
83     pa_shm memory;
84     unsigned n_blocks;
85 };
86
87 struct pa_memimport {
88     pa_mutex *mutex;
89
90     pa_mempool *pool;
91     pa_hashmap *segments;
92     pa_hashmap *blocks;
93
94     /* Called whenever an imported memory block is no longer
95      * needed. */
96     pa_memimport_release_cb_t release_cb;
97     void *userdata;
98
99     PA_LLIST_FIELDS(pa_memimport);
100 };
101
102 struct memexport_slot {
103     PA_LLIST_FIELDS(struct memexport_slot);
104     pa_memblock *block;
105 };
106
107 struct pa_memexport {
108     pa_mutex *mutex;
109     pa_mempool *pool;
110
111     struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
112
113     PA_LLIST_HEAD(struct memexport_slot, free_slots);
114     PA_LLIST_HEAD(struct memexport_slot, used_slots);
115     unsigned n_init;
116
117     /* Called whenever a client from which we imported a memory block
118        which we in turn exported to another client dies and we need to
119        revoke the memory block accordingly */
120     pa_memexport_revoke_cb_t revoke_cb;
121     void *userdata;
122
123     PA_LLIST_FIELDS(pa_memexport);
124 };
125
126 struct mempool_slot {
127     PA_LLIST_FIELDS(struct mempool_slot);
128     /* the actual data follows immediately hereafter */
129 };
130
131 struct pa_mempool {
132     pa_semaphore *semaphore;
133     pa_mutex *mutex;
134
135     pa_shm memory;
136     size_t block_size;
137     unsigned n_blocks;
138
139     pa_atomic_t n_init;
140
141     PA_LLIST_HEAD(pa_memimport, imports);
142     PA_LLIST_HEAD(pa_memexport, exports);
143
144     /* A list of free slots that may be reused */
145     pa_flist *free_slots;
146
147     pa_mempool_stat stat;
148 };
149
150 static void segment_detach(pa_memimport_segment *seg);
151
152 /* No lock necessary */
153 static void stat_add(pa_memblock*b) {
154     assert(b);
155     assert(b->pool);
156
157     pa_atomic_inc(&b->pool->stat.n_allocated);
158     pa_atomic_add(&b->pool->stat.allocated_size, b->length);
159
160     pa_atomic_inc(&b->pool->stat.n_accumulated);
161     pa_atomic_add(&b->pool->stat.accumulated_size, b->length);
162
163     if (b->type == PA_MEMBLOCK_IMPORTED) {
164         pa_atomic_inc(&b->pool->stat.n_imported);
165         pa_atomic_add(&b->pool->stat.imported_size, b->length);
166     }
167
168     pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
169     pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
170 }
171
172 /* No lock necessary */
173 static void stat_remove(pa_memblock *b) {
174     assert(b);
175     assert(b->pool);
176
177     assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
178     assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);
179
180     pa_atomic_dec(&b->pool->stat.n_allocated);
181     pa_atomic_sub(&b->pool->stat.allocated_size,  b->length);
182
183     if (b->type == PA_MEMBLOCK_IMPORTED) {
184         assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
185         assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
186
187         pa_atomic_dec(&b->pool->stat.n_imported);
188         pa_atomic_sub(&b->pool->stat.imported_size, b->length);
189     }
190
191     pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
192 }
193
194 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
195
196 /* No lock necessary */
197 pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
198     pa_memblock *b;
199
200     assert(p);
201     assert(length > 0);
202
203     if (!(b = pa_memblock_new_pool(p, length)))
204         b = memblock_new_appended(p, length);
205
206     return b;
207 }
208
209 /* No lock necessary */
210 static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
211     pa_memblock *b;
212
213     assert(p);
214     assert(length > 0);
215
216     b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
217     PA_REFCNT_INIT(b);
218     b->pool = p;
219     b->type = PA_MEMBLOCK_APPENDED;
220     b->read_only = 0;
221     pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
222     b->length = length;
223     pa_atomic_store(&b->n_acquired, 0);
224     pa_atomic_store(&b->please_signal, 0);
225
226     stat_add(b);
227     return b;
228 }
229
230 /* No lock necessary */
231 static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
232     struct mempool_slot *slot;
233     assert(p);
234
235     if (!(slot = pa_flist_pop(p->free_slots))) {
236         int idx;
237
238         /* The free list was empty, we have to allocate a new entry */
239
240         if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
241             pa_atomic_dec(&p->n_init);
242         else
243             slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));
244
245         if (!slot) {
246             pa_log_debug("Pool full");
247             pa_atomic_inc(&p->stat.n_pool_full);
248             return NULL;
249         }
250     }
251
252     return slot;
253 }
254
255 /* No lock necessary */
256 static void* mempool_slot_data(struct mempool_slot *slot) {
257     assert(slot);
258
259     return (uint8_t*) slot + sizeof(struct mempool_slot);
260 }
261
262 /* No lock necessary */
263 static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
264     assert(p);
265
266     assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
267     assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
268
269     return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
270 }
271
272 /* No lock necessary */
273 static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
274     unsigned idx;
275
276     if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
277         return NULL;
278
279     return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
280 }
281
282 /* No lock necessary */
283 pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
284     pa_memblock *b = NULL;
285     struct mempool_slot *slot;
286
287     assert(p);
288     assert(length > 0);
289
290     if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
291
292         if (!(slot = mempool_allocate_slot(p)))
293             return NULL;
294
295         b = mempool_slot_data(slot);
296         b->type = PA_MEMBLOCK_POOL;
297         pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));
298
299     } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
300
301         if (!(slot = mempool_allocate_slot(p)))
302             return NULL;
303
304         b = pa_xnew(pa_memblock, 1);
305         b->type = PA_MEMBLOCK_POOL_EXTERNAL;
306         pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));
307
308     } else {
309         pa_log_debug("Memory block too large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
310         pa_atomic_inc(&p->stat.n_too_large_for_pool);
311         return NULL;
312     }
313
314     PA_REFCNT_INIT(b);
315     b->pool = p;
316     b->read_only = 0;
317     b->length = length;
318     pa_atomic_store(&b->n_acquired, 0);
319     pa_atomic_store(&b->please_signal, 0);
320
321     stat_add(b);
322     return b;
323 }
324
325 /* No lock necessary */
326 pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
327     pa_memblock *b;
328
329     assert(p);
330     assert(d);
331     assert(length > 0);
332
333     b = pa_xnew(pa_memblock, 1);
334     PA_REFCNT_INIT(b);
335     b->pool = p;
336     b->type = PA_MEMBLOCK_FIXED;
337     b->read_only = read_only;
338     pa_atomic_ptr_store(&b->data, d);
339     b->length = length;
340     pa_atomic_store(&b->n_acquired, 0);
341     pa_atomic_store(&b->please_signal, 0);
342
343     stat_add(b);
344     return b;
345 }
346
347 /* No lock necessary */
348 pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
349     pa_memblock *b;
350
351     assert(p);
352     assert(d);
353     assert(length > 0);
354     assert(free_cb);
355
356     b = pa_xnew(pa_memblock, 1);
357     PA_REFCNT_INIT(b);
358     b->pool = p;
359     b->type = PA_MEMBLOCK_USER;
360     b->read_only = read_only;
361     pa_atomic_ptr_store(&b->data, d);
362     b->length = length;
363     pa_atomic_store(&b->n_acquired, 0);
364     pa_atomic_store(&b->please_signal, 0);
365
366     b->per_type.user.free_cb = free_cb;
367
368     stat_add(b);
369     return b;
370 }
371
372 /* No lock necessary */
373 int pa_memblock_is_read_only(pa_memblock *b) {
374     assert(b);
375     assert(PA_REFCNT_VALUE(b) > 0);
376
377     return b->read_only && PA_REFCNT_VALUE(b) == 1;
378 }
379
380 /* No lock necessary */
381 void* pa_memblock_acquire(pa_memblock *b) {
382     assert(b);
383     assert(PA_REFCNT_VALUE(b) > 0);
384
385     pa_atomic_inc(&b->n_acquired);
386
387     return pa_atomic_ptr_load(&b->data);
388 }
389
390 /* No lock necessary, in corner cases locks by its own */
391 void pa_memblock_release(pa_memblock *b) {
392     int r;
393     assert(b);
394     assert(PA_REFCNT_VALUE(b) > 0);
395
396     r = pa_atomic_dec(&b->n_acquired);
397     assert(r >= 1);
398
399     /* Signal a waiting thread that this memblock is no longer used */
400     if (r == 1 && pa_atomic_load(&b->please_signal))
401         pa_semaphore_post(b->pool->semaphore);
402 }
403
404 size_t pa_memblock_get_length(pa_memblock *b) {
405     assert(b);
406     assert(PA_REFCNT_VALUE(b) > 0);
407
408     return b->length;
409 }
410
411 pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
412     assert(b);
413     assert(PA_REFCNT_VALUE(b) > 0);
414
415     return b->pool;
416 }
417
418 /* No lock necessary */
419 pa_memblock* pa_memblock_ref(pa_memblock*b) {
420     assert(b);
421     assert(PA_REFCNT_VALUE(b) > 0);
422
423     PA_REFCNT_INC(b);
424     return b;
425 }
426
427 static void memblock_free(pa_memblock *b) {
428     assert(b);
429
430     assert(pa_atomic_load(&b->n_acquired) == 0);
431
432     stat_remove(b);
433
434     switch (b->type) {
435         case PA_MEMBLOCK_USER :
436             assert(b->per_type.user.free_cb);
437             b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));
438
439             /* Fall through */
440
441         case PA_MEMBLOCK_FIXED:
442         case PA_MEMBLOCK_APPENDED :
443             pa_xfree(b);
444             break;
445
446         case PA_MEMBLOCK_IMPORTED : {
447             pa_memimport_segment *segment;
448             pa_memimport *import;
449
450             /* FIXME! This should be implemented lock-free */
451
452             segment = b->per_type.imported.segment;
453             assert(segment);
454             import = segment->import;
455             assert(import);
456
457             pa_mutex_lock(import->mutex);
458             pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
459             if (-- segment->n_blocks <= 0)
460                 segment_detach(segment);
461
462             pa_mutex_unlock(import->mutex);
463
464             import->release_cb(import, b->per_type.imported.id, import->userdata);
465
466             pa_xfree(b);
467             break;
468         }
469
470         case PA_MEMBLOCK_POOL_EXTERNAL:
471         case PA_MEMBLOCK_POOL: {
472             struct mempool_slot *slot;
473             int call_free;
474
475             slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
476             assert(slot);
477
478             call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;
479
480             /* The free list dimensions should easily allow all slots
481              * to fit in, hence try harder if pushing this slot into
482              * the free list fails */
483             while (pa_flist_push(b->pool->free_slots, slot) < 0)
484                 ;
485
486             if (call_free)
487                 pa_xfree(b);
488
489             break;
490         }
491
492         case PA_MEMBLOCK_TYPE_MAX:
493         default:
494             abort();
495     }
496 }
497
498 /* No lock necessary */
499 void pa_memblock_unref(pa_memblock*b) {
500     assert(b);
501     assert(PA_REFCNT_VALUE(b) > 0);
502
503     if (PA_REFCNT_DEC(b) > 0)
504         return;
505
506     memblock_free(b);
507 }
508
509 /* Self locked */
510 static void memblock_wait(pa_memblock *b) {
511     assert(b);
512
513     if (pa_atomic_load(&b->n_acquired) > 0) {
514         /* We need to wait until all threads gave up access to the
515          * memory block before we can go on. Unfortunately this means
516          * that we have to lock and wait here. Sniff! */
517
518         pa_atomic_inc(&b->please_signal);
519
520         while (pa_atomic_load(&b->n_acquired) > 0)
521             pa_semaphore_wait(b->pool->semaphore);
522
523         pa_atomic_dec(&b->please_signal);
524     }
525 }
526
527 /* No lock necessary. This function is not multiple caller safe! */
528 static void memblock_make_local(pa_memblock *b) {
529     assert(b);
530
531     pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
532
533     if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
534         struct mempool_slot *slot;
535
536         if ((slot = mempool_allocate_slot(b->pool))) {
537             void *new_data;
538             /* We can move it into a local pool, perfect! */
539
540             new_data = mempool_slot_data(slot);
541             memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
542             pa_atomic_ptr_store(&b->data, new_data);
543
544             b->type = PA_MEMBLOCK_POOL_EXTERNAL;
545             b->read_only = 0;
546
547             goto finish;
548         }
549     }
550
551     /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */
552     b->per_type.user.free_cb = pa_xfree;
553     pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));
554
555     b->type = PA_MEMBLOCK_USER;
556     b->read_only = 0;
557
558 finish:
559     pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
560     pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
561     memblock_wait(b);
562 }
563
564 /* No lock necessary. This function is not multiple caller safe*/
565 void pa_memblock_unref_fixed(pa_memblock *b) {
566     assert(b);
567     assert(PA_REFCNT_VALUE(b) > 0);
568     assert(b->type == PA_MEMBLOCK_FIXED);
569
570     if (PA_REFCNT_DEC(b) > 0)
571         memblock_make_local(b);
572     else
573         memblock_free(b);
574 }
575
576 /* Self-locked. This function is not multiple-caller safe */
577 static void memblock_replace_import(pa_memblock *b) {
578     pa_memimport_segment *seg;
579
580     assert(b);
581     assert(b->type == PA_MEMBLOCK_IMPORTED);
582
583     assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
584     assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
585     pa_atomic_dec(&b->pool->stat.n_imported);
586     pa_atomic_sub(&b->pool->stat.imported_size, b->length);
587
588     seg = b->per_type.imported.segment;
589     assert(seg);
590     assert(seg->import);
591
592     pa_mutex_lock(seg->import->mutex);
593
594     pa_hashmap_remove(
595             seg->import->blocks,
596             PA_UINT32_TO_PTR(b->per_type.imported.id));
597
598     memblock_make_local(b);
599
600     if (-- seg->n_blocks <= 0)
601         segment_detach(seg);
602
603     pa_mutex_unlock(seg->import->mutex);
604 }
605
606 pa_mempool* pa_mempool_new(int shared) {
607     size_t ps;
608     pa_mempool *p;
609
610     p = pa_xnew(pa_mempool, 1);
611
612     p->mutex = pa_mutex_new(1);
613     p->semaphore = pa_semaphore_new(0);
614
615 #ifdef HAVE_SYSCONF
616     ps = (size_t) sysconf(_SC_PAGESIZE);
617 #elif defined(PAGE_SIZE)
618     ps = (size_t) PAGE_SIZE;
619 #else
620     ps = 4096; /* Let's hope it's like x86. */
621 #endif
622
623     p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
624
625     if (p->block_size < ps)
626         p->block_size = ps;
627
628     p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
629
630     assert(p->block_size > sizeof(struct mempool_slot));
631
632     if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
633         pa_xfree(p);
634         return NULL;
635     }
636
637     memset(&p->stat, 0, sizeof(p->stat));
638     pa_atomic_store(&p->n_init, 0);
639
640     PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
641     PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
642
643     p->free_slots = pa_flist_new(p->n_blocks*2);
644
645     return p;
646 }
647
648 void pa_mempool_free(pa_mempool *p) {
649     assert(p);
650
651     pa_mutex_lock(p->mutex);
652
653     while (p->imports)
654         pa_memimport_free(p->imports);
655
656     while (p->exports)
657         pa_memexport_free(p->exports);
658
659     pa_mutex_unlock(p->mutex);
660
661     if (pa_atomic_load(&p->stat.n_allocated) > 0)
662         pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!");
663
664     pa_flist_free(p->free_slots, NULL);
665     pa_shm_free(&p->memory);
666
667     pa_mutex_free(p->mutex);
668     pa_semaphore_free(p->semaphore);
669
670     pa_xfree(p);
671 }
672
673 /* No lock necessary */
674 const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
675     assert(p);
676
677     return &p->stat;
678 }
679
680 /* No lock necessary */
681 void pa_mempool_vacuum(pa_mempool *p) {
682     struct mempool_slot *slot;
683     pa_flist *list;
684
685     assert(p);
686
687     list = pa_flist_new(p->n_blocks*2);
688
689     while ((slot = pa_flist_pop(p->free_slots)))
690         while (pa_flist_push(list, slot) < 0)
691             ;
692
693     while ((slot = pa_flist_pop(list))) {
694         pa_shm_punch(&p->memory,
695                      (uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot),
696                      p->block_size - sizeof(struct mempool_slot));
697
698         while (pa_flist_push(p->free_slots, slot))
699             ;
700     }
701
702     pa_flist_free(list, NULL);
703 }
704
705 /* No lock necessary */
706 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
707     assert(p);
708
709     if (!p->memory.shared)
710         return -1;
711
712     *id = p->memory.id;
713
714     return 0;
715 }
716
717 /* No lock necessary */
718 int pa_mempool_is_shared(pa_mempool *p) {
719     assert(p);
720
721     return !!p->memory.shared;
722 }
723
724 /* For recieving blocks from other nodes */
725 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
726     pa_memimport *i;
727
728     assert(p);
729     assert(cb);
730
731     i = pa_xnew(pa_memimport, 1);
732     i->mutex = pa_mutex_new(0);
733     i->pool = p;
734     i->segments = pa_hashmap_new(NULL, NULL);
735     i->blocks = pa_hashmap_new(NULL, NULL);
736     i->release_cb = cb;
737     i->userdata = userdata;
738
739     pa_mutex_lock(p->mutex);
740     PA_LLIST_PREPEND(pa_memimport, p->imports, i);
741     pa_mutex_unlock(p->mutex);
742
743     return i;
744 }
745
746 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
747
748 /* Should be called locked */
749 static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
750     pa_memimport_segment* seg;
751
752     if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
753         return NULL;
754
755     seg = pa_xnew(pa_memimport_segment, 1);
756
757     if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
758         pa_xfree(seg);
759         return NULL;
760     }
761
762     seg->import = i;
763     seg->n_blocks = 0;
764
765     pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
766     return seg;
767 }
768
769 /* Should be called locked */
770 static void segment_detach(pa_memimport_segment *seg) {
771     assert(seg);
772
773     pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
774     pa_shm_free(&seg->memory);
775     pa_xfree(seg);
776 }
777
778 /* Self-locked. Not multiple-caller safe */
779 void pa_memimport_free(pa_memimport *i) {
780     pa_memexport *e;
781     pa_memblock *b;
782
783     assert(i);
784
785     pa_mutex_lock(i->mutex);
786
787     while ((b = pa_hashmap_get_first(i->blocks)))
788         memblock_replace_import(b);
789
790     assert(pa_hashmap_size(i->segments) == 0);
791
792     pa_mutex_unlock(i->mutex);
793
794     pa_mutex_lock(i->pool->mutex);
795
796     /* If we've exported this block further we need to revoke that export */
797     for (e = i->pool->exports; e; e = e->next)
798         memexport_revoke_blocks(e, i);
799
800     PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
801
802     pa_mutex_unlock(i->pool->mutex);
803
804     pa_hashmap_free(i->blocks, NULL, NULL);
805     pa_hashmap_free(i->segments, NULL, NULL);
806
807     pa_mutex_free(i->mutex);
808
809     pa_xfree(i);
810 }
811
812 /* Self-locked */
813 pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
814     pa_memblock *b = NULL;
815     pa_memimport_segment *seg;
816
817     assert(i);
818
819     pa_mutex_lock(i->mutex);
820
821     if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
822         goto finish;
823
824     if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
825         if (!(seg = segment_attach(i, shm_id)))
826             goto finish;
827
828     if (offset+size > seg->memory.size)
829         goto finish;
830
831     b = pa_xnew(pa_memblock, 1);
832     PA_REFCNT_INIT(b);
833     b->pool = i->pool;
834     b->type = PA_MEMBLOCK_IMPORTED;
835     b->read_only = 1;
836     pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
837     b->length = size;
838     pa_atomic_store(&b->n_acquired, 0);
839     pa_atomic_store(&b->please_signal, 0);
840     b->per_type.imported.id = block_id;
841     b->per_type.imported.segment = seg;
842
843     pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
844
845     seg->n_blocks++;
846
847 finish:
848     pa_mutex_unlock(i->mutex);
849
850     if (b)
851     stat_add(b);
852
853     return b;
854 }
855
856 int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
857     pa_memblock *b;
858     assert(i);
859
860     pa_mutex_lock(i->mutex);
861
862     if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))
863         return -1;
864
865     memblock_replace_import(b);
866
867     pa_mutex_unlock(i->mutex);
868
869     return 0;
870 }
871
872 /* For sending blocks to other nodes */
873 pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
874     pa_memexport *e;
875
876     assert(p);
877     assert(cb);
878
879     if (!p->memory.shared)
880         return NULL;
881
882     e = pa_xnew(pa_memexport, 1);
883     e->mutex = pa_mutex_new(1);
884     e->pool = p;
885     PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
886     PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
887     e->n_init = 0;
888     e->revoke_cb = cb;
889     e->userdata = userdata;
890
891     pa_mutex_lock(p->mutex);
892     PA_LLIST_PREPEND(pa_memexport, p->exports, e);
893     pa_mutex_unlock(p->mutex);
894     return e;
895 }
896
897 void pa_memexport_free(pa_memexport *e) {
898     assert(e);
899
900     pa_mutex_lock(e->mutex);
901     while (e->used_slots)
902         pa_memexport_process_release(e, e->used_slots - e->slots);
903     pa_mutex_unlock(e->mutex);
904
905     pa_mutex_lock(e->pool->mutex);
906     PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
907     pa_mutex_unlock(e->pool->mutex);
908
909     pa_xfree(e);
910 }
911
912 /* Self-locked */
913 int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
914     pa_memblock *b;
915
916     assert(e);
917
918     pa_mutex_lock(e->mutex);
919
920     if (id >= e->n_init)
921         goto fail;
922
923     if (!e->slots[id].block)
924         goto fail;
925
926     b = e->slots[id].block;
927     e->slots[id].block = NULL;
928
929     PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
930     PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
931
932     pa_mutex_unlock(e->mutex);
933
934 /*     pa_log("Processing release for %u", id); */
935
936     assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
937     assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);
938
939     pa_atomic_dec(&e->pool->stat.n_exported);
940     pa_atomic_sub(&e->pool->stat.exported_size, b->length);
941
942     pa_memblock_unref(b);
943
944     return 0;
945
946 fail:
947     pa_mutex_unlock(e->mutex);
948
949     return -1;
950 }
951
952 /* Self-locked */
953 static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
954     struct memexport_slot *slot, *next;
955     assert(e);
956     assert(i);
957
958     pa_mutex_lock(e->mutex);
959
960     for (slot = e->used_slots; slot; slot = next) {
961         uint32_t idx;
962         next = slot->next;
963
964         if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
965             slot->block->per_type.imported.segment->import != i)
966             continue;
967
968         idx = slot - e->slots;
969         e->revoke_cb(e, idx, e->userdata);
970         pa_memexport_process_release(e, idx);
971     }
972
973     pa_mutex_unlock(e->mutex);
974 }
975
976 /* No lock necessary */
977 static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
978     pa_memblock *n;
979
980     assert(p);
981     assert(b);
982
983     if (b->type == PA_MEMBLOCK_IMPORTED ||
984         b->type == PA_MEMBLOCK_POOL ||
985         b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
986         assert(b->pool == p);
987         return pa_memblock_ref(b);
988     }
989
990     if (!(n = pa_memblock_new_pool(p, b->length)))
991         return NULL;
992
993     memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
994     return n;
995 }
996
997 /* Self-locked */
998 int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
999     pa_shm *memory;
1000     struct memexport_slot *slot;
1001     void *data;
1002     size_t length;
1003
1004     assert(e);
1005     assert(b);
1006     assert(block_id);
1007     assert(shm_id);
1008     assert(offset);
1009     assert(size);
1010     assert(b->pool == e->pool);
1011
1012     if (!(b = memblock_shared_copy(e->pool, b)))
1013         return -1;
1014
1015     pa_mutex_lock(e->mutex);
1016
1017     if (e->free_slots) {
1018         slot = e->free_slots;
1019         PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
1020     } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
1021         slot = &e->slots[e->n_init++];
1022     else {
1023         pa_mutex_unlock(e->mutex);
1024         pa_memblock_unref(b);
1025         return -1;
1026     }
1027
1028     PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
1029     slot->block = b;
1030     *block_id = slot - e->slots;
1031
1032     pa_mutex_unlock(e->mutex);
1033 /*     pa_log("Got block id %u", *block_id); */
1034
1035     data = pa_memblock_acquire(b);
1036
1037     if (b->type == PA_MEMBLOCK_IMPORTED) {
1038         assert(b->per_type.imported.segment);
1039         memory = &b->per_type.imported.segment->memory;
1040     } else {
1041         assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
1042         assert(b->pool);
1043         memory = &b->pool->memory;
1044     }
1045
1046     assert(data >= memory->ptr);
1047     assert((uint8_t*) data + length <= (uint8_t*) memory->ptr + memory->size);
1048
1049     *shm_id = memory->id;
1050     *offset = (uint8_t*) data - (uint8_t*) memory->ptr;
1051     *size = length;
1052
1053     pa_memblock_release(b);
1054
1055     pa_atomic_inc(&e->pool->stat.n_exported);
1056     pa_atomic_add(&e->pool->stat.exported_size, length);
1057
1058     return 0;
1059 }