queue2: set seeking flag with the queue lock
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
index 203abc6..35e5e1e 100644 (file)
@@ -63,6 +63,7 @@
 #include <glib/gstdio.h>
 
 #include "gst/gst-i18n-lib.h"
+#include "gst/glib-compat-private.h"
 
 #include <string.h>
 
@@ -155,7 +156,7 @@ enum
                       queue->max_level.time, \
                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
                         queue->current->writing_pos - queue->current->max_reading_pos : \
-                        queue->queue->length))
+                        queue->queue.length))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
   g_mutex_lock (q->qlock);                                              \
@@ -225,6 +226,8 @@ static void gst_queue2_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
     guint size, GstCaps * caps, GstBuffer ** buf);
 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
@@ -255,6 +258,13 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
 
 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
 
+typedef enum
+{
+  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+  GST_QUEUE2_ITEM_TYPE_BUFFER,
+  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+  GST_QUEUE2_ITEM_TYPE_EVENT
+} GstQueue2ItemType;
 
 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
 
@@ -281,8 +291,6 @@ gst_queue2_class_init (GstQueue2Class * klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->set_property = gst_queue2_set_property;
   gobject_class->get_property = gst_queue2_get_property;
 
@@ -365,13 +373,13 @@ gst_queue2_class_init (GstQueue2Class * klass)
    * The maximum size of the ring buffer in bytes. If set to 0, the ring
    * buffer is disabled. Default 0.
    *
-   * Since: 0.10.30
+   * Since: 0.10.31
    */
   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
       g_param_spec_uint64 ("ring-buffer-max-size",
           "Max. ring buffer size (bytes)",
-          "Max. amount of data in the ring buffer (bytes, 0 = disabled",
-          0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
+          "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
+          0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
@@ -388,6 +396,8 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
 
   gst_pad_set_chain_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_chain));
+  gst_pad_set_chain_list_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
   gst_pad_set_activatepush_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
   gst_pad_set_event_function (queue->sinkpad,
@@ -450,7 +460,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   queue->item_add = g_cond_new ();
   queue->waiting_del = FALSE;
   queue->item_del = g_cond_new ();
-  queue->queue = g_queue_new ();
+  g_queue_init (&queue->queue);
 
   queue->buffering_percent = 100;
 
@@ -475,13 +485,13 @@ gst_queue2_finalize (GObject * object)
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!g_queue_is_empty (queue->queue)) {
-    GstMiniObject *data = g_queue_pop_head (queue->queue);
+  while (!g_queue_is_empty (&queue->queue)) {
+    GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
     gst_mini_object_unref (data);
   }
 
-  g_queue_free (queue->queue);
+  g_queue_clear (&queue->queue);
   g_mutex_free (queue->qlock);
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
@@ -650,13 +660,17 @@ gst_queue2_getcaps (GstPad * pad)
   GstPad *otherpad;
   GstCaps *result;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  if (G_UNLIKELY (queue == NULL))
+    return gst_caps_new_any ();
 
   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
   result = gst_pad_peer_get_caps (otherpad);
   if (result == NULL)
     result = gst_caps_new_any ();
 
+  gst_object_unref (queue);
+
   return result;
 }
 
@@ -697,7 +711,9 @@ update_time_level (GstQueue2 * queue)
   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
 
-  if (queue->sinktime >= queue->srctime)
+  if (queue->sinktime != GST_CLOCK_TIME_NONE
+      && queue->srctime != GST_CLOCK_TIME_NONE
+      && queue->sinktime >= queue->srctime)
     queue->cur_level.time = queue->sinktime - queue->srctime;
   else
     queue->cur_level.time = 0;
@@ -724,7 +740,7 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
       G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
 
   if (format == GST_FORMAT_BYTES) {
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
       /* start is where we'll be getting from and as such writing next */
       queue->current = add_range (queue, start);
       /* update the stats for this range */
@@ -768,7 +784,7 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   timestamp = GST_BUFFER_TIMESTAMP (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
-  /* if no timestamp is set, assume it's continuous with the previous 
+  /* if no timestamp is set, assume it's continuous with the previous
    * time */
   if (timestamp == GST_CLOCK_TIME_NONE)
     timestamp = segment->last_stop;
@@ -791,13 +807,59 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static GstBufferListItem
+buffer_list_apply_time (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  GstClockTime *timestamp = data;
+
+  GST_TRACE ("buffer %u in group %u has ts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, group,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+    *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->last_stop;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 update_buffering (GstQueue2 * queue)
 {
   gint64 percent;
   gboolean post = FALSE;
 
-  if (!queue->use_buffering || queue->high_percent <= 0)
+  if (queue->high_percent <= 0)
     return;
 
 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
@@ -894,6 +956,7 @@ reset_rate_timer (GstQueue2 * queue)
   queue->bytes_in = 0;
   queue->bytes_out = 0;
   queue->byte_in_rate = 0.0;
+  queue->byte_in_period = 0;
   queue->byte_out_rate = 0.0;
   queue->last_in_elapsed = 0.0;
   queue->last_out_elapsed = 0.0;
@@ -906,8 +969,11 @@ reset_rate_timer (GstQueue2 * queue)
 /* Tuning for rate estimation. We use a large window for the input rate because
  * it should be stable when connected to a network. The output rate is less
  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
- * therefore adapt more quickly. */
-#define AVG_IN(avg,val)  ((avg) * 15.0 + (val)) / 16.0
+ * therefore adapt more quickly.
+ * However, initial input rate may be subject to a burst, and should therefore
+ * initially also adapt more quickly to changes, and only later on give higher
+ * weight to previous values. */
+#define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
 
 static void
@@ -929,14 +995,20 @@ update_in_rates (GstQueue2 * queue)
     period = elapsed - queue->last_in_elapsed;
 
     GST_DEBUG_OBJECT (queue,
-        "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
+        "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
+        period, queue->bytes_in, queue->byte_in_period);
 
     byte_in_rate = queue->bytes_in / period;
 
     if (queue->byte_in_rate == 0.0)
       queue->byte_in_rate = byte_in_rate;
     else
-      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
+      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
+          (double) queue->byte_in_period, period);
+
+    /* another data point, cap at 16 for long time running average */
+    if (queue->byte_in_period < 16 * RATE_INTERVAL)
+      queue->byte_in_period += period;
 
     /* reset the values to calculate rate over the next interval */
     queue->last_in_elapsed = elapsed;
@@ -1015,6 +1087,10 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
   GstEvent *event;
   gboolean res;
 
+  /* until we receive the FLUSH_STOP from this seek, we skip data */
+  queue->seeking = TRUE;
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
 
   event =
@@ -1022,7 +1098,6 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
       GST_SEEK_TYPE_NONE, -1);
 
-  GST_QUEUE2_MUTEX_UNLOCK (queue);
   res = gst_pad_push_event (queue->sinkpad, event);
   GST_QUEUE2_MUTEX_LOCK (queue);
 
@@ -1066,20 +1141,27 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
           (offset + length) - range->writing_pos);
 
   } else {
-    GST_INFO_OBJECT (queue, "not found in any range");
-    /* we don't have the range, see how far away we are, FIXME, find a good
-     * threshold based on the incomming rate. */
+    GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
+        " len %u", offset, length);
+    /* we don't have the range, see how far away we are */
     if (!queue->is_eos && queue->current) {
+      /* FIXME, find a good threshold based on the incoming rate. */
+      guint64 threshold = 1024 * 512;
+
       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-        if (offset < queue->current->offset || offset >
-            queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
-            queue->cur_level.bytes) {
-          perform_seek_to_offset (queue, offset);
-        } else {
+        guint64 distance;
+
+        distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+        /* don't wait for the complete buffer to fill */
+        distance = MIN (distance, threshold);
+
+        if (offset >= queue->current->offset && offset <=
+            queue->current->writing_pos + distance) {
           GST_INFO_OBJECT (queue,
               "requested data is within range, wait for data");
+          return FALSE;
         }
-      } else if (offset < queue->current->writing_pos + 200000) {
+      } else if (offset < queue->current->writing_pos + threshold) {
         update_cur_pos (queue, queue->current, offset + length);
         GST_INFO_OBJECT (queue, "wait for data");
         return FALSE;
@@ -1164,6 +1246,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
   guint block_length, remaining, read_length;
   gint64 read_return;
   guint64 rb_size;
+  guint64 max_size;
   guint64 rpos;
 
   /* allocate the output buffer of the requested size */
@@ -1175,6 +1258,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
   rpos = offset;
   rb_size = queue->ring_buffer_max_size;
+  max_size = QUEUE_MAX_BYTES (queue);
 
   remaining = length;
   while (remaining > 0) {
@@ -1193,16 +1277,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
         GST_DEBUG_OBJECT (queue,
             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
-            ", level %" G_GUINT64_FORMAT,
-            rpos, queue->current->writing_pos, level);
+            ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+            rpos, queue->current->writing_pos, level, max_size);
 
-        if (level >= rb_size) {
+        if (level >= max_size) {
           /* we don't have the data but if we have a ring buffer that is full, we
            * need to read */
           GST_DEBUG_OBJECT (queue,
-              "ring buffer full, reading ring-buffer-max-size %"
-              G_GUINT64_FORMAT " bytes", rb_size);
-          read_length = rb_size;
+              "ring buffer full, reading QUEUE_MAX_BYTES %"
+              G_GUINT64_FORMAT " bytes", max_size);
+          read_length = max_size;
         } else if (queue->is_eos) {
           /* won't get any more data so read any data we have */
           if (level) {
@@ -1210,9 +1294,12 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
                 level);
             read_length = level;
+            remaining = level;
+            length = level;
           } else {
             GST_DEBUG_OBJECT (queue,
                 "EOS hit and we don't have any requested data");
+            gst_buffer_unref (buf);
             return GST_FLOW_UNEXPECTED;
           }
         }
@@ -1241,7 +1328,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     /* set range reading_pos to actual reading position for this read */
     queue->current->reading_pos = rpos;
 
-    /* congfigure how much and from where to read */
+    /* configure how much and from where to read */
     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
       file_offset =
           (queue->current->rb_offset + (rpos -
@@ -1292,6 +1379,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
+    gst_buffer_unref (buf);
     return GST_FLOW_WRONG_STATE;
   }
 read_error:
@@ -1462,8 +1550,8 @@ gst_queue2_locked_flush (GstQueue2 * queue)
       gst_queue2_flush_temp_file (queue);
     init_ranges (queue);
   } else {
-    while (!g_queue_is_empty (queue->queue)) {
-      GstMiniObject *data = g_queue_pop_head (queue->queue);
+    while (!g_queue_is_empty (&queue->queue)) {
+      GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
       /* Then lose another reference because we are supposed to destroy that
          data when flushing */
@@ -1750,7 +1838,8 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
     update_cur_level (queue, queue->current);
 
     /* update the buffering status */
-    update_buffering (queue);
+    if (queue->use_buffering)
+      update_buffering (queue);
 
     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
@@ -1789,11 +1878,41 @@ handle_error:
   }
 }
 
+static GstBufferListItem
+buffer_list_create_write (GstBuffer ** buf, guint group, guint idx, gpointer q)
+{
+  GstQueue2 *queue = q;
+
+  GST_TRACE_OBJECT (queue, "writing buffer %u in group %u of size %u bytes",
+      idx, group, GST_BUFFER_SIZE (*buf));
+
+  if (!gst_queue2_create_write (queue, *buf)) {
+    GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+    return GST_BUFFER_LIST_END;
+  }
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+static GstBufferListItem
+buffer_list_calc_size (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  guint buf_size;
+
+  buf_size = GST_BUFFER_SIZE (*buf);
+  GST_TRACE ("buffer %u in group %u has size %u", idx, group, buf_size);
+  *p_size += buf_size;
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
 /* enqueue an item an update the level stats */
 static void
-gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
+    GstQueue2ItemType item_type)
 {
-  if (GST_IS_BUFFER (item)) {
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
     guint size;
 
@@ -1816,7 +1935,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
       /* FIXME - check return value? */
       gst_queue2_create_write (queue, buffer);
     }
-  } else if (GST_IS_EVENT (item)) {
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+    /* add buffer to the statistics */
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers++;
+      queue->cur_level.bytes += size;
+    }
+    queue->bytes_in += size;
+
+    /* apply new buffer to segment stats */
+    apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+    /* update the byterate stats */
+    update_in_rates (queue);
+
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+    }
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event;
 
     event = GST_EVENT_CAST (item);
@@ -1860,10 +2004,11 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
 
   if (item) {
     /* update the buffering status */
-    update_buffering (queue);
+    if (queue->use_buffering)
+      update_buffering (queue);
 
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      g_queue_push_tail (queue->queue, item);
+      g_queue_push_tail (&queue->queue, item);
     } else {
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
     }
@@ -1887,14 +2032,14 @@ unexpected_event:
 
 /* dequeue an item from the queue and update level stats */
 static GstMiniObject *
-gst_queue2_locked_dequeue (GstQueue2 * queue)
+gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
 {
   GstMiniObject *item;
 
   if (!QUEUE_IS_USING_QUEUE (queue))
     item = gst_queue2_read_item_from_file (queue);
   else
-    item = g_queue_pop_head (queue->queue);
+    item = g_queue_pop_head (&queue->queue);
 
   if (item == NULL)
     goto no_item;
@@ -1905,6 +2050,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
 
     buffer = GST_BUFFER_CAST (item);
     size = GST_BUFFER_SIZE (buffer);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved buffer %p from queue", buffer);
@@ -1919,11 +2065,14 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
     /* update the byterate stats */
     update_out_rates (queue);
     /* update the buffering */
-    update_buffering (queue);
+    if (queue->use_buffering)
+      update_buffering (queue);
 
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
+    *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
+
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved event %p from queue", event);
 
@@ -1938,11 +2087,36 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
       default:
         break;
     }
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
+
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers--;
+      queue->cur_level.bytes -= size;
+    }
+    queue->bytes_out += size;
+
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+    /* update the byterate stats */
+    update_out_rates (queue);
+    /* update the buffering */
+    if (queue->use_buffering)
+      update_buffering (queue);
+
   } else {
     g_warning
         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
         item, GST_OBJECT_NAME (queue));
     item = NULL;
+    *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
   }
   GST_QUEUE2_SIGNAL_DEL (queue);
 
@@ -1967,7 +2141,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (queue->srcpad->mode == GST_ACTIVATE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1999,7 +2173,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
 
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (queue->srcpad->mode == GST_ACTIVATE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -2009,6 +2183,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         queue->sinkresult = GST_FLOW_OK;
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
+        queue->seeking = FALSE;
         /* reset rate counters */
         reset_rate_timer (queue);
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
@@ -2033,7 +2208,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         /* refuse more events on EOS */
         if (queue->is_eos)
           goto out_eos;
-        gst_queue2_locked_enqueue (queue, event);
+        gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       } else {
         /* non-serialized events are passed upstream. */
@@ -2071,7 +2246,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
     return queue->current->writing_pos <= queue->current->max_reading_pos;
   } else {
-    if (queue->queue->length == 0)
+    if (queue->queue.length == 0)
       return TRUE;
   }
 
@@ -2123,18 +2298,9 @@ gst_queue2_is_filled (GstQueue2 * queue)
 }
 
 static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+    GstMiniObject * item, GstQueue2ItemType item_type)
 {
-  GstQueue2 *queue;
-
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
   /* when we received EOS, we refuse more data */
@@ -2144,11 +2310,15 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
+  /* while we didn't receive the newsegment, we're seeking and we skip data */
+  if (queue->seeking)
+    goto out_seeking;
+
   if (!gst_queue2_wait_free_space (queue))
     goto out_flushing;
 
   /* put buffer in queue now */
-  gst_queue2_locked_enqueue (queue, buffer);
+  gst_queue2_locked_enqueue (queue, item, item_type);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -2161,7 +2331,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return ret;
   }
@@ -2169,21 +2339,106 @@ out_eos:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
+out_seeking:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
+    gst_mini_object_unref (item);
+
+    return GST_FLOW_OK;
+  }
 out_unexpected:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because we received UNEXPECTED");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
 }
 
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstBufferList * buffer_list)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer list %p", buffer_list);
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
+static GstMiniObject *
+gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
+    GstQueue2ItemType * item_type)
+{
+  GstMiniObject *data;
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream");
+
+  /* stop pushing buffers, we dequeue all items until we see an item that we
+   * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+   * queue we can push, we set a flag to make the sinkpad refuse more
+   * buffers with an UNEXPECTED return value until we receive something
+   * pushable again or we get flushed. */
+  while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+    if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED buffer %p", data);
+      gst_buffer_unref (GST_BUFFER_CAST (data));
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+      GstEvent *event = GST_EVENT_CAST (data);
+      GstEventType type = GST_EVENT_TYPE (event);
+
+      if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+        /* we found a pushable item in the queue, push it out */
+        GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+            "pushing pushable event %s after UNEXPECTED",
+            GST_EVENT_TYPE_NAME (event));
+        return data;
+      }
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED event %p", event);
+      gst_event_unref (event);
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED buffer list %p", data);
+      gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+    }
+  }
+  /* no more items in the queue. Set the unexpected flag so that upstream
+   * make us refuse any more buffers on the sinkpad. Since we will still
+   * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+   * task function does not shut down. */
+  queue->unexpected = TRUE;
+  return NULL;
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
@@ -2191,21 +2446,22 @@ gst_queue2_push_one (GstQueue2 * queue)
 {
   GstFlowReturn result = GST_FLOW_OK;
   GstMiniObject *data;
+  GstQueue2ItemType item_type;
 
-  data = gst_queue2_locked_dequeue (queue);
+  data = gst_queue2_locked_dequeue (queue, &item_type);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
     GstCaps *caps;
 
     buffer = GST_BUFFER_CAST (data);
     caps = GST_BUFFER_CAPS (buffer);
 
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
-
     /* set caps before pushing the buffer so that core does not try to do
      * something fancy to check if this is possible. */
     if (caps && caps != GST_PAD_CAPS (queue->srcpad))
@@ -2216,56 +2472,54 @@ next:
     /* need to check for srcresult here as well */
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
     if (result == GST_FLOW_UNEXPECTED) {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-          "got UNEXPECTED from downstream");
-      /* stop pushing buffers, we dequeue all items until we see an item that we
-       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
-       * queue we can push, we set a flag to make the sinkpad refuse more
-       * buffers with an UNEXPECTED return value until we receive something
-       * pushable again or we get flushed. */
-      while ((data = gst_queue2_locked_dequeue (queue))) {
-        if (GST_IS_BUFFER (data)) {
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED buffer %p", data);
-          gst_buffer_unref (GST_BUFFER_CAST (data));
-        } else if (GST_IS_EVENT (data)) {
-          GstEvent *event = GST_EVENT_CAST (data);
-          GstEventType type = GST_EVENT_TYPE (event);
-
-          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
-            /* we found a pushable item in the queue, push it out */
-            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-                "pushing pushable event %s after UNEXPECTED",
-                GST_EVENT_TYPE_NAME (event));
-            goto next;
-          }
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED event %p", event);
-          gst_event_unref (event);
-        }
-      }
-      /* no more items in the queue. Set the unexpected flag so that upstream
-       * make us refuse any more buffers on the sinkpad. Since we will still
-       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
-       * task function does not shut down. */
-      queue->unexpected = TRUE;
+      data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
       result = GST_FLOW_OK;
     }
-  } else if (GST_IS_EVENT (data)) {
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event = GST_EVENT_CAST (data);
     GstEventType type = GST_EVENT_TYPE (event);
 
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
-
     gst_pad_push_event (queue->srcpad, event);
 
-    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
     /* if we're EOS, return UNEXPECTED so that the task pauses. */
     if (type == GST_EVENT_EOS) {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
           "pushed EOS event %p, return UNEXPECTED", event);
       result = GST_FLOW_UNEXPECTED;
     }
+
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    GstBuffer *first_buf;
+    GstCaps *caps;
+
+    buffer_list = GST_BUFFER_LIST_CAST (data);
+
+    first_buf = gst_buffer_list_get (buffer_list, 0, 0);
+    caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+    /* set caps before pushing the buffer so that core does not try to do
+     * something fancy to check if this is possible. */
+    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+      gst_pad_set_caps (queue->srcpad, caps);
+
+    result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+    /* need to check for srcresult here as well */
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+    if (result == GST_FLOW_UNEXPECTED) {
+      data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
+      result = GST_FLOW_OK;
+    }
   }
   return result;
 
@@ -2283,7 +2537,7 @@ out_flushing:
   }
 }
 
-/* called repeadedly with @pad as the source pad. This function should push out
+/* called repeatedly with @pad as the source pad. This function should push out
  * data to the peer element. */
 static void
 gst_queue2_loop (GstPad * pad)
@@ -2333,9 +2587,9 @@ out_flushing:
     GstFlowReturn ret = queue->srcresult;
 
     gst_pad_pause_task (queue->srcpad);
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* UNEXPECTED is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
@@ -2353,8 +2607,12 @@ static gboolean
 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
 {
   gboolean res = TRUE;
-  GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
+  if (G_UNLIKELY (queue == NULL)) {
+    gst_event_unref (event);
+    return FALSE;
+  }
 #ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
       event, GST_EVENT_TYPE_NAME (event));
@@ -2386,12 +2644,6 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_OK;
-        if (queue->current) {
-          /* forget the highest read offset, we'll calculate a new one when we
-           * get the next getrange request. We need to do this in order to reset
-           * the buffering percentage */
-          queue->current->max_reading_pos = 0;
-        }
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* when using a temp file, we eat the event */
@@ -2404,6 +2656,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
       break;
   }
 
+  gst_object_unref (queue);
   return res;
 }
 
@@ -2425,7 +2678,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  if (G_UNLIKELY (queue == NULL))
+    return FALSE;
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
@@ -2596,12 +2851,14 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
       break;
   }
 
+  gst_object_unref (queue);
   return TRUE;
 
   /* ERRORS */
 peer_failed:
   {
     GST_DEBUG_OBJECT (queue, "failed peer query");
+    gst_object_unref (queue);
     return FALSE;
   }
 }
@@ -2634,8 +2891,8 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
 
   queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
 
-  GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
+  GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   offset = (offset == -1) ? queue->current->reading_pos : offset;
 
   GST_DEBUG_OBJECT (queue,
@@ -2652,6 +2909,14 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
       goto out_unexpected;
   }
 
+  if (G_UNLIKELY (offset + length > queue->upstream_size)) {
+    gst_queue2_update_upstream_size (queue);
+    if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
+      length = queue->upstream_size - offset;
+      GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
+    }
+  }
+
   /* FIXME - function will block when the range is not yet available */
   ret = gst_queue2_create_read (queue, offset, length, buffer);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
@@ -2795,17 +3060,15 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
       queue->is_eos = FALSE;
       queue->unexpected = FALSE;
       queue->upstream_size = 0;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
     } else {
-      GST_QUEUE2_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
       /* this is not allowed, we cannot operate in pull mode without a temp
        * file. */
       queue->srcresult = GST_FLOW_WRONG_STATE;
       queue->sinkresult = GST_FLOW_WRONG_STATE;
       result = FALSE;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
     }
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     GST_QUEUE2_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
@@ -2878,6 +3141,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
           g_free (queue->ring_buffer);
           queue->ring_buffer = NULL;
         }
+        clean_ranges (queue);
       }
       if (queue->starting_segment != NULL) {
         gst_event_unref (queue->starting_segment);