pstream: Fix case in log message
[platform/upstream/pulseaudio.git] / src / pulsecore / pstream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
33 #endif
34
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/creds.h>
41 #include <pulsecore/refcnt.h>
42 #include <pulsecore/flist.h>
43 #include <pulsecore/macro.h>
44
45 #include "pstream.h"
46
47 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 #define PA_FLAG_SHMDATA    0x80000000LU
49 #define PA_FLAG_SHMRELEASE 0x40000000LU
50 #define PA_FLAG_SHMREVOKE  0xC0000000LU
51 #define PA_FLAG_SHMMASK    0xFF000000LU
52 #define PA_FLAG_SEEKMASK   0x000000FFLU
53
54 /* The sequence descriptor header consists of 5 32bit integers: */
55 enum {
56     PA_PSTREAM_DESCRIPTOR_LENGTH,
57     PA_PSTREAM_DESCRIPTOR_CHANNEL,
58     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
59     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
60     PA_PSTREAM_DESCRIPTOR_FLAGS,
61     PA_PSTREAM_DESCRIPTOR_MAX
62 };
63
64 /* If we have an SHM block, this info follows the descriptor */
65 enum {
66     PA_PSTREAM_SHM_BLOCKID,
67     PA_PSTREAM_SHM_SHMID,
68     PA_PSTREAM_SHM_INDEX,
69     PA_PSTREAM_SHM_LENGTH,
70     PA_PSTREAM_SHM_MAX
71 };
72
73 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74
75 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76
77 #define MINIBUF_SIZE (256)
78
79 /* To allow uploading a single sample in one frame, this value should be the
80  * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
81  */
82 #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
83
84 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
85
86 struct item_info {
87     enum {
88         PA_PSTREAM_ITEM_PACKET,
89         PA_PSTREAM_ITEM_MEMBLOCK,
90         PA_PSTREAM_ITEM_SHMRELEASE,
91         PA_PSTREAM_ITEM_SHMREVOKE
92     } type;
93
94     /* packet info */
95     pa_packet *packet;
96 #ifdef HAVE_CREDS
97     pa_bool_t with_creds;
98     pa_creds creds;
99 #endif
100
101     /* memblock info */
102     pa_memchunk chunk;
103     uint32_t channel;
104     int64_t offset;
105     pa_seek_mode_t seek_mode;
106
107     /* release/revoke info */
108     uint32_t block_id;
109 };
110
111 struct pa_pstream {
112     PA_REFCNT_DECLARE;
113
114     pa_mainloop_api *mainloop;
115     pa_defer_event *defer_event;
116     pa_iochannel *io;
117
118     pa_queue *send_queue;
119
120     pa_bool_t dead;
121
122     struct {
123         union {
124             uint8_t minibuf[MINIBUF_SIZE];
125             pa_pstream_descriptor descriptor;
126         };
127         struct item_info* current;
128         void *data;
129         size_t index;
130         int minibuf_validsize;
131         pa_memchunk memchunk;
132     } write;
133
134     struct {
135         pa_pstream_descriptor descriptor;
136         pa_memblock *memblock;
137         pa_packet *packet;
138         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
139         void *data;
140         size_t index;
141     } read;
142
143     pa_bool_t use_shm;
144     pa_memimport *import;
145     pa_memexport *export;
146
147     pa_pstream_packet_cb_t receive_packet_callback;
148     void *receive_packet_callback_userdata;
149
150     pa_pstream_memblock_cb_t receive_memblock_callback;
151     void *receive_memblock_callback_userdata;
152
153     pa_pstream_notify_cb_t drain_callback;
154     void *drain_callback_userdata;
155
156     pa_pstream_notify_cb_t die_callback;
157     void *die_callback_userdata;
158
159     pa_pstream_block_id_cb_t revoke_callback;
160     void *revoke_callback_userdata;
161
162     pa_pstream_block_id_cb_t release_callback;
163     void *release_callback_userdata;
164
165     pa_mempool *mempool;
166
167 #ifdef HAVE_CREDS
168     pa_creds read_creds, write_creds;
169     pa_bool_t read_creds_valid, send_creds_now;
170 #endif
171 };
172
173 static int do_write(pa_pstream *p);
174 static int do_read(pa_pstream *p);
175
176 static void do_pstream_read_write(pa_pstream *p) {
177     pa_assert(p);
178     pa_assert(PA_REFCNT_VALUE(p) > 0);
179
180     pa_pstream_ref(p);
181
182     p->mainloop->defer_enable(p->defer_event, 0);
183
184     if (!p->dead && pa_iochannel_is_readable(p->io)) {
185         if (do_read(p) < 0)
186             goto fail;
187     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
188         goto fail;
189
190     if (!p->dead && pa_iochannel_is_writable(p->io)) {
191         if (do_write(p) < 0)
192             goto fail;
193     }
194
195     pa_pstream_unref(p);
196     return;
197
198 fail:
199
200     if (p->die_callback)
201         p->die_callback(p, p->die_callback_userdata);
202
203     pa_pstream_unlink(p);
204     pa_pstream_unref(p);
205 }
206
207 static void io_callback(pa_iochannel*io, void *userdata) {
208     pa_pstream *p = userdata;
209
210     pa_assert(p);
211     pa_assert(PA_REFCNT_VALUE(p) > 0);
212     pa_assert(p->io == io);
213
214     do_pstream_read_write(p);
215 }
216
217 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
218     pa_pstream *p = userdata;
219
220     pa_assert(p);
221     pa_assert(PA_REFCNT_VALUE(p) > 0);
222     pa_assert(p->defer_event == e);
223     pa_assert(p->mainloop == m);
224
225     do_pstream_read_write(p);
226 }
227
228 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
229
230 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
231     pa_pstream *p;
232
233     pa_assert(m);
234     pa_assert(io);
235     pa_assert(pool);
236
237     p = pa_xnew(pa_pstream, 1);
238     PA_REFCNT_INIT(p);
239     p->io = io;
240     pa_iochannel_set_callback(io, io_callback, p);
241     p->dead = FALSE;
242
243     p->mainloop = m;
244     p->defer_event = m->defer_new(m, defer_callback, p);
245     m->defer_enable(p->defer_event, 0);
246
247     p->send_queue = pa_queue_new();
248
249     p->write.current = NULL;
250     p->write.index = 0;
251     pa_memchunk_reset(&p->write.memchunk);
252     p->read.memblock = NULL;
253     p->read.packet = NULL;
254     p->read.index = 0;
255
256     p->receive_packet_callback = NULL;
257     p->receive_packet_callback_userdata = NULL;
258     p->receive_memblock_callback = NULL;
259     p->receive_memblock_callback_userdata = NULL;
260     p->drain_callback = NULL;
261     p->drain_callback_userdata = NULL;
262     p->die_callback = NULL;
263     p->die_callback_userdata = NULL;
264     p->revoke_callback = NULL;
265     p->revoke_callback_userdata = NULL;
266     p->release_callback = NULL;
267     p->release_callback_userdata = NULL;
268
269     p->mempool = pool;
270
271     p->use_shm = FALSE;
272     p->export = NULL;
273
274     /* We do importing unconditionally */
275     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
276
277     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
278     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
279
280 #ifdef HAVE_CREDS
281     p->send_creds_now = FALSE;
282     p->read_creds_valid = FALSE;
283 #endif
284     return p;
285 }
286
287 static void item_free(void *item) {
288     struct item_info *i = item;
289     pa_assert(i);
290
291     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
292         pa_assert(i->chunk.memblock);
293         pa_memblock_unref(i->chunk.memblock);
294     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
295         pa_assert(i->packet);
296         pa_packet_unref(i->packet);
297     }
298
299     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
300         pa_xfree(i);
301 }
302
303 static void pstream_free(pa_pstream *p) {
304     pa_assert(p);
305
306     pa_pstream_unlink(p);
307
308     pa_queue_free(p->send_queue, item_free);
309
310     if (p->write.current)
311         item_free(p->write.current);
312
313     if (p->write.memchunk.memblock)
314         pa_memblock_unref(p->write.memchunk.memblock);
315
316     if (p->read.memblock)
317         pa_memblock_unref(p->read.memblock);
318
319     if (p->read.packet)
320         pa_packet_unref(p->read.packet);
321
322     pa_xfree(p);
323 }
324
325 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
326     struct item_info *i;
327
328     pa_assert(p);
329     pa_assert(PA_REFCNT_VALUE(p) > 0);
330     pa_assert(packet);
331
332     if (p->dead)
333         return;
334
335     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
336         i = pa_xnew(struct item_info, 1);
337
338     i->type = PA_PSTREAM_ITEM_PACKET;
339     i->packet = pa_packet_ref(packet);
340
341 #ifdef HAVE_CREDS
342     if ((i->with_creds = !!creds))
343         i->creds = *creds;
344 #endif
345
346     pa_queue_push(p->send_queue, i);
347
348     p->mainloop->defer_enable(p->defer_event, 1);
349 }
350
351 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
352     size_t length, idx;
353     size_t bsm;
354
355     pa_assert(p);
356     pa_assert(PA_REFCNT_VALUE(p) > 0);
357     pa_assert(channel != (uint32_t) -1);
358     pa_assert(chunk);
359
360     if (p->dead)
361         return;
362
363     idx = 0;
364     length = chunk->length;
365
366     bsm = pa_mempool_block_size_max(p->mempool);
367
368     while (length > 0) {
369         struct item_info *i;
370         size_t n;
371
372         if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
373             i = pa_xnew(struct item_info, 1);
374         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
375
376         n = PA_MIN(length, bsm);
377         i->chunk.index = chunk->index + idx;
378         i->chunk.length = n;
379         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
380
381         i->channel = channel;
382         i->offset = offset;
383         i->seek_mode = seek_mode;
384 #ifdef HAVE_CREDS
385         i->with_creds = FALSE;
386 #endif
387
388         pa_queue_push(p->send_queue, i);
389
390         idx += n;
391         length -= n;
392     }
393
394     p->mainloop->defer_enable(p->defer_event, 1);
395 }
396
397 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
398     struct item_info *item;
399     pa_assert(p);
400     pa_assert(PA_REFCNT_VALUE(p) > 0);
401
402     if (p->dead)
403         return;
404
405 /*     pa_log("Releasing block %u", block_id); */
406
407     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
408         item = pa_xnew(struct item_info, 1);
409     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
410     item->block_id = block_id;
411 #ifdef HAVE_CREDS
412     item->with_creds = FALSE;
413 #endif
414
415     pa_queue_push(p->send_queue, item);
416     p->mainloop->defer_enable(p->defer_event, 1);
417 }
418
419 /* might be called from thread context */
420 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
421     pa_pstream *p = userdata;
422
423     pa_assert(p);
424     pa_assert(PA_REFCNT_VALUE(p) > 0);
425
426     if (p->dead)
427         return;
428
429     if (p->release_callback)
430         p->release_callback(p, block_id, p->release_callback_userdata);
431     else
432         pa_pstream_send_release(p, block_id);
433 }
434
435 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
436     struct item_info *item;
437     pa_assert(p);
438     pa_assert(PA_REFCNT_VALUE(p) > 0);
439
440     if (p->dead)
441         return;
442 /*     pa_log("Revoking block %u", block_id); */
443
444     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
445         item = pa_xnew(struct item_info, 1);
446     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
447     item->block_id = block_id;
448 #ifdef HAVE_CREDS
449     item->with_creds = FALSE;
450 #endif
451
452     pa_queue_push(p->send_queue, item);
453     p->mainloop->defer_enable(p->defer_event, 1);
454 }
455
456 /* might be called from thread context */
457 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
458     pa_pstream *p = userdata;
459
460     pa_assert(p);
461     pa_assert(PA_REFCNT_VALUE(p) > 0);
462
463     if (p->revoke_callback)
464         p->revoke_callback(p, block_id, p->revoke_callback_userdata);
465     else
466         pa_pstream_send_revoke(p, block_id);
467 }
468
469 static void prepare_next_write_item(pa_pstream *p) {
470     pa_assert(p);
471     pa_assert(PA_REFCNT_VALUE(p) > 0);
472
473     p->write.current = pa_queue_pop(p->send_queue);
474
475     if (!p->write.current)
476         return;
477     p->write.index = 0;
478     p->write.data = NULL;
479     p->write.minibuf_validsize = 0;
480     pa_memchunk_reset(&p->write.memchunk);
481
482     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
483     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
484     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
485     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
486     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
487
488     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
489
490         pa_assert(p->write.current->packet);
491         p->write.data = p->write.current->packet->data;
492         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
493
494         if (p->write.current->packet->length <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
495             memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, p->write.current->packet->length);
496             p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + p->write.current->packet->length;
497         }
498
499     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
500
501         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
502         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
503
504     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
505
506         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
507         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
508
509     } else {
510         uint32_t flags;
511         pa_bool_t send_payload = TRUE;
512
513         pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
514         pa_assert(p->write.current->chunk.memblock);
515
516         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
517         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
518         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
519
520         flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
521
522         if (p->use_shm) {
523             uint32_t block_id, shm_id;
524             size_t offset, length;
525             uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
526             size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
527
528             pa_assert(p->export);
529
530             if (pa_memexport_put(p->export,
531                                  p->write.current->chunk.memblock,
532                                  &block_id,
533                                  &shm_id,
534                                  &offset,
535                                  &length) >= 0) {
536
537                 flags |= PA_FLAG_SHMDATA;
538                 send_payload = FALSE;
539
540                 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
541                 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
542                 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
543                 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
544
545                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
546                 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
547             }
548 /*             else */
549 /*                 pa_log_warn("Failed to export memory block."); */
550         }
551
552         if (send_payload) {
553             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
554             p->write.memchunk = p->write.current->chunk;
555             pa_memblock_ref(p->write.memchunk.memblock);
556             p->write.data = NULL;
557         }
558
559         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
560     }
561
562 #ifdef HAVE_CREDS
563     if ((p->send_creds_now = p->write.current->with_creds))
564         p->write_creds = p->write.current->creds;
565 #endif
566 }
567
568 static int do_write(pa_pstream *p) {
569     void *d;
570     size_t l;
571     ssize_t r;
572     pa_memblock *release_memblock = NULL;
573
574     pa_assert(p);
575     pa_assert(PA_REFCNT_VALUE(p) > 0);
576
577     if (!p->write.current)
578         prepare_next_write_item(p);
579
580     if (!p->write.current)
581         return 0;
582
583     if (p->write.minibuf_validsize > 0) {
584         d = p->write.minibuf + p->write.index;
585         l = p->write.minibuf_validsize - p->write.index;
586     } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
587         d = (uint8_t*) p->write.descriptor + p->write.index;
588         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
589     } else {
590         pa_assert(p->write.data || p->write.memchunk.memblock);
591
592         if (p->write.data)
593             d = p->write.data;
594         else {
595             d = pa_memblock_acquire_chunk(&p->write.memchunk);
596             release_memblock = p->write.memchunk.memblock;
597         }
598
599         d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
600         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
601     }
602
603     pa_assert(l > 0);
604
605 #ifdef HAVE_CREDS
606     if (p->send_creds_now) {
607
608         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
609             goto fail;
610
611         p->send_creds_now = FALSE;
612     } else
613 #endif
614
615     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
616         goto fail;
617
618     if (release_memblock)
619         pa_memblock_release(release_memblock);
620
621     p->write.index += (size_t) r;
622
623     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
624         pa_assert(p->write.current);
625         item_free(p->write.current);
626         p->write.current = NULL;
627
628         if (p->write.memchunk.memblock)
629             pa_memblock_unref(p->write.memchunk.memblock);
630
631         pa_memchunk_reset(&p->write.memchunk);
632
633         if (p->drain_callback && !pa_pstream_is_pending(p))
634             p->drain_callback(p, p->drain_callback_userdata);
635     }
636
637     return 0;
638
639 fail:
640
641     if (release_memblock)
642         pa_memblock_release(release_memblock);
643
644     return -1;
645 }
646
647 static int do_read(pa_pstream *p) {
648     void *d;
649     size_t l;
650     ssize_t r;
651     pa_memblock *release_memblock = NULL;
652     pa_assert(p);
653     pa_assert(PA_REFCNT_VALUE(p) > 0);
654
655     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
656         d = (uint8_t*) p->read.descriptor + p->read.index;
657         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
658     } else {
659         pa_assert(p->read.data || p->read.memblock);
660
661         if (p->read.data)
662             d = p->read.data;
663         else {
664             d = pa_memblock_acquire(p->read.memblock);
665             release_memblock = p->read.memblock;
666         }
667
668         d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
669         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
670     }
671
672 #ifdef HAVE_CREDS
673     {
674         pa_bool_t b = 0;
675
676         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
677             goto fail;
678
679         p->read_creds_valid = p->read_creds_valid || b;
680     }
681 #else
682     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
683         goto fail;
684 #endif
685
686     if (release_memblock)
687         pa_memblock_release(release_memblock);
688
689     p->read.index += (size_t) r;
690
691     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
692         uint32_t flags, length, channel;
693         /* Reading of frame descriptor complete */
694
695         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
696
697         if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
698             pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
699             return -1;
700         }
701
702         if (flags == PA_FLAG_SHMRELEASE) {
703
704             /* This is a SHM memblock release frame with no payload */
705
706 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
707
708             pa_assert(p->export);
709             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
710
711             goto frame_done;
712
713         } else if (flags == PA_FLAG_SHMREVOKE) {
714
715             /* This is a SHM memblock revoke frame with no payload */
716
717 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
718
719             pa_assert(p->import);
720             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
721
722             goto frame_done;
723         }
724
725         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
726
727         if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
728             pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
729             return -1;
730         }
731
732         pa_assert(!p->read.packet && !p->read.memblock);
733
734         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
735
736         if (channel == (uint32_t) -1) {
737
738             if (flags != 0) {
739                 pa_log_warn("Received packet frame with invalid flags value.");
740                 return -1;
741             }
742
743             /* Frame is a packet frame */
744             p->read.packet = pa_packet_new(length);
745             p->read.data = p->read.packet->data;
746
747         } else {
748
749             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
750                 pa_log_warn("Received memblock frame with invalid seek mode.");
751                 return -1;
752             }
753
754             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
755
756                 if (length != sizeof(p->read.shm_info)) {
757                     pa_log_warn("Received SHM memblock frame with invalid frame length.");
758                     return -1;
759                 }
760
761                 /* Frame is a memblock frame referencing an SHM memblock */
762                 p->read.data = p->read.shm_info;
763
764             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
765
766                 /* Frame is a memblock frame */
767
768                 p->read.memblock = pa_memblock_new(p->mempool, length);
769                 p->read.data = NULL;
770             } else {
771
772                 pa_log_warn("Received memblock frame with invalid flags value.");
773                 return -1;
774             }
775         }
776
777     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
778         /* Frame payload available */
779
780         if (p->read.memblock && p->receive_memblock_callback) {
781
782             /* Is this memblock data? Than pass it to the user */
783             l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
784
785             if (l > 0) {
786                 pa_memchunk chunk;
787
788                 chunk.memblock = p->read.memblock;
789                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
790                 chunk.length = l;
791
792                 if (p->receive_memblock_callback) {
793                     int64_t offset;
794
795                     offset = (int64_t) (
796                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
797                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
798
799                     p->receive_memblock_callback(
800                         p,
801                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
802                         offset,
803                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
804                         &chunk,
805                         p->receive_memblock_callback_userdata);
806                 }
807
808                 /* Drop seek info for following callbacks */
809                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
810                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
811                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
812             }
813         }
814
815         /* Frame complete */
816         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
817
818             if (p->read.memblock) {
819
820                 /* This was a memblock frame. We can unref the memblock now */
821                 pa_memblock_unref(p->read.memblock);
822
823             } else if (p->read.packet) {
824
825                 if (p->receive_packet_callback)
826 #ifdef HAVE_CREDS
827                     p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
828 #else
829                     p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
830 #endif
831
832                 pa_packet_unref(p->read.packet);
833             } else {
834                 pa_memblock *b;
835
836                 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
837
838                 pa_assert(p->import);
839
840                 if (!(b = pa_memimport_get(p->import,
841                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
842                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
843                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
844                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
845
846                     if (pa_log_ratelimit(PA_LOG_DEBUG))
847                         pa_log_debug("Failed to import memory block.");
848                 }
849
850                 if (p->receive_memblock_callback) {
851                     int64_t offset;
852                     pa_memchunk chunk;
853
854                     chunk.memblock = b;
855                     chunk.index = 0;
856                     chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
857
858                     offset = (int64_t) (
859                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
860                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
861
862                     p->receive_memblock_callback(
863                             p,
864                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
865                             offset,
866                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
867                             &chunk,
868                             p->receive_memblock_callback_userdata);
869                 }
870
871                 if (b)
872                     pa_memblock_unref(b);
873             }
874
875             goto frame_done;
876         }
877     }
878
879     return 0;
880
881 frame_done:
882     p->read.memblock = NULL;
883     p->read.packet = NULL;
884     p->read.index = 0;
885     p->read.data = NULL;
886
887 #ifdef HAVE_CREDS
888     p->read_creds_valid = FALSE;
889 #endif
890
891     return 0;
892
893 fail:
894     if (release_memblock)
895         pa_memblock_release(release_memblock);
896
897     return -1;
898 }
899
900 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
901     pa_assert(p);
902     pa_assert(PA_REFCNT_VALUE(p) > 0);
903
904     p->die_callback = cb;
905     p->die_callback_userdata = userdata;
906 }
907
908 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
909     pa_assert(p);
910     pa_assert(PA_REFCNT_VALUE(p) > 0);
911
912     p->drain_callback = cb;
913     p->drain_callback_userdata = userdata;
914 }
915
916 void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
917     pa_assert(p);
918     pa_assert(PA_REFCNT_VALUE(p) > 0);
919
920     p->receive_packet_callback = cb;
921     p->receive_packet_callback_userdata = userdata;
922 }
923
924 void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
925     pa_assert(p);
926     pa_assert(PA_REFCNT_VALUE(p) > 0);
927
928     p->receive_memblock_callback = cb;
929     p->receive_memblock_callback_userdata = userdata;
930 }
931
932 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
933     pa_assert(p);
934     pa_assert(PA_REFCNT_VALUE(p) > 0);
935
936     p->release_callback = cb;
937     p->release_callback_userdata = userdata;
938 }
939
940 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
941     pa_assert(p);
942     pa_assert(PA_REFCNT_VALUE(p) > 0);
943
944     p->release_callback = cb;
945     p->release_callback_userdata = userdata;
946 }
947
948 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
949     pa_bool_t b;
950
951     pa_assert(p);
952     pa_assert(PA_REFCNT_VALUE(p) > 0);
953
954     if (p->dead)
955         b = FALSE;
956     else
957         b = p->write.current || !pa_queue_isempty(p->send_queue);
958
959     return b;
960 }
961
962 void pa_pstream_unref(pa_pstream*p) {
963     pa_assert(p);
964     pa_assert(PA_REFCNT_VALUE(p) > 0);
965
966     if (PA_REFCNT_DEC(p) <= 0)
967         pstream_free(p);
968 }
969
970 pa_pstream* pa_pstream_ref(pa_pstream*p) {
971     pa_assert(p);
972     pa_assert(PA_REFCNT_VALUE(p) > 0);
973
974     PA_REFCNT_INC(p);
975     return p;
976 }
977
978 void pa_pstream_unlink(pa_pstream *p) {
979     pa_assert(p);
980
981     if (p->dead)
982         return;
983
984     p->dead = TRUE;
985
986     if (p->import) {
987         pa_memimport_free(p->import);
988         p->import = NULL;
989     }
990
991     if (p->export) {
992         pa_memexport_free(p->export);
993         p->export = NULL;
994     }
995
996     if (p->io) {
997         pa_iochannel_free(p->io);
998         p->io = NULL;
999     }
1000
1001     if (p->defer_event) {
1002         p->mainloop->defer_free(p->defer_event);
1003         p->defer_event = NULL;
1004     }
1005
1006     p->die_callback = NULL;
1007     p->drain_callback = NULL;
1008     p->receive_packet_callback = NULL;
1009     p->receive_memblock_callback = NULL;
1010 }
1011
1012 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1013     pa_assert(p);
1014     pa_assert(PA_REFCNT_VALUE(p) > 0);
1015
1016     p->use_shm = enable;
1017
1018     if (enable) {
1019
1020         if (!p->export)
1021             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1022
1023     } else {
1024
1025         if (p->export) {
1026             pa_memexport_free(p->export);
1027             p->export = NULL;
1028         }
1029     }
1030 }
1031
1032 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1033     pa_assert(p);
1034     pa_assert(PA_REFCNT_VALUE(p) > 0);
1035
1036     return p->use_shm;
1037 }