rtpsession: Use bandwidth calculation by default instead of some arbitrary hardcoded...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
index 8eb131f..a260186 100644 (file)
@@ -49,12 +49,14 @@ enum
   SIGNAL_ON_SENDING_RTCP,
   SIGNAL_ON_FEEDBACK_RTCP,
   SIGNAL_SEND_RTCP,
+  SIGNAL_SEND_RTCP_FULL,
+  SIGNAL_ON_RECEIVING_RTCP,
   LAST_SIGNAL
 };
 
 #define DEFAULT_INTERNAL_SOURCE      NULL
-#define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_BANDWIDTH            0.0
+#define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
 #define DEFAULT_RTCP_MTU             1400
@@ -86,7 +88,7 @@ enum
   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
   PROP_PROBATION,
-  PROP_LAST
+  PROP_STATS
 };
 
 /* update average packet size */
@@ -99,11 +101,6 @@ enum
    (avg) = ((val) + (15 * (avg))) >> 4;
 
 
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -111,7 +108,8 @@ static void rtp_session_set_property (GObject * object, guint prop_id,
 static void rtp_session_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
-static void rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp (RTPSession * sess,
+    GstClockTime max_delay);
 
 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
 
@@ -119,9 +117,9 @@ G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
 
 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
-    gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
+    gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
 static RTPSource *obtain_internal_source (RTPSession * sess,
-    guint32 ssrc, gboolean * created);
+    guint32 ssrc, gboolean * created, GstClockTime current_time);
 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
     GstClockTime current_time);
 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
@@ -322,6 +320,42 @@ rtp_session_class_init (RTPSessionClass * klass)
       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
 
+  /**
+   * RTPSession::send-rtcp-full:
+   * @session: the object which received the signal
+   * @max_delay: The maximum delay after which the feedback will not be useful
+   *  anymore
+   *
+   * Requests that the #RTPSession initiate a new RTCP packet as soon as
+   * possible within the requested delay.
+   *
+   * Returns: TRUE if the new RTCP packet could be scheduled within the
+   * requested delay, FALSE otherwise.
+   *
+   * Since: 1.6
+   */
+  rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
+      g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+      g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+
+  /**
+   * RTPSession::on-receiving-rtcp
+   * @session: the object which received the signal
+   * @buffer: the #GstBuffer containing the RTCP packet that was received
+   *
+   * This signal is emitted when receiving an RTCP packet before it is handled
+   * by the session. It can be used to extract custom information from RTCP packets.
+   *
+   * Since: 1.6
+   */
+  rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
+      g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+      GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
           "The internal SSRC used for the session (deprecated)",
@@ -432,9 +466,9 @@ rtp_session_class_init (RTPSessionClass * klass)
       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
           "RTCP Immediate Feedback threshold",
           "The maximum number of members of a RTP session for which immediate"
-          " feedback is used",
+          " feedback is used (DEPRECATED: has no effect and is not needed)",
           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
 
   g_object_class_install_property (gobject_class, PROP_PROBATION,
       g_param_spec_uint ("probation", "Number of probations",
@@ -442,6 +476,24 @@ rtp_session_class_init (RTPSessionClass * klass)
           0, G_MAXUINT, DEFAULT_PROBATION,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * RTPSession::stats:
+   *
+   * Various session statistics. This property returns a GstStructure
+   * with name application/x-rtp-session-stats with the following fields:
+   *
+   *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
+   *      dropped (due to bandwidth constraints)
+   *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
+   *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
+   *
+   * Since: 1.4
+   */
+  g_object_class_install_property (gobject_class, PROP_STATS,
+      g_param_spec_boxed ("stats", "Statistics",
+          "Various statistics", GST_TYPE_STRUCTURE,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
@@ -460,7 +512,11 @@ rtp_session_init (RTPSession * sess)
   sess->mask_idx = 0;
   sess->mask = 0;
 
-  for (i = 0; i < 32; i++) {
+  /* TODO: We currently only use the first hash table but this is the
+   * beginning of an implementation for RFC2762
+   for (i = 0; i < 32; i++) {
+   */
+  for (i = 0; i < 1; i++) {
     sess->ssrcs[i] =
         g_hash_table_new_full (NULL, NULL, NULL,
         (GDestroyNotify) g_object_unref);
@@ -505,6 +561,7 @@ rtp_session_init (RTPSession * sess)
 
   sess->first_rtcp = TRUE;
   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+  sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
 
   sess->allow_early = TRUE;
   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
@@ -513,6 +570,8 @@ rtp_session_init (RTPSession * sess)
       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
 
   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
+  sess->is_doing_ptp = TRUE;
 }
 
 static void
@@ -525,7 +584,13 @@ rtp_session_finalize (GObject * object)
 
   gst_structure_free (sess->sdes);
 
-  for (i = 0; i < 32; i++)
+  g_list_free_full (sess->conflicting_addresses,
+      (GDestroyNotify) rtp_conflicting_address_free);
+
+  /* TODO: Change this again when implementing RFC 2762
+   * for (i = 0; i < 32; i++)
+   */
+  for (i = 0; i < 1; i++)
     g_hash_table_destroy (sess->ssrcs[i]);
 
   g_mutex_clear (&sess->lock);
@@ -563,6 +628,19 @@ rtp_session_create_sources (RTPSession * sess)
   return res;
 }
 
+static GstStructure *
+rtp_session_create_stats (RTPSession * sess)
+{
+  GstStructure *s;
+
+  s = gst_structure_new ("application/x-rtp-session-stats",
+      "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
+      "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
+      "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
+
+  return s;
+}
+
 static void
 rtp_session_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
@@ -573,6 +651,11 @@ rtp_session_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_INTERNAL_SSRC:
+      RTP_SESSION_LOCK (sess);
+      sess->suggested_ssrc = g_value_get_uint (value);
+      RTP_SESSION_UNLOCK (sess);
+      if (sess->callbacks.reconfigure)
+        sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
       break;
     case PROP_BANDWIDTH:
       RTP_SESSION_LOCK (sess);
@@ -684,6 +767,9 @@ rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_PROBATION:
       g_value_set_uint (value, sess->probation);
       break;
+    case PROP_STATS:
+      g_value_take_boxed (value, rtp_session_create_stats (sess));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -851,6 +937,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.notify_nack = callbacks->notify_nack;
     sess->notify_nack_user_data = user_data;
   }
+  if (callbacks->reconfigure) {
+    sess->callbacks.reconfigure = callbacks->reconfigure;
+    sess->reconfigure_user_data = user_data;
+  }
 }
 
 /**
@@ -1161,14 +1251,51 @@ static RTPSourceCallbacks callbacks = {
   (RTPSourceClockRate) source_clock_rate,
 };
 
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+    GSocketAddress * address, GstClockTime time)
+{
+  return find_conflicting_address (session->conflicting_addresses, address,
+      time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+    GSocketAddress * address, GstClockTime time)
+{
+  sess->conflicting_addresses =
+      add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
 static gboolean
 check_collision (RTPSession * sess, RTPSource * source,
-    RTPArrivalStats * arrival, gboolean rtp)
+    RTPPacketInfo * pinfo, gboolean rtp)
 {
   guint32 ssrc;
 
-  /* If we have no arrival address, we can't do collision checking */
-  if (!arrival->address)
+  /* If we have no pinfo address, we can't do collision checking */
+  if (!pinfo->address)
     return FALSE;
 
   ssrc = rtp_source_get_ssrc (source);
@@ -1185,17 +1312,17 @@ check_collision (RTPSession * sess, RTPSource * source,
     }
 
     if (from) {
-      if (__g_socket_address_equal (from, arrival->address)) {
+      if (__g_socket_address_equal (from, pinfo->address)) {
         /* Address is the same */
         return FALSE;
       } else {
         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
         if (sess->favor_new) {
           if (rtp_source_find_conflicting_address (source,
-                  arrival->address, arrival->current_time)) {
+                  pinfo->address, pinfo->current_time)) {
             gchar *buf1;
 
-            buf1 = __g_socket_address_to_string (arrival->address);
+            buf1 = __g_socket_address_to_string (pinfo->address);
             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
                 buf1);
             g_free (buf1);
@@ -1208,18 +1335,18 @@ check_collision (RTPSession * sess, RTPSource * source,
              * a new source. Save old address in possible conflict list
              */
             rtp_source_add_conflicting_address (source, from,
-                arrival->current_time);
+                pinfo->current_time);
 
             buf1 = __g_socket_address_to_string (from);
-            buf2 = __g_socket_address_to_string (arrival->address);
+            buf2 = __g_socket_address_to_string (pinfo->address);
 
             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
                 " saving old as known conflict", ssrc, buf1, buf2);
 
             if (rtp)
-              rtp_source_set_rtp_from (source, arrival->address);
+              rtp_source_set_rtp_from (source, pinfo->address);
             else
-              rtp_source_set_rtcp_from (source, arrival->address);
+              rtp_source_set_rtcp_from (source, pinfo->address);
 
             g_free (buf1);
             g_free (buf2);
@@ -1234,9 +1361,9 @@ check_collision (RTPSession * sess, RTPSource * source,
     } else {
       /* We don't already have a from address for RTP, just set it */
       if (rtp)
-        rtp_source_set_rtp_from (source, arrival->address);
+        rtp_source_set_rtp_from (source, pinfo->address);
       else
-        rtp_source_set_rtcp_from (source, arrival->address);
+        rtp_source_set_rtcp_from (source, pinfo->address);
       return FALSE;
     }
 
@@ -1246,16 +1373,16 @@ check_collision (RTPSession * sess, RTPSource * source,
      */
   } else {
     /* This is sending with our ssrc, is it an address we already know */
-    if (rtp_source_find_conflicting_address (source, arrival->address,
-            arrival->current_time)) {
+    if (rtp_session_find_conflicting_address (sess, pinfo->address,
+            pinfo->current_time)) {
       /* Its a known conflict, its probably a loop, not a collision
        * lets just drop the incoming packet
        */
       GST_DEBUG ("Our packets are being looped back to us, dropping");
     } else {
       /* Its a new collision, lets change our SSRC */
-      rtp_source_add_conflicting_address (source, arrival->address,
-          arrival->current_time);
+      rtp_session_add_conflicting_address (sess, pinfo->address,
+          pinfo->current_time);
 
       GST_DEBUG ("Collision for SSRC %x", ssrc);
       /* mark the source BYE */
@@ -1266,18 +1393,95 @@ check_collision (RTPSession * sess, RTPSource * source,
 
       on_ssrc_collision (sess, source);
 
-      rtp_session_schedule_bye_locked (sess, arrival->current_time);
+      rtp_session_schedule_bye_locked (sess, pinfo->current_time);
     }
   }
 
   return TRUE;
 }
 
-static RTPSource *
-find_source (RTPSession * sess, guint32 ssrc)
+typedef struct
 {
-  return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
-      GINT_TO_POINTER (ssrc));
+  gboolean is_doing_ptp;
+  GSocketAddress *new_addr;
+} CompareAddrData;
+
+/* check if the two given ip addr are the same (do not care about the port) */
+static gboolean
+ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
+{
+  return
+      g_inet_address_equal (g_inet_socket_address_get_address
+      (G_INET_SOCKET_ADDRESS (a)),
+      g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
+}
+
+static void
+compare_rtp_source_addr (const gchar * key, RTPSource * source,
+    CompareAddrData * data)
+{
+  /* only compare ip addr of remote sources which are also not closing */
+  if (!source->internal && !source->closing && source->rtp_from) {
+    /* look for the first rtp source */
+    if (!data->new_addr)
+      data->new_addr = source->rtp_from;
+    /* compare current ip addr with the first one */
+    else
+      data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
+  }
+}
+
+static void
+compare_rtcp_source_addr (const gchar * key, RTPSource * source,
+    CompareAddrData * data)
+{
+  /* only compare ip addr of remote sources which are also not closing */
+  if (!source->internal && !source->closing && source->rtcp_from) {
+    /* look for the first rtcp source */
+    if (!data->new_addr)
+      data->new_addr = source->rtcp_from;
+    else
+      /* compare current ip addr with the first one */
+      data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
+  }
+}
+
+/* loop over our non-internal source to know if the session
+ * is doing point-to-point */
+static void
+session_update_ptp (RTPSession * sess)
+{
+  /* to know if the session is doing point to point, the ip addr
+   * of each non-internal (=remotes) source have to be compared
+   * to each other.
+   */
+  gboolean is_doing_rtp_ptp;
+  gboolean is_doing_rtcp_ptp;
+  CompareAddrData data;
+
+  /* compare the first remote source's ip addr that receive rtp packets
+   * with other remote rtp source.
+   * it's enough because the session just needs to know if they are all
+   * equals or not
+   */
+  data.is_doing_ptp = TRUE;
+  data.new_addr = NULL;
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) compare_rtp_source_addr, (gpointer) & data);
+  is_doing_rtp_ptp = data.is_doing_ptp;
+
+  /* same but about rtcp */
+  data.is_doing_ptp = TRUE;
+  data.new_addr = NULL;
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
+  is_doing_rtcp_ptp = data.is_doing_ptp;
+
+  /* the session is doing point-to-point if all rtp remote have the same
+   * ip addr and if all rtcp remote sources have the same ip addr */
+  sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
+
+  GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
 }
 
 static void
@@ -1296,13 +1500,24 @@ add_source (RTPSession * sess, RTPSource * src)
     if (sess->suggested_ssrc != src->ssrc)
       sess->suggested_ssrc = src->ssrc;
   }
+
+  /* update point-to-point status */
+  if (!src->internal)
+    session_update_ptp (sess);
+}
+
+static RTPSource *
+find_source (RTPSession * sess, guint32 ssrc)
+{
+  return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+      GINT_TO_POINTER (ssrc));
 }
 
 /* must be called with the session lock, the returned source needs to be
  * unreffed after usage. */
 static RTPSource *
 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
-    RTPArrivalStats * arrival, gboolean rtp)
+    RTPPacketInfo * pinfo, gboolean rtp)
 {
   RTPSource *source;
 
@@ -1322,11 +1537,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
       g_object_set (source, "probation", 0, NULL);
 
     /* store from address, if any */
-    if (arrival->address) {
+    if (pinfo->address) {
       if (rtp)
-        rtp_source_set_rtp_from (source, arrival->address);
+        rtp_source_set_rtp_from (source, pinfo->address);
       else
-        rtp_source_set_rtcp_from (source, arrival->address);
+        rtp_source_set_rtcp_from (source, pinfo->address);
     }
 
     /* configure a callback on the source */
@@ -1337,7 +1552,7 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
   } else {
     *created = FALSE;
     /* check for collision, this updates the address when not previously set */
-    if (check_collision (sess, source, arrival, rtp)) {
+    if (check_collision (sess, source, pinfo, rtp)) {
       return NULL;
     }
     /* Receiving RTCP packets of an SSRC is a strong indication that we
@@ -1346,9 +1561,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
       g_object_set (source, "probation", 0, NULL);
   }
   /* update last activity */
-  source->last_activity = arrival->current_time;
+  source->last_activity = pinfo->current_time;
   if (rtp)
-    source->last_rtp_activity = arrival->current_time;
+    source->last_rtp_activity = pinfo->current_time;
   g_object_ref (source);
 
   return source;
@@ -1357,7 +1572,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
 /* must be called with the session lock, the returned source needs to be
  * unreffed after usage. */
 static RTPSource *
-obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
+obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+    GstClockTime current_time)
 {
   RTPSource *source;
 
@@ -1370,6 +1586,7 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
 
     source->validated = TRUE;
     source->internal = TRUE;
+    source->probation = FALSE;
     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
     rtp_source_set_callbacks (source, &callbacks, sess);
 
@@ -1378,6 +1595,11 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
   } else {
     *created = FALSE;
   }
+  /* update last activity */
+  if (current_time != GST_CLOCK_TIME_NONE) {
+    source->last_activity = current_time;
+    source->last_rtp_activity = current_time;
+  }
   g_object_ref (source);
 
   return source;
@@ -1499,7 +1721,7 @@ rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
 
   RTP_SESSION_LOCK (sess);
   result = find_source (sess, ssrc);
-  if (result)
+  if (result != NULL)
     g_object_ref (result);
   RTP_SESSION_UNLOCK (sess);
 
@@ -1550,51 +1772,104 @@ rtp_session_create_source (RTPSession * sess)
   return source;
 }
 
-/* update the RTPArrivalStats structure with the current time and other bits
- * about the current buffer we are handling.
- * This function is typically called when a validated packet is received.
- * This function should be called with the SESSION_LOCK
- */
-static void
-update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
-    gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
-    GstClockTime running_time, guint64 ntpnstime)
+static gboolean
+update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
 {
   GstNetAddressMeta *meta;
-  GstRTPBuffer rtpb = { NULL };
-
-  /* get time of arrival */
-  arrival->current_time = current_time;
-  arrival->running_time = running_time;
-  arrival->ntpnstime = ntpnstime;
 
   /* get packet size including header overhead */
-  arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len;
+  pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
+  pinfo->packets++;
+
+  if (pinfo->rtp) {
+    GstRTPBuffer rtp = { NULL };
+
+    if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
+      goto invalid_packet;
+
+    pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
+    if (idx == 0) {
+      gint i;
+
+      /* only keep info for first buffer */
+      pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+      pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
+      pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
+      pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+      /* copy available csrc */
+      pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
+      for (i = 0; i < pinfo->csrc_count; i++)
+        pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+    }
+    gst_rtp_buffer_unmap (&rtp);
+  }
 
-  if (rtp) {
-    gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb);
-    arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb);
-    gst_rtp_buffer_unmap (&rtpb);
-  } else {
-    arrival->payload_len = 0;
+  if (idx == 0) {
+    /* for netbuffer we can store the IP address to check for collisions */
+    meta = gst_buffer_get_net_address_meta (*buffer);
+    if (pinfo->address)
+      g_object_unref (pinfo->address);
+    if (meta) {
+      pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+    } else {
+      pinfo->address = NULL;
+    }
   }
+  return TRUE;
 
-  /* for netbuffer we can store the IP address to check for collisions */
-  meta = gst_buffer_get_net_address_meta (buffer);
-  if (arrival->address)
-    g_object_unref (arrival->address);
-  if (meta) {
-    arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+  /* ERRORS */
+invalid_packet:
+  {
+    GST_DEBUG ("invalid RTP packet received");
+    return FALSE;
+  }
+}
+
+/* update the RTPPacketInfo structure with the current time and other bits
+ * about the current buffer we are handling.
+ * This function is typically called when a validated packet is received.
+ * This function should be called with the SESSION_LOCK
+ */
+static gboolean
+update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
+    gboolean send, gboolean rtp, gboolean is_list, gpointer data,
+    GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
+{
+  gboolean res;
+
+  pinfo->send = send;
+  pinfo->rtp = rtp;
+  pinfo->is_list = is_list;
+  pinfo->data = data;
+  pinfo->current_time = current_time;
+  pinfo->running_time = running_time;
+  pinfo->ntpnstime = ntpnstime;
+  pinfo->header_len = sess->header_len;
+  pinfo->bytes = 0;
+  pinfo->payload_len = 0;
+  pinfo->packets = 0;
+
+  if (is_list) {
+    GstBufferList *list = GST_BUFFER_LIST_CAST (data);
+    res =
+        gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
+        pinfo);
   } else {
-    arrival->address = NULL;
+    GstBuffer *buffer = GST_BUFFER_CAST (data);
+    res = update_packet (&buffer, 0, pinfo);
   }
+  return res;
 }
 
 static void
-clean_arrival_stats (RTPArrivalStats * arrival)
+clean_packet_info (RTPPacketInfo * pinfo)
 {
-  if (arrival->address)
-    g_object_unref (arrival->address);
+  if (pinfo->address)
+    g_object_unref (pinfo->address);
+  if (pinfo->data) {
+    gst_mini_object_unref (pinfo->data);
+    pinfo->data = NULL;
+  }
 }
 
 static gboolean
@@ -1659,45 +1934,32 @@ source_update_sender (RTPSession * sess, RTPSource * source,
  */
 GstFlowReturn
 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time, GstClockTime running_time)
+    GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
 {
   GstFlowReturn result;
   guint32 ssrc;
   RTPSource *source;
   gboolean created;
   gboolean prevsender, prevactive;
-  RTPArrivalStats arrival = { NULL, };
-  guint32 csrcs[16];
-  guint8 i, count;
+  RTPPacketInfo pinfo = { 0, };
   guint64 oldrate;
-  GstRTPBuffer rtp = { NULL };
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
-  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
-    goto invalid_packet;
-
-  /* get SSRC to look up in session database */
-  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
-  /* copy available csrc for later */
-  count = gst_rtp_buffer_get_csrc_count (&rtp);
-  /* make sure to not overflow our array. An RTP buffer can maximally contain
-   * 16 CSRCs */
-  count = MIN (count, 16);
-
-  for (i = 0; i < count; i++)
-    csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
-
-  gst_rtp_buffer_unmap (&rtp);
-
   RTP_SESSION_LOCK (sess);
 
-  /* update arrival stats */
-  update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
-      running_time, -1);
+  /* update pinfo stats */
+  if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
+          current_time, running_time, ntpnstime)) {
+    GST_DEBUG ("invalid RTP packet received");
+    RTP_SESSION_UNLOCK (sess);
+    return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
+  }
+
+  ssrc = pinfo.ssrc;
 
-  source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+  source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
   if (!source)
     goto collision;
 
@@ -1706,7 +1968,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   oldrate = source->bitrate;
 
   /* let source process the packet */
-  result = rtp_source_process_rtp (source, buffer, &arrival);
+  result = rtp_source_process_rtp (source, &pinfo);
 
   /* source became active */
   if (source_update_active (sess, source, prevactive))
@@ -1722,16 +1984,17 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
 
   if (source->validated) {
     gboolean created;
+    gint i;
 
     /* for validated sources, we add the CSRCs as well */
-    for (i = 0; i < count; i++) {
+    for (i = 0; i < pinfo.csrc_count; i++) {
       guint32 csrc;
       RTPSource *csrc_src;
 
-      csrc = csrcs[i];
+      csrc = pinfo.csrcs[i];
 
       /* get source */
-      csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+      csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
       if (!csrc_src)
         continue;
 
@@ -1748,22 +2011,15 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
 
   RTP_SESSION_UNLOCK (sess);
 
-  clean_arrival_stats (&arrival);
+  clean_packet_info (&pinfo);
 
   return result;
 
   /* ERRORS */
-invalid_packet:
-  {
-    gst_buffer_unref (buffer);
-    GST_DEBUG ("invalid RTP packet received");
-    return GST_FLOW_OK;
-  }
 collision:
   {
     RTP_SESSION_UNLOCK (sess);
-    gst_buffer_unref (buffer);
-    clean_arrival_stats (&arrival);
+    clean_packet_info (&pinfo);
     GST_DEBUG ("ignoring packet because its collisioning");
     return GST_FLOW_OK;
   }
@@ -1771,7 +2027,7 @@ collision:
 
 static void
 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
-    GstRTCPPacket * packet, RTPArrivalStats * arrival)
+    GstRTCPPacket * packet, RTPPacketInfo * pinfo)
 {
   guint count, i;
 
@@ -1797,7 +2053,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
        * the sender of the RTCP message. We could also compare our stats against
        * the other sender to see if we are better or worse. */
       /* FIXME, need to keep track who the RB block is from */
-      rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
+      rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
           packetslost, exthighestseq, jitter, lsr, dlsr);
     }
   }
@@ -1815,7 +2071,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
  */
 static void
 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival, gboolean * do_sync)
+    RTPPacketInfo * pinfo, gboolean * do_sync)
 {
   guint32 senderssrc, rtptime, packet_count, octet_count;
   guint64 ntptime;
@@ -1826,12 +2082,16 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
       &packet_count, &octet_count);
 
   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
-      senderssrc, GST_TIME_ARGS (arrival->current_time));
+      senderssrc, GST_TIME_ARGS (pinfo->current_time));
 
-  source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+  source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
   if (!source)
     return;
 
+  /* skip non-bye packets for sources that are marked BYE */
+  if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+    goto out;
+
   /* don't try to do lip-sync for sources that sent a BYE */
   if (RTP_SOURCE_IS_MARKED_BYE (source))
     *do_sync = FALSE;
@@ -1841,7 +2101,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
   prevsender = RTP_SOURCE_IS_SENDER (source);
 
   /* first update the source */
-  rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
+  rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
       packet_count, octet_count);
 
   source_update_sender (sess, source, prevsender);
@@ -1849,7 +2109,9 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
   if (created)
     on_new_ssrc (sess, source);
 
-  rtp_session_process_rb (sess, source, packet, arrival);
+  rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
   g_object_unref (source);
 }
 
@@ -1861,7 +2123,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
  */
 static void
 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPPacketInfo * pinfo)
 {
   guint32 senderssrc;
   RTPSource *source;
@@ -1871,21 +2133,27 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
 
   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
 
-  source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+  source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
   if (!source)
     return;
 
+  /* skip non-bye packets for sources that are marked BYE */
+  if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+    goto out;
+
   if (created)
     on_new_ssrc (sess, source);
 
-  rtp_session_process_rb (sess, source, packet, arrival);
+  rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
   g_object_unref (source);
 }
 
 /* Get SDES items and store them in the SSRC */
 static void
 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPPacketInfo * pinfo)
 {
   guint items, i, j;
   gboolean more_items, more_entries;
@@ -1908,10 +2176,14 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
     changed = FALSE;
 
     /* find src, no probation when dealing with RTCP */
-    source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+    source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
     if (!source)
       return;
 
+    /* skip non-bye packets for sources that are marked BYE */
+    if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+      goto next;
+
     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
 
     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
@@ -1963,6 +2235,7 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
     if (changed)
       on_ssrc_sdes (sess, source);
 
+  next:
     g_object_unref (source);
 
     more_items = gst_rtcp_packet_sdes_next_item (packet);
@@ -1974,7 +2247,7 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
  */
 static void
 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPPacketInfo * pinfo)
 {
   guint count, i;
   gchar *reason;
@@ -1994,7 +2267,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     GST_DEBUG ("SSRC: %08x", ssrc);
 
     /* find src and mark bye, no probation when dealing with RTCP */
-    source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+    source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
     if (!source)
       return;
 
@@ -2005,7 +2278,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     }
 
     /* store time for when we need to time out this source */
-    source->bye_time = arrival->current_time;
+    source->bye_time = pinfo->current_time;
 
     prevactive = RTP_SOURCE_IS_ACTIVE (source);
     prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -2025,17 +2298,17 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
        * Perform reverse reconsideration but only when we are not scheduling a
        * BYE ourselves. */
       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
-          arrival->current_time < sess->next_rtcp_check_time) {
+          pinfo->current_time < sess->next_rtcp_check_time) {
         GstClockTime time_remaining;
 
-        time_remaining = sess->next_rtcp_check_time - arrival->current_time;
+        time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
         sess->next_rtcp_check_time =
             gst_util_uint64_scale (time_remaining, members, pmembers);
 
         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
             GST_TIME_ARGS (sess->next_rtcp_check_time));
 
-        sess->next_rtcp_check_time += arrival->current_time;
+        sess->next_rtcp_check_time += pinfo->current_time;
 
         /* mark pending reconsider. We only want to signal the reconsideration
          * once after we handled all the source in the bye packet */
@@ -2062,7 +2335,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
 
 static void
 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival)
+    RTPPacketInfo * pinfo)
 {
   GST_DEBUG ("received APP");
 }
@@ -2084,7 +2357,7 @@ rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
           fir ? "FIR" : "PLI",
           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
-          GST_TIME_ARGS (round_trip_in_ns));;
+          GST_TIME_ARGS (round_trip_in_ns));
       return FALSE;
     }
   }
@@ -2113,10 +2386,12 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
     return;
 
   src = find_source (sess, sender_ssrc);
-  if (!src)
+  if (src == NULL)
     return;
 
   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+
+  src->stats.recv_pli_count++;
 }
 
 static void
@@ -2161,6 +2436,9 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
     ssrc = GST_READ_UINT32_BE (data);
 
     own = find_source (sess, ssrc);
+    if (own == NULL)
+      continue;
+
     if (own->internal) {
       our_request = TRUE;
       break;
@@ -2170,6 +2448,7 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
     return;
 
   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+  src->stats.recv_fir_count++;
 }
 
 static void
@@ -2177,6 +2456,8 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
     GstClockTime current_time)
 {
+  sess->stats.nacks_received++;
+
   if (!sess->callbacks.notify_nack)
     return;
 
@@ -2186,10 +2467,10 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
     seqnum = GST_READ_UINT16_BE (fci_data);
     blp = GST_READ_UINT16_BE (fci_data + 2);
 
-    GST_DEBUG ("NACK #%u, blp %04x", seqnum, blp);
+    GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
 
     RTP_SESSION_UNLOCK (sess);
-    sess->callbacks.notify_nack (sess, seqnum, blp,
+    sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
         sess->notify_nack_user_data);
     RTP_SESSION_LOCK (sess);
 
@@ -2200,7 +2481,7 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
 
 static void
 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
-    RTPArrivalStats * arrival, GstClockTime current_time)
+    RTPPacketInfo * pinfo, GstClockTime current_time)
 {
   GstRTCPType type = gst_rtcp_packet_get_type (packet);
   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
@@ -2210,6 +2491,12 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
   RTPSource *src;
 
+  src = find_source (sess, media_ssrc);
+
+  /* skip non-bye packets for sources that are marked BYE */
+  if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
+    return;
+
   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
 
@@ -2221,7 +2508,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
           fci_length);
-      GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
+      GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
     }
 
     RTP_SESSION_UNLOCK (sess);
@@ -2233,15 +2520,11 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       gst_buffer_unref (fci_buffer);
   }
 
-  src = find_source (sess, media_ssrc);
-  if (!src)
-    return;
-
-  if (sess->rtcp_feedback_retention_window) {
-    rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+  if (src && sess->rtcp_feedback_retention_window) {
+    rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
   }
 
-  if (src->internal ||
+  if ((src && src->internal) ||
       /* PSFB FIR puts the media ssrc inside the FCI */
       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
     switch (type) {
@@ -2292,7 +2575,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 {
   GstRTCPPacket packet;
   gboolean more, is_bye = FALSE, do_sync = FALSE;
-  RTPArrivalStats arrival = { NULL, };
+  RTPPacketInfo pinfo = { 0, };
   GstFlowReturn result = GST_FLOW_OK;
   GstRTCPBuffer rtcp = { NULL, };
 
@@ -2304,10 +2587,13 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   GST_DEBUG ("received RTCP packet");
 
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
+      buffer);
+
   RTP_SESSION_LOCK (sess);
-  /* update arrival stats */
-  update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
-      ntpnstime);
+  /* update pinfo stats */
+  update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
+      -1, ntpnstime);
 
   /* start processing the compound packet */
   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
@@ -2317,40 +2603,33 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
     type = gst_rtcp_packet_get_type (&packet);
 
-    /* when we are leaving the session, we should ignore all non-BYE messages */
-    if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) {
-      GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
-      goto next;
-    }
-
     switch (type) {
       case GST_RTCP_TYPE_SR:
-        rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
+        rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
         break;
       case GST_RTCP_TYPE_RR:
-        rtp_session_process_rr (sess, &packet, &arrival);
+        rtp_session_process_rr (sess, &packet, &pinfo);
         break;
       case GST_RTCP_TYPE_SDES:
-        rtp_session_process_sdes (sess, &packet, &arrival);
+        rtp_session_process_sdes (sess, &packet, &pinfo);
         break;
       case GST_RTCP_TYPE_BYE:
         is_bye = TRUE;
         /* don't try to attempt lip-sync anymore for streams with a BYE */
         do_sync = FALSE;
-        rtp_session_process_bye (sess, &packet, &arrival);
+        rtp_session_process_bye (sess, &packet, &pinfo);
         break;
       case GST_RTCP_TYPE_APP:
-        rtp_session_process_app (sess, &packet, &arrival);
+        rtp_session_process_app (sess, &packet, &pinfo);
         break;
       case GST_RTCP_TYPE_RTPFB:
       case GST_RTCP_TYPE_PSFB:
-        rtp_session_process_feedback (sess, &packet, &arrival, current_time);
+        rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
         break;
       default:
         GST_WARNING ("got unknown RTCP packet");
         break;
     }
-  next:
     more = gst_rtcp_packet_move_to_next (&packet);
   }
 
@@ -2358,20 +2637,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   /* if we are scheduling a BYE, we only want to count bye packets, else we
    * count everything */
-  if (sess->scheduled_bye) {
-    if (is_bye) {
-      sess->stats.bye_members++;
-      UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
-    }
-  } else {
-    /* keep track of average packet size */
-    UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+  if (sess->scheduled_bye && is_bye) {
+    sess->bye_stats.bye_members++;
+    UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
   }
+
+  /* keep track of average packet size */
+  UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+
   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
-      sess->stats.avg_rtcp_packet_size, arrival.bytes);
+      sess->stats.avg_rtcp_packet_size, pinfo.bytes);
   RTP_SESSION_UNLOCK (sess);
 
-  clean_arrival_stats (&arrival);
+  pinfo.data = NULL;
+  clean_packet_info (&pinfo);
 
   /* notify caller of sr packets in the callback */
   if (do_sync && sess->callbacks.sync_rtcp) {
@@ -2416,11 +2695,20 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
     gboolean created;
 
     RTP_SESSION_LOCK (sess);
-    source = obtain_internal_source (sess, ssrc, &created);
+    source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
     if (source) {
       rtp_source_update_caps (source, caps);
       g_object_unref (source);
     }
+
+    if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
+      source =
+          obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+      if (source) {
+        rtp_source_update_caps (source, caps);
+        g_object_unref (source);
+      }
+    }
     RTP_SESSION_UNLOCK (sess);
   }
 }
@@ -2446,9 +2734,7 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
   RTPSource *source;
   gboolean prevsender;
   guint64 oldrate;
-  GstBuffer *buffer;
-  GstRTPBuffer rtp = { NULL };
-  guint32 ssrc;
+  RTPPacketInfo pinfo = { 0, };
   gboolean created;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
@@ -2456,35 +2742,18 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
 
   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
 
-  if (is_list) {
-    GstBufferList *list = GST_BUFFER_LIST_CAST (data);
-
-    buffer = gst_buffer_list_get (list, 0);
-    if (!buffer)
-      goto no_buffer;
-  } else {
-    buffer = GST_BUFFER_CAST (data);
-  }
-
-  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
-    goto invalid_packet;
-
-  /* get SSRC and look up in session database */
-  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
-
-  gst_rtp_buffer_unmap (&rtp);
-
   RTP_SESSION_LOCK (sess);
-  source = obtain_internal_source (sess, ssrc, &created);
+  if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
+          current_time, running_time, -1))
+    goto invalid_packet;
 
-  /* update last activity */
-  source->last_rtp_activity = current_time;
+  source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
   oldrate = source->bitrate;
 
   /* we use our own source to send */
-  result = rtp_source_send_rtp (source, data, is_list, running_time);
+  result = rtp_source_send_rtp (source, &pinfo);
 
   source_update_sender (sess, source, prevsender);
 
@@ -2493,21 +2762,17 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
   RTP_SESSION_UNLOCK (sess);
 
   g_object_unref (source);
+  clean_packet_info (&pinfo);
 
   return result;
 
 invalid_packet:
   {
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    RTP_SESSION_UNLOCK (sess);
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
-no_buffer:
-  {
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
-    GST_DEBUG ("no buffer in list");
-    return GST_FLOW_OK;
-  }
 }
 
 static void
@@ -2522,6 +2787,7 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
     gboolean first)
 {
   GstClockTime result;
+  RTPSessionStats *stats;
 
   /* recalculate bandwidth when it changed */
   if (sess->recalc_bandwidth) {
@@ -2535,9 +2801,8 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
 
       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
           (GHFunc) add_bitrates, &bandwidth);
-      bandwidth /= 8.0;
     }
-    if (bandwidth < 8000)
+    if (bandwidth < RTP_STATS_BANDWIDTH)
       bandwidth = RTP_STATS_BANDWIDTH;
 
     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
@@ -2547,17 +2812,19 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
   }
 
   if (sess->scheduled_bye) {
-    result = rtp_stats_calculate_bye_interval (&sess->stats);
+    stats = &sess->bye_stats;
+    result = rtp_stats_calculate_bye_interval (stats);
   } else {
-    result = rtp_stats_calculate_rtcp_interval (&sess->stats,
-        sess->stats.internal_sender_sources > 0, first);
+    stats = &sess->stats;
+    result = rtp_stats_calculate_rtcp_interval (stats,
+        stats->internal_sender_sources > 0, first);
   }
 
   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
       GST_TIME_ARGS (result), first);
 
   if (!deterministic && result != GST_CLOCK_TIME_NONE)
-    result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+    result = rtp_stats_add_rtcp_jitter (stats, result);
 
   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
 
@@ -2605,8 +2872,9 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
   /* we schedule BYE now */
   sess->scheduled_bye = TRUE;
   /* at least one member wants to send a BYE */
-  INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
-  sess->stats.bye_members = 1;
+  memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
+  INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
+  sess->bye_stats.bye_members = 1;
   sess->first_rtcp = TRUE;
   sess->allow_early = TRUE;
 
@@ -2644,7 +2912,7 @@ done:
 GstFlowReturn
 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
 {
-  GstFlowReturn result = GST_FLOW_OK;
+  GstFlowReturn result;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
@@ -2675,6 +2943,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
   RTP_SESSION_LOCK (sess);
 
   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+    GST_DEBUG ("have early rtcp time");
     result = sess->next_early_rtcp_time;
     goto early_exit;
   }
@@ -2693,7 +2962,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
   }
 
   if (sess->scheduled_bye) {
-    if (sess->stats.active_sources >= 50) {
+    if (sess->bye_stats.active_sources >= 50) {
       GST_DEBUG ("reconsider BYE, more than 50 sources");
       /* reconsider BYE if members >= 50 */
       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
@@ -2753,6 +3022,7 @@ typedef struct
   gboolean is_early;
   gboolean may_suppress;
   GQueue output;
+  guint nacked_seqnums;
 } ReportData;
 
 static void
@@ -2812,15 +3082,21 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
     return;
   }
 
-  /* only report about other sender */
-  if (source == data->source)
-    goto reported;
+  if (g_hash_table_contains (source->reported_in_sr_of,
+          GUINT_TO_POINTER (data->source->ssrc))) {
+    GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
+    return;
+  }
 
   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
     GST_DEBUG ("max RB count reached");
     return;
   }
 
+  /* only report about other sender */
+  if (source == data->source)
+    goto reported;
+
   if (!RTP_SOURCE_IS_SENDER (source)) {
     GST_DEBUG ("source %08x not sender", source->ssrc);
     goto reported;
@@ -2846,14 +3122,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
       exthighestseq, jitter, lsr, dlsr);
 
 reported:
-  /* source is reported, move to next generation */
-  source->generation = sess->generation + 1;
-
-  /* if we reported all sources in this generation, move to next */
-  if (--data->num_to_report == 0) {
-    sess->generation++;
-    GST_DEBUG ("all reported, generation now %u", sess->generation);
-  }
+  g_hash_table_add (source->reported_in_sr_of,
+      GUINT_TO_POINTER (data->source->ssrc));
 }
 
 /* construct FIR */
@@ -2881,6 +3151,7 @@ session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
   fci_data[1] = fci_data[2] = fci_data[3] = 0;
 
   source->send_fir = FALSE;
+  source->stats.sent_fir_count++;
 }
 
 static void
@@ -2949,6 +3220,8 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data)
 
   source->send_pli = FALSE;
   data->may_suppress = FALSE;
+
+  source->stats.sent_pli_count++;
 }
 
 /* construct NACK */
@@ -2982,6 +3255,7 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data)
   for (i = 0; i < n_nacks; i++) {
     GST_WRITE_UINT32_BE (fci_data, nacks[i]);
     fci_data += 4;
+    data->nacked_seqnums++;
   }
 
   rtp_source_clear_nacks (source);
@@ -3006,8 +3280,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   if (source->internal) {
     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
     rtp_source_timeout (source, data->current_time,
-        /* "a relatively long time" -- RFC 3550 section 8.2 */
-        RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
         data->running_time - sess->rtcp_feedback_retention_window);
   }
 
@@ -3041,26 +3313,41 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
       GST_TIME_ARGS (binterval));
 
-  if (!source->internal) {
-    if (source->marked_bye) {
-      /* if we received a BYE from the source, remove the source after some
-       * time. */
-      if (data->current_time > source->bye_time &&
-          data->current_time - source->bye_time > sess->stats.bye_timeout) {
-        GST_DEBUG ("removing BYE source %08x", source->ssrc);
-        remove = TRUE;
-        byetimeout = TRUE;
-      }
+  if (!source->internal && source->marked_bye) {
+    /* if we received a BYE from the source, remove the source after some
+     * time. */
+    if (data->current_time > source->bye_time &&
+        data->current_time - source->bye_time > sess->stats.bye_timeout) {
+      GST_DEBUG ("removing BYE source %08x", source->ssrc);
+      remove = TRUE;
+      byetimeout = TRUE;
     }
-    /* sources that were inactive for more than 5 times the deterministic reporting
-     * interval get timed out. the min timeout is 5 seconds. */
-    /* mind old time that might pre-date last time going to PLAYING */
-    btime = MAX (source->last_activity, sess->start_time);
-    if (data->current_time > btime) {
-      interval = MAX (binterval * 5, 5 * GST_SECOND);
-      if (data->current_time - btime > interval) {
-        GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
-            source->ssrc, GST_TIME_ARGS (btime));
+  }
+
+  if (source->internal && source->sent_bye) {
+    GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
+    remove = TRUE;
+  }
+
+  /* sources that were inactive for more than 5 times the deterministic reporting
+   * interval get timed out. the min timeout is 5 seconds. */
+  /* mind old time that might pre-date last time going to PLAYING */
+  btime = MAX (source->last_activity, sess->start_time);
+  if (data->current_time > btime) {
+    interval = MAX (binterval * 5, 5 * GST_SECOND);
+    if (data->current_time - btime > interval) {
+      GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+          source->ssrc, GST_TIME_ARGS (btime));
+      if (source->internal) {
+        /* this is an internal source that is not using our suggested ssrc.
+         * since there must be another source using this ssrc, we can remove
+         * this one instead of making it a receiver forever */
+        if (source->ssrc != sess->suggested_ssrc) {
+          rtp_source_mark_bye (source, "timed out");
+          /* do not schedule bye here, since we are inside the RTCP timeout
+           * processing and scheduling bye will interfere with SR/RR sending */
+        }
+      } else {
         remove = TRUE;
       }
     }
@@ -3074,16 +3361,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
     if (data->current_time > btime) {
       interval = MAX (binterval * 2, 5 * GST_SECOND);
       if (data->current_time - btime > interval) {
-        if (source->internal && source->sent_bye) {
-          /* an internal source is BYE and stopped sending RTP, remove */
-          GST_DEBUG ("internal BYE source %08x timed out, last %"
-              GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
-          remove = TRUE;
-        } else {
-          GST_DEBUG ("sender source %08x timed out and became receiver, last %"
-              GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
-          sendertimeout = TRUE;
-        }
+        GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+            GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
+        sendertimeout = TRUE;
       }
     }
   }
@@ -3208,15 +3488,26 @@ make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
 static gboolean
 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
 {
-  GstClockTime new_send_time, elapsed;
+  GstClockTime new_send_time;
+  GstClockTime interval;
+  RTPSessionStats *stats;
+
+  if (sess->scheduled_bye)
+    stats = &sess->bye_stats;
+  else
+    stats = &sess->stats;
 
   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
     data->is_early = TRUE;
   else
     data->is_early = FALSE;
 
-  if (data->is_early && sess->next_early_rtcp_time < current_time)
+  if (data->is_early && sess->next_early_rtcp_time < current_time) {
+    GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+        GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
+        GST_TIME_ARGS (current_time));
     goto early;
+  }
 
   /* no need to check yet */
   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
@@ -3227,62 +3518,70 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
     return FALSE;
   }
 
-  /* get elapsed time since we last reported */
-  elapsed = current_time - sess->last_rtcp_send_time;
+early:
 
-  new_send_time = data->interval;
-  /* perform forward reconsideration */
-  if (new_send_time != GST_CLOCK_TIME_NONE) {
-    new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time);
+  /* take interval and add jitter */
+  interval = data->interval;
+  if (interval != GST_CLOCK_TIME_NONE)
+    interval = rtp_stats_add_rtcp_jitter (stats, interval);
 
-    GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time),
-        GST_TIME_ARGS (elapsed));
+  if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+    /* perform forward reconsideration */
+    if (interval != GST_CLOCK_TIME_NONE) {
+      GstClockTime elapsed;
 
-    new_send_time += sess->last_rtcp_send_time;
-  }
+      /* get elapsed time since we last reported */
+      elapsed = current_time - sess->last_rtcp_send_time;
 
-  /* check if reconsideration */
-  if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
-    GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (new_send_time));
-    /* store new check time */
-    sess->next_rtcp_check_time = new_send_time;
-    return FALSE;
+      GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
+      new_send_time = interval + sess->last_rtcp_send_time;
+    } else {
+      new_send_time = sess->last_rtcp_send_time;
+    }
+  } else {
+    /* If this is the first RTCP packet, we can reconsider anything based
+     * on the last RTCP send time because there was none.
+     */
+    g_warn_if_fail (!data->is_early);
+    data->is_early = FALSE;
+    new_send_time = current_time;
   }
 
-early:
-
-  new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
-
-  GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (new_send_time));
-
-  sess->next_rtcp_check_time = new_send_time;
-  if (new_send_time != GST_CLOCK_TIME_NONE) {
-    sess->next_rtcp_check_time += current_time;
-
+  if (!data->is_early) {
+    /* check if reconsideration */
+    if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
+      GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (new_send_time));
+      /* store new check time */
+      sess->next_rtcp_check_time = new_send_time;
+      return FALSE;
+    }
+    sess->next_rtcp_check_time = current_time + interval;
+  } else if (interval != GST_CLOCK_TIME_NONE) {
     /* Apply the rules from RFC 4585 section 3.5.3 */
-    if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+    if (stats->min_interval != 0 && !sess->first_rtcp) {
       GstClockTime T_rr_current_interval =
-          g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
+          g_random_double_range (0.5, 1.5) * stats->min_interval;
 
       /* This will caused the RTCP to be suppressed if no FB packets are added */
-      if (sess->last_rtcp_send_time + T_rr_current_interval >
-          sess->next_rtcp_check_time) {
+      if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
             " last: %" GST_TIME_FORMAT
             " + T_rr_current_interval: %" GST_TIME_FORMAT
-            " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (sess->stats.min_interval),
+            " >  new_send_time: %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (stats->min_interval),
             GST_TIME_ARGS (sess->last_rtcp_send_time),
             GST_TIME_ARGS (T_rr_current_interval),
-            GST_TIME_ARGS (sess->next_rtcp_check_time));
+            GST_TIME_ARGS (new_send_time));
         data->may_suppress = TRUE;
       }
     }
   }
 
+  GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (new_send_time));
+
   return TRUE;
 }
 
@@ -3320,6 +3619,10 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
   if (!source->internal || source->sent_bye)
     return;
 
+  /* ignore other sources when we do the timeout after a scheduled BYE */
+  if (sess->scheduled_bye && !source->marked_bye)
+    return;
+
   data->source = source;
 
   /* open packet */
@@ -3359,6 +3662,28 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
   g_queue_push_tail (&data->output, output);
 }
 
+static void
+update_generation (const gchar * key, RTPSource * source, ReportData * data)
+{
+  RTPSession *sess = data->sess;
+
+  if (g_hash_table_size (source->reported_in_sr_of) >=
+      sess->stats.internal_sources) {
+    /* source is reported, move to next generation */
+    source->generation = sess->generation + 1;
+    g_hash_table_remove_all (source->reported_in_sr_of);
+
+    GST_LOG ("reported source %x, new generation: %d", source->ssrc,
+        source->generation);
+
+    /* if we reported all sources in this generation, move to next */
+    if (--data->num_to_report == 0) {
+      sess->generation++;
+      GST_DEBUG ("all reported, generation now %u", sess->generation);
+    }
+  }
+}
+
 /**
  * rtp_session_on_timeout:
  * @sess: an #RTPSession
@@ -3398,21 +3723,28 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   data.running_time = running_time;
   data.num_to_report = 0;
   data.may_suppress = FALSE;
+  data.nacked_seqnums = 0;
   g_queue_init (&data.output);
 
   RTP_SESSION_LOCK (sess);
   /* get a new interval, we need this for various cleanups etc */
   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
 
+  GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
+
   /* we need an internal source now */
   if (sess->stats.internal_sources == 0) {
     RTPSource *source;
     gboolean created;
 
-    source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
+    source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
+        current_time);
     g_object_unref (source);
   }
 
+  sess->conflicting_addresses =
+      timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
   /* Make a local copy of the hashtable. We need to do this because the
    * cleanup stage below releases the session lock. */
   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
@@ -3429,23 +3761,36 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
       (GHRFunc) remove_closing_sources, &data);
 
+  /* update point-to-point status */
+  session_update_ptp (sess);
+
   /* see if we need to generate SR or RR packets */
   if (!is_rtcp_time (sess, current_time, &data))
     goto done;
 
-  GST_DEBUG ("doing RTCP generation %u for %u sources", sess->generation,
-      data.num_to_report);
+  GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
+      sess->generation, data.num_to_report, data.is_early);
 
   /* generate RTCP for all internal sources */
   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
       (GHFunc) generate_rtcp, &data);
 
+  /* update the generation for all the sources that have been reported */
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) update_generation, &data);
+
   /* we keep track of the last report time in order to timeout inactive
    * receivers or senders */
   if (!data.is_early && !data.may_suppress)
     sess->last_rtcp_send_time = data.current_time;
   sess->first_rtcp = FALSE;
   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+  sess->scheduled_bye = FALSE;
+
+  /*  RFC 4585 section 3.5.2 step 6 */
+  if (!data.is_early) {
+    sess->allow_early = TRUE;
+  }
 
 done:
   RTP_SESSION_UNLOCK (sess);
@@ -3471,10 +3816,12 @@ done:
       result =
           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
           sess->send_rtcp_user_data);
+      sess->stats.nacks_sent += data.nacked_seqnums;
     } else {
       GST_DEBUG ("freeing packet callback: %p"
           " do_not_suppress: %d may_suppress: %d",
           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
+      sess->stats.nacks_dropped += data.nacked_seqnums;
       gst_buffer_unref (buffer);
     }
     g_object_unref (source);
@@ -3490,12 +3837,15 @@ done:
  * @max_delay: maximum delay
  *
  * Request transmission of early RTCP
+ *
+ * Returns: %TRUE if the related RTCP can be scheduled.
  */
-void
+gboolean
 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
     GstClockTime max_delay)
 {
-  GstClockTime T_dither_max;
+  GstClockTime T_dither_max, T_rr;
+  gboolean ret;
 
   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
 
@@ -3503,39 +3853,92 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
 
   /* Check if already requested */
   /*  RFC 4585 section 3.5.2 step 2 */
-  if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
-    goto dont_send;
+  if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+    GST_LOG_OBJECT (sess, "already have next early rtcp time");
+    ret = TRUE;
+    goto end;
+  }
+
+  if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
+    GST_LOG_OBJECT (sess, "no next RTCP check time");
+    ret = FALSE;
+    goto end;
+  }
 
-  if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time))
-    goto dont_send;
+  /* RFC 4585 section 3.5.3 step 1
+   * If no regular RTCP packet has been sent before, then a regular
+   * RTCP packet has to be scheduled first and FB messages might be
+   * included there
+   */
+  if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
+    GST_LOG_OBJECT (sess, "no RTCP sent yet");
+
+    if (current_time + max_delay > sess->next_rtcp_check_time) {
+      GST_LOG_OBJECT (sess,
+          "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+          " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+          GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = TRUE;
+    } else {
+      GST_LOG_OBJECT (sess,
+          "can't allow early feedback, next scheduled time is too late %"
+          GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = FALSE;
+    }
+    goto end;
+  }
 
-  /* Ignore the request a scheduled packet will be in time anyway */
-  if (current_time + max_delay > sess->next_rtcp_check_time)
-    goto dont_send;
+  T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
 
   /*  RFC 4585 section 3.5.2 step 2b */
   /* If the total sources is <=2, then there is only us and one peer */
-  if (sess->total_sources <= 2) {
+  /* When there is one auxiliary stream the session can still do point
+   * to point.
+   */
+  if (sess->is_doing_ptp) {
     T_dither_max = 0;
   } else {
     /* Divide by 2 because l = 0.5 */
-    T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+    T_dither_max = T_rr;
     T_dither_max /= 2;
   }
 
   /*  RFC 4585 section 3.5.2 step 3 */
-  if (current_time + T_dither_max > sess->next_rtcp_check_time)
-    goto dont_send;
-
-  /*  RFC 4585 section 3.5.2 step 4
-   * Don't send if allow_early is FALSE, but not if we are in
-   * immediate mode, meaning we are part of a group of at most the
-   * application-specific threshold.
-   */
-  if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
-      sess->allow_early == FALSE)
-    goto dont_send;
+  if (current_time + T_dither_max > sess->next_rtcp_check_time) {
+    GST_LOG_OBJECT (sess,
+        "don't send because of dither, next scheduled time is soon %"
+        GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
+        GST_TIME_ARGS (sess->next_rtcp_check_time));
+    ret = TRUE;
+    goto end;
+  }
+
+  /*  RFC 4585 section 3.5.2 step 4a */
+  if (sess->allow_early == FALSE) {
+    /* Ignore the request a scheduled packet will be in time anyway */
+    if (current_time + max_delay > sess->next_rtcp_check_time) {
+      GST_LOG_OBJECT (sess,
+          "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+          " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+          GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = TRUE;
+    } else {
+      GST_LOG_OBJECT (sess,
+          "can't allow early feedback, next scheduled time is too late %"
+          GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = FALSE;
+    }
+    goto end;
+  }
 
+  /*  RFC 4585 section 3.5.2 step 4b */
   if (T_dither_max) {
     /* Schedule an early transmission later */
     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
@@ -3545,6 +3948,18 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
     sess->next_early_rtcp_time = current_time;
   }
 
+  /*  RFC 4585 section 3.5.2 step 6 */
+  sess->allow_early = FALSE;
+  /* Delay next regular RTCP packet to not exceed the short-term
+   * RTCP bandwidth when using early feedback as compared to
+   * without */
+  sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
+  sess->last_rtcp_send_time += T_rr;
+
+  GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
+      ", next regular RTCP time %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (sess->next_early_rtcp_time),
+      GST_TIME_ARGS (sess->next_rtcp_check_time));
   RTP_SESSION_UNLOCK (sess);
 
   /* notify app of need to send packet early
@@ -3552,34 +3967,43 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
   if (sess->callbacks.reconsider)
     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
 
-  return;
+  return TRUE;
 
-dont_send:
+end:
 
   RTP_SESSION_UNLOCK (sess);
+
+  return ret;
 }
 
-static void
+static gboolean
 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
 {
   GstClockTime now;
 
   if (!sess->callbacks.send_rtcp)
-    return;
+    return FALSE;
 
   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
 
-  rtp_session_request_early_rtcp (sess, now, max_delay);
+  return rtp_session_request_early_rtcp (sess, now, max_delay);
 }
 
 gboolean
 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
     gboolean fir, gint count)
 {
-  RTPSource *src = find_source (sess, ssrc);
+  RTPSource *src;
 
-  if (!src)
+  if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+    GST_DEBUG ("FIR/PLI not sent");
     return FALSE;
+  }
+
+  RTP_SESSION_LOCK (sess);
+  src = find_source (sess, ssrc);
+  if (src == NULL)
+    goto no_source;
 
   if (fir) {
     src->send_pli = FALSE;
@@ -3591,10 +4015,16 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
   } else if (!src->send_fir) {
     src->send_pli = TRUE;
   }
-
-  rtp_session_send_rtcp (sess, 200 * GST_MSECOND);
+  RTP_SESSION_UNLOCK (sess);
 
   return TRUE;
+
+  /* ERRORS */
+no_source:
+  {
+    RTP_SESSION_UNLOCK (sess);
+    return FALSE;
+  }
 }
 
 /**
@@ -3612,15 +4042,28 @@ gboolean
 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
     GstClockTime max_delay)
 {
-  RTPSource *source = find_source (sess, ssrc);
+  RTPSource *source;
 
-  if (source == NULL)
+  if (!rtp_session_send_rtcp (sess, max_delay)) {
+    GST_DEBUG ("NACK not sent");
     return FALSE;
+  }
+
+  RTP_SESSION_LOCK (sess);
+  source = find_source (sess, ssrc);
+  if (source == NULL)
+    goto no_source;
 
   GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
   rtp_source_register_nack (source, seqnum);
-
-  rtp_session_send_rtcp (sess, max_delay);
+  RTP_SESSION_UNLOCK (sess);
 
   return TRUE;
+
+  /* ERRORS */
+no_source:
+  {
+    RTP_SESSION_UNLOCK (sess);
+    return FALSE;
+  }
 }