22a8ed0e1010fd12cbc78ee5c7d19b4c60e5e911
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / async_jitter_queue.c
1 /* 
2  * Async Jitter Queue based on g_async_queue
3  * This code is GST RTP smart and deals with timestamps
4  *
5  * Farsight Voice+Video library
6  *  Copyright 2007 Collabora Ltd, 
7  *  Copyright 2007 Nokia Corporation
8  *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
9  *
10  *   This is an async queue that has a buffering mecanism based on the set low
11  *   and high threshold. When the lower threshold is reached, the queue will
12  *   fill itself up until the higher threshold is reached before allowing any
13  *   pops to occur. This allows a jitterbuffer of at least min threshold items
14  *   to be available.
15  */
16
17 /* GLIB - Library of useful routines for C programming
18  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
19  *
20  * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
21  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
22  *
23  * This library is free software; you can redistribute it and/or
24  * modify it under the terms of the GNU Lesser General Public
25  * License as published by the Free Software Foundation; either
26  * version 2 of the License, or (at your option) any later version.
27  *
28  * This library is distributed in the hope that it will be useful,
29  * but WITHOUT ANY WARRANTY; without even the implied warranty of
30  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
31  * Lesser General Public License for more details.
32  *
33  * You should have received a copy of the GNU Lesser General Public
34  * License along with this library; if not, write to the
35  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
36  * Boston, MA 02111-1307, USA.
37  */
38
39 /*
40  * MT safe
41  */
42
43 #include "config.h"
44
45 #include "async_jitter_queue.h"
46
47 #include <gst/gst.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49
50 #define DEFAULT_LOW_THRESHOLD 0.1
51 #define DEFAULT_HIGH_THRESHOLD 0.9
52
53 struct _AsyncJitterQueue
54 {
55   GMutex *mutex;
56   GCond *cond;
57   GQueue *queue;
58   guint waiting_threads;
59   gint32 ref_count;
60   gfloat low_threshold;
61   gfloat high_threshold;
62   guint32 max_queue_length;
63   gboolean buffering;
64   gboolean pop_flushing;
65   gboolean pop_blocking;
66   guint pops_remaining;
67   guint32 tail_buffer_duration;
68 };
69
70 /**
71  * async_jitter_queue_new:
72  * 
73  * Creates a new asynchronous queue with the initial reference count of 1.
74  * 
75  * Return value: the new #AsyncJitterQueue.
76  **/
77 AsyncJitterQueue *
78 async_jitter_queue_new (void)
79 {
80   AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
81
82   retval->mutex = g_mutex_new ();
83   retval->cond = g_cond_new ();
84   retval->queue = g_queue_new ();
85   retval->waiting_threads = 0;
86   retval->ref_count = 1;
87   retval->low_threshold = DEFAULT_LOW_THRESHOLD;
88   retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
89   retval->buffering = TRUE;     /* we need to buffer initially */
90   retval->pop_flushing = TRUE;
91   retval->pop_blocking = TRUE;
92   retval->pops_remaining = 0;
93   retval->tail_buffer_duration = 0;
94   return retval;
95 }
96
97 /* checks buffering state and wakes up waiting pops */
98 void
99 signal_waiting_threads (AsyncJitterQueue * queue)
100 {
101   if (async_jitter_queue_length_ts_units_unlocked (queue) >=
102       queue->high_threshold * queue->max_queue_length) {
103     queue->buffering = FALSE;
104   }
105
106   if (queue->waiting_threads > 0) {
107     if (!queue->buffering) {
108       g_cond_signal (queue->cond);
109     }
110   }
111 }
112
113 /**
114  * async_jitter_queue_ref:
115  * @queue: a #AsyncJitterQueue.
116  *
117  * Increases the reference count of the asynchronous @queue by 1. You
118  * do not need to hold the lock to call this function.
119  *
120  * Returns: the @queue that was passed in (since 2.6)
121  **/
122 AsyncJitterQueue *
123 async_jitter_queue_ref (AsyncJitterQueue * queue)
124 {
125   g_return_val_if_fail (queue, NULL);
126   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
127
128   g_atomic_int_inc (&queue->ref_count);
129
130   return queue;
131 }
132
133 /**
134  * async_jitter_queue_ref_unlocked:
135  * @queue: a #AsyncJitterQueue.
136  * 
137  * Increases the reference count of the asynchronous @queue by 1.
138  **/
139 void
140 async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
141 {
142   g_return_if_fail (queue);
143   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
144
145   g_atomic_int_inc (&queue->ref_count);
146 }
147
148 /**
149  * async_jitter_queue_set_low_threshold:
150  * @queue: a #AsyncJitterQueue.
151  * @threshold: the lower threshold (fraction of max size)
152  * 
153  * Sets the low threshold on the queue. This threshold indicates the minimum
154  * number of items allowed in the queue before we refill it up to the set
155  * maximum threshold.
156  **/
157 void
158 async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
159     gfloat threshold)
160 {
161   g_return_if_fail (queue);
162   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
163
164   queue->low_threshold = threshold;
165 }
166
167 /**
168  * async_jitter_queue_set_max_threshold:
169  * @queue: a #AsyncJitterQueue.
170  * @threshold: the higher threshold (fraction of max size)
171  * 
172  * Sets the high threshold on the queue. This threshold indicates the amount of
173  * items to fill in the queue before releasing any blocking pop calls. This
174  * blocking mecanism is only triggered when we reach the low threshold and must
175  * refill the queue.
176  **/
177 void
178 async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
179     gfloat threshold)
180 {
181   g_return_if_fail (queue);
182   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
183
184   queue->high_threshold = threshold;
185 }
186
187 /* set the maximum queue length in RTP timestamp units */
188 void
189 async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
190     guint32 max_length)
191 {
192   g_return_if_fail (queue);
193   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
194
195   queue->max_queue_length = max_length;
196 }
197
198 GQueue *
199 async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
200 {
201   g_return_val_if_fail (queue, NULL);
202
203   return queue->queue;
204 }
205
206 static guint32
207 calculate_ts_diff (guint32 high_ts, guint32 low_ts)
208 {
209   /* it needs to work if ts wraps */
210   if (high_ts >= low_ts) {
211     return high_ts - low_ts;
212   } else {
213     return high_ts + G_MAXUINT32 + 1 - low_ts;
214   }
215 }
216
217 /* this function returns the length of the queue in timestamp units. It will
218  * also add the duration of the last buffer in the queue */
219 /* FIXME This function wrongly assumes that there are no missing packets inside
220  * the buffer, in reality it needs to check for gaps and subsctract those from
221  * the total */
222 guint32
223 async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
224 {
225   guint32 tail_ts;
226   guint32 head_ts;
227   guint32 ret;
228   GstBuffer *head;
229   GstBuffer *tail;
230
231   g_return_val_if_fail (queue, 0);
232
233   if (queue->queue->length < 2) {
234     return 0;
235   }
236
237   tail = g_queue_peek_tail (queue->queue);
238   head = g_queue_peek_head (queue->queue);
239
240   if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
241     return 0;
242
243   tail_ts = gst_rtp_buffer_get_timestamp (tail);
244   head_ts = gst_rtp_buffer_get_timestamp (head);
245
246   ret = calculate_ts_diff (head_ts, tail_ts);
247
248   /* let's add the duration of the tail buffer */
249   ret += queue->tail_buffer_duration;
250
251   return ret;
252 }
253
254 /**
255  * async_jitter_queue_unref_and_unlock:
256  * @queue: a #AsyncJitterQueue.
257  * 
258  * Decreases the reference count of the asynchronous @queue by 1 and
259  * releases the lock. This function must be called while holding the
260  * @queue's lock. If the reference count went to 0, the @queue will be
261  * destroyed and the memory allocated will be freed.
262  **/
263 void
264 async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
265 {
266   g_return_if_fail (queue);
267   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
268
269   g_mutex_unlock (queue->mutex);
270   async_jitter_queue_unref (queue);
271 }
272
273 /**
274  * async_jitter_queue_unref:
275  * @queue: a #AsyncJitterQueue.
276  * 
277  * Decreases the reference count of the asynchronous @queue by 1. If
278  * the reference count went to 0, the @queue will be destroyed and the
279  * memory allocated will be freed. So you are not allowed to use the
280  * @queue afterwards, as it might have disappeared. You do not need to
281  * hold the lock to call this function.
282  **/
283 void
284 async_jitter_queue_unref (AsyncJitterQueue * queue)
285 {
286   g_return_if_fail (queue);
287   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
288
289   if (g_atomic_int_dec_and_test (&queue->ref_count)) {
290     g_return_if_fail (queue->waiting_threads == 0);
291     g_mutex_free (queue->mutex);
292     if (queue->cond)
293       g_cond_free (queue->cond);
294     g_queue_free (queue->queue);
295     g_free (queue);
296   }
297 }
298
299 /**
300  * async_jitter_queue_lock:
301  * @queue: a #AsyncJitterQueue.
302  * 
303  * Acquires the @queue's lock. After that you can only call the
304  * <function>async_jitter_queue_*_unlocked()</function> function variants on that
305  * @queue. Otherwise it will deadlock.
306  **/
307 void
308 async_jitter_queue_lock (AsyncJitterQueue * queue)
309 {
310   g_return_if_fail (queue);
311   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
312
313   g_mutex_lock (queue->mutex);
314 }
315
316 /**
317  * async_jitter_queue_unlock:
318  * @queue: a #AsyncJitterQueue.
319  * 
320  * Releases the queue's lock.
321  **/
322 void
323 async_jitter_queue_unlock (AsyncJitterQueue * queue)
324 {
325   g_return_if_fail (queue);
326   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
327
328   g_mutex_unlock (queue->mutex);
329 }
330
331 /**
332  * async_jitter_queue_push:
333  * @queue: a #AsyncJitterQueue.
334  * @data: @data to push into the @queue.
335  *
336  * Pushes the @data into the @queue. @data must not be %NULL.
337  **/
338 void
339 async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
340 {
341   g_return_if_fail (queue);
342   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
343   g_return_if_fail (data);
344
345   g_mutex_lock (queue->mutex);
346   async_jitter_queue_push_unlocked (queue, data);
347   g_mutex_unlock (queue->mutex);
348 }
349
350 /**
351  * async_jitter_queue_push_unlocked:
352  * @queue: a #AsyncJitterQueue.
353  * @data: @data to push into the @queue.
354  * 
355  * Pushes the @data into the @queue. @data must not be %NULL. This
356  * function must be called while holding the @queue's lock.
357  **/
358 void
359 async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
360 {
361   g_return_if_fail (queue);
362   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
363   g_return_if_fail (data);
364
365   g_queue_push_head (queue->queue, data);
366
367   signal_waiting_threads (queue);
368 }
369
370 /**
371  * async_jitter_queue_push_sorted:
372  * @queue: a #AsyncJitterQueue
373  * @data: the @data to push into the @queue
374  * @func: the #GCompareDataFunc is used to sort @queue. This function
375  *     is passed two elements of the @queue. The function should return
376  *     0 if they are equal, a negative value if the first element
377  *     should be higher in the @queue or a positive value if the first
378  *     element should be lower in the @queue than the second element.
379  * @user_data: user data passed to @func.
380  * 
381  * Inserts @data into @queue using @func to determine the new
382  * position. 
383  * 
384  * This function requires that the @queue is sorted before pushing on
385  * new elements.
386  * 
387  * This function will lock @queue before it sorts the queue and unlock
388  * it when it is finished.
389  * 
390  * For an example of @func see async_jitter_queue_sort(). 
391  *
392  * Since: 2.10
393  **/
394 gboolean
395 async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
396     gpointer data, GCompareDataFunc func, gpointer user_data)
397 {
398   g_return_val_if_fail (queue != NULL, FALSE);
399   gboolean ret;
400
401   g_mutex_lock (queue->mutex);
402   ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
403   g_mutex_unlock (queue->mutex);
404
405   return ret;
406 }
407
408 /**
409  * async_jitter_queue_push_sorted_unlocked:
410  * @queue: a #AsyncJitterQueue
411  * @data: the @data to push into the @queue
412  * @func: the #GCompareDataFunc is used to sort @queue. This function
413  *     is passed two elements of the @queue. The function should return
414  *     0 if they are equal, a negative value if the first element
415  *     should be higher in the @queue or a positive value if the first
416  *     element should be lower in the @queue than the second element.
417  * @user_data: user data passed to @func.
418  * 
419  * Inserts @data into @queue using @func to determine the new
420  * position.
421  * 
422  * This function requires that the @queue is sorted before pushing on
423  * new elements.
424  *
425  * If @GCompareDataFunc returns 0, this function does not insert @data and
426  * return FALSE.
427  * 
428  * This function is called while holding the @queue's lock.
429  * 
430  * For an example of @func see async_jitter_queue_sort(). 
431  *
432  * Since: 2.10
433  **/
434 gboolean
435 async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
436     gpointer data, GCompareDataFunc func, gpointer user_data)
437 {
438   GList *list;
439   gint func_ret = TRUE;
440
441   g_return_val_if_fail (queue != NULL, FALSE);
442
443   list = queue->queue->head;
444   while (list && (func_ret = func (list->data, data, user_data)) < 0)
445     list = list->next;
446
447   if (func_ret == 0) {
448     return FALSE;
449   }
450   if (list) {
451     g_queue_insert_before (queue->queue, list, data);
452   } else {
453     g_queue_push_tail (queue->queue, data);
454   }
455
456   signal_waiting_threads (queue);
457   return TRUE;
458 }
459
460 void
461 async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
462     GList * sibling, gpointer data)
463 {
464   g_return_if_fail (queue != NULL);
465
466   g_queue_insert_before (queue->queue, sibling, data);
467
468   signal_waiting_threads (queue);
469 }
470
471 static gpointer
472 async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
473 {
474   gpointer retval;
475   GstBuffer *tail_buffer = NULL;
476
477   if (queue->pop_flushing)
478     return NULL;
479
480   while (queue->pop_blocking) {
481     queue->waiting_threads++;
482     g_cond_wait (queue->cond, queue->mutex);
483     queue->waiting_threads--;
484     if (queue->pop_flushing)
485       return NULL;
486   }
487
488   if (async_jitter_queue_length_ts_units_unlocked (queue) <=
489       queue->low_threshold * queue->max_queue_length
490       && queue->pops_remaining == 0) {
491     if (!queue->buffering) {
492       queue->buffering = TRUE;
493       queue->pops_remaining = queue->queue->length;
494     } else {
495       while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
496         queue->waiting_threads++;
497         g_cond_wait (queue->cond, queue->mutex);
498         queue->waiting_threads--;
499         if (queue->pop_flushing)
500           return NULL;
501       }
502     }
503   }
504
505   retval = g_queue_pop_tail (queue->queue);
506   if (queue->pops_remaining)
507     queue->pops_remaining--;
508
509   tail_buffer = g_queue_peek_tail (queue->queue);
510   if (tail_buffer) {
511     if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
512       queue->tail_buffer_duration = 0;
513     } else if (gst_rtp_buffer_get_seq (tail_buffer)
514         - gst_rtp_buffer_get_seq (retval) == 1) {
515       queue->tail_buffer_duration =
516           calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
517           gst_rtp_buffer_get_timestamp (retval));
518     } else {
519       /* There is a sequence number gap -> we can't calculate the duration
520        * let's just set it to 0 */
521       queue->tail_buffer_duration = 0;
522     }
523   }
524
525   g_assert (retval);
526
527   return retval;
528 }
529
530 /**
531  * async_jitter_queue_pop:
532  * @queue: a #AsyncJitterQueue.
533  * 
534  * Pops data from the @queue. This function blocks until data become
535  * available. If pop is disabled, tis function return NULL.
536  *
537  * Return value: data from the queue.
538  **/
539 gpointer
540 async_jitter_queue_pop (AsyncJitterQueue * queue)
541 {
542   gpointer retval;
543
544   g_return_val_if_fail (queue, NULL);
545   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
546
547   g_mutex_lock (queue->mutex);
548   retval = async_jitter_queue_pop_intern_unlocked (queue);
549   g_mutex_unlock (queue->mutex);
550
551   return retval;
552 }
553
554 /**
555  * async_jitter_queue_pop_unlocked:
556  * @queue: a #AsyncJitterQueue.
557  * 
558  * Pops data from the @queue. This function blocks until data become
559  * available. This function must be called while holding the @queue's
560  * lock.
561  *
562  * Return value: data from the queue.
563  **/
564 gpointer
565 async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
566 {
567   g_return_val_if_fail (queue, NULL);
568   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
569
570   return async_jitter_queue_pop_intern_unlocked (queue);
571 }
572
573 /**
574  * async_jitter_queue_length:
575  * @queue: a #AsyncJitterQueue.
576  * 
577  * Returns the length of the queue
578  * Return value: the length of the @queue.
579  **/
580 gint
581 async_jitter_queue_length (AsyncJitterQueue * queue)
582 {
583   gint retval;
584
585   g_return_val_if_fail (queue, 0);
586   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
587
588   g_mutex_lock (queue->mutex);
589   retval = queue->queue->length;
590   g_mutex_unlock (queue->mutex);
591
592   return retval;
593 }
594
595 /**
596  * async_jitter_queue_length_unlocked:
597  * @queue: a #AsyncJitterQueue.
598  *
599  * Returns the length of the queue.
600  *
601  * Return value: the length of the @queue.
602  **/
603 gint
604 async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
605 {
606   g_return_val_if_fail (queue, 0);
607   g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
608
609   return queue->queue->length;
610 }
611
612 /**
613  * async_jitter_queue_set_flushing_unlocked:
614  * @queue: a #AsyncJitterQueue.
615  * @free_func: a function to call to free the elements
616  * @user_data: user data passed to @free_func
617  * 
618  * This function is used to set/unset flushing. If flushing is set any
619  * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
620  * return NULL. Flushing is set by default.
621  */
622 void
623 async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
624     GFunc free_func, gpointer user_data)
625 {
626   g_return_if_fail (queue);
627   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
628
629   queue->pop_flushing = TRUE;
630   /* let's unblock any remaining pops */
631   if (queue->waiting_threads > 0)
632     g_cond_broadcast (queue->cond);
633   /* free data from queue */
634   g_queue_foreach (queue->queue, free_func, user_data);
635 }
636
637 /**
638  * async_jitter_queue_unset_flushing_unlocked:
639  * @queue: a #AsyncJitterQueue.
640  * @free_func: a function to call to free the elements
641  * @user_data: user data passed to @free_func
642  * 
643  * This function is used to set/unset flushing. If flushing is set any
644  * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
645  * return NULL. Flushing is set by default.
646  */
647 void
648 async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
649 {
650   g_return_if_fail (queue);
651   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
652
653   queue->pop_flushing = FALSE;
654   /* let's unblock any remaining pops */
655   if (queue->waiting_threads > 0)
656     g_cond_broadcast (queue->cond);
657 }
658
659 /**
660  * async_jitter_queue_set_blocking_unlocked:
661  * @queue: a #AsyncJitterQueue.
662  * @enabled: a boolean to enable/disable blocking
663  * 
664  * This function is used to enable/disable blocking. If blocking is enabled any
665  * pops will be blocked until the queue is unblocked. The queue is blocked by
666  * default.
667  */
668 void
669 async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
670     gboolean blocking)
671 {
672   g_return_if_fail (queue);
673   g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
674
675   queue->pop_blocking = blocking;
676   /* let's unblock any remaining pops */
677   if (queue->waiting_threads > 0)
678     g_cond_broadcast (queue->cond);
679 }