4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
46 #include <pulse/xmalloc.h>
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/refcnt.h>
56 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
57 #define PA_FLAG_SHMDATA 0x80000000LU
58 #define PA_FLAG_SHMRELEASE 0x40000000LU
59 #define PA_FLAG_SHMREVOKE 0xC0000000LU
60 #define PA_FLAG_SHMMASK 0xFF000000LU
61 #define PA_FLAG_SEEKMASK 0x000000FFLU
63 /* The sequence descriptor header consists of 5 32bit integers: */
65 PA_PSTREAM_DESCRIPTOR_LENGTH,
66 PA_PSTREAM_DESCRIPTOR_CHANNEL,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
68 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
69 PA_PSTREAM_DESCRIPTOR_FLAGS,
70 PA_PSTREAM_DESCRIPTOR_MAX
73 /* If we have an SHM block, this info follows the descriptor */
75 PA_PSTREAM_SHM_BLOCKID,
78 PA_PSTREAM_SHM_LENGTH,
82 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
84 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
85 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 #define FRAME_SIZE_MAX_USE (1024*64)
90 PA_PSTREAM_ITEM_PACKET,
91 PA_PSTREAM_ITEM_MEMBLOCK,
92 PA_PSTREAM_ITEM_SHMRELEASE,
93 PA_PSTREAM_ITEM_SHMREVOKE
108 pa_seek_mode_t seek_mode;
110 /* release/revoke info */
117 pa_mainloop_api *mainloop;
118 pa_defer_event *defer_event;
121 pa_queue *send_queue;
126 pa_pstream_descriptor descriptor;
127 struct item_info* current;
128 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
131 pa_memchunk memchunk;
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
144 pa_memimport *import;
145 pa_memexport *export;
147 pa_pstream_packet_cb_t recieve_packet_callback;
148 void *recieve_packet_callback_userdata;
150 pa_pstream_memblock_cb_t recieve_memblock_callback;
151 void *recieve_memblock_callback_userdata;
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
162 pa_creds read_creds, write_creds;
163 int read_creds_valid, send_creds_now;
167 static int do_write(pa_pstream *p);
168 static int do_read(pa_pstream *p);
170 static void do_something(pa_pstream *p) {
172 assert(PA_REFCNT_VALUE(p) > 0);
176 p->mainloop->defer_enable(p->defer_event, 0);
178 if (!p->dead && pa_iochannel_is_readable(p->io)) {
181 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
184 if (!p->dead && pa_iochannel_is_writable(p->io)) {
197 p->die_callback(p, p->die_callback_userdata);
202 static void io_callback(pa_iochannel*io, void *userdata) {
203 pa_pstream *p = userdata;
211 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
213 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
220 p = pa_xnew(pa_pstream, 1);
223 pa_iochannel_set_callback(io, io_callback, p);
228 p->send_queue = pa_queue_new();
229 assert(p->send_queue);
231 p->write.current = NULL;
233 pa_memchunk_reset(&p->write.memchunk);
234 p->read.memblock = NULL;
235 p->read.packet = NULL;
238 p->recieve_packet_callback = NULL;
239 p->recieve_packet_callback_userdata = NULL;
240 p->recieve_memblock_callback = NULL;
241 p->recieve_memblock_callback_userdata = NULL;
242 p->drain_callback = NULL;
243 p->drain_callback_userdata = NULL;
244 p->die_callback = NULL;
245 p->die_callback_userdata = NULL;
252 /* We do importing unconditionally */
253 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
255 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
256 pa_iochannel_socket_set_sndbuf(io, 1024*8);
259 p->send_creds_now = 0;
260 p->read_creds_valid = 0;
265 static void item_free(void *item, PA_GCC_UNUSED void *p) {
266 struct item_info *i = item;
269 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
270 assert(i->chunk.memblock);
271 pa_memblock_unref(i->chunk.memblock);
272 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
274 pa_packet_unref(i->packet);
280 static void pstream_free(pa_pstream *p) {
285 pa_queue_free(p->send_queue, item_free, NULL);
287 if (p->write.current)
288 item_free(p->write.current, NULL);
290 if (p->read.memblock)
291 pa_memblock_unref(p->read.memblock);
294 pa_packet_unref(p->read.packet);
296 if (p->write.memchunk.memblock)
297 pa_memblock_unref(p->write.memchunk.memblock);
302 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
306 assert(PA_REFCNT_VALUE(p) > 0);
312 i = pa_xnew(struct item_info, 1);
313 i->type = PA_PSTREAM_ITEM_PACKET;
314 i->packet = pa_packet_ref(packet);
317 if ((i->with_creds = !!creds))
321 pa_queue_push(p->send_queue, i);
324 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
328 assert(PA_REFCNT_VALUE(p) > 0);
329 assert(channel != (uint32_t) -1);
341 i = pa_xnew(struct item_info, 1);
342 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
344 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
345 i->chunk.index = chunk->index + idx;
347 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
349 i->channel = channel;
351 i->seek_mode = seek_mode;
356 pa_queue_push(p->send_queue, i);
362 p->mainloop->defer_enable(p->defer_event, 1);
365 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
366 struct item_info *item;
367 pa_pstream *p = userdata;
370 assert(PA_REFCNT_VALUE(p) > 0);
375 /* pa_log("Releasing block %u", block_id); */
377 item = pa_xnew(struct item_info, 1);
378 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
379 item->block_id = block_id;
381 item->with_creds = 0;
384 pa_queue_push(p->send_queue, item);
387 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
388 struct item_info *item;
389 pa_pstream *p = userdata;
392 assert(PA_REFCNT_VALUE(p) > 0);
396 /* pa_log("Revoking block %u", block_id); */
398 item = pa_xnew(struct item_info, 1);
399 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
400 item->block_id = block_id;
402 item->with_creds = 0;
405 pa_queue_push(p->send_queue, item);
408 static void prepare_next_write_item(pa_pstream *p) {
410 assert(PA_REFCNT_VALUE(p) > 0);
412 p->write.current = pa_queue_pop(p->send_queue);
414 if (!p->write.current)
418 p->write.data = NULL;
419 pa_memchunk_reset(&p->write.memchunk);
421 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
422 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
423 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
424 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
425 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
427 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
429 assert(p->write.current->packet);
430 p->write.data = p->write.current->packet->data;
431 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
433 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
435 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
436 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
438 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
440 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
441 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
445 int send_payload = 1;
447 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
448 assert(p->write.current->chunk.memblock);
450 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
451 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
452 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
454 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
457 uint32_t block_id, shm_id;
458 size_t offset, length;
462 if (pa_memexport_put(p->export,
463 p->write.current->chunk.memblock,
469 flags |= PA_FLAG_SHMDATA;
472 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
473 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
474 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
475 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
477 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
478 p->write.data = p->write.shm_info;
481 /* pa_log_warn("Failed to export memory block."); */
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
486 p->write.memchunk = p->write.current->chunk;
487 pa_memblock_ref(p->write.memchunk.memblock);
488 p->write.data = NULL;
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
495 if ((p->send_creds_now = p->write.current->with_creds))
496 p->write_creds = p->write.current->creds;
500 static int do_write(pa_pstream *p) {
504 pa_memblock *release_memblock = NULL;
507 assert(PA_REFCNT_VALUE(p) > 0);
509 if (!p->write.current)
510 prepare_next_write_item(p);
512 if (!p->write.current)
515 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
516 d = (uint8_t*) p->write.descriptor + p->write.index;
517 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
519 assert(p->write.data || p->write.memchunk.memblock);
524 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
525 release_memblock = p->write.memchunk.memblock;
528 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
529 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
535 if (p->send_creds_now) {
537 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
540 p->send_creds_now = 0;
544 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
547 if (release_memblock)
548 pa_memblock_release(release_memblock);
552 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
553 assert(p->write.current);
554 item_free(p->write.current, (void *) 1);
555 p->write.current = NULL;
557 if (p->drain_callback && !pa_pstream_is_pending(p))
558 p->drain_callback(p, p->drain_callback_userdata);
565 if (release_memblock)
566 pa_memblock_release(release_memblock);
571 static int do_read(pa_pstream *p) {
575 pa_memblock *release_memblock = NULL;
577 assert(PA_REFCNT_VALUE(p) > 0);
579 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
580 d = (uint8_t*) p->read.descriptor + p->read.index;
581 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
583 assert(p->read.data || p->read.memblock);
588 d = pa_memblock_acquire(p->read.memblock);
589 release_memblock = p->read.memblock;
592 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
593 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
600 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
603 p->read_creds_valid = p->read_creds_valid || b;
606 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
610 if (release_memblock)
611 pa_memblock_release(release_memblock);
615 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
616 uint32_t flags, length, channel;
617 /* Reading of frame descriptor complete */
619 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
621 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
622 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
626 if (flags == PA_FLAG_SHMRELEASE) {
628 /* This is a SHM memblock release frame with no payload */
630 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
633 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
637 } else if (flags == PA_FLAG_SHMREVOKE) {
639 /* This is a SHM memblock revoke frame with no payload */
641 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
644 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
649 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
651 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
652 pa_log_warn("Recieved invalid frame size: %lu", (unsigned long) length);
656 assert(!p->read.packet && !p->read.memblock);
658 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
660 if (channel == (uint32_t) -1) {
663 pa_log_warn("Received packet frame with invalid flags value.");
667 /* Frame is a packet frame */
668 p->read.packet = pa_packet_new(length);
669 p->read.data = p->read.packet->data;
673 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
674 pa_log_warn("Received memblock frame with invalid seek mode.");
678 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
680 if (length != sizeof(p->read.shm_info)) {
681 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
685 /* Frame is a memblock frame referencing an SHM memblock */
686 p->read.data = p->read.shm_info;
688 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
690 /* Frame is a memblock frame */
692 p->read.memblock = pa_memblock_new(p->mempool, length);
696 pa_log_warn("Recieved memblock frame with invalid flags value.");
701 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
702 /* Frame payload available */
704 if (p->read.memblock && p->recieve_memblock_callback) {
706 /* Is this memblock data? Than pass it to the user */
707 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
712 chunk.memblock = p->read.memblock;
713 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
716 if (p->recieve_memblock_callback) {
720 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
721 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
723 p->recieve_memblock_callback(
725 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
727 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
729 p->recieve_memblock_callback_userdata);
732 /* Drop seek info for following callbacks */
733 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
734 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
735 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
740 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
742 if (p->read.memblock) {
744 /* This was a memblock frame. We can unref the memblock now */
745 pa_memblock_unref(p->read.memblock);
747 } else if (p->read.packet) {
749 if (p->recieve_packet_callback)
751 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
753 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
756 pa_packet_unref(p->read.packet);
760 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
764 if (!(b = pa_memimport_get(p->import,
765 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
766 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
767 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
768 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
770 pa_log_warn("Failed to import memory block.");
774 if (p->recieve_memblock_callback) {
780 chunk.length = pa_memblock_get_length(b);
783 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
784 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
786 p->recieve_memblock_callback(
788 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
790 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
792 p->recieve_memblock_callback_userdata);
795 pa_memblock_unref(b);
805 p->read.memblock = NULL;
806 p->read.packet = NULL;
811 p->read_creds_valid = 0;
817 if (release_memblock)
818 pa_memblock_release(release_memblock);
823 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
825 assert(PA_REFCNT_VALUE(p) > 0);
827 p->die_callback = cb;
828 p->die_callback_userdata = userdata;
831 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
833 assert(PA_REFCNT_VALUE(p) > 0);
835 p->drain_callback = cb;
836 p->drain_callback_userdata = userdata;
839 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
841 assert(PA_REFCNT_VALUE(p) > 0);
843 p->recieve_packet_callback = cb;
844 p->recieve_packet_callback_userdata = userdata;
847 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
849 assert(PA_REFCNT_VALUE(p) > 0);
851 p->recieve_memblock_callback = cb;
852 p->recieve_memblock_callback_userdata = userdata;
855 int pa_pstream_is_pending(pa_pstream *p) {
859 assert(PA_REFCNT_VALUE(p) > 0);
864 b = p->write.current || !pa_queue_is_empty(p->send_queue);
869 void pa_pstream_unref(pa_pstream*p) {
871 assert(PA_REFCNT_VALUE(p) > 0);
873 if (PA_REFCNT_DEC(p) <= 0)
877 pa_pstream* pa_pstream_ref(pa_pstream*p) {
879 assert(PA_REFCNT_VALUE(p) > 0);
885 void pa_pstream_close(pa_pstream *p) {
891 pa_memimport_free(p->import);
896 pa_memexport_free(p->export);
901 pa_iochannel_free(p->io);
905 p->die_callback = NULL;
906 p->drain_callback = NULL;
907 p->recieve_packet_callback = NULL;
908 p->recieve_memblock_callback = NULL;
911 void pa_pstream_use_shm(pa_pstream *p, int enable) {
913 assert(PA_REFCNT_VALUE(p) > 0);
920 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
925 pa_memexport_free(p->export);