rtsp-media: Add support for sending+receiving medias
authorSebastian Dröge <sebastian@centricular.com>
Thu, 12 Oct 2017 18:00:16 +0000 (21:00 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 16 Feb 2018 09:04:53 +0000 (11:04 +0200)
We need to add an appsrc/appsink in that case because otherwise the
media bin will be a sink and a source for rtpbin, causing a pipeline
loop.

https://bugzilla.gnome.org/show_bug.cgi?id=788950

gst/rtsp-server/rtsp-media.c

index 500e8b9..433cc8e 100644 (file)
@@ -119,6 +119,7 @@ struct _GstRTSPMediaPrivate
   GSource *source;
   guint id;
   GstRTSPThread *thread;
+  GList *pending_pipeline_elements;
 
   gboolean time_provider;
   GstNetTimeProvider *nettime;
@@ -462,6 +463,7 @@ gst_rtsp_media_finalize (GObject * obj)
   g_ptr_array_unref (priv->streams);
 
   g_list_free_full (priv->dynamic, gst_object_unref);
+  g_list_free_full (priv->pending_pipeline_elements, gst_object_unref);
 
   if (priv->pipeline)
     gst_object_unref (priv->pipeline);
@@ -867,6 +869,7 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline)
   GstRTSPMediaPrivate *priv;
   GstElement *old;
   GstNetTimeProvider *nettime;
+  GList *l;
 
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
   g_return_if_fail (GST_IS_PIPELINE (pipeline));
@@ -887,6 +890,12 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline)
     gst_object_unref (nettime);
 
   gst_bin_add (GST_BIN_CAST (pipeline), priv->element);
+
+  for (l = priv->pending_pipeline_elements; l; l = l->next) {
+    gst_bin_add (GST_BIN_CAST (pipeline), l->data);
+  }
+  g_list_free (priv->pending_pipeline_elements);
+  priv->pending_pipeline_elements = NULL;
 }
 
 /**
@@ -1852,6 +1861,78 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
   }
 }
 
+typedef struct
+{
+  GstElement *appsink, *appsrc;
+  GstRTSPStream *stream;
+} AppSinkSrcData;
+
+static GstFlowReturn
+appsink_new_sample (GstAppSink * appsink, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+  GstSample *sample;
+  GstFlowReturn ret;
+
+  sample = gst_app_sink_pull_sample (appsink);
+  if (!sample)
+    return GST_FLOW_FLUSHING;
+
+
+  ret = gst_app_src_push_sample (GST_APP_SRC (data->appsrc), sample);
+  gst_sample_unref (sample);
+  return ret;
+}
+
+static GstAppSinkCallbacks appsink_callbacks = {
+  NULL,
+  NULL,
+  appsink_new_sample,
+};
+
+static GstPadProbeReturn
+appsink_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+
+  if (GST_IS_EVENT (info->data)
+      && GST_EVENT_TYPE (info->data) == GST_EVENT_LATENCY) {
+    GstClockTime min, max;
+
+    if (gst_base_sink_query_latency (GST_BASE_SINK (data->appsink), NULL, NULL,
+            &min, &max)) {
+      g_object_set (data->appsrc, "min-latency", min, "max-latency", max, NULL);
+      g_print ("setting latency to %lu %lu\n", min, max);
+    }
+  } else if (GST_IS_QUERY (info->data)) {
+    GstPad *srcpad = gst_element_get_static_pad (data->appsrc, "src");
+    if (gst_pad_peer_query (srcpad, GST_QUERY_CAST (info->data))) {
+      gst_object_unref (srcpad);
+      return GST_PAD_PROBE_HANDLED;
+    }
+    gst_object_unref (srcpad);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
+static GstPadProbeReturn
+appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+
+  if (GST_IS_QUERY (info->data)) {
+    GstPad *sinkpad = gst_element_get_static_pad (data->appsink, "sink");
+    if (gst_pad_peer_query (sinkpad, GST_QUERY_CAST (info->data))) {
+      gst_object_unref (sinkpad);
+      return GST_PAD_PROBE_HANDLED;
+    }
+    gst_object_unref (sinkpad);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
 /**
  * gst_rtsp_media_create_stream:
  * @media: a #GstRTSPMedia
@@ -1870,9 +1951,10 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
 {
   GstRTSPMediaPrivate *priv;
   GstRTSPStream *stream;
-  GstPad *ghostpad;
+  GstPad *streampad;
   gchar *name;
   gint idx;
+  AppSinkSrcData *data = NULL;
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
@@ -1890,12 +1972,71 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   else
     name = g_strdup_printf ("sink_%u", idx);
 
-  ghostpad = gst_ghost_pad_new (name, pad);
-  gst_pad_set_active (ghostpad, TRUE);
-  gst_element_add_pad (priv->element, ghostpad);
+  if ((GST_PAD_IS_SRC (pad) && priv->element->numsinkpads > 0) ||
+      (GST_PAD_IS_SINK (pad) && priv->element->numsrcpads > 0)) {
+    GstElement *appsink, *appsrc;
+    GstPad *sinkpad, *srcpad;
+
+    appsink = gst_element_factory_make ("appsink", NULL);
+    appsrc = gst_element_factory_make ("appsrc", NULL);
+
+    if (GST_PAD_IS_SINK (pad)) {
+      srcpad = gst_element_get_static_pad (appsrc, "src");
+
+      gst_bin_add (GST_BIN (priv->element), appsrc);
+
+      gst_pad_link (srcpad, pad);
+      gst_object_unref (srcpad);
+
+      streampad = gst_element_get_static_pad (appsink, "sink");
+
+      priv->pending_pipeline_elements =
+          g_list_prepend (priv->pending_pipeline_elements, appsink);
+    } else {
+      sinkpad = gst_element_get_static_pad (appsink, "sink");
+
+      gst_pad_link (pad, sinkpad);
+      gst_object_unref (sinkpad);
+
+      streampad = gst_element_get_static_pad (appsrc, "src");
+
+      priv->pending_pipeline_elements =
+          g_list_prepend (priv->pending_pipeline_elements, appsrc);
+    }
+
+    g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
+        TRUE, NULL);
+    g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+
+    data = g_new0 (AppSinkSrcData, 1);
+    data->appsink = appsink;
+    data->appsrc = appsrc;
+
+    sinkpad = gst_element_get_static_pad (appsink, "sink");
+    gst_pad_add_probe (sinkpad,
+        GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
+        appsink_pad_probe, data, NULL);
+    gst_object_unref (sinkpad);
+
+    srcpad = gst_element_get_static_pad (appsrc, "src");
+    gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
+        appsrc_pad_probe, data, NULL);
+    gst_object_unref (srcpad);
+
+    gst_app_sink_set_callbacks (GST_APP_SINK (appsink), &appsink_callbacks,
+        data, NULL);
+    g_object_set_data_full (G_OBJECT (streampad), "media-appsink-appsrc", data,
+        g_free);
+  } else {
+    streampad = gst_ghost_pad_new (name, pad);
+    gst_pad_set_active (streampad, TRUE);
+    gst_element_add_pad (priv->element, streampad);
+  }
   g_free (name);
 
-  stream = gst_rtsp_stream_new (idx, payloader, ghostpad);
+  stream = gst_rtsp_stream_new (idx, payloader, streampad);
+  if (data)
+    data->stream = stream;
   if (priv->pool)
     gst_rtsp_stream_set_address_pool (stream, priv->pool);
   gst_rtsp_stream_set_multicast_iface (stream, priv->multicast_iface);
@@ -1943,13 +2084,28 @@ gst_rtsp_media_remove_stream (GstRTSPMedia * media, GstRTSPStream * stream)
 {
   GstRTSPMediaPrivate *priv;
   GstPad *srcpad;
+  AppSinkSrcData *data;
 
   priv = media->priv;
 
   g_mutex_lock (&priv->lock);
   /* remove the ghostpad */
   srcpad = gst_rtsp_stream_get_srcpad (stream);
-  gst_element_remove_pad (priv->element, srcpad);
+  data = g_object_get_data (G_OBJECT (srcpad), "media-appsink-appsrc");
+  if (data) {
+    if (GST_OBJECT_PARENT (data->appsrc) == GST_OBJECT_CAST (priv->pipeline))
+      gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsrc);
+    else if (GST_OBJECT_PARENT (data->appsrc) ==
+        GST_OBJECT_CAST (priv->element))
+      gst_bin_remove (GST_BIN_CAST (priv->element), data->appsrc);
+    if (GST_OBJECT_PARENT (data->appsink) == GST_OBJECT_CAST (priv->pipeline))
+      gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsink);
+    else if (GST_OBJECT_PARENT (data->appsink) ==
+        GST_OBJECT_CAST (priv->element))
+      gst_bin_remove (GST_BIN_CAST (priv->element), data->appsink);
+  } else {
+    gst_element_remove_pad (priv->element, srcpad);
+  }
   gst_object_unref (srcpad);
   /* now remove the stream */
   g_object_ref (stream);