2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
28 #include <pulse/xmalloc.h>
30 #include <pulsecore/log.h>
31 #include <pulsecore/mcalign.h>
32 #include <pulsecore/macro.h>
33 #include <pulsecore/flist.h>
35 #include "memblockq.h"
37 /* #define MEMBLOCKQ_DEBUG */
/* Linked-list pointers of one queued chunk entry.
   NOTE(review): the enclosing struct declarations are only partially
   visible in this chunk; comments annotate the visible members only. */
40 struct list_item *next, *prev;
/* Lock-free free list used to recycle list_item allocations across queues. */
45 PA_STATIC_FLIST_DECLARE(list_items, 0, pa_xfree);
/* pa_memblockq state (partially visible): head/tail of the chunk list. */
48 struct list_item *blocks, *blocks_tail;
/* Cached cursors maintained by fix_current_read()/fix_current_write(). */
49 struct list_item *current_read, *current_write;
/* Buffer metrics, in bytes; all are kept multiples of 'base' (one frame). */
51 size_t maxlength, tlength, base, prebuf, minreq, maxrewind;
/* Absolute byte positions of the read and write heads (may go negative-relative
   to each other; gaps are filled with silence on read). */
52 int64_t read_index, write_index;
/* Accounting used by pop_missing()/write_index_changed() to track how much
   data has been requested from the client vs. actually received. */
56 int64_t missing, requested;
58 pa_sample_spec sample_spec;
/* Allocate a new memblockq. The caller-supplied metrics are sanitized by
   the pa_memblockq_set_*() calls below (so the logged "requested" and
   "sanitized" values may differ). 'idx' seeds both read and write index;
   'silence' (if it has a memblock) is ref'ed and later used by peek() to
   fill gaps. NOTE(review): several lines of this function are elided in
   this chunk (parameter list, return statement). */
61 pa_memblockq* pa_memblockq_new(
66         const pa_sample_spec *sample_spec,
70         pa_memchunk *silence) {
74     pa_assert(sample_spec);
77     bq = pa_xnew0(pa_memblockq, 1);
78     bq->name = pa_xstrdup(name);
80     bq->sample_spec = *sample_spec;
/* 'base' is the frame size; every index/length in the queue stays frame-aligned. */
81     bq->base = pa_frame_size(sample_spec);
82     bq->read_index = bq->write_index = idx;
84     pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu maxrewind=%lu",
85                  (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) bq->base, (unsigned long) prebuf, (unsigned long) minreq, (unsigned long) maxrewind);
/* Order matters: each setter clamps against the values set before it. */
89     pa_memblockq_set_maxlength(bq, maxlength);
90     pa_memblockq_set_tlength(bq, tlength);
91     pa_memblockq_set_minreq(bq, minreq);
92     pa_memblockq_set_prebuf(bq, prebuf);
93     pa_memblockq_set_maxrewind(bq, maxrewind);
95     pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu maxrewind=%lu",
96                  (unsigned long) bq->maxlength, (unsigned long) bq->tlength, (unsigned long) bq->base, (unsigned long) bq->prebuf, (unsigned long) bq->minreq, (unsigned long) bq->maxrewind);
/* Keep a reference on the silence chunk for the queue's lifetime. */
99         bq->silence = *silence;
100         pa_memblock_ref(bq->silence.memblock);
/* Aligner used by pa_memblockq_push_align() for non-frame-aligned input. */
103     bq->mcalign = pa_mcalign_new(bq->base);
/* Free the queue: drop all queued blocks (pa_memblockq_silence), release
   the silence chunk reference and the aligner. NOTE(review): remaining
   cleanup lines (name, struct free) are elided from this chunk. */
108 void pa_memblockq_free(pa_memblockq* bq) {
111     pa_memblockq_silence(bq);
113     if (bq->silence.memblock)
114         pa_memblock_unref(bq->silence.memblock);
117     pa_mcalign_free(bq->mcalign);
/* Re-synchronize the cached current_read cursor with bq->read_index:
   walk left while the cursor starts after read_index, then right past
   blocks that end at or before read_index. */
123 static void fix_current_read(pa_memblockq *bq) {
126     if (PA_UNLIKELY(!bq->blocks)) {
127         bq->current_read = NULL;
/* Start from the list head if the cursor was invalidated (e.g. by drop_block). */
131     if (PA_UNLIKELY(!bq->current_read))
132         bq->current_read = bq->blocks;
/* Scan left: cursor block begins after the read position. */
135     while (PA_UNLIKELY(bq->current_read->index > bq->read_index))
137         if (bq->current_read->prev)
138             bq->current_read = bq->current_read->prev;
/* Scan right: cursor block ends at/before the read position. */
143     while (PA_LIKELY(bq->current_read != NULL) && PA_UNLIKELY(bq->current_read->index + (int64_t) bq->current_read->chunk.length <= bq->read_index))
144         bq->current_read = bq->current_read->next;
146     /* At this point current_read will either point at or left of the
147        next block to play. It may be NULL in case everything in
148        the queue was already played */
/* Mirror of fix_current_read() for the write side: re-synchronize the
   cached current_write cursor with bq->write_index, scanning right past
   blocks that end at/before it, then left past blocks that begin after it. */
151 static void fix_current_write(pa_memblockq *bq) {
154     if (PA_UNLIKELY(!bq->blocks)) {
155         bq->current_write = NULL;
/* Start from the tail if the cursor was invalidated. */
159     if (PA_UNLIKELY(!bq->current_write))
160         bq->current_write = bq->blocks_tail;
163     while (PA_UNLIKELY(bq->current_write->index + (int64_t) bq->current_write->chunk.length <= bq->write_index))
165         if (bq->current_write->next)
166             bq->current_write = bq->current_write->next;
171     while (PA_LIKELY(bq->current_write != NULL) && PA_UNLIKELY(bq->current_write->index > bq->write_index))
172         bq->current_write = bq->current_write->prev;
174     /* At this point current_write will either point at or right of
175        the next block to write data to. It may be NULL in case
176        everything in the queue is still to be played */
/* Unlink 'q' from the block list, repair head/tail and the cached
   read/write cursors if they pointed at it, release its memblock and
   return the list_item to the static free list (falling back to
   pa_xfree via the flist destructor when the flist is full).
   NOTE(review): the if/else framing around some statements is elided. */
179 static void drop_block(pa_memblockq *bq, struct list_item *q) {
183     pa_assert(bq->n_blocks >= 1);
186         q->prev->next = q->next;
188         pa_assert(bq->blocks == q);
189         bq->blocks = q->next;
193         q->next->prev = q->prev;
195         pa_assert(bq->blocks_tail == q);
196         bq->blocks_tail = q->prev;
/* Move the cursors off the dying node: write cursor steps left,
   read cursor steps right, matching their scan directions. */
199     if (bq->current_write == q)
200         bq->current_write = q->prev;
202     if (bq->current_read == q)
203         bq->current_read = q->next;
205     pa_memblock_unref(q->chunk.memblock);
207     if (pa_flist_push(PA_STATIC_FLIST_GET(list_items), q) < 0)
/* Drop blocks that lie entirely before read_index - maxrewind, i.e.
   history we no longer need to keep for pa_memblockq_rewind(). */
213 static void drop_backlog(pa_memblockq *bq) {
217     boundary = bq->read_index - (int64_t) bq->maxrewind;
219     while (bq->blocks && (bq->blocks->index + (int64_t) bq->blocks->chunk.length <= boundary))
220         drop_block(bq, bq->blocks);
/* Check whether 'l' bytes may be written at the current write_index
   without making the queue exceed maxlength. Data before read_index
   does not count against the limit. NOTE(review): the return
   statements and part of the rewound-write handling are elided. */
223 static bool can_push(pa_memblockq *bq, size_t l) {
/* If the write head was rewound behind the read head, the gap up to
   read_index is free; only the remainder counts. */
228     if (bq->read_index > bq->write_index) {
229         int64_t d = bq->read_index - bq->write_index;
/* 'end' = current end of queued data (or write_index when empty). */
237     end = bq->blocks_tail ? bq->blocks_tail->index + (int64_t) bq->blocks_tail->chunk.length : bq->write_index;
239     /* Make sure that the list doesn't get too long */
240     if (bq->write_index + (int64_t) l > end)
241         if (bq->write_index + (int64_t) l - bq->read_index > (int64_t) bq->maxlength)
/* Bookkeeping after write_index moved by 'delta' bytes: when 'account'
   is set the data counts against 'requested' (client delivered what we
   asked for), otherwise it grows/shrinks 'missing'. */
247 static void write_index_changed(pa_memblockq *bq, int64_t old_write_index, bool account) {
252     delta = bq->write_index - old_write_index;
255         bq->requested -= delta;
257         bq->missing -= delta;
259 #ifdef MEMBLOCKQ_DEBUG
260     pa_log_debug("[%s] pushed/seeked %lli: requested counter at %lli, account=%i", bq->name, (long long) delta, (long long) bq->requested, account);
/* Bookkeeping after read_index moved: every byte consumed increases
   'missing', i.e. how much we may ask the client for next. */
264 static void read_index_changed(pa_memblockq *bq, int64_t old_read_index) {
269     delta = bq->read_index - old_read_index;
270     bq->missing += delta;
272 #ifdef MEMBLOCKQ_DEBUG
273     pa_log_debug("[%s] popped %lli: missing counter at %lli", bq->name, (long long) delta, (long long) bq->missing);
/* Insert 'uchunk' (which must be frame-aligned) at the current
   write_index. Overlapping existing data is overwritten: blocks fully
   covered are dropped, partially covered blocks are truncated or split.
   Adjacent data from the same memblock is merged instead of creating a
   new list entry. Returns non-zero when the queue is full (can_push
   fails). NOTE(review): numerous lines (loop bodies, list relinking,
   early returns) are elided from this chunk; comments annotate only
   what is visible. */
277 int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {
278     struct list_item *q, *n;
284     pa_assert(uchunk->memblock);
285     pa_assert(uchunk->length > 0);
286     pa_assert(uchunk->index + uchunk->length <= pa_memblock_get_length(uchunk->memblock));
/* Frame alignment is a hard contract here; unaligned input must go
   through pa_memblockq_push_align() instead. */
288     pa_assert(uchunk->length % bq->base == 0);
289     pa_assert(uchunk->index % bq->base == 0);
291     if (!can_push(bq, uchunk->length))
294     old = bq->write_index;
297     fix_current_write(bq);
298     q = bq->current_write;
300     /* First we advance the q pointer right of where we want to
304     while (bq->write_index + (int64_t) chunk.length > q->index)
314     /* We go from back to front to look for the right place to add
315      * this new entry. Drop data we will overwrite on the way */
319         if (bq->write_index >= q->index + (int64_t) q->chunk.length)
320             /* We found the entry where we need to place the new entry immediately after */
322         else if (bq->write_index + (int64_t) chunk.length <= q->index) {
323             /* This entry isn't touched at all, let's skip it */
325         } else if (bq->write_index <= q->index &&
326                    bq->write_index + (int64_t) chunk.length >= q->index + (int64_t) q->chunk.length) {
328             /* This entry is fully replaced by the new entry, so let's drop it */
334         } else if (bq->write_index >= q->index) {
335             /* The write index points into this memblock, so let's
336              * truncate or split it */
338             if (bq->write_index + (int64_t) chunk.length < q->index + (int64_t) q->chunk.length) {
340                 /* We need to save the end of this memchunk */
344                 /* Create a new list entry for the end of the memchunk */
345                 if (!(p = pa_flist_pop(PA_STATIC_FLIST_GET(list_items))))
346                     p = pa_xnew(struct list_item, 1);
349                 pa_memblock_ref(p->chunk.memblock);
351                 /* Calculate offset */
352                 d = (size_t) (bq->write_index + (int64_t) chunk.length - q->index);
355                 /* Drop it from the new entry */
356                 p->index = q->index + (int64_t) d;
357                 p->chunk.length -= d;
359                 /* Add it to the list */
361                 if ((p->next = q->next))
370             /* Truncate the chunk */
371             if (!(q->chunk.length = (size_t) (bq->write_index - q->index))) {
378             /* We had to truncate this block, hence we're now at the right position */
/* Remaining case: new data overlaps only the head of q. */
383             pa_assert(bq->write_index + (int64_t)chunk.length > q->index &&
384                       bq->write_index + (int64_t)chunk.length < q->index + (int64_t)q->chunk.length &&
385                       bq->write_index < q->index);
387             /* The job overwrites the current entry at the end, so let's drop the beginning of this entry */
389             d = (size_t) (bq->write_index + (int64_t) chunk.length - q->index);
390             q->index += (int64_t) d;
392             q->chunk.length -= d;
399     pa_assert(bq->write_index >= q->index + (int64_t)q->chunk.length);
400     pa_assert(!q->next || (bq->write_index + (int64_t)chunk.length <= q->next->index));
402     /* Try to merge memory blocks */
404     if (q->chunk.memblock == chunk.memblock &&
405         q->chunk.index + q->chunk.length == chunk.index &&
406         bq->write_index == q->index + (int64_t) q->chunk.length) {
408         q->chunk.length += chunk.length;
409         bq->write_index += (int64_t) chunk.length;
413     pa_assert(!bq->blocks || (bq->write_index + (int64_t)chunk.length <= bq->blocks->index));
/* No merge possible: allocate (or recycle) a fresh list entry. */
415     if (!(n = pa_flist_pop(PA_STATIC_FLIST_GET(list_items))))
416         n = pa_xnew(struct list_item, 1);
419     pa_memblock_ref(n->chunk.memblock);
420     n->index = bq->write_index;
421     bq->write_index += (int64_t) n->chunk.length;
423     n->next = q ? q->next : bq->blocks;
440     write_index_changed(bq, old, true);
/* Whether prebuffering currently suppresses reads: while in prebuf the
   fill level must reach 'prebuf'; otherwise prebuf re-engages when the
   queue has run empty. NOTE(review): the in_prebuf branch around the
   two returns is elided here. */
444 bool pa_memblockq_prebuf_active(pa_memblockq *bq) {
448         return pa_memblockq_get_length(bq) < bq->prebuf;
450         return bq->prebuf > 0 && bq->read_index >= bq->write_index;
/* Update the in_prebuf flag and report whether the caller must keep
   pre-buffering (true = not enough data yet). NOTE(review): return
   statements are elided from this chunk. */
453 static bool update_prebuf(pa_memblockq *bq) {
/* Still filling: stay in prebuf until the fill level reaches 'prebuf'. */
458         if (pa_memblockq_get_length(bq) < bq->prebuf)
461         bq->in_prebuf = false;
/* Queue ran dry: re-enter prebuf so playback pauses until refilled. */
465         if (bq->prebuf > 0 && bq->read_index >= bq->write_index) {
466             bq->in_prebuf = true;
/* Return (without consuming) the chunk at read_index. If no data is
   queued at that position, a silence chunk (or a memblock-less length
   marker) is returned instead. Returns negative while pre-buffering or
   when nothing at all can be returned. NOTE(review): several returns
   and the else branch bodies are elided from this chunk. */
474 int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
479     /* We need to pre-buffer */
480     if (update_prebuf(bq))
483     fix_current_read(bq);
485     /* Do we need to spit out silence? */
486     if (!bq->current_read || bq->current_read->index > bq->read_index) {
489         /* How much silence shall we return? */
490         if (bq->current_read)
491             length = (size_t) (bq->current_read->index - bq->read_index);
492         else if (bq->write_index > bq->read_index)
493             length = (size_t) (bq->write_index - bq->read_index);
497         /* We need to return silence, since no data is yet available */
498         if (bq->silence.memblock) {
499             *chunk = bq->silence;
500             pa_memblock_ref(chunk->memblock);
/* Clamp the silence chunk to the size of the gap (length == 0 means
   "unbounded", i.e. queue empty with no known end). */
502             if (length > 0 && length < chunk->length)
503                 chunk->length = length;
507             /* If the memblockq is empty, return -1, otherwise return
508              * the time to sleep */
/* No silence memblock configured: hand back a memblock-less marker. */
512         chunk->memblock = NULL;
513         chunk->length = length;
520     /* Ok, let's pass real data to the caller */
521     *chunk = bq->current_read->chunk;
522     pa_memblock_ref(chunk->memblock);
/* Skip the part of the block that lies before read_index. */
524     pa_assert(bq->read_index >= bq->current_read->index);
525     d = bq->read_index - bq->current_read->index;
526     chunk->index += (size_t) d;
527     chunk->length -= (size_t) d;
/* Like pa_memblockq_peek() but always yields exactly 'block_size'
   bytes, assembling data from multiple queued chunks and padding with
   the configured silence chunk where there are gaps. Requires a
   silence memblock to be set. NOTE(review): returns, loop framing and
   some advancing statements are elided from this chunk. */
532 int pa_memblockq_peek_fixed_size(pa_memblockq *bq, size_t block_size, pa_memchunk *chunk) {
534     pa_memchunk tchunk, rchunk;
536     struct list_item *item;
539     pa_assert(block_size > 0);
541     pa_assert(bq->silence.memblock);
543     if (pa_memblockq_peek(bq, &tchunk) < 0)
/* Fast path: the first chunk already covers block_size — just clamp it. */
546     if (tchunk.length >= block_size) {
548         chunk->length = block_size;
/* Slow path: allocate a scratch block and copy pieces into it. */
552     pool = pa_memblock_get_pool(tchunk.memblock);
553     rchunk.memblock = pa_memblock_new(pool, block_size);
555     rchunk.length = tchunk.length;
556     pa_mempool_unref(pool), pool = NULL;
558     pa_memchunk_memcpy(&rchunk, &tchunk);
559     pa_memblock_unref(tchunk.memblock);
/* rchunk.index doubles as "bytes assembled so far". */
561     rchunk.index += tchunk.length;
563     /* We don't need to call fix_current_read() here, since
564      * pa_memblock_peek() already did that */
565     item = bq->current_read;
566     ri = bq->read_index + tchunk.length;
568     while (rchunk.index < block_size) {
570         if (!item || item->index > ri) {
571             /* Do we need to append silence? */
572             tchunk = bq->silence;
/* Silence only up to the start of the next real chunk. */
575                 tchunk.length = PA_MIN(tchunk.length, (size_t) (item->index - ri));
580             /* We can append real data! */
581             tchunk = item->chunk;
583             d = ri - item->index;
584             tchunk.index += (size_t) d;
585             tchunk.length -= (size_t) d;
587             /* Go to next item for the next iteration */
591         rchunk.length = tchunk.length = PA_MIN(tchunk.length, block_size - rchunk.index);
592         pa_memchunk_memcpy(&rchunk, &tchunk);
594         rchunk.index += rchunk.length;
/* Hand the assembled block back with the full requested length. */
599     rchunk.length = block_size;
/* Advance read_index by 'length' bytes (frame-aligned), consuming data.
   Advances piecewise per block so that re-entering prebuf mid-way stops
   further dropping; drops rewind backlog and updates the 'missing'
   accounting. NOTE(review): the surrounding loop, break statements and
   drop_backlog call are elided from this chunk. */
605 void pa_memblockq_drop(pa_memblockq *bq, size_t length) {
608     pa_assert(length % bq->base == 0);
610     old = bq->read_index;
614         /* Do not drop any data when we are in prebuffering mode */
615         if (update_prebuf(bq))
618         fix_current_read(bq);
620         if (bq->current_read) {
623             /* We go through this piece by piece to make sure we don't
624              * drop more than allowed by prebuf */
/* 'p' = absolute end of the current block; consume at most up to it. */
626             p = bq->current_read->index + (int64_t) bq->current_read->chunk.length;
627             pa_assert(p >= bq->read_index);
628             d = p - bq->read_index;
630             if (d > (int64_t) length)
631                 d = (int64_t) length;
634             length -= (size_t) d;
638             /* The list is empty, there's nothing we could drop */
639             bq->read_index += (int64_t) length;
645     read_index_changed(bq, old);
/* Move read_index backwards by 'length' bytes (frame-aligned), i.e.
   replay recent data. Relies on maxrewind worth of backlog being kept. */
648 void pa_memblockq_rewind(pa_memblockq *bq, size_t length) {
651     pa_assert(length % bq->base == 0);
653     old = bq->read_index;
655     /* This is kind of the inverse of pa_memblockq_drop() */
657     bq->read_index -= (int64_t) length;
659     read_index_changed(bq, old);
/* True when a peek would yield data: not pre-buffering and the queue is
   non-empty. NOTE(review): the return statements are elided here. */
662 bool pa_memblockq_is_readable(pa_memblockq *bq) {
665     if (pa_memblockq_prebuf_active(bq))
668     if (pa_memblockq_get_length(bq) <= 0)
/* Current fill level in bytes: write_index - read_index, clamped to 0
   when the write head has been rewound behind the read head. */
674 size_t pa_memblockq_get_length(pa_memblockq *bq) {
677     if (bq->write_index <= bq->read_index)
680     return (size_t) (bq->write_index - bq->read_index);
/* Reposition the write head according to 'seek' mode; 'account'
   controls whether the resulting delta is charged against 'requested'
   or 'missing' (see write_index_changed()). NOTE(review): the switch
   framing and break statements are elided from this chunk. */
683 void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek, bool account) {
687     old = bq->write_index;
690         case PA_SEEK_RELATIVE:
691             bq->write_index += offset;
693         case PA_SEEK_ABSOLUTE:
694             bq->write_index = offset;
696         case PA_SEEK_RELATIVE_ON_READ:
697             bq->write_index = bq->read_index + offset;
/* Relative to the end of queued data (or read_index when empty). */
699         case PA_SEEK_RELATIVE_END:
700             bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t) bq->blocks_tail->chunk.length : bq->read_index) + offset;
703             pa_assert_not_reached();
707     write_index_changed(bq, old, account);
/* Discard everything and collapse the write head onto the read head,
   then force pre-buffering to restart before playback resumes. */
710 void pa_memblockq_flush_write(pa_memblockq *bq, bool account) {
714     pa_memblockq_silence(bq);
716     old = bq->write_index;
717     bq->write_index = bq->read_index;
719     pa_memblockq_prebuf_force(bq);
720     write_index_changed(bq, old, account);
/* Discard everything and collapse the read head onto the write head
   (inverse of flush_write), then force pre-buffering to restart. */
723 void pa_memblockq_flush_read(pa_memblockq *bq) {
727     pa_memblockq_silence(bq);
729     old = bq->read_index;
730     bq->read_index = bq->write_index;
732     pa_memblockq_prebuf_force(bq);
733     read_index_changed(bq, old);
/* Trivial accessors for the sanitized queue metrics and head positions.
   NOTE(review): asserts/returns of the first two getters are elided. */
736 size_t pa_memblockq_get_tlength(pa_memblockq *bq) {
742 size_t pa_memblockq_get_minreq(pa_memblockq *bq) {
748 size_t pa_memblockq_get_maxrewind(pa_memblockq *bq) {
751     return bq->maxrewind;
754 int64_t pa_memblockq_get_read_index(pa_memblockq *bq) {
757     return bq->read_index;
760 int64_t pa_memblockq_get_write_index(pa_memblockq *bq) {
763     return bq->write_index;
/* Like pa_memblockq_push() but accepts chunks that are not frame
   aligned: input is funneled through the pa_mcalign helper, which emits
   frame-aligned chunks that are pushed individually. Checks can_push()
   up front with the aligned size so a partial push cannot overflow the
   queue. NOTE(review): early return for already-aligned input and error
   handling lines are elided here. */
766 int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
/* Fast path (condition elided): aligned chunks skip the aligner. */
773         return pa_memblockq_push(bq, chunk);
775     if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
778     pa_mcalign_push(bq->mcalign, chunk);
780     while (pa_mcalign_pop(bq->mcalign, &rchunk) >= 0) {
782         r = pa_memblockq_push(bq, &rchunk);
783         pa_memblock_unref(rchunk.memblock);
/* On failure, drain whatever is still buffered in the aligner. */
786             pa_mcalign_flush(bq->mcalign);
/* Manually leave prebuffering (reads proceed regardless of fill level). */
794 void pa_memblockq_prebuf_disable(pa_memblockq *bq) {
797     bq->in_prebuf = false;
/* Manually (re-)enter prebuffering; reads stall until 'prebuf' bytes queue up. */
800 void pa_memblockq_prebuf_force(pa_memblockq *bq) {
804     bq->in_prebuf = true;
807 size_t pa_memblockq_get_maxlength(pa_memblockq *bq) {
810     return bq->maxlength;
/* NOTE(review): body of get_prebuf is elided from this chunk. */
813 size_t pa_memblockq_get_prebuf(pa_memblockq *bq) {
/* Return how many bytes should be requested from the client now, and
   move that amount from 'missing' to 'requested'. Amounts smaller than
   minreq are withheld unless we are prebuffering. NOTE(review): the
   return statements and the missing-counter reset are elided here. */
819 size_t pa_memblockq_pop_missing(pa_memblockq *bq) {
824 #ifdef MEMBLOCKQ_DEBUG
825     pa_log_debug("[%s] pop: %lli", bq->name, (long long) bq->missing);
828     if (bq->missing <= 0)
/* Below minreq and not prebuffering: don't bother the client yet. */
831     if (((size_t) bq->missing < bq->minreq) &&
832         !pa_memblockq_prebuf_active(bq))
835     l = (size_t) bq->missing;
837     bq->requested += bq->missing;
840 #ifdef MEMBLOCKQ_DEBUG
841     pa_log_debug("[%s] sent %lli: request counter is at %lli", bq->name, (long long) l, (long long) bq->requested);
/* Set maxlength, rounded UP to a multiple of base and at least one
   frame; shrinks tlength if it now exceeds the new maximum. */
847 void pa_memblockq_set_maxlength(pa_memblockq *bq, size_t maxlength) {
850     bq->maxlength = ((maxlength+bq->base-1)/bq->base)*bq->base;
852     if (bq->maxlength < bq->base)
853         bq->maxlength = bq->base;
855     if (bq->tlength > bq->maxlength)
856         pa_memblockq_set_tlength(bq, bq->maxlength);
/* Set the target length: 0 or (size_t)-1 means "use maxlength". Rounded
   up to base, clamped to maxlength; minreq and prebuf are re-clamped to
   stay consistent, and 'missing' is adjusted by the tlength delta so
   request accounting tracks the new target. */
859 void pa_memblockq_set_tlength(pa_memblockq *bq, size_t tlength) {
863     if (tlength <= 0 || tlength == (size_t) -1)
864         tlength = bq->maxlength;
866     old_tlength = bq->tlength;
867     bq->tlength = ((tlength+bq->base-1)/bq->base)*bq->base;
869     if (bq->tlength > bq->maxlength)
870         bq->tlength = bq->maxlength;
872     if (bq->minreq > bq->tlength)
873         pa_memblockq_set_minreq(bq, bq->tlength);
/* Invariant: prebuf <= tlength + base - minreq. */
875     if (bq->prebuf > bq->tlength+bq->base-bq->minreq)
876         pa_memblockq_set_prebuf(bq, bq->tlength+bq->base-bq->minreq);
878     bq->missing += (int64_t) bq->tlength - (int64_t) old_tlength;
/* Set the minimum request size, rounded DOWN to a multiple of base,
   clamped to [base, tlength]; re-clamps prebuf to keep the invariant
   prebuf <= tlength + base - minreq. */
881 void pa_memblockq_set_minreq(pa_memblockq *bq, size_t minreq) {
884     bq->minreq = (minreq/bq->base)*bq->base;
886     if (bq->minreq > bq->tlength)
887         bq->minreq = bq->tlength;
889     if (bq->minreq < bq->base)
890         bq->minreq = bq->base;
892     if (bq->prebuf > bq->tlength+bq->base-bq->minreq)
893         pa_memblockq_set_prebuf(bq, bq->tlength+bq->base-bq->minreq);
/* Set the prebuffer threshold: (size_t)-1 selects the maximum allowed
   value (tlength + base - minreq). Rounded up to base; a non-zero
   request yields at least one frame. Also updates in_prebuf so a
   lowered threshold takes effect immediately. */
896 void pa_memblockq_set_prebuf(pa_memblockq *bq, size_t prebuf) {
899     if (prebuf == (size_t) -1)
900         prebuf = bq->tlength+bq->base-bq->minreq;
902     bq->prebuf = ((prebuf+bq->base-1)/bq->base)*bq->base;
904     if (prebuf > 0 && bq->prebuf < bq->base)
905         bq->prebuf = bq->base;
907     if (bq->prebuf > bq->tlength+bq->base-bq->minreq)
908         bq->prebuf = bq->tlength+bq->base-bq->minreq;
910     if (bq->prebuf <= 0 || pa_memblockq_get_length(bq) >= bq->prebuf)
911         bq->in_prebuf = false;
/* Set how much already-read history is retained for rewinding, rounded
   down to a multiple of base. */
914 void pa_memblockq_set_maxrewind(pa_memblockq *bq, size_t maxrewind) {
917     bq->maxrewind = (maxrewind/bq->base)*bq->base;
/* Apply a client-supplied pa_buffer_attr, funneling each field through
   the corresponding sanitizing setter (same order as pa_memblockq_new). */
920 void pa_memblockq_apply_attr(pa_memblockq *bq, const pa_buffer_attr *a) {
924     pa_memblockq_set_maxlength(bq, a->maxlength);
925     pa_memblockq_set_tlength(bq, a->tlength);
926     pa_memblockq_set_minreq(bq, a->minreq);
927     pa_memblockq_set_prebuf(bq, a->prebuf);
/* Export the current (sanitized) metrics into a pa_buffer_attr.
   Values are narrowed to the protocol's uint32_t fields. */
930 void pa_memblockq_get_attr(pa_memblockq *bq, pa_buffer_attr *a) {
934     a->maxlength = (uint32_t) pa_memblockq_get_maxlength(bq);
935     a->tlength = (uint32_t) pa_memblockq_get_tlength(bq);
936     a->prebuf = (uint32_t) pa_memblockq_get_prebuf(bq);
937     a->minreq = (uint32_t) pa_memblockq_get_minreq(bq);
/* Move all readable data from 'source' into 'bq': repeatedly peek on
   the source and either push the real chunk or seek over memblock-less
   silence markers, dropping the consumed bytes from the source.
   Prebuffering on 'bq' is disabled so spliced data plays immediately.
   NOTE(review): the loop framing, returns and the silence-marker seek
   branch framing are elided from this chunk. */
940 int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) {
945     pa_memblockq_prebuf_disable(bq);
950         if (pa_memblockq_peek(source, &chunk) < 0)
953         pa_assert(chunk.length > 0);
955         if (chunk.memblock) {
957             if (pa_memblockq_push_align(bq, &chunk) < 0) {
958                 pa_memblock_unref(chunk.memblock);
962             pa_memblock_unref(chunk.memblock);
/* Memblock-less chunk = pure silence/gap: just advance the write head. */
964             pa_memblockq_seek(bq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
966         pa_memblockq_drop(bq, chunk.length);
/* Hint that all data from the read position onwards will be needed
   soon: pre-fault every remaining chunk via pa_memchunk_will_need(). */
970 void pa_memblockq_willneed(pa_memblockq *bq) {
975     fix_current_read(bq);
977     for (q = bq->current_read; q; q = q->next)
978         pa_memchunk_will_need(&q->chunk);
/* Replace the silence chunk used to fill gaps on read: release the old
   reference, then either copy+ref the new chunk or reset to an empty
   chunk when 'silence' is NULL. NOTE(review): the if/else framing
   between the two branches is elided from this chunk. */
981 void pa_memblockq_set_silence(pa_memblockq *bq, pa_memchunk *silence) {
984     if (bq->silence.memblock)
985         pa_memblock_unref(bq->silence.memblock);
988         bq->silence = *silence;
989         pa_memblock_ref(bq->silence.memblock);
991     pa_memchunk_reset(&bq->silence);
/* True when no data remains between read and write head.
   NOTE(review): the body of is_empty is elided from this chunk. */
994 bool pa_memblockq_is_empty(pa_memblockq *bq) {
/* Drop every queued block (the loop framing is elided); afterwards the
   block count must be zero. */
1000 void pa_memblockq_silence(pa_memblockq *bq) {
1004         drop_block(bq, bq->blocks);
1006     pa_assert(bq->n_blocks == 0);
/* Number of list entries currently queued. */
1009 unsigned pa_memblockq_get_nblocks(pa_memblockq *bq) {
1012     return bq->n_blocks;
1015 size_t pa_memblockq_get_base(pa_memblockq *bq) {