gst/asfdemux/: New packet parsing code: should put halfway decent timestamps on buffe...
authorTim-Philipp Müller <tim@centricular.net>
Fri, 20 Apr 2007 20:57:56 +0000 (20:57 +0000)
committerTim-Philipp Müller <tim@centricular.net>
Fri, 20 Apr 2007 20:57:56 +0000 (20:57 +0000)
Original commit message from CVS:
* gst/asfdemux/Makefile.am:
* gst/asfdemux/asfpacket.c: (asf_packet_read_varlen_int),
(asf_packet_create_payload_buffer),
(asf_payload_find_previous_fragment),
(gst_asf_payload_queue_for_stream), (gst_asf_demux_parse_payload),
(gst_asf_demux_parse_packet):
* gst/asfdemux/asfpacket.h:
* gst/asfdemux/gstasfdemux.c:
(gst_asf_demux_reset_stream_state_after_discont),
(gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop),
(gst_asf_demux_setup_pad), (gst_asf_demux_descramble_buffer),
(gst_asf_demux_process_chunk):
* gst/asfdemux/gstasfdemux.h:
New packet parsing code: should put halfway decent timestamps on
buffers, and might even set the appropriate keyframe/discont buffer
flags from time to time (and even if it doesn't, I'm at least able
to debug this code); only used in pull-mode so far. Still needs
some more work, like payload extensions parsing and proper flow
aggregation, and stream activation based on preroll. Stay tuned.

ChangeLog
gst/asfdemux/Makefile.am
gst/asfdemux/asfpacket.c [new file with mode: 0644]
gst/asfdemux/asfpacket.h [new file with mode: 0644]
gst/asfdemux/gstasfdemux.c
gst/asfdemux/gstasfdemux.h

index c2a7044..cece297 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,27 @@
 2007-04-20  Tim-Philipp Müller  <tim at centricular dot net>
 
+       * gst/asfdemux/Makefile.am:
+       * gst/asfdemux/asfpacket.c: (asf_packet_read_varlen_int),
+       (asf_packet_create_payload_buffer),
+       (asf_payload_find_previous_fragment),
+       (gst_asf_payload_queue_for_stream), (gst_asf_demux_parse_payload),
+       (gst_asf_demux_parse_packet):
+       * gst/asfdemux/asfpacket.h:
+       * gst/asfdemux/gstasfdemux.c:
+       (gst_asf_demux_reset_stream_state_after_discont),
+       (gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop),
+       (gst_asf_demux_setup_pad), (gst_asf_demux_descramble_buffer),
+       (gst_asf_demux_process_chunk):
+       * gst/asfdemux/gstasfdemux.h:
+         New packet parsing code: should put halfway decent timestamps on
+         buffers, and might even set the appropriate keyframe/discont buffer
+         flags from time to time (and even if it doesn't, I'm at least able
+         to debug this code); only used in pull-mode so far. Still needs
+         some more work, like payload extensions parsing and proper flow
+         aggregation, and stream activation based on preroll. Stay tuned.
+         
+2007-04-20  Tim-Philipp Müller  <tim at centricular dot net>
+
        * gst/asfdemux/asfheaders.h:
        * gst/asfdemux/gstasfdemux.c: (gst_asf_demux_free_stream),
        (gst_asf_demux_seek_index_lookup),
index 3ebd876..b0ae6d9 100644 (file)
@@ -1,9 +1,9 @@
 plugin_LTLIBRARIES = libgstasf.la
 
-libgstasf_la_SOURCES = gstasfdemux.c gstasf.c asfheaders.c
+libgstasf_la_SOURCES = gstasfdemux.c gstasf.c asfheaders.c asfpacket.c
 libgstasf_la_CFLAGS = $(GST_BASE_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(GST_CFLAGS)
 libgstasf_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS)\
                -lgstriff-@GST_MAJORMINOR@
 libgstasf_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
 
-noinst_HEADERS = gstasfdemux.h asfheaders.h gstasfmux.h
+noinst_HEADERS = gstasfdemux.h asfheaders.h asfpacket.h gstasfmux.h
diff --git a/gst/asfdemux/asfpacket.c b/gst/asfdemux/asfpacket.c
new file mode 100644 (file)
index 0000000..31ad650
--- /dev/null
@@ -0,0 +1,430 @@
+/* GStreamer ASF/WMV/WMA demuxer
+ * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/* FIXME:
+ *  file:///home/tpm/samples/video/asf//336370-regis-velo862.wmv
+ *  file:///home/tpm/samples/video/asf//336370-eichhoer.wmv
+ * throw errors (not always necessarily) in this code path
+ * (looks like they carry broken payloads/packets though) */
+
+#include "asfpacket.h"
+
+#include <gst/gstutils.h>
+#include <gst/gstinfo.h>
+#include <string.h>
+
+/* we are unlikely to deal with lengths > 2GB here any time soon, so just
+ * return a signed int and use that for error reporting */
+static gint
+asf_packet_read_varlen_int (guint lentype_flags, guint lentype_bit_offset,
+    const guint8 ** p_data, guint * p_size)
+{
+  const guint lens[4] = { 0, 1, 2, 4 };
+  guint len, val;
+
+  len = lens[(lentype_flags >> lentype_bit_offset) & 0x03];
+
+  /* will make caller bail out with a short read if there's not enough data */
+  if (*p_size < len) {
+    GST_WARNING ("need %u bytes, but only %u bytes available", len, *p_size);
+    return -1;
+  }
+
+  switch (len) {
+    case 0:
+      val = 0;
+      break;
+    case 1:
+      val = GST_READ_UINT8 (*p_data);
+      break;
+    case 2:
+      val = GST_READ_UINT16_LE (*p_data);
+      break;
+    case 4:
+      val = GST_READ_UINT32_LE (*p_data);
+      break;
+    default:
+      g_assert_not_reached ();
+  }
+
+  *p_data += len;
+  *p_size -= len;
+
+  return (gint) val;
+}
+
+static GstBuffer *
+asf_packet_create_payload_buffer (AsfPacket * packet, const guint8 ** p_data,
+    guint * p_size, guint payload_len)
+{
+  guint off;
+
+  g_assert (payload_len <= *p_size);
+
+  off = (guint) (*p_data - GST_BUFFER_DATA (packet->buf));
+  g_assert (off < GST_BUFFER_SIZE (packet->buf));
+
+  *p_data += payload_len;
+  *p_size -= payload_len;
+
+  return gst_buffer_create_sub (packet->buf, off, payload_len);
+}
+
+static AsfPayload *
+asf_payload_find_previous_fragment (AsfPayload * payload, AsfStream * stream)
+{
+  AsfPayload *ret;
+
+  if (stream->payloads->len == 0) {
+    GST_DEBUG ("No previous fragments to merge with for stream %u", stream->id);
+    return NULL;
+  }
+
+  ret =
+      &g_array_index (stream->payloads, AsfPayload, stream->payloads->len - 1);
+
+  if (ret->mo_size != payload->mo_size ||
+      ret->mo_number != payload->mo_number || ret->mo_offset != 0) {
+    GST_WARNING ("Previous fragment does not match continued fragment");
+    return NULL;
+  }
+#if 0
+  if (this_fragment->mo_offset + this_payload_len > first_fragment->mo_size) {
+    GST_WARNING ("Merged fragments would be bigger than the media object");
+    return FALSE;
+  }
+#endif
+
+  return ret;
+}
+
+static void
+gst_asf_payload_queue_for_stream (GstASFDemux * demux, AsfPayload * payload,
+    AsfStream * stream)
+{
+  /* remove any incomplete payloads that will never be completed */
+  while (stream->payloads->len > 0) {
+    AsfPayload *prev;
+    guint idx_last;
+
+    idx_last = stream->payloads->len - 1;
+    prev = &g_array_index (stream->payloads, AsfPayload, idx_last);
+
+    if (gst_asf_payload_is_complete (prev))
+      break;
+
+    GST_DEBUG_OBJECT (demux, "Dropping incomplete fragmented media object "
+        "queued for stream %u", stream->id);
+
+    gst_buffer_replace (&prev->buf, NULL);
+    g_array_remove_index (stream->payloads, idx_last);
+
+    /* there's data missing, so there's a discontinuity now */
+    GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
+  }
+
+  g_array_append_vals (stream->payloads, payload, 1);
+}
+
+static gboolean
+gst_asf_demux_parse_payload (GstASFDemux * demux, AsfPacket * packet,
+    gint lentype, const guint8 ** p_data, guint * p_size)
+{
+  AsfPayload payload = { 0, };
+  AsfStream *stream;
+  gboolean is_compressed;
+  guint payload_len;
+  guint stream_num;
+
+  if (*p_size < 1) {
+    GST_WARNING_OBJECT (demux, "Short packet!");
+    return FALSE;
+  }
+
+  stream_num = GST_READ_UINT8 (*p_data) & 0x7f;
+  payload.keyframe = ((GST_READ_UINT8 (*p_data) & 0x80) != 0);
+
+  *p_data += 1;
+  *p_size -= 1;
+
+  payload.mo_number =
+      asf_packet_read_varlen_int (packet->prop_flags, 4, p_data, p_size);
+  payload.mo_offset =
+      asf_packet_read_varlen_int (packet->prop_flags, 2, p_data, p_size);
+  payload.rep_data_len =
+      asf_packet_read_varlen_int (packet->prop_flags, 0, p_data, p_size);
+
+  is_compressed = (payload.rep_data_len == 1);
+
+  GST_LOG_OBJECT (demux, "payload for stream %u", stream_num);
+  GST_LOG_OBJECT (demux, "keyframe   : %s", (payload.keyframe) ? "yes" : "no");
+  GST_LOG_OBJECT (demux, "compressed : %s", (is_compressed) ? "yes" : "no");
+
+  if (*p_size < payload.rep_data_len) {
+    GST_WARNING_OBJECT (demux, "Short packet! rep_data_len=%u, size=%u",
+        payload.rep_data_len, *p_size);
+    return FALSE;
+  }
+
+  memcpy (payload.rep_data, *p_data,
+      MIN (sizeof (payload.rep_data), payload.rep_data_len));
+
+  *p_data += payload.rep_data_len;
+  *p_size -= payload.rep_data_len;
+
+  if (*p_size == 0) {
+    GST_WARNING_OBJECT (demux, "payload without data!?");
+    return FALSE;
+  }
+
+  /* we use -1 as lentype for a single payload that's the size of the packet */
+  if (lentype >= 0 && lentype <= 3) {
+    payload_len = asf_packet_read_varlen_int (lentype, 0, p_data, p_size);
+    if (*p_size < payload_len) {
+      GST_WARNING_OBJECT (demux, "Short packet! payload_len=%u, size=%u",
+          payload_len, *p_size);
+      return FALSE;
+    }
+  } else {
+    payload_len = *p_size;
+  }
+
+  GST_LOG_OBJECT (demux, "payload length: %u", payload_len);
+
+  stream = gst_asf_demux_get_stream (demux, stream_num);
+
+  if (stream == NULL) {
+    GST_WARNING_OBJECT (demux, "Payload for unknown stream %u, skipping",
+        stream_num);
+    *p_data += payload_len;
+    *p_size -= payload_len;
+    return TRUE;
+  }
+
+  if (!is_compressed) {
+    GstClockTime ts = GST_CLOCK_TIME_NONE;
+
+    GST_LOG_OBJECT (demux, "replicated data length: %u", payload.rep_data_len);
+
+    if (payload.rep_data_len >= 8) {
+      payload.mo_size = GST_READ_UINT32_LE (payload.rep_data);
+      ts = GST_READ_UINT32_LE (payload.rep_data + 4) * GST_MSECOND;
+      ts -= demux->preroll * GST_MSECOND;
+
+      GST_LOG_OBJECT (demux, "media object size   : %u", payload.mo_size);
+      GST_LOG_OBJECT (demux, "media object ts     : %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (ts));
+      /* TODO: parse payload extensions, if there are any */
+    } else if (payload.rep_data_len != 0) {
+      GST_WARNING_OBJECT (demux, "invalid replicated data length, very bad");
+    }
+
+    GST_LOG_OBJECT (demux, "media object offset : %u", payload.mo_offset);
+
+    GST_LOG_OBJECT (demux, "payload length: %u", payload_len);
+
+    if ((stream = gst_asf_demux_get_stream (demux, stream_num))) {
+      payload.buf = asf_packet_create_payload_buffer (packet, p_data, p_size,
+          payload_len);
+
+      /* n-th fragment of a fragmented media object? */
+      if (payload.mo_offset != 0) {
+        AsfPayload *prev;
+
+        if ((prev = asf_payload_find_previous_fragment (&payload, stream))) {
+          if (payload.mo_offset != GST_BUFFER_SIZE (prev->buf)) {
+            GST_WARNING_OBJECT (demux, "Offset doesn't match previous data?!");
+          }
+          /* note: buffer join/merge might not preserve buffer flags */
+          prev->buf = gst_buffer_join (prev->buf, payload.buf);
+          GST_LOG_OBJECT (demux, "Merged fragments, merged size: %u",
+              GST_BUFFER_SIZE (prev->buf));
+        } else {
+          gst_buffer_unref (payload.buf);
+        }
+        payload.buf = NULL;
+      } else {
+        GST_BUFFER_TIMESTAMP (payload.buf) = ts;
+        gst_asf_payload_queue_for_stream (demux, &payload, stream);
+      }
+    }
+  } else {
+    const guint8 *payload_data;
+    GstClockTime ts, ts_delta;
+    guint num;
+
+    GST_LOG_OBJECT (demux, "Compressed payload, length=%u", payload_len);
+
+    payload_data = *p_data;
+
+    *p_data += payload_len;
+    *p_size -= payload_len;
+
+    ts = (payload.mo_offset - demux->preroll) * GST_MSECOND;
+    ts_delta = payload.rep_data[0] * GST_MSECOND;
+
+    for (num = 0; payload_len > 0; ++num) {
+      guint sub_payload_len;
+
+      sub_payload_len = GST_READ_UINT8 (payload_data);
+
+      GST_LOG_OBJECT (demux, "subpayload #%u: len=%u, ts=%" GST_TIME_FORMAT,
+          num, sub_payload_len, GST_TIME_ARGS (ts));
+
+      ++payload_data;
+      --payload_len;
+
+      if (payload_len < sub_payload_len) {
+        GST_WARNING_OBJECT (demux, "Short payload! %u bytes left", payload_len);
+        return FALSE;
+      }
+
+      payload.buf = asf_packet_create_payload_buffer (packet,
+          &payload_data, &payload_len, sub_payload_len);
+
+      GST_BUFFER_TIMESTAMP (payload.buf) = ts;
+      GST_BUFFER_DURATION (payload.buf) = ts_delta;
+      gst_asf_payload_queue_for_stream (demux, &payload, stream);
+
+      ts += ts_delta;
+    }
+  }
+
+  return TRUE;
+}
+
+gboolean
+gst_asf_demux_parse_packet (GstASFDemux * demux, GstBuffer * buf)
+{
+  AsfPacket packet = { 0, };
+  const guint8 *data;
+  gboolean has_multiple_payloads;
+  gboolean ret = TRUE;
+  guint8 ec_flags, flags1;
+  guint size;
+
+  data = GST_BUFFER_DATA (buf);
+  size = GST_BUFFER_SIZE (buf);
+
+  /* need at least two payload flag bytes, send time, and duration */
+  if (size < 2 + 4 + 2)
+    goto short_packet;
+
+  packet.buf = buf;
+
+  ec_flags = GST_READ_UINT8 (data);
+
+  /* skip optional error correction stuff */
+  if ((ec_flags & 0x80) != 0) {
+    guint ec_len_type, ec_len;
+
+    ec_len_type = (ec_flags & 0x60) >> 5;
+    if (ec_len_type == 0) {
+      ec_len = ec_flags & 0x0f;
+    } else {
+      GST_WARNING_OBJECT (demux, "unexpected error correction length type %u",
+          ec_len_type);
+      ec_len = 2;
+    }
+    GST_LOG ("packet has error correction (%u bytes)", ec_len);
+
+    /* still need at least two payload flag bytes, send time, and duration */
+    if (size <= (1 + ec_len) + 2 + 4 + 2)
+      goto short_packet;
+
+    data += 1 + ec_len;
+    size -= 1 + ec_len;
+  }
+
+  /* parse payload info */
+  flags1 = GST_READ_UINT8 (data);
+  packet.prop_flags = GST_READ_UINT8 (data + 1);
+
+  data += 2;
+  size -= 2;
+
+  has_multiple_payloads = (flags1 & 0x01) != 0;
+
+  packet.length = asf_packet_read_varlen_int (flags1, 5, &data, &size);
+
+  packet.sequence = asf_packet_read_varlen_int (flags1, 1, &data, &size);
+
+  packet.padding = asf_packet_read_varlen_int (flags1, 3, &data, &size);
+
+  if (size < 6)
+    goto short_packet;
+
+  packet.send_time = GST_READ_UINT32_LE (data) * GST_MSECOND;
+  packet.duration = GST_READ_UINT16_LE (data + 4) * GST_MSECOND;
+
+  data += 4 + 2;
+  size -= 4 + 2;
+
+  GST_LOG_OBJECT (demux, "multiple payloads: %u", has_multiple_payloads);
+  GST_LOG_OBJECT (demux, "packet length    : %u", packet.length);
+  GST_LOG_OBJECT (demux, "sequence         : %u", packet.sequence);
+  GST_LOG_OBJECT (demux, "padding          : %u", packet.padding);
+  GST_LOG_OBJECT (demux, "send time        : %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (packet.send_time));
+  GST_LOG_OBJECT (demux, "duration         : %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (packet.duration));
+
+  if (packet.padding == (guint) - 1 || size < packet.padding)
+    goto short_packet;
+
+  size -= packet.padding;
+
+  if (has_multiple_payloads) {
+    guint i, num, lentype;
+
+    if (size < 1)
+      goto short_packet;
+
+    num = (GST_READ_UINT8 (data) & 0x3F) >> 0;
+    lentype = (GST_READ_UINT8 (data) & 0xC0) >> 6;
+
+    ++data;
+    --size;
+
+    GST_LOG_OBJECT (demux, "num payloads     : %u", num);
+
+    for (i = 0; i < num; ++i) {
+      GST_LOG_OBJECT (demux, "Parsing payload %u/%u", i + 1, num);
+
+      ret = gst_asf_demux_parse_payload (demux, &packet, lentype, &data, &size);
+
+      if (!ret) {
+        GST_WARNING_OBJECT (demux, "Failed to parse payload %u/%u", i + 1, num);
+        break;
+      }
+    }
+  } else {
+    GST_LOG_OBJECT (demux, "Parsing single payload");
+    ret = gst_asf_demux_parse_payload (demux, &packet, -1, &data, &size);
+  }
+
+  return ret;
+
+/* ERRORS */
+short_packet:
+  {
+    GST_WARNING_OBJECT (demux, "Short packet!");
+    return FALSE;
+  }
+}
diff --git a/gst/asfdemux/asfpacket.h b/gst/asfdemux/asfpacket.h
new file mode 100644 (file)
index 0000000..a42c60d
--- /dev/null
@@ -0,0 +1,60 @@
+/* GStreamer ASF/WMV/WMA demuxer
+ * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __ASF_PACKET_H__
+#define __ASF_PACKET_H__
+
+#include <gst/gstbuffer.h>
+#include <gst/gstclock.h>
+
+#include "gstasfdemux.h"
+
+G_BEGIN_DECLS
+
+typedef struct {
+  gboolean      keyframe;          /* buffer flags might not survive merge.. */
+  guint         mo_number;         /* media object number (unused)           */
+  guint         mo_offset;         /* offset (timestamp for compressed data) */
+  guint         mo_size;           /* size of media-object-to-be, or 0       */
+  guint         rep_data_len;      /* should never be more than 256, since   */
+  guint8        rep_data[256];     /* the length should be stored in a byte  */
+  /* GstClockTime  duration; */    /* TODO:get from payload extension system */
+  GstBuffer    *buf;
+} AsfPayload;
+
+typedef struct {
+  GstBuffer    *buf;
+  guint         length;            /* packet length (unused)               */
+  guint         padding;           /* length of padding at end of packet   */
+  guint         sequence;          /* sequence (unused)                    */
+  GstClockTime  send_time;
+  GstClockTime  duration;
+
+  guint8        prop_flags;        /* payload length types                 */
+} AsfPacket;
+
+gboolean   gst_asf_demux_parse_packet (GstASFDemux * demux, GstBuffer * buf);
+
+#define gst_asf_payload_is_complete(payload) \
+    (GST_BUFFER_SIZE ((payload)->buf) >= (payload)->mo_size)
+
+G_END_DECLS
+
+#endif /* __ASF_PACKET_H__ */
+
index 8817616..d37e6e6 100644 (file)
@@ -45,6 +45,7 @@
 
 #include "gstasfdemux.h"
 #include "asfheaders.h"
+#include "asfpacket.h"
 
 static GstStaticPadTemplate gst_asf_demux_sink_template =
 GST_STATIC_PAD_TEMPLATE ("sink",
@@ -103,6 +104,8 @@ static GstFlowReturn gst_asf_demux_handle_data (GstASFDemux * demux,
 static void gst_asf_demux_reset_stream_state_after_discont (GstASFDemux * asf);
 static gboolean
 gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data);
+static void gst_asf_demux_descramble_buffer (GstASFDemux * demux,
+    AsfStream * stream, GstBuffer ** p_buffer);
 
 GST_BOILERPLATE (GstASFDemux, gst_asf_demux, GstElement, GST_TYPE_ELEMENT);
 
@@ -358,6 +361,17 @@ gst_asf_demux_reset_stream_state_after_discont (GstASFDemux * demux)
     demux->stream[n].last_buffer_timestamp = GST_CLOCK_TIME_NONE;
     demux->stream[n].sequence = 0;
     demux->stream[n].discont = TRUE;
+    demux->stream[n].last_flow = GST_FLOW_OK;
+
+    while (demux->stream[n].payloads->len > 0) {
+      AsfPayload *payload;
+      guint last;
+
+      last = demux->stream[n].payloads->len - 1;
+      payload = &g_array_index (demux->stream[n].payloads, AsfPayload, last);
+      gst_buffer_replace (&payload->buf, NULL);
+      g_array_remove_index (demux->stream[n].payloads, last);
+    }
   }
 }
 
@@ -911,6 +925,61 @@ parse_failed:
 }
 
 static void
+gst_asf_demux_push_complete_payloads (GstASFDemux * demux)
+{
+  guint i;
+
+  for (i = 0; i < demux->num_streams; ++i) {
+    AsfStream *stream;
+
+    stream = &demux->stream[i];
+    while (stream->payloads->len > 0) {
+      AsfPayload *payload;
+
+      payload = &g_array_index (stream->payloads, AsfPayload, 0);
+      if (!gst_asf_payload_is_complete (payload))
+        break;
+
+      /* We have the whole packet now so we should push the packet to
+       * the src pad now. First though we should check if we need to do
+       * descrambling */
+      if (demux->span > 1) {
+        gst_asf_demux_descramble_buffer (demux, stream, &payload->buf);
+      }
+
+      payload->buf = gst_buffer_make_metadata_writable (payload->buf);
+
+      if (!payload->keyframe) {
+        GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DELTA_UNIT);
+      }
+
+      if (stream->discont) {
+        GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
+        stream->discont = FALSE;
+      }
+
+      gst_buffer_set_caps (payload->buf, stream->caps);
+
+      /* FIXME: we should really set durations on buffers if we can */
+      /* (this is a hack, obviously) 
+         if (strncmp (GST_PAD_NAME (stream->pad), "video", 5) == 0 &&
+         !GST_BUFFER_DURATION_IS_VALID (payload->buf)) {
+         GST_BUFFER_DURATION (payload->buf) = GST_SECOND / 30;
+         }
+       */
+
+      GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT
+          ", size=%u", GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)),
+          GST_BUFFER_SIZE (payload->buf));
+
+      stream->last_flow = gst_pad_push (stream->pad, payload->buf);
+      payload->buf = NULL;
+      g_array_remove_index (stream->payloads, 0);
+    }
+  }
+}
+
+static void
 gst_asf_demux_loop (GstASFDemux * demux)
 {
   GstFlowReturn flow;
@@ -944,12 +1013,44 @@ gst_asf_demux_loop (GstASFDemux * demux)
       goto read_failed;
   }
 
-  /*
-     flow = gst_asf_demux_parse_packet (demux, buf);
-     gst_buffer_unref (buf);
-   */
+  if (demux->need_newsegment) {
+    if (demux->segment.stop == GST_CLOCK_TIME_NONE)
+      demux->segment.stop = demux->segment.duration;
+
+    GST_DEBUG_OBJECT (demux, "sending new-segment event %" GST_SEGMENT_FORMAT,
+        &demux->segment);
+
+    /* FIXME: check last parameter, streams may have non-zero start */
+    gst_asf_demux_send_event_unlocked (demux,
+        gst_event_new_new_segment (FALSE, demux->segment.rate,
+            GST_FORMAT_TIME, demux->segment.start, demux->segment.stop,
+            demux->segment.start));
+
+    demux->need_newsegment = FALSE;
+    demux->segment_running = TRUE;
+  }
+
+  /* FIXME: maybe we should just skip broken packets and error out only
+   * after a few broken packets in a row? */
+  if (!gst_asf_demux_parse_packet (demux, buf)) {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL),
+        ("Error parsing ASF packet %u", (guint) demux->packet));
+    flow = GST_FLOW_ERROR;
+  }
+
+  gst_buffer_unref (buf);
+
+  gst_asf_demux_push_complete_payloads (demux);
+
+  ++demux->packet;
+
+  if (demux->num_packets > 0 && demux->packet >= demux->num_packets) {
+    GST_LOG_OBJECT (demux, "reached EOS");
+    goto eos;
+  }
+
+  /* FIXME: aggregate flow returns from the various streams */
 
-  flow = gst_asf_demux_chain (demux->sinkpad, buf);
   if (flow != GST_FLOW_OK)
     goto pause;
 
@@ -1312,6 +1413,9 @@ gst_asf_demux_setup_pad (GstASFDemux * demux, GstPad * src_pad,
   stream->pending_tags = tags;
   stream->discont = TRUE;
 
+  stream->payloads = g_array_new (FALSE, FALSE, sizeof (AsfPayload));
+
+
   gst_pad_set_element_private (src_pad, stream);
 
   GST_INFO ("Adding pad %s for stream %u with caps %" GST_PTR_FORMAT,
@@ -2548,11 +2652,11 @@ error_encrypted:
 }
 
 static void
-gst_asf_demux_descramble_segment (GstASFDemux * demux,
-    asf_segment_info * segment_info, AsfStream * stream)
+gst_asf_demux_descramble_buffer (GstASFDemux * demux, AsfStream * stream,
+    GstBuffer ** p_buffer)
 {
-  GstBuffer *scrambled_buffer;
   GstBuffer *descrambled_buffer;
+  GstBuffer *scrambled_buffer;
   GstBuffer *sub_buffer;
   guint offset;
   guint off;
@@ -2562,12 +2666,12 @@ gst_asf_demux_descramble_segment (GstASFDemux * demux,
 
   /* descrambled_buffer is initialised in the first iteration */
   descrambled_buffer = NULL;
-  scrambled_buffer = stream->payload;
+  scrambled_buffer = *p_buffer;
 
-  if (segment_info->segment_size < demux->ds_packet_size * demux->span)
+  if (GST_BUFFER_SIZE (scrambled_buffer) < demux->ds_packet_size * demux->span)
     return;
 
-  for (offset = 0; offset < segment_info->segment_size;
+  for (offset = 0; offset < GST_BUFFER_SIZE (scrambled_buffer);
       offset += demux->ds_chunk_size) {
     off = offset / demux->ds_chunk_size;
     row = off / demux->span;
@@ -2575,8 +2679,8 @@ gst_asf_demux_descramble_segment (GstASFDemux * demux,
     idx = row + col * demux->ds_packet_size / demux->ds_chunk_size;
     GST_DEBUG ("idx=%u, row=%u, col=%u, off=%u, ds_chunk_size=%u", idx, row,
         col, off, demux->ds_chunk_size);
-    GST_DEBUG ("segment_info->segment_size=%u, span=%u, packet_size=%u",
-        segment_info->segment_size, demux->span, demux->ds_packet_size);
+    GST_DEBUG ("scrambled buffer size=%u, span=%u, packet_size=%u",
+        GST_BUFFER_SIZE (scrambled_buffer), demux->span, demux->ds_packet_size);
     GST_DEBUG ("GST_BUFFER_SIZE (scrambled_buffer) = %u",
         GST_BUFFER_SIZE (scrambled_buffer));
     sub_buffer =
@@ -2585,17 +2689,15 @@ gst_asf_demux_descramble_segment (GstASFDemux * demux,
     if (!offset) {
       descrambled_buffer = sub_buffer;
     } else {
-      GstBuffer *newbuf;
-
-      newbuf = gst_buffer_merge (descrambled_buffer, sub_buffer);
-      gst_buffer_unref (sub_buffer);
-      gst_buffer_unref (descrambled_buffer);
-      descrambled_buffer = newbuf;
+      descrambled_buffer = gst_buffer_join (descrambled_buffer, sub_buffer);
     }
   }
 
-  stream->payload = descrambled_buffer;
+  gst_buffer_stamp (descrambled_buffer, scrambled_buffer);
+  /* FIXME/CHECK: do we need to transfer buffer flags here too? */
+
   gst_buffer_unref (scrambled_buffer);
+  *p_buffer = descrambled_buffer;
 }
 
 static gboolean
@@ -2811,15 +2913,15 @@ gst_asf_demux_process_chunk (GstASFDemux * demux,
        the src pad now. First though we should check if we need to do
        descrambling */
     if (demux->span > 1) {
-      gst_asf_demux_descramble_segment (demux, segment_info, stream);
+      gst_asf_demux_descramble_buffer (demux, stream, &stream->payload);
     }
 
     if (stream->is_video) {
       GST_DEBUG ("%s: demux->pts=%lld=%" GST_TIME_FORMAT
           ", stream->last_pts=%lld=%" GST_TIME_FORMAT,
           GST_PAD_NAME (stream->pad), demux->pts,
-          GST_TIME_ARGS ((GST_SECOND / 1000) * demux->pts), stream->last_pts,
-          GST_TIME_ARGS ((GST_SECOND / 1000) * stream->last_pts));
+          GST_TIME_ARGS (GST_MSECOND * demux->pts), stream->last_pts,
+          GST_TIME_ARGS (GST_MSECOND * stream->last_pts));
     }
 
     /* FIXME: last_pts is not a GstClockTime and not in nanoseconds, so
@@ -2829,11 +2931,11 @@ gst_asf_demux_process_chunk (GstASFDemux * demux,
       stream->last_pts = demux->pts;
     }
 
-    GST_BUFFER_TIMESTAMP (stream->payload) =
-        (GST_SECOND / 1000) * stream->last_pts;
+    GST_BUFFER_TIMESTAMP (stream->payload) = GST_MSECOND * stream->last_pts;
 
-    GST_DEBUG ("sending stream %d of size %d", stream->id,
-        segment_info->chunk_size);
+    GST_DEBUG ("sending stream %d of size %d, ts=%" GST_TIME_FORMAT,
+        stream->id, segment_info->chunk_size,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (stream->payload)));
 
     if (!stream->fps_known) {
       if (!stream->cache) {
index 94549e7..d98c712 100644 (file)
@@ -65,6 +65,11 @@ typedef struct
   GstTagList *pending_tags;
 
   gboolean    discont;
+
+  /* for new parsing code */
+  GstFlowReturn   last_flow; /* last flow return */
+  GArray         *payloads;  /* pending payloads */
+
 } AsfStream;
 
 typedef enum {
@@ -141,7 +146,9 @@ struct _GstASFDemuxClass {
   GstElementClass parent_class;
 };
 
-GType gst_asf_demux_get_type (void);
+GType           gst_asf_demux_get_type (void);
+
+AsfStream     * gst_asf_demux_get_stream (GstASFDemux * demux, guint16 id);
 
 G_END_DECLS