3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
31 #include <gst/pbutils/pbutils.h>
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
38 * SECTION:element-decodebin3
41 * #GstBin that auto-magically constructs a decoding pipeline using available
42 * decoders and demuxers via auto-plugging. The output is raw audio, video
43 * or subtitle streams.
45 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
47 * * supports publication and selection of stream information via
48 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
50 * * dynamically switches stream connections internally, and
51 * reuses decoder elements when stream selections change, so that in
52 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53 * and only creates new elements when streams change and an existing decoder
54 * is not capable of handling the new format.
56 * * supports multiple input pads for the parallel decoding of auxiliary streams
57 * not muxed with the primary stream.
59 * * does not handle network stream buffering. decodebin3 expects that network stream
60 * buffering is handled upstream, before data is passed to it.
62 * <emphasis>decodebin3 is still experimental API and a technology preview.
63 * Its behaviour and exposed API are subject to change.</emphasis>
70 * 1) From sink pad to elementary streams (GstParseBin)
72 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73 * through typefind. When the caps are detected (or changed) we recursively
74 * figure out which demuxer, parser or depayloader is needed until we get to
77 * All elementary streams (whether decoded or not, whether exposed or not) are
78 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
80 * => MultiQueue is the cornerstone.
81 * => No buffering before multiqueue
83 * 2) Elementary streams
85 * After GstParseBin, there are 3 main components:
86 * 1) Input Streams (provided by GstParseBin)
90 * Input Streams correspond to the stream coming from GstParseBin and that gets
91 * fed into a multiqueue slot.
93 * Output Streams correspond to the combination of an (optional) decoder and an
94 * output ghostpad. Output Streams can be moved from one multiqueue slot to
95 * another, can reconfigure themselves (different decoders), and can be
96 * added/removed depending on the configuration (all streams outputted, only one
99 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100 * each 'active' Input Stream there is a corresponding slot.
101 * Slots might have different streams on input and output (due to internal
104 * Due to internal queuing/buffering/..., all those components (might) behave
105 * asynchronously. Therefore probes will be used on each component source pad to
106 * detect various key-points:
108 * the stream is done => Mark that component as done, optionally freeing/removing it
110 * a new stream is starting => link it further if needed
112 * 3) Gradual replacement
114 * If the caps change at any point in decodebin (input sink pad, demuxer output,
115 * multiqueue output, ..), we gradually replace (if needed) the following elements.
117 * This is handled by the probes in various locations:
119 * b) multiqueue input (source pad of Input Streams)
120 * c) multiqueue output (source pad of Multiqueue Slots)
121 * d) final output (target of source ghostpads)
123 * When a CAPS event arrives at those points, one of three things can happen:
124 * a) There are no elements downstream yet, just create/link to the following elements
125 * b) There are downstream elements, do an ACCEPT_CAPS query
126 * b.1) The new CAPS are accepted, keep current configuration
127 * b.2) The new CAPS are not accepted, remove following elements then do a)
132 * Input(s) Slots Streams
133 * /-------------------------------------------\ /-----\ /------------- \
135 * +-------------------------------------------------------------------------+
137 * | +---------------------------------------------+ |
138 * | | GstParseBin(s) | |
139 * | | +--------------+ | +-----+ |
140 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
141 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
142 * | | | (if needed) |---[parser]-[|--| qu | |
143 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
144 * | | +--------------+ | +------ ^ |
145 * | +---------------------------------------------+ ^ | |
147 * +-----------------------------------------------+--------+-------------+--+
150 * Probes --/--------/-------------/
154 * We want to ensure we re-use decoders when switching streams. This takes place
155 * at the multiqueue output level.
158 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
159 * the streaming thread in the multiqueue_src_probe() and only if the
160 * stream is in the REQUESTED selection.
161 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162 * within the streaming thread, but only in a purposefully called IDLE probe
163 * that calls reassign_slot().
165 * Based on those two principles, 3 "selections" of streams (stream-ids) are used:
166 * 1) requested_selection
167 * All streams within that list should be activated
168 * 2) active_selection
169 * List of streams that are exposed by decodebin
171 * List of streams that will be moved to requested_selection in the
172 * reassign_slot() method (i.e. once a stream was deactivated, and the output
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
180 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
182 #define EXTRA_DEBUG 1
184 typedef struct _GstDecodebin3 GstDecodebin3;
185 typedef struct _GstDecodebin3Class GstDecodebin3Class;
187 typedef struct _DecodebinInputStream DecodebinInputStream;
188 typedef struct _DecodebinInput DecodebinInput;
189 typedef struct _DecodebinOutputStream DecodebinOutputStream;
191 struct _GstDecodebin3
195 /* input_lock protects the following variables */
197 /* Main input (static sink pad) */
198 DecodebinInput *main_input;
199 /* Supplementary input (request sink pads) */
201 /* counter for input */
202 guint32 input_counter;
203 /* Current stream group_id (default : G_MAXUINT32) */
204 /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
205 guint32 current_group_id;
206 /* End of variables protected by input_lock */
208 GstElement *multiqueue;
210 /* FIXME : Mutex for protecting values below */
211 GstStreamCollection *collection; /* Active collection */
213 GList *input_streams; /* List of DecodebinInputStream for active collection */
214 GList *output_streams; /* List of DecodebinOutputStream used for output */
215 GList *slots; /* List of MultiQueueSlot */
218 /* selection_lock protects access to following variables */
219 GMutex selection_lock;
220 /* requested selection of stream-id to activate post-multiqueue */
221 GList *requested_selection;
222 /* list of stream-id currently activated in output */
223 GList *active_selection;
224 /* List of stream-id that need to be activated (after a stream switch for ex) */
226 /* Pending select streams event */
227 guint32 select_streams_seqnum;
228 /* pending list of streams to select (from downstream) */
229 GList *pending_select_streams;
230 /* TRUE if requested_selection was updated, will become FALSE once
231 * it has fully transitioned to active */
232 gboolean selection_updated;
233 /* End of variables protected by selection_lock */
235 /* List of pending collections.
236 * FIXME : Is this really needed ? */
237 GList *pending_collection;
241 GMutex factories_lock;
242 guint32 factories_cookie;
243 /* All DECODABLE factories */
245 /* Only DECODER factories */
246 GList *decoder_factories;
247 /* DECODABLE but not DECODER factories */
248 GList *decodable_factories;
250 /* counters for pads */
251 guint32 apadcount, vpadcount, tpadcount, opadcount;
257 struct _GstDecodebin3Class
261 gint (*select_stream) (GstDecodebin3 * dbin,
262 GstStreamCollection * collection, GstStream * stream);
265 /* Input of decodebin, controls input pad and parsebin */
266 struct _DecodebinInput
273 GstPad *parsebin_sink;
275 GstStreamCollection *collection; /* Active collection */
279 GstElement *parsebin;
281 gulong pad_added_sigid;
282 gulong pad_removed_sigid;
284 /* HACK : Remove these fields */
285 /* List of PendingPad structures */
289 /* Multiqueue Slots */
290 typedef struct _MultiQueueSlot
295 /* Type of stream handled by this slot */
298 /* Linked input and output */
299 DecodebinInputStream *input;
301 /* pending => last stream received on sink pad */
302 GstStream *pending_stream;
303 /* active => last stream outputted on source pad */
304 GstStream *active_stream;
306 GstPad *sink_pad, *src_pad;
308 /* id of the MQ src_pad event probe */
313 DecodebinOutputStream *output;
316 /* Streams that are exposed downstream (i.e. output) */
317 struct _DecodebinOutputStream
320 /* The type of stream handled by this output stream */
323 /* The slot to which this output stream is currently connected to */
324 MultiQueueSlot *slot;
326 GstElement *decoder; /* Optional */
327 GstPad *decoder_sink, *decoder_src;
332 /* Flag if ghost pad is exposed */
333 gboolean src_exposed;
335 /* keyframe dropping probe */
336 gulong drop_probe_id;
339 /* Pending pads from parsebin */
340 typedef struct _PendingPad
343 DecodebinInput *input;
352 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
363 SIGNAL_SELECT_STREAM,
366 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
368 #define SELECTION_LOCK(dbin) G_STMT_START { \
369 GST_LOG_OBJECT (dbin, \
370 "selection locking from thread %p", \
372 g_mutex_lock (&dbin->selection_lock); \
373 GST_LOG_OBJECT (dbin, \
374 "selection locked from thread %p", \
378 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
379 GST_LOG_OBJECT (dbin, \
380 "selection unlocking from thread %p", \
382 g_mutex_unlock (&dbin->selection_lock); \
385 #define INPUT_LOCK(dbin) G_STMT_START { \
386 GST_LOG_OBJECT (dbin, \
387 "input locking from thread %p", \
389 g_mutex_lock (&dbin->input_lock); \
390 GST_LOG_OBJECT (dbin, \
391 "input locked from thread %p", \
395 #define INPUT_UNLOCK(dbin) G_STMT_START { \
396 GST_LOG_OBJECT (dbin, \
397 "input unlocking from thread %p", \
399 g_mutex_unlock (&dbin->input_lock); \
402 GType gst_decodebin3_get_type (void);
403 #define gst_decodebin3_parent_class parent_class
404 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
406 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
408 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
411 GST_STATIC_CAPS_ANY);
413 static GstStaticPadTemplate request_sink_template =
414 GST_STATIC_PAD_TEMPLATE ("sink_%u",
417 GST_STATIC_CAPS_ANY);
419 static GstStaticPadTemplate video_src_template =
420 GST_STATIC_PAD_TEMPLATE ("video_%u",
423 GST_STATIC_CAPS_ANY);
425 static GstStaticPadTemplate audio_src_template =
426 GST_STATIC_PAD_TEMPLATE ("audio_%u",
429 GST_STATIC_CAPS_ANY);
431 static GstStaticPadTemplate text_src_template =
432 GST_STATIC_PAD_TEMPLATE ("text_%u",
435 GST_STATIC_CAPS_ANY);
437 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
440 GST_STATIC_CAPS_ANY);
443 static void gst_decodebin3_dispose (GObject * object);
444 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
445 const GValue * value, GParamSpec * pspec);
446 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
447 GValue * value, GParamSpec * pspec);
449 static gboolean parsebin_autoplug_continue_cb (GstElement *
450 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
453 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
454 GstStreamCollection * collection, GstStream * stream)
456 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
461 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
462 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
463 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
464 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
465 GstStateChange transition);
466 static gboolean gst_decodebin3_send_event (GstElement * element,
469 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
471 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
472 GstElementFactoryListType ftype);
475 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
476 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
477 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
478 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
480 static void reconfigure_output_stream (DecodebinOutputStream * output,
481 MultiQueueSlot * slot);
482 static void free_output_stream (GstDecodebin3 * dbin,
483 DecodebinOutputStream * output);
484 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
487 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
488 GstPadProbeInfo * info, MultiQueueSlot * slot);
489 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
490 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
491 DecodebinInputStream * input);
492 static void link_input_to_slot (DecodebinInputStream * input,
493 MultiQueueSlot * slot);
494 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
495 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
496 MultiQueueSlot * slot);
498 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
499 static void update_requested_selection (GstDecodebin3 * dbin,
500 GstStreamCollection * collection);
502 /* FIXME: Really make all the parser stuff a self-contained helper object */
503 #include "gstdecodebin3-parse.c"
/* Accumulator passed to g_signal_new() for the "select-stream" signal:
 * it reads the integer returned by the handler that just ran and stores it
 * as the accumulated signal result. */
506 _gst_int_accumulator (GSignalInvocationHint * ihint,
507 GValue * return_accu, const GValue * handler_return, gpointer dummy)
509 gint res = g_value_get_int (handler_return);
/* Handlers running in the cleanup stage must not overwrite the result. */
511 if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
512 g_value_set_int (return_accu, res);
521 gst_decodebin3_class_init (GstDecodebin3Class * klass)
523 GObjectClass *gobject_klass = (GObjectClass *) klass;
524 GstElementClass *element_class = (GstElementClass *) klass;
525 GstBinClass *bin_klass = (GstBinClass *) klass;
527 gobject_klass->dispose = gst_decodebin3_dispose;
528 gobject_klass->set_property = gst_decodebin3_set_property;
529 gobject_klass->get_property = gst_decodebin3_get_property;
531 /* FIXME : ADD PROPERTIES ! */
532 g_object_class_install_property (gobject_klass, PROP_CAPS,
533 g_param_spec_boxed ("caps", "Caps",
534 "The caps on which to stop decoding. (NULL = default)",
535 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
537 /* FIXME : ADD SIGNALS ! */
539 * GstDecodebin3::select-stream
540 * @decodebin: a #GstDecodebin3
541 * @collection: a #GstStreamCollection
542 * @stream: a #GstStream
544 * This signal is emitted whenever @decodebin needs to decide whether
545 * to expose a @stream of a given @collection.
547 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
548 * A value of -1 (default) lets @decodebin decide what to do with the stream.
550 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
551 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
552 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
553 _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
554 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
557 element_class->request_new_pad =
558 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
559 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
560 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
562 gst_element_class_add_pad_template (element_class,
563 gst_static_pad_template_get (&sink_template));
564 gst_element_class_add_pad_template (element_class,
565 gst_static_pad_template_get (&request_sink_template));
566 gst_element_class_add_pad_template (element_class,
567 gst_static_pad_template_get (&video_src_template));
568 gst_element_class_add_pad_template (element_class,
569 gst_static_pad_template_get (&audio_src_template));
570 gst_element_class_add_pad_template (element_class,
571 gst_static_pad_template_get (&text_src_template));
572 gst_element_class_add_pad_template (element_class,
573 gst_static_pad_template_get (&src_template));
575 gst_element_class_set_static_metadata (element_class,
576 "Decoder Bin 3", "Generic/Bin/Decoder",
577 "Autoplug and decode to raw media",
578 "Edward Hervey <edward@centricular.com>");
580 bin_klass->handle_message = gst_decodebin3_handle_message;
582 klass->select_stream = gst_decodebin3_select_stream;
586 gst_decodebin3_init (GstDecodebin3 * dbin)
588 /* Create main input */
589 dbin->main_input = create_new_input (dbin, TRUE);
591 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
592 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
593 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
594 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
596 dbin->current_group_id = G_MAXUINT32;
598 g_mutex_init (&dbin->factories_lock);
599 g_mutex_init (&dbin->selection_lock);
600 g_mutex_init (&dbin->input_lock);
602 dbin->caps = gst_static_caps_get (&default_raw_caps);
604 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
608 gst_decodebin3_dispose (GObject * object)
610 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
614 gst_plugin_feature_list_free (dbin->factories);
615 if (dbin->decoder_factories)
616 g_list_free (dbin->decoder_factories);
617 if (dbin->decodable_factories)
618 g_list_free (dbin->decodable_factories);
619 g_list_free_full (dbin->requested_selection, g_free);
620 g_list_free (dbin->active_selection);
621 g_list_free (dbin->to_activate);
622 g_list_free (dbin->pending_select_streams);
623 g_clear_object (&dbin->collection);
625 free_input (dbin, dbin->main_input);
627 for (walk = dbin->other_inputs; walk; walk = next) {
628 DecodebinInput *input = walk->data;
630 next = g_list_next (walk);
632 free_input (dbin, input);
633 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
636 G_OBJECT_CLASS (parent_class)->dispose (object);
640 gst_decodebin3_set_property (GObject * object, guint prop_id,
641 const GValue * value, GParamSpec * pspec)
643 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
645 /* FIXME : IMPLEMENT */
648 GST_OBJECT_LOCK (dbin);
650 gst_caps_unref (dbin->caps);
651 dbin->caps = g_value_dup_boxed (value);
652 GST_OBJECT_UNLOCK (dbin);
655 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
661 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
664 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
666 /* FIXME : IMPLEMENT */
669 GST_OBJECT_LOCK (dbin);
670 g_value_set_boxed (value, dbin->caps);
671 GST_OBJECT_UNLOCK (dbin);
674 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
680 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
681 GstCaps * caps, GstDecodebin3 * dbin)
683 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
685 /* If it matches our target caps, expose it */
686 if (gst_caps_can_intersect (caps, dbin->caps))
692 /* This method should be called whenever a STREAM_START event
693 * comes out of a given parsebin.
694 * The caller shall replace the group_id if the function returns TRUE.
 *
 * @input: the input whose parsebin produced the STREAM_START
 * @group_id: (inout) group id carried by the event; may be overwritten with
 * the group id decodebin is currently using */
696 set_input_group_id (DecodebinInput * input, guint32 * group_id)
698 GstDecodebin3 *dbin = input->dbin;
/* Remember the group id on the input. G_MAXUINT32 is the "not set yet"
 * value given to new inputs, so only a change from a real id is warned about. */
700 if (input->group_id != *group_id) {
701 if (input->group_id != G_MAXUINT32)
702 GST_WARNING_OBJECT (dbin,
703 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
704 ") on input %p ", input->group_id, *group_id, input);
705 input->group_id = *group_id;
/* decodebin adopts the first group id it sees while its own is still unset */
708 if (*group_id != dbin->current_group_id) {
709 if (dbin->current_group_id == G_MAXUINT32) {
710 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
712 dbin->current_group_id = *group_id;
/* Hand decodebin's current group id back to the caller */
714 *group_id = dbin->current_group_id;
721 /* Makes sure @input has a parsebin that is part of @dbin and targeted by
 * the input's ghost sink pad, creating and wiring one up on first use.
 * Call with INPUT_LOCK taken */
723 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
725 gboolean set_state = FALSE;
727 if (input->parsebin == NULL) {
728 input->parsebin = gst_element_factory_make ("parsebin", NULL);
729 if (input->parsebin == NULL)
/* Hold our own reference: the teardown paths (unlink handler, free_input())
 * unref the parsebin explicitly. */
731 input->parsebin = gst_object_ref (input->parsebin);
732 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
/* The pad-added/pad-removed handler ids are kept so they can be
 * disconnected when the input is torn down. */
733 input->pad_added_sigid =
734 g_signal_connect (input->parsebin, "pad-added",
735 (GCallback) parsebin_pad_added_cb, input);
736 input->pad_removed_sigid =
737 g_signal_connect (input->parsebin, "pad-removed",
738 (GCallback) parsebin_pad_removed_cb, input);
739 g_signal_connect (input->parsebin, "autoplug-continue",
740 (GCallback) parsebin_autoplug_continue_cb, dbin);
/* A parsebin that was removed from the bin earlier has to be added again */
743 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
744 gst_bin_add (GST_BIN (dbin), input->parsebin);
/* Route data arriving on the ghost sink pad into the parsebin */
748 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
749 input->parsebin_sink);
751 gst_element_sync_state_with_parent (input->parsebin);
/* NOTE(review): presumably the failure path of the factory_make check
 * above - reports the missing "parsebin" element on the bus. */
758 gst_element_post_message ((GstElement *) dbin,
759 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
764 static GstPadLinkReturn
765 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
767 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
768 GstPadLinkReturn res = GST_PAD_LINK_OK;
769 DecodebinInput *input;
771 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
772 ". Creating parsebin if needed", pad);
774 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
778 if (!ensure_input_parsebin (dbin, input))
779 res = GST_PAD_LINK_REFUSED;
784 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
785 return GST_PAD_LINK_REFUSED;
788 /* Pad probe installed on the parsebin sink pad by
 * gst_decodebin3_input_pad_unlink() for the duration of the parsebin
 * teardown: duration queries are marked as handled so they are not
 * forwarded; anything else keeps the default GST_PAD_PROBE_OK. */
789 static GstPadProbeReturn
790 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
791 DecodebinInput * input)
793 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
/* Only inspect probe data that actually is a query */
795 if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
796 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
797 if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
798 GST_LOG_OBJECT (pad, "stop forwarding query duration");
799 ret = GST_PAD_PROBE_HANDLED;
807 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
809 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
810 DecodebinInput *input;
812 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
813 ". Removing parsebin.", pad);
815 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
819 if (input->parsebin == NULL) {
824 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
825 GstStreamCollection *collection = NULL;
826 gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
827 GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
828 (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
830 /* Clear stream-collection corresponding to current INPUT and post new
831 * stream-collection message, if needed */
832 if (input->collection) {
833 gst_object_unref (input->collection);
834 input->collection = NULL;
837 collection = get_merged_collection (dbin);
838 if (collection && collection != dbin->collection) {
840 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
842 if (dbin->collection)
843 gst_object_unref (dbin->collection);
844 dbin->collection = collection;
847 gst_message_new_stream_collection ((GstObject *) dbin,
850 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
851 update_requested_selection (dbin, dbin->collection);
854 gst_bin_remove (GST_BIN (dbin), input->parsebin);
855 gst_element_set_state (input->parsebin, GST_STATE_NULL);
856 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
857 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
858 gst_pad_remove_probe (input->parsebin_sink, probe_id);
859 gst_object_unref (input->parsebin);
860 gst_object_unref (input->parsebin_sink);
862 input->parsebin = NULL;
863 input->parsebin_sink = NULL;
865 if (!input->is_main) {
866 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
867 free_input_async (dbin, input);
874 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
/* Releases what @input holds: its ghost sink pad is detached from its target
 * and removed from @dbin, the parsebin (if any) is shut down and unreffed
 * together with its sink pad, and the input's stream collection is dropped.
 * Also used as the callback of gst_element_call_async() by
 * free_input_async(). */
879 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
881 GST_DEBUG ("Freeing input %p", input);
882 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
883 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
884 if (input->parsebin) {
/* Stop the pad-added/pad-removed callbacks before shutting the parsebin down */
885 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
886 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
887 gst_element_set_state (input->parsebin, GST_STATE_NULL);
888 gst_object_unref (input->parsebin);
889 gst_object_unref (input->parsebin_sink);
891 if (input->collection)
892 gst_object_unref (input->collection);
/* Schedules free_input() for @input through gst_element_call_async()
 * instead of running it in the calling thread. */
897 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
899 GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
900 gst_element_call_async (GST_ELEMENT_CAST (dbin),
901 (GstElementCallAsyncFunc) free_input, input, NULL);
904 /* Call with INPUT_LOCK taken */
905 static DecodebinInput *
906 create_new_input (GstDecodebin3 * dbin, gboolean main)
908 DecodebinInput *input;
910 input = g_new0 (DecodebinInput, 1);
912 input->is_main = main;
913 input->group_id = G_MAXUINT32;
915 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
917 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
918 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
921 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
922 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
923 gst_pad_set_unlink_function (input->ghost_sink,
924 gst_decodebin3_input_pad_unlink);
926 gst_pad_set_active (input->ghost_sink, TRUE);
927 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
934 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
935 const gchar * name, const GstCaps * caps)
937 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
938 DecodebinInput *input;
941 /* We are ignoring names for the time being, not sure it makes any sense
942 * within the context of decodebin3 ... */
944 input = create_new_input (dbin, FALSE);
946 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
947 res = input->ghost_sink;
954 /* Must be called with factories lock! */
956 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
960 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
961 if (!dbin->factories || dbin->factories_cookie != cookie) {
964 gst_plugin_feature_list_free (dbin->factories);
965 if (dbin->decoder_factories)
966 g_list_free (dbin->decoder_factories);
967 if (dbin->decodable_factories)
968 g_list_free (dbin->decodable_factories);
970 gst_element_factory_list_get_elements
971 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
973 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
974 dbin->factories_cookie = cookie;
976 /* Filter decoder and other decodables */
977 dbin->decoder_factories = NULL;
978 dbin->decodable_factories = NULL;
979 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
980 GstElementFactory *fact = (GstElementFactory *) tmp->data;
981 if (gst_element_factory_list_is_type (fact,
982 GST_ELEMENT_FACTORY_TYPE_DECODER))
983 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
985 dbin->decodable_factories =
986 g_list_append (dbin->decodable_factories, fact);
991 /* Looks for the stream-id @sid in @list, a GList whose data pointers are
 * stream-id strings, comparing with g_strcmp0().
 * Must be called with appropriate lock if list is a protected variable */
993 stream_in_list (GList * list, const gchar * sid)
/* Debug pass: log every candidate that @sid is going to be compared with */
998 for (tmp = list; tmp; tmp = tmp->next) {
999 gchar *osid = (gchar *) tmp->data;
1000 GST_DEBUG ("Checking %s against %s", sid, osid);
/* Actual lookup: g_strcmp0() yields 0 on equal strings */
1004 for (tmp = list; tmp; tmp = tmp->next) {
1005 const gchar *osid = (gchar *) tmp->data;
1006 if (!g_strcmp0 (sid, osid))
1014 update_requested_selection (GstDecodebin3 * dbin,
1015 GstStreamCollection * collection)
1019 GstStreamType used_types = 0;
1021 nb = gst_stream_collection_get_size (collection);
1023 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1024 * the switch handler will take care of the pending selection */
1025 SELECTION_LOCK (dbin);
1026 if (dbin->pending_select_streams) {
1027 GST_DEBUG_OBJECT (dbin,
1028 "No need to create pending selection, SELECT_STREAMS underway");
1032 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1033 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1035 /* 3. If not, check if we already have some of the streams in the
1036 * existing active/requested selection */
1037 for (i = 0; i < nb; i++) {
1038 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1039 const gchar *sid = gst_stream_get_stream_id (stream);
1041 /* Fire select-stream signal to see if outside components want to
1042 * hint at which streams should be selected */
1043 g_signal_emit (G_OBJECT (dbin),
1044 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1046 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1047 if (request == 1 || (request == -1
1048 && (stream_in_list (dbin->requested_selection, sid)
1049 || stream_in_list (dbin->active_selection, sid)))) {
1050 GstStreamType curtype = gst_stream_get_stream_type (stream);
1052 GST_DEBUG_OBJECT (dbin,
1053 "Using stream requested by 'select-stream' signal : %s", sid);
1055 GST_DEBUG_OBJECT (dbin,
1056 "Re-using stream already present in requested or active selection : %s",
1058 tmp = g_list_append (tmp, (gchar *) sid);
1059 used_types |= curtype;
1063 /* 4. If not, match one stream of each type */
1064 for (i = 0; i < nb; i++) {
1065 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1066 GstStreamType curtype = gst_stream_get_stream_type (stream);
1067 if (!(used_types & curtype)) {
1068 const gchar *sid = gst_stream_get_stream_id (stream);
1069 GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1070 sid, gst_stream_type_get_name (curtype));
1071 tmp = g_list_append (tmp, (gchar *) sid);
1072 used_types |= curtype;
1077 /* Finally set the requested selection */
1079 if (dbin->requested_selection) {
1080 GST_FIXME_OBJECT (dbin,
1081 "Replacing non-NULL requested_selection, what should we do ??");
1082 g_list_free_full (dbin->requested_selection, g_free);
1084 dbin->requested_selection =
1085 g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1086 dbin->selection_updated = TRUE;
1089 SELECTION_UNLOCK (dbin);
1092 /* Computes the stream collection describing all inputs of @dbin.
 * When no merge is needed, a new reference to the only existing collection
 * is returned (or NULL if there is none); otherwise a fresh "decodebin3"
 * collection is filled with the streams of the main input followed by those
 * of the other inputs.
 * Call with INPUT_LOCK taken */
1093 static GstStreamCollection *
1094 get_merged_collection (GstDecodebin3 * dbin)
1096 gboolean needs_merge = FALSE;
1097 GstStreamCollection *res = NULL;
1101 /* First check if we need to do a merge or just return the only collection */
1102 res = dbin->main_input->collection;
1104 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1105 DecodebinInput *input = (DecodebinInput *) tmp->data;
1106 if (input->collection) {
1111 res = input->collection;
1116 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
/* The caller owns the returned reference and has to release it */
1117 return res ? gst_object_ref (res) : NULL;
1120 /* We really need to create a new collection */
1121 /* FIXME : Some numbering scheme maybe ?? */
1122 res = gst_stream_collection_new ("decodebin3");
1123 if (dbin->main_input->collection) {
1124 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1125 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1126 for (i = 0; i < nb_stream; i++) {
1128 gst_stream_collection_get_stream (dbin->main_input->collection, i);
/* add_stream() takes ownership of the stream it is given, hence the ref */
1129 gst_stream_collection_add_stream (res, gst_object_ref (stream));
/* Then append the streams of every other input that has a collection */
1133 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1134 DecodebinInput *input = (DecodebinInput *) tmp->data;
1135 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1137 if (input->collection) {
1138 nb_stream = gst_stream_collection_get_size (input->collection);
1139 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1140 for (i = 0; i < nb_stream; i++) {
1142 gst_stream_collection_get_stream (input->collection, i);
1143 gst_stream_collection_add_stream (res, gst_object_ref (stream));
1151 /* Call with INPUT_LOCK taken */
1152 static DecodebinInput *
1153 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1155 DecodebinInput *input = NULL;
1156 GstElement *parent = gst_object_ref (child);
1160 GstElement *next_parent;
1162 GST_DEBUG_OBJECT (dbin, "parent %s",
1163 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1165 if (parent == dbin->main_input->parsebin) {
1166 input = dbin->main_input;
1169 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1170 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1171 if (parent == cur->parsebin) {
1176 next_parent = (GstElement *) gst_element_get_parent (parent);
1177 gst_object_unref (parent);
1178 parent = next_parent;
1180 } while (parent && parent != (GstElement *) dbin);
1183 gst_object_unref (parent);
1188 static const gchar *
1189 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1193 if (dbin->collection == NULL)
1195 len = gst_stream_collection_get_size (dbin->collection);
1196 for (i = 0; i < len; i++) {
1197 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1198 const gchar *osid = gst_stream_get_stream_id (stream);
1199 if (!g_strcmp0 (sid, osid))
/* handle_stream_collection:
 * @dbin: the decodebin3 instance
 * @collection: the collection that was just received
 * @child: the element the collection came from
 *
 * Looks up the input owning @child with find_message_parsebin(), stores a new
 * reference to @collection on that input (dropping the one it held before),
 * then asks get_merged_collection() for the collection covering all inputs
 * and stores the result in dbin->collection, unreffing any previous value.
 * When debugging is compiled in, the streams of the resulting collection are
 * dumped to the debug log. */
1206 /* Call with INPUT_LOCK taken */
1208 handle_stream_collection (GstDecodebin3 * dbin,
1209 GstStreamCollection * collection, GstElement * child)
1211 #ifndef GST_DISABLE_GST_DEBUG
1212 const gchar *upstream_id;
1215 DecodebinInput *input = find_message_parsebin (dbin, child);
/* No input matched the message source */
1218 GST_DEBUG_OBJECT (dbin,
1219 "Couldn't find corresponding input, most likely shutting down");
1223 /* Replace collection in input */
1224 if (input->collection)
1225 gst_object_unref (input->collection);
1226 input->collection = gst_object_ref (collection);
1227 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
/* From here on "collection" is the reference returned by
 * get_merged_collection(), no longer the caller's argument */
1230 /* Merge collection if needed */
1231 collection = get_merged_collection (dbin);
1233 #ifndef GST_DISABLE_GST_DEBUG
1234 /* Just some debugging */
1235 upstream_id = gst_stream_collection_get_upstream_id (collection);
1236 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1237 GST_DEBUG ("From input %p", input);
1238 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1239 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1240 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1241 GstTagList *taglist;
1244 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1245 GST_DEBUG (" type : %s",
1246 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1247 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1248 taglist = gst_stream_get_tags (stream);
1249 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1250 caps = gst_stream_get_caps (stream);
1251 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
/* Release the tag list and caps fetched for the dump */
1253 gst_tag_list_unref (taglist);
1255 gst_caps_unref (caps);
/* dbin->collection takes over the reference obtained from
 * get_merged_collection() in both branches below */
1259 /* Store collection for later usage */
1260 if (dbin->collection == NULL) {
1261 dbin->collection = collection;
1263 /* We need to check who emitted this collection (the owner).
1264 * If we already had a collection from that user, this one is an update,
1265 * that is to say that we need to figure out how we are going to re-use
1266 * the streams/slot */
1267 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1268 /* FIXME : When do we switch from pending collection to active collection ?
1269 * When all streams from active collection are drained in multiqueue output ? */
1270 gst_object_unref (dbin->collection);
1271 dbin->collection = collection;
1272 /* dbin->pending_collection = */
1273 /* g_list_append (dbin->pending_collection, collection); */
1278 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1280 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1281 gboolean posting_collection = FALSE;
1283 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1285 switch (GST_MESSAGE_TYPE (message)) {
1286 case GST_MESSAGE_STREAM_COLLECTION:
1288 GstStreamCollection *collection = NULL;
1289 gst_message_parse_stream_collection (message, &collection);
1292 handle_stream_collection (dbin, collection,
1293 (GstElement *) GST_MESSAGE_SRC (message));
1294 posting_collection = TRUE;
1295 INPUT_UNLOCK (dbin);
1297 if (dbin->collection && collection != dbin->collection) {
1298 /* Replace collection message, we most likely aggregated it */
1299 GstMessage *new_msg;
1301 gst_message_new_stream_collection ((GstObject *) dbin,
1303 gst_message_unref (message);
1307 gst_object_unref (collection);
1314 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1316 if (posting_collection) {
1317 /* Figure out a selection for that collection */
1318 update_requested_selection (dbin, dbin->collection);
1322 static DecodebinOutputStream *
1323 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1326 GstStreamType stype = gst_stream_get_stream_type (stream);
1328 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1329 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1330 if (output->type == stype && output->slot && output->slot->active_stream) {
1331 GstStream *tstream = output->slot->active_stream;
1332 if (!stream_in_list (dbin->requested_selection,
1333 (gchar *) gst_stream_get_stream_id (tstream))) {
/* get_output_for_slot:
 * @slot: the multiqueue slot an output is wanted for
 *
 * Returns slot->output directly when the slot already has one. Otherwise the
 * id of the slot's active stream is looked up in dbin->requested_selection
 * and, going by the statements below:
 *  - if find_free_compatible_output() offers an output to take over, the
 *    stream id is appended to dbin->to_activate, its entry is removed from
 *    dbin->requested_selection and freed, and an idle probe running
 *    slot_unassign_probe() is installed on the source pad of the slot that
 *    currently uses that output;
 *  - else a new output comes from create_output_stream(), is cross-linked
 *    with @slot and the stream id is appended to dbin->active_selection;
 *  - "Not creating any output" is logged when no output is set up
 *    (presumably when the stream is not in the requested selection).
 *
 * The stream id pointers appended to to_activate / active_selection are the
 * ones returned by gst_stream_get_stream_id(); they are not duplicated here.
 *
 * NOTE(review): the SELECTION lock is released and re-taken around
 * gst_pad_add_probe(), so selection state may change across that call.
 * NOTE(review): the caps returned by gst_stream_get_caps() are unreffed
 * without a NULL check -- confirm the active stream always has caps here. */
1342 /* Give a certain slot, figure out if it should be linked to an
1344 * CALL WITH SELECTION LOCK TAKEN !*/
1345 static DecodebinOutputStream *
1346 get_output_for_slot (MultiQueueSlot * slot)
1348 GstDecodebin3 *dbin = slot->dbin;
1349 DecodebinOutputStream *output = NULL;
1350 const gchar *stream_id;
1352 gchar *id_in_list = NULL;
1354 /* If we already have a configured output, just use it */
1355 if (slot->output != NULL)
1356 return slot->output;
1361 * This method needs to be split into multiple parts
1363 * 1) Figure out whether stream should be exposed or not
1364 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1365 * in the default stream attribution
1367 * 2) Figure out whether an output stream should be created, whether
1368 * we can re-use the output stream already linked to the slot, or
1369 * whether we need to get re-assigned another (currently used) output
1373 stream_id = gst_stream_get_stream_id (slot->active_stream);
1374 caps = gst_stream_get_caps (slot->active_stream);
1375 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1376 gst_caps_unref (caps);
1378 /* 0. Emit autoplug-continue signal for pending caps ? */
1379 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1381 /* 1. if in EXPOSE_ALL_MODE, just accept */
1382 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1385 /* FIXME : The idea around this was to avoid activating a stream for
1386 * which we have no decoder. Unfortunately it is way too
1387 * expensive. Need to figure out a better solution */
1388 /* 2. Is there a potential decoder (if one is required) */
1389 if (!gst_caps_can_intersect (caps, dbin->caps)
1390 && !have_factory (dbin, (GstCaps *) caps,
1391 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1392 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1394 SELECTION_UNLOCK (dbin);
1395 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1396 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1397 SELECTION_LOCK (dbin);
1402 /* 3. In default mode check if we should expose */
1403 id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1405 /* Check if we can steal an existing output stream we could re-use.
1407 * * an output stream whose slot->stream is not in requested
1408 * * and is of the same type as this stream
1410 output = find_free_compatible_output (dbin, slot->active_stream);
1412 /* Move this output from its current slot to this slot */
1414 g_list_append (dbin->to_activate, (gchar *) stream_id);
1415 dbin->requested_selection =
1416 g_list_remove (dbin->requested_selection, id_in_list);
1417 g_free (id_in_list);
1418 SELECTION_UNLOCK (dbin);
1419 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1420 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1421 SELECTION_LOCK (dbin);
/* Nothing to take over: set up a brand new output for this slot */
1425 output = create_output_stream (dbin, slot->type);
1426 output->slot = slot;
1427 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1428 slot->output = output;
1429 dbin->active_selection =
1430 g_list_append (dbin->active_selection, (gchar *) stream_id);
1432 GST_DEBUG ("Not creating any output for slot %p", slot);
1437 /* Returns SELECTED_STREAMS message if active_selection is equal to
1438 * requested_selection, else NULL.
1439 * Must be called with LOCK taken */
1441 is_selection_done (GstDecodebin3 * dbin)
1446 if (!dbin->selection_updated)
1449 GST_LOG_OBJECT (dbin, "Checking");
1451 if (dbin->to_activate != NULL) {
1452 GST_DEBUG ("Still have streams to activate");
1455 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1456 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1457 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1458 GST_DEBUG ("Not in active selection, returning");
1463 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1465 /* We are completely active */
1466 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1467 GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1468 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1469 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1471 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1472 gst_stream_get_stream_id (output->slot->active_stream));
1474 gst_message_streams_selected_add (msg, output->slot->active_stream);
1476 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1478 dbin->selection_updated = FALSE;
/* check_all_slot_for_eos:
 * @dbin: the decodebin3 instance
 *
 * Decides whether EOS can be pushed downstream. The slots of @dbin are
 * inspected through their is_drained flag, then pending_pads_are_eos() is
 * consulted for the main input and for every entry of dbin->other_inputs;
 * a negative answer clears all_drained. When everything is drained and no
 * input is pending, an EOS event is sent to the peer of the source pad of
 * every entry in dbin->input_streams. */
1482 /* Must be called with SELECTION_LOCK taken */
1484 check_all_slot_for_eos (GstDecodebin3 * dbin)
1486 gboolean all_drained = TRUE;
1489 GST_DEBUG_OBJECT (dbin, "check slot for eos");
/* First pass: look at the drained state of the multiqueue slots */
1491 for (iter = dbin->slots; iter; iter = iter->next) {
1492 MultiQueueSlot *slot = iter->data;
1497 if (slot->is_drained) {
1498 GST_DEBUG_OBJECT (slot->sink_pad, "slot %p is draned", slot);
1502 all_drained = FALSE;
/* Second pass: the inputs may still have pending pads that are not EOS */
1508 if (!pending_pads_are_eos (dbin->main_input))
1509 all_drained = FALSE;
1512 for (iter = dbin->other_inputs; iter; iter = iter->next) {
1513 if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1514 all_drained = FALSE;
1519 INPUT_UNLOCK (dbin);
1523 GST_DEBUG_OBJECT (dbin,
1524 "All active slots are drained, and no pending input, push EOS");
1526 for (iter = dbin->input_streams; iter; iter = iter->next) {
1527 DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1528 GstPad *peer = gst_pad_get_peer (input->srcpad);
1530 /* Send EOS and then remove elements */
1532 gst_pad_send_event (peer, gst_event_new_eos ());
/* gst_pad_get_peer() handed out a reference on the peer pad */
1533 gst_object_unref (peer);
/* multiqueue_src_probe:
 * @pad: the multiqueue source pad of @slot
 * @info: probe info carrying a downstream event or query
 * @slot: the slot the probe was installed for (see create_new_slot())
 *
 * Events:
 *  - STREAM_START: clears slot->is_drained and records the #GstStream of the
 *    event as slot->active_stream.
 *  - CAPS: with the SELECTION lock held, gets an output for the slot through
 *    get_output_for_slot(), reconfigures it and posts the message returned
 *    by is_selection_done(), if any, once the lock is released.
 *  - EOS: marks the slot drained; when the slot has no input anymore the
 *    event is forwarded to the peer pad, and the slot's output and the slot
 *    itself are removed from @dbin.
 *  - CUSTOM_DOWNSTREAM named "decodebin3-custom-eos": marks the slot drained
 *    and drops the event; a slot without input is removed together with its
 *    output; check_all_slot_for_eos() is also called on this path.
 * Queries:
 *  - CAPS is answered with GST_CAPS_ANY and ACCEPT_CAPS with TRUE, both
 *    reported as handled.
 *
 * Returns: GST_PAD_PROBE_OK unless one of the cases above sets another
 * value (DROP, HANDLED or REMOVE). */
1539 static GstPadProbeReturn
1540 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1541 MultiQueueSlot * slot)
1543 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1544 GstDecodebin3 *dbin = slot->dbin;
1546 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1547 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1549 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1550 switch (GST_EVENT_TYPE (ev)) {
1551 case GST_EVENT_STREAM_START:
1553 GstStream *stream = NULL;
1554 const gchar *stream_id;
1556 gst_event_parse_stream (ev, &stream);
1557 if (stream == NULL) {
1558 GST_ERROR_OBJECT (pad,
1559 "Got a STREAM_START event without a GstStream");
1562 slot->is_drained = FALSE;
1563 stream_id = gst_stream_get_stream_id (stream);
1564 GST_DEBUG_OBJECT (pad, "Stream Start '%s'", stream_id);
/* The slot keeps the reference obtained from gst_event_parse_stream()
 * whenever it stores the stream as its active stream */
1565 if (slot->active_stream == NULL) {
1566 slot->active_stream = stream;
1567 } else if (slot->active_stream != stream) {
1568 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1569 gst_stream_get_stream_id (slot->active_stream),
1570 gst_stream_get_stream_id (stream));
1571 gst_object_unref (slot->active_stream);
1572 slot->active_stream = stream;
1574 gst_object_unref (stream);
1575 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1577 gboolean is_active, is_requested;
1578 /* Quick check to see if we're in the current selection */
1579 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1580 SELECTION_LOCK (dbin);
1581 GST_DEBUG_OBJECT (dbin, "Checking active selection");
1582 is_active = stream_in_list (dbin->active_selection, stream_id);
1583 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1584 is_requested = stream_in_list (dbin->requested_selection, stream_id);
1585 SELECTION_UNLOCK (dbin);
1587 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1590 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1592 else if (slot->output) {
1593 GST_DEBUG_OBJECT (pad,
1594 "Slot needs to be deactivated ? It's no longer in requested selection");
1595 } else if (!is_active)
1596 GST_DEBUG_OBJECT (pad,
1597 "Slot in neither active nor requested selection");
1602 case GST_EVENT_CAPS:
1604 /* Configure the output slot if needed */
1605 DecodebinOutputStream *output;
1606 GstMessage *msg = NULL;
1607 SELECTION_LOCK (dbin);
1608 output = get_output_for_slot (slot);
1610 reconfigure_output_stream (output, slot);
1611 msg = is_selection_done (dbin);
/* The message is posted only after the SELECTION lock is released */
1613 SELECTION_UNLOCK (dbin);
1615 gst_element_post_message ((GstElement *) slot->dbin, msg);
1619 /* FIXME : Figure out */
1620 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1622 slot->is_drained = TRUE;
1623 if (slot->input == NULL) {
1625 GST_DEBUG_OBJECT (pad,
1626 "last EOS for input, forwarding and removing slot");
1627 peer = gst_pad_get_peer (pad);
1629 gst_pad_send_event (peer, ev);
1630 gst_object_unref (peer);
1632 gst_event_unref (ev);
1634 SELECTION_LOCK (dbin);
1635 /* FIXME : Shouldn't we try to re-assign the output instead of just
1637 /* Remove the output */
1639 DecodebinOutputStream *output = slot->output;
1640 dbin->output_streams = g_list_remove (dbin->output_streams, output);
1641 free_output_stream (dbin, output);
1644 dbin->slots = g_list_remove (dbin->slots, slot);
1645 free_multiqueue_slot_async (dbin, slot);
1646 SELECTION_UNLOCK (dbin);
1647 ret = GST_PAD_PROBE_REMOVE;
1650 case GST_EVENT_CUSTOM_DOWNSTREAM:
1651 if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
1652 slot->is_drained = TRUE;
1653 ret = GST_PAD_PROBE_DROP;
1654 SELECTION_LOCK (dbin);
1655 if (slot->input == NULL) {
1656 GST_DEBUG_OBJECT (pad,
1657 "Got custom-eos from null input stream, remove output stream");
1658 /* Remove the output */
1660 DecodebinOutputStream *output = slot->output;
1661 dbin->output_streams =
1662 g_list_remove (dbin->output_streams, output);
1663 free_output_stream (dbin, output);
1666 dbin->slots = g_list_remove (dbin->slots, slot);
1667 free_multiqueue_slot_async (dbin, slot);
1668 ret = GST_PAD_PROBE_REMOVE;
1670 check_all_slot_for_eos (dbin);
1672 SELECTION_UNLOCK (dbin);
1678 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1679 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1680 switch (GST_QUERY_TYPE (query)) {
1681 case GST_QUERY_CAPS:
1683 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1684 gst_query_set_caps_result (query, GST_CAPS_ANY);
1685 ret = GST_PAD_PROBE_HANDLED;
1689 case GST_QUERY_ACCEPT_CAPS:
1691 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1692 /* If the current decoder doesn't accept caps, we'll reconfigure
1693 * on the actual caps event. So accept any caps. */
1694 gst_query_set_accept_caps_result (query, TRUE);
1695 ret = GST_PAD_PROBE_HANDLED;
1705 /* Create a new multiqueue slot for the given type
1707 * It is up to the caller to know whether that slot is needed or not
1708 * (and release it when no longer needed) */
1709 static MultiQueueSlot *
1710 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1712 MultiQueueSlot *slot;
1713 GstIterator *it = NULL;
1714 GValue item = { 0, };
1716 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1717 gst_stream_type_get_name (type));
1718 slot = g_new0 (MultiQueueSlot, 1);
1720 slot->id = dbin->slot_id++;
1722 slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1723 if (slot->sink_pad == NULL)
1725 it = gst_pad_iterate_internal_links (slot->sink_pad);
1726 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1727 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1728 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1729 GST_DEBUG_PAD_NAME (slot->src_pad));
1732 gst_iterator_free (it);
1733 g_value_reset (&item);
1735 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1737 /* Add event probe */
1739 gst_pad_add_probe (slot->src_pad,
1740 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1741 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1743 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1744 GST_DEBUG_PAD_NAME (slot->src_pad));
1745 dbin->slots = g_list_append (dbin->slots, slot);
1752 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
1758 /* Must be called with SELECTION_LOCK */
1759 static MultiQueueSlot *
1760 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1763 MultiQueueSlot *empty_slot = NULL;
1764 GstStreamType input_type = 0;
1765 gchar *stream_id = NULL;
1767 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1768 input, input->active_stream,
1770 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1772 if (input->active_stream) {
1773 input_type = gst_stream_get_stream_type (input->active_stream);
1774 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1777 /* Go over existing slots and check if there is already one for it */
1778 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1779 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1780 /* Already used input, return that one */
1781 if (slot->input == input) {
1782 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1787 /* Go amongst all unused slots of the right type and try to find a candidate */
1788 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1789 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1790 if (slot->input == NULL && input_type == slot->type) {
1791 /* Remember this empty slot for later */
1793 /* Check if available slot is of the same stream_id */
1794 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
1795 slot->id, slot->active_stream);
1796 if (stream_id && slot->active_stream) {
1798 (gchar *) gst_stream_get_stream_id (slot->active_stream);
1799 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
1800 ostream_id, stream_id);
1801 if (!g_strcmp0 (stream_id, ostream_id))
1808 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
1809 empty_slot->input = input;
1814 return create_new_slot (dbin, input_type);
1820 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
1822 if (slot->input != NULL && slot->input != input) {
1823 GST_ERROR_OBJECT (slot->dbin,
1824 "Trying to link input to an already used slot");
1827 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
1828 slot->pending_stream = input->active_stream;
1829 slot->input = input;
1834 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
1835 GstElementFactoryListType ftype)
1837 gboolean ret = FALSE;
1840 g_mutex_lock (&dbin->factories_lock);
1841 gst_decode_bin_update_factories_list (dbin);
1842 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1844 gst_element_factory_list_filter (dbin->decoder_factories,
1845 caps, GST_PAD_SINK, TRUE);
1848 gst_element_factory_list_filter (dbin->decodable_factories,
1849 caps, GST_PAD_SINK, TRUE);
1850 g_mutex_unlock (&dbin->factories_lock);
1854 gst_plugin_feature_list_free (res);
1862 create_element (GstDecodebin3 * dbin, GstStream * stream,
1863 GstElementFactoryListType ftype)
1866 GstElement *element = NULL;
1869 g_mutex_lock (&dbin->factories_lock);
1870 gst_decode_bin_update_factories_list (dbin);
1871 caps = gst_stream_get_caps (stream);
1872 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1874 gst_element_factory_list_filter (dbin->decoder_factories,
1875 caps, GST_PAD_SINK, TRUE);
1878 gst_element_factory_list_filter (dbin->decodable_factories,
1879 caps, GST_PAD_SINK, TRUE);
1880 g_mutex_unlock (&dbin->factories_lock);
1884 gst_element_factory_create ((GstElementFactory *) res->data, NULL);
1885 GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
1886 gst_plugin_feature_list_free (res);
1888 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
1891 gst_caps_unref (caps);
1895 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
1897 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
1899 return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
1902 static GstPadProbeReturn
1903 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
1904 DecodebinOutputStream * output)
1906 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
1907 /* If we have a keyframe, remove the probe and let all data through */
1908 /* FIXME : HANDLE HEADER BUFFER ?? */
1909 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1910 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
1911 GST_DEBUG_OBJECT (pad,
1912 "Buffer is keyframe or header, letting through and removing probe");
1913 output->drop_probe_id = 0;
1914 return GST_PAD_PROBE_REMOVE;
1916 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
1917 return GST_PAD_PROBE_DROP;
/* reconfigure_output_stream:
 * @output: the output stream to (re)configure
 * @slot: the multiqueue slot that will feed @output
 *
 * Makes @output able to handle the active stream of @slot:
 *  - a decoder is considered necessary when the stream caps cannot intersect
 *    with dbin->caps;
 *  - an existing decoder is kept when one is needed and its sink pad accepts
 *    the new caps (for video outputs a keyframe-waiter probe is installed
 *    on the slot's source pad if none is pending);
 *  - otherwise the old decoder is unlinked, the ghost pad target cleared,
 *    the decoder set to NULL state and removed from the bin;
 *  - a new decoder is then created and linked if needed, or the slot source
 *    pad is used directly as the pad to expose;
 *  - finally the ghost pad target is set, the source pad of @output is added
 *    to @dbin on first use and the decoder state synced with its parent.
 * A missing-decoder message is posted, with the SELECTION lock temporarily
 * released, when no decoder can be created.
 * The trailing "Cleanup" section drops the decoder pads and the decoder
 * again. */
1921 reconfigure_output_stream (DecodebinOutputStream * output,
1922 MultiQueueSlot * slot)
1924 GstDecodebin3 *dbin = output->dbin;
1925 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
1926 gboolean needs_decoder;
/* No decoder is needed when the stream caps are already among the caps this
 * element outputs as-is (dbin->caps) */
1928 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
1930 GST_DEBUG_OBJECT (dbin,
1931 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
1934 /* FIXME : Maybe make the output un-hook itself automatically ? */
1935 if (output->slot != NULL && output->slot != slot) {
1936 GST_WARNING_OBJECT (dbin,
1937 "Output still linked to another slot (%p)", output->slot);
1938 gst_caps_unref (new_caps);
1942 /* Check if existing config is reusable as-is by checking if
1943 * the existing decoder accepts the new caps, if not delete
1944 * it and create a new one */
1945 if (output->decoder) {
1946 gboolean can_reuse_decoder;
1948 if (needs_decoder) {
1950 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
1952 can_reuse_decoder = FALSE;
1954 if (can_reuse_decoder) {
1955 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
1956 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1957 output->drop_probe_id =
1958 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1959 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1961 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
1962 if (output->linked == FALSE) {
1963 gst_pad_link_full (slot->src_pad, output->decoder_sink,
1964 GST_PAD_LINK_CHECK_NOTHING);
1965 output->linked = TRUE;
1967 gst_caps_unref (new_caps);
/* The current decoder cannot be kept: tear it down */
1971 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
1974 gst_pad_unlink (slot->src_pad, output->decoder_sink);
1975 output->linked = FALSE;
1976 if (output->drop_probe_id) {
1977 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
1978 output->drop_probe_id = 0;
1981 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
1982 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
1983 gst_caps_unref (new_caps);
/* Locked state keeps the bin from changing the decoder's state while it is
 * shut down and removed */
1987 gst_element_set_locked_state (output->decoder, TRUE);
1988 gst_element_set_state (output->decoder, GST_STATE_NULL);
1990 gst_bin_remove ((GstBin *) dbin, output->decoder);
1991 output->decoder = NULL;
1994 gst_caps_unref (new_caps);
1996 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
1997 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
1999 /* If a decoder is required, create one */
2000 if (needs_decoder) {
2001 /* If we don't have a decoder yet, instantiate one */
2002 output->decoder = create_decoder (dbin, slot->active_stream);
2003 if (output->decoder == NULL) {
/* The message is posted without holding the SELECTION lock */
2006 SELECTION_UNLOCK (dbin);
2007 /* FIXME : Should we be smarter if there's a missing decoder ?
2008 * Should we deactivate that stream ? */
2009 caps = gst_stream_get_caps (slot->active_stream);
2010 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2011 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2012 gst_caps_unref (caps);
2013 SELECTION_LOCK (dbin);
2016 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2017 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2020 output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
2021 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2022 if (output->type & GST_STREAM_TYPE_VIDEO) {
2023 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2024 output->drop_probe_id =
2025 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2026 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2028 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2029 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2030 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2031 GST_DEBUG_PAD_NAME (output->decoder_sink));
/* Without a decoder the slot's source pad is what gets exposed */
2035 output->decoder_src = gst_object_ref (slot->src_pad);
2036 output->decoder_sink = NULL;
2038 output->linked = TRUE;
2039 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2040 output->decoder_src)) {
2041 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
/* The source pad of the output is added to the element only once */
2044 if (output->src_exposed == FALSE) {
2045 output->src_exposed = TRUE;
2046 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2049 if (output->decoder)
2050 gst_element_sync_state_with_parent (output->decoder);
2052 output->slot = slot;
/* Undo the decoder setup: pads first, then the decoder itself */
2057 GST_DEBUG_OBJECT (dbin, "Cleanup");
2058 if (output->decoder_sink) {
2059 gst_object_unref (output->decoder_sink);
2060 output->decoder_sink = NULL;
2062 if (output->decoder_src) {
2063 gst_object_unref (output->decoder_src);
2064 output->decoder_src = NULL;
2066 if (output->decoder) {
2067 gst_element_set_state (output->decoder, GST_STATE_NULL);
2068 gst_bin_remove ((GstBin *) dbin, output->decoder);
2069 output->decoder = NULL;
2074 static GstPadProbeReturn
2075 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2077 GstMessage *msg = NULL;
2078 DecodebinOutputStream *output;
2080 SELECTION_LOCK (slot->dbin);
2081 output = get_output_for_slot (slot);
2083 GST_DEBUG_OBJECT (pad, "output : %p", output);
2086 reconfigure_output_stream (output, slot);
2087 msg = is_selection_done (slot->dbin);
2089 SELECTION_UNLOCK (slot->dbin);
2091 gst_element_post_message ((GstElement *) slot->dbin, msg);
2093 return GST_PAD_PROBE_REMOVE;
2096 static MultiQueueSlot *
2097 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2101 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2102 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2103 const gchar *stream_id;
2104 if (slot->active_stream) {
2105 stream_id = gst_stream_get_stream_id (slot->active_stream);
2106 if (!g_strcmp0 (sid, stream_id))
2109 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2110 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2111 if (!g_strcmp0 (sid, stream_id))
2119 /* This function handles the reassignment of a slot. Call this from
2120 * the streaming thread of a slot. */
2122 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2124 DecodebinOutputStream *output;
2125 MultiQueueSlot *target_slot = NULL;
2127 const gchar *sid, *tsid;
2129 SELECTION_LOCK (dbin);
2130 output = slot->output;
2132 if (G_UNLIKELY (slot->active_stream == NULL)) {
2133 GST_DEBUG_OBJECT (slot->src_pad,
2134 "Called on inactive slot (active_stream == NULL)");
2135 SELECTION_UNLOCK (dbin);
2139 if (G_UNLIKELY (output == NULL)) {
2140 GST_DEBUG_OBJECT (slot->src_pad,
2141 "Slot doesn't have any output to be removed");
2142 SELECTION_UNLOCK (dbin);
2146 sid = gst_stream_get_stream_id (slot->active_stream);
2147 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2149 /* Recheck whether this stream is still in the list of streams to deactivate */
2150 if (stream_in_list (dbin->requested_selection, sid)) {
2151 /* Stream is in the list of requested streams, don't remove */
2152 SELECTION_UNLOCK (dbin);
2153 GST_DEBUG_OBJECT (slot->src_pad,
2154 "Stream '%s' doesn't need to be deactivated", sid);
2158 /* Unlink slot from output */
2159 /* FIXME : Handle flushing ? */
2160 /* FIXME : Handle outputs without decoders */
2161 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2162 output->decoder_sink);
2163 if (output->decoder_sink)
2164 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2165 output->linked = FALSE;
2166 slot->output = NULL;
2167 output->slot = NULL;
2168 /* Remove sid from active selection */
2169 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2170 if (!g_strcmp0 (sid, tmp->data)) {
2171 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2175 /* Can we re-assign this output to a requested stream ? */
2176 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2177 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2178 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2179 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2180 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2181 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2182 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2183 target_slot = tslot;
2185 /* Pass target stream id to requested selection */
2186 dbin->requested_selection =
2187 g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2188 dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2194 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2196 target_slot->output = output;
2197 output->slot = target_slot;
2198 dbin->active_selection =
2199 g_list_append (dbin->active_selection, (gchar *) tsid);
2200 SELECTION_UNLOCK (dbin);
2202 /* Wakeup the target slot so that it retries to send events/buffers
2203 * thereby triggering the output reconfiguration codepath */
2204 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2205 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2206 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2210 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2211 free_output_stream (dbin, output);
2212 msg = is_selection_done (slot->dbin);
2213 SELECTION_UNLOCK (dbin);
2216 gst_element_post_message ((GstElement *) slot->dbin, msg);
2222 /* Idle probe called when a slot should be unassigned from its output stream.
2223 * This is needed to ensure nothing is flowing when unlinking the slot.
2225 * Also, this method will search for a pending stream which could re-use
2226 * the output stream. */
// Probe callback that handle_stream_switch() installs as a
// GST_PAD_PROBE_TYPE_IDLE probe on slot->src_pad, with the slot as user data.
//   pad, info: not used by the visible statements.
//   slot:      the MultiQueueSlot handed over to reassign_slot().
// Returns GST_PAD_PROBE_REMOVE unconditionally, so the probe is dropped once
// it has run.
2227 static GstPadProbeReturn
2228 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2229 MultiQueueSlot * slot)
2231 GstDecodebin3 *dbin = slot->dbin;
// All of the unlinking and re-assignment work is delegated to reassign_slot().
2233 reassign_slot (dbin, slot);
2235 return GST_PAD_PROBE_REMOVE;
// handle_stream_switch: compares the stream ids in select_streams with the
// current multiqueue slots, decides which slots must be activated,
// deactivated or have their output handed to another stream, stores the
// outcome on dbin, and finally installs idle probes (slot_unassign_probe) on
// every slot that has to give up its output.
//   dbin:           decodebin3 instance; its selection fields are read and
//                   written between SELECTION_LOCK and SELECTION_UNLOCK.
//   select_streams: list whose data is read as stream-id strings; the strings
//                   are only borrowed here and duplicated with g_strdup when
//                   they are stored on dbin.
//   seqnum:         compared with dbin->select_streams_seqnum below; its
//                   declaration is not visible here, callers pass it as the
//                   third argument.
// ret is initialised to TRUE and no visible statement changes it.
// NOTE(review): several short lines of this function (braces, else branches,
// returns, a few declarations) are not visible in this view; the comments
// below describe the visible statements only.
2239 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2242 gboolean ret = TRUE;
2244 /* List of slots to (de)activate. */
2245 GList *to_deactivate = NULL;
2246 GList *to_activate = NULL;
2247 /* List of unknown stream id, most likely means the event
2248 * should be sent upstream so that elements can expose the requested stream */
2249 GList *unknown = NULL;
// Stream ids (borrowed strings) of slots that will take over an existing output.
2250 GList *to_reassign = NULL;
// Stream ids (borrowed strings) that end up in dbin->requested_selection.
2251 GList *future_request_streams = NULL;
// Requested ids that have no slot yet but are part of the collection.
2252 GList *pending_streams = NULL;
// Slots that get an idle probe at the end of this function.
2253 GList *slots_to_reassign = NULL;
// A different seqnum stored on dbin means a newer SELECT_STREAMS request has
// replaced this one; the lock is released again in that case (the statement
// that leaves the function is presumably on a line not visible here).
2255 SELECTION_LOCK (dbin);
2256 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2257 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2258 SELECTION_UNLOCK (dbin);
2261 /* Remove pending select_streams */
// Only the list nodes are freed here, not the strings they point at.
2262 g_list_free (dbin->pending_select_streams);
2263 dbin->pending_select_streams = NULL;
2265 /* COMPARE the requested streams to the active and requested streams
2268 /* First check the slots to activate and which ones are unknown */
// Classification of each requested id:
//   no slot, id known to the collection -> pending_streams
//   no slot, id not in the collection   -> unknown
//   slot without an output              -> to_activate
//   slot that already has an output     -> future_request_streams
// (the test that detects a missing slot is not visible in this view)
2269 for (tmp = select_streams; tmp; tmp = tmp->next) {
2270 const gchar *sid = (const gchar *) tmp->data;
2271 MultiQueueSlot *slot;
2272 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2273 slot = find_slot_for_stream_id (dbin, sid);
2274 /* Find the corresponding slot */
2276 if (stream_in_collection (dbin, (gchar *) sid)) {
2277 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2279 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2280 unknown = g_list_append (unknown, (gchar *) sid);
2282 } else if (slot->output == NULL) {
2283 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2285 to_activate = g_list_append (to_activate, slot);
2287 GST_DEBUG_OBJECT (dbin,
2288 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2290 future_request_streams =
2291 g_list_append (future_request_streams, (gchar *) sid);
// Second pass, over every slot of dbin: a slot is kept when the id of its
// active_stream, or failing that of its pending_stream, is found in
// select_streams; otherwise it is appended to to_deactivate.
2295 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2296 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2297 /* For slots that have an output, check if it's part of the streams to
2300 gboolean slot_to_deactivate = TRUE;
2302 if (slot->active_stream) {
2303 if (stream_in_list (select_streams,
2304 gst_stream_get_stream_id (slot->active_stream)))
2305 slot_to_deactivate = FALSE;
2307 if (slot_to_deactivate && slot->pending_stream
2308 && slot->pending_stream != slot->active_stream) {
2309 if (stream_in_list (select_streams,
2310 gst_stream_get_stream_id (slot->pending_stream)))
2311 slot_to_deactivate = FALSE;
2313 if (slot_to_deactivate) {
2314 GST_DEBUG_OBJECT (dbin,
2315 "Slot %p (%s) should be deactivated, no longer used", slot,
2317 active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2319 to_deactivate = g_list_append (to_deactivate, slot);
2324 if (to_deactivate != NULL) {
2325 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2326 /* We need to compare what needs to be activated and deactivated in order
2327 * to determine whether there are outputs that can be transferred */
2328 /* Take the stream-id of the slots that are to be activated, for which there
2329 * is a slot of the same type that needs to be deactivated */
2330 tmp = to_deactivate;
2332 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2333 gboolean removeit = FALSE;
2335 GST_DEBUG_OBJECT (dbin,
2336 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2337 slot_to_deactivate);
// Pairing is done on the slot type only.
2338 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2339 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2340 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2341 if (slot_to_activate->type == slot_to_deactivate->type) {
2342 GST_DEBUG_OBJECT (dbin, "Re-using");
// NOTE(review): active_stream is passed to gst_stream_get_stream_id here
// without the NULL test that the later loop over to_activate applies -
// confirm it cannot be NULL for a slot in to_activate.
2343 to_reassign = g_list_append (to_reassign, (gchar *)
2344 gst_stream_get_stream_id (slot_to_activate->active_stream));
2346 g_list_append (slots_to_reassign, slot_to_deactivate);
2347 to_activate = g_list_remove (to_activate, slot_to_activate);
// Drops the current node from to_deactivate (the condition guarding this
// call is not visible; presumably it depends on removeit).
2354 to_deactivate = g_list_delete_link (to_deactivate, tmp);
// Slots still in to_deactivate found no same-type partner; they are queued
// for the idle probe as well.
2359 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2360 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2361 GST_DEBUG_OBJECT (dbin,
2362 "Really need to deactivate slot %p, but no available alternative",
2365 slots_to_reassign = g_list_append (slots_to_reassign, slot);
2368 /* The only slots left to activate are the ones that won't be reassigned and
2369 * therefore really need to have a new output created */
// The id is taken from active_stream when set, otherwise from pending_stream.
2370 for (tmp = to_activate; tmp; tmp = tmp->next) {
2371 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2372 if (slot->active_stream)
2373 future_request_streams =
2374 g_list_append (future_request_streams,
2375 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2376 else if (slot->pending_stream)
2377 future_request_streams =
2378 g_list_append (future_request_streams,
2379 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2381 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
// Nothing to activate now but some ids are still pending: the whole request
// is stored as a deep copy in dbin->requested_selection, and the local
// to_deactivate and pending_streams lists are dropped.
2384 if (to_activate == NULL && pending_streams != NULL) {
2385 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2386 if (dbin->requested_selection)
2387 g_list_free_full (dbin->requested_selection, g_free);
2388 dbin->requested_selection =
2389 g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2390 g_list_free (to_deactivate);
2391 g_list_free (pending_streams);
2392 to_deactivate = NULL;
2393 pending_streams = NULL;
// Presumably the alternative branch (the else line is not visible):
// requested_selection becomes deep copies of future_request_streams followed
// by pending_streams.
2395 if (dbin->requested_selection)
2396 g_list_free_full (dbin->requested_selection, g_free);
2397 dbin->requested_selection =
2398 g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2399 dbin->requested_selection =
2400 g_list_concat (dbin->requested_selection,
2401 g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2402 if (dbin->to_activate)
2403 g_list_free (dbin->to_activate);
// Shallow copy: dbin->to_activate shares the string pointers collected in
// to_reassign. NOTE(review): those strings come from gst_stream_get_stream_id,
// so they presumably stay valid only while the streams do - confirm.
2404 dbin->to_activate = g_list_copy (to_reassign);
2407 dbin->selection_updated = TRUE;
2408 SELECTION_UNLOCK (dbin);
// Unknown ids are only reported; only the list nodes are freed.
2411 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2412 g_list_free (unknown);
2415 /* For all streams to deactivate, add an idle probe where we will do
2416 * the unassignment and switch over */
2417 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2418 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2419 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2420 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
// Cleanup of the temporary lists: only the nodes are freed, the slots and
// strings they reference are not owned by these lists.
2424 g_list_free (to_deactivate);
2426 g_list_free (to_activate);
2428 g_list_free (to_reassign);
2429 if (future_request_streams)
2430 g_list_free (future_request_streams);
2431 if (pending_streams)
2432 g_list_free (pending_streams);
2433 if (slots_to_reassign)
2434 g_list_free (slots_to_reassign);
// Probe callback for events; create_output_stream() installs it with
// GST_PAD_PROBE_TYPE_EVENT_UPSTREAM on the internal proxy pad of an output
// ghost pad, passing the DecodebinOutputStream as user data.
//   pad:    the pad the probe fired on; its peer receives forwarded events.
//   info:   probe info from which the event is extracted.
//   output: output stream whose dbin owns the selection state.
// For GST_EVENT_SELECT_STREAMS the event is recorded, forwarded and acted on
// here, and ret becomes GST_PAD_PROBE_HANDLED. ret starts as GST_PAD_PROBE_OK;
// the handling of other event types is not visible in this view.
2439 static GstPadProbeReturn
2440 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2441 DecodebinOutputStream * output)
2443 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2444 GstDecodebin3 *dbin = output->dbin;
2445 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2447 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2449 switch (GST_EVENT_TYPE (event)) {
2450 case GST_EVENT_SELECT_STREAMS:
2453 GList *streams = NULL;
2454 guint32 seqnum = gst_event_get_seqnum (event);
// The seqnum identifies the request: an event carrying the seqnum already
// stored on dbin is treated as a duplicate (what follows the debug message
// in that case is not visible here).
2456 SELECTION_LOCK (dbin);
2457 if (seqnum == dbin->select_streams_seqnum) {
2458 SELECTION_UNLOCK (dbin);
2459 GST_DEBUG_OBJECT (pad,
2460 "Already handled/handling that SELECT_STREAMS event");
2463 dbin->select_streams_seqnum = seqnum;
2464 if (dbin->pending_select_streams != NULL) {
2465 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2466 g_list_free (dbin->pending_select_streams);
2467 dbin->pending_select_streams = NULL;
2469 gst_event_parse_select_streams (event, &streams);
// Shallow copy: pending_select_streams shares its strings with streams, and
// those strings are released with g_free further down.
// NOTE(review): handle_stream_switch() resets pending_select_streams when the
// seqnum still matches; if it bails out or is not called, the pending list
// may be left pointing at freed strings - confirm.
2470 dbin->pending_select_streams = g_list_copy (streams);
2471 SELECTION_UNLOCK (dbin);
2473 /* Send event upstream */
// gst_pad_send_event takes ownership of the event. The gst_event_unref below
// is presumably the branch for a missing peer (the else line is not visible).
2474 if ((peer = gst_pad_get_peer (pad))) {
2475 gst_pad_send_event (peer, event);
2476 gst_object_unref (peer);
2478 gst_event_unref (event);
2480 /* Finally handle the switch */
// The list and its strings are owned here and are released after the switch
// has been processed.
2482 handle_stream_switch (dbin, streams, seqnum);
2483 g_list_free_full (streams, g_free);
2485 ret = GST_PAD_PROBE_HANDLED;
// Element-level event entry point for decodebin3.
//   element: the decodebin3 instance (cast to GstDecodebin3 below).
//   event:   the event to process.
// GST_EVENT_SELECT_STREAMS is handled here with the same bookkeeping as in
// ghost_pad_event_probe(): record seqnum and pending list under the selection
// lock, then run handle_stream_switch(). The last visible statement chains up
// to the parent class send_event; presumably that is the path for every other
// event type - the branching lines are not visible in this view.
2496 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2498 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2499 if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2500 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2501 GList *streams = NULL;
2502 guint32 seqnum = gst_event_get_seqnum (event);
// An event carrying the seqnum already stored on dbin is a duplicate.
2504 SELECTION_LOCK (dbin);
2505 if (seqnum == dbin->select_streams_seqnum) {
2506 SELECTION_UNLOCK (dbin);
2507 GST_DEBUG_OBJECT (dbin,
2508 "Already handled/handling that SELECT_STREAMS event");
2511 dbin->select_streams_seqnum = seqnum;
2512 if (dbin->pending_select_streams != NULL) {
2513 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2514 g_list_free (dbin->pending_select_streams);
2515 dbin->pending_select_streams = NULL;
2517 gst_event_parse_select_streams (event, &streams);
// Shallow copy: the pending list shares its strings with streams.
2518 dbin->pending_select_streams = g_list_copy (streams);
2519 SELECTION_UNLOCK (dbin);
2521 /* FIXME : We don't have an upstream ?? */
// NOTE(review): pad and peer have no visible declaration in this function;
// the forwarding statements below are presumably disabled by preprocessor
// lines that are not visible here - confirm.
2523 /* Send event upstream */
2524 if ((peer = gst_pad_get_peer (pad))) {
2525 gst_pad_send_event (peer, event);
2526 gst_object_unref (peer);
2529 /* Finally handle the switch */
// The parsed list and its strings are released once the switch is processed.
2531 handle_stream_switch (dbin, streams, seqnum);
2532 g_list_free_full (streams, g_free);
// The event reference is dropped here on the SELECT_STREAMS path.
2535 gst_event_unref (event);
2538 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
// Tears down the pads and references held by a MultiQueueSlot.
//   dbin: decodebin3 instance that owns the multiqueue element.
//   slot: the slot to release.
// Visible steps: remove the probe identified by slot->probe_id from the src
// pad, unlink the input src pad from the slot sink pad, give the request
// sink pad back to the multiqueue, then drop the references to sink_pad,
// src_pad and active_stream.
// NOTE(review): guards around the first two steps and the release of the
// slot structure itself are not visible in this view.
2543 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2546 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2548 if (slot->input->srcpad)
2549 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2552 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
// gst_object_replace with NULL unrefs the old object and clears the field.
2553 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2554 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2555 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
// Asynchronous variant of free_multiqueue_slot(): instead of releasing the
// slot in the calling thread, the call is queued with gst_element_call_async.
//   dbin: element passed as first argument to the deferred call.
//   slot: passed as user data; no destroy notify is registered (NULL).
2560 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2562 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2563 gst_element_call_async (GST_ELEMENT_CAST (dbin),
2564 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2567 /* Create a DecodebinOutputStream for a given type
2568 * Note: It will be empty initially, it needs to be configured
// Allocates a zero-initialised DecodebinOutputStream, gives it an active
// ghost src pad without target, hooks ghost_pad_event_probe() on the internal
// proxy pad and appends the stream to dbin->output_streams.
//   dbin: decodebin3 instance; provides the per-type pad counters and the
//         output_streams list.
//   type: stream type flags; selects the pad template and the counter.
// The value handed back is presumably res (the return line is not visible).
2570 static DecodebinOutputStream *
2571 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2573 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2575 const gchar *prefix;
2576 GstStaticPadTemplate *templ;
2577 GstPadTemplate *ptmpl;
2579 GstPad *internal_pad;
2581 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2582 res, gst_stream_type_get_name (type));
// Flag tests run in the order video, audio, text and the first match wins;
// any other type uses the generic src template and opadcount. The lines that
// assign prefix are not visible in this view.
2587 if (type & GST_STREAM_TYPE_VIDEO) {
2588 templ = &video_src_template;
2589 counter = &dbin->vpadcount;
2591 } else if (type & GST_STREAM_TYPE_AUDIO) {
2592 templ = &audio_src_template;
2593 counter = &dbin->apadcount;
2595 } else if (type & GST_STREAM_TYPE_TEXT) {
2596 templ = &text_src_template;
2597 counter = &dbin->tpadcount;
2600 templ = &src_template;
2601 counter = &dbin->opadcount;
// Pad name is the prefix, an underscore, then the current counter value.
2605 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
// The template reference obtained here is dropped right after pad creation.
2607 ptmpl = gst_static_pad_template_get (templ);
2608 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2609 gst_object_unref (ptmpl);
2611 gst_pad_set_active (res->src_pad, TRUE);
2612 /* Put an event probe on the internal proxy pad to detect upstream
// The probe receives res as user data; the reference returned by
// gst_proxy_pad_get_internal is released once the probe is installed.
2615 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2616 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2617 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2618 gst_object_unref (internal_pad);
2620 dbin->output_streams = g_list_append (dbin->output_streams, res);
// Releases everything a DecodebinOutputStream holds on to.
//   dbin:   decodebin3 instance the ghost pad and decoder belong to.
//   output: the output stream to tear down.
// Visible steps: unlink the slot src pad from the decoder sink pad, detach
// slot and output from each other, drop the decoder pad references, clear
// the ghost pad target, remove the ghost pad from dbin when it was exposed,
// then shut the decoder down and remove it from the bin.
// It does not remove output from dbin->output_streams; the visible callers
// do that themselves.
// NOTE(review): output->slot is dereferenced below; the check guarding those
// statements, and the release of output itself, are not visible in this view.
2626 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
2629 if (output->decoder_sink && output->decoder)
2630 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
2632 output->slot->output = NULL;
2633 output->slot = NULL;
// gst_object_replace with NULL unrefs the old object and clears the field.
2635 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2636 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2637 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2638 if (output->src_exposed) {
2639 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
// The decoder state is locked first so that state changes of the parent bin
// no longer apply to it, then it is set to NULL and removed from the bin.
2641 if (output->decoder) {
2642 gst_element_set_locked_state (output->decoder, TRUE);
2643 gst_element_set_state (output->decoder, GST_STATE_NULL);
2644 gst_bin_remove ((GstBin *) dbin, output->decoder);
// State change handler for decodebin3.
//   element:    the decodebin3 instance.
//   transition: the requested state transition.
// The parent class handler runs first and its result is stored in ret; a
// GST_STATE_CHANGE_FAILURE result is tested for right after (the reaction is
// on a line not visible here). On GST_STATE_CHANGE_PAUSED_TO_READY every
// output stream and every multiqueue slot is released and the lists holding
// them are freed.
// NOTE(review): the first switch has no visible cases in this view, and the
// lines after the last visible statement are not shown either.
2649 static GstStateChangeReturn
2650 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2652 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2653 GstStateChangeReturn ret;
2656 switch (transition) {
2660 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2661 if (ret == GST_STATE_CHANGE_FAILURE)
2664 switch (transition) {
2665 case GST_STATE_CHANGE_PAUSED_TO_READY:
2669 /* Free output streams */
// free_output_stream() does not touch dbin->output_streams, so the list is
// walked first and its nodes are freed afterwards.
2670 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2671 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2672 free_output_stream (dbin, output);
2674 g_list_free (dbin->output_streams);
2675 dbin->output_streams = NULL;
2676 /* Free multiqueue slots */
// Slots are released synchronously here, not through the async helper.
2677 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2678 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2679 free_multiqueue_slot (dbin, slot);
2681 g_list_free (dbin->slots);
// Plugin registration hook for decodebin3.
//   plugin: the plugin the element factory is registered with.
// Initialises the decodebin3 debug category, then registers the element type
// GST_TYPE_DECODEBIN3 under the name decodebin3 with rank GST_RANK_NONE.
// Returns the result of gst_element_register.
2694 gst_decodebin3_plugin_init (GstPlugin * plugin)
2696 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2698 return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2699 GST_TYPE_DECODEBIN3);