rtspsrc: improve RTP session handling
authorWim Taymans <wim.taymans@collabora.co.uk>
Thu, 23 Dec 2010 14:24:29 +0000 (15:24 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Thu, 23 Dec 2010 14:24:29 +0000 (15:24 +0100)
Store the RTP session in the stream so that we can more efficiently
perform actions on the stream based on RTP signals.

gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h

index a21a14a..5b2e84d 100644 (file)
@@ -1126,6 +1126,10 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream)
     gst_object_unref (stream->rtcppad);
     stream->rtcppad = NULL;
   }
+  if (stream->session) {
+    g_object_unref (stream->session);
+    stream->session = NULL;
+  }
   g_free (stream);
 }
 
@@ -1143,14 +1147,14 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src)
   }
   g_list_free (src->streams);
   src->streams = NULL;
-  if (src->session) {
-    if (src->session_sig_id) {
-      g_signal_handler_disconnect (src->session, src->session_sig_id);
-      src->session_sig_id = 0;
+  if (src->manager) {
+    if (src->manager_sig_id) {
+      g_signal_handler_disconnect (src->manager, src->manager_sig_id);
+      src->manager_sig_id = 0;
     }
-    gst_element_set_state (src->session, GST_STATE_NULL);
-    gst_bin_remove (GST_BIN_CAST (src), src->session);
-    src->session = NULL;
+    gst_element_set_state (src->manager, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN_CAST (src), src->manager);
+    src->manager = NULL;
   }
   src->numstreams = 0;
   if (src->props)
@@ -1672,8 +1676,8 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush)
   if (base_time != -1)
     gst_element_set_base_time (GST_ELEMENT_CAST (src), base_time);
   /* to manage jitterbuffer buffer mode */
-  if (src->session)
-    gst_element_set_base_time (GST_ELEMENT_CAST (src->session), base_time);
+  if (src->manager)
+    gst_element_set_base_time (GST_ELEMENT_CAST (src->manager), base_time);
 }
 
 static GstRTSPResult
@@ -2138,7 +2142,7 @@ was_ok:
 /* this callback is called when the session manager generated a new src pad with
  * payloaded RTP packets. We simply ghost the pad here. */
 static void
-new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src)
+new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src)
 {
   gchar *name;
   GstPadTemplate *template;
@@ -2147,7 +2151,7 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src)
   GstRTSPStream *stream;
   gboolean all_added;
 
-  GST_DEBUG_OBJECT (src, "got new session pad %" GST_PTR_FORMAT, pad);
+  GST_DEBUG_OBJECT (src, "got new manager pad %" GST_PTR_FORMAT, pad);
 
   GST_RTSP_STATE_LOCK (src);
   /* find stream */
@@ -2210,7 +2214,7 @@ unknown_stream:
 }
 
 static GstCaps *
-request_pt_map (GstElement * sess, guint session, guint pt, GstRTSPSrc * src)
+request_pt_map (GstElement * manager, guint session, guint pt, GstRTSPSrc * src)
 {
   GstRTSPStream *stream;
   GstCaps *caps;
@@ -2238,17 +2242,12 @@ unknown_stream:
 }
 
 static void
-gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session)
+gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, GstRTSPStream * stream)
 {
-  GstRTSPStream *stream;
+  guint session = stream->id;
 
   GST_DEBUG_OBJECT (src, "setting stream for session %u to EOS", session);
 
-  /* get stream for session */
-  stream = find_stream (src, &session, (gpointer) find_stream_by_id);
-  if (!stream)
-    goto unknown_stream;
-
   if (stream->eos)
     goto was_eos;
 
@@ -2257,11 +2256,6 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session)
   return;
 
   /* ERRORS */
-unknown_stream:
-  {
-    GST_DEBUG_OBJECT (src, "unknown stream for session %u", session);
-    return;
-  }
 was_eos:
   {
     GST_DEBUG_OBJECT (src, "stream for session %u was already EOS", session);
@@ -2270,30 +2264,41 @@ was_eos:
 }
 
 static void
-on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
-    GstRTSPSrc * src)
+on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
 {
-  GST_DEBUG_OBJECT (src, "SSRC %08x in session %u received BYE", ssrc, session);
+  GstRTSPSrc *src = stream->parent;
+
+  GST_DEBUG_OBJECT (src, "source in session %u received BYE", stream->id);
 
-  gst_rtspsrc_do_stream_eos (src, session);
+  gst_rtspsrc_do_stream_eos (src, stream);
 }
 
 static void
-on_timeout (GstElement * manager, guint session, guint32 ssrc, GstRTSPSrc * src)
+on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
 {
-  GST_DEBUG_OBJECT (src, "SSRC %08x in session %u timed out", ssrc, session);
+  GstRTSPSrc *src = stream->parent;
 
-  gst_rtspsrc_do_stream_eos (src, session);
+  GST_DEBUG_OBJECT (src, "source in session %u timed out", stream->id);
+
+  gst_rtspsrc_do_stream_eos (src, stream);
+}
+
+static void
+on_npt_stop (GObject * session, GObject * source, GstRTSPStream * stream)
+{
+  GstRTSPSrc *src = stream->parent;
+
+  GST_DEBUG_OBJECT (src, "source in session %u reached NPT stop", stream->id);
+
+  gst_rtspsrc_do_stream_eos (src, stream);
 }
 
 static void
-on_npt_stop (GstElement * manager, guint session, guint32 ssrc,
-    GstRTSPSrc * src)
+on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
 {
-  GST_DEBUG_OBJECT (src, "SSRC %08x in session %u reached the NPT stop", ssrc,
-      session);
+  GstRTSPSrc *src = stream->parent;
 
-  gst_rtspsrc_do_stream_eos (src, session);
+  GST_DEBUG_OBJECT (src, "source in session %u is active", stream->id);
 }
 
 /* try to get and configure a manager */
@@ -2313,11 +2318,11 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
     GST_DEBUG_OBJECT (src, "using manager %s", manager);
 
     /* configure the manager */
-    if (src->session == NULL) {
+    if (src->manager == NULL) {
       GObjectClass *klass;
       GstState target;
 
-      if (!(src->session = gst_element_factory_make (manager, NULL))) {
+      if (!(src->manager = gst_element_factory_make (manager, NULL))) {
         /* fallback */
         if (gst_rtsp_transport_get_manager (transport->trans, &manager, 1) < 0)
           goto no_manager;
@@ -2325,27 +2330,27 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
         if (!manager)
           goto use_no_manager;
 
-        if (!(src->session = gst_element_factory_make (manager, NULL)))
+        if (!(src->manager = gst_element_factory_make (manager, NULL)))
           goto manager_failed;
       }
 
       /* we manage this element */
-      gst_bin_add (GST_BIN_CAST (src), src->session);
+      gst_bin_add (GST_BIN_CAST (src), src->manager);
 
       GST_OBJECT_LOCK (src);
       target = GST_STATE_TARGET (src);
       GST_OBJECT_UNLOCK (src);
 
-      ret = gst_element_set_state (src->session, target);
+      ret = gst_element_set_state (src->manager, target);
       if (ret == GST_STATE_CHANGE_FAILURE)
-        goto start_session_failure;
+        goto start_manager_failure;
 
-      g_object_set (src->session, "latency", src->latency, NULL);
+      g_object_set (src->manager, "latency", src->latency, NULL);
 
-      klass = G_OBJECT_GET_CLASS (G_OBJECT (src->session));
+      klass = G_OBJECT_GET_CLASS (G_OBJECT (src->manager));
       if (g_object_class_find_property (klass, "buffer-mode")) {
         if (src->buffer_mode != BUFFER_MODE_AUTO) {
-          g_object_set (src->session, "buffer-mode", src->buffer_mode, NULL);
+          g_object_set (src->manager, "buffer-mode", src->buffer_mode, NULL);
         } else {
           gboolean need_slave;
           GstStructure *s;
@@ -2368,11 +2373,11 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
           if (GST_CLOCK_TIME_IS_VALID (src->segment.duration) &&
               src->segment.duration && !need_slave) {
             GST_DEBUG_OBJECT (src, "selected buffer");
-            g_object_set (src->session, "buffer-mode", BUFFER_MODE_BUFFER,
+            g_object_set (src->manager, "buffer-mode", BUFFER_MODE_BUFFER,
                 NULL);
           } else {
             GST_DEBUG_OBJECT (src, "selected slave");
-            g_object_set (src->session, "buffer-mode", BUFFER_MODE_SLAVE, NULL);
+            g_object_set (src->manager, "buffer-mode", BUFFER_MODE_SLAVE, NULL);
           }
         }
       }
@@ -2380,46 +2385,35 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
       /* connect to signals if we did not already do so */
       GST_DEBUG_OBJECT (src, "connect to signals on session manager, stream %p",
           stream);
-      src->session_sig_id =
-          g_signal_connect (src->session, "pad-added",
-          (GCallback) new_session_pad, src);
-      src->session_ptmap_id =
-          g_signal_connect (src->session, "request-pt-map",
+      src->manager_sig_id =
+          g_signal_connect (src->manager, "pad-added",
+          (GCallback) new_manager_pad, src);
+      src->manager_ptmap_id =
+          g_signal_connect (src->manager, "request-pt-map",
           (GCallback) request_pt_map, src);
-      g_signal_connect (src->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
-          src);
-      g_signal_connect (src->session, "on-bye-timeout", (GCallback) on_timeout,
-          src);
-      g_signal_connect (src->session, "on-timeout", (GCallback) on_timeout,
-          src);
-      /* FIXME: remove this once the rdtmanager is released */
-      if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (src->session)) != 0) {
-        g_signal_connect (src->session, "on-npt-stop", (GCallback) on_npt_stop,
-            src);
-      } else {
-        GST_INFO_OBJECT (src, "skipping on-npt-stop handling, not implemented");
-      }
     }
 
     /* we stream directly to the manager, get some pads. Each RTSP stream goes
      * into a separate RTP session. */
     name = g_strdup_printf ("recv_rtp_sink_%d", stream->id);
-    stream->channelpad[0] = gst_element_get_request_pad (src->session, name);
+    stream->channelpad[0] = gst_element_get_request_pad (src->manager, name);
     g_free (name);
     name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id);
-    stream->channelpad[1] = gst_element_get_request_pad (src->session, name);
+    stream->channelpad[1] = gst_element_get_request_pad (src->manager, name);
     g_free (name);
 
-    /* now configure the bandwidth in the session */
+    /* now configure the bandwidth in the manager */
     if (g_signal_lookup ("get-internal-session",
-            G_OBJECT_TYPE (src->session)) != 0) {
+            G_OBJECT_TYPE (src->manager)) != 0) {
       GObject *rtpsession;
 
-      g_signal_emit_by_name (src->session, "get-internal-session", stream->id,
+      g_signal_emit_by_name (src->manager, "get-internal-session", stream->id,
           &rtpsession);
       if (rtpsession) {
         GST_INFO_OBJECT (src, "configure bandwidth in session %p", rtpsession);
 
+        stream->session = rtpsession;
+
         if (stream->as_bandwidth != -1) {
           GST_INFO_OBJECT (src, "setting AS: %f",
               (gdouble) (stream->as_bandwidth * 1000));
@@ -2436,7 +2430,16 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
           g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth,
               NULL);
         }
-        g_object_unref (rtpsession);
+        g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
+            stream);
+        g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,
+            stream);
+        g_signal_connect (rtpsession, "on-timeout", (GCallback) on_timeout,
+            stream);
+        g_signal_connect (rtpsession, "on-npt-stop", (GCallback) on_npt_stop,
+            stream);
+        g_signal_connect (rtpsession, "on-ssrc-active",
+            (GCallback) on_ssrc_active, stream);
       }
     }
   }
@@ -2455,9 +2458,9 @@ manager_failed:
     GST_DEBUG_OBJECT (src, "no session manager element %s found", manager);
     return FALSE;
   }
-start_session_failure:
+start_manager_failure:
   {
-    GST_DEBUG_OBJECT (src, "could not start session");
+    GST_DEBUG_OBJECT (src, "could not start session manager");
     return FALSE;
   }
 }
@@ -2545,7 +2548,7 @@ gst_rtspsrc_stream_configure_tcp (GstRTSPSrc * src, GstRTSPStream * stream,
     gst_object_unref (template);
   }
   /* setup RTCP transport back to the server if we have to. */
-  if (src->session && src->do_rtcp) {
+  if (src->manager && src->do_rtcp) {
     GstPad *pad;
 
     template = gst_static_pad_template_get (&anysinktemplate);
@@ -2557,7 +2560,7 @@ gst_rtspsrc_stream_configure_tcp (GstRTSPSrc * src, GstRTSPStream * stream,
 
     /* get session RTCP pad */
     name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
-    pad = gst_element_get_request_pad (src->session, name);
+    pad = gst_element_get_request_pad (src->manager, name);
     g_free (name);
 
     /* and link */
@@ -2780,7 +2783,7 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src,
   do_rtp = (rtp_port != -1);
   /* it's possible that the server does not want us to send RTCP in which case
    * the port is -1 */
-  do_rtcp = (rtcp_port != -1 && src->session != NULL && src->do_rtcp);
+  do_rtcp = (rtcp_port != -1 && src->manager != NULL && src->do_rtcp);
 
   /* we need a destination when we have RTP or RTCP ports */
   if (destination == NULL && (do_rtp || do_rtcp))
@@ -2889,7 +2892,7 @@ gst_rtspsrc_stream_configure_udp_sinks (GstRTSPSrc * src,
 
     /* get session RTCP pad */
     name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
-    pad = gst_element_get_request_pad (src->session, name);
+    pad = gst_element_get_request_pad (src->manager, name);
     g_free (name);
 
     /* and link */
@@ -3066,7 +3069,7 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src)
     if (stream->srcpad) {
       /* if we don't have a session manager, set the caps now. If we have a
        * session, we will get a notification of the pad and the caps. */
-      if (!src->session) {
+      if (!src->manager) {
         GST_DEBUG_OBJECT (src, "setting pad caps for stream %p", stream);
         gst_pad_set_caps (stream->srcpad, stream->caps);
       }
@@ -3134,9 +3137,9 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment)
     }
     GST_DEBUG_OBJECT (src, "stream %p, caps %" GST_PTR_FORMAT, stream, caps);
   }
-  if (src->session) {
+  if (src->manager) {
     GST_DEBUG_OBJECT (src, "clear session");
-    g_signal_emit_by_name (src->session, "clear-pt-map", NULL);
+    g_signal_emit_by_name (src->manager, "clear-pt-map", NULL);
   }
 }
 
@@ -3592,7 +3595,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
     src->need_activate = FALSE;
   }
 
-  if (!src->session) {
+  if (!src->manager) {
     /* set stream caps on buffer when we don't have a session manager to do it
      * for us */
     gst_buffer_set_caps (buf, stream->caps);
index bdaff49..00861b6 100644 (file)
@@ -138,6 +138,9 @@ struct _GstRTSPStream {
   /* per stream connection */
   GstRTSPConnInfo  conninfo;
 
+  /* session */
+  GObject      *session;
+
   /* bandwidth */
   guint         as_bandwidth;
   guint         rs_bandwidth;
@@ -233,9 +236,9 @@ struct _GstRTSPSrc {
   gboolean           seekable;
 
   /* session management */
-  GstElement      *session;
-  gulong           session_sig_id;
-  gulong           session_ptmap_id;
+  GstElement      *manager;
+  gulong           manager_sig_id;
+  gulong           manager_ptmap_id;
 
   GstRTSPConnInfo  conninfo;