Get payloader stats only for the sending streams
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
index 0a9898b..578c385 100644 (file)
@@ -112,14 +112,14 @@ struct _GstRTSPMediaPrivate
   GstRTSPMediaStatus status;    /* protected by lock */
   gint prepare_count;
   gint n_active;
-  gboolean adding;
+  gboolean complete;
 
   /* the pipeline for the media */
   GstElement *pipeline;
-  GstElement *fakesink;         /* protected by lock */
   GSource *source;
   guint id;
   GstRTSPThread *thread;
+  GList *pending_pipeline_elements;
 
   gboolean time_provider;
   GstNetTimeProvider *nettime;
@@ -139,6 +139,7 @@ struct _GstRTSPMediaPrivate
 
   GList *payloads;              /* protected by lock */
   GstClockTime rtx_time;        /* protected by lock */
+  gboolean do_retransmission;   /* protected by lock */
   guint latency;                /* protected by lock */
   GstClock *clock;              /* protected by lock */
   GstRTSPPublishClockMode publish_clock_mode;
@@ -161,6 +162,8 @@ struct _GstRTSPMediaPrivate
 #define DEFAULT_TRANSPORT_MODE  GST_RTSP_TRANSPORT_MODE_PLAY
 #define DEFAULT_STOP_ON_DISCONNECT TRUE
 
+#define DEFAULT_DO_RETRANSMISSION FALSE
+
 /* define to dump received RTCP packets */
 #undef DUMP_STATS
 
@@ -226,6 +229,8 @@ static GstElement *find_payload_element (GstElement * payloader);
 
 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
 
+static gboolean check_complete (GstRTSPMedia * media);
+
 #define C_ENUM(v) ((gint) v)
 
 GType
@@ -444,6 +449,7 @@ gst_rtsp_media_init (GstRTSPMedia * media)
   priv->transport_mode = DEFAULT_TRANSPORT_MODE;
   priv->stop_on_disconnect = DEFAULT_STOP_ON_DISCONNECT;
   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
+  priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
 }
 
 static void
@@ -463,6 +469,7 @@ gst_rtsp_media_finalize (GObject * obj)
   g_ptr_array_unref (priv->streams);
 
   g_list_free_full (priv->dynamic, gst_object_unref);
+  g_list_free_full (priv->pending_pipeline_elements, gst_object_unref);
 
   if (priv->pipeline)
     gst_object_unref (priv->pipeline);
@@ -588,6 +595,7 @@ gst_rtsp_media_set_property (GObject * object, guint propid,
 typedef struct
 {
   gint64 position;
+  gboolean complete_streams_only;
   gboolean ret;
 } DoQueryPositionData;
 
@@ -596,6 +604,14 @@ do_query_position (GstRTSPStream * stream, DoQueryPositionData * data)
 {
   gint64 tmp;
 
+  if (!gst_rtsp_stream_is_sender (stream))
+    return;
+
+  if (data->complete_streams_only && !gst_rtsp_stream_is_complete (stream)) {
+    GST_DEBUG_OBJECT (stream, "stream not complete, do not query position");
+    return;
+  }
+
   if (gst_rtsp_stream_query_position (stream, &tmp)) {
     data->position = MIN (data->position, tmp);
     data->ret = TRUE;
@@ -616,6 +632,15 @@ default_query_position (GstRTSPMedia * media, gint64 * position)
   data.position = G_MAXINT64;
   data.ret = FALSE;
 
+  /* if the media is complete, i.e. one or more streams have been configured
+   * with sinks, then we want to query the position on those streams only.
+   * a query on an incmplete stream may return a position that originates from
+   * an earlier preroll */
+  if (check_complete (media))
+    data.complete_streams_only = TRUE;
+  else
+    data.complete_streams_only = FALSE;
+
   g_ptr_array_foreach (priv->streams, (GFunc) do_query_position, &data);
 
   if (!data.ret)
@@ -671,6 +696,25 @@ default_create_rtpbin (GstRTSPMedia * media)
   return rtpbin;
 }
 
+static gboolean
+is_receive_only (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv = media->priv;
+  gboolean recive_only = TRUE;
+  guint i;
+
+  for (i = 0; i < priv->streams->len; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+    if (gst_rtsp_stream_is_sender (stream) ||
+        !gst_rtsp_stream_is_receiver (stream)) {
+      recive_only = FALSE;
+      break;
+    }
+  }
+
+  return recive_only;
+}
+
 /* must be called with state lock */
 static void
 check_seekable (GstRTSPMedia * media)
@@ -679,8 +723,8 @@ check_seekable (GstRTSPMedia * media)
   GstRTSPMediaPrivate *priv = media->priv;
 
   /* Update the seekable state of the pipeline in case it changed */
-  if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) {
-    /* TODO: Seeking for RECORD? */
+  if (is_receive_only (media)) {
+    /* TODO: Seeking for "receive-only"? */
     priv->seekable = -1;
   } else {
     guint i, n = priv->streams->len;
@@ -703,9 +747,21 @@ check_seekable (GstRTSPMedia * media)
     gint64 start, end;
 
     gst_query_parse_seeking (query, &format, &seekable, &start, &end);
-    priv->seekable = seekable ? G_MAXINT64 : 0.0;
+    priv->seekable = seekable ? G_MAXINT64 : 0;
+  } else if (priv->streams->len) {
+    gboolean seekable = TRUE;
+    guint i, n = priv->streams->len;
+
+    GST_DEBUG_OBJECT (media, "Checking %d streams", n);
+    for (i = 0; i < n; i++) {
+      GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+      seekable &= gst_rtsp_stream_seekable (stream);
+    }
+    priv->seekable = seekable ? G_MAXINT64 : -1;
   }
 
+  GST_DEBUG_OBJECT (media, "seekable:%" G_GINT64_FORMAT, priv->seekable);
+
   gst_query_unref (query);
 }
 
@@ -856,6 +912,7 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline)
   GstRTSPMediaPrivate *priv;
   GstElement *old;
   GstNetTimeProvider *nettime;
+  GList *l;
 
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
   g_return_if_fail (GST_IS_PIPELINE (pipeline));
@@ -876,12 +933,18 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline)
     gst_object_unref (nettime);
 
   gst_bin_add (GST_BIN_CAST (pipeline), priv->element);
+
+  for (l = priv->pending_pipeline_elements; l; l = l->next) {
+    gst_bin_add (GST_BIN_CAST (pipeline), l->data);
+  }
+  g_list_free (priv->pending_pipeline_elements);
+  priv->pending_pipeline_elements = NULL;
 }
 
 /**
  * gst_rtsp_media_set_permissions:
  * @media: a #GstRTSPMedia
- * @permissions: (transfer none): a #GstRTSPPermissions
+ * @permissions: (transfer none) (nullable): a #GstRTSPPermissions
  *
  * Set @permissions on @media.
  */
@@ -909,7 +972,7 @@ gst_rtsp_media_set_permissions (GstRTSPMedia * media,
  *
  * Get the permissions object from @media.
  *
- * Returns: (transfer full): a #GstRTSPPermissions object, unref after usage.
+ * Returns: (transfer full) (nullable): a #GstRTSPPermissions object, unref after usage.
  */
 GstRTSPPermissions *
 gst_rtsp_media_get_permissions (GstRTSPMedia * media)
@@ -1369,9 +1432,6 @@ gst_rtsp_media_set_retransmission_time (GstRTSPMedia * media, GstClockTime time)
 
     gst_rtsp_stream_set_retransmission_time (stream, time);
   }
-
-  if (priv->rtpbin)
-    g_object_set (priv->rtpbin, "do-retransmission", time > 0, NULL);
   g_mutex_unlock (&priv->lock);
 }
 
@@ -1401,6 +1461,55 @@ gst_rtsp_media_get_retransmission_time (GstRTSPMedia * media)
 }
 
 /**
+ * gst_rtsp_media_set_do_retransmission:
+ *
+ * Set whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_media_set_do_retransmission (GstRTSPMedia * media,
+    gboolean do_retransmission)
+{
+  GstRTSPMediaPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+  priv->do_retransmission = do_retransmission;
+
+  if (priv->rtpbin)
+    g_object_set (priv->rtpbin, "do-retransmission", do_retransmission, NULL);
+  g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_do_retransmission:
+ *
+ * Returns: Whether retransmission requests will be sent
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_media_get_do_retransmission (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv;
+  gboolean res;
+
+  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0);
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+  res = priv->do_retransmission;
+  g_mutex_unlock (&priv->lock);
+
+  return res;
+}
+
+/**
  * gst_rtsp_media_set_latency:
  * @media: a #GstRTSPMedia
  * @latency: latency in milliseconds
@@ -1411,6 +1520,7 @@ void
 gst_rtsp_media_set_latency (GstRTSPMedia * media, guint latency)
 {
   GstRTSPMediaPrivate *priv;
+  guint i;
 
   g_return_if_fail (GST_IS_RTSP_MEDIA (media));
 
@@ -1420,8 +1530,19 @@ gst_rtsp_media_set_latency (GstRTSPMedia * media, guint latency)
 
   g_mutex_lock (&priv->lock);
   priv->latency = latency;
-  if (priv->rtpbin)
+  if (priv->rtpbin) {
     g_object_set (priv->rtpbin, "latency", latency, NULL);
+
+    for (i = 0; i < media->priv->streams->len; i++) {
+      GObject *storage = NULL;
+
+      g_signal_emit_by_name (G_OBJECT (media->priv->rtpbin), "get-storage",
+          i, &storage);
+      if (storage)
+        g_object_set (storage, "size-time", (media->priv->latency + 50) * GST_MSECOND, NULL);
+    }
+  }
+
   g_mutex_unlock (&priv->lock);
 }
 
@@ -1501,7 +1622,7 @@ gst_rtsp_media_is_time_provider (GstRTSPMedia * media)
 /**
  * gst_rtsp_media_set_clock:
  * @media: a #GstRTSPMedia
- * @clock: #GstClock to be used
+ * @clock: (nullable): #GstClock to be used
  *
  * Configure the clock used for the media.
  */
@@ -1587,7 +1708,7 @@ gst_rtsp_media_get_publish_clock_mode (GstRTSPMedia * media)
 /**
  * gst_rtsp_media_set_address_pool:
  * @media: a #GstRTSPMedia
- * @pool: (transfer none): a #GstRTSPAddressPool
+ * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
  *
  * configure @pool to be used as the address pool of @media.
  */
@@ -1623,8 +1744,8 @@ gst_rtsp_media_set_address_pool (GstRTSPMedia * media,
  *
  * Get the #GstRTSPAddressPool used as the address pool of @media.
  *
- * Returns: (transfer full): the #GstRTSPAddressPool of @media. g_object_unref() after
- * usage.
+ * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @media.
+ * g_object_unref() after usage.
  */
 GstRTSPAddressPool *
 gst_rtsp_media_get_address_pool (GstRTSPMedia * media)
@@ -1647,7 +1768,7 @@ gst_rtsp_media_get_address_pool (GstRTSPMedia * media)
 /**
  * gst_rtsp_media_set_multicast_iface:
  * @media: a #GstRTSPMedia
- * @multicast_iface: (transfer none): a multicast interface name
+ * @multicast_iface: (transfer none) (nullable): a multicast interface name
  *
  * configure @multicast_iface to be used for @media.
  */
@@ -1683,8 +1804,8 @@ gst_rtsp_media_set_multicast_iface (GstRTSPMedia * media,
  *
  * Get the multicast interface used for @media.
  *
- * Returns: (transfer full): the multicast interface for @media. g_free() after
- * usage.
+ * Returns: (transfer full) (nullable): the multicast interface for @media.
+ * g_free() after usage.
  */
 gchar *
 gst_rtsp_media_get_multicast_iface (GstRTSPMedia * media)
@@ -1808,6 +1929,8 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
       priv->dynamic = g_list_prepend (priv->dynamic, elem);
       g_mutex_unlock (&priv->lock);
 
+      priv->nb_dynamic_elements++;
+
       have_elem = TRUE;
       more_elem_remaining = TRUE;
       mode |= GST_RTSP_TRANSPORT_MODE_PLAY;
@@ -1825,8 +1948,6 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
       gst_object_unref (pad);
       gst_object_unref (elem);
 
-      priv->nb_dynamic_elements++;
-
       have_elem = TRUE;
       more_elem_remaining = TRUE;
       mode |= GST_RTSP_TRANSPORT_MODE_RECORD;
@@ -1841,6 +1962,79 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
   }
 }
 
+typedef struct
+{
+  GstElement *appsink, *appsrc;
+  GstRTSPStream *stream;
+} AppSinkSrcData;
+
+static GstFlowReturn
+appsink_new_sample (GstAppSink * appsink, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+  GstSample *sample;
+  GstFlowReturn ret;
+
+  sample = gst_app_sink_pull_sample (appsink);
+  if (!sample)
+    return GST_FLOW_FLUSHING;
+
+
+  ret = gst_app_src_push_sample (GST_APP_SRC (data->appsrc), sample);
+  gst_sample_unref (sample);
+  return ret;
+}
+
+static GstAppSinkCallbacks appsink_callbacks = {
+  NULL,
+  NULL,
+  appsink_new_sample,
+};
+
+static GstPadProbeReturn
+appsink_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+
+  if (GST_IS_EVENT (info->data)
+      && GST_EVENT_TYPE (info->data) == GST_EVENT_LATENCY) {
+    GstClockTime min, max;
+
+    if (gst_base_sink_query_latency (GST_BASE_SINK (data->appsink), NULL, NULL,
+            &min, &max)) {
+      g_object_set (data->appsrc, "min-latency", min, "max-latency", max, NULL);
+      GST_DEBUG ("setting latency to min %" GST_TIME_FORMAT " max %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
+    }
+  } else if (GST_IS_QUERY (info->data)) {
+    GstPad *srcpad = gst_element_get_static_pad (data->appsrc, "src");
+    if (gst_pad_peer_query (srcpad, GST_QUERY_CAST (info->data))) {
+      gst_object_unref (srcpad);
+      return GST_PAD_PROBE_HANDLED;
+    }
+    gst_object_unref (srcpad);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
+static GstPadProbeReturn
+appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  AppSinkSrcData *data = user_data;
+
+  if (GST_IS_QUERY (info->data)) {
+    GstPad *sinkpad = gst_element_get_static_pad (data->appsink, "sink");
+    if (gst_pad_peer_query (sinkpad, GST_QUERY_CAST (info->data))) {
+      gst_object_unref (sinkpad);
+      return GST_PAD_PROBE_HANDLED;
+    }
+    gst_object_unref (sinkpad);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
 /**
  * gst_rtsp_media_create_stream:
  * @media: a #GstRTSPMedia
@@ -1859,9 +2053,10 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
 {
   GstRTSPMediaPrivate *priv;
   GstRTSPStream *stream;
-  GstPad *ghostpad;
+  GstPad *streampad;
   gchar *name;
   gint idx;
+  AppSinkSrcData *data = NULL;
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
@@ -1879,12 +2074,71 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   else
     name = g_strdup_printf ("sink_%u", idx);
 
-  ghostpad = gst_ghost_pad_new (name, pad);
-  gst_pad_set_active (ghostpad, TRUE);
-  gst_element_add_pad (priv->element, ghostpad);
+  if ((GST_PAD_IS_SRC (pad) && priv->element->numsinkpads > 0) ||
+      (GST_PAD_IS_SINK (pad) && priv->element->numsrcpads > 0)) {
+    GstElement *appsink, *appsrc;
+    GstPad *sinkpad, *srcpad;
+
+    appsink = gst_element_factory_make ("appsink", NULL);
+    appsrc = gst_element_factory_make ("appsrc", NULL);
+
+    if (GST_PAD_IS_SINK (pad)) {
+      srcpad = gst_element_get_static_pad (appsrc, "src");
+
+      gst_bin_add (GST_BIN (priv->element), appsrc);
+
+      gst_pad_link (srcpad, pad);
+      gst_object_unref (srcpad);
+
+      streampad = gst_element_get_static_pad (appsink, "sink");
+
+      priv->pending_pipeline_elements =
+          g_list_prepend (priv->pending_pipeline_elements, appsink);
+    } else {
+      sinkpad = gst_element_get_static_pad (appsink, "sink");
+
+      gst_pad_link (pad, sinkpad);
+      gst_object_unref (sinkpad);
+
+      streampad = gst_element_get_static_pad (appsrc, "src");
+
+      priv->pending_pipeline_elements =
+          g_list_prepend (priv->pending_pipeline_elements, appsrc);
+    }
+
+    g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
+        TRUE, NULL);
+    g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+
+    data = g_new0 (AppSinkSrcData, 1);
+    data->appsink = appsink;
+    data->appsrc = appsrc;
+
+    sinkpad = gst_element_get_static_pad (appsink, "sink");
+    gst_pad_add_probe (sinkpad,
+        GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
+        appsink_pad_probe, data, NULL);
+    gst_object_unref (sinkpad);
+
+    srcpad = gst_element_get_static_pad (appsrc, "src");
+    gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
+        appsrc_pad_probe, data, NULL);
+    gst_object_unref (srcpad);
+
+    gst_app_sink_set_callbacks (GST_APP_SINK (appsink), &appsink_callbacks,
+        data, NULL);
+    g_object_set_data_full (G_OBJECT (streampad), "media-appsink-appsrc", data,
+        g_free);
+  } else {
+    streampad = gst_ghost_pad_new (name, pad);
+    gst_pad_set_active (streampad, TRUE);
+    gst_element_add_pad (priv->element, streampad);
+  }
   g_free (name);
 
-  stream = gst_rtsp_stream_new (idx, payloader, ghostpad);
+  stream = gst_rtsp_stream_new (idx, payloader, streampad);
+  if (data)
+    data->stream = stream;
   if (priv->pool)
     gst_rtsp_stream_set_address_pool (stream, priv->pool);
   gst_rtsp_stream_set_multicast_iface (stream, priv->multicast_iface);
@@ -1932,13 +2186,28 @@ gst_rtsp_media_remove_stream (GstRTSPMedia * media, GstRTSPStream * stream)
 {
   GstRTSPMediaPrivate *priv;
   GstPad *srcpad;
+  AppSinkSrcData *data;
 
   priv = media->priv;
 
   g_mutex_lock (&priv->lock);
   /* remove the ghostpad */
   srcpad = gst_rtsp_stream_get_srcpad (stream);
-  gst_element_remove_pad (priv->element, srcpad);
+  data = g_object_get_data (G_OBJECT (srcpad), "media-appsink-appsrc");
+  if (data) {
+    if (GST_OBJECT_PARENT (data->appsrc) == GST_OBJECT_CAST (priv->pipeline))
+      gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsrc);
+    else if (GST_OBJECT_PARENT (data->appsrc) ==
+        GST_OBJECT_CAST (priv->element))
+      gst_bin_remove (GST_BIN_CAST (priv->element), data->appsrc);
+    if (GST_OBJECT_PARENT (data->appsink) == GST_OBJECT_CAST (priv->pipeline))
+      gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsink);
+    else if (GST_OBJECT_PARENT (data->appsink) ==
+        GST_OBJECT_CAST (priv->element))
+      gst_bin_remove (GST_BIN_CAST (priv->element), data->appsink);
+  } else {
+    gst_element_remove_pad (priv->element, srcpad);
+  }
   gst_object_unref (srcpad);
   /* now remove the stream */
   g_object_ref (stream);
@@ -2063,7 +2332,7 @@ default_convert_range (GstRTSPMedia * media, GstRTSPTimeRange * range,
  * Get the current range as a string. @media must be prepared with
  * gst_rtsp_media_prepare ().
  *
- * Returns: (transfer full): The range as a string, g_free() after usage.
+ * Returns: (transfer full) (nullable): The range as a string, g_free() after usage.
  */
 gchar *
 gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play,
@@ -2473,8 +2742,8 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
       GST_DEBUG ("%p: went from %s to %s (pending %s)", media,
           gst_element_state_get_name (old), gst_element_state_get_name (new),
           gst_element_state_get_name (pending));
-      if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)
-          && old == GST_STATE_READY && new == GST_STATE_PAUSED) {
+      if (priv->no_more_pads_pending == 0 && is_receive_only (media) &&
+          old == GST_STATE_READY && new == GST_STATE_PAUSED) {
         GST_INFO ("%p: went to PAUSED, prepared now", media);
         collect_media_stats (media);
 
@@ -2555,8 +2824,9 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
       s = gst_message_get_structure (message);
       if (gst_structure_has_name (s, "GstRTSPStreamBlocking")) {
         GST_DEBUG ("media received blocking message");
-        if (priv->blocked && media_streams_blocking (media)) {
-          GST_DEBUG ("media is blocking");
+        if (priv->blocked && media_streams_blocking (media) &&
+            priv->no_more_pads_pending == 0) {
+          GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message), "media is blocking");
           collect_media_stats (media);
 
           if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
@@ -2568,13 +2838,11 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
     case GST_MESSAGE_STREAM_STATUS:
       break;
     case GST_MESSAGE_ASYNC_DONE:
-      if (priv->adding) {
-        /* when we are dynamically adding pads, the addition of the udpsrc will
-         * temporarily produce ASYNC_DONE messages. We have to ignore them and
-         * wait for the final ASYNC_DONE after everything prerolled */
-        GST_INFO ("%p: ignoring ASYNC_DONE", media);
-      } else {
-        GST_INFO ("%p: got ASYNC_DONE", media);
+      if (priv->complete) {
+        /* receive the final ASYNC_DONE, that is posted by the media pipeline
+         * after all the transport parts have been successfully added to
+         * the media streams. */
+        GST_DEBUG_OBJECT (media, "got async-done");
         if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING)
           gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
       }
@@ -2677,11 +2945,6 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
 
   g_object_set_data (G_OBJECT (pad), "gst-rtsp-dynpad-stream", stream);
 
-  /* we will be adding elements below that will cause ASYNC_DONE to be
-   * posted in the bus. We want to ignore those messages until the
-   * pipeline really prerolled. */
-  priv->adding = TRUE;
-
   /* join the element in the PAUSED state because this callback is
    * called from the streaming thread and it is PAUSED */
   if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
@@ -2692,7 +2955,6 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
   if (priv->blocked)
     gst_rtsp_stream_set_blocked (stream, TRUE);
 
-  priv->adding = FALSE;
   g_rec_mutex_unlock (&priv->state_lock);
 
   return;
@@ -2727,37 +2989,14 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
 }
 
 static void
-remove_fakesink (GstRTSPMediaPrivate * priv)
-{
-  GstElement *fakesink;
-
-  g_mutex_lock (&priv->lock);
-  if ((fakesink = priv->fakesink))
-    gst_object_ref (fakesink);
-  priv->fakesink = NULL;
-  g_mutex_unlock (&priv->lock);
-
-  if (fakesink) {
-    gst_bin_remove (GST_BIN (priv->pipeline), fakesink);
-    gst_element_set_state (fakesink, GST_STATE_NULL);
-    gst_object_unref (fakesink);
-    GST_INFO ("removed fakesink");
-  }
-}
-
-static void
 no_more_pads_cb (GstElement * element, GstRTSPMedia * media)
 {
   GstRTSPMediaPrivate *priv = media->priv;
-  gboolean remaining_dynamic;
 
   GST_INFO_OBJECT (element, "no more pads");
   g_mutex_lock (&priv->lock);
   priv->no_more_pads_pending--;
-  remaining_dynamic = priv->no_more_pads_pending;
   g_mutex_unlock (&priv->lock);
-  if (remaining_dynamic == 0)
-    remove_fakesink (priv);
 }
 
 typedef struct _DynPaySignalHandlers DynPaySignalHandlers;
@@ -2840,6 +3079,32 @@ request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
   GstRTSPMediaPrivate *priv = media->priv;
   GstRTSPStream *stream = NULL;
   guint i;
+  GstElement *res = NULL;
+
+  g_mutex_lock (&priv->lock);
+  for (i = 0; i < priv->streams->len; i++) {
+    stream = g_ptr_array_index (priv->streams, i);
+
+    if (sessid == gst_rtsp_stream_get_index (stream))
+      break;
+
+    stream = NULL;
+  }
+  g_mutex_unlock (&priv->lock);
+
+  if (stream)
+    res = gst_rtsp_stream_request_aux_sender (stream, sessid);
+
+  return res;
+}
+
+static GstElement *
+request_aux_receiver (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv = media->priv;
+  GstRTSPStream *stream = NULL;
+  guint i;
+  GstElement *res = NULL;
 
   g_mutex_lock (&priv->lock);
   for (i = 0; i < priv->streams->len; i++) {
@@ -2847,10 +3112,47 @@ request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
 
     if (sessid == gst_rtsp_stream_get_index (stream))
       break;
+
+    stream = NULL;
   }
   g_mutex_unlock (&priv->lock);
 
-  return gst_rtsp_stream_request_aux_sender (stream, sessid);
+  if (stream)
+    res = gst_rtsp_stream_request_aux_receiver (stream, sessid);
+
+  return res;
+}
+
+static GstElement *
+request_fec_decoder (GstElement * rtpbin, guint sessid, GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv = media->priv;
+  GstRTSPStream *stream = NULL;
+  guint i;
+  GstElement *res = NULL;
+
+  g_mutex_lock (&priv->lock);
+  for (i = 0; i < priv->streams->len; i++) {
+    stream = g_ptr_array_index (priv->streams, i);
+
+    if (sessid == gst_rtsp_stream_get_index (stream))
+      break;
+
+    stream = NULL;
+  }
+  g_mutex_unlock (&priv->lock);
+
+  if (stream) {
+    res = gst_rtsp_stream_request_ulpfec_decoder (stream, rtpbin, sessid);
+  }
+
+  return res;
+}
+
+static void
+new_storage_cb (GstElement * rtpbin, GObject * storage, guint sessid, GstRTSPMedia * media)
+{
+  g_object_set (storage, "size-time", (media->priv->latency + 50) * GST_MSECOND, NULL);
 }
 
 static gboolean
@@ -2864,6 +3166,9 @@ start_prepare (GstRTSPMedia * media)
   if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARING)
     goto no_longer_preparing;
 
+  g_signal_connect (priv->rtpbin, "new-storage", G_CALLBACK (new_storage_cb), media);
+  g_signal_connect (priv->rtpbin, "request-fec-decoder", G_CALLBACK (request_fec_decoder), media);
+
   /* link streams we already have, other streams might appear when we have
    * dynamic elements */
   for (i = 0; i < priv->streams->len; i++) {
@@ -2877,6 +3182,11 @@ start_prepare (GstRTSPMedia * media)
           (GCallback) request_aux_sender, media);
     }
 
+    if (priv->do_retransmission) {
+      g_signal_connect (priv->rtpbin, "request-aux-receiver",
+          (GCallback) request_aux_receiver, media);
+    }
+
     if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
             priv->rtpbin, GST_STATE_NULL)) {
       goto join_bin_failed;
@@ -2884,7 +3194,8 @@ start_prepare (GstRTSPMedia * media)
   }
 
   if (priv->rtpbin)
-    g_object_set (priv->rtpbin, "do-retransmission", priv->rtx_time > 0, NULL);
+    g_object_set (priv->rtpbin, "do-retransmission", priv->do_retransmission,
+        "do-lost", TRUE, NULL);
 
   for (walk = priv->dynamic; walk; walk = g_list_next (walk)) {
     GstElement *elem = walk->data;
@@ -2900,17 +3211,16 @@ start_prepare (GstRTSPMedia * media)
         (GCallback) no_more_pads_cb, media);
 
     g_object_set_data (G_OBJECT (elem), "gst-rtsp-dynpay-handlers", handlers);
-
-    if (!priv->fakesink) {
-      /* we add a fakesink here in order to make the state change async. We remove
-       * the fakesink again in the no-more-pads callback. */
-      priv->fakesink = gst_element_factory_make ("fakesink", "fakesink");
-      gst_bin_add (GST_BIN (priv->pipeline), priv->fakesink);
-    }
   }
 
-  if (!start_preroll (media))
+  if (priv->nb_dynamic_elements == 0 && is_receive_only (media)) {
+    /* If we are receive_only (RECORD), do not try to preroll, to avoid
+     * a second ASYNC state change failing */
+    priv->is_live = TRUE;
+    gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+  } else if (!start_preroll (media)) {
     goto preroll_failed;
+  }
 
   g_rec_mutex_unlock (&priv->state_lock);
 
@@ -3161,8 +3471,6 @@ finish_unprepare (GstRTSPMedia * media)
   if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING)
     return;
 
-  remove_fakesink (priv);
-
   for (i = 0; i < priv->streams->len; i++) {
     GstRTSPStream *stream;
 
@@ -3317,7 +3625,7 @@ get_clock_unlocked (GstRTSPMedia * media)
  *
  * @media must be prepared before this method returns a valid clock object.
  *
- * Returns: (transfer full): the #GstClock used by @media. unref after usage.
+ * Returns: (transfer full) (nullable): the #GstClock used by @media. unref after usage.
  */
 GstClock *
 gst_rtsp_media_get_clock (GstRTSPMedia * media)
@@ -3548,6 +3856,9 @@ default_handle_sdp (GstRTSPMedia * media, GstSDPMessage * sdp)
       s = gst_caps_get_structure (caps, 0);
       gst_structure_set_name (s, "application/x-rtp");
 
+      if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
+        gst_structure_set (s, "is-fec", G_TYPE_BOOLEAN, TRUE, NULL);
+
       gst_rtsp_stream_set_pt_map (stream, pt, caps);
       gst_caps_unref (caps);
     }
@@ -3609,8 +3920,11 @@ static void
 do_set_seqnum (GstRTSPStream * stream)
 {
   guint16 seq_num;
-  seq_num = gst_rtsp_stream_get_current_seqnum (stream);
-  gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+
+  if (gst_rtsp_stream_is_sender (stream)) {
+    seq_num = gst_rtsp_stream_get_current_seqnum (stream);
+    gst_rtsp_stream_set_seqnum_offset (stream, seq_num + 1);
+  }
 }
 
 /* call with state_lock */
@@ -3723,13 +4037,18 @@ default_unsuspend (GstRTSPMedia * media)
 
   switch (priv->suspend_mode) {
     case GST_RTSP_SUSPEND_MODE_NONE:
-      if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD))
+      if (is_receive_only (media))
         break;
-      gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
-      /* at this point the media pipeline has been updated and contain all
-       * specific transport parts: all active streams contain at least one sink
-       * element and it's safe to unblock any blocked streams that are active */
-      media_unblock_linked (media);
+      if (media_streams_blocking (media)) {
+        gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
+        /* at this point the media pipeline has been updated and contain all
+         * specific transport parts: all active streams contain at least one sink
+         * element and it's safe to unblock any blocked streams that are active */
+        media_unblock_linked (media);
+      } else {
+        /* streams are not blocked and media is suspended from PAUSED */
+        gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
+      }
       g_rec_mutex_unlock (&priv->state_lock);
       if (gst_rtsp_media_get_status (media) == GST_RTSP_MEDIA_STATUS_ERROR) {
         g_rec_mutex_lock (&priv->state_lock);
@@ -4053,7 +4372,7 @@ gst_rtsp_media_get_transport_mode (GstRTSPMedia * media)
 }
 
 /**
- * gst_rtsp_media_get_seekbale:
+ * gst_rtsp_media_get_seekable:
  * @media: a #GstRTSPMedia
  *
  * Check if the pipeline for @media seek and up to what point in time,
@@ -4061,22 +4380,31 @@ gst_rtsp_media_get_transport_mode (GstRTSPMedia * media)
  *
  * Returns: -1 if the stream is not seekable, 0 if seekable only to the beginning
  * and > 0 to indicate the longest duration between any two random access points.
- * G_MAXINT64 means any value is possible.
+ * %G_MAXINT64 means any value is possible.
  */
 GstClockTimeDiff
 gst_rtsp_media_seekable (GstRTSPMedia * media)
 {
+  GstRTSPMediaPrivate *priv;
+  GstClockTimeDiff res;
+
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
 
+  priv = media->priv;
+
   /* Currently we are not able to seek on live streams,
    * and no stream is seekable only to the beginning */
-  return media->priv->seekable;
+  g_mutex_lock (&priv->lock);
+  res = priv->seekable;
+  g_mutex_unlock (&priv->lock);
+
+  return res;
 }
 
 /**
  * gst_rtsp_media_complete_pipeline:
  * @media: a #GstRTSPMedia
- * @transports: a list of #GstRTSPTransport
+ * @transports: (element-type GstRTSPTransport): a list of #GstRTSPTransport
  *
  * Add a receiver and sender parts to the pipeline based on the transport from
  * SETUP.
@@ -4117,6 +4445,8 @@ gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports)
       return FALSE;
     }
   }
+
+  priv->complete = TRUE;
   g_mutex_unlock (&priv->lock);
 
   return TRUE;