session: delay allocation of internal source
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 26 Jul 2013 08:24:22 +0000 (10:24 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 26 Jul 2013 10:18:01 +0000 (12:18 +0200)
Allocate the internal source when we receive a caps with the SSRC or when we see
a buffer with the SSRC.

gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h

index 56da482..8365d66 100644 (file)
@@ -463,8 +463,6 @@ rtp_session_init (RTPSession * sess)
 {
   gint i;
   gchar *str;
-  guint32 ssrc;
-  gboolean created;
 
   g_mutex_init (&sess->lock);
   sess->key = g_random_int ();
@@ -511,9 +509,8 @@ rtp_session_init (RTPSession * sess)
 
   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
 
-  /* create an active SSRC for this session manager */
-  ssrc = rtp_session_create_new_ssrc (sess);
-  sess->source = obtain_internal_source (sess, ssrc, &created);
+  /* this is the SSRC we suggest */
+  sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
 
   sess->first_rtcp = TRUE;
   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
@@ -540,7 +537,6 @@ rtp_session_finalize (GObject * object)
   for (i = 0; i < 32; i++)
     g_hash_table_destroy (sess->ssrcs[i]);
 
-  g_object_unref (sess->source);
   g_mutex_clear (&sess->lock);
 
   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
@@ -655,7 +651,8 @@ rtp_session_get_property (GObject * object, guint prop_id,
       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
       break;
     case PROP_INTERNAL_SOURCE:
-      g_value_set_object (value, sess->source);
+      /* FIXME, return a random source */
+      g_value_set_object (value, NULL);
       break;
     case PROP_BANDWIDTH:
       g_value_set_double (value, sess->bandwidth);
@@ -1389,26 +1386,6 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
   return source;
 }
 
-static void
-rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
-{
-  if (ssrc != sess->source->ssrc) {
-    g_hash_table_steal (sess->ssrcs[sess->mask_idx],
-        GINT_TO_POINTER (sess->source->ssrc));
-
-    GST_DEBUG ("setting internal SSRC to %08x", ssrc);
-    /* After this call, any receiver of the old SSRC either in RTP or RTCP
-     * packets will timeout on the old SSRC, we could potentially schedule a
-     * BYE RTCP for the old SSRC... */
-    sess->source->ssrc = ssrc;
-    rtp_source_reset (sess->source);
-
-    /* rehash with the new SSRC */
-    g_hash_table_insert (sess->ssrcs[sess->mask_idx],
-        GINT_TO_POINTER (sess->source->ssrc), sess->source);
-  }
-}
-
 /**
  * rtp_session_suggest_ssrc:
  * @sess: a #RTPSession
@@ -1670,9 +1647,13 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
   gst_rtp_buffer_unmap (&rtp);
 
   RTP_SESSION_LOCK (sess);
+#if 0
+  /* FIXME, we should simply not update any stats on the BYE
+   * internal sources */
   /* ignore more RTP packets when we left the session */
   if (sess->source->marked_bye)
     goto ignore;
+#endif
 
   /* update arrival stats */
   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
@@ -1747,6 +1728,7 @@ invalid_packet:
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
+#if 0
 ignore:
   {
     RTP_SESSION_UNLOCK (sess);
@@ -1754,6 +1736,7 @@ ignore:
     GST_DEBUG ("ignoring RTP packet because we are leaving");
     return GST_FLOW_OK;
   }
+#endif
 collision:
   {
     RTP_SESSION_UNLOCK (sess);
@@ -1775,16 +1758,23 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
     guint8 fractionlost;
     gint32 packetslost;
+    RTPSource *src;
 
     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
 
     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
 
-    if (ssrc == sess->source->ssrc) {
+    /* find our own source */
+    src = find_source (sess, ssrc);
+    if (src == NULL)
+      continue;
+
+    if (src->internal) {
       /* only deal with report blocks for our session, we update the stats of
        * the sender of the RTCP message. We could also compare our stats against
        * the other sender to see if we are better or worse. */
+      /* FIXME, need to keep track who the RB block is from */
       rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
           packetslost, exthighestseq, jitter, lsr, dlsr);
     }
@@ -2146,19 +2136,17 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
   if (!src && sender_ssrc == 1) {
     GHashTableIter iter;
 
-    if (sess->stats.sender_sources >
-        RTP_SOURCE_IS_SENDER (sess->source) ? 2 : 1)
+    /* we can't find the source if there are multiple */
+    if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
       return;
 
     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
-
     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
-      if (src != sess->source && rtp_source_is_sender (src))
+      if (!src->internal && rtp_source_is_sender (src))
         break;
       src = NULL;
     }
   }
-
   if (!src)
     return;
 
@@ -2283,8 +2271,11 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
       ntpnstime);
 
+#if 0
+  /* FIXME, simply ignore RTCP for iternal sources with BYE */
   if (sess->source->sent_bye)
     goto ignore;
+#endif
 
   /* start processing the compound packet */
   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
@@ -2369,6 +2360,7 @@ invalid_packet:
     gst_buffer_unref (buffer);
     return GST_FLOW_OK;
   }
+#if 0
 ignore:
   {
     RTP_SESSION_UNLOCK (sess);
@@ -2377,6 +2369,7 @@ ignore:
     GST_DEBUG ("ignoring RTCP packet because we left");
     return GST_FLOW_OK;
   }
+#endif
 }
 
 /**
@@ -2399,12 +2392,18 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
 
   s = gst_caps_get_structure (caps, 0);
 
-  if (gst_structure_get_uint (s, "ssrc", &ssrc))
-    rtp_session_set_internal_ssrc (sess, ssrc);
+  if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
+    RTPSource *source;
+    gboolean created;
 
-  RTP_SESSION_LOCK (sess);
-  rtp_source_update_caps (sess->source, caps);
-  RTP_SESSION_UNLOCK (sess);
+    RTP_SESSION_LOCK (sess);
+    source = obtain_internal_source (sess, ssrc, &created);
+    if (source) {
+      rtp_source_update_caps (source, caps);
+      g_object_unref (source);
+    }
+    RTP_SESSION_UNLOCK (sess);
+  }
 }
 
 /**
@@ -2428,14 +2427,36 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
   RTPSource *source;
   gboolean prevsender;
   guint64 oldrate;
+  GstBuffer *buffer;
+  GstRTPBuffer rtp = { NULL };
+  guint32 ssrc;
+  gboolean created;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
 
   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
 
+  if (is_list) {
+    GstBufferList *list = GST_BUFFER_LIST_CAST (data);
+
+    buffer = gst_buffer_list_get (list, 0);
+    if (!buffer)
+      goto no_buffer;
+  } else {
+    buffer = GST_BUFFER_CAST (data);
+  }
+
+  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
+    goto invalid_packet;
+
+  /* get SSRC and look up in session database */
+  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+
+  gst_rtp_buffer_unmap (&rtp);
+
   RTP_SESSION_LOCK (sess);
-  source = sess->source;
+  source = obtain_internal_source (sess, ssrc, &created);
 
   /* update last activity */
   source->last_rtp_activity = current_time;
@@ -2454,7 +2475,22 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
     sess->recalc_bandwidth = TRUE;
   RTP_SESSION_UNLOCK (sess);
 
+  g_object_unref (source);
+
   return result;
+
+invalid_packet:
+  {
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    GST_DEBUG ("invalid RTP packet received");
+    return GST_FLOW_OK;
+  }
+no_buffer:
+  {
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    GST_DEBUG ("no buffer in list");
+    return GST_FLOW_OK;
+  }
 }
 
 static void
@@ -2640,10 +2676,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
   }
 
   if (sess->scheduled_bye) {
-    if (sess->source->sent_bye) {
-      GST_DEBUG ("we sent BYE already");
-      interval = GST_CLOCK_TIME_NONE;
-    } else if (sess->stats.active_sources >= 50) {
+    if (sess->stats.active_sources >= 50) {
       GST_DEBUG ("reconsider BYE, more than 50 sources");
       /* reconsider BYE if members >= 50 */
       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
index 2ac3388..341bcbd 100644 (file)
@@ -198,7 +198,6 @@ struct _RTPSession {
   guint        rtcp_rr_bandwidth;
   guint        rtcp_rs_bandwidth;
 
-  RTPSource    *source;
   guint32       suggested_ssrc;
 
   /* for sender/receiver counting */