gst/realmedia/rmdemux.c: Do fragment collection in the demuxer so that we can now...
authorWim Taymans <wim.taymans@gmail.com>
Tue, 7 Aug 2007 10:57:09 +0000 (10:57 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 7 Aug 2007 10:57:09 +0000 (10:57 +0000)
Original commit message from CVS:
* gst/realmedia/rmdemux.c: (gst_rmdemux_reset),
(gst_rmdemux_chain), (gst_rmdemux_parse_mdpr),
(gst_rmdemux_fix_timestamp), (gst_rmdemux_parse_video_packet),
(gst_rmdemux_parse_audio_packet), (gst_rmdemux_parse_packet):
Do fragment collection in the demuxer so that we can now work with
both ffmpeg and realvideodec to decoder real video content.

ChangeLog
gst/realmedia/rmdemux.c

index 6794bd0..418f7c7 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+2007-08-07  Wim Taymans  <wim.taymans@gmail.com>
+
+       * gst/realmedia/rmdemux.c: (gst_rmdemux_reset),
+       (gst_rmdemux_chain), (gst_rmdemux_parse_mdpr),
+       (gst_rmdemux_fix_timestamp), (gst_rmdemux_parse_video_packet),
+       (gst_rmdemux_parse_audio_packet), (gst_rmdemux_parse_packet):
+       Do fragment collection in the demuxer so that we can now work with
+       both ffmpeg and realvideodec to decoder real video content.
+
 2007-08-04  Stefan Kost  <ensonic@users.sf.net>
 
        * gst/realmedia/asmrules.c:
index 5575b61..06e8365 100644 (file)
@@ -6,6 +6,7 @@
  * Copyright (C) <2005> Michael Smith <fluendo.com>
  * Copyright (C) <2006> Wim Taymans <wim@fluendo.com>
  * Copyright (C) <2006> Tim-Philipp Müller <tim centricular net>
+ * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -43,6 +44,8 @@
 #define HEADER_SIZE 10
 #define DATA_SIZE 8
 
+#define MAX_FRAGS 256
+
 typedef struct _GstRMDemuxIndex GstRMDemuxIndex;
 
 struct _GstRMDemuxStream
@@ -80,6 +83,14 @@ struct _GstRMDemuxStream
   guint subpackets_needed;      /* subpackets needed for descrambling    */
   GPtrArray *subpackets;        /* array containing subpacket GstBuffers */
 
+  gint frag_seqnum;
+  gint frag_subseq;
+  guint frag_length;
+  guint frag_current;
+  guint frag_count;
+  guint frag_offset[MAX_FRAGS];
+  GstAdapter *adapter;
+
   GstTagList *pending_tags;
 };
 
@@ -154,7 +165,7 @@ static void gst_rmdemux_parse_data (GstRMDemux * rmdemux, const guint8 * data,
 static void gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const guint8 * data,
     int length);
 static GstFlowReturn gst_rmdemux_parse_packet (GstRMDemux * rmdemux,
-    const guint8 * data, guint16 version, guint16 length);
+    GstBuffer * in, guint16 version);
 static void gst_rmdemux_parse_indx_data (GstRMDemux * rmdemux,
     const guint8 * data, int length);
 static void gst_rmdemux_stream_clear_cached_subpackets (GstRMDemux * rmdemux,
@@ -681,6 +692,7 @@ gst_rmdemux_reset (GstRMDemux * rmdemux)
   for (cur = rmdemux->streams; cur; cur = cur->next) {
     GstRMDemuxStream *stream = cur->data;
 
+    g_object_unref (stream->adapter);
     gst_rmdemux_stream_clear_cached_subpackets (rmdemux, stream);
     gst_element_remove_pad (GST_ELEMENT (rmdemux), stream->pad);
     if (stream->pending_tags)
@@ -951,16 +963,19 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
   GstFlowReturn ret = GST_FLOW_OK;
   const guint8 *data;
   guint16 version;
+  guint avail;
 
   GstRMDemux *rmdemux = GST_RMDEMUX (GST_PAD_PARENT (pad));
 
+  gst_adapter_push (rmdemux->adapter, buffer);
+
   GST_LOG_OBJECT (rmdemux, "Chaining buffer of size %d",
       GST_BUFFER_SIZE (buffer));
 
-  gst_adapter_push (rmdemux->adapter, buffer);
-
   while (TRUE) {
-    GST_LOG_OBJECT (rmdemux, "looping in chain");
+    avail = gst_adapter_available (rmdemux->adapter);
+
+    GST_LOG_OBJECT (rmdemux, "looping in chain, avail %u", avail);
     switch (rmdemux->state) {
       case RMDEMUX_STATE_HEADER:
       {
@@ -1155,20 +1170,30 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
           data = gst_adapter_peek (rmdemux->adapter, 4);
 
           length = RMDEMUX_GUINT16_GET (data + 2);
+          GST_LOG_OBJECT (rmdemux, "Got length %d", length);
+
           if (length < 4) {
+            GST_LOG_OBJECT (rmdemux, "length too small, dropping");
             /* Invalid, just drop it */
             gst_adapter_flush (rmdemux->adapter, 4);
           } else {
-            if (gst_adapter_available (rmdemux->adapter) < length)
+            GstBuffer *buffer;
+
+            avail = gst_adapter_available (rmdemux->adapter);
+            if (avail < length)
               goto unlock;
-            data = gst_adapter_peek (rmdemux->adapter, length);
 
-            ret =
-                gst_rmdemux_parse_packet (rmdemux, data + 4, version,
-                length - 4);
-            rmdemux->chunk_index++;
+            GST_LOG_OBJECT (rmdemux, "we have %u available and we needed %d",
+                avail, length);
+
+            /* flush version and length */
+            gst_adapter_flush (rmdemux->adapter, 4);
+            length -= 4;
 
-            gst_adapter_flush (rmdemux->adapter, length);
+            buffer = gst_adapter_take_buffer (rmdemux->adapter, length);
+
+            ret = gst_rmdemux_parse_packet (rmdemux, buffer, version);
+            rmdemux->chunk_index++;
           }
 
           if (rmdemux->chunk_index == rmdemux->n_chunks || length == 0)
@@ -1509,14 +1534,13 @@ gst_rmdemux_parse_mdpr (GstRMDemux * rmdemux, const guint8 * data, int length)
   int stream_type;
   int offset;
 
-  /* re_hexdump_bytes ((guint8 *) data, length, 0); */
-
   stream = g_new0 (GstRMDemuxStream, 1);
 
   stream->id = RMDEMUX_GUINT16_GET (data);
   stream->index = NULL;
   stream->seek_offset = 0;
   stream->last_flow = GST_FLOW_OK;
+  stream->adapter = gst_adapter_new ();
   GST_LOG_OBJECT (rmdemux, "stream_number=%d", stream->id);
 
   offset = 30;
@@ -1579,8 +1603,8 @@ gst_rmdemux_parse_mdpr (GstRMDemux * rmdemux, const guint8 * data, int length)
       stream->rate = RMDEMUX_GUINT16_GET (data + offset + 16);
       stream->subformat = RMDEMUX_GUINT32_GET (data + offset + 26);
       stream->format = RMDEMUX_GUINT32_GET (data + offset + 30);
-      stream->extra_data_size = length - (offset + 34);
-      stream->extra_data = (guint8 *) data + offset + 34;
+      stream->extra_data_size = length - (offset + 26);
+      stream->extra_data = (guint8 *) data + offset + 26;
       /* Natural way to represent framerates here requires unsigned 32 bit
        * numerator, which we don't have. For the nasty case, approximate...
        */
@@ -1949,54 +1973,410 @@ gst_rmdemux_handle_scrambled_packet (GstRMDemux * rmdemux,
   return ret;
 }
 
+#if 0
+static GstClockTime
+gst_rmdemux_fix_timestamp (GstRMDemux * rmdemux, GstClockTime timestamp)
+{
+  GstFlowReturn ret;
+  const guint8 *b;
+  guint8 frame_type;
+  guint16 seq;
+  GstClockTime ts = timestamp;
+
+  GST_LOG_OBJECT (rmdemux, "timestamp %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+
+  /* Fix timestamp. */
+  b = gst_adapter_peek (dec->adapter, 4);
+  switch (dec->version) {
+    case GST_REAL_VIDEO_DEC_VERSION_2:
+    {
+
+      /*
+       * Bit  1- 2: frame type
+       * Bit  3- 9: ?
+       * Bit 10-22: sequence number
+       * Bit 23-32: ?
+       */
+      frame_type = (b[0] >> 6) & 0x03;
+      seq = ((b[1] & 0x7f) << 6) + ((b[2] & 0xfc) >> 2);
+      break;
+    }
+    case GST_REAL_VIDEO_DEC_VERSION_3:
+    {
+      /*
+       * Bit  1- 2: ?
+       * Bit     3: skip packet if 1
+       * Bit  4- 5: frame type
+       * Bit  6-12: ?
+       * Bit 13-25: sequence number
+       * Bit 26-32: ?
+       */
+      frame_type = (b[0] >> 3) & 0x03;
+      seq = ((b[1] & 0x0f) << 9) + (b[2] << 1) + ((b[3] & 0x80) >> 7);
+      break;
+    }
+    case GST_REAL_VIDEO_DEC_VERSION_4:
+    {
+      /*
+       * Bit     1: skip packet if 1
+       * Bit  2- 3: frame type
+       * Bit  4-13: ?
+       * Bit 14-26: sequence number
+       * Bit 27-32: ?
+       */
+      frame_type = (b[0] >> 5) & 0x03;
+      seq = ((b[1] & 0x07) << 10) + (b[2] << 2) + ((b[3] & 0xc0) >> 6);
+      break;
+    }
+    default:
+      goto unknown_version;
+  }
+
+  GST_LOG_OBJECT (dec, "frame_type:%d", frame_type);
+
+  switch (frame_type) {
+    case 0:
+    case 1:
+    {
+      /* I frame */
+      timestamp = dec->next_ts;
+      dec->last_ts = dec->next_ts;
+      dec->next_ts = ts;
+      dec->last_seq = dec->next_seq;
+      dec->next_seq = seq;
+
+      break;
+    }
+    case 2:
+    {
+      /* P frame */
+      timestamp = dec->last_ts = dec->next_ts;
+      if (seq < dec->next_seq)
+        dec->next_ts += (seq + 0x2000 - dec->next_seq) * GST_MSECOND;
+      else
+        dec->next_ts += (seq - dec->next_seq) * GST_MSECOND;
+      dec->last_seq = dec->next_seq;
+      dec->next_seq = seq;
+      break;
+    }
+    case 3:
+    {
+      /* B frame */
+      if (seq < dec->last_seq) {
+        timestamp = (seq + 0x2000 - dec->last_seq) * GST_MSECOND + dec->last_ts;
+      } else {
+        timestamp = (seq - dec->last_seq) * GST_MSECOND + dec->last_ts;
+      }
+
+      break;
+    }
+    default:
+      goto unknown_frame_type;
+  }
+  /* Errors */
+unknown_version:
+  {
+    GST_ELEMENT_ERROR (dec, STREAM, DECODE,
+        ("Unknown version: %i.", dec->version), (NULL));
+    return GST_FLOW_ERROR;
+  }
+
+unknown_frame_type:
+  {
+    GST_ELEMENT_ERROR (dec, STREAM, DECODE, ("Unknown frame type."), (NULL));
+    return GST_FLOW_ERROR;
+  }
+}
+#endif
+
+#define PARSE_NUMBER(data, size, number, label) \
+G_STMT_START {                                  \
+  if (size < 2)                                 \
+    goto label;                                 \
+  number = GST_READ_UINT16_BE (data);           \
+  if (!(number & 0xc000)) {                     \
+    if (size < 4)                               \
+      goto label;                               \
+    number = GST_READ_UINT32_BE (data);         \
+    data += 4;                                  \
+    size -= 4;                                  \
+  } else {                                      \
+    number &= 0x3fff;                           \
+    data += 2;                                  \
+    size -= 2;                                  \
+  }                                             \
+} G_STMT_END
+
+static GstFlowReturn
+gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
+    GstBuffer * in, guint offset, guint16 version,
+    GstClockTime timestamp, gboolean key)
+{
+  GstFlowReturn ret;
+  const guint8 *data, *base;
+  guint size;
+
+  base = GST_BUFFER_DATA (in);
+  data = base + offset;
+  size = GST_BUFFER_SIZE (in) - offset;
+
+  while (size > 2) {
+    guint8 pkg_header;
+    guint pkg_offset;
+    guint pkg_length;
+    guint pkg_subseq = 0, pkg_seqnum = -1;
+    guint fragment_size;
+    GstBuffer *fragment;
+
+    pkg_header = *data++;
+    size--;
+
+    /* packet header
+     * bit 7: 1=last block in block chain
+     * bit 6: 1=short header (only one block?)
+     */
+    if ((pkg_header & 0xc0) == 0x40) {
+      /* skip unknown byte */
+      data++;
+      size--;
+      pkg_offset = 0;
+      pkg_length = size;
+    } else {
+      if ((pkg_header & 0x40) == 0) {
+        pkg_subseq = (*data++) & 0x7f;
+        size--;
+      } else {
+        pkg_subseq = 0;
+      }
+
+      /* length */
+      PARSE_NUMBER (data, size, pkg_length, not_enough_data);
+
+      /* offset */
+      PARSE_NUMBER (data, size, pkg_offset, not_enough_data);
+
+      /* seqnum */
+      if (size < 1)
+        goto not_enough_data;
+
+      pkg_seqnum = *data++;
+      size--;
+    }
+
+    GST_DEBUG_OBJECT (rmdemux,
+        "seq %d, subseq %d, offset %d, length %d, size %d, header %02x",
+        pkg_seqnum, pkg_subseq, pkg_offset, pkg_length, size, pkg_header);
+
+    /* calc size of fragment */
+    if ((pkg_header & 0xc0) == 0x80) {
+      fragment_size = pkg_offset;
+    } else {
+      if ((pkg_header & 0xc0) == 0)
+        fragment_size = size;
+      else
+        fragment_size = pkg_length;
+    }
+    GST_DEBUG_OBJECT (rmdemux, "fragment size %d", fragment_size);
+
+    /* get the fragment */
+    fragment = gst_buffer_create_sub (in, data - base, fragment_size);
+
+    if (pkg_subseq == 1) {
+      GST_DEBUG_OBJECT (rmdemux, "start new fragment");
+      gst_adapter_clear (stream->adapter);
+      stream->frag_current = 0;
+      stream->frag_count = 0;
+      stream->frag_length = pkg_length;
+    } else if (pkg_subseq == 0) {
+      GST_DEBUG_OBJECT (rmdemux, "non fragmented packet");
+      stream->frag_current = 0;
+      stream->frag_count = 0;
+      stream->frag_length = fragment_size;
+    }
+
+    /* put fragment in adapter */
+    gst_adapter_push (stream->adapter, fragment);
+    stream->frag_offset[stream->frag_count] = stream->frag_current;
+    stream->frag_current += fragment_size;
+    stream->frag_count++;
+
+    if (stream->frag_count > MAX_FRAGS)
+      goto too_many_fragments;
+
+    GST_DEBUG_OBJECT (rmdemux, "stored fragment in adapter %d/%d",
+        stream->frag_current, stream->frag_length);
+
+    /* flush fragment when complete */
+    if (stream->frag_current >= stream->frag_length) {
+      GstBuffer *out;
+      guint8 *outdata;
+      guint header_size;
+      gint i;
+
+      /* calculate header size, which is:
+       * 1 byte for the number of fragments - 1
+       * for each fragment:
+       *   4 bytes 0x00000001 little endian
+       *   4 bytes fragment offset
+       *
+       * This is also the matroska header for realvideo, the decoder needs the
+       * fragment offsets, both in ffmpeg and real .so, so we just give it that
+       * in front of the data.
+       */
+      header_size = 1 + (8 * (stream->frag_count));
+
+      GST_DEBUG_OBJECT (rmdemux,
+          "fragmented completed. count %d, header_size %u", stream->frag_count,
+          header_size);
+
+      out = gst_buffer_new_and_alloc (header_size + stream->frag_length);
+      outdata = GST_BUFFER_DATA (out);
+
+      /* create header */
+      *outdata++ = stream->frag_count - 1;
+      for (i = 0; i < stream->frag_count; i++) {
+        GST_WRITE_UINT32_LE (outdata, 0x00000001);
+        outdata += 4;
+        GST_WRITE_UINT32_LE (outdata, stream->frag_offset[i]);
+        outdata += 4;
+      }
+
+      /* copy packet data after the header now */
+      gst_adapter_copy (stream->adapter, outdata, 0, stream->frag_length);
+      gst_adapter_flush (stream->adapter, stream->frag_length);
+
+      gst_buffer_set_caps (out, GST_PAD_CAPS (stream->pad));
+      GST_BUFFER_TIMESTAMP (out) = timestamp;
+
+      ret = gst_pad_push (stream->pad, out);
+    }
+    data += fragment_size;
+    size -= fragment_size;
+  }
+  GST_DEBUG_OBJECT (rmdemux, "%d bytes left", size);
+
+  gst_buffer_unref (in);
+
+  ret = GST_FLOW_OK;
+
+  return ret;
+
+  /* ERRORS */
+not_enough_data:
+  {
+    GST_ELEMENT_WARNING (rmdemux, STREAM, DECODE, ("Skipping bad packet."),
+        (NULL));
+    gst_buffer_unref (in);
+    return GST_FLOW_OK;
+  }
+too_many_fragments:
+  {
+    GST_ELEMENT_ERROR (rmdemux, STREAM, DECODE,
+        ("Got more fragments (%u) than can be handled (%u)",
+            stream->frag_count, MAX_FRAGS), (NULL));
+    gst_buffer_unref (in);
+    return GST_FLOW_ERROR;
+  }
+}
+
+static GstFlowReturn
+gst_rmdemux_parse_audio_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
+    GstBuffer * in, guint offset, guint16 version,
+    GstClockTime timestamp, gboolean key)
+{
+  GstFlowReturn ret, cret;
+  GstBuffer *buffer;
+  const guint8 *data;
+  guint size;
+
+  data = GST_BUFFER_DATA (in) + offset;
+  size = GST_BUFFER_SIZE (in) - offset;
+
+  ret = gst_pad_alloc_buffer_and_set_caps (stream->pad,
+      GST_BUFFER_OFFSET_NONE, size, GST_PAD_CAPS (stream->pad), &buffer);
+
+  cret = gst_rmdemux_combine_flows (rmdemux, stream, ret);
+  if (ret != GST_FLOW_OK)
+    goto alloc_failed;
+
+  memcpy (GST_BUFFER_DATA (buffer), (guint8 *) data, size);
+  GST_BUFFER_TIMESTAMP (buffer) = timestamp - rmdemux->first_ts;
+
+  if (stream->needs_descrambling) {
+    ret = gst_rmdemux_handle_scrambled_packet (rmdemux, stream, buffer, key);
+  } else {
+    GST_LOG_OBJECT (rmdemux, "Pushing buffer of size %d to pad %s",
+        GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
+
+    ret = gst_pad_push (stream->pad, buffer);
+  }
+  return ret;
+
+  /* ERRORS */
+alloc_failed:
+  {
+    GST_DEBUG_OBJECT (rmdemux, "pad alloc returned %d", ret);
+    return cret;
+  }
+}
+
 static GstFlowReturn
-gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const guint8 * data,
-    guint16 version, guint16 length)
+gst_rmdemux_parse_packet (GstRMDemux * rmdemux, GstBuffer * in, guint16 version)
 {
   guint16 id;
   GstRMDemuxStream *stream;
-  GstBuffer *buffer;
-  guint16 packet_size;
+  guint size;
   GstFlowReturn cret, ret;
   GstClockTime timestamp;
-  gboolean key = FALSE;
+  gboolean key;
+  guint8 *data, *base;
+  guint8 flags;
 
+  base = data = GST_BUFFER_DATA (in);
+  size = GST_BUFFER_SIZE (in);
+
+  /* stream number */
   id = RMDEMUX_GUINT16_GET (data);
+
+  stream = gst_rmdemux_get_stream_by_id (rmdemux, id);
+  if (!stream || !stream->pad)
+    goto unknown_stream;
+
   /* timestamp in Msec */
   timestamp = RMDEMUX_GUINT32_GET (data + 2) * GST_MSECOND;
 
   gst_segment_set_last_stop (&rmdemux->segment, GST_FORMAT_TIME, timestamp);
 
   GST_LOG_OBJECT (rmdemux, "Parsing a packet for stream=%d, timestamp=%"
-      GST_TIME_FORMAT ", version=%d", id, GST_TIME_ARGS (timestamp), version);
+      GST_TIME_FORMAT ", size %u, version=%d", id, GST_TIME_ARGS (timestamp),
+      size, version);
+
+  if (rmdemux->first_ts == GST_CLOCK_TIME_NONE) {
+    GST_DEBUG_OBJECT (rmdemux, "First timestamp: %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (timestamp));
+    rmdemux->first_ts = timestamp;
+  }
 
   /* skip stream_id and timestamp */
-  data += 2 + 4;
-  packet_size = length - (2 + 4);
+  data += (2 + 4);
+  size -= (2 + 4);
 
   /* skip other stuff */
   if (version == 0) {
     /* uint8 packet_group */
     /* uint8 flags        */
-    key = ((GST_READ_UINT8 (data + 1) & 0x02) != 0);
+    flags = GST_READ_UINT8 (data + 1);
     data += 2;
-    packet_size -= 2;
+    size -= 2;
   } else {
     /* uint16 asm_rule */
     /* uint8 asm_flags */
+    flags = GST_READ_UINT8 (data + 2);
     data += 3;
-    packet_size -= 3;
-  }
-
-  stream = gst_rmdemux_get_stream_by_id (rmdemux, id);
-  if (!stream || !stream->pad)
-    goto unknown_stream;
-
-  if (rmdemux->first_ts == GST_CLOCK_TIME_NONE) {
-    GST_DEBUG_OBJECT (rmdemux, "First timestamp: %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (timestamp));
-    rmdemux->first_ts = timestamp;
+    size -= 3;
   }
+  key = (flags & 0x02) != 0;
+  GST_DEBUG_OBJECT (rmdemux, "flags %d, Keyframe %d", flags, key);
 
   if (rmdemux->need_newsegment) {
     GstEvent *event;
@@ -2019,32 +2399,25 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const guint8 * data,
     stream->pending_tags = NULL;
   }
 
-  if ((rmdemux->offset + packet_size) <= stream->seek_offset) {
+  if ((rmdemux->offset + size) <= stream->seek_offset) {
     GST_DEBUG_OBJECT (rmdemux,
-        "Stream %d is skipping: seek_offset=%d, offset=%d, packet_size=%u",
-        stream->id, stream->seek_offset, rmdemux->offset, packet_size);
+        "Stream %d is skipping: seek_offset=%d, offset=%d, size=%u",
+        stream->id, stream->seek_offset, rmdemux->offset, size);
     cret = GST_FLOW_OK;
     goto beach;
   }
 
-  ret = gst_pad_alloc_buffer_and_set_caps (stream->pad,
-      GST_BUFFER_OFFSET_NONE, packet_size, GST_PAD_CAPS (stream->pad), &buffer);
-
-  cret = gst_rmdemux_combine_flows (rmdemux, stream, ret);
-  if (ret != GST_FLOW_OK)
-    goto alloc_failed;
-
-  memcpy (GST_BUFFER_DATA (buffer), (guint8 *) data, packet_size);
-  GST_BUFFER_TIMESTAMP (buffer) = timestamp - rmdemux->first_ts;
-
-  if (stream->needs_descrambling) {
-    ret = gst_rmdemux_handle_scrambled_packet (rmdemux, stream, buffer, key);
-  } else {
-    GST_LOG_OBJECT (rmdemux, "Pushing buffer of size %d to pad %s",
-        GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
-
-    ret = gst_pad_push (stream->pad, buffer);
-  }
+  /* do special headers */
+  if (stream->subtype == GST_RMDEMUX_STREAM_VIDEO) {
+    ret =
+        gst_rmdemux_parse_video_packet (rmdemux, stream, in, data - base,
+        version, timestamp, key);
+  } else if (stream->subtype == GST_RMDEMUX_STREAM_AUDIO) {
+    ret =
+        gst_rmdemux_parse_audio_packet (rmdemux, stream, in, data - base,
+        version, timestamp, key);
+  } else
+    ret = GST_FLOW_OK;
 
   cret = gst_rmdemux_combine_flows (rmdemux, stream, ret);
 
@@ -2058,11 +2431,6 @@ unknown_stream:
         "data packet", id);
     return GST_FLOW_OK;
   }
-alloc_failed:
-  {
-    GST_DEBUG_OBJECT (rmdemux, "pad alloc returned %d", ret);
-    return cret;
-  }
 }
 
 static gboolean