2 * Async Jitter Queue based on g_async_queue
3 * This code is GST RTP smart and deals with timestamps
5 * Farsight Voice+Video library
6 * Copyright 2007 Collabora Ltd,
7 * Copyright 2007 Nokia Corporation
8 * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
10 * This is an async queue that has a buffering mecanism based on the set low
11 * and high threshold. When the lower threshold is reached, the queue will
12 * fill itself up until the higher threshold is reached before allowing any
13 * pops to occur. This allows a jitterbuffer of at least min threshold items
17 /* GLIB - Library of useful routines for C programming
18 * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
20 * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
21 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
23 * This library is free software; you can redistribute it and/or
24 * modify it under the terms of the GNU Lesser General Public
25 * License as published by the Free Software Foundation; either
26 * version 2 of the License, or (at your option) any later version.
28 * This library is distributed in the hope that it will be useful,
29 * but WITHOUT ANY WARRANTY; without even the implied warranty of
30 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
31 * Lesser General Public License for more details.
33 * You should have received a copy of the GNU Lesser General Public
34 * License along with this library; if not, write to the
35 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
36 * Boston, MA 02111-1307, USA.
45 #include "async_jitter_queue.h"
48 #include <gst/rtp/gstrtpbuffer.h>
50 #define DEFAULT_LOW_THRESHOLD 0.1
51 #define DEFAULT_HIGH_THRESHOLD 0.9
53 struct _AsyncJitterQueue
58 guint waiting_threads;
61 gfloat high_threshold;
62 guint32 max_queue_length;
64 gboolean pop_flushing;
65 gboolean pop_blocking;
67 guint32 tail_buffer_duration;
71 * async_jitter_queue_new:
73 * Creates a new asynchronous queue with the initial reference count of 1.
75 * Return value: the new #AsyncJitterQueue.
78 async_jitter_queue_new (void)
80 AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
82 retval->mutex = g_mutex_new ();
83 retval->cond = g_cond_new ();
84 retval->queue = g_queue_new ();
85 retval->waiting_threads = 0;
86 retval->ref_count = 1;
87 retval->low_threshold = DEFAULT_LOW_THRESHOLD;
88 retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
89 retval->buffering = TRUE; /* we need to buffer initially */
90 retval->pop_flushing = TRUE;
91 retval->pop_blocking = TRUE;
92 retval->pops_remaining = 0;
93 retval->tail_buffer_duration = 0;
97 /* checks buffering state and wakes up waiting pops */
99 signal_waiting_threads (AsyncJitterQueue * queue)
101 if (async_jitter_queue_length_ts_units_unlocked (queue) >=
102 queue->high_threshold * queue->max_queue_length) {
103 GST_DEBUG ("stop buffering");
104 queue->buffering = FALSE;
107 if (queue->waiting_threads > 0) {
108 if (!queue->buffering) {
109 g_cond_signal (queue->cond);
115 * async_jitter_queue_ref:
116 * @queue: a #AsyncJitterQueue.
118 * Increases the reference count of the asynchronous @queue by 1. You
119 * do not need to hold the lock to call this function.
121 * Returns: the @queue that was passed in (since 2.6)
124 async_jitter_queue_ref (AsyncJitterQueue * queue)
126 g_return_val_if_fail (queue, NULL);
127 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
129 g_atomic_int_inc (&queue->ref_count);
135 * async_jitter_queue_ref_unlocked:
136 * @queue: a #AsyncJitterQueue.
138 * Increases the reference count of the asynchronous @queue by 1.
141 async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
143 g_return_if_fail (queue);
144 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
146 g_atomic_int_inc (&queue->ref_count);
150 * async_jitter_queue_set_low_threshold:
151 * @queue: a #AsyncJitterQueue.
152 * @threshold: the lower threshold (fraction of max size)
154 * Sets the low threshold on the queue. This threshold indicates the minimum
155 * number of items allowed in the queue before we refill it up to the set
159 async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
162 g_return_if_fail (queue);
163 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
165 queue->low_threshold = threshold;
169 * async_jitter_queue_set_max_threshold:
170 * @queue: a #AsyncJitterQueue.
171 * @threshold: the higher threshold (fraction of max size)
173 * Sets the high threshold on the queue. This threshold indicates the amount of
174 * items to fill in the queue before releasing any blocking pop calls. This
175 * blocking mecanism is only triggered when we reach the low threshold and must
179 async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
182 g_return_if_fail (queue);
183 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
185 queue->high_threshold = threshold;
188 /* set the maximum queue length in RTP timestamp units */
190 async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
193 g_return_if_fail (queue);
194 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
196 queue->max_queue_length = max_length;
200 async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
202 g_return_val_if_fail (queue, NULL);
208 calculate_ts_diff (guint32 high_ts, guint32 low_ts)
210 /* it needs to work if ts wraps */
211 if (high_ts >= low_ts) {
212 return high_ts - low_ts;
214 return high_ts + G_MAXUINT32 + 1 - low_ts;
218 /* this function returns the length of the queue in timestamp units. It will
219 * also add the duration of the last buffer in the queue */
220 /* FIXME This function wrongly assumes that there are no missing packets inside
221 * the buffer, in reality it needs to check for gaps and subsctract those from
224 async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
232 g_return_val_if_fail (queue, 0);
234 if (queue->queue->length < 2) {
238 tail = g_queue_peek_tail (queue->queue);
239 head = g_queue_peek_head (queue->queue);
241 if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
244 tail_ts = gst_rtp_buffer_get_timestamp (tail);
245 head_ts = gst_rtp_buffer_get_timestamp (head);
247 ret = calculate_ts_diff (head_ts, tail_ts);
249 /* let's add the duration of the tail buffer */
250 ret += queue->tail_buffer_duration;
256 * async_jitter_queue_unref_and_unlock:
257 * @queue: a #AsyncJitterQueue.
259 * Decreases the reference count of the asynchronous @queue by 1 and
260 * releases the lock. This function must be called while holding the
261 * @queue's lock. If the reference count went to 0, the @queue will be
262 * destroyed and the memory allocated will be freed.
265 async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
267 g_return_if_fail (queue);
268 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
270 g_mutex_unlock (queue->mutex);
271 async_jitter_queue_unref (queue);
275 * async_jitter_queue_unref:
276 * @queue: a #AsyncJitterQueue.
278 * Decreases the reference count of the asynchronous @queue by 1. If
279 * the reference count went to 0, the @queue will be destroyed and the
280 * memory allocated will be freed. So you are not allowed to use the
281 * @queue afterwards, as it might have disappeared. You do not need to
282 * hold the lock to call this function.
285 async_jitter_queue_unref (AsyncJitterQueue * queue)
287 g_return_if_fail (queue);
288 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
290 if (g_atomic_int_dec_and_test (&queue->ref_count)) {
291 g_return_if_fail (queue->waiting_threads == 0);
292 g_mutex_free (queue->mutex);
294 g_cond_free (queue->cond);
295 g_queue_free (queue->queue);
301 * async_jitter_queue_lock:
302 * @queue: a #AsyncJitterQueue.
304 * Acquires the @queue's lock. After that you can only call the
305 * <function>async_jitter_queue_*_unlocked()</function> function variants on that
306 * @queue. Otherwise it will deadlock.
309 async_jitter_queue_lock (AsyncJitterQueue * queue)
311 g_return_if_fail (queue);
312 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
314 g_mutex_lock (queue->mutex);
318 * async_jitter_queue_unlock:
319 * @queue: a #AsyncJitterQueue.
321 * Releases the queue's lock.
324 async_jitter_queue_unlock (AsyncJitterQueue * queue)
326 g_return_if_fail (queue);
327 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
329 g_mutex_unlock (queue->mutex);
333 * async_jitter_queue_push:
334 * @queue: a #AsyncJitterQueue.
335 * @data: @data to push into the @queue.
337 * Pushes the @data into the @queue. @data must not be %NULL.
340 async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
342 g_return_if_fail (queue);
343 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
344 g_return_if_fail (data);
346 g_mutex_lock (queue->mutex);
347 async_jitter_queue_push_unlocked (queue, data);
348 g_mutex_unlock (queue->mutex);
352 * async_jitter_queue_push_unlocked:
353 * @queue: a #AsyncJitterQueue.
354 * @data: @data to push into the @queue.
356 * Pushes the @data into the @queue. @data must not be %NULL. This
357 * function must be called while holding the @queue's lock.
360 async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
362 g_return_if_fail (queue);
363 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
364 g_return_if_fail (data);
366 g_queue_push_head (queue->queue, data);
368 signal_waiting_threads (queue);
372 * async_jitter_queue_push_sorted:
373 * @queue: a #AsyncJitterQueue
374 * @data: the @data to push into the @queue
375 * @func: the #GCompareDataFunc is used to sort @queue. This function
376 * is passed two elements of the @queue. The function should return
377 * 0 if they are equal, a negative value if the first element
378 * should be higher in the @queue or a positive value if the first
379 * element should be lower in the @queue than the second element.
380 * @user_data: user data passed to @func.
382 * Inserts @data into @queue using @func to determine the new
385 * This function requires that the @queue is sorted before pushing on
388 * This function will lock @queue before it sorts the queue and unlock
389 * it when it is finished.
391 * For an example of @func see async_jitter_queue_sort().
396 async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
397 gpointer data, GCompareDataFunc func, gpointer user_data)
399 g_return_val_if_fail (queue != NULL, FALSE);
402 g_mutex_lock (queue->mutex);
403 ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
404 g_mutex_unlock (queue->mutex);
410 * async_jitter_queue_push_sorted_unlocked:
411 * @queue: a #AsyncJitterQueue
412 * @data: the @data to push into the @queue
413 * @func: the #GCompareDataFunc is used to sort @queue. This function
414 * is passed two elements of the @queue. The function should return
415 * 0 if they are equal, a negative value if the first element
416 * should be higher in the @queue or a positive value if the first
417 * element should be lower in the @queue than the second element.
418 * @user_data: user data passed to @func.
420 * Inserts @data into @queue using @func to determine the new
423 * This function requires that the @queue is sorted before pushing on
426 * If @GCompareDataFunc returns 0, this function does not insert @data and
429 * This function is called while holding the @queue's lock.
431 * For an example of @func see async_jitter_queue_sort().
436 async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
437 gpointer data, GCompareDataFunc func, gpointer user_data)
440 gint func_ret = TRUE;
442 g_return_val_if_fail (queue != NULL, FALSE);
444 list = queue->queue->head;
445 while (list && (func_ret = func (list->data, data, user_data)) < 0)
452 g_queue_insert_before (queue->queue, list, data);
454 g_queue_push_tail (queue->queue, data);
457 signal_waiting_threads (queue);
462 async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
463 GList * sibling, gpointer data)
465 g_return_if_fail (queue != NULL);
467 g_queue_insert_before (queue->queue, sibling, data);
469 signal_waiting_threads (queue);
473 async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
476 GstBuffer *tail_buffer = NULL;
479 if (queue->pop_flushing)
482 while (queue->pop_blocking) {
483 queue->waiting_threads++;
484 g_cond_wait (queue->cond, queue->mutex);
485 queue->waiting_threads--;
486 if (queue->pop_flushing)
491 tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
493 GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
494 (int) (queue->low_threshold * queue->max_queue_length));
496 if (tsunits <= queue->low_threshold * queue->max_queue_length
497 && queue->pops_remaining == 0) {
498 if (!queue->buffering) {
499 GST_DEBUG ("start buffering");
500 queue->buffering = TRUE;
501 queue->pops_remaining = queue->queue->length;
504 GST_DEBUG ("wait for data");
505 while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
506 queue->waiting_threads++;
507 g_cond_wait (queue->cond, queue->mutex);
508 queue->waiting_threads--;
509 if (queue->pop_flushing)
514 retval = g_queue_pop_tail (queue->queue);
515 if (queue->pops_remaining)
516 queue->pops_remaining--;
518 tail_buffer = g_queue_peek_tail (queue->queue);
520 if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
521 queue->tail_buffer_duration = 0;
522 } else if (gst_rtp_buffer_get_seq (tail_buffer)
523 - gst_rtp_buffer_get_seq (retval) == 1) {
524 queue->tail_buffer_duration =
525 calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
526 gst_rtp_buffer_get_timestamp (retval));
528 /* There is a sequence number gap -> we can't calculate the duration
529 * let's just set it to 0 */
530 queue->tail_buffer_duration = 0;
540 * async_jitter_queue_pop:
541 * @queue: a #AsyncJitterQueue.
543 * Pops data from the @queue. This function blocks until data become
544 * available. If pop is disabled, tis function return NULL.
546 * Return value: data from the queue.
549 async_jitter_queue_pop (AsyncJitterQueue * queue)
553 g_return_val_if_fail (queue, NULL);
554 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
556 g_mutex_lock (queue->mutex);
557 retval = async_jitter_queue_pop_intern_unlocked (queue);
558 g_mutex_unlock (queue->mutex);
564 * async_jitter_queue_pop_unlocked:
565 * @queue: a #AsyncJitterQueue.
567 * Pops data from the @queue. This function blocks until data become
568 * available. This function must be called while holding the @queue's
571 * Return value: data from the queue.
574 async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
576 g_return_val_if_fail (queue, NULL);
577 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
579 return async_jitter_queue_pop_intern_unlocked (queue);
583 * async_jitter_queue_length:
584 * @queue: a #AsyncJitterQueue.
586 * Returns the length of the queue
587 * Return value: the length of the @queue.
590 async_jitter_queue_length (AsyncJitterQueue * queue)
594 g_return_val_if_fail (queue, 0);
595 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
597 g_mutex_lock (queue->mutex);
598 retval = queue->queue->length;
599 g_mutex_unlock (queue->mutex);
605 * async_jitter_queue_length_unlocked:
606 * @queue: a #AsyncJitterQueue.
608 * Returns the length of the queue.
610 * Return value: the length of the @queue.
613 async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
615 g_return_val_if_fail (queue, 0);
616 g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
618 return queue->queue->length;
622 * async_jitter_queue_set_flushing_unlocked:
623 * @queue: a #AsyncJitterQueue.
624 * @free_func: a function to call to free the elements
625 * @user_data: user data passed to @free_func
627 * This function is used to set/unset flushing. If flushing is set any
628 * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
629 * return NULL. Flushing is set by default.
632 async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
633 GFunc free_func, gpointer user_data)
635 g_return_if_fail (queue);
636 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
638 queue->pop_flushing = TRUE;
639 /* let's unblock any remaining pops */
640 if (queue->waiting_threads > 0)
641 g_cond_broadcast (queue->cond);
642 /* free data from queue */
643 g_queue_foreach (queue->queue, free_func, user_data);
647 * async_jitter_queue_unset_flushing_unlocked:
648 * @queue: a #AsyncJitterQueue.
649 * @free_func: a function to call to free the elements
650 * @user_data: user data passed to @free_func
652 * This function is used to set/unset flushing. If flushing is set any
653 * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
654 * return NULL. Flushing is set by default.
657 async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
659 g_return_if_fail (queue);
660 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
662 queue->pop_flushing = FALSE;
663 /* let's unblock any remaining pops */
664 if (queue->waiting_threads > 0)
665 g_cond_broadcast (queue->cond);
669 * async_jitter_queue_set_blocking_unlocked:
670 * @queue: a #AsyncJitterQueue.
671 * @enabled: a boolean to enable/disable blocking
673 * This function is used to enable/disable blocking. If blocking is enabled any
674 * pops will be blocked until the queue is unblocked. The queue is blocked by
678 async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
681 g_return_if_fail (queue);
682 g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
684 queue->pop_blocking = blocking;
685 /* let's unblock any remaining pops */
686 if (queue->waiting_threads > 0)
687 g_cond_broadcast (queue->cond);