gst/rtpmanager/rtpsession.*: Implement collision and loop detection in rtpmanager.
authorOlivier Crete <tester@tester.ca>
Tue, 11 Mar 2008 12:40:58 +0000 (12:40 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:34 +0000 (02:30 +0100)
Original commit message from CVS:
Patch by: Olivier Crete <tester at tester dot ca>
* gst/rtpmanager/rtpsession.c: (find_add_conflicting_addresses),
(check_collision), (obtain_source), (rtp_session_create_new_ssrc),
(rtp_session_create_source), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_sdes), (rtp_session_process_bye),
(rtp_session_send_bye_locked), (rtp_session_send_bye),
(rtp_session_on_timeout):
* gst/rtpmanager/rtpsession.h:
Implement collision and loop detection in rtpmanager.
Fixes #520626.
* gst/rtpmanager/rtpsource.c: (rtp_source_reset),
(rtp_source_init):
* gst/rtpmanager/rtpsource.h:
Add method to reset stats.

gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h

index 3b2951a..0eaf3e6 100644 (file)
@@ -82,6 +82,11 @@ enum
   else                                         \
    (avg) = ((val) + (15 * (avg))) >> 4;
 
+/* The number RTCP intervals after which to timeout entries in the
+ * collision table
+ */
+#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
+
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -95,6 +100,10 @@ G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
 
 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
+static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
+    const gchar * reason);
+static GstClockTime calculate_rtcp_interval (RTPSession * sess,
+    gboolean deterministic, gboolean first);
 
 static void
 rtp_session_class_init (RTPSessionClass * klass)
@@ -852,14 +861,110 @@ static RTPSourceCallbacks callbacks = {
   (RTPSourceClockRate) source_clock_rate,
 };
 
+/**
+ * find_add_conflicting_addresses:
+ * @sess: The session to check in
+ * @arrival: The arrival stats for the buffer
+ *
+ * Checks if an address which has a conflict is already known,
+ *  otherwise remembers it to prevent loops.
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+
 static gboolean
-check_collision (RTPSession * sess, RTPSource * source,
-    RTPArrivalStats * arrival)
+find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival)
 {
-  /* FIXME, do collision check */
+  GList *item;
+  RTPConflictingAddress *new_conflict;
+
+  for (item = g_list_first (sess->conflicting_addresses);
+      item; item = g_list_next (item)) {
+    RTPConflictingAddress *known_conflict = item->data;
+
+    if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) {
+      known_conflict->time = arrival->time;
+      return TRUE;
+    }
+  }
+
+  new_conflict = g_new0 (RTPConflictingAddress, 1);
+
+  memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress));
+  new_conflict->time = arrival->time;
+
+  sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses,
+      new_conflict);
+
   return FALSE;
 }
 
+static gboolean
+check_collision (RTPSession * sess, RTPSource * source,
+    RTPArrivalStats * arrival, gboolean rtp)
+{
+  /* If we have not arrival address, we can't do collision checking */
+  if (!arrival->have_address) {
+    return FALSE;
+  }
+
+  if (sess->source != source) {
+    /* This is not our local source, but lets check if two remote
+     * source collide
+     */
+
+    if (rtp) {
+      if (source->have_rtp_from) {
+        if (gst_netaddress_equal (&source->rtp_from, &arrival->address))
+          /* Address is the same */
+          return FALSE;
+      } else {
+        /* We don't already have a from address for RTP, just set it */
+        rtp_source_set_rtp_from (source, &arrival->address);
+        return FALSE;
+      }
+    } else {
+      if (source->have_rtcp_from) {
+        if (gst_netaddress_equal (&source->rtcp_from, &arrival->address))
+          /* Address is the same */
+          return FALSE;
+      } else {
+        /* We don't already have a from address for RTCP, just set it */
+        rtp_source_set_rtcp_from (source, &arrival->address);
+        return FALSE;
+      }
+    }
+
+    /* In this case, we have third-party collision or loop */
+
+    /* FIXME: Log 3rd party collision somehow
+     * Maybe should be done in upper layer, only the SDES can tell us
+     * if its a collision or a loop
+     */
+  } else {
+    /* This is sending with our ssrc, is it an address we already know */
+
+    if (find_add_conflicting_addresses (sess, arrival)) {
+      /* Its a known conflict, its probably a loop, not a collision
+       * lets just drop the incoming packet
+       */
+      GST_DEBUG ("Our packets are being looped back to us, dropping");
+    } else {
+      /* Its a new collision, lets change our SSRC */
+
+      GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
+      on_ssrc_collision (sess, source);
+
+      rtp_session_send_bye_locked (sess, "SSRC Collision");
+
+      sess->change_ssrc = TRUE;
+    }
+  }
+
+  return TRUE;
+}
+
+
 /* must be called with the session lock */
 static RTPSource *
 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
@@ -901,8 +1006,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
   } else {
     *created = FALSE;
     /* check for collision, this updates the address when not previously set */
-    if (check_collision (sess, source, arrival))
-      on_ssrc_collision (sess, source);
+    if (check_collision (sess, source, arrival, rtp)) {
+      return NULL;
+    }
   }
   /* update last activity */
   source->last_activity = arrival->time;
@@ -1066,6 +1172,24 @@ rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
   return result;
 }
 
+static guint32
+rtp_session_create_new_ssrc (RTPSession * sess)
+{
+  guint32 ssrc;
+
+  while (TRUE) {
+    ssrc = g_random_int ();
+
+    /* see if it exists in the session, we're done if it doesn't */
+    if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+            GINT_TO_POINTER (ssrc)) == NULL)
+      break;
+  }
+
+  return ssrc;
+}
+
+
 /**
  * rtp_session_create_source:
  * @sess: an #RTPSession
@@ -1082,14 +1206,7 @@ rtp_session_create_source (RTPSession * sess)
   RTPSource *source;
 
   RTP_SESSION_LOCK (sess);
-  while (TRUE) {
-    ssrc = g_random_int ();
-
-    /* see if it exists in the session, we're done if it doesn't */
-    if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
-            GINT_TO_POINTER (ssrc)) == NULL)
-      break;
-  }
+  ssrc = rtp_session_create_new_ssrc (sess);
   source = rtp_source_new (ssrc);
   g_object_ref (source);
   rtp_source_set_callbacks (source, &callbacks, sess);
@@ -1176,6 +1293,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   ssrc = gst_rtp_buffer_get_ssrc (buffer);
   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
 
+  if (!source)
+    goto collision;
+
   prevsender = RTP_SOURCE_IS_SENDER (source);
   prevactive = RTP_SOURCE_IS_ACTIVE (source);
 
@@ -1246,6 +1366,13 @@ ignore:
     GST_DEBUG ("ignoring RTP packet because we are leaving");
     return GST_FLOW_OK;
   }
+collision:
+  {
+    gst_buffer_unref (buffer);
+    RTP_SESSION_UNLOCK (sess);
+    GST_DEBUG ("ignoring packet because its collisioning");
+    return GST_FLOW_OK;
+  }
 }
 
 static void
@@ -1303,6 +1430,9 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
 
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
 
+  if (!source)
+    return;
+
   GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -1343,6 +1473,9 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
 
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
 
+  if (!source)
+    return;
+
   if (created)
     on_new_ssrc (sess, source);
 
@@ -1375,6 +1508,9 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
     changed = FALSE;
 
+    if (!source)
+      return;
+
     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
     j = 0;
     while (more_entries) {
@@ -1428,6 +1564,9 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     /* find src and mark bye, no probation when dealing with RTCP */
     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
 
+    if (!source)
+      return;
+
     /* store time for when we need to time out this source */
     source->bye_time = arrival->time;
 
@@ -1677,16 +1816,18 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
 }
 
 /**
- * rtp_session_send_bye:
+ * rtp_session_send_bye_locked:
  * @sess: an #RTPSession
  * @reason: a reason or NULL
  *
  * Stop the current @sess and schedule a BYE message for the other members.
  *
+ * One must have the session lock to call this function
+ *
  * Returns: a #GstFlowReturn.
  */
-GstFlowReturn
-rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+static GstFlowReturn
+rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason)
 {
   GstFlowReturn result = GST_FLOW_OK;
   RTPSource *source;
@@ -1695,7 +1836,6 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason)
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
-  RTP_SESSION_LOCK (sess);
   source = sess->source;
 
   /* ignore more BYEs */
@@ -1728,6 +1868,30 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason)
   if (sess->callbacks.reconsider)
     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
 done:
+
+  return result;
+}
+
+/**
+ * rtp_session_send_bye:
+ * @sess: an #RTPSession
+ * @reason: a reason or NULL
+ *
+ * Stop the current @sess and schedule a BYE message for the other members.
+ *
+ * One must have the session lock to call this function
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+{
+  GstFlowReturn result = GST_FLOW_OK;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+  RTP_SESSION_LOCK (sess);
+  result = rtp_session_send_bye_locked (sess, reason);
   RTP_SESSION_UNLOCK (sess);
 
   return result;
@@ -2051,6 +2215,7 @@ GstFlowReturn
 rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
 {
   GstFlowReturn result = GST_FLOW_OK;
+  GList *item;
   ReportData data;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
@@ -2102,6 +2267,37 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
   }
+
+  /* check for outdated collisions */
+  item = g_list_first (sess->conflicting_addresses);
+  while (item) {
+    RTPConflictingAddress *known_conflict = item->data;
+    GList *next_item = g_list_next (item);
+
+    if (known_conflict->time < time - (data.interval *
+            RTCP_INTERVAL_COLLISION_TIMEOUT)) {
+      sess->conflicting_addresses =
+          g_list_delete_link (sess->conflicting_addresses, item);
+      g_free (known_conflict);
+    }
+    item = next_item;
+  }
+
+  if (sess->change_ssrc) {
+    g_hash_table_steal (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (sess->source->ssrc));
+
+    sess->source->ssrc = rtp_session_create_new_ssrc (sess);
+    rtp_source_reset (sess->source);
+
+    g_hash_table_insert (sess->ssrcs[sess->mask_idx],
+        GINT_TO_POINTER (sess->source->ssrc), sess->source);
+
+    g_free (sess->bye_reason);
+    sess->bye_reason = NULL;
+    sess->sent_bye = FALSE;
+    sess->change_ssrc = FALSE;
+  }
   RTP_SESSION_UNLOCK (sess);
 
   /* push out the RTCP packet */
index 5970330..e14e2da 100644 (file)
@@ -140,6 +140,20 @@ typedef struct {
 } RTPSessionCallbacks;
 
 /**
+ * RTPConflictingAddress:
+ * @address: #GstNetAddress which conflicted
+ * @last_conflict_time: time when the last conflict was seen
+ *
+ * This structure is used to account for addresses that have conflicted to find
+ * loops.
+ */
+
+typedef struct {
+  GstNetAddress address;
+  GstClockTime time;
+} RTPConflictingAddress;
+
+/**
  * RTPSession:
  * @lock: lock to protect the session
  * @source: the source of this session
@@ -149,6 +163,8 @@ typedef struct {
  * @activecount: the number of active sources
  * @callbacks: callbacks
  * @user_data: user data passed in callbacks
+ * @stats: session statistics
+ * @conflicting_addresses: GList of conflicting addresses
  *
  * The RTP session manager object
  */
@@ -187,6 +203,9 @@ struct _RTPSession {
 
   RTPSessionStats stats;
 
+  GList         *conflicting_addresses;
+  gboolean      change_ssrc;
+
   /* for mapping clock time to NTP time */
   GstClockTime  base_time;
 };
index 1938324..938a1d5 100644 (file)
@@ -142,6 +142,24 @@ rtp_source_class_init (RTPSourceClass * klass)
   GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
 }
 
+/**
+ * rtp_source_reset:
+ * @src: an #RTPSource
+ *
+ * Reset the stats of @src.
+ */
+void
+rtp_source_reset (RTPSource * src)
+{
+  src->received_bye = FALSE;
+
+  src->stats.cycles = -1;
+  src->stats.jitter = 0;
+  src->stats.transit = -1;
+  src->stats.curr_sr = 0;
+  src->stats.curr_rr = 0;
+}
+
 static void
 rtp_source_init (RTPSource * src)
 {
@@ -157,11 +175,7 @@ rtp_source_init (RTPSource * src)
   src->seqnum_base = -1;
   src->last_rtptime = -1;
 
-  src->stats.cycles = -1;
-  src->stats.jitter = 0;
-  src->stats.transit = -1;
-  src->stats.curr_sr = 0;
-  src->stats.curr_rr = 0;
+  rtp_source_reset (src);
 }
 
 static void
index b731ae6..1eae0c1 100644 (file)
@@ -215,4 +215,6 @@ gboolean        rtp_source_get_last_rb         (RTPSource *src, guint8 *fraction
                                                 guint32 *exthighestseq, guint32 *jitter,
                                                 guint32 *lsr, guint32 *dlsr);
 
+void            rtp_source_reset               (RTPSource * src);
+
 #endif /* __RTP_SOURCE_H__ */