decodebin3: Refactor parsebin output handling
authorEdward Hervey <edward@centricular.com>
Sat, 5 Nov 2022 08:16:41 +0000 (09:16 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 16 Nov 2022 14:01:46 +0000 (14:01 +0000)
* Instead of creating temporary `PendingPad` structures, always create a
  DecodebinInputStream for every pad of parsebin
* Remove never used `pending_stream` field from DecodebinInputStream
* When unblocking a given DecodebinInput (i.e. wrapping a parsebin), also make
  sure that other parsebins from the same GstStreamCollection are unblocked
  since they come from the same source

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>

subprojects/gst-plugins-base/gst/playback/gstdecodebin3-parse.c
subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c

index 6675aae..b4007e5 100644 (file)
@@ -49,29 +49,31 @@ _custom_eos_quark_get (void)
   return g_quark;
 }
 
-/* Streams that come from demuxers (input/upstream) */
+/* Streams that come from parsebin */
 /* FIXME : All this is hardcoded. Switch to tree of chains */
 struct _DecodebinInputStream
 {
   GstDecodebin3 *dbin;
-  GstStream *pending_stream;    /* Extra ref */
+
   GstStream *active_stream;
 
   DecodebinInput *input;
 
-  GstPad *srcpad;               /* From demuxer */
+  GstPad *srcpad;               /* From parsebin */
 
   /* id of the pad event probe */
   gulong output_event_probe_id;
 
-  /* id of the buffer blocking probe on the input (demuxer src) pad */
-  gulong input_buffer_probe_id;
+  /* id of the buffer blocking probe on the parsebin srcpad pad */
+  gulong buffer_probe_id;
 
   /* Whether we saw an EOS on input. This should be treated accordingly
    * when the stream is no longer used */
   gboolean saw_eos;
 };
 
+static void unblock_pending_input (DecodebinInput * input,
+    gboolean unblock_other_inputs);
 static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
     DecodebinInput * input);
 static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
@@ -79,21 +81,6 @@ static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
 
 /* WITH SELECTION_LOCK TAKEN! */
 static gboolean
-pending_pads_are_eos (DecodebinInput * input)
-{
-  GList *tmp;
-
-  for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
-    PendingPad *ppad = (PendingPad *) tmp->data;
-    if (ppad->saw_eos == FALSE)
-      return FALSE;
-  }
-
-  return TRUE;
-}
-
-/* WITH SELECTION_LOCK TAKEN! */
-static gboolean
 all_inputs_are_eos (GstDecodebin3 * dbin)
 {
   GList *tmp;
@@ -104,13 +91,6 @@ all_inputs_are_eos (GstDecodebin3 * dbin)
       return FALSE;
   }
 
-  /* Check pending pads */
-  if (!pending_pads_are_eos (dbin->main_input))
-    return FALSE;
-  for (tmp = dbin->other_inputs; tmp; tmp = tmp->next)
-    if (!pending_pads_are_eos ((DecodebinInput *) tmp->data))
-      return FALSE;
-
   GST_DEBUG_OBJECT (dbin, "All streams are EOS");
   return TRUE;
 }
@@ -288,6 +268,15 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
         }
       }
         break;
+      case GST_EVENT_GAP:
+      {
+        /* If we are still waiting to be unblocked and we get a gap, unblock */
+        if (input->buffer_probe_id) {
+          GST_DEBUG_OBJECT (pad, "Got a gap event! Unblocking input(s) !");
+          unblock_pending_input (input->input, TRUE);
+        }
+        break;
+      }
       case GST_EVENT_CAPS:
       {
         GstCaps *caps = NULL;
@@ -366,18 +355,26 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
   return ret;
 }
 
-static DecodebinInputStream *
-create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
+static GstPadProbeReturn
+parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
     DecodebinInput * input)
 {
+  /* We have at least one buffer pending, unblock parsebin(s) */
+  GST_DEBUG_OBJECT (pad, "Got a buffer ! unblocking");
+  unblock_pending_input (input, TRUE);
+
+  return GST_PAD_PROBE_OK;
+}
+
+static DecodebinInputStream *
+create_input_stream (GstDecodebin3 * dbin, GstPad * pad, DecodebinInput * input)
+{
   DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1);
 
-  GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)",
-      stream, gst_stream_get_stream_id (stream), input);
+  GST_DEBUG_OBJECT (dbin, "Creating input stream for %" GST_PTR_FORMAT, pad);
 
   res->dbin = dbin;
   res->input = input;
-  res->pending_stream = gst_object_ref (stream);
   res->srcpad = pad;
 
   /* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */
@@ -387,6 +384,12 @@ create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
       | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
       (GstPadProbeCallback) parse_chain_output_probe, res, NULL);
 
+  /* Install a blocking buffer probe */
+  res->buffer_probe_id =
+      gst_pad_add_probe (pad,
+      GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
+      (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
+
   /* Add to list of current input streams */
   SELECTION_LOCK (dbin);
   dbin->input_streams = g_list_append (dbin->input_streams, res);
@@ -406,6 +409,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
       stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) :
       "<NONE>");
 
+  gst_object_replace ((GstObject **) & stream->active_stream, NULL);
+
   /* Unlink from slot */
   if (stream->srcpad) {
     GstPad *peer;
@@ -414,6 +419,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
       gst_pad_unlink (stream->srcpad, peer);
       gst_object_unref (peer);
     }
+    if (stream->buffer_probe_id)
+      gst_pad_remove_probe (stream->srcpad, stream->buffer_probe_id);
   }
 
   slot = get_slot_for_input (dbin, stream);
@@ -423,32 +430,31 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
     GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot);
   }
 
-  if (stream->active_stream)
-    gst_object_unref (stream->active_stream);
-  if (stream->pending_stream)
-    gst_object_unref (stream->pending_stream);
-
   dbin->input_streams = g_list_remove (dbin->input_streams, stream);
 
   g_free (stream);
 }
 
 static void
-unblock_pending_input (DecodebinInput * input)
+unblock_pending_input (DecodebinInput * input, gboolean unblock_other_inputs)
 {
   GstDecodebin3 *dbin = input->dbin;
   GList *tmp, *unused_slot = NULL;
 
-  /* 1. Re-use existing streams if/when possible */
+  GST_DEBUG_OBJECT (dbin,
+      "DecodebinInput for %" GST_PTR_FORMAT " , unblock_other_inputs:%d",
+      input->parsebin, unblock_other_inputs);
+
+  /* Re-use existing streams if/when possible */
   GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible");
 
-  /* 2. Remove unused streams (push EOS) */
-  GST_DEBUG_OBJECT (dbin, "Removing unused streams");
+  /* Unblock all input streams and link to a slot if needed */
   SELECTION_LOCK (dbin);
   tmp = dbin->input_streams;
   while (tmp != NULL) {
     DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
     GList *next = tmp->next;
+    MultiQueueSlot *slot;
 
     if (input_stream->input != input) {
       tmp = next;
@@ -456,58 +462,33 @@ unblock_pending_input (DecodebinInput * input)
     }
 
     GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream);
-    if (input_stream->input_buffer_probe_id) {
+
+    if (!input_stream->active_stream)
+      input_stream->active_stream = gst_pad_get_stream (input_stream->srcpad);
+
+    /* Ensure the stream has an associated slot */
+    slot = get_slot_for_input (dbin, input_stream);
+    if (slot->input != input_stream)
+      link_input_to_slot (input_stream, slot);
+
+    if (input_stream->buffer_probe_id) {
       GST_DEBUG_OBJECT (dbin,
           "Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream,
           input_stream->srcpad);
       gst_pad_remove_probe (input_stream->srcpad,
-          input_stream->input_buffer_probe_id);
+          input_stream->buffer_probe_id);
+      input_stream->buffer_probe_id = 0;
     }
-    input_stream->input_buffer_probe_id = 0;
 
     if (input_stream->saw_eos) {
+      GST_DEBUG_OBJECT (dbin, "Removing EOS'd stream");
       remove_input_stream (dbin, input_stream);
       tmp = dbin->input_streams;
     } else
       tmp = next;
   }
-  SELECTION_UNLOCK (dbin);
-
-  GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
-  /* 3. Create new streams */
-  for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
-    GstStream *stream;
-    PendingPad *ppad = (PendingPad *) tmp->data;
-
-    stream = gst_pad_get_stream (ppad->pad);
-    if (stream == NULL) {
-      GST_ERROR_OBJECT (dbin, "No stream for pad ????");
-    } else {
-      MultiQueueSlot *slot;
-      DecodebinInputStream *input_stream;
-      /* The remaining pads in pending_pads are the ones that require a new
-       * input stream */
-      input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
-      /* See if we can link it straight away */
-      input_stream->active_stream = stream;
-
-      SELECTION_LOCK (dbin);
-      slot = get_slot_for_input (dbin, input_stream);
-      link_input_to_slot (input_stream, slot);
-      SELECTION_UNLOCK (dbin);
-
-      /* Remove the buffer and event probe */
-      gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
-      gst_pad_remove_probe (ppad->pad, ppad->event_probe);
-      g_free (ppad);
-    }
-  }
-
-  g_list_free (input->pending_pads);
-  input->pending_pads = NULL;
 
   /* Weed out unused multiqueue slots */
-  SELECTION_LOCK (dbin);
   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
     GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input);
@@ -518,105 +499,43 @@ unblock_pending_input (DecodebinInput * input)
   }
   SELECTION_UNLOCK (dbin);
 
-  for (tmp = unused_slot; tmp; tmp = tmp->next) {
-    GstPad *sink_pad = (GstPad *) tmp->data;
-    GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
-    gst_pad_send_event (sink_pad, gst_event_new_eos ());
-  }
-
-  if (unused_slot)
+  if (unused_slot) {
+    for (tmp = unused_slot; tmp; tmp = tmp->next) {
+      GstPad *sink_pad = (GstPad *) tmp->data;
+      GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
+      gst_pad_send_event (sink_pad, gst_event_new_eos ());
+    }
     g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref);
+  }
 
-}
-
-/* FIXME : HACK, REMOVE, USE INPUT CHAINS */
-static GstPadProbeReturn
-parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
-    DecodebinInput * input)
-{
-  /* Any data out the demuxer means it's not creating pads
-   * any more right now */
-  GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
-  unblock_pending_input (input);
-
-  return GST_PAD_PROBE_OK;
-}
-
-static GstPadProbeReturn
-parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info,
-    PendingPad * ppad)
-{
-  GstDecodebin3 *dbin = ppad->dbin;
-  /* We drop all events by default */
-  GstPadProbeReturn ret = GST_PAD_PROBE_DROP;
-  GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
-
-  GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
-  switch (GST_EVENT_TYPE (ev)) {
-    case GST_EVENT_EOS:
-    {
-      GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing");
-      ppad->input->pending_pads =
-          g_list_remove (ppad->input->pending_pads, ppad);
-      gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
-      gst_pad_remove_probe (ppad->pad, ppad->event_probe);
-      g_free (ppad);
-
-      SELECTION_LOCK (dbin);
-      check_all_streams_for_eos (dbin, ev);
-      SELECTION_UNLOCK (dbin);
+  if (unblock_other_inputs) {
+    GList *tmp;
+    /* If requrested, unblock inputs which are targetting the same collection */
+    if (dbin->main_input != input) {
+      if (dbin->main_input->collection == input->collection) {
+        GST_DEBUG_OBJECT (dbin, "Unblock main input");
+        unblock_pending_input (dbin->main_input, FALSE);
+      }
+    }
+    for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
+      DecodebinInput *other = tmp->data;
+      if (other->collection == input->collection) {
+        GST_DEBUG_OBJECT (dbin, "Unblock other input");
+        unblock_pending_input (other, FALSE);
+      }
     }
-      break;
-    case GST_EVENT_GAP:
-      GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !");
-      unblock_pending_input (ppad->input);
-      ret = GST_PAD_PROBE_OK;
-      break;
-    default:
-      break;
   }
-
-  return ret;
 }
 
 static void
 parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input)
 {
   GstDecodebin3 *dbin = input->dbin;
-  PendingPad *ppad;
-  GList *tmp;
 
   GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad),
       input);
 
-  ppad = g_new0 (PendingPad, 1);
-  ppad->dbin = dbin;
-  ppad->input = input;
-  ppad->pad = pad;
-
-  ppad->event_probe =
-      gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
-      (GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL);
-  ppad->buffer_probe =
-      gst_pad_add_probe (pad,
-      GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
-      (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
-
-  input->pending_pads = g_list_append (input->pending_pads, ppad);
-
-  /* Check if all existing input streams have a buffer probe set */
-  for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
-    DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
-    if (input_stream->input == input &&
-        input_stream->input_buffer_probe_id == 0) {
-      GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe");
-      input_stream->input_buffer_probe_id =
-          gst_pad_add_probe (input_stream->srcpad,
-          GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
-          (GstPadProbeCallback) parsebin_buffer_probe, input_stream->input,
-          NULL);
-    }
-  }
+  create_input_stream (dbin, pad, input);
 }
 
 static void
@@ -624,50 +543,45 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
 {
   GstDecodebin3 *dbin = inp->dbin;
   DecodebinInputStream *input = NULL;
+  MultiQueueSlot *slot;
   GList *tmp;
+
   GST_DEBUG_OBJECT (pad, "removed");
 
   for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
     DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data;
-    if (cand->srcpad == pad)
+    if (cand->srcpad == pad) {
       input = cand;
+      break;
+    }
   }
+  g_assert (input);
+
   /* If there are no pending pads, this means we will definitely not need this
    * stream anymore */
-  if (input) {
-    GST_DEBUG_OBJECT (pad, "stream %p", input);
-    if (inp->pending_pads == NULL) {
-      MultiQueueSlot *slot;
-
-      GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
-
-      SELECTION_LOCK (dbin);
-      slot = get_slot_for_input (dbin, input);
-
-      remove_input_stream (dbin, input);
-      if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
-        /* if slot is still there and already drained, remove it in here */
-        if (slot->output) {
-          DecodebinOutputStream *output = slot->output;
-          GST_DEBUG_OBJECT (pad,
-              "Multiqueue was drained, Remove output stream");
-
-          dbin->output_streams = g_list_remove (dbin->output_streams, output);
-          free_output_stream (dbin, output);
-        }
-        GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
-        if (slot->probe_id)
-          gst_pad_remove_probe (slot->src_pad, slot->probe_id);
-        slot->probe_id = 0;
-        dbin->slots = g_list_remove (dbin->slots, slot);
-        free_multiqueue_slot_async (dbin, slot);
-      }
-      SELECTION_UNLOCK (dbin);
-    } else {
-      input->srcpad = NULL;
-      if (input->input_buffer_probe_id)
-        gst_pad_remove_probe (pad, input->input_buffer_probe_id);
-      input->input_buffer_probe_id = 0;
+
+  GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
+
+  SELECTION_LOCK (dbin);
+  slot = get_slot_for_input (dbin, input);
+  input->srcpad = NULL;
+  remove_input_stream (dbin, input);
+
+  if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
+    /* if slot is still there and already drained, remove it in here */
+    if (slot->output) {
+      DecodebinOutputStream *output = slot->output;
+      GST_DEBUG_OBJECT (pad, "Multiqueue was drained, Remove output stream");
+
+      dbin->output_streams = g_list_remove (dbin->output_streams, output);
+      free_output_stream (dbin, output);
     }
+    GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
+    if (slot->probe_id)
+      gst_pad_remove_probe (slot->src_pad, slot->probe_id);
+    slot->probe_id = 0;
+    dbin->slots = g_list_remove (dbin->slots, slot);
+    free_multiqueue_slot_async (dbin, slot);
   }
+  SELECTION_UNLOCK (dbin);
 }
index 0a1437d..274b84c 100644 (file)
@@ -303,10 +303,6 @@ struct _DecodebinInput
    * FIXME : When do we reset it if re-used ?
    */
   gboolean drained;
-
-  /* HACK : Remove these fields */
-  /* List of PendingPad structures */
-  GList *pending_pads;
 };
 
 /* Multiqueue Slots */
@@ -362,18 +358,6 @@ struct _DecodebinOutputStream
   gulong drop_probe_id;
 };
 
-/* Pending pads from parsebin */
-typedef struct _PendingPad
-{
-  GstDecodebin3 *dbin;
-  DecodebinInput *input;
-  GstPad *pad;
-
-  gulong buffer_probe;
-  gulong event_probe;
-  gboolean saw_eos;
-} PendingPad;
-
 /* properties */
 enum
 {
@@ -1935,21 +1919,9 @@ check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
     break;
   }
 
-  if (all_drained) {
-    INPUT_LOCK (dbin);
-    if (!pending_pads_are_eos (dbin->main_input))
-      all_drained = FALSE;
-
-    if (all_drained) {
-      for (iter = dbin->other_inputs; iter; iter = iter->next) {
-        if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
-          all_drained = FALSE;
-          break;
-        }
-      }
-    }
-    INPUT_UNLOCK (dbin);
-  }
+  /* Also check with the inputs, data might be pending */
+  if (all_drained)
+    all_drained = all_inputs_are_eos (dbin);
 
   if (all_drained) {
     GST_DEBUG_OBJECT (dbin,