Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / pulsecore / pstream.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40
41
42 #include <pulse/xmalloc.h>
43
44 #include <pulsecore/winsock.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/refcnt.h>
50 #include <pulsecore/flist.h>
51 #include <pulsecore/macro.h>
52
53 #include "pstream.h"
54
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA    0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE  0xC0000000LU
59 #define PA_FLAG_SHMMASK    0xFF000000LU
60 #define PA_FLAG_SEEKMASK   0x000000FFLU
61
62 /* The sequence descriptor header consists of 5 32bit integers: */
63 enum {
64     PA_PSTREAM_DESCRIPTOR_LENGTH,
65     PA_PSTREAM_DESCRIPTOR_CHANNEL,
66     PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
67     PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
68     PA_PSTREAM_DESCRIPTOR_FLAGS,
69     PA_PSTREAM_DESCRIPTOR_MAX
70 };
71
72 /* If we have an SHM block, this info follows the descriptor */
73 enum {
74     PA_PSTREAM_SHM_BLOCKID,
75     PA_PSTREAM_SHM_SHMID,
76     PA_PSTREAM_SHM_INDEX,
77     PA_PSTREAM_SHM_LENGTH,
78     PA_PSTREAM_SHM_MAX
79 };
80
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
82
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
85
86 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
87
88 struct item_info {
89     enum {
90         PA_PSTREAM_ITEM_PACKET,
91         PA_PSTREAM_ITEM_MEMBLOCK,
92         PA_PSTREAM_ITEM_SHMRELEASE,
93         PA_PSTREAM_ITEM_SHMREVOKE
94     } type;
95
96     /* packet info */
97     pa_packet *packet;
98 #ifdef HAVE_CREDS
99     pa_bool_t with_creds;
100     pa_creds creds;
101 #endif
102
103     /* memblock info */
104     pa_memchunk chunk;
105     uint32_t channel;
106     int64_t offset;
107     pa_seek_mode_t seek_mode;
108
109     /* release/revoke info */
110     uint32_t block_id;
111 };
112
113 struct pa_pstream {
114     PA_REFCNT_DECLARE;
115
116     pa_mainloop_api *mainloop;
117     pa_defer_event *defer_event;
118     pa_iochannel *io;
119
120     pa_queue *send_queue;
121
122     pa_bool_t dead;
123
124     struct {
125         pa_pstream_descriptor descriptor;
126         struct item_info* current;
127         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128         void *data;
129         size_t index;
130         pa_memchunk memchunk;
131     } write;
132
133     struct {
134         pa_pstream_descriptor descriptor;
135         pa_memblock *memblock;
136         pa_packet *packet;
137         uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138         void *data;
139         size_t index;
140     } read;
141
142     pa_bool_t use_shm;
143     pa_memimport *import;
144     pa_memexport *export;
145
146     pa_pstream_packet_cb_t recieve_packet_callback;
147     void *recieve_packet_callback_userdata;
148
149     pa_pstream_memblock_cb_t recieve_memblock_callback;
150     void *recieve_memblock_callback_userdata;
151
152     pa_pstream_notify_cb_t drain_callback;
153     void *drain_callback_userdata;
154
155     pa_pstream_notify_cb_t die_callback;
156     void *die_callback_userdata;
157
158     pa_pstream_block_id_cb_t revoke_callback;
159     void *revoke_callback_userdata;
160
161     pa_pstream_block_id_cb_t release_callback;
162     void *release_callback_userdata;
163
164     pa_mempool *mempool;
165
166 #ifdef HAVE_CREDS
167     pa_creds read_creds, write_creds;
168     pa_bool_t read_creds_valid, send_creds_now;
169 #endif
170 };
171
172 static int do_write(pa_pstream *p);
173 static int do_read(pa_pstream *p);
174
175 static void do_something(pa_pstream *p) {
176     pa_assert(p);
177     pa_assert(PA_REFCNT_VALUE(p) > 0);
178
179     pa_pstream_ref(p);
180
181     p->mainloop->defer_enable(p->defer_event, 0);
182
183     if (!p->dead && pa_iochannel_is_readable(p->io)) {
184         if (do_read(p) < 0)
185             goto fail;
186     } else if (!p->dead && pa_iochannel_is_hungup(p->io))
187         goto fail;
188
189     if (!p->dead && pa_iochannel_is_writable(p->io)) {
190         if (do_write(p) < 0)
191             goto fail;
192     }
193
194     pa_pstream_unref(p);
195     return;
196
197 fail:
198
199     if (p->die_callback)
200         p->die_callback(p, p->die_callback_userdata);
201
202     pa_pstream_unlink(p);
203     pa_pstream_unref(p);
204 }
205
206 static void io_callback(pa_iochannel*io, void *userdata) {
207     pa_pstream *p = userdata;
208
209     pa_assert(p);
210     pa_assert(PA_REFCNT_VALUE(p) > 0);
211     pa_assert(p->io == io);
212
213     do_something(p);
214 }
215
216 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
217     pa_pstream *p = userdata;
218
219     pa_assert(p);
220     pa_assert(PA_REFCNT_VALUE(p) > 0);
221     pa_assert(p->defer_event == e);
222     pa_assert(p->mainloop == m);
223
224     do_something(p);
225 }
226
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
228
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230     pa_pstream *p;
231
232     pa_assert(m);
233     pa_assert(io);
234     pa_assert(pool);
235
236     p = pa_xnew(pa_pstream, 1);
237     PA_REFCNT_INIT(p);
238     p->io = io;
239     pa_iochannel_set_callback(io, io_callback, p);
240     p->dead = FALSE;
241
242     p->mainloop = m;
243     p->defer_event = m->defer_new(m, defer_callback, p);
244     m->defer_enable(p->defer_event, 0);
245
246     p->send_queue = pa_queue_new();
247
248     p->write.current = NULL;
249     p->write.index = 0;
250     pa_memchunk_reset(&p->write.memchunk);
251     p->read.memblock = NULL;
252     p->read.packet = NULL;
253     p->read.index = 0;
254
255     p->recieve_packet_callback = NULL;
256     p->recieve_packet_callback_userdata = NULL;
257     p->recieve_memblock_callback = NULL;
258     p->recieve_memblock_callback_userdata = NULL;
259     p->drain_callback = NULL;
260     p->drain_callback_userdata = NULL;
261     p->die_callback = NULL;
262     p->die_callback_userdata = NULL;
263     p->revoke_callback = NULL;
264     p->revoke_callback_userdata = NULL;
265     p->release_callback = NULL;
266     p->release_callback_userdata = NULL;
267
268     p->mempool = pool;
269
270     p->use_shm = FALSE;
271     p->export = NULL;
272
273     /* We do importing unconditionally */
274     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
275
276     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
277     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
278
279 #ifdef HAVE_CREDS
280     p->send_creds_now = FALSE;
281     p->read_creds_valid = FALSE;
282 #endif
283     return p;
284 }
285
286 static void item_free(void *item, void *q) {
287     struct item_info *i = item;
288     pa_assert(i);
289
290     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
291         pa_assert(i->chunk.memblock);
292         pa_memblock_unref(i->chunk.memblock);
293     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
294         pa_assert(i->packet);
295         pa_packet_unref(i->packet);
296     }
297
298     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
299         pa_xfree(i);
300 }
301
302 static void pstream_free(pa_pstream *p) {
303     pa_assert(p);
304
305     pa_pstream_unlink(p);
306
307     pa_queue_free(p->send_queue, item_free, NULL);
308
309     if (p->write.current)
310         item_free(p->write.current, NULL);
311
312     if (p->write.memchunk.memblock)
313         pa_memblock_unref(p->write.memchunk.memblock);
314
315     if (p->read.memblock)
316         pa_memblock_unref(p->read.memblock);
317
318     if (p->read.packet)
319         pa_packet_unref(p->read.packet);
320
321     pa_xfree(p);
322 }
323
324 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
325     struct item_info *i;
326
327     pa_assert(p);
328     pa_assert(PA_REFCNT_VALUE(p) > 0);
329     pa_assert(packet);
330
331     if (p->dead)
332         return;
333
334     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
335         i = pa_xnew(struct item_info, 1);
336
337     i->type = PA_PSTREAM_ITEM_PACKET;
338     i->packet = pa_packet_ref(packet);
339
340 #ifdef HAVE_CREDS
341     if ((i->with_creds = !!creds))
342         i->creds = *creds;
343 #endif
344
345     pa_queue_push(p->send_queue, i);
346
347     p->mainloop->defer_enable(p->defer_event, 1);
348 }
349
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
351     size_t length, idx;
352     size_t bsm;
353
354     pa_assert(p);
355     pa_assert(PA_REFCNT_VALUE(p) > 0);
356     pa_assert(channel != (uint32_t) -1);
357     pa_assert(chunk);
358
359     if (p->dead)
360         return;
361
362     idx = 0;
363     length = chunk->length;
364
365     bsm = pa_mempool_block_size_max(p->mempool);
366
367     while (length > 0) {
368         struct item_info *i;
369         size_t n;
370
371         if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
372             i = pa_xnew(struct item_info, 1);
373         i->type = PA_PSTREAM_ITEM_MEMBLOCK;
374
375         n = PA_MIN(length, bsm);
376         i->chunk.index = chunk->index + idx;
377         i->chunk.length = n;
378         i->chunk.memblock = pa_memblock_ref(chunk->memblock);
379
380         i->channel = channel;
381         i->offset = offset;
382         i->seek_mode = seek_mode;
383 #ifdef HAVE_CREDS
384         i->with_creds = FALSE;
385 #endif
386
387         pa_queue_push(p->send_queue, i);
388
389         idx += n;
390         length -= n;
391     }
392
393     p->mainloop->defer_enable(p->defer_event, 1);
394 }
395
396 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
397     struct item_info *item;
398     pa_assert(p);
399     pa_assert(PA_REFCNT_VALUE(p) > 0);
400
401     if (p->dead)
402         return;
403
404 /*     pa_log("Releasing block %u", block_id); */
405
406     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
407         item = pa_xnew(struct item_info, 1);
408     item->type = PA_PSTREAM_ITEM_SHMRELEASE;
409     item->block_id = block_id;
410 #ifdef HAVE_CREDS
411     item->with_creds = FALSE;
412 #endif
413
414     pa_queue_push(p->send_queue, item);
415     p->mainloop->defer_enable(p->defer_event, 1);
416 }
417
418 /* might be called from thread context */
419 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
420     pa_pstream *p = userdata;
421
422     pa_assert(p);
423     pa_assert(PA_REFCNT_VALUE(p) > 0);
424
425     if (p->dead)
426         return;
427
428     if (p->release_callback)
429         p->release_callback(p, block_id, p->release_callback_userdata);
430     else
431         pa_pstream_send_release(p, block_id);
432 }
433
434 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
435     struct item_info *item;
436     pa_assert(p);
437     pa_assert(PA_REFCNT_VALUE(p) > 0);
438
439     if (p->dead)
440         return;
441 /*     pa_log("Revoking block %u", block_id); */
442
443     if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
444         item = pa_xnew(struct item_info, 1);
445     item->type = PA_PSTREAM_ITEM_SHMREVOKE;
446     item->block_id = block_id;
447 #ifdef HAVE_CREDS
448     item->with_creds = FALSE;
449 #endif
450
451     pa_queue_push(p->send_queue, item);
452     p->mainloop->defer_enable(p->defer_event, 1);
453 }
454
455 /* might be called from thread context */
456 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
457     pa_pstream *p = userdata;
458
459     pa_assert(p);
460     pa_assert(PA_REFCNT_VALUE(p) > 0);
461
462     if (p->revoke_callback)
463         p->revoke_callback(p, block_id, p->revoke_callback_userdata);
464     else
465         pa_pstream_send_revoke(p, block_id);
466 }
467
468 static void prepare_next_write_item(pa_pstream *p) {
469     pa_assert(p);
470     pa_assert(PA_REFCNT_VALUE(p) > 0);
471
472     p->write.current = pa_queue_pop(p->send_queue);
473
474     if (!p->write.current)
475         return;
476
477     p->write.index = 0;
478     p->write.data = NULL;
479     pa_memchunk_reset(&p->write.memchunk);
480
481     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
482     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
483     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
484     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
485     p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
486
487     if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
488
489         pa_assert(p->write.current->packet);
490         p->write.data = p->write.current->packet->data;
491         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
492
493     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
494
495         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
496         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
497
498     } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
499
500         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
501         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
502
503     } else {
504         uint32_t flags;
505         pa_bool_t send_payload = TRUE;
506
507         pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
508         pa_assert(p->write.current->chunk.memblock);
509
510         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
511         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
512         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
513
514         flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
515
516         if (p->use_shm) {
517             uint32_t block_id, shm_id;
518             size_t offset, length;
519
520             pa_assert(p->export);
521
522             if (pa_memexport_put(p->export,
523                                  p->write.current->chunk.memblock,
524                                  &block_id,
525                                  &shm_id,
526                                  &offset,
527                                  &length) >= 0) {
528
529                 flags |= PA_FLAG_SHMDATA;
530                 send_payload = FALSE;
531
532                 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
533                 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
534                 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
535                 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
536
537                 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
538                 p->write.data = p->write.shm_info;
539             }
540 /*             else */
541 /*                 pa_log_warn("Failed to export memory block."); */
542         }
543
544         if (send_payload) {
545             p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
546             p->write.memchunk = p->write.current->chunk;
547             pa_memblock_ref(p->write.memchunk.memblock);
548             p->write.data = NULL;
549         }
550
551         p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
552     }
553
554 #ifdef HAVE_CREDS
555     if ((p->send_creds_now = p->write.current->with_creds))
556         p->write_creds = p->write.current->creds;
557 #endif
558 }
559
560 static int do_write(pa_pstream *p) {
561     void *d;
562     size_t l;
563     ssize_t r;
564     pa_memblock *release_memblock = NULL;
565
566     pa_assert(p);
567     pa_assert(PA_REFCNT_VALUE(p) > 0);
568
569     if (!p->write.current)
570         prepare_next_write_item(p);
571
572     if (!p->write.current)
573         return 0;
574
575     if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
576         d = (uint8_t*) p->write.descriptor + p->write.index;
577         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
578     } else {
579         pa_assert(p->write.data || p->write.memchunk.memblock);
580
581         if (p->write.data)
582             d = p->write.data;
583         else {
584             d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
585             release_memblock = p->write.memchunk.memblock;
586         }
587
588         d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
589         l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
590     }
591
592     pa_assert(l > 0);
593
594 #ifdef HAVE_CREDS
595     if (p->send_creds_now) {
596
597         if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
598             goto fail;
599
600         p->send_creds_now = FALSE;
601     } else
602 #endif
603
604     if ((r = pa_iochannel_write(p->io, d, l)) < 0)
605         goto fail;
606
607     if (release_memblock)
608         pa_memblock_release(release_memblock);
609
610     p->write.index += (size_t) r;
611
612     if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
613         pa_assert(p->write.current);
614         item_free(p->write.current, NULL);
615         p->write.current = NULL;
616
617         if (p->write.memchunk.memblock)
618             pa_memblock_unref(p->write.memchunk.memblock);
619
620         pa_memchunk_reset(&p->write.memchunk);
621
622         if (p->drain_callback && !pa_pstream_is_pending(p))
623             p->drain_callback(p, p->drain_callback_userdata);
624     }
625
626     return 0;
627
628 fail:
629
630     if (release_memblock)
631         pa_memblock_release(release_memblock);
632
633     return -1;
634 }
635
636 static int do_read(pa_pstream *p) {
637     void *d;
638     size_t l;
639     ssize_t r;
640     pa_memblock *release_memblock = NULL;
641     pa_assert(p);
642     pa_assert(PA_REFCNT_VALUE(p) > 0);
643
644     if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
645         d = (uint8_t*) p->read.descriptor + p->read.index;
646         l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
647     } else {
648         pa_assert(p->read.data || p->read.memblock);
649
650         if (p->read.data)
651             d = p->read.data;
652         else {
653             d = pa_memblock_acquire(p->read.memblock);
654             release_memblock = p->read.memblock;
655         }
656
657         d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
658         l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
659     }
660
661 #ifdef HAVE_CREDS
662     {
663         pa_bool_t b = 0;
664
665         if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
666             goto fail;
667
668         p->read_creds_valid = p->read_creds_valid || b;
669     }
670 #else
671     if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
672         goto fail;
673 #endif
674
675     if (release_memblock)
676         pa_memblock_release(release_memblock);
677
678     p->read.index += (size_t) r;
679
680     if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
681         uint32_t flags, length, channel;
682         /* Reading of frame descriptor complete */
683
684         flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
685
686         if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
687             pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
688             return -1;
689         }
690
691         if (flags == PA_FLAG_SHMRELEASE) {
692
693             /* This is a SHM memblock release frame with no payload */
694
695 /*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
696
697             pa_assert(p->export);
698             pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
699
700             goto frame_done;
701
702         } else if (flags == PA_FLAG_SHMREVOKE) {
703
704             /* This is a SHM memblock revoke frame with no payload */
705
706 /*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
707
708             pa_assert(p->import);
709             pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
710
711             goto frame_done;
712         }
713
714         length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
715
716         if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
717             pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
718             return -1;
719         }
720
721         pa_assert(!p->read.packet && !p->read.memblock);
722
723         channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
724
725         if (channel == (uint32_t) -1) {
726
727             if (flags != 0) {
728                 pa_log_warn("Received packet frame with invalid flags value.");
729                 return -1;
730             }
731
732             /* Frame is a packet frame */
733             p->read.packet = pa_packet_new(length);
734             p->read.data = p->read.packet->data;
735
736         } else {
737
738             if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
739                 pa_log_warn("Received memblock frame with invalid seek mode.");
740                 return -1;
741             }
742
743             if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
744
745                 if (length != sizeof(p->read.shm_info)) {
746                     pa_log_warn("Received SHM memblock frame with Invalid frame length.");
747                     return -1;
748                 }
749
750                 /* Frame is a memblock frame referencing an SHM memblock */
751                 p->read.data = p->read.shm_info;
752
753             } else if ((flags & PA_FLAG_SHMMASK) == 0) {
754
755                 /* Frame is a memblock frame */
756
757                 p->read.memblock = pa_memblock_new(p->mempool, length);
758                 p->read.data = NULL;
759             } else {
760
761                 pa_log_warn("Received memblock frame with invalid flags value.");
762                 return -1;
763             }
764         }
765
766     } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
767         /* Frame payload available */
768
769         if (p->read.memblock && p->recieve_memblock_callback) {
770
771             /* Is this memblock data? Than pass it to the user */
772             l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
773
774             if (l > 0) {
775                 pa_memchunk chunk;
776
777                 chunk.memblock = p->read.memblock;
778                 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
779                 chunk.length = l;
780
781                 if (p->recieve_memblock_callback) {
782                     int64_t offset;
783
784                     offset = (int64_t) (
785                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
786                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
787
788                     p->recieve_memblock_callback(
789                         p,
790                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
791                         offset,
792                         ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
793                         &chunk,
794                         p->recieve_memblock_callback_userdata);
795                 }
796
797                 /* Drop seek info for following callbacks */
798                 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
799                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
800                     p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
801             }
802         }
803
804         /* Frame complete */
805         if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
806
807             if (p->read.memblock) {
808
809                 /* This was a memblock frame. We can unref the memblock now */
810                 pa_memblock_unref(p->read.memblock);
811
812             } else if (p->read.packet) {
813
814                 if (p->recieve_packet_callback)
815 #ifdef HAVE_CREDS
816                     p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
817 #else
818                     p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
819 #endif
820
821                 pa_packet_unref(p->read.packet);
822             } else {
823                 pa_memblock *b;
824
825                 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
826
827                 pa_assert(p->import);
828
829                 if (!(b = pa_memimport_get(p->import,
830                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
831                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
832                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
833                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
834
835                     if (pa_log_ratelimit())
836                         pa_log_debug("Failed to import memory block.");
837                 }
838
839                 if (p->recieve_memblock_callback) {
840                     int64_t offset;
841                     pa_memchunk chunk;
842
843                     chunk.memblock = b;
844                     chunk.index = 0;
845                     chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
846
847                     offset = (int64_t) (
848                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
849                             (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
850
851                     p->recieve_memblock_callback(
852                             p,
853                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
854                             offset,
855                             ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
856                             &chunk,
857                             p->recieve_memblock_callback_userdata);
858                 }
859
860                 if (b)
861                     pa_memblock_unref(b);
862             }
863
864             goto frame_done;
865         }
866     }
867
868     return 0;
869
870 frame_done:
871     p->read.memblock = NULL;
872     p->read.packet = NULL;
873     p->read.index = 0;
874     p->read.data = NULL;
875
876 #ifdef HAVE_CREDS
877     p->read_creds_valid = FALSE;
878 #endif
879
880     return 0;
881
882 fail:
883     if (release_memblock)
884         pa_memblock_release(release_memblock);
885
886     return -1;
887 }
888
889 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
890     pa_assert(p);
891     pa_assert(PA_REFCNT_VALUE(p) > 0);
892
893     p->die_callback = cb;
894     p->die_callback_userdata = userdata;
895 }
896
897 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
898     pa_assert(p);
899     pa_assert(PA_REFCNT_VALUE(p) > 0);
900
901     p->drain_callback = cb;
902     p->drain_callback_userdata = userdata;
903 }
904
905 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
906     pa_assert(p);
907     pa_assert(PA_REFCNT_VALUE(p) > 0);
908
909     p->recieve_packet_callback = cb;
910     p->recieve_packet_callback_userdata = userdata;
911 }
912
913 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
914     pa_assert(p);
915     pa_assert(PA_REFCNT_VALUE(p) > 0);
916
917     p->recieve_memblock_callback = cb;
918     p->recieve_memblock_callback_userdata = userdata;
919 }
920
921 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
922     pa_assert(p);
923     pa_assert(PA_REFCNT_VALUE(p) > 0);
924
925     p->release_callback = cb;
926     p->release_callback_userdata = userdata;
927 }
928
929 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
930     pa_assert(p);
931     pa_assert(PA_REFCNT_VALUE(p) > 0);
932
933     p->release_callback = cb;
934     p->release_callback_userdata = userdata;
935 }
936
937 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
938     pa_bool_t b;
939
940     pa_assert(p);
941     pa_assert(PA_REFCNT_VALUE(p) > 0);
942
943     if (p->dead)
944         b = FALSE;
945     else
946         b = p->write.current || !pa_queue_isempty(p->send_queue);
947
948     return b;
949 }
950
951 void pa_pstream_unref(pa_pstream*p) {
952     pa_assert(p);
953     pa_assert(PA_REFCNT_VALUE(p) > 0);
954
955     if (PA_REFCNT_DEC(p) <= 0)
956         pstream_free(p);
957 }
958
959 pa_pstream* pa_pstream_ref(pa_pstream*p) {
960     pa_assert(p);
961     pa_assert(PA_REFCNT_VALUE(p) > 0);
962
963     PA_REFCNT_INC(p);
964     return p;
965 }
966
967 void pa_pstream_unlink(pa_pstream *p) {
968     pa_assert(p);
969
970     if (p->dead)
971         return;
972
973     p->dead = TRUE;
974
975     if (p->import) {
976         pa_memimport_free(p->import);
977         p->import = NULL;
978     }
979
980     if (p->export) {
981         pa_memexport_free(p->export);
982         p->export = NULL;
983     }
984
985     if (p->io) {
986         pa_iochannel_free(p->io);
987         p->io = NULL;
988     }
989
990     if (p->defer_event) {
991         p->mainloop->defer_free(p->defer_event);
992         p->defer_event = NULL;
993     }
994
995     p->die_callback = NULL;
996     p->drain_callback = NULL;
997     p->recieve_packet_callback = NULL;
998     p->recieve_memblock_callback = NULL;
999 }
1000
1001 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1002     pa_assert(p);
1003     pa_assert(PA_REFCNT_VALUE(p) > 0);
1004
1005     p->use_shm = enable;
1006
1007     if (enable) {
1008
1009         if (!p->export)
1010             p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1011
1012     } else {
1013
1014         if (p->export) {
1015             pa_memexport_free(p->export);
1016             p->export = NULL;
1017         }
1018     }
1019 }
1020
1021 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1022     pa_assert(p);
1023     pa_assert(PA_REFCNT_VALUE(p) > 0);
1024
1025     return p->use_shm;
1026 }