queue2: implement seeking in download mode
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 23 Mar 2010 18:25:29 +0000 (19:25 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 23 Mar 2010 18:28:34 +0000 (19:28 +0100)
When in download mode and the requested offset is too far away, attempt to do a
seek request to fetch the data.
Keep track of all downloaded parts and merge ranges when needed.

Fixes #600877

plugins/elements/gstqueue2.c
plugins/elements/gstqueue2.h

index f5f715a..532a317 100644 (file)
@@ -145,7 +145,7 @@ enum
                       queue->cur_level.time, \
                       queue->max_level.time, \
                       (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
-                        queue->writing_pos - queue->max_reading_pos : \
+                        queue->current->writing_pos - queue->current->max_reading_pos : \
                         queue->queue->length))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
@@ -452,6 +452,107 @@ gst_queue2_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+static void
+debug_ranges (GstQueue2 * queue)
+{
+  GstQueue2Range *walk;
+
+  for (walk = queue->ranges; walk; walk = walk->next) {
+    GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
+        walk->offset, walk->writing_pos);
+  }
+}
+
+/* clear all the downloaded ranges */
+static void
+clean_ranges (GstQueue2 * queue)
+{
+  GST_DEBUG_OBJECT (queue, "clean queue ranges");
+
+  g_slice_free_chain (GstQueue2Range, queue->ranges, next);
+  queue->ranges = NULL;
+  queue->current = NULL;
+}
+
+/* find a range that contains @offset or NULL when nothing does */
+static GstQueue2Range *
+find_range (GstQueue2 * queue, guint64 offset, guint64 length)
+{
+  GstQueue2Range *range, *walk;
+
+  /* first do a quick check for the current range */
+  for (walk = queue->ranges; walk; walk = walk->next) {
+    if (offset >= walk->offset && offset <= walk->writing_pos) {
+      /* we can reuse an existing range */
+      range = walk;
+      break;
+    }
+  }
+  return range;
+}
+
+/* make a new range for @offset or reuse an existing range */
+static GstQueue2Range *
+add_range (GstQueue2 * queue, guint64 offset)
+{
+  GstQueue2Range *range, *prev, *next;
+
+  GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
+
+  if ((range = find_range (queue, offset, 0))) {
+    GST_DEBUG_OBJECT (queue,
+        "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
+        range->writing_pos);
+    range->writing_pos = offset;
+  } else {
+    GST_DEBUG_OBJECT (queue,
+        "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
+
+    range = g_slice_new0 (GstQueue2Range);
+    range->offset = offset;
+    range->writing_pos = offset;
+    range->reading_pos = offset;
+    range->max_reading_pos = offset;
+
+    /* insert sorted */
+    prev = NULL;
+    next = queue->ranges;
+    while (next) {
+      if (next->offset > offset) {
+        /* insert before next */
+        GST_DEBUG_OBJECT (queue,
+            "insert before range %p, offset %" G_GUINT64_FORMAT, next,
+            next->offset);
+        break;
+      }
+      /* try next */
+      prev = next;
+      next = next->next;
+    }
+    range->next = next;
+    if (prev)
+      prev->next = range;
+    else
+      queue->ranges = range;
+  }
+  debug_ranges (queue);
+
+  return range;
+}
+
+
+/* clear and init the download ranges for offset 0 */
+static void
+init_ranges (GstQueue2 * queue)
+{
+  GST_DEBUG_OBJECT (queue, "init queue ranges");
+
+  /* get rid of all the current ranges */
+  clean_ranges (queue);
+  /* make a range for offset 0 */
+  queue->current = add_range (queue, 0);
+}
+
 static gboolean
 gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
 {
@@ -653,7 +754,7 @@ update_buffering (GstQueue2 * queue)
         if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
           buffering_left =
               (gdouble) ((duration -
-                  queue->writing_pos) * 1000) / queue->byte_in_rate;
+                  queue->current->writing_pos) * 1000) / queue->byte_in_rate;
       } else {
         buffering_left = G_MAXINT64;
       }
@@ -785,13 +886,18 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
   guint size;
   guint8 *data;
   int ret;
+  guint64 writing_pos, max_reading_pos;
+  GstQueue2Range *next;
+
+  writing_pos = queue->current->writing_pos;
+  max_reading_pos = queue->current->max_reading_pos;
 
 #ifdef HAVE_FSEEKO
-  fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET);
+  fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET);
 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-  lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET);
+  lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET);
 #else
-  fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
+  fseek (queue->temp_file, writing_pos, SEEK_SET);
 #endif
 
   data = GST_BUFFER_DATA (buffer);
@@ -802,26 +908,87 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
     /* FIXME do something useful here */
     GST_ERROR_OBJECT (queue, "fwrite returned error");
   }
-  queue->writing_pos += size;
+  writing_pos += size;
 
-  if (queue->writing_pos > queue->max_reading_pos)
-    queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
+  if (writing_pos > max_reading_pos)
+    queue->cur_level.bytes = writing_pos - max_reading_pos;
   else
     queue->cur_level.bytes = 0;
+
+  /* try to merge with next range */
+  while ((next = queue->current->next)) {
+    GST_DEBUG_OBJECT (queue,
+        "cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos,
+        next->offset);
+    if (writing_pos < next->offset)
+      break;
+
+    GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
+        next->writing_pos);
+    /* we ran over the offset of the next group */
+    queue->current->writing_pos = writing_pos = next->writing_pos;
+
+    /* remove the group */
+    queue->current->next = next->next;
+    g_slice_free (GstQueue2Range, next);
+
+    debug_ranges (queue);
+  }
+  queue->current->writing_pos = writing_pos;
+}
+
+static gboolean
+perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
+{
+  GstEvent *event;
+  gboolean res;
+
+  GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
+
+  event =
+      gst_event_new_seek (1.0, GST_FORMAT_BYTES,
+      GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
+      GST_SEEK_TYPE_NONE, -1);
+
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+  res = gst_pad_push_event (queue->sinkpad, event);
+  GST_QUEUE2_MUTEX_LOCK (queue);
+
+  if (res) {
+    queue->current = add_range (queue, offset);
+  }
+  return res;
 }
 
 /* see if there is enough data in the file to read a full buffer */
 static gboolean
 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
 {
-  GST_DEBUG_OBJECT (queue,
-      "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
-      length, queue->writing_pos);
-  if (queue->is_eos)
-    return TRUE;
+  GstQueue2Range *range;
 
-  if (offset + length < queue->writing_pos)
-    return TRUE;
+  GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset,
+      length);
+
+  if ((range = find_range (queue, offset, length))) {
+    if (queue->current != range) {
+      GST_DEBUG_OBJECT (queue, "switching ranges");
+      perform_seek_to_offset (queue, range->writing_pos);
+    }
+    /* we have a range for offset */
+    GST_DEBUG_OBJECT (queue,
+        "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
+        G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
+
+    if (queue->is_eos)
+      return TRUE;
+
+    if (offset + length < range->writing_pos)
+      return TRUE;
+
+  } else {
+    /* we don't have the range, see how far away we are */
+    perform_seek_to_offset (queue, offset);
+  }
 
   return FALSE;
 }
@@ -832,6 +999,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 {
   size_t res;
   GstBuffer *buf;
+  guint64 reading_pos, max_reading_pos, writing_pos;
 
   /* check if we have enough data at @offset. If there is not enough data, we
    * block and wait. */
@@ -874,14 +1042,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
   *buffer = buf;
 
-  queue->reading_pos = offset + length;
-  queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
+  reading_pos = queue->current->reading_pos;
+  writing_pos = queue->current->writing_pos;
+  max_reading_pos = queue->current->max_reading_pos;
+
+  reading_pos = offset + length;
+  max_reading_pos = MAX (max_reading_pos, reading_pos);
 
-  if (queue->writing_pos > queue->max_reading_pos)
-    queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
+  if (writing_pos > max_reading_pos)
+    queue->cur_level.bytes = writing_pos - max_reading_pos;
   else
     queue->cur_level.bytes = 0;
 
+  queue->current->reading_pos = reading_pos;
+  queue->current->max_reading_pos = max_reading_pos;
+
   return GST_FLOW_OK;
 
   /* ERRORS */
@@ -921,9 +1096,12 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
   } else {
     GstFlowReturn ret;
     GstBuffer *buffer;
+    guint64 reading_pos;
+
+    reading_pos = queue->current->reading_pos;
 
     ret =
-        gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
+        gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
         &buffer);
     switch (ret) {
       case GST_FLOW_OK:
@@ -946,6 +1124,9 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
   gint fd = -1;
   gchar *name = NULL;
 
+  if (queue->temp_file)
+    goto already_opened;
+
   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
 
   /* we have two cases:
@@ -982,14 +1163,18 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
     if (queue->temp_file == NULL)
       goto open_failed;
   }
+  GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
 
-  queue->writing_pos = 0;
-  queue->reading_pos = 0;
-  queue->max_reading_pos = 0;
+  init_ranges (queue);
 
   return TRUE;
 
   /* ERRORS */
+already_opened:
+  {
+    GST_DEBUG_OBJECT (queue, "temp file was already open");
+    return TRUE;
+  }
 no_directory:
   {
     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
@@ -1031,6 +1216,7 @@ gst_queue2_close_temp_location_file (GstQueue2 * queue)
     remove (queue->temp_location);
 
   queue->temp_file = NULL;
+  clean_ranges (queue);
 }
 
 static void
@@ -1043,9 +1229,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
 
   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
 
-  queue->writing_pos = 0;
-  queue->reading_pos = 0;
-  queue->max_reading_pos = 0;
+  init_ranges (queue);
 }
 
 static void
@@ -1108,6 +1292,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
       case GST_EVENT_EOS:
         /* Zero the thresholds, this makes sure the queue is completely
          * filled and we can read all data from the queue. */
+        GST_DEBUG_OBJECT (queue, "we have EOS");
         queue->is_eos = TRUE;
         break;
       case GST_EVENT_NEWSEGMENT:
@@ -1245,39 +1430,50 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
+      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+        /* forward event */
+        gst_pad_push_event (queue->srcpad, event);
 
-      /* now unblock the chain function */
-      GST_QUEUE2_MUTEX_LOCK (queue);
-      queue->srcresult = GST_FLOW_WRONG_STATE;
-      /* unblock the loop and chain functions */
-      g_cond_signal (queue->item_add);
-      g_cond_signal (queue->item_del);
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
+        /* now unblock the chain function */
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        queue->srcresult = GST_FLOW_WRONG_STATE;
+        /* unblock the loop and chain functions */
+        g_cond_signal (queue->item_add);
+        g_cond_signal (queue->item_del);
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
 
-      /* make sure it pauses, this should happen since we sent
-       * flush_start downstream. */
-      gst_pad_pause_task (queue->srcpad);
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+        /* make sure it pauses, this should happen since we sent
+         * flush_start downstream. */
+        gst_pad_pause_task (queue->srcpad);
+        GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+      }
       goto done;
     }
     case GST_EVENT_FLUSH_STOP:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
-      /* forward event */
-      gst_pad_push_event (queue->srcpad, event);
 
-      GST_QUEUE2_MUTEX_LOCK (queue);
-      gst_queue2_locked_flush (queue);
-      queue->srcresult = GST_FLOW_OK;
-      queue->is_eos = FALSE;
-      queue->unexpected = FALSE;
-      /* reset rate counters */
-      reset_rate_timer (queue);
-      gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
-          queue->srcpad);
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
+      if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
+        /* forward event */
+        gst_pad_push_event (queue->srcpad, event);
+
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        gst_queue2_locked_flush (queue);
+        queue->srcresult = GST_FLOW_OK;
+        queue->is_eos = FALSE;
+        queue->unexpected = FALSE;
+        /* reset rate counters */
+        reset_rate_timer (queue);
+        gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
+            queue->srcpad);
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
+      } else {
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        queue->segment_event_received = FALSE;
+        queue->is_eos = FALSE;
+        queue->unexpected = FALSE;
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
+      }
       goto done;
     }
     default:
@@ -1323,7 +1519,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
     return FALSE;
 
   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
-    return queue->writing_pos == queue->max_reading_pos;
+    return queue->current->writing_pos == queue->current->max_reading_pos;
   } else {
     if (queue->queue->length == 0)
       return TRUE;
@@ -1626,7 +1822,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
     /* just forward upstream */
     res = gst_pad_push_event (queue->sinkpad, event);
   } else {
-    /* when using a temp file, we unblock the pending read */
+    /* when using a temp file, we eat the event */
     res = TRUE;
     gst_event_unref (event);
   }
@@ -1706,6 +1902,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
       } else {
         gint64 start, stop;
+        guint64 writing_pos;
 
         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
 
@@ -1721,20 +1918,22 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
                     &duration))
               goto peer_failed;
 
+            writing_pos = queue->current->writing_pos;
+
             GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
-                G_GINT64_FORMAT, duration, queue->writing_pos);
+                G_GINT64_FORMAT, duration, writing_pos);
 
             start = 0;
             /* get our available data relative to the duration */
             if (duration != -1)
-              stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration;
+              stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
             else
               stop = -1;
             break;
           }
           case GST_FORMAT_BYTES:
             start = 0;
-            stop = queue->writing_pos;
+            stop = queue->current->writing_pos;
             break;
           default:
             start = -1;
@@ -1774,7 +1973,7 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
 
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
-  offset = (offset == -1) ? queue->reading_pos : offset;
+  offset = (offset == -1) ? queue->current->reading_pos : offset;
 
   /* function will block when the range is not yet available */
   ret = gst_queue2_create_read (queue, offset, length, buffer);
@@ -1887,12 +2086,14 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
 
   if (active) {
     if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+      /* open the temp file now */
+      result = gst_queue2_open_temp_location_file (queue);
+
       GST_QUEUE2_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "activating pull mode");
       queue->srcresult = GST_FLOW_OK;
       queue->is_eos = FALSE;
       queue->unexpected = FALSE;
-      result = TRUE;
       GST_QUEUE2_MUTEX_UNLOCK (queue);
     } else {
       GST_QUEUE2_MUTEX_LOCK (queue);
index 55ad92b..a47f883 100644 (file)
@@ -45,6 +45,7 @@ G_BEGIN_DECLS
 typedef struct _GstQueue2 GstQueue2;
 typedef struct _GstQueue2Size GstQueue2Size;
 typedef struct _GstQueue2Class GstQueue2Class;
+typedef struct _GstQueue2Range GstQueue2Range;
 
 /* used to keep track of sizes (current and max) */
 struct _GstQueue2Size
@@ -55,6 +56,16 @@ struct _GstQueue2Size
   guint64 rate_time;
 };
 
+struct _GstQueue2Range
+{
+  GstQueue2Range *next;
+
+  guint64 offset;
+  guint64 writing_pos;
+  guint64 reading_pos;
+  guint64 max_reading_pos;
+};
+
 struct _GstQueue2
 {
   GstElement element;
@@ -112,14 +123,13 @@ struct _GstQueue2
   gchar *temp_location;
   gboolean temp_remove;
   FILE *temp_file;
-  guint64 writing_pos;
-  guint64 reading_pos;
-  guint64 max_reading_pos;
+  /* list of downloaded areas and the current area */
+  GstQueue2Range *ranges;
+  GstQueue2Range *current;
   /* we need this to send the first new segment event of the stream
    * because we can't save it on the file */
   gboolean segment_event_received;
   GstEvent *starting_segment;
-
 };
 
 struct _GstQueue2Class