rtpsession: Keep local conflicting addresses in the session
authorOlivier Crête <olivier.crete@ocrete.ca>
Sat, 3 May 2014 22:30:20 +0000 (18:30 -0400)
committerOlivier Crête <olivier.crete@ocrete.ca>
Sat, 3 May 2014 22:30:20 +0000 (18:30 -0400)
As we now replace the local RTPSource on a conflict, it's no longer possible
to keep local conflicts in the RTPSource, so they instead need to be kept
in the RTPSession.

Also fix the rtpcollision test to generate multiple collisions instead of
one by change the address, as otherwise we detected that it was a single one.

gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h
tests/check/elements/rtpcollision.c

index c294272..fd4ddb0 100644 (file)
@@ -100,11 +100,6 @@ enum
    (avg) = ((val) + (15 * (avg))) >> 4;
 
 
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -546,6 +541,9 @@ rtp_session_finalize (GObject * object)
 
   gst_structure_free (sess->sdes);
 
+  g_list_free_full (sess->conflicting_addresses,
+      (GDestroyNotify) rtp_conflicting_address_free);
+
   for (i = 0; i < 32; i++)
     g_hash_table_destroy (sess->ssrcs[i]);
 
@@ -1207,6 +1205,43 @@ static RTPSourceCallbacks callbacks = {
   (RTPSourceClockRate) source_clock_rate,
 };
 
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+    GSocketAddress * address, GstClockTime time)
+{
+  return find_conflicting_address (session->conflicting_addresses, address,
+      time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+    GSocketAddress * address, GstClockTime time)
+{
+  sess->conflicting_addresses =
+      add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
 static gboolean
 check_collision (RTPSession * sess, RTPSource * source,
     RTPPacketInfo * pinfo, gboolean rtp)
@@ -1292,7 +1327,7 @@ check_collision (RTPSession * sess, RTPSource * source,
      */
   } else {
     /* This is sending with our ssrc, is it an address we already know */
-    if (rtp_source_find_conflicting_address (source, pinfo->address,
+    if (rtp_session_find_conflicting_address (sess, pinfo->address,
             pinfo->current_time)) {
       /* Its a known conflict, its probably a loop, not a collision
        * lets just drop the incoming packet
@@ -1300,7 +1335,7 @@ check_collision (RTPSession * sess, RTPSource * source,
       GST_DEBUG ("Our packets are being looped back to us, dropping");
     } else {
       /* Its a new collision, lets change our SSRC */
-      rtp_source_add_conflicting_address (source, pinfo->address,
+      rtp_session_add_conflicting_address (sess, pinfo->address,
           pinfo->current_time);
 
       GST_DEBUG ("Collision for SSRC %x", ssrc);
@@ -3179,8 +3214,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
   if (source->internal) {
     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
     rtp_source_timeout (source, data->current_time,
-        /* "a relatively long time" -- RFC 3550 section 8.2 */
-        RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
         data->running_time - sess->rtcp_feedback_retention_window);
   }
 
@@ -3622,6 +3655,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
     g_object_unref (source);
   }
 
+  sess->conflicting_addresses =
+      timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
   /* Make a local copy of the hashtable. We need to do this because the
    * cleanup stage below releases the session lock. */
   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
index 87388ae..b567ee4 100644 (file)
@@ -204,6 +204,7 @@ typedef struct {
  * @callbacks: callbacks
  * @user_data: user data passed in callbacks
  * @stats: session statistics
+ * @conflicting_addresses: GList of conflicting addresses
  *
  * The RTP session manager object
  */
@@ -269,6 +270,8 @@ struct _RTPSession {
   gboolean     last_keyframe_all_headers;
 
   gboolean      is_doing_ptp;
+
+  GList         *conflicting_addresses;
 };
 
 /**
index 9a56955..d47c85f 100644 (file)
@@ -267,11 +267,11 @@ rtp_source_init (RTPSource * src)
   rtp_source_reset (src);
 }
 
-static void
+void
 rtp_conflicting_address_free (RTPConflictingAddress * addr)
 {
   g_object_unref (addr->address);
-  g_free (addr);
+  g_slice_free (RTPConflictingAddress, addr);
 }
 
 static void
@@ -292,10 +292,8 @@ rtp_source_finalize (GObject * object)
 
   gst_caps_replace (&src->caps, NULL);
 
-  g_list_foreach (src->conflicting_addresses,
-      (GFunc) rtp_conflicting_address_free, NULL);
-  g_list_free (src->conflicting_addresses);
-
+  g_list_free_full (src->conflicting_addresses,
+      (GDestroyNotify) rtp_conflicting_address_free);
   while ((buffer = g_queue_pop_head (src->retained_feedback)))
     gst_buffer_unref (buffer);
   g_queue_free (src->retained_feedback);
@@ -1600,6 +1598,67 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost,
   return TRUE;
 }
 
+gboolean
+find_conflicting_address (GList * conflicting_addresses,
+    GSocketAddress * address, GstClockTime time)
+{
+  GList *item;
+
+  for (item = conflicting_addresses; item; item = g_list_next (item)) {
+    RTPConflictingAddress *known_conflict = item->data;
+
+    if (__g_socket_address_equal (address, known_conflict->address)) {
+      known_conflict->time = time;
+      return TRUE;
+    }
+  }
+
+  return FALSE;
+}
+
+GList *
+add_conflicting_address (GList * conflicting_addresses,
+    GSocketAddress * address, GstClockTime time)
+{
+  RTPConflictingAddress *new_conflict;
+
+  new_conflict = g_slice_new (RTPConflictingAddress);
+
+  new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
+  new_conflict->time = time;
+
+  return g_list_prepend (conflicting_addresses, new_conflict);
+}
+
+GList *
+timeout_conflicting_addresses (GList * conflicting_addresses,
+    GstClockTime current_time)
+{
+  GList *item;
+  /* "a relatively long time" -- RFC 3550 section 8.2 */
+  const GstClockTime collision_timeout =
+      RTP_STATS_MIN_INTERVAL * GST_SECOND * 10;
+
+  item = g_list_first (conflicting_addresses);
+  while (item) {
+    RTPConflictingAddress *known_conflict = item->data;
+    GList *next_item = g_list_next (item);
+
+    if (known_conflict->time < current_time - collision_timeout) {
+      gchar *buf;
+
+      conflicting_addresses = g_list_delete_link (conflicting_addresses, item);
+      buf = __g_socket_address_to_string (known_conflict->address);
+      GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
+      g_free (buf);
+      rtp_conflicting_address_free (known_conflict);
+    }
+    item = next_item;
+  }
+
+  return conflicting_addresses;
+}
+
 /**
  * rtp_source_find_conflicting_address:
  * @src: The source the packet came in
@@ -1615,19 +1674,7 @@ gboolean
 rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address,
     GstClockTime time)
 {
-  GList *item;
-
-  for (item = g_list_first (src->conflicting_addresses);
-      item; item = g_list_next (item)) {
-    RTPConflictingAddress *known_conflict = item->data;
-
-    if (__g_socket_address_equal (address, known_conflict->address)) {
-      known_conflict->time = time;
-      return TRUE;
-    }
-  }
-
-  return FALSE;
+  return find_conflicting_address (src->conflicting_addresses, address, time);
 }
 
 /**
@@ -1642,22 +1689,14 @@ void
 rtp_source_add_conflicting_address (RTPSource * src,
     GSocketAddress * address, GstClockTime time)
 {
-  RTPConflictingAddress *new_conflict;
-
-  new_conflict = g_new0 (RTPConflictingAddress, 1);
-
-  new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address));
-  new_conflict->time = time;
-
-  src->conflicting_addresses = g_list_prepend (src->conflicting_addresses,
-      new_conflict);
+  src->conflicting_addresses =
+      add_conflicting_address (src->conflicting_addresses, address, time);
 }
 
 /**
  * rtp_source_timeout:
  * @src: The #RTPSource
  * @current_time: The current time
- * @collision_timeout: The amount of time after which a collision is timed out
  * @feedback_retention_window: The running time before which retained feedback
  * packets have to be discarded
  *
@@ -1666,29 +1705,12 @@ rtp_source_add_conflicting_address (RTPSource * src,
  */
 void
 rtp_source_timeout (RTPSource * src, GstClockTime current_time,
-    GstClockTime collision_timeout, GstClockTime feedback_retention_window)
+    GstClockTime feedback_retention_window)
 {
-  GList *item;
   GstRTCPPacket *pkt;
 
-  item = g_list_first (src->conflicting_addresses);
-  while (item) {
-    RTPConflictingAddress *known_conflict = item->data;
-    GList *next_item = g_list_next (item);
-
-    if (known_conflict->time < current_time - collision_timeout) {
-      gchar *buf;
-
-      src->conflicting_addresses =
-          g_list_delete_link (src->conflicting_addresses, item);
-      buf = __g_socket_address_to_string (known_conflict->address);
-      GST_DEBUG ("collision %p timed out: %s", known_conflict, buf);
-      g_free (buf);
-      g_object_unref (known_conflict->address);
-      g_free (known_conflict);
-    }
-    item = next_item;
-  }
+  src->conflicting_addresses =
+      timeout_conflicting_addresses (src->conflicting_addresses, current_time);
 
   /* Time out AVPF packets that are older than the desired length */
   while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
index 9af7d8a..ed4adc9 100644 (file)
@@ -266,9 +266,20 @@ void            rtp_source_add_conflicting_address (RTPSource * src,
                                                 GSocketAddress *address,
                                                 GstClockTime time);
 
+gboolean        find_conflicting_address       (GList * conflicting_address,
+                                                GSocketAddress * address,
+                                                GstClockTime time);
+
+GList *         add_conflicting_address        (GList * conflicting_addresses,
+                                                GSocketAddress * address,
+                                                GstClockTime time);
+GList *         timeout_conflicting_addresses  (GList * conflicting_addresses,
+                                                GstClockTime current_time);
+
+void            rtp_conflicting_address_free   (RTPConflictingAddress * addr);
+
 void            rtp_source_timeout             (RTPSource * src,
                                                 GstClockTime current_time,
-                                                GstClockTime collision_timeout,
                                                 GstClockTime feedback_retention_window);
 
 void            rtp_source_retain_rtcp_packet  (RTPSource * src,
index 6c09ef1..e9528f9 100644 (file)
@@ -76,10 +76,10 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin)
 }
 
 static GstBuffer *
-create_rtcp_app (guint32 ssrc)
+create_rtcp_app (guint32 ssrc, guint count)
 {
   GInetAddress *inet_addr_0;
-  guint16 port = 5678;
+  guint16 port = 5678 + count;
   GSocketAddress *socket_addr_0;
   GstBuffer *rtcp_buffer;
   GstRTCPPacket *rtcp_packet = NULL;
@@ -140,7 +140,7 @@ rtpsession_sinkpad_probe (GstPad * pad, GstPadProbeInfo * info,
      * (note that after being marked as collied the rtpsession ignores
      * all non bye packets)
      */
-    rtcp_buffer = create_rtcp_app (ssrc);
+    rtcp_buffer = create_rtcp_app (ssrc, nb_ssrc_changes);
 
     /* push collied packet on recv_rtcp_sink */
     gst_pad_push (srcpad, rtcp_buffer);
@@ -313,7 +313,7 @@ rtpsession_sinkpad_probe2 (GstPad * pad, GstPadProbeInfo * info,
      * all non bye packets)
      */
     if (i == 2) {
-      GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before);
+      GstBuffer *rtcp_buffer = create_rtcp_app (rtx_ssrc_before, 0);
 
       /* push collied packet on recv_rtcp_sink */
       gst_pad_push (srcpad, rtcp_buffer);