task: add GDestroyNotify to _new
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
index 11ded3d..1908188 100644 (file)
@@ -41,7 +41,7 @@
  * The default queue size limits are 100 buffers, 2MB of data, or
  * two seconds worth of data, whichever is reached first.
  *
- * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
+ * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
  * will allocate a random free filename and buffer data in the file.
  * By using this, it will buffer the entire stream data on the file independently
  * of the queue size limits, they will only be used for buffering statistics.
@@ -373,7 +373,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
 
-  gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+  gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
       "Generic",
       "Simple data queue",
       "Erik Walthinsen <omega@cse.ogi.edu>, "
@@ -399,7 +399,6 @@ gst_queue2_init (GstQueue2 * queue)
   gst_pad_set_query_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
-  GST_PAD_SET_PROXY_ALLOCATION (queue->sinkpad);
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
 
   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
@@ -662,11 +661,9 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
   gst_event_copy_segment (event, segment);
 
   if (segment->format == GST_FORMAT_BYTES) {
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
       /* start is where we'll be getting from and as such writing next */
       queue->current = add_range (queue, segment->start);
-      /* update the stats for this range */
-      update_cur_level (queue, queue->current);
     }
   }
 
@@ -1006,6 +1003,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
   GstEvent *event;
   gboolean res;
 
+  /* until we receive the FLUSH_STOP from this seek, we skip data */
+  queue->seeking = TRUE;
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
@@ -1058,20 +1057,27 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
           (offset + length) - range->writing_pos);
 
   } else {
-    GST_INFO_OBJECT (queue, "not found in any range");
-    /* we don't have the range, see how far away we are, FIXME, find a good
-     * threshold based on the incoming rate. */
+    GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
+        " len %u", offset, length);
+    /* we don't have the range, see how far away we are */
     if (!queue->is_eos && queue->current) {
+      /* FIXME, find a good threshold based on the incoming rate. */
+      guint64 threshold = 1024 * 512;
+
       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-        if (offset < queue->current->offset || offset >
-            queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
-            queue->cur_level.bytes) {
-          perform_seek_to_offset (queue, offset);
-        } else {
+        guint64 distance;
+
+        distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+        /* don't wait for the complete buffer to fill */
+        distance = MIN (distance, threshold);
+
+        if (offset >= queue->current->offset && offset <=
+            queue->current->writing_pos + distance) {
           GST_INFO_OBJECT (queue,
               "requested data is within range, wait for data");
+          return FALSE;
         }
-      } else if (offset < queue->current->writing_pos + 200000) {
+      } else if (offset < queue->current->writing_pos + threshold) {
         update_cur_pos (queue, queue->current, offset + length);
         GST_INFO_OBJECT (queue, "wait for data");
         return FALSE;
@@ -1158,11 +1164,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
   guint64 file_offset;
   guint block_length, remaining, read_length;
   guint64 rb_size;
+  guint64 max_size;
   guint64 rpos;
   GstFlowReturn ret = GST_FLOW_OK;
 
   /* allocate the output buffer of the requested size */
-  buf = gst_buffer_new_allocate (NULL, length, 0);
+  if (*buffer == NULL)
+    buf = gst_buffer_new_allocate (NULL, length, NULL);
+  else
+    buf = *buffer;
+
   gst_buffer_map (buf, &info, GST_MAP_WRITE);
   data = info.data;
 
@@ -1171,6 +1182,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
   rpos = offset;
   rb_size = queue->ring_buffer_max_size;
+  max_size = QUEUE_MAX_BYTES (queue);
 
   remaining = length;
   while (remaining > 0) {
@@ -1189,16 +1201,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
         GST_DEBUG_OBJECT (queue,
             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
-            ", level %" G_GUINT64_FORMAT,
-            rpos, queue->current->writing_pos, level);
+            ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+            rpos, queue->current->writing_pos, level, max_size);
 
-        if (level >= rb_size) {
+        if (level >= max_size) {
           /* we don't have the data but if we have a ring buffer that is full, we
            * need to read */
           GST_DEBUG_OBJECT (queue,
-              "ring buffer full, reading ring-buffer-max-size %"
-              G_GUINT64_FORMAT " bytes", rb_size);
-          read_length = rb_size;
+              "ring buffer full, reading QUEUE_MAX_BYTES %"
+              G_GUINT64_FORMAT " bytes", max_size);
+          read_length = max_size;
         } else if (queue->is_eos) {
           /* won't get any more data so read any data we have */
           if (level) {
@@ -1206,21 +1218,20 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
                 level);
             read_length = level;
+            remaining = level;
+            length = level;
           } else
             goto hit_eos;
         }
       }
 
       if (read_length == 0) {
-        if (QUEUE_IS_USING_RING_BUFFER (queue)
-            && queue->current->max_reading_pos > rpos) {
-          /* protect cached data (data between offset and max_reading_pos)
-           * and update current level */
+        if (QUEUE_IS_USING_RING_BUFFER (queue)) {
           GST_DEBUG_OBJECT (queue,
-              "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
-              "]", rpos, queue->current->max_reading_pos);
-          queue->current->max_reading_pos = rpos;
-          update_cur_level (queue, queue->current);
+              "update current position [%" G_GUINT64_FORMAT "-%"
+              G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
+          update_cur_pos (queue, queue->current, rpos);
+          GST_QUEUE2_SIGNAL_DEL (queue);
         }
         GST_DEBUG_OBJECT (queue, "waiting for add");
         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
@@ -1290,21 +1301,24 @@ hit_eos:
   {
     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
     gst_buffer_unmap (buf, &info);
-    gst_buffer_unref (buf);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
     return GST_FLOW_EOS;
   }
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
     gst_buffer_unmap (buf, &info);
-    gst_buffer_unref (buf);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
     return GST_FLOW_FLUSHING;
   }
 read_error:
   {
     GST_DEBUG_OBJECT (queue, "we have a read error");
     gst_buffer_unmap (buf, &info);
-    gst_buffer_unref (buf);
+    if (*buffer == NULL)
+      gst_buffer_unref (buf);
     return ret;
   }
 }
@@ -1323,7 +1337,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
     queue->starting_segment = NULL;
   } else {
     GstFlowReturn ret;
-    GstBuffer *buffer;
+    GstBuffer *buffer = NULL;
     guint64 reading_pos;
 
     reading_pos = queue->current->reading_pos;
@@ -1552,7 +1566,7 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
   data = info.data;
 
   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
-      GST_BUFFER_OFFSET (buffer));
+      writing_pos);
 
   while (size > 0) {
     guint to_write;
@@ -2079,7 +2093,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -2111,7 +2125,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
 
-      if (QUEUE_IS_USING_QUEUE (queue)) {
+      if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -2121,10 +2135,11 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         queue->sinkresult = GST_FLOW_OK;
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
+        queue->seeking = FALSE;
         /* reset rate counters */
         reset_rate_timer (queue);
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
-            queue->srcpad);
+            queue->srcpad, NULL);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       } else {
         GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2132,6 +2147,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
         queue->sinkresult = GST_FLOW_OK;
+        queue->seeking = FALSE;
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         gst_event_unref (event);
@@ -2181,7 +2197,12 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
 
   switch (GST_QUERY_TYPE (query)) {
     default:
-      res = gst_pad_query_default (pad, parent, query);
+      if (GST_QUERY_IS_SERIALIZED (query)) {
+        GST_WARNING_OBJECT (pad, "unhandled serialized query");
+        res = FALSE;
+      } else {
+        res = gst_pad_query_default (pad, parent, query);
+      }
       break;
   }
   return res;
@@ -2261,6 +2282,10 @@ gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
   if (queue->unexpected)
     goto out_unexpected;
 
+  /* while we didn't receive the newsegment, we're seeking and we skip data */
+  if (queue->seeking)
+    goto out_seeking;
+
   if (!gst_queue2_wait_free_space (queue))
     goto out_flushing;
 
@@ -2290,6 +2315,14 @@ out_eos:
 
     return GST_FLOW_EOS;
   }
+out_seeking:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
+    gst_mini_object_unref (item);
+
+    return GST_FLOW_OK;
+  }
 out_unexpected:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
@@ -2394,21 +2427,8 @@ next:
 
   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
-#if 0
-    GstCaps *caps;
-#endif
 
     buffer = GST_BUFFER_CAST (data);
-#if 0
-    caps = GST_BUFFER_CAPS (buffer);
-#endif
-
-#if 0
-    /* set caps before pushing the buffer so that core does not try to do
-     * something fancy to check if this is possible. */
-    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
-      gst_pad_set_caps (queue->srcpad, caps);
-#endif
 
     result = gst_pad_push (queue->srcpad, buffer);
 
@@ -2438,23 +2458,9 @@ next:
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
     GstBufferList *buffer_list;
-#if 0
-    GstBuffer *first_buf;
-    GstCaps *caps;
-#endif
 
     buffer_list = GST_BUFFER_LIST_CAST (data);
 
-#if 0
-    first_buf = gst_buffer_list_get (buffer_list, 0);
-    caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
-
-    /* set caps before pushing the buffer so that core does not try to do
-     * something fancy to check if this is possible. */
-    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
-      gst_pad_set_caps (queue->srcpad, caps);
-#endif
-
     result = gst_pad_push_list (queue->srcpad, buffer_list);
 
     /* need to check for srcresult here as well */
@@ -2587,12 +2593,6 @@ gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_OK;
-        if (queue->current) {
-          /* forget the highest read offset, we'll calculate a new one when we
-           * get the next getrange request. We need to do this in order to reset
-           * the buffering percentage */
-          queue->current->max_reading_pos = 0;
-        }
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* when using a temp file, we eat the event */
@@ -2723,7 +2723,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
             start = 0;
             /* get our available data relative to the duration */
             if (duration != -1)
-              stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
+              stop =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
+                  duration);
             else
               stop = -1;
             break;
@@ -2747,8 +2749,12 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
                 range_stop = 0;
                 break;
               }
-              range_start = 100 * queued_ranges->offset / duration;
-              range_stop = 100 * queued_ranges->writing_pos / duration;
+              range_start =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+                  queued_ranges->offset, duration);
+              range_stop =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+                  queued_ranges->writing_pos, duration);
               break;
             case GST_FORMAT_BYTES:
               range_start = queued_ranges->offset;
@@ -2944,7 +2950,8 @@ gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
     queue->sinkresult = GST_FLOW_OK;
     queue->is_eos = FALSE;
     queue->unexpected = FALSE;
-    result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
+    result =
+        gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
     GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     /* unblock loop function */
@@ -3195,7 +3202,7 @@ gst_queue2_set_property (GObject * object,
     case PROP_TEMP_LOCATION:
       g_free (queue->temp_location);
       queue->temp_location = g_value_dup_string (value);
-      /* you can set the property back to NULL to make it use the temp-tmpl
+      /* you can set the property back to NULL to make it use the temp-template
        * property. */
       queue->temp_location_set = queue->temp_location != NULL;
       break;