rtpsession: add locking for clear-pt-map
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
index 98912fb..04fca37 100644 (file)
  */
 
 /**
- * SECTION:element-gstrtpsession
- * @see_also: gstrtpjitterbuffer, gstrtpbin, gstrtpptdemux, gstrtpssrcdemux
+ * SECTION:element-rtpsession
+ * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
  *
- * The RTP session manager models one participant with a unique SSRC in an RTP
+ * The RTP session manager models participants with unique SSRC in an RTP
  * session. This session can be used to send and receive RTP and RTCP packets.
  * Based on what REQUEST pads are requested from the session manager, specific
  * functionality can be activated.
  *   <listitem>
  *     <para>Scheduling of RR/SR RTCP packets.</para>
  *   </listitem>
+ *   <listitem>
+ *     <para>Support for multiple sender SSRC.</para>
+ *   </listitem>
  * </itemizedlist>
  *
- * The gstrtpsession will not demux packets based on SSRC or payload type, nor will
+ * The rtpsession will not demux packets based on SSRC or payload type, nor will
  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
@@ -64,9 +67,8 @@
  * that should be sent to all participants in the session.
  *
  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
- * automatically create a send_rtp_src pad. The session manager will modify the
- * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
- * send_rtp_src pad after updating its internal state.
+ * automatically create a send_rtp_src pad. The session manager will
+ * forward the packets on the send_rtp_src pad after updating its internal state.
  *
  * The session manager needs the clock-rate of the payload types it is handling
  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
  * <refsect2>
  * <title>Example pipelines</title>
  * |[
- * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
+ * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
  * configured based on some negotiation process such as RTSP for this pipeline
  * to work correctly.
  * |[
- * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession name=session \
+ * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
  * configured based on some negotiation process such as RTSP for this pipeline
  * to work correctly.
  * |[
- * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession .send_rtp_src ! udpsink port=5000
+ * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
  * ]| Send theora RTP packets through the session manager and out on UDP port
  * 5000.
  * |[
- * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession name=session .send_rtp_src \
+ * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
  * ]| Send theora RTP packets through the session manager and out on UDP port
  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
  * packets are sent in the PAUSED state). Applications should manually set and
  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
  * </refsect2>
- *
- * Last reviewed on 2007-05-28 (0.10.5)
  */
 
 #ifdef HAVE_CONFIG_H
 
 #include <gst/glib-compat-private.h>
 
-#include "gstrtpbin-marshal.h"
 #include "gstrtpsession.h"
 #include "rtpsession.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
 #define GST_CAT_DEFAULT gst_rtp_session_debug
 
+GType
+gst_rtp_ntp_time_source_get_type (void)
+{
+  static GType type = 0;
+  static const GEnumValue values[] = {
+    {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
+    {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
+    {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
+          "Running time based on pipeline clock",
+        "running-time"},
+    {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
+    {0, NULL, NULL},
+  };
+
+  if (!type) {
+    type = g_enum_register_static ("GstRtpNtpTimeSource", values);
+  }
+  return type;
+}
+
 /* sink pads */
 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
@@ -189,11 +208,13 @@ enum
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
   SIGNAL_ON_SENDER_TIMEOUT,
+  SIGNAL_ON_NEW_SENDER_SSRC,
+  SIGNAL_ON_SENDER_SSRC_ACTIVE,
   LAST_SIGNAL
 };
 
-#define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION        (RTP_STATS_BANDWIDTH * RTP_STATS_RTCP_FRACTION)
+#define DEFAULT_BANDWIDTH            0
+#define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
 #define DEFAULT_SDES                 NULL
@@ -202,6 +223,11 @@ enum
 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
+#define DEFAULT_MAX_DROPOUT_TIME     60000
+#define DEFAULT_MAX_MISORDER_TIME    2000
+#define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
+#define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
+#define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
 
 enum
 {
@@ -217,12 +243,14 @@ enum
   PROP_USE_PIPELINE_CLOCK,
   PROP_RTCP_MIN_INTERVAL,
   PROP_PROBATION,
-  PROP_LAST
+  PROP_MAX_DROPOUT_TIME,
+  PROP_MAX_MISORDER_TIME,
+  PROP_STATS,
+  PROP_RTP_PROFILE,
+  PROP_NTP_TIME_SOURCE,
+  PROP_RTCP_SYNC_SEND_TIME
 };
 
-#define GST_RTP_SESSION_GET_PRIVATE(obj)  \
-          (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
-
 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->priv->lock)
 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
 
@@ -250,6 +278,11 @@ struct _GstRtpSessionPrivate
   GstClockTime send_latency;
 
   gboolean use_pipeline_clock;
+  GstRtpNtpTimeSource ntp_time_source;
+  gboolean rtcp_sync_send_time;
+
+  guint recv_rtx_req_count;
+  guint sent_rtx_req_count;
 };
 
 /* callbacks to handle actions from the session manager */
@@ -264,10 +297,23 @@ static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
     gpointer user_data);
 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
-static void gst_rtp_session_request_key_unit (RTPSession * sess,
+static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
     gboolean all_headers, gpointer user_data);
 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
     gpointer user_data);
+static void gst_rtp_session_notify_nack (RTPSession * sess,
+    guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
+static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
+static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
+    gpointer user_data);
+static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
+    GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad,
+    GstObject * parent, GstBufferList * list);
 
 static RTPSessionCallbacks callbacks = {
   gst_rtp_session_process_rtp,
@@ -277,7 +323,10 @@ static RTPSessionCallbacks callbacks = {
   gst_rtp_session_clock_rate,
   gst_rtp_session_reconsider,
   gst_rtp_session_request_key_unit,
-  gst_rtp_session_request_time
+  gst_rtp_session_request_time,
+  gst_rtp_session_notify_nack,
+  gst_rtp_session_reconfigure,
+  gst_rtp_session_notify_early_rtcp
 };
 
 /* GObject vmethods */
@@ -301,6 +350,8 @@ static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
 
 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
 
+static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
+
 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
 
 static void
@@ -313,8 +364,39 @@ on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
 static void
 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
 {
+  GstPad *send_rtp_sink;
+
   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
       src->ssrc);
+
+  GST_RTP_SESSION_LOCK (sess);
+  if ((send_rtp_sink = sess->send_rtp_sink))
+    gst_object_ref (send_rtp_sink);
+  GST_RTP_SESSION_UNLOCK (sess);
+
+  if (send_rtp_sink) {
+    GstStructure *structure;
+    GstEvent *event;
+    RTPSource *internal_src;
+    guint32 suggested_ssrc;
+
+    structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
+        (guint) src->ssrc, NULL);
+
+    /* if there is no source using the suggested ssrc, most probably because
+     * this ssrc has just collided, suggest upstream to use it */
+    suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
+    internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
+    if (!internal_src)
+      gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
+          (guint) suggested_ssrc, NULL);
+    else
+      g_object_unref (internal_src);
+
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
+    gst_pad_push_event (send_rtp_sink, event);
+    gst_object_unref (send_rtp_sink);
+  }
 }
 
 static void
@@ -377,8 +459,30 @@ on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
       src->ssrc);
 }
 
+static void
+on_new_sender_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
+      src->ssrc);
+}
+
+static void
+on_sender_ssrc_active (RTPSession * session, RTPSource * src,
+    GstRtpSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
+      src->ssrc);
+}
+
+static void
+on_notify_stats (RTPSession * session, GParamSpec * spec,
+    GstRtpSession * rtpsession)
+{
+  g_object_notify (G_OBJECT (rtpsession), "stats");
+}
+
 #define gst_rtp_session_parent_class parent_class
-G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
 
 static void
 gst_rtp_session_class_init (GstRtpSessionClass * klass)
@@ -389,8 +493,6 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
-
   gobject_class->finalize = gst_rtp_session_finalize;
   gobject_class->set_property = gst_rtp_session_set_property;
   gobject_class->get_property = gst_rtp_session_get_property;
@@ -405,8 +507,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
-      NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
-      G_TYPE_UINT);
+      NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
   /**
    * GstRtpSession::clear-pt-map:
    * @sess: the object which received the signal
@@ -415,8 +516,9 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
    */
   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
-      NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE);
 
   /**
    * GstRtpSession::on-new-ssrc:
@@ -454,7 +556,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
       G_TYPE_NONE, 1, G_TYPE_UINT);
   /**
-   * GstRtpSession::on-ssrc_active:
+   * GstRtpSession::on-ssrc-active:
    * @sess: the object which received the signal
    * @ssrc: the SSRC
    *
@@ -523,6 +625,35 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
           on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
       G_TYPE_NONE, 1, G_TYPE_UINT);
 
+  /**
+   * GstRtpSession::on-new-sender-ssrc:
+   * @sess: the object which received the signal
+   * @ssrc: the sender SSRC
+   *
+   * Notify of a new sender SSRC that entered @session.
+   *
+   * Since: 1.8
+   */
+  gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
+      g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
+      NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+
+  /**
+   * GstRtpSession::on-sender-ssrc-active:
+   * @sess: the object which received the signal
+   * @ssrc: the sender SSRC
+   *
+   * Notify of a sender SSRC that is active, i.e., sending RTCP.
+   *
+   * Since: 1.8
+   */
+  gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
+      g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
+          on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
+      G_TYPE_NONE, 1, G_TYPE_UINT);
+
   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
       g_param_spec_double ("bandwidth", "Bandwidth",
           "The bandwidth of the session in bytes per second (0 for auto-discover)",
@@ -571,9 +702,10 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
-          "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
+          "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
+          "(DEPRECATED: Use ntp-time-source property)",
           DEFAULT_USE_PIPELINE_CLOCK,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
 
   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
@@ -587,6 +719,62 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
           0, G_MAXUINT, DEFAULT_PROBATION,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
+      g_param_spec_uint ("max-dropout-time", "Max dropout time",
+          "The maximum time (milliseconds) of missing packets tolerated.",
+          0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
+      g_param_spec_uint ("max-misorder-time", "Max misorder time",
+          "The maximum time (milliseconds) of misordered packets tolerated.",
+          0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstRtpSession::stats:
+   *
+   * Various session statistics. This property returns a GstStructure
+   * with name application/x-rtp-session-stats with the following fields:
+   *
+   *  "recv-rtx-req-count  G_TYPE_UINT   The number of retransmission event
+   *      received from downstream (in receiver mode) (Since 1.16)
+   *  "sent-rtx-req-count" G_TYPE_UINT   The number of retransmission event
+   *      sent downstream (in sender mode) (Since 1.16)
+   *  "rtx-count"          G_TYPE_UINT   DEPRECATED Since 1.16, same as
+   *      "recv-rtx-req-count".
+   *  "rtx-drop-count"     G_TYPE_UINT   The number of retransmission events
+   *      dropped (due to bandwidth constraints)
+   *  "sent-nack-count"    G_TYPE_UINT   Number of NACKs sent
+   *  "recv-nack-count"    G_TYPE_UINT   Number of NACKs received
+   *  "source-stats"       G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
+   *      RTP sources (Since 1.8)
+   *
+   * Since: 1.4
+   */
+  g_object_class_install_property (gobject_class, PROP_STATS,
+      g_param_spec_boxed ("stats", "Statistics",
+          "Various statistics", GST_TYPE_STRUCTURE,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
+      g_param_spec_enum ("rtp-profile", "RTP Profile",
+          "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
+      g_param_spec_enum ("ntp-time-source", "NTP Time Source",
+          "NTP time source for RTCP packets",
+          gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
+      g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
+          "Use send time or capture time for RTCP sync "
+          "(TRUE = send time, FALSE = capture time)",
+          DEFAULT_RTCP_SYNC_SEND_TIME,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
   gstelement_class->request_new_pad =
@@ -597,22 +785,22 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
 
   /* sink pads */
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_recv_rtp_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_recv_rtcp_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_send_rtp_sink_template);
 
   /* src pads */
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_sync_src_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_recv_rtp_src_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_sync_src_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_send_rtp_src_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &rtpsession_send_rtcp_src_template);
 
   gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
       "Filter/Network/RTP",
@@ -620,17 +808,24 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
 
   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
       "rtpsession", 0, "RTP Session");
+
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
+
 }
 
 static void
 gst_rtp_session_init (GstRtpSession * rtpsession)
 {
-  rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
+  rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
   g_mutex_init (&rtpsession->priv->lock);
   g_cond_init (&rtpsession->priv->cond);
   rtpsession->priv->sysclock = gst_system_clock_obtain ();
   rtpsession->priv->session = rtp_session_new ();
   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
+  rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
 
   /* configure callbacks */
   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
@@ -653,13 +848,26 @@ gst_rtp_session_init (GstRtpSession * rtpsession)
       (GCallback) on_timeout, rtpsession);
   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
       (GCallback) on_sender_timeout, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-new-sender-ssrc",
+      (GCallback) on_new_sender_ssrc, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-sender-ssrc-active",
+      (GCallback) on_sender_ssrc_active, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "notify::stats",
+      (GCallback) on_notify_stats, rtpsession);
   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
       (GDestroyNotify) gst_caps_unref);
 
+  rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
+
   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
 
   rtpsession->priv->thread_stopped = TRUE;
+
+  rtpsession->priv->recv_rtx_req_count = 0;
+  rtpsession->priv->sent_rtx_req_count = 0;
+
+  rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
 }
 
 static void
@@ -716,6 +924,23 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
     case PROP_PROBATION:
       g_object_set_property (G_OBJECT (priv->session), "probation", value);
       break;
+    case PROP_MAX_DROPOUT_TIME:
+      g_object_set_property (G_OBJECT (priv->session), "max-dropout-time",
+          value);
+      break;
+    case PROP_MAX_MISORDER_TIME:
+      g_object_set_property (G_OBJECT (priv->session), "max-misorder-time",
+          value);
+      break;
+    case PROP_RTP_PROFILE:
+      g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
+      break;
+    case PROP_NTP_TIME_SOURCE:
+      priv->ntp_time_source = g_value_get_enum (value);
+      break;
+    case PROP_RTCP_SYNC_SEND_TIME:
+      priv->rtcp_sync_send_time = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -770,17 +995,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_PROBATION:
       g_object_get_property (G_OBJECT (priv->session), "probation", value);
       break;
+    case PROP_MAX_DROPOUT_TIME:
+      g_object_get_property (G_OBJECT (priv->session), "max-dropout-time",
+          value);
+      break;
+    case PROP_MAX_MISORDER_TIME:
+      g_object_get_property (G_OBJECT (priv->session), "max-misorder-time",
+          value);
+      break;
+    case PROP_STATS:
+      g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
+      break;
+    case PROP_RTP_PROFILE:
+      g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
+      break;
+    case PROP_NTP_TIME_SOURCE:
+      g_value_set_enum (value, priv->ntp_time_source);
+      break;
+    case PROP_RTCP_SYNC_SEND_TIME:
+      g_value_set_boolean (value, priv->rtcp_sync_send_time);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
 }
 
+static GstStructure *
+gst_rtp_session_create_stats (GstRtpSession * rtpsession)
+{
+  GstStructure *s;
+
+  g_object_get (rtpsession->priv->session, "stats", &s, NULL);
+  gst_structure_set (s, "rtx-count", G_TYPE_UINT,
+      rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
+      rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
+      rtpsession->priv->sent_rtx_req_count, NULL);
+
+  return s;
+}
+
 static void
 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
     guint64 * ntpnstime)
 {
-  guint64 ntpns;
+  guint64 ntpns = -1;
   GstClock *clock;
   GstClockTime base_time, rt, clock_time;
 
@@ -796,17 +1055,36 @@ get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
 
     if (rtpsession->priv->use_pipeline_clock) {
       ntpns = rt;
+      /* add constant to convert from 1970 based time to 1900 based time */
+      ntpns += (2208988800LL * GST_SECOND);
     } else {
-      GTimeVal current;
-
-      /* get current NTP time */
-      g_get_current_time (&current);
-      ntpns = GST_TIMEVAL_TO_TIME (current);
+      switch (rtpsession->priv->ntp_time_source) {
+        case GST_RTP_NTP_TIME_SOURCE_NTP:
+        case GST_RTP_NTP_TIME_SOURCE_UNIX:{
+          GTimeVal current;
+
+          /* get current NTP time */
+          g_get_current_time (&current);
+          ntpns = GST_TIMEVAL_TO_TIME (current);
+
+          /* add constant to convert from 1970 based time to 1900 based time */
+          if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
+            ntpns += (2208988800LL * GST_SECOND);
+          break;
+        }
+        case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
+          ntpns = rt;
+          break;
+        case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
+          ntpns = clock_time;
+          break;
+        default:
+          ntpns = -1;
+          g_assert_not_reached ();
+          break;
+      }
     }
 
-    /* add constant to convert from 1970 based time to 1900 based time */
-    ntpns += (2208988800LL * GST_SECOND);
-
     gst_object_unref (clock);
   } else {
     GST_OBJECT_UNLOCK (rtpsession);
@@ -819,6 +1097,17 @@ get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
     *ntpnstime = ntpns;
 }
 
+/* must be called with GST_RTP_SESSION_LOCK */
+static void
+signal_waiting_rtcp_thread_unlocked (GstRtpSession * rtpsession)
+{
+  if (rtpsession->priv->wait_send) {
+    GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
+    rtpsession->priv->wait_send = FALSE;
+    GST_RTP_SESSION_SIGNAL (rtpsession);
+  }
+}
+
 static void
 rtcp_thread (GstRtpSession * rtpsession)
 {
@@ -835,7 +1124,7 @@ rtcp_thread (GstRtpSession * rtpsession)
   GST_RTP_SESSION_LOCK (rtpsession);
 
   while (rtpsession->priv->wait_send) {
-    GST_LOG_OBJECT (rtpsession, "waiting for RTP thread");
+    GST_LOG_OBJECT (rtpsession, "waiting for getting started");
     GST_RTP_SESSION_WAIT (rtpsession);
     GST_LOG_OBJECT (rtpsession, "signaled...");
   }
@@ -939,8 +1228,7 @@ stop_rtcp_thread (GstRtpSession * rtpsession)
 
   GST_RTP_SESSION_LOCK (rtpsession);
   rtpsession->priv->stop_thread = TRUE;
-  rtpsession->priv->wait_send = FALSE;
-  GST_RTP_SESSION_SIGNAL (rtpsession);
+  signal_waiting_rtcp_thread_unlocked (rtpsession);
   if (rtpsession->priv->id)
     gst_clock_id_unschedule (rtpsession->priv->id);
   GST_RTP_SESSION_UNLOCK (rtpsession);
@@ -978,8 +1266,7 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       GST_RTP_SESSION_LOCK (rtpsession);
-      if (rtpsession->send_rtp_src)
-        rtpsession->priv->wait_send = TRUE;
+      rtpsession->priv->wait_send = TRUE;
       GST_RTP_SESSION_UNLOCK (rtpsession);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -1007,6 +1294,7 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       /* downstream is now releasing the dataflow and we can join. */
       join_rtcp_thread (rtpsession);
+      rtp_session_reset (rtpsession->priv->session);
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -1031,7 +1319,9 @@ return_true (gpointer key, gpointer value, gpointer user_data)
 static void
 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
 {
+  GST_RTP_SESSION_LOCK (rtpsession);
   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
 }
 
 /* called when the session manager has an RTP packet or a list of packets
@@ -1078,11 +1368,7 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
   GST_RTP_SESSION_LOCK (rtpsession);
   if ((rtp_src = rtpsession->send_rtp_src))
     gst_object_ref (rtp_src);
-  if (rtpsession->priv->wait_send) {
-    GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
-    rtpsession->priv->wait_send = FALSE;
-    GST_RTP_SESSION_SIGNAL (rtpsession);
-  }
+  signal_waiting_rtcp_thread_unlocked (rtpsession);
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
   if (rtp_src) {
@@ -1137,6 +1423,8 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
   event = gst_event_new_stream_start (stream_id);
+  rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
+  gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
   if (have_group_id)
     gst_event_set_group_id (event, group_id);
   gst_pad_push_event (srcpad, event);
@@ -1148,6 +1436,7 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
 
   gst_segment_init (&seg, GST_FORMAT_TIME);
   event = gst_event_new_segment (&seg);
+  gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
   gst_pad_push_event (srcpad, event);
 }
 
@@ -1156,7 +1445,7 @@ do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
  * well. */
 static GstFlowReturn
 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
-    GstBuffer * buffer, gboolean eos, gpointer user_data)
+    GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
 {
   GstFlowReturn result;
   GstRtpSession *rtpsession;
@@ -1179,10 +1468,18 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
     GST_LOG_OBJECT (rtpsession, "sending RTCP");
     result = gst_pad_push (rtcp_src, buffer);
 
-    /* we have to send EOS after this packet */
-    if (eos) {
+    /* Forward send an EOS on the RTCP sink if we received an EOS on the
+     * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
+     * case the EOS event would already have been sent */
+    if (all_sources_bye && rtpsession->send_rtp_sink &&
+        GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
+      GstEvent *event;
+
       GST_LOG_OBJECT (rtpsession, "sending EOS");
-      gst_pad_push_event (rtcp_src, gst_event_new_eos ());
+
+      event = gst_event_new_eos ();
+      gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
+      gst_pad_push_event (rtcp_src, event);
     }
     gst_object_unref (rtcp_src);
   } else {
@@ -1224,6 +1521,14 @@ gst_rtp_session_sync_rtcp (RTPSession * sess,
     gst_object_ref (sync_src);
     GST_RTP_SESSION_UNLOCK (rtpsession);
 
+    /* set rtcp caps on output pad, this happens
+     * when we receive RTCP muxed with RTP according
+     * to RFC5761. Otherwise we would have forwarded
+     * the events from the recv_rtcp_sink pad already
+     */
+    if (!gst_pad_has_current_caps (sync_src))
+      do_rtcp_events (rtpsession, sync_src);
+
     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
     result = gst_pad_push (sync_src, buffer);
     gst_object_unref (sync_src);
@@ -1398,6 +1703,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
     }
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+      rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
     case GST_EVENT_SEGMENT:
@@ -1420,6 +1726,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
     }
+    case GST_EVENT_EOS:
+    {
+      GstPad *rtcp_src;
+
+      ret =
+          gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
+
+      GST_RTP_SESSION_LOCK (rtpsession);
+      if ((rtcp_src = rtpsession->send_rtcp_src))
+        gst_object_ref (rtcp_src);
+      GST_RTP_SESSION_UNLOCK (rtpsession);
+
+      gst_event_unref (event);
+
+      if (rtcp_src) {
+        event = gst_event_new_eos ();
+        if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
+          gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
+        ret = gst_pad_push_event (rtcp_src, event);
+        gst_object_unref (rtcp_src);
+      } else {
+        ret = TRUE;
+      }
+      break;
+    }
     default:
       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
@@ -1455,7 +1786,7 @@ gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
 
     if (pli || fir)
       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
-          gst_clock_get_time (rtpsession->priv->sysclock), fir, count);
+          fir, count);
   }
 
   return FALSE;
@@ -1489,6 +1820,37 @@ gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
                 all_headers, count))
           forward = FALSE;
+      } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
+        guint seqnum, delay, deadline, max_delay, avg_rtt;
+
+        GST_RTP_SESSION_LOCK (rtpsession);
+        rtpsession->priv->recv_rtx_req_count++;
+        GST_RTP_SESSION_UNLOCK (rtpsession);
+
+        if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+          ssrc = -1;
+        if (!gst_structure_get_uint (s, "seqnum", &seqnum))
+          seqnum = -1;
+        if (!gst_structure_get_uint (s, "delay", &delay))
+          delay = 0;
+        if (!gst_structure_get_uint (s, "deadline", &deadline))
+          deadline = 100;
+        if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
+          avg_rtt = 40;
+
+        /* remaining time to receive the packet */
+        max_delay = deadline;
+        if (max_delay > delay)
+          max_delay -= delay;
+        /* estimated RTT */
+        if (max_delay > avg_rtt)
+          max_delay -= avg_rtt;
+        else
+          max_delay = 0;
+
+        if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
+                max_delay * GST_MSECOND))
+          forward = FALSE;
       }
       break;
     default:
@@ -1545,6 +1907,8 @@ gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
     g_value_unset (&val);
     gst_object_unref (otherpad);
+  } else {
+    it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
   }
 
   return it;
@@ -1573,26 +1937,32 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
   GstFlowReturn ret;
   GstClockTime current_time, running_time;
   GstClockTime timestamp;
+  guint64 ntpnstime;
 
   rtpsession = GST_RTP_SESSION (parent);
   priv = rtpsession->priv;
 
   GST_LOG_OBJECT (rtpsession, "received RTP packet");
 
+  GST_RTP_SESSION_LOCK (rtpsession);
+  signal_waiting_rtcp_thread_unlocked (rtpsession);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
   /* get NTP time when this packet was captured, this depends on the timestamp. */
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  timestamp = GST_BUFFER_PTS (buffer);
   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
     /* convert to running time using the segment values */
     running_time =
         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
         timestamp);
+    ntpnstime = GST_CLOCK_TIME_NONE;
   } else {
-    get_current_times (rtpsession, &running_time, NULL);
+    get_current_times (rtpsession, &running_time, &ntpnstime);
   }
   current_time = gst_clock_get_time (priv->sysclock);
 
   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
-      running_time);
+      running_time, ntpnstime);
   if (ret != GST_FLOW_OK)
     goto push_error;
 
@@ -1622,6 +1992,18 @@ gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
       GST_EVENT_TYPE_NAME (event));
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEGMENT:
+      /* Make sure that the sync_src pad has caps before the segment event.
+       * Otherwise we might get a segment event before caps from the receive
+       * RTCP pad, and then later when receiving RTCP packets will set caps.
+       * This will results in a sticky event misordering warning
+       */
+      if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
+        GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
+        gst_pad_set_caps (rtpsession->sync_src, caps);
+        gst_caps_unref (caps);
+      }
+      /* fall through */
     default:
       ret = gst_pad_push_event (rtpsession->sync_src, event);
       break;
@@ -1640,6 +2022,7 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
   GstClockTime current_time;
+  GstClockTime running_time;
   guint64 ntpnstime;
 
   rtpsession = GST_RTP_SESSION (parent);
@@ -1647,10 +2030,15 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
 
   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
 
+  GST_RTP_SESSION_LOCK (rtpsession);
+  signal_waiting_rtcp_thread_unlocked (rtpsession);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
   current_time = gst_clock_get_time (priv->sysclock);
-  get_current_times (rtpsession, NULL, &ntpnstime);
+  get_current_times (rtpsession, &running_time, &ntpnstime);
 
-  rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
+  rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
+      ntpnstime);
 
   return GST_FLOW_OK;           /* always return OK */
 }
@@ -1762,6 +2150,7 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
        * because we stop sending. */
       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
+
       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
       rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
       rtp_session_schedule_bye (rtpsession->priv->session, current_time);
@@ -1823,18 +2212,26 @@ gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
   GstCaps *result;
   GstStructure *s1, *s2;
   guint ssrc;
+  gboolean is_random;
 
   priv = rtpsession->priv;
 
-  ssrc = rtp_session_get_internal_ssrc (priv->session);
+  ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
 
   /* we can basically accept anything but we prefer to receive packets with our
    * internal SSRC so that we don't have to patch it. Create a structure with
-   * the SSRC and another one without. */
-  s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc, NULL);
-  s2 = gst_structure_new_empty ("application/x-rtp");
-
-  result = gst_caps_new_full (s1, s2, NULL);
+   * the SSRC and another one without.
+   * Only do this if the session actually decided on an ssrc already,
+   * otherwise we give upstream the opportunity to select an ssrc itself */
+  if (!is_random) {
+    s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
+        NULL);
+    s2 = gst_structure_new_empty ("application/x-rtp");
+
+    result = gst_caps_new_full (s1, s2, NULL);
+  } else {
+    result = gst_caps_new_empty_simple ("application/x-rtp");
+  }
 
   if (filter) {
     GstCaps *caps = result;
@@ -1882,21 +2279,15 @@ gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
     GstCaps * caps)
 {
   GstRtpSessionPrivate *priv;
-  GstStructure *s = gst_caps_get_structure (caps, 0);
-  guint ssrc;
 
   priv = rtpsession->priv;
 
-  if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
-    GST_DEBUG_OBJECT (rtpsession, "setting internal SSRC to %08x", ssrc);
-    rtp_session_set_internal_ssrc (priv->session, ssrc);
-  }
   rtp_session_update_send_caps (priv->session, caps);
 
   return TRUE;
 }
 
-/* Recieve an RTP packet or a list of packets to be send to the receivers,
+/* Receive an RTP packet or a list of packets to be sent to the receivers,
  * send to RTP session manager and forward to send_rtp_src.
  */
 static GstFlowReturn
@@ -1916,15 +2307,15 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
   if (is_list) {
     GstBuffer *buffer = NULL;
 
-    /* All groups in an list have the same timestamp.
-     * So, just take it from the first group. */
+    /* All buffers in a list have the same timestamp.
+     * So, just take it from the first buffer. */
     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
     if (buffer)
-      timestamp = GST_BUFFER_TIMESTAMP (buffer);
+      timestamp = GST_BUFFER_PTS (buffer);
     else
       timestamp = -1;
   } else {
-    timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (data));
+    timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
   }
 
   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
@@ -1932,7 +2323,8 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
     running_time =
         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
         timestamp);
-    running_time += priv->send_latency;
+    if (priv->rtcp_sync_send_time)
+      running_time += priv->send_latency;
   } else {
     /* no timestamp. */
     running_time = -1;
@@ -1992,6 +2384,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
       gst_rtp_session_event_recv_rtp_sink);
   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
       gst_rtp_session_iterate_internal_links);
+  GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
       rtpsession->recv_rtp_sink);
@@ -2106,6 +2499,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
       gst_rtp_session_event_send_rtp_sink);
   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
       gst_rtp_session_iterate_internal_links);
+  GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
+  GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
       rtpsession->send_rtp_sink);
@@ -2117,6 +2512,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
       gst_rtp_session_iterate_internal_links);
   gst_pad_set_event_function (rtpsession->send_rtp_src,
       gst_rtp_session_event_send_rtp_src);
+  GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
 
@@ -2231,13 +2627,13 @@ gst_rtp_session_request_new_pad (GstElement * element,
 wrong_template:
   {
     GST_RTP_SESSION_UNLOCK (rtpsession);
-    g_warning ("gstrtpsession: this is not our template");
+    g_warning ("rtpsession: this is not our template");
     return NULL;
   }
 exists:
   {
     GST_RTP_SESSION_UNLOCK (rtpsession);
-    g_warning ("gstrtpsession: pad already requested");
+    g_warning ("rtpsession: pad already requested");
     return NULL;
   }
 }
@@ -2275,14 +2671,14 @@ gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
 wrong_pad:
   {
     GST_RTP_SESSION_UNLOCK (rtpsession);
-    g_warning ("gstrtpsession: asked to release an unknown pad");
+    g_warning ("rtpsession: asked to release an unknown pad");
     return;
   }
 }
 
 static void
 gst_rtp_session_request_key_unit (RTPSession * sess,
-    gboolean all_headers, gpointer user_data)
+    guint32 ssrc, gboolean all_headers, gpointer user_data)
 {
   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
   GstEvent *event;
@@ -2295,7 +2691,7 @@ gst_rtp_session_request_key_unit (RTPSession * sess,
 
   if (send_rtp_sink) {
     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
-        gst_structure_new ("GstForceKeyUnit",
+        gst_structure_new ("GstForceKeyUnit", "ssrc", G_TYPE_UINT, ssrc,
             "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
     gst_pad_push_event (send_rtp_sink, event);
     gst_object_unref (send_rtp_sink);
@@ -2309,3 +2705,71 @@ gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
 
   return gst_clock_get_time (rtpsession->priv->sysclock);
 }
+
+static void
+gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
+    guint16 blp, guint32 ssrc, gpointer user_data)
+{
+  GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+  GstEvent *event;
+  GstPad *send_rtp_sink;
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if ((send_rtp_sink = rtpsession->send_rtp_sink))
+    gst_object_ref (send_rtp_sink);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (send_rtp_sink) {
+    while (TRUE) {
+      event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+          gst_structure_new ("GstRTPRetransmissionRequest",
+              "seqnum", G_TYPE_UINT, (guint) seqnum,
+              "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
+      gst_pad_push_event (send_rtp_sink, event);
+
+      GST_RTP_SESSION_LOCK (rtpsession);
+      rtpsession->priv->sent_rtx_req_count++;
+      GST_RTP_SESSION_UNLOCK (rtpsession);
+
+      if (blp == 0)
+        break;
+
+      seqnum++;
+      while ((blp & 1) == 0) {
+        seqnum++;
+        blp >>= 1;
+      }
+      blp >>= 1;
+    }
+    gst_object_unref (send_rtp_sink);
+  }
+}
+
+static void
+gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
+{
+  GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+  GstPad *send_rtp_sink;
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if ((send_rtp_sink = rtpsession->send_rtp_sink))
+    gst_object_ref (send_rtp_sink);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (send_rtp_sink) {
+    gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
+    gst_object_unref (send_rtp_sink);
+  }
+}
+
+static void
+gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
+{
+  GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+
+  GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
+  /* with an early RTCP request, we might have to start the RTCP thread */
+  GST_RTP_SESSION_LOCK (rtpsession);
+  signal_waiting_rtcp_thread_unlocked (rtpsession);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+}