Add copyright notices to all relevant files. (based on svn log)
[profile/ivi/pulseaudio.git] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as
11   published by the Free Software Foundation; either version 2.1 of the
12   License, or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   Lesser General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public
20   License along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <assert.h>
32 #include <unistd.h>
33
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
42 #endif
43
44 #include "winsock.h"
45
46 #include <pulse/xmalloc.h>
47
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/mutex.h>
53 #include <pulsecore/refcnt.h>
54
55 #include "pstream.h"
56
/* SHM signalling shares the descriptor's flags word with the seek mode:
 * the seek mode lives in the low byte, the SHM bits in the high byte. */
#define PA_FLAG_SHMDATA    0x80000000LU /* memblock frame whose payload references an SHM block */
#define PA_FLAG_SHMRELEASE 0x40000000LU /* payload-less SHM block release frame */
#define PA_FLAG_SHMREVOKE  0xC0000000LU /* payload-less SHM block revoke frame */
#define PA_FLAG_SHMMASK    0xFF000000LU /* all bits used for SHM signalling */
#define PA_FLAG_SEEKMASK   0x000000FFLU /* bits holding the seek mode */
63
/* Word indices into the frame descriptor, which is made up of five
 * 32 bit integers. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,     /* length of the payload following the descriptor */
    PA_PSTREAM_DESCRIPTOR_CHANNEL = 1,    /* channel; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI = 2,  /* upper half of the offset; block id in release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO = 3,  /* lower half of the offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS = 4,      /* seek mode plus SHM flag bits */
    PA_PSTREAM_DESCRIPTOR_MAX = 5         /* number of descriptor words */
};
73
/* Word indices into the SHM info record that follows the descriptor
 * when the frame refers to an SHM block. */
enum {
    PA_PSTREAM_SHM_BLOCKID = 0,
    PA_PSTREAM_SHM_SHMID = 1,
    PA_PSTREAM_SHM_INDEX = 2,
    PA_PSTREAM_SHM_LENGTH = 3,
    PA_PSTREAM_SHM_MAX = 4      /* number of SHM info words */
};
82
83 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
84
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
87 #define FRAME_SIZE_MAX_USE (1024*64)
88
89 struct item_info {
90     enum {
91         PA_PSTREAM_ITEM_PACKET,
92         PA_PSTREAM_ITEM_MEMBLOCK,
93         PA_PSTREAM_ITEM_SHMRELEASE,
94         PA_PSTREAM_ITEM_SHMREVOKE
95     } type;
96
97
98     /* packet info */
99     pa_packet *packet;
100 #ifdef HAVE_CREDS
101     int with_creds;
102     pa_creds creds;
103 #endif
104
105     /* memblock info */
106     pa_memchunk chunk;
107     uint32_t channel;
108     int64_t offset;
109     pa_seek_mode_t seek_mode;
110
111     /* release/revoke info */
112     uint32_t block_id;
113 };
114
115 struct pa_pstream {
116     PA_REFCNT_DECLARE;
117
118     pa_mainloop_api *mainloop;
119     pa_defer_event *defer_event;
120     pa_iochannel *io;
121     pa_queue *send_queue;
122     pa_mutex *mutex;
123
124     int dead;
125
126     struct {
127         pa_pstream_descriptor descriptor;
128         struct item_info* current;
129         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
130         void *data;
131         size_t index;
132     } write;
133
134     struct {
135         pa_pstream_descriptor descriptor;
136         pa_memblock *memblock;
137         pa_packet *packet;
138         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139         void *data;
140         size_t index;
141     } read;
142
143     int use_shm;
144     pa_memimport *import;
145     pa_memexport *export;
146
147     pa_pstream_packet_cb_t recieve_packet_callback;
148     void *recieve_packet_callback_userdata;
149
150     pa_pstream_memblock_cb_t recieve_memblock_callback;
151     void *recieve_memblock_callback_userdata;
152
153     pa_pstream_notify_cb_t drain_callback;
154     void *drain_callback_userdata;
155
156     pa_pstream_notify_cb_t die_callback;
157     void *die_callback_userdata;
158
159     pa_mempool *mempool;
160
161 #ifdef HAVE_CREDS
162     pa_creds read_creds, write_creds;
163     int read_creds_valid, send_creds_now;
164 #endif
165 };
166
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
169
170 static void do_something(pa_pstream *p) {
171     assert(p);
172     assert(PA_REFCNT_VALUE(p) > 0);
173
174     pa_pstream_ref(p);
175
176     pa_mutex_lock(p->mutex);
177
178     p->mainloop->defer_enable(p->defer_event, 0);
179
180     if (!p->dead && pa_iochannel_is_readable(p->io)) {
181         if (do_read(p) < 0)
182             goto fail;
183     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
184         goto fail;
185
186     if (!p->dead && pa_iochannel_is_writable(p->io)) {
187         if (do_write(p) < 0)
188             goto fail;
189     }
190
191     pa_mutex_unlock(p->mutex);
192
193     pa_pstream_unref(p);
194     return;
195
196 fail:
197
198     p->dead = 1;
199
200     if (p->die_callback)
201         p->die_callback(p, p->die_callback_userdata);
202
203     pa_mutex_unlock(p->mutex);
204
205     pa_pstream_unref(p);
206 }
207
208 static void io_callback(pa_iochannel*io, void *userdata) {
209     pa_pstream *p = userdata;
210
211     assert(p);
212     assert(p->io == io);
213
214     do_something(p);
215 }
216
217 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
218     pa_pstream *p = userdata;
219
220     assert(p);
221     assert(p->defer_event == e);
222     assert(p->mainloop == m);
223
224     do_something(p);
225 }
226
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
228
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230     pa_pstream *p;
231
232     assert(m);
233     assert(io);
234     assert(pool);
235
236     p = pa_xnew(pa_pstream, 1);
237     PA_REFCNT_INIT(p);
238     p->io = io;
239     pa_iochannel_set_callback(io, io_callback, p);
240     p->dead = 0;
241
242     p->mutex = pa_mutex_new(1);
243
244     p->mainloop = m;
245     p->defer_event = m->defer_new(m, defer_callback, p);
246     m->defer_enable(p->defer_event, 0);
247
248     p->send_queue = pa_queue_new();
249     assert(p->send_queue);
250
251     p->write.current = NULL;
252     p->write.index = 0;
253     p->read.memblock = NULL;
254     p->read.packet = NULL;
255     p->read.index = 0;
256
257     p->recieve_packet_callback = NULL;
258     p->recieve_packet_callback_userdata = NULL;
259     p->recieve_memblock_callback = NULL;
260     p->recieve_memblock_callback_userdata = NULL;
261     p->drain_callback = NULL;
262     p->drain_callback_userdata = NULL;
263     p->die_callback = NULL;
264     p->die_callback_userdata = NULL;
265
266     p->mempool = pool;
267
268     p->use_shm = 0;
269     p->export = NULL;
270
271     /* We do importing unconditionally */
272     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
273
274     pa_iochannel_socket_set_rcvbuf(io, 1024*8);
275     pa_iochannel_socket_set_sndbuf(io, 1024*8);
276
277 #ifdef HAVE_CREDS
278     p->send_creds_now = 0;
279     p->read_creds_valid = 0;
280 #endif
281     return p;
282 }
283
284 static void item_free(void *item, PA_GCC_UNUSED void *p) {
285     struct item_info *i = item;
286     assert(i);
287
288     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
289         assert(i->chunk.memblock);
290         pa_memblock_unref(i->chunk.memblock);
291     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
292         assert(i->packet);
293         pa_packet_unref(i->packet);
294     }
295
296     pa_xfree(i);
297 }
298
299 static void pstream_free(pa_pstream *p) {
300     assert(p);
301
302     pa_pstream_close(p);
303
304     pa_queue_free(p->send_queue, item_free, NULL);
305
306     if (p->write.current)
307         item_free(p->write.current, NULL);
308
309     if (p->read.memblock)
310         pa_memblock_unref(p->read.memblock);
311
312     if (p->read.packet)
313         pa_packet_unref(p->read.packet);
314
315     if (p->mutex)
316         pa_mutex_free(p->mutex);
317
318     pa_xfree(p);
319 }
320
321 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
322     struct item_info *i;
323
324     assert(p);
325     assert(PA_REFCNT_VALUE(p) > 0);
326     assert(packet);
327
328     pa_mutex_lock(p->mutex);
329
330     if (p->dead)
331         goto finish;
332
333     i = pa_xnew(struct item_info, 1);
334     i->type = PA_PSTREAM_ITEM_PACKET;
335     i->packet = pa_packet_ref(packet);
336
337 #ifdef HAVE_CREDS
338     if ((i->with_creds = !!creds))
339         i->creds = *creds;
340 #endif
341
342     pa_queue_push(p->send_queue, i);
343     p->mainloop->defer_enable(p->defer_event, 1);
344
345 finish:
346
347     pa_mutex_unlock(p->mutex);
348 }
349
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
351     size_t length, idx;
352
353     assert(p);
354     assert(PA_REFCNT_VALUE(p) > 0);
355     assert(channel != (uint32_t) -1);
356     assert(chunk);
357
358     pa_mutex_lock(p->mutex);
359
360     if (p->dead)
361         goto finish;
362
363     length = chunk->length;
364     idx = 0;
365
366     while (length > 0) {
367         struct item_info *i;
368         size_t n;
369
370         i = pa_xnew(struct item_info, 1);
371         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
372
373         n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
374         i->chunk.index = chunk->index + idx;
375         i->chunk.length = n;
376         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
377
378         i->channel = channel;
379         i->offset = offset;
380         i->seek_mode = seek_mode;
381 #ifdef HAVE_CREDS
382         i->with_creds = 0;
383 #endif
384
385         pa_queue_push(p->send_queue, i);
386
387         idx += n;
388         length -= n;
389     }
390
391     p->mainloop->defer_enable(p->defer_event, 1);
392
393 finish:
394
395     pa_mutex_unlock(p->mutex);
396 }
397
398 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
399     struct item_info *item;
400     pa_pstream *p = userdata;
401
402     assert(p);
403     assert(PA_REFCNT_VALUE(p) > 0);
404
405     pa_mutex_lock(p->mutex);
406
407     if (p->dead)
408         goto finish;
409
410 /*     pa_log("Releasing block %u", block_id); */
411
412     item = pa_xnew(struct item_info, 1);
413     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
414     item->block_id = block_id;
415 #ifdef HAVE_CREDS
416     item->with_creds = 0;
417 #endif
418
419     pa_queue_push(p->send_queue, item);
420     p->mainloop->defer_enable(p->defer_event, 1);
421
422 finish:
423
424     pa_mutex_unlock(p->mutex);
425 }
426
427 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
428     struct item_info *item;
429     pa_pstream *p = userdata;
430
431     assert(p);
432     assert(PA_REFCNT_VALUE(p) > 0);
433
434     pa_mutex_lock(p->mutex);
435
436     if (p->dead)
437         goto finish;
438
439 /*     pa_log("Revoking block %u", block_id); */
440
441     item = pa_xnew(struct item_info, 1);
442     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
443     item->block_id = block_id;
444 #ifdef HAVE_CREDS
445     item->with_creds = 0;
446 #endif
447
448     pa_queue_push(p->send_queue, item);
449     p->mainloop->defer_enable(p->defer_event, 1);
450
451 finish:
452
453     pa_mutex_unlock(p->mutex);
454 }
455
456 static void prepare_next_write_item(pa_pstream *p) {
457     assert(p);
458     assert(PA_REFCNT_VALUE(p) > 0);
459
460     if (!(p->write.current = pa_queue_pop(p->send_queue)))
461         return;
462
463     p->write.index = 0;
464     p->write.data = NULL;
465
466     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
467     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
468     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
469     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
470     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
471
472     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
473
474         assert(p->write.current->packet);
475         p->write.data = p->write.current->packet->data;
476         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
477
478     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
479
480         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
481         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
482
483     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
484
485         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
486         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
487
488     } else {
489         uint32_t flags;
490         int send_payload = 1;
491
492         assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
493         assert(p->write.current->chunk.memblock);
494
495         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
496         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
497         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
498
499         flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
500
501         if (p->use_shm) {
502             uint32_t block_id, shm_id;
503             size_t offset, length;
504
505             assert(p->export);
506
507             if (pa_memexport_put(p->export,
508                                  p->write.current->chunk.memblock,
509                                  &block_id,
510                                  &shm_id,
511                                  &offset,
512                                  &length) >= 0) {
513
514                 flags |= PA_FLAG_SHMDATA;
515                 send_payload = 0;
516
517                 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
518                 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
519                 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
520                 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
521
522                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
523                 p->write.data = p->write.shm_info;
524             }
525 /*             else */
526 /*                 pa_log_warn("Failed to export memory block."); */
527         }
528
529         if (send_payload) {
530             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
531             p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
532         }
533
534         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
535     }
536
537 #ifdef HAVE_CREDS
538     if ((p->send_creds_now = p->write.current->with_creds))
539         p->write_creds = p->write.current->creds;
540 #endif
541 }
542
543 static int do_write(pa_pstream *p) {
544     void *d;
545     size_t l;
546     ssize_t r;
547
548     assert(p);
549     assert(PA_REFCNT_VALUE(p) > 0);
550
551     if (!p->write.current)
552         prepare_next_write_item(p);
553
554     if (!p->write.current)
555         return 0;
556
557     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
558         d = (uint8_t*) p->write.descriptor + p->write.index;
559         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
560     } else {
561         assert(p->write.data);
562
563         d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
564         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
565     }
566
567     assert(l > 0);
568
569 #ifdef HAVE_CREDS
570     if (p->send_creds_now) {
571
572         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
573             return -1;
574
575         p->send_creds_now = 0;
576     } else
577 #endif
578
579     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
580         return -1;
581
582     p->write.index += r;
583
584     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
585         assert(p->write.current);
586         item_free(p->write.current, (void *) 1);
587         p->write.current = NULL;
588
589         if (p->drain_callback && !pa_pstream_is_pending(p))
590             p->drain_callback(p, p->drain_callback_userdata);
591     }
592
593     return 0;
594 }
595
596 static int do_read(pa_pstream *p) {
597     void *d;
598     size_t l;
599     ssize_t r;
600
601     assert(p);
602     assert(PA_REFCNT_VALUE(p) > 0);
603
604     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
605         d = (uint8_t*) p->read.descriptor + p->read.index;
606         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
607     } else {
608         assert(p->read.data);
609         d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
610         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
611     }
612
613 #ifdef HAVE_CREDS
614     {
615         int b = 0;
616
617         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
618             return -1;
619
620         p->read_creds_valid = p->read_creds_valid || b;
621     }
622 #else
623     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
624         return -1;
625 #endif
626
627     p->read.index += r;
628
629     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
630         uint32_t flags, length, channel;
631         /* Reading of frame descriptor complete */
632
633         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
634
635         if (!p->import && (flags & PA_FLAG_SHMMASK) != 0) {
636             pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
637             return -1;
638         }
639
640         if (flags == PA_FLAG_SHMRELEASE) {
641
642             /* This is a SHM memblock release frame with no payload */
643
644 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
645
646             assert(p->export);
647             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
648
649             goto frame_done;
650
651         } else if (flags == PA_FLAG_SHMREVOKE) {
652
653             /* This is a SHM memblock revoke frame with no payload */
654
655 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
656
657             assert(p->import);
658             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
659
660             goto frame_done;
661         }
662
663         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
664
665         if (length > FRAME_SIZE_MAX_ALLOW) {
666             pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
667             return -1;
668         }
669
670         assert(!p->read.packet && !p->read.memblock);
671
672         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
673
674         if (channel == (uint32_t) -1) {
675
676             if (flags != 0) {
677                 pa_log_warn("Received packet frame with invalid flags value.");
678                 return -1;
679             }
680
681             /* Frame is a packet frame */
682             p->read.packet = pa_packet_new(length);
683             p->read.data = p->read.packet->data;
684
685         } else {
686
687             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
688                 pa_log_warn("Received memblock frame with invalid seek mode.");
689                 return -1;
690             }
691
692             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
693
694                 if (length != sizeof(p->read.shm_info)) {
695                     pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
696                     return -1;
697                 }
698
699                 /* Frame is a memblock frame referencing an SHM memblock */
700                 p->read.data = p->read.shm_info;
701
702             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
703
704                 /* Frame is a memblock frame */
705
706                 p->read.memblock = pa_memblock_new(p->mempool, length);
707                 p->read.data = p->read.memblock->data;
708             } else {
709
710                 pa_log_warn("Recieved memblock frame with invalid flags value.");
711                 return -1;
712             }
713         }
714
715     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
716         /* Frame payload available */
717
718         if (p->read.memblock && p->recieve_memblock_callback) {
719
720             /* Is this memblock data? Than pass it to the user */
721             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
722
723             if (l > 0) {
724                 pa_memchunk chunk;
725
726                 chunk.memblock = p->read.memblock;
727                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
728                 chunk.length = l;
729
730                 if (p->recieve_memblock_callback) {
731                     int64_t offset;
732
733                     offset = (int64_t) (
734                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
735                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
736
737                     p->recieve_memblock_callback(
738                         p,
739                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
740                         offset,
741                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
742                         &chunk,
743                         p->recieve_memblock_callback_userdata);
744                 }
745
746                 /* Drop seek info for following callbacks */
747                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
748                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
749                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
750             }
751         }
752
753         /* Frame complete */
754         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
755
756             if (p->read.memblock) {
757
758                 /* This was a memblock frame. We can unref the memblock now */
759                 pa_memblock_unref(p->read.memblock);
760
761             } else if (p->read.packet) {
762
763                 if (p->recieve_packet_callback)
764 #ifdef HAVE_CREDS
765                     p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
766 #else
767                     p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
768 #endif
769
770                 pa_packet_unref(p->read.packet);
771             } else {
772                 pa_memblock *b;
773
774                 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
775
776                 assert(p->import);
777
778                 if (!(b = pa_memimport_get(p->import,
779                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
780                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
781                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
782                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
783
784                     pa_log_warn("Failed to import memory block.");
785                     return -1;
786                 }
787
788                 if (p->recieve_memblock_callback) {
789                     int64_t offset;
790                     pa_memchunk chunk;
791
792                     chunk.memblock = b;
793                     chunk.index = 0;
794                     chunk.length = b->length;
795
796                     offset = (int64_t) (
797                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
798                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
799
800                     p->recieve_memblock_callback(
801                             p,
802                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
803                             offset,
804                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
805                             &chunk,
806                             p->recieve_memblock_callback_userdata);
807                 }
808
809                 pa_memblock_unref(b);
810             }
811
812             goto frame_done;
813         }
814     }
815
816     return 0;
817
818 frame_done:
819     p->read.memblock = NULL;
820     p->read.packet = NULL;
821     p->read.index = 0;
822
823 #ifdef HAVE_CREDS
824     p->read_creds_valid = 0;
825 #endif
826
827     return 0;
828 }
829
830 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
831     assert(p);
832     assert(PA_REFCNT_VALUE(p) > 0);
833
834     pa_mutex_lock(p->mutex);
835     p->die_callback = cb;
836     p->die_callback_userdata = userdata;
837     pa_mutex_unlock(p->mutex);
838 }
839
840 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
841     assert(p);
842     assert(PA_REFCNT_VALUE(p) > 0);
843
844     pa_mutex_lock(p->mutex);
845     p->drain_callback = cb;
846     p->drain_callback_userdata = userdata;
847     pa_mutex_unlock(p->mutex);
848 }
849
850 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
851     assert(p);
852     assert(PA_REFCNT_VALUE(p) > 0);
853
854     pa_mutex_lock(p->mutex);
855     p->recieve_packet_callback = cb;
856     p->recieve_packet_callback_userdata = userdata;
857     pa_mutex_unlock(p->mutex);
858 }
859
860 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
861     assert(p);
862     assert(PA_REFCNT_VALUE(p) > 0);
863
864     pa_mutex_lock(p->mutex);
865     p->recieve_memblock_callback = cb;
866     p->recieve_memblock_callback_userdata = userdata;
867     pa_mutex_unlock(p->mutex);
868 }
869
870 int pa_pstream_is_pending(pa_pstream *p) {
871     int b;
872
873     assert(p);
874     assert(PA_REFCNT_VALUE(p) > 0);
875
876     pa_mutex_lock(p->mutex);
877
878     if (p->dead)
879         b = 0;
880     else
881         b = p->write.current || !pa_queue_is_empty(p->send_queue);
882
883     pa_mutex_unlock(p->mutex);
884
885     return b;
886 }
887
888 void pa_pstream_unref(pa_pstream*p) {
889     assert(p);
890     assert(PA_REFCNT_VALUE(p) > 0);
891
892     if (PA_REFCNT_DEC(p) <= 0)
893         pstream_free(p);
894 }
895
896 pa_pstream* pa_pstream_ref(pa_pstream*p) {
897     assert(p);
898     assert(PA_REFCNT_VALUE(p) > 0);
899
900     PA_REFCNT_INC(p);
901     return p;
902 }
903
904 void pa_pstream_close(pa_pstream *p) {
905     assert(p);
906
907     pa_mutex_lock(p->mutex);
908
909     p->dead = 1;
910
911     if (p->import) {
912         pa_memimport_free(p->import);
913         p->import = NULL;
914     }
915
916     if (p->export) {
917         pa_memexport_free(p->export);
918         p->export = NULL;
919     }
920
921     if (p->io) {
922         pa_iochannel_free(p->io);
923         p->io = NULL;
924     }
925
926     if (p->defer_event) {
927         p->mainloop->defer_free(p->defer_event);
928         p->defer_event = NULL;
929     }
930
931     p->die_callback = NULL;
932     p->drain_callback = NULL;
933     p->recieve_packet_callback = NULL;
934     p->recieve_memblock_callback = NULL;
935
936     pa_mutex_unlock(p->mutex);
937 }
938
939 void pa_pstream_use_shm(pa_pstream *p, int enable) {
940     assert(p);
941     assert(PA_REFCNT_VALUE(p) > 0);
942
943     pa_mutex_lock(p->mutex);
944
945     p->use_shm = enable;
946
947     if (enable) {
948
949         if (!p->export)
950             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
951
952     } else {
953
954         if (p->export) {
955             pa_memexport_free(p->export);
956             p->export = NULL;
957         }
958     }
959
960     pa_mutex_unlock(p->mutex);
961 }