configure.ac: Disable rtpmanager for now because it depends on CVS -base.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 18 Apr 2007 18:58:53 +0000 (18:58 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 18 Apr 2007 18:58:53 +0000 (18:58 +0000)
Original commit message from CVS:
* configure.ac:
Disable rtpmanager for now because it depends on CVS -base.
* gst/rtpmanager/Makefile.am:
Added new files for session manager.
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
(create_stream), (pt_map_requested), (new_ssrc_pad_found):
Some cleanups.
the session manager can now also request a pt-map.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
(stop_rtcp_thread), (gst_rtp_session_change_state),
(gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
(gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
(gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_recv_rtcp_sink),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
(gst_rtp_session_request_new_pad):
* gst/rtpmanager/gstrtpsession.h:
We can ask for pt-map now too when the session manager needs it.
Hook up to the new session manager, implement the needed callbacks for
pushing data, getting clock time and requesting clock-rates.
Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
be send to clients.
Add code to start and stop the thread that will schedule RTCP through
the session manager.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(rtp_session_init), (rtp_session_finalize),
(rtp_session_set_property), (rtp_session_get_property),
(on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
(on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
(rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
(rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
(source_push_rtp), (source_clock_rate), (check_collision),
(obtain_source), (rtp_session_add_source),
(rtp_session_get_num_sources),
(rtp_session_get_num_active_sources),
(rtp_session_get_source_by_ssrc),
(rtp_session_get_source_by_cname), (rtp_session_create_source),
(update_arrival_stats), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_sdes), (rtp_session_process_bye),
(rtp_session_process_app), (rtp_session_process_rtcp),
(rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
(rtp_session_produce_rtcp):
* gst/rtpmanager/rtpsession.h:
The advanced beginnings of the main session manager that handles the
participant database of RTPSources, SSRC probation, SSRC collisions,
parse RTCP to update source stats. etc..
* gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
(rtp_source_init), (rtp_source_finalize), (rtp_source_new),
(rtp_source_set_callbacks), (rtp_source_set_as_csrc),
(rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
(push_packet), (get_clock_rate), (calculate_jitter),
(rtp_source_process_rtp), (rtp_source_process_bye),
(rtp_source_send_rtp), (rtp_source_process_sr),
(rtp_source_process_rb):
* gst/rtpmanager/rtpsource.h:
Object that encapsulates an SSRC and its state in the database.
Calculates the jitter and transit times of data packets.
* gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
(rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
* gst/rtpmanager/rtpstats.h:
Various stats regarding the session and sources.
Used to calculate the RTCP interval.

14 files changed:
ChangeLog
common
configure.ac
gst/rtpmanager/Makefile.am
gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpjitterbuffer.h
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpsession.h
gst/rtpmanager/rtpsession.c [new file with mode: 0644]
gst/rtpmanager/rtpsession.h [new file with mode: 0644]
gst/rtpmanager/rtpsource.c [new file with mode: 0644]
gst/rtpmanager/rtpsource.h [new file with mode: 0644]
gst/rtpmanager/rtpstats.c [new file with mode: 0644]
gst/rtpmanager/rtpstats.h [new file with mode: 0644]

index 8dddf1d..c94887c 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,81 @@
+2007-04-18  Wim Taymans  <wim@fluendo.com>
+
+       * configure.ac:
+       Disable rtpmanager for now because it depends on CVS -base.
+
+       * gst/rtpmanager/Makefile.am:
+       Added new files for session manager.
+
+       * gst/rtpmanager/gstrtpjitterbuffer.h:
+       * gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
+       (create_stream), (pt_map_requested), (new_ssrc_pad_found):
+       Some cleanups. 
+       the session manager can now also request a pt-map.
+
+       * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
+       (gst_rtp_session_class_init), (gst_rtp_session_init),
+       (gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
+       (stop_rtcp_thread), (gst_rtp_session_change_state),
+       (gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
+       (gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
+       (gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
+       (gst_rtp_session_chain_recv_rtp),
+       (gst_rtp_session_event_recv_rtcp_sink),
+       (gst_rtp_session_chain_recv_rtcp),
+       (gst_rtp_session_event_send_rtp_sink),
+       (gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
+       (gst_rtp_session_request_new_pad):
+       * gst/rtpmanager/gstrtpsession.h:
+       We can ask for pt-map now too when the session manager needs it.
+       Hook up to the new session manager, implement the needed callbacks for
+       pushing data, getting clock time and requesting clock-rates.
+       Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
+       be send to clients.
+       Add code to start and stop the thread that will schedule RTCP through
+       the session manager.
+
+       * gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
+       (rtp_session_init), (rtp_session_finalize),
+       (rtp_session_set_property), (rtp_session_get_property),
+       (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
+       (on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
+       (rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
+       (rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
+       (source_push_rtp), (source_clock_rate), (check_collision),
+       (obtain_source), (rtp_session_add_source),
+       (rtp_session_get_num_sources),
+       (rtp_session_get_num_active_sources),
+       (rtp_session_get_source_by_ssrc),
+       (rtp_session_get_source_by_cname), (rtp_session_create_source),
+       (update_arrival_stats), (rtp_session_process_rtp),
+       (rtp_session_process_sr), (rtp_session_process_rr),
+       (rtp_session_process_sdes), (rtp_session_process_bye),
+       (rtp_session_process_app), (rtp_session_process_rtcp),
+       (rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
+       (rtp_session_produce_rtcp):
+       * gst/rtpmanager/rtpsession.h:
+       The advanced beginnings of the main session manager that handles the
+       participant database of RTPSources, SSRC probation, SSRC collisions,
+       parse RTCP to update source stats. etc..
+
+       * gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
+       (rtp_source_init), (rtp_source_finalize), (rtp_source_new),
+       (rtp_source_set_callbacks), (rtp_source_set_as_csrc),
+       (rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
+       (push_packet), (get_clock_rate), (calculate_jitter),
+       (rtp_source_process_rtp), (rtp_source_process_bye),
+       (rtp_source_send_rtp), (rtp_source_process_sr),
+       (rtp_source_process_rb):
+       * gst/rtpmanager/rtpsource.h:
+       Object that encapsulates an SSRC and its state in the database.
+       Calculates the jitter and transit times of data packets.
+
+       * gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
+       (rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
+       * gst/rtpmanager/rtpstats.h:
+       Various stats regarding the session and sources.
+       Used to calculate the RTCP interval.
+
 2007-04-17  Tim-Philipp Müller  <tim at centricular dot net>
 
        * gst/app/Makefile.am:
diff --git a/common b/common
index 9097e25..e05f45f 160000 (submodule)
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit 9097e252e477e18182f08a032d8860bdee9a0416
+Subproject commit e05f45f13961b851501ca8938aa2049fa96c7b11
index f8499a4..97f46b5 100644 (file)
@@ -95,7 +95,6 @@ GST_PLUGINS_ALL="\
   nuvdemux \
   real \
   replaygain \
-  rtpmanager \
   spectrum \
   speed \
   qtdemux \
index f844e47..9e47cbd 100644 (file)
@@ -17,6 +17,9 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
                              gstrtpjitterbuffer.c \
                              gstrtpptdemux.c \
                              gstrtpssrcdemux.c \
+                             rtpsession.c      \
+                             rtpsource.c      \
+                             rtpstats.c      \
                              gstrtpsession.c
 
 nodist_libgstrtpmanager_la_SOURCES = \
@@ -28,11 +31,15 @@ noinst_HEADERS = gstrtpbin.h \
                 gstrtpjitterbuffer.h \
                  gstrtpptdemux.h \
                  gstrtpssrcdemux.h \
+                rtpsession.h  \
+                rtpsource.h  \
+                rtpstats.h  \
                 gstrtpsession.h
 
 libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)
 libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS)
-libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@
+libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ \
+                             -lgstnetbuffer-@GST_MAJORMINOR@
 
 CLEANFILES = $(BUILT_SOURCES)
 
index 6825e9c..9162d76 100644 (file)
@@ -129,7 +129,7 @@ typedef struct _GstRTPBinClient GstRTPBinClient;
 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
 
 static GstCaps *pt_map_requested (GstElement * element, guint pt,
-    GstRTPBinStream * stream);
+    GstRTPBinSession * session);
 
 /* Manages the RTP stream for one SSRC.
  *
@@ -215,9 +215,9 @@ static GstRTPBinSession *
 create_session (GstRTPBin * rtpbin, gint id)
 {
   GstRTPBinSession *sess;
-  GstElement *elem, *demux;
+  GstElement *session, *demux;
 
-  if (!(elem = gst_element_factory_make ("rtpsession", NULL)))
+  if (!(session = gst_element_factory_make ("rtpsession", NULL)))
     goto no_session;
 
   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
@@ -227,13 +227,17 @@ create_session (GstRTPBin * rtpbin, gint id)
   sess->lock = g_mutex_new ();
   sess->id = id;
   sess->bin = rtpbin;
-  sess->session = elem;
+  sess->session = session;
   sess->demux = demux;
   sess->ptmap = g_hash_table_new (NULL, NULL);
   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
 
-  gst_bin_add (GST_BIN_CAST (rtpbin), elem);
-  gst_element_set_state (elem, GST_STATE_PLAYING);
+  /* provide clock_rate to the session manager when needed */
+  g_signal_connect (session, "request-pt-map",
+      (GCallback) pt_map_requested, sess);
+
+  gst_bin_add (GST_BIN_CAST (rtpbin), session);
+  gst_element_set_state (session, GST_STATE_PLAYING);
   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
   gst_element_set_state (demux, GST_STATE_PLAYING);
 
@@ -247,7 +251,7 @@ no_session:
   }
 no_demux:
   {
-    gst_object_unref (elem);
+    gst_object_unref (session);
     g_warning ("rtpbin: could not create rtpssrcdemux element");
     return NULL;
   }
@@ -351,7 +355,7 @@ create_stream (GstRTPBinSession * session, guint32 ssrc)
 
   /* provide clock_rate to the jitterbuffer when needed */
   g_signal_connect (buffer, "request-pt-map",
-      (GCallback) pt_map_requested, stream);
+      (GCallback) pt_map_requested, session);
 
   gst_bin_add (GST_BIN_CAST (session->bin), buffer);
   gst_element_set_state (buffer, GST_STATE_PLAYING);
@@ -590,14 +594,12 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
 }
 
 static GstCaps *
-pt_map_requested (GstElement * element, guint pt, GstRTPBinStream * stream)
+pt_map_requested (GstElement * element, guint pt, GstRTPBinSession * session)
 {
   GstRTPBin *rtpbin;
-  GstRTPBinSession *session;
   GstCaps *caps;
 
-  rtpbin = stream->bin;
-  session = stream->session;
+  rtpbin = session->bin;
 
   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
       session->id);
@@ -647,7 +649,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
    * demuxer so that it can apply a proper caps on the buffers for the
    * depayloaders. */
   stream->demux_ptreq_sig = g_signal_connect (stream->demux,
-      "request-pt-map", (GCallback) pt_map_requested, stream);
+      "request-pt-map", (GCallback) pt_map_requested, session);
 
   GST_RTP_SESSION_UNLOCK (session);
 
index e101039..3cbcd62 100644 (file)
@@ -63,6 +63,7 @@ struct _GstRTPJitterBufferClass
 {
   GstElementClass parent_class;
 
+  /* signals */
   GstCaps* (*request_pt_map) (GstRTPJitterBuffer *buffer, guint pt);
 
   /*< private > */
index cdad7e9..03b0802 100644 (file)
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
+
+#include "gstrtpbin-marshal.h"
 #include "gstrtpsession.h"
+#include "rtpsession.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
 #define GST_CAT_DEFAULT gst_rtp_session_debug
@@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
-static GstStaticPadTemplate rtpsession_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src",
+static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
     GST_PAD_SRC,
     GST_PAD_REQUEST,
     GST_STATIC_CAPS ("application/x-rtcp")
@@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
 /* signals and args */
 enum
 {
-  /* FILL ME */
+  SIGNAL_REQUEST_PT_MAP,
   LAST_SIGNAL
 };
 
@@ -123,6 +126,31 @@ enum
 struct _GstRTPSessionPrivate
 {
   GMutex *lock;
+  RTPSession *session;
+  /* thread for sending out RTCP */
+  GstClockID id;
+  gboolean stop_thread;
+  GThread *thread;
+};
+
+/* callbacks to handle actions from the session manager */
+static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
+    RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
+    RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
+    RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+    gpointer user_data);
+static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
+    gpointer user_data);
+
+static RTPSessionCallbacks callbacks = {
+  gst_rtp_session_process_rtp,
+  gst_rtp_session_send_rtp,
+  gst_rtp_session_send_rtcp,
+  gst_rtp_session_clock_rate,
+  gst_rtp_session_get_time
 };
 
 /* GObject vmethods */
@@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name);
 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
 
-/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
+static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
 
 GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
 
@@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass)
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
   gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&rtpsession_rtcp_src_template));
+      gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
 
   gst_element_class_set_details (element_class, &rtpsession_details);
 }
@@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
   gobject_class->set_property = gst_rtp_session_set_property;
   gobject_class->get_property = gst_rtp_session_get_property;
 
+  /**
+   * GstRTPSession::request-pt-map:
+   * @sess: the object which received the signal
+   * @pt: the pt
+   *
+   * Request the payload type as #GstCaps for @pt.
+   */
+  gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
+      g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map),
+      NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
+      G_TYPE_UINT);
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
   gstelement_class->request_new_pad =
@@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
 {
   rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
   rtpsession->priv->lock = g_mutex_new ();
+  rtpsession->priv->session = rtp_session_new ();
+  /* configure callbacks */
+  rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
 }
 
 static void
@@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object)
 
   rtpsession = GST_RTP_SESSION (object);
   g_mutex_free (rtpsession->priv->lock);
+  g_object_unref (rtpsession->priv->session);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
   }
 }
 
+static void
+rtcp_thread (GstRTPSession * rtpsession)
+{
+  GstClock *clock;
+  GstClockID id;
+
+  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+  if (clock == NULL)
+    return;
+
+  GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  while (!rtpsession->priv->stop_thread) {
+    gdouble timeout;
+    GstClockTime target;
+
+    timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session);
+    GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout);
+
+    target = gst_clock_get_time (clock);
+    target += GST_SECOND * timeout;
+    id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target);
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+
+    gst_clock_id_wait (id, NULL);
+
+    GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout");
+
+    /* make the session manager produce RTCP, we ignore the result. */
+    rtp_session_produce_rtcp (rtpsession->priv->session);
+
+    GST_RTP_SESSION_LOCK (rtpsession);
+    gst_clock_id_unref (id);
+    rtpsession->priv->id = NULL;
+  }
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  gst_object_unref (clock);
+
+  GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
+}
+
+static gboolean
+start_rtcp_thread (GstRTPSession * rtpsession)
+{
+  GError *error = NULL;
+  gboolean res;
+
+  GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  rtpsession->priv->stop_thread = FALSE;
+  rtpsession->priv->thread =
+      g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (error != NULL) {
+    res = FALSE;
+    GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
+    g_error_free (error);
+  } else {
+    res = TRUE;
+  }
+  return res;
+}
+
+static void
+stop_rtcp_thread (GstRTPSession * rtpsession)
+{
+  GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  rtpsession->priv->stop_thread = TRUE;
+  if (rtpsession->priv->id)
+    gst_clock_id_unschedule (rtpsession->priv->id);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  g_thread_join (rtpsession->priv->thread);
+}
+
 static GstStateChangeReturn
 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
 {
@@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      stop_rtcp_thread (rtpsession);
     default:
       break;
   }
@@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
   res = parent_class->change_state (element, transition);
 
   switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      if (!start_rtcp_thread (rtpsession))
+        goto failed_thread;
+      break;
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
       break;
   }
   return res;
+
+  /* ERRORS */
+failed_thread:
+  {
+    return GST_STATE_CHANGE_FAILURE;
+  }
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * processing */
+static GstFlowReturn
+gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
+    GstBuffer * buffer, gpointer user_data)
+{
+  GstFlowReturn result;
+  GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
+
+  rtpsession = GST_RTP_SESSION (user_data);
+  priv = rtpsession->priv;
+
+  if (rtpsession->recv_rtp_src) {
+    result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+  } else {
+    gst_buffer_unref (buffer);
+    result = GST_FLOW_OK;
+  }
+  return result;
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
+    GstBuffer * buffer, gpointer user_data)
+{
+  GstFlowReturn result;
+  GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
+
+  rtpsession = GST_RTP_SESSION (user_data);
+  priv = rtpsession->priv;
+
+  if (rtpsession->send_rtp_src) {
+    result = gst_pad_push (rtpsession->send_rtp_src, buffer);
+  } else {
+    gst_buffer_unref (buffer);
+    result = GST_FLOW_OK;
+  }
+  return result;
+}
+
+/* called when the session manager has an RTCP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
+    GstBuffer * buffer, gpointer user_data)
+{
+  GstFlowReturn result;
+  GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
+
+  rtpsession = GST_RTP_SESSION (user_data);
+  priv = rtpsession->priv;
+
+  if (rtpsession->send_rtcp_src) {
+    result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
+  } else {
+    gst_buffer_unref (buffer);
+    result = GST_FLOW_OK;
+  }
+  return result;
+}
+
+
+/* called when the session manager needs the clock rate */
+static gint
+gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+    gpointer user_data)
+{
+  gint result = -1;
+  GstRTPSession *rtpsession;
+  GValue ret = { 0 };
+  GValue args[2] = { {0}, {0} };
+  GstCaps *caps;
+  const GstStructure *caps_struct;
+
+  rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+  g_value_init (&args[0], GST_TYPE_ELEMENT);
+  g_value_set_object (&args[0], rtpsession);
+  g_value_init (&args[1], G_TYPE_UINT);
+  g_value_set_uint (&args[1], payload);
+
+  g_value_init (&ret, GST_TYPE_CAPS);
+  g_value_set_boxed (&ret, NULL);
+
+  g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
+      &ret);
+
+  caps = (GstCaps *) g_value_get_boxed (&ret);
+  if (!caps)
+    goto no_caps;
+
+  caps_struct = gst_caps_get_structure (caps, 0);
+  if (!gst_structure_get_int (caps_struct, "clock-rate", &result))
+    goto no_clock_rate;
+
+  return result;
+
+  /* ERRORS */
+no_caps:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "could not get caps");
+    return -1;
+  }
+no_clock_rate:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps");
+    return -1;
+  }
+}
+
+/* called when the session manager needs the time of clock */
+static GstClockTime
+gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
+{
+  GstClockTime result;
+  GstRTPSession *rtpsession;
+  GstClock *clock;
+
+  rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+  if (clock) {
+    result = gst_clock_get_time (clock);
+    gst_object_unref (clock);
+  } else
+    result = GST_CLOCK_TIME_NONE;
+
+  return result;
 }
 
 static GstFlowReturn
 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   gboolean ret = FALSE;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received event %s",
       GST_EVENT_TYPE_NAME (event));
@@ -305,14 +580,15 @@ static GstFlowReturn
 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   GstFlowReturn ret;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
 
-  /* FIXME, do something */
-  ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+  ret = rtp_session_process_rtp (priv->session, buffer);
 
   gst_object_unref (rtpsession);
 
@@ -323,9 +599,11 @@ static GstFlowReturn
 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   gboolean ret = FALSE;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received event %s",
       GST_EVENT_TYPE_NAME (event));
@@ -347,14 +625,15 @@ static GstFlowReturn
 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   GstFlowReturn ret;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
-  /* FIXME, do something */
   GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
 
-  ret = gst_pad_push (rtpsession->sync_src, buffer);
+  ret = rtp_session_process_rtcp (priv->session, buffer);
 
   gst_object_unref (rtpsession);
 
@@ -365,9 +644,11 @@ static GstFlowReturn
 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   gboolean ret = FALSE;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received event");
 
@@ -388,14 +669,15 @@ static GstFlowReturn
 gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
 {
   GstRTPSession *rtpsession;
+  GstRTPSessionPrivate *priv;
   GstFlowReturn ret;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
 
-  /* FIXME, do something */
-  ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
+  ret = rtp_session_send_rtp (priv->session, buffer);
 
   gst_object_unref (rtpsession);
 
@@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
  * RTCP packets.
  */
 static GstPad *
-create_rtcp_src (GstRTPSession * rtpsession)
+create_send_rtcp_src (GstRTPSession * rtpsession)
 {
   GST_DEBUG_OBJECT (rtpsession, "creating pad");
 
-  rtpsession->rtcp_src =
-      gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
-  gst_pad_set_active (rtpsession->rtcp_src, TRUE);
-  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
+  rtpsession->send_rtcp_src =
+      gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
+      NULL);
+  gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
+  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+      rtpsession->send_rtcp_src);
 
-  return rtpsession->rtcp_src;
+  return rtpsession->send_rtcp_src;
 }
 
 static GstPad *
@@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element,
       goto exists;
 
     result = create_send_rtp_sink (rtpsession);
-  } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
-    if (rtpsession->rtcp_src != NULL)
+  } else if (templ == gst_element_class_get_pad_template (klass,
+          "send_rtcp_src")) {
+    if (rtpsession->send_rtcp_src != NULL)
       goto exists;
 
-    result = create_rtcp_src (rtpsession);
+    result = create_send_rtcp_src (rtpsession);
   } else
     goto wrong_template;
 
index 8b34306..25bbb6e 100644 (file)
@@ -32,6 +32,7 @@
   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))
 #define GST_IS_RTP_SESSION_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION))
+#define GST_RTP_SESSION_CAST(obj) ((GstRTPSession *)(obj))
 
 typedef struct _GstRTPSession GstRTPSession;
 typedef struct _GstRTPSessionClass GstRTPSessionClass;
@@ -48,13 +49,16 @@ struct _GstRTPSession {
   GstPad        *recv_rtp_src;
   GstPad        *sync_src;
   GstPad        *send_rtp_src;
-  GstPad        *rtcp_src;
+  GstPad        *send_rtcp_src;
 
   GstRTPSessionPrivate *priv;
 };
 
 struct _GstRTPSessionClass {
   GstElementClass parent_class;
+
+  /* signals */
+  GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);
 };
 
 GType gst_rtp_session_get_type (void);
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c
new file mode 100644 (file)
index 0000000..2283dc9
--- /dev/null
@@ -0,0 +1,1026 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <string.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpsession.h"
+
+GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
+#define GST_CAT_DEFAULT rtp_session_debug
+
+/* signals and args */
+enum
+{
+  SIGNAL_ON_NEW_SSRC,
+  SIGNAL_ON_SSRC_COLLISION,
+  SIGNAL_ON_SSRC_VALIDATED,
+  SIGNAL_ON_BYE_SSRC,
+  LAST_SIGNAL
+};
+
+#define RTP_DEFAULT_BANDWIDTH        64000.0
+#define RTP_DEFAULT_RTCP_BANDWIDTH   1000
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void rtp_session_finalize (GObject * object);
+static void rtp_session_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void rtp_session_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
+
+G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
+
+static void
+rtp_session_class_init (RTPSessionClass * klass)
+{
+  GObjectClass *gobject_class;
+
+  gobject_class = (GObjectClass *) klass;
+
+  gobject_class->finalize = rtp_session_finalize;
+  gobject_class->set_property = rtp_session_set_property;
+  gobject_class->get_property = rtp_session_get_property;
+
+  /**
+   * RTPSession::on-new-ssrc:
+   * @session: the object which received the signal
+   * @src: the new RTPSource
+   *
+   * Notify of a new SSRC that entered @session.
+   */
+  rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
+      g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
+  /**
+   * RTPSession::on-ssrc_collision:
+   * @session: the object which received the signal
+   * @src: the #RTPSource that caused a collision
+   *
+   * Notify when we have an SSRC collision
+   */
+  rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
+      g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
+  /**
+   * RTPSession::on-ssrc_validated:
+   * @session: the object which received the signal
+   * @src: the new validated RTPSource
+   *
+   * Notify of a new SSRC that became validated.
+   */
+  rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
+      g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
+  /**
+   * RTPSession::on-bye-ssrc:
+   * @session: the object which received the signal
+   * @src: the RTPSource that went away
+   *
+   * Notify of an SSRC that became inactive because of a BYE packet.
+   */
+  rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
+      g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
+
+  GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
+}
+
+static void
+rtp_session_init (RTPSession * sess)
+{
+  sess->lock = g_mutex_new ();
+  sess->ssrcs =
+      g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref);
+  sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
+
+  /* create an SSRC for this session manager */
+  sess->source = rtp_session_create_source (sess);
+
+  rtp_stats_init_defaults (&sess->stats);
+
+  /* default UDP header length */
+  sess->header_len = 28;
+
+  GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
+}
+
+static void
+rtp_session_finalize (GObject * object)
+{
+  RTPSession *sess;
+
+  sess = RTP_SESSION_CAST (object);
+
+  g_mutex_free (sess->lock);
+  g_hash_table_unref (sess->ssrcs);
+  g_hash_table_unref (sess->cnames);
+  g_object_unref (sess->source);
+
+  G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
+}
+
+static void
+rtp_session_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  RTPSession *sess;
+
+  sess = RTP_SESSION (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+rtp_session_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  RTPSession *sess;
+
+  sess = RTP_SESSION (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+on_new_ssrc (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
+}
+
+static void
+on_ssrc_collision (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
+      source);
+}
+
+static void
+on_ssrc_validated (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
+      source);
+}
+
+static void
+on_bye_ssrc (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
+}
+
+/**
+ * rtp_session_new:
+ *
+ * Create a new session object.
+ *
+ * Returns: a new #RTPSession. g_object_unref() after usage.
+ */
+RTPSession *
+rtp_session_new (void)
+{
+  RTPSession *sess;
+
+  sess = g_object_new (RTP_TYPE_SESSION, NULL);
+
+  return sess;
+}
+
+/**
+ * rtp_session_set_callbacks:
+ * @sess: an #RTPSession
+ * @callbacks: callbacks to configure
+ * @user_data: user data passed in the callbacks
+ *
+ * Configure a set of callbacks to be notified of actions.
+ */
+void
+rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
+    gpointer user_data)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  sess->callbacks.process_rtp = callbacks->process_rtp;
+  sess->callbacks.send_rtp = callbacks->send_rtp;
+  sess->callbacks.send_rtcp = callbacks->send_rtcp;
+  sess->callbacks.clock_rate = callbacks->clock_rate;
+  sess->callbacks.get_time = callbacks->get_time;
+  sess->user_data = user_data;
+}
+
+/**
+ * rtp_session_set_bandwidth:
+ * @sess: an #RTPSession
+ * @bandwidth: the bandwidth allocated
+ *
+ * Set the session bandwidth in bytes per second.
+ */
+void
+rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  sess->stats.bandwidth = bandwidth;
+}
+
+/**
+ * rtp_session_get_bandwidth:
+ * @sess: an #RTPSession
+ *
+ * Get the session bandwidth.
+ *
+ * Returns: the session bandwidth.
+ */
+gdouble
+rtp_session_get_bandwidth (RTPSession * sess)
+{
+  g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
+
+  return sess->stats.bandwidth;
+}
+
+/**
+ * rtp_session_set_rtcp_bandwidth:
+ * @sess: an #RTPSession
+ * @bandwidth: the RTCP bandwidth
+ *
+ * Set the bandwidth that should be used for RTCP
+ * messages. 
+ */
+void
+rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  sess->stats.rtcp_bandwidth = bandwidth;
+}
+
+/**
+ * rtp_session_get_rtcp_bandwidth:
+ * @sess: an #RTPSession
+ *
+ * Get the session bandwidth used for RTCP.
+ *
+ * Returns: The bandwidth used for RTCP messages.
+ */
+gdouble
+rtp_session_get_rtcp_bandwidth (RTPSession * sess)
+{
+  g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
+
+  return sess->stats.rtcp_bandwidth;
+}
+
+static GstFlowReturn
+source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
+{
+  GstFlowReturn result = GST_FLOW_OK;
+
+  if (source == session->source) {
+    GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
+    if (session->callbacks.send_rtp)
+      result =
+          session->callbacks.send_rtp (session, source, buffer,
+          session->user_data);
+    else
+      gst_buffer_unref (buffer);
+  } else {
+    GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
+    if (session->callbacks.process_rtp)
+      result =
+          session->callbacks.process_rtp (session, source, buffer,
+          session->user_data);
+    else
+      gst_buffer_unref (buffer);
+  }
+  return result;
+}
+
+static gint
+source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
+{
+  gint result;
+
+  if (session->callbacks.clock_rate)
+    result = session->callbacks.clock_rate (session, pt, session->user_data);
+  else
+    result = -1;
+
+  GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
+
+  return result;
+}
+
+static RTPSourceCallbacks callbacks = {
+  (RTPSourcePushRTP) source_push_rtp,
+  (RTPSourceClockRate) source_clock_rate,
+};
+
+static gboolean
+check_collision (RTPSession * sess, RTPSource * source,
+    RTPArrivalStats * arrival)
+{
+  /* FIXME, do collision check */
+  return FALSE;
+}
+
+static RTPSource *
+obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+    RTPArrivalStats * arrival, gboolean rtp)
+{
+  RTPSource *source;
+
+  source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc));
+  if (source == NULL) {
+    /* make new Source in probation and insert */
+    source = rtp_source_new (ssrc);
+
+    if (rtp)
+      source->probation = RTP_DEFAULT_PROBATION;
+    else
+      source->probation = 0;
+
+    /* store from address, if any */
+    if (arrival->have_address) {
+      if (rtp)
+        rtp_source_set_rtp_from (source, &arrival->address);
+      else
+        rtp_source_set_rtcp_from (source, &arrival->address);
+    }
+
+    /* configure a callback on the source */
+    rtp_source_set_callbacks (source, &callbacks, sess);
+
+    g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source);
+
+    /* we have one more source now */
+    sess->total_sources++;
+    *created = TRUE;
+  } else {
+    *created = FALSE;
+    /* check for collision, this updates the address when not previously set */
+    if (check_collision (sess, source, arrival))
+      on_ssrc_collision (sess, source);
+  }
+  return source;
+}
+
+/**
+ * rtp_session_add_source:
+ * @sess: a #RTPSession
+ * @src: #RTPSource to add
+ *
+ * Add @src to @session.
+ *
+ * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
+ * existed in the session.
+ */
+gboolean
+rtp_session_add_source (RTPSession * sess, RTPSource * src)
+{
+  gboolean result = FALSE;
+  RTPSource *find;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+  g_return_val_if_fail (src != NULL, FALSE);
+
+  RTP_SESSION_LOCK (sess);
+  find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc));
+  if (find == NULL) {
+    g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src);
+    /* we have one more source now */
+    sess->total_sources++;
+    result = TRUE;
+  }
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_get_num_sources:
+ * @sess: an #RTPSession
+ *
+ * Get the number of sources in @sess.
+ *
+ * Returns: The number of sources in @sess.
+ */
+gint
+rtp_session_get_num_sources (RTPSession * sess)
+{
+  gint result;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+
+  RTP_SESSION_LOCK (sess);
+  result = sess->total_sources;
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_get_num_active_sources:
+ * @sess: an #RTPSession
+ *
+ * Get the number of active sources in @sess. A source is considered active when
+ * it has been validated and has not yet received a BYE RTCP message.
+ *
+ * Returns: The number of active sources in @sess.
+ */
+gint
+rtp_session_get_num_active_sources (RTPSession * sess)
+{
+  gint result;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
+
+  RTP_SESSION_LOCK (sess);
+  result = sess->stats.active_sources;
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_get_source_by_ssrc:
+ * @sess: an #RTPSession
+ * @ssrc: an SSRC
+ *
+ * Find the source with @ssrc in @sess.
+ *
+ * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
+ * g_object_unref() after usage.
+ */
+RTPSource *
+rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
+{
+  RTPSource *result;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
+
+  RTP_SESSION_LOCK (sess);
+  result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc));
+  if (result)
+    g_object_ref (result);
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_get_source_by_cname:
+ * @sess: a #RTPSession
+ * @cname: an CNAME
+ *
+ * Find the source with @cname in @sess.
+ *
+ * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
+ * g_object_unref() after usage.
+ */
+RTPSource *
+rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
+{
+  RTPSource *result;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
+  g_return_val_if_fail (cname != NULL, NULL);
+
+  RTP_SESSION_LOCK (sess);
+  result = g_hash_table_lookup (sess->cnames, cname);
+  if (result)
+    g_object_ref (result);
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_create_source:
+ * @sess: an #RTPSession
+ *
+ * Create an #RTPSource for use in @sess. This function will create a source
+ * with an ssrc that is currently not used by any participants in the session.
+ *
+ * Returns: an #RTPSource.
+ */
+RTPSource *
+rtp_session_create_source (RTPSession * sess)
+{
+  guint32 ssrc;
+  RTPSource *source;
+
+  RTP_SESSION_LOCK (sess);
+  while (TRUE) {
+    ssrc = g_random_int ();
+
+    /* see if it exists in the session, we're done if it doesn't */
+    if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL)
+      break;
+  }
+  source = rtp_source_new (ssrc);
+  g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source);
+  /* we have one more source now */
+  sess->total_sources++;
+  RTP_SESSION_UNLOCK (sess);
+
+  return source;
+}
+
+/* update the RTPArrivalStats structure with the current time and other bits
+ * about the current buffer we are handling.
+ * This function is typically called when a validated packet is received.
+ */
+static void
+update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
+    gboolean rtp, GstBuffer * buffer)
+{
+  /* get time or arrival */
+  if (sess->callbacks.get_time)
+    arrival->time = sess->callbacks.get_time (sess, sess->user_data);
+  else
+    arrival->time = GST_CLOCK_TIME_NONE;
+
+  /* update sizes */
+  arrival->bytes = GST_BUFFER_SIZE (buffer) + 28;
+  arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0);
+
+  /* for netbuffer we can store the IP address to check for collisions */
+  arrival->have_address = GST_IS_NETBUFFER (buffer);
+  if (arrival->have_address) {
+    GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
+
+    memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
+  }
+}
+
+/**
+ * rtp_session_process_rtp:
+ * @sess: and #RTPSession
+ * @buffer: an RTP buffer
+ *
+ * Process an RTP buffer in the session manager. This function takes ownership
+ * of @buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
+{
+  GstFlowReturn result;
+  guint32 ssrc;
+  RTPSource *source;
+  gboolean created;
+  gboolean prevsender, prevactive;
+  RTPArrivalStats arrival;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  if (!gst_rtp_buffer_validate (buffer))
+    goto invalid_packet;
+
+  /* update arrival stats */
+  update_arrival_stats (sess, &arrival, TRUE, buffer);
+
+  /* get SSRC and look up in session database */
+  ssrc = gst_rtp_buffer_get_ssrc (buffer);
+
+  RTP_SESSION_LOCK (sess);
+  source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+
+  prevsender = RTP_SOURCE_IS_SENDER (source);
+  prevactive = RTP_SOURCE_IS_ACTIVE (source);
+
+  /* let source process the packet */
+  result = rtp_source_process_rtp (source, buffer, &arrival);
+
+  /* source became active */
+  if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
+    sess->stats.active_sources++;
+    GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+        sess->stats.active_sources);
+    on_ssrc_validated (sess, source);
+  }
+  if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
+    sess->stats.sender_sources++;
+    GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
+        sess->stats.sender_sources);
+  }
+
+  if (created)
+    on_new_ssrc (sess, source);
+
+  /* for validated sources, we add the CSRCs as well */
+  if (source->validated) {
+    guint8 i, count;
+
+    count = gst_rtp_buffer_get_csrc_count (buffer);
+
+    for (i = 0; i < count; i++) {
+      guint32 csrc;
+      RTPSource *csrc_src;
+
+      csrc = gst_rtp_buffer_get_csrc (buffer, i);
+
+      /* get source */
+      csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+      if (created) {
+        GST_DEBUG ("created new CSRC: %08x", csrc);
+        rtp_source_set_as_csrc (csrc_src);
+        if (RTP_SOURCE_IS_ACTIVE (csrc_src))
+          sess->stats.active_sources++;
+        on_new_ssrc (sess, source);
+      }
+    }
+  }
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+
+  /* ERRORS */
+invalid_packet:
+  {
+    GST_DEBUG ("invalid RTP packet received");
+    return GST_FLOW_OK;
+  }
+}
+
+/* A Sender report contains statistics about how the sender is doing. This
+ * includes timing informataion about the relation between RTP and NTP
+ * timestamps is it using and the number of packets/bytes it sent to us.
+ *
+ * In this report is also included a set of report blocks related to how this
+ * sender is receiving data (in case we (or somebody else) is also sending stuff
+ * to it). This info includes the packet loss, jitter and seqnum. It also
+ * contains information to calculate the round trip time (LSR/DLSR).
+ */
+static void
+rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival)
+{
+  guint32 senderssrc, rtptime, packet_count, octet_count;
+  guint64 ntptime;
+  guint count, i;
+  RTPSource *source;
+  gboolean created;
+
+  gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
+      &packet_count, &octet_count);
+
+  RTP_SESSION_LOCK (sess);
+  source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+
+  /* first update the source */
+  rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count);
+
+  if (created)
+    on_new_ssrc (sess, source);
+
+  count = gst_rtcp_packet_get_rb_count (packet);
+  for (i = 0; i < count; i++) {
+    guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
+    guint8 fractionlost;
+    gint32 packetslost;
+
+    gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
+        &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+    if (ssrc == sess->source->ssrc) {
+      /* only deal with report blocks for our session, we update the stats of
+       * the sender of the TCP message. We could also compare our stats against
+       * the other sender to see if we are better or worse. */
+      rtp_source_process_rb (source, fractionlost, packetslost,
+          exthighestseq, jitter, lsr, dlsr);
+    }
+  }
+  RTP_SESSION_UNLOCK (sess);
+}
+
+/* A receiver report contains statistics about how a receiver is doing. It
+ * includes stuff like packet loss, jitter and the seqnum it received last. It
+ * also contains info to calculate the round trip time.
+ *
+ * We are only interested in how the sender of this report is doing wrt to us.
+ */
+static void
+rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival)
+{
+  guint32 senderssrc;
+  guint count, i;
+  RTPSource *source;
+  gboolean created;
+
+  senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
+
+  GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
+
+  RTP_SESSION_LOCK (sess);
+  source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+
+  if (created)
+    on_new_ssrc (sess, source);
+
+  count = gst_rtcp_packet_get_rb_count (packet);
+  for (i = 0; i < count; i++) {
+    guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
+    guint8 fractionlost;
+    gint32 packetslost;
+
+    gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
+        &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+    if (ssrc == sess->source->ssrc) {
+      rtp_source_process_rb (source, fractionlost, packetslost,
+          exthighestseq, jitter, lsr, dlsr);
+    }
+  }
+  RTP_SESSION_UNLOCK (sess);
+}
+
+/* FIXME, we're just printing this for now... */
+static void
+rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival)
+{
+  guint chunks, i, j;
+  gboolean more_chunks, more_items;
+
+  chunks = gst_rtcp_packet_sdes_get_chunk_count (packet);
+  GST_DEBUG ("got SDES packet with %d chunks", chunks);
+
+  more_chunks = gst_rtcp_packet_sdes_first_chunk (packet);
+  i = 0;
+  while (more_chunks) {
+    guint32 ssrc;
+
+    ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
+
+    GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc);
+
+    more_items = gst_rtcp_packet_sdes_first_item (packet);
+    j = 0;
+    while (more_items) {
+      GstRTCPSDESType type;
+      guint8 len;
+      gchar *data;
+
+      gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data);
+
+      GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data);
+
+      more_items = gst_rtcp_packet_sdes_next_item (packet);
+      j++;
+    }
+    more_chunks = gst_rtcp_packet_sdes_next_chunk (packet);
+    i++;
+  }
+}
+
+/* BYE is sent when a client leaves the session
+ */
+static void
+rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival)
+{
+  guint count, i;
+  gchar *reason;
+
+  reason = gst_rtcp_packet_bye_get_reason (packet);
+  GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
+
+  count = gst_rtcp_packet_bye_get_ssrc_count (packet);
+  for (i = 0; i < count; i++) {
+    guint32 ssrc;
+    RTPSource *source;
+    gboolean created, prevactive, prevsender;
+
+    ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
+    GST_DEBUG ("SSRC: %08x", ssrc);
+
+    /* find src and mark bye, no probation when dealing with RTCP */
+    RTP_SESSION_LOCK (sess);
+    source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+
+    prevactive = RTP_SOURCE_IS_ACTIVE (source);
+    prevsender = RTP_SOURCE_IS_SENDER (source);
+
+    /* let the source handle the rest */
+    rtp_source_process_bye (source, reason);
+
+    if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
+      sess->stats.active_sources--;
+      GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
+          sess->stats.active_sources);
+    }
+    if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
+      sess->stats.sender_sources--;
+      GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
+          sess->stats.sender_sources);
+    }
+
+    if (created)
+      on_new_ssrc (sess, source);
+
+    on_bye_ssrc (sess, source);
+    RTP_SESSION_UNLOCK (sess);
+  }
+  g_free (reason);
+}
+
+static void
+rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival)
+{
+  GST_DEBUG ("received APP");
+}
+
+/**
+ * rtp_session_process_rtcp:
+ * @sess: and #RTPSession
+ * @buffer: an RTCP buffer
+ *
+ * Process an RTCP buffer in the session manager.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
+{
+  GstRTCPPacket packet;
+  gboolean more;
+  RTPArrivalStats arrival;
+  guint size;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  if (!gst_rtcp_buffer_validate (buffer))
+    goto invalid_packet;
+
+  /* update arrival stats */
+  update_arrival_stats (sess, &arrival, FALSE, buffer);
+
+  GST_DEBUG ("received RTCP packet");
+
+  /* get packet size including header overhead */
+  size = GST_BUFFER_SIZE (buffer) + sess->header_len;
+
+  /* update average RTCP packet size */
+  if (sess->stats.avg_rtcp_packet_size == 0)
+    sess->stats.avg_rtcp_packet_size = size;
+  else
+    sess->stats.avg_rtcp_packet_size =
+        (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4;
+
+  /* start processing the compound packet */
+  more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
+  while (more) {
+    switch (gst_rtcp_packet_get_type (&packet)) {
+      case GST_RTCP_TYPE_SR:
+        rtp_session_process_sr (sess, &packet, &arrival);
+        break;
+      case GST_RTCP_TYPE_RR:
+        rtp_session_process_rr (sess, &packet, &arrival);
+        break;
+      case GST_RTCP_TYPE_SDES:
+        rtp_session_process_sdes (sess, &packet, &arrival);
+        break;
+      case GST_RTCP_TYPE_BYE:
+        rtp_session_process_bye (sess, &packet, &arrival);
+        break;
+      case GST_RTCP_TYPE_APP:
+        rtp_session_process_app (sess, &packet, &arrival);
+        break;
+      default:
+        GST_WARNING ("got unknown RTCP packet");
+        break;
+    }
+    more = gst_rtcp_packet_move_to_next (&packet);
+  }
+
+  gst_buffer_unref (buffer);
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+invalid_packet:
+  {
+    GST_DEBUG ("invalid RTCP packet received");
+    return GST_FLOW_OK;
+  }
+}
+
+/**
+ * rtp_session_send_rtp:
+ * @sess: and #RTPSession
+ * @buffer: an RTP buffer
+ *
+ * Send the RTP buffer in the session manager.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
+{
+  GstFlowReturn result;
+  RTPSource *source;
+  gboolean prevsender;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  source = sess->source;
+
+  prevsender = RTP_SOURCE_IS_SENDER (source);
+
+  /* we use our own source to send */
+  result = rtp_source_send_rtp (sess->source, buffer);
+
+  if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
+    sess->stats.sender_sources++;
+
+  return result;
+}
+
+/**
+ * rtp_session_get_rtcp_interval:
+ * @sess: an #RTPSession
+ *
+ * Get the interval for sending out the next RTCP packet
+ *
+ * Returns: an interval in seconds.
+ */
+gdouble
+rtp_session_get_rtcp_interval (RTPSession * sess)
+{
+  gdouble result;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+  RTP_SESSION_LOCK (sess);
+  result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE);
+  result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_produce_rtcp:
+ * @sess: an #RTPSession
+ *
+ * Instruct the session manager to generate RTCP packets with current stats.
+ * This function will call the #RTPSessionSendRTCP callback, possibly multiple
+ * times, for each packet that should be processed.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_produce_rtcp (RTPSession * sess)
+{
+  /* FIXME: implement me */
+  return GST_FLOW_NOT_SUPPORTED;
+}
diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h
new file mode 100644 (file)
index 0000000..46062c9
--- /dev/null
@@ -0,0 +1,206 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_SESSION_H__
+#define __RTP_SESSION_H__
+
+#include <gst/gst.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpsource.h"
+
+typedef struct _RTPSession RTPSession;
+typedef struct _RTPSessionClass RTPSessionClass;
+
+#define RTP_TYPE_SESSION             (rtp_session_get_type())
+#define RTP_SESSION(sess)            (G_TYPE_CHECK_INSTANCE_CAST((sess),RTP_TYPE_SESSION,RTPSession))
+#define RTP_SESSION_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SESSION,RTPSessionClass))
+#define RTP_IS_SESSION(sess)         (G_TYPE_CHECK_INSTANCE_TYPE((sess),RTP_TYPE_SESSION))
+#define RTP_IS_SESSION_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SESSION))
+#define RTP_SESSION_CAST(sess)       ((RTPSession *)(sess))
+
+#define RTP_SESSION_LOCK(sess)     (g_mutex_lock ((sess)->lock))
+#define RTP_SESSION_UNLOCK(sess)   (g_mutex_unlock ((sess)->lock))
+
+/**
+ * RTPSessionProcessRTP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTP buffer ready for processing
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for further
+ * processing. Processing the buffer typically includes decoding and displaying
+ * the buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionSendRTP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTP buffer ready for sending
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for sending to
+ * all listening participants in this session.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionSendRTCP:
+ * @sess: an #RTPSession
+ * @src: the #RTPSource
+ * @buffer: the RTCP buffer ready for sending
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess has @buffer ready for sending to
+ * all listening participants in this session.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSessionClockRate:
+ * @sess: an #RTPSession
+ * @payload: the payload
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess needs the clock-rate of @payload.
+ *
+ * Returns: the clock-rate of @pt.
+ */
+typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data);
+
+/**
+ * RTPSessionGetTime:
+ * @sess: an #RTPSession
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess needs the current time in
+ * nanoseconds.
+ *
+ * Returns: a #GstClockTime with the current time in nanoseconds.
+ */
+typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data);
+
+/**
+ * RTPSessionCallbacks:
+ * @RTPSessionProcessRTP: callback to process RTP packets
+ * @RTPSessionSendRTP: callback for sending RTP packets
+ * @RTPSessionSendRTCP: callback for sending RTCP packets
+ * @RTPSessionGetTime: callback for returning the current time
+ *
+ * These callbacks can be installed on the session manager to get notification
+ * when RTP and RTCP packets are ready for further processing. These callbacks
+ * are not implemented with signals for performance reasons.
+ */
+typedef struct {
+  RTPSessionProcessRTP  process_rtp;
+  RTPSessionSendRTP     send_rtp;
+  RTPSessionSendRTCP    send_rtcp;
+  RTPSessionClockRate   clock_rate;
+  RTPSessionGetTime     get_time;
+} RTPSessionCallbacks;
+
+/**
+ * RTPSession:
+ * @lock: lock to protect the session
+ * @source: the source of this session
+ * @ssrcs: Hashtable of sources indexed by SSRC
+ * @cnames: Hashtable of sources indexed by CNAME
+ * @num_sources: the number of sources
+ * @activecount: the number of active sources
+ * @callbacks: callbacks
+ * @user_data: user data passed in callbacks
+ *
+ * The RTP session manager object
+ */
+struct _RTPSession {
+  GObject       object;
+
+  GMutex       *lock;
+
+  guint         header_len;
+
+  RTPSource    *source;
+  GHashTable   *ssrcs;
+  GHashTable   *cnames;
+  guint         total_sources;
+
+  RTPSessionCallbacks callbacks;
+  gpointer            user_data;
+
+  RTPSessionStats stats;
+};
+
+/**
+ * RTPSessionClass:
+ * @on_new_ssrc: emited when a new source is found
+ * @on_bye_ssrc: emited when a source is gone
+ *
+ * The session class.
+ */
+struct _RTPSessionClass {
+  GObjectClass   parent_class;
+
+  /* signals */
+  void (*on_new_ssrc)       (RTPSession *sess, RTPSource *source);
+  void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source);
+  void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source);
+  void (*on_bye_ssrc)       (RTPSession *sess, RTPSource *source);
+};
+
+GType rtp_session_get_type (void);
+
+/* create and configure */
+RTPSession*     rtp_session_new           (void);
+void            rtp_session_set_callbacks          (RTPSession *sess, 
+                                                   RTPSessionCallbacks *callbacks,
+                                                    gpointer user_data);
+void            rtp_session_set_bandwidth          (RTPSession *sess, gdouble bandwidth);
+gdouble         rtp_session_get_bandwidth          (RTPSession *sess);
+void            rtp_session_set_rtcp_fraction      (RTPSession *sess, gdouble fraction);
+gdouble         rtp_session_get_rtcp_fraction      (RTPSession *sess);
+
+/* handling sources */
+gboolean        rtp_session_add_source             (RTPSession *sess, RTPSource *src);
+gint            rtp_session_get_num_sources        (RTPSession *sess);
+gint            rtp_session_get_num_active_sources (RTPSession *sess);
+RTPSource*      rtp_session_get_source_by_ssrc     (RTPSession *sess, guint32 ssrc);
+RTPSource*      rtp_session_get_source_by_cname    (RTPSession *sess, const gchar *cname);
+RTPSource*      rtp_session_create_source          (RTPSession *sess);
+
+/* processing packets from receivers */
+GstFlowReturn   rtp_session_process_rtp            (RTPSession *sess, GstBuffer *buffer);
+GstFlowReturn   rtp_session_process_rtcp           (RTPSession *sess, GstBuffer *buffer);
+
+/* processing packets for sending */
+GstFlowReturn   rtp_session_send_rtp               (RTPSession *sess, GstBuffer *buffer);
+
+/* get interval for next RTCP interval */
+gdouble         rtp_session_get_rtcp_interval      (RTPSession *sess);
+GstFlowReturn   rtp_session_produce_rtcp           (RTPSession *sess);
+
+#endif /* __RTP_SESSION_H__ */
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c
new file mode 100644 (file)
index 0000000..36f5438
--- /dev/null
@@ -0,0 +1,477 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#include <string.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
+#include "rtpsource.h"
+
+GST_DEBUG_CATEGORY_STATIC (rtp_source_debug);
+#define GST_CAT_DEFAULT rtp_source_debug
+
+#define RTP_MAX_PROBATION_LEN  32
+
+/* signals and args */
+enum
+{
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0
+};
+
+/* GObject vmethods */
+static void rtp_source_finalize (GObject * object);
+
+/* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */
+
+G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT);
+
+static void
+rtp_source_class_init (RTPSourceClass * klass)
+{
+  GObjectClass *gobject_class;
+
+  gobject_class = (GObjectClass *) klass;
+
+  gobject_class->finalize = rtp_source_finalize;
+
+  GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
+}
+
+static void
+rtp_source_init (RTPSource * src)
+{
+  /* sources are initialy on probation until we receive enough valid RTP
+   * packets or a valid RTCP packet */
+  src->validated = FALSE;
+  src->probation = RTP_DEFAULT_PROBATION;
+
+  src->payload = 0;
+  src->clock_rate = -1;
+  src->packets = g_queue_new ();
+
+  src->stats.jitter = 0;
+  src->stats.transit = -1;
+  src->stats.curr_sr = 0;
+  src->stats.curr_rr = 0;
+}
+
+static void
+rtp_source_finalize (GObject * object)
+{
+  RTPSource *src;
+  GstBuffer *buffer;
+
+  src = RTP_SOURCE_CAST (object);
+
+  while ((buffer = g_queue_pop_head (src->packets)))
+    gst_buffer_unref (buffer);
+  g_queue_free (src->packets);
+
+  G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
+}
+
+/**
+ * rtp_source_new:
+ * @ssrc: an SSRC
+ *
+ * Create a #RTPSource with @ssrc.
+ *
+ * Returns: a new #RTPSource. Use g_object_unref() after usage.
+ */
+RTPSource *
+rtp_source_new (guint32 ssrc)
+{
+  RTPSource *src;
+
+  src = g_object_new (RTP_TYPE_SOURCE, NULL);
+  src->ssrc = ssrc;
+
+  return src;
+}
+
+/**
+ * rtp_source_set_callbacks:
+ * @src: an #RTPSource
+ * @cb: callback functions
+ * @user_data: user data
+ *
+ * Set the callbacks for the source.
+ */
+void
+rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb,
+    gpointer user_data)
+{
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  src->callbacks.push_rtp = cb->push_rtp;
+  src->callbacks.clock_rate = cb->clock_rate;
+  src->user_data = user_data;
+}
+
+/**
+ * rtp_source_set_as_csrc:
+ * @src: an #RTPSource
+ *
+ * Configure @src as a CSRC, this will validate the RTpSource.
+ */
+void
+rtp_source_set_as_csrc (RTPSource * src)
+{
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  src->validated = TRUE;
+  src->is_csrc = TRUE;
+}
+
+/**
+ * rtp_source_set_rtp_from:
+ * @src: an #RTPSource
+ * @address: the RTP address to set
+ *
+ * Set that @src is receiving RTP packets from @address. This is used for
+ * collistion checking.
+ */
+void
+rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address)
+{
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  src->have_rtp_from = TRUE;
+  memcpy (&src->rtp_from, address, sizeof (GstNetAddress));
+}
+
+/**
+ * rtp_source_set_rtcp_from:
+ * @src: an #RTPSource
+ * @address: the RTCP address to set
+ *
+ * Set that @src is receiving RTCP packets from @address. This is used for
+ * collistion checking.
+ */
+void
+rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address)
+{
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  src->have_rtcp_from = TRUE;
+  memcpy (&src->rtcp_from, address, sizeof (GstNetAddress));
+}
+
+static GstFlowReturn
+push_packet (RTPSource * src, GstBuffer * buffer)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  /* push queued packets first if any */
+  while (!g_queue_is_empty (src->packets)) {
+    GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets));
+
+    GST_DEBUG ("pushing queued packet");
+    if (src->callbacks.push_rtp)
+      src->callbacks.push_rtp (src, buffer, src->user_data);
+    else
+      gst_buffer_unref (buffer);
+  }
+  GST_DEBUG ("pushing new packet");
+  /* push packet */
+  if (src->callbacks.push_rtp)
+    ret = src->callbacks.push_rtp (src, buffer, src->user_data);
+  else
+    gst_buffer_unref (buffer);
+
+  return ret;
+}
+
+static gint
+get_clock_rate (RTPSource * src, guint8 payload)
+{
+  if (payload != src->payload) {
+    gint clock_rate = -1;
+
+    if (src->callbacks.clock_rate)
+      clock_rate = src->callbacks.clock_rate (src, payload, src->user_data);
+
+    GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate);
+
+    src->clock_rate = clock_rate;
+    src->payload = payload;
+  }
+  return src->clock_rate;
+}
+
+static void
+calculate_jitter (RTPSource * src, GstBuffer * buffer,
+    RTPArrivalStats * arrival)
+{
+  GstClockTime current;
+  guint32 rtparrival, transit, rtptime;
+  gint32 diff;
+  gint clock_rate;
+  guint8 pt;
+
+  /* get arrival time */
+  if ((current = arrival->time) == GST_CLOCK_TIME_NONE)
+    goto no_time;
+
+  pt = gst_rtp_buffer_get_payload_type (buffer);
+
+  /* get clockrate */
+  if ((clock_rate = get_clock_rate (src, pt)) == -1)
+    goto no_clock_rate;
+
+  rtptime = gst_rtp_buffer_get_timestamp (buffer);
+
+  /* convert arrival time to RTP timestamp units */
+  rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND);
+
+  /* transit time is difference with RTP timestamp */
+  transit = rtparrival - rtptime;
+  /* get diff with previous transit time */
+  if (src->stats.transit != -1)
+    diff = transit - src->stats.transit;
+  else
+    diff = 0;
+  src->stats.transit = transit;
+  if (diff < 0)
+    diff = -diff;
+  /* update jitter */
+  src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4);
+
+  src->stats.prev_rtptime = src->stats.last_rtptime;
+  src->stats.last_rtptime = rtparrival;
+
+  GST_DEBUG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %u",
+      rtparrival, rtptime, clock_rate, diff, src->stats.jitter);
+
+  return;
+
+  /* ERRORS */
+no_time:
+  {
+    GST_WARNING ("cannot get current time");
+    return;
+  }
+no_clock_rate:
+  {
+    GST_WARNING ("cannot get clock-rate for pt %d", pt);
+    return;
+  }
+}
+
+/**
+ * rtp_source_process_rtp:
+ * @src: an #RTPSource
+ * @buffer: an RTP buffer
+ *
+ * Let @src handle the incomming RTP @buffer.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
+    RTPArrivalStats * arrival)
+{
+  GstFlowReturn result = GST_FLOW_OK;
+
+  g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  /* if we are still on probation, check seqnum */
+  if (src->probation) {
+    guint16 seqnr, expected;
+
+    expected = src->stats.max_seqnr + 1;
+
+    /* when in probation, we require consecutive seqnums */
+    seqnr = gst_rtp_buffer_get_seq (buffer);
+    if (seqnr == expected) {
+      /* expected packet */
+      src->probation--;
+      src->stats.max_seqnr = seqnr;
+
+      GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
+    } else {
+      GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected);
+      src->probation = RTP_DEFAULT_PROBATION;
+      src->stats.max_seqnr = seqnr;
+    }
+  }
+  if (src->probation) {
+    GstBuffer *q;
+
+    GST_DEBUG ("probation %d: queue buffer", src->probation);
+    /* when still in probation, keep packets in a list. */
+    g_queue_push_tail (src->packets, buffer);
+    /* remove packets from queue if there are too many */
+    while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
+      q = g_queue_pop_head (src->packets);
+      gst_object_unref (q);
+    }
+  } else {
+    /* we are not in probation */
+    src->stats.octetsreceived += arrival->payload_len;
+    src->stats.bytesreceived += arrival->bytes;
+    src->stats.packetsreceived++;
+    src->is_sender = TRUE;
+
+    GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
+        src->stats.packetsreceived, src->stats.octetsreceived);
+
+    /* calculate jitter */
+    calculate_jitter (src, buffer, arrival);
+
+    /* we're ready to push the RTP packet now */
+    result = push_packet (src, buffer);
+  }
+  return result;
+}
+
+/**
+ * rtp_source_process_bye:
+ * @src: an #RTPSource
+ * @reason: the reason for leaving
+ *
+ * Notify @src that a BYE packet has been received. This will make the source
+ * inactive.
+ */
+void
+rtp_source_process_bye (RTPSource * src, const gchar * reason)
+{
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc,
+      GST_STR_NULL (reason));
+
+  /* copy the reason and mark as received_bye */
+  g_free (src->bye_reason);
+  src->bye_reason = g_strdup (reason);
+  src->received_bye = TRUE;
+}
+
+/**
+ * rtp_source_send_rtp:
+ * @src: an #RTPSource
+ * @buffer: an RTP buffer
+ *
+ * Send an RTP @buffer originating from @src. This will make @src a sender.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
+{
+  GstFlowReturn result = GST_FLOW_OK;
+
+  g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  /* we are a sender now */
+  src->is_sender = TRUE;
+
+  /* push packet */
+  if (src->callbacks.push_rtp)
+    result = src->callbacks.push_rtp (src, buffer, src->user_data);
+  else
+    gst_buffer_unref (buffer);
+
+  return result;
+}
+
+/**
+ * rtp_source_process_sr:
+ * @src: an #RTPSource
+ * @ntptime: the NTP time
+ * @rtptime: the RTP time
+ * @packet_count: the packet count
+ * @octet_count: the octect count
+ *
+ * Update the sender report in @src.
+ */
+void
+rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
+    guint32 packet_count, guint32 octet_count)
+{
+  RTPSenderReport *curr;
+  gint curridx;
+
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  GST_DEBUG ("got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
+      ", RTP %u, PC %u, OC %u", src->ssrc, ntptime, rtptime, packet_count,
+      octet_count);
+
+  curridx = src->stats.curr_sr ^ 1;
+  curr = &src->stats.sr[curridx];
+
+  /* update current */
+  curr->is_valid = TRUE;
+  curr->ntptime = ntptime;
+  curr->rtptime = rtptime;
+  curr->packet_count = packet_count;
+  curr->octet_count = octet_count;
+
+  /* make current */
+  src->stats.curr_sr = curridx;
+}
+
+/**
+ * rtp_source_process_rb:
+ * @src: an #RTPSource
+ * @fractionlost: fraction lost since last SR/RR
+ * @packetslost: the cumululative number of packets lost
+ * @exthighestseq: the extended last sequence number received
+ * @jitter: the interarrival jitter
+ * @lsr: the last SR packet from this source
+ * @dlsr: the delay since last SR packet
+ *
+ * Update the report block in @src.
+ */
+void
+rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost,
+    guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr)
+{
+  RTPReceiverReport *curr;
+  gint curridx;
+
+  g_return_if_fail (RTP_IS_SOURCE (src));
+
+  GST_DEBUG ("got RB packet %d: SSRC %08x, FL %u"
+      ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", src->ssrc, fractionlost,
+      packetslost, exthighestseq, jitter, lsr, dlsr);
+
+  curridx = src->stats.curr_rr ^ 1;
+  curr = &src->stats.rr[curridx];
+
+  /* update current */
+  curr->is_valid = TRUE;
+  curr->fractionlost = fractionlost;
+  curr->packetslost = packetslost;
+  curr->exthighestseq = exthighestseq;
+  curr->jitter = jitter;
+  curr->lsr = lsr;
+  curr->dlsr = dlsr;
+
+  /* make current */
+  src->stats.curr_rr = curridx;
+}
diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h
new file mode 100644 (file)
index 0000000..d4ae6f5
--- /dev/null
@@ -0,0 +1,162 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_SOURCE_H__
+#define __RTP_SOURCE_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+#include "rtpstats.h"
+
+/* the default number of consecutive RTP packets we need to receive before the
+ * source is considered valid */
+#define RTP_NO_PROBATION        0
+#define RTP_DEFAULT_PROBATION   2
+
+typedef struct _RTPSource RTPSource;
+typedef struct _RTPSourceClass RTPSourceClass;
+
+#define RTP_TYPE_SOURCE             (rtp_source_get_type())
+#define RTP_SOURCE(src)             (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_SOURCE,RTPSource))
+#define RTP_SOURCE_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SOURCE,RTPSourceClass))
+#define RTP_IS_SOURCE(src)          (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_SOURCE))
+#define RTP_IS_SOURCE_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SOURCE))
+#define RTP_SOURCE_CAST(src)        ((RTPSource *)(src))
+
+/**
+ * RTP_SOURCE_IS_ACTIVE:
+ * @src: an #RTPSource
+ *
+ * Check if @src is active. A source is active when it has been validated
+ * and has not yet received a BYE packet.
+ */
+#define RTP_SOURCE_IS_ACTIVE(src)  (src->validated && !src->received_bye)
+
+/**
+ * RTP_SOURCE_IS_SENDER:
+ * @src: an #RTPSource
+ *
+ * Check if @src is a sender.
+ */
+#define RTP_SOURCE_IS_SENDER(src)  (src->is_sender)
+
+/**
+ * RTPSourcePushRTP:
+ * @src: an #RTPSource
+ * @buffer: the RTP buffer ready for processing
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @src has @buffer ready for further
+ * processing.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data);
+
+/**
+ * RTPSourceClockRate:
+ * @src: an #RTPSource
+ * @payload: a payload type
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @src needs the clock-rate of the
+ * @payload.
+ *
+ * Returns: a clock-rate for @payload.
+ */
+typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data);
+
+/**
+ * RTPSourceCallbacks:
+ * @push_rtp: a packet becomes available for handling
+ * @clock_rate: a clock-rate is requested
+ * @get_time: the current clock time is requested
+ *
+ * Callbacks performed by #RTPSource when actions need to be performed.
+ */
+typedef struct {
+  RTPSourcePushRTP     push_rtp;
+  RTPSourceClockRate   clock_rate;
+} RTPSourceCallbacks;
+
+/**
+ * RTPSource:
+ *
+ * A source in the #RTPSession
+ */
+struct _RTPSource {
+  GObject       object;
+
+  /*< private >*/
+  RTPSourceCallbacks callbacks;
+  gpointer           user_data;
+
+  guint32       ssrc;
+  gchar        *cname;
+  gint          probation;
+  gboolean      validated;
+  gboolean      received_bye;
+  gchar        *bye_reason;
+
+  gboolean      is_csrc;
+  gboolean      is_sender;
+
+  gboolean      have_rtp_from;
+  GstNetAddress rtp_from;
+  gboolean      have_rtcp_from;
+  GstNetAddress rtcp_from;
+
+  guint8        payload;
+  gint          clock_rate;
+
+  GQueue       *packets;
+
+  RTPSourceStats stats;
+};
+
+struct _RTPSourceClass {
+  GObjectClass   parent_class;
+};
+
+GType rtp_source_get_type (void);
+
+/* managing lifetime of sources */
+RTPSource*      rtp_source_new            (guint32 ssrc);
+
+void            rtp_source_set_callbacks  (RTPSource *src, RTPSourceCallbacks *cb, gpointer data);
+void            rtp_source_set_as_csrc    (RTPSource *src);
+
+void            rtp_source_set_rtp_from   (RTPSource *src, GstNetAddress *address);
+void            rtp_source_set_rtcp_from  (RTPSource *src, GstNetAddress *address);
+
+GstFlowReturn   rtp_source_process_rtp    (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival);
+
+GstFlowReturn   rtp_source_send_rtp       (RTPSource *src, GstBuffer *buffer);
+
+/* RTCP messages */
+void            rtp_source_process_bye    (RTPSource *src, const gchar *reason);
+void            rtp_source_process_sr     (RTPSource *src, guint64 ntptime, guint32 rtptime,
+                                           guint32 packet_count, guint32 octet_count);
+void            rtp_source_process_rb     (RTPSource *src, guint8 fractionlost, gint32 packetslost,
+                                           guint32 exthighestseq, guint32 jitter,
+                                           guint32 lsr, guint32 dlsr);
+
+#endif /* __RTP_SOURCE_H__ */
diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c
new file mode 100644 (file)
index 0000000..b9076ea
--- /dev/null
@@ -0,0 +1,111 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "rtpstats.h"
+
+/**
+ * rtp_stats_init_defaults:
+ * @stats: an #RTPSessionStats struct
+ *
+ * Initialize @stats with its default values.
+ */
+void
+rtp_stats_init_defaults (RTPSessionStats * stats)
+{
+  stats->bandwidth = RTP_STATS_BANDWIDTH;
+  stats->sender_fraction = RTP_STATS_SENDER_FRACTION;
+  stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION;
+  stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH;
+  stats->min_interval = RTP_STATS_MIN_INTERVAL;
+}
+
+/**
+ * rtp_stats_calculate_rtcp_interval:
+ * @stats: an #RTPSessionStats struct
+ * 
+ * Calculate the RTCP interval. The result of this function is the amount of
+ * time to wait (in seconds) before sender a new RTCP message.
+ *
+ * Returns: the RTCP interval.
+ */
+gdouble
+rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender)
+{
+  gdouble active, senders, receivers, sfraction;
+  gboolean avg_rtcp;
+  gdouble interval;
+
+  active = stats->active_sources;
+  /* Try to avoid division by zero */
+  if (stats->active_sources == 0)
+    active += 1.0;
+
+  senders = (gdouble) stats->sender_sources;
+  receivers = (gdouble) (active - senders);
+  avg_rtcp = (gdouble) stats->avg_rtcp_packet_size;
+
+  sfraction = senders / active;
+
+  GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f",
+      senders, receivers, avg_rtcp, sfraction);
+
+  if (sfraction <= stats->sender_fraction) {
+    if (sender) {
+      interval =
+          (avg_rtcp * senders) / (stats->sender_fraction *
+          stats->rtcp_bandwidth);
+    } else {
+      interval =
+          (avg_rtcp * receivers) / ((1.0 -
+              stats->sender_fraction) * stats->rtcp_bandwidth);
+    }
+  } else {
+    interval = (avg_rtcp * active) / stats->rtcp_bandwidth;
+  }
+
+  if (interval < stats->min_interval)
+    interval = stats->min_interval;
+
+  if (!stats->sent_rtcp)
+    interval /= 2.0;
+
+  return interval;
+}
+
+/**
+ * rtp_stats_calculate_rtcp_interval:
+ * @stats: an #RTPSessionStats struct
+ * @interval: an RTCP interval
+ * 
+ * Apply a random jitter to the @interval. @interval is typically obtained with
+ * rtp_stats_calculate_rtcp_interval().
+ *
+ * Returns: the new RTCP interval.
+ */
+gdouble
+rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval)
+{
+  /* see RFC 3550 p 30 
+   * To compensate for "unconditional reconsideration" converging to a
+   * value below the intended average.
+   */
+#define COMPENSATION  (2.71828 - 1.5);
+
+  return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
+}
diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h
new file mode 100644 (file)
index 0000000..66aa7bf
--- /dev/null
@@ -0,0 +1,161 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __RTP_STATS_H__
+#define __RTP_STATS_H__
+
+#include <gst/gst.h>
+#include <gst/netbuffer/gstnetbuffer.h>
+
+/**
+ * RTPSenderReport:
+ *
+ * A sender report structure.
+ */
+typedef struct {
+  gboolean is_valid;
+  guint64 ntptime;
+  guint32 rtptime;
+  guint32 packet_count;
+  guint32 octet_count;
+} RTPSenderReport;
+
+/**
+ * RTPReceiverReport:
+ *
+ * A receiver report structure.
+ */
+typedef struct {
+  gboolean is_valid;
+  guint32 ssrc; /* who the report is from */
+  guint8  fractionlost;
+  guint32 packetslost;
+  guint32 exthighestseq;
+  guint32 jitter;
+  guint32 lsr;
+  guint32 dlsr;
+} RTPReceiverReport;
+
+/**
+ * RTPArrivalStats:
+ * @time: arrival time of a packet
+ * @address: address of the sender of the packet
+ * @bytes: bytes of the packet including lowlevel overhead
+ * @payload_len: bytes of the RTP payload
+ *
+ * Structure holding information about the arrival stats of a packet.
+ */
+typedef struct {
+  GstClockTime  time;
+  gboolean      have_address;
+  GstNetAddress address;
+  guint         bytes;
+  guint         payload_len;
+} RTPArrivalStats;
+
+/**
+ * RTPSourceStats:
+ * @packetsreceived: number of received packets in total
+ * @prevpacketsreceived: number of packets received in previous reporting
+ *                       interval
+ * @octetsreceived: number of payload bytes received
+ * @bytesreceived: number of total bytes received including headers and lower
+ *                 protocol level overhead
+ * @max_seqnr: highest sequence number received
+ * @transit: previous transit time used for calculating @jitter
+ * @jitter: current jitter
+ * @prev_rtptime: previous time when an RTP packet was received
+ * @prev_rtcptime: previous time when an RTCP packet was received
+ * @last_rtptime: time when last RTP packet received
+ * @last_rtcptime: time when last RTCP packet received
+ * @curr_rr: index of current @rr block
+ * @rr: previous and current receiver report block
+ * @curr_sr: index of current @sr block
+ * @sr: previous and current sender report block
+ *
+ * Stats about a source.
+ */
+typedef struct {
+  guint64      packetsreceived;
+  guint64      prevpacketsreceived;
+  guint64      octetsreceived;
+  guint64      bytesreceived;
+  guint16      max_seqnr;
+  guint32      transit;
+  guint32      jitter;
+
+  /* when we received stuff */
+  GstClockTime prev_rtptime;
+  GstClockTime prev_rtcptime;
+  GstClockTime last_rtptime;
+  GstClockTime last_rtcptime;
+
+  /* sender and receiver reports */
+  gint              curr_rr;
+  RTPReceiverReport rr[2];
+  gint              curr_sr;
+  RTPSenderReport   sr[2];
+} RTPSourceStats;
+
+#define RTP_STATS_BANDWIDTH           64000.0
+#define RTP_STATS_RTCP_BANDWIDTH      3000.0
+/*
+ * Minimum average time between RTCP packets from this site (in
+ * seconds).  This time prevents the reports from `clumping' when
+ * sessions are small and the law of large numbers isn't helping
+ * to smooth out the traffic.  It also keeps the report interval
+ * from becoming ridiculously small during transient outages like
+ * a network partition.
+ */
+#define RTP_STATS_MIN_INTERVAL      5.0
+ /*
+ * Fraction of the RTCP bandwidth to be shared among active
+ * senders.  (This fraction was chosen so that in a typical
+ * session with one or two active senders, the computed report
+ * time would be roughly equal to the minimum report time so that
+ * we don't unnecessarily slow down receiver reports.) The
+ * receiver fraction must be 1 - the sender fraction.
+ */
+#define RTP_STATS_SENDER_FRACTION       (0.25)
+#define RTP_STATS_RECEIVER_FRACTION     (1.0 - RTP_STATS_SENDER_FRACTION)
+
+/**
+ * RTPSessionStats:
+ *
+ * Stats kept for a session and used to produce RTCP packet timeouts.
+ */
+typedef struct {
+  gdouble       bandwidth;
+  gdouble       sender_fraction;
+  gdouble       receiver_fraction;
+  gdouble       rtcp_bandwidth;
+  gdouble       min_interval;
+  guint         sender_sources;
+  guint         active_sources;
+  guint         avg_rtcp_packet_size;
+  guint         avg_bye_packet_size;
+  gboolean      sent_rtcp;
+} RTPSessionStats;
+
+void           rtp_stats_init_defaults               (RTPSessionStats *stats);
+
+gdouble        rtp_stats_calculate_rtcp_interval    (RTPSessionStats *stats, gboolean sender);
+gdouble        rtp_stats_add_rtcp_jitter            (RTPSessionStats *stats, gdouble interval);
+
+#endif /* __RTP_STATS_H__ */