Revert r1404 and keep it on a development branch until it is fully tested.
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5  
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10  
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41 #include "winsock.h"
42
43 #include <pulse/xmalloc.h>
44
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/mutex.h>
50 #include <pulsecore/refcnt.h>
51
52 #include "pstream.h"
53
54 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
/* Layout of the descriptor FLAGS word as used in this file:
 *   - PA_FLAG_SEEKMASK selects the bits holding the seek mode of a memblock frame.
 *   - PA_FLAG_SHMMASK selects the bits holding the SHM frame type.
 * PA_FLAG_SHMDATA marks a memblock frame whose payload is an SHM reference
 * instead of the audio data itself; PA_FLAG_SHMRELEASE and PA_FLAG_SHMREVOKE
 * mark payload-less frames that carry a block id in the OFFSET_HI field. */
55 #define PA_FLAG_SHMDATA    0x80000000LU
56 #define PA_FLAG_SHMRELEASE 0x40000000LU
57 #define PA_FLAG_SHMREVOKE  0xC0000000LU
58 #define PA_FLAG_SHMMASK    0xFF000000LU
59 #define PA_FLAG_SEEKMASK   0x000000FFLU
60
61 /* The sequence descriptor header consists of 5 32bit integers: */
/* The enumerators below are indices into a pa_pstream_descriptor. All fields
 * are converted with htonl()/ntohl() when written to / read from the wire. */
62 enum {
63     PA_PSTREAM_DESCRIPTOR_LENGTH, /* payload length in bytes following the descriptor */
64     PA_PSTREAM_DESCRIPTOR_CHANNEL, /* channel of a memblock frame; (uint32_t) -1 for packet and release/revoke frames */
65     PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* upper 32 bits of the memblock offset; block id for release/revoke frames */
66     PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* lower 32 bits of the memblock offset */
67     PA_PSTREAM_DESCRIPTOR_FLAGS, /* seek mode and PA_FLAG_SHM* bits */
68     PA_PSTREAM_DESCRIPTOR_MAX /* number of descriptor fields */
69 };
70
71 /* If we have an SHM block, this info follows the descriptor */
/* Indices into the shm_info arrays; the values are what pa_memexport_put()
 * yields on the sending side and what pa_memimport_get() consumes on the
 * receiving side. */
72 enum {
73     PA_PSTREAM_SHM_BLOCKID, /* block id */
74     PA_PSTREAM_SHM_SHMID, /* id of the shared memory segment */
75     PA_PSTREAM_SHM_INDEX, /* export offset plus the chunk index */
76     PA_PSTREAM_SHM_LENGTH, /* length of the chunk */
77     PA_PSTREAM_SHM_MAX /* number of SHM info fields */
78 };
79
/* A frame descriptor: one 32 bit word per PA_PSTREAM_DESCRIPTOR_* index. */
80 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
81
/* Size of a descriptor in bytes */
82 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Largest payload length accepted from the peer (checked in do_read()) */
83 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
/* Largest payload length we generate ourselves; pa_pstream_send_memblock() splits longer chunks */
84 #define FRAME_SIZE_MAX_USE (1024*64)
85
/* One entry of the send queue. Which of the members below are meaningful
 * depends on 'type'. */
86 struct item_info {
87     enum {
88         PA_PSTREAM_ITEM_PACKET,
89         PA_PSTREAM_ITEM_MEMBLOCK,
90         PA_PSTREAM_ITEM_SHMRELEASE,
91         PA_PSTREAM_ITEM_SHMREVOKE
92     } type;
93
94
95     /* packet info (PA_PSTREAM_ITEM_PACKET); the item holds a reference on the packet */
96     pa_packet *packet;
97 #ifdef HAVE_CREDS
    /* with_creds is set for every item type; creds is only valid when it is non-zero */
98     int with_creds;
99     pa_creds creds;
100 #endif
101
102     /* memblock info (PA_PSTREAM_ITEM_MEMBLOCK); the item holds a reference on chunk.memblock */
103     pa_memchunk chunk;
104     uint32_t channel;
105     int64_t offset;
106     pa_seek_mode_t seek_mode;
107
108     /* release/revoke info (PA_PSTREAM_ITEM_SHMRELEASE / PA_PSTREAM_ITEM_SHMREVOKE) */
109     uint32_t block_id;
110 };
111
/* State of one packet stream: a reference counted object that frames
 * packets and memblocks over a pa_iochannel. */
112 struct pa_pstream {
113     PA_REFCNT_DECLARE;
114     
115     pa_mainloop_api *mainloop;
    /* Enabled whenever something is queued for sending; triggers do_something() */
116     pa_defer_event *defer_event;
117     pa_iochannel *io;
    /* Queue of struct item_info waiting to be written */
118     pa_queue *send_queue;
    /* Taken by the public entry points and by do_something() */
119     pa_mutex *mutex;
120
    /* Non-zero once the stream failed or was closed; no further I/O is done then */
121     int dead;
122
    /* State of the frame currently being written */
123     struct {
124         pa_pstream_descriptor descriptor;
125         struct item_info* current;
126         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        /* Payload to send after the descriptor (NULL for frames without payload) */
127         void *data;
        /* Number of bytes of descriptor + payload already written */
128         size_t index;
129     } write;
130
    /* State of the frame currently being read */
131     struct {
132         pa_pstream_descriptor descriptor;
133         pa_memblock *memblock;
134         pa_packet *packet;
135         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        /* Destination of the payload bytes (packet data, memblock data or shm_info) */
136         void *data;
        /* Number of bytes of descriptor + payload already read */
137         size_t index;
138     } read;
139
    /* Non-zero if outgoing memblocks should be passed as SHM references */
140     int use_shm;
141     pa_memimport *import;
142     pa_memexport *export;
143
144     pa_pstream_packet_cb_t recieve_packet_callback;
145     void *recieve_packet_callback_userdata;
146
147     pa_pstream_memblock_cb_t recieve_memblock_callback;
148     void *recieve_memblock_callback_userdata;
149
    /* Called from do_write() when nothing is left to send */
150     pa_pstream_notify_cb_t drain_callback;
151     void *drain_callback_userdata;
152
    /* Called from do_something() when reading or writing failed */
153     pa_pstream_notify_cb_t die_callback;
154     void *die_callback_userdata;
155
156     pa_mempool *mempool;
157
158 #ifdef HAVE_CREDS
159     pa_creds read_creds, write_creds;
160     int read_creds_valid, send_creds_now;
161 #endif
162 };
163
164 static int do_write(pa_pstream *p);
165 static int do_read(pa_pstream *p);
166
167 static void do_something(pa_pstream *p) {
168     assert(p);
169     assert(PA_REFCNT_VALUE(p) > 0);
170
171     pa_pstream_ref(p);
172
173     pa_mutex_lock(p->mutex);
174     
175     p->mainloop->defer_enable(p->defer_event, 0);
176
177     if (!p->dead && pa_iochannel_is_readable(p->io)) {
178         if (do_read(p) < 0)
179             goto fail;
180     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
181         goto fail;
182
183     if (!p->dead && pa_iochannel_is_writable(p->io)) {
184         if (do_write(p) < 0)
185             goto fail;
186     }
187
188     pa_mutex_unlock(p->mutex);
189
190     pa_pstream_unref(p);
191     return;
192
193 fail:
194
195     p->dead = 1;
196     
197     if (p->die_callback)
198         p->die_callback(p, p->die_callback_userdata);
199     
200     pa_mutex_unlock(p->mutex);
201     
202     pa_pstream_unref(p);
203 }
204
205 static void io_callback(pa_iochannel*io, void *userdata) {
206     pa_pstream *p = userdata;
207     
208     assert(p);
209     assert(p->io == io);
210     
211     do_something(p);
212 }
213
214 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
215     pa_pstream *p = userdata;
216
217     assert(p);
218     assert(p->defer_event == e);
219     assert(p->mainloop == m);
220     
221     do_something(p);
222 }
223
224 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
225
226 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
227     pa_pstream *p;
228     
229     assert(m);
230     assert(io);
231     assert(pool);
232
233     p = pa_xnew(pa_pstream, 1);
234     PA_REFCNT_INIT(p);
235     p->io = io;
236     pa_iochannel_set_callback(io, io_callback, p);
237     p->dead = 0;
238
239     p->mutex = pa_mutex_new(1);
240
241     p->mainloop = m;
242     p->defer_event = m->defer_new(m, defer_callback, p);
243     m->defer_enable(p->defer_event, 0);
244     
245     p->send_queue = pa_queue_new();
246     assert(p->send_queue);
247
248     p->write.current = NULL;
249     p->write.index = 0;
250     p->read.memblock = NULL;
251     p->read.packet = NULL;
252     p->read.index = 0;
253
254     p->recieve_packet_callback = NULL;
255     p->recieve_packet_callback_userdata = NULL;
256     p->recieve_memblock_callback = NULL;
257     p->recieve_memblock_callback_userdata = NULL;
258     p->drain_callback = NULL;
259     p->drain_callback_userdata = NULL;
260     p->die_callback = NULL;
261     p->die_callback_userdata = NULL;
262
263     p->mempool = pool;
264
265     p->use_shm = 0;
266     p->export = NULL;
267
268     /* We do importing unconditionally */
269     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
270
271     pa_iochannel_socket_set_rcvbuf(io, 1024*8); 
272     pa_iochannel_socket_set_sndbuf(io, 1024*8);
273
274 #ifdef HAVE_CREDS
275     p->send_creds_now = 0;
276     p->read_creds_valid = 0;
277 #endif
278     return p;
279 }
280
281 static void item_free(void *item, PA_GCC_UNUSED void *p) {
282     struct item_info *i = item;
283     assert(i);
284
285     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
286         assert(i->chunk.memblock);
287         pa_memblock_unref(i->chunk.memblock);
288     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
289         assert(i->packet);
290         pa_packet_unref(i->packet);
291     }
292
293     pa_xfree(i);
294 }
295
296 static void pstream_free(pa_pstream *p) {
297     assert(p);
298
299     pa_pstream_close(p);
300     
301     pa_queue_free(p->send_queue, item_free, NULL);
302
303     if (p->write.current)
304         item_free(p->write.current, NULL);
305
306     if (p->read.memblock)
307         pa_memblock_unref(p->read.memblock);
308     
309     if (p->read.packet)
310         pa_packet_unref(p->read.packet);
311
312     if (p->mutex)
313         pa_mutex_free(p->mutex);
314
315     pa_xfree(p);
316 }
317
318 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
319     struct item_info *i;
320
321     assert(p);
322     assert(PA_REFCNT_VALUE(p) > 0);
323     assert(packet);
324
325     pa_mutex_lock(p->mutex);
326     
327     if (p->dead)
328         goto finish;
329     
330     i = pa_xnew(struct item_info, 1);
331     i->type = PA_PSTREAM_ITEM_PACKET;
332     i->packet = pa_packet_ref(packet);
333     
334 #ifdef HAVE_CREDS
335     if ((i->with_creds = !!creds))
336         i->creds = *creds;
337 #endif
338
339     pa_queue_push(p->send_queue, i);
340     p->mainloop->defer_enable(p->defer_event, 1);
341
342 finish:
343
344     pa_mutex_unlock(p->mutex);
345 }
346
347 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
348     size_t length, idx;
349     
350     assert(p);
351     assert(PA_REFCNT_VALUE(p) > 0);
352     assert(channel != (uint32_t) -1);
353     assert(chunk);
354
355     pa_mutex_lock(p->mutex);
356     
357     if (p->dead)
358         goto finish;
359
360     length = chunk->length;
361     idx = 0;
362
363     while (length > 0) {
364         struct item_info *i;
365         size_t n;
366         
367         i = pa_xnew(struct item_info, 1);
368         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
369
370         n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
371         i->chunk.index = chunk->index + idx;
372         i->chunk.length = n;
373         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
374         
375         i->channel = channel;
376         i->offset = offset;
377         i->seek_mode = seek_mode;
378 #ifdef HAVE_CREDS
379         i->with_creds = 0;
380 #endif
381         
382         pa_queue_push(p->send_queue, i);
383
384         idx += n;
385         length -= n;
386     }
387         
388     p->mainloop->defer_enable(p->defer_event, 1);
389
390 finish:
391     
392     pa_mutex_unlock(p->mutex);
393 }
394
395 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
396     struct item_info *item;
397     pa_pstream *p = userdata;
398
399     assert(p);
400     assert(PA_REFCNT_VALUE(p) > 0);
401
402     pa_mutex_lock(p->mutex);
403     
404     if (p->dead)
405         goto finish;
406
407 /*     pa_log("Releasing block %u", block_id); */
408
409     item = pa_xnew(struct item_info, 1);
410     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
411     item->block_id = block_id;
412 #ifdef HAVE_CREDS
413     item->with_creds = 0;
414 #endif
415
416     pa_queue_push(p->send_queue, item);
417     p->mainloop->defer_enable(p->defer_event, 1);
418
419 finish:
420
421     pa_mutex_unlock(p->mutex);
422 }
423
424 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
425     struct item_info *item;
426     pa_pstream *p = userdata;
427
428     assert(p);
429     assert(PA_REFCNT_VALUE(p) > 0);
430
431     pa_mutex_lock(p->mutex);
432     
433     if (p->dead)
434         goto finish;
435
436 /*     pa_log("Revoking block %u", block_id); */
437     
438     item = pa_xnew(struct item_info, 1);
439     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
440     item->block_id = block_id;
441 #ifdef HAVE_CREDS
442     item->with_creds = 0;
443 #endif
444
445     pa_queue_push(p->send_queue, item);
446     p->mainloop->defer_enable(p->defer_event, 1);
447
448 finish:
449
450     pa_mutex_unlock(p->mutex);
451 }
452
453 static void prepare_next_write_item(pa_pstream *p) {
454     assert(p);
455     assert(PA_REFCNT_VALUE(p) > 0);
456
457     if (!(p->write.current = pa_queue_pop(p->send_queue)))
458         return;
459     
460     p->write.index = 0;
461     p->write.data = NULL;
462
463     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
464     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
465     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
466     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
467     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
468     
469     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
470         
471         assert(p->write.current->packet);
472         p->write.data = p->write.current->packet->data;
473         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
474
475     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
476
477         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
478         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
479
480     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
481
482         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
483         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
484         
485     } else {
486         uint32_t flags;
487         int send_payload = 1;
488         
489         assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
490         assert(p->write.current->chunk.memblock);
491         
492         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
493         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
494         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
495
496         flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
497
498         if (p->use_shm) {
499             uint32_t block_id, shm_id;
500             size_t offset, length;
501
502             assert(p->export);
503
504             if (pa_memexport_put(p->export,
505                                  p->write.current->chunk.memblock,
506                                  &block_id,
507                                  &shm_id,
508                                  &offset,
509                                  &length) >= 0) {
510                 
511                 flags |= PA_FLAG_SHMDATA;
512                 send_payload = 0;
513                 
514                 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
515                 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
516                 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
517                 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
518                 
519                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
520                 p->write.data = p->write.shm_info;
521             }
522 /*             else */
523 /*                 pa_log_warn("Failed to export memory block."); */
524         }
525
526         if (send_payload) {
527             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
528             p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
529         }
530         
531         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
532     }
533
534 #ifdef HAVE_CREDS
535     if ((p->send_creds_now = p->write.current->with_creds))
536         p->write_creds = p->write.current->creds;
537 #endif
538 }
539
540 static int do_write(pa_pstream *p) {
541     void *d;
542     size_t l;
543     ssize_t r;
544     
545     assert(p);
546     assert(PA_REFCNT_VALUE(p) > 0);
547
548     if (!p->write.current)
549         prepare_next_write_item(p);
550
551     if (!p->write.current)
552         return 0;
553
554     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
555         d = (uint8_t*) p->write.descriptor + p->write.index;
556         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
557     } else {
558         assert(p->write.data);
559     
560         d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
561         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
562     }
563
564     assert(l > 0);
565         
566 #ifdef HAVE_CREDS
567     if (p->send_creds_now) {
568
569         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
570             return -1;
571
572         p->send_creds_now = 0;
573     } else
574 #endif
575
576     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
577         return -1;
578
579     p->write.index += r;
580
581     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
582         assert(p->write.current);
583         item_free(p->write.current, (void *) 1);
584         p->write.current = NULL;
585
586         if (p->drain_callback && !pa_pstream_is_pending(p))
587             p->drain_callback(p, p->drain_callback_userdata);
588     }
589
590     return 0;
591 }
592
593 static int do_read(pa_pstream *p) {
594     void *d;
595     size_t l; 
596     ssize_t r;
597     
598     assert(p);
599     assert(PA_REFCNT_VALUE(p) > 0);
600     
601     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
602         d = (uint8_t*) p->read.descriptor + p->read.index;
603         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
604     } else {
605         assert(p->read.data);
606         d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
607         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
608     }
609
610 #ifdef HAVE_CREDS
611     {
612         int b = 0;
613         
614         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
615             return -1;
616
617         p->read_creds_valid = p->read_creds_valid || b;
618     }
619 #else
620     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
621         return -1;
622 #endif
623     
624     p->read.index += r;
625
626     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
627         uint32_t flags, length, channel;
628         /* Reading of frame descriptor complete */
629
630         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
631
632         if (!p->import && (flags & PA_FLAG_SHMMASK) != 0) {
633             pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
634             return -1;
635         }
636         
637         if (flags == PA_FLAG_SHMRELEASE) {
638
639             /* This is a SHM memblock release frame with no payload */
640
641 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
642             
643             assert(p->export);
644             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
645
646             goto frame_done;
647             
648         } else if (flags == PA_FLAG_SHMREVOKE) {
649
650             /* This is a SHM memblock revoke frame with no payload */
651
652 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
653
654             assert(p->import);
655             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
656
657             goto frame_done;
658         }
659
660         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
661         
662         if (length > FRAME_SIZE_MAX_ALLOW) {
663             pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
664             return -1;
665         }
666         
667         assert(!p->read.packet && !p->read.memblock);
668
669         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
670         
671         if (channel == (uint32_t) -1) {
672
673             if (flags != 0) {
674                 pa_log_warn("Received packet frame with invalid flags value.");
675                 return -1;
676             }
677             
678             /* Frame is a packet frame */
679             p->read.packet = pa_packet_new(length);
680             p->read.data = p->read.packet->data;
681             
682         } else {
683
684             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
685                 pa_log_warn("Received memblock frame with invalid seek mode.");
686                 return -1;
687             }
688             
689             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
690
691                 if (length != sizeof(p->read.shm_info)) {
692                     pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
693                     return -1;
694                 }
695             
696                 /* Frame is a memblock frame referencing an SHM memblock */
697                 p->read.data = p->read.shm_info;
698
699             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
700
701                 /* Frame is a memblock frame */
702                 
703                 p->read.memblock = pa_memblock_new(p->mempool, length);
704                 p->read.data = p->read.memblock->data;
705             } else {
706                 
707                 pa_log_warn("Recieved memblock frame with invalid flags value.");
708                 return -1;
709             }
710         }
711             
712     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
713         /* Frame payload available */
714         
715         if (p->read.memblock && p->recieve_memblock_callback) {
716
717             /* Is this memblock data? Than pass it to the user */
718             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
719                 
720             if (l > 0) {
721                 pa_memchunk chunk;
722                 
723                 chunk.memblock = p->read.memblock;
724                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
725                 chunk.length = l;
726
727                 if (p->recieve_memblock_callback) {
728                     int64_t offset;
729
730                     offset = (int64_t) (
731                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
732                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
733                     
734                     p->recieve_memblock_callback(
735                         p,
736                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
737                         offset,
738                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
739                         &chunk,
740                         p->recieve_memblock_callback_userdata);
741                 }
742
743                 /* Drop seek info for following callbacks */
744                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
745                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
746                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
747             }
748         }
749
750         /* Frame complete */
751         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
752             
753             if (p->read.memblock) {
754
755                 /* This was a memblock frame. We can unref the memblock now */
756                 pa_memblock_unref(p->read.memblock);
757
758             } else if (p->read.packet) {
759                 
760                 if (p->recieve_packet_callback)
761 #ifdef HAVE_CREDS
762                     p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
763 #else
764                     p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
765 #endif
766
767                 pa_packet_unref(p->read.packet);
768             } else {
769                 pa_memblock *b;
770                 
771                 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
772
773                 assert(p->import);
774
775                 if (!(b = pa_memimport_get(p->import,
776                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
777                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
778                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
779                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
780
781                     pa_log_warn("Failed to import memory block.");
782                     return -1;
783                 }
784
785                 if (p->recieve_memblock_callback) {
786                     int64_t offset;
787                     pa_memchunk chunk;
788                     
789                     chunk.memblock = b;
790                     chunk.index = 0;
791                     chunk.length = b->length;
792
793                     offset = (int64_t) (
794                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
795                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
796                     
797                     p->recieve_memblock_callback(
798                             p,
799                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
800                             offset,
801                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
802                             &chunk,
803                             p->recieve_memblock_callback_userdata);
804                 }
805
806                 pa_memblock_unref(b);
807             }
808
809             goto frame_done;
810         }
811     }
812
813     return 0;
814
815 frame_done:
816     p->read.memblock = NULL;
817     p->read.packet = NULL;
818     p->read.index = 0;
819
820 #ifdef HAVE_CREDS
821     p->read_creds_valid = 0;
822 #endif
823
824     return 0;
825 }
826
827 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
828     assert(p);
829     assert(PA_REFCNT_VALUE(p) > 0);
830     
831     pa_mutex_lock(p->mutex);
832     p->die_callback = cb;
833     p->die_callback_userdata = userdata;
834     pa_mutex_unlock(p->mutex);
835 }
836
837 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
838     assert(p);
839     assert(PA_REFCNT_VALUE(p) > 0);
840
841     pa_mutex_lock(p->mutex);
842     p->drain_callback = cb;
843     p->drain_callback_userdata = userdata;
844     pa_mutex_unlock(p->mutex);
845 }
846
847 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
848     assert(p);
849     assert(PA_REFCNT_VALUE(p) > 0);
850
851     pa_mutex_lock(p->mutex);
852     p->recieve_packet_callback = cb;
853     p->recieve_packet_callback_userdata = userdata;
854     pa_mutex_unlock(p->mutex);
855 }
856
857 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
858     assert(p);
859     assert(PA_REFCNT_VALUE(p) > 0);
860
861     pa_mutex_lock(p->mutex);
862     p->recieve_memblock_callback = cb;
863     p->recieve_memblock_callback_userdata = userdata;
864     pa_mutex_unlock(p->mutex);
865 }
866
867 int pa_pstream_is_pending(pa_pstream *p) {
868     int b;
869
870     assert(p);
871     assert(PA_REFCNT_VALUE(p) > 0);
872
873     pa_mutex_lock(p->mutex);
874
875     if (p->dead)
876         b = 0;
877     else
878         b = p->write.current || !pa_queue_is_empty(p->send_queue);
879
880     pa_mutex_unlock(p->mutex);
881
882     return b;
883 }
884
885 void pa_pstream_unref(pa_pstream*p) {
886     assert(p);
887     assert(PA_REFCNT_VALUE(p) > 0);
888
889     if (PA_REFCNT_DEC(p) <= 0)
890         pstream_free(p);
891 }
892
893 pa_pstream* pa_pstream_ref(pa_pstream*p) {
894     assert(p);
895     assert(PA_REFCNT_VALUE(p) > 0);
896     
897     PA_REFCNT_INC(p);
898     return p;
899 }
900
901 void pa_pstream_close(pa_pstream *p) {
902     assert(p);
903
904     pa_mutex_lock(p->mutex);
905     
906     p->dead = 1;
907
908     if (p->import) {
909         pa_memimport_free(p->import);
910         p->import = NULL;
911     }
912
913     if (p->export) {
914         pa_memexport_free(p->export);
915         p->export = NULL;
916     }
917
918     if (p->io) {
919         pa_iochannel_free(p->io);
920         p->io = NULL;
921     }
922
923     if (p->defer_event) {
924         p->mainloop->defer_free(p->defer_event);
925         p->defer_event = NULL;
926     }
927
928     p->die_callback = NULL;
929     p->drain_callback = NULL;
930     p->recieve_packet_callback = NULL;
931     p->recieve_memblock_callback = NULL;
932
933     pa_mutex_unlock(p->mutex);
934 }
935
936 void pa_pstream_use_shm(pa_pstream *p, int enable) {
937     assert(p);
938     assert(PA_REFCNT_VALUE(p) > 0);
939
940     pa_mutex_lock(p->mutex);
941
942     p->use_shm = enable;
943
944     if (enable) {
945     
946         if (!p->export)
947             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
948
949     } else {
950
951         if (p->export) {
952             pa_memexport_free(p->export);
953             p->export = NULL;
954         }
955     }
956
957     pa_mutex_unlock(p->mutex);
958 }