gst/playback/gstqueue2.c: Add support for filebased buffering. Fixes #441264.
authorThiago Sousa Santos <thiagossantos@gmail.com>
Tue, 5 Jun 2007 16:14:23 +0000 (16:14 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 5 Jun 2007 16:14:23 +0000 (16:14 +0000)
Original commit message from CVS:
Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_class_init),
(gst_queue_init), (gst_queue_finalize),
(gst_queue_write_buffer_to_file), (gst_queue_have_data),
(gst_queue_create_read), (gst_queue_read_item_from_file),
(gst_queue_open_temp_location_file),
(gst_queue_close_temp_location_file), (gst_queue_locked_flush),
(gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
(gst_queue_is_empty), (gst_queue_is_filled),
(gst_queue_change_state), (gst_queue_set_temp_location),
(gst_queue_set_property):
Add support for filebased buffering. Fixes #441264.

ChangeLog
gst/playback/gstqueue2.c

index 139a9e8..5808ffb 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,21 @@
 2007-06-05  Wim Taymans  <wim@fluendo.com>
 
+       Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
+
+       * gst/playback/gstqueue2.c: (gst_queue_class_init),
+       (gst_queue_init), (gst_queue_finalize),
+       (gst_queue_write_buffer_to_file), (gst_queue_have_data),
+       (gst_queue_create_read), (gst_queue_read_item_from_file),
+       (gst_queue_open_temp_location_file),
+       (gst_queue_close_temp_location_file), (gst_queue_locked_flush),
+       (gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
+       (gst_queue_is_empty), (gst_queue_is_filled),
+       (gst_queue_change_state), (gst_queue_set_temp_location),
+       (gst_queue_set_property):
+       Add support for filebased buffering. Fixes #441264.
+
+2007-06-05  Wim Taymans  <wim@fluendo.com>
+
        * gst/playback/gstdecodebin2.c: (gst_decode_bin_factory_filter),
        (analyze_new_pad), (connect_pad), (expose_pad), (caps_notify_cb),
        (caps_notify_group_cb), (gst_decode_group_new),
index 23663f1..10949ee 100644 (file)
@@ -1,9 +1,8 @@
 /* GStreamer
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- *                    2000 Wim Taymans <wtay@chello.be>
  *                    2003 Colin Walters <cwalters@gnome.org>
- *                    2005 Wim Taymans <wim@fluendo.com>
- *                    2007 Wim Taymans <wim@fluendo.com>
+ *                    2000,2005,2007 Wim Taymans <wim@fluendo.com>
+ *                    2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
  *
  * gstqueue2.c:
  *
  *
  * The default queue size limits are 100 buffers, 2MB of data, or
  * two seconds worth of data, whichever is reached first.
+ * 
+ * If you set temp-location, the element will buffer data on the file
+ * specified by it. By using this, it will buffer the entire 
+ * stream data on the file independently of the queue size limits, they
+ * will only be used for buffering statistics.
  *
  */
+
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 
+#include <glib/gstdio.h>
+
 #include <gst/gst.h>
 #include <gst/gst-i18n-plugin.h>
 
@@ -83,6 +90,10 @@ enum
 #define DEFAULT_LOW_PERCENT        10
 #define DEFAULT_HIGH_PERCENT       99
 
+/* other defines */
+#define DEFAULT_BUFFER_SIZE 4096
+#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL)
+
 enum
 {
   PROP_0,
@@ -176,6 +187,13 @@ struct _GstQueue
 
   /* temp location stuff */
   gchar *temp_location;
+  FILE *temp_file;
+  guint64 writing_pos;
+  guint64 reading_pos;
+  /* we need this to send the first new segment event of the stream
+   * because we can't save it on the file */
+  gboolean segment_event_received;
+  GstEvent *starting_segment;
 };
 
 struct _GstQueueClass
@@ -195,7 +213,9 @@ struct _GstQueueClass
                       queue->max_level.bytes, \
                       queue->cur_level.time, \
                       queue->max_level.time, \
-                      queue->queue->length)
+                      QUEUE_IS_USING_TEMP_FILE(queue) ? \
+                        queue->writing_pos - queue->reading_pos : \
+                        queue->queue->length)
 
 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
   g_mutex_lock (q->qlock);                                              \
@@ -371,7 +391,7 @@ gst_queue_class_init (GstQueueClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
       g_param_spec_string ("temp-location", "Temporary File Location",
-          "Location of a temporary file to store data in (unused)",
+          "Location of a temporary file to store data in",
           NULL, G_PARAM_READWRITE));
 
   gst_element_class_add_pad_template (gstelement_class,
@@ -443,6 +463,10 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
   queue->item_del = g_cond_new ();
   queue->queue = g_queue_new ();
 
+  /* tempfile related */
+  queue->temp_location = NULL;
+  queue->temp_file = NULL;
+
   GST_DEBUG_OBJECT (queue,
       "initialized queue's not_empty & not_full conditions");
 }
@@ -460,12 +484,17 @@ gst_queue_finalize (GObject * object)
 
     gst_mini_object_unref (data);
   }
+
   g_queue_free (queue->queue);
   g_mutex_free (queue->qlock);
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
   g_timer_destroy (queue->timer);
 
+  /* temp_file path cleanup  */
+  if (queue->temp_location != NULL)
+    g_free (queue->temp_location);
+
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -720,19 +749,208 @@ update_rates (GstQueue * queue)
 }
 
 static void
+gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
+{
+  guint size;
+  guint8 *data;
+
+  fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
+
+  data = GST_BUFFER_DATA (buffer);
+  size = GST_BUFFER_SIZE (buffer);
+
+  fwrite (data, 1, size, queue->temp_file);
+  queue->writing_pos += size;
+}
+
+/* see if there is enough data in the file to read a full buffer */
+static gboolean
+gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
+{
+  GST_DEBUG_OBJECT (queue,
+      "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
+      length, queue->writing_pos);
+  if (queue->is_eos)
+    return TRUE;
+
+  if (offset + length < queue->writing_pos)
+    return TRUE;
+
+  return FALSE;
+}
+
+static GstFlowReturn
+gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
+    GstBuffer ** buffer)
+{
+  size_t res;
+  GstBuffer *buf;
+  off_t sres;
+
+  /* check if we have enough data at @offset. If there is not enough data, we
+   * block and wait. */
+  while (!gst_queue_have_data (queue, offset, length)) {
+    GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+  }
+
+  sres = fseeko (queue->temp_file, offset, SEEK_SET);
+  if (G_UNLIKELY (sres < 0))
+    goto seek_failed;
+
+  buf = gst_buffer_new_and_alloc (length);
+
+  /* this should not block */
+  GST_LOG_OBJECT (queue, "Reading %d bytes", length);
+  res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
+  GST_LOG_OBJECT (queue, "read %d bytes", res);
+
+  if (G_UNLIKELY (res == 0)) {
+    /* check for errors or EOF */
+    if (ferror (queue->temp_file))
+      goto could_not_read;
+    if (feof (queue->temp_file) && length > 0)
+      goto eos;
+  }
+
+  length = res;
+
+  GST_BUFFER_SIZE (buf) = length;
+  GST_BUFFER_OFFSET (buf) = offset;
+  GST_BUFFER_OFFSET_END (buf) = offset + length;
+
+  *buffer = buf;
+
+  queue->reading_pos = offset + length;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    return GST_FLOW_WRONG_STATE;
+  }
+seek_failed:
+  {
+    GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return GST_FLOW_ERROR;
+  }
+could_not_read:
+  {
+    GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    gst_buffer_unref (buf);
+    return GST_FLOW_ERROR;
+  }
+eos:
+  {
+    GST_DEBUG ("non-regular file hits EOS");
+    gst_buffer_unref (buf);
+    return GST_FLOW_UNEXPECTED;
+  }
+}
+
+static GstMiniObject *
+gst_queue_read_item_from_file (GstQueue * queue)
+{
+  GstMiniObject *item;
+
+  if (queue->starting_segment != NULL) {
+    item = GST_MINI_OBJECT_CAST (queue->starting_segment);
+    queue->starting_segment = NULL;
+  } else {
+    GstFlowReturn ret;
+    GstBuffer *buffer;
+
+    ret =
+        gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
+        &buffer);
+    switch (ret) {
+      case GST_FLOW_OK:
+        item = GST_MINI_OBJECT_CAST (buffer);
+        break;
+      case GST_FLOW_UNEXPECTED:
+        item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
+        break;
+      default:
+        item = NULL;
+        break;
+    }
+  }
+  return item;
+}
+
+static gboolean
+gst_queue_open_temp_location_file (GstQueue * queue)
+{
+  /* nothing to do */
+  if (queue->temp_location == NULL)
+    goto no_filename;
+
+  /* open the file for update/writing */
+  queue->temp_file = g_fopen (queue->temp_location, "wb+");
+  /* error creating file */
+  if (queue->temp_file == NULL)
+    goto open_failed;
+
+  queue->writing_pos = 0;
+  queue->reading_pos = 0;
+
+  return TRUE;
+
+  /* ERRORS */
+no_filename:
+  {
+    GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
+        (_("No file name specified.")), (NULL));
+    return FALSE;
+  }
+open_failed:
+  {
+    GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
+        (_("Could not open file \"%s\" for reading."), queue->temp_location),
+        GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+}
+
+static void
+gst_queue_close_temp_location_file (GstQueue * queue)
+{
+  /* nothing to do */
+  if (queue->temp_file == NULL)
+    return;
+
+  /* we don't remove the file so that the application can use it as a cache
+   * later on */
+  fflush (queue->temp_file);
+  fclose (queue->temp_file);
+  remove (queue->temp_location);
+  queue->temp_file = NULL;
+}
+
+static void
 gst_queue_locked_flush (GstQueue * queue)
 {
-  while (!g_queue_is_empty (queue->queue)) {
-    GstMiniObject *data = g_queue_pop_head (queue->queue);
+  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    gst_queue_close_temp_location_file (queue);
+    gst_queue_open_temp_location_file (queue);
+  } else {
+    while (!g_queue_is_empty (queue->queue)) {
+      GstMiniObject *data = g_queue_pop_head (queue->queue);
 
-    /* Then lose another reference because we are supposed to destroy that
-       data when flushing */
-    gst_mini_object_unref (data);
+      /* Then lose another reference because we are supposed to destroy that
+         data when flushing */
+      gst_mini_object_unref (data);
+    }
   }
   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->is_eos = FALSE;
+  if (queue->starting_segment != NULL)
+    gst_event_unref (queue->starting_segment);
+  queue->starting_segment = NULL;
+  queue->segment_event_received = FALSE;
 
   /* we deleted a lot of something */
   GST_QUEUE_SIGNAL_DEL (queue);
@@ -760,6 +978,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
     /* update the buffering status */
     update_buffering (queue);
 
+    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+      gst_queue_write_buffer_to_file (queue, buffer);
+    }
+
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event;
 
@@ -773,8 +995,19 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
         break;
       case GST_EVENT_NEWSEGMENT:
         apply_segment (queue, event, &queue->sink_segment);
+        /* This is our first new segment, we hold it
+         * as we can't save it on the temp file */
+        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+          if (queue->segment_event_received)
+            goto unexpected_event;
+
+          queue->segment_event_received = TRUE;
+          queue->starting_segment = event;
+        }
         break;
       default:
+        if (QUEUE_IS_USING_TEMP_FILE (queue))
+          goto unexpected_event;
         break;
     }
   } else {
@@ -784,9 +1017,22 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
     item = NULL;
   }
 
-  if (item)
+  if (!QUEUE_IS_USING_TEMP_FILE (queue) && item)
     g_queue_push_tail (queue->queue, item);
   GST_QUEUE_SIGNAL_ADD (queue);
+
+  return;
+
+  /* ERRORS */
+unexpected_event:
+  {
+    g_warning
+        ("Unexpected event of kind %s can't be added in temp file of queue %s ",
+        gst_event_type_get_name (GST_EVENT_TYPE (item)),
+        GST_OBJECT_NAME (queue));
+    gst_event_unref (GST_EVENT_CAST (item));
+    return;
+  }
 }
 
 /* dequeue an item from the queue and update level stats */
@@ -795,7 +1041,11 @@ gst_queue_locked_dequeue (GstQueue * queue)
 {
   GstMiniObject *item;
 
-  item = g_queue_pop_head (queue->queue);
+  if (QUEUE_IS_USING_TEMP_FILE (queue))
+    item = gst_queue_read_item_from_file (queue);
+  else
+    item = g_queue_pop_head (queue->queue);
+
   if (item == NULL)
     goto no_item;
 
@@ -927,8 +1177,12 @@ gst_queue_is_empty (GstQueue * queue)
   if (queue->is_eos)
     return FALSE;
 
-  if (queue->queue->length == 0)
-    return TRUE;
+  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    return queue->writing_pos == queue->reading_pos;
+  } else {
+    if (queue->queue->length == 0)
+      return TRUE;
+  }
 
   return FALSE;
 }
@@ -942,6 +1196,10 @@ gst_queue_is_filled (GstQueue * queue)
   if (queue->is_eos)
     return TRUE;
 
+  /* if using file, we're never filled if we don't have EOS */
+  if (QUEUE_IS_USING_TEMP_FILE (queue))
+    return FALSE;
+
 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
                (queue->cur_level.format) >= (queue->max_level.format))
 
@@ -1221,6 +1479,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+        if (!gst_queue_open_temp_location_file (queue))
+          ret = GST_STATE_CHANGE_FAILURE;
+      }
+      queue->segment_event_received = FALSE;
+      queue->starting_segment = NULL;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;
@@ -1234,6 +1498,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      if (QUEUE_IS_USING_TEMP_FILE (queue))
+        gst_queue_close_temp_location_file (queue);
+      if (queue->starting_segment != NULL) {
+        gst_event_unref (queue->starting_segment);
+        queue->starting_segment = NULL;
+      }
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -1257,6 +1527,35 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
 #define QUEUE_THRESHOLD_CHANGE(q)\
   g_cond_signal (queue->item_add);
 
+static gboolean
+gst_queue_set_temp_location (GstQueue * queue, const gchar * location)
+{
+  GstState state;
+
+  /* the element must be stopped in order to do this */
+  GST_OBJECT_LOCK (queue);
+  state = GST_STATE (queue);
+  if (state != GST_STATE_READY && state != GST_STATE_NULL)
+    goto wrong_state;
+  GST_OBJECT_UNLOCK (queue);
+
+  /* set new location */
+  g_free (queue->temp_location);
+  queue->temp_location = g_strdup (location);
+
+  g_object_notify (G_OBJECT (queue), "temp-location");
+
+  return TRUE;
+
+/* ERROR */
+wrong_state:
+  {
+    GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state");
+    GST_OBJECT_UNLOCK (queue);
+    return FALSE;
+  }
+}
+
 static void
 gst_queue_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec)
@@ -1296,7 +1595,7 @@ gst_queue_set_property (GObject * object,
       queue->high_percent = g_value_get_int (value);
       break;
     case PROP_TEMP_LOCATION:
-      queue->temp_location = g_value_dup_string (value);
+      gst_queue_set_temp_location (queue, g_value_dup_string (value));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);