rtpsession: Always keep at least one NACK on early RTCP
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
index e3bf338..fa9dfed 100644 (file)
@@ -54,6 +54,7 @@ enum
   SIGNAL_ON_RECEIVING_RTCP,
   SIGNAL_ON_NEW_SENDER_SSRC,
   SIGNAL_ON_SENDER_SSRC_ACTIVE,
+  SIGNAL_ON_SENDING_NACKS,
   LAST_SIGNAL
 };
 
@@ -75,6 +76,7 @@ enum
 #define DEFAULT_MAX_MISORDER_TIME    2000
 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
+#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
 
 enum
 {
@@ -99,7 +101,8 @@ enum
   PROP_MAX_MISORDER_TIME,
   PROP_STATS,
   PROP_RTP_PROFILE,
-  PROP_RTCP_REDUCED_SIZE
+  PROP_RTCP_REDUCED_SIZE,
+  PROP_RTCP_DISABLE_SR_TIMESTAMP
 };
 
 /* update average packet size */
@@ -121,6 +124,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id,
 
 static gboolean rtp_session_send_rtcp (RTPSession * sess,
     GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
+    GstClockTime deadline);
 
 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
 
@@ -418,6 +423,39 @@ rtp_session_class_init (RTPSessionClass * klass)
           on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
 
+  /**
+   * RTPSession::on-sending-nack
+   * @session: the object which received the signal
+   * @sender_ssrc: the sender ssrc
+   * @media_ssrc: the media ssrc
+   * @nacks: (element-type guint16): the list of seqnum to be nacked
+   * @buffer: the #GstBuffer containing the RTCP packet about to be sent
+   *
+   * This signal is emitted before NACK packets are added into the RTCP
+   * packet. This signal can be used to override the conversion of the NACK
+   * seqnum array into packets. This can be used if your protocol uses
+   * different type of NACK (e.g. based on RTCP APP).
+   *
+   * The handler should transform the seqnum from @nacks array into packets.
+   * @nacks seqnum must be consumed from the start. The remaining will be
+   * rescheduled for later base on bandwidth. Only one handler will be
+   * signalled.
+   *
+   * A handler may return 0 to signal that generic NACKs should be created
+   * for this set. This can be useful if the signal is used for other purpose
+   * or if the other type of NACK would use more space.
+   *
+   * Returns: the number of NACK seqnum that was consumed from @nacks.
+   *
+   * Since: 1.16
+   */
+  rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
+      g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
+      g_signal_accumulator_first_wins, NULL, g_cclosure_marshal_generic,
+      G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
+      GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
           "The internal SSRC used for the session (deprecated)",
@@ -581,6 +619,21 @@ rtp_session_class_init (RTPSessionClass * klass)
           DEFAULT_RTCP_REDUCED_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * RTPSession::disable-sr-timestamp:
+   *
+   * Whether sender reports should be timestamped.
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class,
+      PROP_RTCP_DISABLE_SR_TIMESTAMP,
+      g_param_spec_boolean ("disable-sr-timestamp",
+          "Disable Sender Report Timestamp",
+          "Whether sender reports should be timestamped",
+          DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
@@ -661,8 +714,7 @@ rtp_session_init (RTPSession * sess)
       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
   sess->rtp_profile = DEFAULT_RTP_PROFILE;
   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
-
-  sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
+  sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
 
   sess->is_doing_ptp = TRUE;
 }
@@ -724,16 +776,15 @@ rtp_session_create_sources (RTPSession * sess)
 static void
 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
 {
-  GValue value = G_VALUE_INIT;
+  GValue *value;
   GstStructure *s;
 
   g_object_get (source, "stats", &s, NULL);
 
-  g_value_init (&value, GST_TYPE_STRUCTURE);
-  gst_value_set_structure (&value, s);
-  g_value_array_append (arr, &value);
-  gst_structure_free (s);
-  g_value_unset (&value);
+  g_value_array_append (arr, NULL);
+  value = g_value_array_get_nth (arr, arr->n_values - 1);
+  g_value_init (value, GST_TYPE_STRUCTURE);
+  g_value_take_boxed (value, s);
 }
 
 static GstStructure *
@@ -824,6 +875,9 @@ rtp_session_set_property (GObject * object, guint prop_id,
       if (sess->callbacks.reconsider)
         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
       break;
+    case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+      sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
+      break;
     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
       break;
@@ -848,6 +902,9 @@ rtp_session_set_property (GObject * object, guint prop_id,
     case PROP_RTCP_REDUCED_SIZE:
       sess->reduced_size_rtcp = g_value_get_boolean (value);
       break;
+    case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+      sess->timestamp_sender_reports = !g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -903,6 +960,9 @@ rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_RTCP_MIN_INTERVAL:
       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
       break;
+    case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+      g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
+      break;
     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
       break;
@@ -924,6 +984,9 @@ rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_RTCP_REDUCED_SIZE:
       g_value_set_boolean (value, sess->reduced_size_rtcp);
       break;
+    case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+      g_value_set_boolean (value, !sess->timestamp_sender_reports);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1064,6 +1127,47 @@ rtp_session_new (void)
 }
 
 /**
+ * rtp_session_reset:
+ * @sess: an #RTPSession
+ *
+ * Reset the sources of @sess.
+ */
+void
+rtp_session_reset (RTPSession * sess)
+{
+  g_return_if_fail (RTP_IS_SESSION (sess));
+
+  /* remove all sources */
+  g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
+  sess->total_sources = 0;
+  sess->stats.sender_sources = 0;
+  sess->stats.internal_sender_sources = 0;
+  sess->stats.internal_sources = 0;
+  sess->stats.active_sources = 0;
+
+  sess->generation = 0;
+  sess->first_rtcp = TRUE;
+  sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+  sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
+  sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
+  sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
+  sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+  sess->scheduled_bye = FALSE;
+
+  /* reset session stats */
+  sess->stats.bye_members = 0;
+  sess->stats.nacks_dropped = 0;
+  sess->stats.nacks_sent = 0;
+  sess->stats.nacks_received = 0;
+
+  sess->is_doing_ptp = TRUE;
+
+  g_list_free_full (sess->conflicting_addresses,
+      (GDestroyNotify) rtp_conflicting_address_free);
+  sess->conflicting_addresses = NULL;
+}
+
+/**
  * rtp_session_set_callbacks:
  * @sess: an #RTPSession
  * @callbacks: callbacks to configure
@@ -1117,6 +1221,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.reconfigure = callbacks->reconfigure;
     sess->reconfigure_user_data = user_data;
   }
+  if (callbacks->notify_early_rtcp) {
+    sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
+    sess->notify_early_rtcp_user_data = user_data;
+  }
 }
 
 /**
@@ -1348,6 +1456,12 @@ rtp_session_get_sdes_struct (RTPSession * sess)
   return result;
 }
 
+static void
+source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
+{
+  rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
+}
+
 /**
  * rtp_session_set_sdes_struct:
  * @sess: an #RTSPSession
@@ -1365,6 +1479,9 @@ rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
   if (sess->sdes)
     gst_structure_free (sess->sdes);
   sess->sdes = gst_structure_copy (sdes);
+
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) source_set_sdes, sess->sdes);
   RTP_SESSION_UNLOCK (sess);
 }
 
@@ -1927,34 +2044,6 @@ rtp_session_create_new_ssrc (RTPSession * sess)
   return ssrc;
 }
 
-
-/**
- * rtp_session_create_source:
- * @sess: an #RTPSession
- *
- * Create an #RTPSource for use in @sess. This function will create a source
- * with an ssrc that is currently not used by any participants in the session.
- *
- * Returns: an #RTPSource.
- */
-RTPSource *
-rtp_session_create_source (RTPSession * sess)
-{
-  guint32 ssrc;
-  RTPSource *source;
-
-  RTP_SESSION_LOCK (sess);
-  ssrc = rtp_session_create_new_ssrc (sess);
-  source = rtp_source_new (ssrc);
-  rtp_source_set_callbacks (source, &callbacks, sess);
-  /* we need an additional ref for the source in the hashtable */
-  g_object_ref (source);
-  add_source (sess, source);
-  RTP_SESSION_UNLOCK (sess);
-
-  return source;
-}
-
 static gboolean
 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
 {
@@ -2011,7 +2100,7 @@ invalid_packet:
 /* update the RTPPacketInfo structure with the current time and other bits
  * about the current buffer we are handling.
  * This function is typically called when a validated packet is received.
- * This function should be called with the SESSION_LOCK
+ * This function should be called with the RTP_SESSION_LOCK
  */
 static gboolean
 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
@@ -2137,7 +2226,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
           current_time, running_time, ntpnstime)) {
     GST_DEBUG ("invalid RTP packet received");
     RTP_SESSION_UNLOCK (sess);
-    return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
+    return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
+        ntpnstime);
   }
 
   ssrc = pinfo.ssrc;
@@ -2150,6 +2240,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   prevactive = RTP_SOURCE_IS_ACTIVE (source);
   oldrate = source->bitrate;
 
+  if (created)
+    on_new_ssrc (sess, source);
+
   /* let source process the packet */
   result = rtp_source_process_rtp (source, &pinfo);
 
@@ -2162,8 +2255,6 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   if (oldrate != source->bitrate)
     sess->recalc_bandwidth = TRUE;
 
-  if (created)
-    on_new_ssrc (sess, source);
 
   if (source->validated) {
     gboolean created;
@@ -2565,34 +2656,39 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
 
 static gboolean
 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
-    gboolean fir, GstClockTime current_time)
+    guint32 media_ssrc, gboolean fir, GstClockTime current_time)
 {
   guint32 round_trip = 0;
 
   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
 
-  if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
+  if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
         GST_SECOND, 65536);
 
-    if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
-      GST_DEBUG ("Ignoring %s request because one was send without one "
+    /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
+     * packets with erroneous values resulting in crazy high RTT. */
+    if (round_trip_in_ns > 5 * GST_SECOND)
+      round_trip_in_ns = GST_SECOND / 2;
+
+    if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
+      GST_DEBUG ("Ignoring %s request from %X because one was send without one "
           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
-          fir ? "FIR" : "PLI",
-          GST_TIME_ARGS (current_time - sess->last_keyframe_request),
+          fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
+          GST_TIME_ARGS (current_time - src->last_keyframe_request),
           GST_TIME_ARGS (round_trip_in_ns));
       return FALSE;
     }
   }
 
-  sess->last_keyframe_request = current_time;
+  src->last_keyframe_request = current_time;
 
-  GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
-      rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
+  GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
+      rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
       sess->callbacks.request_key_unit);
 
   RTP_SESSION_UNLOCK (sess);
-  sess->callbacks.request_key_unit (sess, fir,
+  sess->callbacks.request_key_unit (sess, media_ssrc, fir,
       sess->request_key_unit_user_data);
   RTP_SESSION_LOCK (sess);
 
@@ -2609,15 +2705,21 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
     return;
 
   src = find_source (sess, sender_ssrc);
-  if (src == NULL)
-    return;
+  if (src == NULL) {
+    /* try to find a src with media_ssrc instead */
+    src = find_source (sess, media_ssrc);
+    if (src == NULL)
+      return;
+  }
 
-  rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+  rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
+      current_time);
 }
 
 static void
 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
-    guint8 * fci_data, guint fci_length, GstClockTime current_time)
+    guint32 media_ssrc, guint8 * fci_data, guint fci_length,
+    GstClockTime current_time)
 {
   RTPSource *src;
   guint32 ssrc;
@@ -2668,7 +2770,8 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
   if (!our_request)
     return;
 
-  rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+  rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
+      current_time);
 }
 
 static void
@@ -2703,20 +2806,34 @@ static void
 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
     RTPPacketInfo * pinfo, GstClockTime current_time)
 {
-  GstRTCPType type = gst_rtcp_packet_get_type (packet);
-  GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
-  guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
-  guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
-  guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
-  guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
+  GstRTCPType type;
+  GstRTCPFBType fbtype;
+  guint32 sender_ssrc, media_ssrc;
+  guint8 *fci_data;
+  guint fci_length;
   RTPSource *src;
 
+  /* The feedback packet must include both sender SSRC and media SSRC */
+  if (packet->length < 2)
+    return;
+
+  type = gst_rtcp_packet_get_type (packet);
+  fbtype = gst_rtcp_packet_fb_get_type (packet);
+  sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
+  media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
+
   src = find_source (sess, media_ssrc);
 
   /* skip non-bye packets for sources that are marked BYE */
   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
     return;
 
+  if (src)
+    g_object_ref (src);
+
+  fci_data = gst_rtcp_packet_fb_get_fci (packet);
+  fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
+
   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
 
@@ -2740,7 +2857,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       gst_buffer_unref (fci_buffer);
   }
 
-  if (src && sess->rtcp_feedback_retention_window) {
+  if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
   }
 
@@ -2759,8 +2876,8 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
           case GST_RTCP_PSFB_TYPE_FIR:
             if (src)
               src->stats.recv_fir_count++;
-            rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
-                current_time);
+            rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
+                fci_length, current_time);
             break;
           default:
             break;
@@ -2769,6 +2886,8 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       case GST_RTCP_TYPE_RTPFB:
         switch (fbtype) {
           case GST_RTCP_RTPFB_TYPE_NACK:
+            if (src)
+              src->stats.recv_nack_count++;
             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
                 fci_data, fci_length, current_time);
             break;
@@ -2779,6 +2898,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
         break;
     }
   }
+
+  if (src)
+    g_object_unref (src);
 }
 
 /**
@@ -2795,7 +2917,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
  */
 GstFlowReturn
 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
-    GstClockTime current_time, guint64 ntpnstime)
+    GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
 {
   GstRTCPPacket packet;
   gboolean more, is_bye = FALSE, do_sync = FALSE;
@@ -2817,7 +2939,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
   RTP_SESSION_LOCK (sess);
   /* update pinfo stats */
   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
-      -1, ntpnstime);
+      running_time, ntpnstime);
 
   /* start processing the compound packet */
   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
@@ -2850,8 +2972,14 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
       case GST_RTCP_TYPE_PSFB:
         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
         break;
+      case GST_RTCP_TYPE_XR:
+        /* FIXME: This block is added to downgrade warning level.
+         * Once the parser is implemented, it should be replaced with
+         * a proper process function. */
+        GST_DEBUG ("got RTCP XR packet, but ignored");
+        break;
       default:
-        GST_WARNING ("got unknown RTCP packet");
+        GST_WARNING ("got unknown RTCP packet type: %d", type);
         break;
     }
     more = gst_rtcp_packet_move_to_next (&packet);
@@ -2937,6 +3065,10 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
       if (source) {
         rtp_source_update_caps (source, caps);
+
+        if (created)
+          on_new_sender_ssrc (sess, source);
+
         g_object_unref (source);
       }
     }
@@ -2954,8 +3086,8 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
  * @current_time: the current system time
  * @running_time: the running time of @data
  *
- * Send the RTP buffer in the session manager. This function takes ownership of
- * @buffer.
+ * Send the RTP data (a buffer or buffer list) in the session manager. This
+ * function takes ownership of @data.
  *
  * Returns: a #GstFlowReturn.
  */
@@ -2984,6 +3116,10 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
   if (created)
     on_new_sender_ssrc (sess, source);
 
+  if (!source->internal)
+    /* FIXME: Send GstRTPCollision upstream  */
+    goto collision;
+
   prevsender = RTP_SOURCE_IS_SENDER (source);
   oldrate = source->bitrate;
 
@@ -3008,6 +3144,15 @@ invalid_packet:
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
+collision:
+  {
+    g_object_unref (source);
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    RTP_SESSION_UNLOCK (sess);
+    GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
+        pinfo.ssrc);
+    return GST_FLOW_OK;
+  }
 }
 
 static void
@@ -3317,7 +3462,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data)
 
     /* fill in sender report info */
     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
-        ntptime, rtptime, packet_count, octet_count);
+        sess->timestamp_sender_reports ? ntptime : 0,
+        sess->timestamp_sender_reports ? rtptime : 0,
+        packet_count, octet_count);
   } else {
     /* we are only receiver, create RR */
     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
@@ -3355,8 +3502,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
     return;
   }
 
-  /* only report about other sender */
-  if (source == data->source)
+  /* only report about remote sources */
+  if (source->internal)
     goto reported;
 
   if (!RTP_SOURCE_IS_SENDER (source)) {
@@ -3364,6 +3511,11 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
     goto reported;
   }
 
+  if (source->disable_rtcp) {
+    GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+    goto reported;
+  }
+
   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
 
   /* get new stats */
@@ -3490,15 +3642,64 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data)
 static void
 session_nack (const gchar * key, RTPSource * source, ReportData * data)
 {
+  RTPSession *sess = data->sess;
   GstRTCPBuffer *rtcp = &data->rtcpbuf;
   GstRTCPPacket *packet = &data->packet;
-  guint32 *nacks;
-  guint n_nacks, i;
+  guint16 *nacks;
+  GstClockTime *nack_deadlines;
+  guint n_nacks, i = 0;
+  guint nacked_seqnums = 0;
+  guint16 n_fb_nacks = 0;
   guint8 *fci_data;
 
   if (!source->send_nack)
     return;
 
+  nacks = rtp_source_get_nacks (source, &n_nacks);
+  nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
+  GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
+      GST_TIME_ARGS (data->current_time));
+
+  /* cleanup expired nacks */
+  for (i = 0; i < n_nacks; i++) {
+    GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
+        GST_TIME_ARGS (nack_deadlines[i]));
+    if (nack_deadlines[i] >= data->current_time)
+      break;
+  }
+
+  if (data->is_early) {
+    /* don't remove them all if this is an early RTCP packet. It may happen
+     * that the NACKs are late due to high RTT, not sending NACKs at all would
+     * keep the RTX RTT stats high and maintain a dropping state. */
+    i = MIN (n_nacks - 1, i);
+  }
+
+  if (i) {
+    GST_WARNING ("Removing %u expired NACKS", i);
+    rtp_source_clear_nacks (source, i);
+    n_nacks -= i;
+    if (n_nacks == 0)
+      return;
+  }
+
+  /* allow overriding NACK to packet conversion */
+  if (g_signal_has_handler_pending (sess,
+          rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
+    /* this is needed as it will actually resize the buffer */
+    gst_rtcp_buffer_unmap (rtcp);
+
+    g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
+        data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
+        &nacked_seqnums);
+
+    /* and now remap for the remaining work */
+    gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
+
+    if (nacked_seqnums > 0)
+      goto done;
+  }
+
   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
     /* exit because the packet is full, will put next request in a
      * further packet */
@@ -3508,19 +3709,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data)
   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
 
-  nacks = rtp_source_get_nacks (source, &n_nacks);
-  GST_DEBUG ("%u NACKs", n_nacks);
-  if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
+  if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
+    gst_rtcp_packet_remove (packet);
+    GST_WARNING ("no nacks fit in the packet");
     return;
+  }
 
   fci_data = gst_rtcp_packet_fb_get_fci (packet);
-  for (i = 0; i < n_nacks; i++) {
-    GST_WRITE_UINT32_BE (fci_data, nacks[i]);
+  for (i = 0; i < n_nacks; i = nacked_seqnums) {
+    guint16 seqnum = nacks[i];
+    guint16 blp = 0;
+    guint j;
+
+    if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
+      break;
+
+    n_fb_nacks++;
+    nacked_seqnums++;
+
+    for (j = i + 1; j < n_nacks; j++) {
+      gint diff;
+
+      diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
+      GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
+      if (diff > 16)
+        break;
+
+      blp |= 1 << (diff - 1);
+      nacked_seqnums++;
+    }
+
+    GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
     fci_data += 4;
-    data->nacked_seqnums++;
   }
 
-  rtp_source_clear_nacks (source);
+  GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
+  source->stats.sent_nack_count += n_fb_nacks;
+
+done:
+  data->nacked_seqnums += nacked_seqnums;
+  rtp_source_clear_nacks (source, nacked_seqnums);
   data->may_suppress = FALSE;
 }
 
@@ -3541,8 +3769,8 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   /* check for outdated collisions */
   if (source->internal) {
     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
-    rtp_source_timeout (source, data->current_time,
-        data->running_time - sess->rtcp_feedback_retention_window);
+    rtp_source_timeout (source, data->current_time, data->running_time,
+        sess->rtcp_feedback_retention_window);
   }
 
   /* nothing else to do when without RTCP */
@@ -3764,8 +3992,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
   else
     data->is_early = FALSE;
 
-  if (data->is_early && sess->next_early_rtcp_time < current_time) {
-    GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+  if (data->is_early && sess->next_early_rtcp_time <= current_time) {
+    GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
         GST_TIME_ARGS (current_time));
   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
@@ -3880,6 +4108,12 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
   if (sess->scheduled_bye && !source->marked_bye)
     return;
 
+  /* skip if RTCP is disabled */
+  if (source->disable_rtcp) {
+    GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+    return;
+  }
+
   data->source = source;
 
   /* open packet */
@@ -3941,6 +4175,47 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data)
   }
 }
 
+static void
+schedule_remaining_nacks (const gchar * key, RTPSource * source,
+    ReportData * data)
+{
+  RTPSession *sess = data->sess;
+  GstClockTime *nack_deadlines;
+  GstClockTime deadline;
+  guint n_nacks;
+
+  if (!source->send_nack)
+    return;
+
+  /* the scheduling is entirely based on available bandwidth, just take the
+   * biggest seqnum, which will have the largest deadline to request early
+   * RTCP. */
+  nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
+  deadline = nack_deadlines[n_nacks - 1];
+  RTP_SESSION_UNLOCK (sess);
+  rtp_session_send_rtcp_with_deadline (sess, deadline);
+  RTP_SESSION_LOCK (sess);
+}
+
+static gboolean
+rtp_session_are_all_sources_bye (RTPSession * sess)
+{
+  GHashTableIter iter;
+  RTPSource *src;
+
+  RTP_SESSION_LOCK (sess);
+  g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
+  while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
+    if (src->internal && !src->sent_bye) {
+      RTP_SESSION_UNLOCK (sess);
+      return FALSE;
+    }
+  }
+  RTP_SESSION_UNLOCK (sess);
+
+  return TRUE;
+}
+
 /**
  * rtp_session_on_timeout:
  * @sess: an #RTPSession
@@ -3967,6 +4242,7 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   ReportData data = { GST_RTCP_BUFFER_INIT };
   GHashTable *table_copy;
   ReportOutput *output;
+  gboolean all_empty = FALSE;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
@@ -4030,6 +4306,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   if (!is_rtcp_time (sess, current_time, &data))
     goto done;
 
+  /* check if all the buffers are empty afer generation */
+  all_empty = TRUE;
+
   GST_DEBUG
       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
@@ -4055,7 +4334,7 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
 
   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
-      GST_TIME_ARGS (sess->last_rtcp_send_time),
+      GST_TIME_ARGS (sess->last_rtcp_check_time),
       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
   sess->last_rtcp_check_time = data.current_time;
   sess->first_rtcp = FALSE;
@@ -4080,8 +4359,8 @@ done:
 
     empty_buffer = gst_buffer_get_size (buffer) == 0;
 
-    if (empty_buffer)
-      g_warning ("rtpsession: Trying to send an empty RTCP packet");
+    if (!empty_buffer)
+      all_empty = FALSE;
 
     if (sess->callbacks.send_rtcp &&
         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
@@ -4093,8 +4372,8 @@ done:
       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
           sess->stats.avg_rtcp_packet_size, packet_size);
       result =
-          sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
-          sess->send_rtcp_user_data);
+          sess->callbacks.send_rtcp (sess, source, buffer,
+          rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
 
       RTP_SESSION_LOCK (sess);
       sess->stats.nacks_sent += data.nacked_seqnums;
@@ -4115,6 +4394,16 @@ done:
     g_object_unref (source);
     g_slice_free (ReportOutput, output);
   }
+
+  if (all_empty)
+    GST_ERROR ("generated empty RTCP messages for all the sources");
+
+  /* schedule remaining nacks */
+  RTP_SESSION_LOCK (sess);
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) schedule_remaining_nacks, &data);
+  RTP_SESSION_UNLOCK (sess);
+
   return result;
 }
 
@@ -4287,6 +4576,35 @@ end:
 }
 
 static gboolean
+rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
+    GstClockTime max_delay)
+{
+  /* notify the application that we intend to send early RTCP */
+  if (sess->callbacks.notify_early_rtcp)
+    sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
+
+  return rtp_session_request_early_rtcp (sess, now, max_delay);
+}
+
+static gboolean
+rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
+{
+  GstClockTime now, max_delay;
+
+  if (!sess->callbacks.send_rtcp)
+    return FALSE;
+
+  now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+  if (deadline < now)
+    return FALSE;
+
+  max_delay = deadline - now;
+
+  return rtp_session_send_rtcp_internal (sess, now, max_delay);
+}
+
+static gboolean
 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
 {
   GstClockTime now;
@@ -4296,7 +4614,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
 
   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
 
-  return rtp_session_request_early_rtcp (sess, now, max_delay);
+  return rtp_session_send_rtcp_internal (sess, now, max_delay);
 }
 
 gboolean
@@ -4305,11 +4623,6 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
 {
   RTPSource *src;
 
-  if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
-    GST_DEBUG ("FIR/PLI not sent");
-    return FALSE;
-  }
-
   RTP_SESSION_LOCK (sess);
   src = find_source (sess, ssrc);
   if (src == NULL)
@@ -4327,6 +4640,10 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
   }
   RTP_SESSION_UNLOCK (sess);
 
+  if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+    GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
+  }
+
   return TRUE;
 
   /* ERRORS */
@@ -4353,21 +4670,27 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
     GstClockTime max_delay)
 {
   RTPSource *source;
+  GstClockTime now;
 
-  if (!rtp_session_send_rtcp (sess, max_delay)) {
-    GST_DEBUG ("NACK not sent");
+  if (!sess->callbacks.send_rtcp)
     return FALSE;
-  }
+
+  now = sess->callbacks.request_time (sess, sess->request_time_user_data);
 
   RTP_SESSION_LOCK (sess);
   source = find_source (sess, ssrc);
   if (source == NULL)
     goto no_source;
 
-  GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
-  rtp_source_register_nack (source, seqnum);
+  GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
+      ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
+  rtp_source_register_nack (source, seqnum, now + max_delay);
   RTP_SESSION_UNLOCK (sess);
 
+  if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
+    GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
+  }
+
   return TRUE;
 
   /* ERRORS */