Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
index c8d510d..16a239b 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 /**
  * SECTION:gstcollectpads
+ * @title: GstCollectPads
  * @short_description: manages a set of pads that operate in collect mode
  * @see_also:
  *
  * Manages a set of pads that operate in collect mode. This means that control
  * is given to the manager of this object when all pads have data.
- * <itemizedlist>
- *   <listitem><para>
- *     Collectpads are created with gst_collect_pads_new(). A callback should then
+ *
+ *   * Collectpads are created with gst_collect_pads_new(). A callback should then
  *     be installed with gst_collect_pads_set_function ().
- *   </para></listitem>
- *   <listitem><para>
- *     Pads are added to the collection with gst_collect_pads_add_pad()/
+ *
+ *   * Pads are added to the collection with gst_collect_pads_add_pad()/
  *     gst_collect_pads_remove_pad(). The pad
  *     has to be a sinkpad. The chain and event functions of the pad are
  *     overridden. The element_private of the pad is used to store
  *     private information for the collectpads.
- *   </para></listitem>
- *   <listitem><para>
- *     For each pad, data is queued in the _chain function or by
+ *
+ *   * For each pad, data is queued in the _chain function or by
  *     performing a pull_range.
- *   </para></listitem>
- *   <listitem><para>
- *     When data is queued on all pads in waiting mode, the callback function is called.
- *   </para></listitem>
- *   <listitem><para>
- *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
+ *
+ *   * When data is queued on all pads in waiting mode, the callback function is called.
+ *
+ *   * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
  *     One can peek at the data with the gst_collect_pads_peek() function.
- *     These functions will return NULL if the pad received an EOS event. When all
- *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
+ *     These functions will return %NULL if the pad received an EOS event. When all
+ *     pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
  *     event itself.
- *   </para></listitem>
- *   <listitem><para>
- *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
- *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
- *   </para></listitem>
- *   <listitem><para>
- *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
+ *
+ *   * Data can also be dequeued in byte units using the gst_collect_pads_available(),
+ *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
+ *
+ *   * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
  *     their state change functions to start and stop the processing of the collectpads.
  *     The gst_collect_pads_stop() call should be called before calling the parent
  *     element state change function in the PAUSED_TO_READY state change to ensure
  *     no pad is blocked and the element can finish streaming.
- *   </para></listitem>
- *   <listitem><para>
- *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
- *     elements that start a #GstTask to drive the collect_pads. This feature is however
- *     not yet implemented.
- *   </para></listitem>
- *   <listitem><para>
- *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
+ *
+ *   * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
  *     Thus these pads may but need not have data when the callback is called.
  *     All pads are in waiting mode by default.
- *   </para></listitem>
- * </itemizedlist>
  *
- * Last reviewed on 2011-10-28 (0.10.36)
- *
- * Since: 0.10.36
  */
 
 #ifdef HAVE_CONFIG_H
@@ -109,7 +92,6 @@ struct _GstCollectPadsPrivate
 {
   /* with LOCK and/or STREAM_LOCK */
   gboolean started;
-  gboolean stream_started;
 
   /* with STREAM_LOCK */
   guint32 cookie;               /* @data list cookie */
@@ -135,11 +117,17 @@ struct _GstCollectPadsPrivate
   gpointer query_user_data;
   GstCollectPadsClipFunction clip_func;
   gpointer clip_user_data;
+  GstCollectPadsFlushFunction flush_func;
+  gpointer flush_user_data;
 
   /* no other lock needed */
   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
   GCond evt_cond;
   guint32 evt_cookie;
+
+  gboolean seeking;
+  gboolean pending_flush_start;
+  gboolean pending_flush_stop;
 };
 
 static void gst_collect_pads_clear (GstCollectPads * pads,
@@ -236,7 +224,6 @@ gst_collect_pads_init (GstCollectPads * pads)
   pads->priv->queuedpads = 0;
   pads->priv->eospads = 0;
   pads->priv->started = FALSE;
-  pads->priv->stream_started = FALSE;
 
   g_rec_mutex_init (&pads->stream_lock);
 
@@ -264,6 +251,13 @@ gst_collect_pads_init (GstCollectPads * pads)
   g_mutex_init (&pads->priv->evt_lock);
   g_cond_init (&pads->priv->evt_cond);
   pads->priv->evt_cookie = 0;
+
+  pads->priv->seeking = FALSE;
+  pads->priv->pending_flush_start = FALSE;
+  pads->priv->pending_flush_stop = FALSE;
+
+  /* clear floating flag */
+  gst_object_ref_sink (pads);
 }
 
 static void
@@ -290,13 +284,11 @@ gst_collect_pads_finalize (GObject * object)
 /**
  * gst_collect_pads_new:
  *
- * Create a new instance of #GstCollectsPads.
+ * Create a new instance of #GstCollectPads.
  *
  * MT safe.
  *
- * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
- *
- * Since: 0.10.36
+ * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
  */
 GstCollectPads *
 gst_collect_pads_new (void)
@@ -320,15 +312,15 @@ gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_buffer_function:
  * @pads: the collectpads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: (closure): user data passed to the function
  *
  * Set the callback function and user data that will be called with
- * the oldest buffer when all pads have been collected.
+ * the oldest buffer when all pads have been collected, or %NULL on EOS.
+ * If a buffer is passed, the callback owns a reference and must unref
+ * it.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
@@ -345,14 +337,12 @@ gst_collect_pads_set_buffer_function (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_compare_function:
  * @pads: the pads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: (closure): user data passed to the function
  *
  * Set the timestamp comparison function.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 /* NOTE allowing to change comparison seems not advisable;
 no known use-case, and collaboration with default algorithm is unpredictable.
@@ -373,8 +363,8 @@ gst_collect_pads_set_compare_function (GstCollectPads * pads,
 
 /**
  * gst_collect_pads_set_function:
- * @pads: the collectspads to use
- * @func: the function to set
+ * @pads: the collectpads to use
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * CollectPads provides a default collection algorithm that will determine
@@ -388,8 +378,6 @@ gst_collect_pads_set_compare_function (GstCollectPads * pads,
  * If this callback is set, the former will be unset.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_function (GstCollectPads * pads,
@@ -435,8 +423,8 @@ unref_data (GstCollectData * data)
 
 /**
  * gst_collect_pads_set_event_function:
- * @pads: the collectspads to use
- * @func: the function to set
+ * @pads: the collectpads to use
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * Set the event callback function and user data that will be called when
@@ -447,8 +435,6 @@ unref_data (GstCollectData * data)
  * if so (unusually) needed.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_event_function (GstCollectPads * pads,
@@ -465,8 +451,8 @@ gst_collect_pads_set_event_function (GstCollectPads * pads,
 
 /**
  * gst_collect_pads_set_query_function:
- * @pads: the collectspads to use
- * @func: the function to set
+ * @pads: the collectpads to use
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * Set the query callback function and user data that will be called after
@@ -477,8 +463,6 @@ gst_collect_pads_set_event_function (GstCollectPads * pads,
  * if so (unusually) needed.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_query_function (GstCollectPads * pads,
@@ -495,56 +479,82 @@ gst_collect_pads_set_query_function (GstCollectPads * pads,
 
 /**
 * gst_collect_pads_clip_running_time:
-* @pads: the collectspads to use
+* @pads: the collectpads to use
 * @cdata: collect data of corresponding pad
 * @buf: buffer being clipped
-* @outbuf: output buffer with running time, or NULL if clipped
+* @outbuf: (allow-none): output buffer with running time, or NULL if clipped
 * @user_data: user data (unused)
 *
 * Convenience clipping function that converts incoming buffer's timestamp
 * to running time, or clips the buffer if outside configured segment.
 *
-* Since: 0.10.37
+* Since 1.6, this clipping function also sets the DTS parameter of the
+* GstCollectData structure. This version of the running time DTS can be
+* negative. G_MININT64 is used to indicate invalid value.
 */
 GstFlowReturn
 gst_collect_pads_clip_running_time (GstCollectPads * pads,
     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
     gpointer user_data)
 {
-  GstClockTime time;
-
   *outbuf = buf;
-  time = GST_BUFFER_TIMESTAMP (buf);
 
   /* invalid left alone and passed */
-  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
-    time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
-    if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
-      GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
-      gst_buffer_unref (buf);
-      *outbuf = NULL;
+  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
+    GstClockTime time;
+    GstClockTime buf_dts, abs_dts;
+    gint dts_sign;
+
+    time = GST_BUFFER_PTS (buf);
+
+    if (GST_CLOCK_TIME_IS_VALID (time)) {
+      time =
+          gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
+      if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
+        GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
+            GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
+        gst_buffer_unref (buf);
+        *outbuf = NULL;
+        return GST_FLOW_OK;
+      }
+    }
+
+    GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
+        GST_TIME_FORMAT " running time",
+        GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
+    *outbuf = gst_buffer_make_writable (buf);
+    GST_BUFFER_PTS (*outbuf) = time;
+
+    dts_sign = gst_segment_to_running_time_full (&cdata->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
+    buf_dts = GST_BUFFER_DTS (*outbuf);
+    if (dts_sign > 0) {
+      GST_BUFFER_DTS (*outbuf) = abs_dts;
+      GST_COLLECT_PADS_DTS (cdata) = abs_dts;
+    } else if (dts_sign < 0) {
+      GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+      GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
     } else {
-      GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
-          GST_TIME_FORMAT " running time",
-          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
-      *outbuf = gst_buffer_make_writable (buf);
-      GST_BUFFER_TIMESTAMP (*outbuf) = time;
+      GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+      GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
     }
+
+    GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
+        GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
+        GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
   }
 
   return GST_FLOW_OK;
 }
 
- /**
+/**
  * gst_collect_pads_set_clip_function:
- * @pads: the collectspads to use
- * @clipfunc: clip function to install
+ * @pads: the collectpads to use
+ * @clipfunc: (scope call): clip function to install
  * @user_data: user data to pass to @clip_func
  *
  * Install a clipping function that is called right after a buffer is received
- * on a pad managed by @pads. See #GstCollectPad2ClipFunction for more info.
- *
- * Since: 0.10.36
+ * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
  */
 void
 gst_collect_pads_set_clip_function (GstCollectPads * pads,
@@ -558,45 +568,35 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads,
 }
 
 /**
- * gst_collect_pads_add_pad:
- * @pads: the collectspads to use
- * @pad: (transfer none): the pad to add
- * @size: the size of the returned #GstCollectData structure
- *
- * Add a pad to the collection of collect pads. The pad has to be
- * a sinkpad. The refcount of the pad is incremented. Use
- * gst_collect_pads_remove_pad() to remove the pad from the collection
- * again.
- *
- * You specify a size for the returned #GstCollectData structure
- * so that you can use it to store additional information.
- *
- * The pad will be automatically activated in push mode when @pads is
- * started.
- *
- * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
- * for destroy_notify and TRUE for locked.
- *
- * MT safe.
+ * gst_collect_pads_set_flush_function:
+ * @pads: the collectpads to use
+ * @func: (scope call): flush function to install
+ * @user_data: user data to pass to @func
  *
- * Returns: a new #GstCollectData to identify the new pad. Or NULL
- *   if wrong parameters are supplied.
+ * Install a flush function that is called when the internal
+ * state of all pads should be flushed as part of flushing seek
+ * handling. See #GstCollectPadsFlushFunction for more info.
  *
- * Since: 0.10.36
+ * Since: 1.4
  */
-GstCollectData *
-gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
+void
+gst_collect_pads_set_flush_function (GstCollectPads * pads,
+    GstCollectPadsFlushFunction func, gpointer user_data)
 {
-  return gst_collect_pads_add_pad_full (pads, pad, size, NULL, TRUE);
+  g_return_if_fail (pads != NULL);
+  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
+
+  pads->priv->flush_func = func;
+  pads->priv->flush_user_data = user_data;
 }
 
 /**
- * gst_collect_pads_add_pad_full:
- * @pads: the collectspads to use
+ * gst_collect_pads_add_pad:
+ * @pads: the collectpads to use
  * @pad: (transfer none): the pad to add
  * @size: the size of the returned #GstCollectData structure
- * @destroy_notify: function to be called before the returned #GstCollectData
- * structure is freed
+ * @destroy_notify: (scope async): function to be called before the returned
+ *   #GstCollectData structure is freed
  * @lock: whether to lock this pad in usual waiting state
  *
  * Add a pad to the collection of collect pads. The pad has to be
@@ -625,14 +625,12 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
  *
  * MT safe.
  *
- * Since: 0.10.36
- *
- * Returns: a new #GstCollectData to identify the new pad. Or NULL
- *   if wrong parameters are supplied.
+ * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
+ *   new pad. Or %NULL if wrong parameters are supplied.
  */
 GstCollectData *
-gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad,
-    guint size, GstCollectDataDestroyNotify destroy_notify, gboolean lock)
+gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
+    GstCollectDataDestroyNotify destroy_notify, gboolean lock)
 {
   GstCollectData *data;
 
@@ -655,6 +653,7 @@ gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad,
   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
   data->priv->refcount = 1;
   data->priv->destroy_notify = destroy_notify;
+  data->ABI.abi.dts = G_MININT64;
 
   GST_OBJECT_LOCK (pads);
   GST_OBJECT_LOCK (pad);
@@ -691,7 +690,7 @@ find_pad (GstCollectData * data, GstPad * pad)
 
 /**
  * gst_collect_pads_remove_pad:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  * @pad: (transfer none): the pad to remove
  *
  * Remove a pad from the collection of collect pads. This function will also
@@ -703,8 +702,6 @@ find_pad (GstCollectData * data, GstPad * pad)
  * MT safe.
  *
  * Returns: %TRUE if the pad could be removed.
- *
- * Since: 0.10.36
  */
 gboolean
 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
@@ -778,91 +775,8 @@ unknown_pad:
   }
 }
 
-/**
- * gst_collect_pads_is_active:
- * @pads: the collectspads to use
- * @pad: the pad to check
- *
- * Check if a pad is active.
- *
- * This function is currently not implemented.
- *
- * MT safe.
- *
- * Returns: %TRUE if the pad is active.
- *
- * Since: 0.10.36
- */
-gboolean
-gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
-{
-  g_return_val_if_fail (pads != NULL, FALSE);
-  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
-  g_return_val_if_fail (pad != NULL, FALSE);
-  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
-
-  g_warning ("gst_collect_pads_is_active() is not implemented");
-
-  return FALSE;
-}
-
-/**
- * gst_collect_pads_collect:
- * @pads: the collectspads to use
- *
- * Collect data on all pads. This function is usually called
- * from a #GstTask function in an element. 
- *
- * This function is currently not implemented.
- *
- * MT safe.
- *
- * Returns: #GstFlowReturn of the operation.
- *
- * Since: 0.10.36
- */
-GstFlowReturn
-gst_collect_pads_collect (GstCollectPads * pads)
-{
-  g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
-  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
-
-  g_warning ("gst_collect_pads_collect() is not implemented");
-
-  return GST_FLOW_NOT_SUPPORTED;
-}
-
-/**
- * gst_collect_pads_collect_range:
- * @pads: the collectspads to use
- * @offset: the offset to collect
- * @length: the length to collect
- *
- * Collect data with @offset and @length on all pads. This function
- * is typically called in the getrange function of an element. 
- *
- * This function is currently not implemented.
- *
- * MT safe.
- *
- * Returns: #GstFlowReturn of the operation.
- *
- * Since: 0.10.36
- */
-GstFlowReturn
-gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
-    guint length)
-{
-  g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
-  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
-
-  g_warning ("gst_collect_pads_collect_range() is not implemented");
-
-  return GST_FLOW_NOT_SUPPORTED;
-}
-
 /*
- * Must be called with STREAM_LOCK.
+ * Must be called with STREAM_LOCK and OBJECT_LOCK.
  */
 static void
 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
@@ -871,7 +785,7 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
   GSList *walk = NULL;
 
   /* Update the pads flushing flag */
-  for (walk = pads->data; walk; walk = g_slist_next (walk)) {
+  for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
     GstCollectData *cdata = walk->data;
 
     if (GST_IS_PAD (cdata->pad)) {
@@ -895,7 +809,7 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
 
 /**
  * gst_collect_pads_set_flushing:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  * @flushing: desired state of the pads
  *
  * Change the flushing state of all the pads in the collection. No pad
@@ -905,8 +819,6 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
  * e.g. by sending a FLUSH_START downstream.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
@@ -916,19 +828,19 @@ gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
 
   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
   GST_COLLECT_PADS_STREAM_LOCK (pads);
+  GST_OBJECT_LOCK (pads);
   gst_collect_pads_set_flushing_unlocked (pads, flushing);
+  GST_OBJECT_UNLOCK (pads);
   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 }
 
 /**
  * gst_collect_pads_start:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  *
  * Starts the processing of data in the collect_pads.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_start (GstCollectPads * pads)
@@ -965,14 +877,12 @@ gst_collect_pads_start (GstCollectPads * pads)
 
 /**
  * gst_collect_pads_stop:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  *
  * Stops the processing of data in the collect_pads. this function
  * will also unblock any blocking operations.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_stop (GstCollectPads * pads)
@@ -1015,7 +925,6 @@ gst_collect_pads_stop (GstCollectPads * pads)
     unref_data (pads->priv->earliest_data);
   pads->priv->earliest_data = NULL;
   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
-  pads->priv->stream_started = FALSE;
 
   GST_OBJECT_UNLOCK (pads);
   /* Wake them up so they can end the chain functions. */
@@ -1026,7 +935,7 @@ gst_collect_pads_stop (GstCollectPads * pads)
 
 /**
  * gst_collect_pads_peek:
- * @pads: the collectspads to peek
+ * @pads: the collectpads to peek
  * @data: the data to use
  *
  * Peek at the buffer currently queued in @data. This function
@@ -1035,10 +944,8 @@ gst_collect_pads_stop (GstCollectPads * pads)
  *
  * MT safe.
  *
- * Returns: The buffer in @data or NULL if no buffer is queued.
- *  should unref the buffer after usage.
- *
- * Since: 0.10.36
+ * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
+ * buffer is queued. should unref the buffer after usage.
  */
 GstBuffer *
 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
@@ -1052,7 +959,7 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
   if ((result = data->buffer))
     gst_buffer_ref (result);
 
-  GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
+  GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT,
       GST_DEBUG_PAD_NAME (data->pad), result);
 
   return result;
@@ -1060,7 +967,7 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
 
 /**
  * gst_collect_pads_pop:
- * @pads: the collectspads to pop
+ * @pads: the collectpads to pop
  * @data: the data to use
  *
  * Pop the buffer currently queued in @data. This function
@@ -1069,10 +976,8 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
  *
  * MT safe.
  *
- * Returns: (transfer full): The buffer in @data or NULL if no buffer was
- *   queued. You should unref the buffer after usage.
- *
- * Since: 0.10.36
+ * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
+ * buffer was queued. You should unref the buffer after usage.
  */
 GstBuffer *
 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
@@ -1093,7 +998,7 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
 
   GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
-  GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
+  GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT,
       GST_DEBUG_PAD_NAME (data->pad), result);
 
   return result;
@@ -1112,7 +1017,7 @@ gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
 
 /**
  * gst_collect_pads_available:
- * @pads: the collectspads to query
+ * @pads: the collectpads to query
  *
  * Query how much bytes can be read from each queued buffer. This means
  * that the result of this call is the maximum number of bytes that can
@@ -1125,8 +1030,6 @@ gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
  *
  * Returns: The maximum number of bytes queued on all pads. This function
  * returns 0 if a pad has no queued buffer.
- *
- * Since: 0.10.36
  */
 /* we might pre-calculate this in some struct field,
  * but would then have to maintain this in _chain and particularly _pop, etc,
@@ -1183,7 +1086,7 @@ not_filled:
 
 /**
  * gst_collect_pads_flush:
- * @pads: the collectspads to query
+ * @pads: the collectpads to query
  * @data: the data to use
  * @size: the number of bytes to flush
  *
@@ -1196,8 +1099,6 @@ not_filled:
  *
  * Returns: The number of bytes flushed This can be less than @size and
  * is 0 if the pad was end-of-stream.
- *
- * Since: 0.10.36
  */
 guint
 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
@@ -1231,7 +1132,7 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
 
 /**
  * gst_collect_pads_read_buffer:
- * @pads: the collectspads to query
+ * @pads: the collectpads to query
  * @data: the data to use
  * @size: the number of bytes to read
  *
@@ -1242,17 +1143,15 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
  *
  * MT safe.
  *
- * Since: 0.10.36
- *
- * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
- * A return of NULL signals that the pad is end-of-stream.
- * Unref the buffer after use.
+ * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
+ * be less that requested. A return of %NULL signals that the pad is
+ * end-of-stream. Unref the buffer after use.
  */
 GstBuffer *
 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
     guint size)
 {
-  guint readsize;
+  guint readsize, buf_size;
   GstBuffer *buffer;
 
   g_return_val_if_fail (pads != NULL, NULL);
@@ -1263,7 +1162,8 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
   if ((buffer = data->buffer) == NULL)
     return NULL;
 
-  readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
+  buf_size = gst_buffer_get_size (buffer);
+  readsize = MIN (size, buf_size - data->pos);
 
   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
       readsize);
@@ -1271,7 +1171,7 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
 
 /**
  * gst_collect_pads_take_buffer:
- * @pads: the collectspads to query
+ * @pads: the collectpads to query
  * @data: the data to use
  * @size: the number of bytes to read
  *
@@ -1283,11 +1183,9 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
  *
  * MT safe.
  *
- * Since: 0.10.36
- *
- * Returns: A sub buffer. The size of the buffer can be less that requested.
- * A return of NULL signals that the pad is end-of-stream.
- * Unref the buffer after use.
+ * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
+ * be less that requested. A return of %NULL signals that the pad is
+ * end-of-stream. Unref the buffer after use.
  */
 GstBuffer *
 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
@@ -1303,7 +1201,7 @@ gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
 
 /**
  * gst_collect_pads_set_waiting:
- * @pads: the collectspads
+ * @pads: the collectpads
  * @data: the data to use
  * @waiting: boolean indicating whether this pad should operate
  *           in waiting or non-waiting mode
@@ -1316,8 +1214,6 @@ gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
  * in the callback.
  *
  * MT safe.
- *
- * Since: 0.10.36
  */
 void
 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
@@ -1439,7 +1335,13 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
 
-    flow_ret = func (pads, user_data);
+    if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+                TRUE, FALSE))) {
+      GST_INFO_OBJECT (pads, "finished seeking");
+    }
+    do {
+      flow_ret = func (pads, user_data);
+    } while (flow_ret == GST_FLOW_OK);
   } else {
     gboolean collected = FALSE;
 
@@ -1451,6 +1353,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
           GST_DEBUG_FUNCPTR_NAME (func));
 
+      if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+                  TRUE, FALSE))) {
+        GST_INFO_OBJECT (pads, "finished seeking");
+      }
       flow_ret = func (pads, user_data);
       collected = TRUE;
 
@@ -1481,7 +1387,7 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
  *
  * Must be called with STREAM_LOCK.
  *
- * Returns TRUE if a pad was set to waiting
+ * Returns %TRUE if a pad was set to waiting
  * (from non-waiting state).
  */
 static gboolean
@@ -1497,6 +1403,7 @@ gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
     GstCollectData *data = (GstCollectData *) collected->data;
     int cmp_res;
+    GstClockTime comp_time;
 
     /* check if pad has a segment */
     if (data->segment.format == GST_FORMAT_UNDEFINED) {
@@ -1513,7 +1420,8 @@ gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
     }
 
     /* check if the waiting state should be changed */
-    cmp_res = pads->priv->compare_func (pads, data, data->segment.start,
+    comp_time = data->segment.position;
+    cmp_res = pads->priv->compare_func (pads, data, comp_time,
         pads->priv->earliest_data, pads->priv->earliest_time,
         pads->priv->compare_user_data);
     if (cmp_res > 0)
@@ -1542,8 +1450,6 @@ gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
  *
  * This function should be called with STREAM_LOCK held,
  * such as in the callback.
- *
- * Since: 0.10.36
  */
 static void
 gst_collect_pads_find_best_pad (GstCollectPads * pads,
@@ -1564,7 +1470,7 @@ gst_collect_pads_find_best_pad (GstCollectPads * pads,
     buffer = gst_collect_pads_peek (pads, data);
     /* if we have a buffer check if it is better then the current best one */
     if (buffer != NULL) {
-      timestamp = GST_BUFFER_TIMESTAMP (buffer);
+      timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
       gst_buffer_unref (buffer);
       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
               best, best_time, pads->priv->compare_user_data) < 0) {
@@ -1690,18 +1596,70 @@ gst_collect_pads_default_compare_func (GstCollectPads * pads,
   return 0;
 }
 
+/* called with STREAM_LOCK */
+static void
+gst_collect_pads_handle_position_update (GstCollectPads * pads,
+    GstCollectData * data, GstClockTime new_pos)
+{
+  gint cmp_res;
+
+  /* If oldest time is not known, or current pad got newsegment;
+   * recalculate the state */
+  if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
+    gst_collect_pads_recalculate_full (pads);
+    goto exit;
+  }
+
+  /* Check if the waiting state of the pad should change. */
+  cmp_res =
+      pads->priv->compare_func (pads, data, new_pos,
+      pads->priv->earliest_data, pads->priv->earliest_time,
+      pads->priv->compare_user_data);
+
+  if (cmp_res > 0)
+    /* Stop waiting */
+    gst_collect_pads_set_waiting (pads, data, FALSE);
+
+exit:
+  return;
+
+}
+
+static GstClockTime
+gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
+    GstClockTime time)
+{
+  GstClockTime otime = time;
+  GstBuffer *in, *out = NULL;
+
+  if (pads->priv->clip_func) {
+    in = gst_buffer_new ();
+    GST_BUFFER_PTS (in) = time;
+    GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
+    pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
+    if (out) {
+      otime = GST_BUFFER_PTS (out);
+      gst_buffer_unref (out);
+    } else {
+      /* FIXME should distinguish between ahead or after segment,
+       * let's assume after segment and use some large time ... */
+      otime = G_MAXINT64 / 2;
+    }
+  }
+
+  return otime;
+}
+
 /**
  * gst_collect_pads_event_default:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  * @data: collect data of corresponding pad
  * @event: event being processed
  * @discard: process but do not send event downstream
  *
- * Default GstCollectPads event handling that elements should always
+ * Default #GstCollectPads event handling that elements should always
  * chain up to to ensure proper operation.  Element might however indicate
  * event should not be forwarded downstream.
- *
- * Since: 0.11.x
  */
 gboolean
 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
@@ -1722,33 +1680,59 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
-      /* forward event to unblock check_collected */
-      GST_DEBUG_OBJECT (pad, "forwarding flush start");
-      res = gst_pad_event_default (pad, parent, event);
-      event = NULL;
-
-      /* now unblock the chain function.
-       * no cond per pad, so they all unblock,
-       * non-flushing block again */
-      GST_COLLECT_PADS_STREAM_LOCK (pads);
-      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
-      gst_collect_pads_clear (pads, data);
+      if (g_atomic_int_get (&pads->priv->seeking)) {
+        /* drop all but the first FLUSH_STARTs when seeking */
+        if (!g_atomic_int_compare_and_exchange (&pads->
+                priv->pending_flush_start, TRUE, FALSE))
+          goto eat;
 
-      /* cater for possible default muxing functionality */
-      if (buffer_func) {
-        /* restore to initial state */
-        gst_collect_pads_set_waiting (pads, data, TRUE);
-        /* if the current pad is affected, reset state, recalculate later */
-        if (pads->priv->earliest_data == data) {
-          unref_data (data);
-          pads->priv->earliest_data = NULL;
-          pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+        /* unblock collect pads */
+        gst_pad_event_default (pad, parent, event);
+        event = NULL;
+
+        GST_COLLECT_PADS_STREAM_LOCK (pads);
+        /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
+         * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
+         * non-flushing (which happens in gst_collect_pads_event_default).
+         */
+        gst_collect_pads_set_flushing (pads, TRUE);
+
+        if (pads->priv->flush_func)
+          pads->priv->flush_func (pads, pads->priv->flush_user_data);
+
+        g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
+        GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+
+        goto eat;
+      } else {
+        /* forward event to unblock check_collected */
+        GST_DEBUG_OBJECT (pad, "forwarding flush start");
+        res = gst_pad_event_default (pad, parent, event);
+        event = NULL;
+
+        /* now unblock the chain function.
+         * no cond per pad, so they all unblock,
+         * non-flushing block again */
+        GST_COLLECT_PADS_STREAM_LOCK (pads);
+        GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
+        gst_collect_pads_clear (pads, data);
+
+        /* cater for possible default muxing functionality */
+        if (buffer_func) {
+          /* restore to initial state */
+          gst_collect_pads_set_waiting (pads, data, TRUE);
+          /* if the current pad is affected, reset state, recalculate later */
+          if (pads->priv->earliest_data == data) {
+            unref_data (data);
+            pads->priv->earliest_data = NULL;
+            pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+          }
         }
-      }
 
-      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+        GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
-      goto eat;
+        goto eat;
+      }
     }
     case GST_EVENT_FLUSH_STOP:
     {
@@ -1766,12 +1750,22 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
                 GST_COLLECT_PADS_STATE_WAITING))
           pads->priv->queuedpads++;
-        pads->priv->eospads--;
+        if (!g_atomic_int_get (&pads->priv->seeking)) {
+          pads->priv->eospads--;
+        }
         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
       }
       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
-      goto forward;
+      if (g_atomic_int_get (&pads->priv->seeking)) {
+        if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
+                TRUE, FALSE))
+          goto forward;
+        else
+          goto eat;
+      } else {
+        goto forward;
+      }
     }
     case GST_EVENT_EOS:
     {
@@ -1795,7 +1789,6 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
     case GST_EVENT_SEGMENT:
     {
       GstSegment seg;
-      gint cmp_res;
 
       GST_COLLECT_PADS_STREAM_LOCK (pads);
 
@@ -1810,29 +1803,21 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
         goto newsegment_done;
       }
 
+      /* need to update segment first */
       data->segment = seg;
       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
 
+      /* now we can use for e.g. running time */
+      seg.position =
+          gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
+      /* update again */
+      data->segment = seg;
+
       /* default muxing functionality */
       if (!buffer_func)
         goto newsegment_done;
 
-      /* If oldest time is not known, or current pad got newsegment;
-       * recalculate the state */
-      if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
-        gst_collect_pads_recalculate_full (pads);
-        goto newsegment_done;
-      }
-
-      /* Check if the waiting state of the pad should change. */
-      cmp_res =
-          pads->priv->compare_func (pads, data, seg.start,
-          pads->priv->earliest_data, pads->priv->earliest_time,
-          pads->priv->compare_user_data);
-
-      if (cmp_res > 0)
-        /* Stop waiting */
-        gst_collect_pads_set_waiting (pads, data, FALSE);
+      gst_collect_pads_handle_position_update (pads, data, seg.position);
 
     newsegment_done:
       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
@@ -1840,17 +1825,31 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
        * accumulated and this is certainly not what we want. */
       goto eat;
     }
+    case GST_EVENT_GAP:
+    {
+      GstClockTime start, duration;
+
+      GST_COLLECT_PADS_STREAM_LOCK (pads);
+
+      gst_event_parse_gap (event, &start, &duration);
+      /* FIXME, handle reverse playback case */
+      if (GST_CLOCK_TIME_IS_VALID (duration))
+        start += duration;
+      /* we do not expect another buffer until after gap,
+       * so that is our position now */
+      data->segment.position = gst_collect_pads_clip_time (pads, data, start);
+
+      gst_collect_pads_handle_position_update (pads, data,
+          data->segment.position);
+
+      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+      goto eat;
+    }
     case GST_EVENT_STREAM_START:
-      /* let the only the first one go through */
-      if (!pads->priv->stream_started) {
-        pads->priv->stream_started = TRUE;
-        goto forward;
-      } else {
-        goto eat;
-      }
-      break;
+      /* drop stream start events, element must create its own start event,
+       * we can't just forward the first random stream start event we get */
+      goto eat;
     case GST_EVENT_CAPS:
-    case GST_EVENT_STREAM_CONFIG:
       goto eat;
     default:
       /* forward other events */
@@ -1869,6 +1868,99 @@ forward:
     return gst_pad_event_default (pad, parent, event);
 }
 
+typedef struct
+{
+  GstEvent *event;
+  gboolean result;
+} EventData;
+
+static gboolean
+event_forward_func (GstPad * pad, EventData * data)
+{
+  gboolean ret = TRUE;
+  GstPad *peer = gst_pad_get_peer (pad);
+
+  if (peer) {
+    ret = gst_pad_send_event (peer, gst_event_ref (data->event));
+    gst_object_unref (peer);
+  }
+
+  data->result &= ret;
+  /* Always send to all pads */
+  return FALSE;
+}
+
+static gboolean
+forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
+{
+  EventData data;
+
+  data.event = event;
+  data.result = TRUE;
+
+  gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
+
+  gst_event_unref (event);
+
+  return data.result;
+}
+
+/**
+ * gst_collect_pads_src_event_default:
+ * @pads: the #GstCollectPads to use
+ * @pad: src #GstPad that received the event
+ * @event: event being processed
+ *
+ * Default #GstCollectPads event handling for the src pad of elements.
+ * Elements can chain up to this to let flushing seek event handling
+ * be done by #GstCollectPads.
+ *
+ * Since: 1.4
+ */
+gboolean
+gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
+    GstEvent * event)
+{
+  GstObject *parent;
+  gboolean res = TRUE;
+
+  parent = GST_OBJECT_PARENT (pad);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:{
+      GstSeekFlags flags;
+
+      pads->priv->eospads = 0;
+
+      GST_INFO_OBJECT (pads, "starting seek");
+
+      gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
+      if (flags & GST_SEEK_FLAG_FLUSH) {
+        g_atomic_int_set (&pads->priv->seeking, TRUE);
+        g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
+        /* forward the seek upstream */
+        res = forward_event_to_all_sinkpads (pad, event);
+        event = NULL;
+        if (!res) {
+          g_atomic_int_set (&pads->priv->seeking, FALSE);
+          g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
+        }
+      }
+
+      GST_INFO_OBJECT (pads, "seek done, result: %d", res);
+
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (event)
+    res = gst_pad_event_default (pad, parent, event);
+
+  return res;
+}
+
 static gboolean
 gst_collect_pads_event_default_internal (GstCollectPads * pads,
     GstCollectData * data, GstEvent * event, gpointer user_data)
@@ -1931,16 +2023,14 @@ pad_removed:
 
 /**
  * gst_collect_pads_query_default:
- * @pads: the collectspads to use
+ * @pads: the collectpads to use
  * @data: collect data of corresponding pad
  * @query: query being processed
  * @discard: process but do not send event downstream
  *
- * Default GstCollectPads query handling that elements should always
+ * Default #GstCollectPads query handling that elements should always
  * chain up to to ensure proper operation.  Element might however indicate
  * query should not be forwarded downstream.
- *
- * Since: 0.11.x
  */
 gboolean
 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
@@ -2039,7 +2129,7 @@ pad_removed:
  * and if so we call the collected function. When this is done we check if
  * data has been unqueued. If data is still queued we wait holding the stream
  * lock to make sure no EOS event can happen while we are ready to be
- * collected 
+ * collected
  */
 static GstFlowReturn
 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
@@ -2103,7 +2193,9 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 
   /* update segment last position if in TIME */
   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
-    GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
+    GstClockTime timestamp;
+
+    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
 
     if (GST_CLOCK_TIME_IS_VALID (timestamp))
       data->segment.position = timestamp;
@@ -2165,7 +2257,9 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 
 unlock_done:
   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
-  unref_data (data);
+  /* data is definitely NULL if pad_removed goto was run. */
+  if (data)
+    unref_data (data);
   if (buffer)
     gst_buffer_unref (buffer);
   return ret;