rtpsession: Use bandwidth calculation by default instead of some arbitrary hardcoded...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
index 1074866..a260186 100644 (file)
@@ -49,12 +49,14 @@ enum
   SIGNAL_ON_SENDING_RTCP,
   SIGNAL_ON_FEEDBACK_RTCP,
   SIGNAL_SEND_RTCP,
+  SIGNAL_SEND_RTCP_FULL,
+  SIGNAL_ON_RECEIVING_RTCP,
   LAST_SIGNAL
 };
 
 #define DEFAULT_INTERNAL_SOURCE      NULL
-#define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_BANDWIDTH            0.0
+#define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
 #define DEFAULT_RTCP_MTU             1400
@@ -86,8 +88,7 @@ enum
   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
   PROP_PROBATION,
-  PROP_STATS,
-  PROP_LAST
+  PROP_STATS
 };
 
 /* update average packet size */
@@ -100,11 +101,6 @@ enum
    (avg) = ((val) + (15 * (avg))) >> 4;
 
 
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -112,7 +108,8 @@ static void rtp_session_set_property (GObject * object, guint prop_id,
 static void rtp_session_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
-static void rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp (RTPSession * sess,
+    GstClockTime max_delay);
 
 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
 
@@ -122,7 +119,7 @@ static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
 static RTPSource *obtain_internal_source (RTPSession * sess,
-    guint32 ssrc, gboolean * created);
+    guint32 ssrc, gboolean * created, GstClockTime current_time);
 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
     GstClockTime current_time);
 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
@@ -323,6 +320,42 @@ rtp_session_class_init (RTPSessionClass * klass)
       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
 
+  /**
+   * RTPSession::send-rtcp-full:
+   * @session: the object which received the signal
+   * @max_delay: The maximum delay after which the feedback will not be useful
+   *  anymore
+   *
+   * Requests that the #RTPSession initiate a new RTCP packet as soon as
+   * possible within the requested delay.
+   *
+   * Returns: TRUE if the new RTCP packet could be scheduled within the
+   * requested delay, FALSE otherwise.
+   *
+   * Since: 1.6
+   */
+  rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
+      g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+      g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+
+  /**
+   * RTPSession::on-receiving-rtcp
+   * @session: the object which received the signal
+   * @buffer: the #GstBuffer containing the RTCP packet that was received
+   *
+   * This signal is emitted when receiving an RTCP packet before it is handled
+   * by the session. It can be used to extract custom information from RTCP packets.
+   *
+   * Since: 1.6
+   */
+  rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
+      g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+      GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
           "The internal SSRC used for the session (deprecated)",
@@ -433,9 +466,9 @@ rtp_session_class_init (RTPSessionClass * klass)
       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
           "RTCP Immediate Feedback threshold",
           "The maximum number of members of a RTP session for which immediate"
-          " feedback is used",
+          " feedback is used (DEPRECATED: has no effect and is not needed)",
           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
 
   g_object_class_install_property (gobject_class, PROP_PROBATION,
       g_param_spec_uint ("probation", "Number of probations",
@@ -479,7 +512,11 @@ rtp_session_init (RTPSession * sess)
   sess->mask_idx = 0;
   sess->mask = 0;
 
-  for (i = 0; i < 32; i++) {
+  /* TODO: We currently only use the first hash table but this is the
+   * beginning of an implementation for RFC2762
+   for (i = 0; i < 32; i++) {
+   */
+  for (i = 0; i < 1; i++) {
     sess->ssrcs[i] =
         g_hash_table_new_full (NULL, NULL, NULL,
         (GDestroyNotify) g_object_unref);
@@ -524,6 +561,7 @@ rtp_session_init (RTPSession * sess)
 
   sess->first_rtcp = TRUE;
   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+  sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
 
   sess->allow_early = TRUE;
   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
@@ -546,7 +584,13 @@ rtp_session_finalize (GObject * object)
 
   gst_structure_free (sess->sdes);
 
-  for (i = 0; i < 32; i++)
+  g_list_free_full (sess->conflicting_addresses,
+      (GDestroyNotify) rtp_conflicting_address_free);
+
+  /* TODO: Change this again when implementing RFC 2762
+   * for (i = 0; i < 32; i++)
+   */
+  for (i = 0; i < 1; i++)
     g_hash_table_destroy (sess->ssrcs[i]);
 
   g_mutex_clear (&sess->lock);
@@ -607,6 +651,11 @@ rtp_session_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_INTERNAL_SSRC:
+      RTP_SESSION_LOCK (sess);
+      sess->suggested_ssrc = g_value_get_uint (value);
+      RTP_SESSION_UNLOCK (sess);
+      if (sess->callbacks.reconfigure)
+        sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
       break;
     case PROP_BANDWIDTH:
       RTP_SESSION_LOCK (sess);
@@ -888,6 +937,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.notify_nack = callbacks->notify_nack;
     sess->notify_nack_user_data = user_data;
   }
+  if (callbacks->reconfigure) {
+    sess->callbacks.reconfigure = callbacks->reconfigure;
+    sess->reconfigure_user_data = user_data;
+  }
 }
 
 /**
@@ -1198,6 +1251,43 @@ static RTPSourceCallbacks callbacks = {
   (RTPSourceClockRate) source_clock_rate,
 };
 
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+    GSocketAddress * address, GstClockTime time)
+{
+  return find_conflicting_address (session->conflicting_addresses, address,
+      time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+    GSocketAddress * address, GstClockTime time)
+{
+  sess->conflicting_addresses =
+      add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
 static gboolean
 check_collision (RTPSession * sess, RTPSource * source,
     RTPPacketInfo * pinfo, gboolean rtp)
@@ -1283,7 +1373,7 @@ check_collision (RTPSession * sess, RTPSource * source,
      */
   } else {
     /* This is sending with our ssrc, is it an address we already know */
-    if (rtp_source_find_conflicting_address (source, pinfo->address,
+    if (rtp_session_find_conflicting_address (sess, pinfo->address,
             pinfo->current_time)) {
       /* Its a known conflict, its probably a loop, not a collision
        * lets just drop the incoming packet
@@ -1291,7 +1381,7 @@ check_collision (RTPSession * sess, RTPSource * source,
       GST_DEBUG ("Our packets are being looped back to us, dropping");
     } else {
       /* Its a new collision, lets change our SSRC */
-      rtp_source_add_conflicting_address (source, pinfo->address,
+      rtp_session_add_conflicting_address (sess, pinfo->address,
           pinfo->current_time);
 
       GST_DEBUG ("Collision for SSRC %x", ssrc);
@@ -1365,8 +1455,8 @@ session_update_ptp (RTPSession * sess)
    * of each non-internal (=remotes) source have to be compared
    * to each other.
    */
-  gboolean is_doing_rtp_ptp = FALSE;
-  gboolean is_doing_rtcp_ptp = FALSE;
+  gboolean is_doing_rtp_ptp;
+  gboolean is_doing_rtcp_ptp;
   CompareAddrData data;
 
   /* compare the first remote source's ip addr that receive rtp packets
@@ -1482,7 +1572,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
 /* must be called with the session lock, the returned source needs to be
  * unreffed after usage. */
 static RTPSource *
-obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
+obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+    GstClockTime current_time)
 {
   RTPSource *source;
 
@@ -1504,6 +1595,11 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
   } else {
     *created = FALSE;
   }
+  /* update last activity */
+  if (current_time != GST_CLOCK_TIME_NONE) {
+    source->last_activity = current_time;
+    source->last_rtp_activity = current_time;
+  }
   g_object_ref (source);
 
   return source;
@@ -1625,7 +1721,7 @@ rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
 
   RTP_SESSION_LOCK (sess);
   result = find_source (sess, ssrc);
-  if (result)
+  if (result != NULL)
     g_object_ref (result);
   RTP_SESSION_UNLOCK (sess);
 
@@ -2261,7 +2357,7 @@ rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
           fir ? "FIR" : "PLI",
           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
-          GST_TIME_ARGS (round_trip_in_ns));;
+          GST_TIME_ARGS (round_trip_in_ns));
       return FALSE;
     }
   }
@@ -2290,10 +2386,12 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
     return;
 
   src = find_source (sess, sender_ssrc);
-  if (!src)
+  if (src == NULL)
     return;
 
   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+
+  src->stats.recv_pli_count++;
 }
 
 static void
@@ -2338,6 +2436,9 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
     ssrc = GST_READ_UINT32_BE (data);
 
     own = find_source (sess, ssrc);
+    if (own == NULL)
+      continue;
+
     if (own->internal) {
       our_request = TRUE;
       break;
@@ -2347,6 +2448,7 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
     return;
 
   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+  src->stats.recv_fir_count++;
 }
 
 static void
@@ -2354,6 +2456,8 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
     GstClockTime current_time)
 {
+  sess->stats.nacks_received++;
+
   if (!sess->callbacks.notify_nack)
     return;
 
@@ -2363,10 +2467,10 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
     seqnum = GST_READ_UINT16_BE (fci_data);
     blp = GST_READ_UINT16_BE (fci_data + 2);
 
-    GST_DEBUG ("NACK #%u, blp %04x", seqnum, blp);
+    GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
 
     RTP_SESSION_UNLOCK (sess);
-    sess->callbacks.notify_nack (sess, seqnum, blp,
+    sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
         sess->notify_nack_user_data);
     RTP_SESSION_LOCK (sess);
 
@@ -2396,8 +2500,6 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
 
-  sess->stats.nacks_received++;
-
   if (g_signal_has_handler_pending (sess,
           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
     GstBuffer *fci_buffer = NULL;
@@ -2418,14 +2520,11 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       gst_buffer_unref (fci_buffer);
   }
 
-  if (!src)
-    return;
-
-  if (sess->rtcp_feedback_retention_window) {
+  if (src && sess->rtcp_feedback_retention_window) {
     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
   }
 
-  if (src->internal ||
+  if ((src && src->internal) ||
       /* PSFB FIR puts the media ssrc inside the FCI */
       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
     switch (type) {
@@ -2488,6 +2587,9 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   GST_DEBUG ("received RTCP packet");
 
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
+      buffer);
+
   RTP_SESSION_LOCK (sess);
   /* update pinfo stats */
   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
@@ -2535,15 +2637,14 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
 
   /* if we are scheduling a BYE, we only want to count bye packets, else we
    * count everything */
-  if (sess->scheduled_bye) {
-    if (is_bye) {
-      sess->stats.bye_members++;
-      UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
-    }
-  } else {
-    /* keep track of average packet size */
-    UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+  if (sess->scheduled_bye && is_bye) {
+    sess->bye_stats.bye_members++;
+    UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
   }
+
+  /* keep track of average packet size */
+  UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+
   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
   RTP_SESSION_UNLOCK (sess);
@@ -2594,11 +2695,20 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
     gboolean created;
 
     RTP_SESSION_LOCK (sess);
-    source = obtain_internal_source (sess, ssrc, &created);
+    source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
     if (source) {
       rtp_source_update_caps (source, caps);
       g_object_unref (source);
     }
+
+    if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
+      source =
+          obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+      if (source) {
+        rtp_source_update_caps (source, caps);
+        g_object_unref (source);
+      }
+    }
     RTP_SESSION_UNLOCK (sess);
   }
 }
@@ -2637,10 +2747,7 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
           current_time, running_time, -1))
     goto invalid_packet;
 
-  source = obtain_internal_source (sess, pinfo.ssrc, &created);
-
-  /* update last activity */
-  source->last_rtp_activity = current_time;
+  source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
   oldrate = source->bitrate;
@@ -2680,6 +2787,7 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
     gboolean first)
 {
   GstClockTime result;
+  RTPSessionStats *stats;
 
   /* recalculate bandwidth when it changed */
   if (sess->recalc_bandwidth) {
@@ -2693,9 +2801,8 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
 
       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
           (GHFunc) add_bitrates, &bandwidth);
-      bandwidth /= 8.0;
     }
-    if (bandwidth < 8000)
+    if (bandwidth < RTP_STATS_BANDWIDTH)
       bandwidth = RTP_STATS_BANDWIDTH;
 
     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
@@ -2705,17 +2812,19 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
   }
 
   if (sess->scheduled_bye) {
-    result = rtp_stats_calculate_bye_interval (&sess->stats);
+    stats = &sess->bye_stats;
+    result = rtp_stats_calculate_bye_interval (stats);
   } else {
-    result = rtp_stats_calculate_rtcp_interval (&sess->stats,
-        sess->stats.internal_sender_sources > 0, first);
+    stats = &sess->stats;
+    result = rtp_stats_calculate_rtcp_interval (stats,
+        stats->internal_sender_sources > 0, first);
   }
 
   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
       GST_TIME_ARGS (result), first);
 
   if (!deterministic && result != GST_CLOCK_TIME_NONE)
-    result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+    result = rtp_stats_add_rtcp_jitter (stats, result);
 
   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
 
@@ -2763,8 +2872,9 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
   /* we schedule BYE now */
   sess->scheduled_bye = TRUE;
   /* at least one member wants to send a BYE */
-  INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
-  sess->stats.bye_members = 1;
+  memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
+  INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
+  sess->bye_stats.bye_members = 1;
   sess->first_rtcp = TRUE;
   sess->allow_early = TRUE;
 
@@ -2802,7 +2912,7 @@ done:
 GstFlowReturn
 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
 {
-  GstFlowReturn result = GST_FLOW_OK;
+  GstFlowReturn result;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
@@ -2852,7 +2962,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
   }
 
   if (sess->scheduled_bye) {
-    if (sess->stats.active_sources >= 50) {
+    if (sess->bye_stats.active_sources >= 50) {
       GST_DEBUG ("reconsider BYE, more than 50 sources");
       /* reconsider BYE if members >= 50 */
       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
@@ -2972,15 +3082,21 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
     return;
   }
 
-  /* only report about other sender */
-  if (source == data->source)
-    goto reported;
+  if (g_hash_table_contains (source->reported_in_sr_of,
+          GUINT_TO_POINTER (data->source->ssrc))) {
+    GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
+    return;
+  }
 
   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
     GST_DEBUG ("max RB count reached");
     return;
   }
 
+  /* only report about other sender */
+  if (source == data->source)
+    goto reported;
+
   if (!RTP_SOURCE_IS_SENDER (source)) {
     GST_DEBUG ("source %08x not sender", source->ssrc);
     goto reported;
@@ -3006,14 +3122,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
       exthighestseq, jitter, lsr, dlsr);
 
 reported:
-  /* source is reported, move to next generation */
-  source->generation = sess->generation + 1;
-
-  /* if we reported all sources in this generation, move to next */
-  if (--data->num_to_report == 0) {
-    sess->generation++;
-    GST_DEBUG ("all reported, generation now %u", sess->generation);
-  }
+  g_hash_table_add (source->reported_in_sr_of,
+      GUINT_TO_POINTER (data->source->ssrc));
 }
 
 /* construct FIR */
@@ -3041,6 +3151,7 @@ session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
   fci_data[1] = fci_data[2] = fci_data[3] = 0;
 
   source->send_fir = FALSE;
+  source->stats.sent_fir_count++;
 }
 
 static void
@@ -3109,6 +3220,8 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data)
 
   source->send_pli = FALSE;
   data->may_suppress = FALSE;
+
+  source->stats.sent_pli_count++;
 }
 
 /* construct NACK */
@@ -3167,8 +3280,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   if (source->internal) {
     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
     rtp_source_timeout (source, data->current_time,
-        /* "a relatively long time" -- RFC 3550 section 8.2 */
-        RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
         data->running_time - sess->rtcp_feedback_retention_window);
   }
 
@@ -3202,26 +3313,41 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
       GST_TIME_ARGS (binterval));
 
-  if (!source->internal) {
-    if (source->marked_bye) {
-      /* if we received a BYE from the source, remove the source after some
-       * time. */
-      if (data->current_time > source->bye_time &&
-          data->current_time - source->bye_time > sess->stats.bye_timeout) {
-        GST_DEBUG ("removing BYE source %08x", source->ssrc);
-        remove = TRUE;
-        byetimeout = TRUE;
-      }
+  if (!source->internal && source->marked_bye) {
+    /* if we received a BYE from the source, remove the source after some
+     * time. */
+    if (data->current_time > source->bye_time &&
+        data->current_time - source->bye_time > sess->stats.bye_timeout) {
+      GST_DEBUG ("removing BYE source %08x", source->ssrc);
+      remove = TRUE;
+      byetimeout = TRUE;
     }
-    /* sources that were inactive for more than 5 times the deterministic reporting
-     * interval get timed out. the min timeout is 5 seconds. */
-    /* mind old time that might pre-date last time going to PLAYING */
-    btime = MAX (source->last_activity, sess->start_time);
-    if (data->current_time > btime) {
-      interval = MAX (binterval * 5, 5 * GST_SECOND);
-      if (data->current_time - btime > interval) {
-        GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
-            source->ssrc, GST_TIME_ARGS (btime));
+  }
+
+  if (source->internal && source->sent_bye) {
+    GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
+    remove = TRUE;
+  }
+
+  /* sources that were inactive for more than 5 times the deterministic reporting
+   * interval get timed out. the min timeout is 5 seconds. */
+  /* mind old time that might pre-date last time going to PLAYING */
+  btime = MAX (source->last_activity, sess->start_time);
+  if (data->current_time > btime) {
+    interval = MAX (binterval * 5, 5 * GST_SECOND);
+    if (data->current_time - btime > interval) {
+      GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+          source->ssrc, GST_TIME_ARGS (btime));
+      if (source->internal) {
+        /* this is an internal source that is not using our suggested ssrc.
+         * since there must be another source using this ssrc, we can remove
+         * this one instead of making it a receiver forever */
+        if (source->ssrc != sess->suggested_ssrc) {
+          rtp_source_mark_bye (source, "timed out");
+          /* do not schedule bye here, since we are inside the RTCP timeout
+           * processing and scheduling bye will interfere with SR/RR sending */
+        }
+      } else {
         remove = TRUE;
       }
     }
@@ -3235,16 +3361,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
     if (data->current_time > btime) {
       interval = MAX (binterval * 2, 5 * GST_SECOND);
       if (data->current_time - btime > interval) {
-        if (source->internal && source->sent_bye) {
-          /* an internal source is BYE and stopped sending RTP, remove */
-          GST_DEBUG ("internal BYE source %08x timed out, last %"
-              GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
-          remove = TRUE;
-        } else {
-          GST_DEBUG ("sender source %08x timed out and became receiver, last %"
-              GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
-          sendertimeout = TRUE;
-        }
+        GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+            GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
+        sendertimeout = TRUE;
       }
     }
   }
@@ -3369,8 +3488,14 @@ make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
 static gboolean
 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
 {
-  GstClockTime new_send_time, elapsed;
+  GstClockTime new_send_time;
   GstClockTime interval;
+  RTPSessionStats *stats;
+
+  if (sess->scheduled_bye)
+    stats = &sess->bye_stats;
+  else
+    stats = &sess->stats;
 
   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
     data->is_early = TRUE;
@@ -3394,21 +3519,33 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
   }
 
 early:
-  /* get elapsed time since we last reported */
-  elapsed = current_time - sess->last_rtcp_send_time;
 
   /* take interval and add jitter */
   interval = data->interval;
   if (interval != GST_CLOCK_TIME_NONE)
-    interval = rtp_stats_add_rtcp_jitter (&sess->stats, interval);
+    interval = rtp_stats_add_rtcp_jitter (stats, interval);
+
+  if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+    /* perform forward reconsideration */
+    if (interval != GST_CLOCK_TIME_NONE) {
+      GstClockTime elapsed;
+
+      /* get elapsed time since we last reported */
+      elapsed = current_time - sess->last_rtcp_send_time;
 
-  /* perform forward reconsideration */
-  if (interval != GST_CLOCK_TIME_NONE) {
-    GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
-    new_send_time = interval + sess->last_rtcp_send_time;
+      GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
+      new_send_time = interval + sess->last_rtcp_send_time;
+    } else {
+      new_send_time = sess->last_rtcp_send_time;
+    }
   } else {
-    new_send_time = sess->last_rtcp_send_time;
+    /* If this is the first RTCP packet, we can reconsider anything based
+     * on the last RTCP send time because there was none.
+     */
+    g_warn_if_fail (!data->is_early);
+    data->is_early = FALSE;
+    new_send_time = current_time;
   }
 
   if (!data->is_early) {
@@ -3423,9 +3560,9 @@ early:
     sess->next_rtcp_check_time = current_time + interval;
   } else if (interval != GST_CLOCK_TIME_NONE) {
     /* Apply the rules from RFC 4585 section 3.5.3 */
-    if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+    if (stats->min_interval != 0 && !sess->first_rtcp) {
       GstClockTime T_rr_current_interval =
-          g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
+          g_random_double_range (0.5, 1.5) * stats->min_interval;
 
       /* This will caused the RTCP to be suppressed if no FB packets are added */
       if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
@@ -3433,7 +3570,7 @@ early:
             " last: %" GST_TIME_FORMAT
             " + T_rr_current_interval: %" GST_TIME_FORMAT
             " >  new_send_time: %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (sess->stats.min_interval),
+            GST_TIME_ARGS (stats->min_interval),
             GST_TIME_ARGS (sess->last_rtcp_send_time),
             GST_TIME_ARGS (T_rr_current_interval),
             GST_TIME_ARGS (new_send_time));
@@ -3482,6 +3619,10 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
   if (!source->internal || source->sent_bye)
     return;
 
+  /* ignore other sources when we do the timeout after a scheduled BYE */
+  if (sess->scheduled_bye && !source->marked_bye)
+    return;
+
   data->source = source;
 
   /* open packet */
@@ -3521,6 +3662,28 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
   g_queue_push_tail (&data->output, output);
 }
 
+static void
+update_generation (const gchar * key, RTPSource * source, ReportData * data)
+{
+  RTPSession *sess = data->sess;
+
+  if (g_hash_table_size (source->reported_in_sr_of) >=
+      sess->stats.internal_sources) {
+    /* source is reported, move to next generation */
+    source->generation = sess->generation + 1;
+    g_hash_table_remove_all (source->reported_in_sr_of);
+
+    GST_LOG ("reported source %x, new generation: %d", source->ssrc,
+        source->generation);
+
+    /* if we reported all sources in this generation, move to next */
+    if (--data->num_to_report == 0) {
+      sess->generation++;
+      GST_DEBUG ("all reported, generation now %u", sess->generation);
+    }
+  }
+}
+
 /**
  * rtp_session_on_timeout:
  * @sess: an #RTPSession
@@ -3574,10 +3737,14 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
     RTPSource *source;
     gboolean created;
 
-    source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
+    source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
+        current_time);
     g_object_unref (source);
   }
 
+  sess->conflicting_addresses =
+      timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
   /* Make a local copy of the hashtable. We need to do this because the
    * cleanup stage below releases the session lock. */
   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
@@ -3608,12 +3775,22 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
       (GHFunc) generate_rtcp, &data);
 
+  /* update the generation for all the sources that have been reported */
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) update_generation, &data);
+
   /* we keep track of the last report time in order to timeout inactive
    * receivers or senders */
   if (!data.is_early && !data.may_suppress)
     sess->last_rtcp_send_time = data.current_time;
   sess->first_rtcp = FALSE;
   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+  sess->scheduled_bye = FALSE;
+
+  /*  RFC 4585 section 3.5.2 step 6 */
+  if (!data.is_early) {
+    sess->allow_early = TRUE;
+  }
 
 done:
   RTP_SESSION_UNLOCK (sess);
@@ -3660,12 +3837,15 @@ done:
  * @max_delay: maximum delay
  *
  * Request transmission of early RTCP
+ *
+ * Returns: %TRUE if the related RTCP can be scheduled.
  */
-void
+gboolean
 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
     GstClockTime max_delay)
 {
-  GstClockTime T_dither_max;
+  GstClockTime T_dither_max, T_rr;
+  gboolean ret;
 
   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
 
@@ -3675,23 +3855,44 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
   /*  RFC 4585 section 3.5.2 step 2 */
   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
     GST_LOG_OBJECT (sess, "already have next early rtcp time");
-    goto dont_send;
+    ret = TRUE;
+    goto end;
   }
 
   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
     GST_LOG_OBJECT (sess, "no next RTCP check time");
-    goto dont_send;
+    ret = FALSE;
+    goto end;
   }
 
-  /* Ignore the request a scheduled packet will be in time anyway */
-  if (current_time + max_delay > sess->next_rtcp_check_time) {
-    GST_LOG_OBJECT (sess, "next scheduled time is soon %" GST_TIME_FORMAT " + %"
-        GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (current_time),
-        GST_TIME_ARGS (max_delay), GST_TIME_ARGS (sess->next_rtcp_check_time));
-    goto dont_send;
+  /* RFC 4585 section 3.5.3 step 1
+   * If no regular RTCP packet has been sent before, then a regular
+   * RTCP packet has to be scheduled first and FB messages might be
+   * included there
+   */
+  if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
+    GST_LOG_OBJECT (sess, "no RTCP sent yet");
+
+    if (current_time + max_delay > sess->next_rtcp_check_time) {
+      GST_LOG_OBJECT (sess,
+          "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+          " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+          GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = TRUE;
+    } else {
+      GST_LOG_OBJECT (sess,
+          "can't allow early feedback, next scheduled time is too late %"
+          GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = FALSE;
+    }
+    goto end;
   }
 
+  T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+
   /*  RFC 4585 section 3.5.2 step 2b */
   /* If the total sources is <=2, then there is only us and one peer */
   /* When there is one auxiliary stream the session can still do point
@@ -3701,27 +3902,43 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
     T_dither_max = 0;
   } else {
     /* Divide by 2 because l = 0.5 */
-    T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+    T_dither_max = T_rr;
     T_dither_max /= 2;
   }
 
   /*  RFC 4585 section 3.5.2 step 3 */
   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
-    GST_LOG_OBJECT (sess, "don't send because of dither");
-    goto dont_send;
-  }
-
-  /*  RFC 4585 section 3.5.2 step 4
-   * Don't send if allow_early is FALSE, but not if we are in
-   * immediate mode, meaning we are part of a group of at most the
-   * application-specific threshold.
-   */
-  if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
-      sess->allow_early == FALSE) {
-    GST_LOG_OBJECT (sess, "can't allow early feedback");
-    goto dont_send;
+    GST_LOG_OBJECT (sess,
+        "don't send because of dither, next scheduled time is soon %"
+        GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
+        GST_TIME_ARGS (sess->next_rtcp_check_time));
+    ret = TRUE;
+    goto end;
+  }
+
+  /*  RFC 4585 section 3.5.2 step 4a */
+  if (sess->allow_early == FALSE) {
+    /* Ignore the request a scheduled packet will be in time anyway */
+    if (current_time + max_delay > sess->next_rtcp_check_time) {
+      GST_LOG_OBJECT (sess,
+          "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+          " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+          GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = TRUE;
+    } else {
+      GST_LOG_OBJECT (sess,
+          "can't allow early feedback, next scheduled time is too late %"
+          GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+          GST_TIME_ARGS (sess->next_rtcp_check_time));
+      ret = FALSE;
+    }
+    goto end;
   }
 
+  /*  RFC 4585 section 3.5.2 step 4b */
   if (T_dither_max) {
     /* Schedule an early transmission later */
     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
@@ -3731,8 +3948,18 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
     sess->next_early_rtcp_time = current_time;
   }
 
-  GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sess->next_early_rtcp_time));
+  /*  RFC 4585 section 3.5.2 step 6 */
+  sess->allow_early = FALSE;
+  /* Delay next regular RTCP packet to not exceed the short-term
+   * RTCP bandwidth when using early feedback as compared to
+   * without */
+  sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
+  sess->last_rtcp_send_time += T_rr;
+
+  GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
+      ", next regular RTCP time %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (sess->next_early_rtcp_time),
+      GST_TIME_ARGS (sess->next_rtcp_check_time));
   RTP_SESSION_UNLOCK (sess);
 
   /* notify app of need to send packet early
@@ -3740,24 +3967,26 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
   if (sess->callbacks.reconsider)
     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
 
-  return;
+  return TRUE;
 
-dont_send:
+end:
 
   RTP_SESSION_UNLOCK (sess);
+
+  return ret;
 }
 
-static void
+static gboolean
 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
 {
   GstClockTime now;
 
   if (!sess->callbacks.send_rtcp)
-    return;
+    return FALSE;
 
   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
 
-  rtp_session_request_early_rtcp (sess, now, max_delay);
+  return rtp_session_request_early_rtcp (sess, now, max_delay);
 }
 
 gboolean
@@ -3766,9 +3995,14 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
 {
   RTPSource *src;
 
+  if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+    GST_DEBUG ("FIR/PLI not sent");
+    return FALSE;
+  }
+
   RTP_SESSION_LOCK (sess);
   src = find_source (sess, ssrc);
-  if (!src)
+  if (src == NULL)
     goto no_source;
 
   if (fir) {
@@ -3783,8 +4017,6 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
   }
   RTP_SESSION_UNLOCK (sess);
 
-  rtp_session_send_rtcp (sess, 200 * GST_MSECOND);
-
   return TRUE;
 
   /* ERRORS */
@@ -3812,6 +4044,11 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
 {
   RTPSource *source;
 
+  if (!rtp_session_send_rtcp (sess, max_delay)) {
+    GST_DEBUG ("NACK not sent");
+    return FALSE;
+  }
+
   RTP_SESSION_LOCK (sess);
   source = find_source (sess, ssrc);
   if (source == NULL)
@@ -3821,8 +4058,6 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
   rtp_source_register_nack (source, seqnum);
   RTP_SESSION_UNLOCK (sess);
 
-  rtp_session_send_rtcp (sess, max_delay);
-
   return TRUE;
 
   /* ERRORS */