aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
index bedb88e..e7e3815 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
  * SECTION:gstbasesrc
+ * @title: GstBaseSrc
  * @short_description: Base class for getrange based source elements
  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
  *
- * This is a generice base class for source elements. The following
+ * This is a generic base class for source elements. The following
  * types of sources are supported:
- * <itemizedlist>
- *   <listitem><para>random access sources like files</para></listitem>
- *   <listitem><para>seekable sources</para></listitem>
- *   <listitem><para>live sources</para></listitem>
- * </itemizedlist>
+ *
+ *   * random access sources like files
+ *   * seekable sources
+ *   * live sources
  *
  * The source can be configured to operate in any #GstFormat with the
  * gst_base_src_set_format() method. The currently set format determines
- * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
- * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
+ * the format of the internal #GstSegment and any %GST_EVENT_SEGMENT
+ * events. The default format for #GstBaseSrc is %GST_FORMAT_BYTES.
  *
  * #GstBaseSrc always supports push mode scheduling. If the following
  * conditions are met, it also supports pull mode scheduling:
- * <itemizedlist>
- *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
- *   </listitem>
- *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
- *   </listitem>
- * </itemizedlist>
+ *
+ *   * The format is set to %GST_FORMAT_BYTES (default).
+ *   * #GstBaseSrcClass.is_seekable() returns %TRUE.
  *
  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
  * automatically seekable in push mode as well. The following conditions must
  * be met to make the element seekable in push mode when the format is not
- * #GST_FORMAT_BYTES:
- * <itemizedlist>
- *   <listitem><para>
- *     #GstBaseSrcClass.is_seekable() returns %TRUE.
- *   </para></listitem>
- *   <listitem><para>
- *     #GstBaseSrcClass.query() can convert all supported seek formats to the
- *     internal format as set with gst_base_src_set_format().
- *   </para></listitem>
- *   <listitem><para>
- *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
- *      %TRUE.
- *   </para></listitem>
- * </itemizedlist>
+ * %GST_FORMAT_BYTES:
+ *
+ * * #GstBaseSrcClass.is_seekable() returns %TRUE.
+ * * #GstBaseSrcClass.query() can convert all supported seek formats to the
+ *   internal format as set with gst_base_src_set_format().
+ * * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
+ *    %TRUE.
  *
  * When the element does not meet the requirements to operate in pull mode, the
  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
@@ -82,7 +72,7 @@
  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
  * PLAYING. To signal the pipeline that the element will not produce data, the
  * return value from the READY to PAUSED state will be
- * #GST_STATE_CHANGE_NO_PREROLL.
+ * %GST_STATE_CHANGE_NO_PREROLL.
  *
  * A typical live source will timestamp the buffers it creates with the
  * current running time of the pipeline. This is one reason why a live source
  * There is only support in #GstBaseSrc for exactly one source pad, which
  * should be named "src". A source implementation (subclass of #GstBaseSrc)
  * should install a pad template in its class_init function, like so:
- * |[
+ * |[<!-- language="C" -->
  * static void
  * my_element_class_init (GstMyElementClass *klass)
  * {
  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
  *   // srctemplate should be a #GstStaticPadTemplate with direction
- *   // #GST_PAD_SRC and name "src"
- *   gst_element_class_add_pad_template (gstelement_class,
- *       gst_static_pad_template_get (&amp;srctemplate));
- *   // see #GstElementDetails
- *   gst_element_class_set_details (gstelement_class, &amp;details);
+ *   // %GST_PAD_SRC and name "src"
+ *   gst_element_class_add_static_pad_template (gstelement_class, &amp;srctemplate);
+ *
+ *   gst_element_class_set_static_metadata (gstelement_class,
+ *      "Source name",
+ *      "Source",
+ *      "My Source element",
+ *      "The author <my.sink@my.email>");
  * }
  * ]|
  *
- * <refsect2>
- * <title>Controlled shutdown of live sources in applications</title>
- * <para>
+ * ## Controlled shutdown of live sources in applications
+ *
  * Applications that record from a live source may want to stop recording
  * in a controlled way, so that the recording is stopped, but the data
  * already in the pipeline is processed to the end (remember that many live
  *
  * An application may send an EOS event to a source element to make it
  * perform the EOS logic (send EOS event downstream or post a
- * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
+ * %GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
  * with the gst_element_send_event() function on the element or its parent bin.
  *
  * After the EOS has been sent to the element, the application should wait for
  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
  * received, it may safely shut down the entire pipeline.
  *
- * Last reviewed on 2007-12-19 (0.10.16)
- * </para>
- * </refsect2>
  */
 
 #ifdef HAVE_CONFIG_H
 #include <gst/glib-compat-private.h>
 
 #include "gstbasesrc.h"
-#include "gsttypefindhelper.h"
 #include <gst/gst-i18n-lib.h>
 
 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
@@ -179,6 +167,18 @@ GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
 
+
+#define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
+#define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
+#define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
+
+#define CLEAR_PENDING_EOS(bsrc) \
+  G_STMT_START { \
+    g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \
+    gst_event_replace (&bsrc->priv->pending_eos, NULL); \
+  } G_STMT_END
+
+
 /* BaseSrc signals and args */
 enum
 {
@@ -188,7 +188,6 @@ enum
 
 #define DEFAULT_BLOCKSIZE       4096
 #define DEFAULT_NUM_BUFFERS     -1
-#define DEFAULT_TYPEFIND        FALSE
 #define DEFAULT_DO_TIMESTAMP    FALSE
 
 enum
@@ -196,60 +195,79 @@ enum
   PROP_0,
   PROP_BLOCKSIZE,
   PROP_NUM_BUFFERS,
+#ifndef GST_REMOVE_DEPRECATED
   PROP_TYPEFIND,
+#endif
   PROP_DO_TIMESTAMP
 };
 
-#define GST_BASE_SRC_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
-
+/* The basesrc implementation need to respect the following locking order:
+ *   1. STREAM_LOCK
+ *   2. LIVE_LOCK
+ *   3. OBJECT_LOCK
+ */
 struct _GstBaseSrcPrivate
 {
-  gboolean discont;
-  gboolean flushing;
+  gboolean discont;             /* STREAM_LOCK */
+  gboolean flushing;            /* LIVE_LOCK */
 
-  GstFlowReturn start_result;
-  gboolean async;
+  GstFlowReturn start_result;   /* OBJECT_LOCK */
+  gboolean async;               /* OBJECT_LOCK */
 
   /* if a stream-start event should be sent */
-  gboolean stream_start_pending;
+  gboolean stream_start_pending;        /* STREAM_LOCK */
 
-  /* if segment should be sent */
-  gboolean segment_pending;
+  /* if segment should be sent and a
+   * seqnum if it was originated by a seek */
+  gboolean segment_pending;     /* OBJECT_LOCK */
+  guint32 segment_seqnum;       /* OBJECT_LOCK */
 
   /* if EOS is pending (atomic) */
-  gint pending_eos;
+  GstEvent *pending_eos;        /* OBJECT_LOCK */
+  gint has_pending_eos;         /* atomic */
+
+  /* if the eos was caused by a forced eos from the application */
+  gboolean forced_eos;          /* LIVE_LOCK */
 
   /* startup latency is the time it takes between going to PLAYING and producing
    * the first BUFFER with running_time 0. This value is included in the latency
    * reporting. */
-  GstClockTime latency;
+  GstClockTime latency;         /* OBJECT_LOCK */
   /* timestamp offset, this is the offset add to the values of gst_times for
    * pseudo live sources */
-  GstClockTimeDiff ts_offset;
+  GstClockTimeDiff ts_offset;   /* OBJECT_LOCK */
 
-  gboolean do_timestamp;
-  volatile gint dynamic_size;
+  gboolean do_timestamp;        /* OBJECT_LOCK */
+  volatile gint dynamic_size;   /* atomic */
+  volatile gint automatic_eos;  /* atomic */
 
   /* stream sequence number */
-  guint32 seqnum;
+  guint32 seqnum;               /* STREAM_LOCK */
 
   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
    * pushed in the data stream */
-  GList *pending_events;
-  volatile gint have_events;
+  GList *pending_events;        /* OBJECT_LOCK */
+  volatile gint have_events;    /* OBJECT_LOCK */
 
   /* QoS *//* with LOCK */
-  gboolean qos_enabled;
-  gdouble proportion;
-  GstClockTime earliest_time;
+  gdouble proportion;           /* OBJECT_LOCK */
+  GstClockTime earliest_time;   /* OBJECT_LOCK */
 
-  GstBufferPool *pool;
-  GstAllocator *allocator;
-  GstAllocationParams params;
+  GstBufferPool *pool;          /* OBJECT_LOCK */
+  GstAllocator *allocator;      /* OBJECT_LOCK */
+  GstAllocationParams params;   /* OBJECT_LOCK */
+
+  GCond async_cond;             /* OBJECT_LOCK */
+
+  /* for _submit_buffer_list() */
+  GstBufferList *pending_bufferlist;
 };
 
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+    ((src)->priv->pending_bufferlist != NULL)
+
 static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
 
 static void gst_base_src_class_init (GstBaseSrcClass * klass);
 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
@@ -277,11 +295,21 @@ gst_base_src_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
+
+    private_offset =
+        g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
+
     g_once_init_leave (&base_src_type, _type);
   }
   return base_src_type;
 }
 
+static inline GstBaseSrcPrivate *
+gst_base_src_get_instance_private (GstBaseSrc * self)
+{
+  return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
     GstCaps * filter);
 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
@@ -302,8 +330,8 @@ static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
-static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
-    gboolean active);
+static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
+    gboolean flushing);
 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
     GstSegment * segment);
@@ -318,7 +346,7 @@ static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
     GstQuery * query);
 
 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
-    gboolean flushing, gboolean live_play, gboolean * playing);
+    gboolean flushing);
 
 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
@@ -334,7 +362,7 @@ static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
 static gboolean gst_base_src_seekable (GstBaseSrc * src);
 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
-    guint * length);
+    guint * length, gboolean force);
 
 static void
 gst_base_src_class_init (GstBaseSrcClass * klass)
@@ -345,9 +373,10 @@ gst_base_src_class_init (GstBaseSrcClass * klass)
   gobject_class = G_OBJECT_CLASS (klass);
   gstelement_class = GST_ELEMENT_CLASS (klass);
 
-  GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
+  if (private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &private_offset);
 
-  g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
+  GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
 
   parent_class = g_type_class_peek_parent (klass);
 
@@ -364,10 +393,12 @@ gst_base_src_class_init (GstBaseSrcClass * klass)
           "Number of buffers to output before sending EOS (-1 = unlimited)",
           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
           G_PARAM_STATIC_STRINGS));
+#ifndef GST_REMOVE_DEPRECATED
   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
       g_param_spec_boolean ("typefind", "Typefind",
-          "Run typefind before negotiating", DEFAULT_TYPEFIND,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Run typefind before negotiating (deprecated, non-functional)", FALSE,
+          G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
+#endif
   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
       g_param_spec_boolean ("do-timestamp", "Do timestamp",
           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
@@ -404,13 +435,14 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   GstPad *pad;
   GstPadTemplate *pad_template;
 
-  basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
+  basesrc->priv = gst_base_src_get_instance_private (basesrc);
 
   basesrc->is_live = FALSE;
   g_mutex_init (&basesrc->live_lock);
   g_cond_init (&basesrc->live_cond);
   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
   basesrc->num_buffers_left = -1;
+  basesrc->priv->automatic_eos = TRUE;
 
   basesrc->can_activate_push = TRUE;
 
@@ -436,10 +468,10 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   basesrc->clock_id = NULL;
   /* we operate in BYTES by default */
   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
-  basesrc->typefind = DEFAULT_TYPEFIND;
   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
 
+  g_cond_init (&basesrc->priv->async_cond);
   basesrc->priv->start_result = GST_FLOW_FLUSHING;
   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
@@ -458,6 +490,7 @@ gst_base_src_finalize (GObject * object)
 
   g_mutex_clear (&basesrc->live_lock);
   g_cond_clear (&basesrc->live_cond);
+  g_cond_clear (&basesrc->priv->async_cond);
 
   event_p = &basesrc->pending_seek;
   gst_event_replace (event_p, NULL);
@@ -471,6 +504,31 @@ gst_base_src_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Call with LIVE_LOCK held */
+static GstFlowReturn
+gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
+{
+  while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
+    /* block until the state changes, or we get a flush, or something */
+    GST_DEBUG_OBJECT (src, "live source waiting for running state");
+    GST_LIVE_WAIT (src);
+    GST_DEBUG_OBJECT (src, "live source unlocked");
+  }
+
+  if (src->priv->flushing)
+    goto flushing;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG_OBJECT (src, "we are flushing");
+    return GST_FLOW_FLUSHING;
+  }
+}
+
+
 /**
  * gst_base_src_wait_playing:
  * @src: the src
@@ -480,35 +538,25 @@ gst_base_src_finalize (GObject * object)
  * and call this method before continuing to produce the remaining data.
  *
  * This function will block until a state change to PLAYING happens (in which
- * case this function returns #GST_FLOW_OK) or the processing must be stopped due
+ * case this function returns %GST_FLOW_OK) or the processing must be stopped due
  * to a state change to READY or a FLUSH event (in which case this function
- * returns #GST_FLOW_FLUSHING).
+ * returns %GST_FLOW_FLUSHING).
  *
- * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
+ * Returns: %GST_FLOW_OK if @src is PLAYING and processing can
  * continue. Any other return value should be returned from the create vmethod.
  */
 GstFlowReturn
 gst_base_src_wait_playing (GstBaseSrc * src)
 {
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
+  GstFlowReturn ret;
 
-  do {
-    /* block until the state changes, or we get a flush, or something */
-    GST_DEBUG_OBJECT (src, "live source waiting for running state");
-    GST_LIVE_WAIT (src);
-    GST_DEBUG_OBJECT (src, "live source unlocked");
-    if (src->priv->flushing)
-      goto flushing;
-  } while (G_UNLIKELY (!src->live_running));
+  g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
 
-  return GST_FLOW_OK;
+  GST_LIVE_LOCK (src);
+  ret = gst_base_src_wait_playing_unlocked (src);
+  GST_LIVE_UNLOCK (src);
 
-  /* ERRORS */
-flushing:
-  {
-    GST_DEBUG_OBJECT (src, "we are flushing");
-    return GST_FLOW_FLUSHING;
-  }
+  return ret;
 }
 
 /**
@@ -563,10 +611,10 @@ gst_base_src_is_live (GstBaseSrc * src)
  * @format: the format to use
  *
  * Sets the default format of the source. This will be the format used
- * for sending NEW_SEGMENT events and for performing seeks.
+ * for sending SEGMENT events and for performing seeks.
  *
  * If a format of GST_FORMAT_BYTES is set, the element will be able to
- * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
+ * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns %TRUE.
  *
  * This function must only be called in states < %GST_STATE_PAUSED.
  */
@@ -599,6 +647,32 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
 }
 
 /**
+ * gst_base_src_set_automatic_eos:
+ * @src: base source instance
+ * @automatic_eos: automatic eos
+ *
+ * If @automatic_eos is %TRUE, @src will automatically go EOS if a buffer
+ * after the total size is returned. By default this is %TRUE but sources
+ * that can't return an authoritative size and only know that they're EOS
+ * when trying to read more should set this to %FALSE.
+ *
+ * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS
+ * when a buffer outside of the currently configured segment is pushed if
+ * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an
+ * EOS will be pushed only when the #GstBaseSrc.create implementation
+ * returns %GST_FLOW_EOS.
+ *
+ * Since: 1.4
+ */
+void
+gst_base_src_set_automatic_eos (GstBaseSrc * src, gboolean automatic_eos)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+
+  g_atomic_int_set (&src->priv->automatic_eos, automatic_eos);
+}
+
+/**
  * gst_base_src_set_async:
  * @src: base source instance
  * @async: new async mode
@@ -648,14 +722,14 @@ gst_base_src_is_async (GstBaseSrc * src)
  * @min_latency: (out) (allow-none): the min latency of the source
  * @max_latency: (out) (allow-none): the max latency of the source
  *
- * Query the source for the latency parameters. @live will be TRUE when @src is
- * configured as a live source. @min_latency will be set to the difference
- * between the running time and the timestamp of the first buffer.
- * @max_latency is always the undefined value of -1.
+ * Query the source for the latency parameters. @live will be %TRUE when @src is
+ * configured as a live source. @min_latency and @max_latency will be set
+ * to the difference between the running time and the timestamp of the first
+ * buffer.
  *
  * This function is mostly used by subclasses.
  *
- * Returns: TRUE if the query succeeded.
+ * Returns: %TRUE if the query succeeded.
  */
 gboolean
 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
@@ -680,11 +754,11 @@ gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
   if (min_latency)
     *min_latency = min;
   if (max_latency)
-    *max_latency = -1;
+    *max_latency = min;
 
   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
-      GST_TIME_ARGS (-1));
+      GST_TIME_ARGS (min));
   GST_OBJECT_UNLOCK (src);
 
   return TRUE;
@@ -747,6 +821,8 @@ gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
 
   GST_OBJECT_LOCK (src);
   src->priv->do_timestamp = timestamp;
+  if (timestamp && src->segment.format != GST_FORMAT_TIME)
+    gst_segment_init (&src->segment, GST_FORMAT_TIME);
   GST_OBJECT_UNLOCK (src);
 }
 
@@ -777,7 +853,7 @@ gst_base_src_get_do_timestamp (GstBaseSrc * src)
  * @src: The source
  * @start: The new start value for the segment
  * @stop: Stop value for the new segment
- * @position: The position value for the new segent
+ * @time: The new time value for the start of the new segment
  *
  * Prepare a new seamless segment for emission downstream. This function must
  * only be called by derived sub-classes, and only from the create() function,
@@ -790,25 +866,28 @@ gst_base_src_get_do_timestamp (GstBaseSrc * src)
  */
 gboolean
 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
-    gint64 position)
+    gint64 time)
 {
   gboolean res = TRUE;
 
-  GST_DEBUG_OBJECT (src,
-      "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
-      GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
-      GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
-
   GST_OBJECT_LOCK (src);
 
   src->segment.base = gst_segment_to_running_time (&src->segment,
       src->segment.format, src->segment.position);
-  src->segment.start = start;
+  src->segment.position = src->segment.start = start;
   src->segment.stop = stop;
-  src->segment.position = position;
+  src->segment.time = time;
 
-  /* forward, we send data from position to stop */
+  /* Mark pending segment. Will be sent before next data */
   src->priv->segment_pending = TRUE;
+  src->priv->segment_seqnum = gst_util_seqnum_next ();
+
+  GST_DEBUG_OBJECT (src,
+      "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
+      GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
+      GST_TIME_ARGS (src->segment.base));
+
   GST_OBJECT_UNLOCK (src);
 
   src->priv->discont = TRUE;
@@ -817,14 +896,26 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
   return res;
 }
 
+/* called with STREAM_LOCK */
 static gboolean
 gst_base_src_send_stream_start (GstBaseSrc * src)
 {
   gboolean ret = TRUE;
 
   if (src->priv->stream_start_pending) {
-    ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
+    gchar *stream_id;
+    GstEvent *event;
+
+    stream_id =
+        gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
+
+    GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
+    event = gst_event_new_stream_start (stream_id);
+    gst_event_set_group_id (event, gst_util_group_id_next ());
+
+    ret = gst_pad_push_event (src->srcpad, event);
     src->priv->stream_start_pending = FALSE;
+    g_free (stream_id);
   }
 
   return ret;
@@ -833,7 +924,7 @@ gst_base_src_send_stream_start (GstBaseSrc * src)
 /**
  * gst_base_src_set_caps:
  * @src: a #GstBaseSrc
- * @caps: a #GstCaps
+ * @caps: (transfer none): a #GstCaps
  *
  * Set new caps on the basesrc source pad.
  *
@@ -844,15 +935,27 @@ gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
 {
   GstBaseSrcClass *bclass;
   gboolean res = TRUE;
+  GstCaps *current_caps;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
   gst_base_src_send_stream_start (src);
 
-  if (bclass->set_caps)
-    res = bclass->set_caps (src, caps);
-  if (res)
-    res = gst_pad_set_caps (src->srcpad, caps);
+  current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src));
+  if (current_caps && gst_caps_is_equal (current_caps, caps)) {
+    GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT,
+        caps);
+    res = TRUE;
+  } else {
+    if (bclass->set_caps)
+      res = bclass->set_caps (src, caps);
+
+    if (res)
+      res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
+  }
+
+  if (current_caps)
+    gst_caps_unref (current_caps);
 
   return res;
 }
@@ -964,7 +1067,9 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
           } else
             res = TRUE;
 
-          gst_query_set_position (query, format, position);
+          if (res)
+            gst_query_set_position (query, format, position);
+
           break;
         }
       }
@@ -992,8 +1097,8 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
           guint length = 0;
 
           /* may have to refresh duration */
-          if (g_atomic_int_get (&src->priv->dynamic_size))
-            gst_base_src_update_length (src, 0, &length);
+          gst_base_src_update_length (src, 0, &length,
+              g_atomic_int_get (&src->priv->dynamic_size));
 
           /* this is the duration as configured by the subclass. */
           GST_OBJECT_LOCK (src);
@@ -1017,7 +1122,10 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
              * means that we cannot report the duration at all. */
             res = TRUE;
           }
-          gst_query_set_duration (query, format, duration);
+
+          if (res)
+            gst_query_set_duration (query, format, duration);
+
           break;
         }
       }
@@ -1040,7 +1148,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
             gst_base_src_seekable (src), 0, duration);
         res = TRUE;
       } else {
-        /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
+        /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */
         /* Don't reply to the query to make up for demuxers which don't
          * handle the SEEKING query yet. Players like Totem will fall back
          * to the duration when the SEEKING query isn't answered. */
@@ -1050,23 +1158,23 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
     }
     case GST_QUERY_SEGMENT:
     {
+      GstFormat format;
       gint64 start, stop;
 
       GST_OBJECT_LOCK (src);
-      /* no end segment configured, current duration then */
+
+      format = src->segment.format;
+
+      start =
+          gst_segment_to_stream_time (&src->segment, format,
+          src->segment.start);
       if ((stop = src->segment.stop) == -1)
         stop = src->segment.duration;
-      start = src->segment.start;
+      else
+        stop = gst_segment_to_stream_time (&src->segment, format, stop);
 
-      /* adjust to stream time */
-      if (src->segment.time != -1) {
-        start -= src->segment.time;
-        if (stop != -1)
-          stop -= src->segment.time;
-      }
+      gst_query_set_segment (query, src->segment.rate, format, start, stop);
 
-      gst_query_set_segment (query, src->segment.rate, src->segment.format,
-          start, stop);
       GST_OBJECT_UNLOCK (src);
       res = TRUE;
       break;
@@ -1193,6 +1301,22 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
         res = FALSE;
       break;
     }
+    case GST_QUERY_URI:{
+      if (GST_IS_URI_HANDLER (src)) {
+        gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
+
+        if (uri != NULL) {
+          gst_query_set_uri (query, uri);
+          g_free (uri);
+          res = TRUE;
+        } else {
+          res = FALSE;
+        }
+      } else {
+        res = FALSE;
+      }
+      break;
+    }
     default:
       res = FALSE;
       break;
@@ -1246,6 +1370,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
+  GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
+
   if (bclass->do_seek)
     result = bclass->do_seek (src, segment);
 
@@ -1265,8 +1391,8 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
    *     seek format, adjust by the relative seek offset and then convert back to
    *     the processing format
    */
-  GstSeekType cur_type, stop_type;
-  gint64 cur, stop;
+  GstSeekType start_type, stop_type;
+  gint64 start, stop;
   GstSeekFlags flags;
   GstFormat seek_format, dest_format;
   gdouble rate;
@@ -1274,25 +1400,25 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
   gboolean res = TRUE;
 
   gst_event_parse_seek (event, &rate, &seek_format, &flags,
-      &cur_type, &cur, &stop_type, &stop);
+      &start_type, &start, &stop_type, &stop);
   dest_format = segment->format;
 
   if (seek_format == dest_format) {
     gst_segment_do_seek (segment, rate, seek_format, flags,
-        cur_type, cur, stop_type, stop, &update);
+        start_type, start, stop_type, stop, &update);
     return TRUE;
   }
 
-  if (cur_type != GST_SEEK_TYPE_NONE) {
-    /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
+  if (start_type != GST_SEEK_TYPE_NONE) {
+    /* FIXME: Handle seek_end by converting the input segment vals */
     res =
-        gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
-        &cur);
-    cur_type = GST_SEEK_TYPE_SET;
+        gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
+        &start);
+    start_type = GST_SEEK_TYPE_SET;
   }
 
   if (res && stop_type != GST_SEEK_TYPE_NONE) {
-    /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
+    /* FIXME: Handle seek_end by converting the input segment vals */
     res =
         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
         &stop);
@@ -1300,7 +1426,7 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
   }
 
   /* And finally, configure our output segment in the desired format */
-  gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
+  gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
       stop_type, stop, &update);
 
   if (!res)
@@ -1336,11 +1462,23 @@ gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
 {
   GstFlowReturn ret;
   GstBaseSrcPrivate *priv = src->priv;
+  GstBufferPool *pool = NULL;
+  GstAllocator *allocator = NULL;
+  GstAllocationParams params;
 
+  GST_OBJECT_LOCK (src);
   if (priv->pool) {
-    ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
+    pool = gst_object_ref (priv->pool);
+  } else if (priv->allocator) {
+    allocator = gst_object_ref (priv->allocator);
+  }
+  params = priv->params;
+  GST_OBJECT_UNLOCK (src);
+
+  if (pool) {
+    ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL);
   } else if (size != -1) {
-    *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
+    *buffer = gst_buffer_new_allocate (allocator, size, &params);
     if (G_UNLIKELY (*buffer == NULL))
       goto alloc_failed;
 
@@ -1350,13 +1488,21 @@ gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
         size);
     goto alloc_failed;
   }
+
+done:
+  if (pool)
+    gst_object_unref (pool);
+  if (allocator)
+    gst_object_unref (allocator);
+
   return ret;
 
   /* ERRORS */
 alloc_failed:
   {
     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
-    return GST_FLOW_ERROR;
+    ret = GST_FLOW_ERROR;
+    goto done;
   }
 }
 
@@ -1471,9 +1617,9 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
   gdouble rate;
   GstFormat seek_format, dest_format;
   GstSeekFlags flags;
-  GstSeekType cur_type, stop_type;
-  gint64 cur, stop;
-  gboolean flush, playing;
+  GstSeekType start_type, stop_type;
+  gint64 start, stop;
+  gboolean flush;
   gboolean update;
   gboolean relative_seek = FALSE;
   gboolean seekseg_configured = FALSE;
@@ -1489,9 +1635,9 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 
   if (event) {
     gst_event_parse_seek (event, &rate, &seek_format, &flags,
-        &cur_type, &cur, &stop_type, &stop);
+        &start_type, &start, &stop_type, &stop);
 
-    relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
+    relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
         SEEK_TYPE_IS_RELATIVE (stop_type);
 
     if (dest_format != seek_format && !relative_seek) {
@@ -1525,7 +1671,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 
   /* unblock streaming thread. */
   if (unlock)
-    gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
+    gst_base_src_set_flushing (src, TRUE);
 
   /* grab streaming lock, this should eventually be possible, either
    * because the task is paused, our streaming thread stopped
@@ -1541,7 +1687,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
   }
 
   if (unlock)
-    gst_base_src_set_flushing (src, FALSE, playing, NULL);
+    gst_base_src_set_flushing (src, FALSE);
 
   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
    * copy the current segment info into the temp segment that we can actually
@@ -1564,7 +1710,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
         /* The seek format matches our processing format, no need to ask the
          * the subclass to configure the segment. */
         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
-            cur_type, cur, stop_type, stop, &update);
+            start_type, start, stop_type, stop, &update);
       }
     }
     /* Else, no seek event passed, so we're just (re)starting the
@@ -1614,12 +1760,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
       gst_element_post_message (GST_ELEMENT (src), message);
     }
 
-    /* for deriving a stop position for the playback segment from the seek
-     * segment, we must take the duration when the stop is not set */
-    if ((stop = seeksegment.stop) == -1)
-      stop = seeksegment.duration;
-
     src->priv->segment_pending = TRUE;
+    src->priv->segment_seqnum = seqnum;
   }
 
   src->priv->discont = TRUE;
@@ -1651,8 +1793,10 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
 {
   GstBaseSrc *src;
   gboolean result = FALSE;
+  GstBaseSrcClass *bclass;
 
   src = GST_BASE_SRC (element);
+  bclass = GST_BASE_SRC_GET_CLASS (src);
 
   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
 
@@ -1660,62 +1804,112 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
       /* bidirectional events */
     case GST_EVENT_FLUSH_START:
       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
+
       result = gst_pad_push_event (src->srcpad, event);
+      gst_base_src_set_flushing (src, TRUE);
       event = NULL;
       break;
     case GST_EVENT_FLUSH_STOP:
-      GST_LIVE_LOCK (src->srcpad);
-      src->priv->segment_pending = TRUE;
-      /* sending random flushes downstream can break stuff,
-       * especially sync since all segment info will get flushed */
+    {
+      gboolean start;
+
+      GST_PAD_STREAM_LOCK (src->srcpad);
+      gst_base_src_set_flushing (src, FALSE);
+
       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
       result = gst_pad_push_event (src->srcpad, event);
-      GST_LIVE_UNLOCK (src->srcpad);
+
+      /* For external flush, restart the task .. */
+      GST_LIVE_LOCK (src);
+      src->priv->segment_pending = TRUE;
+
+      GST_OBJECT_LOCK (src->srcpad);
+      start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
+      GST_OBJECT_UNLOCK (src->srcpad);
+
+      /* ... and for live sources, only if in playing state */
+      if (src->is_live) {
+        if (!src->live_running)
+          start = FALSE;
+      }
+
+      if (start)
+        gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+            src->srcpad, NULL);
+
+      GST_LIVE_UNLOCK (src);
+      GST_PAD_STREAM_UNLOCK (src->srcpad);
+
       event = NULL;
       break;
+    }
 
       /* downstream serialized events */
     case GST_EVENT_EOS:
     {
-      GstBaseSrcClass *bclass;
-
-      bclass = GST_BASE_SRC_GET_CLASS (src);
+      gboolean push_mode;
 
       /* queue EOS and make sure the task or pull function performs the EOS
        * actions.
        *
-       * We have two possibilities:
+       * For push mode, This will be done in 3 steps. It is required to not
+       * block here as gst_element_send_event() will hold the STATE_LOCK, hence
+       * blocking would prevent asynchronous state change to complete.
        *
-       *  - Before we are to enter the _create function, we check the pending_eos
-       *    first and do EOS instead of entering it.
-       *  - If we are in the _create function or we did not manage to set the
-       *    flag fast enough and we are about to enter the _create function,
-       *    we unlock it so that we exit with FLUSHING immediately. We then
-       *    check the EOS flag and do the EOS logic.
+       * 1. We stop the streaming thread
+       * 2. We set the pending eos
+       * 3. We start the streaming thread again, so it is performed
+       *    asynchronously.
+       *
+       * For pull mode, we simply mark the pending EOS without flushing.
        */
-      g_atomic_int_set (&src->priv->pending_eos, TRUE);
-      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
 
+      GST_OBJECT_LOCK (src->srcpad);
+      push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
+      GST_OBJECT_UNLOCK (src->srcpad);
 
-      /* unlock the _create function so that we can check the pending_eos flag
-       * and we can do EOS. This will eventually release the LIVE_LOCK again so
-       * that we can grab it and stop the unlock again. We don't take the stream
-       * lock so that this operation is guaranteed to never block. */
-      gst_base_src_activate_pool (src, FALSE);
-      if (bclass->unlock)
-        bclass->unlock (src);
+      if (push_mode) {
+        gst_base_src_set_flushing (src, TRUE);
 
-      GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
+        GST_PAD_STREAM_LOCK (src->srcpad);
+        gst_base_src_set_flushing (src, FALSE);
 
-      GST_LIVE_LOCK (src);
-      GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
-      /* now stop the unlock of the streaming thread again. Grabbing the live
-       * lock is enough because that protects the create function. */
-      if (bclass->unlock_stop)
-        bclass->unlock_stop (src);
-      gst_base_src_activate_pool (src, TRUE);
-      GST_LIVE_UNLOCK (src);
+        GST_OBJECT_LOCK (src);
+        g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+        if (src->priv->pending_eos)
+          gst_event_unref (src->priv->pending_eos);
+        src->priv->pending_eos = event;
+        GST_OBJECT_UNLOCK (src);
+
+        GST_DEBUG_OBJECT (src,
+            "EOS marked, start task for asynchronous handling");
+        gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+            src->srcpad, NULL);
+
+        GST_PAD_STREAM_UNLOCK (src->srcpad);
+      } else {
+        /* In pull mode, we need not to return flushing to downstream, though
+         * the stream lock is not kept after getrange was unblocked */
+        GST_OBJECT_LOCK (src);
+        g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+        if (src->priv->pending_eos)
+          gst_event_unref (src->priv->pending_eos);
+        src->priv->pending_eos = event;
+        GST_OBJECT_UNLOCK (src);
 
+        gst_base_src_set_pool_flushing (src, TRUE);
+        if (bclass->unlock)
+          bclass->unlock (src);
+
+        GST_PAD_STREAM_LOCK (src->srcpad);
+        if (bclass->unlock_stop)
+          bclass->unlock_stop (src);
+        gst_base_src_set_pool_flushing (src, TRUE);
+        GST_PAD_STREAM_UNLOCK (src->srcpad);
+      }
+
+
+      event = NULL;
       result = TRUE;
       break;
     }
@@ -1723,9 +1917,11 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
       /* sending random SEGMENT downstream can break sync. */
       break;
     case GST_EVENT_TAG:
+    case GST_EVENT_SINK_MESSAGE:
     case GST_EVENT_CUSTOM_DOWNSTREAM:
     case GST_EVENT_CUSTOM_BOTH:
-      /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
+    case GST_EVENT_PROTECTION:
+      /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */
       GST_OBJECT_LOCK (src);
       src->priv->pending_events =
           g_list_append (src->priv->pending_events, event);
@@ -1856,10 +2052,10 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
     case GST_EVENT_FLUSH_START:
       /* cancel any blocking getrange, is normally called
        * when in pull mode. */
-      result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
+      result = gst_base_src_set_flushing (src, TRUE);
       break;
     case GST_EVENT_FLUSH_STOP:
-      result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
+      result = gst_base_src_set_flushing (src, FALSE);
       break;
     case GST_EVENT_QOS:
     {
@@ -1935,9 +2131,11 @@ gst_base_src_set_property (GObject * object, guint prop_id,
     case PROP_NUM_BUFFERS:
       src->num_buffers = g_value_get_int (value);
       break;
+#ifndef GST_REMOVE_DEPRECATED
     case PROP_TYPEFIND:
       src->typefind = g_value_get_boolean (value);
       break;
+#endif
     case PROP_DO_TIMESTAMP:
       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
       break;
@@ -1962,9 +2160,11 @@ gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_NUM_BUFFERS:
       g_value_set_int (value, src->num_buffers);
       break;
+#ifndef GST_REMOVE_DEPRECATED
     case PROP_TYPEFIND:
       g_value_set_boolean (value, src->typefind);
       break;
+#endif
     case PROP_DO_TIMESTAMP:
       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
       break;
@@ -2102,8 +2302,12 @@ gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
       if (do_timestamp) {
         dts = running_time;
-      } else {
-        dts = 0;
+      } else if (!GST_CLOCK_TIME_IS_VALID (pts)) {
+        if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) {
+          dts = basesrc->segment.start;
+        } else {
+          dts = 0;
+        }
       }
       GST_BUFFER_DTS (buffer) = dts;
 
@@ -2176,29 +2380,31 @@ no_sync:
 
 /* Called with STREAM_LOCK and LIVE_LOCK */
 static gboolean
-gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
+gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
+    gboolean force)
 {
   guint64 size, maxsize;
   GstBaseSrcClass *bclass;
-  GstFormat format;
   gint64 stop;
-  gboolean dynamic;
+
+  /* only operate if we are working with bytes */
+  if (src->segment.format != GST_FORMAT_BYTES)
+    return TRUE;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
-  format = src->segment.format;
   stop = src->segment.stop;
   /* get total file size */
   size = src->segment.duration;
 
-  /* only operate if we are working with bytes */
-  if (format != GST_FORMAT_BYTES)
-    return TRUE;
-
-  /* the max amount of bytes to read is the total size or
-   * up to the segment.stop if present. */
-  if (stop != -1)
-    maxsize = MIN (size, stop);
+  /* when not doing automatic EOS, just use the stop position. We don't use
+   * the size to check for EOS */
+  if (!g_atomic_int_get (&src->priv->automatic_eos))
+    maxsize = stop;
+  /* Otherwise, the max amount of bytes to read is the total
+   * size or up to the segment.stop if present. */
+  else if (stop != -1)
+    maxsize = size != -1 ? MIN (size, stop) : stop;
   else
     maxsize = size;
 
@@ -2207,39 +2413,41 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
       *length, size, stop, maxsize);
 
-  dynamic = g_atomic_int_get (&src->priv->dynamic_size);
-  GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
-
   /* check size if we have one */
   if (maxsize != -1) {
     /* if we run past the end, check if the file became bigger and
-     * retry. */
-    if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
+     * retry.  Mind wrap when checking. */
+    if (G_UNLIKELY (offset >= maxsize || offset + *length >= maxsize || force)) {
       /* see if length of the file changed */
       if (bclass->get_size)
         if (!bclass->get_size (src, &size))
           size = -1;
 
-      /* make sure we don't exceed the configured segment stop
-       * if it was set */
-      if (stop != -1)
-        maxsize = MIN (size, stop);
+      /* when not doing automatic EOS, just use the stop position. We don't use
+       * the size to check for EOS */
+      if (!g_atomic_int_get (&src->priv->automatic_eos))
+        maxsize = stop;
+      /* Otherwise, the max amount of bytes to read is the total
+       * size or up to the segment.stop if present. */
+      else if (stop != -1)
+        maxsize = size != -1 ? MIN (size, stop) : stop;
       else
         maxsize = size;
 
-      /* if we are at or past the end, EOS */
-      if (G_UNLIKELY (offset >= maxsize))
-        goto unexpected_length;
-
-      /* else we can clip to the end */
-      if (G_UNLIKELY (offset + *length >= maxsize))
-        *length = maxsize - offset;
+      if (maxsize != -1) {
+        /* if we are at or past the end, EOS */
+        if (G_UNLIKELY (offset >= maxsize))
+          goto unexpected_length;
 
+        /* else we can clip to the end */
+        if (G_UNLIKELY (offset + *length >= maxsize))
+          *length = maxsize - offset;
+      }
     }
   }
 
-  /* keep track of current duration.
-   * segment is in bytes, we checked that above. */
+  /* keep track of current duration. segment is in bytes, we checked
+   * that above. */
   GST_OBJECT_LOCK (src);
   src->segment.duration = size;
   GST_OBJECT_UNLOCK (src);
@@ -2249,6 +2457,7 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
   /* ERRORS */
 unexpected_length:
   {
+    GST_DEBUG_OBJECT (src, "processing at or past EOS");
     return FALSE;
   }
 }
@@ -2263,13 +2472,14 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
   GstClockReturn status;
   GstBuffer *res_buf;
   GstBuffer *in_buf;
+  gboolean own_res_buf;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
 again:
   if (src->is_live) {
     if (G_UNLIKELY (!src->live_running)) {
-      ret = gst_base_src_wait_playing (src);
+      ret = gst_base_src_wait_playing_unlocked (src);
       if (ret != GST_FLOW_OK)
         goto stopped;
     }
@@ -2282,7 +2492,7 @@ again:
   if (G_UNLIKELY (!bclass->create))
     goto no_function;
 
-  if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
+  if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
     goto unexpected_length;
 
   /* track position */
@@ -2300,26 +2510,46 @@ again:
   }
 
   /* don't enter the create function if a pending EOS event was set. For the
-   * logic of the pending_eos, check the event function of this class. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
+   * logic of the has_pending_eos, check the event function of this class. */
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
+    src->priv->forced_eos = TRUE;
     goto eos;
+  }
 
   GST_DEBUG_OBJECT (src,
       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
       G_GINT64_FORMAT, offset, length, src->segment.time);
 
   res_buf = in_buf = *buf;
+  own_res_buf = (*buf == NULL);
 
+  GST_LIVE_UNLOCK (src);
   ret = bclass->create (src, offset, length, &res_buf);
+  GST_LIVE_LOCK (src);
+
+  /* As we released the LIVE_LOCK, the state may have changed */
+  if (src->is_live) {
+    if (G_UNLIKELY (!src->live_running)) {
+      GstFlowReturn wait_ret;
+      wait_ret = gst_base_src_wait_playing_unlocked (src);
+      if (wait_ret != GST_FLOW_OK) {
+        if (ret == GST_FLOW_OK && own_res_buf)
+          gst_buffer_unref (res_buf);
+        ret = wait_ret;
+        goto stopped;
+      }
+    }
+  }
 
   /* The create function could be unlocked because we have a pending EOS. It's
    * possible that we have a valid buffer from create that we need to
    * discard when the create function returned _OK. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
     if (ret == GST_FLOW_OK) {
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
     }
+    src->priv->forced_eos = TRUE;
     goto eos;
   }
 
@@ -2334,7 +2564,9 @@ again:
     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
         "fill the provided buffer, copying");
 
-    gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
+    if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
+      goto map_failed;
+
     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
     gst_buffer_unmap (in_buf, &info);
     gst_buffer_set_size (in_buf, copied_size);
@@ -2345,6 +2577,16 @@ again:
     res_buf = in_buf;
   }
 
+  if (res_buf == NULL) {
+    GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+    if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+      goto null_buffer;
+
+    res_buf = gst_buffer_list_get_writable (pending_list, 0);
+    own_res_buf = FALSE;
+  }
+
   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
   if (offset == 0 && src->segment.time == 0
       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
@@ -2373,7 +2615,7 @@ again:
       /* this case is triggered when we were waiting for the clock and
        * it got unlocked because we did a state change. In any case, get rid of
        * the buffer. */
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
 
       if (!src->live_running) {
@@ -2395,7 +2637,7 @@ again:
       GST_ELEMENT_ERROR (src, CORE, CLOCK,
           (_("Internal clock error.")),
           ("clock returned unexpected return value %d", status));
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
       ret = GST_FLOW_ERROR;
       break;
@@ -2418,6 +2660,15 @@ not_ok:
         gst_flow_get_name (ret));
     return ret;
   }
+map_failed:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
+        (_("Failed to map buffer.")),
+        ("failed to map result buffer in WRITE mode"));
+    if (own_res_buf)
+      gst_buffer_unref (res_buf);
+    return GST_FLOW_ERROR;
+  }
 not_started:
   {
     GST_DEBUG_OBJECT (src, "getrange but not started");
@@ -2442,7 +2693,7 @@ reached_num_buffers:
 flushing:
   {
     GST_DEBUG_OBJECT (src, "we are flushing");
-    if (*buf == NULL)
+    if (own_res_buf)
       gst_buffer_unref (res_buf);
     return GST_FLOW_FLUSHING;
   }
@@ -2451,6 +2702,14 @@ eos:
     GST_DEBUG_OBJECT (src, "we are EOS");
     return GST_FLOW_EOS;
   }
+null_buffer:
+  {
+    GST_ELEMENT_ERROR (src, STREAM, FAILED,
+        (_("Internal data flow error.")),
+        ("Subclass %s neither returned a buffer nor submitted a buffer list "
+            "from its create function", G_OBJECT_TYPE_NAME (src)));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static GstFlowReturn
@@ -2505,6 +2764,7 @@ start_failed:
   }
 }
 
+/* Called with STREAM_LOCK */
 static void
 gst_base_src_loop (GstPad * pad)
 {
@@ -2520,17 +2780,44 @@ gst_base_src_loop (GstPad * pad)
 
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
+  /* Just leave immediately if we're flushing */
+  GST_LIVE_LOCK (src);
+  if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
+    goto flushing;
+  GST_LIVE_UNLOCK (src);
+
+  /* Just return if EOS is pushed again, as the app might be unaware that an
+   * EOS have been sent already */
+  if (GST_PAD_IS_EOS (pad)) {
+    GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
+    gst_pad_pause_task (pad);
+    goto done;
+  }
+
   gst_base_src_send_stream_start (src);
 
+  /* The stream-start event could've caused something to flush us */
+  GST_LIVE_LOCK (src);
+  if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
+    goto flushing;
+  GST_LIVE_UNLOCK (src);
+
   /* check if we need to renegotiate */
   if (gst_pad_check_reconfigure (pad)) {
-    if (!gst_base_src_negotiate (src))
-      goto not_negotiated;
+    if (!gst_base_src_negotiate (src)) {
+      gst_pad_mark_reconfigure (pad);
+      if (GST_PAD_IS_FLUSHING (pad)) {
+        GST_LIVE_LOCK (src);
+        goto flushing;
+      } else {
+        goto negotiate_failed;
+      }
+    }
   }
 
   GST_LIVE_LOCK (src);
 
-  if (G_UNLIKELY (src->priv->flushing))
+  if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
 
   blocksize = src->blocksize;
@@ -2555,6 +2842,12 @@ gst_base_src_loop (GstPad * pad)
   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
       GST_TIME_ARGS (position), blocksize);
 
+  /* clean up just in case we got interrupted or so last time round */
+  if (src->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  }
+
   ret = gst_base_src_get_range (src, position, blocksize, &buf);
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@@ -2562,13 +2855,19 @@ gst_base_src_loop (GstPad * pad)
     GST_LIVE_UNLOCK (src);
     goto pause;
   }
-  /* this should not happen */
-  if (G_UNLIKELY (buf == NULL))
-    goto null_buffer;
+
+  /* Note: at this point buf might be a single buf returned which we own or
+   * the first buf of a pending buffer list submitted via submit_buffer_list(),
+   * in which case the buffer is owned by the pending buffer list and not us. */
+  g_assert (buf != NULL);
 
   /* push events to close/start our segment before we push the buffer. */
   if (G_UNLIKELY (src->priv->segment_pending)) {
-    gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
+    GstEvent *seg_event = gst_event_new_segment (&src->segment);
+
+    gst_event_set_seqnum (seg_event, src->priv->segment_seqnum);
+    src->priv->segment_seqnum = gst_util_seqnum_next ();
+    gst_pad_push_event (pad, seg_event);
     src->priv->segment_pending = FALSE;
   }
 
@@ -2638,7 +2937,8 @@ gst_base_src_loop (GstPad * pad)
       /* positive rate, check if we reached the stop */
       if (src->segment.stop != -1) {
         if (position >= src->segment.stop) {
-          eos = TRUE;
+          if (g_atomic_int_get (&src->priv->automatic_eos))
+            eos = TRUE;
           position = src->segment.stop;
         }
       }
@@ -2646,7 +2946,8 @@ gst_base_src_loop (GstPad * pad)
       /* negative rate, check if we reached the start. start is always set to
        * something different from -1 */
       if (position <= src->segment.start) {
-        eos = TRUE;
+        if (g_atomic_int_get (&src->priv->automatic_eos))
+          eos = TRUE;
         position = src->segment.start;
       }
       /* when going reverse, all buffers are DISCONT */
@@ -2665,14 +2966,26 @@ gst_base_src_loop (GstPad * pad)
   }
   GST_LIVE_UNLOCK (src);
 
-  ret = gst_pad_push (pad, buf);
+  /* push buffer or buffer list */
+  if (src->priv->pending_bufferlist != NULL) {
+    ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  } else {
+    ret = gst_pad_push (pad, buf);
+  }
+
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+    if (ret == GST_FLOW_NOT_NEGOTIATED) {
+      goto not_negotiated;
+    }
     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
         gst_flow_get_name (ret));
     goto pause;
   }
 
-  if (G_UNLIKELY (eos)) {
+  /* Segment pending means that a new segment was configured
+   * during this loop run */
+  if (G_UNLIKELY (eos && !src->priv->segment_pending)) {
     GST_INFO_OBJECT (src, "pausing after end of segment");
     ret = GST_FLOW_EOS;
     goto pause;
@@ -2684,7 +2997,16 @@ done:
   /* special cases */
 not_negotiated:
   {
-    GST_DEBUG_OBJECT (src, "Failed to renegotiate");
+    if (gst_pad_needs_reconfigure (pad)) {
+      GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
+      return;
+    }
+    /* fallthrough when push returns NOT_NEGOTIATED and we don't have
+     * a pending negotiation request on our srcpad */
+  }
+negotiate_failed:
+  {
+    GST_DEBUG_OBJECT (src, "Not negotiated");
     ret = GST_FLOW_NOT_NEGOTIATED;
     goto pause;
   }
@@ -2708,12 +3030,19 @@ pause:
       GstFormat format;
       gint64 position;
 
-      /* perform EOS logic */
       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
       format = src->segment.format;
       position = src->segment.position;
 
-      if (flag_segment) {
+      /* perform EOS logic */
+      if (src->priv->forced_eos) {
+        g_assert (g_atomic_int_get (&src->priv->has_pending_eos));
+        GST_OBJECT_LOCK (src);
+        event = src->priv->pending_eos;
+        src->priv->pending_eos = NULL;
+        GST_OBJECT_UNLOCK (src);
+
+      } else if (flag_segment) {
         GstMessage *message;
 
         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
@@ -2722,12 +3051,15 @@ pause:
         gst_element_post_message (GST_ELEMENT_CAST (src), message);
         event = gst_event_new_segment_done (format, position);
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
+
       } else {
         event = gst_event_new_eos ();
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
       }
+
+      gst_pad_push_event (pad, event);
+      src->priv->forced_eos = FALSE;
+
     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
       event = gst_event_new_eos ();
       gst_event_set_seqnum (event, src->priv->seqnum);
@@ -2737,20 +3069,11 @@ pause:
        * due to flushing and posting an error message because of
        * that is the wrong thing to do, e.g. when we're doing
        * a flushing seek. */
-      GST_ELEMENT_ERROR (src, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)", reason, ret));
+      GST_ELEMENT_FLOW_ERROR (src, ret);
       gst_pad_push_event (pad, event);
     }
     goto done;
   }
-null_buffer:
-  {
-    GST_ELEMENT_ERROR (src, STREAM, FAILED,
-        (_("Internal data flow error.")), ("element returned NULL buffer"));
-    GST_LIVE_UNLOCK (src);
-    goto done;
-  }
 }
 
 static gboolean
@@ -2774,6 +3097,11 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
   oldalloc = priv->allocator;
   priv->allocator = allocator;
 
+  if (priv->pool)
+    gst_object_ref (priv->pool);
+  if (priv->allocator)
+    gst_object_ref (priv->allocator);
+
   if (params)
     priv->params = *params;
   else
@@ -2801,12 +3129,11 @@ activate_failed:
   }
 }
 
-static gboolean
-gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
+static void
+gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
 {
   GstBaseSrcPrivate *priv = basesrc->priv;
   GstBufferPool *pool;
-  gboolean res = TRUE;
 
   GST_OBJECT_LOCK (basesrc);
   if ((pool = priv->pool))
@@ -2814,10 +3141,9 @@ gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
   GST_OBJECT_UNLOCK (basesrc);
 
   if (pool) {
-    res = gst_buffer_pool_set_active (pool, active);
+    gst_buffer_pool_set_flushing (pool, flushing);
     gst_object_unref (pool);
   }
-  return res;
 }
 
 
@@ -2864,7 +3190,25 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
     config = gst_buffer_pool_get_config (pool);
     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
     gst_buffer_pool_config_set_allocator (config, allocator, &params);
-    gst_buffer_pool_set_config (pool, config);
+
+    /* buffer pool may have to do some changes */
+    if (!gst_buffer_pool_set_config (pool, config)) {
+      config = gst_buffer_pool_get_config (pool);
+
+      /* If change are not acceptable, fallback to generic pool */
+      if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min,
+              max)) {
+        GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool");
+
+        gst_object_unref (pool);
+        pool = gst_buffer_pool_new ();
+        gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
+        gst_buffer_pool_config_set_allocator (config, allocator, &params);
+      }
+
+      if (!gst_buffer_pool_set_config (pool, config))
+        goto config_failed;
+    }
   }
 
   if (update_allocator)
@@ -2880,6 +3224,13 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
   }
 
   return TRUE;
+
+config_failed:
+  GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS,
+      ("Failed to configure the buffer pool"),
+      ("Configuration is most likely invalid, please report this issue."));
+  gst_object_unref (pool);
+  return FALSE;
 }
 
 static gboolean
@@ -2926,6 +3277,11 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
 
   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
 
+  if (allocator)
+    gst_object_unref (allocator);
+  if (pool)
+    gst_object_unref (pool);
+
   gst_query_unref (query);
 
   return result;
@@ -3052,6 +3408,8 @@ gst_base_src_start (GstBaseSrc * basesrc)
   gboolean result;
 
   GST_LIVE_LOCK (basesrc);
+
+  GST_OBJECT_LOCK (basesrc);
   if (GST_BASE_SRC_IS_STARTING (basesrc))
     goto was_starting;
   if (GST_BASE_SRC_IS_STARTED (basesrc))
@@ -3059,12 +3417,14 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   basesrc->priv->start_result = GST_FLOW_FLUSHING;
   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  gst_segment_init (&basesrc->segment, basesrc->segment.format);
+  GST_OBJECT_UNLOCK (basesrc);
+
   basesrc->num_buffers_left = basesrc->num_buffers;
   basesrc->running = FALSE;
   basesrc->priv->segment_pending = FALSE;
-  GST_OBJECT_LOCK (basesrc);
-  gst_segment_init (&basesrc->segment, basesrc->segment.format);
-  GST_OBJECT_UNLOCK (basesrc);
+  basesrc->priv->segment_seqnum = gst_util_seqnum_next ();
+  basesrc->priv->forced_eos = FALSE;
   GST_LIVE_UNLOCK (basesrc);
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
@@ -3076,8 +3436,12 @@ gst_base_src_start (GstBaseSrc * basesrc)
   if (!result)
     goto could_not_start;
 
-  if (!gst_base_src_is_async (basesrc))
+  if (!gst_base_src_is_async (basesrc)) {
     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
+    /* not really waiting here, we call this to get the result
+     * from the start_complete call */
+    result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
+  }
 
   return result;
 
@@ -3085,19 +3449,24 @@ gst_base_src_start (GstBaseSrc * basesrc)
 was_starting:
   {
     GST_DEBUG_OBJECT (basesrc, "was starting");
+    GST_OBJECT_UNLOCK (basesrc);
     GST_LIVE_UNLOCK (basesrc);
     return TRUE;
   }
 was_started:
   {
     GST_DEBUG_OBJECT (basesrc, "was started");
+    GST_OBJECT_UNLOCK (basesrc);
     GST_LIVE_UNLOCK (basesrc);
     return TRUE;
   }
 could_not_start:
   {
     GST_DEBUG_OBJECT (basesrc, "could not start");
-    /* subclass is supposed to post a message. We don't have to call _stop. */
+    /* subclass is supposed to post a message but we post one as a fallback
+     * just in case. We don't have to call _stop. */
+    GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL),
+        ("Failed to start"));
     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
     return FALSE;
   }
@@ -3160,10 +3529,6 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
 
-  /* stop flushing now but for live sources, still block in the LIVE lock when
-   * we are not yet PLAYING */
-  gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
-
   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
 
   GST_OBJECT_LOCK (basesrc->srcpad);
@@ -3173,32 +3538,38 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
   /* take the stream lock here, we only want to let the task run when we have
    * set the STARTED flag */
   GST_PAD_STREAM_LOCK (basesrc->srcpad);
-  if (mode == GST_PAD_MODE_PUSH) {
-    /* do initial seek, which will start the task */
-    GST_OBJECT_LOCK (basesrc);
-    event = basesrc->pending_seek;
-    basesrc->pending_seek = NULL;
-    GST_OBJECT_UNLOCK (basesrc);
-
-    /* The perform seek code will start the task when finished. We don't have to
-     * unlock the streaming thread because it is not running yet */
-    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
-      goto seek_failed;
-
-    if (event)
-      gst_event_unref (event);
-  } else {
-    /* if not random_access, we cannot operate in pull mode for now */
-    if (G_UNLIKELY (!basesrc->random_access))
-      goto no_get_range;
+  switch (mode) {
+    case GST_PAD_MODE_PUSH:
+      /* do initial seek, which will start the task */
+      GST_OBJECT_LOCK (basesrc);
+      event = basesrc->pending_seek;
+      basesrc->pending_seek = NULL;
+      GST_OBJECT_UNLOCK (basesrc);
+
+      /* The perform seek code will start the task when finished. We don't have to
+       * unlock the streaming thread because it is not running yet */
+      if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
+        goto seek_failed;
+
+      if (event)
+        gst_event_unref (event);
+      break;
+    case GST_PAD_MODE_PULL:
+      /* if not random_access, we cannot operate in pull mode for now */
+      if (G_UNLIKELY (!basesrc->random_access))
+        goto no_get_range;
+      break;
+    default:
+      goto not_activated_yet;
+      break;
   }
 
-  GST_LIVE_LOCK (basesrc);
+  GST_OBJECT_LOCK (basesrc);
   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
   basesrc->priv->start_result = ret;
-  GST_LIVE_SIGNAL (basesrc);
-  GST_LIVE_UNLOCK (basesrc);
+  GST_ASYNC_SIGNAL (basesrc);
+  GST_OBJECT_UNLOCK (basesrc);
 
   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
 
@@ -3208,7 +3579,7 @@ seek_failed:
   {
     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
+    gst_base_src_stop (basesrc);
     if (event)
       gst_event_unref (event);
     ret = GST_FLOW_ERROR;
@@ -3217,18 +3588,26 @@ seek_failed:
 no_get_range:
   {
     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
+    gst_base_src_stop (basesrc);
     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
     ret = GST_FLOW_ERROR;
     goto error;
   }
+not_activated_yet:
+  {
+    GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
+    gst_base_src_stop (basesrc);
+    GST_WARNING_OBJECT (basesrc, "pad not activated yet");
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
 error:
   {
-    GST_LIVE_LOCK (basesrc);
+    GST_OBJECT_LOCK (basesrc);
     basesrc->priv->start_result = ret;
     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
-    GST_LIVE_SIGNAL (basesrc);
-    GST_LIVE_UNLOCK (basesrc);
+    GST_ASYNC_SIGNAL (basesrc);
+    GST_OBJECT_UNLOCK (basesrc);
     return;
   }
 }
@@ -3246,27 +3625,16 @@ gst_base_src_start_wait (GstBaseSrc * basesrc)
 {
   GstFlowReturn result;
 
-  GST_LIVE_LOCK (basesrc);
-  if (G_UNLIKELY (basesrc->priv->flushing))
-    goto flushing;
-
+  GST_OBJECT_LOCK (basesrc);
   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
-    GST_LIVE_WAIT (basesrc);
-    if (G_UNLIKELY (basesrc->priv->flushing))
-      goto flushing;
+    GST_ASYNC_WAIT (basesrc);
   }
   result = basesrc->priv->start_result;
-  GST_LIVE_UNLOCK (basesrc);
+  GST_OBJECT_UNLOCK (basesrc);
 
-  return result;
+  GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
 
-  /* ERRORS */
-flushing:
-  {
-    GST_DEBUG_OBJECT (basesrc, "we are flushing");
-    GST_LIVE_UNLOCK (basesrc);
-    return GST_FLOW_FLUSHING;
-  }
+  return result;
 }
 
 static gboolean
@@ -3278,32 +3646,40 @@ gst_base_src_stop (GstBaseSrc * basesrc)
   GST_DEBUG_OBJECT (basesrc, "stopping source");
 
   /* flush all */
-  gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
+  gst_base_src_set_flushing (basesrc, TRUE);
+
   /* stop the task */
   gst_pad_stop_task (basesrc->srcpad);
+  /* stop flushing, this will balance unlock/unlock_stop calls */
+  gst_base_src_set_flushing (basesrc, FALSE);
 
-  GST_LIVE_LOCK (basesrc);
+  GST_OBJECT_LOCK (basesrc);
   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
     goto was_stopped;
 
   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
   basesrc->priv->start_result = GST_FLOW_FLUSHING;
-  GST_LIVE_SIGNAL (basesrc);
-  GST_LIVE_UNLOCK (basesrc);
+  GST_ASYNC_SIGNAL (basesrc);
+  GST_OBJECT_UNLOCK (basesrc);
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
   if (bclass->stop)
     result = bclass->stop (basesrc);
 
+  if (basesrc->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+    basesrc->priv->pending_bufferlist = NULL;
+  }
+
   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
 
   return result;
 
 was_stopped:
   {
-    GST_DEBUG_OBJECT (basesrc, "was started");
-    GST_LIVE_UNLOCK (basesrc);
+    GST_DEBUG_OBJECT (basesrc, "was stopped");
+    GST_OBJECT_UNLOCK (basesrc);
     return TRUE;
   }
 }
@@ -3311,49 +3687,39 @@ was_stopped:
 /* start or stop flushing dataprocessing
  */
 static gboolean
-gst_base_src_set_flushing (GstBaseSrc * basesrc,
-    gboolean flushing, gboolean live_play, gboolean * playing)
+gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
 {
   GstBaseSrcClass *bclass;
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
 
-  GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
+  GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
 
   if (flushing) {
-    gst_base_src_activate_pool (basesrc, FALSE);
-    /* unlock any subclasses, we need to do this before grabbing the
-     * LIVE_LOCK since we hold this lock before going into ::create. We pass an
-     * unlock to the params because of backwards compat (see seek handler)*/
+    gst_base_src_set_pool_flushing (basesrc, TRUE);
+    /* unlock any subclasses to allow turning off the streaming thread */
     if (bclass->unlock)
       bclass->unlock (basesrc);
   }
 
-  /* the live lock is released when we are blocked, waiting for playing or
-   * when we sync to the clock. */
+  /* the live lock is released when we are blocked, waiting for playing,
+   * when we sync to the clock or creating a buffer */
   GST_LIVE_LOCK (basesrc);
-  if (playing)
-    *playing = basesrc->live_running;
   basesrc->priv->flushing = flushing;
   if (flushing) {
-    /* if we are locked in the live lock, signal it to make it flush */
-    basesrc->live_running = TRUE;
-
     /* clear pending EOS if any */
-    g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
-
-    /* step 1, now that we have the LIVE lock, clear our unlock request */
-    if (bclass->unlock_stop)
-      bclass->unlock_stop (basesrc);
+    if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+      GST_OBJECT_LOCK (basesrc);
+      CLEAR_PENDING_EOS (basesrc);
+      basesrc->priv->forced_eos = FALSE;
+      GST_OBJECT_UNLOCK (basesrc);
+    }
 
-    /* step 2, unblock clock sync (if any) or any other blocking thing */
+    /* unblock clock sync (if any) or any other blocking thing */
     if (basesrc->clock_id)
       gst_clock_id_unschedule (basesrc->clock_id);
   } else {
-    /* signal the live source that it can start playing */
-    basesrc->live_running = live_play;
-
-    gst_base_src_activate_pool (basesrc, TRUE);
+    gst_base_src_set_pool_flushing (basesrc, FALSE);
 
     /* Drop all delayed events */
     GST_OBJECT_LOCK (basesrc);
@@ -3366,9 +3732,18 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
     }
     GST_OBJECT_UNLOCK (basesrc);
   }
+
   GST_LIVE_SIGNAL (basesrc);
   GST_LIVE_UNLOCK (basesrc);
 
+  if (!flushing) {
+    /* Now wait for the stream lock to be released and clear our unlock request */
+    GST_PAD_STREAM_LOCK (basesrc->srcpad);
+    if (bclass->unlock_stop)
+      bclass->unlock_stop (basesrc);
+    GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
+  }
+
   return TRUE;
 }
 
@@ -3377,17 +3752,6 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
 static gboolean
 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
 {
-  GstBaseSrcClass *bclass;
-
-  bclass = GST_BASE_SRC_GET_CLASS (basesrc);
-
-  /* unlock subclasses locked in ::create, we only do this when we stop playing. */
-  if (!live_play) {
-    GST_DEBUG_OBJECT (basesrc, "unlock");
-    if (bclass->unlock)
-      bclass->unlock (basesrc);
-  }
-
   /* we are now able to grab the LIVE lock, when we get it, we can be
    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
    * for the clock. */
@@ -3405,13 +3769,10 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
   if (live_play) {
     gboolean start;
 
-    /* clear our unlock request when going to PLAYING */
-    GST_DEBUG_OBJECT (basesrc, "unlock stop");
-    if (bclass->unlock_stop)
-      bclass->unlock_stop (basesrc);
-
     /* for live sources we restart the timestamp correction */
+    GST_OBJECT_LOCK (basesrc);
     basesrc->priv->latency = -1;
+    GST_OBJECT_UNLOCK (basesrc);
     /* have to restart the task in case it stopped because of the unlock when
      * we went to PAUSED. Only do this if we operating in push mode. */
     GST_OBJECT_LOCK (basesrc->srcpad);
@@ -3507,12 +3868,18 @@ gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
     GstPadMode mode, gboolean active)
 {
   gboolean res;
+  GstBaseSrc *src = GST_BASE_SRC (parent);
+
+  src->priv->stream_start_pending = FALSE;
+
+  GST_DEBUG_OBJECT (pad, "activating in mode %d", mode);
 
   switch (mode) {
     case GST_PAD_MODE_PULL:
       res = gst_base_src_activate_pull (pad, parent, active);
       break;
     case GST_PAD_MODE_PUSH:
+      src->priv->stream_start_pending = active;
       res = gst_base_src_activate_push (pad, parent, active);
       break;
     default:
@@ -3537,7 +3904,6 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      basesrc->priv->stream_start_pending = TRUE;
       no_preroll = gst_base_src_is_live (basesrc);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -3560,7 +3926,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
       if (gst_base_src_is_live (basesrc)) {
-        /* make sure we block in the live lock in PAUSED */
+        /* make sure we block in the live cond in PAUSED */
         gst_base_src_set_playing (basesrc, FALSE);
         no_preroll = TRUE;
       }
@@ -3569,9 +3935,12 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     {
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
-      g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+      if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+        GST_OBJECT_LOCK (basesrc);
+        CLEAR_PENDING_EOS (basesrc);
+        GST_OBJECT_UNLOCK (basesrc);
+      }
       gst_event_replace (&basesrc->pending_seek, NULL);
-      basesrc->priv->stream_start_pending = FALSE;
       break;
     }
     case GST_STATE_CHANGE_READY_TO_NULL:
@@ -3592,3 +3961,92 @@ failure:
     return result;
   }
 }
+
+/**
+ * gst_base_src_get_buffer_pool:
+ * @src: a #GstBaseSrc
+ *
+ * Returns: (transfer full): the instance of the #GstBufferPool used
+ * by the src; unref it after usage.
+ */
+GstBufferPool *
+gst_base_src_get_buffer_pool (GstBaseSrc * src)
+{
+  GstBufferPool *ret = NULL;
+
+  g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
+
+  GST_OBJECT_LOCK (src);
+  if (src->priv->pool)
+    ret = gst_object_ref (src->priv->pool);
+  GST_OBJECT_UNLOCK (src);
+
+  return ret;
+}
+
+/**
+ * gst_base_src_get_allocator:
+ * @src: a #GstBaseSrc
+ * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
+ * used
+ * @params: (out) (allow-none) (transfer full): the
+ * #GstAllocationParams of @allocator
+ *
+ * Lets #GstBaseSrc sub-classes to know the memory @allocator
+ * used by the base class and its @params.
+ *
+ * Unref the @allocator after usage.
+ */
+void
+gst_base_src_get_allocator (GstBaseSrc * src,
+    GstAllocator ** allocator, GstAllocationParams * params)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+
+  GST_OBJECT_LOCK (src);
+  if (allocator)
+    *allocator = src->priv->allocator ?
+        gst_object_ref (src->priv->allocator) : NULL;
+
+  if (params)
+    *params = src->priv->params;
+  GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+  g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+  g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+  /* we need it to be writable later in get_range() where we use get_writable */
+  src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list);
+
+  GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+      gst_buffer_list_length (buffer_list));
+}