From ca9f62e1d062ac1e87f13c9ddddab9164a3cb892 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 28 Mar 2016 13:45:36 +0300 Subject: [PATCH] adaptivedemux: Get rid of internal stream adapter and let subclasses handle this directly This allows subclasses to have more control and especially ensure that they push data downstream with the correct offsets. https://bugzilla.gnome.org/show_bug.cgi?id=764684 --- ext/dash/gstdashdemux.c | 37 ++++++++----- ext/dash/gstdashdemux.h | 1 + ext/hls/gsthlsdemux.c | 76 ++++++++++++--------------- ext/hls/gsthlsdemux.h | 9 ++-- gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 21 ++------ gst-libs/gst/adaptivedemux/gstadaptivedemux.h | 7 ++- 6 files changed, 71 insertions(+), 80 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 42e11b5..1f562b4 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -253,7 +253,7 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream * static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux); static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux); static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static GstFlowReturn gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); @@ -631,6 +631,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) } gst_isoff_sidx_parser_init (&stream->sidx_parser); + if (gst_mpd_client_has_isoff_ondemand_profile (demux->client)) + stream->sidx_adapter = gst_adapter_new (); } return TRUE; @@ -1301,6 +1303,7 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream, /* if we switched, we need a new index */ gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); + gst_adapter_clear (dashstream->sidx_adapter); } } @@ -1388,6 +1391,8 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) if (flags & GST_SEEK_FLAG_FLUSH) { gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); + if (dashstream->sidx_adapter) + gst_adapter_clear (dashstream->sidx_adapter); } gst_dash_demux_stream_seek (iter->data, rate >= 0, 0, target_pos, NULL); } @@ -1631,24 +1636,25 @@ gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux); GstFlowReturn ret = GST_FLOW_OK; - GstBuffer *buffer; - gsize available; if (!gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) - return GST_ADAPTIVE_DEMUX_CLASS (parent_class)->data_received (demux, - stream); + return gst_adaptive_demux_stream_push_buffer (stream, buffer); + + gst_adapter_push (dash_stream->sidx_adapter, buffer); + buffer = NULL; if (stream->downloading_index) { GstIsoffParserResult res; guint consumed; + gsize available; - available = gst_adapter_available (stream->adapter); - buffer = gst_adapter_take_buffer (stream->adapter, available); + available = gst_adapter_available (dash_stream->sidx_adapter); + buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available); if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) { res = @@ -1677,23 +1683,24 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux, /* we still need to keep some data around for the next parsing round * so just push what was already processed by the parser */ pending = _gst_buffer_split (buffer, consumed, -1); - gst_adapter_push (stream->adapter, pending); + gst_adapter_push (dash_stream->sidx_adapter, pending); } } } ret = gst_adaptive_demux_stream_push_buffer (stream, buffer); } else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { + gsize available; while (ret == GST_FLOW_OK - && ((available = gst_adapter_available (stream->adapter)) > 0)) { + && ((available = gst_adapter_available (dash_stream->sidx_adapter)) > 0)) { gboolean advance = FALSE; if (available < dash_stream->sidx_current_remaining) { - buffer = gst_adapter_take_buffer (stream->adapter, available); + buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available); dash_stream->sidx_current_remaining -= available; } else { buffer = - gst_adapter_take_buffer (stream->adapter, + gst_adapter_take_buffer (dash_stream->sidx_adapter, dash_stream->sidx_current_remaining); dash_stream->sidx_current_remaining = 0; advance = TRUE; @@ -1714,8 +1721,8 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux, /* this should be the main header, just push it all */ ret = gst_adaptive_demux_stream_push_buffer (stream, - gst_adapter_take_buffer (stream->adapter, - gst_adapter_available (stream->adapter))); + gst_adapter_take_buffer (dash_stream->sidx_adapter, + gst_adapter_available (dash_stream->sidx_adapter))); } return ret; @@ -1727,6 +1734,8 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream) GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser); + if (dash_stream->sidx_adapter) + g_object_unref (dash_stream->sidx_adapter); } static GstDashDemuxClockDrift * diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 5ed9620..326d6f9 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -67,6 +67,7 @@ struct _GstDashDemuxStream GstMediaFragmentInfo current_fragment; /* index parsing */ + GstAdapter *sidx_adapter; GstSidxParser sidx_parser; gsize sidx_current_remaining; gint sidx_index; diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 43ecdb7..fed487a 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -92,7 +92,7 @@ gst_hls_demux_start_fragment (GstAdaptiveDemux * demux, static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * @@ -114,6 +114,7 @@ gst_hls_demux_finalize (GObject * obj) GstHLSDemux *demux = GST_HLS_DEMUX (obj); gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux)); + g_object_unref (demux->pending_encrypted_data); gst_m3u8_client_free (demux->client); G_OBJECT_CLASS (parent_class)->finalize (obj); @@ -172,6 +173,7 @@ static void gst_hls_demux_init (GstHLSDemux * demux) { demux->do_typefind = TRUE; + demux->pending_encrypted_data = gst_adapter_new (); } static GstStateChangeReturn @@ -520,6 +522,7 @@ key_failed: } } +/* Handles decrypted buffers only */ static GstFlowReturn gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer, gboolean force) @@ -532,6 +535,10 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, guint buffer_size; GstTypeFindProbability prob = GST_TYPE_FIND_NONE; + if (hlsdemux->pending_typefind_buffer) + buffer = gst_buffer_append (hlsdemux->pending_typefind_buffer, buffer); + hlsdemux->pending_typefind_buffer = NULL; + gst_buffer_map (buffer, &info, GST_MAP_READ); buffer_size = info.size; @@ -553,11 +560,7 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, gst_buffer_unref (buffer); return GST_FLOW_NOT_NEGOTIATED; } else { - if (hlsdemux->pending_buffer) - hlsdemux->pending_buffer = - gst_buffer_append (buffer, hlsdemux->pending_buffer); - else - hlsdemux->pending_buffer = buffer; + hlsdemux->pending_typefind_buffer = buffer; return GST_FLOW_OK; } } @@ -569,6 +572,8 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, hlsdemux->do_typefind = FALSE; } + g_assert (hlsdemux->pending_typefind_buffer == NULL); + if (buffer) return gst_adaptive_demux_stream_push_buffer (stream, buffer); return GST_FLOW_OK; @@ -584,35 +589,30 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, if (hlsdemux->current_key) gst_hls_demux_decrypt_end (hlsdemux); - /* ideally this should be empty, but this eos might have been - * caused by an error on the source element */ - GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received" - ": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter)); - gst_adapter_clear (stream->adapter); - if (stream->last_ret == GST_FLOW_OK) { - if (hlsdemux->pending_buffer) { + if (hlsdemux->pending_decrypted_buffer) { if (hlsdemux->current_key) { GstMapInfo info; gssize unpadded_size; /* Handle pkcs7 unpadding here */ - gst_buffer_map (hlsdemux->pending_buffer, &info, GST_MAP_READ); + gst_buffer_map (hlsdemux->pending_decrypted_buffer, &info, + GST_MAP_READ); unpadded_size = info.size - info.data[info.size - 1]; - gst_buffer_unmap (hlsdemux->pending_buffer, &info); + gst_buffer_unmap (hlsdemux->pending_decrypted_buffer, &info); - gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size); + gst_buffer_resize (hlsdemux->pending_decrypted_buffer, 0, + unpadded_size); } ret = - gst_hls_demux_handle_buffer (demux, stream, hlsdemux->pending_buffer, - TRUE); - hlsdemux->pending_buffer = NULL; + gst_hls_demux_handle_buffer (demux, stream, + hlsdemux->pending_decrypted_buffer, TRUE); + hlsdemux->pending_decrypted_buffer = NULL; } } else { - if (hlsdemux->pending_buffer) - gst_buffer_unref (hlsdemux->pending_buffer); - hlsdemux->pending_buffer = NULL; + gst_buffer_replace (&hlsdemux->pending_decrypted_buffer, NULL); + gst_adapter_clear (hlsdemux->pending_encrypted_data); } if (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED) @@ -623,27 +623,27 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); - gsize available; - GstBuffer *buffer = NULL; - - available = gst_adapter_available (stream->adapter); /* Is it encrypted? */ if (hlsdemux->current_key) { GError *err = NULL; + gsize size; GstBuffer *tmp_buffer; + gst_adapter_push (hlsdemux->pending_encrypted_data, buffer); + size = gst_adapter_available (hlsdemux->pending_encrypted_data); + /* must be a multiple of 16 */ - available = available & (~0xF); + size = size & (~0xF); - if (available == 0) { + if (size == 0) { return GST_FLOW_OK; } - buffer = gst_adapter_take_buffer (stream->adapter, available); + buffer = gst_adapter_take_buffer (hlsdemux->pending_encrypted_data, size); buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err); if (buffer == NULL) { GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"), @@ -652,15 +652,9 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux, return GST_FLOW_ERROR; } - tmp_buffer = hlsdemux->pending_buffer; - hlsdemux->pending_buffer = buffer; + tmp_buffer = hlsdemux->pending_decrypted_buffer; + hlsdemux->pending_decrypted_buffer = buffer; buffer = tmp_buffer; - } else { - buffer = gst_adapter_take_buffer (stream->adapter, available); - if (hlsdemux->pending_buffer) { - buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer); - hlsdemux->pending_buffer = NULL; - } } return gst_hls_demux_handle_buffer (demux, stream, buffer, FALSE); @@ -779,9 +773,9 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux) demux->client = gst_m3u8_client_new ("", NULL); demux->srcpad_counter = 0; - if (demux->pending_buffer) - gst_buffer_unref (demux->pending_buffer); - demux->pending_buffer = NULL; + gst_adapter_clear (demux->pending_encrypted_data); + gst_buffer_replace (&demux->pending_decrypted_buffer, NULL); + gst_buffer_replace (&demux->pending_typefind_buffer, NULL); if (demux->current_key) { g_free (demux->current_key); demux->current_key = NULL; diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index 1154236..da84069 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -84,11 +84,10 @@ struct _GstHLSDemux #endif gchar *current_key; guint8 *current_iv; - GstBuffer *pending_buffer; /* decryption scenario: - * the last buffer can only be pushed when - * resized, so need to store and wait for - * EOS to know it is the last */ - + GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */ + GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding. + We only know that it is the last at EOS */ + GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */ gboolean reset_pts; }; diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index c021577..4e1f2b9 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -273,7 +273,7 @@ gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream * stream, GstFlowReturn ret, GError * err); static GstFlowReturn gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static GstFlowReturn gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); @@ -1117,7 +1117,6 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad) gst_segment_init (&stream->segment, GST_FORMAT_TIME); g_cond_init (&stream->fragment_download_cond); g_mutex_init (&stream->fragment_download_lock); - stream->adapter = gst_adapter_new (); demux->next_streams = g_list_append (demux->next_streams, stream); @@ -1221,8 +1220,6 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream) g_clear_pointer (&stream->pending_tags, gst_tag_list_unref); - g_object_unref (stream->adapter); - g_free (stream); } @@ -1722,7 +1719,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) stream->download_error_count = 0; stream->need_header = TRUE; - gst_adapter_clear (stream->adapter); } } @@ -2019,12 +2015,8 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, */ static GstFlowReturn gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { - GstBuffer *buffer; - - buffer = gst_adapter_take_buffer (stream->adapter, - gst_adapter_available (stream->adapter)); return gst_adaptive_demux_stream_push_buffer (stream, buffer); } @@ -2124,12 +2116,10 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) stream->download_chunk_start_time; stream->download_total_bytes += gst_buffer_get_size (buffer); - gst_adapter_push (stream->adapter, buffer); - GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT - ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer), - gst_adapter_available (stream->adapter)); + GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT, + gst_buffer_get_size (buffer)); - ret = klass->data_received (demux, stream); + ret = klass->data_received (demux, stream, buffer); if (ret == GST_FLOW_FLUSHING) { /* do not make any changes if the stream is cancelled */ @@ -3302,7 +3292,6 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, if (gst_adaptive_demux_stream_select_bitrate (demux, stream, gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) { stream->need_header = TRUE; - gst_adapter_clear (stream->adapter); ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH; } diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index 8fcc47a..3db54bb 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -120,8 +120,6 @@ struct _GstAdaptiveDemuxStream GstSegment segment; - GstAdapter *adapter; - GstCaps *pending_caps; GstEvent *pending_segment; GstTagList *pending_tags; @@ -385,13 +383,14 @@ struct _GstAdaptiveDemuxClass * data_received: * @demux: #GstAdaptiveDemux * @stream: #GstAdaptiveDemuxStream + * @buffer: #GstBuffer * * Notifies the subclass that a fragment chunk was downloaded. The subclass - * can look at the data at the adapter and modify/push data as desired. + * can look at the data and modify/push data as desired. * * Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error. */ - GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); + GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer); /** * get_live_seek_range: -- 2.7.4