caps: avoid using in-place oprations
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
index df0a718..1113405 100644 (file)
 #include <string.h>
 
 #include <gst/gst_private.h>
+#include <gst/glib-compat-private.h>
 
 #include "gstbasesrc.h"
 #include "gsttypefindhelper.h"
-#include <gst/gstmarshal.h>
 #include <gst/gst-i18n-lib.h>
 
 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
 #define GST_CAT_DEFAULT gst_base_src_debug
 
-#define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
+#define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
-#define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
+#define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
-#define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
-                                                                                timeval)
+#define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
 
@@ -209,6 +208,12 @@ struct _GstBaseSrcPrivate
   gboolean discont;
   gboolean flushing;
 
+  GstFlowReturn start_result;
+  gboolean async;
+
+  /* if a stream-start event should be sent */
+  gboolean stream_start_pending;
+
   /* if segment should be sent */
   gboolean segment_pending;
 
@@ -240,7 +245,7 @@ struct _GstBaseSrcPrivate
   GstClockTime earliest_time;
 
   GstBufferPool *pool;
-  const GstAllocator *allocator;
+  GstAllocator *allocator;
   guint prefix;
   guint alignment;
 };
@@ -280,14 +285,12 @@ gst_base_src_get_type (void)
 
 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
     GstCaps * filter);
-static void gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
-static void gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
+static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
+static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
 
 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
-static gboolean gst_base_src_activate_push (GstPad * pad, GstObject * parent,
-    gboolean active);
-static gboolean gst_base_src_activate_pull (GstPad * pad, GstObject * parent,
-    gboolean active);
+static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active);
 static void gst_base_src_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_base_src_get_property (GObject * object, guint prop_id,
@@ -315,6 +318,7 @@ static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
 
 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
+
 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
 
@@ -384,8 +388,7 @@ gst_base_src_class_init (GstBaseSrcClass * klass)
   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
 
   /* Registering debug symbols for function pointers */
-  GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
-  GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
@@ -401,8 +404,8 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
 
   basesrc->is_live = FALSE;
-  basesrc->live_lock = g_mutex_new ();
-  basesrc->live_cond = g_cond_new ();
+  g_mutex_init (&basesrc->live_lock);
+  g_cond_init (&basesrc->live_cond);
   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
   basesrc->num_buffers_left = -1;
 
@@ -416,8 +419,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   pad = gst_pad_new_from_template (pad_template, "src");
 
   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
-  gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
-  gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
+  gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
   gst_pad_set_event_function (pad, gst_base_src_event);
   gst_pad_set_query_function (pad, gst_base_src_query);
   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
@@ -435,8 +437,10 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
 
-  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
-  GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
+  basesrc->priv->start_result = GST_FLOW_FLUSHING;
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
 
   GST_DEBUG_OBJECT (basesrc, "init done");
 }
@@ -449,8 +453,8 @@ gst_base_src_finalize (GObject * object)
 
   basesrc = GST_BASE_SRC (object);
 
-  g_mutex_free (basesrc->live_lock);
-  g_cond_free (basesrc->live_cond);
+  g_mutex_clear (&basesrc->live_lock);
+  g_cond_clear (&basesrc->live_cond);
 
   event_p = &basesrc->pending_seek;
   gst_event_replace (event_p, NULL);
@@ -475,7 +479,7 @@ gst_base_src_finalize (GObject * object)
  * This function will block until a state change to PLAYING happens (in which
  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
  * to a state change to READY or a FLUSH event (in which case this function
- * returns #GST_FLOW_WRONG_STATE).
+ * returns #GST_FLOW_FLUSHING).
  *
  * Since: 0.10.12
  *
@@ -502,7 +506,7 @@ gst_base_src_wait_playing (GstBaseSrc * src)
 flushing:
   {
     GST_DEBUG_OBJECT (src, "we are flushing");
-    return GST_FLOW_WRONG_STATE;
+    return GST_FLOW_FLUSHING;
   }
 }
 
@@ -598,6 +602,49 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
 }
 
 /**
+ * gst_base_src_set_async:
+ * @src: base source instance
+ * @async: new async mode
+ *
+ * Configure async behaviour in @src, no state change will block. The open,
+ * close, start, stop, play and pause virtual methods will be executed in a
+ * different thread and are thus allowed to perform blocking operations. Any
+ * blocking operation should be unblocked with the unlock vmethod.
+ */
+void
+gst_base_src_set_async (GstBaseSrc * src, gboolean async)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+
+  GST_OBJECT_LOCK (src);
+  src->priv->async = async;
+  GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_is_async:
+ * @src: base source instance
+ *
+ * Get the current async behaviour of @src. See also gst_base_src_set_async().
+ *
+ * Returns: %TRUE if @src is operating in async mode.
+ */
+gboolean
+gst_base_src_is_async (GstBaseSrc * src)
+{
+  gboolean res;
+
+  g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
+  GST_OBJECT_LOCK (src);
+  res = src->priv->async;
+  GST_OBJECT_UNLOCK (src);
+
+  return res;
+}
+
+
+/**
  * gst_base_src_query_latency:
  * @src: the source
  * @live: (out) (allow-none): if the source is live
@@ -785,6 +832,19 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
   return res;
 }
 
+static gboolean
+gst_base_src_send_stream_start (GstBaseSrc * src)
+{
+  gboolean ret = TRUE;
+
+  if (src->priv->stream_start_pending) {
+    ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
+    src->priv->stream_start_pending = FALSE;
+  }
+
+  return ret;
+}
+
 /**
  * gst_base_src_set_caps:
  * @src: a #GstBaseSrc
@@ -802,6 +862,7 @@ gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
+  gst_base_src_send_stream_start (src);
   gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
 
   if (bclass->set_caps)
@@ -837,14 +898,14 @@ gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
   return caps;
 }
 
-static void
+static GstCaps *
 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
 {
   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
-  gst_caps_fixate (caps);
+  return gst_caps_fixate (caps);
 }
 
-static void
+static GstCaps *
 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
 {
   GstBaseSrcClass *bclass;
@@ -852,7 +913,9 @@ gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
 
   if (bclass->fixate)
-    bclass->fixate (bsrc, caps);
+    caps = bclass->fixate (bsrc, caps);
+
+  return caps;
 }
 
 static gboolean
@@ -1133,10 +1196,13 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
       bclass = GST_BASE_SRC_GET_CLASS (src);
       if (bclass->get_caps) {
         gst_query_parse_caps (query, &filter);
-        caps = bclass->get_caps (src, filter);
-        gst_query_set_caps_result (query, caps);
-        gst_caps_unref (caps);
-        res = TRUE;
+        if ((caps = bclass->get_caps (src, filter))) {
+          gst_query_set_caps_result (query, caps);
+          gst_caps_unref (caps);
+          res = TRUE;
+        } else {
+          res = FALSE;
+        }
       } else
         res = FALSE;
       break;
@@ -1805,6 +1871,12 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
       result = TRUE;
       break;
     }
+    case GST_EVENT_RECONFIGURE:
+      result = TRUE;
+      break;
+    case GST_EVENT_LATENCY:
+      result = TRUE;
+      break;
     default:
       result = FALSE;
       break;
@@ -2179,7 +2251,8 @@ again:
     }
   }
 
-  if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
+  if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
+          && !GST_BASE_SRC_IS_STARTING (src)))
     goto not_started;
 
   if (G_UNLIKELY (!bclass->create))
@@ -2230,6 +2303,7 @@ again:
   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
   if (offset == 0 && src->segment.time == 0
       && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
+    GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
     *buf = gst_buffer_make_writable (*buf);
     GST_BUFFER_TIMESTAMP (*buf) = 0;
   }
@@ -2261,7 +2335,7 @@ again:
          * get rid of the produced buffer. */
         GST_DEBUG_OBJECT (src,
             "clock was unscheduled (%d), returning WRONG_STATE", status);
-        ret = GST_FLOW_WRONG_STATE;
+        ret = GST_FLOW_FLUSHING;
       } else {
         /* If we are running when this happens, we quickly switched between
          * pause and playing. We try to produce a new buffer */
@@ -2298,7 +2372,7 @@ not_ok:
 not_started:
   {
     GST_DEBUG_OBJECT (src, "getrange but not started");
-    return GST_FLOW_WRONG_STATE;
+    return GST_FLOW_FLUSHING;
   }
 no_function:
   {
@@ -2321,7 +2395,7 @@ flushing:
     GST_DEBUG_OBJECT (src, "we are flushing");
     gst_buffer_unref (*buf);
     *buf = NULL;
-    return GST_FLOW_WRONG_STATE;
+    return GST_FLOW_FLUSHING;
   }
 eos:
   {
@@ -2354,7 +2428,7 @@ done:
 flushing:
   {
     GST_DEBUG_OBJECT (src, "we are flushing");
-    res = GST_FLOW_WRONG_STATE;
+    res = GST_FLOW_FLUSHING;
     goto done;
   }
 }
@@ -2363,13 +2437,23 @@ static gboolean
 gst_base_src_is_random_access (GstBaseSrc * src)
 {
   /* we need to start the basesrc to check random access */
-  if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
+  if (!GST_BASE_SRC_IS_STARTED (src)) {
     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
-    if (G_LIKELY (gst_base_src_start (src)))
+    if (G_LIKELY (gst_base_src_start (src))) {
+      if (gst_base_src_start_wait (src) != GST_FLOW_OK)
+        goto start_failed;
       gst_base_src_stop (src);
+    }
   }
 
   return src->random_access;
+
+  /* ERRORS */
+start_failed:
+  {
+    GST_DEBUG_OBJECT (src, "failed to start");
+    return FALSE;
+  }
 }
 
 static void
@@ -2387,10 +2471,12 @@ gst_base_src_loop (GstPad * pad)
 
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
+  gst_base_src_send_stream_start (src);
+
   /* check if we need to renegotiate */
   if (gst_pad_check_reconfigure (pad)) {
     if (!gst_base_src_negotiate (src))
-      GST_DEBUG_OBJECT (src, "Failed to renegotiate");
+      goto not_negotiated;
   }
 
   GST_LIVE_LOCK (src);
@@ -2523,6 +2609,7 @@ gst_base_src_loop (GstPad * pad)
   }
 
   if (G_UNLIKELY (src->priv->discont)) {
+    GST_INFO_OBJECT (src, "marking pending DISCONT");
     buf = gst_buffer_make_writable (buf);
     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
     src->priv->discont = FALSE;
@@ -2546,11 +2633,17 @@ done:
   return;
 
   /* special cases */
+not_negotiated:
+  {
+    GST_DEBUG_OBJECT (src, "Failed to renegotiate");
+    ret = GST_FLOW_NOT_NEGOTIATED;
+    goto pause;
+  }
 flushing:
   {
     GST_DEBUG_OBJECT (src, "we are flushing");
     GST_LIVE_UNLOCK (src);
-    ret = GST_FLOW_WRONG_STATE;
+    ret = GST_FLOW_FLUSHING;
     goto pause;
   }
 pause:
@@ -2610,8 +2703,9 @@ null_buffer:
 
 static gboolean
 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
-    const GstAllocator * allocator, guint prefix, guint alignment)
+    GstAllocator * allocator, guint prefix, guint alignment)
 {
+  GstAllocator *oldalloc;
   GstBufferPool *oldpool;
   GstBaseSrcPrivate *priv = basesrc->priv;
 
@@ -2625,6 +2719,7 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
   oldpool = priv->pool;
   priv->pool = pool;
 
+  oldalloc = priv->allocator;
   priv->allocator = allocator;
 
   priv->prefix = prefix;
@@ -2639,6 +2734,9 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
     }
     gst_object_unref (oldpool);
   }
+  if (oldalloc) {
+    gst_allocator_unref (oldalloc);
+  }
   return TRUE;
 
   /* ERRORS */
@@ -2675,7 +2773,7 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
   gboolean result = TRUE;
   GstQuery *query;
   GstBufferPool *pool = NULL;
-  const GstAllocator *allocator = NULL;
+  GstAllocator *allocator = NULL;
   guint size, min, max, prefix, alignment;
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
@@ -2698,15 +2796,12 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
       &alignment, &pool);
 
   if (size == 0) {
-    const gchar *mem = NULL;
-
     /* no size, we have variable size buffers */
     if (gst_query_get_n_allocation_memories (query) > 0) {
-      mem = gst_query_parse_nth_allocation_memory (query, 0);
+      if ((allocator = gst_query_parse_nth_allocation_memory (query, 0)))
+        gst_allocator_ref (allocator);
     }
-    GST_DEBUG_OBJECT (basesrc, "0 size, getting allocator %s",
-        GST_STR_NULL (mem));
-    allocator = gst_allocator_find (mem);
+    GST_DEBUG_OBJECT (basesrc, "0 size, using allocator %p", allocator);
   } else if (pool == NULL) {
     /* fixed size, we can use a bufferpool */
     GstStructure *config;
@@ -2773,8 +2868,7 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc)
        * nego is not needed */
       result = TRUE;
     } else {
-      caps = gst_caps_make_writable (caps);
-      gst_base_src_fixate (basesrc, caps);
+      caps = gst_base_src_fixate (basesrc, caps);
       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
       if (gst_caps_is_fixed (caps)) {
         /* yay, fixed caps, use those then, it's possible that the subclass does
@@ -2784,6 +2878,8 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc)
     }
     gst_caps_unref (caps);
   } else {
+    if (caps)
+      gst_caps_unref (caps);
     GST_DEBUG_OBJECT (basesrc, "no common caps");
   }
   return result;
@@ -2838,24 +2934,23 @@ static gboolean
 gst_base_src_start (GstBaseSrc * basesrc)
 {
   GstBaseSrcClass *bclass;
-  gboolean result, have_size;
-  guint64 size;
-  gboolean seekable;
-  GstFormat format;
-
-  if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
-    return TRUE;
+  gboolean result;
 
-  GST_DEBUG_OBJECT (basesrc, "starting source");
+  GST_LIVE_LOCK (basesrc);
+  if (GST_BASE_SRC_IS_STARTING (basesrc))
+    goto was_starting;
+  if (GST_BASE_SRC_IS_STARTED (basesrc))
+    goto was_started;
 
+  basesrc->priv->start_result = GST_FLOW_FLUSHING;
+  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
   basesrc->num_buffers_left = basesrc->num_buffers;
-
+  basesrc->running = FALSE;
+  basesrc->priv->segment_pending = FALSE;
   GST_OBJECT_LOCK (basesrc);
   gst_segment_init (&basesrc->segment, basesrc->segment.format);
   GST_OBJECT_UNLOCK (basesrc);
-
-  basesrc->running = FALSE;
-  basesrc->priv->segment_pending = FALSE;
+  GST_LIVE_UNLOCK (basesrc);
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
   if (bclass->start)
@@ -2866,14 +2961,65 @@ gst_base_src_start (GstBaseSrc * basesrc)
   if (!result)
     goto could_not_start;
 
-  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
+  if (!gst_base_src_is_async (basesrc))
+    gst_base_src_start_complete (basesrc, GST_FLOW_OK);
 
+  return result;
+
+  /* ERROR */
+was_starting:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was starting");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
+was_started:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was started");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
+could_not_start:
+  {
+    GST_DEBUG_OBJECT (basesrc, "could not start");
+    /* subclass is supposed to post a message. We don't have to call _stop. */
+    gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
+    return FALSE;
+  }
+}
+
+/**
+ * gst_base_src_start_complete:
+ * @src: base source instance
+ * @ret: a #GstFlowReturn
+ *
+ * Complete an asynchronous start operation. When the subclass overrides the
+ * start method, it should call gst_base_src_start_complete() when the start
+ * operation completes either from the same thread or from an asynchronous
+ * helper thread.
+ */
+void
+gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
+{
+  gboolean have_size;
+  guint64 size;
+  gboolean seekable;
+  GstFormat format;
+  GstPadMode mode;
+  GstEvent *event;
+
+  if (ret != GST_FLOW_OK)
+    goto error;
+
+  GST_DEBUG_OBJECT (basesrc, "starting source");
   format = basesrc->segment.format;
 
   /* figure out the size */
   have_size = FALSE;
   size = -1;
   if (format == GST_FORMAT_BYTES) {
+    GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
+
     if (bclass->get_size) {
       if (!(have_size = bclass->get_size (basesrc, &size)))
         size = -1;
@@ -2899,16 +3045,107 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
 
+  /* stop flushing now but for live sources, still block in the LIVE lock when
+   * we are not yet PLAYING */
+  gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
+
   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
 
-  return TRUE;
+  GST_OBJECT_LOCK (basesrc->srcpad);
+  mode = GST_PAD_MODE (basesrc->srcpad);
+  GST_OBJECT_UNLOCK (basesrc->srcpad);
 
-  /* ERROR */
-could_not_start:
+  if (mode == GST_PAD_MODE_PUSH) {
+    /* do initial seek, which will start the task */
+    GST_OBJECT_LOCK (basesrc);
+    event = basesrc->pending_seek;
+    basesrc->pending_seek = NULL;
+    GST_OBJECT_UNLOCK (basesrc);
+
+    /* no need to unlock anything, the task is certainly
+     * not running here. The perform seek code will start the task when
+     * finished. */
+    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
+      goto seek_failed;
+
+    if (event)
+      gst_event_unref (event);
+  } else {
+    /* if not random_access, we cannot operate in pull mode for now */
+    if (G_UNLIKELY (!basesrc->random_access))
+      goto no_get_range;
+  }
+
+  GST_LIVE_LOCK (basesrc);
+  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  basesrc->priv->start_result = ret;
+  GST_LIVE_SIGNAL (basesrc);
+  GST_LIVE_UNLOCK (basesrc);
+
+  return;
+
+seek_failed:
   {
-    GST_DEBUG_OBJECT (basesrc, "could not start");
-    /* subclass is supposed to post a message. We don't have to call _stop. */
-    return FALSE;
+    GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
+    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+    if (event)
+      gst_event_unref (event);
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
+no_get_range:
+  {
+    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+    GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
+error:
+  {
+    GST_LIVE_LOCK (basesrc);
+    basesrc->priv->start_result = ret;
+    GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+    GST_LIVE_SIGNAL (basesrc);
+    GST_LIVE_UNLOCK (basesrc);
+    return;
+  }
+}
+
+/**
+ * gst_base_src_start_wait:
+ * @src: base source instance
+ * @ret: a #GstFlowReturn
+ *
+ * Wait until the start operation completes.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+gst_base_src_start_wait (GstBaseSrc * basesrc)
+{
+  GstFlowReturn result;
+
+  GST_LIVE_LOCK (basesrc);
+  if (G_UNLIKELY (basesrc->priv->flushing))
+    goto flushing;
+
+  while (GST_BASE_SRC_IS_STARTING (basesrc)) {
+    GST_LIVE_WAIT (basesrc);
+    if (G_UNLIKELY (basesrc->priv->flushing))
+      goto flushing;
+  }
+  result = basesrc->priv->start_result;
+  GST_LIVE_UNLOCK (basesrc);
+
+  return result;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG_OBJECT (basesrc, "we are flushing");
+    GST_LIVE_UNLOCK (basesrc);
+    return GST_FLOW_FLUSHING;
   }
 }
 
@@ -2918,21 +3155,37 @@ gst_base_src_stop (GstBaseSrc * basesrc)
   GstBaseSrcClass *bclass;
   gboolean result = TRUE;
 
-  if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
-    return TRUE;
-
   GST_DEBUG_OBJECT (basesrc, "stopping source");
 
+  /* flush all */
+  gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+  /* stop the task */
+  gst_pad_stop_task (basesrc->srcpad);
+
+  GST_LIVE_LOCK (basesrc);
+  if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
+    goto was_stopped;
+
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  basesrc->priv->start_result = GST_FLOW_FLUSHING;
+  GST_LIVE_SIGNAL (basesrc);
+  GST_LIVE_UNLOCK (basesrc);
+
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
   if (bclass->stop)
     result = bclass->stop (basesrc);
 
   gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
 
-  if (result)
-    GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
-
   return result;
+
+was_stopped:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was started");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
 }
 
 /* start or stop flushing dataprocessing
@@ -3059,7 +3312,6 @@ static gboolean
 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 {
   GstBaseSrc *basesrc;
-  GstEvent *event;
 
   basesrc = GST_BASE_SRC (parent);
 
@@ -3072,30 +3324,8 @@ gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 
     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
       goto error_start;
-
-    basesrc->priv->discont = TRUE;
-    gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
-
-    /* do initial seek, which will start the task */
-    GST_OBJECT_LOCK (basesrc);
-    event = basesrc->pending_seek;
-    basesrc->pending_seek = NULL;
-    GST_OBJECT_UNLOCK (basesrc);
-
-    /* no need to unlock anything, the task is certainly
-     * not running here. The perform seek code will start the task when
-     * finished. */
-    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
-      goto seek_failed;
-
-    if (event)
-      gst_event_unref (event);
   } else {
     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
-    /* flush all */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-    /* stop the task */
-    gst_pad_stop_task (pad);
     /* now we can stop the source */
     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
       goto error_stop;
@@ -3113,19 +3343,6 @@ error_start:
     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
     return FALSE;
   }
-seek_failed:
-  {
-    GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
-    /* flush all */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-    /* stop the task */
-    gst_pad_stop_task (pad);
-    /* Stop the basesrc */
-    gst_base_src_stop (basesrc);
-    if (event)
-      gst_event_unref (event);
-    return FALSE;
-  }
 error_stop:
   {
     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
@@ -3145,19 +3362,8 @@ gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
       goto error_start;
-
-    /* if not random_access, we cannot operate in pull mode for now */
-    if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc)))
-      goto no_get_range;
-
-    /* stop flushing now but for live sources, still block in the LIVE lock when
-     * we are not yet PLAYING */
-    gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
   } else {
     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
-    /* flush all, there is no task to stop */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-
     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
       goto error_stop;
   }
@@ -3169,12 +3375,6 @@ error_start:
     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
     return FALSE;
   }
-no_get_range:
-  {
-    GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
-    gst_base_src_stop (basesrc);
-    return FALSE;
-  }
 error_stop:
   {
     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
@@ -3182,6 +3382,28 @@ error_stop:
   }
 }
 
+static gboolean
+gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active)
+{
+  gboolean res;
+
+  switch (mode) {
+    case GST_PAD_MODE_PULL:
+      res = gst_base_src_activate_pull (pad, parent, active);
+      break;
+    case GST_PAD_MODE_PUSH:
+      res = gst_base_src_activate_push (pad, parent, active);
+      break;
+    default:
+      GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
+      res = FALSE;
+      break;
+  }
+  return res;
+}
+
+
 static GstStateChangeReturn
 gst_base_src_change_state (GstElement * element, GstStateChange transition)
 {
@@ -3195,6 +3417,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      basesrc->priv->stream_start_pending = TRUE;
       no_preroll = gst_base_src_is_live (basesrc);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -3224,13 +3447,11 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     {
-      GstEvent **event_p;
-
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
-      event_p = &basesrc->pending_seek;
-      gst_event_replace (event_p, NULL);
+      gst_event_replace (&basesrc->pending_seek, NULL);
+      basesrc->priv->stream_start_pending = FALSE;
       break;
     }
     case GST_STATE_CHANGE_READY_TO_NULL: