gst/rtpmanager/: Remove complicated async queue and replace with more simple jitterbu...
authorWim Taymans <wim.taymans@gmail.com>
Fri, 10 Aug 2007 17:16:53 +0000 (17:16 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:28 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/Makefile.am:
* gst/rtpmanager/async_jitter_queue.c:
* gst/rtpmanager/async_jitter_queue.h:
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init),
(rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize),
(rtp_jitter_buffer_new), (compare_seqnum),
(rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop),
(rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets),
(rtp_jitter_buffer_get_ts_diff):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove complicated async queue and replace with more simple jitterbuffer
code while also fixing some bugs.
* gst/rtpmanager/gstrtpbin-marshal.list:
* gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision),
(on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout),
(create_session), (gst_rtp_bin_class_init), (create_recv_rtp),
(create_send_rtp):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose),
(gst_jitter_buffer_sink_parse_caps),
(gst_rtp_jitter_buffer_flush_start),
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_change_state),
(gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain),
(gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property):
* gst/rtpmanager/gstrtpsession.c: (on_new_ssrc),
(on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc),
(on_bye_timeout), (on_timeout), (gst_rtp_session_class_init),
(gst_rtp_session_init):
* gst/rtpmanager/gstrtpsession.h:
* gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup):
Use new jitterbuffer code.
Expose some new signals in preparation for handling EOS.

12 files changed:
gst/rtpmanager/Makefile.am
gst/rtpmanager/async_jitter_queue.c [deleted file]
gst/rtpmanager/async_jitter_queue.h [deleted file]
gst/rtpmanager/gstrtpbin-marshal.list
gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpsession.h
gst/rtpmanager/rtpjitterbuffer.c [new file with mode: 0644]
gst/rtpmanager/rtpjitterbuffer.h [new file with mode: 0644]
gst/rtpmanager/rtpsession.c

index 9e47cbd..cad27a3 100644 (file)
@@ -13,10 +13,10 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
 libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
                              gstrtpbin.c \
                              gstrtpclient.c \
-                             async_jitter_queue.c \
                              gstrtpjitterbuffer.c \
                              gstrtpptdemux.c \
                              gstrtpssrcdemux.c \
+                             rtpjitterbuffer.c      \
                              rtpsession.c      \
                              rtpsource.c      \
                              rtpstats.c      \
@@ -27,10 +27,10 @@ nodist_libgstrtpmanager_la_SOURCES = \
 
 noinst_HEADERS = gstrtpbin.h \
                 gstrtpclient.h \
-                 async_jitter_queue.h \
                 gstrtpjitterbuffer.h \
                  gstrtpptdemux.h \
                  gstrtpssrcdemux.h \
+                 rtpjitterbuffer.h \
                 rtpsession.h  \
                 rtpsource.h  \
                 rtpstats.h  \
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c
deleted file mode 100644 (file)
index 73597b2..0000000
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Async Jitter Queue based on g_async_queue
- * This code is GST RTP smart and deals with timestamps
- *
- * Farsight Voice+Video library
- *  Copyright 2007 Collabora Ltd,
- *  Copyright 2007 Nokia Corporation
- *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
- *
- *   This is an async queue that has a buffering mecanism based on the set low
- *   and high threshold. When the lower threshold is reached, the queue will
- *   fill itself up until the higher threshold is reached before allowing any
- *   pops to occur. This allows a jitterbuffer of at least min threshold items
- *   to be available.
- */
-
-/* GLIB - Library of useful routines for C programming
- * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
- *
- * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
- * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-/*
- * MT safe
- */
-
-#include "config.h"
-
-#include "async_jitter_queue.h"
-
-#include <gst/gst.h>
-#include <gst/rtp/gstrtpbuffer.h>
-
-#define DEFAULT_LOW_THRESHOLD 0.1
-#define DEFAULT_HIGH_THRESHOLD 0.9
-
-struct _AsyncJitterQueue
-{
-  GMutex *mutex;
-  GCond *cond;
-  GQueue *queue;
-  guint waiting_threads;
-  gint32 ref_count;
-  gfloat low_threshold;
-  gfloat high_threshold;
-  guint32 max_queue_length;
-  gboolean buffering;
-  gboolean pop_flushing;
-  gboolean pop_blocking;
-  guint pops_remaining;
-  guint32 tail_buffer_duration;
-};
-
-/**
- * async_jitter_queue_new:
- *
- * Creates a new asynchronous queue with the initial reference count of 1.
- *
- * Return value: the new #AsyncJitterQueue.
- **/
-AsyncJitterQueue *
-async_jitter_queue_new (void)
-{
-  AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
-
-  retval->mutex = g_mutex_new ();
-  retval->cond = g_cond_new ();
-  retval->queue = g_queue_new ();
-  retval->waiting_threads = 0;
-  retval->ref_count = 1;
-  retval->low_threshold = DEFAULT_LOW_THRESHOLD;
-  retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
-  retval->buffering = TRUE;     /* we need to buffer initially */
-  retval->pop_flushing = TRUE;
-  retval->pop_blocking = TRUE;
-  retval->pops_remaining = 0;
-  retval->tail_buffer_duration = 0;
-  return retval;
-}
-
-/* checks buffering state and wakes up waiting pops */
-void
-signal_waiting_threads (AsyncJitterQueue * queue)
-{
-  if (async_jitter_queue_length_ts_units_unlocked (queue) >=
-      queue->high_threshold * queue->max_queue_length) {
-    GST_DEBUG ("stop buffering");
-    queue->buffering = FALSE;
-  }
-
-  if (queue->waiting_threads > 0) {
-    if (!queue->buffering) {
-      g_cond_signal (queue->cond);
-    }
-  }
-}
-
-/**
- * async_jitter_queue_ref:
- * @queue: a #AsyncJitterQueue.
- *
- * Increases the reference count of the asynchronous @queue by 1. You
- * do not need to hold the lock to call this function.
- *
- * Returns: the @queue that was passed in (since 2.6)
- **/
-AsyncJitterQueue *
-async_jitter_queue_ref (AsyncJitterQueue * queue)
-{
-  g_return_val_if_fail (queue, NULL);
-  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
-  g_atomic_int_inc (&queue->ref_count);
-
-  return queue;
-}
-
-/**
- * async_jitter_queue_ref_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Increases the reference count of the asynchronous @queue by 1.
- **/
-void
-async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  g_atomic_int_inc (&queue->ref_count);
-}
-
-/**
- * async_jitter_queue_set_low_threshold:
- * @queue: a #AsyncJitterQueue.
- * @threshold: the lower threshold (fraction of max size)
- *
- * Sets the low threshold on the queue. This threshold indicates the minimum
- * number of items allowed in the queue before we refill it up to the set
- * maximum threshold.
- **/
-void
-async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
-    gfloat threshold)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->low_threshold = threshold;
-}
-
-/**
- * async_jitter_queue_set_max_threshold:
- * @queue: a #AsyncJitterQueue.
- * @threshold: the higher threshold (fraction of max size)
- *
- * Sets the high threshold on the queue. This threshold indicates the amount of
- * items to fill in the queue before releasing any blocking pop calls. This
- * blocking mecanism is only triggered when we reach the low threshold and must
- * refill the queue.
- **/
-void
-async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
-    gfloat threshold)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->high_threshold = threshold;
-}
-
-/* set the maximum queue length in RTP timestamp units */
-void
-async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
-    guint32 max_length)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->max_queue_length = max_length;
-}
-
-GQueue *
-async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
-{
-  g_return_val_if_fail (queue, NULL);
-
-  return queue->queue;
-}
-
-static guint32
-calculate_ts_diff (guint32 high_ts, guint32 low_ts)
-{
-  /* it needs to work if ts wraps */
-  if (high_ts >= low_ts) {
-    return high_ts - low_ts;
-  } else {
-    return high_ts + G_MAXUINT32 + 1 - low_ts;
-  }
-}
-
-/* this function returns the length of the queue in timestamp units. It will
- * also add the duration of the last buffer in the queue */
-/* FIXME This function wrongly assumes that there are no missing packets inside
- * the buffer, in reality it needs to check for gaps and subsctract those from
- * the total */
-guint32
-async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
-{
-  guint32 tail_ts;
-  guint32 head_ts;
-  guint32 ret;
-  GstBuffer *head;
-  GstBuffer *tail;
-
-  g_return_val_if_fail (queue, 0);
-
-  if (queue->queue->length < 2) {
-    return 0;
-  }
-
-  tail = g_queue_peek_tail (queue->queue);
-  head = g_queue_peek_head (queue->queue);
-
-  if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
-    return 0;
-
-  tail_ts = gst_rtp_buffer_get_timestamp (tail);
-  head_ts = gst_rtp_buffer_get_timestamp (head);
-
-  ret = calculate_ts_diff (head_ts, tail_ts);
-
-  /* let's add the duration of the tail buffer */
-  ret += queue->tail_buffer_duration;
-
-  return ret;
-}
-
-/**
- * async_jitter_queue_unref_and_unlock:
- * @queue: a #AsyncJitterQueue.
- *
- * Decreases the reference count of the asynchronous @queue by 1 and
- * releases the lock. This function must be called while holding the
- * @queue's lock. If the reference count went to 0, the @queue will be
- * destroyed and the memory allocated will be freed.
- **/
-void
-async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  g_mutex_unlock (queue->mutex);
-  async_jitter_queue_unref (queue);
-}
-
-/**
- * async_jitter_queue_unref:
- * @queue: a #AsyncJitterQueue.
- *
- * Decreases the reference count of the asynchronous @queue by 1. If
- * the reference count went to 0, the @queue will be destroyed and the
- * memory allocated will be freed. So you are not allowed to use the
- * @queue afterwards, as it might have disappeared. You do not need to
- * hold the lock to call this function.
- **/
-void
-async_jitter_queue_unref (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  if (g_atomic_int_dec_and_test (&queue->ref_count)) {
-    g_return_if_fail (queue->waiting_threads == 0);
-    g_mutex_free (queue->mutex);
-    if (queue->cond)
-      g_cond_free (queue->cond);
-    g_queue_free (queue->queue);
-    g_free (queue);
-  }
-}
-
-/**
- * async_jitter_queue_lock:
- * @queue: a #AsyncJitterQueue.
- *
- * Acquires the @queue's lock. After that you can only call the
- * <function>async_jitter_queue_*_unlocked()</function> function variants on that
- * @queue. Otherwise it will deadlock.
- **/
-void
-async_jitter_queue_lock (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  g_mutex_lock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_unlock:
- * @queue: a #AsyncJitterQueue.
- *
- * Releases the queue's lock.
- **/
-void
-async_jitter_queue_unlock (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  g_mutex_unlock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_push:
- * @queue: a #AsyncJitterQueue.
- * @data: @data to push into the @queue.
- *
- * Pushes the @data into the @queue. @data must not be %NULL.
- **/
-void
-async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-  g_return_if_fail (data);
-
-  g_mutex_lock (queue->mutex);
-  async_jitter_queue_push_unlocked (queue, data);
-  g_mutex_unlock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_push_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @data: @data to push into the @queue.
- *
- * Pushes the @data into the @queue. @data must not be %NULL. This
- * function must be called while holding the @queue's lock.
- **/
-void
-async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-  g_return_if_fail (data);
-
-  g_queue_push_head (queue->queue, data);
-
-  signal_waiting_threads (queue);
-}
-
-/**
- * async_jitter_queue_push_sorted:
- * @queue: a #AsyncJitterQueue
- * @data: the @data to push into the @queue
- * @func: the #GCompareDataFunc is used to sort @queue. This function
- *     is passed two elements of the @queue. The function should return
- *     0 if they are equal, a negative value if the first element
- *     should be higher in the @queue or a positive value if the first
- *     element should be lower in the @queue than the second element.
- * @user_data: user data passed to @func.
- *
- * Inserts @data into @queue using @func to determine the new
- * position.
- *
- * This function requires that the @queue is sorted before pushing on
- * new elements.
- *
- * This function will lock @queue before it sorts the queue and unlock
- * it when it is finished.
- *
- * For an example of @func see async_jitter_queue_sort().
- *
- * Since: 2.10
- **/
-gboolean
-async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
-    gpointer data, GCompareDataFunc func, gpointer user_data)
-{
-  gboolean ret;
-
-  g_return_val_if_fail (queue != NULL, FALSE);
-
-  g_mutex_lock (queue->mutex);
-  ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
-  g_mutex_unlock (queue->mutex);
-
-  return ret;
-}
-
-/**
- * async_jitter_queue_push_sorted_unlocked:
- * @queue: a #AsyncJitterQueue
- * @data: the @data to push into the @queue
- * @func: the #GCompareDataFunc is used to sort @queue. This function
- *     is passed two elements of the @queue. The function should return
- *     0 if they are equal, a negative value if the first element
- *     should be higher in the @queue or a positive value if the first
- *     element should be lower in the @queue than the second element.
- * @user_data: user data passed to @func.
- *
- * Inserts @data into @queue using @func to determine the new
- * position.
- *
- * This function requires that the @queue is sorted before pushing on
- * new elements.
- *
- * If @GCompareDataFunc returns 0, this function does not insert @data and
- * return FALSE.
- *
- * This function is called while holding the @queue's lock.
- *
- * For an example of @func see async_jitter_queue_sort().
- *
- * Since: 2.10
- **/
-gboolean
-async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
-    gpointer data, GCompareDataFunc func, gpointer user_data)
-{
-  GList *list;
-  gint func_ret = TRUE;
-
-  g_return_val_if_fail (queue != NULL, FALSE);
-
-  list = queue->queue->head;
-  while (list && (func_ret = func (list->data, data, user_data)) < 0)
-    list = list->next;
-
-  if (func_ret == 0) {
-    return FALSE;
-  }
-  if (list) {
-    g_queue_insert_before (queue->queue, list, data);
-  } else {
-    g_queue_push_tail (queue->queue, data);
-  }
-
-  signal_waiting_threads (queue);
-  return TRUE;
-}
-
-void
-async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
-    GList * sibling, gpointer data)
-{
-  g_return_if_fail (queue != NULL);
-
-  g_queue_insert_before (queue->queue, sibling, data);
-
-  signal_waiting_threads (queue);
-}
-
-static gpointer
-async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
-{
-  gpointer retval;
-  GstBuffer *tail_buffer = NULL;
-  guint tsunits;
-
-  if (queue->pop_flushing)
-    return NULL;
-
-  while (queue->pop_blocking) {
-    queue->waiting_threads++;
-    g_cond_wait (queue->cond, queue->mutex);
-    queue->waiting_threads--;
-    if (queue->pop_flushing)
-      return NULL;
-  }
-
-
-  tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
-
-  GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
-      (int) (queue->low_threshold * queue->max_queue_length));
-
-  if (tsunits <= queue->low_threshold * queue->max_queue_length
-      && queue->pops_remaining == 0) {
-    if (!queue->buffering) {
-      GST_DEBUG ("start buffering");
-      queue->buffering = TRUE;
-      queue->pops_remaining = queue->queue->length;
-    }
-
-    GST_DEBUG ("wait for data");
-    while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
-      queue->waiting_threads++;
-      g_cond_wait (queue->cond, queue->mutex);
-      queue->waiting_threads--;
-      if (queue->pop_flushing)
-        return NULL;
-    }
-  }
-
-  retval = g_queue_pop_tail (queue->queue);
-  if (queue->pops_remaining)
-    queue->pops_remaining--;
-
-  tail_buffer = g_queue_peek_tail (queue->queue);
-  if (tail_buffer) {
-    if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
-      queue->tail_buffer_duration = 0;
-    } else if (gst_rtp_buffer_get_seq (tail_buffer)
-        - gst_rtp_buffer_get_seq (retval) == 1) {
-      queue->tail_buffer_duration =
-          calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
-          gst_rtp_buffer_get_timestamp (retval));
-    } else {
-      /* There is a sequence number gap -> we can't calculate the duration
-       * let's just set it to 0 */
-      queue->tail_buffer_duration = 0;
-    }
-  }
-
-  g_assert (retval);
-
-  return retval;
-}
-
-/**
- * async_jitter_queue_pop:
- * @queue: a #AsyncJitterQueue.
- *
- * Pops data from the @queue. This function blocks until data become
- * available. If pop is disabled, tis function return NULL.
- *
- * Return value: data from the queue.
- **/
-gpointer
-async_jitter_queue_pop (AsyncJitterQueue * queue)
-{
-  gpointer retval;
-
-  g_return_val_if_fail (queue, NULL);
-  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
-  g_mutex_lock (queue->mutex);
-  retval = async_jitter_queue_pop_intern_unlocked (queue);
-  g_mutex_unlock (queue->mutex);
-
-  return retval;
-}
-
-/**
- * async_jitter_queue_pop_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Pops data from the @queue. This function blocks until data become
- * available. This function must be called while holding the @queue's
- * lock.
- *
- * Return value: data from the queue.
- **/
-gpointer
-async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
-{
-  g_return_val_if_fail (queue, NULL);
-  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
-  return async_jitter_queue_pop_intern_unlocked (queue);
-}
-
-/**
- * async_jitter_queue_length:
- * @queue: a #AsyncJitterQueue.
- *
- * Returns the length of the queue
- * Return value: the length of the @queue.
- **/
-gint
-async_jitter_queue_length (AsyncJitterQueue * queue)
-{
-  gint retval;
-
-  g_return_val_if_fail (queue, 0);
-  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
-
-  g_mutex_lock (queue->mutex);
-  retval = queue->queue->length;
-  g_mutex_unlock (queue->mutex);
-
-  return retval;
-}
-
-/**
- * async_jitter_queue_length_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Returns the length of the queue.
- *
- * Return value: the length of the @queue.
- **/
-gint
-async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
-{
-  g_return_val_if_fail (queue, 0);
-  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
-
-  return queue->queue->length;
-}
-
-/**
- * async_jitter_queue_set_flushing_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @free_func: a function to call to free the elements
- * @user_data: user data passed to @free_func
- *
- * This function is used to set/unset flushing. If flushing is set any
- * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
- * return NULL. Flushing is set by default.
- */
-void
-async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
-    GFunc free_func, gpointer user_data)
-{
-  gpointer elem;
-
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->pop_flushing = TRUE;
-  /* let's unblock any remaining pops */
-  if (queue->waiting_threads > 0)
-    g_cond_broadcast (queue->cond);
-  /* free data from queue */
-  while ((elem = g_queue_pop_head (queue->queue)))
-    free_func (elem, user_data);
-}
-
-/**
- * async_jitter_queue_unset_flushing_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @free_func: a function to call to free the elements
- * @user_data: user data passed to @free_func
- *
- * This function is used to set/unset flushing. If flushing is set any
- * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
- * return NULL. Flushing is set by default.
- */
-void
-async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->pop_flushing = FALSE;
-  /* let's unblock any remaining pops */
-  if (queue->waiting_threads > 0)
-    g_cond_broadcast (queue->cond);
-}
-
-/**
- * async_jitter_queue_set_blocking_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @enabled: a boolean to enable/disable blocking
- *
- * This function is used to enable/disable blocking. If blocking is enabled any
- * pops will be blocked until the queue is unblocked. The queue is blocked by
- * default.
- */
-void
-async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
-    gboolean blocking)
-{
-  g_return_if_fail (queue);
-  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
-  queue->pop_blocking = blocking;
-  /* let's unblock any remaining pops */
-  if (queue->waiting_threads > 0)
-    g_cond_broadcast (queue->cond);
-}
diff --git a/gst/rtpmanager/async_jitter_queue.h b/gst/rtpmanager/async_jitter_queue.h
deleted file mode 100644 (file)
index bea76d6..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-/* Async Jitter Queue based on g_async_queue
- *
- * Farsight Voice+Video library
- *  Copyright 2007 Collabora Ltd, 
- *  Copyright 2007 Nokia Corporation
- *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
- */
-
-/* GLIB - Library of useful routines for C programming
- * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-/*
- * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
- * file for a list of people on the GLib Team.  See the ChangeLog
- * files for a list of changes.  These files are distributed with
- * GLib at ftp://ftp.gtk.org/pub/gtk/. 
- */
-
-#ifndef __ASYNCJITTERQUEUE_H__
-#define __ASYNCJITTERQUEUE_H__
-
-#include <glib.h>
-#include <glib/gthread.h>
-
-G_BEGIN_DECLS
-
-typedef struct _AsyncJitterQueue     AsyncJitterQueue;
-
-/* Asyncronous Queues, can be used to communicate between threads
- */
-
-/* Get a new AsyncJitterQueue with the ref_count 1 */
-AsyncJitterQueue*  async_jitter_queue_new                (void);
-
-/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for
- * themselves, but in certain cirumstances you want to hold the lock longer,
- * thus you lock the queue, call the *_unlocked functions and unlock it again.
- */
-void          async_jitter_queue_lock               (AsyncJitterQueue *queue);
-void          async_jitter_queue_unlock             (AsyncJitterQueue *queue);
-
-/* Ref and unref the AsyncJitterQueue. */
-AsyncJitterQueue*  async_jitter_queue_ref           (AsyncJitterQueue *queue);
-void          async_jitter_queue_unref              (AsyncJitterQueue *queue);
-#ifndef G_DISABLE_DEPRECATED
-/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
-void          async_jitter_queue_ref_unlocked       (AsyncJitterQueue *queue);
-void          async_jitter_queue_unref_and_unlock   (AsyncJitterQueue *queue);
-#endif /* !G_DISABLE_DEPRECATED */
-
-void          async_jitter_queue_set_low_threshold  (AsyncJitterQueue *queue,
-                                                gfloat threshold);
-void          async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue,
-                                                gfloat threshold);
-
-void          async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue,
-                                                guint32 max_length);
-
-/* Push data into the async queue. Must not be NULL. */
-void          async_jitter_queue_push               (AsyncJitterQueue *queue,
-                                                gpointer     data);
-void          async_jitter_queue_push_unlocked      (AsyncJitterQueue *queue,
-                                                gpointer     data);
-gboolean      async_jitter_queue_push_sorted        (AsyncJitterQueue *queue,
-                                                gpointer          data,
-                                                GCompareDataFunc  func,
-                                                gpointer          user_data);
-
-void          async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue,
-                                                GList *sibling,
-                                                gpointer data);
-
-gboolean      async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue,
-                                                gpointer          data,
-                                                GCompareDataFunc  func,
-                                                gpointer          user_data);
-
-/* Pop data from the async queue. When no data is there, the thread is blocked
- * until data arrives. */
-gpointer      async_jitter_queue_pop                (AsyncJitterQueue *queue);
-gpointer      async_jitter_queue_pop_unlocked       (AsyncJitterQueue *queue);
-
-/* Try to pop data. NULL is returned in case of empty queue. */
-gpointer      async_jitter_queue_try_pop            (AsyncJitterQueue *queue);
-gpointer      async_jitter_queue_try_pop_unlocked   (AsyncJitterQueue *queue);
-
-/* Wait for data until at maximum until end_time is reached. NULL is returned
- * in case of empty queue. */
-gpointer      async_jitter_queue_timed_pop          (AsyncJitterQueue *queue,
-                                                GTimeVal    *end_time);
-gpointer      async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue,
-                                                GTimeVal    *end_time);
-
-/* Return the length of the queue. Negative values mean that threads
- * are waiting, positve values mean that there are entries in the
- * queue. Actually this function returns the length of the queue minus
- * the number of waiting threads, async_jitter_queue_length == 0 could also
- * mean 'n' entries in the queue and 'n' thread waiting. Such can
- * happen due to locking of the queue or due to scheduling. */
-gint          async_jitter_queue_length             (AsyncJitterQueue *queue);
-gint          async_jitter_queue_length_unlocked    (AsyncJitterQueue *queue);
-
-void          async_jitter_queue_set_flushing_unlocked   (AsyncJitterQueue* queue, 
-                                                          GFunc free_func, gpointer user_data);
-void          async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue);
-void          async_jitter_queue_set_blocking_unlocked   (AsyncJitterQueue* queue,
-                                                          gboolean blocking);
-guint32
-async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue);
-
-G_END_DECLS
-
-#endif /* __ASYNCJITTERQUEUE_H__ */
-
index d0b9103..ca760d8 100644 (file)
@@ -2,3 +2,4 @@ UINT:UINT
 BOXED:UINT
 BOXED:UINT,UINT
 VOID:UINT,OBJECT
+VOID:UINT,UINT
index cc0afd0..e96291b 100644 (file)
@@ -158,6 +158,13 @@ enum
 {
   SIGNAL_REQUEST_PT_MAP,
   SIGNAL_CLEAR_PT_MAP,
+
+  SIGNAL_ON_NEW_SSRC,
+  SIGNAL_ON_SSRC_COLLISION,
+  SIGNAL_ON_SSRC_VALIDATED,
+  SIGNAL_ON_BYE_SSRC,
+  SIGNAL_ON_BYE_TIMEOUT,
+  SIGNAL_ON_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -258,6 +265,48 @@ find_session_by_id (GstRTPBin * rtpbin, gint id)
   return NULL;
 }
 
+static void
+on_new_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
+      sess->id, ssrc);
+}
+
+static void
+on_ssrc_collision (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
+      sess->id, ssrc);
+}
+
+static void
+on_ssrc_validated (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
+      sess->id, ssrc);
+}
+
+static void
+on_bye_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
+      sess->id, ssrc);
+}
+
+static void
+on_bye_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
+      sess->id, ssrc);
+}
+
+static void
+on_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
+      sess->id, ssrc);
+}
+
 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
 static GstRTPBinSession *
 create_session (GstRTPBin * rtpbin, gint id)
@@ -284,6 +333,18 @@ create_session (GstRTPBin * rtpbin, gint id)
   g_signal_connect (session, "request-pt-map",
       (GCallback) pt_map_requested, sess);
 
+  g_signal_connect (sess->session, "on-new-ssrc",
+      (GCallback) on_new_ssrc, sess);
+  g_signal_connect (sess->session, "on-ssrc-collision",
+      (GCallback) on_ssrc_collision, sess);
+  g_signal_connect (sess->session, "on-ssrc-validated",
+      (GCallback) on_ssrc_validated, sess);
+  g_signal_connect (sess->session, "on-bye-ssrc",
+      (GCallback) on_bye_ssrc, sess);
+  g_signal_connect (sess->session, "on-bye-timeout",
+      (GCallback) on_bye_timeout, sess);
+  g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
+
   gst_bin_add (GST_BIN_CAST (rtpbin), session);
   gst_element_set_state (session, GST_STATE_PLAYING);
   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
@@ -557,6 +618,86 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass)
       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPBinClass, clear_pt_map),
       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
 
+  /**
+   * GstRTPBin::on-new-ssrc:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of a new SSRC that entered @session.
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
+      g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_new_ssrc),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+  /**
+   * GstRTPBin::on-ssrc_collision:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify when we have an SSRC collision
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
+      g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_collision),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+  /**
+   * GstRTPBin::on-ssrc_validated:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of a new SSRC that became validated.
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
+      g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_validated),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+
+  /**
+   * GstRTPBin::on-bye-ssrc:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that became inactive because of a BYE packet.
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
+      g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_ssrc),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+  /**
+   * GstRTPBin::on-bye-timeout:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that has timed out because of BYE
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
+      g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_timeout),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+  /**
+   * GstRTPBin::on-timeout:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that has timed out
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
+      g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_timeout),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+
   gstelement_class->provide_clock =
       GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock);
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
index ffbdd62..99cb281 100644 (file)
@@ -57,8 +57,14 @@ struct _GstRTPBinClass {
 
   /* get the caps for pt */
   GstCaps* (*request_pt_map)  (GstRTPBin *rtpbin, guint session, guint pt);
-
   void     (*clear_pt_map)    (GstRTPBin *rtpbin);
+
+  void     (*on_new_ssrc)       (GstRTPBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_ssrc_collision) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_ssrc_validated) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_bye_ssrc)       (GstRTPBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_bye_timeout)    (GstRTPBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_timeout)        (GstRTPBin *rtpbin, guint session, guint32 ssrc);
 };
 
 GType gst_rtp_bin_get_type (void);
index 79d0678..fe85f87 100644 (file)
@@ -4,7 +4,7 @@
  *  Copyright 2007 Collabora Ltd, 
  *  Copyright 2007 Nokia Corporation
  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
- *  Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -72,7 +72,7 @@
 #include "gstrtpbin-marshal.h"
 
 #include "gstrtpjitterbuffer.h"
-#include "async_jitter_queue.h"
+#include "rtpjitterbuffer.h"
 
 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
@@ -87,7 +87,7 @@ GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
     "Filter/Network/RTP",
     "A buffer that deals with network jitter and other transmission faults",
     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
-    "Wim Taymans <wim@fluendo.com>");
+    "Wim Taymans <wim.taymans@gmail.com>");
 
 /* RTPJitterBuffer signals and args */
 enum
@@ -107,11 +107,32 @@ enum
   PROP_DROP_ON_LATENCY
 };
 
+#define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
+
+#define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
+  JBUF_LOCK (priv);                                   \
+  if (priv->srcresult != GST_FLOW_OK)                 \
+    goto label;                                       \
+} G_STMT_END
+
+#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
+#define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
+
+#define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
+  JBUF_WAIT(priv);                                    \
+  if (priv->srcresult != GST_FLOW_OK)                 \
+    goto label;                                       \
+} G_STMT_END
+
+#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
+
 struct _GstRTPJitterBufferPrivate
 {
   GstPad *sinkpad, *srcpad;
 
-  AsyncJitterQueue *queue;
+  RTPJitterBuffer *jbuf;
+  GMutex *jbuf_lock;
+  GCond *jbuf_cond;
 
   /* properties */
   guint latency_ms;
@@ -122,12 +143,16 @@ struct _GstRTPJitterBufferPrivate
   /* the next expected seqnum */
   guint32 next_seqnum;
 
+  /* state */
+  gboolean eos;
+
   /* clock rate and rtp timestamp offset */
   gint32 clock_rate;
   gint64 clock_base;
 
   /* when we are shutting down */
   GstFlowReturn srcresult;
+  gboolean blocked;
 
   /* for sync */
   GstSegment segment;
@@ -292,9 +317,9 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
   priv->latency_ms = DEFAULT_LATENCY_MS;
   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
 
-  priv->queue = async_jitter_queue_new ();
-  async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
-  async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
+  priv->jbuf = rtp_jitter_buffer_new ();
+  priv->jbuf_lock = g_mutex_new ();
+  priv->jbuf_cond = g_cond_new ();
 
   priv->waiting_seqnum = -1;
 
@@ -332,9 +357,9 @@ gst_rtp_jitter_buffer_dispose (GObject * object)
   GstRTPJitterBuffer *jitterbuffer;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
-  if (jitterbuffer->priv->queue) {
-    async_jitter_queue_unref (jitterbuffer->priv->queue);
-    jitterbuffer->priv->queue = NULL;
+  if (jitterbuffer->priv->jbuf) {
+    g_object_unref (jitterbuffer->priv->jbuf);
+    jitterbuffer->priv->jbuf = NULL;
   }
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
@@ -430,9 +455,6 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer,
   } else
     priv->next_seqnum = -1;
 
-  async_jitter_queue_set_max_queue_length (priv->queue,
-      priv->latency_ms * priv->clock_rate / 1000);
-
   return TRUE;
 
   /* ERRORS */
@@ -470,34 +492,24 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
 }
 
 static void
-free_func (gpointer data, GstRTPJitterBuffer * user_data)
-{
-  if (GST_IS_BUFFER (data))
-    gst_buffer_unref (GST_BUFFER_CAST (data));
-  else
-    gst_event_unref (GST_EVENT_CAST (data));
-}
-
-static void
 gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
 {
   GstRTPJitterBufferPrivate *priv;
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK (priv);
   /* mark ourselves as flushing */
   priv->srcresult = GST_FLOW_WRONG_STATE;
   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
   /* this unblocks any waiting pops on the src pad task */
-  async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
-      (GFunc) free_func, jitterbuffer);
+  JBUF_SIGNAL (priv);
+  rtp_jitter_buffer_flush (priv->jbuf);
   /* unlock clock, we just unschedule, the entry will be released by the 
    * locking streaming thread. */
   if (priv->clock_id)
     gst_clock_id_unschedule (priv->clock_id);
-
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 }
 
 static void
@@ -507,7 +519,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK (priv);
   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
   /* Mark as non flushing */
   priv->srcresult = GST_FLOW_OK;
@@ -515,9 +527,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
   priv->last_popped_seqnum = -1;
   priv->next_seqnum = -1;
   priv->clock_rate = -1;
-  /* allow pops from the src pad task */
-  async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
-  async_jitter_queue_unlock (priv->queue);
+  priv->eos = FALSE;
+  JBUF_UNLOCK (priv);
 }
 
 static gboolean
@@ -566,21 +577,20 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* reset negotiated values */
       priv->clock_rate = -1;
       priv->clock_base = -1;
       /* block until we go to PLAYING */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          TRUE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = TRUE;
+      JBUF_UNLOCK (priv);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* unblock to allow streaming in PLAYING */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          FALSE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = FALSE;
+      JBUF_SIGNAL (priv);
+      JBUF_UNLOCK (priv);
       break;
     default:
       break;
@@ -596,11 +606,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
         ret = GST_STATE_CHANGE_NO_PREROLL;
       break;
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* block to stop streaming when PAUSED */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          TRUE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = TRUE;
+      JBUF_UNLOCK (priv);
       if (ret != GST_STATE_CHANGE_FAILURE)
         ret = GST_STATE_CHANGE_NO_PREROLL;
       break;
@@ -630,30 +639,6 @@ priv_compare_rtp_seq_lt (guint16 a, guint16 b)
   }
 }
 
-/**
- * gets the seqnum from the buffers and compare them 
- */
-static gint
-compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
-{
-  gint ret;
-
-  if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
-    /* two buffers */
-    ret = priv_compare_rtp_seq_lt
-        (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
-        gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
-  } else {
-    /* one of them is an event, the event always goes before the other element
-     * so we return -1. */
-    if (GST_IS_EVENT (a))
-      ret = -1;
-    else
-      ret = 1;
-  }
-  return ret;
-}
-
 static gboolean
 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
 {
@@ -707,16 +692,20 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_EOS:
     {
       /* push EOS in queue. We always push it at the head */
-      async_jitter_queue_lock (priv->queue);
-      GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
+      JBUF_LOCK (priv);
       /* check for flushing, we need to discard the event and return FALSE when
        * we are flushing */
       ret = priv->srcresult == GST_FLOW_OK;
-      if (ret)
-        async_jitter_queue_push_unlocked (priv->queue, event);
-      else
+      if (ret) {
+        GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
+        priv->eos = TRUE;
+        JBUF_SIGNAL (priv);
+      } else {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
+            gst_flow_get_name (priv->srcresult));
         gst_event_unref (event);
-      async_jitter_queue_unlock (priv->queue);
+      }
+      JBUF_UNLOCK (priv);
       break;
     }
     default:
@@ -780,7 +769,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   GstRTPJitterBuffer *jitterbuffer;
   GstRTPJitterBufferPrivate *priv;
   guint16 seqnum;
-  GstFlowReturn ret;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
@@ -803,10 +792,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   seqnum = gst_rtp_buffer_get_seq (buffer);
   GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
 
-  async_jitter_queue_lock (priv->queue);
-  ret = priv->srcresult;
-  if (ret != GST_FLOW_OK)
-    goto out_flushing;
+  JBUF_LOCK_CHECK (priv, out_flushing);
+  /* don't accept more data on EOS */
+  if (priv->eos)
+    goto have_eos;
 
   /* let's check if this buffer is too late, we cannot accept packets with
    * bigger seqnum than the one we already pushed. */
@@ -818,14 +807,18 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   /* let's drop oldest packet if the queue is already full and drop-on-latency
    * is set. */
   if (priv->drop_on_latency) {
-    if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
-        priv->latency_ms * priv->clock_rate / 1000) {
+    guint64 latency_ts;
+
+    latency_ts =
+        gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
+
+    if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
       GstBuffer *old_buf;
 
       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
           seqnum);
 
-      old_buf = async_jitter_queue_pop_unlocked (priv->queue);
+      old_buf = rtp_jitter_buffer_pop (priv->jbuf);
       gst_buffer_unref (old_buf);
     }
   }
@@ -833,10 +826,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
-          (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
+  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
     goto duplicate;
 
+  /* signal addition of new buffer */
+  JBUF_SIGNAL (priv);
+
   /* let's unschedule and unblock any waiting buffers. We only want to do this
    * if there is a currently waiting newer (> seqnum) buffer  */
   if (priv->clock_id) {
@@ -846,11 +841,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
     }
   }
 
-  GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
-      seqnum, async_jitter_queue_length_unlocked (priv->queue));
+  GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
+      seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
 
 finished:
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 
   gst_object_unref (jitterbuffer);
 
@@ -875,10 +870,18 @@ not_negotiated:
   }
 out_flushing:
   {
+    ret = priv->srcresult;
     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
     gst_buffer_unref (buffer);
     goto finished;
   }
+have_eos:
+  {
+    ret = GST_FLOW_UNEXPECTED;
+    GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
 too_late:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
@@ -908,8 +911,7 @@ static void
 gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
 {
   GstRTPJitterBufferPrivate *priv;
-  gpointer elem;
-  GstBuffer *outbuf;
+  GstBuffer *outbuf = NULL;
   GstFlowReturn result;
   guint16 seqnum;
   guint32 rtp_time;
@@ -918,44 +920,24 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK_CHECK (priv, flushing);
 again:
   GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
-  /* pop a buffer, we will get NULL if the queue was shut down */
-  elem = async_jitter_queue_pop_unlocked (priv->queue);
-  if (!elem)
-    goto no_elem;
-
-  /* special code for events */
-  if (G_UNLIKELY (GST_IS_EVENT (elem))) {
-    GstEvent *event = GST_EVENT_CAST (elem);
-
-    switch (GST_EVENT_TYPE (event)) {
-      case GST_EVENT_EOS:
-        GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
-        /* we don't expect more data now, makes upstream perform EOS actions */
-        priv->srcresult = GST_FLOW_UNEXPECTED;
-        break;
-      default:
-        GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
-            GST_EVENT_TYPE_NAME (event));
-        break;
-    }
-    async_jitter_queue_unlock (priv->queue);
-
-    /* push event */
-    gst_pad_push_event (priv->srcpad, event);
-    return;
+  /* wait if we are blocked or don't have a packet and eos */
+  while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf)
+          || priv->eos)) {
+    JBUF_WAIT_CHECK (priv, flushing);
   }
+  if (priv->eos)
+    goto do_eos;
 
-  /* we know it's a buffer now */
-  outbuf = GST_BUFFER_CAST (elem);
+  /* pop a buffer, we must have a buffer now */
+  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
 
   seqnum = gst_rtp_buffer_get_seq (outbuf);
 
-  GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
-      gst_rtp_buffer_get_seq (outbuf),
-      async_jitter_queue_length_unlocked (priv->queue));
+  GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, now %d left",
+      seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
 
   /* If we don't know what the next seqnum should be (== -1) we have to wait
    * because it might be possible that we are not receiving this buffer in-order,
@@ -1032,11 +1014,11 @@ again:
     GST_OBJECT_UNLOCK (jitterbuffer);
 
     /* release the lock so that the other end can push stuff or unlock */
-    async_jitter_queue_unlock (priv->queue);
+    JBUF_UNLOCK (priv);
 
     ret = gst_clock_id_wait (id, &jitter);
 
-    async_jitter_queue_lock (priv->queue);
+    JBUF_LOCK (priv);
     /* and free the entry */
     gst_clock_id_unref (id);
     priv->clock_id = NULL;
@@ -1054,8 +1036,7 @@ again:
       GST_DEBUG_OBJECT (jitterbuffer,
           "Wait got unscheduled, will retry to push with new buffer");
       /* reinserting popped buffer into queue */
-      if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
-              (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
+      if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
         GST_DEBUG_OBJECT (jitterbuffer,
             "Duplicate packet #%d detected, dropping", seqnum);
         priv->num_duplicates++;
@@ -1087,7 +1068,7 @@ push_buffer:
    * so the other end can push stuff in the queue again. */
   priv->last_popped_seqnum = seqnum;
   priv->next_seqnum = (seqnum + 1) & 0xffff;
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 
   /* push buffer */
   GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
@@ -1098,20 +1079,22 @@ push_buffer:
   return;
 
   /* ERRORS */
-no_elem:
+do_eos:
   {
     /* store result, we are flushing now */
-    GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
-    priv->srcresult = GST_FLOW_WRONG_STATE;
+    GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
+    priv->srcresult = GST_FLOW_UNEXPECTED;
     gst_pad_pause_task (priv->srcpad);
-    async_jitter_queue_unlock (priv->queue);
+    gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
+    JBUF_UNLOCK (priv);
     return;
   }
 flushing:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
-    gst_buffer_unref (outbuf);
-    async_jitter_queue_unlock (priv->queue);
+    if (outbuf)
+      gst_buffer_unref (outbuf);
+    JBUF_UNLOCK (priv);
     return;
   }
 pause:
@@ -1120,13 +1103,13 @@ pause:
 
     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
 
-    async_jitter_queue_lock (priv->queue);
+    JBUF_LOCK (priv);
     /* store result */
     priv->srcresult = result;
     /* we don't post errors or anything because upstream will do that for us
      * when we pass the return value upstream. */
     gst_pad_pause_task (priv->srcpad);
-    async_jitter_queue_unlock (priv->queue);
+    JBUF_UNLOCK (priv);
     return;
   }
 }
@@ -1194,11 +1177,7 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
       old_latency = jitterbuffer->priv->latency_ms;
 
       jitterbuffer->priv->latency_ms = new_latency;
-      if (jitterbuffer->priv->clock_rate != -1) {
-        async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
-            gst_util_uint64_scale_int (new_latency,
-                jitterbuffer->priv->clock_rate, 1000));
-      }
+
       /* post message if latency changed, this will infor the parent pipeline
        * that a latency reconfiguration is possible. */
       if (new_latency != old_latency) {
index 3e33cf6..bb47a29 100644 (file)
@@ -202,6 +202,13 @@ enum
 {
   SIGNAL_REQUEST_PT_MAP,
   SIGNAL_CLEAR_PT_MAP,
+
+  SIGNAL_ON_NEW_SSRC,
+  SIGNAL_ON_SSRC_COLLISION,
+  SIGNAL_ON_SSRC_VALIDATED,
+  SIGNAL_ON_BYE_SSRC,
+  SIGNAL_ON_BYE_TIMEOUT,
+  SIGNAL_ON_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -266,6 +273,48 @@ static void gst_rtp_session_clear_pt_map (GstRTPSession * rtpsession);
 
 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
 
+static void
+on_new_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
+      src->ssrc);
+}
+
+static void
+on_ssrc_collision (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
+      src->ssrc);
+}
+
+static void
+on_ssrc_validated (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
+      src->ssrc);
+}
+
+static void
+on_bye_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
+      src->ssrc);
+}
+
+static void
+on_bye_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
+      src->ssrc);
+}
+
+static void
+on_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
+      src->ssrc);
+}
+
 GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
 
 static void
@@ -332,6 +381,76 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPSessionClass, clear_pt_map),
       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
 
+  /**
+   * GstRTPSession::on-new-ssrc:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of a new SSRC that entered @session.
+   */
+  gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
+      g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_new_ssrc),
+      NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+  /**
+   * GstRTPSession::on-ssrc_collision:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify when we have an SSRC collision
+   */
+  gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
+      g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass,
+          on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
+      G_TYPE_NONE, 1, G_TYPE_UINT);
+  /**
+   * GstRTPSession::on-ssrc_validated:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of a new SSRC that became validated.
+   */
+  gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
+      g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass,
+          on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
+      G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  /**
+   * GstRTPSession::on-bye-ssrc:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that became inactive because of a BYE packet.
+   */
+  gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
+      g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_ssrc),
+      NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+  /**
+   * GstRTPSession::on-bye-timeout:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that has timed out because of BYE
+   */
+  gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
+      g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_timeout),
+      NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+  /**
+   * GstRTPSession::on-timeout:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of an SSRC that has timed out
+   */
+  gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
+      g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_timeout),
+      NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
   gstelement_class->request_new_pad =
@@ -353,6 +472,19 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
   rtpsession->priv->session = rtp_session_new ();
   /* configure callbacks */
   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
+  /* configure signals */
+  g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
+      (GCallback) on_new_ssrc, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
+      (GCallback) on_ssrc_collision, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
+      (GCallback) on_ssrc_validated, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
+      (GCallback) on_bye_ssrc, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
+      (GCallback) on_bye_timeout, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-timeout",
+      (GCallback) on_timeout, rtpsession);
 }
 
 static void
index c58f23e..6c9fb77 100644 (file)
@@ -59,8 +59,14 @@ struct _GstRTPSessionClass {
 
   /* signals */
   GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);
-
   void     (*clear_pt_map)   (GstRTPSession *sess);
+
+  void     (*on_new_ssrc)       (GstRTPSession *sess, guint32 ssrc);
+  void     (*on_ssrc_collision) (GstRTPSession *sess, guint32 ssrc);
+  void     (*on_ssrc_validated) (GstRTPSession *sess, guint32 ssrc);
+  void     (*on_bye_ssrc)       (GstRTPSession *sess, guint32 ssrc);
+  void     (*on_bye_timeout)    (GstRTPSession *sess, guint32 ssrc);
+  void     (*on_timeout)        (GstRTPSession *sess, guint32 ssrc);
 };
 
 GType gst_rtp_session_get_type (void);
diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c
new file mode 100644 (file)
index 0000000..f90811b
--- /dev/null
@@ -0,0 +1,237 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#include <string.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
+#include "rtpjitterbuffer.h"
+
+GST_DEBUG_CATEGORY_STATIC (rtp_jitter_buffer_debug);
+#define GST_CAT_DEFAULT rtp_jitter_buffer_debug
+
+/* signals and args */
+enum
+{
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void rtp_jitter_buffer_finalize (GObject * object);
+
+/* static guint rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; */
+
+G_DEFINE_TYPE (RTPJitterBuffer, rtp_jitter_buffer, G_TYPE_OBJECT);
+
+static void
+rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
+{
+  GObjectClass *gobject_class;
+
+  gobject_class = (GObjectClass *) klass;
+
+  gobject_class->finalize = rtp_jitter_buffer_finalize;
+
+  GST_DEBUG_CATEGORY_INIT (rtp_jitter_buffer_debug, "rtpjitterbuffer", 0,
+      "RTP Jitter Buffer");
+}
+
+static void
+rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
+{
+  jbuf->packets = g_queue_new ();
+}
+
+static void
+rtp_jitter_buffer_finalize (GObject * object)
+{
+  RTPJitterBuffer *jbuf;
+
+  jbuf = RTP_JITTER_BUFFER_CAST (object);
+
+  rtp_jitter_buffer_flush (jbuf);
+  g_queue_free (jbuf->packets);
+
+  G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
+}
+
+/**
+ * rtp_jitter_buffer_new:
+ *
+ * Create an #RTPJitterBuffer.
+ *
+ * Returns: a new #RTPJitterBuffer. Use g_object_unref() after usage.
+ */
+RTPJitterBuffer *
+rtp_jitter_buffer_new (void)
+{
+  RTPJitterBuffer *jbuf;
+
+  jbuf = g_object_new (RTP_TYPE_JITTER_BUFFER, NULL);
+
+  return jbuf;
+}
+
+static gint
+compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
+{
+  guint16 seq1, seq2;
+
+  seq1 = gst_rtp_buffer_get_seq (a);
+  seq2 = gst_rtp_buffer_get_seq (b);
+
+  /* check if diff more than half of the 16bit range */
+  if (abs (seq2 - seq1) > (1 << 15)) {
+    /* one of a/b has wrapped */
+    return seq1 - seq2;
+  } else {
+    return seq2 - seq1;
+  }
+}
+
+/**
+ * rtp_jitter_buffer_insert:
+ * @jbuf: an #RTPJitterBuffer
+ * @buf: a buffer
+ *
+ * Inserts @buf into the packet queue of @jbuf. The sequence number of the
+ * packet will be used to sort the packets. This function takes ownerhip of
+ * @buf when the function returns %TRUE.
+ *
+ * Returns: %FALSE if a packet with the same number already existed.
+ */
+gboolean
+rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf)
+{
+  GList *list;
+  gint func_ret = 1;
+
+  g_return_val_if_fail (jbuf != NULL, FALSE);
+  g_return_val_if_fail (buf != NULL, FALSE);
+
+  /* loop the list to skip strictly smaller seqnum buffers */
+  list = jbuf->packets->head;
+  while (list
+      && (func_ret =
+          compare_seqnum (GST_BUFFER_CAST (list->data), buf, jbuf)) < 0)
+    list = list->next;
+
+  /* we hit a packet with the same seqnum, return FALSE to notify a duplicate */
+  if (func_ret == 0)
+    return FALSE;
+
+  if (list)
+    g_queue_insert_before (jbuf->packets, list, buf);
+  else
+    g_queue_push_tail (jbuf->packets, buf);
+
+  return TRUE;
+}
+
+/**
+ * rtp_jitter_buffer_pop:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Pops the oldest buffer from the packet queue of @jbuf.
+ *
+ * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
+ */
+GstBuffer *
+rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
+{
+  GstBuffer *buf;
+
+  g_return_val_if_fail (jbuf != NULL, FALSE);
+
+  buf = g_queue_pop_tail (jbuf->packets);
+
+  return buf;
+}
+
+/**
+ * rtp_jitter_buffer_flush:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Flush all packets from the jitterbuffer.
+ */
+void
+rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf)
+{
+  GstBuffer *buffer;
+
+  g_return_if_fail (jbuf != NULL);
+
+  while ((buffer = g_queue_pop_head (jbuf->packets)))
+    gst_buffer_unref (buffer);
+}
+
+/**
+ * rtp_jitter_buffer_num_packets:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Get the number of packets currently in "jbuf.
+ *
+ * Returns: The number of packets in @jbuf.
+ */
+guint
+rtp_jitter_buffer_num_packets (RTPJitterBuffer * jbuf)
+{
+  g_return_val_if_fail (jbuf != NULL, 0);
+
+  return jbuf->packets->length;
+}
+
+/**
+ * rtp_jitter_buffer_get_ts_diff:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Get the difference between the timestamps of first and last packet in the
+ * jitterbuffer.
+ *
+ * Returns: The difference expressed in the timestamp units of the packets.
+ */
+guint32
+rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
+{
+  guint32 high_ts, low_ts;
+  GstBuffer *high_buf, *low_buf;
+
+  g_return_val_if_fail (jbuf != NULL, 0);
+
+  high_buf = g_queue_peek_head (jbuf->packets);
+  low_buf = g_queue_peek_tail (jbuf->packets);
+
+  if (!high_buf || !low_buf || high_buf == low_buf)
+    return 0;
+
+  high_ts = gst_rtp_buffer_get_timestamp (high_buf);
+  low_ts = gst_rtp_buffer_get_timestamp (low_buf);
+
+  /* it needs to work if ts wraps */
+  if (high_ts >= low_ts) {
+    return high_ts - low_ts;
+  } else {
+    return high_ts + G_MAXUINT32 + 1 - low_ts;
+  }
+}
diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h
new file mode 100644 (file)
index 0000000..14b5b3f
--- /dev/null
@@ -0,0 +1,67 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_JITTER_BUFFER_H__
+#define __RTP_JITTER_BUFFER_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+typedef struct _RTPJitterBuffer RTPJitterBuffer;
+typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
+
+#define RTP_TYPE_JITTER_BUFFER             (rtp_jitter_buffer_get_type())
+#define RTP_JITTER_BUFFER(src)             (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer))
+#define RTP_JITTER_BUFFER_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_JITTER_BUFFER,RTPJitterBufferClass))
+#define RTP_IS_JITTER_BUFFER(src)          (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_JITTER_BUFFER))
+#define RTP_IS_JITTER_BUFFER_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_JITTER_BUFFER))
+#define RTP_JITTER_BUFFER_CAST(src)        ((RTPJitterBuffer *)(src))
+
+/**
+ * RTPJitterBuffer:
+ *
+ * A JitterBuffer in the #RTPSession
+ */
+struct _RTPJitterBuffer {
+  GObject       object;
+
+  GQueue       *packets;
+};
+
+struct _RTPJitterBufferClass {
+  GObjectClass   parent_class;
+};
+
+GType rtp_jitter_buffer_get_type (void);
+
+/* managing lifetime */
+RTPJitterBuffer*      rtp_jitter_buffer_new            (void);
+
+gboolean              rtp_jitter_buffer_insert         (RTPJitterBuffer *jbuf, GstBuffer *buf);
+GstBuffer *           rtp_jitter_buffer_pop            (RTPJitterBuffer *jbuf);
+
+void                  rtp_jitter_buffer_flush          (RTPJitterBuffer *jbuf);
+
+guint                 rtp_jitter_buffer_num_packets    (RTPJitterBuffer *jbuf);
+guint32               rtp_jitter_buffer_get_ts_diff    (RTPJitterBuffer *jbuf);
+
+
+
+#endif /* __RTP_JITTER_BUFFER_H__ */
index 7490791..2b3bcb8 100644 (file)
@@ -271,7 +271,6 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source)
 static void
 on_bye_ssrc (RTPSession * sess, RTPSource * source)
 {
-  /* notify app that reconsideration should be performed */
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
 }
 
@@ -1724,7 +1723,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
       on_bye_timeout (sess, source);
     else
       on_timeout (sess, source);
-
   }
   return remove;
 }