ba14d98e2f6f1f3b9176d3e9c5a7c8b3d88f2dc3
[platform/upstream/gstreamer.git] / gst / rtpmanager / async_jitter_queue.c
1 /*
2  * Async Jitter Queue based on g_async_queue
3  * This code is GST RTP smart and deals with timestamps
4  *
5  * Farsight Voice+Video library
6  *  Copyright 2007 Collabora Ltd,
7  *  Copyright 2007 Nokia Corporation
8  *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
9  *
10  *   This is an async queue that has a buffering mecanism based on the set low
11  *   and high threshold. When the lower threshold is reached, the queue will
12  *   fill itself up until the higher threshold is reached before allowing any
13  *   pops to occur. This allows a jitterbuffer of at least min threshold items
14  *   to be available.
15  */
16
17 /* GLIB - Library of useful routines for C programming
18  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
19  *
20  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
21  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
22  *
23  * This library is free software; you can redistribute it and/or
24  * modify it under the terms of the GNU Lesser General Public
25  * License as published by the Free Software Foundation; either
26  * version 2 of the License, or (at your option) any later version.
27  *
28  * This library is distributed in the hope that it will be useful,
29  * but WITHOUT ANY WARRANTY; without even the implied warranty of
30  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
31  * Lesser General Public License for more details.
32  *
33  * You should have received a copy of the GNU Lesser General Public
34  * License along with this library; if not, write to the
35  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
36  * Boston, MA 02111-1307, USA.
37  */
38
39 /*
40  * MT safe
41  */
42
43 #include "config.h"
44
45 #include "async_jitter_queue.h"
46
47 #include <gst/gst.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49
50 #define DEFAULT_LOW_THRESHOLD 0.1
51 #define DEFAULT_HIGH_THRESHOLD 0.9
52
53 struct _AsyncJitterQueue
54 {
55   GMutex *mutex;
56   GCond *cond;
57   GQueue *queue;
58   guint waiting_threads;
59   gint32 ref_count;
60   gfloat low_threshold;
61   gfloat high_threshold;
62   guint32 max_queue_length;
63   gboolean buffering;
64   gboolean pop_flushing;
65   gboolean pop_blocking;
66   guint pops_remaining;
67   guint32 tail_buffer_duration;
68 };
69
70 /**
71  * async_jitter_queue_new:
72  *
73  * Creates a new asynchronous queue with the initial reference count of 1.
74  *
75  * Return value: the new #AsyncJitterQueue.
76  **/
77 AsyncJitterQueue *
78 async_jitter_queue_new (void)
79 {
80   AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
81
82   retval->mutex = g_mutex_new ();
83   retval->cond = g_cond_new ();
84   retval->queue = g_queue_new ();
85   retval->waiting_threads = 0;
86   retval->ref_count = 1;
87   retval->low_threshold = DEFAULT_LOW_THRESHOLD;
88   retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
89   retval->buffering = TRUE;     /* we need to buffer initially */
90   retval->pop_flushing = TRUE;
91   retval->pop_blocking = TRUE;
92   retval->pops_remaining = 0;
93   retval->tail_buffer_duration = 0;
94   return retval;
95 }
96
97 /* checks buffering state and wakes up waiting pops */
98 void
99 signal_waiting_threads (AsyncJitterQueue * queue)
100 {
101   if (async_jitter_queue_length_ts_units_unlocked (queue) >=
102       queue->high_threshold * queue->max_queue_length) {
103     GST_DEBUG ("stop buffering");
104     queue->buffering = FALSE;
105   }
106
107   if (queue->waiting_threads > 0) {
108     if (!queue->buffering) {
109       g_cond_signal (queue->cond);
110     }
111   }
112 }
113
114 /**
115  * async_jitter_queue_ref:
116  * @queue: a #AsyncJitterQueue.
117  *
118  * Increases the reference count of the asynchronous @queue by 1. You
119  * do not need to hold the lock to call this function.
120  *
121  * Returns: the @queue that was passed in (since 2.6)
122  **/
123 AsyncJitterQueue *
124 async_jitter_queue_ref (AsyncJitterQueue * queue)
125 {
126   g_return_val_if_fail (queue, NULL);
127   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
128
129   g_atomic_int_inc (&queue->ref_count);
130
131   return queue;
132 }
133
134 /**
135  * async_jitter_queue_ref_unlocked:
136  * @queue: a #AsyncJitterQueue.
137  *
138  * Increases the reference count of the asynchronous @queue by 1.
139  **/
140 void
141 async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
142 {
143   g_return_if_fail (queue);
144   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
145
146   g_atomic_int_inc (&queue->ref_count);
147 }
148
149 /**
150  * async_jitter_queue_set_low_threshold:
151  * @queue: a #AsyncJitterQueue.
152  * @threshold: the lower threshold (fraction of max size)
153  *
154  * Sets the low threshold on the queue. This threshold indicates the minimum
155  * number of items allowed in the queue before we refill it up to the set
156  * maximum threshold.
157  **/
158 void
159 async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
160     gfloat threshold)
161 {
162   g_return_if_fail (queue);
163   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
164
165   queue->low_threshold = threshold;
166 }
167
168 /**
169  * async_jitter_queue_set_max_threshold:
170  * @queue: a #AsyncJitterQueue.
171  * @threshold: the higher threshold (fraction of max size)
172  *
173  * Sets the high threshold on the queue. This threshold indicates the amount of
174  * items to fill in the queue before releasing any blocking pop calls. This
175  * blocking mecanism is only triggered when we reach the low threshold and must
176  * refill the queue.
177  **/
178 void
179 async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
180     gfloat threshold)
181 {
182   g_return_if_fail (queue);
183   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
184
185   queue->high_threshold = threshold;
186 }
187
188 /* set the maximum queue length in RTP timestamp units */
189 void
190 async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
191     guint32 max_length)
192 {
193   g_return_if_fail (queue);
194   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
195
196   queue->max_queue_length = max_length;
197 }
198
199 GQueue *
200 async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
201 {
202   g_return_val_if_fail (queue, NULL);
203
204   return queue->queue;
205 }
206
207 static guint32
208 calculate_ts_diff (guint32 high_ts, guint32 low_ts)
209 {
210   /* it needs to work if ts wraps */
211   if (high_ts >= low_ts) {
212     return high_ts - low_ts;
213   } else {
214     return high_ts + G_MAXUINT32 + 1 - low_ts;
215   }
216 }
217
218 /* this function returns the length of the queue in timestamp units. It will
219  * also add the duration of the last buffer in the queue */
220 /* FIXME This function wrongly assumes that there are no missing packets inside
221  * the buffer, in reality it needs to check for gaps and subsctract those from
222  * the total */
223 guint32
224 async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
225 {
226   guint32 tail_ts;
227   guint32 head_ts;
228   guint32 ret;
229   GstBuffer *head;
230   GstBuffer *tail;
231
232   g_return_val_if_fail (queue, 0);
233
234   if (queue->queue->length < 2) {
235     return 0;
236   }
237
238   tail = g_queue_peek_tail (queue->queue);
239   head = g_queue_peek_head (queue->queue);
240
241   if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
242     return 0;
243
244   tail_ts = gst_rtp_buffer_get_timestamp (tail);
245   head_ts = gst_rtp_buffer_get_timestamp (head);
246
247   ret = calculate_ts_diff (head_ts, tail_ts);
248
249   /* let's add the duration of the tail buffer */
250   ret += queue->tail_buffer_duration;
251
252   return ret;
253 }
254
255 /**
256  * async_jitter_queue_unref_and_unlock:
257  * @queue: a #AsyncJitterQueue.
258  *
259  * Decreases the reference count of the asynchronous @queue by 1 and
260  * releases the lock. This function must be called while holding the
261  * @queue's lock. If the reference count went to 0, the @queue will be
262  * destroyed and the memory allocated will be freed.
263  **/
264 void
265 async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
266 {
267   g_return_if_fail (queue);
268   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
269
270   g_mutex_unlock (queue->mutex);
271   async_jitter_queue_unref (queue);
272 }
273
274 /**
275  * async_jitter_queue_unref:
276  * @queue: a #AsyncJitterQueue.
277  *
278  * Decreases the reference count of the asynchronous @queue by 1. If
279  * the reference count went to 0, the @queue will be destroyed and the
280  * memory allocated will be freed. So you are not allowed to use the
281  * @queue afterwards, as it might have disappeared. You do not need to
282  * hold the lock to call this function.
283  **/
284 void
285 async_jitter_queue_unref (AsyncJitterQueue * queue)
286 {
287   g_return_if_fail (queue);
288   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
289
290   if (g_atomic_int_dec_and_test (&queue->ref_count)) {
291     g_return_if_fail (queue->waiting_threads == 0);
292     g_mutex_free (queue->mutex);
293     if (queue->cond)
294       g_cond_free (queue->cond);
295     g_queue_free (queue->queue);
296     g_free (queue);
297   }
298 }
299
300 /**
301  * async_jitter_queue_lock:
302  * @queue: a #AsyncJitterQueue.
303  *
304  * Acquires the @queue's lock. After that you can only call the
305  * <function>async_jitter_queue_*_unlocked()</function> function variants on that
306  * @queue. Otherwise it will deadlock.
307  **/
308 void
309 async_jitter_queue_lock (AsyncJitterQueue * queue)
310 {
311   g_return_if_fail (queue);
312   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
313
314   g_mutex_lock (queue->mutex);
315 }
316
317 /**
318  * async_jitter_queue_unlock:
319  * @queue: a #AsyncJitterQueue.
320  *
321  * Releases the queue's lock.
322  **/
323 void
324 async_jitter_queue_unlock (AsyncJitterQueue * queue)
325 {
326   g_return_if_fail (queue);
327   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
328
329   g_mutex_unlock (queue->mutex);
330 }
331
332 /**
333  * async_jitter_queue_push:
334  * @queue: a #AsyncJitterQueue.
335  * @data: @data to push into the @queue.
336  *
337  * Pushes the @data into the @queue. @data must not be %NULL.
338  **/
339 void
340 async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
341 {
342   g_return_if_fail (queue);
343   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
344   g_return_if_fail (data);
345
346   g_mutex_lock (queue->mutex);
347   async_jitter_queue_push_unlocked (queue, data);
348   g_mutex_unlock (queue->mutex);
349 }
350
351 /**
352  * async_jitter_queue_push_unlocked:
353  * @queue: a #AsyncJitterQueue.
354  * @data: @data to push into the @queue.
355  *
356  * Pushes the @data into the @queue. @data must not be %NULL. This
357  * function must be called while holding the @queue's lock.
358  **/
359 void
360 async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
361 {
362   g_return_if_fail (queue);
363   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
364   g_return_if_fail (data);
365
366   g_queue_push_head (queue->queue, data);
367
368   signal_waiting_threads (queue);
369 }
370
371 /**
372  * async_jitter_queue_push_sorted:
373  * @queue: a #AsyncJitterQueue
374  * @data: the @data to push into the @queue
375  * @func: the #GCompareDataFunc is used to sort @queue. This function
376  *     is passed two elements of the @queue. The function should return
377  *     0 if they are equal, a negative value if the first element
378  *     should be higher in the @queue or a positive value if the first
379  *     element should be lower in the @queue than the second element.
380  * @user_data: user data passed to @func.
381  *
382  * Inserts @data into @queue using @func to determine the new
383  * position.
384  *
385  * This function requires that the @queue is sorted before pushing on
386  * new elements.
387  *
388  * This function will lock @queue before it sorts the queue and unlock
389  * it when it is finished.
390  *
391  * For an example of @func see async_jitter_queue_sort().
392  *
393  * Since: 2.10
394  **/
395 gboolean
396 async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
397     gpointer data, GCompareDataFunc func, gpointer user_data)
398 {
399   g_return_val_if_fail (queue != NULL, FALSE);
400   gboolean ret;
401
402   g_mutex_lock (queue->mutex);
403   ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
404   g_mutex_unlock (queue->mutex);
405
406   return ret;
407 }
408
409 /**
410  * async_jitter_queue_push_sorted_unlocked:
411  * @queue: a #AsyncJitterQueue
412  * @data: the @data to push into the @queue
413  * @func: the #GCompareDataFunc is used to sort @queue. This function
414  *     is passed two elements of the @queue. The function should return
415  *     0 if they are equal, a negative value if the first element
416  *     should be higher in the @queue or a positive value if the first
417  *     element should be lower in the @queue than the second element.
418  * @user_data: user data passed to @func.
419  *
420  * Inserts @data into @queue using @func to determine the new
421  * position.
422  *
423  * This function requires that the @queue is sorted before pushing on
424  * new elements.
425  *
426  * If @GCompareDataFunc returns 0, this function does not insert @data and
427  * return FALSE.
428  *
429  * This function is called while holding the @queue's lock.
430  *
431  * For an example of @func see async_jitter_queue_sort().
432  *
433  * Since: 2.10
434  **/
435 gboolean
436 async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
437     gpointer data, GCompareDataFunc func, gpointer user_data)
438 {
439   GList *list;
440   gint func_ret = TRUE;
441
442   g_return_val_if_fail (queue != NULL, FALSE);
443
444   list = queue->queue->head;
445   while (list && (func_ret = func (list->data, data, user_data)) < 0)
446     list = list->next;
447
448   if (func_ret == 0) {
449     return FALSE;
450   }
451   if (list) {
452     g_queue_insert_before (queue->queue, list, data);
453   } else {
454     g_queue_push_tail (queue->queue, data);
455   }
456
457   signal_waiting_threads (queue);
458   return TRUE;
459 }
460
461 void
462 async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
463     GList * sibling, gpointer data)
464 {
465   g_return_if_fail (queue != NULL);
466
467   g_queue_insert_before (queue->queue, sibling, data);
468
469   signal_waiting_threads (queue);
470 }
471
472 static gpointer
473 async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
474 {
475   gpointer retval;
476   GstBuffer *tail_buffer = NULL;
477   guint tsunits;
478
479   if (queue->pop_flushing)
480     return NULL;
481
482   while (queue->pop_blocking) {
483     queue->waiting_threads++;
484     g_cond_wait (queue->cond, queue->mutex);
485     queue->waiting_threads--;
486     if (queue->pop_flushing)
487       return NULL;
488   }
489
490
491   tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
492
493   GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
494       (int) (queue->low_threshold * queue->max_queue_length));
495
496   if (tsunits <= queue->low_threshold * queue->max_queue_length
497       && queue->pops_remaining == 0) {
498     if (!queue->buffering) {
499       GST_DEBUG ("start buffering");
500       queue->buffering = TRUE;
501       queue->pops_remaining = queue->queue->length;
502     }
503
504     GST_DEBUG ("wait for data");
505     while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
506       queue->waiting_threads++;
507       g_cond_wait (queue->cond, queue->mutex);
508       queue->waiting_threads--;
509       if (queue->pop_flushing)
510         return NULL;
511     }
512   }
513
514   retval = g_queue_pop_tail (queue->queue);
515   if (queue->pops_remaining)
516     queue->pops_remaining--;
517
518   tail_buffer = g_queue_peek_tail (queue->queue);
519   if (tail_buffer) {
520     if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
521       queue->tail_buffer_duration = 0;
522     } else if (gst_rtp_buffer_get_seq (tail_buffer)
523         - gst_rtp_buffer_get_seq (retval) == 1) {
524       queue->tail_buffer_duration =
525           calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
526           gst_rtp_buffer_get_timestamp (retval));
527     } else {
528       /* There is a sequence number gap -> we can't calculate the duration
529        * let's just set it to 0 */
530       queue->tail_buffer_duration = 0;
531     }
532   }
533
534   g_assert (retval);
535
536   return retval;
537 }
538
539 /**
540  * async_jitter_queue_pop:
541  * @queue: a #AsyncJitterQueue.
542  *
543  * Pops data from the @queue. This function blocks until data become
544  * available. If pop is disabled, tis function return NULL.
545  *
546  * Return value: data from the queue.
547  **/
548 gpointer
549 async_jitter_queue_pop (AsyncJitterQueue * queue)
550 {
551   gpointer retval;
552
553   g_return_val_if_fail (queue, NULL);
554   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
555
556   g_mutex_lock (queue->mutex);
557   retval = async_jitter_queue_pop_intern_unlocked (queue);
558   g_mutex_unlock (queue->mutex);
559
560   return retval;
561 }
562
563 /**
564  * async_jitter_queue_pop_unlocked:
565  * @queue: a #AsyncJitterQueue.
566  *
567  * Pops data from the @queue. This function blocks until data become
568  * available. This function must be called while holding the @queue's
569  * lock.
570  *
571  * Return value: data from the queue.
572  **/
573 gpointer
574 async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
575 {
576   g_return_val_if_fail (queue, NULL);
577   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
578
579   return async_jitter_queue_pop_intern_unlocked (queue);
580 }
581
582 /**
583  * async_jitter_queue_length:
584  * @queue: a #AsyncJitterQueue.
585  *
586  * Returns the length of the queue
587  * Return value: the length of the @queue.
588  **/
589 gint
590 async_jitter_queue_length (AsyncJitterQueue * queue)
591 {
592   gint retval;
593
594   g_return_val_if_fail (queue, 0);
595   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
596
597   g_mutex_lock (queue->mutex);
598   retval = queue->queue->length;
599   g_mutex_unlock (queue->mutex);
600
601   return retval;
602 }
603
604 /**
605  * async_jitter_queue_length_unlocked:
606  * @queue: a #AsyncJitterQueue.
607  *
608  * Returns the length of the queue.
609  *
610  * Return value: the length of the @queue.
611  **/
612 gint
613 async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
614 {
615   g_return_val_if_fail (queue, 0);
616   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
617
618   return queue->queue->length;
619 }
620
621 /**
622  * async_jitter_queue_set_flushing_unlocked:
623  * @queue: a #AsyncJitterQueue.
624  * @free_func: a function to call to free the elements
625  * @user_data: user data passed to @free_func
626  *
627  * This function is used to set/unset flushing. If flushing is set any
628  * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
629  * return NULL. Flushing is set by default.
630  */
631 void
632 async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
633     GFunc free_func, gpointer user_data)
634 {
635   g_return_if_fail (queue);
636   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
637
638   queue->pop_flushing = TRUE;
639   /* let's unblock any remaining pops */
640   if (queue->waiting_threads > 0)
641     g_cond_broadcast (queue->cond);
642   /* free data from queue */
643   g_queue_foreach (queue->queue, free_func, user_data);
644 }
645
646 /**
647  * async_jitter_queue_unset_flushing_unlocked:
648  * @queue: a #AsyncJitterQueue.
649  * @free_func: a function to call to free the elements
650  * @user_data: user data passed to @free_func
651  *
652  * This function is used to set/unset flushing. If flushing is set any
653  * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
654  * return NULL. Flushing is set by default.
655  */
656 void
657 async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
658 {
659   g_return_if_fail (queue);
660   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
661
662   queue->pop_flushing = FALSE;
663   /* let's unblock any remaining pops */
664   if (queue->waiting_threads > 0)
665     g_cond_broadcast (queue->cond);
666 }
667
668 /**
669  * async_jitter_queue_set_blocking_unlocked:
670  * @queue: a #AsyncJitterQueue.
671  * @enabled: a boolean to enable/disable blocking
672  *
673  * This function is used to enable/disable blocking. If blocking is enabled any
674  * pops will be blocked until the queue is unblocked. The queue is blocked by
675  * default.
676  */
677 void
678 async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
679     gboolean blocking)
680 {
681   g_return_if_fail (queue);
682   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
683
684   queue->pop_blocking = blocking;
685   /* let's unblock any remaining pops */
686   if (queue->waiting_threads > 0)
687     g_cond_broadcast (queue->cond);
688 }