4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as
11 published by the Free Software Foundation; either version 2.1 of the
12 License, or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 Lesser General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public
20 License along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
46 #include <pulse/xmalloc.h>
48 #include <pulsecore/queue.h>
49 #include <pulsecore/log.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/creds.h>
52 #include <pulsecore/mutex.h>
53 #include <pulsecore/refcnt.h>
57 /* The flags word of a frame descriptor carries the seek mode in its low byte (PA_FLAG_SEEKMASK). We piggyback onto its high byte (PA_FLAG_SHMMASK) whether the audio data block is stored in SHM, as well as the SHM release/revoke frame markers. */
/* Memblock frame whose payload is an SHM reference (shm_info) instead of the audio data itself */
58 #define PA_FLAG_SHMDATA 0x80000000LU
/* Payload-less frame releasing an SHM block; the block id travels in the OFFSET_HI descriptor word */
59 #define PA_FLAG_SHMRELEASE 0x40000000LU
/* Payload-less frame revoking an SHM block; the block id travels in the OFFSET_HI descriptor word */
60 #define PA_FLAG_SHMREVOKE 0xC0000000LU
/* Selects the SHM related bits of the flags word */
61 #define PA_FLAG_SHMMASK 0xFF000000LU
/* Selects the seek mode bits of the flags word */
62 #define PA_FLAG_SEEKMASK 0x000000FFLU
64 /* The sequence descriptor header consists of 5 32bit integers: */
/* The code in this file stores all of them in network byte order (htonl()/ntohl()). */
/* Length in bytes of the payload that follows the descriptor */
66 PA_PSTREAM_DESCRIPTOR_LENGTH,
/* Channel of a memblock frame; (uint32_t) -1 marks a packet frame */
67 PA_PSTREAM_DESCRIPTOR_CHANNEL,
/* Upper 32 bits of the 64bit offset of a memblock frame; carries the block id in SHM release/revoke frames */
68 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
/* Lower 32 bits of the 64bit offset of a memblock frame */
69 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
/* Seek mode combined with the PA_FLAG_SHMxxx bits */
70 PA_PSTREAM_DESCRIPTOR_FLAGS,
/* Number of descriptor words; used as the array size of pa_pstream_descriptor */
71 PA_PSTREAM_DESCRIPTOR_MAX
74 /* If we have an SHM block, this info follows the descriptor */
/* Block id as handed out by pa_memexport_put() on the sending side */
76 PA_PSTREAM_SHM_BLOCKID,
/* Length of the referenced chunk in bytes */
79 PA_PSTREAM_SHM_LENGTH,
/* A frame descriptor: one 32bit word per PA_PSTREAM_DESCRIPTOR_xxx index */
83 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
/* Size of the descriptor in bytes as it appears on the wire */
85 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
/* Upper bound for the payload length of frames we accept; do_read() rejects anything larger */
86 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
/* Upper bound for the payload length of memblock frames we generate ourselves, see pa_pstream_send_memblock() */
87 #define FRAME_SIZE_MAX_USE (1024*64)
/* Kinds of items that can be placed in the send queue: */
/* A control packet (holds a pa_packet reference) */
91 PA_PSTREAM_ITEM_PACKET,
/* A piece of audio data (holds a memblock reference via a chunk) */
92 PA_PSTREAM_ITEM_MEMBLOCK,
/* Notification that we no longer use an imported SHM block (carries a block id) */
93 PA_PSTREAM_ITEM_SHMRELEASE,
/* Notification that an exported SHM block is revoked (carries a block id) */
94 PA_PSTREAM_ITEM_SHMREVOKE
/* Seek mode to transmit with a memblock item; set in pa_pstream_send_memblock() */
109 pa_seek_mode_t seek_mode;
111 /* release/revoke info */
/* Main loop and the defer event that is enabled whenever something is queued for sending */
118 pa_mainloop_api *mainloop;
119 pa_defer_event *defer_event;
/* Queue of struct item_info objects waiting to be written */
121 pa_queue *send_queue;
/* Write state: descriptor of the frame in transit, the item it was built from (NULL if none) and the SHM reference payload used for SHM memblock frames */
127 pa_pstream_descriptor descriptor;
128 struct item_info* current;
129 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
/* Read state: descriptor of the frame being received, the memblock that receives plain memblock payload (NULL otherwise) and the buffer that receives an SHM reference payload */
135 pa_pstream_descriptor descriptor;
136 pa_memblock *memblock;
138 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
/* SHM import side (created unconditionally in pa_pstream_new()) and export side (created in pa_pstream_use_shm()) */
144 pa_memimport *import;
145 pa_memexport *export;
/* Called for every completely received packet frame */
147 pa_pstream_packet_cb_t recieve_packet_callback;
148 void *recieve_packet_callback_userdata;
/* Called for received memblock data */
150 pa_pstream_memblock_cb_t recieve_memblock_callback;
151 void *recieve_memblock_callback_userdata;
/* Called from do_write() after an item has been written completely and nothing else is pending */
153 pa_pstream_notify_cb_t drain_callback;
154 void *drain_callback_userdata;
/* Called from do_something(); presumably when the stream died -- the triggering condition is not visible here */
156 pa_pstream_notify_cb_t die_callback;
157 void *die_callback_userdata;
/* Credentials received with the current frame and credentials to send with the current item, plus the flags telling whether they are valid/due */
162 pa_creds read_creds, write_creds;
163 int read_creds_valid, send_creds_now;
/* Forward declarations: do_something() is defined before the I/O workers */
/* Writes as much as possible of the current send item to the I/O channel */
167 static int do_write(pa_pstream *p);
/* Reads as much as possible of the current incoming frame from the I/O channel */
168 static int do_read(pa_pstream *p);
/* Services the I/O channel of the stream: checks it for readability, hangup and writability while holding the stream mutex. */
170 static void do_something(pa_pstream *p) {
172 assert(PA_REFCNT_VALUE(p) > 0);
176 pa_mutex_lock(p->mutex);
/* We are running now, so the deferred wakeup is no longer needed */
178 p->mainloop->defer_enable(p->defer_event, 0);
/* Every check is skipped once the stream is marked dead. NOTE(review): presumably the readable branch calls do_read() and the writable branch calls do_write() -- confirm */
180 if (!p->dead && pa_iochannel_is_readable(p->io)) {
183 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
186 if (!p->dead && pa_iochannel_is_writable(p->io)) {
191 pa_mutex_unlock(p->mutex);
/* Notify the owner; presumably this is the failure path -- TODO confirm */
201 p->die_callback(p, p->die_callback_userdata);
203 pa_mutex_unlock(p->mutex);
/* I/O channel callback, registered in pa_pstream_new() with the pstream as userdata. */
208 static void io_callback(pa_iochannel*io, void *userdata) {
209 pa_pstream *p = userdata;
/* Defer event callback, registered in pa_pstream_new() with the pstream as userdata. */
217 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
218 pa_pstream *p = userdata;
/* Sanity checks: we must have been called for our own event on our own main loop */
221 assert(p->defer_event == e);
222 assert(p->mainloop == m);
/* Forward declaration: pa_pstream_new() passes this callback to pa_memimport_new() */
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
/* Creates a new packet stream on top of the I/O channel io, driven by the main loop m. pool is presumably kept as p->mempool, which is used for the memimport here and for received memblocks in do_read() -- the assignment is not visible here. */
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
236 p = pa_xnew(pa_pstream, 1);
/* From now on I/O events on the channel are delivered to io_callback() */
239 pa_iochannel_set_callback(io, io_callback, p);
242 p->mutex = pa_mutex_new(1);
/* The defer event starts out disabled; it is enabled whenever an item is queued for sending */
245 p->defer_event = m->defer_new(m, defer_callback, p);
246 m->defer_enable(p->defer_event, 0);
248 p->send_queue = pa_queue_new();
249 assert(p->send_queue);
/* Nothing is being written or read yet */
251 p->write.current = NULL;
253 p->read.memblock = NULL;
254 p->read.packet = NULL;
/* No callbacks are installed initially; see the pa_pstream_set_xxx_callback() functions */
257 p->recieve_packet_callback = NULL;
258 p->recieve_packet_callback_userdata = NULL;
259 p->recieve_memblock_callback = NULL;
260 p->recieve_memblock_callback_userdata = NULL;
261 p->drain_callback = NULL;
262 p->drain_callback_userdata = NULL;
263 p->die_callback = NULL;
264 p->die_callback_userdata = NULL;
271 /* We do importing unconditionally */
272 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
/* Request 8 KiB socket receive and send buffers */
274 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
275 pa_iochannel_socket_set_sndbuf(io, 1024*8);
/* No credentials pending in either direction */
278 p->send_creds_now = 0;
279 p->read_creds_valid = 0;
/* Releases what a send queue item references. The second parameter is unused; it exists so that the function can be passed to pa_queue_free(). */
284 static void item_free(void *item, PA_GCC_UNUSED void *p) {
285 struct item_info *i = item;
/* Memblock items hold a reference to the memblock of their chunk, packet items hold a reference to their packet */
288 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
289 assert(i->chunk.memblock);
290 pa_memblock_unref(i->chunk.memblock);
291 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
293 pa_packet_unref(i->packet);
/* Releases the resources held by the stream object. */
299 static void pstream_free(pa_pstream *p) {
/* Drop everything that is still queued for sending */
304 pa_queue_free(p->send_queue, item_free, NULL);
/* The item currently being written has already been popped from the queue, so free it separately */
306 if (p->write.current)
307 item_free(p->write.current, NULL);
/* Drop a partially received memblock or packet */
309 if (p->read.memblock)
310 pa_memblock_unref(p->read.memblock);
313 pa_packet_unref(p->read.packet);
316 pa_mutex_free(p->mutex);
/* Queues packet for transmission. The queued item takes its own reference to the packet. creds may be NULL, in which case no credentials are attached to the item. */
321 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
325 assert(PA_REFCNT_VALUE(p) > 0);
328 pa_mutex_lock(p->mutex);
333 i = pa_xnew(struct item_info, 1);
334 i->type = PA_PSTREAM_ITEM_PACKET;
335 i->packet = pa_packet_ref(packet);
/* with_creds becomes 1 if credentials were passed, 0 otherwise */
338 if ((i->with_creds = !!creds))
342 pa_queue_push(p->send_queue, i);
/* Wake up the defer event so that the item gets written */
343 p->mainloop->defer_enable(p->defer_event, 1);
347 pa_mutex_unlock(p->mutex);
/* Queues the audio data described by chunk for transmission on channel. Each queued item takes its own reference to the memblock of the chunk. */
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
354 assert(PA_REFCNT_VALUE(p) > 0);
/* Channel (uint32_t) -1 is reserved: it marks packet frames on the wire */
355 assert(channel != (uint32_t) -1);
358 pa_mutex_lock(p->mutex);
363 length = chunk->length;
370 i = pa_xnew(struct item_info, 1);
371 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
/* A single item covers at most FRAME_SIZE_MAX_USE bytes. NOTE(review): idx suggests that larger chunks are split into several items in a loop -- confirm */
373 n = length < FRAME_SIZE_MAX_USE ? length : FRAME_SIZE_MAX_USE;
374 i->chunk.index = chunk->index + idx;
376 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
378 i->channel = channel;
380 i->seek_mode = seek_mode;
385 pa_queue_push(p->send_queue, i);
/* Wake up the defer event so that the queued data gets written */
391 p->mainloop->defer_enable(p->defer_event, 1);
395 pa_mutex_unlock(p->mutex);
/* Release callback of the memimport (see pa_memimport_new() in pa_pstream_new()): queues an SHM release frame for block_id. userdata is the pstream. */
398 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
399 struct item_info *item;
400 pa_pstream *p = userdata;
403 assert(PA_REFCNT_VALUE(p) > 0);
405 pa_mutex_lock(p->mutex);
410 /* pa_log("Releasing block %u", block_id); */
412 item = pa_xnew(struct item_info, 1);
413 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
414 item->block_id = block_id;
/* Release frames never carry credentials */
416 item->with_creds = 0;
419 pa_queue_push(p->send_queue, item);
/* Wake up the defer event so that the frame gets written */
420 p->mainloop->defer_enable(p->defer_event, 1);
424 pa_mutex_unlock(p->mutex);
/* Revoke callback of the memexport (see pa_memexport_new() in pa_pstream_use_shm()): queues an SHM revoke frame for block_id. userdata is the pstream. */
427 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
428 struct item_info *item;
429 pa_pstream *p = userdata;
432 assert(PA_REFCNT_VALUE(p) > 0);
434 pa_mutex_lock(p->mutex);
439 /* pa_log("Revoking block %u", block_id); */
441 item = pa_xnew(struct item_info, 1);
442 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
443 item->block_id = block_id;
/* Revoke frames never carry credentials */
445 item->with_creds = 0;
448 pa_queue_push(p->send_queue, item);
/* Wake up the defer event so that the frame gets written */
449 p->mainloop->defer_enable(p->defer_event, 1);
453 pa_mutex_unlock(p->mutex);
/* Pops the next item from the send queue, makes it the current write item and fills in the frame descriptor and payload pointer for it. All descriptor words are stored in network byte order. */
456 static void prepare_next_write_item(pa_pstream *p) {
458 assert(PA_REFCNT_VALUE(p) > 0);
/* p->write.current stays NULL if the queue is empty */
460 if (!(p->write.current = pa_queue_pop(p->send_queue)))
/* Defaults: no payload, channel -1 (the packet frame marker), no offset, no flags */
464 p->write.data = NULL;
466 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
467 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
468 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
469 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
470 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
472 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
/* Packet frame: the payload is the packet data */
474 assert(p->write.current->packet);
475 p->write.data = p->write.current->packet->data;
476 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
478 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
/* Release frame: no payload, the block id travels in the OFFSET_HI word */
480 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
483 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
/* Revoke frame: no payload, the block id travels in the OFFSET_HI word */
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
486 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
/* Memblock frame. send_payload presumably tells whether the audio data itself has to be transmitted -- its uses are not visible here */
490 int send_payload = 1;
492 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
493 assert(p->write.current->chunk.memblock);
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
/* The 64bit offset is split into two 32bit words */
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
497 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
/* The seek mode occupies the low byte of the flags word */
499 flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
502 uint32_t block_id, shm_id;
503 size_t offset, length;
/* Try to export the memblock via SHM; the remaining arguments and the tested result are not visible here */
507 if (pa_memexport_put(p->export,
508 p->write.current->chunk.memblock,
514 flags |= PA_FLAG_SHMDATA;
/* The payload of an SHM memblock frame is only the reference to the block: block id, SHM segment id, index and length */
517 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
518 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
519 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
520 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
522 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
523 p->write.data = p->write.shm_info;
526 /* pa_log_warn("Failed to export memory block."); */
/* Plain memblock frame: the payload is the audio data of the chunk itself */
530 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
531 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
534 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
/* Remember whether credentials have to go out with this item, and which ones */
538 if ((p->send_creds_now = p->write.current->with_creds))
539 p->write_creds = p->write.current->creds;
/* Writes the next piece of the current send item to the I/O channel: first the frame descriptor, then the payload. p->write.index counts the bytes of the frame, descriptor included. */
543 static int do_write(pa_pstream *p) {
549 assert(PA_REFCNT_VALUE(p) > 0);
/* Fetch a new item from the send queue if none is in transit */
551 if (!p->write.current)
552 prepare_next_write_item(p);
/* Still nothing? Then the queue is empty */
554 if (!p->write.current)
557 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
/* The descriptor has not been written completely yet: continue with its remainder */
558 d = (uint8_t*) p->write.descriptor + p->write.index;
559 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
561 assert(p->write.data);
/* Continue with the remainder of the payload; the length word is stored in network byte order */
563 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
564 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
570 if (p->send_creds_now) {
/* Attach the credentials to this write; the flag is cleared afterwards so they go out only once */
572 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
575 p->send_creds_now = 0;
579 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
/* Has the frame been written completely? */
584 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
585 assert(p->write.current);
586 item_free(p->write.current, (void *) 1);
587 p->write.current = NULL;
/* Tell the owner once nothing is left to be sent */
589 if (p->drain_callback && !pa_pstream_is_pending(p))
590 p->drain_callback(p, p->drain_callback_userdata);
/* Reads the next piece of the incoming frame from the I/O channel: first the frame descriptor, then the payload. Validates the descriptor once it is complete and dispatches received data to the callbacks. p->read.index counts the bytes of the frame, descriptor included. */
596 static int do_read(pa_pstream *p) {
602 assert(PA_REFCNT_VALUE(p) > 0);
604 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
/* The descriptor has not been read completely yet: continue with its remainder */
605 d = (uint8_t*) p->read.descriptor + p->read.index;
606 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
608 assert(p->read.data);
/* Continue with the remainder of the payload */
609 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
610 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
/* Two read variants follow, with and without credentials. NOTE(review): what selects between them is not visible here -- presumably a build time switch */
617 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
/* Credentials stay valid for the rest of the frame once they have been received */
620 p->read_creds_valid = p->read_creds_valid || b;
623 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
629 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
630 uint32_t flags, length, channel;
631 /* Reading of frame descriptor complete */
633 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
/* Any SHM related flag is an error while SHM is disabled on this stream */
635 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
636 pa_log_warn("Recieved SHM frame on a socket where SHM is disabled.");
640 if (flags == PA_FLAG_SHMRELEASE) {
642 /* This is a SHM memblock release frame with no payload */
644 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
/* The peer released a block we exported; the block id travels in the OFFSET_HI word */
647 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
651 } else if (flags == PA_FLAG_SHMREVOKE) {
653 /* This is a SHM memblock revoke frame with no payload */
655 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
/* The peer revoked a block we imported; the block id travels in the OFFSET_HI word */
658 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
663 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
/* Refuse frames larger than what we are willing to accept */
665 if (length > FRAME_SIZE_MAX_ALLOW) {
666 pa_log_warn("Recieved invalid frame size : %lu", (unsigned long) length);
/* No buffer of a previous frame may be left over */
670 assert(!p->read.packet && !p->read.memblock);
672 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
/* Channel -1 marks a packet frame */
674 if (channel == (uint32_t) -1) {
677 pa_log_warn("Received packet frame with invalid flags value.");
681 /* Frame is a packet frame */
682 p->read.packet = pa_packet_new(length);
683 p->read.data = p->read.packet->data;
/* Everything else is a memblock frame; validate the seek mode first */
687 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
688 pa_log_warn("Received memblock frame with invalid seek mode.");
692 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
/* The payload of an SHM memblock frame is exactly one shm_info record */
694 if (length != sizeof(p->read.shm_info)) {
695 pa_log_warn("Recieved SHM memblock frame with Invalid frame length.");
699 /* Frame is a memblock frame referencing an SHM memblock */
700 p->read.data = p->read.shm_info;
702 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
704 /* Frame is a memblock frame */
/* Allocate a memblock from our pool that receives the payload */
706 p->read.memblock = pa_memblock_new(p->mempool, length);
707 p->read.data = p->read.memblock->data;
710 pa_log_warn("Recieved memblock frame with invalid flags value.");
715 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
716 /* Frame payload available */
718 if (p->read.memblock && p->recieve_memblock_callback) {
720 /* Is this memblock data? Then pass it to the user */
/* l is the number of payload bytes this read delivered: if the read started inside the descriptor only the part behind the descriptor counts */
721 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
/* The chunk covers only the newly received bytes of the memblock */
726 chunk.memblock = p->read.memblock;
727 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
730 if (p->recieve_memblock_callback) {
/* Reassemble the 64bit offset from its two 32bit words */
734 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
735 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
737 p->recieve_memblock_callback(
739 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
741 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
743 p->recieve_memblock_callback_userdata);
746 /* Drop seek info for following callbacks */
747 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
748 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
749 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
/* Has the frame been received completely? */
754 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
756 if (p->read.memblock) {
758 /* This was a memblock frame. We can unref the memblock now */
759 pa_memblock_unref(p->read.memblock);
761 } else if (p->read.packet) {
/* Hand the complete packet to the user. Two variants of the call follow, with and without credentials. NOTE(review): what selects between them is not visible here -- presumably a build time switch */
763 if (p->recieve_packet_callback)
765 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
767 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
770 pa_packet_unref(p->read.packet);
/* Neither memblock nor packet: this must be a memblock frame referencing an SHM memblock */
774 assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
/* Map the referenced block described by the received shm_info record */
778 if (!(b = pa_memimport_get(p->import,
779 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
780 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
781 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
782 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
784 pa_log_warn("Failed to import memory block.");
788 if (p->recieve_memblock_callback) {
/* The imported block is passed on as a whole */
794 chunk.length = b->length;
797 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
798 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
800 p->recieve_memblock_callback(
802 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
804 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
806 p->recieve_memblock_callback_userdata);
/* Drop the reference pa_memimport_get() returned to us */
809 pa_memblock_unref(b);
/* Reset the read state for the next frame */
819 p->read.memblock = NULL;
820 p->read.packet = NULL;
824 p->read_creds_valid = 0;
/* Installs cb, together with its userdata, as the die callback of the stream. The update is done while holding the stream mutex. */
830 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
832 assert(PA_REFCNT_VALUE(p) > 0);
834 pa_mutex_lock(p->mutex);
835 p->die_callback = cb;
836 p->die_callback_userdata = userdata;
837 pa_mutex_unlock(p->mutex);
/* Installs cb, together with its userdata, as the drain callback of the stream. do_write() calls it after an item has been written completely and nothing else is pending. The update is done while holding the stream mutex. */
840 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
842 assert(PA_REFCNT_VALUE(p) > 0);
844 pa_mutex_lock(p->mutex);
845 p->drain_callback = cb;
846 p->drain_callback_userdata = userdata;
847 pa_mutex_unlock(p->mutex);
/* Installs cb, together with its userdata, as the callback do_read() calls for every completely received packet frame. The update is done while holding the stream mutex. */
850 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
852 assert(PA_REFCNT_VALUE(p) > 0);
854 pa_mutex_lock(p->mutex);
855 p->recieve_packet_callback = cb;
856 p->recieve_packet_callback_userdata = userdata;
857 pa_mutex_unlock(p->mutex);
/* Installs cb, together with its userdata, as the callback do_read() calls for received memblock data. The update is done while holding the stream mutex. */
860 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
862 assert(PA_REFCNT_VALUE(p) > 0);
864 pa_mutex_lock(p->mutex);
865 p->recieve_memblock_callback = cb;
866 p->recieve_memblock_callback_userdata = userdata;
867 pa_mutex_unlock(p->mutex);
/* Tells whether anything is left to be sent, evaluated while holding the stream mutex. */
870 int pa_pstream_is_pending(pa_pstream *p) {
874 assert(PA_REFCNT_VALUE(p) > 0);
876 pa_mutex_lock(p->mutex);
/* Pending means: an item is in transit or the send queue is not empty */
881 b = p->write.current || !pa_queue_is_empty(p->send_queue);
883 pa_mutex_unlock(p->mutex);
/* Drops one reference to the stream. */
888 void pa_pstream_unref(pa_pstream*p) {
890 assert(PA_REFCNT_VALUE(p) > 0);
/* That was the last reference; presumably the stream is freed via pstream_free() here -- TODO confirm */
892 if (PA_REFCNT_DEC(p) <= 0)
/* Reference counting counterpart of pa_pstream_unref(); may only be called on a stream that still has at least one reference. */
896 pa_pstream* pa_pstream_ref(pa_pstream*p) {
898 assert(PA_REFCNT_VALUE(p) > 0);
/* Shuts the stream down while holding the stream mutex: frees the SHM import/export objects, the I/O channel and the defer event and removes all callbacks. */
904 void pa_pstream_close(pa_pstream *p) {
907 pa_mutex_lock(p->mutex);
912 pa_memimport_free(p->import);
917 pa_memexport_free(p->export);
922 pa_iochannel_free(p->io);
/* Clear the pointer after freeing so that the event is freed only once */
926 if (p->defer_event) {
927 p->mainloop->defer_free(p->defer_event);
928 p->defer_event = NULL;
/* No callback may be invoked on a closed stream */
931 p->die_callback = NULL;
932 p->drain_callback = NULL;
933 p->recieve_packet_callback = NULL;
934 p->recieve_memblock_callback = NULL;
936 pa_mutex_unlock(p->mutex);
/* Switches the use of SHM for this stream according to enable, while holding the stream mutex. */
939 void pa_pstream_use_shm(pa_pstream *p, int enable) {
941 assert(PA_REFCNT_VALUE(p) > 0);
943 pa_mutex_lock(p->mutex);
/* Presumably on enabling: create the export side, which reports revoked blocks via memexport_revoke_cb() -- TODO confirm the condition */
950 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
/* Presumably on disabling: drop the export side again -- TODO confirm the condition */
955 pa_memexport_free(p->export);
960 pa_mutex_unlock(p->mutex);