rtsp-stream: Always unref return value of gst_object_get_parent()
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
index 6c7cacf..60442ad 100644 (file)
@@ -55,6 +55,8 @@
 #include <gst/app/gstappsrc.h>
 #include <gst/app/gstappsink.h>
 
+#include <gst/rtp/gstrtpbuffer.h>
+
 #include "rtsp-stream.h"
 
 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
@@ -105,10 +107,12 @@ struct _GstRTSPStreamPrivate
    * sockets */
   GstElement *udpsrc_v6[2];
 
+  GstElement *udpqueue[2];
   GstElement *udpsink[2];
 
   /* for TCP transport */
   GstElement *appsrc[2];
+  GstClockTime appsrc_base_time[2];
   GstElement *appqueue[2];
   GstElement *appsink[2];
 
@@ -1385,8 +1389,8 @@ gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
 }
 
 /**
- * gst_rtsp_media_get_retransmission_time:
- * @media: a #GstRTSPMedia
+ * gst_rtsp_stream_get_retransmission_time:
+ * @stream: a #GstRTSPStream
  *
  * Get the amount of time to store retransmission data.
  *
@@ -1406,6 +1410,13 @@ gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
   return ret;
 }
 
+/**
+ * gst_rtsp_stream_set_retransmission_pt:
+ * @stream: a #GstRTSPStream
+ * @rtx_pt: a #guint
+ *
+ * Set the payload type (pt) for retransmission of this stream.
+ */
 void
 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
 {
@@ -1427,6 +1438,14 @@ gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
   g_mutex_unlock (&stream->priv->lock);
 }
 
+/**
+ * gst_rtsp_stream_get_retransmission_pt:
+ * @stream: a #GstRTSPStream
+ *
+ * Get the payload-type used for retransmission of this stream
+ *
+ * Returns: The retransmission PT.
+ */
 guint
 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
 {
@@ -1441,6 +1460,46 @@ gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
   return rtx_pt;
 }
 
+/**
+ * gst_rtsp_stream_set_buffer_size:
+ * @stream: a #GstRTSPStream
+ * @size: the buffer size
+ *
+ * Set the size of the UDP transmission buffer (in bytes)
+ * Needs to be set before the stream is joined to a bin.
+ *
+ * Since: 1.6
+ */
+void
+gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
+{
+  g_mutex_lock (&stream->priv->lock);
+  stream->priv->buffer_size = size;
+  g_mutex_unlock (&stream->priv->lock);
+}
+
+/**
+ * gst_rtsp_stream_get_buffer_size:
+ * @stream: a #GstRTSPStream
+ *
+ * Get the size of the UDP transmission buffer (in bytes)
+ *
+ * Returns: the size of the UDP TX buffer
+ *
+ * Since: 1.6
+ */
+guint
+gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
+{
+  guint buffer_size;
+
+  g_mutex_lock (&stream->priv->lock);
+  buffer_size = stream->priv->buffer_size;
+  g_mutex_unlock (&stream->priv->lock);
+
+  return buffer_size;
+}
+
 /* executed from streaming thread */
 static void
 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
@@ -1624,6 +1683,38 @@ on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
 }
 
 static void
+on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
+{
+  GST_INFO ("%p: new sender source %p", stream, source);
+#ifndef DUMP_STATS
+  {
+    GstStructure *stats;
+    g_object_get (source, "stats", &stats, NULL);
+    if (stats) {
+      dump_structure (stats);
+      gst_structure_free (stats);
+    }
+  }
+#endif
+}
+
+static void
+on_sender_ssrc_active (GObject * session, GObject * source,
+    GstRTSPStream * stream)
+{
+#ifndef DUMP_STATS
+  {
+    GstStructure *stats;
+    g_object_get (source, "stats", &stats, NULL);
+    if (stats) {
+      dump_structure (stats);
+      gst_structure_free (stats);
+    }
+  }
+#endif
+}
+
+static void
 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
 {
   if (is_rtp) {
@@ -1813,8 +1904,19 @@ request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
   return gst_object_ref (priv->srtpdec);
 }
 
-static GstElement *
-request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
+/**
+ * gst_rtsp_stream_request_aux_sender:
+ * @stream: a #GstRTSPStream
+ * @sessid: the session id
+ *
+ * Creating a rtxsend bin
+ *
+ * Returns: (transfer full): a #GstElement.
+ *
+ * Since: 1.6
+ */
+GstElement *
+gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
 {
   GstElement *bin;
   GstPad *pad;
@@ -1823,6 +1925,8 @@ request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
   guint pt, rtx_pt;
   gchar *pt_s;
 
+  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
+
   pt = gst_rtsp_stream_get_pt (stream);
   pt_s = g_strdup_printf ("%u", pt);
   rtx_pt = stream->priv->rtx_pt;
@@ -2008,11 +2112,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         (GCallback) request_rtp_rtcp_decoder, stream);
   }
 
-  if (priv->rtx_time > 0 && priv->srcpad) {
-    /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
-    g_signal_connect (rtpbin, "request-aux-sender",
-        (GCallback) request_aux_sender, stream);
-  }
   if (priv->sinkpad) {
     g_signal_connect (rtpbin, "request-pt-map",
         (GCallback) request_pt_map, stream);
@@ -2068,18 +2167,24 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
       stream);
 
+  /* signal for sender ssrc */
+  g_signal_connect (priv->session, "on-new-sender-ssrc",
+      (GCallback) on_new_sender_ssrc, stream);
+  g_signal_connect (priv->session, "on-sender-ssrc-active",
+      (GCallback) on_sender_ssrc_active, stream);
+
   for (i = 0; i < 2; i++) {
     GstPad *teepad, *queuepad;
     /* For the sender we create this bit of pipeline for both
      * RTP and RTCP. Sync and preroll are enabled on udpsink so
-     * we need to add a queue before appsink to make the pipeline
-     * not block. For the TCP case, we want to pump data to the
-     * client as fast as possible anyway.
+     * we need to add a queue before appsink and udpsink to make
+     * the pipeline not block. For the TCP case, we want to pump
+     * data to the client as fast as possible.
      *
-     * .--------.      .-----.    .---------.
-     * | rtpbin |      | tee |    | udpsink |
-     * |       send->sink   src->sink       |
-     * '--------'      |     |    '---------'
+     * .--------.      .-----.    .---------.    .---------.
+     * | rtpbin |      | tee |    |  queue  |    | udpsink |
+     * |       send->sink   src->sink      src->sink       |
+     * '--------'      |     |    '---------'    '---------'
      *                 |     |    .---------.    .---------.
      *                 |     |    |  queue  |    | appsink |
      *                 |    src->sink      src->sink       |
@@ -2102,13 +2207,26 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
       gst_pad_link (priv->send_src[i], pad);
       gst_object_unref (pad);
 
-      /* link tee to udpsink */
+      priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
+      g_object_set (priv->udpqueue[i], "max-size-buffers",
+          1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
+      gst_bin_add (bin, priv->udpqueue[i]);
+      /* link tee to udpqueue */
       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
-      gst_pad_link (teepad, sinkpad);
+      pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
+      gst_pad_link (teepad, pad);
+      gst_object_unref (pad);
       gst_object_unref (teepad);
 
+      /* link udpqueue to udpsink */
+      queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
+      gst_pad_link (queuepad, sinkpad);
+      gst_object_unref (queuepad);
+
       /* make queue */
       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
+      g_object_set (priv->appqueue[i], "max-size-buffers",
+          1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
       gst_bin_add (bin, priv->appqueue[i]);
       /* and link to tee */
       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
@@ -2193,6 +2311,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
       /* make and add appsrc */
       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+      priv->appsrc_base_time[i] = -1;
       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
       gst_bin_add (bin, priv->appsrc[i]);
       /* and link to the funnel */
@@ -2211,6 +2330,8 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         gst_element_set_state (priv->appsink[i], state);
       if (priv->appqueue[i])
         gst_element_set_state (priv->appqueue[i], state);
+      if (priv->udpqueue[i])
+        gst_element_set_state (priv->udpqueue[i], state);
       if (priv->tee[i])
         gst_element_set_state (priv->tee[i], state);
       if (priv->funnel[i])
@@ -2307,6 +2428,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
     if (priv->appqueue[i])
       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
+    if (priv->udpqueue[i])
+      gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
     if (priv->tee[i])
       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
     if (priv->funnel[i])
@@ -2346,6 +2469,8 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       gst_bin_remove (bin, priv->appsink[i]);
     if (priv->appqueue[i])
       gst_bin_remove (bin, priv->appqueue[i]);
+    if (priv->udpqueue[i])
+      gst_bin_remove (bin, priv->udpqueue[i]);
     if (priv->tee[i])
       gst_bin_remove (bin, priv->tee[i]);
     if (priv->funnel[i])
@@ -2361,6 +2486,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
     priv->appsrc[i] = NULL;
     priv->appsink[i] = NULL;
     priv->appqueue[i] = NULL;
+    priv->udpqueue[i] = NULL;
     priv->tee[i] = NULL;
     priv->funnel[i] = NULL;
   }
@@ -2438,6 +2564,62 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
 
   g_mutex_lock (&priv->lock);
 
+  /* First try to extract the information from the last buffer on the sinks.
+   * This will have a more accurate sequence number and timestamp, as between
+   * the payloader and the sink there can be some queues
+   */
+  if (priv->udpsink[0] || priv->appsink[0]) {
+    GstSample *last_sample;
+
+    if (priv->udpsink[0])
+      g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
+    else
+      g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
+
+    if (last_sample) {
+      GstCaps *caps;
+      GstBuffer *buffer;
+      GstSegment *segment;
+      GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
+
+      caps = gst_sample_get_caps (last_sample);
+      buffer = gst_sample_get_buffer (last_sample);
+      segment = gst_sample_get_segment (last_sample);
+
+      if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
+        if (seq) {
+          *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
+        }
+
+        if (rtptime) {
+          *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
+        }
+
+        gst_rtp_buffer_unmap (&rtp_buffer);
+
+        if (running_time) {
+          *running_time =
+              gst_segment_to_running_time (segment, GST_FORMAT_TIME,
+              GST_BUFFER_TIMESTAMP (buffer));
+        }
+
+        if (clock_rate) {
+          GstStructure *s = gst_caps_get_structure (caps, 0);
+
+          gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
+
+          if (*clock_rate == 0 && running_time)
+            *running_time = GST_CLOCK_TIME_NONE;
+        }
+        gst_sample_unref (last_sample);
+
+        goto done;
+      } else {
+        gst_sample_unref (last_sample);
+      }
+    }
+  }
+
   if (g_object_class_find_property (payobjclass, "stats")) {
     g_object_get (priv->payloader, "stats", &stats, NULL);
     if (stats == NULL)
@@ -2472,6 +2654,8 @@ gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
     if (running_time)
       *running_time = GST_CLOCK_TIME_NONE;
   }
+
+done:
   g_mutex_unlock (&priv->lock);
 
   return TRUE;
@@ -2544,6 +2728,31 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
   g_mutex_unlock (&priv->lock);
 
   if (element) {
+    if (priv->appsrc_base_time[0] == -1) {
+      /* Take current running_time. This timestamp will be put on
+       * the first buffer of each stream because we are a live source and so we
+       * timestamp with the running_time. When we are dealing with TCP, we also
+       * only timestamp the first buffer (using the DISCONT flag) because a server
+       * typically bursts data, for which we don't want to compensate by speeding
+       * up the media. The other timestamps will be interpollated from this one
+       * using the RTP timestamps. */
+      GST_OBJECT_LOCK (element);
+      if (GST_ELEMENT_CLOCK (element)) {
+        GstClockTime now;
+        GstClockTime base_time;
+
+        now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
+        base_time = GST_ELEMENT_CAST (element)->base_time;
+
+        priv->appsrc_base_time[0] = now - base_time;
+        GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
+        GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
+            ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
+            GST_TIME_ARGS (base_time));
+      }
+      GST_OBJECT_UNLOCK (element);
+    }
+
     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
     gst_object_unref (element);
   } else {
@@ -2587,6 +2796,31 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
   g_mutex_unlock (&priv->lock);
 
   if (element) {
+    if (priv->appsrc_base_time[1] == -1) {
+      /* Take current running_time. This timestamp will be put on
+       * the first buffer of each stream because we are a live source and so we
+       * timestamp with the running_time. When we are dealing with TCP, we also
+       * only timestamp the first buffer (using the DISCONT flag) because a server
+       * typically bursts data, for which we don't want to compensate by speeding
+       * up the media. The other timestamps will be interpollated from this one
+       * using the RTP timestamps. */
+      GST_OBJECT_LOCK (element);
+      if (GST_ELEMENT_CLOCK (element)) {
+        GstClockTime now;
+        GstClockTime base_time;
+
+        now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
+        base_time = GST_ELEMENT_CAST (element)->base_time;
+
+        priv->appsrc_base_time[1] = now - base_time;
+        GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
+        GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
+            ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
+            GST_TIME_ARGS (base_time));
+      }
+      GST_OBJECT_UNLOCK (element);
+    }
+
     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
     gst_object_unref (element);
   } else {
@@ -2647,7 +2881,6 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
           gst_object_unref (pad);
           gst_object_unref (selpad);
         }
-        gst_object_unref (bin);
 
         priv->transport_sources =
             g_list_prepend (priv->transport_sources, source);
@@ -2683,6 +2916,8 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
         }
       }
 
+      gst_object_unref (bin);
+
       /* fall through for the generic case */
     }
     case GST_RTSP_LOWER_TRANS_UDP: