query: improve scheduling query
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
index 9f62488..c6e0296 100644 (file)
@@ -155,7 +155,7 @@ enum
                       queue->max_level.time, \
                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
                         queue->current->writing_pos - queue->current->max_reading_pos : \
-                        queue->queue->length))
+                        queue->queue.length))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
   g_mutex_lock (q->qlock);                                              \
@@ -209,13 +209,12 @@ enum
   }                                                                     \
 } G_STMT_END
 
-#define _do_init(bla) \
+#define _do_init \
     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
         "dataflow inside the queue element");
-
-GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
-    _do_init);
+#define gst_queue2_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
 
 static void gst_queue2_finalize (GObject * object);
 
@@ -224,29 +223,32 @@ static void gst_queue2_set_property (GObject * object,
 static void gst_queue2_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
-static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
-    guint size, GstCaps * caps, GstBuffer ** buf);
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
 static void gst_queue2_loop (GstPad * pad);
 
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 
-static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 static gboolean gst_queue2_handle_query (GstElement * element,
     GstQuery * query);
 
-static GstCaps *gst_queue2_getcaps (GstPad * pad);
-static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
+    guint64 offset, guint length, GstBuffer ** buffer);
 
-static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
-    guint length, GstBuffer ** buffer);
-static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
-
-static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent,
+    gboolean active);
+static gboolean gst_queue2_src_activate_push (GstPad * pad, GstObject * parent,
+    gboolean active);
+static gboolean gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+    gboolean active);
 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
     GstStateChange transition);
 
@@ -255,27 +257,9 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
 
 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
 
-
 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
 
 static void
-gst_queue2_base_init (gpointer g_class)
-{
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
-
-  gst_element_class_set_details_simple (gstelement_class, "Queue 2",
-      "Generic",
-      "Simple data queue",
-      "Erik Walthinsen <omega@cse.ogi.edu>, "
-      "Wim Taymans <wim.taymans@gmail.com>");
-}
-
-static void
 gst_queue2_class_init (GstQueue2Class * klass)
 {
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
@@ -368,19 +352,30 @@ gst_queue2_class_init (GstQueue2Class * klass)
   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
       g_param_spec_uint64 ("ring-buffer-max-size",
           "Max. ring buffer size (bytes)",
-          "Max. amount of data in the ring buffer (bytes, 0 = disabled",
+          "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
   gobject_class->finalize = gst_queue2_finalize;
 
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&srctemplate));
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&sinktemplate));
+
+  gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+      "Generic",
+      "Simple data queue",
+      "Erik Walthinsen <omega@cse.ogi.edu>, "
+      "Wim Taymans <wim.taymans@gmail.com>");
+
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
 }
 
 static void
-gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
+gst_queue2_init (GstQueue2 * queue)
 {
   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
 
@@ -390,12 +385,9 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
   gst_pad_set_event_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
-  gst_pad_set_getcaps_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
-  gst_pad_set_acceptcaps_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
-  gst_pad_set_bufferalloc_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
+  gst_pad_set_query_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
+  GST_OBJECT_FLAG_SET (queue->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
 
   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
@@ -406,16 +398,11 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
   gst_pad_set_getrange_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
-  gst_pad_set_checkgetrange_function (queue->srcpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
-  gst_pad_set_getcaps_function (queue->srcpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
-  gst_pad_set_acceptcaps_function (queue->srcpad,
-      GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
   gst_pad_set_event_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
   gst_pad_set_query_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
+  GST_OBJECT_FLAG_SET (queue->srcpad, GST_PAD_FLAG_PROXY_CAPS);
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
 
   /* levels */
@@ -448,7 +435,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   queue->item_add = g_cond_new ();
   queue->waiting_del = FALSE;
   queue->item_del = g_cond_new ();
-  queue->queue = g_queue_new ();
+  g_queue_init (&queue->queue);
 
   queue->buffering_percent = 100;
 
@@ -473,13 +460,13 @@ gst_queue2_finalize (GObject * object)
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!g_queue_is_empty (queue->queue)) {
-    GstMiniObject *data = g_queue_pop_head (queue->queue);
+  while (!g_queue_is_empty (&queue->queue)) {
+    GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
     gst_mini_object_unref (data);
   }
 
-  g_queue_free (queue->queue);
+  g_queue_clear (&queue->queue);
   g_mutex_free (queue->qlock);
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
@@ -626,57 +613,6 @@ init_ranges (GstQueue2 * queue)
   queue->current = add_range (queue, 0);
 }
 
-static gboolean
-gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
-{
-  GstQueue2 *queue;
-  GstPad *otherpad;
-  gboolean result;
-
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
-  otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
-  result = gst_pad_peer_accept_caps (otherpad, caps);
-
-  return result;
-}
-
-static GstCaps *
-gst_queue2_getcaps (GstPad * pad)
-{
-  GstQueue2 *queue;
-  GstPad *otherpad;
-  GstCaps *result;
-
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (queue == NULL))
-    return gst_caps_new_any ();
-
-  otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
-  result = gst_pad_peer_get_caps (otherpad);
-  if (result == NULL)
-    result = gst_caps_new_any ();
-
-  gst_object_unref (queue);
-
-  return result;
-}
-
-static GstFlowReturn
-gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
-    GstCaps * caps, GstBuffer ** buf)
-{
-  GstQueue2 *queue;
-  GstFlowReturn result;
-
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
-  /* Forward to src pad, without setting caps on the src pad */
-  result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
-
-  return result;
-}
-
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -685,14 +621,14 @@ update_time_level (GstQueue2 * queue)
   if (queue->sink_tainted) {
     queue->sinktime =
         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
-        queue->sink_segment.last_stop);
+        queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
 
   if (queue->src_tainted) {
     queue->srctime =
         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
-        queue->src_segment.last_stop);
+        queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
 
@@ -707,30 +643,18 @@ update_time_level (GstQueue2 * queue)
     queue->cur_level.time = 0;
 }
 
-/* take a NEWSEGMENT event and apply the values to segment, updating the time
+/* take a SEGMENT event and apply the values to segment, updating the time
  * level of queue. */
 static void
 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
     gboolean is_sink)
 {
-  gboolean update;
-  GstFormat format;
-  gdouble rate, arate;
-  gint64 start, stop, time;
-
-  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
-      &format, &start, &stop, &time);
-
-  GST_DEBUG_OBJECT (queue,
-      "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
-      "format %d, "
-      "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
-      G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
+  gst_event_copy_segment (event, segment);
 
-  if (format == GST_FORMAT_BYTES) {
+  if (segment->format == GST_FORMAT_BYTES) {
     if (QUEUE_IS_USING_TEMP_FILE (queue)) {
       /* start is where we'll be getting from and as such writing next */
-      queue->current = add_range (queue, start);
+      queue->current = add_range (queue, segment->start);
       /* update the stats for this range */
       update_cur_level (queue, queue->current);
     }
@@ -738,20 +662,16 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
 
   /* now configure the values, we use these to track timestamps on the
    * sinkpad. */
-  if (format != GST_FORMAT_TIME) {
+  if (segment->format != GST_FORMAT_TIME) {
     /* non-time format, pretent the current time segment is closed with a
      * 0 start and unknown stop time. */
-    update = FALSE;
-    format = GST_FORMAT_TIME;
-    start = 0;
-    stop = -1;
-    time = 0;
+    segment->format = GST_FORMAT_TIME;
+    segment->start = 0;
+    segment->stop = -1;
+    segment->time = 0;
   }
-  gst_segment_set_newsegment_full (segment, update,
-      rate, arate, format, start, stop, time);
 
-  GST_DEBUG_OBJECT (queue,
-      "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
+  GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
 
   if (is_sink)
     queue->sink_tainted = TRUE;
@@ -775,16 +695,16 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   /* if no timestamp is set, assume it's continuous with the previous
    * time */
   if (timestamp == GST_CLOCK_TIME_NONE)
-    timestamp = segment->last_stop;
+    timestamp = segment->position;
 
   /* add duration */
   if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
       GST_TIME_ARGS (timestamp));
 
-  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+  segment->position = timestamp;
 
   if (is_sink)
     queue->sink_tainted = TRUE;
@@ -857,7 +777,6 @@ update_buffering (GstQueue2 * queue)
       queue->buffering_percent = percent;
 
       if (!QUEUE_IS_USING_QUEUE (queue)) {
-        GstFormat fmt = GST_FORMAT_BYTES;
         gint64 duration;
 
         if (QUEUE_IS_USING_RING_BUFFER (queue))
@@ -866,10 +785,12 @@ update_buffering (GstQueue2 * queue)
           mode = GST_BUFFERING_DOWNLOAD;
 
         if (queue->byte_in_rate > 0) {
-          if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
+          if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
+                  &duration)) {
             buffering_left =
                 (gdouble) ((duration -
                     queue->current->writing_pos) * 1000) / queue->byte_in_rate;
+          }
         } else {
           buffering_left = G_MAXINT64;
         }
@@ -898,6 +819,7 @@ reset_rate_timer (GstQueue2 * queue)
   queue->bytes_in = 0;
   queue->bytes_out = 0;
   queue->byte_in_rate = 0.0;
+  queue->byte_in_period = 0;
   queue->byte_out_rate = 0.0;
   queue->last_in_elapsed = 0.0;
   queue->last_out_elapsed = 0.0;
@@ -910,8 +832,11 @@ reset_rate_timer (GstQueue2 * queue)
 /* Tuning for rate estimation. We use a large window for the input rate because
  * it should be stable when connected to a network. The output rate is less
  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
- * therefore adapt more quickly. */
-#define AVG_IN(avg,val)  ((avg) * 15.0 + (val)) / 16.0
+ * therefore adapt more quickly.
+ * However, initial input rate may be subject to a burst, and should therefore
+ * initially also adapt more quickly to changes, and only later on give higher
+ * weight to previous values. */
+#define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
 
 static void
@@ -933,14 +858,20 @@ update_in_rates (GstQueue2 * queue)
     period = elapsed - queue->last_in_elapsed;
 
     GST_DEBUG_OBJECT (queue,
-        "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
+        "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
+        period, queue->bytes_in, queue->byte_in_period);
 
     byte_in_rate = queue->bytes_in / period;
 
     if (queue->byte_in_rate == 0.0)
       queue->byte_in_rate = byte_in_rate;
     else
-      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
+      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
+          (double) queue->byte_in_period, period);
+
+    /* another data point, cap at 16 for long time running average */
+    if (queue->byte_in_period < 16 * RATE_INTERVAL)
+      queue->byte_in_period += period;
 
     /* reset the values to calculate rate over the next interval */
     queue->last_in_elapsed = elapsed;
@@ -1073,7 +1004,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
   } else {
     GST_INFO_OBJECT (queue, "not found in any range");
     /* we don't have the range, see how far away we are, FIXME, find a good
-     * threshold based on the incomming rate. */
+     * threshold based on the incoming rate. */
     if (!queue->is_eos && queue->current) {
       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
         if (offset < queue->current->offset || offset >
@@ -1106,9 +1037,9 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
 #endif
 
-static gint64
+static GstFlowReturn
 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
-    guint8 * dst)
+    guint8 * dst, gint64 * read_return)
 {
   guint8 *ring_buffer;
   size_t res;
@@ -1140,7 +1071,9 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
       goto eos;
   }
 
-  return res;
+  *read_return = res;
+
+  return GST_FLOW_OK;
 
 seek_failed:
   {
@@ -1155,7 +1088,7 @@ could_not_read:
 eos:
   {
     GST_DEBUG ("non-regular file hits EOS");
-    return GST_FLOW_UNEXPECTED;
+    return GST_FLOW_EOS;
   }
 }
 
@@ -1167,13 +1100,13 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
   guint8 *data;
   guint64 file_offset;
   guint block_length, remaining, read_length;
-  gint64 read_return;
   guint64 rb_size;
   guint64 rpos;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   /* allocate the output buffer of the requested size */
-  buf = gst_buffer_new_and_alloc (length);
-  data = GST_BUFFER_DATA (buf);
+  buf = gst_buffer_new_allocate (NULL, length, 0);
+  data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
 
   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
       offset);
@@ -1215,12 +1148,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
                 level);
             read_length = level;
-          } else {
-            GST_DEBUG_OBJECT (queue,
-                "EOS hit and we don't have any requested data");
-            gst_buffer_unref (buf);
-            return GST_FLOW_UNEXPECTED;
-          }
+          } else
+            goto hit_eos;
         }
       }
 
@@ -1247,7 +1176,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     /* set range reading_pos to actual reading position for this read */
     queue->current->reading_pos = rpos;
 
-    /* congfigure how much and from where to read */
+    /* configure how much and from where to read */
     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
       file_offset =
           (queue->current->rb_offset + (rpos -
@@ -1264,10 +1193,12 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
     /* while we still have data to read, we loop */
     while (read_length > 0) {
-      read_return =
+      gint64 read_return;
+
+      ret =
           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
-          data);
-      if (read_return < 0)
+          data, &read_return);
+      if (ret != GST_FLOW_OK)
         goto read_error;
 
       file_offset += read_return;
@@ -1286,15 +1217,22 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
   }
 
-  GST_BUFFER_SIZE (buf) = length;
+  gst_buffer_unmap (buf, data, length);
+
   GST_BUFFER_OFFSET (buf) = offset;
   GST_BUFFER_OFFSET_END (buf) = offset + length;
 
   *buffer = buf;
 
-  return GST_FLOW_OK;
+  return ret;
 
   /* ERRORS */
+hit_eos:
+  {
+    GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
+    gst_buffer_unref (buf);
+    return GST_FLOW_EOS;
+  }
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
@@ -1304,8 +1242,9 @@ out_flushing:
 read_error:
   {
     GST_DEBUG_OBJECT (queue, "we have a read error");
+    gst_buffer_unmap (buf, data, 0);
     gst_buffer_unref (buf);
-    return read_return;
+    return ret;
   }
 }
 
@@ -1333,7 +1272,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
       case GST_FLOW_OK:
         item = GST_MINI_OBJECT_CAST (buffer);
         break;
-      case GST_FLOW_UNEXPECTED:
+      case GST_FLOW_EOS:
         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
         break;
       default:
@@ -1469,8 +1408,8 @@ gst_queue2_locked_flush (GstQueue2 * queue)
       gst_queue2_flush_temp_file (queue);
     init_ranges (queue);
   } else {
-    while (!g_queue_is_empty (queue->queue)) {
-      GstMiniObject *data = g_queue_pop_head (queue->queue);
+    while (!g_queue_is_empty (&queue->queue)) {
+      GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
       /* Then lose another reference because we are supposed to destroy that
          data when flushing */
@@ -1529,8 +1468,9 @@ out_flushing:
 static gboolean
 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
 {
-  guint8 *data, *ring_buffer;
+  guint8 *odata, *data, *ring_buffer;
   guint size, rb_size;
+  gsize osize;
   guint64 writing_pos, new_writing_pos;
   GstQueue2Range *range, *prev, *next;
 
@@ -1541,8 +1481,10 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
   ring_buffer = queue->ring_buffer;
   rb_size = queue->ring_buffer_max_size;
 
-  size = GST_BUFFER_SIZE (buffer);
-  data = GST_BUFFER_DATA (buffer);
+  odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ);
+
+  size = osize;
+  data = odata;
 
   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
       GST_BUFFER_OFFSET (buffer));
@@ -1764,7 +1706,9 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
 
     GST_QUEUE2_SIGNAL_ADD (queue);
-  };
+  }
+
+  gst_buffer_unmap (buffer, odata, osize);
 
   return TRUE;
 
@@ -1772,12 +1716,14 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
-    /* FIXME - GST_FLOW_UNEXPECTED ? */
+    gst_buffer_unmap (buffer, odata, osize);
+    /* FIXME - GST_FLOW_EOS ? */
     return FALSE;
   }
 seek_failed:
   {
     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
+    gst_buffer_unmap (buffer, odata, osize);
     return FALSE;
   }
 handle_error:
@@ -1793,6 +1739,7 @@ handle_error:
             ("%s", g_strerror (errno)));
       }
     }
+    gst_buffer_unmap (buffer, odata, osize);
     return FALSE;
   }
 }
@@ -1806,7 +1753,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
     guint size;
 
     buffer = GST_BUFFER_CAST (item);
-    size = GST_BUFFER_SIZE (buffer);
+    size = gst_buffer_get_size (buffer);
 
     /* add buffer to the statistics */
     if (QUEUE_IS_USING_QUEUE (queue)) {
@@ -1836,7 +1783,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
         GST_DEBUG_OBJECT (queue, "we have EOS");
         queue->is_eos = TRUE;
         break;
-      case GST_EVENT_NEWSEGMENT:
+      case GST_EVENT_SEGMENT:
         apply_segment (queue, event, &queue->sink_segment, TRUE);
         /* This is our first new segment, we hold it
          * as we can't save it on the temp file */
@@ -1850,7 +1797,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
           queue->starting_segment = event;
           item = NULL;
         }
-        /* a new segment allows us to accept more buffers if we got UNEXPECTED
+        /* a new segment allows us to accept more buffers if we got EOS
          * from downstream */
         queue->unexpected = FALSE;
         break;
@@ -1872,7 +1819,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
       update_buffering (queue);
 
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      g_queue_push_tail (queue->queue, item);
+      g_queue_push_tail (&queue->queue, item);
     } else {
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
     }
@@ -1903,7 +1850,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
   if (!QUEUE_IS_USING_QUEUE (queue))
     item = gst_queue2_read_item_from_file (queue);
   else
-    item = g_queue_pop_head (queue->queue);
+    item = g_queue_pop_head (&queue->queue);
 
   if (item == NULL)
     goto no_item;
@@ -1913,7 +1860,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
     guint size;
 
     buffer = GST_BUFFER_CAST (item);
-    size = GST_BUFFER_SIZE (buffer);
+    size = gst_buffer_get_size (buffer);
     *is_buffer = TRUE;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
@@ -1945,7 +1892,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
         /* queue is empty now that we dequeued the EOS */
         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
         break;
-      case GST_EVENT_NEWSEGMENT:
+      case GST_EVENT_SEGMENT:
         apply_segment (queue, event, &queue->src_segment, FALSE);
         break;
       default:
@@ -1970,11 +1917,12 @@ no_item:
 }
 
 static gboolean
-gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
@@ -2075,6 +2023,20 @@ out_eos:
 }
 
 static gboolean
+gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+    GstQuery * query)
+{
+  gboolean res;
+
+  switch (GST_QUERY_TYPE (query)) {
+    default:
+      res = gst_pad_query_default (pad, parent, query);
+      break;
+  }
+  return res;
+}
+
+static gboolean
 gst_queue2_is_empty (GstQueue2 * queue)
 {
   /* never empty on EOS */
@@ -2084,7 +2046,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
     return queue->current->writing_pos <= queue->current->max_reading_pos;
   } else {
-    if (queue->queue->length == 0)
+    if (queue->queue.length == 0)
       return TRUE;
   }
 
@@ -2136,15 +2098,15 @@ gst_queue2_is_filled (GstQueue2 * queue)
 }
 
 static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE2 (parent);
 
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
 
@@ -2184,16 +2146,15 @@ out_eos:
     GST_QUEUE2_MUTEX_UNLOCK (queue);
     gst_buffer_unref (buffer);
 
-    return GST_FLOW_UNEXPECTED;
+    return GST_FLOW_EOS;
   }
 out_unexpected:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-        "exit because we received UNEXPECTED");
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
     gst_buffer_unref (buffer);
 
-    return GST_FLOW_UNEXPECTED;
+    return GST_FLOW_EOS;
   }
 }
 
@@ -2215,52 +2176,57 @@ next:
 
   if (is_buffer) {
     GstBuffer *buffer;
+#if 0
     GstCaps *caps;
+#endif
 
     buffer = GST_BUFFER_CAST (data);
+#if 0
     caps = GST_BUFFER_CAPS (buffer);
+#endif
 
+#if 0
     /* set caps before pushing the buffer so that core does not try to do
      * something fancy to check if this is possible. */
     if (caps && caps != GST_PAD_CAPS (queue->srcpad))
       gst_pad_set_caps (queue->srcpad, caps);
+#endif
 
     result = gst_pad_push (queue->srcpad, buffer);
 
     /* need to check for srcresult here as well */
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
-    if (result == GST_FLOW_UNEXPECTED) {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-          "got UNEXPECTED from downstream");
+    if (result == GST_FLOW_EOS) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
       /* stop pushing buffers, we dequeue all items until we see an item that we
-       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+       * can push again, which is EOS or SEGMENT. If there is nothing in the
        * queue we can push, we set a flag to make the sinkpad refuse more
-       * buffers with an UNEXPECTED return value until we receive something
+       * buffers with an EOS return value until we receive something
        * pushable again or we get flushed. */
       while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
         if (is_buffer) {
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED buffer %p", data);
+              "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
         } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-                "pushing pushable event %s after UNEXPECTED",
+                "pushing pushable event %s after EOS",
                 GST_EVENT_TYPE_NAME (event));
             goto next;
           }
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED event %p", event);
+              "dropping EOS event %p", event);
           gst_event_unref (event);
         }
       }
       /* no more items in the queue. Set the unexpected flag so that upstream
        * make us refuse any more buffers on the sinkpad. Since we will still
-       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+       * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
        * task function does not shut down. */
       queue->unexpected = TRUE;
       result = GST_FLOW_OK;
@@ -2271,11 +2237,11 @@ next:
 
     gst_pad_push_event (queue->srcpad, event);
 
-    /* if we're EOS, return UNEXPECTED so that the task pauses. */
+    /* if we're EOS, return EOS so that the task pauses. */
     if (type == GST_EVENT_EOS) {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-          "pushed EOS event %p, return UNEXPECTED", event);
-      result = GST_FLOW_UNEXPECTED;
+          "pushed EOS event %p, return EOS", event);
+      result = GST_FLOW_EOS;
     }
 
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
@@ -2296,7 +2262,7 @@ out_flushing:
   }
 }
 
-/* called repeadedly with @pad as the source pad. This function should push out
+/* called repeatedly with @pad as the source pad. This function should push out
  * data to the peer element. */
 static void
 gst_queue2_loop (GstPad * pad)
@@ -2350,8 +2316,8 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
     /* let app know about us giving up if upstream is not expected to do so */
-    /* UNEXPECTED is already taken care of elsewhere */
-    if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
+    /* EOS is already taken care of elsewhere */
+    if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
           (_("Internal data flow error.")),
           ("streaming task paused, reason %s (%d)",
@@ -2363,15 +2329,11 @@ out_flushing:
 }
 
 static gboolean
-gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   gboolean res = TRUE;
-  GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  GstQueue2 *queue = GST_QUEUE2 (parent);
 
-  if (G_UNLIKELY (queue == NULL)) {
-    gst_event_unref (event);
-    return FALSE;
-  }
 #ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
       event, GST_EVENT_TYPE_NAME (event));
@@ -2421,31 +2383,15 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
       break;
   }
 
-  gst_object_unref (queue);
   return res;
 }
 
 static gboolean
-gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
-{
-  gboolean ret = FALSE;
-  GstPad *peer;
-
-  if ((peer = gst_pad_get_peer (pad))) {
-    ret = gst_pad_query (peer, query);
-    gst_object_unref (peer);
-  }
-  return ret;
-}
-
-static gboolean
-gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (queue == NULL))
-    return FALSE;
+  queue = GST_QUEUE2 (parent);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
@@ -2453,7 +2399,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
       gint64 peer_pos;
       GstFormat format;
 
-      if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+      if (!gst_pad_peer_query (queue->sinkpad, query))
         goto peer_failed;
 
       /* get peer position */
@@ -2480,7 +2426,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
     {
       GST_DEBUG_OBJECT (queue, "doing peer query");
 
-      if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+      if (!gst_pad_peer_query (queue->sinkpad, query))
         goto peer_failed;
 
       GST_DEBUG_OBJECT (queue, "peer query success");
@@ -2495,7 +2441,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
       /* FIXME - is this condition correct? what should ring buffer do? */
       if (QUEUE_IS_USING_QUEUE (queue)) {
         /* no temp file, just forward to the peer */
-        if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+        if (!gst_pad_peer_query (queue->sinkpad, query))
           goto peer_failed;
         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
       } else {
@@ -2503,7 +2449,6 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
         guint64 writing_pos;
         gint percent;
         gint64 estimated_total, buffering_left;
-        GstFormat peer_fmt;
         gint64 duration;
         gboolean peer_res, is_buffering, is_eos;
         gdouble byte_in_rate, byte_out_rate;
@@ -2526,9 +2471,8 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
           duration = writing_pos;
         } else {
           /* get duration of upstream in bytes */
-          peer_fmt = GST_FORMAT_BYTES;
-          peer_res = gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
-              &duration);
+          peer_res = gst_pad_peer_query_duration (queue->sinkpad,
+              GST_FORMAT_BYTES, &duration);
         }
 
         /* calculate remaining and total download time */
@@ -2609,21 +2553,35 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
       }
       break;
     }
+    case GST_QUERY_SCHEDULING:
+    {
+      gboolean pull_mode;
+      GstSchedulingFlags flags = 0;
+
+      /* we can operate in pull mode when we are using a tempfile */
+      pull_mode = !QUEUE_IS_USING_QUEUE (queue);
+
+      if (pull_mode)
+        flags |= GST_SCHEDULING_FLAG_SEEKABLE;
+      gst_query_set_scheduling (query, flags, 0, -1, 0);
+      if (pull_mode)
+        gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+      gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
+      break;
+    }
     default:
       /* peer handled other queries */
-      if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+      if (!gst_pad_query_default (pad, parent, query))
         goto peer_failed;
       break;
   }
 
-  gst_object_unref (queue);
   return TRUE;
 
   /* ERRORS */
 peer_failed:
   {
     GST_DEBUG_OBJECT (queue, "failed peer query");
-    gst_object_unref (queue);
     return FALSE;
   }
 }
@@ -2631,30 +2589,33 @@ peer_failed:
 static gboolean
 gst_queue2_handle_query (GstElement * element, GstQuery * query)
 {
+  GstQueue2 *queue = GST_QUEUE2 (element);
+
   /* simply forward to the srcpad query function */
-  return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
+  return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
+      query);
 }
 
 static void
 gst_queue2_update_upstream_size (GstQueue2 * queue)
 {
-  GstFormat fmt = GST_FORMAT_BYTES;
   gint64 upstream_size = -1;
 
-  if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &upstream_size)) {
+  if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
+          &upstream_size)) {
     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
     queue->upstream_size = upstream_size;
   }
 }
 
 static GstFlowReturn
-gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
-    GstBuffer ** buffer)
+gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
+    guint length, GstBuffer ** buffer)
 {
   GstQueue2 *queue;
   GstFlowReturn ret;
 
-  queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
+  queue = GST_QUEUE2_CAST (parent);
 
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
@@ -2686,8 +2647,6 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
   ret = gst_queue2_create_read (queue, offset, length, buffer);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
-  gst_object_unref (queue);
-
   return ret;
 
   /* ERRORS */
@@ -2697,42 +2656,25 @@ out_flushing:
 
     GST_DEBUG_OBJECT (queue, "we are flushing");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_object_unref (queue);
     return ret;
   }
 out_unexpected:
   {
     GST_DEBUG_OBJECT (queue, "read beyond end of file");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_object_unref (queue);
-    return GST_FLOW_UNEXPECTED;
+    return GST_FLOW_EOS;
   }
 }
 
-static gboolean
-gst_queue2_src_checkgetrange_function (GstPad * pad)
-{
-  GstQueue2 *queue;
-  gboolean ret;
-
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
-
-  /* we can operate in pull mode when we are using a tempfile */
-  ret = !QUEUE_IS_USING_QUEUE (queue);
-
-  gst_object_unref (GST_OBJECT (queue));
-
-  return ret;
-}
-
 /* sink currently only operates in push mode */
 static gboolean
-gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+    gboolean active)
 {
   gboolean result = TRUE;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  queue = GST_QUEUE2 (parent);
 
   if (active) {
     GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2753,20 +2695,18 @@ gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
     GST_QUEUE2_MUTEX_UNLOCK (queue);
   }
 
-  gst_object_unref (queue);
-
   return result;
 }
 
 /* src operating in push mode, we start a task on the source pad that pushes out
  * buffers from the queue */
 static gboolean
-gst_queue2_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 {
   gboolean result = FALSE;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  queue = GST_QUEUE2 (parent);
 
   if (active) {
     GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2791,19 +2731,17 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
     result = gst_pad_stop_task (pad);
   }
 
-  gst_object_unref (queue);
-
   return result;
 }
 
 /* pull mode, downstream will call our getrange function */
 static gboolean
-gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
 {
   gboolean result;
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  queue = GST_QUEUE2 (parent);
 
   if (active) {
     GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2825,17 +2763,15 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
       queue->is_eos = FALSE;
       queue->unexpected = FALSE;
       queue->upstream_size = 0;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
     } else {
-      GST_QUEUE2_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
       /* this is not allowed, we cannot operate in pull mode without a temp
        * file. */
       queue->srcresult = GST_FLOW_WRONG_STATE;
       queue->sinkresult = GST_FLOW_WRONG_STATE;
       result = FALSE;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
     }
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     GST_QUEUE2_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
@@ -2846,7 +2782,6 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
     result = TRUE;
     GST_QUEUE2_MUTEX_UNLOCK (queue);
   }
-  gst_object_unref (queue);
 
   return result;
 }