Move dataurisrc element from -bad
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
index b90b942..7a3e211 100644 (file)
@@ -25,7 +25,7 @@
  * @short_description: Base class for getrange based source elements
  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
  *
- * This is a generice base class for source elements. The following
+ * This is a generic base class for source elements. The following
  * types of sources are supported:
  * <itemizedlist>
  *   <listitem><para>random access sources like files</para></listitem>
  *
  * The source can be configured to operate in any #GstFormat with the
  * gst_base_src_set_format() method. The currently set format determines
- * the format of the internal #GstSegment and any #GST_EVENT_SEGMENT
- * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
+ * the format of the internal #GstSegment and any %GST_EVENT_SEGMENT
+ * events. The default format for #GstBaseSrc is %GST_FORMAT_BYTES.
  *
  * #GstBaseSrc always supports push mode scheduling. If the following
  * conditions are met, it also supports pull mode scheduling:
  * <itemizedlist>
- *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
+ *   <listitem><para>The format is set to %GST_FORMAT_BYTES (default).</para>
  *   </listitem>
  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
  *   </listitem>
@@ -50,7 +50,7 @@
  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
  * automatically seekable in push mode as well. The following conditions must
  * be met to make the element seekable in push mode when the format is not
- * #GST_FORMAT_BYTES:
+ * %GST_FORMAT_BYTES:
  * <itemizedlist>
  *   <listitem><para>
  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
@@ -82,7 +82,7 @@
  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
  * PLAYING. To signal the pipeline that the element will not produce data, the
  * return value from the READY to PAUSED state will be
- * #GST_STATE_CHANGE_NO_PREROLL.
+ * %GST_STATE_CHANGE_NO_PREROLL.
  *
  * A typical live source will timestamp the buffers it creates with the
  * current running time of the pipeline. This is one reason why a live source
  * There is only support in #GstBaseSrc for exactly one source pad, which
  * should be named "src". A source implementation (subclass of #GstBaseSrc)
  * should install a pad template in its class_init function, like so:
- * |[
+ * |[<!-- language="C" -->
  * static void
  * my_element_class_init (GstMyElementClass *klass)
  * {
  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
  *   // srctemplate should be a #GstStaticPadTemplate with direction
- *   // #GST_PAD_SRC and name "src"
- *   gst_element_class_add_pad_template (gstelement_class,
- *       gst_static_pad_template_get (&amp;srctemplate));
+ *   // %GST_PAD_SRC and name "src"
+ *   gst_element_class_add_static_pad_template (gstelement_class, &amp;srctemplate);
  *
  *   gst_element_class_set_static_metadata (gstelement_class,
  *      "Source name",
  *
  * An application may send an EOS event to a source element to make it
  * perform the EOS logic (send EOS event downstream or post a
- * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
+ * %GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
  * with the gst_element_send_event() function on the element or its parent bin.
  *
  * After the EOS has been sent to the element, the application should wait for
  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
  * received, it may safely shut down the entire pipeline.
- *
- * Last reviewed on 2007-12-19 (0.10.16)
  * </para>
  * </refsect2>
  */
@@ -188,6 +185,12 @@ GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
 
+#define CLEAR_PENDING_EOS(bsrc) \
+  G_STMT_START { \
+    g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \
+    gst_event_replace (&bsrc->priv->pending_eos, NULL); \
+  } G_STMT_END
+
 
 /* BaseSrc signals and args */
 enum
@@ -224,11 +227,17 @@ struct _GstBaseSrcPrivate
   /* if a stream-start event should be sent */
   gboolean stream_start_pending;
 
-  /* if segment should be sent */
+  /* if segment should be sent and a
+   * seqnum if it was originated by a seek */
   gboolean segment_pending;
+  guint32 segment_seqnum;
 
   /* if EOS is pending (atomic) */
-  gint pending_eos;
+  GstEvent *pending_eos;
+  gint has_pending_eos;
+
+  /* if the eos was caused by a forced eos from the application */
+  gboolean forced_eos;
 
   /* startup latency is the time it takes between going to PLAYING and producing
    * the first BUFFER with running_time 0. This value is included in the latency
@@ -240,6 +249,7 @@ struct _GstBaseSrcPrivate
 
   gboolean do_timestamp;
   volatile gint dynamic_size;
+  volatile gint automatic_eos;
 
   /* stream sequence number */
   guint32 seqnum;
@@ -423,6 +433,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   g_cond_init (&basesrc->live_cond);
   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
   basesrc->num_buffers_left = -1;
+  basesrc->priv->automatic_eos = TRUE;
 
   basesrc->can_activate_push = TRUE;
 
@@ -494,11 +505,11 @@ gst_base_src_finalize (GObject * object)
  * and call this method before continuing to produce the remaining data.
  *
  * This function will block until a state change to PLAYING happens (in which
- * case this function returns #GST_FLOW_OK) or the processing must be stopped due
+ * case this function returns %GST_FLOW_OK) or the processing must be stopped due
  * to a state change to READY or a FLUSH event (in which case this function
- * returns #GST_FLOW_FLUSHING).
+ * returns %GST_FLOW_FLUSHING).
  *
- * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
+ * Returns: %GST_FLOW_OK if @src is PLAYING and processing can
  * continue. Any other return value should be returned from the create vmethod.
  */
 GstFlowReturn
@@ -580,7 +591,7 @@ gst_base_src_is_live (GstBaseSrc * src)
  * for sending SEGMENT events and for performing seeks.
  *
  * If a format of GST_FORMAT_BYTES is set, the element will be able to
- * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
+ * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns %TRUE.
  *
  * This function must only be called in states < %GST_STATE_PAUSED.
  */
@@ -613,6 +624,26 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
 }
 
 /**
+ * gst_base_src_set_automatic_eos:
+ * @src: base source instance
+ * @automatic_eos: automatic eos
+ *
+ * If @automatic_eos is %TRUE, @src will automatically go EOS if a buffer
+ * after the total size is returned. By default this is %TRUE but sources
+ * that can't return an authoritative size and only know that they're EOS
+ * when trying to read more should set this to %FALSE.
+ *
+ * Since: 1.4
+ */
+void
+gst_base_src_set_automatic_eos (GstBaseSrc * src, gboolean automatic_eos)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+
+  g_atomic_int_set (&src->priv->automatic_eos, automatic_eos);
+}
+
+/**
  * gst_base_src_set_async:
  * @src: base source instance
  * @async: new async mode
@@ -662,14 +693,14 @@ gst_base_src_is_async (GstBaseSrc * src)
  * @min_latency: (out) (allow-none): the min latency of the source
  * @max_latency: (out) (allow-none): the max latency of the source
  *
- * Query the source for the latency parameters. @live will be TRUE when @src is
- * configured as a live source. @min_latency will be set to the difference
- * between the running time and the timestamp of the first buffer.
- * @max_latency is always the undefined value of -1.
+ * Query the source for the latency parameters. @live will be %TRUE when @src is
+ * configured as a live source. @min_latency and @max_latency will be set
+ * to the difference between the running time and the timestamp of the first
+ * buffer.
  *
  * This function is mostly used by subclasses.
  *
- * Returns: TRUE if the query succeeded.
+ * Returns: %TRUE if the query succeeded.
  */
 gboolean
 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
@@ -694,11 +725,11 @@ gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
   if (min_latency)
     *min_latency = min;
   if (max_latency)
-    *max_latency = -1;
+    *max_latency = min;
 
   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
-      GST_TIME_ARGS (-1));
+      GST_TIME_ARGS (min));
   GST_OBJECT_UNLOCK (src);
 
   return TRUE;
@@ -761,6 +792,8 @@ gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
 
   GST_OBJECT_LOCK (src);
   src->priv->do_timestamp = timestamp;
+  if (timestamp && src->segment.format != GST_FORMAT_TIME)
+    gst_segment_init (&src->segment, GST_FORMAT_TIME);
   GST_OBJECT_UNLOCK (src);
 }
 
@@ -791,7 +824,7 @@ gst_base_src_get_do_timestamp (GstBaseSrc * src)
  * @src: The source
  * @start: The new start value for the segment
  * @stop: Stop value for the new segment
- * @time: The new time value for the start of the new segent
+ * @time: The new time value for the start of the new segment
  *
  * Prepare a new seamless segment for emission downstream. This function must
  * only be called by derived sub-classes, and only from the create() function,
@@ -818,6 +851,7 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
 
   /* Mark pending segment. Will be sent before next data */
   src->priv->segment_pending = TRUE;
+  src->priv->segment_seqnum = gst_util_seqnum_next ();
 
   GST_DEBUG_OBJECT (src,
       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
@@ -860,7 +894,7 @@ gst_base_src_send_stream_start (GstBaseSrc * src)
 /**
  * gst_base_src_set_caps:
  * @src: a #GstBaseSrc
- * @caps: a #GstCaps
+ * @caps: (transfer none): a #GstCaps
  *
  * Set new caps on the basesrc source pad.
  *
@@ -871,16 +905,27 @@ gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
 {
   GstBaseSrcClass *bclass;
   gboolean res = TRUE;
+  GstCaps *current_caps;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
   gst_base_src_send_stream_start (src);
 
-  if (bclass->set_caps)
-    res = bclass->set_caps (src, caps);
+  current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src));
+  if (current_caps && gst_caps_is_equal (current_caps, caps)) {
+    GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT,
+        caps);
+    res = TRUE;
+  } else {
+    if (bclass->set_caps)
+      res = bclass->set_caps (src, caps);
 
-  if (res)
-    res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
+    if (res)
+      res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
+  }
+
+  if (current_caps)
+    gst_caps_unref (current_caps);
 
   return res;
 }
@@ -992,7 +1037,9 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
           } else
             res = TRUE;
 
-          gst_query_set_position (query, format, position);
+          if (res)
+            gst_query_set_position (query, format, position);
+
           break;
         }
       }
@@ -1045,7 +1092,10 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
              * means that we cannot report the duration at all. */
             res = TRUE;
           }
-          gst_query_set_duration (query, format, duration);
+
+          if (res)
+            gst_query_set_duration (query, format, duration);
+
           break;
         }
       }
@@ -1068,7 +1118,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
             gst_base_src_seekable (src), 0, duration);
         res = TRUE;
       } else {
-        /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
+        /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */
         /* Don't reply to the query to make up for demuxers which don't
          * handle the SEEKING query yet. Players like Totem will fall back
          * to the duration when the SEEKING query isn't answered. */
@@ -1078,23 +1128,23 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
     }
     case GST_QUERY_SEGMENT:
     {
+      GstFormat format;
       gint64 start, stop;
 
       GST_OBJECT_LOCK (src);
-      /* no end segment configured, current duration then */
+
+      format = src->segment.format;
+
+      start =
+          gst_segment_to_stream_time (&src->segment, format,
+          src->segment.start);
       if ((stop = src->segment.stop) == -1)
         stop = src->segment.duration;
-      start = src->segment.start;
+      else
+        stop = gst_segment_to_stream_time (&src->segment, format, stop);
 
-      /* adjust to stream time */
-      if (src->segment.time != -1) {
-        start -= src->segment.time;
-        if (stop != -1)
-          stop -= src->segment.time;
-      }
+      gst_query_set_segment (query, src->segment.rate, format, start, stop);
 
-      gst_query_set_segment (query, src->segment.rate, src->segment.format,
-          start, stop);
       GST_OBJECT_UNLOCK (src);
       res = TRUE;
       break;
@@ -1660,13 +1710,8 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
       gst_element_post_message (GST_ELEMENT (src), message);
     }
 
-    /* for deriving a stop position for the playback segment from the seek
-     * segment, we must take the duration when the stop is not set */
-    /* FIXME: This is never used below */
-    if ((stop = seeksegment.stop) == -1)
-      stop = seeksegment.duration;
-
     src->priv->segment_pending = TRUE;
+    src->priv->segment_seqnum = seqnum;
   }
 
   src->priv->discont = TRUE;
@@ -1698,8 +1743,10 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
 {
   GstBaseSrc *src;
   gboolean result = FALSE;
+  GstBaseSrcClass *bclass;
 
   src = GST_BASE_SRC (element);
+  bclass = GST_BASE_SRC_GET_CLASS (src);
 
   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
 
@@ -1708,43 +1755,89 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
     case GST_EVENT_FLUSH_START:
       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
       result = gst_pad_push_event (src->srcpad, event);
+      /* also unblock the create function */
+      gst_base_src_activate_pool (src, FALSE);
+      /* unlock any subclasses, we need to do this before grabbing the
+       * LIVE_LOCK since we hold this lock before going into ::create. We pass an
+       * unlock to the params because of backwards compat (see seek handler)*/
+      if (bclass->unlock)
+        bclass->unlock (src);
+
+      /* the live lock is released when we are blocked, waiting for playing or
+       * when we sync to the clock. */
+      GST_LIVE_LOCK (src);
+      src->priv->flushing = TRUE;
+      /* clear pending EOS if any */
+      if (g_atomic_int_get (&src->priv->has_pending_eos)) {
+        GST_OBJECT_LOCK (src);
+        CLEAR_PENDING_EOS (src);
+        src->priv->forced_eos = FALSE;
+        GST_OBJECT_UNLOCK (src);
+      }
+      if (bclass->unlock_stop)
+        bclass->unlock_stop (src);
+      if (src->clock_id)
+        gst_clock_id_unschedule (src->clock_id);
+      GST_DEBUG_OBJECT (src, "signal");
+      GST_LIVE_SIGNAL (src);
+      GST_LIVE_UNLOCK (src);
       event = NULL;
       break;
     case GST_EVENT_FLUSH_STOP:
+    {
+      gboolean start;
+
       GST_LIVE_LOCK (src);
       src->priv->segment_pending = TRUE;
-      /* sending random flushes downstream can break stuff,
-       * especially sync since all segment info will get flushed */
+      src->priv->flushing = FALSE;
       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
       result = gst_pad_push_event (src->srcpad, event);
+
+      gst_base_src_activate_pool (src, TRUE);
+
+      GST_OBJECT_LOCK (src->srcpad);
+      start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
+      GST_OBJECT_UNLOCK (src->srcpad);
+
+      if (src->is_live) {
+        if (!src->live_running)
+          start = FALSE;
+      }
+
+      if (start)
+        gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+            src->srcpad, NULL);
       GST_LIVE_UNLOCK (src);
       event = NULL;
       break;
+    }
 
       /* downstream serialized events */
     case GST_EVENT_EOS:
     {
-      GstBaseSrcClass *bclass;
-
-      bclass = GST_BASE_SRC_GET_CLASS (src);
-
       /* queue EOS and make sure the task or pull function performs the EOS
        * actions.
        *
        * We have two possibilities:
        *
-       *  - Before we are to enter the _create function, we check the pending_eos
+       *  - Before we are to enter the _create function, we check the has_pending_eos
        *    first and do EOS instead of entering it.
        *  - If we are in the _create function or we did not manage to set the
        *    flag fast enough and we are about to enter the _create function,
        *    we unlock it so that we exit with FLUSHING immediately. We then
        *    check the EOS flag and do the EOS logic.
        */
-      g_atomic_int_set (&src->priv->pending_eos, TRUE);
-      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+      GST_OBJECT_LOCK (src);
+      g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+      if (src->priv->pending_eos)
+        gst_event_unref (src->priv->pending_eos);
+      src->priv->pending_eos = event;
+      event = NULL;
+      GST_OBJECT_UNLOCK (src);
 
+      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
 
-      /* unlock the _create function so that we can check the pending_eos flag
+      /* unlock the _create function so that we can check the has_pending_eos flag
        * and we can do EOS. This will eventually release the LIVE_LOCK again so
        * that we can grab it and stop the unlock again. We don't take the stream
        * lock so that this operation is guaranteed to never block. */
@@ -1772,7 +1865,8 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
     case GST_EVENT_TAG:
     case GST_EVENT_CUSTOM_DOWNSTREAM:
     case GST_EVENT_CUSTOM_BOTH:
-      /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
+    case GST_EVENT_PROTECTION:
+      /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */
       GST_OBJECT_LOCK (src);
       src->priv->pending_events =
           g_list_append (src->priv->pending_events, event);
@@ -2149,8 +2243,12 @@ gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
       if (do_timestamp) {
         dts = running_time;
-      } else {
-        dts = 0;
+      } else if (!GST_CLOCK_TIME_IS_VALID (pts)) {
+        if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) {
+          dts = basesrc->segment.start;
+        } else {
+          dts = 0;
+        }
       }
       GST_BUFFER_DTS (buffer) = dts;
 
@@ -2228,24 +2326,26 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
 {
   guint64 size, maxsize;
   GstBaseSrcClass *bclass;
-  GstFormat format;
   gint64 stop;
 
+  /* only operate if we are working with bytes */
+  if (src->segment.format != GST_FORMAT_BYTES)
+    return TRUE;
+
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
-  format = src->segment.format;
   stop = src->segment.stop;
   /* get total file size */
   size = src->segment.duration;
 
-  /* only operate if we are working with bytes */
-  if (format != GST_FORMAT_BYTES)
-    return TRUE;
-
-  /* the max amount of bytes to read is the total size or
-   * up to the segment.stop if present. */
-  if (stop != -1)
-    maxsize = MIN (size, stop);
+  /* when not doing automatic EOS, just use the stop position. We don't use
+   * the size to check for EOS */
+  if (!g_atomic_int_get (&src->priv->automatic_eos))
+    maxsize = stop;
+  /* Otherwise, the max amount of bytes to read is the total
+   * size or up to the segment.stop if present. */
+  else if (stop != -1)
+    maxsize = size != -1 ? MIN (size, stop) : stop;
   else
     maxsize = size;
 
@@ -2257,33 +2357,38 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
   /* check size if we have one */
   if (maxsize != -1) {
     /* if we run past the end, check if the file became bigger and
-     * retry. */
-    if (G_UNLIKELY (offset + *length >= maxsize || force)) {
+     * retry.  Mind wrap when checking. */
+    if (G_UNLIKELY (offset >= maxsize || offset + *length >= maxsize || force)) {
       /* see if length of the file changed */
       if (bclass->get_size)
         if (!bclass->get_size (src, &size))
           size = -1;
 
-      /* make sure we don't exceed the configured segment stop
-       * if it was set */
-      if (stop != -1)
-        maxsize = MIN (size, stop);
+      /* when not doing automatic EOS, just use the stop position. We don't use
+       * the size to check for EOS */
+      if (!g_atomic_int_get (&src->priv->automatic_eos))
+        maxsize = stop;
+      /* Otherwise, the max amount of bytes to read is the total
+       * size or up to the segment.stop if present. */
+      else if (stop != -1)
+        maxsize = size != -1 ? MIN (size, stop) : stop;
       else
         maxsize = size;
 
-      /* if we are at or past the end, EOS */
-      if (G_UNLIKELY (offset >= maxsize))
-        goto unexpected_length;
-
-      /* else we can clip to the end */
-      if (G_UNLIKELY (offset + *length >= maxsize))
-        *length = maxsize - offset;
+      if (maxsize != -1) {
+        /* if we are at or past the end, EOS */
+        if (G_UNLIKELY (offset >= maxsize))
+          goto unexpected_length;
 
+        /* else we can clip to the end */
+        if (G_UNLIKELY (offset + *length >= maxsize))
+          *length = maxsize - offset;
+      }
     }
   }
 
-  /* keep track of current duration.
-   * segment is in bytes, we checked that above. */
+  /* keep track of current duration. segment is in bytes, we checked
+   * that above. */
   GST_OBJECT_LOCK (src);
   src->segment.duration = size;
   GST_OBJECT_UNLOCK (src);
@@ -2293,6 +2398,7 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
   /* ERRORS */
 unexpected_length:
   {
+    GST_WARNING_OBJECT (src, "processing at or past EOS");
     return FALSE;
   }
 }
@@ -2344,9 +2450,11 @@ again:
   }
 
   /* don't enter the create function if a pending EOS event was set. For the
-   * logic of the pending_eos, check the event function of this class. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
+   * logic of the has_pending_eos, check the event function of this class. */
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
+    src->priv->forced_eos = TRUE;
     goto eos;
+  }
 
   GST_DEBUG_OBJECT (src,
       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
@@ -2359,11 +2467,12 @@ again:
   /* The create function could be unlocked because we have a pending EOS. It's
    * possible that we have a valid buffer from create that we need to
    * discard when the create function returned _OK. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
     if (ret == GST_FLOW_OK) {
       if (*buf == NULL)
         gst_buffer_unref (res_buf);
     }
+    src->priv->forced_eos = TRUE;
     goto eos;
   }
 
@@ -2593,10 +2702,12 @@ gst_base_src_loop (GstPad * pad)
   if (gst_pad_check_reconfigure (pad)) {
     if (!gst_base_src_negotiate (src)) {
       gst_pad_mark_reconfigure (pad);
-      if (GST_PAD_IS_FLUSHING (pad))
+      if (GST_PAD_IS_FLUSHING (pad)) {
+        GST_LIVE_LOCK (src);
         goto flushing;
-      else
+      } else {
         goto negotiate_failed;
+      }
     }
   }
 
@@ -2640,7 +2751,11 @@ gst_base_src_loop (GstPad * pad)
 
   /* push events to close/start our segment before we push the buffer. */
   if (G_UNLIKELY (src->priv->segment_pending)) {
-    gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
+    GstEvent *seg_event = gst_event_new_segment (&src->segment);
+
+    gst_event_set_seqnum (seg_event, src->priv->segment_seqnum);
+    src->priv->segment_seqnum = gst_util_seqnum_next ();
+    gst_pad_push_event (pad, seg_event);
     src->priv->segment_pending = FALSE;
   }
 
@@ -2747,7 +2862,9 @@ gst_base_src_loop (GstPad * pad)
     goto pause;
   }
 
-  if (G_UNLIKELY (eos)) {
+  /* Segment pending means that a new segment was configured
+   * during this loop run */
+  if (G_UNLIKELY (eos && !src->priv->segment_pending)) {
     GST_INFO_OBJECT (src, "pausing after end of segment");
     ret = GST_FLOW_EOS;
     goto pause;
@@ -2792,12 +2909,19 @@ pause:
       GstFormat format;
       gint64 position;
 
-      /* perform EOS logic */
       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
       format = src->segment.format;
       position = src->segment.position;
 
-      if (flag_segment) {
+      /* perform EOS logic */
+      if (src->priv->forced_eos) {
+        g_assert (g_atomic_int_get (&src->priv->has_pending_eos));
+        GST_OBJECT_LOCK (src);
+        event = src->priv->pending_eos;
+        src->priv->pending_eos = NULL;
+        GST_OBJECT_UNLOCK (src);
+
+      } else if (flag_segment) {
         GstMessage *message;
 
         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
@@ -2806,12 +2930,15 @@ pause:
         gst_element_post_message (GST_ELEMENT_CAST (src), message);
         event = gst_event_new_segment_done (format, position);
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
+
       } else {
         event = gst_event_new_eos ();
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
       }
+
+      gst_pad_push_event (pad, event);
+      src->priv->forced_eos = FALSE;
+
     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
       event = gst_event_new_eos ();
       gst_event_set_seqnum (event, src->priv->seqnum);
@@ -2821,9 +2948,7 @@ pause:
        * due to flushing and posting an error message because of
        * that is the wrong thing to do, e.g. when we're doing
        * a flushing seek. */
-      GST_ELEMENT_ERROR (src, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)", reason, ret));
+      GST_ELEMENT_FLOW_ERROR (src, ret);
       gst_pad_push_event (pad, event);
     }
     goto done;
@@ -2858,6 +2983,11 @@ gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
   oldalloc = priv->allocator;
   priv->allocator = allocator;
 
+  if (priv->pool)
+    gst_object_ref (priv->pool);
+  if (priv->allocator)
+    gst_object_ref (priv->allocator);
+
   if (params)
     priv->params = *params;
   else
@@ -2948,7 +3078,25 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
     config = gst_buffer_pool_get_config (pool);
     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
     gst_buffer_pool_config_set_allocator (config, allocator, &params);
-    gst_buffer_pool_set_config (pool, config);
+
+    /* buffer pool may have to do some changes */
+    if (!gst_buffer_pool_set_config (pool, config)) {
+      config = gst_buffer_pool_get_config (pool);
+
+      /* If change are not acceptable, fallback to generic pool */
+      if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min,
+              max)) {
+        GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool");
+
+        gst_object_unref (pool);
+        pool = gst_buffer_pool_new ();
+        gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
+        gst_buffer_pool_config_set_allocator (config, allocator, &params);
+      }
+
+      if (!gst_buffer_pool_set_config (pool, config))
+        goto config_failed;
+    }
   }
 
   if (update_allocator)
@@ -2964,6 +3112,13 @@ gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
   }
 
   return TRUE;
+
+config_failed:
+  GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS,
+      ("Failed to configure the buffer pool"),
+      ("Configuration is most likely invalid, please report this issue."));
+  gst_object_unref (pool);
+  return FALSE;
 }
 
 static gboolean
@@ -3010,6 +3165,11 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
 
   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
 
+  if (allocator)
+    gst_object_unref (allocator);
+  if (pool)
+    gst_object_unref (pool);
+
   gst_query_unref (query);
 
   return result;
@@ -3151,6 +3311,8 @@ gst_base_src_start (GstBaseSrc * basesrc)
   basesrc->num_buffers_left = basesrc->num_buffers;
   basesrc->running = FALSE;
   basesrc->priv->segment_pending = FALSE;
+  basesrc->priv->segment_seqnum = gst_util_seqnum_next ();
+  basesrc->priv->forced_eos = FALSE;
   GST_LIVE_UNLOCK (basesrc);
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
@@ -3189,7 +3351,10 @@ was_started:
 could_not_start:
   {
     GST_DEBUG_OBJECT (basesrc, "could not start");
-    /* subclass is supposed to post a message. We don't have to call _stop. */
+    /* subclass is supposed to post a message but we post one as a fallback
+     * just in case. We don't have to call _stop. */
+    GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL),
+        ("Failed to start"));
     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
     return FALSE;
   }
@@ -3265,24 +3430,30 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
   /* take the stream lock here, we only want to let the task run when we have
    * set the STARTED flag */
   GST_PAD_STREAM_LOCK (basesrc->srcpad);
-  if (mode == GST_PAD_MODE_PUSH) {
-    /* do initial seek, which will start the task */
-    GST_OBJECT_LOCK (basesrc);
-    event = basesrc->pending_seek;
-    basesrc->pending_seek = NULL;
-    GST_OBJECT_UNLOCK (basesrc);
-
-    /* The perform seek code will start the task when finished. We don't have to
-     * unlock the streaming thread because it is not running yet */
-    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
-      goto seek_failed;
-
-    if (event)
-      gst_event_unref (event);
-  } else {
-    /* if not random_access, we cannot operate in pull mode for now */
-    if (G_UNLIKELY (!basesrc->random_access))
-      goto no_get_range;
+  switch (mode) {
+    case GST_PAD_MODE_PUSH:
+      /* do initial seek, which will start the task */
+      GST_OBJECT_LOCK (basesrc);
+      event = basesrc->pending_seek;
+      basesrc->pending_seek = NULL;
+      GST_OBJECT_UNLOCK (basesrc);
+
+      /* The perform seek code will start the task when finished. We don't have to
+       * unlock the streaming thread because it is not running yet */
+      if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
+        goto seek_failed;
+
+      if (event)
+        gst_event_unref (event);
+      break;
+    case GST_PAD_MODE_PULL:
+      /* if not random_access, we cannot operate in pull mode for now */
+      if (G_UNLIKELY (!basesrc->random_access))
+        goto no_get_range;
+      break;
+    default:
+      goto not_activated_yet;
+      break;
   }
 
   GST_OBJECT_LOCK (basesrc);
@@ -3314,6 +3485,14 @@ no_get_range:
     ret = GST_FLOW_ERROR;
     goto error;
   }
+not_activated_yet:
+  {
+    GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
+    gst_base_src_stop (basesrc);
+    GST_WARNING_OBJECT (basesrc, "pad not activated yet");
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
 error:
   {
     GST_OBJECT_LOCK (basesrc);
@@ -3421,7 +3600,12 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
     basesrc->live_running = TRUE;
 
     /* clear pending EOS if any */
-    g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+    if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+      GST_OBJECT_LOCK (basesrc);
+      CLEAR_PENDING_EOS (basesrc);
+      basesrc->priv->forced_eos = FALSE;
+      GST_OBJECT_UNLOCK (basesrc);
+    }
 
     /* step 1, now that we have the LIVE lock, clear our unlock request */
     if (bclass->unlock_stop)
@@ -3592,6 +3776,8 @@ gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
 
   src->priv->stream_start_pending = FALSE;
 
+  GST_DEBUG_OBJECT (pad, "activating in mode %d", mode);
+
   switch (mode) {
     case GST_PAD_MODE_PULL:
       res = gst_base_src_activate_pull (pad, parent, active);
@@ -3653,7 +3839,11 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     {
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
-      g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+      if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+        GST_OBJECT_LOCK (basesrc);
+        CLEAR_PENDING_EOS (basesrc);
+        GST_OBJECT_UNLOCK (basesrc);
+      }
       gst_event_replace (&basesrc->pending_seek, NULL);
       break;
     }
@@ -3681,7 +3871,7 @@ failure:
  * @src: a #GstBaseSrc
  *
  * Returns: (transfer full): the instance of the #GstBufferPool used
- * by the src; free it after use it
+ * by the src; unref it after usage.
  */
 GstBufferPool *
 gst_base_src_get_buffer_pool (GstBaseSrc * src)
@@ -3700,12 +3890,12 @@ gst_base_src_get_buffer_pool (GstBaseSrc * src)
  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
  * used
  * @params: (out) (allow-none) (transfer full): the
- * #GstAllocatorParams of @allocator
+ * #GstAllocationParams of @allocator
  *
  * Lets #GstBaseSrc sub-classes to know the memory @allocator
  * used by the base class and its @params.
  *
- * Unref the @allocator after use it.
+ * Unref the @allocator after usage.
  */
 void
 gst_base_src_get_allocator (GstBaseSrc * src,