queue2: merge write buffer functions and fix bugs
authorRobert Swain <robert.swain@collabora.co.uk>
Fri, 18 Jun 2010 12:36:33 +0000 (14:36 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Aug 2010 14:38:57 +0000 (16:38 +0200)
Cached data could have been overwritten so it is now protected until
it is read. Similarly data was overread as _have_data () was always
looking for the originally requested data even if part of it had been
read already.

plugins/elements/gstqueue2.c

index 7dfcb5f..0b096c2 100644 (file)
@@ -366,8 +366,8 @@ gst_queue2_class_init (GstQueue2Class * klass)
   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
       g_param_spec_uint64 ("ring-buffer-max-size",
           "Max. ring buffer size (bytes)",
-          "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
-          DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
+          "Max. amount of data in the ring buffer (bytes, 0 = disabled",
+          0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
@@ -951,89 +951,6 @@ update_out_rates (GstQueue2 * queue)
       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
 }
 
-#ifdef HAVE_FSEEKO
-#define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
-#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
-#else
-#define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
-#endif
-
-static gboolean
-gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
-{
-  guint size;
-  guint8 *data;
-  guint64 writing_pos, max_reading_pos;
-  GstQueue2Range *next;
-
-  writing_pos = queue->current->writing_pos;
-  max_reading_pos = queue->current->max_reading_pos;
-
-  if (FSEEK_FILE (queue->temp_file, writing_pos))
-    goto seek_failed;
-
-  data = GST_BUFFER_DATA (buffer);
-  size = GST_BUFFER_SIZE (buffer);
-
-  if (fwrite (data, size, 1, queue->temp_file) != 1)
-    goto handle_error;
-
-  writing_pos += size;
-
-  GST_INFO_OBJECT (queue,
-      "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
-      writing_pos, max_reading_pos);
-
-  /* try to merge with next range */
-  while ((next = queue->current->next)) {
-    GST_INFO_OBJECT (queue,
-        "checking merge with next range %" G_GUINT64_FORMAT " < %"
-        G_GUINT64_FORMAT, writing_pos, next->offset);
-    if (writing_pos < next->offset)
-      break;
-
-    GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
-        next->writing_pos);
-
-    /* remove the group, we could choose to not read the data in this range
-     * again. This would involve us doing a seek to the current writing position
-     * in the range. FIXME, It would probably make sense to do a seek when there
-     * is a lot of data in the range we merged with to avoid reading it all
-     * again. */
-    queue->current->next = next->next;
-    g_slice_free (GstQueue2Range, next);
-
-    debug_ranges (queue);
-  }
-  queue->current->writing_pos = writing_pos;
-  update_cur_level (queue, queue->current);
-
-  return TRUE;
-
-  /* ERRORS */
-seek_failed:
-  {
-    GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
-    return FALSE;
-  }
-handle_error:
-  {
-    switch (errno) {
-      case ENOSPC:{
-        GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
-        break;
-      }
-      default:{
-        GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
-            (_("Error while writing to download file.")),
-            ("%s", g_strerror (errno)));
-      }
-    }
-    return FALSE;
-  }
-}
-
 static void
 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
 {
@@ -1087,9 +1004,6 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
       perform_seek_to_offset (queue, range->writing_pos);
     }
 
-    /* update the current reading position in the range */
-    update_cur_pos (queue, queue->current, offset + length);
-
     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
 
@@ -1133,6 +1047,14 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
   return FALSE;
 }
 
+#ifdef HAVE_FSEEKO
+#define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
+#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
+#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
+#else
+#define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
+#endif
+
 static gint64
 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
     guint8 * dst)
@@ -1185,6 +1107,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
   guint block_length, remaining, read_length;
   gint64 read_return;
   guint64 rb_size;
+  guint64 rpos;
 
   /* allocate the output buffer of the requested size */
   buf = gst_buffer_new_and_alloc (length);
@@ -1193,20 +1116,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
       offset);
 
+  rpos = queue->current->reading_pos = offset;
   rb_size = queue->ring_buffer_max_size;
 
   remaining = length;
   while (remaining > 0) {
     /* configure how much/whether to read */
-    if (!gst_queue2_have_data (queue, offset, length)) {
+    if (!gst_queue2_have_data (queue, rpos, remaining)) {
       read_length = 0;
 
       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
         guint64 level;
 
         /* calculate how far away the offset is */
-        if (queue->current->writing_pos > offset)
-          level = queue->current->writing_pos - offset;
+        if (queue->current->writing_pos > rpos)
+          level = queue->current->writing_pos - rpos;
         else
           level = 0;
 
@@ -1224,7 +1148,14 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
           read_length = rb_size;
         }
       }
+
       if (read_length == 0) {
+        if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+          /* protect cached data (data between offset and max_reading_pos)
+           * and update current level */
+          queue->current->max_reading_pos = rpos;
+          update_cur_level (queue, queue->current);
+        }
         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
         continue;
       }
@@ -1236,7 +1167,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     /* congfigure how much and from where to read */
     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
       file_offset =
-          (queue->current->rb_offset + (offset -
+          (queue->current->rb_offset + (rpos -
               queue->current->offset)) % rb_size;
       if (file_offset + read_length > rb_size) {
         block_length = rb_size - file_offset;
@@ -1244,7 +1175,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
         block_length = read_length;
       }
     } else {
-      file_offset = offset;
+      file_offset = rpos;
       block_length = read_length;
     }
 
@@ -1265,7 +1196,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
       block_length = read_length;
       remaining -= read_return;
 
-      queue->current->reading_pos += read_return;
+      rpos = (queue->current->reading_pos += read_return);
+      update_cur_pos (queue, queue->current, queue->current->reading_pos);
     }
     GST_QUEUE2_SIGNAL_DEL (queue);
     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
@@ -1504,15 +1436,19 @@ out_flushing:
 }
 
 static gboolean
-gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
+gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
 {
   guint8 *data;
   guint size, rb_size;
-  guint64 writing_pos, new_writing_pos;
+  guint64 writing_pos, new_writing_pos, max_reading_pos;
   gint64 space;
-  GstQueue2Range *range, *prev;
+  GstQueue2Range *range, *prev, *next;
 
-  writing_pos = queue->current->rb_writing_pos;
+  if (QUEUE_IS_USING_RING_BUFFER (queue))
+    writing_pos = queue->current->rb_writing_pos;
+  else
+    writing_pos = queue->current->writing_pos;
+  max_reading_pos = queue->current->max_reading_pos;
   rb_size = queue->ring_buffer_max_size;
 
   size = GST_BUFFER_SIZE (buffer);
@@ -1521,46 +1457,77 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
   while (size > 0) {
     guint to_write;
 
-    /* calculate the space in the ring buffer not used by data from the
-     * current range */
-    while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
-      /* wait until there is some free space */
-      GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
-    }
-    /* get the amount of space we have */
-    space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
-
-    /* calculate if we need to split or if we can write the entire buffer now */
-    to_write = MIN (size, space);
-
-    /* the writing position in the ring buffer after writing (part or all of)
-     * the buffer */
-    new_writing_pos = (writing_pos + to_write) % rb_size;
-
-    prev = NULL;
-    range = queue->ranges;
-
-    /* if we need to overwrite data in the ring buffer, we need to update the
-     * ranges
-     * warning: this code is complicated and includes some simplifications -
-     * pen, paper and diagrams for the cases recommended! */
-    while (range) {
-      guint64 range_data_start, range_data_end;
-      GstQueue2Range *range_to_destroy = NULL;
-
-      /* we don't edit the current range here */
-      if (range == queue->current)
-        goto next_range;
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      /* calculate the space in the ring buffer not used by data from
+       * the current range */
+      while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
+        /* wait until there is some free space */
+        GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+      }
+      /* get the amount of space we have */
+      space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+
+      /* calculate if we need to split or if we can write the entire
+       * buffer now */
+      to_write = MIN (size, space);
+
+      /* the writing position in the ring buffer after writing (part
+       * or all of) the buffer */
+      new_writing_pos = (writing_pos + to_write) % rb_size;
+
+      debug_ranges (queue);
+
+      prev = NULL;
+      range = queue->ranges;
+
+      /* if we need to overwrite data in the ring buffer, we need to
+       * update the ranges
+       *
+       * warning: this code is complicated and includes some
+       * simplifications - pen, paper and diagrams for the cases
+       * recommended! */
+      while (range) {
+        guint64 range_data_start, range_data_end;
+        GstQueue2Range *range_to_destroy = NULL;
+
+        /* we don't edit the current range here */
+        if (range == queue->current)
+          goto next_range;
 
-      range_data_start = range->rb_offset;
-      range_data_end = range->rb_writing_pos;
+        range_data_start = range->rb_offset;
+        range_data_end = range->rb_writing_pos;
+
+        if (range_data_end > range_data_start) {
+          if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
+            goto next_range;
+
+          if (new_writing_pos > range_data_start) {
+            if (new_writing_pos >= range_data_end) {
+              GST_DEBUG_OBJECT (queue,
+                  "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
+                  G_GUINT64_FORMAT, range->offset, range->writing_pos);
+              /* remove range */
+              range_to_destroy = range;
+              if (prev)
+                prev->next = range->next;
+            } else {
+              GST_DEBUG_OBJECT (queue,
+                  "advancing offsets from %" G_GUINT64_FORMAT " (%"
+                  G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
+                  G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
+                  range->offset + new_writing_pos - range_data_start,
+                  new_writing_pos);
+              range->offset += (new_writing_pos - range_data_start);
+              range->rb_offset = new_writing_pos;
+            }
+          }
+        } else {
+          guint64 new_wpos_virt = writing_pos + to_write;
 
-      if (range_data_end > range_data_start) {
-        if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
-          goto next_range;
+          if (new_wpos_virt <= range_data_start)
+            goto next_range;
 
-        if (new_writing_pos > range_data_start) {
-          if (new_writing_pos >= range_data_end) {
+          if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
             GST_DEBUG_OBJECT (queue,
                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
@@ -1575,47 +1542,28 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
                 range->offset + new_writing_pos - range_data_start,
                 new_writing_pos);
-            range->offset += (new_writing_pos - range_data_start);
+            range->offset += (new_wpos_virt - range_data_start);
             range->rb_offset = new_writing_pos;
           }
         }
-      } else {
-        guint64 new_wpos_virt = writing_pos + to_write;
 
-        if (new_wpos_virt <= range_data_start)
-          goto next_range;
+      next_range:
+        if (!range_to_destroy)
+          prev = range;
 
-        if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
-          GST_DEBUG_OBJECT (queue,
-              "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
-              G_GUINT64_FORMAT, range->offset, range->writing_pos);
-          /* remove range */
-          range_to_destroy = range;
-          if (prev)
-            prev->next = range->next;
-        } else {
-          GST_DEBUG_OBJECT (queue,
-              "advancing offsets from %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT
-              ") to %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT ")",
-              range->offset, range->rb_offset,
-              range->offset + new_writing_pos - range_data_start,
-              new_writing_pos);
-          range->offset += (new_wpos_virt - range_data_start);
-          range->rb_offset = new_writing_pos;
+        range = range->next;
+        if (range_to_destroy) {
+          if (range_to_destroy == queue->ranges)
+            queue->ranges = range;
+          g_slice_free (GstQueue2Range, range_to_destroy);
+          range_to_destroy = NULL;
         }
       }
+      debug_ranges (queue);
 
-    next_range:
-      if (!range_to_destroy)
-        prev = range;
-
-      range = range->next;
-      if (range_to_destroy) {
-        if (range_to_destroy == queue->ranges)
-          queue->ranges = range;
-        g_slice_free (GstQueue2Range, range_to_destroy);
-        range_to_destroy = NULL;
-      }
+    } else {
+      space = to_write = size;
+      new_writing_pos = writing_pos + to_write;
     }
 
     if (FSEEK_FILE (queue->temp_file, writing_pos))
@@ -1623,9 +1571,32 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
 
     if (new_writing_pos > writing_pos) {
       GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
-      /* no wrapping, just write */
+      /* either not using ring buffer or no wrapping, just write */
       if (fwrite (data, to_write, 1, queue->temp_file) != 1)
         goto handle_error;
+
+      if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
+        /* try to merge with next range */
+        while ((next = queue->current->next)) {
+          GST_INFO_OBJECT (queue,
+              "checking merge with next range %" G_GUINT64_FORMAT " < %"
+              G_GUINT64_FORMAT, new_writing_pos, next->offset);
+          if (new_writing_pos < next->offset)
+            break;
+
+          GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
+              next->writing_pos);
+          /* we will run over the offset of the next group */
+          queue->current->writing_pos = new_writing_pos = next->writing_pos;
+
+          /* remove the group */
+          queue->current->next = next->next;
+          g_slice_free (GstQueue2Range, next);
+
+          debug_ranges (queue);
+        }
+        goto update_and_signal;
+      }
     } else {
       /* wrapping */
       guint block_one, block_two;
@@ -1650,17 +1621,22 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
       }
     }
 
+  update_and_signal:
     /* update the writing positions */
     size -= to_write;
-    data += to_write;
-    queue->current->writing_pos += to_write;
-    queue->current->rb_writing_pos = writing_pos = new_writing_pos;
-    update_cur_level (queue, queue->current);
-
     GST_INFO_OBJECT (queue,
         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
         to_write, writing_pos, size);
 
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      data += to_write;
+      queue->current->writing_pos += to_write;
+      queue->current->rb_writing_pos = writing_pos = new_writing_pos;
+    } else {
+      queue->current->writing_pos = writing_pos = new_writing_pos;
+    }
+    update_cur_level (queue, queue->current);
+
     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
 
@@ -1721,13 +1697,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     /* update the byterate stats */
     update_in_rates (queue);
 
-    /* FIXME - check return values? */
-    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-      gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
-    } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
-      gst_queue2_write_buffer_to_file (queue, buffer);
-    }
-
+    /* FIXME - check return value? */
+    gst_queue2_write_buffer_to_file (queue, buffer);
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event;