From 900ff7247e2192ce7dc4437c6f917847b8e90116 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 16 Dec 2009 12:43:27 +0100 Subject: [PATCH] matroskademux: support push based mode Fixes #598610. --- gst/matroska/ebml-read.c | 43 ++++- gst/matroska/ebml-read.h | 8 + gst/matroska/matroska-demux.c | 419 +++++++++++++++++++++++++++++++++++++++++- gst/matroska/matroska-demux.h | 8 + 4 files changed, 470 insertions(+), 8 deletions(-) diff --git a/gst/matroska/ebml-read.c b/gst/matroska/ebml-read.c index aa3da97..5a8eea5 100644 --- a/gst/matroska/ebml-read.c +++ b/gst/matroska/ebml-read.c @@ -171,6 +171,29 @@ gst_ebml_read_change_state (GstElement * element, GstStateChange transition) } /* + * Used in push mode. + * Provided buffer is used as cache, based on offset 0, and no further reads + * will be issued. + */ + +void +gst_ebml_read_reset_cache (GstEbmlRead * ebml, GstBuffer * buffer, + guint64 offset) +{ + if (ebml->cached_buffer) + gst_buffer_unref (ebml->cached_buffer); + + ebml->cached_buffer = buffer; + ebml->push_cache = TRUE; + buffer = gst_buffer_make_metadata_writable (buffer); + GST_BUFFER_OFFSET (buffer) = offset; + ebml->offset = offset; + g_list_foreach (ebml->level, (GFunc) gst_ebml_level_free, NULL); + g_list_free (ebml->level); + ebml->level = NULL; +} + +/* * Return: the amount of levels in the hierarchy that the * current element lies higher than the previous one. * The opposite isn't done - that's auto-done using master @@ -224,6 +247,13 @@ gst_ebml_read_peek_bytes (GstEbmlRead * ebml, guint size, GstBuffer ** p_buf, return GST_FLOW_OK; } /* not enough data in the cache, free cache and get a new one */ + /* never drop pushed cache */ + if (ebml->push_cache) { + if (ebml->offset == cache_offset + cache_size) + return GST_FLOW_END; + else + return GST_FLOW_UNEXPECTED; + } gst_buffer_unref (ebml->cached_buffer); ebml->cached_buffer = NULL; } @@ -437,8 +467,17 @@ gst_ebml_peek_id (GstEbmlRead * ebml, guint * level_up, guint32 * id) next: off = ebml->offset; /* save offset */ - if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK) - return ret; + if ((ret = gst_ebml_read_element_id (ebml, id, &level_up_tmp)) != GST_FLOW_OK) { + if (ret != GST_FLOW_END) + return ret; + else { + /* simulate dummy VOID element, + * and have the call stack bail out all the way */ + *id = GST_EBML_ID_VOID; + *level_up = G_MAXUINT32 >> 2; + return GST_FLOW_OK; + } + } ebml->offset = off; /* restore offset */ diff --git a/gst/matroska/ebml-read.h b/gst/matroska/ebml-read.h index 2d5a5a1..d23d129 100644 --- a/gst/matroska/ebml-read.h +++ b/gst/matroska/ebml-read.h @@ -39,6 +39,9 @@ G_BEGIN_DECLS #define GST_EBML_READ_GET_CLASS(obj) \ (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_EBML_READ, GstEbmlReadClass)) +/* custom flow return code */ +#define GST_FLOW_END GST_FLOW_CUSTOM_SUCCESS + typedef struct _GstEbmlLevel { guint64 start; guint64 length; @@ -48,6 +51,7 @@ typedef struct _GstEbmlRead { GstElement parent; GstBuffer *cached_buffer; + gboolean push_cache; GstPad *sinkpad; guint64 offset; @@ -63,6 +67,10 @@ GType gst_ebml_read_get_type (void); void gst_ebml_level_free (GstEbmlLevel *level); +void gst_ebml_read_reset_cache (GstEbmlRead * ebml, + GstBuffer * buffer, + guint64 offset); + GstFlowReturn gst_ebml_peek_id (GstEbmlRead *ebml, guint *level_up, guint32 *id); diff --git a/gst/matroska/matroska-demux.c b/gst/matroska/matroska-demux.c index f684410..f5d3b52 100644 --- a/gst/matroska/matroska-demux.c +++ b/gst/matroska/matroska-demux.c @@ -149,6 +149,11 @@ static const GstQueryType *gst_matroska_demux_get_src_query_types (GstPad * static gboolean gst_matroska_demux_handle_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_matroska_demux_handle_sink_event (GstPad * pad, + GstEvent * event); +static GstFlowReturn gst_matroska_demux_chain (GstPad * pad, + GstBuffer * buffer); + static GstStateChangeReturn gst_matroska_demux_change_state (GstElement * element, GstStateChange transition); @@ -208,6 +213,8 @@ gst_matroska_demux_finalize (GObject * object) demux->global_tags = NULL; } + g_object_unref (demux->adapter); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -244,6 +251,10 @@ gst_matroska_demux_init (GstMatroskaDemux * demux, GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate)); gst_pad_set_activatepull_function (demux->sinkpad, GST_DEBUG_FUNCPTR (gst_matroska_demux_sink_activate_pull)); + gst_pad_set_chain_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_matroska_demux_chain)); + gst_pad_set_event_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_matroska_demux_handle_sink_event)); gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); GST_EBML_READ (demux)->sinkpad = demux->sinkpad; @@ -255,6 +266,8 @@ gst_matroska_demux_init (GstMatroskaDemux * demux, demux->index = NULL; demux->global_tags = NULL; + demux->adapter = gst_adapter_new (); + /* finish off */ gst_matroska_demux_reset (GST_ELEMENT (demux)); } @@ -390,6 +403,10 @@ gst_matroska_demux_reset (GstElement * element) demux->duration = -1; demux->last_stop_end = GST_CLOCK_TIME_NONE; + demux->offset = 0; + demux->cluster_time = GST_CLOCK_TIME_NONE; + demux->cluster_offset = 0; + if (demux->close_segment) { gst_event_unref (demux->close_segment); demux->close_segment = NULL; @@ -2339,15 +2356,12 @@ gst_matroska_demux_handle_src_event (GstPad * pad, GstEvent * event) } static GstFlowReturn -gst_matroska_demux_init_stream (GstMatroskaDemux * demux) +gst_matroska_demux_parse_header (GstMatroskaDemux * demux) { GstEbmlRead *ebml = GST_EBML_READ (demux); - guint32 id; + GstFlowReturn ret; gchar *doctype; guint version; - GstFlowReturn ret; - - GST_DEBUG_OBJECT (demux, "Init stream"); if ((ret = gst_ebml_read_header (ebml, &doctype, &version)) != GST_FLOW_OK) return ret; @@ -2366,6 +2380,21 @@ gst_matroska_demux_init_stream (GstMatroskaDemux * demux) return GST_FLOW_ERROR; } + return ret; +} + +static GstFlowReturn +gst_matroska_demux_init_stream (GstMatroskaDemux * demux) +{ + GstEbmlRead *ebml = GST_EBML_READ (demux); + guint32 id; + GstFlowReturn ret; + + GST_DEBUG_OBJECT (demux, "Init stream"); + + if ((ret = gst_matroska_demux_parse_header (demux)) != GST_FLOW_OK) + return ret; + /* find segment, must be the next element but search as long as * we find it anyway */ while (TRUE) { @@ -5168,11 +5197,389 @@ pause: } } +static inline void +gst_matroska_demux_take (GstMatroskaDemux * demux, guint bytes) +{ + GstBuffer *buffer; + + GST_LOG_OBJECT (demux, "caching %d bytes for parsing", bytes); + buffer = gst_adapter_take_buffer (demux->adapter, bytes); + gst_ebml_read_reset_cache (GST_EBML_READ (demux), buffer, demux->offset); + demux->offset += bytes; +} + +static inline void +gst_matroska_demux_flush (GstMatroskaDemux * demux, guint flush) +{ + GST_LOG_OBJECT (demux, "skipping %d bytes", flush); + gst_adapter_flush (demux->adapter, flush); + demux->offset += flush; +} + +static GstFlowReturn +gst_matroska_demux_peek_id_length (GstMatroskaDemux * demux, guint32 * _id, + guint64 * _length, guint * _needed) +{ + guint avail, needed; + const guint8 *buf; + gint len_mask = 0x80, read = 1, n = 1, num_ffs = 0; + guint64 total; + guint8 b; + + g_return_val_if_fail (_id != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (_length != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (_needed != NULL, GST_FLOW_ERROR); + + /* well ... */ + *_id = (guint32) GST_EBML_SIZE_UNKNOWN; + *_length = GST_EBML_SIZE_UNKNOWN; + + /* read element id */ + needed = 2; + avail = gst_adapter_available (demux->adapter); + if (avail < needed) + goto exit; + + buf = gst_adapter_peek (demux->adapter, 1); + b = GST_READ_UINT8 (buf); + + total = (guint64) b; + while (read <= 4 && !(total & len_mask)) { + read++; + len_mask >>= 1; + } + if (G_UNLIKELY (read > 4)) + goto invalid_id; + /* need id and at least something for subsequent length */ + if ((needed = read + 1) > avail) + goto exit; + + buf = gst_adapter_peek (demux->adapter, needed); + while (n < read) { + b = GST_READ_UINT8 (buf + n); + total = (total << 8) | b; + ++n; + } + *_id = (guint32) total; + + /* read element length */ + b = GST_READ_UINT8 (buf + n); + total = (guint64) b; + len_mask = 0x80; + read = 1; + while (read <= 8 && !(total & len_mask)) { + read++; + len_mask >>= 1; + } + if (G_UNLIKELY (read > 8)) + goto invalid_length; + if ((needed += read - 1) > avail) + goto exit; + if ((total &= (len_mask - 1)) == len_mask - 1) + num_ffs++; + + buf = gst_adapter_peek (demux->adapter, needed); + buf += (needed - read); + n = 1; + while (n < read) { + guint8 b = GST_READ_UINT8 (buf + n); + + if (G_UNLIKELY (b == 0xff)) + num_ffs++; + total = (total << 8) | b; + ++n; + } + + if (G_UNLIKELY (read == num_ffs)) + *_length = G_MAXUINT64; + else + *_length = total; + *_length = total; + +exit: + *_needed = needed; + + return GST_FLOW_OK; + + /* ERRORS */ +invalid_id: + { + GST_ERROR_OBJECT (demux, + "Invalid EBML ID size tag (0x%x) at position %" G_GUINT64_FORMAT " (0x%" + G_GINT64_MODIFIER "x)", (guint) b, demux->offset, demux->offset); + return GST_FLOW_ERROR; + } +invalid_length: + { + GST_ERROR_OBJECT (demux, + "Invalid EBML length size tag (0x%x) at position %" G_GUINT64_FORMAT + " (0x%" G_GINT64_MODIFIER "x)", (guint) b, demux->offset, + demux->offset); + return GST_FLOW_ERROR; + } +} + +static GstFlowReturn +gst_matroska_demux_chain (GstPad * pad, GstBuffer * buffer) +{ + GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad)); + GstEbmlRead *ebml = GST_EBML_READ (demux); + guint available; + GstFlowReturn ret = GST_FLOW_OK; + guint needed; + guint32 id; + guint64 length; + gchar *name; + + gst_adapter_push (demux->adapter, buffer); + buffer = NULL; + +next: + available = gst_adapter_available (demux->adapter); + + ret = gst_matroska_demux_peek_id_length (demux, &id, &length, &needed); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto parse_failed; + + GST_LOG_OBJECT (demux, "Offset %" G_GUINT64_FORMAT ", Element id 0x%x, " + "size %" G_GUINT64_FORMAT ", needed %d, available %d", demux->offset, id, + length, needed, available); + + if (needed > available) + return GST_FLOW_OK; + + /* only a few blocks are expected/allowed to be large, + * and will be recursed into, whereas others must fit */ + if (G_LIKELY (id != GST_MATROSKA_ID_SEGMENT && id != GST_MATROSKA_ID_CLUSTER)) { + if (needed + length > available) + return GST_FLOW_OK; + /* probably happens with 'large pieces' at the end, so consider it EOS */ + if (G_UNLIKELY (length > 10 * 1024 * 1024)) { + GST_WARNING_OBJECT (demux, "forcing EOS due to size %" G_GUINT64_FORMAT, + length); + return GST_FLOW_UNEXPECTED; + } + } + + switch (demux->state) { + case GST_MATROSKA_DEMUX_STATE_START: + switch (id) { + case GST_EBML_ID_HEADER: + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_header (demux); + if (ret != GST_FLOW_OK) + goto parse_failed; + demux->state = GST_MATROSKA_DEMUX_STATE_HEADER; + break; + default: + goto invalid_header; + break; + } + break; + case GST_MATROSKA_DEMUX_STATE_HEADER: + case GST_MATROSKA_DEMUX_STATE_DATA: + switch (id) { + case GST_MATROSKA_ID_SEGMENT: + /* eat segment prefix */ + gst_matroska_demux_flush (demux, needed); + GST_DEBUG_OBJECT (demux, + "Found Segment start at offset %" G_GUINT64_FORMAT, ebml->offset); + /* seeks are from the beginning of the segment, + * after the segment ID/length */ + demux->ebml_segment_start = demux->offset; + break; + case GST_MATROSKA_ID_SEGMENTINFO: + if (!demux->segmentinfo_parsed) { + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_info (demux); + } else { + gst_matroska_demux_flush (demux, needed + length); + } + break; + case GST_MATROSKA_ID_TRACKS: + if (!demux->tracks_parsed) { + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_tracks (demux); + } else { + gst_matroska_demux_flush (demux, needed + length); + } + break; + case GST_MATROSKA_ID_CLUSTER: + if (G_UNLIKELY (!demux->tracks_parsed)) { + GST_DEBUG_OBJECT (demux, "Cluster before Track"); + goto not_streamable; + } else if (demux->state == GST_MATROSKA_DEMUX_STATE_HEADER) { + demux->state = GST_MATROSKA_DEMUX_STATE_DATA; + GST_DEBUG_OBJECT (demux, "signaling no more pads"); + gst_element_no_more_pads (GST_ELEMENT (demux)); + /* send initial newsegment */ + gst_matroska_demux_send_event (demux, + gst_event_new_new_segment (FALSE, 1.0, + GST_FORMAT_TIME, 0, + (demux->segment.duration > + 0) ? demux->segment.duration : -1, 0)); + } + demux->cluster_time = GST_CLOCK_TIME_NONE; + demux->cluster_offset = demux->parent.offset; + /* eat cluster prefix */ + gst_matroska_demux_flush (demux, needed); + break; + case GST_MATROSKA_ID_CLUSTERTIMECODE: + { + guint64 num; + + gst_matroska_demux_take (demux, length + needed); + if ((ret = gst_ebml_read_uint (ebml, &id, &num)) != GST_FLOW_OK) + goto parse_failed; + GST_DEBUG_OBJECT (demux, "ClusterTimeCode: %" G_GUINT64_FORMAT, num); + demux->cluster_time = num; + break; + } + case GST_MATROSKA_ID_BLOCKGROUP: + gst_matroska_demux_take (demux, length + needed); + DEBUG_ELEMENT_START (demux, ebml, "BlockGroup"); + if ((ret = gst_ebml_read_master (ebml, &id)) == GST_FLOW_OK) { + ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux, + demux->cluster_time, demux->cluster_offset, FALSE); + } + DEBUG_ELEMENT_STOP (demux, ebml, "BlockGroup", ret); + if (ret != GST_FLOW_OK) + goto exit; + break; + case GST_MATROSKA_ID_SIMPLEBLOCK: + gst_matroska_demux_take (demux, length + needed); + DEBUG_ELEMENT_START (demux, ebml, "SimpleBlock"); + ret = gst_matroska_demux_parse_blockgroup_or_simpleblock (demux, + demux->cluster_time, demux->cluster_offset, TRUE); + DEBUG_ELEMENT_STOP (demux, ebml, "SimpleBlock", ret); + if (ret != GST_FLOW_OK) + goto exit; + break; + case GST_MATROSKA_ID_ATTACHMENTS: + if (!demux->attachments_parsed) { + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_attachments (demux); + } else { + gst_matroska_demux_flush (demux, needed + length); + } + break; + case GST_MATROSKA_ID_TAGS: + gst_matroska_demux_take (demux, length + needed); + ret = gst_matroska_demux_parse_metadata (demux); + break; + case GST_MATROSKA_ID_CHAPTERS: + name = "Cues"; + goto skip; + case GST_MATROSKA_ID_SEEKHEAD: + name = "SeekHead"; + goto skip; + case GST_MATROSKA_ID_CUES: + name = "Cues"; + goto skip; + default: + name = "Unknown"; + skip: + GST_DEBUG_OBJECT (demux, "skipping Element 0x%x (%s)", id, name); + gst_matroska_demux_flush (demux, needed + length); + break; + } + break; + } + + if (ret != GST_FLOW_OK) + goto parse_failed; + else + goto next; + +exit: + return ret; + + /* ERRORS */ +parse_failed: + { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), + ("Failed to parse Element 0x%x", id)); + return GST_FLOW_ERROR; + } +not_streamable: + { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), + ("File layout does not permit streaming")); + return GST_FLOW_ERROR; + } +invalid_header: + { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, (NULL), ("Invalid header")); + return GST_FLOW_ERROR; + } +} + +static gboolean +gst_matroska_demux_handle_sink_event (GstPad * pad, GstEvent * event) +{ + gboolean res = TRUE; + GstMatroskaDemux *demux = GST_MATROSKA_DEMUX (GST_PAD_PARENT (pad)); + + GST_DEBUG_OBJECT (demux, + "have event type %s: %p on sink pad", GST_EVENT_TYPE_NAME (event), event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_NEWSEGMENT: + { + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time = 0; + gboolean update; + GstSegment segment; + + /* some debug output */ + gst_segment_init (&segment, GST_FORMAT_UNDEFINED); + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, + &start, &stop, &time); + gst_segment_set_newsegment_full (&segment, update, rate, arate, format, + start, stop, time); + GST_DEBUG_OBJECT (demux, + "received format %d newsegment %" GST_SEGMENT_FORMAT, format, + &segment); + + /* chain will send initial newsegment after pads have been added */ + GST_DEBUG_OBJECT (demux, "eating event"); + gst_event_unref (event); + res = TRUE; + break; + } + case GST_EVENT_EOS: + { + if (demux->state != GST_MATROSKA_DEMUX_STATE_DATA) { + gst_event_unref (event); + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, + (NULL), ("got eos and didn't receive a complete header object")); + } else if (demux->num_streams == 0) { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, + (NULL), ("got eos but no streams (yet)")); + } else { + gst_matroska_demux_send_event (demux, event); + } + break; + } + default: + res = gst_pad_event_default (pad, event); + break; + } + + return res; +} + static gboolean gst_matroska_demux_sink_activate (GstPad * sinkpad) { - if (gst_pad_check_pull_range (sinkpad)) + if (gst_pad_check_pull_range (sinkpad)) { + GST_DEBUG ("going to pull mode"); return gst_pad_activate_pull (sinkpad, TRUE); + } else { + GST_DEBUG ("going to push (streaming) mode"); + return gst_pad_activate_push (sinkpad, TRUE); + } return FALSE; } diff --git a/gst/matroska/matroska-demux.h b/gst/matroska/matroska-demux.h index 04a3828..59a3eb9 100644 --- a/gst/matroska/matroska-demux.h +++ b/gst/matroska/matroska-demux.h @@ -23,6 +23,7 @@ #define __GST_MATROSKA_DEMUX_H__ #include +#include #include "ebml-read.h" #include "matroska-ids.h" @@ -97,6 +98,13 @@ typedef struct _GstMatroskaDemux { GstEvent *close_segment; GstEvent *new_segment; GstTagList *global_tags; + + /* push based mode usual suspects */ + guint64 offset; + GstAdapter *adapter; + /* some state saving */ + GstClockTime cluster_time; + guint64 cluster_offset; } GstMatroskaDemux; typedef struct _GstMatroskaDemuxClass { -- 2.7.4