queue2: improve buffer level measurement in download mode
authorWim Taymans <wim.taymans@collabora.co.uk>
Thu, 25 Mar 2010 16:21:02 +0000 (17:21 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Thu, 25 Mar 2010 16:38:01 +0000 (17:38 +0100)
Keep track of the current buffer level in the current range in download mode so
that we post the correct buffering messages.

plugins/elements/gstqueue2.c

index f64a0f8..67ee674 100644 (file)
@@ -883,6 +883,20 @@ update_out_rates (GstQueue2 * queue)
 }
 
 static void
+update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
+{
+  guint64 max_reading_pos, writing_pos;
+
+  writing_pos = range->writing_pos;
+  max_reading_pos = range->max_reading_pos;
+
+  if (writing_pos > max_reading_pos)
+    queue->cur_level.bytes = writing_pos - max_reading_pos;
+  else
+    queue->cur_level.bytes = 0;
+}
+
+static void
 gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
 {
   guint size;
@@ -912,6 +926,10 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
   }
   writing_pos += size;
 
+  GST_INFO_OBJECT (queue,
+      "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
+      writing_pos, max_reading_pos);
+
   if (writing_pos > max_reading_pos)
     queue->cur_level.bytes = writing_pos - max_reading_pos;
   else
@@ -919,9 +937,9 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
 
   /* try to merge with next range */
   while ((next = queue->current->next)) {
-    GST_DEBUG_OBJECT (queue,
-        "cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos,
-        next->offset);
+    GST_INFO_OBJECT (queue,
+        "checking merge with next range %" G_GUINT64_FORMAT " < %"
+        G_GUINT64_FORMAT, writing_pos, next->offset);
     if (writing_pos < next->offset)
       break;
 
@@ -939,6 +957,21 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
   queue->current->writing_pos = writing_pos;
 }
 
+static void
+update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
+{
+  guint64 reading_pos, max_reading_pos;
+
+  reading_pos = pos;
+  max_reading_pos = range->max_reading_pos;
+
+  max_reading_pos = MAX (max_reading_pos, reading_pos);
+
+  range->max_reading_pos = max_reading_pos;
+
+  update_cur_level (queue, range);
+}
+
 static gboolean
 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
 {
@@ -958,6 +991,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
 
   if (res) {
     queue->current = add_range (queue, offset);
+    /* update the stats for this range */
+    update_cur_level (queue, queue->current);
   }
   return res;
 }
@@ -968,14 +1003,18 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
 {
   GstQueue2Range *range;
 
-  GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset,
-      length);
+  GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
+      offset, length);
 
   if ((range = find_range (queue, offset, length))) {
     if (queue->current != range) {
-      GST_DEBUG_OBJECT (queue, "switching ranges");
+      GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
       perform_seek_to_offset (queue, range->writing_pos);
     }
+
+    /* update the current reading position in the range */
+    update_cur_pos (queue, queue->current, offset + length);
+
     /* we have a range for offset */
     GST_DEBUG_OBJECT (queue,
         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
@@ -988,10 +1027,16 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
       return TRUE;
 
   } else {
+    GST_INFO_OBJECT (queue, "not found in any range");
     /* we don't have the range, see how far away we are, FIXME, find a good
      * threshold based on the incomming rate. */
-    if (queue->current && offset < queue->current->writing_pos + 200000)
-      return FALSE;
+    if (queue->current) {
+      if (offset < queue->current->writing_pos + 200000) {
+        update_cur_pos (queue, queue->current, offset + length);
+        GST_INFO_OBJECT (queue, "wait for data");
+        return FALSE;
+      }
+    }
 
     /* too far away, do a seek */
     perform_seek_to_offset (queue, offset);
@@ -1006,7 +1051,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 {
   size_t res;
   GstBuffer *buf;
-  guint64 reading_pos, max_reading_pos, writing_pos;
 
   /* check if we have enough data at @offset. If there is not enough data, we
    * block and wait. */
@@ -1049,21 +1093,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
   *buffer = buf;
 
-  reading_pos = queue->current->reading_pos;
-  writing_pos = queue->current->writing_pos;
-  max_reading_pos = queue->current->max_reading_pos;
-
-  reading_pos = offset + length;
-  max_reading_pos = MAX (max_reading_pos, reading_pos);
-
-  if (writing_pos > max_reading_pos)
-    queue->cur_level.bytes = writing_pos - max_reading_pos;
-  else
-    queue->cur_level.bytes = 0;
-
-  queue->current->reading_pos = reading_pos;
-  queue->current->max_reading_pos = max_reading_pos;
-
   return GST_FLOW_OK;
 
   /* ERRORS */
@@ -1277,8 +1306,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     size = GST_BUFFER_SIZE (buffer);
 
     /* add buffer to the statistics */
-    queue->cur_level.buffers++;
-    queue->cur_level.bytes += size;
+    if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      queue->cur_level.buffers++;
+      queue->cur_level.bytes += size;
+    }
     queue->bytes_in += size;
 
     /* apply new buffer to segment stats */
@@ -1382,9 +1413,12 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved buffer %p from queue", buffer);
 
-    queue->cur_level.buffers--;
-    queue->cur_level.bytes -= size;
+    if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      queue->cur_level.buffers--;
+      queue->cur_level.bytes -= size;
+    }
     queue->bytes_out += size;
+
     apply_buffer (queue, buffer, &queue->src_segment);
     /* update the byterate stats */
     update_out_rates (queue);
@@ -1854,6 +1888,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_OK;
+        if (queue->current)
+          queue->current->max_reading_pos = 0;
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* when using a temp file, we eat the event */