/**
* SECTION:element-decodebin3
+ * @title: decodebin3
*
* #GstBin that auto-magically constructs a decoding pipeline using available
* decoders and demuxers via auto-plugging. The output is raw audio, video
*
* decodebin3 differs from the previous decodebin (decodebin2) in important ways:
*
- * <itemizedlist>
- * <listitem>
- * supports publication and selection of stream information via
+ * * supports publication and selection of stream information via
* GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
- * </listitem>
- * <listitem>
- * dynamically switches stream connections internally, and
+ *
+ * * dynamically switches stream connections internally, and
* reuses decoder elements when stream selections change, so that in
* the normal case it maintains 1 decoder of each type (video/audio/subtitle)
* and only creates new elements when streams change and an existing decoder
* is not capable of handling the new format.
- * </listitem>
- * <listitem>
- * supports multiple input pads for the parallel decoding of auxilliary streams
+ *
+ * * supports multiple input pads for the parallel decoding of auxilliary streams
* not muxed with the primary stream.
- * </listitem>
- * <listitem>
- * does not handle network stream buffering. decodebin3 expects that network stream
+ *
+ * * does not handle network stream buffering. decodebin3 expects that network stream
* buffering is handled upstream, before data is passed to it.
- * </listitem>
- * </itemizedlist>
*
* <emphasis>decodebin3 is still experimental API and a technology preview.
* Its behaviour and exposed API is subject to change.</emphasis>
* * STREAM_START :
* a new stream is starting => link it further if needed
*
- *
* 3) Gradual replacement
*
* If the caps change at any point in decodebin (input sink pad, demuxer output,
* b.1) The new CAPS are accepted, keep current configuration
* b.2) The new CAPS are not accepted, remove following elements then do a)
*
- *
- *
* Components:
*
* MultiQ Output
GstElement *multiqueue;
- /* FIXME : Mutex for protecting values below */
- GstStreamCollection *collection; /* Active collection */
-
+ /* selection_lock protects access to following variables */
+ GMutex selection_lock;
GList *input_streams; /* List of DecodebinInputStream for active collection */
GList *output_streams; /* List of DecodebinOutputStream used for output */
GList *slots; /* List of MultiQueueSlot */
guint slot_id;
- /* selection_lock protects access to following variables */
- GMutex selection_lock;
+ /* Active collection */
+ GstStreamCollection *collection;
/* requested selection of stream-id to activate post-multiqueue */
GList *requested_selection;
/* list of stream-id currently activated in output */
* FIXME : Is this really needed ? */
GList *pending_collection;
-
/* Factories */
GMutex factories_lock;
guint32 factories_cookie;
gulong pad_added_sigid;
gulong pad_removed_sigid;
+ gulong drained_sigid;
+
+ /* TRUE if the input got drained
+ * FIXME : When do we reset it if re-used ?
+ */
+ gboolean drained;
/* HACK : Remove these fields */
/* List of PendingPad structures */
} PendingPad;
/* properties */
-#define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
-
enum
{
PROP_0,
enum
{
SIGNAL_SELECT_STREAM,
+ SIGNAL_ABOUT_TO_FINISH,
LAST_SIGNAL
};
static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
MultiQueueSlot * slot);
static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
-static void update_requested_selection (GstDecodebin3 * dbin,
- GstStreamCollection * collection);
+static void update_requested_selection (GstDecodebin3 * dbin);
/* FIXME: Really make all the parser stuff a self-contained helper object */
#include "gstdecodebin3-parse.c"
_gst_int_accumulator, NULL, g_cclosure_marshal_generic,
G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
+ /**
+ * GstDecodebin3::about-to-finish:
+ *
+ * This signal is emitted when the data for the selected URI is
+ * entirely buffered and it is safe to specify anothe URI.
+ */
+ gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
+ g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
+ 0, G_TYPE_NONE);
+
element_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
return FALSE;
}
+static void
+parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
+{
+ GstDecodebin3 *dbin = input->dbin;
+ gboolean all_drained;
+ GList *tmp;
+
+ GST_WARNING_OBJECT (dbin, "input %p drained", input);
+ input->drained = TRUE;
+
+ all_drained = dbin->main_input->drained;
+ for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
+ DecodebinInput *data = (DecodebinInput *) tmp->data;
+
+ all_drained &= data->drained;
+ }
+
+ if (all_drained) {
+ GST_WARNING_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
+ g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
+ NULL);
+ }
+}
+
/* Call with INPUT_LOCK taken */
static gboolean
ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
input->pad_removed_sigid =
g_signal_connect (input->parsebin, "pad-removed",
(GCallback) parsebin_pad_removed_cb, input);
+ input->drained_sigid =
+ g_signal_connect (input->parsebin, "drained",
+ (GCallback) parsebin_drained_cb, input);
g_signal_connect (input->parsebin, "autoplug-continue",
(GCallback) parsebin_autoplug_continue_cb, dbin);
}
input->collection = NULL;
}
+ SELECTION_LOCK (dbin);
collection = get_merged_collection (dbin);
if (collection && collection != dbin->collection) {
GstMessage *msg;
gst_message_new_stream_collection ((GstObject *) dbin,
dbin->collection);
+ SELECTION_UNLOCK (dbin);
gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
- update_requested_selection (dbin, dbin->collection);
- }
+ update_requested_selection (dbin);
+ } else
+ SELECTION_UNLOCK (dbin);
gst_bin_remove (GST_BIN (dbin), input->parsebin);
gst_element_set_state (input->parsebin, GST_STATE_NULL);
g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
+ g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
gst_pad_remove_probe (input->parsebin_sink, probe_id);
gst_object_unref (input->parsebin);
gst_object_unref (input->parsebin_sink);
if (input->parsebin) {
g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
+ g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
gst_element_set_state (input->parsebin, GST_STATE_NULL);
gst_object_unref (input->parsebin);
gst_object_unref (input->parsebin_sink);
/* We are ignoring names for the time being, not sure it makes any sense
* within the context of decodebin3 ... */
- INPUT_LOCK (dbin);
input = create_new_input (dbin, FALSE);
if (input) {
+ INPUT_LOCK (dbin);
dbin->other_inputs = g_list_append (dbin->other_inputs, input);
res = input->ghost_sink;
+ INPUT_UNLOCK (dbin);
}
- INPUT_UNLOCK (dbin);
return res;
}
}
static void
-update_requested_selection (GstDecodebin3 * dbin,
- GstStreamCollection * collection)
+update_requested_selection (GstDecodebin3 * dbin)
{
guint i, nb;
GList *tmp = NULL;
GstStreamType used_types = 0;
-
- nb = gst_stream_collection_get_size (collection);
+ GstStreamCollection *collection;
/* 1. Is there a pending SELECT_STREAMS we can return straight away since
* the switch handler will take care of the pending selection */
goto beach;
}
+ collection = dbin->collection;
+ if (G_UNLIKELY (collection == NULL)) {
+ GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
+ goto beach;
+ }
+ nb = gst_stream_collection_get_size (collection);
+
/* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
#endif
/* Store collection for later usage */
+ SELECTION_LOCK (dbin);
if (dbin->collection == NULL) {
dbin->collection = collection;
} else {
/* dbin->pending_collection = */
/* g_list_append (dbin->pending_collection, collection); */
}
+ SELECTION_UNLOCK (dbin);
}
static void
posting_collection = TRUE;
INPUT_UNLOCK (dbin);
}
+
+ SELECTION_LOCK (dbin);
if (dbin->collection && collection != dbin->collection) {
/* Replace collection message, we most likely aggregated it */
GstMessage *new_msg;
gst_message_unref (message);
message = new_msg;
}
+ SELECTION_UNLOCK (dbin);
+
if (collection)
gst_object_unref (collection);
break;
if (posting_collection) {
/* Figure out a selection for that collection */
- update_requested_selection (dbin, dbin->collection);
+ update_requested_selection (dbin);
}
}
return msg;
}
+/* Must be called with SELECTION_LOCK taken */
+static void
+check_all_slot_for_eos (GstDecodebin3 * dbin)
+{
+ gboolean all_drained = TRUE;
+ GList *iter;
+
+ GST_DEBUG_OBJECT (dbin, "check slot for eos");
+
+ for (iter = dbin->slots; iter; iter = iter->next) {
+ MultiQueueSlot *slot = iter->data;
+
+ if (!slot->output)
+ continue;
+
+ if (slot->is_drained) {
+ GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
+ continue;
+ }
+
+ all_drained = FALSE;
+ break;
+ }
+
+ if (all_drained) {
+ INPUT_LOCK (dbin);
+ if (!pending_pads_are_eos (dbin->main_input))
+ all_drained = FALSE;
+
+ if (all_drained) {
+ for (iter = dbin->other_inputs; iter; iter = iter->next) {
+ if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
+ all_drained = FALSE;
+ break;
+ }
+ }
+ }
+ INPUT_UNLOCK (dbin);
+ }
+
+ if (all_drained) {
+ GST_DEBUG_OBJECT (dbin,
+ "All active slots are drained, and no pending input, push EOS");
+
+ for (iter = dbin->input_streams; iter; iter = iter->next) {
+ DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
+ GstPad *peer = gst_pad_get_peer (input->srcpad);
+
+ /* Send EOS to all slots */
+ if (peer) {
+ GstEvent *stream_start =
+ gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
+ /* First forward the STREAM_START event to reset the EOS status (if any) */
+ if (stream_start)
+ gst_pad_send_event (peer, stream_start);
+ gst_pad_send_event (peer, gst_event_new_eos ());
+ gst_object_unref (peer);
+ } else
+ GST_DEBUG_OBJECT (dbin, "no output");
+ }
+ }
+}
+
static GstPadProbeReturn
multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
MultiQueueSlot * slot)
}
break;
case GST_EVENT_EOS:
- /* FIXME : Figure out */
+ {
+ const GstStructure *s = gst_event_get_structure (ev);
+ slot->is_drained = TRUE;
+
+ /* Custom EOS handling first */
+ if (s && gst_structure_has_field (s, "decodebin3-custom-eos")) {
+ ret = GST_PAD_PROBE_DROP;
+ SELECTION_LOCK (dbin);
+ if (slot->input == NULL) {
+ GST_DEBUG_OBJECT (pad,
+ "Got custom-eos from null input stream, remove output stream");
+ /* Remove the output */
+ if (slot->output) {
+ DecodebinOutputStream *output = slot->output;
+ dbin->output_streams =
+ g_list_remove (dbin->output_streams, output);
+ free_output_stream (dbin, output);
+ }
+ slot->probe_id = 0;
+ dbin->slots = g_list_remove (dbin->slots, slot);
+ free_multiqueue_slot_async (dbin, slot);
+ ret = GST_PAD_PROBE_REMOVE;
+ } else {
+ check_all_slot_for_eos (dbin);
+ }
+ SELECTION_UNLOCK (dbin);
+ break;
+ }
GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
slot->input);
- slot->is_drained = TRUE;
if (slot->input == NULL) {
GstPad *peer;
GST_DEBUG_OBJECT (pad,
}
slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot);
- free_multiqueue_slot_async (dbin, slot);
SELECTION_UNLOCK (dbin);
+
+ free_multiqueue_slot_async (dbin, slot);
ret = GST_PAD_PROBE_REMOVE;
- }
- break;
- case GST_EVENT_CUSTOM_DOWNSTREAM:
- if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
- slot->is_drained = TRUE;
- ret = GST_PAD_PROBE_DROP;
+ } else {
+ GST_DEBUG_OBJECT (pad, "What happens with event ?");
SELECTION_LOCK (dbin);
- if (slot->input == NULL) {
- GST_DEBUG_OBJECT (pad,
- "Got custom-eos from null input stream, remove output stream");
- /* Remove the output */
- if (slot->output) {
- DecodebinOutputStream *output = slot->output;
- dbin->output_streams =
- g_list_remove (dbin->output_streams, output);
- free_output_stream (dbin, output);
- }
- slot->probe_id = 0;
- dbin->slots = g_list_remove (dbin->slots, slot);
- free_multiqueue_slot_async (dbin, slot);
- ret = GST_PAD_PROBE_REMOVE;
- }
+ check_all_slot_for_eos (dbin);
SELECTION_UNLOCK (dbin);
}
+ }
break;
default:
break;
gst_stream_type_get_name (type));
slot = g_new0 (MultiQueueSlot, 1);
slot->dbin = dbin;
+
slot->id = dbin->slot_id++;
+
slot->type = type;
slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
if (slot->sink_pad == NULL)
goto fail;
+
it = gst_pad_iterate_internal_links (slot->sink_pad);
if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
|| ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
GST_DEBUG_PAD_NAME (slot->src_pad));
+
dbin->slots = g_list_append (dbin->slots, slot);
+
return slot;
/* ERRORS */
static void
link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
{
- GstEvent *event;
if (slot->input != NULL && slot->input != input) {
GST_ERROR_OBJECT (slot->dbin,
"Trying to link input to an already used slot");
gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
slot->pending_stream = input->active_stream;
slot->input = input;
- event = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
- if (event)
- gst_pad_send_event (slot->sink_pad, event);
}
#if 0
if (slot_to_deactivate) {
GST_DEBUG_OBJECT (dbin,
"Slot %p (%s) should be deactivated, no longer used", slot,
- gst_stream_get_stream_id (slot->active_stream));
+ slot->
+ active_stream ? gst_stream_get_stream_id (slot->active_stream) :
+ "NULL");
to_deactivate = g_list_append (to_deactivate, slot);
}
}
g_list_free (unknown);
}
+ if (to_activate && !slots_to_reassign) {
+ for (tmp = to_activate; tmp; tmp = tmp->next) {
+ MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
+ gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
+ (GstPadProbeCallback) idle_reconfigure, slot, NULL);
+ }
+ }
+
/* For all streams to deactivate, add an idle probe where we will do
* the unassignment and switch over */
for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {