queue2: fix possible data corruption in ring buffer mode when seeking
authorTim-Philipp Müller <tim.muller@collabora.co.uk>
Thu, 24 May 2012 12:08:16 +0000 (13:08 +0100)
committerTim-Philipp Müller <tim@centricular.net>
Wed, 12 Sep 2012 11:59:50 +0000 (12:59 +0100)
Fix race that could cause data corruption when seeking in ring buffer
mode.

In perform_seek_to_offset(), called from the demuxer's pull_range
request, we drop the lock, tell upstream (usually a http source)
to seek to a different offset, then re-acquire the lock before we
do things to the ranges. However, between us sending the seek event
and re-acquiring the lock, the source thread might already have pushed
some data and moved along the range's writing_pos beyond the seek
offset. In that case we don't want to set the writing position back
to the requested seek position, as it would cause data to be written
to the wrong offset in the file or ring buffer.

Reproducible doing seek-emulated fast-forward/backward on 006653.

Conflicts:
plugins/elements/gstqueue2.c

plugins/elements/gstqueue2.c

index 13e9d22..3f12939 100644 (file)
@@ -549,7 +549,7 @@ update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
 
 /* make a new range for @offset or reuse an existing range */
 static GstQueue2Range *
-add_range (GstQueue2 * queue, guint64 offset)
+add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
 {
   GstQueue2Range *range, *prev, *next;
 
@@ -559,7 +559,11 @@ add_range (GstQueue2 * queue, guint64 offset)
     GST_DEBUG_OBJECT (queue,
         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
         range->writing_pos);
-    range->writing_pos = offset;
+    if (update_existing && range->writing_pos != offset) {
+      GST_DEBUG_OBJECT (queue, "updating range writing position to "
+          "%" G_GUINT64_FORMAT, offset);
+      range->writing_pos = offset;
+    }
   } else {
     GST_DEBUG_OBJECT (queue,
         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
@@ -612,7 +616,7 @@ init_ranges (GstQueue2 * queue)
   /* get rid of all the current ranges */
   clean_ranges (queue);
   /* make a range for offset 0 */
-  queue->current = add_range (queue, 0);
+  queue->current = add_range (queue, 0, TRUE);
 }
 
 /* calculate the diff between running time on the sink and src of the queue.
@@ -656,7 +660,7 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
   if (segment->format == GST_FORMAT_BYTES) {
     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
       /* start is where we'll be getting from and as such writing next */
-      queue->current = add_range (queue, segment->start);
+      queue->current = add_range (queue, segment->start, TRUE);
     }
   }
 
@@ -1010,8 +1014,16 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
   res = gst_pad_push_event (queue->sinkpad, event);
   GST_QUEUE2_MUTEX_LOCK (queue);
 
-  if (res)
-    queue->current = add_range (queue, offset);
+  if (res) {
+    /* Between us sending the seek event and re-acquiring the lock, the source
+     * thread might already have pushed data and moved along the range's
+     * writing_pos beyond the seek offset. In that case we don't want to set
+     * the writing position back to the requested seek position, as it would
+     * cause data to be written to the wrong offset in the file or ring buffer.
+     * We still do the add_range call to switch the current range to the
+     * requested range, or create one if one doesn't exist yet. */
+    queue->current = add_range (queue, offset, FALSE);
+  }
 
   return res;
 }
@@ -1550,6 +1562,14 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
       writing_pos);
 
+  /* sanity check */
+  if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
+      GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
+    GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
+        "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
+        GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
+  }
+
   while (size > 0) {
     guint to_write;