Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index eb50c13..e98c1f6 100644 (file)
@@ -24,6 +24,7 @@
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -195,11 +196,13 @@ static void gst_queue_get_property (GObject * object,
 
 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
 static void gst_queue_loop (GstPad * pad);
 
-static gboolean gst_queue_handle_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event);
+static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
+    GstObject * parent, GstEvent * event);
 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
@@ -221,8 +224,9 @@ static gboolean gst_queue_is_filled (GstQueue * queue);
 
 typedef struct
 {
-  gboolean is_query;
   GstMiniObject *item;
+  gsize size;
+  gboolean is_query;
 } GstQueueItem;
 
 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
@@ -325,35 +329,46 @@ gst_queue_class_init (GstQueueClass * klass)
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
           DEFAULT_MAX_SIZE_BUFFERS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
-          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_MAX_SIZE_TIME,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
-          "Min. number of buffers in the queue to allow reading (0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Min. number of buffers in the queue to allow reading (0=disable)", 0,
+          G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
-          0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT64, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_LEAKY,
       g_param_spec_enum ("leaky", "Leaky",
           "Where the queue leaks, if at all",
           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   /**
    * GstQueue:silent
@@ -364,7 +379,8 @@ gst_queue_class_init (GstQueueClass * klass)
   g_object_class_install_property (gobject_class, PROP_SILENT,
       g_param_spec_boolean ("silent", "Silent",
           "Don't emit queue signals", FALSE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   /**
    * GstQueue:flush-on-eos
@@ -376,7 +392,7 @@ gst_queue_class_init (GstQueueClass * klass)
    * Flushing the queue on EOS might be useful when capturing and encoding
    * from a live source, to finish up the recording quickly in cases when
    * the encoder is slow. Note that this might mean some data from the end of
-   * the recoding data might be lost though (never more than the configured
+   * the recording data might be lost though (never more than the configured
    * max. sizes though).
    *
    * Since: 1.2
@@ -384,17 +400,16 @@ gst_queue_class_init (GstQueueClass * klass)
   g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
       g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
           "Discard all data in the queue when an EOS event is received", FALSE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   gobject_class->finalize = gst_queue_finalize;
 
   gst_element_class_set_static_metadata (gstelement_class,
       "Queue",
       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   /* Registering debug symbols for function pointers */
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
@@ -403,6 +418,7 @@ gst_queue_class_init (GstQueueClass * klass)
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
 }
 
 static void
@@ -411,9 +427,10 @@ gst_queue_init (GstQueue * queue)
   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
 
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+  gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
   gst_pad_set_activatemode_function (queue->sinkpad,
       gst_queue_sink_activate_mode);
-  gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
+  gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
   gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
@@ -445,10 +462,12 @@ gst_queue_init (GstQueue * queue)
   g_cond_init (&queue->item_del);
   g_cond_init (&queue->query_handled);
 
-  queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
+  queue->queue =
+      gst_queue_array_new_for_struct (sizeof (GstQueueItem),
+      DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
-  queue->sinktime = GST_CLOCK_TIME_NONE;
-  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = GST_CLOCK_STIME_NONE;
+  queue->srctime = GST_CLOCK_STIME_NONE;
 
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
@@ -464,15 +483,14 @@ static void
 gst_queue_finalize (GObject * object)
 {
   GstQueue *queue = GST_QUEUE (object);
+  GstQueueItem *qitem;
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!gst_queue_array_is_empty (queue->queue)) {
-    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     /* FIXME: if it's a query, shouldn't we unref that too? */
     if (!qitem->is_query)
       gst_mini_object_unref (qitem->item);
-    g_slice_free (GstQueueItem, qitem);
   }
   gst_queue_array_free (queue->queue);
 
@@ -484,6 +502,23 @@ gst_queue_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -494,7 +529,7 @@ update_time_level (GstQueue * queue)
   if (queue->sink_tainted) {
     GST_LOG_OBJECT (queue, "update sink time");
     queue->sinktime =
-        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->sink_segment,
         queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
@@ -503,14 +538,14 @@ update_time_level (GstQueue * queue)
   if (queue->src_tainted) {
     GST_LOG_OBJECT (queue, "update src time");
     queue->srctime =
-        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->src_segment,
         queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
   src_time = queue->srctime;
 
-  GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+  GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
   if (sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
@@ -547,14 +582,42 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
   update_time_level (queue);
 }
 
+static void
+apply_gap (GstQueue * queue, GstEvent * event,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+  GstClockTime duration;
+
+  gst_event_parse_gap (event, &timestamp, &duration);
+
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+
+    if (GST_CLOCK_TIME_IS_VALID (duration)) {
+      timestamp += duration;
+    }
+
+    segment->position = timestamp;
+
+    if (is_sink)
+      queue->sink_tainted = TRUE;
+    else
+      queue->src_tainted = TRUE;
+
+    /* calc diff with other end */
+    update_time_level (queue);
+  }
+}
+
+
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
-    gboolean with_duration, gboolean sink)
+    gboolean sink)
 {
   GstClockTime duration, timestamp;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
   /* if no timestamp is set, assume it's continuous with the previous
@@ -563,10 +626,11 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
     timestamp = segment->position;
 
   /* add duration */
-  if (with_duration && duration != GST_CLOCK_TIME_NONE)
+  if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_LOG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+  GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &queue->sink_segment ? "sink" : "src",
       GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
@@ -580,12 +644,61 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
+{
+  GstClockTime *timestamp = user_data;
+  GstClockTime btime;
+
+  GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  btime = GST_BUFFER_DTS_OR_PTS (*buf);
+  if (GST_CLOCK_TIME_IS_VALID (btime))
+    *timestamp = btime;
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+
+  return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->position;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  segment->position = timestamp;
+
+  if (sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 gst_queue_locked_flush (GstQueue * queue, gboolean full)
 {
-  while (!gst_queue_array_is_empty (queue->queue)) {
-    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+  GstQueueItem *qitem;
 
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     /* Then lose another reference because we are supposed to destroy that
        data when flushing */
     if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
@@ -596,7 +709,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
     }
     if (!qitem->is_query)
       gst_mini_object_unref (qitem->item);
-    g_slice_free (GstQueueItem, qitem);
+    memset (qitem, 0, sizeof (GstQueueItem));
   }
   queue->last_query = FALSE;
   g_cond_signal (&queue->query_handled);
@@ -608,7 +721,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
 
-  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
   queue->sink_tainted = queue->src_tainted = TRUE;
 
   /* we deleted a lot of something */
@@ -619,25 +732,59 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
 static inline void
 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
 {
+  GstQueueItem qitem;
   GstBuffer *buffer = GST_BUFFER_CAST (item);
+  gsize bsize = gst_buffer_get_size (buffer);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers++;
-  queue->cur_level.bytes += gst_buffer_get_size (buffer);
-  apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
-
-  if (item) {
-    GstQueueItem *qitem = g_slice_new (GstQueueItem);
-    qitem->item = item;
-    qitem->is_query = FALSE;
-    gst_queue_array_push_tail (queue->queue, qitem);
-  }
+  queue->cur_level.bytes += bsize;
+  apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
+
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
+  GST_QUEUE_SIGNAL_ADD (queue);
+}
+
+static gboolean
+buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  gsize buf_size;
+
+  buf_size = gst_buffer_get_size (*buf);
+  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
+  *p_size += buf_size;
+  return TRUE;
+}
+
+static inline void
+gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
+{
+  GstQueueItem qitem;
+  GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+  gsize bsize = 0;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+
+  /* add buffer to the statistics */
+  queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
+  queue->cur_level.bytes += bsize;
+  apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
 static inline void
 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 {
+  GstQueueItem qitem;
   GstEvent *event = GST_EVENT_CAST (item);
 
   switch (GST_EVENT_TYPE (event)) {
@@ -664,16 +811,17 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
        * from downstream */
       queue->unexpected = FALSE;
       break;
+    case GST_EVENT_GAP:
+      apply_gap (queue, event, &queue->sink_segment, TRUE);
+      break;
     default:
       break;
   }
 
-  if (item) {
-    GstQueueItem *qitem = g_slice_new (GstQueueItem);
-    qitem->item = item;
-    qitem->is_query = FALSE;
-    gst_queue_array_push_tail (queue->queue, qitem);
-  }
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = 0;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
@@ -683,13 +831,14 @@ gst_queue_locked_dequeue (GstQueue * queue)
 {
   GstQueueItem *qitem;
   GstMiniObject *item;
+  gsize bufsize;
 
-  qitem = gst_queue_array_pop_head (queue->queue);
+  qitem = gst_queue_array_pop_head_struct (queue->queue);
   if (qitem == NULL)
     goto no_item;
 
   item = qitem->item;
-  g_slice_free (GstQueueItem, qitem);
+  bufsize = qitem->size;
 
   if (GST_IS_BUFFER (item)) {
     GstBuffer *buffer = GST_BUFFER_CAST (item);
@@ -698,13 +847,25 @@ gst_queue_locked_dequeue (GstQueue * queue)
         "retrieved buffer %p from queue", buffer);
 
     queue->cur_level.buffers--;
-    queue->cur_level.bytes -= gst_buffer_get_size (buffer);
-    apply_buffer (queue, buffer, &queue->src_segment, TRUE, FALSE);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer (queue, buffer, &queue->src_segment, FALSE);
 
     /* if the queue is empty now, update the other side */
     if (queue->cur_level.buffers == 0)
       queue->cur_level.time = 0;
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
 
+    queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+
+    /* if the queue is empty now, update the other side */
+    if (queue->cur_level.buffers == 0)
+      queue->cur_level.time = 0;
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
@@ -724,6 +885,9 @@ gst_queue_locked_dequeue (GstQueue * queue)
           queue->newseg_applied_to_src = FALSE;
         }
         break;
+      case GST_EVENT_GAP:
+        apply_gap (queue, event, &queue->src_segment, FALSE);
+        break;
       default:
         break;
     }
@@ -750,19 +914,19 @@ no_item:
   }
 }
 
-static gboolean
+static GstFlowReturn
 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  gboolean ret = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-    {
       STATUS (queue, pad, "received flush start event");
       /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
+      ret = gst_pad_push_event (queue->srcpad, event);
 
       /* now unblock the chain function */
       GST_QUEUE_MUTEX_LOCK (queue);
@@ -770,34 +934,42 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* unblock the loop and chain functions */
       GST_QUEUE_SIGNAL_ADD (queue);
       GST_QUEUE_SIGNAL_DEL (queue);
-      queue->last_query = FALSE;
-      g_cond_signal (&queue->query_handled);
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       /* make sure it pauses, this should happen since we sent
        * flush_start downstream. */
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
-      goto done;
-    }
+
+      /* unblock query handler after the streaming thread is shut down.
+       * Otherwise downstream might have a query that is already unreffed
+       * upstream */
+      GST_QUEUE_MUTEX_LOCK (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      break;
     case GST_EVENT_FLUSH_STOP:
-    {
       STATUS (queue, pad, "received flush stop event");
       /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
+      ret = gst_pad_push_event (queue->srcpad, event);
 
       GST_QUEUE_MUTEX_LOCK (queue);
       gst_queue_locked_flush (queue, FALSE);
       queue->srcresult = GST_FLOW_OK;
       queue->eos = FALSE;
       queue->unexpected = FALSE;
-      gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
-          queue->srcpad, NULL);
+      if (gst_pad_is_active (queue->srcpad)) {
+        gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+            queue->srcpad, NULL);
+      } else {
+        GST_INFO_OBJECT (queue->srcpad, "not re-starting task on srcpad, "
+            "pad not active any longer");
+      }
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       STATUS (queue, pad, "after flush");
-      goto done;
-    }
+      break;
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
@@ -806,11 +978,21 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
            * For EOS events, that are not followed by data flow, we still
-           * return FALSE here though.
+           * return FALSE here though and report an error.
            */
-          if (!GST_EVENT_IS_STICKY (event) ||
-              GST_EVENT_TYPE (event) == GST_EVENT_EOS)
+          if (!GST_EVENT_IS_STICKY (event)) {
+            GST_QUEUE_MUTEX_UNLOCK (queue);
+            goto out_flow_error;
+          } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+            if (queue->srcresult == GST_FLOW_NOT_LINKED
+                || queue->srcresult < GST_FLOW_EOS) {
+              GST_QUEUE_MUTEX_UNLOCK (queue);
+              GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
+            } else {
+              GST_QUEUE_MUTEX_UNLOCK (queue);
+            }
             goto out_flow_error;
+          }
         }
         /* refuse more events on EOS */
         if (queue->eos)
@@ -819,12 +1001,15 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
         /* non-serialized events are forwarded downstream immediately */
-        gst_pad_push_event (queue->srcpad, event);
+        ret = gst_pad_push_event (queue->srcpad, event);
       }
       break;
   }
-done:
-  return TRUE;
+  if (ret == FALSE) {
+    GST_ERROR_OBJECT (queue, "Failed to push event");
+    return GST_FLOW_ERROR;
+  }
+  return GST_FLOW_OK;
 
   /* ERRORS */
 out_eos:
@@ -832,16 +1017,15 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_event_unref (event);
-    return FALSE;
+    return GST_FLOW_EOS;
   }
 out_flow_error:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "refusing event, we have a downstream flow error: %s",
         gst_flow_get_name (queue->srcresult));
-    GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_event_unref (event);
-    return FALSE;
+    return queue->srcresult;
   }
 }
 
@@ -854,17 +1038,20 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
-        GstQueueItem *qitem;
+        GstQueueItem qitem;
 
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
             GST_QUERY_TYPE_NAME (query));
-        qitem = g_slice_new (GstQueueItem);
-        qitem->item = GST_MINI_OBJECT_CAST (query);
-        qitem->is_query = TRUE;
-        gst_queue_array_push_tail (queue->queue, qitem);
+        qitem.item = GST_MINI_OBJECT_CAST (query);
+        qitem.is_query = TRUE;
+        qitem.size = 0;
+        gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        g_cond_wait (&queue->query_handled, &queue->qlock);
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
         if (queue->srcresult != GST_FLOW_OK)
           goto out_flushing;
         res = queue->last_query;
@@ -890,14 +1077,15 @@ gst_queue_is_empty (GstQueue * queue)
 {
   GstQueueItem *head;
 
-  if (gst_queue_array_is_empty (queue->queue))
+  head = gst_queue_array_peek_head_struct (queue->queue);
+
+  if (head == NULL)
     return TRUE;
 
   /* Only consider the queue empty if the minimum thresholds
    * are not reached and data is at the queue head. Otherwise
    * we would block forever on serialized queries.
    */
-  head = gst_queue_array_peek_head (queue->queue);
   if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
     return FALSE;
 
@@ -951,11 +1139,27 @@ gst_queue_leak_downstream (GstQueue * queue)
   }
 }
 
+static gboolean
+discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
+{
+  GstQueue *queue = user_data;
+  GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
+
+  if (subbuffer) {
+    *buffer = subbuffer;
+    GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+  } else {
+    GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+  }
+
+  return FALSE;
+}
+
 static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
+    GstMiniObject * obj, gboolean is_list)
 {
   GstQueue *queue;
-  GstClockTime duration, timestamp;
 
   queue = GST_QUEUE_CAST (parent);
 
@@ -967,13 +1171,22 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
-  duration = GST_BUFFER_DURATION (buffer);
+  if (!is_list) {
+    GstClockTime duration, timestamp;
+    GstBuffer *buffer = GST_BUFFER_CAST (obj);
 
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
-      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
-      GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
+    duration = GST_BUFFER_DURATION (buffer);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+        G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+        GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+        GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+  } else {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "received buffer list %p with %u buffers", obj,
+        gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
+  }
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". Note that this only applies to buffers.
@@ -1028,19 +1241,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   }
 
   if (queue->tail_needs_discont) {
-    GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+    if (!is_list) {
+      GstBuffer *buffer = GST_BUFFER_CAST (obj);
+      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
 
-    if (subbuffer) {
-      buffer = subbuffer;
-      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      if (subbuffer) {
+        buffer = subbuffer;
+        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      } else {
+        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      }
+
+      obj = GST_MINI_OBJECT_CAST (buffer);
     } else {
-      GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
+
+      buffer_list = gst_buffer_list_make_writable (buffer_list);
+      gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+      obj = GST_MINI_OBJECT_CAST (buffer_list);
     }
     queue->tail_needs_discont = FALSE;
   }
 
   /* put buffer in queue now */
-  gst_queue_locked_enqueue_buffer (queue, buffer);
+  if (is_list)
+    gst_queue_locked_enqueue_buffer_list (queue, obj);
+  else
+    gst_queue_locked_enqueue_buffer (queue, obj);
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -1050,7 +1277,7 @@ out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_OK;
   }
@@ -1061,7 +1288,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return ret;
   }
@@ -1070,7 +1297,7 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
@@ -1079,44 +1306,77 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
 }
 
+static GstFlowReturn
+gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer_list), TRUE);
+}
+
+static GstFlowReturn
+gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer), FALSE);
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
 gst_queue_push_one (GstQueue * queue)
 {
-  GstFlowReturn result = GST_FLOW_OK;
+  GstFlowReturn result = queue->srcresult;
   GstMiniObject *data;
+  gboolean is_list;
 
   data = gst_queue_locked_dequeue (queue);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
-    GstBuffer *buffer;
+  is_list = GST_IS_BUFFER_LIST (data);
 
-    buffer = GST_BUFFER_CAST (data);
+  if (GST_IS_BUFFER (data) || is_list) {
+    if (!is_list) {
+      GstBuffer *buffer;
 
-    if (queue->head_needs_discont) {
-      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+      buffer = GST_BUFFER_CAST (data);
 
-      if (subbuffer) {
-        buffer = subbuffer;
-        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
-      } else {
-        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      if (queue->head_needs_discont) {
+        GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+        if (subbuffer) {
+          buffer = subbuffer;
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+        } else {
+          GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+        }
+        queue->head_needs_discont = FALSE;
       }
-      queue->head_needs_discont = FALSE;
-    }
 
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (queue->srcpad, buffer);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push (queue->srcpad, buffer);
+    } else {
+      GstBufferList *buffer_list;
+
+      buffer_list = GST_BUFFER_LIST_CAST (data);
+
+      if (queue->head_needs_discont) {
+        buffer_list = gst_buffer_list_make_writable (buffer_list);
+        gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+        queue->head_needs_discont = FALSE;
+      }
+
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push_list (queue->srcpad, buffer_list);
+    }
 
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -1132,6 +1392,10 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
+        } else if (GST_IS_BUFFER_LIST (data)) {
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping EOS buffer list %p", data);
+          gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
         } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
@@ -1185,6 +1449,7 @@ next:
     ret = gst_pad_peer_query (queue->srcpad, query);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
     queue->last_query = ret;
+    queue->last_handled_query = query;
     g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
@@ -1194,7 +1459,7 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
@@ -1263,18 +1528,18 @@ out_flushing:
     gst_pad_pause_task (queue->srcpad);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (ret));
-    if (ret == GST_FLOW_FLUSHING)
+    if (ret == GST_FLOW_FLUSHING) {
       gst_queue_locked_flush (queue, FALSE);
-    else
+    } else {
       GST_QUEUE_SIGNAL_DEL (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+    }
     GST_QUEUE_MUTEX_UNLOCK (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-      GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)",
-              gst_flow_get_name (ret), ret));
+      GST_ELEMENT_FLOW_ERROR (queue, ret);
       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
     }
     return;
@@ -1348,9 +1613,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1372,13 +1641,17 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
        * limit, the best thing we can do is to return an infinite delay. In
        * reality a better estimate would be the byte/buffer rate but that is not
        * possible right now. */
-      if (queue->max_size.time > 0 && max != -1)
+      /* TODO: Use CONVERT query? */
+      if (queue->max_size.time > 0 && max != -1
+          && queue->leaky == GST_QUEUE_NO_LEAK)
         max += queue->max_size.time;
+      else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
+        max = MIN (queue->max_size.time, max);
       else
         max = -1;
 
       /* adjust for min-threshold */
-      if (queue->min_threshold.time > 0 && min != -1)
+      if (queue->min_threshold.time > 0)
         min += queue->min_threshold.time;
 
       gst_query_set_latency (query, live, min, max);
@@ -1414,10 +1687,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
         /* the item del signal will unblock */
-        g_cond_signal (&queue->item_del);
-        /* unblock query handler */
-        queue->last_query = FALSE;
-        g_cond_signal (&queue->query_handled);
+        GST_QUEUE_SIGNAL_DEL (queue);
         GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* step 2, wait until streaming thread stopped and flush queue */
@@ -1466,6 +1736,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default: