decodebin3: Add new about-to-finish signal
[platform/upstream/gstreamer.git] / gst / playback / gstdecodebin3.c
index f0d6acd..3f09a1c 100644 (file)
@@ -36,6 +36,7 @@
 
 /**
  * SECTION:element-decodebin3
+ * @title: decodebin3
  *
  * #GstBin that auto-magically constructs a decoding pipeline using available
  * decoders and demuxers via auto-plugging. The output is raw audio, video
  *
  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
  *
- * <itemizedlist>
- * <listitem>
- * supports publication and selection of stream information via
+ * * supports publication and selection of stream information via
  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
- * </listitem>
- * <listitem>
- * dynamically switches stream connections internally, and
+ *
+ * * dynamically switches stream connections internally, and
  * reuses decoder elements when stream selections change, so that in
  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
  * and only creates new elements when streams change and an existing decoder
  * is not capable of handling the new format.
- * </listitem>
- * <listitem>
- * supports multiple input pads for the parallel decoding of auxilliary streams
+ *
+ * * supports multiple input pads for the parallel decoding of auxilliary streams
  * not muxed with the primary stream.
- * </listitem>
- * <listitem>
- * does not handle network stream buffering. decodebin3 expects that network stream
+ *
+ * * does not handle network stream buffering. decodebin3 expects that network stream
  * buffering is handled upstream, before data is passed to it.
- * </listitem>
- * </itemizedlist>
  *
  * <emphasis>decodebin3 is still experimental API and a technology preview.
  * Its behaviour and exposed API is subject to change.</emphasis>
  *  * STREAM_START :
  *     a new stream is starting => link it further if needed
  *
- *
  * 3) Gradual replacement
  *
  * If the caps change at any point in decodebin (input sink pad, demuxer output,
  *  b.1) The new CAPS are accepted, keep current configuration
  *  b.2) The new CAPS are not accepted, remove following elements then do a)
  *
- *
- *
  *    Components:
  *
  *                                                   MultiQ     Output
@@ -216,16 +207,15 @@ struct _GstDecodebin3
 
   GstElement *multiqueue;
 
-  /* FIXME : Mutex for protecting values below */
-  GstStreamCollection *collection;      /* Active collection */
-
+  /* selection_lock protects access to following variables */
+  GMutex selection_lock;
   GList *input_streams;         /* List of DecodebinInputStream for active collection */
   GList *output_streams;        /* List of DecodebinOutputStream used for output */
   GList *slots;                 /* List of MultiQueueSlot */
   guint slot_id;
 
-  /* selection_lock protects access to following variables */
-  GMutex selection_lock;
+  /* Active collection */
+  GstStreamCollection *collection;
   /* requested selection of stream-id to activate post-multiqueue */
   GList *requested_selection;
   /* list of stream-id currently activated in output */
@@ -245,7 +235,6 @@ struct _GstDecodebin3
    * FIXME : Is this really needed ? */
   GList *pending_collection;
 
-
   /* Factories */
   GMutex factories_lock;
   guint32 factories_cookie;
@@ -289,6 +278,12 @@ struct _DecodebinInput
 
   gulong pad_added_sigid;
   gulong pad_removed_sigid;
+  gulong drained_sigid;
+
+  /* TRUE if the input got drained
+   * FIXME : When do we reset it if re-used ?
+   */
+  gboolean drained;
 
   /* HACK : Remove these fields */
   /* List of PendingPad structures */
@@ -358,8 +353,6 @@ typedef struct _PendingPad
 } PendingPad;
 
 /* properties */
-#define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
-
 enum
 {
   PROP_0,
@@ -370,6 +363,7 @@ enum
 enum
 {
   SIGNAL_SELECT_STREAM,
+  SIGNAL_ABOUT_TO_FINISH,
   LAST_SIGNAL
 };
 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
@@ -505,8 +499,7 @@ static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
     MultiQueueSlot * slot);
 
 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
-static void update_requested_selection (GstDecodebin3 * dbin,
-    GstStreamCollection * collection);
+static void update_requested_selection (GstDecodebin3 * dbin);
 
 /* FIXME: Really make all the parser stuff a self-contained helper object */
 #include "gstdecodebin3-parse.c"
@@ -562,6 +555,17 @@ gst_decodebin3_class_init (GstDecodebin3Class * klass)
       _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
 
+  /**
+   * GstDecodebin3::about-to-finish:
+   *
+   * This signal is emitted when the data for the selected URI is
+   * entirely buffered and it is safe to specify anothe URI.
+   */
+  gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
+      g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+      0, G_TYPE_NONE);
+
 
   element_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
@@ -727,6 +731,30 @@ set_input_group_id (DecodebinInput * input, guint32 * group_id)
   return FALSE;
 }
 
+static void
+parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
+{
+  GstDecodebin3 *dbin = input->dbin;
+  gboolean all_drained;
+  GList *tmp;
+
+  GST_WARNING_OBJECT (dbin, "input %p drained", input);
+  input->drained = TRUE;
+
+  all_drained = dbin->main_input->drained;
+  for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
+    DecodebinInput *data = (DecodebinInput *) tmp->data;
+
+    all_drained &= data->drained;
+  }
+
+  if (all_drained) {
+    GST_WARNING_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
+    g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
+        NULL);
+  }
+}
+
 /* Call with INPUT_LOCK taken */
 static gboolean
 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
@@ -745,6 +773,9 @@ ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
     input->pad_removed_sigid =
         g_signal_connect (input->parsebin, "pad-removed",
         (GCallback) parsebin_pad_removed_cb, input);
+    input->drained_sigid =
+        g_signal_connect (input->parsebin, "drained",
+        (GCallback) parsebin_drained_cb, input);
     g_signal_connect (input->parsebin, "autoplug-continue",
         (GCallback) parsebin_autoplug_continue_cb, dbin);
   }
@@ -843,6 +874,7 @@ gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
       input->collection = NULL;
     }
 
+    SELECTION_LOCK (dbin);
     collection = get_merged_collection (dbin);
     if (collection && collection != dbin->collection) {
       GstMessage *msg;
@@ -856,14 +888,17 @@ gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
           gst_message_new_stream_collection ((GstObject *) dbin,
           dbin->collection);
 
+      SELECTION_UNLOCK (dbin);
       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
-      update_requested_selection (dbin, dbin->collection);
-    }
+      update_requested_selection (dbin);
+    } else
+      SELECTION_UNLOCK (dbin);
 
     gst_bin_remove (GST_BIN (dbin), input->parsebin);
     gst_element_set_state (input->parsebin, GST_STATE_NULL);
     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
+    g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
     gst_pad_remove_probe (input->parsebin_sink, probe_id);
     gst_object_unref (input->parsebin);
     gst_object_unref (input->parsebin_sink);
@@ -893,6 +928,7 @@ free_input (GstDecodebin3 * dbin, DecodebinInput * input)
   if (input->parsebin) {
     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
+    g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
     gst_element_set_state (input->parsebin, GST_STATE_NULL);
     gst_object_unref (input->parsebin);
     gst_object_unref (input->parsebin_sink);
@@ -949,13 +985,13 @@ gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
 
   /* We are ignoring names for the time being, not sure it makes any sense
    * within the context of decodebin3 ... */
-  INPUT_LOCK (dbin);
   input = create_new_input (dbin, FALSE);
   if (input) {
+    INPUT_LOCK (dbin);
     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
     res = input->ghost_sink;
+    INPUT_UNLOCK (dbin);
   }
-  INPUT_UNLOCK (dbin);
 
   return res;
 }
@@ -1020,14 +1056,12 @@ stream_in_list (GList * list, const gchar * sid)
 }
 
 static void
-update_requested_selection (GstDecodebin3 * dbin,
-    GstStreamCollection * collection)
+update_requested_selection (GstDecodebin3 * dbin)
 {
   guint i, nb;
   GList *tmp = NULL;
   GstStreamType used_types = 0;
-
-  nb = gst_stream_collection_get_size (collection);
+  GstStreamCollection *collection;
 
   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
    *  the switch handler will take care of the pending selection */
@@ -1038,6 +1072,13 @@ update_requested_selection (GstDecodebin3 * dbin,
     goto beach;
   }
 
+  collection = dbin->collection;
+  if (G_UNLIKELY (collection == NULL)) {
+    GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
+    goto beach;
+  }
+  nb = gst_stream_collection_get_size (collection);
+
   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
 
@@ -1266,6 +1307,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
 #endif
 
   /* Store collection for later usage */
+  SELECTION_LOCK (dbin);
   if (dbin->collection == NULL) {
     dbin->collection = collection;
   } else {
@@ -1281,6 +1323,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
     /* dbin->pending_collection = */
     /*     g_list_append (dbin->pending_collection, collection); */
   }
+  SELECTION_UNLOCK (dbin);
 }
 
 static void
@@ -1303,6 +1346,8 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
         posting_collection = TRUE;
         INPUT_UNLOCK (dbin);
       }
+
+      SELECTION_LOCK (dbin);
       if (dbin->collection && collection != dbin->collection) {
         /* Replace collection message, we most likely aggregated it */
         GstMessage *new_msg;
@@ -1312,6 +1357,8 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
         gst_message_unref (message);
         message = new_msg;
       }
+      SELECTION_UNLOCK (dbin);
+
       if (collection)
         gst_object_unref (collection);
       break;
@@ -1324,7 +1371,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
 
   if (posting_collection) {
     /* Figure out a selection for that collection */
-    update_requested_selection (dbin, dbin->collection);
+    update_requested_selection (dbin);
   }
 }
 
@@ -1488,6 +1535,69 @@ is_selection_done (GstDecodebin3 * dbin)
   return msg;
 }
 
+/* Must be called with SELECTION_LOCK taken */
+static void
+check_all_slot_for_eos (GstDecodebin3 * dbin)
+{
+  gboolean all_drained = TRUE;
+  GList *iter;
+
+  GST_DEBUG_OBJECT (dbin, "check slot for eos");
+
+  for (iter = dbin->slots; iter; iter = iter->next) {
+    MultiQueueSlot *slot = iter->data;
+
+    if (!slot->output)
+      continue;
+
+    if (slot->is_drained) {
+      GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
+      continue;
+    }
+
+    all_drained = FALSE;
+    break;
+  }
+
+  if (all_drained) {
+    INPUT_LOCK (dbin);
+    if (!pending_pads_are_eos (dbin->main_input))
+      all_drained = FALSE;
+
+    if (all_drained) {
+      for (iter = dbin->other_inputs; iter; iter = iter->next) {
+        if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
+          all_drained = FALSE;
+          break;
+        }
+      }
+    }
+    INPUT_UNLOCK (dbin);
+  }
+
+  if (all_drained) {
+    GST_DEBUG_OBJECT (dbin,
+        "All active slots are drained, and no pending input, push EOS");
+
+    for (iter = dbin->input_streams; iter; iter = iter->next) {
+      DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
+      GstPad *peer = gst_pad_get_peer (input->srcpad);
+
+      /* Send EOS to all slots */
+      if (peer) {
+        GstEvent *stream_start =
+            gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
+        /* First forward the STREAM_START event to reset the EOS status (if any) */
+        if (stream_start)
+          gst_pad_send_event (peer, stream_start);
+        gst_pad_send_event (peer, gst_event_new_eos ());
+        gst_object_unref (peer);
+      } else
+        GST_DEBUG_OBJECT (dbin, "no output");
+    }
+  }
+}
+
 static GstPadProbeReturn
 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
     MultiQueueSlot * slot)
@@ -1568,10 +1678,36 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
       }
         break;
       case GST_EVENT_EOS:
-        /* FIXME : Figure out */
+      {
+        const GstStructure *s = gst_event_get_structure (ev);
+        slot->is_drained = TRUE;
+
+        /* Custom EOS handling first */
+        if (s && gst_structure_has_field (s, "decodebin3-custom-eos")) {
+          ret = GST_PAD_PROBE_DROP;
+          SELECTION_LOCK (dbin);
+          if (slot->input == NULL) {
+            GST_DEBUG_OBJECT (pad,
+                "Got custom-eos from null input stream, remove output stream");
+            /* Remove the output */
+            if (slot->output) {
+              DecodebinOutputStream *output = slot->output;
+              dbin->output_streams =
+                  g_list_remove (dbin->output_streams, output);
+              free_output_stream (dbin, output);
+            }
+            slot->probe_id = 0;
+            dbin->slots = g_list_remove (dbin->slots, slot);
+            free_multiqueue_slot_async (dbin, slot);
+            ret = GST_PAD_PROBE_REMOVE;
+          } else {
+            check_all_slot_for_eos (dbin);
+          }
+          SELECTION_UNLOCK (dbin);
+          break;
+        }
         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
             slot->input);
-        slot->is_drained = TRUE;
         if (slot->input == NULL) {
           GstPad *peer;
           GST_DEBUG_OBJECT (pad,
@@ -1594,33 +1730,17 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
           }
           slot->probe_id = 0;
           dbin->slots = g_list_remove (dbin->slots, slot);
-          free_multiqueue_slot_async (dbin, slot);
           SELECTION_UNLOCK (dbin);
+
+          free_multiqueue_slot_async (dbin, slot);
           ret = GST_PAD_PROBE_REMOVE;
-        }
-        break;
-      case GST_EVENT_CUSTOM_DOWNSTREAM:
-        if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
-          slot->is_drained = TRUE;
-          ret = GST_PAD_PROBE_DROP;
+        } else {
+          GST_DEBUG_OBJECT (pad, "What happens with event ?");
           SELECTION_LOCK (dbin);
-          if (slot->input == NULL) {
-            GST_DEBUG_OBJECT (pad,
-                "Got custom-eos from null input stream, remove output stream");
-            /* Remove the output */
-            if (slot->output) {
-              DecodebinOutputStream *output = slot->output;
-              dbin->output_streams =
-                  g_list_remove (dbin->output_streams, output);
-              free_output_stream (dbin, output);
-            }
-            slot->probe_id = 0;
-            dbin->slots = g_list_remove (dbin->slots, slot);
-            free_multiqueue_slot_async (dbin, slot);
-            ret = GST_PAD_PROBE_REMOVE;
-          }
+          check_all_slot_for_eos (dbin);
           SELECTION_UNLOCK (dbin);
         }
+      }
         break;
       default:
         break;
@@ -1667,11 +1787,14 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
       gst_stream_type_get_name (type));
   slot = g_new0 (MultiQueueSlot, 1);
   slot->dbin = dbin;
+
   slot->id = dbin->slot_id++;
+
   slot->type = type;
   slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
   if (slot->sink_pad == NULL)
     goto fail;
+
   it = gst_pad_iterate_internal_links (slot->sink_pad);
   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
@@ -1692,7 +1815,9 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
 
   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
       GST_DEBUG_PAD_NAME (slot->src_pad));
+
   dbin->slots = g_list_append (dbin->slots, slot);
+
   return slot;
 
   /* ERRORS */
@@ -1769,7 +1894,6 @@ get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
 static void
 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
 {
-  GstEvent *event;
   if (slot->input != NULL && slot->input != input) {
     GST_ERROR_OBJECT (slot->dbin,
         "Trying to link input to an already used slot");
@@ -1778,9 +1902,6 @@ link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
   slot->pending_stream = input->active_stream;
   slot->input = input;
-  event = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
-  if (event)
-    gst_pad_send_event (slot->sink_pad, event);
 }
 
 #if 0
@@ -2267,7 +2388,9 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
       if (slot_to_deactivate) {
         GST_DEBUG_OBJECT (dbin,
             "Slot %p (%s) should be deactivated, no longer used", slot,
-            gst_stream_get_stream_id (slot->active_stream));
+            slot->
+            active_stream ? gst_stream_get_stream_id (slot->active_stream) :
+            "NULL");
         to_deactivate = g_list_append (to_deactivate, slot);
       }
     }
@@ -2364,6 +2487,14 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
     g_list_free (unknown);
   }
 
+  if (to_activate && !slots_to_reassign) {
+    for (tmp = to_activate; tmp; tmp = tmp->next) {
+      MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
+      gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
+          (GstPadProbeCallback) idle_reconfigure, slot, NULL);
+    }
+  }
+
   /* For all streams to deactivate, add an idle probe where we will do
    * the unassignment and switch over */
   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {