matroskademux: support push based mode
authorMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Wed, 16 Dec 2009 11:43:27 +0000 (12:43 +0100)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Wed, 16 Dec 2009 11:46:40 +0000 (12:46 +0100)
Fixes #598610.

gst/matroska/ebml-read.c
gst/matroska/ebml-read.h
gst/matroska/matroska-demux.c
gst/matroska/matroska-demux.h

index aa3da97..5a8eea5 100644 (file)
@@ -171,6 +171,29 @@ gst_ebml_read_change_state (GstElement * element, GstStateChange transition)
 }
 
 /*
+ * Used in push mode.
+ * Provided buffer is used as cache, based on offset 0, and no further reads
+ * will be issued.
+ */
+
+void
+gst_ebml_read_reset_cache (GstEbmlRead * ebml, GstBuffer * buffer,
+    guint64 offset)
+{
+  if (ebml->cached_buffer)
+    gst_buffer_unref (ebml->cached_buffer);
+
+  ebml->cached_buffer = buffer;
+  ebml->push_cache = TRUE;
+  buffer = gst_buffer_make_metadata_writable (buffer);
+  GST_BUFFER_OFFSET (buffer) = offset;
+  ebml->offset = offset;
+  g_list_foreach (ebml->level, (GFunc) gst_ebml_level_free, NULL);
+  g_list_free (ebml->level);
+  ebml->level = NULL;
+}
+
+/*
  * Return: the amount of levels in the hierarchy that the
  * current element lies higher than the previous one.
  * The opposite isn't done - that's auto-done using master
@@ -224,6 +247,13 @@ gst_ebml_read_peek_bytes (GstEbmlRead * ebml, guint size, GstBuffer ** p_buf,
       return GST_FLOW_OK;
     }
     /* not enough data in the cache, free cache and get a new one */
+    /* never drop pushed cache */
+    if (ebml->push_cache) {
+      if (ebml->offset == cache_offset + cache_size)
+        return GST_FLOW_END;
+      else
+        return GST_FLOW_UNEXPECTED;
+    }
     gst_buffer_unref (ebml->cached_buffer);
     ebml->cached_buffer = NULL;
   }
@@ -437,8 +467,17 @@ gst_ebml_peek_id (GstEbmlRead * ebml, guint * level_up, guint32 * id)
 next:
   off = ebml->offset;           /* save offset */
 
-  if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK)
-    return ret;
+  if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK) {
+    if (ret != GST_FLOW_END)
+      return ret;
+    else {
+      /* simulate dummy VOID element,
+       * and have the call stack bail out all the way */
+      *id = GST_EBML_ID_VOID;
+      *level_up = G_MAXUINT32 >> 2;
+      return GST_FLOW_OK;
+    }
+  }
 
   ebml->offset = off;           /* restore offset */
 
index 2d5a5a1..d23d129 100644 (file)
@@ -39,6 +39,9 @@ G_BEGIN_DECLS
 #define GST_EBML_READ_GET_CLASS(obj) \
   (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_EBML_READ, GstEbmlReadClass))
 
+/* custom flow return code */
+#define  GST_FLOW_END  GST_FLOW_CUSTOM_SUCCESS
+
 typedef struct _GstEbmlLevel {
   guint64 start;
   guint64 length;
@@ -48,6 +51,7 @@ typedef struct _GstEbmlRead {
   GstElement parent;
 
   GstBuffer *cached_buffer;
+  gboolean push_cache;
 
   GstPad *sinkpad;
   guint64 offset;
@@ -63,6 +67,10 @@ GType    gst_ebml_read_get_type          (void);
 
 void          gst_ebml_level_free        (GstEbmlLevel *level);
 
+void          gst_ebml_read_reset_cache (GstEbmlRead * ebml,
+                                          GstBuffer * buffer,
+                                          guint64 offset);
+
 GstFlowReturn gst_ebml_peek_id           (GstEbmlRead *ebml,
                                           guint       *level_up,
                                           guint32     *id);
index f684410..f5d3b52 100644 (file)
@@ -149,6 +149,11 @@ static const GstQueryType *gst_matroska_demux_get_src_query_types (GstPad *
 static gboolean gst_matroska_demux_handle_src_query (GstPad * pad,
     GstQuery * query);
 
+static gboolean gst_matroska_demux_handle_sink_event (GstPad * pad,
+    GstEvent * event);
+static GstFlowReturn gst_matroska_demux_chain (GstPad * pad,
+    GstBuffer * buffer);
+
 static GstStateChangeReturn
 gst_matroska_demux_change_state (GstElement * element,
     GstStateChange transition);
@@ -208,6 +213,8 @@ gst_matroska_demux_finalize (GObject * object)
     demux->global_tags = NULL;
   }
 
+  g_object_unref (demux->adapter);
+
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -244,6 +251,10 @@ gst_matroska_demux_init (GstMatroskaDemux * demux,
       GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate));
   gst_pad_set_activatepull_function (demux->sinkpad,
       GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate_pull));
+  gst_pad_set_chain_function (demux->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_matroska_demux_chain));
+  gst_pad_set_event_function (demux->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_matroska_demux_handle_sink_event));
   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
   GST_EBML_READ (demux)->sinkpad = demux->sinkpad;
 
@@ -255,6 +266,8 @@ gst_matroska_demux_init (GstMatroskaDemux * demux,
   demux->index = NULL;
   demux->global_tags = NULL;
 
+  demux->adapter = gst_adapter_new ();
+
   /* finish off */
   gst_matroska_demux_reset (GST_ELEMENT (demux));
 }
@@ -390,6 +403,10 @@ gst_matroska_demux_reset (GstElement * element)
   demux->duration = -1;
   demux->last_stop_end = GST_CLOCK_TIME_NONE;
 
+  demux->offset = 0;
+  demux->cluster_time = GST_CLOCK_TIME_NONE;
+  demux->cluster_offset = 0;
+
   if (demux->close_segment) {
     gst_event_unref (demux->close_segment);
     demux->close_segment = NULL;
@@ -2339,15 +2356,12 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
 }
 
 static GstFlowReturn
-gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
+gst_matroska_demux_parse_header (GstMatroskaDemux * demux)
 {
   GstEbmlRead *ebml = GST_EBML_READ (demux);
-  guint32 id;
+  GstFlowReturn ret;
   gchar *doctype;
   guint version;
-  GstFlowReturn ret;
-
-  GST_DEBUG_OBJECT (demux, "Init stream");
 
   if ((ret = gst_ebml_read_header (ebml, &doctype, &version)) != GST_FLOW_OK)
     return ret;
@@ -2366,6 +2380,21 @@ gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
     return GST_FLOW_ERROR;
   }
 
+  return ret;
+}
+
+static GstFlowReturn
+gst_matroska_demux_init_stream (GstMatroskaDemux * demux)
+{
+  GstEbmlRead *ebml = GST_EBML_READ (demux);
+  guint32 id;
+  GstFlowReturn ret;
+
+  GST_DEBUG_OBJECT (demux, "Init stream");
+
+  if ((ret = gst_matroska_demux_parse_header (demux)) != GST_FLOW_OK)
+    return ret;
+
   /* find segment, must be the next element but search as long as
    * we find it anyway */
   while (TRUE) {
@@ -5168,11 +5197,389 @@ pause:
   }
 }
 
+static inline void
+gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes)
+{
+  GstBuffer *buffer;
+
+  GST_LOG_OBJECT (demux, "caching %d bytes for parsing", bytes);
+  buffer = gst_adapter_take_buffer (demux->adapter, bytes);
+  gst_ebml_read_reset_cache (GST_EBML_READ (demux), buffer, demux->offset);
+  demux->offset += bytes;
+}
+
+static inline void
+gst_matroska_demux_flush (GstMatroskaDemux * demux, guint flush)
+{
+  GST_LOG_OBJECT (demux, "skipping %d bytes", flush);
+  gst_adapter_flush (demux->adapter, flush);
+  demux->offset += flush;
+}
+
+static GstFlowReturn
+gst_matroska_demux_peek_id_length (GstMatroskaDemux * demux, guint32 * _id,
+    guint64 * _length, guint * _needed)
+{
+  guint avail, needed;
+  const guint8 *buf;
+  gint len_mask = 0x80, read = 1, n = 1, num_ffs = 0;
+  guint64 total;
+  guint8 b;
+
+  g_return_val_if_fail (_id != NULL, GST_FLOW_ERROR);
+  g_return_val_if_fail (_length != NULL, GST_FLOW_ERROR);
+  g_return_val_if_fail (_needed != NULL, GST_FLOW_ERROR);
+
+  /* well ... */
+  *_id = (guint32) GST_EBML_SIZE_UNKNOWN;
+  *_length = GST_EBML_SIZE_UNKNOWN;
+
+  /* read element id */
+  needed = 2;
+  avail = gst_adapter_available (demux->adapter);
+  if (avail < needed)
+    goto exit;
+
+  buf = gst_adapter_peek (demux->adapter, 1);
+  b = GST_READ_UINT8 (buf);
+
+  total = (guint64) b;
+  while (read <= 4 && !(total & len_mask)) {
+    read++;
+    len_mask >>= 1;
+  }
+  if (G_UNLIKELY (read > 4))
+    goto invalid_id;
+  /* need id and at least something for subsequent length */
+  if ((needed = read + 1) > avail)
+    goto exit;
+
+  buf = gst_adapter_peek (demux->adapter, needed);
+  while (n < read) {
+    b = GST_READ_UINT8 (buf + n);
+    total = (total << 8) | b;
+    ++n;
+  }
+  *_id = (guint32) total;
+
+  /* read element length */
+  b = GST_READ_UINT8 (buf + n);
+  total = (guint64) b;
+  len_mask = 0x80;
+  read = 1;
+  while (read <= 8 && !(total & len_mask)) {
+    read++;
+    len_mask >>= 1;
+  }
+  if (G_UNLIKELY (read > 8))
+    goto invalid_length;
+  if ((needed += read - 1) > avail)
+    goto exit;
+  if ((total &= (len_mask - 1)) == len_mask - 1)
+    num_ffs++;
+
+  buf = gst_adapter_peek (demux->adapter, needed);
+  buf += (needed - read);
+  n = 1;
+  while (n < read) {
+    guint8 b = GST_READ_UINT8 (buf + n);
+
+    if (G_UNLIKELY (b == 0xff))
+      num_ffs++;
+    total = (total << 8) | b;
+    ++n;
+  }
+
+  if (G_UNLIKELY (read == num_ffs))
+    *_length = G_MAXUINT64;
+  else
+    *_length = total;
+  *_length = total;
+
+exit:
+  *_needed = needed;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+invalid_id:
+  {
+    GST_ERROR_OBJECT (demux,
+        "Invalid EBML ID size tag (0x%x) at position %" G_GUINT64_FORMAT " (0x%"
+        G_GINT64_MODIFIER "x)", (guint) b, demux->offset, demux->offset);
+    return GST_FLOW_ERROR;
+  }
+invalid_length:
+  {
+    GST_ERROR_OBJECT (demux,
+        "Invalid EBML length size tag (0x%x) at position %" G_GUINT64_FORMAT
+        " (0x%" G_GINT64_MODIFIER "x)", (guint) b, demux->offset,
+        demux->offset);
+    return GST_FLOW_ERROR;
+  }
+}
+
+static GstFlowReturn
+gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad));
+  GstEbmlRead *ebml = GST_EBML_READ (demux);
+  guint available;
+  GstFlowReturn ret = GST_FLOW_OK;
+  guint needed;
+  guint32 id;
+  guint64 length;
+  gchar *name;
+
+  gst_adapter_push (demux->adapter, buffer);
+  buffer = NULL;
+
+next:
+  available = gst_adapter_available (demux->adapter);
+
+  ret = gst_matroska_demux_peek_id_length (demux, &id, &length, &needed);
+  if (G_UNLIKELY (ret != GST_FLOW_OK))
+    goto parse_failed;
+
+  GST_LOG_OBJECT (demux, "Offset %" G_GUINT64_FORMAT ", Element id 0x%x, "
+      "size %" G_GUINT64_FORMAT ", needed %d, available %d", demux->offset, id,
+      length, needed, available);
+
+  if (needed > available)
+    return GST_FLOW_OK;
+
+  /* only a few blocks are expected/allowed to be large,
+   * and will be recursed into, whereas others must fit */
+  if (G_LIKELY (id != GST_MATROSKA_ID_SEGMENT && id != GST_MATROSKA_ID_CLUSTER)) {
+    if (needed + length > available)
+      return GST_FLOW_OK;
+    /* probably happens with 'large pieces' at the end, so consider it EOS */
+    if (G_UNLIKELY (length > 10 * 1024 * 1024)) {
+      GST_WARNING_OBJECT (demux, "forcing EOS due to size %" G_GUINT64_FORMAT,
+          length);
+      return GST_FLOW_UNEXPECTED;
+    }
+  }
+
+  switch (demux->state) {
+    case GST_MATROSKA_DEMUX_STATE_START:
+      switch (id) {
+        case GST_EBML_ID_HEADER:
+          gst_matroska_demux_take (demux, length + needed);
+          ret = gst_matroska_demux_parse_header (demux);
+          if (ret != GST_FLOW_OK)
+            goto parse_failed;
+          demux->state = GST_MATROSKA_DEMUX_STATE_HEADER;
+          break;
+        default:
+          goto invalid_header;
+          break;
+      }
+      break;
+    case GST_MATROSKA_DEMUX_STATE_HEADER:
+    case GST_MATROSKA_DEMUX_STATE_DATA:
+      switch (id) {
+        case GST_MATROSKA_ID_SEGMENT:
+          /* eat segment prefix */
+          gst_matroska_demux_flush (demux, needed);
+          GST_DEBUG_OBJECT (demux,
+              "Found Segment start at offset %" G_GUINT64_FORMAT, ebml->offset);
+          /* seeks are from the beginning of the segment,
+           * after the segment ID/length */
+          demux->ebml_segment_start = demux->offset;
+          break;
+        case GST_MATROSKA_ID_SEGMENTINFO:
+          if (!demux->segmentinfo_parsed) {
+            gst_matroska_demux_take (demux, length + needed);
+            ret = gst_matroska_demux_parse_info (demux);
+          } else {
+            gst_matroska_demux_flush (demux, needed + length);
+          }
+          break;
+        case GST_MATROSKA_ID_TRACKS:
+          if (!demux->tracks_parsed) {
+            gst_matroska_demux_take (demux, length + needed);
+            ret = gst_matroska_demux_parse_tracks (demux);
+          } else {
+            gst_matroska_demux_flush (demux, needed + length);
+          }
+          break;
+        case GST_MATROSKA_ID_CLUSTER:
+          if (G_UNLIKELY (!demux->tracks_parsed)) {
+            GST_DEBUG_OBJECT (demux, "Cluster before Track");
+            goto not_streamable;
+          } else if (demux->state == GST_MATROSKA_DEMUX_STATE_HEADER) {
+            demux->state = GST_MATROSKA_DEMUX_STATE_DATA;
+            GST_DEBUG_OBJECT (demux, "signaling no more pads");
+            gst_element_no_more_pads (GST_ELEMENT (demux));
+            /* send initial newsegment */
+            gst_matroska_demux_send_event (demux,
+                gst_event_new_new_segment (FALSE, 1.0,
+                    GST_FORMAT_TIME, 0,
+                    (demux->segment.duration >
+                        0) ? demux->segment.duration : -1, 0));
+          }
+          demux->cluster_time = GST_CLOCK_TIME_NONE;
+          demux->cluster_offset = demux->parent.offset;
+          /* eat cluster prefix */
+          gst_matroska_demux_flush (demux, needed);
+          break;
+        case GST_MATROSKA_ID_CLUSTERTIMECODE:
+        {
+          guint64 num;
+
+          gst_matroska_demux_take (demux, length + needed);
+          if ((ret = gst_ebml_read_uint (ebml, &id, &num)) != GST_FLOW_OK)
+            goto parse_failed;
+          GST_DEBUG_OBJECT (demux, "ClusterTimeCode: %" G_GUINT64_FORMAT, num);
+          demux->cluster_time = num;
+          break;
+        }
+        case GST_MATROSKA_ID_BLOCKGROUP:
+          gst_matroska_demux_take (demux, length + needed);
+          DEBUG_ELEMENT_START (demux, ebml, "BlockGroup");
+          if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK) {
+            ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux,
+                demux->cluster_time, demux->cluster_offset, FALSE);
+          }
+          DEBUG_ELEMENT_STOP (demux, ebml, "BlockGroup", ret);
+          if (ret != GST_FLOW_OK)
+            goto exit;
+          break;
+        case GST_MATROSKA_ID_SIMPLEBLOCK:
+          gst_matroska_demux_take (demux, length + needed);
+          DEBUG_ELEMENT_START (demux, ebml, "SimpleBlock");
+          ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux,
+              demux->cluster_time, demux->cluster_offset, TRUE);
+          DEBUG_ELEMENT_STOP (demux, ebml, "SimpleBlock", ret);
+          if (ret != GST_FLOW_OK)
+            goto exit;
+          break;
+        case GST_MATROSKA_ID_ATTACHMENTS:
+          if (!demux->attachments_parsed) {
+            gst_matroska_demux_take (demux, length + needed);
+            ret = gst_matroska_demux_parse_attachments (demux);
+          } else {
+            gst_matroska_demux_flush (demux, needed + length);
+          }
+          break;
+        case GST_MATROSKA_ID_TAGS:
+          gst_matroska_demux_take (demux, length + needed);
+          ret = gst_matroska_demux_parse_metadata (demux);
+          break;
+        case GST_MATROSKA_ID_CHAPTERS:
+          name = "Cues";
+          goto skip;
+        case GST_MATROSKA_ID_SEEKHEAD:
+          name = "SeekHead";
+          goto skip;
+        case GST_MATROSKA_ID_CUES:
+          name = "Cues";
+          goto skip;
+        default:
+          name = "Unknown";
+        skip:
+          GST_DEBUG_OBJECT (demux, "skipping Element 0x%x (%s)", id, name);
+          gst_matroska_demux_flush (demux, needed + length);
+          break;
+      }
+      break;
+  }
+
+  if (ret != GST_FLOW_OK)
+    goto parse_failed;
+  else
+    goto next;
+
+exit:
+  return ret;
+
+  /* ERRORS */
+parse_failed:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL),
+        ("Failed to parse Element 0x%x", id));
+    return GST_FLOW_ERROR;
+  }
+not_streamable:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL),
+        ("File layout does not permit streaming"));
+    return GST_FLOW_ERROR;
+  }
+invalid_header:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header"));
+    return GST_FLOW_ERROR;
+  }
+}
+
+static gboolean
+gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event)
+{
+  gboolean res = TRUE;
+  GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad));
+
+  GST_DEBUG_OBJECT (demux,
+      "have event type %s: %p on sink pad", GST_EVENT_TYPE_NAME (event), event);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_NEWSEGMENT:
+    {
+      GstFormat format;
+      gdouble rate, arate;
+      gint64 start, stop, time = 0;
+      gboolean update;
+      GstSegment segment;
+
+      /* some debug output */
+      gst_segment_init (&segment, GST_FORMAT_UNDEFINED);
+      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+          &start, &stop, &time);
+      gst_segment_set_newsegment_full (&segment, update, rate, arate, format,
+          start, stop, time);
+      GST_DEBUG_OBJECT (demux,
+          "received format %d newsegment %" GST_SEGMENT_FORMAT, format,
+          &segment);
+
+      /* chain will send initial newsegment after pads have been added */
+      GST_DEBUG_OBJECT (demux, "eating event");
+      gst_event_unref (event);
+      res = TRUE;
+      break;
+    }
+    case GST_EVENT_EOS:
+    {
+      if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
+        gst_event_unref (event);
+        GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
+            (NULL), ("got eos and didn't receive a complete header object"));
+      } else if (demux->num_streams == 0) {
+        GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
+            (NULL), ("got eos but no streams (yet)"));
+      } else {
+        gst_matroska_demux_send_event (demux, event);
+      }
+      break;
+    }
+    default:
+      res = gst_pad_event_default (pad, event);
+      break;
+  }
+
+  return res;
+}
+
 static gboolean
 gst_matroska_demux_sink_activate (GstPad * sinkpad)
 {
-  if (gst_pad_check_pull_range (sinkpad))
+  if (gst_pad_check_pull_range (sinkpad)) {
+    GST_DEBUG ("going to pull mode");
     return gst_pad_activate_pull (sinkpad, TRUE);
+  } else {
+    GST_DEBUG ("going to push (streaming) mode");
+    return gst_pad_activate_push (sinkpad, TRUE);
+  }
 
   return FALSE;
 }
index 04a3828..59a3eb9 100644 (file)
@@ -23,6 +23,7 @@
 #define __GST_MATROSKA_DEMUX_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstadapter.h>
 
 #include "ebml-read.h"
 #include "matroska-ids.h"
@@ -97,6 +98,13 @@ typedef struct _GstMatroskaDemux {
   GstEvent                *close_segment;
   GstEvent                *new_segment;
   GstTagList              *global_tags;
+
+  /* push based mode usual suspects */
+  guint64                  offset;
+  GstAdapter              *adapter;
+  /* some state saving */
+  GstClockTime             cluster_time;
+  guint64                  cluster_offset;
 } GstMatroskaDemux;
 
 typedef struct _GstMatroskaDemuxClass {