Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
index 4e8a732..77c5594 100644 (file)
@@ -42,50 +42,55 @@ enum
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
   SIGNAL_ON_SENDER_TIMEOUT,
+  SIGNAL_ON_SENDING_RTCP,
+  SIGNAL_ON_FEEDBACK_RTCP,
+  SIGNAL_SEND_RTCP,
   LAST_SIGNAL
 };
 
 #define DEFAULT_INTERNAL_SOURCE      NULL
 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
-#define DEFAULT_SDES_CNAME           NULL
-#define DEFAULT_SDES_NAME            NULL
-#define DEFAULT_SDES_EMAIL           NULL
-#define DEFAULT_SDES_PHONE           NULL
-#define DEFAULT_SDES_LOCATION        NULL
-#define DEFAULT_SDES_TOOL            NULL
-#define DEFAULT_SDES_NOTE            NULL
+#define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_RTCP_RR_BANDWIDTH    -1
+#define DEFAULT_RTCP_RS_BANDWIDTH    -1
+#define DEFAULT_RTCP_MTU             1400
+#define DEFAULT_SDES                 NULL
 #define DEFAULT_NUM_SOURCES          0
 #define DEFAULT_NUM_ACTIVE_SOURCES   0
 #define DEFAULT_SOURCES              NULL
+#define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
+#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
 
 enum
 {
   PROP_0,
+  PROP_INTERNAL_SSRC,
   PROP_INTERNAL_SOURCE,
   PROP_BANDWIDTH,
   PROP_RTCP_FRACTION,
-  PROP_SDES_CNAME,
-  PROP_SDES_NAME,
-  PROP_SDES_EMAIL,
-  PROP_SDES_PHONE,
-  PROP_SDES_LOCATION,
-  PROP_SDES_TOOL,
-  PROP_SDES_NOTE,
+  PROP_RTCP_RR_BANDWIDTH,
+  PROP_RTCP_RS_BANDWIDTH,
+  PROP_RTCP_MTU,
+  PROP_SDES,
   PROP_NUM_SOURCES,
   PROP_NUM_ACTIVE_SOURCES,
   PROP_SOURCES,
+  PROP_FAVOR_NEW,
+  PROP_RTCP_MIN_INTERVAL,
+  PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
   PROP_LAST
 };
 
-/* update average packet size, we keep this scaled by 16 to keep enough
- * precision. */
-#define UPDATE_AVG(avg, val)           \
-  if ((avg) == 0)                      \
-   (avg) = (val) << 4;                 \
-  else                                         \
+/* update average packet size */
+#define INIT_AVG(avg, val) \
+   (avg) = (val);
+#define UPDATE_AVG(avg, val)            \
+  if ((avg) == 0)                       \
+   (avg) = (val);                       \
+  else                                  \
    (avg) = ((val) + (15 * (avg))) >> 4;
 
+
 /* The number RTCP intervals after which to timeout entries in the
  * collision table
  */
@@ -98,17 +103,102 @@ static void rtp_session_set_property (GObject * object, guint prop_id,
 static void rtp_session_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
+static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
+    GstBuffer * buffer, gboolean early);
+static void rtp_session_send_rtcp (RTPSession * sess,
+    GstClockTimeDiff max_delay);
+
+
 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
 
 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
 
 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
-static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
+static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
     const gchar * reason, GstClockTime current_time);
 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
     gboolean deterministic, gboolean first);
 
+static gboolean
+accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
+    const GValue * handler_return, gpointer data)
+{
+  if (g_value_get_boolean (handler_return))
+    g_value_set_boolean (return_accu, TRUE);
+
+  return TRUE;
+}
+
+static void
+gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+    const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+    gpointer marshal_data)
+{
+  typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1,
+      gpointer arg_1, gboolean arg_2, gpointer data2);
+  register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+  gboolean v_return;
+
+  g_return_if_fail (return_value != NULL);
+  g_return_if_fail (n_param_values == 3);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data :
+      cc->callback);
+
+  v_return = callback (data1,
+      gst_value_get_mini_object (param_values + 1),
+      g_value_get_boolean (param_values + 2), data2);
+
+  g_value_set_boolean (return_value, v_return);
+}
+
+static void
+gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+    const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+    gpointer marshal_data)
+{
+  typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer
+      data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5,
+      gpointer data2);
+  register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+
+  g_return_if_fail (n_param_values == 6);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ?
+      marshal_data : cc->callback);
+
+  callback (data1,
+      g_value_get_uint (param_values + 1),
+      g_value_get_uint (param_values + 2),
+      g_value_get_uint (param_values + 3),
+      g_value_get_uint (param_values + 4),
+      gst_value_get_mini_object (param_values + 5), data2);
+}
+
+
 static void
 rtp_session_class_init (RTPSessionClass * klass)
 {
@@ -242,6 +332,66 @@ rtp_session_class_init (RTPSessionClass * klass)
       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
       RTP_TYPE_SOURCE);
 
+  /**
+   * RTPSession::on-sending-rtcp
+   * @session: the object which received the signal
+   * @buffer: the #GstBuffer containing the RTCP packet about to be sent
+   * @early: %TRUE if the packet is early, %FALSE if it is regular
+   *
+   * This signal is emitted before sending an RTCP packet, it can be used
+   * to add extra RTCP Packets.
+   *
+   * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
+   * if suppressing it is acceptable
+   */
+  rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
+      g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
+      accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN,
+      G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN);
+
+  /**
+   * RTPSession::on-feedback-rtcp:
+   * @session: the object which received the signal
+   * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
+   *  %GST_RTCP_TYPE_RTPFB
+   * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
+   * @sender_ssrc: The SSRC of the sender
+   * @media_ssrc: The SSRC of the media this refers to
+   * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
+   * there was no FCI
+   *
+   * Notify that a RTCP feedback packet has been received
+   */
+
+  rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
+      g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT,
+      G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
+      GST_TYPE_BUFFER);
+
+  /**
+   * RTPSession::send-rtcp:
+   * @session: the object which received the signal
+   * @max_delay: The maximum delay after which the feedback will not be useful
+   *  anymore
+   *
+   * Requests that the #RTPSession initiate a new RTCP packet as soon as
+   * possible within the requested delay.
+   */
+
+  rtp_session_signals[SIGNAL_SEND_RTCP] =
+      g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+      gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
+
+  g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
+      g_param_spec_uint ("internal-ssrc", "Internal SSRC",
+          "The internal SSRC used for the session",
+          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
       g_param_spec_object ("internal-source", "Internal Source",
           "The internal source element of the session",
@@ -249,50 +399,38 @@ rtp_session_class_init (RTPSessionClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
       g_param_spec_double ("bandwidth", "Bandwidth",
-          "The bandwidth of the session",
+          "The bandwidth of the session (0 for auto-discover)",
           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
-          "The fraction of the bandwidth used for RTCP",
+          "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
-      g_param_spec_string ("sdes-cname", "SDES CNAME",
-          "The CNAME to put in SDES messages of this session",
-          DEFAULT_SDES_CNAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_NAME,
-      g_param_spec_string ("sdes-name", "SDES NAME",
-          "The NAME to put in SDES messages of this session",
-          DEFAULT_SDES_NAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
-      g_param_spec_string ("sdes-email", "SDES EMAIL",
-          "The EMAIL to put in SDES messages of this session",
-          DEFAULT_SDES_EMAIL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
-      g_param_spec_string ("sdes-phone", "SDES PHONE",
-          "The PHONE to put in SDES messages of this session",
-          DEFAULT_SDES_PHONE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
-      g_param_spec_string ("sdes-location", "SDES LOCATION",
-          "The LOCATION to put in SDES messages of this session",
-          DEFAULT_SDES_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
-      g_param_spec_string ("sdes-tool", "SDES TOOL",
-          "The TOOL to put in SDES messages of this session",
-          DEFAULT_SDES_TOOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
-  g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
-      g_param_spec_string ("sdes-note", "SDES NOTE",
-          "The NOTE to put in SDES messages of this session",
-          DEFAULT_SDES_NOTE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
+      g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
+          "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
+          -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
+      g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
+          "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
+          -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
+      g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
+          "The maximum size of the RTCP packets",
+          16, G_MAXINT16, DEFAULT_RTCP_MTU,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_SDES,
+      g_param_spec_boxed ("sdes", "SDES",
+          "The SDES items of this session",
+          GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
       g_param_spec_uint ("num-sources", "Num Sources",
@@ -335,8 +473,30 @@ rtp_session_class_init (RTPSessionClass * klass)
           "An array of all known sources in the session",
           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
+      g_param_spec_boolean ("favor-new", "Favor new sources",
+          "Resolve SSRC conflict in favor of new sources", FALSE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
+      g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
+          "Minimum interval between Regular RTCP packet (in ns)",
+          0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class,
+      PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
+      g_param_spec_uint64 ("rtcp-feedback-retention-window",
+          "RTCP Feedback retention window",
+          "Duration during which RTCP Feedback packets are retained (in ns)",
+          0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
+  klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
+  klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
 
   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
 }
@@ -361,15 +521,22 @@ rtp_session_init (RTPSession * sess)
 
   rtp_stats_init_defaults (&sess->stats);
 
+  sess->recalc_bandwidth = TRUE;
+  sess->bandwidth = DEFAULT_BANDWIDTH;
+  sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
+  sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
+  sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
+
   /* create an active SSRC for this session manager */
   sess->source = rtp_session_create_source (sess);
   sess->source->validated = TRUE;
   sess->source->internal = TRUE;
   sess->stats.active_sources++;
+  INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
 
   /* default UDP header length */
   sess->header_len = 28;
-  sess->mtu = 1400;
+  sess->mtu = DEFAULT_RTCP_MTU;
 
   /* some default SDES entries */
   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
@@ -381,6 +548,10 @@ rtp_session_init (RTPSession * sess)
   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
 
   sess->first_rtcp = TRUE;
+  sess->allow_early = TRUE;
+  sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
+
+  sess->rtcp_pli_requests = g_array_new (FALSE, FALSE, sizeof (guint32));
 
   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
 }
@@ -402,6 +573,8 @@ rtp_session_finalize (GObject * object)
   g_hash_table_destroy (sess->cnames);
   g_object_unref (sess->source);
 
+  g_array_free (sess->rtcp_pli_requests, TRUE);
+
   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
 }
 
@@ -412,6 +585,7 @@ copy_source (gpointer key, RTPSource * source, GValueArray * arr)
 
   g_value_init (&value, RTP_TYPE_SOURCE);
   g_value_take_object (&value, source);
+  /* copies the value */
   g_value_array_append (arr, &value);
 }
 
@@ -443,39 +617,37 @@ rtp_session_set_property (GObject * object, guint prop_id,
   sess = RTP_SESSION (object);
 
   switch (prop_id) {
+    case PROP_INTERNAL_SSRC:
+      rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
+      break;
     case PROP_BANDWIDTH:
-      rtp_session_set_bandwidth (sess, g_value_get_double (value));
+      sess->bandwidth = g_value_get_double (value);
+      sess->recalc_bandwidth = TRUE;
       break;
     case PROP_RTCP_FRACTION:
-      rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
-      break;
-    case PROP_SDES_CNAME:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
-          g_value_get_string (value));
+      sess->rtcp_bandwidth = g_value_get_double (value);
+      sess->recalc_bandwidth = TRUE;
       break;
-    case PROP_SDES_NAME:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
-          g_value_get_string (value));
+    case PROP_RTCP_RR_BANDWIDTH:
+      sess->rtcp_rr_bandwidth = g_value_get_int (value);
+      sess->recalc_bandwidth = TRUE;
       break;
-    case PROP_SDES_EMAIL:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
-          g_value_get_string (value));
+    case PROP_RTCP_RS_BANDWIDTH:
+      sess->rtcp_rs_bandwidth = g_value_get_int (value);
+      sess->recalc_bandwidth = TRUE;
       break;
-    case PROP_SDES_PHONE:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
-          g_value_get_string (value));
+    case PROP_RTCP_MTU:
+      sess->mtu = g_value_get_uint (value);
       break;
-    case PROP_SDES_LOCATION:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
-          g_value_get_string (value));
+    case PROP_SDES:
+      rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
       break;
-    case PROP_SDES_TOOL:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
-          g_value_get_string (value));
+    case PROP_FAVOR_NEW:
+      sess->favor_new = g_value_get_boolean (value);
       break;
-    case PROP_SDES_NOTE:
-      rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
-          g_value_get_string (value));
+    case PROP_RTCP_MIN_INTERVAL:
+      rtp_stats_set_min_interval (&sess->stats,
+          (gdouble) g_value_get_uint64 (value) / GST_SECOND);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -492,42 +664,29 @@ rtp_session_get_property (GObject * object, guint prop_id,
   sess = RTP_SESSION (object);
 
   switch (prop_id) {
+    case PROP_INTERNAL_SSRC:
+      g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
+      break;
     case PROP_INTERNAL_SOURCE:
       g_value_take_object (value, rtp_session_get_internal_source (sess));
       break;
     case PROP_BANDWIDTH:
-      g_value_set_double (value, rtp_session_get_bandwidth (sess));
+      g_value_set_double (value, sess->bandwidth);
       break;
     case PROP_RTCP_FRACTION:
-      g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
-      break;
-    case PROP_SDES_CNAME:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_CNAME));
-      break;
-    case PROP_SDES_NAME:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_NAME));
+      g_value_set_double (value, sess->rtcp_bandwidth);
       break;
-    case PROP_SDES_EMAIL:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_EMAIL));
+    case PROP_RTCP_RR_BANDWIDTH:
+      g_value_set_int (value, sess->rtcp_rr_bandwidth);
       break;
-    case PROP_SDES_PHONE:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_PHONE));
+    case PROP_RTCP_RS_BANDWIDTH:
+      g_value_set_int (value, sess->rtcp_rs_bandwidth);
       break;
-    case PROP_SDES_LOCATION:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_LOC));
+    case PROP_RTCP_MTU:
+      g_value_set_uint (value, sess->mtu);
       break;
-    case PROP_SDES_TOOL:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_TOOL));
-      break;
-    case PROP_SDES_NOTE:
-      g_value_take_string (value, rtp_session_get_sdes_string (sess,
-              GST_RTCP_SDES_NOTE));
+    case PROP_SDES:
+      g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
       break;
     case PROP_NUM_SOURCES:
       g_value_set_uint (value, rtp_session_get_num_sources (sess));
@@ -538,6 +697,12 @@ rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_SOURCES:
       g_value_take_boxed (value, rtp_session_create_sources (sess));
       break;
+    case PROP_FAVOR_NEW:
+      g_value_set_boolean (value, sess->favor_new);
+      break;
+    case PROP_RTCP_MIN_INTERVAL:
+      g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -693,6 +858,14 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.reconsider = callbacks->reconsider;
     sess->reconsider_user_data = user_data;
   }
+  if (callbacks->request_key_unit) {
+    sess->callbacks.request_key_unit = callbacks->request_key_unit;
+    sess->request_key_unit_user_data = user_data;
+  }
+  if (callbacks->request_time) {
+    sess->callbacks.request_time = callbacks->request_time;
+    sess->request_time_user_data = user_data;
+  }
 }
 
 /**
@@ -804,6 +977,24 @@ rtp_session_set_reconsider_callback (RTPSession * sess,
 }
 
 /**
+ * rtp_session_set_request_time_callback:
+ * @sess: an #RTPSession
+ * @callback: callback to set
+ * @user_data: user data passed in the callback
+ *
+ * Configure only the request_time callback
+ */
+void
+rtp_session_set_request_time_callback (RTPSession * sess,
+    RTPSessionRequestTime callback, gpointer user_data)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  sess->callbacks.request_time = callback;
+  sess->request_time_user_data = user_data;
+}
+
+/**
  * rtp_session_set_bandwidth:
  * @sess: an #RTPSession
  * @bandwidth: the bandwidth allocated
@@ -847,7 +1038,7 @@ rtp_session_get_bandwidth (RTPSession * sess)
  * @sess: an #RTPSession
  * @bandwidth: the RTCP bandwidth
  *
- * Set the bandwidth that should be used for RTCP
+ * Set the bandwidth in bytes per second that should be used for RTCP
  * messages.
  */
 void
@@ -886,9 +1077,9 @@ rtp_session_get_rtcp_fraction (RTPSession * sess)
  * rtp_session_set_sdes_string:
  * @sess: an #RTPSession
  * @type: the type of the SDES item
- * @item: a null-terminated string to set. 
+ * @item: a null-terminated string to set.
  *
- * Store an SDES item of @type in @sess. 
+ * Store an SDES item of @type in @sess.
  *
  * Returns: %FALSE if the data was unchanged @type is invalid.
  */
@@ -912,7 +1103,7 @@ rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
  * @sess: an #RTPSession
  * @type: the type of the SDES item
  *
- * Get the SDES item of @type from @sess. 
+ * Get the SDES item of @type from @sess.
  *
  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
  * valid. g_free() after usage.
@@ -931,8 +1122,52 @@ rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
   return result;
 }
 
+/**
+ * rtp_session_get_sdes_struct:
+ * @sess: an #RTSPSession
+ *
+ * Get the SDES data as a #GstStructure
+ *
+ * Returns: a GstStructure with SDES items for @sess. This function returns a
+ * copy of the SDES structure, use gst_structure_free() after usage.
+ */
+GstStructure *
+rtp_session_get_sdes_struct (RTPSession * sess)
+{
+  const GstStructure *sdes;
+  GstStructure *result = NULL;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
+
+  RTP_SESSION_LOCK (sess);
+  sdes = rtp_source_get_sdes_struct (sess->source);
+  if (sdes)
+    result = gst_structure_copy (sdes);
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_set_sdes_struct:
+ * @sess: an #RTSPSession
+ * @sdes: a #GstStructure
+ *
+ * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
+ */
+void
+rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
+{
+  g_return_if_fail (sdes);
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  RTP_SESSION_LOCK (sess);
+  rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
+  RTP_SESSION_UNLOCK (sess);
+}
+
 static GstFlowReturn
-source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
+source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
 {
   GstFlowReturn result = GST_FLOW_OK;
 
@@ -943,21 +1178,21 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
 
     if (session->callbacks.send_rtp)
       result =
-          session->callbacks.send_rtp (session, source, buffer,
+          session->callbacks.send_rtp (session, source, data,
           session->send_rtp_user_data);
-    else
-      gst_buffer_unref (buffer);
-
+    else {
+      gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    }
   } else {
     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
     RTP_SESSION_UNLOCK (session);
 
     if (session->callbacks.process_rtp)
       result =
-          session->callbacks.process_rtp (session, source, buffer,
-          session->process_rtp_user_data);
+          session->callbacks.process_rtp (session, source,
+          GST_BUFFER_CAST (data), session->process_rtp_user_data);
     else
-      gst_buffer_unref (buffer);
+      gst_buffer_unref (GST_BUFFER_CAST (data));
   }
   RTP_SESSION_LOCK (session);
 
@@ -990,44 +1225,6 @@ static RTPSourceCallbacks callbacks = {
   (RTPSourceClockRate) source_clock_rate,
 };
 
-/**
- * find_add_conflicting_addresses:
- * @sess: The session to check in
- * @arrival: The arrival stats for the buffer
- *
- * Checks if an address which has a conflict is already known,
- *  otherwise remembers it to prevent loops.
- *
- * Returns: TRUE if it was a known conflict, FALSE otherwise
- */
-
-static gboolean
-find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival)
-{
-  GList *item;
-  RTPConflictingAddress *new_conflict;
-
-  for (item = g_list_first (sess->conflicting_addresses);
-      item; item = g_list_next (item)) {
-    RTPConflictingAddress *known_conflict = item->data;
-
-    if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) {
-      known_conflict->time = arrival->time;
-      return TRUE;
-    }
-  }
-
-  new_conflict = g_new0 (RTPConflictingAddress, 1);
-
-  memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress));
-  new_conflict->time = arrival->time;
-
-  sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses,
-      new_conflict);
-
-  return FALSE;
-}
-
 static gboolean
 check_collision (RTPSession * sess, RTPSource * source,
     RTPArrivalStats * arrival, gboolean rtp)
@@ -1037,42 +1234,99 @@ check_collision (RTPSession * sess, RTPSource * source,
     return FALSE;
 
   if (sess->source != source) {
+    GstNetAddress *from;
+    gboolean have_from;
+
     /* This is not our local source, but lets check if two remote
      * source collide
      */
+
     if (rtp) {
-      if (source->have_rtp_from) {
-        if (gst_netaddress_equal (&source->rtp_from, &arrival->address))
-          /* Address is the same */
-          return FALSE;
-      } else {
-        /* We don't already have a from address for RTP, just set it */
-        rtp_source_set_rtp_from (source, &arrival->address);
+      from = &source->rtp_from;
+      have_from = source->have_rtp_from;
+    } else {
+      from = &source->rtcp_from;
+      have_from = source->have_rtcp_from;
+    }
+
+    if (have_from) {
+      if (gst_netaddress_equal (from, &arrival->address)) {
+        /* Address is the same */
         return FALSE;
+      } else {
+        GST_LOG ("we have a third-party collision or loop ssrc:%x",
+            rtp_source_get_ssrc (source));
+        if (sess->favor_new) {
+          if (rtp_source_find_conflicting_address (source,
+                  &arrival->address, arrival->current_time)) {
+            gchar buf1[40];
+            gst_netaddress_to_string (&arrival->address, buf1, 40);
+            GST_LOG ("Known conflict on %x for %s, dropping packet",
+                rtp_source_get_ssrc (source), buf1);
+            return TRUE;
+          } else {
+            gchar buf1[40], buf2[40];
+
+            /* Current address is not a known conflict, lets assume this is
+             * a new source. Save old address in possible conflict list
+             */
+            rtp_source_add_conflicting_address (source, from,
+                arrival->current_time);
+
+            gst_netaddress_to_string (from, buf1, 40);
+            gst_netaddress_to_string (&arrival->address, buf2, 40);
+            GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
+                " saving old as known conflict",
+                rtp_source_get_ssrc (source), buf1, buf2);
+
+            if (rtp)
+              rtp_source_set_rtp_from (source, &arrival->address);
+            else
+              rtp_source_set_rtcp_from (source, &arrival->address);
+            return FALSE;
+          }
+        } else {
+          /* Don't need to save old addresses, we ignore new sources */
+          return TRUE;
+        }
       }
     } else {
-      if (source->have_rtcp_from) {
-        if (gst_netaddress_equal (&source->rtcp_from, &arrival->address))
-          /* Address is the same */
-          return FALSE;
-      } else {
-        /* We don't already have a from address for RTCP, just set it */
+      /* We don't already have a from address for RTP, just set it */
+      if (rtp)
+        rtp_source_set_rtp_from (source, &arrival->address);
+      else
         rtp_source_set_rtcp_from (source, &arrival->address);
-        return FALSE;
-      }
+      return FALSE;
     }
-    /* We received RTP or RTCP from this source before but the network address
-     * changed. In this case, we have third-party collision or loop */
-    GST_DEBUG ("we have a third-party collision or loop");
 
     /* FIXME: Log 3rd party collision somehow
      * Maybe should be done in upper layer, only the SDES can tell us
      * if its a collision or a loop
      */
+
+    /* If the source has been inactive for some time, we assume that it has
+     * simply changed its transport source address. Hence, there is no true
+     * third-party collision - only a simulated one. */
+    if (arrival->current_time > source->last_activity) {
+      GstClockTime inactivity_period =
+          arrival->current_time - source->last_activity;
+      if (inactivity_period > 1 * GST_SECOND) {
+        /* Use new network address */
+        if (rtp) {
+          g_assert (source->have_rtp_from);
+          rtp_source_set_rtp_from (source, &arrival->address);
+        } else {
+          g_assert (source->have_rtcp_from);
+          rtp_source_set_rtcp_from (source, &arrival->address);
+        }
+        return FALSE;
+      }
+    }
   } else {
     /* This is sending with our ssrc, is it an address we already know */
 
-    if (find_add_conflicting_addresses (sess, arrival)) {
+    if (rtp_source_find_conflicting_address (source, &arrival->address,
+            arrival->current_time)) {
       /* Its a known conflict, its probably a loop, not a collision
        * lets just drop the incoming packet
        */
@@ -1080,10 +1334,14 @@ check_collision (RTPSession * sess, RTPSource * source,
     } else {
       /* Its a new collision, lets change our SSRC */
 
+      rtp_source_add_conflicting_address (source, &arrival->address,
+          arrival->current_time);
+
       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
       on_ssrc_collision (sess, source);
 
-      rtp_session_send_bye_locked (sess, "SSRC Collision", arrival->time);
+      rtp_session_schedule_bye_locked (sess, "SSRC Collision",
+          arrival->current_time);
 
       sess->change_ssrc = TRUE;
     }
@@ -1093,7 +1351,8 @@ check_collision (RTPSession * sess, RTPSource * source,
 }
 
 
-/* must be called with the session lock */
+/* must be called with the session lock, the returned source needs to be
+ * unreffed after usage. */
 static RTPSource *
 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
     RTPArrivalStats * arrival, gboolean rtp)
@@ -1139,9 +1398,10 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
     }
   }
   /* update last activity */
-  source->last_activity = arrival->time;
+  source->last_activity = arrival->current_time;
   if (rtp)
-    source->last_rtp_activity = arrival->time;
+    source->last_rtp_activity = arrival->current_time;
+  g_object_ref (source);
 
   return source;
 }
@@ -1177,15 +1437,24 @@ void
 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
 {
   RTP_SESSION_LOCK (sess);
-  g_hash_table_steal (sess->ssrcs[sess->mask_idx],
-      GINT_TO_POINTER (sess->source->ssrc));
+  if (ssrc != sess->source->ssrc) {
+    g_hash_table_steal (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (sess->source->ssrc));
 
-  sess->source->ssrc = ssrc;
-  rtp_source_reset (sess->source);
+    GST_DEBUG ("setting internal SSRC to %08x", ssrc);
+    /* After this call, any receiver of the old SSRC either in RTP or RTCP
+     * packets will timeout on the old SSRC, we could potentially schedule a
+     * BYE RTCP for the old SSRC... */
+    sess->source->ssrc = ssrc;
+    rtp_source_reset (sess->source);
 
-  g_hash_table_insert (sess->ssrcs[sess->mask_idx],
-      GINT_TO_POINTER (sess->source->ssrc), sess->source);
+    /* rehash with the new SSRC */
+    g_hash_table_insert (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (sess->source->ssrc), sess->source);
+  }
   RTP_SESSION_UNLOCK (sess);
+
+  g_object_notify (G_OBJECT (sess), "internal-ssrc");
 }
 
 /**
@@ -1194,7 +1463,7 @@ rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
  *
  * Get the internal SSRC of @sess.
  *
- * Returns: The SSRC of the session. 
+ * Returns: The SSRC of the session.
  */
 guint32
 rtp_session_get_internal_ssrc (RTPSession * sess)
@@ -1342,6 +1611,7 @@ rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
   return result;
 }
 
+/* should be called with the SESSION lock */
 static guint32
 rtp_session_create_new_ssrc (RTPSession * sess)
 {
@@ -1355,7 +1625,6 @@ rtp_session_create_new_ssrc (RTPSession * sess)
             GINT_TO_POINTER (ssrc)) == NULL)
       break;
   }
-
   return ssrc;
 }
 
@@ -1378,8 +1647,9 @@ rtp_session_create_source (RTPSession * sess)
   RTP_SESSION_LOCK (sess);
   ssrc = rtp_session_create_new_ssrc (sess);
   source = rtp_source_new (ssrc);
-  g_object_ref (source);
   rtp_source_set_callbacks (source, &callbacks, sess);
+  /* we need an additional ref for the source in the hashtable */
+  g_object_ref (source);
   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
       source);
   /* we have one more source now */
@@ -1399,8 +1669,10 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
     GstClockTime running_time, guint64 ntpnstime)
 {
+  GstMetaNetAddress *meta;
+
   /* get time of arrival */
-  arrival->time = current_time;
+  arrival->current_time = current_time;
   arrival->running_time = running_time;
   arrival->ntpnstime = ntpnstime;
 
@@ -1414,11 +1686,12 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
   }
 
   /* for netbuffer we can store the IP address to check for collisions */
-  arrival->have_address = GST_IS_NETBUFFER (buffer);
-  if (arrival->have_address) {
-    GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
-
-    memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
+  meta = gst_buffer_get_meta_net_address (buffer);
+  if (meta) {
+    arrival->have_address = TRUE;
+    memcpy (&arrival->address, &meta->naddr, sizeof (GstNetAddress));
+  } else {
+    arrival->have_address = FALSE;
   }
 }
 
@@ -1427,7 +1700,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
  * @sess: and #RTPSession
  * @buffer: an RTP buffer
  * @current_time: the current system time
- * @ntpnstime: the NTP arrival time in nanoseconds
+ * @running_time: the running_time of @buffer
  *
  * Process an RTP buffer in the session manager. This function takes ownership
  * of @buffer.
@@ -1436,7 +1709,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
  */
 GstFlowReturn
 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
+    GstClockTime current_time, GstClockTime running_time)
 {
   GstFlowReturn result;
   guint32 ssrc;
@@ -1444,6 +1717,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   gboolean created;
   gboolean prevsender, prevactive;
   RTPArrivalStats arrival;
+  guint32 csrcs[16];
+  guint8 i, count;
+  guint64 oldrate;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -1454,7 +1730,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   RTP_SESSION_LOCK (sess);
   /* update arrival stats */
   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
-      running_time, ntpnstime);
+      running_time, -1);
 
   /* ignore more RTP packets when we left the session */
   if (sess->source->received_bye)
@@ -1463,15 +1739,21 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   /* get SSRC and look up in session database */
   ssrc = gst_rtp_buffer_get_ssrc (buffer);
   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
-
   if (!source)
     goto collision;
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
   prevactive = RTP_SOURCE_IS_ACTIVE (source);
+  oldrate = source->bitrate;
+
+  /* copy available csrc for later */
+  count = gst_rtp_buffer_get_csrc_count (buffer);
+  /* make sure to not overflow our array. An RTP buffer can maximally contain
+   * 16 CSRCs */
+  count = MIN (count, 16);
 
-  /* we need to ref so that we can process the CSRCs later */
-  gst_buffer_ref (buffer);
+  for (i = 0; i < count; i++)
+    csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
 
   /* let source process the packet */
   result = rtp_source_process_rtp (source, buffer, &arrival);
@@ -1488,36 +1770,38 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
         sess->stats.sender_sources);
   }
+  if (oldrate != source->bitrate)
+    sess->recalc_bandwidth = TRUE;
 
   if (created)
     on_new_ssrc (sess, source);
 
   if (source->validated) {
-    guint8 i, count;
     gboolean created;
 
     /* for validated sources, we add the CSRCs as well */
-    count = gst_rtp_buffer_get_csrc_count (buffer);
-
     for (i = 0; i < count; i++) {
       guint32 csrc;
       RTPSource *csrc_src;
 
-      csrc = gst_rtp_buffer_get_csrc (buffer, i);
+      csrc = csrcs[i];
 
       /* get source */
       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+      if (!csrc_src)
+        continue;
 
       if (created) {
         GST_DEBUG ("created new CSRC: %08x", csrc);
         rtp_source_set_as_csrc (csrc_src);
         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
           sess->stats.active_sources++;
-        on_new_ssrc (sess, source);
+        on_new_ssrc (sess, csrc_src);
       }
+      g_object_unref (csrc_src);
     }
   }
-  gst_buffer_unref (buffer);
+  g_object_unref (source);
 
   RTP_SESSION_UNLOCK (sess);
 
@@ -1567,12 +1851,11 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
       /* only deal with report blocks for our session, we update the stats of
        * the sender of the RTCP message. We could also compare our stats against
        * the other sender to see if we are better or worse. */
-      rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
-          exthighestseq, jitter, lsr, dlsr);
-
-      on_ssrc_active (sess, source);
+      rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
+          packetslost, exthighestseq, jitter, lsr, dlsr);
     }
   }
+  on_ssrc_active (sess, source);
 }
 
 /* A Sender report contains statistics about how the sender is doing. This
@@ -1586,7 +1869,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
  */
 static void
 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPArrivalStats * arrival, gboolean * do_sync)
 {
   guint32 senderssrc, rtptime, packet_count, octet_count;
   guint64 ntptime;
@@ -1597,18 +1880,23 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
       &packet_count, &octet_count);
 
   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
-      senderssrc, GST_TIME_ARGS (arrival->time));
+      senderssrc, GST_TIME_ARGS (arrival->current_time));
 
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
-
   if (!source)
     return;
 
+  /* don't try to do lip-sync for sources that sent a BYE */
+  if (rtp_source_received_bye (source))
+    *do_sync = FALSE;
+  else
+    *do_sync = TRUE;
+
   prevsender = RTP_SOURCE_IS_SENDER (source);
 
   /* first update the source */
-  rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
-      octet_count);
+  rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
+      packet_count, octet_count);
 
   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
     sess->stats.sender_sources++;
@@ -1620,6 +1908,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
     on_new_ssrc (sess, source);
 
   rtp_session_process_rb (sess, source, packet, arrival);
+  g_object_unref (source);
 }
 
 /* A receiver report contains statistics about how a receiver is doing. It
@@ -1641,7 +1930,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
 
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
-
   if (!source)
     return;
 
@@ -1649,6 +1937,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
     on_new_ssrc (sess, source);
 
   rtp_session_process_rb (sess, source, packet, arrival);
+  g_object_unref (source);
 }
 
 /* Get SDES items and store them in the SSRC */
@@ -1666,45 +1955,77 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
   i = 0;
   while (more_items) {
     guint32 ssrc;
-    gboolean changed, created;
+    gboolean changed, created, validated;
     RTPSource *source;
+    GstStructure *sdes;
 
     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
 
     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
 
-    /* find src, no probation when dealing with RTCP */
-    source = obtain_source (sess, ssrc, &created, arrival, FALSE);
     changed = FALSE;
 
+    /* find src, no probation when dealing with RTCP */
+    source = obtain_source (sess, ssrc, &created, arrival, FALSE);
     if (!source)
       return;
 
+    sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
+
     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
     j = 0;
     while (more_entries) {
       GstRTCPSDESType type;
       guint8 len;
       guint8 *data;
+      gchar *name;
+      gchar *value;
 
       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
 
       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
           data);
 
-      changed |= rtp_source_set_sdes (source, type, data, len);
+      if (type == GST_RTCP_SDES_PRIV) {
+        name = g_strndup ((const gchar *) &data[1], data[0]);
+        len -= data[0] + 1;
+        data += data[0] + 1;
+      } else {
+        name = g_strdup (gst_rtcp_sdes_type_to_name (type));
+      }
+
+      value = g_strndup ((const gchar *) data, len);
+
+      gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
+
+      g_free (name);
+      g_free (value);
 
       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
       j++;
     }
 
+    /* takes ownership of sdes */
+    changed = rtp_source_set_sdes_struct (source, sdes);
+
+    validated = !RTP_SOURCE_IS_ACTIVE (source);
     source->validated = TRUE;
 
+    /* source became active */
+    if (validated) {
+      sess->stats.active_sources++;
+      GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+          sess->stats.active_sources);
+      on_ssrc_validated (sess, source);
+    }
+
     if (created)
       on_new_ssrc (sess, source);
     if (changed)
       on_ssrc_sdes (sess, source);
 
+    g_object_unref (source);
+
     more_items = gst_rtcp_packet_sdes_next_item (packet);
     i++;
   }
@@ -1718,6 +2039,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
 {
   guint count, i;
   gchar *reason;
+  gboolean reconsider = FALSE;
 
   reason = gst_rtcp_packet_bye_get_reason (packet);
   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
@@ -1732,14 +2054,16 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
     GST_DEBUG ("SSRC: %08x", ssrc);
 
+    if (ssrc == sess->source->ssrc)
+      return;
+
     /* find src and mark bye, no probation when dealing with RTCP */
     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
-
     if (!source)
       return;
 
     /* store time for when we need to time out this source */
-    source->bye_time = arrival->time;
+    source->bye_time = arrival->current_time;
 
     prevactive = RTP_SOURCE_IS_ACTIVE (source);
     prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -1765,23 +2089,21 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
       /* some members went away since the previous timeout estimate.
        * Perform reverse reconsideration but only when we are not scheduling a
        * BYE ourselves. */
-      if (arrival->time < sess->next_rtcp_check_time) {
+      if (arrival->current_time < sess->next_rtcp_check_time) {
         GstClockTime time_remaining;
 
-        time_remaining = sess->next_rtcp_check_time - arrival->time;
+        time_remaining = sess->next_rtcp_check_time - arrival->current_time;
         sess->next_rtcp_check_time =
             gst_util_uint64_scale (time_remaining, members, pmembers);
 
         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
             GST_TIME_ARGS (sess->next_rtcp_check_time));
 
-        sess->next_rtcp_check_time += arrival->time;
+        sess->next_rtcp_check_time += arrival->current_time;
 
-        RTP_SESSION_UNLOCK (sess);
-        /* notify app of reconsideration */
-        if (sess->callbacks.reconsider)
-          sess->callbacks.reconsider (sess, sess->reconsider_user_data);
-        RTP_SESSION_LOCK (sess);
+        /* mark pending reconsider. We only want to signal the reconsideration
+         * once after we handled all the source in the bye packet */
+        reconsider = TRUE;
       }
     }
 
@@ -1789,6 +2111,15 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
       on_new_ssrc (sess, source);
 
     on_bye_ssrc (sess, source);
+
+    g_object_unref (source);
+  }
+  if (reconsider) {
+    RTP_SESSION_UNLOCK (sess);
+    /* notify app of reconsideration */
+    if (sess->callbacks.reconsider)
+      sess->callbacks.reconsider (sess, sess->reconsider_user_data);
+    RTP_SESSION_LOCK (sess);
   }
   g_free (reason);
 }
@@ -1800,11 +2131,113 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
   GST_DEBUG ("received APP");
 }
 
+static void
+rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
+    guint32 media_ssrc, GstClockTime current_time)
+{
+  RTPSource *src;
+  guint32 round_trip = 0;
+
+  if (!sess->callbacks.request_key_unit)
+    return;
+
+  src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+      GINT_TO_POINTER (sender_ssrc));
+
+  if (!src)
+    return;
+
+  if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+      rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL,
+          &round_trip)) {
+    GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
+        GST_SECOND, 65536);
+
+    if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+        current_time - sess->last_keyframe_request < round_trip_in_ns) {
+      GST_DEBUG ("Ignoring PLI because one was send without one RTT (%"
+          GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
+          GST_TIME_ARGS (current_time - sess->last_keyframe_request),
+          GST_TIME_ARGS (round_trip_in_ns));;
+      return;
+    }
+  }
+
+  sess->last_keyframe_request = current_time;
+
+  GST_LOG ("received PLI from %X %p(%p)", sender_ssrc,
+      sess->callbacks.process_rtp, sess->callbacks.request_key_unit);
+
+  sess->callbacks.request_key_unit (sess, FALSE,
+      sess->request_key_unit_user_data);
+}
+
+static void
+rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
+    RTPArrivalStats * arrival, GstClockTime current_time)
+{
+  GstRTCPType type = gst_rtcp_packet_get_type (packet);
+  GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
+  guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
+  guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
+  guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
+  guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
+
+  GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
+      "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
+
+  if (g_signal_has_handler_pending (sess,
+          rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
+    GstBuffer *fci_buffer = NULL;
+
+    if (fci_length > 0) {
+      fci_buffer = gst_buffer_create_sub (packet->buffer,
+          fci_data - GST_BUFFER_DATA (packet->buffer), fci_length);
+      GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
+    }
+
+    RTP_SESSION_UNLOCK (sess);
+    g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
+        type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
+    RTP_SESSION_LOCK (sess);
+
+    if (fci_buffer)
+      gst_buffer_unref (fci_buffer);
+  }
+
+  if (sess->rtcp_feedback_retention_window) {
+    RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (media_ssrc));
+
+    if (src)
+      rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+  }
+
+  if (rtp_source_get_ssrc (sess->source) == media_ssrc) {
+    switch (type) {
+      case GST_RTCP_TYPE_PSFB:
+        switch (fbtype) {
+          case GST_RTCP_PSFB_TYPE_PLI:
+            rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
+                current_time);
+            break;
+          default:
+            break;
+        }
+        break;
+      case GST_RTCP_TYPE_RTPFB:
+      default:
+        break;
+    }
+  }
+}
+
 /**
  * rtp_session_process_rtcp:
  * @sess: and #RTPSession
  * @buffer: an RTCP buffer
  * @current_time: the current system time
+ * @ntpnstime: the current NTP time in nanoseconds
  *
  * Process an RTCP buffer in the session manager. This function takes ownership
  * of @buffer.
@@ -1813,10 +2246,10 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
  */
 GstFlowReturn
 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time)
+    GstClockTime current_time, guint64 ntpnstime)
 {
   GstRTCPPacket packet;
-  gboolean more, is_bye = FALSE, is_sr = FALSE;
+  gboolean more, is_bye = FALSE, do_sync = FALSE;
   RTPArrivalStats arrival;
   GstFlowReturn result = GST_FLOW_OK;
 
@@ -1830,14 +2263,12 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   RTP_SESSION_LOCK (sess);
   /* update arrival stats */
-  update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, -1);
+  update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
+      ntpnstime);
 
   if (sess->sent_bye)
     goto ignore;
 
-  /* make writable, we might want to change the buffer */
-  buffer = gst_buffer_make_metadata_writable (buffer);
-
   /* start processing the compound packet */
   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
   while (more) {
@@ -1853,8 +2284,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
     switch (type) {
       case GST_RTCP_TYPE_SR:
-        rtp_session_process_sr (sess, &packet, &arrival);
-        is_sr = TRUE;
+        rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
         break;
       case GST_RTCP_TYPE_RR:
         rtp_session_process_rr (sess, &packet, &arrival);
@@ -1864,11 +2294,17 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
         break;
       case GST_RTCP_TYPE_BYE:
         is_bye = TRUE;
+        /* don't try to attempt lip-sync anymore for streams with a BYE */
+        do_sync = FALSE;
         rtp_session_process_bye (sess, &packet, &arrival);
         break;
       case GST_RTCP_TYPE_APP:
         rtp_session_process_app (sess, &packet, &arrival);
         break;
+      case GST_RTCP_TYPE_RTPFB:
+      case GST_RTCP_TYPE_PSFB:
+        rtp_session_process_feedback (sess, &packet, &arrival, current_time);
+        break;
       default:
         GST_WARNING ("got unknown RTCP packet");
         break;
@@ -1888,13 +2324,18 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
     /* keep track of average packet size */
     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
   }
+  GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
+      sess->stats.avg_rtcp_packet_size, arrival.bytes);
   RTP_SESSION_UNLOCK (sess);
 
   /* notify caller of sr packets in the callback */
-  if (is_sr && sess->callbacks.sync_rtcp)
+  if (do_sync && sess->callbacks.sync_rtcp) {
+    /* make writable, we might want to change the buffer */
+    buffer = gst_buffer_make_metadata_writable (buffer);
+
     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
         sess->sync_rtcp_user_data);
-  else
+  else
     gst_buffer_unref (buffer);
 
   return result;
@@ -1918,10 +2359,10 @@ ignore:
 /**
  * rtp_session_send_rtp:
  * @sess: an #RTPSession
- * @buffer: an RTP buffer
+ * @data: pointer to either an RTP buffer or a list of RTP buffers
+ * @is_list: TRUE when @data is a buffer list
  * @current_time: the current system time
- * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
- * This is the buffer timestamp converted to NTP time.
+ * @running_time: the running time of @data
  *
  * Send the RTP buffer in the session manager. This function takes ownership of
  * @buffer.
@@ -1929,20 +2370,28 @@ ignore:
  * Returns: a #GstFlowReturn.
  */
 GstFlowReturn
-rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time, guint64 ntpnstime)
+rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
+    GstClockTime current_time, GstClockTime running_time)
 {
   GstFlowReturn result;
   RTPSource *source;
   gboolean prevsender;
+  gboolean valid_packet;
+  guint64 oldrate;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
-  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+  g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
 
-  if (!gst_rtp_buffer_validate (buffer))
+  if (is_list) {
+    valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
+  } else {
+    valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
+  }
+
+  if (!valid_packet)
     goto invalid_packet;
 
-  GST_LOG ("received RTP packet for sending");
+  GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
 
   RTP_SESSION_LOCK (sess);
   source = sess->source;
@@ -1951,12 +2400,15 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
   source->last_rtp_activity = current_time;
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
+  oldrate = source->bitrate;
 
   /* we use our own source to send */
-  result = rtp_source_send_rtp (source, buffer, ntpnstime);
+  result = rtp_source_send_rtp (source, data, is_list, running_time);
 
   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
     sess->stats.sender_sources++;
+  if (oldrate != source->bitrate)
+    sess->recalc_bandwidth = TRUE;
   RTP_SESSION_UNLOCK (sess);
 
   return result;
@@ -1964,18 +2416,46 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
   /* ERRORS */
 invalid_packet:
   {
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
 }
 
+static void
+add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
+{
+  *bandwidth += source->bitrate;
+}
+
 static GstClockTime
 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
     gboolean first)
 {
   GstClockTime result;
 
+  /* recalculate bandwidth when it changed */
+  if (sess->recalc_bandwidth) {
+    gdouble bandwidth;
+
+    if (sess->bandwidth > 0)
+      bandwidth = sess->bandwidth;
+    else {
+      /* If it is <= 0, then try to estimate the actual bandwidth */
+      bandwidth = sess->source->bitrate;
+
+      g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
+      bandwidth /= 8.0;
+    }
+    if (bandwidth == 0)
+      bandwidth = RTP_STATS_BANDWIDTH;
+
+    rtp_stats_set_bandwidths (&sess->stats, bandwidth,
+        sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
+
+    sess->recalc_bandwidth = FALSE;
+  }
+
   if (sess->source->received_bye) {
     result = rtp_stats_calculate_bye_interval (&sess->stats);
   } else {
@@ -1986,7 +2466,7 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
       GST_TIME_ARGS (result), first);
 
-  if (!deterministic)
+  if (!deterministic && result != GST_CLOCK_TIME_NONE)
     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
 
   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
@@ -1994,19 +2474,11 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
   return result;
 }
 
-/**
- * rtp_session_send_bye_locked:
- * @sess: an #RTPSession
- * @reason: a reason or NULL
- *
- * Stop the current @sess and schedule a BYE message for the other members.
- *
+/* Stop the current @sess and schedule a BYE message for the other members.
  * One must have the session lock to call this function
- *
- * Returns: a #GstFlowReturn.
  */
 static GstFlowReturn
-rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason,
+rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
     GstClockTime current_time)
 {
   GstFlowReturn result = GST_FLOW_OK;
@@ -2026,10 +2498,11 @@ rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason,
   /* at least one member wants to send a BYE */
   g_free (sess->bye_reason);
   sess->bye_reason = g_strdup (reason);
-  sess->stats.avg_rtcp_packet_size = 100;
+  INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
   sess->stats.bye_members = 1;
   sess->first_rtcp = TRUE;
   sess->sent_bye = FALSE;
+  sess->allow_early = TRUE;
 
   /* reschedule transmission */
   sess->last_rtcp_send_time = current_time;
@@ -2050,19 +2523,17 @@ done:
 }
 
 /**
- * rtp_session_send_bye:
+ * rtp_session_schedule_bye:
  * @sess: an #RTPSession
  * @reason: a reason or NULL
  * @current_time: the current system time
  *
  * Stop the current @sess and schedule a BYE message for the other members.
  *
- * One must have the session lock to call this function
- *
  * Returns: a #GstFlowReturn.
  */
 GstFlowReturn
-rtp_session_send_bye (RTPSession * sess, const gchar * reason,
+rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
     GstClockTime current_time)
 {
   GstFlowReturn result = GST_FLOW_OK;
@@ -2070,7 +2541,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason,
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
   RTP_SESSION_LOCK (sess);
-  result = rtp_session_send_bye_locked (sess, reason, current_time);
+  result = rtp_session_schedule_bye_locked (sess, reason, current_time);
   RTP_SESSION_UNLOCK (sess);
 
   return result;
@@ -2089,12 +2560,17 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason,
 GstClockTime
 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
 {
-  GstClockTime result;
+  GstClockTime result, interval = 0;
 
-  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
 
   RTP_SESSION_LOCK (sess);
 
+  if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+    result = sess->next_early_rtcp_time;
+    goto early_exit;
+  }
+
   result = sess->next_rtcp_check_time;
 
   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
@@ -2110,26 +2586,36 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
   if (sess->source->received_bye) {
     if (sess->sent_bye) {
       GST_DEBUG ("we sent BYE already");
-      result = GST_CLOCK_TIME_NONE;
+      interval = GST_CLOCK_TIME_NONE;
     } else if (sess->stats.active_sources >= 50) {
       GST_DEBUG ("reconsider BYE, more than 50 sources");
       /* reconsider BYE if members >= 50 */
-      result += calculate_rtcp_interval (sess, FALSE, TRUE);
+      interval = calculate_rtcp_interval (sess, FALSE, TRUE);
     }
   } else {
     if (sess->first_rtcp) {
       GST_DEBUG ("first RTCP packet");
       /* we are called for the first time */
-      result += calculate_rtcp_interval (sess, FALSE, TRUE);
+      interval = calculate_rtcp_interval (sess, FALSE, TRUE);
     } else if (sess->next_rtcp_check_time < current_time) {
       GST_DEBUG ("old check time expired, getting new timeout");
       /* get a new timeout when we need to */
-      result += calculate_rtcp_interval (sess, FALSE, FALSE);
+      interval = calculate_rtcp_interval (sess, FALSE, FALSE);
     }
   }
+
+  if (interval != GST_CLOCK_TIME_NONE)
+    result += interval;
+  else
+    result = GST_CLOCK_TIME_NONE;
+
   sess->next_rtcp_check_time = result;
 
-  GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
+early_exit:
+
+  GST_DEBUG ("current time: %" GST_TIME_FORMAT
+      ", next time: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
   RTP_SESSION_UNLOCK (sess);
 
   return result;
@@ -2141,10 +2627,13 @@ typedef struct
   GstBuffer *rtcp;
   GstClockTime current_time;
   guint64 ntpnstime;
+  GstClockTime running_time;
   GstClockTime interval;
   GstRTCPPacket packet;
   gboolean is_bye;
   gboolean has_sdes;
+  gboolean is_early;
+  gboolean may_suppress;
 } ReportData;
 
 static void
@@ -2165,8 +2654,8 @@ session_start_rtcp (RTPSession * sess, ReportData * data)
     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
 
     /* get latest stats */
-    rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
-        &packet_count, &octet_count);
+    rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
+        &ntptime, &rtptime, &packet_count, &octet_count);
     /* store stats */
     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
         packet_count, octet_count);
@@ -2192,6 +2681,9 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
   /* create a new buffer if needed */
   if (data->rtcp == NULL) {
     session_start_rtcp (sess, data);
+  } else if (data->is_early) {
+    /* Put a single RR or SR in minimal compound packets */
+    return;
   }
   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
     /* only report about other sender sources */
@@ -2205,6 +2697,15 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
 
+      /* store last generated RR packet */
+      source->last_rr.is_valid = TRUE;
+      source->last_rr.fractionlost = fractionlost;
+      source->last_rr.packetslost = packetslost;
+      source->last_rr.exthighestseq = exthighestseq;
+      source->last_rr.jitter = jitter;
+      source->last_rr.lsr = lsr;
+      source->last_rr.dlsr = dlsr;
+
       /* packet is not yet filled, add report block for this source. */
       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
           exthighestseq, jitter, lsr, dlsr);
@@ -2213,7 +2714,7 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
 }
 
 /* perform cleanup of sources that timed out */
-static gboolean
+static void
 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
 {
   gboolean remove = FALSE;
@@ -2281,34 +2782,70 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
     if (sendertimeout)
       on_sender_timeout (sess, source);
   }
-  return remove;
+
+  source->closing = remove;
 }
 
 static void
 session_sdes (RTPSession * sess, ReportData * data)
 {
   GstRTCPPacket *packet = &data->packet;
-  guint8 *sdes_data;
-  guint sdes_len;
+  const GstStructure *sdes;
+  gint i, n_fields;
 
   /* add SDES packet */
   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
 
   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
 
-  rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
-      &sdes_len);
-  gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
-      sdes_data);
-
-  /* other SDES items must only be added at regular intervals and only when the
-   * user requests to since it might be a privacy problem */
-#if 0
-  gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
-      strlen (sess->name), (guint8 *) sess->name);
-  gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
-      strlen (sess->tool), (guint8 *) sess->tool);
-#endif
+  sdes = rtp_source_get_sdes_struct (sess->source);
+
+  /* add all fields in the structure, the order is not important. */
+  n_fields = gst_structure_n_fields (sdes);
+  for (i = 0; i < n_fields; ++i) {
+    const gchar *field;
+    const gchar *value;
+    GstRTCPSDESType type;
+
+    field = gst_structure_nth_field_name (sdes, i);
+    if (field == NULL)
+      continue;
+    value = gst_structure_get_string (sdes, field);
+    if (value == NULL)
+      continue;
+    type = gst_rtcp_sdes_name_to_type (field);
+
+    /* Early packets are minimal and only include the CNAME */
+    if (data->is_early && type != GST_RTCP_SDES_CNAME)
+      continue;
+
+    if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
+      gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
+          (const guint8 *) value);
+    } else if (type == GST_RTCP_SDES_PRIV) {
+      gsize prefix_len;
+      gsize value_len;
+      gsize data_len;
+      guint8 data[256];
+
+      /* don't accept entries that are too big */
+      prefix_len = strlen (field);
+      if (prefix_len > 255)
+        continue;
+      value_len = strlen (value);
+      if (value_len > 255)
+        continue;
+      data_len = 1 + prefix_len + value_len;
+      if (data_len > 255)
+        continue;
+
+      data[0] = prefix_len;
+      memcpy (&data[1], field, prefix_len);
+      memcpy (&data[1 + prefix_len], value, value_len);
+
+      gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
+    }
+  }
 
   data->has_sdes = TRUE;
 }
@@ -2339,7 +2876,9 @@ static gboolean
 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
 {
   GstClockTime new_send_time, elapsed;
-  gboolean result;
+
+  if (data->is_early && sess->next_early_rtcp_time < current_time)
+    goto early;
 
   /* no need to check yet */
   if (sess->next_rtcp_check_time > current_time) {
@@ -2364,18 +2903,52 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
   if (current_time < new_send_time) {
     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
         GST_TIME_ARGS (new_send_time));
-    result = FALSE;
     /* store new check time */
     sess->next_rtcp_check_time = new_send_time;
-  } else {
-    result = TRUE;
-    new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+    return FALSE;
+  }
 
-    GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (new_send_time));
-    sess->next_rtcp_check_time = current_time + new_send_time;
+early:
+
+  new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+
+  GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (new_send_time));
+  sess->next_rtcp_check_time = current_time + new_send_time;
+
+  /* Apply the rules from RFC 4585 section 3.5.3 */
+  if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+    GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
+        sess->stats.min_interval;
+
+    /* This will caused the RTCP to be suppressed if no FB packets are added */
+    if (sess->last_rtcp_send_time + T_rr_current_interval >
+        sess->next_rtcp_check_time) {
+      GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
+          " last: %" GST_TIME_FORMAT
+          " + T_rr_current_interval: %" GST_TIME_FORMAT
+          " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (sess->stats.min_interval),
+          GST_TIME_ARGS (sess->last_rtcp_send_time),
+          GST_TIME_ARGS (T_rr_current_interval),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      data->may_suppress = TRUE;
+    }
   }
-  return result;
+
+  return TRUE;
+}
+
+static void
+clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
+{
+  g_hash_table_insert (hash_table, key, g_object_ref (source));
+}
+
+static gboolean
+remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
+{
+  return source->closing;
 }
 
 /**
@@ -2383,6 +2956,7 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
  * @sess: an #RTPSession
  * @current_time: the current system time
  * @ntpnstime: the current NTP time in nanoseconds
+ * @running_time: the current running_time of the pipeline
  *
  * Perform maintenance actions after the timeout obtained with
  * rtp_session_next_timeout() expired.
@@ -2397,12 +2971,13 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
  */
 GstFlowReturn
 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
-    guint64 ntpnstime)
+    guint64 ntpnstime, GstClockTime running_time)
 {
   GstFlowReturn result = GST_FLOW_OK;
-  GList *item;
   ReportData data;
   RTPSource *own;
+  GHashTable *table_copy;
+  gboolean notify = FALSE;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
@@ -2415,6 +2990,8 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   data.ntpnstime = ntpnstime;
   data.is_bye = FALSE;
   data.has_sdes = FALSE;
+  data.may_suppress = FALSE;
+  data.running_time = running_time;
 
   own = sess->source;
 
@@ -2422,9 +2999,26 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   /* get a new interval, we need this for various cleanups etc */
   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
 
-  /* first perform cleanups */
+  /* Make a local copy of the hashtable. We need to do this because the
+   * cleanup stage below releases the session lock. */
+  table_copy = g_hash_table_new_full (NULL, NULL, NULL,
+      (GDestroyNotify) g_object_unref);
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) clone_ssrcs_hashtable, table_copy);
+
+  /* Clean up the session, mark the source for removing, this might release the
+   * session lock. */
+  g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
+  g_hash_table_destroy (table_copy);
+
+  /* Now remove the marked sources */
   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
-      (GHRFunc) session_cleanup, &data);
+      (GHRFunc) remove_closing_sources, NULL);
+
+  if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
+    data.is_early = TRUE;
+  else
+    data.is_early = FALSE;
 
   /* see if we need to generate SR or RR packets */
   if (is_rtcp_time (sess, current_time, &data)) {
@@ -2441,38 +3035,23 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   }
 
   if (data.rtcp) {
-    guint size;
-
     /* we keep track of the last report time in order to timeout inactive
      * receivers or senders */
-    sess->last_rtcp_send_time = data.current_time;
+    if (!data.is_early && !data.may_suppress)
+      sess->last_rtcp_send_time = data.current_time;
     sess->first_rtcp = FALSE;
+    sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
 
     /* add SDES for this source when not already added */
     if (!data.has_sdes)
       session_sdes (sess, &data);
-
-    /* update average RTCP size before sending */
-    size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
-    UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
   }
 
   /* check for outdated collisions */
-  GST_DEBUG ("checking collision list");
-  item = g_list_first (sess->conflicting_addresses);
-  while (item) {
-    RTPConflictingAddress *known_conflict = item->data;
-    GList *next_item = g_list_next (item);
-
-    if (known_conflict->time < current_time - (data.interval *
-            RTCP_INTERVAL_COLLISION_TIMEOUT)) {
-      sess->conflicting_addresses =
-          g_list_delete_link (sess->conflicting_addresses, item);
-      GST_DEBUG ("collision %p timed out", known_conflict);
-      g_free (known_conflict);
-    }
-    item = next_item;
-  }
+  GST_DEBUG ("Timing out collisions");
+  rtp_source_timeout (sess->source, current_time,
+      data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT,
+      running_time - sess->rtcp_feedback_retention_window);
 
   if (sess->change_ssrc) {
     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
@@ -2489,24 +3068,185 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
     sess->bye_reason = NULL;
     sess->sent_bye = FALSE;
     sess->change_ssrc = FALSE;
+    notify = TRUE;
     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
   }
+
+  sess->allow_early = TRUE;
+
   RTP_SESSION_UNLOCK (sess);
 
+  if (notify)
+    g_object_notify (G_OBJECT (sess), "internal-ssrc");
+
   /* push out the RTCP packet */
   if (data.rtcp) {
-    /* close the RTCP packet */
-    gst_rtcp_buffer_end (data.rtcp);
+    gboolean do_not_suppress;
 
-    GST_DEBUG ("sending packet");
-    if (sess->callbacks.send_rtcp)
-      result = sess->callbacks.send_rtcp (sess, own, data.rtcp,
-          sess->sent_bye, sess->send_rtcp_user_data);
-    else {
-      GST_DEBUG ("freeing packet");
+    /* Give the user a change to add its own packet */
+    g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
+        data.rtcp, data.is_early, &do_not_suppress);
+
+    if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
+      guint packet_size;
+
+      /* close the RTCP packet */
+      gst_rtcp_buffer_end (data.rtcp);
+
+      packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
+
+      UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
+      GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
+          sess->stats.avg_rtcp_packet_size, packet_size);
+      result =
+          sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
+          sess->send_rtcp_user_data);
+    } else {
+      GST_DEBUG ("freeing packet callback: %p"
+          " do_not_suppress: %d may_suppress: %d",
+          sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
       gst_buffer_unref (data.rtcp);
     }
   }
 
   return result;
 }
+
+void
+rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
+    GstClockTimeDiff max_delay)
+{
+  GstClockTime T_dither_max;
+
+  /* Implements the algorithm described in RFC 4585 section 3.5.2 */
+
+  RTP_SESSION_LOCK (sess);
+
+  /* Check if already requested */
+  /*  RFC 4585 section 3.5.2 step 2 */
+  if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
+    goto dont_send;
+
+  /* Ignore the request a scheduled packet will be in time anyway */
+  if (current_time + max_delay > sess->next_rtcp_check_time)
+    goto dont_send;
+
+  /*  RFC 4585 section 3.5.2 step 2b */
+  /* If the total sources is <=2, then there is only us and one peer */
+  if (sess->total_sources <= 2) {
+    T_dither_max = 0;
+  } else {
+    /* Divide by 2 because l = 0.5 */
+    T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+    T_dither_max /= 2;
+  }
+
+  /*  RFC 4585 section 3.5.2 step 3 */
+  if (current_time + T_dither_max > sess->next_rtcp_check_time)
+    goto dont_send;
+
+  /*  RFC 4585 section 3.5.2 step 4 */
+  if (sess->allow_early == FALSE)
+    goto dont_send;
+
+  if (T_dither_max) {
+    /* Schedule an early transmission later */
+    sess->next_early_rtcp_time = g_random_double () * T_dither_max +
+        current_time;
+  } else {
+    /* If no dithering, schedule it for NOW */
+    sess->next_early_rtcp_time = current_time;
+  }
+
+  RTP_SESSION_UNLOCK (sess);
+
+  /* notify app of need to send packet early
+   * and therefore of timeout change */
+  if (sess->callbacks.reconsider)
+    sess->callbacks.reconsider (sess, sess->reconsider_user_data);
+
+  return;
+
+dont_send:
+
+  RTP_SESSION_UNLOCK (sess);
+
+}
+
+void
+rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc)
+{
+  guint i;
+
+  for (i = 0; i < sess->rtcp_pli_requests->len; i++)
+    if (ssrc == g_array_index (sess->rtcp_pli_requests, guint32, i))
+      return;
+
+  g_array_append_val (sess->rtcp_pli_requests, ssrc);
+}
+
+static gboolean
+has_pli_compare_func (gconstpointer a, gconstpointer ignored)
+{
+  GstRTCPPacket packet;
+
+  packet.buffer = (GstBuffer *) a;
+  packet.offset = 0;
+
+  if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
+      gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
+    return TRUE;
+  else
+    return FALSE;
+}
+
+static gboolean
+rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
+    gboolean early)
+{
+  gboolean ret = FALSE;
+
+  RTP_SESSION_LOCK (sess);
+
+  while (sess->rtcp_pli_requests->len) {
+    GstRTCPPacket rtcppacket;
+    guint media_ssrc = g_array_index (sess->rtcp_pli_requests, guint32, 0);
+    RTPSource *media_src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+        GUINT_TO_POINTER (media_ssrc));
+
+    if (media_src && !rtp_source_has_retained (media_src,
+            has_pli_compare_func, NULL)) {
+      if (gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB, &rtcppacket)) {
+        gst_rtcp_packet_fb_set_type (&rtcppacket, GST_RTCP_PSFB_TYPE_PLI);
+        gst_rtcp_packet_fb_set_sender_ssrc (&rtcppacket,
+            rtp_source_get_ssrc (sess->source));
+        gst_rtcp_packet_fb_set_media_ssrc (&rtcppacket, media_ssrc);
+        ret = TRUE;
+      } else {
+        /* Break because the packet is full, will put next request in a
+         * further packet
+         */
+        break;
+      }
+    }
+
+    g_array_remove_index (sess->rtcp_pli_requests, 0);
+  }
+
+  RTP_SESSION_UNLOCK (sess);
+
+  return ret;
+}
+
+static void
+rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
+{
+  GstClockTime now;
+
+  if (!sess->callbacks.send_rtcp)
+    return;
+
+  now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+  rtp_session_request_early_rtcp (sess, now, max_delay);
+}