adaptivedemux: Get rid of internal stream adapter and let subclasses handle this...
authorSebastian Dröge <sebastian@centricular.com>
Mon, 28 Mar 2016 10:45:36 +0000 (13:45 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 1 Jul 2016 12:10:31 +0000 (14:10 +0200)
This allows subclasses to have more control and especially ensure that they
push data downstream with the correct offsets.

https://bugzilla.gnome.org/show_bug.cgi?id=764684

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h
ext/hls/gsthlsdemux.c
ext/hls/gsthlsdemux.h
gst-libs/gst/adaptivedemux/gstadaptivedemux.c
gst-libs/gst/adaptivedemux/gstadaptivedemux.h

index 42e11b5..1f562b4 100644 (file)
@@ -253,7 +253,7 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream *
 static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux);
 static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux);
 static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream);
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
 static GstFlowReturn
 gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
     GstAdaptiveDemuxStream * stream);
@@ -631,6 +631,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
     }
 
     gst_isoff_sidx_parser_init (&stream->sidx_parser);
+    if (gst_mpd_client_has_isoff_ondemand_profile (demux->client))
+      stream->sidx_adapter = gst_adapter_new ();
   }
 
   return TRUE;
@@ -1301,6 +1303,7 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream,
       /* if we switched, we need a new index */
       gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
       gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
+      gst_adapter_clear (dashstream->sidx_adapter);
     }
   }
 
@@ -1388,6 +1391,8 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
     if (flags & GST_SEEK_FLAG_FLUSH) {
       gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
       gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
+      if (dashstream->sidx_adapter)
+        gst_adapter_clear (dashstream->sidx_adapter);
     }
     gst_dash_demux_stream_seek (iter->data, rate >= 0, 0, target_pos, NULL);
   }
@@ -1631,24 +1636,25 @@ gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
 
 static GstFlowReturn
 gst_dash_demux_data_received (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream)
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
 {
   GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
   GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux);
   GstFlowReturn ret = GST_FLOW_OK;
-  GstBuffer *buffer;
-  gsize available;
 
   if (!gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client))
-    return GST_ADAPTIVE_DEMUX_CLASS (parent_class)->data_received (demux,
-        stream);
+    return gst_adaptive_demux_stream_push_buffer (stream, buffer);
+
+  gst_adapter_push (dash_stream->sidx_adapter, buffer);
+  buffer = NULL;
 
   if (stream->downloading_index) {
     GstIsoffParserResult res;
     guint consumed;
+    gsize available;
 
-    available = gst_adapter_available (stream->adapter);
-    buffer = gst_adapter_take_buffer (stream->adapter, available);
+    available = gst_adapter_available (dash_stream->sidx_adapter);
+    buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available);
 
     if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) {
       res =
@@ -1677,23 +1683,24 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux,
           /* we still need to keep some data around for the next parsing round
            * so just push what was already processed by the parser */
           pending = _gst_buffer_split (buffer, consumed, -1);
-          gst_adapter_push (stream->adapter, pending);
+          gst_adapter_push (dash_stream->sidx_adapter, pending);
         }
       }
     }
     ret = gst_adaptive_demux_stream_push_buffer (stream, buffer);
   } else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
+    gsize available;
 
     while (ret == GST_FLOW_OK
-        && ((available = gst_adapter_available (stream->adapter)) > 0)) {
+        && ((available = gst_adapter_available (dash_stream->sidx_adapter)) > 0)) {
       gboolean advance = FALSE;
 
       if (available < dash_stream->sidx_current_remaining) {
-        buffer = gst_adapter_take_buffer (stream->adapter, available);
+        buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available);
         dash_stream->sidx_current_remaining -= available;
       } else {
         buffer =
-            gst_adapter_take_buffer (stream->adapter,
+            gst_adapter_take_buffer (dash_stream->sidx_adapter,
             dash_stream->sidx_current_remaining);
         dash_stream->sidx_current_remaining = 0;
         advance = TRUE;
@@ -1714,8 +1721,8 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux,
     /* this should be the main header, just push it all */
     ret =
         gst_adaptive_demux_stream_push_buffer (stream,
-        gst_adapter_take_buffer (stream->adapter,
-            gst_adapter_available (stream->adapter)));
+        gst_adapter_take_buffer (dash_stream->sidx_adapter,
+            gst_adapter_available (dash_stream->sidx_adapter)));
   }
 
   return ret;
@@ -1727,6 +1734,8 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream)
   GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
 
   gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser);
+  if (dash_stream->sidx_adapter)
+    g_object_unref (dash_stream->sidx_adapter);
 }
 
 static GstDashDemuxClockDrift *
index 5ed9620..326d6f9 100644 (file)
@@ -67,6 +67,7 @@ struct _GstDashDemuxStream
   GstMediaFragmentInfo current_fragment;
 
   /* index parsing */
+  GstAdapter *sidx_adapter;
   GstSidxParser sidx_parser;
   gsize sidx_current_remaining;
   gint sidx_index;
index 43ecdb7..fed487a 100644 (file)
@@ -92,7 +92,7 @@ gst_hls_demux_start_fragment (GstAdaptiveDemux * demux,
 static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
     GstAdaptiveDemuxStream * stream);
 static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream);
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
 static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream *
     stream);
 static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream *
@@ -114,6 +114,7 @@ gst_hls_demux_finalize (GObject * obj)
   GstHLSDemux *demux = GST_HLS_DEMUX (obj);
 
   gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
+  g_object_unref (demux->pending_encrypted_data);
   gst_m3u8_client_free (demux->client);
 
   G_OBJECT_CLASS (parent_class)->finalize (obj);
@@ -172,6 +173,7 @@ static void
 gst_hls_demux_init (GstHLSDemux * demux)
 {
   demux->do_typefind = TRUE;
+  demux->pending_encrypted_data = gst_adapter_new ();
 }
 
 static GstStateChangeReturn
@@ -520,6 +522,7 @@ key_failed:
   }
 }
 
+/* Handles decrypted buffers only */
 static GstFlowReturn
 gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
     GstAdaptiveDemuxStream * stream, GstBuffer * buffer, gboolean force)
@@ -532,6 +535,10 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
     guint buffer_size;
     GstTypeFindProbability prob = GST_TYPE_FIND_NONE;
 
+    if (hlsdemux->pending_typefind_buffer)
+      buffer = gst_buffer_append (hlsdemux->pending_typefind_buffer, buffer);
+    hlsdemux->pending_typefind_buffer = NULL;
+
     gst_buffer_map (buffer, &info, GST_MAP_READ);
     buffer_size = info.size;
 
@@ -553,11 +560,7 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
         gst_buffer_unref (buffer);
         return GST_FLOW_NOT_NEGOTIATED;
       } else {
-        if (hlsdemux->pending_buffer)
-          hlsdemux->pending_buffer =
-              gst_buffer_append (buffer, hlsdemux->pending_buffer);
-        else
-          hlsdemux->pending_buffer = buffer;
+        hlsdemux->pending_typefind_buffer = buffer;
         return GST_FLOW_OK;
       }
     }
@@ -569,6 +572,8 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
     hlsdemux->do_typefind = FALSE;
   }
 
+  g_assert (hlsdemux->pending_typefind_buffer == NULL);
+
   if (buffer)
     return gst_adaptive_demux_stream_push_buffer (stream, buffer);
   return GST_FLOW_OK;
@@ -584,35 +589,30 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
   if (hlsdemux->current_key)
     gst_hls_demux_decrypt_end (hlsdemux);
 
-  /* ideally this should be empty, but this eos might have been
-   * caused by an error on the source element */
-  GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received"
-      ": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter));
-  gst_adapter_clear (stream->adapter);
-
   if (stream->last_ret == GST_FLOW_OK) {
-    if (hlsdemux->pending_buffer) {
+    if (hlsdemux->pending_decrypted_buffer) {
       if (hlsdemux->current_key) {
         GstMapInfo info;
         gssize unpadded_size;
 
         /* Handle pkcs7 unpadding here */
-        gst_buffer_map (hlsdemux->pending_buffer, &info, GST_MAP_READ);
+        gst_buffer_map (hlsdemux->pending_decrypted_buffer, &info,
+            GST_MAP_READ);
         unpadded_size = info.size - info.data[info.size - 1];
-        gst_buffer_unmap (hlsdemux->pending_buffer, &info);
+        gst_buffer_unmap (hlsdemux->pending_decrypted_buffer, &info);
 
-        gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size);
+        gst_buffer_resize (hlsdemux->pending_decrypted_buffer, 0,
+            unpadded_size);
       }
 
       ret =
-          gst_hls_demux_handle_buffer (demux, stream, hlsdemux->pending_buffer,
-          TRUE);
-      hlsdemux->pending_buffer = NULL;
+          gst_hls_demux_handle_buffer (demux, stream,
+          hlsdemux->pending_decrypted_buffer, TRUE);
+      hlsdemux->pending_decrypted_buffer = NULL;
     }
   } else {
-    if (hlsdemux->pending_buffer)
-      gst_buffer_unref (hlsdemux->pending_buffer);
-    hlsdemux->pending_buffer = NULL;
+    gst_buffer_replace (&hlsdemux->pending_decrypted_buffer, NULL);
+    gst_adapter_clear (hlsdemux->pending_encrypted_data);
   }
 
   if (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED)
@@ -623,27 +623,27 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
 
 static GstFlowReturn
 gst_hls_demux_data_received (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream)
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
 {
   GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
-  gsize available;
-  GstBuffer *buffer = NULL;
-
-  available = gst_adapter_available (stream->adapter);
 
   /* Is it encrypted? */
   if (hlsdemux->current_key) {
     GError *err = NULL;
+    gsize size;
     GstBuffer *tmp_buffer;
 
+    gst_adapter_push (hlsdemux->pending_encrypted_data, buffer);
+    size = gst_adapter_available (hlsdemux->pending_encrypted_data);
+
     /* must be a multiple of 16 */
-    available = available & (~0xF);
+    size = size & (~0xF);
 
-    if (available == 0) {
+    if (size == 0) {
       return GST_FLOW_OK;
     }
 
-    buffer = gst_adapter_take_buffer (stream->adapter, available);
+    buffer = gst_adapter_take_buffer (hlsdemux->pending_encrypted_data, size);
     buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err);
     if (buffer == NULL) {
       GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"),
@@ -652,15 +652,9 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux,
       return GST_FLOW_ERROR;
     }
 
-    tmp_buffer = hlsdemux->pending_buffer;
-    hlsdemux->pending_buffer = buffer;
+    tmp_buffer = hlsdemux->pending_decrypted_buffer;
+    hlsdemux->pending_decrypted_buffer = buffer;
     buffer = tmp_buffer;
-  } else {
-    buffer = gst_adapter_take_buffer (stream->adapter, available);
-    if (hlsdemux->pending_buffer) {
-      buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer);
-      hlsdemux->pending_buffer = NULL;
-    }
   }
 
   return gst_hls_demux_handle_buffer (demux, stream, buffer, FALSE);
@@ -779,9 +773,9 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
   demux->client = gst_m3u8_client_new ("", NULL);
 
   demux->srcpad_counter = 0;
-  if (demux->pending_buffer)
-    gst_buffer_unref (demux->pending_buffer);
-  demux->pending_buffer = NULL;
+  gst_adapter_clear (demux->pending_encrypted_data);
+  gst_buffer_replace (&demux->pending_decrypted_buffer, NULL);
+  gst_buffer_replace (&demux->pending_typefind_buffer, NULL);
   if (demux->current_key) {
     g_free (demux->current_key);
     demux->current_key = NULL;
index 1154236..da84069 100644 (file)
@@ -84,11 +84,10 @@ struct _GstHLSDemux
 #endif
   gchar *current_key;
   guint8 *current_iv;
-  GstBuffer *pending_buffer; /* decryption scenario:
-                              * the last buffer can only be pushed when
-                              * resized, so need to store and wait for
-                              * EOS to know it is the last */
-
+  GstAdapter *pending_encrypted_data;  /* for chunking data into 16 byte multiples for decryption */
+  GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding.
+                                          We only know that it is the last at EOS */
+  GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */
   gboolean reset_pts;
 };
 
index c021577..4e1f2b9 100644 (file)
@@ -273,7 +273,7 @@ gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
     stream, GstFlowReturn ret, GError * err);
 static GstFlowReturn
 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream);
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
 static GstFlowReturn
 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
     GstAdaptiveDemuxStream * stream);
@@ -1117,7 +1117,6 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
   g_cond_init (&stream->fragment_download_cond);
   g_mutex_init (&stream->fragment_download_lock);
-  stream->adapter = gst_adapter_new ();
 
   demux->next_streams = g_list_append (demux->next_streams, stream);
 
@@ -1221,8 +1220,6 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
 
   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
 
-  g_object_unref (stream->adapter);
-
   g_free (stream);
 }
 
@@ -1722,7 +1719,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
 
     stream->download_error_count = 0;
     stream->need_header = TRUE;
-    gst_adapter_clear (stream->adapter);
   }
 }
 
@@ -2019,12 +2015,8 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
  */
 static GstFlowReturn
 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
-    GstAdaptiveDemuxStream * stream)
+    GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
 {
-  GstBuffer *buffer;
-
-  buffer = gst_adapter_take_buffer (stream->adapter,
-      gst_adapter_available (stream->adapter));
   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
 }
 
@@ -2124,12 +2116,10 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
       stream->download_chunk_start_time;
   stream->download_total_bytes += gst_buffer_get_size (buffer);
 
-  gst_adapter_push (stream->adapter, buffer);
-  GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT
-      ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer),
-      gst_adapter_available (stream->adapter));
+  GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
+      gst_buffer_get_size (buffer));
 
-  ret = klass->data_received (demux, stream);
+  ret = klass->data_received (demux, stream, buffer);
 
   if (ret == GST_FLOW_FLUSHING) {
     /* do not make any changes if the stream is cancelled */
@@ -3302,7 +3292,6 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
       stream->need_header = TRUE;
-      gst_adapter_clear (stream->adapter);
       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
     }
 
index 8fcc47a..3db54bb 100644 (file)
@@ -120,8 +120,6 @@ struct _GstAdaptiveDemuxStream
 
   GstSegment segment;
 
-  GstAdapter *adapter;
-
   GstCaps *pending_caps;
   GstEvent *pending_segment;
   GstTagList *pending_tags;
@@ -385,13 +383,14 @@ struct _GstAdaptiveDemuxClass
    * data_received:
    * @demux: #GstAdaptiveDemux
    * @stream: #GstAdaptiveDemuxStream
+   * @buffer: #GstBuffer
    *
    * Notifies the subclass that a fragment chunk was downloaded. The subclass
-   * can look at the data at the adapter and modify/push data as desired.
+   * can look at the data and modify/push data as desired.
    *
    * Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error.
    */
-  GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream);
+  GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
 
   /**
    * get_live_seek_range: