#define TABLE_ID_UNSET 0xFF
-/* Size of the pendingbuffers array. */
-#define TS_MAX_PENDING_BUFFERS 256
-
#define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
/* small PCR for wrap detection */
#define PCR_SMALL 17775000
/* Output data */
PendingPacketState state;
- /* Pending buffers array. */
- /* These buffers are stored in this array until the PES header (if needed)
- * is succesfully parsed. */
- GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
- guint8 nbpending;
+
+ /* Data to push (allocated) */
+ guint8 *data;
/* Size of data to push (if known) */
guint expected_size;
+
/* Size of currently queued data */
guint current_size;
-
- /* Current data to be pushed out */
- GList *currentlist;
+ guint allocated_size;
/* Current PTS/DTS for this stream */
GstClockTime pts;
static void
gst_ts_demux_stream_flush (TSDemuxStream * stream)
{
- gint i;
-
stream->pts = GST_CLOCK_TIME_NONE;
- for (i = 0; i < stream->nbpending; i++)
- gst_buffer_unref (stream->pendingbuffers[i]);
- memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
- stream->nbpending = 0;
-
- if (stream->currentlist) {
- g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
- g_list_free (stream->currentlist);
- stream->currentlist = NULL;
- }
+ GST_DEBUG ("flushing stream %p", stream);
+ if (stream->data)
+ g_free (stream->data);
+ stream->data = NULL;
stream->state = PENDING_PACKET_EMPTY;
stream->expected_size = 0;
+ stream->allocated_size = 0;
stream->current_size = 0;
stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE;
}
}
-static GstFlowReturn
-gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
+static void
+gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
+ guint8 * data, guint32 length, guint64 bufferoffset)
{
MpegTSBase *base = (MpegTSBase *) demux;
PESHeader header;
- GstBuffer *buf;
- GstFlowReturn res = GST_FLOW_OK;
gint offset = 0;
- GstMapInfo map;
- guint64 bufferoffset;
PESParsingResult parseres;
- buf = stream->pendingbuffers[0] =
- gst_buffer_make_writable (stream->pendingbuffers[0]);
- gst_buffer_map (buf, &map, GST_MAP_READ);
- bufferoffset = GST_BUFFER_OFFSET (buf);
-
- GST_MEMDUMP ("Header buffer", map.data, MIN (map.size, 32));
+ GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
- parseres = mpegts_parse_pes_header (map.data, map.size, &header, &offset);
- gst_buffer_unmap (buf, &map);
+ parseres = mpegts_parse_pes_header (data, length, &header, &offset);
if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
goto discont;
if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
GST_TIME_ARGS (stream->pts),
GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS)));
- {
- MpegTSPacketizer2 *packetizer = base->packetizer;
-
- GST_BUFFER_DTS (buf) =
- mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
- GST_BUFFER_PTS (buf) =
- mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
- }
- GST_DEBUG ("buf %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
}
/* Remove PES headers */
- GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%"
- G_GSIZE_FORMAT ")", header.header_size, header.packet_length, map.size);
+ GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
+ header.header_size, header.packet_length, length);
stream->expected_size = header.packet_length;
- gst_buffer_resize (buf, header.header_size, map.size - header.header_size);
-
- /* FIXME : responsible for switching to PENDING_PACKET_BUFFER and
- * creating the bufferlist */
- if (1) {
- /* Append to the buffer list */
- if (G_UNLIKELY (stream->currentlist == NULL)) {
- guint8 i;
-
- /* Push pending buffers into the list */
- for (i = stream->nbpending; i; i--)
- stream->currentlist =
- g_list_prepend (stream->currentlist, stream->pendingbuffers[i - 1]);
- memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
- stream->nbpending = 0;
- }
- stream->state = PENDING_PACKET_BUFFER;
- }
+ data += header.header_size;
+ length -= header.header_size;
- return res;
+ /* Create the output buffer */
+ if (stream->expected_size)
+ stream->allocated_size = stream->expected_size;
+ else
+ stream->allocated_size = 8192;
+ g_assert (stream->data == NULL);
+ stream->data = g_malloc (stream->allocated_size);
+ memcpy (stream->data, data, length);
+ stream->current_size = length;
+
+ stream->state = PENDING_PACKET_BUFFER;
+
+ return;
discont:
stream->state = PENDING_PACKET_DISCONT;
- return res;
+ return;
}
/* ONLY CALL THIS:
gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
MpegTSPacketizerPacket * packet)
{
- GstBuffer *buf;
+ guint8 *data;
+ guint size;
GST_DEBUG ("state:%d", stream->state);
- buf = packet->buffer;
-
- GST_DEBUG ("Resizing buffer to %d (size:%d) (Was %" G_GSIZE_FORMAT
- " bytes long)", (int) (packet->payload - packet->bufmap.data),
- (int) (packet->data_end - packet->payload), packet->bufmap.size);
- gst_buffer_resize (buf, packet->payload - packet->bufmap.data,
- packet->data_end - packet->payload);
+ size = packet->data_end - packet->payload;
+ data = packet->payload;
if (stream->state == PENDING_PACKET_EMPTY) {
if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
switch (stream->state) {
case PENDING_PACKET_HEADER:
{
- GST_LOG ("HEADER: appending data to array");
- /* Append to the array */
- stream->pendingbuffers[stream->nbpending++] = buf;
- stream->current_size += packet->bufmap.size;
+ GST_LOG ("HEADER: Parsing PES header");
/* parse the header */
- gst_ts_demux_parse_pes_header (demux, stream);
+ gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
break;
}
case PENDING_PACKET_BUFFER:
{
- GST_LOG ("BUFFER: appending data to bufferlist");
- stream->currentlist = g_list_prepend (stream->currentlist, buf);
- stream->current_size += packet->bufmap.size;
+ GST_LOG ("BUFFER: appending data");
+ if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
+ GST_LOG ("resizing buffer");
+ stream->allocated_size = stream->allocated_size * 2;
+ stream->data = g_realloc (stream->data, stream->allocated_size);
+ }
+ memcpy (stream->data + stream->current_size, data, size);
+ stream->current_size += size;
break;
}
case PENDING_PACKET_DISCONT:
{
- GST_LOG ("DISCONT: dropping buffer");
- gst_buffer_unref (packet->buffer);
+ GST_LOG ("DISCONT: not storing/pushing");
+ if (G_UNLIKELY (stream->data)) {
+ g_free (stream->data);
+ stream->data = NULL;
+ }
break;
}
default:
gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
{
GstFlowReturn res = GST_FLOW_OK;
- GList *tmp;
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
+ GstBuffer *buffer = NULL;
MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux);
- GstBuffer *buf;
- GstClockTime ts;
GST_DEBUG_OBJECT (stream->pad,
"stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
bs->stream_type, stream->state);
- if (G_UNLIKELY (stream->currentlist == NULL)) {
- GST_LOG ("stream->current == NULL");
+ if (G_UNLIKELY (stream->data == NULL)) {
+ GST_LOG ("stream->data == NULL");
goto beach;
}
activate_pad_for_stream (demux, stream);
if (G_UNLIKELY (stream->pad == NULL)) {
- g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
- g_list_free (stream->currentlist);
- stream->currentlist = NULL;
+ g_free (stream->data);
goto beach;
}
if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream);
- /* We have a confirmed buffer, let's push it out */
- GST_LOG_OBJECT (stream->pad, "Putting pending data into GstBufferList");
- stream->currentlist = g_list_reverse (stream->currentlist);
- buf = gst_buffer_make_writable ((GstBuffer *) stream->currentlist->data);
-
- ts = GST_BUFFER_PTS (buf);
+ buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts));
- if (GST_CLOCK_TIME_IS_VALID (stream->pts)
- && !GST_CLOCK_TIME_IS_VALID (ts))
- ts = mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
-
- for (tmp = stream->currentlist->next; tmp; tmp = tmp->next) {
- gst_buffer_copy_into (buf, (GstBuffer *) tmp->data, GST_BUFFER_COPY_MEMORY,
- 0, -1);
- gst_buffer_unref ((GstBuffer *) tmp->data);
- }
+ if (GST_CLOCK_TIME_IS_VALID (stream->pts))
+ GST_BUFFER_PTS (buffer) =
+ mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
+ if (GST_CLOCK_TIME_IS_VALID (stream->dts))
+ GST_BUFFER_DTS (buffer) =
+ mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
- g_list_free (stream->currentlist);
- stream->currentlist = NULL;
-
- GST_BUFFER_PTS (buf) = ts;
- GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
GST_DEBUG_OBJECT (stream->pad,
- "Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
+ "Pushing buffer with timestamp: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
- res = gst_pad_push (stream->pad, buf);
+ res = gst_pad_push (stream->pad, buffer);
GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
res = tsdemux_combine_flows (demux, stream, res);
GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
/* Reset everything */
GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
stream->state = PENDING_PACKET_EMPTY;
- memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
- stream->nbpending = 0;
+ stream->data = NULL;
stream->expected_size = 0;
stream->current_size = 0;
- if (stream->currentlist) {
- g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
- g_list_free (stream->currentlist);
- }
- stream->currentlist = NULL;
return res;
}
{
GstFlowReturn res = GST_FLOW_OK;
-#if 0
- GST_DEBUG ("buffer:%p, data:%p", GST_BUFFER_DATA (packet->buffer),
- packet->data);
-#endif
- GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p",
- packet->pid,
- packet->payload_unit_start_indicator,
- packet->adaptation_field_control,
+ GST_DEBUG ("data:%p", packet->data);
+ GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
+ packet->payload_unit_start_indicator, packet->adaptation_field_control,
packet->continuity_counter, packet->payload);
if (section) {
- GST_DEBUG ("section complete:%d, buffer size %" G_GSIZE_FORMAT,
- section->complete, gst_buffer_get_size (section->buffer));
- gst_buffer_unref (packet->buffer);
+ GST_DEBUG ("section complete:%d, buffer size %d",
+ section->complete, section->section_length);
return res;
}
if (packet->adaptation_field_control & 0x2) {
if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG)
- gst_ts_demux_record_pcr (demux, stream, packet->pcr,
- GST_BUFFER_OFFSET (packet->buffer));
+ gst_ts_demux_record_pcr (demux, stream, packet->pcr, packet->offset);
if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG)
- gst_ts_demux_record_opcr (demux, stream, packet->opcr,
- GST_BUFFER_OFFSET (packet->buffer));
+ gst_ts_demux_record_opcr (demux, stream, packet->opcr, packet->offset);
}
if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
/* Finally check if the data we queued completes a packet */
if (stream->expected_size && stream->current_size == stream->expected_size)
res = gst_ts_demux_push_pending_data (demux, stream);
- } else
- gst_buffer_unref (packet->buffer);
+ }
return res;
}
if (stream) {
res = gst_ts_demux_handle_packet (demux, stream, packet, section);
- } else if (packet->buffer)
- gst_buffer_unref (packet->buffer);
- } else {
- if (packet->buffer)
- gst_buffer_unref (packet->buffer);
+ }
}
return res;
}