task: add GDestroyNotify to _new
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
index 424f1c5..1908188 100644 (file)
@@ -41,7 +41,7 @@
  * The default queue size limits are 100 buffers, 2MB of data, or
  * two seconds worth of data, whichever is reached first.
  *
- * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
+ * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
  * will allocate a random free filename and buffer data in the file.
  * By using this, it will buffer the entire stream data on the file independently
  * of the queue size limits, they will only be used for buffering statistics.
@@ -63,6 +63,7 @@
 #include <glib/gstdio.h>
 
 #include "gst/gst-i18n-lib.h"
+#include "gst/glib-compat-private.h"
 
 #include <string.h>
 
@@ -158,7 +159,7 @@ enum
                         queue->queue.length))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
-  g_mutex_lock (q->qlock);                                              \
+  g_mutex_lock (&q->qlock);                                              \
 } G_STMT_END
 
 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
@@ -168,13 +169,13 @@ enum
 } G_STMT_END
 
 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
-  g_mutex_unlock (q->qlock);                                            \
+  g_mutex_unlock (&q->qlock);                                            \
 } G_STMT_END
 
 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
   STATUS (queue, q->sinkpad, "wait for DEL");                           \
   q->waiting_del = TRUE;                                                \
-  g_cond_wait (q->item_del, queue->qlock);                              \
+  g_cond_wait (&q->item_del, &queue->qlock);                              \
   q->waiting_del = FALSE;                                               \
   if (res != GST_FLOW_OK) {                                             \
     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
@@ -186,7 +187,7 @@ enum
 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
   STATUS (queue, q->srcpad, "wait for ADD");                            \
   q->waiting_add = TRUE;                                                \
-  g_cond_wait (q->item_add, q->qlock);                                  \
+  g_cond_wait (&q->item_add, &q->qlock);                                  \
   q->waiting_add = FALSE;                                               \
   if (res != GST_FLOW_OK) {                                             \
     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
@@ -198,14 +199,14 @@ enum
 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
   if (q->waiting_del) {                                                 \
     STATUS (q, q->srcpad, "signal DEL");                                \
-    g_cond_signal (q->item_del);                                        \
+    g_cond_signal (&q->item_del);                                        \
   }                                                                     \
 } G_STMT_END
 
 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
   if (q->waiting_add) {                                                 \
     STATUS (q, q->sinkpad, "signal ADD");                               \
-    g_cond_signal (q->item_add);                                        \
+    g_cond_signal (&q->item_add);                                        \
   }                                                                     \
 } G_STMT_END
 
@@ -223,24 +224,32 @@ static void gst_queue2_set_property (GObject * object,
 static void gst_queue2_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
 static void gst_queue2_loop (GstPad * pad);
 
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 
-static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 static gboolean gst_queue2_handle_query (GstElement * element,
     GstQuery * query);
 
-static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
-    guint length, GstBuffer ** buffer);
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
+    guint64 offset, guint length, GstBuffer ** buffer);
 
-static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active);
+static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active);
 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
     GstStateChange transition);
 
@@ -249,6 +258,14 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
 
 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
 
+typedef enum
+{
+  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+  GST_QUEUE2_ITEM_TYPE_BUFFER,
+  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+  GST_QUEUE2_ITEM_TYPE_EVENT
+} GstQueue2ItemType;
+
 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
 
 static void
@@ -356,7 +373,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
 
-  gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+  gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
       "Generic",
       "Simple data queue",
       "Erik Walthinsen <omega@cse.ogi.edu>, "
@@ -373,28 +390,28 @@ gst_queue2_init (GstQueue2 * queue)
 
   gst_pad_set_chain_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_chain));
-  gst_pad_set_activatepush_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
+  gst_pad_set_chain_list_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
+  gst_pad_set_activatemode_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
   gst_pad_set_event_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
   gst_pad_set_query_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
-  GST_OBJECT_FLAG_SET (queue->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
+  GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
 
   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
 
-  gst_pad_set_activatepull_function (queue->srcpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
-  gst_pad_set_activatepush_function (queue->srcpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
+  gst_pad_set_activatemode_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
   gst_pad_set_getrange_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
   gst_pad_set_event_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
   gst_pad_set_query_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
-  GST_OBJECT_FLAG_SET (queue->srcpad, GST_PAD_FLAG_PROXY_CAPS);
+  GST_PAD_SET_PROXY_CAPS (queue->srcpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
 
   /* levels */
@@ -416,17 +433,17 @@ gst_queue2_init (GstQueue2 * queue)
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
 
-  queue->srcresult = GST_FLOW_WRONG_STATE;
-  queue->sinkresult = GST_FLOW_WRONG_STATE;
+  queue->srcresult = GST_FLOW_FLUSHING;
+  queue->sinkresult = GST_FLOW_FLUSHING;
   queue->is_eos = FALSE;
   queue->in_timer = g_timer_new ();
   queue->out_timer = g_timer_new ();
 
-  queue->qlock = g_mutex_new ();
+  g_mutex_init (&queue->qlock);
   queue->waiting_add = FALSE;
-  queue->item_add = g_cond_new ();
+  g_cond_init (&queue->item_add);
   queue->waiting_del = FALSE;
-  queue->item_del = g_cond_new ();
+  g_cond_init (&queue->item_del);
   g_queue_init (&queue->queue);
 
   queue->buffering_percent = 100;
@@ -459,9 +476,9 @@ gst_queue2_finalize (GObject * object)
   }
 
   g_queue_clear (&queue->queue);
-  g_mutex_free (queue->qlock);
-  g_cond_free (queue->item_add);
-  g_cond_free (queue->item_del);
+  g_mutex_clear (&queue->qlock);
+  g_cond_clear (&queue->item_add);
+  g_cond_clear (&queue->item_del);
   g_timer_destroy (queue->in_timer);
   g_timer_destroy (queue->out_timer);
 
@@ -644,11 +661,9 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
   gst_event_copy_segment (event, segment);
 
   if (segment->format == GST_FORMAT_BYTES) {
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
       /* start is where we'll be getting from and as such writing next */
       queue->current = add_range (queue, segment->start);
-      /* update the stats for this range */
-      update_cur_level (queue, queue->current);
     }
   }
 
@@ -707,6 +722,52 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
+{
+  GstClockTime *timestamp = data;
+
+  GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+    *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+  return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->position;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  segment->position = timestamp;
+
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 update_buffering (GstQueue2 * queue)
 {
@@ -942,6 +1003,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
   GstEvent *event;
   gboolean res;
 
+  /* until we receive the FLUSH_STOP from this seek, we skip data */
+  queue->seeking = TRUE;
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
@@ -994,20 +1057,27 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
           (offset + length) - range->writing_pos);
 
   } else {
-    GST_INFO_OBJECT (queue, "not found in any range");
-    /* we don't have the range, see how far away we are, FIXME, find a good
-     * threshold based on the incoming rate. */
+    GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
+        " len %u", offset, length);
+    /* we don't have the range, see how far away we are */
     if (!queue->is_eos && queue->current) {
+      /* FIXME, find a good threshold based on the incoming rate. */
+      guint64 threshold = 1024 * 512;
+
       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-        if (offset < queue->current->offset || offset >
-            queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
-            queue->cur_level.bytes) {
-          perform_seek_to_offset (queue, offset);
-        } else {
+        guint64 distance;
+
+        distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+        /* don't wait for the complete buffer to fill */
+        distance = MIN (distance, threshold);
+
+        if (offset >= queue->current->offset && offset <=
+            queue->current->writing_pos + distance) {
           GST_INFO_OBJECT (queue,
               "requested data is within range, wait for data");
+          return FALSE;
         }
-      } else if (offset < queue->current->writing_pos + 200000) {
+      } else if (offset < queue->current->writing_pos + threshold) {
         update_cur_pos (queue, queue->current, offset + length);
         GST_INFO_OBJECT (queue, "wait for data");
         return FALSE;
@@ -1089,22 +1159,30 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     GstBuffer ** buffer)
 {
   GstBuffer *buf;
+  GstMapInfo info;
   guint8 *data;
   guint64 file_offset;
   guint block_length, remaining, read_length;
   guint64 rb_size;
+  guint64 max_size;
   guint64 rpos;
   GstFlowReturn ret = GST_FLOW_OK;
 
   /* allocate the output buffer of the requested size */
-  buf = gst_buffer_new_allocate (NULL, length, 0);
-  data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
+  if (*buffer == NULL)
+    buf = gst_buffer_new_allocate (NULL, length, NULL);
+  else
+    buf = *buffer;
+
+  gst_buffer_map (buf, &info, GST_MAP_WRITE);
+  data = info.data;
 
   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
       offset);
 
   rpos = offset;
   rb_size = queue->ring_buffer_max_size;
+  max_size = QUEUE_MAX_BYTES (queue);
 
   remaining = length;
   while (remaining > 0) {
@@ -1123,16 +1201,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
         GST_DEBUG_OBJECT (queue,
             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
-            ", level %" G_GUINT64_FORMAT,
-            rpos, queue->current->writing_pos, level);
+            ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+            rpos, queue->current->writing_pos, level, max_size);
 
-        if (level >= rb_size) {
+        if (level >= max_size) {
           /* we don't have the data but if we have a ring buffer that is full, we
            * need to read */
           GST_DEBUG_OBJECT (queue,
-              "ring buffer full, reading ring-buffer-max-size %"
-              G_GUINT64_FORMAT " bytes", rb_size);
-          read_length = rb_size;
+              "ring buffer full, reading QUEUE_MAX_BYTES %"
+              G_GUINT64_FORMAT " bytes", max_size);
+          read_length = max_size;
         } else if (queue->is_eos) {
           /* won't get any more data so read any data we have */
           if (level) {
@@ -1140,21 +1218,20 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
                 level);
             read_length = level;
+            remaining = level;
+            length = level;
           } else
             goto hit_eos;
         }
       }
 
       if (read_length == 0) {
-        if (QUEUE_IS_USING_RING_BUFFER (queue)
-            && queue->current->max_reading_pos > rpos) {
-          /* protect cached data (data between offset and max_reading_pos)
-           * and update current level */
+        if (QUEUE_IS_USING_RING_BUFFER (queue)) {
           GST_DEBUG_OBJECT (queue,
-              "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
-              "]", rpos, queue->current->max_reading_pos);
-          queue->current->max_reading_pos = rpos;
-          update_cur_level (queue, queue->current);
+              "update current position [%" G_GUINT64_FORMAT "-%"
+              G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
+          update_cur_pos (queue, queue->current, rpos);
+          GST_QUEUE2_SIGNAL_DEL (queue);
         }
         GST_DEBUG_OBJECT (queue, "waiting for add");
         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
@@ -1209,7 +1286,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
   }
 
-  gst_buffer_unmap (buf, data, length);
+  gst_buffer_unmap (buf, &info);
+  gst_buffer_resize (buf, 0, length);
 
   GST_BUFFER_OFFSET (buf) = offset;
   GST_BUFFER_OFFSET_END (buf) = offset + length;
@@ -1222,20 +1300,25 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 hit_eos:
   {
     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
-    gst_buffer_unref (buf);
+    gst_buffer_unmap (buf, &info);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
     return GST_FLOW_EOS;
   }
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
-    gst_buffer_unref (buf);
-    return GST_FLOW_WRONG_STATE;
+    gst_buffer_unmap (buf, &info);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
+    return GST_FLOW_FLUSHING;
   }
 read_error:
   {
     GST_DEBUG_OBJECT (queue, "we have a read error");
-    gst_buffer_unmap (buf, data, 0);
-    gst_buffer_unref (buf);
+    gst_buffer_unmap (buf, &info);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
     return ret;
   }
 }
@@ -1246,12 +1329,15 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
 {
   GstMiniObject *item;
 
-  if (queue->starting_segment != NULL) {
+  if (queue->stream_start_event != NULL) {
+    item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
+    queue->stream_start_event = NULL;
+  } else if (queue->starting_segment != NULL) {
     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
     queue->starting_segment = NULL;
   } else {
     GstFlowReturn ret;
-    GstBuffer *buffer;
+    GstBuffer *buffer = NULL;
     guint64 reading_pos;
 
     reading_pos = queue->current->reading_pos;
@@ -1417,6 +1503,7 @@ gst_queue2_locked_flush (GstQueue2 * queue)
     gst_event_unref (queue->starting_segment);
   queue->starting_segment = NULL;
   queue->segment_event_received = FALSE;
+  gst_event_replace (&queue->stream_start_event, NULL);
 
   /* we deleted a lot of something */
   GST_QUEUE2_SIGNAL_DEL (queue);
@@ -1460,9 +1547,9 @@ out_flushing:
 static gboolean
 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
 {
-  guint8 *odata, *data, *ring_buffer;
+  GstMapInfo info;
+  guint8 *data, *ring_buffer;
   guint size, rb_size;
-  gsize osize;
   guint64 writing_pos, new_writing_pos;
   GstQueue2Range *range, *prev, *next;
 
@@ -1473,13 +1560,13 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
   ring_buffer = queue->ring_buffer;
   rb_size = queue->ring_buffer_max_size;
 
-  odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ);
+  gst_buffer_map (buffer, &info, GST_MAP_READ);
 
-  size = osize;
-  data = odata;
+  size = info.size;
+  data = info.data;
 
   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
-      GST_BUFFER_OFFSET (buffer));
+      writing_pos);
 
   while (size > 0) {
     guint to_write;
@@ -1700,7 +1787,7 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
     GST_QUEUE2_SIGNAL_ADD (queue);
   }
 
-  gst_buffer_unmap (buffer, odata, osize);
+  gst_buffer_unmap (buffer, &info);
 
   return TRUE;
 
@@ -1708,14 +1795,14 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
-    gst_buffer_unmap (buffer, odata, osize);
+    gst_buffer_unmap (buffer, &info);
     /* FIXME - GST_FLOW_EOS ? */
     return FALSE;
   }
 seek_failed:
   {
     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unmap (buffer, odata, osize);
+    gst_buffer_unmap (buffer, &info);
     return FALSE;
   }
 handle_error:
@@ -1731,16 +1818,45 @@ handle_error:
             ("%s", g_strerror (errno)));
       }
     }
-    gst_buffer_unmap (buffer, odata, osize);
+    gst_buffer_unmap (buffer, &info);
     return FALSE;
   }
 }
 
+static gboolean
+buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
+{
+  GstQueue2 *queue = q;
+
+  GST_TRACE_OBJECT (queue,
+      "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
+      gst_buffer_get_size (*buf));
+
+  if (!gst_queue2_create_write (queue, *buf)) {
+    GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+    return FALSE;
+  }
+  return TRUE;
+}
+
+static gboolean
+buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  gsize buf_size;
+
+  buf_size = gst_buffer_get_size (*buf);
+  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
+  *p_size += buf_size;
+  return TRUE;
+}
+
 /* enqueue an item an update the level stats */
 static void
-gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
+    GstQueue2ItemType item_type)
 {
-  if (isbuffer) {
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
     guint size;
 
@@ -1763,7 +1879,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
       /* FIXME - check return value? */
       gst_queue2_create_write (queue, buffer);
     }
-  } else if (GST_IS_EVENT (item)) {
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+    /* add buffer to the statistics */
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers++;
+      queue->cur_level.bytes += size;
+    }
+    queue->bytes_in += size;
+
+    /* apply new buffer to segment stats */
+    apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+    /* update the byterate stats */
+    update_in_rates (queue);
+
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+    }
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event;
 
     event = GST_EVENT_CAST (item);
@@ -1793,6 +1934,13 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
          * from downstream */
         queue->unexpected = FALSE;
         break;
+      case GST_EVENT_STREAM_START:
+        if (!QUEUE_IS_USING_QUEUE (queue)) {
+          gst_event_replace (&queue->stream_start_event, event);
+          gst_event_unref (event);
+          item = NULL;
+        }
+        break;
       default:
         if (!QUEUE_IS_USING_QUEUE (queue))
           goto unexpected_event;
@@ -1835,7 +1983,7 @@ unexpected_event:
 
 /* dequeue an item from the queue and update level stats */
 static GstMiniObject *
-gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
+gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
 {
   GstMiniObject *item;
 
@@ -1853,7 +2001,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
 
     buffer = GST_BUFFER_CAST (item);
     size = gst_buffer_get_size (buffer);
-    *is_buffer = TRUE;
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved buffer %p from queue", buffer);
@@ -1874,7 +2022,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
-    *is_buffer = FALSE;
+    *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved event %p from queue", event);
@@ -1890,11 +2038,36 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
       default:
         break;
     }
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
+
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers--;
+      queue->cur_level.bytes -= size;
+    }
+    queue->bytes_out += size;
+
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+    /* update the byterate stats */
+    update_out_rates (queue);
+    /* update the buffering */
+    if (queue->use_buffering)
+      update_buffering (queue);
+
   } else {
     g_warning
         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
         item, GST_OBJECT_NAME (queue));
     item = NULL;
+    *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
   }
   GST_QUEUE2_SIGNAL_DEL (queue);
 
@@ -1909,24 +2082,25 @@ no_item:
 }
 
 static gboolean
-gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
         /* now unblock the chain function */
         GST_QUEUE2_MUTEX_LOCK (queue);
-        queue->srcresult = GST_FLOW_WRONG_STATE;
-        queue->sinkresult = GST_FLOW_WRONG_STATE;
+        queue->srcresult = GST_FLOW_FLUSHING;
+        queue->sinkresult = GST_FLOW_FLUSHING;
         /* unblock the loop and chain functions */
         GST_QUEUE2_SIGNAL_ADD (queue);
         GST_QUEUE2_SIGNAL_DEL (queue);
@@ -1939,7 +2113,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
       } else {
         GST_QUEUE2_MUTEX_LOCK (queue);
         /* flush the sink pad */
-        queue->sinkresult = GST_FLOW_WRONG_STATE;
+        queue->sinkresult = GST_FLOW_FLUSHING;
         GST_QUEUE2_SIGNAL_DEL (queue);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
@@ -1951,7 +2125,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
 
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1961,10 +2135,11 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         queue->sinkresult = GST_FLOW_OK;
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
+        queue->seeking = FALSE;
         /* reset rate counters */
         reset_rate_timer (queue);
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
-            queue->srcpad);
+            queue->srcpad, NULL);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       } else {
         GST_QUEUE2_MUTEX_LOCK (queue);
@@ -1972,6 +2147,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
         queue->sinkresult = GST_FLOW_OK;
+        queue->seeking = FALSE;
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         gst_event_unref (event);
@@ -1985,7 +2161,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         /* refuse more events on EOS */
         if (queue->is_eos)
           goto out_eos;
-        gst_queue2_locked_enqueue (queue, event, FALSE);
+        gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       } else {
         /* non-serialized events are passed upstream. */
@@ -2014,13 +2190,19 @@ out_eos:
 }
 
 static gboolean
-gst_queue2_handle_sink_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query)
 {
   gboolean res;
 
   switch (GST_QUERY_TYPE (query)) {
     default:
-      res = gst_pad_query_default (pad, query);
+      if (GST_QUERY_IS_SERIALIZED (query)) {
+        GST_WARNING_OBJECT (pad, "unhandled serialized query");
+        res = FALSE;
+      } else {
+        res = gst_pad_query_default (pad, parent, query);
+      }
       break;
   }
   return res;
@@ -2088,18 +2270,9 @@ gst_queue2_is_filled (GstQueue2 * queue)
 }
 
 static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+    GstMiniObject * item, GstQueue2ItemType item_type)
 {
-  GstQueue2 *queue;
-
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
-      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
   /* when we received EOS, we refuse more data */
@@ -2109,11 +2282,15 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
+  /* while we didn't receive the newsegment, we're seeking and we skip data */
+  if (queue->seeking)
+    goto out_seeking;
+
   if (!gst_queue2_wait_free_space (queue))
     goto out_flushing;
 
   /* put buffer in queue now */
-  gst_queue2_locked_enqueue (queue, buffer, TRUE);
+  gst_queue2_locked_enqueue (queue, item, item_type);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -2126,7 +2303,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return ret;
   }
@@ -2134,20 +2311,104 @@ out_eos:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_EOS;
   }
+out_seeking:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
+    gst_mini_object_unref (item);
+
+    return GST_FLOW_OK;
+  }
 out_unexpected:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_EOS;
   }
 }
 
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (parent);
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
+      "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (parent);
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer list %p", buffer_list);
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
+static GstMiniObject *
+gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
+{
+  GstMiniObject *data;
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
+
+  /* stop pushing buffers, we dequeue all items until we see an item that we
+   * can push again, which is EOS or SEGMENT. If there is nothing in the
+   * queue we can push, we set a flag to make the sinkpad refuse more
+   * buffers with an EOS return value until we receive something
+   * pushable again or we get flushed. */
+  while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+    if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping EOS buffer %p", data);
+      gst_buffer_unref (GST_BUFFER_CAST (data));
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+      GstEvent *event = GST_EVENT_CAST (data);
+      GstEventType type = GST_EVENT_TYPE (event);
+
+      if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+        /* we found a pushable item in the queue, push it out */
+        GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+            "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
+        return data;
+      }
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping EOS event %p", event);
+      gst_event_unref (event);
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping EOS buffer list %p", data);
+      gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+    }
+  }
+  /* no more items in the queue. Set the unexpected flag so that upstream
+   * make us refuse any more buffers on the sinkpad. Since we will still
+   * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
+   * task function does not shut down. */
+  queue->unexpected = TRUE;
+  return NULL;
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
@@ -2155,73 +2416,33 @@ gst_queue2_push_one (GstQueue2 * queue)
 {
   GstFlowReturn result = GST_FLOW_OK;
   GstMiniObject *data;
-  gboolean is_buffer = FALSE;
+  GstQueue2ItemType item_type;
 
-  data = gst_queue2_locked_dequeue (queue, &is_buffer);
+  data = gst_queue2_locked_dequeue (queue, &item_type);
   if (data == NULL)
     goto no_item;
 
 next:
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
-  if (is_buffer) {
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
-#if 0
-    GstCaps *caps;
-#endif
 
     buffer = GST_BUFFER_CAST (data);
-#if 0
-    caps = GST_BUFFER_CAPS (buffer);
-#endif
-
-#if 0
-    /* set caps before pushing the buffer so that core does not try to do
-     * something fancy to check if this is possible. */
-    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
-      gst_pad_set_caps (queue->srcpad, caps);
-#endif
 
     result = gst_pad_push (queue->srcpad, buffer);
 
     /* need to check for srcresult here as well */
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
     if (result == GST_FLOW_EOS) {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
-      /* stop pushing buffers, we dequeue all items until we see an item that we
-       * can push again, which is EOS or SEGMENT. If there is nothing in the
-       * queue we can push, we set a flag to make the sinkpad refuse more
-       * buffers with an EOS return value until we receive something
-       * pushable again or we get flushed. */
-      while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
-        if (is_buffer) {
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping EOS buffer %p", data);
-          gst_buffer_unref (GST_BUFFER_CAST (data));
-        } else if (GST_IS_EVENT (data)) {
-          GstEvent *event = GST_EVENT_CAST (data);
-          GstEventType type = GST_EVENT_TYPE (event);
-
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
-            /* we found a pushable item in the queue, push it out */
-            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-                "pushing pushable event %s after EOS",
-                GST_EVENT_TYPE_NAME (event));
-            goto next;
-          }
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping EOS event %p", event);
-          gst_event_unref (event);
-        }
-      }
-      /* no more items in the queue. Set the unexpected flag so that upstream
-       * make us refuse any more buffers on the sinkpad. Since we will still
-       * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
-       * task function does not shut down. */
-      queue->unexpected = TRUE;
+      data = gst_queue2_dequeue_on_eos (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
       result = GST_FLOW_OK;
     }
-  } else if (GST_IS_EVENT (data)) {
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event = GST_EVENT_CAST (data);
     GstEventType type = GST_EVENT_TYPE (event);
 
@@ -2235,6 +2456,23 @@ next:
     }
 
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+
+    buffer_list = GST_BUFFER_LIST_CAST (data);
+
+    result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+    /* need to check for srcresult here as well */
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+    if (result == GST_FLOW_EOS) {
+      data = gst_queue2_dequeue_on_eos (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
+      result = GST_FLOW_OK;
+    }
   }
   return result;
 
@@ -2248,7 +2486,7 @@ no_item:
 out_flushing:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_WRONG_STATE;
+    return GST_FLOW_FLUSHING;
   }
 }
 
@@ -2319,10 +2557,10 @@ out_flushing:
 }
 
 static gboolean
-gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   gboolean res = TRUE;
-  GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  GstQueue2 *queue = GST_QUEUE2 (parent);
 
 #ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
@@ -2338,7 +2576,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         GST_DEBUG_OBJECT (queue, "flushing");
-        queue->srcresult = GST_FLOW_WRONG_STATE;
+        queue->srcresult = GST_FLOW_FLUSHING;
         GST_QUEUE2_SIGNAL_ADD (queue);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
@@ -2355,12 +2593,6 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_OK;
-        if (queue->current) {
-          /* forget the highest read offset, we'll calculate a new one when we
-           * get the next getrange request. We need to do this in order to reset
-           * the buffering percentage */
-          queue->current->max_reading_pos = 0;
-        }
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* when using a temp file, we eat the event */
@@ -2377,11 +2609,11 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
 }
 
 static gboolean
-gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
@@ -2491,7 +2723,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
             start = 0;
             /* get our available data relative to the duration */
             if (duration != -1)
-              stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
+              stop =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
+                  duration);
             else
               stop = -1;
             break;
@@ -2515,8 +2749,12 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
                 range_stop = 0;
                 break;
               }
-              range_start = 100 * queued_ranges->offset / duration;
-              range_stop = 100 * queued_ranges->writing_pos / duration;
+              range_start =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+                  queued_ranges->offset, duration);
+              range_stop =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+                  queued_ranges->writing_pos, duration);
               break;
             case GST_FORMAT_BYTES:
               range_start = queued_ranges->offset;
@@ -2546,16 +2784,22 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
     case GST_QUERY_SCHEDULING:
     {
       gboolean pull_mode;
+      GstSchedulingFlags flags = 0;
 
       /* we can operate in pull mode when we are using a tempfile */
       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
 
-      gst_query_set_scheduling (query, pull_mode, pull_mode, FALSE, 0, -1, 1);
+      if (pull_mode)
+        flags |= GST_SCHEDULING_FLAG_SEEKABLE;
+      gst_query_set_scheduling (query, flags, 0, -1, 0);
+      if (pull_mode)
+        gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+      gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
       break;
     }
     default:
       /* peer handled other queries */
-      if (!gst_pad_query_default (pad, query))
+      if (!gst_pad_query_default (pad, parent, query))
         goto peer_failed;
       break;
   }
@@ -2573,8 +2817,11 @@ peer_failed:
 static gboolean
 gst_queue2_handle_query (GstElement * element, GstQuery * query)
 {
+  GstQueue2 *queue = GST_QUEUE2 (element);
+
   /* simply forward to the srcpad query function */
-  return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
+  return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
+      query);
 }
 
 static void
@@ -2590,13 +2837,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue)
 }
 
 static GstFlowReturn
-gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
-    GstBuffer ** buffer)
+gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
+    guint length, GstBuffer ** buffer)
 {
   GstQueue2 *queue;
   GstFlowReturn ret;
 
-  queue = GST_QUEUE2_CAST (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2_CAST (parent);
 
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
@@ -2649,44 +2896,52 @@ out_unexpected:
 
 /* sink currently only operates in push mode */
 static gboolean
-gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active)
 {
-  gboolean result = TRUE;
+  gboolean result;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
-  if (active) {
-    GST_QUEUE2_MUTEX_LOCK (queue);
-    GST_DEBUG_OBJECT (queue, "activating push mode");
-    queue->srcresult = GST_FLOW_OK;
-    queue->sinkresult = GST_FLOW_OK;
-    queue->is_eos = FALSE;
-    queue->unexpected = FALSE;
-    reset_rate_timer (queue);
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
-  } else {
-    /* unblock chain function */
-    GST_QUEUE2_MUTEX_LOCK (queue);
-    GST_DEBUG_OBJECT (queue, "deactivating push mode");
-    queue->srcresult = GST_FLOW_WRONG_STATE;
-    queue->sinkresult = GST_FLOW_WRONG_STATE;
-    gst_queue2_locked_flush (queue);
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
+  switch (mode) {
+    case GST_PAD_MODE_PUSH:
+      if (active) {
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        GST_DEBUG_OBJECT (queue, "activating push mode");
+        queue->srcresult = GST_FLOW_OK;
+        queue->sinkresult = GST_FLOW_OK;
+        queue->is_eos = FALSE;
+        queue->unexpected = FALSE;
+        reset_rate_timer (queue);
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
+      } else {
+        /* unblock chain function */
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        GST_DEBUG_OBJECT (queue, "deactivating push mode");
+        queue->srcresult = GST_FLOW_FLUSHING;
+        queue->sinkresult = GST_FLOW_FLUSHING;
+        gst_queue2_locked_flush (queue);
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
+      }
+      result = TRUE;
+      break;
+    default:
+      result = FALSE;
+      break;
   }
-
   return result;
 }
 
 /* src operating in push mode, we start a task on the source pad that pushes out
  * buffers from the queue */
 static gboolean
-gst_queue2_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 {
   gboolean result = FALSE;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
   if (active) {
     GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2695,14 +2950,15 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
     queue->sinkresult = GST_FLOW_OK;
     queue->is_eos = FALSE;
     queue->unexpected = FALSE;
-    result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
+    result =
+        gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
     GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     /* unblock loop function */
     GST_QUEUE2_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "deactivating push mode");
-    queue->srcresult = GST_FLOW_WRONG_STATE;
-    queue->sinkresult = GST_FLOW_WRONG_STATE;
+    queue->srcresult = GST_FLOW_FLUSHING;
+    queue->sinkresult = GST_FLOW_FLUSHING;
     /* the item add signal will unblock */
     GST_QUEUE2_SIGNAL_ADD (queue);
     GST_QUEUE2_MUTEX_UNLOCK (queue);
@@ -2716,12 +2972,12 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
 
 /* pull mode, downstream will call our getrange function */
 static gboolean
-gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
 {
   gboolean result;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
   if (active) {
     GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2747,16 +3003,16 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
       /* this is not allowed, we cannot operate in pull mode without a temp
        * file. */
-      queue->srcresult = GST_FLOW_WRONG_STATE;
-      queue->sinkresult = GST_FLOW_WRONG_STATE;
+      queue->srcresult = GST_FLOW_FLUSHING;
+      queue->sinkresult = GST_FLOW_FLUSHING;
       result = FALSE;
     }
     GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     GST_QUEUE2_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
-    queue->srcresult = GST_FLOW_WRONG_STATE;
-    queue->sinkresult = GST_FLOW_WRONG_STATE;
+    queue->srcresult = GST_FLOW_FLUSHING;
+    queue->sinkresult = GST_FLOW_FLUSHING;
     /* this will unlock getrange */
     GST_QUEUE2_SIGNAL_ADD (queue);
     result = TRUE;
@@ -2766,6 +3022,27 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
   return result;
 }
 
+static gboolean
+gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
+    gboolean active)
+{
+  gboolean res;
+
+  switch (mode) {
+    case GST_PAD_MODE_PULL:
+      res = gst_queue2_src_activate_pull (pad, parent, active);
+      break;
+    case GST_PAD_MODE_PUSH:
+      res = gst_queue2_src_activate_push (pad, parent, active);
+      break;
+    default:
+      GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
+      res = FALSE;
+      break;
+  }
+  return res;
+}
+
 static GstStateChangeReturn
 gst_queue2_change_state (GstElement * element, GstStateChange transition)
 {
@@ -2795,6 +3072,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
       }
       queue->segment_event_received = FALSE;
       queue->starting_segment = NULL;
+      gst_event_replace (&queue->stream_start_event, NULL);
       GST_QUEUE2_MUTEX_UNLOCK (queue);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -2829,6 +3107,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
         gst_event_unref (queue->starting_segment);
         queue->starting_segment = NULL;
       }
+      gst_event_replace (&queue->stream_start_event, NULL);
       GST_QUEUE2_MUTEX_UNLOCK (queue);
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
@@ -2923,7 +3202,7 @@ gst_queue2_set_property (GObject * object,
     case PROP_TEMP_LOCATION:
       g_free (queue->temp_location);
       queue->temp_location = g_value_dup_string (value);
-      /* you can set the property back to NULL to make it use the temp-tmpl
+      /* you can set the property back to NULL to make it use the temp-template
        * property. */
       queue->temp_location_set = queue->temp_location != NULL;
       break;