pstream: Fixup hangs caused by recent iochannel patch
[platform/upstream/pulseaudio.git] / src / pulsecore / pstream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
33 #endif
34
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
44
45 #include "pstream.h"
46
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA    0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE  0xC0000000LU
51 #define PA_FLAG_SHMMASK    0xFF000000LU
52 #define PA_FLAG_SEEKMASK   0x000000FFLU
53
54 /* The sequence descriptor header consists of 5 32bit integers: */
55 enum {
56     PA_PSTREAM_DESCRIPTOR_LENGTH,
57     PA_PSTREAM_DESCRIPTOR_CHANNEL,
58     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
59     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
60     PA_PSTREAM_DESCRIPTOR_FLAGS,
61     PA_PSTREAM_DESCRIPTOR_MAX
62 };
63
64 /* If we have an SHM block, this info follows the descriptor */
65 enum {
66     PA_PSTREAM_SHM_BLOCKID,
67     PA_PSTREAM_SHM_SHMID,
68     PA_PSTREAM_SHM_INDEX,
69     PA_PSTREAM_SHM_LENGTH,
70     PA_PSTREAM_SHM_MAX
71 };
72
73 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76
77 #define MINIBUF_SIZE (256)
78
79 /* To allow uploading a single sample in one frame, this value should be the
80  * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
81  */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
83
84 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
85
86 struct item_info {
87     enum {
88         PA_PSTREAM_ITEM_PACKET,
89         PA_PSTREAM_ITEM_MEMBLOCK,
90         PA_PSTREAM_ITEM_SHMRELEASE,
91         PA_PSTREAM_ITEM_SHMREVOKE
92     } type;
93
94     /* packet info */
95     pa_packet *packet;
96 #ifdef HAVE_CREDS
97     pa_bool_t with_creds;
98     pa_creds creds;
99 #endif
100
101     /* memblock info */
102     pa_memchunk chunk;
103     uint32_t channel;
104     int64_t offset;
105     pa_seek_mode_t seek_mode;
106
107     /* release/revoke info */
108     uint32_t block_id;
109 };
110
111 struct pa_pstream {
112     PA_REFCNT_DECLARE;
113
114     pa_mainloop_api *mainloop;
115     pa_defer_event *defer_event;
116     pa_iochannel *io;
117
118     pa_queue *send_queue;
119
120     pa_bool_t dead;
121
122     struct {
123         union {
124             uint8_t minibuf[MINIBUF_SIZE];
125             pa_pstream_descriptor descriptor;
126         };
127         struct item_info* current;
128         void *data;
129         size_t index;
130         int minibuf_validsize;
131         pa_memchunk memchunk;
132     } write;
133
134     struct {
135         pa_pstream_descriptor descriptor;
136         pa_memblock *memblock;
137         pa_packet *packet;
138         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139         void *data;
140         size_t index;
141     } read;
142
143     pa_bool_t use_shm;
144     pa_memimport *import;
145     pa_memexport *export;
146
147     pa_pstream_packet_cb_t receive_packet_callback;
148     void *receive_packet_callback_userdata;
149
150     pa_pstream_memblock_cb_t receive_memblock_callback;
151     void *receive_memblock_callback_userdata;
152
153     pa_pstream_notify_cb_t drain_callback;
154     void *drain_callback_userdata;
155
156     pa_pstream_notify_cb_t die_callback;
157     void *die_callback_userdata;
158
159     pa_pstream_block_id_cb_t revoke_callback;
160     void *revoke_callback_userdata;
161
162     pa_pstream_block_id_cb_t release_callback;
163     void *release_callback_userdata;
164
165     pa_mempool *mempool;
166
167 #ifdef HAVE_CREDS
168     pa_creds read_creds, write_creds;
169     pa_bool_t read_creds_valid, send_creds_now;
170 #endif
171 };
172
173 static int do_write(pa_pstream *p);
174 static int do_read(pa_pstream *p);
175
176 static void do_pstream_read_write(pa_pstream *p) {
177     pa_assert(p);
178     pa_assert(PA_REFCNT_VALUE(p) > 0);
179
180     pa_pstream_ref(p);
181
182     p->mainloop->defer_enable(p->defer_event, 0);
183
184     if (!p->dead && pa_iochannel_is_readable(p->io)) {
185         if (do_read(p) < 0)
186             goto fail;
187     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
188         goto fail;
189
190     while (!p->dead && pa_iochannel_is_writable(p->io)) {
191         int r = do_write(p);
192         if (r < 0)
193             goto fail;
194         if (r == 0)
195             break;
196     }
197
198     pa_pstream_unref(p);
199     return;
200
201 fail:
202
203     if (p->die_callback)
204         p->die_callback(p, p->die_callback_userdata);
205
206     pa_pstream_unlink(p);
207     pa_pstream_unref(p);
208 }
209
210 static void io_callback(pa_iochannel*io, void *userdata) {
211     pa_pstream *p = userdata;
212
213     pa_assert(p);
214     pa_assert(PA_REFCNT_VALUE(p) > 0);
215     pa_assert(p->io == io);
216
217     do_pstream_read_write(p);
218 }
219
220 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
221     pa_pstream *p = userdata;
222
223     pa_assert(p);
224     pa_assert(PA_REFCNT_VALUE(p) > 0);
225     pa_assert(p->defer_event == e);
226     pa_assert(p->mainloop == m);
227
228     do_pstream_read_write(p);
229 }
230
231 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
232
233 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
234     pa_pstream *p;
235
236     pa_assert(m);
237     pa_assert(io);
238     pa_assert(pool);
239
240     p = pa_xnew(pa_pstream, 1);
241     PA_REFCNT_INIT(p);
242     p->io = io;
243     pa_iochannel_set_callback(io, io_callback, p);
244     p->dead = FALSE;
245
246     p->mainloop = m;
247     p->defer_event = m->defer_new(m, defer_callback, p);
248     m->defer_enable(p->defer_event, 0);
249
250     p->send_queue = pa_queue_new();
251
252     p->write.current = NULL;
253     p->write.index = 0;
254     pa_memchunk_reset(&p->write.memchunk);
255     p->read.memblock = NULL;
256     p->read.packet = NULL;
257     p->read.index = 0;
258
259     p->receive_packet_callback = NULL;
260     p->receive_packet_callback_userdata = NULL;
261     p->receive_memblock_callback = NULL;
262     p->receive_memblock_callback_userdata = NULL;
263     p->drain_callback = NULL;
264     p->drain_callback_userdata = NULL;
265     p->die_callback = NULL;
266     p->die_callback_userdata = NULL;
267     p->revoke_callback = NULL;
268     p->revoke_callback_userdata = NULL;
269     p->release_callback = NULL;
270     p->release_callback_userdata = NULL;
271
272     p->mempool = pool;
273
274     p->use_shm = FALSE;
275     p->export = NULL;
276
277     /* We do importing unconditionally */
278     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
279
280     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
281     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
282
283 #ifdef HAVE_CREDS
284     p->send_creds_now = FALSE;
285     p->read_creds_valid = FALSE;
286 #endif
287     return p;
288 }
289
290 static void item_free(void *item) {
291     struct item_info *i = item;
292     pa_assert(i);
293
294     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
295         pa_assert(i->chunk.memblock);
296         pa_memblock_unref(i->chunk.memblock);
297     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
298         pa_assert(i->packet);
299         pa_packet_unref(i->packet);
300     }
301
302     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
303         pa_xfree(i);
304 }
305
306 static void pstream_free(pa_pstream *p) {
307     pa_assert(p);
308
309     pa_pstream_unlink(p);
310
311     pa_queue_free(p->send_queue, item_free);
312
313     if (p->write.current)
314         item_free(p->write.current);
315
316     if (p->write.memchunk.memblock)
317         pa_memblock_unref(p->write.memchunk.memblock);
318
319     if (p->read.memblock)
320         pa_memblock_unref(p->read.memblock);
321
322     if (p->read.packet)
323         pa_packet_unref(p->read.packet);
324
325     pa_xfree(p);
326 }
327
328 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
329     struct item_info *i;
330
331     pa_assert(p);
332     pa_assert(PA_REFCNT_VALUE(p) > 0);
333     pa_assert(packet);
334
335     if (p->dead)
336         return;
337
338     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
339         i = pa_xnew(struct item_info, 1);
340
341     i->type = PA_PSTREAM_ITEM_PACKET;
342     i->packet = pa_packet_ref(packet);
343
344 #ifdef HAVE_CREDS
345     if ((i->with_creds = !!creds))
346         i->creds = *creds;
347 #endif
348
349     pa_queue_push(p->send_queue, i);
350
351     p->mainloop->defer_enable(p->defer_event, 1);
352 }
353
354 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
355     size_t length, idx;
356     size_t bsm;
357
358     pa_assert(p);
359     pa_assert(PA_REFCNT_VALUE(p) > 0);
360     pa_assert(channel != (uint32_t) -1);
361     pa_assert(chunk);
362
363     if (p->dead)
364         return;
365
366     idx = 0;
367     length = chunk->length;
368
369     bsm = pa_mempool_block_size_max(p->mempool);
370
371     while (length > 0) {
372         struct item_info *i;
373         size_t n;
374
375         if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
376             i = pa_xnew(struct item_info, 1);
377         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
378
379         n = PA_MIN(length, bsm);
380         i->chunk.index = chunk->index + idx;
381         i->chunk.length = n;
382         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
383
384         i->channel = channel;
385         i->offset = offset;
386         i->seek_mode = seek_mode;
387 #ifdef HAVE_CREDS
388         i->with_creds = FALSE;
389 #endif
390
391         pa_queue_push(p->send_queue, i);
392
393         idx += n;
394         length -= n;
395     }
396
397     p->mainloop->defer_enable(p->defer_event, 1);
398 }
399
400 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
401     struct item_info *item;
402     pa_assert(p);
403     pa_assert(PA_REFCNT_VALUE(p) > 0);
404
405     if (p->dead)
406         return;
407
408 /*     pa_log("Releasing block %u", block_id); */
409
410     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
411         item = pa_xnew(struct item_info, 1);
412     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
413     item->block_id = block_id;
414 #ifdef HAVE_CREDS
415     item->with_creds = FALSE;
416 #endif
417
418     pa_queue_push(p->send_queue, item);
419     p->mainloop->defer_enable(p->defer_event, 1);
420 }
421
422 /* might be called from thread context */
423 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
424     pa_pstream *p = userdata;
425
426     pa_assert(p);
427     pa_assert(PA_REFCNT_VALUE(p) > 0);
428
429     if (p->dead)
430         return;
431
432     if (p->release_callback)
433         p->release_callback(p, block_id, p->release_callback_userdata);
434     else
435         pa_pstream_send_release(p, block_id);
436 }
437
438 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
439     struct item_info *item;
440     pa_assert(p);
441     pa_assert(PA_REFCNT_VALUE(p) > 0);
442
443     if (p->dead)
444         return;
445 /*     pa_log("Revoking block %u", block_id); */
446
447     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
448         item = pa_xnew(struct item_info, 1);
449     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
450     item->block_id = block_id;
451 #ifdef HAVE_CREDS
452     item->with_creds = FALSE;
453 #endif
454
455     pa_queue_push(p->send_queue, item);
456     p->mainloop->defer_enable(p->defer_event, 1);
457 }
458
459 /* might be called from thread context */
460 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
461     pa_pstream *p = userdata;
462
463     pa_assert(p);
464     pa_assert(PA_REFCNT_VALUE(p) > 0);
465
466     if (p->revoke_callback)
467         p->revoke_callback(p, block_id, p->revoke_callback_userdata);
468     else
469         pa_pstream_send_revoke(p, block_id);
470 }
471
472 static void prepare_next_write_item(pa_pstream *p) {
473     pa_assert(p);
474     pa_assert(PA_REFCNT_VALUE(p) > 0);
475
476     p->write.current = pa_queue_pop(p->send_queue);
477
478     if (!p->write.current)
479         return;
480     p->write.index = 0;
481     p->write.data = NULL;
482     p->write.minibuf_validsize = 0;
483     pa_memchunk_reset(&p->write.memchunk);
484
485     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
486     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
487     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
488     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
489     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
490
491     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
492
493         pa_assert(p->write.current->packet);
494         p->write.data = p->write.current->packet->data;
495         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
496
497         if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
498             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length);
499             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length;
500         }
501
502     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
503
504         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
505         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
506
507     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
508
509         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
510         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
511
512     } else {
513         uint32_t flags;
514         pa_bool_t send_payload = TRUE;
515
516         pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
517         pa_assert(p->write.current->chunk.memblock);
518
519         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
520         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
521         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
522
523         flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
524
525         if (p->use_shm) {
526             uint32_t block_id, shm_id;
527             size_t offset, length;
528             uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
529             size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
530
531             pa_assert(p->export);
532
533             if (pa_memexport_put(p->export,
534                                  p->write.current->chunk.memblock,
535                                  &block_id,
536                                  &shm_id,
537                                  &offset,
538                                  &length) >= 0) {
539
540                 flags |= PA_FLAG_SHMDATA;
541                 send_payload = FALSE;
542
543                 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
544                 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
545                 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
546                 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
547
548                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
549                 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
550             }
551 /*             else */
552 /*                 pa_log_warn("Failed to export memory block."); */
553         }
554
555         if (send_payload) {
556             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
557             p->write.memchunk = p->write.current->chunk;
558             pa_memblock_ref(p->write.memchunk.memblock);
559             p->write.data = NULL;
560         }
561
562         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
563     }
564
565 #ifdef HAVE_CREDS
566     if ((p->send_creds_now = p->write.current->with_creds))
567         p->write_creds = p->write.current->creds;
568 #endif
569 }
570
571 static int do_write(pa_pstream *p) {
572     void *d;
573     size_t l;
574     ssize_t r;
575     pa_memblock *release_memblock = NULL;
576
577     pa_assert(p);
578     pa_assert(PA_REFCNT_VALUE(p) > 0);
579
580     if (!p->write.current)
581         prepare_next_write_item(p);
582
583     if (!p->write.current)
584         return 0;
585
586     if (p->write.minibuf_validsize > 0) {
587         d = p->write.minibuf + p->write.index;
588         l = p->write.minibuf_validsize - p->write.index;
589     } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
590         d = (uint8_t*) p->write.descriptor + p->write.index;
591         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
592     } else {
593         pa_assert(p->write.data || p->write.memchunk.memblock);
594
595         if (p->write.data)
596             d = p->write.data;
597         else {
598             d = pa_memblock_acquire_chunk(&p->write.memchunk);
599             release_memblock = p->write.memchunk.memblock;
600         }
601
602         d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
603         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
604     }
605
606     pa_assert(l > 0);
607
608 #ifdef HAVE_CREDS
609     if (p->send_creds_now) {
610
611         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
612             goto fail;
613
614         p->send_creds_now = FALSE;
615     } else
616 #endif
617
618     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
619         goto fail;
620
621     if (release_memblock)
622         pa_memblock_release(release_memblock);
623
624     p->write.index += (size_t) r;
625
626     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
627         pa_assert(p->write.current);
628         item_free(p->write.current);
629         p->write.current = NULL;
630
631         if (p->write.memchunk.memblock)
632             pa_memblock_unref(p->write.memchunk.memblock);
633
634         pa_memchunk_reset(&p->write.memchunk);
635
636         if (p->drain_callback && !pa_pstream_is_pending(p))
637             p->drain_callback(p, p->drain_callback_userdata);
638     }
639
640     return (size_t) r == l ? 1 : 0;
641
642 fail:
643
644     if (release_memblock)
645         pa_memblock_release(release_memblock);
646
647     return -1;
648 }
649
650 static int do_read(pa_pstream *p) {
651     void *d;
652     size_t l;
653     ssize_t r;
654     pa_memblock *release_memblock = NULL;
655     pa_assert(p);
656     pa_assert(PA_REFCNT_VALUE(p) > 0);
657
658     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
659         d = (uint8_t*) p->read.descriptor + p->read.index;
660         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
661     } else {
662         pa_assert(p->read.data || p->read.memblock);
663
664         if (p->read.data)
665             d = p->read.data;
666         else {
667             d = pa_memblock_acquire(p->read.memblock);
668             release_memblock = p->read.memblock;
669         }
670
671         d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
672         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
673     }
674
675 #ifdef HAVE_CREDS
676     {
677         pa_bool_t b = 0;
678
679         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
680             goto fail;
681
682         p->read_creds_valid = p->read_creds_valid || b;
683     }
684 #else
685     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
686         goto fail;
687 #endif
688
689     if (release_memblock)
690         pa_memblock_release(release_memblock);
691
692     p->read.index += (size_t) r;
693
694     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
695         uint32_t flags, length, channel;
696         /* Reading of frame descriptor complete */
697
698         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
699
700         if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
701             pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
702             return -1;
703         }
704
705         if (flags == PA_FLAG_SHMRELEASE) {
706
707             /* This is a SHM memblock release frame with no payload */
708
709 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
710
711             pa_assert(p->export);
712             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
713
714             goto frame_done;
715
716         } else if (flags == PA_FLAG_SHMREVOKE) {
717
718             /* This is a SHM memblock revoke frame with no payload */
719
720 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
721
722             pa_assert(p->import);
723             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
724
725             goto frame_done;
726         }
727
728         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
729
730         if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
731             pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
732             return -1;
733         }
734
735         pa_assert(!p->read.packet && !p->read.memblock);
736
737         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
738
739         if (channel == (uint32_t) -1) {
740
741             if (flags != 0) {
742                 pa_log_warn("Received packet frame with invalid flags value.");
743                 return -1;
744             }
745
746             /* Frame is a packet frame */
747             p->read.packet = pa_packet_new(length);
748             p->read.data = p->read.packet->data;
749
750         } else {
751
752             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
753                 pa_log_warn("Received memblock frame with invalid seek mode.");
754                 return -1;
755             }
756
757             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
758
759                 if (length != sizeof(p->read.shm_info)) {
760                     pa_log_warn("Received SHM memblock frame with invalid frame length.");
761                     return -1;
762                 }
763
764                 /* Frame is a memblock frame referencing an SHM memblock */
765                 p->read.data = p->read.shm_info;
766
767             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
768
769                 /* Frame is a memblock frame */
770
771                 p->read.memblock = pa_memblock_new(p->mempool, length);
772                 p->read.data = NULL;
773             } else {
774
775                 pa_log_warn("Received memblock frame with invalid flags value.");
776                 return -1;
777             }
778         }
779
780     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
781         /* Frame payload available */
782
783         if (p->read.memblock && p->receive_memblock_callback) {
784
785             /* Is this memblock data? Than pass it to the user */
786             l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
787
788             if (l > 0) {
789                 pa_memchunk chunk;
790
791                 chunk.memblock = p->read.memblock;
792                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
793                 chunk.length = l;
794
795                 if (p->receive_memblock_callback) {
796                     int64_t offset;
797
798                     offset = (int64_t) (
799                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
800                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
801
802                     p->receive_memblock_callback(
803                         p,
804                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
805                         offset,
806                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
807                         &chunk,
808                         p->receive_memblock_callback_userdata);
809                 }
810
811                 /* Drop seek info for following callbacks */
812                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
813                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
814                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
815             }
816         }
817
818         /* Frame complete */
819         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
820
821             if (p->read.memblock) {
822
823                 /* This was a memblock frame. We can unref the memblock now */
824                 pa_memblock_unref(p->read.memblock);
825
826             } else if (p->read.packet) {
827
828                 if (p->receive_packet_callback)
829 #ifdef HAVE_CREDS
830                     p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
831 #else
832                     p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
833 #endif
834
835                 pa_packet_unref(p->read.packet);
836             } else {
837                 pa_memblock *b;
838
839                 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
840
841                 pa_assert(p->import);
842
843                 if (!(b = pa_memimport_get(p->import,
844                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
845                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
846                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
847                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
848
849                     if (pa_log_ratelimit(PA_LOG_DEBUG))
850                         pa_log_debug("Failed to import memory block.");
851                 }
852
853                 if (p->receive_memblock_callback) {
854                     int64_t offset;
855                     pa_memchunk chunk;
856
857                     chunk.memblock = b;
858                     chunk.index = 0;
859                     chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
860
861                     offset = (int64_t) (
862                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
863                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
864
865                     p->receive_memblock_callback(
866                             p,
867                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
868                             offset,
869                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
870                             &chunk,
871                             p->receive_memblock_callback_userdata);
872                 }
873
874                 if (b)
875                     pa_memblock_unref(b);
876             }
877
878             goto frame_done;
879         }
880     }
881
882     return 0;
883
884 frame_done:
885     p->read.memblock = NULL;
886     p->read.packet = NULL;
887     p->read.index = 0;
888     p->read.data = NULL;
889
890 #ifdef HAVE_CREDS
891     p->read_creds_valid = FALSE;
892 #endif
893
894     return 0;
895
896 fail:
897     if (release_memblock)
898         pa_memblock_release(release_memblock);
899
900     return -1;
901 }
902
903 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
904     pa_assert(p);
905     pa_assert(PA_REFCNT_VALUE(p) > 0);
906
907     p->die_callback = cb;
908     p->die_callback_userdata = userdata;
909 }
910
911 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
912     pa_assert(p);
913     pa_assert(PA_REFCNT_VALUE(p) > 0);
914
915     p->drain_callback = cb;
916     p->drain_callback_userdata = userdata;
917 }
918
919 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
920     pa_assert(p);
921     pa_assert(PA_REFCNT_VALUE(p) > 0);
922
923     p->receive_packet_callback = cb;
924     p->receive_packet_callback_userdata = userdata;
925 }
926
927 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
928     pa_assert(p);
929     pa_assert(PA_REFCNT_VALUE(p) > 0);
930
931     p->receive_memblock_callback = cb;
932     p->receive_memblock_callback_userdata = userdata;
933 }
934
935 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
936     pa_assert(p);
937     pa_assert(PA_REFCNT_VALUE(p) > 0);
938
939     p->release_callback = cb;
940     p->release_callback_userdata = userdata;
941 }
942
943 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
944     pa_assert(p);
945     pa_assert(PA_REFCNT_VALUE(p) > 0);
946
947     p->release_callback = cb;
948     p->release_callback_userdata = userdata;
949 }
950
951 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
952     pa_bool_t b;
953
954     pa_assert(p);
955     pa_assert(PA_REFCNT_VALUE(p) > 0);
956
957     if (p->dead)
958         b = FALSE;
959     else
960         b = p->write.current || !pa_queue_isempty(p->send_queue);
961
962     return b;
963 }
964
965 void pa_pstream_unref(pa_pstream*p) {
966     pa_assert(p);
967     pa_assert(PA_REFCNT_VALUE(p) > 0);
968
969     if (PA_REFCNT_DEC(p) <= 0)
970         pstream_free(p);
971 }
972
973 pa_pstream* pa_pstream_ref(pa_pstream*p) {
974     pa_assert(p);
975     pa_assert(PA_REFCNT_VALUE(p) > 0);
976
977     PA_REFCNT_INC(p);
978     return p;
979 }
980
981 void pa_pstream_unlink(pa_pstream *p) {
982     pa_assert(p);
983
984     if (p->dead)
985         return;
986
987     p->dead = TRUE;
988
989     if (p->import) {
990         pa_memimport_free(p->import);
991         p->import = NULL;
992     }
993
994     if (p->export) {
995         pa_memexport_free(p->export);
996         p->export = NULL;
997     }
998
999     if (p->io) {
1000         pa_iochannel_free(p->io);
1001         p->io = NULL;
1002     }
1003
1004     if (p->defer_event) {
1005         p->mainloop->defer_free(p->defer_event);
1006         p->defer_event = NULL;
1007     }
1008
1009     p->die_callback = NULL;
1010     p->drain_callback = NULL;
1011     p->receive_packet_callback = NULL;
1012     p->receive_memblock_callback = NULL;
1013 }
1014
1015 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1016     pa_assert(p);
1017     pa_assert(PA_REFCNT_VALUE(p) > 0);
1018
1019     p->use_shm = enable;
1020
1021     if (enable) {
1022
1023         if (!p->export)
1024             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1025
1026     } else {
1027
1028         if (p->export) {
1029             pa_memexport_free(p->export);
1030             p->export = NULL;
1031         }
1032     }
1033 }
1034
1035 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1036     pa_assert(p);
1037     pa_assert(PA_REFCNT_VALUE(p) > 0);
1038
1039     return p->use_shm;
1040 }