tsdemux: Push packets as early as possible
authorEdward Hervey <edward.hervey@collabora.co.uk>
Mon, 5 Mar 2012 08:38:57 +0000 (09:38 +0100)
committerEdward Hervey <edward.hervey@collabora.co.uk>
Mon, 5 Mar 2012 08:41:48 +0000 (09:41 +0100)
When the PES header tells us how big the outgoing packet is, push the
packet downstream as soon as we have the specified size instead of waiting
for the beginning of the next packet.
Reduces latency and removes issues with very sparse streams (like subtitles
and subpictures).

gst/mpegtsdemux/tsdemux.c

index 9bb05cd..255809e 100644 (file)
@@ -128,6 +128,11 @@ struct _TSDemuxStream
   GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
   guint8 nbpending;
 
+  /* Size of data to push (if known) */
+  guint expected_size;
+  /* Size of currently queued data */
+  guint current_size;
+
   /* Current data to be pushed out */
   GstBufferList *current;
   GstBufferListIterator *currentit;
@@ -1025,6 +1030,8 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
   memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
   stream->nbpending = 0;
 
+  stream->expected_size = 0;
+  stream->current_size = 0;
   stream->current = NULL;
   stream->need_newsegment = TRUE;
   stream->pts = GST_CLOCK_TIME_NONE;
@@ -1270,7 +1277,10 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
   }
 
   /* Remove PES headers */
-  GST_DEBUG ("Moving data forward  by %d bytes", header.header_size);
+  GST_DEBUG ("Moving data forward  by %d bytes (packet_size:%d, have:%d)",
+      header.header_size, header.packet_length,
+      GST_BUFFER_SIZE (stream->pendingbuffers[0]));
+  stream->expected_size = header.packet_length;
   GST_BUFFER_DATA (stream->pendingbuffers[0]) += header.header_size;
   GST_BUFFER_SIZE (stream->pendingbuffers[0]) -= header.header_size;
 
@@ -1340,12 +1350,14 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
     GST_LOG ("HEADER: appending data to array");
     /* Append to the array */
     stream->pendingbuffers[stream->nbpending++] = buf;
+    stream->current_size += GST_BUFFER_SIZE (buf);
 
     /* parse the header */
     gst_ts_demux_parse_pes_header (demux, stream);
   } else if (stream->state == PENDING_PACKET_BUFFER) {
     GST_LOG ("BUFFER: appending data to bufferlist");
     stream->currentlist = g_list_prepend (stream->currentlist, buf);
+    stream->current_size += GST_BUFFER_SIZE (buf);
   }
 
 
@@ -1500,6 +1512,8 @@ beach:
   memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
   stream->nbpending = 0;
   stream->current = NULL;
+  stream->expected_size = 0;
+  stream->current_size = 0;
 
   return res;
 }
@@ -1538,9 +1552,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
           GST_BUFFER_OFFSET (packet->buffer));
   }
 
-  if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED))
+  if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)) {
     gst_ts_demux_queue_data (demux, stream, packet);
-  else
+    GST_DEBUG ("current_size:%d, expected_size:%d",
+        stream->current_size, stream->expected_size);
+    /* Finally check if the data we queued completes a packet */
+    if (stream->expected_size && stream->current_size == stream->expected_size)
+      res = gst_ts_demux_push_pending_data (demux, stream);
+  } else
     gst_buffer_unref (packet->buffer);
 
   return res;