stream->PAT.version_number = -1;
stream->PMT_pid = FLUTS_MAX_PID + 1;
stream->flags |= FLUTS_STREAM_FLAG_STREAM_TYPE_UNKNOWN;
-
+ stream->pes_buffer_in_sync = FALSE;
switch (PID) {
/* check for fixed mapping */
case PID_PROGRAM_ASSOCIATION_TABLE:
if (stream->pes_buffer) {
GST_BUFFER_SIZE (stream->pes_buffer) = stream->pes_buffer_used;
ret = gst_pes_filter_push (&stream->filter, stream->pes_buffer);
+ if (ret == GST_FLOW_LOST_SYNC)
+ stream->pes_buffer_in_sync = FALSE;
stream->pes_buffer = NULL;
}
return ret;
stream->pes_buffer_size <<= 1;
ret = gst_fluts_stream_pes_buffer_flush (stream);
+ if (ret == GST_FLOW_LOST_SYNC)
+ goto done;
}
if (G_UNLIKELY (!stream->pes_buffer)) {
memcpy (out_data, in_data, in_size);
#endif
stream->pes_buffer_used += in_size;
-
+done:
return ret;
}
for (i = 0; i < FLUTS_MAX_PID + 1; i++) {
GstFluTSStream *stream = demux->streams[i];
if (stream && stream->pad) {
- ret = gst_fluts_stream_pes_buffer_flush (stream);
- if (G_UNLIKELY (ret == GST_FLOW_OK))
- goto done;
+ gst_fluts_stream_pes_buffer_flush (stream);
+ stream->pes_buffer_in_sync = FALSE;
}
}
+ return ret;
+}
-done:
+static FORCE_INLINE GstFlowReturn
+gst_fluts_demux_push_fragment (GstFluTSStream * stream,
+ const guint8 * in_data, guint in_size)
+{
+ GstFlowReturn ret;
+ GstBuffer *es_buf = gst_buffer_new_and_alloc (in_size);
+#ifdef USE_LIBOIL
+ oil_memcpy (GST_BUFFER_DATA (es_buf), in_data, in_size);
+#else
+ memcpy (GST_BUFFER_DATA (es_buf), in_data, in_size);
+#endif
+ ret = gst_pes_filter_push (&stream->filter, es_buf);
+
+ /* If PES filter return ok then PES fragment buffering
+ * can be enabled */
+ if (ret == GST_FLOW_OK)
+ stream->pes_buffer_in_sync = TRUE;
+ else if (ret == GST_FLOW_LOST_SYNC)
+ stream->pes_buffer_in_sync = FALSE;
return ret;
}
"bytes of %u bytes in the PES buffer",
PID, stream->pes_buffer_used, stream->pes_buffer_size);
/* Flush buffered PES data */
- ret = gst_fluts_stream_pes_buffer_flush (stream);
+ gst_fluts_stream_pes_buffer_flush (stream);
gst_pes_filter_drain (&stream->filter);
/* Resize the buffer to half if no overflow detected and
* had been used less than half of it */
GST_DEBUG_OBJECT (demux, "PES buffer size reduced to %u bytes",
stream->pes_buffer_size);
}
- if (ret == GST_FLOW_LOST_SYNC)
- goto done;
+ /* mark the stream not in sync to give a chance on PES filter to
+ * detect lost sync */
+ stream->pes_buffer_in_sync = FALSE;
stream->pes_buffer_overflow = FALSE;
}
GST_LOG_OBJECT (demux, "Elementary packet of size %u for PID 0x%04x",
datalen, PID);
if (datalen > 0) {
- ret = gst_fluts_stream_pes_buffer_push (stream, data, datalen);
+ if (!stream->pes_buffer_in_sync) {
+ /* Push the first fragment to PES filter to have a chance to
+ * detect GST_FLOW_LOST_SYNC.
+ */
+ GST_LOG_OBJECT (demux, "fragment directly pushed to PES filter");
+ ret = gst_fluts_demux_push_fragment (stream, data, datalen);
+ } else {
+ /* Otherwhise we buffer the PES fragment */
+ ret = gst_fluts_stream_pes_buffer_push (stream, data, datalen);
+ /* If sync is lost here is due a pes_buffer_flush and we can try
+ * to resync in the PES filter with the current fragment
+ */
+ if (ret == GST_FLOW_LOST_SYNC) {
+ GST_LOG_OBJECT (demux, "resync, fragment pushed to PES filter");
+ ret = gst_fluts_demux_push_fragment (stream, data, datalen);
+ }
+ }
+
break;
} else {
GST_WARNING_OBJECT (demux, "overflow of datalen: %u so skipping",
while (ptr_data <= end_scan && sync_count < LENGHT_SYNC_LUT) {
/* if sync code is found try to store it in the LUT */
if (G_LIKELY (IS_MPEGTS_SYNC (ptr_data))) {
- demux->sync_lut[sync_count] = ptr_data;
- sync_count++;
/* skip paketsize bytes and try find next */
- ptr_data += demux->packetsize;
+ guint8 *next_sync = ptr_data + demux->packetsize;
+ if (next_sync < end_scan) {
+ demux->sync_lut[sync_count] = ptr_data;
+ sync_count++;
+ ptr_data += demux->packetsize;
+ } else
+ goto done;
+
} else {
ptr_data++;
}
}
-
+done:
*flush = ptr_data - in_data;
return sync_count;
gint i;
guint sync_count;
- if (GST_BUFFER_IS_DISCONT (buffer))
+ if (GST_BUFFER_IS_DISCONT (buffer)) {
+ /* Flush buffered PES data */
+ gst_fluts_demux_pes_buffer_flush (demux);
gst_adapter_clear (demux->adapter);
-
+ }
/* first push the new buffer into the adapter */
gst_adapter_push (demux->adapter, buffer);
/* process all packets */
for (i = 0; i < sync_count; i++) {
ret = gst_fluts_demux_parse_transport_packet (demux, demux->sync_lut[i]);
+ if (G_UNLIKELY (ret == GST_FLOW_LOST_SYNC)) {
+ flush = demux->sync_lut[i] - data + 1;
+ ret = GST_FLOW_OK;
+ continue;
+ }
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
- if (ret == GST_FLOW_LOST_SYNC) {
- ret = GST_FLOW_OK;
- continue;
- }
flush = demux->sync_lut[i] - data + demux->packetsize;
+ flush = MIN (avail, flush);
goto done;
}
}
done:
/* flush processed data */
- gst_adapter_flush (demux->adapter, flush);
+ if (flush) {
+ GST_DEBUG_OBJECT (demux, "flushing %d/%d", flush, avail);
+ gst_adapter_flush (demux->adapter, flush);
+ }
gst_object_unref (demux);