merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as
11   published by the Free Software Foundation; either version 2.1 of the
12   License, or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   Lesser General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public
20   License along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_NETINET_IN_H
40 #include <netinet/in.h>
41 #endif
42
43
44 #include <pulse/xmalloc.h>
45
46 #include <pulsecore/winsock.h>
47 #include <pulsecore/queue.h>
48 #include <pulsecore/log.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/creds.h>
51 #include <pulsecore/refcnt.h>
52 #include <pulsecore/flist.h>
53 #include <pulsecore/macro.h>
54
55 #include "pstream.h"
56
57 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
58 #define PA_FLAG_SHMDATA    0x80000000LU
59 #define PA_FLAG_SHMRELEASE 0x40000000LU
60 #define PA_FLAG_SHMREVOKE  0xC0000000LU
61 #define PA_FLAG_SHMMASK    0xFF000000LU
62 #define PA_FLAG_SEEKMASK   0x000000FFLU
63
64 /* The sequence descriptor header consists of 5 32bit integers: */
65 enum {
66     PA_PSTREAM_DESCRIPTOR_LENGTH,
67     PA_PSTREAM_DESCRIPTOR_CHANNEL,
68     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
69     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
70     PA_PSTREAM_DESCRIPTOR_FLAGS,
71     PA_PSTREAM_DESCRIPTOR_MAX
72 };
73
74 /* If we have an SHM block, this info follows the descriptor */
75 enum {
76     PA_PSTREAM_SHM_BLOCKID,
77     PA_PSTREAM_SHM_SHMID,
78     PA_PSTREAM_SHM_INDEX,
79     PA_PSTREAM_SHM_LENGTH,
80     PA_PSTREAM_SHM_MAX
81 };
82
83 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
84
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
87
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
90 struct item_info {
91     enum {
92         PA_PSTREAM_ITEM_PACKET,
93         PA_PSTREAM_ITEM_MEMBLOCK,
94         PA_PSTREAM_ITEM_SHMRELEASE,
95         PA_PSTREAM_ITEM_SHMREVOKE
96     } type;
97
98     /* packet info */
99     pa_packet *packet;
100 #ifdef HAVE_CREDS
101     pa_bool_t with_creds;
102     pa_creds creds;
103 #endif
104
105     /* memblock info */
106     pa_memchunk chunk;
107     uint32_t channel;
108     int64_t offset;
109     pa_seek_mode_t seek_mode;
110
111     /* release/revoke info */
112     uint32_t block_id;
113 };
114
115 struct pa_pstream {
116     PA_REFCNT_DECLARE;
117
118     pa_mainloop_api *mainloop;
119     pa_defer_event *defer_event;
120     pa_iochannel *io;
121
122     pa_queue *send_queue;
123
124     pa_bool_t dead;
125
126     struct {
127         pa_pstream_descriptor descriptor;
128         struct item_info* current;
129         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
130         void *data;
131         size_t index;
132         pa_memchunk memchunk;
133     } write;
134
135     struct {
136         pa_pstream_descriptor descriptor;
137         pa_memblock *memblock;
138         pa_packet *packet;
139         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
140         void *data;
141         size_t index;
142     } read;
143
144     pa_bool_t use_shm;
145     pa_memimport *import;
146     pa_memexport *export;
147
148     pa_pstream_packet_cb_t recieve_packet_callback;
149     void *recieve_packet_callback_userdata;
150
151     pa_pstream_memblock_cb_t recieve_memblock_callback;
152     void *recieve_memblock_callback_userdata;
153
154     pa_pstream_notify_cb_t drain_callback;
155     void *drain_callback_userdata;
156
157     pa_pstream_notify_cb_t die_callback;
158     void *die_callback_userdata;
159
160     pa_pstream_block_id_cb_t revoke_callback;
161     void *revoke_callback_userdata;
162
163     pa_pstream_block_id_cb_t release_callback;
164     void *release_callback_userdata;
165
166     pa_mempool *mempool;
167
168 #ifdef HAVE_CREDS
169     pa_creds read_creds, write_creds;
170     pa_bool_t read_creds_valid, send_creds_now;
171 #endif
172 };
173
174 static int do_write(pa_pstream *p);
175 static int do_read(pa_pstream *p);
176
177 static void do_something(pa_pstream *p) {
178     pa_assert(p);
179     pa_assert(PA_REFCNT_VALUE(p) > 0);
180
181     pa_pstream_ref(p);
182
183     p->mainloop->defer_enable(p->defer_event, 0);
184
185     if (!p->dead && pa_iochannel_is_readable(p->io)) {
186         if (do_read(p) < 0)
187             goto fail;
188     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
189         goto fail;
190
191     if (!p->dead && pa_iochannel_is_writable(p->io)) {
192         if (do_write(p) < 0)
193             goto fail;
194     }
195
196     pa_pstream_unref(p);
197     return;
198
199 fail:
200
201     if (p->die_callback)
202         p->die_callback(p, p->die_callback_userdata);
203
204     pa_pstream_unlink(p);
205     pa_pstream_unref(p);
206 }
207
208 static void io_callback(pa_iochannel*io, void *userdata) {
209     pa_pstream *p = userdata;
210
211     pa_assert(p);
212     pa_assert(PA_REFCNT_VALUE(p) > 0);
213     pa_assert(p->io == io);
214
215     do_something(p);
216 }
217
218 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
219     pa_pstream *p = userdata;
220
221     pa_assert(p);
222     pa_assert(PA_REFCNT_VALUE(p) > 0);
223     pa_assert(p->defer_event == e);
224     pa_assert(p->mainloop == m);
225
226     do_something(p);
227 }
228
229 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
230
231 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
232     pa_pstream *p;
233
234     pa_assert(m);
235     pa_assert(io);
236     pa_assert(pool);
237
238     p = pa_xnew(pa_pstream, 1);
239     PA_REFCNT_INIT(p);
240     p->io = io;
241     pa_iochannel_set_callback(io, io_callback, p);
242     p->dead = FALSE;
243
244     p->mainloop = m;
245     p->defer_event = m->defer_new(m, defer_callback, p);
246     m->defer_enable(p->defer_event, 0);
247
248     p->send_queue = pa_queue_new();
249
250     p->write.current = NULL;
251     p->write.index = 0;
252     pa_memchunk_reset(&p->write.memchunk);
253     p->read.memblock = NULL;
254     p->read.packet = NULL;
255     p->read.index = 0;
256
257     p->recieve_packet_callback = NULL;
258     p->recieve_packet_callback_userdata = NULL;
259     p->recieve_memblock_callback = NULL;
260     p->recieve_memblock_callback_userdata = NULL;
261     p->drain_callback = NULL;
262     p->drain_callback_userdata = NULL;
263     p->die_callback = NULL;
264     p->die_callback_userdata = NULL;
265     p->revoke_callback = NULL;
266     p->revoke_callback_userdata = NULL;
267     p->release_callback = NULL;
268     p->release_callback_userdata = NULL;
269
270     p->mempool = pool;
271
272     p->use_shm = FALSE;
273     p->export = NULL;
274
275     /* We do importing unconditionally */
276     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
277
278     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
279     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
280
281 #ifdef HAVE_CREDS
282     p->send_creds_now = FALSE;
283     p->read_creds_valid = FALSE;
284 #endif
285     return p;
286 }
287
288 static void item_free(void *item, PA_GCC_UNUSED void *q) {
289     struct item_info *i = item;
290     pa_assert(i);
291
292     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
293         pa_assert(i->chunk.memblock);
294         pa_memblock_unref(i->chunk.memblock);
295     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
296         pa_assert(i->packet);
297         pa_packet_unref(i->packet);
298     }
299
300     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
301         pa_xfree(i);
302 }
303
304 static void pstream_free(pa_pstream *p) {
305     pa_assert(p);
306
307     pa_pstream_unlink(p);
308
309     pa_queue_free(p->send_queue, item_free, NULL);
310
311     if (p->write.current)
312         item_free(p->write.current, NULL);
313
314     if (p->write.memchunk.memblock)
315         pa_memblock_unref(p->write.memchunk.memblock);
316
317     if (p->read.memblock)
318         pa_memblock_unref(p->read.memblock);
319
320     if (p->read.packet)
321         pa_packet_unref(p->read.packet);
322
323     pa_xfree(p);
324 }
325
326 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
327     struct item_info *i;
328
329     pa_assert(p);
330     pa_assert(PA_REFCNT_VALUE(p) > 0);
331     pa_assert(packet);
332
333     if (p->dead)
334         return;
335
336     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
337         i = pa_xnew(struct item_info, 1);
338
339     i->type = PA_PSTREAM_ITEM_PACKET;
340     i->packet = pa_packet_ref(packet);
341
342 #ifdef HAVE_CREDS
343     if ((i->with_creds = !!creds))
344         i->creds = *creds;
345 #endif
346
347     pa_queue_push(p->send_queue, i);
348
349     p->mainloop->defer_enable(p->defer_event, 1);
350 }
351
352 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
353     size_t length, idx;
354     size_t bsm;
355
356     pa_assert(p);
357     pa_assert(PA_REFCNT_VALUE(p) > 0);
358     pa_assert(channel != (uint32_t) -1);
359     pa_assert(chunk);
360
361     if (p->dead)
362         return;
363
364     idx = 0;
365     length = chunk->length;
366
367     bsm = pa_mempool_block_size_max(p->mempool);
368
369     while (length > 0) {
370         struct item_info *i;
371         size_t n;
372
373         if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
374             i = pa_xnew(struct item_info, 1);
375         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
376
377         n = PA_MIN(length, bsm);
378         i->chunk.index = chunk->index + idx;
379         i->chunk.length = n;
380         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
381
382         i->channel = channel;
383         i->offset = offset;
384         i->seek_mode = seek_mode;
385 #ifdef HAVE_CREDS
386         i->with_creds = FALSE;
387 #endif
388
389         pa_queue_push(p->send_queue, i);
390
391         idx += n;
392         length -= n;
393     }
394
395     p->mainloop->defer_enable(p->defer_event, 1);
396 }
397
398 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
399     struct item_info *item;
400     pa_assert(p);
401     pa_assert(PA_REFCNT_VALUE(p) > 0);
402
403     if (p->dead)
404         return;
405
406 /*     pa_log("Releasing block %u", block_id); */
407
408     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
409         item = pa_xnew(struct item_info, 1);
410     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
411     item->block_id = block_id;
412 #ifdef HAVE_CREDS
413     item->with_creds = FALSE;
414 #endif
415
416     pa_queue_push(p->send_queue, item);
417     p->mainloop->defer_enable(p->defer_event, 1);
418 }
419
420 /* might be called from thread context */
421 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
422     pa_pstream *p = userdata;
423
424     pa_assert(p);
425     pa_assert(PA_REFCNT_VALUE(p) > 0);
426
427     if (p->dead)
428         return;
429
430     if (p->release_callback)
431         p->release_callback(p, block_id, p->release_callback_userdata);
432     else
433         pa_pstream_send_release(p, block_id);
434 }
435
436 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
437     struct item_info *item;
438     pa_assert(p);
439     pa_assert(PA_REFCNT_VALUE(p) > 0);
440
441     if (p->dead)
442         return;
443 /*     pa_log("Revoking block %u", block_id); */
444
445     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
446         item = pa_xnew(struct item_info, 1);
447     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
448     item->block_id = block_id;
449 #ifdef HAVE_CREDS
450     item->with_creds = FALSE;
451 #endif
452
453     pa_queue_push(p->send_queue, item);
454     p->mainloop->defer_enable(p->defer_event, 1);
455 }
456
457 /* might be called from thread context */
458 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
459     pa_pstream *p = userdata;
460
461     pa_assert(p);
462     pa_assert(PA_REFCNT_VALUE(p) > 0);
463
464     if (p->revoke_callback)
465         p->revoke_callback(p, block_id, p->revoke_callback_userdata);
466     else
467         pa_pstream_send_revoke(p, block_id);
468 }
469
470 static void prepare_next_write_item(pa_pstream *p) {
471     pa_assert(p);
472     pa_assert(PA_REFCNT_VALUE(p) > 0);
473
474     p->write.current = pa_queue_pop(p->send_queue);
475
476     if (!p->write.current)
477         return;
478
479     p->write.index = 0;
480     p->write.data = NULL;
481     pa_memchunk_reset(&p->write.memchunk);
482
483     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
484     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
485     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
486     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
487     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
488
489     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
490
491         pa_assert(p->write.current->packet);
492         p->write.data = p->write.current->packet->data;
493         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
494
495     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
496
497         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
498         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
499
500     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
501
502         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
503         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
504
505     } else {
506         uint32_t flags;
507         pa_bool_t send_payload = TRUE;
508
509         pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
510         pa_assert(p->write.current->chunk.memblock);
511
512         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
513         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
514         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
515
516         flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
517
518         if (p->use_shm) {
519             uint32_t block_id, shm_id;
520             size_t offset, length;
521
522             pa_assert(p->export);
523
524             if (pa_memexport_put(p->export,
525                                  p->write.current->chunk.memblock,
526                                  &block_id,
527                                  &shm_id,
528                                  &offset,
529                                  &length) >= 0) {
530
531                 flags |= PA_FLAG_SHMDATA;
532                 send_payload = FALSE;
533
534                 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
535                 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
536                 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
537                 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
538
539                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
540                 p->write.data = p->write.shm_info;
541             }
542 /*             else */
543 /*                 pa_log_warn("Failed to export memory block."); */
544         }
545
546         if (send_payload) {
547             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
548             p->write.memchunk = p->write.current->chunk;
549             pa_memblock_ref(p->write.memchunk.memblock);
550             p->write.data = NULL;
551         }
552
553         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
554     }
555
556 #ifdef HAVE_CREDS
557     if ((p->send_creds_now = p->write.current->with_creds))
558         p->write_creds = p->write.current->creds;
559 #endif
560 }
561
562 static int do_write(pa_pstream *p) {
563     void *d;
564     size_t l;
565     ssize_t r;
566     pa_memblock *release_memblock = NULL;
567
568     pa_assert(p);
569     pa_assert(PA_REFCNT_VALUE(p) > 0);
570
571     if (!p->write.current)
572         prepare_next_write_item(p);
573
574     if (!p->write.current)
575         return 0;
576
577     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
578         d = (uint8_t*) p->write.descriptor + p->write.index;
579         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
580     } else {
581         pa_assert(p->write.data || p->write.memchunk.memblock);
582
583         if (p->write.data)
584             d = p->write.data;
585         else {
586             d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
587             release_memblock = p->write.memchunk.memblock;
588         }
589
590         d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
591         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
592     }
593
594     pa_assert(l > 0);
595
596 #ifdef HAVE_CREDS
597     if (p->send_creds_now) {
598
599         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
600             goto fail;
601
602         p->send_creds_now = FALSE;
603     } else
604 #endif
605
606     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
607         goto fail;
608
609     if (release_memblock)
610         pa_memblock_release(release_memblock);
611
612     p->write.index += r;
613
614     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
615         pa_assert(p->write.current);
616         item_free(p->write.current, NULL);
617         p->write.current = NULL;
618
619         if (p->write.memchunk.memblock)
620             pa_memblock_unref(p->write.memchunk.memblock);
621
622         pa_memchunk_reset(&p->write.memchunk);
623
624         if (p->drain_callback && !pa_pstream_is_pending(p))
625             p->drain_callback(p, p->drain_callback_userdata);
626     }
627
628     return 0;
629
630 fail:
631
632     if (release_memblock)
633         pa_memblock_release(release_memblock);
634
635     return -1;
636 }
637
638 static int do_read(pa_pstream *p) {
639     void *d;
640     size_t l;
641     ssize_t r;
642     pa_memblock *release_memblock = NULL;
643     pa_assert(p);
644     pa_assert(PA_REFCNT_VALUE(p) > 0);
645
646     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
647         d = (uint8_t*) p->read.descriptor + p->read.index;
648         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
649     } else {
650         pa_assert(p->read.data || p->read.memblock);
651
652         if (p->read.data)
653             d = p->read.data;
654         else {
655             d = pa_memblock_acquire(p->read.memblock);
656             release_memblock = p->read.memblock;
657         }
658
659         d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
660         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
661     }
662
663 #ifdef HAVE_CREDS
664     {
665         pa_bool_t b = 0;
666
667         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
668             goto fail;
669
670         p->read_creds_valid = p->read_creds_valid || b;
671     }
672 #else
673     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
674         goto fail;
675 #endif
676
677     if (release_memblock)
678         pa_memblock_release(release_memblock);
679
680     p->read.index += r;
681
682     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
683         uint32_t flags, length, channel;
684         /* Reading of frame descriptor complete */
685
686         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
687
688         if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
689             pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
690             return -1;
691         }
692
693         if (flags == PA_FLAG_SHMRELEASE) {
694
695             /* This is a SHM memblock release frame with no payload */
696
697 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
698
699             pa_assert(p->export);
700             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
701
702             goto frame_done;
703
704         } else if (flags == PA_FLAG_SHMREVOKE) {
705
706             /* This is a SHM memblock revoke frame with no payload */
707
708 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
709
710             pa_assert(p->import);
711             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
712
713             goto frame_done;
714         }
715
716         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
717
718         if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
719             pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
720             return -1;
721         }
722
723         pa_assert(!p->read.packet && !p->read.memblock);
724
725         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
726
727         if (channel == (uint32_t) -1) {
728
729             if (flags != 0) {
730                 pa_log_warn("Received packet frame with invalid flags value.");
731                 return -1;
732             }
733
734             /* Frame is a packet frame */
735             p->read.packet = pa_packet_new(length);
736             p->read.data = p->read.packet->data;
737
738         } else {
739
740             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
741                 pa_log_warn("Received memblock frame with invalid seek mode.");
742                 return -1;
743             }
744
745             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
746
747                 if (length != sizeof(p->read.shm_info)) {
748                     pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
749                     return -1;
750                 }
751
752                 /* Frame is a memblock frame referencing an SHM memblock */
753                 p->read.data = p->read.shm_info;
754
755             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
756
757                 /* Frame is a memblock frame */
758
759                 p->read.memblock = pa_memblock_new(p->mempool, length);
760                 p->read.data = NULL;
761             } else {
762
763                 pa_log_warn("Recieved memblock frame with invalid flags value.");
764                 return -1;
765             }
766         }
767
768     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
769         /* Frame payload available */
770
771         if (p->read.memblock && p->recieve_memblock_callback) {
772
773             /* Is this memblock data? Than pass it to the user */
774             l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
775
776             if (l > 0) {
777                 pa_memchunk chunk;
778
779                 chunk.memblock = p->read.memblock;
780                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
781                 chunk.length = l;
782
783                 if (p->recieve_memblock_callback) {
784                     int64_t offset;
785
786                     offset = (int64_t) (
787                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
788                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
789
790                     p->recieve_memblock_callback(
791                         p,
792                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
793                         offset,
794                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
795                         &chunk,
796                         p->recieve_memblock_callback_userdata);
797                 }
798
799                 /* Drop seek info for following callbacks */
800                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
801                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
802                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
803             }
804         }
805
806         /* Frame complete */
807         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
808
809             if (p->read.memblock) {
810
811                 /* This was a memblock frame. We can unref the memblock now */
812                 pa_memblock_unref(p->read.memblock);
813
814             } else if (p->read.packet) {
815
816                 if (p->recieve_packet_callback)
817 #ifdef HAVE_CREDS
818                     p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
819 #else
820                     p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
821 #endif
822
823                 pa_packet_unref(p->read.packet);
824             } else {
825                 pa_memblock *b;
826
827                 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
828
829                 pa_assert(p->import);
830
831                 if (!(b = pa_memimport_get(p->import,
832                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
833                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
834                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
835                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
836
837                     pa_log_warn("Failed to import memory block.");
838                     return -1;
839                 }
840
841                 if (p->recieve_memblock_callback) {
842                     int64_t offset;
843                     pa_memchunk chunk;
844
845                     chunk.memblock = b;
846                     chunk.index = 0;
847                     chunk.length = pa_memblock_get_length(b);
848
849                     offset = (int64_t) (
850                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
851                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
852
853                     p->recieve_memblock_callback(
854                             p,
855                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
856                             offset,
857                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
858                             &chunk,
859                             p->recieve_memblock_callback_userdata);
860                 }
861
862                 pa_memblock_unref(b);
863             }
864
865             goto frame_done;
866         }
867     }
868
869     return 0;
870
871 frame_done:
872     p->read.memblock = NULL;
873     p->read.packet = NULL;
874     p->read.index = 0;
875     p->read.data = NULL;
876
877 #ifdef HAVE_CREDS
878     p->read_creds_valid = FALSE;
879 #endif
880
881     return 0;
882
883 fail:
884     if (release_memblock)
885         pa_memblock_release(release_memblock);
886
887     return -1;
888 }
889
890 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
891     pa_assert(p);
892     pa_assert(PA_REFCNT_VALUE(p) > 0);
893
894     p->die_callback = cb;
895     p->die_callback_userdata = userdata;
896 }
897
898 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
899     pa_assert(p);
900     pa_assert(PA_REFCNT_VALUE(p) > 0);
901
902     p->drain_callback = cb;
903     p->drain_callback_userdata = userdata;
904 }
905
906 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
907     pa_assert(p);
908     pa_assert(PA_REFCNT_VALUE(p) > 0);
909
910     p->recieve_packet_callback = cb;
911     p->recieve_packet_callback_userdata = userdata;
912 }
913
914 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
915     pa_assert(p);
916     pa_assert(PA_REFCNT_VALUE(p) > 0);
917
918     p->recieve_memblock_callback = cb;
919     p->recieve_memblock_callback_userdata = userdata;
920 }
921
922 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
923     pa_assert(p);
924     pa_assert(PA_REFCNT_VALUE(p) > 0);
925
926     p->release_callback = cb;
927     p->release_callback_userdata = userdata;
928 }
929
930 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
931     pa_assert(p);
932     pa_assert(PA_REFCNT_VALUE(p) > 0);
933
934     p->release_callback = cb;
935     p->release_callback_userdata = userdata;
936 }
937
938 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
939     pa_bool_t b;
940
941     pa_assert(p);
942     pa_assert(PA_REFCNT_VALUE(p) > 0);
943
944     if (p->dead)
945         b = FALSE;
946     else
947         b = p->write.current || !pa_queue_is_empty(p->send_queue);
948
949     return b;
950 }
951
952 void pa_pstream_unref(pa_pstream*p) {
953     pa_assert(p);
954     pa_assert(PA_REFCNT_VALUE(p) > 0);
955
956     if (PA_REFCNT_DEC(p) <= 0)
957         pstream_free(p);
958 }
959
960 pa_pstream* pa_pstream_ref(pa_pstream*p) {
961     pa_assert(p);
962     pa_assert(PA_REFCNT_VALUE(p) > 0);
963
964     PA_REFCNT_INC(p);
965     return p;
966 }
967
968 void pa_pstream_unlink(pa_pstream *p) {
969     pa_assert(p);
970
971     if (p->dead)
972         return;
973
974     p->dead = TRUE;
975
976     if (p->import) {
977         pa_memimport_free(p->import);
978         p->import = NULL;
979     }
980
981     if (p->export) {
982         pa_memexport_free(p->export);
983         p->export = NULL;
984     }
985
986     if (p->io) {
987         pa_iochannel_free(p->io);
988         p->io = NULL;
989     }
990
991     if (p->defer_event) {
992         p->mainloop->defer_free(p->defer_event);
993         p->defer_event = NULL;
994     }
995
996     p->die_callback = NULL;
997     p->drain_callback = NULL;
998     p->recieve_packet_callback = NULL;
999     p->recieve_memblock_callback = NULL;
1000 }
1001
1002 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1003     pa_assert(p);
1004     pa_assert(PA_REFCNT_VALUE(p) > 0);
1005
1006     p->use_shm = enable;
1007
1008     if (enable) {
1009
1010         if (!p->export)
1011             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1012
1013     } else {
1014
1015         if (p->export) {
1016             pa_memexport_free(p->export);
1017             p->export = NULL;
1018         }
1019     }
1020 }
1021
1022 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1023     pa_assert(p);
1024     pa_assert(PA_REFCNT_VALUE(p) > 0);
1025
1026     return p->use_shm;
1027 }