gst/rtp/gstrtpmp4gdepay.*: Handle interleaved streams by reordering AU in a queue.
authorWim Taymans <wim.taymans@gmail.com>
Mon, 15 Sep 2008 21:10:23 +0000 (21:10 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Mon, 15 Sep 2008 21:10:23 +0000 (21:10 +0000)
Original commit message from CVS:
* gst/rtp/gstrtpmp4gdepay.c: (gst_rtp_mp4g_depay_init),
(gst_rtp_mp4g_depay_finalize), (gst_rtp_mp4g_depay_setcaps),
(gst_rtp_mp4g_depay_clear_queue), (gst_rtp_mp4g_depay_flush_queue),
(gst_rtp_mp4g_depay_queue), (gst_rtp_mp4g_depay_process),
(gst_rtp_mp4g_depay_change_state):
* gst/rtp/gstrtpmp4gdepay.h:
Handle interleaved streams by reordering AU in a queue.

ChangeLog
gst/rtp/gstrtpmp4gdepay.c
gst/rtp/gstrtpmp4gdepay.h

index 7892296..6671d4d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,15 @@
 2008-09-15  Wim Taymans  <wim.taymans@collabora.co.uk>
 
+       * gst/rtp/gstrtpmp4gdepay.c: (gst_rtp_mp4g_depay_init),
+       (gst_rtp_mp4g_depay_finalize), (gst_rtp_mp4g_depay_setcaps),
+       (gst_rtp_mp4g_depay_clear_queue), (gst_rtp_mp4g_depay_flush_queue),
+       (gst_rtp_mp4g_depay_queue), (gst_rtp_mp4g_depay_process),
+       (gst_rtp_mp4g_depay_change_state):
+       * gst/rtp/gstrtpmp4gdepay.h:
+       Handle interleaved streams by reordering AU in a queue.
+
+2008-09-15  Wim Taymans  <wim.taymans@collabora.co.uk>
+
        * gst/rtp/gstrtpmp4gdepay.c: (gst_bs_parse_init),
        (gst_bs_parse_read), (gst_rtp_mp4g_depay_process):
        Change some of the ranges in the caps, mostly for the amount of bits we
index d594244..9fcbddc 100644 (file)
@@ -188,6 +188,7 @@ gst_rtp_mp4g_depay_init (GstRtpMP4GDepay * rtpmp4gdepay,
     GstRtpMP4GDepayClass * klass)
 {
   rtpmp4gdepay->adapter = gst_adapter_new ();
+  rtpmp4gdepay->packets = g_queue_new ();
 }
 
 static void
@@ -199,6 +200,8 @@ gst_rtp_mp4g_depay_finalize (GObject * object)
 
   g_object_unref (rtpmp4gdepay->adapter);
   rtpmp4gdepay->adapter = NULL;
+  g_queue_free (rtpmp4gdepay->packets);
+  rtpmp4gdepay->packets = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -241,10 +244,14 @@ gst_rtp_mp4g_depay_setcaps (GstBaseRTPDepayload * depayload, GstCaps * caps)
     if (strcmp (str, "audio") == 0) {
       srccaps = gst_caps_new_simple ("audio/mpeg",
           "mpegversion", G_TYPE_INT, 4, NULL);
+      /* AAC always has a default constant duration of 1024 but it can be
+       * overriden below. */
+      rtpmp4gdepay->constantDuration = 1024;
     } else if (strcmp (str, "video") == 0) {
       srccaps = gst_caps_new_simple ("video/mpeg",
           "mpegversion", G_TYPE_INT, 4,
           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
+      rtpmp4gdepay->constantDuration = 0;
     }
   }
   if (srccaps == NULL)
@@ -268,6 +275,12 @@ gst_rtp_mp4g_depay_setcaps (GstBaseRTPDepayload * depayload, GstCaps * caps)
       gst_rtp_mp4g_depay_parse_int (structure, "streamstateindication", 0);
   rtpmp4gdepay->auxiliarydatasizelength =
       gst_rtp_mp4g_depay_parse_int (structure, "auxiliarydatasizelength", 0);
+  rtpmp4gdepay->constantSize =
+      gst_rtp_mp4g_depay_parse_int (structure, "constantsize", 0);
+  rtpmp4gdepay->constantDuration =
+      gst_rtp_mp4g_depay_parse_int (structure, "constantduration",
+      rtpmp4gdepay->constantDuration);
+
 
   /* get config string */
   if ((str = gst_structure_get_string (structure, "config"))) {
@@ -299,11 +312,115 @@ unknown_media:
   }
 }
 
+static void
+gst_rtp_mp4g_depay_clear_queue (GstRtpMP4GDepay * rtpmp4gdepay)
+{
+  GstBuffer *outbuf;
+
+  while ((outbuf = g_queue_pop_head (rtpmp4gdepay->packets)))
+    gst_buffer_unref (outbuf);
+}
+
+static void
+gst_rtp_mp4g_depay_flush_queue (GstRtpMP4GDepay * rtpmp4gdepay)
+{
+  GstBuffer *outbuf;
+  gboolean discont = FALSE;
+  guint AU_index;
+
+  while ((outbuf = g_queue_pop_head (rtpmp4gdepay->packets))) {
+    AU_index = GST_BUFFER_OFFSET (outbuf);
+
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "next available AU_index %u", AU_index);
+
+    if (rtpmp4gdepay->next_AU_index != AU_index) {
+      GST_DEBUG_OBJECT (rtpmp4gdepay, "discont, expected AU_index %u",
+          rtpmp4gdepay->next_AU_index);
+      discont = TRUE;
+    }
+
+    if (discont) {
+      GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+      discont = FALSE;
+    }
+
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "pushing AU_index %u", AU_index);
+    gst_base_rtp_depayload_push (GST_BASE_RTP_DEPAYLOAD (rtpmp4gdepay), outbuf);
+    rtpmp4gdepay->next_AU_index = AU_index + 1;
+  }
+}
+
+static void
+gst_rtp_mp4g_depay_queue (GstRtpMP4GDepay * rtpmp4gdepay, GstBuffer * outbuf)
+{
+  guint AU_index = GST_BUFFER_OFFSET (outbuf);
+
+  if (rtpmp4gdepay->next_AU_index == -1) {
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "Init AU counter %u", AU_index);
+    rtpmp4gdepay->next_AU_index = AU_index;
+  }
+
+  if (rtpmp4gdepay->next_AU_index == AU_index) {
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "pushing expected AU_index %u", AU_index);
+
+    /* we received the expected packet, push it and flush as much as we can from
+     * the queue */
+    gst_base_rtp_depayload_push (GST_BASE_RTP_DEPAYLOAD (rtpmp4gdepay), outbuf);
+    rtpmp4gdepay->next_AU_index++;
+
+    while ((outbuf = g_queue_peek_head (rtpmp4gdepay->packets))) {
+      AU_index = GST_BUFFER_OFFSET (outbuf);
+
+      GST_DEBUG_OBJECT (rtpmp4gdepay, "next available AU_index %u", AU_index);
+
+      if (rtpmp4gdepay->next_AU_index == AU_index) {
+        GST_DEBUG_OBJECT (rtpmp4gdepay, "pushing expected AU_index %u",
+            AU_index);
+        outbuf = g_queue_pop_head (rtpmp4gdepay->packets);
+        gst_base_rtp_depayload_push (GST_BASE_RTP_DEPAYLOAD (rtpmp4gdepay),
+            outbuf);
+        rtpmp4gdepay->next_AU_index++;
+      } else {
+        GST_DEBUG_OBJECT (rtpmp4gdepay, "waiting for next AU_index %u",
+            rtpmp4gdepay->next_AU_index);
+        break;
+      }
+    }
+  } else {
+    GList *list;
+
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "queueing AU_index %u", AU_index);
+
+    /* loop the list to skip strictly smaller AU_index buffers */
+    for (list = rtpmp4gdepay->packets->head; list; list = g_list_next (list)) {
+      guint idx;
+      gint gap;
+
+      idx = GST_BUFFER_OFFSET (GST_BUFFER_CAST (list->data));
+
+      /* compare the new seqnum to the one in the buffer */
+      gap = (gint) (idx - AU_index);
+
+      GST_DEBUG_OBJECT (rtpmp4gdepay, "compare with AU_index %u, gap %d", idx,
+          gap);
+
+      /* AU_index <= idx, we can stop looking */
+      if (G_LIKELY (gap > 0))
+        break;
+    }
+    if (G_LIKELY (list))
+      g_queue_insert_before (rtpmp4gdepay->packets, list, outbuf);
+    else
+      g_queue_push_tail (rtpmp4gdepay->packets, outbuf);
+  }
+}
+
 static GstBuffer *
 gst_rtp_mp4g_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
 {
   GstRtpMP4GDepay *rtpmp4gdepay;
   GstBuffer *outbuf;
+  GstClockTime timestamp;
 
   rtpmp4gdepay = GST_RTP_MP4G_DEPAY (depayload);
 
@@ -312,21 +429,24 @@ gst_rtp_mp4g_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
 
   /* flush remaining data on discont */
   if (GST_BUFFER_IS_DISCONT (buf)) {
+    GST_DEBUG_OBJECT (rtpmp4gdepay, "received DISCONT");
     gst_adapter_clear (rtpmp4gdepay->adapter);
   }
 
+  timestamp = GST_BUFFER_TIMESTAMP (buf);
+
   {
     gint payload_len, payload_AU;
     guint8 *payload;
-    guint32 timestamp;
+    guint32 rtptime;
     guint AU_headers_len;
-    guint AU_size, AU_index, payload_AU_size;
+    guint AU_size, AU_index, AU_index_delta, payload_AU_size;
     gboolean M;
 
     payload_len = gst_rtp_buffer_get_payload_len (buf);
     payload = gst_rtp_buffer_get_payload (buf);
 
-    timestamp = gst_rtp_buffer_get_timestamp (buf);
+    rtptime = gst_rtp_buffer_get_timestamp (buf);
     M = gst_rtp_buffer_get_marker (buf);
 
     if (rtpmp4gdepay->sizelength > 0) {
@@ -364,6 +484,7 @@ gst_rtp_mp4g_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
 
       /* point the bitstream parser to the first AU header bit */
       gst_bs_parse_init (&bs, payload, payload_len);
+      AU_index = AU_index_delta = 0;
 
       for (i = 0; i < num_AU_headers && payload_AU_size > 0; i++) {
         /* parse AU header
@@ -386,24 +507,76 @@ gst_rtp_mp4g_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
          *  +---------------------------------------+
          */
         AU_size = gst_bs_parse_read (&bs, rtpmp4gdepay->sizelength);
-        if (i == 0)
+
+        /* calculate the AU_index, which is only on the first AU of the packet
+         * and the AU_index_delta on the other AUs. This will be used to
+         * reconstruct the AU ordering when interleaving. */
+        if (i == 0) {
           AU_index = gst_bs_parse_read (&bs, rtpmp4gdepay->indexlength);
-        else
-          AU_index = gst_bs_parse_read (&bs, rtpmp4gdepay->indexdeltalength);
+          if (AU_index == 0 && rtpmp4gdepay->prev_AU_index == 0) {
+            gint diff;
+
+            /* if we see two consecutive packets with AU_index of 0, we can
+             * assume we have constantDuration packets. Since we don't have
+             * the index we must use the AU duration to calculate the
+             * index. Get the diff between the timestamps first, this can be
+             * positive or negative. */
+            if (rtpmp4gdepay->prev_rtptime <= rtptime)
+              diff = rtptime - rtpmp4gdepay->prev_rtptime;
+            else
+              diff = -(rtpmp4gdepay->prev_rtptime - rtptime);
+
+            /* get the number of packets by dividing with the duration */
+            diff /= rtpmp4gdepay->constantDuration;
+
+            rtpmp4gdepay->last_AU_index += diff;
+            rtpmp4gdepay->prev_AU_index = AU_index;
+
+            AU_index = rtpmp4gdepay->last_AU_index;
+
+          } else {
+            rtpmp4gdepay->prev_AU_index = AU_index;
+            rtpmp4gdepay->last_AU_index = AU_index;
+          }
+
+          /* keep track of the higest AU_index */
+          if (rtpmp4gdepay->max_AU_index != -1
+              && rtpmp4gdepay->max_AU_index <= AU_index) {
+            GST_DEBUG_OBJECT (rtpmp4gdepay, "new interleave group, flushing");
+            /* a new interleave group started, flush */
+            gst_rtp_mp4g_depay_flush_queue (rtpmp4gdepay);
+          }
+          rtpmp4gdepay->prev_rtptime = rtptime;
+        } else {
+          AU_index_delta =
+              gst_bs_parse_read (&bs, rtpmp4gdepay->indexdeltalength);
+          AU_index += AU_index_delta + 1;
+        }
+        /* keep track of highest AU_index */
+        if (rtpmp4gdepay->max_AU_index == -1
+            || AU_index > rtpmp4gdepay->max_AU_index)
+          rtpmp4gdepay->max_AU_index = AU_index;
+
+        /* the presentation time offset, a 2s-complement value, we need this to
+         * calculate the timestamp on the output packet. */
         if (rtpmp4gdepay->ctsdeltalength > 0) {
           if (gst_bs_parse_read (&bs, 1))
             gst_bs_parse_read (&bs, rtpmp4gdepay->ctsdeltalength);
         }
+        /* the decoding time offset, a 2s-complement value */
         if (rtpmp4gdepay->dtsdeltalength > 0) {
           if (gst_bs_parse_read (&bs, 1))
             gst_bs_parse_read (&bs, rtpmp4gdepay->dtsdeltalength);
         }
+        /* RAP-flag to indicate that the AU contains a keyframe */
         if (rtpmp4gdepay->randomaccessindication)
           gst_bs_parse_read (&bs, 1);
+        /* stream-state */
         if (rtpmp4gdepay->streamstateindication > 0)
           gst_bs_parse_read (&bs, rtpmp4gdepay->streamstateindication);
 
-        GST_DEBUG_OBJECT (rtpmp4gdepay, "size %d, index %d", AU_size, AU_index);
+        GST_DEBUG_OBJECT (rtpmp4gdepay, "size %d, index %d, delta %d", AU_size,
+            AU_index, AU_index_delta);
 
         /* fragmented pakets have the AU_size set to the size of the
          * unfragmented AU. */
@@ -425,14 +598,20 @@ gst_rtp_mp4g_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf)
           outbuf = gst_adapter_take_buffer (rtpmp4gdepay->adapter, avail);
           gst_buffer_set_caps (outbuf, GST_PAD_CAPS (depayload->srcpad));
 
-          GST_DEBUG ("gst_rtp_mp4g_depay_chain: pushing buffer of size %d",
+          /* copy some of the fields we calculated above on the buffer. We also
+           * copy the AU_index so that we can sort the packets in our queue. */
+          GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
+          GST_BUFFER_OFFSET (outbuf) = AU_index;
+
+          /* make sure we don't use the timestamp again for other AUs in this
+           * RTP packet. */
+          timestamp = -1;
+
+          GST_DEBUG_OBJECT (depayload, "pushing buffer of size %d",
               GST_BUFFER_SIZE (outbuf));
 
-          /* only apply the timestamp for the first buffer */
-          if (i == 0)
-            gst_base_rtp_depayload_push_ts (depayload, timestamp, outbuf);
-          else
-            gst_base_rtp_depayload_push (depayload, outbuf);
+          gst_rtp_mp4g_depay_queue (rtpmp4gdepay, outbuf);
+
         }
         payload_AU += AU_size;
         payload_AU_size -= AU_size;
@@ -487,6 +666,11 @@ gst_rtp_mp4g_depay_change_state (GstElement * element,
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gst_adapter_clear (rtpmp4gdepay->adapter);
+      rtpmp4gdepay->max_AU_index = -1;
+      rtpmp4gdepay->next_AU_index = -1;
+      rtpmp4gdepay->prev_AU_index = -1;
+      rtpmp4gdepay->prev_rtptime = -1;
+      rtpmp4gdepay->last_AU_index = -1;
       break;
     default:
       break;
@@ -495,6 +679,10 @@ gst_rtp_mp4g_depay_change_state (GstElement * element,
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
   switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_adapter_clear (rtpmp4gdepay->adapter);
+      gst_rtp_mp4g_depay_clear_queue (rtpmp4gdepay);
+      break;
     default:
       break;
   }
index 88bd4e6..dd82544 100644 (file)
@@ -46,6 +46,10 @@ struct _GstRtpMP4GDepay
 
   gint profile_level_id;
   gint streamtype;
+
+  gint constantSize;
+  gint constantDuration;
+
   gint sizelength;
   gint indexlength;
   gint indexdeltalength;
@@ -54,6 +58,14 @@ struct _GstRtpMP4GDepay
   gint randomaccessindication;
   gint streamstateindication;
   gint auxiliarydatasizelength;
+
+  guint max_AU_index;
+  guint prev_AU_index;
+  guint last_AU_index;
+  guint next_AU_index;
+  guint32 prev_rtptime;
+
+  GQueue *packets;
   
   GstAdapter *adapter;
 };