Merge remote-tracking branch 'origin/0.10'
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads2.c
index 7bf4ec6..0577232 100644 (file)
@@ -86,9 +86,6 @@
 #  include "config.h"
 #endif
 
-/* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
- * with newer GLib versions (>= 2.31.0) */
-#define GLIB_DISABLE_DEPRECATION_WARNINGS
 #include <gst/gst_private.h>
 
 #include "gstcollectpads2.h"
@@ -133,19 +130,25 @@ struct _GstCollectPads2Private
   gpointer compare_user_data;
   GstCollectPads2EventFunction event_func;      /* function and data for event callback */
   gpointer event_user_data;
+  GstCollectPads2QueryFunction query_func;
+  gpointer query_user_data;
   GstCollectPads2ClipFunction clip_func;
   gpointer clip_user_data;
 
   /* no other lock needed */
-  GMutex *evt_lock;             /* these make up sort of poor man's event signaling */
-  GCond *evt_cond;
+  GMutex evt_lock;              /* these make up sort of poor man's event signaling */
+  GCond evt_cond;
   guint32 evt_cookie;
 };
 
 static void gst_collect_pads2_clear (GstCollectPads2 * pads,
     GstCollectData2 * data);
-static GstFlowReturn gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer);
-static gboolean gst_collect_pads2_event (GstPad * pad, GstEvent * event);
+static GstFlowReturn gst_collect_pads2_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
+static gboolean gst_collect_pads2_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_collect_pads2_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 static void gst_collect_pads2_finalize (GObject * object);
 static GstFlowReturn gst_collect_pads2_default_collected (GstCollectPads2 *
     pads, gpointer user_data);
@@ -156,14 +159,20 @@ static gboolean gst_collect_pads2_recalculate_full (GstCollectPads2 * pads);
 static void ref_data (GstCollectData2 * data);
 static void unref_data (GstCollectData2 * data);
 
+static gboolean gst_collect_pads2_event_default_internal (GstCollectPads2 *
+    pads, GstCollectData2 * data, GstEvent * event, gpointer user_data);
+static gboolean gst_collect_pads2_query_default_internal (GstCollectPads2 *
+    pads, GstCollectData2 * data, GstQuery * query, gpointer user_data);
+
+
 /* Some properties are protected by LOCK, others by STREAM_LOCK
  * However, manipulating either of these partitions may require
  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
  * Alternative implementations are possible, e.g. some low-level re-implementing
  * of the 2 above locks to drop both of them atomically when going into _WAIT.
  */
-#define GST_COLLECT_PADS2_GET_EVT_COND(pads) (((GstCollectPads2 *)pads)->priv->evt_cond)
-#define GST_COLLECT_PADS2_GET_EVT_LOCK(pads) (((GstCollectPads2 *)pads)->priv->evt_lock)
+#define GST_COLLECT_PADS2_GET_EVT_COND(pads) (&((GstCollectPads2 *)pads)->priv->evt_cond)
+#define GST_COLLECT_PADS2_GET_EVT_LOCK(pads) (&((GstCollectPads2 *)pads)->priv->evt_lock)
 #define GST_COLLECT_PADS2_EVT_WAIT(pads, cookie) G_STMT_START {    \
   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
   /* should work unless a lot of event'ing and thread starvation */\
@@ -227,7 +236,7 @@ gst_collect_pads2_init (GstCollectPads2 * pads)
   pads->priv->eospads = 0;
   pads->priv->started = FALSE;
 
-  g_static_rec_mutex_init (&pads->stream_lock);
+  g_rec_mutex_init (&pads->stream_lock);
 
   pads->priv->func = gst_collect_pads2_default_collected;
   pads->priv->user_data = NULL;
@@ -242,13 +251,16 @@ gst_collect_pads2_init (GstCollectPads2 * pads)
   pads->priv->earliest_data = NULL;
   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
 
+  pads->priv->event_func = gst_collect_pads2_event_default_internal;
+  pads->priv->query_func = gst_collect_pads2_query_default_internal;
+
   /* members to manage the pad list */
   pads->priv->pad_cookie = 0;
   pads->priv->pad_list = NULL;
 
   /* members for event */
-  pads->priv->evt_lock = g_mutex_new ();
-  pads->priv->evt_cond = g_cond_new ();
+  g_mutex_init (&pads->priv->evt_lock);
+  g_cond_init (&pads->priv->evt_cond);
   pads->priv->evt_cookie = 0;
 }
 
@@ -259,10 +271,10 @@ gst_collect_pads2_finalize (GObject * object)
 
   GST_DEBUG_OBJECT (object, "finalize");
 
-  g_static_rec_mutex_free (&pads->stream_lock);
+  g_rec_mutex_clear (&pads->stream_lock);
 
-  g_cond_free (pads->priv->evt_cond);
-  g_mutex_free (pads->priv->evt_lock);
+  g_cond_clear (&pads->priv->evt_cond);
+  g_mutex_clear (&pads->priv->evt_lock);
 
   /* Remove pads and free pads list */
   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
@@ -425,8 +437,8 @@ unref_data (GstCollectData2 * data)
  * @func: the function to set
  * @user_data: user data passed to the function
  *
- * Set the event callback function and user data that will be called after
- * collectpads has processed and event originating from one of the collected
+ * Set the event callback function and user data that will be called when
+ * collectpads has received an event originating from one of the collected
  * pads.  If the event being processed is a serialized one, this callback is
  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
  * held when calling a number of CollectPads functions, it should be acquired
@@ -450,6 +462,36 @@ gst_collect_pads2_set_event_function (GstCollectPads2 * pads,
 }
 
 /**
+ * gst_collect_pads2_set_query_function:
+ * @pads: the collectspads to use
+ * @func: the function to set
+ * @user_data: user data passed to the function
+ *
+ * Set the query callback function and user data that will be called after
+ * collectpads has received a query originating from one of the collected
+ * pads.  If the query being processed is a serialized one, this callback is
+ * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
+ * held when calling a number of CollectPads functions, it should be acquired
+ * if so (unusually) needed.
+ *
+ * MT safe.
+ *
+ * Since: 0.10.36
+ */
+void
+gst_collect_pads2_set_query_function (GstCollectPads2 * pads,
+    GstCollectPads2QueryFunction func, gpointer user_data)
+{
+  g_return_if_fail (pads != NULL);
+  g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
+
+  GST_OBJECT_LOCK (pads);
+  pads->priv->query_func = func;
+  pads->priv->query_user_data = user_data;
+  GST_OBJECT_UNLOCK (pads);
+}
+
+/**
 * gst_collect_pads2_clip_running_time:
 * @pads: the collectspads to use
 * @cdata: collect data of corresponding pad
@@ -483,7 +525,7 @@ gst_collect_pads2_clip_running_time (GstCollectPads2 * pads,
       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
           GST_TIME_FORMAT " running time",
           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
-      *outbuf = gst_buffer_make_metadata_writable (buf);
+      *outbuf = gst_buffer_make_writable (buf);
       GST_BUFFER_TIMESTAMP (*outbuf) = time;
     }
   }
@@ -619,6 +661,7 @@ gst_collect_pads2_add_pad_full (GstCollectPads2 * pads, GstPad * pad,
   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_chain));
   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_event));
+  gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_query));
   /* backward compat, also add to data if stopped, so that the element already
    * has this in the public data list before going PAUSED (typically)
    * this can only be done when we are stopped because we don't take the
@@ -1116,7 +1159,7 @@ gst_collect_pads2_available (GstCollectPads2 * pads)
     }
 
     /* this is the size left of the buffer */
-    size = GST_BUFFER_SIZE (buffer) - pdata->pos;
+    size = gst_buffer_get_size (buffer) - pdata->pos;
     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
 
     /* need to return the min of all available data */
@@ -1136,50 +1179,6 @@ not_filled:
 }
 
 /**
- * gst_collect_pads2_read:
- * @pads: the collectspads to query
- * @data: the data to use
- * @bytes: (out) (transfer none) (array length=size): a pointer to a byte array
- * @size: the number of bytes to read
- *
- * Get a pointer in @bytes where @size bytes can be read from the
- * given pad data.
- *
- * This function should be called with @pads STREAM_LOCK held, such as
- * in the callback.
- *
- * MT safe.
- *
- * Returns: The number of bytes available for consumption in the
- * memory pointed to by @bytes. This can be less than @size and
- * is 0 if the pad is end-of-stream.
- *
- * Since: 0.10.36
- */
-guint
-gst_collect_pads2_read (GstCollectPads2 * pads, GstCollectData2 * data,
-    guint8 ** bytes, guint size)
-{
-  guint readsize;
-  GstBuffer *buffer;
-
-  g_return_val_if_fail (pads != NULL, 0);
-  g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
-  g_return_val_if_fail (data != NULL, 0);
-  g_return_val_if_fail (bytes != NULL, 0);
-
-  /* no buffer, must be EOS */
-  if ((buffer = data->buffer) == NULL)
-    return 0;
-
-  readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
-
-  *bytes = GST_BUFFER_DATA (buffer) + data->pos;
-
-  return readsize;
-}
-
-/**
  * gst_collect_pads2_flush:
  * @pads: the collectspads to query
  * @data: the data to use
@@ -1202,6 +1201,7 @@ gst_collect_pads2_flush (GstCollectPads2 * pads, GstCollectData2 * data,
     guint size)
 {
   guint flushsize;
+  gsize bsize;
   GstBuffer *buffer;
 
   g_return_val_if_fail (pads != NULL, 0);
@@ -1212,12 +1212,14 @@ gst_collect_pads2_flush (GstCollectPads2 * pads, GstCollectData2 * data,
   if ((buffer = data->buffer) == NULL)
     return 0;
 
+  bsize = gst_buffer_get_size (buffer);
+
   /* this is what we can flush at max */
-  flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
+  flushsize = MIN (size, bsize - data->pos);
 
   data->pos += size;
 
-  if (data->pos >= GST_BUFFER_SIZE (buffer))
+  if (data->pos >= bsize)
     /* _clear will also reset data->pos to 0 */
     gst_collect_pads2_clear (pads, data);
 
@@ -1258,9 +1260,10 @@ gst_collect_pads2_read_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
   if ((buffer = data->buffer) == NULL)
     return NULL;
 
-  readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
+  readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
 
-  return gst_buffer_create_sub (buffer, data->pos, readsize);
+  return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
+      readsize);
 }
 
 /**
@@ -1290,7 +1293,7 @@ gst_collect_pads2_take_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
   GstBuffer *buffer = gst_collect_pads2_read_buffer (pads, data, size);
 
   if (buffer) {
-    gst_collect_pads2_flush (pads, data, GST_BUFFER_SIZE (buffer));
+    gst_collect_pads2_flush (pads, data, gst_buffer_get_size (buffer));
   }
   return buffer;
 }
@@ -1328,7 +1331,7 @@ gst_collect_pads2_set_waiting (GstCollectPads2 * pads, GstCollectData2 * data,
   /* Do something only on a change and if not locked */
   if (!GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED) &&
       (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING) !=
-          !!waiting)) {
+          ! !waiting)) {
     /* Set waiting state for this pad */
     if (waiting)
       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_WAITING);
@@ -1497,8 +1500,6 @@ gst_collect_pads2_recalculate_waiting (GstCollectPads2 * pads)
       GST_WARNING_OBJECT (pads,
           "GstCollectPads2 has no time segment, assuming 0 based.");
       gst_segment_init (&data->segment, GST_FORMAT_TIME);
-      gst_segment_set_newsegment (&data->segment, FALSE, 1.0f,
-          GST_FORMAT_TIME, 0, -1, 0);
       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
     }
 
@@ -1636,7 +1637,7 @@ gst_collect_pads2_default_collected (GstCollectPads2 * pads, gpointer user_data)
   if (G_UNLIKELY (best == NULL)) {
     ret = func (pads, best, NULL, buffer_user_data);
     if (ret == GST_FLOW_OK)
-      ret = GST_FLOW_UNEXPECTED;
+      ret = GST_FLOW_EOS;
     goto done;
   }
 
@@ -1687,50 +1688,44 @@ gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
   return 0;
 }
 
-static gboolean
-gst_collect_pads2_event (GstPad * pad, GstEvent * event)
+/**
+ * gst_collect_pads2_event_default:
+ * @pads: the collectspads to use
+ * @data: collect data of corresponding pad
+ * @event: event being processed
+ * @discard: process but do not send event downstream
+ *
+ * Default GstCollectPads2 event handling that elements should always
+ * chain up to to ensure proper operation.  Element might however indicate
+ * event should not be forwarded downstream.
+ *
+ * Since: 0.11.x
+ */
+gboolean
+gst_collect_pads2_event_default (GstCollectPads2 * pads, GstCollectData2 * data,
+    GstEvent * event, gboolean discard)
 {
-  gboolean res = FALSE, need_unlock = FALSE;
-  GstCollectData2 *data;
-  GstCollectPads2 *pads;
-  GstCollectPads2EventFunction event_func;
+  gboolean res = TRUE;
   GstCollectPads2BufferFunction buffer_func;
-  gpointer event_user_data;
-
-  /* some magic to get the managing collect_pads2 */
-  GST_OBJECT_LOCK (pad);
-  data = (GstCollectData2 *) gst_pad_get_element_private (pad);
-  if (G_UNLIKELY (data == NULL))
-    goto pad_removed;
-  ref_data (data);
-  GST_OBJECT_UNLOCK (pad);
-
-  res = FALSE;
-
-  pads = data->collect;
-
-  GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad from %s",
-      GST_EVENT_TYPE_NAME (event), GST_OBJECT_NAME (GST_EVENT_SRC (event)));
+  GstObject *parent;
+  GstPad *pad;
 
   GST_OBJECT_LOCK (pads);
-  event_func = pads->priv->event_func;
-  event_user_data = pads->priv->event_user_data;
   buffer_func = pads->priv->buffer_func;
   GST_OBJECT_UNLOCK (pads);
 
+  pad = data->pad;
+  parent = GST_OBJECT_PARENT (pad);
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
       /* forward event to unblock check_collected */
-      if (event_func)
-        res = event_func (pads, data, event, event_user_data);
-      if (!res) {
-        GST_DEBUG_OBJECT (pad, "forwarding flush start");
-        res = gst_pad_event_default (pad, event);
-      }
+      GST_DEBUG_OBJECT (pad, "forwarding flush start");
+      res = gst_pad_event_default (pad, parent, event);
 
       /* now unblock the chain function.
-       * no cond per pad, so they all unblock, 
+       * no cond per pad, so they all unblock,
        * non-flushing block again */
       GST_COLLECT_PADS2_STREAM_LOCK (pads);
       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
@@ -1750,9 +1745,7 @@ gst_collect_pads2_event (GstPad * pad, GstEvent * event)
 
       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
 
-      /* event already cleaned up by forwarding */
-      res = TRUE;
-      goto done;
+      goto eat;
     }
     case GST_EVENT_FLUSH_STOP:
     {
@@ -1775,8 +1768,7 @@ gst_collect_pads2_event (GstPad * pad, GstEvent * event)
       }
       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
 
-      /* forward event */
-      goto forward_or_default;
+      goto forward;
     }
     case GST_EVENT_EOS:
     {
@@ -1795,36 +1787,27 @@ gst_collect_pads2_event (GstPad * pad, GstEvent * event)
       gst_collect_pads2_check_collected (pads);
       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
 
-      goto forward_or_eat;
+      goto eat;
     }
-    case GST_EVENT_NEWSEGMENT:
+    case GST_EVENT_SEGMENT:
     {
-      gint64 start, stop, time;
-      gdouble rate, arate;
-      GstFormat format;
-      gboolean update;
+      GstSegment seg;
       gint cmp_res;
 
       GST_COLLECT_PADS2_STREAM_LOCK (pads);
 
-      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
-          &start, &stop, &time);
+      gst_event_copy_segment (event, &seg);
 
-      GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
-          ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
-          GST_TIME_ARGS (stop));
+      GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
 
       /* default collection can not handle other segment formats than time */
-      if (buffer_func && format != GST_FORMAT_TIME) {
+      if (buffer_func && seg.format != GST_FORMAT_TIME) {
         GST_WARNING_OBJECT (pads, "GstCollectPads2 default collecting "
             "can only handle time segments. Non time segment ignored.");
         goto newsegment_done;
       }
 
-      /* accept segment */
-      gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
-          format, start, stop, time);
-
+      data->segment = seg;
       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
 
       /* default muxing functionality */
@@ -1840,7 +1823,7 @@ gst_collect_pads2_event (GstPad * pad, GstEvent * event)
 
       /* Check if the waiting state of the pad should change. */
       cmp_res =
-          pads->priv->compare_func (pads, data, start,
+          pads->priv->compare_func (pads, data, seg.start,
           pads->priv->earliest_data, pads->priv->earliest_time,
           pads->priv->compare_user_data);
 
@@ -1852,44 +1835,181 @@ gst_collect_pads2_event (GstPad * pad, GstEvent * event)
       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
       /* we must not forward this event since multiple segments will be
        * accumulated and this is certainly not what we want. */
-      goto forward_or_eat;
+      goto eat;
     }
+    case GST_EVENT_CAPS:
+    case GST_EVENT_STREAM_START:
+    case GST_EVENT_STREAM_CONFIG:
+      goto eat;
     default:
       /* forward other events */
-      goto forward_or_default;
+      goto forward;
   }
 
-forward_or_default:
+eat:
+  gst_event_unref (event);
+  return res;
+
+forward:
+  if (discard)
+    goto eat;
+  else
+    return gst_pad_event_default (pad, parent, event);
+}
+
+static gboolean
+gst_collect_pads2_event_default_internal (GstCollectPads2 * pads,
+    GstCollectData2 * data, GstEvent * event, gpointer user_data)
+{
+  return gst_collect_pads2_event_default (pads, data, event, FALSE);
+}
+
+static gboolean
+gst_collect_pads2_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  gboolean res = FALSE, need_unlock = FALSE;
+  GstCollectData2 *data;
+  GstCollectPads2 *pads;
+  GstCollectPads2EventFunction event_func;
+  gpointer event_user_data;
+
+  /* some magic to get the managing collect_pads2 */
+  GST_OBJECT_LOCK (pad);
+  data = (GstCollectData2 *) gst_pad_get_element_private (pad);
+  if (G_UNLIKELY (data == NULL))
+    goto pad_removed;
+  ref_data (data);
+  GST_OBJECT_UNLOCK (pad);
+
+  res = FALSE;
+
+  pads = data->collect;
+
+  GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
+      GST_EVENT_TYPE_NAME (event));
+
+  GST_OBJECT_LOCK (pads);
+  event_func = pads->priv->event_func;
+  event_user_data = pads->priv->event_user_data;
+  GST_OBJECT_UNLOCK (pads);
+
   if (GST_EVENT_IS_SERIALIZED (event)) {
     GST_COLLECT_PADS2_STREAM_LOCK (pads);
     need_unlock = TRUE;
   }
-  if (event_func)
+
+  if (G_LIKELY (event_func)) {
     res = event_func (pads, data, event, event_user_data);
-  if (!res) {
-    GST_DEBUG_OBJECT (pad, "forwarding %s", GST_EVENT_TYPE_NAME (event));
-    res = gst_pad_event_default (pad, event);
   }
+
   if (need_unlock)
     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
-  goto done;
 
-forward_or_eat:
-  if (GST_EVENT_IS_SERIALIZED (event)) {
+  unref_data (data);
+  return res;
+
+  /* ERRORS */
+pad_removed:
+  {
+    GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
+    GST_OBJECT_UNLOCK (pad);
+    return FALSE;
+  }
+}
+
+/**
+ * gst_collect_pads2_query_default:
+ * @pads: the collectspads to use
+ * @data: collect data of corresponding pad
+ * @query: query being processed
+ * @discard: process but do not send event downstream
+ *
+ * Default GstCollectPads2 query handling that elements should always
+ * chain up to to ensure proper operation.  Element might however indicate
+ * query should not be forwarded downstream.
+ *
+ * Since: 0.11.x
+ */
+gboolean
+gst_collect_pads2_query_default (GstCollectPads2 * pads, GstCollectData2 * data,
+    GstQuery * query, gboolean discard)
+{
+  gboolean res = TRUE;
+  GstObject *parent;
+  GstPad *pad;
+
+  pad = data->pad;
+  parent = GST_OBJECT_PARENT (pad);
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_SEEKING:
+    {
+      GstFormat format;
+
+      /* don't pass it along as some (file)sink might claim it does
+       * whereas with a collectpads in between that will not likely work */
+      gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
+      gst_query_set_seeking (query, format, FALSE, 0, -1);
+      res = TRUE;
+      discard = TRUE;
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (!discard)
+    return gst_pad_query_default (pad, parent, query);
+  else
+    return res;
+}
+
+static gboolean
+gst_collect_pads2_query_default_internal (GstCollectPads2 * pads,
+    GstCollectData2 * data, GstQuery * query, gpointer user_data)
+{
+  return gst_collect_pads2_query_default (pads, data, query, FALSE);
+}
+
+static gboolean
+gst_collect_pads2_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+  gboolean res = FALSE, need_unlock = FALSE;
+  GstCollectData2 *data;
+  GstCollectPads2 *pads;
+  GstCollectPads2QueryFunction query_func;
+  gpointer query_user_data;
+
+  GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
+      GST_QUERY_TYPE_NAME (query));
+
+  /* some magic to get the managing collect_pads2 */
+  GST_OBJECT_LOCK (pad);
+  data = (GstCollectData2 *) gst_pad_get_element_private (pad);
+  if (G_UNLIKELY (data == NULL))
+    goto pad_removed;
+  ref_data (data);
+  GST_OBJECT_UNLOCK (pad);
+
+  pads = data->collect;
+
+  GST_OBJECT_LOCK (pads);
+  query_func = pads->priv->query_func;
+  query_user_data = pads->priv->query_user_data;
+  GST_OBJECT_UNLOCK (pads);
+
+  if (GST_QUERY_IS_SERIALIZED (query)) {
     GST_COLLECT_PADS2_STREAM_LOCK (pads);
     need_unlock = TRUE;
   }
-  if (event_func)
-    res = event_func (pads, data, event, event_user_data);
-  if (!res) {
-    gst_event_unref (event);
-    res = TRUE;
+
+  if (G_LIKELY (query_func)) {
+    res = query_func (pads, data, query, query_user_data);
   }
+
   if (need_unlock)
     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
-  goto done;
 
-done:
   unref_data (data);
   return res;
 
@@ -1902,6 +2022,7 @@ pad_removed:
   }
 }
 
+
 /* For each buffer we receive we check if our collected condition is reached
  * and if so we call the collected function. When this is done we check if
  * data has been unqueued. If data is still queued we wait holding the stream
@@ -1909,7 +2030,7 @@ pad_removed:
  * collected 
  */
 static GstFlowReturn
-gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
+gst_collect_pads2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstCollectData2 *data;
   GstCollectPads2 *pads;
@@ -1940,7 +2061,7 @@ gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
   /* pad was EOS, we can refuse this data */
   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
               GST_COLLECT_PADS2_STATE_EOS)))
-    goto unexpected;
+    goto eos;
 
   /* see if we need to clip */
   if (pads->priv->clip_func) {
@@ -1953,8 +2074,8 @@ gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
     if (G_UNLIKELY (outbuf == NULL))
       goto clipped;
 
-    if (G_UNLIKELY (ret == GST_FLOW_UNEXPECTED))
-      goto unexpected;
+    if (G_UNLIKELY (ret == GST_FLOW_EOS))
+      goto eos;
     else if (G_UNLIKELY (ret != GST_FLOW_OK))
       goto error;
   }
@@ -1973,7 +2094,7 @@ gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
 
     if (GST_CLOCK_TIME_IS_VALID (timestamp))
-      gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
+      data->segment.position = timestamp;
   }
 
   /* While we have data queued on this pad try to collect stuff */
@@ -2056,22 +2177,22 @@ not_started:
   {
     GST_DEBUG ("not started");
     gst_collect_pads2_clear (pads, data);
-    ret = GST_FLOW_WRONG_STATE;
+    ret = GST_FLOW_FLUSHING;
     goto unlock_done;
   }
 flushing:
   {
     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
     gst_collect_pads2_clear (pads, data);
-    ret = GST_FLOW_WRONG_STATE;
+    ret = GST_FLOW_FLUSHING;
     goto unlock_done;
   }
-unexpected:
+eos:
   {
     /* we should not post an error for this, just inform upstream that
      * we don't expect anything anymore */
     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
-    ret = GST_FLOW_UNEXPECTED;
+    ret = GST_FLOW_EOS;
     goto unlock_done;
   }
 clipped: