rtsp-media: Unblock all streams
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-media.c
index 15ab6a7..345559d 100644 (file)
@@ -50,7 +50,8 @@
  * #GstRTSPSession and #GstRTSPSessionMedia.
  *
  * The state of the media can be controlled with gst_rtsp_media_set_state ().
- * Seeking can be done with gst_rtsp_media_seek().
+ * Seeking can be done with gst_rtsp_media_seek(), or gst_rtsp_media_seek_full()
+ * or gst_rtsp_media_seek_trickmode() for finer control of the seek.
  *
  * With gst_rtsp_media_unprepare() the pipeline is stopped and shut down. When
  * gst_rtsp_media_set_eos_shutdown() an EOS will be sent to the pipeline to
@@ -115,6 +116,7 @@ struct _GstRTSPMediaPrivate
   gint prepare_count;
   gint n_active;
   gboolean complete;
+  gboolean finishing_unprepare;
 
   /* the pipeline for the media */
   GstElement *pipeline;
@@ -144,11 +146,13 @@ struct _GstRTSPMediaPrivate
   gboolean do_retransmission;   /* protected by lock */
   guint latency;                /* protected by lock */
   GstClock *clock;              /* protected by lock */
+  gboolean do_rate_control;     /* protected by lock */
   GstRTSPPublishClockMode publish_clock_mode;
 
   /* Dynamic element handling */
   guint nb_dynamic_elements;
   guint no_more_pads_pending;
+  gboolean expected_async_done;
 };
 
 #define DEFAULT_SHARED          FALSE
@@ -165,6 +169,7 @@ struct _GstRTSPMediaPrivate
 #define DEFAULT_STOP_ON_DISCONNECT TRUE
 #define DEFAULT_MAX_MCAST_TTL   255
 #define DEFAULT_BIND_MCAST_ADDRESS FALSE
+#define DEFAULT_DO_RATE_CONTROL TRUE
 
 #define DEFAULT_DO_RETRANSMISSION FALSE
 
@@ -231,7 +236,7 @@ static gboolean default_handle_sdp (GstRTSPMedia * media, GstSDPMessage * sdp);
 
 static gboolean wait_preroll (GstRTSPMedia * media);
 
-static GstElement *find_payload_element (GstElement * payloader);
+static GstElement *find_payload_element (GstElement * payloader, GstPad * pad);
 
 static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
 
@@ -239,6 +244,8 @@ static gboolean check_complete (GstRTSPMedia * media);
 
 #define C_ENUM(v) ((gint) v)
 
+#define TRICKMODE_FLAGS (GST_SEEK_FLAG_TRICKMODE | GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | GST_SEEK_FLAG_TRICKMODE_FORWARD_PREDICTED)
+
 GType
 gst_rtsp_suspend_mode_get_type (void)
 {
@@ -469,6 +476,8 @@ gst_rtsp_media_init (GstRTSPMedia * media)
   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
   priv->max_mcast_ttl = DEFAULT_MAX_MCAST_TTL;
   priv->bind_mcast_address = DEFAULT_BIND_MCAST_ADDRESS;
+  priv->do_rate_control = DEFAULT_DO_RATE_CONTROL;
+  priv->expected_async_done = FALSE;
 }
 
 static void
@@ -730,25 +739,6 @@ default_create_rtpbin (GstRTSPMedia * media)
   return rtpbin;
 }
 
-static gboolean
-is_receive_only (GstRTSPMedia * media)
-{
-  GstRTSPMediaPrivate *priv = media->priv;
-  gboolean recive_only = TRUE;
-  guint i;
-
-  for (i = 0; i < priv->streams->len; i++) {
-    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
-    if (gst_rtsp_stream_is_sender (stream) ||
-        !gst_rtsp_stream_is_receiver (stream)) {
-      recive_only = FALSE;
-      break;
-    }
-  }
-
-  return recive_only;
-}
-
 /* must be called with state lock */
 static void
 check_seekable (GstRTSPMedia * media)
@@ -757,7 +747,7 @@ check_seekable (GstRTSPMedia * media)
   GstRTSPMediaPrivate *priv = media->priv;
 
   /* Update the seekable state of the pipeline in case it changed */
-  if (is_receive_only (media)) {
+  if (gst_rtsp_media_is_receive_only (media)) {
     /* TODO: Seeking for "receive-only"? */
     priv->seekable = -1;
   } else {
@@ -1868,6 +1858,8 @@ gst_rtsp_media_get_multicast_iface (GstRTSPMedia * media)
  * Set the maximum time-to-live value of outgoing multicast packets.
  *
  * Returns: %TRUE if the requested ttl has been set successfully.
+ *
+ * Since: 1.16
  */
 gboolean
 gst_rtsp_media_set_max_mcast_ttl (GstRTSPMedia * media, guint ttl)
@@ -1906,6 +1898,8 @@ gst_rtsp_media_set_max_mcast_ttl (GstRTSPMedia * media, guint ttl)
  * Get the the maximum time-to-live value of outgoing multicast packets.
  *
  * Returns: the maximum time-to-live value of outgoing multicast packets.
+ *
+ * Since: 1.16
  */
 guint
 gst_rtsp_media_get_max_mcast_ttl (GstRTSPMedia * media)
@@ -1931,6 +1925,8 @@ gst_rtsp_media_get_max_mcast_ttl (GstRTSPMedia * media)
  *
  * Decide whether the multicast socket should be bound to a multicast address or
  * INADDR_ANY.
+ *
+ * Since: 1.16
  */
 void
 gst_rtsp_media_set_bind_mcast_address (GstRTSPMedia * media,
@@ -1959,6 +1955,8 @@ gst_rtsp_media_set_bind_mcast_address (GstRTSPMedia * media,
  * Check if multicast sockets are configured to be bound to multicast addresses.
  *
  * Returns: %TRUE if multicast sockets are configured to be bound to multicast addresses.
+ *
+ * Since: 1.16
  */
 gboolean
 gst_rtsp_media_is_bind_mcast_address (GstRTSPMedia * media)
@@ -2052,7 +2050,7 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
       pad = gst_element_get_static_pad (elem, "src");
 
       /* find the real payload element in case elem is a GstBin */
-      pay = find_payload_element (elem);
+      pay = find_payload_element (elem, pad);
 
       /* create the stream */
       if (pay == NULL) {
@@ -2219,7 +2217,8 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   g_mutex_lock (&priv->lock);
   idx = priv->streams->len;
 
-  GST_DEBUG ("media %p: creating stream with index %d", media, idx);
+  GST_DEBUG ("media %p: creating stream with index %d and payloader %"
+      GST_PTR_FORMAT, media, idx, payloader);
 
   if (GST_PAD_IS_SRC (pad))
     name = g_strdup_printf ("src_%u", idx);
@@ -2259,8 +2258,9 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
     }
 
     g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
-        TRUE, NULL);
-    g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+        TRUE, "emit-signals", FALSE, NULL);
+    g_object_set (appsink, "sync", FALSE, "async", FALSE, "emit-signals",
+        FALSE, "buffer-list", TRUE, NULL);
 
     data = g_new0 (AppSinkSrcData, 1);
     data->appsink = appsink;
@@ -2301,6 +2301,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   gst_rtsp_stream_set_retransmission_time (stream, priv->rtx_time);
   gst_rtsp_stream_set_buffer_size (stream, priv->buffer_size);
   gst_rtsp_stream_set_publish_clock_mode (stream, priv->publish_clock_mode);
+  gst_rtsp_stream_set_rate_control (stream, priv->do_rate_control);
 
   g_ptr_array_add (priv->streams, stream);
 
@@ -2544,6 +2545,77 @@ conversion_failed:
   }
 }
 
+/**
+ * gst_rtsp_media_get_rates:
+ * @media: a #GstRTSPMedia
+ * @rate (allow-none): the rate of the current segment
+ * @applied_rate (allow-none): the applied_rate of the current segment
+ *
+ * Get the rate and applied_rate of the current segment.
+ *
+ * Returns: %FALSE if looking up the rate and applied rate failed. Otherwise
+ * %TRUE is returned and @rate and @applied_rate are set to the rate and
+ * applied_rate of the current segment.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_get_rates (GstRTSPMedia * media, gdouble * rate,
+    gdouble * applied_rate)
+{
+  GstRTSPMediaPrivate *priv;
+  GstRTSPStream *stream;
+  gdouble save_rate, save_applied_rate;
+  gboolean result = TRUE;
+  gboolean first_stream = TRUE;
+  gint i;
+
+  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+  if (!rate && !applied_rate) {
+    GST_WARNING_OBJECT (media, "rate and applied_rate are both NULL");
+    return FALSE;
+  }
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+
+  g_assert (priv->streams->len > 0);
+  for (i = 0; i < priv->streams->len; i++) {
+    stream = g_ptr_array_index (priv->streams, i);
+    if (gst_rtsp_stream_is_complete (stream)) {
+      if (gst_rtsp_stream_get_rates (stream, rate, applied_rate)) {
+        if (first_stream) {
+          save_rate = *rate;
+          save_applied_rate = *applied_rate;
+          first_stream = FALSE;
+        } else {
+          if (save_rate != *rate || save_applied_rate != *applied_rate) {
+            /* diffrent rate or applied_rate, weird */
+            g_assert (FALSE);
+            result = FALSE;
+            break;
+          }
+        }
+      } else {
+        /* complete stream withot rate and applied_rate, weird */
+        g_assert (FALSE);
+        result = FALSE;
+        break;
+      }
+    }
+  }
+
+  if (!result) {
+    GST_WARNING_OBJECT (media,
+        "failed to obtain consistent rate and applied_rate");
+  }
+
+  g_mutex_unlock (&priv->lock);
+
+  return result;
+}
+
 static void
 stream_update_blocked (GstRTSPStream * stream, GstRTSPMedia * media)
 {
@@ -2563,15 +2635,18 @@ media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked)
 static void
 stream_unblock (GstRTSPStream * stream, GstRTSPMedia * media)
 {
-  gst_rtsp_stream_unblock_linked (stream);
+  gst_rtsp_stream_set_blocked (stream, FALSE);
 }
 
 static void
-media_unblock_linked (GstRTSPMedia * media)
+media_unblock (GstRTSPMedia * media)
 {
   GstRTSPMediaPrivate *priv = media->priv;
 
-  GST_DEBUG ("media %p unblocking linked streams", media);
+  GST_DEBUG ("media %p unblocking streams", media);
+  /* media is not blocked any longer, as it contains active streams,
+   * streams that are complete */
+  priv->blocked = FALSE;
   g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media);
 }
 
@@ -2622,19 +2697,27 @@ gst_rtsp_media_get_status (GstRTSPMedia * media)
 }
 
 /**
- * gst_rtsp_media_seek_full:
+ * gst_rtsp_media_seek_trickmode:
  * @media: a #GstRTSPMedia
  * @range: (transfer none): a #GstRTSPTimeRange
  * @flags: The minimal set of #GstSeekFlags to use
+ * @rate: the rate to use in the seek
+ * @trickmode_interval: The trickmode interval to use for KEY_UNITS trick mode
  *
- * Seek the pipeline of @media to @range. @media must be prepared with
- * gst_rtsp_media_prepare().
+ * Seek the pipeline of @media to @range with the given @flags and @rate,
+ * and @trickmode_interval.
+ * @media must be prepared with gst_rtsp_media_prepare().
+ * In order to perform the seek operation, the pipeline must contain all
+ * needed transport parts (transport sinks).
  *
  * Returns: %TRUE on success.
+ *
+ * Since: 1.18
  */
 gboolean
-gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
-    GstSeekFlags flags)
+gst_rtsp_media_seek_trickmode (GstRTSPMedia * media,
+    GstRTSPTimeRange * range, GstSeekFlags flags, gdouble rate,
+    GstClockTime trickmode_interval)
 {
   GstRTSPMediaClass *klass;
   GstRTSPMediaPrivate *priv;
@@ -2642,12 +2725,15 @@ gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
   GstClockTime start, stop;
   GstSeekType start_type, stop_type;
   gint64 current_position;
+  gboolean force_seek;
 
   klass = GST_RTSP_MEDIA_GET_CLASS (media);
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
-  g_return_val_if_fail (range != NULL, FALSE);
-  g_return_val_if_fail (klass->convert_range != NULL, FALSE);
+  /* if there's a range then klass->convert_range must be set */
+  g_return_val_if_fail (range == NULL || klass->convert_range != NULL, FALSE);
+
+  GST_DEBUG ("flags=%x  rate=%f", flags, rate);
 
   priv = media->priv;
 
@@ -2673,15 +2759,21 @@ gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
   }
 
   start_type = stop_type = GST_SEEK_TYPE_NONE;
+  start = stop = GST_CLOCK_TIME_NONE;
 
-  if (!klass->convert_range (media, range, GST_RTSP_RANGE_NPT))
-    goto not_supported;
-  gst_rtsp_range_get_times (range, &start, &stop);
+  /* if caller provided a range convert it to NPT format
+   * if no range provided the seek is assumed to be the same position but with
+   * e.g. the rate changed */
+  if (range != NULL) {
+    if (!klass->convert_range (media, range, GST_RTSP_RANGE_NPT))
+      goto not_supported;
+    gst_rtsp_range_get_times (range, &start, &stop);
 
-  GST_INFO ("got %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
-  GST_INFO ("current %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (priv->range_start), GST_TIME_ARGS (priv->range_stop));
+    GST_INFO ("got %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
+    GST_INFO ("current %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (priv->range_start), GST_TIME_ARGS (priv->range_stop));
+  }
 
   current_position = -1;
   if (klass->query_position)
@@ -2692,29 +2784,25 @@ gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
   if (start != GST_CLOCK_TIME_NONE)
     start_type = GST_SEEK_TYPE_SET;
 
-  if (priv->range_stop == stop)
-    stop = GST_CLOCK_TIME_NONE;
-  else if (stop != GST_CLOCK_TIME_NONE)
+  if (stop != GST_CLOCK_TIME_NONE)
     stop_type = GST_SEEK_TYPE_SET;
 
-  if (start != GST_CLOCK_TIME_NONE || stop != GST_CLOCK_TIME_NONE) {
-    gboolean had_flags = flags != 0;
+  /* we force a seek if any trickmode flag is set, or if the rate
+   * is non-standard, i.e. not 1.0 */
+  force_seek = (flags & TRICKMODE_FLAGS) || rate != 1.0;
 
+  if (start != GST_CLOCK_TIME_NONE || stop != GST_CLOCK_TIME_NONE || force_seek) {
     GST_INFO ("seeking to %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
 
     /* depends on the current playing state of the pipeline. We might need to
      * queue this until we get EOS. */
-    if (had_flags)
-      flags |= GST_SEEK_FLAG_FLUSH;
-    else
-      flags = GST_SEEK_FLAG_FLUSH;
-
+    flags |= GST_SEEK_FLAG_FLUSH;
 
     /* if range start was not supplied we must continue from current position.
      * but since we're doing a flushing seek, let us query the current position
      * so we end up at exactly the same position after the seek. */
-    if (range->min.type == GST_RTSP_TIME_END) { /* Yepp, that's right! */
+    if (range == NULL || range->min.type == GST_RTSP_TIME_END) {
       if (current_position == -1) {
         GST_WARNING ("current position unknown");
       } else {
@@ -2722,27 +2810,72 @@ gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
             GST_TIME_ARGS (current_position));
         start = current_position;
         start_type = GST_SEEK_TYPE_SET;
-        if (!had_flags)
-          flags |= GST_SEEK_FLAG_ACCURATE;
       }
-    } else {
-      /* only set keyframe flag when modifying start */
-      if (start_type != GST_SEEK_TYPE_NONE)
-        if (!had_flags)
-          flags |= GST_SEEK_FLAG_KEY_UNIT;
     }
 
-    if (start == current_position && stop_type == GST_SEEK_TYPE_NONE) {
-      GST_DEBUG ("not seeking because no position change");
+    if (start == current_position && stop_type == GST_SEEK_TYPE_NONE &&
+        !force_seek) {
+      GST_DEBUG ("no position change, no flags set by caller, so not seeking");
       res = TRUE;
     } else {
+      GstEvent *seek_event;
+      gboolean unblock = FALSE;
+
+      /* Handle expected async-done before waiting on next async-done.
+       * 
+       * Since the seek further down in code will cause a preroll and
+       * a async-done will be generated it's important to wait on async-done
+       * if that is expected. Otherwise there is the risk that the waiting
+       * for async-done after the seek is detecting the expected async-done
+       * instead of the one that corresponds to the seek. Then execution
+       * continue and act as if the pipeline is prerolled, but it's not.
+       * 
+       * During wait_preroll message GST_MESSAGE_ASYNC_DONE will come
+       * and then the state will change from preparing to prepared */
+      if (priv->expected_async_done) {
+        GST_DEBUG (" expected to get async-done, waiting ");
+        gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
+        g_rec_mutex_unlock (&priv->state_lock);
+
+        /* wait until pipeline is prerolled  */
+        if (!wait_preroll (media))
+          goto preroll_failed_expected_async_done;
+
+        g_rec_mutex_lock (&priv->state_lock);
+        GST_DEBUG (" got expected async-done");
+      }
+
       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
-      if (priv->blocked)
+
+      if (rate < 0.0) {
+        GstClockTime temp_time = start;
+        GstSeekType temp_type = start_type;
+
+        start = stop;
+        start_type = stop_type;
+        stop = temp_time;
+        stop_type = temp_type;
+      }
+
+      seek_event = gst_event_new_seek (rate, GST_FORMAT_TIME, flags, start_type,
+          start, stop_type, stop);
+
+      gst_event_set_seek_trickmode_interval (seek_event, trickmode_interval);
+
+      if (!media->priv->blocked) {
+        /* Prevent a race condition with multiple streams,
+         * where one stream may have time to preroll before others
+         * have even started flushing, causing async-done to be
+         * posted too early.
+         */
         media_streams_set_blocked (media, TRUE);
+        unblock = TRUE;
+      }
+
+      res = gst_element_send_event (priv->pipeline, seek_event);
 
-      /* FIXME, we only do forwards playback, no trick modes yet */
-      res = gst_element_seek (priv->pipeline, 1.0, GST_FORMAT_TIME,
-          flags, start_type, start, stop_type, stop);
+      if (unblock)
+        media_streams_set_blocked (media, FALSE);
 
       /* and block for the seek to complete */
       GST_INFO ("done seeking %d", res);
@@ -2803,8 +2936,31 @@ preroll_failed:
     GST_WARNING ("failed to preroll after seek");
     return FALSE;
   }
+preroll_failed_expected_async_done:
+  {
+    GST_WARNING ("failed to preroll");
+    return FALSE;
+  }
 }
 
+/**
+ * gst_rtsp_media_seek_full:
+ * @media: a #GstRTSPMedia
+ * @range: (transfer none): a #GstRTSPTimeRange
+ * @flags: The minimal set of #GstSeekFlags to use
+ *
+ * Seek the pipeline of @media to @range with the given @flags.
+ * @media must be prepared with gst_rtsp_media_prepare().
+ *
+ * Returns: %TRUE on success.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_seek_full (GstRTSPMedia * media, GstRTSPTimeRange * range,
+    GstSeekFlags flags)
+{
+  return gst_rtsp_media_seek_trickmode (media, range, flags, 1.0, 0);
+}
 
 /**
  * gst_rtsp_media_seek:
@@ -2819,10 +2975,10 @@ preroll_failed:
 gboolean
 gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range)
 {
-  return gst_rtsp_media_seek_full (media, range, 0);
+  return gst_rtsp_media_seek_trickmode (media, range, GST_SEEK_FLAG_NONE,
+      1.0, 0);
 }
 
-
 static void
 stream_collect_blocking (GstRTSPStream * stream, gboolean * blocked)
 {
@@ -2896,8 +3052,9 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
       GST_DEBUG ("%p: went from %s to %s (pending %s)", media,
           gst_element_state_get_name (old), gst_element_state_get_name (new),
           gst_element_state_get_name (pending));
-      if (priv->no_more_pads_pending == 0 && is_receive_only (media) &&
-          old == GST_STATE_READY && new == GST_STATE_PAUSED) {
+      if (priv->no_more_pads_pending == 0
+          && gst_rtsp_media_is_receive_only (media) && old == GST_STATE_READY
+          && new == GST_STATE_PAUSED) {
         GST_INFO ("%p: went to PAUSED, prepared now", media);
         collect_media_stats (media);
 
@@ -2992,6 +3149,8 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message)
     case GST_MESSAGE_STREAM_STATUS:
       break;
     case GST_MESSAGE_ASYNC_DONE:
+      if (priv->expected_async_done)
+        priv->expected_async_done = FALSE;
       if (priv->complete) {
         /* receive the final ASYNC_DONE, that is posted by the media pipeline
          * after all the transport parts have been successfully added to
@@ -3043,27 +3202,57 @@ watch_destroyed (GstRTSPMedia * media)
   g_object_unref (media);
 }
 
+static gboolean
+is_payloader (GstElement * element)
+{
+  GstElementClass *eclass = GST_ELEMENT_GET_CLASS (element);
+  const gchar *klass;
+
+  klass = gst_element_class_get_metadata (eclass, GST_ELEMENT_METADATA_KLASS);
+  if (klass == NULL)
+    return FALSE;
+
+  if (strstr (klass, "Payloader") && strstr (klass, "RTP")) {
+    return TRUE;
+  }
+
+  return FALSE;
+}
+
 static GstElement *
-find_payload_element (GstElement * payloader)
+find_payload_element (GstElement * payloader, GstPad * pad)
 {
   GstElement *pay = NULL;
 
   if (GST_IS_BIN (payloader)) {
     GstIterator *iter;
     GValue item = { 0 };
+    gchar *pad_name, *payloader_name;
+    GstElement *element;
+
+    if ((element = gst_bin_get_by_name (GST_BIN (payloader), "pay"))) {
+      if (is_payloader (element))
+        return element;
+      gst_object_unref (element);
+    }
+
+    pad_name = gst_object_get_name (GST_OBJECT (pad));
+    payloader_name = g_strdup_printf ("pay_%s", pad_name);
+    g_free (pad_name);
+    if ((element = gst_bin_get_by_name (GST_BIN (payloader), payloader_name))) {
+      g_free (payloader_name);
+      if (is_payloader (element))
+        return element;
+      gst_object_unref (element);
+    } else {
+      g_free (payloader_name);
+    }
 
     iter = gst_bin_iterate_recurse (GST_BIN (payloader));
     while (gst_iterator_next (iter, &item) == GST_ITERATOR_OK) {
-      GstElement *element = (GstElement *) g_value_get_object (&item);
-      GstElementClass *eclass = GST_ELEMENT_GET_CLASS (element);
-      const gchar *klass;
+      element = (GstElement *) g_value_get_object (&item);
 
-      klass =
-          gst_element_class_get_metadata (eclass, GST_ELEMENT_METADATA_KLASS);
-      if (klass == NULL)
-        continue;
-
-      if (strstr (klass, "Payloader") && strstr (klass, "RTP")) {
+      if (is_payloader (element)) {
         pay = gst_object_ref (element);
         g_value_unset (&item);
         break;
@@ -3087,7 +3276,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
   GstElement *pay;
 
   /* find the real payload element */
-  pay = find_payload_element (element);
+  pay = find_payload_element (element, pad);
   stream = gst_rtsp_media_create_stream (media, pay, pad);
   gst_object_unref (pay);
 
@@ -3371,7 +3560,7 @@ start_prepare (GstRTSPMedia * media)
     g_object_set_data (G_OBJECT (elem), "gst-rtsp-dynpay-handlers", handlers);
   }
 
-  if (priv->nb_dynamic_elements == 0 && is_receive_only (media)) {
+  if (priv->nb_dynamic_elements == 0 && gst_rtsp_media_is_receive_only (media)) {
     /* If we are receive_only (RECORD), do not try to preroll, to avoid
      * a second ASYNC state change failing */
     priv->is_live = TRUE;
@@ -3616,6 +3805,10 @@ finish_unprepare (GstRTSPMedia * media)
   gint i;
   GList *walk;
 
+  if (priv->finishing_unprepare)
+    return;
+  priv->finishing_unprepare = TRUE;
+
   GST_DEBUG ("shutting down");
 
   /* release the lock on shutdown, otherwise pad_added_cb might try to
@@ -3626,9 +3819,6 @@ finish_unprepare (GstRTSPMedia * media)
 
   media_streams_set_blocked (media, FALSE);
 
-  if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING)
-    return;
-
   for (i = 0; i < priv->streams->len; i++) {
     GstRTSPStream *stream;
 
@@ -3681,6 +3871,8 @@ finish_unprepare (GstRTSPMedia * media)
     GST_DEBUG ("stop thread");
     gst_rtsp_thread_stop (priv->thread);
   }
+
+  priv->finishing_unprepare = FALSE;
 }
 
 /* called with state-lock */
@@ -4195,14 +4387,14 @@ default_unsuspend (GstRTSPMedia * media)
 
   switch (priv->suspend_mode) {
     case GST_RTSP_SUSPEND_MODE_NONE:
-      if (is_receive_only (media))
+      if (gst_rtsp_media_is_receive_only (media))
         break;
       if (media_streams_blocking (media)) {
         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
         /* at this point the media pipeline has been updated and contain all
          * specific transport parts: all active streams contain at least one sink
-         * element and it's safe to unblock any blocked streams that are active */
-        media_unblock_linked (media);
+         * element and it's safe to unblock all blocked streams */
+        media_unblock (media);
       } else {
         /* streams are not blocked and media is suspended from PAUSED */
         gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED);
@@ -4222,8 +4414,8 @@ default_unsuspend (GstRTSPMedia * media)
       gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING);
       /* at this point the media pipeline has been updated and contain all
        * specific transport parts: all active streams contain at least one sink
-       * element and it's safe to unblock any blocked streams that are active */
-      media_unblock_linked (media);
+       * element and it's safe to unblock all blocked streams */
+      media_unblock (media);
       if (!start_preroll (media))
         goto start_failed;
 
@@ -4300,6 +4492,8 @@ static void
 media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state)
 {
   GstRTSPMediaPrivate *priv = media->priv;
+  GstStateChangeReturn set_state_ret;
+  priv->expected_async_done = FALSE;
 
   if (state == GST_STATE_NULL) {
     gst_rtsp_media_unprepare (media);
@@ -4313,13 +4507,17 @@ media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state)
     } else {
       if (state == GST_STATE_PLAYING)
         /* make sure pads are not blocking anymore when going to PLAYING */
-        media_unblock_linked (media);
+        media_unblock (media);
 
-      set_state (media, state);
-
-      /* and suspend after pause */
-      if (state == GST_STATE_PAUSED)
+      if (state == GST_STATE_PAUSED) {
+        set_state_ret = set_state (media, state);
+        if (set_state_ret == GST_STATE_CHANGE_ASYNC)
+          priv->expected_async_done = TRUE;
+        /* and suspend after pause */
         gst_rtsp_media_suspend (media);
+      } else {
+        set_state (media, state);
+      }
     }
   }
 }
@@ -4369,6 +4567,13 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
   priv = media->priv;
 
   g_rec_mutex_lock (&priv->state_lock);
+
+  if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING
+      && gst_rtsp_media_is_shared (media)) {
+    g_rec_mutex_unlock (&priv->state_lock);
+    gst_rtsp_media_get_status (media);
+    g_rec_mutex_lock (&priv->state_lock);
+  }
   if (priv->status == GST_RTSP_MEDIA_STATUS_ERROR)
     goto error_status;
   if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARED &&
@@ -4427,8 +4632,10 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
   /* we just activated the first media, do the playing state change */
   if (old_active == 0 && activate)
     do_state = TRUE;
-  /* if we have no more active media, do the downward state changes */
-  else if (priv->n_active == 0)
+  /* if we have no more active media and prepare count is not indicate 
+   * that there are new session/sessions ongoing,
+   * do the downward state changes */
+  else if (priv->n_active == 0 && priv->prepare_count <= 1)
     do_state = TRUE;
   else
     do_state = FALSE;
@@ -4530,7 +4737,7 @@ gst_rtsp_media_get_transport_mode (GstRTSPMedia * media)
 }
 
 /**
- * gst_rtsp_media_get_seekable:
+ * gst_rtsp_media_seekable:
  * @media: a #GstRTSPMedia
  *
  * Check if the pipeline for @media seek and up to what point in time,
@@ -4539,6 +4746,8 @@ gst_rtsp_media_get_transport_mode (GstRTSPMedia * media)
  * Returns: -1 if the stream is not seekable, 0 if seekable only to the beginning
  * and > 0 to indicate the longest duration between any two random access points.
  * %G_MAXINT64 means any value is possible.
+ *
+ * Since: 1.14
  */
 GstClockTimeDiff
 gst_rtsp_media_seekable (GstRTSPMedia * media)
@@ -4568,6 +4777,8 @@ gst_rtsp_media_seekable (GstRTSPMedia * media)
  * SETUP.
  *
  * Returns: %TRUE if the media pipeline has been sucessfully updated.
+ *
+ * Since: 1.14
  */
 gboolean
 gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports)
@@ -4609,3 +4820,114 @@ gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports)
 
   return TRUE;
 }
+
+/**
+ * gst_rtsp_media_is_receive_only:
+ *
+ * Returns: %TRUE if @media is receive-only, %FALSE otherwise.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_is_receive_only (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv = media->priv;
+  gboolean receive_only = TRUE;
+  guint i;
+
+  for (i = 0; i < priv->streams->len; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+    if (gst_rtsp_stream_is_sender (stream) ||
+        !gst_rtsp_stream_is_receiver (stream)) {
+      receive_only = FALSE;
+      break;
+    }
+  }
+
+  return receive_only;
+}
+
+/**
+ * gst_rtsp_media_has_completed_sender:
+ *
+ * See gst_rtsp_stream_is_complete(), gst_rtsp_stream_is_sender().
+ *    
+ * Returns: whether @media has at least one complete sender stream.
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_has_completed_sender (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv = media->priv;
+  gboolean sender = FALSE;
+  guint i;
+
+  g_mutex_lock (&priv->lock);
+  for (i = 0; i < priv->streams->len; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+    if (gst_rtsp_stream_is_complete (stream))
+      if (gst_rtsp_stream_is_sender (stream) ||
+          !gst_rtsp_stream_is_receiver (stream)) {
+        sender = TRUE;
+        break;
+      }
+  }
+  g_mutex_unlock (&priv->lock);
+
+  return sender;
+}
+
+/**
+ * gst_rtsp_media_set_rate_control:
+ *
+ * Define whether @media will follow the Rate-Control=no behaviour as specified
+ * in the ONVIF replay spec.
+ *
+ * Since: 1.18
+ */
+void
+gst_rtsp_media_set_rate_control (GstRTSPMedia * media, gboolean enabled)
+{
+  GstRTSPMediaPrivate *priv;
+  guint i;
+
+  g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+  GST_LOG_OBJECT (media, "%s rate control", enabled ? "Enabling" : "Disabling");
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+  priv->do_rate_control = enabled;
+  for (i = 0; i < priv->streams->len; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+
+    gst_rtsp_stream_set_rate_control (stream, enabled);
+
+  }
+  g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_rate_control:
+ *
+ * Returns: whether @media will follow the Rate-Control=no behaviour as specified
+ * in the ONVIF replay spec.
+ *
+ * Since: 1.18
+ */
+gboolean
+gst_rtsp_media_get_rate_control (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv;
+  gboolean res;
+
+  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+  res = priv->do_rate_control;
+  g_mutex_unlock (&priv->lock);
+
+  return res;
+}