else \
(avg) = ((val) + (15 * (avg))) >> 4;
+/* The number RTCP intervals after which to timeout entries in the
+ * collision table
+ */
+#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
+
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
+static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
+ const gchar * reason);
+static GstClockTime calculate_rtcp_interval (RTPSession * sess,
+ gboolean deterministic, gboolean first);
static void
rtp_session_class_init (RTPSessionClass * klass)
(RTPSourceClockRate) source_clock_rate,
};
+/**
+ * find_add_conflicting_addresses:
+ * @sess: The session to check in
+ * @arrival: The arrival stats for the buffer
+ *
+ * Checks if an address which has a conflict is already known,
+ * otherwise remembers it to prevent loops.
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+
static gboolean
-check_collision (RTPSession * sess, RTPSource * source,
- RTPArrivalStats * arrival)
+find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival)
{
- /* FIXME, do collision check */
+ GList *item;
+ RTPConflictingAddress *new_conflict;
+
+ for (item = g_list_first (sess->conflicting_addresses);
+ item; item = g_list_next (item)) {
+ RTPConflictingAddress *known_conflict = item->data;
+
+ if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) {
+ known_conflict->time = arrival->time;
+ return TRUE;
+ }
+ }
+
+ new_conflict = g_new0 (RTPConflictingAddress, 1);
+
+ memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress));
+ new_conflict->time = arrival->time;
+
+ sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses,
+ new_conflict);
+
return FALSE;
}
+static gboolean
+check_collision (RTPSession * sess, RTPSource * source,
+ RTPArrivalStats * arrival, gboolean rtp)
+{
+ /* If we have not arrival address, we can't do collision checking */
+ if (!arrival->have_address) {
+ return FALSE;
+ }
+
+ if (sess->source != source) {
+ /* This is not our local source, but lets check if two remote
+ * source collide
+ */
+
+ if (rtp) {
+ if (source->have_rtp_from) {
+ if (gst_netaddress_equal (&source->rtp_from, &arrival->address))
+ /* Address is the same */
+ return FALSE;
+ } else {
+ /* We don't already have a from address for RTP, just set it */
+ rtp_source_set_rtp_from (source, &arrival->address);
+ return FALSE;
+ }
+ } else {
+ if (source->have_rtcp_from) {
+ if (gst_netaddress_equal (&source->rtcp_from, &arrival->address))
+ /* Address is the same */
+ return FALSE;
+ } else {
+ /* We don't already have a from address for RTCP, just set it */
+ rtp_source_set_rtcp_from (source, &arrival->address);
+ return FALSE;
+ }
+ }
+
+ /* In this case, we have third-party collision or loop */
+
+ /* FIXME: Log 3rd party collision somehow
+ * Maybe should be done in upper layer, only the SDES can tell us
+ * if its a collision or a loop
+ */
+ } else {
+ /* This is sending with our ssrc, is it an address we already know */
+
+ if (find_add_conflicting_addresses (sess, arrival)) {
+ /* Its a known conflict, its probably a loop, not a collision
+ * lets just drop the incoming packet
+ */
+ GST_DEBUG ("Our packets are being looped back to us, dropping");
+ } else {
+ /* Its a new collision, lets change our SSRC */
+
+ GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
+ on_ssrc_collision (sess, source);
+
+ rtp_session_send_bye_locked (sess, "SSRC Collision");
+
+ sess->change_ssrc = TRUE;
+ }
+ }
+
+ return TRUE;
+}
+
+
/* must be called with the session lock */
static RTPSource *
obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
} else {
*created = FALSE;
/* check for collision, this updates the address when not previously set */
- if (check_collision (sess, source, arrival))
- on_ssrc_collision (sess, source);
+ if (check_collision (sess, source, arrival, rtp)) {
+ return NULL;
+ }
}
/* update last activity */
source->last_activity = arrival->time;
return result;
}
+static guint32
+rtp_session_create_new_ssrc (RTPSession * sess)
+{
+ guint32 ssrc;
+
+ while (TRUE) {
+ ssrc = g_random_int ();
+
+ /* see if it exists in the session, we're done if it doesn't */
+ if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (ssrc)) == NULL)
+ break;
+ }
+
+ return ssrc;
+}
+
+
/**
* rtp_session_create_source:
* @sess: an #RTPSession
RTPSource *source;
RTP_SESSION_LOCK (sess);
- while (TRUE) {
- ssrc = g_random_int ();
-
- /* see if it exists in the session, we're done if it doesn't */
- if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (ssrc)) == NULL)
- break;
- }
+ ssrc = rtp_session_create_new_ssrc (sess);
source = rtp_source_new (ssrc);
g_object_ref (source);
rtp_source_set_callbacks (source, &callbacks, sess);
ssrc = gst_rtp_buffer_get_ssrc (buffer);
source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+ if (!source)
+ goto collision;
+
prevsender = RTP_SOURCE_IS_SENDER (source);
prevactive = RTP_SOURCE_IS_ACTIVE (source);
GST_DEBUG ("ignoring RTP packet because we are leaving");
return GST_FLOW_OK;
}
+collision:
+ {
+ gst_buffer_unref (buffer);
+ RTP_SESSION_UNLOCK (sess);
+ GST_DEBUG ("ignoring packet because its collisioning");
+ return GST_FLOW_OK;
+ }
}
static void
source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ if (!source)
+ return;
+
GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
prevsender = RTP_SOURCE_IS_SENDER (source);
source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ if (!source)
+ return;
+
if (created)
on_new_ssrc (sess, source);
source = obtain_source (sess, ssrc, &created, arrival, FALSE);
changed = FALSE;
+ if (!source)
+ return;
+
more_entries = gst_rtcp_packet_sdes_first_entry (packet);
j = 0;
while (more_entries) {
/* find src and mark bye, no probation when dealing with RTCP */
source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ if (!source)
+ return;
+
/* store time for when we need to time out this source */
source->bye_time = arrival->time;
}
/**
- * rtp_session_send_bye:
+ * rtp_session_send_bye_locked:
* @sess: an #RTPSession
* @reason: a reason or NULL
*
* Stop the current @sess and schedule a BYE message for the other members.
*
+ * One must have the session lock to call this function
+ *
* Returns: a #GstFlowReturn.
*/
-GstFlowReturn
-rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+static GstFlowReturn
+rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason)
{
GstFlowReturn result = GST_FLOW_OK;
RTPSource *source;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
- RTP_SESSION_LOCK (sess);
source = sess->source;
/* ignore more BYEs */
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
done:
+
+ return result;
+}
+
+/**
+ * rtp_session_send_bye:
+ * @sess: an #RTPSession
+ * @reason: a reason or NULL
+ *
+ * Stop the current @sess and schedule a BYE message for the other members.
+ *
+ * One must have the session lock to call this function
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+{
+ GstFlowReturn result = GST_FLOW_OK;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+ RTP_SESSION_LOCK (sess);
+ result = rtp_session_send_bye_locked (sess, reason);
RTP_SESSION_UNLOCK (sess);
return result;
rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
{
GstFlowReturn result = GST_FLOW_OK;
+ GList *item;
ReportData data;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
}
+
+ /* check for outdated collisions */
+ item = g_list_first (sess->conflicting_addresses);
+ while (item) {
+ RTPConflictingAddress *known_conflict = item->data;
+ GList *next_item = g_list_next (item);
+
+ if (known_conflict->time < time - (data.interval *
+ RTCP_INTERVAL_COLLISION_TIMEOUT)) {
+ sess->conflicting_addresses =
+ g_list_delete_link (sess->conflicting_addresses, item);
+ g_free (known_conflict);
+ }
+ item = next_item;
+ }
+
+ if (sess->change_ssrc) {
+ g_hash_table_steal (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (sess->source->ssrc));
+
+ sess->source->ssrc = rtp_session_create_new_ssrc (sess);
+ rtp_source_reset (sess->source);
+
+ g_hash_table_insert (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (sess->source->ssrc), sess->source);
+
+ g_free (sess->bye_reason);
+ sess->bye_reason = NULL;
+ sess->sent_bye = FALSE;
+ sess->change_ssrc = FALSE;
+ }
RTP_SESSION_UNLOCK (sess);
/* push out the RTCP packet */