rtpsource: Retain RTCP Feedback packets for a specified amount of time
authorOlivier CrĂȘte <olivier.crete@collabora.co.uk>
Tue, 22 Jun 2010 17:33:32 +0000 (13:33 -0400)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 1 Feb 2011 17:28:51 +0000 (18:28 +0100)
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h

index 5938d55..ec40115 100644 (file)
@@ -58,6 +58,7 @@ enum
 #define DEFAULT_NUM_ACTIVE_SOURCES   0
 #define DEFAULT_SOURCES              NULL
 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
+#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
 
 enum
 {
@@ -75,6 +76,7 @@ enum
   PROP_SOURCES,
   PROP_FAVOR_NEW,
   PROP_RTCP_MIN_INTERVAL,
+  PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
   PROP_LAST
 };
 
@@ -390,6 +392,15 @@ rtp_session_class_init (RTPSessionClass * klass)
           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class,
+      PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
+      g_param_spec_uint64 ("rtcp-feedback-retention-window",
+          "RTCP Feedback retention window",
+          "Duration during which RTCP Feedback packets are retained (in ns)",
+          0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
 
@@ -444,6 +455,7 @@ rtp_session_init (RTPSession * sess)
 
   sess->first_rtcp = TRUE;
   sess->allow_early = TRUE;
+  sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
 
   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
 }
@@ -2005,12 +2017,22 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
       GST_BUFFER_TIMESTAMP (fci) = arrival->running_time;
     }
 
+    RTP_SESSION_UNLOCK (sess);
     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
         type, fbtype, sender_ssrc, media_ssrc, fci);
+    RTP_SESSION_LOCK (sess);
 
     if (fci)
       gst_buffer_unref (fci);
   }
+
+  if (sess->rtcp_feedback_retention_window) {
+    RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (media_ssrc));
+
+    if (src)
+      rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+  }
 }
 
 /**
@@ -2829,7 +2851,8 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   /* check for outdated collisions */
   GST_DEBUG ("Timing out collisions");
   rtp_source_timeout (sess->source, current_time,
-      data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
+      data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT,
+      running_time - sess->rtcp_feedback_retention_window);
 
   if (sess->change_ssrc) {
     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
index b63bbcc..6ae318e 100644 (file)
@@ -202,6 +202,7 @@ struct _RTPSession {
 
   gboolean      change_ssrc;
   gboolean      favor_new;
+  GstClockTime  rtcp_feedback_retention_window;
 };
 
 /**
index 4a3af16..423d2b6 100644 (file)
@@ -238,6 +238,8 @@ rtp_source_init (RTPSource * src)
   src->seqnum_base = -1;
   src->last_rtptime = -1;
 
+  src->retained_feedback = g_queue_new ();
+
   rtp_source_reset (src);
 }
 
@@ -262,6 +264,10 @@ rtp_source_finalize (GObject * object)
   g_list_foreach (src->conflicting_addresses, (GFunc) g_free, NULL);
   g_list_free (src->conflicting_addresses);
 
+  while ((buffer = g_queue_pop_head (src->retained_feedback)))
+    gst_buffer_unref (buffer);
+  g_queue_free (src->retained_feedback);
+
   G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object);
 }
 
@@ -1719,14 +1725,18 @@ rtp_source_add_conflicting_address (RTPSource * src,
  * @src: The #RTPSource
  * @current_time: The current time
  * @collision_timeout: The amount of time after which a collision is timed out
+ * @feedback_retention_window: The running time before which retained feedback
+ * packets have to be discarded
  *
  * This is processed on each RTCP interval. It times out old collisions.
+ * It also times out old retained feedback packets
  */
 void
 rtp_source_timeout (RTPSource * src, GstClockTime current_time,
-    GstClockTime collision_timeout)
+    GstClockTime collision_timeout, GstClockTime feedback_retention_window)
 {
   GList *item;
+  GstRTCPPacket *pkt;
 
   item = g_list_first (src->conflicting_addresses);
   while (item) {
@@ -1744,4 +1754,41 @@ rtp_source_timeout (RTPSource * src, GstClockTime current_time,
     }
     item = next_item;
   }
+
+  /* Time out AVPF packets that are older than the desired length */
+  while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
+      GST_BUFFER_TIMESTAMP (pkt) < feedback_retention_window)
+    gst_buffer_unref (g_queue_pop_tail (src->retained_feedback));
+}
+
+static gint
+compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data)
+{
+  const GstBuffer *bufa = a;
+  const GstBuffer *bufb = b;
+
+  return GST_BUFFER_TIMESTAMP (bufa) - GST_BUFFER_TIMESTAMP (bufb);
+}
+
+void
+rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet,
+    GstClockTime running_time)
+{
+  GstBuffer *buffer;
+
+  buffer = gst_buffer_create_sub (packet->buffer, packet->offset,
+      (gst_rtcp_packet_get_length (packet) + 1) * 4);
+
+  GST_BUFFER_TIMESTAMP (buffer) = running_time;
+
+  g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL);
+}
+
+gboolean
+rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data)
+{
+  if (g_queue_find_custom (src->retained_feedback, data, func))
+    return TRUE;
+  else
+    return FALSE;
 }
index 94c22b3..fadbe30 100644 (file)
@@ -170,6 +170,8 @@ struct _RTPSource {
   RTPReceiverReport last_rr;
 
   GList         *conflicting_addresses;
+
+  GQueue        *retained_feedback;
 };
 
 struct _RTPSourceClass {
@@ -248,7 +250,16 @@ void            rtp_source_add_conflicting_address (RTPSource * src,
 
 void            rtp_source_timeout             (RTPSource * src,
                                                 GstClockTime current_time,
-                                                GstClockTime collision_timeout);
+                                                GstClockTime collision_timeout,
+                                                GstClockTime feedback_retention_window);
+
+void            rtp_source_retain_rtcp_packet  (RTPSource * src,
+                                                GstRTCPPacket *pkt,
+                                                GstClockTime running_time);
+
+gboolean        rtp_source_has_retained        (RTPSource * src,
+                                                GCompareFunc func,
+                                                gconstpointer data);
 
 
 #endif /* __RTP_SOURCE_H__ */