Merge remote-tracking branch 'upstream/master' into tizen
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index 191f577..2ca6673 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -59,6 +60,7 @@
 
 #include <gst/gst.h>
 #include "gstqueue.h"
+#include "gstcoreelementselements.h"
 
 #include "../../gst/gst-i18n-lib.h"
 #include "../../gst/glib-compat-private.h"
@@ -92,7 +94,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
                       queue->cur_level.time, \
                       queue->min_threshold.time, \
                       queue->max_size.time, \
-                      queue->queue.length)
+                      gst_queue_array_get_length (queue->queue))
 
 /* Queue signals and args */
 enum
@@ -119,7 +121,11 @@ enum
   PROP_MIN_THRESHOLD_BYTES,
   PROP_MIN_THRESHOLD_TIME,
   PROP_LEAKY,
-  PROP_SILENT
+  PROP_SILENT,
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  PROP_EMPTY_BUFFERS,
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+  PROP_FLUSH_ON_EOS
 };
 
 /* default property values */
@@ -185,9 +191,9 @@ enum
         "dataflow inside the queue element");
 #define gst_queue_parent_class parent_class
 G_DEFINE_TYPE_WITH_CODE (GstQueue, gst_queue, GST_TYPE_ELEMENT, _do_init);
+GST_ELEMENT_REGISTER_DEFINE (queue, "queue", GST_RANK_NONE, GST_TYPE_QUEUE);
 
 static void gst_queue_finalize (GObject * object);
-
 static void gst_queue_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
 static void gst_queue_get_property (GObject * object,
@@ -195,11 +201,13 @@ static void gst_queue_get_property (GObject * object,
 
 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
 static void gst_queue_loop (GstPad * pad);
 
-static gboolean gst_queue_handle_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event);
+static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
+    GstObject * parent, GstEvent * event);
 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
@@ -208,7 +216,7 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
 static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
-static void gst_queue_locked_flush (GstQueue * queue);
+static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
 
 static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
     GstPadMode mode, gboolean active);
@@ -218,6 +226,14 @@ static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
 static gboolean gst_queue_is_empty (GstQueue * queue);
 static gboolean gst_queue_is_filled (GstQueue * queue);
 
+
+typedef struct
+{
+  GstMiniObject *item;
+  gsize size;
+  gboolean is_query;
+} GstQueueItem;
+
 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
 
 static GType
@@ -262,7 +278,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_UNDERRUN] =
       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::running:
    * @queue: the queue instance
@@ -274,7 +290,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_RUNNING] =
       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::overrun:
    * @queue: the queue instance
@@ -287,7 +303,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_OVERRUN] =
       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::pushing:
    * @queue: the queue instance
@@ -298,7 +314,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_PUSHING] =
       g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
 
   /* properties */
   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
@@ -318,58 +334,94 @@ gst_queue_class_init (GstQueueClass * klass)
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
           DEFAULT_MAX_SIZE_BUFFERS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
-          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_MAX_SIZE_TIME,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
-          "Min. number of buffers in the queue to allow reading (0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Min. number of buffers in the queue to allow reading (0=disable)", 0,
+          G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
-          0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT64, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_LEAKY,
       g_param_spec_enum ("leaky", "Leaky",
           "Where the queue leaks, if at all",
           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   /**
    * GstQueue:silent
    *
    * Don't emit queue signals. Makes queues more lightweight if no signals are
    * needed.
-   *
-   * Since: 0.10.31
    */
   g_object_class_install_property (gobject_class, PROP_SILENT,
       g_param_spec_boolean ("silent", "Silent",
           "Don't emit queue signals", FALSE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
+
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  g_object_class_install_property (gobject_class, PROP_EMPTY_BUFFERS,
+      g_param_spec_boolean ("empty-buffers", "empty_buffers",
+          "Drop all the incomming buffers and flush buffers in queue",
+         FALSE, G_PARAM_READWRITE));
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+
+  /**
+   * queue:flush-on-eos:
+   *
+   * Discard all data in the queue when an EOS event is received, and pass
+   * on the EOS event as soon as possible (instead of waiting until all
+   * buffers in the queue have been processed, which is the default behaviour).
+   *
+   * Flushing the queue on EOS might be useful when capturing and encoding
+   * from a live source, to finish up the recording quickly in cases when
+   * the encoder is slow. Note that this might mean some data from the end of
+   * the recording data might be lost though (never more than the configured
+   * max. sizes though).
+   *
+   * Since: 1.2
+   */
+  g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
+      g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
+          "Discard all data in the queue when an EOS event is received", FALSE,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   gobject_class->finalize = gst_queue_finalize;
 
-  gst_element_class_set_details_simple (gstelement_class,
+  gst_element_class_set_static_metadata (gstelement_class,
       "Queue",
       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   /* Registering debug symbols for function pointers */
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
@@ -378,6 +430,9 @@ gst_queue_class_init (GstQueueClass * klass)
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
+
+  gst_type_mark_as_plugin_api (GST_TYPE_QUEUE_LEAKY, 0);
 }
 
 static void
@@ -386,9 +441,10 @@ gst_queue_init (GstQueue * queue)
   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
 
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+  gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
   gst_pad_set_activatemode_function (queue->sinkpad,
       gst_queue_sink_activate_mode);
-  gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
+  gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
   gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
@@ -418,17 +474,24 @@ gst_queue_init (GstQueue * queue)
   g_mutex_init (&queue->qlock);
   g_cond_init (&queue->item_add);
   g_cond_init (&queue->item_del);
+  g_cond_init (&queue->query_handled);
 
-  g_queue_init (&queue->queue);
+  queue->queue =
+      gst_queue_array_new_for_struct (sizeof (GstQueueItem),
+      DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
-  queue->sinktime = GST_CLOCK_TIME_NONE;
-  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = GST_CLOCK_STIME_NONE;
+  queue->srctime = GST_CLOCK_STIME_NONE;
 
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
 
   queue->newseg_applied_to_src = FALSE;
 
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  queue->empty_buffers = FALSE;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+
   GST_DEBUG_OBJECT (queue,
       "initialized queue's not_empty & not_full conditions");
 }
@@ -437,24 +500,43 @@ gst_queue_init (GstQueue * queue)
 static void
 gst_queue_finalize (GObject * object)
 {
-  GstMiniObject *data;
   GstQueue *queue = GST_QUEUE (object);
+  GstQueueItem *qitem;
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while ((data = g_queue_pop_head (&queue->queue))) {
-    if (!GST_IS_QUERY (data))
-      gst_mini_object_unref (data);
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
+    /* FIXME: if it's a query, shouldn't we unref that too? */
+    if (!qitem->is_query)
+      gst_mini_object_unref (qitem->item);
   }
+  gst_queue_array_free (queue->queue);
 
-  g_queue_clear (&queue->queue);
   g_mutex_clear (&queue->qlock);
   g_cond_clear (&queue->item_add);
   g_cond_clear (&queue->item_del);
+  g_cond_clear (&queue->query_handled);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -465,7 +547,7 @@ update_time_level (GstQueue * queue)
   if (queue->sink_tainted) {
     GST_LOG_OBJECT (queue, "update sink time");
     queue->sinktime =
-        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->sink_segment,
         queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
@@ -474,16 +556,17 @@ update_time_level (GstQueue * queue)
   if (queue->src_tainted) {
     GST_LOG_OBJECT (queue, "update src time");
     queue->srctime =
-        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->src_segment,
         queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
   src_time = queue->srctime;
 
-  GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+  GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
-  if (sink_time >= src_time)
+  if (GST_CLOCK_STIME_IS_VALID (src_time)
+      && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
   else
     queue->cur_level.time = 0;
@@ -518,14 +601,42 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
   update_time_level (queue);
 }
 
+static void
+apply_gap (GstQueue * queue, GstEvent * event,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+  GstClockTime duration;
+
+  gst_event_parse_gap (event, &timestamp, &duration);
+
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+
+    if (GST_CLOCK_TIME_IS_VALID (duration)) {
+      timestamp += duration;
+    }
+
+    segment->position = timestamp;
+
+    if (is_sink)
+      queue->sink_tainted = TRUE;
+    else
+      queue->src_tainted = TRUE;
+
+    /* calc diff with other end */
+    update_time_level (queue);
+  }
+}
+
+
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
-    gboolean with_duration, gboolean sink)
+    gboolean sink)
 {
   GstClockTime duration, timestamp;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
   /* if no timestamp is set, assume it's continuous with the previous
@@ -534,10 +645,11 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
     timestamp = segment->position;
 
   /* add duration */
-  if (with_duration && duration != GST_CLOCK_TIME_NONE)
+  if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_LOG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+  GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &queue->sink_segment ? "sink" : "src",
       GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
@@ -551,17 +663,75 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
+{
+  GstClockTime *timestamp = user_data;
+  GstClockTime btime;
+
+  GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  btime = GST_BUFFER_DTS_OR_PTS (*buf);
+  if (GST_CLOCK_TIME_IS_VALID (btime))
+    *timestamp = btime;
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+
+  return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
 static void
-gst_queue_locked_flush (GstQueue * queue)
+apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean sink)
 {
-  GstMiniObject *data;
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->position;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  segment->position = timestamp;
+
+  if (sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
 
-  while ((data = g_queue_pop_head (&queue->queue))) {
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
+static void
+gst_queue_locked_flush (GstQueue * queue, gboolean full)
+{
+  GstQueueItem *qitem;
+
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     /* Then lose another reference because we are supposed to destroy that
        data when flushing */
-    if (!GST_IS_QUERY (data))
-      gst_mini_object_unref (data);
+    if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
+        && GST_EVENT_IS_STICKY (qitem->item)
+        && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
+        && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
+      gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
+    }
+    if (!qitem->is_query)
+      gst_mini_object_unref (qitem->item);
+    memset (qitem, 0, sizeof (GstQueueItem));
   }
+  queue->last_query = FALSE;
+  g_cond_signal (&queue->query_handled);
   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
@@ -570,7 +740,7 @@ gst_queue_locked_flush (GstQueue * queue)
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
 
-  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
   queue->sink_tainted = queue->src_tainted = TRUE;
 
   /* we deleted a lot of something */
@@ -581,35 +751,65 @@ gst_queue_locked_flush (GstQueue * queue)
 static inline void
 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
 {
+  GstQueueItem qitem;
   GstBuffer *buffer = GST_BUFFER_CAST (item);
+  gsize bsize = gst_buffer_get_size (buffer);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers++;
-  queue->cur_level.bytes += gst_buffer_get_size (buffer);
-  apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
+  queue->cur_level.bytes += bsize;
+  apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
+
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
+  GST_QUEUE_SIGNAL_ADD (queue);
+}
+
+static inline void
+gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
+{
+  GstQueueItem qitem;
+  GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+  gsize bsize;
+
+  bsize = gst_buffer_list_calculate_size (buffer_list);
 
-  g_queue_push_tail (&queue->queue, item);
+  /* add buffer to the statistics */
+  queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
+  queue->cur_level.bytes += bsize;
+  apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
 static inline void
 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 {
+  GstQueueItem qitem;
   GstEvent *event = GST_EVENT_CAST (item);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_EOS:
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
       /* Zero the thresholds, this makes sure the queue is completely
        * filled and we can read all data from the queue. */
-      GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
+      if (queue->flush_on_eos)
+        gst_queue_locked_flush (queue, FALSE);
+      else
+        GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
       /* mark the queue as EOS. This prevents us from accepting more data. */
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
       queue->eos = TRUE;
       break;
     case GST_EVENT_SEGMENT:
       apply_segment (queue, event, &queue->sink_segment, TRUE);
       /* if the queue is empty, apply sink segment on the source */
-      if (queue->queue.length == 0) {
+      if (gst_queue_array_is_empty (queue->queue)) {
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad");
         apply_segment (queue, event, &queue->src_segment, FALSE);
         queue->newseg_applied_to_src = TRUE;
@@ -618,11 +818,17 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
        * from downstream */
       queue->unexpected = FALSE;
       break;
+    case GST_EVENT_GAP:
+      apply_gap (queue, event, &queue->sink_segment, TRUE);
+      break;
     default:
       break;
   }
 
-  g_queue_push_tail (&queue->queue, item);
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = 0;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
@@ -630,12 +836,17 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 static GstMiniObject *
 gst_queue_locked_dequeue (GstQueue * queue)
 {
+  GstQueueItem *qitem;
   GstMiniObject *item;
+  gsize bufsize;
 
-  item = g_queue_pop_head (&queue->queue);
-  if (item == NULL)
+  qitem = gst_queue_array_pop_head_struct (queue->queue);
+  if (qitem == NULL)
     goto no_item;
 
+  item = qitem->item;
+  bufsize = qitem->size;
+
   if (GST_IS_BUFFER (item)) {
     GstBuffer *buffer = GST_BUFFER_CAST (item);
 
@@ -643,13 +854,25 @@ gst_queue_locked_dequeue (GstQueue * queue)
         "retrieved buffer %p from queue", buffer);
 
     queue->cur_level.buffers--;
-    queue->cur_level.bytes -= gst_buffer_get_size (buffer);
-    apply_buffer (queue, buffer, &queue->src_segment, TRUE, FALSE);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer (queue, buffer, &queue->src_segment, FALSE);
 
     /* if the queue is empty now, update the other side */
     if (queue->cur_level.buffers == 0)
       queue->cur_level.time = 0;
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
 
+    queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+
+    /* if the queue is empty now, update the other side */
+    if (queue->cur_level.buffers == 0)
+      queue->cur_level.time = 0;
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
@@ -669,6 +892,9 @@ gst_queue_locked_dequeue (GstQueue * queue)
           queue->newseg_applied_to_src = FALSE;
         }
         break;
+      case GST_EVENT_GAP:
+        apply_gap (queue, event, &queue->src_segment, FALSE);
+        break;
       default:
         break;
     }
@@ -695,19 +921,21 @@ no_item:
   }
 }
 
-static gboolean
+static GstFlowReturn
 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
+  gboolean ret = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-    {
-      STATUS (queue, pad, "received flush start event");
       /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
+      ret = gst_pad_push_event (queue->srcpad, event);
 
       /* now unblock the chain function */
       GST_QUEUE_MUTEX_LOCK (queue);
@@ -721,59 +949,120 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
        * flush_start downstream. */
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
-      goto done;
-    }
+
+      /* unblock query handler after the streaming thread is shut down.
+       * Otherwise downstream might have a query that is already unreffed
+       * upstream */
+      GST_QUEUE_MUTEX_LOCK (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      break;
     case GST_EVENT_FLUSH_STOP:
-    {
-      STATUS (queue, pad, "received flush stop event");
       /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
+      ret = gst_pad_push_event (queue->srcpad, event);
 
       GST_QUEUE_MUTEX_LOCK (queue);
-      gst_queue_locked_flush (queue);
+      gst_queue_locked_flush (queue, FALSE);
       queue->srcresult = GST_FLOW_OK;
       queue->eos = FALSE;
       queue->unexpected = FALSE;
-      gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
-          queue->srcpad);
+      if (gst_pad_is_active (queue->srcpad)) {
+        gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+            queue->srcpad, NULL);
+      } else {
+        GST_INFO_OBJECT (queue->srcpad, "not re-starting task on srcpad, "
+            "pad not active any longer");
+      }
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       STATUS (queue, pad, "after flush");
-      goto done;
-    }
+      break;
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
-        GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+        GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
+        if (queue->srcresult != GST_FLOW_OK) {
+          /* Errors in sticky event pushing are no problem and ignored here
+           * as they will cause more meaningful errors during data flow.
+           * For EOS events, that are not followed by data flow, we still
+           * return FALSE here though and report an error.
+           */
+          if (!GST_EVENT_IS_STICKY (event)) {
+            GST_QUEUE_MUTEX_UNLOCK (queue);
+            goto out_flow_error;
+          } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+            if (queue->srcresult == GST_FLOW_NOT_LINKED
+                || queue->srcresult < GST_FLOW_EOS) {
+              GST_QUEUE_MUTEX_UNLOCK (queue);
+              GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
+            } else {
+              GST_QUEUE_MUTEX_UNLOCK (queue);
+            }
+            goto out_flow_error;
+          }
+        }
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
-        /* non-serialized events are passed upstream. */
-        gst_pad_push_event (queue->srcpad, event);
+        /* non-serialized events are forwarded downstream immediately */
+        ret = gst_pad_push_event (queue->srcpad, event);
       }
       break;
   }
-done:
-  return TRUE;
+  if (ret == FALSE) {
+    GST_ERROR_OBJECT (queue, "Failed to push event");
+    return GST_FLOW_ERROR;
+  }
+  return GST_FLOW_OK;
 
   /* ERRORS */
-out_flushing:
+out_eos:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-        "refusing event, we are flushing");
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_event_unref (event);
-    return FALSE;
+    return GST_FLOW_EOS;
   }
-out_eos:
+out_flow_error:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
-    GST_QUEUE_MUTEX_UNLOCK (queue);
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "refusing event, we have a downstream flow error: %s",
+        gst_flow_get_name (queue->srcresult));
     gst_event_unref (event);
-    return FALSE;
+    return queue->srcresult;
   }
 }
 
@@ -786,16 +1075,22 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
+        GstQueueItem qitem;
+
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
             GST_QUERY_TYPE_NAME (query));
-        g_queue_push_tail (&queue->queue, query);
+        qitem.item = GST_MINI_OBJECT_CAST (query);
+        qitem.is_query = TRUE;
+        qitem.size = 0;
+        gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        while (queue->queue.length != 0) {
-          /* for as long as the queue has items, we know the query is
-           * not handled yet */
-          GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
-        }
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
+        if (queue->srcresult != GST_FLOW_OK)
+          goto out_flushing;
         res = queue->last_query;
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -817,9 +1112,20 @@ out_flushing:
 static gboolean
 gst_queue_is_empty (GstQueue * queue)
 {
-  if (queue->queue.length == 0)
+  GstQueueItem *tail;
+
+  tail = gst_queue_array_peek_tail_struct (queue->queue);
+
+  if (tail == NULL)
     return TRUE;
 
+  /* Only consider the queue empty if the minimum thresholds
+   * are not reached and data is at the queue tail. Otherwise
+   * we would block forever on serialized queries.
+   */
+  if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
+    return FALSE;
+
   /* It is possible that a max size is reached before all min thresholds are.
    * Therefore, only consider it empty if it is not filled. */
   return ((queue->min_threshold.buffers > 0 &&
@@ -856,6 +1162,12 @@ gst_queue_leak_downstream (GstQueue * queue)
 
     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
         "queue is full, leaking item %p on downstream end", leak);
+    if (GST_IS_EVENT (leak) && GST_EVENT_IS_STICKY (leak)) {
+      GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+          "Storing sticky event %s on srcpad", GST_EVENT_TYPE_NAME (leak));
+      gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (leak));
+    }
+
     if (!GST_IS_QUERY (leak))
       gst_mini_object_unref (leak);
 
@@ -864,11 +1176,27 @@ gst_queue_leak_downstream (GstQueue * queue)
   }
 }
 
+static gboolean
+discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
+{
+  GstQueue *queue = user_data;
+  GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
+
+  if (subbuffer) {
+    *buffer = subbuffer;
+    GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+  } else {
+    GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+  }
+
+  return FALSE;
+}
+
 static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
+    GstMiniObject * obj, gboolean is_list)
 {
   GstQueue *queue;
-  GstClockTime duration, timestamp;
 
   queue = GST_QUEUE_CAST (parent);
 
@@ -880,13 +1208,32 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
-  duration = GST_BUFFER_DURATION (buffer);
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  /* Added to not enqueue buffers in the queue while paused */
+  if (queue->empty_buffers) {
+    GST_CAT_LOG_OBJECT(queue_dataflow, queue, "drop buffer %p", obj);
+    gst_mini_object_unref(obj);
+    GST_QUEUE_MUTEX_UNLOCK(queue);
+    return GST_FLOW_OK;
+  }
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
 
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
-      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
-      GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+  if (!is_list) {
+    GstClockTime duration, timestamp;
+    GstBuffer *buffer = GST_BUFFER_CAST (obj);
+
+    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
+    duration = GST_BUFFER_DURATION (buffer);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+        G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+        GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+        GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+  } else {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "received buffer list %p with %u buffers", obj,
+        gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
+  }
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". Note that this only applies to buffers.
@@ -923,10 +1270,10 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
             "queue is full, waiting for free space");
 
         /* don't leak. Instead, wait for space to be available */
-        do {
-          /* for as long as the queue is filled, wait till an item was deleted. */
+        /* for as long as the queue is filled, wait till an item was deleted. */
+        while (gst_queue_is_filled (queue)) {
           GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
-        } while (gst_queue_is_filled (queue));
+        };
 
         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
 
@@ -941,19 +1288,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   }
 
   if (queue->tail_needs_discont) {
-    GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+    if (!is_list) {
+      GstBuffer *buffer = GST_BUFFER_CAST (obj);
+      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
 
-    if (subbuffer) {
-      buffer = subbuffer;
-      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      if (subbuffer) {
+        buffer = subbuffer;
+        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      } else {
+        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      }
+
+      obj = GST_MINI_OBJECT_CAST (buffer);
     } else {
-      GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
+
+      buffer_list = gst_buffer_list_make_writable (buffer_list);
+      gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+      obj = GST_MINI_OBJECT_CAST (buffer_list);
     }
     queue->tail_needs_discont = FALSE;
   }
 
   /* put buffer in queue now */
-  gst_queue_locked_enqueue_buffer (queue, buffer);
+  if (is_list)
+    gst_queue_locked_enqueue_buffer_list (queue, obj);
+  else
+    gst_queue_locked_enqueue_buffer (queue, obj);
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -963,7 +1324,7 @@ out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_OK;
   }
@@ -974,7 +1335,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return ret;
   }
@@ -983,7 +1344,7 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
@@ -992,44 +1353,77 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
 }
 
+static GstFlowReturn
+gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer_list), TRUE);
+}
+
+static GstFlowReturn
+gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer), FALSE);
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
 gst_queue_push_one (GstQueue * queue)
 {
-  GstFlowReturn result = GST_FLOW_OK;
+  GstFlowReturn result = queue->srcresult;
   GstMiniObject *data;
+  gboolean is_list;
 
   data = gst_queue_locked_dequeue (queue);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
-    GstBuffer *buffer;
+  is_list = GST_IS_BUFFER_LIST (data);
 
-    buffer = GST_BUFFER_CAST (data);
+  if (GST_IS_BUFFER (data) || is_list) {
+    if (!is_list) {
+      GstBuffer *buffer;
 
-    if (queue->head_needs_discont) {
-      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+      buffer = GST_BUFFER_CAST (data);
 
-      if (subbuffer) {
-        buffer = subbuffer;
-        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
-      } else {
-        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      if (queue->head_needs_discont) {
+        GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+        if (subbuffer) {
+          buffer = subbuffer;
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+        } else {
+          GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+        }
+        queue->head_needs_discont = FALSE;
       }
-      queue->head_needs_discont = FALSE;
-    }
 
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (queue->srcpad, buffer);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push (queue->srcpad, buffer);
+    } else {
+      GstBufferList *buffer_list;
+
+      buffer_list = GST_BUFFER_LIST_CAST (data);
+
+      if (queue->head_needs_discont) {
+        buffer_list = gst_buffer_list_make_writable (buffer_list);
+        gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+        queue->head_needs_discont = FALSE;
+      }
+
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push_list (queue->srcpad, buffer_list);
+    }
 
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -1045,11 +1439,16 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
+        } else if (GST_IS_BUFFER_LIST (data)) {
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping EOS buffer list %p", data);
+          gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
         } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
@@ -1065,6 +1464,7 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping query %p because of EOS", query);
           queue->last_query = FALSE;
+          g_cond_signal (&queue->query_handled);
         }
       }
       /* no more items in the queue. Set the unexpected flag so that upstream
@@ -1091,8 +1491,14 @@ next:
     }
   } else if (GST_IS_QUERY (data)) {
     GstQuery *query = GST_QUERY_CAST (data);
+    gboolean ret;
 
-    queue->last_query = gst_pad_peer_query (queue->srcpad, query);
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    ret = gst_pad_peer_query (queue->srcpad, query);
+    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
+    queue->last_query = ret;
+    queue->last_handled_query = query;
+    g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
   }
@@ -1101,14 +1507,25 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GstFlowReturn ret = queue->srcresult;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
+  }
+out_flushing_query:
+  {
+    GstFlowReturn ret = queue->srcresult;
+    queue->last_query = FALSE;
+    g_cond_signal (&queue->query_handled);
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 }
 
@@ -1163,18 +1580,18 @@ out_flushing:
     gst_pad_pause_task (queue->srcpad);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (ret));
-    if (ret == GST_FLOW_FLUSHING)
-      gst_queue_locked_flush (queue);
-    else
+    if (ret == GST_FLOW_FLUSHING) {
+      gst_queue_locked_flush (queue, FALSE);
+    } else {
       GST_QUEUE_SIGNAL_DEL (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+    }
     GST_QUEUE_MUTEX_UNLOCK (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-      GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)",
-              gst_flow_get_name (ret), ret));
+      GST_ELEMENT_FLOW_ERROR (queue, ret);
       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
     }
     return;
@@ -1192,7 +1609,24 @@ gst_queue_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       event, GST_EVENT_TYPE (event));
 #endif
 
-  res = gst_pad_push_event (queue->sinkpad, event);
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_RECONFIGURE:
+      GST_QUEUE_MUTEX_LOCK (queue);
+      if (queue->srcresult == GST_FLOW_NOT_LINKED) {
+        /* when we got not linked, assume downstream is linked again now and we
+         * can try to start pushing again */
+        queue->srcresult = GST_FLOW_OK;
+        gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, NULL);
+      }
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+
+      res = gst_pad_push_event (queue->sinkpad, event);
+      break;
+    default:
+      res = gst_pad_event_default (pad, parent, event);
+      break;
+  }
+
 
   return res;
 }
@@ -1203,10 +1637,21 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
   GstQueue *queue = GST_QUEUE (parent);
   gboolean res;
 
-  res = gst_pad_query_default (pad, parent, query);
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_SCHEDULING:{
+      gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
+      res = TRUE;
+      break;
+    }
+    default:
+      res = gst_pad_query_default (pad, parent, query);
+      break;
+  }
+
   if (!res)
     return FALSE;
 
+  /* Adjust peer response for data contained in queue */
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
     {
@@ -1220,9 +1665,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1244,13 +1693,17 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
        * limit, the best thing we can do is to return an infinite delay. In
        * reality a better estimate would be the byte/buffer rate but that is not
        * possible right now. */
-      if (queue->max_size.time > 0 && max != -1)
+      /* TODO: Use CONVERT query? */
+      if (queue->max_size.time > 0 && max != -1
+          && queue->leaky == GST_QUEUE_NO_LEAK)
         max += queue->max_size.time;
+      else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
+        max = MAX (queue->max_size.time, max);
       else
         max = -1;
 
       /* adjust for min-threshold */
-      if (queue->min_threshold.time > 0 && min != -1)
+      if (queue->min_threshold.time > 0)
         min += queue->min_threshold.time;
 
       gst_query_set_latency (query, live, min, max);
@@ -1285,8 +1738,16 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         /* step 1, unblock chain function */
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
-        gst_queue_locked_flush (queue);
+        /* the item del signal will unblock */
+        GST_QUEUE_SIGNAL_DEL (queue);
         GST_QUEUE_MUTEX_UNLOCK (queue);
+
+        /* step 2, wait until streaming thread stopped and flush queue */
+        GST_PAD_STREAM_LOCK (pad);
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, TRUE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
+        GST_PAD_STREAM_UNLOCK (pad);
       }
       result = TRUE;
       break;
@@ -1314,7 +1775,8 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         queue->eos = FALSE;
         queue->unexpected = FALSE;
         result =
-            gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
+            gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
+            NULL);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
         /* step 1, unblock loop function */
@@ -1326,6 +1788,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default:
@@ -1399,6 +1865,19 @@ gst_queue_set_property (GObject * object,
     case PROP_SILENT:
       queue->silent = g_value_get_boolean (value);
       break;
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+    case PROP_EMPTY_BUFFERS:
+      queue->empty_buffers = g_value_get_boolean (value);
+      GST_INFO_OBJECT(queue, "set empty buffer : %d", queue->empty_buffers);
+      if (queue->empty_buffers) {
+        gst_queue_locked_flush(queue, FALSE);
+      }
+      GST_INFO_OBJECT(queue, "done");
+      break;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+    case PROP_FLUSH_ON_EOS:
+      queue->flush_on_eos = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1449,6 +1928,14 @@ gst_queue_get_property (GObject * object,
     case PROP_SILENT:
       g_value_set_boolean (value, queue->silent);
       break;
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+    case PROP_EMPTY_BUFFERS:
+      g_value_set_boolean(value, queue->empty_buffers);
+      break;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+    case PROP_FLUSH_ON_EOS:
+      g_value_set_boolean (value, queue->flush_on_eos);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;