queue2: ring buffer work in progress
authorRobert Swain <robert.swain@collabora.co.uk>
Fri, 7 May 2010 07:30:44 +0000 (09:30 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Aug 2010 14:38:51 +0000 (16:38 +0200)
plugins/elements/gstqueue2.c
plugins/elements/gstqueue2.h

index eedcbff..d1c9e30 100644 (file)
@@ -93,6 +93,11 @@ enum
   LAST_SIGNAL
 };
 
+/* other defines */
+#define DEFAULT_BUFFER_SIZE 4096
+#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
+#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer)    /* for consistency with the above macro */
+
 /* default property values */
 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
@@ -103,11 +108,7 @@ enum
 #define DEFAULT_HIGH_PERCENT       99
 #define DEFAULT_TEMP_REMOVE        TRUE
 #define DEFAULT_USE_RING_BUFFER    FALSE
-#define DEFAULT_RING_BUFFER_MAX_SIZE (16 * 1024 * 1024) /* 16 MB */
-
-/* other defines */
-#define DEFAULT_BUFFER_SIZE 4096
-#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
+#define DEFAULT_RING_BUFFER_MAX_SIZE (1024 * DEFAULT_BUFFER_SIZE)       /* 4 MB */
 
 enum
 {
@@ -376,7 +377,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
       g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
           "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
-          0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
+          DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
@@ -458,6 +459,8 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   queue->temp_location_set = FALSE;
   queue->temp_remove = DEFAULT_TEMP_REMOVE;
 
+  queue->use_ring_buffer = DEFAULT_USE_RING_BUFFER;
+  queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
   GST_DEBUG_OBJECT (queue,
       "initialized queue's not_empty & not_full conditions");
 }
@@ -549,8 +552,12 @@ add_range (GstQueue2 * queue, guint64 offset)
 
     range = g_slice_new0 (GstQueue2Range);
     range->offset = offset;
+    /* we want to write to the next location in the ring buffer */
+    range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
     range->writing_pos = offset;
+    range->rb_writing_pos = range->rb_offset;
     range->reading_pos = offset;
+    range->rb_reading_pos = range->rb_offset;
     range->max_reading_pos = offset;
 
     /* insert sorted */
@@ -792,7 +799,7 @@ update_buffering (GstQueue2 * queue)
 
     queue->buffering_percent = percent;
 
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
       GstFormat fmt = GST_FORMAT_BYTES;
       gint64 duration;
 
@@ -941,6 +948,14 @@ update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
     queue->cur_level.bytes = 0;
 }
 
+#ifdef HAVE_FSEEKO
+#define FSEEK_FILE(file, offset)  (fseeko (file, (off_t) offset, SEEK_SET))
+#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
+#define FSEEK_FILE(file, offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET))
+#else
+#define FSEEK_FILE(file, offset)  (fseek (file, offset, SEEK_SET))
+#endif
+
 static gboolean
 gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
 {
@@ -952,13 +967,7 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
   writing_pos = queue->current->writing_pos;
   max_reading_pos = queue->current->max_reading_pos;
 
-#ifdef HAVE_FSEEKO
-  fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET);
-#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-  lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET);
-#else
-  fseek (queue->temp_file, writing_pos, SEEK_SET);
-#endif
+  FSEEK_FILE (queue->temp_file, writing_pos);
 
   data = GST_BUFFER_DATA (buffer);
   size = GST_BUFFER_SIZE (buffer);
@@ -1073,6 +1082,10 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
     /* update the current reading position in the range */
     update_cur_pos (queue, queue->current, offset + length);
 
+    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
+        queue->cur_level.bytes, MIN (queue->max_level.bytes,
+            queue->ring_buffer_max_size));
+
     /* we have a range for offset */
     GST_DEBUG_OBJECT (queue,
         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
@@ -1104,17 +1117,10 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
 }
 
 static GstFlowReturn
-gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
-    GstBuffer ** buffer)
+gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
+    guint8 * dst)
 {
   size_t res;
-  GstBuffer *buf;
-
-  /* check if we have enough data at @offset. If there is not enough data, we
-   * block and wait. */
-  while (!gst_queue2_have_data (queue, offset, length)) {
-    GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
-  }
 
 #ifdef HAVE_FSEEKO
   if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
@@ -1128,14 +1134,13 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     goto seek_failed;
 #endif
 
-  buf = gst_buffer_new_and_alloc (length);
-
   /* this should not block */
-  GST_LOG_OBJECT (queue, "Reading %d bytes", length);
-  res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
+  GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
+      length, offset);
+  res = fread (dst, 1, length, queue->temp_file);
   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
 
-  if (G_UNLIKELY (res == 0)) {
+  if (G_UNLIKELY (res < length)) {
     /* check for errors or EOF */
     if (ferror (queue->temp_file))
       goto could_not_read;
@@ -1143,22 +1148,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
       goto eos;
   }
 
-  length = res;
-
-  GST_BUFFER_SIZE (buf) = length;
-  GST_BUFFER_OFFSET (buf) = offset;
-  GST_BUFFER_OFFSET_END (buf) = offset + length;
-
-  *buffer = buf;
-
   return GST_FLOW_OK;
 
-  /* ERRORS */
-out_flushing:
-  {
-    GST_DEBUG_OBJECT (queue, "we are flushing");
-    return GST_FLOW_WRONG_STATE;
-  }
 seek_failed:
   {
     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
@@ -1167,17 +1158,84 @@ seek_failed:
 could_not_read:
   {
     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unref (buf);
     return GST_FLOW_ERROR;
   }
 eos:
   {
     GST_DEBUG ("non-regular file hits EOS");
-    gst_buffer_unref (buf);
     return GST_FLOW_UNEXPECTED;
   }
 }
 
+static GstFlowReturn
+gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
+    GstBuffer ** buffer)
+{
+  GstFlowReturn flow_ret;
+  GstBuffer *buf;
+  guint8 *data;
+  guint64 file_offset;
+  guint block_length;
+
+  /* check if we have enough data at @offset. If there is not enough data, we
+   * block and wait. */
+  while (!gst_queue2_have_data (queue, offset, length)) {
+    GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
+  }
+
+  if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+    file_offset =
+        (queue->current->rb_offset + (offset -
+            queue->current->offset)) % queue->ring_buffer_max_size;
+    if (file_offset + length > queue->ring_buffer_max_size) {
+      block_length = queue->ring_buffer_max_size - file_offset;
+    } else {
+      block_length = length;
+    }
+  } else {
+    file_offset = offset;
+    block_length = length;
+  }
+
+  buf = gst_buffer_new_and_alloc (length);
+  data = GST_BUFFER_DATA (buf);
+
+  if ((flow_ret =
+          gst_queue2_read_data_at_offset (queue, file_offset, block_length,
+              data)) != GST_FLOW_OK) {
+    gst_buffer_unref (buf);
+    return flow_ret;
+  }
+
+  if (block_length < length) {
+    /* read second block into a second buffer, then merge the two */
+    data += block_length;
+    block_length = length - block_length;
+
+    if ((flow_ret =
+            gst_queue2_read_data_at_offset (queue, 0, block_length,
+                data)) != GST_FLOW_OK) {
+      gst_buffer_unref (buf);
+      return flow_ret;
+    }
+  }
+
+  GST_BUFFER_SIZE (buf) = length;
+  GST_BUFFER_OFFSET (buf) = offset;
+  GST_BUFFER_OFFSET_END (buf) = offset + length;
+
+  *buffer = buf;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    return GST_FLOW_WRONG_STATE;
+  }
+}
+
 /* should be called with QUEUE_LOCK */
 static GstMiniObject *
 gst_queue2_read_item_from_file (GstQueue2 * queue)
@@ -1200,6 +1258,11 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
     switch (ret) {
       case GST_FLOW_OK:
         item = GST_MINI_OBJECT_CAST (buffer);
+        queue->current->reading_pos += DEFAULT_BUFFER_SIZE;
+        if (QUEUE_IS_USING_RING_BUFFER (queue))
+          queue->current->rb_reading_pos =
+              (queue->current->rb_reading_pos +
+              DEFAULT_BUFFER_SIZE) % queue->ring_buffer_max_size;
         break;
       case GST_FLOW_UNEXPECTED:
         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
@@ -1329,7 +1392,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
 static void
 gst_queue2_locked_flush (GstQueue2 * queue)
 {
-  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+  if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
     gst_queue2_flush_temp_file (queue);
   } else {
     while (!g_queue_is_empty (queue->queue)) {
@@ -1352,6 +1415,205 @@ gst_queue2_locked_flush (GstQueue2 * queue)
   GST_QUEUE2_SIGNAL_DEL (queue);
 }
 
+static gboolean
+gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
+{
+  GstBuffer *buf, *rem;
+  guint buf_size, rem_size;
+  const guint rb_size = queue->ring_buffer_max_size;
+  guint8 *data;
+  guint64 writing_pos, reading_pos, new_writing_pos;
+  gint64 space;
+  GstQueue2Range *range, *prev;
+
+  writing_pos = queue->current->rb_writing_pos;
+  reading_pos = queue->current->rb_reading_pos;
+
+  rem = buffer;
+
+  /* loop if we can't write the whole buffer at once */
+  do {
+    /* calculate the space in the ring buffer not used by data from the
+     * current range */
+    space =
+        MIN (queue->max_level.bytes,
+        queue->ring_buffer_max_size) - queue->cur_level.bytes;
+
+    rem_size = GST_BUFFER_SIZE (rem);
+    /* don't try to process 0 size buffers */
+    if (!rem_size)
+      break;
+
+    /* calculate if we need to split or if we can write the entire buffer now */
+    if (rem_size > space) {
+      buf_size = space;
+      buf = gst_buffer_create_sub (rem, 0, space);
+
+      rem_size -= space;
+      rem = gst_buffer_create_sub (rem, space, rem_size);
+      space = 0;
+    } else {
+      buf_size = rem_size;
+      buf = rem;
+
+      rem_size = 0;
+      rem = NULL;
+      space -= buf_size;
+    }
+
+    data = GST_BUFFER_DATA (buf);
+
+    /* the writing position in the ring buffer after writing (part or all of)
+     * the buffer */
+    new_writing_pos = (writing_pos + buf_size) % rb_size;
+
+    prev = NULL;
+    range = queue->ranges;
+
+    /* if we need to overwrite data in the ring buffer, we need to update the
+     * ranges
+     * warning: this code is complicated and includes some simplifications -
+     * pen, paper and diagrams for the cases recommended! */
+    while (range) {
+      guint64 range_data_start, range_data_end;
+      GstQueue2Range *range_to_destroy = NULL;
+
+      /* we don't edit the current range here */
+      if (range == queue->current)
+        goto next_range;
+
+      range_data_start = range->rb_offset;
+      range_data_end = range->rb_writing_pos;
+
+      if (range_data_end > range_data_start) {
+        if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
+          goto next_range;
+
+        if (new_writing_pos > range_data_start) {
+          if (new_writing_pos >= range_data_end) {
+            /* remove range */
+            range_to_destroy = range;
+            if (prev)
+              prev->next = range->next;
+          } else {
+            range->offset += (new_writing_pos - range_data_start);
+            range->rb_offset = new_writing_pos;
+          }
+        }
+      } else {
+        guint64 new_wpos_virt = writing_pos + buf_size;
+
+        if (new_wpos_virt <= range_data_start)
+          goto next_range;
+
+        if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
+          /* remove range */
+          range_to_destroy = range;
+          if (prev)
+            prev->next = range->next;
+        } else {
+          range->offset += (new_wpos_virt - range_data_start);
+          range->rb_offset = new_writing_pos;
+        }
+      }
+
+    next_range:
+      if (!range_to_destroy)
+        prev = range;
+
+      range = range->next;
+      if (range_to_destroy) {
+        if (range_to_destroy == queue->ranges)
+          queue->ranges = range;
+        g_slice_free1 (sizeof (GstQueue2Range), range_to_destroy);
+        range_to_destroy = NULL;
+      }
+    }
+
+    FSEEK_FILE (queue->temp_file, writing_pos);
+
+    if (new_writing_pos > writing_pos) {
+      /* no wrapping, just write */
+      if (fwrite (data, buf_size, 1, queue->temp_file) != 1)
+        goto handle_error;
+    } else {
+      /* wrapping */
+      guint block_one, block_two;
+
+      block_one = rb_size - writing_pos;
+      block_two = buf_size - block_one;
+
+      /* write data to end of ring buffer */
+      if (fwrite (data, block_one, 1, queue->temp_file) != 1)
+        goto handle_error;
+
+      FSEEK_FILE (queue->temp_file, 0);
+
+      data += block_one;
+      if (fwrite (data, block_two, 1, queue->temp_file) != 1)
+        goto handle_error;
+    }
+
+    /* update the writing positions */
+    GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT, buf_size,
+        writing_pos);
+    queue->current->writing_pos += buf_size;
+    queue->current->rb_writing_pos = writing_pos = new_writing_pos;
+
+    update_cur_level (queue, queue->current);
+    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
+        queue->cur_level.bytes, MIN (queue->max_level.bytes,
+            queue->ring_buffer_max_size));
+
+    /* if we have a remainder of the buffer data, wait until there's space to
+     * write before looping */
+    if (rem_size) {
+      gboolean started;
+
+      /* pause the timer while we wait. The fact that we are waiting does not mean
+       * the byterate on the input pad is lower */
+      if ((started = queue->in_timer_started))
+        g_timer_stop (queue->in_timer);
+
+      GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+          "queue is full, waiting for free space");
+      while (gst_queue2_is_filled (queue)) {
+        /* Wait for space to be available, we could be unlocked because of a flush. */
+        GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+      }
+
+      /* and continue if we were running before */
+      if (started)
+        g_timer_continue (queue->in_timer);
+    }
+  } while (rem_size);
+
+  return TRUE;
+
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    /* FIXME - GST_FLOW_UNEXPECTED ? */
+    return FALSE;
+  }
+handle_error:
+  {
+    switch (errno) {
+      case ENOSPC:{
+        GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
+        break;
+      }
+      default:{
+        GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
+            (_("Error while writing to download file.")),
+            ("%s", g_strerror (errno)));
+      }
+    }
+    return FALSE;
+  }
+}
+
 /* enqueue an item an update the level stats */
 static void
 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
@@ -1364,7 +1626,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     size = GST_BUFFER_SIZE (buffer);
 
     /* add buffer to the statistics */
-    if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
+            || QUEUE_IS_USING_RING_BUFFER (queue))) {
       queue->cur_level.buffers++;
       queue->cur_level.bytes += size;
     }
@@ -1375,7 +1638,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     /* update the byterate stats */
     update_in_rates (queue);
 
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    /* FIXME - check return values? */
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
+    } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
       gst_queue2_write_buffer_to_file (queue, buffer);
     }
 
@@ -1395,7 +1661,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
         apply_segment (queue, event, &queue->sink_segment);
         /* This is our first new segment, we hold it
          * as we can't save it on the temp file */
-        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+        if (QUEUE_IS_USING_RING_BUFFER (queue)
+            || QUEUE_IS_USING_TEMP_FILE (queue)) {
           if (queue->segment_event_received)
             goto unexpected_event;
 
@@ -1410,7 +1677,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
         queue->unexpected = FALSE;
         break;
       default:
-        if (QUEUE_IS_USING_TEMP_FILE (queue))
+        if (QUEUE_IS_USING_RING_BUFFER (queue)
+            || QUEUE_IS_USING_TEMP_FILE (queue))
           goto unexpected_event;
         break;
     }
@@ -1425,7 +1693,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     /* update the buffering status */
     update_buffering (queue);
 
-    if (!QUEUE_IS_USING_TEMP_FILE (queue))
+    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
+            || QUEUE_IS_USING_RING_BUFFER (queue)))
       g_queue_push_tail (queue->queue, item);
     else
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
@@ -1453,7 +1722,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
 {
   GstMiniObject *item;
 
-  if (QUEUE_IS_USING_TEMP_FILE (queue))
+  if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
     item = gst_queue2_read_item_from_file (queue);
   else
     item = g_queue_pop_head (queue->queue);
@@ -1471,7 +1740,8 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved buffer %p from queue", buffer);
 
-    if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
+            || QUEUE_IS_USING_RING_BUFFER (queue))) {
       queue->cur_level.buffers--;
       queue->cur_level.bytes -= size;
     }
@@ -1529,7 +1799,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
+              || QUEUE_IS_USING_TEMP_FILE (queue))) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1553,7 +1824,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
 
-      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
+              || QUEUE_IS_USING_TEMP_FILE (queue))) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1619,8 +1891,8 @@ gst_queue2_is_empty (GstQueue2 * queue)
   if (queue->is_eos)
     return FALSE;
 
-  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
-    return queue->current->writing_pos == queue->current->max_reading_pos;
+  if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
+    return queue->current->writing_pos <= queue->current->max_reading_pos;
   } else {
     if (queue->queue->length == 0)
       return TRUE;
@@ -1638,6 +1910,12 @@ gst_queue2_is_filled (GstQueue2 * queue)
   if (queue->is_eos)
     return TRUE;
 
+  /* if using a ring buffer we're filled if all ring buffer space is used
+   * _by the current range_ */
+  if (QUEUE_IS_USING_RING_BUFFER (queue))
+    return queue->cur_level.bytes >= MIN (queue->max_level.bytes,
+        queue->ring_buffer_max_size);
+
   /* if using file, we're never filled if we don't have EOS */
   if (QUEUE_IS_USING_TEMP_FILE (queue))
     return FALSE;
@@ -1922,7 +2200,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
+              || QUEUE_IS_USING_TEMP_FILE (queue))) {
         /* just forward upstream */
         res = gst_pad_push_event (queue->sinkpad, event);
       } else {
@@ -1939,7 +2218,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
       }
       break;
     case GST_EVENT_FLUSH_STOP:
-      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
+              || QUEUE_IS_USING_TEMP_FILE (queue))) {
         /* just forward upstream */
         res = gst_pad_push_event (queue->sinkpad, event);
       } else {
@@ -2028,7 +2308,8 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
 
       GST_DEBUG_OBJECT (queue, "query buffering");
 
-      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
+              || QUEUE_IS_USING_TEMP_FILE (queue))) {
         /* no temp file, just forward to the peer */
         if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
           goto peer_failed;
@@ -2084,8 +2365,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
             if (!peer_res)
               goto peer_failed;
 
-            GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
-                G_GINT64_FORMAT, duration, writing_pos);
+            GST_DEBUG_OBJECT (queue,
+                "duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
+                duration, writing_pos);
 
             start = 0;
             /* get our available data relative to the duration */
@@ -2143,12 +2425,18 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
   GstFlowReturn ret;
 
   queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
+  if (length > queue->ring_buffer_max_size) {
+    GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
+        (_("Buffer is too large to fit in ring buffer")),
+        ("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
+    return GST_FLOW_ERROR;
+  }
 
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
   offset = (offset == -1) ? queue->current->reading_pos : offset;
 
-  /* function will block when the range is not yet available */
+  /* FIXME - function will block when the range is not yet available */
   ret = gst_queue2_create_read (queue, offset, length, buffer);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
@@ -2176,7 +2464,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad)
   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
   /* we can operate in pull mode when we are using a tempfile */
-  ret = QUEUE_IS_USING_TEMP_FILE (queue);
+  ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
 
   gst_object_unref (GST_OBJECT (queue));
 
@@ -2264,7 +2552,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
   if (active) {
-    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
       /* open the temp file now */
       result = gst_queue2_open_temp_location_file (queue);
 
@@ -2312,7 +2600,8 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+      if (QUEUE_IS_USING_RING_BUFFER (queue)
+          || QUEUE_IS_USING_TEMP_FILE (queue)) {
         if (!gst_queue2_open_temp_location_file (queue))
           ret = GST_STATE_CHANGE_FAILURE;
       }
@@ -2337,7 +2626,8 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      if (QUEUE_IS_USING_TEMP_FILE (queue))
+      if (QUEUE_IS_USING_RING_BUFFER (queue)
+          || QUEUE_IS_USING_TEMP_FILE (queue))
         gst_queue2_close_temp_location_file (queue);
       if (queue->starting_segment != NULL) {
         gst_event_unref (queue->starting_segment);
index 8c8ca3a..21c9c8a 100644 (file)
@@ -60,10 +60,13 @@ struct _GstQueue2Range
 {
   GstQueue2Range *next;
 
-  guint64 offset;
-  guint64 writing_pos;
-  guint64 reading_pos;
-  guint64 max_reading_pos;
+  guint64 offset;          /* offset of range start in source */
+  guint64 rb_offset;       /* offset of range start in ring buffer */
+  guint64 writing_pos;     /* writing position in source */
+  guint64 rb_writing_pos;  /* writing position in ring buffer */
+  guint64 reading_pos;     /* reading position in source */
+  guint64 rb_reading_pos;  /* reading position in ring buffer */
+  guint64 max_reading_pos; /* latest requested offset in source */
 };
 
 struct _GstQueue2
@@ -134,7 +137,7 @@ struct _GstQueue2
   GstEvent *starting_segment;
 
   gboolean use_ring_buffer;
-  guint ring_buffer_max_size;
+  guint64 ring_buffer_max_size;
 };
 
 struct _GstQueue2Class