matroskademux: implement push mode seeking
authorMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Wed, 14 Apr 2010 09:53:46 +0000 (11:53 +0200)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Fri, 30 Apr 2010 11:49:39 +0000 (13:49 +0200)
gst/matroska/matroska-demux.c
gst/matroska/matroska-demux.h

index 59b2cb1a5b830c7926a0a23f78622002bbac6dc8..7feda07ae49f44716e420cfc0916e4e9c4f45db7 100644 (file)
@@ -175,6 +175,8 @@ static GstCaps
 
 /* stream methods */
 static void gst_matroska_demux_reset (GstElement * element);
+static gboolean perform_seek_to_offset (GstMatroskaDemux * demux,
+    guint64 offset);
 
 GType gst_matroska_demux_get_type (void);
 GST_BOILERPLATE (GstMatroskaDemux, gst_matroska_demux, GstEbmlRead,
@@ -408,6 +410,14 @@ gst_matroska_demux_reset (GstElement * element)
   demux->offset = 0;
   demux->cluster_time = GST_CLOCK_TIME_NONE;
   demux->cluster_offset = 0;
+  demux->index_offset = 0;
+  demux->seekable = FALSE;
+  demux->need_newsegment = FALSE;
+  demux->building_index = FALSE;
+  if (demux->seek_event) {
+    gst_event_unref (demux->seek_event);
+    demux->seek_event = NULL;
+  }
 
   demux->from_offset = -1;
   demux->to_offset = G_MAXINT64;
@@ -1969,19 +1979,25 @@ gst_matroska_demux_query (GstMatroskaDemux * demux, GstPad * pad,
       break;
     }
 
-    case GST_QUERY_SEEKING:{
+    case GST_QUERY_SEEKING:
+    {
       GstFormat fmt;
 
-      res = TRUE;
       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
+      if (fmt == GST_FORMAT_TIME) {
+        gboolean seekable;
 
-      if (fmt != GST_FORMAT_TIME || !demux->index) {
-        gst_query_set_seeking (query, fmt, FALSE, -1, -1);
-      } else {
-        gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
-            demux->segment.duration);
-      }
+        if (demux->streaming) {
+          /* assuming we'll be able to get an index ... */
+          seekable = demux->seekable;
+        } else {
+          seekable = !!demux->index;
+        }
 
+        gst_query_set_seeking (query, GST_FORMAT_TIME, seekable,
+            0, demux->segment.duration);
+        res = TRUE;
+      }
       break;
     }
     default:
@@ -2164,12 +2180,30 @@ gst_matroska_demux_get_seek_track (GstMatroskaDemux * demux,
   return track;
 }
 
+static void
+gst_matroska_demux_reset_streams (GstMatroskaDemux * demux, GstClockTime time,
+    gboolean full)
+{
+  gint i;
+
+  GST_DEBUG_OBJECT (demux, "resetting stream state");
+
+  g_assert (demux->src->len == demux->num_streams);
+  for (i = 0; i < demux->src->len; i++) {
+    GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
+    context->pos = time;
+    context->set_discont = TRUE;
+    context->eos = FALSE;
+    context->from_time = GST_CLOCK_TIME_NONE;
+    if (full)
+      context->last_flow = GST_FLOW_OK;
+  }
+}
+
 static gboolean
 gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux,
     GstMatroskaIndex * entry, gboolean reset)
 {
-  gint i;
-
   GST_OBJECT_LOCK (demux);
 
   /* seek (relative to matroska segment) */
@@ -2185,15 +2219,7 @@ gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux,
       entry->block, GST_TIME_ARGS (entry->time));
 
   /* update the time */
-  g_assert (demux->src->len == demux->num_streams);
-  for (i = 0; i < demux->src->len; i++) {
-    GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
-    context->pos = entry->time;
-    context->set_discont = TRUE;
-    context->last_flow = GST_FLOW_OK;
-    context->eos = FALSE;
-    context->from_time = GST_CLOCK_TIME_NONE;
-  }
+  gst_matroska_demux_reset_streams (demux, entry->time, TRUE);
   demux->segment.last_stop = entry->time;
   demux->seek_block = entry->block;
   demux->last_stop_end = GST_CLOCK_TIME_NONE;
@@ -2230,12 +2256,6 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux,
   GstSegment seeksegment = { 0, };
   gboolean update;
 
-  /* no seeking until we are (safely) ready */
-  if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
-    GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
-    return FALSE;
-  }
-
   if (pad)
     track = gst_pad_get_element_private (pad);
 
@@ -2274,6 +2294,14 @@ gst_matroska_demux_handle_seek_event (GstMatroskaDemux * demux,
   GST_DEBUG_OBJECT (demux, "Seek position looks sane");
   GST_OBJECT_UNLOCK (demux);
 
+  if (demux->streaming) {
+    /* need to seek to cluster start to pick up cluster time */
+    /* upstream takes care of flushing and all that
+     * ... and newsegment event handling takes care of the rest */
+    return perform_seek_to_offset (demux,
+        entry->pos + demux->ebml_segment_start);
+  }
+
   flush = !!(flags & GST_SEEK_FLAG_FLUSH);
   keyunit = !!(flags & GST_SEEK_FLAG_KEY_UNIT);
 
@@ -2364,6 +2392,90 @@ seek_error:
   }
 }
 
+/*
+ * Handle whether we can perform the seek event or if we have to let the chain
+ * function handle seeks to build the seek indexes first.
+ */
+static gboolean
+gst_matroska_demux_handle_seek_push (GstMatroskaDemux * demux, GstPad * pad,
+    GstEvent * event)
+{
+  GstSeekFlags flags;
+  GstSeekType cur_type, stop_type;
+  GstFormat format;
+  gdouble rate;
+  gint64 cur, stop;
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+      &stop_type, &stop);
+
+  /* sanity checks */
+
+  /* we can only seek on time */
+  if (format != GST_FORMAT_TIME) {
+    GST_DEBUG_OBJECT (demux, "Can only seek on TIME");
+    return FALSE;
+  }
+
+  if (stop_type != GST_SEEK_TYPE_NONE && stop != GST_CLOCK_TIME_NONE) {
+    GST_DEBUG_OBJECT (demux, "Seek end-time not supported in streaming mode");
+    return FALSE;
+  }
+
+  if (!(flags & GST_SEEK_FLAG_FLUSH)) {
+    GST_DEBUG_OBJECT (demux,
+        "Non-flushing seek not supported in streaming mode");
+    return FALSE;
+  }
+
+  if (flags & GST_SEEK_FLAG_SEGMENT) {
+    GST_DEBUG_OBJECT (demux, "Segment seek not supported in streaming mode");
+    return FALSE;
+  }
+
+  /* check for having parsed index already */
+  if (!demux->index_parsed) {
+    guint64 offset;
+    gboolean building_index;
+
+    if (!demux->index_offset) {
+      GST_DEBUG_OBJECT (demux, "no index (location); no seek in push mode");
+      return FALSE;
+    }
+
+    GST_OBJECT_LOCK (demux);
+    /* handle the seek event in the chain function */
+    demux->state = GST_MATROSKA_DEMUX_STATE_SEEK;
+    /* no more seek can be issued until state reset to _DATA */
+
+    /* copy the event */
+    if (demux->seek_event)
+      gst_event_unref (demux->seek_event);
+    demux->seek_event = gst_event_ref (event);
+
+    /* set the building_index flag so that only one thread can setup the
+     * structures for index seeking. */
+    building_index = demux->building_index;
+    if (!building_index) {
+      demux->building_index = TRUE;
+      offset = demux->index_offset;
+    }
+    GST_OBJECT_UNLOCK (demux);
+
+    if (!building_index) {
+      /* seek to the first subindex or legacy index */
+      GST_INFO_OBJECT (demux, "Seeking to Cues at %" G_GUINT64_FORMAT, offset);
+      return perform_seek_to_offset (demux, offset);
+    }
+
+    /* well, we are handling it already */
+    return TRUE;
+  }
+
+  /* delegate to tweaked regular seek */
+  return gst_matroska_demux_handle_seek_event (demux, pad, event);
+}
+
 static gboolean
 gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
 {
@@ -2372,7 +2484,15 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_SEEK:
-      res = gst_matroska_demux_handle_seek_event (demux, pad, event);
+      /* no seeking until we are (safely) ready */
+      if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
+        GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
+        return FALSE;
+      }
+      if (!demux->streaming)
+        res = gst_matroska_demux_handle_seek_event (demux, pad, event);
+      else
+        res = gst_matroska_demux_handle_seek_push (demux, pad, event);
       gst_event_unref (event);
       break;
 
@@ -4464,6 +4584,22 @@ gst_matroska_demux_parse_blockgroup_or_simpleblock (GstMatroskaDemux * demux,
       lace_time = GST_CLOCK_TIME_NONE;
     }
 
+    /* need to refresh segment info ASAP */
+    if (GST_CLOCK_TIME_IS_VALID (lace_time) && demux->need_newsegment) {
+      GST_DEBUG_OBJECT (demux,
+          "generating segment starting at %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (lace_time));
+      /* pretend we seeked here */
+      gst_segment_set_seek (&demux->segment, demux->segment.rate,
+          GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, lace_time,
+          GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE, NULL);
+      /* now convey our segment notion downstream */
+      gst_matroska_demux_send_event (demux, gst_event_new_new_segment (FALSE,
+              demux->segment.rate, demux->segment.format, demux->segment.start,
+              demux->segment.stop, demux->segment.start));
+      demux->need_newsegment = FALSE;
+    }
+
     if (block_duration) {
       if (stream->timecodescale == 1.0)
         duration = block_duration * demux->time_scale;
@@ -4902,6 +5038,16 @@ gst_matroska_demux_parse_contents_seekentry (GstMatroskaDemux * demux)
         break;
       }
 
+      /* only pick up index location when streaming */
+      if (demux->streaming) {
+        if (seek_id == GST_MATROSKA_ID_CUES) {
+          demux->index_offset = seek_pos + demux->ebml_segment_start;
+          GST_DEBUG_OBJECT (demux, "Cues located at offset %" G_GUINT64_FORMAT,
+              demux->index_offset);
+        }
+        break;
+      }
+
       /* seek */
       if (gst_ebml_read_seek (ebml, seek_pos + demux->ebml_segment_start) !=
           GST_FLOW_OK)
@@ -5404,6 +5550,66 @@ pause:
   }
 }
 
+/*
+ * Create and push a flushing seek event upstream
+ */
+static gboolean
+perform_seek_to_offset (GstMatroskaDemux * demux, guint64 offset)
+{
+  GstEvent *event;
+  gboolean res = 0;
+
+  GST_DEBUG_OBJECT (demux, "Seeking to %" G_GUINT64_FORMAT, offset);
+
+  event =
+      gst_event_new_seek (1.0, GST_FORMAT_BYTES,
+      GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
+      GST_SEEK_TYPE_NONE, -1);
+
+  res = gst_pad_push_event (demux->sinkpad, event);
+
+  /* newsegment event will update offset */
+  return res;
+}
+
+static void
+gst_matroska_demux_check_seekability (GstMatroskaDemux * demux)
+{
+  GstQuery *query;
+  gboolean seekable = FALSE;
+  gint64 start = -1, stop = -1;
+
+  query = gst_query_new_seeking (GST_FORMAT_BYTES);
+  if (!gst_pad_peer_query (demux->sinkpad, query)) {
+    GST_DEBUG_OBJECT (demux, "seeking query failed");
+    goto done;
+  }
+
+  gst_query_parse_seeking (query, NULL, &seekable, &start, &stop);
+
+  /* try harder to query upstream size if we didn't get it the first time */
+  if (seekable && stop == -1) {
+    GstFormat fmt = GST_FORMAT_BYTES;
+
+    GST_DEBUG_OBJECT (demux, "doing duration query to fix up unset stop");
+    gst_pad_query_peer_duration (demux->sinkpad, &fmt, &stop);
+  }
+
+  /* if upstream doesn't know the size, it's likely that it's not seekable in
+   * practice even if it technically may be seekable */
+  if (seekable && (start != 0 || stop <= start)) {
+    GST_DEBUG_OBJECT (demux, "seekable but unknown start/stop -> disable");
+    seekable = FALSE;
+  }
+
+done:
+  GST_INFO_OBJECT (demux, "seekable: %d (%" G_GUINT64_FORMAT " - %"
+      G_GUINT64_FORMAT ")", seekable, start, stop);
+  demux->seekable = seekable;
+
+  gst_query_unref (query);
+}
+
 static inline void
 gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes)
 {
@@ -5538,6 +5744,12 @@ gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer)
   guint64 length;
   const gchar *name;
 
+  if (G_UNLIKELY (GST_BUFFER_IS_DISCONT (buffer))) {
+    GST_DEBUG_OBJECT (demux, "got DISCONT");
+    gst_adapter_clear (demux->adapter);
+    gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, FALSE);
+  }
+
   gst_adapter_push (demux->adapter, buffer);
   buffer = NULL;
 
@@ -5577,6 +5789,7 @@ next:
           if (ret != GST_FLOW_OK)
             goto parse_failed;
           demux->state = GST_MATROSKA_DEMUX_STATE_HEADER;
+          gst_matroska_demux_check_seekability (demux);
           break;
         default:
           goto invalid_header;
@@ -5585,6 +5798,7 @@ next:
       break;
     case GST_MATROSKA_DEMUX_STATE_HEADER:
     case GST_MATROSKA_DEMUX_STATE_DATA:
+    case GST_MATROSKA_DEMUX_STATE_SEEK:
       switch (id) {
         case GST_MATROSKA_ID_SEGMENT:
           /* eat segment prefix */
@@ -5678,11 +5892,38 @@ next:
           name = "Cues";
           goto skip;
         case GST_MATROSKA_ID_SEEKHEAD:
-          name = "SeekHead";
-          goto skip;
+          gst_matroska_demux_take (demux, length + needed);
+          DEBUG_ELEMENT_START (demux, ebml, "SeekHead");
+          if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK)
+            ret = gst_matroska_demux_parse_contents (demux);
+          if (ret != GST_FLOW_OK)
+            goto exit;
+          break;
         case GST_MATROSKA_ID_CUES:
-          name = "Cues";
-          goto skip;
+          if (demux->index_parsed) {
+            gst_matroska_demux_flush (demux, needed + length);
+            break;
+          }
+          gst_matroska_demux_take (demux, length + needed);
+          ret = gst_matroska_demux_parse_index (demux);
+          if (demux->state == GST_MATROSKA_DEMUX_STATE_SEEK) {
+            GstEvent *event;
+
+            GST_OBJECT_LOCK (demux);
+            event = demux->seek_event;
+            demux->seek_event = NULL;
+            GST_OBJECT_UNLOCK (demux);
+
+            g_assert (event);
+            /* unlikely to fail, since we managed to seek to this point */
+            if (!gst_matroska_demux_handle_seek_event (demux, NULL, event))
+              goto seek_failed;
+            /* resume data handling, main thread clear to seek again */
+            GST_OBJECT_LOCK (demux);
+            demux->state = GST_MATROSKA_DEMUX_STATE_DATA;
+            GST_OBJECT_UNLOCK (demux);
+          }
+          break;
         default:
           name = "Unknown";
         skip:
@@ -5719,6 +5960,11 @@ invalid_header:
     GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header"));
     return GST_FLOW_ERROR;
   }
+seek_failed:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Failed to seek"));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static gboolean
@@ -5749,7 +5995,33 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
           "received format %d newsegment %" GST_SEGMENT_FORMAT, format,
           &segment);
 
-      /* chain will send initial newsegment after pads have been added */
+      if (demux->state < GST_MATROSKA_DEMUX_STATE_DATA) {
+        GST_DEBUG_OBJECT (demux, "still starting");
+        goto exit;
+      }
+
+      /* we only expect a BYTE segment, e.g. following a seek */
+      if (format != GST_FORMAT_BYTES) {
+        GST_DEBUG_OBJECT (demux, "unsupported segment format, ignoring");
+        goto exit;
+      }
+
+      GST_DEBUG_OBJECT (demux, "clearing segment state");
+      /* clear current segment leftover */
+      gst_adapter_clear (demux->adapter);
+      /* and some streaming setup */
+      demux->offset = start;
+      /* do not know where we are;
+       * need to come across a cluster and generate newsegment */
+      demux->segment.last_stop = GST_CLOCK_TIME_NONE;
+      demux->cluster_time = GST_CLOCK_TIME_NONE;
+      demux->cluster_offset = 0;
+      demux->need_newsegment = TRUE;
+      /* but keep some of the upstream segment */
+      demux->segment.rate = rate;
+    exit:
+      /* chain will send initial newsegment after pads have been added,
+       * or otherwise come up with one */
       GST_DEBUG_OBJECT (demux, "eating event");
       gst_event_unref (event);
       res = TRUE;
@@ -5769,6 +6041,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
       }
       break;
     }
+    case GST_EVENT_FLUSH_STOP:
+    {
+      gst_adapter_clear (demux->adapter);
+      gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, TRUE);
+      demux->segment.last_stop = GST_CLOCK_TIME_NONE;
+      demux->cluster_time = GST_CLOCK_TIME_NONE;
+      demux->cluster_offset = 0;
+      /* fall-through */
+    }
     default:
       res = gst_pad_event_default (pad, event);
       break;
@@ -5780,11 +6061,15 @@ gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
 static gboolean
 gst_matroska_demux_sink_activate (GstPad * sinkpad)
 {
+  GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (sinkpad));
+
   if (gst_pad_check_pull_range (sinkpad)) {
     GST_DEBUG ("going to pull mode");
+    demux->streaming = FALSE;
     return gst_pad_activate_pull (sinkpad, TRUE);
   } else {
     GST_DEBUG ("going to push (streaming) mode");
+    demux->streaming = TRUE;
     return gst_pad_activate_push (sinkpad, TRUE);
   }
 
index 24d72eebf95c700e9f49726006c2ef35b3e63255..6c14d79aaef27dcd0f422468e815eefaa2efa6c8 100644 (file)
@@ -44,7 +44,8 @@ G_BEGIN_DECLS
 typedef enum {
   GST_MATROSKA_DEMUX_STATE_START,
   GST_MATROSKA_DEMUX_STATE_HEADER,
-  GST_MATROSKA_DEMUX_STATE_DATA
+  GST_MATROSKA_DEMUX_STATE_DATA,
+  GST_MATROSKA_DEMUX_STATE_SEEK
 } GstMatroskaDemuxState;
 
 typedef struct _GstMatroskaDemux {
@@ -70,6 +71,7 @@ typedef struct _GstMatroskaDemux {
   gint64                   created;
 
   /* state */
+  gboolean                 streaming;
   GstMatroskaDemuxState    state;
   guint                    level_up;
   guint64                  seek_block;
@@ -105,6 +107,12 @@ typedef struct _GstMatroskaDemux {
   /* some state saving */
   GstClockTime             cluster_time;
   guint64                  cluster_offset;
+  /* index stuff */
+  gboolean                 seekable;
+  gboolean                 building_index;
+  guint64                  index_offset;
+  GstEvent                *seek_event;
+  gboolean                 need_newsegment;
 
   /* reverse playback */
   GArray                  *seek_index;