tsdemux: Adapt to new packetizer API
authorEdward Hervey <edward.hervey@collabora.co.uk>
Mon, 21 May 2012 15:53:37 +0000 (17:53 +0200)
committerEdward Hervey <edward.hervey@collabora.co.uk>
Tue, 22 May 2012 15:43:38 +0000 (17:43 +0200)
We no longer use GstBufferList and instead copy the incoming data
into newly allocated memory.

This makes tsdemux behaviour 3 to 4 times faster.

gst/mpegtsdemux/tsdemux.c

index 39106a384607e44beb67b784550cf447e26679c2..aab4e0497373964dc749a22924f5fd1bd2eb646d 100644 (file)
@@ -54,9 +54,6 @@
 
 #define TABLE_ID_UNSET 0xFF
 
-/* Size of the pendingbuffers array. */
-#define TS_MAX_PENDING_BUFFERS  256
-
 #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
 /* small PCR for wrap detection */
 #define PCR_SMALL 17775000
@@ -122,19 +119,16 @@ struct _TSDemuxStream
 
   /* Output data */
   PendingPacketState state;
-  /* Pending buffers array. */
-  /* These buffers are stored in this array until the PES header (if needed)
-   * is succesfully parsed. */
-  GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
-  guint8 nbpending;
+
+  /* Data to push (allocated) */
+  guint8 *data;
 
   /* Size of data to push (if known) */
   guint expected_size;
+
   /* Size of currently queued data */
   guint current_size;
-
-  /* Current data to be pushed out */
-  GList *currentlist;
+  guint allocated_size;
 
   /* Current PTS/DTS for this stream */
   GstClockTime pts;
@@ -1022,23 +1016,16 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
 static void
 gst_ts_demux_stream_flush (TSDemuxStream * stream)
 {
-  gint i;
-
   stream->pts = GST_CLOCK_TIME_NONE;
 
-  for (i = 0; i < stream->nbpending; i++)
-    gst_buffer_unref (stream->pendingbuffers[i]);
-  memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
-  stream->nbpending = 0;
-
-  if (stream->currentlist) {
-    g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
-    g_list_free (stream->currentlist);
-    stream->currentlist = NULL;
-  }
+  GST_DEBUG ("flushing stream %p", stream);
 
+  if (stream->data)
+    g_free (stream->data);
+  stream->data = NULL;
   stream->state = PENDING_PACKET_EMPTY;
   stream->expected_size = 0;
+  stream->allocated_size = 0;
   stream->current_size = 0;
   stream->need_newsegment = TRUE;
   stream->pts = GST_CLOCK_TIME_NONE;
@@ -1231,27 +1218,18 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
   }
 }
 
-static GstFlowReturn
-gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
+static void
+gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
+    guint8 * data, guint32 length, guint64 bufferoffset)
 {
   MpegTSBase *base = (MpegTSBase *) demux;
   PESHeader header;
-  GstBuffer *buf;
-  GstFlowReturn res = GST_FLOW_OK;
   gint offset = 0;
-  GstMapInfo map;
-  guint64 bufferoffset;
   PESParsingResult parseres;
 
-  buf = stream->pendingbuffers[0] =
-      gst_buffer_make_writable (stream->pendingbuffers[0]);
-  gst_buffer_map (buf, &map, GST_MAP_READ);
-  bufferoffset = GST_BUFFER_OFFSET (buf);
-
-  GST_MEMDUMP ("Header buffer", map.data, MIN (map.size, 32));
+  GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
 
-  parseres = mpegts_parse_pes_header (map.data, map.size, &header, &offset);
-  gst_buffer_unmap (buf, &map);
+  parseres = mpegts_parse_pes_header (data, length, &header, &offset);
   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
     goto discont;
   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
@@ -1271,45 +1249,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
         GST_TIME_ARGS (stream->pts),
         GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS)));
 
-    {
-      MpegTSPacketizer2 *packetizer = base->packetizer;
-
-      GST_BUFFER_DTS (buf) =
-          mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
-      GST_BUFFER_PTS (buf) =
-          mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
-    }
-    GST_DEBUG ("buf %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
   }
 
   /* Remove PES headers */
-  GST_DEBUG ("Moving data forward  by %d bytes (packet_size:%d, have:%"
-      G_GSIZE_FORMAT ")", header.header_size, header.packet_length, map.size);
+  GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
+      header.header_size, header.packet_length, length);
   stream->expected_size = header.packet_length;
-  gst_buffer_resize (buf, header.header_size, map.size - header.header_size);
-
-  /* FIXME : responsible for switching to PENDING_PACKET_BUFFER and
-   * creating the bufferlist */
-  if (1) {
-    /* Append to the buffer list */
-    if (G_UNLIKELY (stream->currentlist == NULL)) {
-      guint8 i;
-
-      /* Push pending buffers into the list */
-      for (i = stream->nbpending; i; i--)
-        stream->currentlist =
-            g_list_prepend (stream->currentlist, stream->pendingbuffers[i - 1]);
-      memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
-      stream->nbpending = 0;
-    }
-    stream->state = PENDING_PACKET_BUFFER;
-  }
+  data += header.header_size;
+  length -= header.header_size;
 
-  return res;
+  /* Create the output buffer */
+  if (stream->expected_size)
+    stream->allocated_size = stream->expected_size;
+  else
+    stream->allocated_size = 8192;
+  g_assert (stream->data == NULL);
+  stream->data = g_malloc (stream->allocated_size);
+  memcpy (stream->data, data, length);
+  stream->current_size = length;
+
+  stream->state = PENDING_PACKET_BUFFER;
+
+  return;
 
 discont:
   stream->state = PENDING_PACKET_DISCONT;
-  return res;
+  return;
 }
 
  /* ONLY CALL THIS:
@@ -1320,17 +1285,13 @@ static inline void
 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
     MpegTSPacketizerPacket * packet)
 {
-  GstBuffer *buf;
+  guint8 *data;
+  guint size;
 
   GST_DEBUG ("state:%d", stream->state);
 
-  buf = packet->buffer;
-
-  GST_DEBUG ("Resizing buffer to %d (size:%d) (Was %" G_GSIZE_FORMAT
-      " bytes long)", (int) (packet->payload - packet->bufmap.data),
-      (int) (packet->data_end - packet->payload), packet->bufmap.size);
-  gst_buffer_resize (buf, packet->payload - packet->bufmap.data,
-      packet->data_end - packet->payload);
+  size = packet->data_end - packet->payload;
+  data = packet->payload;
 
   if (stream->state == PENDING_PACKET_EMPTY) {
     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
@@ -1345,26 +1306,31 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
   switch (stream->state) {
     case PENDING_PACKET_HEADER:
     {
-      GST_LOG ("HEADER: appending data to array");
-      /* Append to the array */
-      stream->pendingbuffers[stream->nbpending++] = buf;
-      stream->current_size += packet->bufmap.size;
+      GST_LOG ("HEADER: Parsing PES header");
 
       /* parse the header */
-      gst_ts_demux_parse_pes_header (demux, stream);
+      gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
       break;
     }
     case PENDING_PACKET_BUFFER:
     {
-      GST_LOG ("BUFFER: appending data to bufferlist");
-      stream->currentlist = g_list_prepend (stream->currentlist, buf);
-      stream->current_size += packet->bufmap.size;
+      GST_LOG ("BUFFER: appending data");
+      if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
+        GST_LOG ("resizing buffer");
+        stream->allocated_size = stream->allocated_size * 2;
+        stream->data = g_realloc (stream->data, stream->allocated_size);
+      }
+      memcpy (stream->data + stream->current_size, data, size);
+      stream->current_size += size;
       break;
     }
     case PENDING_PACKET_DISCONT:
     {
-      GST_LOG ("DISCONT: dropping buffer");
-      gst_buffer_unref (packet->buffer);
+      GST_LOG ("DISCONT: not storing/pushing");
+      if (G_UNLIKELY (stream->data)) {
+        g_free (stream->data);
+        stream->data = NULL;
+      }
       break;
     }
     default:
@@ -1461,18 +1427,16 @@ static GstFlowReturn
 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
 {
   GstFlowReturn res = GST_FLOW_OK;
-  GList *tmp;
   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
+  GstBuffer *buffer = NULL;
   MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux);
-  GstBuffer *buf;
-  GstClockTime ts;
 
   GST_DEBUG_OBJECT (stream->pad,
       "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
       bs->stream_type, stream->state);
 
-  if (G_UNLIKELY (stream->currentlist == NULL)) {
-    GST_LOG ("stream->current == NULL");
+  if (G_UNLIKELY (stream->data == NULL)) {
+    GST_LOG ("stream->data == NULL");
     goto beach;
   }
 
@@ -1490,44 +1454,29 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
     activate_pad_for_stream (demux, stream);
 
   if (G_UNLIKELY (stream->pad == NULL)) {
-    g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
-    g_list_free (stream->currentlist);
-    stream->currentlist = NULL;
+    g_free (stream->data);
     goto beach;
   }
 
   if (G_UNLIKELY (stream->need_newsegment))
     calculate_and_push_newsegment (demux, stream);
 
-  /* We have a confirmed buffer, let's push it out */
-  GST_LOG_OBJECT (stream->pad, "Putting pending data into GstBufferList");
-  stream->currentlist = g_list_reverse (stream->currentlist);
-  buf = gst_buffer_make_writable ((GstBuffer *) stream->currentlist->data);
-
-  ts = GST_BUFFER_PTS (buf);
+  buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
 
   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
       GST_TIME_ARGS (stream->pts));
-  if (GST_CLOCK_TIME_IS_VALID (stream->pts)
-      && !GST_CLOCK_TIME_IS_VALID (ts))
-    ts = mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
-
-  for (tmp = stream->currentlist->next; tmp; tmp = tmp->next) {
-    gst_buffer_copy_into (buf, (GstBuffer *) tmp->data, GST_BUFFER_COPY_MEMORY,
-        0, -1);
-    gst_buffer_unref ((GstBuffer *) tmp->data);
-  }
+  if (GST_CLOCK_TIME_IS_VALID (stream->pts))
+    GST_BUFFER_PTS (buffer) =
+        mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
+  if (GST_CLOCK_TIME_IS_VALID (stream->dts))
+    GST_BUFFER_DTS (buffer) =
+        mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
 
-  g_list_free (stream->currentlist);
-  stream->currentlist = NULL;
-
-  GST_BUFFER_PTS (buf) = ts;
-  GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
   GST_DEBUG_OBJECT (stream->pad,
-      "Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
+      "Pushing buffer with timestamp: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
 
-  res = gst_pad_push (stream->pad, buf);
+  res = gst_pad_push (stream->pad, buffer);
   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
   res = tsdemux_combine_flows (demux, stream, res);
   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
@@ -1536,15 +1485,9 @@ beach:
   /* Reset everything */
   GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
   stream->state = PENDING_PACKET_EMPTY;
-  memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
-  stream->nbpending = 0;
+  stream->data = NULL;
   stream->expected_size = 0;
   stream->current_size = 0;
-  if (stream->currentlist) {
-    g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
-    g_list_free (stream->currentlist);
-  }
-  stream->currentlist = NULL;
 
   return res;
 }
@@ -1555,20 +1498,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
 {
   GstFlowReturn res = GST_FLOW_OK;
 
-#if 0
-  GST_DEBUG ("buffer:%p, data:%p", GST_BUFFER_DATA (packet->buffer),
-      packet->data);
-#endif
-  GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p",
-      packet->pid,
-      packet->payload_unit_start_indicator,
-      packet->adaptation_field_control,
+  GST_DEBUG ("data:%p", packet->data);
+  GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
+      packet->payload_unit_start_indicator, packet->adaptation_field_control,
       packet->continuity_counter, packet->payload);
 
   if (section) {
-    GST_DEBUG ("section complete:%d, buffer size %" G_GSIZE_FORMAT,
-        section->complete, gst_buffer_get_size (section->buffer));
-    gst_buffer_unref (packet->buffer);
+    GST_DEBUG ("section complete:%d, buffer size %d",
+        section->complete, section->section_length);
     return res;
   }
 
@@ -1579,11 +1516,9 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
 
   if (packet->adaptation_field_control & 0x2) {
     if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG)
-      gst_ts_demux_record_pcr (demux, stream, packet->pcr,
-          GST_BUFFER_OFFSET (packet->buffer));
+      gst_ts_demux_record_pcr (demux, stream, packet->pcr, packet->offset);
     if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG)
-      gst_ts_demux_record_opcr (demux, stream, packet->opcr,
-          GST_BUFFER_OFFSET (packet->buffer));
+      gst_ts_demux_record_opcr (demux, stream, packet->opcr, packet->offset);
   }
 
   if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
@@ -1594,8 +1529,7 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
     /* Finally check if the data we queued completes a packet */
     if (stream->expected_size && stream->current_size == stream->expected_size)
       res = gst_ts_demux_push_pending_data (demux, stream);
-  } else
-    gst_buffer_unref (packet->buffer);
+  }
 
   return res;
 }
@@ -1621,11 +1555,7 @@ gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
 
     if (stream) {
       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
-    } else if (packet->buffer)
-      gst_buffer_unref (packet->buffer);
-  } else {
-    if (packet->buffer)
-      gst_buffer_unref (packet->buffer);
+    }
   }
   return res;
 }