3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
31 #include <gst/pbutils/pbutils.h>
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
38 * SECTION:element-decodebin3
41 * #GstBin that auto-magically constructs a decoding pipeline using available
42 * decoders and demuxers via auto-plugging. The output is raw audio, video
43 * or subtitle streams.
45 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
47 * * supports publication and selection of stream information via
48 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
50 * * dynamically switches stream connections internally, and
51 * reuses decoder elements when stream selections change, so that in
52 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53 * and only creates new elements when streams change and an existing decoder
54 * is not capable of handling the new format.
56 * * supports multiple input pads for the parallel decoding of auxiliary streams
57 * not muxed with the primary stream.
59 * * does not handle network stream buffering. decodebin3 expects that network stream
60 * buffering is handled upstream, before data is passed to it.
62 * > decodebin3 is still experimental API and a technology preview.
63 * > Its behaviour and exposed API is subject to change.
70 * 1) From sink pad to elementary streams (GstParseBin)
72 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73 * through typefind. When the caps are detected (or changed) we recursively
74 * figure out which demuxer, parser or depayloader is needed until we get to
77 * All elementary streams (whether decoded or not, whether exposed or not) are
78 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
80 * => MultiQueue is the cornerstone.
81 * => No buffering before multiqueue
83 * 2) Elementary streams
85 * After GstParseBin, there are 3 main components:
86 * 1) Input Streams (provided by GstParseBin)
90 * Input Streams correspond to the stream coming from GstParseBin and that gets
91 * fed into a multiqueue slot.
93 * Output Streams correspond to the combination of a (optional) decoder and an
94 * output ghostpad. Output Streams can be moved from one multiqueue slot to
95 * another, can reconfigure itself (different decoders), and can be
96 * added/removed depending on the configuration (all streams outputted, only one
99 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100 * each 'active' Input Stream there is a corresponding slot.
101 * Slots might have different streams on input and output (due to internal
104 * Due to internal queuing/buffering/..., all those components (might) behave
105 * asynchronously. Therefore probes will be used on each component source pad to
106 * detect various key-points:
108 * the stream is done => Mark that component as done, optionally freeing/removing it
110 * a new stream is starting => link it further if needed
112 * 3) Gradual replacement
114 * If the caps change at any point in decodebin (input sink pad, demuxer output,
115 * multiqueue output, ..), we gradually replace (if needed) the following elements.
117 * This is handled by the probes in various locations:
119 * b) multiqueue input (source pad of Input Streams)
120 * c) multiqueue output (source pad of Multiqueue Slots)
121 * d) final output (target of source ghostpads)
123 * When CAPS event arrive at those points, one of three things can happen:
124 * a) There is no elements downstream yet, just create/link-to following elements
125 * b) There are downstream elements, do a ACCEPT_CAPS query
126 * b.1) The new CAPS are accepted, keep current configuration
127 * b.2) The new CAPS are not accepted, remove following elements then do a)
132 * Input(s) Slots Streams
133 * /-------------------------------------------\ /-----\ /------------- \
135 * +-------------------------------------------------------------------------+
137 * | +---------------------------------------------+ |
138 * | | GstParseBin(s) | |
139 * | | +--------------+ | +-----+ |
140 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
141 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
142 * | | | (if needed) |---[parser]-[|--| qu | |
143 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
144 * | | +--------------+ | +------ ^ |
145 * | +---------------------------------------------+ ^ | |
147 * +-----------------------------------------------+--------+-------------+--+
150 * Probes --/--------/-------------/
154 * We want to ensure we re-use decoders when switching streams. This takes place
155 * at the multiqueue output level.
158 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
159 * the streaming thread in the multiqueue_src_probe() and only if the
160 stream is in the REQUESTED selection.
161 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162 * within the stream thread, but only in a purposefully called IDLE probe
163 * that calls reassign_slot().
165 * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166 * 1) requested_selection
167 * All streams within that list should be activated
168 * 2) active_selection
169 * List of streams that are exposed by decodebin
171 * List of streams that will be moved to requested_selection in the
172 * reassign_slot() method (i.e. once a stream was deactivated, and the output
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
180 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
182 #define EXTRA_DEBUG 1
184 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
185 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
187 _custom_final_eos_quark_get (void)
189 static gsize g_quark;
191 if (g_once_init_enter (&g_quark)) {
193 (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194 g_once_init_leave (&g_quark, quark);
199 typedef struct _GstDecodebin3 GstDecodebin3;
200 typedef struct _GstDecodebin3Class GstDecodebin3Class;
202 typedef struct _DecodebinInputStream DecodebinInputStream;
203 typedef struct _DecodebinInput DecodebinInput;
204 typedef struct _DecodebinOutputStream DecodebinOutputStream;
206 struct _GstDecodebin3
210 /* input_lock protects the following variables */
212 /* Main input (static sink pad) */
213 DecodebinInput *main_input;
214 /* Supplementary input (request sink pads) */
216 /* counter for input */
217 guint32 input_counter;
218 /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
219 /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
220 guint32 current_group_id;
221 /* End of variables protected by input_lock */
223 GstElement *multiqueue;
224 GstClockTime default_mq_min_interleave;
225 GstClockTime current_mq_min_interleave;
227 /* selection_lock protects access to following variables */
228 GMutex selection_lock;
229 GList *input_streams; /* List of DecodebinInputStream for active collection */
230 GList *output_streams; /* List of DecodebinOutputStream used for output */
231 GList *slots; /* List of MultiQueueSlot */
234 /* Active collection */
235 GstStreamCollection *collection;
236 /* requested selection of stream-id to activate post-multiqueue */
237 GList *requested_selection;
238 /* list of stream-id currently activated in output */
239 GList *active_selection;
240 /* List of stream-id that need to be activated (after a stream switch for ex) */
242 /* Pending select streams event */
243 guint32 select_streams_seqnum;
244 /* pending list of streams to select (from downstream) */
245 GList *pending_select_streams;
246 /* TRUE if requested_selection was updated, will become FALSE once
247 * it has fully transitioned to active */
248 gboolean selection_updated;
249 /* End of variables protected by selection_lock */
250 gboolean upstream_selected;
252 /* List of pending collections.
253 * FIXME : Is this really needed ? */
254 GList *pending_collection;
257 GMutex factories_lock;
258 guint32 factories_cookie;
259 /* All DECODABLE factories */
261 /* Only DECODER factories */
262 GList *decoder_factories;
263 /* DECODABLE but not DECODER factories */
264 GList *decodable_factories;
266 /* counters for pads */
267 guint32 apadcount, vpadcount, tpadcount, opadcount;
273 struct _GstDecodebin3Class
277 gint (*select_stream) (GstDecodebin3 * dbin,
278 GstStreamCollection * collection, GstStream * stream);
281 /* Input of decodebin, controls input pad and parsebin */
282 struct _DecodebinInput
289 GstPad *parsebin_sink;
291 GstStreamCollection *collection; /* Active collection */
292 gboolean upstream_selected;
296 GstElement *parsebin;
298 gulong pad_added_sigid;
299 gulong pad_removed_sigid;
300 gulong drained_sigid;
302 /* TRUE if the input got drained
303 * FIXME : When do we reset it if re-used ?
307 /* HACK : Remove these fields */
308 /* List of PendingPad structures */
312 /* Multiqueue Slots */
313 typedef struct _MultiQueueSlot
318 /* Type of stream handled by this slot */
321 /* Linked input and output */
322 DecodebinInputStream *input;
324 /* pending => last stream received on sink pad */
325 GstStream *pending_stream;
326 /* active => last stream outputted on source pad */
327 GstStream *active_stream;
329 GstPad *sink_pad, *src_pad;
331 /* id of the MQ src_pad event probe */
336 DecodebinOutputStream *output;
339 /* Streams that are exposed downstream (i.e. output) */
340 struct _DecodebinOutputStream
343 /* The type of stream handled by this output stream */
346 /* The slot to which this output stream is currently connected to */
347 MultiQueueSlot *slot;
349 GstElement *decoder; /* Optional */
350 GstPad *decoder_sink, *decoder_src;
355 /* Flag if ghost pad is exposed */
356 gboolean src_exposed;
358 /* Reported decoder latency */
359 GstClockTime decoder_latency;
361 /* keyframe dropping probe */
362 gulong drop_probe_id;
365 /* Pending pads from parsebin */
366 typedef struct _PendingPad
369 DecodebinInput *input;
387 SIGNAL_SELECT_STREAM,
388 SIGNAL_ABOUT_TO_FINISH,
391 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
393 #define SELECTION_LOCK(dbin) G_STMT_START { \
394 GST_LOG_OBJECT (dbin, \
395 "selection locking from thread %p", \
397 g_mutex_lock (&dbin->selection_lock); \
398 GST_LOG_OBJECT (dbin, \
399 "selection locked from thread %p", \
403 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
404 GST_LOG_OBJECT (dbin, \
405 "selection unlocking from thread %p", \
407 g_mutex_unlock (&dbin->selection_lock); \
410 #define INPUT_LOCK(dbin) G_STMT_START { \
411 GST_LOG_OBJECT (dbin, \
412 "input locking from thread %p", \
414 g_mutex_lock (&dbin->input_lock); \
415 GST_LOG_OBJECT (dbin, \
416 "input locked from thread %p", \
420 #define INPUT_UNLOCK(dbin) G_STMT_START { \
421 GST_LOG_OBJECT (dbin, \
422 "input unlocking from thread %p", \
424 g_mutex_unlock (&dbin->input_lock); \
427 GType gst_decodebin3_get_type (void);
428 #define gst_decodebin3_parent_class parent_class
429 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
431 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
432 playback_element_init (plugin);
433 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
434 GST_TYPE_DECODEBIN3, _do_init);
436 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
438 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
441 GST_STATIC_CAPS_ANY);
443 static GstStaticPadTemplate request_sink_template =
444 GST_STATIC_PAD_TEMPLATE ("sink_%u",
447 GST_STATIC_CAPS_ANY);
449 static GstStaticPadTemplate video_src_template =
450 GST_STATIC_PAD_TEMPLATE ("video_%u",
453 GST_STATIC_CAPS_ANY);
455 static GstStaticPadTemplate audio_src_template =
456 GST_STATIC_PAD_TEMPLATE ("audio_%u",
459 GST_STATIC_CAPS_ANY);
461 static GstStaticPadTemplate text_src_template =
462 GST_STATIC_PAD_TEMPLATE ("text_%u",
465 GST_STATIC_CAPS_ANY);
467 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
470 GST_STATIC_CAPS_ANY);
473 static void gst_decodebin3_dispose (GObject * object);
474 static void gst_decodebin3_finalize (GObject * object);
475 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
476 const GValue * value, GParamSpec * pspec);
477 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
478 GValue * value, GParamSpec * pspec);
480 static gboolean parsebin_autoplug_continue_cb (GstElement *
481 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
484 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
485 GstStreamCollection * collection, GstStream * stream)
487 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
492 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
493 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
494 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
495 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
496 GstStateChange transition);
497 static gboolean gst_decodebin3_send_event (GstElement * element,
500 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
502 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
503 GstElementFactoryListType ftype);
506 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
507 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
508 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
509 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
511 static void reconfigure_output_stream (DecodebinOutputStream * output,
512 MultiQueueSlot * slot);
513 static void free_output_stream (GstDecodebin3 * dbin,
514 DecodebinOutputStream * output);
515 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
518 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
519 GstPadProbeInfo * info, MultiQueueSlot * slot);
520 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
521 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
522 DecodebinInputStream * input);
523 static void link_input_to_slot (DecodebinInputStream * input,
524 MultiQueueSlot * slot);
525 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
526 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
527 MultiQueueSlot * slot);
529 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
530 static void update_requested_selection (GstDecodebin3 * dbin);
532 /* FIXME: Really make all the parser stuff a self-contained helper object */
533 #include "gstdecodebin3-parse.c"
536 _gst_int_accumulator (GSignalInvocationHint * ihint,
537 GValue * return_accu, const GValue * handler_return, gpointer dummy)
539 gint res = g_value_get_int (handler_return);
541 g_value_set_int (return_accu, res);
550 gst_decodebin3_class_init (GstDecodebin3Class * klass)
552 GObjectClass *gobject_klass = (GObjectClass *) klass;
553 GstElementClass *element_class = (GstElementClass *) klass;
554 GstBinClass *bin_klass = (GstBinClass *) klass;
556 gobject_klass->dispose = gst_decodebin3_dispose;
557 gobject_klass->finalize = gst_decodebin3_finalize;
558 gobject_klass->set_property = gst_decodebin3_set_property;
559 gobject_klass->get_property = gst_decodebin3_get_property;
561 /* FIXME : ADD PROPERTIES ! */
562 g_object_class_install_property (gobject_klass, PROP_CAPS,
563 g_param_spec_boxed ("caps", "Caps",
564 "The caps on which to stop decoding. (NULL = default)",
565 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
567 /* FIXME : ADD SIGNALS ! */
569 * GstDecodebin3::select-stream
570 * @decodebin: a #GstDecodebin3
571 * @collection: a #GstStreamCollection
572 * @stream: a #GstStream
574 * This signal is emitted whenever @decodebin needs to decide whether
575 * to expose a @stream of a given @collection.
577 * Note that the prefered way to select streams is to listen to
578 * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
579 * GST_EVENT_SELECT_STREAMS with the streams the user wants.
581 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
582 * A value of -1 (default) lets @decodebin decide what to do with the stream.
584 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
585 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
586 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
587 _gst_int_accumulator, NULL, NULL,
588 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
591 * GstDecodebin3::about-to-finish:
593 * This signal is emitted when the data for the selected URI is
594 * entirely buffered and it is safe to specify another URI.
596 gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
597 g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
598 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
601 element_class->request_new_pad =
602 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
603 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
604 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
606 gst_element_class_add_pad_template (element_class,
607 gst_static_pad_template_get (&sink_template));
608 gst_element_class_add_pad_template (element_class,
609 gst_static_pad_template_get (&request_sink_template));
610 gst_element_class_add_pad_template (element_class,
611 gst_static_pad_template_get (&video_src_template));
612 gst_element_class_add_pad_template (element_class,
613 gst_static_pad_template_get (&audio_src_template));
614 gst_element_class_add_pad_template (element_class,
615 gst_static_pad_template_get (&text_src_template));
616 gst_element_class_add_pad_template (element_class,
617 gst_static_pad_template_get (&src_template));
619 gst_element_class_set_static_metadata (element_class,
620 "Decoder Bin 3", "Generic/Bin/Decoder",
621 "Autoplug and decode to raw media",
622 "Edward Hervey <edward@centricular.com>");
624 bin_klass->handle_message = gst_decodebin3_handle_message;
626 klass->select_stream = gst_decodebin3_select_stream;
630 gst_decodebin3_init (GstDecodebin3 * dbin)
632 /* Create main input */
633 dbin->main_input = create_new_input (dbin, TRUE);
635 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
636 g_object_get (dbin->multiqueue, "min-interleave-time",
637 &dbin->default_mq_min_interleave, NULL);
638 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
639 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
640 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
641 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
643 dbin->current_group_id = GST_GROUP_ID_INVALID;
645 g_mutex_init (&dbin->factories_lock);
646 g_mutex_init (&dbin->selection_lock);
647 g_mutex_init (&dbin->input_lock);
649 dbin->caps = gst_static_caps_get (&default_raw_caps);
651 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
655 gst_decodebin3_dispose (GObject * object)
657 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
660 if (dbin->factories) {
661 gst_plugin_feature_list_free (dbin->factories);
662 dbin->factories = NULL;
664 if (dbin->decoder_factories) {
665 g_list_free (dbin->decoder_factories);
666 dbin->decoder_factories = NULL;
668 if (dbin->decodable_factories) {
669 g_list_free (dbin->decodable_factories);
670 dbin->decodable_factories = NULL;
672 g_list_free_full (dbin->requested_selection, g_free);
673 g_list_free_full (dbin->active_selection, g_free);
674 g_list_free (dbin->to_activate);
675 g_list_free (dbin->pending_select_streams);
677 dbin->requested_selection = NULL;
678 dbin->active_selection = NULL;
679 dbin->to_activate = NULL;
680 dbin->pending_select_streams = NULL;
682 g_clear_object (&dbin->collection);
684 if (dbin->main_input) {
685 free_input (dbin, dbin->main_input);
686 dbin->main_input = NULL;
689 for (walk = dbin->other_inputs; walk; walk = next) {
690 DecodebinInput *input = walk->data;
692 next = g_list_next (walk);
694 free_input (dbin, input);
695 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
698 G_OBJECT_CLASS (parent_class)->dispose (object);
702 gst_decodebin3_finalize (GObject * object)
704 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
706 g_mutex_clear (&dbin->factories_lock);
707 g_mutex_clear (&dbin->selection_lock);
708 g_mutex_clear (&dbin->input_lock);
710 G_OBJECT_CLASS (parent_class)->finalize (object);
714 gst_decodebin3_set_property (GObject * object, guint prop_id,
715 const GValue * value, GParamSpec * pspec)
717 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
719 /* FIXME : IMPLEMENT */
722 GST_OBJECT_LOCK (dbin);
724 gst_caps_unref (dbin->caps);
725 dbin->caps = g_value_dup_boxed (value);
726 GST_OBJECT_UNLOCK (dbin);
729 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
735 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
738 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
740 /* FIXME : IMPLEMENT */
743 GST_OBJECT_LOCK (dbin);
744 g_value_set_boxed (value, dbin->caps);
745 GST_OBJECT_UNLOCK (dbin);
748 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
754 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
755 GstCaps * caps, GstDecodebin3 * dbin)
757 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
759 /* If it matches our target caps, expose it */
760 if (gst_caps_can_intersect (caps, dbin->caps))
766 /* This method should be called whenever a STREAM_START event
767 * comes out of a given parsebin.
768 * The caller shall replace the group_id if the function returns TRUE */
770 set_input_group_id (DecodebinInput * input, guint32 * group_id)
772 GstDecodebin3 *dbin = input->dbin;
774 if (input->group_id != *group_id) {
775 if (input->group_id != GST_GROUP_ID_INVALID)
776 GST_WARNING_OBJECT (dbin,
777 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
778 ") on input %p ", input->group_id, *group_id, input);
779 input->group_id = *group_id;
782 if (*group_id != dbin->current_group_id) {
783 if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
784 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
786 dbin->current_group_id = *group_id;
788 *group_id = dbin->current_group_id;
796 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
798 GstDecodebin3 *dbin = input->dbin;
799 gboolean all_drained;
802 GST_INFO_OBJECT (dbin, "input %p drained", input);
803 input->drained = TRUE;
805 all_drained = dbin->main_input->drained;
806 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
807 DecodebinInput *data = (DecodebinInput *) tmp->data;
809 all_drained &= data->drained;
813 GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
814 g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
819 /* Call with INPUT_LOCK taken */
821 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
823 gboolean set_state = FALSE;
825 if (input->parsebin == NULL) {
826 input->parsebin = gst_element_factory_make ("parsebin", NULL);
827 if (input->parsebin == NULL)
829 input->parsebin = gst_object_ref (input->parsebin);
830 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
831 input->pad_added_sigid =
832 g_signal_connect (input->parsebin, "pad-added",
833 (GCallback) parsebin_pad_added_cb, input);
834 input->pad_removed_sigid =
835 g_signal_connect (input->parsebin, "pad-removed",
836 (GCallback) parsebin_pad_removed_cb, input);
837 input->drained_sigid =
838 g_signal_connect (input->parsebin, "drained",
839 (GCallback) parsebin_drained_cb, input);
840 g_signal_connect (input->parsebin, "autoplug-continue",
841 (GCallback) parsebin_autoplug_continue_cb, dbin);
844 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
845 /* The state lock is taken so that we ensure we are the one (de)activating
846 * parsebin. We need to do this to ensure any activation taking place in
847 * parsebin (including by elements doing upstream activation) are done
848 * within the same thread. */
849 GST_STATE_LOCK (input->parsebin);
850 gst_bin_add (GST_BIN (dbin), input->parsebin);
854 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
855 input->parsebin_sink);
858 gst_element_sync_state_with_parent (input->parsebin);
859 GST_STATE_UNLOCK (input->parsebin);
867 gst_element_post_message ((GstElement *) dbin,
868 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
873 static GstPadLinkReturn
874 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
876 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
877 GstPadLinkReturn res = GST_PAD_LINK_OK;
878 DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
880 g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
882 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
883 ". Creating parsebin if needed", pad);
886 if (!ensure_input_parsebin (dbin, input))
887 res = GST_PAD_LINK_REFUSED;
893 /* Drop duration query during _input_pad_unlink */
894 static GstPadProbeReturn
895 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
896 DecodebinInput * input)
898 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
900 if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
901 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
902 if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
903 GST_LOG_OBJECT (pad, "stop forwarding query duration");
904 ret = GST_PAD_PROBE_HANDLED;
912 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
914 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
915 DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
917 g_return_if_fail (input);
919 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
920 ". Removing parsebin.", pad);
923 if (input->parsebin == NULL) {
928 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
929 GstStreamCollection *collection = NULL;
930 gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
931 GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
932 (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
934 /* Clear stream-collection corresponding to current INPUT and post new
935 * stream-collection message, if needed */
936 if (input->collection) {
937 gst_object_unref (input->collection);
938 input->collection = NULL;
941 SELECTION_LOCK (dbin);
942 collection = get_merged_collection (dbin);
943 if (collection && collection != dbin->collection) {
945 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
947 if (dbin->collection)
948 gst_object_unref (dbin->collection);
949 dbin->collection = collection;
950 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
953 gst_message_new_stream_collection ((GstObject *) dbin,
956 SELECTION_UNLOCK (dbin);
957 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
958 update_requested_selection (dbin);
961 gst_object_unref (collection);
962 SELECTION_UNLOCK (dbin);
965 gst_bin_remove (GST_BIN (dbin), input->parsebin);
966 gst_element_set_state (input->parsebin, GST_STATE_NULL);
967 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
968 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
969 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
970 gst_pad_remove_probe (input->parsebin_sink, probe_id);
971 gst_object_unref (input->parsebin);
972 gst_object_unref (input->parsebin_sink);
974 input->parsebin = NULL;
975 input->parsebin_sink = NULL;
977 if (!input->is_main) {
978 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
979 free_input_async (dbin, input);
987 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
989 GST_DEBUG ("Freeing input %p", input);
990 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
991 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
992 if (input->parsebin) {
993 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
994 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
995 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
996 gst_element_set_state (input->parsebin, GST_STATE_NULL);
997 gst_object_unref (input->parsebin);
998 gst_object_unref (input->parsebin_sink);
1000 if (input->collection)
1001 gst_object_unref (input->collection);
1006 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
1008 GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
1009 gst_element_call_async (GST_ELEMENT_CAST (dbin),
1010 (GstElementCallAsyncFunc) free_input, input, NULL);
1014 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1016 DecodebinInput *input =
1017 g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1019 g_return_val_if_fail (input, FALSE);
1021 GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1023 /* We accept any caps, since we will reconfigure ourself internally if the new
1024 * stream is incompatible */
1025 if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1026 GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1027 gst_query_set_accept_caps_result (query, TRUE);
1030 return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1034 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1036 DecodebinInput *input =
1037 g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1039 g_return_val_if_fail (input, FALSE);
1041 GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1043 switch (GST_EVENT_TYPE (event)) {
1044 case GST_EVENT_STREAM_START:
1046 GstQuery *q = gst_query_new_selectable ();
1048 /* Query whether upstream can handle stream selection or not */
1049 if (gst_pad_peer_query (sinkpad, q)) {
1050 gst_query_parse_selectable (q, &input->upstream_selected);
1051 GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1052 input->upstream_selected);
1054 input->upstream_selected = FALSE;
1055 GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1057 gst_query_unref (q);
1059 /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1060 inputs is. This means things might break if there's a mix */
1061 if (input->upstream_selected)
1062 dbin->upstream_selected = TRUE;
1065 case GST_EVENT_CAPS:
1067 GST_DEBUG_OBJECT (sinkpad,
1068 "New caps, checking if they are compatible with existing parsebin");
1069 if (input->parsebin_sink) {
1070 GstCaps *newcaps = NULL;
1072 gst_event_parse_caps (event, &newcaps);
1073 GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1076 && !gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1077 GST_DEBUG_OBJECT (sinkpad,
1078 "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1079 GST_STATE_LOCK (dbin);
1080 /* Reset parsebin so that it reconfigures itself for the new stream format */
1081 gst_element_set_state (input->parsebin, GST_STATE_NULL);
1082 gst_element_sync_state_with_parent (input->parsebin);
1083 GST_STATE_UNLOCK (dbin);
1085 GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1094 /* Chain to parent function */
1095 return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1098 /* Call with INPUT_LOCK taken */
1099 static DecodebinInput *
1100 create_new_input (GstDecodebin3 * dbin, gboolean main)
1102 DecodebinInput *input;
1104 input = g_new0 (DecodebinInput, 1);
1106 input->is_main = main;
1107 input->group_id = GST_GROUP_ID_INVALID;
1109 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1111 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1112 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1115 input->upstream_selected = FALSE;
1116 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1117 gst_pad_set_event_function (input->ghost_sink,
1118 (GstPadEventFunction) sink_event_function);
1119 gst_pad_set_query_function (input->ghost_sink,
1120 (GstPadQueryFunction) sink_query_function);
1121 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1122 gst_pad_set_unlink_function (input->ghost_sink,
1123 gst_decodebin3_input_pad_unlink);
1125 gst_pad_set_active (input->ghost_sink, TRUE);
1126 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1133 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1134 const gchar * name, const GstCaps * caps)
1136 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1137 DecodebinInput *input;
1140 /* We are ignoring names for the time being, not sure it makes any sense
1141 * within the context of decodebin3 ... */
1142 input = create_new_input (dbin, FALSE);
1145 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1146 res = input->ghost_sink;
1147 INPUT_UNLOCK (dbin);
1153 /* Must be called with factories lock! */
1155 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1159 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1160 if (!dbin->factories || dbin->factories_cookie != cookie) {
1162 if (dbin->factories)
1163 gst_plugin_feature_list_free (dbin->factories);
1164 if (dbin->decoder_factories)
1165 g_list_free (dbin->decoder_factories);
1166 if (dbin->decodable_factories)
1167 g_list_free (dbin->decodable_factories);
1169 gst_element_factory_list_get_elements
1170 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1172 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1173 dbin->factories_cookie = cookie;
1175 /* Filter decoder and other decodables */
1176 dbin->decoder_factories = NULL;
1177 dbin->decodable_factories = NULL;
1178 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1179 GstElementFactory *fact = (GstElementFactory *) tmp->data;
1180 if (gst_element_factory_list_is_type (fact,
1181 GST_ELEMENT_FACTORY_TYPE_DECODER))
1182 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1184 dbin->decodable_factories =
1185 g_list_append (dbin->decodable_factories, fact);
1190 /* Must be called with appropriate lock if list is a protected variable */
1191 static const gchar *
1192 stream_in_list (GList * list, const gchar * sid)
1197 for (tmp = list; tmp; tmp = tmp->next) {
1198 gchar *osid = (gchar *) tmp->data;
1199 GST_DEBUG ("Checking %s against %s", sid, osid);
1203 for (tmp = list; tmp; tmp = tmp->next) {
1204 const gchar *osid = (gchar *) tmp->data;
1205 if (!g_strcmp0 (sid, osid))
1213 stream_list_equal (GList * lista, GList * listb)
1217 if (g_list_length (lista) != g_list_length (listb))
1220 for (tmp = lista; tmp; tmp = tmp->next) {
1221 gchar *osid = tmp->data;
1222 if (!stream_in_list (listb, osid))
1230 update_requested_selection (GstDecodebin3 * dbin)
1234 gboolean all_user_selected = TRUE;
1235 GstStreamType used_types = 0;
1236 GstStreamCollection *collection;
1238 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1239 * the switch handler will take care of the pending selection */
1240 SELECTION_LOCK (dbin);
1241 if (dbin->pending_select_streams) {
1242 GST_DEBUG_OBJECT (dbin,
1243 "No need to create pending selection, SELECT_STREAMS underway");
1247 collection = dbin->collection;
1248 if (G_UNLIKELY (collection == NULL)) {
1249 GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1252 nb = gst_stream_collection_get_size (collection);
1254 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1255 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1257 /* 3. If not, check if we already have some of the streams in the
1258 * existing active/requested selection */
1259 for (i = 0; i < nb; i++) {
1260 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1261 const gchar *sid = gst_stream_get_stream_id (stream);
1263 /* Fire select-stream signal to see if outside components want to
1264 * hint at which streams should be selected */
1265 g_signal_emit (G_OBJECT (dbin),
1266 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1268 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1271 all_user_selected = FALSE;
1272 if (request == 1 || (request == -1
1273 && (stream_in_list (dbin->requested_selection, sid)
1274 || stream_in_list (dbin->active_selection, sid)))) {
1275 GstStreamType curtype = gst_stream_get_stream_type (stream);
1277 GST_DEBUG_OBJECT (dbin,
1278 "Using stream requested by 'select-stream' signal : %s", sid);
1280 GST_DEBUG_OBJECT (dbin,
1281 "Re-using stream already present in requested or active selection : %s",
1283 tmp = g_list_append (tmp, (gchar *) sid);
1284 used_types |= curtype;
1288 /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1289 if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1290 for (i = 0; i < nb; i++) {
1291 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1292 GstStreamType curtype = gst_stream_get_stream_type (stream);
1293 if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1294 const gchar *sid = gst_stream_get_stream_id (stream);
1295 GST_DEBUG_OBJECT (dbin,
1296 "Automatically selecting stream '%s' of type %s", sid,
1297 gst_stream_type_get_name (curtype));
1298 tmp = g_list_append (tmp, (gchar *) sid);
1299 used_types |= curtype;
1305 if (stream_list_equal (tmp, dbin->requested_selection)) {
1306 /* If the selection is equal, there is nothign to do */
1307 GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1313 /* Finally set the requested selection */
1314 if (dbin->requested_selection) {
1315 GST_FIXME_OBJECT (dbin,
1316 "Replacing non-NULL requested_selection, what should we do ??");
1317 g_list_free_full (dbin->requested_selection, g_free);
1319 dbin->requested_selection =
1320 g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1321 dbin->selection_updated = TRUE;
1324 SELECTION_UNLOCK (dbin);
1328 * GCompareFunc to use with lists of GstStream.
1329 * Sorts GstStreams by stream type and SELECT flag and stream-id
1330 * First video, then audio, then others.
1332 * Return: negative if a<b, 0 if a==b, positive if a>b
1335 sort_streams (GstStream * sa, GstStream * sb)
1337 GstStreamType typea, typeb;
1338 GstStreamFlags flaga, flagb;
1339 const gchar *ida, *idb;
1342 typea = gst_stream_get_stream_type (sa);
1343 typeb = gst_stream_get_stream_type (sb);
1345 GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1346 gst_stream_get_stream_id (sb));
1348 /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1349 if (typea != typeb) {
1350 if (typea & GST_STREAM_TYPE_VIDEO)
1352 else if (typea & GST_STREAM_TYPE_AUDIO)
1353 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1354 else if (typea & GST_STREAM_TYPE_TEXT)
1355 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1356 && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1357 else if (typea & GST_STREAM_TYPE_CONTAINER)
1358 ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1363 GST_LOG ("Sort by stream-type: %d", ret);
1368 /* Sort by SELECT flag, if stream type is same. */
1369 flaga = gst_stream_get_stream_flags (sa);
1370 flagb = gst_stream_get_stream_flags (sb);
1373 (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1374 -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1377 GST_LOG ("Sort by SELECT flag: %d", ret);
1381 /* Sort by stream-id, if otherwise the same. */
1382 ida = gst_stream_get_stream_id (sa);
1383 idb = gst_stream_get_stream_id (sb);
1384 ret = g_strcmp0 (ida, idb);
1386 GST_LOG ("Sort by stream-id: %d", ret);
1391 /* Call with INPUT_LOCK taken */
1392 static GstStreamCollection *
1393 get_merged_collection (GstDecodebin3 * dbin)
1395 gboolean needs_merge = FALSE;
1396 GstStreamCollection *res = NULL;
1398 GList *unsorted_streams = NULL;
1401 /* First check if we need to do a merge or just return the only collection */
1402 res = dbin->main_input->collection;
1404 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1405 DecodebinInput *input = (DecodebinInput *) tmp->data;
1406 if (input->collection) {
1411 res = input->collection;
1416 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1417 return res ? gst_object_ref (res) : NULL;
1420 /* We really need to create a new collection */
1421 /* FIXME : Some numbering scheme maybe ?? */
1422 res = gst_stream_collection_new ("decodebin3");
1423 if (dbin->main_input->collection) {
1424 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1425 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1426 for (i = 0; i < nb_stream; i++) {
1428 gst_stream_collection_get_stream (dbin->main_input->collection, i);
1429 unsorted_streams = g_list_append (unsorted_streams, stream);
1433 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1434 DecodebinInput *input = (DecodebinInput *) tmp->data;
1435 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1437 if (input->collection) {
1438 nb_stream = gst_stream_collection_get_size (input->collection);
1439 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1440 for (i = 0; i < nb_stream; i++) {
1442 gst_stream_collection_get_stream (input->collection, i);
1443 /* Only add if not already present in the list */
1444 if (!g_list_find (unsorted_streams, stream))
1445 unsorted_streams = g_list_append (unsorted_streams, stream);
1450 /* re-order streams : video, then audio, then others */
1452 g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1453 for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1454 GstStream *stream = (GstStream *) tmp->data;
1455 GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1456 gst_stream_get_stream_id (stream));
1457 gst_stream_collection_add_stream (res, gst_object_ref (stream));
1460 if (unsorted_streams)
1461 g_list_free (unsorted_streams);
1466 /* Call with INPUT_LOCK taken */
1467 static DecodebinInput *
1468 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1470 DecodebinInput *input = NULL;
1471 GstElement *parent = gst_object_ref (child);
1475 GstElement *next_parent;
1477 GST_DEBUG_OBJECT (dbin, "parent %s",
1478 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1480 if (parent == dbin->main_input->parsebin) {
1481 input = dbin->main_input;
1484 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1485 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1486 if (parent == cur->parsebin) {
1491 next_parent = (GstElement *) gst_element_get_parent (parent);
1492 gst_object_unref (parent);
1493 parent = next_parent;
1495 } while (parent && parent != (GstElement *) dbin);
1498 gst_object_unref (parent);
1503 static const gchar *
1504 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1508 if (dbin->collection == NULL)
1510 len = gst_stream_collection_get_size (dbin->collection);
1511 for (i = 0; i < len; i++) {
1512 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1513 const gchar *osid = gst_stream_get_stream_id (stream);
1514 if (!g_strcmp0 (sid, osid))
1521 /* Call with INPUT_LOCK taken */
1523 handle_stream_collection (GstDecodebin3 * dbin,
1524 GstStreamCollection * collection, DecodebinInput * input)
1526 #ifndef GST_DISABLE_GST_DEBUG
1527 const gchar *upstream_id;
1531 GST_DEBUG_OBJECT (dbin,
1532 "Couldn't find corresponding input, most likely shutting down");
1536 /* Replace collection in input */
1537 if (input->collection)
1538 gst_object_unref (input->collection);
1539 input->collection = gst_object_ref (collection);
1540 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1543 /* Merge collection if needed */
1544 collection = get_merged_collection (dbin);
1546 #ifndef GST_DISABLE_GST_DEBUG
1547 /* Just some debugging */
1548 upstream_id = gst_stream_collection_get_upstream_id (collection);
1549 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1550 GST_DEBUG ("From input %p", input);
1551 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1552 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1553 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1554 GstTagList *taglist;
1557 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1558 GST_DEBUG (" type : %s",
1559 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1560 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1561 taglist = gst_stream_get_tags (stream);
1562 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1563 caps = gst_stream_get_caps (stream);
1564 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1566 gst_tag_list_unref (taglist);
1568 gst_caps_unref (caps);
1572 /* Store collection for later usage */
1573 SELECTION_LOCK (dbin);
1574 if (dbin->collection == NULL) {
1575 dbin->collection = collection;
1577 /* We need to check who emitted this collection (the owner).
1578 * If we already had a collection from that user, this one is an update,
1579 * that is to say that we need to figure out how we are going to re-use
1580 * the streams/slot */
1581 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1582 /* FIXME : When do we switch from pending collection to active collection ?
1583 * When all streams from active collection are drained in multiqueue output ? */
1584 gst_object_unref (dbin->collection);
1585 dbin->collection = collection;
1586 /* dbin->pending_collection = */
1587 /* g_list_append (dbin->pending_collection, collection); */
1589 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1590 SELECTION_UNLOCK (dbin);
1593 /* Must be called with the selection lock taken */
1595 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1597 GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1600 GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1601 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1602 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1603 if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1604 if (max_latency == GST_CLOCK_TIME_NONE
1605 || out->decoder_latency > max_latency)
1606 max_latency = out->decoder_latency;
1609 GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1610 GST_TIME_ARGS (max_latency));
1612 if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1615 /* Make sure we keep an extra overhead */
1616 max_latency += 100 * GST_MSECOND;
1617 if (max_latency == dbin->current_mq_min_interleave)
1620 dbin->current_mq_min_interleave = max_latency;
1621 GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1622 GST_TIME_ARGS (dbin->current_mq_min_interleave));
1623 g_object_set (dbin->multiqueue, "min-interleave-time",
1624 dbin->current_mq_min_interleave, NULL);
1628 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1630 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1631 gboolean posting_collection = FALSE;
1633 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1635 switch (GST_MESSAGE_TYPE (message)) {
1636 case GST_MESSAGE_STREAM_COLLECTION:
1638 GstStreamCollection *collection = NULL;
1639 DecodebinInput *input;
1643 find_message_parsebin (dbin,
1644 (GstElement *) GST_MESSAGE_SRC (message));
1645 if (input == NULL) {
1646 GST_DEBUG_OBJECT (dbin,
1647 "Couldn't find corresponding input, most likely shutting down");
1648 INPUT_UNLOCK (dbin);
1651 if (input->upstream_selected) {
1652 GST_DEBUG_OBJECT (dbin,
1653 "Upstream handles selection, not using/forwarding collection");
1654 INPUT_UNLOCK (dbin);
1657 gst_message_parse_stream_collection (message, &collection);
1659 handle_stream_collection (dbin, collection, input);
1660 posting_collection = TRUE;
1662 INPUT_UNLOCK (dbin);
1664 SELECTION_LOCK (dbin);
1665 if (dbin->collection) {
1666 /* Replace collection message, we most likely aggregated it */
1667 GstMessage *new_msg;
1669 gst_message_new_stream_collection ((GstObject *) dbin,
1671 gst_message_unref (message);
1674 SELECTION_UNLOCK (dbin);
1677 gst_object_unref (collection);
1680 case GST_MESSAGE_LATENCY:
1683 /* Check if this is from one of our decoders */
1684 SELECTION_LOCK (dbin);
1685 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1686 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1687 if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1688 GstClockTime min, max;
1689 if (GST_IS_VIDEO_DECODER (out->decoder)) {
1690 gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1692 GST_DEBUG_OBJECT (dbin,
1693 "Got latency update from one of our decoders. min: %"
1694 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1695 GST_TIME_ARGS (max));
1696 out->decoder_latency = min;
1697 /* Trigger recalculation */
1698 gst_decodebin3_update_min_interleave (dbin);
1703 SELECTION_UNLOCK (dbin);
1709 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1711 if (posting_collection) {
1712 /* Figure out a selection for that collection */
1713 update_requested_selection (dbin);
1720 GST_DEBUG_OBJECT (bin, "dropping message");
1721 gst_message_unref (message);
1725 static DecodebinOutputStream *
1726 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1729 GstStreamType stype = gst_stream_get_stream_type (stream);
1731 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1732 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1733 if (output->type == stype && output->slot && output->slot->active_stream) {
1734 GstStream *tstream = output->slot->active_stream;
1735 if (!stream_in_list (dbin->requested_selection,
1736 (gchar *) gst_stream_get_stream_id (tstream))) {
1745 /* Give a certain slot, figure out if it should be linked to an
1747 * CALL WITH SELECTION LOCK TAKEN !*/
1748 static DecodebinOutputStream *
1749 get_output_for_slot (MultiQueueSlot * slot)
1751 GstDecodebin3 *dbin = slot->dbin;
1752 DecodebinOutputStream *output = NULL;
1753 const gchar *stream_id;
1755 gchar *id_in_list = NULL;
1757 /* If we already have a configured output, just use it */
1758 if (slot->output != NULL)
1759 return slot->output;
1764 * This method needs to be split into multiple parts
1766 * 1) Figure out whether stream should be exposed or not
1767 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1768 * in the default stream attribution
1770 * 2) Figure out whether an output stream should be created, whether
1771 * we can re-use the output stream already linked to the slot, or
1772 * whether we need to get re-assigned another (currently used) output
1776 stream_id = gst_stream_get_stream_id (slot->active_stream);
1777 caps = gst_stream_get_caps (slot->active_stream);
1778 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1779 gst_caps_unref (caps);
1781 /* 0. Emit autoplug-continue signal for pending caps ? */
1782 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1784 /* 1. if in EXPOSE_ALL_MODE, just accept */
1785 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1788 /* FIXME : The idea around this was to avoid activating a stream for
1789 * which we have no decoder. Unfortunately it is way too
1790 * expensive. Need to figure out a better solution */
1791 /* 2. Is there a potential decoder (if one is required) */
1792 if (!gst_caps_can_intersect (caps, dbin->caps)
1793 && !have_factory (dbin, (GstCaps *) caps,
1794 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1795 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1797 SELECTION_UNLOCK (dbin);
1798 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1799 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1800 SELECTION_LOCK (dbin);
1805 /* 3. In default mode check if we should expose */
1806 id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1807 if (id_in_list || dbin->upstream_selected) {
1808 /* Check if we can steal an existing output stream we could re-use.
1810 * * an output stream whose slot->stream is not in requested
1811 * * and is of the same type as this stream
1813 output = find_free_compatible_output (dbin, slot->active_stream);
1815 /* Move this output from its current slot to this slot */
1817 g_list_append (dbin->to_activate, (gchar *) stream_id);
1818 dbin->requested_selection =
1819 g_list_remove (dbin->requested_selection, id_in_list);
1820 g_free (id_in_list);
1821 SELECTION_UNLOCK (dbin);
1822 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1823 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1824 SELECTION_LOCK (dbin);
1828 output = create_output_stream (dbin, slot->type);
1829 output->slot = slot;
1830 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1831 slot->output = output;
1832 GST_DEBUG ("Adding '%s' to active_selection", stream_id);
1833 dbin->active_selection =
1834 g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
1836 GST_DEBUG ("Not creating any output for slot %p", slot);
1841 /* Returns SELECTED_STREAMS message if active_selection is equal to
1842 * requested_selection, else NULL.
1843 * Must be called with LOCK taken */
1845 is_selection_done (GstDecodebin3 * dbin)
1850 if (!dbin->selection_updated)
1853 GST_LOG_OBJECT (dbin, "Checking");
1855 if (dbin->to_activate != NULL) {
1856 GST_DEBUG ("Still have streams to activate");
1859 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1860 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1861 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1862 GST_DEBUG ("Not in active selection, returning");
1867 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1869 /* We are completely active */
1870 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1871 if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
1872 gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
1874 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1875 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1877 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1878 gst_stream_get_stream_id (output->slot->active_stream));
1880 gst_message_streams_selected_add (msg, output->slot->active_stream);
1882 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1884 dbin->selection_updated = FALSE;
1888 /* Must be called with SELECTION_LOCK taken */
1890 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
1892 gboolean all_drained = TRUE;
1895 GST_DEBUG_OBJECT (dbin, "check slot for eos");
1897 for (iter = dbin->slots; iter; iter = iter->next) {
1898 MultiQueueSlot *slot = iter->data;
1903 if (slot->is_drained) {
1904 GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1908 all_drained = FALSE;
1914 if (!pending_pads_are_eos (dbin->main_input))
1915 all_drained = FALSE;
1918 for (iter = dbin->other_inputs; iter; iter = iter->next) {
1919 if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1920 all_drained = FALSE;
1925 INPUT_UNLOCK (dbin);
1929 GST_DEBUG_OBJECT (dbin,
1930 "All active slots are drained, and no pending input, push EOS");
1932 for (iter = dbin->input_streams; iter; iter = iter->next) {
1933 DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1934 GstPad *peer = gst_pad_get_peer (input->srcpad);
1936 /* Send EOS to all slots */
1938 GstEvent *stream_start, *eos;
1941 gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1943 /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1946 GstEvent *custom_stream_start = gst_event_copy (stream_start);
1947 gst_event_unref (stream_start);
1948 s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1949 gst_structure_set (s, "decodebin3-flushing-stream-start",
1950 G_TYPE_BOOLEAN, TRUE, NULL);
1951 gst_pad_send_event (peer, custom_stream_start);
1954 eos = gst_event_new_eos ();
1955 gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
1956 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1957 CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1959 gst_pad_send_event (peer, eos);
1960 gst_object_unref (peer);
1962 GST_DEBUG_OBJECT (dbin, "no output");
1967 static GstPadProbeReturn
1968 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1969 MultiQueueSlot * slot)
1971 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1972 GstDecodebin3 *dbin = slot->dbin;
1974 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1975 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1977 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1978 switch (GST_EVENT_TYPE (ev)) {
1979 case GST_EVENT_STREAM_START:
1981 GstStream *stream = NULL;
1982 const GstStructure *s = gst_event_get_structure (ev);
1984 /* Drop STREAM_START events used to cleanup multiqueue */
1986 && gst_structure_has_field (s,
1987 "decodebin3-flushing-stream-start")) {
1988 ret = GST_PAD_PROBE_HANDLED;
1989 gst_event_unref (ev);
1993 gst_event_parse_stream (ev, &stream);
1994 if (stream == NULL) {
1995 GST_ERROR_OBJECT (pad,
1996 "Got a STREAM_START event without a GstStream");
1999 slot->is_drained = FALSE;
2000 GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2001 gst_stream_get_stream_id (stream));
2002 if (slot->active_stream == NULL) {
2003 slot->active_stream = stream;
2004 } else if (slot->active_stream != stream) {
2005 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2006 gst_stream_get_stream_id (slot->active_stream),
2007 gst_stream_get_stream_id (stream));
2008 gst_object_unref (slot->active_stream);
2009 slot->active_stream = stream;
2011 gst_object_unref (stream);
2012 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2014 gboolean is_active, is_requested;
2015 /* Quick check to see if we're in the current selection */
2016 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2017 SELECTION_LOCK (dbin);
2018 GST_DEBUG_OBJECT (dbin, "Checking active selection");
2019 is_active = stream_in_list (dbin->active_selection, stream_id);
2020 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2021 is_requested = stream_in_list (dbin->requested_selection, stream_id);
2022 SELECTION_UNLOCK (dbin);
2024 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2027 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2029 else if (slot->output) {
2030 GST_DEBUG_OBJECT (pad,
2031 "Slot needs to be deactivated ? It's no longer in requested selection");
2032 } else if (!is_active)
2033 GST_DEBUG_OBJECT (pad,
2034 "Slot in neither active nor requested selection");
2039 case GST_EVENT_CAPS:
2041 /* Configure the output slot if needed */
2042 DecodebinOutputStream *output;
2043 GstMessage *msg = NULL;
2044 SELECTION_LOCK (dbin);
2045 output = get_output_for_slot (slot);
2047 reconfigure_output_stream (output, slot);
2048 msg = is_selection_done (dbin);
2050 SELECTION_UNLOCK (dbin);
2052 gst_element_post_message ((GstElement *) slot->dbin, msg);
2057 gboolean was_drained = slot->is_drained;
2058 slot->is_drained = TRUE;
2060 /* Custom EOS handling first */
2061 if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2062 CUSTOM_EOS_QUARK)) {
2063 /* remove custom-eos */
2064 ev = gst_event_make_writable (ev);
2065 GST_PAD_PROBE_INFO_DATA (info) = ev;
2066 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2067 CUSTOM_EOS_QUARK, NULL, NULL);
2069 GST_LOG_OBJECT (pad, "Received custom EOS");
2070 ret = GST_PAD_PROBE_HANDLED;
2071 SELECTION_LOCK (dbin);
2072 if (slot->input == NULL) {
2073 GST_DEBUG_OBJECT (pad,
2074 "Got custom-eos from null input stream, remove output stream");
2075 /* Remove the output */
2077 DecodebinOutputStream *output = slot->output;
2078 dbin->output_streams =
2079 g_list_remove (dbin->output_streams, output);
2080 free_output_stream (dbin, output);
2081 /* Reacalculate min interleave */
2082 gst_decodebin3_update_min_interleave (dbin);
2085 dbin->slots = g_list_remove (dbin->slots, slot);
2086 free_multiqueue_slot_async (dbin, slot);
2087 ret = GST_PAD_PROBE_REMOVE;
2088 } else if (!was_drained) {
2089 check_all_slot_for_eos (dbin, ev);
2091 if (ret == GST_PAD_PROBE_HANDLED)
2092 gst_event_unref (ev);
2093 SELECTION_UNLOCK (dbin);
2097 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2099 if (slot->input == NULL) {
2101 GST_DEBUG_OBJECT (pad,
2102 "last EOS for input, forwarding and removing slot");
2103 peer = gst_pad_get_peer (pad);
2105 gst_pad_send_event (peer, gst_event_ref (ev));
2106 gst_object_unref (peer);
2108 SELECTION_LOCK (dbin);
2109 /* FIXME : Shouldn't we try to re-assign the output instead of just
2111 /* Remove the output */
2113 DecodebinOutputStream *output = slot->output;
2114 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2115 free_output_stream (dbin, output);
2118 dbin->slots = g_list_remove (dbin->slots, slot);
2119 SELECTION_UNLOCK (dbin);
2121 /* FIXME: Removing the slot is async, which means actually
2122 * unlinking the pad is async. Other things like stream-start
2123 * might flow through this (now unprobed) link before it actually
2125 free_multiqueue_slot_async (dbin, slot);
2126 ret = GST_PAD_PROBE_REMOVE;
2127 } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2128 CUSTOM_FINAL_EOS_QUARK)) {
2129 GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2131 GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2132 /* drop current event as eos will be sent in check_all_slot_for_eos
2133 * when all output streams are also eos */
2134 ret = GST_PAD_PROBE_DROP;
2135 SELECTION_LOCK (dbin);
2136 check_all_slot_for_eos (dbin, ev);
2137 SELECTION_UNLOCK (dbin);
2144 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2145 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2146 switch (GST_QUERY_TYPE (query)) {
2147 case GST_QUERY_CAPS:
2149 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2150 gst_query_set_caps_result (query, GST_CAPS_ANY);
2151 ret = GST_PAD_PROBE_HANDLED;
2155 case GST_QUERY_ACCEPT_CAPS:
2157 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2158 /* If the current decoder doesn't accept caps, we'll reconfigure
2159 * on the actual caps event. So accept any caps. */
2160 gst_query_set_accept_caps_result (query, TRUE);
2161 ret = GST_PAD_PROBE_HANDLED;
2171 /* Create a new multiqueue slot for the given type
2173 * It is up to the caller to know whether that slot is needed or not
2174 * (and release it when no longer needed) */
2175 static MultiQueueSlot *
2176 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2178 MultiQueueSlot *slot;
2179 GstIterator *it = NULL;
2180 GValue item = { 0, };
2182 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2183 gst_stream_type_get_name (type));
2184 slot = g_new0 (MultiQueueSlot, 1);
2187 slot->id = dbin->slot_id++;
2190 slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2191 if (slot->sink_pad == NULL)
2194 it = gst_pad_iterate_internal_links (slot->sink_pad);
2195 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2196 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2197 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2198 GST_DEBUG_PAD_NAME (slot->src_pad));
2201 gst_iterator_free (it);
2202 g_value_reset (&item);
2204 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2206 /* Add event probe */
2208 gst_pad_add_probe (slot->src_pad,
2209 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2210 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2212 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2213 GST_DEBUG_PAD_NAME (slot->src_pad));
2215 dbin->slots = g_list_append (dbin->slots, slot);
2223 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2229 /* Must be called with SELECTION_LOCK */
2230 static MultiQueueSlot *
2231 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2234 MultiQueueSlot *empty_slot = NULL;
2235 GstStreamType input_type = 0;
2236 gchar *stream_id = NULL;
2238 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2239 input, input->active_stream,
2241 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2243 if (input->active_stream) {
2244 input_type = gst_stream_get_stream_type (input->active_stream);
2245 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2248 /* Go over existing slots and check if there is already one for it */
2249 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2250 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2251 /* Already used input, return that one */
2252 if (slot->input == input) {
2253 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2258 /* Go amongst all unused slots of the right type and try to find a candidate */
2259 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2260 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2261 if (slot->input == NULL && input_type == slot->type) {
2262 /* Remember this empty slot for later */
2264 /* Check if available slot is of the same stream_id */
2265 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2266 slot->id, slot->active_stream);
2267 if (stream_id && slot->active_stream) {
2269 (gchar *) gst_stream_get_stream_id (slot->active_stream);
2270 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2271 ostream_id, stream_id);
2272 if (!g_strcmp0 (stream_id, ostream_id))
2279 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2280 empty_slot->input = input;
2285 return create_new_slot (dbin, input_type);
2291 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2293 if (slot->input != NULL && slot->input != input) {
2294 GST_ERROR_OBJECT (slot->dbin,
2295 "Trying to link input to an already used slot");
2298 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2299 slot->pending_stream = input->active_stream;
2300 slot->input = input;
2305 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2306 GstElementFactoryListType ftype)
2308 gboolean ret = FALSE;
2311 g_mutex_lock (&dbin->factories_lock);
2312 gst_decode_bin_update_factories_list (dbin);
2313 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2315 gst_element_factory_list_filter (dbin->decoder_factories,
2316 caps, GST_PAD_SINK, TRUE);
2319 gst_element_factory_list_filter (dbin->decodable_factories,
2320 caps, GST_PAD_SINK, TRUE);
2321 g_mutex_unlock (&dbin->factories_lock);
2325 gst_plugin_feature_list_free (res);
2333 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2337 g_mutex_lock (&dbin->factories_lock);
2338 gst_decode_bin_update_factories_list (dbin);
2339 res = gst_element_factory_list_filter (dbin->decoder_factories,
2340 caps, GST_PAD_SINK, TRUE);
2341 g_mutex_unlock (&dbin->factories_lock);
2345 static GstPadProbeReturn
2346 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2347 DecodebinOutputStream * output)
2349 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2350 /* If we have a keyframe, remove the probe and let all data through */
2351 /* FIXME : HANDLE HEADER BUFFER ?? */
2352 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2353 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2354 GST_DEBUG_OBJECT (pad,
2355 "Buffer is keyframe or header, letting through and removing probe");
2356 output->drop_probe_id = 0;
2357 return GST_PAD_PROBE_REMOVE;
2359 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2360 return GST_PAD_PROBE_DROP;
2364 reconfigure_output_stream (DecodebinOutputStream * output,
2365 MultiQueueSlot * slot)
2367 GstDecodebin3 *dbin = output->dbin;
2368 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2369 gboolean needs_decoder;
2371 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2373 GST_DEBUG_OBJECT (dbin,
2374 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2377 /* FIXME : Maybe make the output un-hook itself automatically ? */
2378 if (output->slot != NULL && output->slot != slot) {
2379 GST_WARNING_OBJECT (dbin,
2380 "Output still linked to another slot (%p)", output->slot);
2381 gst_caps_unref (new_caps);
2385 /* Check if existing config is reusable as-is by checking if
2386 * the existing decoder accepts the new caps, if not delete
2387 * it and create a new one */
2388 if (output->decoder) {
2389 gboolean can_reuse_decoder;
2391 if (needs_decoder) {
2393 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2395 can_reuse_decoder = FALSE;
2397 if (can_reuse_decoder) {
2398 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2399 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2400 output->drop_probe_id =
2401 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2402 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2404 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2405 if (output->linked == FALSE) {
2406 gst_pad_link_full (slot->src_pad, output->decoder_sink,
2407 GST_PAD_LINK_CHECK_NOTHING);
2408 output->linked = TRUE;
2410 gst_caps_unref (new_caps);
2414 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2417 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2418 output->linked = FALSE;
2419 if (output->drop_probe_id) {
2420 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2421 output->drop_probe_id = 0;
2424 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2425 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2426 gst_caps_unref (new_caps);
2430 gst_element_set_locked_state (output->decoder, TRUE);
2431 gst_element_set_state (output->decoder, GST_STATE_NULL);
2433 gst_bin_remove ((GstBin *) dbin, output->decoder);
2434 output->decoder = NULL;
2435 output->decoder_latency = GST_CLOCK_TIME_NONE;
2436 } else if (output->linked) {
2437 /* Otherwise if we have no decoder yet but the output is linked make
2438 * sure that the ghost pad is really unlinked in case no decoder was
2439 * needed previously */
2440 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2441 GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2442 gst_caps_unref (new_caps);
2447 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2448 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2450 /* If a decoder is required, create one */
2451 if (needs_decoder) {
2452 GList *factories, *next_factory;
2454 factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2455 while (!output->decoder) {
2456 gboolean decoder_failed = FALSE;
2458 /* If we don't have a decoder yet, instantiate one */
2460 output->decoder = gst_element_factory_create ((GstElementFactory *)
2461 next_factory->data, NULL);
2462 GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2464 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2467 if (output->decoder == NULL) {
2470 SELECTION_UNLOCK (dbin);
2471 /* FIXME : Should we be smarter if there's a missing decoder ?
2472 * Should we deactivate that stream ? */
2473 caps = gst_stream_get_caps (slot->active_stream);
2474 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2475 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2476 gst_caps_unref (caps);
2477 SELECTION_LOCK (dbin);
2480 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2481 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2484 output->decoder_sink =
2485 gst_element_get_static_pad (output->decoder, "sink");
2486 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2487 if (output->type & GST_STREAM_TYPE_VIDEO) {
2488 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2489 output->drop_probe_id =
2490 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2491 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2493 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2494 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2495 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2496 GST_DEBUG_PAD_NAME (output->decoder_sink));
2499 if (gst_element_set_state (output->decoder,
2500 GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2501 GST_DEBUG_OBJECT (dbin,
2502 "Decoder '%s' failed to reach READY state, trying the next type",
2503 GST_ELEMENT_NAME (output->decoder));
2504 decoder_failed = TRUE;
2506 if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2507 GST_DEBUG_OBJECT (dbin,
2508 "Decoder '%s' did not accept the caps, trying the next type",
2509 GST_ELEMENT_NAME (output->decoder));
2510 decoder_failed = TRUE;
2512 if (decoder_failed) {
2513 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2514 if (output->drop_probe_id) {
2515 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2516 output->drop_probe_id = 0;
2519 gst_element_set_locked_state (output->decoder, TRUE);
2520 gst_element_set_state (output->decoder, GST_STATE_NULL);
2522 gst_bin_remove ((GstBin *) dbin, output->decoder);
2523 output->decoder = NULL;
2525 next_factory = next_factory->next;
2527 gst_plugin_feature_list_free (factories);
2529 output->decoder_src = gst_object_ref (slot->src_pad);
2530 output->decoder_sink = NULL;
2532 gst_caps_unref (new_caps);
2534 output->linked = TRUE;
2535 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2536 output->decoder_src)) {
2537 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2540 if (output->src_exposed == FALSE) {
2541 GstEvent *stream_start;
2543 stream_start = gst_pad_get_sticky_event (slot->src_pad,
2544 GST_EVENT_STREAM_START, 0);
2546 /* Ensure GstStream is accesiable from pad-added callback */
2548 gst_pad_store_sticky_event (output->src_pad, stream_start);
2549 gst_event_unref (stream_start);
2551 GST_WARNING_OBJECT (slot->src_pad,
2552 "Pad has no stored stream-start event");
2555 output->src_exposed = TRUE;
2556 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2559 if (output->decoder)
2560 gst_element_sync_state_with_parent (output->decoder);
2562 output->slot = slot;
2567 GST_DEBUG_OBJECT (dbin, "Cleanup");
2568 if (output->decoder_sink) {
2569 gst_object_unref (output->decoder_sink);
2570 output->decoder_sink = NULL;
2572 if (output->decoder_src) {
2573 gst_object_unref (output->decoder_src);
2574 output->decoder_src = NULL;
2576 if (output->decoder) {
2577 gst_element_set_state (output->decoder, GST_STATE_NULL);
2578 gst_bin_remove ((GstBin *) dbin, output->decoder);
2579 output->decoder = NULL;
2584 static GstPadProbeReturn
2585 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2587 GstMessage *msg = NULL;
2588 DecodebinOutputStream *output;
2590 SELECTION_LOCK (slot->dbin);
2591 output = get_output_for_slot (slot);
2593 GST_DEBUG_OBJECT (pad, "output : %p", output);
2596 reconfigure_output_stream (output, slot);
2597 msg = is_selection_done (slot->dbin);
2599 SELECTION_UNLOCK (slot->dbin);
2601 gst_element_post_message ((GstElement *) slot->dbin, msg);
2603 return GST_PAD_PROBE_REMOVE;
2606 static MultiQueueSlot *
2607 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2611 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2612 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2613 const gchar *stream_id;
2614 if (slot->active_stream) {
2615 stream_id = gst_stream_get_stream_id (slot->active_stream);
2616 if (!g_strcmp0 (sid, stream_id))
2619 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2620 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2621 if (!g_strcmp0 (sid, stream_id))
2629 /* This function handles the reassignment of a slot. Call this from
2630 * the streaming thread of a slot. */
2632 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2634 DecodebinOutputStream *output;
2635 MultiQueueSlot *target_slot = NULL;
2637 const gchar *sid, *tsid;
2639 SELECTION_LOCK (dbin);
2640 output = slot->output;
2642 if (G_UNLIKELY (slot->active_stream == NULL)) {
2643 GST_DEBUG_OBJECT (slot->src_pad,
2644 "Called on inactive slot (active_stream == NULL)");
2645 SELECTION_UNLOCK (dbin);
2649 if (G_UNLIKELY (output == NULL)) {
2650 GST_DEBUG_OBJECT (slot->src_pad,
2651 "Slot doesn't have any output to be removed");
2652 SELECTION_UNLOCK (dbin);
2656 sid = gst_stream_get_stream_id (slot->active_stream);
2657 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2659 /* Recheck whether this stream is still in the list of streams to deactivate */
2660 if (stream_in_list (dbin->requested_selection, sid)) {
2661 /* Stream is in the list of requested streams, don't remove */
2662 SELECTION_UNLOCK (dbin);
2663 GST_DEBUG_OBJECT (slot->src_pad,
2664 "Stream '%s' doesn't need to be deactivated", sid);
2668 /* Unlink slot from output */
2669 /* FIXME : Handle flushing ? */
2670 /* FIXME : Handle outputs without decoders */
2671 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2672 output->decoder_sink);
2673 if (output->decoder_sink)
2674 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2675 output->linked = FALSE;
2676 slot->output = NULL;
2677 output->slot = NULL;
2678 /* Remove sid from active selection */
2679 GST_DEBUG ("Removing '%s' from active_selection", sid);
2680 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2681 if (!g_strcmp0 (sid, tmp->data)) {
2683 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2687 /* Can we re-assign this output to a requested stream ? */
2688 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2689 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2690 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2691 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2692 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2693 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2694 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2695 target_slot = tslot;
2697 /* Pass target stream id to requested selection */
2698 dbin->requested_selection =
2699 g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2700 dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2706 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2708 target_slot->output = output;
2709 output->slot = target_slot;
2710 GST_DEBUG ("Adding '%s' to active_selection", tsid);
2711 dbin->active_selection =
2712 g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2713 SELECTION_UNLOCK (dbin);
2715 /* Wakeup the target slot so that it retries to send events/buffers
2716 * thereby triggering the output reconfiguration codepath */
2717 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2718 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2719 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2723 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2724 free_output_stream (dbin, output);
2725 msg = is_selection_done (slot->dbin);
2726 SELECTION_UNLOCK (dbin);
2729 gst_element_post_message ((GstElement *) slot->dbin, msg);
2735 /* Idle probe called when a slot should be unassigned from its output stream.
2736 * This is needed to ensure nothing is flowing when unlinking the slot.
2738 * Also, this method will search for a pending stream which could re-use
2739 * the output stream. */
2740 static GstPadProbeReturn
2741 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2742 MultiQueueSlot * slot)
2744 GstDecodebin3 *dbin = slot->dbin;
2746 reassign_slot (dbin, slot);
2748 return GST_PAD_PROBE_REMOVE;
2752 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2755 gboolean ret = TRUE;
2757 /* List of slots to (de)activate. */
2758 GList *to_deactivate = NULL;
2759 GList *to_activate = NULL;
2760 /* List of unknown stream id, most likely means the event
2761 * should be sent upstream so that elements can expose the requested stream */
2762 GList *unknown = NULL;
2763 GList *to_reassign = NULL;
2764 GList *future_request_streams = NULL;
2765 GList *pending_streams = NULL;
2766 GList *slots_to_reassign = NULL;
2768 SELECTION_LOCK (dbin);
2769 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2770 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2771 SELECTION_UNLOCK (dbin);
2774 /* Remove pending select_streams */
2775 g_list_free (dbin->pending_select_streams);
2776 dbin->pending_select_streams = NULL;
2778 /* COMPARE the requested streams to the active and requested streams
2781 /* First check the slots to activate and which ones are unknown */
2782 for (tmp = select_streams; tmp; tmp = tmp->next) {
2783 const gchar *sid = (const gchar *) tmp->data;
2784 MultiQueueSlot *slot;
2785 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2786 slot = find_slot_for_stream_id (dbin, sid);
2787 /* Find the corresponding slot */
2789 if (stream_in_collection (dbin, (gchar *) sid)) {
2790 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2792 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2793 unknown = g_list_append (unknown, (gchar *) sid);
2795 } else if (slot->output == NULL) {
2796 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2798 to_activate = g_list_append (to_activate, slot);
2800 GST_DEBUG_OBJECT (dbin,
2801 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2803 future_request_streams =
2804 g_list_append (future_request_streams, (gchar *) sid);
2808 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2809 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2810 /* For slots that have an output, check if it's part of the streams to
2813 gboolean slot_to_deactivate = TRUE;
2815 if (slot->active_stream) {
2816 if (stream_in_list (select_streams,
2817 gst_stream_get_stream_id (slot->active_stream)))
2818 slot_to_deactivate = FALSE;
2820 if (slot_to_deactivate && slot->pending_stream
2821 && slot->pending_stream != slot->active_stream) {
2822 if (stream_in_list (select_streams,
2823 gst_stream_get_stream_id (slot->pending_stream)))
2824 slot_to_deactivate = FALSE;
2826 if (slot_to_deactivate) {
2827 GST_DEBUG_OBJECT (dbin,
2828 "Slot %p (%s) should be deactivated, no longer used", slot,
2830 active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2832 to_deactivate = g_list_append (to_deactivate, slot);
2837 if (to_deactivate != NULL) {
2838 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2839 /* We need to compare what needs to be activated and deactivated in order
2840 * to determine whether there are outputs that can be transferred */
2841 /* Take the stream-id of the slots that are to be activated, for which there
2842 * is a slot of the same type that needs to be deactivated */
2843 tmp = to_deactivate;
2845 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2846 gboolean removeit = FALSE;
2848 GST_DEBUG_OBJECT (dbin,
2849 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2850 slot_to_deactivate);
2851 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2852 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2853 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2854 if (slot_to_activate->type == slot_to_deactivate->type) {
2855 GST_DEBUG_OBJECT (dbin, "Re-using");
2856 to_reassign = g_list_append (to_reassign, (gchar *)
2857 gst_stream_get_stream_id (slot_to_activate->active_stream));
2859 g_list_append (slots_to_reassign, slot_to_deactivate);
2860 to_activate = g_list_remove (to_activate, slot_to_activate);
2867 to_deactivate = g_list_delete_link (to_deactivate, tmp);
2872 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2873 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2874 GST_DEBUG_OBJECT (dbin,
2875 "Really need to deactivate slot %p, but no available alternative",
2878 slots_to_reassign = g_list_append (slots_to_reassign, slot);
2881 /* The only slots left to activate are the ones that won't be reassigned and
2882 * therefore really need to have a new output created */
2883 for (tmp = to_activate; tmp; tmp = tmp->next) {
2884 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2885 if (slot->active_stream)
2886 future_request_streams =
2887 g_list_append (future_request_streams,
2888 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2889 else if (slot->pending_stream)
2890 future_request_streams =
2891 g_list_append (future_request_streams,
2892 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2894 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2897 if (to_activate == NULL && pending_streams != NULL) {
2898 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2899 if (dbin->requested_selection)
2900 g_list_free_full (dbin->requested_selection, g_free);
2901 dbin->requested_selection =
2902 g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2903 g_list_free (to_deactivate);
2904 g_list_free (pending_streams);
2905 to_deactivate = NULL;
2906 pending_streams = NULL;
2908 if (dbin->requested_selection)
2909 g_list_free_full (dbin->requested_selection, g_free);
2910 dbin->requested_selection =
2911 g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2912 dbin->requested_selection =
2913 g_list_concat (dbin->requested_selection,
2914 g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2915 if (dbin->to_activate)
2916 g_list_free (dbin->to_activate);
2917 dbin->to_activate = g_list_copy (to_reassign);
2920 dbin->selection_updated = TRUE;
2921 SELECTION_UNLOCK (dbin);
2924 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2925 g_list_free (unknown);
2928 if (to_activate && !slots_to_reassign) {
2929 for (tmp = to_activate; tmp; tmp = tmp->next) {
2930 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2931 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2932 (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2936 /* For all streams to deactivate, add an idle probe where we will do
2937 * the unassignment and switch over */
2938 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2939 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2940 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2941 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2945 g_list_free (to_deactivate);
2947 g_list_free (to_activate);
2949 g_list_free (to_reassign);
2950 if (future_request_streams)
2951 g_list_free (future_request_streams);
2952 if (pending_streams)
2953 g_list_free (pending_streams);
2954 if (slots_to_reassign)
2955 g_list_free (slots_to_reassign);
2960 static GstPadProbeReturn
2961 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2962 DecodebinOutputStream * output)
2964 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2965 GstDecodebin3 *dbin = output->dbin;
2966 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2968 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2970 switch (GST_EVENT_TYPE (event)) {
2971 case GST_EVENT_SELECT_STREAMS:
2974 GList *streams = NULL;
2975 guint32 seqnum = gst_event_get_seqnum (event);
2977 if (dbin->upstream_selected) {
2978 GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
2982 SELECTION_LOCK (dbin);
2983 if (seqnum == dbin->select_streams_seqnum) {
2984 SELECTION_UNLOCK (dbin);
2985 GST_DEBUG_OBJECT (pad,
2986 "Already handled/handling that SELECT_STREAMS event");
2987 gst_event_unref (event);
2988 ret = GST_PAD_PROBE_HANDLED;
2991 dbin->select_streams_seqnum = seqnum;
2992 if (dbin->pending_select_streams != NULL) {
2993 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2994 g_list_free (dbin->pending_select_streams);
2995 dbin->pending_select_streams = NULL;
2997 gst_event_parse_select_streams (event, &streams);
2998 dbin->pending_select_streams = g_list_copy (streams);
2999 SELECTION_UNLOCK (dbin);
3001 /* Send event upstream */
3002 if ((peer = gst_pad_get_peer (pad))) {
3003 gst_pad_send_event (peer, event);
3004 gst_object_unref (peer);
3006 gst_event_unref (event);
3008 /* Finally handle the switch */
3010 handle_stream_switch (dbin, streams, seqnum);
3011 g_list_free_full (streams, g_free);
3013 ret = GST_PAD_PROBE_HANDLED;
3024 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3026 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3028 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3029 if (!dbin->upstream_selected
3030 && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3031 GList *streams = NULL;
3032 guint32 seqnum = gst_event_get_seqnum (event);
3034 SELECTION_LOCK (dbin);
3035 if (seqnum == dbin->select_streams_seqnum) {
3036 SELECTION_UNLOCK (dbin);
3037 GST_DEBUG_OBJECT (dbin,
3038 "Already handled/handling that SELECT_STREAMS event");
3041 dbin->select_streams_seqnum = seqnum;
3042 if (dbin->pending_select_streams != NULL) {
3043 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3044 g_list_free (dbin->pending_select_streams);
3045 dbin->pending_select_streams = NULL;
3047 gst_event_parse_select_streams (event, &streams);
3048 dbin->pending_select_streams = g_list_copy (streams);
3049 SELECTION_UNLOCK (dbin);
3051 /* FIXME : We don't have an upstream ?? */
3053 /* Send event upstream */
3054 if ((peer = gst_pad_get_peer (pad))) {
3055 gst_pad_send_event (peer, event);
3056 gst_object_unref (peer);
3059 /* Finally handle the switch */
3061 handle_stream_switch (dbin, streams, seqnum);
3062 g_list_free_full (streams, g_free);
3065 gst_event_unref (event);
3068 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3073 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3076 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3078 if (slot->input->srcpad)
3079 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3082 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3083 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3084 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3085 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3090 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3092 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3093 gst_element_call_async (GST_ELEMENT_CAST (dbin),
3094 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3097 /* Create a DecodebinOutputStream for a given type
3098 * Note: It will be empty initially, it needs to be configured
3100 static DecodebinOutputStream *
3101 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3103 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3105 const gchar *prefix;
3106 GstStaticPadTemplate *templ;
3107 GstPadTemplate *ptmpl;
3109 GstPad *internal_pad;
3111 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3112 res, gst_stream_type_get_name (type));
3116 res->decoder_latency = GST_CLOCK_TIME_NONE;
3118 if (type & GST_STREAM_TYPE_VIDEO) {
3119 templ = &video_src_template;
3120 counter = &dbin->vpadcount;
3122 } else if (type & GST_STREAM_TYPE_AUDIO) {
3123 templ = &audio_src_template;
3124 counter = &dbin->apadcount;
3126 } else if (type & GST_STREAM_TYPE_TEXT) {
3127 templ = &text_src_template;
3128 counter = &dbin->tpadcount;
3131 templ = &src_template;
3132 counter = &dbin->opadcount;
3136 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3138 ptmpl = gst_static_pad_template_get (templ);
3139 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3140 gst_object_unref (ptmpl);
3142 gst_pad_set_active (res->src_pad, TRUE);
3143 /* Put an event probe on the internal proxy pad to detect upstream
3146 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3147 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3148 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3149 gst_object_unref (internal_pad);
3151 dbin->output_streams = g_list_append (dbin->output_streams, res);
3157 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3160 if (output->decoder_sink && output->decoder)
3161 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3163 output->slot->output = NULL;
3164 output->slot = NULL;
3166 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3167 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3168 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3169 if (output->src_exposed) {
3170 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3172 if (output->decoder) {
3173 gst_element_set_locked_state (output->decoder, TRUE);
3174 gst_element_set_state (output->decoder, GST_STATE_NULL);
3175 gst_bin_remove ((GstBin *) dbin, output->decoder);
3180 static GstStateChangeReturn
3181 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3183 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3184 GstStateChangeReturn ret;
3187 switch (transition) {
3191 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3192 if (ret == GST_STATE_CHANGE_FAILURE)
3195 switch (transition) {
3196 case GST_STATE_CHANGE_PAUSED_TO_READY:
3200 /* Free output streams */
3201 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3202 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3203 free_output_stream (dbin, output);
3205 g_list_free (dbin->output_streams);
3206 dbin->output_streams = NULL;
3207 /* Free multiqueue slots */
3208 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3209 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3210 free_multiqueue_slot (dbin, slot);
3212 g_list_free (dbin->slots);
3214 dbin->current_group_id = GST_GROUP_ID_INVALID;
3216 /* Reset the main input group id since it will get a new id on a new stream */
3217 dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3218 /* Reset multiqueue to default interleave */
3219 g_object_set (dbin->multiqueue, "min-interleave-time",
3220 dbin->default_mq_min_interleave, NULL);
3221 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3222 dbin->upstream_selected = FALSE;
3223 g_list_free_full (dbin->active_selection, g_free);
3224 dbin->active_selection = NULL;