/* stream methods */
static void gst_matroska_demux_reset (GstElement * element);
+static gboolean perform_seek_to_offset (GstMatroskaDemux * demux,
+ guint64 offset);
GType gst_matroska_demux_get_type (void);
GST_BOILERPLATE (GstMatroskaDemux, gst_matroska_demux, GstEbmlRead,
demux->offset = 0;
demux->cluster_time = GST_CLOCK_TIME_NONE;
demux->cluster_offset = 0;
+ demux->index_offset = 0;
+ demux->seekable = FALSE;
+ demux->need_newsegment = FALSE;
+ demux->building_index = FALSE;
+ if (demux->seek_event) {
+ gst_event_unref (demux->seek_event);
+ demux->seek_event = NULL;
+ }
demux->from_offset = -1;
demux->to_offset = G_MAXINT64;
break;
}
- case GST_QUERY_SEEKING:{
+ case GST_QUERY_SEEKING:
+ {
GstFormat fmt;
- res = TRUE;
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
+ if (fmt == GST_FORMAT_TIME) {
+ gboolean seekable;
- if (fmt != GST_FORMAT_TIME || !demux->index) {
- gst_query_set_seeking (query, fmt, FALSE, -1, -1);
- } else {
- gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
- demux->segment.duration);
- }
+ if (demux->streaming) {
+ /* assuming we'll be able to get an index ... */
+ seekable = demux->seekable;
+ } else {
+ seekable = !!demux->index;
+ }
+ gst_query_set_seeking (query, GST_FORMAT_TIME, seekable,
+ 0, demux->segment.duration);
+ res = TRUE;
+ }
break;
}
default:
return track;
}
+static void
+gst_matroska_demux_reset_streams (GstMatroskaDemux * demux, GstClockTime time,
+ gboolean full)
+{
+ gint i;
+
+ GST_DEBUG_OBJECT (demux, "resetting stream state");
+
+ g_assert (demux->src->len == demux->num_streams);
+ for (i = 0; i < demux->src->len; i++) {
+ GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
+ context->pos = time;
+ context->set_discont = TRUE;
+ context->eos = FALSE;
+ context->from_time = GST_CLOCK_TIME_NONE;
+ if (full)
+ context->last_flow = GST_FLOW_OK;
+ }
+}
+
static gboolean
gst_matroska_demux_move_to_entry (GstMatroskaDemux * demux,
GstMatroskaIndex * entry, gboolean reset)
{
- gint i;
-
GST_OBJECT_LOCK (demux);
/* seek (relative to matroska segment) */
entry->block, GST_TIME_ARGS (entry->time));
/* update the time */
- g_assert (demux->src->len == demux->num_streams);
- for (i = 0; i < demux->src->len; i++) {
- GstMatroskaTrackContext *context = g_ptr_array_index (demux->src, i);
- context->pos = entry->time;
- context->set_discont = TRUE;
- context->last_flow = GST_FLOW_OK;
- context->eos = FALSE;
- context->from_time = GST_CLOCK_TIME_NONE;
- }
+ gst_matroska_demux_reset_streams (demux, entry->time, TRUE);
demux->segment.last_stop = entry->time;
demux->seek_block = entry->block;
demux->last_stop_end = GST_CLOCK_TIME_NONE;
GstSegment seeksegment = { 0, };
gboolean update;
- /* no seeking until we are (safely) ready */
- if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
- GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
- return FALSE;
- }
-
if (pad)
track = gst_pad_get_element_private (pad);
GST_DEBUG_OBJECT (demux, "Seek position looks sane");
GST_OBJECT_UNLOCK (demux);
+ if (demux->streaming) {
+ /* need to seek to cluster start to pick up cluster time */
+ /* upstream takes care of flushing and all that
+ * ... and newsegment event handling takes care of the rest */
+ return perform_seek_to_offset (demux,
+ entry->pos + demux->ebml_segment_start);
+ }
+
flush = !!(flags & GST_SEEK_FLAG_FLUSH);
keyunit = !!(flags & GST_SEEK_FLAG_KEY_UNIT);
}
}
+/*
+ * Handle whether we can perform the seek event or if we have to let the chain
+ * function handle seeks to build the seek indexes first.
+ */
+static gboolean
+gst_matroska_demux_handle_seek_push (GstMatroskaDemux * demux, GstPad * pad,
+ GstEvent * event)
+{
+ GstSeekFlags flags;
+ GstSeekType cur_type, stop_type;
+ GstFormat format;
+ gdouble rate;
+ gint64 cur, stop;
+
+ gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+ &stop_type, &stop);
+
+ /* sanity checks */
+
+ /* we can only seek on time */
+ if (format != GST_FORMAT_TIME) {
+ GST_DEBUG_OBJECT (demux, "Can only seek on TIME");
+ return FALSE;
+ }
+
+ if (stop_type != GST_SEEK_TYPE_NONE && stop != GST_CLOCK_TIME_NONE) {
+ GST_DEBUG_OBJECT (demux, "Seek end-time not supported in streaming mode");
+ return FALSE;
+ }
+
+ if (!(flags & GST_SEEK_FLAG_FLUSH)) {
+ GST_DEBUG_OBJECT (demux,
+ "Non-flushing seek not supported in streaming mode");
+ return FALSE;
+ }
+
+ if (flags & GST_SEEK_FLAG_SEGMENT) {
+ GST_DEBUG_OBJECT (demux, "Segment seek not supported in streaming mode");
+ return FALSE;
+ }
+
+ /* check for having parsed index already */
+ if (!demux->index_parsed) {
+ guint64 offset;
+ gboolean building_index;
+
+ if (!demux->index_offset) {
+ GST_DEBUG_OBJECT (demux, "no index (location); no seek in push mode");
+ return FALSE;
+ }
+
+ GST_OBJECT_LOCK (demux);
+ /* handle the seek event in the chain function */
+ demux->state = GST_MATROSKA_DEMUX_STATE_SEEK;
+ /* no more seek can be issued until state reset to _DATA */
+
+ /* copy the event */
+ if (demux->seek_event)
+ gst_event_unref (demux->seek_event);
+ demux->seek_event = gst_event_ref (event);
+
+ /* set the building_index flag so that only one thread can setup the
+ * structures for index seeking. */
+ building_index = demux->building_index;
+ if (!building_index) {
+ demux->building_index = TRUE;
+ offset = demux->index_offset;
+ }
+ GST_OBJECT_UNLOCK (demux);
+
+ if (!building_index) {
+ /* seek to the first subindex or legacy index */
+ GST_INFO_OBJECT (demux, "Seeking to Cues at %" G_GUINT64_FORMAT, offset);
+ return perform_seek_to_offset (demux, offset);
+ }
+
+ /* well, we are handling it already */
+ return TRUE;
+ }
+
+ /* delegate to tweaked regular seek */
+ return gst_matroska_demux_handle_seek_event (demux, pad, event);
+}
+
static gboolean
gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event)
{
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
- res = gst_matroska_demux_handle_seek_event (demux, pad, event);
+ /* no seeking until we are (safely) ready */
+ if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) {
+ GST_DEBUG_OBJECT (demux, "not ready for seeking yet");
+ return FALSE;
+ }
+ if (!demux->streaming)
+ res = gst_matroska_demux_handle_seek_event (demux, pad, event);
+ else
+ res = gst_matroska_demux_handle_seek_push (demux, pad, event);
gst_event_unref (event);
break;
lace_time = GST_CLOCK_TIME_NONE;
}
+ /* need to refresh segment info ASAP */
+ if (GST_CLOCK_TIME_IS_VALID (lace_time) && demux->need_newsegment) {
+ GST_DEBUG_OBJECT (demux,
+ "generating segment starting at %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (lace_time));
+ /* pretend we seeked here */
+ gst_segment_set_seek (&demux->segment, demux->segment.rate,
+ GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, lace_time,
+ GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE, NULL);
+ /* now convey our segment notion downstream */
+ gst_matroska_demux_send_event (demux, gst_event_new_new_segment (FALSE,
+ demux->segment.rate, demux->segment.format, demux->segment.start,
+ demux->segment.stop, demux->segment.start));
+ demux->need_newsegment = FALSE;
+ }
+
if (block_duration) {
if (stream->timecodescale == 1.0)
duration = block_duration * demux->time_scale;
break;
}
+ /* only pick up index location when streaming */
+ if (demux->streaming) {
+ if (seek_id == GST_MATROSKA_ID_CUES) {
+ demux->index_offset = seek_pos + demux->ebml_segment_start;
+ GST_DEBUG_OBJECT (demux, "Cues located at offset %" G_GUINT64_FORMAT,
+ demux->index_offset);
+ }
+ break;
+ }
+
/* seek */
if (gst_ebml_read_seek (ebml, seek_pos + demux->ebml_segment_start) !=
GST_FLOW_OK)
}
}
+/*
+ * Create and push a flushing seek event upstream
+ */
+static gboolean
+perform_seek_to_offset (GstMatroskaDemux * demux, guint64 offset)
+{
+ GstEvent *event;
+ gboolean res = 0;
+
+ GST_DEBUG_OBJECT (demux, "Seeking to %" G_GUINT64_FORMAT, offset);
+
+ event =
+ gst_event_new_seek (1.0, GST_FORMAT_BYTES,
+ GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
+ GST_SEEK_TYPE_NONE, -1);
+
+ res = gst_pad_push_event (demux->sinkpad, event);
+
+ /* newsegment event will update offset */
+ return res;
+}
+
+static void
+gst_matroska_demux_check_seekability (GstMatroskaDemux * demux)
+{
+ GstQuery *query;
+ gboolean seekable = FALSE;
+ gint64 start = -1, stop = -1;
+
+ query = gst_query_new_seeking (GST_FORMAT_BYTES);
+ if (!gst_pad_peer_query (demux->sinkpad, query)) {
+ GST_DEBUG_OBJECT (demux, "seeking query failed");
+ goto done;
+ }
+
+ gst_query_parse_seeking (query, NULL, &seekable, &start, &stop);
+
+ /* try harder to query upstream size if we didn't get it the first time */
+ if (seekable && stop == -1) {
+ GstFormat fmt = GST_FORMAT_BYTES;
+
+ GST_DEBUG_OBJECT (demux, "doing duration query to fix up unset stop");
+ gst_pad_query_peer_duration (demux->sinkpad, &fmt, &stop);
+ }
+
+ /* if upstream doesn't know the size, it's likely that it's not seekable in
+ * practice even if it technically may be seekable */
+ if (seekable && (start != 0 || stop <= start)) {
+ GST_DEBUG_OBJECT (demux, "seekable but unknown start/stop -> disable");
+ seekable = FALSE;
+ }
+
+done:
+ GST_INFO_OBJECT (demux, "seekable: %d (%" G_GUINT64_FORMAT " - %"
+ G_GUINT64_FORMAT ")", seekable, start, stop);
+ demux->seekable = seekable;
+
+ gst_query_unref (query);
+}
+
static inline void
gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes)
{
guint64 length;
const gchar *name;
+ if (G_UNLIKELY (GST_BUFFER_IS_DISCONT (buffer))) {
+ GST_DEBUG_OBJECT (demux, "got DISCONT");
+ gst_adapter_clear (demux->adapter);
+ gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, FALSE);
+ }
+
gst_adapter_push (demux->adapter, buffer);
buffer = NULL;
if (ret != GST_FLOW_OK)
goto parse_failed;
demux->state = GST_MATROSKA_DEMUX_STATE_HEADER;
+ gst_matroska_demux_check_seekability (demux);
break;
default:
goto invalid_header;
break;
case GST_MATROSKA_DEMUX_STATE_HEADER:
case GST_MATROSKA_DEMUX_STATE_DATA:
+ case GST_MATROSKA_DEMUX_STATE_SEEK:
switch (id) {
case GST_MATROSKA_ID_SEGMENT:
/* eat segment prefix */
name = "Cues";
goto skip;
case GST_MATROSKA_ID_SEEKHEAD:
- name = "SeekHead";
- goto skip;
+ gst_matroska_demux_take (demux, length + needed);
+ DEBUG_ELEMENT_START (demux, ebml, "SeekHead");
+ if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK)
+ ret = gst_matroska_demux_parse_contents (demux);
+ if (ret != GST_FLOW_OK)
+ goto exit;
+ break;
case GST_MATROSKA_ID_CUES:
- name = "Cues";
- goto skip;
+ if (demux->index_parsed) {
+ gst_matroska_demux_flush (demux, needed + length);
+ break;
+ }
+ gst_matroska_demux_take (demux, length + needed);
+ ret = gst_matroska_demux_parse_index (demux);
+ if (demux->state == GST_MATROSKA_DEMUX_STATE_SEEK) {
+ GstEvent *event;
+
+ GST_OBJECT_LOCK (demux);
+ event = demux->seek_event;
+ demux->seek_event = NULL;
+ GST_OBJECT_UNLOCK (demux);
+
+ g_assert (event);
+ /* unlikely to fail, since we managed to seek to this point */
+ if (!gst_matroska_demux_handle_seek_event (demux, NULL, event))
+ goto seek_failed;
+ /* resume data handling, main thread clear to seek again */
+ GST_OBJECT_LOCK (demux);
+ demux->state = GST_MATROSKA_DEMUX_STATE_DATA;
+ GST_OBJECT_UNLOCK (demux);
+ }
+ break;
default:
name = "Unknown";
skip:
GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header"));
return GST_FLOW_ERROR;
}
+seek_failed:
+ {
+ GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Failed to seek"));
+ return GST_FLOW_ERROR;
+ }
}
static gboolean
"received format %d newsegment %" GST_SEGMENT_FORMAT, format,
&segment);
- /* chain will send initial newsegment after pads have been added */
+ if (demux->state < GST_MATROSKA_DEMUX_STATE_DATA) {
+ GST_DEBUG_OBJECT (demux, "still starting");
+ goto exit;
+ }
+
+ /* we only expect a BYTE segment, e.g. following a seek */
+ if (format != GST_FORMAT_BYTES) {
+ GST_DEBUG_OBJECT (demux, "unsupported segment format, ignoring");
+ goto exit;
+ }
+
+ GST_DEBUG_OBJECT (demux, "clearing segment state");
+ /* clear current segment leftover */
+ gst_adapter_clear (demux->adapter);
+ /* and some streaming setup */
+ demux->offset = start;
+ /* do not know where we are;
+ * need to come across a cluster and generate newsegment */
+ demux->segment.last_stop = GST_CLOCK_TIME_NONE;
+ demux->cluster_time = GST_CLOCK_TIME_NONE;
+ demux->cluster_offset = 0;
+ demux->need_newsegment = TRUE;
+ /* but keep some of the upstream segment */
+ demux->segment.rate = rate;
+ exit:
+ /* chain will send initial newsegment after pads have been added,
+ * or otherwise come up with one */
GST_DEBUG_OBJECT (demux, "eating event");
gst_event_unref (event);
res = TRUE;
}
break;
}
+ case GST_EVENT_FLUSH_STOP:
+ {
+ gst_adapter_clear (demux->adapter);
+ gst_matroska_demux_reset_streams (demux, GST_CLOCK_TIME_NONE, TRUE);
+ demux->segment.last_stop = GST_CLOCK_TIME_NONE;
+ demux->cluster_time = GST_CLOCK_TIME_NONE;
+ demux->cluster_offset = 0;
+ /* fall-through */
+ }
default:
res = gst_pad_event_default (pad, event);
break;
static gboolean
gst_matroska_demux_sink_activate (GstPad * sinkpad)
{
+ GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (sinkpad));
+
if (gst_pad_check_pull_range (sinkpad)) {
GST_DEBUG ("going to pull mode");
+ demux->streaming = FALSE;
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
GST_DEBUG ("going to push (streaming) mode");
+ demux->streaming = TRUE;
return gst_pad_activate_push (sinkpad, TRUE);
}