Add RTP session management elements. Still in progress.
authorWim Taymans <wim.taymans@gmail.com>
Tue, 3 Apr 2007 09:13:17 +0000 (09:13 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:23 +0000 (02:30 +0100)
Original commit message from CVS:
* configure.ac:
* gst/rtpmanager/Makefile.am:
* gst/rtpmanager/async_jitter_queue.c: (async_jitter_queue_new),
(signal_waiting_threads), (async_jitter_queue_ref),
(async_jitter_queue_ref_unlocked),
(async_jitter_queue_set_low_threshold),
(async_jitter_queue_set_high_threshold),
(async_jitter_queue_set_max_queue_length),
(async_jitter_queue_get_g_queue), (calculate_ts_diff),
(async_jitter_queue_length_ts_units_unlocked),
(async_jitter_queue_unref_and_unlock), (async_jitter_queue_unref),
(async_jitter_queue_lock), (async_jitter_queue_unlock),
(async_jitter_queue_push), (async_jitter_queue_push_unlocked),
(async_jitter_queue_push_sorted),
(async_jitter_queue_push_sorted_unlocked),
(async_jitter_queue_insert_after_unlocked),
(async_jitter_queue_pop_intern_unlocked), (async_jitter_queue_pop),
(async_jitter_queue_pop_unlocked), (async_jitter_queue_length),
(async_jitter_queue_length_unlocked),
(async_jitter_queue_set_flushing_unlocked),
(async_jitter_queue_unset_flushing_unlocked),
(async_jitter_queue_set_blocking_unlocked):
* gst/rtpmanager/async_jitter_queue.h:
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_base_init),
(gst_rtp_bin_class_init), (gst_rtp_bin_init),
(gst_rtp_bin_finalize), (gst_rtp_bin_set_property),
(gst_rtp_bin_get_property), (gst_rtp_bin_change_state),
(gst_rtp_bin_request_new_pad), (gst_rtp_bin_release_pad):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpclient.c: (new_pad), (create_stream),
(free_stream), (find_stream_by_ssrc), (gst_rtp_client_base_init),
(gst_rtp_client_class_init), (gst_rtp_client_init),
(gst_rtp_client_finalize), (gst_rtp_client_set_property),
(gst_rtp_client_get_property), (gst_rtp_client_change_state),
(gst_rtp_client_request_new_pad), (gst_rtp_client_release_pad):
* gst/rtpmanager/gstrtpclient.h:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_base_init),
(gst_rtp_jitter_buffer_class_init), (gst_rtp_jitter_buffer_init),
(gst_rtp_jitter_buffer_dispose), (gst_rtp_jitter_buffer_getcaps),
(gst_jitter_buffer_sink_setcaps), (free_func),
(gst_rtp_jitter_buffer_flush_start),
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_src_activate_push),
(gst_rtp_jitter_buffer_change_state), (priv_compare_rtp_seq_lt),
(compare_rtp_buffers_seq_num), (gst_rtp_jitter_buffer_sink_event),
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop),
(gst_rtp_jitter_buffer_query),
(gst_rtp_jitter_buffer_set_property),
(gst_rtp_jitter_buffer_get_property):
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpmanager.c: (plugin_init):
* gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_base_init),
(gst_rtp_pt_demux_class_init), (gst_rtp_pt_demux_init),
(gst_rtp_pt_demux_finalize), (gst_rtp_pt_demux_chain),
(gst_rtp_pt_demux_getcaps), (find_pad_for_pt),
(gst_rtp_pt_demux_setup), (gst_rtp_pt_demux_release),
(gst_rtp_pt_demux_change_state):
* gst/rtpmanager/gstrtpptdemux.h:
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (gst_rtp_session_set_property),
(gst_rtp_session_get_property), (gst_rtp_session_change_state),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
(create_recv_rtcp_sink), (create_send_rtp_sink), (create_rtcp_src),
(gst_rtp_session_request_new_pad), (gst_rtp_session_release_pad):
* gst/rtpmanager/gstrtpsession.h:
Add RTP session management elements. Still in progress.

14 files changed:
gst/rtpmanager/Makefile.am [new file with mode: 0644]
gst/rtpmanager/async_jitter_queue.c [new file with mode: 0644]
gst/rtpmanager/async_jitter_queue.h [new file with mode: 0644]
gst/rtpmanager/gstrtpbin.c [new file with mode: 0644]
gst/rtpmanager/gstrtpbin.h [new file with mode: 0644]
gst/rtpmanager/gstrtpclient.c [new file with mode: 0644]
gst/rtpmanager/gstrtpclient.h [new file with mode: 0644]
gst/rtpmanager/gstrtpjitterbuffer.c [new file with mode: 0644]
gst/rtpmanager/gstrtpjitterbuffer.h [new file with mode: 0644]
gst/rtpmanager/gstrtpmanager.c [new file with mode: 0644]
gst/rtpmanager/gstrtpptdemux.c [new file with mode: 0644]
gst/rtpmanager/gstrtpptdemux.h [new file with mode: 0644]
gst/rtpmanager/gstrtpsession.c [new file with mode: 0644]
gst/rtpmanager/gstrtpsession.h [new file with mode: 0644]

diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am
new file mode 100644 (file)
index 0000000..561bf52
--- /dev/null
@@ -0,0 +1,23 @@
+
+plugin_LTLIBRARIES = libgstrtpmanager.la
+
+libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
+                             gstrtpbin.c \
+                             gstrtpclient.c \
+                             async_jitter_queue.c \
+                             gstrtpjitterbuffer.c \
+                             gstrtpptdemux.c \
+                             gstrtpsession.c
+
+noinst_HEADERS = gstrtpbin.h \
+                gstrtpclient.h \
+                 async_jitter_queue.h \
+                gstrtpjitterbuffer.h \
+                gstrtpsession.h
+
+libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)
+libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS)
+libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@
+
+EXTRA_DIST = 
+
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c
new file mode 100644 (file)
index 0000000..22a8ed0
--- /dev/null
@@ -0,0 +1,679 @@
+/* 
+ * Async Jitter Queue based on g_async_queue
+ * This code is GST RTP smart and deals with timestamps
+ *
+ * Farsight Voice+Video library
+ *  Copyright 2007 Collabora Ltd, 
+ *  Copyright 2007 Nokia Corporation
+ *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
+ *
+ *   This is an async queue that has a buffering mecanism based on the set low
+ *   and high threshold. When the lower threshold is reached, the queue will
+ *   fill itself up until the higher threshold is reached before allowing any
+ *   pops to occur. This allows a jitterbuffer of at least min threshold items
+ *   to be available.
+ */
+
+/* GLIB - Library of useful routines for C programming
+ * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
+ * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * MT safe
+ */
+
+#include "config.h"
+
+#include "async_jitter_queue.h"
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define DEFAULT_LOW_THRESHOLD 0.1
+#define DEFAULT_HIGH_THRESHOLD 0.9
+
+struct _AsyncJitterQueue
+{
+  GMutex *mutex;
+  GCond *cond;
+  GQueue *queue;
+  guint waiting_threads;
+  gint32 ref_count;
+  gfloat low_threshold;
+  gfloat high_threshold;
+  guint32 max_queue_length;
+  gboolean buffering;
+  gboolean pop_flushing;
+  gboolean pop_blocking;
+  guint pops_remaining;
+  guint32 tail_buffer_duration;
+};
+
+/**
+ * async_jitter_queue_new:
+ * 
+ * Creates a new asynchronous queue with the initial reference count of 1.
+ * 
+ * Return value: the new #AsyncJitterQueue.
+ **/
+AsyncJitterQueue *
+async_jitter_queue_new (void)
+{
+  AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
+
+  retval->mutex = g_mutex_new ();
+  retval->cond = g_cond_new ();
+  retval->queue = g_queue_new ();
+  retval->waiting_threads = 0;
+  retval->ref_count = 1;
+  retval->low_threshold = DEFAULT_LOW_THRESHOLD;
+  retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
+  retval->buffering = TRUE;     /* we need to buffer initially */
+  retval->pop_flushing = TRUE;
+  retval->pop_blocking = TRUE;
+  retval->pops_remaining = 0;
+  retval->tail_buffer_duration = 0;
+  return retval;
+}
+
+/* checks buffering state and wakes up waiting pops */
+void
+signal_waiting_threads (AsyncJitterQueue * queue)
+{
+  if (async_jitter_queue_length_ts_units_unlocked (queue) >=
+      queue->high_threshold * queue->max_queue_length) {
+    queue->buffering = FALSE;
+  }
+
+  if (queue->waiting_threads > 0) {
+    if (!queue->buffering) {
+      g_cond_signal (queue->cond);
+    }
+  }
+}
+
+/**
+ * async_jitter_queue_ref:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Increases the reference count of the asynchronous @queue by 1. You
+ * do not need to hold the lock to call this function.
+ *
+ * Returns: the @queue that was passed in (since 2.6)
+ **/
+AsyncJitterQueue *
+async_jitter_queue_ref (AsyncJitterQueue * queue)
+{
+  g_return_val_if_fail (queue, NULL);
+  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+  g_atomic_int_inc (&queue->ref_count);
+
+  return queue;
+}
+
+/**
+ * async_jitter_queue_ref_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Increases the reference count of the asynchronous @queue by 1.
+ **/
+void
+async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  g_atomic_int_inc (&queue->ref_count);
+}
+
+/**
+ * async_jitter_queue_set_low_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the lower threshold (fraction of max size)
+ * 
+ * Sets the low threshold on the queue. This threshold indicates the minimum
+ * number of items allowed in the queue before we refill it up to the set
+ * maximum threshold.
+ **/
+void
+async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
+    gfloat threshold)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->low_threshold = threshold;
+}
+
+/**
+ * async_jitter_queue_set_max_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the higher threshold (fraction of max size)
+ * 
+ * Sets the high threshold on the queue. This threshold indicates the amount of
+ * items to fill in the queue before releasing any blocking pop calls. This
+ * blocking mecanism is only triggered when we reach the low threshold and must
+ * refill the queue.
+ **/
+void
+async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
+    gfloat threshold)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->high_threshold = threshold;
+}
+
+/* set the maximum queue length in RTP timestamp units */
+void
+async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
+    guint32 max_length)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->max_queue_length = max_length;
+}
+
+GQueue *
+async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
+{
+  g_return_val_if_fail (queue, NULL);
+
+  return queue->queue;
+}
+
+static guint32
+calculate_ts_diff (guint32 high_ts, guint32 low_ts)
+{
+  /* it needs to work if ts wraps */
+  if (high_ts >= low_ts) {
+    return high_ts - low_ts;
+  } else {
+    return high_ts + G_MAXUINT32 + 1 - low_ts;
+  }
+}
+
+/* this function returns the length of the queue in timestamp units. It will
+ * also add the duration of the last buffer in the queue */
+/* FIXME This function wrongly assumes that there are no missing packets inside
+ * the buffer, in reality it needs to check for gaps and subsctract those from
+ * the total */
+guint32
+async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
+{
+  guint32 tail_ts;
+  guint32 head_ts;
+  guint32 ret;
+  GstBuffer *head;
+  GstBuffer *tail;
+
+  g_return_val_if_fail (queue, 0);
+
+  if (queue->queue->length < 2) {
+    return 0;
+  }
+
+  tail = g_queue_peek_tail (queue->queue);
+  head = g_queue_peek_head (queue->queue);
+
+  if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
+    return 0;
+
+  tail_ts = gst_rtp_buffer_get_timestamp (tail);
+  head_ts = gst_rtp_buffer_get_timestamp (head);
+
+  ret = calculate_ts_diff (head_ts, tail_ts);
+
+  /* let's add the duration of the tail buffer */
+  ret += queue->tail_buffer_duration;
+
+  return ret;
+}
+
+/**
+ * async_jitter_queue_unref_and_unlock:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Decreases the reference count of the asynchronous @queue by 1 and
+ * releases the lock. This function must be called while holding the
+ * @queue's lock. If the reference count went to 0, the @queue will be
+ * destroyed and the memory allocated will be freed.
+ **/
+void
+async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  g_mutex_unlock (queue->mutex);
+  async_jitter_queue_unref (queue);
+}
+
+/**
+ * async_jitter_queue_unref:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Decreases the reference count of the asynchronous @queue by 1. If
+ * the reference count went to 0, the @queue will be destroyed and the
+ * memory allocated will be freed. So you are not allowed to use the
+ * @queue afterwards, as it might have disappeared. You do not need to
+ * hold the lock to call this function.
+ **/
+void
+async_jitter_queue_unref (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  if (g_atomic_int_dec_and_test (&queue->ref_count)) {
+    g_return_if_fail (queue->waiting_threads == 0);
+    g_mutex_free (queue->mutex);
+    if (queue->cond)
+      g_cond_free (queue->cond);
+    g_queue_free (queue->queue);
+    g_free (queue);
+  }
+}
+
+/**
+ * async_jitter_queue_lock:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Acquires the @queue's lock. After that you can only call the
+ * <function>async_jitter_queue_*_unlocked()</function> function variants on that
+ * @queue. Otherwise it will deadlock.
+ **/
+void
+async_jitter_queue_lock (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  g_mutex_lock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_unlock:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Releases the queue's lock.
+ **/
+void
+async_jitter_queue_unlock (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ *
+ * Pushes the @data into the @queue. @data must not be %NULL.
+ **/
+void
+async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+  g_return_if_fail (data);
+
+  g_mutex_lock (queue->mutex);
+  async_jitter_queue_push_unlocked (queue, data);
+  g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ * 
+ * Pushes the @data into the @queue. @data must not be %NULL. This
+ * function must be called while holding the @queue's lock.
+ **/
+void
+async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+  g_return_if_fail (data);
+
+  g_queue_push_head (queue->queue, data);
+
+  signal_waiting_threads (queue);
+}
+
+/**
+ * async_jitter_queue_push_sorted:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ *     is passed two elements of the @queue. The function should return
+ *     0 if they are equal, a negative value if the first element
+ *     should be higher in the @queue or a positive value if the first
+ *     element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ * 
+ * Inserts @data into @queue using @func to determine the new
+ * position. 
+ * 
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ * 
+ * This function will lock @queue before it sorts the queue and unlock
+ * it when it is finished.
+ * 
+ * For an example of @func see async_jitter_queue_sort(). 
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
+    gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+  g_return_val_if_fail (queue != NULL, FALSE);
+  gboolean ret;
+
+  g_mutex_lock (queue->mutex);
+  ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
+  g_mutex_unlock (queue->mutex);
+
+  return ret;
+}
+
+/**
+ * async_jitter_queue_push_sorted_unlocked:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ *     is passed two elements of the @queue. The function should return
+ *     0 if they are equal, a negative value if the first element
+ *     should be higher in the @queue or a positive value if the first
+ *     element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ * 
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ * 
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * If @GCompareDataFunc returns 0, this function does not insert @data and
+ * return FALSE.
+ * 
+ * This function is called while holding the @queue's lock.
+ * 
+ * For an example of @func see async_jitter_queue_sort(). 
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
+    gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+  GList *list;
+  gint func_ret = TRUE;
+
+  g_return_val_if_fail (queue != NULL, FALSE);
+
+  list = queue->queue->head;
+  while (list && (func_ret = func (list->data, data, user_data)) < 0)
+    list = list->next;
+
+  if (func_ret == 0) {
+    return FALSE;
+  }
+  if (list) {
+    g_queue_insert_before (queue->queue, list, data);
+  } else {
+    g_queue_push_tail (queue->queue, data);
+  }
+
+  signal_waiting_threads (queue);
+  return TRUE;
+}
+
+void
+async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
+    GList * sibling, gpointer data)
+{
+  g_return_if_fail (queue != NULL);
+
+  g_queue_insert_before (queue->queue, sibling, data);
+
+  signal_waiting_threads (queue);
+}
+
+static gpointer
+async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
+{
+  gpointer retval;
+  GstBuffer *tail_buffer = NULL;
+
+  if (queue->pop_flushing)
+    return NULL;
+
+  while (queue->pop_blocking) {
+    queue->waiting_threads++;
+    g_cond_wait (queue->cond, queue->mutex);
+    queue->waiting_threads--;
+    if (queue->pop_flushing)
+      return NULL;
+  }
+
+  if (async_jitter_queue_length_ts_units_unlocked (queue) <=
+      queue->low_threshold * queue->max_queue_length
+      && queue->pops_remaining == 0) {
+    if (!queue->buffering) {
+      queue->buffering = TRUE;
+      queue->pops_remaining = queue->queue->length;
+    } else {
+      while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
+        queue->waiting_threads++;
+        g_cond_wait (queue->cond, queue->mutex);
+        queue->waiting_threads--;
+        if (queue->pop_flushing)
+          return NULL;
+      }
+    }
+  }
+
+  retval = g_queue_pop_tail (queue->queue);
+  if (queue->pops_remaining)
+    queue->pops_remaining--;
+
+  tail_buffer = g_queue_peek_tail (queue->queue);
+  if (tail_buffer) {
+    if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
+      queue->tail_buffer_duration = 0;
+    } else if (gst_rtp_buffer_get_seq (tail_buffer)
+        - gst_rtp_buffer_get_seq (retval) == 1) {
+      queue->tail_buffer_duration =
+          calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
+          gst_rtp_buffer_get_timestamp (retval));
+    } else {
+      /* There is a sequence number gap -> we can't calculate the duration
+       * let's just set it to 0 */
+      queue->tail_buffer_duration = 0;
+    }
+  }
+
+  g_assert (retval);
+
+  return retval;
+}
+
+/**
+ * async_jitter_queue_pop:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Pops data from the @queue. This function blocks until data become
+ * available. If pop is disabled, tis function return NULL.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop (AsyncJitterQueue * queue)
+{
+  gpointer retval;
+
+  g_return_val_if_fail (queue, NULL);
+  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+  g_mutex_lock (queue->mutex);
+  retval = async_jitter_queue_pop_intern_unlocked (queue);
+  g_mutex_unlock (queue->mutex);
+
+  return retval;
+}
+
+/**
+ * async_jitter_queue_pop_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Pops data from the @queue. This function blocks until data become
+ * available. This function must be called while holding the @queue's
+ * lock.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
+{
+  g_return_val_if_fail (queue, NULL);
+  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+  return async_jitter_queue_pop_intern_unlocked (queue);
+}
+
+/**
+ * async_jitter_queue_length:
+ * @queue: a #AsyncJitterQueue.
+ * 
+ * Returns the length of the queue
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length (AsyncJitterQueue * queue)
+{
+  gint retval;
+
+  g_return_val_if_fail (queue, 0);
+  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+  g_mutex_lock (queue->mutex);
+  retval = queue->queue->length;
+  g_mutex_unlock (queue->mutex);
+
+  return retval;
+}
+
+/**
+ * async_jitter_queue_length_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Returns the length of the queue.
+ *
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
+{
+  g_return_val_if_fail (queue, 0);
+  g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+  return queue->queue->length;
+}
+
+/**
+ * async_jitter_queue_set_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ * 
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
+    GFunc free_func, gpointer user_data)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->pop_flushing = TRUE;
+  /* let's unblock any remaining pops */
+  if (queue->waiting_threads > 0)
+    g_cond_broadcast (queue->cond);
+  /* free data from queue */
+  g_queue_foreach (queue->queue, free_func, user_data);
+}
+
+/**
+ * async_jitter_queue_unset_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ * 
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->pop_flushing = FALSE;
+  /* let's unblock any remaining pops */
+  if (queue->waiting_threads > 0)
+    g_cond_broadcast (queue->cond);
+}
+
+/**
+ * async_jitter_queue_set_blocking_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @enabled: a boolean to enable/disable blocking
+ * 
+ * This function is used to enable/disable blocking. If blocking is enabled any
+ * pops will be blocked until the queue is unblocked. The queue is blocked by
+ * default.
+ */
+void
+async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
+    gboolean blocking)
+{
+  g_return_if_fail (queue);
+  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+  queue->pop_blocking = blocking;
+  /* let's unblock any remaining pops */
+  if (queue->waiting_threads > 0)
+    g_cond_broadcast (queue->cond);
+}
diff --git a/gst/rtpmanager/async_jitter_queue.h b/gst/rtpmanager/async_jitter_queue.h
new file mode 100644 (file)
index 0000000..bea76d6
--- /dev/null
@@ -0,0 +1,130 @@
+/* Async Jitter Queue based on g_async_queue
+ *
+ * Farsight Voice+Video library
+ *  Copyright 2007 Collabora Ltd, 
+ *  Copyright 2007 Nokia Corporation
+ *   @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
+ */
+
+/* GLIB - Library of useful routines for C programming
+ * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
+ * file for a list of people on the GLib Team.  See the ChangeLog
+ * files for a list of changes.  These files are distributed with
+ * GLib at ftp://ftp.gtk.org/pub/gtk/. 
+ */
+
+#ifndef __ASYNCJITTERQUEUE_H__
+#define __ASYNCJITTERQUEUE_H__
+
+#include <glib.h>
+#include <glib/gthread.h>
+
+G_BEGIN_DECLS
+
+typedef struct _AsyncJitterQueue     AsyncJitterQueue;
+
+/* Asyncronous Queues, can be used to communicate between threads
+ */
+
+/* Get a new AsyncJitterQueue with the ref_count 1 */
+AsyncJitterQueue*  async_jitter_queue_new                (void);
+
+/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for
+ * themselves, but in certain cirumstances you want to hold the lock longer,
+ * thus you lock the queue, call the *_unlocked functions and unlock it again.
+ */
+void          async_jitter_queue_lock               (AsyncJitterQueue *queue);
+void          async_jitter_queue_unlock             (AsyncJitterQueue *queue);
+
+/* Ref and unref the AsyncJitterQueue. */
+AsyncJitterQueue*  async_jitter_queue_ref           (AsyncJitterQueue *queue);
+void          async_jitter_queue_unref              (AsyncJitterQueue *queue);
+#ifndef G_DISABLE_DEPRECATED
+/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
+void          async_jitter_queue_ref_unlocked       (AsyncJitterQueue *queue);
+void          async_jitter_queue_unref_and_unlock   (AsyncJitterQueue *queue);
+#endif /* !G_DISABLE_DEPRECATED */
+
+void          async_jitter_queue_set_low_threshold  (AsyncJitterQueue *queue,
+                                                gfloat threshold);
+void          async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue,
+                                                gfloat threshold);
+
+void          async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue,
+                                                guint32 max_length);
+
+/* Push data into the async queue. Must not be NULL. */
+void          async_jitter_queue_push               (AsyncJitterQueue *queue,
+                                                gpointer     data);
+void          async_jitter_queue_push_unlocked      (AsyncJitterQueue *queue,
+                                                gpointer     data);
+gboolean      async_jitter_queue_push_sorted        (AsyncJitterQueue *queue,
+                                                gpointer          data,
+                                                GCompareDataFunc  func,
+                                                gpointer          user_data);
+
+void          async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue,
+                                                GList *sibling,
+                                                gpointer data);
+
+gboolean      async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue,
+                                                gpointer          data,
+                                                GCompareDataFunc  func,
+                                                gpointer          user_data);
+
+/* Pop data from the async queue. When no data is there, the thread is blocked
+ * until data arrives. */
+gpointer      async_jitter_queue_pop                (AsyncJitterQueue *queue);
+gpointer      async_jitter_queue_pop_unlocked       (AsyncJitterQueue *queue);
+
+/* Try to pop data. NULL is returned in case of empty queue. */
+gpointer      async_jitter_queue_try_pop            (AsyncJitterQueue *queue);
+gpointer      async_jitter_queue_try_pop_unlocked   (AsyncJitterQueue *queue);
+
+/* Wait for data until at maximum until end_time is reached. NULL is returned
+ * in case of empty queue. */
+gpointer      async_jitter_queue_timed_pop          (AsyncJitterQueue *queue,
+                                                GTimeVal    *end_time);
+gpointer      async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue,
+                                                GTimeVal    *end_time);
+
+/* Return the length of the queue. Negative values mean that threads
+ * are waiting, positve values mean that there are entries in the
+ * queue. Actually this function returns the length of the queue minus
+ * the number of waiting threads, async_jitter_queue_length == 0 could also
+ * mean 'n' entries in the queue and 'n' thread waiting. Such can
+ * happen due to locking of the queue or due to scheduling. */
+gint          async_jitter_queue_length             (AsyncJitterQueue *queue);
+gint          async_jitter_queue_length_unlocked    (AsyncJitterQueue *queue);
+
+void          async_jitter_queue_set_flushing_unlocked   (AsyncJitterQueue* queue, 
+                                                          GFunc free_func, gpointer user_data);
+void          async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue);
+void          async_jitter_queue_set_blocking_unlocked   (AsyncJitterQueue* queue,
+                                                          gboolean blocking);
+guint32
+async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue);
+
+G_END_DECLS
+
+#endif /* __ASYNCJITTERQUEUE_H__ */
+
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
new file mode 100644 (file)
index 0000000..629f098
--- /dev/null
@@ -0,0 +1,279 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpbin
+ * @short_description: handle media from one RTP bin
+ * @see_also: rtpjitterbuffer, rtpclient, rtpsession
+ *
+ * <refsect2>
+ * <para>
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <string.h>
+
+#include "gstrtpbin.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
+    "Filter/Editor/Video",
+    "Implement an RTP bin",
+    "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+/* src pads */
+static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
+    GST_PAD_SRC,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+static GstStaticPadTemplate rtpbin_send_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+#define GST_RTP_BIN_GET_PRIVATE(obj)  \
+   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRTPBinPrivate))
+
+struct _GstRTPBinPrivate
+{
+};
+
+/* signals and args */
+enum
+{
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_bin_finalize (GObject * object);
+static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
+    GstStateChange transition);
+static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPBin, gst_rtp_bin, GstBin, GST_TYPE_BIN);
+
+static void
+gst_rtp_bin_base_init (gpointer klass)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  /* sink pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
+
+  /* src pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
+
+  gst_element_class_set_details (element_class, &rtpbin_details);
+}
+
+static void
+gst_rtp_bin_class_init (GstRTPBinClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  g_type_class_add_private (klass, sizeof (GstRTPBinPrivate));
+
+  gobject_class->finalize = gst_rtp_bin_finalize;
+  gobject_class->set_property = gst_rtp_bin_set_property;
+  gobject_class->get_property = gst_rtp_bin_get_property;
+
+  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
+  gstelement_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
+  gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
+}
+
+static void
+gst_rtp_bin_init (GstRTPBin * rtpbin, GstRTPBinClass * klass)
+{
+  rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
+}
+
+static void
+gst_rtp_bin_finalize (GObject * object)
+{
+  GstRTPBin *rtpbin;
+
+  rtpbin = GST_RTP_BIN (object);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_bin_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstRTPBin *rtpbin;
+
+  rtpbin = GST_RTP_BIN (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_rtp_bin_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstRTPBin *rtpbin;
+
+  rtpbin = GST_RTP_BIN (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static GstStateChangeReturn
+gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
+{
+  GstStateChangeReturn res;
+  GstRTPBin *rtpbin;
+
+  rtpbin = GST_RTP_BIN (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    default:
+      break;
+  }
+
+  res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
+  return res;
+}
+
+/* 
+ */
+static GstPad *
+gst_rtp_bin_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name)
+{
+  GstRTPBin *rtpbin;
+  GstElementClass *klass;
+
+  g_return_val_if_fail (templ != NULL, NULL);
+  g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
+
+  rtpbin = GST_RTP_BIN (element);
+  klass = GST_ELEMENT_GET_CLASS (element);
+
+  return NULL;
+}
+
+static void
+gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h
new file mode 100644 (file)
index 0000000..5b3d795
--- /dev/null
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_BIN_H__
+#define __GST_RTP_BIN_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_BIN \
+  (gst_rtp_bin_get_type())
+#define GST_RTP_BIN(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_BIN,GstRTPBin))
+#define GST_RTP_BIN_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_BIN,GstRTPBinClass))
+#define GST_IS_RTP_BIN(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_BIN))
+#define GST_IS_RTP_BIN_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_BIN))
+
+typedef struct _GstRTPBin GstRTPBin;
+typedef struct _GstRTPBinClass GstRTPBinClass;
+typedef struct _GstRTPBinPrivate GstRTPBinPrivate;
+
+struct _GstRTPBin {
+  GstBin         element;
+
+  /* a list of streams from a client */
+  GList         *streams;
+
+  /*< private >*/
+  GstRTPBinPrivate *priv;
+};
+
+struct _GstRTPBinClass {
+  GstBinClass  parent_class;
+};
+
+GType gst_rtp_bin_get_type (void);
+
+#endif /* __GST_RTP_BIN_H__ */
diff --git a/gst/rtpmanager/gstrtpclient.c b/gst/rtpmanager/gstrtpclient.c
new file mode 100644 (file)
index 0000000..2984d3f
--- /dev/null
@@ -0,0 +1,482 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpclient
+ * @short_description: handle media from one RTP client
+ * @see_also: rtpjitterbuffer, rtpbin, rtpsession
+ *
+ * <refsect2>
+ * <para>
+ * This element handles RTP data from one client. It accepts multiple RTP streams that
+ * should be synchronized together.
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <string.h>
+
+#include "gstrtpclient.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpclient_details =
+GST_ELEMENT_DETAILS ("RTP Client",
+    "Filter/Editor/Video",
+    "Implement an RTP client",
+    "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpclient_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("rtp_sink_%d",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpclient_sync_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sync_sink_%d",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+/* src pads */
+static GstStaticPadTemplate rtpclient_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("rtp_src_%d_%d",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+#define GST_RTP_CLIENT_GET_PRIVATE(obj)  \
+   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_CLIENT, GstRTPClientPrivate))
+
+struct _GstRTPClientPrivate
+{
+};
+
+/* all the info needed to handle the stream with SSRC */
+typedef struct
+{
+  GstRTPClient *client;
+
+  /* the SSRC of this stream */
+  guint32 ssrc;
+
+  /* RTP and RTCP in */
+  GstPad *rtp_sink;
+  GstPad *sync_sink;
+
+  /* the jitterbuffer */
+  GstElement *jitterbuffer;
+  /* the payload demuxer */
+  GstElement *ptdemux;
+  /* the new-pad signal */
+  gulong new_pad_sig;
+} GstRTPClientStream;
+
+/* the PT demuxer found a new payload type */
+static void
+new_pad (GstElement * element, GstPad * pad, GstRTPClientStream * stream)
+{
+}
+
+/* create a new stream for SSRC.
+ *
+ * We create a jitterbuffer and an payload demuxer for the SSRC. The sinkpad of
+ * the jitterbuffer is ghosted to the bin. We connect a pad-added signal to
+ * rtpptdemux so that we can ghost the payload pads outside.
+ *
+ *       +-----------------+     +---------------+
+ *       | rtpjitterbuffer |     |  rtpptdemux   |
+ *   +- sink              src - sink             |
+ *  /    +-----------------+     +---------------+
+ *
+ */
+static GstRTPClientStream *
+create_stream (GstRTPClient * rtpclient, guint32 ssrc)
+{
+  GstRTPClientStream *stream;
+  gchar *name;
+  GstPad *srcpad, *sinkpad;
+  GstPadLinkReturn res;
+
+  stream = g_new0 (GstRTPClientStream, 1);
+  stream->ssrc = ssrc;
+  stream->client = rtpclient;
+
+  stream->jitterbuffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
+  if (!stream->jitterbuffer)
+    goto no_jitterbuffer;
+
+  stream->ptdemux = gst_element_factory_make ("rtpptdemux", NULL);
+  if (!stream->ptdemux)
+    goto no_ptdemux;
+
+  /* add elements to bin */
+  gst_bin_add (GST_BIN_CAST (rtpclient), stream->jitterbuffer);
+  gst_bin_add (GST_BIN_CAST (rtpclient), stream->ptdemux);
+
+  /* link jitterbuffer and PT demuxer */
+  srcpad = gst_element_get_pad (stream->jitterbuffer, "src");
+  sinkpad = gst_element_get_pad (stream->ptdemux, "sink");
+  res = gst_pad_link (srcpad, sinkpad);
+  gst_object_unref (srcpad);
+  gst_object_unref (sinkpad);
+
+  if (res != GST_PAD_LINK_OK)
+    goto could_not_link;
+
+  /* add stream to list */
+  rtpclient->streams = g_list_prepend (rtpclient->streams, stream);
+
+  /* ghost sinkpad */
+  name = g_strdup_printf ("rtp_sink_%d", ssrc);
+  sinkpad = gst_element_get_pad (stream->jitterbuffer, "sink");
+  stream->rtp_sink = gst_ghost_pad_new (name, sinkpad);
+  gst_object_unref (sinkpad);
+  g_free (name);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->rtp_sink);
+
+  /* add signal to ptdemuxer */
+  stream->new_pad_sig =
+      g_signal_connect (G_OBJECT (stream->ptdemux), "pad-added",
+      G_CALLBACK (new_pad), stream);
+
+  return stream;
+
+  /* ERRORS */
+no_jitterbuffer:
+  {
+    g_free (stream);
+    g_warning ("could not create rtpjitterbuffer element");
+    return NULL;
+  }
+no_ptdemux:
+  {
+    gst_object_unref (stream->jitterbuffer);
+    g_free (stream);
+    g_warning ("could not create rtpptdemux element");
+    return NULL;
+  }
+could_not_link:
+  {
+    gst_bin_remove (GST_BIN_CAST (rtpclient), stream->jitterbuffer);
+    gst_bin_remove (GST_BIN_CAST (rtpclient), stream->ptdemux);
+    g_free (stream);
+    g_warning ("could not link jitterbuffer and rtpptdemux element");
+    return NULL;
+  }
+}
+
+#if 0
+static void
+free_stream (GstRTPClientStream * stream)
+{
+  gst_object_unref (stream->jitterbuffer);
+  g_free (stream);
+}
+#endif
+
+/* find the stream for the given SSRC, return NULL if the stream did not exist
+ */
+static GstRTPClientStream *
+find_stream_by_ssrc (GstRTPClient * client, guint32 ssrc)
+{
+  GstRTPClientStream *stream;
+  GList *walk;
+
+  for (walk = client->streams; walk; walk = g_list_next (walk)) {
+    stream = (GstRTPClientStream *) walk->data;
+    if (stream->ssrc == ssrc)
+      return stream;
+  }
+  return NULL;
+}
+
+/* signals and args */
+enum
+{
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_client_finalize (GObject * object);
+static void gst_rtp_client_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_rtp_client_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_client_change_state (GstElement * element,
+    GstStateChange transition);
+static GstPad *gst_rtp_client_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_client_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_client_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPClient, gst_rtp_client, GstBin, GST_TYPE_BIN);
+
+static void
+gst_rtp_client_base_init (gpointer klass)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  /* sink pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpclient_rtp_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpclient_sync_sink_template));
+
+  /* src pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpclient_rtp_src_template));
+
+  gst_element_class_set_details (element_class, &rtpclient_details);
+}
+
+static void
+gst_rtp_client_class_init (GstRTPClientClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  g_type_class_add_private (klass, sizeof (GstRTPClientPrivate));
+
+  gobject_class->finalize = gst_rtp_client_finalize;
+  gobject_class->set_property = gst_rtp_client_set_property;
+  gobject_class->get_property = gst_rtp_client_get_property;
+
+  gstelement_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_rtp_client_change_state);
+  gstelement_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_client_request_new_pad);
+  gstelement_class->release_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_client_release_pad);
+}
+
+static void
+gst_rtp_client_init (GstRTPClient * rtpclient, GstRTPClientClass * klass)
+{
+  rtpclient->priv = GST_RTP_CLIENT_GET_PRIVATE (rtpclient);
+}
+
+static void
+gst_rtp_client_finalize (GObject * object)
+{
+  GstRTPClient *rtpclient;
+
+  rtpclient = GST_RTP_CLIENT (object);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_client_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstRTPClient *rtpclient;
+
+  rtpclient = GST_RTP_CLIENT (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_rtp_client_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstRTPClient *rtpclient;
+
+  rtpclient = GST_RTP_CLIENT (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static GstStateChangeReturn
+gst_rtp_client_change_state (GstElement * element, GstStateChange transition)
+{
+  GstStateChangeReturn res;
+  GstRTPClient *rtpclient;
+
+  rtpclient = GST_RTP_CLIENT (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    default:
+      break;
+  }
+
+  res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
+  return res;
+}
+
+/* We have 2 request pads (rtp_sink_%d and sync_sink_%d), the %d is assumed to
+ * be the SSRC of the stream.
+ *
+ * We require that the rtp pad is requested first for a particular SSRC, then
+ * (optionaly) the sync pad can be requested. If no sync pad is requested, no
+ * sync information can be exchanged for this stream.
+ */
+static GstPad *
+gst_rtp_client_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name)
+{
+  GstRTPClient *rtpclient;
+  GstElementClass *klass;
+  GstPadTemplate *rtp_sink_templ, *sync_sink_templ;
+  guint32 ssrc;
+  GstRTPClientStream *stream;
+  GstPad *result;
+
+  g_return_val_if_fail (templ != NULL, NULL);
+  g_return_val_if_fail (GST_IS_RTP_CLIENT (element), NULL);
+
+  if (templ->direction != GST_PAD_SINK)
+    goto wrong_direction;
+
+  rtpclient = GST_RTP_CLIENT (element);
+  klass = GST_ELEMENT_GET_CLASS (element);
+
+  /* figure out the template */
+  rtp_sink_templ = gst_element_class_get_pad_template (klass, "rtp_sink_%d");
+  sync_sink_templ = gst_element_class_get_pad_template (klass, "sync_sink_%d");
+
+  if (templ != rtp_sink_templ && templ != sync_sink_templ)
+    goto wrong_template;
+
+  if (templ == rtp_sink_templ) {
+    /* create new rtp sink pad. If a stream with the pad number already exists
+     * we have an error, else we create the sinkpad, add a jitterbuffer and
+     * ptdemuxer. */
+    if (name == NULL || strlen (name) < 9)
+      goto no_name;
+
+    ssrc = atoi (&name[9]);
+
+    /* see if a stream with that name exists, if so we have an error. */
+    stream = find_stream_by_ssrc (rtpclient, ssrc);
+    if (stream != NULL)
+      goto stream_exists;
+
+    /* ok, create new stream */
+    stream = create_stream (rtpclient, ssrc);
+    if (stream == NULL)
+      goto stream_not_found;
+
+    result = stream->rtp_sink;
+  } else {
+    /* create new rtp sink pad. We can only do this if the RTP pad was
+     * requested before, meaning the session with the padnumber must exist. */
+    if (name == NULL || strlen (name) < 10)
+      goto no_name;
+
+    ssrc = atoi (&name[10]);
+
+    /* find stream */
+    stream = find_stream_by_ssrc (rtpclient, ssrc);
+    if (stream == NULL)
+      goto stream_not_found;
+
+    stream->sync_sink =
+        gst_pad_new_from_static_template (&rtpclient_sync_sink_template, name);
+    gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->sync_sink);
+
+    result = stream->sync_sink;
+  }
+
+  return result;
+
+  /* ERRORS */
+wrong_direction:
+  {
+    g_warning ("rtpclient: request pad that is not a SINK pad");
+    return NULL;
+  }
+wrong_template:
+  {
+    g_warning ("rtpclient: this is not our template");
+    return NULL;
+  }
+no_name:
+  {
+    g_warning ("rtpclient: no padname was specified");
+    return NULL;
+  }
+stream_exists:
+  {
+    g_warning ("rtpclient: stream with SSRC %d already registered", ssrc);
+    return NULL;
+  }
+stream_not_found:
+  {
+    g_warning ("rtpclient: stream with SSRC %d not yet registered", ssrc);
+    return NULL;
+  }
+}
+
+static void
+gst_rtp_client_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpclient.h b/gst/rtpmanager/gstrtpclient.h
new file mode 100644 (file)
index 0000000..de837fd
--- /dev/null
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_CLIENT_H__
+#define __GST_RTP_CLIENT_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_CLIENT \
+  (gst_rtp_client_get_type())
+#define GST_RTP_CLIENT(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_CLIENT,GstRTPClient))
+#define GST_RTP_CLIENT_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_CLIENT,GstRTPClientClass))
+#define GST_IS_RTP_CLIENT(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_CLIENT))
+#define GST_IS_RTP_CLIENT_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_CLIENT))
+
+typedef struct _GstRTPClient GstRTPClient;
+typedef struct _GstRTPClientClass GstRTPClientClass;
+typedef struct _GstRTPClientPrivate GstRTPClientPrivate;
+
+struct _GstRTPClient {
+  GstBin         parent_bin;
+
+  /* a list of streams from a client */
+  GList         *streams;
+
+  /*< private >*/
+  GstRTPClientPrivate *priv;
+};
+
+struct _GstRTPClientClass {
+  GstBinClass   parent_class;
+};
+
+GType gst_rtp_client_get_type (void);
+
+#endif /* __GST_RTP_CLIENT_H__ */
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
new file mode 100644 (file)
index 0000000..58b48a3
--- /dev/null
@@ -0,0 +1,1085 @@
+/*
+ * Farsight Voice+Video library
+ *
+ *  Copyright 2007 Collabora Ltd, 
+ *  Copyright 2007 Nokia Corporation
+ *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
+ *  Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ */
+
+/**
+ * SECTION:element-rtpjitterbuffer
+ * @short_description: buffer, reorder and remove duplicate RTP packets to
+ * compensate for network oddities.
+ *
+ * <refsect2>
+ * <para>
+ * This element reorders and removes duplicate RTP packets as they are received
+ * from a network source. It will also wait for missing packets up to a
+ * configurable time limit using the ::latency property. Packets arriving too
+ * late are considered as lost packets.
+ * </para>
+ * <para>
+ * This element acts as a live element and so adds ::latency to the pipeline.
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
+ * </programlisting>
+ * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
+ * inserted into the pipeline to smooth out network jitter and to reorder the
+ * out-of-order RTP packets.
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-03-27 (0.10.13)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+#include <gst/rtp/gstrtpbuffer.h>
+#include "gstrtpjitterbuffer.h"
+
+#include "async_jitter_queue.h"
+
+GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
+#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
+
+/* low and high threshold tell the queue when to start and stop buffering */
+#define LOW_THRESHOLD 0.2
+#define HIGH_THRESHOLD 0.8
+
+/* elementfactory information */
+static const GstElementDetails gst_rtp_jitter_buffer_details =
+GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
+    "Filter/Network",
+    "A buffer that deals with network jitter and other transmission faults",
+    "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
+    "Wim Taymans <wim@fluendo.com>");
+
+/* RTPJitterBuffer signals and args */
+enum
+{
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+#define DEFAULT_LATENCY_MS      200
+#define DEFAULT_DROP_ON_LATENCY FALSE
+
+enum
+{
+  ARG_0,
+  ARG_LATENCY,
+  ARG_DROP_ON_LATENCY
+};
+
+struct _GstRTPJitterBufferPrivate
+{
+  GstPad *sinkpad, *srcpad;
+
+  AsyncJitterQueue *queue;
+
+  /* properties */
+  guint latency_ms;
+  gboolean drop_on_latency;
+
+  /* the last seqnum we pushed out */
+  guint32 last_popped_seqnum;
+  /* the next expected seqnum */
+  guint32 next_seqnum;
+
+  /* clock rate and rtp timestamp offset */
+  gint32 clock_rate;
+  guint64 clock_base;
+
+  /* when we are shutting down */
+  GstFlowReturn srcresult;
+
+  /* for sync */
+  GstSegment segment;
+  GstClockID clock_id;
+  guint32 waiting_seqnum;
+
+  /* some accounting */
+  guint64 num_late;
+  guint64 num_duplicates;
+};
+
+#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
+  (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
+                                GstRTPJitterBufferPrivate))
+
+static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("application/x-rtp, "
+        "clock-rate = (int) [ 1, 2147483647 ]"
+        /* "payload = (int) , "
+         * "encoding-name = (string) "
+         */ )
+    );
+
+static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
+GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("application/x-rtp"
+        /* "payload = (int) , "
+         * "clock-rate = (int) , "
+         * "encoding-name = (string) "
+         */ )
+    );
+
+GST_BOILERPLATE (GstRTPJitterBuffer, gst_rtp_jitter_buffer, GstElement,
+    GST_TYPE_ELEMENT);
+
+/* object overrides */
+static void gst_rtp_jitter_buffer_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtp_jitter_buffer_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec);
+static void gst_rtp_jitter_buffer_dispose (GObject * object);
+
+/* element overrides */
+static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
+    * element, GstStateChange transition);
+
+/* pad overrides */
+static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
+
+/* sinkpad overrides */
+static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
+    GstEvent * event);
+static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
+    GstBuffer * buffer);
+
+/* srcpad overrides */
+static gboolean
+gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
+static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer);
+static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
+
+static void
+gst_rtp_jitter_buffer_base_init (gpointer klass)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
+  gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
+}
+
+static void
+gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  g_type_class_add_private (klass, sizeof (GstRTPJitterBufferPrivate));
+
+  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose);
+
+  gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
+  gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
+
+  g_object_class_install_property (gobject_class, ARG_LATENCY,
+      g_param_spec_uint ("latency", "Buffer latency in ms",
+          "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
+          G_PARAM_READWRITE));
+
+  g_object_class_install_property (gobject_class, ARG_DROP_ON_LATENCY,
+      g_param_spec_boolean ("drop_on_latency",
+          "Drop buffers when maximum latency is reached",
+          "Tells the jitterbuffer to never exceed the given latency in size",
+          DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
+
+  gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
+
+  GST_DEBUG_CATEGORY_INIT
+      (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+}
+
+static void
+gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
+    GstRTPJitterBufferClass * klass)
+{
+  GstRTPJitterBufferPrivate *priv;
+
+  priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
+  jitterbuffer->priv = priv;
+
+  priv->latency_ms = DEFAULT_LATENCY_MS;
+  priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+
+  priv->queue = async_jitter_queue_new ();
+  async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
+  async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
+
+  priv->waiting_seqnum = -1;
+
+  priv->srcpad =
+      gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
+      "src");
+
+  gst_pad_set_activatepush_function (priv->srcpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
+  gst_pad_set_query_function (priv->srcpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
+  gst_pad_set_getcaps_function (priv->srcpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+
+  priv->sinkpad =
+      gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
+      "sink");
+
+  gst_pad_set_chain_function (priv->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
+  gst_pad_set_event_function (priv->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
+  gst_pad_set_setcaps_function (priv->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
+  gst_pad_set_getcaps_function (priv->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+
+  gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
+  gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
+}
+
+static void
+gst_rtp_jitter_buffer_dispose (GObject * object)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+  if (jitterbuffer->priv->queue) {
+    async_jitter_queue_unref (jitterbuffer->priv->queue);
+    jitterbuffer->priv->queue = NULL;
+  }
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static GstCaps *
+gst_rtp_jitter_buffer_getcaps (GstPad * pad)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+  GstPad *other;
+  GstCaps *caps;
+  const GstCaps *templ;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
+
+  caps = gst_pad_peer_get_caps (other);
+
+  templ = gst_pad_get_pad_template_caps (pad);
+  if (caps == NULL) {
+    GST_DEBUG_OBJECT (jitterbuffer, "copy template");
+    caps = gst_caps_copy (templ);
+  } else {
+    GstCaps *intersect;
+
+    GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
+
+    intersect = gst_caps_intersect (caps, templ);
+    gst_caps_unref (caps);
+
+    caps = intersect;
+  }
+  gst_object_unref (jitterbuffer);
+
+  return caps;
+}
+
+static gboolean
+gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+  GstStructure *caps_struct;
+  const GValue *value;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  /* first parse the caps */
+  caps_struct = gst_caps_get_structure (caps, 0);
+
+  /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
+   * measure the amount of data in the buffer */
+  if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
+    goto error;
+
+  if (priv->clock_rate <= 0)
+    goto wrong_rate;
+
+  /* gah, clock-base is uint. If we don't have a base, we will use the first
+   * buffer timestamp as the base time. This will screw up sync but it's better
+   * than nothing. */
+  value = gst_structure_get_value (caps_struct, "clock-base");
+  if (value && G_VALUE_HOLDS_UINT (value))
+    priv->clock_base = g_value_get_uint (value);
+  else
+    priv->clock_base = -1;
+
+  /* first expected seqnum */
+  value = gst_structure_get_value (caps_struct, "seqnum-base");
+  if (value && G_VALUE_HOLDS_UINT (value))
+    priv->next_seqnum = g_value_get_uint (value);
+  else
+    priv->next_seqnum = -1;
+
+  async_jitter_queue_set_max_queue_length (priv->queue,
+      priv->latency_ms * priv->clock_rate / 1000);
+
+  /* set same caps on srcpad */
+  gst_pad_set_caps (priv->srcpad, caps);
+
+  gst_object_unref (jitterbuffer);
+
+  return TRUE;
+
+  /* ERRORS */
+error:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+    gst_object_unref (jitterbuffer);
+    return FALSE;
+  }
+wrong_rate:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
+    gst_object_unref (jitterbuffer);
+    return FALSE;
+  }
+}
+
+static void
+free_func (gpointer data, GstRTPJitterBuffer * user_data)
+{
+  if (GST_IS_BUFFER (data))
+    gst_buffer_unref (GST_BUFFER_CAST (data));
+  else
+    gst_event_unref (GST_EVENT_CAST (data));
+}
+
+static void
+gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
+{
+  GstRTPJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  async_jitter_queue_lock (priv->queue);
+  /* mark ourselves as flushing */
+  priv->srcresult = GST_FLOW_WRONG_STATE;
+  GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
+  /* this unblocks any waiting pops on the src pad task */
+  async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
+      (GFunc) free_func, jitterbuffer);
+  /* unlock clock, we just unschedule, the entry will be released by the 
+   * locking streaming thread. */
+  if (priv->clock_id)
+    gst_clock_id_unschedule (priv->clock_id);
+
+  async_jitter_queue_unlock (priv->queue);
+}
+
+static void
+gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
+{
+  GstRTPJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  async_jitter_queue_lock (priv->queue);
+  GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
+  /* Mark as non flushing */
+  priv->srcresult = GST_FLOW_OK;
+  gst_segment_init (&priv->segment, GST_FORMAT_TIME);
+  priv->last_popped_seqnum = -1;
+  priv->next_seqnum = -1;
+  /* allow pops from the src pad task */
+  async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
+  async_jitter_queue_unlock (priv->queue);
+}
+
+static gboolean
+gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
+{
+  gboolean result = TRUE;
+  GstRTPJitterBuffer *jitterbuffer = NULL;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+
+  if (active) {
+    /* allow data processing */
+    gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
+
+    /* start pushing out buffers */
+    GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
+    gst_pad_start_task (jitterbuffer->priv->srcpad,
+        (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
+  } else {
+    /* make sure all data processing stops ASAP */
+    gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+
+    /* NOTE this will hardlock if the state change is called from the src pad
+     * task thread because we will _join() the thread. */
+    GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
+    result = gst_pad_stop_task (pad);
+  }
+
+  gst_object_unref (jitterbuffer);
+
+  return result;
+}
+
+static GstStateChangeReturn
+gst_rtp_jitter_buffer_change_state (GstElement * element,
+    GstStateChange transition)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+  priv = jitterbuffer->priv;
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      async_jitter_queue_lock (priv->queue);
+      /* reset negotiated values */
+      priv->clock_rate = -1;
+      priv->clock_base = -1;
+      /* block until we go to PLAYING */
+      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+          TRUE);
+      async_jitter_queue_unlock (priv->queue);
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      async_jitter_queue_lock (priv->queue);
+      /* unblock to allow streaming in PLAYING */
+      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+          FALSE);
+      async_jitter_queue_unlock (priv->queue);
+      break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      /* we are a live element because we sync to the clock, which we can only
+       * do in the PLAYING state */
+      if (ret != GST_STATE_CHANGE_FAILURE)
+        ret = GST_STATE_CHANGE_NO_PREROLL;
+      break;
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      async_jitter_queue_lock (priv->queue);
+      /* block to stop streaming when PAUSED */
+      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+          TRUE);
+      async_jitter_queue_unlock (priv->queue);
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
+
+  return ret;
+}
+
+/**
+ * Performs comparison 'b - a' with check for overflows.
+ */
+static inline gint
+priv_compare_rtp_seq_lt (guint16 a, guint16 b)
+{
+  /* check if diff more than half of the 16bit range */
+  if (abs (b - a) > (1 << 15)) {
+    /* one of a/b has wrapped */
+    return a - b;
+  } else {
+    return b - a;
+  }
+}
+
+/**
+ * gets the seqnum from the buffers and compare them 
+ */
+static gint
+compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
+{
+  gint ret;
+
+  if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
+    /* two buffers */
+    ret = priv_compare_rtp_seq_lt
+        (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
+        gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
+  } else {
+    /* one of them is an event, the event always goes before the other element
+     * so we return -1. */
+    if (GST_IS_EVENT (a))
+      ret = -1;
+    else
+      ret = 1;
+  }
+  return ret;
+}
+
+static gboolean
+gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
+{
+  gboolean ret = TRUE;
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_NEWSEGMENT:
+    {
+      GstFormat format;
+      gdouble rate, arate;
+      gint64 start, stop, time;
+      gboolean update;
+
+      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+          &start, &stop, &time);
+
+      /* we need time for now */
+      if (format != GST_FORMAT_TIME)
+        goto newseg_wrong_format;
+
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
+          ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
+          update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
+          GST_TIME_ARGS (time));
+
+      /* now configure the values, we need these to time the release of the
+       * buffers on the srcpad. */
+      gst_segment_set_newsegment_full (&priv->segment, update,
+          rate, arate, format, start, stop, time);
+
+      /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
+      ret = gst_pad_push_event (priv->srcpad, event);
+      break;
+    }
+    case GST_EVENT_FLUSH_START:
+      gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
+      break;
+    case GST_EVENT_EOS:
+    {
+      /* push EOS in queue. We always push it at the head */
+      async_jitter_queue_lock (priv->queue);
+      /* check for flushing, we need to discard the event and return FALSE when
+       * we are flushing */
+      ret = priv->srcresult == GST_FLOW_OK;
+      if (ret)
+        async_jitter_queue_push_unlocked (priv->queue, event);
+      else
+        gst_event_unref (event);
+      async_jitter_queue_unlock (priv->queue);
+      break;
+    }
+    default:
+      ret = gst_pad_push_event (priv->srcpad, event);
+      break;
+  }
+
+done:
+  gst_object_unref (jitterbuffer);
+
+  return ret;
+
+  /* ERRORS */
+newseg_wrong_format:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
+    ret = FALSE;
+    goto done;
+  }
+}
+
+static GstFlowReturn
+gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+  guint16 seqnum;
+  GstFlowReturn ret;
+
+
+  g_return_val_if_fail (gst_rtp_buffer_validate (buffer), GST_FLOW_ERROR);
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  if (priv->clock_rate == -1)
+    goto not_negotiated;
+
+  seqnum = gst_rtp_buffer_get_seq (buffer);
+  GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
+
+  async_jitter_queue_lock (priv->queue);
+  ret = priv->srcresult;
+  if (ret != GST_FLOW_OK)
+    goto out_flushing;
+
+  /* let's check if this buffer is too late, we cannot accept packets with
+   * bigger seqnum than the one we already pushed. */
+  if (priv->last_popped_seqnum != -1) {
+    if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
+      goto too_late;
+  }
+
+  /* let's drop oldest packet if the queue is already full and drop-on-latency
+   * is set. */
+  if (priv->drop_on_latency) {
+    if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
+        priv->latency_ms * priv->clock_rate / 1000) {
+      GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
+          seqnum);
+      GstBuffer *old_buf;
+
+      old_buf = async_jitter_queue_pop_unlocked (priv->queue);
+      gst_buffer_unref (old_buf);
+    }
+  }
+
+  /* now insert the packet into the queue in sorted order. This function returns
+   * FALSE if a packet with the same seqnum was already in the queue, meaning we
+   * have a duplicate. */
+  if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
+          (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
+    goto duplicate;
+
+  /* let's unschedule and unblock any waiting buffers. We only want to do this
+   * if there is a currently waiting newer (> seqnum) buffer  */
+  if (priv->clock_id) {
+    if (priv->waiting_seqnum > seqnum) {
+      gst_clock_id_unschedule (priv->clock_id);
+      GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
+    }
+  }
+
+  GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
+      seqnum, async_jitter_queue_length_unlocked (priv->queue));
+
+finished:
+  async_jitter_queue_unlock (priv->queue);
+
+  gst_object_unref (jitterbuffer);
+
+  return ret;
+
+  /* ERRORS */
+not_negotiated:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+    gst_buffer_unref (buffer);
+    gst_object_unref (jitterbuffer);
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
+    gst_buffer_unref (buffer);
+    gst_object_unref (jitterbuffer);
+    goto finished;
+  }
+too_late:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
+        " popped, dropping", seqnum, priv->last_popped_seqnum);
+    priv->num_late++;
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
+duplicate:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
+        seqnum);
+    priv->num_duplicates++;
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
+}
+
+/**
+ * This funcion will push out buffers on the source pad.
+ *
+ * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
+ * different seqnum (missing packets before B), this function will wait for the
+ * missing packet to arrive up to the rtp timestamp of buffer B.
+ */
+static void
+gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
+{
+  GstRTPJitterBufferPrivate *priv;
+  gpointer elem;
+  GstBuffer *outbuf;
+  GstFlowReturn result;
+  guint16 seqnum;
+  guint32 rtp_time;
+  GstClockTime timestamp;
+  gint64 running_time;
+
+  priv = jitterbuffer->priv;
+
+  async_jitter_queue_lock (priv->queue);
+again:
+  elem = async_jitter_queue_pop_unlocked (priv->queue);
+  if (!elem)
+    goto no_elem;
+
+  /* special code for events */
+  if (G_UNLIKELY (GST_IS_EVENT (elem))) {
+    GstEvent *event = GST_EVENT_CAST (elem);
+
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_EOS:
+        GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
+        /* we don't expect more data now, makes upstream perform EOS actions */
+        priv->srcresult = GST_FLOW_UNEXPECTED;
+        break;
+      default:
+        GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
+            GST_EVENT_TYPE_NAME (event));
+        break;
+    }
+    async_jitter_queue_unlock (priv->queue);
+
+    /* push event */
+    gst_pad_push_event (priv->srcpad, event);
+    return;
+  }
+
+  /* pop a buffer, we will get NULL if the queue was shut down */
+  outbuf = GST_BUFFER_CAST (elem);
+
+  seqnum = gst_rtp_buffer_get_seq (outbuf);
+
+  GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
+      gst_rtp_buffer_get_seq (outbuf),
+      async_jitter_queue_length_unlocked (priv->queue));
+
+  /* If we don't know what the next seqnum should be (== -1) we have to wait
+   * because it might be possible that we are not receiving this buffer in-order,
+   * a buffer with a lower seqnum could arrive later and we want to push that
+   * earlier buffer before this buffer then.
+   * If we know the expected seqnum, we can compare it to the current seqnum to
+   * determine if we have missing a packet. If we have a missing packet (which
+   * must be before this packet) we can wait for it until the deadline for this
+   * packet expires. */
+  if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
+    GstClockID id;
+    GstClockTimeDiff jitter;
+    GstClockReturn ret;
+    GstClock *clock;
+
+    if (priv->next_seqnum != -1) {
+      /* we expected next_seqnum but received something else, that's a gap */
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
+          seqnum);
+    } else {
+      /* we don't know what the next_seqnum should be, wait for the last
+       * possible moment to push this buffer, maybe we get an earlier seqnum
+       * while we wait */
+      GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
+    }
+
+    /* get the max deadline to wait for the missing packets, this is the time
+     * of the currently popped packet */
+    rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "rtp_time %u, base %u", rtp_time,
+        priv->clock_base);
+
+    /* if no clock_base was given, take first ts as base */
+    if (priv->clock_base == -1)
+      priv->clock_base = rtp_time;
+
+    /* take rtp timestamp offset into account, this can wrap around */
+    rtp_time -= priv->clock_base;
+
+    /* bring timestamp to gst time */
+    timestamp =
+        gst_util_uint64_scale_int (GST_SECOND, rtp_time, priv->clock_rate);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "timestamp %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (timestamp));
+
+    /* bring to running time */
+    running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+        timestamp);
+
+    /* correct for sync against the gstreamer clock, add latency */
+    GST_OBJECT_LOCK (jitterbuffer);
+    clock = GST_ELEMENT_CLOCK (jitterbuffer);
+    if (!clock) {
+      GST_OBJECT_UNLOCK (jitterbuffer);
+      /* let's just push if there is no clock */
+      goto push_buffer;
+    }
+
+    /* add latency */
+    running_time += (priv->latency_ms * GST_MSECOND);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (running_time));
+
+    /* prepare for sync against clock */
+    running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+
+    /* create an entry for the clock */
+    id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
+    priv->waiting_seqnum = seqnum;
+    GST_OBJECT_UNLOCK (jitterbuffer);
+
+    /* release the lock so that the other end can push stuff or unlock */
+    async_jitter_queue_unlock (priv->queue);
+
+    ret = gst_clock_id_wait (id, &jitter);
+
+    async_jitter_queue_lock (priv->queue);
+    /* and free the entry */
+    gst_clock_id_unref (id);
+    priv->clock_id = NULL;
+    priv->waiting_seqnum = -1;
+
+    /* at this point, the clock could have been unlocked by a timeout, a new
+     * tail element was added to the queue or because we are shutting down. Check
+     * for shutdown first. */
+    if (priv->srcresult != GST_FLOW_OK)
+      goto flushing;
+
+    /* if we got unscheduled and we are not flushing, it's because a new tail
+     * element became available in the queue. Grab it and try to push or sync. */
+    if (ret == GST_CLOCK_UNSCHEDULED) {
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "Wait got unscheduled, will retry to push with new buffer");
+      /* reinserting popped buffer into queue */
+      if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
+              (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
+        GST_DEBUG_OBJECT (jitterbuffer,
+            "Duplicate packet #%d detected, dropping", seqnum);
+        priv->num_duplicates++;
+        gst_buffer_unref (outbuf);
+      }
+      goto again;
+    }
+  }
+push_buffer:
+  /* check if we are pushing something unexpected */
+  if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
+    gint dropped;
+
+    /* calc number of missing packets, careful for wraparounds */
+    dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
+
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "Pushing DISCONT after dropping %d (%d to %d)", dropped,
+        priv->next_seqnum, seqnum);
+
+    /* update stats */
+    priv->num_late += dropped;
+
+    /* set DISCONT flag */
+    outbuf = gst_buffer_make_metadata_writable (outbuf);
+    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+  }
+  /* now we are ready to push the buffer. Save the seqnum and release the lock
+   * so the other end can push stuff in the queue again. */
+  priv->last_popped_seqnum = seqnum;
+  priv->next_seqnum = (seqnum + 1) & 0xffff;
+  async_jitter_queue_unlock (priv->queue);
+
+  /* push buffer */
+  GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
+  result = gst_pad_push (priv->srcpad, outbuf);
+  if (result != GST_FLOW_OK)
+    goto pause;
+
+  return;
+
+  /* ERRORS */
+no_elem:
+  {
+    /* store result, we are flushing now */
+    GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
+    priv->srcresult = GST_FLOW_WRONG_STATE;
+    gst_pad_pause_task (priv->srcpad);
+    async_jitter_queue_unlock (priv->queue);
+    return;
+  }
+flushing:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+    gst_buffer_unref (outbuf);
+    async_jitter_queue_unlock (priv->queue);
+    return;
+  }
+pause:
+  {
+    const gchar *reason = gst_flow_get_name (result);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
+
+    async_jitter_queue_lock (priv->queue);
+    /* store result */
+    priv->srcresult = result;
+    /* we don't post errors or anything because upstream will do that for us
+     * when we pass the return value upstream. */
+    gst_pad_pause_task (priv->srcpad);
+    async_jitter_queue_unlock (priv->queue);
+    return;
+  }
+}
+
+static gboolean
+gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
+{
+  GstRTPJitterBuffer *jitterbuffer;
+  GstRTPJitterBufferPrivate *priv;
+  gboolean res = FALSE;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_LATENCY:
+    {
+      /* We need to send the query upstream and add the returned latency to our
+       * own */
+      GstClockTime min_latency, max_latency;
+      gboolean us_live;
+      GstPad *peer;
+
+      if ((peer = gst_pad_get_peer (priv->sinkpad))) {
+        if ((res = gst_pad_query (peer, query))) {
+          gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
+
+          min_latency += priv->latency_ms * GST_MSECOND;
+          max_latency += priv->latency_ms * GST_MSECOND;
+
+          GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
+              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+
+          gst_query_set_latency (query, TRUE, min_latency, max_latency);
+        }
+        gst_object_unref (peer);
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  return res;
+}
+
+static void
+gst_rtp_jitter_buffer_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+  GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+
+  switch (prop_id) {
+    case ARG_LATENCY:
+    {
+      guint new_latency, old_latency;
+
+      /* FIXME, not threadsafe */
+      new_latency = g_value_get_uint (value);
+      old_latency = jitterbuffer->priv->latency_ms;
+
+      jitterbuffer->priv->latency_ms = new_latency;
+      if (jitterbuffer->priv->clock_rate != -1) {
+        async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
+            gst_util_uint64_scale_int (new_latency,
+                jitterbuffer->priv->clock_rate, 1000));
+      }
+      /* post message if latency changed, this will infor the parent pipeline
+       * that a latency reconfiguration is possible. */
+      if (new_latency != old_latency) {
+        gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
+            gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
+      }
+      break;
+    }
+    case ARG_DROP_ON_LATENCY:
+    {
+      jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value);
+      break;
+    }
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_rtp_jitter_buffer_get_property (GObject * object,
+    guint prop_id, GValue * value, GParamSpec * pspec)
+{
+  GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+
+  switch (prop_id) {
+    case ARG_LATENCY:
+      g_value_set_uint (value, jitterbuffer->priv->latency_ms);
+      break;
+    case ARG_DROP_ON_LATENCY:
+      g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h
new file mode 100644 (file)
index 0000000..a867144
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Farsight Voice+Video library
+ *
+ *  Copyright 2007 Collabora Ltd, 
+ *  Copyright 2007 Nokia Corporation
+ *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
+ *  Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ */
+
+#ifndef __GST_RTP_JITTER_BUFFER_H__
+#define __GST_RTP_JITTER_BUFFER_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+G_BEGIN_DECLS
+
+/* #define's don't like whitespacey bits */
+#define GST_TYPE_RTP_JITTER_BUFFER \
+  (gst_rtp_jitter_buffer_get_type())
+#define GST_RTP_JITTER_BUFFER(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj), \
+  GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBuffer))
+#define GST_RTP_JITTER_BUFFER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass), \
+  GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBufferClass))
+#define GST_IS_RTP_JITTER_BUFFER(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_JITTER_BUFFER))
+#define GST_IS_RTP_JITTER_BUFFER_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_JITTER_BUFFER))
+
+typedef struct _GstRTPJitterBuffer GstRTPJitterBuffer;
+typedef struct _GstRTPJitterBufferClass GstRTPJitterBufferClass;
+typedef struct _GstRTPJitterBufferPrivate GstRTPJitterBufferPrivate;
+
+struct _GstRTPJitterBuffer
+{
+  GstElement parent;
+
+  GstRTPJitterBufferPrivate *priv;
+
+  /*< private > */
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstRTPJitterBufferClass
+{
+  GstElementClass parent_class;
+
+  /*< private > */
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+GType gst_rtp_jitter_buffer_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_RTP_JITTER_BUFFER_H__ */
diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c
new file mode 100644 (file)
index 0000000..a059cad
--- /dev/null
@@ -0,0 +1,55 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrtpclient.h"
+#include "gstrtpjitterbuffer.h"
+#include "gstrtpptdemux.h"
+#include "gstrtpsession.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+  if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE,
+          GST_TYPE_RTP_CLIENT))
+    return FALSE;
+
+  if (!gst_element_register (plugin, "rtpjitterbuffer", GST_RANK_NONE,
+          GST_TYPE_RTP_JITTER_BUFFER))
+    return FALSE;
+
+  if (!gst_element_register (plugin, "rtpptdemux", GST_RANK_NONE,
+          GST_TYPE_RTP_PT_DEMUX))
+    return FALSE;
+
+  if (!gst_element_register (plugin, "rtpsession", GST_RANK_NONE,
+          GST_TYPE_RTP_SESSION))
+    return FALSE;
+
+  return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+    GST_VERSION_MINOR,
+    "rtpmanager",
+    "RTP session management plugin library",
+    plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c
new file mode 100644 (file)
index 0000000..5950f61
--- /dev/null
@@ -0,0 +1,350 @@
+/* 
+ * RTP Demux element
+ *
+ * Copyright (C) 2005 Nokia Corporation.
+ * @author Kai Vehmanen <kai.vehmanen@nokia.com>
+ *
+ * Loosely based on GStreamer gstdecodebin
+ * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * Contributors:
+ * Andre Moreira Magalhaes <andre.magalhaes@indt.org.br>
+ */
+
+/*
+ * Status:
+ *  - works with the test_rtpdemux.c tool
+ *
+ * Check:
+ *  - is emitting a signal enough, or should we
+ *    use GstEvent to notify downstream elements
+ *    of the new packet... no?
+ *
+ * Notes:
+ *  - emits event both for new PTs, and whenever
+ *    a PT is changed
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+#include <gst/gst.h>
+#include "gstrtpptdemux.h"
+#include <gst/rtp/gstrtpbuffer.h>
+
+/* generic templates */
+static GstStaticPadTemplate rtp_pt_demux_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("application/x-rtp, "
+        "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]")
+    );
+
+static GstStaticPadTemplate rtp_pt_demux_src_template =
+GST_STATIC_PAD_TEMPLATE ("src%d",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug);
+#define GST_CAT_DEFAULT gst_rtp_pt_demux_debug
+
+/**
+ * Item for storing GstPad<->pt pairs.
+ */
+struct _GstRTPPtDemuxPad
+{
+  GstPad *pad;        /**< pointer to the actual pad */
+  gint pt;             /**< RTP payload-type attached to pad */
+};
+
+/* signals */
+enum
+{
+  SIGNAL_NEW_PAYLOAD_TYPE,
+  SIGNAL_PAYLOAD_TYPE_CHANGE,
+  LAST_SIGNAL
+};
+
+GST_BOILERPLATE (GstRTPPtDemux, gst_rtp_pt_demux, GstElement, GST_TYPE_ELEMENT);
+
+static void gst_rtp_pt_demux_finalize (GObject * object);
+
+static void gst_rtp_pt_demux_release (GstElement * element);
+static gboolean gst_rtp_pt_demux_setup (GstElement * element);
+
+static GstFlowReturn gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf);
+static GstCaps *gst_rtp_pt_demux_getcaps (GstPad * pad);
+static GstStateChangeReturn gst_rtp_pt_demux_change_state (GstElement * element,
+    GstStateChange transition);
+
+static GstPad *find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt);
+
+static guint gst_rtp_pt_demux_signals[LAST_SIGNAL] = { 0 };
+
+static GstElementDetails gst_rtp_pt_demux_details = {
+  "RTP Demux",
+  /* XXX: what's the correct hierarchy? */
+  "Codec/Demux/Network",
+  "Parses codec streams transmitted in the same RTP session",
+  "Kai Vehmanen <kai.vehmanen@nokia.com>"
+};
+
+static void
+gst_rtp_pt_demux_base_init (gpointer g_class)
+{
+  GstElementClass *gstelement_klass = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_pt_demux_sink_template));
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_pt_demux_src_template));
+
+  gst_element_class_set_details (gstelement_klass, &gst_rtp_pt_demux_details);
+}
+
+static void
+gst_rtp_pt_demux_class_init (GstRTPPtDemuxClass * klass)
+{
+  GObjectClass *gobject_klass;
+  GstElementClass *gstelement_klass;
+
+  gobject_klass = (GObjectClass *) klass;
+  gstelement_klass = (GstElementClass *) klass;
+
+  gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE] =
+      g_signal_new ("new-payload-type",
+      G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstRTPPtDemuxClass, new_payload_type),
+      NULL, NULL,
+      g_cclosure_marshal_VOID__UINT_POINTER,
+      G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD);
+  gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
+      g_signal_new ("payload-type-change",
+      G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstRTPPtDemuxClass, payload_type_change),
+      NULL, NULL, gst_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+
+  gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_finalize);
+
+  gstelement_klass->change_state =
+      GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_change_state);
+
+  GST_DEBUG_CATEGORY_INIT (gst_rtp_pt_demux_debug,
+      "rtpptdemux", 0, "RTP codec demuxer");
+
+}
+
+static void
+gst_rtp_pt_demux_init (GstRTPPtDemux * ptdemux, GstRTPPtDemuxClass * g_class)
+{
+  GstElementClass *klass = GST_ELEMENT_GET_CLASS (ptdemux);
+
+  ptdemux->sink =
+      gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+          "sink"), "sink");
+  g_assert (ptdemux->sink != NULL);
+
+  gst_pad_set_chain_function (ptdemux->sink, gst_rtp_pt_demux_chain);
+
+  gst_element_add_pad (GST_ELEMENT (ptdemux), ptdemux->sink);
+}
+
+static void
+gst_rtp_pt_demux_finalize (GObject * object)
+{
+  gst_rtp_pt_demux_release (GST_ELEMENT (object));
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static GstFlowReturn
+gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstRTPPtDemux *rtpdemux;
+  GstElement *element = GST_ELEMENT (GST_OBJECT_PARENT (pad));
+  guint8 pt;
+  GstPad *srcpad;
+
+  rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad));
+
+  g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR);
+
+  pt = gst_rtp_buffer_get_payload_type (buf);
+
+  srcpad = find_pad_for_pt (rtpdemux, pt);
+  if (srcpad == NULL) {
+    /* new PT, create a src pad */
+    GstElementClass *klass;
+    GstPadTemplate *templ;
+    gchar *padname;
+    GstCaps *caps;
+    GstRTPPtDemuxPad *rtpdemuxpad;
+
+    klass = GST_ELEMENT_GET_CLASS (rtpdemux);
+    templ = gst_element_class_get_pad_template (klass, "src%d");
+    padname = g_strdup_printf ("src%d", pt);
+    srcpad = gst_pad_new_from_template (templ, padname);
+    g_free (padname);
+
+    caps = gst_pad_get_caps (srcpad);
+    caps = gst_caps_make_writable (caps);
+    gst_caps_append_structure (caps,
+        gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL));
+    gst_pad_set_caps (srcpad, caps);
+
+    /* XXX: set _link () function */
+    gst_pad_set_getcaps_function (srcpad, gst_rtp_pt_demux_getcaps);
+    gst_pad_set_active (srcpad, TRUE);
+    gst_element_add_pad (element, srcpad);
+
+    if (srcpad) {
+      GST_DEBUG ("Adding pt=%d to the list.", pt);
+      rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1);
+      rtpdemuxpad->pt = pt;
+      rtpdemuxpad->pad = srcpad;
+      rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad);
+
+      GST_DEBUG ("emitting new-payload_type for pt %d", pt);
+      g_signal_emit (G_OBJECT (rtpdemux),
+          gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad);
+    }
+  }
+
+  if (pt != rtpdemux->last_pt) {
+    gint emit_pt = pt;
+
+    /* our own signal with an extra flag that this is the only pad */
+    rtpdemux->last_pt = pt;
+    GST_DEBUG ("emitting payload-type-changed for pt %d", emit_pt);
+    g_signal_emit (G_OBJECT (rtpdemux),
+        gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE], 0, emit_pt);
+  }
+
+  /* push to srcpad */
+  if (srcpad)
+    gst_pad_push (srcpad, GST_BUFFER (buf));
+
+  return ret;
+}
+
+static GstCaps *
+gst_rtp_pt_demux_getcaps (GstPad * pad)
+{
+  GstCaps *caps;
+
+  GST_OBJECT_LOCK (pad);
+  if ((caps = GST_PAD_CAPS (pad)))
+    caps = gst_caps_ref (caps);
+  GST_OBJECT_UNLOCK (pad);
+
+  return caps;
+}
+
+static GstPad *
+find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt)
+{
+  GstPad *respad = NULL;
+  GSList *item = rtpdemux->srcpads;
+
+  for (; item; item = g_slist_next (item)) {
+    GstRTPPtDemuxPad *pad = item->data;
+
+    if (pad->pt == pt) {
+      respad = pad->pad;
+      break;
+    }
+  }
+
+  return respad;
+}
+
+/**
+ * Reserves resources for the object.
+ */
+static gboolean
+gst_rtp_pt_demux_setup (GstElement * element)
+{
+  GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element);
+  gboolean res = TRUE;
+
+  if (ptdemux) {
+    ptdemux->srcpads = NULL;
+    ptdemux->last_pt = 0xFFFF;
+  }
+
+  return res;
+}
+
+/**
+ * Free resources for the object.
+ */
+static void
+gst_rtp_pt_demux_release (GstElement * element)
+{
+  GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element);
+
+  if (ptdemux) {
+    /* note: GstElement's dispose() will handle the pads */
+    g_slist_free (ptdemux->srcpads);
+    ptdemux->srcpads = NULL;
+  }
+}
+
+static GstStateChangeReturn
+gst_rtp_pt_demux_change_state (GstElement * element, GstStateChange transition)
+{
+  GstStateChangeReturn ret;
+  GstRTPPtDemux *ptdemux;
+
+  ptdemux = GST_RTP_PT_DEMUX (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      if (gst_rtp_pt_demux_setup (element) != TRUE)
+        ret = GST_STATE_CHANGE_FAILURE;
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      gst_rtp_pt_demux_release (element);
+      break;
+    default:
+      break;
+  }
+
+  return ret;
+}
diff --git a/gst/rtpmanager/gstrtpptdemux.h b/gst/rtpmanager/gstrtpptdemux.h
new file mode 100644 (file)
index 0000000..93be395
--- /dev/null
@@ -0,0 +1,57 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_PT_DEMUX_H__
+#define __GST_RTP_PT_DEMUX_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_PT_DEMUX            (gst_rtp_pt_demux_get_type())
+#define GST_RTP_PT_DEMUX(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemux))
+#define GST_RTP_PT_DEMUX_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemuxClass))
+#define GST_IS_RTP_PT_DEMUX(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_PT_DEMUX))
+#define GST_IS_RTP_PT_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_PT_DEMUX))
+
+typedef struct _GstRTPPtDemux GstRTPPtDemux;
+typedef struct _GstRTPPtDemuxClass GstRTPPtDemuxClass;
+typedef struct _GstRTPPtDemuxPad GstRTPPtDemuxPad;
+
+struct _GstRTPPtDemux
+{
+  GstElement parent;  /**< parent class */
+
+  GstPad *sink;       /**< the sink pad */
+  guint16 last_pt;    /**< pt of the last packet 0xFFFF if none */
+  GSList *srcpads;    /**< a linked list of GstRTPPtDemuxPad objects */
+};
+
+struct _GstRTPPtDemuxClass
+{
+  GstElementClass parent_class;
+
+  /* signal emmited when a new PT is found from the incoming stream */
+  void (*new_payload_type) (GstElement * element, gint pt, GstPad * pad);
+
+  /* signal emitted when the payload type changes */
+  void (*payload_type_change) (GstElement * element, gint pt);
+};
+
+GType gst_rtp_pt_demux_get_type (void);
+
+#endif /* __GST_RTP_PT_DEMUX_H__ */
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
new file mode 100644 (file)
index 0000000..47df756
--- /dev/null
@@ -0,0 +1,453 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpsession
+ * @short_description: an RTP session manager
+ * @see_also: rtpjitterbuffer, rtpbin
+ *
+ * <refsect2>
+ * <para>
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstrtpsession.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpsession_details =
+GST_ELEMENT_DETAILS ("RTP Session",
+    "Filter/Editor/Video",
+    "Implement an RTP session",
+    "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+/* src pads */
+static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpsession_sync_src_template =
+GST_STATIC_PAD_TEMPLATE ("sync_src",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+static GstStaticPadTemplate rtpsession_send_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS ("application/x-rtp")
+    );
+
+static GstStaticPadTemplate rtpsession_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("rtcp_src",
+    GST_PAD_SRC,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
+/* signals and args */
+enum
+{
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_session_finalize (GObject * object);
+static void gst_rtp_session_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_rtp_session_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
+    GstStateChange transition);
+static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
+
+static void
+gst_rtp_session_base_init (gpointer klass)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+  /* sink pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
+
+  /* src pads */
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_sync_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&rtpsession_rtcp_src_template));
+
+  gst_element_class_set_details (element_class, &rtpsession_details);
+}
+
+static void
+gst_rtp_session_class_init (GstRTPSessionClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  gobject_class->finalize = gst_rtp_session_finalize;
+  gobject_class->set_property = gst_rtp_session_set_property;
+  gobject_class->get_property = gst_rtp_session_get_property;
+
+  gstelement_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
+  gstelement_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
+  gstelement_class->release_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
+}
+
+static void
+gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
+{
+}
+
+static void
+gst_rtp_session_finalize (GObject * object)
+{
+  GstRTPSession *rtpsession;
+
+  rtpsession = GST_RTP_SESSION (object);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_session_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstRTPSession *rtpsession;
+
+  rtpsession = GST_RTP_SESSION (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_rtp_session_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstRTPSession *rtpsession;
+
+  rtpsession = GST_RTP_SESSION (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static GstStateChangeReturn
+gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
+{
+  GstStateChangeReturn res;
+  GstRTPSession *rtpsession;
+
+  rtpsession = GST_RTP_SESSION (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    default:
+      break;
+  }
+
+  res = parent_class->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
+  return res;
+}
+
+/* receive a packet from a sender, send it to the RTP session manager and
+ * forward the packet on the rtp_src pad
+ */
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
+{
+  GstRTPSession *rtpsession;
+  GstFlowReturn ret;
+
+  rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+  /* FIXME, do something */
+  ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+
+  gst_object_unref (rtpsession);
+
+  return ret;
+}
+
+/* Receive an RTCP packet from a sender, send it to the RTP session manager and
+ * forward the SR packets to the sync_src pad.
+ */
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
+{
+  GstRTPSession *rtpsession;
+  GstFlowReturn ret;
+
+  rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+  /* FIXME, do something */
+  ret = gst_pad_push (rtpsession->sync_src, buffer);
+
+  gst_object_unref (rtpsession);
+
+  return ret;
+}
+
+/* Recieve an RTP packet to be send to the receivers, send to RTP session
+ * manager and forward to send_rtp_src.
+ */
+static GstFlowReturn
+gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
+{
+  GstRTPSession *rtpsession;
+  GstFlowReturn ret;
+
+  rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+  /* FIXME, do something */
+  ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
+
+  gst_object_unref (rtpsession);
+
+  return ret;
+}
+
+
+/* Create sinkpad to receive RTP packets from senders. This will also create a
+ * srcpad for the RTP packets.
+ */
+static GstPad *
+create_recv_rtp_sink (GstRTPSession * rtpsession)
+{
+  rtpsession->recv_rtp_sink =
+      gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
+      NULL);
+  gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
+      gst_rtp_session_chain_recv_rtp);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+      rtpsession->recv_rtp_sink);
+
+  rtpsession->recv_rtp_src =
+      gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
+      NULL);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
+
+  return rtpsession->recv_rtp_sink;
+}
+
+/* Create a sinkpad to receive RTCP messages from senders, this will also create a
+ * sync_src pad for the SR packets.
+ */
+static GstPad *
+create_recv_rtcp_sink (GstRTPSession * rtpsession)
+{
+  rtpsession->recv_rtcp_sink =
+      gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
+      NULL);
+  gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
+      gst_rtp_session_chain_recv_rtcp);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+      rtpsession->recv_rtcp_sink);
+
+  rtpsession->sync_src =
+      gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
+
+  return rtpsession->recv_rtcp_sink;
+}
+
+/* Create a sinkpad to receive RTP packets for receivers. This will also create a
+ * send_rtp_src pad.
+ */
+static GstPad *
+create_send_rtp_sink (GstRTPSession * rtpsession)
+{
+  rtpsession->send_rtp_sink =
+      gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
+      NULL);
+  gst_pad_set_chain_function (rtpsession->send_rtp_sink,
+      gst_rtp_session_chain_send_rtp);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+      rtpsession->recv_rtcp_sink);
+
+  rtpsession->send_rtp_src =
+      gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
+      NULL);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
+
+  return rtpsession->send_rtp_sink;
+}
+
+/* Create a srcpad with the RTCP packets to send out.
+ * This pad will be driven by the RTP session manager when it wants to send out
+ * RTCP packets.
+ */
+static GstPad *
+create_rtcp_src (GstRTPSession * rtpsession)
+{
+  rtpsession->rtcp_src =
+      gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
+
+  return rtpsession->rtcp_src;
+}
+
+static GstPad *
+gst_rtp_session_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name)
+{
+  GstRTPSession *rtpsession;
+  GstElementClass *klass;
+  GstPad *result;
+
+  g_return_val_if_fail (templ != NULL, NULL);
+  g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
+
+  rtpsession = GST_RTP_SESSION (element);
+  klass = GST_ELEMENT_GET_CLASS (element);
+
+  /* figure out the template */
+  if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
+    if (rtpsession->recv_rtp_sink != NULL)
+      goto exists;
+
+    result = create_recv_rtp_sink (rtpsession);
+  } else if (templ == gst_element_class_get_pad_template (klass,
+          "recv_rtcp_sink")) {
+    if (rtpsession->recv_rtcp_sink != NULL)
+      goto exists;
+
+    result = create_recv_rtcp_sink (rtpsession);
+  } else if (templ == gst_element_class_get_pad_template (klass,
+          "send_rtp_sink")) {
+    if (rtpsession->send_rtp_sink != NULL)
+      goto exists;
+
+    result = create_send_rtp_sink (rtpsession);
+  } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
+    if (rtpsession->rtcp_src != NULL)
+      goto exists;
+
+    result = create_rtcp_src (rtpsession);
+  } else
+    goto wrong_template;
+
+  return result;
+
+  /* ERRORS */
+wrong_template:
+  {
+    g_warning ("rtpsession: this is not our template");
+    return NULL;
+  }
+exists:
+  {
+    g_warning ("rtpsession: pad already requested");
+    return NULL;
+  }
+}
+
+static void
+gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h
new file mode 100644 (file)
index 0000000..8b34306
--- /dev/null
@@ -0,0 +1,62 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_SESSION_H__
+#define __GST_RTP_SESSION_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_SESSION \
+  (gst_rtp_session_get_type())
+#define GST_RTP_SESSION(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_SESSION,GstRTPSession))
+#define GST_RTP_SESSION_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_SESSION,GstRTPSessionClass))
+#define GST_IS_RTP_SESSION(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))
+#define GST_IS_RTP_SESSION_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION))
+
+typedef struct _GstRTPSession GstRTPSession;
+typedef struct _GstRTPSessionClass GstRTPSessionClass;
+typedef struct _GstRTPSessionPrivate GstRTPSessionPrivate;
+
+struct _GstRTPSession {
+  GstElement     element;
+
+  /*< private >*/
+  GstPad        *recv_rtp_sink;
+  GstPad        *recv_rtcp_sink;
+  GstPad        *send_rtp_sink;
+
+  GstPad        *recv_rtp_src;
+  GstPad        *sync_src;
+  GstPad        *send_rtp_src;
+  GstPad        *rtcp_src;
+
+  GstRTPSessionPrivate *priv;
+};
+
+struct _GstRTPSessionClass {
+  GstElementClass parent_class;
+};
+
+GType gst_rtp_session_get_type (void);
+
+#endif /* __GST_RTP_SESSION_H__ */