Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as
11   published by the Free Software Foundation; either version 2.1 of the
12   License, or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   Lesser General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public
20   License along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <unistd.h>
33
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
42 #endif
43
44 #include "winsock.h"
45
46 #include <pulse/xmalloc.h>
47
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/refcnt.h>
53
54 #include "pstream.h"
55
56 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
57 #define PA_FLAG_SHMDATA    0x80000000LU
58 #define PA_FLAG_SHMRELEASE 0x40000000LU
59 #define PA_FLAG_SHMREVOKE  0xC0000000LU
60 #define PA_FLAG_SHMMASK    0xFF000000LU
61 #define PA_FLAG_SEEKMASK   0x000000FFLU
62
63 /* The sequence descriptor header consists of 5 32bit integers: */
64 enum {
65     PA_PSTREAM_DESCRIPTOR_LENGTH,
66     PA_PSTREAM_DESCRIPTOR_CHANNEL,
67     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
68     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
69     PA_PSTREAM_DESCRIPTOR_FLAGS,
70     PA_PSTREAM_DESCRIPTOR_MAX
71 };
72
73 /* If we have an SHM block, this info follows the descriptor */
74 enum {
75     PA_PSTREAM_SHM_BLOCKID,
76     PA_PSTREAM_SHM_SHMID,
77     PA_PSTREAM_SHM_INDEX,
78     PA_PSTREAM_SHM_LENGTH,
79     PA_PSTREAM_SHM_MAX
80 };
81
82 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
83
84 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
85 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 #define FRAME_SIZE_MAX_USE (1024*64)
87
88 struct item_info {
89     enum {
90         PA_PSTREAM_ITEM_PACKET,
91         PA_PSTREAM_ITEM_MEMBLOCK,
92         PA_PSTREAM_ITEM_SHMRELEASE,
93         PA_PSTREAM_ITEM_SHMREVOKE
94     } type;
95
96
97     /* packet info */
98     pa_packet *packet;
99 #ifdef HAVE_CREDS
100     int with_creds;
101     pa_creds creds;
102 #endif
103
104     /* memblock info */
105     pa_memchunk chunk;
106     uint32_t channel;
107     int64_t offset;
108     pa_seek_mode_t seek_mode;
109
110     /* release/revoke info */
111     uint32_t block_id;
112 };
113
114 struct pa_pstream {
115     PA_REFCNT_DECLARE;
116
117     pa_mainloop_api *mainloop;
118     pa_defer_event *defer_event;
119     pa_iochannel *io;
120
121     pa_queue *send_queue;
122
123     int dead;
124
125     struct {
126         pa_pstream_descriptor descriptor;
127         struct item_info* current;
128         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
129         void *data;
130         size_t index;
131         pa_memchunk memchunk;
132     } write;
133
134     struct {
135         pa_pstream_descriptor descriptor;
136         pa_memblock *memblock;
137         pa_packet *packet;
138         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139         void *data;
140         size_t index;
141     } read;
142
143     int use_shm;
144     pa_memimport *import;
145     pa_memexport *export;
146
147     pa_pstream_packet_cb_t recieve_packet_callback;
148     void *recieve_packet_callback_userdata;
149
150     pa_pstream_memblock_cb_t recieve_memblock_callback;
151     void *recieve_memblock_callback_userdata;
152
153     pa_pstream_notify_cb_t drain_callback;
154     void *drain_callback_userdata;
155
156     pa_pstream_notify_cb_t die_callback;
157     void *die_callback_userdata;
158
159     pa_mempool *mempool;
160
161 #ifdef HAVE_CREDS
162     pa_creds read_creds, write_creds;
163     int read_creds_valid, send_creds_now;
164 #endif
165 };
166
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
169
170 static void do_something(pa_pstream *p) {
171     assert(p);
172     assert(PA_REFCNT_VALUE(p) > 0);
173
174     pa_pstream_ref(p);
175
176     p->mainloop->defer_enable(p->defer_event, 0);
177
178     if (!p->dead && pa_iochannel_is_readable(p->io)) {
179         if (do_read(p) < 0)
180             goto fail;
181     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
182         goto fail;
183
184     if (!p->dead && pa_iochannel_is_writable(p->io)) {
185         if (do_write(p) < 0)
186             goto fail;
187     }
188
189     pa_pstream_unref(p);
190     return;
191
192 fail:
193
194     p->dead = 1;
195
196     if (p->die_callback)
197         p->die_callback(p, p->die_callback_userdata);
198
199     pa_pstream_unref(p);
200 }
201
202 static void io_callback(pa_iochannel*io, void *userdata) {
203     pa_pstream *p = userdata;
204
205     assert(p);
206     assert(p->io == io);
207
208     do_something(p);
209 }
210
211 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
212
213 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
214     pa_pstream *p;
215
216     assert(m);
217     assert(io);
218     assert(pool);
219
220     p = pa_xnew(pa_pstream, 1);
221     PA_REFCNT_INIT(p);
222     p->io = io;
223     pa_iochannel_set_callback(io, io_callback, p);
224     p->dead = 0;
225
226     p->mainloop = m;
227
228     p->send_queue = pa_queue_new();
229     assert(p->send_queue);
230
231     p->write.current = NULL;
232     p->write.index = 0;
233     pa_memchunk_reset(&p->write.memchunk);
234     p->read.memblock = NULL;
235     p->read.packet = NULL;
236     p->read.index = 0;
237
238     p->recieve_packet_callback = NULL;
239     p->recieve_packet_callback_userdata = NULL;
240     p->recieve_memblock_callback = NULL;
241     p->recieve_memblock_callback_userdata = NULL;
242     p->drain_callback = NULL;
243     p->drain_callback_userdata = NULL;
244     p->die_callback = NULL;
245     p->die_callback_userdata = NULL;
246
247     p->mempool = pool;
248
249     p->use_shm = 0;
250     p->export = NULL;
251
252     /* We do importing unconditionally */
253     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
254
255     pa_iochannel_socket_set_rcvbuf(io, 1024*8);
256     pa_iochannel_socket_set_sndbuf(io, 1024*8);
257
258 #ifdef HAVE_CREDS
259     p->send_creds_now = 0;
260     p->read_creds_valid = 0;
261 #endif
262     return p;
263 }
264
265 static void item_free(void *item, PA_GCC_UNUSED void *p) {
266     struct item_info *i = item;
267     assert(i);
268
269     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
270         assert(i->chunk.memblock);
271         pa_memblock_unref(i->chunk.memblock);
272     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
273         assert(i->packet);
274         pa_packet_unref(i->packet);
275     }
276
277     pa_xfree(i);
278 }
279
280 static void pstream_free(pa_pstream *p) {
281     assert(p);
282
283     pa_pstream_close(p);
284
285     pa_queue_free(p->send_queue, item_free, NULL);
286
287     if (p->write.current)
288         item_free(p->write.current, NULL);
289
290     if (p->read.memblock)
291         pa_memblock_unref(p->read.memblock);
292
293     if (p->read.packet)
294         pa_packet_unref(p->read.packet);
295
296     if (p->write.memchunk.memblock)
297         pa_memblock_unref(p->write.memchunk.memblock);
298
299     pa_xfree(p);
300 }
301
302 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
303     struct item_info *i;
304
305     assert(p);
306     assert(PA_REFCNT_VALUE(p) > 0);
307     assert(packet);
308
309     if (p->dead)
310         return;
311
312     i = pa_xnew(struct item_info, 1);
313     i->type = PA_PSTREAM_ITEM_PACKET;
314     i->packet = pa_packet_ref(packet);
315
316 #ifdef HAVE_CREDS
317     if ((i->with_creds = !!creds))
318         i->creds = *creds;
319 #endif
320
321     pa_queue_push(p->send_queue, i);
322 }
323
324 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
325     size_t length, idx;
326
327     assert(p);
328     assert(PA_REFCNT_VALUE(p) > 0);
329     assert(channel != (uint32_t) -1);
330     assert(chunk);
331
332     if (p->dead)
333         return;
334
335     idx = 0;
336
337     while (length > 0) {
338         struct item_info *i;
339         size_t n;
340
341         i = pa_xnew(struct item_info, 1);
342         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
343
344         n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
345         i->chunk.index = chunk->index + idx;
346         i->chunk.length = n;
347         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
348
349         i->channel = channel;
350         i->offset = offset;
351         i->seek_mode = seek_mode;
352 #ifdef HAVE_CREDS
353         i->with_creds = 0;
354 #endif
355
356         pa_queue_push(p->send_queue, i);
357
358         idx += n;
359         length -= n;
360     }
361
362     p->mainloop->defer_enable(p->defer_event, 1);
363 }
364
365 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
366     struct item_info *item;
367     pa_pstream *p = userdata;
368
369     assert(p);
370     assert(PA_REFCNT_VALUE(p) > 0);
371
372     if (p->dead)
373         return;
374
375 /*     pa_log("Releasing block %u", block_id); */
376
377     item = pa_xnew(struct item_info, 1);
378     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
379     item->block_id = block_id;
380 #ifdef HAVE_CREDS
381     item->with_creds = 0;
382 #endif
383
384     pa_queue_push(p->send_queue, item);
385 }
386
387 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
388     struct item_info *item;
389     pa_pstream *p = userdata;
390
391     assert(p);
392     assert(PA_REFCNT_VALUE(p) > 0);
393
394     if (p->dead)
395         return;
396 /*     pa_log("Revoking block %u", block_id); */
397
398     item = pa_xnew(struct item_info, 1);
399     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
400     item->block_id = block_id;
401 #ifdef HAVE_CREDS
402     item->with_creds = 0;
403 #endif
404
405     pa_queue_push(p->send_queue, item);
406 }
407
408 static void prepare_next_write_item(pa_pstream *p) {
409     assert(p);
410     assert(PA_REFCNT_VALUE(p) > 0);
411
412     p->write.current = pa_queue_pop(p->send_queue);
413
414     if (!p->write.current)
415         return;
416
417     p->write.index = 0;
418     p->write.data = NULL;
419     pa_memchunk_reset(&p->write.memchunk);
420
421     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
422     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
423     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
424     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
425     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
426
427     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
428
429         assert(p->write.current->packet);
430         p->write.data = p->write.current->packet->data;
431         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
432
433     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
434
435         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
436         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
437
438     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
439
440         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
441         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
442
443     } else {
444         uint32_t flags;
445         int send_payload = 1;
446
447         assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
448         assert(p->write.current->chunk.memblock);
449
450         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
451         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
452         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
453
454         flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
455
456         if (p->use_shm) {
457             uint32_t block_id, shm_id;
458             size_t offset, length;
459
460             assert(p->export);
461
462             if (pa_memexport_put(p->export,
463                                  p->write.current->chunk.memblock,
464                                  &block_id,
465                                  &shm_id,
466                                  &offset,
467                                  &length) >= 0) {
468
469                 flags |= PA_FLAG_SHMDATA;
470                 send_payload = 0;
471
472                 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
473                 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
474                 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
475                 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
476
477                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
478                 p->write.data = p->write.shm_info;
479             }
480 /*             else */
481 /*                 pa_log_warn("Failed to export memory block."); */
482         }
483
484         if (send_payload) {
485             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
486             p->write.memchunk = p->write.current->chunk;
487             pa_memblock_ref(p->write.memchunk.memblock);
488             p->write.data = NULL;
489         }
490
491         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
492     }
493
494 #ifdef HAVE_CREDS
495     if ((p->send_creds_now = p->write.current->with_creds))
496         p->write_creds = p->write.current->creds;
497 #endif
498 }
499
500 static int do_write(pa_pstream *p) {
501     void *d;
502     size_t l;
503     ssize_t r;
504     pa_memblock *release_memblock = NULL;
505
506     assert(p);
507     assert(PA_REFCNT_VALUE(p) > 0);
508
509     if (!p->write.current)
510         prepare_next_write_item(p);
511
512     if (!p->write.current)
513         return 0;
514
515     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
516         d = (uint8_t*) p->write.descriptor + p->write.index;
517         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
518     } else {
519         assert(p->write.data || p->write.memchunk.memblock);
520
521         if (p->write.data)
522             d = p->write.data;
523         else {
524             d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
525             release_memblock = p->write.memchunk.memblock;
526         }
527
528         d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
529         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
530     }
531
532     assert(l > 0);
533
534 #ifdef HAVE_CREDS
535     if (p->send_creds_now) {
536
537         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
538             goto fail;
539
540         p->send_creds_now = 0;
541     } else
542 #endif
543
544     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
545         goto fail;
546
547     if (release_memblock)
548         pa_memblock_release(release_memblock);
549
550     p->write.index += r;
551
552     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
553         assert(p->write.current);
554         item_free(p->write.current, (void *) 1);
555         p->write.current = NULL;
556
557         if (p->drain_callback && !pa_pstream_is_pending(p))
558             p->drain_callback(p, p->drain_callback_userdata);
559     }
560
561     return 0;
562
563 fail:
564
565     if (release_memblock)
566         pa_memblock_release(release_memblock);
567
568     return -1;
569 }
570
571 static int do_read(pa_pstream *p) {
572     void *d;
573     size_t l;
574     ssize_t r;
575     pa_memblock *release_memblock = NULL;
576     assert(p);
577     assert(PA_REFCNT_VALUE(p) > 0);
578
579     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
580         d = (uint8_t*) p->read.descriptor + p->read.index;
581         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
582     } else {
583         assert(p->read.data || p->read.memblock);
584
585         if (p->read.data)
586             d = p->read.data;
587         else {
588             d = pa_memblock_acquire(p->read.memblock);
589             release_memblock = p->read.memblock;
590         }
591
592         d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
593         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
594     }
595
596 #ifdef HAVE_CREDS
597     {
598         int b = 0;
599
600         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
601             goto fail;
602
603         p->read_creds_valid = p->read_creds_valid || b;
604     }
605 #else
606     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
607         goto fail;
608 #endif
609
610     if (release_memblock)
611         pa_memblock_release(release_memblock);
612
613     p->read.index += r;
614
615     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
616         uint32_t flags, length, channel;
617         /* Reading of frame descriptor complete */
618
619         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
620
621         if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
622             pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
623             return -1;
624         }
625
626         if (flags == PA_FLAG_SHMRELEASE) {
627
628             /* This is a SHM memblock release frame with no payload */
629
630 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
631
632             assert(p->export);
633             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
634
635             goto frame_done;
636
637         } else if (flags == PA_FLAG_SHMREVOKE) {
638
639             /* This is a SHM memblock revoke frame with no payload */
640
641 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
642
643             assert(p->import);
644             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
645
646             goto frame_done;
647         }
648
649         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
650
651         if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
652             pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
653             return -1;
654         }
655
656         assert(!p->read.packet && !p->read.memblock);
657
658         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
659
660         if (channel == (uint32_t) -1) {
661
662             if (flags != 0) {
663                 pa_log_warn("Received packet frame with invalid flags value.");
664                 return -1;
665             }
666
667             /* Frame is a packet frame */
668             p->read.packet = pa_packet_new(length);
669             p->read.data = p->read.packet->data;
670
671         } else {
672
673             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
674                 pa_log_warn("Received memblock frame with invalid seek mode.");
675                 return -1;
676             }
677
678             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
679
680                 if (length != sizeof(p->read.shm_info)) {
681                     pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
682                     return -1;
683                 }
684
685                 /* Frame is a memblock frame referencing an SHM memblock */
686                 p->read.data = p->read.shm_info;
687
688             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
689
690                 /* Frame is a memblock frame */
691
692                 p->read.memblock = pa_memblock_new(p->mempool, length);
693                 p->read.data = NULL;
694             } else {
695
696                 pa_log_warn("Recieved memblock frame with invalid flags value.");
697                 return -1;
698             }
699         }
700
701     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
702         /* Frame payload available */
703
704         if (p->read.memblock && p->recieve_memblock_callback) {
705
706             /* Is this memblock data? Than pass it to the user */
707             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
708
709             if (l > 0) {
710                 pa_memchunk chunk;
711
712                 chunk.memblock = p->read.memblock;
713                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
714                 chunk.length = l;
715
716                 if (p->recieve_memblock_callback) {
717                     int64_t offset;
718
719                     offset = (int64_t) (
720                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
721                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
722
723                     p->recieve_memblock_callback(
724                         p,
725                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
726                         offset,
727                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
728                         &chunk,
729                         p->recieve_memblock_callback_userdata);
730                 }
731
732                 /* Drop seek info for following callbacks */
733                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
734                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
735                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
736             }
737         }
738
739         /* Frame complete */
740         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
741
742             if (p->read.memblock) {
743
744                 /* This was a memblock frame. We can unref the memblock now */
745                 pa_memblock_unref(p->read.memblock);
746
747             } else if (p->read.packet) {
748
749                 if (p->recieve_packet_callback)
750 #ifdef HAVE_CREDS
751                     p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
752 #else
753                     p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
754 #endif
755
756                 pa_packet_unref(p->read.packet);
757             } else {
758                 pa_memblock *b;
759
760                 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
761
762                 assert(p->import);
763
764                 if (!(b = pa_memimport_get(p->import,
765                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
766                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
767                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
768                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
769
770                     pa_log_warn("Failed to import memory block.");
771                     return -1;
772                 }
773
774                 if (p->recieve_memblock_callback) {
775                     int64_t offset;
776                     pa_memchunk chunk;
777
778                     chunk.memblock = b;
779                     chunk.index = 0;
780                     chunk.length = pa_memblock_get_length(b);
781
782                     offset = (int64_t) (
783                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
784                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
785
786                     p->recieve_memblock_callback(
787                             p,
788                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
789                             offset,
790                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
791                             &chunk,
792                             p->recieve_memblock_callback_userdata);
793                 }
794
795                 pa_memblock_unref(b);
796             }
797
798             goto frame_done;
799         }
800     }
801
802     return 0;
803
804 frame_done:
805     p->read.memblock = NULL;
806     p->read.packet = NULL;
807     p->read.index = 0;
808     p->read.data = NULL;
809
810 #ifdef HAVE_CREDS
811     p->read_creds_valid = 0;
812 #endif
813
814     return 0;
815
816 fail:
817     if (release_memblock)
818         pa_memblock_release(release_memblock);
819
820     return -1;
821 }
822
823 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
824     assert(p);
825     assert(PA_REFCNT_VALUE(p) > 0);
826
827     p->die_callback = cb;
828     p->die_callback_userdata = userdata;
829 }
830
831 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
832     assert(p);
833     assert(PA_REFCNT_VALUE(p) > 0);
834
835     p->drain_callback = cb;
836     p->drain_callback_userdata = userdata;
837 }
838
839 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
840     assert(p);
841     assert(PA_REFCNT_VALUE(p) > 0);
842
843     p->recieve_packet_callback = cb;
844     p->recieve_packet_callback_userdata = userdata;
845 }
846
847 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
848     assert(p);
849     assert(PA_REFCNT_VALUE(p) > 0);
850
851     p->recieve_memblock_callback = cb;
852     p->recieve_memblock_callback_userdata = userdata;
853 }
854
855 int pa_pstream_is_pending(pa_pstream *p) {
856     int b;
857
858     assert(p);
859     assert(PA_REFCNT_VALUE(p) > 0);
860
861     if (p->dead)
862         b = 0;
863     else
864         b = p->write.current || !pa_queue_is_empty(p->send_queue);
865
866     return b;
867 }
868
869 void pa_pstream_unref(pa_pstream*p) {
870     assert(p);
871     assert(PA_REFCNT_VALUE(p) > 0);
872
873     if (PA_REFCNT_DEC(p) <= 0)
874         pstream_free(p);
875 }
876
877 pa_pstream* pa_pstream_ref(pa_pstream*p) {
878     assert(p);
879     assert(PA_REFCNT_VALUE(p) > 0);
880
881     PA_REFCNT_INC(p);
882     return p;
883 }
884
885 void pa_pstream_close(pa_pstream *p) {
886     assert(p);
887
888     p->dead = 1;
889
890     if (p->import) {
891         pa_memimport_free(p->import);
892         p->import = NULL;
893     }
894
895     if (p->export) {
896         pa_memexport_free(p->export);
897         p->export = NULL;
898     }
899
900     if (p->io) {
901         pa_iochannel_free(p->io);
902         p->io = NULL;
903     }
904
905     p->die_callback = NULL;
906     p->drain_callback = NULL;
907     p->recieve_packet_callback = NULL;
908     p->recieve_memblock_callback = NULL;
909 }
910
911 void pa_pstream_use_shm(pa_pstream *p, int enable) {
912     assert(p);
913     assert(PA_REFCNT_VALUE(p) > 0);
914
915     p->use_shm = enable;
916
917     if (enable) {
918
919         if (!p->export)
920             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
921
922     } else {
923
924         if (p->export) {
925             pa_memexport_free(p->export);
926             p->export = NULL;
927         }
928     }
929 }