Merge remote-tracking branch 'upstream/master' into tizen
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index c598d5a..2ca6673 100644 (file)
@@ -24,6 +24,7 @@
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -59,6 +60,7 @@
 
 #include <gst/gst.h>
 #include "gstqueue.h"
+#include "gstcoreelementselements.h"
 
 #include "../../gst/gst-i18n-lib.h"
 #include "../../gst/glib-compat-private.h"
@@ -120,6 +122,9 @@ enum
   PROP_MIN_THRESHOLD_TIME,
   PROP_LEAKY,
   PROP_SILENT,
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  PROP_EMPTY_BUFFERS,
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
   PROP_FLUSH_ON_EOS
 };
 
@@ -186,6 +191,7 @@ enum
         "dataflow inside the queue element");
 #define gst_queue_parent_class parent_class
 G_DEFINE_TYPE_WITH_CODE (GstQueue, gst_queue, GST_TYPE_ELEMENT, _do_init);
+GST_ELEMENT_REGISTER_DEFINE (queue, "queue", GST_RANK_NONE, GST_TYPE_QUEUE);
 
 static void gst_queue_finalize (GObject * object);
 static void gst_queue_set_property (GObject * object,
@@ -195,11 +201,13 @@ static void gst_queue_get_property (GObject * object,
 
 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
 static void gst_queue_loop (GstPad * pad);
 
-static gboolean gst_queue_handle_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event);
+static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
+    GstObject * parent, GstEvent * event);
 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
@@ -221,9 +229,9 @@ static gboolean gst_queue_is_filled (GstQueue * queue);
 
 typedef struct
 {
-  gboolean is_query;
   GstMiniObject *item;
   gsize size;
+  gboolean is_query;
 } GstQueueItem;
 
 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
@@ -270,7 +278,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_UNDERRUN] =
       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::running:
    * @queue: the queue instance
@@ -282,7 +290,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_RUNNING] =
       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::overrun:
    * @queue: the queue instance
@@ -295,7 +303,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_OVERRUN] =
       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
   /**
    * GstQueue::pushing:
    * @queue: the queue instance
@@ -306,7 +314,7 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_queue_signals[SIGNAL_PUSHING] =
       g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
-      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+      NULL, G_TYPE_NONE, 0);
 
   /* properties */
   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
@@ -326,35 +334,46 @@ gst_queue_class_init (GstQueueClass * klass)
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
           DEFAULT_MAX_SIZE_BUFFERS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
-          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_MAX_SIZE_TIME,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
-          "Min. number of buffers in the queue to allow reading (0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Min. number of buffers in the queue to allow reading (0=disable)", 0,
+          G_MAXUINT, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
-          0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT64, 0,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_LEAKY,
       g_param_spec_enum ("leaky", "Leaky",
           "Where the queue leaks, if at all",
           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   /**
    * GstQueue:silent
@@ -365,10 +384,18 @@ gst_queue_class_init (GstQueueClass * klass)
   g_object_class_install_property (gobject_class, PROP_SILENT,
       g_param_spec_boolean ("silent", "Silent",
           "Don't emit queue signals", FALSE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
+
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  g_object_class_install_property (gobject_class, PROP_EMPTY_BUFFERS,
+      g_param_spec_boolean ("empty-buffers", "empty_buffers",
+          "Drop all the incomming buffers and flush buffers in queue",
+         FALSE, G_PARAM_READWRITE));
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
 
   /**
-   * GstQueue:flush-on-eos
+   * queue:flush-on-eos:
    *
    * Discard all data in the queue when an EOS event is received, and pass
    * on the EOS event as soon as possible (instead of waiting until all
@@ -385,17 +412,16 @@ gst_queue_class_init (GstQueueClass * klass)
   g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
       g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
           "Discard all data in the queue when an EOS event is received", FALSE,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   gobject_class->finalize = gst_queue_finalize;
 
   gst_element_class_set_static_metadata (gstelement_class,
       "Queue",
       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   /* Registering debug symbols for function pointers */
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
@@ -404,6 +430,9 @@ gst_queue_class_init (GstQueueClass * klass)
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
+
+  gst_type_mark_as_plugin_api (GST_TYPE_QUEUE_LEAKY, 0);
 }
 
 static void
@@ -412,9 +441,10 @@ gst_queue_init (GstQueue * queue)
   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
 
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+  gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
   gst_pad_set_activatemode_function (queue->sinkpad,
       gst_queue_sink_activate_mode);
-  gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
+  gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
   gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
@@ -446,16 +476,22 @@ gst_queue_init (GstQueue * queue)
   g_cond_init (&queue->item_del);
   g_cond_init (&queue->query_handled);
 
-  queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
+  queue->queue =
+      gst_queue_array_new_for_struct (sizeof (GstQueueItem),
+      DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
-  queue->sinktime = GST_CLOCK_TIME_NONE;
-  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = GST_CLOCK_STIME_NONE;
+  queue->srctime = GST_CLOCK_STIME_NONE;
 
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
 
   queue->newseg_applied_to_src = FALSE;
 
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  queue->empty_buffers = FALSE;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+
   GST_DEBUG_OBJECT (queue,
       "initialized queue's not_empty & not_full conditions");
 }
@@ -465,15 +501,14 @@ static void
 gst_queue_finalize (GObject * object)
 {
   GstQueue *queue = GST_QUEUE (object);
+  GstQueueItem *qitem;
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!gst_queue_array_is_empty (queue->queue)) {
-    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     /* FIXME: if it's a query, shouldn't we unref that too? */
     if (!qitem->is_query)
       gst_mini_object_unref (qitem->item);
-    g_slice_free (GstQueueItem, qitem);
   }
   gst_queue_array_free (queue->queue);
 
@@ -485,6 +520,23 @@ gst_queue_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -495,7 +547,7 @@ update_time_level (GstQueue * queue)
   if (queue->sink_tainted) {
     GST_LOG_OBJECT (queue, "update sink time");
     queue->sinktime =
-        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->sink_segment,
         queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
@@ -504,16 +556,17 @@ update_time_level (GstQueue * queue)
   if (queue->src_tainted) {
     GST_LOG_OBJECT (queue, "update src time");
     queue->srctime =
-        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->src_segment,
         queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
   src_time = queue->srctime;
 
-  GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+  GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
-  if (sink_time >= src_time)
+  if (GST_CLOCK_STIME_IS_VALID (src_time)
+      && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
   else
     queue->cur_level.time = 0;
@@ -548,14 +601,42 @@ apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
   update_time_level (queue);
 }
 
+static void
+apply_gap (GstQueue * queue, GstEvent * event,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+  GstClockTime duration;
+
+  gst_event_parse_gap (event, &timestamp, &duration);
+
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+
+    if (GST_CLOCK_TIME_IS_VALID (duration)) {
+      timestamp += duration;
+    }
+
+    segment->position = timestamp;
+
+    if (is_sink)
+      queue->sink_tainted = TRUE;
+    else
+      queue->src_tainted = TRUE;
+
+    /* calc diff with other end */
+    update_time_level (queue);
+  }
+}
+
+
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
-    gboolean with_duration, gboolean sink)
+    gboolean sink)
 {
   GstClockTime duration, timestamp;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
   /* if no timestamp is set, assume it's continuous with the previous
@@ -564,10 +645,11 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
     timestamp = segment->position;
 
   /* add duration */
-  if (with_duration && duration != GST_CLOCK_TIME_NONE)
+  if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_LOG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+  GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &queue->sink_segment ? "sink" : "src",
       GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
@@ -581,12 +663,61 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
+{
+  GstClockTime *timestamp = user_data;
+  GstClockTime btime;
+
+  GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  btime = GST_BUFFER_DTS_OR_PTS (*buf);
+  if (GST_CLOCK_TIME_IS_VALID (btime))
+    *timestamp = btime;
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+
+  return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->position;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  segment->position = timestamp;
+
+  if (sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 gst_queue_locked_flush (GstQueue * queue, gboolean full)
 {
-  while (!gst_queue_array_is_empty (queue->queue)) {
-    GstQueueItem *qitem = gst_queue_array_pop_head (queue->queue);
+  GstQueueItem *qitem;
 
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     /* Then lose another reference because we are supposed to destroy that
        data when flushing */
     if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
@@ -597,7 +728,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
     }
     if (!qitem->is_query)
       gst_mini_object_unref (qitem->item);
-    g_slice_free (GstQueueItem, qitem);
+    memset (qitem, 0, sizeof (GstQueueItem));
   }
   queue->last_query = FALSE;
   g_cond_signal (&queue->query_handled);
@@ -609,7 +740,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
 
-  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
   queue->sink_tainted = queue->src_tainted = TRUE;
 
   /* we deleted a lot of something */
@@ -620,27 +751,47 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
 static inline void
 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
 {
-  GstQueueItem *qitem;
+  GstQueueItem qitem;
   GstBuffer *buffer = GST_BUFFER_CAST (item);
   gsize bsize = gst_buffer_get_size (buffer);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers++;
   queue->cur_level.bytes += bsize;
-  apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
+  apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
 
-  qitem = g_slice_new (GstQueueItem);
-  qitem->item = item;
-  qitem->is_query = FALSE;
-  qitem->size = bsize;
-  gst_queue_array_push_tail (queue->queue, qitem);
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
+  GST_QUEUE_SIGNAL_ADD (queue);
+}
+
+static inline void
+gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
+{
+  GstQueueItem qitem;
+  GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+  gsize bsize;
+
+  bsize = gst_buffer_list_calculate_size (buffer_list);
+
+  /* add buffer to the statistics */
+  queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
+  queue->cur_level.bytes += bsize;
+  apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = bsize;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
 static inline void
 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 {
-  GstQueueItem *qitem;
+  GstQueueItem qitem;
   GstEvent *event = GST_EVENT_CAST (item);
 
   switch (GST_EVENT_TYPE (event)) {
@@ -667,14 +818,17 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
        * from downstream */
       queue->unexpected = FALSE;
       break;
+    case GST_EVENT_GAP:
+      apply_gap (queue, event, &queue->sink_segment, TRUE);
+      break;
     default:
       break;
   }
 
-  qitem = g_slice_new (GstQueueItem);
-  qitem->item = item;
-  qitem->is_query = FALSE;
-  gst_queue_array_push_tail (queue->queue, qitem);
+  qitem.item = item;
+  qitem.is_query = FALSE;
+  qitem.size = 0;
+  gst_queue_array_push_tail_struct (queue->queue, &qitem);
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
@@ -686,13 +840,12 @@ gst_queue_locked_dequeue (GstQueue * queue)
   GstMiniObject *item;
   gsize bufsize;
 
-  qitem = gst_queue_array_pop_head (queue->queue);
+  qitem = gst_queue_array_pop_head_struct (queue->queue);
   if (qitem == NULL)
     goto no_item;
 
   item = qitem->item;
   bufsize = qitem->size;
-  g_slice_free (GstQueueItem, qitem);
 
   if (GST_IS_BUFFER (item)) {
     GstBuffer *buffer = GST_BUFFER_CAST (item);
@@ -702,12 +855,24 @@ gst_queue_locked_dequeue (GstQueue * queue)
 
     queue->cur_level.buffers--;
     queue->cur_level.bytes -= bufsize;
-    apply_buffer (queue, buffer, &queue->src_segment, TRUE, FALSE);
+    apply_buffer (queue, buffer, &queue->src_segment, FALSE);
 
     /* if the queue is empty now, update the other side */
     if (queue->cur_level.buffers == 0)
       queue->cur_level.time = 0;
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
 
+    queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+
+    /* if the queue is empty now, update the other side */
+    if (queue->cur_level.buffers == 0)
+      queue->cur_level.time = 0;
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
@@ -727,6 +892,9 @@ gst_queue_locked_dequeue (GstQueue * queue)
           queue->newseg_applied_to_src = FALSE;
         }
         break;
+      case GST_EVENT_GAP:
+        apply_gap (queue, event, &queue->src_segment, FALSE);
+        break;
       default:
         break;
     }
@@ -753,7 +921,7 @@ no_item:
   }
 }
 
-static gboolean
+static GstFlowReturn
 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   gboolean ret = TRUE;
@@ -761,9 +929,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      STATUS (queue, pad, "received flush start event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -773,17 +943,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* unblock the loop and chain functions */
       GST_QUEUE_SIGNAL_ADD (queue);
       GST_QUEUE_SIGNAL_DEL (queue);
-      queue->last_query = FALSE;
-      g_cond_signal (&queue->query_handled);
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       /* make sure it pauses, this should happen since we sent
        * flush_start downstream. */
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+
+      /* unblock query handler after the streaming thread is shut down.
+       * Otherwise downstream might have a query that is already unreffed
+       * upstream */
+      GST_QUEUE_MUTEX_LOCK (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
       break;
     case GST_EVENT_FLUSH_STOP:
-      STATUS (queue, pad, "received flush stop event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -807,6 +982,14 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
@@ -814,21 +997,43 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
            * return FALSE here though and report an error.
            */
           if (!GST_EVENT_IS_STICKY (event)) {
+            GST_QUEUE_MUTEX_UNLOCK (queue);
             goto out_flow_error;
           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
             if (queue->srcresult == GST_FLOW_NOT_LINKED
                 || queue->srcresult < GST_FLOW_EOS) {
-              GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-                  (_("Internal data flow error.")),
-                  ("streaming task paused, reason %s (%d)",
-                      gst_flow_get_name (queue->srcresult), queue->srcresult));
+              GST_QUEUE_MUTEX_UNLOCK (queue);
+              GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
+            } else {
+              GST_QUEUE_MUTEX_UNLOCK (queue);
             }
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -837,7 +1042,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
       break;
   }
-  return ret;
+  if (ret == FALSE) {
+    GST_ERROR_OBJECT (queue, "Failed to push event");
+    return GST_FLOW_ERROR;
+  }
+  return GST_FLOW_OK;
 
   /* ERRORS */
 out_eos:
@@ -845,16 +1054,15 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_event_unref (event);
-    return FALSE;
+    return GST_FLOW_EOS;
   }
 out_flow_error:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "refusing event, we have a downstream flow error: %s",
         gst_flow_get_name (queue->srcresult));
-    GST_QUEUE_MUTEX_UNLOCK (queue);
     gst_event_unref (event);
-    return FALSE;
+    return queue->srcresult;
   }
 }
 
@@ -867,17 +1075,20 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
-        GstQueueItem *qitem;
+        GstQueueItem qitem;
 
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
             GST_QUERY_TYPE_NAME (query));
-        qitem = g_slice_new (GstQueueItem);
-        qitem->item = GST_MINI_OBJECT_CAST (query);
-        qitem->is_query = TRUE;
-        gst_queue_array_push_tail (queue->queue, qitem);
+        qitem.item = GST_MINI_OBJECT_CAST (query);
+        qitem.is_query = TRUE;
+        qitem.size = 0;
+        gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        g_cond_wait (&queue->query_handled, &queue->qlock);
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
         if (queue->srcresult != GST_FLOW_OK)
           goto out_flushing;
         res = queue->last_query;
@@ -901,17 +1112,18 @@ out_flushing:
 static gboolean
 gst_queue_is_empty (GstQueue * queue)
 {
-  GstQueueItem *head;
+  GstQueueItem *tail;
+
+  tail = gst_queue_array_peek_tail_struct (queue->queue);
 
-  if (gst_queue_array_is_empty (queue->queue))
+  if (tail == NULL)
     return TRUE;
 
   /* Only consider the queue empty if the minimum thresholds
-   * are not reached and data is at the queue head. Otherwise
+   * are not reached and data is at the queue tail. Otherwise
    * we would block forever on serialized queries.
    */
-  head = gst_queue_array_peek_head (queue->queue);
-  if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
+  if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
     return FALSE;
 
   /* It is possible that a max size is reached before all min thresholds are.
@@ -964,11 +1176,27 @@ gst_queue_leak_downstream (GstQueue * queue)
   }
 }
 
+static gboolean
+discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
+{
+  GstQueue *queue = user_data;
+  GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
+
+  if (subbuffer) {
+    *buffer = subbuffer;
+    GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+  } else {
+    GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+  }
+
+  return FALSE;
+}
+
 static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
+    GstMiniObject * obj, gboolean is_list)
 {
   GstQueue *queue;
-  GstClockTime duration, timestamp;
 
   queue = GST_QUEUE_CAST (parent);
 
@@ -980,13 +1208,32 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
-  duration = GST_BUFFER_DURATION (buffer);
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+  /* Added to not enqueue buffers in the queue while paused */
+  if (queue->empty_buffers) {
+    GST_CAT_LOG_OBJECT(queue_dataflow, queue, "drop buffer %p", obj);
+    gst_mini_object_unref(obj);
+    GST_QUEUE_MUTEX_UNLOCK(queue);
+    return GST_FLOW_OK;
+  }
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
+
+  if (!is_list) {
+    GstClockTime duration, timestamp;
+    GstBuffer *buffer = GST_BUFFER_CAST (obj);
+
+    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
+    duration = GST_BUFFER_DURATION (buffer);
 
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
-      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
-      GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+        G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+        GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+        GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+  } else {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "received buffer list %p with %u buffers", obj,
+        gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
+  }
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". Note that this only applies to buffers.
@@ -1023,10 +1270,10 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
             "queue is full, waiting for free space");
 
         /* don't leak. Instead, wait for space to be available */
-        do {
-          /* for as long as the queue is filled, wait till an item was deleted. */
+        /* for as long as the queue is filled, wait till an item was deleted. */
+        while (gst_queue_is_filled (queue)) {
           GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
-        } while (gst_queue_is_filled (queue));
+        };
 
         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
 
@@ -1041,19 +1288,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   }
 
   if (queue->tail_needs_discont) {
-    GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+    if (!is_list) {
+      GstBuffer *buffer = GST_BUFFER_CAST (obj);
+      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+      if (subbuffer) {
+        buffer = subbuffer;
+        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      } else {
+        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      }
 
-    if (subbuffer) {
-      buffer = subbuffer;
-      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      obj = GST_MINI_OBJECT_CAST (buffer);
     } else {
-      GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
+
+      buffer_list = gst_buffer_list_make_writable (buffer_list);
+      gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+      obj = GST_MINI_OBJECT_CAST (buffer_list);
     }
     queue->tail_needs_discont = FALSE;
   }
 
   /* put buffer in queue now */
-  gst_queue_locked_enqueue_buffer (queue, buffer);
+  if (is_list)
+    gst_queue_locked_enqueue_buffer_list (queue, obj);
+  else
+    gst_queue_locked_enqueue_buffer (queue, obj);
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -1063,7 +1324,7 @@ out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_OK;
   }
@@ -1074,7 +1335,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return ret;
   }
@@ -1083,7 +1344,7 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
@@ -1092,12 +1353,27 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
 }
 
+static GstFlowReturn
+gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer_list), TRUE);
+}
+
+static GstFlowReturn
+gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer), FALSE);
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
@@ -1105,31 +1381,49 @@ gst_queue_push_one (GstQueue * queue)
 {
   GstFlowReturn result = queue->srcresult;
   GstMiniObject *data;
+  gboolean is_list;
 
   data = gst_queue_locked_dequeue (queue);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
-    GstBuffer *buffer;
+  is_list = GST_IS_BUFFER_LIST (data);
 
-    buffer = GST_BUFFER_CAST (data);
+  if (GST_IS_BUFFER (data) || is_list) {
+    if (!is_list) {
+      GstBuffer *buffer;
 
-    if (queue->head_needs_discont) {
-      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+      buffer = GST_BUFFER_CAST (data);
 
-      if (subbuffer) {
-        buffer = subbuffer;
-        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
-      } else {
-        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      if (queue->head_needs_discont) {
+        GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+        if (subbuffer) {
+          buffer = subbuffer;
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+        } else {
+          GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+        }
+        queue->head_needs_discont = FALSE;
       }
-      queue->head_needs_discont = FALSE;
-    }
 
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (queue->srcpad, buffer);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push (queue->srcpad, buffer);
+    } else {
+      GstBufferList *buffer_list;
+
+      buffer_list = GST_BUFFER_LIST_CAST (data);
+
+      if (queue->head_needs_discont) {
+        buffer_list = gst_buffer_list_make_writable (buffer_list);
+        gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+        queue->head_needs_discont = FALSE;
+      }
+
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push_list (queue->srcpad, buffer_list);
+    }
 
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -1145,11 +1439,16 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
+        } else if (GST_IS_BUFFER_LIST (data)) {
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping EOS buffer list %p", data);
+          gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
         } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
@@ -1198,6 +1497,7 @@ next:
     ret = gst_pad_peer_query (queue->srcpad, query);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
     queue->last_query = ret;
+    queue->last_handled_query = query;
     g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
@@ -1207,21 +1507,25 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GstFlowReturn ret = queue->srcresult;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 out_flushing_query:
   {
+    GstFlowReturn ret = queue->srcresult;
     queue->last_query = FALSE;
     g_cond_signal (&queue->query_handled);
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 }
 
@@ -1276,18 +1580,18 @@ out_flushing:
     gst_pad_pause_task (queue->srcpad);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (ret));
-    if (ret == GST_FLOW_FLUSHING)
+    if (ret == GST_FLOW_FLUSHING) {
       gst_queue_locked_flush (queue, FALSE);
-    else
+    } else {
       GST_QUEUE_SIGNAL_DEL (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+    }
     GST_QUEUE_MUTEX_UNLOCK (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-      GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)",
-              gst_flow_get_name (ret), ret));
+      GST_ELEMENT_FLOW_ERROR (queue, ret);
       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
     }
     return;
@@ -1361,9 +1665,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1385,13 +1693,17 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
        * limit, the best thing we can do is to return an infinite delay. In
        * reality a better estimate would be the byte/buffer rate but that is not
        * possible right now. */
-      if (queue->max_size.time > 0 && max != -1)
+      /* TODO: Use CONVERT query? */
+      if (queue->max_size.time > 0 && max != -1
+          && queue->leaky == GST_QUEUE_NO_LEAK)
         max += queue->max_size.time;
+      else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
+        max = MAX (queue->max_size.time, max);
       else
         max = -1;
 
       /* adjust for min-threshold */
-      if (queue->min_threshold.time > 0 && min != -1)
+      if (queue->min_threshold.time > 0)
         min += queue->min_threshold.time;
 
       gst_query_set_latency (query, live, min, max);
@@ -1427,10 +1739,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
         /* the item del signal will unblock */
-        g_cond_signal (&queue->item_del);
-        /* unblock query handler */
-        queue->last_query = FALSE;
-        g_cond_signal (&queue->query_handled);
+        GST_QUEUE_SIGNAL_DEL (queue);
         GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* step 2, wait until streaming thread stopped and flush queue */
@@ -1479,6 +1788,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default:
@@ -1552,6 +1865,16 @@ gst_queue_set_property (GObject * object,
     case PROP_SILENT:
       queue->silent = g_value_get_boolean (value);
       break;
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+    case PROP_EMPTY_BUFFERS:
+      queue->empty_buffers = g_value_get_boolean (value);
+      GST_INFO_OBJECT(queue, "set empty buffer : %d", queue->empty_buffers);
+      if (queue->empty_buffers) {
+        gst_queue_locked_flush(queue, FALSE);
+      }
+      GST_INFO_OBJECT(queue, "done");
+      break;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
     case PROP_FLUSH_ON_EOS:
       queue->flush_on_eos = g_value_get_boolean (value);
       break;
@@ -1605,6 +1928,11 @@ gst_queue_get_property (GObject * object,
     case PROP_SILENT:
       g_value_set_boolean (value, queue->silent);
       break;
+#ifdef TIZEN_FEATURE_QUEUE_MODIFICATION
+    case PROP_EMPTY_BUFFERS:
+      g_value_set_boolean(value, queue->empty_buffers);
+      break;
+#endif /* TIZEN_FEATURE_QUEUE_MODIFICATION */
     case PROP_FLUSH_ON_EOS:
       g_value_set_boolean (value, queue->flush_on_eos);
       break;